subscription.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rpc
  17. import (
  18. "bufio"
  19. "container/list"
  20. "context"
  21. crand "crypto/rand"
  22. "encoding/binary"
  23. "encoding/hex"
  24. "encoding/json"
  25. "errors"
  26. "math/rand"
  27. "reflect"
  28. "strings"
  29. "sync"
  30. "time"
  31. )
  var (
	// ErrNotificationsUnsupported is returned when the connection doesn't support notifications.
	ErrNotificationsUnsupported = errors.New("notifications not supported")

	// ErrSubscriptionNotFound is returned when the subscription for the given ID is not found.
	ErrSubscriptionNotFound = errors.New("subscription not found")
)
  // globalGen is the package-wide ID generator used by NewID. It is built once,
// when the package-level variables are initialized.
var globalGen = randomIDGenerator()

// ID identifies an RPC subscription. The IDs produced in this file are
// "0x"-prefixed hexadecimal strings derived from pseudo-random bytes.
type ID string
  41. // NewID returns a new, random ID.
  42. func NewID() ID {
  43. return globalGen()
  44. }
  45. // randomIDGenerator returns a function generates a random IDs.
  46. func randomIDGenerator() func() ID {
  47. seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader))
  48. if err != nil {
  49. seed = int64(time.Now().Nanosecond())
  50. }
  51. var (
  52. mu sync.Mutex
  53. rng = rand.New(rand.NewSource(seed))
  54. )
  55. return func() ID {
  56. mu.Lock()
  57. defer mu.Unlock()
  58. id := make([]byte, 16)
  59. rng.Read(id)
  60. return encodeID(id)
  61. }
  62. }
  63. func encodeID(b []byte) ID {
  64. id := hex.EncodeToString(b)
  65. id = strings.TrimLeft(id, "0")
  66. if id == "" {
  67. id = "0" // ID's are RPC quantities, no leading zero's and 0 is 0x0.
  68. }
  69. return ID("0x" + id)
  70. }
  // notifierKey is the unexported context key type under which
// NotifierFromContext looks up the *Notifier stored in a context.Context.
type notifierKey struct{}
  72. // NotifierFromContext returns the Notifier value stored in ctx, if any.
  73. func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
  74. n, ok := ctx.Value(notifierKey{}).(*Notifier)
  75. return n, ok
  76. }
  // Notifier is tied to an RPC connection that supports subscriptions.
// Server callbacks use the notifier to send notifications.
type Notifier struct {
	h         *handler // supplies the ID generator and the connection that notifications are written to
	namespace string   // prefix of the notification method name; copied into created subscriptions

	mu           sync.Mutex        // held by the Notifier methods while they access the fields below
	sub          *Subscription     // the notifier's single subscription; nil until CreateSubscription is called
	buffer       []json.RawMessage // encoded notifications queued by Notify before activation
	callReturned bool              // set by takeSubscription; CreateSubscription panics once it is true
	activated    bool              // set by activate; once true, Notify writes instead of queueing
}
  88. // CreateSubscription returns a new subscription that is coupled to the
  89. // RPC connection. By default subscriptions are inactive and notifications
  90. // are dropped until the subscription is marked as active. This is done
  91. // by the RPC server after the subscription ID is send to the client.
  92. func (n *Notifier) CreateSubscription() *Subscription {
  93. n.mu.Lock()
  94. defer n.mu.Unlock()
  95. if n.sub != nil {
  96. panic("can't create multiple subscriptions with Notifier")
  97. } else if n.callReturned {
  98. panic("can't create subscription after subscribe call has returned")
  99. }
  100. n.sub = &Subscription{ID: n.h.idgen(), namespace: n.namespace, err: make(chan error, 1)}
  101. return n.sub
  102. }
  103. // Notify sends a notification to the client with the given data as payload.
  104. // If an error occurs the RPC connection is closed and the error is returned.
  105. func (n *Notifier) Notify(id ID, data interface{}) error {
  106. enc, err := json.Marshal(data)
  107. if err != nil {
  108. return err
  109. }
  110. n.mu.Lock()
  111. defer n.mu.Unlock()
  112. if n.sub == nil {
  113. panic("can't Notify before subscription is created")
  114. } else if n.sub.ID != id {
  115. panic("Notify with wrong ID")
  116. }
  117. if n.activated {
  118. return n.send(n.sub, enc)
  119. }
  120. n.buffer = append(n.buffer, enc)
  121. return nil
  122. }
  123. // Closed returns a channel that is closed when the RPC connection is closed.
  124. // Deprecated: use subscription error channel
  125. func (n *Notifier) Closed() <-chan interface{} {
  126. return n.h.conn.Closed()
  127. }
  128. // takeSubscription returns the subscription (if one has been created). No subscription can
  129. // be created after this call.
  130. func (n *Notifier) takeSubscription() *Subscription {
  131. n.mu.Lock()
  132. defer n.mu.Unlock()
  133. n.callReturned = true
  134. return n.sub
  135. }
  136. // acticate is called after the subscription ID was sent to client. Notifications are
  137. // buffered before activation. This prevents notifications being sent to the client before
  138. // the subscription ID is sent to the client.
  139. func (n *Notifier) activate() error {
  140. n.mu.Lock()
  141. defer n.mu.Unlock()
  142. for _, data := range n.buffer {
  143. if err := n.send(n.sub, data); err != nil {
  144. return err
  145. }
  146. }
  147. n.activated = true
  148. return nil
  149. }
  150. func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
  151. params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
  152. ctx := context.Background()
  153. return n.h.conn.Write(ctx, &jsonrpcMessage{
  154. Version: vsn,
  155. Method: n.namespace + notificationMethodSuffix,
  156. Params: params,
  157. })
  158. }
  // A Subscription is created by a notifier and tied to that notifier. The client can use
// this subscription to wait for an unsubscribe request for the client, see Err().
type Subscription struct {
	ID        ID         // identifier assigned by the handler's ID generator in CreateSubscription
	namespace string     // copied from the creating notifier
	err       chan error // closed on unsubscribe
}
  166. // Err returns a channel that is closed when the client send an unsubscribe request.
  167. func (s *Subscription) Err() <-chan error {
  168. return s.err
  169. }
  170. // MarshalJSON marshals a subscription as its ID.
  171. func (s *Subscription) MarshalJSON() ([]byte, error) {
  172. return json.Marshal(s.ID)
  173. }
  // ClientSubscription is a subscription established through the Client's Subscribe or
// EthSubscribe methods.
type ClientSubscription struct {
	client    *Client            // used to issue the unsubscribe call
	etype     reflect.Type       // element type of channel; notifications are decoded into values of this type
	channel   reflect.Value      // channel that decoded notifications are sent on by forward
	namespace string             // prefix of the unsubscribe method name
	subid     string             // passed as the argument of the unsubscribe call; not assigned in this file
	in        chan json.RawMessage // raw notifications handed over by deliver

	quitOnce sync.Once     // ensures quit is closed once
	quit     chan struct{} // quit is closed when the subscription exits
	errOnce  sync.Once     // ensures err is closed once
	err      chan error    // receives at most one termination error; closed by Unsubscribe
}
  188. func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
  189. sub := &ClientSubscription{
  190. client: c,
  191. namespace: namespace,
  192. etype: channel.Type().Elem(),
  193. channel: channel,
  194. quit: make(chan struct{}),
  195. err: make(chan error, 1),
  196. in: make(chan json.RawMessage),
  197. }
  198. return sub
  199. }
  200. // Err returns the subscription error channel. The intended use of Err is to schedule
  201. // resubscription when the client connection is closed unexpectedly.
  202. //
  203. // The error channel receives a value when the subscription has ended due
  204. // to an error. The received error is nil if Close has been called
  205. // on the underlying client and no other error has occurred.
  206. //
  207. // The error channel is closed when Unsubscribe is called on the subscription.
  208. func (sub *ClientSubscription) Err() <-chan error {
  209. return sub.err
  210. }
  211. // Unsubscribe unsubscribes the notification and closes the error channel.
  212. // It can safely be called more than once.
  213. func (sub *ClientSubscription) Unsubscribe() {
  214. sub.quitWithError(nil, true)
  215. sub.errOnce.Do(func() { close(sub.err) })
  216. }
  217. func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) {
  218. sub.quitOnce.Do(func() {
  219. // The dispatch loop won't be able to execute the unsubscribe call
  220. // if it is blocked on deliver. Close sub.quit first because it
  221. // unblocks deliver.
  222. close(sub.quit)
  223. if unsubscribeServer {
  224. sub.requestUnsubscribe()
  225. }
  226. if err != nil {
  227. if err == ErrClientQuit {
  228. err = nil // Adhere to subscription semantics.
  229. }
  230. sub.err <- err
  231. }
  232. })
  233. }
  234. func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
  235. select {
  236. case sub.in <- result:
  237. return true
  238. case <-sub.quit:
  239. return false
  240. }
  241. }
  242. func (sub *ClientSubscription) start() {
  243. sub.quitWithError(sub.forward())
  244. }
  245. func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
  246. cases := []reflect.SelectCase{
  247. {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
  248. {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
  249. {Dir: reflect.SelectSend, Chan: sub.channel},
  250. }
  251. buffer := list.New()
  252. defer buffer.Init()
  253. for {
  254. var chosen int
  255. var recv reflect.Value
  256. if buffer.Len() == 0 {
  257. // Idle, omit send case.
  258. chosen, recv, _ = reflect.Select(cases[:2])
  259. } else {
  260. // Non-empty buffer, send the first queued item.
  261. cases[2].Send = reflect.ValueOf(buffer.Front().Value)
  262. chosen, recv, _ = reflect.Select(cases)
  263. }
  264. switch chosen {
  265. case 0: // <-sub.quit
  266. return nil, false
  267. case 1: // <-sub.in
  268. val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
  269. if err != nil {
  270. return err, true
  271. }
  272. if buffer.Len() == maxClientSubscriptionBuffer {
  273. return ErrSubscriptionQueueOverflow, true
  274. }
  275. buffer.PushBack(val)
  276. case 2: // sub.channel<-
  277. cases[2].Send = reflect.Value{} // Don't hold onto the value.
  278. buffer.Remove(buffer.Front())
  279. }
  280. }
  281. }
  282. func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) {
  283. val := reflect.New(sub.etype)
  284. err := json.Unmarshal(result, val.Interface())
  285. return val.Elem().Interface(), err
  286. }
  287. func (sub *ClientSubscription) requestUnsubscribe() error {
  288. var result interface{}
  289. return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
  290. }