queue.go
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Contains the block download scheduler to collect download tasks and schedule
// them in an ordered and throttled way.
package downloader

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/prque"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
)

const (
	bodyType    = uint(0)
	receiptType = uint(1)
)

var (
	blockCacheMaxItems     = 8192              // Maximum number of blocks to cache before throttling the download
	blockCacheInitialItems = 2048              // Initial number of blocks to start fetching, before we know the sizes of the blocks
	blockCacheMemory       = 256 * 1024 * 1024 // Maximum amount of memory to use for block caching
	blockCacheSizeWeight   = 0.1               // Multiplier to approximate the average block size based on past ones
)

var (
	errNoFetchesPending = errors.New("no fetches pending")
	errStaleDelivery    = errors.New("stale delivery")
)

// fetchRequest is a currently running data retrieval operation.
type fetchRequest struct {
	Peer    *peerConnection // Peer to which the request was sent
	From    uint64          // Requested chain element index (used for skeleton fills only)
	Headers []*types.Header // Requested headers, sorted by request order
	Time    time.Time       // Time when the request was made
}

// fetchResult is a struct collecting partial results from data fetchers until
// all outstanding pieces complete and the result as a whole can be processed.
type fetchResult struct {
	pending int32 // Flag telling what deliveries are outstanding

	Header       *types.Header
	Uncles       []*types.Header
	Transactions types.Transactions
	Receipts     types.Receipts
}

func newFetchResult(header *types.Header, fastSync bool) *fetchResult {
	item := &fetchResult{
		Header: header,
	}
	if !header.EmptyBody() {
		item.pending |= (1 << bodyType)
	}
	if fastSync && !header.EmptyReceipts() {
		item.pending |= (1 << receiptType)
	}
	return item
}

// SetBodyDone flags the body as finished.
func (f *fetchResult) SetBodyDone() {
	if v := atomic.LoadInt32(&f.pending); (v & (1 << bodyType)) != 0 {
		atomic.AddInt32(&f.pending, -1)
	}
}

// AllDone checks if item is done.
func (f *fetchResult) AllDone() bool {
	return atomic.LoadInt32(&f.pending) == 0
}

// SetReceiptsDone flags the receipts as finished.
func (f *fetchResult) SetReceiptsDone() {
	if v := atomic.LoadInt32(&f.pending); (v & (1 << receiptType)) != 0 {
		atomic.AddInt32(&f.pending, -2)
	}
}

// Done checks if the given type is done already
func (f *fetchResult) Done(kind uint) bool {
	v := atomic.LoadInt32(&f.pending)
	return v&(1<<kind) == 0
}
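
// The pending field above acts as a two-bit mask rather than a counter: bit 0
// (bodyType) is set while the body is still outstanding and bit 1 (receiptType)
// while the receipts are. SetBodyDone clears bit 0 by adding -1 and
// SetReceiptsDone clears bit 1 by adding -2, so AllDone reduces to a check for
// pending == 0. Sketch of the transitions for a header with a non-empty body
// and receipts during receipt-fetching sync:
//
//	r := newFetchResult(header, true) // pending == 0b11
//	r.SetBodyDone()                   // pending == 0b10
//	r.SetReceiptsDone()               // pending == 0b00 -> r.AllDone() == true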

// queue represents hashes that either need fetching or are being fetched
type queue struct {
	mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching

	// Headers are "special", they download in batches, supported by a skeleton chain
	headerHead      common.Hash                    // Hash of the last queued header to verify order
	headerTaskPool  map[uint64]*types.Header       // Pending header retrieval tasks, mapping starting indexes to skeleton headers
	headerTaskQueue *prque.Prque                   // Priority queue of the skeleton indexes to fetch the filling headers for
	headerPeerMiss  map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable
	headerPendPool  map[string]*fetchRequest       // Currently pending header retrieval operations
	headerResults   []*types.Header                // Result cache accumulating the completed headers
	headerHashes    []common.Hash                  // Result cache accumulating the completed header hashes
	headerProced    int                            // Number of headers already processed from the results
	headerOffset    uint64                         // Number of the first header in the result cache
	headerContCh    chan bool                      // Channel to notify when header download finishes

	// All data retrievals below are based on an already assembled header chain
	blockTaskPool  map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers
	blockTaskQueue *prque.Prque                  // Priority queue of the headers to fetch the blocks (bodies) for
	blockPendPool  map[string]*fetchRequest      // Currently pending block (body) retrieval operations
	blockWakeCh    chan bool                     // Channel to notify the block fetcher of new tasks

	receiptTaskPool  map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers
	receiptTaskQueue *prque.Prque                  // Priority queue of the headers to fetch the receipts for
	receiptPendPool  map[string]*fetchRequest      // Currently pending receipt retrieval operations
	receiptWakeCh    chan bool                     // Channel to notify the receipt fetcher of new tasks

	resultCache *resultStore       // Downloaded but not yet delivered fetch results
	resultSize  common.StorageSize // Approximate size of a block (exponential moving average)

	lock   *sync.RWMutex
	active *sync.Cond
	closed bool

	lastStatLog time.Time
}

// newQueue creates a new download queue for scheduling block retrieval.
func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue {
	lock := new(sync.RWMutex)
	q := &queue{
		headerContCh:     make(chan bool, 1),
		blockTaskQueue:   prque.New(nil),
		blockWakeCh:      make(chan bool, 1),
		receiptTaskQueue: prque.New(nil),
		receiptWakeCh:    make(chan bool, 1),
		active:           sync.NewCond(lock),
		lock:             lock,
	}
	q.Reset(blockCacheLimit, thresholdInitialSize)
	return q
}
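
// A rough sketch of how the rest of this package drives the queue's lifecycle
// (origin, headers, hashes, peer, id and the delivery slices stand in for
// values owned by the caller):
//
//	q := newQueue(blockCacheMaxItems, blockCacheInitialItems)
//	q.Prepare(origin, SnapSync)             // first block to expect plus the sync mode
//	q.Schedule(headers, hashes, origin)     // enqueue body/receipt tasks for verified headers
//	req, _, _ := q.ReserveBodies(peer, n)   // hand a batch of body fetches to a peer
//	_, _ = q.DeliverBodies(id, txs, txHashes, uncles, uncleHashes)
//	results := q.Results(true)              // block until completed results can be imported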

// Reset clears out the queue contents.
func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.closed = false
	q.mode = FullSync

	q.headerHead = common.Hash{}
	q.headerPendPool = make(map[string]*fetchRequest)

	q.blockTaskPool = make(map[common.Hash]*types.Header)
	q.blockTaskQueue.Reset()
	q.blockPendPool = make(map[string]*fetchRequest)

	q.receiptTaskPool = make(map[common.Hash]*types.Header)
	q.receiptTaskQueue.Reset()
	q.receiptPendPool = make(map[string]*fetchRequest)

	q.resultCache = newResultStore(blockCacheLimit)
	q.resultCache.SetThrottleThreshold(uint64(thresholdInitialSize))
}

// Close marks the end of the sync, unblocking Results.
// It may be called even if the queue is already closed.
func (q *queue) Close() {
	q.lock.Lock()
	q.closed = true
	q.active.Signal()
	q.lock.Unlock()
}

// PendingHeaders retrieves the number of header requests pending for retrieval.
func (q *queue) PendingHeaders() int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.headerTaskQueue.Size()
}

// PendingBodies retrieves the number of block body requests pending for retrieval.
func (q *queue) PendingBodies() int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.blockTaskQueue.Size()
}

// PendingReceipts retrieves the number of block receipts pending for retrieval.
func (q *queue) PendingReceipts() int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.receiptTaskQueue.Size()
}

// InFlightBlocks retrieves whether there are block fetch requests currently in
// flight.
func (q *queue) InFlightBlocks() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return len(q.blockPendPool) > 0
}

// InFlightReceipts retrieves whether there are receipt fetch requests currently
// in flight.
func (q *queue) InFlightReceipts() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return len(q.receiptPendPool) > 0
}

// Idle returns if the queue is fully idle or has some data still inside.
func (q *queue) Idle() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
	pending := len(q.blockPendPool) + len(q.receiptPendPool)

	return (queued + pending) == 0
}

// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
// up an already retrieved header skeleton.
func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// No skeleton retrieval can be in progress, fail hard if so (huge implementation bug)
	if q.headerResults != nil {
		panic("skeleton assembly already in progress")
	}
	// Schedule all the header retrieval tasks for the skeleton assembly
	q.headerTaskPool = make(map[uint64]*types.Header)
	q.headerTaskQueue = prque.New(nil)
	q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
	q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
	q.headerHashes = make([]common.Hash, len(skeleton)*MaxHeaderFetch)
	q.headerProced = 0
	q.headerOffset = from
	q.headerContCh = make(chan bool, 1)

	for i, header := range skeleton {
		index := from + uint64(i*MaxHeaderFetch)

		q.headerTaskPool[index] = header
		q.headerTaskQueue.Push(index, -int64(index))
	}
}
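
// Each skeleton entry is the last header of a MaxHeaderFetch-sized batch, so
// the task index from+i*MaxHeaderFetch is the number of the first header in
// batch i, and the stored skeleton header is later used by DeliverHeaders to
// verify the tail of the filled batch. Pushing with priority -index makes the
// lowest, i.e. oldest, batch pop first.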

// RetrieveHeaders retrieves the header chain assembled based on the scheduled
// skeleton.
func (q *queue) RetrieveHeaders() ([]*types.Header, []common.Hash, int) {
	q.lock.Lock()
	defer q.lock.Unlock()

	headers, hashes, proced := q.headerResults, q.headerHashes, q.headerProced
	q.headerResults, q.headerHashes, q.headerProced = nil, nil, 0

	return headers, hashes, proced
}

// Schedule adds a set of headers to the download queue for scheduling, returning
// the new headers encountered.
func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uint64) []*types.Header {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Insert all the headers prioritised by the contained block number
	inserts := make([]*types.Header, 0, len(headers))
	for i, header := range headers {
		// Make sure chain order is honoured and preserved throughout
		hash := hashes[i]
		if header.Number == nil || header.Number.Uint64() != from {
			log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from)
			break
		}
		if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
			log.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)
			break
		}
		// Make sure no duplicate requests are executed
		// We cannot skip this, even if the block is empty, since this is
		// what triggers the fetchResult creation.
		if _, ok := q.blockTaskPool[hash]; ok {
			log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash)
		} else {
			q.blockTaskPool[hash] = header
			q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
		}
		// Queue for receipt retrieval
		if q.mode == SnapSync && !header.EmptyReceipts() {
			if _, ok := q.receiptTaskPool[hash]; ok {
				log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash)
			} else {
				q.receiptTaskPool[hash] = header
				q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
			}
		}
		inserts = append(inserts, header)
		q.headerHead = hash
		from++
	}
	return inserts
}

// Results retrieves and permanently removes a batch of fetch results from
// the cache. The result slice will be empty if the queue has been closed.
// Results can be called concurrently with Deliver and Schedule,
// but assumes that there are not two simultaneous callers to Results
func (q *queue) Results(block bool) []*fetchResult {
	// Abort early if there are no items and non-blocking requested
	if !block && !q.resultCache.HasCompletedItems() {
		return nil
	}
	closed := false
	for !closed && !q.resultCache.HasCompletedItems() {
		// In order to wait on 'active', we need to obtain the lock.
		// That may take a while, if someone is delivering at the same
		// time, so after obtaining the lock, we check again if there
		// are any results to fetch.
		// Also, in between asking for the lock and obtaining it, someone
		// could have closed the queue. In that case, we should return the
		// available results and stop blocking.
		q.lock.Lock()
		if q.resultCache.HasCompletedItems() || q.closed {
			q.lock.Unlock()
			break
		}
		// No items available, and not closed
		q.active.Wait()
		closed = q.closed
		q.lock.Unlock()
	}
	// Regardless if closed or not, we can still deliver whatever we have
	results := q.resultCache.GetCompleted(maxResultsProcess)
	for _, result := range results {
		// Recalculate the result item weights to prevent memory exhaustion
		size := result.Header.Size()
		for _, uncle := range result.Uncles {
			size += uncle.Size()
		}
		for _, receipt := range result.Receipts {
			size += receipt.Size()
		}
		for _, tx := range result.Transactions {
			size += tx.Size()
		}
		q.resultSize = common.StorageSize(blockCacheSizeWeight)*size +
			(1-common.StorageSize(blockCacheSizeWeight))*q.resultSize
	}
	// Using the newly calibrated resultsize, figure out the new throttle limit
	// on the result cache
	throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
	throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold)
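
	// The size estimate is an exponential moving average with
	// blockCacheSizeWeight = 0.1:
	//
	//	resultSize = 0.1*size + 0.9*resultSize
	//
	// and the throttle threshold is a ceiling division of the cache memory
	// budget by that estimate: ceil(blockCacheMemory / resultSize). As a rough
	// illustration (assumed figures, not measurements): ~64 KiB average blocks
	// against the 256 MiB budget allow roughly 4096 in-flight results before
	// the fetchers are throttled.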
	// With results removed from the cache, wake throttled fetchers
	for _, ch := range []chan bool{q.blockWakeCh, q.receiptWakeCh} {
		select {
		case ch <- true:
		default:
		}
	}
	// Log some info at certain times
	if time.Since(q.lastStatLog) > 60*time.Second {
		q.lastStatLog = time.Now()

		info := q.Stats()
		info = append(info, "throttle", throttleThreshold)
		log.Info("Downloader queue stats", info...)
	}
	return results
}

func (q *queue) Stats() []interface{} {
	q.lock.RLock()
	defer q.lock.RUnlock()

	return q.stats()
}

func (q *queue) stats() []interface{} {
	return []interface{}{
		"receiptTasks", q.receiptTaskQueue.Size(),
		"blockTasks", q.blockTaskQueue.Size(),
		"itemSize", q.resultSize,
	}
}

// ReserveHeaders reserves a set of headers for the given peer, skipping any
// previously failed batches.
func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Short circuit if the peer's already downloading something (sanity check to
	// not corrupt state)
	if _, ok := q.headerPendPool[p.id]; ok {
		return nil
	}
	// Retrieve a batch of hashes, skipping previously failed ones
	send, skip := uint64(0), []uint64{}
	for send == 0 && !q.headerTaskQueue.Empty() {
		from, _ := q.headerTaskQueue.Pop()
		if q.headerPeerMiss[p.id] != nil {
			if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
				skip = append(skip, from.(uint64))
				continue
			}
		}
		send = from.(uint64)
	}
	// Merge all the skipped batches back
	for _, from := range skip {
		q.headerTaskQueue.Push(from, -int64(from))
	}
	// Assemble and return the block download request
	if send == 0 {
		return nil
	}
	request := &fetchRequest{
		Peer: p,
		From: send,
		Time: time.Now(),
	}
	q.headerPendPool[p.id] = request
	return request
}

// ReserveBodies reserves a set of body fetches for the given peer, skipping any
// previously failed downloads. Besides the next batch of needed fetches, it also
// returns a flag whether empty blocks were queued requiring processing.
func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, bool) {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, bodyType)
}

// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
// any previously failed downloads. Besides the next batch of needed fetches, it
// also returns a flag whether empty receipts were queued requiring importing.
func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, bool) {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, receiptType)
}

// reserveHeaders reserves a set of data download operations for a given peer,
// skipping any previously failed ones. This method is a generic version used
// by the individual special reservation functions.
//
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
//
// Returns:
//
//	item     - the fetchRequest
//	progress - whether any progress was made
//	throttle - if the caller should throttle for a while
func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
	pendPool map[string]*fetchRequest, kind uint) (*fetchRequest, bool, bool) {
	// Short circuit if the pool has been depleted, or if the peer's already
	// downloading something (sanity check not to corrupt state)
	if taskQueue.Empty() {
		return nil, false, true
	}
	if _, ok := pendPool[p.id]; ok {
		return nil, false, false
	}
	// Retrieve a batch of tasks, skipping previously failed ones
	send := make([]*types.Header, 0, count)
	skip := make([]*types.Header, 0)
	progress := false
	throttled := false
	for proc := 0; len(send) < count && !taskQueue.Empty(); proc++ {
		// the task queue will pop items in order, so the highest prio block
		// is also the lowest block number.
		h, _ := taskQueue.Peek()
		header := h.(*types.Header)
		// we can ask the resultcache if this header is within the
		// "prioritized" segment of blocks. If it is not, we need to throttle
		stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == SnapSync)
		if stale {
			// Don't put back in the task queue, this item has already been
			// delivered upstream
			taskQueue.PopItem()
			progress = true
			delete(taskPool, header.Hash())
			proc = proc - 1
			log.Error("Fetch reservation already delivered", "number", header.Number.Uint64())
			continue
		}
		if throttle {
			// There are no resultslots available. Leave it in the task queue
			// However, if there are any left as 'skipped', we should not tell
			// the caller to throttle, since we still want some other
			// peer to fetch those for us
			throttled = len(skip) == 0
			break
		}
		if err != nil {
			// this most definitely should _not_ happen
			log.Warn("Failed to reserve headers", "err", err)
			// There are no resultslots available. Leave it in the task queue
			break
		}
		if item.Done(kind) {
			// If it's a noop, we can skip this task
			delete(taskPool, header.Hash())
			taskQueue.PopItem()
			proc = proc - 1
			progress = true
			continue
		}
		// Remove it from the task queue
		taskQueue.PopItem()
		// Otherwise unless the peer is known not to have the data, add to the retrieve list
		if p.Lacks(header.Hash()) {
			skip = append(skip, header)
		} else {
			send = append(send, header)
		}
	}
	// Merge all the skipped headers back
	for _, header := range skip {
		taskQueue.Push(header, -int64(header.Number.Uint64()))
	}
	if q.resultCache.HasCompletedItems() {
		// Wake Results, resultCache was modified
		q.active.Signal()
	}
	// Assemble and return the block download request
	if len(send) == 0 {
		return nil, progress, throttled
	}
	request := &fetchRequest{
		Peer:    p,
		Headers: send,
		Time:    time.Now(),
	}
	pendPool[p.id] = request
	return request, progress, throttled
}

// Revoke cancels all pending requests belonging to a given peer. This method is
// meant to be called during a peer drop to quickly reassign owned data fetches
// to remaining nodes.
func (q *queue) Revoke(peerID string) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if request, ok := q.headerPendPool[peerID]; ok {
		q.headerTaskQueue.Push(request.From, -int64(request.From))
		delete(q.headerPendPool, peerID)
	}
	if request, ok := q.blockPendPool[peerID]; ok {
		for _, header := range request.Headers {
			q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
		}
		delete(q.blockPendPool, peerID)
	}
	if request, ok := q.receiptPendPool[peerID]; ok {
		for _, header := range request.Headers {
			q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
		}
		delete(q.receiptPendPool, peerID)
	}
}

// ExpireHeaders cancels a request that timed out and moves the pending fetch
// task back into the queue for rescheduling.
func (q *queue) ExpireHeaders(peer string) int {
	q.lock.Lock()
	defer q.lock.Unlock()

	headerTimeoutMeter.Mark(1)
	return q.expire(peer, q.headerPendPool, q.headerTaskQueue)
}

// ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the number of fetch tasks handed back
// for rescheduling.
func (q *queue) ExpireBodies(peer string) int {
	q.lock.Lock()
	defer q.lock.Unlock()

	bodyTimeoutMeter.Mark(1)
	return q.expire(peer, q.blockPendPool, q.blockTaskQueue)
}

// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
// allowance, canceling them and returning the number of fetch tasks handed back
// for rescheduling.
func (q *queue) ExpireReceipts(peer string) int {
	q.lock.Lock()
	defer q.lock.Unlock()

	receiptTimeoutMeter.Mark(1)
	return q.expire(peer, q.receiptPendPool, q.receiptTaskQueue)
}

// expire is the generic check that moves a specific expired task from a pending
// pool back into a task pool.
//
// Note, this method expects the queue lock to be already held. The reason the
// lock is not obtained in here is that the parameters already need to access
// the queue, so they already need a lock anyway.
func (q *queue) expire(peer string, pendPool map[string]*fetchRequest, taskQueue *prque.Prque) int {
	// Retrieve the request being expired and log an error if it's non-existent,
	// as there's no order of events that should lead to such expirations.
	req := pendPool[peer]
	if req == nil {
		log.Error("Expired request does not exist", "peer", peer)
		return 0
	}
	delete(pendPool, peer)

	// Return any non-satisfied requests to the pool
	if req.From > 0 {
		taskQueue.Push(req.From, -int64(req.From))
	}
	for _, header := range req.Headers {
		taskQueue.Push(header, -int64(header.Number.Uint64()))
	}
	return len(req.Headers)
}
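
// Header fill requests carry only a From index and no Headers slice, so when
// one of those expires only the batch index is pushed back and the returned
// count is zero; body and receipt expirations instead report how many headers
// were handed back to their task queues.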

// DeliverHeaders injects a header retrieval response into the header results
// cache. This method either accepts all headers it received, or none of them
// if they do not map correctly to the skeleton.
//
// If the headers are accepted, the method makes an attempt to deliver the set
// of ready headers to the processor to keep the pipeline full. However, it will
// not block to prevent stalling other pending deliveries.
func (q *queue) DeliverHeaders(id string, headers []*types.Header, hashes []common.Hash, headerProcCh chan *headerTask) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	var logger log.Logger
	if len(id) < 16 {
		// Tests use short IDs, don't choke on them
		logger = log.New("peer", id)
	} else {
		logger = log.New("peer", id[:16])
	}
	// Short circuit if the data was never requested
	request := q.headerPendPool[id]
	if request == nil {
		headerDropMeter.Mark(int64(len(headers)))
		return 0, errNoFetchesPending
	}
	delete(q.headerPendPool, id)

	headerReqTimer.UpdateSince(request.Time)
	headerInMeter.Mark(int64(len(headers)))

	// Ensure headers can be mapped onto the skeleton chain
	target := q.headerTaskPool[request.From].Hash()

	accepted := len(headers) == MaxHeaderFetch
	if accepted {
		if headers[0].Number.Uint64() != request.From {
			logger.Trace("First header broke chain ordering", "number", headers[0].Number, "hash", hashes[0], "expected", request.From)
			accepted = false
		} else if hashes[len(headers)-1] != target {
			logger.Trace("Last header broke skeleton structure ", "number", headers[len(headers)-1].Number, "hash", hashes[len(headers)-1], "expected", target)
			accepted = false
		}
	}
	if accepted {
		parentHash := hashes[0]
		for i, header := range headers[1:] {
			hash := hashes[i+1]
			if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
				logger.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", want)
				accepted = false
				break
			}
			if parentHash != header.ParentHash {
				logger.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)
				accepted = false
				break
			}
			// Set-up parent hash for next round
			parentHash = hash
		}
	}
	// If the batch of headers wasn't accepted, mark as unavailable
	if !accepted {
		logger.Trace("Skeleton filling not accepted", "from", request.From)
		headerDropMeter.Mark(int64(len(headers)))

		miss := q.headerPeerMiss[id]
		if miss == nil {
			q.headerPeerMiss[id] = make(map[uint64]struct{})
			miss = q.headerPeerMiss[id]
		}
		miss[request.From] = struct{}{}

		q.headerTaskQueue.Push(request.From, -int64(request.From))
		return 0, errors.New("delivery not accepted")
	}
	// Clean up a successful fetch and try to deliver any sub-results
	copy(q.headerResults[request.From-q.headerOffset:], headers)
	copy(q.headerHashes[request.From-q.headerOffset:], hashes)

	delete(q.headerTaskPool, request.From)

	ready := 0
	for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil {
		ready += MaxHeaderFetch
	}
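	// A nil slot at headerResults[headerProced+ready] means that batch has not
	// been filled yet. Since every accepted fill is exactly MaxHeaderFetch
	// headers copied in at request.From-headerOffset, the counter above can
	// advance in whole MaxHeaderFetch strides and only contiguous, completed
	// batches are forwarded to the processor below.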
	if ready > 0 {
		// Headers are ready for delivery, gather them and push forward (non blocking)
		processHeaders := make([]*types.Header, ready)
		copy(processHeaders, q.headerResults[q.headerProced:q.headerProced+ready])

		processHashes := make([]common.Hash, ready)
		copy(processHashes, q.headerHashes[q.headerProced:q.headerProced+ready])

		select {
		case headerProcCh <- &headerTask{
			headers: processHeaders,
			hashes:  processHashes,
		}:
			logger.Trace("Pre-scheduled new headers", "count", len(processHeaders), "from", processHeaders[0].Number)
			q.headerProced += len(processHeaders)
		default:
		}
	}
	// Check for termination and return
	if len(q.headerTaskPool) == 0 {
		q.headerContCh <- false
	}
	return len(headers), nil
}

// DeliverBodies injects a block body retrieval response into the results queue.
// The method returns the number of block bodies accepted from the delivery and
// also wakes any threads waiting for data delivery.
func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListHashes []common.Hash, uncleLists [][]*types.Header, uncleListHashes []common.Hash) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	validate := func(index int, header *types.Header) error {
		if txListHashes[index] != header.TxHash {
			return errInvalidBody
		}
		if uncleListHashes[index] != header.UncleHash {
			return errInvalidBody
		}
		return nil
	}

	reconstruct := func(index int, result *fetchResult) {
		result.Transactions = txLists[index]
		result.Uncles = uncleLists[index]
		result.SetBodyDone()
	}
	return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool,
		bodyReqTimer, bodyInMeter, bodyDropMeter, len(txLists), validate, reconstruct)
}

// DeliverReceipts injects a receipt retrieval response into the results queue.
// The method returns the number of transaction receipts accepted from the delivery
// and also wakes any threads waiting for data delivery.
func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt, receiptListHashes []common.Hash) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	validate := func(index int, header *types.Header) error {
		if receiptListHashes[index] != header.ReceiptHash {
			return errInvalidReceipt
		}
		return nil
	}

	reconstruct := func(index int, result *fetchResult) {
		result.Receipts = receiptList[index]
		result.SetReceiptsDone()
	}
	return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool,
		receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct)
}

// deliver injects a data retrieval response into the results queue.
//
// Note, this method expects the queue lock to be already held for writing. The
// reason this lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
	taskQueue *prque.Prque, pendPool map[string]*fetchRequest,
	reqTimer metrics.Timer, resInMeter metrics.Meter, resDropMeter metrics.Meter,
	results int, validate func(index int, header *types.Header) error,
	reconstruct func(index int, result *fetchResult)) (int, error) {
	// Short circuit if the data was never requested
	request := pendPool[id]
	if request == nil {
		resDropMeter.Mark(int64(results))
		return 0, errNoFetchesPending
	}
	delete(pendPool, id)

	reqTimer.UpdateSince(request.Time)
	resInMeter.Mark(int64(results))

	// If no data items were retrieved, mark them as unavailable for the origin peer
	if results == 0 {
		for _, header := range request.Headers {
			request.Peer.MarkLacking(header.Hash())
		}
	}
	// Assemble each of the results with their headers and retrieved data parts
	var (
		accepted int
		failure  error
		i        int
		hashes   []common.Hash
	)
	for _, header := range request.Headers {
		// Short circuit assembly if no more fetch results are found
		if i >= results {
			break
		}
		// Validate the fields
		if err := validate(i, header); err != nil {
			failure = err
			break
		}
		hashes = append(hashes, header.Hash())
		i++
	}
	for _, header := range request.Headers[:i] {
		if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil {
			reconstruct(accepted, res)
		} else {
			// else: between here and above, some other peer filled this result,
			// or it was indeed a no-op. This should not happen, but if it does it's
			// not something to panic about
			log.Error("Delivery stale", "stale", stale, "number", header.Number.Uint64(), "err", err)
			failure = errStaleDelivery
		}
		// Clean up a successful fetch
		delete(taskPool, hashes[accepted])
		accepted++
	}
	resDropMeter.Mark(int64(results - accepted))

	// Return all failed or missing fetches to the queue
	for _, header := range request.Headers[accepted:] {
		taskQueue.Push(header, -int64(header.Number.Uint64()))
	}
	// Wake up Results
	if accepted > 0 {
		q.active.Signal()
	}
	if failure == nil {
		return accepted, nil
	}
	// If none of the data was good, it's a stale delivery
	if accepted > 0 {
		return accepted, fmt.Errorf("partial failure: %v", failure)
	}
	return accepted, fmt.Errorf("%w: %v", failure, errStaleDelivery)
}

// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
func (q *queue) Prepare(offset uint64, mode SyncMode) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Prepare the queue for sync results
	q.resultCache.Prepare(offset)
	q.mode = mode
}