| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package storage
- import (
- "context"
- "time"
- "github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/spancontext"
- opentracing "github.com/opentracing/opentracing-go"
- )
var (
	// netStoreRetryTimeout is the timeout for NetStore.Get, covering the
	// first get call and all of its retries.
	// This is the maximum period that Get will block.
	// If it is reached, Get returns ErrChunkNotFound.
	netStoreRetryTimeout = 30 * time.Second
	// netStoreMinRetryDelay is the minimal period between consecutive
	// calls to the get method made by NetStore.Get on retry. It protects
	// against calling get too frequently when it returns
	// ErrChunkNotFound very quickly.
	netStoreMinRetryDelay = 3 * time.Second
	// searchTimeout is the interval after which a single retrieval is
	// timed out. It is used in NetStore.get, when no explicit timeout is
	// given, while waiting on ReqC for a single retrieve request.
	searchTimeout = 10 * time.Second
)
// NetStore implements the ChunkStore interface.
// This chunk access layer assumes two chunk stores: local storage
// (e.g. LocalStore) and network storage (e.g. NetStore).
// Access that goes through the network is blocking, with a timeout.
type NetStore struct {
	// localStore is the local chunk store that get, Put and Close
	// delegate to.
	localStore *LocalStore
	// retrieve issues a retrieve request for a chunk. When it is nil,
	// get only consults localStore.
	retrieve func(ctx context.Context, chunk *Chunk) error
}
- func NewNetStore(localStore *LocalStore, retrieve func(ctx context.Context, chunk *Chunk) error) *NetStore {
- return &NetStore{localStore, retrieve}
- }
- // Get is the entrypoint for local retrieve requests
- // waits for response or times out
- //
- // Get uses get method to retrieve request, but retries if the
- // ErrChunkNotFound is returned by get, until the netStoreRetryTimeout
- // is reached.
- func (ns *NetStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) {
- var sp opentracing.Span
- ctx, sp = spancontext.StartSpan(
- ctx,
- "netstore.get.global")
- defer sp.Finish()
- timer := time.NewTimer(netStoreRetryTimeout)
- defer timer.Stop()
- // result and resultC provide results from the goroutine
- // where NetStore.get is called.
- type result struct {
- chunk *Chunk
- err error
- }
- resultC := make(chan result)
- // quitC ensures that retring goroutine is terminated
- // when this function returns.
- quitC := make(chan struct{})
- defer close(quitC)
- // do retries in a goroutine so that the timer can
- // force this method to return after the netStoreRetryTimeout.
- go func() {
- // limiter ensures that NetStore.get is not called more frequently
- // then netStoreMinRetryDelay. If NetStore.get takes longer
- // then netStoreMinRetryDelay, the next retry call will be
- // without a delay.
- limiter := time.NewTimer(netStoreMinRetryDelay)
- defer limiter.Stop()
- for {
- chunk, err := ns.get(ctx, addr, 0)
- if err != ErrChunkNotFound {
- // break retry only if the error is nil
- // or other error then ErrChunkNotFound
- select {
- case <-quitC:
- // Maybe NetStore.Get function has returned
- // by the timer.C while we were waiting for the
- // results. Terminate this goroutine.
- case resultC <- result{chunk: chunk, err: err}:
- // Send the result to the parrent goroutine.
- }
- return
- }
- select {
- case <-quitC:
- // NetStore.Get function has returned, possibly
- // by the timer.C, which makes this goroutine
- // not needed.
- return
- case <-limiter.C:
- }
- // Reset the limiter for the next iteration.
- limiter.Reset(netStoreMinRetryDelay)
- log.Debug("NetStore.Get retry chunk", "key", addr)
- }
- }()
- select {
- case r := <-resultC:
- return r.chunk, r.err
- case <-timer.C:
- return nil, ErrChunkNotFound
- }
- }
// GetWithTimeout makes a single retrieval attempt for a chunk with an
// explicit timeout parameter. It forwards its arguments directly to get,
// which substitutes searchTimeout when timeout is zero.
func (ns *NetStore) GetWithTimeout(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) {
	return ns.get(ctx, addr, timeout)
}
- func (ns *NetStore) get(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) {
- if timeout == 0 {
- timeout = searchTimeout
- }
- var sp opentracing.Span
- ctx, sp = spancontext.StartSpan(
- ctx,
- "netstore.get")
- defer sp.Finish()
- if ns.retrieve == nil {
- chunk, err = ns.localStore.Get(ctx, addr)
- if err == nil {
- return chunk, nil
- }
- if err != ErrFetching {
- return nil, err
- }
- } else {
- var created bool
- chunk, created = ns.localStore.GetOrCreateRequest(ctx, addr)
- if chunk.ReqC == nil {
- return chunk, nil
- }
- if created {
- err := ns.retrieve(ctx, chunk)
- if err != nil {
- // mark chunk request as failed so that we can retry it later
- chunk.SetErrored(ErrChunkUnavailable)
- return nil, err
- }
- }
- }
- t := time.NewTicker(timeout)
- defer t.Stop()
- select {
- case <-t.C:
- // mark chunk request as failed so that we can retry
- chunk.SetErrored(ErrChunkNotFound)
- return nil, ErrChunkNotFound
- case <-chunk.ReqC:
- }
- chunk.SetErrored(nil)
- return chunk, nil
- }
// Put is the entrypoint for local store requests coming from storeLoop.
// It passes the chunk on to the underlying local store.
func (ns *NetStore) Put(ctx context.Context, chunk *Chunk) {
	ns.localStore.Put(ctx, chunk)
}
// Close closes the chunk store by closing the underlying local store.
func (ns *NetStore) Close() {
	ns.localStore.Close()
}
|