diff --git a/common/socket/ServerEvent.go b/common/socket/ServerEvent.go index 97bfb7ce6..9374db9e9 100644 --- a/common/socket/ServerEvent.go +++ b/common/socket/ServerEvent.go @@ -14,7 +14,6 @@ import ( "blazing/logic/service/player" "github.com/panjf2000/gnet/v2" - "github.com/panjf2000/gnet/v2/pkg/logging" ) func (s *Server) Boot() error { @@ -22,7 +21,6 @@ func (s *Server) Boot() error { err := gnet.Run(s, s.network+"://"+s.addr, gnet.WithMulticore(true), gnet.WithTicker(true), - gnet.WithTicker(true), // gnet.WithReusePort(true), // gnet.WithReuseAddr(true), @@ -76,7 +74,7 @@ func (s *Server) OnTick() (delay time.Duration, action gnet.Action) { //执行正常退出逻辑 os.Exit(0) } - return 10 * time.Second, gnet.None + return 30 * time.Second, gnet.None } func (s *Server) OnBoot(eng gnet.Engine) gnet.Action { s.eng = eng @@ -88,7 +86,7 @@ func (s *Server) OnBoot(eng gnet.Engine) gnet.Action { func (s *Server) OnOpen(conn gnet.Conn) (out []byte, action gnet.Action) { if conn.Context() == nil { - conn.SetContext(player.NewClientData()) //注入data + conn.SetContext(player.NewClientData(conn)) //注入data } atomic.AddInt64(&s.connected, 1) @@ -129,7 +127,7 @@ func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) { } // fmt.Println(ws.Buf.Bytes()) - c.Read(make([]byte, len1)) + c.Discard(len1) if ws.Buf.Len() <= 0 { return gnet.None } @@ -144,14 +142,16 @@ func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) { for _, msg := range messages { t := c.Context().(*player.ClientData) //client := conn.RemoteAddr().String() - _ = s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复 - t.Lf.Producer().Write(&player.RecvData{ - - Conn: c, - Data: msg.Payload, - }) + err = s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复 + err = t.Lf.Producer().Write(msg.Payload) + if err != nil { + panic(err) + } }) + if err != nil { + fmt.Println("work2", err) + } } return gnet.None @@ -159,42 +159,41 @@ func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) { func (s *Server) handleTcp(conn gnet.Conn) (action gnet.Action) { - if s.discorse && !conn.Context().(*player.ClientData).IsCrossDomain { - handle(conn) - } - conn.Context().(*player.ClientData).IsCrossDomain = true - data, err := s.codec.Decode(conn) - if err != nil { + for { + if s.discorse && !conn.Context().(*player.ClientData).IsCrossDomain { + handle(conn) + } + conn.Context().(*player.ClientData).IsCrossDomain = true + data, err := s.codec.Decode(conn) + + if err == codec.ErrIncompletePacket { + break + } + if err != nil { + + return gnet.Close - if err == codec.ErrIncompletePacket && conn.InboundBuffered() > 0 { - t, _ := conn.Peek(conn.InboundBuffered()) - cool.Loger.Debug(context.Background(), "断包", err.Error(), conn.InboundBuffered(), hex.EncodeToString(t)) - if err := conn.Wake(nil); err != nil { // wake up the connection manually to avoid missing the leftover data - logging.Errorf("failed to wake up the connection, %v", err) - return gnet.Close - } - } else { - t, _ := conn.Peek(conn.InboundBuffered()) - cool.Loger.Debug(context.Background(), "数据错误", err.Error(), conn.InboundBuffered(), hex.EncodeToString(t)) - action = gnet.Close - return } - } - - if data != nil { + cool.Loger.Debug(context.Background(), "原始数据", hex.EncodeToString(data)) t := conn.Context().(*player.ClientData) - _ = s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复 - t.Lf.Producer().Write(&player.RecvData{ - - Conn: conn, - Data: data, - }) + err = s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复 + err = t.Lf.Producer().Write(data) + if err != nil { + panic(err) + } }) + } + if conn.InboundBuffered() > 0 { + if err := conn.Wake(nil); err != nil { // wake up the connection manually to avoid missing the leftover data + cool.Loger.Errorf(context.Background(), "failed to wake up the connection, %v", err) + return gnet.Close + } + } return action } diff --git a/common/socket/codec/SocketCodec_Tomee.go b/common/socket/codec/SocketCodec_Tomee.go index 94e5189ab..bd138e21a 100644 --- a/common/socket/codec/SocketCodec_Tomee.go +++ b/common/socket/codec/SocketCodec_Tomee.go @@ -1,6 +1,8 @@ package codec import ( + "blazing/cool" + "context" "encoding/binary" "errors" "io" @@ -58,6 +60,7 @@ func (codec TomeeSocketCodec) Decode(c gnet.Conn) ([]byte, error) { } bodyLen := binary.BigEndian.Uint32(lenBuf) + cool.Loger.Print(context.TODO(), "lenBuf", bodyLen) if bodyLen > maxBodyLen { return nil, errors.New("packet body exceeds max length") } diff --git a/logic/controller/login.go b/logic/controller/login.go index d352d12c0..7062a4f45 100644 --- a/logic/controller/login.go +++ b/logic/controller/login.go @@ -93,12 +93,12 @@ func (h *Controller) Login(data *user.MAIN_LOGIN_IN, c gnet.Conn) (result *user. copier.CopyWithOption(tt, t.Info, copier.Option{DeepCopy: true}) //copier.Copy(t.Info, tt) t1 := player.NewTomeeHeader(2001, t.Info.UserID) - space.GetSpace(t.Info.MapID).User.Set(t.Info.UserID, t) + space.GetSpace(t.Info.MapID).User.IterCb(func(playerID uint32, player common.PlayerI) { player.SendPack(t1.Pack(tt)) }) - + space.GetSpace(t.Info.MapID).User.Set(t.Info.UserID, t) }() return result, 0 diff --git a/logic/logic2 b/logic/logic2 new file mode 100644 index 000000000..c243b4a75 Binary files /dev/null and b/logic/logic2 differ diff --git a/logic/service/player/SocketHandler_Tomee.go b/logic/service/player/SocketHandler_Tomee.go index ea1f1346c..a84e17494 100644 --- a/logic/service/player/SocketHandler_Tomee.go +++ b/logic/service/player/SocketHandler_Tomee.go @@ -19,7 +19,7 @@ import ( // TomeeHeader 结构体字段定义 type TomeeHeader struct { Len uint32 `json:"len"` - Version string `json:"version" struc:"[1]byte"` + Version byte `json:"version" struc:"[1]byte"` CMD uint32 `json:"cmdId" struc:"uint32"` UserID uint32 `json:"userId"` //Error uint32 `json:"error" struc:"[0]pad"` @@ -34,7 +34,7 @@ func NewTomeeHeader(cmd uint32, userid uint32) *TomeeHeader { return &TomeeHeader{ CMD: cmd, // Len: 0, - Version: "7", + Version: 49, Result: 0, } @@ -85,7 +85,7 @@ func (h *TomeeHeader) Pack(data any) []byte { //组包 by := bytearray.CreateByteArray() by.WriteUInt32(h.Len) - by.WriteString(h.Version) + by.WriteByte(h.Version) by.WriteUInt32(uint32(h.CMD)) by.WriteUInt32(h.UserID) by.WriteUInt32(h.Result) @@ -186,7 +186,7 @@ func Recv(c gnet.Conn, data TomeeHeader) { } - data.Version = "7" + data.Version = 49 t.SendPack(data.Pack(ret[0].Interface())) } diff --git a/logic/service/player/player.go b/logic/service/player/player.go index fbb4c9e78..593a0fa28 100644 --- a/logic/service/player/player.go +++ b/logic/service/player/player.go @@ -53,16 +53,18 @@ type ClientData struct { Mu sync.Mutex ERROR_CONNUT int Wsmsg *WsCodec - Lf *lockfree.Lockfree[*RecvData] + Lf *lockfree.Lockfree[[]byte] CloseChan chan struct{} } -func NewClientData() *ClientData { +func NewClientData(c gnet.Conn) *ClientData { // 创建事件处理器 - handler := &eventHandler{} + handler := &eventHandler{ + Conn: c, + } // 创建消费端串行处理的Lockfree - lf := lockfree.NewLockfree[*RecvData]( + lf := lockfree.NewLockfree[[]byte]( 4096, handler, lockfree.NewSleepBlockStrategy(time.Millisecond), @@ -84,22 +86,23 @@ func NewClientData() *ClientData { type eventHandler struct { Callback func(conn gnet.Conn, data TomeeHeader) + Conn gnet.Conn } -func (h *eventHandler) OnEvent(v *RecvData) { +func (h *eventHandler) OnEvent(v []byte) { header := TomeeHeader{} - tempdata := bytearray.CreateByteArray(v.Data) + tempdata := bytearray.CreateByteArray(v) header.Len, _ = tempdata.ReadUInt32() - header.Version, _ = tempdata.ReadString(1) + header.Version, _ = tempdata.ReadByte() header.CMD, _ = tempdata.ReadUInt32() //header.CMD = cmd.EnumCommandID(_CMD) header.UserID, _ = tempdata.ReadUInt32() header.Result, _ = tempdata.ReadUInt32() header.Data = tempdata.BytesAvailable() - Recv(v.Conn, header) + Recv(h.Conn, header) //fmt.Println("接收封包", header) } @@ -350,7 +353,7 @@ func (p *Player) SendPack(b []byte) error { } } else { - _, err := p.MainConn.Write(b) + err := p.MainConn.AsyncWrite(b, nil) if err != nil { glog.Debug(context.Background(), err)