netstore.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package storage
  17. import (
  18. "context"
  19. "encoding/hex"
  20. "fmt"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/ethereum/go-ethereum/p2p/enode"
  25. "github.com/ethereum/go-ethereum/swarm/log"
  26. "github.com/ethereum/go-ethereum/swarm/spancontext"
  27. "github.com/opentracing/opentracing-go"
  28. olog "github.com/opentracing/opentracing-go/log"
  29. "github.com/syndtr/goleveldb/leveldb"
  30. lru "github.com/hashicorp/golang-lru"
  31. )
  32. type (
  33. NewNetFetcherFunc func(ctx context.Context, addr Address, peers *sync.Map) NetFetcher
  34. )
  35. type NetFetcher interface {
  36. Request(hopCount uint8)
  37. Offer(source *enode.ID)
  38. }
  39. // NetStore is an extension of local storage
  40. // it implements the ChunkStore interface
  41. // on request it initiates remote cloud retrieval using a fetcher
  42. // fetchers are unique to a chunk and are stored in fetchers LRU memory cache
  43. // fetchFuncFactory is a factory object to create a fetch function for a specific chunk address
  44. type NetStore struct {
  45. mu sync.Mutex
  46. store SyncChunkStore
  47. fetchers *lru.Cache
  48. NewNetFetcherFunc NewNetFetcherFunc
  49. closeC chan struct{}
  50. }
  // fetcherTimeout is the timeout of the context a fetcher is created with: the
  // fetcher is cancelled after this long even if requests are still coming in.
  var fetcherTimeout = 2 * time.Minute
  52. // NewNetStore creates a new NetStore object using the given local store. newFetchFunc is a
  53. // constructor function that can create a fetch function for a specific chunk address.
  54. func NewNetStore(store SyncChunkStore, nnf NewNetFetcherFunc) (*NetStore, error) {
  55. fetchers, err := lru.New(defaultChunkRequestsCacheCapacity)
  56. if err != nil {
  57. return nil, err
  58. }
  59. return &NetStore{
  60. store: store,
  61. fetchers: fetchers,
  62. NewNetFetcherFunc: nnf,
  63. closeC: make(chan struct{}),
  64. }, nil
  65. }
  66. // Put stores a chunk in localstore, and delivers to all requestor peers using the fetcher stored in
  67. // the fetchers cache
  68. func (n *NetStore) Put(ctx context.Context, ch Chunk) error {
  69. n.mu.Lock()
  70. defer n.mu.Unlock()
  71. // put to the chunk to the store, there should be no error
  72. err := n.store.Put(ctx, ch)
  73. if err != nil {
  74. return err
  75. }
  76. // if chunk is now put in the store, check if there was an active fetcher and call deliver on it
  77. // (this delivers the chunk to requestors via the fetcher)
  78. log.Trace("n.getFetcher", "ref", ch.Address())
  79. if f := n.getFetcher(ch.Address()); f != nil {
  80. log.Trace("n.getFetcher deliver", "ref", ch.Address())
  81. f.deliver(ctx, ch)
  82. }
  83. return nil
  84. }
  85. // Get retrieves the chunk from the NetStore DPA synchronously.
  86. // It calls NetStore.get, and if the chunk is not in local Storage
  87. // it calls fetch with the request, which blocks until the chunk
  88. // arrived or context is done
  89. func (n *NetStore) Get(rctx context.Context, ref Address) (Chunk, error) {
  90. chunk, fetch, err := n.get(rctx, ref)
  91. if err != nil {
  92. return nil, err
  93. }
  94. if chunk != nil {
  95. // this is not measuring how long it takes to get the chunk for the localstore, but
  96. // rather just adding a span for clarity when inspecting traces in Jaeger, in order
  97. // to make it easier to reason which is the node that actually delivered a chunk.
  98. _, sp := spancontext.StartSpan(
  99. rctx,
  100. "localstore.get")
  101. defer sp.Finish()
  102. return chunk, nil
  103. }
  104. return fetch(rctx)
  105. }
  106. func (n *NetStore) BinIndex(po uint8) uint64 {
  107. return n.store.BinIndex(po)
  108. }
  109. func (n *NetStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error {
  110. return n.store.Iterator(from, to, po, f)
  111. }
  112. // FetchFunc returns nil if the store contains the given address. Otherwise it returns a wait function,
  113. // which returns after the chunk is available or the context is done
  114. func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Context) error {
  115. chunk, fetch, _ := n.get(ctx, ref)
  116. if chunk != nil {
  117. return nil
  118. }
  119. return func(ctx context.Context) error {
  120. _, err := fetch(ctx)
  121. return err
  122. }
  123. }
  124. // Close chunk store
  125. func (n *NetStore) Close() {
  126. close(n.closeC)
  127. n.store.Close()
  128. wg := sync.WaitGroup{}
  129. for _, key := range n.fetchers.Keys() {
  130. if f, ok := n.fetchers.Get(key); ok {
  131. if fetch, ok := f.(*fetcher); ok {
  132. wg.Add(1)
  133. go func(fetch *fetcher) {
  134. defer wg.Done()
  135. fetch.cancel()
  136. select {
  137. case <-fetch.deliveredC:
  138. case <-fetch.cancelledC:
  139. }
  140. }(fetch)
  141. }
  142. }
  143. }
  144. wg.Wait()
  145. }
  146. // get attempts at retrieving the chunk from LocalStore
  147. // If it is not found then using getOrCreateFetcher:
  148. // 1. Either there is already a fetcher to retrieve it
  149. // 2. A new fetcher is created and saved in the fetchers cache
  150. // From here on, all Get will hit on this fetcher until the chunk is delivered
  151. // or all fetcher contexts are done.
  152. // It returns a chunk, a fetcher function and an error
  153. // If chunk is nil, the returned fetch function needs to be called with a context to return the chunk.
  154. func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Context) (Chunk, error), error) {
  155. n.mu.Lock()
  156. defer n.mu.Unlock()
  157. chunk, err := n.store.Get(ctx, ref)
  158. if err != nil {
  159. // TODO: Fix comparison - we should be comparing against leveldb.ErrNotFound, this error should be wrapped.
  160. if err != ErrChunkNotFound && err != leveldb.ErrNotFound {
  161. log.Debug("Received error from LocalStore other than ErrNotFound", "err", err)
  162. }
  163. // The chunk is not available in the LocalStore, let's get the fetcher for it, or create a new one
  164. // if it doesn't exist yet
  165. f := n.getOrCreateFetcher(ctx, ref)
  166. // If the caller needs the chunk, it has to use the returned fetch function to get it
  167. return nil, f.Fetch, nil
  168. }
  169. return chunk, nil, nil
  170. }
  171. // Has is the storage layer entry point to query the underlying
  172. // database to return if it has a chunk or not.
  173. // Called from the DebugAPI
  174. func (n *NetStore) Has(ctx context.Context, ref Address) bool {
  175. return n.store.Has(ctx, ref)
  176. }
  177. // getOrCreateFetcher attempts at retrieving an existing fetchers
  178. // if none exists, creates one and saves it in the fetchers cache
  179. // caller must hold the lock
  180. func (n *NetStore) getOrCreateFetcher(ctx context.Context, ref Address) *fetcher {
  181. if f := n.getFetcher(ref); f != nil {
  182. return f
  183. }
  184. // no fetcher for the given address, we have to create a new one
  185. key := hex.EncodeToString(ref)
  186. // create the context during which fetching is kept alive
  187. cctx, cancel := context.WithTimeout(ctx, fetcherTimeout)
  188. // destroy is called when all requests finish
  189. destroy := func() {
  190. // remove fetcher from fetchers
  191. n.fetchers.Remove(key)
  192. // stop fetcher by cancelling context called when
  193. // all requests cancelled/timedout or chunk is delivered
  194. cancel()
  195. }
  196. // peers always stores all the peers which have an active request for the chunk. It is shared
  197. // between fetcher and the NewFetchFunc function. It is needed by the NewFetchFunc because
  198. // the peers which requested the chunk should not be requested to deliver it.
  199. peers := &sync.Map{}
  200. cctx, sp := spancontext.StartSpan(
  201. cctx,
  202. "netstore.fetcher",
  203. )
  204. sp.LogFields(olog.String("ref", ref.String()))
  205. fetcher := newFetcher(sp, ref, n.NewNetFetcherFunc(cctx, ref, peers), destroy, peers, n.closeC)
  206. n.fetchers.Add(key, fetcher)
  207. return fetcher
  208. }
  209. // getFetcher retrieves the fetcher for the given address from the fetchers cache if it exists,
  210. // otherwise it returns nil
  211. func (n *NetStore) getFetcher(ref Address) *fetcher {
  212. key := hex.EncodeToString(ref)
  213. f, ok := n.fetchers.Get(key)
  214. if ok {
  215. return f.(*fetcher)
  216. }
  217. return nil
  218. }
  219. // RequestsCacheLen returns the current number of outgoing requests stored in the cache
  220. func (n *NetStore) RequestsCacheLen() int {
  221. return n.fetchers.Len()
  222. }
  223. // One fetcher object is responsible to fetch one chunk for one address, and keep track of all the
  224. // peers who have requested it and did not receive it yet.
  225. type fetcher struct {
  226. addr Address // address of chunk
  227. chunk Chunk // fetcher can set the chunk on the fetcher
  228. deliveredC chan struct{} // chan signalling chunk delivery to requests
  229. cancelledC chan struct{} // chan signalling the fetcher has been cancelled (removed from fetchers in NetStore)
  230. netFetcher NetFetcher // remote fetch function to be called with a request source taken from the context
  231. cancel func() // cleanup function for the remote fetcher to call when all upstream contexts are called
  232. peers *sync.Map // the peers which asked for the chunk
  233. requestCnt int32 // number of requests on this chunk. If all the requests are done (delivered or context is done) the cancel function is called
  234. deliverOnce *sync.Once // guarantees that we only close deliveredC once
  235. span opentracing.Span // measure retrieve time per chunk
  236. }
  237. // newFetcher creates a new fetcher object for the fiven addr. fetch is the function which actually
  238. // does the retrieval (in non-test cases this is coming from the network package). cancel function is
  239. // called either
  240. // 1. when the chunk has been fetched all peers have been either notified or their context has been done
  241. // 2. the chunk has not been fetched but all context from all the requests has been done
  242. // The peers map stores all the peers which have requested chunk.
  243. func newFetcher(span opentracing.Span, addr Address, nf NetFetcher, cancel func(), peers *sync.Map, closeC chan struct{}) *fetcher {
  244. cancelOnce := &sync.Once{} // cancel should only be called once
  245. return &fetcher{
  246. addr: addr,
  247. deliveredC: make(chan struct{}),
  248. deliverOnce: &sync.Once{},
  249. cancelledC: closeC,
  250. netFetcher: nf,
  251. cancel: func() {
  252. cancelOnce.Do(func() {
  253. cancel()
  254. })
  255. },
  256. peers: peers,
  257. span: span,
  258. }
  259. }
  260. // Fetch fetches the chunk synchronously, it is called by NetStore.Get is the chunk is not available
  261. // locally.
  262. func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) {
  263. atomic.AddInt32(&f.requestCnt, 1)
  264. defer func() {
  265. // if all the requests are done the fetcher can be cancelled
  266. if atomic.AddInt32(&f.requestCnt, -1) == 0 {
  267. f.cancel()
  268. }
  269. f.span.Finish()
  270. }()
  271. // The peer asking for the chunk. Store in the shared peers map, but delete after the request
  272. // has been delivered
  273. peer := rctx.Value("peer")
  274. if peer != nil {
  275. f.peers.Store(peer, time.Now())
  276. defer f.peers.Delete(peer)
  277. }
  278. // If there is a source in the context then it is an offer, otherwise a request
  279. sourceIF := rctx.Value("source")
  280. hopCount, _ := rctx.Value("hopcount").(uint8)
  281. if sourceIF != nil {
  282. var source enode.ID
  283. if err := source.UnmarshalText([]byte(sourceIF.(string))); err != nil {
  284. return nil, err
  285. }
  286. f.netFetcher.Offer(&source)
  287. } else {
  288. f.netFetcher.Request(hopCount)
  289. }
  290. // wait until either the chunk is delivered or the context is done
  291. select {
  292. case <-rctx.Done():
  293. return nil, rctx.Err()
  294. case <-f.deliveredC:
  295. return f.chunk, nil
  296. case <-f.cancelledC:
  297. return nil, fmt.Errorf("fetcher cancelled")
  298. }
  299. }
  300. // deliver is called by NetStore.Put to notify all pending requests
  301. func (f *fetcher) deliver(ctx context.Context, ch Chunk) {
  302. f.deliverOnce.Do(func() {
  303. f.chunk = ch
  304. // closing the deliveredC channel will terminate ongoing requests
  305. close(f.deliveredC)
  306. log.Trace("n.getFetcher close deliveredC", "ref", ch.Address())
  307. })
  308. }