重构
This commit is contained in:
@@ -124,6 +124,36 @@ func (b *Topic[T]) PubAsync(event T) {
|
||||
b.rwMu.RUnlock()
|
||||
}
|
||||
|
||||
func (b *Topic[T]) PubAsyncCallBack(event T, fn func()) {
|
||||
// 1. 先获取当前所有订阅者的回调(缩短读锁持有时间)
|
||||
b.rwMu.RLock()
|
||||
// 复制回调到临时切片,避免持有锁期间阻塞写操作
|
||||
callbacks := make([]func(T), 0, len(b.subs))
|
||||
for _, sub := range b.subs {
|
||||
callbacks = append(callbacks, sub.callback)
|
||||
}
|
||||
b.rwMu.RUnlock() // 立即释放读锁
|
||||
|
||||
// 2. 等待所有回调执行完成
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(callbacks)) // 声明需要等待的任务数量
|
||||
|
||||
for _, cb := range callbacks {
|
||||
// 显式传递回调和事件,避免循环变量捕获问题
|
||||
go func(callback func(T), e T) {
|
||||
defer wg.Done() // 任务完成后通知WaitGroup
|
||||
callback(e) // 执行订阅者回调
|
||||
}(cb, event)
|
||||
}
|
||||
|
||||
wg.Wait() // 阻塞等待所有回调执行完毕
|
||||
|
||||
// 3. 所有回调完成后,执行传入的fn(如果非nil)
|
||||
if fn != nil {
|
||||
fn()
|
||||
}
|
||||
}
|
||||
|
||||
// Bus is being returned when you subscribe, so you can manually Cancel
|
||||
type Bus[T any] struct {
|
||||
listener *Listener[T]
|
||||
|
||||
Reference in New Issue
Block a user