// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Contains the block download scheduler to collect download tasks and schedule
// them in an ordered and throttled way.

package downloader

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/state"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/logger"
	"github.com/ethereum/go-ethereum/logger/glog"
	"github.com/ethereum/go-ethereum/trie"
	"github.com/rcrowley/go-metrics"
	"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

var (
	blockCacheLimit   = 8192 // Maximum number of blocks to cache before throttling the download
	maxInFlightStates = 8192 // Maximum number of state downloads to allow concurrently
)

var (
	errNoFetchesPending = errors.New("no fetches pending")
	errStateSyncPending = errors.New("state trie sync already scheduled")
	errStaleDelivery    = errors.New("stale delivery")
)

// fetchRequest is a currently running data retrieval operation.
type fetchRequest struct {
	Peer    *peer               // Peer to which the request was sent
	From    uint64              // [eth/62] Requested chain element index (used for skeleton fills only)
	Hashes  map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority)
	Headers []*types.Header     // [eth/62] Requested headers, sorted by request order
	Time    time.Time           // Time when the request was made
}

// fetchResult is a struct collecting partial results from data fetchers until
// all outstanding pieces complete and the result as a whole can be processed.
type fetchResult struct {
	Pending int // Number of data fetches still pending

	Header       *types.Header
	Uncles       []*types.Header
	Transactions types.Transactions
	Receipts     types.Receipts
}

// queue represents hashes that either need fetching or are being fetched
type queue struct {
	mode          SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
	fastSyncPivot uint64   // Block number where the fast sync pivots into archive synchronisation mode

	hashPool    map[common.Hash]int // [eth/61] Pending hashes, mapping to their insertion index (priority)
	hashQueue   *prque.Prque        // [eth/61] Priority queue of the block hashes to fetch
	hashCounter int                 // [eth/61] Counter indexing the added hashes to ensure retrieval order

	headerHead common.Hash // [eth/62] Hash of the last queued header to verify order

	// Headers are "special", they download in batches, supported by a skeleton chain
	headerTaskPool  map[uint64]*types.Header       // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
	headerTaskQueue *prque.Prque                   // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
	headerPeerMiss  map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
	headerPendPool  map[string]*fetchRequest       // [eth/62] Currently pending header retrieval operations
	headerDonePool  map[uint64]struct{}            // [eth/62] Set of the completed header fetches
	headerResults   []*types.Header                // [eth/62] Result cache accumulating the completed headers
	headerProced    int                            // [eth/62] Number of headers already processed from the results
	headerOffset    uint64                         // [eth/62] Number of the first header in the result cache
	headerContCh    chan bool                      // [eth/62] Channel to notify when header download finishes

	// All data retrievals below are based on an already assembled header chain
	blockTaskPool  map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
	blockTaskQueue *prque.Prque                  // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for
	blockPendPool  map[string]*fetchRequest      // [eth/62] Currently pending block (body) retrieval operations
	blockDonePool  map[common.Hash]struct{}      // [eth/62] Set of the completed block (body) fetches

	receiptTaskPool  map[common.Hash]*types.Header // [eth/63] Pending receipt retrieval tasks, mapping hashes to headers
	receiptTaskQueue *prque.Prque                  // [eth/63] Priority queue of the headers to fetch the receipts for
	receiptPendPool  map[string]*fetchRequest      // [eth/63] Currently pending receipt retrieval operations
	receiptDonePool  map[common.Hash]struct{}      // [eth/63] Set of the completed receipt fetches

	stateTaskIndex int                      // [eth/63] Counter indexing the added hashes to ensure prioritised retrieval order
	stateTaskPool  map[common.Hash]int      // [eth/63] Pending node data retrieval tasks, mapping to their priority
	stateTaskQueue *prque.Prque             // [eth/63] Priority queue of the hashes to fetch the node data for
	statePendPool  map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations

	stateDatabase   ethdb.Database   // [eth/63] Trie database to populate during state reassembly
	stateScheduler  *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator
	stateProcessors int32            // [eth/63] Number of currently running state processors
	stateSchedLock  sync.RWMutex     // [eth/63] Lock serialising access to the state scheduler

	resultCache  []*fetchResult // Downloaded but not yet delivered fetch results
	resultOffset uint64         // Offset of the first cached fetch result in the block chain

	lock   *sync.Mutex
	active *sync.Cond
	closed bool
}

// newQueue creates a new download queue for scheduling block retrieval.
func newQueue(stateDb ethdb.Database) *queue {
	lock := new(sync.Mutex)
	return &queue{
		hashPool:         make(map[common.Hash]int),
		hashQueue:        prque.New(),
		headerPendPool:   make(map[string]*fetchRequest),
		headerContCh:     make(chan bool),
		blockTaskPool:    make(map[common.Hash]*types.Header),
		blockTaskQueue:   prque.New(),
		blockPendPool:    make(map[string]*fetchRequest),
		blockDonePool:    make(map[common.Hash]struct{}),
		receiptTaskPool:  make(map[common.Hash]*types.Header),
		receiptTaskQueue: prque.New(),
		receiptPendPool:  make(map[string]*fetchRequest),
		receiptDonePool:  make(map[common.Hash]struct{}),
		stateTaskPool:    make(map[common.Hash]int),
		stateTaskQueue:   prque.New(),
		statePendPool:    make(map[string]*fetchRequest),
		stateDatabase:    stateDb,
		resultCache:      make([]*fetchResult, blockCacheLimit),
		active:           sync.NewCond(lock),
		lock:             lock,
	}
}
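
// Illustrative sketch only (variable names are placeholders, peer handling and
// error checks elided): the downloader drives a queue roughly as Prepare ->
// Schedule/ScheduleSkeleton -> Reserve* -> Deliver* -> WaitResults -> Close,
// all of which are defined below. A condensed usage outline:
//
//	q := newQueue(stateDb)
//	q.Prepare(origin+1, mode, pivot, head)     // set result offset, sync mode and pivot
//	q.Schedule(headers, origin+1)              // queue body/receipt fetches for these headers
//	req, _, _ := q.ReserveBodies(p, count)     // hand req.Headers to peer p for fetching
//	q.DeliverBodies(p.id, txLists, uncleLists) // feed the peer's response back in
//	results := q.WaitResults()                 // blocks until completed results are ready
//	q.Close()                                  // unblocks any waiting WaitResults call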

// Reset clears out the queue contents.
func (q *queue) Reset() {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.stateSchedLock.Lock()
	defer q.stateSchedLock.Unlock()

	q.closed = false
	q.mode = FullSync
	q.fastSyncPivot = 0

	q.hashPool = make(map[common.Hash]int)
	q.hashQueue.Reset()
	q.hashCounter = 0

	q.headerHead = common.Hash{}
	q.headerPendPool = make(map[string]*fetchRequest)

	q.blockTaskPool = make(map[common.Hash]*types.Header)
	q.blockTaskQueue.Reset()
	q.blockPendPool = make(map[string]*fetchRequest)
	q.blockDonePool = make(map[common.Hash]struct{})

	q.receiptTaskPool = make(map[common.Hash]*types.Header)
	q.receiptTaskQueue.Reset()
	q.receiptPendPool = make(map[string]*fetchRequest)
	q.receiptDonePool = make(map[common.Hash]struct{})

	q.stateTaskIndex = 0
	q.stateTaskPool = make(map[common.Hash]int)
	q.stateTaskQueue.Reset()
	q.statePendPool = make(map[string]*fetchRequest)
	q.stateScheduler = nil

	q.resultCache = make([]*fetchResult, blockCacheLimit)
	q.resultOffset = 0
}

// Close marks the end of the sync, unblocking WaitResults.
// It may be called even if the queue is already closed.
func (q *queue) Close() {
	q.lock.Lock()
	q.closed = true
	q.lock.Unlock()
	q.active.Broadcast()
}

// PendingHeaders retrieves the number of header requests pending for retrieval.
func (q *queue) PendingHeaders() int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.headerTaskQueue.Size()
}

// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
func (q *queue) PendingBlocks() int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.hashQueue.Size() + q.blockTaskQueue.Size()
}

// PendingReceipts retrieves the number of block receipts pending for retrieval.
func (q *queue) PendingReceipts() int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.receiptTaskQueue.Size()
}

// PendingNodeData retrieves the number of node data entries pending for retrieval.
func (q *queue) PendingNodeData() int {
	q.stateSchedLock.RLock()
	defer q.stateSchedLock.RUnlock()

	if q.stateScheduler != nil {
		return q.stateScheduler.Pending()
	}
	return 0
}

// InFlightHeaders retrieves whether there are header fetch requests currently
// in flight.
func (q *queue) InFlightHeaders() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return len(q.headerPendPool) > 0
}

// InFlightBlocks retrieves whether there are block fetch requests currently in
// flight.
func (q *queue) InFlightBlocks() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return len(q.blockPendPool) > 0
}

// InFlightReceipts retrieves whether there are receipt fetch requests currently
// in flight.
func (q *queue) InFlightReceipts() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return len(q.receiptPendPool) > 0
}

// InFlightNodeData retrieves whether there are node data entry fetch requests
// currently in flight.
func (q *queue) InFlightNodeData() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0
}

// Idle returns if the queue is fully idle or has some data still inside. This
// method is used by the tester to detect termination events.
func (q *queue) Idle() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
	pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
	cached := len(q.blockDonePool) + len(q.receiptDonePool)

	q.stateSchedLock.RLock()
	if q.stateScheduler != nil {
		queued += q.stateScheduler.Pending()
	}
	q.stateSchedLock.RUnlock()

	return (queued + pending + cached) == 0
}

// FastSyncPivot retrieves the currently used fast sync pivot point.
func (q *queue) FastSyncPivot() uint64 {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.fastSyncPivot
}

// ShouldThrottleBlocks checks if the download should be throttled (active block (body)
// fetches exceed block cache).
func (q *queue) ShouldThrottleBlocks() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Calculate the currently in-flight block (body) requests
	pending := 0
	for _, request := range q.blockPendPool {
		pending += len(request.Hashes) + len(request.Headers)
	}
	// Throttle if more blocks (bodies) are in-flight than free space in the cache
	return pending >= len(q.resultCache)-len(q.blockDonePool)
}

// ShouldThrottleReceipts checks if the download should be throttled (active receipt
// fetches exceed block cache).
func (q *queue) ShouldThrottleReceipts() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Calculate the currently in-flight receipt requests
	pending := 0
	for _, request := range q.receiptPendPool {
		pending += len(request.Headers)
	}
	// Throttle if more receipts are in-flight than free space in the cache
	return pending >= len(q.resultCache)-len(q.receiptDonePool)
}
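
// In both throttling checks above, the free slots in the result cache are
// len(q.resultCache) minus the already completed fetches sitting in the done
// pool; if the items currently out with peers would fill (or overflow) that
// space, new reservations are held back. A rough numeric example, purely
// illustrative: with the default 8192-slot cache, 100 completed bodies and
// 8100 bodies in flight, ShouldThrottleBlocks returns true since
// 8100 >= 8192-100.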

// Schedule61 adds a set of hashes for the download queue for scheduling, returning
// the new hashes encountered.
func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Insert all the hashes prioritised in the arrival order
	inserts := make([]common.Hash, 0, len(hashes))
	for _, hash := range hashes {
		// Skip anything we already have
		if old, ok := q.hashPool[hash]; ok {
			glog.V(logger.Warn).Infof("Hash %x already scheduled at index %v", hash, old)
			continue
		}
		// Update the counters and insert the hash
		q.hashCounter = q.hashCounter + 1
		inserts = append(inserts, hash)

		q.hashPool[hash] = q.hashCounter
		if fifo {
			q.hashQueue.Push(hash, -float32(q.hashCounter)) // Lowest gets scheduled first
		} else {
			q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets scheduled first
		}
	}
	return inserts
}

// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
// up an already retrieved header skeleton.
func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// No skeleton retrieval can be in progress, fail hard if so (huge implementation bug)
	if q.headerResults != nil {
		panic("skeleton assembly already in progress")
	}
	// Schedule all the header retrieval tasks for the skeleton assembly
	q.headerTaskPool = make(map[uint64]*types.Header)
	q.headerTaskQueue = prque.New()
	q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
	q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
	q.headerProced = 0
	q.headerOffset = from
	q.headerContCh = make(chan bool, 1)

	for i, header := range skeleton {
		index := from + uint64(i*MaxHeaderFetch)
		q.headerTaskPool[index] = header
		q.headerTaskQueue.Push(index, -float32(index))
	}
}
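
// Each skeleton header at position i anchors a fill task keyed by its absolute
// chain index from+i*MaxHeaderFetch, and pushing the negated index makes the
// lowest (earliest) batches pop out of headerTaskQueue first. As a small
// illustrative example, with from = 1 and a two-entry skeleton, fill tasks are
// scheduled at indexes 1 and 1+MaxHeaderFetch, each expected to be completed
// by a full MaxHeaderFetch-sized batch of headers (see DeliverHeaders below).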

// RetrieveHeaders retrieves the header chain assembled based on the scheduled
// skeleton.
func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
	q.lock.Lock()
	defer q.lock.Unlock()

	headers, proced := q.headerResults, q.headerProced
	q.headerResults, q.headerProced = nil, 0

	return headers, proced
}

// Schedule adds a set of headers for the download queue for scheduling, returning
// the new headers encountered.
func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Insert all the headers prioritised by the contained block number
	inserts := make([]*types.Header, 0, len(headers))
	for _, header := range headers {
		// Make sure chain order is honoured and preserved throughout
		hash := header.Hash()
		if header.Number == nil || header.Number.Uint64() != from {
			glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ordering, expected %d", header.Number, hash[:4], from)
			break
		}
		if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
			glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ancestry", header.Number, hash[:4])
			break
		}
		// Make sure no duplicate requests are executed
		if _, ok := q.blockTaskPool[hash]; ok {
			glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for block fetch", header.Number.Uint64(), hash[:4])
			continue
		}
		if _, ok := q.receiptTaskPool[hash]; ok {
			glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for receipt fetch", header.Number.Uint64(), hash[:4])
			continue
		}
		// Queue the header for content retrieval
		q.blockTaskPool[hash] = header
		q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))

		if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot {
			// Fast phase of the fast sync, retrieve receipts too
			q.receiptTaskPool[hash] = header
			q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
		}
		if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot {
			// Pivoting point of the fast sync, retrieve the state tries
			q.stateSchedLock.Lock()
			q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase)
			q.stateSchedLock.Unlock()
		}
		inserts = append(inserts, header)
		q.headerHead = hash
		from++
	}
	return inserts
}

// WaitResults retrieves and permanently removes a batch of fetch
// results from the cache. The result slice will be empty if the queue
// has been closed.
func (q *queue) WaitResults() []*fetchResult {
	q.lock.Lock()
	defer q.lock.Unlock()

	nproc := q.countProcessableItems()
	for nproc == 0 && !q.closed {
		q.active.Wait()
		nproc = q.countProcessableItems()
	}
	results := make([]*fetchResult, nproc)
	copy(results, q.resultCache[:nproc])
	if len(results) > 0 {
		// Mark results as done before dropping them from the cache.
		for _, result := range results {
			hash := result.Header.Hash()
			delete(q.blockDonePool, hash)
			delete(q.receiptDonePool, hash)
		}
		// Delete the results from the cache and clear the tail.
		copy(q.resultCache, q.resultCache[nproc:])
		for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ {
			q.resultCache[i] = nil
		}
		// Advance the expected block number of the first cache entry.
		q.resultOffset += uint64(nproc)
	}
	return results
}

// countProcessableItems counts the processable items.
func (q *queue) countProcessableItems() int {
	for i, result := range q.resultCache {
		// Don't process incomplete or unavailable items.
		if result == nil || result.Pending > 0 {
			return i
		}
		// Special handling for the fast-sync pivot block:
		if q.mode == FastSync {
			bnum := result.Header.Number.Uint64()
			if bnum == q.fastSyncPivot {
				// If the state of the pivot block is not
				// available yet, we cannot proceed and return 0.
				//
				// Stop before processing the pivot block to ensure that
				// resultCache has space for fsHeaderForceVerify items. Not
				// doing this could leave us unable to download the required
				// amount of headers.
				if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 {
					return i
				}
				for j := 0; j < fsHeaderForceVerify; j++ {
					if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
						return i
					}
				}
			}
			// If we're just past the fast sync pivot, stop as well
			// because the following batch needs different insertion.
			// This simplifies handling the switchover in d.process.
			if bnum == q.fastSyncPivot+1 && i > 0 {
				return i
			}
		}
	}
	return len(q.resultCache)
}
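
// In other words, countProcessableItems returns the length of the gap-free
// prefix of resultCache: results only become deliverable in strict chain order,
// and a single still-pending block stalls everything queued behind it.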

// ReserveHeaders reserves a set of headers for the given peer, skipping any
// previously failed batches.
func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Short circuit if the peer's already downloading something (sanity check to
	// not corrupt state)
	if _, ok := q.headerPendPool[p.id]; ok {
		return nil
	}
	// Retrieve a batch of hashes, skipping previously failed ones
	send, skip := uint64(0), []uint64{}
	for send == 0 && !q.headerTaskQueue.Empty() {
		from, _ := q.headerTaskQueue.Pop()
		if q.headerPeerMiss[p.id] != nil {
			if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
				skip = append(skip, from.(uint64))
				continue
			}
		}
		send = from.(uint64)
	}
	// Merge all the skipped batches back
	for _, from := range skip {
		q.headerTaskQueue.Push(from, -float32(from))
	}
	// Assemble and return the block download request
	if send == 0 {
		return nil
	}
	request := &fetchRequest{
		Peer: p,
		From: send,
		Time: time.Now(),
	}
	q.headerPendPool[p.id] = request
	return request
}

// ReserveBlocks reserves a set of block hashes for the given peer, skipping any
// previously failed download.
func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.reserveHashes(p, count, q.hashQueue, nil, q.blockPendPool, len(q.resultCache)-len(q.blockDonePool))
}

// ReserveNodeData reserves a set of node data hashes for the given peer, skipping
// any previously failed download.
func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest {
	// Create a task generator to fetch state-fetch tasks if all scheduled ones are done
	generator := func(max int) {
		q.stateSchedLock.Lock()
		defer q.stateSchedLock.Unlock()

		if q.stateScheduler != nil {
			for _, hash := range q.stateScheduler.Missing(max) {
				q.stateTaskPool[hash] = q.stateTaskIndex
				q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex))
				q.stateTaskIndex++
			}
		}
	}
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.reserveHashes(p, count, q.stateTaskQueue, generator, q.statePendPool, maxInFlightStates)
}

// reserveHashes reserves a set of hashes for the given peer, skipping previously
// failed ones.
//
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGen func(int), pendPool map[string]*fetchRequest, maxPending int) *fetchRequest {
	// Short circuit if the peer's already downloading something (sanity check to
	// not corrupt state)
	if _, ok := pendPool[p.id]; ok {
		return nil
	}
	// Calculate an upper limit on the hashes we might fetch (i.e. throttling)
	allowance := maxPending
	if allowance > 0 {
		for _, request := range pendPool {
			allowance -= len(request.Hashes)
		}
	}
	// If there's a task generator, ask it to fill our task queue
	if taskGen != nil && taskQueue.Size() < allowance {
		taskGen(allowance - taskQueue.Size())
	}
	if taskQueue.Empty() {
		return nil
	}
	// Retrieve a batch of hashes, skipping previously failed ones
	send := make(map[common.Hash]int)
	skip := make(map[common.Hash]int)

	for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ {
		hash, priority := taskQueue.Pop()
		if p.Lacks(hash.(common.Hash)) {
			skip[hash.(common.Hash)] = int(priority)
		} else {
			send[hash.(common.Hash)] = int(priority)
		}
	}
	// Merge all the skipped hashes back
	for hash, index := range skip {
		taskQueue.Push(hash, float32(index))
	}
	// Assemble and return the block download request
	if len(send) == 0 {
		return nil
	}
	request := &fetchRequest{
		Peer:   p,
		Hashes: send,
		Time:   time.Now(),
	}
	pendPool[p.id] = request
	return request
}
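
// The allowance above starts at maxPending and is reduced by every hash already
// out with some peer, so the total number of in-flight hashes never exceeds
// maxPending (a value of 0 disables the cap). A sketch with purely illustrative
// numbers: for maxPending = 8192 and 8000 hashes already pending, at most 192
// further hashes are considered for this peer, regardless of the requested count.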

// ReserveBodies reserves a set of body fetches for the given peer, skipping any
// previously failed downloads. Besides the next batch of needed fetches, it also
// returns a flag whether empty blocks were queued requiring processing.
func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) {
	isNoop := func(header *types.Header) bool {
		return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
	}
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, isNoop)
}

// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
// any previously failed downloads. Besides the next batch of needed fetches, it
// also returns a flag whether empty receipts were queued requiring importing.
func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error) {
	isNoop := func(header *types.Header) bool {
		return header.ReceiptHash == types.EmptyRootHash
	}
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, isNoop)
}

// reserveHeaders reserves a set of data download operations for a given peer,
// skipping any previously failed ones. This method is a generic version used
// by the individual special reservation functions.
//
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) {
	// Short circuit if the pool has been depleted, or if the peer's already
	// downloading something (sanity check not to corrupt state)
	if taskQueue.Empty() {
		return nil, false, nil
	}
	if _, ok := pendPool[p.id]; ok {
		return nil, false, nil
	}
	// Calculate an upper limit on the items we might fetch (i.e. throttling)
	space := len(q.resultCache) - len(donePool)
	for _, request := range pendPool {
		space -= len(request.Headers)
	}
	// Retrieve a batch of tasks, skipping previously failed ones
	send := make([]*types.Header, 0, count)
	skip := make([]*types.Header, 0)

	progress := false
	for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {
		header := taskQueue.PopItem().(*types.Header)

		// If we're the first to request this task, initialise the result container
		index := int(header.Number.Int64() - int64(q.resultOffset))
		if index >= len(q.resultCache) || index < 0 {
			common.Report("index allocation went beyond available resultCache space")
			return nil, false, errInvalidChain
		}
		if q.resultCache[index] == nil {
			components := 1
			if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot {
				components = 2
			}
			q.resultCache[index] = &fetchResult{
				Pending: components,
				Header:  header,
			}
		}
		// If this fetch task is a noop, skip this fetch operation
		if isNoop(header) {
			donePool[header.Hash()] = struct{}{}
			delete(taskPool, header.Hash())

			space, proc = space-1, proc-1
			q.resultCache[index].Pending--
			progress = true
			continue
		}
		// Otherwise unless the peer is known not to have the data, add to the retrieve list
		if p.Lacks(header.Hash()) {
			skip = append(skip, header)
		} else {
			send = append(send, header)
		}
	}
	// Merge all the skipped headers back
	for _, header := range skip {
		taskQueue.Push(header, -float32(header.Number.Uint64()))
	}
	if progress {
		// Wake WaitResults, resultCache was modified
		q.active.Signal()
	}
	// Assemble and return the block download request
	if len(send) == 0 {
		return nil, progress, nil
	}
	request := &fetchRequest{
		Peer:    p,
		Headers: send,
		Time:    time.Now(),
	}
	pendPool[p.id] = request
	return request, progress, nil
}
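
// Throughout the queue, a header maps to its result slot via
// header.Number - q.resultOffset, so resultCache always covers a contiguous
// window of the chain starting at resultOffset. For instance (numbers purely
// illustrative), with resultOffset = 1000, header #1003 lands in
// q.resultCache[3]; any index outside [0, len(resultCache)) is treated as an
// invalid chain.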

// CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue.
func (q *queue) CancelHeaders(request *fetchRequest) {
	q.cancel(request, q.headerTaskQueue, q.headerPendPool)
}

// CancelBlocks aborts a fetch request, returning all pending hashes to the queue.
func (q *queue) CancelBlocks(request *fetchRequest) {
	q.cancel(request, q.hashQueue, q.blockPendPool)
}

// CancelBodies aborts a body fetch request, returning all pending headers to the
// task queue.
func (q *queue) CancelBodies(request *fetchRequest) {
	q.cancel(request, q.blockTaskQueue, q.blockPendPool)
}

// CancelReceipts aborts a body fetch request, returning all pending headers to
// the task queue.
func (q *queue) CancelReceipts(request *fetchRequest) {
	q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
}

// CancelNodeData aborts a node state data fetch request, returning all pending
// hashes to the task queue.
func (q *queue) CancelNodeData(request *fetchRequest) {
	q.cancel(request, q.stateTaskQueue, q.statePendPool)
}

// cancel aborts a fetch request, returning all pending hashes to the task queue.
func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if request.From > 0 {
		taskQueue.Push(request.From, -float32(request.From))
	}
	for hash, index := range request.Hashes {
		taskQueue.Push(hash, float32(index))
	}
	for _, header := range request.Headers {
		taskQueue.Push(header, -float32(header.Number.Uint64()))
	}
	delete(pendPool, request.Peer.id)
}

// Revoke cancels all pending requests belonging to a given peer. This method is
// meant to be called during a peer drop to quickly reassign owned data fetches
// to remaining nodes.
func (q *queue) Revoke(peerId string) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if request, ok := q.blockPendPool[peerId]; ok {
		for hash, index := range request.Hashes {
			q.hashQueue.Push(hash, float32(index))
		}
		for _, header := range request.Headers {
			q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
		}
		delete(q.blockPendPool, peerId)
	}
	if request, ok := q.receiptPendPool[peerId]; ok {
		for _, header := range request.Headers {
			q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
		}
		delete(q.receiptPendPool, peerId)
	}
	if request, ok := q.statePendPool[peerId]; ok {
		for hash, index := range request.Hashes {
			q.stateTaskQueue.Push(hash, float32(index))
		}
		delete(q.statePendPool, peerId)
	}
}

// ExpireHeaders checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter)
}

// ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.expire(timeout, q.blockPendPool, q.hashQueue, blockTimeoutMeter)
}

// ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter)
}

// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
}

// ExpireNodeData checks for in flight node data requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.expire(timeout, q.statePendPool, q.stateTaskQueue, stateTimeoutMeter)
}

// expire is the generic check that moves expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks.
//
// Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
	// Iterate over the expired requests and return each to the queue
	expiries := make(map[string]int)
	for id, request := range pendPool {
		if time.Since(request.Time) > timeout {
			// Update the metrics with the timeout
			timeoutMeter.Mark(1)

			// Return any non satisfied requests to the pool
			if request.From > 0 {
				taskQueue.Push(request.From, -float32(request.From))
			}
			for hash, index := range request.Hashes {
				taskQueue.Push(hash, float32(index))
			}
			for _, header := range request.Headers {
				taskQueue.Push(header, -float32(header.Number.Uint64()))
			}
			// Add the peer to the expiry report along with the number of failed requests
			expirations := len(request.Hashes)
			if expirations < len(request.Headers) {
				expirations = len(request.Headers)
			}
			expiries[id] = expirations
		}
	}
	// Remove the expired requests from the pending pool
	for id := range expiries {
		delete(pendPool, id)
	}
	return expiries
}

// DeliverBlocks injects a block retrieval response into the download queue. The
// method returns the number of blocks accepted from the delivery and also wakes
// any threads waiting for data delivery.
func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Short circuit if the blocks were never requested
	request := q.blockPendPool[id]
	if request == nil {
		return 0, errNoFetchesPending
	}
	blockReqTimer.UpdateSince(request.Time)
	delete(q.blockPendPool, id)

	// If no blocks were retrieved, mark them as unavailable for the origin peer
	if len(blocks) == 0 {
		for hash := range request.Hashes {
			request.Peer.MarkLacking(hash)
		}
	}
	// Iterate over the downloaded blocks and add each of them
	accepted, errs := 0, make([]error, 0)
	for _, block := range blocks {
		// Skip any blocks that were not requested
		hash := block.Hash()
		if _, ok := request.Hashes[hash]; !ok {
			errs = append(errs, fmt.Errorf("non-requested block %x", hash))
			continue
		}
		// Reconstruct the next result if contents match up
		index := int(block.Number().Int64() - int64(q.resultOffset))
		if index >= len(q.resultCache) || index < 0 {
			errs = []error{errInvalidChain}
			break
		}
		q.resultCache[index] = &fetchResult{
			Header:       block.Header(),
			Transactions: block.Transactions(),
			Uncles:       block.Uncles(),
		}
		q.blockDonePool[block.Hash()] = struct{}{}

		delete(request.Hashes, hash)
		delete(q.hashPool, hash)
		accepted++
	}
	// Return all failed or missing fetches to the queue
	for hash, index := range request.Hashes {
		q.hashQueue.Push(hash, float32(index))
	}
	// Wake up WaitResults
	if accepted > 0 {
		q.active.Signal()
	}
	// If none of the blocks were good, it's a stale delivery
	switch {
	case len(errs) == 0:
		return accepted, nil
	case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
		return accepted, errs[0]
	case len(errs) == len(blocks):
		return accepted, errStaleDelivery
	default:
		return accepted, fmt.Errorf("multiple failures: %v", errs)
	}
}

// DeliverHeaders injects a header retrieval response into the header results
// cache. This method either accepts all headers it received, or none of them
// if they do not map correctly to the skeleton.
//
// If the headers are accepted, the method makes an attempt to deliver the set
// of ready headers to the processor to keep the pipeline full. However it will
// not block to prevent stalling other pending deliveries.
func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Short circuit if the data was never requested
	request := q.headerPendPool[id]
	if request == nil {
		return 0, errNoFetchesPending
	}
	headerReqTimer.UpdateSince(request.Time)
	delete(q.headerPendPool, id)

	// Ensure headers can be mapped onto the skeleton chain
	target := q.headerTaskPool[request.From].Hash()

	accepted := len(headers) == MaxHeaderFetch
	if accepted {
		if headers[0].Number.Uint64() != request.From {
			glog.V(logger.Detail).Infof("Peer %s: first header #%v [%x] broke chain ordering, expected %d", id, headers[0].Number, headers[0].Hash().Bytes()[:4], request.From)
			accepted = false
		} else if headers[len(headers)-1].Hash() != target {
			glog.V(logger.Detail).Infof("Peer %s: last header #%v [%x] broke skeleton structure, expected %x", id, headers[len(headers)-1].Number, headers[len(headers)-1].Hash().Bytes()[:4], target[:4])
			accepted = false
		}
	}
	if accepted {
		for i, header := range headers[1:] {
			hash := header.Hash()
			if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
				glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ordering, expected %d", id, header.Number, hash[:4], want)
				accepted = false
				break
			}
			if headers[i].Hash() != header.ParentHash {
				glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ancestry", id, header.Number, hash[:4])
				accepted = false
				break
			}
		}
	}
	// If the batch of headers wasn't accepted, mark as unavailable
	if !accepted {
		glog.V(logger.Detail).Infof("Peer %s: skeleton filling from header #%d not accepted", id, request.From)

		miss := q.headerPeerMiss[id]
		if miss == nil {
			q.headerPeerMiss[id] = make(map[uint64]struct{})
			miss = q.headerPeerMiss[id]
		}
		miss[request.From] = struct{}{}

		q.headerTaskQueue.Push(request.From, -float32(request.From))
		return 0, errors.New("delivery not accepted")
	}
	// Clean up a successful fetch and try to deliver any sub-results
	copy(q.headerResults[request.From-q.headerOffset:], headers)
	delete(q.headerTaskPool, request.From)

	ready := 0
	for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil {
		ready += MaxHeaderFetch
	}
	if ready > 0 {
		// Headers are ready for delivery, gather them and push forward (non blocking)
		process := make([]*types.Header, ready)
		copy(process, q.headerResults[q.headerProced:q.headerProced+ready])

		select {
		case headerProcCh <- process:
			glog.V(logger.Detail).Infof("%s: pre-scheduled %d headers from #%v", id, len(process), process[0].Number)
			q.headerProced += len(process)
		default:
		}
	}
	// Check for termination and return
	if len(q.headerTaskPool) == 0 {
		q.headerContCh <- false
	}
	return len(headers), nil
}
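
// Note that DeliverHeaders is all-or-nothing: a fill must contain exactly
// MaxHeaderFetch headers, start at the requested index and end on the matching
// skeleton header, otherwise the entire batch is rejected and the peer is
// recorded in headerPeerMiss so the same range is not requested from it again.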

// DeliverBodies injects a block body retrieval response into the results queue.
// The method returns the number of block bodies accepted from the delivery and
// also wakes any threads waiting for data delivery.
func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
		if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
			return errInvalidBody
		}
		result.Transactions = txLists[index]
		result.Uncles = uncleLists[index]
		return nil
	}
	return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct)
}

// DeliverReceipts injects a receipt retrieval response into the results queue.
// The method returns the number of transaction receipts accepted from the delivery
// and also wakes any threads waiting for data delivery.
func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
		if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash {
			return errInvalidReceipt
		}
		result.Receipts = receiptList[index]
		return nil
	}
	return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct)
}

// deliver injects a data retrieval response into the results queue.
//
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
	results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
	// Short circuit if the data was never requested
	request := pendPool[id]
	if request == nil {
		return 0, errNoFetchesPending
	}
	reqTimer.UpdateSince(request.Time)
	delete(pendPool, id)

	// If no data items were retrieved, mark them as unavailable for the origin peer
	if results == 0 {
		for _, header := range request.Headers {
			request.Peer.MarkLacking(header.Hash())
		}
	}
	// Assemble each of the results with their headers and retrieved data parts
	var (
		accepted int
		failure  error
		useful   bool
	)
	for i, header := range request.Headers {
		// Short circuit assembly if no more fetch results are found
		if i >= results {
			break
		}
		// Reconstruct the next result if contents match up
		index := int(header.Number.Int64() - int64(q.resultOffset))
		if index >= len(q.resultCache) || index < 0 || q.resultCache[index] == nil {
			failure = errInvalidChain
			break
		}
		if err := reconstruct(header, i, q.resultCache[index]); err != nil {
			failure = err
			break
		}
		donePool[header.Hash()] = struct{}{}
		q.resultCache[index].Pending--
		useful = true
		accepted++

		// Clean up a successful fetch
		request.Headers[i] = nil
		delete(taskPool, header.Hash())
	}
	// Return all failed or missing fetches to the queue
	for _, header := range request.Headers {
		if header != nil {
			taskQueue.Push(header, -float32(header.Number.Uint64()))
		}
	}
	// Wake up WaitResults
	if accepted > 0 {
		q.active.Signal()
	}
	// If none of the data was good, it's a stale delivery
	switch {
	case failure == nil || failure == errInvalidChain:
		return accepted, failure
	case useful:
		return accepted, fmt.Errorf("partial failure: %v", failure)
	default:
		return accepted, errStaleDelivery
	}
}

// DeliverNodeData injects a node state data retrieval response into the queue.
// The method returns the number of node state entries originally requested, and
// the number of them actually accepted from the delivery.
func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Short circuit if the data was never requested
	request := q.statePendPool[id]
	if request == nil {
		return 0, errNoFetchesPending
	}
	stateReqTimer.UpdateSince(request.Time)
	delete(q.statePendPool, id)

	// If no data was retrieved, mark their hashes as unavailable for the origin peer
	if len(data) == 0 {
		for hash := range request.Hashes {
			request.Peer.MarkLacking(hash)
		}
	}
	// Iterate over the downloaded data and verify each of them
	accepted, errs := 0, make([]error, 0)
	process := []trie.SyncResult{}
	for _, blob := range data {
		// Skip any state trie entries that were not requested
		hash := common.BytesToHash(crypto.Keccak256(blob))
		if _, ok := request.Hashes[hash]; !ok {
			errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
			continue
		}
		// Inject the next state trie item into the processing queue
		process = append(process, trie.SyncResult{Hash: hash, Data: blob})
		accepted++

		delete(request.Hashes, hash)
		delete(q.stateTaskPool, hash)
	}
	// Start the asynchronous node state data injection
	atomic.AddInt32(&q.stateProcessors, 1)
	go func() {
		defer atomic.AddInt32(&q.stateProcessors, -1)
		q.deliverNodeData(process, callback)
	}()
	// Return all failed or missing fetches to the queue
	for hash, index := range request.Hashes {
		q.stateTaskQueue.Push(hash, float32(index))
	}
	// If none of the data items were good, it's a stale delivery
	switch {
	case len(errs) == 0:
		return accepted, nil
	case len(errs) == len(request.Hashes):
		return accepted, errStaleDelivery
	default:
		return accepted, fmt.Errorf("multiple failures: %v", errs)
	}
}

// deliverNodeData is the asynchronous node data processor that injects a batch
// of sync results into the state scheduler.
func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) {
	// Wake up WaitResults after the state has been written because it
	// might be waiting for the pivot block state to get completed.
	defer q.active.Signal()

	// Process results one by one to permit task fetches in between
	for i, result := range results {
		q.stateSchedLock.Lock()

		if q.stateScheduler == nil {
			// Syncing aborted since this async delivery started, bail out
			q.stateSchedLock.Unlock()
			callback(errNoFetchesPending, i)
			return
		}
		if _, err := q.stateScheduler.Process([]trie.SyncResult{result}); err != nil {
			// Processing a state result failed, bail out
			q.stateSchedLock.Unlock()
			callback(err, i)
			return
		}
		// Item processing succeeded, release the lock (temporarily)
		q.stateSchedLock.Unlock()
	}
	callback(nil, len(results))
}
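
// deliverNodeData reacquires stateSchedLock for every individual result so that
// ReserveNodeData, whose task generator also needs the scheduler to query
// Missing hashes, can interleave with the write-out instead of waiting for the
// whole batch to be processed.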

// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Prepare the queue for sync results
	if q.resultOffset < offset {
		q.resultOffset = offset
	}
	q.fastSyncPivot = pivot
	q.mode = mode

	// If long running fast sync, also start up a head state retrieval immediately
	if mode == FastSync && pivot > 0 {
		q.stateScheduler = state.NewStateSync(head.Root, q.stateDatabase)
	}
}