diff --git a/common/rpc/func.go b/common/rpc/func.go index a5e3cce73..ec74875da 100644 --- a/common/rpc/func.go +++ b/common/rpc/func.go @@ -2,6 +2,8 @@ package rpc import ( "blazing/cool" + "blazing/logic/service/fight/pvp" + "blazing/logic/service/fight/pvpwire" "context" "fmt" "time" @@ -9,6 +11,7 @@ import ( "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/gconv" ) // ListenFunc 监听函数 @@ -150,6 +153,8 @@ func ListenFight(ctx g.Ctx) { serverID := cool.Config.ServerInfo.GetID() startTopic := "sun:start:" + serverID sendPackTopic := "sendpack:" + serverID + pvpServerTopic := pvpwire.ServerTopic(gconv.Uint32(serverID)) + pvpCoordinatorTopic := pvpwire.CoordinatorTopicPrefix // 外层循环:负责连接断开后的整体重连 for { @@ -199,16 +204,26 @@ func ListenFight(ctx g.Ctx) { }() // 3. 订阅主题(对齐 ListenFunc 的错误处理,替换 panic 为优雅重连) - // 订阅 sun:start:服务器ID - _, err = conn.Do(ctx, "subscribe", startTopic) - if err != nil { - cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", startTopic, "error", err) - heartbeatCancel() // 关闭心跳协程 - _ = conn.Close(ctx) - time.Sleep(retryDelay) + subscribeTopics := []string{startTopic, pvpServerTopic} + if cool.Config.GameOnlineID == pvp.CoordinatorOnlineID { + subscribeTopics = append(subscribeTopics, pvpCoordinatorTopic) + } + subscribeFailed := false + for _, topic := range subscribeTopics { + _, err = conn.Do(ctx, "subscribe", topic) + if err != nil { + cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", topic, "error", err) + heartbeatCancel() + _ = conn.Close(ctx) + time.Sleep(retryDelay) + subscribeFailed = true + break + } + cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", topic) + } + if subscribeFailed { continue } - cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", startTopic) // // 订阅 sun:sendpack:服务器ID // _, err = conn.Do(ctx, "subscribe", sendPackTopic) @@ -255,6 +270,10 @@ func ListenFight(ctx g.Ctx) { // universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient) } + if dataMap.Channel == pvpServerTopic || dataMap.Channel == pvpCoordinatorTopic { + pvp.HandleRedisMessage(dataMap.Channel, dataMap.Payload) + } + // 【可选】处理 sun:sendpack:服务器ID 消息(如果需要) if dataMap.Channel == sendPackTopic { fmt.Println("收到战斗包", dataMap.Payload) diff --git a/logic/controller/fight_巅峰.go b/logic/controller/fight_巅峰.go index 42e53e132..75a87933e 100644 --- a/logic/controller/fight_巅峰.go +++ b/logic/controller/fight_巅峰.go @@ -2,12 +2,10 @@ package controller import ( "blazing/common/socket/errorcode" - "blazing/cool" "blazing/logic/service/common" "blazing/logic/service/fight" - "blazing/logic/service/fight/info" + "blazing/logic/service/fight/pvp" "blazing/logic/service/player" - "context" ) // 表示"宠物王加入"的入站消息数据 @@ -18,14 +16,22 @@ type PetTOPLEVELnboundInfo struct { } func (h Controller) JoINtop(data *PetTOPLEVELnboundInfo, c *player.Player) (result *fight.NullOutboundInfo, err errorcode.ErrorCode) { - cool.RedisDo(context.TODO(), "sun:join", info.RPCFightinfo{ - PlayerID: c.Info.UserID, - Mode: data.Mode, - Type: 1, - }) - - // // 类型断言为 UniversalClient - // universalClient, _ := client.(goredis.UniversalClient) - // repo.NewPlayerRepository(universalClient).AddPlayerToPool(context.TODO(), data.Head.UserID, 1) + err = pvp.JoinPeakQueue(c, data.Mode) + if err != 0 { + return nil, err + } + return nil, -1 +} + +func (h Controller) CancelPeakQueue(data *fight.PeakQueueCancelInboundInfo, c *player.Player) (result *fight.NullOutboundInfo, err errorcode.ErrorCode) { + pvp.CancelPeakQueue(c) + return nil, -1 +} + +func (h Controller) SubmitPeakBanPick(data *fight.PeakBanPickSubmitInboundInfo, c *player.Player) (result *fight.NullOutboundInfo, err errorcode.ErrorCode) { + err = pvp.SubmitBanPick(c, data.SelectedCatchTimes, data.BanCatchTimes) + if err != 0 { + return nil, err + } return nil, -1 } diff --git a/logic/main.go b/logic/main.go index 897bdac7c..1aa0bbb42 100644 --- a/logic/main.go +++ b/logic/main.go @@ -13,7 +13,6 @@ import ( "github.com/gogf/gf/v2/os/gcmd" "github.com/gogf/gf/v2/os/gproc" - "blazing/common/data/xmlres" "blazing/logic/service/player" "blazing/cool" diff --git a/logic/service/fight/cmd.go b/logic/service/fight/cmd.go index 75919ff1b..d51b7ecfa 100644 --- a/logic/service/fight/cmd.go +++ b/logic/service/fight/cmd.go @@ -68,6 +68,52 @@ type PetKingJoinInboundInfo struct { FightType uint32 // 仅当Type为11时有效 } +// PeakQueueCancelInboundInfo 取消跨服巅峰队列。 +type PeakQueueCancelInboundInfo struct { + Head common.TomeeHeader `cmd:"2459" struc:"skip"` +} + +// PeakBanPickSubmitInboundInfo 提交 ban/pick 结果。 +type PeakBanPickSubmitInboundInfo struct { + Head common.TomeeHeader `cmd:"2460" struc:"skip"` + + SelectedCatchTimesLen uint32 `struc:"sizeof=SelectedCatchTimes"` + SelectedCatchTimes []uint32 `json:"selectedCatchTimes"` + + BanCatchTimesLen uint32 `struc:"sizeof=BanCatchTimes"` + BanCatchTimes []uint32 `json:"banCatchTimes"` +} + +// CrossServerBanPickStartOutboundInfo 跨服匹配成功后,通知前端进入 ban/pick。 +type CrossServerBanPickPetInfo struct { + CatchTime uint32 `json:"catchTime"` + PetID uint32 `json:"petId"` + Name string `struc:"[16]byte" json:"name"` + Level uint32 `json:"level"` + Hp uint32 `json:"hp"` + MaxHp uint32 `json:"maxHp"` +} + +type CrossServerBanPickStartOutboundInfo struct { + SessionIDLen uint32 `struc:"sizeof=SessionID"` + SessionID string `json:"sessionId"` + + OpponentUserID uint32 `json:"opponentUserId"` + OpponentNick string `struc:"[16]byte" json:"opponentNick"` + + FightMode uint32 `json:"fightMode"` + Status uint32 `json:"status"` + + TimeoutSeconds uint32 `json:"timeoutSeconds"` + SelectableCount uint32 `json:"selectableCount"` + + MyPetsLen uint32 `struc:"sizeof=MyPets"` + MyPets []CrossServerBanPickPetInfo `json:"myPets"` + + OpponentPetsLen uint32 `struc:"sizeof=OpponentPets"` + OpponentPets []CrossServerBanPickPetInfo `json:"opponentPets"` +} + // HandleFightInviteInboundInfo 处理战斗邀请的入站消息 type HandleFightInviteInboundInfo struct { diff --git a/logic/service/fight/pvp/proxy.go b/logic/service/fight/pvp/proxy.go new file mode 100644 index 000000000..88f5b4995 --- /dev/null +++ b/logic/service/fight/pvp/proxy.go @@ -0,0 +1,121 @@ +package pvp + +import ( + "blazing/logic/service/common" + "blazing/logic/service/fight/info" + "blazing/logic/service/fight/pvpwire" + "blazing/modules/player/model" +) + +type RemoteFightProxy struct { + sessionID string + hostServerID uint32 + userID uint32 +} + +func NewRemoteFightProxy(sessionID string, hostServerID, userID uint32) *RemoteFightProxy { + return &RemoteFightProxy{ + sessionID: sessionID, + hostServerID: hostServerID, + userID: userID, + } +} + +func (r *RemoteFightProxy) relay(payload pvpwire.BattleCommandPayload) { + if r == nil { + return + } + payload.SessionID = r.sessionID + payload.UserID = r.userID + _ = publishServerMessage(pvpwire.ServerTopic(r.hostServerID), pvpwire.MessageTypeBattleCommand, payload) +} + +func (r *RemoteFightProxy) Over(_ common.PlayerI, reason model.EnumBattleOverReason) { + r.relay(pvpwire.BattleCommandPayload{Command: pvpwire.CommandEscape, Reason: uint32(reason)}) +} + +func (r *RemoteFightProxy) UseSkill(_ common.PlayerI, id uint32) { + r.relay(pvpwire.BattleCommandPayload{Command: pvpwire.CommandUseSkill, SkillID: id}) +} + +func (r *RemoteFightProxy) UseSkillAt(_ common.PlayerI, id uint32, actorIndex, targetIndex int) { + r.relay(pvpwire.BattleCommandPayload{ + Command: pvpwire.CommandUseSkillAt, + SkillID: id, + ActorIndex: actorIndex, + TargetIndex: targetIndex, + }) +} + +func (r *RemoteFightProxy) GetCurrPET(common.PlayerI) *info.BattlePetEntity { + return nil +} + +func (r *RemoteFightProxy) GetCurrPETAt(common.PlayerI, int) *info.BattlePetEntity { + return nil +} + +func (r *RemoteFightProxy) GetOverInfo() model.FightOverInfo { + return model.FightOverInfo{} +} + +func (r *RemoteFightProxy) Ownerid() uint32 { + return r.userID +} + +func (r *RemoteFightProxy) ReadyFight(common.PlayerI) { + r.relay(pvpwire.BattleCommandPayload{Command: pvpwire.CommandReady}) +} + +func (r *RemoteFightProxy) ChangePet(_ common.PlayerI, id uint32) { + r.relay(pvpwire.BattleCommandPayload{Command: pvpwire.CommandChangePet, CatchTime: id}) +} + +func (r *RemoteFightProxy) ChangePetAt(_ common.PlayerI, id uint32, actorIndex int) { + r.relay(pvpwire.BattleCommandPayload{ + Command: pvpwire.CommandChangePetAt, + CatchTime: id, + ActorIndex: actorIndex, + }) +} + +func (r *RemoteFightProxy) Capture(common.PlayerI, uint32) { +} + +func (r *RemoteFightProxy) LoadPercent(_ common.PlayerI, percent int32) { + r.relay(pvpwire.BattleCommandPayload{Command: pvpwire.CommandLoadPercent, Percent: percent}) +} + +func (r *RemoteFightProxy) UseItem(_ common.PlayerI, catchTime, itemID uint32) { + r.relay(pvpwire.BattleCommandPayload{ + Command: pvpwire.CommandUseItem, + CatchTime: catchTime, + ItemID: itemID, + }) +} + +func (r *RemoteFightProxy) UseItemAt(_ common.PlayerI, catchTime, itemID uint32, actorIndex, targetIndex int) { + r.relay(pvpwire.BattleCommandPayload{ + Command: pvpwire.CommandUseItemAt, + CatchTime: catchTime, + ItemID: itemID, + ActorIndex: actorIndex, + TargetIndex: targetIndex, + }) +} + +func (r *RemoteFightProxy) Chat(_ common.PlayerI, msg string) { + r.relay(pvpwire.BattleCommandPayload{Command: pvpwire.CommandChat, Message: msg}) +} + +func (r *RemoteFightProxy) IsFirst(common.PlayerI) bool { + return false +} + +func (r *RemoteFightProxy) GetOverChan() chan struct{} { + return nil +} + +func (r *RemoteFightProxy) GetAttackValue(bool) *model.AttackValue { + return nil +} diff --git a/logic/service/fight/pvp/service.go b/logic/service/fight/pvp/service.go new file mode 100644 index 000000000..1ba204a11 --- /dev/null +++ b/logic/service/fight/pvp/service.go @@ -0,0 +1,791 @@ +package pvp + +import ( + "blazing/common/socket/errorcode" + "blazing/cool" + "blazing/logic/service/common" + "blazing/logic/service/fight" + fightinfo "blazing/logic/service/fight/info" + "blazing/logic/service/fight/pvpwire" + "blazing/logic/service/player" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "blazing/modules/player/model" + blservice "blazing/modules/player/service" +) + +const ( + CoordinatorOnlineID = 1 + queueHeartbeat = 3 * time.Second + queueTTL = 12 * time.Second + banPickTimeout = 45 * time.Second + banPickStartCmd = 2461 +) + +type localQueueTicket struct { + playerID uint32 + runtimeServerID uint32 + fightMode uint32 + status uint32 + stop chan struct{} + stopped uint32 +} + +func (t *localQueueTicket) Stop() { + if t == nil { + return + } + if atomic.CompareAndSwapUint32(&t.stopped, 0, 1) { + close(t.stop) + } +} + +type session struct { + payload pvpwire.MatchFoundPayload + hostSubmit *pvpwire.BanPickSubmitPayload + guestSubmit *pvpwire.BanPickSubmitPayload + stage string + hostedFight common.FightI + remoteProxy common.PlayerI + banPickDeadline time.Time +} + +type manager struct { + mu sync.RWMutex + queues map[uint32][]pvpwire.QueuePlayerSnapshot + lastSeen map[uint32]time.Time + localQueues map[uint32]*localQueueTicket + sessions map[string]*session + userSession map[uint32]string +} + +var defaultManager = &manager{ + queues: make(map[uint32][]pvpwire.QueuePlayerSnapshot), + lastSeen: make(map[uint32]time.Time), + localQueues: make(map[uint32]*localQueueTicket), + sessions: make(map[string]*session), + userSession: make(map[uint32]string), +} + +func Default() *manager { + return defaultManager +} + +func JoinPeakQueue(p *player.Player, requestedMode uint32) errorcode.ErrorCode { + if p == nil { + return errorcode.ErrorCodes.ErrSystemBusyTryLater + } + if err := p.CanFight(); err != 0 { + return err + } + fightMode, status, err := normalizePeakMode(requestedMode) + if err != 0 { + return err + } + + m := Default() + runtimeServerID := localRuntimeServerID() + ticket := &localQueueTicket{ + playerID: p.Info.UserID, + runtimeServerID: runtimeServerID, + fightMode: fightMode, + status: status, + stop: make(chan struct{}), + } + + m.mu.Lock() + if old := m.localQueues[p.Info.UserID]; old != nil { + old.Stop() + } + m.localQueues[p.Info.UserID] = ticket + delete(m.userSession, p.Info.UserID) + m.mu.Unlock() + + p.Fightinfo.Mode = fightMode + p.Fightinfo.Status = status + + go m.queueHeartbeatLoop(p, ticket) + return 0 +} + +func CancelPeakQueue(p *player.Player) { + if p == nil { + return + } + m := Default() + var ticket *localQueueTicket + m.mu.Lock() + ticket = m.localQueues[p.Info.UserID] + delete(m.localQueues, p.Info.UserID) + m.mu.Unlock() + if ticket != nil { + ticket.Stop() + _ = publishServerMessage(pvpwire.CoordinatorTopicPrefix, pvpwire.MessageTypeQueueCancel, pvpwire.QueueCancelPayload{ + RuntimeServerID: ticket.runtimeServerID, + UserID: ticket.playerID, + }) + } + atomic.StoreUint32(&p.Fightinfo.Mode, 0) + atomic.StoreUint32(&p.Fightinfo.Status, 0) +} + +func SubmitBanPick(p *player.Player, selected, banned []uint32) errorcode.ErrorCode { + if p == nil { + return errorcode.ErrorCodes.ErrSystemBusyTryLater + } + + m := Default() + m.mu.RLock() + sessionID := m.userSession[p.Info.UserID] + s := m.sessions[sessionID] + m.mu.RUnlock() + if s == nil { + return errorcode.ErrorCodes.ErrBattleEnded + } + + payload := pvpwire.BanPickSubmitPayload{ + SessionID: sessionID, + UserID: p.Info.UserID, + SelectedCatchTimes: append([]uint32(nil), selected...), + BanCatchTimes: append([]uint32(nil), banned...), + } + + if s.payload.Host.RuntimeServerID == localRuntimeServerID() { + m.applyBanPickSubmit(payload) + return 0 + } + + return publishServerMessage(pvpwire.ServerTopic(s.payload.Host.RuntimeServerID), pvpwire.MessageTypeBanPickSubmit, payload) +} + +func HandleRedisMessage(channel, raw string) { + Default().handleRedisMessage(channel, raw) +} + +func (m *manager) handleRedisMessage(channel, raw string) { + var envelope pvpwire.Envelope + if err := json.Unmarshal([]byte(raw), &envelope); err != nil { + cool.Logger.Warning(context.Background(), "pvp redis payload parse failed", err, raw) + return + } + + switch envelope.Type { + case pvpwire.MessageTypeQueueJoin: + if !isCoordinator() { + return + } + var payload pvpwire.QueueJoinPayload + if decodeBody(envelope.Body, &payload) { + m.handleQueueJoin(payload) + } + case pvpwire.MessageTypeQueueCancel: + if !isCoordinator() { + return + } + var payload pvpwire.QueueCancelPayload + if decodeBody(envelope.Body, &payload) { + m.handleQueueCancel(payload) + } + case pvpwire.MessageTypeMatchFound: + var payload pvpwire.MatchFoundPayload + if decodeBody(envelope.Body, &payload) { + m.handleMatchFound(payload) + } + case pvpwire.MessageTypeBanPickSubmit: + var payload pvpwire.BanPickSubmitPayload + if decodeBody(envelope.Body, &payload) { + m.applyBanPickSubmit(payload) + } + case pvpwire.MessageTypeBattleCommand: + var payload pvpwire.BattleCommandPayload + if decodeBody(envelope.Body, &payload) { + m.handleBattleCommand(payload) + } + case pvpwire.MessageTypePacketRelay: + var payload pvpwire.PacketRelayPayload + if decodeBody(envelope.Body, &payload) { + m.handlePacketRelay(payload) + } + case pvpwire.MessageTypeSessionClose: + var payload pvpwire.SessionClosePayload + if decodeBody(envelope.Body, &payload) { + m.closeSession(payload.SessionID, payload.Reason) + } + } +} + +func (m *manager) queueHeartbeatLoop(p *player.Player, ticket *localQueueTicket) { + ticker := time.NewTicker(queueHeartbeat) + defer ticker.Stop() + + send := func() bool { + if p == nil || p.Info == nil { + return false + } + payload := pvpwire.QueueJoinPayload{ + Player: pvpwire.QueuePlayerSnapshot{ + RuntimeServerID: ticket.runtimeServerID, + UserID: p.Info.UserID, + Nick: p.Info.Nick, + FightMode: ticket.fightMode, + Status: ticket.status, + JoinedAtUnix: time.Now().Unix(), + CatchTimes: filterAvailableCatchTimes(p.GetPetInfo(0)), + }, + } + if err := publishServerMessage(pvpwire.CoordinatorTopicPrefix, pvpwire.MessageTypeQueueJoin, payload); err != nil { + cool.Logger.Warning(context.Background(), "peak queue publish failed", err) + } + return true + } + + if !send() { + return + } + + for { + select { + case <-ticket.stop: + return + case <-ticker.C: + send() + } + } +} + +func (m *manager) handleQueueJoin(payload pvpwire.QueueJoinPayload) { + now := time.Now() + m.mu.Lock() + defer m.mu.Unlock() + + m.pruneExpiredQueueLocked(now) + playerInfo := payload.Player + m.lastSeen[playerInfo.UserID] = now + queue := m.queues[playerInfo.FightMode] + + for idx, queued := range queue { + if queued.UserID == playerInfo.UserID { + queue[idx] = playerInfo + m.queues[playerInfo.FightMode] = queue + return + } + } + + if len(queue) > 0 { + host := queue[0] + queue = queue[1:] + m.queues[playerInfo.FightMode] = queue + delete(m.lastSeen, host.UserID) + delete(m.lastSeen, playerInfo.UserID) + + sessionID := buildSessionID(host.UserID, playerInfo.UserID) + match := pvpwire.MatchFoundPayload{ + SessionID: sessionID, + Stage: pvpwire.StageBanPick, + Host: host, + Guest: playerInfo, + BanPickTimeout: uint32(banPickTimeout / time.Second), + } + _ = publishServerMessage(pvpwire.ServerTopic(host.RuntimeServerID), pvpwire.MessageTypeMatchFound, match) + if playerInfo.RuntimeServerID != host.RuntimeServerID { + _ = publishServerMessage(pvpwire.ServerTopic(playerInfo.RuntimeServerID), pvpwire.MessageTypeMatchFound, match) + } + return + } + + m.queues[playerInfo.FightMode] = append(queue, playerInfo) +} + +func (m *manager) handleQueueCancel(payload pvpwire.QueueCancelPayload) { + m.mu.Lock() + defer m.mu.Unlock() + + delete(m.lastSeen, payload.UserID) + for mode, queue := range m.queues { + next := make([]pvpwire.QueuePlayerSnapshot, 0, len(queue)) + for _, queued := range queue { + if queued.UserID == payload.UserID { + continue + } + next = append(next, queued) + } + m.queues[mode] = next + } +} + +func (m *manager) handleMatchFound(payload pvpwire.MatchFoundPayload) { + m.mu.Lock() + if _, ok := m.sessions[payload.SessionID]; ok { + m.mu.Unlock() + return + } + s := &session{ + payload: payload, + stage: pvpwire.StageBanPick, + banPickDeadline: time.Now().Add(time.Duration(payload.BanPickTimeout) * time.Second), + } + m.sessions[payload.SessionID] = s + m.userSession[payload.Host.UserID] = payload.SessionID + m.userSession[payload.Guest.UserID] = payload.SessionID + if queued := m.localQueues[payload.Host.UserID]; queued != nil { + queued.Stop() + delete(m.localQueues, payload.Host.UserID) + } + if queued := m.localQueues[payload.Guest.UserID]; queued != nil { + queued.Stop() + delete(m.localQueues, payload.Guest.UserID) + } + m.mu.Unlock() + + if local := playerByUserID(payload.Host.UserID); local != nil { + local.Fightinfo.Mode = payload.Host.FightMode + local.Fightinfo.Status = payload.Host.Status + local.SendPackCmd(banPickStartCmd, newBanPickStartInfo(payload, payload.Host, payload.Guest)) + } + if local := playerByUserID(payload.Guest.UserID); local != nil { + local.Fightinfo.Mode = payload.Guest.FightMode + local.Fightinfo.Status = payload.Guest.Status + if payload.Host.RuntimeServerID != payload.Guest.RuntimeServerID { + local.SetFightC(NewRemoteFightProxy(payload.SessionID, payload.Host.RuntimeServerID, payload.Guest.UserID)) + } + local.SendPackCmd(banPickStartCmd, newBanPickStartInfo(payload, payload.Guest, payload.Host)) + } + + if payload.Host.RuntimeServerID == localRuntimeServerID() { + time.AfterFunc(time.Duration(payload.BanPickTimeout)*time.Second, func() { + m.mu.RLock() + sessionID := m.userSession[payload.Host.UserID] + current := m.sessions[sessionID] + m.mu.RUnlock() + if current == nil || current.stage != pvpwire.StageBanPick { + return + } + m.launchBattle(payload.SessionID) + }) + } +} + +func (m *manager) applyBanPickSubmit(payload pvpwire.BanPickSubmitPayload) { + m.mu.Lock() + s := m.sessions[payload.SessionID] + if s == nil || s.stage != pvpwire.StageBanPick { + m.mu.Unlock() + return + } + if s.payload.Host.UserID == payload.UserID { + copyPayload := payload + s.hostSubmit = ©Payload + } else if s.payload.Guest.UserID == payload.UserID { + copyPayload := payload + s.guestSubmit = ©Payload + } + ready := s.hostSubmit != nil && s.guestSubmit != nil + m.mu.Unlock() + + if ready { + m.launchBattle(payload.SessionID) + } +} + +func (m *manager) launchBattle(sessionID string) { + m.mu.Lock() + s := m.sessions[sessionID] + if s == nil || s.stage != pvpwire.StageBanPick { + m.mu.Unlock() + return + } + s.stage = pvpwire.StageFighting + hostSnapshot := s.payload.Host + guestSnapshot := s.payload.Guest + hostSubmit := s.hostSubmit + guestSubmit := s.guestSubmit + m.mu.Unlock() + + hostPlayer := playerByUserID(hostSnapshot.UserID) + if hostPlayer == nil { + m.closeSession(sessionID, "host_offline") + return + } + hostPlayer.Fightinfo.Mode = hostSnapshot.FightMode + hostPlayer.Fightinfo.Status = hostSnapshot.Status + + petLimit := battlePetLimit(hostSnapshot.FightMode) + hostPets := buildBattlePets(hostSnapshot.CatchTimes, hostSubmit, guestSubmit, petLimit) + guestPets := buildBattlePets(guestSnapshot.CatchTimes, guestSubmit, hostSubmit, petLimit) + + var ( + fc common.FightI + fightErr errorcode.ErrorCode + ) + + if hostSnapshot.RuntimeServerID == guestSnapshot.RuntimeServerID { + guestPlayer := playerByUserID(guestSnapshot.UserID) + if guestPlayer == nil { + m.closeSession(sessionID, "guest_offline") + return + } + guestPlayer.Fightinfo.Mode = guestSnapshot.FightMode + guestPlayer.Fightinfo.Status = guestSnapshot.Status + fc, fightErr = fight.NewFight(hostPlayer, guestPlayer, hostPets, guestPets, func(model.FightOverInfo) { + m.onBattleFinished(sessionID) + }) + } else { + remote := player.NewRPCPlayer(&model.PlayerInfo{ + UserID: guestSnapshot.UserID, + Nick: guestSnapshot.Nick, + PetList: append([]model.PetInfo(nil), guestPets...), + }, guestSnapshot.RuntimeServerID, fightinfo.Fightinfo{ + Mode: guestSnapshot.FightMode, + Status: guestSnapshot.Status, + }, sessionID) + fc, fightErr = fight.NewFight(hostPlayer, remote, hostPets, guestPets, func(model.FightOverInfo) { + m.onBattleFinished(sessionID) + }) + m.mu.Lock() + if current := m.sessions[sessionID]; current != nil { + current.remoteProxy = remote + } + m.mu.Unlock() + } + + if fightErr != 0 { + m.closeSession(sessionID, "create_fight_failed") + return + } + + m.mu.Lock() + if current := m.sessions[sessionID]; current != nil { + current.hostedFight = fc + } + m.mu.Unlock() +} + +func (m *manager) handleBattleCommand(payload pvpwire.BattleCommandPayload) { + m.mu.RLock() + s := m.sessions[payload.SessionID] + var fightController common.FightI + var proxy common.PlayerI + if s != nil { + fightController = s.hostedFight + proxy = s.remoteProxy + } + m.mu.RUnlock() + if fightController == nil { + return + } + if proxy == nil { + proxy = player.NewRPCPlayer(&model.PlayerInfo{UserID: payload.UserID}, 0, fightinfo.Fightinfo{}, payload.SessionID) + } + + switch payload.Command { + case pvpwire.CommandReady: + go fightController.ReadyFight(proxy) + case pvpwire.CommandUseSkill: + go fightController.UseSkill(proxy, payload.SkillID) + case pvpwire.CommandUseSkillAt: + go fightController.UseSkillAt(proxy, payload.SkillID, payload.ActorIndex, payload.TargetIndex) + case pvpwire.CommandEscape: + go fightController.Over(proxy, model.EnumBattleOverReason(payload.Reason)) + case pvpwire.CommandChangePet: + go fightController.ChangePet(proxy, payload.CatchTime) + case pvpwire.CommandChangePetAt: + go fightController.ChangePetAt(proxy, payload.CatchTime, payload.ActorIndex) + case pvpwire.CommandLoadPercent: + go fightController.LoadPercent(proxy, payload.Percent) + case pvpwire.CommandUseItem: + go fightController.UseItem(proxy, payload.CatchTime, payload.ItemID) + case pvpwire.CommandUseItemAt: + go fightController.UseItemAt(proxy, payload.CatchTime, payload.ItemID, payload.ActorIndex, payload.TargetIndex) + case pvpwire.CommandChat: + go fightController.Chat(proxy, payload.Message) + } +} + +func (m *manager) handlePacketRelay(payload pvpwire.PacketRelayPayload) { + packet, err := base64.StdEncoding.DecodeString(payload.Packet) + if err != nil { + return + } + local := playerByUserID(payload.UserID) + if local == nil { + return + } + _ = local.SendPack(packet) + if payload.Cmd == 2506 { + local.QuitFight() + } +} + +func (m *manager) onBattleFinished(sessionID string) { + _ = publishServerMessage(pvpwire.ServerTopic(localRuntimeServerID()), pvpwire.MessageTypeSessionClose, pvpwire.SessionClosePayload{ + SessionID: sessionID, + Reason: "battle_finished", + }) + + m.mu.RLock() + s := m.sessions[sessionID] + m.mu.RUnlock() + if s != nil && s.payload.Guest.RuntimeServerID != 0 && s.payload.Guest.RuntimeServerID != localRuntimeServerID() { + _ = publishServerMessage(pvpwire.ServerTopic(s.payload.Guest.RuntimeServerID), pvpwire.MessageTypeSessionClose, pvpwire.SessionClosePayload{ + SessionID: sessionID, + Reason: "battle_finished", + }) + } +} + +func (m *manager) closeSession(sessionID, reason string) { + m.mu.Lock() + s := m.sessions[sessionID] + if s == nil { + m.mu.Unlock() + return + } + delete(m.userSession, s.payload.Host.UserID) + delete(m.userSession, s.payload.Guest.UserID) + delete(m.sessions, sessionID) + m.mu.Unlock() + + if local := playerByUserID(s.payload.Host.UserID); local != nil && reason != "battle_finished" { + local.QuitFight() + } + if local := playerByUserID(s.payload.Guest.UserID); local != nil { + local.QuitFight() + } +} + +func (m *manager) pruneExpiredQueueLocked(now time.Time) { + for mode, queue := range m.queues { + next := make([]pvpwire.QueuePlayerSnapshot, 0, len(queue)) + for _, queued := range queue { + last := m.lastSeen[queued.UserID] + if last.IsZero() || now.Sub(last) > queueTTL { + delete(m.lastSeen, queued.UserID) + continue + } + next = append(next, queued) + } + m.queues[mode] = next + } +} + +func newBanPickStartInfo(match pvpwire.MatchFoundPayload, self, opponent pvpwire.QueuePlayerSnapshot) *fight.CrossServerBanPickStartOutboundInfo { + myPets := buildBanPickPetInfos(self.CatchTimes) + opponentPets := buildBanPickPetInfos(opponent.CatchTimes) + return &fight.CrossServerBanPickStartOutboundInfo{ + SessionID: match.SessionID, + OpponentUserID: opponent.UserID, + OpponentNick: opponent.Nick, + FightMode: self.FightMode, + Status: self.Status, + TimeoutSeconds: match.BanPickTimeout, + SelectableCount: uint32(minInt(battlePetLimit(self.FightMode), len(myPets))), + MyPets: myPets, + OpponentPets: opponentPets, + } +} + +func buildBattlePets(allCatchTimes []uint32, own, opp *pvpwire.BanPickSubmitPayload, limit int) []model.PetInfo { + if len(allCatchTimes) == 0 { + return nil + } + banned := make(map[uint32]struct{}) + if opp != nil { + for _, catchTime := range opp.BanCatchTimes { + if catchTime == 0 { + continue + } + banned[catchTime] = struct{}{} + } + } + + filtered := make([]uint32, 0, len(allCatchTimes)) + for _, catchTime := range allCatchTimes { + if catchTime == 0 { + continue + } + if _, ok := banned[catchTime]; ok { + continue + } + filtered = append(filtered, catchTime) + } + if len(filtered) == 0 { + filtered = append(filtered, allCatchTimes...) + } + + if own == nil || len(own.SelectedCatchTimes) == 0 { + return resolveBattlePets(filtered, limit) + } + + selectedMap := make(map[uint32]struct{}, len(filtered)) + for _, catchTime := range filtered { + selectedMap[catchTime] = struct{}{} + } + + selectedCatchTimes := make([]uint32, 0, len(own.SelectedCatchTimes)) + used := make(map[uint32]struct{}, len(own.SelectedCatchTimes)) + for _, catchTime := range own.SelectedCatchTimes { + if catchTime == 0 { + continue + } + if _, ok := selectedMap[catchTime]; !ok { + continue + } + if _, exists := used[catchTime]; exists { + continue + } + used[catchTime] = struct{}{} + selectedCatchTimes = append(selectedCatchTimes, catchTime) + if limit > 0 && len(selectedCatchTimes) >= limit { + break + } + } + if len(selectedCatchTimes) == 0 { + return resolveBattlePets(filtered, limit) + } + return resolveBattlePets(selectedCatchTimes, limit) +} + +func resolveBattlePets(catchTimes []uint32, limit int) []model.PetInfo { + clampedCatchTimes := clampCatchTimes(catchTimes, limit) + if len(clampedCatchTimes) == 0 { + return nil + } + result := make([]model.PetInfo, 0, len(clampedCatchTimes)) + petService := blservice.NewPetService(0) + for _, catchTime := range clampedCatchTimes { + pet := petService.PetInfoOneByCatchTime(catchTime) + if pet == nil || pet.Data.ID == 0 { + continue + } + result = append(result, pet.Data) + } + return result +} + +func clampCatchTimes(catchTimes []uint32, limit int) []uint32 { + if len(catchTimes) == 0 { + return nil + } + if limit <= 0 || len(catchTimes) <= limit { + return append([]uint32(nil), catchTimes...) + } + return append([]uint32(nil), catchTimes[:limit]...) +} + +func filterAvailableCatchTimes(pets []model.PetInfo) []uint32 { + result := make([]uint32, 0, len(pets)) + used := make(map[uint32]struct{}, len(pets)) + for _, pet := range pets { + if pet.Hp == 0 || pet.CatchTime == 0 { + continue + } + if _, exists := used[pet.CatchTime]; exists { + continue + } + used[pet.CatchTime] = struct{}{} + result = append(result, pet.CatchTime) + } + return result +} + +func buildBanPickPetInfos(catchTimes []uint32) []fight.CrossServerBanPickPetInfo { + result := make([]fight.CrossServerBanPickPetInfo, 0, len(catchTimes)) + petService := blservice.NewPetService(0) + for _, catchTime := range catchTimes { + pet := petService.PetInfoOneByCatchTime(catchTime) + if pet == nil || pet.Data.ID == 0 { + continue + } + result = append(result, fight.CrossServerBanPickPetInfo{ + CatchTime: pet.Data.CatchTime, + PetID: pet.Data.ID, + Name: pet.Data.Name, + Level: pet.Data.Level, + Hp: pet.Data.Hp, + MaxHp: pet.Data.MaxHp, + }) + } + return result +} + +func minInt(a, b int) int { + if a < b { + return a + } + return b +} + +func battlePetLimit(fightMode uint32) int { + if fightMode == fightinfo.BattleMode.SINGLE_MODE { + return 1 + } + return 6 +} + +func normalizePeakMode(requested uint32) (fightMode uint32, status uint32, err errorcode.ErrorCode) { + switch requested { + case 19: + return fightinfo.BattleMode.SINGLE_MODE, requested, 0 + case 20: + return fightinfo.BattleMode.MULTI_MODE, requested, 0 + default: + return 0, 0, errorcode.ErrorCodes.ErrSystemError + } +} + +func playerByUserID(userID uint32) *player.Player { + client, ok := player.Mainplayer.Load(userID) + if !ok || client == nil { + return nil + } + return client.Player +} + +func buildSessionID(hostUserID, guestUserID uint32) string { + return fmt.Sprintf("xsvr-%d-%d-%d", hostUserID, guestUserID, time.Now().UnixNano()) +} + +func publishServerMessage(topic, msgType string, body any) errorcode.ErrorCode { + payload, err := json.Marshal(body) + if err != nil { + return errorcode.ErrorCodes.ErrSystemBusyTryLater + } + envelope, err := json.Marshal(pvpwire.Envelope{ + Type: msgType, + Body: payload, + }) + if err != nil { + return errorcode.ErrorCodes.ErrSystemBusyTryLater + } + conn, err := cool.Redis.Conn(context.Background()) + if err != nil { + return errorcode.ErrorCodes.ErrSystemBusyTryLater + } + defer conn.Close(context.Background()) + _, err = conn.Do(context.Background(), "publish", topic, envelope) + if err != nil { + return errorcode.ErrorCodes.ErrSystemBusyTryLater + } + return 0 +} + +func decodeBody(body []byte, target any) bool { + return json.Unmarshal(body, target) == nil +} + +func isCoordinator() bool { + return cool.Config.GameOnlineID == CoordinatorOnlineID +} + +func localRuntimeServerID() uint32 { + id, _ := strconv.ParseUint(strings.TrimSpace(cool.Config.ServerInfo.GetID()), 10, 32) + return uint32(id) +} diff --git a/logic/service/fight/pvpwire/types.go b/logic/service/fight/pvpwire/types.go new file mode 100644 index 000000000..9b8015b46 --- /dev/null +++ b/logic/service/fight/pvpwire/types.go @@ -0,0 +1,109 @@ +package pvpwire + +import ( + "fmt" +) + +const ( + CoordinatorTopicPrefix = "sun:pvp:coordinator" + ServerTopicPrefix = "sun:pvp:server" +) + +const ( + MessageTypeQueueJoin = "queue_join" + MessageTypeQueueCancel = "queue_cancel" + MessageTypeMatchFound = "match_found" + MessageTypeBanPickSubmit = "ban_pick_submit" + MessageTypeBattleCommand = "battle_command" + MessageTypePacketRelay = "packet_relay" + MessageTypeSessionClose = "session_close" +) + +const ( + StageQueued = "queued" + StageBanPick = "ban_pick" + StageFighting = "fighting" + StageClosed = "closed" +) + +const ( + CommandReady = "ready" + CommandUseSkill = "use_skill" + CommandUseSkillAt = "use_skill_at" + CommandEscape = "escape" + CommandChangePet = "change_pet" + CommandChangePetAt = "change_pet_at" + CommandLoadPercent = "load_percent" + CommandUseItem = "use_item" + CommandUseItemAt = "use_item_at" + CommandChat = "chat" +) + +type Envelope struct { + Type string `json:"type"` + Body []byte `json:"body"` +} + +type QueuePlayerSnapshot struct { + RuntimeServerID uint32 `json:"runtimeServerId"` + UserID uint32 `json:"userId"` + Nick string `json:"nick"` + FightMode uint32 `json:"fightMode"` + Status uint32 `json:"status"` + JoinedAtUnix int64 `json:"joinedAtUnix"` + CatchTimes []uint32 `json:"catchTimes"` +} + +type QueueJoinPayload struct { + Player QueuePlayerSnapshot `json:"player"` +} + +type QueueCancelPayload struct { + RuntimeServerID uint32 `json:"runtimeServerId"` + UserID uint32 `json:"userId"` +} + +type MatchFoundPayload struct { + SessionID string `json:"sessionId"` + Stage string `json:"stage"` + Host QueuePlayerSnapshot `json:"host"` + Guest QueuePlayerSnapshot `json:"guest"` + BanPickTimeout uint32 `json:"banPickTimeout"` +} + +type BanPickSubmitPayload struct { + SessionID string `json:"sessionId"` + UserID uint32 `json:"userId"` + SelectedCatchTimes []uint32 `json:"selectedCatchTimes"` + BanCatchTimes []uint32 `json:"banCatchTimes"` +} + +type BattleCommandPayload struct { + SessionID string `json:"sessionId"` + UserID uint32 `json:"userId"` + Command string `json:"command"` + SkillID uint32 `json:"skillId,omitempty"` + ActorIndex int `json:"actorIndex,omitempty"` + TargetIndex int `json:"targetIndex,omitempty"` + CatchTime uint32 `json:"catchTime,omitempty"` + ItemID uint32 `json:"itemId,omitempty"` + Percent int32 `json:"percent,omitempty"` + Message string `json:"message,omitempty"` + Reason uint32 `json:"reason,omitempty"` +} + +type PacketRelayPayload struct { + SessionID string `json:"sessionId"` + UserID uint32 `json:"userId"` + Cmd uint32 `json:"cmd"` + Packet string `json:"packet"` +} + +type SessionClosePayload struct { + SessionID string `json:"sessionId"` + Reason string `json:"reason"` +} + +func ServerTopic(runtimeServerID uint32) string { + return fmt.Sprintf("%s:%d", ServerTopicPrefix, runtimeServerID) +} diff --git a/logic/service/player/rpc.go b/logic/service/player/rpc.go index ca47dc2aa..37ba093e8 100644 --- a/logic/service/player/rpc.go +++ b/logic/service/player/rpc.go @@ -1,24 +1,71 @@ package player import ( + "blazing/common/socket/errorcode" "blazing/cool" + "blazing/logic/service/common" + "blazing/logic/service/fight/info" + "blazing/logic/service/fight/pvpwire" + "blazing/modules/player/model" "context" + "encoding/base64" + "encoding/json" ) -// rpc,跨服匹配的玩家,只做数据的传输 +// RPC_player 是跨服战斗中的远端玩家代理,只承载快照和发包能力。 type RPC_player struct { baseplayer - // - serviceid uint32 //玩家所在的ID + serviceid uint32 + fightinfo info.Fightinfo + sessionID string +} + +func NewRPCPlayer(info *model.PlayerInfo, serviceID uint32, fightInfo info.Fightinfo, sessionID string) *RPC_player { + ret := &RPC_player{ + baseplayer: newbaseplayer(), + serviceid: serviceID, + fightinfo: fightInfo, + sessionID: sessionID, + } + ret.Info = info + return ret } func (f *RPC_player) SendPackCmd(cmd uint32, data any) { + if f == nil || f.Info == nil || f.serviceid == 0 { + return + } - conn, _ := cool.Redis.Conn(context.TODO()) + packet := common.NewTomeeHeader(cmd, f.Info.UserID).Pack(data) + payload, err := json.Marshal(pvpwire.PacketRelayPayload{ + SessionID: f.sessionID, + UserID: f.Info.UserID, + Cmd: cmd, + Packet: base64.StdEncoding.EncodeToString(packet), + }) + if err != nil { + return + } + envelope, err := json.Marshal(pvpwire.Envelope{ + Type: pvpwire.MessageTypePacketRelay, + Body: payload, + }) + if err != nil { + return + } + conn, err := cool.Redis.Conn(context.TODO()) + if err != nil || conn == nil { + return + } defer conn.Close(context.TODO()) - conn.Do(context.TODO(), "publish", "sun:send", cmd, data) - - //fmt.Println("战斗结束") - + _, _ = conn.Do(context.TODO(), "publish", pvpwire.ServerTopic(f.serviceid), envelope) +} + +func (f *RPC_player) Getfightinfo() info.Fightinfo { + return f.fightinfo +} + +func (f *RPC_player) CanFight() errorcode.ErrorCode { + return 0 }