package rpc import ( "blazing/cool" "blazing/logic/service/fight/pvpwire" "context" "encoding/json" "fmt" "sync" "time" ) const ( pvpMatchQueueTTL = 12 * time.Second pvpMatchBanPickSecond = 45 ) type PVPMatchJoinPayload struct { RuntimeServerID uint32 `json:"runtimeServerId"` UserID uint32 `json:"userId"` Nick string `json:"nick"` FightMode uint32 `json:"fightMode"` Status uint32 `json:"status"` IsVip uint32 `json:"isVip"` IsDebug uint8 `json:"isDebug"` CatchTimes []uint32 `json:"catchTimes"` } type pvpMatchQueueKey struct { FightMode uint32 IsVip uint32 IsDebug uint8 } type pvpMatchCoordinator struct { mu sync.Mutex queues map[pvpMatchQueueKey][]pvpwire.QueuePlayerSnapshot lastSeen map[uint32]time.Time } var defaultPVPMatchCoordinator = &pvpMatchCoordinator{ queues: make(map[pvpMatchQueueKey][]pvpwire.QueuePlayerSnapshot), lastSeen: make(map[uint32]time.Time), } func DefaultPVPMatchCoordinator() *pvpMatchCoordinator { return defaultPVPMatchCoordinator } func (m *pvpMatchCoordinator) JoinOrUpdate(payload PVPMatchJoinPayload) error { if payload.UserID == 0 || payload.RuntimeServerID == 0 || payload.FightMode == 0 { return fmt.Errorf("invalid pvp match payload: uid=%d server=%d mode=%d", payload.UserID, payload.RuntimeServerID, payload.FightMode) } now := time.Now() player := pvpwire.QueuePlayerSnapshot{ RuntimeServerID: payload.RuntimeServerID, UserID: payload.UserID, Nick: payload.Nick, FightMode: payload.FightMode, Status: payload.Status, IsVip: payload.IsVip, IsDebug: payload.IsDebug, JoinedAtUnix: now.Unix(), CatchTimes: append([]uint32(nil), payload.CatchTimes...), } var match *pvpwire.MatchFoundPayload m.mu.Lock() m.pruneExpiredLocked(now) m.removeUserLocked(payload.UserID) m.lastSeen[payload.UserID] = now queueKey := newPVPMatchQueueKey(player) queue := m.queues[queueKey] if len(queue) > 0 { host := queue[0] queue = queue[1:] m.queues[queueKey] = queue delete(m.lastSeen, host.UserID) delete(m.lastSeen, payload.UserID) result := pvpwire.MatchFoundPayload{ SessionID: buildPVPMatchSessionID(host.UserID, payload.UserID), Stage: pvpwire.StageBanPick, Host: host, Guest: player, BanPickTimeout: pvpMatchBanPickSecond, } match = &result } else { m.queues[queueKey] = append(queue, player) } m.mu.Unlock() if match == nil { return nil } if err := publishPVPMatchMessage(pvpwire.ServerTopic(match.Host.RuntimeServerID), pvpwire.MessageTypeMatchFound, *match); err != nil { return err } if match.Guest.RuntimeServerID != match.Host.RuntimeServerID { if err := publishPVPMatchMessage(pvpwire.ServerTopic(match.Guest.RuntimeServerID), pvpwire.MessageTypeMatchFound, *match); err != nil { return err } } return nil } func (m *pvpMatchCoordinator) Cancel(userID uint32) { if userID == 0 { return } m.mu.Lock() defer m.mu.Unlock() delete(m.lastSeen, userID) m.removeUserLocked(userID) } func (m *pvpMatchCoordinator) pruneExpiredLocked(now time.Time) { for key, queue := range m.queues { next := make([]pvpwire.QueuePlayerSnapshot, 0, len(queue)) for _, queued := range queue { last := m.lastSeen[queued.UserID] if last.IsZero() || now.Sub(last) > pvpMatchQueueTTL { delete(m.lastSeen, queued.UserID) continue } next = append(next, queued) } m.queues[key] = next } } func (m *pvpMatchCoordinator) removeUserLocked(userID uint32) { for key, queue := range m.queues { next := make([]pvpwire.QueuePlayerSnapshot, 0, len(queue)) for _, queued := range queue { if queued.UserID == userID { continue } next = append(next, queued) } m.queues[key] = next } } func newPVPMatchQueueKey(player pvpwire.QueuePlayerSnapshot) pvpMatchQueueKey { return pvpMatchQueueKey{ FightMode: player.FightMode, IsVip: player.IsVip, IsDebug: player.IsDebug, } } func publishPVPMatchMessage(topic, msgType string, body any) error { payload, err := json.Marshal(body) if err != nil { return err } envelope, err := json.Marshal(pvpwire.Envelope{ Type: msgType, Body: payload, }) if err != nil { return err } conn, err := cool.Redis.Conn(context.Background()) if err != nil { return err } defer conn.Close(context.Background()) _, err = conn.Do(context.Background(), "publish", topic, envelope) return err } func buildPVPMatchSessionID(hostUserID, guestUserID uint32) string { return fmt.Sprintf("xsvr-%d-%d-%d", hostUserID, guestUserID, time.Now().UnixNano()) }