subscription.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rpc
  17. import (
  18. "container/list"
  19. "context"
  20. crand "crypto/rand"
  21. "encoding/binary"
  22. "encoding/hex"
  23. "encoding/json"
  24. "errors"
  25. "math/rand"
  26. "reflect"
  27. "strings"
  28. "sync"
  29. "time"
  30. )
  31. var (
  32. // ErrNotificationsUnsupported is returned when the connection doesn't support notifications.
  33. ErrNotificationsUnsupported = errors.New("notifications not supported")
  34. // ErrSubscriptionNotFound is returned when the subscription for the given id is not found.
  35. ErrSubscriptionNotFound = errors.New("subscription not found")
  36. )
  37. var globalGen = randomIDGenerator() // package-level generator called by NewID
  38. // ID is a pseudo-random identifier for an RPC subscription, stored as a string.
  39. type ID string
  40. // NewID returns a new, random ID.
  41. func NewID() ID {
  42. return globalGen()
  43. }
  44. // randomIDGenerator returns a function generates a random IDs.
  45. func randomIDGenerator() func() ID {
  46. var buf = make([]byte, 8)
  47. var seed int64
  48. if _, err := crand.Read(buf); err == nil {
  49. seed = int64(binary.BigEndian.Uint64(buf))
  50. } else {
  51. seed = int64(time.Now().Nanosecond())
  52. }
  53. var (
  54. mu sync.Mutex
  55. rng = rand.New(rand.NewSource(seed))
  56. )
  57. return func() ID {
  58. mu.Lock()
  59. defer mu.Unlock()
  60. id := make([]byte, 16)
  61. rng.Read(id)
  62. return encodeID(id)
  63. }
  64. }
  65. func encodeID(b []byte) ID {
  66. id := hex.EncodeToString(b)
  67. id = strings.TrimLeft(id, "0")
  68. if id == "" {
  69. id = "0" // ID's are RPC quantities, no leading zero's and 0 is 0x0.
  70. }
  71. return ID("0x" + id)
  72. }
  73. type notifierKey struct{} // context key under which NotifierFromContext looks up the *Notifier
  74. // NotifierFromContext returns the Notifier value stored in ctx, if any.
  75. func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
  76. n, ok := ctx.Value(notifierKey{}).(*Notifier)
  77. return n, ok
  78. }
  79. // Notifier is tied to an RPC connection that supports subscriptions.
  80. // Server callbacks use the notifier to send notifications.
  81. type Notifier struct {
  82. h *handler // supplies subscription IDs (idgen) and the connection notifications are written to
  83. namespace string // prefix of the notification method name
  84. mu sync.Mutex // held by the methods that access the fields below
  85. sub *Subscription // the single subscription created through this notifier, nil until CreateSubscription
  86. buffer []json.RawMessage // notifications queued by Notify before activation
  87. callReturned bool // set by takeSubscription; CreateSubscription panics once it is true
  88. activated bool // set by activate; once true, Notify sends instead of buffering
  89. }
  90. // CreateSubscription returns a new subscription that is coupled to the
  91. // RPC connection. By default subscriptions are inactive and notifications
  92. // are dropped until the subscription is marked as active. This is done
  93. // by the RPC server after the subscription ID is send to the client.
  94. func (n *Notifier) CreateSubscription() *Subscription {
  95. n.mu.Lock()
  96. defer n.mu.Unlock()
  97. if n.sub != nil {
  98. panic("can't create multiple subscriptions with Notifier")
  99. } else if n.callReturned {
  100. panic("can't create subscription after subscribe call has returned")
  101. }
  102. n.sub = &Subscription{ID: n.h.idgen(), namespace: n.namespace, err: make(chan error, 1)}
  103. return n.sub
  104. }
  105. // Notify sends a notification to the client with the given data as payload.
  106. // If an error occurs the RPC connection is closed and the error is returned.
  107. func (n *Notifier) Notify(id ID, data interface{}) error {
  108. enc, err := json.Marshal(data)
  109. if err != nil {
  110. return err
  111. }
  112. n.mu.Lock()
  113. defer n.mu.Unlock()
  114. if n.sub == nil {
  115. panic("can't Notify before subscription is created")
  116. } else if n.sub.ID != id {
  117. panic("Notify with wrong ID")
  118. }
  119. if n.activated {
  120. return n.send(n.sub, enc)
  121. }
  122. n.buffer = append(n.buffer, enc)
  123. return nil
  124. }
  125. // Closed returns a channel that is closed when the RPC connection is closed.
  126. // Deprecated: use subscription error channel
  127. func (n *Notifier) Closed() <-chan interface{} {
  128. return n.h.conn.closed()
  129. }
  130. // takeSubscription returns the subscription (if one has been created). No subscription can
  131. // be created after this call.
  132. func (n *Notifier) takeSubscription() *Subscription {
  133. n.mu.Lock()
  134. defer n.mu.Unlock()
  135. n.callReturned = true
  136. return n.sub
  137. }
  138. // activate is called after the subscription ID was sent to client. Notifications are
  139. // buffered before activation. This prevents notifications being sent to the client before
  140. // the subscription ID is sent to the client.
  141. func (n *Notifier) activate() error {
  142. n.mu.Lock()
  143. defer n.mu.Unlock()
  144. for _, data := range n.buffer {
  145. if err := n.send(n.sub, data); err != nil {
  146. return err
  147. }
  148. }
  149. n.activated = true
  150. return nil
  151. }
  152. func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
  153. params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
  154. ctx := context.Background()
  155. return n.h.conn.writeJSON(ctx, &jsonrpcMessage{
  156. Version: vsn,
  157. Method: n.namespace + notificationMethodSuffix,
  158. Params: params,
  159. })
  160. }
  161. // A Subscription is created by a notifier and tied to that notifier. Code holding the
  162. // subscription can use it to wait for an unsubscribe request from the client, see Err().
  163. type Subscription struct {
  164. ID ID // identifier assigned when the notifier creates the subscription
  165. namespace string // copied from the creating notifier
  166. err chan error // closed on unsubscribe
  167. }
  168. // Err returns a channel that is closed when the client send an unsubscribe request.
  169. func (s *Subscription) Err() <-chan error {
  170. return s.err
  171. }
  172. // MarshalJSON marshals a subscription as its ID.
  173. func (s *Subscription) MarshalJSON() ([]byte, error) {
  174. return json.Marshal(s.ID)
  175. }
  176. // ClientSubscription is a subscription established through the Client's Subscribe or
  177. // EthSubscribe methods.
  178. type ClientSubscription struct {
  179. client *Client // used to issue the unsubscribe call
  180. etype reflect.Type // element type of channel; notifications are decoded into a new value of this type
  181. channel reflect.Value // channel on which decoded notifications are sent
  182. namespace string // prefix of the unsubscribe method name
  183. subid string // passed as the argument of the unsubscribe call
  184. // The in channel receives notification values from the client's dispatcher.
  185. in chan json.RawMessage
  186. // The error channel receives the error from the forwarding loop.
  187. // It is closed by Unsubscribe.
  188. err chan error
  189. errOnce sync.Once // ensures the body of Unsubscribe runs only once
  190. // Closing of the subscription is requested by sending on 'quit'. This is handled by
  191. // the forwarding loop, which closes 'forwardDone' when it has stopped sending to
  192. // sub.channel. Finally, 'unsubDone' is closed after unsubscribing on the server side.
  193. quit chan error
  194. forwardDone chan struct{}
  195. unsubDone chan struct{}
  196. }
  197. // errUnsubscribed is the sentinel value sent on sub.quit when Unsubscribe is called.
  198. var errUnsubscribed = errors.New("unsubscribed")
  199. func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
  200. sub := &ClientSubscription{
  201. client: c,
  202. namespace: namespace,
  203. etype: channel.Type().Elem(),
  204. channel: channel,
  205. in: make(chan json.RawMessage),
  206. quit: make(chan error),
  207. forwardDone: make(chan struct{}),
  208. unsubDone: make(chan struct{}),
  209. err: make(chan error, 1),
  210. }
  211. return sub
  212. }
  213. // Err returns the subscription error channel. The intended use of Err is to schedule
  214. // resubscription when the client connection is closed unexpectedly.
  215. //
  216. // The error channel receives a value when the subscription has ended due to an error. The
  217. // received error is nil if Close has been called on the underlying client and no other
  218. // error has occurred.
  219. //
  220. // The error channel is closed when Unsubscribe is called on the subscription.
  221. func (sub *ClientSubscription) Err() <-chan error {
  222. return sub.err
  223. }
  224. // Unsubscribe unsubscribes the notification and closes the error channel.
  225. // It can safely be called more than once.
  226. func (sub *ClientSubscription) Unsubscribe() {
  227. sub.errOnce.Do(func() {
  228. select {
  229. case sub.quit <- errUnsubscribed:
  230. <-sub.unsubDone
  231. case <-sub.unsubDone:
  232. }
  233. close(sub.err)
  234. })
  235. }
  236. // deliver is called by the client's message dispatcher to send a notification value.
  237. func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
  238. select {
  239. case sub.in <- result:
  240. return true
  241. case <-sub.forwardDone:
  242. return false
  243. }
  244. }
  245. // close is called by the client's message dispatcher when the connection is closed.
  246. func (sub *ClientSubscription) close(err error) {
  247. select {
  248. case sub.quit <- err:
  249. case <-sub.forwardDone:
  250. }
  251. }
  252. // run is the forwarding loop of the subscription. It runs in its own goroutine and
  253. // is launched by the client's handler after the subscription has been created.
  254. func (sub *ClientSubscription) run() {
  255. defer close(sub.unsubDone)
  256. unsubscribe, err := sub.forward()
  257. // The client's dispatch loop won't be able to execute the unsubscribe call if it is
  258. // blocked in sub.deliver() or sub.close(). Closing forwardDone unblocks them.
  259. close(sub.forwardDone)
  260. // Call the unsubscribe method on the server.
  261. if unsubscribe {
  262. sub.requestUnsubscribe()
  263. }
  264. // Send the error.
  265. if err != nil {
  266. if err == ErrClientQuit {
  267. // ErrClientQuit gets here when Client.Close is called. This is reported as a
  268. // nil error because it's not an error, but we can't close sub.err here.
  269. err = nil
  270. }
  271. sub.err <- err
  272. }
  273. }
  274. // forward is the forwarding loop. It receives raw notifications on sub.in, decodes and queues them, and sends the queued values in FIFO order on the subscription channel.
  275. // It returns when a value arrives on sub.quit, when decoding fails, or when the queue already holds maxClientSubscriptionBuffer items; unsubscribeServer tells the caller whether to unsubscribe on the server.
  276. func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
  277. cases := []reflect.SelectCase{
  278. {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
  279. {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
  280. {Dir: reflect.SelectSend, Chan: sub.channel},
  281. }
  282. buffer := list.New() // queue of decoded values waiting to be sent
  283. for {
  284. var chosen int
  285. var recv reflect.Value
  286. if buffer.Len() == 0 {
  287. // Idle: nothing is queued, so select without the send case.
  288. chosen, recv, _ = reflect.Select(cases[:2])
  289. } else {
  290. // Non-empty buffer: offer the oldest queued item on the send case.
  291. cases[2].Send = reflect.ValueOf(buffer.Front().Value)
  292. chosen, recv, _ = reflect.Select(cases)
  293. }
  294. switch chosen {
  295. case 0: // <-sub.quit
  296. if !recv.IsNil() { // a nil value received on quit leaves err nil
  297. err = recv.Interface().(error)
  298. }
  299. if err == errUnsubscribed {
  300. // Exiting because Unsubscribe was called; unsubscribe on the server and report no error.
  301. return true, nil
  302. }
  303. return false, err // any other quit value: no server-side unsubscribe
  304. case 1: // <-sub.in
  305. val, err := sub.unmarshal(recv.Interface().(json.RawMessage)) // this err shadows the named result; the returns below are explicit
  306. if err != nil {
  307. return true, err
  308. }
  309. if buffer.Len() == maxClientSubscriptionBuffer {
  310. return true, ErrSubscriptionQueueOverflow
  311. }
  312. buffer.PushBack(val)
  313. case 2: // sub.channel<-
  314. cases[2].Send = reflect.Value{} // Don't hold onto the value.
  315. buffer.Remove(buffer.Front()) // the front item was sent, drop it from the queue
  316. }
  317. }
  318. }
  319. func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) {
  320. val := reflect.New(sub.etype)
  321. err := json.Unmarshal(result, val.Interface())
  322. return val.Elem().Interface(), err
  323. }
  324. func (sub *ClientSubscription) requestUnsubscribe() error {
  325. var result interface{}
  326. return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
  327. }