package pvp import ( "blazing/common/socket/errorcode" "blazing/cool" "blazing/logic/service/common" "blazing/logic/service/fight" fightinfo "blazing/logic/service/fight/info" "blazing/logic/service/fight/pvpwire" "blazing/logic/service/player" "context" "encoding/base64" "encoding/json" "fmt" "strconv" "strings" "sync" "sync/atomic" "time" "blazing/modules/player/model" blservice "blazing/modules/player/service" ) const ( CoordinatorOnlineID = 1 queueHeartbeat = 3 * time.Second queueTTL = 12 * time.Second banPickTimeout = 45 * time.Second banPickStartCmd = 2461 battleLevelCap = 100 ) type localQueueTicket struct { playerID uint32 runtimeServerID uint32 fightMode uint32 status uint32 stop chan struct{} stopped uint32 } func (t *localQueueTicket) Stop() { if t == nil { return } if atomic.CompareAndSwapUint32(&t.stopped, 0, 1) { close(t.stop) } } type session struct { payload pvpwire.MatchFoundPayload hostSubmit *pvpwire.BanPickSubmitPayload guestSubmit *pvpwire.BanPickSubmitPayload stage string hostedFight common.FightI remoteProxy common.PlayerI banPickDeadline time.Time } type manager struct { mu sync.RWMutex queues map[uint32][]pvpwire.QueuePlayerSnapshot lastSeen map[uint32]time.Time localQueues map[uint32]*localQueueTicket sessions map[string]*session userSession map[uint32]string } var defaultManager = &manager{ queues: make(map[uint32][]pvpwire.QueuePlayerSnapshot), lastSeen: make(map[uint32]time.Time), localQueues: make(map[uint32]*localQueueTicket), sessions: make(map[string]*session), userSession: make(map[uint32]string), } func Default() *manager { return defaultManager } func JoinPeakQueue(p *player.Player, requestedMode uint32) errorcode.ErrorCode { if p == nil { return errorcode.ErrorCodes.ErrSystemBusyTryLater } if err := p.CanFight(); err != 0 { return err } fightMode, status, err := normalizePeakMode(requestedMode) if err != 0 { return err } p.Fightinfo.Mode = fightMode p.Fightinfo.Status = status return 0 } func CancelPeakQueue(p *player.Player) { if p == nil { return } m := Default() var ticket *localQueueTicket m.mu.Lock() ticket = m.localQueues[p.Info.UserID] delete(m.localQueues, p.Info.UserID) m.mu.Unlock() if ticket != nil { ticket.Stop() } atomic.StoreUint32(&p.Fightinfo.Mode, 0) atomic.StoreUint32(&p.Fightinfo.Status, 0) } func NormalizePeakMode(requested uint32) (fightMode uint32, status uint32, err errorcode.ErrorCode) { return normalizePeakMode(requested) } func AvailableCatchTimes(pets []model.PetInfo) []uint32 { return filterAvailableCatchTimes(pets) } func SubmitBanPick(p *player.Player, selected, banned []uint32) errorcode.ErrorCode { if p == nil { return errorcode.ErrorCodes.ErrSystemBusyTryLater } m := Default() m.mu.RLock() sessionID := m.userSession[p.Info.UserID] s := m.sessions[sessionID] m.mu.RUnlock() if s == nil { return errorcode.ErrorCodes.ErrBattleEnded } payload := pvpwire.BanPickSubmitPayload{ SessionID: sessionID, UserID: p.Info.UserID, SelectedCatchTimes: append([]uint32(nil), selected...), BanCatchTimes: append([]uint32(nil), banned...), } if s.payload.Host.RuntimeServerID == localRuntimeServerID() { m.applyBanPickSubmit(payload) return 0 } return publishServerMessage(pvpwire.ServerTopic(s.payload.Host.RuntimeServerID), pvpwire.MessageTypeBanPickSubmit, payload) } func HandleRedisMessage(channel, raw string) { Default().handleRedisMessage(channel, raw) } func (m *manager) handleRedisMessage(channel, raw string) { var envelope pvpwire.Envelope if err := json.Unmarshal([]byte(raw), &envelope); err != nil { cool.Logger.Warning(context.Background(), "pvp redis payload parse failed", err, raw) return } switch envelope.Type { case pvpwire.MessageTypeQueueJoin: if !isCoordinator() { return } var payload pvpwire.QueueJoinPayload if decodeBody(envelope.Body, &payload) { m.handleQueueJoin(payload) } case pvpwire.MessageTypeQueueCancel: if !isCoordinator() { return } var payload pvpwire.QueueCancelPayload if decodeBody(envelope.Body, &payload) { m.handleQueueCancel(payload) } case pvpwire.MessageTypeMatchFound: var payload pvpwire.MatchFoundPayload if decodeBody(envelope.Body, &payload) { m.handleMatchFound(payload) } case pvpwire.MessageTypeBanPickSubmit: var payload pvpwire.BanPickSubmitPayload if decodeBody(envelope.Body, &payload) { m.applyBanPickSubmit(payload) } case pvpwire.MessageTypeBattleCommand: var payload pvpwire.BattleCommandPayload if decodeBody(envelope.Body, &payload) { m.handleBattleCommand(payload) } case pvpwire.MessageTypePacketRelay: var payload pvpwire.PacketRelayPayload if decodeBody(envelope.Body, &payload) { m.handlePacketRelay(payload) } case pvpwire.MessageTypeSessionClose: var payload pvpwire.SessionClosePayload if decodeBody(envelope.Body, &payload) { m.closeSession(payload.SessionID, payload.Reason) } } } func (m *manager) queueHeartbeatLoop(p *player.Player, ticket *localQueueTicket) { ticker := time.NewTicker(queueHeartbeat) defer ticker.Stop() send := func() bool { if p == nil || p.Info == nil { return false } payload := pvpwire.QueueJoinPayload{ Player: pvpwire.QueuePlayerSnapshot{ RuntimeServerID: ticket.runtimeServerID, UserID: p.Info.UserID, Nick: p.Info.Nick, FightMode: ticket.fightMode, Status: ticket.status, JoinedAtUnix: time.Now().Unix(), CatchTimes: filterAvailableCatchTimes(p.GetPetInfo(0)), }, } if err := publishServerMessage(pvpwire.CoordinatorTopicPrefix, pvpwire.MessageTypeQueueJoin, payload); err != 0 { cool.Logger.Warning(context.Background(), "peak queue publish failed", err) } return true } if !send() { return } for { select { case <-ticket.stop: return case <-ticker.C: send() } } } func (m *manager) handleQueueJoin(payload pvpwire.QueueJoinPayload) { now := time.Now() m.mu.Lock() defer m.mu.Unlock() m.pruneExpiredQueueLocked(now) playerInfo := payload.Player m.lastSeen[playerInfo.UserID] = now queue := m.queues[playerInfo.FightMode] for idx, queued := range queue { if queued.UserID == playerInfo.UserID { queue[idx] = playerInfo m.queues[playerInfo.FightMode] = queue return } } if len(queue) > 0 { host := queue[0] queue = queue[1:] m.queues[playerInfo.FightMode] = queue delete(m.lastSeen, host.UserID) delete(m.lastSeen, playerInfo.UserID) sessionID := buildSessionID(host.UserID, playerInfo.UserID) match := pvpwire.MatchFoundPayload{ SessionID: sessionID, Stage: pvpwire.StageBanPick, Host: host, Guest: playerInfo, BanPickTimeout: uint32(banPickTimeout / time.Second), } _ = publishServerMessage(pvpwire.ServerTopic(host.RuntimeServerID), pvpwire.MessageTypeMatchFound, match) if playerInfo.RuntimeServerID != host.RuntimeServerID { _ = publishServerMessage(pvpwire.ServerTopic(playerInfo.RuntimeServerID), pvpwire.MessageTypeMatchFound, match) } return } m.queues[playerInfo.FightMode] = append(queue, playerInfo) } func (m *manager) handleQueueCancel(payload pvpwire.QueueCancelPayload) { m.mu.Lock() defer m.mu.Unlock() delete(m.lastSeen, payload.UserID) for mode, queue := range m.queues { next := make([]pvpwire.QueuePlayerSnapshot, 0, len(queue)) for _, queued := range queue { if queued.UserID == payload.UserID { continue } next = append(next, queued) } m.queues[mode] = next } } func (m *manager) handleMatchFound(payload pvpwire.MatchFoundPayload) { m.mu.Lock() if _, ok := m.sessions[payload.SessionID]; ok { m.mu.Unlock() return } s := &session{ payload: payload, stage: pvpwire.StageBanPick, banPickDeadline: time.Now().Add(time.Duration(payload.BanPickTimeout) * time.Second), } m.sessions[payload.SessionID] = s m.userSession[payload.Host.UserID] = payload.SessionID m.userSession[payload.Guest.UserID] = payload.SessionID if queued := m.localQueues[payload.Host.UserID]; queued != nil { queued.Stop() delete(m.localQueues, payload.Host.UserID) } if queued := m.localQueues[payload.Guest.UserID]; queued != nil { queued.Stop() delete(m.localQueues, payload.Guest.UserID) } m.mu.Unlock() if local := playerByUserID(payload.Host.UserID); local != nil { local.Fightinfo.Mode = payload.Host.FightMode local.Fightinfo.Status = payload.Host.Status local.SendPackCmd(banPickStartCmd, newBanPickStartInfo(payload, payload.Host, payload.Guest)) } if local := playerByUserID(payload.Guest.UserID); local != nil { local.Fightinfo.Mode = payload.Guest.FightMode local.Fightinfo.Status = payload.Guest.Status if payload.Host.RuntimeServerID != payload.Guest.RuntimeServerID { local.SetFightC(NewRemoteFightProxy(payload.SessionID, payload.Host.RuntimeServerID, payload.Guest.UserID)) } local.SendPackCmd(banPickStartCmd, newBanPickStartInfo(payload, payload.Guest, payload.Host)) } if payload.Host.RuntimeServerID == localRuntimeServerID() { time.AfterFunc(time.Duration(payload.BanPickTimeout)*time.Second, func() { m.mu.RLock() sessionID := m.userSession[payload.Host.UserID] current := m.sessions[sessionID] m.mu.RUnlock() if current == nil || current.stage != pvpwire.StageBanPick { return } m.launchBattle(payload.SessionID) }) } } func (m *manager) applyBanPickSubmit(payload pvpwire.BanPickSubmitPayload) { m.mu.Lock() s := m.sessions[payload.SessionID] if s == nil || s.stage != pvpwire.StageBanPick { m.mu.Unlock() return } if s.payload.Host.UserID == payload.UserID { copyPayload := payload s.hostSubmit = ©Payload } else if s.payload.Guest.UserID == payload.UserID { copyPayload := payload s.guestSubmit = ©Payload } ready := s.hostSubmit != nil && s.guestSubmit != nil m.mu.Unlock() if ready { m.launchBattle(payload.SessionID) } } func (m *manager) launchBattle(sessionID string) { m.mu.Lock() s := m.sessions[sessionID] if s == nil || s.stage != pvpwire.StageBanPick { m.mu.Unlock() return } s.stage = pvpwire.StageFighting hostSnapshot := s.payload.Host guestSnapshot := s.payload.Guest hostSubmit := s.hostSubmit guestSubmit := s.guestSubmit m.mu.Unlock() hostPlayer := playerByUserID(hostSnapshot.UserID) if hostPlayer == nil { m.closeSession(sessionID, "host_offline") return } hostPlayer.Fightinfo.Mode = hostSnapshot.FightMode hostPlayer.Fightinfo.Status = hostSnapshot.Status petLimit := battlePetLimit(hostSnapshot.FightMode) hostPets := buildBattlePets(hostSnapshot.CatchTimes, hostSubmit, guestSubmit, petLimit) guestPets := buildBattlePets(guestSnapshot.CatchTimes, guestSubmit, hostSubmit, petLimit) var ( fc common.FightI fightErr errorcode.ErrorCode ) if hostSnapshot.RuntimeServerID == guestSnapshot.RuntimeServerID { guestPlayer := playerByUserID(guestSnapshot.UserID) if guestPlayer == nil { m.closeSession(sessionID, "guest_offline") return } guestPlayer.Fightinfo.Mode = guestSnapshot.FightMode guestPlayer.Fightinfo.Status = guestSnapshot.Status fc, fightErr = fight.NewFight(hostPlayer, guestPlayer, hostPets, guestPets, func(model.FightOverInfo) { m.onBattleFinished(sessionID) }) } else { remote := player.NewRPCPlayer(&model.PlayerInfo{ UserID: guestSnapshot.UserID, Nick: guestSnapshot.Nick, PetList: append([]model.PetInfo(nil), guestPets...), }, guestSnapshot.RuntimeServerID, fightinfo.Fightinfo{ Mode: guestSnapshot.FightMode, Status: guestSnapshot.Status, }, sessionID) fc, fightErr = fight.NewFight(hostPlayer, remote, hostPets, guestPets, func(model.FightOverInfo) { m.onBattleFinished(sessionID) }) m.mu.Lock() if current := m.sessions[sessionID]; current != nil { current.remoteProxy = remote } m.mu.Unlock() } if fightErr != 0 { m.closeSession(sessionID, "create_fight_failed") return } m.mu.Lock() if current := m.sessions[sessionID]; current != nil { current.hostedFight = fc } m.mu.Unlock() } func (m *manager) handleBattleCommand(payload pvpwire.BattleCommandPayload) { m.mu.RLock() s := m.sessions[payload.SessionID] var fightController common.FightI var proxy common.PlayerI if s != nil { fightController = s.hostedFight proxy = s.remoteProxy } m.mu.RUnlock() if fightController == nil { return } if proxy == nil { proxy = player.NewRPCPlayer(&model.PlayerInfo{UserID: payload.UserID}, 0, fightinfo.Fightinfo{}, payload.SessionID) } switch payload.Command { case pvpwire.CommandReady: go fightController.ReadyFight(proxy) case pvpwire.CommandUseSkill: go fightController.UseSkill(proxy, payload.SkillID) case pvpwire.CommandUseSkillAt: go fightController.UseSkillAt(proxy, payload.SkillID, payload.ActorIndex, payload.TargetIndex) case pvpwire.CommandEscape: go fightController.Over(proxy, model.EnumBattleOverReason(payload.Reason)) case pvpwire.CommandChangePet: go fightController.ChangePet(proxy, payload.CatchTime) case pvpwire.CommandChangePetAt: go fightController.ChangePetAt(proxy, payload.CatchTime, payload.ActorIndex) case pvpwire.CommandLoadPercent: go fightController.LoadPercent(proxy, payload.Percent) case pvpwire.CommandUseItem: go fightController.UseItem(proxy, payload.CatchTime, payload.ItemID) case pvpwire.CommandUseItemAt: go fightController.UseItemAt(proxy, payload.CatchTime, payload.ItemID, payload.ActorIndex, payload.TargetIndex) case pvpwire.CommandChat: go fightController.Chat(proxy, payload.Message) } } func (m *manager) handlePacketRelay(payload pvpwire.PacketRelayPayload) { packet, err := base64.StdEncoding.DecodeString(payload.Packet) if err != nil { return } local := playerByUserID(payload.UserID) if local == nil { return } _ = local.SendPack(packet) if payload.Cmd == 2506 { local.QuitFight() } } func (m *manager) onBattleFinished(sessionID string) { _ = publishServerMessage(pvpwire.ServerTopic(localRuntimeServerID()), pvpwire.MessageTypeSessionClose, pvpwire.SessionClosePayload{ SessionID: sessionID, Reason: "battle_finished", }) m.mu.RLock() s := m.sessions[sessionID] m.mu.RUnlock() if s != nil && s.payload.Guest.RuntimeServerID != 0 && s.payload.Guest.RuntimeServerID != localRuntimeServerID() { _ = publishServerMessage(pvpwire.ServerTopic(s.payload.Guest.RuntimeServerID), pvpwire.MessageTypeSessionClose, pvpwire.SessionClosePayload{ SessionID: sessionID, Reason: "battle_finished", }) } } func (m *manager) closeSession(sessionID, reason string) { m.mu.Lock() s := m.sessions[sessionID] if s == nil { m.mu.Unlock() return } delete(m.userSession, s.payload.Host.UserID) delete(m.userSession, s.payload.Guest.UserID) delete(m.sessions, sessionID) m.mu.Unlock() if local := playerByUserID(s.payload.Host.UserID); local != nil && reason != "battle_finished" { local.QuitFight() } if local := playerByUserID(s.payload.Guest.UserID); local != nil { local.QuitFight() } } func (m *manager) pruneExpiredQueueLocked(now time.Time) { for mode, queue := range m.queues { next := make([]pvpwire.QueuePlayerSnapshot, 0, len(queue)) for _, queued := range queue { last := m.lastSeen[queued.UserID] if last.IsZero() || now.Sub(last) > queueTTL { delete(m.lastSeen, queued.UserID) continue } next = append(next, queued) } m.queues[mode] = next } } func newBanPickStartInfo(match pvpwire.MatchFoundPayload, self, opponent pvpwire.QueuePlayerSnapshot) *fight.CrossServerBanPickStartOutboundInfo { myPets := buildBanPickPetInfos(self.CatchTimes) opponentPets := buildBanPickPetInfos(opponent.CatchTimes) return &fight.CrossServerBanPickStartOutboundInfo{ SessionID: match.SessionID, OpponentUserID: opponent.UserID, OpponentNick: opponent.Nick, FightMode: self.FightMode, Status: self.Status, TimeoutSeconds: match.BanPickTimeout, SelectableCount: uint32(minInt(battlePetLimit(self.FightMode), len(myPets))), MyPets: myPets, OpponentPets: opponentPets, } } func buildBattlePets(allCatchTimes []uint32, own, opp *pvpwire.BanPickSubmitPayload, limit int) []model.PetInfo { if len(allCatchTimes) == 0 { return nil } banned := make(map[uint32]struct{}) if opp != nil { for _, catchTime := range opp.BanCatchTimes { if catchTime == 0 { continue } banned[catchTime] = struct{}{} } } filtered := make([]uint32, 0, len(allCatchTimes)) for _, catchTime := range allCatchTimes { if catchTime == 0 { continue } if _, ok := banned[catchTime]; ok { continue } filtered = append(filtered, catchTime) } if len(filtered) == 0 { filtered = append(filtered, allCatchTimes...) } if own == nil || len(own.SelectedCatchTimes) == 0 { return resolveBattlePets(filtered, limit) } selectedMap := make(map[uint32]struct{}, len(filtered)) for _, catchTime := range filtered { selectedMap[catchTime] = struct{}{} } selectedCatchTimes := make([]uint32, 0, len(own.SelectedCatchTimes)) used := make(map[uint32]struct{}, len(own.SelectedCatchTimes)) for _, catchTime := range own.SelectedCatchTimes { if catchTime == 0 { continue } if _, ok := selectedMap[catchTime]; !ok { continue } if _, exists := used[catchTime]; exists { continue } used[catchTime] = struct{}{} selectedCatchTimes = append(selectedCatchTimes, catchTime) if limit > 0 && len(selectedCatchTimes) >= limit { break } } if len(selectedCatchTimes) == 0 { return resolveBattlePets(filtered, limit) } return resolveBattlePets(selectedCatchTimes, limit) } func resolveBattlePets(catchTimes []uint32, limit int) []model.PetInfo { clampedCatchTimes := clampCatchTimes(catchTimes, limit) if len(clampedCatchTimes) == 0 { return nil } result := make([]model.PetInfo, 0, len(clampedCatchTimes)) petService := blservice.NewPetService(0) for _, catchTime := range clampedCatchTimes { pet := petService.PetInfoOneByCatchTime(catchTime) if pet == nil || pet.Data.ID == 0 { continue } petInfo := pet.Data if petInfo.Level > battleLevelCap { petInfo.Level = battleLevelCap } result = append(result, petInfo) } return result } func clampCatchTimes(catchTimes []uint32, limit int) []uint32 { if len(catchTimes) == 0 { return nil } if limit <= 0 || len(catchTimes) <= limit { return append([]uint32(nil), catchTimes...) } return append([]uint32(nil), catchTimes[:limit]...) } func filterAvailableCatchTimes(pets []model.PetInfo) []uint32 { result := make([]uint32, 0, len(pets)) used := make(map[uint32]struct{}, len(pets)) for _, pet := range pets { if pet.Hp == 0 || pet.CatchTime == 0 { continue } if _, exists := used[pet.CatchTime]; exists { continue } used[pet.CatchTime] = struct{}{} result = append(result, pet.CatchTime) } return result } func buildBanPickPetInfos(catchTimes []uint32) []fight.CrossServerBanPickPetInfo { result := make([]fight.CrossServerBanPickPetInfo, 0, len(catchTimes)) petService := blservice.NewPetService(0) for _, catchTime := range catchTimes { pet := petService.PetInfoOneByCatchTime(catchTime) if pet == nil || pet.Data.ID == 0 { continue } result = append(result, fight.CrossServerBanPickPetInfo{ CatchTime: pet.Data.CatchTime, PetID: pet.Data.ID, Name: pet.Data.Name, Level: pet.Data.Level, Hp: pet.Data.Hp, MaxHp: pet.Data.MaxHp, }) } return result } func minInt(a, b int) int { if a < b { return a } return b } func battlePetLimit(fightMode uint32) int { if fightMode == fightinfo.BattleMode.SINGLE_MODE { return 1 } return 6 } func normalizePeakMode(requested uint32) (fightMode uint32, status uint32, err errorcode.ErrorCode) { switch requested { case 19: return fightinfo.BattleMode.SINGLE_MODE, requested, 0 case 20: return fightinfo.BattleMode.MULTI_MODE, requested, 0 default: return 0, 0, errorcode.ErrorCodes.ErrSystemError } } func playerByUserID(userID uint32) *player.Player { client, ok := player.Mainplayer.Load(userID) if !ok || client == nil { return nil } return client.Player } func buildSessionID(hostUserID, guestUserID uint32) string { return fmt.Sprintf("xsvr-%d-%d-%d", hostUserID, guestUserID, time.Now().UnixNano()) } func publishServerMessage(topic, msgType string, body any) errorcode.ErrorCode { payload, err := json.Marshal(body) if err != nil { return errorcode.ErrorCodes.ErrSystemBusyTryLater } envelope, err := json.Marshal(pvpwire.Envelope{ Type: msgType, Body: payload, }) if err != nil { return errorcode.ErrorCodes.ErrSystemBusyTryLater } conn, err := cool.Redis.Conn(context.Background()) if err != nil { return errorcode.ErrorCodes.ErrSystemBusyTryLater } defer conn.Close(context.Background()) _, err = conn.Do(context.Background(), "publish", topic, envelope) if err != nil { return errorcode.ErrorCodes.ErrSystemBusyTryLater } return 0 } func decodeBody(body []byte, target any) bool { return json.Unmarshal(body, target) == nil } func isCoordinator() bool { return cool.Config.GameOnlineID == CoordinatorOnlineID } func localRuntimeServerID() uint32 { id, _ := strconv.ParseUint(strings.TrimSpace(cool.Config.ServerInfo.GetID()), 10, 32) return uint32(id) }