diff --git a/common/cool/cool.go b/common/cool/cool.go index c5fa7c2d0..642e93209 100644 --- a/common/cool/cool.go +++ b/common/cool/cool.go @@ -98,7 +98,7 @@ func Fail(message string) *BaseRes { func RedisDo(ctx context.Context, funcstring string, a ...any) { - conn, err := g.Redis("cool").Conn(ctx) + conn, err := Redis.Conn(ctx) if err != nil { panic(err) } diff --git a/common/cool/coolconfig/config.go b/common/cool/coolconfig/config.go index 8f4a8ed71..3b44a9bbc 100644 --- a/common/cool/coolconfig/config.go +++ b/common/cool/coolconfig/config.go @@ -4,6 +4,7 @@ import ( "time" "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/gconv" ) // cool config @@ -46,6 +47,10 @@ type ServerList struct { ExpireTime time.Time `gorm:"default:0;comment:'到期时间'" json:"expire_time"` } +func (s *ServerList) GetID() string { + return gconv.String(100000*s.OnlineID + s.Port) +} + // OSS相关配置 type oss struct { Endpoint string `json:"endpoint"` diff --git a/common/cool/func.go b/common/cool/func.go index cc0af2d0b..c3769a28a 100644 --- a/common/cool/func.go +++ b/common/cool/func.go @@ -52,7 +52,7 @@ func RunFunc(ctx g.Ctx, funcstring string) (err error) { // ClusterRunFunc 集群运行函数,如果是单机模式, 则直接运行函数 func ClusterRunFunc(ctx g.Ctx, funcstring string) (err error) { if IsRedisMode { - conn, err := g.Redis("cool").Conn(ctx) + conn, err := Redis.Conn(ctx) if err != nil { return err } diff --git a/common/rpc/func.go b/common/rpc/func.go index 3fc6d4804..a254e0f89 100644 --- a/common/rpc/func.go +++ b/common/rpc/func.go @@ -3,12 +3,12 @@ package rpc import ( "blazing/cool" "context" + "fmt" "time" "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" - "github.com/gogf/gf/v2/util/gconv" ) // ListenFunc 监听函数 @@ -36,7 +36,7 @@ func ListenFunc(ctx g.Ctx) { } // 1. 建立 Redis 连接 - conn, err := g.Redis("cool").Conn(ctx) + conn, err := cool.Redis.Conn(ctx) if err != nil { cool.Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay) time.Sleep(retryDelay) @@ -135,26 +135,36 @@ func ListenFunc(ctx g.Ctx) { } } +// ListenFight 完全对齐 ListenFunc 写法,修复收不到消息问题 func ListenFight(ctx g.Ctx) { + if !cool.IsRedisMode { + panic(gerror.New("集群模式下, 请使用Redis作为缓存")) + } - // 定义常量配置 + // 定义常量配置(对齐 ListenFunc 风格) const ( retryDelay = 10 * time.Second // 连接失败重试间隔 heartbeatInterval = 30 * time.Second // 心跳保活间隔 ) + // 提前拼接订阅主题(避免重复拼接,便于日志打印) + serverID := cool.Config.ServerInfo.GetID() + startTopic := "sun:start:" + serverID + sendPackTopic := "sendpack:" + serverID + // 外层循环:负责连接断开后的整体重连 for { - // 检查上下文是否已取消,优雅退出 + + // 检查上下文是否已取消,优雅退出(对齐 ListenFunc) select { case <-ctx.Done(): - cool.Logger.Info(ctx, "ListenFunc 上下文已取消,退出监听") + cool.Logger.Info(ctx, "ListenFight 上下文已取消,退出监听") return default: } - // 1. 建立 Redis 连接 - conn, err := g.Redis("cool").Conn(ctx) + // 1. 建立 Redis 连接(完全对齐 ListenFunc) + conn, err := cool.Redis.Conn(ctx) if err != nil { cool.Logger.Error(ctx, "获取 Redis 连接失败", "error", err, "retry_after", retryDelay) time.Sleep(retryDelay) @@ -162,7 +172,7 @@ func ListenFight(ctx g.Ctx) { } cool.Logger.Info(ctx, "成功获取 Redis 连接") - // 2. 启动心跳保活协程,防止连接因空闲被断开 + // 2. 启动心跳保活协程(完全对齐 ListenFunc 逻辑) heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background()) go func() { ticker := time.NewTicker(heartbeatInterval) @@ -190,11 +200,33 @@ func ListenFight(ctx g.Ctx) { } }() - // cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic) - //房主服务器拉取之后,所有操作通过redie转发到房主服务器去执行,非房主方只进行收包操作 - conn.Do(ctx, "subscribe", "sun:start:"+gconv.String(100000*cool.Config.ServerInfo.OnlineID+cool.Config.ServerInfo.Port)) //发起战斗 ,由房主服务器主动拉取战斗实现 - conn.Do(ctx, "subscribe", "sun:sendpack:"+gconv.String(100000*cool.Config.ServerInfo.OnlineID+cool.Config.ServerInfo.Port)) //接受战斗的信息,直接接受包转发就行 - // 4. 循环接收消息 + // 3. 订阅主题(对齐 ListenFunc 的错误处理,替换 panic 为优雅重连) + // 订阅 sun:start:服务器ID + _, err = conn.Do(ctx, "subscribe", startTopic) + if err != nil { + cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", startTopic, "error", err) + heartbeatCancel() // 关闭心跳协程 + _ = conn.Close(ctx) + time.Sleep(retryDelay) + continue + } + cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", startTopic) + + // // 订阅 sun:sendpack:服务器ID + // _, err = conn.Do(ctx, "subscribe", sendPackTopic) + // if err != nil { + // cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", sendPackTopic, "error", err) + // heartbeatCancel() // 关闭心跳协程 + // _ = conn.Close(ctx) + // time.Sleep(retryDelay) + // continue + // } + // cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", sendPackTopic) + + // 打印监听提示(保留原有日志) + fmt.Println("监听战斗", startTopic) + + // 4. 循环接收消息(完全对齐 ListenFunc 逻辑) connError := false for !connError { select { @@ -204,7 +236,7 @@ func ListenFight(ctx g.Ctx) { default: } - // 接收消息 + // 接收消息(和 ListenFunc 保持一致的 Receive 用法) data, err := conn.Receive(ctx) if err != nil { cool.Logger.Error(ctx, "Redis PubSub Receive 失败", "error", err) @@ -212,29 +244,29 @@ func ListenFight(ctx g.Ctx) { continue } - // 处理消息(保留原有业务逻辑) + // 处理消息(完全对齐 ListenFunc 的解析逻辑) if data != nil { dataMap, ok := data.Interface().(*gredis.Message) if !ok { continue } - // if dataMap. == "subscribe" { - // continue - // } - if dataMap.Channel == "sun:join:2458" { - - fightmap.ADD(dataMap.Payload) - //universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient) + // 处理 sun:start:服务器ID 消息 + if dataMap.Channel == startTopic { + fmt.Println("战斗开始", dataMap.Payload) + // universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient) + } + // 【可选】处理 sun:sendpack:服务器ID 消息(如果需要) + if dataMap.Channel == sendPackTopic { + fmt.Println("收到战斗包", dataMap.Payload) } } } - // 5. 清理资源,准备重连 + // 5. 清理资源,准备重连(完全对齐 ListenFunc) heartbeatCancel() // 关闭心跳协程 _ = conn.Close(ctx) // 关闭当前连接 - // Logger.Warn(ctx, "Redis 连接异常,准备重连", "retry_after", retryDelay) time.Sleep(retryDelay) } } diff --git a/common/rpc/user.go b/common/rpc/user.go index 25649736d..81b5827e0 100644 --- a/common/rpc/user.go +++ b/common/rpc/user.go @@ -1,13 +1,18 @@ package rpc import ( + "blazing/common/data/share" + "blazing/cool" "blazing/logic/service/common" "blazing/logic/service/fight/info" "blazing/modules/player/model" "blazing/modules/player/service" + "context" "encoding/json" "time" + "github.com/gogf/gf/v2/os/gtime" + "github.com/gogf/gf/v2/util/gconv" "github.com/liwnn/zset" csmap "github.com/mhmtszr/concurrent-swiss-map" ) @@ -20,22 +25,26 @@ type RPCfight struct { func (r *RPCfight) join(pvp info.RPCFightinfo) { ret := service.NewPVPService(pvp.PlayerID).Get(pvp.PlayerID) - score := 800 - if ret != nil { - score = int(ret.RankInfo[len(ret.RankInfo)-1].Score) - } - r.zs.Add(pvp.PlayerID, - ret) - if r.zs.Length() > 2 { - u, s := r.zs.FindPrev(func(i *model.PVP) bool { return i.RankInfo[len(ret.RankInfo)-1].Score > score }) + ret.RankInfo.LastMatchTime = gtime.Now() - diff := s - score + r.zs.Add(pvp.PlayerID, ret) + if r.zs.Length() > 1 { + u, _ := r.zs.FindNext(func(i *model.PVP) bool { return i.RankInfo.Score >= ret.RankInfo.Score }) + + diff := u.RankInfo.Score - ret.RankInfo.Score // 等待越久,允许区间越大 - wait := time.Now().Sub(u.RankInfo[len(ret.RankInfo)-1].LastMatchTime.Time).Seconds() + wait := time.Now().Sub(u.RankInfo.LastMatchTime.Time).Seconds() maxAllow := 100 + int(wait)*10 if diff < maxAllow { //找到上一个,如果区间分数少于一定, //直接进行匹配 + useid1, _ := share.ShareManager.GetUserOnline(u.PlayerID) + cool.RedisDo(context.TODO(), "sun:start:"+gconv.String(useid1), info.RPCFightStartinfo{ + Serverid: int(useid1), + PlayerID: u.PlayerID, + Mode: pvp.Mode, + Status: pvp.Status, + }) } } @@ -43,12 +52,13 @@ func (r *RPCfight) join(pvp info.RPCFightinfo) { func (r *RPCfight) ADD(s string) { println("收到sun:join", s) - var pvp info.RPCFightinfo + var pvp []info.RPCFightinfo + json.Unmarshal([]byte(s), &pvp) - if pvp.Type == 1 { - r.join(pvp) + if pvp[0].Type == 1 { + r.join(pvp[0]) } else { //==0 退出 - r.cancel(pvp) + r.cancel(pvp[0]) } } diff --git a/logic/controller/fight_巅峰.go b/logic/controller/fight_巅峰.go index 683936d19..42e53e132 100644 --- a/logic/controller/fight_巅峰.go +++ b/logic/controller/fight_巅峰.go @@ -21,6 +21,7 @@ func (h Controller) JoINtop(data *PetTOPLEVELnboundInfo, c *player.Player) (resu cool.RedisDo(context.TODO(), "sun:join", info.RPCFightinfo{ PlayerID: c.Info.UserID, Mode: data.Mode, + Type: 1, }) // // 类型断言为 UniversalClient diff --git a/logic/controller/login_main.go b/logic/controller/login_main.go index e5148d268..100eb3b2a 100644 --- a/logic/controller/login_main.go +++ b/logic/controller/login_main.go @@ -72,6 +72,7 @@ func (h Controller) Login(data *user.MAIN_LOGIN_IN, c gnet.Conn) (result *user.L // } // currentPlayer.Info.SetTask(314, model.Completed) // currentPlayer.Info.SetTask(315, model.Completed) + //fmt.Println("任务", 291, currentPlayer.Info.GetTask(145)) currentPlayer.IsLogin = true return result, 0 diff --git a/logic/server.go b/logic/server.go index 6587a30bc..4fdfb15f8 100644 --- a/logic/server.go +++ b/logic/server.go @@ -5,6 +5,7 @@ import ( "blazing/common/rpc" "blazing/common/socket" "blazing/cool" + "context" "blazing/logic/controller" @@ -60,7 +61,7 @@ func Start() { // if cool.Config.ServerInfo.IsVip != 0 { // g.DB().GetCache().SetAdapter(gcache.NewAdapterRedis(cool.Redis)) //设置数据库 // } - + go rpc.ListenFight(context.Background()) if cool.Config.ServerInfo.IsDebug == 1 { g.DB().SetDebug(true) @@ -81,7 +82,7 @@ func Start() { controller.Maincontroller.RPCClient = rpcClient //将RPC赋值Start - controller.Maincontroller.UID = uint32(100000*serverID + uint32(port)) //赋值服务器ID + controller.Maincontroller.UID = gconv.Uint32(cool.Config.ServerInfo.GetID()) //赋值服务器ID controller.Init(true) xmlres.Initfile() diff --git a/logic/service/fight/info/info.go b/logic/service/fight/info/info.go index 20d93f957..98bac6402 100644 --- a/logic/service/fight/info/info.go +++ b/logic/service/fight/info/info.go @@ -52,7 +52,7 @@ type Fightinfo struct { FightType uint32 } type RPCFightinfo struct { - Type int + Type int //加入还是退出 PlayerID uint32 // 战斗模式 1 = 1v1 2 = 6v6 3大乱斗 0 什么都不做 Mode uint32 @@ -62,8 +62,8 @@ type RPCFightinfo struct { // FightType uint32 } type RPCFightStartinfo struct { - Servierid int // 非0表示rpc跨服 - PlayerID uint32 + Serverid int // 非0表示rpc跨服 + PlayerID uint32 // 战斗模式 1 = 1v1 2 = 6v6 3大乱斗 0 什么都不做 Mode uint32 //Type uint32 //战斗类型 diff --git a/logic/service/player/rpc.go b/logic/service/player/rpc.go index 827e4a669..ca47dc2aa 100644 --- a/logic/service/player/rpc.go +++ b/logic/service/player/rpc.go @@ -1,9 +1,8 @@ package player import ( + "blazing/cool" "context" - - "github.com/gogf/gf/v2/frame/g" ) // rpc,跨服匹配的玩家,只做数据的传输 @@ -15,7 +14,7 @@ type RPC_player struct { func (f *RPC_player) SendPackCmd(cmd uint32, data any) { - conn, _ := g.Redis("cool").Conn(context.TODO()) + conn, _ := cool.Redis.Conn(context.TODO()) defer conn.Close(context.TODO()) conn.Do(context.TODO(), "publish", "sun:send", cmd, data) diff --git a/modules/player/controller/admin/pvp.go b/modules/player/controller/admin/pvp.go index 303c54934..d6f2022f9 100644 --- a/modules/player/controller/admin/pvp.go +++ b/modules/player/controller/admin/pvp.go @@ -37,7 +37,7 @@ func (c *PVPController) Get(ctx context.Context, req *GetPVPReq) (res *cool.Base ) alltitile := service.NewPVPService(uint32(admin.UserId)).Get(uint32(admin.UserId)) - res.Data = alltitile + res.Data = alltitile.RankInfo return } diff --git a/modules/player/model/pvp.go b/modules/player/model/pvp.go index ce012069e..9051f830f 100644 --- a/modules/player/model/pvp.go +++ b/modules/player/model/pvp.go @@ -10,25 +10,23 @@ import ( // 表名常量 const TableNamePlayerPVP = "player_pvp" +const Curpvp = 0 // PVP 对应数据库表 player_pvp,用于记录用户PVP赛季数据及场次统计 type PVP struct { *cool.Model PlayerID uint32 `gorm:"not null;index:idx_pvp_player_id;comment:'所属玩家ID'" json:"player_id"` + //第几赛季 + Season uint32 `gorm:"not null;default:0;comment:'赛季R'" json:"season"` //本赛季排名信息'通过下标来确认当前赛季 - RankInfo []PVPRankInfo `gorm:"type:jsonb;not null;comment:'赛季核心数据'" json:"season_data"` + RankInfo PVPRankInfo `gorm:"type:jsonb;not null;comment:'赛季核心数据'" json:"rank_info"` } -func (u *PVP) Key() uint32 { - return uint32(u.ID) -} - -// 如果分数不对的话,就按时间排序 func (u *PVP) Less(than *PVP) bool { - if u.RankInfo[len(u.RankInfo)-1].Score == than.RankInfo[len(u.RankInfo)-1].Score { - return u.RankInfo[len(u.RankInfo)-1].LastMatchTime.Unix() < than.RankInfo[len(u.RankInfo)-1].LastMatchTime.Unix() + if u.RankInfo.Score == than.RankInfo.Score { + return u.RankInfo.LastMatchTime.Unix() < than.RankInfo.LastMatchTime.Unix() } - return u.RankInfo[len(u.RankInfo)-1].Score < than.RankInfo[len(u.RankInfo)-1].Score + return u.RankInfo.Score < than.RankInfo.Score } func NewPVP() *PVP { return &PVP{ @@ -41,6 +39,7 @@ func NewPVP() *PVP { type PVPRankInfo struct { Rank uint32 `json:"rank"` // 本赛季全服排名(0=未上榜) Score int `json:"score"` // 当前积分 + WinStreak uint32 `json:"win_streak"` // 当前连胜场次 TotalMatch uint32 `json:"total_match"` // 本赛季总场次 WinMatch uint32 `json:"win_match"` // 本赛季胜利场次 LoseMatch uint32 `json:"lose_match"` // 本赛季失败场次 @@ -51,6 +50,12 @@ type PVPRankInfo struct { LastMatchTime *gtime.Time `json:"last_match_time"` // 最后一场PVP时间(时间戳) } +// func (u *PVPRankInfo) Key() uint32 { +// return uint32(u.ID) +// } + +// 如果分数不对的话,就按时间排序 + // NoteReadyToFightInfo 战斗准备就绪消息结构体,NoteReadyToFightInfo type NoteReadyToFightInfo struct { //MAXPET uint32 `struc:"skip"` // 最大精灵数 struc:"skip"` diff --git a/modules/player/service/pvp.go b/modules/player/service/pvp.go index 7bd3265c7..24a520250 100644 --- a/modules/player/service/pvp.go +++ b/modules/player/service/pvp.go @@ -23,14 +23,19 @@ func (s *PVPService) Get(userid uint32) (ret *model.PVP) { //todo待测试 - m := s.dbm_fix(s.Model) + m := s.dbm_fix(s.Model).Where("season", model.Curpvp) m.Scan(&ret) if ret == nil { ret = &model.PVP{ PlayerID: uint32(userid), - RankInfo: make([]model.PVPRankInfo, 0), + Season: uint32(model.Curpvp), + RankInfo: model.PVPRankInfo{ + Score: 800, + }, } + s.dbm_fix(s.Model).Data(ret).Insert() } + return }