160 lines
4.6 KiB
Go
160 lines
4.6 KiB
Go
/*
|
||
* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved.
|
||
*
|
||
* SPDX-License-Identifier: Apache-2.0
|
||
*
|
||
*/
|
||
|
||
package lockfree
|
||
|
||
import (
|
||
"fmt"
|
||
"runtime"
|
||
"sync/atomic"
|
||
"time"
|
||
)
|
||
|
||
// Producer 生产者
|
||
// 核心方法是Write,通过调用Write方法可以将对象写入到队列中
|
||
type Producer[T any] struct {
|
||
seqer *sequencer
|
||
rbuf *ringBuffer[T]
|
||
blocks blockStrategy
|
||
capacity uint64
|
||
status int32
|
||
}
|
||
|
||
func newProducer[T any](seqer *sequencer, rbuf *ringBuffer[T], blocks blockStrategy) *Producer[T] {
|
||
return &Producer[T]{
|
||
seqer: seqer,
|
||
rbuf: rbuf,
|
||
blocks: blocks,
|
||
capacity: rbuf.cap(),
|
||
status: READY,
|
||
}
|
||
}
|
||
|
||
func (q *Producer[T]) start() error {
|
||
if atomic.CompareAndSwapInt32(&q.status, READY, RUNNING) {
|
||
return nil
|
||
}
|
||
return fmt.Errorf(StartErrorFormat, "Producer")
|
||
}
|
||
|
||
// Write 对象写入核心逻辑
|
||
// 首先会从序号产生器中获取一个序号,该序号由atomic自增,不会重复;
|
||
// 然后通过&运算获取该序号应该放入的位置pos;
|
||
// 通过循环的方式,判断对应pos位置是否可以写入内容,这个判断是通过available数组判断的;
|
||
// 如果无法写入则持续循环等待,直到可以写入为止,此处基于一种思想即写入的实时性,写入操作不需要等太久,因此此处是没有阻塞的,
|
||
// 仅仅通过调度让出的方式,进行一部分cpu让渡,防止持续占用cpu资源
|
||
// 获取到写入资格后将内容写入到ringbuffer,同时更新available数组,并且调用release,以便于释放消费端的阻塞等待
|
||
func (q *Producer[T]) Write(v T) error {
|
||
if q.closed() {
|
||
return ClosedError
|
||
}
|
||
next := q.seqer.wc.increment()
|
||
for {
|
||
// 判断是否可以写入
|
||
r := atomic.LoadUint64(&q.seqer.rc) - 1
|
||
if next <= r+q.capacity {
|
||
// 可以写入数据,将数据写入到指定位置
|
||
q.rbuf.write(next-1, v)
|
||
// 释放,防止消费端阻塞
|
||
q.blocks.release()
|
||
return nil
|
||
}
|
||
runtime.Gosched()
|
||
// 再次判断是否已关闭
|
||
if q.closed() {
|
||
return ClosedError
|
||
}
|
||
}
|
||
}
|
||
|
||
// WriteWindow 写入窗口
|
||
// 描述当前可写入的状态,如果不能写入则返回零值,如果可以写入则返回写入窗口大小
|
||
// 由于执行时不加锁,所以该结果是不可靠的,仅用于在并发环境很高的情况下,进行丢弃行为
|
||
func (q *Producer[T]) WriteWindow() int {
|
||
next := q.seqer.wc.atomicLoad() + 1
|
||
r := atomic.LoadUint64(&q.seqer.rc)
|
||
if next < r+q.capacity {
|
||
return int(r + q.capacity - next)
|
||
}
|
||
return -int(next - (r + q.capacity))
|
||
}
|
||
|
||
// WriteTimeout 在写入的基础上设定一个时间,如果时间到了仍然没有写入则会放弃本次写入,返回写入的位置和false
|
||
// 使用方需要调用 WriteByCursor 来继续写入该位置,因为位置已经被占用,是必须要写的,不能跳跃性写入
|
||
// 在指定时间内写入成功会返回true
|
||
// 三个返回项:写入位置、是否写入成功及是否有error
|
||
func (q *Producer[T]) WriteTimeout(v T, timeout time.Duration) (uint64, bool, error) {
|
||
if q.closed() {
|
||
return 0, false, ClosedError
|
||
}
|
||
next := q.seqer.wc.increment()
|
||
|
||
// 先尝试写数据 (failfast)
|
||
ok := q.writeByCursor(v, next)
|
||
if ok {
|
||
return next, true, nil
|
||
}
|
||
|
||
// 创建定时器
|
||
waiter := time.NewTimer(timeout)
|
||
defer waiter.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-waiter.C:
|
||
// 超时触发,执行到此处表示未写入,返回对应结果即可
|
||
return next, false, nil
|
||
default:
|
||
ok = q.writeByCursor(v, next)
|
||
if ok {
|
||
return next, true, nil
|
||
}
|
||
runtime.Gosched()
|
||
}
|
||
|
||
if q.closed() {
|
||
return 0, false, ClosedError
|
||
}
|
||
}
|
||
}
|
||
|
||
// WriteByCursor 根据游标写入内容,wc是调用 WriteTimeout 方法返回false时对应的写入位置,
|
||
// 该位置有严格的含义,不要随意填值,否则会造成整个队列异常
|
||
// 函数返回值:是否写入成功和是否存在error,若返回false表示写入失败,可以继续调用重复写入
|
||
func (q *Producer[T]) WriteByCursor(v T, wc uint64) (bool, error) {
|
||
if q.closed() {
|
||
return false, ClosedError
|
||
}
|
||
|
||
return q.writeByCursor(v, wc), nil
|
||
}
|
||
|
||
func (q *Producer[T]) writeByCursor(v T, wc uint64) bool {
|
||
// 判断是否可以写入
|
||
r := atomic.LoadUint64(&q.seqer.rc) - 1
|
||
if wc <= r+q.capacity {
|
||
// 可以写入数据,将数据写入到指定位置
|
||
q.rbuf.write(wc-1, v)
|
||
// 释放,防止消费端阻塞
|
||
q.blocks.release()
|
||
// 返回写入成功标识
|
||
return true
|
||
}
|
||
return false
|
||
}
|
||
|
||
func (q *Producer[T]) close() error {
|
||
if atomic.CompareAndSwapInt32(&q.status, RUNNING, READY) {
|
||
return nil
|
||
}
|
||
return fmt.Errorf(CloseErrorFormat, "Producer")
|
||
}
|
||
|
||
func (q *Producer[T]) closed() bool {
|
||
return atomic.LoadInt32(&q.status) == READY
|
||
}
|