  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package stream
  17. import (
  18. "context"
  19. "fmt"
  20. "strconv"
  21. "time"
  22. "github.com/ethereum/go-ethereum/metrics"
  23. "github.com/ethereum/go-ethereum/swarm/chunk"
  24. "github.com/ethereum/go-ethereum/swarm/log"
  25. "github.com/ethereum/go-ethereum/swarm/storage"
  26. )
const (
	// BatchSize is the maximum number of chunk addresses collected into a
	// single sync batch by SwarmSyncerServer.SetNextBatch.
	BatchSize = 128
)
// SwarmSyncerServer implements a Server for history syncing on bins
// offered streams:
// * live request delivery with or without checkback
// * (live/non-live historical) chunk syncing per proximity bin
type SwarmSyncerServer struct {
	correlateId string             // peer|bin identifier, used for logging only
	po          uint8              // proximity order (Kademlia bin) this server serves
	netStore    *storage.NetStore  // backing store for chunk data and pull subscriptions
	quit        chan struct{}      // closed by Close to signal shutdown
}
  40. // NewSwarmSyncerServer is constructor for SwarmSyncerServer
  41. func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore, correlateId string) (*SwarmSyncerServer, error) {
  42. return &SwarmSyncerServer{
  43. correlateId: correlateId,
  44. po: po,
  45. netStore: netStore,
  46. quit: make(chan struct{}),
  47. }, nil
  48. }
  49. func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) {
  50. streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, _ bool) (Server, error) {
  51. po, err := ParseSyncBinKey(t)
  52. if err != nil {
  53. return nil, err
  54. }
  55. return NewSwarmSyncerServer(po, netStore, fmt.Sprintf("%s|%d", p.ID(), po))
  56. })
  57. // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
  58. // return NewOutgoingProvableSwarmSyncer(po, db)
  59. // })
  60. }
// Close needs to be called on a stream server
// Closing s.quit unblocks any SetNextBatch call waiting in its select loop.
func (s *SwarmSyncerServer) Close() {
	close(s.quit)
}
  65. // GetData retrieves the actual chunk from netstore
  66. func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
  67. ch, err := s.netStore.Get(ctx, chunk.ModeGetSync, storage.Address(key))
  68. if err != nil {
  69. return nil, err
  70. }
  71. return ch.Data(), nil
  72. }
// SessionIndex returns current storage bin (po) index.
// It delegates to the localstore's last pull-subscription bin ID, which marks
// the boundary between historical and live syncing for this bin.
func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
	return s.netStore.LastPullSubscriptionBinID(s.po)
}
// SetNextBatch retrieves the next batch of hashes from the localstore.
// It expects a range of bin IDs, both ends inclusive in syncing, and returns
// concatenated byte slice of chunk addresses and bin IDs of the first and
// the last one in that slice. The batch may have up to BatchSize number of
// chunk addresses. If at least one chunk is added to the batch and no new chunks
// are added in batchTimeout period, the batch will be returned. This function
// will block until new chunks are received from localstore pull subscription.
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
	batchStart := time.Now()
	descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
	defer stop()

	const batchTimeout = 2 * time.Second

	var (
		batch        []byte           // concatenated chunk addresses of the batch
		batchSize    int              // number of addresses appended to batch
		batchStartID *uint64          // bin ID of the first chunk; nil until one arrives
		batchEndID   uint64           // bin ID of the most recently appended chunk
		timer        *time.Timer      // inactivity timer, created on first chunk
		timerC       <-chan time.Time // nil until timer exists, so the timeout
		// case below can never fire before at least one chunk was received
	)

	// record batch metrics and release the timer on every return path
	defer func(start time.Time) {
		metrics.GetOrRegisterResettingTimer("syncer.set-next-batch.total-time", nil).UpdateSince(start)
		metrics.GetOrRegisterCounter("syncer.set-next-batch.batch-size", nil).Inc(int64(batchSize))
		if timer != nil {
			timer.Stop()
		}
	}(batchStart)

	for iterate := true; iterate; {
		select {
		case d, ok := <-descriptors:
			if !ok {
				// subscription channel closed: the requested range is exhausted
				iterate = false
				break
			}
			batch = append(batch, d.Address[:]...)
			// This is the most naive approach to label the chunk as synced
			// allowing it to be garbage collected. A proper way requires
			// validating that the chunk is successfully stored by the peer.
			err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address)
			if err != nil {
				metrics.GetOrRegisterCounter("syncer.set-next-batch.set-sync-err", nil).Inc(1)
				log.Debug("syncer pull subscription - err setting chunk as synced", "correlateId", s.correlateId, "err", err)
				return nil, 0, 0, nil, err
			}
			batchSize++
			if batchStartID == nil {
				// set batch start id only if
				// this is the first iteration
				batchStartID = &d.BinID
			}
			batchEndID = d.BinID
			if batchSize >= BatchSize {
				// batch is full; stop iterating after this descriptor
				iterate = false
				metrics.GetOrRegisterCounter("syncer.set-next-batch.full-batch", nil).Inc(1)
				log.Trace("syncer pull subscription - batch size reached", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID)
			}
			if timer == nil {
				timer = time.NewTimer(batchTimeout)
			} else {
				log.Trace("syncer pull subscription - stopping timer", "correlateId", s.correlateId)
				// Stop/drain/Reset: if the timer already fired but its tick was
				// not consumed by the timerC case, drain it so Reset starts
				// clean. Safe here because this goroutine is the only reader
				// of timer.C (the timerC case ends the loop when it receives).
				if !timer.Stop() {
					<-timer.C
				}
				log.Trace("syncer pull subscription - channel drained, resetting timer", "correlateId", s.correlateId)
				timer.Reset(batchTimeout)
			}
			timerC = timer.C
		case <-timerC:
			// return batch if new chunks are not
			// received after some time
			iterate = false
			metrics.GetOrRegisterCounter("syncer.set-next-batch.timer-expire", nil).Inc(1)
			log.Trace("syncer pull subscription timer expired", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID)
		case <-s.quit:
			// server closed; return whatever was collected so far
			iterate = false
			log.Trace("syncer pull subscription - quit received", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID)
		}
	}
	if batchStartID == nil {
		// if batch start id is not set, return 0
		batchStartID = new(uint64)
	}
	return batch, *batchStartID, batchEndID, nil, nil
}
// SwarmSyncerClient is the client side of the SYNC stream: it receives chunk
// address batches from a peer's SwarmSyncerServer and fetches missing chunks.
type SwarmSyncerClient struct {
	netStore *storage.NetStore // local store used to check for / fetch chunks
	peer     *Peer             // remote peer this client syncs from
	stream   Stream            // the SYNC stream this client is registered on
}
  167. // NewSwarmSyncerClient is a contructor for provable data exchange syncer
  168. func NewSwarmSyncerClient(p *Peer, netStore *storage.NetStore, stream Stream) (*SwarmSyncerClient, error) {
  169. return &SwarmSyncerClient{
  170. netStore: netStore,
  171. peer: p,
  172. stream: stream,
  173. }, nil
  174. }
  175. // RegisterSwarmSyncerClient registers the client constructor function for
  176. // to handle incoming sync streams
  177. func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore) {
  178. streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
  179. return NewSwarmSyncerClient(p, netStore, NewStream("SYNC", t, live))
  180. })
  181. }
// NeedData returns a wait function from the netstore's fetcher for the given
// key; the wait function is nil when the chunk is already stored locally.
func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
	return s.netStore.FetchFunc(ctx, key)
}
// BatchDone is called when a batch has been processed; it would return a
// takeover-proof constructor for provable syncing, but that feature is
// currently disabled, so it always returns nil.
func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error) {
	// TODO: reenable this with putter/getter refactored code
	// if s.chunker != nil {
	// 	return func() (*TakeoverProof, error) { return s.TakeoverProof(stream, from, hashes, root) }
	// }
	return nil
}
// Close is a no-op: the syncer client holds no resources of its own.
func (s *SwarmSyncerClient) Close() {}
  195. // base for parsing and formating sync bin key
  196. // it must be 2 <= base <= 36
  197. const syncBinKeyBase = 36
  198. // FormatSyncBinKey returns a string representation of
  199. // Kademlia bin number to be used as key for SYNC stream.
  200. func FormatSyncBinKey(bin uint8) string {
  201. return strconv.FormatUint(uint64(bin), syncBinKeyBase)
  202. }
  203. // ParseSyncBinKey parses the string representation
  204. // and returns the Kademlia bin number.
  205. func ParseSyncBinKey(s string) (uint8, error) {
  206. bin, err := strconv.ParseUint(s, syncBinKeyBase, 8)
  207. if err != nil {
  208. return 0, err
  209. }
  210. return uint8(bin), nil
  211. }