| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- // Copyright 2019 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package rpc
- import (
- "context"
- "encoding/json"
- "reflect"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/common/gopool"
- "github.com/ethereum/go-ethereum/log"
- )
- // handler handles JSON-RPC messages. There is one handler per connection. Note that
- // handler is not safe for concurrent use. Message handling never blocks indefinitely
- // because RPCs are processed on background goroutines launched by handler.
- //
- // The entry points for incoming messages are:
- //
- // h.handleMsg(message)
- // h.handleBatch(message)
- //
- // Outgoing calls use the requestOp struct. Register the request before sending it
- // on the connection:
- //
- // op := &requestOp{ids: ...}
- // h.addRequestOp(op)
- //
- // Now send the request, then wait for the reply to be delivered through handleMsg:
- //
- // if err := op.wait(...); err != nil {
- // h.removeRequestOp(op) // timeout, etc.
- // }
- //
- type handler struct {
- reg *serviceRegistry
- unsubscribeCb *callback
- idgen func() ID // subscription ID generator
- respWait map[string]*requestOp // active client requests
- clientSubs map[string]*ClientSubscription // active client subscriptions
- callWG sync.WaitGroup // pending call goroutines
- rootCtx context.Context // canceled by close()
- cancelRoot func() // cancel function for rootCtx
- conn jsonWriter // where responses will be sent
- log log.Logger
- allowSubscribe bool
- subLock sync.Mutex
- serverSubs map[ID]*Subscription
- }
- type callProc struct {
- ctx context.Context
- notifiers []*Notifier
- }
- func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler {
- rootCtx, cancelRoot := context.WithCancel(connCtx)
- h := &handler{
- reg: reg,
- idgen: idgen,
- conn: conn,
- respWait: make(map[string]*requestOp),
- clientSubs: make(map[string]*ClientSubscription),
- rootCtx: rootCtx,
- cancelRoot: cancelRoot,
- allowSubscribe: true,
- serverSubs: make(map[ID]*Subscription),
- log: log.Root(),
- }
- if conn.remoteAddr() != "" {
- h.log = h.log.New("conn", conn.remoteAddr())
- }
- h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe))
- return h
- }
- // handleBatch executes all messages in a batch and returns the responses.
- func (h *handler) handleBatch(ctx context.Context, msgs []*jsonrpcMessage) {
- // Emit error response for empty batches:
- if len(msgs) == 0 {
- h.startCallProc(func(cp *callProc) {
- h.conn.writeJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
- })
- return
- }
- // Handle non-call messages first:
- calls := make([]*jsonrpcMessage, 0, len(msgs))
- for _, msg := range msgs {
- if handled := h.handleImmediate(msg); !handled {
- calls = append(calls, msg)
- }
- }
- if len(calls) == 0 {
- return
- }
- // Process calls on a goroutine because they may block indefinitely:
- h.startCallProc(func(cp *callProc) {
- answers := make([]*jsonrpcMessage, 0, len(msgs))
- for _, msg := range calls {
- if answer := h.handleCallMsg(cp, ctx, msg); answer != nil {
- answers = append(answers, answer)
- }
- }
- h.addSubscriptions(cp.notifiers)
- if len(answers) > 0 {
- h.conn.writeJSON(cp.ctx, answers)
- }
- for _, n := range cp.notifiers {
- n.activate()
- }
- })
- }
- // handleMsg handles a single message.
- func (h *handler) handleMsg(ctx context.Context, msg *jsonrpcMessage) {
- if ok := h.handleImmediate(msg); ok {
- return
- }
- h.startCallProc(func(cp *callProc) {
- answer := h.handleCallMsg(cp, ctx, msg)
- h.addSubscriptions(cp.notifiers)
- if answer != nil {
- h.conn.writeJSON(cp.ctx, answer)
- }
- for _, n := range cp.notifiers {
- n.activate()
- }
- })
- }
- // close cancels all requests except for inflightReq and waits for
- // call goroutines to shut down.
- func (h *handler) close(err error, inflightReq *requestOp) {
- h.cancelAllRequests(err, inflightReq)
- h.callWG.Wait()
- h.cancelRoot()
- h.cancelServerSubscriptions(err)
- }
- // addRequestOp registers a request operation.
- func (h *handler) addRequestOp(op *requestOp) {
- for _, id := range op.ids {
- h.respWait[string(id)] = op
- }
- }
- // removeRequestOps stops waiting for the given request IDs.
- func (h *handler) removeRequestOp(op *requestOp) {
- for _, id := range op.ids {
- delete(h.respWait, string(id))
- }
- }
- // cancelAllRequests unblocks and removes pending requests and active subscriptions.
- func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) {
- didClose := make(map[*requestOp]bool)
- if inflightReq != nil {
- didClose[inflightReq] = true
- }
- for id, op := range h.respWait {
- // Remove the op so that later calls will not close op.resp again.
- delete(h.respWait, id)
- if !didClose[op] {
- op.err = err
- close(op.resp)
- didClose[op] = true
- }
- }
- for id, sub := range h.clientSubs {
- delete(h.clientSubs, id)
- sub.close(err)
- }
- }
- func (h *handler) addSubscriptions(nn []*Notifier) {
- h.subLock.Lock()
- defer h.subLock.Unlock()
- for _, n := range nn {
- if sub := n.takeSubscription(); sub != nil {
- h.serverSubs[sub.ID] = sub
- }
- }
- }
- // cancelServerSubscriptions removes all subscriptions and closes their error channels.
- func (h *handler) cancelServerSubscriptions(err error) {
- h.subLock.Lock()
- defer h.subLock.Unlock()
- for id, s := range h.serverSubs {
- s.err <- err
- close(s.err)
- delete(h.serverSubs, id)
- }
- }
- // startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group.
- func (h *handler) startCallProc(fn func(*callProc)) {
- h.callWG.Add(1)
- gopool.Submit(func() {
- ctx, cancel := context.WithCancel(h.rootCtx)
- defer h.callWG.Done()
- defer cancel()
- fn(&callProc{ctx: ctx})
- })
- }
- // handleImmediate executes non-call messages. It returns false if the message is a
- // call or requires a reply.
- func (h *handler) handleImmediate(msg *jsonrpcMessage) bool {
- start := time.Now()
- switch {
- case msg.isNotification():
- if strings.HasSuffix(msg.Method, notificationMethodSuffix) {
- h.handleSubscriptionResult(msg)
- return true
- }
- return false
- case msg.isResponse():
- h.handleResponse(msg)
- h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "t", time.Since(start))
- return true
- default:
- return false
- }
- }
- // handleSubscriptionResult processes subscription notifications.
- func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) {
- var result subscriptionResult
- if err := json.Unmarshal(msg.Params, &result); err != nil {
- h.log.Debug("Dropping invalid subscription message")
- return
- }
- if h.clientSubs[result.ID] != nil {
- h.clientSubs[result.ID].deliver(result.Result)
- }
- }
- // handleResponse processes method call responses.
- func (h *handler) handleResponse(msg *jsonrpcMessage) {
- op := h.respWait[string(msg.ID)]
- if op == nil {
- h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID})
- return
- }
- delete(h.respWait, string(msg.ID))
- // For normal responses, just forward the reply to Call/BatchCall.
- if op.sub == nil {
- op.resp <- msg
- return
- }
- // For subscription responses, start the subscription if the server
- // indicates success. EthSubscribe gets unblocked in either case through
- // the op.resp channel.
- defer close(op.resp)
- if msg.Error != nil {
- op.err = msg.Error
- return
- }
- if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
- go op.sub.run()
- h.clientSubs[op.sub.subid] = op.sub
- }
- }
- // handleCallMsg executes a call message and returns the answer.
- func (h *handler) handleCallMsg(ctx *callProc, reqCtx context.Context, msg *jsonrpcMessage) *jsonrpcMessage {
- start := time.Now()
- switch {
- case msg.isNotification():
- h.handleCall(ctx, msg)
- h.log.Debug("Served "+msg.Method, "t", time.Since(start))
- return nil
- case msg.isCall():
- resp := h.handleCall(ctx, msg)
- var ctx []interface{}
- ctx = append(ctx, "reqid", idForLog{msg.ID}, "t", time.Since(start))
- if resp.Error != nil {
- xForward := reqCtx.Value("X-Forwarded-For")
- h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message, "X-Forwarded-For", xForward)
- ctx = append(ctx, "err", resp.Error.Message)
- if resp.Error.Data != nil {
- ctx = append(ctx, "errdata", resp.Error.Data)
- }
- h.log.Warn("Served "+msg.Method, ctx...)
- } else {
- h.log.Debug("Served "+msg.Method, ctx...)
- }
- return resp
- case msg.hasValidID():
- return msg.errorResponse(&invalidRequestError{"invalid request"})
- default:
- return errorMessage(&invalidRequestError{"invalid request"})
- }
- }
- // handleCall processes method calls.
- func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
- if msg.isSubscribe() {
- return h.handleSubscribe(cp, msg)
- }
- var callb *callback
- if msg.isUnsubscribe() {
- callb = h.unsubscribeCb
- } else {
- callb = h.reg.callback(msg.Method)
- }
- if callb == nil {
- return msg.errorResponse(&methodNotFoundError{method: msg.Method})
- }
- args, err := parsePositionalArguments(msg.Params, callb.argTypes)
- if err != nil {
- return msg.errorResponse(&invalidParamsError{err.Error()})
- }
- start := time.Now()
- answer := h.runMethod(cp.ctx, msg, callb, args)
- // Collect the statistics for RPC calls if metrics is enabled.
- // We only care about pure rpc call. Filter out subscription.
- if callb != h.unsubscribeCb {
- rpcRequestGauge.Inc(1)
- if answer.Error != nil {
- failedReqeustGauge.Inc(1)
- } else {
- successfulRequestGauge.Inc(1)
- }
- RpcServingTimer.UpdateSince(start)
- newRPCRequestGauge(msg.Method).Inc(1)
- newRPCServingTimer(msg.Method, answer.Error == nil).UpdateSince(start)
- }
- return answer
- }
- // handleSubscribe processes *_subscribe method calls.
- func (h *handler) handleSubscribe(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
- if !h.allowSubscribe {
- return msg.errorResponse(ErrNotificationsUnsupported)
- }
- // Subscription method name is first argument.
- name, err := parseSubscriptionName(msg.Params)
- if err != nil {
- return msg.errorResponse(&invalidParamsError{err.Error()})
- }
- namespace := msg.namespace()
- callb := h.reg.subscription(namespace, name)
- if callb == nil {
- return msg.errorResponse(&subscriptionNotFoundError{namespace, name})
- }
- // Parse subscription name arg too, but remove it before calling the callback.
- argTypes := append([]reflect.Type{stringType}, callb.argTypes...)
- args, err := parsePositionalArguments(msg.Params, argTypes)
- if err != nil {
- return msg.errorResponse(&invalidParamsError{err.Error()})
- }
- args = args[1:]
- // Install notifier in context so the subscription handler can find it.
- n := &Notifier{h: h, namespace: namespace}
- cp.notifiers = append(cp.notifiers, n)
- ctx := context.WithValue(cp.ctx, notifierKey{}, n)
- return h.runMethod(ctx, msg, callb, args)
- }
- // runMethod runs the Go callback for an RPC method.
- func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *callback, args []reflect.Value) *jsonrpcMessage {
- result, err := callb.call(ctx, msg.Method, args)
- if err != nil {
- return msg.errorResponse(err)
- }
- return msg.response(result)
- }
- // unsubscribe is the callback function for all *_unsubscribe calls.
- func (h *handler) unsubscribe(ctx context.Context, id ID) (bool, error) {
- h.subLock.Lock()
- defer h.subLock.Unlock()
- s := h.serverSubs[id]
- if s == nil {
- return false, ErrSubscriptionNotFound
- }
- close(s.err)
- delete(h.serverSubs, id)
- return true, nil
- }
// idForLog wraps a raw JSON request ID so that it prints nicely in log output.
type idForLog struct{ json.RawMessage }

// String returns the ID with surrounding quotes removed when the raw value is
// a quoted string that strconv.Unquote accepts; any other value is returned
// verbatim.
func (id idForLog) String() string {
	raw := string(id.RawMessage)
	unquoted, err := strconv.Unquote(raw)
	if err != nil {
		return raw
	}
	return unquoted
}
|