diff --git a/common/cool/func.go b/common/cool/func.go index d93768151..681f7b629 100644 --- a/common/cool/func.go +++ b/common/cool/func.go @@ -1,6 +1,7 @@ package cool import ( + "context" "time" "github.com/gogf/gf/v2/errors/gerror" @@ -65,39 +66,115 @@ func ClusterRunFunc(ctx g.Ctx, funcstring string) (err error) { } // ListenFunc 监听函数 +// ListenFunc 改造后的 Redis PubSub 监听函数,支持自动重连和心跳保活 func ListenFunc(ctx g.Ctx) { - if IsRedisMode { + if !IsRedisMode { + panic(gerror.New("集群模式下, 请使用Redis作为缓存")) + } + + // 定义常量配置 + const ( + subscribeTopic = "cool:func" // 订阅的主题 + retryDelay = 10 * time.Second // 连接失败重试间隔 + heartbeatInterval = 30 * time.Second // 心跳保活间隔 + ) + + // 外层循环:负责连接断开后的整体重连 + for { + // 检查上下文是否已取消,优雅退出 + select { + case <-ctx.Done(): + Logger.Info(ctx, "ListenFunc 上下文已取消,退出监听") + return + default: + } + + // 1. 建立 Redis 连接 conn, err := g.Redis("cool").Conn(ctx) if err != nil { - panic(err) + Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay) + time.Sleep(retryDelay) + continue } - defer conn.Close(ctx) - _, err = conn.Do(ctx, "subscribe", "cool:func") + Logger.Info(ctx, "成功获取 Redis 连接") + + // 2. 启动心跳保活协程,防止连接因空闲被断开 + heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background()) + go func() { + ticker := time.NewTicker(heartbeatInterval) + defer func() { + ticker.Stop() + heartbeatCancel() + }() + + for { + select { + case <-heartbeatCtx.Done(): + Logger.Info(ctx, "心跳协程退出") + return + case <-ticker.C: + // 发送 PING 心跳,保持连接活跃 + _, pingErr := conn.Do(ctx, "PING") + if pingErr != nil { + Logger.Error(ctx, "Redis 心跳失败,触发重连", "error", pingErr) + // 心跳失败时主动关闭连接,触发外层重连 + _ = conn.Close(ctx) + return + } + Logger.Debug(ctx, "Redis 心跳发送成功,连接正常") + } + } + }() + + // 3. 订阅主题 + _, err = conn.Do(ctx, "subscribe", subscribeTopic) if err != nil { - panic(err) + Logger.Error(ctx, "订阅 Redis 主题失败", "topic", subscribeTopic, "error", err) + heartbeatCancel() // 关闭心跳协程 + _ = conn.Close(ctx) + time.Sleep(retryDelay) + continue } - for { + Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic) + + // 4. 循环接收消息 + connError := false + for !connError { + select { + case <-ctx.Done(): + connError = true + continue + default: + } + + // 接收消息 data, err := conn.Receive(ctx) if err != nil { - Logger.Error(ctx, err) - time.Sleep(10 * time.Second) + Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err) + connError = true // 标记连接错误,触发重连 continue } + + // 处理消息(保留原有业务逻辑) if data != nil { dataMap := data.MapStrStr() if dataMap["Kind"] == "subscribe" { continue } - if dataMap["Channel"] == "cool:func" { - Logger.Debug(ctx, "执行函数", dataMap["Payload"]) + if dataMap["Channel"] == subscribeTopic { + Logger.Debug(ctx, "执行函数", "payload", dataMap["Payload"]) err := RunFunc(ctx, dataMap["Payload"]) if err != nil { - Logger.Error(ctx, "执行函数失败", err) + Logger.Error(ctx, "执行函数失败", "payload", dataMap["Payload"], "error", err) } } } } - } else { - panic(gerror.New("集群模式下, 请使用Redis作为缓存")) + + // 5. 清理资源,准备重连 + heartbeatCancel() // 关闭心跳协程 + _ = conn.Close(ctx) // 关闭当前连接 + // Logger.Warn(ctx, "Redis 连接异常,准备重连", "retry_after", retryDelay) + time.Sleep(retryDelay) } }