Files
bl/common/utils/lockfree-1.1.3/consumer.go

91 lines
1.9 KiB
Go
Raw Permalink Normal View History

/*
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package lockfree
import (
"fmt"
"runtime"
"sync/atomic"
)
// consumer is the consuming side of the queue. It is driven by a single
// goroutine only, so its internals need no concurrent coordination and use
// no locks. Any real concurrency is dispatched by that goroutine.
type consumer[T any] struct {
// status is the run state; it is set to READY on construction, swapped to
// RUNNING by start and back to READY by close, always via sync/atomic.
status int32 // run state
// rbuf is the ring buffer that handle polls for the next value.
rbuf *ringBuffer[T]
// seqer supplies and advances the read cursor (nextRead / readIncrement).
seqer *sequencer
// blocks is the strategy used to wait once spinning has been exhausted, and
// is released on close.
blocks blockStrategy
// hdl receives every value taken from the ring buffer via OnEvent.
hdl EventHandler[T]
}
// newConsumer builds a consumer wired to the given ring buffer, event
// handler, sequencer and blocking strategy. The returned consumer is in the
// READY state; no goroutine is started until start is called.
func newConsumer[T any](rbuf *ringBuffer[T], hdl EventHandler[T], sequer *sequencer, blocks blockStrategy) *consumer[T] {
	c := new(consumer[T])
	c.status = READY
	c.rbuf = rbuf
	c.seqer = sequer
	c.blocks = blocks
	c.hdl = hdl
	return c
}
// start moves the consumer from READY to RUNNING and launches the handle
// loop in its own goroutine. If the state was not READY, nothing is launched
// and an error built from StartErrorFormat is returned.
func (c *consumer[T]) start() error {
	if !atomic.CompareAndSwapInt32(&c.status, READY, RUNNING) {
		return fmt.Errorf(StartErrorFormat, "Consumer")
	}
	go c.handle()
	return nil
}
// handle is the consumer loop. It repeatedly waits for the value at the
// current read position to appear in the ring buffer, advances the read
// cursor, and passes the value to the event handler. It returns as soon as
// the consumer is observed to be closed.
//
// While the value is not yet available, waiting escalates in three stages:
// the first `spin` attempts call procyield(30), the next `passiveSpin`
// attempts yield the processor with runtime.Gosched, and after that the
// configured blocking strategy is invoked and the attempt counter restarts.
func (c *consumer[T]) handle() {
	// Read cursor; the slot examined on each attempt is next-1.
	next := c.seqer.nextRead()
	for !c.closed() {
		// attempts counts consecutive misses for the current slot and selects
		// the waiting stage below.
		for attempts := 0; ; attempts++ {
			if c.closed() {
				return
			}
			val, pos, ok := c.rbuf.contains(next - 1)
			if ok {
				// Advance the cursor before invoking the handler, then move on
				// to the next slot with a fresh attempt counter.
				next = c.seqer.readIncrement()
				c.hdl.OnEvent(val)
				break
			}
			switch {
			case attempts < spin:
				procyield(30)
			case attempts < spin+passiveSpin:
				runtime.Gosched()
			default:
				c.blocks.block(pos, next)
				// Restart the escalation; the loop's post statement then
				// brings the counter to 1.
				attempts = 0
			}
		}
	}
}
// close moves the consumer from RUNNING back to READY and releases the
// blocking strategy. If the state was not RUNNING, nothing is released and
// an error built from CloseErrorFormat is returned.
func (c *consumer[T]) close() error {
	if !atomic.CompareAndSwapInt32(&c.status, RUNNING, READY) {
		return fmt.Errorf(CloseErrorFormat, "Consumer")
	}
	// Release the blocking strategy so that a blocked wait cannot remain
	// stuck after shutdown.
	c.blocks.release()
	return nil
}
// closed reports whether the consumer is closed, i.e. its status is READY.
// The status is read with an atomic load rather than a plain comparison to
// avoid a data race with start and close.
func (c *consumer[T]) closed() bool {
	state := atomic.LoadInt32(&c.status)
	return state == READY
}