```
All checks were successful
ci/woodpecker/push/my-first-workflow Pipeline was successful

refactor(cool): 移除Redis监听功能和用户结构体定义

移除ListenFunc函数,该函数提供Redis PubSub监听功能,
包括自动重连和心跳保活机制。同时删除User结构体定义和
相关有序集合变量,这些功能将由rpc模块替代实现。

feat(rpc): 添加对ListenFunc的调用以处理Redis监听

在login模块中
This commit is contained in:
昔念
2026-03-04 23:38:21 +08:00
parent 4751594ee8
commit aa53001982
8 changed files with 416 additions and 188 deletions

View File

@@ -1,10 +1,8 @@
package cool
import (
"context"
"time"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/text/gstr"
@@ -65,133 +63,3 @@ func ClusterRunFunc(ctx g.Ctx, funcstring string) (err error) {
return RunFunc(ctx, funcstring)
}
}
// ListenFunc 监听函数
// ListenFunc 改造后的 Redis PubSub 监听函数,支持自动重连和心跳保活
func ListenFunc(ctx g.Ctx) {
if !IsRedisMode {
panic(gerror.New("集群模式下, 请使用Redis作为缓存"))
}
// 定义常量配置
const (
subscribeTopic = "cool:func" // 订阅的主题
retryDelay = 10 * time.Second // 连接失败重试间隔
heartbeatInterval = 30 * time.Second // 心跳保活间隔
)
// 外层循环:负责连接断开后的整体重连
for {
// 检查上下文是否已取消,优雅退出
select {
case <-ctx.Done():
Logger.Info(ctx, "ListenFunc 上下文已取消,退出监听")
return
default:
}
// 1. 建立 Redis 连接
conn, err := g.Redis("cool").Conn(ctx)
if err != nil {
Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay)
time.Sleep(retryDelay)
continue
}
Logger.Info(ctx, "成功获取 Redis 连接")
// 2. 启动心跳保活协程,防止连接因空闲被断开
heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background())
go func() {
ticker := time.NewTicker(heartbeatInterval)
defer func() {
ticker.Stop()
heartbeatCancel()
}()
for {
select {
case <-heartbeatCtx.Done():
Logger.Info(ctx, "心跳协程退出")
return
case <-ticker.C:
// 发送 PING 心跳,保持连接活跃
_, pingErr := conn.Do(ctx, "PING")
if pingErr != nil {
Logger.Error(ctx, "Redis 心跳失败,触发重连", "error", pingErr)
// 心跳失败时主动关闭连接,触发外层重连
_ = conn.Close(ctx)
return
}
Logger.Debug(ctx, "Redis 心跳发送成功,连接正常")
}
}
}()
// 3. 订阅主题
_, err = conn.Do(ctx, "subscribe", subscribeTopic)
if err != nil {
Logger.Error(ctx, "订阅 Redis 主题失败", "topic", subscribeTopic, "error", err)
heartbeatCancel() // 关闭心跳协程
_ = conn.Close(ctx)
time.Sleep(retryDelay)
continue
}
Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic)
_, err = conn.Do(ctx, "subscribe", "sun:join:2458") //加入队列
// 4. 循环接收消息
connError := false
for !connError {
select {
case <-ctx.Done():
connError = true
continue
default:
}
// 接收消息
data, err := conn.Receive(ctx)
if err != nil {
Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err)
connError = true // 标记连接错误,触发重连
continue
}
// 处理消息(保留原有业务逻辑)
if data != nil {
dataMap, ok := data.Interface().(*gredis.Message)
if !ok {
continue
}
// if dataMap. == "subscribe" {
// continue
// }
if dataMap.Channel == subscribeTopic {
Logger.Debug(ctx, "执行函数", "payload", dataMap.Payload)
err := RunFunc(ctx, dataMap.Payload)
if err != nil {
Logger.Error(ctx, "执行函数失败", "payload", dataMap.Payload, "error", err)
}
}
if dataMap.Channel == "sun:join:2458" {
println("收到sun:join:2458", dataMap.Payload)
//universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient)
zs.Add(1001, User{ID: 1001, JoinTime: time.Now().Unix(), Score: 1000})
if zs.Length() > 2 {
zs.FindNext(func(i User) bool { return i.ID > 1000 })
//找到上一个,如果区间分数少于一定,
//直接进行匹配
}
}
}
}
// 5. 清理资源,准备重连
heartbeatCancel() // 关闭心跳协程
_ = conn.Close(ctx) // 关闭当前连接
// Logger.Warn(ctx, "Redis 连接异常,准备重连", "retry_after", retryDelay)
time.Sleep(retryDelay)
}
}

View File

@@ -1,25 +0,0 @@
package cool
import "github.com/liwnn/zset"
type User struct {
JoinTime int64
ID int
Score int
}
func (u User) Key() uint32 {
return uint32(u.ID)
}
// 如果分数不对的话,就按时间排序
func (u User) Less(than User) bool {
if u.Score == than.Score {
return u.JoinTime < than.JoinTime
}
return u.Score < than.Score
}
var zs = zset.New[uint32, User](func(a, b User) bool {
return a.Less(b)
})

240
common/rpc/func.go Normal file
View File

@@ -0,0 +1,240 @@
package rpc
import (
"blazing/cool"
"context"
"time"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
)
// ListenFunc 监听函数
// ListenFunc 改造后的 Redis PubSub 监听函数,支持自动重连和心跳保活
func ListenFunc(ctx g.Ctx) {
if !cool.IsRedisMode {
panic(gerror.New("集群模式下, 请使用Redis作为缓存"))
}
// 定义常量配置
const (
subscribeTopic = "cool:func" // 订阅的主题
retryDelay = 10 * time.Second // 连接失败重试间隔
heartbeatInterval = 30 * time.Second // 心跳保活间隔
)
// 外层循环:负责连接断开后的整体重连
for {
// 检查上下文是否已取消,优雅退出
select {
case <-ctx.Done():
cool.Logger.Info(ctx, "ListenFunc 上下文已取消,退出监听")
return
default:
}
// 1. 建立 Redis 连接
conn, err := g.Redis("cool").Conn(ctx)
if err != nil {
cool.Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay)
time.Sleep(retryDelay)
continue
}
cool.Logger.Info(ctx, "成功获取 Redis 连接")
// 2. 启动心跳保活协程,防止连接因空闲被断开
heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background())
go func() {
ticker := time.NewTicker(heartbeatInterval)
defer func() {
ticker.Stop()
heartbeatCancel()
}()
for {
select {
case <-heartbeatCtx.Done():
cool.Logger.Info(ctx, "心跳协程退出")
return
case <-ticker.C:
// 发送 PING 心跳,保持连接活跃
_, pingErr := conn.Do(ctx, "PING")
if pingErr != nil {
cool.Logger.Error(ctx, "Redis 心跳失败,触发重连", "error", pingErr)
// 心跳失败时主动关闭连接,触发外层重连
_ = conn.Close(ctx)
return
}
cool.Logger.Debug(ctx, "Redis 心跳发送成功,连接正常")
}
}
}()
// 3. 订阅主题
_, err = conn.Do(ctx, "subscribe", subscribeTopic)
if err != nil {
cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", subscribeTopic, "error", err)
heartbeatCancel() // 关闭心跳协程
_ = conn.Close(ctx)
time.Sleep(retryDelay)
continue
}
cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic)
_, err = conn.Do(ctx, "subscribe", "sun:join:2458") //加入队列
// 4. 循环接收消息
connError := false
for !connError {
select {
case <-ctx.Done():
connError = true
continue
default:
}
// 接收消息
data, err := conn.Receive(ctx)
if err != nil {
cool.Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err)
connError = true // 标记连接错误,触发重连
continue
}
// 处理消息(保留原有业务逻辑)
if data != nil {
dataMap, ok := data.Interface().(*gredis.Message)
if !ok {
continue
}
// if dataMap. == "subscribe" {
// continue
// }
if dataMap.Channel == subscribeTopic {
cool.Logger.Debug(ctx, "执行函数", "payload", dataMap.Payload)
err := cool.RunFunc(ctx, dataMap.Payload)
if err != nil {
cool.Logger.Error(ctx, "执行函数失败", "payload", dataMap.Payload, "error", err)
}
}
if dataMap.Channel == "sun:join:2458" {
fightmap.ADD(dataMap.Payload)
//universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient)
}
}
}
// 5. 清理资源,准备重连
heartbeatCancel() // 关闭心跳协程
_ = conn.Close(ctx) // 关闭当前连接
// Logger.Warn(ctx, "Redis 连接异常,准备重连", "retry_after", retryDelay)
time.Sleep(retryDelay)
}
}
func ListenFight(ctx g.Ctx) {
// 定义常量配置
const (
retryDelay = 10 * time.Second // 连接失败重试间隔
heartbeatInterval = 30 * time.Second // 心跳保活间隔
)
// 外层循环:负责连接断开后的整体重连
for {
// 检查上下文是否已取消,优雅退出
select {
case <-ctx.Done():
cool.Logger.Info(ctx, "ListenFunc 上下文已取消,退出监听")
return
default:
}
// 1. 建立 Redis 连接
conn, err := g.Redis("cool").Conn(ctx)
if err != nil {
cool.Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay)
time.Sleep(retryDelay)
continue
}
cool.Logger.Info(ctx, "成功获取 Redis 连接")
// 2. 启动心跳保活协程,防止连接因空闲被断开
heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background())
go func() {
ticker := time.NewTicker(heartbeatInterval)
defer func() {
ticker.Stop()
heartbeatCancel()
}()
for {
select {
case <-heartbeatCtx.Done():
cool.Logger.Info(ctx, "心跳协程退出")
return
case <-ticker.C:
// 发送 PING 心跳,保持连接活跃
_, pingErr := conn.Do(ctx, "PING")
if pingErr != nil {
cool.Logger.Error(ctx, "Redis 心跳失败,触发重连", "error", pingErr)
// 心跳失败时主动关闭连接,触发外层重连
_ = conn.Close(ctx)
return
}
cool.Logger.Debug(ctx, "Redis 心跳发送成功,连接正常")
}
}
}()
// cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic)
//房主服务器拉取之后,所有操作通过redie转发到房主服务器去执行,非房主方只进行收包操作
conn.Do(ctx, "subscribe", "sun:start:"+gconv.String(cool.Config.ServerInfo.Port)) //发起战斗 ,由房主服务器主动拉取战斗实现
conn.Do(ctx, "subscribe", "sun:sendpack"+gconv.String(cool.Config.ServerInfo.Port)) //接受战斗的信息,直接接受包转发就行
// 4. 循环接收消息
connError := false
for !connError {
select {
case <-ctx.Done():
connError = true
continue
default:
}
// 接收消息
data, err := conn.Receive(ctx)
if err != nil {
cool.Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err)
connError = true // 标记连接错误,触发重连
continue
}
// 处理消息(保留原有业务逻辑)
if data != nil {
dataMap, ok := data.Interface().(*gredis.Message)
if !ok {
continue
}
// if dataMap. == "subscribe" {
// continue
// }
if dataMap.Channel == "sun:join:2458" {
fightmap.ADD(dataMap.Payload)
//universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient)
}
}
}
// 5. 清理资源,准备重连
heartbeatCancel() // 关闭心跳协程
_ = conn.Close(ctx) // 关闭当前连接
// Logger.Warn(ctx, "Redis 连接异常,准备重连", "retry_after", retryDelay)
time.Sleep(retryDelay)
}
}

90
common/rpc/user.go Normal file
View File

@@ -0,0 +1,90 @@
package rpc
import (
"blazing/logic/service/common"
"blazing/modules/player/service"
"errors"
"strings"
"time"
"github.com/gogf/gf/v2/util/gconv"
"github.com/liwnn/zset"
csmap "github.com/mhmtszr/concurrent-swiss-map"
)
type RPCfight struct {
fightmap *csmap.CsMap[int, common.FightI]
zs *zset.ZSet[uint32, User]
}
// ExtractBetweenBrackets 提取字符串中第一个 [] 中间的文本
// 返回值:中间文本、是否成功、错误信息
func ExtractBetweenBrackets(s string) (string, bool, error) {
// 1. 找到第一个 [ 的索引
leftIdx := strings.Index(s, "[")
if leftIdx == -1 {
return "", false, errors.New("未找到左中括号 [")
}
// 2. 找到第一个 [ 之后的第一个 ] 的索引
rightIdx := strings.Index(s[leftIdx+1:], "]")
if rightIdx == -1 {
return "", false, errors.New("找到左中括号 [ 但未找到对应的右中括号 ]")
}
// 3. 计算实际的右中括号索引(加上 leftIdx+1
rightIdx += leftIdx + 1
// 4. 提取中间文本(去除前后空格,可选)
result := strings.TrimSpace(s[leftIdx+1 : rightIdx])
// 5. 检查是否为空
if result == "" {
return "", true, errors.New("中括号中间无文本")
}
return result, true, nil
}
func (r *RPCfight) ADD(s string) {
println("收到sun:join:2458", s)
t, _, _ := ExtractBetweenBrackets(s)
ret := service.NewPVPService(gconv.Uint32(t)).Get(gconv.Uint32(t))
score := 1000
if ret == nil {
score = int(ret.RankInfo.Score)
}
r.zs.Add(gconv.Uint32(t), User{ID: gconv.Uint32(t), JoinTime: time.Now().Unix(), Score: score})
if r.zs.Length() > 2 {
r.zs.FindPrev(func(i User) bool { return i.Score > score })
//找到上一个,如果区间分数少于一定,
//直接进行匹配
}
}
type User struct {
JoinTime int64
ID uint32
Score int
}
func (u User) Key() uint32 {
return uint32(u.ID)
}
// 如果分数不对的话,就按时间排序
func (u User) Less(than User) bool {
if u.Score == than.Score {
return u.JoinTime < than.JoinTime
}
return u.Score < than.Score
}
///定义map,存储用户对战斗容器的映射,便于外部传入时候进行直接操作
var fightmap = RPCfight{
fightmap: csmap.New[int, common.FightI](),
zs: zset.New[uint32, User](func(a, b User) bool {
return a.Less(b)
}),
}