diff --git a/common/rpc/pvp_match.go b/common/rpc/pvp_match.go new file mode 100644 index 000000000..b27acbffa --- /dev/null +++ b/common/rpc/pvp_match.go @@ -0,0 +1,163 @@ +package rpc + +import ( + "blazing/cool" + "blazing/logic/service/fight/pvpwire" + "context" + "encoding/json" + "fmt" + "sync" + "time" +) + +const ( + pvpMatchQueueTTL = 12 * time.Second + pvpMatchBanPickSecond = 45 +) + +type PVPMatchJoinPayload struct { + RuntimeServerID uint32 `json:"runtimeServerId"` + UserID uint32 `json:"userId"` + Nick string `json:"nick"` + FightMode uint32 `json:"fightMode"` + Status uint32 `json:"status"` + CatchTimes []uint32 `json:"catchTimes"` +} + +type pvpMatchCoordinator struct { + mu sync.Mutex + queues map[uint32][]pvpwire.QueuePlayerSnapshot + lastSeen map[uint32]time.Time +} + +var defaultPVPMatchCoordinator = &pvpMatchCoordinator{ + queues: make(map[uint32][]pvpwire.QueuePlayerSnapshot), + lastSeen: make(map[uint32]time.Time), +} + +func DefaultPVPMatchCoordinator() *pvpMatchCoordinator { + return defaultPVPMatchCoordinator +} + +func (m *pvpMatchCoordinator) JoinOrUpdate(payload PVPMatchJoinPayload) error { + if payload.UserID == 0 || payload.RuntimeServerID == 0 || payload.FightMode == 0 { + return fmt.Errorf("invalid pvp match payload: uid=%d server=%d mode=%d", payload.UserID, payload.RuntimeServerID, payload.FightMode) + } + + now := time.Now() + player := pvpwire.QueuePlayerSnapshot{ + RuntimeServerID: payload.RuntimeServerID, + UserID: payload.UserID, + Nick: payload.Nick, + FightMode: payload.FightMode, + Status: payload.Status, + JoinedAtUnix: now.Unix(), + CatchTimes: append([]uint32(nil), payload.CatchTimes...), + } + + var match *pvpwire.MatchFoundPayload + + m.mu.Lock() + m.pruneExpiredLocked(now) + m.removeUserLocked(payload.UserID) + m.lastSeen[payload.UserID] = now + + queue := m.queues[payload.FightMode] + if len(queue) > 0 { + host := queue[0] + queue = queue[1:] + m.queues[payload.FightMode] = queue + delete(m.lastSeen, host.UserID) + delete(m.lastSeen, payload.UserID) + + result := pvpwire.MatchFoundPayload{ + SessionID: buildPVPMatchSessionID(host.UserID, payload.UserID), + Stage: pvpwire.StageBanPick, + Host: host, + Guest: player, + BanPickTimeout: pvpMatchBanPickSecond, + } + match = &result + } else { + m.queues[payload.FightMode] = append(queue, player) + } + m.mu.Unlock() + + if match == nil { + return nil + } + if err := publishPVPMatchMessage(pvpwire.ServerTopic(match.Host.RuntimeServerID), pvpwire.MessageTypeMatchFound, *match); err != nil { + return err + } + if match.Guest.RuntimeServerID != match.Host.RuntimeServerID { + if err := publishPVPMatchMessage(pvpwire.ServerTopic(match.Guest.RuntimeServerID), pvpwire.MessageTypeMatchFound, *match); err != nil { + return err + } + } + return nil +} + +func (m *pvpMatchCoordinator) Cancel(userID uint32) { + if userID == 0 { + return + } + m.mu.Lock() + defer m.mu.Unlock() + + delete(m.lastSeen, userID) + m.removeUserLocked(userID) +} + +func (m *pvpMatchCoordinator) pruneExpiredLocked(now time.Time) { + for mode, queue := range m.queues { + next := make([]pvpwire.QueuePlayerSnapshot, 0, len(queue)) + for _, queued := range queue { + last := m.lastSeen[queued.UserID] + if last.IsZero() || now.Sub(last) > pvpMatchQueueTTL { + delete(m.lastSeen, queued.UserID) + continue + } + next = append(next, queued) + } + m.queues[mode] = next + } +} + +func (m *pvpMatchCoordinator) removeUserLocked(userID uint32) { + for mode, queue := range m.queues { + next := make([]pvpwire.QueuePlayerSnapshot, 0, len(queue)) + for _, queued := range queue { + if queued.UserID == userID { + continue + } + next = append(next, queued) + } + m.queues[mode] = next + } +} + +func publishPVPMatchMessage(topic, msgType string, body any) error { + payload, err := json.Marshal(body) + if err != nil { + return err + } + envelope, err := json.Marshal(pvpwire.Envelope{ + Type: msgType, + Body: payload, + }) + if err != nil { + return err + } + conn, err := cool.Redis.Conn(context.Background()) + if err != nil { + return err + } + defer conn.Close(context.Background()) + + _, err = conn.Do(context.Background(), "publish", topic, envelope) + return err +} + +func buildPVPMatchSessionID(hostUserID, guestUserID uint32) string { + return fmt.Sprintf("xsvr-%d-%d-%d", hostUserID, guestUserID, time.Now().UnixNano()) +} diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index 981cad95c..35abc4abd 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -98,6 +98,15 @@ func (*ServerHandler) RegisterLogic(ctx context.Context, id, port uint32) error } +func (*ServerHandler) MatchJoinOrUpdate(_ context.Context, payload PVPMatchJoinPayload) error { + return DefaultPVPMatchCoordinator().JoinOrUpdate(payload) +} + +func (*ServerHandler) MatchCancel(_ context.Context, userID uint32) error { + DefaultPVPMatchCoordinator().Cancel(userID) + return nil +} + func CServer() *jsonrpc.RPCServer { // create a new server instance rpcServer := jsonrpc.NewServer(jsonrpc.WithReverseClient[cool.ClientHandler]("")) @@ -114,6 +123,10 @@ func StartClient(id, port uint32, callback any) *struct { Kick func(uint32) error RegisterLogic func(uint32, uint32) error + + MatchJoinOrUpdate func(PVPMatchJoinPayload) error + + MatchCancel func(uint32) error } { //cool.Config.File.Domain = "127.0.0.1" var rpcaddr = "ws://" + cool.Config.File.Domain + gconv.String(cool.Config.Address) + "/rpc" @@ -144,6 +157,10 @@ var RPCClient struct { RegisterLogic func(uint32, uint32) error + MatchJoinOrUpdate func(PVPMatchJoinPayload) error + + MatchCancel func(uint32) error + // UserLogin func(int32, int32) error //用户登录事件 // UserLogout func(int32, int32) error //用户登出事件 } diff --git a/docs/pvp-login-rpc-match-design-2026-04-09.md b/docs/pvp-login-rpc-match-design-2026-04-09.md new file mode 100644 index 000000000..63329649a --- /dev/null +++ b/docs/pvp-login-rpc-match-design-2026-04-09.md @@ -0,0 +1,224 @@ +# PVP Match Via RPC, Battle Via Redis + +## 目标 + +本次调整先不解决 `login` 更新期间的排队保活和补偿问题,只收敛到一个更简单、可控的方案: + +- 匹配请求走 `logic -> login` 的同步 RPC +- 对战过程仍走 `logic` 本地战斗 + Redis 转发战斗指令 +- `login` 不可用时,`logic` 直接返回“匹配服务不可用” +- 前端通过轮询重新发起 / 更新匹配请求,不在后端保留离线补偿队列 + +这个方案的核心是:先把“能否立即判断匹配服务可用”做好,不继续依赖 Redis PubSub 做匹配入口。 + +## 当前现状 + +### 现有匹配入口 + +- 前端 `2458` 进入 [logic/controller/fight_巅峰.go](/workspace/logic/controller/fight_巅峰.go#L19) +- 当前 `JoINtop` 直接调用 [logic/service/fight/pvp/service.go](/workspace/logic/service/fight/pvp/service.go#L83) 的 `JoinPeakQueue` +- `JoinPeakQueue` 当前实现是本地建 `localQueueTicket`,并通过 Redis `publish` 发 `queue_join` + +### 现有跨服协调 + +- `logic` 侧订阅 PVP Redis topic 的入口在 [common/rpc/func.go](/workspace/common/rpc/func.go#L153) +- PVP 匹配状态当前存在 `logic/service/fight/pvp/service.go` 的 manager 内存里: + - `queues` + - `lastSeen` + - `localQueues` + - `sessions` + - `userSession` + +### 现有 RPC 能力 + +- `logic` 启动时通过 [common/rpc/rpc.go](/workspace/common/rpc/rpc.go#L113) 建立到 `login` 的 RPC client +- `login` 的 `/rpc/*` 入口绑定在 [modules/base/middleware/middleware.go](/workspace/modules/base/middleware/middleware.go#L152) +- `login` 侧 RPC server 由 [common/rpc/rpc.go](/workspace/common/rpc/rpc.go#L101) 暴露 + +### 当前问题 + +Redis PubSub 适合“广播消息”,不适合“同步判断服务是否可用”。 + +如果继续让匹配入口走 PubSub: + +- `logic` 无法在请求当下知道 `login` 是否真能处理 +- `login` 更新、重启、未订阅时,匹配请求可能直接丢失 +- 前端即使轮询,也只是重复投递,不能精确表达“当前匹配服务可用/不可用” + +## 收敛后的职责划分 + +### login + +`login` 只负责匹配控制面: + +- 接收 `logic` 发来的同步匹配 RPC +- 判断当前匹配服务是否可用 +- 维护匹配队列 +- 找到对手后,记录 match 结果 +- 再通过 Redis 或其他异步方式通知对应 `logic` 开始 Ban/Pick / Battle + +### logic + +`logic` 只负责: + +- 接收前端匹配请求 +- 同步 RPC 到 `login` +- RPC 失败时立即返回“匹配服务不可用” +- RPC 成功时返回“排队中” +- 收到 match 结果后负责真正 `fight.NewFight(...)` +- 对战期间继续使用现有 Redis topic 转发战斗指令 + +### Redis + +Redis 只保留在“对战消息面”: + +- `match_found` +- `ban_pick_submit` +- `battle_command` +- `packet_relay` +- `session_close` + +也就是说: + +- 匹配入口走 RPC +- 对战过程走 Redis + +## 推荐目标链路 + +### 1. 前端加入/更新匹配 + +前端定期轮询 `logic` 的加入/更新接口。 + +`logic` 处理流程: + +1. 校验玩家当前战斗状态 +2. 同步调用 `login` 的匹配 RPC +3. 如果 RPC 成功:返回排队中 +4. 如果 RPC 失败:清理本地匹配状态,返回匹配服务不可用 + +### 2. login 完成匹配 + +`login` 维护排队队列和匹配结果,匹配成功后: + +1. 确定 host / guest 所在 `logic` +2. 通过 Redis 通知两个 `logic` +3. host `logic` 开战 +4. guest `logic` 设置远端代理并进入 Ban/Pick 或战斗态 + +### 3. 对战期间 + +继续复用当前 `logic/service/fight/pvp/service.go` 内的 Redis 指令转发模式: + +- 战斗操作通过 Redis topic 转发 +- host `logic` 维持真实战斗对象 +- guest `logic` 维持 remote proxy + +## 失败语义 + +本阶段不做补偿,不做离线保队列。 + +### login 不在线 + +如果 `logic -> login` RPC 调用失败: + +- 本次匹配直接失败 +- `logic` 清理本地匹配状态 +- 返回前端“匹配服务不可用” + +### 前端轮询停止 + +如果前端不再轮询: + +- 视为用户不再持续请求匹配 +- `logic` 不负责继续保活 +- 是否从 `login` 队列移除,由 `login` 的超时策略决定 + +### login 更新中 + +如果 `login` 正在更新: + +- `logic` 的同步 RPC 会失败 +- 前端当前轮询会收到“匹配服务不可用” +- 等 `login` 恢复后,前端下一轮再发起匹配 + +这是本阶段明确接受的行为,不在后端做补偿。 + +## 最小实现建议 + +### 一、先增加 RPC 健康/匹配接口 + +在 [common/rpc/rpc.go](/workspace/common/rpc/rpc.go) 增加面向 `logic -> login` 的 RPC 方法。 + +建议最小接口: + +- `MatchJoinOrUpdate(PVPMatchJoinPayload) error` +- `MatchCancel(userID) error` + +如果需要单独健康检查,也可以加: + +- `MatchPing() error` + +但在最小方案里,`MatchJoinOrUpdate` 自身就可以承担健康检查职责。 + +### 二、logic 的匹配入口改为同步 RPC + +改造 [logic/controller/fight_巅峰.go](/workspace/logic/controller/fight_巅峰.go#L19) 和 [logic/service/fight/pvp/service.go](/workspace/logic/service/fight/pvp/service.go#L83): + +- 入口不再直接发布 `queue_join` +- 先发 RPC 到 `login` +- 成功才更新本地匹配状态 +- 失败直接返回错误 +- 取消匹配时通过 `MatchCancel` 做 best-effort 清理 + +### 三、保留 Redis 对战链路 + +[logic/service/fight/pvp/service.go](/workspace/logic/service/fight/pvp/service.go#L170) 之后的 Redis 消费、match result 处理、Ban/Pick、战斗 relay 不需要一次性重写,可以继续保留。 + +调整重点是: + +- 不再让匹配入口依赖 PubSub +- 让对战过程继续走 Redis + +## 对前端的要求 + +前端不要无脑重复“新 join”,而是按“轮询更新匹配状态”处理。 + +建议行为: + +1. 首次点击匹配时发一次加入 +2. 匹配中每隔 `3~5s` 轮询一次更新 +3. 如果返回“匹配服务不可用”,前端退出匹配态并提示 +4. 如果返回“已匹配/进入 Ban/Pick”,前端切换到对应界面 + +## 本阶段不做的事 + +以下内容明确不在这次最小改造内: + +- `login` 更新期间的排队保活 +- 持久化消息补偿 +- `login` 重启后的队列恢复 +- Redis Stream 化 +- 多 `login` 实例协调 +- 匹配服务自动拉起目标 `logic` + +## 后续可选增强 + +如果后面要继续提高可用性,可以再逐步演进为: + +1. 匹配入口仍走 RPC +2. `login` 内部把队列落 Redis +3. 加入 ticket 和续租机制 +4. login 更新时支持恢复匹配状态 + +但这不是当前阶段的目标。 + +## 最终收敛结论 + +当前阶段建议明确成一句话: + +`匹配走 RPC,对战走 Redis。` + +对应业务语义: + +- 需要立即判断服务可用性的时候,用 RPC +- 需要跨服转发战斗消息的时候,用 Redis diff --git a/logic/controller/Controller.go b/logic/controller/Controller.go index 697686d1f..3d0f8fbce 100644 --- a/logic/controller/Controller.go +++ b/logic/controller/Controller.go @@ -4,6 +4,7 @@ package controller import ( + "blazing/common/rpc" "blazing/cool" "blazing/logic/service/common" "bytes" @@ -29,6 +30,10 @@ type Controller struct { Kick func(uint32) error RegisterLogic func(uint32, uint32) error + + MatchJoinOrUpdate func(rpc.PVPMatchJoinPayload) error + + MatchCancel func(uint32) error } } diff --git a/logic/controller/fight_巅峰.go b/logic/controller/fight_巅峰.go index a0ef65ff2..fe29db723 100644 --- a/logic/controller/fight_巅峰.go +++ b/logic/controller/fight_巅峰.go @@ -1,6 +1,7 @@ package controller import ( + "blazing/common/rpc" "blazing/common/socket/errorcode" "blazing/logic/service/common" "blazing/logic/service/fight" @@ -21,11 +22,35 @@ func (h Controller) JoINtop(data *PetTOPLEVELnboundInfo, c *player.Player) (resu if err != 0 { return nil, err } + if Maincontroller.RPCClient == nil || Maincontroller.RPCClient.MatchJoinOrUpdate == nil { + pvp.CancelPeakQueue(c) + return nil, errorcode.ErrorCodes.ErrSystemBusyTryLater + } + fightMode, status, err := pvp.NormalizePeakMode(data.Mode) + if err != 0 { + pvp.CancelPeakQueue(c) + return nil, err + } + joinPayload := rpc.PVPMatchJoinPayload{ + RuntimeServerID: h.UID, + UserID: c.Info.UserID, + Nick: c.Info.Nick, + FightMode: fightMode, + Status: status, + CatchTimes: pvp.AvailableCatchTimes(c.GetPetInfo(0)), + } + if callErr := Maincontroller.RPCClient.MatchJoinOrUpdate(joinPayload); callErr != nil { + pvp.CancelPeakQueue(c) + return nil, errorcode.ErrorCodes.ErrSystemBusyTryLater + } return nil, -1 } // CancelPeakQueue 处理控制器请求。 func (h Controller) CancelPeakQueue(data *PeakQueueCancelInboundInfo, c *player.Player) (result *fight.NullOutboundInfo, err errorcode.ErrorCode) { + if Maincontroller.RPCClient != nil && Maincontroller.RPCClient.MatchCancel != nil { + _ = Maincontroller.RPCClient.MatchCancel(c.Info.UserID) + } pvp.CancelPeakQueue(c) return nil, -1 } diff --git a/logic/service/fight/pvp/service.go b/logic/service/fight/pvp/service.go index 65a9988a4..e8d0686c0 100644 --- a/logic/service/fight/pvp/service.go +++ b/logic/service/fight/pvp/service.go @@ -92,28 +92,8 @@ func JoinPeakQueue(p *player.Player, requestedMode uint32) errorcode.ErrorCode { return err } - m := Default() - runtimeServerID := localRuntimeServerID() - ticket := &localQueueTicket{ - playerID: p.Info.UserID, - runtimeServerID: runtimeServerID, - fightMode: fightMode, - status: status, - stop: make(chan struct{}), - } - - m.mu.Lock() - if old := m.localQueues[p.Info.UserID]; old != nil { - old.Stop() - } - m.localQueues[p.Info.UserID] = ticket - delete(m.userSession, p.Info.UserID) - m.mu.Unlock() - p.Fightinfo.Mode = fightMode p.Fightinfo.Status = status - - go m.queueHeartbeatLoop(p, ticket) return 0 } @@ -129,15 +109,19 @@ func CancelPeakQueue(p *player.Player) { m.mu.Unlock() if ticket != nil { ticket.Stop() - _ = publishServerMessage(pvpwire.CoordinatorTopicPrefix, pvpwire.MessageTypeQueueCancel, pvpwire.QueueCancelPayload{ - RuntimeServerID: ticket.runtimeServerID, - UserID: ticket.playerID, - }) } atomic.StoreUint32(&p.Fightinfo.Mode, 0) atomic.StoreUint32(&p.Fightinfo.Status, 0) } +func NormalizePeakMode(requested uint32) (fightMode uint32, status uint32, err errorcode.ErrorCode) { + return normalizePeakMode(requested) +} + +func AvailableCatchTimes(pets []model.PetInfo) []uint32 { + return filterAvailableCatchTimes(pets) +} + func SubmitBanPick(p *player.Player, selected, banned []uint32) errorcode.ErrorCode { if p == nil { return errorcode.ErrorCodes.ErrSystemBusyTryLater