diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index b951f038e..1a1fb12c4 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -56,7 +56,7 @@ func (h *ServerHandler) RegisterLogic(ctx context.Context, id, port uint16) erro } t, _ := blservice.NewLoginServiceService().GetServerID(id) aa, ok := clientmap[t] - if ok { //如果已经存在且这个端口已经被存过 + if ok && aa != nil { //如果已经存在且这个端口已经被存过 aa.QuitSelf(0) } clientmap[port] = &revClient diff --git a/common/socket/ServerEvent.go b/common/socket/ServerEvent.go index 57f61d449..88cb6d769 100644 --- a/common/socket/ServerEvent.go +++ b/common/socket/ServerEvent.go @@ -27,9 +27,6 @@ func (s *Server) Boot() error { panic(err) } - // err := gnet.Run(s, s.network+"://"+s.addr, gnet.WithMulticore(s.multicore)) - cool.Loger.Debug(context.Background(), "server exits with error: %v", err) - // logging.Infof("server exits with error: %v", err) return nil } @@ -53,21 +50,14 @@ func (s *Server) OnClose(c gnet.Conn, _ error) (action gnet.Action) { atomic.AddInt64(&s.connected, -1) //logging.Infof("conn[%v] disconnected", c.RemoteAddr().String()) v, _ := c.Context().(*player.ClientData) - s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复 + if v.Player != nil { + v.SaveL.Do(func() { //使用保存锁,确保在踢人和掉线的时候只保存一次 + //cool.Loger.Info(context.TODO(), "准备保存", v.Player.Info.UserID) + v.Player.Save() //保存玩家数据 - if v.Player != nil { - v.SaveL.Do(func() { //使用保存锁,确保在踢人和掉线的时候只保存一次 - //cool.Loger.Info(context.TODO(), "准备保存", v.Player.Info.UserID) - v.Player.Save() //保存玩家数据 - //cool.Loger.Info(context.TODO(), "保存完成", v.Player.Info.UserID) - if v.CloseChan != nil { - close(v.CloseChan) - } - }) + }) - } - - }) + } //} //关闭连接 @@ -90,6 +80,10 @@ func (s *Server) OnBoot(eng gnet.Engine) gnet.Action { } func (s *Server) OnOpen(conn gnet.Conn) (out []byte, action gnet.Action) { + if s.network != "tcp" { + return nil, gnet.Close + } + if conn.Context() == nil { conn.SetContext(player.NewClientData(conn)) //注入data } @@ -108,9 +102,6 @@ func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) { } }() - if s.network != "tcp" { - return gnet.Close - } ws := c.Context().(*player.ClientData).Wsmsg if ws.Tcp { //升级失败时候防止缓冲区溢出 @@ -136,9 +127,7 @@ func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) { } // fmt.Println(ws.Buf.Bytes()) c.Discard(len1) - if ws.Buf.Len() <= 0 { - return gnet.None - } + messages, err := ws.Decode(c) if err != nil { return gnet.Close @@ -151,9 +140,8 @@ func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) { t := c.Context().(*player.ClientData) //client := conn.RemoteAddr().String() s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复 - t.Mu.RLock() + t.OnEvent(msg.Payload) - t.Mu.RUnlock() }) @@ -163,33 +151,28 @@ func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) { } func (s *Server) handleTcp(conn gnet.Conn) (action gnet.Action) { - if s.discorse && !conn.Context().(*player.ClientData).IsCrossDomain { + + conn.Context().(*player.ClientData).IsCrossDomain.Do(func() { //跨域检测 handle(conn) - } - conn.Context().(*player.ClientData).IsCrossDomain = true - for i := 0; i < s.batchRead; i++ { - - data, err := s.codec.Decode(conn) + }) + data, err := s.codec.Decode(conn) + if err != nil { if err == codec.ErrIncompletePacket { - break + if err := conn.Wake(nil); err != nil { // wake up the connection manually to avoid missing the leftover data + cool.Loger.Errorf(context.Background(), "failed to wake up the connection, %v", err) + return gnet.Close + } } - if err != nil { - - return gnet.Close - - } - - //cool.Loger.Debug(context.Background(), "原始数据", hex.EncodeToString(data)) - t := conn.Context().(*player.ClientData) - - err = s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复 - t.OnEvent(data) - - }) + return gnet.Close } + s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复 + conn.Context().(*player.ClientData).OnEvent(data) + + }) + if conn.InboundBuffered() > 0 { if err := conn.Wake(nil); err != nil { // wake up the connection manually to avoid missing the leftover data cool.Loger.Errorf(context.Background(), "failed to wake up the connection, %v", err) diff --git a/common/socket/codec/SocketCodec_Tomee.go b/common/socket/codec/SocketCodec_Tomee.go index 716690a3a..e8a954371 100644 --- a/common/socket/codec/SocketCodec_Tomee.go +++ b/common/socket/codec/SocketCodec_Tomee.go @@ -63,21 +63,11 @@ func (codec TomeeSocketCodec) Decode(c gnet.Conn) ([]byte, error) { return nil, errors.New("packet body exceeds max length") } - // 检查整个包是否完整 - buf, err := c.Peek(int(bodyLen)) - if err != nil { - if errors.Is(err, io.ErrShortBuffer) { - return nil, ErrIncompletePacket - } - return nil, err + if c.InboundBuffered() < int(bodyLen) { + return nil, ErrIncompletePacket } - // 提取包体 - body := make([]byte, bodyLen) - copy(body, buf) - - // 从缓冲区中丢弃已读取的数据 - _, _ = c.Discard(int(bodyLen)) + body, _ := c.Next(int(bodyLen)) return body, nil } diff --git a/common/utils/bytearray/bytearray.go b/common/utils/bytearray/bytearray.go index f7cb4407c..34f2e2f6a 100644 --- a/common/utils/bytearray/bytearray.go +++ b/common/utils/bytearray/bytearray.go @@ -27,7 +27,7 @@ var bufferpool = &sync.Pool{ } // CreateByteArray 创建一个新的ByteArray实例,使用指定的字节数组 -func CreateByteArray(bytes ...[]byte) *ByteArray { +func CreateByteArray(bytes []byte) *ByteArray { var ba *ByteArray if len(bytes) == 0 { //如果是0,则为新创建 ba = bufferpool.Get().(*ByteArray) @@ -35,10 +35,7 @@ func CreateByteArray(bytes ...[]byte) *ByteArray { ba = &ByteArray{endian: defaultEndian} } - for _, num := range bytes { - ba.buf = append(ba.buf, num...) - } - + ba.buf = append(ba.buf, bytes...) ba.ResetPos() return ba } diff --git a/logic/logic1 b/logic/logic1 new file mode 100644 index 000000000..ee7d53d21 Binary files /dev/null and b/logic/logic1 differ diff --git a/logic/service/fight/effect/NewSeIdx_1.go b/logic/service/fight/boss/NewSeIdx_1.go similarity index 100% rename from logic/service/fight/effect/NewSeIdx_1.go rename to logic/service/fight/boss/NewSeIdx_1.go diff --git a/logic/service/fight/fightc.go b/logic/service/fight/fightc.go index bbc715de5..87c51ea65 100644 --- a/logic/service/fight/fightc.go +++ b/logic/service/fight/fightc.go @@ -659,7 +659,7 @@ func (f *FightC) enterturn(fattack, sattack *action.SelectSkillAction) { ) if defender.CurrentPet.Info.Hp == 0 { // defender.AttackValue.SkillID = 0 - + //todo 解耦成战斗循环defer defender.CanChange = true //被打死就可以切精灵了 if f.IsWin(attacker, defender.CurrentPet.Info.CatchTime) { //然后检查是否战斗结束 var WinnerId uint32 diff --git a/logic/service/fight/input/input.go b/logic/service/fight/input/input.go index 295224bbb..985da9dd3 100644 --- a/logic/service/fight/input/input.go +++ b/logic/service/fight/input/input.go @@ -117,7 +117,6 @@ func (i *Input) Parseskill(defender *Input, skill *action.SelectSkillAction) { //这里是给双方添加buff if t != nil { t.SetArgs(i, temparg[:args]...) //设置入参,施加方永远是我方 - t.ID(v) if t.GetOwner() { //如果取反,说明是给对方添加的回合效果 //实际上,owner永远为反,说明是对方给我添加的 diff --git a/logic/service/fight/input/node.go b/logic/service/fight/input/node.go index ca1e8e59b..22fa26d8a 100644 --- a/logic/service/fight/input/node.go +++ b/logic/service/fight/input/node.go @@ -22,6 +22,7 @@ var EffectType = enum.New[struct { var NodeM = make(map[int]Effect, 0) func InitEffect(etype EnumEffectType, id int, t Effect) { + t.ID(id) //设置ID NodeM[id+int(etype)] = t } diff --git a/logic/service/player/SocketHandler_Tomee.go b/logic/service/player/SocketHandler_Tomee.go index a9e695d35..5df4164ac 100644 --- a/logic/service/player/SocketHandler_Tomee.go +++ b/logic/service/player/SocketHandler_Tomee.go @@ -87,7 +87,7 @@ func (h *TomeeHeader) Pack(data any) []byte { //组包 } h.Len = uint32(len(datar) + 17) - by := bytearray.CreateByteArray() + by := bytearray.CreateByteArray(nil) by.WriteUInt32(h.Len) by.WriteByte(h.Version) by.WriteUInt32(uint32(h.CMD)) @@ -147,7 +147,8 @@ func (h *ClientData) Recv(data TomeeHeader) { // fmt.Println(tt1) err := struc.Unpack(bytes.NewBuffer(data.Data), tt1) if err != nil { - fmt.Println(err) + cool.Loger.Error(context.Background(), data.UserID, data.CMD, "解包失败") + return } //fmt.Println(tt1) ptrValue1 := ptrValue.Elem().Addr() @@ -197,25 +198,24 @@ func (h *ClientData) Recv(data TomeeHeader) { } type ClientData struct { - IsCrossDomain bool //是否跨域过 - Player *Player //客户实体 - Mu sync.RWMutex - ERROR_CONNUT int - Wsmsg *WsCodec - Conn gnet.Conn - SaveL sync.Once //保存锁 + IsCrossDomain sync.Once //是否跨域过 + Player *Player //客户实体 + //Mu sync.RWMutex + ERROR_CONNUT int + Wsmsg *WsCodec + Conn gnet.Conn + SaveL sync.Once //保存锁 - CloseChan chan struct{} } func NewClientData(c gnet.Conn) *ClientData { // 创建事件处理器 cd := ClientData{ - IsCrossDomain: false, - Player: nil, - Conn: c, - Wsmsg: &WsCodec{}, + + Player: nil, + Conn: c, + Wsmsg: &WsCodec{}, } return &cd @@ -249,13 +249,10 @@ func (h *ClientData) OnEvent(v []byte) { } func (p *ClientData) SendPack(b []byte) error { - // if _, ok := p.MainConn.Context().(*ClientData); !ok { - // return fmt.Errorf("链接错误,取消发包") + if _, ok := p.Conn.Context().(*ClientData); !ok { + return fmt.Errorf("链接错误,取消发包") - // } - - p.Conn.Context().(*ClientData).Mu.Lock() - defer p.Conn.Context().(*ClientData).Mu.Unlock() + } if p.Conn.Context().(*ClientData).Wsmsg.Upgraded { // This is the echo server @@ -266,7 +263,7 @@ func (p *ClientData) SendPack(b []byte) error { } } else { - _, err := p.Conn.Write(b) + err := p.Conn.AsyncWrite(b, nil) if err != nil { glog.Debug(context.Background(), err) diff --git a/logic/service/player/player.go b/logic/service/player/player.go index 86a505760..7bc37a5a0 100644 --- a/logic/service/player/player.go +++ b/logic/service/player/player.go @@ -280,13 +280,14 @@ func (player1 *Player) Kick() { //实际上这里有个问题,会造成重复保存问题 player1.SendPack(head.Pack(nil)) - player1.MainConn.Context().(*ClientData).CloseChan = make(chan struct{}) - player1.MainConn.Context().(*ClientData).Mu.Lock() - player1.MainConn.Close() - player1.MainConn.Context().(*ClientData).Mu.Unlock() - // clientdata.Player = player + CloseChan := make(chan struct{}) - <-player1.MainConn.Context().(*ClientData).CloseChan + player1.MainConn.CloseWithCallback(func(c gnet.Conn, err error) error { + + close(CloseChan) + return nil + }) + <-CloseChan } }