diff --git a/common/data/entity/wscodec.go b/common/data/entity/wscodec.go index 1fcbe183..880cda2c 100644 --- a/common/data/entity/wscodec.go +++ b/common/data/entity/wscodec.go @@ -13,10 +13,10 @@ import ( ) type WsCodec struct { - upgraded bool // 链接是否升级 + Upgraded bool // 链接是否升级 Buf bytes.Buffer // 从实际socket中读取到的数据缓存 wsMsgBuf wsMessageBuf // ws 消息缓存 - Isinitws bool + //Isinitws bool } type wsMessageBuf struct { @@ -30,7 +30,7 @@ type readWrite struct { } func (w *WsCodec) Upgrade(c gnet.Conn) (ok bool, action gnet.Action) { - if w.upgraded { + if w.Upgraded { ok = true return } @@ -54,7 +54,7 @@ func (w *WsCodec) Upgrade(c gnet.Conn) (ok bool, action gnet.Action) { logging.Infof("conn[%v] upgrade websocket protocol! Handshake: %v", c.RemoteAddr().String(), hs) ok = true - w.upgraded = true + w.Upgraded = true return } func (w *WsCodec) ReadBufferBytes(c gnet.Conn) (gnet.Action, int) { diff --git a/common/socket/ServerEvent.go b/common/socket/ServerEvent.go index a8c8540f..f61726b8 100644 --- a/common/socket/ServerEvent.go +++ b/common/socket/ServerEvent.go @@ -2,14 +2,12 @@ package socket import ( "context" - "fmt" "log" "sync/atomic" "time" "blazing/common/data/entity" - "github.com/gobwas/ws/wsutil" "github.com/gogf/gf/v2/os/glog" "github.com/panjf2000/gnet/v2" "github.com/panjf2000/gnet/v2/pkg/logging" @@ -96,7 +94,7 @@ func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) { s.handleTcp(c) return gnet.None } else { - fmt.Println(ws.Buf.Bytes()) + // fmt.Println(ws.Buf.Bytes()) c.Read(make([]byte, len1)) } @@ -110,19 +108,12 @@ func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) { if messages == nil { return } - for _, message := range messages { - msgLen := len(message.Payload) - if msgLen > 128 { - logging.Infof("conn[%v] receive [op=%v] [msg=%v..., len=%d]", c.RemoteAddr().String(), message.OpCode, string(message.Payload[:128]), len(message.Payload)) - } else { - logging.Infof("conn[%v] receive [op=%v] [msg=%v, len=%d]", c.RemoteAddr().String(), message.OpCode, string(message.Payload), len(message.Payload)) - } - // This is the echo server - err = wsutil.WriteServerMessage(c, message.OpCode, message.Payload) - if err != nil { - logging.Infof("conn[%v] [err=%v]", c.RemoteAddr().String(), err.Error()) - return gnet.Close - } + + for _, msg := range messages { + //client := conn.RemoteAddr().String() + _ = s.workerPool.Submit(func() { + s.parser(c, msg.Payload) + }) } return gnet.None diff --git a/logic/controller/controller.go b/logic/controller/controller.go index d0424a8f..4127eaac 100644 --- a/logic/controller/controller.go +++ b/logic/controller/controller.go @@ -14,10 +14,13 @@ import ( "fmt" "reflect" + "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" "github.com/gogf/gf/v2/os/gcmd" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/util/gconv" "github.com/lunixbochs/struc" + "github.com/panjf2000/gnet/pkg/logging" "github.com/panjf2000/gnet/v2" ) @@ -201,9 +204,19 @@ func Recv(c gnet.Conn, data handler.TomeeHeader) { ret := cmdlister.Call(params) //return core.Pack(data.Handler.UserID, data.Handler.CMD, t1, 0) //core.Pack(c, cmd cmd.EnumCommandID, data any, iserror uint32) - _, err = c.Write(ret[0].Interface().([]byte)) - if err != nil { - fmt.Println(err) + if c.Context().(*entity.ClientData).Getwsmsg().Upgraded { + // This is the echo server + err = wsutil.WriteServerMessage(c, ws.OpBinary, ret[0].Interface().([]byte)) + if err != nil { + logging.Infof("conn[%v] [err=%v]", c.RemoteAddr().String(), err.Error()) + return + } + } else { + + _, err = c.Write(ret[0].Interface().([]byte)) + if err != nil { + fmt.Println(err) + } } }