2025-06-20 17:13:51 +08:00
|
|
|
|
package cool
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
2026-02-20 13:29:43 +08:00
|
|
|
|
"context"
|
2025-06-20 17:13:51 +08:00
|
|
|
|
"time"
|
|
|
|
|
|
|
2026-03-04 22:47:21 +08:00
|
|
|
|
"github.com/gogf/gf/v2/database/gredis"
|
2025-06-20 17:13:51 +08:00
|
|
|
|
"github.com/gogf/gf/v2/errors/gerror"
|
|
|
|
|
|
"github.com/gogf/gf/v2/frame/g"
|
|
|
|
|
|
"github.com/gogf/gf/v2/text/gstr"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
type CoolFunc interface {
|
|
|
|
|
|
// Func handler
|
|
|
|
|
|
Func(ctx g.Ctx, param string) (err error)
|
|
|
|
|
|
// IsSingleton 是否单例,当为true时,只能有一个任务在执行,在注意函数为计划任务时使用
|
|
|
|
|
|
IsSingleton() bool
|
|
|
|
|
|
// IsAllWorker 是否所有worker都执行
|
|
|
|
|
|
IsAllWorker() bool
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// FuncMap 函数列表
|
|
|
|
|
|
var FuncMap = make(map[string]CoolFunc)
|
|
|
|
|
|
|
|
|
|
|
|
// RegisterFunc 注册函数
|
|
|
|
|
|
func RegisterFunc(name string, f CoolFunc) {
|
|
|
|
|
|
FuncMap[name] = f
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetFunc 获取函数
|
|
|
|
|
|
func GetFunc(name string) CoolFunc {
|
|
|
|
|
|
return FuncMap[name]
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// RunFunc 运行函数
|
|
|
|
|
|
func RunFunc(ctx g.Ctx, funcstring string) (err error) {
|
|
|
|
|
|
funcName := gstr.SubStr(funcstring, 0, gstr.Pos(funcstring, "("))
|
|
|
|
|
|
funcParam := gstr.SubStr(funcstring, gstr.Pos(funcstring, "(")+1, gstr.Pos(funcstring, ")")-gstr.Pos(funcstring, "(")-1)
|
|
|
|
|
|
if _, ok := FuncMap[funcName]; !ok {
|
|
|
|
|
|
err = gerror.New("函数不存在:" + funcName)
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
if !FuncMap[funcName].IsAllWorker() {
|
|
|
|
|
|
// 检查当前是否为主进程, 如果不是主进程, 则不执行
|
|
|
|
|
|
if ProcessFlag != CacheManager.MustGetOrSet(ctx, "cool:masterflag", ProcessFlag, 60*time.Second).String() {
|
2025-12-25 12:14:04 +08:00
|
|
|
|
Logger.Debug(ctx, "当前进程不是主进程, 不执行单例函数", funcName)
|
2025-06-20 17:13:51 +08:00
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
err = FuncMap[funcName].Func(ctx, funcParam)
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ClusterRunFunc 集群运行函数,如果是单机模式, 则直接运行函数
|
|
|
|
|
|
func ClusterRunFunc(ctx g.Ctx, funcstring string) (err error) {
|
|
|
|
|
|
if IsRedisMode {
|
|
|
|
|
|
conn, err := g.Redis("cool").Conn(ctx)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
defer conn.Close(ctx)
|
|
|
|
|
|
_, err = conn.Do(ctx, "publish", "cool:func", funcstring)
|
|
|
|
|
|
return err
|
|
|
|
|
|
} else {
|
|
|
|
|
|
return RunFunc(ctx, funcstring)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ListenFunc 监听函数
|
2026-02-20 13:29:43 +08:00
|
|
|
|
// ListenFunc 改造后的 Redis PubSub 监听函数,支持自动重连和心跳保活
|
2025-06-20 17:13:51 +08:00
|
|
|
|
func ListenFunc(ctx g.Ctx) {
|
2026-02-20 13:29:43 +08:00
|
|
|
|
if !IsRedisMode {
|
|
|
|
|
|
panic(gerror.New("集群模式下, 请使用Redis作为缓存"))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 定义常量配置
|
|
|
|
|
|
const (
|
|
|
|
|
|
subscribeTopic = "cool:func" // 订阅的主题
|
|
|
|
|
|
retryDelay = 10 * time.Second // 连接失败重试间隔
|
|
|
|
|
|
heartbeatInterval = 30 * time.Second // 心跳保活间隔
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// 外层循环:负责连接断开后的整体重连
|
|
|
|
|
|
for {
|
|
|
|
|
|
// 检查上下文是否已取消,优雅退出
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
Logger.Info(ctx, "ListenFunc 上下文已取消,退出监听")
|
|
|
|
|
|
return
|
|
|
|
|
|
default:
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 1. 建立 Redis 连接
|
2025-06-20 17:13:51 +08:00
|
|
|
|
conn, err := g.Redis("cool").Conn(ctx)
|
|
|
|
|
|
if err != nil {
|
2026-02-20 13:29:43 +08:00
|
|
|
|
Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay)
|
|
|
|
|
|
time.Sleep(retryDelay)
|
|
|
|
|
|
continue
|
2025-06-20 17:13:51 +08:00
|
|
|
|
}
|
2026-02-20 13:29:43 +08:00
|
|
|
|
Logger.Info(ctx, "成功获取 Redis 连接")
|
|
|
|
|
|
|
|
|
|
|
|
// 2. 启动心跳保活协程,防止连接因空闲被断开
|
|
|
|
|
|
heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background())
|
|
|
|
|
|
go func() {
|
|
|
|
|
|
ticker := time.NewTicker(heartbeatInterval)
|
|
|
|
|
|
defer func() {
|
|
|
|
|
|
ticker.Stop()
|
|
|
|
|
|
heartbeatCancel()
|
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-heartbeatCtx.Done():
|
|
|
|
|
|
Logger.Info(ctx, "心跳协程退出")
|
|
|
|
|
|
return
|
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
|
// 发送 PING 心跳,保持连接活跃
|
|
|
|
|
|
_, pingErr := conn.Do(ctx, "PING")
|
|
|
|
|
|
if pingErr != nil {
|
|
|
|
|
|
Logger.Error(ctx, "Redis 心跳失败,触发重连", "error", pingErr)
|
|
|
|
|
|
// 心跳失败时主动关闭连接,触发外层重连
|
|
|
|
|
|
_ = conn.Close(ctx)
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
Logger.Debug(ctx, "Redis 心跳发送成功,连接正常")
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
|
|
// 3. 订阅主题
|
|
|
|
|
|
_, err = conn.Do(ctx, "subscribe", subscribeTopic)
|
2025-06-20 17:13:51 +08:00
|
|
|
|
if err != nil {
|
2026-02-20 13:29:43 +08:00
|
|
|
|
Logger.Error(ctx, "订阅 Redis 主题失败", "topic", subscribeTopic, "error", err)
|
|
|
|
|
|
heartbeatCancel() // 关闭心跳协程
|
|
|
|
|
|
_ = conn.Close(ctx)
|
|
|
|
|
|
time.Sleep(retryDelay)
|
|
|
|
|
|
continue
|
2025-06-20 17:13:51 +08:00
|
|
|
|
}
|
2026-02-20 13:29:43 +08:00
|
|
|
|
Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic)
|
2026-03-04 03:22:43 +08:00
|
|
|
|
_, err = conn.Do(ctx, "subscribe", "sun:join:2458") //加入队列
|
2026-02-20 13:29:43 +08:00
|
|
|
|
|
|
|
|
|
|
// 4. 循环接收消息
|
|
|
|
|
|
connError := false
|
|
|
|
|
|
for !connError {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
connError = true
|
|
|
|
|
|
continue
|
|
|
|
|
|
default:
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 接收消息
|
2025-06-20 17:13:51 +08:00
|
|
|
|
data, err := conn.Receive(ctx)
|
|
|
|
|
|
if err != nil {
|
2026-02-20 13:29:43 +08:00
|
|
|
|
Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err)
|
|
|
|
|
|
connError = true // 标记连接错误,触发重连
|
2025-06-20 17:13:51 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
2026-02-20 13:29:43 +08:00
|
|
|
|
|
|
|
|
|
|
// 处理消息(保留原有业务逻辑)
|
2025-06-20 17:13:51 +08:00
|
|
|
|
if data != nil {
|
2026-03-04 22:47:21 +08:00
|
|
|
|
dataMap, ok := data.Interface().(*gredis.Message)
|
|
|
|
|
|
if !ok {
|
2025-06-20 17:13:51 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
2026-03-04 22:47:21 +08:00
|
|
|
|
// if dataMap. == "subscribe" {
|
|
|
|
|
|
// continue
|
|
|
|
|
|
// }
|
|
|
|
|
|
if dataMap.Channel == subscribeTopic {
|
|
|
|
|
|
Logger.Debug(ctx, "执行函数", "payload", dataMap.Payload)
|
|
|
|
|
|
err := RunFunc(ctx, dataMap.Payload)
|
2025-06-20 17:13:51 +08:00
|
|
|
|
if err != nil {
|
2026-03-04 22:47:21 +08:00
|
|
|
|
Logger.Error(ctx, "执行函数失败", "payload", dataMap.Payload, "error", err)
|
2025-06-20 17:13:51 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
2026-03-04 22:47:21 +08:00
|
|
|
|
if dataMap.Channel == "sun:join:2458" {
|
|
|
|
|
|
|
|
|
|
|
|
println("收到sun:join:2458", dataMap.Payload)
|
|
|
|
|
|
//universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient)
|
|
|
|
|
|
zs.Add(1001, User{ID: 1001, JoinTime: time.Now().Unix(), Score: 1000})
|
|
|
|
|
|
if zs.Length() > 2 {
|
|
|
|
|
|
zs.FindNext(func(i User) bool { return i.ID > 1000 })
|
|
|
|
|
|
//找到上一个,如果区间分数少于一定,
|
|
|
|
|
|
//直接进行匹配
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
2025-06-20 17:13:51 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
2026-02-20 13:29:43 +08:00
|
|
|
|
|
|
|
|
|
|
// 5. 清理资源,准备重连
|
|
|
|
|
|
heartbeatCancel() // 关闭心跳协程
|
|
|
|
|
|
_ = conn.Close(ctx) // 关闭当前连接
|
|
|
|
|
|
// Logger.Warn(ctx, "Redis 连接异常,准备重连", "retry_after", retryDelay)
|
|
|
|
|
|
time.Sleep(retryDelay)
|
2025-06-20 17:13:51 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|