handler.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rpc
  17. import (
  18. "context"
  19. "encoding/json"
  20. "reflect"
  21. "strconv"
  22. "strings"
  23. "sync"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common/gopool"
  26. "github.com/ethereum/go-ethereum/log"
  27. )
  28. // handler handles JSON-RPC messages. There is one handler per connection. Note that
  29. // handler is not safe for concurrent use. Message handling never blocks indefinitely
  30. // because RPCs are processed on background goroutines launched by handler.
  31. //
  32. // The entry points for incoming messages are:
  33. //
  34. // h.handleMsg(message)
  35. // h.handleBatch(message)
  36. //
  37. // Outgoing calls use the requestOp struct. Register the request before sending it
  38. // on the connection:
  39. //
  40. // op := &requestOp{ids: ...}
  41. // h.addRequestOp(op)
  42. //
  43. // Now send the request, then wait for the reply to be delivered through handleMsg:
  44. //
  45. // if err := op.wait(...); err != nil {
  46. // h.removeRequestOp(op) // timeout, etc.
  47. // }
  48. //
  49. type handler struct {
  50. reg *serviceRegistry
  51. unsubscribeCb *callback
  52. idgen func() ID // subscription ID generator
  53. respWait map[string]*requestOp // active client requests
  54. clientSubs map[string]*ClientSubscription // active client subscriptions
  55. callWG sync.WaitGroup // pending call goroutines
  56. rootCtx context.Context // canceled by close()
  57. cancelRoot func() // cancel function for rootCtx
  58. conn jsonWriter // where responses will be sent
  59. log log.Logger
  60. allowSubscribe bool
  61. subLock sync.Mutex
  62. serverSubs map[ID]*Subscription
  63. }
  64. type callProc struct {
  65. ctx context.Context
  66. notifiers []*Notifier
  67. }
  68. func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler {
  69. rootCtx, cancelRoot := context.WithCancel(connCtx)
  70. h := &handler{
  71. reg: reg,
  72. idgen: idgen,
  73. conn: conn,
  74. respWait: make(map[string]*requestOp),
  75. clientSubs: make(map[string]*ClientSubscription),
  76. rootCtx: rootCtx,
  77. cancelRoot: cancelRoot,
  78. allowSubscribe: true,
  79. serverSubs: make(map[ID]*Subscription),
  80. log: log.Root(),
  81. }
  82. if conn.remoteAddr() != "" {
  83. h.log = h.log.New("conn", conn.remoteAddr())
  84. }
  85. h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe))
  86. return h
  87. }
  88. // handleBatch executes all messages in a batch and returns the responses.
  89. func (h *handler) handleBatch(ctx context.Context, msgs []*jsonrpcMessage) {
  90. // Emit error response for empty batches:
  91. if len(msgs) == 0 {
  92. h.startCallProc(func(cp *callProc) {
  93. h.conn.writeJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
  94. })
  95. return
  96. }
  97. // Handle non-call messages first:
  98. calls := make([]*jsonrpcMessage, 0, len(msgs))
  99. for _, msg := range msgs {
  100. if handled := h.handleImmediate(msg); !handled {
  101. calls = append(calls, msg)
  102. }
  103. }
  104. if len(calls) == 0 {
  105. return
  106. }
  107. // Process calls on a goroutine because they may block indefinitely:
  108. h.startCallProc(func(cp *callProc) {
  109. answers := make([]*jsonrpcMessage, 0, len(msgs))
  110. for _, msg := range calls {
  111. if answer := h.handleCallMsg(cp, ctx, msg); answer != nil {
  112. answers = append(answers, answer)
  113. }
  114. }
  115. h.addSubscriptions(cp.notifiers)
  116. if len(answers) > 0 {
  117. h.conn.writeJSON(cp.ctx, answers)
  118. }
  119. for _, n := range cp.notifiers {
  120. n.activate()
  121. }
  122. })
  123. }
  124. // handleMsg handles a single message.
  125. func (h *handler) handleMsg(ctx context.Context, msg *jsonrpcMessage) {
  126. if ok := h.handleImmediate(msg); ok {
  127. return
  128. }
  129. h.startCallProc(func(cp *callProc) {
  130. answer := h.handleCallMsg(cp, ctx, msg)
  131. h.addSubscriptions(cp.notifiers)
  132. if answer != nil {
  133. h.conn.writeJSON(cp.ctx, answer)
  134. }
  135. for _, n := range cp.notifiers {
  136. n.activate()
  137. }
  138. })
  139. }
  140. // close cancels all requests except for inflightReq and waits for
  141. // call goroutines to shut down.
  142. func (h *handler) close(err error, inflightReq *requestOp) {
  143. h.cancelAllRequests(err, inflightReq)
  144. h.callWG.Wait()
  145. h.cancelRoot()
  146. h.cancelServerSubscriptions(err)
  147. }
  148. // addRequestOp registers a request operation.
  149. func (h *handler) addRequestOp(op *requestOp) {
  150. for _, id := range op.ids {
  151. h.respWait[string(id)] = op
  152. }
  153. }
  154. // removeRequestOps stops waiting for the given request IDs.
  155. func (h *handler) removeRequestOp(op *requestOp) {
  156. for _, id := range op.ids {
  157. delete(h.respWait, string(id))
  158. }
  159. }
  160. // cancelAllRequests unblocks and removes pending requests and active subscriptions.
  161. func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) {
  162. didClose := make(map[*requestOp]bool)
  163. if inflightReq != nil {
  164. didClose[inflightReq] = true
  165. }
  166. for id, op := range h.respWait {
  167. // Remove the op so that later calls will not close op.resp again.
  168. delete(h.respWait, id)
  169. if !didClose[op] {
  170. op.err = err
  171. close(op.resp)
  172. didClose[op] = true
  173. }
  174. }
  175. for id, sub := range h.clientSubs {
  176. delete(h.clientSubs, id)
  177. sub.close(err)
  178. }
  179. }
  180. func (h *handler) addSubscriptions(nn []*Notifier) {
  181. h.subLock.Lock()
  182. defer h.subLock.Unlock()
  183. for _, n := range nn {
  184. if sub := n.takeSubscription(); sub != nil {
  185. h.serverSubs[sub.ID] = sub
  186. }
  187. }
  188. }
  189. // cancelServerSubscriptions removes all subscriptions and closes their error channels.
  190. func (h *handler) cancelServerSubscriptions(err error) {
  191. h.subLock.Lock()
  192. defer h.subLock.Unlock()
  193. for id, s := range h.serverSubs {
  194. s.err <- err
  195. close(s.err)
  196. delete(h.serverSubs, id)
  197. }
  198. }
  199. // startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group.
  200. func (h *handler) startCallProc(fn func(*callProc)) {
  201. h.callWG.Add(1)
  202. gopool.Submit(func() {
  203. ctx, cancel := context.WithCancel(h.rootCtx)
  204. defer h.callWG.Done()
  205. defer cancel()
  206. fn(&callProc{ctx: ctx})
  207. })
  208. }
  209. // handleImmediate executes non-call messages. It returns false if the message is a
  210. // call or requires a reply.
  211. func (h *handler) handleImmediate(msg *jsonrpcMessage) bool {
  212. start := time.Now()
  213. switch {
  214. case msg.isNotification():
  215. if strings.HasSuffix(msg.Method, notificationMethodSuffix) {
  216. h.handleSubscriptionResult(msg)
  217. return true
  218. }
  219. return false
  220. case msg.isResponse():
  221. h.handleResponse(msg)
  222. h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "t", time.Since(start))
  223. return true
  224. default:
  225. return false
  226. }
  227. }
  228. // handleSubscriptionResult processes subscription notifications.
  229. func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) {
  230. var result subscriptionResult
  231. if err := json.Unmarshal(msg.Params, &result); err != nil {
  232. h.log.Debug("Dropping invalid subscription message")
  233. return
  234. }
  235. if h.clientSubs[result.ID] != nil {
  236. h.clientSubs[result.ID].deliver(result.Result)
  237. }
  238. }
  239. // handleResponse processes method call responses.
  240. func (h *handler) handleResponse(msg *jsonrpcMessage) {
  241. op := h.respWait[string(msg.ID)]
  242. if op == nil {
  243. h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID})
  244. return
  245. }
  246. delete(h.respWait, string(msg.ID))
  247. // For normal responses, just forward the reply to Call/BatchCall.
  248. if op.sub == nil {
  249. op.resp <- msg
  250. return
  251. }
  252. // For subscription responses, start the subscription if the server
  253. // indicates success. EthSubscribe gets unblocked in either case through
  254. // the op.resp channel.
  255. defer close(op.resp)
  256. if msg.Error != nil {
  257. op.err = msg.Error
  258. return
  259. }
  260. if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
  261. go op.sub.run()
  262. h.clientSubs[op.sub.subid] = op.sub
  263. }
  264. }
  265. // handleCallMsg executes a call message and returns the answer.
  266. func (h *handler) handleCallMsg(ctx *callProc, reqCtx context.Context, msg *jsonrpcMessage) *jsonrpcMessage {
  267. start := time.Now()
  268. switch {
  269. case msg.isNotification():
  270. h.handleCall(ctx, msg)
  271. h.log.Debug("Served "+msg.Method, "t", time.Since(start))
  272. return nil
  273. case msg.isCall():
  274. resp := h.handleCall(ctx, msg)
  275. var ctx []interface{}
  276. ctx = append(ctx, "reqid", idForLog{msg.ID}, "t", time.Since(start))
  277. if resp.Error != nil {
  278. xForward := reqCtx.Value("X-Forwarded-For")
  279. h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message, "X-Forwarded-For", xForward)
  280. ctx = append(ctx, "err", resp.Error.Message)
  281. if resp.Error.Data != nil {
  282. ctx = append(ctx, "errdata", resp.Error.Data)
  283. }
  284. h.log.Warn("Served "+msg.Method, ctx...)
  285. } else {
  286. h.log.Debug("Served "+msg.Method, ctx...)
  287. }
  288. return resp
  289. case msg.hasValidID():
  290. return msg.errorResponse(&invalidRequestError{"invalid request"})
  291. default:
  292. return errorMessage(&invalidRequestError{"invalid request"})
  293. }
  294. }
  295. // handleCall processes method calls.
  296. func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
  297. if msg.isSubscribe() {
  298. return h.handleSubscribe(cp, msg)
  299. }
  300. var callb *callback
  301. if msg.isUnsubscribe() {
  302. callb = h.unsubscribeCb
  303. } else {
  304. callb = h.reg.callback(msg.Method)
  305. }
  306. if callb == nil {
  307. return msg.errorResponse(&methodNotFoundError{method: msg.Method})
  308. }
  309. args, err := parsePositionalArguments(msg.Params, callb.argTypes)
  310. if err != nil {
  311. return msg.errorResponse(&invalidParamsError{err.Error()})
  312. }
  313. start := time.Now()
  314. answer := h.runMethod(cp.ctx, msg, callb, args)
  315. // Collect the statistics for RPC calls if metrics is enabled.
  316. // We only care about pure rpc call. Filter out subscription.
  317. if callb != h.unsubscribeCb {
  318. rpcRequestGauge.Inc(1)
  319. if answer.Error != nil {
  320. failedReqeustGauge.Inc(1)
  321. } else {
  322. successfulRequestGauge.Inc(1)
  323. }
  324. RpcServingTimer.UpdateSince(start)
  325. newRPCRequestGauge(msg.Method).Inc(1)
  326. newRPCServingTimer(msg.Method, answer.Error == nil).UpdateSince(start)
  327. }
  328. return answer
  329. }
  330. // handleSubscribe processes *_subscribe method calls.
  331. func (h *handler) handleSubscribe(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
  332. if !h.allowSubscribe {
  333. return msg.errorResponse(ErrNotificationsUnsupported)
  334. }
  335. // Subscription method name is first argument.
  336. name, err := parseSubscriptionName(msg.Params)
  337. if err != nil {
  338. return msg.errorResponse(&invalidParamsError{err.Error()})
  339. }
  340. namespace := msg.namespace()
  341. callb := h.reg.subscription(namespace, name)
  342. if callb == nil {
  343. return msg.errorResponse(&subscriptionNotFoundError{namespace, name})
  344. }
  345. // Parse subscription name arg too, but remove it before calling the callback.
  346. argTypes := append([]reflect.Type{stringType}, callb.argTypes...)
  347. args, err := parsePositionalArguments(msg.Params, argTypes)
  348. if err != nil {
  349. return msg.errorResponse(&invalidParamsError{err.Error()})
  350. }
  351. args = args[1:]
  352. // Install notifier in context so the subscription handler can find it.
  353. n := &Notifier{h: h, namespace: namespace}
  354. cp.notifiers = append(cp.notifiers, n)
  355. ctx := context.WithValue(cp.ctx, notifierKey{}, n)
  356. return h.runMethod(ctx, msg, callb, args)
  357. }
  358. // runMethod runs the Go callback for an RPC method.
  359. func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *callback, args []reflect.Value) *jsonrpcMessage {
  360. result, err := callb.call(ctx, msg.Method, args)
  361. if err != nil {
  362. return msg.errorResponse(err)
  363. }
  364. return msg.response(result)
  365. }
  366. // unsubscribe is the callback function for all *_unsubscribe calls.
  367. func (h *handler) unsubscribe(ctx context.Context, id ID) (bool, error) {
  368. h.subLock.Lock()
  369. defer h.subLock.Unlock()
  370. s := h.serverSubs[id]
  371. if s == nil {
  372. return false, ErrSubscriptionNotFound
  373. }
  374. close(s.err)
  375. delete(h.serverSubs, id)
  376. return true, nil
  377. }
  378. type idForLog struct{ json.RawMessage }
  379. func (id idForLog) String() string {
  380. if s, err := strconv.Unquote(string(id.RawMessage)); err == nil {
  381. return s
  382. }
  383. return string(id.RawMessage)
  384. }