refactor(rpc): 重构 RPC 客户端并添加重连机制

- 更新了 RPC 客户端的初始化和重连逻辑
- 添加了重连函数和最大重试次数的配置
- 优化了与服务器的连接管理
- 调整了端口相关的数据类型
This commit is contained in:
2025-07-17 05:20:30 +08:00
parent b6231f6eb9
commit bf72b91fc6
18 changed files with 131 additions and 46 deletions

5
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,5 @@
{
"go.toolsEnvVars": {
"GOROOT": "E:\\golang\\go"
}
}

View File

@@ -9,7 +9,7 @@ type sConfig struct {
File *file `json:"file,omitempty"` // 文件上传配置
Name string `json:"name"` // 项目名称
Port string `json:"port"`
PortBL uint32 `json:"port_bl"`
PortBL uint16 `json:"port_bl"`
}
// OSS相关配置

View File

@@ -73,7 +73,7 @@ func (w *WsCodec) ReadBufferBytes(c gnet.Conn) (gnet.Action, int) {
return gnet.None, size
}
func (w *WsCodec) Decode(c gnet.Conn) (outs []wsutil.Message, err error) {
fmt.Println("do Decode")
// fmt.Println("do Decode")
messages, err := w.readWsMessages()
if err != nil {
logging.Errorf("Error reading message! %v", err)

View File

@@ -13,6 +13,7 @@ import (
const rpcaddr = "127.0.0.1:40000"
var clientmap = make(map[uint16]*ClientHandler) //客户端map
var clientidmap = make(map[uint16]uint16) //客户端map
//var usermap = make(map[int]int) //用户->客户端的map
// Define the client handler interface
@@ -45,18 +46,21 @@ func (h *ServerHandler) Kick(ctx context.Context, userid uint32) error {
}
// 注册logic服务器
func (h *ServerHandler) RegisterLogic(ctx context.Context, port uint16) error {
func (h *ServerHandler) RegisterLogic(ctx context.Context, id, port uint16) error {
//TODO 待修复滚动更新可能导致的玩家可以同时在旧服务器和新服务器同时在线的bug
revClient, ok := jsonrpc.ExtractReverseClient[ClientHandler](ctx)
if !ok {
return fmt.Errorf("no reverse client")
}
aa, ok := clientmap[port]
if ok && aa != nil {
aa.QuitSelf(0)
aa, ok := clientidmap[id]
if ok { //如果已经存在且这个端口已经被存过
t := clientmap[aa]
t.QuitSelf(0)
}
clientmap[port] = &revClient
clientidmap[id] = port
return nil
}
@@ -77,21 +81,24 @@ func StartServer() {
var closer jsonrpc.ClientCloser
func StartClient(port uint16, callback any) *struct {
func StartClient(id, port uint16, callback any) *struct {
Kick func(uint32) error
RegisterLogic func(uint16) error
RegisterLogic func(uint16, uint16) error
} {
closer1, err := jsonrpc.NewMergeClient(context.Background(), "ws://"+rpcaddr, "", []interface{}{
closer1, err := jsonrpc.NewMergeClient(context.Background(),
"ws://"+rpcaddr, "", []interface{}{
&RPCClient,
}, nil, jsonrpc.WithClientHandler("", callback))
}, nil, jsonrpc.WithClientHandler("", callback),
jsonrpc.WithReconnFun(func() { RPCClient.RegisterLogic(id, port) }),
)
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}
if port != 0 { //注册logic
RPCClient.RegisterLogic(port)
RPCClient.RegisterLogic(id, port)
}
@@ -110,7 +117,7 @@ func CloseClient() {
var RPCClient struct {
Kick func(uint32) error //踢人
RegisterLogic func(uint16) error //注册服务器消息
RegisterLogic func(uint16, uint16) error
// UserLogin func(int32, int32) error //用户登录事件
// UserLogout func(int32, int32) error //用户登出事件

View File

@@ -323,6 +323,7 @@ func websocketClient(ctx context.Context, addr string, namespace string, outs []
requests: requests,
stop: stop,
exiting: exiting,
reconfun: config.reconnfun,
}
go func() {

View File

@@ -32,6 +32,7 @@ type Config struct {
proxyConnFactory func(func() (*websocket.Conn, error)) func() (*websocket.Conn, error) // for testing
methodNamer MethodNameFormatter
reconnfun func()
}
func defaultConfig() Config {
@@ -100,6 +101,11 @@ func WithClientHandler(ns string, hnd interface{}) func(c *Config) {
c.reverseHandlers = append(c.reverseHandlers, clientHandler{ns, hnd})
}
}
func WithReconnFun(s func()) func(c *Config) {
return func(c *Config) {
c.reconnfun = s
}
}
// WithClientHandlerAlias creates an alias for a client HANDLER method - for handlers created
// with WithClientHandler

View File

@@ -61,6 +61,7 @@ type wsConn struct {
stopPings func()
stop <-chan struct{}
exiting chan struct{}
reconfun func()
// incoming messages
incoming chan io.Reader
@@ -661,6 +662,7 @@ func (c *wsConn) tryReconnect(ctx context.Context) bool {
c.writeLk.Unlock()
go c.nextMessage()
c.reconfun()
}()
return true
@@ -789,6 +791,7 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
log.Debugw("websocket error", "error", err, "lastAction", action, "time", time.Since(start))
// only client needs to reconnect
if !c.tryReconnect(ctx) {
return // failed to reconnect
}
@@ -938,3 +941,34 @@ func normalizeID(id interface{}) (interface{}, error) {
return nil, xerrors.Errorf("invalid id type: %T", id)
}
}
// Retry 执行带指数退避的重试
// 参数:
// - baseDelay: 基础延迟时间
// - maxRetries: 最大重试次数
// - fn: 需要执行的函数返回bool表示是否成功error为具体错误
//
// 返回:
// - 最后一次错误(如果所有重试都失败)
func Retry(baseDelay time.Duration, maxRetries int, fn func() (bool, error)) error {
var lastErr error
for i := 0; i <= maxRetries; i++ {
if i > 0 {
// 计算当前重试的延迟时间(指数增长)
delay := baseDelay * time.Duration(1<<i)
fmt.Printf("重试 %d/%d: 将在 %v 后重试,上次错误: %v", i, maxRetries, delay, lastErr)
time.Sleep(delay)
}
// 执行函数
success, err := fn()
if success {
return nil // 成功返回nil
}
lastErr = err // 记录错误,用于后续重试失败时返回
}
return fmt.Errorf("达到最大重试次数 (%d),最后错误: %w", maxRetries, lastErr)
}

View File

@@ -8,6 +8,7 @@ import (
"blazing/common/data/entity"
"blazing/common/data/share"
"blazing/cool"
"github.com/gogf/gf/v2/os/glog"
"github.com/panjf2000/gnet/v2"
@@ -49,7 +50,7 @@ func (s *Server) OnClose(c gnet.Conn, _ error) (action gnet.Action) {
t := v.GetPlayer()
if t != nil {
glog.Debug(context.Background(), t.UserID, "断开连接")
cool.Mainplayer.Delete(t.UserID)
share.ShareManager.DeleteUserOnline(t.UserID) //设置用户登录服务器
}

View File

@@ -5,6 +5,7 @@ import (
"blazing/common/socket/handler"
"blazing/cool"
"blazing/logic/service"
"os"
"time"
"bytes"
@@ -26,7 +27,7 @@ type Controller struct {
RPCClient struct {
Kick func(uint32) error
RegisterLogic func(uint16) error
RegisterLogic func(uint16, uint16) error
}
}
@@ -46,7 +47,12 @@ func (h *Controller) QuitSelf(a int) error {
//entity.ConutPlayer()
fmt.Println("当前在线人数", entity.ConutPlayer())
<-time.After((1000))
if entity.ConutPlayer() <= 0 {
//执行退出逻辑
os.Exit(1)
}
<-time.After((50000))
}
}()
//service.KickPlayer(uint32(a))
@@ -67,7 +73,7 @@ func parseCmd[T any](a T, data []byte) T {
func init() { //默认初始化扫描
// 解析命令行参数
cool.Config.PortBL = gcmd.GetOpt("port", "1").Uint32()
cool.Config.PortBL = gcmd.GetOpt("port", "1").Uint16()
// 获取对象的反射值和类型
value := reflect.ValueOf(Maincontroller)

View File

@@ -2,14 +2,13 @@ package controller
import (
"blazing/logic/service/server"
"fmt"
"github.com/panjf2000/gnet/v2"
)
// 处理命令: 105
func (h Controller) GetServer(data *server.SidInfo, c gnet.Conn) { //这个时候player应该是空的
t := data.Init() //初始化方法,然后可以返回默认返回值修改
fmt.Println(t)
data.Def() //初始化方法,然后可以返回默认返回值修改
//return //TODO 这里待实现改成接口调用Ret方法
}

View File

@@ -18,8 +18,9 @@ func (h *Controller) Login(data *login.LoginSidInfo, c gnet.Conn) { //这个时
service.SetPlayer(c, data.Head.UserID)
share.ShareManager.SetUserOnline(data.Head.UserID, h.Port) //设置用户登录服务器
data.Def()
}
data.Init()
//data.Def()
//return //t1, _ := hex.DecodeString("000186A6683F89CF6E69656F0000000000000000000000000008000F00000000000000000000000000000000000000000000000000000001000001DB0000018B000000000000A8C000000000000000000000000000000000000000080001388000000001000000017FFFFFFF00000000FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF03030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030000000000000000000000000000000000000064000000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100000001FFFFFFFF000000004E4F4E4F0000000000000000000000000000000000000001000000010000000100000001000000010000000100000001000000000003030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030000000100000064000000000000000000000000000000000000001F000000000000006400000000000093F4000093F4000000D5000000F7000000AD00000088000000920000008C0000009C00000000000000000000000000000000000000000000000000000004000027900000001B00004E6200000014000028380000002800004E3E0000002368493DC60000000000000000000000000000000000000000000100000000000000A937000007D1000186A600000000000186A66E69656F00000000000000000000000000000000000000000000000F0000000000000000000001DB0000018B0000000000000002000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000FFFFFFFF0000000000000001000000000000000000000000000000000000000000000000000000000000000000000000")
// return t1

View File

@@ -26,7 +26,7 @@ const (
var defaultPort = gconv.Int(cool.Config.Port) //读入默认的端口
// determinePort 确定服务器使用的端口
func determinePort(serverid uint32) (int, error) {
func determinePort(serverid uint16) (int, error) {
rand.Seed(time.Now().UnixNano())
if serverid == 0 {
return defaultPort, nil
@@ -60,7 +60,7 @@ func isPortAvailable(port int) bool {
}
// 如果id是0,那就是login server
func Start(serverid uint32) {
func Start(serverid uint16) {
//ants.NewPool(100)
head := handler.NewTomeeHandler()
head.Callback = controller.Recv
@@ -72,7 +72,7 @@ func Start(serverid uint32) {
}
// go func() {
t := rpc.StartClient(uint16(port), controller.Maincontroller)
t := rpc.StartClient(serverid, uint16(port), controller.Maincontroller)
//TODO 待实现掉线重新连接login
controller.Maincontroller.RPCClient = *t //将RPC赋值Start
controller.Maincontroller.Port = uint16(port) //赋值服务器ID

View File

@@ -16,7 +16,7 @@ type LoginSidInfo struct { //这里直接使用组合来实现将传入的原始
// ErrorPassWord uint32 `struc:"[0]pad"`
}
func (s *LoginSidInfo) Init() []byte { //默认返回方法
func (s *LoginSidInfo) Def() { //默认返回方法
t1, _ := hex.DecodeString("0000045D37000003E9000186A600000000000186A6683F89CF6E69656F0000000000000000000000000008000F00000000000000000000000000000000000000000000000000000001000001DB0000018B000000000000A8C000000000000000000000000000000000000000080001388000000001000000017FFFFFFF00000000FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF03030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030000000000000000000000000000000000000064000000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100000001FFFFFFFF000000004E4F4E4F0000000000000000000000000000000000000001000000010000000100000001000000010000000100000001000000000003030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030000000100000064000000000000000000000000000000000000001F000000000000006400000000000093F4000093F4000000D5000000F7000000AD00000088000000920000008C0000009C00000000000000000000000000000000000000000000000000000004000027900000001B00004E6200000014000028380000002800004E3E0000002368493DC60000000000000000000000000000000000000000000100000000000000A937000007D1000186A600000000000186A66E69656F00000000000000000000000000000000000000000000000F0000000000000000000001DB0000018B0000000000000002000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000FFFFFFFF0000000000000001000000000000000000000000000000000000000000000000000000000000000000000000")
//t1 = t1[17:]
fmt.Println(t1[:40])
@@ -25,7 +25,7 @@ func (s *LoginSidInfo) Init() []byte { //默认返回方法
s.Head.Result = 0
//s //return data.Head.Pack(t1[17:])[:40]
s.Head.Set(t1[17:]) //返回传参
return t1[:40]
//return t1[:40]
}
func (l *LoginSidInfo) CheakSession() bool {

View File

@@ -20,11 +20,10 @@ type SidInfo struct { //这里直接使用组合来实现将传入的原始头
// ErrorPassWord uint32 `struc:"[0]pad"`
}
func (s *SidInfo) Init() *commendSvrInfo { //默认返回方法
func (s *SidInfo) Def() { //默认返回方法
r := newCommendSvrInfo()
r.ServerList = GetServerInfoList()
s.Head.Set(r) //返回传参
return r
}
@@ -96,7 +95,7 @@ func GetServerInfoList() []ServerInfo {
for _, v := range ret {
tt := newServerInfo()
tt.OnlineID = v.OnlineID
tt.OnlineID = uint32(v.OnlineID)
// tt.UserCnt = v.UserCnt
//tt.IP = v.IP
tt.IP = ip

View File

@@ -1,11 +1,8 @@
package main
import (
"fmt"
_ "github.com/gogf/gf/contrib/nosql/redis/v2"
"blazing/common/rpc"
_ "blazing/contrib/drivers/pgsql"
_ "blazing/contrib/files/local"
@@ -33,14 +30,14 @@ func main() {
}
func kick(id int) {
// go Start(cool.Config.Port)
//go rpc()
go func() {
t := rpc.StartClient(0, &struct{}{})
err := t.Kick(1)
fmt.Println(err)
// // go Start(cool.Config.Port)
// //go rpc()
// go func() {
// t := rpc.StartClient(0, &struct{}{})
// err := t.Kick(1)
err = t.Kick(10001)
fmt.Println(err)
}()
// fmt.Println(err)
// //err := t.Kick(1)
// err = t.Kick(10001)
// fmt.Println(err)
// }()
}

View File

@@ -3,10 +3,13 @@ package admin
import (
"context"
"blazing/common/data/share"
"blazing/cool"
"blazing/modules/base/service"
blazing_service "blazing/modules/blazing/service"
"github.com/gogf/gf/v2/frame/g"
)
@@ -30,9 +33,35 @@ type UserMoveReq struct {
g.Meta `path:"/move" method:"GET"`
Authorization string `json:"Authorization" in:"header"`
}
type SessionReq struct {
g.Meta `path:"/getSession" method:"GET"`
Authorization string `json:"Authorization" in:"header"`
}
func (c *BaseSysUserController) Move(ctx context.Context, req *UserMoveReq) (res *cool.BaseRes, err error) {
err = service.NewBaseSysUserService().Move(ctx)
res = cool.Ok(nil)
return
}
func (c *BaseSysUserController) GetSession(ctx context.Context, req *SessionReq) (res *cool.BaseRes, err error) {
t := cool.GetAdmin(ctx)
if t == nil || t.UserId == 0 {
return cool.Fail("未登录"), nil
}
retsid, sid, err := blazing_service.NewLoginServiceService().GetSessionId(t.UserId)
if err != nil {
return cool.Fail(err.Error()), nil
}
res = cool.Ok(nil)
res.Data = retsid
if err := share.ShareManager.SaveSession(sid, uint32(t.UserId)); err != nil {
return cool.Fail(err.Error()), nil
}
return
}

View File

@@ -9,7 +9,7 @@ const TableNameServerList = "server_list"
// ServerList mapped from table <server_list>
type ServerList struct {
*cool.Model
OnlineID uint32 `gorm:"column:online_id;comment:'在线ID';uniqueIndex" json:"online_id"`
OnlineID uint16 `gorm:"column:online_id;comment:'在线ID';uniqueIndex" json:"online_id"`
//IP string `gorm:"type:varchar(16);comment:'服务器IP'" json:"ip"`
Port uint16 `gorm:"comment:'端口号,通常是小整数'" json:"port"`
//IsOpen bool `gorm:"default:true;not null;comment:'服务器是否开启,默认为开启状态'" json:"is_open"`

View File

@@ -50,10 +50,10 @@ func (s *LoginService) GetSessionId(accountID uint) (string, string, error) {
// /t1.
// 以上过程只需全局一次且应在生成ID之前完成。
}
func (s *LoginService) SetServerID(OnlineID uint32, Port uint16, t *struct {
func (s *LoginService) SetServerID(OnlineID uint16, Port uint16, t *struct {
Kick func(uint32) error
RegisterLogic func(uint16) error
RegisterLogic func(uint16, uint16) error
}) error {
m := cool.DBM(s.Model).Where("online_id", OnlineID)