// Package rpc wires the JSON-RPC link between the login/coordination server
// and the logic servers: the server-side handler, the client bootstrap used by
// logic servers, and the bookkeeping of reverse clients.
package rpc

import (
	"context"
	"fmt"
	"log"
	"net/url"
	"strings"
	"time"

	"blazing/common/data/share"
	"blazing/cool"
	"blazing/cool/coolconfig"
	config "blazing/modules/config/service"

	"github.com/filecoin-project/go-jsonrpc"
	"github.com/gogf/gf/v2/util/gconv"
)

// ServerHandler is the receiver whose methods are exposed over JSON-RPC by
// CServer.
type ServerHandler struct{}

const (
	// kickForwardTimeout bounds how long Kick waits for the forwarded
	// KickPerson call to return.
	kickForwardTimeout = 3 * time.Second

	// ClientCallTimeout is exported for callers; it is not referenced in this
	// file.
	ClientCallTimeout = 5 * time.Second
)

// Operational notes carried over from the original comments:
//   - If server A is force-closed and leaves a zombie online state, server B
//     can log the user in after the login server has cleaned that state up.
//   - If the login server is unavailable, server B does not let the user
//     through and still reports that the system is busy.

// isDisconnectedLogicReverseClientError reports whether err's message contains
// one of the known substrings that indicate the reverse client connection is
// gone. A nil err yields false.
func isDisconnectedLogicReverseClientError(err error) bool {
	if err == nil {
		return false
	}
	text := err.Error()
	for _, marker := range [...]string{
		"websocket routine exiting",
		"sendRequest failed",
		"closed out channel",
	} {
		if strings.Contains(text, marker) {
			return true
		}
	}
	return false
}

// Kick forwards a kick request for userid to the logic server the user is
// recorded as being online on.
//
// It returns nil when the user is already offline, when the target server has
// no registered client (the stale online marker is purged), or when the
// forwarded call succeeds. If the forwarded call fails or does not finish
// within kickForwardTimeout, the online state is re-read: the request still
// counts as successful when the user has gone offline, moved to another server,
// or the target client is no longer registered. A failure whose message marks
// the reverse client as disconnected also purges the marker and returns nil.
// Otherwise the call error (or a timeout error) is returned.
func (*ServerHandler) Kick(_ context.Context, userid uint32) error {
	serverKey, err := share.ShareManager.GetUserOnline(userid)
	if err != nil || serverKey == 0 {
		// The user was already offline when the request arrived: success.
		return nil
	}

	// purgeStale removes the zombie online marker and the client entry for the
	// target server. The delete error is deliberately ignored: best effort.
	purgeStale := func() {
		_ = share.ShareManager.DeleteUserOnline(userid)
		cool.DeleteClientOnly(serverKey)
	}

	client, ok := cool.GetClientOnly(serverKey)
	if !ok || client == nil {
		// The target server is not connected: purge and treat as success.
		purgeStale()
		return nil
	}

	// settled re-reads the state after a failed or timed-out forward and
	// reports whether the kick can be considered done anyway: the user is
	// offline, has switched server, or the target server is not registered
	// any more (in which case the stale marker is purged).
	settled := func() bool {
		current, err := share.ShareManager.GetUserOnline(userid)
		if err != nil || current == 0 || current != serverKey {
			return true
		}
		if c, ok := cool.GetClientOnly(current); !ok || c == nil {
			purgeStale()
			return true
		}
		return false
	}

	// Buffered so the goroutine can always deliver its result and exit, even
	// after the timeout branch below has returned.
	resultCh := make(chan error, 1)
	go func() {
		resultCh <- client.KickPerson(userid) // kick on the specific server
	}()

	select {
	case callErr := <-resultCh:
		if callErr == nil || settled() {
			return nil
		}
		if isDisconnectedLogicReverseClientError(callErr) {
			purgeStale()
			return nil
		}
		// The user is still online there: report the failure.
		return callErr
	case <-time.After(kickForwardTimeout):
		// The timeout only prevents waiting forever; it is not a success.
		if settled() {
			return nil
		}
		return fmt.Errorf("kick timeout, user still online: uid=%d server=%d", userid, serverKey)
	}
}

// RegisterLogic registers the calling logic server, identified by id and port,
// using the reverse client carried in ctx.
func (*ServerHandler) RegisterLogic(ctx context.Context, id, port uint32) error {
	fmt.Println("注册logic服务器", id, port)
	return registerReverseLogicClient(ctx, id, port)
}

// MatchJoinOrUpdate passes payload to the default PVP match coordinator's
// JoinOrUpdate and returns its error.
func (*ServerHandler) MatchJoinOrUpdate(_ context.Context, payload PVPMatchJoinPayload) error {
	return DefaultPVPMatchCoordinator().JoinOrUpdate(payload)
}

// MatchCancel asks the default PVP match coordinator to cancel userID. It
// always returns nil.
func (*ServerHandler) MatchCancel(_ context.Context, userID uint32) error {
	DefaultPVPMatchCoordinator().Cancel(userID)
	return nil
}

// CServer builds the JSON-RPC server: it installs setupLogicReverseClient as
// the reverse-client setup hook for cool.ClientHandler and registers a
// ServerHandler, both under the empty namespace.
func CServer() *jsonrpc.RPCServer {
	rpcServer := jsonrpc.NewServer(
		jsonrpc.WithReverseClientSetup[cool.ClientHandler]("", setupLogicReverseClient),
	)
	rpcServer.Register("", &ServerHandler{})
	return rpcServer
}

// closer holds the ClientCloser returned by the most recent StartClient call.
var closer jsonrpc.ClientCloser

// StartClient connects a logic server to the RPC endpoint and returns a
// pointer to the package-level RPCClient stub.
//
// id and port are sent as the "logic_id" and "logic_port" query parameters of
// the websocket URL; callback is installed as the client-side handler for
// reverse calls. Side effects: cool.Config.File.Domain is overwritten with
// "127.0.0.1", the package-level closer is replaced, and the process exits via
// log.Fatalf if the client cannot be created.
func StartClient(id, port uint32, callback any) *struct {
	Kick              func(context.Context, uint32) error
	RegisterLogic     func(context.Context, uint32, uint32) error
	MatchJoinOrUpdate func(context.Context, PVPMatchJoinPayload) error
	MatchCancel       func(context.Context, uint32) error
} {
	cool.Config.File.Domain = "127.0.0.1"

	endpoint := url.URL{
		Scheme: "ws",
		Host:   cool.Config.File.Domain + gconv.String(cool.Config.Address),
		Path:   "/rpc",
	}
	query := endpoint.Query()
	query.Set("logic_id", gconv.String(id))
	query.Set("logic_port", gconv.String(port))
	endpoint.RawQuery = query.Encode()

	clientCloser, err := jsonrpc.NewMergeClient(
		context.Background(),
		endpoint.String(),
		"",
		[]any{&RPCClient},
		nil,
		jsonrpc.WithClientHandler("", callback),
	)
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	closer = clientCloser

	return &RPCClient
}

// RPCClient is the client stub whose function fields are populated by
// jsonrpc.NewMergeClient in StartClient.
var RPCClient struct {
	Kick              func(context.Context, uint32) error // kick a user
	RegisterLogic     func(context.Context, uint32, uint32) error
	MatchJoinOrUpdate func(context.Context, PVPMatchJoinPayload) error
	MatchCancel       func(context.Context, uint32) error

	// UserLogin func(int32, int32) error // user login event
	// UserLogout func(int32, int32) error // user logout event
}

// setupLogicReverseClient is the reverse-client setup hook installed by
// CServer. It reads "logic_id" and "logic_port" from the websocket request's
// query string, registers the reverse client under that identity, and starts a
// goroutine that removes the client entry once ctx is done.
//
// It returns an error when the HTTP request is missing from ctx, when either
// query value converts to 0, or when registration fails. The reverse client
// argument is unused here; registration extracts it from ctx.
func setupLogicReverseClient(ctx context.Context, _ cool.ClientHandler) error {
	req, ok := jsonrpc.GetHTTPRequest(ctx)
	if !ok || req == nil {
		return fmt.Errorf("missing websocket request context")
	}

	params := req.URL.Query()
	id := gconv.Uint32(params.Get("logic_id"))
	port := gconv.Uint32(params.Get("logic_port"))
	if id == 0 || port == 0 {
		return fmt.Errorf("missing logic identity in websocket query: id=%d port=%d", id, port)
	}

	if err := registerReverseLogicClient(ctx, id, port); err != nil {
		return err
	}

	key := coolconfig.ComposeRuntimeID(id, port)
	// NOTE(review): this deletes whatever is stored under key when ctx ends.
	// If the same logic server re-registers before the old context is done,
	// the newer entry may presumably be removed as well — confirm.
	go func() {
		<-ctx.Done()
		cool.DeleteClientOnly(key)
	}()
	return nil
}

// registerReverseLogicClient extracts the reverse client from ctx and stores
// it under the runtime ID composed from id and port. If a client is already
// registered for the configured server's OnlineID and Port, that client is
// told to quit first. It returns an error when ctx carries no reverse client.
func registerReverseLogicClient(ctx context.Context, id, port uint32) error {
	// TODO: fix the bug where a rolling update may let a player be online on
	// both the old and the new server at the same time.
	revClient, ok := jsonrpc.ExtractReverseClient[cool.ClientHandler](ctx)
	if !ok {
		return fmt.Errorf("no reverse client")
	}

	server := config.NewServerService().GetServerID(id)
	existing, found := cool.GetClient(server.OnlineID, server.Port)
	if found && existing != nil {
		// A client is already stored for this server/port: ask it to quit.
		existing.QuitSelf(0)
	}

	cool.AddClient(coolconfig.ComposeRuntimeID(id, port), &revClient)
	return nil
}