fetchers_concurrent.go

// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package downloader

import (
	"errors"
	"sort"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/prque"
	"github.com/ethereum/go-ethereum/eth/protocols/eth"
	"github.com/ethereum/go-ethereum/log"
)

// timeoutGracePeriod is the amount of time to allow for a peer to deliver a
// response to a locally already timed out request. Timeouts are not penalized
// as a peer might be temporarily overloaded; however, they still must reply
// to each request. Failing to do so is considered a protocol violation.
var timeoutGracePeriod = 2 * time.Minute

// typedQueue is an interface defining the adaptor needed to translate the type
// specific downloader/queue schedulers into the type-agnostic general concurrent
// fetcher algorithm calls.
type typedQueue interface {
	// waker returns a notification channel that gets pinged in case more fetches
	// have been queued up, so the fetcher might assign it to idle peers.
	waker() chan bool

	// pending returns the number of wrapped items that are currently queued for
	// fetching by the concurrent downloader.
	pending() int

	// capacity is responsible for calculating how many items of the abstracted
	// type a particular peer is estimated to be able to retrieve within the
	// allotted round trip time.
	capacity(peer *peerConnection, rtt time.Duration) int

	// updateCapacity is responsible for updating how many items of the abstracted
	// type a particular peer is estimated to be able to retrieve in a unit time.
	updateCapacity(peer *peerConnection, items int, elapsed time.Duration)

	// reserve is responsible for allocating a requested number of pending items
	// from the download queue to the specified peer.
	reserve(peer *peerConnection, items int) (*fetchRequest, bool, bool)

	// unreserve is responsible for removing the current retrieval allocation
	// assigned to a specific peer and placing it back into the pool to allow
	// reassigning to some other peer.
	unreserve(peer string) int

	// request is responsible for converting a generic fetch request into a typed
	// one and sending it to the remote peer for fulfillment.
	request(peer *peerConnection, req *fetchRequest, resCh chan *eth.Response) (*eth.Request, error)

	// deliver is responsible for taking a generic response packet from the
	// concurrent fetcher, unpacking the type specific data and delivering
	// it to the downloader's queue.
	deliver(peer *peerConnection, packet *eth.Response) (int, error)
}

// concurrentFetch iteratively downloads scheduled block parts, taking available
// peers, reserving a chunk of fetch requests for each and waiting for delivery
// or timeouts.
func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
	// Create a delivery channel to accept responses from all peers
	responses := make(chan *eth.Response)

	// Track the currently active requests and their timeout order
	pending := make(map[string]*eth.Request)
	defer func() {
		// Abort all requests on sync cycle cancellation. The requests may still
		// be fulfilled by the remote side, but the dispatcher will not wait to
		// deliver them since nobody's going to be listening.
		for _, req := range pending {
			req.Close()
		}
	}()
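
	// Track the ordering of the in-flight requests in a priority queue. The queue
	// is a max-priority queue, so deadlines are pushed negated: the request with
	// the soonest expiry always sits on top. The ordering map mirrors each
	// request's heap index so it can be removed directly when a response or a
	// disconnect arrives before its timeout fires.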
	ordering := make(map[*eth.Request]int)
	timeouts := prque.New(func(data interface{}, index int) {
		ordering[data.(*eth.Request)] = index
	})
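
	// The timeout timer is created already fired and drained so it starts out
	// disarmed; it is only armed once the first tracked request is pushed onto
	// the timeout queue below.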
	timeout := time.NewTimer(0)
	if !timeout.Stop() {
		<-timeout.C
	}
	defer timeout.Stop()

	// Track the timed-out but not-yet-answered requests separately. We want to
	// keep tracking which peers are busy (potentially overloaded), so removing
	// all trace of a timed out request is not good. We also can't just cancel
	// the pending request altogether as that would prevent a late response from
	// being delivered, thus never unblocking the peer.
	stales := make(map[string]*eth.Request)
	defer func() {
		// Abort all requests on sync cycle cancellation. The requests may still
		// be fulfilled by the remote side, but the dispatcher will not wait to
		// deliver them since nobody's going to be listening.
		for _, req := range stales {
			req.Close()
		}
	}()

	// Subscribe to peer lifecycle events to schedule tasks to new joiners and
	// reschedule tasks upon disconnections. We don't care which event happened
	// for simplicity, so just use a single channel.
	peering := make(chan *peeringEvent, 64) // arbitrary buffer, just some burst protection
	peeringSub := d.peers.SubscribeEvents(peering)
	defer peeringSub.Unsubscribe()

	// Prepare the queue and fetch block parts until the block header fetcher's done
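	// The finished flag flips once the queue's waker signals that no further
	// tasks will arrive; the loop then returns as soon as the queue drains and
	// every in-flight request has been resolved.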
	finished := false
	for {
		// Short circuit if we lost all our peers
		if d.peers.Len() == 0 && !beaconMode {
			return errNoPeers
		}
		// If there's nothing more to fetch, wait or terminate
		if queue.pending() == 0 {
			if len(pending) == 0 && finished {
				return nil
			}
		} else {
			// Send a download request to all idle peers, until throttled
			var (
				idles []*peerConnection
				caps  []int
			)
			for _, peer := range d.peers.AllPeers() {
				pending, stale := pending[peer.id], stales[peer.id]
				if pending == nil && stale == nil {
					idles = append(idles, peer)
					caps = append(caps, queue.capacity(peer, time.Second))
				} else if stale != nil {
					if waited := time.Since(stale.Sent); waited > timeoutGracePeriod {
						// Request has been in flight longer than the grace period
						// permitted it, consider the peer malicious attempting to
						// stall the sync.
						peer.log.Warn("Peer stalling, dropping", "waited", common.PrettyDuration(waited))
						d.dropPeer(peer.id)
					}
				}
			}
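			// Sort the idle peers by their estimated capacity, so the most capable
			// peers are handed tasks first.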
			sort.Sort(&peerCapacitySort{idles, caps})

			var (
				progressed bool
				throttled  bool
				queued     = queue.pending()
			)
			for _, peer := range idles {
				// Short circuit if throttling activated or there are no more
				// queued tasks to be retrieved
				if throttled {
					break
				}
				if queued = queue.pending(); queued == 0 {
					break
				}
				// Reserve a chunk of fetches for a peer. A nil can mean either that
				// no more headers are available, or that the peer is known not to
				// have them.
				request, progress, throttle := queue.reserve(peer, queue.capacity(peer, d.peers.rates.TargetRoundTrip()))
				if progress {
					progressed = true
				}
				if throttle {
					throttled = true
					throttleCounter.Inc(1)
				}
				if request == nil {
					continue
				}
				// Fetch the chunk and make sure any errors return the hashes to the queue
				req, err := queue.request(peer, request, responses)
				if err != nil {
					// Sending the request failed, which generally means the peer
					// was disconnected in between assignment and network send.
					// Although all peer removal operations return allocated tasks
					// to the queue, that is async, and we can do better here by
					// immediately pushing the unfulfilled requests.
					queue.unreserve(peer.id) // TODO(karalabe): This needs a non-expiration method
					continue
				}
				pending[peer.id] = req
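
				// Schedule a timeout for the new request. Deadlines are pushed
				// negated so the soonest expiry stays on top of the max-priority
				// queue; the timer is only armed when this is the first tracked
				// deadline, otherwise it is already running for the current head.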
				ttl := d.peers.rates.TargetTimeout()
				ordering[req] = timeouts.Size()
				timeouts.Push(req, -time.Now().Add(ttl).UnixNano())
				if timeouts.Size() == 1 {
					timeout.Reset(ttl)
				}
			}
			// Make sure that we have peers available for fetching. If all peers
			// have been tried and all failed, throw an error.
			if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 && !beaconMode {
				return errPeersUnavailable
			}
		}
		// Wait for something to happen
		select {
		case <-d.cancelCh:
			// If sync was cancelled, tear down the parallel retriever. Pending
			// requests will be cancelled locally, and the remote responses will
			// be dropped when they arrive
			return errCanceled

		case event := <-peering:
			// A peer joined or left, the tasks queue and allocations need to be
			// checked for potential assignment or reassignment
			peerid := event.peer.id

			if event.join {
				// Sanity check the internal state; this can be dropped later
				if _, ok := pending[peerid]; ok {
					event.peer.log.Error("Pending request exists for joining peer")
				}
				if _, ok := stales[peerid]; ok {
					event.peer.log.Error("Stale request exists for joining peer")
				}
				// Loop back to the entry point for task assignment
				continue
			}
			// A peer left, any existing requests need to be untracked, pending
			// tasks returned and possible reassignment checked
			if req, ok := pending[peerid]; ok {
				queue.unreserve(peerid) // TODO(karalabe): This needs a non-expiration method
				delete(pending, peerid)
				req.Close()

				if index, live := ordering[req]; live {
					timeouts.Remove(index)
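					// If the removed request held the earliest deadline (heap index 0),
					// the armed timer no longer matches the queue: stop and drain it,
					// then re-arm it for the new head, if any requests remain.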
					if index == 0 {
						if !timeout.Stop() {
							<-timeout.C
						}
						if timeouts.Size() > 0 {
							_, exp := timeouts.Peek()
							timeout.Reset(time.Until(time.Unix(0, -exp)))
						}
					}
					delete(ordering, req)
				}
			}
			if req, ok := stales[peerid]; ok {
				delete(stales, peerid)
				req.Close()
			}

		case <-timeout.C:
			// Retrieve the next request which should have timed out. The check
			// below is purely to catch programming errors: given correct code,
			// there's no possible order of events that should result in a
			// timeout firing for a non-existent event.
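			// Peek at the earliest tracked deadline; priorities are stored as
			// negated UnixNano timestamps, so -exp converts back to wall-clock time.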
			item, exp := timeouts.Peek()
			if now, at := time.Now(), time.Unix(0, -exp); now.Before(at) {
				log.Error("Timeout triggered but not reached", "left", at.Sub(now))
				timeout.Reset(at.Sub(now))
				continue
			}
			req := item.(*eth.Request)

			// Stop tracking the timed out request from a timing perspective,
			// cancel it, so it's not considered in-flight anymore, but keep
			// the peer marked busy to prevent assigning a second request and
			// overloading it further.
			delete(pending, req.Peer)
			stales[req.Peer] = req

			delete(ordering, req)
			timeouts.Pop()
			if timeouts.Size() > 0 {
				_, exp := timeouts.Peek()
				timeout.Reset(time.Until(time.Unix(0, -exp)))
			}
			// New timeout potentially set if there are more requests pending,
			// reschedule the failed one to a free peer
			fails := queue.unreserve(req.Peer)

			// Finally, update the peer's retrieval capacity, or if it's already
			// below the minimum allowance, drop the peer. If a lot of retrieval
			// elements expired, we might have overestimated the remote peer or
			// perhaps ourselves. Only reset to minimal throughput but don't drop
			// just yet.
			//
			// The reason the minimum threshold is 2 is that the downloader tries
			// to estimate the bandwidth and latency of a peer separately, which
			// requires pushing the measured capacity a bit and seeing how response
			// times react, so it always requests one more than the minimum (i.e.
			// min 2).
			peer := d.peers.Peer(req.Peer)
			if peer == nil {
				// If the peer got disconnected in between, we should really have
				// short-circuited it already. Just in case there's some strange
				// codepath, leave this check in not to crash.
				log.Error("Delivery timeout from unknown peer", "peer", req.Peer)
				continue
			}
			if fails > 2 {
				queue.updateCapacity(peer, 0, 0)
			} else {
				d.dropPeer(peer.id)

				// If this peer was the master peer, abort sync immediately
				d.cancelLock.RLock()
				master := peer.id == d.cancelPeer
				d.cancelLock.RUnlock()

				if master {
					d.cancel()
					return errTimeout
				}
			}

		case res := <-responses:
			// Response arrived, it may be for an existing or an already timed
			// out request. If the former, update the timeout heap and perhaps
			// reschedule the timeout timer.
			index, live := ordering[res.Req]
			if live {
				timeouts.Remove(index)
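				// As with a disconnecting peer, re-arm the timeout timer if the
				// answered request was the one holding the earliest deadline.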
				if index == 0 {
					if !timeout.Stop() {
						<-timeout.C
					}
					if timeouts.Size() > 0 {
						_, exp := timeouts.Peek()
						timeout.Reset(time.Until(time.Unix(0, -exp)))
					}
				}
				delete(ordering, res.Req)
			}
			// Delete the pending request (if it still exists) and mark the peer idle
			delete(pending, res.Req.Peer)
			delete(stales, res.Req.Peer)

			// Signal the dispatcher that the round trip is done. We'll drop the
			// peer if the data turns out to be junk.
			res.Done <- nil
			res.Req.Close()

			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
			if peer := d.peers.Peer(res.Req.Peer); peer != nil {
				// Deliver the received chunk of data and check chain validity
				accepted, err := queue.deliver(peer, res)
				if errors.Is(err, errInvalidChain) {
					return err
				}
				// Unless the peer delivered something completely different than
				// requested (usually caused by a timed out request which came
				// through in the end), set it to idle. If the delivery's stale,
				// the peer should have already been idled.
				if !errors.Is(err, errStaleDelivery) {
					queue.updateCapacity(peer, accepted, res.Time)
				}
			}

		case cont := <-queue.waker():
			// The header fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
		}
	}
}