// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package downloader

import (
	"errors"
	"sort"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/prque"
	"github.com/ethereum/go-ethereum/eth/protocols/eth"
	"github.com/ethereum/go-ethereum/log"
)

// timeoutGracePeriod is the amount of time to allow a peer to deliver a
// response to a locally already timed out request. Timeouts are not penalized,
// since a peer might be temporarily overloaded; however, it still must reply
// to each request. Failing to do so is considered a protocol violation.
var timeoutGracePeriod = 2 * time.Minute

// typedQueue is an interface defining the adaptor needed to translate the
// type-specific downloader/queue schedulers into the type-agnostic general
// concurrent fetcher algorithm calls.
type typedQueue interface {
	// waker returns a notification channel that gets pinged in case more fetches
	// have been queued up, so the fetcher might assign it to idle peers.
	waker() chan bool

	// pending returns the number of wrapped items that are currently queued for
	// fetching by the concurrent downloader.
	pending() int

	// capacity is responsible for calculating how many items of the abstracted
	// type a particular peer is estimated to be able to retrieve within the
	// allotted round trip time.
	capacity(peer *peerConnection, rtt time.Duration) int

	// updateCapacity is responsible for updating how many items of the abstracted
	// type a particular peer is estimated to be able to retrieve in a unit time.
	updateCapacity(peer *peerConnection, items int, elapsed time.Duration)

	// reserve is responsible for allocating a requested number of pending items
	// from the download queue to the specified peer.
	reserve(peer *peerConnection, items int) (*fetchRequest, bool, bool)

	// unreserve is responsible for removing the current retrieval allocation
	// assigned to a specific peer and placing it back into the pool to allow
	// reassigning to some other peer.
	unreserve(peer string) int

	// request is responsible for converting a generic fetch request into a typed
	// one and sending it to the remote peer for fulfillment.
	request(peer *peerConnection, req *fetchRequest, resCh chan *eth.Response) (*eth.Request, error)

	// deliver is responsible for taking a generic response packet from the
	// concurrent fetcher, unpacking the type-specific data and delivering
	// it to the downloader's queue.
	deliver(peer *peerConnection, packet *eth.Response) (int, error)
}
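
// As an illustration only, a concrete adaptor for block bodies might look
// roughly like the sketch below. The real adaptors live in the sibling
// fetchers_concurrent_*.go files; the method bodies here are assumptions for
// illustration, not the actual implementations:
//
//	type bodyQueue Downloader
//
//	func (q *bodyQueue) pending() int { return int(q.queue.PendingBodies()) }
//
//	func (q *bodyQueue) capacity(peer *peerConnection, rtt time.Duration) int {
//		return peer.BodyCapacity(rtt)
//	}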

// concurrentFetch iteratively downloads scheduled block parts, taking available
// peers, reserving a chunk of fetch requests for each and waiting for delivery
// or timeouts.
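//
// Callers hand in one of the typed adaptors; as a sketch (assuming the
// hypothetical bodyQueue adaptor outlined above):
//
//	err := d.concurrentFetch((*bodyQueue)(d), beaconMode)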
func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
	// Create a delivery channel to accept responses from all peers
	responses := make(chan *eth.Response)

	// Track the currently active requests and their timeout order
	pending := make(map[string]*eth.Request)
	defer func() {
		// Abort all requests on sync cycle cancellation. The requests may still
		// be fulfilled by the remote side, but the dispatcher will not wait to
		// deliver them since nobody's going to be listening.
		for _, req := range pending {
			req.Close()
		}
	}()
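
	// The timeout queue below is keyed by negated absolute expiry time, so the
	// request closest to its deadline always sits at the head. The index
	// callback keeps the ordering map in sync as heap entries shift around,
	// which is what allows removing a request from the middle of the queue.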
	ordering := make(map[*eth.Request]int)
	timeouts := prque.New(func(data interface{}, index int) {
		ordering[data.(*eth.Request)] = index
	})
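
	// Creating a timer with 0 delay fires it immediately, so draining that
	// first event yields a stopped timer that is safe to Reset once the first
	// request gets scheduled.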
	timeout := time.NewTimer(0)
	if !timeout.Stop() {
		<-timeout.C
	}
	defer timeout.Stop()

	// Track the timed-out but not-yet-answered requests separately. We want to
	// keep tracking which peers are busy (potentially overloaded), so removing
	// all trace of a timed out request is not good. We also can't just cancel
	// the pending request altogether as that would prevent a late response from
	// being delivered, thus never unblocking the peer.
	stales := make(map[string]*eth.Request)
	defer func() {
		// Abort all requests on sync cycle cancellation. The requests may still
		// be fulfilled by the remote side, but the dispatcher will not wait to
		// deliver them since nobody's going to be listening.
		for _, req := range stales {
			req.Close()
		}
	}()

	// Subscribe to peer lifecycle events to schedule tasks to new joiners and
	// reschedule tasks upon disconnections. We don't care which event happened
	// for simplicity, so just use a single channel.
	peering := make(chan *peeringEvent, 64) // arbitrary buffer, just some burst protection
	peeringSub := d.peers.SubscribeEvents(peering)
	defer peeringSub.Unsubscribe()

	// Prepare the queue and fetch block parts until the block header fetcher's done
	finished := false
	for {
		// Short circuit if we lost all our peers
		if d.peers.Len() == 0 && !beaconMode {
			return errNoPeers
		}
		// If there's nothing more to fetch, wait or terminate
		if queue.pending() == 0 {
			if len(pending) == 0 && finished {
				return nil
			}
		} else {
			// Send a download request to all idle peers, until throttled
			var (
				idles []*peerConnection
				caps  []int
			)
			for _, peer := range d.peers.AllPeers() {
				pending, stale := pending[peer.id], stales[peer.id]
				if pending == nil && stale == nil {
					idles = append(idles, peer)
					caps = append(caps, queue.capacity(peer, time.Second))
				} else if stale != nil {
					if waited := time.Since(stale.Sent); waited > timeoutGracePeriod {
						// Request has been in flight longer than the grace period
						// permitted it; consider the peer malicious, attempting to
						// stall the sync.
						peer.log.Warn("Peer stalling, dropping", "waited", common.PrettyDuration(waited))
						d.dropPeer(peer.id)
					}
				}
			}
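			// Sort the idle peers by their estimated retrieval capacity, most
			// capable first, so the largest task chunks get assigned to the
			// fastest peers.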
			sort.Sort(&peerCapacitySort{idles, caps})

			var (
				progressed bool
				throttled  bool
				queued     = queue.pending()
			)
			for _, peer := range idles {
				// Short circuit if throttling activated or there are no more
				// queued tasks to be retrieved
				if throttled {
					break
				}
				if queued = queue.pending(); queued == 0 {
					break
				}
				// Reserve a chunk of fetches for a peer. A nil can mean either that
				// no more headers are available, or that the peer is known not to
				// have them.
				request, progress, throttle := queue.reserve(peer, queue.capacity(peer, d.peers.rates.TargetRoundTrip()))
				if progress {
					progressed = true
				}
				if throttle {
					throttled = true
					throttleCounter.Inc(1)
				}
				if request == nil {
					continue
				}
				// Fetch the chunk and make sure any errors return the hashes to the queue
				req, err := queue.request(peer, request, responses)
				if err != nil {
					// Sending the request failed, which generally means the peer
					// was disconnected in between assignment and network send.
					// Although all peer removal operations return allocated tasks
					// to the queue, that is async, and we can do better here by
					// immediately pushing the unfulfilled requests.
					queue.unreserve(peer.id) // TODO(karalabe): This needs a non-expiration method
					continue
				}
				pending[peer.id] = req
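
				// Arm the delivery timeout: deadlines are pushed with negated
				// absolute Unix nanoseconds so the earliest expiry surfaces
				// first, and the timer itself only needs starting when this is
				// the sole request in flight.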
				ttl := d.peers.rates.TargetTimeout()
				ordering[req] = timeouts.Size()

				timeouts.Push(req, -time.Now().Add(ttl).UnixNano())
				if timeouts.Size() == 1 {
					timeout.Reset(ttl)
				}
			}
			// Make sure that we have peers available for fetching. If all peers
			// have been tried and all of them failed, throw an error.
			if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 && !beaconMode {
				return errPeersUnavailable
			}
		}
		// Wait for something to happen
		select {
		case <-d.cancelCh:
			// If sync was cancelled, tear down the parallel retriever. Pending
			// requests will be cancelled locally, and the remote responses will
			// be dropped when they arrive
			return errCanceled

		case event := <-peering:
			// A peer joined or left, the tasks queue and allocations need to be
			// checked for potential assignment or reassignment
			peerid := event.peer.id
			if event.join {
				// Sanity check the internal state; this can be dropped later
				if _, ok := pending[peerid]; ok {
					event.peer.log.Error("Pending request exists for joining peer")
				}
				if _, ok := stales[peerid]; ok {
					event.peer.log.Error("Stale request exists for joining peer")
				}
				// Loop back to the entry point for task assignment
				continue
			}
			// A peer left, any existing requests need to be untracked, pending
			// tasks returned and possible reassignment checked
			if req, ok := pending[peerid]; ok {
				queue.unreserve(peerid) // TODO(karalabe): This needs a non-expiration method
				delete(pending, peerid)
				req.Close()
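
				// If the dropped request was tracked for timeout, remove it from
				// the heap. If it sat at the head, the timer was armed for it, so
				// stop the timer (draining it if it already fired) and re-arm it
				// for the next deadline, if any remain.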
				if index, live := ordering[req]; live {
					timeouts.Remove(index)
					if index == 0 {
						if !timeout.Stop() {
							<-timeout.C
						}
						if timeouts.Size() > 0 {
							_, exp := timeouts.Peek()
							timeout.Reset(time.Until(time.Unix(0, -exp)))
						}
					}
					delete(ordering, req)
				}
			}
			if req, ok := stales[peerid]; ok {
				delete(stales, peerid)
				req.Close()
			}
		case <-timeout.C:
			// Retrieve the next request which should have timed out. The check
			// below is purely to catch programming errors: given correct code,
			// there's no possible order of events that should result in a
			// timeout firing for a non-existent event.
			item, exp := timeouts.Peek()
			if now, at := time.Now(), time.Unix(0, -exp); now.Before(at) {
				log.Error("Timeout triggered but not reached", "left", at.Sub(now))
				timeout.Reset(at.Sub(now))
				continue
			}
			req := item.(*eth.Request)

			// Stop tracking the timed out request from a timing perspective,
			// cancel it, so it's not considered in-flight anymore, but keep
			// the peer marked busy to prevent assigning a second request and
			// overloading it further.
			delete(pending, req.Peer)
			stales[req.Peer] = req

			delete(ordering, req)
			timeouts.Pop()
			if timeouts.Size() > 0 {
				_, exp := timeouts.Peek()
				timeout.Reset(time.Until(time.Unix(0, -exp)))
			}
			// New timeout potentially set if there are more requests pending,
			// reschedule the failed one to a free peer
			fails := queue.unreserve(req.Peer)

			// Finally, update the peer's retrieval capacity, or if it's already
			// below the minimum allowance, drop the peer. If a lot of retrieval
			// elements expired, we might have overestimated the remote peer or
			// perhaps ourselves, so only reset to minimal throughput, but don't
			// drop just yet.
			//
			// The reason the minimum threshold is 2 is that the downloader tries
			// to estimate the bandwidth and latency of a peer separately, which
			// requires pushing the measured capacity a bit and seeing how the
			// response times react to it, so it always requests one more than
			// the minimum (i.e. min 2).
			peer := d.peers.Peer(req.Peer)
			if peer == nil {
				// If the peer got disconnected in between, we should really have
				// short-circuited it already. Just in case there's some strange
				// codepath, leave this check in to avoid a crash.
				log.Error("Delivery timeout from unknown peer", "peer", req.Peer)
				continue
			}
			if fails > 2 {
				queue.updateCapacity(peer, 0, 0)
			} else {
				d.dropPeer(peer.id)

				// If this peer was the master peer, abort sync immediately
				d.cancelLock.RLock()
				master := peer.id == d.cancelPeer
				d.cancelLock.RUnlock()

				if master {
					d.cancel()
					return errTimeout
				}
			}
		case res := <-responses:
			// Response arrived, it may be for an existing or an already timed
			// out request. If the former, update the timeout heap and perhaps
			// reschedule the timeout timer.
			index, live := ordering[res.Req]
			if live {
				timeouts.Remove(index)
				if index == 0 {
					if !timeout.Stop() {
						<-timeout.C
					}
					if timeouts.Size() > 0 {
						_, exp := timeouts.Peek()
						timeout.Reset(time.Until(time.Unix(0, -exp)))
					}
				}
				delete(ordering, res.Req)
			}
			// Delete the pending request (if it still exists) and mark the peer idle
			delete(pending, res.Req.Peer)
			delete(stales, res.Req.Peer)

			// Signal the dispatcher that the round trip is done. We'll drop the
			// peer if the data turns out to be junk.
			res.Done <- nil
			res.Req.Close()
			// Only deliver the data if the peer is still connected; if it was
			// dropped in the meantime (e.g. for stalling), ignore its message.
			if peer := d.peers.Peer(res.Req.Peer); peer != nil {
				// Deliver the received chunk of data and check chain validity
				accepted, err := queue.deliver(peer, res)
				if errors.Is(err, errInvalidChain) {
					return err
				}
				// Unless a peer delivered something completely different from what
				// was requested (usually caused by a timed out request which came
				// through in the end), set it to idle. If the delivery's stale,
				// the peer should have already been idled.
				if !errors.Is(err, errStaleDelivery) {
					queue.updateCapacity(peer, accepted, res.Time)
				}
			}
		case cont := <-queue.waker():
			// The header fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
		}
	}
}