```
All checks were successful
ci/woodpecker/push/my-first-workflow Pipeline was successful

feat(socket): 使用bytebufferpool优化内存分配并重构消息处理机制

引入bytebufferpool减少内存分配开销,在ServerEvent.go中修改数据处理逻辑,
将直接的数据拷贝改为使用缓冲池。同时移除原有的消息通道机制,改用lock
This commit is contained in:
昔念
2026-03-04 14:00:55 +08:00
parent 98c4caac68
commit fc8fc1ed8d
3 changed files with 55 additions and 71 deletions

View File

@@ -18,6 +18,7 @@ import (
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"github.com/panjf2000/gnet/v2"
"github.com/valyala/bytebufferpool"
)
func (s *Server) Boot(serverid, port uint32) error {
@@ -86,9 +87,9 @@ func (s *Server) OnClose(c gnet.Conn, err error) (action gnet.Action) {
//logging.Infof("conn[%v] disconnected", c.RemoteAddr().String())
v, _ := c.Context().(*player.ClientData)
//v.LF.Close()
v.LF.Close()
// v.LF.Close()
close(v.MsgChan)
//close(v.MsgChan)
if v.Player != nil {
v.Player.Save() //保存玩家数据
@@ -279,17 +280,16 @@ func (s *Server) onevent(c gnet.Conn, v []byte) {
// 解析数据部分17字节之后
// 数据部分:直接引用切片,避免 make
if len(v) > 17 {
header.Data = make([]byte, len(v[17:]))
copy(header.Data, v[17:]) // 核心修改:拷贝数据
} else {
header.Data = nil // 避免空切片分配
header.Data = bytebufferpool.Get()
header.Data.Write(v[17:])
//copy(header.Data, v[17:]) // 核心修改:拷贝数据
}
t.OnEvent(header)
//t.OnEvent(header)
// t.LF.Push(header)
// s.workerPool.Submit(func() {
// t.LF.Push(header)
// // t.LF.Producer().Write(header)
// })
s.workerPool.Submit(func() {
t.LF.Producer().Write(header)
// t.LF.Producer().Write(header)
})
}
}

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"github.com/lunixbochs/struc"
"github.com/valyala/bytebufferpool"
)
// TomeeHeader 结构体字段定义
@@ -16,8 +17,9 @@ type TomeeHeader struct {
UserID uint32 `json:"userId"`
//Error uint32 `json:"error" struc:"skip"`
Result uint32 `json:"result"`
Data []byte `json:"data" struc:"skip"` //组包忽略此字段// struc:"skip"
Result uint32 `json:"result"`
Data *bytebufferpool.ByteBuffer `json:"data" struc:"skip"` //组包忽略此字段// struc:"skip"
Res []byte `struc:"skip"`
//Return []byte `struc:"skip"` //返回记录
}

View File

@@ -7,6 +7,7 @@ import (
"encoding/binary"
"encoding/hex"
"sync"
"time"
"context"
@@ -14,6 +15,7 @@ import (
"fmt"
"reflect"
"github.com/bruceshao/lockfree"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/gogf/gf/v2/os/glog"
@@ -87,27 +89,15 @@ func XORDecryptU(encryptedData []byte, key uint32) []byte {
return result
}
// 原 OnEvent 改为仅负责投递消息到通道
func (cd *ClientData) OnEvent(data common.TomeeHeader) {
// 非阻塞发送,避免通道满时阻塞 eventloop
select {
case cd.MsgChan <- data:
default:
cool.Logger.Error(context.TODO(), "消息通道已满,丢弃消息", data.UserID, data.CMD)
// 通道满时的降级处理:记录日志,避免消息丢失(可选)
//cool.Logger.Warn(context.TODO(), "消息通道已满,丢弃消息", data.UserID, data.CMD)
}
}
// 消费消息的协程:处理业务逻辑,不阻塞 eventloop
func (cd *ClientData) consumeMsg() {
// 重写
// 遍历结构体方法并执行RECV_cmd
func (h *ClientData) OnEvent(data common.TomeeHeader) {
defer func() {
if err := recover(); err != nil { // 恢复 panicerr 为 panic 错误值
// 1. 打印错误信息
if cd.Player != nil {
if cd.Player.Info != nil {
cool.Logger.Error(context.TODO(), "panic 错误:", cool.Config.ServerInfo.OnlineID, cd.Player.Info.UserID, err)
if h.Player != nil {
if h.Player.Info != nil {
cool.Logger.Error(context.TODO(), "panic 错误:", cool.Config.ServerInfo.OnlineID, h.Player.Info.UserID, err)
} else {
cool.Logger.Error(context.TODO(), "panic 错误:", cool.Config.ServerInfo.OnlineID, err)
@@ -119,16 +109,6 @@ func (cd *ClientData) consumeMsg() {
}
}()
for header := range cd.MsgChan {
// 执行原有的 OnEvent 逻辑
cd.handleBizLogic(header)
}
}
// 重写
// 遍历结构体方法并执行RECV_cmd
func (h *ClientData) handleBizLogic(data common.TomeeHeader) {
if data.CMD > 1001 {
if h.Player == nil {
@@ -140,9 +120,13 @@ func (h *ClientData) handleBizLogic(data common.TomeeHeader) {
return
}
if data.Data != nil {
data.Data = XORDecryptU(data.Data, h.Player.Hash)
defer bytebufferpool.Put(data.Data)
data.Res = XORDecryptU(data.Data.Bytes(), h.Player.Hash)
}
} else {
defer bytebufferpool.Put(data.Data)
data.Res = data.Data.Bytes()
}
if cool.Config.ServerInfo.IsDebug != 0 {
fmt.Println("接收数据", data.UserID, data.CMD)
@@ -164,21 +148,19 @@ func (h *ClientData) handleBizLogic(data common.TomeeHeader) {
// 如果需要可设置的变量(用于修改值),创建指针并解引用
ptrValue := reflect.New(cmdlister.Req)
tt1 := ptrValue.Elem().Addr().Interface()
// fmt.Println(tt1)
err := struc.Unpack(bytes.NewBuffer(data.Data), tt1)
playerconn, cok := h.Conn.Context().(*ClientData)
if !cok { //如果链接断开,就返回
return
}
if err != nil {
if data.Res != nil {
tt1 := ptrValue.Elem().Addr().Interface()
err := struc.Unpack(bytes.NewBuffer(data.Res), tt1)
cool.Logger.Error(context.Background(), data.UserID, data.CMD, "解包失败,", err, hex.EncodeToString(data.Data))
//fmt.Println(data.UserID, data.CMD, "解包失败,", hex.EncodeToString(data.Data))
data.Result = uint32(errorcode.ErrorCodes.ErrSystemProcessingError)
playerconn.SendPack(data.Pack(nil))
return
if err != nil {
cool.Logger.Error(context.Background(), data.UserID, data.CMD, "解包失败,", err, hex.EncodeToString(data.Res))
//fmt.Println(data.UserID, data.CMD, "解包失败,", hex.EncodeToString(data.Data))
data.Result = uint32(errorcode.ErrorCodes.ErrSystemProcessingError)
h.SendPack(data.Pack(nil))
return
}
}
ptrValue1 := ptrValue.Elem().Addr()
@@ -213,7 +195,7 @@ func (h *ClientData) handleBizLogic(data common.TomeeHeader) {
if ok && aa != 0 { //这里实现回复错误包
glog.Info(context.Background(), data.UserID, data.CMD, aa.Code())
playerconn.SendPack(data.Pack(nil))
h.SendPack(data.Pack(nil))
return
@@ -221,7 +203,7 @@ func (h *ClientData) handleBizLogic(data common.TomeeHeader) {
t1 := data.Pack(ret[0].Interface())
playerconn.SendPack(t1)
h.SendPack(t1)
}
@@ -232,9 +214,9 @@ type ClientData struct {
ERROR_CONNUT int
Wsmsg *WsCodec
Conn gnet.Conn
//LF *gqueue.TQueue[common.TomeeHeader]
LF *lockfree.Lockfree[common.TomeeHeader]
//SaveL sync.Once //保存锁
MsgChan chan common.TomeeHeader
// MsgChan chan common.TomeeHeader
//SaveDone chan struct{}
}
@@ -244,9 +226,9 @@ func NewClientData(c gnet.Conn) *ClientData {
cd := &ClientData{
Conn: c,
Wsmsg: &WsCodec{},
MsgChan: make(chan common.TomeeHeader, 100),
Conn: c,
Wsmsg: &WsCodec{},
// MsgChan: make(chan common.TomeeHeader, 100),
}
//cd.LF = make(chan common.TomeeHeader, 8)
// cd.LF = gqueue.NewTQueue[common.TomeeHeader](1)
@@ -260,17 +242,17 @@ func NewClientData(c gnet.Conn) *ClientData {
// }
// }()
// cd.LF = lockfree.NewLockfree(
// 8,
// cd,
// lockfree.NewSleepBlockStrategy(time.Millisecond),
// )
cd.LF = lockfree.NewLockfree(
8,
cd,
lockfree.NewSleepBlockStrategy(time.Millisecond),
)
// // // 启动Lockfree
// if err := cd.LF.Start(); err != nil {
// panic(err)
// }
if err := cd.LF.Start(); err != nil {
panic(err)
}
// 启动每个连接的独立消费协程
go cd.consumeMsg()
// go cd.consumeMsg()
// // // 启动Lockfree
// // if err := cd.LF.Start(); err != nil {
// // panic(err)