refactor(socket): 移除 Lockfree 依赖并优化事件处理逻辑

将原有的基于 Lockfree 的消息队列机制移除,改为直接在协程池中调用客户端数据处理器。
同时调整了 `ClientData` 结构体和相关方法实现,使代码更简洁、易于维护。
此外,注释掉了一处调试日志输出,并修正了获取玩家对象的方式。
```
This commit is contained in:
2025-10-29 23:51:03 +08:00
parent 95b2a3cd41
commit fdd55ed99b
3 changed files with 9 additions and 37 deletions

View File

@@ -56,8 +56,6 @@ func (s *Server) OnClose(c gnet.Conn, _ error) (action gnet.Action) {
v, _ := c.Context().(*player.ClientData) v, _ := c.Context().(*player.ClientData)
s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复 s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复
v.Lf.Close() //关闭lockfree
if v.Player != nil { if v.Player != nil {
//cool.Loger.Info(context.TODO(), "准备保存", v.Player.Info.UserID) //cool.Loger.Info(context.TODO(), "准备保存", v.Player.Info.UserID)
v.Player.Save() //保存玩家数据 v.Player.Save() //保存玩家数据
@@ -152,10 +150,7 @@ func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) {
t := c.Context().(*player.ClientData) t := c.Context().(*player.ClientData)
//client := conn.RemoteAddr().String() //client := conn.RemoteAddr().String()
err = s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复 err = s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复
err = t.Lf.Producer().Write(msg.Payload) t.OnEvent(msg.Payload)
if err != nil {
panic(err)
}
}) })
@@ -186,10 +181,7 @@ func (s *Server) handleTcp(conn gnet.Conn) (action gnet.Action) {
t := conn.Context().(*player.ClientData) t := conn.Context().(*player.ClientData)
err = s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复 err = s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复
err = t.Lf.Producer().Write(data) t.OnEvent(data)
if err != nil {
panic(err)
}
}) })

View File

@@ -133,7 +133,7 @@ func Recv(c gnet.Conn, data TomeeHeader) {
} }
// fmt.Println(cmdlister) // fmt.Println(cmdlister)
glog.Debug(context.Background(), "接收数据", data.UserID, data.CMD) //glog.Debug(context.Background(), "接收数据", data.UserID, data.CMD)
params := []reflect.Value{} params := []reflect.Value{}
//funct := cmdlister.Type().NumIn() //funct := cmdlister.Type().NumIn()
@@ -182,7 +182,7 @@ func Recv(c gnet.Conn, data TomeeHeader) {
return return
} }
t := GetPlayer(c, data.UserID) t := clientdata.Player
if t == nil { if t == nil {
return return
} }

View File

@@ -25,7 +25,6 @@ import (
"time" "time"
"github.com/antlabs/timer" "github.com/antlabs/timer"
"github.com/bruceshao/lockfree"
"github.com/gobwas/ws" "github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil" "github.com/gobwas/ws/wsutil"
"github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/frame/g"
@@ -55,45 +54,25 @@ type ClientData struct {
Mu sync.Mutex Mu sync.Mutex
ERROR_CONNUT int ERROR_CONNUT int
Wsmsg *WsCodec Wsmsg *WsCodec
Lf *lockfree.Lockfree[[]byte] Conn gnet.Conn
CloseChan chan struct{} CloseChan chan struct{}
} }
func NewClientData(c gnet.Conn) *ClientData { func NewClientData(c gnet.Conn) *ClientData {
// 创建事件处理器 // 创建事件处理器
handler := &eventHandler{
Conn: c,
}
lfs := lockfree.NewConditionBlockStrategy()
// 创建消费端串行处理的Lockfree
lf := lockfree.NewLockfree[[]byte](
4096,
handler,
lfs,
)
// 启动Lockfree
if err := lf.Start(); err != nil {
panic(err)
}
cd := ClientData{ cd := ClientData{
IsCrossDomain: false, IsCrossDomain: false,
Player: nil, Player: nil,
Lf: lf, Conn: c,
Wsmsg: &WsCodec{},
Wsmsg: &WsCodec{},
} }
return &cd return &cd
} }
type eventHandler struct { func (h *ClientData) OnEvent(v []byte) {
Callback func(conn gnet.Conn, data TomeeHeader)
Conn gnet.Conn
}
func (h *eventHandler) OnEvent(v []byte) {
defer func() { defer func() {
if err := recover(); err != nil { // 恢复 panicerr 为 panic 错误值 if err := recover(); err != nil { // 恢复 panicerr 为 panic 错误值
// 1. 打印错误信息 // 1. 打印错误信息
@@ -409,6 +388,7 @@ func (p *Player) Save() {
} }
if p.FightC != nil { if p.FightC != nil {
go p.FightC.Over(p, info.BattleOverReason.PlayerOffline) //玩家逃跑,但是不能锁线程 go p.FightC.Over(p, info.BattleOverReason.PlayerOffline) //玩家逃跑,但是不能锁线程
} }