```
All checks were successful
ci/woodpecker/push/my-first-workflow Pipeline was successful

refactor(common): 统一Redis连接方式并优化代码结构

- 将 g.Redis("cool").Conn(ctx) 统一改为 Redis.Conn(ctx) 的调用方式
- 在coolconfig中添加ServerList.GetID()方法用于生成服务器唯一标识
- 引入gconv包用于类型转换操作

feat(rpc): 完善ListenFight函数实现集群消息监听

- 新增ListenFight函数,完全对齐ListenFunc
This commit is contained in:
昔念
2026-03-20 04:58:23 +08:00
parent 5657f1e673
commit 90b62b44e4
13 changed files with 119 additions and 60 deletions

View File

@@ -3,12 +3,12 @@ package rpc
import (
"blazing/cool"
"context"
"fmt"
"time"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
)
// ListenFunc 监听函数
@@ -36,7 +36,7 @@ func ListenFunc(ctx g.Ctx) {
}
// 1. 建立 Redis 连接
conn, err := g.Redis("cool").Conn(ctx)
conn, err := cool.Redis.Conn(ctx)
if err != nil {
cool.Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay)
time.Sleep(retryDelay)
@@ -135,26 +135,36 @@ func ListenFunc(ctx g.Ctx) {
}
}
// ListenFight 完全对齐 ListenFunc 写法,修复收不到消息问题
func ListenFight(ctx g.Ctx) {
if !cool.IsRedisMode {
panic(gerror.New("集群模式下, 请使用Redis作为缓存"))
}
// 定义常量配置
// 定义常量配置(对齐 ListenFunc 风格)
const (
retryDelay = 10 * time.Second // 连接失败重试间隔
heartbeatInterval = 30 * time.Second // 心跳保活间隔
)
// 提前拼接订阅主题(避免重复拼接,便于日志打印)
serverID := cool.Config.ServerInfo.GetID()
startTopic := "sun:start:" + serverID
sendPackTopic := "sendpack:" + serverID
// 外层循环:负责连接断开后的整体重连
for {
// 检查上下文是否已取消,优雅退出
// 检查上下文是否已取消,优雅退出(对齐 ListenFunc
select {
case <-ctx.Done():
cool.Logger.Info(ctx, "ListenFunc 上下文已取消,退出监听")
cool.Logger.Info(ctx, "ListenFight 上下文已取消,退出监听")
return
default:
}
// 1. 建立 Redis 连接
conn, err := g.Redis("cool").Conn(ctx)
// 1. 建立 Redis 连接(完全对齐 ListenFunc
conn, err := cool.Redis.Conn(ctx)
if err != nil {
cool.Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay)
time.Sleep(retryDelay)
@@ -162,7 +172,7 @@ func ListenFight(ctx g.Ctx) {
}
cool.Logger.Info(ctx, "成功获取 Redis 连接")
// 2. 启动心跳保活协程,防止连接因空闲被断开
// 2. 启动心跳保活协程(完全对齐 ListenFunc 逻辑)
heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background())
go func() {
ticker := time.NewTicker(heartbeatInterval)
@@ -190,11 +200,33 @@ func ListenFight(ctx g.Ctx) {
}
}()
// cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic)
//房主服务器拉取之后,所有操作通过redie转发到房主服务器去执行,非房主方只进行收包操作
conn.Do(ctx, "subscribe", "sun:start:"+gconv.String(100000*cool.Config.ServerInfo.OnlineID+cool.Config.ServerInfo.Port)) //发起战斗 ,由房主服务器主动拉取战斗实现
conn.Do(ctx, "subscribe", "sun:sendpack:"+gconv.String(100000*cool.Config.ServerInfo.OnlineID+cool.Config.ServerInfo.Port)) //接受战斗的信息,直接接受包转发就行
// 4. 循环接收消息
// 3. 订阅主题(对齐 ListenFunc 的错误处理,替换 panic 为优雅重连)
// 订阅 sun:start:服务器ID
_, err = conn.Do(ctx, "subscribe", startTopic)
if err != nil {
cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", startTopic, "error", err)
heartbeatCancel() // 关闭心跳协程
_ = conn.Close(ctx)
time.Sleep(retryDelay)
continue
}
cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", startTopic)
// // 订阅 sun:sendpack:服务器ID
// _, err = conn.Do(ctx, "subscribe", sendPackTopic)
// if err != nil {
// cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", sendPackTopic, "error", err)
// heartbeatCancel() // 关闭心跳协程
// _ = conn.Close(ctx)
// time.Sleep(retryDelay)
// continue
// }
// cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", sendPackTopic)
// 打印监听提示(保留原有日志)
fmt.Println("监听战斗", startTopic)
// 4. 循环接收消息(完全对齐 ListenFunc 逻辑)
connError := false
for !connError {
select {
@@ -204,7 +236,7 @@ func ListenFight(ctx g.Ctx) {
default:
}
// 接收消息
// 接收消息(和 ListenFunc 保持一致的 Receive 用法)
data, err := conn.Receive(ctx)
if err != nil {
cool.Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err)
@@ -212,29 +244,29 @@ func ListenFight(ctx g.Ctx) {
continue
}
// 处理消息(保留原有业务逻辑)
// 处理消息(完全对齐 ListenFunc 的解析逻辑)
if data != nil {
dataMap, ok := data.Interface().(*gredis.Message)
if !ok {
continue
}
// if dataMap. == "subscribe" {
// continue
// }
if dataMap.Channel == "sun:join:2458" {
fightmap.ADD(dataMap.Payload)
//universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient)
// 处理 sun:start:服务器ID 消息
if dataMap.Channel == startTopic {
fmt.Println("战斗开始", dataMap.Payload)
// universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient)
}
// 【可选】处理 sun:sendpack:服务器ID 消息(如果需要)
if dataMap.Channel == sendPackTopic {
fmt.Println("收到战斗包", dataMap.Payload)
}
}
}
// 5. 清理资源,准备重连
// 5. 清理资源,准备重连(完全对齐 ListenFunc
heartbeatCancel() // 关闭心跳协程
_ = conn.Close(ctx) // 关闭当前连接
// Logger.Warn(ctx, "Redis 连接异常,准备重连", "retry_after", retryDelay)
time.Sleep(retryDelay)
}
}