netstore.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package storage
  17. import (
  18. "context"
  19. "time"
  20. "github.com/ethereum/go-ethereum/swarm/log"
  21. "github.com/ethereum/go-ethereum/swarm/spancontext"
  22. opentracing "github.com/opentracing/opentracing-go"
  23. )
  24. var (
  25. // NetStore.Get timeout for get and get retries
  26. // This is the maximum period that the Get will block.
  27. // If it is reached, Get will return ErrChunkNotFound.
  28. netStoreRetryTimeout = 30 * time.Second
  29. // Minimal period between calling get method on NetStore
  30. // on retry. It protects calling get very frequently if
  31. // it returns ErrChunkNotFound very fast.
  32. netStoreMinRetryDelay = 3 * time.Second
  33. // Timeout interval before retrieval is timed out.
  34. // It is used in NetStore.get on waiting for ReqC to be
  35. // closed on a single retrieve request.
  36. searchTimeout = 10 * time.Second
  37. )
  38. // NetStore implements the ChunkStore interface,
  39. // this chunk access layer assumed 2 chunk stores
  40. // local storage eg. LocalStore and network storage eg., NetStore
  41. // access by calling network is blocking with a timeout
  42. type NetStore struct {
  43. localStore *LocalStore
  44. retrieve func(ctx context.Context, chunk *Chunk) error
  45. }
  46. func NewNetStore(localStore *LocalStore, retrieve func(ctx context.Context, chunk *Chunk) error) *NetStore {
  47. return &NetStore{localStore, retrieve}
  48. }
  49. // Get is the entrypoint for local retrieve requests
  50. // waits for response or times out
  51. //
  52. // Get uses get method to retrieve request, but retries if the
  53. // ErrChunkNotFound is returned by get, until the netStoreRetryTimeout
  54. // is reached.
  55. func (ns *NetStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) {
  56. var sp opentracing.Span
  57. ctx, sp = spancontext.StartSpan(
  58. ctx,
  59. "netstore.get.global")
  60. defer sp.Finish()
  61. timer := time.NewTimer(netStoreRetryTimeout)
  62. defer timer.Stop()
  63. // result and resultC provide results from the goroutine
  64. // where NetStore.get is called.
  65. type result struct {
  66. chunk *Chunk
  67. err error
  68. }
  69. resultC := make(chan result)
  70. // quitC ensures that retring goroutine is terminated
  71. // when this function returns.
  72. quitC := make(chan struct{})
  73. defer close(quitC)
  74. // do retries in a goroutine so that the timer can
  75. // force this method to return after the netStoreRetryTimeout.
  76. go func() {
  77. // limiter ensures that NetStore.get is not called more frequently
  78. // then netStoreMinRetryDelay. If NetStore.get takes longer
  79. // then netStoreMinRetryDelay, the next retry call will be
  80. // without a delay.
  81. limiter := time.NewTimer(netStoreMinRetryDelay)
  82. defer limiter.Stop()
  83. for {
  84. chunk, err := ns.get(ctx, addr, 0)
  85. if err != ErrChunkNotFound {
  86. // break retry only if the error is nil
  87. // or other error then ErrChunkNotFound
  88. select {
  89. case <-quitC:
  90. // Maybe NetStore.Get function has returned
  91. // by the timer.C while we were waiting for the
  92. // results. Terminate this goroutine.
  93. case resultC <- result{chunk: chunk, err: err}:
  94. // Send the result to the parrent goroutine.
  95. }
  96. return
  97. }
  98. select {
  99. case <-quitC:
  100. // NetStore.Get function has returned, possibly
  101. // by the timer.C, which makes this goroutine
  102. // not needed.
  103. return
  104. case <-limiter.C:
  105. }
  106. // Reset the limiter for the next iteration.
  107. limiter.Reset(netStoreMinRetryDelay)
  108. log.Debug("NetStore.Get retry chunk", "key", addr)
  109. }
  110. }()
  111. select {
  112. case r := <-resultC:
  113. return r.chunk, r.err
  114. case <-timer.C:
  115. return nil, ErrChunkNotFound
  116. }
  117. }
  118. // GetWithTimeout makes a single retrieval attempt for a chunk with a explicit timeout parameter
  119. func (ns *NetStore) GetWithTimeout(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) {
  120. return ns.get(ctx, addr, timeout)
  121. }
  122. func (ns *NetStore) get(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) {
  123. if timeout == 0 {
  124. timeout = searchTimeout
  125. }
  126. var sp opentracing.Span
  127. ctx, sp = spancontext.StartSpan(
  128. ctx,
  129. "netstore.get")
  130. defer sp.Finish()
  131. if ns.retrieve == nil {
  132. chunk, err = ns.localStore.Get(ctx, addr)
  133. if err == nil {
  134. return chunk, nil
  135. }
  136. if err != ErrFetching {
  137. return nil, err
  138. }
  139. } else {
  140. var created bool
  141. chunk, created = ns.localStore.GetOrCreateRequest(ctx, addr)
  142. if chunk.ReqC == nil {
  143. return chunk, nil
  144. }
  145. if created {
  146. err := ns.retrieve(ctx, chunk)
  147. if err != nil {
  148. // mark chunk request as failed so that we can retry it later
  149. chunk.SetErrored(ErrChunkUnavailable)
  150. return nil, err
  151. }
  152. }
  153. }
  154. t := time.NewTicker(timeout)
  155. defer t.Stop()
  156. select {
  157. case <-t.C:
  158. // mark chunk request as failed so that we can retry
  159. chunk.SetErrored(ErrChunkNotFound)
  160. return nil, ErrChunkNotFound
  161. case <-chunk.ReqC:
  162. }
  163. chunk.SetErrored(nil)
  164. return chunk, nil
  165. }
  166. // Put is the entrypoint for local store requests coming from storeLoop
  167. func (ns *NetStore) Put(ctx context.Context, chunk *Chunk) {
  168. ns.localStore.Put(ctx, chunk)
  169. }
  170. // Close chunk store
  171. func (ns *NetStore) Close() {
  172. ns.localStore.Close()
  173. }