Files
bl/common/utils/lockfree-1.1.3/example/complex/main.go
昔念 269256a861 feat(common): 添加无锁并发工具包依赖
新增 lockfree-1.1.3 工具包到 go.work 文件中,为项目提供无锁并发数据结构支持,
提升高并发场景下的性能表现。
2025-12-05 00:36:28 +08:00

130 lines
3.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package main
import (
"fmt"
"github.com/bruceshao/lockfree"
"math/rand"
"sync"
"sync/atomic"
"time"
)
func main() {
fmt.Println("========== start write by discard ==========")
writeByDiscard()
fmt.Println("========== complete write by discard ==========")
fmt.Println("========== start write by cursor ==========")
writeByCursor()
fmt.Println("========== complete write by cursor ==========")
}
func writeByDiscard() {
var counter = uint64(0)
// 写入超时,如何使用
eh := &fixedSleepEventHandler[uint64]{
sm: time.Millisecond * 10,
}
disruptor := lockfree.NewLockfree[uint64](2, eh, lockfree.NewSleepBlockStrategy(time.Microsecond))
disruptor.Start()
producer := disruptor.Producer()
// 假设有100个写g
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for j := 0; j < 10; j++ {
v := atomic.AddUint64(&counter, 1)
ww := producer.WriteWindow()
if ww <= 0 {
// 表示无法写入,丢弃
fmt.Println("discard ", v)
continue
}
// 表示可以写入,写入即可
err := producer.Write(v)
if err != nil {
panic(err)
}
fmt.Println("write ", v)
}
wg.Done()
}()
}
wg.Wait()
time.Sleep(3 * time.Second)
disruptor.Close()
}
func writeByCursor() {
var counter = uint64(0)
// 写入超时,如何使用
eh := &randomSleepEventHandler[uint64]{}
disruptor := lockfree.NewLockfree[uint64](2, eh, lockfree.NewSleepBlockStrategy(time.Microsecond))
disruptor.Start()
producer := disruptor.Producer()
// 假设有100个写g
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for j := 0; j < 10; j++ {
v := atomic.AddUint64(&counter, 1)
wc, exist, err := producer.WriteTimeout(v, time.Millisecond)
if err != nil {
return
}
if !exist {
// 重复写入1次写入不成功则丢弃重新写其他的
if ok, _ := producer.WriteByCursor(v, wc); ok {
continue
}
fmt.Println("discard ", v)
// 重新生成值,一直等待写入
v = atomic.AddUint64(&counter, 1)
for {
if ok, _ := producer.WriteByCursor(v, wc); ok {
fmt.Println("write ", v, " with x times")
break
}
// 写入不成功则休眠防止CPU暴增
time.Sleep(100 * time.Microsecond)
}
} else {
fmt.Println("write ", v, " with 1 time")
}
}
wg.Done()
}()
}
wg.Wait()
time.Sleep(3 * time.Second)
disruptor.Close()
}
type fixedSleepEventHandler[T uint64] struct {
sm time.Duration
}
func (h *fixedSleepEventHandler[T]) OnEvent(v uint64) {
time.Sleep(h.sm)
fmt.Println("consumer ", v)
}
type randomSleepEventHandler[T uint64] struct {
count int32
}
func (h *randomSleepEventHandler[T]) OnEvent(v uint64) {
// 每次处理都会进行随机休眠,可以导致消费端变慢
intn := rand.Intn(1000)
time.Sleep(time.Duration(intn * 1000))
fmt.Println("consumer count ", atomic.AddInt32(&h.count, 1))
}