// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Contains the block download scheduler to collect download tasks and schedule
// them in an ordered and throttled way.
package downloader

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
	"github.com/rcrowley/go-metrics"
	"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

var blockCacheLimit = 8192 // Maximum number of blocks to cache before throttling the download

var (
	errNoFetchesPending = errors.New("no fetches pending")
	errStaleDelivery    = errors.New("stale delivery")
)

// fetchRequest is a currently running data retrieval operation.
type fetchRequest struct {
	Peer    *peer               // Peer to which the request was sent
	From    uint64              // [eth/62] Requested chain element index (used for skeleton fills only)
	Hashes  map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority)
	Headers []*types.Header     // [eth/62] Requested headers, sorted by request order
	Time    time.Time           // Time when the request was made
}

// fetchResult is a struct collecting partial results from data fetchers until
// all outstanding pieces complete and the result as a whole can be processed.
type fetchResult struct {
	Pending int // Number of data fetches still pending

	Header       *types.Header
	Uncles       []*types.Header
	Transactions types.Transactions
	Receipts     types.Receipts
}

// queue represents hashes that either need fetching or are being fetched.
type queue struct {
	mode          SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
	fastSyncPivot uint64   // Block number where the fast sync pivots into archive synchronisation mode

	headerHead common.Hash // [eth/62] Hash of the last queued header to verify order

	// Headers are "special", they download in batches, supported by a skeleton chain
	headerTaskPool  map[uint64]*types.Header       // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
	headerTaskQueue *prque.Prque                   // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
	headerPeerMiss  map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
	headerPendPool  map[string]*fetchRequest       // [eth/62] Currently pending header retrieval operations
	headerResults   []*types.Header                // [eth/62] Result cache accumulating the completed headers
	headerProced    int                            // [eth/62] Number of headers already processed from the results
	headerOffset    uint64                         // [eth/62] Number of the first header in the result cache
	headerContCh    chan bool                      // [eth/62] Channel to notify when header download finishes

	// All data retrievals below are based on an already assembled header chain
	blockTaskPool  map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
	blockTaskQueue *prque.Prque                  // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for
	blockPendPool  map[string]*fetchRequest      // [eth/62] Currently pending block (body) retrieval operations
	blockDonePool  map[common.Hash]struct{}      // [eth/62] Set of the completed block (body) fetches

	receiptTaskPool  map[common.Hash]*types.Header // [eth/63] Pending receipt retrieval tasks, mapping hashes to headers
	receiptTaskQueue *prque.Prque                  // [eth/63] Priority queue of the headers to fetch the receipts for
	receiptPendPool  map[string]*fetchRequest      // [eth/63] Currently pending receipt retrieval operations
	receiptDonePool  map[common.Hash]struct{}      // [eth/63] Set of the completed receipt fetches

	resultCache  []*fetchResult // Downloaded but not yet delivered fetch results
	resultOffset uint64         // Offset of the first cached fetch result in the block chain

	lock   *sync.Mutex
	active *sync.Cond
	closed bool
}

// newQueue creates a new download queue for scheduling block retrieval.
func newQueue() *queue {
	lock := new(sync.Mutex)
	return &queue{
		headerPendPool:   make(map[string]*fetchRequest),
		headerContCh:     make(chan bool),
		blockTaskPool:    make(map[common.Hash]*types.Header),
		blockTaskQueue:   prque.New(),
		blockPendPool:    make(map[string]*fetchRequest),
		blockDonePool:    make(map[common.Hash]struct{}),
		receiptTaskPool:  make(map[common.Hash]*types.Header),
		receiptTaskQueue: prque.New(),
		receiptPendPool:  make(map[string]*fetchRequest),
		receiptDonePool:  make(map[common.Hash]struct{}),
		resultCache:      make([]*fetchResult, blockCacheLimit),
		active:           sync.NewCond(lock),
		lock:             lock,
	}
}
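
// Illustrative sketch (not part of the original file): a queue is typically
// created once per downloader and reused across sync cycles via Reset. The
// `mode`, `pivot` and `head` values below are hypothetical placeholders.
//
//	q := newQueue()
//	q.Prepare(1, mode, pivot, head) // accept results from block #1 onwards
//	// ... run a sync cycle ...
//	q.Reset() // clear all state before the next cycle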

// Reset clears out the queue contents.
func (q *queue) Reset() {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.closed = false
	q.mode = FullSync
	q.fastSyncPivot = 0

	q.headerHead = common.Hash{}
	q.headerPendPool = make(map[string]*fetchRequest)

	q.blockTaskPool = make(map[common.Hash]*types.Header)
	q.blockTaskQueue.Reset()
	q.blockPendPool = make(map[string]*fetchRequest)
	q.blockDonePool = make(map[common.Hash]struct{})

	q.receiptTaskPool = make(map[common.Hash]*types.Header)
	q.receiptTaskQueue.Reset()
	q.receiptPendPool = make(map[string]*fetchRequest)
	q.receiptDonePool = make(map[common.Hash]struct{})

	q.resultCache = make([]*fetchResult, blockCacheLimit)
	q.resultOffset = 0
}

// Close marks the end of the sync, unblocking WaitResults.
// It may be called even if the queue is already closed.
func (q *queue) Close() {
	q.lock.Lock()
	q.closed = true
	q.lock.Unlock()
	q.active.Broadcast()
}
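
// Illustrative sketch (not part of the original file): Close is the shutdown
// signal for consumers blocked in WaitResults; a draining goroutine exits once
// WaitResults returns an empty slice.
//
//	go func() {
//		for {
//			results := q.WaitResults()
//			if len(results) == 0 {
//				return // queue was closed
//			}
//			// process results ...
//		}
//	}()
//	// later, on cancellation or completion:
//	q.Close()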

// PendingHeaders retrieves the number of header requests pending for retrieval.
func (q *queue) PendingHeaders() int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.headerTaskQueue.Size()
}

// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
func (q *queue) PendingBlocks() int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.blockTaskQueue.Size()
}

// PendingReceipts retrieves the number of block receipts pending for retrieval.
func (q *queue) PendingReceipts() int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.receiptTaskQueue.Size()
}

// InFlightHeaders retrieves whether there are header fetch requests currently
// in flight.
func (q *queue) InFlightHeaders() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return len(q.headerPendPool) > 0
}

// InFlightBlocks retrieves whether there are block fetch requests currently in
// flight.
func (q *queue) InFlightBlocks() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return len(q.blockPendPool) > 0
}

// InFlightReceipts retrieves whether there are receipt fetch requests currently
// in flight.
func (q *queue) InFlightReceipts() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return len(q.receiptPendPool) > 0
}

// Idle returns if the queue is fully idle or has some data still inside.
func (q *queue) Idle() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
	pending := len(q.blockPendPool) + len(q.receiptPendPool)
	cached := len(q.blockDonePool) + len(q.receiptDonePool)

	return (queued + pending + cached) == 0
}

// FastSyncPivot retrieves the currently used fast sync pivot point.
func (q *queue) FastSyncPivot() uint64 {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.fastSyncPivot
}

// ShouldThrottleBlocks checks if the download should be throttled (active block (body)
// fetches exceed block cache).
func (q *queue) ShouldThrottleBlocks() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Calculate the currently in-flight block (body) requests
	pending := 0
	for _, request := range q.blockPendPool {
		pending += len(request.Hashes) + len(request.Headers)
	}
	// Throttle if more blocks (bodies) are in-flight than free space in the cache
	return pending >= len(q.resultCache)-len(q.blockDonePool)
}

// ShouldThrottleReceipts checks if the download should be throttled (active receipt
// fetches exceed block cache).
func (q *queue) ShouldThrottleReceipts() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Calculate the currently in-flight receipt requests
	pending := 0
	for _, request := range q.receiptPendPool {
		pending += len(request.Headers)
	}
	// Throttle if more receipts are in-flight than free space in the cache
	return pending >= len(q.resultCache)-len(q.receiptDonePool)
}
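
// Illustrative sketch (not part of the original file): a fetcher loop checks
// the throttle before reserving more work, so in-flight data never overruns
// the free slots in resultCache. `idlePeer` is a hypothetical helper.
//
//	for !q.ShouldThrottleBlocks() {
//		p := idlePeer()
//		request, _, err := q.ReserveBodies(p, MaxBlockFetch)
//		if err != nil || request == nil {
//			break
//		}
//		// dispatch request to p ...
//	}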

// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
// up an already retrieved header skeleton.
func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// No skeleton retrieval can be in progress, fail hard if so (huge implementation bug)
	if q.headerResults != nil {
		panic("skeleton assembly already in progress")
	}
	// Schedule all the header retrieval tasks for the skeleton assembly
	q.headerTaskPool = make(map[uint64]*types.Header)
	q.headerTaskQueue = prque.New()
	q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
	q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
	q.headerProced = 0
	q.headerOffset = from
	q.headerContCh = make(chan bool, 1)

	for i, header := range skeleton {
		index := from + uint64(i*MaxHeaderFetch)

		q.headerTaskPool[index] = header
		q.headerTaskQueue.Push(index, -float32(index))
	}
}
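
// Worked example (not part of the original file): with MaxHeaderFetch = 192
// and from = 1, a skeleton of three headers (for blocks #192, #384, #576)
// yields the fill tasks below; each task asks a peer for the 192 headers
// leading up to (and including) its skeleton header.
//
//	task index 1:   fill headers #1..#192,   verified against skeleton[0]
//	task index 193: fill headers #193..#384, verified against skeleton[1]
//	task index 385: fill headers #385..#576, verified against skeleton[2]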

// RetrieveHeaders retrieves the header chain assembled based on the scheduled
// skeleton.
func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
	q.lock.Lock()
	defer q.lock.Unlock()

	headers, proced := q.headerResults, q.headerProced
	q.headerResults, q.headerProced = nil, 0

	return headers, proced
}

// Schedule adds a set of headers for the download queue for scheduling, returning
// the new headers encountered.
func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Insert all the headers prioritised by the contained block number
	inserts := make([]*types.Header, 0, len(headers))
	for _, header := range headers {
		// Make sure chain order is honoured and preserved throughout
		hash := header.Hash()
		if header.Number == nil || header.Number.Uint64() != from {
			log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from)
			break
		}
		if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
			log.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)
			break
		}
		// Make sure no duplicate requests are executed
		if _, ok := q.blockTaskPool[hash]; ok {
			log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash)
			continue
		}
		if _, ok := q.receiptTaskPool[hash]; ok {
			log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash)
			continue
		}
		// Queue the header for content retrieval
		q.blockTaskPool[hash] = header
		q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))

		if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot {
			// Fast phase of the fast sync, retrieve receipts too
			q.receiptTaskPool[hash] = header
			q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
		}
		inserts = append(inserts, header)
		q.headerHead = hash
		from++
	}
	return inserts
}
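
// Illustrative sketch (not part of the original file): Schedule expects a
// contiguous run of headers whose first element is numbered exactly `from`;
// from the first ordering or ancestry break onwards, nothing is queued.
//
//	inserts := q.Schedule(headers, headers[0].Number.Uint64())
//	if len(inserts) != len(headers) {
//		// the tail of the batch broke ordering/ancestry or was a duplicate
//	}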

// WaitResults retrieves and permanently removes a batch of fetch
// results from the cache. The result slice will be empty if the queue
// has been closed.
func (q *queue) WaitResults() []*fetchResult {
	q.lock.Lock()
	defer q.lock.Unlock()

	nproc := q.countProcessableItems()
	for nproc == 0 && !q.closed {
		q.active.Wait()
		nproc = q.countProcessableItems()
	}
	results := make([]*fetchResult, nproc)
	copy(results, q.resultCache[:nproc])
	if len(results) > 0 {
		// Mark results as done before dropping them from the cache.
		for _, result := range results {
			hash := result.Header.Hash()
			delete(q.blockDonePool, hash)
			delete(q.receiptDonePool, hash)
		}
		// Delete the results from the cache and clear the tail.
		copy(q.resultCache, q.resultCache[nproc:])
		for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ {
			q.resultCache[i] = nil
		}
		// Advance the expected block number of the first cache entry.
		q.resultOffset += uint64(nproc)
	}
	return results
}
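
// Illustrative sketch (not part of the original file): an import loop blocks
// on WaitResults and assembles blocks from the completed fetch results. The
// `insertChain` helper is hypothetical.
//
//	for {
//		results := q.WaitResults()
//		if len(results) == 0 {
//			break
//		}
//		blocks := make([]*types.Block, len(results))
//		for i, result := range results {
//			blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
//		}
//		insertChain(blocks)
//	}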

// countProcessableItems counts the processable items.
func (q *queue) countProcessableItems() int {
	for i, result := range q.resultCache {
		// Don't process incomplete or unavailable items.
		if result == nil || result.Pending > 0 {
			return i
		}
		// Stop before processing the pivot block to ensure that
		// resultCache has space for fsHeaderForceVerify items. Not
		// doing this could leave us unable to download the required
		// amount of headers.
		if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot {
			for j := 0; j < fsHeaderForceVerify; j++ {
				if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
					return i
				}
			}
		}
	}
	return len(q.resultCache)
}

// ReserveHeaders reserves a set of headers for the given peer, skipping any
// previously failed batches.
func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Short circuit if the peer's already downloading something (sanity check to
	// not corrupt state)
	if _, ok := q.headerPendPool[p.id]; ok {
		return nil
	}
	// Retrieve a batch of hashes, skipping previously failed ones
	send, skip := uint64(0), []uint64{}
	for send == 0 && !q.headerTaskQueue.Empty() {
		from, _ := q.headerTaskQueue.Pop()
		if q.headerPeerMiss[p.id] != nil {
			if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
				skip = append(skip, from.(uint64))
				continue
			}
		}
		send = from.(uint64)
	}
	// Merge all the skipped batches back
	for _, from := range skip {
		q.headerTaskQueue.Push(from, -float32(from))
	}
	// Assemble and return the block download request
	if send == 0 {
		return nil
	}
	request := &fetchRequest{
		Peer: p,
		From: send,
		Time: time.Now(),
	}
	q.headerPendPool[p.id] = request
	return request
}
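
// Illustrative sketch (not part of the original file): a header fetcher pairs
// ReserveHeaders with the peer's network request, then feeds the response back
// through DeliverHeaders. `p.FetchHeaders` is assumed to issue a GetBlockHeaders
// request starting at request.From for MaxHeaderFetch headers.
//
//	if request := q.ReserveHeaders(p, MaxHeaderFetch); request != nil {
//		go p.FetchHeaders(request.From, MaxHeaderFetch) // network round trip
//		// on response: q.DeliverHeaders(p.id, headers, headerProcCh)
//	}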

// ReserveBodies reserves a set of body fetches for the given peer, skipping any
// previously failed downloads. Besides the next batch of needed fetches, it also
// returns a flag whether empty blocks were queued requiring processing.
func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) {
	isNoop := func(header *types.Header) bool {
		return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
	}
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, isNoop)
}

// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
// any previously failed downloads. Besides the next batch of needed fetches, it
// also returns a flag whether empty receipts were queued requiring importing.
func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error) {
	isNoop := func(header *types.Header) bool {
		return header.ReceiptHash == types.EmptyRootHash
	}
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, isNoop)
}

// reserveHeaders reserves a set of data download operations for a given peer,
// skipping any previously failed ones. This method is a generic version used
// by the individual special reservation functions.
//
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) {
	// Short circuit if the pool has been depleted, or if the peer's already
	// downloading something (sanity check not to corrupt state)
	if taskQueue.Empty() {
		return nil, false, nil
	}
	if _, ok := pendPool[p.id]; ok {
		return nil, false, nil
	}
	// Calculate an upper limit on the items we might fetch (i.e. throttling)
	space := len(q.resultCache) - len(donePool)
	for _, request := range pendPool {
		space -= len(request.Headers)
	}
	// Retrieve a batch of tasks, skipping previously failed ones
	send := make([]*types.Header, 0, count)
	skip := make([]*types.Header, 0)

	progress := false
	for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {
		header := taskQueue.PopItem().(*types.Header)

		// If we're the first to request this task, initialise the result container
		index := int(header.Number.Int64() - int64(q.resultOffset))
		if index >= len(q.resultCache) || index < 0 {
			common.Report("index allocation went beyond available resultCache space")
			return nil, false, errInvalidChain
		}
		if q.resultCache[index] == nil {
			components := 1
			if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot {
				components = 2
			}
			q.resultCache[index] = &fetchResult{
				Pending: components,
				Header:  header,
			}
		}
		// If this fetch task is a noop, skip this fetch operation
		if isNoop(header) {
			donePool[header.Hash()] = struct{}{}
			delete(taskPool, header.Hash())

			space, proc = space-1, proc-1
			q.resultCache[index].Pending--
			progress = true
			continue
		}
		// Otherwise unless the peer is known not to have the data, add to the retrieve list
		if p.Lacks(header.Hash()) {
			skip = append(skip, header)
		} else {
			send = append(send, header)
		}
	}
	// Merge all the skipped headers back
	for _, header := range skip {
		taskQueue.Push(header, -float32(header.Number.Uint64()))
	}
	if progress {
		// Wake WaitResults, resultCache was modified
		q.active.Signal()
	}
	// Assemble and return the block download request
	if len(send) == 0 {
		return nil, progress, nil
	}
	request := &fetchRequest{
		Peer:    p,
		Headers: send,
		Time:    time.Now(),
	}
	pendPool[p.id] = request

	return request, progress, nil
}

// CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue.
func (q *queue) CancelHeaders(request *fetchRequest) {
	q.cancel(request, q.headerTaskQueue, q.headerPendPool)
}

// CancelBodies aborts a body fetch request, returning all pending headers to the
// task queue.
func (q *queue) CancelBodies(request *fetchRequest) {
	q.cancel(request, q.blockTaskQueue, q.blockPendPool)
}

// CancelReceipts aborts a receipt fetch request, returning all pending headers to
// the task queue.
func (q *queue) CancelReceipts(request *fetchRequest) {
	q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
}

// cancel aborts a fetch request, returning all pending hashes to the task queue.
func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if request.From > 0 {
		taskQueue.Push(request.From, -float32(request.From))
	}
	for hash, index := range request.Hashes {
		taskQueue.Push(hash, float32(index))
	}
	for _, header := range request.Headers {
		taskQueue.Push(header, -float32(header.Number.Uint64()))
	}
	delete(pendPool, request.Peer.id)
}

// Revoke cancels all pending requests belonging to a given peer. This method is
// meant to be called during a peer drop to quickly reassign owned data fetches
// to remaining nodes.
func (q *queue) Revoke(peerId string) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if request, ok := q.blockPendPool[peerId]; ok {
		for _, header := range request.Headers {
			q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
		}
		delete(q.blockPendPool, peerId)
	}
	if request, ok := q.receiptPendPool[peerId]; ok {
		for _, header := range request.Headers {
			q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
		}
		delete(q.receiptPendPool, peerId)
	}
}
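
// Illustrative sketch (not part of the original file): unlike cancel, Revoke
// needs no request handle, so a peer-drop hook can reassign the peer's reserved
// work immediately instead of waiting for the requests to time out.
//
//	func onPeerDrop(q *queue, id string) { // hypothetical hook
//		q.Revoke(id)
//	}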

// ExpireHeaders checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter)
}

// ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter)
}

// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
}

// expire is the generic check that moves expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks.
//
// Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
	// Iterate over the expired requests and return each to the queue
	expiries := make(map[string]int)
	for id, request := range pendPool {
		if time.Since(request.Time) > timeout {
			// Update the metrics with the timeout
			timeoutMeter.Mark(1)

			// Return any non satisfied requests to the pool
			if request.From > 0 {
				taskQueue.Push(request.From, -float32(request.From))
			}
			for hash, index := range request.Hashes {
				taskQueue.Push(hash, float32(index))
			}
			for _, header := range request.Headers {
				taskQueue.Push(header, -float32(header.Number.Uint64()))
			}
			// Add the peer to the expiry report along with the number of failed requests
			expirations := len(request.Hashes)
			if expirations < len(request.Headers) {
				expirations = len(request.Headers)
			}
			expiries[id] = expirations
		}
	}
	// Remove the expired requests from the pending pool
	for id := range expiries {
		delete(pendPool, id)
	}
	return expiries
}
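
// Illustrative sketch (not part of the original file): a fetch loop runs the
// expiry check periodically and penalises the offending peers. The
// `dropOrThrottle` helper and the 5 second timeout are hypothetical.
//
//	ticker := time.NewTicker(time.Second)
//	defer ticker.Stop()
//	for range ticker.C {
//		for id, failed := range q.ExpireBodies(5 * time.Second) {
//			dropOrThrottle(id, failed) // e.g. reduce the peer's allowance or drop it
//		}
//	}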

// DeliverHeaders injects a header retrieval response into the header results
// cache. This method either accepts all headers it received, or none of them
// if they do not map correctly to the skeleton.
//
// If the headers are accepted, the method makes an attempt to deliver the set
// of ready headers to the processor to keep the pipeline full. However it will
// not block to prevent stalling other pending deliveries.
func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Short circuit if the data was never requested
	request := q.headerPendPool[id]
	if request == nil {
		return 0, errNoFetchesPending
	}
	headerReqTimer.UpdateSince(request.Time)
	delete(q.headerPendPool, id)

	// Ensure headers can be mapped onto the skeleton chain
	target := q.headerTaskPool[request.From].Hash()

	accepted := len(headers) == MaxHeaderFetch
	if accepted {
		if headers[0].Number.Uint64() != request.From {
			log.Trace("First header broke chain ordering", "peer", id, "number", headers[0].Number, "hash", headers[0].Hash(), "expected", request.From)
			accepted = false
		} else if headers[len(headers)-1].Hash() != target {
			log.Trace("Last header broke skeleton structure", "peer", id, "number", headers[len(headers)-1].Number, "hash", headers[len(headers)-1].Hash(), "expected", target)
			accepted = false
		}
	}
	if accepted {
		for i, header := range headers[1:] {
			hash := header.Hash()
			if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
				log.Warn("Header broke chain ordering", "peer", id, "number", header.Number, "hash", hash, "expected", want)
				accepted = false
				break
			}
			if headers[i].Hash() != header.ParentHash {
				log.Warn("Header broke chain ancestry", "peer", id, "number", header.Number, "hash", hash)
				accepted = false
				break
			}
		}
	}
	// If the batch of headers wasn't accepted, mark as unavailable
	if !accepted {
		log.Trace("Skeleton filling not accepted", "peer", id, "from", request.From)

		miss := q.headerPeerMiss[id]
		if miss == nil {
			q.headerPeerMiss[id] = make(map[uint64]struct{})
			miss = q.headerPeerMiss[id]
		}
		miss[request.From] = struct{}{}

		q.headerTaskQueue.Push(request.From, -float32(request.From))
		return 0, errors.New("delivery not accepted")
	}
	// Clean up a successful fetch and try to deliver any sub-results
	copy(q.headerResults[request.From-q.headerOffset:], headers)
	delete(q.headerTaskPool, request.From)

	ready := 0
	for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil {
		ready += MaxHeaderFetch
	}
	if ready > 0 {
		// Headers are ready for delivery, gather them and push forward (non blocking)
		process := make([]*types.Header, ready)
		copy(process, q.headerResults[q.headerProced:q.headerProced+ready])

		select {
		case headerProcCh <- process:
			log.Trace("Pre-scheduled new headers", "peer", id, "count", len(process), "from", process[0].Number)
			q.headerProced += len(process)
		default:
		}
	}
	// Check for termination and return
	if len(q.headerTaskPool) == 0 {
		q.headerContCh <- false
	}
	return len(headers), nil
}
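
// Worked example (not part of the original file): with MaxHeaderFetch = 192, a
// fill for request.From = 193 is accepted only if all of the following hold:
//
//	len(headers) == 192                                // complete batch
//	headers[0].Number.Uint64() == 193                  // starts at the task index
//	headers[191].Hash() == headerTaskPool[193].Hash()  // ends on the skeleton header
//	headers[i].Hash() == headers[i+1].ParentHash       // contiguous ancestry
//
// Any violation rejects the whole batch and records the task index in
// headerPeerMiss, so the same peer is never asked for that batch again.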

// DeliverBodies injects a block body retrieval response into the results queue.
// The method returns the number of block bodies accepted from the delivery and
// also wakes any threads waiting for data delivery.
func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
		if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
			return errInvalidBody
		}
		result.Transactions = txLists[index]
		result.Uncles = uncleLists[index]
		return nil
	}
	return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct)
}

// DeliverReceipts injects a receipt retrieval response into the results queue.
// The method returns the number of transaction receipts accepted from the delivery
// and also wakes any threads waiting for data delivery.
func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
		if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash {
			return errInvalidReceipt
		}
		result.Receipts = receiptList[index]
		return nil
	}
	return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct)
}

// deliver injects a data retrieval response into the results queue.
//
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
	results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
	// Short circuit if the data was never requested
	request := pendPool[id]
	if request == nil {
		return 0, errNoFetchesPending
	}
	reqTimer.UpdateSince(request.Time)
	delete(pendPool, id)

	// If no data items were retrieved, mark them as unavailable for the origin peer
	if results == 0 {
		for _, header := range request.Headers {
			request.Peer.MarkLacking(header.Hash())
		}
	}
	// Assemble each of the results with their headers and retrieved data parts
	var (
		accepted int
		failure  error
		useful   bool
	)
	for i, header := range request.Headers {
		// Short circuit assembly if no more fetch results are found
		if i >= results {
			break
		}
		// Reconstruct the next result if contents match up
		index := int(header.Number.Int64() - int64(q.resultOffset))
		if index >= len(q.resultCache) || index < 0 || q.resultCache[index] == nil {
			failure = errInvalidChain
			break
		}
		if err := reconstruct(header, i, q.resultCache[index]); err != nil {
			failure = err
			break
		}
		donePool[header.Hash()] = struct{}{}
		q.resultCache[index].Pending--
		useful = true
		accepted++

		// Clean up a successful fetch
		request.Headers[i] = nil
		delete(taskPool, header.Hash())
	}
	// Return all failed or missing fetches to the queue
	for _, header := range request.Headers {
		if header != nil {
			taskQueue.Push(header, -float32(header.Number.Uint64()))
		}
	}
	// Wake up WaitResults
	if accepted > 0 {
		q.active.Signal()
	}
	// If none of the data was good, it's a stale delivery
	switch {
	case failure == nil || failure == errInvalidChain:
		return accepted, failure
	case useful:
		return accepted, fmt.Errorf("partial failure: %v", failure)
	default:
		return accepted, errStaleDelivery
	}
}

// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Prepare the queue for sync results
	if q.resultOffset < offset {
		q.resultOffset = offset
	}
	q.fastSyncPivot = pivot
	q.mode = mode
}
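
// Illustrative sketch (not part of the original file): the end-to-end lifecycle
// of one sync cycle against this queue, with the networking side elided. The
// `origin`, `pivot`, `head` and `skeleton` values are hypothetical inputs.
//
//	q.Prepare(origin+1, FastSync, pivot, head) // accept results from origin+1
//	q.ScheduleSkeleton(origin+1, skeleton)     // queue the skeleton fill tasks
//	// fetchers: ReserveHeaders/ReserveBodies/ReserveReceipts -> network ->
//	//           DeliverHeaders/DeliverBodies/DeliverReceipts
//	headers, proced := q.RetrieveHeaders()     // collect the assembled chain
//	q.Schedule(headers[proced:], origin+1+uint64(proced)) // queue body/receipt tasks
//	// importer: WaitResults loop until q.Close()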