package rpc import ( "blazing/cool" "blazing/logic/service/fight/pvp" "blazing/logic/service/fight/pvpwire" "fmt" "time" "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/gconv" ) // ListenFunc 监听函数 // ListenFunc 改造后的 Redis PubSub 监听函数,支持自动重连。 // 注意:PubSub 连接只负责订阅和接收,避免在同一连接上并发 PING。 func ListenFunc(ctx g.Ctx) { if !cool.IsRedisMode { panic(gerror.New("集群模式下, 请使用Redis作为缓存")) } // 定义常量配置 const ( subscribeTopic = "cool:func" // 订阅的主题 retryDelay = 10 * time.Second // 连接失败重试间隔 ) // 外层循环:负责连接断开后的整体重连 for { // 检查上下文是否已取消,优雅退出 select { case <-ctx.Done(): cool.Logger.Info(ctx, "ListenFunc 上下文已取消,退出监听") return default: } // 1. 建立 Redis 连接 conn, err := cool.Redis.Conn(ctx) if err != nil { cool.Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay) time.Sleep(retryDelay) continue } // 2. 订阅主题 _, err = conn.Do(ctx, "subscribe", subscribeTopic) if err != nil { cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", subscribeTopic, "error", err) _ = conn.Close(ctx) time.Sleep(retryDelay) continue } cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic) _, err = conn.Do(ctx, "subscribe", "sun:join") //加入队列 if err != nil { cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", "sun:join", "error", err) _ = conn.Close(ctx) time.Sleep(retryDelay) continue } cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", "sun:join") // 3. 循环接收消息 connError := false for !connError { select { case <-ctx.Done(): connError = true continue default: } // 接收消息 data, err := conn.Receive(ctx) if err != nil { cool.Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err) connError = true // 标记连接错误,触发重连 continue } // 处理消息(保留原有业务逻辑) if data != nil { dataMap, ok := data.Interface().(*gredis.Message) if !ok { continue } // if dataMap. == "subscribe" { // continue // } if dataMap.Channel == subscribeTopic { cool.Logger.Debug(ctx, "执行函数", "payload", dataMap.Payload) err := cool.RunFunc(ctx, dataMap.Payload) if err != nil { cool.Logger.Error(ctx, "执行函数失败", "payload", dataMap.Payload, "error", err) } } if dataMap.Channel == "sun:join" { fightmap.ADD(dataMap.Payload) //universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient) } } } // 4. 清理资源,准备重连 _ = conn.Close(ctx) // 关闭当前连接 cool.Logger.Info(ctx, "Redis 订阅连接异常,准备重连", "retry_after", retryDelay) time.Sleep(retryDelay) } } // ListenFight 完全对齐 ListenFunc 写法,修复收不到消息问题。 // 注意:PubSub 连接只负责订阅和接收,避免在同一连接上并发 PING。 func ListenFight(ctx g.Ctx) { if !cool.IsRedisMode { panic(gerror.New("集群模式下, 请使用Redis作为缓存")) } // 定义常量配置(对齐 ListenFunc 风格) const ( retryDelay = 10 * time.Second // 连接失败重试间隔 ) // 提前拼接订阅主题(避免重复拼接,便于日志打印) serverID := cool.Config.ServerInfo.GetID() startTopic := "sun:start:" + serverID sendPackTopic := "sendpack:" + serverID pvpServerTopic := pvpwire.ServerTopic(gconv.Uint32(serverID)) pvpCoordinatorTopic := pvpwire.CoordinatorTopicPrefix // 外层循环:负责连接断开后的整体重连 for { // 检查上下文是否已取消,优雅退出(对齐 ListenFunc) select { case <-ctx.Done(): cool.Logger.Info(ctx, "ListenFight 上下文已取消,退出监听") return default: } // 1. 建立 Redis 连接(完全对齐 ListenFunc) conn, err := cool.Redis.Conn(ctx) if err != nil { cool.Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay) time.Sleep(retryDelay) continue } // 2. 订阅主题(对齐 ListenFunc 的错误处理,替换 panic 为优雅重连) subscribeTopics := []string{startTopic, pvpServerTopic} if cool.Config.GameOnlineID == pvp.CoordinatorOnlineID { subscribeTopics = append(subscribeTopics, pvpCoordinatorTopic) } subscribeFailed := false for _, topic := range subscribeTopics { _, err = conn.Do(ctx, "subscribe", topic) if err != nil { cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", topic, "error", err) _ = conn.Close(ctx) time.Sleep(retryDelay) subscribeFailed = true break } cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", topic) } if subscribeFailed { continue } // // 订阅 sun:sendpack:服务器ID // _, err = conn.Do(ctx, "subscribe", sendPackTopic) // if err != nil { // cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", sendPackTopic, "error", err) // heartbeatCancel() // 关闭心跳协程 // _ = conn.Close(ctx) // time.Sleep(retryDelay) // continue // } // cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", sendPackTopic) // 打印监听提示(保留原有日志) fmt.Println("监听战斗", startTopic) // 3. 循环接收消息(完全对齐 ListenFunc 逻辑) connError := false for !connError { select { case <-ctx.Done(): connError = true continue default: } // 接收消息(和 ListenFunc 保持一致的 Receive 用法) data, err := conn.Receive(ctx) if err != nil { cool.Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err) connError = true // 标记连接错误,触发重连 continue } // 处理消息(完全对齐 ListenFunc 的解析逻辑) if data != nil { dataMap, ok := data.Interface().(*gredis.Message) if !ok { continue } // 处理 sun:start:服务器ID 消息 if dataMap.Channel == startTopic { fmt.Println("战斗开始", dataMap.Payload) // universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient) } if dataMap.Channel == pvpServerTopic || dataMap.Channel == pvpCoordinatorTopic { pvp.HandleRedisMessage(dataMap.Channel, dataMap.Payload) } // 【可选】处理 sun:sendpack:服务器ID 消息(如果需要) if dataMap.Channel == sendPackTopic { fmt.Println("收到战斗包", dataMap.Payload) } } } // 4. 清理资源,准备重连(完全对齐 ListenFunc) _ = conn.Close(ctx) // 关闭当前连接 cool.Logger.Info(ctx, "Redis 战斗订阅连接异常,准备重连", "retry_after", retryDelay) time.Sleep(retryDelay) } }