downloader.go

// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package downloader contains the manual full chain synchronisation.
package downloader

import (
	"crypto/rand"
	"errors"
	"fmt"
	"math"
	"math/big"
	"sync"
	"sync/atomic"
	"time"

	ethereum "github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/params"
	"github.com/rcrowley/go-metrics"
)

var (
	MaxHashFetch    = 512 // Amount of hashes to be fetched per retrieval request
	MaxBlockFetch   = 128 // Amount of blocks to be fetched per retrieval request
	MaxHeaderFetch  = 192 // Amount of block headers to be fetched per retrieval request
	MaxSkeletonSize = 128 // Number of header fetches needed for a skeleton assembly
	MaxBodyFetch    = 128 // Amount of block bodies to be fetched per retrieval request
	MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
	MaxStateFetch   = 384 // Amount of node state values to allow fetching per request

	MaxForkAncestry = 3 * params.EpochDuration // Maximum chain reorganisation

	rttMinEstimate   = 2 * time.Second  // Minimum round-trip time to target for download requests
	rttMaxEstimate   = 20 * time.Second // Maximum round-trip time to target for download requests
	rttMinConfidence = 0.1              // Worst confidence factor in our estimated RTT value
	ttlScaling       = 3                // Constant scaling factor for RTT -> TTL conversion
	ttlLimit         = time.Minute      // Maximum TTL allowance to prevent reaching crazy timeouts

	qosTuningPeers   = 5    // Number of peers to tune based on (best peers)
	qosConfidenceCap = 10   // Number of peers above which not to modify RTT confidence
	qosTuningImpact  = 0.25 // Impact that a new tuning target has on the previous value

	maxQueuedHeaders  = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
	maxHeadersProcess = 2048      // Number of header download results to import at once into the chain
	maxResultsProcess = 2048      // Number of content download results to import at once into the chain

	fsHeaderCheckFrequency = 100        // Verification frequency of the downloaded headers during fast sync
	fsHeaderSafetyNet      = 2048       // Number of headers to discard in case a chain violation is detected
	fsHeaderForceVerify    = 24         // Number of headers to verify before and after the pivot to accept it
	fsPivotInterval        = 256        // Number of headers out of which to randomize the pivot point
	fsMinFullBlocks        = 64         // Number of blocks to retrieve fully even in fast sync
	fsCriticalTrials       = uint32(32) // Number of times to retry in the critical section before bailing
)
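
// Editorial note (sketch, not from the original source): the requestTTL
// helper is outside this excerpt, but based on the constants above the
// per-request timeout is plausibly derived as
//
//	TTL = min(ttlScaling * rttEstimate / rttConfidence, ttlLimit)
//
// i.e. a multiple of the estimated round-trip time, widened further as RTT
// confidence drops, and hard-capped at one minute by ttlLimit.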

var (
	errBusy                    = errors.New("busy")
	errUnknownPeer             = errors.New("peer is unknown or unhealthy")
	errBadPeer                 = errors.New("action from bad peer ignored")
	errStallingPeer            = errors.New("peer is stalling")
	errNoPeers                 = errors.New("no peers to keep download active")
	errTimeout                 = errors.New("timeout")
	errEmptyHeaderSet          = errors.New("empty header set by peer")
	errPeersUnavailable        = errors.New("no peers available or all tried for download")
	errInvalidAncestor         = errors.New("retrieved ancestor is invalid")
	errInvalidChain            = errors.New("retrieved hash chain is invalid")
	errInvalidBlock            = errors.New("retrieved block is invalid")
	errInvalidBody             = errors.New("retrieved block body is invalid")
	errInvalidReceipt          = errors.New("retrieved receipt is invalid")
	errCancelBlockFetch        = errors.New("block download canceled (requested)")
	errCancelHeaderFetch       = errors.New("block header download canceled (requested)")
	errCancelBodyFetch         = errors.New("block body download canceled (requested)")
	errCancelReceiptFetch      = errors.New("receipt download canceled (requested)")
	errCancelStateFetch        = errors.New("state data download canceled (requested)")
	errCancelHeaderProcessing  = errors.New("header processing canceled (requested)")
	errCancelContentProcessing = errors.New("content processing canceled (requested)")
	errNoSyncActive            = errors.New("no sync active")
	errTooOld                  = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
)

type Downloader struct {
	mode SyncMode       // Synchronisation mode defining the strategy used (per sync cycle)
	mux  *event.TypeMux // Event multiplexer to announce sync operation events

	queue   *queue   // Scheduler for selecting the hashes to download
	peers   *peerSet // Set of active peers from which download can proceed
	stateDB ethdb.Database

	fsPivotLock  *types.Header // Pivot header on critical section entry (cannot change between retries)
	fsPivotFails uint32        // Number of subsequent fast sync failures in the critical section

	rttEstimate   uint64 // Round trip time to target for download requests
	rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)

	// Statistics
	syncStatsChainOrigin uint64 // Origin block number where syncing started at
	syncStatsChainHeight uint64 // Highest block number known when syncing started
	syncStatsState       stateSyncStats
	syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields

	// Callbacks
	hasHeader        headerCheckFn            // Checks if a header is present in the chain
	hasBlockAndState blockAndStateCheckFn     // Checks if a block and associated state is present in the chain
	getHeader        headerRetrievalFn        // Retrieves a header from the chain
	getBlock         blockRetrievalFn         // Retrieves a block from the chain
	headHeader       headHeaderRetrievalFn    // Retrieves the head header from the chain
	headBlock        headBlockRetrievalFn     // Retrieves the head block from the chain
	headFastBlock    headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
	commitHeadBlock  headBlockCommitterFn     // Commits a manually assembled block as the chain head
	getTd            tdRetrievalFn            // Retrieves the TD of a block from the chain
	insertHeaders    headerChainInsertFn      // Injects a batch of headers into the chain
	insertBlocks     blockChainInsertFn       // Injects a batch of blocks into the chain
	insertReceipts   receiptChainInsertFn     // Injects a batch of blocks and their receipts into the chain
	rollback         chainRollbackFn          // Removes a batch of recently added chain links
	dropPeer         peerDropFn               // Drops a peer for misbehaving

	// Status
	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
	synchronising   int32
	notified        int32

	// Channels
	headerCh      chan dataPack        // [eth/62] Channel receiving inbound block headers
	bodyCh        chan dataPack        // [eth/62] Channel receiving inbound block bodies
	receiptCh     chan dataPack        // [eth/63] Channel receiving inbound receipts
	bodyWakeCh    chan bool            // [eth/62] Channel to signal the block body fetcher of new tasks
	receiptWakeCh chan bool            // [eth/63] Channel to signal the receipt fetcher of new tasks
	headerProcCh  chan []*types.Header // [eth/62] Channel to feed the header processor new tasks

	// for stateFetcher
	stateSyncStart chan *stateSync
	trackStateReq  chan *stateReq
	stateCh        chan dataPack // [eth/63] Channel receiving inbound node state data

	// Cancellation and termination
	cancelPeer string        // Identifier of the peer currently being used as the master (cancel on drop)
	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
	cancelLock sync.RWMutex  // Lock to protect the cancel channel and peer in delivers

	quitCh   chan struct{} // Quit channel to signal termination
	quitLock sync.RWMutex  // Lock to prevent double closes

	// Testing hooks
	syncInitHook     func(uint64, uint64)  // Method to call upon initiating a new sync run
	bodyFetchHook    func([]*types.Header) // Method to call upon starting a block body fetch
	receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
	chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlockAndState blockAndStateCheckFn,
	getHeader headerRetrievalFn, getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn,
	headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
	insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {

	dl := &Downloader{
		mode:             mode,
		mux:              mux,
		queue:            newQueue(),
		peers:            newPeerSet(),
		stateDB:          stateDb,
		rttEstimate:      uint64(rttMaxEstimate),
		rttConfidence:    uint64(1000000),
		hasHeader:        hasHeader,
		hasBlockAndState: hasBlockAndState,
		getHeader:        getHeader,
		getBlock:         getBlock,
		headHeader:       headHeader,
		headBlock:        headBlock,
		headFastBlock:    headFastBlock,
		commitHeadBlock:  commitHeadBlock,
		getTd:            getTd,
		insertHeaders:    insertHeaders,
		insertBlocks:     insertBlocks,
		insertReceipts:   insertReceipts,
		rollback:         rollback,
		dropPeer:         dropPeer,
		headerCh:         make(chan dataPack, 1),
		bodyCh:           make(chan dataPack, 1),
		receiptCh:        make(chan dataPack, 1),
		bodyWakeCh:       make(chan bool, 1),
		receiptWakeCh:    make(chan bool, 1),
		headerProcCh:     make(chan []*types.Header, 1),
		quitCh:           make(chan struct{}),
		// for stateFetcher
		stateSyncStart: make(chan *stateSync),
		trackStateReq:  make(chan *stateReq),
		stateCh:        make(chan dataPack),
	}
	go dl.qosTuner()
	go dl.stateFetcher()
	return dl
}
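
// Usage sketch (editorial, not part of the original source; the callback
// names below are assumptions standing in for whatever the caller provides,
// e.g. the eth protocol manager's blockchain accessors):
//
//	dl := downloader.New(downloader.FastSync, chainDb, eventMux,
//		chain.HasHeader, chain.HasBlockAndState, chain.GetHeader, chain.GetBlock,
//		chain.CurrentHeader, chain.CurrentBlock, chain.CurrentFastBlock,
//		chain.FastSyncCommitHead, chain.GetTd, chain.InsertHeaderChain,
//		chain.InsertChain, chain.InsertReceiptChain, chain.Rollback, removePeer)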

// Progress retrieves the synchronisation boundaries, specifically the origin
// block where synchronisation started (may have failed/suspended); the block
// or header sync is currently at; and the latest known block which the sync targets.
//
// In addition, during the state download phase of fast synchronisation the number
// of processed and the total number of known states are also returned. Otherwise
// these are zero.
func (d *Downloader) Progress() ethereum.SyncProgress {
	// Lock the current stats and return the progress
	d.syncStatsLock.RLock()
	defer d.syncStatsLock.RUnlock()

	current := uint64(0)
	switch d.mode {
	case FullSync:
		current = d.headBlock().NumberU64()
	case FastSync:
		current = d.headFastBlock().NumberU64()
	case LightSync:
		current = d.headHeader().Number.Uint64()
	}
	return ethereum.SyncProgress{
		StartingBlock: d.syncStatsChainOrigin,
		CurrentBlock:  current,
		HighestBlock:  d.syncStatsChainHeight,
		PulledStates:  d.syncStatsState.processed,
		KnownStates:   d.syncStatsState.processed + d.syncStatsState.pending,
	}
}
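
// Usage sketch (editorial, not part of the original source): callers can poll
// Progress to report sync status, e.g.
//
//	p := dl.Progress()
//	log.Info("Sync status", "current", p.CurrentBlock, "highest", p.HighestBlock,
//		"pulled", p.PulledStates, "known", p.KnownStates)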

// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
	return atomic.LoadInt32(&d.synchronising) > 0
}

// RegisterPeer injects a new download peer into the set of block sources to be
// used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn,
	getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
	getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {

	logger := log.New("peer", id)
	logger.Trace("Registering sync peer")
	if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData, logger)); err != nil {
		logger.Error("Failed to register sync peer", "err", err)
		return err
	}
	d.qosReduceConfidence()
	return nil
}

// UnregisterPeer removes a peer from the known list, preventing any action from
// the specified peer. An effort is also made to return any pending fetches into
// the queue.
func (d *Downloader) UnregisterPeer(id string) error {
	// Unregister the peer from the active peer set and revoke any fetch tasks
	logger := log.New("peer", id)
	logger.Trace("Unregistering sync peer")
	if err := d.peers.Unregister(id); err != nil {
		logger.Error("Failed to unregister sync peer", "err", err)
		return err
	}
	d.queue.Revoke(id)

	// If this peer was the master peer, abort sync immediately
	d.cancelLock.RLock()
	master := id == d.cancelPeer
	d.cancelLock.RUnlock()
	if master {
		d.Cancel()
	}
	return nil
}

// Synchronise tries to sync up our local block chain with a remote peer, adding
// various sanity checks and wrapping the operation in appropriate log entries.
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
	err := d.synchronise(id, head, td, mode)
	switch err {
	case nil:
	case errBusy:

	case errTimeout, errBadPeer, errStallingPeer,
		errEmptyHeaderSet, errPeersUnavailable, errTooOld,
		errInvalidAncestor, errInvalidChain:
		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
		d.dropPeer(id)

	default:
		log.Warn("Synchronisation failed, retrying", "err", err)
	}
	return err
}
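
// Usage sketch (editorial, not part of the original source): a sync cycle is
// typically driven against the best known peer, with head hash and TD taken
// from that peer's status handshake (peerID, head and td are assumed inputs):
//
//	if err := dl.Synchronise(peerID, head, td, downloader.FastSync); err != nil {
//		log.Debug("Sync cycle failed", "err", err)
//	}
//
// Note that Synchronise itself drops the peer on the protocol-violation errors
// listed above, so the caller only needs to decide whether to retry.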

// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronise if its TD is higher than our own. If any of the
// checks fail, an error will be returned. This method is synchronous.
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
	// Mock out the synchronisation if testing
	if d.synchroniseMock != nil {
		return d.synchroniseMock(id, hash)
	}
	// Make sure only one goroutine is ever allowed past this point at once
	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
		return errBusy
	}
	defer atomic.StoreInt32(&d.synchronising, 0)

	// Post a user notification of the sync (only once per session)
	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
		log.Info("Block synchronisation started")
	}
	// Reset the queue, peer set and wake channels to clean any internal leftover state
	d.queue.Reset()
	d.peers.Reset()

	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
		select {
		case <-ch:
		default:
		}
	}
	for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
		for empty := false; !empty; {
			select {
			case <-ch:
			default:
				empty = true
			}
		}
	}
	for empty := false; !empty; {
		select {
		case <-d.headerProcCh:
		default:
			empty = true
		}
	}
	// Create cancel channel for aborting mid-flight and mark the master peer
	d.cancelLock.Lock()
	d.cancelCh = make(chan struct{})
	d.cancelPeer = id
	d.cancelLock.Unlock()

	defer d.Cancel() // No matter what, we can't leave the cancel channel open

	// Set the requested sync mode, unless it's forbidden
	d.mode = mode
	if d.mode == FastSync && atomic.LoadUint32(&d.fsPivotFails) >= fsCriticalTrials {
		d.mode = FullSync
	}
	// Retrieve the origin peer and initiate the downloading process
	p := d.peers.Peer(id)
	if p == nil {
		return errUnknownPeer
	}
	return d.syncWithPeer(p, hash, td)
}

// syncWithPeer starts a block synchronisation based on the hash chain from the
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
	d.mux.Post(StartEvent{})
	defer func() {
		// reset on error
		if err != nil {
			d.mux.Post(FailedEvent{err})
		} else {
			d.mux.Post(DoneEvent{})
		}
	}()
	if p.version < 62 {
		return errTooOld
	}

	log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
	defer func(start time.Time) {
		log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
	}(time.Now())

	// Look up the sync boundaries: the common ancestor and the target block
	latest, err := d.fetchHeight(p)
	if err != nil {
		return err
	}
	height := latest.Number.Uint64()

	origin, err := d.findAncestor(p, height)
	if err != nil {
		return err
	}
	d.syncStatsLock.Lock()
	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
		d.syncStatsChainOrigin = origin
	}
	d.syncStatsChainHeight = height
	d.syncStatsLock.Unlock()

	// Initiate the sync using a concurrent header and content retrieval algorithm
	pivot := uint64(0)
	switch d.mode {
	case LightSync:
		pivot = height
	case FastSync:
		// Calculate the new fast/slow sync pivot point
		if d.fsPivotLock == nil {
			pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
			if err != nil {
				panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
			}
			if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
				pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
			}
		} else {
			// Pivot point locked in, use this and do not pick a new one!
			pivot = d.fsPivotLock.Number.Uint64()
		}
		// If the point is below the origin, move origin back to ensure state download
		if pivot < origin {
			if pivot > 0 {
				origin = pivot - 1
			} else {
				origin = 0
			}
		}
		log.Debug("Fast syncing until pivot block", "pivot", pivot)
	}
	d.queue.Prepare(origin+1, d.mode, pivot, latest)
	if d.syncInitHook != nil {
		d.syncInitHook(origin, height)
	}

	fetchers := []func() error{
		func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
		func() error { return d.fetchBodies(origin + 1) },   // Bodies are retrieved during normal and fast sync
		func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
		func() error { return d.processHeaders(origin+1, td) },
	}
	if d.mode == FastSync {
		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
	} else if d.mode == FullSync {
		fetchers = append(fetchers, d.processFullSyncContent)
	}
	err = d.spawnSync(fetchers)
	if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
		// If sync failed in the critical section, bump the fail counter.
		atomic.AddUint32(&d.fsPivotFails, 1)
	}
	return err
}

// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
func (d *Downloader) spawnSync(fetchers []func() error) error {
	var wg sync.WaitGroup
	errc := make(chan error, len(fetchers))
	wg.Add(len(fetchers))
	for _, fn := range fetchers {
		fn := fn
		go func() { defer wg.Done(); errc <- fn() }()
	}
	// Wait for the first error, then terminate the others.
	var err error
	for i := 0; i < len(fetchers); i++ {
		if i == len(fetchers)-1 {
			// Close the queue when all fetchers have exited.
			// This will cause the block processor to end when
			// it has processed the queue.
			d.queue.Close()
		}
		if err = <-errc; err != nil {
			break
		}
	}
	d.queue.Close()
	d.Cancel()
	wg.Wait()
	return err
}
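
// The concurrency pattern above, in isolation (editorial sketch, not part of
// the original source): a buffered error channel lets every worker send its
// result without blocking, so the loop can bail on the first failure while
// the stragglers are cancelled and drained afterwards:
//
//	errc := make(chan error, len(fns)) // buffered: senders never block
//	for _, fn := range fns {
//		fn := fn // capture the loop variable for the goroutine
//		go func() { errc <- fn() }()
//	}
//	for i := 0; i < len(fns); i++ {
//		if err := <-errc; err != nil {
//			return err // first error wins; the buffer absorbs the rest
//		}
//	}
//	return nil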

// Cancel cancels all of the operations and resets the queue. It is safe to
// call repeatedly; an already closed cancel channel is left untouched.
func (d *Downloader) Cancel() {
	// Close the current cancel channel
	d.cancelLock.Lock()
	if d.cancelCh != nil {
		select {
		case <-d.cancelCh:
			// Channel was already closed
		default:
			close(d.cancelCh)
		}
	}
	d.cancelLock.Unlock()
}

// Terminate interrupts the downloader, canceling all pending operations.
// The downloader cannot be reused after calling Terminate.
func (d *Downloader) Terminate() {
	// Close the termination channel (make sure double close is allowed)
	d.quitLock.Lock()
	select {
	case <-d.quitCh:
	default:
		close(d.quitCh)
	}
	d.quitLock.Unlock()

	// Cancel any pending download requests
	d.Cancel()
}

// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
	p.log.Debug("Retrieving remote chain height")

	// Request the advertised remote head block and wait for the response
	head, _ := p.currentHead()
	go p.getRelHeaders(head, 1, 0, false)

	ttl := d.requestTTL()
	timeout := time.After(ttl)
	for {
		select {
		case <-d.cancelCh:
			return nil, errCancelBlockFetch

		case packet := <-d.headerCh:
			// Discard anything not from the origin peer
			if packet.PeerId() != p.id {
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
				break
			}
			// Make sure the peer actually gave something valid
			headers := packet.(*headerPack).headers
			if len(headers) != 1 {
				p.log.Debug("Multiple headers for single request", "headers", len(headers))
				return nil, errBadPeer
			}
			head := headers[0]
			p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
			return head, nil

		case <-timeout:
			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
			return nil, errTimeout

		case <-d.bodyCh:
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
		}
	}
}

// findAncestor tries to locate the common ancestor link of the local chain and
// a remote peer's blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N links should already get us a match.
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
// the head links match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
	// Figure out the valid ancestor range to prevent rewrite attacks
	floor, ceil := int64(-1), d.headHeader().Number.Uint64()

	p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height)
	if d.mode == FullSync {
		ceil = d.headBlock().NumberU64()
	} else if d.mode == FastSync {
		ceil = d.headFastBlock().NumberU64()
	}
	if ceil >= MaxForkAncestry {
		floor = int64(ceil - MaxForkAncestry)
	}
	// Request the topmost blocks to short circuit binary ancestor lookup
	head := ceil
	if head > height {
		head = height
	}
	from := int64(head) - int64(MaxHeaderFetch)
	if from < 0 {
		from = 0
	}
	// Span out with 15 block gaps into the future to catch bad head reports
	limit := 2 * MaxHeaderFetch / 16
	count := 1 + int((int64(ceil)-from)/16)
	if count > limit {
		count = limit
	}
	go p.getAbsHeaders(uint64(from), count, 15, false)

	// Wait for the remote response to the head fetch
	number, hash := uint64(0), common.Hash{}

	ttl := d.requestTTL()
	timeout := time.After(ttl)

	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
			return 0, errCancelHeaderFetch

		case packet := <-d.headerCh:
			// Discard anything not from the origin peer
			if packet.PeerId() != p.id {
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
				break
			}
			// Make sure the peer actually gave something valid
			headers := packet.(*headerPack).headers
			if len(headers) == 0 {
				p.log.Warn("Empty head header set")
				return 0, errEmptyHeaderSet
			}
			// Make sure the peer's reply conforms to the request
			for i := 0; i < len(headers); i++ {
				if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
					p.log.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number)
					return 0, errInvalidChain
				}
			}
			// Check if a common ancestor was found
			finished = true
			for i := len(headers) - 1; i >= 0; i-- {
				// Skip any headers that underflow/overflow our requested set
				if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil {
					continue
				}
				// Otherwise check if we already know the header or not
				if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
					number, hash = headers[i].Number.Uint64(), headers[i].Hash()

					// If every header is known, even future ones, the peer straight out lied about its head
					if number > height && i == limit-1 {
						p.log.Warn("Lied about chain head", "reported", height, "found", number)
						return 0, errStallingPeer
					}
					break
				}
			}

		case <-timeout:
			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
			return 0, errTimeout

		case <-d.bodyCh:
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
		}
	}
	// If the head fetch already found an ancestor, return
	if !common.EmptyHash(hash) {
		if int64(number) <= floor {
			p.log.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor)
			return 0, errInvalidAncestor
		}
		p.log.Debug("Found common ancestor", "number", number, "hash", hash)
		return number, nil
	}
	// Ancestor not found, we need to binary search over our chain
	start, end := uint64(0), head
	if floor > 0 {
		start = uint64(floor)
	}
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2

		ttl := d.requestTTL()
		timeout := time.After(ttl)

		go p.getAbsHeaders(uint64(check), 1, 0, false)

		// Wait until a reply arrives to this request
		for arrived := false; !arrived; {
			select {
			case <-d.cancelCh:
				return 0, errCancelHeaderFetch

			case packer := <-d.headerCh:
				// Discard anything not from the origin peer
				if packer.PeerId() != p.id {
					log.Debug("Received headers from incorrect peer", "peer", packer.PeerId())
					break
				}
				// Make sure the peer actually gave something valid
				headers := packer.(*headerPack).headers
				if len(headers) != 1 {
					p.log.Debug("Multiple headers for single request", "headers", len(headers))
					return 0, errBadPeer
				}
				arrived = true

				// Modify the search interval based on the response
				if (d.mode == FullSync && !d.hasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) {
					end = check
					break
				}
				header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
				if header.Number.Uint64() != check {
					p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
					return 0, errBadPeer
				}
				start = check

			case <-timeout:
				p.log.Debug("Waiting for search header timed out", "elapsed", ttl)
				return 0, errTimeout

			case <-d.bodyCh:
			case <-d.receiptCh:
				// Out of bounds delivery, ignore
			}
		}
	}
	// Ensure valid ancestry and return
	if int64(start) <= floor {
		p.log.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor)
		return 0, errInvalidAncestor
	}
	p.log.Debug("Found common ancestor", "number", start, "hash", hash)
	return start, nil
}
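
// Worked example (editorial, not from the original source): with
// MaxHeaderFetch = 192 and both chains near block 10,000, the short-circuit
// request above asks for headers from = 10,000 - 192 = 9,808 with skip 15,
// i.e. blocks 9,808, 9,824, ..., 10,000 (16 apart, 13 headers, capped at
// limit = 2*192/16 = 24). If any of them is known locally, the ancestor is
// found in a single round trip; otherwise the binary search bisects the
// [floor, head] interval with one single-header request per probe.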

// fetchHeaders keeps retrieving headers concurrently from the number
// requested, until no more are returned, potentially throttling on the way. To
// facilitate concurrency but still protect against malicious nodes sending bad
// headers, we construct a header chain skeleton using the "origin" peer we are
// syncing with, and fill in the missing headers using anyone else. Headers from
// other peers are only accepted if they map cleanly to the skeleton. If no one
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
	p.log.Debug("Directing header downloads", "origin", from)
	defer p.log.Debug("Header download terminated")

	// Create a timeout timer, and the associated header fetcher
	skeleton := true            // Skeleton assembly phase or finishing up
	request := time.Now()       // time of the last skeleton fetch request
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

	var ttl time.Duration
	getHeaders := func(from uint64) {
		request = time.Now()

		ttl = d.requestTTL()
		timeout.Reset(ttl)

		if skeleton {
			p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
			go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
		} else {
			p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
			go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
		}
	}
	// Start pulling the header chain skeleton until all is done
	getHeaders(from)

	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderFetch

		case packet := <-d.headerCh:
			// Make sure the active peer is giving us the skeleton headers
			if packet.PeerId() != p.id {
				log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
				break
			}
			headerReqTimer.UpdateSince(request)
			timeout.Stop()

			// If the skeleton's finished, pull any remaining head headers directly from the origin
			if packet.Items() == 0 && skeleton {
				skeleton = false
				getHeaders(from)
				continue
			}
			// If no more headers are inbound, notify the content fetchers and return
			if packet.Items() == 0 {
				p.log.Debug("No more headers available")
				select {
				case d.headerProcCh <- nil:
					return nil
				case <-d.cancelCh:
					return errCancelHeaderFetch
				}
			}
			headers := packet.(*headerPack).headers

			// If we received a skeleton batch, resolve internals concurrently
			if skeleton {
				filled, proced, err := d.fillHeaderSkeleton(from, headers)
				if err != nil {
					p.log.Debug("Skeleton chain invalid", "err", err)
					return errInvalidChain
				}
				headers = filled[proced:]
				from += uint64(proced)
			}
			// Insert all the new headers and fetch the next batch
			if len(headers) > 0 {
				p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
				select {
				case d.headerProcCh <- headers:
				case <-d.cancelCh:
					return errCancelHeaderFetch
				}
				from += uint64(len(headers))
			}
			getHeaders(from)

		case <-timeout.C:
			// Header retrieval timed out, consider the peer bad and drop
			p.log.Debug("Header request timed out", "elapsed", ttl)
			headerTimeoutMeter.Mark(1)
			d.dropPeer(p.id)

			// Finish the sync gracefully instead of dumping the gathered data though
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
				select {
				case ch <- false:
				case <-d.cancelCh:
				}
			}
			select {
			case d.headerProcCh <- nil:
			case <-d.cancelCh:
			}
			return errBadPeer
		}
	}
}
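
// Worked example (editorial, not from the original source): with
// MaxHeaderFetch = 192 and MaxSkeletonSize = 128, a skeleton request from
// origin N asks for headers N+191, N+383, N+575, ... (skip 191, at most 128
// of them). Each skeleton header anchors a 192-header segment that idle
// peers fill in via fillHeaderSkeleton, so a single skeleton round can cover
// up to 128 * 192 = 24,576 headers while only trusting the origin peer for
// the anchors.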

// fillHeaderSkeleton concurrently retrieves headers from all our available peers
// and maps them to the provided skeleton header chain.
//
// Any partial results from the beginning of the skeleton are (if possible) forwarded
// immediately to the header processor to keep the rest of the pipeline full even
// in the case of header stalls.
//
// The method returns the entire filled skeleton and also the number of headers
// already forwarded for processing.
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
	log.Debug("Filling up skeleton", "from", from)
	d.queue.ScheduleSkeleton(from, skeleton)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*headerPack)
			return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
		}
		expire   = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
		throttle = func() bool { return false }
		reserve  = func(p *peer, count int) (*fetchRequest, bool, error) {
			return d.queue.ReserveHeaders(p, count), false, nil
		}
		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
		capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) }
		setIdle  = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
	)
	err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
		d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
		nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")

	log.Debug("Skeleton fill terminated", "err", err)

	filled, proced := d.queue.RetrieveHeaders()
	return filled, proced, err
}

// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
	log.Debug("Downloading block bodies", "origin", from)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*bodyPack)
			return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
		}
		expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
		capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) }
		setIdle  = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
	)
	err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
		d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")

	log.Debug("Block body download terminated", "err", err)
	return err
}

// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
	log.Debug("Downloading transaction receipts", "origin", from)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*receiptPack)
			return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
		}
		expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
		capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) }
		setIdle  = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
	)
	err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
		d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")

	log.Debug("Transaction receipt download terminated", "err", err)
	return err
}

// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
//
// As the scheduling/timeout logic is mostly the same for all downloaded data
// types, this method is used by each for data gathering and is instrumented with
// various callbacks to handle the slight differences between processing them.
//
// The instrumentation parameters:
//  - errCancel:  error type to return if the fetch operation is cancelled (mostly makes logging nicer)
//  - deliveryCh: channel from which to retrieve downloaded data packets (merged from all concurrent peers)
//  - deliver:    processing callback to deliver data packets into type specific download queues (usually within `queue`)
//  - wakeCh:     notification channel for waking the fetcher when new tasks are available (or sync completed)
//  - expire:     task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
//  - pending:    task callback for the number of requests still needing download (detect completion/non-completability)
//  - inFlight:   task callback for the number of in-progress requests (wait for all active downloads to finish)
//  - throttle:   task callback to check if the processing queue is full and activate throttling (bound memory use)
//  - reserve:    task callback to reserve new download tasks to a particular peer (also signals partial completions)
//  - fetchHook:  tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
//  - fetch:      network callback to actually send a particular download request to a physical remote peer
//  - cancel:     task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
//  - capacity:   network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping)
//  - idle:       network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
//  - setIdle:    network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
//  - kind:       textual label of the type being downloaded to display in log messages
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
	expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
	fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
	idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {

	// Create a ticker to detect expired retrieval tasks
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

	// Prepare the queue and fetch block parts until the block header fetcher's done
	finished := false
	for {
		select {
		case <-d.cancelCh:
			return errCancel

		case packet := <-deliveryCh:
			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
				// Deliver the received chunk of data and check chain validity
				accepted, err := deliver(packet)
				if err == errInvalidChain {
					return err
				}
				// Unless a peer delivered something completely different than requested (usually
				// caused by a timed out request which came through in the end), set it to
				// idle. If the delivery's stale, the peer should have already been idled.
				if err != errStaleDelivery {
					setIdle(peer, accepted)
				}
				// Issue a log to the user to see what's going on
				switch {
				case err == nil && packet.Items() == 0:
					peer.log.Trace("Requested data not delivered", "type", kind)
				case err == nil:
					peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
				default:
					peer.log.Trace("Failed to deliver retrieved data", "type", kind, "err", err)
				}
			}
			// Blocks assembled, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case cont := <-wakeCh:
			// The header fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Headers arrive, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
			// Check for fetch request timeouts and demote the responsible peers
			for pid, fails := range expire() {
				if peer := d.peers.Peer(pid); peer != nil {
					// If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
					// ourselves. Only reset to minimal throughput, but don't drop just yet. If even the minimal request
					// times out, the peer is useless for syncing and we need to get rid of it.
					//
					// The reason the minimum threshold is 2 is that the downloader tries to estimate the bandwidth
					// and latency of a peer separately, which requires pushing the measured capacity a bit and seeing
					// how response times react to it, so it always requests one more than the minimum (i.e. min 2).
					if fails > 2 {
						peer.log.Trace("Data delivery timed out", "type", kind)
						setIdle(peer, 0)
					} else {
						peer.log.Debug("Stalling delivery, dropping", "type", kind)
						d.dropPeer(pid)
					}
				}
			}
			// If there's nothing more to fetch, wait or terminate
			if pending() == 0 {
				if !inFlight() && finished {
					log.Debug("Data fetching completed", "type", kind)
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
			progressed, throttled, running := false, false, inFlight()
			idles, total := idle()

			for _, peer := range idles {
				// Short circuit if throttling activated
				if throttle() {
					throttled = true
					break
				}
				// Reserve a chunk of fetches for a peer. A nil can mean either that
				// no more headers are available, or that the peer is known not to
				// have them.
				request, progress, err := reserve(peer, capacity(peer))
				if err != nil {
					return err
				}
				if progress {
					progressed = true
				}
				if request == nil {
					continue
				}
				if request.From > 0 {
					peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
				} else if len(request.Headers) > 0 {
					peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
				} else {
					peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Hashes))
				}
				// Fetch the chunk and make sure any errors return the hashes to the queue
				if fetchHook != nil {
					fetchHook(request.Headers)
				}
				if err := fetch(peer, request); err != nil {
					// Although we could try and make an attempt to fix this, this error really
					// means that we've double allocated a fetch task to a peer. If that is the
					// case, the internal state of the downloader and the queue is very wrong so
					// better hard crash and note the error instead of silently accumulating into
					// a much bigger issue.
					panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
				}
				running = true
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed, throw an error.
			if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
				return errPeersUnavailable
			}
		}
	}
}

// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
	// Calculate the pivoting point for switching from fast to slow sync
	pivot := d.queue.FastSyncPivot()

	// Keep a count of uncertain headers to roll back
	rollback := []*types.Header{}
	defer func() {
		if len(rollback) > 0 {
			// Flatten the headers and roll them back
			hashes := make([]common.Hash, len(rollback))
			for i, header := range rollback {
				hashes[i] = header.Hash()
			}
			lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, common.Big0, common.Big0
			if d.headFastBlock != nil {
				lastFastBlock = d.headFastBlock().Number()
			}
			if d.headBlock != nil {
				lastBlock = d.headBlock().Number()
			}
			d.rollback(hashes)
			curFastBlock, curBlock := common.Big0, common.Big0
			if d.headFastBlock != nil {
				curFastBlock = d.headFastBlock().Number()
			}
			if d.headBlock != nil {
				curBlock = d.headBlock().Number()
			}
			log.Warn("Rolled back headers", "count", len(hashes),
				"header", fmt.Sprintf("%d->%d", lastHeader, d.headHeader().Number),
				"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
				"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))

			// If we're already past the pivot point, this could be an attack, tread carefully
			if rollback[len(rollback)-1].Number.Uint64() > pivot {
				// If we didn't ever fail, lock in the pivot header (must! not! change!)
				if atomic.LoadUint32(&d.fsPivotFails) == 0 {
					for _, header := range rollback {
						if header.Number.Uint64() == pivot {
							log.Warn("Fast-sync pivot locked in", "number", pivot, "hash", header.Hash())
							d.fsPivotLock = header
						}
					}
				}
			}
		}
	}()

	// Wait for batches of headers to process
	gotHeaders := false
	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderProcessing

		case headers := <-d.headerProcCh:
			// Terminate header processing if we synced up
			if len(headers) == 0 {
				// Notify everyone that headers are fully processed
				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
					select {
					case ch <- false:
					case <-d.cancelCh:
					}
				}
				// If no headers were retrieved at all, the peer violated its TD promise that it had a
				// better chain compared to ours. The only exception is if its promised blocks were
				// already imported by other means (e.g. fetcher):
				//
				// R <remote peer>, L <local node>: Both at block 10
				// R: Mine block 11, and propagate it to L
				// L: Queue block 11 for import
				// L: Notice that R's head and TD increased compared to ours, start sync
				// L: Import of block 11 finishes
				// L: Sync begins, and finds common ancestor at 11
				// L: Request new headers up from 11 (R's TD was higher, it must have something)
				// R: Nothing to give
				if d.mode != LightSync {
					if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
						return errStallingPeer
					}
				}
				// If fast or light syncing, ensure promised headers are indeed delivered. This is
				// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
				// of delivering the post-pivot blocks that would flag the invalid content.
				//
				// This check cannot be executed "as is" for full imports, since blocks may still be
				// queued for processing when the header download completes. However, as long as the
				// peer gave us something useful, we're already happy/progressed (above check).
				if d.mode == FastSync || d.mode == LightSync {
					if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
						return errStallingPeer
					}
				}
				// Disable any rollback and return
				rollback = nil
				return nil
			}
			// Otherwise split the chunk of headers into batches and process them
			gotHeaders = true

			for len(headers) > 0 {
				// Terminate if something failed in between processing chunks
				select {
				case <-d.cancelCh:
					return errCancelHeaderProcessing
				default:
				}
				// Select the next chunk of headers to import
				limit := maxHeadersProcess
				if limit > len(headers) {
					limit = len(headers)
				}
				chunk := headers[:limit]

				// In case of header-only syncing, validate the chunk immediately
				if d.mode == FastSync || d.mode == LightSync {
					// Collect the yet unknown headers to mark them as uncertain
					unknown := make([]*types.Header, 0, len(headers))
					for _, header := range chunk {
						if !d.hasHeader(header.Hash()) {
							unknown = append(unknown, header)
						}
					}
					// If we're importing pure headers, verify based on their recentness
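					// Chunks ending within fsHeaderForceVerify of the pivot get every header
					// verified (frequency 1); older ones are only spot-checked every
					// fsHeaderCheckFrequency-th header to keep validation cheap.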
					frequency := fsHeaderCheckFrequency
					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
						frequency = 1
					}
					if n, err := d.insertHeaders(chunk, frequency); err != nil {
						// If some headers were inserted, add them too to the rollback list
						if n > 0 {
							rollback = append(rollback, chunk[:n]...)
						}
						log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
						return errInvalidChain
					}
					// All verifications passed, store newly found uncertain headers
					rollback = append(rollback, unknown...)
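					// Cap the rollback set at the fsHeaderSafetyNet most recent headers so
					// an eventual rollback stays bounded however long the sync runs.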
					if len(rollback) > fsHeaderSafetyNet {
						rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
					}
				}
				// If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
				if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
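					// The chunk straddles the pivot, so the pivot header sits at the offset
					// of the pivot number from the chunk's first header.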
					if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
						log.Warn("Pivot doesn't match locked in one", "remoteNumber", pivot.Number, "remoteHash", pivot.Hash(), "localNumber", d.fsPivotLock.Number, "localHash", d.fsPivotLock.Hash())
						return errInvalidChain
					}
				}
				// Unless we're doing light chains, schedule the headers for associated content retrieval
				if d.mode == FullSync || d.mode == FastSync {
					// If we've reached the allowed number of pending headers, stall a bit
					for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
						select {
						case <-d.cancelCh:
							return errCancelHeaderProcessing
						case <-time.After(time.Second):
						}
					}
					// Otherwise insert the headers for content retrieval
					inserts := d.queue.Schedule(chunk, origin)
					if len(inserts) != len(chunk) {
						log.Debug("Stale headers")
						return errBadPeer
					}
				}
				headers = headers[limit:]
				origin += uint64(limit)
			}
			// Signal the content downloaders of the availability of new tasks
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
				select {
				case ch <- true:
				default:
				}
			}
		}
	}
}

// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func (d *Downloader) processFullSyncContent() error {
	for {
		results := d.queue.WaitResults()
		if len(results) == 0 {
			return nil
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
		if err := d.importBlockResults(results); err != nil {
			return err
		}
	}
}
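
// importBlockResults assembles the downloaded headers, transactions and uncles
// into blocks and inserts them into the local chain in batches of at most
// maxResultsProcess, aborting early on shutdown or an invalid chain segment.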
func (d *Downloader) importBlockResults(results []*fetchResult) error {
	for len(results) != 0 {
		// Check for any termination requests. This makes clean shutdown faster.
		select {
		case <-d.quitCh:
			return errCancelContentProcessing
		default:
		}
		// Retrieve the next batch of results to import
		items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
		first, last := results[0].Header, results[items-1].Header
		log.Debug("Inserting downloaded chain", "items", len(results),
			"firstnum", first.Number, "firsthash", first.Hash(),
			"lastnum", last.Number, "lasthash", last.Hash(),
		)
		blocks := make([]*types.Block, items)
		for i, result := range results[:items] {
			blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
		}
		if index, err := d.insertBlocks(blocks); err != nil {
			log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
			return errInvalidChain
		}
		// Shift the results to the next batch
		results = results[items:]
	}
	return nil
}

// processFastSyncContent takes fetch results from the queue and writes them to the
// database. It also controls the synchronisation of state nodes of the pivot block.
func (d *Downloader) processFastSyncContent(latest *types.Header) error {
	// Start syncing state of the reported head block.
	// This should get us most of the state of the pivot block.
	stateSync := d.syncState(latest.Root)
	defer stateSync.Cancel()
	go func() {
		if err := stateSync.Wait(); err != nil {
			d.queue.Close() // wake up WaitResults
		}
	}()

	pivot := d.queue.FastSyncPivot()
	for {
		results := d.queue.WaitResults()
		if len(results) == 0 {
			return stateSync.Cancel()
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
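		// Split the batch around the pivot: blocks before it are committed without
		// execution as fast-sync data, the pivot itself gets its state completed and
		// becomes the new head, and anything after it is fully imported.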
		P, beforeP, afterP := splitAroundPivot(pivot, results)
		if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
			return err
		}
		if P != nil {
			stateSync.Cancel()
			if err := d.commitPivotBlock(P); err != nil {
				return err
			}
		}
		if err := d.importBlockResults(afterP); err != nil {
			return err
		}
	}
}
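
// splitAroundPivot partitions a batch of fetch results into the ones strictly
// before the pivot block, the pivot block itself (if present) and the ones after.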
func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
	for _, result := range results {
		num := result.Header.Number.Uint64()
		switch {
		case num < pivot:
			before = append(before, result)
		case num == pivot:
			p = result
		default:
			after = append(after, result)
		}
	}
	return p, before, after
}
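
// commitFastSyncData writes the downloaded blocks and their receipts straight to
// the database in batches, bailing out if the accompanying state sync fails.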
func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
	for len(results) != 0 {
		// Check for any termination requests.
		select {
		case <-d.quitCh:
			return errCancelContentProcessing
		case <-stateSync.done:
			if err := stateSync.Wait(); err != nil {
				return err
			}
		default:
		}
		// Retrieve the next batch of results to import
		items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
		first, last := results[0].Header, results[items-1].Header
		log.Debug("Inserting fast-sync blocks", "items", len(results),
			"firstnum", first.Number, "firsthash", first.Hash(),
			"lastnum", last.Number, "lasthash", last.Hash(),
		)
		blocks := make([]*types.Block, items)
		receipts := make([]types.Receipts, items)
		for i, result := range results[:items] {
			blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
			receipts[i] = result.Receipts
		}
		if index, err := d.insertReceipts(blocks, receipts); err != nil {
			log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
			return errInvalidChain
		}
		// Shift the results to the next batch
		results = results[items:]
	}
	return nil
}
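
// commitPivotBlock completes the state sync for the pivot block, writes it along
// with its receipts to the database and promotes it to the new chain head.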
func (d *Downloader) commitPivotBlock(result *fetchResult) error {
	b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
	// Sync the pivot block state. This should complete reasonably quickly because
	// we've already synced up to the reported head block state earlier.
	if err := d.syncState(b.Root()).Wait(); err != nil {
		return err
	}
	log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash())
	if _, err := d.insertReceipts([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil {
		return err
	}
	return d.commitHeadBlock(b.Hash())
}

// DeliverHeaders injects a new batch of block headers received from a remote
// node into the download schedule.
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
	return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
}

// DeliverBodies injects a new batch of block bodies received from a remote node.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
	return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
}

// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
	return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
}

// DeliverNodeData injects a new batch of node state data received from a remote node.
func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
	return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
}

// deliver injects a new batch of data received from a remote node.
func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
	// Update the delivery metrics for both good and failed deliveries
	inMeter.Mark(int64(packet.Items()))
	defer func() {
		if err != nil {
			dropMeter.Mark(int64(packet.Items()))
		}
	}()
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()
	if cancel == nil {
		return errNoSyncActive
	}
	select {
	case destCh <- packet:
		return nil
	case <-cancel:
		return errNoSyncActive
	}
}

// qosTuner is the quality of service tuning loop that occasionally gathers the
// peer latency statistics and updates the estimated request round trip time.
func (d *Downloader) qosTuner() {
	for {
		// Retrieve the current median RTT and integrate into the previous target RTT
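		// The update is an exponential moving average: the new estimate weighs the
		// old one by (1 - qosTuningImpact) and the current median by qosTuningImpact.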
		rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
		atomic.StoreUint64(&d.rttEstimate, uint64(rtt))

		// A new RTT cycle passed, increase our confidence in the estimated RTT
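		// Confidence is stored scaled by 1e6; each cycle halves the remaining gap
		// to full (1.0) confidence.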
		conf := atomic.LoadUint64(&d.rttConfidence)
		conf = conf + (1000000-conf)/2
		atomic.StoreUint64(&d.rttConfidence, conf)

		// Log the new QoS values and sleep until the next RTT
		log.Debug("Recalculated downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
		select {
		case <-d.quitCh:
			return
		case <-time.After(rtt):
		}
	}
}

// qosReduceConfidence is meant to be called when a new peer joins the downloader's
// peer set, needing to reduce the confidence we have in our QoS estimates.
func (d *Downloader) qosReduceConfidence() {
	// If we have a single peer, confidence is always 1
	peers := uint64(d.peers.Len())
	if peers == 0 {
		// Ensure peer connectivity races don't catch us off guard
		return
	}
	if peers == 1 {
		atomic.StoreUint64(&d.rttConfidence, 1000000)
		return
	}
	// If we have a ton of peers, don't drop confidence
	if peers >= uint64(qosConfidenceCap) {
		return
	}
	// Otherwise drop the confidence factor
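	// Scaling by (peers-1)/peers dilutes confidence in proportion to how much of
	// the peer set the newcomer represents, floored at rttMinConfidence so the
	// estimate never becomes worthless.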
	conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
	if float64(conf)/1000000 < rttMinConfidence {
		conf = uint64(rttMinConfidence * 1000000)
	}
	atomic.StoreUint64(&d.rttConfidence, conf)

	rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
	log.Debug("Relaxed downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
}

// requestRTT returns the current target round trip time for a download request
// to complete in.
//
// Note, the returned RTT is 0.9 of the actually estimated RTT. The reason is that
// the downloader tries to adapt queries to the RTT, so multiple RTT values can
// be adapted to, but smaller ones are preferred (a stabler download stream).
func (d *Downloader) requestRTT() time.Duration {
	return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
}

// requestTTL returns the current timeout allowance for a single download request
// to finish under.
func (d *Downloader) requestTTL() time.Duration {
	var (
		rtt  = time.Duration(atomic.LoadUint64(&d.rttEstimate))
		conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
	)
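	// Scale the timeout inversely with confidence: with an estimated RTT of 500ms
	// and a confidence of 0.5 (illustrative numbers), the TTL comes out to
	// ttlScaling * 1s, capped at ttlLimit.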
	ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
	if ttl > ttlLimit {
		ttl = ttlLimit
	}
	return ttl
}