Files
bl/common/socket/ServerEvent.go
xinian c021b40fbe
All checks were successful
ci/woodpecker/push/my-first-workflow Pipeline was successful
feat: 增强踢人逻辑与BOSS脚本支持
优化踢人超时处理和僵尸连接清理,支持BOSS动作脚本并增加测试,修复事件匹配与战斗循环中的并发问题。
2026-04-05 21:59:22 +08:00

268 lines
6.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package socket
import (
"context"
"encoding/binary"
"errors"
"io"
"log"
"os"
"sync/atomic"
"time"
"blazing/cool"
"blazing/logic/service/player"
"blazing/modules/config/service"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"github.com/panjf2000/gnet/v2"
)
// Boot stores the server identity on s and starts the gnet engine with s as
// the event handler, listening on "<network>://<addr>".
//
// Any error reported by gnet.Run is escalated to a panic, so the error result
// is nil whenever Boot returns normally.
func (s *Server) Boot(serverid, port uint32) error {
	s.serverid, s.port = serverid, port

	endpoint := s.network + "://" + s.addr
	options := []gnet.Option{
		gnet.WithMulticore(true),
		gnet.WithTicker(true), // enables the periodic OnTick callback
		// Disable Nagle's algorithm to lower latency for small packets.
		gnet.WithTCPNoDelay(gnet.TCPNoDelay),
		// Further tuning knobs that are intentionally left off for now:
		//   gnet.WithReusePort(true)
		//   gnet.WithReadBufferCap(1024*64)
		//   gnet.WithWriteBufferCap(1024*64)
		//   gnet.WithLockOSThread(true)
	}

	if runErr := gnet.Run(s, endpoint, options...); runErr != nil {
		panic(runErr)
	}
	return nil
}
// Stop asks the gnet engine to stop and then releases the worker pool.
// The engine's stop error is deliberately discarded; Stop always returns nil.
func (s *Server) Stop() error {
	ctx := context.Background()
	_ = s.eng.Stop(ctx) // best effort: the result is intentionally ignored

	s.workerPool.Release()
	return nil
}
// OnClose is the gnet callback for a closed connection. It decrements the
// global connection counter, closes the per-connection ClientData and, when a
// player is attached, saves the player.
//
// A panic raised while doing so is recovered and always logged. If the
// connection carries a player with loaded Info, the user ID is included in the
// log entry and the Info is saved as a last resort.
//
// The err argument (the reason the connection closed) is currently unused.
func (s *Server) OnClose(c gnet.Conn, err error) (action gnet.Action) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		t, ok := c.Context().(*player.ClientData)
		if !ok || t == nil || t.Player == nil || t.Player.Info == nil {
			// No identifiable player: still report the panic instead of
			// swallowing it silently.
			cool.Logger.Error(context.TODO(), "OnClose 错误:", cool.Config.ServerInfo.OnlineID, r)
			return
		}
		cool.Logger.Error(context.TODO(), "OnClose 错误:", cool.Config.ServerInfo.OnlineID, t.Player.Info.UserID, r)
		t.Player.Service.Info.Save(*t.Player.Info)
	}()

	// NOTE: detection of RST-triggered disconnects ("connection reset" /
	// "reset by peer") with temporary IP blocking was sketched here in the
	// past but is not implemented.

	atomic.AddInt64(&cool.Connected, -1)

	v, _ := c.Context().(*player.ClientData)
	if v != nil {
		v.Close()
		if v.Player != nil {
			v.Player.Save() // persist player data
		}
	}
	return gnet.None
}
// OnTick is the gnet ticker callback. It logs the current timestamp, the
// configured online server ID and the connection count, then reschedules
// itself 30 seconds later.
//
// When s.quit is set and no connections remain, the process exits with
// status 0.
func (s *Server) OnTick() (delay time.Duration, action gnet.Action) {
	const interval = 30 * time.Second

	g.Log().Async().Info(
		context.Background(),
		gtime.Now().ISO8601(),
		"服务器ID", cool.Config.ServerInfo.OnlineID,
		"链接数", atomic.LoadInt64(&cool.Connected),
	)

	if !s.quit || atomic.LoadInt64(&cool.Connected) != 0 {
		return interval, gnet.None
	}

	// Shutdown was requested and the last connection is gone: exit normally.
	os.Exit(0)
	return interval, gnet.None
}
// OnBoot is the gnet callback fired once the engine is ready. It keeps a
// handle to the engine (used later by Stop) and registers this server's ID
// and port with the server service.
func (s *Server) OnBoot(eng gnet.Engine) gnet.Action {
	s.eng = eng

	registry := service.NewServerService()
	registry.SetServerID(s.serverid, s.port) // record the current server's ID and port

	return gnet.None
}
// OnOpen is the gnet callback for a newly accepted connection. It counts the
// connection, rejects it when the server is not running on "tcp", and
// otherwise attaches a fresh ClientData as the connection context (unless a
// context is already present).
//
// The counter is incremented before the network check so that every OnOpen is
// balanced by the unconditional decrement in OnClose; previously a rejected
// connection was decremented without ever having been counted.
// NOTE(review): this relies on gnet invoking OnClose for connections that
// OnOpen closes — confirm against the gnet version in use.
func (s *Server) OnOpen(conn gnet.Conn) (out []byte, action gnet.Action) {
	atomic.AddInt64(&cool.Connected, 1)

	if s.network != "tcp" {
		return nil, gnet.Close
	}
	if conn.Context() == nil {
		conn.SetContext(player.NewClientData(conn)) // inject per-connection data
	}
	return nil, gnet.None
}
// OnTraffic is the gnet callback for inbound data. A connection is first
// offered to the WebSocket upgrader; if the upgrade does not succeed the
// connection is marked as raw TCP and handled by handleTCP from then on.
// For upgraded connections every decoded message payload is dispatched via
// onevent.
//
// A panic raised while processing is recovered and always logged. If the
// connection carries a player with loaded Info, the user ID is included in the
// log entry and the Info is saved. After a recovered panic the named result
// keeps its zero value.
func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		t, ok := c.Context().(*player.ClientData)
		if !ok || t == nil || t.Player == nil || t.Player.Info == nil {
			// No identifiable player: still report the panic instead of
			// swallowing it silently.
			cool.Logger.Error(context.TODO(), "OnTraffic 错误:", cool.Config.ServerInfo.OnlineID, r)
			return
		}
		cool.Logger.Error(context.TODO(), "OnTraffic 错误:", cool.Config.ServerInfo.OnlineID, t.Player.Info.UserID, r)
		t.Player.Service.Info.Save(*t.Player.Info)
	}()

	// Without its ClientData the connection cannot be served; close it rather
	// than panicking on a failed type assertion.
	data, ok := c.Context().(*player.ClientData)
	if !ok || data == nil {
		return gnet.Close
	}

	ws := data.Wsmsg
	if ws.Tcp {
		// Already classified as raw TCP (after a failed upgrade): skip the
		// upgrade path so the upgrade buffer is not fed again.
		return s.handleTCP(c)
	}

	readAction, buffered := ws.ReadBufferBytes(c)
	if readAction == gnet.Close {
		return gnet.Close
	}

	upgraded, upgradeAction := ws.Upgrade(c)
	if upgradeAction != gnet.None { // connection is being torn down
		return upgradeAction
	}
	if !upgraded { // not a WebSocket handshake: treat as plain TCP
		ws.Tcp = true
		return s.handleTCP(c)
	}

	c.Discard(buffered)

	messages, err := ws.Decode(c)
	if err != nil {
		return gnet.Close
	}
	for _, msg := range messages {
		s.onevent(c, msg.Payload)
	}
	return gnet.None
}
const maxBodyLen = 10 * 1024 // maximum accepted packet length; adjust to business needs

// handleTCP processes at most one length-prefixed packet from a raw TCP
// connection and dispatches it via onevent.
//
// On the first call per connection it runs the cross-domain policy check
// (handle) exactly once. It then peeks the 4-byte big-endian length, waits
// until that many bytes are buffered, and consumes them as one packet. The
// length is measured from the start of the prefix: the prefix is never
// discarded separately, so the dispatched packet includes it.
//
// Returns gnet.Close for malformed or oversized frames and on unexpected read
// errors; otherwise gnet.None. If data remains buffered after a packet, the
// connection is woken so the next packet is processed in another OnTraffic
// round.
func (s *Server) handleTCP(conn gnet.Conn) (action gnet.Action) {
	// Size of the length prefix that starts every packet.
	const lenFieldSize = 4

	conn.Context().(*player.ClientData).IsCrossDomain.Do(func() { // cross-domain check, once per connection
		handle(conn)
	})

	// Read the packet length without consuming it.
	lenBuf, err := conn.Peek(lenFieldSize)
	if err != nil {
		if errors.Is(err, io.ErrShortBuffer) {
			return gnet.None // prefix not complete yet; wait for more data
		}
		return gnet.Close
	}

	bodyLen := binary.BigEndian.Uint32(lenBuf)
	// The length counts the prefix itself, so anything smaller than the prefix
	// cannot describe a valid frame. Rejecting it also keeps a zero length
	// from reaching conn.Next, where it would not be bounded by maxBodyLen.
	if bodyLen < lenFieldSize || bodyLen > maxBodyLen {
		return gnet.Close
	}
	if conn.InboundBuffered() < int(bodyLen) {
		return gnet.None // packet not fully received yet
	}

	// Consume the whole packet.
	// NOTE(review): the slice comes from gnet's inbound buffer; presumably
	// PushEvent copies it before handing it to the worker pool — confirm.
	body, err := conn.Next(int(bodyLen))
	if err != nil {
		if errors.Is(err, io.ErrShortBuffer) {
			return gnet.None
		}
		return gnet.Close
	}
	s.onevent(conn, body)

	if conn.InboundBuffered() > 0 {
		// Wake the connection manually so leftover data is not missed.
		if err := conn.Wake(nil); err != nil {
			return gnet.Close
		}
	}
	return gnet.None
}
// Payloads of the cross-domain policy exchange. Both are terminated by a NUL
// byte, which cannot appear in a raw string literal and is therefore appended.
const (
	// CROSS_DOMAIN is the cross-domain policy file sent in reply to a policy
	// request; it allows access from every domain to every port.
	CROSS_DOMAIN = `<?xml version="1.0"?><!DOCTYPE cross-domain-policy><cross-domain-policy><allow-access-from domain="*" to-ports="*" /></cross-domain-policy>` + "\x00"

	// TEXT is the exact request text that identifies a cross-domain policy
	// request.
	TEXT = `<policy-file-request/>` + "\x00"
)
// handle answers a cross-domain policy request. It peeks len(TEXT) bytes from
// the connection; when they match TEXT exactly it writes CROSS_DOMAIN back and
// discards the request from the inbound buffer. Any other data is left in
// place. A failed peek is logged and otherwise ignored.
func handle(c gnet.Conn) {
	probe, err := c.Peek(len(TEXT))
	if err != nil {
		log.Printf("Error reading cross-domain request: %v", err)
		return
	}

	// Not a policy request: leave the buffered data for the packet parser.
	if string(probe) != TEXT {
		return
	}

	// Reply with the policy and drop the request bytes.
	c.Write([]byte(CROSS_DOMAIN))
	c.Discard(len(TEXT))
}
// onevent forwards one raw packet to the connection's ClientData, passing the
// worker pool's Submit function along with it. Connections whose context is
// not a *player.ClientData are ignored.
func (s *Server) onevent(c gnet.Conn, v []byte) {
	client, ok := c.Context().(*player.ClientData)
	if !ok {
		return
	}
	client.PushEvent(v, s.workerPool.Submit)
}