Files
bl/common/socket/ServerEvent.go
昔念 f06638d6b6 ```
feat(socket): 优化TCP连接处理逻辑并引入批量读取机制

- 在 `ServerEvent.go` 中调整了 `OnTraffic` 方法的处理逻辑,
  避免不必要的循环,确保跨域请求优先处理。
- 新增 `batchRead` 配置项,用于控制单次处理的最大数据包数量,
 默认值设为 10。
- 修复 `OnClose` 方法中可能存在的执行顺序问题,并显式关闭 Lockfree 资源。
- 在 `ClientData` 初始化时,将 Lockfree 的阻塞策略从 Sleep 策略
  替换为 ConditionBlock 策略,提升并发处理性能。
- 微调玩家登录完成时的地图ID判断条件,由 1000
2025-10-29 03:19:32 +08:00

234 lines
5.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package socket
import (
"context"
"log"
"os"
"runtime/debug"
"sync/atomic"
"time"
"blazing/common/socket/codec"
"blazing/cool"
"blazing/logic/service/player"
"github.com/panjf2000/gnet/v2"
)
func (s *Server) Boot() error {
// go s.bootws()
err := gnet.Run(s, s.network+"://"+s.addr,
gnet.WithMulticore(true),
gnet.WithTicker(true),
// gnet.WithReusePort(true),
// gnet.WithReuseAddr(true),
gnet.WithSocketRecvBuffer(s.bufferSize))
if err != nil {
return err
}
// err := gnet.Run(s, s.network+"://"+s.addr, gnet.WithMulticore(s.multicore))
cool.Loger.Debug(context.Background(), "server exits with error: %v", err)
// logging.Infof("server exits with error: %v", err)
return nil
}
func (s *Server) Stop() error {
_ = s.eng.Stop(context.Background())
s.workerPool.Release()
return nil
}
func (s *Server) OnClose(c gnet.Conn, _ error) (action gnet.Action) {
defer func() {
if err := recover(); err != nil { // 恢复 panicerr 为 panic 错误值
// 1. 打印错误信息
log.Printf("捕获到 panic 错误:%v", err)
// 2. 打印完整调用堆栈debug.Stack() 获取堆栈字节流,转为字符串)
log.Printf("panic 详细堆栈:\n%s", debug.Stack())
}
}()
atomic.AddInt64(&s.connected, -1)
//logging.Infof("conn[%v] disconnected", c.RemoteAddr().String())
v, _ := c.Context().(*player.ClientData)
s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复
v.Lf.Close() //关闭lockfree
if v.Player != nil {
//cool.Loger.Info(context.TODO(), "准备保存", v.Player.Info.UserID)
v.Player.Save() //保存玩家数据
//cool.Loger.Info(context.TODO(), "保存完成", v.Player.Info.UserID)
if v.CloseChan != nil {
close(v.CloseChan)
}
}
})
//}
//关闭连接
return
}
func (s *Server) OnTick() (delay time.Duration, action gnet.Action) {
cool.Loger.Infof(context.Background(), "[connected-count=%v]", atomic.LoadInt64(&s.connected))
if s.quit && atomic.LoadInt64(&s.connected) == 0 {
//执行正常退出逻辑
os.Exit(0)
}
return 30 * time.Second, gnet.None
}
func (s *Server) OnBoot(eng gnet.Engine) gnet.Action {
s.eng = eng
// cool.Loger.Infof(context.Background(), " server is listening on %s\n", s.addr)
return gnet.None
}
func (s *Server) OnOpen(conn gnet.Conn) (out []byte, action gnet.Action) {
if conn.Context() == nil {
conn.SetContext(player.NewClientData(conn)) //注入data
}
atomic.AddInt64(&s.connected, 1)
return nil, gnet.None
}
func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) {
defer func() {
if err := recover(); err != nil { // 恢复 panicerr 为 panic 错误值
// 1. 打印错误信息
log.Printf("捕获到 panic 错误:%v", err)
// 2. 打印完整调用堆栈debug.Stack() 获取堆栈字节流,转为字符串)
log.Printf("panic 详细堆栈:\n%s", debug.Stack())
}
}()
if s.network != "tcp" {
return gnet.Close
}
ws := c.Context().(*player.ClientData).Wsmsg
if ws.Tcp { //升级失败时候防止缓冲区溢出
return s.handleTcp(c)
}
tt, len1 := ws.ReadBufferBytes(c)
if tt == gnet.Close {
return gnet.Close
}
ok, action := ws.Upgrade(c)
if action != gnet.None { //连接断开
return action
}
if !ok { //升级失败,说明是tcp连接
ws.Tcp = true
return s.handleTcp(c)
}
// fmt.Println(ws.Buf.Bytes())
c.Discard(len1)
if ws.Buf.Len() <= 0 {
return gnet.None
}
messages, err := ws.Decode(c)
if err != nil {
return gnet.Close
}
if messages == nil {
return
}
for _, msg := range messages {
t := c.Context().(*player.ClientData)
//client := conn.RemoteAddr().String()
err = s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复
err = t.Lf.Producer().Write(msg.Payload)
if err != nil {
panic(err)
}
})
}
return gnet.None
}
func (s *Server) handleTcp(conn gnet.Conn) (action gnet.Action) {
if s.discorse && !conn.Context().(*player.ClientData).IsCrossDomain {
handle(conn)
}
conn.Context().(*player.ClientData).IsCrossDomain = true
for i := 0; i < s.batchRead; i++ {
data, err := s.codec.Decode(conn)
if err == codec.ErrIncompletePacket {
break
}
if err != nil {
return gnet.Close
}
//cool.Loger.Debug(context.Background(), "原始数据", hex.EncodeToString(data))
t := conn.Context().(*player.ClientData)
err = s.workerPool.Submit(func() { //TODO 这里可能存在顺序执行问题,待修复
err = t.Lf.Producer().Write(data)
if err != nil {
panic(err)
}
})
}
if conn.InboundBuffered() > 0 {
if err := conn.Wake(nil); err != nil { // wake up the connection manually to avoid missing the leftover data
cool.Loger.Errorf(context.Background(), "failed to wake up the connection, %v", err)
return gnet.Close
}
}
return action
}
// CROSS_DOMAIN 定义跨域策略文件内容
const CROSS_DOMAIN = "<?xml version=\"1.0\"?><!DOCTYPE cross-domain-policy><cross-domain-policy><allow-access-from domain=\"*\" to-ports=\"*\" /></cross-domain-policy>\x00"
// TEXT 定义跨域请求的文本格式
const TEXT = "<policy-file-request/>\x00"
func handle(c gnet.Conn) {
// 读取数据并检查是否为跨域请求
data, err := c.Peek(len(TEXT))
if err != nil {
log.Printf("Error reading cross-domain request: %v", err)
return
}
if string(data) == TEXT { //判断是否是跨域请求
log.Printf("Received cross-domain request from %s", c.RemoteAddr())
// 处理跨域请求
c.Write([]byte(CROSS_DOMAIN))
c.Discard(len(TEXT))
return
}
//return
}