diff --git a/common/cool/func.go b/common/cool/func.go index 426f92f31..cc0af2d0b 100644 --- a/common/cool/func.go +++ b/common/cool/func.go @@ -1,10 +1,8 @@ package cool import ( - "context" "time" - "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/text/gstr" @@ -65,133 +63,3 @@ func ClusterRunFunc(ctx g.Ctx, funcstring string) (err error) { return RunFunc(ctx, funcstring) } } - -// ListenFunc 监听函数 -// ListenFunc 改造后的 Redis PubSub 监听函数,支持自动重连和心跳保活 -func ListenFunc(ctx g.Ctx) { - if !IsRedisMode { - panic(gerror.New("集群模式下, 请使用Redis作为缓存")) - } - - // 定义常量配置 - const ( - subscribeTopic = "cool:func" // 订阅的主题 - retryDelay = 10 * time.Second // 连接失败重试间隔 - heartbeatInterval = 30 * time.Second // 心跳保活间隔 - ) - - // 外层循环:负责连接断开后的整体重连 - for { - // 检查上下文是否已取消,优雅退出 - select { - case <-ctx.Done(): - Logger.Info(ctx, "ListenFunc 上下文已取消,退出监听") - return - default: - } - - // 1. 建立 Redis 连接 - conn, err := g.Redis("cool").Conn(ctx) - if err != nil { - Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay) - time.Sleep(retryDelay) - continue - } - Logger.Info(ctx, "成功获取 Redis 连接") - - // 2. 启动心跳保活协程,防止连接因空闲被断开 - heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background()) - go func() { - ticker := time.NewTicker(heartbeatInterval) - defer func() { - ticker.Stop() - heartbeatCancel() - }() - - for { - select { - case <-heartbeatCtx.Done(): - Logger.Info(ctx, "心跳协程退出") - return - case <-ticker.C: - // 发送 PING 心跳,保持连接活跃 - _, pingErr := conn.Do(ctx, "PING") - if pingErr != nil { - Logger.Error(ctx, "Redis 心跳失败,触发重连", "error", pingErr) - // 心跳失败时主动关闭连接,触发外层重连 - _ = conn.Close(ctx) - return - } - Logger.Debug(ctx, "Redis 心跳发送成功,连接正常") - } - } - }() - - // 3. 订阅主题 - _, err = conn.Do(ctx, "subscribe", subscribeTopic) - if err != nil { - Logger.Error(ctx, "订阅 Redis 主题失败", "topic", subscribeTopic, "error", err) - heartbeatCancel() // 关闭心跳协程 - _ = conn.Close(ctx) - time.Sleep(retryDelay) - continue - } - Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic) - _, err = conn.Do(ctx, "subscribe", "sun:join:2458") //加入队列 - - // 4. 循环接收消息 - connError := false - for !connError { - select { - case <-ctx.Done(): - connError = true - continue - default: - } - - // 接收消息 - data, err := conn.Receive(ctx) - if err != nil { - Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err) - connError = true // 标记连接错误,触发重连 - continue - } - - // 处理消息(保留原有业务逻辑) - if data != nil { - dataMap, ok := data.Interface().(*gredis.Message) - if !ok { - continue - } - // if dataMap. == "subscribe" { - // continue - // } - if dataMap.Channel == subscribeTopic { - Logger.Debug(ctx, "执行函数", "payload", dataMap.Payload) - err := RunFunc(ctx, dataMap.Payload) - if err != nil { - Logger.Error(ctx, "执行函数失败", "payload", dataMap.Payload, "error", err) - } - } - if dataMap.Channel == "sun:join:2458" { - - println("收到sun:join:2458", dataMap.Payload) - //universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient) - zs.Add(1001, User{ID: 1001, JoinTime: time.Now().Unix(), Score: 1000}) - if zs.Length() > 2 { - zs.FindNext(func(i User) bool { return i.ID > 1000 }) - //找到上一个,如果区间分数少于一定, - //直接进行匹配 - } - - } - } - } - - // 5. 清理资源,准备重连 - heartbeatCancel() // 关闭心跳协程 - _ = conn.Close(ctx) // 关闭当前连接 - // Logger.Warn(ctx, "Redis 连接异常,准备重连", "retry_after", retryDelay) - time.Sleep(retryDelay) - } -} diff --git a/common/cool/user.go b/common/cool/user.go deleted file mode 100644 index 41db76fba..000000000 --- a/common/cool/user.go +++ /dev/null @@ -1,25 +0,0 @@ -package cool - -import "github.com/liwnn/zset" - -type User struct { - JoinTime int64 - ID int - Score int -} - -func (u User) Key() uint32 { - return uint32(u.ID) -} - -// 如果分数不对的话,就按时间排序 -func (u User) Less(than User) bool { - if u.Score == than.Score { - return u.JoinTime < than.JoinTime - } - return u.Score < than.Score -} - -var zs = zset.New[uint32, User](func(a, b User) bool { - return a.Less(b) -}) diff --git a/common/rpc/func.go b/common/rpc/func.go new file mode 100644 index 000000000..636b24d14 --- /dev/null +++ b/common/rpc/func.go @@ -0,0 +1,240 @@ +package rpc + +import ( + "blazing/cool" + "context" + "time" + + "github.com/gogf/gf/v2/database/gredis" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/gconv" +) + +// ListenFunc 监听函数 +// ListenFunc 改造后的 Redis PubSub 监听函数,支持自动重连和心跳保活 +func ListenFunc(ctx g.Ctx) { + if !cool.IsRedisMode { + panic(gerror.New("集群模式下, 请使用Redis作为缓存")) + } + + // 定义常量配置 + const ( + subscribeTopic = "cool:func" // 订阅的主题 + retryDelay = 10 * time.Second // 连接失败重试间隔 + heartbeatInterval = 30 * time.Second // 心跳保活间隔 + ) + + // 外层循环:负责连接断开后的整体重连 + for { + // 检查上下文是否已取消,优雅退出 + select { + case <-ctx.Done(): + cool.Logger.Info(ctx, "ListenFunc 上下文已取消,退出监听") + return + default: + } + + // 1. 建立 Redis 连接 + conn, err := g.Redis("cool").Conn(ctx) + if err != nil { + cool.Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay) + time.Sleep(retryDelay) + continue + } + cool.Logger.Info(ctx, "成功获取 Redis 连接") + + // 2. 启动心跳保活协程,防止连接因空闲被断开 + heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background()) + go func() { + ticker := time.NewTicker(heartbeatInterval) + defer func() { + ticker.Stop() + heartbeatCancel() + }() + + for { + select { + case <-heartbeatCtx.Done(): + cool.Logger.Info(ctx, "心跳协程退出") + return + case <-ticker.C: + // 发送 PING 心跳,保持连接活跃 + _, pingErr := conn.Do(ctx, "PING") + if pingErr != nil { + cool.Logger.Error(ctx, "Redis 心跳失败,触发重连", "error", pingErr) + // 心跳失败时主动关闭连接,触发外层重连 + _ = conn.Close(ctx) + return + } + cool.Logger.Debug(ctx, "Redis 心跳发送成功,连接正常") + } + } + }() + + // 3. 订阅主题 + _, err = conn.Do(ctx, "subscribe", subscribeTopic) + if err != nil { + cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", subscribeTopic, "error", err) + heartbeatCancel() // 关闭心跳协程 + _ = conn.Close(ctx) + time.Sleep(retryDelay) + continue + } + cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic) + _, err = conn.Do(ctx, "subscribe", "sun:join:2458") //加入队列 + + // 4. 循环接收消息 + connError := false + for !connError { + select { + case <-ctx.Done(): + connError = true + continue + default: + } + + // 接收消息 + data, err := conn.Receive(ctx) + if err != nil { + cool.Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err) + connError = true // 标记连接错误,触发重连 + continue + } + + // 处理消息(保留原有业务逻辑) + if data != nil { + dataMap, ok := data.Interface().(*gredis.Message) + if !ok { + continue + } + // if dataMap. == "subscribe" { + // continue + // } + if dataMap.Channel == subscribeTopic { + cool.Logger.Debug(ctx, "执行函数", "payload", dataMap.Payload) + err := cool.RunFunc(ctx, dataMap.Payload) + if err != nil { + cool.Logger.Error(ctx, "执行函数失败", "payload", dataMap.Payload, "error", err) + } + } + if dataMap.Channel == "sun:join:2458" { + + fightmap.ADD(dataMap.Payload) + //universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient) + + } + } + } + + // 5. 清理资源,准备重连 + heartbeatCancel() // 关闭心跳协程 + _ = conn.Close(ctx) // 关闭当前连接 + // Logger.Warn(ctx, "Redis 连接异常,准备重连", "retry_after", retryDelay) + time.Sleep(retryDelay) + } +} + +func ListenFight(ctx g.Ctx) { + + // 定义常量配置 + const ( + retryDelay = 10 * time.Second // 连接失败重试间隔 + heartbeatInterval = 30 * time.Second // 心跳保活间隔 + ) + + // 外层循环:负责连接断开后的整体重连 + for { + // 检查上下文是否已取消,优雅退出 + select { + case <-ctx.Done(): + cool.Logger.Info(ctx, "ListenFunc 上下文已取消,退出监听") + return + default: + } + + // 1. 建立 Redis 连接 + conn, err := g.Redis("cool").Conn(ctx) + if err != nil { + cool.Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay) + time.Sleep(retryDelay) + continue + } + cool.Logger.Info(ctx, "成功获取 Redis 连接") + + // 2. 启动心跳保活协程,防止连接因空闲被断开 + heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background()) + go func() { + ticker := time.NewTicker(heartbeatInterval) + defer func() { + ticker.Stop() + heartbeatCancel() + }() + + for { + select { + case <-heartbeatCtx.Done(): + cool.Logger.Info(ctx, "心跳协程退出") + return + case <-ticker.C: + // 发送 PING 心跳,保持连接活跃 + _, pingErr := conn.Do(ctx, "PING") + if pingErr != nil { + cool.Logger.Error(ctx, "Redis 心跳失败,触发重连", "error", pingErr) + // 心跳失败时主动关闭连接,触发外层重连 + _ = conn.Close(ctx) + return + } + cool.Logger.Debug(ctx, "Redis 心跳发送成功,连接正常") + } + } + }() + + // cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic) + //房主服务器拉取之后,所有操作通过redie转发到房主服务器去执行,非房主方只进行收包操作 + conn.Do(ctx, "subscribe", "sun:start:"+gconv.String(cool.Config.ServerInfo.Port)) //发起战斗 ,由房主服务器主动拉取战斗实现 + conn.Do(ctx, "subscribe", "sun:sendpack"+gconv.String(cool.Config.ServerInfo.Port)) //接受战斗的信息,直接接受包转发就行 + // 4. 循环接收消息 + connError := false + for !connError { + select { + case <-ctx.Done(): + connError = true + continue + default: + } + + // 接收消息 + data, err := conn.Receive(ctx) + if err != nil { + cool.Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err) + connError = true // 标记连接错误,触发重连 + continue + } + + // 处理消息(保留原有业务逻辑) + if data != nil { + dataMap, ok := data.Interface().(*gredis.Message) + if !ok { + continue + } + // if dataMap. == "subscribe" { + // continue + // } + + if dataMap.Channel == "sun:join:2458" { + + fightmap.ADD(dataMap.Payload) + //universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient) + + } + } + } + + // 5. 清理资源,准备重连 + heartbeatCancel() // 关闭心跳协程 + _ = conn.Close(ctx) // 关闭当前连接 + // Logger.Warn(ctx, "Redis 连接异常,准备重连", "retry_after", retryDelay) + time.Sleep(retryDelay) + } +} diff --git a/common/rpc/user.go b/common/rpc/user.go new file mode 100644 index 000000000..cfde28a30 --- /dev/null +++ b/common/rpc/user.go @@ -0,0 +1,90 @@ +package rpc + +import ( + "blazing/logic/service/common" + "blazing/modules/player/service" + "errors" + "strings" + "time" + + "github.com/gogf/gf/v2/util/gconv" + "github.com/liwnn/zset" + csmap "github.com/mhmtszr/concurrent-swiss-map" +) + +type RPCfight struct { + fightmap *csmap.CsMap[int, common.FightI] + zs *zset.ZSet[uint32, User] +} + +// ExtractBetweenBrackets 提取字符串中第一个 [] 中间的文本 +// 返回值:中间文本、是否成功、错误信息 +func ExtractBetweenBrackets(s string) (string, bool, error) { + // 1. 找到第一个 [ 的索引 + leftIdx := strings.Index(s, "[") + if leftIdx == -1 { + return "", false, errors.New("未找到左中括号 [") + } + + // 2. 找到第一个 [ 之后的第一个 ] 的索引 + rightIdx := strings.Index(s[leftIdx+1:], "]") + if rightIdx == -1 { + return "", false, errors.New("找到左中括号 [ 但未找到对应的右中括号 ]") + } + + // 3. 计算实际的右中括号索引(加上 leftIdx+1) + rightIdx += leftIdx + 1 + + // 4. 提取中间文本(去除前后空格,可选) + result := strings.TrimSpace(s[leftIdx+1 : rightIdx]) + + // 5. 检查是否为空 + if result == "" { + return "", true, errors.New("中括号中间无文本") + } + + return result, true, nil +} + +func (r *RPCfight) ADD(s string) { + println("收到sun:join:2458", s) + t, _, _ := ExtractBetweenBrackets(s) + ret := service.NewPVPService(gconv.Uint32(t)).Get(gconv.Uint32(t)) + score := 1000 + if ret == nil { + score = int(ret.RankInfo.Score) + } + r.zs.Add(gconv.Uint32(t), User{ID: gconv.Uint32(t), JoinTime: time.Now().Unix(), Score: score}) + if r.zs.Length() > 2 { + r.zs.FindPrev(func(i User) bool { return i.Score > score }) + //找到上一个,如果区间分数少于一定, + //直接进行匹配 + } +} + +type User struct { + JoinTime int64 + ID uint32 + Score int +} + +func (u User) Key() uint32 { + return uint32(u.ID) +} + +// 如果分数不对的话,就按时间排序 +func (u User) Less(than User) bool { + if u.Score == than.Score { + return u.JoinTime < than.JoinTime + } + return u.Score < than.Score +} + +///定义map,存储用户对战斗容器的映射,便于外部传入时候进行直接操作 + +var fightmap = RPCfight{ + fightmap: csmap.New[int, common.FightI](), + zs: zset.New[uint32, User](func(a, b User) bool { + return a.Less(b) + }), +} diff --git a/login/internal/cmd/cmd.go b/login/internal/cmd/cmd.go index 244391bfd..b61193420 100644 --- a/login/internal/cmd/cmd.go +++ b/login/internal/cmd/cmd.go @@ -1,6 +1,7 @@ package cmd import ( + "blazing/common/rpc" "blazing/cool" "context" "time" @@ -30,7 +31,7 @@ var ( cool.Config.ServerInfo.IsDebug = 1 } if cool.IsRedisMode { - go cool.ListenFunc(ctx) + go rpc.ListenFunc(ctx) } // // 从文件加载IP数据库 if err := qqwry.LoadFile("public/qqwry.ipdb"); err != nil { diff --git a/modules/player/model/fightinfo.go b/modules/player/model/fightinfo.go new file mode 100644 index 000000000..09bd7aa27 --- /dev/null +++ b/modules/player/model/fightinfo.go @@ -0,0 +1,32 @@ +package model + +import "blazing/cool" + +// PVPMatchRecord PVP单场记录 +// 记录每一场的详细信息,便于复盘和统计 +type PVPMatchRecord struct { + *cool.Model + Noteinfo NoteReadyToFightInfo `gorm:"type:jsonb; " ` //携带信息 + FightOver FightOverInfo `gorm:"type:jsonb; " ` //结束信息 + //ScoreChange int32 `json:"score_change"` // 本场积分变化(+10/-5等) + // WinStreak uint32 `json:"win_streak"` // 本场结束后的连胜数 + +} + +// 表名常量 +const TableNameFightinfo = "player_fight_info" + +// TableName 返回表名 +func (*PVPMatchRecord) TableName() string { + return TableNameFightinfo +} + +// GroupName 返回表组名 +func (*PVPMatchRecord) GroupName() string { + return "default" +} + +// init 程序启动时自动创建表 +func init() { + cool.CreateTable(&PVPMatchRecord{}) +} diff --git a/modules/player/model/pvp.go b/modules/player/model/pvp.go index e858c8ffb..20542e9de 100644 --- a/modules/player/model/pvp.go +++ b/modules/player/model/pvp.go @@ -12,42 +12,33 @@ const TableNamePlayerPVP = "player_pvp" // PVP 对应数据库表 player_pvp,用于记录用户PVP赛季数据及场次统计 type PVP struct { - Base + *cool.Model PlayerID uint64 `gorm:"not null;index:idx_pvp_player_id;comment:'所属玩家ID'" json:"player_id"` - //SeasonID uint32 `gorm:"not null;index:idx_pvp_season_id;comment:'赛季ID(如2026S1=101)'" json:"season_id"` - SeasonData []PVPRankInfo `gorm:"type:jsonb;not null;comment:'赛季核心数据'" json:"season_data"` - MatchRecords []PVPMatchRecord `gorm:"type:jsonb;not null;comment:'本赛季场次记录(仅存最近100场)'" json:"match_records"` - RankInfo PVPRankInfo `gorm:"type:jsonb;not null;comment:'本赛季排名信息'" json:"rank_info"` + + SeasonData []PVPRankInfo `gorm:"type:jsonb;not null;comment:'赛季核心数据'" json:"season_data"` + + RankInfo PVPRankInfo `gorm:"type:jsonb;not null;comment:'本赛季排名信息'" json:"rank_info"` +} + +func NewPVP() *PVP { + return &PVP{ + Model: cool.NewModel(), + } } // PVPSeasonData PVP赛季核心统计数据 // 聚合维度:总场次、胜负、积分、胜率等 type PVPRankInfo struct { - Rank uint32 `json:"rank"` // 本赛季全服排名(0=未上榜) - Segment uint32 `json:"segment"` // 段位ID(如1=青铜 2=白银...) - SegmentStar uint32 `json:"segment_star"` // 段位星级(如青铜3星) - NextSegmentScore int32 `json:"next_segment_score"` // 升段所需积分 - TotalMatch uint32 `json:"total_match"` // 本赛季总场次 - WinMatch uint32 `json:"win_match"` // 本赛季胜利场次 - LoseMatch uint32 `json:"lose_match"` // 本赛季失败场次 - DrawMatch uint32 `json:"draw_match"` // 本赛季平局场次 - TotalScore int32 `json:"total_score"` // 本赛季总积分(胜加负减) - HighestScore int32 `json:"highest_score"` // 本赛季最高积分 - ContinuousWin uint32 `json:"continuous_win"` // 本赛季最高连胜次数 - LastMatchTime uint64 `json:"last_match_time"` // 最后一场PVP时间(时间戳) -} - -// PVPMatchRecord PVP单场记录 -// 记录每一场的详细信息,便于复盘和统计 -type PVPMatchRecord struct { - MatchID string `json:"match_id"` // 匹配局ID(全局唯一) - MatchTime uint64 `json:"match_time"` // 对局时间(时间戳) - NoteReadyToFightInfo - Result uint32 `json:"result"` // 对局结果:0=负 1=胜 2=平 - ScoreChange int32 `json:"score_change"` // 本场积分变化(+10/-5等) - UsedPetIDs []uint32 `json:"used_pet_ids"` // 本场使用的精灵ID列表 - WinStreak uint32 `json:"win_streak"` // 本场结束后的连胜数 - Duration uint32 `json:"duration"` // 对局时长(秒) + Rank uint32 `json:"rank"` // 本赛季全服排名(0=未上榜) + Score int32 `json:"score"` // 当前积分 + TotalMatch uint32 `json:"total_match"` // 本赛季总场次 + WinMatch uint32 `json:"win_match"` // 本赛季胜利场次 + LoseMatch uint32 `json:"lose_match"` // 本赛季失败场次 + DrawMatch uint32 `json:"draw_match"` // 本赛季平局场次 + TotalScore int32 `json:"total_score"` // 本赛季总积分(胜加负减) + HighestScore int32 `json:"highest_score"` // 本赛季最高积分 + ContinuousWin uint32 `json:"continuous_win"` // 本赛季最高连胜次数 + LastMatchTime uint64 `json:"last_match_time"` // 最后一场PVP时间(时间戳) } // NoteReadyToFightInfo 战斗准备就绪消息结构体,NoteReadyToFightInfo @@ -127,6 +118,7 @@ type FightOverInfo struct { Winpet *PetInfo `struc:"skip"` Round uint32 `struc:"skip"` LastAttavue AttackValue `struc:"skip"` + Duration uint32 `struc:"skip"` // 对局时长(秒) //7 切磋结束 Reason EnumBattleOverReason // 固定值0 WinnerId uint32 // 胜者的米米号 野怪为0 diff --git a/modules/player/service/pvp.go b/modules/player/service/pvp.go new file mode 100644 index 000000000..fb2d83d03 --- /dev/null +++ b/modules/player/service/pvp.go @@ -0,0 +1,30 @@ +package service + +import ( + "blazing/cool" + "blazing/modules/player/model" +) + +type PVPService struct { + BaseService +} + +func NewPVPService(id uint32) *PVPService { + return &PVPService{ + + BaseService: BaseService{userid: id, + + Service: &cool.Service{Model: model.NewPVP()}, + }, + } + +} +func (s *PVPService) Get(userid uint32) (ret *model.PVP) { + + //todo待测试 + + m := s.dbm_fix(s.Model) + + m.Scan(&ret) + return +}