```
feat(player): 添加玩家断开连接时的安全保存机制 - 实现 SaveOnDisconnect 方法,确保玩家数据在断开连接时安全保存 - 添加并发控制防止重复保存操作,使用互斥锁和完成通道确保一次保存 - 在 socket 关闭事件中改为异步调用 SaveOnDisconnect 避免阻塞 - 添加 panic 恢复机制保护保存过程中的异常情况 refactor(login): 优化登录时的踢人逻辑和超时处理
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"blazing/cool"
|
||||
"blazing/logic/service/fight/pvp"
|
||||
"blazing/logic/service/fight/pvpwire"
|
||||
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@@ -58,7 +58,7 @@ func (s *Server) OnClose(c gnet.Conn, err error) (action gnet.Action) {
|
||||
if t.Player != nil {
|
||||
if t.Player.Info != nil {
|
||||
cool.Logger.Error(context.TODO(), "OnClose 错误:", cool.Config.ServerInfo.OnlineID, t.Player.Info.UserID, err)
|
||||
t.Player.Service.Info.Save(*t.Player.Info)
|
||||
go t.Player.SaveOnDisconnect()
|
||||
}
|
||||
|
||||
}
|
||||
@@ -89,7 +89,7 @@ func (s *Server) OnClose(c gnet.Conn, err error) (action gnet.Action) {
|
||||
// v.LF.Close()
|
||||
//close(v.MsgChan)
|
||||
if v.Player != nil {
|
||||
v.Player.Save() //保存玩家数据
|
||||
go v.Player.SaveOnDisconnect() //保存玩家数据
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -2,14 +2,11 @@ package controller
|
||||
|
||||
import (
|
||||
"blazing/common/data/share"
|
||||
"blazing/cool"
|
||||
|
||||
"blazing/common/socket/errorcode"
|
||||
|
||||
"blazing/logic/service/user"
|
||||
|
||||
"blazing/cool"
|
||||
"blazing/logic/service/player"
|
||||
"blazing/logic/service/space"
|
||||
"blazing/logic/service/user"
|
||||
"blazing/modules/player/service"
|
||||
"context"
|
||||
"time"
|
||||
@@ -17,6 +14,32 @@ import (
|
||||
"github.com/panjf2000/gnet/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
waitUserOfflineTimeout = 30 * time.Second
|
||||
waitUserOfflineInterval = 200 * time.Millisecond
|
||||
waitUserOfflineKickGap = 5 * time.Second
|
||||
)
|
||||
|
||||
func waitUserOffline(userID uint32, timeout time.Duration) bool {
|
||||
deadline := time.Now().Add(timeout)
|
||||
lastKickAt := time.Now()
|
||||
for {
|
||||
if _, onlineErr := share.ShareManager.GetUserOnline(userID); onlineErr != nil {
|
||||
return true
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
return false
|
||||
}
|
||||
if time.Since(lastKickAt) >= waitUserOfflineKickGap {
|
||||
if kickErr := Maincontroller.RPCClient.Kick(userID); kickErr != nil {
|
||||
cool.Logger.Error(context.Background(), "补踢失败", userID, kickErr)
|
||||
}
|
||||
lastKickAt = time.Now()
|
||||
}
|
||||
time.Sleep(waitUserOfflineInterval)
|
||||
}
|
||||
}
|
||||
|
||||
// Login 处理命令: 1001
|
||||
func (h Controller) Login(data *MAIN_LOGIN_IN, c gnet.Conn) (result *user.LoginMSInfo, err errorcode.ErrorCode) { //这个时候player应该是空的
|
||||
|
||||
@@ -30,12 +53,18 @@ func (h Controller) Login(data *MAIN_LOGIN_IN, c gnet.Conn) (result *user.LoginM
|
||||
defer c.Close()
|
||||
return
|
||||
}
|
||||
_, erre := share.ShareManager.GetUserOnline(data.Head.UserID)
|
||||
if erre == nil {
|
||||
error := Maincontroller.RPCClient.Kick(data.Head.UserID) //通知其他服务器踢人
|
||||
if error != nil {
|
||||
cool.Logger.Error(context.Background(), "踢人失败", err)
|
||||
|
||||
if onlineServerID, onlineErr := share.ShareManager.GetUserOnline(data.Head.UserID); onlineErr == nil {
|
||||
kickErr := Maincontroller.RPCClient.Kick(data.Head.UserID) //通知其他服务器踢人
|
||||
if kickErr != nil {
|
||||
cool.Logger.Error(context.Background(), "踢人失败", data.Head.UserID, onlineServerID, kickErr)
|
||||
err = errorcode.ErrorCodes.ErrSystemBusyTryLater
|
||||
defer c.Close()
|
||||
return
|
||||
}
|
||||
if ok := waitUserOffline(data.Head.UserID, waitUserOfflineTimeout); !ok {
|
||||
cool.Logger.Error(context.Background(), "等待旧会话下线超时", data.Head.UserID, onlineServerID, waitUserOfflineTimeout)
|
||||
err = errorcode.ErrorCodes.ErrSystemBusyTryLater
|
||||
defer c.Close()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"blazing/logic/service/fight/info"
|
||||
"blazing/logic/service/space"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@@ -116,6 +117,10 @@ type Player struct {
|
||||
Hash uint32
|
||||
|
||||
ArenaFlags uint32
|
||||
|
||||
logoutMu sync.Mutex
|
||||
logoutDone chan struct{}
|
||||
logoutSaved bool
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -447,14 +452,16 @@ func (player1 *Player) Kick(isquit bool) {
|
||||
// --- 新增超时机制核心代码 ---
|
||||
// 设定超时时间(可根据业务需求调整,这里以3秒为例)
|
||||
const kickTimeout = 10 * time.Second
|
||||
timeout := false
|
||||
select {
|
||||
case <-CloseChan:
|
||||
// 正常流程:连接关闭回调已执行,CloseChan 被关闭
|
||||
case <-time.After(kickTimeout):
|
||||
player1.Save()
|
||||
service.NewBaseSysLogService().RecordKick(uint32(player1.Info.UserID), fmt.Errorf("踢人操作超时(超时时间:%v)", kickTimeout).Error())
|
||||
// 超时处理:避免永久阻塞,可添加日志便于排查问题
|
||||
// 注意:这里不会中断 CloseWithCallback 的执行,仅解除当前协程的阻塞
|
||||
timeout = true
|
||||
}
|
||||
player1.SaveOnDisconnect()
|
||||
if timeout {
|
||||
service.NewBaseSysLogService().RecordKick(uint32(player1.Info.UserID), fmt.Sprintf("踢人操作超时(超时时间:%v)", kickTimeout))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,13 +15,13 @@ import (
|
||||
|
||||
// Save 保存玩家数据
|
||||
func (p *Player) Save() {
|
||||
cool.CacheManager.Remove(context.TODO(), fmt.Sprintf("player:%d", p.Info.UserID))
|
||||
|
||||
if p.Info == nil {
|
||||
if p == nil || p.Info == nil {
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
cacheCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
cool.CacheManager.Remove(cacheCtx, fmt.Sprintf("player:%d", p.Info.UserID))
|
||||
newtime := time.Now().Unix()
|
||||
p.Info.TimeToday = p.Info.TimeToday + newtime - int64(p.Logintime) //保存电池时间
|
||||
// p.Info.FightTime = p.Info.FightTime + (newtime - int64(p.Logintime))
|
||||
@@ -52,7 +52,11 @@ func (p *Player) Save() {
|
||||
|
||||
}
|
||||
p.IsLogin = false
|
||||
p.Service.Info.Save(*p.Info)
|
||||
if p.Service != nil && p.Service.Info != nil {
|
||||
p.Service.Info.SaveUntilSuccess(*p.Info)
|
||||
} else {
|
||||
cool.Logger.Error(context.TODO(), "player save skipped: service not ready", p.Info.UserID)
|
||||
}
|
||||
space.GetSpace(p.Info.MapID).LeaveMap(p)
|
||||
|
||||
p.MapNPC.Stop() //停止刷怪
|
||||
@@ -61,6 +65,46 @@ func (p *Player) Save() {
|
||||
share.ShareManager.DeleteUserOnline(p.Info.UserID) //设置用户登录服务器
|
||||
|
||||
}
|
||||
func (p *Player) SaveOnDisconnect() {
|
||||
if p == nil {
|
||||
return
|
||||
}
|
||||
|
||||
p.logoutMu.Lock()
|
||||
if p.logoutSaved {
|
||||
p.logoutMu.Unlock()
|
||||
return
|
||||
}
|
||||
if p.logoutDone != nil {
|
||||
done := p.logoutDone
|
||||
p.logoutMu.Unlock()
|
||||
<-done
|
||||
return
|
||||
}
|
||||
done := make(chan struct{})
|
||||
p.logoutDone = done
|
||||
p.logoutMu.Unlock()
|
||||
|
||||
defer func() {
|
||||
if recoverErr := recover(); recoverErr != nil {
|
||||
if p.Info != nil {
|
||||
cool.Logger.Error(context.TODO(), "SaveOnDisconnect panic", p.Info.UserID, recoverErr)
|
||||
} else {
|
||||
cool.Logger.Error(context.TODO(), "SaveOnDisconnect panic", recoverErr)
|
||||
}
|
||||
}
|
||||
|
||||
p.logoutMu.Lock()
|
||||
p.logoutSaved = true
|
||||
if p.logoutDone == done {
|
||||
close(done)
|
||||
p.logoutDone = nil
|
||||
}
|
||||
p.logoutMu.Unlock()
|
||||
}()
|
||||
|
||||
p.Save()
|
||||
}
|
||||
func (p *Player) CanGet() bool {
|
||||
if p.Info.TimeToday >= p.Info.TimeLimit {
|
||||
return false
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
"github.com/gogf/gf/v2/os/gtime"
|
||||
"github.com/gogf/gf/v2/util/grand"
|
||||
"github.com/google/uuid"
|
||||
csmap "github.com/mhmtszr/concurrent-swiss-map"
|
||||
)
|
||||
@@ -247,18 +248,64 @@ func (s *InfoService) Save(data model.PlayerInfo) {
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
_, err := s.dbm_fix(s.Model).Data("data", data).Update()
|
||||
_ = s.saveWithRetry(data, true)
|
||||
}
|
||||
|
||||
func (s *InfoService) SaveUntilSuccess(data model.PlayerInfo) {
|
||||
if cool.Config.ServerInfo.IsVip != 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
attempt = 0
|
||||
backoff = time.Second
|
||||
maxBackoff = 30 * time.Second
|
||||
)
|
||||
for {
|
||||
attempt++
|
||||
fallback := attempt == 1 || attempt%10 == 0
|
||||
err := s.saveWithRetry(data, fallback)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if i == 2 {
|
||||
cool.Logger.Error(context.TODO(), "player save failed after retries, fallback to local file", data.UserID, err)
|
||||
s.saveToLocalFile(&data, err)
|
||||
return
|
||||
if attempt == 1 || attempt%10 == 0 {
|
||||
cool.Logger.Error(context.TODO(), "player save retrying until success", data.UserID, attempt, err)
|
||||
}
|
||||
halfBackoff := int(backoff / 2)
|
||||
if halfBackoff < 1 {
|
||||
halfBackoff = 1
|
||||
}
|
||||
jitter := time.Duration(grand.Intn(halfBackoff))
|
||||
sleepFor := backoff + jitter
|
||||
if sleepFor > maxBackoff {
|
||||
sleepFor = maxBackoff
|
||||
}
|
||||
time.Sleep(sleepFor)
|
||||
if backoff < maxBackoff {
|
||||
backoff *= 2
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *InfoService) saveWithRetry(data model.PlayerInfo, fallback bool) error {
|
||||
var lastErr error
|
||||
for i := 0; i < 3; i++ {
|
||||
_, err := s.dbm_fix(s.Model).Data("data", data).Update()
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
lastErr = err
|
||||
}
|
||||
|
||||
if fallback && lastErr != nil {
|
||||
cool.Logger.Error(context.TODO(), "player save failed after retries, fallback to local file", data.UserID, lastErr)
|
||||
s.saveToLocalFile(&data, lastErr)
|
||||
}
|
||||
return lastErr
|
||||
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user