2026-03-04 23:38:21 +08:00
|
|
|
|
package rpc
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"blazing/cool"
|
2026-04-05 05:04:04 +08:00
|
|
|
|
"blazing/logic/service/fight/pvp"
|
|
|
|
|
|
"blazing/logic/service/fight/pvpwire"
|
2026-04-05 11:14:25 +08:00
|
|
|
|
|
2026-03-20 04:58:23 +08:00
|
|
|
|
"fmt"
|
2026-03-04 23:38:21 +08:00
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/gogf/gf/v2/database/gredis"
|
|
|
|
|
|
"github.com/gogf/gf/v2/errors/gerror"
|
|
|
|
|
|
"github.com/gogf/gf/v2/frame/g"
|
2026-04-05 05:04:04 +08:00
|
|
|
|
"github.com/gogf/gf/v2/util/gconv"
|
2026-03-04 23:38:21 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// ListenFunc 监听函数
|
2026-04-12 19:14:18 +08:00
|
|
|
|
// ListenFunc 改造后的 Redis PubSub 监听函数,支持自动重连。
|
|
|
|
|
|
// 注意:PubSub 连接只负责订阅和接收,避免在同一连接上并发 PING。
|
2026-03-04 23:38:21 +08:00
|
|
|
|
func ListenFunc(ctx g.Ctx) {
|
|
|
|
|
|
if !cool.IsRedisMode {
|
|
|
|
|
|
panic(gerror.New("集群模式下, 请使用Redis作为缓存"))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 定义常量配置
|
|
|
|
|
|
const (
|
2026-04-12 19:14:18 +08:00
|
|
|
|
subscribeTopic = "cool:func" // 订阅的主题
|
|
|
|
|
|
retryDelay = 10 * time.Second // 连接失败重试间隔
|
2026-03-04 23:38:21 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// 外层循环:负责连接断开后的整体重连
|
|
|
|
|
|
for {
|
|
|
|
|
|
// 检查上下文是否已取消,优雅退出
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
cool.Logger.Info(ctx, "ListenFunc 上下文已取消,退出监听")
|
|
|
|
|
|
return
|
|
|
|
|
|
default:
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 1. 建立 Redis 连接
|
2026-03-20 04:58:23 +08:00
|
|
|
|
conn, err := cool.Redis.Conn(ctx)
|
2026-03-04 23:38:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
cool.Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay)
|
|
|
|
|
|
time.Sleep(retryDelay)
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-12 19:14:18 +08:00
|
|
|
|
// 2. 订阅主题
|
2026-03-04 23:38:21 +08:00
|
|
|
|
_, err = conn.Do(ctx, "subscribe", subscribeTopic)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", subscribeTopic, "error", err)
|
|
|
|
|
|
_ = conn.Close(ctx)
|
|
|
|
|
|
time.Sleep(retryDelay)
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic)
|
2026-03-12 14:35:27 +08:00
|
|
|
|
_, err = conn.Do(ctx, "subscribe", "sun:join") //加入队列
|
2026-04-12 19:14:18 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", "sun:join", "error", err)
|
|
|
|
|
|
_ = conn.Close(ctx)
|
|
|
|
|
|
time.Sleep(retryDelay)
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", "sun:join")
|
2026-03-04 23:38:21 +08:00
|
|
|
|
|
2026-04-12 19:14:18 +08:00
|
|
|
|
// 3. 循环接收消息
|
2026-03-04 23:38:21 +08:00
|
|
|
|
connError := false
|
|
|
|
|
|
for !connError {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
connError = true
|
|
|
|
|
|
continue
|
|
|
|
|
|
default:
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 接收消息
|
|
|
|
|
|
data, err := conn.Receive(ctx)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
cool.Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err)
|
|
|
|
|
|
connError = true // 标记连接错误,触发重连
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 处理消息(保留原有业务逻辑)
|
|
|
|
|
|
if data != nil {
|
|
|
|
|
|
dataMap, ok := data.Interface().(*gredis.Message)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
// if dataMap. == "subscribe" {
|
|
|
|
|
|
// continue
|
|
|
|
|
|
// }
|
|
|
|
|
|
if dataMap.Channel == subscribeTopic {
|
|
|
|
|
|
cool.Logger.Debug(ctx, "执行函数", "payload", dataMap.Payload)
|
|
|
|
|
|
err := cool.RunFunc(ctx, dataMap.Payload)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
cool.Logger.Error(ctx, "执行函数失败", "payload", dataMap.Payload, "error", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2026-03-05 11:21:38 +08:00
|
|
|
|
if dataMap.Channel == "sun:join" {
|
2026-03-04 23:38:21 +08:00
|
|
|
|
|
|
|
|
|
|
fightmap.ADD(dataMap.Payload)
|
|
|
|
|
|
//universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient)
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-12 19:14:18 +08:00
|
|
|
|
// 4. 清理资源,准备重连
|
2026-03-04 23:38:21 +08:00
|
|
|
|
_ = conn.Close(ctx) // 关闭当前连接
|
2026-04-12 19:14:18 +08:00
|
|
|
|
cool.Logger.Info(ctx, "Redis 订阅连接异常,准备重连", "retry_after", retryDelay)
|
2026-03-04 23:38:21 +08:00
|
|
|
|
time.Sleep(retryDelay)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-12 19:14:18 +08:00
|
|
|
|
// ListenFight 完全对齐 ListenFunc 写法,修复收不到消息问题。
|
|
|
|
|
|
// 注意:PubSub 连接只负责订阅和接收,避免在同一连接上并发 PING。
|
2026-03-04 23:38:21 +08:00
|
|
|
|
func ListenFight(ctx g.Ctx) {
|
2026-03-20 04:58:23 +08:00
|
|
|
|
if !cool.IsRedisMode {
|
|
|
|
|
|
panic(gerror.New("集群模式下, 请使用Redis作为缓存"))
|
|
|
|
|
|
}
|
2026-03-04 23:38:21 +08:00
|
|
|
|
|
2026-03-20 04:58:23 +08:00
|
|
|
|
// 定义常量配置(对齐 ListenFunc 风格)
|
2026-03-04 23:38:21 +08:00
|
|
|
|
const (
|
2026-04-12 19:14:18 +08:00
|
|
|
|
retryDelay = 10 * time.Second // 连接失败重试间隔
|
2026-03-04 23:38:21 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
2026-03-20 04:58:23 +08:00
|
|
|
|
// 提前拼接订阅主题(避免重复拼接,便于日志打印)
|
|
|
|
|
|
serverID := cool.Config.ServerInfo.GetID()
|
|
|
|
|
|
startTopic := "sun:start:" + serverID
|
|
|
|
|
|
sendPackTopic := "sendpack:" + serverID
|
2026-04-05 05:04:04 +08:00
|
|
|
|
pvpServerTopic := pvpwire.ServerTopic(gconv.Uint32(serverID))
|
|
|
|
|
|
pvpCoordinatorTopic := pvpwire.CoordinatorTopicPrefix
|
2026-03-20 04:58:23 +08:00
|
|
|
|
|
2026-03-04 23:38:21 +08:00
|
|
|
|
// 外层循环:负责连接断开后的整体重连
|
|
|
|
|
|
for {
|
2026-03-20 04:58:23 +08:00
|
|
|
|
|
|
|
|
|
|
// 检查上下文是否已取消,优雅退出(对齐 ListenFunc)
|
2026-03-04 23:38:21 +08:00
|
|
|
|
select {
|
|
|
|
|
|
case <-ctx.Done():
|
2026-03-20 04:58:23 +08:00
|
|
|
|
cool.Logger.Info(ctx, "ListenFight 上下文已取消,退出监听")
|
2026-03-04 23:38:21 +08:00
|
|
|
|
return
|
|
|
|
|
|
default:
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-03-20 04:58:23 +08:00
|
|
|
|
// 1. 建立 Redis 连接(完全对齐 ListenFunc)
|
|
|
|
|
|
conn, err := cool.Redis.Conn(ctx)
|
2026-03-04 23:38:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
cool.Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay)
|
|
|
|
|
|
time.Sleep(retryDelay)
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-12 19:14:18 +08:00
|
|
|
|
// 2. 订阅主题(对齐 ListenFunc 的错误处理,替换 panic 为优雅重连)
|
2026-04-05 05:04:04 +08:00
|
|
|
|
subscribeTopics := []string{startTopic, pvpServerTopic}
|
|
|
|
|
|
if cool.Config.GameOnlineID == pvp.CoordinatorOnlineID {
|
|
|
|
|
|
subscribeTopics = append(subscribeTopics, pvpCoordinatorTopic)
|
|
|
|
|
|
}
|
|
|
|
|
|
subscribeFailed := false
|
|
|
|
|
|
for _, topic := range subscribeTopics {
|
|
|
|
|
|
_, err = conn.Do(ctx, "subscribe", topic)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", topic, "error", err)
|
|
|
|
|
|
_ = conn.Close(ctx)
|
|
|
|
|
|
time.Sleep(retryDelay)
|
|
|
|
|
|
subscribeFailed = true
|
|
|
|
|
|
break
|
|
|
|
|
|
}
|
|
|
|
|
|
cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", topic)
|
|
|
|
|
|
}
|
|
|
|
|
|
if subscribeFailed {
|
2026-03-20 04:58:23 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// // 订阅 sun:sendpack:服务器ID
|
|
|
|
|
|
// _, err = conn.Do(ctx, "subscribe", sendPackTopic)
|
|
|
|
|
|
// if err != nil {
|
|
|
|
|
|
// cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", sendPackTopic, "error", err)
|
|
|
|
|
|
// heartbeatCancel() // 关闭心跳协程
|
|
|
|
|
|
// _ = conn.Close(ctx)
|
|
|
|
|
|
// time.Sleep(retryDelay)
|
|
|
|
|
|
// continue
|
|
|
|
|
|
// }
|
|
|
|
|
|
// cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", sendPackTopic)
|
|
|
|
|
|
|
|
|
|
|
|
// 打印监听提示(保留原有日志)
|
|
|
|
|
|
fmt.Println("监听战斗", startTopic)
|
|
|
|
|
|
|
2026-04-12 19:14:18 +08:00
|
|
|
|
// 3. 循环接收消息(完全对齐 ListenFunc 逻辑)
|
2026-03-04 23:38:21 +08:00
|
|
|
|
connError := false
|
|
|
|
|
|
for !connError {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
connError = true
|
|
|
|
|
|
continue
|
|
|
|
|
|
default:
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-03-20 04:58:23 +08:00
|
|
|
|
// 接收消息(和 ListenFunc 保持一致的 Receive 用法)
|
2026-03-04 23:38:21 +08:00
|
|
|
|
data, err := conn.Receive(ctx)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
cool.Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err)
|
|
|
|
|
|
connError = true // 标记连接错误,触发重连
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-03-20 04:58:23 +08:00
|
|
|
|
// 处理消息(完全对齐 ListenFunc 的解析逻辑)
|
2026-03-04 23:38:21 +08:00
|
|
|
|
if data != nil {
|
|
|
|
|
|
dataMap, ok := data.Interface().(*gredis.Message)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-03-20 04:58:23 +08:00
|
|
|
|
// 处理 sun:start:服务器ID 消息
|
|
|
|
|
|
if dataMap.Channel == startTopic {
|
|
|
|
|
|
fmt.Println("战斗开始", dataMap.Payload)
|
|
|
|
|
|
// universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient)
|
|
|
|
|
|
}
|
2026-03-04 23:38:21 +08:00
|
|
|
|
|
2026-04-05 05:04:04 +08:00
|
|
|
|
if dataMap.Channel == pvpServerTopic || dataMap.Channel == pvpCoordinatorTopic {
|
|
|
|
|
|
pvp.HandleRedisMessage(dataMap.Channel, dataMap.Payload)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-03-20 04:58:23 +08:00
|
|
|
|
// 【可选】处理 sun:sendpack:服务器ID 消息(如果需要)
|
|
|
|
|
|
if dataMap.Channel == sendPackTopic {
|
|
|
|
|
|
fmt.Println("收到战斗包", dataMap.Payload)
|
2026-03-04 23:38:21 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-12 19:14:18 +08:00
|
|
|
|
// 4. 清理资源,准备重连(完全对齐 ListenFunc)
|
2026-03-04 23:38:21 +08:00
|
|
|
|
_ = conn.Close(ctx) // 关闭当前连接
|
2026-04-12 19:14:18 +08:00
|
|
|
|
cool.Logger.Info(ctx, "Redis 战斗订阅连接异常,准备重连", "retry_after", retryDelay)
|
2026-03-04 23:38:21 +08:00
|
|
|
|
time.Sleep(retryDelay)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|