diff --git a/common/socket/ServerEvent.go b/common/socket/ServerEvent.go index df0814024..cd70138c0 100644 --- a/common/socket/ServerEvent.go +++ b/common/socket/ServerEvent.go @@ -18,6 +18,7 @@ import ( "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gtime" "github.com/panjf2000/gnet/v2" + "github.com/valyala/bytebufferpool" ) func (s *Server) Boot(serverid, port uint32) error { @@ -86,9 +87,9 @@ func (s *Server) OnClose(c gnet.Conn, err error) (action gnet.Action) { //logging.Infof("conn[%v] disconnected", c.RemoteAddr().String()) v, _ := c.Context().(*player.ClientData) - //v.LF.Close() + v.LF.Close() // v.LF.Close() - close(v.MsgChan) + //close(v.MsgChan) if v.Player != nil { v.Player.Save() //保存玩家数据 @@ -279,17 +280,16 @@ func (s *Server) onevent(c gnet.Conn, v []byte) { // 解析数据部分(17字节之后) // 数据部分:直接引用切片,避免 make if len(v) > 17 { - header.Data = make([]byte, len(v[17:])) - copy(header.Data, v[17:]) // 核心修改:拷贝数据 - } else { - header.Data = nil // 避免空切片分配 + header.Data = bytebufferpool.Get() + header.Data.Write(v[17:]) + //copy(header.Data, v[17:]) // 核心修改:拷贝数据 } - t.OnEvent(header) + //t.OnEvent(header) // t.LF.Push(header) - // s.workerPool.Submit(func() { - // t.LF.Push(header) - // // t.LF.Producer().Write(header) - // }) + s.workerPool.Submit(func() { + t.LF.Producer().Write(header) + // t.LF.Producer().Write(header) + }) } } diff --git a/logic/service/common/pack.go b/logic/service/common/pack.go index 931d122df..e356b5c56 100644 --- a/logic/service/common/pack.go +++ b/logic/service/common/pack.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/lunixbochs/struc" + "github.com/valyala/bytebufferpool" ) // TomeeHeader 结构体字段定义 @@ -16,8 +17,9 @@ type TomeeHeader struct { UserID uint32 `json:"userId"` //Error uint32 `json:"error" struc:"skip"` - Result uint32 `json:"result"` - Data []byte `json:"data" struc:"skip"` //组包忽略此字段// struc:"skip" + Result uint32 `json:"result"` + Data *bytebufferpool.ByteBuffer `json:"data" struc:"skip"` //组包忽略此字段// struc:"skip" + Res []byte `struc:"skip"` //Return []byte `struc:"skip"` //返回记录 } diff --git a/logic/service/player/pack.go b/logic/service/player/pack.go index cac6581d3..7eb97cc98 100644 --- a/logic/service/player/pack.go +++ b/logic/service/player/pack.go @@ -7,6 +7,7 @@ import ( "encoding/binary" "encoding/hex" "sync" + "time" "context" @@ -14,6 +15,7 @@ import ( "fmt" "reflect" + "github.com/bruceshao/lockfree" "github.com/gobwas/ws" "github.com/gobwas/ws/wsutil" "github.com/gogf/gf/v2/os/glog" @@ -87,27 +89,15 @@ func XORDecryptU(encryptedData []byte, key uint32) []byte { return result } -// 原 OnEvent 改为仅负责投递消息到通道 -func (cd *ClientData) OnEvent(data common.TomeeHeader) { - - // 非阻塞发送,避免通道满时阻塞 eventloop - select { - case cd.MsgChan <- data: - default: - cool.Logger.Error(context.TODO(), "消息通道已满,丢弃消息", data.UserID, data.CMD) - // 通道满时的降级处理:记录日志,避免消息丢失(可选) - //cool.Logger.Warn(context.TODO(), "消息通道已满,丢弃消息", data.UserID, data.CMD) - } -} - -// 消费消息的协程:处理业务逻辑,不阻塞 eventloop -func (cd *ClientData) consumeMsg() { +// 重写 +// 遍历结构体方法并执行RECV_cmd +func (h *ClientData) OnEvent(data common.TomeeHeader) { defer func() { if err := recover(); err != nil { // 恢复 panic,err 为 panic 错误值 // 1. 打印错误信息 - if cd.Player != nil { - if cd.Player.Info != nil { - cool.Logger.Error(context.TODO(), "panic 错误:", cool.Config.ServerInfo.OnlineID, cd.Player.Info.UserID, err) + if h.Player != nil { + if h.Player.Info != nil { + cool.Logger.Error(context.TODO(), "panic 错误:", cool.Config.ServerInfo.OnlineID, h.Player.Info.UserID, err) } else { cool.Logger.Error(context.TODO(), "panic 错误:", cool.Config.ServerInfo.OnlineID, err) @@ -119,16 +109,6 @@ func (cd *ClientData) consumeMsg() { } }() - for header := range cd.MsgChan { - // 执行原有的 OnEvent 逻辑 - cd.handleBizLogic(header) - } -} - -// 重写 -// 遍历结构体方法并执行RECV_cmd -func (h *ClientData) handleBizLogic(data common.TomeeHeader) { - if data.CMD > 1001 { if h.Player == nil { @@ -140,9 +120,13 @@ func (h *ClientData) handleBizLogic(data common.TomeeHeader) { return } if data.Data != nil { - data.Data = XORDecryptU(data.Data, h.Player.Hash) + defer bytebufferpool.Put(data.Data) + data.Res = XORDecryptU(data.Data.Bytes(), h.Player.Hash) } + } else { + defer bytebufferpool.Put(data.Data) + data.Res = data.Data.Bytes() } if cool.Config.ServerInfo.IsDebug != 0 { fmt.Println("接收数据", data.UserID, data.CMD) @@ -164,21 +148,19 @@ func (h *ClientData) handleBizLogic(data common.TomeeHeader) { // 如果需要可设置的变量(用于修改值),创建指针并解引用 ptrValue := reflect.New(cmdlister.Req) - tt1 := ptrValue.Elem().Addr().Interface() - // fmt.Println(tt1) - err := struc.Unpack(bytes.NewBuffer(data.Data), tt1) - playerconn, cok := h.Conn.Context().(*ClientData) - if !cok { //如果链接断开,就返回 - return - } - if err != nil { + if data.Res != nil { + tt1 := ptrValue.Elem().Addr().Interface() + err := struc.Unpack(bytes.NewBuffer(data.Res), tt1) - cool.Logger.Error(context.Background(), data.UserID, data.CMD, "解包失败,", err, hex.EncodeToString(data.Data)) - //fmt.Println(data.UserID, data.CMD, "解包失败,", hex.EncodeToString(data.Data)) - data.Result = uint32(errorcode.ErrorCodes.ErrSystemProcessingError) - playerconn.SendPack(data.Pack(nil)) - return + if err != nil { + + cool.Logger.Error(context.Background(), data.UserID, data.CMD, "解包失败,", err, hex.EncodeToString(data.Res)) + //fmt.Println(data.UserID, data.CMD, "解包失败,", hex.EncodeToString(data.Data)) + data.Result = uint32(errorcode.ErrorCodes.ErrSystemProcessingError) + h.SendPack(data.Pack(nil)) + return + } } ptrValue1 := ptrValue.Elem().Addr() @@ -213,7 +195,7 @@ func (h *ClientData) handleBizLogic(data common.TomeeHeader) { if ok && aa != 0 { //这里实现回复错误包 glog.Info(context.Background(), data.UserID, data.CMD, aa.Code()) - playerconn.SendPack(data.Pack(nil)) + h.SendPack(data.Pack(nil)) return @@ -221,7 +203,7 @@ func (h *ClientData) handleBizLogic(data common.TomeeHeader) { t1 := data.Pack(ret[0].Interface()) - playerconn.SendPack(t1) + h.SendPack(t1) } @@ -232,9 +214,9 @@ type ClientData struct { ERROR_CONNUT int Wsmsg *WsCodec Conn gnet.Conn - //LF *gqueue.TQueue[common.TomeeHeader] + LF *lockfree.Lockfree[common.TomeeHeader] //SaveL sync.Once //保存锁 - MsgChan chan common.TomeeHeader + // MsgChan chan common.TomeeHeader //SaveDone chan struct{} } @@ -244,9 +226,9 @@ func NewClientData(c gnet.Conn) *ClientData { cd := &ClientData{ - Conn: c, - Wsmsg: &WsCodec{}, - MsgChan: make(chan common.TomeeHeader, 100), + Conn: c, + Wsmsg: &WsCodec{}, + // MsgChan: make(chan common.TomeeHeader, 100), } //cd.LF = make(chan common.TomeeHeader, 8) // cd.LF = gqueue.NewTQueue[common.TomeeHeader](1) @@ -260,17 +242,17 @@ func NewClientData(c gnet.Conn) *ClientData { // } // }() - // cd.LF = lockfree.NewLockfree( - // 8, - // cd, - // lockfree.NewSleepBlockStrategy(time.Millisecond), - // ) + cd.LF = lockfree.NewLockfree( + 8, + cd, + lockfree.NewSleepBlockStrategy(time.Millisecond), + ) // // // 启动Lockfree - // if err := cd.LF.Start(); err != nil { - // panic(err) - // } + if err := cd.LF.Start(); err != nil { + panic(err) + } // 启动每个连接的独立消费协程 - go cd.consumeMsg() + // go cd.consumeMsg() // // // 启动Lockfree // // if err := cd.LF.Start(); err != nil { // // panic(err)