feat(socket): 优化服务器事件处理逻辑并修复数据解码问题

- 移除重复的 `gnet.WithTicker(true)` 配置项
- 调整 `OnTick` 的执行间隔从 10 秒延长至 30 秒
- 更新 `NewClientData` 方法以传入连接对象,用于后续消息处理
- 将 `c.Read` 替换为 `c.Discard` 以正确丢弃已读数据
- 改进数据包处理逻辑,增强对不完整包的处理能力
- 修正 `TomeeHeader.Version` 类型由 string 转为 byte,并更新相关读写操作
- 在消息处理中增加错误日志打印
This commit is contained in:
2025-10-28 02:28:15 +08:00
parent d1b2f8844a
commit ec082db71d
6 changed files with 58 additions and 53 deletions

View File

@@ -14,7 +14,6 @@ import (
"blazing/logic/service/player" "blazing/logic/service/player"
"github.com/panjf2000/gnet/v2" "github.com/panjf2000/gnet/v2"
"github.com/panjf2000/gnet/v2/pkg/logging"
) )
func (s *Server) Boot() error { func (s *Server) Boot() error {
@@ -22,7 +21,6 @@ func (s *Server) Boot() error {
err := gnet.Run(s, s.network+"://"+s.addr, err := gnet.Run(s, s.network+"://"+s.addr,
gnet.WithMulticore(true), gnet.WithMulticore(true),
gnet.WithTicker(true), gnet.WithTicker(true),
gnet.WithTicker(true),
// gnet.WithReusePort(true), // gnet.WithReusePort(true),
// gnet.WithReuseAddr(true), // gnet.WithReuseAddr(true),
@@ -76,7 +74,7 @@ func (s *Server) OnTick() (delay time.Duration, action gnet.Action) {
//执行正常退出逻辑 //执行正常退出逻辑
os.Exit(0) os.Exit(0)
} }
return 10 * time.Second, gnet.None return 30 * time.Second, gnet.None
} }
func (s *Server) OnBoot(eng gnet.Engine) gnet.Action { func (s *Server) OnBoot(eng gnet.Engine) gnet.Action {
s.eng = eng s.eng = eng
@@ -88,7 +86,7 @@ func (s *Server) OnBoot(eng gnet.Engine) gnet.Action {
func (s *Server) OnOpen(conn gnet.Conn) (out []byte, action gnet.Action) { func (s *Server) OnOpen(conn gnet.Conn) (out []byte, action gnet.Action) {
if conn.Context() == nil { if conn.Context() == nil {
conn.SetContext(player.NewClientData()) //注入data conn.SetContext(player.NewClientData(conn)) //注入data
} }
atomic.AddInt64(&s.connected, 1) atomic.AddInt64(&s.connected, 1)
@@ -129,7 +127,7 @@ func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) {
} }
// fmt.Println(ws.Buf.Bytes()) // fmt.Println(ws.Buf.Bytes())
c.Read(make([]byte, len1)) c.Discard(len1)
if ws.Buf.Len() <= 0 { if ws.Buf.Len() <= 0 {
return gnet.None return gnet.None
} }
@@ -144,14 +142,16 @@ func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) {
for _, msg := range messages { for _, msg := range messages {
t := c.Context().(*player.ClientData) t := c.Context().(*player.ClientData)
//client := conn.RemoteAddr().String() //client := conn.RemoteAddr().String()
_ = s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复 err = s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复
t.Lf.Producer().Write(&player.RecvData{ err = t.Lf.Producer().Write(msg.Payload)
if err != nil {
Conn: c, panic(err)
Data: msg.Payload, }
})
}) })
if err != nil {
fmt.Println("work2", err)
}
} }
return gnet.None return gnet.None
@@ -159,42 +159,41 @@ func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) {
func (s *Server) handleTcp(conn gnet.Conn) (action gnet.Action) { func (s *Server) handleTcp(conn gnet.Conn) (action gnet.Action) {
if s.discorse && !conn.Context().(*player.ClientData).IsCrossDomain { for {
handle(conn) if s.discorse && !conn.Context().(*player.ClientData).IsCrossDomain {
} handle(conn)
conn.Context().(*player.ClientData).IsCrossDomain = true }
data, err := s.codec.Decode(conn) conn.Context().(*player.ClientData).IsCrossDomain = true
if err != nil { data, err := s.codec.Decode(conn)
if err == codec.ErrIncompletePacket {
break
}
if err != nil {
return gnet.Close
if err == codec.ErrIncompletePacket && conn.InboundBuffered() > 0 {
t, _ := conn.Peek(conn.InboundBuffered())
cool.Loger.Debug(context.Background(), "断包", err.Error(), conn.InboundBuffered(), hex.EncodeToString(t))
if err := conn.Wake(nil); err != nil { // wake up the connection manually to avoid missing the leftover data
logging.Errorf("failed to wake up the connection, %v", err)
return gnet.Close
}
} else {
t, _ := conn.Peek(conn.InboundBuffered())
cool.Loger.Debug(context.Background(), "数据错误", err.Error(), conn.InboundBuffered(), hex.EncodeToString(t))
action = gnet.Close
return
} }
} cool.Loger.Debug(context.Background(), "原始数据", hex.EncodeToString(data))
if data != nil {
t := conn.Context().(*player.ClientData) t := conn.Context().(*player.ClientData)
_ = s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复 err = s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复
t.Lf.Producer().Write(&player.RecvData{ err = t.Lf.Producer().Write(data)
if err != nil {
Conn: conn, panic(err)
Data: data, }
})
}) })
} }
if conn.InboundBuffered() > 0 {
if err := conn.Wake(nil); err != nil { // wake up the connection manually to avoid missing the leftover data
cool.Loger.Errorf(context.Background(), "failed to wake up the connection, %v", err)
return gnet.Close
}
}
return action return action
} }

View File

@@ -1,6 +1,8 @@
package codec package codec
import ( import (
"blazing/cool"
"context"
"encoding/binary" "encoding/binary"
"errors" "errors"
"io" "io"
@@ -58,6 +60,7 @@ func (codec TomeeSocketCodec) Decode(c gnet.Conn) ([]byte, error) {
} }
bodyLen := binary.BigEndian.Uint32(lenBuf) bodyLen := binary.BigEndian.Uint32(lenBuf)
cool.Loger.Print(context.TODO(), "lenBuf", bodyLen)
if bodyLen > maxBodyLen { if bodyLen > maxBodyLen {
return nil, errors.New("packet body exceeds max length") return nil, errors.New("packet body exceeds max length")
} }

View File

@@ -93,12 +93,12 @@ func (h *Controller) Login(data *user.MAIN_LOGIN_IN, c gnet.Conn) (result *user.
copier.CopyWithOption(tt, t.Info, copier.Option{DeepCopy: true}) copier.CopyWithOption(tt, t.Info, copier.Option{DeepCopy: true})
//copier.Copy(t.Info, tt) //copier.Copy(t.Info, tt)
t1 := player.NewTomeeHeader(2001, t.Info.UserID) t1 := player.NewTomeeHeader(2001, t.Info.UserID)
space.GetSpace(t.Info.MapID).User.Set(t.Info.UserID, t)
space.GetSpace(t.Info.MapID).User.IterCb(func(playerID uint32, player common.PlayerI) { space.GetSpace(t.Info.MapID).User.IterCb(func(playerID uint32, player common.PlayerI) {
player.SendPack(t1.Pack(tt)) player.SendPack(t1.Pack(tt))
}) })
space.GetSpace(t.Info.MapID).User.Set(t.Info.UserID, t)
}() }()
return result, 0 return result, 0

BIN
logic/logic2 Normal file

Binary file not shown.

View File

@@ -19,7 +19,7 @@ import (
// TomeeHeader 结构体字段定义 // TomeeHeader 结构体字段定义
type TomeeHeader struct { type TomeeHeader struct {
Len uint32 `json:"len"` Len uint32 `json:"len"`
Version string `json:"version" struc:"[1]byte"` Version byte `json:"version" struc:"[1]byte"`
CMD uint32 `json:"cmdId" struc:"uint32"` CMD uint32 `json:"cmdId" struc:"uint32"`
UserID uint32 `json:"userId"` UserID uint32 `json:"userId"`
//Error uint32 `json:"error" struc:"[0]pad"` //Error uint32 `json:"error" struc:"[0]pad"`
@@ -34,7 +34,7 @@ func NewTomeeHeader(cmd uint32, userid uint32) *TomeeHeader {
return &TomeeHeader{ return &TomeeHeader{
CMD: cmd, CMD: cmd,
// Len: 0, // Len: 0,
Version: "7", Version: 49,
Result: 0, Result: 0,
} }
@@ -85,7 +85,7 @@ func (h *TomeeHeader) Pack(data any) []byte { //组包
by := bytearray.CreateByteArray() by := bytearray.CreateByteArray()
by.WriteUInt32(h.Len) by.WriteUInt32(h.Len)
by.WriteString(h.Version) by.WriteByte(h.Version)
by.WriteUInt32(uint32(h.CMD)) by.WriteUInt32(uint32(h.CMD))
by.WriteUInt32(h.UserID) by.WriteUInt32(h.UserID)
by.WriteUInt32(h.Result) by.WriteUInt32(h.Result)
@@ -186,7 +186,7 @@ func Recv(c gnet.Conn, data TomeeHeader) {
} }
data.Version = "7" data.Version = 49
t.SendPack(data.Pack(ret[0].Interface())) t.SendPack(data.Pack(ret[0].Interface()))
} }

View File

@@ -53,16 +53,18 @@ type ClientData struct {
Mu sync.Mutex Mu sync.Mutex
ERROR_CONNUT int ERROR_CONNUT int
Wsmsg *WsCodec Wsmsg *WsCodec
Lf *lockfree.Lockfree[*RecvData] Lf *lockfree.Lockfree[[]byte]
CloseChan chan struct{} CloseChan chan struct{}
} }
func NewClientData() *ClientData { func NewClientData(c gnet.Conn) *ClientData {
// 创建事件处理器 // 创建事件处理器
handler := &eventHandler{} handler := &eventHandler{
Conn: c,
}
// 创建消费端串行处理的Lockfree // 创建消费端串行处理的Lockfree
lf := lockfree.NewLockfree[*RecvData]( lf := lockfree.NewLockfree[[]byte](
4096, 4096,
handler, handler,
lockfree.NewSleepBlockStrategy(time.Millisecond), lockfree.NewSleepBlockStrategy(time.Millisecond),
@@ -84,22 +86,23 @@ func NewClientData() *ClientData {
type eventHandler struct { type eventHandler struct {
Callback func(conn gnet.Conn, data TomeeHeader) Callback func(conn gnet.Conn, data TomeeHeader)
Conn gnet.Conn
} }
func (h *eventHandler) OnEvent(v *RecvData) { func (h *eventHandler) OnEvent(v []byte) {
header := TomeeHeader{} header := TomeeHeader{}
tempdata := bytearray.CreateByteArray(v.Data) tempdata := bytearray.CreateByteArray(v)
header.Len, _ = tempdata.ReadUInt32() header.Len, _ = tempdata.ReadUInt32()
header.Version, _ = tempdata.ReadString(1) header.Version, _ = tempdata.ReadByte()
header.CMD, _ = tempdata.ReadUInt32() header.CMD, _ = tempdata.ReadUInt32()
//header.CMD = cmd.EnumCommandID(_CMD) //header.CMD = cmd.EnumCommandID(_CMD)
header.UserID, _ = tempdata.ReadUInt32() header.UserID, _ = tempdata.ReadUInt32()
header.Result, _ = tempdata.ReadUInt32() header.Result, _ = tempdata.ReadUInt32()
header.Data = tempdata.BytesAvailable() header.Data = tempdata.BytesAvailable()
Recv(v.Conn, header) Recv(h.Conn, header)
//fmt.Println("接收封包", header) //fmt.Println("接收封包", header)
} }
@@ -350,7 +353,7 @@ func (p *Player) SendPack(b []byte) error {
} }
} else { } else {
_, err := p.MainConn.Write(b) err := p.MainConn.AsyncWrite(b, nil)
if err != nil { if err != nil {
glog.Debug(context.Background(), err) glog.Debug(context.Background(), err)