feat(player): 优化客户端数据处理机制 重构ClientData的消息处理流程,将OnEvent方法改为非阻塞的通道投递模式, 新增MsgChan用于异步消息传递,避免eventloop阻塞问题。 fix(fight): 修复宠物闪光属性过滤条件 在initplayer方法中增加color.Alpha不为0的判断条件,确保只有有效的 闪光属性才会被添加到宠物信息中。 refactor(socket): 调整服务器事件处理逻辑 移除未使用的Lockfree库依赖,注释掉不再需要的连接关闭资源释放代码, 调整事件处理的工作池提交逻辑。 feat(rpc): 新增Redis发布功能 为RPC_player添加SendPackCmd方法,通过Redis的publish命令实现 跨服数据传输功能。 ```
This commit is contained in:
@@ -7,7 +7,6 @@ import (
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"context"
|
||||
|
||||
@@ -15,7 +14,6 @@ import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/bruceshao/lockfree"
|
||||
"github.com/gobwas/ws"
|
||||
"github.com/gobwas/ws/wsutil"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
@@ -89,8 +87,30 @@ func XORDecryptU(encryptedData []byte, key uint32) []byte {
|
||||
return result
|
||||
}
|
||||
|
||||
// 原 OnEvent 改为仅负责投递消息到通道
|
||||
func (cd *ClientData) OnEvent(data common.TomeeHeader) {
|
||||
|
||||
// 非阻塞发送,避免通道满时阻塞 eventloop
|
||||
select {
|
||||
case cd.MsgChan <- data:
|
||||
default:
|
||||
cool.Logger.Error(context.TODO(), "消息通道已满,丢弃消息", data.UserID, data.CMD)
|
||||
// 通道满时的降级处理:记录日志,避免消息丢失(可选)
|
||||
//cool.Logger.Warn(context.TODO(), "消息通道已满,丢弃消息", data.UserID, data.CMD)
|
||||
}
|
||||
}
|
||||
|
||||
// 消费消息的协程:处理业务逻辑,不阻塞 eventloop
|
||||
func (cd *ClientData) consumeMsg() {
|
||||
for header := range cd.MsgChan {
|
||||
// 执行原有的 OnEvent 逻辑
|
||||
cd.handleBizLogic(header)
|
||||
}
|
||||
}
|
||||
|
||||
// 重写
|
||||
// 遍历结构体方法并执行RECV_cmd
|
||||
func (h *ClientData) OnEvent(data common.TomeeHeader) {
|
||||
func (h *ClientData) handleBizLogic(data common.TomeeHeader) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil { // 恢复 panic,err 为 panic 错误值
|
||||
// 1. 打印错误信息
|
||||
@@ -199,8 +219,7 @@ func (h *ClientData) OnEvent(data common.TomeeHeader) {
|
||||
}
|
||||
|
||||
t1 := data.Pack(ret[0].Interface())
|
||||
//cool.Loger.Debug(context.Background(), "发送数据_回包", data.UserID, data.CMD, ret[0].Interface(), hex.EncodeToString(t1))
|
||||
//data.Version = 49
|
||||
|
||||
playerconn.SendPack(t1)
|
||||
|
||||
}
|
||||
@@ -212,9 +231,9 @@ type ClientData struct {
|
||||
ERROR_CONNUT int
|
||||
Wsmsg *WsCodec
|
||||
Conn gnet.Conn
|
||||
LF *lockfree.Lockfree[common.TomeeHeader]
|
||||
//LF *gqueue.TQueue[common.TomeeHeader]
|
||||
//SaveL sync.Once //保存锁
|
||||
|
||||
MsgChan chan common.TomeeHeader
|
||||
//SaveDone chan struct{}
|
||||
}
|
||||
|
||||
@@ -224,18 +243,33 @@ func NewClientData(c gnet.Conn) *ClientData {
|
||||
|
||||
cd := &ClientData{
|
||||
|
||||
Conn: c,
|
||||
Wsmsg: &WsCodec{},
|
||||
}
|
||||
cd.LF = lockfree.NewLockfree(
|
||||
8,
|
||||
cd,
|
||||
lockfree.NewSleepBlockStrategy(time.Millisecond),
|
||||
)
|
||||
// // 启动Lockfree
|
||||
if err := cd.LF.Start(); err != nil {
|
||||
panic(err)
|
||||
Conn: c,
|
||||
Wsmsg: &WsCodec{},
|
||||
MsgChan: make(chan common.TomeeHeader, 100),
|
||||
}
|
||||
//cd.LF = make(chan common.TomeeHeader, 8)
|
||||
// cd.LF = gqueue.NewTQueue[common.TomeeHeader](1)
|
||||
// go func() {
|
||||
// for {
|
||||
// select {
|
||||
// case queueItem := <-cd.LF.C:
|
||||
// cd.OnEvent(queueItem)
|
||||
|
||||
// }
|
||||
// }
|
||||
|
||||
// }()
|
||||
// cd.LF = lockfree.NewLockfree(
|
||||
// 8,
|
||||
// cd,
|
||||
// lockfree.NewSleepBlockStrategy(time.Millisecond),
|
||||
// )
|
||||
// // // 启动Lockfree
|
||||
// if err := cd.LF.Start(); err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
// 启动每个连接的独立消费协程
|
||||
go cd.consumeMsg()
|
||||
// // // 启动Lockfree
|
||||
// // if err := cd.LF.Start(); err != nil {
|
||||
// // panic(err)
|
||||
|
||||
Reference in New Issue
Block a user