Files
bl/common/serialize/go-jsonrpc/handler.go
昔念 83ecb90baf refactor(project): 重构项目并更新依赖
- 更新 README.md 中的项目结构说明
- 添加 pprof 性能分析工具的使用说明
- 更新 build.bat 文件,增加 proto 文件编译命令
- 升级 go-logr/logr 依赖至 v1.3.0
- 降级 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc 依赖至 v1.16.0
- 降级 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp 依赖至 v1.16.0
- 升级 go.opentelemetry.io/otel/trace 依赖至 v1.20.0
- 移除 logic/main.go 中的冗余代码
- 重构 logic/server.go 中的 Start 函数
- 更新 login/main.go 文件
2025-07-06 17:05:10 +08:00

514 lines
14 KiB
Go

package jsonrpc
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"reflect"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
"go.opencensus.io/trace/propagation"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-jsonrpc/metrics"
)
// RawParams holds the still-encoded JSON of a request's "params" field. It
// shares its underlying byte-slice representation with json.RawMessage.
type RawParams json.RawMessage

// rtRawParams is the reflect.Type of RawParams, cached so that method
// signatures can be compared against it.
var rtRawParams = reflect.TypeOf(RawParams(nil))

// todo is there a better way to tell 'struct with any number of fields'?

// DecodeParams unmarshals the raw JSON in p into a freshly declared value of
// type T and returns it together with any error reported by json.Unmarshal.
// On error the returned value is whatever json.Unmarshal left in it.
func DecodeParams[T any](p RawParams) (T, error) {
	var decoded T
	// todo also handle list-encoding automagically (json.Unmarshal doesn't do that, does it?)
	if err := json.Unmarshal([]byte(p), &decoded); err != nil {
		return decoded, err
	}
	return decoded, nil
}
// methodHandler is a handler for a single method.
// It stores everything register learned about the method's signature via
// reflection so that handle can build the argument list and interpret results.
type methodHandler struct {
// paramReceivers holds the types of the request-supplied parameters, in
// order, excluding the receiver and the optional leading context argument.
paramReceivers []reflect.Type
// nParams is the number of request-supplied parameters (len of paramReceivers).
nParams int
// receiver is the registered object; it is passed as the first call argument.
receiver reflect.Value
// handlerFunc is the method's function value, taking the receiver as its
// first argument.
handlerFunc reflect.Value
// hasCtx is 1 when the first argument after the receiver is contextType,
// otherwise 0. It is an int because it is used as an index offset.
hasCtx int
// hasRawParams is true when a parameter of type RawParams was found; handle
// then passes the request params through undecoded.
hasRawParams bool
// errOut and valOut are the result indices reported by processFuncOut for
// the error and value results; handle treats -1 as "no such result".
errOut int
valOut int
}
// Request / response

// request is the decoded form of one incoming JSON-RPC request object.
type request struct {
// Jsonrpc is the protocol version string sent by the client.
Jsonrpc string `json:"jsonrpc"`
// ID identifies the request; handle treats a nil ID as a notification and
// writes no result for it.
ID interface{} `json:"id,omitempty"`
// Method is the name used to look up the registered method handler.
Method string `json:"method"`
// Params is kept raw and decoded later according to the method's signature.
Params json.RawMessage `json:"params"`
// Meta carries optional string metadata; getSpan reads the "SpanContext"
// key from it.
Meta map[string]string `json:"meta,omitempty"`
}
// DEFAULT_MAX_REQUEST_SIZE is the default limit on request size, in bytes.
// Ideally this limit should be specific for each field in the JSON request
// but as a simple defensive measure we just limit the entire HTTP body.
// Configured by WithMaxRequestSize.
const DEFAULT_MAX_REQUEST_SIZE = 100 << 20 // 100 MiB
// handler holds the registered methods and the configuration needed to
// decode requests, dispatch them and encode responses.
type handler struct {
// methods maps the formatted method name to its handler.
methods map[string]methodHandler
// errors optionally maps error types to error codes; it may be nil.
errors *Errors
// maxRequestSize is the largest request, in bytes, handleReader accepts.
maxRequestSize int64
// aliasedMethods contains a map of alias:original method names.
// These are used as fallbacks if a method is not found by the given method name.
aliasedMethods map[string]string
// paramDecoders holds custom decoders keyed by parameter type; parameters
// without an entry are decoded with encoding/json.
paramDecoders map[reflect.Type]ParamDecoder
// methodNameFormatter builds the methods key from namespace and method name.
methodNameFormatter MethodNameFormatter
// tracer, when non-nil, is invoked by handle after a method call.
tracer Tracer
}
// Tracer is an optional callback invoked by handle after a method call.
// It receives the method name and the full argument list passed to the call
// (including the receiver). When the call itself failed, results is nil and
// err is set; otherwise results holds the returned values and err is nil.
type Tracer func(method string, params []reflect.Value, results []reflect.Value, err error)
// makeHandler builds a handler from the given server configuration. The
// method and alias tables start out empty; every other setting is copied
// from sc unchanged.
func makeHandler(sc ServerConfig) *handler {
	h := new(handler)

	// Fresh, empty lookup tables.
	h.methods = map[string]methodHandler{}
	h.aliasedMethods = make(map[string]string)

	// Settings taken over from the server configuration.
	h.errors = sc.errors
	h.maxRequestSize = sc.maxRequestSize
	h.paramDecoders = sc.paramDecoders
	h.methodNameFormatter = sc.methodNameFormatter
	h.tracer = sc.tracer

	return h
}
// Register

// register inspects every method in r's reflected method set and stores a
// methodHandler for it under the name produced by methodNameFormatter for the
// given namespace. It panics if a method declares a further parameter after a
// RawParams parameter.
func (s *handler) register(namespace string, r interface{}) {
	recv := reflect.ValueOf(r)
	// TODO: expect ptr
	for m, total := 0, recv.NumMethod(); m < total; m++ {
		meth := recv.Type().Method(m)
		sig := meth.Func.Type()

		// Input 0 is the receiver. An input of contextType directly after it
		// is not counted as a request parameter.
		ctxSlots := 0
		if sig.NumIn() >= 2 && sig.In(1) == contextType {
			ctxSlots = 1
		}
		firstParam := 1 + ctxSlots
		paramCount := sig.NumIn() - firstParam

		rawSeen := false
		paramTypes := make([]reflect.Type, 0, paramCount)
		for p := 0; p < paramCount; p++ {
			// rawSeen can only have been set by an earlier iteration, so
			// getting here with it set means a parameter follows RawParams.
			if rawSeen {
				panic("raw params must be the last parameter")
			}
			in := sig.In(firstParam + p)
			if in == rtRawParams {
				rawSeen = true
			}
			paramTypes = append(paramTypes, in)
		}

		valIdx, errIdx, _ := processFuncOut(sig)

		name := s.methodNameFormatter(namespace, meth.Name)
		s.methods[name] = methodHandler{
			receiver:       recv,
			handlerFunc:    meth.Func,
			hasCtx:         ctxSlots,
			hasRawParams:   rawSeen,
			nParams:        paramCount,
			paramReceivers: paramTypes,
			valOut:         valIdx,
			errOut:         errIdx,
		}
	}
}
// Handle

// rpcErrFunc reports an error for a request. w provides access to the output
// writer, req is the offending request (nil when no request could be decoded),
// code is the error code and err the underlying error.
type rpcErrFunc func(w func(func(io.Writer)), req *request, code ErrorCode, err error)
// chanOut takes over a channel-valued method result. It receives the channel
// and the request ID; when it returns nil, handle writes no response itself.
type chanOut func(reflect.Value, interface{}) error
// handleReader reads one complete request body from r, enforces
// maxRequestSize, and dispatches either a single request or a batch (a JSON
// array of requests) to handle. Responses and errors are written to w;
// failures before dispatch are reported through rpcError.
func (s *handler) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
// wf hands the plain writer to callbacks; no locking or wrapping happens here.
wf := func(cb func(io.Writer)) {
cb(w)
}
// We read the entire request upfront in a buffer to be able to tell if the
// client sent more than maxRequestSize and report it back as an explicit error,
// instead of just silently truncating it and reporting a more vague parsing
// error.
bufferedRequest := new(bytes.Buffer)
// We use LimitReader to enforce maxRequestSize. Since it won't return an
// EOF we can't actually know if the client sent more than the maximum or
// not, so we read one byte more over the limit to explicitly query that.
// FIXME: Maybe there's a cleaner way to do this.
reqSize, err := bufferedRequest.ReadFrom(io.LimitReader(r, s.maxRequestSize+1))
if err != nil {
// ReadFrom will discard EOF so any error here is unexpected and should
// be reported.
rpcError(wf, nil, rpcParseError, xerrors.Errorf("reading request: %w", err))
return
}
if reqSize > s.maxRequestSize {
rpcError(wf, nil, rpcParseError,
// rpcParseError is the closest we have from the standard errors defined
// in [jsonrpc spec](https://www.jsonrpc.org/specification#error_object)
// to report the maximum limit.
xerrors.Errorf("request bigger than maximum %d allowed",
s.maxRequestSize))
return
}
// Trim spaces to avoid issues with batch request detection.
bufferedRequest = bytes.NewBuffer(bytes.TrimSpace(bufferedRequest.Bytes()))
reqSize = int64(bufferedRequest.Len())
if reqSize == 0 {
rpcError(wf, nil, rpcInvalidRequest, xerrors.New("Invalid request"))
return
}
// A body that starts with '[' and ends with ']' is treated as a batch.
if bufferedRequest.Bytes()[0] == '[' && bufferedRequest.Bytes()[reqSize-1] == ']' {
var reqs []request
if err := json.NewDecoder(bufferedRequest).Decode(&reqs); err != nil {
rpcError(wf, nil, rpcParseError, xerrors.New("Parse error"))
return
}
if len(reqs) == 0 {
rpcError(wf, nil, rpcInvalidRequest, xerrors.New("Invalid request"))
return
}
// The batch response is assembled by hand: "[", each element's output
// separated by ",", then "]".
_, _ = w.Write([]byte("[")) // todo consider handling this error
for idx, req := range reqs {
// NOTE(review): returning here leaves the already written "[" without a
// closing "]".
if req.ID, err = normalizeID(req.ID); err != nil {
rpcError(wf, &req, rpcParseError, xerrors.Errorf("failed to parse ID: %w", err))
return
}
// Batches are handled sequentially, without channel support (chOut is nil).
s.handle(ctx, req, wf, rpcError, func(bool) {}, nil)
// NOTE(review): the separator is written even when handle produced no
// output (it returns without writing for requests whose ID is nil), so
// a batch containing such a request can yield malformed JSON.
if idx != len(reqs)-1 {
_, _ = w.Write([]byte(",")) // todo consider handling this error
}
}
_, _ = w.Write([]byte("]")) // todo consider handling this error
} else {
// Single request.
var req request
if err := json.NewDecoder(bufferedRequest).Decode(&req); err != nil {
rpcError(wf, &req, rpcParseError, xerrors.New("Parse error"))
return
}
if req.ID, err = normalizeID(req.ID); err != nil {
rpcError(wf, &req, rpcParseError, xerrors.Errorf("failed to parse ID: %w", err))
return
}
s.handle(ctx, req, wf, rpcError, func(bool) {}, nil)
}
}
// doCall invokes f with params through reflection and converts a panic inside
// the called method into an error instead of letting it propagate.
//
// On a normal return it yields the method's results and a nil error. If the
// method panics, out is nil and err describes the panic; the error is also
// logged together with a stack trace.
func doCall(methodName string, f reflect.Value, params []reflect.Value) (out []reflect.Value, err error) {
	defer func() {
		if i := recover(); i != nil {
			// %v rather than %s: the recovered value can be of any type, and
			// %s renders non-string values (e.g. panic(42)) as "%!s(int=42)".
			err = xerrors.Errorf("panic in rpc method '%s': %v", methodName, i)
			log.Desugar().WithOptions(zap.AddStacktrace(zapcore.ErrorLevel)).Sugar().Error(err)
		}
	}()
	out = f.Call(params)
	return out, nil
}
// getSpan starts the "api.handle" trace span for req and returns the context
// carrying it together with the span itself.
//
// If req has no Meta, or its "SpanContext" entry cannot be decoded, the input
// context is returned unchanged with a nil span. When Meta contains a
// base64-encoded binary span context under "SpanContext", the new span is
// started with that remote parent; otherwise a regular span is started. The
// request method is attached to the span as the "method" attribute.
func (s *handler) getSpan(ctx context.Context, req request) (context.Context, *trace.Span) {
	if req.Meta == nil {
		return ctx, nil
	}

	var span *trace.Span
	if eSC, ok := req.Meta["SpanContext"]; ok {
		// DecodeString returns exactly the decoded bytes, so no zero padding
		// from an over-sized buffer is handed to FromBinary or logged.
		bSC, err := base64.StdEncoding.DecodeString(eSC)
		if err != nil {
			// Errorw, not Errorf: the arguments are key/value pairs, not
			// operands for format verbs.
			log.Errorw("SpanContext: decode", "error", err)
			return ctx, nil
		}
		sc, ok := propagation.FromBinary(bSC)
		if !ok {
			log.Errorw("SpanContext: could not create span", "data", bSC)
			return ctx, nil
		}
		ctx, span = trace.StartSpanWithRemoteParent(ctx, "api.handle", sc)
	} else {
		ctx, span = trace.StartSpan(ctx, "api.handle")
	}

	span.AddAttributes(trace.StringAttribute("method", req.Method))
	return ctx, span
}
// createError converts err, which must be non-nil, into the JSONRPCError that
// is sent to the client.
//
// The code defaults to 1 and is replaced by the code registered for err's
// dynamic type when s.errors has one. The message is err.Error(). If err
// implements RPCErrorCodec, its own conversion replaces the whole result
// (unless that conversion fails, in which case the default is kept and the
// failure is logged). Otherwise, if err is marshalable, its JSON encoding is
// attached as Meta.
func (s *handler) createError(err error) *JSONRPCError {
	var code ErrorCode = 1
	if s.errors != nil {
		if c, ok := s.errors.byType[reflect.TypeOf(err)]; ok {
			code = c
		}
	}

	out := &JSONRPCError{
		Code:    code,
		Message: err.Error(),
	}

	switch m := err.(type) {
	case RPCErrorCodec:
		o, convErr := m.ToJSONRPCError()
		if convErr != nil {
			// %v, not %w: the logger formats Sprintf-style, where %w is not
			// a valid verb and would print as "%!w(...)".
			log.Errorf("Failed to convert error to JSONRPCError: %v", convErr)
		} else {
			out = &o
		}
	case marshalable:
		meta, marshalErr := m.MarshalJSON()
		if marshalErr == nil {
			out.Meta = meta
		} else {
			log.Errorf("Failed to marshal error metadata: %v", marshalErr)
		}
	}

	return out
}
// handle processes a single decoded request: it resolves the method, decodes
// the parameters, invokes the method and writes the response.
//
// w provides access to the response writer, rpcError reports failures, done is
// called when handling finishes (its argument says whether the request context
// should be kept alive), and chOut, when non-nil, takes over channel-valued
// results.
func (s *handler) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func(keepCtx bool), chOut chanOut) {
// Not sure if we need to sanitize the incoming req.Method or not.
ctx, span := s.getSpan(ctx, req)
ctx, _ = tag.New(ctx, tag.Insert(metrics.RPCMethod, req.Method))
// NOTE(review): getSpan returns a nil span when req.Meta is nil or the span
// context cannot be decoded, so this relies on (*trace.Span).End tolerating
// a nil receiver — confirm against the opencensus version in use.
defer span.End()
// Look the method up by name, falling back to the alias table.
handler, ok := s.methods[req.Method]
if !ok {
aliasTo, ok := s.aliasedMethods[req.Method]
if ok {
handler, ok = s.methods[aliasTo]
}
if !ok {
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method))
stats.Record(ctx, metrics.RPCInvalidMethod.M(1))
done(false)
return
}
}
// outCh is true when the method has a value result whose type is a channel.
outCh := handler.valOut != -1 && handler.handlerFunc.Type().Out(handler.valOut).Kind() == reflect.Chan
// From here on done runs on every exit path, with keepCtx set to outCh.
defer done(outCh)
if chOut == nil && outCh {
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not supported in this mode (no out channel support)", req.Method))
stats.Record(ctx, metrics.RPCRequestError.M(1))
return
}
// Argument layout: receiver, then ctx if the method takes one, then the
// request-supplied parameters.
callParams := make([]reflect.Value, 1+handler.hasCtx+handler.nParams)
callParams[0] = handler.receiver
if handler.hasCtx == 1 {
callParams[1] = reflect.ValueOf(ctx)
}
if handler.hasRawParams {
// When hasRawParams is true, there is only one parameter and it is a
// json.RawMessage.
callParams[1+handler.hasCtx] = reflect.ValueOf(RawParams(req.Params))
} else {
// "normal" param list; no good way to do named params in Golang
var ps []param
if len(req.Params) > 0 {
err := json.Unmarshal(req.Params, &ps)
if err != nil {
rpcError(w, &req, rpcParseError, xerrors.Errorf("unmarshaling param array: %w", err))
stats.Record(ctx, metrics.RPCRequestError.M(1))
return
}
}
if len(ps) != handler.nParams {
rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count (method '%s'): %d != %d", req.Method, len(ps), handler.nParams))
stats.Record(ctx, metrics.RPCRequestError.M(1))
// NOTE(review): the deferred done(outCh) above also runs on this path, so
// done is invoked twice here.
done(false)
return
}
// Decode each parameter, using a custom decoder registered for its type
// when there is one and encoding/json otherwise.
for i := 0; i < handler.nParams; i++ {
var rp reflect.Value
typ := handler.paramReceivers[i]
dec, found := s.paramDecoders[typ]
if !found {
// Decode into a new pointer of the parameter type, then dereference.
rp = reflect.New(typ)
if err := json.NewDecoder(bytes.NewReader(ps[i].data)).Decode(rp.Interface()); err != nil {
rpcError(w, &req, rpcParseError, xerrors.Errorf("unmarshaling params for '%s' (param: %T): %w", req.Method, rp.Interface(), err))
stats.Record(ctx, metrics.RPCRequestError.M(1))
return
}
rp = rp.Elem()
} else {
var err error
rp, err = dec(ctx, ps[i].data)
if err != nil {
rpcError(w, &req, rpcParseError, xerrors.Errorf("decoding params for '%s' (param: %d; custom decoder): %w", req.Method, i, err))
stats.Record(ctx, metrics.RPCRequestError.M(1))
return
}
}
callParams[i+1+handler.hasCtx] = reflect.ValueOf(rp.Interface())
}
}
// /////////////////
// Invoke the method. A non-nil err here means the call itself failed (doCall
// reports a panic inside the method this way), not that the method returned
// an error value.
callResult, err := doCall(req.Method, handler.handlerFunc, callParams)
if err != nil {
rpcError(w, &req, 0, xerrors.Errorf("fatal error calling '%s': %w", req.Method, err))
stats.Record(ctx, metrics.RPCRequestError.M(1))
if s.tracer != nil {
s.tracer(req.Method, callParams, nil, err)
}
return
}
// Requests without an ID get no response; note that the tracer below is
// not invoked for them either.
if req.ID == nil {
return // notification
}
if s.tracer != nil {
s.tracer(req.Method, callParams, callResult, nil)
}
// /////////////////
resp := response{
Jsonrpc: "2.0",
ID: req.ID,
}
// Translate the method's error result, if it has one and it is non-nil.
if handler.errOut != -1 {
err := callResult[handler.errOut].Interface()
if err != nil {
log.Warnf("error in RPC call to '%s': %+v", req.Method, err)
stats.Record(ctx, metrics.RPCResponseError.M(1))
resp.Error = s.createError(err.(error))
}
}
// Capture the value result, its kind, and whether it is non-zero.
var kind reflect.Kind
var res interface{}
var nonZero bool
if handler.valOut != -1 {
res = callResult[handler.valOut].Interface()
kind = callResult[handler.valOut].Kind()
nonZero = !callResult[handler.valOut].IsZero()
}
// check error as JSON-RPC spec prohibits error and value at the same time
if resp.Error == nil {
if res != nil && kind == reflect.Chan {
// Channel responses are sent from channel control goroutine.
// Sending responses here could cause deadlocks on writeLk, or allow
// sending channel messages before this rpc call returns
//noinspection GoNilness // already checked above
err = chOut(callResult[handler.valOut], req.ID)
if err == nil {
return // channel goroutine handles responding
}
log.Warnf("failed to setup channel in RPC call to '%s': %+v", req.Method, err)
stats.Record(ctx, metrics.RPCResponseError.M(1))
resp.Error = &JSONRPCError{
Code: 1,
Message: err.Error(),
}
} else {
resp.Result = res
}
}
// A method returning both an error and a non-zero value is only logged; the
// response carries the error alone.
if resp.Error != nil && nonZero {
log.Errorw("error and res returned", "request", req, "r.err", resp.Error, "res", res)
}
// Encode the response; the writer is only acquired once the encoder performs
// its first write.
withLazyWriter(w, func(w io.Writer) {
if err := json.NewEncoder(w).Encode(resp); err != nil {
log.Error(err)
stats.Record(ctx, metrics.RPCResponseError.M(1))
return
}
})
}
// withLazyWriter runs cb with a writer that postpones calling withWriterFunc
// until cb performs its first write. This matters because json.Encode has to
// marshal the whole response before it writes anything, which can take a
// while for very large responses. If cb never writes, withWriterFunc is never
// called.
func withLazyWriter(withWriterFunc func(func(io.Writer)), cb func(io.Writer)) {
	lazy := lazyWriter{
		done:           make(chan struct{}),
		withWriterFunc: withWriterFunc,
	}
	// Closing done lets the goroutine holding the real writer return.
	defer close(lazy.done)
	cb(&lazy)
}

// lazyWriter is an io.Writer that obtains its underlying writer on first use.
type lazyWriter struct {
	// withWriterFunc supplies the underlying writer to a callback.
	withWriterFunc func(func(io.Writer))
	// w is the underlying writer; nil until the first Write.
	w io.Writer
	// done is closed once the owner is finished with the writer.
	done chan struct{}
}

// Write forwards p to the underlying writer, acquiring that writer first if
// this is the initial call. Acquisition happens on a separate goroutine whose
// callback stays inside withWriterFunc until done is closed, so the writer
// remains held across subsequent writes.
func (lw *lazyWriter) Write(p []byte) (n int, err error) {
	if lw.w != nil {
		return lw.w.Write(p)
	}

	ready := make(chan struct{})
	hold := func(target io.Writer) {
		lw.w = target
		close(ready)
		<-lw.done
	}
	go func() {
		lw.withWriterFunc(hold)
	}()
	<-ready

	return lw.w.Write(p)
}