handler.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rpc
  17. import (
  18. "context"
  19. "encoding/json"
  20. "reflect"
  21. "strconv"
  22. "strings"
  23. "sync"
  24. "time"
  25. "github.com/ethereum/go-ethereum/log"
  26. )
// handler handles JSON-RPC messages. There is one handler per connection. Note that
// handler is not safe for concurrent use. Message handling never blocks indefinitely
// because RPCs are processed on background goroutines launched by handler.
//
// The entry points for incoming messages are:
//
//	h.handleMsg(message)
//	h.handleBatch(message)
//
// Outgoing calls use the requestOp struct. Register the request before sending it
// on the connection:
//
//	op := &requestOp{ids: ...}
//	h.addRequestOp(op)
//
// Now send the request, then wait for the reply to be delivered through handleMsg:
//
//	if err := op.wait(...); err != nil {
//		h.removeRequestOp(op) // timeout, etc.
//	}
type handler struct {
	reg            *serviceRegistry               // looked up for method and subscription callbacks
	unsubscribeCb  *callback                      // callback wrapping h.unsubscribe, used for *_unsubscribe calls
	idgen          func() ID                      // subscription ID generator
	respWait       map[string]*requestOp          // active client requests
	clientSubs     map[string]*ClientSubscription // active client subscriptions
	callWG         sync.WaitGroup                 // pending call goroutines
	rootCtx        context.Context                // canceled by close()
	cancelRoot     func()                         // cancel function for rootCtx
	conn           jsonWriter                     // where responses will be sent
	log            log.Logger                     // tagged with "conn" when the connection reports a remote address
	allowSubscribe bool                           // when false, subscribe calls are answered with ErrNotificationsUnsupported

	subLock    sync.Mutex           // held while reading or writing serverSubs
	serverSubs map[ID]*Subscription // server-side subscriptions, keyed by subscription ID
}
// callProc carries the per-goroutine state of a call processor started by
// startCallProc.
type callProc struct {
	ctx       context.Context // derived from the handler's rootCtx, canceled when the goroutine's function returns
	notifiers []*Notifier     // one entry is appended for every subscribe call handled on this goroutine
}
  67. func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler {
  68. rootCtx, cancelRoot := context.WithCancel(connCtx)
  69. h := &handler{
  70. reg: reg,
  71. idgen: idgen,
  72. conn: conn,
  73. respWait: make(map[string]*requestOp),
  74. clientSubs: make(map[string]*ClientSubscription),
  75. rootCtx: rootCtx,
  76. cancelRoot: cancelRoot,
  77. allowSubscribe: true,
  78. serverSubs: make(map[ID]*Subscription),
  79. log: log.Root(),
  80. }
  81. if conn.RemoteAddr() != "" {
  82. h.log = h.log.New("conn", conn.RemoteAddr())
  83. }
  84. h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe))
  85. return h
  86. }
  87. // handleBatch executes all messages in a batch and returns the responses.
  88. func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
  89. // Emit error response for empty batches:
  90. if len(msgs) == 0 {
  91. h.startCallProc(func(cp *callProc) {
  92. h.conn.Write(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
  93. })
  94. return
  95. }
  96. // Handle non-call messages first:
  97. calls := make([]*jsonrpcMessage, 0, len(msgs))
  98. for _, msg := range msgs {
  99. if handled := h.handleImmediate(msg); !handled {
  100. calls = append(calls, msg)
  101. }
  102. }
  103. if len(calls) == 0 {
  104. return
  105. }
  106. // Process calls on a goroutine because they may block indefinitely:
  107. h.startCallProc(func(cp *callProc) {
  108. answers := make([]*jsonrpcMessage, 0, len(msgs))
  109. for _, msg := range calls {
  110. if answer := h.handleCallMsg(cp, msg); answer != nil {
  111. answers = append(answers, answer)
  112. }
  113. }
  114. h.addSubscriptions(cp.notifiers)
  115. if len(answers) > 0 {
  116. h.conn.Write(cp.ctx, answers)
  117. }
  118. for _, n := range cp.notifiers {
  119. n.activate()
  120. }
  121. })
  122. }
  123. // handleMsg handles a single message.
  124. func (h *handler) handleMsg(msg *jsonrpcMessage) {
  125. if ok := h.handleImmediate(msg); ok {
  126. return
  127. }
  128. h.startCallProc(func(cp *callProc) {
  129. answer := h.handleCallMsg(cp, msg)
  130. h.addSubscriptions(cp.notifiers)
  131. if answer != nil {
  132. h.conn.Write(cp.ctx, answer)
  133. }
  134. for _, n := range cp.notifiers {
  135. n.activate()
  136. }
  137. })
  138. }
// close cancels all requests except for inflightReq and waits for
// call goroutines to shut down.
//
// The steps run in a fixed order: pending client requests and client
// subscriptions are canceled with err, then close blocks until every goroutine
// started through startCallProc has returned, then rootCtx is canceled, and
// finally all server-side subscriptions are terminated with err.
func (h *handler) close(err error, inflightReq *requestOp) {
	h.cancelAllRequests(err, inflightReq)
	h.callWG.Wait()
	h.cancelRoot()
	h.cancelServerSubscriptions(err)
}
  147. // addRequestOp registers a request operation.
  148. func (h *handler) addRequestOp(op *requestOp) {
  149. for _, id := range op.ids {
  150. h.respWait[string(id)] = op
  151. }
  152. }
  153. // removeRequestOps stops waiting for the given request IDs.
  154. func (h *handler) removeRequestOp(op *requestOp) {
  155. for _, id := range op.ids {
  156. delete(h.respWait, string(id))
  157. }
  158. }
  159. // cancelAllRequests unblocks and removes pending requests and active subscriptions.
  160. func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) {
  161. didClose := make(map[*requestOp]bool)
  162. if inflightReq != nil {
  163. didClose[inflightReq] = true
  164. }
  165. for id, op := range h.respWait {
  166. // Remove the op so that later calls will not close op.resp again.
  167. delete(h.respWait, id)
  168. if !didClose[op] {
  169. op.err = err
  170. close(op.resp)
  171. didClose[op] = true
  172. }
  173. }
  174. for id, sub := range h.clientSubs {
  175. delete(h.clientSubs, id)
  176. sub.quitWithError(err, false)
  177. }
  178. }
  179. func (h *handler) addSubscriptions(nn []*Notifier) {
  180. h.subLock.Lock()
  181. defer h.subLock.Unlock()
  182. for _, n := range nn {
  183. if sub := n.takeSubscription(); sub != nil {
  184. h.serverSubs[sub.ID] = sub
  185. }
  186. }
  187. }
  188. // cancelServerSubscriptions removes all subscriptions and closes their error channels.
  189. func (h *handler) cancelServerSubscriptions(err error) {
  190. h.subLock.Lock()
  191. defer h.subLock.Unlock()
  192. for id, s := range h.serverSubs {
  193. s.err <- err
  194. close(s.err)
  195. delete(h.serverSubs, id)
  196. }
  197. }
  198. // startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group.
  199. func (h *handler) startCallProc(fn func(*callProc)) {
  200. h.callWG.Add(1)
  201. go func() {
  202. ctx, cancel := context.WithCancel(h.rootCtx)
  203. defer h.callWG.Done()
  204. defer cancel()
  205. fn(&callProc{ctx: ctx})
  206. }()
  207. }
  208. // handleImmediate executes non-call messages. It returns false if the message is a
  209. // call or requires a reply.
  210. func (h *handler) handleImmediate(msg *jsonrpcMessage) bool {
  211. start := time.Now()
  212. switch {
  213. case msg.isNotification():
  214. if strings.HasSuffix(msg.Method, notificationMethodSuffix) {
  215. h.handleSubscriptionResult(msg)
  216. return true
  217. }
  218. return false
  219. case msg.isResponse():
  220. h.handleResponse(msg)
  221. h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "t", time.Since(start))
  222. return true
  223. default:
  224. return false
  225. }
  226. }
  227. // handleSubscriptionResult processes subscription notifications.
  228. func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) {
  229. var result subscriptionResult
  230. if err := json.Unmarshal(msg.Params, &result); err != nil {
  231. h.log.Debug("Dropping invalid subscription message")
  232. return
  233. }
  234. if h.clientSubs[result.ID] != nil {
  235. h.clientSubs[result.ID].deliver(result.Result)
  236. }
  237. }
  238. // handleResponse processes method call responses.
  239. func (h *handler) handleResponse(msg *jsonrpcMessage) {
  240. op := h.respWait[string(msg.ID)]
  241. if op == nil {
  242. h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID})
  243. return
  244. }
  245. delete(h.respWait, string(msg.ID))
  246. // For normal responses, just forward the reply to Call/BatchCall.
  247. if op.sub == nil {
  248. op.resp <- msg
  249. return
  250. }
  251. // For subscription responses, start the subscription if the server
  252. // indicates success. EthSubscribe gets unblocked in either case through
  253. // the op.resp channel.
  254. defer close(op.resp)
  255. if msg.Error != nil {
  256. op.err = msg.Error
  257. return
  258. }
  259. if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
  260. go op.sub.start()
  261. h.clientSubs[op.sub.subid] = op.sub
  262. }
  263. }
  264. // handleCallMsg executes a call message and returns the answer.
  265. func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
  266. start := time.Now()
  267. switch {
  268. case msg.isNotification():
  269. h.handleCall(ctx, msg)
  270. h.log.Debug("Served "+msg.Method, "t", time.Since(start))
  271. return nil
  272. case msg.isCall():
  273. resp := h.handleCall(ctx, msg)
  274. if resp.Error != nil {
  275. h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message)
  276. } else {
  277. h.log.Debug("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start))
  278. }
  279. return resp
  280. case msg.hasValidID():
  281. return msg.errorResponse(&invalidRequestError{"invalid request"})
  282. default:
  283. return errorMessage(&invalidRequestError{"invalid request"})
  284. }
  285. }
  286. // handleCall processes method calls.
  287. func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
  288. if msg.isSubscribe() {
  289. return h.handleSubscribe(cp, msg)
  290. }
  291. var callb *callback
  292. if msg.isUnsubscribe() {
  293. callb = h.unsubscribeCb
  294. } else {
  295. callb = h.reg.callback(msg.Method)
  296. }
  297. if callb == nil {
  298. return msg.errorResponse(&methodNotFoundError{method: msg.Method})
  299. }
  300. args, err := parsePositionalArguments(msg.Params, callb.argTypes)
  301. if err != nil {
  302. return msg.errorResponse(&invalidParamsError{err.Error()})
  303. }
  304. return h.runMethod(cp.ctx, msg, callb, args)
  305. }
  306. // handleSubscribe processes *_subscribe method calls.
  307. func (h *handler) handleSubscribe(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
  308. if !h.allowSubscribe {
  309. return msg.errorResponse(ErrNotificationsUnsupported)
  310. }
  311. // Subscription method name is first argument.
  312. name, err := parseSubscriptionName(msg.Params)
  313. if err != nil {
  314. return msg.errorResponse(&invalidParamsError{err.Error()})
  315. }
  316. namespace := msg.namespace()
  317. callb := h.reg.subscription(namespace, name)
  318. if callb == nil {
  319. return msg.errorResponse(&subscriptionNotFoundError{namespace, name})
  320. }
  321. // Parse subscription name arg too, but remove it before calling the callback.
  322. argTypes := append([]reflect.Type{stringType}, callb.argTypes...)
  323. args, err := parsePositionalArguments(msg.Params, argTypes)
  324. if err != nil {
  325. return msg.errorResponse(&invalidParamsError{err.Error()})
  326. }
  327. args = args[1:]
  328. // Install notifier in context so the subscription handler can find it.
  329. n := &Notifier{h: h, namespace: namespace}
  330. cp.notifiers = append(cp.notifiers, n)
  331. ctx := context.WithValue(cp.ctx, notifierKey{}, n)
  332. return h.runMethod(ctx, msg, callb, args)
  333. }
  334. // runMethod runs the Go callback for an RPC method.
  335. func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *callback, args []reflect.Value) *jsonrpcMessage {
  336. result, err := callb.call(ctx, msg.Method, args)
  337. if err != nil {
  338. return msg.errorResponse(err)
  339. }
  340. return msg.response(result)
  341. }
  342. // unsubscribe is the callback function for all *_unsubscribe calls.
  343. func (h *handler) unsubscribe(ctx context.Context, id ID) (bool, error) {
  344. h.subLock.Lock()
  345. defer h.subLock.Unlock()
  346. s := h.serverSubs[id]
  347. if s == nil {
  348. return false, ErrSubscriptionNotFound
  349. }
  350. close(s.err)
  351. delete(h.serverSubs, id)
  352. return true, nil
  353. }
  354. type idForLog struct{ json.RawMessage }
  355. func (id idForLog) String() string {
  356. if s, err := strconv.Unquote(string(id.RawMessage)); err == nil {
  357. return s
  358. }
  359. return string(id.RawMessage)
  360. }