downloader.go 59 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package downloader contains the manual full chain synchronisation.
  17. package downloader
  18. import (
  19. "crypto/rand"
  20. "errors"
  21. "fmt"
  22. "math"
  23. "math/big"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. ethereum "github.com/ethereum/go-ethereum"
  29. "github.com/ethereum/go-ethereum/common"
  30. "github.com/ethereum/go-ethereum/core/types"
  31. "github.com/ethereum/go-ethereum/ethdb"
  32. "github.com/ethereum/go-ethereum/event"
  33. "github.com/ethereum/go-ethereum/logger"
  34. "github.com/ethereum/go-ethereum/logger/glog"
  35. "github.com/ethereum/go-ethereum/params"
  36. "github.com/ethereum/go-ethereum/trie"
  37. "github.com/rcrowley/go-metrics"
  38. )
// Tunable limits and heuristics of the downloader. They are declared as
// package-level variables, not constants.
var (
	MaxHashFetch    = 512 // Amount of hashes to be fetched per retrieval request
	MaxBlockFetch   = 128 // Amount of blocks to be fetched per retrieval request
	MaxHeaderFetch  = 192 // Amount of block headers to be fetched per retrieval request
	MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
	MaxBodyFetch    = 128 // Amount of block bodies to be fetched per retrieval request
	MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
	MaxStateFetch   = 384 // Amount of node state values to allow fetching per request

	MaxForkAncestry = 3 * params.EpochDuration.Uint64() // Maximum chain reorganisation (in blocks below our head) accepted as a common ancestor

	rttMinEstimate   = 2 * time.Second  // Minimum round-trip time to target for download requests
	rttMaxEstimate   = 20 * time.Second // Maximum round-trip time to target for download requests
	rttMinConfidence = 0.1              // Worst confidence factor in our estimated RTT value
	ttlScaling       = 3                // Constant scaling factor for RTT -> TTL conversion
	ttlLimit         = time.Minute      // Maximum TTL allowance to prevent reaching crazy timeouts

	qosTuningPeers   = 5    // Number of peers to tune based on (best peers)
	qosConfidenceCap = 10   // Number of peers above which not to modify RTT confidence
	qosTuningImpact  = 0.25 // Impact that a new tuning target has on the previous value

	maxQueuedHeaders  = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
	maxHeadersProcess = 2048      // Number of header download results to import at once into the chain
	maxResultsProcess = 2048      // Number of content download results to import at once into the chain

	fsHeaderCheckFrequency = 100  // Verification frequency of the downloaded headers during fast sync
	fsHeaderSafetyNet      = 2048 // Number of headers to discard in case a chain violation is detected
	fsHeaderForceVerify    = 24   // Number of headers to verify before and after the pivot to accept it
	fsPivotInterval        = 512  // Number of headers out of which to randomize the pivot point
	fsMinFullBlocks        = 1024 // Number of blocks to retrieve fully even in fast sync
	fsCriticalTrials       = 10   // Number of times to retry in the critical section before bailing
)
// Sentinel errors produced by the downloader. Callers compare them by
// identity; Synchronise, for instance, drops the remote peer for a subset of
// these (timeouts, bad/stalling peers, invalid ancestors or chains, ...).
var (
	errBusy                    = errors.New("busy")
	errUnknownPeer             = errors.New("peer is unknown or unhealthy")
	errBadPeer                 = errors.New("action from bad peer ignored")
	errStallingPeer            = errors.New("peer is stalling")
	errNoPeers                 = errors.New("no peers to keep download active")
	errTimeout                 = errors.New("timeout")
	errEmptyHeaderSet          = errors.New("empty header set by peer")
	errPeersUnavailable        = errors.New("no peers available or all tried for download")
	errInvalidAncestor         = errors.New("retrieved ancestor is invalid")
	errInvalidChain            = errors.New("retrieved hash chain is invalid")
	errInvalidBlock            = errors.New("retrieved block is invalid")
	errInvalidBody             = errors.New("retrieved block body is invalid")
	errInvalidReceipt          = errors.New("retrieved receipt is invalid")
	errCancelBlockFetch        = errors.New("block download canceled (requested)")
	errCancelHeaderFetch       = errors.New("block header download canceled (requested)")
	errCancelBodyFetch         = errors.New("block body download canceled (requested)")
	errCancelReceiptFetch      = errors.New("receipt download canceled (requested)")
	errCancelStateFetch        = errors.New("state data download canceled (requested)")
	errCancelHeaderProcessing  = errors.New("header processing canceled (requested)")
	errCancelContentProcessing = errors.New("content processing canceled (requested)")
	errNoSyncActive            = errors.New("no sync active")
	errTooOld                  = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
)
// Downloader drives chain synchronisation against remote peers. It owns the
// download queue and the peer set, talks to the local chain exclusively
// through the injected callbacks, and receives network deliveries on its
// inbound channels. Only one synchronisation cycle runs at a time (guarded by
// the synchronising flag).
type Downloader struct {
	mode SyncMode       // Synchronisation mode defining the strategy used (per sync cycle)
	mux  *event.TypeMux // Event multiplexer to announce sync operation events

	queue *queue   // Scheduler for selecting the hashes to download
	peers *peerSet // Set of active peers from which download can proceed

	fsPivotLock  *types.Header // Pivot header on critical section entry (cannot change between retries)
	fsPivotFails int           // Number of fast sync failures in the critical section

	rttEstimate   uint64 // Round trip time to target for download requests
	rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)

	// Statistics
	syncStatsChainOrigin uint64       // Origin block number where syncing started at
	syncStatsChainHeight uint64       // Highest block number known when syncing started
	syncStatsStateDone   uint64       // Number of state trie entries already pulled
	syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields

	// Callbacks
	hasHeader        headerCheckFn            // Checks if a header is present in the chain
	hasBlockAndState blockAndStateCheckFn     // Checks if a block and associated state is present in the chain
	getHeader        headerRetrievalFn        // Retrieves a header from the chain
	getBlock         blockRetrievalFn         // Retrieves a block from the chain
	headHeader       headHeaderRetrievalFn    // Retrieves the head header from the chain
	headBlock        headBlockRetrievalFn     // Retrieves the head block from the chain
	headFastBlock    headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
	commitHeadBlock  headBlockCommitterFn     // Commits a manually assembled block as the chain head
	getTd            tdRetrievalFn            // Retrieves the TD of a block from the chain
	insertHeaders    headerChainInsertFn      // Injects a batch of headers into the chain
	insertBlocks     blockChainInsertFn       // Injects a batch of blocks into the chain
	insertReceipts   receiptChainInsertFn     // Injects a batch of blocks and their receipts into the chain
	rollback         chainRollbackFn          // Removes a batch of recently added chain links
	dropPeer         peerDropFn               // Drops a peer for misbehaving

	// Status
	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
	synchronising   int32                                   // Set to 1 (atomically) while a sync cycle is running, 0 otherwise
	notified        int32                                   // Set to 1 (atomically) once the "sync started" message has been logged

	// Channels
	newPeerCh     chan *peer           // NOTE(review): presumably announces newly registered peers — usage not visible here, confirm
	headerCh      chan dataPack        // [eth/62] Channel receiving inbound block headers
	bodyCh        chan dataPack        // [eth/62] Channel receiving inbound block bodies
	receiptCh     chan dataPack        // [eth/63] Channel receiving inbound receipts
	stateCh       chan dataPack        // [eth/63] Channel receiving inbound node state data
	bodyWakeCh    chan bool            // [eth/62] Channel to signal the block body fetcher of new tasks
	receiptWakeCh chan bool            // [eth/63] Channel to signal the receipt fetcher of new tasks
	stateWakeCh   chan bool            // [eth/63] Channel to signal the state fetcher of new tasks
	headerProcCh  chan []*types.Header // [eth/62] Channel to feed the header processor new tasks

	// Cancellation and termination
	cancelPeer string        // Identifier of the peer currently being used as the master (cancel on drop)
	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
	cancelLock sync.RWMutex  // Lock to protect the cancel channel and peer in delivers

	quitCh   chan struct{} // Quit channel to signal termination
	quitLock sync.RWMutex  // Lock to prevent double closes

	// Testing hooks
	syncInitHook     func(uint64, uint64)  // Method to call upon initiating a new sync run
	bodyFetchHook    func([]*types.Header) // Method to call upon starting a block body fetch
	receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
	chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}
  145. // New creates a new downloader to fetch hashes and blocks from remote peers.
  146. func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlockAndState blockAndStateCheckFn,
  147. getHeader headerRetrievalFn, getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn,
  148. headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
  149. insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {
  150. dl := &Downloader{
  151. mode: FullSync,
  152. mux: mux,
  153. queue: newQueue(stateDb),
  154. peers: newPeerSet(),
  155. rttEstimate: uint64(rttMaxEstimate),
  156. rttConfidence: uint64(1000000),
  157. hasHeader: hasHeader,
  158. hasBlockAndState: hasBlockAndState,
  159. getHeader: getHeader,
  160. getBlock: getBlock,
  161. headHeader: headHeader,
  162. headBlock: headBlock,
  163. headFastBlock: headFastBlock,
  164. commitHeadBlock: commitHeadBlock,
  165. getTd: getTd,
  166. insertHeaders: insertHeaders,
  167. insertBlocks: insertBlocks,
  168. insertReceipts: insertReceipts,
  169. rollback: rollback,
  170. dropPeer: dropPeer,
  171. newPeerCh: make(chan *peer, 1),
  172. headerCh: make(chan dataPack, 1),
  173. bodyCh: make(chan dataPack, 1),
  174. receiptCh: make(chan dataPack, 1),
  175. stateCh: make(chan dataPack, 1),
  176. bodyWakeCh: make(chan bool, 1),
  177. receiptWakeCh: make(chan bool, 1),
  178. stateWakeCh: make(chan bool, 1),
  179. headerProcCh: make(chan []*types.Header, 1),
  180. quitCh: make(chan struct{}),
  181. }
  182. go dl.qosTuner()
  183. return dl
  184. }
  185. // Progress retrieves the synchronisation boundaries, specifically the origin
  186. // block where synchronisation started at (may have failed/suspended); the block
  187. // or header sync is currently at; and the latest known block which the sync targets.
  188. //
  189. // In addition, during the state download phase of fast synchronisation the number
  190. // of processed and the total number of known states are also returned. Otherwise
  191. // these are zero.
  192. func (d *Downloader) Progress() ethereum.SyncProgress {
  193. // Fetch the pending state count outside of the lock to prevent unforeseen deadlocks
  194. pendingStates := uint64(d.queue.PendingNodeData())
  195. // Lock the current stats and return the progress
  196. d.syncStatsLock.RLock()
  197. defer d.syncStatsLock.RUnlock()
  198. current := uint64(0)
  199. switch d.mode {
  200. case FullSync:
  201. current = d.headBlock().NumberU64()
  202. case FastSync:
  203. current = d.headFastBlock().NumberU64()
  204. case LightSync:
  205. current = d.headHeader().Number.Uint64()
  206. }
  207. return ethereum.SyncProgress{
  208. StartingBlock: d.syncStatsChainOrigin,
  209. CurrentBlock: current,
  210. HighestBlock: d.syncStatsChainHeight,
  211. PulledStates: d.syncStatsStateDone,
  212. KnownStates: d.syncStatsStateDone + pendingStates,
  213. }
  214. }
  215. // Synchronising returns whether the downloader is currently retrieving blocks.
  216. func (d *Downloader) Synchronising() bool {
  217. return atomic.LoadInt32(&d.synchronising) > 0
  218. }
  219. // RegisterPeer injects a new download peer into the set of block source to be
  220. // used for fetching hashes and blocks from.
  221. func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn,
  222. getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
  223. getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
  224. glog.V(logger.Detail).Infoln("Registering peer", id)
  225. if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
  226. glog.V(logger.Error).Infoln("Register failed:", err)
  227. return err
  228. }
  229. d.qosReduceConfidence()
  230. return nil
  231. }
  232. // UnregisterPeer remove a peer from the known list, preventing any action from
  233. // the specified peer. An effort is also made to return any pending fetches into
  234. // the queue.
  235. func (d *Downloader) UnregisterPeer(id string) error {
  236. // Unregister the peer from the active peer set and revoke any fetch tasks
  237. glog.V(logger.Detail).Infoln("Unregistering peer", id)
  238. if err := d.peers.Unregister(id); err != nil {
  239. glog.V(logger.Error).Infoln("Unregister failed:", err)
  240. return err
  241. }
  242. d.queue.Revoke(id)
  243. // If this peer was the master peer, abort sync immediately
  244. d.cancelLock.RLock()
  245. master := id == d.cancelPeer
  246. d.cancelLock.RUnlock()
  247. if master {
  248. d.cancel()
  249. }
  250. return nil
  251. }
  252. // Synchronise tries to sync up our local block chain with a remote peer, both
  253. // adding various sanity checks as well as wrapping it with various log entries.
  254. func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
  255. glog.V(logger.Detail).Infof("Attempting synchronisation: %v, head [%x…], TD %v", id, head[:4], td)
  256. err := d.synchronise(id, head, td, mode)
  257. switch err {
  258. case nil:
  259. glog.V(logger.Detail).Infof("Synchronisation completed")
  260. case errBusy:
  261. glog.V(logger.Detail).Infof("Synchronisation already in progress")
  262. case errTimeout, errBadPeer, errStallingPeer,
  263. errEmptyHeaderSet, errPeersUnavailable, errTooOld,
  264. errInvalidAncestor, errInvalidChain:
  265. glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
  266. d.dropPeer(id)
  267. default:
  268. glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
  269. }
  270. return err
  271. }
  272. // synchronise will select the peer and use it for synchronising. If an empty string is given
  273. // it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
  274. // checks fail an error will be returned. This method is synchronous
  275. func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
  276. // Mock out the synchronisation if testing
  277. if d.synchroniseMock != nil {
  278. return d.synchroniseMock(id, hash)
  279. }
  280. // Make sure only one goroutine is ever allowed past this point at once
  281. if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
  282. return errBusy
  283. }
  284. defer atomic.StoreInt32(&d.synchronising, 0)
  285. // Post a user notification of the sync (only once per session)
  286. if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
  287. glog.V(logger.Info).Infoln("Block synchronisation started")
  288. }
  289. // Reset the queue, peer set and wake channels to clean any internal leftover state
  290. d.queue.Reset()
  291. d.peers.Reset()
  292. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  293. select {
  294. case <-ch:
  295. default:
  296. }
  297. }
  298. for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
  299. for empty := false; !empty; {
  300. select {
  301. case <-ch:
  302. default:
  303. empty = true
  304. }
  305. }
  306. }
  307. for empty := false; !empty; {
  308. select {
  309. case <-d.headerProcCh:
  310. default:
  311. empty = true
  312. }
  313. }
  314. // Create cancel channel for aborting mid-flight and mark the master peer
  315. d.cancelLock.Lock()
  316. d.cancelCh = make(chan struct{})
  317. d.cancelPeer = id
  318. d.cancelLock.Unlock()
  319. defer d.cancel() // No matter what, we can't leave the cancel channel open
  320. // Set the requested sync mode, unless it's forbidden
  321. d.mode = mode
  322. if d.mode == FastSync && d.fsPivotFails >= fsCriticalTrials {
  323. d.mode = FullSync
  324. }
  325. // Retrieve the origin peer and initiate the downloading process
  326. p := d.peers.Peer(id)
  327. if p == nil {
  328. return errUnknownPeer
  329. }
  330. return d.syncWithPeer(p, hash, td)
  331. }
  332. // syncWithPeer starts a block synchronization based on the hash chain from the
  333. // specified peer and head hash.
  334. func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
  335. d.mux.Post(StartEvent{})
  336. defer func() {
  337. // reset on error
  338. if err != nil {
  339. d.mux.Post(FailedEvent{err})
  340. } else {
  341. d.mux.Post(DoneEvent{})
  342. }
  343. }()
  344. if p.version < 62 {
  345. return errTooOld
  346. }
  347. glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version)
  348. defer func(start time.Time) {
  349. glog.V(logger.Debug).Infof("Synchronisation terminated after %v", time.Since(start))
  350. }(time.Now())
  351. // Look up the sync boundaries: the common ancestor and the target block
  352. latest, err := d.fetchHeight(p)
  353. if err != nil {
  354. return err
  355. }
  356. height := latest.Number.Uint64()
  357. origin, err := d.findAncestor(p, height)
  358. if err != nil {
  359. return err
  360. }
  361. d.syncStatsLock.Lock()
  362. if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
  363. d.syncStatsChainOrigin = origin
  364. }
  365. d.syncStatsChainHeight = height
  366. d.syncStatsLock.Unlock()
  367. // Initiate the sync using a concurrent header and content retrieval algorithm
  368. pivot := uint64(0)
  369. switch d.mode {
  370. case LightSync:
  371. pivot = height
  372. case FastSync:
  373. // Calculate the new fast/slow sync pivot point
  374. if d.fsPivotLock == nil {
  375. pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
  376. if err != nil {
  377. panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
  378. }
  379. if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
  380. pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
  381. }
  382. } else {
  383. // Pivot point locked in, use this and do not pick a new one!
  384. pivot = d.fsPivotLock.Number.Uint64()
  385. }
  386. // If the point is below the origin, move origin back to ensure state download
  387. if pivot < origin {
  388. if pivot > 0 {
  389. origin = pivot - 1
  390. } else {
  391. origin = 0
  392. }
  393. }
  394. glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
  395. }
  396. d.queue.Prepare(origin+1, d.mode, pivot, latest)
  397. if d.syncInitHook != nil {
  398. d.syncInitHook(origin, height)
  399. }
  400. return d.spawnSync(origin+1,
  401. func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
  402. func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
  403. func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
  404. func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
  405. func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
  406. )
  407. }
  408. // spawnSync runs d.process and all given fetcher functions to completion in
  409. // separate goroutines, returning the first error that appears.
  410. func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
  411. var wg sync.WaitGroup
  412. errc := make(chan error, len(fetchers)+1)
  413. wg.Add(len(fetchers) + 1)
  414. go func() { defer wg.Done(); errc <- d.processContent() }()
  415. for _, fn := range fetchers {
  416. fn := fn
  417. go func() { defer wg.Done(); errc <- fn() }()
  418. }
  419. // Wait for the first error, then terminate the others.
  420. var err error
  421. for i := 0; i < len(fetchers)+1; i++ {
  422. if i == len(fetchers) {
  423. // Close the queue when all fetchers have exited.
  424. // This will cause the block processor to end when
  425. // it has processed the queue.
  426. d.queue.Close()
  427. }
  428. if err = <-errc; err != nil {
  429. break
  430. }
  431. }
  432. d.queue.Close()
  433. d.cancel()
  434. wg.Wait()
  435. return err
  436. }
  437. // cancel cancels all of the operations and resets the queue. It returns true
  438. // if the cancel operation was completed.
  439. func (d *Downloader) cancel() {
  440. // Close the current cancel channel
  441. d.cancelLock.Lock()
  442. if d.cancelCh != nil {
  443. select {
  444. case <-d.cancelCh:
  445. // Channel was already closed
  446. default:
  447. close(d.cancelCh)
  448. }
  449. }
  450. d.cancelLock.Unlock()
  451. }
  452. // Terminate interrupts the downloader, canceling all pending operations.
  453. // The downloader cannot be reused after calling Terminate.
  454. func (d *Downloader) Terminate() {
  455. // Close the termination channel (make sure double close is allowed)
  456. d.quitLock.Lock()
  457. select {
  458. case <-d.quitCh:
  459. default:
  460. close(d.quitCh)
  461. }
  462. d.quitLock.Unlock()
  463. // Cancel any pending download requests
  464. d.cancel()
  465. }
  466. // fetchHeight retrieves the head header of the remote peer to aid in estimating
  467. // the total time a pending synchronisation would take.
  468. func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
  469. glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
  470. // Request the advertised remote head block and wait for the response
  471. head, _ := p.currentHead()
  472. go p.getRelHeaders(head, 1, 0, false)
  473. timeout := time.After(d.requestTTL())
  474. for {
  475. select {
  476. case <-d.cancelCh:
  477. return nil, errCancelBlockFetch
  478. case packet := <-d.headerCh:
  479. // Discard anything not from the origin peer
  480. if packet.PeerId() != p.id {
  481. glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
  482. break
  483. }
  484. // Make sure the peer actually gave something valid
  485. headers := packet.(*headerPack).headers
  486. if len(headers) != 1 {
  487. glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
  488. return nil, errBadPeer
  489. }
  490. return headers[0], nil
  491. case <-timeout:
  492. glog.V(logger.Debug).Infof("%v: head header timeout", p)
  493. return nil, errTimeout
  494. case <-d.bodyCh:
  495. case <-d.stateCh:
  496. case <-d.receiptCh:
  497. // Out of bounds delivery, ignore
  498. }
  499. }
  500. }
  501. // findAncestor tries to locate the common ancestor link of the local chain and
  502. // a remote peers blockchain. In the general case when our node was in sync and
  503. // on the correct chain, checking the top N links should already get us a match.
  504. // In the rare scenario when we ended up on a long reorganisation (i.e. none of
  505. // the head links match), we do a binary search to find the common ancestor.
  506. func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
  507. glog.V(logger.Debug).Infof("%v: looking for common ancestor (remote height %d)", p, height)
  508. // Figure out the valid ancestor range to prevent rewrite attacks
  509. floor, ceil := int64(-1), d.headHeader().Number.Uint64()
  510. if d.mode == FullSync {
  511. ceil = d.headBlock().NumberU64()
  512. } else if d.mode == FastSync {
  513. ceil = d.headFastBlock().NumberU64()
  514. }
  515. if ceil >= MaxForkAncestry {
  516. floor = int64(ceil - MaxForkAncestry)
  517. }
  518. // Request the topmost blocks to short circuit binary ancestor lookup
  519. head := ceil
  520. if head > height {
  521. head = height
  522. }
  523. from := int64(head) - int64(MaxHeaderFetch)
  524. if from < 0 {
  525. from = 0
  526. }
  527. // Span out with 15 block gaps into the future to catch bad head reports
  528. limit := 2 * MaxHeaderFetch / 16
  529. count := 1 + int((int64(ceil)-from)/16)
  530. if count > limit {
  531. count = limit
  532. }
  533. go p.getAbsHeaders(uint64(from), count, 15, false)
  534. // Wait for the remote response to the head fetch
  535. number, hash := uint64(0), common.Hash{}
  536. timeout := time.After(d.requestTTL())
  537. for finished := false; !finished; {
  538. select {
  539. case <-d.cancelCh:
  540. return 0, errCancelHeaderFetch
  541. case packet := <-d.headerCh:
  542. // Discard anything not from the origin peer
  543. if packet.PeerId() != p.id {
  544. glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
  545. break
  546. }
  547. // Make sure the peer actually gave something valid
  548. headers := packet.(*headerPack).headers
  549. if len(headers) == 0 {
  550. glog.V(logger.Warn).Infof("%v: empty head header set", p)
  551. return 0, errEmptyHeaderSet
  552. }
  553. // Make sure the peer's reply conforms to the request
  554. for i := 0; i < len(headers); i++ {
  555. if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
  556. glog.V(logger.Warn).Infof("%v: head header set (item %d) broke chain ordering: requested %d, got %d", p, i, from+int64(i)*16, number)
  557. return 0, errInvalidChain
  558. }
  559. }
  560. // Check if a common ancestor was found
  561. finished = true
  562. for i := len(headers) - 1; i >= 0; i-- {
  563. // Skip any headers that underflow/overflow our requested set
  564. if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil {
  565. continue
  566. }
  567. // Otherwise check if we already know the header or not
  568. if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
  569. number, hash = headers[i].Number.Uint64(), headers[i].Hash()
  570. // If every header is known, even future ones, the peer straight out lied about its head
  571. if number > height && i == limit-1 {
  572. glog.V(logger.Warn).Infof("%v: lied about chain head: reported %d, found above %d", p, height, number)
  573. return 0, errStallingPeer
  574. }
  575. break
  576. }
  577. }
  578. case <-timeout:
  579. glog.V(logger.Debug).Infof("%v: head header timeout", p)
  580. return 0, errTimeout
  581. case <-d.bodyCh:
  582. case <-d.stateCh:
  583. case <-d.receiptCh:
  584. // Out of bounds delivery, ignore
  585. }
  586. }
  587. // If the head fetch already found an ancestor, return
  588. if !common.EmptyHash(hash) {
  589. if int64(number) <= floor {
  590. glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, number, hash[:4], floor)
  591. return 0, errInvalidAncestor
  592. }
  593. glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4])
  594. return number, nil
  595. }
  596. // Ancestor not found, we need to binary search over our chain
  597. start, end := uint64(0), head
  598. if floor > 0 {
  599. start = uint64(floor)
  600. }
  601. for start+1 < end {
  602. // Split our chain interval in two, and request the hash to cross check
  603. check := (start + end) / 2
  604. timeout := time.After(d.requestTTL())
  605. go p.getAbsHeaders(uint64(check), 1, 0, false)
  606. // Wait until a reply arrives to this request
  607. for arrived := false; !arrived; {
  608. select {
  609. case <-d.cancelCh:
  610. return 0, errCancelHeaderFetch
  611. case packer := <-d.headerCh:
  612. // Discard anything not from the origin peer
  613. if packer.PeerId() != p.id {
  614. glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packer.PeerId())
  615. break
  616. }
  617. // Make sure the peer actually gave something valid
  618. headers := packer.(*headerPack).headers
  619. if len(headers) != 1 {
  620. glog.V(logger.Debug).Infof("%v: invalid search header set (%d)", p, len(headers))
  621. return 0, errBadPeer
  622. }
  623. arrived = true
  624. // Modify the search interval based on the response
  625. if (d.mode == FullSync && !d.hasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) {
  626. end = check
  627. break
  628. }
  629. header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
  630. if header.Number.Uint64() != check {
  631. glog.V(logger.Debug).Infof("%v: non requested header #%d [%x…], instead of #%d", p, header.Number, header.Hash().Bytes()[:4], check)
  632. return 0, errBadPeer
  633. }
  634. start = check
  635. case <-timeout:
  636. glog.V(logger.Debug).Infof("%v: search header timeout", p)
  637. return 0, errTimeout
  638. case <-d.bodyCh:
  639. case <-d.stateCh:
  640. case <-d.receiptCh:
  641. // Out of bounds delivery, ignore
  642. }
  643. }
  644. }
  645. // Ensure valid ancestry and return
  646. if int64(start) <= floor {
  647. glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, start, hash[:4], floor)
  648. return 0, errInvalidAncestor
  649. }
  650. glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, start, hash[:4])
  651. return start, nil
  652. }
  653. // fetchHeaders keeps retrieving headers concurrently from the number
  654. // requested, until no more are returned, potentially throttling on the way. To
  655. // facilitate concurrency but still protect against malicious nodes sending bad
  656. // headers, we construct a header chain skeleton using the "origin" peer we are
  657. // syncing with, and fill in the missing headers using anyone else. Headers from
  658. // other peers are only accepted if they map cleanly to the skeleton. If no one
  659. // can fill in the skeleton - not even the origin peer - it's assumed invalid and
  660. // the origin is dropped.
  661. func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
  662. glog.V(logger.Debug).Infof("%v: directing header downloads from #%d", p, from)
  663. defer glog.V(logger.Debug).Infof("%v: header download terminated", p)
  664. // Create a timeout timer, and the associated header fetcher
  665. skeleton := true // Skeleton assembly phase or finishing up
  666. request := time.Now() // time of the last skeleton fetch request
  667. timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
  668. <-timeout.C // timeout channel should be initially empty
  669. defer timeout.Stop()
  670. getHeaders := func(from uint64) {
  671. request = time.Now()
  672. timeout.Reset(d.requestTTL())
  673. if skeleton {
  674. glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)
  675. go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
  676. } else {
  677. glog.V(logger.Detail).Infof("%v: fetching %d full headers from #%d", p, MaxHeaderFetch, from)
  678. go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
  679. }
  680. }
  681. // Start pulling the header chain skeleton until all is done
  682. getHeaders(from)
  683. for {
  684. select {
  685. case <-d.cancelCh:
  686. return errCancelHeaderFetch
  687. case packet := <-d.headerCh:
  688. // Make sure the active peer is giving us the skeleton headers
  689. if packet.PeerId() != p.id {
  690. glog.V(logger.Debug).Infof("Received skeleton headers from incorrect peer (%s)", packet.PeerId())
  691. break
  692. }
  693. headerReqTimer.UpdateSince(request)
  694. timeout.Stop()
  695. // If the skeleton's finished, pull any remaining head headers directly from the origin
  696. if packet.Items() == 0 && skeleton {
  697. skeleton = false
  698. getHeaders(from)
  699. continue
  700. }
  701. // If no more headers are inbound, notify the content fetchers and return
  702. if packet.Items() == 0 {
  703. glog.V(logger.Debug).Infof("%v: no available headers", p)
  704. select {
  705. case d.headerProcCh <- nil:
  706. return nil
  707. case <-d.cancelCh:
  708. return errCancelHeaderFetch
  709. }
  710. }
  711. headers := packet.(*headerPack).headers
  712. // If we received a skeleton batch, resolve internals concurrently
  713. if skeleton {
  714. filled, proced, err := d.fillHeaderSkeleton(from, headers)
  715. if err != nil {
  716. glog.V(logger.Debug).Infof("%v: skeleton chain invalid: %v", p, err)
  717. return errInvalidChain
  718. }
  719. headers = filled[proced:]
  720. from += uint64(proced)
  721. }
  722. // Insert all the new headers and fetch the next batch
  723. if len(headers) > 0 {
  724. glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
  725. select {
  726. case d.headerProcCh <- headers:
  727. case <-d.cancelCh:
  728. return errCancelHeaderFetch
  729. }
  730. from += uint64(len(headers))
  731. }
  732. getHeaders(from)
  733. case <-timeout.C:
  734. // Header retrieval timed out, consider the peer bad and drop
  735. glog.V(logger.Debug).Infof("%v: header request timed out", p)
  736. headerTimeoutMeter.Mark(1)
  737. d.dropPeer(p.id)
  738. // Finish the sync gracefully instead of dumping the gathered data though
  739. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  740. select {
  741. case ch <- false:
  742. case <-d.cancelCh:
  743. }
  744. }
  745. select {
  746. case d.headerProcCh <- nil:
  747. case <-d.cancelCh:
  748. }
  749. return errBadPeer
  750. }
  751. }
  752. }
  753. // fillHeaderSkeleton concurrently retrieves headers from all our available peers
  754. // and maps them to the provided skeleton header chain.
  755. //
  756. // Any partial results from the beginning of the skeleton is (if possible) forwarded
  757. // immediately to the header processor to keep the rest of the pipeline full even
  758. // in the case of header stalls.
  759. //
  760. // The method returs the entire filled skeleton and also the number of headers
  761. // already forwarded for processing.
  762. func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
  763. glog.V(logger.Debug).Infof("Filling up skeleton from #%d", from)
  764. d.queue.ScheduleSkeleton(from, skeleton)
  765. var (
  766. deliver = func(packet dataPack) (int, error) {
  767. pack := packet.(*headerPack)
  768. return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
  769. }
  770. expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
  771. throttle = func() bool { return false }
  772. reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
  773. return d.queue.ReserveHeaders(p, count), false, nil
  774. }
  775. fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
  776. capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) }
  777. setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
  778. )
  779. err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
  780. d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
  781. nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header")
  782. glog.V(logger.Debug).Infof("Skeleton fill terminated: %v", err)
  783. filled, proced := d.queue.RetrieveHeaders()
  784. return filled, proced, err
  785. }
  786. // fetchBodies iteratively downloads the scheduled block bodies, taking any
  787. // available peers, reserving a chunk of blocks for each, waiting for delivery
  788. // and also periodically checking for timeouts.
  789. func (d *Downloader) fetchBodies(from uint64) error {
  790. glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
  791. var (
  792. deliver = func(packet dataPack) (int, error) {
  793. pack := packet.(*bodyPack)
  794. return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
  795. }
  796. expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
  797. fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
  798. capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) }
  799. setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
  800. )
  801. err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
  802. d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
  803. d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "Body")
  804. glog.V(logger.Debug).Infof("Block body download terminated: %v", err)
  805. return err
  806. }
  807. // fetchReceipts iteratively downloads the scheduled block receipts, taking any
  808. // available peers, reserving a chunk of receipts for each, waiting for delivery
  809. // and also periodically checking for timeouts.
  810. func (d *Downloader) fetchReceipts(from uint64) error {
  811. glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
  812. var (
  813. deliver = func(packet dataPack) (int, error) {
  814. pack := packet.(*receiptPack)
  815. return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
  816. }
  817. expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
  818. fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
  819. capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) }
  820. setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
  821. )
  822. err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
  823. d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
  824. d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt")
  825. glog.V(logger.Debug).Infof("Receipt download terminated: %v", err)
  826. return err
  827. }
  828. // fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
  829. // available peers, reserving a chunk of nodes for each, waiting for delivery and
  830. // also periodically checking for timeouts.
  831. func (d *Downloader) fetchNodeData() error {
  832. glog.V(logger.Debug).Infof("Downloading node state data")
  833. var (
  834. deliver = func(packet dataPack) (int, error) {
  835. start := time.Now()
  836. return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
  837. // If the peer returned old-requested data, forgive
  838. if err == trie.ErrNotRequested {
  839. glog.V(logger.Info).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId())
  840. return
  841. }
  842. if err != nil {
  843. // If the node data processing failed, the root hash is very wrong, abort
  844. glog.V(logger.Error).Infof("peer %d: state processing failed: %v", packet.PeerId(), err)
  845. d.cancel()
  846. return
  847. }
  848. // Processing succeeded, notify state fetcher of continuation
  849. pending := d.queue.PendingNodeData()
  850. if pending > 0 {
  851. select {
  852. case d.stateWakeCh <- true:
  853. default:
  854. }
  855. }
  856. d.syncStatsLock.Lock()
  857. d.syncStatsStateDone += uint64(delivered)
  858. d.syncStatsLock.Unlock()
  859. // Log a message to the user and return
  860. if delivered > 0 {
  861. glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d, pending at least %d", delivered, time.Since(start), d.syncStatsStateDone, pending)
  862. }
  863. })
  864. }
  865. expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
  866. throttle = func() bool { return false }
  867. reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
  868. return d.queue.ReserveNodeData(p, count), false, nil
  869. }
  870. fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
  871. capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
  872. setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
  873. )
  874. err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
  875. d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
  876. d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "State")
  877. glog.V(logger.Debug).Infof("Node state data download terminated: %v", err)
  878. return err
  879. }
  880. // fetchParts iteratively downloads scheduled block parts, taking any available
  881. // peers, reserving a chunk of fetch requests for each, waiting for delivery and
  882. // also periodically checking for timeouts.
  883. //
  884. // As the scheduling/timeout logic mostly is the same for all downloaded data
  885. // types, this method is used by each for data gathering and is instrumented with
  886. // various callbacks to handle the slight differences between processing them.
  887. //
  888. // The instrumentation parameters:
  889. // - errCancel: error type to return if the fetch operation is cancelled (mostly makes logging nicer)
  890. // - deliveryCh: channel from which to retrieve downloaded data packets (merged from all concurrent peers)
  891. // - deliver: processing callback to deliver data packets into type specific download queues (usually within `queue`)
  892. // - wakeCh: notification channel for waking the fetcher when new tasks are available (or sync completed)
  893. // - expire: task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
  894. // - pending: task callback for the number of requests still needing download (detect completion/non-completability)
  895. // - inFlight: task callback for the number of in-progress requests (wait for all active downloads to finish)
  896. // - throttle: task callback to check if the processing queue is full and activate throttling (bound memory use)
  897. // - reserve: task callback to reserve new download tasks to a particular peer (also signals partial completions)
  898. // - fetchHook: tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
  899. // - fetch: network callback to actually send a particular download request to a physical remote peer
  900. // - cancel: task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
  901. // - capacity: network callback to retreive the estimated type-specific bandwidth capacity of a peer (traffic shaping)
  902. // - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
  903. // - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
  904. // - kind: textual label of the type being downloaded to display in log mesages
  905. func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
  906. expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
  907. fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
  908. idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
  909. // Create a ticker to detect expired retrieval tasks
  910. ticker := time.NewTicker(100 * time.Millisecond)
  911. defer ticker.Stop()
  912. update := make(chan struct{}, 1)
  913. // Prepare the queue and fetch block parts until the block header fetcher's done
  914. finished := false
  915. for {
  916. select {
  917. case <-d.cancelCh:
  918. return errCancel
  919. case packet := <-deliveryCh:
  920. // If the peer was previously banned and failed to deliver it's pack
  921. // in a reasonable time frame, ignore it's message.
  922. if peer := d.peers.Peer(packet.PeerId()); peer != nil {
  923. // Deliver the received chunk of data and check chain validity
  924. accepted, err := deliver(packet)
  925. if err == errInvalidChain {
  926. return err
  927. }
  928. // Unless a peer delivered something completely else than requested (usually
  929. // caused by a timed out request which came through in the end), set it to
  930. // idle. If the delivery's stale, the peer should have already been idled.
  931. if err != errStaleDelivery {
  932. setIdle(peer, accepted)
  933. }
  934. // Issue a log to the user to see what's going on
  935. switch {
  936. case err == nil && packet.Items() == 0:
  937. glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
  938. case err == nil:
  939. glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
  940. default:
  941. glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err)
  942. }
  943. }
  944. // Blocks assembled, try to update the progress
  945. select {
  946. case update <- struct{}{}:
  947. default:
  948. }
  949. case cont := <-wakeCh:
  950. // The header fetcher sent a continuation flag, check if it's done
  951. if !cont {
  952. finished = true
  953. }
  954. // Headers arrive, try to update the progress
  955. select {
  956. case update <- struct{}{}:
  957. default:
  958. }
  959. case <-ticker.C:
  960. // Sanity check update the progress
  961. select {
  962. case update <- struct{}{}:
  963. default:
  964. }
  965. case <-update:
  966. // Short circuit if we lost all our peers
  967. if d.peers.Len() == 0 {
  968. return errNoPeers
  969. }
  970. // Check for fetch request timeouts and demote the responsible peers
  971. for pid, fails := range expire() {
  972. if peer := d.peers.Peer(pid); peer != nil {
  973. // If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
  974. // ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal times
  975. // out that sync wise we need to get rid of the peer.
  976. //
  977. // The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth
  978. // and latency of a peer separately, which requires pushing the measures capacity a bit and seeing
  979. // how response times reacts, to it always requests one more than the minimum (i.e. min 2).
  980. if fails > 2 {
  981. glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
  982. setIdle(peer, 0)
  983. } else {
  984. glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind))
  985. d.dropPeer(pid)
  986. }
  987. }
  988. }
  989. // If there's nothing more to fetch, wait or terminate
  990. if pending() == 0 {
  991. if !inFlight() && finished {
  992. glog.V(logger.Debug).Infof("%s fetching completed", kind)
  993. return nil
  994. }
  995. break
  996. }
  997. // Send a download request to all idle peers, until throttled
  998. progressed, throttled, running := false, false, inFlight()
  999. idles, total := idle()
  1000. for _, peer := range idles {
  1001. // Short circuit if throttling activated
  1002. if throttle() {
  1003. throttled = true
  1004. break
  1005. }
  1006. // Reserve a chunk of fetches for a peer. A nil can mean either that
  1007. // no more headers are available, or that the peer is known not to
  1008. // have them.
  1009. request, progress, err := reserve(peer, capacity(peer))
  1010. if err != nil {
  1011. return err
  1012. }
  1013. if progress {
  1014. progressed = true
  1015. }
  1016. if request == nil {
  1017. continue
  1018. }
  1019. if glog.V(logger.Detail) {
  1020. if request.From > 0 {
  1021. glog.Infof("%s: requesting %s(s) from #%d", peer, strings.ToLower(kind), request.From)
  1022. } else if len(request.Headers) > 0 {
  1023. glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
  1024. } else {
  1025. glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
  1026. }
  1027. }
  1028. // Fetch the chunk and make sure any errors return the hashes to the queue
  1029. if fetchHook != nil {
  1030. fetchHook(request.Headers)
  1031. }
  1032. if err := fetch(peer, request); err != nil {
  1033. // Although we could try and make an attempt to fix this, this error really
  1034. // means that we've double allocated a fetch task to a peer. If that is the
  1035. // case, the internal state of the downloader and the queue is very wrong so
  1036. // better hard crash and note the error instead of silently accumulating into
  1037. // a much bigger issue.
  1038. panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, strings.ToLower(kind)))
  1039. }
  1040. running = true
  1041. }
  1042. // Make sure that we have peers available for fetching. If all peers have been tried
  1043. // and all failed throw an error
  1044. if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
  1045. return errPeersUnavailable
  1046. }
  1047. }
  1048. }
  1049. }
  1050. // processHeaders takes batches of retrieved headers from an input channel and
  1051. // keeps processing and scheduling them into the header chain and downloader's
  1052. // queue until the stream ends or a failure occurs.
  1053. func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
  1054. // Calculate the pivoting point for switching from fast to slow sync
  1055. pivot := d.queue.FastSyncPivot()
  1056. // Keep a count of uncertain headers to roll back
  1057. rollback := []*types.Header{}
  1058. defer func() {
  1059. if len(rollback) > 0 {
  1060. // Flatten the headers and roll them back
  1061. hashes := make([]common.Hash, len(rollback))
  1062. for i, header := range rollback {
  1063. hashes[i] = header.Hash()
  1064. }
  1065. lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number()
  1066. d.rollback(hashes)
  1067. glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
  1068. len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number())
  1069. // If we're already past the pivot point, this could be an attack, thread carefully
  1070. if rollback[len(rollback)-1].Number.Uint64() > pivot {
  1071. // If we didn't ever fail, lock in te pivot header (must! not! change!)
  1072. if d.fsPivotFails == 0 {
  1073. for _, header := range rollback {
  1074. if header.Number.Uint64() == pivot {
  1075. glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4])
  1076. d.fsPivotLock = header
  1077. }
  1078. }
  1079. }
  1080. d.fsPivotFails++
  1081. }
  1082. }
  1083. }()
  1084. // Wait for batches of headers to process
  1085. gotHeaders := false
  1086. for {
  1087. select {
  1088. case <-d.cancelCh:
  1089. return errCancelHeaderProcessing
  1090. case headers := <-d.headerProcCh:
  1091. // Terminate header processing if we synced up
  1092. if len(headers) == 0 {
  1093. // Notify everyone that headers are fully processed
  1094. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  1095. select {
  1096. case ch <- false:
  1097. case <-d.cancelCh:
  1098. }
  1099. }
  1100. // If no headers were retrieved at all, the peer violated it's TD promise that it had a
  1101. // better chain compared to ours. The only exception is if it's promised blocks were
  1102. // already imported by other means (e.g. fecher):
  1103. //
  1104. // R <remote peer>, L <local node>: Both at block 10
  1105. // R: Mine block 11, and propagate it to L
  1106. // L: Queue block 11 for import
  1107. // L: Notice that R's head and TD increased compared to ours, start sync
  1108. // L: Import of block 11 finishes
  1109. // L: Sync begins, and finds common ancestor at 11
  1110. // L: Request new headers up from 11 (R's TD was higher, it must have something)
  1111. // R: Nothing to give
  1112. if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
  1113. return errStallingPeer
  1114. }
  1115. // If fast or light syncing, ensure promised headers are indeed delivered. This is
  1116. // needed to detect scenarios where an attacker feeds a bad pivot and then bails out
  1117. // of delivering the post-pivot blocks that would flag the invalid content.
  1118. //
  1119. // This check cannot be executed "as is" for full imports, since blocks may still be
  1120. // queued for processing when the header download completes. However, as long as the
  1121. // peer gave us something useful, we're already happy/progressed (above check).
  1122. if d.mode == FastSync || d.mode == LightSync {
  1123. if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
  1124. return errStallingPeer
  1125. }
  1126. }
  1127. // Disable any rollback and return
  1128. rollback = nil
  1129. return nil
  1130. }
  1131. // Otherwise split the chunk of headers into batches and process them
  1132. gotHeaders = true
  1133. for len(headers) > 0 {
  1134. // Terminate if something failed in between processing chunks
  1135. select {
  1136. case <-d.cancelCh:
  1137. return errCancelHeaderProcessing
  1138. default:
  1139. }
  1140. // Select the next chunk of headers to import
  1141. limit := maxHeadersProcess
  1142. if limit > len(headers) {
  1143. limit = len(headers)
  1144. }
  1145. chunk := headers[:limit]
  1146. // In case of header only syncing, validate the chunk immediately
  1147. if d.mode == FastSync || d.mode == LightSync {
  1148. // Collect the yet unknown headers to mark them as uncertain
  1149. unknown := make([]*types.Header, 0, len(headers))
  1150. for _, header := range chunk {
  1151. if !d.hasHeader(header.Hash()) {
  1152. unknown = append(unknown, header)
  1153. }
  1154. }
  1155. // If we're importing pure headers, verify based on their recentness
  1156. frequency := fsHeaderCheckFrequency
  1157. if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
  1158. frequency = 1
  1159. }
  1160. if n, err := d.insertHeaders(chunk, frequency); err != nil {
  1161. // If some headers were inserted, add them too to the rollback list
  1162. if n > 0 {
  1163. rollback = append(rollback, chunk[:n]...)
  1164. }
  1165. glog.V(logger.Debug).Infof("invalid header #%d [%x…]: %v", chunk[n].Number, chunk[n].Hash().Bytes()[:4], err)
  1166. return errInvalidChain
  1167. }
  1168. // All verifications passed, store newly found uncertain headers
  1169. rollback = append(rollback, unknown...)
  1170. if len(rollback) > fsHeaderSafetyNet {
  1171. rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
  1172. }
  1173. }
  1174. // If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
  1175. if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
  1176. if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
  1177. glog.V(logger.Warn).Infof("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4])
  1178. return errInvalidChain
  1179. }
  1180. }
  1181. // Unless we're doing light chains, schedule the headers for associated content retrieval
  1182. if d.mode == FullSync || d.mode == FastSync {
  1183. // If we've reached the allowed number of pending headers, stall a bit
  1184. for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
  1185. select {
  1186. case <-d.cancelCh:
  1187. return errCancelHeaderProcessing
  1188. case <-time.After(time.Second):
  1189. }
  1190. }
  1191. // Otherwise insert the headers for content retrieval
  1192. inserts := d.queue.Schedule(chunk, origin)
  1193. if len(inserts) != len(chunk) {
  1194. glog.V(logger.Debug).Infof("stale headers")
  1195. return errBadPeer
  1196. }
  1197. }
  1198. headers = headers[limit:]
  1199. origin += uint64(limit)
  1200. }
  1201. // Signal the content downloaders of the availablility of new tasks
  1202. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  1203. select {
  1204. case ch <- true:
  1205. default:
  1206. }
  1207. }
  1208. }
  1209. }
  1210. }
  1211. // processContent takes fetch results from the queue and tries to import them
  1212. // into the chain. The type of import operation will depend on the result contents.
  1213. func (d *Downloader) processContent() error {
  1214. pivot := d.queue.FastSyncPivot()
  1215. for {
  1216. results := d.queue.WaitResults()
  1217. if len(results) == 0 {
  1218. return nil // queue empty
  1219. }
  1220. if d.chainInsertHook != nil {
  1221. d.chainInsertHook(results)
  1222. }
  1223. // Actually import the blocks
  1224. if glog.V(logger.Debug) {
  1225. first, last := results[0].Header, results[len(results)-1].Header
  1226. glog.Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4])
  1227. }
  1228. for len(results) != 0 {
  1229. // Check for any termination requests
  1230. select {
  1231. case <-d.quitCh:
  1232. return errCancelContentProcessing
  1233. default:
  1234. }
  1235. // Retrieve the a batch of results to import
  1236. var (
  1237. blocks = make([]*types.Block, 0, maxResultsProcess)
  1238. receipts = make([]types.Receipts, 0, maxResultsProcess)
  1239. )
  1240. items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
  1241. for _, result := range results[:items] {
  1242. switch {
  1243. case d.mode == FullSync:
  1244. blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
  1245. case d.mode == FastSync:
  1246. blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
  1247. if result.Header.Number.Uint64() <= pivot {
  1248. receipts = append(receipts, result.Receipts)
  1249. }
  1250. }
  1251. }
  1252. // Try to process the results, aborting if there's an error
  1253. var (
  1254. err error
  1255. index int
  1256. )
  1257. switch {
  1258. case len(receipts) > 0:
  1259. index, err = d.insertReceipts(blocks, receipts)
  1260. if err == nil && blocks[len(blocks)-1].NumberU64() == pivot {
  1261. glog.V(logger.Debug).Infof("Committing block #%d [%x…] as the new head", blocks[len(blocks)-1].Number(), blocks[len(blocks)-1].Hash().Bytes()[:4])
  1262. index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
  1263. }
  1264. default:
  1265. index, err = d.insertBlocks(blocks)
  1266. }
  1267. if err != nil {
  1268. glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
  1269. return errInvalidChain
  1270. }
  1271. // Shift the results to the next batch
  1272. results = results[items:]
  1273. }
  1274. }
  1275. }
  1276. // DeliverHeaders injects a new batch of block headers received from a remote
  1277. // node into the download schedule.
  1278. func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
  1279. return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
  1280. }
  1281. // DeliverBodies injects a new batch of block bodies received from a remote node.
  1282. func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
  1283. return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
  1284. }
  1285. // DeliverReceipts injects a new batch of receipts received from a remote node.
  1286. func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
  1287. return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
  1288. }
  1289. // DeliverNodeData injects a new batch of node state data received from a remote node.
  1290. func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
  1291. return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
  1292. }
  1293. // deliver injects a new batch of data received from a remote node.
  1294. func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
  1295. // Update the delivery metrics for both good and failed deliveries
  1296. inMeter.Mark(int64(packet.Items()))
  1297. defer func() {
  1298. if err != nil {
  1299. dropMeter.Mark(int64(packet.Items()))
  1300. }
  1301. }()
  1302. // Deliver or abort if the sync is canceled while queuing
  1303. d.cancelLock.RLock()
  1304. cancel := d.cancelCh
  1305. d.cancelLock.RUnlock()
  1306. if cancel == nil {
  1307. return errNoSyncActive
  1308. }
  1309. select {
  1310. case destCh <- packet:
  1311. return nil
  1312. case <-cancel:
  1313. return errNoSyncActive
  1314. }
  1315. }
  1316. // qosTuner is the quality of service tuning loop that occasionally gathers the
  1317. // peer latency statistics and updates the estimated request round trip time.
  1318. func (d *Downloader) qosTuner() {
  1319. for {
  1320. // Retrieve the current median RTT and integrate into the previoust target RTT
  1321. rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
  1322. atomic.StoreUint64(&d.rttEstimate, uint64(rtt))
  1323. // A new RTT cycle passed, increase our confidence in the estimated RTT
  1324. conf := atomic.LoadUint64(&d.rttConfidence)
  1325. conf = conf + (1000000-conf)/2
  1326. atomic.StoreUint64(&d.rttConfidence, conf)
  1327. // Log the new QoS values and sleep until the next RTT
  1328. glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
  1329. select {
  1330. case <-d.quitCh:
  1331. return
  1332. case <-time.After(rtt):
  1333. }
  1334. }
  1335. }
  1336. // qosReduceConfidence is meant to be called when a new peer joins the downloader's
  1337. // peer set, needing to reduce the confidence we have in out QoS estimates.
  1338. func (d *Downloader) qosReduceConfidence() {
  1339. // If we have a single peer, confidence is always 1
  1340. peers := uint64(d.peers.Len())
  1341. if peers == 1 {
  1342. atomic.StoreUint64(&d.rttConfidence, 1000000)
  1343. return
  1344. }
  1345. // If we have a ton of peers, don't drop confidence)
  1346. if peers >= uint64(qosConfidenceCap) {
  1347. return
  1348. }
  1349. // Otherwise drop the confidence factor
  1350. conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
  1351. if float64(conf)/1000000 < rttMinConfidence {
  1352. conf = uint64(rttMinConfidence * 1000000)
  1353. }
  1354. atomic.StoreUint64(&d.rttConfidence, conf)
  1355. rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
  1356. glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
  1357. }
  1358. // requestRTT returns the current target round trip time for a download request
  1359. // to complete in.
  1360. //
  1361. // Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
  1362. // the downloader tries to adapt queries to the RTT, so multiple RTT values can
  1363. // be adapted to, but smaller ones are preffered (stabler download stream).
  1364. func (d *Downloader) requestRTT() time.Duration {
  1365. return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
  1366. }
  1367. // requestTTL returns the current timeout allowance for a single download request
  1368. // to finish under.
  1369. func (d *Downloader) requestTTL() time.Duration {
  1370. var (
  1371. rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate))
  1372. conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
  1373. )
  1374. ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
  1375. if ttl > ttlLimit {
  1376. ttl = ttlLimit
  1377. }
  1378. return ttl
  1379. }