package rpc import ( "blazing/cool" "context" "time" "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/gconv" ) // ListenFunc 监听函数 // ListenFunc 改造后的 Redis PubSub 监听函数,支持自动重连和心跳保活 func ListenFunc(ctx g.Ctx) { if !cool.IsRedisMode { panic(gerror.New("集群模式下, 请使用Redis作为缓存")) } // 定义常量配置 const ( subscribeTopic = "cool:func" // 订阅的主题 retryDelay = 10 * time.Second // 连接失败重试间隔 heartbeatInterval = 30 * time.Second // 心跳保活间隔 ) // 外层循环:负责连接断开后的整体重连 for { // 检查上下文是否已取消,优雅退出 select { case <-ctx.Done(): cool.Logger.Info(ctx, "ListenFunc 上下文已取消,退出监听") return default: } // 1. 建立 Redis 连接 conn, err := g.Redis("cool").Conn(ctx) if err != nil { cool.Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay) time.Sleep(retryDelay) continue } cool.Logger.Info(ctx, "成功获取 Redis 连接") // 2. 启动心跳保活协程,防止连接因空闲被断开 heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background()) go func() { ticker := time.NewTicker(heartbeatInterval) defer func() { ticker.Stop() heartbeatCancel() }() for { select { case <-heartbeatCtx.Done(): cool.Logger.Info(ctx, "心跳协程退出") return case <-ticker.C: // 发送 PING 心跳,保持连接活跃 _, pingErr := conn.Do(ctx, "PING") if pingErr != nil { cool.Logger.Error(ctx, "Redis 心跳失败,触发重连", "error", pingErr) // 心跳失败时主动关闭连接,触发外层重连 _ = conn.Close(ctx) return } cool.Logger.Debug(ctx, "Redis 心跳发送成功,连接正常") } } }() // 3. 订阅主题 _, err = conn.Do(ctx, "subscribe", subscribeTopic) if err != nil { cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", subscribeTopic, "error", err) heartbeatCancel() // 关闭心跳协程 _ = conn.Close(ctx) time.Sleep(retryDelay) continue } cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic) _, err = conn.Do(ctx, "subscribe", "sun:join") //加入队列 // 4. 循环接收消息 connError := false for !connError { select { case <-ctx.Done(): connError = true continue default: } // 接收消息 data, err := conn.Receive(ctx) if err != nil { cool.Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err) connError = true // 标记连接错误,触发重连 continue } // 处理消息(保留原有业务逻辑) if data != nil { dataMap, ok := data.Interface().(*gredis.Message) if !ok { continue } // if dataMap. == "subscribe" { // continue // } if dataMap.Channel == subscribeTopic { cool.Logger.Debug(ctx, "执行函数", "payload", dataMap.Payload) err := cool.RunFunc(ctx, dataMap.Payload) if err != nil { cool.Logger.Error(ctx, "执行函数失败", "payload", dataMap.Payload, "error", err) } } if dataMap.Channel == "sun:join" { fightmap.ADD(dataMap.Payload) //universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient) } } } // 5. 清理资源,准备重连 heartbeatCancel() // 关闭心跳协程 _ = conn.Close(ctx) // 关闭当前连接 // Logger.Warn(ctx, "Redis 连接异常,准备重连", "retry_after", retryDelay) time.Sleep(retryDelay) } } func ListenFight(ctx g.Ctx) { // 定义常量配置 const ( retryDelay = 10 * time.Second // 连接失败重试间隔 heartbeatInterval = 30 * time.Second // 心跳保活间隔 ) // 外层循环:负责连接断开后的整体重连 for { // 检查上下文是否已取消,优雅退出 select { case <-ctx.Done(): cool.Logger.Info(ctx, "ListenFunc 上下文已取消,退出监听") return default: } // 1. 建立 Redis 连接 conn, err := g.Redis("cool").Conn(ctx) if err != nil { cool.Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay) time.Sleep(retryDelay) continue } cool.Logger.Info(ctx, "成功获取 Redis 连接") // 2. 启动心跳保活协程,防止连接因空闲被断开 heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background()) go func() { ticker := time.NewTicker(heartbeatInterval) defer func() { ticker.Stop() heartbeatCancel() }() for { select { case <-heartbeatCtx.Done(): cool.Logger.Info(ctx, "心跳协程退出") return case <-ticker.C: // 发送 PING 心跳,保持连接活跃 _, pingErr := conn.Do(ctx, "PING") if pingErr != nil { cool.Logger.Error(ctx, "Redis 心跳失败,触发重连", "error", pingErr) // 心跳失败时主动关闭连接,触发外层重连 _ = conn.Close(ctx) return } cool.Logger.Debug(ctx, "Redis 心跳发送成功,连接正常") } } }() // cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic) //房主服务器拉取之后,所有操作通过redie转发到房主服务器去执行,非房主方只进行收包操作 conn.Do(ctx, "subscribe", "sun:start:"+gconv.String(100000*cool.Config.ServerInfo.OnlineID+cool.Config.ServerInfo.Port)) //发起战斗 ,由房主服务器主动拉取战斗实现 conn.Do(ctx, "subscribe", "sun:sendpack:"+gconv.String(100000*cool.Config.ServerInfo.OnlineID+cool.Config.ServerInfo.Port)) //接受战斗的信息,直接接受包转发就行 // 4. 循环接收消息 connError := false for !connError { select { case <-ctx.Done(): connError = true continue default: } // 接收消息 data, err := conn.Receive(ctx) if err != nil { cool.Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err) connError = true // 标记连接错误,触发重连 continue } // 处理消息(保留原有业务逻辑) if data != nil { dataMap, ok := data.Interface().(*gredis.Message) if !ok { continue } // if dataMap. == "subscribe" { // continue // } if dataMap.Channel == "sun:join:2458" { fightmap.ADD(dataMap.Payload) //universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient) } } } // 5. 清理资源,准备重连 heartbeatCancel() // 关闭心跳协程 _ = conn.Close(ctx) // 关闭当前连接 // Logger.Warn(ctx, "Redis 连接异常,准备重连", "retry_after", retryDelay) time.Sleep(retryDelay) } }