Files
bl/common/utils/lockfree-1.1.3/consumer.go
昔念 269256a861 feat(common): 添加无锁并发工具包依赖
新增 lockfree-1.1.3 工具包到 go.work 文件中,为项目提供无锁并发数据结构支持,
提升高并发场景下的性能表现。
2025-12-05 00:36:28 +08:00

91 lines
1.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import (
"fmt"
"runtime"
"sync/atomic"
)
// consumer 消费者这个消费者只会有一个g操作这样处理的好处是可以不涉及并发操作其内部不会涉及到任何锁
// 对于实际的并发操作由该g进行分配
type consumer[T any] struct {
status int32 // 运行状态
rbuf *ringBuffer[T]
seqer *sequencer
blocks blockStrategy
hdl EventHandler[T]
}
func newConsumer[T any](rbuf *ringBuffer[T], hdl EventHandler[T], sequer *sequencer, blocks blockStrategy) *consumer[T] {
return &consumer[T]{
rbuf: rbuf,
seqer: sequer,
hdl: hdl,
blocks: blocks,
status: READY,
}
}
func (c *consumer[T]) start() error {
if atomic.CompareAndSwapInt32(&c.status, READY, RUNNING) {
go c.handle()
return nil
}
return fmt.Errorf(StartErrorFormat, "Consumer")
}
func (c *consumer[T]) handle() {
// 判断是否可以获取到
rc := c.seqer.nextRead()
for {
if c.closed() {
return
}
var i = 0
for {
if c.closed() {
return
}
// 看下读取位置的seq是否OK
if v, p, exist := c.rbuf.contains(rc - 1); exist {
rc = c.seqer.readIncrement()
c.hdl.OnEvent(v)
i = 0
break
} else {
if i < spin {
procyield(30)
} else if i < spin+passiveSpin {
runtime.Gosched()
} else {
c.blocks.block(p, rc)
i = 0
}
i++
}
}
}
}
func (c *consumer[T]) close() error {
if atomic.CompareAndSwapInt32(&c.status, RUNNING, READY) {
// 防止阻塞无法释放
c.blocks.release()
return nil
}
return fmt.Errorf(CloseErrorFormat, "Consumer")
}
// closed 判断是否已关闭
// 将直接判断调整为原子操作解决data race问题
func (c *consumer[T]) closed() bool {
return atomic.LoadInt32(&c.status) == READY
}