refactor(common/rpc): 移除Redis PubSub心跳机制并优化连接管理

移除Redis PubSub连接的心跳保活功能,因为PubSub连接只应负责订阅和接收,
避免在同一连接上并发执行PING操作。更新了ListenFunc和ListenFight函数,
统一代码结构,移除了context包依赖,并添加了相关注释说明。

feat(logic/pet): 新增宠物技能提交功能

新增CommitPetSkills接口用于一次性提交宠物技能学习/替换/排序结果。
实现技能验证、费用计算和状态更新逻辑,包括新技能学习成本和排序费用。
添加isSameUint32Slice辅助函数用于比较技能数组。
```
This commit is contained in:
昔念
2026-04-12 19:14:18 +08:00
parent f9543a5156
commit 82bb99d141
5 changed files with 131 additions and 75 deletions

View File

@@ -5,7 +5,6 @@ import (
"blazing/logic/service/fight/pvp" "blazing/logic/service/fight/pvp"
"blazing/logic/service/fight/pvpwire" "blazing/logic/service/fight/pvpwire"
"context"
"fmt" "fmt"
"time" "time"
@@ -16,7 +15,8 @@ import (
) )
// ListenFunc 监听函数 // ListenFunc 监听函数
// ListenFunc 改造后的 Redis PubSub 监听函数,支持自动重连和心跳保活 // ListenFunc 改造后的 Redis PubSub 监听函数,支持自动重连
// 注意PubSub 连接只负责订阅和接收,避免在同一连接上并发 PING。
func ListenFunc(ctx g.Ctx) { func ListenFunc(ctx g.Ctx) {
if !cool.IsRedisMode { if !cool.IsRedisMode {
panic(gerror.New("集群模式下, 请使用Redis作为缓存")) panic(gerror.New("集群模式下, 请使用Redis作为缓存"))
@@ -24,9 +24,8 @@ func ListenFunc(ctx g.Ctx) {
// 定义常量配置 // 定义常量配置
const ( const (
subscribeTopic = "cool:func" // 订阅的主题 subscribeTopic = "cool:func" // 订阅的主题
retryDelay = 10 * time.Second // 连接失败重试间隔 retryDelay = 10 * time.Second // 连接失败重试间隔
heartbeatInterval = 30 * time.Second // 心跳保活间隔
) )
// 外层循环:负责连接断开后的整体重连 // 外层循环:负责连接断开后的整体重连
@@ -47,47 +46,25 @@ func ListenFunc(ctx g.Ctx) {
continue continue
} }
// 2. 启动心跳保活协程,防止连接因空闲被断开 // 2. 订阅主题
heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background())
go func() {
ticker := time.NewTicker(heartbeatInterval)
defer func() {
ticker.Stop()
heartbeatCancel()
}()
for {
select {
case <-heartbeatCtx.Done():
cool.Logger.Info(ctx, "心跳协程退出")
return
case <-ticker.C:
// 发送 PING 心跳,保持连接活跃
_, pingErr := conn.Do(ctx, "PING")
if pingErr != nil {
cool.Logger.Error(ctx, "Redis 心跳失败,触发重连", "error", pingErr)
// 心跳失败时主动关闭连接,触发外层重连
_ = conn.Close(ctx)
return
}
}
}
}()
// 3. 订阅主题
_, err = conn.Do(ctx, "subscribe", subscribeTopic) _, err = conn.Do(ctx, "subscribe", subscribeTopic)
if err != nil { if err != nil {
cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", subscribeTopic, "error", err) cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", subscribeTopic, "error", err)
heartbeatCancel() // 关闭心跳协程
_ = conn.Close(ctx) _ = conn.Close(ctx)
time.Sleep(retryDelay) time.Sleep(retryDelay)
continue continue
} }
cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic) cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", subscribeTopic)
_, err = conn.Do(ctx, "subscribe", "sun:join") //加入队列 _, err = conn.Do(ctx, "subscribe", "sun:join") //加入队列
if err != nil {
cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", "sun:join", "error", err)
_ = conn.Close(ctx)
time.Sleep(retryDelay)
continue
}
cool.Logger.Info(ctx, "成功订阅 Redis 主题", "topic", "sun:join")
// 4. 循环接收消息 // 3. 循环接收消息
connError := false connError := false
for !connError { for !connError {
select { select {
@@ -130,15 +107,15 @@ func ListenFunc(ctx g.Ctx) {
} }
} }
// 5. 清理资源,准备重连 // 4. 清理资源,准备重连
heartbeatCancel() // 关闭心跳协程
_ = conn.Close(ctx) // 关闭当前连接 _ = conn.Close(ctx) // 关闭当前连接
// Logger.Warn(ctx, "Redis 连接异常,准备重连", "retry_after", retryDelay) cool.Logger.Info(ctx, "Redis 订阅连接异常,准备重连", "retry_after", retryDelay)
time.Sleep(retryDelay) time.Sleep(retryDelay)
} }
} }
// ListenFight 完全对齐 ListenFunc 写法,修复收不到消息问题 // ListenFight 完全对齐 ListenFunc 写法,修复收不到消息问题
// 注意PubSub 连接只负责订阅和接收,避免在同一连接上并发 PING。
func ListenFight(ctx g.Ctx) { func ListenFight(ctx g.Ctx) {
if !cool.IsRedisMode { if !cool.IsRedisMode {
panic(gerror.New("集群模式下, 请使用Redis作为缓存")) panic(gerror.New("集群模式下, 请使用Redis作为缓存"))
@@ -146,8 +123,7 @@ func ListenFight(ctx g.Ctx) {
// 定义常量配置(对齐 ListenFunc 风格) // 定义常量配置(对齐 ListenFunc 风格)
const ( const (
retryDelay = 10 * time.Second // 连接失败重试间隔 retryDelay = 10 * time.Second // 连接失败重试间隔
heartbeatInterval = 30 * time.Second // 心跳保活间隔
) )
// 提前拼接订阅主题(避免重复拼接,便于日志打印) // 提前拼接订阅主题(避免重复拼接,便于日志打印)
@@ -176,35 +152,7 @@ func ListenFight(ctx g.Ctx) {
continue continue
} }
// 2. 启动心跳保活协程(完全对齐 ListenFunc 逻辑 // 2. 订阅主题(对齐 ListenFunc 的错误处理,替换 panic 为优雅重连
heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background())
go func() {
ticker := time.NewTicker(heartbeatInterval)
defer func() {
ticker.Stop()
heartbeatCancel()
}()
for {
select {
case <-heartbeatCtx.Done():
cool.Logger.Info(ctx, "心跳协程退出")
return
case <-ticker.C:
// 发送 PING 心跳,保持连接活跃
_, pingErr := conn.Do(ctx, "PING")
if pingErr != nil {
cool.Logger.Error(ctx, "Redis 心跳失败,触发重连", "error", pingErr)
// 心跳失败时主动关闭连接,触发外层重连
_ = conn.Close(ctx)
return
}
cool.Logger.Debug(ctx, "Redis 心跳发送成功,连接正常")
}
}
}()
// 3. 订阅主题(对齐 ListenFunc 的错误处理,替换 panic 为优雅重连)
subscribeTopics := []string{startTopic, pvpServerTopic} subscribeTopics := []string{startTopic, pvpServerTopic}
if cool.Config.GameOnlineID == pvp.CoordinatorOnlineID { if cool.Config.GameOnlineID == pvp.CoordinatorOnlineID {
subscribeTopics = append(subscribeTopics, pvpCoordinatorTopic) subscribeTopics = append(subscribeTopics, pvpCoordinatorTopic)
@@ -214,7 +162,6 @@ func ListenFight(ctx g.Ctx) {
_, err = conn.Do(ctx, "subscribe", topic) _, err = conn.Do(ctx, "subscribe", topic)
if err != nil { if err != nil {
cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", topic, "error", err) cool.Logger.Error(ctx, "订阅 Redis 主题失败", "topic", topic, "error", err)
heartbeatCancel()
_ = conn.Close(ctx) _ = conn.Close(ctx)
time.Sleep(retryDelay) time.Sleep(retryDelay)
subscribeFailed = true subscribeFailed = true
@@ -240,7 +187,7 @@ func ListenFight(ctx g.Ctx) {
// 打印监听提示(保留原有日志) // 打印监听提示(保留原有日志)
fmt.Println("监听战斗", startTopic) fmt.Println("监听战斗", startTopic)
// 4. 循环接收消息(完全对齐 ListenFunc 逻辑) // 3. 循环接收消息(完全对齐 ListenFunc 逻辑)
connError := false connError := false
for !connError { for !connError {
select { select {
@@ -282,9 +229,9 @@ func ListenFight(ctx g.Ctx) {
} }
} }
// 5. 清理资源,准备重连(完全对齐 ListenFunc // 4. 清理资源,准备重连(完全对齐 ListenFunc
heartbeatCancel() // 关闭心跳协程
_ = conn.Close(ctx) // 关闭当前连接 _ = conn.Close(ctx) // 关闭当前连接
cool.Logger.Info(ctx, "Redis 战斗订阅连接异常,准备重连", "retry_after", retryDelay)
time.Sleep(retryDelay) time.Sleep(retryDelay)
} }
} }

View File

@@ -99,6 +99,12 @@ type GetPetLearnableSkillsInboundInfo struct {
CatchTime uint32 `json:"catchTime"` CatchTime uint32 `json:"catchTime"`
} }
type CommitPetSkillsInboundInfo struct {
Head common.TomeeHeader `cmd:"52313" struc:"skip"`
CatchTime uint32 `json:"catchTime"`
Skill [4]uint32 `json:"skill"`
}
type C2S_PetFusion struct { type C2S_PetFusion struct {
Head common.TomeeHeader `cmd:"2351" struc:"skip"` Head common.TomeeHeader `cmd:"2351" struc:"skip"`
Mcatchtime uint32 `json:"mcatchtime" msgpack:"mcatchtime"` Mcatchtime uint32 `json:"mcatchtime" msgpack:"mcatchtime"`

View File

@@ -15,6 +15,18 @@ type GetPetLearnableSkillsOutboundInfo struct {
SkillList []uint32 `json:"skillList"` SkillList []uint32 `json:"skillList"`
} }
func isSameUint32Slice(a []uint32, b []uint32) bool {
if len(a) != len(b) {
return false
}
for index := range a {
if a[index] != b[index] {
return false
}
}
return true
}
func collectPetLearnableSkillList(currentPet *model.PetInfo) []uint32 { func collectPetLearnableSkillList(currentPet *model.PetInfo) []uint32 {
skillSet := make(map[uint32]struct{}) skillSet := make(map[uint32]struct{})
skills := make([]uint32, 0) skills := make([]uint32, 0)
@@ -184,3 +196,88 @@ func (h Controller) SortPetSkills(data *C2S_Skill_Sort, c *player.Player) (resul
return nil, 0 return nil, 0
} }
// CommitPetSkills 按最终技能列表一次性提交学习/替换/排序结果。
func (h Controller) CommitPetSkills(
data *CommitPetSkillsInboundInfo,
c *player.Player,
) (result *fight.NullOutboundInfo, err errorcode.ErrorCode) {
const setSkillCost = 50
const skillSortCost = 50
_, currentPet, ok := c.FindPet(data.CatchTime)
if !ok {
return nil, errorcode.ErrorCodes.ErrPokemonNotExists
}
currentSkillSet := make(map[uint32]model.SkillInfo, len(currentPet.SkillList))
currentSkillOrder := make([]uint32, 0, len(currentPet.SkillList))
for _, skill := range currentPet.SkillList {
if skill.ID == 0 {
continue
}
currentSkillSet[skill.ID] = skill
currentSkillOrder = append(currentSkillOrder, skill.ID)
}
finalSkillIDs := make([]uint32, 0, 4)
usedSkillSet := make(map[uint32]struct{}, 4)
for _, skillID := range data.Skill {
if skillID == 0 {
continue
}
if _, exists := usedSkillSet[skillID]; exists {
continue
}
usedSkillSet[skillID] = struct{}{}
finalSkillIDs = append(finalSkillIDs, skillID)
}
if len(finalSkillIDs) == 0 {
return nil, errorcode.ErrorCodes.ErrSystemBusy
}
if len(finalSkillIDs) > 4 {
finalSkillIDs = finalSkillIDs[:4]
}
if isSameUint32Slice(currentSkillOrder, finalSkillIDs) {
return nil, 0
}
learnableSkillSet := make(map[uint32]struct{})
for _, skillID := range collectPetLearnableSkillList(currentPet) {
learnableSkillSet[skillID] = struct{}{}
}
newSkillCount := 0
finalSkillList := make([]model.SkillInfo, 0, len(finalSkillIDs))
for _, skillID := range finalSkillIDs {
if skill, exists := currentSkillSet[skillID]; exists {
finalSkillList = append(finalSkillList, skill)
continue
}
if _, exists := learnableSkillSet[skillID]; !exists {
return nil, errorcode.ErrorCodes.ErrSystemBusy
}
skillInfo, exists := xmlres.SkillMap[int(skillID)]
if !exists {
return nil, errorcode.ErrorCodes.ErrSystemBusy
}
newSkillCount++
finalSkillList = append(finalSkillList, model.SkillInfo{
ID: skillID,
PP: uint32(skillInfo.MaxPP),
})
}
totalCost := int64(newSkillCount * setSkillCost)
if newSkillCount == 0 {
totalCost += int64(skillSortCost)
}
if totalCost > 0 && !c.GetCoins(totalCost) {
return nil, errorcode.ErrorCodes.ErrSunDouInsufficient10016
}
c.Info.Coins -= totalCost
currentPet.SkillList = finalSkillList
return nil, 0
}

View File

@@ -23,3 +23,9 @@ type C2S_Skill_Sort struct {
Skill [4]uint32 `json:"skill_1"` // 技能1对应C# uint skill_1 Skill [4]uint32 `json:"skill_1"` // 技能1对应C# uint skill_1
} }
type CommitPetSkillsInfo struct {
Head common.TomeeHeader `cmd:"52313" struc:"skip"`
CatchTime uint32 `json:"catchTime"`
Skill [4]uint32 `json:"skill"`
}