From dffd6a63a6edef82a7c6ea69c3d908542911cd70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=94=E5=BF=B5?= <12574910+72wo@users.noreply.github.com> Date: Wed, 4 Mar 2026 01:30:40 +0800 Subject: [PATCH] =?UTF-8?q?```=20feat(player):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86?= =?UTF-8?q?=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 重构ClientData的消息处理流程,将OnEvent方法改为非阻塞的通道投递模式, 新增MsgChan用于异步消息传递,避免eventloop阻塞问题。 fix(fight): 修复宠物闪光属性过滤条件 在initplayer方法中增加color.Alpha不为0的判断条件,确保只有有效的 闪光属性才会被添加到宠物信息中。 refactor(socket): 调整服务器事件处理逻辑 移除未使用的Lockfree库依赖,注释掉不再需要的连接关闭资源释放代码, 调整事件处理的工作池提交逻辑。 feat(rpc): 新增Redis发布功能 为RPC_player添加SendPackCmd方法,通过Redis的publish命令实现 跨服数据传输功能。 ``` --- common/socket/ServerEvent.go | 13 ++++--- logic/service/fight/input.go | 2 +- logic/service/player/pack.go | 70 ++++++++++++++++++++++++++---------- logic/service/player/rpc.go | 17 +++++++++ 4 files changed, 78 insertions(+), 24 deletions(-) diff --git a/common/socket/ServerEvent.go b/common/socket/ServerEvent.go index a742bdada..cbfb86692 100644 --- a/common/socket/ServerEvent.go +++ b/common/socket/ServerEvent.go @@ -86,7 +86,8 @@ func (s *Server) OnClose(c gnet.Conn, err error) (action gnet.Action) { //logging.Infof("conn[%v] disconnected", c.RemoteAddr().String()) v, _ := c.Context().(*player.ClientData) - v.LF.Close() + //v.LF.Close() + // v.LF.Close() if v.Player != nil { v.Player.Save() //保存玩家数据 @@ -281,10 +282,12 @@ func (s *Server) onevent(c gnet.Conn, v []byte) { } else { header.Data = nil // 避免空切片分配 } - //t.OnEvent(header) - s.workerPool.Submit(func() { - t.LF.Producer().Write(header) - }) + t.OnEvent(header) + // t.LF.Push(header) + // s.workerPool.Submit(func() { + // t.LF.Push(header) + // // t.LF.Producer().Write(header) + // }) } } diff --git a/logic/service/fight/input.go b/logic/service/fight/input.go index 7b412c084..c5c146008 100644 --- a/logic/service/fight/input.go +++ b/logic/service/fight/input.go @@ -179,7 +179,7 @@ func (f *FightC) initplayer(c common.PlayerI) (*input.Input, errorcode.ErrorCode pet := model.GenPetInfo(int(v.MonID), 24, int(v.Nature), int(v.Effect[0]), int(v.Lv), nil, 0) var color data.GlowFilter err := json.Unmarshal([]byte(v.Color), &color) - if err == nil { + if err == nil && color.Alpha != 0 { pet.ShinyInfo = append(pet.ShinyInfo, color) } diff --git a/logic/service/player/pack.go b/logic/service/player/pack.go index 600ba0272..c9280a50e 100644 --- a/logic/service/player/pack.go +++ b/logic/service/player/pack.go @@ -7,7 +7,6 @@ import ( "encoding/binary" "encoding/hex" "sync" - "time" "context" @@ -15,7 +14,6 @@ import ( "fmt" "reflect" - "github.com/bruceshao/lockfree" "github.com/gobwas/ws" "github.com/gobwas/ws/wsutil" "github.com/gogf/gf/v2/os/glog" @@ -89,8 +87,30 @@ func XORDecryptU(encryptedData []byte, key uint32) []byte { return result } +// 原 OnEvent 改为仅负责投递消息到通道 +func (cd *ClientData) OnEvent(data common.TomeeHeader) { + + // 非阻塞发送,避免通道满时阻塞 eventloop + select { + case cd.MsgChan <- data: + default: + cool.Logger.Error(context.TODO(), "消息通道已满,丢弃消息", data.UserID, data.CMD) + // 通道满时的降级处理:记录日志,避免消息丢失(可选) + //cool.Logger.Warn(context.TODO(), "消息通道已满,丢弃消息", data.UserID, data.CMD) + } +} + +// 消费消息的协程:处理业务逻辑,不阻塞 eventloop +func (cd *ClientData) consumeMsg() { + for header := range cd.MsgChan { + // 执行原有的 OnEvent 逻辑 + cd.handleBizLogic(header) + } +} + +// 重写 // 遍历结构体方法并执行RECV_cmd -func (h *ClientData) OnEvent(data common.TomeeHeader) { +func (h *ClientData) handleBizLogic(data common.TomeeHeader) { defer func() { if err := recover(); err != nil { // 恢复 panic,err 为 panic 错误值 // 1. 打印错误信息 @@ -199,8 +219,7 @@ func (h *ClientData) OnEvent(data common.TomeeHeader) { } t1 := data.Pack(ret[0].Interface()) - //cool.Loger.Debug(context.Background(), "发送数据_回包", data.UserID, data.CMD, ret[0].Interface(), hex.EncodeToString(t1)) - //data.Version = 49 + playerconn.SendPack(t1) } @@ -212,9 +231,9 @@ type ClientData struct { ERROR_CONNUT int Wsmsg *WsCodec Conn gnet.Conn - LF *lockfree.Lockfree[common.TomeeHeader] + //LF *gqueue.TQueue[common.TomeeHeader] //SaveL sync.Once //保存锁 - + MsgChan chan common.TomeeHeader //SaveDone chan struct{} } @@ -224,18 +243,33 @@ func NewClientData(c gnet.Conn) *ClientData { cd := &ClientData{ - Conn: c, - Wsmsg: &WsCodec{}, - } - cd.LF = lockfree.NewLockfree( - 8, - cd, - lockfree.NewSleepBlockStrategy(time.Millisecond), - ) - // // 启动Lockfree - if err := cd.LF.Start(); err != nil { - panic(err) + Conn: c, + Wsmsg: &WsCodec{}, + MsgChan: make(chan common.TomeeHeader, 100), } + //cd.LF = make(chan common.TomeeHeader, 8) + // cd.LF = gqueue.NewTQueue[common.TomeeHeader](1) + // go func() { + // for { + // select { + // case queueItem := <-cd.LF.C: + // cd.OnEvent(queueItem) + + // } + // } + + // }() + // cd.LF = lockfree.NewLockfree( + // 8, + // cd, + // lockfree.NewSleepBlockStrategy(time.Millisecond), + // ) + // // // 启动Lockfree + // if err := cd.LF.Start(); err != nil { + // panic(err) + // } + // 启动每个连接的独立消费协程 + go cd.consumeMsg() // // // 启动Lockfree // // if err := cd.LF.Start(); err != nil { // // panic(err) diff --git a/logic/service/player/rpc.go b/logic/service/player/rpc.go index 89364ac16..827e4a669 100644 --- a/logic/service/player/rpc.go +++ b/logic/service/player/rpc.go @@ -1,8 +1,25 @@ package player +import ( + "context" + + "github.com/gogf/gf/v2/frame/g" +) + // rpc,跨服匹配的玩家,只做数据的传输 type RPC_player struct { baseplayer // serviceid uint32 //玩家所在的ID } + +func (f *RPC_player) SendPackCmd(cmd uint32, data any) { + + conn, _ := g.Redis("cool").Conn(context.TODO()) + + defer conn.Close(context.TODO()) + conn.Do(context.TODO(), "publish", "sun:send", cmd, data) + + //fmt.Println("战斗结束") + +}