91 lines
1.9 KiB
Go
91 lines
1.9 KiB
Go
/*
|
||
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
|
||
*
|
||
* SPDX-License-Identifier: Apache-2.0
|
||
*
|
||
*/
|
||
|
||
package lockfree
|
||
|
||
import (
|
||
"fmt"
|
||
"runtime"
|
||
"sync/atomic"
|
||
)
|
||
|
||
// consumer 消费者,这个消费者只会有一个g操作,这样处理的好处是可以不涉及并发操作,其内部不会涉及到任何锁
|
||
// 对于实际的并发操作由该g进行分配
|
||
type consumer[T any] struct {
|
||
status int32 // 运行状态
|
||
rbuf *ringBuffer[T]
|
||
seqer *sequencer
|
||
blocks blockStrategy
|
||
hdl EventHandler[T]
|
||
}
|
||
|
||
func newConsumer[T any](rbuf *ringBuffer[T], hdl EventHandler[T], sequer *sequencer, blocks blockStrategy) *consumer[T] {
|
||
return &consumer[T]{
|
||
rbuf: rbuf,
|
||
seqer: sequer,
|
||
hdl: hdl,
|
||
blocks: blocks,
|
||
status: READY,
|
||
}
|
||
}
|
||
|
||
func (c *consumer[T]) start() error {
|
||
if atomic.CompareAndSwapInt32(&c.status, READY, RUNNING) {
|
||
go c.handle()
|
||
return nil
|
||
}
|
||
return fmt.Errorf(StartErrorFormat, "Consumer")
|
||
}
|
||
|
||
func (c *consumer[T]) handle() {
|
||
// 判断是否可以获取到
|
||
rc := c.seqer.nextRead()
|
||
for {
|
||
if c.closed() {
|
||
return
|
||
}
|
||
var i = 0
|
||
for {
|
||
if c.closed() {
|
||
return
|
||
}
|
||
// 看下读取位置的seq是否OK
|
||
if v, p, exist := c.rbuf.contains(rc - 1); exist {
|
||
rc = c.seqer.readIncrement()
|
||
c.hdl.OnEvent(v)
|
||
i = 0
|
||
break
|
||
} else {
|
||
if i < spin {
|
||
procyield(30)
|
||
} else if i < spin+passiveSpin {
|
||
runtime.Gosched()
|
||
} else {
|
||
c.blocks.block(p, rc)
|
||
i = 0
|
||
}
|
||
i++
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
func (c *consumer[T]) close() error {
|
||
if atomic.CompareAndSwapInt32(&c.status, RUNNING, READY) {
|
||
// 防止阻塞无法释放
|
||
c.blocks.release()
|
||
return nil
|
||
}
|
||
return fmt.Errorf(CloseErrorFormat, "Consumer")
|
||
}
|
||
|
||
// closed 判断是否已关闭
|
||
// 将直接判断调整为原子操作,解决data race问题
|
||
func (c *consumer[T]) closed() bool {
|
||
return atomic.LoadInt32(&c.status) == READY
|
||
}
|