package socket import ( "blazing/common/socket/codec" "blazing/cool" "blazing/logic/service/player" "blazing/modules/config/service" "bytes" "context" "encoding/binary" "errors" "log" "os" "sync/atomic" "time" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gtime" "github.com/panjf2000/gnet/v2" ) const ( minPacketLen = 17 maxPacketLen = 10 * 1024 ) func (s *Server) Boot(serverid, port uint32) error { // go s.bootws() s.serverid = serverid s.port = port err := gnet.Run(s, s.network+"://"+s.addr, gnet.WithMulticore(true), gnet.WithTicker(true), // 其他调优配置↓ gnet.WithTCPNoDelay(gnet.TCPNoDelay), // 禁用Nagle算法(降低延迟,适合小数据包场景) //gnet.WithReusePort(true), // 开启SO_REUSEPORT(多核下提升并发) //gnet.WithReadBufferCap(1024*64), // 读缓冲区64KB(根据业务调整,默认太小) // gnet.WithWriteBufferCap(1024*64), // 写缓冲区64KB //gnet.WithLockOSThread(true), // 绑定goroutine到OS线程(减少上下文切换) ) if err != nil { panic(err) } return nil } func (s *Server) Stop() error { _ = s.eng.Stop(context.Background()) s.workerPool.Release() return nil } func (s *Server) OnClose(c gnet.Conn, err error) (action gnet.Action) { defer func() { if err := recover(); err != nil { // 恢复 panic,err 为 panic 错误值 if t, ok := c.Context().(*player.ClientData); ok { if t.Player != nil { if t.Player.Info != nil { cool.Logger.Error(context.TODO(), "OnClose 错误:", cool.Config.ServerInfo.OnlineID, t.Player.Info.UserID, err) go t.Player.SaveOnDisconnect() } } } else { cool.Logger.Error(context.TODO(), "OnClose 错误:", cool.Config.ServerInfo.OnlineID, err) } } }() atomic.AddInt64(&cool.Connected, -1) v, _ := c.Context().(*player.ClientData) if v != nil { v.Close() if v.Player != nil { v.Player.Save() //保存玩家数据 } } return } func (s *Server) OnTick() (delay time.Duration, action gnet.Action) { g.Log().Async().Info(context.Background(), gtime.Now().ISO8601(), "服务器ID", cool.Config.ServerInfo.OnlineID, "链接数", atomic.LoadInt64(&cool.Connected)) if s.quit && atomic.LoadInt64(&cool.Connected) == 0 { os.Exit(0) } return 30 * time.Second, gnet.None } func (s *Server) OnBoot(eng gnet.Engine) gnet.Action { s.eng = eng service.NewServerService().SetServerID(s.serverid, s.port) return gnet.None } func (s *Server) OnOpen(conn gnet.Conn) (out []byte, action gnet.Action) { if s.network != "tcp" { return nil, gnet.Close } if conn.Context() == nil { conn.SetContext(player.NewClientData(conn)) } atomic.AddInt64(&cool.Connected, 1) return nil, gnet.None } func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) { defer func() { if err := recover(); err != nil { if t, ok := c.Context().(*player.ClientData); ok { if t.Player != nil && t.Player.Info != nil { cool.Logger.Error(context.TODO(), "OnTraffic 错误:", cool.Config.ServerInfo.OnlineID, t.Player.Info.UserID, err) t.Player.Service.Info.Save(*t.Player.Info) } } } }() client := c.Context().(*player.ClientData) if s.discorse && !client.IsCrossDomainChecked() { handled, ready, action := handle(c) if action != gnet.None { return action } if handled { client.MarkCrossDomainChecked() return gnet.None } if !ready { return gnet.None } client.MarkCrossDomainChecked() } ws := client.Wsmsg if ws.Tcp { return s.handleTCP(c) } readAction, inboundLen := ws.ReadBufferBytes(c) if readAction == gnet.Close { return gnet.Close } state, action := ws.Upgrade(c) if action != gnet.None { return action } if state == player.UpgradeNeedMoreData { return gnet.None } if state == player.UpgradeUseTCP { return s.handleTCP(c) } if inboundLen > 0 { if _, err := c.Discard(inboundLen); err != nil { return gnet.Close } ws.ResetInboundMirror() } messages, err := ws.Decode(c) if err != nil { return gnet.Close } if messages == nil { return } for _, msg := range messages { if !s.onevent(c, msg.Payload) { return gnet.Close } } return gnet.None } func (s *Server) handleTCP(conn gnet.Conn) (action gnet.Action) { client := conn.Context().(*player.ClientData) if s.discorse && !client.IsCrossDomainChecked() { handled, ready, action := handle(conn) if action != gnet.None { return action } if !ready { return gnet.None } if handled { client.MarkCrossDomainChecked() return gnet.None } client.MarkCrossDomainChecked() } body, err := s.codec.Decode(conn) if err != nil { if errors.Is(err, codec.ErrIncompletePacket) { return gnet.None } return gnet.Close } if !s.onevent(conn, body) { return gnet.Close } if conn.InboundBuffered() > 0 { if err := conn.Wake(nil); err != nil { return gnet.Close } } return action } const CROSS_DOMAIN = "\x00" const TEXT = "\x00" func handle(c gnet.Conn) (handled bool, ready bool, action gnet.Action) { probeLen := c.InboundBuffered() if probeLen == 0 { return false, false, gnet.None } if probeLen > len(TEXT) { probeLen = len(TEXT) } data, err := c.Peek(probeLen) if err != nil { log.Printf("Error reading cross-domain request: %v", err) return false, false, gnet.Close } if !bytes.Equal(data, []byte(TEXT[:probeLen])) { return false, true, gnet.None } if probeLen < len(TEXT) { return false, false, gnet.None } if _, err := c.Write([]byte(CROSS_DOMAIN)); err != nil { return false, true, gnet.Close } if _, err := c.Discard(len(TEXT)); err != nil { return false, true, gnet.Close } return true, true, gnet.None } func (s *Server) onevent(c gnet.Conn, v []byte) bool { if !isValidPacket(v) { return false } if t, ok := c.Context().(*player.ClientData); ok { t.PushEvent(v, s.workerPool.Submit) } return true } func isValidPacket(v []byte) bool { if len(v) < minPacketLen || len(v) > maxPacketLen { return false } return binary.BigEndian.Uint32(v[0:4]) == uint32(len(v)) }