api.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package downloader
  17. import (
  18. "context"
  19. "sync"
  20. "github.com/ethereum/go-ethereum"
  21. "github.com/ethereum/go-ethereum/common/gopool"
  22. "github.com/ethereum/go-ethereum/event"
  23. "github.com/ethereum/go-ethereum/rpc"
  24. )
  25. // PublicDownloaderAPI provides an API which gives information about the current synchronisation status.
  26. // It offers only methods that operates on data that can be available to anyone without security risks.
  27. type PublicDownloaderAPI struct {
  28. d *Downloader
  29. mux *event.TypeMux
  30. installSyncSubscription chan chan interface{}
  31. uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest
  32. }
  33. // NewPublicDownloaderAPI create a new PublicDownloaderAPI. The API has an internal event loop that
  34. // listens for events from the downloader through the global event mux. In case it receives one of
  35. // these events it broadcasts it to all syncing subscriptions that are installed through the
  36. // installSyncSubscription channel.
  37. func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI {
  38. api := &PublicDownloaderAPI{
  39. d: d,
  40. mux: m,
  41. installSyncSubscription: make(chan chan interface{}),
  42. uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest),
  43. }
  44. go api.eventLoop()
  45. return api
  46. }
  47. // eventLoop runs a loop until the event mux closes. It will install and uninstall new
  48. // sync subscriptions and broadcasts sync status updates to the installed sync subscriptions.
  49. func (api *PublicDownloaderAPI) eventLoop() {
  50. var (
  51. sub = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
  52. syncSubscriptions = make(map[chan interface{}]struct{})
  53. )
  54. for {
  55. select {
  56. case i := <-api.installSyncSubscription:
  57. syncSubscriptions[i] = struct{}{}
  58. case u := <-api.uninstallSyncSubscription:
  59. delete(syncSubscriptions, u.c)
  60. close(u.uninstalled)
  61. case event := <-sub.Chan():
  62. if event == nil {
  63. return
  64. }
  65. var notification interface{}
  66. switch event.Data.(type) {
  67. case StartEvent:
  68. notification = &SyncingResult{
  69. Syncing: true,
  70. Status: api.d.Progress(),
  71. }
  72. case DoneEvent, FailedEvent:
  73. notification = false
  74. }
  75. // broadcast
  76. for c := range syncSubscriptions {
  77. c <- notification
  78. }
  79. }
  80. }
  81. }
  82. // Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
  83. func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error) {
  84. notifier, supported := rpc.NotifierFromContext(ctx)
  85. if !supported {
  86. return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
  87. }
  88. rpcSub := notifier.CreateSubscription()
  89. gopool.Submit(func() {
  90. statuses := make(chan interface{})
  91. sub := api.SubscribeSyncStatus(statuses)
  92. for {
  93. select {
  94. case status := <-statuses:
  95. notifier.Notify(rpcSub.ID, status)
  96. case <-rpcSub.Err():
  97. sub.Unsubscribe()
  98. return
  99. case <-notifier.Closed():
  100. sub.Unsubscribe()
  101. return
  102. }
  103. }
  104. })
  105. return rpcSub, nil
  106. }
  107. // SyncingResult provides information about the current synchronisation status for this node.
  108. type SyncingResult struct {
  109. Syncing bool `json:"syncing"`
  110. Status ethereum.SyncProgress `json:"status"`
  111. }
  // uninstallSyncSubscriptionRequest asks the API event loop to uninstall a
  // syncing subscription.
  type uninstallSyncSubscriptionRequest struct {
  	c           chan interface{} // status channel of the subscription to remove
  	uninstalled chan interface{} // closed by the event loop once the removal is done
  }
  117. // SyncStatusSubscription represents a syncing subscription.
  118. type SyncStatusSubscription struct {
  119. api *PublicDownloaderAPI // register subscription in event loop of this api instance
  120. c chan interface{} // channel where events are broadcasted to
  121. unsubOnce sync.Once // make sure unsubscribe logic is executed once
  122. }
  123. // Unsubscribe uninstalls the subscription from the DownloadAPI event loop.
  124. // The status channel that was passed to subscribeSyncStatus isn't used anymore
  125. // after this method returns.
  126. func (s *SyncStatusSubscription) Unsubscribe() {
  127. s.unsubOnce.Do(func() {
  128. req := uninstallSyncSubscriptionRequest{s.c, make(chan interface{})}
  129. s.api.uninstallSyncSubscription <- &req
  130. for {
  131. select {
  132. case <-s.c:
  133. // drop new status events until uninstall confirmation
  134. continue
  135. case <-req.uninstalled:
  136. return
  137. }
  138. }
  139. })
  140. }
  141. // SubscribeSyncStatus creates a subscription that will broadcast new synchronisation updates.
  142. // The given channel must receive interface values, the result can either
  143. func (api *PublicDownloaderAPI) SubscribeSyncStatus(status chan interface{}) *SyncStatusSubscription {
  144. api.installSyncSubscription <- status
  145. return &SyncStatusSubscription{api: api, c: status}
  146. }