diff --git a/common/utils/lockfree-1.1.3/LICENSE b/common/utils/lockfree-1.1.3/LICENSE new file mode 100644 index 00000000..261eeb9e --- /dev/null +++ b/common/utils/lockfree-1.1.3/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/common/utils/lockfree-1.1.3/README.md b/common/utils/lockfree-1.1.3/README.md new file mode 100644 index 00000000..b127ea16 --- /dev/null +++ b/common/utils/lockfree-1.1.3/README.md @@ -0,0 +1,370 @@ +## Lockfree + +> 如果想使用低于go1.18版本则可以引入tag:1.0.9或branch:below-version1.18 + +> 通过条件编译支持386平台,但性能测试发现比较差,因此不建议使用 + +### 1. 简介 +#### 1.1. 为什么要写Lockfree +在go语言中一般都是使用chan作为消息传递的队列,但在实际高并发的环境下使用发现chan存在严重的性能问题,其直接表现就是将对象放入到chan中时会特别耗时, +即使chan的容量尚未打满,在严重时甚至会产生几百ms还无法放入到chan中的情况。 + +![放大.jpg](images%2F%E6%94%BE%E5%A4%A7.jpg) + +#### 1.2. chan基本原理 + +##### 1.2.1. chan基本结构 + +chan关键字在编译阶段会被编译为runtime.hchan结构,它的结构如下所示: + +![chan结构.jpg](images%2Fchan%E7%BB%93%E6%9E%84.jpg) + +其中sudog是一个比较常见的结构,是对goroutine的一个封装,它的主要结构: + +![sudog.jpg](images%2Fsudog.jpg) + + +##### 1.2.2. chan写入流程 + +![write.jpg](images%2Fwrite.jpg) + +##### 1.2.3. chan读取流程 + +![read.jpg](images%2Fread.jpg) + +#### 1.3. 锁 + +##### 1.3.1. runtime.mutex +chan结构中包含了一个lock字段:`lock mutex`。 +这个lock看名字就知道是一个锁,当然它不是我们业务中经常使用的sync.Mutex,而是一个`runtime.mutex`。 +这个锁是一个互斥锁,在linux系统中它的实现是futex,在没有竞争的情况下,会退化成为一个自旋操作,速度非常快,但是当竞争比较大时,它就会在内核中休眠。 + +futex的基本原理如下图: +![futex.jpg](images%2Ffutex.jpg) + +当竞争非常大时,对于chan而言其整体大部分时间是出于系统调用上,所以性能下降非常明显。 + +##### 1.3.2. sync.Mutex + +而`sync.Mutex`包中的设计原理如下图: + +![锁.jpg](images%2F%E9%94%81.jpg) + + +### 2. Lockfree基本原理 + +#### 2.1. 模块及流程 + +> 在最新的设计中已经删除了available模块,转而使用ringBuffer中的对象(e)中的c(游标)标识写入状态。 + +![lockfree.jpg](images%2Flockfree.jpg) + + + +#### 2.2. 优化点 + +##### 1) 无锁实现 + +内部所有操作都是通过原子变量(atomic)来操作,唯一有可能使用锁的地方,是提供给用户在RingBuffer为空时的等待策略,用户可选择使用chan阻塞 + +##### 2) 单一消费协程 + +屏蔽掉消费端读操作竞争带来的性能损耗 + +##### 3) 写不等待原则 + +符合写入快的初衷,当无法写入时会持续通过自旋和任务调度的方式处理,一方面尽量加快写入效率,另一方面则是防止占用太多CPU资源 + +##### 4) 泛型加速 + +引入泛型,泛型与interface有很明显的区别,泛型是在编译阶段确定类型,这样可有效降低在运行时进行类型转换的耗时 + +##### 5) 一次性内存分配 + +环状结构Ringbuffer实现对象的传递,通过确定大小的切片实现,只需要分配一次内存,不会涉及扩容等操作,可有效提高处理的性能 + +##### 6) 运算加速 + +RingBuffer的容量为2的n次方,通过与运算来代替取余运算,提高性能 + +##### 7) 并行位图 + +用原子位运算实现位图并行操作,在尽量不影响性能的条件下,降低内存消耗 + +并行位图的思路实现历程: + +![bitmap.jpg](images%2Fbitmap.jpg) + +##### 8) 缓存行填充 + +根据CPU高速缓存的特点,通过填充缓存行方式屏蔽掉伪共享问题。 + +缓存行填充应该是一个比较常见的问题,它的本质是因为CPU比较快,而内存比较慢,所以增加了高速缓存来处理: + +![padding1.jpg](images%2Fpadding1.jpg) + +在两个Core共享同一个L3的情况下,如果同时进行修改,就会出现竞争关系(会涉及到缓存一致性协议:MESI): + +![padding2.jpg](images%2Fpadding2.jpg) + +在Lockfree中有两个地方用到了填充: + +![padding3.jpg](images%2Fpadding3.jpg) + +最新版本中只保留了cursor中的填充,在e中使用了游标。 + +##### 9) Pointer替代切片 + +屏蔽掉切片操作必须要进行越界判断的逻辑,生成更高效机器码。 + +![pointer.jpg](images%2Fpointer.jpg) + + +#### 2.3. 核心模块 + +##### ringBuffer +具体对象的存放区域,通过数组(定长切片)实现环状数据结构,其中的数据对象是具体的结构体而非指针,这样可以一次性进行内存申请。 + +##### stateDescriptor + +> 注:最新的版本已将该对象删除,通过ringBuffer中e中的游标来描述状态。这样更充分利用了内存,降低了消耗。 + +状态描述符,定义了对应位置的数据状态,是可读还是可写。提供了三种方式: + + + 1) 基于Uint32的切片:每个Uint32值描述一个位置,性能最高,但内存消耗最大; + + + 2) 基于Bitmap:每个bit描述一个位置,性能最低,但内存消耗最小; + + + 3) 基于Uint8的切片:每个Uint8值描述一个位置,性能适中,消耗也适中,最推荐的方式。 + +##### sequencer +序号产生器,维护读和写两个状态,写状态具体由内部游标(cursor)维护,读取状态由自身维护,一个uint64变量维护。它的核心方法是next(),用于获取下个可以写入的游标。 + +##### Producer +生产者,核心方法是Write,通过调用Write方法可以将对象写入到队列中。支持多个g并发操作,保证加入时处理的效率。 + +##### consumer +消费者,这个消费者只会有一个g操作,这样处理的好处是可以不涉及并发操作,其内部不会涉及到任何锁,对于实际的并发操作由该g进行分配。 + +##### blockStrategy +阻塞策略,该策略用于buf中长时间没有数据时,消费者阻塞设计。它有两个方法:block()和release()。前者用于消费者阻塞,后者用于释放。 +系统提供了多种方式,不同的方式CPU资源占用和性能会有差别: + ++ 1) SchedBlockStrategy:调用runtime.Gosched()进行调度block,不需要release,为推荐方式; + ++ 2) SleepBlockStrategy:调用time.Sleep(x)进行block,可自定义休眠时间,不需要release,为推荐方式; + ++ 3) ProcYieldBlockStrategy:调用CPU空跑指令,可自定义空跑的指令数量,不需要release; + ++ 4) OSYieldBlockStrategy:操作系统会将对应M调度出去,等时间片重新分配后可执行,不需要release; + ++ 5) ChanBlockStrategy:chan阻塞策略,需要release,为推荐方式; + ++ 6) CanditionBlockStrategy:candition阻塞策略,需要release,为推荐方式; + +其中1/2/5/6为推荐方式,如果性能要求比较高,则优先考虑2和1,否则建议试用5和6。 + +##### EventHandler +事件处理器接口,整个项目中唯一需要用户实现的接口,该接口描述消费端收到消息时该如何处理,它使用泛型,通过编译阶段确定事件类型,提高性能。 + +### 3. 使用方式 + +#### 3.1. 导入模块 +可使用 `go get github.com/bruceshao/lockfree` 获取最新版本 + +#### 3.2. 代码调用 +为了提升性能,Lockfree支持go版本1.18及以上,以便于支持泛型,Lockfree使用非常简单: + +```go +package main + +import ( + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/bruceshao/lockfree" +) + +var ( + goSize = 10000 + sizePerGo = 10000 + + total = goSize * sizePerGo +) + +func main() { + // lockfree计时 + now := time.Now() + + // 创建事件处理器 + handler := &eventHandler[uint64]{ + signal: make(chan struct{}, 0), + now: now, + } + + // 创建消费端串行处理的Lockfree + lf := lockfree.NewLockfree[uint64]( + 1024*1024, + handler, + lockfree.NewSleepBlockStrategy(time.Millisecond), + ) + + // 启动Lockfree + if err := lf.Start(); err != nil { + panic(err) + } + + // 获取生产者对象 + producer := lf.Producer() + + // 并发写入 + var wg sync.WaitGroup + wg.Add(goSize) + for i := 0; i < goSize; i++ { + go func(start int) { + for j := 0; j < sizePerGo; j++ { + err := producer.Write(uint64(start*sizePerGo + j + 1)) + if err != nil { + panic(err) + } + } + wg.Done() + }(i) + } + + // wait for producer + wg.Wait() + + fmt.Printf("producer has been writed, write count: %v, time cost: %v \n", total, time.Since(now).String()) + + // wait for consumer + handler.wait() + + // 关闭Lockfree + lf.Close() +} + +type eventHandler[T uint64] struct { + signal chan struct{} + gcounter uint64 + now time.Time +} + +func (h *eventHandler[T]) OnEvent(v uint64) { + cur := atomic.AddUint64(&h.gcounter, 1) + if cur == uint64(total) { + fmt.Printf("eventHandler has been consumed already, read count: %v, time cose: %v\n", total, time.Since(h.now)) + close(h.signal) + return + } + + if cur%10000000 == 0 { + fmt.Printf("eventHandler consume %v\n", cur) + } +} + +func (h *eventHandler[T]) wait() { + <-h.signal +} +``` + +### 4. 性能对比 + +#### 4.1. 简述 + +在实际测试中发现,如果lockfree和chan同时跑的话会有一些影响,lockfree的表现基本是正常的,和chan同时跑的时候性能基本是下降的。 +但chan比较奇怪,和lockfree一起跑的时候性能比chan自身跑性能还高。目前正在排查此问题,但不影响使用。 + +main包下提供了测试的程序,可自行进行性能测试(假设编译后的二进制为lockfree): + + + 1)单独测试lockfree:./lockfree lockfree [time],加入time会有时间分布; + + 2)单独测试chan:./lockfree chan [time],加入time会有时间分布; + + 3)合并测试lockfree和chan:./lockfree [all] [time],使用time时,前面必须加all参数,只进行测试,不关注时间分布的话,可直接调用./lockfree; + +为描述性能,除了时间外,定义了**QR**(Quick Ratio,快速率)的指标,该指标描述的是写入时间在1微秒以内的操作占所有操作的比值。 +自然的,QR越大,性能越高。 + +#### 4.2. 软硬件测试环境 + +CPU信息如下(4 * 2.5GHz): +```shell +[root@VM]# lscpu +Architecture: x86_64 +CPU op-mode(s): 32-bit, 64-bit +Byte Order: Little Endian +CPU(s): 4 +On-line CPU(s) list: 0-3 +Thread(s) per core: 1 +Core(s) per socket: 4 +Socket(s): 1 +NUMA node(s): 1 +Vendor ID: GenuineIntel +CPU family: 6 +Model: 94 +Model name: Intel(R) Xeon(R) Gold 6133 CPU @ 2.50GHz +Stepping: 3 +CPU MHz: 2494.120 +BogoMIPS: 4988.24 +Hypervisor vendor: KVM +Virtualization type: full +L1d cache: 32K +L1i cache: 32K +L2 cache: 4096K +L3 cache: 28160K +NUMA node0 CPU(s): 0-3 +``` + +内存信息(8G): +```shell +[root@VM]# free -m + total used free shared buff/cache available +Mem: 7779 405 6800 116 573 7216 +Swap: 0 0 0 +``` + +操作系统(centos 7.2): +```shell +[root@VM]# cat /etc/centos-release +CentOS Linux release 7.2 (Final) +[root@VM]# uname -a +Linux VM-219-157-centos 3.10.107-1-tlinux2_kvm_guest-0056 #1 SMP Wed Dec 29 14:35:09 CST 2021 x86_64 x86_64 x86_64 GNU/Linux +``` + +云厂商:腾讯云。 + + +#### 4.3. 写入性能对比 + +设定buffer大小为1024 * 1024,无论是lockfree还是chan都是如此设置。其写入的时间对比如下: + +其中 100 * 10000表示有100个goroutine,每个goroutine写入10000次,其他的依次类推。 + +all(lockfree/chan)表示在lockfree和chan同时跑的情况下,其分别的时间占比情况。 + +| 类型 | 100 * 10000 | 500 * 10000 | 1000 * 10000 | 5000 * 10000 | 10000 * 10000 | +|---------------|-------------|-------------|--------------|--------------|---------------| +| lockfree | 67ms | 306ms | 676ms | 3779ms | 7703ms | +| chan | 116ms | 1991ms | 4709ms | 26897ms | 58509ms | +| all(lockfree) | 49ms | 414ms | 976ms | 5038ms | 10946ms | +| all(chan) | 83ms | 859ms | 3029ms | 19228ms | 40473ms | + + +#### 4.4. QR分布 + +快速率的分布情况如下所示: + +| 类型 | 100 * 10000 | 500 * 10000 | 1000 * 10000 | 5000 * 10000 | 10000 * 10000 | +|---------------|-------------|-------------|--------------|--------------|---------------| +| lockfree | 99.23 | 99.78 | 99.81 | 99.49 | 98.99 | +| chan | 91.67 | 88.99 | 57.79 | 3.98 | 1.6 | +| all(lockfree) | 99.69 | 99.88 | 99.88 | 99.52 | 99.02 | +| all(chan) | 96.72 | 93.5 | 93.1 | 51.37 | 48.2 | + + +#### 4.5. 结果 + +从上面两张表可以很明显看出如下几点: + + 1)在goroutine数量比较小时,lockfree和chan性能差别不明显; + + 2)当goroutine打到一定数量(大于1000)后,lockfree无论从时间还是QR都远远超过chan; \ No newline at end of file diff --git a/common/utils/lockfree-1.1.3/blocks.go b/common/utils/lockfree-1.1.3/blocks.go new file mode 100644 index 00000000..464a53b7 --- /dev/null +++ b/common/utils/lockfree-1.1.3/blocks.go @@ -0,0 +1,160 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +import ( + "runtime" + "sync" + "sync/atomic" + "time" +) + +// blockStrategy 阻塞策略 +type blockStrategy interface { + // block 阻塞 + block(actual *uint64, expected uint64) + + // release 释放阻塞 + release() +} + +// SchedBlockStrategy 调度等待策略 +// 调用runtime.Gosched()方法使当前 g 主动让出 cpu 资源。 +type SchedBlockStrategy struct { +} + +func (s *SchedBlockStrategy) block(actual *uint64, expected uint64) { + runtime.Gosched() +} + +func (s *SchedBlockStrategy) release() { +} + +// SleepBlockStrategy 休眠等待策略 +// 调用 Sleep 方法使当前 g 主动让出 cpu 资源。 +// sleep poll 参考值: +// 轮询时长为 10us 时,cpu 开销约 2-3% 左右。 +// 轮询时长为 5us 时,cpu 开销约在 10% 左右。 +// 轮询时长小于 5us 时,cpu 开销接近 100% 满载。 +type SleepBlockStrategy struct { + t time.Duration +} + +func NewSleepBlockStrategy(wait time.Duration) *SleepBlockStrategy { + return &SleepBlockStrategy{ + t: wait, + } +} + +func (s *SleepBlockStrategy) block(actual *uint64, expected uint64) { + time.Sleep(s.t) +} + +func (s *SleepBlockStrategy) release() { +} + +// ProcYieldBlockStrategy CPU空指令策略 +type ProcYieldBlockStrategy struct { + cycle uint32 +} + +func NewProcYieldBlockStrategy(cycle uint32) *ProcYieldBlockStrategy { + return &ProcYieldBlockStrategy{ + cycle: cycle, + } +} + +func (s *ProcYieldBlockStrategy) block(actual *uint64, expected uint64) { + procyield(s.cycle) +} + +func (s *ProcYieldBlockStrategy) release() { +} + +// OSYieldBlockStrategy 操作系统调度策略 +type OSYieldBlockStrategy struct { +} + +func NewOSYieldWaitStrategy() *OSYieldBlockStrategy { + return &OSYieldBlockStrategy{} +} + +func (s *OSYieldBlockStrategy) block(actual *uint64, expected uint64) { + osyield() +} + +func (s *OSYieldBlockStrategy) release() { +} + +// ChanBlockStrategy chan阻塞策略 +type ChanBlockStrategy struct { + bc chan struct{} + b uint32 +} + +func NewChanBlockStrategy() *ChanBlockStrategy { + return &ChanBlockStrategy{ + bc: make(chan struct{}), + } +} + +func (s *ChanBlockStrategy) block(actual *uint64, expected uint64) { + // 0:未阻塞;1:阻塞 + if atomic.CompareAndSwapUint32(&s.b, 0, 1) { + // 设置成功的话,表示阻塞,需要进行二次判断 + if atomic.LoadUint64(actual) == expected { + // 表示阻塞失败,因为结果是一致的,此处需要重新将状态调整回来 + if atomic.CompareAndSwapUint32(&s.b, 1, 0) { + // 表示回调成功,直接退出即可 + return + } else { + // 表示有其他协程release了,则读取对应chan即可 + <-s.bc + } + } else { + // 如果说结果不一致,则表示阻塞,等待被释放即可 + <-s.bc + } + } + // 没有设置成功,不用关注 +} + +func (s *ChanBlockStrategy) release() { + if atomic.CompareAndSwapUint32(&s.b, 1, 0) { + // 表示可以释放,即chan是等待状态 + s.bc <- struct{}{} + } + // 无法设置则不用关心 + return +} + +// ConditionBlockStrategy condition 阻塞策略 +type ConditionBlockStrategy struct { + cond *sync.Cond +} + +func NewConditionBlockStrategy() *ConditionBlockStrategy { + return &ConditionBlockStrategy{ + cond: sync.NewCond(&sync.Mutex{}), + } +} + +func (s *ConditionBlockStrategy) block(actual *uint64, expected uint64) { + s.cond.L.Lock() + defer s.cond.L.Unlock() + if atomic.LoadUint64(actual) == expected { + return + } + s.cond.Wait() +} + +func (s *ConditionBlockStrategy) release() { + s.cond.L.Lock() + defer s.cond.L.Unlock() + s.cond.Broadcast() +} diff --git a/common/utils/lockfree-1.1.3/buffer.go b/common/utils/lockfree-1.1.3/buffer.go new file mode 100644 index 00000000..23ebf618 --- /dev/null +++ b/common/utils/lockfree-1.1.3/buffer.go @@ -0,0 +1,57 @@ +//go:build !386 + +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +import "sync/atomic" + +type e[T any] struct { + c uint64 + val T +} + +// ringBuffer 具体对象的存放区域,通过数组(定长切片)实现环状数据结构 +// 其中e为具体对象,非指针,这样可以一次性进行内存申请 +type ringBuffer[T any] struct { + // 增加默认的对象以便于return,处理data race问题 + tDefault T + buf []e[T] + capMask uint64 +} + +func newRingBuffer[T any](cap int) *ringBuffer[T] { + x := ringBuffer[T]{ + capMask: uint64(cap) - 1, + buf: make([]e[T], cap), + } + return &x +} + +func (r *ringBuffer[T]) write(c uint64, v T) { + x := &r.buf[c&r.capMask] + x.val = v + atomic.StoreUint64(&x.c, c+1) +} + +func (r *ringBuffer[T]) element(c uint64) e[T] { + return r.buf[c&r.capMask] +} + +func (r *ringBuffer[T]) contains(c uint64) (T, *uint64, bool) { + x := &r.buf[c&r.capMask] + if atomic.LoadUint64(&x.c) == c+1 { + v := x.val + return v, &x.c, true + } + return r.tDefault, &x.c, false +} + +func (r *ringBuffer[T]) cap() uint64 { + return r.capMask + 1 +} diff --git a/common/utils/lockfree-1.1.3/buffer_386.go b/common/utils/lockfree-1.1.3/buffer_386.go new file mode 100644 index 00000000..ee70e76d --- /dev/null +++ b/common/utils/lockfree-1.1.3/buffer_386.go @@ -0,0 +1,64 @@ +//go:build 386 + +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +import ( + "sync" +) + +type e[T any] struct { + c uint64 + val T +} + +// ringBuffer 具体对象的存放区域,通过数组(定长切片)实现环状数据结构 +// 其中e为具体对象,非指针,这样可以一次性进行内存申请 +type ringBuffer[T any] struct { + sync.RWMutex + // 增加默认的对象以便于return,处理data race问题 + tDefault T + buf []e[T] + capMask uint64 +} + +func newRingBuffer[T any](cap int) *ringBuffer[T] { + x := ringBuffer[T]{ + capMask: uint64(cap) - 1, + buf: make([]e[T], cap), + } + return &x +} + +func (r *ringBuffer[T]) write(c uint64, v T) { + x := &r.buf[c&r.capMask] + r.Lock() + defer r.Unlock() + x.val = v + x.c = c + 1 +} + +func (r *ringBuffer[T]) element(c uint64) e[T] { + return r.buf[c&r.capMask] +} + +func (r *ringBuffer[T]) contains(c uint64) (T, *uint64, bool) { + x := &r.buf[c&r.capMask] + r.RLock() + defer r.RUnlock() + if x.c == c+1 { + v := x.val + return v, &x.c, true + } + return r.tDefault, &x.c, false +} + +func (r *ringBuffer[T]) cap() uint64 { + return r.capMask + 1 +} diff --git a/common/utils/lockfree-1.1.3/buffer_test.go b/common/utils/lockfree-1.1.3/buffer_test.go new file mode 100644 index 00000000..b891a803 --- /dev/null +++ b/common/utils/lockfree-1.1.3/buffer_test.go @@ -0,0 +1,79 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +import ( + "fmt" + "reflect" + "testing" + "time" + "unsafe" +) + +func TestA(t *testing.T) { + sli := make([]uint8, 8) + sh := *(*reflect.SliceHeader)(unsafe.Pointer(&sli)) + mem := unsafe.Pointer(sh.Data) + Sets(mem) + fmt.Printf("%v\n", sli[0]) + fmt.Printf("%v\n", sli[1]) + fmt.Printf("%v\n", sli[2]) + fmt.Printf("%v\n", sli[3]) + fmt.Printf("%v\n", sli[4]) + fmt.Printf("%v\n", sli[5]) + fmt.Printf("%v\n", sli[6]) + fmt.Printf("%v\n", sli[7]) +} + +func Sets(mem unsafe.Pointer) { + for i := 0; i < 8; i++ { + *(*uint8)(unsafe.Pointer(uintptr(mem) + uintptr(i))) = 1 + } +} + +func TestX(t *testing.T) { + loop := 1000000 + l := 1024 * 1024 + bytes := make([]byte, l) + ts := time.Now() + for i := 0; i < loop; i++ { + bytes[i] = 0x01 + } + tl := time.Since(ts) + fmt.Println(tl.Microseconds()) +} + +func TestBuffer(t *testing.T) { + buf := newRingBuffer[uint64](1024) + buf.write(0, 1) + x := buf.element(0) + fmt.Println(x) + + buf.write(1023, 2) + x = buf.element(1023) + fmt.Println(x) + + buf.write(1024, 3) + x = buf.element(1024) + fmt.Println(x) +} + +func TestBufferAlign32(t *testing.T) { + buf := newRingBuffer[uint32](1024) + buf.write(0, 1) + x := buf.element(0) + fmt.Println(x) + + buf.write(1023, 2) + x = buf.element(1023) + fmt.Println(x) + + buf.write(1024, 3) + x = buf.element(1024) + fmt.Println(x) +} diff --git a/common/utils/lockfree-1.1.3/channel_test.go b/common/utils/lockfree-1.1.3/channel_test.go new file mode 100644 index 00000000..81313779 --- /dev/null +++ b/common/utils/lockfree-1.1.3/channel_test.go @@ -0,0 +1,141 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" +) + +func BenchmarkChan(b *testing.B) { + var ( + length = 1024 * 1024 + goSize = GoSize + numPerGo = SchPerGo + counter = uint64(0) + wg sync.WaitGroup + ) + ch := make(chan uint64, length) + // 消费端 + go func() { + var ts time.Time + var count int32 + for { + <-ch + atomic.AddInt32(&count, 1) + if count == 1 { + ts = time.Now() + } + //if x%10000000 == 0 { + // fmt.Printf("read %d\n", x) + //} + if count == int32(goSize*numPerGo) { + tl := time.Since(ts) + fmt.Printf("read time = %d ms\n", tl.Milliseconds()) + } + } + }() + wg.Add(b.N) + b.ResetTimer() + for i := 0; i < b.N; i++ { + go func() { + for j := 0; j < numPerGo; j++ { + x := atomic.AddUint64(&counter, 1) + ch <- x + } + wg.Done() + }() + } + wg.Wait() +} + +func TestChan1(t *testing.T) { + var ( + t1_10us = uint64(0) // 1-10微秒 + t10_100us = uint64(0) // 10-100微秒 + t100_1000us = uint64(0) // 100-1000微秒 + t1_10ms = uint64(0) // 1-10毫秒 + t10_100ms = uint64(0) // 10-100毫秒 + t100_ms = uint64(0) // 大于100毫秒 + ) + + var ( + length = 1024 * 1024 + goSize = GoSize + numPerGo = SchPerGo + counter = uint64(0) + slower = uint64(0) + wg sync.WaitGroup + ) + ch := make(chan uint64, length) + // 消费端 + go func() { + var ts time.Time + var count int32 + for { + x := <-ch + atomic.AddInt32(&count, 1) + if count == 1 { + ts = time.Now() + } + if x%1000000 == 0 { + fmt.Printf("read %d\n", x) + } + if count == int32(goSize*numPerGo) { + tl := time.Since(ts) + fmt.Printf("read time = %d ms\n", tl.Milliseconds()) + } + } + }() + wg.Add(goSize) + totalS := time.Now() + for i := 0; i < goSize; i++ { + go func() { + for j := 0; j < numPerGo; j++ { + x := atomic.AddUint64(&counter, 1) + ts := time.Now() + ch <- x + tl := time.Since(ts) + ms := tl.Microseconds() + if ms > 1 { + atomic.AddUint64(&slower, 1) + if ms < 10 { // t1_10us + atomic.AddUint64(&t1_10us, 1) + } else if ms < 100 { + atomic.AddUint64(&t10_100us, 1) + } else if ms < 1000 { + atomic.AddUint64(&t100_1000us, 1) + } else if ms < 10000 { + atomic.AddUint64(&t1_10ms, 1) + } else if ms < 100000 { + atomic.AddUint64(&t10_100ms, 1) + } else { + atomic.AddUint64(&t100_ms, 1) + } + } + } + wg.Done() + }() + } + wg.Wait() + totalL := time.Since(totalS) + fmt.Printf("write total time = [%d ms]\n", totalL.Milliseconds()) + time.Sleep(time.Second * 3) + fmt.Printf("slow ratio = %.2f \n", float64(slower)*100.0/float64(counter)) + fmt.Printf("quick ratio = %.2f \n", float64(goSize*numPerGo-int(slower))*100.0/float64(goSize*numPerGo)) + fmt.Printf("[<1us][%d] \n", counter-slower) + fmt.Printf("[1-10us][%d] \n", t1_10us) + fmt.Printf("[10-100us][%d] \n", t10_100us) + fmt.Printf("[100-1000us][%d] \n", t100_1000us) + fmt.Printf("[1-10ms][%d] \n", t1_10ms) + fmt.Printf("[10-100ms][%d] \n", t10_100ms) + fmt.Printf("[>100ms][%d] \n", t100_ms) +} diff --git a/common/utils/lockfree-1.1.3/consumer.go b/common/utils/lockfree-1.1.3/consumer.go new file mode 100644 index 00000000..cf7f15ad --- /dev/null +++ b/common/utils/lockfree-1.1.3/consumer.go @@ -0,0 +1,90 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +import ( + "fmt" + "runtime" + "sync/atomic" +) + +// consumer 消费者,这个消费者只会有一个g操作,这样处理的好处是可以不涉及并发操作,其内部不会涉及到任何锁 +// 对于实际的并发操作由该g进行分配 +type consumer[T any] struct { + status int32 // 运行状态 + rbuf *ringBuffer[T] + seqer *sequencer + blocks blockStrategy + hdl EventHandler[T] +} + +func newConsumer[T any](rbuf *ringBuffer[T], hdl EventHandler[T], sequer *sequencer, blocks blockStrategy) *consumer[T] { + return &consumer[T]{ + rbuf: rbuf, + seqer: sequer, + hdl: hdl, + blocks: blocks, + status: READY, + } +} + +func (c *consumer[T]) start() error { + if atomic.CompareAndSwapInt32(&c.status, READY, RUNNING) { + go c.handle() + return nil + } + return fmt.Errorf(StartErrorFormat, "Consumer") +} + +func (c *consumer[T]) handle() { + // 判断是否可以获取到 + rc := c.seqer.nextRead() + for { + if c.closed() { + return + } + var i = 0 + for { + if c.closed() { + return + } + // 看下读取位置的seq是否OK + if v, p, exist := c.rbuf.contains(rc - 1); exist { + rc = c.seqer.readIncrement() + c.hdl.OnEvent(v) + i = 0 + break + } else { + if i < spin { + procyield(30) + } else if i < spin+passiveSpin { + runtime.Gosched() + } else { + c.blocks.block(p, rc) + i = 0 + } + i++ + } + } + } +} + +func (c *consumer[T]) close() error { + if atomic.CompareAndSwapInt32(&c.status, RUNNING, READY) { + // 防止阻塞无法释放 + c.blocks.release() + return nil + } + return fmt.Errorf(CloseErrorFormat, "Consumer") +} + +// closed 判断是否已关闭 +// 将直接判断调整为原子操作,解决data race问题 +func (c *consumer[T]) closed() bool { + return atomic.LoadInt32(&c.status) == READY +} diff --git a/common/utils/lockfree-1.1.3/cursor.go b/common/utils/lockfree-1.1.3/cursor.go new file mode 100644 index 00000000..6700cda3 --- /dev/null +++ b/common/utils/lockfree-1.1.3/cursor.go @@ -0,0 +1,40 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +import "sync/atomic" + +// cursor 游标,一直持续增长的一个uint64序列 +// 该序列用于wg(Write Goroutine)获取对应写入到buffer中元素的位置操作 +// 通过使用atomic操作避免锁,提高性能 +// 通过使用padding填充的方式,填充前面和后面各使用7个uint64(缓存行填充),避免伪共享问题 +type cursor struct { + p1, p2, p3, p4, p5, p6, p7 uint64 + v uint64 + p9, p10, p11, p12, p13, p14, p15 uint64 +} + +func newCursor() *cursor { + return &cursor{} +} + +func (c *cursor) increment() uint64 { + return atomic.AddUint64(&c.v, 1) +} + +func (c *cursor) atomicLoad() uint64 { + return atomic.LoadUint64(&c.v) +} + +func (c *cursor) load() uint64 { + return c.v +} + +func (c *cursor) store(expectVal, newVal uint64) bool { + return atomic.CompareAndSwapUint64(&c.v, expectVal, newVal) +} diff --git a/common/utils/lockfree-1.1.3/cursor_test.go b/common/utils/lockfree-1.1.3/cursor_test.go new file mode 100644 index 00000000..20577d8f --- /dev/null +++ b/common/utils/lockfree-1.1.3/cursor_test.go @@ -0,0 +1,113 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestCursor(t *testing.T) { + c := newCursor() + ts := time.Now() + for i := 0; i < 100000000; i++ { + x := c.increment() + if x%1000000 == 0 { + fmt.Println(x) + } + } + tl := time.Since(ts) + fmt.Printf("time = %v\n", tl) +} + +func TestCursor2(t *testing.T) { + c := newCursor() + var wg sync.WaitGroup + wg.Add(10000) + ts := time.Now() + for i := 0; i < 10000; i++ { + go func() { + for j := 0; j < 10000; j++ { + x := c.increment() + if x%10000000 == 0 { + fmt.Println(x) + } + } + wg.Done() + }() + } + wg.Wait() + fmt.Println(time.Since(ts)) +} + +func TestCursor3(t *testing.T) { + var c uint64 + var wg sync.WaitGroup + wg.Add(10000) + ts := time.Now() + for i := 0; i < 10000; i++ { + go func() { + for j := 0; j < 10000; j++ { + x := atomic.AddUint64(&c, 1) + if x%10000000 == 0 { + fmt.Println(x) + } + } + wg.Done() + }() + } + wg.Wait() + fmt.Println(time.Since(ts)) +} + +type NoPad struct { + a uint64 + b uint64 + c uint64 +} + +func (np *NoPad) Increase() { + atomic.AddUint64(&np.a, 1) + atomic.AddUint64(&np.b, 1) + atomic.AddUint64(&np.c, 1) +} + +type Pad struct { + a uint64 + _p1 [8]uint64 + b uint64 + _p2 [8]uint64 + c uint64 + _p3 [8]uint64 +} + +func (p *Pad) Increase() { + atomic.AddUint64(&p.a, 1) + atomic.AddUint64(&p.b, 1) + atomic.AddUint64(&p.c, 1) +} + +func BenchmarkPad_Increase(b *testing.B) { + pad := &Pad{} + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + pad.Increase() + } + }) +} +func BenchmarkNoPad_Increase(b *testing.B) { + nopad := &NoPad{} + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + nopad.Increase() + } + }) +} diff --git a/common/utils/lockfree-1.1.3/example/complex/main.go b/common/utils/lockfree-1.1.3/example/complex/main.go new file mode 100644 index 00000000..2207d9f3 --- /dev/null +++ b/common/utils/lockfree-1.1.3/example/complex/main.go @@ -0,0 +1,130 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package main + +import ( + "fmt" + "github.com/bruceshao/lockfree" + "math/rand" + "sync" + "sync/atomic" + "time" +) + +func main() { + fmt.Println("========== start write by discard ==========") + writeByDiscard() + fmt.Println("========== complete write by discard ==========") + fmt.Println("========== start write by cursor ==========") + writeByCursor() + fmt.Println("========== complete write by cursor ==========") +} + +func writeByDiscard() { + var counter = uint64(0) + // 写入超时,如何使用 + eh := &fixedSleepEventHandler[uint64]{ + sm: time.Millisecond * 10, + } + disruptor := lockfree.NewLockfree[uint64](2, eh, lockfree.NewSleepBlockStrategy(time.Microsecond)) + disruptor.Start() + producer := disruptor.Producer() + // 假设有100个写g + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + for j := 0; j < 10; j++ { + v := atomic.AddUint64(&counter, 1) + ww := producer.WriteWindow() + if ww <= 0 { + // 表示无法写入,丢弃 + fmt.Println("discard ", v) + continue + } + // 表示可以写入,写入即可 + err := producer.Write(v) + if err != nil { + panic(err) + } + fmt.Println("write ", v) + } + wg.Done() + }() + } + wg.Wait() + time.Sleep(3 * time.Second) + disruptor.Close() +} + +func writeByCursor() { + var counter = uint64(0) + // 写入超时,如何使用 + eh := &randomSleepEventHandler[uint64]{} + disruptor := lockfree.NewLockfree[uint64](2, eh, lockfree.NewSleepBlockStrategy(time.Microsecond)) + disruptor.Start() + producer := disruptor.Producer() + // 假设有100个写g + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + for j := 0; j < 10; j++ { + v := atomic.AddUint64(&counter, 1) + wc, exist, err := producer.WriteTimeout(v, time.Millisecond) + if err != nil { + return + } + if !exist { + // 重复写入1次,写入不成功则丢弃重新写其他的 + if ok, _ := producer.WriteByCursor(v, wc); ok { + continue + } + fmt.Println("discard ", v) + // 重新生成值,一直等待写入 + v = atomic.AddUint64(&counter, 1) + for { + if ok, _ := producer.WriteByCursor(v, wc); ok { + fmt.Println("write ", v, " with x times") + break + } + // 写入不成功则休眠,防止CPU暴增 + time.Sleep(100 * time.Microsecond) + } + } else { + fmt.Println("write ", v, " with 1 time") + } + } + wg.Done() + }() + } + wg.Wait() + time.Sleep(3 * time.Second) + disruptor.Close() +} + + +type fixedSleepEventHandler[T uint64] struct { + sm time.Duration +} + +func (h *fixedSleepEventHandler[T]) OnEvent(v uint64) { + time.Sleep(h.sm) + fmt.Println("consumer ", v) +} + +type randomSleepEventHandler[T uint64] struct { + count int32 +} + +func (h *randomSleepEventHandler[T]) OnEvent(v uint64) { + // 每次处理都会进行随机休眠,可以导致消费端变慢 + intn := rand.Intn(1000) + time.Sleep(time.Duration(intn * 1000)) + fmt.Println("consumer count ", atomic.AddInt32(&h.count, 1)) +} \ No newline at end of file diff --git a/common/utils/lockfree-1.1.3/example/perfect/main.go b/common/utils/lockfree-1.1.3/example/perfect/main.go new file mode 100644 index 00000000..c0e844a0 --- /dev/null +++ b/common/utils/lockfree-1.1.3/example/perfect/main.go @@ -0,0 +1,250 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package main + +import ( + "fmt" + "github.com/bruceshao/lockfree" + "os" + "runtime/pprof" + "sync" + "sync/atomic" + "time" +) + +var ( + goSize = 10000 + sizePerGo = 10000 + cap = 1024 * 1024 + timeAnalyse = false +) + +func main() { + f, _ := os.OpenFile("cpu.pprof", os.O_CREATE|os.O_RDWR, 0644) + defer f.Close() + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + // 启动命令: .exe [all/lockfree/chan] [time] + arg := "all" + if len(os.Args) > 1 { + arg = os.Args[1] + } + if len(os.Args) > 2 { + timeAnalyse = true + } + if arg == "all" { + fmt.Println("start lockfree and channel test") + // 表示全部启动 + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + lockfreeMain() + }() + go func() { + defer wg.Done() + chanMain() + }() + wg.Wait() + } else if arg == "chan" { + fmt.Println("start channel test") + // 表示启动chan + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + chanMain() + }() + wg.Wait() + } else if arg == "lockfree" { + fmt.Println("start lockfree test") + // 表示启动lockfree + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + lockfreeMain() + }() + wg.Wait() + } + fmt.Println("all queue is over") +} + +func lockfreeMain() { + var ( + t1_10us = uint64(0) // 1-10微秒 + t10_100us = uint64(0) // 10-100微秒 + t100_1000us = uint64(0) // 100-1000微秒 + t1_10ms = uint64(0) // 1-10毫秒 + t10_100ms = uint64(0) // 10-100毫秒 + t100_ms = uint64(0) // 大于100毫秒 + slower = uint64(0) + ) + // 创建事件处理器 + eh := &longEventHandler[uint64]{} + // 创建消费端串行处理的Lockfree + lf := lockfree.NewLockfree[uint64](cap, eh, lockfree.NewSleepBlockStrategy(time.Millisecond)) + // 启动Lockfree + if err := lf.Start(); err != nil { + panic(err) + } + // lockfree计时 + ts := time.Now() + // 获取生产者对象 + producer := lf.Producer() + var wg sync.WaitGroup + wg.Add(goSize) + for i := 0; i < goSize; i++ { + go func(start int) { + if timeAnalyse { + for j := 0; j < sizePerGo; j++ { + //写入数据 + tb := time.Now() + err := producer.Write(uint64(start*sizePerGo + j + 1)) + tl := time.Since(tb) + ms := tl.Microseconds() + if ms > 1 { + atomic.AddUint64(&slower, 1) + if ms < 10 { // t1_10us + atomic.AddUint64(&t1_10us, 1) + } else if ms < 100 { + atomic.AddUint64(&t10_100us, 1) + } else if ms < 1000 { + atomic.AddUint64(&t100_1000us, 1) + } else if ms < 10000 { + atomic.AddUint64(&t1_10ms, 1) + } else if ms < 100000 { + atomic.AddUint64(&t10_100ms, 1) + } else { + atomic.AddUint64(&t100_ms, 1) + } + } + if err != nil { + panic(err) + } + } + } else { + for j := 0; j < sizePerGo; j++ { + err := producer.Write(uint64(start*sizePerGo + j + 1)) + if err != nil { + panic(err) + } + } + } + wg.Done() + }(i) + } + wg.Wait() + fmt.Println("===== lockfree[", time.Now().Sub(ts), "] =====") + fmt.Println("----- lockfree write complete -----") + time.Sleep(3 * time.Second) + // 关闭Lockfree + lf.Close() + if timeAnalyse { + fmt.Printf("lockfree slow ratio = %.2f \n", float64(slower)*100.0/float64(goSize*sizePerGo)) + fmt.Printf("lockfree quick ratio = %.2f \n", float64(goSize*sizePerGo-int(slower))*100.0/float64(goSize*sizePerGo)) + fmt.Printf("lockfree [<1us][%d] \n", uint64(goSize*sizePerGo)-slower) + fmt.Printf("lockfree [1-10us][%d] \n", t1_10us) + fmt.Printf("lockfree [10-100us][%d] \n", t10_100us) + fmt.Printf("lockfree [100-1000us][%d] \n", t100_1000us) + fmt.Printf("lockfree [1-10ms][%d] \n", t1_10ms) + fmt.Printf("lockfree [10-100ms][%d] \n", t10_100ms) + fmt.Printf("lockfree [>100ms][%d] \n", t100_ms) + } +} + +func chanMain() { + var ( + t1_10us = uint64(0) // 1-10微秒 + t10_100us = uint64(0) // 10-100微秒 + t100_1000us = uint64(0) // 100-1000微秒 + t1_10ms = uint64(0) // 1-10毫秒 + t10_100ms = uint64(0) // 10-100毫秒 + t100_ms = uint64(0) // 大于100毫秒 + slower = uint64(0) + ) + c := make(chan uint64, cap) + // 启动监听协程 + go func() { + for { + x, ok := <-c + if !ok { + return + } + if x%10000000 == 0 { + fmt.Println("chan [", x, "]") + } + } + }() + // lockfree计时 + ts := time.Now() + // 开始写入 + var wg sync.WaitGroup + wg.Add(goSize) + for i := 0; i < goSize; i++ { + go func(start int) { + if timeAnalyse { + for j := 0; j < sizePerGo; j++ { + //写入数据 + tb := time.Now() + c <- uint64(start*sizePerGo + j + 1) + tl := time.Since(tb) + ms := tl.Microseconds() + if ms > 1 { + atomic.AddUint64(&slower, 1) + if ms < 10 { // t1_10us + atomic.AddUint64(&t1_10us, 1) + } else if ms < 100 { + atomic.AddUint64(&t10_100us, 1) + } else if ms < 1000 { + atomic.AddUint64(&t100_1000us, 1) + } else if ms < 10000 { + atomic.AddUint64(&t1_10ms, 1) + } else if ms < 100000 { + atomic.AddUint64(&t10_100ms, 1) + } else { + atomic.AddUint64(&t100_ms, 1) + } + } + } + } else { + for j := 0; j < sizePerGo; j++ { + //写入数据 + c <- uint64(start*sizePerGo + j + 1) + } + } + wg.Done() + }(i) + } + wg.Wait() + fmt.Println("=====channel[", time.Now().Sub(ts), "]=====") + fmt.Println("----- channel write complete -----") + time.Sleep(3 * time.Second) + // 关闭chan + close(c) + if timeAnalyse { + fmt.Printf("channel slow ratio = %.2f \n", float64(slower)*100.0/float64(goSize*sizePerGo)) + fmt.Printf("channel quick ratio = %.2f \n", float64(goSize*sizePerGo-int(slower))*100.0/float64(goSize*sizePerGo)) + fmt.Printf("channel [<1us][%d] \n", uint64(goSize*sizePerGo)-slower) + fmt.Printf("channel [1-10us][%d] \n", t1_10us) + fmt.Printf("channel [10-100us][%d] \n", t10_100us) + fmt.Printf("channel [100-1000us][%d] \n", t100_1000us) + fmt.Printf("channel [1-10ms][%d] \n", t1_10ms) + fmt.Printf("channel [10-100ms][%d] \n", t10_100ms) + fmt.Printf("channel [>100ms][%d] \n", t100_ms) + } +} + +type longEventHandler[T uint64] struct { +} + +func (h *longEventHandler[T]) OnEvent(v uint64) { + if v%10000000 == 0 { + fmt.Println("lockfree [", v, "]") + } +} diff --git a/common/utils/lockfree-1.1.3/example/simple/main.go b/common/utils/lockfree-1.1.3/example/simple/main.go new file mode 100644 index 00000000..41532162 --- /dev/null +++ b/common/utils/lockfree-1.1.3/example/simple/main.go @@ -0,0 +1,99 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package main + +import ( + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/bruceshao/lockfree" +) + +var ( + goSize = 10000 + sizePerGo = 10000 + + total = goSize * sizePerGo +) + +func main() { + // lockfree计时 + now := time.Now() + + // 创建事件处理器 + handler := &eventHandler[uint64]{ + signal: make(chan struct{}, 0), + now: now, + } + + // 创建消费端串行处理的Lockfree + lf := lockfree.NewLockfree[uint64]( + 1024*1024, + handler, + lockfree.NewSleepBlockStrategy(time.Millisecond), + ) + + // 启动Lockfree + if err := lf.Start(); err != nil { + panic(err) + } + + // 获取生产者对象 + producer := lf.Producer() + + // 并发写入 + var wg sync.WaitGroup + wg.Add(goSize) + for i := 0; i < goSize; i++ { + go func(start int) { + for j := 0; j < sizePerGo; j++ { + err := producer.Write(uint64(start*sizePerGo + j + 1)) + if err != nil { + panic(err) + } + } + wg.Done() + }(i) + } + + // wait for producer + wg.Wait() + + fmt.Printf("producer has been writed, write count: %v, time cost: %v \n", total, time.Since(now).String()) + + // wait for consumer + handler.wait() + + // 关闭Lockfree + lf.Close() +} + +type eventHandler[T uint64] struct { + signal chan struct{} + gcounter uint64 + now time.Time +} + +func (h *eventHandler[T]) OnEvent(v uint64) { + cur := atomic.AddUint64(&h.gcounter, 1) + if cur == uint64(total) { + fmt.Printf("eventHandler has been consumed already, read count: %v, time cose: %v\n", total, time.Since(h.now)) + close(h.signal) + return + } + + if cur%10000000 == 0 { + fmt.Printf("eventHandler consume %v\n", cur) + } +} + +func (h *eventHandler[T]) wait() { + <-h.signal +} diff --git a/common/utils/lockfree-1.1.3/go.mod b/common/utils/lockfree-1.1.3/go.mod new file mode 100644 index 00000000..dc048887 --- /dev/null +++ b/common/utils/lockfree-1.1.3/go.mod @@ -0,0 +1,10 @@ +module github.com/bruceshao/lockfree + +go 1.18 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/testify v1.8.2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/common/utils/lockfree-1.1.3/go.sum b/common/utils/lockfree-1.1.3/go.sum new file mode 100644 index 00000000..c789e2f0 --- /dev/null +++ b/common/utils/lockfree-1.1.3/go.sum @@ -0,0 +1,24 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/panjf2000/ants/v2 v2.7.1 h1:qBy5lfSdbxvrR0yUnZfaEDjf0FlCw4ufsbcsxmE7r+M= +github.com/panjf2000/ants/v2 v2.7.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk= +go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/common/utils/lockfree-1.1.3/handler.go b/common/utils/lockfree-1.1.3/handler.go new file mode 100644 index 00000000..01a54389 --- /dev/null +++ b/common/utils/lockfree-1.1.3/handler.go @@ -0,0 +1,16 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +// EventHandler 事件处理器接口 +// 整个无锁队列中唯一需要用户实现的接口,该接口描述消费端收到消息时该如何处理 +// 使用泛型,通过编译阶段确定事件类型,提高性能 +type EventHandler[T any] interface { + // OnEvent 用户侧实现,事件处理方法 + OnEvent(t T) +} diff --git a/common/utils/lockfree-1.1.3/images/bitmap.jpg b/common/utils/lockfree-1.1.3/images/bitmap.jpg new file mode 100644 index 00000000..6b55ba19 Binary files /dev/null and b/common/utils/lockfree-1.1.3/images/bitmap.jpg differ diff --git a/common/utils/lockfree-1.1.3/images/chan结构.jpg b/common/utils/lockfree-1.1.3/images/chan结构.jpg new file mode 100644 index 00000000..0eb2dd37 Binary files /dev/null and b/common/utils/lockfree-1.1.3/images/chan结构.jpg differ diff --git a/common/utils/lockfree-1.1.3/images/futex.jpg b/common/utils/lockfree-1.1.3/images/futex.jpg new file mode 100644 index 00000000..f8bf40da Binary files /dev/null and b/common/utils/lockfree-1.1.3/images/futex.jpg differ diff --git a/common/utils/lockfree-1.1.3/images/lockfree.jpg b/common/utils/lockfree-1.1.3/images/lockfree.jpg new file mode 100644 index 00000000..4334f645 Binary files /dev/null and b/common/utils/lockfree-1.1.3/images/lockfree.jpg differ diff --git a/common/utils/lockfree-1.1.3/images/padding1.jpg b/common/utils/lockfree-1.1.3/images/padding1.jpg new file mode 100644 index 00000000..5db019f7 Binary files /dev/null and b/common/utils/lockfree-1.1.3/images/padding1.jpg differ diff --git a/common/utils/lockfree-1.1.3/images/padding2.jpg b/common/utils/lockfree-1.1.3/images/padding2.jpg new file mode 100644 index 00000000..08149183 Binary files /dev/null and b/common/utils/lockfree-1.1.3/images/padding2.jpg differ diff --git a/common/utils/lockfree-1.1.3/images/padding3.jpg b/common/utils/lockfree-1.1.3/images/padding3.jpg new file mode 100644 index 00000000..cf97fc69 Binary files /dev/null and b/common/utils/lockfree-1.1.3/images/padding3.jpg differ diff --git a/common/utils/lockfree-1.1.3/images/pointer.jpg b/common/utils/lockfree-1.1.3/images/pointer.jpg new file mode 100644 index 00000000..838048ee Binary files /dev/null and b/common/utils/lockfree-1.1.3/images/pointer.jpg differ diff --git a/common/utils/lockfree-1.1.3/images/read.jpg b/common/utils/lockfree-1.1.3/images/read.jpg new file mode 100644 index 00000000..ff086453 Binary files /dev/null and b/common/utils/lockfree-1.1.3/images/read.jpg differ diff --git a/common/utils/lockfree-1.1.3/images/sudog.jpg b/common/utils/lockfree-1.1.3/images/sudog.jpg new file mode 100644 index 00000000..e4022da6 Binary files /dev/null and b/common/utils/lockfree-1.1.3/images/sudog.jpg differ diff --git a/common/utils/lockfree-1.1.3/images/time.jpg b/common/utils/lockfree-1.1.3/images/time.jpg new file mode 100644 index 00000000..c4bf9844 Binary files /dev/null and b/common/utils/lockfree-1.1.3/images/time.jpg differ diff --git a/common/utils/lockfree-1.1.3/images/total.jpg b/common/utils/lockfree-1.1.3/images/total.jpg new file mode 100644 index 00000000..94df8e3d Binary files /dev/null and b/common/utils/lockfree-1.1.3/images/total.jpg differ diff --git a/common/utils/lockfree-1.1.3/images/write.jpg b/common/utils/lockfree-1.1.3/images/write.jpg new file mode 100644 index 00000000..2aae35d7 Binary files /dev/null and b/common/utils/lockfree-1.1.3/images/write.jpg differ diff --git a/common/utils/lockfree-1.1.3/images/放大.jpg b/common/utils/lockfree-1.1.3/images/放大.jpg new file mode 100644 index 00000000..6074a493 Binary files /dev/null and b/common/utils/lockfree-1.1.3/images/放大.jpg differ diff --git a/common/utils/lockfree-1.1.3/images/锁.jpg b/common/utils/lockfree-1.1.3/images/锁.jpg new file mode 100644 index 00000000..a726cba4 Binary files /dev/null and b/common/utils/lockfree-1.1.3/images/锁.jpg differ diff --git a/common/utils/lockfree-1.1.3/lockfree.go b/common/utils/lockfree-1.1.3/lockfree.go new file mode 100644 index 00000000..3ca1045a --- /dev/null +++ b/common/utils/lockfree-1.1.3/lockfree.go @@ -0,0 +1,85 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +import ( + "fmt" + "sync/atomic" +) + +// Lockfree 包装类,内部包装了生产者和消费者 +type Lockfree[T any] struct { + writer *Producer[T] + consumer *consumer[T] + status int32 +} + +// NewLockfree 自定义创建消费端的Disruptor +// capacity:buffer的容量大小,类似于chan的大小,但要求必须是2^n,即2的指数倍,如果不是的话会被修改 +// handler:消费端的事件处理器 +// blocks:读取阻塞时的处理策略 +func NewLockfree[T any](capacity int, handler EventHandler[T], blocks blockStrategy) *Lockfree[T] { + // 重新计算正确的容量 + capacity = minSuitableCap(capacity) + seqer := newSequencer(capacity) + rbuf := newRingBuffer[T](capacity) + cmer := newConsumer[T](rbuf, handler, seqer, blocks) + writer := newProducer[T](seqer, rbuf, blocks) + return &Lockfree[T]{ + writer: writer, + consumer: cmer, + status: READY, + } +} + +func (d *Lockfree[T]) Start() error { + if atomic.CompareAndSwapInt32(&d.status, READY, RUNNING) { + // 启动消费者 + if err := d.consumer.start(); err != nil { + // 恢复现场 + atomic.CompareAndSwapInt32(&d.status, RUNNING, READY) + return err + } + // 启动生产者 + if err := d.writer.start(); err != nil { + // 恢复现场 + atomic.CompareAndSwapInt32(&d.status, RUNNING, READY) + return err + } + return nil + } + return fmt.Errorf(StartErrorFormat, "Disruptor") +} + +func (d *Lockfree[T]) Producer() *Producer[T] { + return d.writer +} + +func (d *Lockfree[T]) Running() bool { + return d.status == RUNNING +} + +func (d *Lockfree[T]) Close() error { + if atomic.CompareAndSwapInt32(&d.status, RUNNING, READY) { + // 关闭生产者 + if err := d.writer.close(); err != nil { + // 恢复现场 + atomic.CompareAndSwapInt32(&d.status, READY, RUNNING) + return err + } + // 关闭消费者 + if err := d.consumer.close(); err != nil { + // 恢复现场 + atomic.CompareAndSwapInt32(&d.status, READY, RUNNING) + return err + } + // 关闭成功 + return nil + } + return fmt.Errorf(CloseErrorFormat, "Disruptor") +} diff --git a/common/utils/lockfree-1.1.3/lockfree_test.go b/common/utils/lockfree-1.1.3/lockfree_test.go new file mode 100644 index 00000000..96189214 --- /dev/null +++ b/common/utils/lockfree-1.1.3/lockfree_test.go @@ -0,0 +1,104 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +import ( + "sync" + "sync/atomic" + "testing" + "time" +) + +func BenchmarkLockFree(b *testing.B) { + var ( + counter = uint64(0) + ) + eh := &longEventHandler[uint64]{} + disruptor := NewLockfree[uint64](1024*1024, eh, &SleepBlockStrategy{ + t: time.Microsecond, + }) + disruptor.Start() + producer := disruptor.Producer() + var wg sync.WaitGroup + wg.Add(b.N) + b.ResetTimer() + for i := 0; i < b.N; i++ { + go func() { + for j := 0; j < SchPerGo; j++ { + x := atomic.AddUint64(&counter, 1) + err := producer.Write(x) + if err != nil { + panic(err) + } + } + wg.Done() + }() + } + wg.Wait() + b.StopTimer() + time.Sleep(time.Second * 1) + disruptor.Close() +} + +func TestChanBlockStrategy(t *testing.T) { + var ( + counter = uint64(0) + goS = 1000 + perGo = 100 + ) + eh := &longEventHandler[uint64]{} + disruptor := NewLockfree[uint64](2, eh, NewChanBlockStrategy()) + disruptor.Start() + producer := disruptor.Producer() + var wg sync.WaitGroup + wg.Add(goS) + for i := 0; i < goS; i++ { + go func() { + for j := 0; j < perGo; j++ { + x := atomic.AddUint64(&counter, 1) + err := producer.Write(x) + if err != nil { + panic(err) + } + } + wg.Done() + }() + } + wg.Wait() + time.Sleep(time.Second * 1) + disruptor.Close() +} + +func TestCondBlockStrategy(t *testing.T) { + var ( + counter = uint64(0) + goS = 1000 + perGo = 100 + ) + eh := &longEventHandler[uint64]{} + disruptor := NewLockfree[uint64](2, eh, NewConditionBlockStrategy()) + disruptor.Start() + producer := disruptor.Producer() + var wg sync.WaitGroup + wg.Add(goS) + for i := 0; i < goS; i++ { + go func() { + for j := 0; j < perGo; j++ { + x := atomic.AddUint64(&counter, 1) + err := producer.Write(x) + if err != nil { + panic(err) + } + } + wg.Done() + }() + } + wg.Wait() + time.Sleep(time.Second * 1) + disruptor.Close() +} \ No newline at end of file diff --git a/common/utils/lockfree-1.1.3/padding_test.go b/common/utils/lockfree-1.1.3/padding_test.go new file mode 100644 index 00000000..3ee9e341 --- /dev/null +++ b/common/utils/lockfree-1.1.3/padding_test.go @@ -0,0 +1,338 @@ +package lockfree + +import ( + "runtime" + "sync" + "testing" +) + +type foo struct { + x, y, z int64 +} +type foo64Start struct { + _ [64]byte + x, y, z int64 +} +type foo64StartEnd struct { + _ [64]byte + x, y, z int64 + _ [64]byte +} +type foo128Start struct { + _ [128]byte + x, y, z int64 +} +type foo128StartEnd struct { + _ [128]byte + x, y, z int64 + _ [128]byte +} +type foo64StartEndAligned struct { + _ [64]byte + x, y, z int64 + _ [64 - 24]byte +} +type foo128StartEndAligned struct { + _ [128]byte + x, y, z int64 + _ [128 - 24]byte +} + +const iter = (1 << 16) + +func BenchmarkFalseSharing(b *testing.B) { + var wg sync.WaitGroup + b.Run("NoPad", func(b *testing.B) { + arr := make([]foo, runtime.GOMAXPROCS(0)) + arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0)) + for i := range arrChan { + arrChan[i] = make(chan struct{}) + } + for i := range arr { + go func(i int) { + for range arrChan[i] { + for j := 0; j < iter; j++ { + arr[i].x++ + } + wg.Done() + } + }(i) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(runtime.GOMAXPROCS(0)) + for j := range arrChan { + arrChan[j] <- struct{}{} + } + wg.Wait() + } + b.StopTimer() + for i := range arrChan { + close(arrChan[i]) + } + }) + b.Run("Pad64Start", func(b *testing.B) { + arr := make([]foo64Start, runtime.GOMAXPROCS(0)) + arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0)) + for i := range arrChan { + arrChan[i] = make(chan struct{}) + } + for i := range arr { + go func(i int) { + for range arrChan[i] { + for j := 0; j < iter; j++ { + arr[i].x++ + } + wg.Done() + } + }(i) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(runtime.GOMAXPROCS(0)) + for j := range arrChan { + arrChan[j] <- struct{}{} + } + wg.Wait() + } + b.StopTimer() + for i := range arrChan { + close(arrChan[i]) + } + }) + b.Run("Pad64StartEnd", func(b *testing.B) { + arr := make([]foo64StartEnd, runtime.GOMAXPROCS(0)) + arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0)) + for i := range arrChan { + arrChan[i] = make(chan struct{}) + } + for i := range arr { + go func(i int) { + for range arrChan[i] { + for j := 0; j < iter; j++ { + arr[i].x++ + } + wg.Done() + } + }(i) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(runtime.GOMAXPROCS(0)) + for j := range arrChan { + arrChan[j] <- struct{}{} + } + wg.Wait() + } + b.StopTimer() + for i := range arrChan { + close(arrChan[i]) + } + }) + b.Run("Pad128Start", func(b *testing.B) { + arr := make([]foo128Start, runtime.GOMAXPROCS(0)) + arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0)) + for i := range arrChan { + arrChan[i] = make(chan struct{}) + } + for i := range arr { + go func(i int) { + for range arrChan[i] { + for j := 0; j < iter; j++ { + arr[i].x++ + } + wg.Done() + } + }(i) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(runtime.GOMAXPROCS(0)) + for j := range arrChan { + arrChan[j] <- struct{}{} + } + wg.Wait() + } + b.StopTimer() + for i := range arrChan { + close(arrChan[i]) + } + }) + b.Run("Pad128StartEnd", func(b *testing.B) { + arr := make([]foo128StartEnd, runtime.GOMAXPROCS(0)) + arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0)) + for i := range arrChan { + arrChan[i] = make(chan struct{}) + } + for i := range arr { + go func(i int) { + for range arrChan[i] { + for j := 0; j < iter; j++ { + arr[i].x++ + } + wg.Done() + } + }(i) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(runtime.GOMAXPROCS(0)) + for j := range arrChan { + arrChan[j] <- struct{}{} + } + wg.Wait() + } + b.StopTimer() + for i := range arrChan { + close(arrChan[i]) + } + }) + b.Run("Pad64StartEndAligned", func(b *testing.B) { + arr := make([]foo64StartEndAligned, runtime.GOMAXPROCS(0)) + arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0)) + for i := range arrChan { + arrChan[i] = make(chan struct{}) + } + for i := range arr { + go func(i int) { + for range arrChan[i] { + for j := 0; j < iter; j++ { + arr[i].x++ + } + wg.Done() + } + }(i) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(runtime.GOMAXPROCS(0)) + for j := range arrChan { + arrChan[j] <- struct{}{} + } + wg.Wait() + } + b.StopTimer() + for i := range arrChan { + close(arrChan[i]) + } + }) + b.Run("Pad128StartEndAligned", func(b *testing.B) { + arr := make([]foo128StartEndAligned, runtime.GOMAXPROCS(0)) + arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0)) + for i := range arrChan { + arrChan[i] = make(chan struct{}) + } + for i := range arr { + go func(i int) { + for range arrChan[i] { + for j := 0; j < iter; j++ { + arr[i].x++ + } + wg.Done() + } + }(i) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(runtime.GOMAXPROCS(0)) + for j := range arrChan { + arrChan[j] <- struct{}{} + } + wg.Wait() + } + b.StopTimer() + for i := range arrChan { + close(arrChan[i]) + } + }) +} +func BenchmarkTrueSharing(b *testing.B) { + var wg sync.WaitGroup + b.Run("<64", func(b *testing.B) { + arr := make([]foo, runtime.GOMAXPROCS(0)*iter) + arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0)) + for i := range arrChan { + arrChan[i] = make(chan struct{}) + } + for i := range arrChan { + go func(i int) { + for range arrChan[i] { + for j := 0; j < iter; j++ { + arr[(i*iter)+j].x++ + } + wg.Done() + } + }(i) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(runtime.GOMAXPROCS(0)) + for j := range arrChan { + arrChan[j] <- struct{}{} + } + wg.Wait() + } + b.StopTimer() + for i := range arrChan { + close(arrChan[i]) + } + }) + b.Run(">64", func(b *testing.B) { + arr := make([]foo64Start, runtime.GOMAXPROCS(0)*iter) + arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0)) + for i := range arrChan { + arrChan[i] = make(chan struct{}) + } + for i := range arrChan { + go func(i int) { + for range arrChan[i] { + for j := 0; j < iter; j++ { + arr[(i*iter)+j].x++ + } + wg.Done() + } + }(i) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(runtime.GOMAXPROCS(0)) + for j := range arrChan { + arrChan[j] <- struct{}{} + } + wg.Wait() + } + b.StopTimer() + for i := range arrChan { + close(arrChan[i]) + } + }) + b.Run(">128", func(b *testing.B) { + arr := make([]foo128Start, runtime.GOMAXPROCS(0)*iter) + arrChan := make([]chan struct{}, runtime.GOMAXPROCS(0)) + for i := range arrChan { + arrChan[i] = make(chan struct{}) + } + for i := range arrChan { + go func(i int) { + for range arrChan[i] { + for j := 0; j < iter; j++ { + arr[(i*iter)+j].x++ + } + wg.Done() + } + }(i) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(runtime.GOMAXPROCS(0)) + for j := range arrChan { + arrChan[j] <- struct{}{} + } + wg.Wait() + } + b.StopTimer() + for i := range arrChan { + close(arrChan[i]) + } + }) +} diff --git a/common/utils/lockfree-1.1.3/producer.go b/common/utils/lockfree-1.1.3/producer.go new file mode 100644 index 00000000..dc3dae32 --- /dev/null +++ b/common/utils/lockfree-1.1.3/producer.go @@ -0,0 +1,159 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +import ( + "fmt" + "runtime" + "sync/atomic" + "time" +) + +// Producer 生产者 +// 核心方法是Write,通过调用Write方法可以将对象写入到队列中 +type Producer[T any] struct { + seqer *sequencer + rbuf *ringBuffer[T] + blocks blockStrategy + capacity uint64 + status int32 +} + +func newProducer[T any](seqer *sequencer, rbuf *ringBuffer[T], blocks blockStrategy) *Producer[T] { + return &Producer[T]{ + seqer: seqer, + rbuf: rbuf, + blocks: blocks, + capacity: rbuf.cap(), + status: READY, + } +} + +func (q *Producer[T]) start() error { + if atomic.CompareAndSwapInt32(&q.status, READY, RUNNING) { + return nil + } + return fmt.Errorf(StartErrorFormat, "Producer") +} + +// Write 对象写入核心逻辑 +// 首先会从序号产生器中获取一个序号,该序号由atomic自增,不会重复; +// 然后通过&运算获取该序号应该放入的位置pos; +// 通过循环的方式,判断对应pos位置是否可以写入内容,这个判断是通过available数组判断的; +// 如果无法写入则持续循环等待,直到可以写入为止,此处基于一种思想即写入的实时性,写入操作不需要等太久,因此此处是没有阻塞的, +// 仅仅通过调度让出的方式,进行一部分cpu让渡,防止持续占用cpu资源 +// 获取到写入资格后将内容写入到ringbuffer,同时更新available数组,并且调用release,以便于释放消费端的阻塞等待 +func (q *Producer[T]) Write(v T) error { + if q.closed() { + return ClosedError + } + next := q.seqer.wc.increment() + for { + // 判断是否可以写入 + r := atomic.LoadUint64(&q.seqer.rc) - 1 + if next <= r+q.capacity { + // 可以写入数据,将数据写入到指定位置 + q.rbuf.write(next-1, v) + // 释放,防止消费端阻塞 + q.blocks.release() + return nil + } + runtime.Gosched() + // 再次判断是否已关闭 + if q.closed() { + return ClosedError + } + } +} + +// WriteWindow 写入窗口 +// 描述当前可写入的状态,如果不能写入则返回零值,如果可以写入则返回写入窗口大小 +// 由于执行时不加锁,所以该结果是不可靠的,仅用于在并发环境很高的情况下,进行丢弃行为 +func (q *Producer[T]) WriteWindow() int { + next := q.seqer.wc.atomicLoad() + 1 + r := atomic.LoadUint64(&q.seqer.rc) + if next < r+q.capacity { + return int(r + q.capacity - next) + } + return -int(next - (r + q.capacity)) +} + +// WriteTimeout 在写入的基础上设定一个时间,如果时间到了仍然没有写入则会放弃本次写入,返回写入的位置和false +// 使用方需要调用 WriteByCursor 来继续写入该位置,因为位置已经被占用,是必须要写的,不能跳跃性写入 +// 在指定时间内写入成功会返回true +// 三个返回项:写入位置、是否写入成功及是否有error +func (q *Producer[T]) WriteTimeout(v T, timeout time.Duration) (uint64, bool, error) { + if q.closed() { + return 0, false, ClosedError + } + next := q.seqer.wc.increment() + + // 先尝试写数据 (failfast) + ok := q.writeByCursor(v, next) + if ok { + return next, true, nil + } + + // 创建定时器 + waiter := time.NewTimer(timeout) + defer waiter.Stop() + + for { + select { + case <-waiter.C: + // 超时触发,执行到此处表示未写入,返回对应结果即可 + return next, false, nil + default: + ok = q.writeByCursor(v, next) + if ok { + return next, true, nil + } + runtime.Gosched() + } + + if q.closed() { + return 0, false, ClosedError + } + } +} + +// WriteByCursor 根据游标写入内容,wc是调用 WriteTimeout 方法返回false时对应的写入位置, +// 该位置有严格的含义,不要随意填值,否则会造成整个队列异常 +// 函数返回值:是否写入成功和是否存在error,若返回false表示写入失败,可以继续调用重复写入 +func (q *Producer[T]) WriteByCursor(v T, wc uint64) (bool, error) { + if q.closed() { + return false, ClosedError + } + + return q.writeByCursor(v, wc), nil +} + +func (q *Producer[T]) writeByCursor(v T, wc uint64) bool { + // 判断是否可以写入 + r := atomic.LoadUint64(&q.seqer.rc) - 1 + if wc <= r+q.capacity { + // 可以写入数据,将数据写入到指定位置 + q.rbuf.write(wc-1, v) + // 释放,防止消费端阻塞 + q.blocks.release() + // 返回写入成功标识 + return true + } + return false +} + +func (q *Producer[T]) close() error { + if atomic.CompareAndSwapInt32(&q.status, RUNNING, READY) { + return nil + } + return fmt.Errorf(CloseErrorFormat, "Producer") +} + +func (q *Producer[T]) closed() bool { + return atomic.LoadInt32(&q.status) == READY +} diff --git a/common/utils/lockfree-1.1.3/producer_test.go b/common/utils/lockfree-1.1.3/producer_test.go new file mode 100644 index 00000000..de8e2df4 --- /dev/null +++ b/common/utils/lockfree-1.1.3/producer_test.go @@ -0,0 +1,223 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +import ( + "fmt" + "math/rand" + "os" + "reflect" + "sync" + "sync/atomic" + "testing" + "time" + "unsafe" +) + +const ( + GoSize = 5000 + SchPerGo = 10000 +) + +type longEventHandler[T uint64] struct { + count int32 + ts time.Time +} + +func (h *longEventHandler[T]) OnEvent(v uint64) { + atomic.AddInt32(&h.count, 1) + if h.count == 1 { + h.ts = time.Now() + } + fmt.Println("consumer ", v) + //if v%1000000 == 0 { + // fmt.Printf("read %d\n", v) + //} + if h.count == GoSize*SchPerGo { + tl := time.Since(h.ts) + fmt.Printf("read time = %d ms\n", tl.Milliseconds()) + } +} + +func TestAA(t *testing.T) { + var ( + t1_10us = uint64(0) // 1-10微秒 + t10_100us = uint64(0) // 10-100微秒 + t100_1000us = uint64(0) // 100-1000微秒 + t1_10ms = uint64(0) // 1-10毫秒 + t10_100ms = uint64(0) // 10-100毫秒 + t100_ms = uint64(0) // 大于100毫秒 + slower = uint64(0) + counter = uint64(0) + ) + eh := &longEventHandler[uint64]{} + //queue, err := NewProducer[uint64](1024*1024, 1, eh, &SleepWaitStrategy{ + // t: time.Nanosecond * 1, + //}) + disruptor := NewLockfree[uint64](1024*1024*128, eh, + NewSleepBlockStrategy(time.Microsecond)) + disruptor.Start() + producer := disruptor.Producer() + var wg sync.WaitGroup + wg.Add(GoSize) + totalS := time.Now() + for i := 0; i < GoSize; i++ { + go func() { + for j := 0; j < SchPerGo; j++ { + x := atomic.AddUint64(&counter, 1) + ts := time.Now() + err := producer.Write(x) + if err != nil { + panic(err) + } + tl := time.Since(ts) + ms := tl.Microseconds() + if ms > 1 { + atomic.AddUint64(&slower, 1) + if ms < 10 { // t1_10us + atomic.AddUint64(&t1_10us, 1) + } else if ms < 100 { + atomic.AddUint64(&t10_100us, 1) + } else if ms < 1000 { + atomic.AddUint64(&t100_1000us, 1) + } else if ms < 10000 { + atomic.AddUint64(&t1_10ms, 1) + } else if ms < 100000 { + atomic.AddUint64(&t10_100ms, 1) + } else { + atomic.AddUint64(&t100_ms, 1) + } + } + } + wg.Done() + }() + } + wg.Wait() + totalL := time.Since(totalS) + fmt.Printf("write total time = [%d ms]\n", totalL.Milliseconds()) + fmt.Println("----- write complete -----") + time.Sleep(time.Second * 3) + disruptor.Close() + fmt.Printf("slow ratio = %.2f \n", float64(slower)*100.0/float64(counter)) + fmt.Printf("quick ratio = %.2f \n", float64(counter-slower)*100.0/float64(counter)) + fmt.Printf("[<1us][%d] \n", counter-slower) + fmt.Printf("[1-10us][%d] \n", t1_10us) + fmt.Printf("[10-100us][%d] \n", t10_100us) + fmt.Printf("[100-1000us][%d] \n", t100_1000us) + fmt.Printf("[1-10ms][%d] \n", t1_10ms) + fmt.Printf("[10-100ms][%d] \n", t10_100ms) + fmt.Printf("[>100ms][%d] \n", t100_ms) +} + +func TestB(t *testing.T) { + var x = uint64(100) + typeOf := reflect.TypeOf(x) + fmt.Println(typeOf) + fmt.Println(os.Getpagesize()) +} + +func TestC(t *testing.T) { + x := 128 - unsafe.Sizeof(uint64(0))%128 + fmt.Println(x) +} + +func TestProducer_WriteWindow(t *testing.T) { + eh := &sleepEventHandler[uint64]{ + sm: time.Second, + } + disruptor := NewLockfree[uint64](1, eh, + NewSleepBlockStrategy(time.Microsecond)) + disruptor.Start() + producer := disruptor.Producer() + // 写入10个数:0-9 + // 预期结果,0可以写入,写入后在1ms内被取走,此时0会在等待1s后被打印,但由于ringbuffer有空位,所以1可以被写入 + // 1写入后一直无法被取走,因为在等待1s内0的打印,后续其他值均无法被写入,因为1导致ringbuffer满了 + for i := 0; i < 10; i++ { + ww := producer.WriteWindow() + if ww <= 0 { + // 表示不能写入,丢弃 + fmt.Println("discard ", i , " window ", ww) + } else { + // 实际写入 + producer.Write(uint64(i)) + } + // 为了给予consumer时间取走 + time.Sleep(time.Millisecond) + } + // 为了可以看到打印的结果 + time.Sleep(3 * time.Second) + disruptor.Close() +} + +func TestWriteTimeout(t *testing.T) { + var counter = uint64(0) + // 写入超时,如何使用 + eh := &longSleepEventHandler[uint64]{} + disruptor := NewLockfree[uint64](2, eh, NewSleepBlockStrategy(time.Microsecond)) + disruptor.Start() + producer := disruptor.Producer() + // 假设有10个写g + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + for j := 0; j < 100; j++ { + v := atomic.AddUint64(&counter, 1) + wc, exist, err := producer.WriteTimeout(v, time.Millisecond) + if err != nil { + return + } + if !exist { + // 重复写入1次,写入不成功则丢弃重新写其他的 + if ok, _ := producer.WriteByCursor(v, wc); ok { + continue + } + fmt.Println("discard ", v) + // 重新生成值,一直等待写入 + v = atomic.AddUint64(&counter, 1) + for { + if ok, _ := producer.WriteByCursor(v, wc); ok { + fmt.Println("write ", v, " with x times") + break + } + // 写入不成功则休眠,防止CPU暴增 + time.Sleep(100 * time.Microsecond) + } + } else { + fmt.Println("write ", v, " with 1 time") + } + } + wg.Done() + }() + } + wg.Wait() + time.Sleep(3 * time.Second) + disruptor.Close() +} + + +// sleepEventHandler 休眠性质的事件处理器 +type sleepEventHandler[T uint64] struct { + sm time.Duration +} + +func (h *sleepEventHandler[T]) OnEvent(v uint64) { + time.Sleep(h.sm) + fmt.Println("consumer ", v) +} + +type longSleepEventHandler[T uint64] struct { + count int32 +} + +func (h *longSleepEventHandler[T]) OnEvent(v uint64) { + // 每次处理都会进行随机休眠,可以导致消费端变慢 + intn := rand.Intn(1000) + time.Sleep(time.Duration(intn * 1000)) + fmt.Println("consumer count ", atomic.AddInt32(&h.count, 1)) +} \ No newline at end of file diff --git a/common/utils/lockfree-1.1.3/sequencer.go b/common/utils/lockfree-1.1.3/sequencer.go new file mode 100644 index 00000000..d9f18b5b --- /dev/null +++ b/common/utils/lockfree-1.1.3/sequencer.go @@ -0,0 +1,38 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +import ( + "sync/atomic" +) + +// sequencer 序号产生器,维护读和写两个状态,写状态具体由内部游标(cursor)维护。 +// 读取状态由自身维护,变量read即可 +type sequencer struct { + rc uint64 // 读取游标,因为该值仅会被一个g修改,所以不需要使用cursor + capacity uint64 + wc *cursor +} + +func newSequencer(capacity int) *sequencer { + return &sequencer{ + wc: newCursor(), + rc: 1, + capacity: uint64(capacity), + } +} + +// nextRead 获取下个要读取的位置 +// 使用原子操作解决data race问题 +func (s *sequencer) nextRead() uint64 { + return atomic.LoadUint64(&s.rc) +} + +func (s *sequencer) readIncrement() uint64 { + return atomic.AddUint64(&s.rc, 1) +} diff --git a/common/utils/lockfree-1.1.3/util.go b/common/utils/lockfree-1.1.3/util.go new file mode 100644 index 00000000..c9ab309d --- /dev/null +++ b/common/utils/lockfree-1.1.3/util.go @@ -0,0 +1,81 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +import ( + "errors" + "reflect" + "runtime" + "unsafe" +) + +const ( + activeSpin = 4 + passiveSpin = 2 + READY = 0 // 模块的状态之就绪态 + RUNNING = 1 // 模块的状态之运行态 + StartErrorFormat = "start model [%s] error" + CloseErrorFormat = "close model [%s] error" +) + +var ( + ncpu = runtime.NumCPU() + spin = 0 + ClosedError = errors.New("the queue has been closed") +) + +func init() { + if ncpu > 1 { + spin = activeSpin + } +} + +//go:linkname procyield runtime.procyield +func procyield(cycles uint32) + +// //go:linkname osyield runtime.osyield +// func osyield() +func osyield() { + runtime.Gosched() // 主动放弃CPU,让渡给其他goroutine,兼容所有Go版本 +} + +// byteArrayPointerWithUint8 创建uint8切片,返回其对应实际内容(Data)的指针 +func byteArrayPointerWithUint8(capacity int) unsafe.Pointer { + bytes := make([]uint8, capacity) + rs := (*reflect.SliceHeader)(unsafe.Pointer(&bytes)) + return unsafe.Pointer(rs.Data) +} + +// byteArrayPointer 创建uint32切片,返回其对应实际内容(Data)的指针 +func byteArrayPointerWithUint32(capacity int) unsafe.Pointer { + bytes := make([]uint32, capacity) + rs := (*reflect.SliceHeader)(unsafe.Pointer(&bytes)) + return unsafe.Pointer(rs.Data) +} + +// byteArrayPointer 创建int64切片,返回其对应实际内容(Data)的指针 +func byteArrayPointerWithInt64(capacity int) unsafe.Pointer { + bytes := make([]int64, capacity) + rs := (*reflect.SliceHeader)(unsafe.Pointer(&bytes)) + return unsafe.Pointer(rs.Data) +} + +// minSuitableCap 最小的合适的数量 +func minSuitableCap(v int) int { + if v <= 0 { + return 2 + } + v-- + v |= v >> 1 + v |= v >> 2 + v |= v >> 4 + v |= v >> 8 + v |= v >> 16 + v++ + return v +} diff --git a/common/utils/lockfree-1.1.3/util_test.go b/common/utils/lockfree-1.1.3/util_test.go new file mode 100644 index 00000000..72442f1a --- /dev/null +++ b/common/utils/lockfree-1.1.3/util_test.go @@ -0,0 +1,27 @@ +/* + * Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package lockfree + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMinSuitableCap(t *testing.T) { + x := minSuitableCap(-1) + assert.Equal(t, 2, x) + x = minSuitableCap(3) + assert.Equal(t, 4, x) + x = minSuitableCap(10) + assert.Equal(t, 16, x) + x = minSuitableCap(1023) + assert.Equal(t, 1024, x) + x = minSuitableCap(16) + assert.Equal(t, 16, x) +} diff --git a/go.work b/go.work index 51ee8658..ee0ca976 100644 --- a/go.work +++ b/go.work @@ -14,6 +14,7 @@ use ( ./common/utils/go-sensitive-word-1.3.3 ./common/utils/goja ./common/utils/limit + ./common/utils/lockfree-1.1.3 ./common/utils/log ./common/utils/qqwry ./common/utils/sturc