feat(player): 优化玩家连接与消息发送逻辑 - 为 `Conn` 结构体新增 `Close()` 方法,并在多处替换原有的 `GetConn().Close()` 调用,统一通过 `MainConn` 操作以提高代码一致性。 - 在 `SendPack` 和 `GetPlayer` 等方法中增加互斥锁保护,确保对 `MainConn` 的访问是线程安全的。 - 移除冗余的 `GetConn()` 方法,直接使用 `MainConn` 提升性能并简化结构。 refactor(rpc): 临时将 RPC 客户端地址硬编码为本地地址 - 将 rpcaddr 临时设置为 "127.0.0.1",便于调试和开发环境测试。 refactor(handler): 扩展 JSON-RPC 方法过滤条件 - 增加了对 "Kick" 和 "RegisterLogic" 方法的排除,防止其被注册到 RPC 处理器中。 refactor(fight): 精简战斗服务中的判断逻辑 - 简化 `IsFirst` 函数返回表达式。 - 移除了多余的 defer 语句,提前设置 `closefight` 标志位。 refactor(pet): 改进宠物经验添加机制 - `AddPetExp` 方法新增布尔参数 `bro` 控制是否广播更新。 - 修改调用点以适配新参数,修复潜在的数据同步问题。 - 初始化宠物时禁止触发经验变更广播。 chore(build): 删除旧版二进制文件 logic1 - 清理无用的编译产物,避免混淆项目结构。
522 lines
14 KiB
Go
522 lines
14 KiB
Go
package jsonrpc
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"reflect"
|
|
|
|
"go.opencensus.io/stats"
|
|
"go.opencensus.io/tag"
|
|
"go.opencensus.io/trace"
|
|
"go.opencensus.io/trace/propagation"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-jsonrpc/metrics"
|
|
)
|
|
|
|
// RawParams holds the undecoded JSON "params" payload of a request. A method
// whose parameter has this type is handed the payload as-is instead of having
// each positional argument decoded for it.
type RawParams json.RawMessage

// rtRawParams is the reflect.Type of RawParams; register compares parameter
// types against it to detect raw-params methods.
var rtRawParams = reflect.TypeOf(RawParams{})

// todo is there a better way to tell 'struct with any number of fields'?

// DecodeParams unmarshals the raw JSON payload p into a new value of type T.
//
// The decoded value is returned together with whatever error json.Unmarshal
// reported; on a non-nil error the returned value must not be relied upon.
func DecodeParams[T any](p RawParams) (T, error) {
	decoded := new(T)

	// todo also handle list-encoding automagically (json.Unmarshal doesn't do that, does it?)
	unmarshalErr := json.Unmarshal(p, decoded)

	return *decoded, unmarshalErr
}
|
|
|
|
// methodHandler describes one registered RPC method: how to build its
// argument list and where to find its results.
type methodHandler struct {
	// paramReceivers holds the Go types of the RPC parameters, i.e. the
	// method's inputs after the receiver and the optional context.
	paramReceivers []reflect.Type
	// nParams is the number of RPC parameters.
	nParams int

	// receiver is the value the method is invoked on.
	receiver reflect.Value
	// handlerFunc is the method function; it takes receiver as its first argument.
	handlerFunc reflect.Value

	// hasCtx is 1 when the method takes a context as its first argument
	// after the receiver, 0 otherwise. It doubles as an index offset.
	hasCtx int
	// hasRawParams reports whether the method takes a RawParams argument.
	hasRawParams bool

	// errOut and valOut are the indices of the error and value results in
	// the method's return list; -1 means the method has no such result.
	errOut int
	valOut int
}
|
|
|
|
// Request / response
|
|
|
|
// request is the wire representation of a single JSON-RPC call.
type request struct {
	// Jsonrpc is the protocol version string sent by the client.
	Jsonrpc string `json:"jsonrpc"`
	// ID identifies the call; a request without an ID is treated as a
	// notification and gets no response.
	ID any `json:"id,omitempty"`
	// Method is the name used to look up the handler.
	Method string `json:"method"`
	// Params is the still-encoded parameter payload.
	Params json.RawMessage `json:"params"`
	// Meta carries optional out-of-band data, e.g. the "SpanContext" entry
	// used for trace propagation.
	Meta map[string]string `json:"meta,omitempty"`
}
|
|
|
|
// DEFAULT_MAX_REQUEST_SIZE is the default upper bound, in bytes, on the size
// of an incoming request. Ideally the limit would be specific to each field
// of the JSON request, but as a simple defensive measure the entire HTTP body
// is limited instead.
// Configured by WithMaxRequestSize.
const DEFAULT_MAX_REQUEST_SIZE = 100 * 1024 * 1024 // 100 MiB
|
|
|
|
type handler struct {
|
|
methods map[string]methodHandler
|
|
errors *Errors
|
|
|
|
maxRequestSize int64
|
|
|
|
// aliasedMethods contains a map of alias:original method names.
|
|
// These are used as fallbacks if a method is not found by the given method name.
|
|
aliasedMethods map[string]string
|
|
|
|
paramDecoders map[reflect.Type]ParamDecoder
|
|
|
|
methodNameFormatter MethodNameFormatter
|
|
|
|
tracer Tracer
|
|
}
|
|
|
|
type Tracer func(method string, params []reflect.Value, results []reflect.Value, err error)
|
|
|
|
func makeHandler(sc ServerConfig) *handler {
|
|
return &handler{
|
|
methods: make(map[string]methodHandler),
|
|
errors: sc.errors,
|
|
|
|
aliasedMethods: map[string]string{},
|
|
paramDecoders: sc.paramDecoders,
|
|
|
|
methodNameFormatter: sc.methodNameFormatter,
|
|
|
|
maxRequestSize: sc.maxRequestSize,
|
|
|
|
tracer: sc.tracer,
|
|
}
|
|
}
|
|
|
|
// Register
|
|
|
|
func (s *handler) register(namespace string, r interface{}) {
|
|
val := reflect.ValueOf(r)
|
|
// TODO: expect ptr
|
|
|
|
for i := 0; i < val.NumMethod(); i++ {
|
|
method := val.Type().Method(i)
|
|
//fmt.Println("method: %s", method.Name)
|
|
if method.Name != "KickPerson" &&
|
|
method.Name != "QuitSelf" &&
|
|
method.Name != "Kick" &&
|
|
method.Name != "RegisterLogic" {
|
|
continue
|
|
|
|
}
|
|
|
|
funcType := method.Func.Type()
|
|
hasCtx := 0
|
|
if funcType.NumIn() >= 2 && funcType.In(1) == contextType {
|
|
hasCtx = 1
|
|
}
|
|
|
|
hasRawParams := false
|
|
ins := funcType.NumIn() - 1 - hasCtx
|
|
recvs := make([]reflect.Type, ins)
|
|
for i := 0; i < ins; i++ {
|
|
if hasRawParams && i > 0 {
|
|
panic("raw params must be the last parameter")
|
|
}
|
|
if funcType.In(i+1+hasCtx) == rtRawParams {
|
|
hasRawParams = true
|
|
}
|
|
recvs[i] = method.Type.In(i + 1 + hasCtx)
|
|
}
|
|
|
|
valOut, errOut, _ := processFuncOut(funcType)
|
|
|
|
s.methods[s.methodNameFormatter(namespace, method.Name)] = methodHandler{
|
|
paramReceivers: recvs,
|
|
nParams: ins,
|
|
|
|
handlerFunc: method.Func,
|
|
receiver: val,
|
|
|
|
hasCtx: hasCtx,
|
|
hasRawParams: hasRawParams,
|
|
|
|
errOut: errOut,
|
|
valOut: valOut,
|
|
}
|
|
}
|
|
}
|
|
|
|
// Handle
|
|
|
|
type rpcErrFunc func(w func(func(io.Writer)), req *request, code ErrorCode, err error)
|
|
type chanOut func(reflect.Value, interface{}) error
|
|
|
|
func (s *handler) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
|
|
wf := func(cb func(io.Writer)) {
|
|
cb(w)
|
|
}
|
|
|
|
// We read the entire request upfront in a buffer to be able to tell if the
|
|
// client sent more than maxRequestSize and report it back as an explicit error,
|
|
// instead of just silently truncating it and reporting a more vague parsing
|
|
// error.
|
|
bufferedRequest := new(bytes.Buffer)
|
|
// We use LimitReader to enforce maxRequestSize. Since it won't return an
|
|
// EOF we can't actually know if the client sent more than the maximum or
|
|
// not, so we read one byte more over the limit to explicitly query that.
|
|
// FIXME: Maybe there's a cleaner way to do this.
|
|
reqSize, err := bufferedRequest.ReadFrom(io.LimitReader(r, s.maxRequestSize+1))
|
|
if err != nil {
|
|
// ReadFrom will discard EOF so any error here is unexpected and should
|
|
// be reported.
|
|
rpcError(wf, nil, rpcParseError, xerrors.Errorf("reading request: %w", err))
|
|
return
|
|
}
|
|
if reqSize > s.maxRequestSize {
|
|
rpcError(wf, nil, rpcParseError,
|
|
// rpcParseError is the closest we have from the standard errors defined
|
|
// in [jsonrpc spec](https://www.jsonrpc.org/specification#error_object)
|
|
// to report the maximum limit.
|
|
xerrors.Errorf("request bigger than maximum %d allowed",
|
|
s.maxRequestSize))
|
|
return
|
|
}
|
|
|
|
// Trim spaces to avoid issues with batch request detection.
|
|
bufferedRequest = bytes.NewBuffer(bytes.TrimSpace(bufferedRequest.Bytes()))
|
|
reqSize = int64(bufferedRequest.Len())
|
|
|
|
if reqSize == 0 {
|
|
rpcError(wf, nil, rpcInvalidRequest, xerrors.New("Invalid request"))
|
|
return
|
|
}
|
|
|
|
if bufferedRequest.Bytes()[0] == '[' && bufferedRequest.Bytes()[reqSize-1] == ']' {
|
|
var reqs []request
|
|
|
|
if err := json.NewDecoder(bufferedRequest).Decode(&reqs); err != nil {
|
|
rpcError(wf, nil, rpcParseError, xerrors.New("Parse error"))
|
|
return
|
|
}
|
|
|
|
if len(reqs) == 0 {
|
|
rpcError(wf, nil, rpcInvalidRequest, xerrors.New("Invalid request"))
|
|
return
|
|
}
|
|
|
|
_, _ = w.Write([]byte("[")) // todo consider handling this error
|
|
for idx, req := range reqs {
|
|
if req.ID, err = normalizeID(req.ID); err != nil {
|
|
rpcError(wf, &req, rpcParseError, xerrors.Errorf("failed to parse ID: %w", err))
|
|
return
|
|
}
|
|
|
|
s.handle(ctx, req, wf, rpcError, func(bool) {}, nil)
|
|
|
|
if idx != len(reqs)-1 {
|
|
_, _ = w.Write([]byte(",")) // todo consider handling this error
|
|
}
|
|
}
|
|
_, _ = w.Write([]byte("]")) // todo consider handling this error
|
|
} else {
|
|
var req request
|
|
if err := json.NewDecoder(bufferedRequest).Decode(&req); err != nil {
|
|
rpcError(wf, &req, rpcParseError, xerrors.New("Parse error"))
|
|
return
|
|
}
|
|
|
|
if req.ID, err = normalizeID(req.ID); err != nil {
|
|
rpcError(wf, &req, rpcParseError, xerrors.Errorf("failed to parse ID: %w", err))
|
|
return
|
|
}
|
|
|
|
s.handle(ctx, req, wf, rpcError, func(bool) {}, nil)
|
|
}
|
|
}
|
|
|
|
func doCall(methodName string, f reflect.Value, params []reflect.Value) (out []reflect.Value, err error) {
|
|
defer func() {
|
|
if i := recover(); i != nil {
|
|
err = xerrors.Errorf("panic in rpc method '%s': %s", methodName, i)
|
|
log.Desugar().WithOptions(zap.AddStacktrace(zapcore.ErrorLevel)).Sugar().Error(err)
|
|
}
|
|
}()
|
|
|
|
out = f.Call(params)
|
|
return out, nil
|
|
}
|
|
|
|
func (s *handler) getSpan(ctx context.Context, req request) (context.Context, *trace.Span) {
|
|
if req.Meta == nil {
|
|
return ctx, nil
|
|
}
|
|
|
|
var span *trace.Span
|
|
if eSC, ok := req.Meta["SpanContext"]; ok {
|
|
bSC := make([]byte, base64.StdEncoding.DecodedLen(len(eSC)))
|
|
_, err := base64.StdEncoding.Decode(bSC, []byte(eSC))
|
|
if err != nil {
|
|
log.Errorf("SpanContext: decode", "error", err)
|
|
return ctx, nil
|
|
}
|
|
sc, ok := propagation.FromBinary(bSC)
|
|
if !ok {
|
|
log.Errorf("SpanContext: could not create span", "data", bSC)
|
|
return ctx, nil
|
|
}
|
|
ctx, span = trace.StartSpanWithRemoteParent(ctx, "api.handle", sc)
|
|
} else {
|
|
ctx, span = trace.StartSpan(ctx, "api.handle")
|
|
}
|
|
|
|
span.AddAttributes(trace.StringAttribute("method", req.Method))
|
|
return ctx, span
|
|
}
|
|
|
|
func (s *handler) createError(err error) *JSONRPCError {
|
|
var code ErrorCode = 1
|
|
if s.errors != nil {
|
|
c, ok := s.errors.byType[reflect.TypeOf(err)]
|
|
if ok {
|
|
code = c
|
|
}
|
|
}
|
|
|
|
out := &JSONRPCError{
|
|
Code: code,
|
|
Message: err.Error(),
|
|
}
|
|
|
|
switch m := err.(type) {
|
|
case RPCErrorCodec:
|
|
o, err := m.ToJSONRPCError()
|
|
if err != nil {
|
|
log.Errorf("Failed to convert error to JSONRPCError: %w", err)
|
|
} else {
|
|
out = &o
|
|
}
|
|
case marshalable:
|
|
meta, marshalErr := m.MarshalJSON()
|
|
if marshalErr == nil {
|
|
out.Meta = meta
|
|
} else {
|
|
log.Errorf("Failed to marshal error metadata: %w", marshalErr)
|
|
}
|
|
}
|
|
|
|
return out
|
|
}
|
|
|
|
func (s *handler) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func(keepCtx bool), chOut chanOut) {
|
|
// Not sure if we need to sanitize the incoming req.Method or not.
|
|
ctx, span := s.getSpan(ctx, req)
|
|
ctx, _ = tag.New(ctx, tag.Insert(metrics.RPCMethod, req.Method))
|
|
defer span.End()
|
|
|
|
handler, ok := s.methods[req.Method]
|
|
if !ok {
|
|
aliasTo, ok := s.aliasedMethods[req.Method]
|
|
if ok {
|
|
handler, ok = s.methods[aliasTo]
|
|
}
|
|
if !ok {
|
|
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method))
|
|
stats.Record(ctx, metrics.RPCInvalidMethod.M(1))
|
|
done(false)
|
|
return
|
|
}
|
|
}
|
|
|
|
outCh := handler.valOut != -1 && handler.handlerFunc.Type().Out(handler.valOut).Kind() == reflect.Chan
|
|
defer done(outCh)
|
|
|
|
if chOut == nil && outCh {
|
|
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not supported in this mode (no out channel support)", req.Method))
|
|
stats.Record(ctx, metrics.RPCRequestError.M(1))
|
|
return
|
|
}
|
|
|
|
callParams := make([]reflect.Value, 1+handler.hasCtx+handler.nParams)
|
|
callParams[0] = handler.receiver
|
|
if handler.hasCtx == 1 {
|
|
callParams[1] = reflect.ValueOf(ctx)
|
|
}
|
|
|
|
if handler.hasRawParams {
|
|
// When hasRawParams is true, there is only one parameter and it is a
|
|
// json.RawMessage.
|
|
|
|
callParams[1+handler.hasCtx] = reflect.ValueOf(RawParams(req.Params))
|
|
} else {
|
|
// "normal" param list; no good way to do named params in Golang
|
|
|
|
var ps []param
|
|
if len(req.Params) > 0 {
|
|
err := json.Unmarshal(req.Params, &ps)
|
|
if err != nil {
|
|
rpcError(w, &req, rpcParseError, xerrors.Errorf("unmarshaling param array: %w", err))
|
|
stats.Record(ctx, metrics.RPCRequestError.M(1))
|
|
return
|
|
}
|
|
}
|
|
|
|
if len(ps) != handler.nParams {
|
|
rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count (method '%s'): %d != %d", req.Method, len(ps), handler.nParams))
|
|
stats.Record(ctx, metrics.RPCRequestError.M(1))
|
|
done(false)
|
|
return
|
|
}
|
|
|
|
for i := 0; i < handler.nParams; i++ {
|
|
var rp reflect.Value
|
|
|
|
typ := handler.paramReceivers[i]
|
|
dec, found := s.paramDecoders[typ]
|
|
if !found {
|
|
rp = reflect.New(typ)
|
|
if err := json.NewDecoder(bytes.NewReader(ps[i].data)).Decode(rp.Interface()); err != nil {
|
|
rpcError(w, &req, rpcParseError, xerrors.Errorf("unmarshaling params for '%s' (param: %T): %w", req.Method, rp.Interface(), err))
|
|
stats.Record(ctx, metrics.RPCRequestError.M(1))
|
|
return
|
|
}
|
|
rp = rp.Elem()
|
|
} else {
|
|
var err error
|
|
rp, err = dec(ctx, ps[i].data)
|
|
if err != nil {
|
|
rpcError(w, &req, rpcParseError, xerrors.Errorf("decoding params for '%s' (param: %d; custom decoder): %w", req.Method, i, err))
|
|
stats.Record(ctx, metrics.RPCRequestError.M(1))
|
|
return
|
|
}
|
|
}
|
|
|
|
callParams[i+1+handler.hasCtx] = reflect.ValueOf(rp.Interface())
|
|
}
|
|
}
|
|
|
|
// /////////////////
|
|
|
|
callResult, err := doCall(req.Method, handler.handlerFunc, callParams)
|
|
if err != nil {
|
|
rpcError(w, &req, 0, xerrors.Errorf("fatal error calling '%s': %w", req.Method, err))
|
|
stats.Record(ctx, metrics.RPCRequestError.M(1))
|
|
if s.tracer != nil {
|
|
s.tracer(req.Method, callParams, nil, err)
|
|
}
|
|
return
|
|
}
|
|
if req.ID == nil {
|
|
return // notification
|
|
}
|
|
|
|
if s.tracer != nil {
|
|
s.tracer(req.Method, callParams, callResult, nil)
|
|
}
|
|
// /////////////////
|
|
|
|
resp := response{
|
|
Jsonrpc: "2.0",
|
|
ID: req.ID,
|
|
}
|
|
|
|
if handler.errOut != -1 {
|
|
err := callResult[handler.errOut].Interface()
|
|
if err != nil {
|
|
log.Warnf("error in RPC call to '%s': %+v", req.Method, err)
|
|
stats.Record(ctx, metrics.RPCResponseError.M(1))
|
|
|
|
resp.Error = s.createError(err.(error))
|
|
}
|
|
}
|
|
|
|
var kind reflect.Kind
|
|
var res interface{}
|
|
var nonZero bool
|
|
if handler.valOut != -1 {
|
|
res = callResult[handler.valOut].Interface()
|
|
kind = callResult[handler.valOut].Kind()
|
|
nonZero = !callResult[handler.valOut].IsZero()
|
|
}
|
|
|
|
// check error as JSON-RPC spec prohibits error and value at the same time
|
|
if resp.Error == nil {
|
|
if res != nil && kind == reflect.Chan {
|
|
// Channel responses are sent from channel control goroutine.
|
|
// Sending responses here could cause deadlocks on writeLk, or allow
|
|
// sending channel messages before this rpc call returns
|
|
|
|
//noinspection GoNilness // already checked above
|
|
err = chOut(callResult[handler.valOut], req.ID)
|
|
if err == nil {
|
|
return // channel goroutine handles responding
|
|
}
|
|
|
|
log.Warnf("failed to setup channel in RPC call to '%s': %+v", req.Method, err)
|
|
stats.Record(ctx, metrics.RPCResponseError.M(1))
|
|
|
|
resp.Error = &JSONRPCError{
|
|
Code: 1,
|
|
Message: err.Error(),
|
|
}
|
|
} else {
|
|
resp.Result = res
|
|
}
|
|
}
|
|
if resp.Error != nil && nonZero {
|
|
log.Errorw("error and res returned", "request", req, "r.err", resp.Error, "res", res)
|
|
}
|
|
|
|
withLazyWriter(w, func(w io.Writer) {
|
|
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
|
log.Error(err)
|
|
stats.Record(ctx, metrics.RPCResponseError.M(1))
|
|
return
|
|
}
|
|
})
|
|
}
|
|
|
|
// withLazyWriter makes it possible to defer acquiring a writer until the first write.
// This is useful because json.Encode needs to marshal the response fully before writing, which may be
// a problem for very large responses.
//
// cb is called with a writer that obtains the real writer from withWriterFunc
// on its first Write. If cb never writes, withWriterFunc is never invoked.
// The acquired writer is held until cb returns.
func withLazyWriter(withWriterFunc func(func(io.Writer)), cb func(io.Writer)) {
	lw := &lazyWriter{
		withWriterFunc: withWriterFunc,

		done: make(chan struct{}),
	}

	// Closing done lets the acquiring goroutine (if any) return from the
	// writer callback and thereby release the writer.
	defer close(lw.done)
	cb(lw)
}

// lazyWriter is an io.Writer that acquires its underlying writer on first use.
type lazyWriter struct {
	// withWriterFunc provides the underlying writer to the callback it is
	// given; the writer is only used while that callback is running.
	withWriterFunc func(func(io.Writer))

	// w is the acquired writer; nil until the first successful acquisition.
	w io.Writer
	// done is closed once the user of the lazyWriter has finished writing.
	done chan struct{}
}

// Write acquires the underlying writer on first use and forwards p to it.
//
// The writer is acquired on a separate goroutine which stays inside
// withWriterFunc's callback until done is closed, keeping the writer valid
// for subsequent writes. If withWriterFunc returns without ever invoking its
// callback, Write returns an error instead of blocking forever.
func (lw *lazyWriter) Write(p []byte) (n int, err error) {
	if lw.w == nil {
		// Exactly one value is sent per acquisition attempt: true once the
		// writer has been stored, false if none was provided. The buffer
		// keeps the sender from ever blocking on this channel.
		ready := make(chan bool, 1)
		go func() {
			provided := false
			lw.withWriterFunc(func(w io.Writer) {
				lw.w = w
				provided = true
				ready <- true
				<-lw.done
			})
			if !provided {
				ready <- false
			}
		}()

		if !<-ready {
			return 0, fmt.Errorf("lazy writer: no writer was provided")
		}
	}

	return lw.w.Write(p)
}
|