refactor: 优化代码结构和逻辑
This commit is contained in:
@@ -20,7 +20,6 @@ import (
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
"github.com/lunixbochs/struc"
|
||||
"github.com/panjf2000/gnet/v2"
|
||||
"github.com/valyala/bytebufferpool"
|
||||
)
|
||||
|
||||
// getUnderlyingValue 递归解析reflect.Value,解包指针、interface{}到底层具体类型
|
||||
@@ -48,44 +47,73 @@ func getUnderlyingValue(val reflect.Value) (reflect.Value, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// XORDecryptU 优化后的异或解密:减少内存分配,支持复用缓冲区
|
||||
// XORDecryptU 基于bytebufferpool优化的异或解密函数
|
||||
// 保留原有接口,无侵入式优化,高频调用下大幅减少内存分配和GC
|
||||
// XORDecryptU 原地执行异或解密,避免额外分配和拷贝。
|
||||
func XORDecryptU(encryptedData []byte, key uint32) []byte {
|
||||
if len(encryptedData) == 0 {
|
||||
return []byte{}
|
||||
}
|
||||
|
||||
// 1. 栈上分配密钥字节数组(无GC压力,保留原优化)
|
||||
var keyBytes [4]byte
|
||||
binary.BigEndian.PutUint32(keyBytes[:], key)
|
||||
keyLen := len(keyBytes)
|
||||
|
||||
// 2. 从bytebufferpool获取池化缓冲区(替代make分配)
|
||||
buf := bytebufferpool.Get()
|
||||
defer bytebufferpool.Put(buf) // 函数结束自动归还缓冲区到池
|
||||
|
||||
// 3. 调整缓冲区长度,匹配待解密数据(避免扩容)
|
||||
buf.B = buf.B[:0] // 清空原有数据,保留底层数组
|
||||
if cap(buf.B) < len(encryptedData) {
|
||||
// 若缓冲区容量不足,直接扩容(bytebufferpool会自动管理)
|
||||
buf.B = make([]byte, len(encryptedData))
|
||||
} else {
|
||||
// 容量足够,直接调整长度
|
||||
buf.B = buf.B[:len(encryptedData)]
|
||||
}
|
||||
|
||||
// 4. 核心异或解密逻辑(直接操作buf.B,无额外内存分配)
|
||||
decrypted := buf.B
|
||||
for i, b := range encryptedData {
|
||||
decrypted[i] = b ^ keyBytes[i%keyLen]
|
||||
encryptedData[i] = b ^ keyBytes[i%keyLen]
|
||||
}
|
||||
|
||||
// 5. 拷贝结果(关键:避免返回池化缓冲区,防止被后续调用覆盖)
|
||||
result := make([]byte, len(decrypted))
|
||||
copy(result, decrypted)
|
||||
return encryptedData
|
||||
}
|
||||
|
||||
return result
|
||||
var packetDataPoolSizes = [...]int{64, 128, 256, 512, 1024, 2048, 4096}
|
||||
|
||||
var packetDataPools = func() []sync.Pool {
|
||||
pools := make([]sync.Pool, len(packetDataPoolSizes))
|
||||
for i, size := range packetDataPoolSizes {
|
||||
size := size
|
||||
pools[i].New = func() any {
|
||||
return make([]byte, size)
|
||||
}
|
||||
}
|
||||
return pools
|
||||
}()
|
||||
|
||||
func getPacketData(size int) []byte {
|
||||
if size <= 0 {
|
||||
return nil
|
||||
}
|
||||
for i, poolSize := range packetDataPoolSizes {
|
||||
if size <= poolSize {
|
||||
return packetDataPools[i].Get().([]byte)[:size]
|
||||
}
|
||||
}
|
||||
return make([]byte, size)
|
||||
}
|
||||
|
||||
func putPacketData(buf []byte) {
|
||||
if buf == nil {
|
||||
return
|
||||
}
|
||||
for i, poolSize := range packetDataPoolSizes {
|
||||
if cap(buf) == poolSize {
|
||||
packetDataPools[i].Put(buf[:poolSize])
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *ClientData) PushEvent(v []byte, submit func(task func()) error) {
|
||||
var header common.TomeeHeader
|
||||
header.Len = binary.BigEndian.Uint32(v[0:4])
|
||||
header.CMD = binary.BigEndian.Uint32(v[5:9])
|
||||
header.UserID = binary.BigEndian.Uint32(v[9:13])
|
||||
if dataLen := len(v) - 17; dataLen > 0 {
|
||||
header.Data = getPacketData(dataLen)
|
||||
copy(header.Data, v[17:])
|
||||
}
|
||||
|
||||
_ = submit(func() {
|
||||
h.LF.Producer().Write(header)
|
||||
})
|
||||
}
|
||||
|
||||
// 重写
|
||||
@@ -119,13 +147,13 @@ func (h *ClientData) OnEvent(data common.TomeeHeader) {
|
||||
return
|
||||
}
|
||||
if data.Data != nil {
|
||||
defer bytebufferpool.Put(data.Data)
|
||||
data.Res = XORDecryptU(data.Data.Bytes(), h.Player.Hash)
|
||||
defer putPacketData(data.Data)
|
||||
data.Res = XORDecryptU(data.Data, h.Player.Hash)
|
||||
}
|
||||
|
||||
} else {
|
||||
defer bytebufferpool.Put(data.Data)
|
||||
data.Res = data.Data.Bytes()
|
||||
} else if data.Data != nil {
|
||||
defer putPacketData(data.Data)
|
||||
data.Res = data.Data
|
||||
}
|
||||
if cool.Config.ServerInfo.IsDebug != 0 {
|
||||
fmt.Println("接收数据", data.UserID, data.CMD)
|
||||
@@ -172,7 +200,7 @@ func (h *ClientData) OnEvent(data common.TomeeHeader) {
|
||||
|
||||
// fmt.Println(data.CMD, "接收 变量的地址 ", &t.Info, t.Info.UserID)
|
||||
|
||||
params = append(params, ptrValue1, reflect.ValueOf(h.Conn.Context().(*ClientData).Player))
|
||||
params = append(params, ptrValue1, reflect.ValueOf(h.Player))
|
||||
} else {
|
||||
|
||||
params = append(params, ptrValue1, reflect.ValueOf(h.Conn))
|
||||
@@ -209,14 +237,10 @@ func (h *ClientData) OnEvent(data common.TomeeHeader) {
|
||||
type ClientData struct {
|
||||
IsCrossDomain sync.Once //是否跨域过
|
||||
Player *Player //客户实体
|
||||
//Mu sync.RWMutex
|
||||
ERROR_CONNUT int
|
||||
Wsmsg *WsCodec
|
||||
Conn gnet.Conn
|
||||
LF *lockfree.Lockfree[common.TomeeHeader]
|
||||
//SaveL sync.Once //保存锁
|
||||
// MsgChan chan common.TomeeHeader
|
||||
//SaveDone chan struct{}
|
||||
ERROR_CONNUT int
|
||||
Wsmsg *WsCodec
|
||||
Conn gnet.Conn
|
||||
LF *lockfree.Lockfree[common.TomeeHeader]
|
||||
}
|
||||
|
||||
func (p *ClientData) GetPlayer(userid uint32) *Player { //TODO 这里待优化,可能存在内存泄漏问题
|
||||
@@ -232,42 +256,18 @@ func (p *ClientData) GetPlayer(userid uint32) *Player { //TODO 这里待优化,
|
||||
// return nil
|
||||
}
|
||||
func NewClientData(c gnet.Conn) *ClientData {
|
||||
// 创建事件处理器
|
||||
// 创建消费端串行处理的Lockfree
|
||||
|
||||
cd := &ClientData{
|
||||
|
||||
Conn: c,
|
||||
Wsmsg: &WsCodec{},
|
||||
// MsgChan: make(chan common.TomeeHeader, 100),
|
||||
}
|
||||
//cd.LF = make(chan common.TomeeHeader, 8)
|
||||
// cd.LF = gqueue.NewTQueue[common.TomeeHeader](1)
|
||||
// go func() {
|
||||
// for {
|
||||
// select {
|
||||
// case queueItem := <-cd.LF.C:
|
||||
// cd.OnEvent(queueItem)
|
||||
|
||||
// }
|
||||
// }
|
||||
|
||||
// }()
|
||||
cd.LF = lockfree.NewLockfree(
|
||||
8,
|
||||
cd,
|
||||
lockfree.NewConditionBlockStrategy(),
|
||||
)
|
||||
// // // 启动Lockfree
|
||||
if err := cd.LF.Start(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// 启动每个连接的独立消费协程
|
||||
// go cd.consumeMsg()
|
||||
// // // 启动Lockfree
|
||||
// // if err := cd.LF.Start(); err != nil {
|
||||
// // panic(err)
|
||||
// // }
|
||||
return cd
|
||||
|
||||
}
|
||||
@@ -305,16 +305,11 @@ func XORDecrypt(encryptedData []byte, keyStr string) []byte {
|
||||
}
|
||||
|
||||
func (p *ClientData) SendPack(b []byte) error {
|
||||
cli, ok := p.Conn.Context().(*ClientData)
|
||||
if !ok {
|
||||
return fmt.Errorf("链接错误,取消发包")
|
||||
|
||||
}
|
||||
if cli.Wsmsg == nil {
|
||||
if p.Wsmsg == nil {
|
||||
return fmt.Errorf("ws空")
|
||||
}
|
||||
|
||||
if cli.Wsmsg.Upgraded {
|
||||
if p.Wsmsg.Upgraded {
|
||||
// This is the echo server
|
||||
wsutil.WriteServerMessage(p.Conn, ws.OpBinary, b)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user