feat: 实现跨服PVP匹配和战斗功能

This commit is contained in:
xinian
2026-04-05 05:04:04 +08:00
committed by cnb
parent 102d87da3e
commit 7d054bbe91
8 changed files with 1167 additions and 29 deletions

View File

@@ -2,6 +2,8 @@ package rpc
import (
"blazing/cool"
"blazing/logic/service/fight/pvp"
"blazing/logic/service/fight/pvpwire"
"context"
"fmt"
"time"
@@ -9,6 +11,7 @@ import (
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
)
// ListenFunc 监听函数
@@ -150,6 +153,8 @@ func ListenFight(ctx g.Ctx) {
serverID := cool.Config.ServerInfo.GetID()
startTopic := "sun:start:" + serverID
sendPackTopic := "sendpack:" + serverID
pvpServerTopic := pvpwire.ServerTopic(gconv.Uint32(serverID))
pvpCoordinatorTopic := pvpwire.CoordinatorTopicPrefix
// 外层循环:负责连接断开后的整体重连
for {
@@ -199,16 +204,26 @@ func ListenFight(ctx g.Ctx) {
}()
// 3. 订阅主题(对齐 ListenFunc 的错误处理,替换 panic 为优雅重连)
// 订阅 sun:start:服务器ID
_, err = conn.Do(ctx, "subscribe", startTopic)
if err != nil {
cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", startTopic, "error", err)
heartbeatCancel() // 关闭心跳协程
_ = conn.Close(ctx)
time.Sleep(retryDelay)
subscribeTopics := []string{startTopic, pvpServerTopic}
if cool.Config.GameOnlineID == pvp.CoordinatorOnlineID {
subscribeTopics = append(subscribeTopics, pvpCoordinatorTopic)
}
subscribeFailed := false
for _, topic := range subscribeTopics {
_, err = conn.Do(ctx, "subscribe", topic)
if err != nil {
cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", topic, "error", err)
heartbeatCancel()
_ = conn.Close(ctx)
time.Sleep(retryDelay)
subscribeFailed = true
break
}
cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", topic)
}
if subscribeFailed {
continue
}
cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", startTopic)
// // 订阅 sun:sendpack:服务器ID
// _, err = conn.Do(ctx, "subscribe", sendPackTopic)
@@ -255,6 +270,10 @@ func ListenFight(ctx g.Ctx) {
// universalClient, _ := g.Redis("cool").Client().(goredis.UniversalClient)
}
if dataMap.Channel == pvpServerTopic || dataMap.Channel == pvpCoordinatorTopic {
pvp.HandleRedisMessage(dataMap.Channel, dataMap.Payload)
}
// 【可选】处理 sun:sendpack:服务器ID 消息(如果需要)
if dataMap.Channel == sendPackTopic {
fmt.Println("收到战斗包", dataMap.Payload)

View File

@@ -2,12 +2,10 @@ package controller
import (
"blazing/common/socket/errorcode"
"blazing/cool"
"blazing/logic/service/common"
"blazing/logic/service/fight"
"blazing/logic/service/fight/info"
"blazing/logic/service/fight/pvp"
"blazing/logic/service/player"
"context"
)
// 表示"宠物王加入"的入站消息数据
@@ -18,14 +16,22 @@ type PetTOPLEVELnboundInfo struct {
}
func (h Controller) JoINtop(data *PetTOPLEVELnboundInfo, c *player.Player) (result *fight.NullOutboundInfo, err errorcode.ErrorCode) {
cool.RedisDo(context.TODO(), "sun:join", info.RPCFightinfo{
PlayerID: c.Info.UserID,
Mode: data.Mode,
Type: 1,
})
// // 类型断言为 UniversalClient
// universalClient, _ := client.(goredis.UniversalClient)
// repo.NewPlayerRepository(universalClient).AddPlayerToPool(context.TODO(), data.Head.UserID, 1)
err = pvp.JoinPeakQueue(c, data.Mode)
if err != 0 {
return nil, err
}
return nil, -1
}
func (h Controller) CancelPeakQueue(data *fight.PeakQueueCancelInboundInfo, c *player.Player) (result *fight.NullOutboundInfo, err errorcode.ErrorCode) {
pvp.CancelPeakQueue(c)
return nil, -1
}
func (h Controller) SubmitPeakBanPick(data *fight.PeakBanPickSubmitInboundInfo, c *player.Player) (result *fight.NullOutboundInfo, err errorcode.ErrorCode) {
err = pvp.SubmitBanPick(c, data.SelectedCatchTimes, data.BanCatchTimes)
if err != 0 {
return nil, err
}
return nil, -1
}

View File

@@ -13,7 +13,6 @@ import (
"github.com/gogf/gf/v2/os/gcmd"
"github.com/gogf/gf/v2/os/gproc"
"blazing/common/data/xmlres"
"blazing/logic/service/player"
"blazing/cool"

View File

@@ -68,6 +68,52 @@ type PetKingJoinInboundInfo struct {
FightType uint32 // 仅当Type为11时有效
}
// PeakQueueCancelInboundInfo 取消跨服巅峰队列。
type PeakQueueCancelInboundInfo struct {
Head common.TomeeHeader `cmd:"2459" struc:"skip"`
}
// PeakBanPickSubmitInboundInfo 提交 ban/pick 结果。
type PeakBanPickSubmitInboundInfo struct {
Head common.TomeeHeader `cmd:"2460" struc:"skip"`
SelectedCatchTimesLen uint32 `struc:"sizeof=SelectedCatchTimes"`
SelectedCatchTimes []uint32 `json:"selectedCatchTimes"`
BanCatchTimesLen uint32 `struc:"sizeof=BanCatchTimes"`
BanCatchTimes []uint32 `json:"banCatchTimes"`
}
// CrossServerBanPickStartOutboundInfo 跨服匹配成功后,通知前端进入 ban/pick。
type CrossServerBanPickPetInfo struct {
CatchTime uint32 `json:"catchTime"`
PetID uint32 `json:"petId"`
Name string `struc:"[16]byte" json:"name"`
Level uint32 `json:"level"`
Hp uint32 `json:"hp"`
MaxHp uint32 `json:"maxHp"`
}
type CrossServerBanPickStartOutboundInfo struct {
SessionIDLen uint32 `struc:"sizeof=SessionID"`
SessionID string `json:"sessionId"`
OpponentUserID uint32 `json:"opponentUserId"`
OpponentNick string `struc:"[16]byte" json:"opponentNick"`
FightMode uint32 `json:"fightMode"`
Status uint32 `json:"status"`
TimeoutSeconds uint32 `json:"timeoutSeconds"`
SelectableCount uint32 `json:"selectableCount"`
MyPetsLen uint32 `struc:"sizeof=MyPets"`
MyPets []CrossServerBanPickPetInfo `json:"myPets"`
OpponentPetsLen uint32 `struc:"sizeof=OpponentPets"`
OpponentPets []CrossServerBanPickPetInfo `json:"opponentPets"`
}
// HandleFightInviteInboundInfo 处理战斗邀请的入站消息
type HandleFightInviteInboundInfo struct {

View File

@@ -0,0 +1,121 @@
package pvp
import (
"blazing/logic/service/common"
"blazing/logic/service/fight/info"
"blazing/logic/service/fight/pvpwire"
"blazing/modules/player/model"
)
type RemoteFightProxy struct {
sessionID string
hostServerID uint32
userID uint32
}
func NewRemoteFightProxy(sessionID string, hostServerID, userID uint32) *RemoteFightProxy {
return &RemoteFightProxy{
sessionID: sessionID,
hostServerID: hostServerID,
userID: userID,
}
}
func (r *RemoteFightProxy) relay(payload pvpwire.BattleCommandPayload) {
if r == nil {
return
}
payload.SessionID = r.sessionID
payload.UserID = r.userID
_ = publishServerMessage(pvpwire.ServerTopic(r.hostServerID), pvpwire.MessageTypeBattleCommand, payload)
}
func (r *RemoteFightProxy) Over(_ common.PlayerI, reason model.EnumBattleOverReason) {
r.relay(pvpwire.BattleCommandPayload{Command: pvpwire.CommandEscape, Reason: uint32(reason)})
}
func (r *RemoteFightProxy) UseSkill(_ common.PlayerI, id uint32) {
r.relay(pvpwire.BattleCommandPayload{Command: pvpwire.CommandUseSkill, SkillID: id})
}
func (r *RemoteFightProxy) UseSkillAt(_ common.PlayerI, id uint32, actorIndex, targetIndex int) {
r.relay(pvpwire.BattleCommandPayload{
Command: pvpwire.CommandUseSkillAt,
SkillID: id,
ActorIndex: actorIndex,
TargetIndex: targetIndex,
})
}
func (r *RemoteFightProxy) GetCurrPET(common.PlayerI) *info.BattlePetEntity {
return nil
}
func (r *RemoteFightProxy) GetCurrPETAt(common.PlayerI, int) *info.BattlePetEntity {
return nil
}
func (r *RemoteFightProxy) GetOverInfo() model.FightOverInfo {
return model.FightOverInfo{}
}
func (r *RemoteFightProxy) Ownerid() uint32 {
return r.userID
}
func (r *RemoteFightProxy) ReadyFight(common.PlayerI) {
r.relay(pvpwire.BattleCommandPayload{Command: pvpwire.CommandReady})
}
func (r *RemoteFightProxy) ChangePet(_ common.PlayerI, id uint32) {
r.relay(pvpwire.BattleCommandPayload{Command: pvpwire.CommandChangePet, CatchTime: id})
}
func (r *RemoteFightProxy) ChangePetAt(_ common.PlayerI, id uint32, actorIndex int) {
r.relay(pvpwire.BattleCommandPayload{
Command: pvpwire.CommandChangePetAt,
CatchTime: id,
ActorIndex: actorIndex,
})
}
func (r *RemoteFightProxy) Capture(common.PlayerI, uint32) {
}
func (r *RemoteFightProxy) LoadPercent(_ common.PlayerI, percent int32) {
r.relay(pvpwire.BattleCommandPayload{Command: pvpwire.CommandLoadPercent, Percent: percent})
}
func (r *RemoteFightProxy) UseItem(_ common.PlayerI, catchTime, itemID uint32) {
r.relay(pvpwire.BattleCommandPayload{
Command: pvpwire.CommandUseItem,
CatchTime: catchTime,
ItemID: itemID,
})
}
func (r *RemoteFightProxy) UseItemAt(_ common.PlayerI, catchTime, itemID uint32, actorIndex, targetIndex int) {
r.relay(pvpwire.BattleCommandPayload{
Command: pvpwire.CommandUseItemAt,
CatchTime: catchTime,
ItemID: itemID,
ActorIndex: actorIndex,
TargetIndex: targetIndex,
})
}
func (r *RemoteFightProxy) Chat(_ common.PlayerI, msg string) {
r.relay(pvpwire.BattleCommandPayload{Command: pvpwire.CommandChat, Message: msg})
}
func (r *RemoteFightProxy) IsFirst(common.PlayerI) bool {
return false
}
func (r *RemoteFightProxy) GetOverChan() chan struct{} {
return nil
}
func (r *RemoteFightProxy) GetAttackValue(bool) *model.AttackValue {
return nil
}

View File

@@ -0,0 +1,791 @@
package pvp
import (
"blazing/common/socket/errorcode"
"blazing/cool"
"blazing/logic/service/common"
"blazing/logic/service/fight"
fightinfo "blazing/logic/service/fight/info"
"blazing/logic/service/fight/pvpwire"
"blazing/logic/service/player"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"blazing/modules/player/model"
blservice "blazing/modules/player/service"
)
const (
CoordinatorOnlineID = 1
queueHeartbeat = 3 * time.Second
queueTTL = 12 * time.Second
banPickTimeout = 45 * time.Second
banPickStartCmd = 2461
)
type localQueueTicket struct {
playerID uint32
runtimeServerID uint32
fightMode uint32
status uint32
stop chan struct{}
stopped uint32
}
func (t *localQueueTicket) Stop() {
if t == nil {
return
}
if atomic.CompareAndSwapUint32(&t.stopped, 0, 1) {
close(t.stop)
}
}
type session struct {
payload pvpwire.MatchFoundPayload
hostSubmit *pvpwire.BanPickSubmitPayload
guestSubmit *pvpwire.BanPickSubmitPayload
stage string
hostedFight common.FightI
remoteProxy common.PlayerI
banPickDeadline time.Time
}
type manager struct {
mu sync.RWMutex
queues map[uint32][]pvpwire.QueuePlayerSnapshot
lastSeen map[uint32]time.Time
localQueues map[uint32]*localQueueTicket
sessions map[string]*session
userSession map[uint32]string
}
var defaultManager = &manager{
queues: make(map[uint32][]pvpwire.QueuePlayerSnapshot),
lastSeen: make(map[uint32]time.Time),
localQueues: make(map[uint32]*localQueueTicket),
sessions: make(map[string]*session),
userSession: make(map[uint32]string),
}
func Default() *manager {
return defaultManager
}
func JoinPeakQueue(p *player.Player, requestedMode uint32) errorcode.ErrorCode {
if p == nil {
return errorcode.ErrorCodes.ErrSystemBusyTryLater
}
if err := p.CanFight(); err != 0 {
return err
}
fightMode, status, err := normalizePeakMode(requestedMode)
if err != 0 {
return err
}
m := Default()
runtimeServerID := localRuntimeServerID()
ticket := &localQueueTicket{
playerID: p.Info.UserID,
runtimeServerID: runtimeServerID,
fightMode: fightMode,
status: status,
stop: make(chan struct{}),
}
m.mu.Lock()
if old := m.localQueues[p.Info.UserID]; old != nil {
old.Stop()
}
m.localQueues[p.Info.UserID] = ticket
delete(m.userSession, p.Info.UserID)
m.mu.Unlock()
p.Fightinfo.Mode = fightMode
p.Fightinfo.Status = status
go m.queueHeartbeatLoop(p, ticket)
return 0
}
func CancelPeakQueue(p *player.Player) {
if p == nil {
return
}
m := Default()
var ticket *localQueueTicket
m.mu.Lock()
ticket = m.localQueues[p.Info.UserID]
delete(m.localQueues, p.Info.UserID)
m.mu.Unlock()
if ticket != nil {
ticket.Stop()
_ = publishServerMessage(pvpwire.CoordinatorTopicPrefix, pvpwire.MessageTypeQueueCancel, pvpwire.QueueCancelPayload{
RuntimeServerID: ticket.runtimeServerID,
UserID: ticket.playerID,
})
}
atomic.StoreUint32(&p.Fightinfo.Mode, 0)
atomic.StoreUint32(&p.Fightinfo.Status, 0)
}
func SubmitBanPick(p *player.Player, selected, banned []uint32) errorcode.ErrorCode {
if p == nil {
return errorcode.ErrorCodes.ErrSystemBusyTryLater
}
m := Default()
m.mu.RLock()
sessionID := m.userSession[p.Info.UserID]
s := m.sessions[sessionID]
m.mu.RUnlock()
if s == nil {
return errorcode.ErrorCodes.ErrBattleEnded
}
payload := pvpwire.BanPickSubmitPayload{
SessionID: sessionID,
UserID: p.Info.UserID,
SelectedCatchTimes: append([]uint32(nil), selected...),
BanCatchTimes: append([]uint32(nil), banned...),
}
if s.payload.Host.RuntimeServerID == localRuntimeServerID() {
m.applyBanPickSubmit(payload)
return 0
}
return publishServerMessage(pvpwire.ServerTopic(s.payload.Host.RuntimeServerID), pvpwire.MessageTypeBanPickSubmit, payload)
}
func HandleRedisMessage(channel, raw string) {
Default().handleRedisMessage(channel, raw)
}
func (m *manager) handleRedisMessage(channel, raw string) {
var envelope pvpwire.Envelope
if err := json.Unmarshal([]byte(raw), &envelope); err != nil {
cool.Logger.Warning(context.Background(), "pvp redis payload parse failed", err, raw)
return
}
switch envelope.Type {
case pvpwire.MessageTypeQueueJoin:
if !isCoordinator() {
return
}
var payload pvpwire.QueueJoinPayload
if decodeBody(envelope.Body, &payload) {
m.handleQueueJoin(payload)
}
case pvpwire.MessageTypeQueueCancel:
if !isCoordinator() {
return
}
var payload pvpwire.QueueCancelPayload
if decodeBody(envelope.Body, &payload) {
m.handleQueueCancel(payload)
}
case pvpwire.MessageTypeMatchFound:
var payload pvpwire.MatchFoundPayload
if decodeBody(envelope.Body, &payload) {
m.handleMatchFound(payload)
}
case pvpwire.MessageTypeBanPickSubmit:
var payload pvpwire.BanPickSubmitPayload
if decodeBody(envelope.Body, &payload) {
m.applyBanPickSubmit(payload)
}
case pvpwire.MessageTypeBattleCommand:
var payload pvpwire.BattleCommandPayload
if decodeBody(envelope.Body, &payload) {
m.handleBattleCommand(payload)
}
case pvpwire.MessageTypePacketRelay:
var payload pvpwire.PacketRelayPayload
if decodeBody(envelope.Body, &payload) {
m.handlePacketRelay(payload)
}
case pvpwire.MessageTypeSessionClose:
var payload pvpwire.SessionClosePayload
if decodeBody(envelope.Body, &payload) {
m.closeSession(payload.SessionID, payload.Reason)
}
}
}
func (m *manager) queueHeartbeatLoop(p *player.Player, ticket *localQueueTicket) {
ticker := time.NewTicker(queueHeartbeat)
defer ticker.Stop()
send := func() bool {
if p == nil || p.Info == nil {
return false
}
payload := pvpwire.QueueJoinPayload{
Player: pvpwire.QueuePlayerSnapshot{
RuntimeServerID: ticket.runtimeServerID,
UserID: p.Info.UserID,
Nick: p.Info.Nick,
FightMode: ticket.fightMode,
Status: ticket.status,
JoinedAtUnix: time.Now().Unix(),
CatchTimes: filterAvailableCatchTimes(p.GetPetInfo(0)),
},
}
if err := publishServerMessage(pvpwire.CoordinatorTopicPrefix, pvpwire.MessageTypeQueueJoin, payload); err != nil {
cool.Logger.Warning(context.Background(), "peak queue publish failed", err)
}
return true
}
if !send() {
return
}
for {
select {
case <-ticket.stop:
return
case <-ticker.C:
send()
}
}
}
func (m *manager) handleQueueJoin(payload pvpwire.QueueJoinPayload) {
now := time.Now()
m.mu.Lock()
defer m.mu.Unlock()
m.pruneExpiredQueueLocked(now)
playerInfo := payload.Player
m.lastSeen[playerInfo.UserID] = now
queue := m.queues[playerInfo.FightMode]
for idx, queued := range queue {
if queued.UserID == playerInfo.UserID {
queue[idx] = playerInfo
m.queues[playerInfo.FightMode] = queue
return
}
}
if len(queue) > 0 {
host := queue[0]
queue = queue[1:]
m.queues[playerInfo.FightMode] = queue
delete(m.lastSeen, host.UserID)
delete(m.lastSeen, playerInfo.UserID)
sessionID := buildSessionID(host.UserID, playerInfo.UserID)
match := pvpwire.MatchFoundPayload{
SessionID: sessionID,
Stage: pvpwire.StageBanPick,
Host: host,
Guest: playerInfo,
BanPickTimeout: uint32(banPickTimeout / time.Second),
}
_ = publishServerMessage(pvpwire.ServerTopic(host.RuntimeServerID), pvpwire.MessageTypeMatchFound, match)
if playerInfo.RuntimeServerID != host.RuntimeServerID {
_ = publishServerMessage(pvpwire.ServerTopic(playerInfo.RuntimeServerID), pvpwire.MessageTypeMatchFound, match)
}
return
}
m.queues[playerInfo.FightMode] = append(queue, playerInfo)
}
func (m *manager) handleQueueCancel(payload pvpwire.QueueCancelPayload) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.lastSeen, payload.UserID)
for mode, queue := range m.queues {
next := make([]pvpwire.QueuePlayerSnapshot, 0, len(queue))
for _, queued := range queue {
if queued.UserID == payload.UserID {
continue
}
next = append(next, queued)
}
m.queues[mode] = next
}
}
func (m *manager) handleMatchFound(payload pvpwire.MatchFoundPayload) {
m.mu.Lock()
if _, ok := m.sessions[payload.SessionID]; ok {
m.mu.Unlock()
return
}
s := &session{
payload: payload,
stage: pvpwire.StageBanPick,
banPickDeadline: time.Now().Add(time.Duration(payload.BanPickTimeout) * time.Second),
}
m.sessions[payload.SessionID] = s
m.userSession[payload.Host.UserID] = payload.SessionID
m.userSession[payload.Guest.UserID] = payload.SessionID
if queued := m.localQueues[payload.Host.UserID]; queued != nil {
queued.Stop()
delete(m.localQueues, payload.Host.UserID)
}
if queued := m.localQueues[payload.Guest.UserID]; queued != nil {
queued.Stop()
delete(m.localQueues, payload.Guest.UserID)
}
m.mu.Unlock()
if local := playerByUserID(payload.Host.UserID); local != nil {
local.Fightinfo.Mode = payload.Host.FightMode
local.Fightinfo.Status = payload.Host.Status
local.SendPackCmd(banPickStartCmd, newBanPickStartInfo(payload, payload.Host, payload.Guest))
}
if local := playerByUserID(payload.Guest.UserID); local != nil {
local.Fightinfo.Mode = payload.Guest.FightMode
local.Fightinfo.Status = payload.Guest.Status
if payload.Host.RuntimeServerID != payload.Guest.RuntimeServerID {
local.SetFightC(NewRemoteFightProxy(payload.SessionID, payload.Host.RuntimeServerID, payload.Guest.UserID))
}
local.SendPackCmd(banPickStartCmd, newBanPickStartInfo(payload, payload.Guest, payload.Host))
}
if payload.Host.RuntimeServerID == localRuntimeServerID() {
time.AfterFunc(time.Duration(payload.BanPickTimeout)*time.Second, func() {
m.mu.RLock()
sessionID := m.userSession[payload.Host.UserID]
current := m.sessions[sessionID]
m.mu.RUnlock()
if current == nil || current.stage != pvpwire.StageBanPick {
return
}
m.launchBattle(payload.SessionID)
})
}
}
func (m *manager) applyBanPickSubmit(payload pvpwire.BanPickSubmitPayload) {
m.mu.Lock()
s := m.sessions[payload.SessionID]
if s == nil || s.stage != pvpwire.StageBanPick {
m.mu.Unlock()
return
}
if s.payload.Host.UserID == payload.UserID {
copyPayload := payload
s.hostSubmit = &copyPayload
} else if s.payload.Guest.UserID == payload.UserID {
copyPayload := payload
s.guestSubmit = &copyPayload
}
ready := s.hostSubmit != nil && s.guestSubmit != nil
m.mu.Unlock()
if ready {
m.launchBattle(payload.SessionID)
}
}
func (m *manager) launchBattle(sessionID string) {
m.mu.Lock()
s := m.sessions[sessionID]
if s == nil || s.stage != pvpwire.StageBanPick {
m.mu.Unlock()
return
}
s.stage = pvpwire.StageFighting
hostSnapshot := s.payload.Host
guestSnapshot := s.payload.Guest
hostSubmit := s.hostSubmit
guestSubmit := s.guestSubmit
m.mu.Unlock()
hostPlayer := playerByUserID(hostSnapshot.UserID)
if hostPlayer == nil {
m.closeSession(sessionID, "host_offline")
return
}
hostPlayer.Fightinfo.Mode = hostSnapshot.FightMode
hostPlayer.Fightinfo.Status = hostSnapshot.Status
petLimit := battlePetLimit(hostSnapshot.FightMode)
hostPets := buildBattlePets(hostSnapshot.CatchTimes, hostSubmit, guestSubmit, petLimit)
guestPets := buildBattlePets(guestSnapshot.CatchTimes, guestSubmit, hostSubmit, petLimit)
var (
fc common.FightI
fightErr errorcode.ErrorCode
)
if hostSnapshot.RuntimeServerID == guestSnapshot.RuntimeServerID {
guestPlayer := playerByUserID(guestSnapshot.UserID)
if guestPlayer == nil {
m.closeSession(sessionID, "guest_offline")
return
}
guestPlayer.Fightinfo.Mode = guestSnapshot.FightMode
guestPlayer.Fightinfo.Status = guestSnapshot.Status
fc, fightErr = fight.NewFight(hostPlayer, guestPlayer, hostPets, guestPets, func(model.FightOverInfo) {
m.onBattleFinished(sessionID)
})
} else {
remote := player.NewRPCPlayer(&model.PlayerInfo{
UserID: guestSnapshot.UserID,
Nick: guestSnapshot.Nick,
PetList: append([]model.PetInfo(nil), guestPets...),
}, guestSnapshot.RuntimeServerID, fightinfo.Fightinfo{
Mode: guestSnapshot.FightMode,
Status: guestSnapshot.Status,
}, sessionID)
fc, fightErr = fight.NewFight(hostPlayer, remote, hostPets, guestPets, func(model.FightOverInfo) {
m.onBattleFinished(sessionID)
})
m.mu.Lock()
if current := m.sessions[sessionID]; current != nil {
current.remoteProxy = remote
}
m.mu.Unlock()
}
if fightErr != 0 {
m.closeSession(sessionID, "create_fight_failed")
return
}
m.mu.Lock()
if current := m.sessions[sessionID]; current != nil {
current.hostedFight = fc
}
m.mu.Unlock()
}
func (m *manager) handleBattleCommand(payload pvpwire.BattleCommandPayload) {
m.mu.RLock()
s := m.sessions[payload.SessionID]
var fightController common.FightI
var proxy common.PlayerI
if s != nil {
fightController = s.hostedFight
proxy = s.remoteProxy
}
m.mu.RUnlock()
if fightController == nil {
return
}
if proxy == nil {
proxy = player.NewRPCPlayer(&model.PlayerInfo{UserID: payload.UserID}, 0, fightinfo.Fightinfo{}, payload.SessionID)
}
switch payload.Command {
case pvpwire.CommandReady:
go fightController.ReadyFight(proxy)
case pvpwire.CommandUseSkill:
go fightController.UseSkill(proxy, payload.SkillID)
case pvpwire.CommandUseSkillAt:
go fightController.UseSkillAt(proxy, payload.SkillID, payload.ActorIndex, payload.TargetIndex)
case pvpwire.CommandEscape:
go fightController.Over(proxy, model.EnumBattleOverReason(payload.Reason))
case pvpwire.CommandChangePet:
go fightController.ChangePet(proxy, payload.CatchTime)
case pvpwire.CommandChangePetAt:
go fightController.ChangePetAt(proxy, payload.CatchTime, payload.ActorIndex)
case pvpwire.CommandLoadPercent:
go fightController.LoadPercent(proxy, payload.Percent)
case pvpwire.CommandUseItem:
go fightController.UseItem(proxy, payload.CatchTime, payload.ItemID)
case pvpwire.CommandUseItemAt:
go fightController.UseItemAt(proxy, payload.CatchTime, payload.ItemID, payload.ActorIndex, payload.TargetIndex)
case pvpwire.CommandChat:
go fightController.Chat(proxy, payload.Message)
}
}
func (m *manager) handlePacketRelay(payload pvpwire.PacketRelayPayload) {
packet, err := base64.StdEncoding.DecodeString(payload.Packet)
if err != nil {
return
}
local := playerByUserID(payload.UserID)
if local == nil {
return
}
_ = local.SendPack(packet)
if payload.Cmd == 2506 {
local.QuitFight()
}
}
func (m *manager) onBattleFinished(sessionID string) {
_ = publishServerMessage(pvpwire.ServerTopic(localRuntimeServerID()), pvpwire.MessageTypeSessionClose, pvpwire.SessionClosePayload{
SessionID: sessionID,
Reason: "battle_finished",
})
m.mu.RLock()
s := m.sessions[sessionID]
m.mu.RUnlock()
if s != nil && s.payload.Guest.RuntimeServerID != 0 && s.payload.Guest.RuntimeServerID != localRuntimeServerID() {
_ = publishServerMessage(pvpwire.ServerTopic(s.payload.Guest.RuntimeServerID), pvpwire.MessageTypeSessionClose, pvpwire.SessionClosePayload{
SessionID: sessionID,
Reason: "battle_finished",
})
}
}
func (m *manager) closeSession(sessionID, reason string) {
m.mu.Lock()
s := m.sessions[sessionID]
if s == nil {
m.mu.Unlock()
return
}
delete(m.userSession, s.payload.Host.UserID)
delete(m.userSession, s.payload.Guest.UserID)
delete(m.sessions, sessionID)
m.mu.Unlock()
if local := playerByUserID(s.payload.Host.UserID); local != nil && reason != "battle_finished" {
local.QuitFight()
}
if local := playerByUserID(s.payload.Guest.UserID); local != nil {
local.QuitFight()
}
}
func (m *manager) pruneExpiredQueueLocked(now time.Time) {
for mode, queue := range m.queues {
next := make([]pvpwire.QueuePlayerSnapshot, 0, len(queue))
for _, queued := range queue {
last := m.lastSeen[queued.UserID]
if last.IsZero() || now.Sub(last) > queueTTL {
delete(m.lastSeen, queued.UserID)
continue
}
next = append(next, queued)
}
m.queues[mode] = next
}
}
func newBanPickStartInfo(match pvpwire.MatchFoundPayload, self, opponent pvpwire.QueuePlayerSnapshot) *fight.CrossServerBanPickStartOutboundInfo {
myPets := buildBanPickPetInfos(self.CatchTimes)
opponentPets := buildBanPickPetInfos(opponent.CatchTimes)
return &fight.CrossServerBanPickStartOutboundInfo{
SessionID: match.SessionID,
OpponentUserID: opponent.UserID,
OpponentNick: opponent.Nick,
FightMode: self.FightMode,
Status: self.Status,
TimeoutSeconds: match.BanPickTimeout,
SelectableCount: uint32(minInt(battlePetLimit(self.FightMode), len(myPets))),
MyPets: myPets,
OpponentPets: opponentPets,
}
}
func buildBattlePets(allCatchTimes []uint32, own, opp *pvpwire.BanPickSubmitPayload, limit int) []model.PetInfo {
if len(allCatchTimes) == 0 {
return nil
}
banned := make(map[uint32]struct{})
if opp != nil {
for _, catchTime := range opp.BanCatchTimes {
if catchTime == 0 {
continue
}
banned[catchTime] = struct{}{}
}
}
filtered := make([]uint32, 0, len(allCatchTimes))
for _, catchTime := range allCatchTimes {
if catchTime == 0 {
continue
}
if _, ok := banned[catchTime]; ok {
continue
}
filtered = append(filtered, catchTime)
}
if len(filtered) == 0 {
filtered = append(filtered, allCatchTimes...)
}
if own == nil || len(own.SelectedCatchTimes) == 0 {
return resolveBattlePets(filtered, limit)
}
selectedMap := make(map[uint32]struct{}, len(filtered))
for _, catchTime := range filtered {
selectedMap[catchTime] = struct{}{}
}
selectedCatchTimes := make([]uint32, 0, len(own.SelectedCatchTimes))
used := make(map[uint32]struct{}, len(own.SelectedCatchTimes))
for _, catchTime := range own.SelectedCatchTimes {
if catchTime == 0 {
continue
}
if _, ok := selectedMap[catchTime]; !ok {
continue
}
if _, exists := used[catchTime]; exists {
continue
}
used[catchTime] = struct{}{}
selectedCatchTimes = append(selectedCatchTimes, catchTime)
if limit > 0 && len(selectedCatchTimes) >= limit {
break
}
}
if len(selectedCatchTimes) == 0 {
return resolveBattlePets(filtered, limit)
}
return resolveBattlePets(selectedCatchTimes, limit)
}
func resolveBattlePets(catchTimes []uint32, limit int) []model.PetInfo {
clampedCatchTimes := clampCatchTimes(catchTimes, limit)
if len(clampedCatchTimes) == 0 {
return nil
}
result := make([]model.PetInfo, 0, len(clampedCatchTimes))
petService := blservice.NewPetService(0)
for _, catchTime := range clampedCatchTimes {
pet := petService.PetInfoOneByCatchTime(catchTime)
if pet == nil || pet.Data.ID == 0 {
continue
}
result = append(result, pet.Data)
}
return result
}
func clampCatchTimes(catchTimes []uint32, limit int) []uint32 {
if len(catchTimes) == 0 {
return nil
}
if limit <= 0 || len(catchTimes) <= limit {
return append([]uint32(nil), catchTimes...)
}
return append([]uint32(nil), catchTimes[:limit]...)
}
func filterAvailableCatchTimes(pets []model.PetInfo) []uint32 {
result := make([]uint32, 0, len(pets))
used := make(map[uint32]struct{}, len(pets))
for _, pet := range pets {
if pet.Hp == 0 || pet.CatchTime == 0 {
continue
}
if _, exists := used[pet.CatchTime]; exists {
continue
}
used[pet.CatchTime] = struct{}{}
result = append(result, pet.CatchTime)
}
return result
}
func buildBanPickPetInfos(catchTimes []uint32) []fight.CrossServerBanPickPetInfo {
result := make([]fight.CrossServerBanPickPetInfo, 0, len(catchTimes))
petService := blservice.NewPetService(0)
for _, catchTime := range catchTimes {
pet := petService.PetInfoOneByCatchTime(catchTime)
if pet == nil || pet.Data.ID == 0 {
continue
}
result = append(result, fight.CrossServerBanPickPetInfo{
CatchTime: pet.Data.CatchTime,
PetID: pet.Data.ID,
Name: pet.Data.Name,
Level: pet.Data.Level,
Hp: pet.Data.Hp,
MaxHp: pet.Data.MaxHp,
})
}
return result
}
func minInt(a, b int) int {
if a < b {
return a
}
return b
}
func battlePetLimit(fightMode uint32) int {
if fightMode == fightinfo.BattleMode.SINGLE_MODE {
return 1
}
return 6
}
func normalizePeakMode(requested uint32) (fightMode uint32, status uint32, err errorcode.ErrorCode) {
switch requested {
case 19:
return fightinfo.BattleMode.SINGLE_MODE, requested, 0
case 20:
return fightinfo.BattleMode.MULTI_MODE, requested, 0
default:
return 0, 0, errorcode.ErrorCodes.ErrSystemError
}
}
func playerByUserID(userID uint32) *player.Player {
client, ok := player.Mainplayer.Load(userID)
if !ok || client == nil {
return nil
}
return client.Player
}
func buildSessionID(hostUserID, guestUserID uint32) string {
return fmt.Sprintf("xsvr-%d-%d-%d", hostUserID, guestUserID, time.Now().UnixNano())
}
func publishServerMessage(topic, msgType string, body any) errorcode.ErrorCode {
payload, err := json.Marshal(body)
if err != nil {
return errorcode.ErrorCodes.ErrSystemBusyTryLater
}
envelope, err := json.Marshal(pvpwire.Envelope{
Type: msgType,
Body: payload,
})
if err != nil {
return errorcode.ErrorCodes.ErrSystemBusyTryLater
}
conn, err := cool.Redis.Conn(context.Background())
if err != nil {
return errorcode.ErrorCodes.ErrSystemBusyTryLater
}
defer conn.Close(context.Background())
_, err = conn.Do(context.Background(), "publish", topic, envelope)
if err != nil {
return errorcode.ErrorCodes.ErrSystemBusyTryLater
}
return 0
}
func decodeBody(body []byte, target any) bool {
return json.Unmarshal(body, target) == nil
}
func isCoordinator() bool {
return cool.Config.GameOnlineID == CoordinatorOnlineID
}
func localRuntimeServerID() uint32 {
id, _ := strconv.ParseUint(strings.TrimSpace(cool.Config.ServerInfo.GetID()), 10, 32)
return uint32(id)
}

View File

@@ -0,0 +1,109 @@
package pvpwire
import (
"fmt"
)
const (
CoordinatorTopicPrefix = "sun:pvp:coordinator"
ServerTopicPrefix = "sun:pvp:server"
)
const (
MessageTypeQueueJoin = "queue_join"
MessageTypeQueueCancel = "queue_cancel"
MessageTypeMatchFound = "match_found"
MessageTypeBanPickSubmit = "ban_pick_submit"
MessageTypeBattleCommand = "battle_command"
MessageTypePacketRelay = "packet_relay"
MessageTypeSessionClose = "session_close"
)
const (
StageQueued = "queued"
StageBanPick = "ban_pick"
StageFighting = "fighting"
StageClosed = "closed"
)
const (
CommandReady = "ready"
CommandUseSkill = "use_skill"
CommandUseSkillAt = "use_skill_at"
CommandEscape = "escape"
CommandChangePet = "change_pet"
CommandChangePetAt = "change_pet_at"
CommandLoadPercent = "load_percent"
CommandUseItem = "use_item"
CommandUseItemAt = "use_item_at"
CommandChat = "chat"
)
type Envelope struct {
Type string `json:"type"`
Body []byte `json:"body"`
}
type QueuePlayerSnapshot struct {
RuntimeServerID uint32 `json:"runtimeServerId"`
UserID uint32 `json:"userId"`
Nick string `json:"nick"`
FightMode uint32 `json:"fightMode"`
Status uint32 `json:"status"`
JoinedAtUnix int64 `json:"joinedAtUnix"`
CatchTimes []uint32 `json:"catchTimes"`
}
type QueueJoinPayload struct {
Player QueuePlayerSnapshot `json:"player"`
}
type QueueCancelPayload struct {
RuntimeServerID uint32 `json:"runtimeServerId"`
UserID uint32 `json:"userId"`
}
type MatchFoundPayload struct {
SessionID string `json:"sessionId"`
Stage string `json:"stage"`
Host QueuePlayerSnapshot `json:"host"`
Guest QueuePlayerSnapshot `json:"guest"`
BanPickTimeout uint32 `json:"banPickTimeout"`
}
type BanPickSubmitPayload struct {
SessionID string `json:"sessionId"`
UserID uint32 `json:"userId"`
SelectedCatchTimes []uint32 `json:"selectedCatchTimes"`
BanCatchTimes []uint32 `json:"banCatchTimes"`
}
type BattleCommandPayload struct {
SessionID string `json:"sessionId"`
UserID uint32 `json:"userId"`
Command string `json:"command"`
SkillID uint32 `json:"skillId,omitempty"`
ActorIndex int `json:"actorIndex,omitempty"`
TargetIndex int `json:"targetIndex,omitempty"`
CatchTime uint32 `json:"catchTime,omitempty"`
ItemID uint32 `json:"itemId,omitempty"`
Percent int32 `json:"percent,omitempty"`
Message string `json:"message,omitempty"`
Reason uint32 `json:"reason,omitempty"`
}
type PacketRelayPayload struct {
SessionID string `json:"sessionId"`
UserID uint32 `json:"userId"`
Cmd uint32 `json:"cmd"`
Packet string `json:"packet"`
}
type SessionClosePayload struct {
SessionID string `json:"sessionId"`
Reason string `json:"reason"`
}
func ServerTopic(runtimeServerID uint32) string {
return fmt.Sprintf("%s:%d", ServerTopicPrefix, runtimeServerID)
}

View File

@@ -1,24 +1,71 @@
package player
import (
"blazing/common/socket/errorcode"
"blazing/cool"
"blazing/logic/service/common"
"blazing/logic/service/fight/info"
"blazing/logic/service/fight/pvpwire"
"blazing/modules/player/model"
"context"
"encoding/base64"
"encoding/json"
)
// rpc,跨服匹配的玩家,只做数据的传输
// RPC_player 是跨服战斗中的远端玩家代理,只承载快照和发包能力。
type RPC_player struct {
baseplayer
//
serviceid uint32 //玩家所在的ID
serviceid uint32
fightinfo info.Fightinfo
sessionID string
}
func NewRPCPlayer(info *model.PlayerInfo, serviceID uint32, fightInfo info.Fightinfo, sessionID string) *RPC_player {
ret := &RPC_player{
baseplayer: newbaseplayer(),
serviceid: serviceID,
fightinfo: fightInfo,
sessionID: sessionID,
}
ret.Info = info
return ret
}
func (f *RPC_player) SendPackCmd(cmd uint32, data any) {
if f == nil || f.Info == nil || f.serviceid == 0 {
return
}
conn, _ := cool.Redis.Conn(context.TODO())
packet := common.NewTomeeHeader(cmd, f.Info.UserID).Pack(data)
payload, err := json.Marshal(pvpwire.PacketRelayPayload{
SessionID: f.sessionID,
UserID: f.Info.UserID,
Cmd: cmd,
Packet: base64.StdEncoding.EncodeToString(packet),
})
if err != nil {
return
}
envelope, err := json.Marshal(pvpwire.Envelope{
Type: pvpwire.MessageTypePacketRelay,
Body: payload,
})
if err != nil {
return
}
conn, err := cool.Redis.Conn(context.TODO())
if err != nil || conn == nil {
return
}
defer conn.Close(context.TODO())
conn.Do(context.TODO(), "publish", "sun:send", cmd, data)
//fmt.Println("战斗结束")
_, _ = conn.Do(context.TODO(), "publish", pvpwire.ServerTopic(f.serviceid), envelope)
}
func (f *RPC_player) Getfightinfo() info.Fightinfo {
return f.fightinfo
}
func (f *RPC_player) CanFight() errorcode.ErrorCode {
return 0
}