subscription.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package event
  17. import (
  18. "context"
  19. "sync"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common/gopool"
  22. "github.com/ethereum/go-ethereum/common/mclock"
  23. )
  24. // Subscription represents a stream of events. The carrier of the events is typically a
  25. // channel, but isn't part of the interface.
  26. //
  27. // Subscriptions can fail while established. Failures are reported through an error
  28. // channel. It receives a value if there is an issue with the subscription (e.g. the
  29. // network connection delivering the events has been closed). Only one value will ever be
  30. // sent.
  31. //
  32. // The error channel is closed when the subscription ends successfully (i.e. when the
  33. // source of events is closed). It is also closed when Unsubscribe is called.
  34. //
  35. // The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all
  36. // cases to ensure that resources related to the subscription are released. It can be
  37. // called any number of times.
  38. type Subscription interface {
  39. Err() <-chan error // returns the error channel
  40. Unsubscribe() // cancels sending of events, closing the error channel
  41. }
  42. // NewSubscription runs a producer function as a subscription in a new goroutine. The
  43. // channel given to the producer is closed when Unsubscribe is called. If fn returns an
  44. // error, it is sent on the subscription's error channel.
  45. func NewSubscription(producer func(<-chan struct{}) error) Subscription {
  46. s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)}
  47. gopool.Submit(func() {
  48. defer close(s.err)
  49. err := producer(s.unsub)
  50. s.mu.Lock()
  51. defer s.mu.Unlock()
  52. if !s.unsubscribed {
  53. if err != nil {
  54. s.err <- err
  55. }
  56. s.unsubscribed = true
  57. }
  58. })
  59. return s
  60. }
  61. type funcSub struct {
  62. unsub chan struct{}
  63. err chan error
  64. mu sync.Mutex
  65. unsubscribed bool
  66. }
  67. func (s *funcSub) Unsubscribe() {
  68. s.mu.Lock()
  69. if s.unsubscribed {
  70. s.mu.Unlock()
  71. return
  72. }
  73. s.unsubscribed = true
  74. close(s.unsub)
  75. s.mu.Unlock()
  76. // Wait for producer shutdown.
  77. <-s.err
  78. }
  79. func (s *funcSub) Err() <-chan error {
  80. return s.err
  81. }
  82. // Resubscribe calls fn repeatedly to keep a subscription established. When the
  83. // subscription is established, Resubscribe waits for it to fail and calls fn again. This
  84. // process repeats until Unsubscribe is called or the active subscription ends
  85. // successfully.
  86. //
  87. // Resubscribe applies backoff between calls to fn. The time between calls is adapted
  88. // based on the error rate, but will never exceed backoffMax.
  89. func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
  90. return ResubscribeErr(backoffMax, func(ctx context.Context, _ error) (Subscription, error) {
  91. return fn(ctx)
  92. })
  93. }
  94. // A ResubscribeFunc attempts to establish a subscription.
  95. type ResubscribeFunc func(context.Context) (Subscription, error)
  96. // ResubscribeErr calls fn repeatedly to keep a subscription established. When the
  97. // subscription is established, ResubscribeErr waits for it to fail and calls fn again. This
  98. // process repeats until Unsubscribe is called or the active subscription ends
  99. // successfully.
  100. //
  101. // The difference between Resubscribe and ResubscribeErr is that with ResubscribeErr,
  102. // the error of the failing subscription is available to the callback for logging
  103. // purposes.
  104. //
  105. // ResubscribeErr applies backoff between calls to fn. The time between calls is adapted
  106. // based on the error rate, but will never exceed backoffMax.
  107. func ResubscribeErr(backoffMax time.Duration, fn ResubscribeErrFunc) Subscription {
  108. s := &resubscribeSub{
  109. waitTime: backoffMax / 10,
  110. backoffMax: backoffMax,
  111. fn: fn,
  112. err: make(chan error),
  113. unsub: make(chan struct{}),
  114. }
  115. go s.loop()
  116. return s
  117. }
  118. // A ResubscribeErrFunc attempts to establish a subscription.
  119. // For every call but the first, the second argument to this function is
  120. // the error that occurred with the previous subscription.
  121. type ResubscribeErrFunc func(context.Context, error) (Subscription, error)
  122. type resubscribeSub struct {
  123. fn ResubscribeErrFunc
  124. err chan error
  125. unsub chan struct{}
  126. unsubOnce sync.Once
  127. lastTry mclock.AbsTime
  128. lastSubErr error
  129. waitTime, backoffMax time.Duration
  130. }
  131. func (s *resubscribeSub) Unsubscribe() {
  132. s.unsubOnce.Do(func() {
  133. s.unsub <- struct{}{}
  134. <-s.err
  135. })
  136. }
  137. func (s *resubscribeSub) Err() <-chan error {
  138. return s.err
  139. }
  140. func (s *resubscribeSub) loop() {
  141. defer close(s.err)
  142. var done bool
  143. for !done {
  144. sub := s.subscribe()
  145. if sub == nil {
  146. break
  147. }
  148. done = s.waitForError(sub)
  149. sub.Unsubscribe()
  150. }
  151. }
  152. func (s *resubscribeSub) subscribe() Subscription {
  153. subscribed := make(chan error)
  154. var sub Subscription
  155. for {
  156. s.lastTry = mclock.Now()
  157. ctx, cancel := context.WithCancel(context.Background())
  158. gopool.Submit(func() {
  159. rsub, err := s.fn(ctx, s.lastSubErr)
  160. sub = rsub
  161. subscribed <- err
  162. })
  163. select {
  164. case err := <-subscribed:
  165. cancel()
  166. if err == nil {
  167. if sub == nil {
  168. panic("event: ResubscribeFunc returned nil subscription and no error")
  169. }
  170. return sub
  171. }
  172. // Subscribing failed, wait before launching the next try.
  173. if s.backoffWait() {
  174. return nil // unsubscribed during wait
  175. }
  176. case <-s.unsub:
  177. cancel()
  178. <-subscribed // avoid leaking the s.fn goroutine.
  179. return nil
  180. }
  181. }
  182. }
  183. func (s *resubscribeSub) waitForError(sub Subscription) bool {
  184. defer sub.Unsubscribe()
  185. select {
  186. case err := <-sub.Err():
  187. s.lastSubErr = err
  188. return err == nil
  189. case <-s.unsub:
  190. return true
  191. }
  192. }
  193. func (s *resubscribeSub) backoffWait() bool {
  194. if time.Duration(mclock.Now()-s.lastTry) > s.backoffMax {
  195. s.waitTime = s.backoffMax / 10
  196. } else {
  197. s.waitTime *= 2
  198. if s.waitTime > s.backoffMax {
  199. s.waitTime = s.backoffMax
  200. }
  201. }
  202. t := time.NewTimer(s.waitTime)
  203. defer t.Stop()
  204. select {
  205. case <-t.C:
  206. return false
  207. case <-s.unsub:
  208. return true
  209. }
  210. }
  211. // SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once.
  212. //
  213. // For code that handle more than one subscription, a scope can be used to conveniently
  214. // unsubscribe all of them with a single call. The example demonstrates a typical use in a
  215. // larger program.
  216. //
  217. // The zero value is ready to use.
  218. type SubscriptionScope struct {
  219. mu sync.Mutex
  220. subs map[*scopeSub]struct{}
  221. closed bool
  222. }
// scopeSub is the wrapper that SubscriptionScope.Track returns for a tracked
// subscription. The field order matters: Track builds it with a positional literal.
type scopeSub struct {
	sc *SubscriptionScope // scope whose tracked set holds this wrapper
	s  Subscription       // wrapped subscription; Err and Unsubscribe delegate to it
}
  227. // Track starts tracking a subscription. If the scope is closed, Track returns nil. The
  228. // returned subscription is a wrapper. Unsubscribing the wrapper removes it from the
  229. // scope.
  230. func (sc *SubscriptionScope) Track(s Subscription) Subscription {
  231. sc.mu.Lock()
  232. defer sc.mu.Unlock()
  233. if sc.closed {
  234. return nil
  235. }
  236. if sc.subs == nil {
  237. sc.subs = make(map[*scopeSub]struct{})
  238. }
  239. ss := &scopeSub{sc, s}
  240. sc.subs[ss] = struct{}{}
  241. return ss
  242. }
  243. // Close calls Unsubscribe on all tracked subscriptions and prevents further additions to
  244. // the tracked set. Calls to Track after Close return nil.
  245. func (sc *SubscriptionScope) Close() {
  246. sc.mu.Lock()
  247. defer sc.mu.Unlock()
  248. if sc.closed {
  249. return
  250. }
  251. sc.closed = true
  252. for s := range sc.subs {
  253. s.s.Unsubscribe()
  254. }
  255. sc.subs = nil
  256. }
  257. // Count returns the number of tracked subscriptions.
  258. // It is meant to be used for debugging.
  259. func (sc *SubscriptionScope) Count() int {
  260. sc.mu.Lock()
  261. defer sc.mu.Unlock()
  262. return len(sc.subs)
  263. }
  264. func (s *scopeSub) Unsubscribe() {
  265. s.s.Unsubscribe()
  266. s.sc.mu.Lock()
  267. defer s.sc.mu.Unlock()
  268. delete(s.sc.subs, s)
  269. }
  270. func (s *scopeSub) Err() <-chan error {
  271. return s.s.Err()
  272. }