downloader.go 61 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package downloader contains the manual full chain synchronisation.
  17. package downloader
  18. import (
  19. "crypto/rand"
  20. "errors"
  21. "fmt"
  22. "math"
  23. "math/big"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/ethereum/go-ethereum/common"
  29. "github.com/ethereum/go-ethereum/core/types"
  30. "github.com/ethereum/go-ethereum/ethdb"
  31. "github.com/ethereum/go-ethereum/event"
  32. "github.com/ethereum/go-ethereum/logger"
  33. "github.com/ethereum/go-ethereum/logger/glog"
  34. "github.com/rcrowley/go-metrics"
  35. )
  36. var (
  37. MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request
  38. MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
  39. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  40. MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request
  41. MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
  42. MaxStateFetch = 384 // Amount of node state values to allow fetching per request
  43. hashTTL = 3 * time.Second // [eth/61] Time it takes for a hash request to time out
  44. blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
  45. blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
  46. headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
  47. bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
  48. bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
  49. receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
  50. receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
  51. stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
  52. stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
  53. maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
  54. maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
  55. maxResultsProcess = 256 // Number of download results to import at once into the chain
  56. fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
  57. fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
  58. fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
  59. fsPivotInterval = 512 // Number of headers out of which to randomize the pivot point
  60. fsMinFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync
  61. )
  62. var (
  63. errBusy = errors.New("busy")
  64. errUnknownPeer = errors.New("peer is unknown or unhealthy")
  65. errBadPeer = errors.New("action from bad peer ignored")
  66. errStallingPeer = errors.New("peer is stalling")
  67. errNoPeers = errors.New("no peers to keep download active")
  68. errTimeout = errors.New("timeout")
  69. errEmptyHashSet = errors.New("empty hash set by peer")
  70. errEmptyHeaderSet = errors.New("empty header set by peer")
  71. errPeersUnavailable = errors.New("no peers available or all tried for download")
  72. errAlreadyInPool = errors.New("hash already in pool")
  73. errInvalidChain = errors.New("retrieved hash chain is invalid")
  74. errInvalidBlock = errors.New("retrieved block is invalid")
  75. errInvalidBody = errors.New("retrieved block body is invalid")
  76. errInvalidReceipt = errors.New("retrieved receipt is invalid")
  77. errCancelHashFetch = errors.New("hash download canceled (requested)")
  78. errCancelBlockFetch = errors.New("block download canceled (requested)")
  79. errCancelHeaderFetch = errors.New("block header download canceled (requested)")
  80. errCancelBodyFetch = errors.New("block body download canceled (requested)")
  81. errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
  82. errCancelStateFetch = errors.New("state data download canceled (requested)")
  83. errCancelProcessing = errors.New("processing canceled (requested)")
  84. errNoSyncActive = errors.New("no sync active")
  85. )
  86. type Downloader struct {
  87. mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
  88. noFast bool // Flag to disable fast syncing in case of a security error
  89. mux *event.TypeMux // Event multiplexer to announce sync operation events
  90. queue *queue // Scheduler for selecting the hashes to download
  91. peers *peerSet // Set of active peers from which download can proceed
  92. interrupt int32 // Atomic boolean to signal termination
  93. // Statistics
  94. syncStatsChainOrigin uint64 // Origin block number where syncing started at
  95. syncStatsChainHeight uint64 // Highest block number known when syncing started
  96. syncStatsStateTotal uint64 // Total number of node state entries known so far
  97. syncStatsStateDone uint64 // Number of state trie entries already pulled
  98. syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
  99. // Callbacks
  100. hasHeader headerCheckFn // Checks if a header is present in the chain
  101. hasBlockAndState blockAndStateCheckFn // Checks if a block and associated state is present in the chain
  102. getHeader headerRetrievalFn // Retrieves a header from the chain
  103. getBlock blockRetrievalFn // Retrieves a block from the chain
  104. headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
  105. headBlock headBlockRetrievalFn // Retrieves the head block from the chain
  106. headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
  107. commitHeadBlock headBlockCommitterFn // Commits a manually assembled block as the chain head
  108. getTd tdRetrievalFn // Retrieves the TD of a block from the chain
  109. insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
  110. insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
  111. insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
  112. rollback chainRollbackFn // Removes a batch of recently added chain links
  113. dropPeer peerDropFn // Drops a peer for misbehaving
  114. // Status
  115. synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
  116. synchronising int32
  117. notified int32
  118. // Channels
  119. newPeerCh chan *peer
  120. hashCh chan dataPack // [eth/61] Channel receiving inbound hashes
  121. blockCh chan dataPack // [eth/61] Channel receiving inbound blocks
  122. headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
  123. bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
  124. receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
  125. stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
  126. blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks
  127. bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
  128. receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
  129. stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
  130. cancelCh chan struct{} // Channel to cancel mid-flight syncs
  131. cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
  132. // Testing hooks
  133. syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
  134. bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
  135. receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
  136. chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
  137. }
  138. // New creates a new downloader to fetch hashes and blocks from remote peers.
  139. func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlockAndState blockAndStateCheckFn,
  140. getHeader headerRetrievalFn, getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn,
  141. headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
  142. insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {
  143. return &Downloader{
  144. mode: FullSync,
  145. mux: mux,
  146. queue: newQueue(stateDb),
  147. peers: newPeerSet(),
  148. hasHeader: hasHeader,
  149. hasBlockAndState: hasBlockAndState,
  150. getHeader: getHeader,
  151. getBlock: getBlock,
  152. headHeader: headHeader,
  153. headBlock: headBlock,
  154. headFastBlock: headFastBlock,
  155. commitHeadBlock: commitHeadBlock,
  156. getTd: getTd,
  157. insertHeaders: insertHeaders,
  158. insertBlocks: insertBlocks,
  159. insertReceipts: insertReceipts,
  160. rollback: rollback,
  161. dropPeer: dropPeer,
  162. newPeerCh: make(chan *peer, 1),
  163. hashCh: make(chan dataPack, 1),
  164. blockCh: make(chan dataPack, 1),
  165. headerCh: make(chan dataPack, 1),
  166. bodyCh: make(chan dataPack, 1),
  167. receiptCh: make(chan dataPack, 1),
  168. stateCh: make(chan dataPack, 1),
  169. blockWakeCh: make(chan bool, 1),
  170. bodyWakeCh: make(chan bool, 1),
  171. receiptWakeCh: make(chan bool, 1),
  172. stateWakeCh: make(chan bool, 1),
  173. }
  174. }
  175. // Progress retrieves the synchronisation boundaries, specifically the origin
  176. // block where synchronisation started at (may have failed/suspended); the block
  177. // or header sync is currently at; and the latest known block which the sync targets.
  178. //
  179. // In addition, during the state download phase of fast synchronisation the number
  180. // of processed and the total number of known states are also returned. Otherwise
  181. // these are zero.
  182. func (d *Downloader) Progress() (uint64, uint64, uint64, uint64, uint64) {
  183. // Fetch the pending state count outside of the lock to prevent unforeseen deadlocks
  184. pendingStates := uint64(d.queue.PendingNodeData())
  185. // Lock the current stats and return the progress
  186. d.syncStatsLock.RLock()
  187. defer d.syncStatsLock.RUnlock()
  188. current := uint64(0)
  189. switch d.mode {
  190. case FullSync:
  191. current = d.headBlock().NumberU64()
  192. case FastSync:
  193. current = d.headFastBlock().NumberU64()
  194. case LightSync:
  195. current = d.headHeader().Number.Uint64()
  196. }
  197. return d.syncStatsChainOrigin, current, d.syncStatsChainHeight, d.syncStatsStateDone, d.syncStatsStateDone + pendingStates
  198. }
  199. // Synchronising returns whether the downloader is currently retrieving blocks.
  200. func (d *Downloader) Synchronising() bool {
  201. return atomic.LoadInt32(&d.synchronising) > 0
  202. }
  203. // RegisterPeer injects a new download peer into the set of block source to be
  204. // used for fetching hashes and blocks from.
  205. func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
  206. getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
  207. getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
  208. getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
  209. glog.V(logger.Detail).Infoln("Registering peer", id)
  210. if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
  211. glog.V(logger.Error).Infoln("Register failed:", err)
  212. return err
  213. }
  214. return nil
  215. }
  216. // UnregisterPeer remove a peer from the known list, preventing any action from
  217. // the specified peer. An effort is also made to return any pending fetches into
  218. // the queue.
  219. func (d *Downloader) UnregisterPeer(id string) error {
  220. glog.V(logger.Detail).Infoln("Unregistering peer", id)
  221. if err := d.peers.Unregister(id); err != nil {
  222. glog.V(logger.Error).Infoln("Unregister failed:", err)
  223. return err
  224. }
  225. d.queue.Revoke(id)
  226. return nil
  227. }
  228. // Synchronise tries to sync up our local block chain with a remote peer, both
  229. // adding various sanity checks as well as wrapping it with various log entries.
  230. func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
  231. glog.V(logger.Detail).Infof("Attempting synchronisation: %v, head [%x…], TD %v", id, head[:4], td)
  232. err := d.synchronise(id, head, td, mode)
  233. switch err {
  234. case nil:
  235. glog.V(logger.Detail).Infof("Synchronisation completed")
  236. case errBusy:
  237. glog.V(logger.Detail).Infof("Synchronisation already in progress")
  238. case errTimeout, errBadPeer, errStallingPeer, errEmptyHashSet, errEmptyHeaderSet, errPeersUnavailable, errInvalidChain:
  239. glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
  240. d.dropPeer(id)
  241. default:
  242. glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
  243. }
  244. return err
  245. }
  246. // synchronise will select the peer and use it for synchronising. If an empty string is given
  247. // it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
  248. // checks fail an error will be returned. This method is synchronous
  249. func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
  250. // Mock out the synchronisation if testing
  251. if d.synchroniseMock != nil {
  252. return d.synchroniseMock(id, hash)
  253. }
  254. // Make sure only one goroutine is ever allowed past this point at once
  255. if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
  256. return errBusy
  257. }
  258. defer atomic.StoreInt32(&d.synchronising, 0)
  259. // Post a user notification of the sync (only once per session)
  260. if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
  261. glog.V(logger.Info).Infoln("Block synchronisation started")
  262. }
  263. // Reset the queue, peer set and wake channels to clean any internal leftover state
  264. d.queue.Reset()
  265. d.peers.Reset()
  266. for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  267. select {
  268. case <-ch:
  269. default:
  270. }
  271. }
  272. // Reset any ephemeral sync statistics
  273. d.syncStatsLock.Lock()
  274. d.syncStatsStateTotal = 0
  275. d.syncStatsStateDone = 0
  276. d.syncStatsLock.Unlock()
  277. // Create cancel channel for aborting mid-flight
  278. d.cancelLock.Lock()
  279. d.cancelCh = make(chan struct{})
  280. d.cancelLock.Unlock()
  281. // Set the requested sync mode, unless it's forbidden
  282. d.mode = mode
  283. if d.mode == FastSync && d.noFast {
  284. d.mode = FullSync
  285. }
  286. // Retrieve the origin peer and initiate the downloading process
  287. p := d.peers.Peer(id)
  288. if p == nil {
  289. return errUnknownPeer
  290. }
  291. return d.syncWithPeer(p, hash, td)
  292. }
  293. // syncWithPeer starts a block synchronization based on the hash chain from the
  294. // specified peer and head hash.
  295. func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
  296. d.mux.Post(StartEvent{})
  297. defer func() {
  298. // reset on error
  299. if err != nil {
  300. d.mux.Post(FailedEvent{err})
  301. } else {
  302. d.mux.Post(DoneEvent{})
  303. }
  304. }()
  305. glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version)
  306. defer func(start time.Time) {
  307. glog.V(logger.Debug).Infof("Synchronisation terminated after %v", time.Since(start))
  308. }(time.Now())
  309. switch {
  310. case p.version == 61:
  311. // Look up the sync boundaries: the common ancestor and the target block
  312. latest, err := d.fetchHeight61(p)
  313. if err != nil {
  314. return err
  315. }
  316. origin, err := d.findAncestor61(p)
  317. if err != nil {
  318. return err
  319. }
  320. d.syncStatsLock.Lock()
  321. if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
  322. d.syncStatsChainOrigin = origin
  323. }
  324. d.syncStatsChainHeight = latest
  325. d.syncStatsLock.Unlock()
  326. // Initiate the sync using a concurrent hash and block retrieval algorithm
  327. d.queue.Prepare(origin+1, d.mode, 0)
  328. if d.syncInitHook != nil {
  329. d.syncInitHook(origin, latest)
  330. }
  331. return d.spawnSync(
  332. func() error { return d.fetchHashes61(p, td, origin+1) },
  333. func() error { return d.fetchBlocks61(origin + 1) },
  334. )
  335. case p.version >= 62:
  336. // Look up the sync boundaries: the common ancestor and the target block
  337. latest, err := d.fetchHeight(p)
  338. if err != nil {
  339. return err
  340. }
  341. origin, err := d.findAncestor(p)
  342. if err != nil {
  343. return err
  344. }
  345. d.syncStatsLock.Lock()
  346. if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
  347. d.syncStatsChainOrigin = origin
  348. }
  349. d.syncStatsChainHeight = latest
  350. d.syncStatsLock.Unlock()
  351. // Initiate the sync using a concurrent header and content retrieval algorithm
  352. pivot := uint64(0)
  353. switch d.mode {
  354. case LightSync:
  355. pivot = latest
  356. case FastSync:
  357. // Calculate the new fast/slow sync pivot point
  358. pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
  359. if err != nil {
  360. panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
  361. }
  362. if latest > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
  363. pivot = latest - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
  364. }
  365. // If the point is below the origin, move origin back to ensure state download
  366. if pivot < origin {
  367. if pivot > 0 {
  368. origin = pivot - 1
  369. } else {
  370. origin = 0
  371. }
  372. }
  373. glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
  374. }
  375. d.queue.Prepare(origin+1, d.mode, pivot)
  376. if d.syncInitHook != nil {
  377. d.syncInitHook(origin, latest)
  378. }
  379. return d.spawnSync(
  380. func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved
  381. func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
  382. func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
  383. func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
  384. )
  385. default:
  386. // Something very wrong, stop right here
  387. glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
  388. return errBadPeer
  389. }
  390. }
  391. // spawnSync runs d.process and all given fetcher functions to completion in
  392. // separate goroutines, returning the first error that appears.
  393. func (d *Downloader) spawnSync(fetchers ...func() error) error {
  394. var wg sync.WaitGroup
  395. errc := make(chan error, len(fetchers)+1)
  396. wg.Add(len(fetchers) + 1)
  397. go func() { defer wg.Done(); errc <- d.process() }()
  398. for _, fn := range fetchers {
  399. fn := fn
  400. go func() { defer wg.Done(); errc <- fn() }()
  401. }
  402. // Wait for the first error, then terminate the others.
  403. var err error
  404. for i := 0; i < len(fetchers)+1; i++ {
  405. if i == len(fetchers) {
  406. // Close the queue when all fetchers have exited.
  407. // This will cause the block processor to end when
  408. // it has processed the queue.
  409. d.queue.Close()
  410. }
  411. if err = <-errc; err != nil {
  412. break
  413. }
  414. }
  415. d.queue.Close()
  416. d.cancel()
  417. wg.Wait()
  418. return err
  419. }
  420. // cancel cancels all of the operations and resets the queue. It returns true
  421. // if the cancel operation was completed.
  422. func (d *Downloader) cancel() {
  423. // Close the current cancel channel
  424. d.cancelLock.Lock()
  425. if d.cancelCh != nil {
  426. select {
  427. case <-d.cancelCh:
  428. // Channel was already closed
  429. default:
  430. close(d.cancelCh)
  431. }
  432. }
  433. d.cancelLock.Unlock()
  434. }
  435. // Terminate interrupts the downloader, canceling all pending operations.
  436. // The downloader cannot be reused after calling Terminate.
  437. func (d *Downloader) Terminate() {
  438. atomic.StoreInt32(&d.interrupt, 1)
  439. d.cancel()
  440. }
  441. // fetchHeight61 retrieves the head block of the remote peer to aid in estimating
  442. // the total time a pending synchronisation would take.
  443. func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
  444. glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
  445. // Request the advertised remote head block and wait for the response
  446. go p.getBlocks([]common.Hash{p.head})
  447. timeout := time.After(hashTTL)
  448. for {
  449. select {
  450. case <-d.cancelCh:
  451. return 0, errCancelBlockFetch
  452. case packet := <-d.blockCh:
  453. // Discard anything not from the origin peer
  454. if packet.PeerId() != p.id {
  455. glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", packet.PeerId())
  456. break
  457. }
  458. // Make sure the peer actually gave something valid
  459. blocks := packet.(*blockPack).blocks
  460. if len(blocks) != 1 {
  461. glog.V(logger.Debug).Infof("%v: invalid number of head blocks: %d != 1", p, len(blocks))
  462. return 0, errBadPeer
  463. }
  464. return blocks[0].NumberU64(), nil
  465. case <-timeout:
  466. glog.V(logger.Debug).Infof("%v: head block timeout", p)
  467. return 0, errTimeout
  468. case <-d.hashCh:
  469. // Out of bounds hashes received, ignore them
  470. case <-d.headerCh:
  471. case <-d.bodyCh:
  472. case <-d.stateCh:
  473. case <-d.receiptCh:
  474. // Ignore eth/{62,63} packets because this is eth/61.
  475. // These can arrive as a late delivery from a previous sync.
  476. }
  477. }
  478. }
  479. // findAncestor61 tries to locate the common ancestor block of the local chain and
  480. // a remote peers blockchain. In the general case when our node was in sync and
  481. // on the correct chain, checking the top N blocks should already get us a match.
  482. // In the rare scenario when we ended up on a long reorganisation (i.e. none of
  483. // the head blocks match), we do a binary search to find the common ancestor.
  484. func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
  485. glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)
  486. // Request out head blocks to short circuit ancestor location
  487. head := d.headBlock().NumberU64()
  488. from := int64(head) - int64(MaxHashFetch) + 1
  489. if from < 0 {
  490. from = 0
  491. }
  492. go p.getAbsHashes(uint64(from), MaxHashFetch)
  493. // Wait for the remote response to the head fetch
  494. number, hash := uint64(0), common.Hash{}
  495. timeout := time.After(hashTTL)
  496. for finished := false; !finished; {
  497. select {
  498. case <-d.cancelCh:
  499. return 0, errCancelHashFetch
  500. case packet := <-d.hashCh:
  501. // Discard anything not from the origin peer
  502. if packet.PeerId() != p.id {
  503. glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
  504. break
  505. }
  506. // Make sure the peer actually gave something valid
  507. hashes := packet.(*hashPack).hashes
  508. if len(hashes) == 0 {
  509. glog.V(logger.Debug).Infof("%v: empty head hash set", p)
  510. return 0, errEmptyHashSet
  511. }
  512. // Check if a common ancestor was found
  513. finished = true
  514. for i := len(hashes) - 1; i >= 0; i-- {
  515. // Skip any headers that underflow/overflow our requested set
  516. header := d.getHeader(hashes[i])
  517. if header == nil || header.Number.Int64() < from || header.Number.Uint64() > head {
  518. continue
  519. }
  520. // Otherwise check if we already know the header or not
  521. if d.hasBlockAndState(hashes[i]) {
  522. number, hash = header.Number.Uint64(), header.Hash()
  523. break
  524. }
  525. }
  526. case <-timeout:
  527. glog.V(logger.Debug).Infof("%v: head hash timeout", p)
  528. return 0, errTimeout
  529. case <-d.blockCh:
  530. // Out of bounds blocks received, ignore them
  531. case <-d.headerCh:
  532. case <-d.bodyCh:
  533. case <-d.stateCh:
  534. case <-d.receiptCh:
  535. // Ignore eth/{62,63} packets because this is eth/61.
  536. // These can arrive as a late delivery from a previous sync.
  537. }
  538. }
  539. // If the head fetch already found an ancestor, return
  540. if !common.EmptyHash(hash) {
  541. glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4])
  542. return number, nil
  543. }
  544. // Ancestor not found, we need to binary search over our chain
  545. start, end := uint64(0), head
  546. for start+1 < end {
  547. // Split our chain interval in two, and request the hash to cross check
  548. check := (start + end) / 2
  549. timeout := time.After(hashTTL)
  550. go p.getAbsHashes(uint64(check), 1)
  551. // Wait until a reply arrives to this request
  552. for arrived := false; !arrived; {
  553. select {
  554. case <-d.cancelCh:
  555. return 0, errCancelHashFetch
  556. case packet := <-d.hashCh:
  557. // Discard anything not from the origin peer
  558. if packet.PeerId() != p.id {
  559. glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
  560. break
  561. }
  562. // Make sure the peer actually gave something valid
  563. hashes := packet.(*hashPack).hashes
  564. if len(hashes) != 1 {
  565. glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes))
  566. return 0, errBadPeer
  567. }
  568. arrived = true
  569. // Modify the search interval based on the response
  570. if !d.hasBlockAndState(hashes[0]) {
  571. end = check
  572. break
  573. }
  574. block := d.getBlock(hashes[0]) // this doesn't check state, hence the above explicit check
  575. if block.NumberU64() != check {
  576. glog.V(logger.Debug).Infof("%v: non requested hash #%d [%x…], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check)
  577. return 0, errBadPeer
  578. }
  579. start = check
  580. case <-timeout:
  581. glog.V(logger.Debug).Infof("%v: search hash timeout", p)
  582. return 0, errTimeout
  583. case <-d.blockCh:
  584. // Out of bounds blocks received, ignore them
  585. case <-d.headerCh:
  586. case <-d.bodyCh:
  587. case <-d.stateCh:
  588. case <-d.receiptCh:
  589. // Ignore eth/{62,63} packets because this is eth/61.
  590. // These can arrive as a late delivery from a previous sync.
  591. }
  592. }
  593. }
  594. return start, nil
  595. }
  596. // fetchHashes61 keeps retrieving hashes from the requested number, until no more
  597. // are returned, potentially throttling on the way.
  598. func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
  599. glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from)
  600. // Create a timeout timer, and the associated hash fetcher
  601. request := time.Now() // time of the last fetch request
  602. timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
  603. <-timeout.C // timeout channel should be initially empty
  604. defer timeout.Stop()
  605. getHashes := func(from uint64) {
  606. glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from)
  607. go p.getAbsHashes(from, MaxHashFetch)
  608. request = time.Now()
  609. timeout.Reset(hashTTL)
  610. }
  611. // Start pulling hashes, until all are exhausted
  612. getHashes(from)
  613. gotHashes := false
  614. for {
  615. select {
  616. case <-d.cancelCh:
  617. return errCancelHashFetch
  618. case packet := <-d.hashCh:
  619. // Make sure the active peer is giving us the hashes
  620. if packet.PeerId() != p.id {
  621. glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
  622. break
  623. }
  624. hashReqTimer.UpdateSince(request)
  625. timeout.Stop()
  626. // If no more hashes are inbound, notify the block fetcher and return
  627. if packet.Items() == 0 {
  628. glog.V(logger.Debug).Infof("%v: no available hashes", p)
  629. select {
  630. case d.blockWakeCh <- false:
  631. case <-d.cancelCh:
  632. }
  633. // If no hashes were retrieved at all, the peer violated it's TD promise that it had a
  634. // better chain compared to ours. The only exception is if it's promised blocks were
  635. // already imported by other means (e.g. fetcher):
  636. //
  637. // R <remote peer>, L <local node>: Both at block 10
  638. // R: Mine block 11, and propagate it to L
  639. // L: Queue block 11 for import
  640. // L: Notice that R's head and TD increased compared to ours, start sync
  641. // L: Import of block 11 finishes
  642. // L: Sync begins, and finds common ancestor at 11
  643. // L: Request new hashes up from 11 (R's TD was higher, it must have something)
  644. // R: Nothing to give
  645. if !gotHashes && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
  646. return errStallingPeer
  647. }
  648. return nil
  649. }
  650. gotHashes = true
  651. hashes := packet.(*hashPack).hashes
  652. // Otherwise insert all the new hashes, aborting in case of junk
  653. glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashes), from)
  654. inserts := d.queue.Schedule61(hashes, true)
  655. if len(inserts) != len(hashes) {
  656. glog.V(logger.Debug).Infof("%v: stale hashes", p)
  657. return errBadPeer
  658. }
  659. // Notify the block fetcher of new hashes, but stop if queue is full
  660. if d.queue.PendingBlocks() < maxQueuedHashes {
  661. // We still have hashes to fetch, send continuation wake signal (potential)
  662. select {
  663. case d.blockWakeCh <- true:
  664. default:
  665. }
  666. } else {
  667. // Hash limit reached, send a termination wake signal (enforced)
  668. select {
  669. case d.blockWakeCh <- false:
  670. case <-d.cancelCh:
  671. }
  672. return nil
  673. }
  674. // Queue not yet full, fetch the next batch
  675. from += uint64(len(hashes))
  676. getHashes(from)
  677. case <-timeout.C:
  678. glog.V(logger.Debug).Infof("%v: hash request timed out", p)
  679. hashTimeoutMeter.Mark(1)
  680. return errTimeout
  681. case <-d.headerCh:
  682. case <-d.bodyCh:
  683. case <-d.stateCh:
  684. case <-d.receiptCh:
  685. // Ignore eth/{62,63} packets because this is eth/61.
  686. // These can arrive as a late delivery from a previous sync.
  687. }
  688. }
  689. }
  690. // fetchBlocks61 iteratively downloads the scheduled hashes, taking any available
  691. // peers, reserving a chunk of blocks for each, waiting for delivery and also
  692. // periodically checking for timeouts.
  693. func (d *Downloader) fetchBlocks61(from uint64) error {
  694. glog.V(logger.Debug).Infof("Downloading blocks from #%d", from)
  695. defer glog.V(logger.Debug).Infof("Block download terminated")
  696. // Create a timeout timer for scheduling expiration tasks
  697. ticker := time.NewTicker(100 * time.Millisecond)
  698. defer ticker.Stop()
  699. update := make(chan struct{}, 1)
  700. // Fetch blocks until the hash fetcher's done
  701. finished := false
  702. for {
  703. select {
  704. case <-d.cancelCh:
  705. return errCancelBlockFetch
  706. case packet := <-d.blockCh:
  707. // If the peer was previously banned and failed to deliver it's pack
  708. // in a reasonable time frame, ignore it's message.
  709. if peer := d.peers.Peer(packet.PeerId()); peer != nil {
  710. blocks := packet.(*blockPack).blocks
  711. // Deliver the received chunk of blocks and check chain validity
  712. accepted, err := d.queue.DeliverBlocks(peer.id, blocks)
  713. if err == errInvalidChain {
  714. return err
  715. }
  716. // Unless a peer delivered something completely else than requested (usually
  717. // caused by a timed out request which came through in the end), set it to
  718. // idle. If the delivery's stale, the peer should have already been idled.
  719. if err != errStaleDelivery {
  720. peer.SetBlocksIdle(accepted)
  721. }
  722. // Issue a log to the user to see what's going on
  723. switch {
  724. case err == nil && len(blocks) == 0:
  725. glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
  726. case err == nil:
  727. glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
  728. default:
  729. glog.V(logger.Detail).Infof("%s: delivery failed: %v", peer, err)
  730. }
  731. }
  732. // Blocks arrived, try to update the progress
  733. select {
  734. case update <- struct{}{}:
  735. default:
  736. }
  737. case cont := <-d.blockWakeCh:
  738. // The hash fetcher sent a continuation flag, check if it's done
  739. if !cont {
  740. finished = true
  741. }
  742. // Hashes arrive, try to update the progress
  743. select {
  744. case update <- struct{}{}:
  745. default:
  746. }
  747. case <-ticker.C:
  748. // Sanity check update the progress
  749. select {
  750. case update <- struct{}{}:
  751. default:
  752. }
  753. case <-update:
  754. // Short circuit if we lost all our peers
  755. if d.peers.Len() == 0 {
  756. return errNoPeers
  757. }
  758. // Check for block request timeouts and demote the responsible peers
  759. for pid, fails := range d.queue.ExpireBlocks(blockTTL) {
  760. if peer := d.peers.Peer(pid); peer != nil {
  761. if fails > 1 {
  762. glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
  763. peer.SetBlocksIdle(0)
  764. } else {
  765. glog.V(logger.Debug).Infof("%s: stalling block delivery, dropping", peer)
  766. d.dropPeer(pid)
  767. }
  768. }
  769. }
  770. // If there's nothing more to fetch, wait or terminate
  771. if d.queue.PendingBlocks() == 0 {
  772. if !d.queue.InFlightBlocks() && finished {
  773. glog.V(logger.Debug).Infof("Block fetching completed")
  774. return nil
  775. }
  776. break
  777. }
  778. // Send a download request to all idle peers, until throttled
  779. throttled := false
  780. idles, total := d.peers.BlockIdlePeers()
  781. for _, peer := range idles {
  782. // Short circuit if throttling activated
  783. if d.queue.ShouldThrottleBlocks() {
  784. throttled = true
  785. break
  786. }
  787. // Reserve a chunk of hashes for a peer. A nil can mean either that
  788. // no more hashes are available, or that the peer is known not to
  789. // have them.
  790. request := d.queue.ReserveBlocks(peer, peer.BlockCapacity())
  791. if request == nil {
  792. continue
  793. }
  794. if glog.V(logger.Detail) {
  795. glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
  796. }
  797. // Fetch the chunk and make sure any errors return the hashes to the queue
  798. if err := peer.Fetch61(request); err != nil {
  799. // Although we could try and make an attempt to fix this, this error really
  800. // means that we've double allocated a fetch task to a peer. If that is the
  801. // case, the internal state of the downloader and the queue is very wrong so
  802. // better hard crash and note the error instead of silently accumulating into
  803. // a much bigger issue.
  804. panic(fmt.Sprintf("%v: fetch assignment failed", peer))
  805. }
  806. }
  807. // Make sure that we have peers available for fetching. If all peers have been tried
  808. // and all failed throw an error
  809. if !throttled && !d.queue.InFlightBlocks() && len(idles) == total {
  810. return errPeersUnavailable
  811. }
  812. case <-d.headerCh:
  813. case <-d.bodyCh:
  814. case <-d.stateCh:
  815. case <-d.receiptCh:
  816. // Ignore eth/{62,63} packets because this is eth/61.
  817. // These can arrive as a late delivery from a previous sync.
  818. }
  819. }
  820. }
  821. // fetchHeight retrieves the head header of the remote peer to aid in estimating
  822. // the total time a pending synchronisation would take.
  823. func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
  824. glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
  825. // Request the advertised remote head block and wait for the response
  826. go p.getRelHeaders(p.head, 1, 0, false)
  827. timeout := time.After(headerTTL)
  828. for {
  829. select {
  830. case <-d.cancelCh:
  831. return 0, errCancelBlockFetch
  832. case packet := <-d.headerCh:
  833. // Discard anything not from the origin peer
  834. if packet.PeerId() != p.id {
  835. glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
  836. break
  837. }
  838. // Make sure the peer actually gave something valid
  839. headers := packet.(*headerPack).headers
  840. if len(headers) != 1 {
  841. glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
  842. return 0, errBadPeer
  843. }
  844. return headers[0].Number.Uint64(), nil
  845. case <-timeout:
  846. glog.V(logger.Debug).Infof("%v: head header timeout", p)
  847. return 0, errTimeout
  848. case <-d.bodyCh:
  849. case <-d.stateCh:
  850. case <-d.receiptCh:
  851. // Out of bounds delivery, ignore
  852. case <-d.hashCh:
  853. case <-d.blockCh:
  854. // Ignore eth/61 packets because this is eth/62+.
  855. // These can arrive as a late delivery from a previous sync.
  856. }
  857. }
  858. }
  859. // findAncestor tries to locate the common ancestor link of the local chain and
  860. // a remote peers blockchain. In the general case when our node was in sync and
  861. // on the correct chain, checking the top N links should already get us a match.
  862. // In the rare scenario when we ended up on a long reorganisation (i.e. none of
  863. // the head links match), we do a binary search to find the common ancestor.
  864. func (d *Downloader) findAncestor(p *peer) (uint64, error) {
  865. glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)
  866. // Request our head headers to short circuit ancestor location
  867. head := d.headHeader().Number.Uint64()
  868. if d.mode == FullSync {
  869. head = d.headBlock().NumberU64()
  870. } else if d.mode == FastSync {
  871. head = d.headFastBlock().NumberU64()
  872. }
  873. from := int64(head) - int64(MaxHeaderFetch) + 1
  874. if from < 0 {
  875. from = 0
  876. }
  877. go p.getAbsHeaders(uint64(from), MaxHeaderFetch, 0, false)
  878. // Wait for the remote response to the head fetch
  879. number, hash := uint64(0), common.Hash{}
  880. timeout := time.After(hashTTL)
  881. for finished := false; !finished; {
  882. select {
  883. case <-d.cancelCh:
  884. return 0, errCancelHashFetch
  885. case packet := <-d.headerCh:
  886. // Discard anything not from the origin peer
  887. if packet.PeerId() != p.id {
  888. glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
  889. break
  890. }
  891. // Make sure the peer actually gave something valid
  892. headers := packet.(*headerPack).headers
  893. if len(headers) == 0 {
  894. glog.V(logger.Warn).Infof("%v: empty head header set", p)
  895. return 0, errEmptyHeaderSet
  896. }
  897. // Make sure the peer's reply conforms to the request
  898. for i := 0; i < len(headers); i++ {
  899. if number := headers[i].Number.Int64(); number != from+int64(i) {
  900. glog.V(logger.Warn).Infof("%v: head header set (item %d) broke chain ordering: requested %d, got %d", p, i, from+int64(i), number)
  901. return 0, errInvalidChain
  902. }
  903. if i > 0 && headers[i-1].Hash() != headers[i].ParentHash {
  904. glog.V(logger.Warn).Infof("%v: head header set (item %d) broke chain ancestry: expected [%x], got [%x]", p, i, headers[i-1].Hash().Bytes()[:4], headers[i].ParentHash[:4])
  905. return 0, errInvalidChain
  906. }
  907. }
  908. // Check if a common ancestor was found
  909. finished = true
  910. for i := len(headers) - 1; i >= 0; i-- {
  911. // Skip any headers that underflow/overflow our requested set
  912. if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > head {
  913. continue
  914. }
  915. // Otherwise check if we already know the header or not
  916. if (d.mode != LightSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode == LightSync && d.hasHeader(headers[i].Hash())) {
  917. number, hash = headers[i].Number.Uint64(), headers[i].Hash()
  918. break
  919. }
  920. }
  921. case <-timeout:
  922. glog.V(logger.Debug).Infof("%v: head header timeout", p)
  923. return 0, errTimeout
  924. case <-d.bodyCh:
  925. case <-d.stateCh:
  926. case <-d.receiptCh:
  927. // Out of bounds delivery, ignore
  928. case <-d.hashCh:
  929. case <-d.blockCh:
  930. // Ignore eth/61 packets because this is eth/62+.
  931. // These can arrive as a late delivery from a previous sync.
  932. }
  933. }
  934. // If the head fetch already found an ancestor, return
  935. if !common.EmptyHash(hash) {
  936. glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4])
  937. return number, nil
  938. }
  939. // Ancestor not found, we need to binary search over our chain
  940. start, end := uint64(0), head
  941. for start+1 < end {
  942. // Split our chain interval in two, and request the hash to cross check
  943. check := (start + end) / 2
  944. timeout := time.After(hashTTL)
  945. go p.getAbsHeaders(uint64(check), 1, 0, false)
  946. // Wait until a reply arrives to this request
  947. for arrived := false; !arrived; {
  948. select {
  949. case <-d.cancelCh:
  950. return 0, errCancelHashFetch
  951. case packer := <-d.headerCh:
  952. // Discard anything not from the origin peer
  953. if packer.PeerId() != p.id {
  954. glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packer.PeerId())
  955. break
  956. }
  957. // Make sure the peer actually gave something valid
  958. headers := packer.(*headerPack).headers
  959. if len(headers) != 1 {
  960. glog.V(logger.Debug).Infof("%v: invalid search header set (%d)", p, len(headers))
  961. return 0, errBadPeer
  962. }
  963. arrived = true
  964. // Modify the search interval based on the response
  965. if (d.mode == FullSync && !d.hasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) {
  966. end = check
  967. break
  968. }
  969. header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
  970. if header.Number.Uint64() != check {
  971. glog.V(logger.Debug).Infof("%v: non requested header #%d [%x…], instead of #%d", p, header.Number, header.Hash().Bytes()[:4], check)
  972. return 0, errBadPeer
  973. }
  974. start = check
  975. case <-timeout:
  976. glog.V(logger.Debug).Infof("%v: search header timeout", p)
  977. return 0, errTimeout
  978. case <-d.bodyCh:
  979. case <-d.stateCh:
  980. case <-d.receiptCh:
  981. // Out of bounds delivery, ignore
  982. case <-d.hashCh:
  983. case <-d.blockCh:
  984. // Ignore eth/61 packets because this is eth/62+.
  985. // These can arrive as a late delivery from a previous sync.
  986. }
  987. }
  988. }
  989. return start, nil
  990. }
  991. // fetchHeaders keeps retrieving headers from the requested number, until no more
  992. // are returned, potentially throttling on the way.
  993. //
  994. // The queue parameter can be used to switch between queuing headers for block
  995. // body download too, or directly import as pure header chains.
  996. func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
  997. glog.V(logger.Debug).Infof("%v: downloading headers from #%d", p, from)
  998. defer glog.V(logger.Debug).Infof("%v: header download terminated", p)
  999. // Calculate the pivoting point for switching from fast to slow sync
  1000. pivot := d.queue.FastSyncPivot()
  1001. // Keep a count of uncertain headers to roll back
  1002. rollback := []*types.Header{}
  1003. defer func() {
  1004. if len(rollback) > 0 {
  1005. // Flatten the headers and roll them back
  1006. hashes := make([]common.Hash, len(rollback))
  1007. for i, header := range rollback {
  1008. hashes[i] = header.Hash()
  1009. }
  1010. lh, lfb, lb := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number()
  1011. d.rollback(hashes)
  1012. glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
  1013. len(hashes), lh, d.headHeader().Number, lfb, d.headFastBlock().Number(), lb, d.headBlock().Number())
  1014. // If we're already past the pivot point, this could be an attack, disable fast sync
  1015. if rollback[len(rollback)-1].Number.Uint64() > pivot {
  1016. d.noFast = true
  1017. }
  1018. }
  1019. }()
  1020. // Create a timeout timer, and the associated hash fetcher
  1021. request := time.Now() // time of the last fetch request
  1022. timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
  1023. <-timeout.C // timeout channel should be initially empty
  1024. defer timeout.Stop()
  1025. getHeaders := func(from uint64) {
  1026. glog.V(logger.Detail).Infof("%v: fetching %d headers from #%d", p, MaxHeaderFetch, from)
  1027. go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
  1028. request = time.Now()
  1029. timeout.Reset(headerTTL)
  1030. }
  1031. // Start pulling headers, until all are exhausted
  1032. getHeaders(from)
  1033. gotHeaders := false
  1034. for {
  1035. select {
  1036. case <-d.cancelCh:
  1037. return errCancelHeaderFetch
  1038. case packet := <-d.headerCh:
  1039. // Make sure the active peer is giving us the headers
  1040. if packet.PeerId() != p.id {
  1041. glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", packet.PeerId())
  1042. break
  1043. }
  1044. headerReqTimer.UpdateSince(request)
  1045. timeout.Stop()
  1046. // If no more headers are inbound, notify the content fetchers and return
  1047. if packet.Items() == 0 {
  1048. glog.V(logger.Debug).Infof("%v: no available headers", p)
  1049. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  1050. select {
  1051. case ch <- false:
  1052. case <-d.cancelCh:
  1053. }
  1054. }
  1055. // If no headers were retrieved at all, the peer violated it's TD promise that it had a
  1056. // better chain compared to ours. The only exception is if it's promised blocks were
  1057. // already imported by other means (e.g. fetcher):
  1058. //
  1059. // R <remote peer>, L <local node>: Both at block 10
  1060. // R: Mine block 11, and propagate it to L
  1061. // L: Queue block 11 for import
  1062. // L: Notice that R's head and TD increased compared to ours, start sync
  1063. // L: Import of block 11 finishes
  1064. // L: Sync begins, and finds common ancestor at 11
  1065. // L: Request new headers up from 11 (R's TD was higher, it must have something)
  1066. // R: Nothing to give
  1067. if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
  1068. return errStallingPeer
  1069. }
  1070. // If fast or light syncing, ensure promised headers are indeed delivered. This is
  1071. // needed to detect scenarios where an attacker feeds a bad pivot and then bails out
  1072. // of delivering the post-pivot blocks that would flag the invalid content.
  1073. //
  1074. // This check cannot be executed "as is" for full imports, since blocks may still be
  1075. // queued for processing when the header download completes. However, as long as the
  1076. // peer gave us something useful, we're already happy/progressed (above check).
  1077. if d.mode == FastSync || d.mode == LightSync {
  1078. if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
  1079. return errStallingPeer
  1080. }
  1081. }
  1082. rollback = nil
  1083. return nil
  1084. }
  1085. gotHeaders = true
  1086. headers := packet.(*headerPack).headers
  1087. // Otherwise insert all the new headers, aborting in case of junk
  1088. glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
  1089. if d.mode == FastSync || d.mode == LightSync {
  1090. // Collect the yet unknown headers to mark them as uncertain
  1091. unknown := make([]*types.Header, 0, len(headers))
  1092. for _, header := range headers {
  1093. if !d.hasHeader(header.Hash()) {
  1094. unknown = append(unknown, header)
  1095. }
  1096. }
  1097. // If we're importing pure headers, verify based on their recentness
  1098. frequency := fsHeaderCheckFrequency
  1099. if headers[len(headers)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
  1100. frequency = 1
  1101. }
  1102. if n, err := d.insertHeaders(headers, frequency); err != nil {
  1103. // If some headers were inserted, add them too to the rollback list
  1104. if n > 0 {
  1105. rollback = append(rollback, headers[:n]...)
  1106. }
  1107. glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err)
  1108. return errInvalidChain
  1109. }
  1110. // All verifications passed, store newly found uncertain headers
  1111. rollback = append(rollback, unknown...)
  1112. if len(rollback) > fsHeaderSafetyNet {
  1113. rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
  1114. }
  1115. }
  1116. if d.mode == FullSync || d.mode == FastSync {
  1117. inserts := d.queue.Schedule(headers, from)
  1118. if len(inserts) != len(headers) {
  1119. glog.V(logger.Debug).Infof("%v: stale headers", p)
  1120. return errBadPeer
  1121. }
  1122. }
  1123. // Notify the content fetchers of new headers, but stop if queue is full
  1124. cont := d.queue.PendingBlocks() < maxQueuedHeaders && d.queue.PendingReceipts() < maxQueuedHeaders
  1125. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  1126. if cont {
  1127. // We still have headers to fetch, send continuation wake signal (potential)
  1128. select {
  1129. case ch <- true:
  1130. default:
  1131. }
  1132. } else {
  1133. // Header limit reached, send a termination wake signal (enforced)
  1134. select {
  1135. case ch <- false:
  1136. case <-d.cancelCh:
  1137. }
  1138. }
  1139. }
  1140. if !cont {
  1141. return nil
  1142. }
  1143. // Queue not yet full, fetch the next batch
  1144. from += uint64(len(headers))
  1145. getHeaders(from)
  1146. case <-timeout.C:
  1147. // Header retrieval timed out, consider the peer bad and drop
  1148. glog.V(logger.Debug).Infof("%v: header request timed out", p)
  1149. headerTimeoutMeter.Mark(1)
  1150. d.dropPeer(p.id)
  1151. // Finish the sync gracefully instead of dumping the gathered data though
  1152. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  1153. select {
  1154. case ch <- false:
  1155. case <-d.cancelCh:
  1156. }
  1157. }
  1158. return nil
  1159. case <-d.hashCh:
  1160. case <-d.blockCh:
  1161. // Ignore eth/61 packets because this is eth/62+.
  1162. // These can arrive as a late delivery from a previous sync.
  1163. }
  1164. }
  1165. }
  1166. // fetchBodies iteratively downloads the scheduled block bodies, taking any
  1167. // available peers, reserving a chunk of blocks for each, waiting for delivery
  1168. // and also periodically checking for timeouts.
  1169. func (d *Downloader) fetchBodies(from uint64) error {
  1170. glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
  1171. var (
  1172. deliver = func(packet dataPack) (int, error) {
  1173. pack := packet.(*bodyPack)
  1174. return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
  1175. }
  1176. expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) }
  1177. fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
  1178. capacity = func(p *peer) int { return p.BlockCapacity() }
  1179. setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
  1180. )
  1181. err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
  1182. d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
  1183. d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "Body")
  1184. glog.V(logger.Debug).Infof("Block body download terminated: %v", err)
  1185. return err
  1186. }
  1187. // fetchReceipts iteratively downloads the scheduled block receipts, taking any
  1188. // available peers, reserving a chunk of receipts for each, waiting for delivery
  1189. // and also periodically checking for timeouts.
  1190. func (d *Downloader) fetchReceipts(from uint64) error {
  1191. glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
  1192. var (
  1193. deliver = func(packet dataPack) (int, error) {
  1194. pack := packet.(*receiptPack)
  1195. return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
  1196. }
  1197. expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) }
  1198. fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
  1199. capacity = func(p *peer) int { return p.ReceiptCapacity() }
  1200. setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
  1201. )
  1202. err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
  1203. d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
  1204. d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt")
  1205. glog.V(logger.Debug).Infof("Receipt download terminated: %v", err)
  1206. return err
  1207. }
  1208. // fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
  1209. // available peers, reserving a chunk of nodes for each, waiting for delivery and
  1210. // also periodically checking for timeouts.
  1211. func (d *Downloader) fetchNodeData() error {
  1212. glog.V(logger.Debug).Infof("Downloading node state data")
  1213. var (
  1214. deliver = func(packet dataPack) (int, error) {
  1215. start := time.Now()
  1216. return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
  1217. if err != nil {
  1218. // If the node data processing failed, the root hash is very wrong, abort
  1219. glog.V(logger.Error).Infof("peer %d: state processing failed: %v", packet.PeerId(), err)
  1220. d.cancel()
  1221. return
  1222. }
  1223. // Processing succeeded, notify state fetcher of continuation
  1224. if d.queue.PendingNodeData() > 0 {
  1225. select {
  1226. case d.stateWakeCh <- true:
  1227. default:
  1228. }
  1229. }
  1230. // Log a message to the user and return
  1231. d.syncStatsLock.Lock()
  1232. defer d.syncStatsLock.Unlock()
  1233. d.syncStatsStateDone += uint64(delivered)
  1234. glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
  1235. })
  1236. }
  1237. expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
  1238. throttle = func() bool { return false }
  1239. reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
  1240. return d.queue.ReserveNodeData(p, count), false, nil
  1241. }
  1242. fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
  1243. capacity = func(p *peer) int { return p.NodeDataCapacity() }
  1244. setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
  1245. )
  1246. err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
  1247. d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
  1248. d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "State")
  1249. glog.V(logger.Debug).Infof("Node state data download terminated: %v", err)
  1250. return err
  1251. }
  1252. // fetchParts iteratively downloads scheduled block parts, taking any available
  1253. // peers, reserving a chunk of fetch requests for each, waiting for delivery and
  1254. // also periodically checking for timeouts.
  1255. func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
  1256. expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
  1257. fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
  1258. idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
  1259. // Create a ticker to detect expired retrieval tasks
  1260. ticker := time.NewTicker(100 * time.Millisecond)
  1261. defer ticker.Stop()
  1262. update := make(chan struct{}, 1)
  1263. // Prepare the queue and fetch block parts until the block header fetcher's done
  1264. finished := false
  1265. for {
  1266. select {
  1267. case <-d.cancelCh:
  1268. return errCancel
  1269. case packet := <-deliveryCh:
  1270. // If the peer was previously banned and failed to deliver it's pack
  1271. // in a reasonable time frame, ignore it's message.
  1272. if peer := d.peers.Peer(packet.PeerId()); peer != nil {
  1273. // Deliver the received chunk of data and check chain validity
  1274. accepted, err := deliver(packet)
  1275. if err == errInvalidChain {
  1276. return err
  1277. }
  1278. // Unless a peer delivered something completely else than requested (usually
  1279. // caused by a timed out request which came through in the end), set it to
  1280. // idle. If the delivery's stale, the peer should have already been idled.
  1281. if err != errStaleDelivery {
  1282. setIdle(peer, accepted)
  1283. }
  1284. // Issue a log to the user to see what's going on
  1285. switch {
  1286. case err == nil && packet.Items() == 0:
  1287. glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
  1288. case err == nil:
  1289. glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
  1290. default:
  1291. glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err)
  1292. }
  1293. }
  1294. // Blocks assembled, try to update the progress
  1295. select {
  1296. case update <- struct{}{}:
  1297. default:
  1298. }
  1299. case cont := <-wakeCh:
  1300. // The header fetcher sent a continuation flag, check if it's done
  1301. if !cont {
  1302. finished = true
  1303. }
  1304. // Headers arrive, try to update the progress
  1305. select {
  1306. case update <- struct{}{}:
  1307. default:
  1308. }
  1309. case <-ticker.C:
  1310. // Sanity check update the progress
  1311. select {
  1312. case update <- struct{}{}:
  1313. default:
  1314. }
  1315. case <-update:
  1316. // Short circuit if we lost all our peers
  1317. if d.peers.Len() == 0 {
  1318. return errNoPeers
  1319. }
  1320. // Check for fetch request timeouts and demote the responsible peers
  1321. for pid, fails := range expire() {
  1322. if peer := d.peers.Peer(pid); peer != nil {
  1323. if fails > 1 {
  1324. glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
  1325. setIdle(peer, 0)
  1326. } else {
  1327. glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind))
  1328. d.dropPeer(pid)
  1329. }
  1330. }
  1331. }
  1332. // If there's nothing more to fetch, wait or terminate
  1333. if pending() == 0 {
  1334. if !inFlight() && finished {
  1335. glog.V(logger.Debug).Infof("%s fetching completed", kind)
  1336. return nil
  1337. }
  1338. break
  1339. }
  1340. // Send a download request to all idle peers, until throttled
  1341. progressed, throttled, running := false, false, inFlight()
  1342. idles, total := idle()
  1343. for _, peer := range idles {
  1344. // Short circuit if throttling activated
  1345. if throttle() {
  1346. throttled = true
  1347. break
  1348. }
  1349. // Reserve a chunk of fetches for a peer. A nil can mean either that
  1350. // no more headers are available, or that the peer is known not to
  1351. // have them.
  1352. request, progress, err := reserve(peer, capacity(peer))
  1353. if err != nil {
  1354. return err
  1355. }
  1356. if progress {
  1357. progressed = true
  1358. }
  1359. if request == nil {
  1360. continue
  1361. }
  1362. if glog.V(logger.Detail) {
  1363. if len(request.Headers) > 0 {
  1364. glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
  1365. } else {
  1366. glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
  1367. }
  1368. }
  1369. // Fetch the chunk and make sure any errors return the hashes to the queue
  1370. if fetchHook != nil {
  1371. fetchHook(request.Headers)
  1372. }
  1373. if err := fetch(peer, request); err != nil {
  1374. // Although we could try and make an attempt to fix this, this error really
  1375. // means that we've double allocated a fetch task to a peer. If that is the
  1376. // case, the internal state of the downloader and the queue is very wrong so
  1377. // better hard crash and note the error instead of silently accumulating into
  1378. // a much bigger issue.
  1379. panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, strings.ToLower(kind)))
  1380. }
  1381. running = true
  1382. }
  1383. // Make sure that we have peers available for fetching. If all peers have been tried
  1384. // and all failed throw an error
  1385. if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
  1386. return errPeersUnavailable
  1387. }
  1388. case <-d.hashCh:
  1389. case <-d.blockCh:
  1390. // Ignore eth/61 packets because this is eth/62+.
  1391. // These can arrive as a late delivery from a previous sync.
  1392. }
  1393. }
  1394. }
  1395. // process takes fetch results from the queue and tries to import them into the
  1396. // chain. The type of import operation will depend on the result contents.
  1397. func (d *Downloader) process() error {
  1398. pivot := d.queue.FastSyncPivot()
  1399. for {
  1400. results := d.queue.WaitResults()
  1401. if len(results) == 0 {
  1402. return nil // queue empty
  1403. }
  1404. if d.chainInsertHook != nil {
  1405. d.chainInsertHook(results)
  1406. }
  1407. // Actually import the blocks
  1408. if glog.V(logger.Debug) {
  1409. first, last := results[0].Header, results[len(results)-1].Header
  1410. glog.Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4])
  1411. }
  1412. for len(results) != 0 {
  1413. // Check for any termination requests
  1414. if atomic.LoadInt32(&d.interrupt) == 1 {
  1415. return errCancelProcessing
  1416. }
  1417. // Retrieve the a batch of results to import
  1418. var (
  1419. blocks = make([]*types.Block, 0, maxResultsProcess)
  1420. receipts = make([]types.Receipts, 0, maxResultsProcess)
  1421. )
  1422. items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
  1423. for _, result := range results[:items] {
  1424. switch {
  1425. case d.mode == FullSync:
  1426. blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
  1427. case d.mode == FastSync:
  1428. blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
  1429. if result.Header.Number.Uint64() <= pivot {
  1430. receipts = append(receipts, result.Receipts)
  1431. }
  1432. }
  1433. }
  1434. // Try to process the results, aborting if there's an error
  1435. var (
  1436. err error
  1437. index int
  1438. )
  1439. switch {
  1440. case len(receipts) > 0:
  1441. index, err = d.insertReceipts(blocks, receipts)
  1442. if err == nil && blocks[len(blocks)-1].NumberU64() == pivot {
  1443. glog.V(logger.Debug).Infof("Committing block #%d [%x…] as the new head", blocks[len(blocks)-1].Number(), blocks[len(blocks)-1].Hash().Bytes()[:4])
  1444. index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
  1445. }
  1446. default:
  1447. index, err = d.insertBlocks(blocks)
  1448. }
  1449. if err != nil {
  1450. glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
  1451. return err
  1452. }
  1453. // Shift the results to the next batch
  1454. results = results[items:]
  1455. }
  1456. }
  1457. }
  1458. // DeliverHashes injects a new batch of hashes received from a remote node into
  1459. // the download schedule. This is usually invoked through the BlockHashesMsg by
  1460. // the protocol handler.
  1461. func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) (err error) {
  1462. return d.deliver(id, d.hashCh, &hashPack{id, hashes}, hashInMeter, hashDropMeter)
  1463. }
  1464. // DeliverBlocks injects a new batch of blocks received from a remote node.
  1465. // This is usually invoked through the BlocksMsg by the protocol handler.
  1466. func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) (err error) {
  1467. return d.deliver(id, d.blockCh, &blockPack{id, blocks}, blockInMeter, blockDropMeter)
  1468. }
  1469. // DeliverHeaders injects a new batch of block headers received from a remote
  1470. // node into the download schedule.
  1471. func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
  1472. return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
  1473. }
  1474. // DeliverBodies injects a new batch of block bodies received from a remote node.
  1475. func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
  1476. return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
  1477. }
  1478. // DeliverReceipts injects a new batch of receipts received from a remote node.
  1479. func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
  1480. return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
  1481. }
  1482. // DeliverNodeData injects a new batch of node state data received from a remote node.
  1483. func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
  1484. return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
  1485. }
  1486. // deliver injects a new batch of data received from a remote node.
  1487. func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
  1488. // Update the delivery metrics for both good and failed deliveries
  1489. inMeter.Mark(int64(packet.Items()))
  1490. defer func() {
  1491. if err != nil {
  1492. dropMeter.Mark(int64(packet.Items()))
  1493. }
  1494. }()
  1495. // Deliver or abort if the sync is canceled while queuing
  1496. d.cancelLock.RLock()
  1497. cancel := d.cancelCh
  1498. d.cancelLock.RUnlock()
  1499. if cancel == nil {
  1500. return errNoSyncActive
  1501. }
  1502. select {
  1503. case destCh <- packet:
  1504. return nil
  1505. case <-cancel:
  1506. return errNoSyncActive
  1507. }
  1508. }