downloader.go 60 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package downloader contains the manual full chain synchronisation.
  17. package downloader
  18. import (
  19. "crypto/rand"
  20. "errors"
  21. "fmt"
  22. "math"
  23. "math/big"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. ethereum "github.com/ethereum/go-ethereum"
  29. "github.com/ethereum/go-ethereum/common"
  30. "github.com/ethereum/go-ethereum/core/types"
  31. "github.com/ethereum/go-ethereum/ethdb"
  32. "github.com/ethereum/go-ethereum/event"
  33. "github.com/ethereum/go-ethereum/logger"
  34. "github.com/ethereum/go-ethereum/logger/glog"
  35. "github.com/ethereum/go-ethereum/params"
  36. "github.com/ethereum/go-ethereum/trie"
  37. "github.com/rcrowley/go-metrics"
  38. )
  39. var (
  40. MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request
  41. MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
  42. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  43. MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
  44. MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request
  45. MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
  46. MaxStateFetch = 384 // Amount of node state values to allow fetching per request
  47. MaxForkAncestry = 3 * params.EpochDuration.Uint64() // Maximum chain reorganisation
  48. rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests
  49. rttMaxEstimate = 20 * time.Second // Maximum rount-trip time to target for download requests
  50. rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value
  51. ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
  52. ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts
  53. qosTuningPeers = 5 // Number of peers to tune based on (best peers)
  54. qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
  55. qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
  56. maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
  57. maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
  58. maxResultsProcess = 2048 // Number of content download results to import at once into the chain
  59. fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
  60. fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
  61. fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
  62. fsPivotInterval = 512 // Number of headers out of which to randomize the pivot point
  63. fsMinFullBlocks = 1024 // Number of blocks to retrieve fully even in fast sync
  64. fsCriticalTrials = uint32(32) // Number of times to retry in the cricical section before bailing
  65. )
  66. var (
  67. errBusy = errors.New("busy")
  68. errUnknownPeer = errors.New("peer is unknown or unhealthy")
  69. errBadPeer = errors.New("action from bad peer ignored")
  70. errStallingPeer = errors.New("peer is stalling")
  71. errNoPeers = errors.New("no peers to keep download active")
  72. errTimeout = errors.New("timeout")
  73. errEmptyHeaderSet = errors.New("empty header set by peer")
  74. errPeersUnavailable = errors.New("no peers available or all tried for download")
  75. errInvalidAncestor = errors.New("retrieved ancestor is invalid")
  76. errInvalidChain = errors.New("retrieved hash chain is invalid")
  77. errInvalidBlock = errors.New("retrieved block is invalid")
  78. errInvalidBody = errors.New("retrieved block body is invalid")
  79. errInvalidReceipt = errors.New("retrieved receipt is invalid")
  80. errCancelBlockFetch = errors.New("block download canceled (requested)")
  81. errCancelHeaderFetch = errors.New("block header download canceled (requested)")
  82. errCancelBodyFetch = errors.New("block body download canceled (requested)")
  83. errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
  84. errCancelStateFetch = errors.New("state data download canceled (requested)")
  85. errCancelHeaderProcessing = errors.New("header processing canceled (requested)")
  86. errCancelContentProcessing = errors.New("content processing canceled (requested)")
  87. errNoSyncActive = errors.New("no sync active")
  88. errTooOld = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
  89. )
  90. type Downloader struct {
  91. mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
  92. mux *event.TypeMux // Event multiplexer to announce sync operation events
  93. queue *queue // Scheduler for selecting the hashes to download
  94. peers *peerSet // Set of active peers from which download can proceed
  95. fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
  96. fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section
  97. rttEstimate uint64 // Round trip time to target for download requests
  98. rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
  99. // Statistics
  100. syncStatsChainOrigin uint64 // Origin block number where syncing started at
  101. syncStatsChainHeight uint64 // Highest block number known when syncing started
  102. syncStatsStateDone uint64 // Number of state trie entries already pulled
  103. syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
  104. // Callbacks
  105. hasHeader headerCheckFn // Checks if a header is present in the chain
  106. hasBlockAndState blockAndStateCheckFn // Checks if a block and associated state is present in the chain
  107. getHeader headerRetrievalFn // Retrieves a header from the chain
  108. getBlock blockRetrievalFn // Retrieves a block from the chain
  109. headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
  110. headBlock headBlockRetrievalFn // Retrieves the head block from the chain
  111. headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
  112. commitHeadBlock headBlockCommitterFn // Commits a manually assembled block as the chain head
  113. getTd tdRetrievalFn // Retrieves the TD of a block from the chain
  114. insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
  115. insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
  116. insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
  117. rollback chainRollbackFn // Removes a batch of recently added chain links
  118. dropPeer peerDropFn // Drops a peer for misbehaving
  119. // Status
  120. synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
  121. synchronising int32
  122. notified int32
  123. // Channels
  124. newPeerCh chan *peer
  125. headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
  126. bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
  127. receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
  128. stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
  129. bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
  130. receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
  131. stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
  132. headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
  133. // Cancellation and termination
  134. cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
  135. cancelCh chan struct{} // Channel to cancel mid-flight syncs
  136. cancelLock sync.RWMutex // Lock to protect the cancel channel and peer in delivers
  137. quitCh chan struct{} // Quit channel to signal termination
  138. quitLock sync.RWMutex // Lock to prevent double closes
  139. // Testing hooks
  140. syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
  141. bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
  142. receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
  143. chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
  144. }
  145. // New creates a new downloader to fetch hashes and blocks from remote peers.
  146. func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlockAndState blockAndStateCheckFn,
  147. getHeader headerRetrievalFn, getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn,
  148. headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
  149. insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {
  150. dl := &Downloader{
  151. mode: FullSync,
  152. mux: mux,
  153. queue: newQueue(stateDb),
  154. peers: newPeerSet(),
  155. rttEstimate: uint64(rttMaxEstimate),
  156. rttConfidence: uint64(1000000),
  157. hasHeader: hasHeader,
  158. hasBlockAndState: hasBlockAndState,
  159. getHeader: getHeader,
  160. getBlock: getBlock,
  161. headHeader: headHeader,
  162. headBlock: headBlock,
  163. headFastBlock: headFastBlock,
  164. commitHeadBlock: commitHeadBlock,
  165. getTd: getTd,
  166. insertHeaders: insertHeaders,
  167. insertBlocks: insertBlocks,
  168. insertReceipts: insertReceipts,
  169. rollback: rollback,
  170. dropPeer: dropPeer,
  171. newPeerCh: make(chan *peer, 1),
  172. headerCh: make(chan dataPack, 1),
  173. bodyCh: make(chan dataPack, 1),
  174. receiptCh: make(chan dataPack, 1),
  175. stateCh: make(chan dataPack, 1),
  176. bodyWakeCh: make(chan bool, 1),
  177. receiptWakeCh: make(chan bool, 1),
  178. stateWakeCh: make(chan bool, 1),
  179. headerProcCh: make(chan []*types.Header, 1),
  180. quitCh: make(chan struct{}),
  181. }
  182. go dl.qosTuner()
  183. return dl
  184. }
  185. // Progress retrieves the synchronisation boundaries, specifically the origin
  186. // block where synchronisation started at (may have failed/suspended); the block
  187. // or header sync is currently at; and the latest known block which the sync targets.
  188. //
  189. // In addition, during the state download phase of fast synchronisation the number
  190. // of processed and the total number of known states are also returned. Otherwise
  191. // these are zero.
  192. func (d *Downloader) Progress() ethereum.SyncProgress {
  193. // Fetch the pending state count outside of the lock to prevent unforeseen deadlocks
  194. pendingStates := uint64(d.queue.PendingNodeData())
  195. // Lock the current stats and return the progress
  196. d.syncStatsLock.RLock()
  197. defer d.syncStatsLock.RUnlock()
  198. current := uint64(0)
  199. switch d.mode {
  200. case FullSync:
  201. current = d.headBlock().NumberU64()
  202. case FastSync:
  203. current = d.headFastBlock().NumberU64()
  204. case LightSync:
  205. current = d.headHeader().Number.Uint64()
  206. }
  207. return ethereum.SyncProgress{
  208. StartingBlock: d.syncStatsChainOrigin,
  209. CurrentBlock: current,
  210. HighestBlock: d.syncStatsChainHeight,
  211. PulledStates: d.syncStatsStateDone,
  212. KnownStates: d.syncStatsStateDone + pendingStates,
  213. }
  214. }
  215. // Synchronising returns whether the downloader is currently retrieving blocks.
  216. func (d *Downloader) Synchronising() bool {
  217. return atomic.LoadInt32(&d.synchronising) > 0
  218. }
  219. // RegisterPeer injects a new download peer into the set of block source to be
  220. // used for fetching hashes and blocks from.
  221. func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn,
  222. getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
  223. getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
  224. glog.V(logger.Detail).Infoln("Registering peer", id)
  225. if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
  226. glog.V(logger.Error).Infoln("Register failed:", err)
  227. return err
  228. }
  229. d.qosReduceConfidence()
  230. return nil
  231. }
  232. // UnregisterPeer remove a peer from the known list, preventing any action from
  233. // the specified peer. An effort is also made to return any pending fetches into
  234. // the queue.
  235. func (d *Downloader) UnregisterPeer(id string) error {
  236. // Unregister the peer from the active peer set and revoke any fetch tasks
  237. glog.V(logger.Detail).Infoln("Unregistering peer", id)
  238. if err := d.peers.Unregister(id); err != nil {
  239. glog.V(logger.Error).Infoln("Unregister failed:", err)
  240. return err
  241. }
  242. d.queue.Revoke(id)
  243. // If this peer was the master peer, abort sync immediately
  244. d.cancelLock.RLock()
  245. master := id == d.cancelPeer
  246. d.cancelLock.RUnlock()
  247. if master {
  248. d.cancel()
  249. }
  250. return nil
  251. }
  252. // Synchronise tries to sync up our local block chain with a remote peer, both
  253. // adding various sanity checks as well as wrapping it with various log entries.
  254. func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
  255. glog.V(logger.Detail).Infof("Attempting synchronisation: %v, head [%x…], TD %v", id, head[:4], td)
  256. err := d.synchronise(id, head, td, mode)
  257. switch err {
  258. case nil:
  259. glog.V(logger.Detail).Infof("Synchronisation completed")
  260. case errBusy:
  261. glog.V(logger.Detail).Infof("Synchronisation already in progress")
  262. case errTimeout, errBadPeer, errStallingPeer,
  263. errEmptyHeaderSet, errPeersUnavailable, errTooOld,
  264. errInvalidAncestor, errInvalidChain:
  265. glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
  266. d.dropPeer(id)
  267. default:
  268. glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
  269. }
  270. return err
  271. }
  272. // synchronise will select the peer and use it for synchronising. If an empty string is given
  273. // it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
  274. // checks fail an error will be returned. This method is synchronous
  275. func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
  276. // Mock out the synchronisation if testing
  277. if d.synchroniseMock != nil {
  278. return d.synchroniseMock(id, hash)
  279. }
  280. // Make sure only one goroutine is ever allowed past this point at once
  281. if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
  282. return errBusy
  283. }
  284. defer atomic.StoreInt32(&d.synchronising, 0)
  285. // Post a user notification of the sync (only once per session)
  286. if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
  287. glog.V(logger.Info).Infoln("Block synchronisation started")
  288. }
  289. // Reset the queue, peer set and wake channels to clean any internal leftover state
  290. d.queue.Reset()
  291. d.peers.Reset()
  292. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  293. select {
  294. case <-ch:
  295. default:
  296. }
  297. }
  298. for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
  299. for empty := false; !empty; {
  300. select {
  301. case <-ch:
  302. default:
  303. empty = true
  304. }
  305. }
  306. }
  307. for empty := false; !empty; {
  308. select {
  309. case <-d.headerProcCh:
  310. default:
  311. empty = true
  312. }
  313. }
  314. // Create cancel channel for aborting mid-flight and mark the master peer
  315. d.cancelLock.Lock()
  316. d.cancelCh = make(chan struct{})
  317. d.cancelPeer = id
  318. d.cancelLock.Unlock()
  319. defer d.cancel() // No matter what, we can't leave the cancel channel open
  320. // Set the requested sync mode, unless it's forbidden
  321. d.mode = mode
  322. if d.mode == FastSync && atomic.LoadUint32(&d.fsPivotFails) >= fsCriticalTrials {
  323. d.mode = FullSync
  324. }
  325. // Retrieve the origin peer and initiate the downloading process
  326. p := d.peers.Peer(id)
  327. if p == nil {
  328. return errUnknownPeer
  329. }
  330. return d.syncWithPeer(p, hash, td)
  331. }
  332. // syncWithPeer starts a block synchronization based on the hash chain from the
  333. // specified peer and head hash.
  334. func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
  335. d.mux.Post(StartEvent{})
  336. defer func() {
  337. // reset on error
  338. if err != nil {
  339. d.mux.Post(FailedEvent{err})
  340. } else {
  341. d.mux.Post(DoneEvent{})
  342. }
  343. }()
  344. if p.version < 62 {
  345. return errTooOld
  346. }
  347. glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version)
  348. defer func(start time.Time) {
  349. glog.V(logger.Debug).Infof("Synchronisation terminated after %v", time.Since(start))
  350. }(time.Now())
  351. // Look up the sync boundaries: the common ancestor and the target block
  352. latest, err := d.fetchHeight(p)
  353. if err != nil {
  354. return err
  355. }
  356. height := latest.Number.Uint64()
  357. origin, err := d.findAncestor(p, height)
  358. if err != nil {
  359. return err
  360. }
  361. d.syncStatsLock.Lock()
  362. if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
  363. d.syncStatsChainOrigin = origin
  364. }
  365. d.syncStatsChainHeight = height
  366. d.syncStatsLock.Unlock()
  367. // Initiate the sync using a concurrent header and content retrieval algorithm
  368. pivot := uint64(0)
  369. switch d.mode {
  370. case LightSync:
  371. pivot = height
  372. case FastSync:
  373. // Calculate the new fast/slow sync pivot point
  374. if d.fsPivotLock == nil {
  375. pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
  376. if err != nil {
  377. panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
  378. }
  379. if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
  380. pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
  381. }
  382. } else {
  383. // Pivot point locked in, use this and do not pick a new one!
  384. pivot = d.fsPivotLock.Number.Uint64()
  385. }
  386. // If the point is below the origin, move origin back to ensure state download
  387. if pivot < origin {
  388. if pivot > 0 {
  389. origin = pivot - 1
  390. } else {
  391. origin = 0
  392. }
  393. }
  394. glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
  395. }
  396. d.queue.Prepare(origin+1, d.mode, pivot, latest)
  397. if d.syncInitHook != nil {
  398. d.syncInitHook(origin, height)
  399. }
  400. return d.spawnSync(origin+1,
  401. func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
  402. func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
  403. func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
  404. func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
  405. func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
  406. )
  407. }
  408. // spawnSync runs d.process and all given fetcher functions to completion in
  409. // separate goroutines, returning the first error that appears.
  410. func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
  411. var wg sync.WaitGroup
  412. errc := make(chan error, len(fetchers)+1)
  413. wg.Add(len(fetchers) + 1)
  414. go func() { defer wg.Done(); errc <- d.processContent() }()
  415. for _, fn := range fetchers {
  416. fn := fn
  417. go func() { defer wg.Done(); errc <- fn() }()
  418. }
  419. // Wait for the first error, then terminate the others.
  420. var err error
  421. for i := 0; i < len(fetchers)+1; i++ {
  422. if i == len(fetchers) {
  423. // Close the queue when all fetchers have exited.
  424. // This will cause the block processor to end when
  425. // it has processed the queue.
  426. d.queue.Close()
  427. }
  428. if err = <-errc; err != nil {
  429. break
  430. }
  431. }
  432. d.queue.Close()
  433. d.cancel()
  434. wg.Wait()
  435. return err
  436. }
  437. // cancel cancels all of the operations and resets the queue. It returns true
  438. // if the cancel operation was completed.
  439. func (d *Downloader) cancel() {
  440. // Close the current cancel channel
  441. d.cancelLock.Lock()
  442. if d.cancelCh != nil {
  443. select {
  444. case <-d.cancelCh:
  445. // Channel was already closed
  446. default:
  447. close(d.cancelCh)
  448. }
  449. }
  450. d.cancelLock.Unlock()
  451. }
  452. // Terminate interrupts the downloader, canceling all pending operations.
  453. // The downloader cannot be reused after calling Terminate.
  454. func (d *Downloader) Terminate() {
  455. // Close the termination channel (make sure double close is allowed)
  456. d.quitLock.Lock()
  457. select {
  458. case <-d.quitCh:
  459. default:
  460. close(d.quitCh)
  461. }
  462. d.quitLock.Unlock()
  463. // Cancel any pending download requests
  464. d.cancel()
  465. }
  466. // fetchHeight retrieves the head header of the remote peer to aid in estimating
  467. // the total time a pending synchronisation would take.
  468. func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
  469. glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
  470. // Request the advertised remote head block and wait for the response
  471. head, _ := p.currentHead()
  472. go p.getRelHeaders(head, 1, 0, false)
  473. timeout := time.After(d.requestTTL())
  474. for {
  475. select {
  476. case <-d.cancelCh:
  477. return nil, errCancelBlockFetch
  478. case packet := <-d.headerCh:
  479. // Discard anything not from the origin peer
  480. if packet.PeerId() != p.id {
  481. glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
  482. break
  483. }
  484. // Make sure the peer actually gave something valid
  485. headers := packet.(*headerPack).headers
  486. if len(headers) != 1 {
  487. glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
  488. return nil, errBadPeer
  489. }
  490. return headers[0], nil
  491. case <-timeout:
  492. glog.V(logger.Debug).Infof("%v: head header timeout", p)
  493. return nil, errTimeout
  494. case <-d.bodyCh:
  495. case <-d.stateCh:
  496. case <-d.receiptCh:
  497. // Out of bounds delivery, ignore
  498. }
  499. }
  500. }
  501. // findAncestor tries to locate the common ancestor link of the local chain and
  502. // a remote peers blockchain. In the general case when our node was in sync and
  503. // on the correct chain, checking the top N links should already get us a match.
  504. // In the rare scenario when we ended up on a long reorganisation (i.e. none of
  505. // the head links match), we do a binary search to find the common ancestor.
  506. func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
  507. glog.V(logger.Debug).Infof("%v: looking for common ancestor (remote height %d)", p, height)
  508. // Figure out the valid ancestor range to prevent rewrite attacks
  509. floor, ceil := int64(-1), d.headHeader().Number.Uint64()
  510. if d.mode == FullSync {
  511. ceil = d.headBlock().NumberU64()
  512. } else if d.mode == FastSync {
  513. ceil = d.headFastBlock().NumberU64()
  514. }
  515. if ceil >= MaxForkAncestry {
  516. floor = int64(ceil - MaxForkAncestry)
  517. }
  518. // Request the topmost blocks to short circuit binary ancestor lookup
  519. head := ceil
  520. if head > height {
  521. head = height
  522. }
  523. from := int64(head) - int64(MaxHeaderFetch)
  524. if from < 0 {
  525. from = 0
  526. }
  527. // Span out with 15 block gaps into the future to catch bad head reports
  528. limit := 2 * MaxHeaderFetch / 16
  529. count := 1 + int((int64(ceil)-from)/16)
  530. if count > limit {
  531. count = limit
  532. }
  533. go p.getAbsHeaders(uint64(from), count, 15, false)
  534. // Wait for the remote response to the head fetch
  535. number, hash := uint64(0), common.Hash{}
  536. timeout := time.After(d.requestTTL())
  537. for finished := false; !finished; {
  538. select {
  539. case <-d.cancelCh:
  540. return 0, errCancelHeaderFetch
  541. case packet := <-d.headerCh:
  542. // Discard anything not from the origin peer
  543. if packet.PeerId() != p.id {
  544. glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
  545. break
  546. }
  547. // Make sure the peer actually gave something valid
  548. headers := packet.(*headerPack).headers
  549. if len(headers) == 0 {
  550. glog.V(logger.Warn).Infof("%v: empty head header set", p)
  551. return 0, errEmptyHeaderSet
  552. }
  553. // Make sure the peer's reply conforms to the request
  554. for i := 0; i < len(headers); i++ {
  555. if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
  556. glog.V(logger.Warn).Infof("%v: head header set (item %d) broke chain ordering: requested %d, got %d", p, i, from+int64(i)*16, number)
  557. return 0, errInvalidChain
  558. }
  559. }
  560. // Check if a common ancestor was found
  561. finished = true
  562. for i := len(headers) - 1; i >= 0; i-- {
  563. // Skip any headers that underflow/overflow our requested set
  564. if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil {
  565. continue
  566. }
  567. // Otherwise check if we already know the header or not
  568. if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
  569. number, hash = headers[i].Number.Uint64(), headers[i].Hash()
  570. // If every header is known, even future ones, the peer straight out lied about its head
  571. if number > height && i == limit-1 {
  572. glog.V(logger.Warn).Infof("%v: lied about chain head: reported %d, found above %d", p, height, number)
  573. return 0, errStallingPeer
  574. }
  575. break
  576. }
  577. }
  578. case <-timeout:
  579. glog.V(logger.Debug).Infof("%v: head header timeout", p)
  580. return 0, errTimeout
  581. case <-d.bodyCh:
  582. case <-d.stateCh:
  583. case <-d.receiptCh:
  584. // Out of bounds delivery, ignore
  585. }
  586. }
  587. // If the head fetch already found an ancestor, return
  588. if !common.EmptyHash(hash) {
  589. if int64(number) <= floor {
  590. glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, number, hash[:4], floor)
  591. return 0, errInvalidAncestor
  592. }
  593. glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4])
  594. return number, nil
  595. }
  596. // Ancestor not found, we need to binary search over our chain
  597. start, end := uint64(0), head
  598. if floor > 0 {
  599. start = uint64(floor)
  600. }
  601. for start+1 < end {
  602. // Split our chain interval in two, and request the hash to cross check
  603. check := (start + end) / 2
  604. timeout := time.After(d.requestTTL())
  605. go p.getAbsHeaders(uint64(check), 1, 0, false)
  606. // Wait until a reply arrives to this request
  607. for arrived := false; !arrived; {
  608. select {
  609. case <-d.cancelCh:
  610. return 0, errCancelHeaderFetch
  611. case packer := <-d.headerCh:
  612. // Discard anything not from the origin peer
  613. if packer.PeerId() != p.id {
  614. glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packer.PeerId())
  615. break
  616. }
  617. // Make sure the peer actually gave something valid
  618. headers := packer.(*headerPack).headers
  619. if len(headers) != 1 {
  620. glog.V(logger.Debug).Infof("%v: invalid search header set (%d)", p, len(headers))
  621. return 0, errBadPeer
  622. }
  623. arrived = true
  624. // Modify the search interval based on the response
  625. if (d.mode == FullSync && !d.hasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) {
  626. end = check
  627. break
  628. }
  629. header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
  630. if header.Number.Uint64() != check {
  631. glog.V(logger.Debug).Infof("%v: non requested header #%d [%x…], instead of #%d", p, header.Number, header.Hash().Bytes()[:4], check)
  632. return 0, errBadPeer
  633. }
  634. start = check
  635. case <-timeout:
  636. glog.V(logger.Debug).Infof("%v: search header timeout", p)
  637. return 0, errTimeout
  638. case <-d.bodyCh:
  639. case <-d.stateCh:
  640. case <-d.receiptCh:
  641. // Out of bounds delivery, ignore
  642. }
  643. }
  644. }
  645. // Ensure valid ancestry and return
  646. if int64(start) <= floor {
  647. glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, start, hash[:4], floor)
  648. return 0, errInvalidAncestor
  649. }
  650. glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, start, hash[:4])
  651. return start, nil
  652. }
  653. // fetchHeaders keeps retrieving headers concurrently from the number
  654. // requested, until no more are returned, potentially throttling on the way. To
  655. // facilitate concurrency but still protect against malicious nodes sending bad
  656. // headers, we construct a header chain skeleton using the "origin" peer we are
  657. // syncing with, and fill in the missing headers using anyone else. Headers from
  658. // other peers are only accepted if they map cleanly to the skeleton. If no one
  659. // can fill in the skeleton - not even the origin peer - it's assumed invalid and
  660. // the origin is dropped.
  661. func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
  662. glog.V(logger.Debug).Infof("%v: directing header downloads from #%d", p, from)
  663. defer glog.V(logger.Debug).Infof("%v: header download terminated", p)
  664. // Create a timeout timer, and the associated header fetcher
  665. skeleton := true // Skeleton assembly phase or finishing up
  666. request := time.Now() // time of the last skeleton fetch request
  667. timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
  668. <-timeout.C // timeout channel should be initially empty
  669. defer timeout.Stop()
  670. getHeaders := func(from uint64) {
  671. request = time.Now()
  672. timeout.Reset(d.requestTTL())
  673. if skeleton {
  674. glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)
  675. go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
  676. } else {
  677. glog.V(logger.Detail).Infof("%v: fetching %d full headers from #%d", p, MaxHeaderFetch, from)
  678. go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
  679. }
  680. }
  681. // Start pulling the header chain skeleton until all is done
  682. getHeaders(from)
  683. for {
  684. select {
  685. case <-d.cancelCh:
  686. return errCancelHeaderFetch
  687. case packet := <-d.headerCh:
  688. // Make sure the active peer is giving us the skeleton headers
  689. if packet.PeerId() != p.id {
  690. glog.V(logger.Debug).Infof("Received skeleton headers from incorrect peer (%s)", packet.PeerId())
  691. break
  692. }
  693. headerReqTimer.UpdateSince(request)
  694. timeout.Stop()
  695. // If the skeleton's finished, pull any remaining head headers directly from the origin
  696. if packet.Items() == 0 && skeleton {
  697. skeleton = false
  698. getHeaders(from)
  699. continue
  700. }
  701. // If no more headers are inbound, notify the content fetchers and return
  702. if packet.Items() == 0 {
  703. glog.V(logger.Debug).Infof("%v: no available headers", p)
  704. select {
  705. case d.headerProcCh <- nil:
  706. return nil
  707. case <-d.cancelCh:
  708. return errCancelHeaderFetch
  709. }
  710. }
  711. headers := packet.(*headerPack).headers
  712. // If we received a skeleton batch, resolve internals concurrently
  713. if skeleton {
  714. filled, proced, err := d.fillHeaderSkeleton(from, headers)
  715. if err != nil {
  716. glog.V(logger.Debug).Infof("%v: skeleton chain invalid: %v", p, err)
  717. return errInvalidChain
  718. }
  719. headers = filled[proced:]
  720. from += uint64(proced)
  721. }
  722. // Insert all the new headers and fetch the next batch
  723. if len(headers) > 0 {
  724. glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
  725. select {
  726. case d.headerProcCh <- headers:
  727. case <-d.cancelCh:
  728. return errCancelHeaderFetch
  729. }
  730. from += uint64(len(headers))
  731. }
  732. getHeaders(from)
  733. case <-timeout.C:
  734. // Header retrieval timed out, consider the peer bad and drop
  735. glog.V(logger.Debug).Infof("%v: header request timed out", p)
  736. headerTimeoutMeter.Mark(1)
  737. d.dropPeer(p.id)
  738. // Finish the sync gracefully instead of dumping the gathered data though
  739. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  740. select {
  741. case ch <- false:
  742. case <-d.cancelCh:
  743. }
  744. }
  745. select {
  746. case d.headerProcCh <- nil:
  747. case <-d.cancelCh:
  748. }
  749. return errBadPeer
  750. }
  751. }
  752. }
  753. // fillHeaderSkeleton concurrently retrieves headers from all our available peers
  754. // and maps them to the provided skeleton header chain.
  755. //
  756. // Any partial results from the beginning of the skeleton is (if possible) forwarded
  757. // immediately to the header processor to keep the rest of the pipeline full even
  758. // in the case of header stalls.
  759. //
  760. // The method returs the entire filled skeleton and also the number of headers
  761. // already forwarded for processing.
  762. func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
  763. glog.V(logger.Debug).Infof("Filling up skeleton from #%d", from)
  764. d.queue.ScheduleSkeleton(from, skeleton)
  765. var (
  766. deliver = func(packet dataPack) (int, error) {
  767. pack := packet.(*headerPack)
  768. return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
  769. }
  770. expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
  771. throttle = func() bool { return false }
  772. reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
  773. return d.queue.ReserveHeaders(p, count), false, nil
  774. }
  775. fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
  776. capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) }
  777. setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
  778. )
  779. err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
  780. d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
  781. nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header")
  782. glog.V(logger.Debug).Infof("Skeleton fill terminated: %v", err)
  783. filled, proced := d.queue.RetrieveHeaders()
  784. return filled, proced, err
  785. }
  786. // fetchBodies iteratively downloads the scheduled block bodies, taking any
  787. // available peers, reserving a chunk of blocks for each, waiting for delivery
  788. // and also periodically checking for timeouts.
  789. func (d *Downloader) fetchBodies(from uint64) error {
  790. glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
  791. var (
  792. deliver = func(packet dataPack) (int, error) {
  793. pack := packet.(*bodyPack)
  794. return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
  795. }
  796. expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
  797. fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
  798. capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) }
  799. setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
  800. )
  801. err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
  802. d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
  803. d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "Body")
  804. glog.V(logger.Debug).Infof("Block body download terminated: %v", err)
  805. return err
  806. }
  807. // fetchReceipts iteratively downloads the scheduled block receipts, taking any
  808. // available peers, reserving a chunk of receipts for each, waiting for delivery
  809. // and also periodically checking for timeouts.
  810. func (d *Downloader) fetchReceipts(from uint64) error {
  811. glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
  812. var (
  813. deliver = func(packet dataPack) (int, error) {
  814. pack := packet.(*receiptPack)
  815. return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
  816. }
  817. expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
  818. fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
  819. capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) }
  820. setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
  821. )
  822. err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
  823. d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
  824. d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt")
  825. glog.V(logger.Debug).Infof("Receipt download terminated: %v", err)
  826. return err
  827. }
  828. // fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
  829. // available peers, reserving a chunk of nodes for each, waiting for delivery and
  830. // also periodically checking for timeouts.
  831. func (d *Downloader) fetchNodeData() error {
  832. glog.V(logger.Debug).Infof("Downloading node state data")
  833. var (
  834. deliver = func(packet dataPack) (int, error) {
  835. start := time.Now()
  836. return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) {
  837. // If the peer returned old-requested data, forgive
  838. if err == trie.ErrNotRequested {
  839. glog.V(logger.Info).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId())
  840. return
  841. }
  842. if err != nil {
  843. // If the node data processing failed, the root hash is very wrong, abort
  844. glog.V(logger.Error).Infof("peer %d: state processing failed: %v", packet.PeerId(), err)
  845. d.cancel()
  846. return
  847. }
  848. // Processing succeeded, notify state fetcher of continuation
  849. pending := d.queue.PendingNodeData()
  850. if pending > 0 {
  851. select {
  852. case d.stateWakeCh <- true:
  853. default:
  854. }
  855. }
  856. d.syncStatsLock.Lock()
  857. d.syncStatsStateDone += uint64(delivered)
  858. syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below
  859. d.syncStatsLock.Unlock()
  860. // If real database progress was made, reset any fast-sync pivot failure
  861. if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 {
  862. glog.V(logger.Debug).Infof("fast-sync progressed, resetting fail counter from %d", atomic.LoadUint32(&d.fsPivotFails))
  863. atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
  864. }
  865. // Log a message to the user and return
  866. if delivered > 0 {
  867. glog.V(logger.Info).Infof("imported %3d state entries in %9v: processed %d, pending at least %d", delivered, common.PrettyDuration(time.Since(start)), syncStatsStateDone, pending)
  868. }
  869. })
  870. }
  871. expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
  872. throttle = func() bool { return false }
  873. reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
  874. return d.queue.ReserveNodeData(p, count), false, nil
  875. }
  876. fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
  877. capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
  878. setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
  879. )
  880. err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
  881. d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
  882. d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "State")
  883. glog.V(logger.Debug).Infof("Node state data download terminated: %v", err)
  884. return err
  885. }
  886. // fetchParts iteratively downloads scheduled block parts, taking any available
  887. // peers, reserving a chunk of fetch requests for each, waiting for delivery and
  888. // also periodically checking for timeouts.
  889. //
  890. // As the scheduling/timeout logic mostly is the same for all downloaded data
  891. // types, this method is used by each for data gathering and is instrumented with
  892. // various callbacks to handle the slight differences between processing them.
  893. //
  894. // The instrumentation parameters:
  895. // - errCancel: error type to return if the fetch operation is cancelled (mostly makes logging nicer)
  896. // - deliveryCh: channel from which to retrieve downloaded data packets (merged from all concurrent peers)
  897. // - deliver: processing callback to deliver data packets into type specific download queues (usually within `queue`)
  898. // - wakeCh: notification channel for waking the fetcher when new tasks are available (or sync completed)
  899. // - expire: task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
  900. // - pending: task callback for the number of requests still needing download (detect completion/non-completability)
  901. // - inFlight: task callback for the number of in-progress requests (wait for all active downloads to finish)
  902. // - throttle: task callback to check if the processing queue is full and activate throttling (bound memory use)
  903. // - reserve: task callback to reserve new download tasks to a particular peer (also signals partial completions)
  904. // - fetchHook: tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
  905. // - fetch: network callback to actually send a particular download request to a physical remote peer
  906. // - cancel: task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
  907. // - capacity: network callback to retreive the estimated type-specific bandwidth capacity of a peer (traffic shaping)
  908. // - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
  909. // - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
  910. // - kind: textual label of the type being downloaded to display in log mesages
  911. func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
  912. expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
  913. fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
  914. idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
  915. // Create a ticker to detect expired retrieval tasks
  916. ticker := time.NewTicker(100 * time.Millisecond)
  917. defer ticker.Stop()
  918. update := make(chan struct{}, 1)
  919. // Prepare the queue and fetch block parts until the block header fetcher's done
  920. finished := false
  921. for {
  922. select {
  923. case <-d.cancelCh:
  924. return errCancel
  925. case packet := <-deliveryCh:
  926. // If the peer was previously banned and failed to deliver it's pack
  927. // in a reasonable time frame, ignore it's message.
  928. if peer := d.peers.Peer(packet.PeerId()); peer != nil {
  929. // Deliver the received chunk of data and check chain validity
  930. accepted, err := deliver(packet)
  931. if err == errInvalidChain {
  932. return err
  933. }
  934. // Unless a peer delivered something completely else than requested (usually
  935. // caused by a timed out request which came through in the end), set it to
  936. // idle. If the delivery's stale, the peer should have already been idled.
  937. if err != errStaleDelivery {
  938. setIdle(peer, accepted)
  939. }
  940. // Issue a log to the user to see what's going on
  941. switch {
  942. case err == nil && packet.Items() == 0:
  943. glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
  944. case err == nil:
  945. glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
  946. default:
  947. glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err)
  948. }
  949. }
  950. // Blocks assembled, try to update the progress
  951. select {
  952. case update <- struct{}{}:
  953. default:
  954. }
  955. case cont := <-wakeCh:
  956. // The header fetcher sent a continuation flag, check if it's done
  957. if !cont {
  958. finished = true
  959. }
  960. // Headers arrive, try to update the progress
  961. select {
  962. case update <- struct{}{}:
  963. default:
  964. }
  965. case <-ticker.C:
  966. // Sanity check update the progress
  967. select {
  968. case update <- struct{}{}:
  969. default:
  970. }
  971. case <-update:
  972. // Short circuit if we lost all our peers
  973. if d.peers.Len() == 0 {
  974. return errNoPeers
  975. }
  976. // Check for fetch request timeouts and demote the responsible peers
  977. for pid, fails := range expire() {
  978. if peer := d.peers.Peer(pid); peer != nil {
  979. // If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
  980. // ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal times
  981. // out that sync wise we need to get rid of the peer.
  982. //
  983. // The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth
  984. // and latency of a peer separately, which requires pushing the measures capacity a bit and seeing
  985. // how response times reacts, to it always requests one more than the minimum (i.e. min 2).
  986. if fails > 2 {
  987. glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
  988. setIdle(peer, 0)
  989. } else {
  990. glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind))
  991. d.dropPeer(pid)
  992. }
  993. }
  994. }
  995. // If there's nothing more to fetch, wait or terminate
  996. if pending() == 0 {
  997. if !inFlight() && finished {
  998. glog.V(logger.Debug).Infof("%s fetching completed", kind)
  999. return nil
  1000. }
  1001. break
  1002. }
  1003. // Send a download request to all idle peers, until throttled
  1004. progressed, throttled, running := false, false, inFlight()
  1005. idles, total := idle()
  1006. for _, peer := range idles {
  1007. // Short circuit if throttling activated
  1008. if throttle() {
  1009. throttled = true
  1010. break
  1011. }
  1012. // Reserve a chunk of fetches for a peer. A nil can mean either that
  1013. // no more headers are available, or that the peer is known not to
  1014. // have them.
  1015. request, progress, err := reserve(peer, capacity(peer))
  1016. if err != nil {
  1017. return err
  1018. }
  1019. if progress {
  1020. progressed = true
  1021. }
  1022. if request == nil {
  1023. continue
  1024. }
  1025. if glog.V(logger.Detail) {
  1026. if request.From > 0 {
  1027. glog.Infof("%s: requesting %s(s) from #%d", peer, strings.ToLower(kind), request.From)
  1028. } else if len(request.Headers) > 0 {
  1029. glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
  1030. } else {
  1031. glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
  1032. }
  1033. }
  1034. // Fetch the chunk and make sure any errors return the hashes to the queue
  1035. if fetchHook != nil {
  1036. fetchHook(request.Headers)
  1037. }
  1038. if err := fetch(peer, request); err != nil {
  1039. // Although we could try and make an attempt to fix this, this error really
  1040. // means that we've double allocated a fetch task to a peer. If that is the
  1041. // case, the internal state of the downloader and the queue is very wrong so
  1042. // better hard crash and note the error instead of silently accumulating into
  1043. // a much bigger issue.
  1044. panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, strings.ToLower(kind)))
  1045. }
  1046. running = true
  1047. }
  1048. // Make sure that we have peers available for fetching. If all peers have been tried
  1049. // and all failed throw an error
  1050. if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
  1051. return errPeersUnavailable
  1052. }
  1053. }
  1054. }
  1055. }
  1056. // processHeaders takes batches of retrieved headers from an input channel and
  1057. // keeps processing and scheduling them into the header chain and downloader's
  1058. // queue until the stream ends or a failure occurs.
  1059. func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
  1060. // Calculate the pivoting point for switching from fast to slow sync
  1061. pivot := d.queue.FastSyncPivot()
  1062. // Keep a count of uncertain headers to roll back
  1063. rollback := []*types.Header{}
  1064. defer func() {
  1065. if len(rollback) > 0 {
  1066. // Flatten the headers and roll them back
  1067. hashes := make([]common.Hash, len(rollback))
  1068. for i, header := range rollback {
  1069. hashes[i] = header.Hash()
  1070. }
  1071. lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number()
  1072. d.rollback(hashes)
  1073. glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
  1074. len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number())
  1075. // If we're already past the pivot point, this could be an attack, thread carefully
  1076. if rollback[len(rollback)-1].Number.Uint64() > pivot {
  1077. // If we didn't ever fail, lock in te pivot header (must! not! change!)
  1078. if atomic.LoadUint32(&d.fsPivotFails) == 0 {
  1079. for _, header := range rollback {
  1080. if header.Number.Uint64() == pivot {
  1081. glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4])
  1082. d.fsPivotLock = header
  1083. }
  1084. }
  1085. }
  1086. atomic.AddUint32(&d.fsPivotFails, 1)
  1087. }
  1088. }
  1089. }()
  1090. // Wait for batches of headers to process
  1091. gotHeaders := false
  1092. for {
  1093. select {
  1094. case <-d.cancelCh:
  1095. return errCancelHeaderProcessing
  1096. case headers := <-d.headerProcCh:
  1097. // Terminate header processing if we synced up
  1098. if len(headers) == 0 {
  1099. // Notify everyone that headers are fully processed
  1100. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  1101. select {
  1102. case ch <- false:
  1103. case <-d.cancelCh:
  1104. }
  1105. }
  1106. // If no headers were retrieved at all, the peer violated it's TD promise that it had a
  1107. // better chain compared to ours. The only exception is if it's promised blocks were
  1108. // already imported by other means (e.g. fecher):
  1109. //
  1110. // R <remote peer>, L <local node>: Both at block 10
  1111. // R: Mine block 11, and propagate it to L
  1112. // L: Queue block 11 for import
  1113. // L: Notice that R's head and TD increased compared to ours, start sync
  1114. // L: Import of block 11 finishes
  1115. // L: Sync begins, and finds common ancestor at 11
  1116. // L: Request new headers up from 11 (R's TD was higher, it must have something)
  1117. // R: Nothing to give
  1118. if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
  1119. return errStallingPeer
  1120. }
  1121. // If fast or light syncing, ensure promised headers are indeed delivered. This is
  1122. // needed to detect scenarios where an attacker feeds a bad pivot and then bails out
  1123. // of delivering the post-pivot blocks that would flag the invalid content.
  1124. //
  1125. // This check cannot be executed "as is" for full imports, since blocks may still be
  1126. // queued for processing when the header download completes. However, as long as the
  1127. // peer gave us something useful, we're already happy/progressed (above check).
  1128. if d.mode == FastSync || d.mode == LightSync {
  1129. if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
  1130. return errStallingPeer
  1131. }
  1132. }
  1133. // Disable any rollback and return
  1134. rollback = nil
  1135. return nil
  1136. }
  1137. // Otherwise split the chunk of headers into batches and process them
  1138. gotHeaders = true
  1139. for len(headers) > 0 {
  1140. // Terminate if something failed in between processing chunks
  1141. select {
  1142. case <-d.cancelCh:
  1143. return errCancelHeaderProcessing
  1144. default:
  1145. }
  1146. // Select the next chunk of headers to import
  1147. limit := maxHeadersProcess
  1148. if limit > len(headers) {
  1149. limit = len(headers)
  1150. }
  1151. chunk := headers[:limit]
  1152. // In case of header only syncing, validate the chunk immediately
  1153. if d.mode == FastSync || d.mode == LightSync {
  1154. // Collect the yet unknown headers to mark them as uncertain
  1155. unknown := make([]*types.Header, 0, len(headers))
  1156. for _, header := range chunk {
  1157. if !d.hasHeader(header.Hash()) {
  1158. unknown = append(unknown, header)
  1159. }
  1160. }
  1161. // If we're importing pure headers, verify based on their recentness
  1162. frequency := fsHeaderCheckFrequency
  1163. if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
  1164. frequency = 1
  1165. }
  1166. if n, err := d.insertHeaders(chunk, frequency); err != nil {
  1167. // If some headers were inserted, add them too to the rollback list
  1168. if n > 0 {
  1169. rollback = append(rollback, chunk[:n]...)
  1170. }
  1171. glog.V(logger.Debug).Infof("invalid header #%d [%x…]: %v", chunk[n].Number, chunk[n].Hash().Bytes()[:4], err)
  1172. return errInvalidChain
  1173. }
  1174. // All verifications passed, store newly found uncertain headers
  1175. rollback = append(rollback, unknown...)
  1176. if len(rollback) > fsHeaderSafetyNet {
  1177. rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
  1178. }
  1179. }
  1180. // If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
  1181. if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
  1182. if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
  1183. glog.V(logger.Warn).Infof("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4])
  1184. return errInvalidChain
  1185. }
  1186. }
  1187. // Unless we're doing light chains, schedule the headers for associated content retrieval
  1188. if d.mode == FullSync || d.mode == FastSync {
  1189. // If we've reached the allowed number of pending headers, stall a bit
  1190. for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
  1191. select {
  1192. case <-d.cancelCh:
  1193. return errCancelHeaderProcessing
  1194. case <-time.After(time.Second):
  1195. }
  1196. }
  1197. // Otherwise insert the headers for content retrieval
  1198. inserts := d.queue.Schedule(chunk, origin)
  1199. if len(inserts) != len(chunk) {
  1200. glog.V(logger.Debug).Infof("stale headers")
  1201. return errBadPeer
  1202. }
  1203. }
  1204. headers = headers[limit:]
  1205. origin += uint64(limit)
  1206. }
  1207. // Signal the content downloaders of the availablility of new tasks
  1208. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  1209. select {
  1210. case ch <- true:
  1211. default:
  1212. }
  1213. }
  1214. }
  1215. }
  1216. }
  1217. // processContent takes fetch results from the queue and tries to import them
  1218. // into the chain. The type of import operation will depend on the result contents.
  1219. func (d *Downloader) processContent() error {
  1220. pivot := d.queue.FastSyncPivot()
  1221. for {
  1222. results := d.queue.WaitResults()
  1223. if len(results) == 0 {
  1224. return nil // queue empty
  1225. }
  1226. if d.chainInsertHook != nil {
  1227. d.chainInsertHook(results)
  1228. }
  1229. // Actually import the blocks
  1230. if glog.V(logger.Debug) {
  1231. first, last := results[0].Header, results[len(results)-1].Header
  1232. glog.Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4])
  1233. }
  1234. for len(results) != 0 {
  1235. // Check for any termination requests
  1236. select {
  1237. case <-d.quitCh:
  1238. return errCancelContentProcessing
  1239. default:
  1240. }
  1241. // Retrieve the a batch of results to import
  1242. var (
  1243. blocks = make([]*types.Block, 0, maxResultsProcess)
  1244. receipts = make([]types.Receipts, 0, maxResultsProcess)
  1245. )
  1246. items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
  1247. for _, result := range results[:items] {
  1248. switch {
  1249. case d.mode == FullSync:
  1250. blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
  1251. case d.mode == FastSync:
  1252. blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
  1253. if result.Header.Number.Uint64() <= pivot {
  1254. receipts = append(receipts, result.Receipts)
  1255. }
  1256. }
  1257. }
  1258. // Try to process the results, aborting if there's an error
  1259. var (
  1260. err error
  1261. index int
  1262. )
  1263. switch {
  1264. case len(receipts) > 0:
  1265. index, err = d.insertReceipts(blocks, receipts)
  1266. if err == nil && blocks[len(blocks)-1].NumberU64() == pivot {
  1267. glog.V(logger.Debug).Infof("Committing block #%d [%x…] as the new head", blocks[len(blocks)-1].Number(), blocks[len(blocks)-1].Hash().Bytes()[:4])
  1268. index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
  1269. }
  1270. default:
  1271. index, err = d.insertBlocks(blocks)
  1272. }
  1273. if err != nil {
  1274. glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
  1275. return errInvalidChain
  1276. }
  1277. // Shift the results to the next batch
  1278. results = results[items:]
  1279. }
  1280. }
  1281. }
  1282. // DeliverHeaders injects a new batch of block headers received from a remote
  1283. // node into the download schedule.
  1284. func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
  1285. return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
  1286. }
  1287. // DeliverBodies injects a new batch of block bodies received from a remote node.
  1288. func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
  1289. return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
  1290. }
  1291. // DeliverReceipts injects a new batch of receipts received from a remote node.
  1292. func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
  1293. return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
  1294. }
  1295. // DeliverNodeData injects a new batch of node state data received from a remote node.
  1296. func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
  1297. return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
  1298. }
  1299. // deliver injects a new batch of data received from a remote node.
  1300. func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
  1301. // Update the delivery metrics for both good and failed deliveries
  1302. inMeter.Mark(int64(packet.Items()))
  1303. defer func() {
  1304. if err != nil {
  1305. dropMeter.Mark(int64(packet.Items()))
  1306. }
  1307. }()
  1308. // Deliver or abort if the sync is canceled while queuing
  1309. d.cancelLock.RLock()
  1310. cancel := d.cancelCh
  1311. d.cancelLock.RUnlock()
  1312. if cancel == nil {
  1313. return errNoSyncActive
  1314. }
  1315. select {
  1316. case destCh <- packet:
  1317. return nil
  1318. case <-cancel:
  1319. return errNoSyncActive
  1320. }
  1321. }
  1322. // qosTuner is the quality of service tuning loop that occasionally gathers the
  1323. // peer latency statistics and updates the estimated request round trip time.
  1324. func (d *Downloader) qosTuner() {
  1325. for {
  1326. // Retrieve the current median RTT and integrate into the previoust target RTT
  1327. rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
  1328. atomic.StoreUint64(&d.rttEstimate, uint64(rtt))
  1329. // A new RTT cycle passed, increase our confidence in the estimated RTT
  1330. conf := atomic.LoadUint64(&d.rttConfidence)
  1331. conf = conf + (1000000-conf)/2
  1332. atomic.StoreUint64(&d.rttConfidence, conf)
  1333. // Log the new QoS values and sleep until the next RTT
  1334. glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
  1335. select {
  1336. case <-d.quitCh:
  1337. return
  1338. case <-time.After(rtt):
  1339. }
  1340. }
  1341. }
  1342. // qosReduceConfidence is meant to be called when a new peer joins the downloader's
  1343. // peer set, needing to reduce the confidence we have in out QoS estimates.
  1344. func (d *Downloader) qosReduceConfidence() {
  1345. // If we have a single peer, confidence is always 1
  1346. peers := uint64(d.peers.Len())
  1347. if peers == 1 {
  1348. atomic.StoreUint64(&d.rttConfidence, 1000000)
  1349. return
  1350. }
  1351. // If we have a ton of peers, don't drop confidence)
  1352. if peers >= uint64(qosConfidenceCap) {
  1353. return
  1354. }
  1355. // Otherwise drop the confidence factor
  1356. conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
  1357. if float64(conf)/1000000 < rttMinConfidence {
  1358. conf = uint64(rttMinConfidence * 1000000)
  1359. }
  1360. atomic.StoreUint64(&d.rttConfidence, conf)
  1361. rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
  1362. glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
  1363. }
  1364. // requestRTT returns the current target round trip time for a download request
  1365. // to complete in.
  1366. //
  1367. // Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
  1368. // the downloader tries to adapt queries to the RTT, so multiple RTT values can
  1369. // be adapted to, but smaller ones are preffered (stabler download stream).
  1370. func (d *Downloader) requestRTT() time.Duration {
  1371. return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
  1372. }
  1373. // requestTTL returns the current timeout allowance for a single download request
  1374. // to finish under.
  1375. func (d *Downloader) requestTTL() time.Duration {
  1376. var (
  1377. rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate))
  1378. conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
  1379. )
  1380. ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
  1381. if ttl > ttlLimit {
  1382. ttl = ttlLimit
  1383. }
  1384. return ttl
  1385. }