queue.go

// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Contains the block download scheduler to collect download tasks and schedule
// them in an ordered and throttled way.
package downloader

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/prque"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
)

var (
	blockCacheItems      = 8192             // Maximum number of blocks to cache before throttling the download
	blockCacheMemory     = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching
	blockCacheSizeWeight = 0.1              // Multiplier to approximate the average block size based on past ones
)
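
// Illustrative sizing (derived from the constants above): with the moving
// average block size at 16KB, the 64MB memory cap limits the cache to about
// 4096 in-flight results, tighter than the 8192 item cap; for small blocks
// the item cap binds instead.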

var (
	errNoFetchesPending = errors.New("no fetches pending")
	errStaleDelivery    = errors.New("stale delivery")
)

// fetchRequest is a currently running data retrieval operation.
type fetchRequest struct {
	Peer    *peerConnection // Peer to which the request was sent
	From    uint64          // [eth/62] Requested chain element index (used for skeleton fills only)
	Headers []*types.Header // [eth/62] Requested headers, sorted by request order
	Time    time.Time       // Time when the request was made
}

// fetchResult is a struct collecting partial results from data fetchers until
// all outstanding pieces complete and the result as a whole can be processed.
type fetchResult struct {
	Pending int         // Number of data fetches still pending
	Hash    common.Hash // Hash of the header to prevent recalculating

	Header       *types.Header
	Uncles       []*types.Header
	Transactions types.Transactions
	Receipts     types.Receipts
}

// queue represents hashes that either need fetching or are being fetched.
type queue struct {
	mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching

	// Headers are "special", they download in batches, supported by a skeleton chain
	headerHead      common.Hash                    // [eth/62] Hash of the last queued header to verify order
	headerTaskPool  map[uint64]*types.Header       // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
	headerTaskQueue *prque.Prque                   // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
	headerPeerMiss  map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
	headerPendPool  map[string]*fetchRequest       // [eth/62] Currently pending header retrieval operations
	headerResults   []*types.Header                // [eth/62] Result cache accumulating the completed headers
	headerProced    int                            // [eth/62] Number of headers already processed from the results
	headerOffset    uint64                         // [eth/62] Number of the first header in the result cache
	headerContCh    chan bool                      // [eth/62] Channel to notify when header download finishes

	// All data retrievals below are based on an already assembled header chain
	blockTaskPool  map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
	blockTaskQueue *prque.Prque                  // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for
	blockPendPool  map[string]*fetchRequest      // [eth/62] Currently pending block (body) retrieval operations
	blockDonePool  map[common.Hash]struct{}      // [eth/62] Set of the completed block (body) fetches

	receiptTaskPool  map[common.Hash]*types.Header // [eth/63] Pending receipt retrieval tasks, mapping hashes to headers
	receiptTaskQueue *prque.Prque                  // [eth/63] Priority queue of the headers to fetch the receipts for
	receiptPendPool  map[string]*fetchRequest      // [eth/63] Currently pending receipt retrieval operations
	receiptDonePool  map[common.Hash]struct{}      // [eth/63] Set of the completed receipt fetches

	resultCache  []*fetchResult     // Downloaded but not yet delivered fetch results
	resultOffset uint64             // Offset of the first cached fetch result in the block chain
	resultSize   common.StorageSize // Approximate size of a block (exponential moving average)

	lock   *sync.Mutex
	active *sync.Cond
	closed bool
}

// newQueue creates a new download queue for scheduling block retrieval.
func newQueue() *queue {
	lock := new(sync.Mutex)
	return &queue{
		headerPendPool:   make(map[string]*fetchRequest),
		headerContCh:     make(chan bool),
		blockTaskPool:    make(map[common.Hash]*types.Header),
		blockTaskQueue:   prque.New(nil),
		blockPendPool:    make(map[string]*fetchRequest),
		blockDonePool:    make(map[common.Hash]struct{}),
		receiptTaskPool:  make(map[common.Hash]*types.Header),
		receiptTaskQueue: prque.New(nil),
		receiptPendPool:  make(map[string]*fetchRequest),
		receiptDonePool:  make(map[common.Hash]struct{}),
		resultCache:      make([]*fetchResult, blockCacheItems),
		active:           sync.NewCond(lock),
		lock:             lock,
	}
}
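
// Typical lifecycle (a sketch inferred from the methods below, not a strict
// contract): the downloader calls Prepare once per sync, ScheduleSkeleton or
// Schedule to register tasks, Reserve*/Deliver* from per-peer fetcher
// goroutines, Results from the processor, and finally Close to unblock any
// remaining waiter.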

// Reset clears out the queue contents.
func (q *queue) Reset() {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.closed = false
	q.mode = FullSync

	q.headerHead = common.Hash{}
	q.headerPendPool = make(map[string]*fetchRequest)

	q.blockTaskPool = make(map[common.Hash]*types.Header)
	q.blockTaskQueue.Reset()
	q.blockPendPool = make(map[string]*fetchRequest)
	q.blockDonePool = make(map[common.Hash]struct{})

	q.receiptTaskPool = make(map[common.Hash]*types.Header)
	q.receiptTaskQueue.Reset()
	q.receiptPendPool = make(map[string]*fetchRequest)
	q.receiptDonePool = make(map[common.Hash]struct{})

	q.resultCache = make([]*fetchResult, blockCacheItems)
	q.resultOffset = 0
}

// Close marks the end of the sync, unblocking Results.
// It may be called even if the queue is already closed.
func (q *queue) Close() {
	q.lock.Lock()
	q.closed = true
	q.lock.Unlock()
	q.active.Broadcast()
}

// PendingHeaders retrieves the number of header requests pending for retrieval.
func (q *queue) PendingHeaders() int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.headerTaskQueue.Size()
}

// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
func (q *queue) PendingBlocks() int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.blockTaskQueue.Size()
}

// PendingReceipts retrieves the number of block receipts pending for retrieval.
func (q *queue) PendingReceipts() int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.receiptTaskQueue.Size()
}

// InFlightHeaders retrieves whether there are header fetch requests currently
// in flight.
func (q *queue) InFlightHeaders() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return len(q.headerPendPool) > 0
}

// InFlightBlocks retrieves whether there are block fetch requests currently in
// flight.
func (q *queue) InFlightBlocks() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return len(q.blockPendPool) > 0
}

// InFlightReceipts retrieves whether there are receipt fetch requests currently
// in flight.
func (q *queue) InFlightReceipts() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return len(q.receiptPendPool) > 0
}

// Idle returns whether the queue is fully idle or has some data still inside.
func (q *queue) Idle() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
	pending := len(q.blockPendPool) + len(q.receiptPendPool)
	cached := len(q.blockDonePool) + len(q.receiptDonePool)

	return (queued + pending + cached) == 0
}

// ShouldThrottleBlocks checks if the download should be throttled (active block (body)
// fetches exceed block cache).
func (q *queue) ShouldThrottleBlocks() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.resultSlots(q.blockPendPool, q.blockDonePool) <= 0
}

// ShouldThrottleReceipts checks if the download should be throttled (active receipt
// fetches exceed block cache).
func (q *queue) ShouldThrottleReceipts() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.resultSlots(q.receiptPendPool, q.receiptDonePool) <= 0
}

// resultSlots calculates the number of results slots available for requests
// whilst adhering to both the item and the memory limits of the result cache.
func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}) int {
	// Calculate the maximum length capped by the memory limit
	limit := len(q.resultCache)
	if common.StorageSize(len(q.resultCache))*q.resultSize > common.StorageSize(blockCacheMemory) {
		limit = int((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
	}
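	// (the expression above is a ceiling division; e.g. a 64MB budget and a
	// 32KB average block size cap the usable cache at 2048 slots)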
	// Calculate the number of slots already finished
	finished := 0
	for _, result := range q.resultCache[:limit] {
		if result == nil {
			break
		}
		if _, ok := donePool[result.Hash]; ok {
			finished++
		}
	}
	// Calculate the number of slots currently downloading
	pending := 0
	for _, request := range pendPool {
		for _, header := range request.Headers {
			if header.Number.Uint64() < q.resultOffset+uint64(limit) {
				pending++
			}
		}
	}
	// Return the free slots to distribute
	return limit - finished - pending
}

// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
// up an already retrieved header skeleton.
func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// No skeleton retrieval can be in progress, fail hard if so (huge implementation bug)
	if q.headerResults != nil {
		panic("skeleton assembly already in progress")
	}
	// Schedule all the header retrieval tasks for the skeleton assembly
	q.headerTaskPool = make(map[uint64]*types.Header)
	q.headerTaskQueue = prque.New(nil)
	q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
	q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
	q.headerProced = 0
	q.headerOffset = from
	q.headerContCh = make(chan bool, 1)
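	// Pushing with the negated index lets the max-priority queue pop the lowest
	// skeleton indexes first, so gaps are filled in chain order.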
	for i, header := range skeleton {
		index := from + uint64(i*MaxHeaderFetch)

		q.headerTaskPool[index] = header
		q.headerTaskQueue.Push(index, -int64(index))
	}
}

// RetrieveHeaders retrieves the header chain assembled based on the scheduled
// skeleton.
func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
	q.lock.Lock()
	defer q.lock.Unlock()

	headers, proced := q.headerResults, q.headerProced
	q.headerResults, q.headerProced = nil, 0

	return headers, proced
}

// Schedule adds a set of headers to the download queue for scheduling, returning
// the new headers encountered.
func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Insert all the headers prioritised by the contained block number
	inserts := make([]*types.Header, 0, len(headers))
	for _, header := range headers {
		// Make sure chain order is honoured and preserved throughout
		hash := header.Hash()
		if header.Number == nil || header.Number.Uint64() != from {
			log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from)
			break
		}
		if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
			log.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)
			break
		}
		// Make sure no duplicate requests are executed
		if _, ok := q.blockTaskPool[hash]; ok {
			log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash)
			continue
		}
		if _, ok := q.receiptTaskPool[hash]; ok {
			log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash)
			continue
		}
		// Queue the header for content retrieval
		q.blockTaskPool[hash] = header
		q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))

		if q.mode == FastSync {
			q.receiptTaskPool[hash] = header
			q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
		}
		inserts = append(inserts, header)
		q.headerHead = hash
		from++
	}
	return inserts
}

// Results retrieves and permanently removes a batch of fetch results from
// the cache. The result slice will be empty if the queue has been closed.
func (q *queue) Results(block bool) []*fetchResult {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Count the number of items available for processing
	nproc := q.countProcessableItems()
	for nproc == 0 && !q.closed {
		if !block {
			return nil
		}
		q.active.Wait()
		nproc = q.countProcessableItems()
	}
	// Since we have a batch limit, don't pull more into "dangling" memory
	if nproc > maxResultsProcess {
		nproc = maxResultsProcess
	}
	results := make([]*fetchResult, nproc)
	copy(results, q.resultCache[:nproc])
	if len(results) > 0 {
		// Mark results as done before dropping them from the cache.
		for _, result := range results {
			hash := result.Header.Hash()
			delete(q.blockDonePool, hash)
			delete(q.receiptDonePool, hash)
		}
		// Delete the results from the cache and clear the tail.
		copy(q.resultCache, q.resultCache[nproc:])
		for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ {
			q.resultCache[i] = nil
		}
		// Advance the expected block number of the first cache entry.
		q.resultOffset += uint64(nproc)

		// Recalculate the result item weights to prevent memory exhaustion
		for _, result := range results {
			size := result.Header.Size()
			for _, uncle := range result.Uncles {
				size += uncle.Size()
			}
			for _, receipt := range result.Receipts {
				size += receipt.Size()
			}
			for _, tx := range result.Transactions {
				size += tx.Size()
			}
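			// Exponential moving average with weight 0.1: e.g. a 32KB running
			// average and a 64KB block give 0.1*64 + 0.9*32 ≈ 35.2KB.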
			q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize
		}
	}
	return results
}

// countProcessableItems counts the contiguous, fully completed fetch results
// sitting at the head of the result cache, ready for in-order delivery.
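// For example, a cache prefix of [done, done, pending, done, ...] yields 2,
// since results must be handed to the processor strictly in chain order.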
func (q *queue) countProcessableItems() int {
	for i, result := range q.resultCache {
		if result == nil || result.Pending > 0 {
			return i
		}
	}
	return len(q.resultCache)
}

// ReserveHeaders reserves a set of headers for the given peer, skipping any
// previously failed batches.
func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Short circuit if the peer's already downloading something (sanity check to
	// not corrupt state)
	if _, ok := q.headerPendPool[p.id]; ok {
		return nil
	}
	// Retrieve a batch of hashes, skipping previously failed ones
	send, skip := uint64(0), []uint64{}
	for send == 0 && !q.headerTaskQueue.Empty() {
		from, _ := q.headerTaskQueue.Pop()
		if q.headerPeerMiss[p.id] != nil {
			if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
				skip = append(skip, from.(uint64))
				continue
			}
		}
		send = from.(uint64)
	}
	// Merge all the skipped batches back
	for _, from := range skip {
		q.headerTaskQueue.Push(from, -int64(from))
	}
	// Assemble and return the block download request
	if send == 0 {
		return nil
	}
	request := &fetchRequest{
		Peer: p,
		From: send,
		Time: time.Now(),
	}
	q.headerPendPool[p.id] = request
	return request
}

// ReserveBodies reserves a set of body fetches for the given peer, skipping any
// previously failed downloads. Besides the next batch of needed fetches, it also
// returns a flag indicating whether empty blocks were queued requiring processing.
func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, error) {
	isNoop := func(header *types.Header) bool {
		return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
	}
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, isNoop)
}

// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
// any previously failed downloads. Besides the next batch of needed fetches, it
// also returns a flag indicating whether empty receipts were queued requiring importing.
func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, error) {
	isNoop := func(header *types.Header) bool {
		return header.ReceiptHash == types.EmptyRootHash
	}
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, isNoop)
}

// reserveHeaders reserves a set of data download operations for a given peer,
// skipping any previously failed ones. This method is a generic version used
// by the individual special reservation functions.
//
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) {
	// Short circuit if the pool has been depleted, or if the peer's already
	// downloading something (sanity check not to corrupt state)
	if taskQueue.Empty() {
		return nil, false, nil
	}
	if _, ok := pendPool[p.id]; ok {
		return nil, false, nil
	}
	// Calculate an upper limit on the items we might fetch (i.e. throttling)
	space := q.resultSlots(pendPool, donePool)

	// Retrieve a batch of tasks, skipping previously failed ones
	send := make([]*types.Header, 0, count)
	skip := make([]*types.Header, 0)

	progress := false
	for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {
		header := taskQueue.PopItem().(*types.Header)
		hash := header.Hash()

		// If we're the first to request this task, initialise the result container
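		// (block number N maps to resultCache[N-resultOffset]; an index outside
		// that window indicates a scheduling bug or an invalid chain)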
		index := int(header.Number.Int64() - int64(q.resultOffset))
		if index >= len(q.resultCache) || index < 0 {
			common.Report("index allocation went beyond available resultCache space")
			return nil, false, errInvalidChain
		}
		if q.resultCache[index] == nil {
			components := 1
			if q.mode == FastSync {
				components = 2
			}
			q.resultCache[index] = &fetchResult{
				Pending: components,
				Hash:    hash,
				Header:  header,
			}
		}
		// If this fetch task is a noop, skip this fetch operation
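		// (a completed noop still occupies a result cache slot until delivered,
		// so space drops by one; proc is decremented so the iteration doesn't
		// also count against the fetch budget)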
		if isNoop(header) {
			donePool[hash] = struct{}{}
			delete(taskPool, hash)

			space, proc = space-1, proc-1
			q.resultCache[index].Pending--
			progress = true
			continue
		}
		// Otherwise unless the peer is known not to have the data, add to the retrieve list
		if p.Lacks(hash) {
			skip = append(skip, header)
		} else {
			send = append(send, header)
		}
	}
	// Merge all the skipped headers back
	for _, header := range skip {
		taskQueue.Push(header, -int64(header.Number.Uint64()))
	}
	if progress {
		// Wake Results, resultCache was modified
		q.active.Signal()
	}
	// Assemble and return the block download request
	if len(send) == 0 {
		return nil, progress, nil
	}
	request := &fetchRequest{
		Peer:    p,
		Headers: send,
		Time:    time.Now(),
	}
	pendPool[p.id] = request

	return request, progress, nil
}

// CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue.
func (q *queue) CancelHeaders(request *fetchRequest) {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.cancel(request, q.headerTaskQueue, q.headerPendPool)
}

// CancelBodies aborts a body fetch request, returning all pending headers to the
// task queue.
func (q *queue) CancelBodies(request *fetchRequest) {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.cancel(request, q.blockTaskQueue, q.blockPendPool)
}

// CancelReceipts aborts a receipt fetch request, returning all pending headers to
// the task queue.
func (q *queue) CancelReceipts(request *fetchRequest) {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
}

// cancel aborts a fetch request, returning all pending hashes to the task queue.
func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
	if request.From > 0 {
		taskQueue.Push(request.From, -int64(request.From))
	}
	for _, header := range request.Headers {
		taskQueue.Push(header, -int64(header.Number.Uint64()))
	}
	delete(pendPool, request.Peer.id)
}

// Revoke cancels all pending requests belonging to a given peer. This method is
// meant to be called during a peer drop to quickly reassign owned data fetches
// to remaining nodes.
func (q *queue) Revoke(peerID string) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if request, ok := q.blockPendPool[peerID]; ok {
		for _, header := range request.Headers {
			q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
		}
		delete(q.blockPendPool, peerID)
	}
	if request, ok := q.receiptPendPool[peerID]; ok {
		for _, header := range request.Headers {
			q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
		}
		delete(q.receiptPendPool, peerID)
	}
}

// ExpireHeaders checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter)
}

// ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter)
}

// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
}

// expire is the generic check that moves expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks.
//
// Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
	// Iterate over the expired requests and return each to the queue
	expiries := make(map[string]int)
	for id, request := range pendPool {
		if time.Since(request.Time) > timeout {
			// Update the metrics with the timeout
			timeoutMeter.Mark(1)

			// Return any non-satisfied requests to the pool
			if request.From > 0 {
				taskQueue.Push(request.From, -int64(request.From))
			}
			for _, header := range request.Headers {
				taskQueue.Push(header, -int64(header.Number.Uint64()))
			}
			// Add the peer to the expiry report along with the number of failed requests
			expiries[id] = len(request.Headers)

			// Remove the expired requests from the pending pool directly
			delete(pendPool, id)
		}
	}
	return expiries
}

// DeliverHeaders injects a header retrieval response into the header results
// cache. This method either accepts all headers it received, or none of them
// if they do not map correctly to the skeleton.
//
// If the headers are accepted, the method makes an attempt to deliver the set
// of ready headers to the processor to keep the pipeline full. However it will
// not block to prevent stalling other pending deliveries.
func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Short circuit if the data was never requested
	request := q.headerPendPool[id]
	if request == nil {
		return 0, errNoFetchesPending
	}
	headerReqTimer.UpdateSince(request.Time)
	delete(q.headerPendPool, id)

	// Ensure headers can be mapped onto the skeleton chain
	target := q.headerTaskPool[request.From].Hash()

	accepted := len(headers) == MaxHeaderFetch
	if accepted {
		if headers[0].Number.Uint64() != request.From {
			log.Trace("First header broke chain ordering", "peer", id, "number", headers[0].Number, "hash", headers[0].Hash(), "expected", request.From)
			accepted = false
		} else if headers[len(headers)-1].Hash() != target {
			log.Trace("Last header broke skeleton structure", "peer", id, "number", headers[len(headers)-1].Number, "hash", headers[len(headers)-1].Hash(), "expected", target)
			accepted = false
		}
	}
	if accepted {
		for i, header := range headers[1:] {
			hash := header.Hash()
			if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
				log.Warn("Header broke chain ordering", "peer", id, "number", header.Number, "hash", hash, "expected", want)
				accepted = false
				break
			}
			if headers[i].Hash() != header.ParentHash {
				log.Warn("Header broke chain ancestry", "peer", id, "number", header.Number, "hash", hash)
				accepted = false
				break
			}
		}
	}
	// If the batch of headers wasn't accepted, mark as unavailable
	if !accepted {
		log.Trace("Skeleton filling not accepted", "peer", id, "from", request.From)

		miss := q.headerPeerMiss[id]
		if miss == nil {
			q.headerPeerMiss[id] = make(map[uint64]struct{})
			miss = q.headerPeerMiss[id]
		}
		miss[request.From] = struct{}{}

		q.headerTaskQueue.Push(request.From, -int64(request.From))
		return 0, errors.New("delivery not accepted")
	}
	// Clean up a successful fetch and try to deliver any sub-results
	copy(q.headerResults[request.From-q.headerOffset:], headers)
	delete(q.headerTaskPool, request.From)

	ready := 0
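	// Fills are only ever accepted as complete MaxHeaderFetch batches, so the
	// contiguous-ready scan can advance in whole-batch strides.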
	for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil {
		ready += MaxHeaderFetch
	}
	if ready > 0 {
		// Headers are ready for delivery, gather them and push forward (non blocking)
		process := make([]*types.Header, ready)
		copy(process, q.headerResults[q.headerProced:q.headerProced+ready])

		select {
		case headerProcCh <- process:
			log.Trace("Pre-scheduled new headers", "peer", id, "count", len(process), "from", process[0].Number)
			q.headerProced += len(process)
		default:
		}
	}
	// Check for termination and return
	if len(q.headerTaskPool) == 0 {
		q.headerContCh <- false
	}
	return len(headers), nil
}

// DeliverBodies injects a block body retrieval response into the results queue.
// The method returns the number of block bodies accepted from the delivery and
// also wakes any threads waiting for data delivery.
func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
		if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
			return errInvalidBody
		}
		result.Transactions = txLists[index]
		result.Uncles = uncleLists[index]
		return nil
	}
	return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct)
}

// DeliverReceipts injects a receipt retrieval response into the results queue.
// The method returns the number of transaction receipts accepted from the delivery
// and also wakes any threads waiting for data delivery.
func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
		if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash {
			return errInvalidReceipt
		}
		result.Receipts = receiptList[index]
		return nil
	}
	return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct)
}

// deliver injects a data retrieval response into the results queue.
//
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
	results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
	// Short circuit if the data was never requested
	request := pendPool[id]
	if request == nil {
		return 0, errNoFetchesPending
	}
	reqTimer.UpdateSince(request.Time)
	delete(pendPool, id)

	// If no data items were retrieved, mark them as unavailable for the origin peer
	if results == 0 {
		for _, header := range request.Headers {
			request.Peer.MarkLacking(header.Hash())
		}
	}
	// Assemble each of the results with their headers and retrieved data parts
	var (
		accepted int
		failure  error
		useful   bool
	)
	for i, header := range request.Headers {
		// Short circuit assembly if no more fetch results are found
		if i >= results {
			break
		}
		// Reconstruct the next result if contents match up
		index := int(header.Number.Int64() - int64(q.resultOffset))
		if index >= len(q.resultCache) || index < 0 || q.resultCache[index] == nil {
			failure = errInvalidChain
			break
		}
		if err := reconstruct(header, i, q.resultCache[index]); err != nil {
			failure = err
			break
		}
		hash := header.Hash()

		donePool[hash] = struct{}{}
		q.resultCache[index].Pending--
		useful = true
		accepted++

		// Clean up a successful fetch
		request.Headers[i] = nil
		delete(taskPool, hash)
	}
	// Return all failed or missing fetches to the queue
	for _, header := range request.Headers {
		if header != nil {
			taskQueue.Push(header, -int64(header.Number.Uint64()))
		}
	}
	// Wake up Results
	if accepted > 0 {
		q.active.Signal()
	}
	// If none of the data was good, it's a stale delivery
	switch {
	case failure == nil || failure == errInvalidChain:
		return accepted, failure
	case useful:
		return accepted, fmt.Errorf("partial failure: %v", failure)
	default:
		return accepted, errStaleDelivery
	}
}

// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
func (q *queue) Prepare(offset uint64, mode SyncMode) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Prepare the queue for sync results
	if q.resultOffset < offset {
		q.resultOffset = offset
	}
	q.mode = mode
}