client.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rpc
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/json"
  21. "errors"
  22. "fmt"
  23. "net/url"
  24. "reflect"
  25. "strconv"
  26. "sync/atomic"
  27. "time"
  28. "github.com/ethereum/go-ethereum/log"
  29. )
  30. var (
  31. ErrClientQuit = errors.New("client is closed")
  32. ErrNoResult = errors.New("no result in JSON-RPC response")
  33. ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow")
  34. errClientReconnected = errors.New("client reconnected")
  35. errDead = errors.New("connection lost")
  36. )
  37. const (
  38. // Timeouts
  39. tcpKeepAliveInterval = 30 * time.Second
  40. defaultDialTimeout = 10 * time.Second // used if context has no deadline
  41. subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
  42. )
  43. const (
  44. // Subscriptions are removed when the subscriber cannot keep up.
  45. //
  46. // This can be worked around by supplying a channel with sufficiently sized buffer,
  47. // but this can be inconvenient and hard to explain in the docs. Another issue with
  48. // buffered channels is that the buffer is static even though it might not be needed
  49. // most of the time.
  50. //
  51. // The approach taken here is to maintain a per-subscription linked list buffer
  52. // shrinks on demand. If the buffer reaches the size below, the subscription is
  53. // dropped.
  54. maxClientSubscriptionBuffer = 20000
  55. )
  56. // BatchElem is an element in a batch request.
  57. type BatchElem struct {
  58. Method string
  59. Args []interface{}
  60. // The result is unmarshaled into this field. Result must be set to a
  61. // non-nil pointer value of the desired type, otherwise the response will be
  62. // discarded.
  63. Result interface{}
  64. // Error is set if the server returns an error for this request, or if
  65. // unmarshaling into Result fails. It is not set for I/O errors.
  66. Error error
  67. }
  68. // Client represents a connection to an RPC server.
  69. type Client struct {
  70. idgen func() ID // for subscriptions
  71. isHTTP bool
  72. services *serviceRegistry
  73. idCounter uint32
  74. // This function, if non-nil, is called when the connection is lost.
  75. reconnectFunc reconnectFunc
  76. // writeConn is used for writing to the connection on the caller's goroutine. It should
  77. // only be accessed outside of dispatch, with the write lock held. The write lock is
  78. // taken by sending on requestOp and released by sending on sendDone.
  79. writeConn jsonWriter
  80. // for dispatch
  81. close chan struct{}
  82. closing chan struct{} // closed when client is quitting
  83. didClose chan struct{} // closed when client quits
  84. reconnected chan ServerCodec // where write/reconnect sends the new connection
  85. readOp chan readOp // read messages
  86. readErr chan error // errors from read
  87. reqInit chan *requestOp // register response IDs, takes write lock
  88. reqSent chan error // signals write completion, releases write lock
  89. reqTimeout chan *requestOp // removes response IDs when call timeout expires
  90. }
  91. type reconnectFunc func(ctx context.Context) (ServerCodec, error)
  92. type clientContextKey struct{}
  93. type clientConn struct {
  94. codec ServerCodec
  95. handler *handler
  96. }
  97. func (c *Client) newClientConn(conn ServerCodec) *clientConn {
  98. ctx := context.WithValue(context.Background(), clientContextKey{}, c)
  99. handler := newHandler(ctx, conn, c.idgen, c.services)
  100. return &clientConn{conn, handler}
  101. }
  102. func (cc *clientConn) close(err error, inflightReq *requestOp) {
  103. cc.handler.close(err, inflightReq)
  104. cc.codec.Close()
  105. }
  106. type readOp struct {
  107. msgs []*jsonrpcMessage
  108. batch bool
  109. }
  110. type requestOp struct {
  111. ids []json.RawMessage
  112. err error
  113. resp chan *jsonrpcMessage // receives up to len(ids) responses
  114. sub *ClientSubscription // only set for EthSubscribe requests
  115. }
  116. func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) {
  117. select {
  118. case <-ctx.Done():
  119. // Send the timeout to dispatch so it can remove the request IDs.
  120. if !c.isHTTP {
  121. select {
  122. case c.reqTimeout <- op:
  123. case <-c.closing:
  124. }
  125. }
  126. return nil, ctx.Err()
  127. case resp := <-op.resp:
  128. return resp, op.err
  129. }
  130. }
  131. // Dial creates a new client for the given URL.
  132. //
  133. // The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a
  134. // file name with no URL scheme, a local socket connection is established using UNIX
  135. // domain sockets on supported platforms and named pipes on Windows. If you want to
  136. // configure transport options, use DialHTTP, DialWebsocket or DialIPC instead.
  137. //
  138. // For websocket connections, the origin is set to the local host name.
  139. //
  140. // The client reconnects automatically if the connection is lost.
  141. func Dial(rawurl string) (*Client, error) {
  142. return DialContext(context.Background(), rawurl)
  143. }
  144. // DialContext creates a new RPC client, just like Dial.
  145. //
  146. // The context is used to cancel or time out the initial connection establishment. It does
  147. // not affect subsequent interactions with the client.
  148. func DialContext(ctx context.Context, rawurl string) (*Client, error) {
  149. u, err := url.Parse(rawurl)
  150. if err != nil {
  151. return nil, err
  152. }
  153. switch u.Scheme {
  154. case "http", "https":
  155. return DialHTTP(rawurl)
  156. case "ws", "wss":
  157. return DialWebsocket(ctx, rawurl, "")
  158. case "stdio":
  159. return DialStdIO(ctx)
  160. case "":
  161. return DialIPC(ctx, rawurl)
  162. default:
  163. return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme)
  164. }
  165. }
  166. // Client retrieves the client from the context, if any. This can be used to perform
  167. // 'reverse calls' in a handler method.
  168. func ClientFromContext(ctx context.Context) (*Client, bool) {
  169. client, ok := ctx.Value(clientContextKey{}).(*Client)
  170. return client, ok
  171. }
  172. func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) {
  173. conn, err := connect(initctx)
  174. if err != nil {
  175. return nil, err
  176. }
  177. c := initClient(conn, randomIDGenerator(), new(serviceRegistry))
  178. c.reconnectFunc = connect
  179. return c, nil
  180. }
  181. func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client {
  182. _, isHTTP := conn.(*httpConn)
  183. c := &Client{
  184. idgen: idgen,
  185. isHTTP: isHTTP,
  186. services: services,
  187. writeConn: conn,
  188. close: make(chan struct{}),
  189. closing: make(chan struct{}),
  190. didClose: make(chan struct{}),
  191. reconnected: make(chan ServerCodec),
  192. readOp: make(chan readOp),
  193. readErr: make(chan error),
  194. reqInit: make(chan *requestOp),
  195. reqSent: make(chan error, 1),
  196. reqTimeout: make(chan *requestOp),
  197. }
  198. if !isHTTP {
  199. go c.dispatch(conn)
  200. }
  201. return c
  202. }
  203. // RegisterName creates a service for the given receiver type under the given name. When no
  204. // methods on the given receiver match the criteria to be either a RPC method or a
  205. // subscription an error is returned. Otherwise a new service is created and added to the
  206. // service collection this client provides to the server.
  207. func (c *Client) RegisterName(name string, receiver interface{}) error {
  208. return c.services.registerName(name, receiver)
  209. }
  210. func (c *Client) nextID() json.RawMessage {
  211. id := atomic.AddUint32(&c.idCounter, 1)
  212. return strconv.AppendUint(nil, uint64(id), 10)
  213. }
  214. // SupportedModules calls the rpc_modules method, retrieving the list of
  215. // APIs that are available on the server.
  216. func (c *Client) SupportedModules() (map[string]string, error) {
  217. var result map[string]string
  218. ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
  219. defer cancel()
  220. err := c.CallContext(ctx, &result, "rpc_modules")
  221. return result, err
  222. }
  223. // Close closes the client, aborting any in-flight requests.
  224. func (c *Client) Close() {
  225. if c.isHTTP {
  226. return
  227. }
  228. select {
  229. case c.close <- struct{}{}:
  230. <-c.didClose
  231. case <-c.didClose:
  232. }
  233. }
  234. // Call performs a JSON-RPC call with the given arguments and unmarshals into
  235. // result if no error occurred.
  236. //
  237. // The result must be a pointer so that package json can unmarshal into it. You
  238. // can also pass nil, in which case the result is ignored.
  239. func (c *Client) Call(result interface{}, method string, args ...interface{}) error {
  240. ctx := context.Background()
  241. return c.CallContext(ctx, result, method, args...)
  242. }
  243. // CallContext performs a JSON-RPC call with the given arguments. If the context is
  244. // canceled before the call has successfully returned, CallContext returns immediately.
  245. //
  246. // The result must be a pointer so that package json can unmarshal into it. You
  247. // can also pass nil, in which case the result is ignored.
  248. func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
  249. msg, err := c.newMessage(method, args...)
  250. if err != nil {
  251. return err
  252. }
  253. op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
  254. if c.isHTTP {
  255. err = c.sendHTTP(ctx, op, msg)
  256. } else {
  257. err = c.send(ctx, op, msg)
  258. }
  259. if err != nil {
  260. return err
  261. }
  262. // dispatch has accepted the request and will close the channel when it quits.
  263. switch resp, err := op.wait(ctx, c); {
  264. case err != nil:
  265. return err
  266. case resp.Error != nil:
  267. return resp.Error
  268. case len(resp.Result) == 0:
  269. return ErrNoResult
  270. default:
  271. return json.Unmarshal(resp.Result, &result)
  272. }
  273. }
  274. // BatchCall sends all given requests as a single batch and waits for the server
  275. // to return a response for all of them.
  276. //
  277. // In contrast to Call, BatchCall only returns I/O errors. Any error specific to
  278. // a request is reported through the Error field of the corresponding BatchElem.
  279. //
  280. // Note that batch calls may not be executed atomically on the server side.
  281. func (c *Client) BatchCall(b []BatchElem) error {
  282. ctx := context.Background()
  283. return c.BatchCallContext(ctx, b)
  284. }
  285. // BatchCall sends all given requests as a single batch and waits for the server
  286. // to return a response for all of them. The wait duration is bounded by the
  287. // context's deadline.
  288. //
  289. // In contrast to CallContext, BatchCallContext only returns errors that have occurred
  290. // while sending the request. Any error specific to a request is reported through the
  291. // Error field of the corresponding BatchElem.
  292. //
  293. // Note that batch calls may not be executed atomically on the server side.
  294. func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
  295. msgs := make([]*jsonrpcMessage, len(b))
  296. op := &requestOp{
  297. ids: make([]json.RawMessage, len(b)),
  298. resp: make(chan *jsonrpcMessage, len(b)),
  299. }
  300. for i, elem := range b {
  301. msg, err := c.newMessage(elem.Method, elem.Args...)
  302. if err != nil {
  303. return err
  304. }
  305. msgs[i] = msg
  306. op.ids[i] = msg.ID
  307. }
  308. var err error
  309. if c.isHTTP {
  310. err = c.sendBatchHTTP(ctx, op, msgs)
  311. } else {
  312. err = c.send(ctx, op, msgs)
  313. }
  314. // Wait for all responses to come back.
  315. for n := 0; n < len(b) && err == nil; n++ {
  316. var resp *jsonrpcMessage
  317. resp, err = op.wait(ctx, c)
  318. if err != nil {
  319. break
  320. }
  321. // Find the element corresponding to this response.
  322. // The element is guaranteed to be present because dispatch
  323. // only sends valid IDs to our channel.
  324. var elem *BatchElem
  325. for i := range msgs {
  326. if bytes.Equal(msgs[i].ID, resp.ID) {
  327. elem = &b[i]
  328. break
  329. }
  330. }
  331. if resp.Error != nil {
  332. elem.Error = resp.Error
  333. continue
  334. }
  335. if len(resp.Result) == 0 {
  336. elem.Error = ErrNoResult
  337. continue
  338. }
  339. elem.Error = json.Unmarshal(resp.Result, elem.Result)
  340. }
  341. return err
  342. }
  343. // Notify sends a notification, i.e. a method call that doesn't expect a response.
  344. func (c *Client) Notify(ctx context.Context, method string, args ...interface{}) error {
  345. op := new(requestOp)
  346. msg, err := c.newMessage(method, args...)
  347. if err != nil {
  348. return err
  349. }
  350. msg.ID = nil
  351. if c.isHTTP {
  352. return c.sendHTTP(ctx, op, msg)
  353. } else {
  354. return c.send(ctx, op, msg)
  355. }
  356. }
  357. // EthSubscribe registers a subscripion under the "eth" namespace.
  358. func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
  359. return c.Subscribe(ctx, "eth", channel, args...)
  360. }
  361. // ShhSubscribe registers a subscripion under the "shh" namespace.
  362. func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
  363. return c.Subscribe(ctx, "shh", channel, args...)
  364. }
  365. // Subscribe calls the "<namespace>_subscribe" method with the given arguments,
  366. // registering a subscription. Server notifications for the subscription are
  367. // sent to the given channel. The element type of the channel must match the
  368. // expected type of content returned by the subscription.
  369. //
  370. // The context argument cancels the RPC request that sets up the subscription but has no
  371. // effect on the subscription after Subscribe has returned.
  372. //
  373. // Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications
  374. // before considering the subscriber dead. The subscription Err channel will receive
  375. // ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
  376. // that the channel usually has at least one reader to prevent this issue.
  377. func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
  378. // Check type of channel first.
  379. chanVal := reflect.ValueOf(channel)
  380. if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
  381. panic("first argument to Subscribe must be a writable channel")
  382. }
  383. if chanVal.IsNil() {
  384. panic("channel given to Subscribe must not be nil")
  385. }
  386. if c.isHTTP {
  387. return nil, ErrNotificationsUnsupported
  388. }
  389. msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...)
  390. if err != nil {
  391. return nil, err
  392. }
  393. op := &requestOp{
  394. ids: []json.RawMessage{msg.ID},
  395. resp: make(chan *jsonrpcMessage),
  396. sub: newClientSubscription(c, namespace, chanVal),
  397. }
  398. // Send the subscription request.
  399. // The arrival and validity of the response is signaled on sub.quit.
  400. if err := c.send(ctx, op, msg); err != nil {
  401. return nil, err
  402. }
  403. if _, err := op.wait(ctx, c); err != nil {
  404. return nil, err
  405. }
  406. return op.sub, nil
  407. }
  408. func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) {
  409. msg := &jsonrpcMessage{Version: vsn, ID: c.nextID(), Method: method}
  410. if paramsIn != nil { // prevent sending "params":null
  411. var err error
  412. if msg.Params, err = json.Marshal(paramsIn); err != nil {
  413. return nil, err
  414. }
  415. }
  416. return msg, nil
  417. }
  418. // send registers op with the dispatch loop, then sends msg on the connection.
  419. // if sending fails, op is deregistered.
  420. func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
  421. select {
  422. case c.reqInit <- op:
  423. err := c.write(ctx, msg)
  424. c.reqSent <- err
  425. return err
  426. case <-ctx.Done():
  427. // This can happen if the client is overloaded or unable to keep up with
  428. // subscription notifications.
  429. return ctx.Err()
  430. case <-c.closing:
  431. return ErrClientQuit
  432. }
  433. }
  434. func (c *Client) write(ctx context.Context, msg interface{}) error {
  435. // The previous write failed. Try to establish a new connection.
  436. if c.writeConn == nil {
  437. if err := c.reconnect(ctx); err != nil {
  438. return err
  439. }
  440. }
  441. err := c.writeConn.Write(ctx, msg)
  442. if err != nil {
  443. c.writeConn = nil
  444. }
  445. return err
  446. }
  447. func (c *Client) reconnect(ctx context.Context) error {
  448. if c.reconnectFunc == nil {
  449. return errDead
  450. }
  451. if _, ok := ctx.Deadline(); !ok {
  452. var cancel func()
  453. ctx, cancel = context.WithTimeout(ctx, defaultDialTimeout)
  454. defer cancel()
  455. }
  456. newconn, err := c.reconnectFunc(ctx)
  457. if err != nil {
  458. log.Trace("RPC client reconnect failed", "err", err)
  459. return err
  460. }
  461. select {
  462. case c.reconnected <- newconn:
  463. c.writeConn = newconn
  464. return nil
  465. case <-c.didClose:
  466. newconn.Close()
  467. return ErrClientQuit
  468. }
  469. }
  470. // dispatch is the main loop of the client.
  471. // It sends read messages to waiting calls to Call and BatchCall
  472. // and subscription notifications to registered subscriptions.
  473. func (c *Client) dispatch(codec ServerCodec) {
  474. var (
  475. lastOp *requestOp // tracks last send operation
  476. reqInitLock = c.reqInit // nil while the send lock is held
  477. conn = c.newClientConn(codec)
  478. reading = true
  479. )
  480. defer func() {
  481. close(c.closing)
  482. if reading {
  483. conn.close(ErrClientQuit, nil)
  484. c.drainRead()
  485. }
  486. close(c.didClose)
  487. }()
  488. // Spawn the initial read loop.
  489. go c.read(codec)
  490. for {
  491. select {
  492. case <-c.close:
  493. return
  494. // Read path:
  495. case op := <-c.readOp:
  496. if op.batch {
  497. conn.handler.handleBatch(op.msgs)
  498. } else {
  499. conn.handler.handleMsg(op.msgs[0])
  500. }
  501. case err := <-c.readErr:
  502. conn.handler.log.Debug("RPC connection read error", "err", err)
  503. conn.close(err, lastOp)
  504. reading = false
  505. // Reconnect:
  506. case newcodec := <-c.reconnected:
  507. log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.RemoteAddr())
  508. if reading {
  509. // Wait for the previous read loop to exit. This is a rare case which
  510. // happens if this loop isn't notified in time after the connection breaks.
  511. // In those cases the caller will notice first and reconnect. Closing the
  512. // handler terminates all waiting requests (closing op.resp) except for
  513. // lastOp, which will be transferred to the new handler.
  514. conn.close(errClientReconnected, lastOp)
  515. c.drainRead()
  516. }
  517. go c.read(newcodec)
  518. reading = true
  519. conn = c.newClientConn(newcodec)
  520. // Re-register the in-flight request on the new handler
  521. // because that's where it will be sent.
  522. conn.handler.addRequestOp(lastOp)
  523. // Send path:
  524. case op := <-reqInitLock:
  525. // Stop listening for further requests until the current one has been sent.
  526. reqInitLock = nil
  527. lastOp = op
  528. conn.handler.addRequestOp(op)
  529. case err := <-c.reqSent:
  530. if err != nil {
  531. // Remove response handlers for the last send. When the read loop
  532. // goes down, it will signal all other current operations.
  533. conn.handler.removeRequestOp(lastOp)
  534. }
  535. // Let the next request in.
  536. reqInitLock = c.reqInit
  537. lastOp = nil
  538. case op := <-c.reqTimeout:
  539. conn.handler.removeRequestOp(op)
  540. }
  541. }
  542. }
  543. // drainRead drops read messages until an error occurs.
  544. func (c *Client) drainRead() {
  545. for {
  546. select {
  547. case <-c.readOp:
  548. case <-c.readErr:
  549. return
  550. }
  551. }
  552. }
  553. // read decodes RPC messages from a codec, feeding them into dispatch.
  554. func (c *Client) read(codec ServerCodec) {
  555. for {
  556. msgs, batch, err := codec.Read()
  557. if _, ok := err.(*json.SyntaxError); ok {
  558. codec.Write(context.Background(), errorMessage(&parseError{err.Error()}))
  559. }
  560. if err != nil {
  561. c.readErr <- err
  562. return
  563. }
  564. c.readOp <- readOp{msgs, batch}
  565. }
  566. }