// subscription_pull.go (7.4 KB)
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package localstore
  17. import (
  18. "context"
  19. "errors"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/log"
  23. "github.com/ethereum/go-ethereum/metrics"
  24. "github.com/ethereum/go-ethereum/swarm/chunk"
  25. "github.com/ethereum/go-ethereum/swarm/shed"
  26. "github.com/ethereum/go-ethereum/swarm/spancontext"
  27. "github.com/opentracing/opentracing-go"
  28. olog "github.com/opentracing/opentracing-go/log"
  29. "github.com/syndtr/goleveldb/leveldb"
  30. )
  31. // SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index.
  32. // Pull syncing index can be only subscribed to a particular proximity order bin. If since
  33. // is not 0, the iteration will start from the first item stored after that id. If until is not 0,
  34. // only chunks stored up to this id will be sent to the channel, and the returned channel will be
  35. // closed. The since-until interval is open on since side, and closed on until side: (since,until] <=> [since+1,until]. Returned stop
  36. // function will terminate current and further iterations without errors, and also close the returned channel.
  37. // Make sure that you check the second returned parameter from the channel to stop iteration when its value
  38. // is false.
  39. func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) {
  40. metricName := "localstore.SubscribePull"
  41. metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
  42. chunkDescriptors := make(chan chunk.Descriptor)
  43. trigger := make(chan struct{}, 1)
  44. db.pullTriggersMu.Lock()
  45. if _, ok := db.pullTriggers[bin]; !ok {
  46. db.pullTriggers[bin] = make([]chan struct{}, 0)
  47. }
  48. db.pullTriggers[bin] = append(db.pullTriggers[bin], trigger)
  49. db.pullTriggersMu.Unlock()
  50. // send signal for the initial iteration
  51. trigger <- struct{}{}
  52. stopChan := make(chan struct{})
  53. var stopChanOnce sync.Once
  54. // used to provide information from the iterator to
  55. // stop subscription when until chunk descriptor is reached
  56. var errStopSubscription = errors.New("stop subscription")
  57. go func() {
  58. defer metrics.GetOrRegisterCounter(metricName+".stop", nil).Inc(1)
  59. // close the returned chunk.Descriptor channel at the end to
  60. // signal that the subscription is done
  61. defer close(chunkDescriptors)
  62. // sinceItem is the Item from which the next iteration
  63. // should start. The first iteration starts from the first Item.
  64. var sinceItem *shed.Item
  65. if since > 0 {
  66. sinceItem = &shed.Item{
  67. Address: db.addressInBin(bin),
  68. BinID: since,
  69. }
  70. }
  71. first := true // first iteration flag for SkipStartFromItem
  72. for {
  73. select {
  74. case <-trigger:
  75. // iterate until:
  76. // - last index Item is reached
  77. // - subscription stop is called
  78. // - context is done
  79. metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1)
  80. ctx, sp := spancontext.StartSpan(ctx, metricName+".iter")
  81. sp.LogFields(olog.Int("bin", int(bin)), olog.Uint64("since", since), olog.Uint64("until", until))
  82. iterStart := time.Now()
  83. var count int
  84. err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) {
  85. select {
  86. case chunkDescriptors <- chunk.Descriptor{
  87. Address: item.Address,
  88. BinID: item.BinID,
  89. }:
  90. count++
  91. // until chunk descriptor is sent
  92. // break the iteration
  93. if until > 0 && item.BinID >= until {
  94. return true, errStopSubscription
  95. }
  96. // set next iteration start item
  97. // when its chunk is successfully sent to channel
  98. sinceItem = &item
  99. return false, nil
  100. case <-stopChan:
  101. // gracefully stop the iteration
  102. // on stop
  103. return true, nil
  104. case <-db.close:
  105. // gracefully stop the iteration
  106. // on database close
  107. return true, nil
  108. case <-ctx.Done():
  109. return true, ctx.Err()
  110. }
  111. }, &shed.IterateOptions{
  112. StartFrom: sinceItem,
  113. // sinceItem was sent as the last Address in the previous
  114. // iterator call, skip it in this one, but not the item with
  115. // the provided since bin id as it should be sent to a channel
  116. SkipStartFromItem: !first,
  117. Prefix: []byte{bin},
  118. })
  119. totalTimeMetric(metricName+".iter", iterStart)
  120. sp.FinishWithOptions(opentracing.FinishOptions{
  121. LogRecords: []opentracing.LogRecord{
  122. {
  123. Timestamp: time.Now(),
  124. Fields: []olog.Field{olog.Int("count", count)},
  125. },
  126. },
  127. })
  128. if err != nil {
  129. if err == errStopSubscription {
  130. // stop subscription without any errors
  131. // if until is reached
  132. return
  133. }
  134. metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1)
  135. log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
  136. return
  137. }
  138. first = false
  139. case <-stopChan:
  140. // terminate the subscription
  141. // on stop
  142. return
  143. case <-db.close:
  144. // terminate the subscription
  145. // on database close
  146. return
  147. case <-ctx.Done():
  148. err := ctx.Err()
  149. if err != nil {
  150. log.Error("localstore pull subscription", "bin", bin, "since", since, "until", until, "err", err)
  151. }
  152. return
  153. }
  154. }
  155. }()
  156. stop = func() {
  157. stopChanOnce.Do(func() {
  158. close(stopChan)
  159. })
  160. db.pullTriggersMu.Lock()
  161. defer db.pullTriggersMu.Unlock()
  162. for i, t := range db.pullTriggers[bin] {
  163. if t == trigger {
  164. db.pullTriggers[bin] = append(db.pullTriggers[bin][:i], db.pullTriggers[bin][i+1:]...)
  165. break
  166. }
  167. }
  168. }
  169. return chunkDescriptors, stop
  170. }
  171. // LastPullSubscriptionBinID returns chunk bin id of the latest Chunk
  172. // in pull syncing index for a provided bin. If there are no chunks in
  173. // that bin, 0 value is returned.
  174. func (db *DB) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
  175. metrics.GetOrRegisterCounter("localstore.LastPullSubscriptionBinID", nil).Inc(1)
  176. item, err := db.pullIndex.Last([]byte{bin})
  177. if err != nil {
  178. if err == leveldb.ErrNotFound {
  179. return 0, nil
  180. }
  181. return 0, err
  182. }
  183. return item.BinID, nil
  184. }
  185. // triggerPullSubscriptions is used internally for starting iterations
  186. // on Pull subscriptions for a particular bin. When new item with address
  187. // that is in particular bin for DB's baseKey is added to pull index
  188. // this function should be called.
  189. func (db *DB) triggerPullSubscriptions(bin uint8) {
  190. db.pullTriggersMu.RLock()
  191. triggers, ok := db.pullTriggers[bin]
  192. db.pullTriggersMu.RUnlock()
  193. if !ok {
  194. return
  195. }
  196. for _, t := range triggers {
  197. select {
  198. case t <- struct{}{}:
  199. default:
  200. }
  201. }
  202. }
  203. // addressInBin returns an address that is in a specific
  204. // proximity order bin from database base key.
  205. func (db *DB) addressInBin(bin uint8) (addr chunk.Address) {
  206. addr = append([]byte(nil), db.baseKey...)
  207. b := bin / 8
  208. addr[b] = addr[b] ^ (1 << (7 - bin%8))
  209. return addr
  210. }