1
All checks were successful
ci/woodpecker/push/my-first-workflow Pipeline was successful

This commit is contained in:
昔念
2026-02-20 13:29:43 +08:00
parent 53b18cfd0c
commit aeeac8b2ed

View File

@@ -1,6 +1,7 @@
package cool
import (
"context"
"time"
"github.com/gogf/gf/v2/errors/gerror"
@@ -65,39 +66,115 @@ func ClusterRunFunc(ctx g.Ctx, funcstring string) (err error) {
}
// ListenFunc 监听函数
// ListenFunc 改造后的 Redis PubSub 监听函数,支持自动重连和心跳保活
func ListenFunc(ctx g.Ctx) {
if IsRedisMode {
if !IsRedisMode {
panic(gerror.New("集群模式下, 请使用Redis作为缓存"))
}
// 定义常量配置
const (
subscribeTopic = "cool:func" // 订阅的主题
retryDelay = 10 * time.Second // 连接失败重试间隔
heartbeatInterval = 30 * time.Second // 心跳保活间隔
)
// 外层循环:负责连接断开后的整体重连
for {
// 检查上下文是否已取消,优雅退出
select {
case <-ctx.Done():
Logger.Info(ctx, "ListenFunc 上下文已取消,退出监听")
return
default:
}
// 1. 建立 Redis 连接
conn, err := g.Redis("cool").Conn(ctx)
if err != nil {
panic(err)
Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay)
time.Sleep(retryDelay)
continue
}
defer conn.Close(ctx)
_, err = conn.Do(ctx, "subscribe", "cool:func")
Logger.Info(ctx, "成功获取 Redis 连接")
// 2. 启动心跳保活协程,防止连接因空闲被断开
heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background())
go func() {
ticker := time.NewTicker(heartbeatInterval)
defer func() {
ticker.Stop()
heartbeatCancel()
}()
for {
select {
case <-heartbeatCtx.Done():
Logger.Info(ctx, "心跳协程退出")
return
case <-ticker.C:
// 发送 PING 心跳,保持连接活跃
_, pingErr := conn.Do(ctx, "PING")
if pingErr != nil {
Logger.Error(ctx, "Redis 心跳失败,触发重连", "error", pingErr)
// 心跳失败时主动关闭连接,触发外层重连
_ = conn.Close(ctx)
return
}
Logger.Debug(ctx, "Redis 心跳发送成功,连接正常")
}
}
}()
// 3. 订阅主题
_, err = conn.Do(ctx, "subscribe", subscribeTopic)
if err != nil {
panic(err)
Logger.Error(ctx, "订阅 Redis 主题失败", "topic", subscribeTopic, "error", err)
heartbeatCancel() // 关闭心跳协程
_ = conn.Close(ctx)
time.Sleep(retryDelay)
continue
}
for {
Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic)
// 4. 循环接收消息
connError := false
for !connError {
select {
case <-ctx.Done():
connError = true
continue
default:
}
// 接收消息
data, err := conn.Receive(ctx)
if err != nil {
Logger.Error(ctx, err)
time.Sleep(10 * time.Second)
Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err)
connError = true // 标记连接错误,触发重连
continue
}
// 处理消息(保留原有业务逻辑)
if data != nil {
dataMap := data.MapStrStr()
if dataMap["Kind"] == "subscribe" {
continue
}
if dataMap["Channel"] == "cool:func" {
Logger.Debug(ctx, "执行函数", dataMap["Payload"])
if dataMap["Channel"] == subscribeTopic {
Logger.Debug(ctx, "执行函数", "payload", dataMap["Payload"])
err := RunFunc(ctx, dataMap["Payload"])
if err != nil {
Logger.Error(ctx, "执行函数失败", err)
Logger.Error(ctx, "执行函数失败", "payload", dataMap["Payload"], "error", err)
}
}
}
}
} else {
panic(gerror.New("集群模式下, 请使用Redis作为缓存"))
// 5. 清理资源,准备重连
heartbeatCancel() // 关闭心跳协程
_ = conn.Close(ctx) // 关闭当前连接
// Logger.Warn(ctx, "Redis 连接异常,准备重连", "retry_after", retryDelay)
time.Sleep(retryDelay)
}
}