downloader.go 71 KB


  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package downloader contains the manual full chain synchronisation.
  17. package downloader
  18. import (
  19. "crypto/rand"
  20. "errors"
  21. "fmt"
  22. "math"
  23. "math/big"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/ethereum/go-ethereum/common"
  29. "github.com/ethereum/go-ethereum/core/types"
  30. "github.com/ethereum/go-ethereum/ethdb"
  31. "github.com/ethereum/go-ethereum/event"
  32. "github.com/ethereum/go-ethereum/logger"
  33. "github.com/ethereum/go-ethereum/logger/glog"
  34. "github.com/ethereum/go-ethereum/params"
  35. "github.com/ethereum/go-ethereum/trie"
  36. "github.com/rcrowley/go-metrics"
  37. )
  38. var (
  39. MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request
  40. MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
  41. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  42. MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
  43. MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request
  44. MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
  45. MaxStateFetch = 384 // Amount of node state values to allow fetching per request
  46. MaxForkAncestry = 3 * params.EpochDuration.Uint64() // Maximum chain reorganisation
  47. hashTTL = 3 * time.Second // [eth/61] Time it takes for a hash request to time out
  48. blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
  49. blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
  50. headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now)
  51. headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
  52. bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
  53. bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
  54. receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
  55. receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
  56. stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
  57. stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
  58. maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
  59. maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
  60. maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
  61. maxResultsProcess = 2048 // Number of content download results to import at once into the chain
  62. fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
  63. fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
  64. fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
  65. fsPivotInterval = 512 // Number of headers out of which to randomize the pivot point
  66. fsMinFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync
  67. )
  68. var (
  69. errBusy = errors.New("busy")
  70. errUnknownPeer = errors.New("peer is unknown or unhealthy")
  71. errBadPeer = errors.New("action from bad peer ignored")
  72. errStallingPeer = errors.New("peer is stalling")
  73. errNoPeers = errors.New("no peers to keep download active")
  74. errTimeout = errors.New("timeout")
  75. errEmptyHashSet = errors.New("empty hash set by peer")
  76. errEmptyHeaderSet = errors.New("empty header set by peer")
  77. errPeersUnavailable = errors.New("no peers available or all tried for download")
  78. errAlreadyInPool = errors.New("hash already in pool")
  79. errInvalidAncestor = errors.New("retrieved ancestor is invalid")
  80. errInvalidChain = errors.New("retrieved hash chain is invalid")
  81. errInvalidBlock = errors.New("retrieved block is invalid")
  82. errInvalidBody = errors.New("retrieved block body is invalid")
  83. errInvalidReceipt = errors.New("retrieved receipt is invalid")
  84. errCancelHashFetch = errors.New("hash download canceled (requested)")
  85. errCancelBlockFetch = errors.New("block download canceled (requested)")
  86. errCancelHeaderFetch = errors.New("block header download canceled (requested)")
  87. errCancelBodyFetch = errors.New("block body download canceled (requested)")
  88. errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
  89. errCancelStateFetch = errors.New("state data download canceled (requested)")
  90. errCancelHeaderProcessing = errors.New("header processing canceled (requested)")
  91. errCancelContentProcessing = errors.New("content processing canceled (requested)")
  92. errNoSyncActive = errors.New("no sync active")
  93. )
  94. type Downloader struct {
  95. mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
  96. noFast bool // Flag to disable fast syncing in case of a security error
  97. mux *event.TypeMux // Event multiplexer to announce sync operation events
  98. queue *queue // Scheduler for selecting the hashes to download
  99. peers *peerSet // Set of active peers from which download can proceed
  100. interrupt int32 // Atomic boolean to signal termination
  101. // Statistics
  102. syncStatsChainOrigin uint64 // Origin block number where syncing started at
  103. syncStatsChainHeight uint64 // Highest block number known when syncing started
  104. syncStatsStateDone uint64 // Number of state trie entries already pulled
  105. syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
  106. // Callbacks
  107. hasHeader headerCheckFn // Checks if a header is present in the chain
  108. hasBlockAndState blockAndStateCheckFn // Checks if a block and associated state is present in the chain
  109. getHeader headerRetrievalFn // Retrieves a header from the chain
  110. getBlock blockRetrievalFn // Retrieves a block from the chain
  111. headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
  112. headBlock headBlockRetrievalFn // Retrieves the head block from the chain
  113. headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
  114. commitHeadBlock headBlockCommitterFn // Commits a manually assembled block as the chain head
  115. getTd tdRetrievalFn // Retrieves the TD of a block from the chain
  116. insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
  117. insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
  118. insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
  119. rollback chainRollbackFn // Removes a batch of recently added chain links
  120. dropPeer peerDropFn // Drops a peer for misbehaving
  121. // Status
  122. synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
  123. synchronising int32
  124. notified int32
  125. // Channels
  126. newPeerCh chan *peer
  127. hashCh chan dataPack // [eth/61] Channel receiving inbound hashes
  128. blockCh chan dataPack // [eth/61] Channel receiving inbound blocks
  129. headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
  130. bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
  131. receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
  132. stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
  133. blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks
  134. bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
  135. receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
  136. stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
  137. headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
  138. cancelCh chan struct{} // Channel to cancel mid-flight syncs
  139. cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
  140. // Testing hooks
  141. syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
  142. bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
  143. receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
  144. chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
  145. }
  146. // New creates a new downloader to fetch hashes and blocks from remote peers.
  147. func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlockAndState blockAndStateCheckFn,
  148. getHeader headerRetrievalFn, getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn,
  149. headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
  150. insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {
  151. return &Downloader{
  152. mode: FullSync,
  153. mux: mux,
  154. queue: newQueue(stateDb),
  155. peers: newPeerSet(),
  156. hasHeader: hasHeader,
  157. hasBlockAndState: hasBlockAndState,
  158. getHeader: getHeader,
  159. getBlock: getBlock,
  160. headHeader: headHeader,
  161. headBlock: headBlock,
  162. headFastBlock: headFastBlock,
  163. commitHeadBlock: commitHeadBlock,
  164. getTd: getTd,
  165. insertHeaders: insertHeaders,
  166. insertBlocks: insertBlocks,
  167. insertReceipts: insertReceipts,
  168. rollback: rollback,
  169. dropPeer: dropPeer,
  170. newPeerCh: make(chan *peer, 1),
  171. hashCh: make(chan dataPack, 1),
  172. blockCh: make(chan dataPack, 1),
  173. headerCh: make(chan dataPack, 1),
  174. bodyCh: make(chan dataPack, 1),
  175. receiptCh: make(chan dataPack, 1),
  176. stateCh: make(chan dataPack, 1),
  177. blockWakeCh: make(chan bool, 1),
  178. bodyWakeCh: make(chan bool, 1),
  179. receiptWakeCh: make(chan bool, 1),
  180. stateWakeCh: make(chan bool, 1),
  181. headerProcCh: make(chan []*types.Header, 1),
  182. }
  183. }
  184. // Progress retrieves the synchronisation boundaries, specifically the origin
  185. // block where synchronisation started at (may have failed/suspended); the block
  186. // or header sync is currently at; and the latest known block which the sync targets.
  187. //
  188. // In addition, during the state download phase of fast synchronisation the number
  189. // of processed and the total number of known states are also returned. Otherwise
  190. // these are zero.
  191. func (d *Downloader) Progress() (uint64, uint64, uint64, uint64, uint64) {
  192. // Fetch the pending state count outside of the lock to prevent unforeseen deadlocks
  193. pendingStates := uint64(d.queue.PendingNodeData())
  194. // Lock the current stats and return the progress
  195. d.syncStatsLock.RLock()
  196. defer d.syncStatsLock.RUnlock()
  197. current := uint64(0)
  198. switch d.mode {
  199. case FullSync:
  200. current = d.headBlock().NumberU64()
  201. case FastSync:
  202. current = d.headFastBlock().NumberU64()
  203. case LightSync:
  204. current = d.headHeader().Number.Uint64()
  205. }
  206. return d.syncStatsChainOrigin, current, d.syncStatsChainHeight, d.syncStatsStateDone, d.syncStatsStateDone + pendingStates
  207. }
  208. // Synchronising returns whether the downloader is currently retrieving blocks.
  209. func (d *Downloader) Synchronising() bool {
  210. return atomic.LoadInt32(&d.synchronising) > 0
  211. }
  212. // RegisterPeer injects a new download peer into the set of block source to be
  213. // used for fetching hashes and blocks from.
  214. func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
  215. getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
  216. getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
  217. getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
  218. glog.V(logger.Detail).Infoln("Registering peer", id)
  219. if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
  220. glog.V(logger.Error).Infoln("Register failed:", err)
  221. return err
  222. }
  223. return nil
  224. }
  225. // UnregisterPeer remove a peer from the known list, preventing any action from
  226. // the specified peer. An effort is also made to return any pending fetches into
  227. // the queue.
  228. func (d *Downloader) UnregisterPeer(id string) error {
  229. glog.V(logger.Detail).Infoln("Unregistering peer", id)
  230. if err := d.peers.Unregister(id); err != nil {
  231. glog.V(logger.Error).Infoln("Unregister failed:", err)
  232. return err
  233. }
  234. d.queue.Revoke(id)
  235. return nil
  236. }
  237. // Synchronise tries to sync up our local block chain with a remote peer, both
  238. // adding various sanity checks as well as wrapping it with various log entries.
  239. func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
  240. glog.V(logger.Detail).Infof("Attempting synchronisation: %v, head [%x…], TD %v", id, head[:4], td)
  241. err := d.synchronise(id, head, td, mode)
  242. switch err {
  243. case nil:
  244. glog.V(logger.Detail).Infof("Synchronisation completed")
  245. case errBusy:
  246. glog.V(logger.Detail).Infof("Synchronisation already in progress")
  247. case errTimeout, errBadPeer, errStallingPeer, errEmptyHashSet, errEmptyHeaderSet, errPeersUnavailable, errInvalidAncestor, errInvalidChain:
  248. glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
  249. d.dropPeer(id)
  250. default:
  251. glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
  252. }
  253. return err
  254. }
  255. // synchronise will select the peer and use it for synchronising. If an empty string is given
  256. // it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
  257. // checks fail an error will be returned. This method is synchronous
  258. func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
  259. // Mock out the synchronisation if testing
  260. if d.synchroniseMock != nil {
  261. return d.synchroniseMock(id, hash)
  262. }
  263. // Make sure only one goroutine is ever allowed past this point at once
  264. if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
  265. return errBusy
  266. }
  267. defer atomic.StoreInt32(&d.synchronising, 0)
  268. // Post a user notification of the sync (only once per session)
  269. if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
  270. glog.V(logger.Info).Infoln("Block synchronisation started")
  271. }
  272. // Reset the queue, peer set and wake channels to clean any internal leftover state
  273. d.queue.Reset()
  274. d.peers.Reset()
  275. for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  276. select {
  277. case <-ch:
  278. default:
  279. }
  280. }
  281. for empty := false; !empty; {
  282. select {
  283. case <-d.headerProcCh:
  284. default:
  285. empty = true
  286. }
  287. }
  288. // Create cancel channel for aborting mid-flight
  289. d.cancelLock.Lock()
  290. d.cancelCh = make(chan struct{})
  291. d.cancelLock.Unlock()
  292. // Set the requested sync mode, unless it's forbidden
  293. d.mode = mode
  294. if d.mode == FastSync && d.noFast {
  295. d.mode = FullSync
  296. }
  297. // Retrieve the origin peer and initiate the downloading process
  298. p := d.peers.Peer(id)
  299. if p == nil {
  300. return errUnknownPeer
  301. }
  302. return d.syncWithPeer(p, hash, td)
  303. }
  304. // syncWithPeer starts a block synchronization based on the hash chain from the
  305. // specified peer and head hash.
  306. func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
  307. d.mux.Post(StartEvent{})
  308. defer func() {
  309. // reset on error
  310. if err != nil {
  311. d.mux.Post(FailedEvent{err})
  312. } else {
  313. d.mux.Post(DoneEvent{})
  314. }
  315. }()
  316. glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version)
  317. defer func(start time.Time) {
  318. glog.V(logger.Debug).Infof("Synchronisation terminated after %v", time.Since(start))
  319. }(time.Now())
  320. switch {
  321. case p.version == 61:
  322. // Look up the sync boundaries: the common ancestor and the target block
  323. latest, err := d.fetchHeight61(p)
  324. if err != nil {
  325. return err
  326. }
  327. origin, err := d.findAncestor61(p, latest)
  328. if err != nil {
  329. return err
  330. }
  331. d.syncStatsLock.Lock()
  332. if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
  333. d.syncStatsChainOrigin = origin
  334. }
  335. d.syncStatsChainHeight = latest
  336. d.syncStatsLock.Unlock()
  337. // Initiate the sync using a concurrent hash and block retrieval algorithm
  338. d.queue.Prepare(origin+1, d.mode, 0, nil)
  339. if d.syncInitHook != nil {
  340. d.syncInitHook(origin, latest)
  341. }
  342. return d.spawnSync(origin+1,
  343. func() error { return d.fetchHashes61(p, td, origin+1) },
  344. func() error { return d.fetchBlocks61(origin + 1) },
  345. )
  346. case p.version >= 62:
  347. // Look up the sync boundaries: the common ancestor and the target block
  348. latest, err := d.fetchHeight(p)
  349. if err != nil {
  350. return err
  351. }
  352. height := latest.Number.Uint64()
  353. origin, err := d.findAncestor(p, height)
  354. if err != nil {
  355. return err
  356. }
  357. d.syncStatsLock.Lock()
  358. if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
  359. d.syncStatsChainOrigin = origin
  360. }
  361. d.syncStatsChainHeight = height
  362. d.syncStatsLock.Unlock()
  363. // Initiate the sync using a concurrent header and content retrieval algorithm
  364. pivot := uint64(0)
  365. switch d.mode {
  366. case LightSync:
  367. pivot = height
  368. case FastSync:
  369. // Calculate the new fast/slow sync pivot point
  370. pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
  371. if err != nil {
  372. panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
  373. }
  374. if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
  375. pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
  376. }
  377. // If the point is below the origin, move origin back to ensure state download
  378. if pivot < origin {
  379. if pivot > 0 {
  380. origin = pivot - 1
  381. } else {
  382. origin = 0
  383. }
  384. }
  385. glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
  386. }
  387. d.queue.Prepare(origin+1, d.mode, pivot, latest)
  388. if d.syncInitHook != nil {
  389. d.syncInitHook(origin, height)
  390. }
  391. return d.spawnSync(origin+1,
  392. func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
  393. func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
  394. func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
  395. func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
  396. func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
  397. )
  398. default:
  399. // Something very wrong, stop right here
  400. glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
  401. return errBadPeer
  402. }
  403. }
  404. // spawnSync runs d.process and all given fetcher functions to completion in
  405. // separate goroutines, returning the first error that appears.
  406. func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
  407. var wg sync.WaitGroup
  408. errc := make(chan error, len(fetchers)+1)
  409. wg.Add(len(fetchers) + 1)
  410. go func() { defer wg.Done(); errc <- d.processContent() }()
  411. for _, fn := range fetchers {
  412. fn := fn
  413. go func() { defer wg.Done(); errc <- fn() }()
  414. }
  415. // Wait for the first error, then terminate the others.
  416. var err error
  417. for i := 0; i < len(fetchers)+1; i++ {
  418. if i == len(fetchers) {
  419. // Close the queue when all fetchers have exited.
  420. // This will cause the block processor to end when
  421. // it has processed the queue.
  422. d.queue.Close()
  423. }
  424. if err = <-errc; err != nil {
  425. break
  426. }
  427. }
  428. d.queue.Close()
  429. d.cancel()
  430. wg.Wait()
  431. return err
  432. }
  433. // cancel cancels all of the operations and resets the queue. It returns true
  434. // if the cancel operation was completed.
  435. func (d *Downloader) cancel() {
  436. // Close the current cancel channel
  437. d.cancelLock.Lock()
  438. if d.cancelCh != nil {
  439. select {
  440. case <-d.cancelCh:
  441. // Channel was already closed
  442. default:
  443. close(d.cancelCh)
  444. }
  445. }
  446. d.cancelLock.Unlock()
  447. }
  448. // Terminate interrupts the downloader, canceling all pending operations.
  449. // The downloader cannot be reused after calling Terminate.
  450. func (d *Downloader) Terminate() {
  451. atomic.StoreInt32(&d.interrupt, 1)
  452. d.cancel()
  453. }
  454. // fetchHeight61 retrieves the head block of the remote peer to aid in estimating
  455. // the total time a pending synchronisation would take.
  456. func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
  457. glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
  458. // Request the advertised remote head block and wait for the response
  459. go p.getBlocks([]common.Hash{p.head})
  460. timeout := time.After(hashTTL)
  461. for {
  462. select {
  463. case <-d.cancelCh:
  464. return 0, errCancelBlockFetch
  465. case packet := <-d.blockCh:
  466. // Discard anything not from the origin peer
  467. if packet.PeerId() != p.id {
  468. glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", packet.PeerId())
  469. break
  470. }
  471. // Make sure the peer actually gave something valid
  472. blocks := packet.(*blockPack).blocks
  473. if len(blocks) != 1 {
  474. glog.V(logger.Debug).Infof("%v: invalid number of head blocks: %d != 1", p, len(blocks))
  475. return 0, errBadPeer
  476. }
  477. return blocks[0].NumberU64(), nil
  478. case <-timeout:
  479. glog.V(logger.Debug).Infof("%v: head block timeout", p)
  480. return 0, errTimeout
  481. case <-d.hashCh:
  482. // Out of bounds hashes received, ignore them
  483. case <-d.headerCh:
  484. case <-d.bodyCh:
  485. case <-d.stateCh:
  486. case <-d.receiptCh:
  487. // Ignore eth/{62,63} packets because this is eth/61.
  488. // These can arrive as a late delivery from a previous sync.
  489. }
  490. }
  491. }
  492. // findAncestor61 tries to locate the common ancestor block of the local chain and
  493. // a remote peers blockchain. In the general case when our node was in sync and
  494. // on the correct chain, checking the top N blocks should already get us a match.
  495. // In the rare scenario when we ended up on a long reorganisation (i.e. none of
  496. // the head blocks match), we do a binary search to find the common ancestor.
  497. func (d *Downloader) findAncestor61(p *peer, height uint64) (uint64, error) {
  498. glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)
  499. // Figure out the valid ancestor range to prevent rewrite attacks
  500. floor, ceil := int64(-1), d.headBlock().NumberU64()
  501. if ceil >= MaxForkAncestry {
  502. floor = int64(ceil - MaxForkAncestry)
  503. }
  504. // Request the topmost blocks to short circuit binary ancestor lookup
  505. head := ceil
  506. if head > height {
  507. head = height
  508. }
  509. from := int64(head) - int64(MaxHashFetch) + 1
  510. if from < 0 {
  511. from = 0
  512. }
  513. go p.getAbsHashes(uint64(from), MaxHashFetch)
  514. // Wait for the remote response to the head fetch
  515. number, hash := uint64(0), common.Hash{}
  516. timeout := time.After(hashTTL)
  517. for finished := false; !finished; {
  518. select {
  519. case <-d.cancelCh:
  520. return 0, errCancelHashFetch
  521. case packet := <-d.hashCh:
  522. // Discard anything not from the origin peer
  523. if packet.PeerId() != p.id {
  524. glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
  525. break
  526. }
  527. // Make sure the peer actually gave something valid
  528. hashes := packet.(*hashPack).hashes
  529. if len(hashes) == 0 {
  530. glog.V(logger.Debug).Infof("%v: empty head hash set", p)
  531. return 0, errEmptyHashSet
  532. }
  533. // Check if a common ancestor was found
  534. finished = true
  535. for i := len(hashes) - 1; i >= 0; i-- {
  536. // Skip any headers that underflow/overflow our requested set
  537. header := d.getHeader(hashes[i])
  538. if header == nil || header.Number.Int64() < from || header.Number.Uint64() > head {
  539. continue
  540. }
  541. // Otherwise check if we already know the header or not
  542. if d.hasBlockAndState(hashes[i]) {
  543. number, hash = header.Number.Uint64(), header.Hash()
  544. break
  545. }
  546. }
  547. case <-timeout:
  548. glog.V(logger.Debug).Infof("%v: head hash timeout", p)
  549. return 0, errTimeout
  550. case <-d.blockCh:
  551. // Out of bounds blocks received, ignore them
  552. case <-d.headerCh:
  553. case <-d.bodyCh:
  554. case <-d.stateCh:
  555. case <-d.receiptCh:
  556. // Ignore eth/{62,63} packets because this is eth/61.
  557. // These can arrive as a late delivery from a previous sync.
  558. }
  559. }
  560. // If the head fetch already found an ancestor, return
  561. if !common.EmptyHash(hash) {
  562. if int64(number) <= floor {
  563. glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, number, hash[:4], floor)
  564. return 0, errInvalidAncestor
  565. }
  566. glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4])
  567. return number, nil
  568. }
  569. // Ancestor not found, we need to binary search over our chain
  570. start, end := uint64(0), head
  571. if floor > 0 {
  572. start = uint64(floor)
  573. }
  574. for start+1 < end {
  575. // Split our chain interval in two, and request the hash to cross check
  576. check := (start + end) / 2
  577. timeout := time.After(hashTTL)
  578. go p.getAbsHashes(uint64(check), 1)
  579. // Wait until a reply arrives to this request
  580. for arrived := false; !arrived; {
  581. select {
  582. case <-d.cancelCh:
  583. return 0, errCancelHashFetch
  584. case packet := <-d.hashCh:
  585. // Discard anything not from the origin peer
  586. if packet.PeerId() != p.id {
  587. glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
  588. break
  589. }
  590. // Make sure the peer actually gave something valid
  591. hashes := packet.(*hashPack).hashes
  592. if len(hashes) != 1 {
  593. glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes))
  594. return 0, errBadPeer
  595. }
  596. arrived = true
  597. // Modify the search interval based on the response
  598. if !d.hasBlockAndState(hashes[0]) {
  599. end = check
  600. break
  601. }
  602. block := d.getBlock(hashes[0]) // this doesn't check state, hence the above explicit check
  603. if block.NumberU64() != check {
  604. glog.V(logger.Debug).Infof("%v: non requested hash #%d [%x…], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check)
  605. return 0, errBadPeer
  606. }
  607. start = check
  608. case <-timeout:
  609. glog.V(logger.Debug).Infof("%v: search hash timeout", p)
  610. return 0, errTimeout
  611. case <-d.blockCh:
  612. // Out of bounds blocks received, ignore them
  613. case <-d.headerCh:
  614. case <-d.bodyCh:
  615. case <-d.stateCh:
  616. case <-d.receiptCh:
  617. // Ignore eth/{62,63} packets because this is eth/61.
  618. // These can arrive as a late delivery from a previous sync.
  619. }
  620. }
  621. }
  622. // Ensure valid ancestry and return
  623. if int64(start) <= floor {
  624. glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, start, hash[:4], floor)
  625. return 0, errInvalidAncestor
  626. }
  627. glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, start, hash[:4])
  628. return start, nil
  629. }
  630. // fetchHashes61 keeps retrieving hashes from the requested number, until no more
  631. // are returned, potentially throttling on the way.
  632. func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
  633. glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from)
  634. // Create a timeout timer, and the associated hash fetcher
  635. request := time.Now() // time of the last fetch request
  636. timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
  637. <-timeout.C // timeout channel should be initially empty
  638. defer timeout.Stop()
  639. getHashes := func(from uint64) {
  640. glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from)
  641. request = time.Now()
  642. timeout.Reset(hashTTL)
  643. go p.getAbsHashes(from, MaxHashFetch)
  644. }
  645. // Start pulling hashes, until all are exhausted
  646. getHashes(from)
  647. gotHashes := false
  648. for {
  649. select {
  650. case <-d.cancelCh:
  651. return errCancelHashFetch
  652. case packet := <-d.hashCh:
  653. // Make sure the active peer is giving us the hashes
  654. if packet.PeerId() != p.id {
  655. glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
  656. break
  657. }
  658. hashReqTimer.UpdateSince(request)
  659. timeout.Stop()
  660. // If no more hashes are inbound, notify the block fetcher and return
  661. if packet.Items() == 0 {
  662. glog.V(logger.Debug).Infof("%v: no available hashes", p)
  663. select {
  664. case d.blockWakeCh <- false:
  665. case <-d.cancelCh:
  666. }
  667. // If no hashes were retrieved at all, the peer violated it's TD promise that it had a
  668. // better chain compared to ours. The only exception is if it's promised blocks were
  669. // already imported by other means (e.g. fetcher):
  670. //
  671. // R <remote peer>, L <local node>: Both at block 10
  672. // R: Mine block 11, and propagate it to L
  673. // L: Queue block 11 for import
  674. // L: Notice that R's head and TD increased compared to ours, start sync
  675. // L: Import of block 11 finishes
  676. // L: Sync begins, and finds common ancestor at 11
  677. // L: Request new hashes up from 11 (R's TD was higher, it must have something)
  678. // R: Nothing to give
  679. if !gotHashes && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
  680. return errStallingPeer
  681. }
  682. return nil
  683. }
  684. gotHashes = true
  685. hashes := packet.(*hashPack).hashes
  686. // Otherwise insert all the new hashes, aborting in case of junk
  687. glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashes), from)
  688. inserts := d.queue.Schedule61(hashes, true)
  689. if len(inserts) != len(hashes) {
  690. glog.V(logger.Debug).Infof("%v: stale hashes", p)
  691. return errBadPeer
  692. }
  693. // Notify the block fetcher of new hashes, but stop if queue is full
  694. if d.queue.PendingBlocks() < maxQueuedHashes {
  695. // We still have hashes to fetch, send continuation wake signal (potential)
  696. select {
  697. case d.blockWakeCh <- true:
  698. default:
  699. }
  700. } else {
  701. // Hash limit reached, send a termination wake signal (enforced)
  702. select {
  703. case d.blockWakeCh <- false:
  704. case <-d.cancelCh:
  705. }
  706. return nil
  707. }
  708. // Queue not yet full, fetch the next batch
  709. from += uint64(len(hashes))
  710. getHashes(from)
  711. case <-timeout.C:
  712. glog.V(logger.Debug).Infof("%v: hash request timed out", p)
  713. hashTimeoutMeter.Mark(1)
  714. return errTimeout
  715. case <-d.headerCh:
  716. case <-d.bodyCh:
  717. case <-d.stateCh:
  718. case <-d.receiptCh:
  719. // Ignore eth/{62,63} packets because this is eth/61.
  720. // These can arrive as a late delivery from a previous sync.
  721. }
  722. }
  723. }
  724. // fetchBlocks61 iteratively downloads the scheduled hashes, taking any available
  725. // peers, reserving a chunk of blocks for each, waiting for delivery and also
  726. // periodically checking for timeouts.
  727. func (d *Downloader) fetchBlocks61(from uint64) error {
  728. glog.V(logger.Debug).Infof("Downloading blocks from #%d", from)
  729. defer glog.V(logger.Debug).Infof("Block download terminated")
  730. // Create a timeout timer for scheduling expiration tasks
  731. ticker := time.NewTicker(100 * time.Millisecond)
  732. defer ticker.Stop()
  733. update := make(chan struct{}, 1)
  734. // Fetch blocks until the hash fetcher's done
  735. finished := false
  736. for {
  737. select {
  738. case <-d.cancelCh:
  739. return errCancelBlockFetch
  740. case packet := <-d.blockCh:
  741. // If the peer was previously banned and failed to deliver it's pack
  742. // in a reasonable time frame, ignore it's message.
  743. if peer := d.peers.Peer(packet.PeerId()); peer != nil {
  744. blocks := packet.(*blockPack).blocks
  745. // Deliver the received chunk of blocks and check chain validity
  746. accepted, err := d.queue.DeliverBlocks(peer.id, blocks)
  747. if err == errInvalidChain {
  748. return err
  749. }
  750. // Unless a peer delivered something completely else than requested (usually
  751. // caused by a timed out request which came through in the end), set it to
  752. // idle. If the delivery's stale, the peer should have already been idled.
  753. if err != errStaleDelivery {
  754. peer.SetBlocksIdle(accepted)
  755. }
  756. // Issue a log to the user to see what's going on
  757. switch {
  758. case err == nil && len(blocks) == 0:
  759. glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
  760. case err == nil:
  761. glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
  762. default:
  763. glog.V(logger.Detail).Infof("%s: delivery failed: %v", peer, err)
  764. }
  765. }
  766. // Blocks arrived, try to update the progress
  767. select {
  768. case update <- struct{}{}:
  769. default:
  770. }
  771. case cont := <-d.blockWakeCh:
  772. // The hash fetcher sent a continuation flag, check if it's done
  773. if !cont {
  774. finished = true
  775. }
  776. // Hashes arrive, try to update the progress
  777. select {
  778. case update <- struct{}{}:
  779. default:
  780. }
  781. case <-ticker.C:
  782. // Sanity check update the progress
  783. select {
  784. case update <- struct{}{}:
  785. default:
  786. }
  787. case <-update:
  788. // Short circuit if we lost all our peers
  789. if d.peers.Len() == 0 {
  790. return errNoPeers
  791. }
  792. // Check for block request timeouts and demote the responsible peers
  793. for pid, fails := range d.queue.ExpireBlocks(blockTTL) {
  794. if peer := d.peers.Peer(pid); peer != nil {
  795. if fails > 1 {
  796. glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
  797. peer.SetBlocksIdle(0)
  798. } else {
  799. glog.V(logger.Debug).Infof("%s: stalling block delivery, dropping", peer)
  800. d.dropPeer(pid)
  801. }
  802. }
  803. }
  804. // If there's nothing more to fetch, wait or terminate
  805. if d.queue.PendingBlocks() == 0 {
  806. if !d.queue.InFlightBlocks() && finished {
  807. glog.V(logger.Debug).Infof("Block fetching completed")
  808. return nil
  809. }
  810. break
  811. }
  812. // Send a download request to all idle peers, until throttled
  813. throttled := false
  814. idles, total := d.peers.BlockIdlePeers()
  815. for _, peer := range idles {
  816. // Short circuit if throttling activated
  817. if d.queue.ShouldThrottleBlocks() {
  818. throttled = true
  819. break
  820. }
  821. // Reserve a chunk of hashes for a peer. A nil can mean either that
  822. // no more hashes are available, or that the peer is known not to
  823. // have them.
  824. request := d.queue.ReserveBlocks(peer, peer.BlockCapacity())
  825. if request == nil {
  826. continue
  827. }
  828. if glog.V(logger.Detail) {
  829. glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
  830. }
  831. // Fetch the chunk and make sure any errors return the hashes to the queue
  832. if err := peer.Fetch61(request); err != nil {
  833. // Although we could try and make an attempt to fix this, this error really
  834. // means that we've double allocated a fetch task to a peer. If that is the
  835. // case, the internal state of the downloader and the queue is very wrong so
  836. // better hard crash and note the error instead of silently accumulating into
  837. // a much bigger issue.
  838. panic(fmt.Sprintf("%v: fetch assignment failed", peer))
  839. }
  840. }
  841. // Make sure that we have peers available for fetching. If all peers have been tried
  842. // and all failed throw an error
  843. if !throttled && !d.queue.InFlightBlocks() && len(idles) == total {
  844. return errPeersUnavailable
  845. }
  846. case <-d.headerCh:
  847. case <-d.bodyCh:
  848. case <-d.stateCh:
  849. case <-d.receiptCh:
  850. // Ignore eth/{62,63} packets because this is eth/61.
  851. // These can arrive as a late delivery from a previous sync.
  852. }
  853. }
  854. }
  855. // fetchHeight retrieves the head header of the remote peer to aid in estimating
  856. // the total time a pending synchronisation would take.
  857. func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
  858. glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
  859. // Request the advertised remote head block and wait for the response
  860. go p.getRelHeaders(p.head, 1, 0, false)
  861. timeout := time.After(headerTTL)
  862. for {
  863. select {
  864. case <-d.cancelCh:
  865. return nil, errCancelBlockFetch
  866. case packet := <-d.headerCh:
  867. // Discard anything not from the origin peer
  868. if packet.PeerId() != p.id {
  869. glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
  870. break
  871. }
  872. // Make sure the peer actually gave something valid
  873. headers := packet.(*headerPack).headers
  874. if len(headers) != 1 {
  875. glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
  876. return nil, errBadPeer
  877. }
  878. return headers[0], nil
  879. case <-timeout:
  880. glog.V(logger.Debug).Infof("%v: head header timeout", p)
  881. return nil, errTimeout
  882. case <-d.bodyCh:
  883. case <-d.stateCh:
  884. case <-d.receiptCh:
  885. // Out of bounds delivery, ignore
  886. case <-d.hashCh:
  887. case <-d.blockCh:
  888. // Ignore eth/61 packets because this is eth/62+.
  889. // These can arrive as a late delivery from a previous sync.
  890. }
  891. }
  892. }
  893. // findAncestor tries to locate the common ancestor link of the local chain and
  894. // a remote peers blockchain. In the general case when our node was in sync and
  895. // on the correct chain, checking the top N links should already get us a match.
  896. // In the rare scenario when we ended up on a long reorganisation (i.e. none of
  897. // the head links match), we do a binary search to find the common ancestor.
  898. func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
  899. glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)
  900. // Figure out the valid ancestor range to prevent rewrite attacks
  901. floor, ceil := int64(-1), d.headHeader().Number.Uint64()
  902. if d.mode == FullSync {
  903. ceil = d.headBlock().NumberU64()
  904. } else if d.mode == FastSync {
  905. ceil = d.headFastBlock().NumberU64()
  906. }
  907. if ceil >= MaxForkAncestry {
  908. floor = int64(ceil - MaxForkAncestry)
  909. }
  910. // Request the topmost blocks to short circuit binary ancestor lookup
  911. head := ceil
  912. if head > height {
  913. head = height
  914. }
  915. from := int64(head) - int64(MaxHeaderFetch) + 1
  916. if from < 0 {
  917. from = 0
  918. }
  919. go p.getAbsHeaders(uint64(from), MaxHeaderFetch, 0, false)
  920. // Wait for the remote response to the head fetch
  921. number, hash := uint64(0), common.Hash{}
  922. timeout := time.After(hashTTL)
  923. for finished := false; !finished; {
  924. select {
  925. case <-d.cancelCh:
  926. return 0, errCancelHashFetch
  927. case packet := <-d.headerCh:
  928. // Discard anything not from the origin peer
  929. if packet.PeerId() != p.id {
  930. glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
  931. break
  932. }
  933. // Make sure the peer actually gave something valid
  934. headers := packet.(*headerPack).headers
  935. if len(headers) == 0 {
  936. glog.V(logger.Warn).Infof("%v: empty head header set", p)
  937. return 0, errEmptyHeaderSet
  938. }
  939. // Make sure the peer's reply conforms to the request
  940. for i := 0; i < len(headers); i++ {
  941. if number := headers[i].Number.Int64(); number != from+int64(i) {
  942. glog.V(logger.Warn).Infof("%v: head header set (item %d) broke chain ordering: requested %d, got %d", p, i, from+int64(i), number)
  943. return 0, errInvalidChain
  944. }
  945. if i > 0 && headers[i-1].Hash() != headers[i].ParentHash {
  946. glog.V(logger.Warn).Infof("%v: head header set (item %d) broke chain ancestry: expected [%x], got [%x]", p, i, headers[i-1].Hash().Bytes()[:4], headers[i].ParentHash[:4])
  947. return 0, errInvalidChain
  948. }
  949. }
  950. // Check if a common ancestor was found
  951. finished = true
  952. for i := len(headers) - 1; i >= 0; i-- {
  953. // Skip any headers that underflow/overflow our requested set
  954. if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > head {
  955. continue
  956. }
  957. // Otherwise check if we already know the header or not
  958. if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
  959. number, hash = headers[i].Number.Uint64(), headers[i].Hash()
  960. break
  961. }
  962. }
  963. case <-timeout:
  964. glog.V(logger.Debug).Infof("%v: head header timeout", p)
  965. return 0, errTimeout
  966. case <-d.bodyCh:
  967. case <-d.stateCh:
  968. case <-d.receiptCh:
  969. // Out of bounds delivery, ignore
  970. case <-d.hashCh:
  971. case <-d.blockCh:
  972. // Ignore eth/61 packets because this is eth/62+.
  973. // These can arrive as a late delivery from a previous sync.
  974. }
  975. }
  976. // If the head fetch already found an ancestor, return
  977. if !common.EmptyHash(hash) {
  978. if int64(number) <= floor {
  979. glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, number, hash[:4], floor)
  980. return 0, errInvalidAncestor
  981. }
  982. glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4])
  983. return number, nil
  984. }
  985. // Ancestor not found, we need to binary search over our chain
  986. start, end := uint64(0), head
  987. if floor > 0 {
  988. start = uint64(floor)
  989. }
  990. for start+1 < end {
  991. // Split our chain interval in two, and request the hash to cross check
  992. check := (start + end) / 2
  993. timeout := time.After(hashTTL)
  994. go p.getAbsHeaders(uint64(check), 1, 0, false)
  995. // Wait until a reply arrives to this request
  996. for arrived := false; !arrived; {
  997. select {
  998. case <-d.cancelCh:
  999. return 0, errCancelHashFetch
  1000. case packer := <-d.headerCh:
  1001. // Discard anything not from the origin peer
  1002. if packer.PeerId() != p.id {
  1003. glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packer.PeerId())
  1004. break
  1005. }
  1006. // Make sure the peer actually gave something valid
  1007. headers := packer.(*headerPack).headers
  1008. if len(headers) != 1 {
  1009. glog.V(logger.Debug).Infof("%v: invalid search header set (%d)", p, len(headers))
  1010. return 0, errBadPeer
  1011. }
  1012. arrived = true
  1013. // Modify the search interval based on the response
  1014. if (d.mode == FullSync && !d.hasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) {
  1015. end = check
  1016. break
  1017. }
  1018. header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
  1019. if header.Number.Uint64() != check {
  1020. glog.V(logger.Debug).Infof("%v: non requested header #%d [%x…], instead of #%d", p, header.Number, header.Hash().Bytes()[:4], check)
  1021. return 0, errBadPeer
  1022. }
  1023. start = check
  1024. case <-timeout:
  1025. glog.V(logger.Debug).Infof("%v: search header timeout", p)
  1026. return 0, errTimeout
  1027. case <-d.bodyCh:
  1028. case <-d.stateCh:
  1029. case <-d.receiptCh:
  1030. // Out of bounds delivery, ignore
  1031. case <-d.hashCh:
  1032. case <-d.blockCh:
  1033. // Ignore eth/61 packets because this is eth/62+.
  1034. // These can arrive as a late delivery from a previous sync.
  1035. }
  1036. }
  1037. }
  1038. // Ensure valid ancestry and return
  1039. if int64(start) <= floor {
  1040. glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, start, hash[:4], floor)
  1041. return 0, errInvalidAncestor
  1042. }
  1043. glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, start, hash[:4])
  1044. return start, nil
  1045. }
  1046. // fetchHeaders keeps retrieving headers concurrently from the number
  1047. // requested, until no more are returned, potentially throttling on the way. To
  1048. // facilitate concurrency but still protect against malicious nodes sending bad
  1049. // headers, we construct a header chain skeleton using the "origin" peer we are
  1050. // syncing with, and fill in the missing headers using anyone else. Headers from
  1051. // other peers are only accepted if they map cleanly to the skeleton. If no one
  1052. // can fill in the skeleton - not even the origin peer - it's assumed invalid and
  1053. // the origin is dropped.
  1054. func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
  1055. glog.V(logger.Debug).Infof("%v: directing header downloads from #%d", p, from)
  1056. defer glog.V(logger.Debug).Infof("%v: header download terminated", p)
  1057. // Create a timeout timer, and the associated header fetcher
  1058. skeleton := true // Skeleton assembly phase or finishing up
  1059. request := time.Now() // time of the last skeleton fetch request
  1060. timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
  1061. <-timeout.C // timeout channel should be initially empty
  1062. defer timeout.Stop()
  1063. getHeaders := func(from uint64) {
  1064. request = time.Now()
  1065. timeout.Reset(headerTTL)
  1066. if skeleton {
  1067. glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)
  1068. go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
  1069. } else {
  1070. glog.V(logger.Detail).Infof("%v: fetching %d full headers from #%d", p, MaxHeaderFetch, from)
  1071. go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
  1072. }
  1073. }
  1074. // Start pulling the header chain skeleton until all is done
  1075. getHeaders(from)
  1076. for {
  1077. select {
  1078. case <-d.cancelCh:
  1079. return errCancelHeaderFetch
  1080. case packet := <-d.headerCh:
  1081. // Make sure the active peer is giving us the skeleton headers
  1082. if packet.PeerId() != p.id {
  1083. glog.V(logger.Debug).Infof("Received skeleton headers from incorrect peer (%s)", packet.PeerId())
  1084. break
  1085. }
  1086. headerReqTimer.UpdateSince(request)
  1087. timeout.Stop()
  1088. // If the skeleton's finished, pull any remaining head headers directly from the origin
  1089. if packet.Items() == 0 && skeleton {
  1090. skeleton = false
  1091. getHeaders(from)
  1092. continue
  1093. }
  1094. // If no more headers are inbound, notify the content fetchers and return
  1095. if packet.Items() == 0 {
  1096. glog.V(logger.Debug).Infof("%v: no available headers", p)
  1097. d.headerProcCh <- nil
  1098. return nil
  1099. }
  1100. headers := packet.(*headerPack).headers
  1101. // If we received a skeleton batch, resolve internals concurrently
  1102. if skeleton {
  1103. filled, proced, err := d.fillHeaderSkeleton(from, headers)
  1104. if err != nil {
  1105. glog.V(logger.Debug).Infof("%v: skeleton chain invalid: %v", p, err)
  1106. return errInvalidChain
  1107. }
  1108. headers = filled[proced:]
  1109. from += uint64(proced)
  1110. }
  1111. // Insert all the new headers and fetch the next batch
  1112. if len(headers) > 0 {
  1113. glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
  1114. select {
  1115. case d.headerProcCh <- headers:
  1116. case <-d.cancelCh:
  1117. return errCancelHeaderFetch
  1118. }
  1119. from += uint64(len(headers))
  1120. }
  1121. getHeaders(from)
  1122. case <-timeout.C:
  1123. // Header retrieval timed out, consider the peer bad and drop
  1124. glog.V(logger.Debug).Infof("%v: header request timed out", p)
  1125. headerTimeoutMeter.Mark(1)
  1126. d.dropPeer(p.id)
  1127. // Finish the sync gracefully instead of dumping the gathered data though
  1128. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  1129. select {
  1130. case ch <- false:
  1131. case <-d.cancelCh:
  1132. }
  1133. }
  1134. select {
  1135. case d.headerProcCh <- nil:
  1136. case <-d.cancelCh:
  1137. }
  1138. return errBadPeer
  1139. case <-d.hashCh:
  1140. case <-d.blockCh:
  1141. // Ignore eth/61 packets because this is eth/62+.
  1142. // These can arrive as a late delivery from a previous sync.
  1143. }
  1144. }
  1145. }
  1146. // fillHeaderSkeleton concurrently retrieves headers from all our available peers
  1147. // and maps them to the provided skeleton header chain.
  1148. //
  1149. // Any partial results from the beginning of the skeleton is (if possible) forwarded
  1150. // immediately to the header processor to keep the rest of the pipeline full even
  1151. // in the case of header stalls.
  1152. //
  1153. // The method returs the entire filled skeleton and also the number of headers
  1154. // already forwarded for processing.
  1155. func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
  1156. glog.V(logger.Debug).Infof("Filling up skeleton from #%d", from)
  1157. d.queue.ScheduleSkeleton(from, skeleton)
  1158. var (
  1159. deliver = func(packet dataPack) (int, error) {
  1160. pack := packet.(*headerPack)
  1161. return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
  1162. }
  1163. expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) }
  1164. throttle = func() bool { return false }
  1165. reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
  1166. return d.queue.ReserveHeaders(p, count), false, nil
  1167. }
  1168. fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
  1169. capacity = func(p *peer) int { return p.HeaderCapacity() }
  1170. setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
  1171. )
  1172. err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
  1173. d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
  1174. nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header")
  1175. glog.V(logger.Debug).Infof("Skeleton fill terminated: %v", err)
  1176. filled, proced := d.queue.RetrieveHeaders()
  1177. return filled, proced, err
  1178. }
  1179. // fetchBodies iteratively downloads the scheduled block bodies, taking any
  1180. // available peers, reserving a chunk of blocks for each, waiting for delivery
  1181. // and also periodically checking for timeouts.
  1182. func (d *Downloader) fetchBodies(from uint64) error {
  1183. glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
  1184. var (
  1185. deliver = func(packet dataPack) (int, error) {
  1186. pack := packet.(*bodyPack)
  1187. return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
  1188. }
  1189. expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) }
  1190. fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
  1191. capacity = func(p *peer) int { return p.BlockCapacity() }
  1192. setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
  1193. )
  1194. err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
  1195. d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
  1196. d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "Body")
  1197. glog.V(logger.Debug).Infof("Block body download terminated: %v", err)
  1198. return err
  1199. }
  1200. // fetchReceipts iteratively downloads the scheduled block receipts, taking any
  1201. // available peers, reserving a chunk of receipts for each, waiting for delivery
  1202. // and also periodically checking for timeouts.
  1203. func (d *Downloader) fetchReceipts(from uint64) error {
  1204. glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
  1205. var (
  1206. deliver = func(packet dataPack) (int, error) {
  1207. pack := packet.(*receiptPack)
  1208. return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
  1209. }
  1210. expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) }
  1211. fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
  1212. capacity = func(p *peer) int { return p.ReceiptCapacity() }
  1213. setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
  1214. )
  1215. err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
  1216. d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
  1217. d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt")
  1218. glog.V(logger.Debug).Infof("Receipt download terminated: %v", err)
  1219. return err
  1220. }
  1221. // fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
  1222. // available peers, reserving a chunk of nodes for each, waiting for delivery and
  1223. // also periodically checking for timeouts.
  1224. func (d *Downloader) fetchNodeData() error {
  1225. glog.V(logger.Debug).Infof("Downloading node state data")
  1226. var (
  1227. deliver = func(packet dataPack) (int, error) {
  1228. start := time.Now()
  1229. return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
  1230. // If the peer returned old-requested data, forgive
  1231. if err == trie.ErrNotRequested {
  1232. glog.V(logger.Info).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId())
  1233. return
  1234. }
  1235. if err != nil {
  1236. // If the node data processing failed, the root hash is very wrong, abort
  1237. glog.V(logger.Error).Infof("peer %d: state processing failed: %v", packet.PeerId(), err)
  1238. d.cancel()
  1239. return
  1240. }
  1241. // Processing succeeded, notify state fetcher of continuation
  1242. pending := d.queue.PendingNodeData()
  1243. if pending > 0 {
  1244. select {
  1245. case d.stateWakeCh <- true:
  1246. default:
  1247. }
  1248. }
  1249. d.syncStatsLock.Lock()
  1250. d.syncStatsStateDone += uint64(delivered)
  1251. d.syncStatsLock.Unlock()
  1252. // Log a message to the user and return
  1253. if delivered > 0 {
  1254. glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d, pending at least %d", delivered, time.Since(start), d.syncStatsStateDone, pending)
  1255. }
  1256. })
  1257. }
  1258. expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
  1259. throttle = func() bool { return false }
  1260. reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
  1261. return d.queue.ReserveNodeData(p, count), false, nil
  1262. }
  1263. fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
  1264. capacity = func(p *peer) int { return p.NodeDataCapacity() }
  1265. setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
  1266. )
  1267. err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
  1268. d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
  1269. d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "State")
  1270. glog.V(logger.Debug).Infof("Node state data download terminated: %v", err)
  1271. return err
  1272. }
  1273. // fetchParts iteratively downloads scheduled block parts, taking any available
  1274. // peers, reserving a chunk of fetch requests for each, waiting for delivery and
  1275. // also periodically checking for timeouts.
  1276. //
  1277. // As the scheduling/timeout logic mostly is the same for all downloaded data
  1278. // types, this method is used by each for data gathering and is instrumented with
  1279. // various callbacks to handle the slight differences between processing them.
  1280. //
  1281. // The instrumentation parameters:
  1282. // - errCancel: error type to return if the fetch operation is cancelled (mostly makes logging nicer)
  1283. // - deliveryCh: channel from which to retrieve downloaded data packets (merged from all concurrent peers)
  1284. // - deliver: processing callback to deliver data packets into type specific download queues (usually within `queue`)
  1285. // - wakeCh: notification channel for waking the fetcher when new tasks are available (or sync completed)
  1286. // - expire: task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
  1287. // - pending: task callback for the number of requests still needing download (detect completion/non-completability)
  1288. // - inFlight: task callback for the number of in-progress requests (wait for all active downloads to finish)
  1289. // - throttle: task callback to check if the processing queue is full and activate throttling (bound memory use)
  1290. // - reserve: task callback to reserve new download tasks to a particular peer (also signals partial completions)
  1291. // - fetchHook: tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
  1292. // - fetch: network callback to actually send a particular download request to a physical remote peer
  1293. // - cancel: task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
  1294. // - capacity: network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping)
  1295. // - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
  1296. // - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
  1297. // - kind: textual label of the type being downloaded to display in log messages
  1298. func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
  1299. expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
  1300. fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
  1301. idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
  1302. // Create a ticker to detect expired retrieval tasks
  1303. ticker := time.NewTicker(100 * time.Millisecond)
  1304. defer ticker.Stop()
  1305. update := make(chan struct{}, 1)
  1306. // Prepare the queue and fetch block parts until the block header fetcher's done
  1307. finished := false
  1308. for {
  1309. select {
  1310. case <-d.cancelCh:
  1311. return errCancel
  1312. case packet := <-deliveryCh:
  1313. // If the peer was previously banned and failed to deliver it's pack
  1314. // in a reasonable time frame, ignore it's message.
  1315. if peer := d.peers.Peer(packet.PeerId()); peer != nil {
  1316. // Deliver the received chunk of data and check chain validity
  1317. accepted, err := deliver(packet)
  1318. if err == errInvalidChain {
  1319. return err
  1320. }
  1321. // Unless a peer delivered something completely else than requested (usually
  1322. // caused by a timed out request which came through in the end), set it to
  1323. // idle. If the delivery's stale, the peer should have already been idled.
  1324. if err != errStaleDelivery {
  1325. setIdle(peer, accepted)
  1326. }
  1327. // Issue a log to the user to see what's going on
  1328. switch {
  1329. case err == nil && packet.Items() == 0:
  1330. glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
  1331. case err == nil:
  1332. glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
  1333. default:
  1334. glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err)
  1335. }
  1336. }
  1337. // Blocks assembled, try to update the progress
  1338. select {
  1339. case update <- struct{}{}:
  1340. default:
  1341. }
  1342. case cont := <-wakeCh:
  1343. // The header fetcher sent a continuation flag, check if it's done
  1344. if !cont {
  1345. finished = true
  1346. }
  1347. // Headers arrive, try to update the progress
  1348. select {
  1349. case update <- struct{}{}:
  1350. default:
  1351. }
  1352. case <-ticker.C:
  1353. // Sanity check update the progress
  1354. select {
  1355. case update <- struct{}{}:
  1356. default:
  1357. }
  1358. case <-update:
  1359. // Short circuit if we lost all our peers
  1360. if d.peers.Len() == 0 {
  1361. return errNoPeers
  1362. }
  1363. // Check for fetch request timeouts and demote the responsible peers
  1364. for pid, fails := range expire() {
  1365. if peer := d.peers.Peer(pid); peer != nil {
  1366. if fails > 1 {
  1367. glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
  1368. setIdle(peer, 0)
  1369. } else {
  1370. glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind))
  1371. d.dropPeer(pid)
  1372. }
  1373. }
  1374. }
  1375. // If there's nothing more to fetch, wait or terminate
  1376. if pending() == 0 {
  1377. if !inFlight() && finished {
  1378. glog.V(logger.Debug).Infof("%s fetching completed", kind)
  1379. return nil
  1380. }
  1381. break
  1382. }
  1383. // Send a download request to all idle peers, until throttled
  1384. progressed, throttled, running := false, false, inFlight()
  1385. idles, total := idle()
  1386. for _, peer := range idles {
  1387. // Short circuit if throttling activated
  1388. if throttle() {
  1389. throttled = true
  1390. break
  1391. }
  1392. // Reserve a chunk of fetches for a peer. A nil can mean either that
  1393. // no more headers are available, or that the peer is known not to
  1394. // have them.
  1395. request, progress, err := reserve(peer, capacity(peer))
  1396. if err != nil {
  1397. return err
  1398. }
  1399. if progress {
  1400. progressed = true
  1401. }
  1402. if request == nil {
  1403. continue
  1404. }
  1405. if glog.V(logger.Detail) {
  1406. if request.From > 0 {
  1407. glog.Infof("%s: requesting %s(s) from #%d", peer, strings.ToLower(kind), request.From)
  1408. } else if len(request.Headers) > 0 {
  1409. glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
  1410. } else {
  1411. glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
  1412. }
  1413. }
  1414. // Fetch the chunk and make sure any errors return the hashes to the queue
  1415. if fetchHook != nil {
  1416. fetchHook(request.Headers)
  1417. }
  1418. if err := fetch(peer, request); err != nil {
  1419. // Although we could try and make an attempt to fix this, this error really
  1420. // means that we've double allocated a fetch task to a peer. If that is the
  1421. // case, the internal state of the downloader and the queue is very wrong so
  1422. // better hard crash and note the error instead of silently accumulating into
  1423. // a much bigger issue.
  1424. panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, strings.ToLower(kind)))
  1425. }
  1426. running = true
  1427. }
  1428. // Make sure that we have peers available for fetching. If all peers have been tried
  1429. // and all failed throw an error
  1430. if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
  1431. return errPeersUnavailable
  1432. }
  1433. case <-d.hashCh:
  1434. case <-d.blockCh:
  1435. // Ignore eth/61 packets because this is eth/62+.
  1436. // These can arrive as a late delivery from a previous sync.
  1437. }
  1438. }
  1439. }
  1440. // processHeaders takes batches of retrieved headers from an input channel and
  1441. // keeps processing and scheduling them into the header chain and downloader's
  1442. // queue until the stream ends or a failure occurs.
  1443. func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
  1444. // Calculate the pivoting point for switching from fast to slow sync
  1445. pivot := d.queue.FastSyncPivot()
  1446. // Keep a count of uncertain headers to roll back
  1447. rollback := []*types.Header{}
  1448. defer func() {
  1449. if len(rollback) > 0 {
  1450. // Flatten the headers and roll them back
  1451. hashes := make([]common.Hash, len(rollback))
  1452. for i, header := range rollback {
  1453. hashes[i] = header.Hash()
  1454. }
  1455. lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number()
  1456. d.rollback(hashes)
  1457. glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
  1458. len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number())
  1459. // If we're already past the pivot point, this could be an attack, disable fast sync
  1460. if rollback[len(rollback)-1].Number.Uint64() > pivot {
  1461. d.noFast = true
  1462. }
  1463. }
  1464. }()
  1465. // Wait for batches of headers to process
  1466. gotHeaders := false
  1467. for {
  1468. select {
  1469. case <-d.cancelCh:
  1470. return errCancelHeaderProcessing
  1471. case headers := <-d.headerProcCh:
  1472. // Terminate header processing if we synced up
  1473. if len(headers) == 0 {
  1474. // Notify everyone that headers are fully processed
  1475. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  1476. select {
  1477. case ch <- false:
  1478. case <-d.cancelCh:
  1479. }
  1480. }
  1481. // If no headers were retrieved at all, the peer violated it's TD promise that it had a
  1482. // better chain compared to ours. The only exception is if it's promised blocks were
  1483. // already imported by other means (e.g. fecher):
  1484. //
  1485. // R <remote peer>, L <local node>: Both at block 10
  1486. // R: Mine block 11, and propagate it to L
  1487. // L: Queue block 11 for import
  1488. // L: Notice that R's head and TD increased compared to ours, start sync
  1489. // L: Import of block 11 finishes
  1490. // L: Sync begins, and finds common ancestor at 11
  1491. // L: Request new headers up from 11 (R's TD was higher, it must have something)
  1492. // R: Nothing to give
  1493. if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
  1494. return errStallingPeer
  1495. }
  1496. // If fast or light syncing, ensure promised headers are indeed delivered. This is
  1497. // needed to detect scenarios where an attacker feeds a bad pivot and then bails out
  1498. // of delivering the post-pivot blocks that would flag the invalid content.
  1499. //
  1500. // This check cannot be executed "as is" for full imports, since blocks may still be
  1501. // queued for processing when the header download completes. However, as long as the
  1502. // peer gave us something useful, we're already happy/progressed (above check).
  1503. if d.mode == FastSync || d.mode == LightSync {
  1504. if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
  1505. return errStallingPeer
  1506. }
  1507. }
  1508. // Disable any rollback and return
  1509. rollback = nil
  1510. return nil
  1511. }
  1512. // Otherwise split the chunk of headers into batches and process them
  1513. gotHeaders = true
  1514. for len(headers) > 0 {
  1515. // Terminate if something failed in between processing chunks
  1516. select {
  1517. case <-d.cancelCh:
  1518. return errCancelHeaderProcessing
  1519. default:
  1520. }
  1521. // Select the next chunk of headers to import
  1522. limit := maxHeadersProcess
  1523. if limit > len(headers) {
  1524. limit = len(headers)
  1525. }
  1526. chunk := headers[:limit]
  1527. // In case of header only syncing, validate the chunk immediately
  1528. if d.mode == FastSync || d.mode == LightSync {
  1529. // Collect the yet unknown headers to mark them as uncertain
  1530. unknown := make([]*types.Header, 0, len(headers))
  1531. for _, header := range chunk {
  1532. if !d.hasHeader(header.Hash()) {
  1533. unknown = append(unknown, header)
  1534. }
  1535. }
  1536. // If we're importing pure headers, verify based on their recentness
  1537. frequency := fsHeaderCheckFrequency
  1538. if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
  1539. frequency = 1
  1540. }
  1541. if n, err := d.insertHeaders(chunk, frequency); err != nil {
  1542. // If some headers were inserted, add them too to the rollback list
  1543. if n > 0 {
  1544. rollback = append(rollback, chunk[:n]...)
  1545. }
  1546. glog.V(logger.Debug).Infof("invalid header #%d [%x…]: %v", chunk[n].Number, chunk[n].Hash().Bytes()[:4], err)
  1547. return errInvalidChain
  1548. }
  1549. // All verifications passed, store newly found uncertain headers
  1550. rollback = append(rollback, unknown...)
  1551. if len(rollback) > fsHeaderSafetyNet {
  1552. rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
  1553. }
  1554. }
  1555. // Unless we're doing light chains, schedule the headers for associated content retrieval
  1556. if d.mode == FullSync || d.mode == FastSync {
  1557. // If we've reached the allowed number of pending headers, stall a bit
  1558. for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
  1559. select {
  1560. case <-d.cancelCh:
  1561. return errCancelHeaderProcessing
  1562. case <-time.After(time.Second):
  1563. }
  1564. }
  1565. // Otherwise insert the headers for content retrieval
  1566. inserts := d.queue.Schedule(chunk, origin)
  1567. if len(inserts) != len(chunk) {
  1568. glog.V(logger.Debug).Infof("stale headers")
  1569. return errBadPeer
  1570. }
  1571. }
  1572. headers = headers[limit:]
  1573. origin += uint64(limit)
  1574. }
  1575. // Signal the content downloaders of the availablility of new tasks
  1576. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  1577. select {
  1578. case ch <- true:
  1579. default:
  1580. }
  1581. }
  1582. }
  1583. }
  1584. }
  1585. // processContent takes fetch results from the queue and tries to import them
  1586. // into the chain. The type of import operation will depend on the result contents.
  1587. func (d *Downloader) processContent() error {
  1588. pivot := d.queue.FastSyncPivot()
  1589. for {
  1590. results := d.queue.WaitResults()
  1591. if len(results) == 0 {
  1592. return nil // queue empty
  1593. }
  1594. if d.chainInsertHook != nil {
  1595. d.chainInsertHook(results)
  1596. }
  1597. // Actually import the blocks
  1598. if glog.V(logger.Debug) {
  1599. first, last := results[0].Header, results[len(results)-1].Header
  1600. glog.Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4])
  1601. }
  1602. for len(results) != 0 {
  1603. // Check for any termination requests
  1604. if atomic.LoadInt32(&d.interrupt) == 1 {
  1605. return errCancelContentProcessing
  1606. }
  1607. // Retrieve the a batch of results to import
  1608. var (
  1609. blocks = make([]*types.Block, 0, maxResultsProcess)
  1610. receipts = make([]types.Receipts, 0, maxResultsProcess)
  1611. )
  1612. items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
  1613. for _, result := range results[:items] {
  1614. switch {
  1615. case d.mode == FullSync:
  1616. blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
  1617. case d.mode == FastSync:
  1618. blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
  1619. if result.Header.Number.Uint64() <= pivot {
  1620. receipts = append(receipts, result.Receipts)
  1621. }
  1622. }
  1623. }
  1624. // Try to process the results, aborting if there's an error
  1625. var (
  1626. err error
  1627. index int
  1628. )
  1629. switch {
  1630. case len(receipts) > 0:
  1631. index, err = d.insertReceipts(blocks, receipts)
  1632. if err == nil && blocks[len(blocks)-1].NumberU64() == pivot {
  1633. glog.V(logger.Debug).Infof("Committing block #%d [%x…] as the new head", blocks[len(blocks)-1].Number(), blocks[len(blocks)-1].Hash().Bytes()[:4])
  1634. index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
  1635. }
  1636. default:
  1637. index, err = d.insertBlocks(blocks)
  1638. }
  1639. if err != nil {
  1640. glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
  1641. return err
  1642. }
  1643. // Shift the results to the next batch
  1644. results = results[items:]
  1645. }
  1646. }
  1647. }
  1648. // DeliverHashes injects a new batch of hashes received from a remote node into
  1649. // the download schedule. This is usually invoked through the BlockHashesMsg by
  1650. // the protocol handler.
  1651. func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) (err error) {
  1652. return d.deliver(id, d.hashCh, &hashPack{id, hashes}, hashInMeter, hashDropMeter)
  1653. }
  1654. // DeliverBlocks injects a new batch of blocks received from a remote node.
  1655. // This is usually invoked through the BlocksMsg by the protocol handler.
  1656. func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) (err error) {
  1657. return d.deliver(id, d.blockCh, &blockPack{id, blocks}, blockInMeter, blockDropMeter)
  1658. }
  1659. // DeliverHeaders injects a new batch of block headers received from a remote
  1660. // node into the download schedule.
  1661. func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
  1662. return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
  1663. }
  1664. // DeliverBodies injects a new batch of block bodies received from a remote node.
  1665. func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
  1666. return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
  1667. }
  1668. // DeliverReceipts injects a new batch of receipts received from a remote node.
  1669. func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
  1670. return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
  1671. }
  1672. // DeliverNodeData injects a new batch of node state data received from a remote node.
  1673. func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
  1674. return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
  1675. }
  1676. // deliver injects a new batch of data received from a remote node.
  1677. func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
  1678. // Update the delivery metrics for both good and failed deliveries
  1679. inMeter.Mark(int64(packet.Items()))
  1680. defer func() {
  1681. if err != nil {
  1682. dropMeter.Mark(int64(packet.Items()))
  1683. }
  1684. }()
  1685. // Deliver or abort if the sync is canceled while queuing
  1686. d.cancelLock.RLock()
  1687. cancel := d.cancelCh
  1688. d.cancelLock.RUnlock()
  1689. if cancel == nil {
  1690. return errNoSyncActive
  1691. }
  1692. select {
  1693. case destCh <- packet:
  1694. return nil
  1695. case <-cancel:
  1696. return errNoSyncActive
  1697. }
  1698. }