feat(common): 添加无锁并发工具包依赖

新增 lockfree-1.1.3 工具包到 go.work 文件中,为项目提供无锁并发数据结构支持,
提升高并发场景下的性能表现。
This commit is contained in:
2025-12-05 00:36:28 +08:00
parent 43813932c9
commit 269256a861
40 changed files with 2900 additions and 0 deletions

View File

@@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@@ -0,0 +1,370 @@
## Lockfree
> 如果想使用低于go1.18版本则可以引入tag:1.0.9或branchbelow-version1.18
> 通过条件编译支持386平台但性能测试发现比较差因此不建议使用
### 1. 简介
#### 1.1. 为什么要写Lockfree
在go语言中一般都是使用chan作为消息传递的队列但在实际高并发的环境下使用发现chan存在严重的性能问题其直接表现就是将对象放入到chan中时会特别耗时
即使chan的容量尚未打满在严重时甚至会产生几百ms还无法放入到chan中的情况
![放大.jpg](images%2F%E6%94%BE%E5%A4%A7.jpg)
#### 1.2. chan基本原理
##### 1.2.1. chan基本结构
chan关键字在编译阶段会被编译为runtime.hchan结构它的结构如下所示
![chan结构.jpg](images%2Fchan%E7%BB%93%E6%9E%84.jpg)
其中sudog是一个比较常见的结构是对goroutine的一个封装它的主要结构
![sudog.jpg](images%2Fsudog.jpg)
##### 1.2.2. chan写入流程
![write.jpg](images%2Fwrite.jpg)
##### 1.2.3. chan读取流程
![read.jpg](images%2Fread.jpg)
#### 1.3.
##### 1.3.1. runtime.mutex
chan结构中包含了一个lock字段`lock mutex`
这个lock看名字就知道是一个锁当然它不是我们业务中经常使用的sync.Mutex而是一个`runtime.mutex`
这个锁是一个互斥锁在linux系统中它的实现是futex在没有竞争的情况下会退化成为一个自旋操作速度非常快但是当竞争比较大时它就会在内核中休眠
futex的基本原理如下图
![futex.jpg](images%2Ffutex.jpg)
当竞争非常大时对于chan而言其整体大部分时间是出于系统调用上所以性能下降非常明显
##### 1.3.2. sync.Mutex
`sync.Mutex`包中的设计原理如下图
![.jpg](images%2F%E9%94%81.jpg)
### 2. Lockfree基本原理
#### 2.1. 模块及流程
> 在最新的设计中已经删除了available模块转而使用ringBuffer中的对象e中的c游标标识写入状态
![lockfree.jpg](images%2Flockfree.jpg)
#### 2.2. 优化点
##### 1) 无锁实现
内部所有操作都是通过原子变量(atomic)来操作唯一有可能使用锁的地方是提供给用户在RingBuffer为空时的等待策略用户可选择使用chan阻塞
##### 2) 单一消费协程
屏蔽掉消费端读操作竞争带来的性能损耗
##### 3) 写不等待原则
符合写入快的初衷当无法写入时会持续通过自旋和任务调度的方式处理一方面尽量加快写入效率另一方面则是防止占用太多CPU资源
##### 4) 泛型加速
引入泛型泛型与interface有很明显的区别泛型是在编译阶段确定类型这样可有效降低在运行时进行类型转换的耗时
##### 5) 一次性内存分配
环状结构Ringbuffer实现对象的传递通过确定大小的切片实现只需要分配一次内存不会涉及扩容等操作可有效提高处理的性能
##### 6) 运算加速
RingBuffer的容量为2的n次方通过与运算来代替取余运算提高性能
##### 7) 并行位图
用原子位运算实现位图并行操作在尽量不影响性能的条件下降低内存消耗
并行位图的思路实现历程
![bitmap.jpg](images%2Fbitmap.jpg)
##### 8) 缓存行填充
根据CPU高速缓存的特点通过填充缓存行方式屏蔽掉伪共享问题
缓存行填充应该是一个比较常见的问题它的本质是因为CPU比较快而内存比较慢所以增加了高速缓存来处理
![padding1.jpg](images%2Fpadding1.jpg)
在两个Core共享同一个L3的情况下如果同时进行修改就会出现竞争关系会涉及到缓存一致性协议MESI
![padding2.jpg](images%2Fpadding2.jpg)
在Lockfree中有两个地方用到了填充
![padding3.jpg](images%2Fpadding3.jpg)
最新版本中只保留了cursor中的填充在e中使用了游标
##### 9) Pointer替代切片
屏蔽掉切片操作必须要进行越界判断的逻辑生成更高效机器码
![pointer.jpg](images%2Fpointer.jpg)
#### 2.3. 核心模块
##### ringBuffer
具体对象的存放区域通过数组定长切片实现环状数据结构其中的数据对象是具体的结构体而非指针这样可以一次性进行内存申请
##### stateDescriptor
> 最新的版本已将该对象删除通过ringBuffer中e中的游标来描述状态这样更充分利用了内存降低了消耗
状态描述符定义了对应位置的数据状态是可读还是可写提供了三种方式
+ 1) 基于Uint32的切片每个Uint32值描述一个位置性能最高但内存消耗最大
+ 2) 基于Bitmap每个bit描述一个位置性能最低但内存消耗最小
+ 3) 基于Uint8的切片每个Uint8值描述一个位置性能适中消耗也适中最推荐的方式
##### sequencer
序号产生器维护读和写两个状态写状态具体由内部游标cursor维护读取状态由自身维护一个uint64变量维护它的核心方法是next()用于获取下个可以写入的游标
##### Producer
生产者核心方法是Write通过调用Write方法可以将对象写入到队列中支持多个g并发操作保证加入时处理的效率
##### consumer
消费者这个消费者只会有一个g操作这样处理的好处是可以不涉及并发操作其内部不会涉及到任何锁对于实际的并发操作由该g进行分配
##### blockStrategy
阻塞策略该策略用于buf中长时间没有数据时消费者阻塞设计它有两个方法block()和release()前者用于消费者阻塞后者用于释放
系统提供了多种方式不同的方式CPU资源占用和性能会有差别
+ 1) SchedBlockStrategy调用runtime.Gosched()进行调度block不需要release为推荐方式
+ 2) SleepBlockStrategy调用time.Sleep(x)进行block可自定义休眠时间不需要release为推荐方式
+ 3) ProcYieldBlockStrategy调用CPU空跑指令可自定义空跑的指令数量不需要release
+ 4) OSYieldBlockStrategy操作系统会将对应M调度出去等时间片重新分配后可执行不需要release
+ 5) ChanBlockStrategychan阻塞策略需要release为推荐方式
+ 6) CanditionBlockStrategycandition阻塞策略需要release为推荐方式
其中1/2/5/6为推荐方式如果性能要求比较高则优先考虑2和1否则建议试用5和6
##### EventHandler
事件处理器接口整个项目中唯一需要用户实现的接口该接口描述消费端收到消息时该如何处理它使用泛型通过编译阶段确定事件类型提高性能
### 3. 使用方式
#### 3.1. 导入模块
可使用 `go get github.com/bruceshao/lockfree` 获取最新版本
#### 3.2. 代码调用
为了提升性能Lockfree支持go版本1.18及以上以便于支持泛型Lockfree使用非常简单
```go
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/bruceshao/lockfree"
)
var (
goSize = 10000
sizePerGo = 10000
total = goSize * sizePerGo
)
func main() {
// lockfree计时
now := time.Now()
// 创建事件处理器
handler := &eventHandler[uint64]{
signal: make(chan struct{}, 0),
now: now,
}
// 创建消费端串行处理的Lockfree
lf := lockfree.NewLockfree[uint64](
1024*1024,
handler,
lockfree.NewSleepBlockStrategy(time.Millisecond),
)
// 启动Lockfree
if err := lf.Start(); err != nil {
panic(err)
}
// 获取生产者对象
producer := lf.Producer()
// 并发写入
var wg sync.WaitGroup
wg.Add(goSize)
for i := 0; i < goSize; i++ {
go func(start int) {
for j := 0; j < sizePerGo; j++ {
err := producer.Write(uint64(start*sizePerGo + j + 1))
if err != nil {
panic(err)
}
}
wg.Done()
}(i)
}
// wait for producer
wg.Wait()
fmt.Printf("producer has been writed, write count: %v, time cost: %v \n", total, time.Since(now).String())
// wait for consumer
handler.wait()
// 关闭Lockfree
lf.Close()
}
type eventHandler[T uint64] struct {
signal chan struct{}
gcounter uint64
now time.Time
}
func (h *eventHandler[T]) OnEvent(v uint64) {
cur := atomic.AddUint64(&h.gcounter, 1)
if cur == uint64(total) {
fmt.Printf("eventHandler has been consumed already, read count: %v, time cose: %v\n", total, time.Since(h.now))
close(h.signal)
return
}
if cur%10000000 == 0 {
fmt.Printf("eventHandler consume %v\n", cur)
}
}
func (h *eventHandler[T]) wait() {
<-h.signal
}
```
### 4. 性能对比
#### 4.1. 简述
在实际测试中发现如果lockfree和chan同时跑的话会有一些影响lockfree的表现基本是正常的和chan同时跑的时候性能基本是下降的
但chan比较奇怪和lockfree一起跑的时候性能比chan自身跑性能还高目前正在排查此问题但不影响使用
main包下提供了测试的程序可自行进行性能测试假设编译后的二进制为lockfree
+ 1单独测试lockfree./lockfree lockfree [time]加入time会有时间分布
+ 2单独测试chan./lockfree chan [time]加入time会有时间分布
+ 3合并测试lockfree和chan./lockfree [all] [time]使用time时前面必须加all参数只进行测试不关注时间分布的话可直接调用./lockfree
为描述性能除了时间外定义了**QR**Quick Ratio快速率的指标该指标描述的是写入时间在1微秒以内的操作占所有操作的比值
自然的QR越大性能越高
#### 4.2. 软硬件测试环境
CPU信息如下(4 * 2.5GHz)
```shell
[root@VM]# lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 4
On-line CPU(s) list: 0-3
Thread(s) per core: 1
Core(s) per socket: 4
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 94
Model name: Intel(R) Xeon(R) Gold 6133 CPU @ 2.50GHz
Stepping: 3
CPU MHz: 2494.120
BogoMIPS: 4988.24
Hypervisor vendor: KVM
Virtualization type: full
L1d cache: 32K
L1i cache: 32K
L2 cache: 4096K
L3 cache: 28160K
NUMA node0 CPU(s): 0-3
```
内存信息8G
```shell
[root@VM]# free -m
total used free shared buff/cache available
Mem: 7779 405 6800 116 573 7216
Swap: 0 0 0
```
操作系统centos 7.2
```shell
[root@VM]# cat /etc/centos-release
CentOS Linux release 7.2 (Final)
[root@VM]# uname -a
Linux VM-219-157-centos 3.10.107-1-tlinux2_kvm_guest-0056 #1 SMP Wed Dec 29 14:35:09 CST 2021 x86_64 x86_64 x86_64 GNU/Linux
```
云厂商腾讯云
#### 4.3. 写入性能对比
设定buffer大小为1024 * 1024无论是lockfree还是chan都是如此设置其写入的时间对比如下
其中 100 * 10000表示有100个goroutine每个goroutine写入10000次其他的依次类推
alllockfree/chan表示在lockfree和chan同时跑的情况下其分别的时间占比情况
| 类型 | 100 * 10000 | 500 * 10000 | 1000 * 10000 | 5000 * 10000 | 10000 * 10000 |
|---------------|-------------|-------------|--------------|--------------|---------------|
| lockfree | 67ms | 306ms | 676ms | 3779ms | 7703ms |
| chan | 116ms | 1991ms | 4709ms | 26897ms | 58509ms |
| alllockfree | 49ms | 414ms | 976ms | 5038ms | 10946ms |
| allchan | 83ms | 859ms | 3029ms | 19228ms | 40473ms |
#### 4.4. QR分布
快速率的分布情况如下所示
| 类型 | 100 * 10000 | 500 * 10000 | 1000 * 10000 | 5000 * 10000 | 10000 * 10000 |
|---------------|-------------|-------------|--------------|--------------|---------------|
| lockfree | 99.23 | 99.78 | 99.81 | 99.49 | 98.99 |
| chan | 91.67 | 88.99 | 57.79 | 3.98 | 1.6 |
| alllockfree | 99.69 | 99.88 | 99.88 | 99.52 | 99.02 |
| allchan | 96.72 | 93.5 | 93.1 | 51.37 | 48.2 |
#### 4.5. 结果
从上面两张表可以很明显看出如下几点
+ 1在goroutine数量比较小时lockfree和chan性能差别不明显
+ 2当goroutine打到一定数量大于1000lockfree无论从时间还是QR都远远超过chan

View File

@@ -0,0 +1,160 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import (
"runtime"
"sync"
"sync/atomic"
"time"
)
// blockStrategy 阻塞策略
type blockStrategy interface {
// block 阻塞
block(actual *uint64, expected uint64)
// release 释放阻塞
release()
}
// SchedBlockStrategy 调度等待策略
// 调用runtime.Gosched()方法使当前 g 主动让出 cpu 资源。
type SchedBlockStrategy struct {
}
func (s *SchedBlockStrategy) block(actual *uint64, expected uint64) {
runtime.Gosched()
}
func (s *SchedBlockStrategy) release() {
}
// SleepBlockStrategy 休眠等待策略
// 调用 Sleep 方法使当前 g 主动让出 cpu 资源。
// sleep poll 参考值:
// 轮询时长为 10us 时cpu 开销约 2-3% 左右。
// 轮询时长为 5us 时cpu 开销约在 10% 左右。
// 轮询时长小于 5us 时cpu 开销接近 100% 满载。
type SleepBlockStrategy struct {
t time.Duration
}
func NewSleepBlockStrategy(wait time.Duration) *SleepBlockStrategy {
return &SleepBlockStrategy{
t: wait,
}
}
func (s *SleepBlockStrategy) block(actual *uint64, expected uint64) {
time.Sleep(s.t)
}
func (s *SleepBlockStrategy) release() {
}
// ProcYieldBlockStrategy CPU空指令策略
type ProcYieldBlockStrategy struct {
cycle uint32
}
func NewProcYieldBlockStrategy(cycle uint32) *ProcYieldBlockStrategy {
return &ProcYieldBlockStrategy{
cycle: cycle,
}
}
func (s *ProcYieldBlockStrategy) block(actual *uint64, expected uint64) {
procyield(s.cycle)
}
func (s *ProcYieldBlockStrategy) release() {
}
// OSYieldBlockStrategy 操作系统调度策略
type OSYieldBlockStrategy struct {
}
func NewOSYieldWaitStrategy() *OSYieldBlockStrategy {
return &OSYieldBlockStrategy{}
}
func (s *OSYieldBlockStrategy) block(actual *uint64, expected uint64) {
osyield()
}
func (s *OSYieldBlockStrategy) release() {
}
// ChanBlockStrategy chan阻塞策略
type ChanBlockStrategy struct {
bc chan struct{}
b uint32
}
func NewChanBlockStrategy() *ChanBlockStrategy {
return &ChanBlockStrategy{
bc: make(chan struct{}),
}
}
func (s *ChanBlockStrategy) block(actual *uint64, expected uint64) {
// 0未阻塞1阻塞
if atomic.CompareAndSwapUint32(&s.b, 0, 1) {
// 设置成功的话,表示阻塞,需要进行二次判断
if atomic.LoadUint64(actual) == expected {
// 表示阻塞失败,因为结果是一致的,此处需要重新将状态调整回来
if atomic.CompareAndSwapUint32(&s.b, 1, 0) {
// 表示回调成功,直接退出即可
return
} else {
// 表示有其他协程release了则读取对应chan即可
<-s.bc
}
} else {
// 如果说结果不一致,则表示阻塞,等待被释放即可
<-s.bc
}
}
// 没有设置成功,不用关注
}
func (s *ChanBlockStrategy) release() {
if atomic.CompareAndSwapUint32(&s.b, 1, 0) {
// 表示可以释放即chan是等待状态
s.bc <- struct{}{}
}
// 无法设置则不用关心
return
}
// ConditionBlockStrategy condition 阻塞策略
type ConditionBlockStrategy struct {
cond *sync.Cond
}
func NewConditionBlockStrategy() *ConditionBlockStrategy {
return &ConditionBlockStrategy{
cond: sync.NewCond(&sync.Mutex{}),
}
}
func (s *ConditionBlockStrategy) block(actual *uint64, expected uint64) {
s.cond.L.Lock()
defer s.cond.L.Unlock()
if atomic.LoadUint64(actual) == expected {
return
}
s.cond.Wait()
}
func (s *ConditionBlockStrategy) release() {
s.cond.L.Lock()
defer s.cond.L.Unlock()
s.cond.Broadcast()
}

View File

@@ -0,0 +1,57 @@
//go:build !386
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import "sync/atomic"
type e[T any] struct {
c uint64
val T
}
// ringBuffer 具体对象的存放区域,通过数组(定长切片)实现环状数据结构
// 其中e为具体对象非指针这样可以一次性进行内存申请
type ringBuffer[T any] struct {
// 增加默认的对象以便于return处理data race问题
tDefault T
buf []e[T]
capMask uint64
}
func newRingBuffer[T any](cap int) *ringBuffer[T] {
x := ringBuffer[T]{
capMask: uint64(cap) - 1,
buf: make([]e[T], cap),
}
return &x
}
func (r *ringBuffer[T]) write(c uint64, v T) {
x := &r.buf[c&r.capMask]
x.val = v
atomic.StoreUint64(&x.c, c+1)
}
func (r *ringBuffer[T]) element(c uint64) e[T] {
return r.buf[c&r.capMask]
}
func (r *ringBuffer[T]) contains(c uint64) (T, *uint64, bool) {
x := &r.buf[c&r.capMask]
if atomic.LoadUint64(&x.c) == c+1 {
v := x.val
return v, &x.c, true
}
return r.tDefault, &x.c, false
}
func (r *ringBuffer[T]) cap() uint64 {
return r.capMask + 1
}

View File

@@ -0,0 +1,64 @@
//go:build 386
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import (
"sync"
)
type e[T any] struct {
c uint64
val T
}
// ringBuffer 具体对象的存放区域,通过数组(定长切片)实现环状数据结构
// 其中e为具体对象非指针这样可以一次性进行内存申请
type ringBuffer[T any] struct {
sync.RWMutex
// 增加默认的对象以便于return处理data race问题
tDefault T
buf []e[T]
capMask uint64
}
func newRingBuffer[T any](cap int) *ringBuffer[T] {
x := ringBuffer[T]{
capMask: uint64(cap) - 1,
buf: make([]e[T], cap),
}
return &x
}
func (r *ringBuffer[T]) write(c uint64, v T) {
x := &r.buf[c&r.capMask]
r.Lock()
defer r.Unlock()
x.val = v
x.c = c + 1
}
func (r *ringBuffer[T]) element(c uint64) e[T] {
return r.buf[c&r.capMask]
}
func (r *ringBuffer[T]) contains(c uint64) (T, *uint64, bool) {
x := &r.buf[c&r.capMask]
r.RLock()
defer r.RUnlock()
if x.c == c+1 {
v := x.val
return v, &x.c, true
}
return r.tDefault, &x.c, false
}
func (r *ringBuffer[T]) cap() uint64 {
return r.capMask + 1
}

View File

@@ -0,0 +1,79 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import (
"fmt"
"reflect"
"testing"
"time"
"unsafe"
)
func TestA(t *testing.T) {
sli := make([]uint8, 8)
sh := *(*reflect.SliceHeader)(unsafe.Pointer(&sli))
mem := unsafe.Pointer(sh.Data)
Sets(mem)
fmt.Printf("%v\n", sli[0])
fmt.Printf("%v\n", sli[1])
fmt.Printf("%v\n", sli[2])
fmt.Printf("%v\n", sli[3])
fmt.Printf("%v\n", sli[4])
fmt.Printf("%v\n", sli[5])
fmt.Printf("%v\n", sli[6])
fmt.Printf("%v\n", sli[7])
}
func Sets(mem unsafe.Pointer) {
for i := 0; i < 8; i++ {
*(*uint8)(unsafe.Pointer(uintptr(mem) + uintptr(i))) = 1
}
}
func TestX(t *testing.T) {
loop := 1000000
l := 1024 * 1024
bytes := make([]byte, l)
ts := time.Now()
for i := 0; i < loop; i++ {
bytes[i] = 0x01
}
tl := time.Since(ts)
fmt.Println(tl.Microseconds())
}
func TestBuffer(t *testing.T) {
buf := newRingBuffer[uint64](1024)
buf.write(0, 1)
x := buf.element(0)
fmt.Println(x)
buf.write(1023, 2)
x = buf.element(1023)
fmt.Println(x)
buf.write(1024, 3)
x = buf.element(1024)
fmt.Println(x)
}
func TestBufferAlign32(t *testing.T) {
buf := newRingBuffer[uint32](1024)
buf.write(0, 1)
x := buf.element(0)
fmt.Println(x)
buf.write(1023, 2)
x = buf.element(1023)
fmt.Println(x)
buf.write(1024, 3)
x = buf.element(1024)
fmt.Println(x)
}

View File

@@ -0,0 +1,141 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
)
func BenchmarkChan(b *testing.B) {
var (
length = 1024 * 1024
goSize = GoSize
numPerGo = SchPerGo
counter = uint64(0)
wg sync.WaitGroup
)
ch := make(chan uint64, length)
// 消费端
go func() {
var ts time.Time
var count int32
for {
<-ch
atomic.AddInt32(&count, 1)
if count == 1 {
ts = time.Now()
}
//if x%10000000 == 0 {
// fmt.Printf("read %d\n", x)
//}
if count == int32(goSize*numPerGo) {
tl := time.Since(ts)
fmt.Printf("read time = %d ms\n", tl.Milliseconds())
}
}
}()
wg.Add(b.N)
b.ResetTimer()
for i := 0; i < b.N; i++ {
go func() {
for j := 0; j < numPerGo; j++ {
x := atomic.AddUint64(&counter, 1)
ch <- x
}
wg.Done()
}()
}
wg.Wait()
}
func TestChan1(t *testing.T) {
var (
t1_10us = uint64(0) // 1-10微秒
t10_100us = uint64(0) // 10-100微秒
t100_1000us = uint64(0) // 100-1000微秒
t1_10ms = uint64(0) // 1-10毫秒
t10_100ms = uint64(0) // 10-100毫秒
t100_ms = uint64(0) // 大于100毫秒
)
var (
length = 1024 * 1024
goSize = GoSize
numPerGo = SchPerGo
counter = uint64(0)
slower = uint64(0)
wg sync.WaitGroup
)
ch := make(chan uint64, length)
// 消费端
go func() {
var ts time.Time
var count int32
for {
x := <-ch
atomic.AddInt32(&count, 1)
if count == 1 {
ts = time.Now()
}
if x%1000000 == 0 {
fmt.Printf("read %d\n", x)
}
if count == int32(goSize*numPerGo) {
tl := time.Since(ts)
fmt.Printf("read time = %d ms\n", tl.Milliseconds())
}
}
}()
wg.Add(goSize)
totalS := time.Now()
for i := 0; i < goSize; i++ {
go func() {
for j := 0; j < numPerGo; j++ {
x := atomic.AddUint64(&counter, 1)
ts := time.Now()
ch <- x
tl := time.Since(ts)
ms := tl.Microseconds()
if ms > 1 {
atomic.AddUint64(&slower, 1)
if ms < 10 { // t1_10us
atomic.AddUint64(&t1_10us, 1)
} else if ms < 100 {
atomic.AddUint64(&t10_100us, 1)
} else if ms < 1000 {
atomic.AddUint64(&t100_1000us, 1)
} else if ms < 10000 {
atomic.AddUint64(&t1_10ms, 1)
} else if ms < 100000 {
atomic.AddUint64(&t10_100ms, 1)
} else {
atomic.AddUint64(&t100_ms, 1)
}
}
}
wg.Done()
}()
}
wg.Wait()
totalL := time.Since(totalS)
fmt.Printf("write total time = [%d ms]\n", totalL.Milliseconds())
time.Sleep(time.Second * 3)
fmt.Printf("slow ratio = %.2f \n", float64(slower)*100.0/float64(counter))
fmt.Printf("quick ratio = %.2f \n", float64(goSize*numPerGo-int(slower))*100.0/float64(goSize*numPerGo))
fmt.Printf("[<1us][%d] \n", counter-slower)
fmt.Printf("[1-10us][%d] \n", t1_10us)
fmt.Printf("[10-100us][%d] \n", t10_100us)
fmt.Printf("[100-1000us][%d] \n", t100_1000us)
fmt.Printf("[1-10ms][%d] \n", t1_10ms)
fmt.Printf("[10-100ms][%d] \n", t10_100ms)
fmt.Printf("[>100ms][%d] \n", t100_ms)
}

View File

@@ -0,0 +1,90 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import (
"fmt"
"runtime"
"sync/atomic"
)
// consumer 消费者这个消费者只会有一个g操作这样处理的好处是可以不涉及并发操作其内部不会涉及到任何锁
// 对于实际的并发操作由该g进行分配
type consumer[T any] struct {
status int32 // 运行状态
rbuf *ringBuffer[T]
seqer *sequencer
blocks blockStrategy
hdl EventHandler[T]
}
func newConsumer[T any](rbuf *ringBuffer[T], hdl EventHandler[T], sequer *sequencer, blocks blockStrategy) *consumer[T] {
return &consumer[T]{
rbuf: rbuf,
seqer: sequer,
hdl: hdl,
blocks: blocks,
status: READY,
}
}
func (c *consumer[T]) start() error {
if atomic.CompareAndSwapInt32(&c.status, READY, RUNNING) {
go c.handle()
return nil
}
return fmt.Errorf(StartErrorFormat, "Consumer")
}
func (c *consumer[T]) handle() {
// 判断是否可以获取到
rc := c.seqer.nextRead()
for {
if c.closed() {
return
}
var i = 0
for {
if c.closed() {
return
}
// 看下读取位置的seq是否OK
if v, p, exist := c.rbuf.contains(rc - 1); exist {
rc = c.seqer.readIncrement()
c.hdl.OnEvent(v)
i = 0
break
} else {
if i < spin {
procyield(30)
} else if i < spin+passiveSpin {
runtime.Gosched()
} else {
c.blocks.block(p, rc)
i = 0
}
i++
}
}
}
}
func (c *consumer[T]) close() error {
if atomic.CompareAndSwapInt32(&c.status, RUNNING, READY) {
// 防止阻塞无法释放
c.blocks.release()
return nil
}
return fmt.Errorf(CloseErrorFormat, "Consumer")
}
// closed 判断是否已关闭
// 将直接判断调整为原子操作解决data race问题
func (c *consumer[T]) closed() bool {
return atomic.LoadInt32(&c.status) == READY
}

View File

@@ -0,0 +1,40 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import "sync/atomic"
// cursor 游标一直持续增长的一个uint64序列
// 该序列用于wgWrite Goroutine获取对应写入到buffer中元素的位置操作
// 通过使用atomic操作避免锁提高性能
// 通过使用padding填充的方式填充前面和后面各使用7个uint64缓存行填充避免伪共享问题
type cursor struct {
p1, p2, p3, p4, p5, p6, p7 uint64
v uint64
p9, p10, p11, p12, p13, p14, p15 uint64
}
func newCursor() *cursor {
return &cursor{}
}
func (c *cursor) increment() uint64 {
return atomic.AddUint64(&c.v, 1)
}
func (c *cursor) atomicLoad() uint64 {
return atomic.LoadUint64(&c.v)
}
func (c *cursor) load() uint64 {
return c.v
}
func (c *cursor) store(expectVal, newVal uint64) bool {
return atomic.CompareAndSwapUint64(&c.v, expectVal, newVal)
}

View File

@@ -0,0 +1,113 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
)
func TestCursor(t *testing.T) {
c := newCursor()
ts := time.Now()
for i := 0; i < 100000000; i++ {
x := c.increment()
if x%1000000 == 0 {
fmt.Println(x)
}
}
tl := time.Since(ts)
fmt.Printf("time = %v\n", tl)
}
func TestCursor2(t *testing.T) {
c := newCursor()
var wg sync.WaitGroup
wg.Add(10000)
ts := time.Now()
for i := 0; i < 10000; i++ {
go func() {
for j := 0; j < 10000; j++ {
x := c.increment()
if x%10000000 == 0 {
fmt.Println(x)
}
}
wg.Done()
}()
}
wg.Wait()
fmt.Println(time.Since(ts))
}
func TestCursor3(t *testing.T) {
var c uint64
var wg sync.WaitGroup
wg.Add(10000)
ts := time.Now()
for i := 0; i < 10000; i++ {
go func() {
for j := 0; j < 10000; j++ {
x := atomic.AddUint64(&c, 1)
if x%10000000 == 0 {
fmt.Println(x)
}
}
wg.Done()
}()
}
wg.Wait()
fmt.Println(time.Since(ts))
}
type NoPad struct {
a uint64
b uint64
c uint64
}
func (np *NoPad) Increase() {
atomic.AddUint64(&np.a, 1)
atomic.AddUint64(&np.b, 1)
atomic.AddUint64(&np.c, 1)
}
type Pad struct {
a uint64
_p1 [8]uint64
b uint64
_p2 [8]uint64
c uint64
_p3 [8]uint64
}
func (p *Pad) Increase() {
atomic.AddUint64(&p.a, 1)
atomic.AddUint64(&p.b, 1)
atomic.AddUint64(&p.c, 1)
}
func BenchmarkPad_Increase(b *testing.B) {
pad := &Pad{}
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
pad.Increase()
}
})
}
func BenchmarkNoPad_Increase(b *testing.B) {
nopad := &NoPad{}
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
nopad.Increase()
}
})
}

View File

@@ -0,0 +1,130 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package main
import (
"fmt"
"github.com/bruceshao/lockfree"
"math/rand"
"sync"
"sync/atomic"
"time"
)
func main() {
fmt.Println("========== start write by discard ==========")
writeByDiscard()
fmt.Println("========== complete write by discard ==========")
fmt.Println("========== start write by cursor ==========")
writeByCursor()
fmt.Println("========== complete write by cursor ==========")
}
func writeByDiscard() {
var counter = uint64(0)
// 写入超时,如何使用
eh := &fixedSleepEventHandler[uint64]{
sm: time.Millisecond * 10,
}
disruptor := lockfree.NewLockfree[uint64](2, eh, lockfree.NewSleepBlockStrategy(time.Microsecond))
disruptor.Start()
producer := disruptor.Producer()
// 假设有100个写g
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for j := 0; j < 10; j++ {
v := atomic.AddUint64(&counter, 1)
ww := producer.WriteWindow()
if ww <= 0 {
// 表示无法写入,丢弃
fmt.Println("discard ", v)
continue
}
// 表示可以写入,写入即可
err := producer.Write(v)
if err != nil {
panic(err)
}
fmt.Println("write ", v)
}
wg.Done()
}()
}
wg.Wait()
time.Sleep(3 * time.Second)
disruptor.Close()
}
func writeByCursor() {
var counter = uint64(0)
// 写入超时,如何使用
eh := &randomSleepEventHandler[uint64]{}
disruptor := lockfree.NewLockfree[uint64](2, eh, lockfree.NewSleepBlockStrategy(time.Microsecond))
disruptor.Start()
producer := disruptor.Producer()
// 假设有100个写g
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for j := 0; j < 10; j++ {
v := atomic.AddUint64(&counter, 1)
wc, exist, err := producer.WriteTimeout(v, time.Millisecond)
if err != nil {
return
}
if !exist {
// 重复写入1次写入不成功则丢弃重新写其他的
if ok, _ := producer.WriteByCursor(v, wc); ok {
continue
}
fmt.Println("discard ", v)
// 重新生成值,一直等待写入
v = atomic.AddUint64(&counter, 1)
for {
if ok, _ := producer.WriteByCursor(v, wc); ok {
fmt.Println("write ", v, " with x times")
break
}
// 写入不成功则休眠防止CPU暴增
time.Sleep(100 * time.Microsecond)
}
} else {
fmt.Println("write ", v, " with 1 time")
}
}
wg.Done()
}()
}
wg.Wait()
time.Sleep(3 * time.Second)
disruptor.Close()
}
type fixedSleepEventHandler[T uint64] struct {
sm time.Duration
}
func (h *fixedSleepEventHandler[T]) OnEvent(v uint64) {
time.Sleep(h.sm)
fmt.Println("consumer ", v)
}
type randomSleepEventHandler[T uint64] struct {
count int32
}
func (h *randomSleepEventHandler[T]) OnEvent(v uint64) {
// 每次处理都会进行随机休眠,可以导致消费端变慢
intn := rand.Intn(1000)
time.Sleep(time.Duration(intn * 1000))
fmt.Println("consumer count ", atomic.AddInt32(&h.count, 1))
}

View File

@@ -0,0 +1,250 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package main
import (
"fmt"
"github.com/bruceshao/lockfree"
"os"
"runtime/pprof"
"sync"
"sync/atomic"
"time"
)
var (
goSize = 10000
sizePerGo = 10000
cap = 1024 * 1024
timeAnalyse = false
)
func main() {
f, _ := os.OpenFile("cpu.pprof", os.O_CREATE|os.O_RDWR, 0644)
defer f.Close()
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
// 启动命令: .exe [all/lockfree/chan] [time]
arg := "all"
if len(os.Args) > 1 {
arg = os.Args[1]
}
if len(os.Args) > 2 {
timeAnalyse = true
}
if arg == "all" {
fmt.Println("start lockfree and channel test")
// 表示全部启动
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
lockfreeMain()
}()
go func() {
defer wg.Done()
chanMain()
}()
wg.Wait()
} else if arg == "chan" {
fmt.Println("start channel test")
// 表示启动chan
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
chanMain()
}()
wg.Wait()
} else if arg == "lockfree" {
fmt.Println("start lockfree test")
// 表示启动lockfree
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
lockfreeMain()
}()
wg.Wait()
}
fmt.Println("all queue is over")
}
func lockfreeMain() {
var (
t1_10us = uint64(0) // 1-10微秒
t10_100us = uint64(0) // 10-100微秒
t100_1000us = uint64(0) // 100-1000微秒
t1_10ms = uint64(0) // 1-10毫秒
t10_100ms = uint64(0) // 10-100毫秒
t100_ms = uint64(0) // 大于100毫秒
slower = uint64(0)
)
// 创建事件处理器
eh := &longEventHandler[uint64]{}
// 创建消费端串行处理的Lockfree
lf := lockfree.NewLockfree[uint64](cap, eh, lockfree.NewSleepBlockStrategy(time.Millisecond))
// 启动Lockfree
if err := lf.Start(); err != nil {
panic(err)
}
// lockfree计时
ts := time.Now()
// 获取生产者对象
producer := lf.Producer()
var wg sync.WaitGroup
wg.Add(goSize)
for i := 0; i < goSize; i++ {
go func(start int) {
if timeAnalyse {
for j := 0; j < sizePerGo; j++ {
//写入数据
tb := time.Now()
err := producer.Write(uint64(start*sizePerGo + j + 1))
tl := time.Since(tb)
ms := tl.Microseconds()
if ms > 1 {
atomic.AddUint64(&slower, 1)
if ms < 10 { // t1_10us
atomic.AddUint64(&t1_10us, 1)
} else if ms < 100 {
atomic.AddUint64(&t10_100us, 1)
} else if ms < 1000 {
atomic.AddUint64(&t100_1000us, 1)
} else if ms < 10000 {
atomic.AddUint64(&t1_10ms, 1)
} else if ms < 100000 {
atomic.AddUint64(&t10_100ms, 1)
} else {
atomic.AddUint64(&t100_ms, 1)
}
}
if err != nil {
panic(err)
}
}
} else {
for j := 0; j < sizePerGo; j++ {
err := producer.Write(uint64(start*sizePerGo + j + 1))
if err != nil {
panic(err)
}
}
}
wg.Done()
}(i)
}
wg.Wait()
fmt.Println("===== lockfree[", time.Now().Sub(ts), "] =====")
fmt.Println("----- lockfree write complete -----")
time.Sleep(3 * time.Second)
// 关闭Lockfree
lf.Close()
if timeAnalyse {
fmt.Printf("lockfree slow ratio = %.2f \n", float64(slower)*100.0/float64(goSize*sizePerGo))
fmt.Printf("lockfree quick ratio = %.2f \n", float64(goSize*sizePerGo-int(slower))*100.0/float64(goSize*sizePerGo))
fmt.Printf("lockfree [<1us][%d] \n", uint64(goSize*sizePerGo)-slower)
fmt.Printf("lockfree [1-10us][%d] \n", t1_10us)
fmt.Printf("lockfree [10-100us][%d] \n", t10_100us)
fmt.Printf("lockfree [100-1000us][%d] \n", t100_1000us)
fmt.Printf("lockfree [1-10ms][%d] \n", t1_10ms)
fmt.Printf("lockfree [10-100ms][%d] \n", t10_100ms)
fmt.Printf("lockfree [>100ms][%d] \n", t100_ms)
}
}
func chanMain() {
var (
t1_10us = uint64(0) // 1-10微秒
t10_100us = uint64(0) // 10-100微秒
t100_1000us = uint64(0) // 100-1000微秒
t1_10ms = uint64(0) // 1-10毫秒
t10_100ms = uint64(0) // 10-100毫秒
t100_ms = uint64(0) // 大于100毫秒
slower = uint64(0)
)
c := make(chan uint64, cap)
// 启动监听协程
go func() {
for {
x, ok := <-c
if !ok {
return
}
if x%10000000 == 0 {
fmt.Println("chan [", x, "]")
}
}
}()
// lockfree计时
ts := time.Now()
// 开始写入
var wg sync.WaitGroup
wg.Add(goSize)
for i := 0; i < goSize; i++ {
go func(start int) {
if timeAnalyse {
for j := 0; j < sizePerGo; j++ {
//写入数据
tb := time.Now()
c <- uint64(start*sizePerGo + j + 1)
tl := time.Since(tb)
ms := tl.Microseconds()
if ms > 1 {
atomic.AddUint64(&slower, 1)
if ms < 10 { // t1_10us
atomic.AddUint64(&t1_10us, 1)
} else if ms < 100 {
atomic.AddUint64(&t10_100us, 1)
} else if ms < 1000 {
atomic.AddUint64(&t100_1000us, 1)
} else if ms < 10000 {
atomic.AddUint64(&t1_10ms, 1)
} else if ms < 100000 {
atomic.AddUint64(&t10_100ms, 1)
} else {
atomic.AddUint64(&t100_ms, 1)
}
}
}
} else {
for j := 0; j < sizePerGo; j++ {
//写入数据
c <- uint64(start*sizePerGo + j + 1)
}
}
wg.Done()
}(i)
}
wg.Wait()
fmt.Println("=====channel[", time.Now().Sub(ts), "]=====")
fmt.Println("----- channel write complete -----")
time.Sleep(3 * time.Second)
// 关闭chan
close(c)
if timeAnalyse {
fmt.Printf("channel slow ratio = %.2f \n", float64(slower)*100.0/float64(goSize*sizePerGo))
fmt.Printf("channel quick ratio = %.2f \n", float64(goSize*sizePerGo-int(slower))*100.0/float64(goSize*sizePerGo))
fmt.Printf("channel [<1us][%d] \n", uint64(goSize*sizePerGo)-slower)
fmt.Printf("channel [1-10us][%d] \n", t1_10us)
fmt.Printf("channel [10-100us][%d] \n", t10_100us)
fmt.Printf("channel [100-1000us][%d] \n", t100_1000us)
fmt.Printf("channel [1-10ms][%d] \n", t1_10ms)
fmt.Printf("channel [10-100ms][%d] \n", t10_100ms)
fmt.Printf("channel [>100ms][%d] \n", t100_ms)
}
}
type longEventHandler[T uint64] struct {
}
func (h *longEventHandler[T]) OnEvent(v uint64) {
if v%10000000 == 0 {
fmt.Println("lockfree [", v, "]")
}
}

View File

@@ -0,0 +1,99 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/bruceshao/lockfree"
)
var (
goSize = 10000
sizePerGo = 10000
total = goSize * sizePerGo
)
func main() {
// lockfree计时
now := time.Now()
// 创建事件处理器
handler := &eventHandler[uint64]{
signal: make(chan struct{}, 0),
now: now,
}
// 创建消费端串行处理的Lockfree
lf := lockfree.NewLockfree[uint64](
1024*1024,
handler,
lockfree.NewSleepBlockStrategy(time.Millisecond),
)
// 启动Lockfree
if err := lf.Start(); err != nil {
panic(err)
}
// 获取生产者对象
producer := lf.Producer()
// 并发写入
var wg sync.WaitGroup
wg.Add(goSize)
for i := 0; i < goSize; i++ {
go func(start int) {
for j := 0; j < sizePerGo; j++ {
err := producer.Write(uint64(start*sizePerGo + j + 1))
if err != nil {
panic(err)
}
}
wg.Done()
}(i)
}
// wait for producer
wg.Wait()
fmt.Printf("producer has been writed, write count: %v, time cost: %v \n", total, time.Since(now).String())
// wait for consumer
handler.wait()
// 关闭Lockfree
lf.Close()
}
type eventHandler[T uint64] struct {
signal chan struct{}
gcounter uint64
now time.Time
}
func (h *eventHandler[T]) OnEvent(v uint64) {
cur := atomic.AddUint64(&h.gcounter, 1)
if cur == uint64(total) {
fmt.Printf("eventHandler has been consumed already, read count: %v, time cose: %v\n", total, time.Since(h.now))
close(h.signal)
return
}
if cur%10000000 == 0 {
fmt.Printf("eventHandler consume %v\n", cur)
}
}
func (h *eventHandler[T]) wait() {
<-h.signal
}

View File

@@ -0,0 +1,10 @@
module github.com/bruceshao/lockfree
go 1.18
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.8.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@@ -0,0 +1,24 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/panjf2000/ants/v2 v2.7.1 h1:qBy5lfSdbxvrR0yUnZfaEDjf0FlCw4ufsbcsxmE7r+M=
github.com/panjf2000/ants/v2 v2.7.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk=
go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -0,0 +1,16 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
// EventHandler 事件处理器接口
// 整个无锁队列中唯一需要用户实现的接口,该接口描述消费端收到消息时该如何处理
// 使用泛型,通过编译阶段确定事件类型,提高性能
type EventHandler[T any] interface {
// OnEvent 用户侧实现,事件处理方法
OnEvent(t T)
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 970 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.0 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 529 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 272 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 354 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 781 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 574 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 542 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 776 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 338 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 773 KiB

View File

@@ -0,0 +1,85 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import (
"fmt"
"sync/atomic"
)
// Lockfree 包装类,内部包装了生产者和消费者
type Lockfree[T any] struct {
writer *Producer[T]
consumer *consumer[T]
status int32
}
// NewLockfree 自定义创建消费端的Disruptor
// capacitybuffer的容量大小类似于chan的大小但要求必须是2^n即2的指数倍如果不是的话会被修改
// handler消费端的事件处理器
// blocks读取阻塞时的处理策略
func NewLockfree[T any](capacity int, handler EventHandler[T], blocks blockStrategy) *Lockfree[T] {
// 重新计算正确的容量
capacity = minSuitableCap(capacity)
seqer := newSequencer(capacity)
rbuf := newRingBuffer[T](capacity)
cmer := newConsumer[T](rbuf, handler, seqer, blocks)
writer := newProducer[T](seqer, rbuf, blocks)
return &Lockfree[T]{
writer: writer,
consumer: cmer,
status: READY,
}
}
func (d *Lockfree[T]) Start() error {
if atomic.CompareAndSwapInt32(&d.status, READY, RUNNING) {
// 启动消费者
if err := d.consumer.start(); err != nil {
// 恢复现场
atomic.CompareAndSwapInt32(&d.status, RUNNING, READY)
return err
}
// 启动生产者
if err := d.writer.start(); err != nil {
// 恢复现场
atomic.CompareAndSwapInt32(&d.status, RUNNING, READY)
return err
}
return nil
}
return fmt.Errorf(StartErrorFormat, "Disruptor")
}
func (d *Lockfree[T]) Producer() *Producer[T] {
return d.writer
}
func (d *Lockfree[T]) Running() bool {
return d.status == RUNNING
}
func (d *Lockfree[T]) Close() error {
if atomic.CompareAndSwapInt32(&d.status, RUNNING, READY) {
// 关闭生产者
if err := d.writer.close(); err != nil {
// 恢复现场
atomic.CompareAndSwapInt32(&d.status, READY, RUNNING)
return err
}
// 关闭消费者
if err := d.consumer.close(); err != nil {
// 恢复现场
atomic.CompareAndSwapInt32(&d.status, READY, RUNNING)
return err
}
// 关闭成功
return nil
}
return fmt.Errorf(CloseErrorFormat, "Disruptor")
}

View File

@@ -0,0 +1,104 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import (
"sync"
"sync/atomic"
"testing"
"time"
)
func BenchmarkLockFree(b *testing.B) {
var (
counter = uint64(0)
)
eh := &longEventHandler[uint64]{}
disruptor := NewLockfree[uint64](1024*1024, eh, &SleepBlockStrategy{
t: time.Microsecond,
})
disruptor.Start()
producer := disruptor.Producer()
var wg sync.WaitGroup
wg.Add(b.N)
b.ResetTimer()
for i := 0; i < b.N; i++ {
go func() {
for j := 0; j < SchPerGo; j++ {
x := atomic.AddUint64(&counter, 1)
err := producer.Write(x)
if err != nil {
panic(err)
}
}
wg.Done()
}()
}
wg.Wait()
b.StopTimer()
time.Sleep(time.Second * 1)
disruptor.Close()
}
func TestChanBlockStrategy(t *testing.T) {
var (
counter = uint64(0)
goS = 1000
perGo = 100
)
eh := &longEventHandler[uint64]{}
disruptor := NewLockfree[uint64](2, eh, NewChanBlockStrategy())
disruptor.Start()
producer := disruptor.Producer()
var wg sync.WaitGroup
wg.Add(goS)
for i := 0; i < goS; i++ {
go func() {
for j := 0; j < perGo; j++ {
x := atomic.AddUint64(&counter, 1)
err := producer.Write(x)
if err != nil {
panic(err)
}
}
wg.Done()
}()
}
wg.Wait()
time.Sleep(time.Second * 1)
disruptor.Close()
}
func TestCondBlockStrategy(t *testing.T) {
var (
counter = uint64(0)
goS = 1000
perGo = 100
)
eh := &longEventHandler[uint64]{}
disruptor := NewLockfree[uint64](2, eh, NewConditionBlockStrategy())
disruptor.Start()
producer := disruptor.Producer()
var wg sync.WaitGroup
wg.Add(goS)
for i := 0; i < goS; i++ {
go func() {
for j := 0; j < perGo; j++ {
x := atomic.AddUint64(&counter, 1)
err := producer.Write(x)
if err != nil {
panic(err)
}
}
wg.Done()
}()
}
wg.Wait()
time.Sleep(time.Second * 1)
disruptor.Close()
}

View File

@@ -0,0 +1,338 @@
package lockfree
import (
"runtime"
"sync"
"testing"
)
type foo struct {
x, y, z int64
}
type foo64Start struct {
_ [64]byte
x, y, z int64
}
type foo64StartEnd struct {
_ [64]byte
x, y, z int64
_ [64]byte
}
type foo128Start struct {
_ [128]byte
x, y, z int64
}
type foo128StartEnd struct {
_ [128]byte
x, y, z int64
_ [128]byte
}
type foo64StartEndAligned struct {
_ [64]byte
x, y, z int64
_ [64 - 24]byte
}
type foo128StartEndAligned struct {
_ [128]byte
x, y, z int64
_ [128 - 24]byte
}
const iter = (1 << 16)
func BenchmarkFalseSharing(b *testing.B) {
var wg sync.WaitGroup
b.Run("NoPad", func(b *testing.B) {
arr := make([]foo, runtime.GOMAXPROCS(0))
arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0))
for i := range arrChan {
arrChan[i] = make(chan struct{})
}
for i := range arr {
go func(i int) {
for range arrChan[i] {
for j := 0; j < iter; j++ {
arr[i].x++
}
wg.Done()
}
}(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(runtime.GOMAXPROCS(0))
for j := range arrChan {
arrChan[j] <- struct{}{}
}
wg.Wait()
}
b.StopTimer()
for i := range arrChan {
close(arrChan[i])
}
})
b.Run("Pad64Start", func(b *testing.B) {
arr := make([]foo64Start, runtime.GOMAXPROCS(0))
arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0))
for i := range arrChan {
arrChan[i] = make(chan struct{})
}
for i := range arr {
go func(i int) {
for range arrChan[i] {
for j := 0; j < iter; j++ {
arr[i].x++
}
wg.Done()
}
}(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(runtime.GOMAXPROCS(0))
for j := range arrChan {
arrChan[j] <- struct{}{}
}
wg.Wait()
}
b.StopTimer()
for i := range arrChan {
close(arrChan[i])
}
})
b.Run("Pad64StartEnd", func(b *testing.B) {
arr := make([]foo64StartEnd, runtime.GOMAXPROCS(0))
arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0))
for i := range arrChan {
arrChan[i] = make(chan struct{})
}
for i := range arr {
go func(i int) {
for range arrChan[i] {
for j := 0; j < iter; j++ {
arr[i].x++
}
wg.Done()
}
}(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(runtime.GOMAXPROCS(0))
for j := range arrChan {
arrChan[j] <- struct{}{}
}
wg.Wait()
}
b.StopTimer()
for i := range arrChan {
close(arrChan[i])
}
})
b.Run("Pad128Start", func(b *testing.B) {
arr := make([]foo128Start, runtime.GOMAXPROCS(0))
arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0))
for i := range arrChan {
arrChan[i] = make(chan struct{})
}
for i := range arr {
go func(i int) {
for range arrChan[i] {
for j := 0; j < iter; j++ {
arr[i].x++
}
wg.Done()
}
}(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(runtime.GOMAXPROCS(0))
for j := range arrChan {
arrChan[j] <- struct{}{}
}
wg.Wait()
}
b.StopTimer()
for i := range arrChan {
close(arrChan[i])
}
})
b.Run("Pad128StartEnd", func(b *testing.B) {
arr := make([]foo128StartEnd, runtime.GOMAXPROCS(0))
arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0))
for i := range arrChan {
arrChan[i] = make(chan struct{})
}
for i := range arr {
go func(i int) {
for range arrChan[i] {
for j := 0; j < iter; j++ {
arr[i].x++
}
wg.Done()
}
}(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(runtime.GOMAXPROCS(0))
for j := range arrChan {
arrChan[j] <- struct{}{}
}
wg.Wait()
}
b.StopTimer()
for i := range arrChan {
close(arrChan[i])
}
})
b.Run("Pad64StartEndAligned", func(b *testing.B) {
arr := make([]foo64StartEndAligned, runtime.GOMAXPROCS(0))
arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0))
for i := range arrChan {
arrChan[i] = make(chan struct{})
}
for i := range arr {
go func(i int) {
for range arrChan[i] {
for j := 0; j < iter; j++ {
arr[i].x++
}
wg.Done()
}
}(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(runtime.GOMAXPROCS(0))
for j := range arrChan {
arrChan[j] <- struct{}{}
}
wg.Wait()
}
b.StopTimer()
for i := range arrChan {
close(arrChan[i])
}
})
b.Run("Pad128StartEndAligned", func(b *testing.B) {
arr := make([]foo128StartEndAligned, runtime.GOMAXPROCS(0))
arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0))
for i := range arrChan {
arrChan[i] = make(chan struct{})
}
for i := range arr {
go func(i int) {
for range arrChan[i] {
for j := 0; j < iter; j++ {
arr[i].x++
}
wg.Done()
}
}(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(runtime.GOMAXPROCS(0))
for j := range arrChan {
arrChan[j] <- struct{}{}
}
wg.Wait()
}
b.StopTimer()
for i := range arrChan {
close(arrChan[i])
}
})
}
func BenchmarkTrueSharing(b *testing.B) {
var wg sync.WaitGroup
b.Run("<64", func(b *testing.B) {
arr := make([]foo, runtime.GOMAXPROCS(0)*iter)
arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0))
for i := range arrChan {
arrChan[i] = make(chan struct{})
}
for i := range arrChan {
go func(i int) {
for range arrChan[i] {
for j := 0; j < iter; j++ {
arr[(i*iter)+j].x++
}
wg.Done()
}
}(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(runtime.GOMAXPROCS(0))
for j := range arrChan {
arrChan[j] <- struct{}{}
}
wg.Wait()
}
b.StopTimer()
for i := range arrChan {
close(arrChan[i])
}
})
b.Run(">64", func(b *testing.B) {
arr := make([]foo64Start, runtime.GOMAXPROCS(0)*iter)
arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0))
for i := range arrChan {
arrChan[i] = make(chan struct{})
}
for i := range arrChan {
go func(i int) {
for range arrChan[i] {
for j := 0; j < iter; j++ {
arr[(i*iter)+j].x++
}
wg.Done()
}
}(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(runtime.GOMAXPROCS(0))
for j := range arrChan {
arrChan[j] <- struct{}{}
}
wg.Wait()
}
b.StopTimer()
for i := range arrChan {
close(arrChan[i])
}
})
b.Run(">128", func(b *testing.B) {
arr := make([]foo128Start, runtime.GOMAXPROCS(0)*iter)
arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0))
for i := range arrChan {
arrChan[i] = make(chan struct{})
}
for i := range arrChan {
go func(i int) {
for range arrChan[i] {
for j := 0; j < iter; j++ {
arr[(i*iter)+j].x++
}
wg.Done()
}
}(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(runtime.GOMAXPROCS(0))
for j := range arrChan {
arrChan[j] <- struct{}{}
}
wg.Wait()
}
b.StopTimer()
for i := range arrChan {
close(arrChan[i])
}
})
}

View File

@@ -0,0 +1,159 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import (
"fmt"
"runtime"
"sync/atomic"
"time"
)
// Producer 生产者
// 核心方法是Write通过调用Write方法可以将对象写入到队列中
type Producer[T any] struct {
seqer *sequencer
rbuf *ringBuffer[T]
blocks blockStrategy
capacity uint64
status int32
}
func newProducer[T any](seqer *sequencer, rbuf *ringBuffer[T], blocks blockStrategy) *Producer[T] {
return &Producer[T]{
seqer: seqer,
rbuf: rbuf,
blocks: blocks,
capacity: rbuf.cap(),
status: READY,
}
}
func (q *Producer[T]) start() error {
if atomic.CompareAndSwapInt32(&q.status, READY, RUNNING) {
return nil
}
return fmt.Errorf(StartErrorFormat, "Producer")
}
// Write 对象写入核心逻辑
// 首先会从序号产生器中获取一个序号该序号由atomic自增不会重复
// 然后通过&运算获取该序号应该放入的位置pos
// 通过循环的方式判断对应pos位置是否可以写入内容这个判断是通过available数组判断的
// 如果无法写入则持续循环等待,直到可以写入为止,此处基于一种思想即写入的实时性,写入操作不需要等太久,因此此处是没有阻塞的,
// 仅仅通过调度让出的方式进行一部分cpu让渡防止持续占用cpu资源
// 获取到写入资格后将内容写入到ringbuffer同时更新available数组并且调用release以便于释放消费端的阻塞等待
func (q *Producer[T]) Write(v T) error {
if q.closed() {
return ClosedError
}
next := q.seqer.wc.increment()
for {
// 判断是否可以写入
r := atomic.LoadUint64(&q.seqer.rc) - 1
if next <= r+q.capacity {
// 可以写入数据,将数据写入到指定位置
q.rbuf.write(next-1, v)
// 释放,防止消费端阻塞
q.blocks.release()
return nil
}
runtime.Gosched()
// 再次判断是否已关闭
if q.closed() {
return ClosedError
}
}
}
// WriteWindow 写入窗口
// 描述当前可写入的状态,如果不能写入则返回零值,如果可以写入则返回写入窗口大小
// 由于执行时不加锁,所以该结果是不可靠的,仅用于在并发环境很高的情况下,进行丢弃行为
func (q *Producer[T]) WriteWindow() int {
next := q.seqer.wc.atomicLoad() + 1
r := atomic.LoadUint64(&q.seqer.rc)
if next < r+q.capacity {
return int(r + q.capacity - next)
}
return -int(next - (r + q.capacity))
}
// WriteTimeout 在写入的基础上设定一个时间如果时间到了仍然没有写入则会放弃本次写入返回写入的位置和false
// 使用方需要调用 WriteByCursor 来继续写入该位置,因为位置已经被占用,是必须要写的,不能跳跃性写入
// 在指定时间内写入成功会返回true
// 三个返回项写入位置、是否写入成功及是否有error
func (q *Producer[T]) WriteTimeout(v T, timeout time.Duration) (uint64, bool, error) {
if q.closed() {
return 0, false, ClosedError
}
next := q.seqer.wc.increment()
// 先尝试写数据 (failfast)
ok := q.writeByCursor(v, next)
if ok {
return next, true, nil
}
// 创建定时器
waiter := time.NewTimer(timeout)
defer waiter.Stop()
for {
select {
case <-waiter.C:
// 超时触发,执行到此处表示未写入,返回对应结果即可
return next, false, nil
default:
ok = q.writeByCursor(v, next)
if ok {
return next, true, nil
}
runtime.Gosched()
}
if q.closed() {
return 0, false, ClosedError
}
}
}
// WriteByCursor 根据游标写入内容wc是调用 WriteTimeout 方法返回false时对应的写入位置
// 该位置有严格的含义,不要随意填值,否则会造成整个队列异常
// 函数返回值是否写入成功和是否存在error若返回false表示写入失败可以继续调用重复写入
func (q *Producer[T]) WriteByCursor(v T, wc uint64) (bool, error) {
if q.closed() {
return false, ClosedError
}
return q.writeByCursor(v, wc), nil
}
func (q *Producer[T]) writeByCursor(v T, wc uint64) bool {
// 判断是否可以写入
r := atomic.LoadUint64(&q.seqer.rc) - 1
if wc <= r+q.capacity {
// 可以写入数据,将数据写入到指定位置
q.rbuf.write(wc-1, v)
// 释放,防止消费端阻塞
q.blocks.release()
// 返回写入成功标识
return true
}
return false
}
func (q *Producer[T]) close() error {
if atomic.CompareAndSwapInt32(&q.status, RUNNING, READY) {
return nil
}
return fmt.Errorf(CloseErrorFormat, "Producer")
}
func (q *Producer[T]) closed() bool {
return atomic.LoadInt32(&q.status) == READY
}

View File

@@ -0,0 +1,223 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import (
"fmt"
"math/rand"
"os"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
"unsafe"
)
const (
GoSize = 5000
SchPerGo = 10000
)
type longEventHandler[T uint64] struct {
count int32
ts time.Time
}
func (h *longEventHandler[T]) OnEvent(v uint64) {
atomic.AddInt32(&h.count, 1)
if h.count == 1 {
h.ts = time.Now()
}
fmt.Println("consumer ", v)
//if v%1000000 == 0 {
// fmt.Printf("read %d\n", v)
//}
if h.count == GoSize*SchPerGo {
tl := time.Since(h.ts)
fmt.Printf("read time = %d ms\n", tl.Milliseconds())
}
}
func TestAA(t *testing.T) {
var (
t1_10us = uint64(0) // 1-10微秒
t10_100us = uint64(0) // 10-100微秒
t100_1000us = uint64(0) // 100-1000微秒
t1_10ms = uint64(0) // 1-10毫秒
t10_100ms = uint64(0) // 10-100毫秒
t100_ms = uint64(0) // 大于100毫秒
slower = uint64(0)
counter = uint64(0)
)
eh := &longEventHandler[uint64]{}
//queue, err := NewProducer[uint64](1024*1024, 1, eh, &SleepWaitStrategy{
// t: time.Nanosecond * 1,
//})
disruptor := NewLockfree[uint64](1024*1024*128, eh,
NewSleepBlockStrategy(time.Microsecond))
disruptor.Start()
producer := disruptor.Producer()
var wg sync.WaitGroup
wg.Add(GoSize)
totalS := time.Now()
for i := 0; i < GoSize; i++ {
go func() {
for j := 0; j < SchPerGo; j++ {
x := atomic.AddUint64(&counter, 1)
ts := time.Now()
err := producer.Write(x)
if err != nil {
panic(err)
}
tl := time.Since(ts)
ms := tl.Microseconds()
if ms > 1 {
atomic.AddUint64(&slower, 1)
if ms < 10 { // t1_10us
atomic.AddUint64(&t1_10us, 1)
} else if ms < 100 {
atomic.AddUint64(&t10_100us, 1)
} else if ms < 1000 {
atomic.AddUint64(&t100_1000us, 1)
} else if ms < 10000 {
atomic.AddUint64(&t1_10ms, 1)
} else if ms < 100000 {
atomic.AddUint64(&t10_100ms, 1)
} else {
atomic.AddUint64(&t100_ms, 1)
}
}
}
wg.Done()
}()
}
wg.Wait()
totalL := time.Since(totalS)
fmt.Printf("write total time = [%d ms]\n", totalL.Milliseconds())
fmt.Println("----- write complete -----")
time.Sleep(time.Second * 3)
disruptor.Close()
fmt.Printf("slow ratio = %.2f \n", float64(slower)*100.0/float64(counter))
fmt.Printf("quick ratio = %.2f \n", float64(counter-slower)*100.0/float64(counter))
fmt.Printf("[<1us][%d] \n", counter-slower)
fmt.Printf("[1-10us][%d] \n", t1_10us)
fmt.Printf("[10-100us][%d] \n", t10_100us)
fmt.Printf("[100-1000us][%d] \n", t100_1000us)
fmt.Printf("[1-10ms][%d] \n", t1_10ms)
fmt.Printf("[10-100ms][%d] \n", t10_100ms)
fmt.Printf("[>100ms][%d] \n", t100_ms)
}
func TestB(t *testing.T) {
var x = uint64(100)
typeOf := reflect.TypeOf(x)
fmt.Println(typeOf)
fmt.Println(os.Getpagesize())
}
func TestC(t *testing.T) {
x := 128 - unsafe.Sizeof(uint64(0))%128
fmt.Println(x)
}
func TestProducer_WriteWindow(t *testing.T) {
eh := &sleepEventHandler[uint64]{
sm: time.Second,
}
disruptor := NewLockfree[uint64](1, eh,
NewSleepBlockStrategy(time.Microsecond))
disruptor.Start()
producer := disruptor.Producer()
// 写入10个数0-9
// 预期结果0可以写入写入后在1ms内被取走此时0会在等待1s后被打印但由于ringbuffer有空位所以1可以被写入
// 1写入后一直无法被取走因为在等待1s内0的打印后续其他值均无法被写入因为1导致ringbuffer满了
for i := 0; i < 10; i++ {
ww := producer.WriteWindow()
if ww <= 0 {
// 表示不能写入,丢弃
fmt.Println("discard ", i , " window ", ww)
} else {
// 实际写入
producer.Write(uint64(i))
}
// 为了给予consumer时间取走
time.Sleep(time.Millisecond)
}
// 为了可以看到打印的结果
time.Sleep(3 * time.Second)
disruptor.Close()
}
func TestWriteTimeout(t *testing.T) {
var counter = uint64(0)
// 写入超时,如何使用
eh := &longSleepEventHandler[uint64]{}
disruptor := NewLockfree[uint64](2, eh, NewSleepBlockStrategy(time.Microsecond))
disruptor.Start()
producer := disruptor.Producer()
// 假设有10个写g
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
for j := 0; j < 100; j++ {
v := atomic.AddUint64(&counter, 1)
wc, exist, err := producer.WriteTimeout(v, time.Millisecond)
if err != nil {
return
}
if !exist {
// 重复写入1次写入不成功则丢弃重新写其他的
if ok, _ := producer.WriteByCursor(v, wc); ok {
continue
}
fmt.Println("discard ", v)
// 重新生成值,一直等待写入
v = atomic.AddUint64(&counter, 1)
for {
if ok, _ := producer.WriteByCursor(v, wc); ok {
fmt.Println("write ", v, " with x times")
break
}
// 写入不成功则休眠防止CPU暴增
time.Sleep(100 * time.Microsecond)
}
} else {
fmt.Println("write ", v, " with 1 time")
}
}
wg.Done()
}()
}
wg.Wait()
time.Sleep(3 * time.Second)
disruptor.Close()
}
// sleepEventHandler 休眠性质的事件处理器
type sleepEventHandler[T uint64] struct {
sm time.Duration
}
func (h *sleepEventHandler[T]) OnEvent(v uint64) {
time.Sleep(h.sm)
fmt.Println("consumer ", v)
}
type longSleepEventHandler[T uint64] struct {
count int32
}
func (h *longSleepEventHandler[T]) OnEvent(v uint64) {
// 每次处理都会进行随机休眠,可以导致消费端变慢
intn := rand.Intn(1000)
time.Sleep(time.Duration(intn * 1000))
fmt.Println("consumer count ", atomic.AddInt32(&h.count, 1))
}

View File

@@ -0,0 +1,38 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import (
"sync/atomic"
)
// sequencer 序号产生器维护读和写两个状态写状态具体由内部游标cursor维护。
// 读取状态由自身维护变量read即可
type sequencer struct {
rc uint64 // 读取游标因为该值仅会被一个g修改所以不需要使用cursor
capacity uint64
wc *cursor
}
func newSequencer(capacity int) *sequencer {
return &sequencer{
wc: newCursor(),
rc: 1,
capacity: uint64(capacity),
}
}
// nextRead 获取下个要读取的位置
// 使用原子操作解决data race问题
func (s *sequencer) nextRead() uint64 {
return atomic.LoadUint64(&s.rc)
}
func (s *sequencer) readIncrement() uint64 {
return atomic.AddUint64(&s.rc, 1)
}

View File

@@ -0,0 +1,81 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import (
"errors"
"reflect"
"runtime"
"unsafe"
)
const (
activeSpin = 4
passiveSpin = 2
READY = 0 // 模块的状态之就绪态
RUNNING = 1 // 模块的状态之运行态
StartErrorFormat = "start model [%s] error"
CloseErrorFormat = "close model [%s] error"
)
var (
ncpu = runtime.NumCPU()
spin = 0
ClosedError = errors.New("the queue has been closed")
)
func init() {
if ncpu > 1 {
spin = activeSpin
}
}
//go:linkname procyield runtime.procyield
func procyield(cycles uint32)
// //go:linkname osyield runtime.osyield
// func osyield()
func osyield() {
runtime.Gosched() // 主动放弃CPU让渡给其他goroutine兼容所有Go版本
}
// byteArrayPointerWithUint8 创建uint8切片返回其对应实际内容Data的指针
func byteArrayPointerWithUint8(capacity int) unsafe.Pointer {
bytes := make([]uint8, capacity)
rs := (*reflect.SliceHeader)(unsafe.Pointer(&bytes))
return unsafe.Pointer(rs.Data)
}
// byteArrayPointer 创建uint32切片返回其对应实际内容Data的指针
func byteArrayPointerWithUint32(capacity int) unsafe.Pointer {
bytes := make([]uint32, capacity)
rs := (*reflect.SliceHeader)(unsafe.Pointer(&bytes))
return unsafe.Pointer(rs.Data)
}
// byteArrayPointer 创建int64切片返回其对应实际内容Data的指针
func byteArrayPointerWithInt64(capacity int) unsafe.Pointer {
bytes := make([]int64, capacity)
rs := (*reflect.SliceHeader)(unsafe.Pointer(&bytes))
return unsafe.Pointer(rs.Data)
}
// minSuitableCap 最小的合适的数量
func minSuitableCap(v int) int {
if v <= 0 {
return 2
}
v--
v |= v >> 1
v |= v >> 2
v |= v >> 4
v |= v >> 8
v |= v >> 16
v++
return v
}

View File

@@ -0,0 +1,27 @@
/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestMinSuitableCap(t *testing.T) {
x := minSuitableCap(-1)
assert.Equal(t, 2, x)
x = minSuitableCap(3)
assert.Equal(t, 4, x)
x = minSuitableCap(10)
assert.Equal(t, 16, x)
x = minSuitableCap(1023)
assert.Equal(t, 1024, x)
x = minSuitableCap(16)
assert.Equal(t, 16, x)
}

View File

@@ -14,6 +14,7 @@ use (
./common/utils/go-sensitive-word-1.3.3
./common/utils/goja
./common/utils/limit
./common/utils/lockfree-1.1.3
./common/utils/log
./common/utils/qqwry
./common/utils/sturc