downloader.go 60 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package downloader contains the manual full chain synchronisation.
  17. package downloader
  18. import (
  19. "crypto/rand"
  20. "errors"
  21. "fmt"
  22. "math"
  23. "math/big"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. ethereum "github.com/ethereum/go-ethereum"
  29. "github.com/ethereum/go-ethereum/common"
  30. "github.com/ethereum/go-ethereum/core/types"
  31. "github.com/ethereum/go-ethereum/ethdb"
  32. "github.com/ethereum/go-ethereum/event"
  33. "github.com/ethereum/go-ethereum/logger"
  34. "github.com/ethereum/go-ethereum/logger/glog"
  35. "github.com/ethereum/go-ethereum/params"
  36. "github.com/ethereum/go-ethereum/trie"
  37. "github.com/rcrowley/go-metrics"
  38. )
  39. var (
  40. MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request
  41. MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
  42. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  43. MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
  44. MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request
  45. MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
  46. MaxStateFetch = 384 // Amount of node state values to allow fetching per request
  47. MaxForkAncestry = 3 * params.EpochDuration // Maximum chain reorganisation
  48. rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests
  49. rttMaxEstimate = 20 * time.Second // Maximum rount-trip time to target for download requests
  50. rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value
  51. ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
  52. ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts
  53. qosTuningPeers = 5 // Number of peers to tune based on (best peers)
  54. qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
  55. qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
  56. maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
  57. maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
  58. maxResultsProcess = 2048 // Number of content download results to import at once into the chain
  59. fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
  60. fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
  61. fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
  62. fsPivotInterval = 256 // Number of headers out of which to randomize the pivot point
  63. fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync
  64. fsCriticalTrials = uint32(32) // Number of times to retry in the cricical section before bailing
  65. )
  66. var (
  67. errBusy = errors.New("busy")
  68. errUnknownPeer = errors.New("peer is unknown or unhealthy")
  69. errBadPeer = errors.New("action from bad peer ignored")
  70. errStallingPeer = errors.New("peer is stalling")
  71. errNoPeers = errors.New("no peers to keep download active")
  72. errTimeout = errors.New("timeout")
  73. errEmptyHeaderSet = errors.New("empty header set by peer")
  74. errPeersUnavailable = errors.New("no peers available or all tried for download")
  75. errInvalidAncestor = errors.New("retrieved ancestor is invalid")
  76. errInvalidChain = errors.New("retrieved hash chain is invalid")
  77. errInvalidBlock = errors.New("retrieved block is invalid")
  78. errInvalidBody = errors.New("retrieved block body is invalid")
  79. errInvalidReceipt = errors.New("retrieved receipt is invalid")
  80. errCancelBlockFetch = errors.New("block download canceled (requested)")
  81. errCancelHeaderFetch = errors.New("block header download canceled (requested)")
  82. errCancelBodyFetch = errors.New("block body download canceled (requested)")
  83. errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
  84. errCancelStateFetch = errors.New("state data download canceled (requested)")
  85. errCancelHeaderProcessing = errors.New("header processing canceled (requested)")
  86. errCancelContentProcessing = errors.New("content processing canceled (requested)")
  87. errNoSyncActive = errors.New("no sync active")
  88. errTooOld = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
  89. )
  90. type Downloader struct {
  91. mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
  92. mux *event.TypeMux // Event multiplexer to announce sync operation events
  93. queue *queue // Scheduler for selecting the hashes to download
  94. peers *peerSet // Set of active peers from which download can proceed
  95. fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
  96. fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section
  97. rttEstimate uint64 // Round trip time to target for download requests
  98. rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
  99. // Statistics
  100. syncStatsChainOrigin uint64 // Origin block number where syncing started at
  101. syncStatsChainHeight uint64 // Highest block number known when syncing started
  102. syncStatsStateDone uint64 // Number of state trie entries already pulled
  103. syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
  104. // Callbacks
  105. hasHeader headerCheckFn // Checks if a header is present in the chain
  106. hasBlockAndState blockAndStateCheckFn // Checks if a block and associated state is present in the chain
  107. getHeader headerRetrievalFn // Retrieves a header from the chain
  108. getBlock blockRetrievalFn // Retrieves a block from the chain
  109. headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
  110. headBlock headBlockRetrievalFn // Retrieves the head block from the chain
  111. headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
  112. commitHeadBlock headBlockCommitterFn // Commits a manually assembled block as the chain head
  113. getTd tdRetrievalFn // Retrieves the TD of a block from the chain
  114. insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
  115. insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
  116. insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
  117. rollback chainRollbackFn // Removes a batch of recently added chain links
  118. dropPeer peerDropFn // Drops a peer for misbehaving
  119. // Status
  120. synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
  121. synchronising int32
  122. notified int32
  123. // Channels
  124. newPeerCh chan *peer
  125. headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
  126. bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
  127. receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
  128. stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
  129. bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
  130. receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
  131. stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
  132. headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
  133. // Cancellation and termination
  134. cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
  135. cancelCh chan struct{} // Channel to cancel mid-flight syncs
  136. cancelLock sync.RWMutex // Lock to protect the cancel channel and peer in delivers
  137. quitCh chan struct{} // Quit channel to signal termination
  138. quitLock sync.RWMutex // Lock to prevent double closes
  139. // Testing hooks
  140. syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
  141. bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
  142. receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
  143. chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
  144. }
  145. // New creates a new downloader to fetch hashes and blocks from remote peers.
  146. func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlockAndState blockAndStateCheckFn,
  147. getHeader headerRetrievalFn, getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn,
  148. headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
  149. insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {
  150. dl := &Downloader{
  151. mode: mode,
  152. mux: mux,
  153. queue: newQueue(stateDb),
  154. peers: newPeerSet(),
  155. rttEstimate: uint64(rttMaxEstimate),
  156. rttConfidence: uint64(1000000),
  157. hasHeader: hasHeader,
  158. hasBlockAndState: hasBlockAndState,
  159. getHeader: getHeader,
  160. getBlock: getBlock,
  161. headHeader: headHeader,
  162. headBlock: headBlock,
  163. headFastBlock: headFastBlock,
  164. commitHeadBlock: commitHeadBlock,
  165. getTd: getTd,
  166. insertHeaders: insertHeaders,
  167. insertBlocks: insertBlocks,
  168. insertReceipts: insertReceipts,
  169. rollback: rollback,
  170. dropPeer: dropPeer,
  171. newPeerCh: make(chan *peer, 1),
  172. headerCh: make(chan dataPack, 1),
  173. bodyCh: make(chan dataPack, 1),
  174. receiptCh: make(chan dataPack, 1),
  175. stateCh: make(chan dataPack, 1),
  176. bodyWakeCh: make(chan bool, 1),
  177. receiptWakeCh: make(chan bool, 1),
  178. stateWakeCh: make(chan bool, 1),
  179. headerProcCh: make(chan []*types.Header, 1),
  180. quitCh: make(chan struct{}),
  181. }
  182. go dl.qosTuner()
  183. return dl
  184. }
  185. // Progress retrieves the synchronisation boundaries, specifically the origin
  186. // block where synchronisation started at (may have failed/suspended); the block
  187. // or header sync is currently at; and the latest known block which the sync targets.
  188. //
  189. // In addition, during the state download phase of fast synchronisation the number
  190. // of processed and the total number of known states are also returned. Otherwise
  191. // these are zero.
  192. func (d *Downloader) Progress() ethereum.SyncProgress {
  193. // Fetch the pending state count outside of the lock to prevent unforeseen deadlocks
  194. pendingStates := uint64(d.queue.PendingNodeData())
  195. // Lock the current stats and return the progress
  196. d.syncStatsLock.RLock()
  197. defer d.syncStatsLock.RUnlock()
  198. current := uint64(0)
  199. switch d.mode {
  200. case FullSync:
  201. current = d.headBlock().NumberU64()
  202. case FastSync:
  203. current = d.headFastBlock().NumberU64()
  204. case LightSync:
  205. current = d.headHeader().Number.Uint64()
  206. }
  207. return ethereum.SyncProgress{
  208. StartingBlock: d.syncStatsChainOrigin,
  209. CurrentBlock: current,
  210. HighestBlock: d.syncStatsChainHeight,
  211. PulledStates: d.syncStatsStateDone,
  212. KnownStates: d.syncStatsStateDone + pendingStates,
  213. }
  214. }
  215. // Synchronising returns whether the downloader is currently retrieving blocks.
  216. func (d *Downloader) Synchronising() bool {
  217. return atomic.LoadInt32(&d.synchronising) > 0
  218. }
  219. // RegisterPeer injects a new download peer into the set of block source to be
  220. // used for fetching hashes and blocks from.
  221. func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn,
  222. getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
  223. getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
  224. glog.V(logger.Detail).Infoln("Registering peer", id)
  225. if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
  226. glog.V(logger.Error).Infoln("Register failed:", err)
  227. return err
  228. }
  229. d.qosReduceConfidence()
  230. return nil
  231. }
  232. // UnregisterPeer remove a peer from the known list, preventing any action from
  233. // the specified peer. An effort is also made to return any pending fetches into
  234. // the queue.
  235. func (d *Downloader) UnregisterPeer(id string) error {
  236. // Unregister the peer from the active peer set and revoke any fetch tasks
  237. glog.V(logger.Detail).Infoln("Unregistering peer", id)
  238. if err := d.peers.Unregister(id); err != nil {
  239. glog.V(logger.Error).Infoln("Unregister failed:", err)
  240. return err
  241. }
  242. d.queue.Revoke(id)
  243. // If this peer was the master peer, abort sync immediately
  244. d.cancelLock.RLock()
  245. master := id == d.cancelPeer
  246. d.cancelLock.RUnlock()
  247. if master {
  248. d.cancel()
  249. }
  250. return nil
  251. }
  252. // Synchronise tries to sync up our local block chain with a remote peer, both
  253. // adding various sanity checks as well as wrapping it with various log entries.
  254. func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
  255. glog.V(logger.Detail).Infof("Attempting synchronisation: %v, head [%x…], TD %v", id, head[:4], td)
  256. err := d.synchronise(id, head, td, mode)
  257. switch err {
  258. case nil:
  259. glog.V(logger.Detail).Infof("Synchronisation completed")
  260. case errBusy:
  261. glog.V(logger.Detail).Infof("Synchronisation already in progress")
  262. case errTimeout, errBadPeer, errStallingPeer,
  263. errEmptyHeaderSet, errPeersUnavailable, errTooOld,
  264. errInvalidAncestor, errInvalidChain:
  265. glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
  266. d.dropPeer(id)
  267. default:
  268. glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
  269. }
  270. return err
  271. }
  272. // synchronise will select the peer and use it for synchronising. If an empty string is given
  273. // it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
  274. // checks fail an error will be returned. This method is synchronous
  275. func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
  276. // Mock out the synchronisation if testing
  277. if d.synchroniseMock != nil {
  278. return d.synchroniseMock(id, hash)
  279. }
  280. // Make sure only one goroutine is ever allowed past this point at once
  281. if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
  282. return errBusy
  283. }
  284. defer atomic.StoreInt32(&d.synchronising, 0)
  285. // Post a user notification of the sync (only once per session)
  286. if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
  287. glog.V(logger.Info).Infoln("Block synchronisation started")
  288. }
  289. // Reset the queue, peer set and wake channels to clean any internal leftover state
  290. d.queue.Reset()
  291. d.peers.Reset()
  292. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  293. select {
  294. case <-ch:
  295. default:
  296. }
  297. }
  298. for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
  299. for empty := false; !empty; {
  300. select {
  301. case <-ch:
  302. default:
  303. empty = true
  304. }
  305. }
  306. }
  307. for empty := false; !empty; {
  308. select {
  309. case <-d.headerProcCh:
  310. default:
  311. empty = true
  312. }
  313. }
  314. // Create cancel channel for aborting mid-flight and mark the master peer
  315. d.cancelLock.Lock()
  316. d.cancelCh = make(chan struct{})
  317. d.cancelPeer = id
  318. d.cancelLock.Unlock()
  319. defer d.cancel() // No matter what, we can't leave the cancel channel open
  320. // Set the requested sync mode, unless it's forbidden
  321. d.mode = mode
  322. if d.mode == FastSync && atomic.LoadUint32(&d.fsPivotFails) >= fsCriticalTrials {
  323. d.mode = FullSync
  324. }
  325. // Retrieve the origin peer and initiate the downloading process
  326. p := d.peers.Peer(id)
  327. if p == nil {
  328. return errUnknownPeer
  329. }
  330. return d.syncWithPeer(p, hash, td)
  331. }
  332. // syncWithPeer starts a block synchronization based on the hash chain from the
  333. // specified peer and head hash.
  334. func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
  335. d.mux.Post(StartEvent{})
  336. defer func() {
  337. // reset on error
  338. if err != nil {
  339. d.mux.Post(FailedEvent{err})
  340. } else {
  341. d.mux.Post(DoneEvent{})
  342. }
  343. }()
  344. if p.version < 62 {
  345. return errTooOld
  346. }
  347. glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version)
  348. defer func(start time.Time) {
  349. glog.V(logger.Debug).Infof("Synchronisation terminated after %v", time.Since(start))
  350. }(time.Now())
  351. // Look up the sync boundaries: the common ancestor and the target block
  352. latest, err := d.fetchHeight(p)
  353. if err != nil {
  354. return err
  355. }
  356. height := latest.Number.Uint64()
  357. origin, err := d.findAncestor(p, height)
  358. if err != nil {
  359. return err
  360. }
  361. d.syncStatsLock.Lock()
  362. if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
  363. d.syncStatsChainOrigin = origin
  364. }
  365. d.syncStatsChainHeight = height
  366. d.syncStatsLock.Unlock()
  367. // Initiate the sync using a concurrent header and content retrieval algorithm
  368. pivot := uint64(0)
  369. switch d.mode {
  370. case LightSync:
  371. pivot = height
  372. case FastSync:
  373. // Calculate the new fast/slow sync pivot point
  374. if d.fsPivotLock == nil {
  375. pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
  376. if err != nil {
  377. panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
  378. }
  379. if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
  380. pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
  381. }
  382. } else {
  383. // Pivot point locked in, use this and do not pick a new one!
  384. pivot = d.fsPivotLock.Number.Uint64()
  385. }
  386. // If the point is below the origin, move origin back to ensure state download
  387. if pivot < origin {
  388. if pivot > 0 {
  389. origin = pivot - 1
  390. } else {
  391. origin = 0
  392. }
  393. }
  394. glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
  395. }
  396. d.queue.Prepare(origin+1, d.mode, pivot, latest)
  397. if d.syncInitHook != nil {
  398. d.syncInitHook(origin, height)
  399. }
  400. return d.spawnSync(origin+1,
  401. func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
  402. func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
  403. func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
  404. func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
  405. func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
  406. )
  407. }
  408. // spawnSync runs d.process and all given fetcher functions to completion in
  409. // separate goroutines, returning the first error that appears.
  410. func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
  411. var wg sync.WaitGroup
  412. errc := make(chan error, len(fetchers)+1)
  413. wg.Add(len(fetchers) + 1)
  414. go func() { defer wg.Done(); errc <- d.processContent() }()
  415. for _, fn := range fetchers {
  416. fn := fn
  417. go func() { defer wg.Done(); errc <- fn() }()
  418. }
  419. // Wait for the first error, then terminate the others.
  420. var err error
  421. for i := 0; i < len(fetchers)+1; i++ {
  422. if i == len(fetchers) {
  423. // Close the queue when all fetchers have exited.
  424. // This will cause the block processor to end when
  425. // it has processed the queue.
  426. d.queue.Close()
  427. }
  428. if err = <-errc; err != nil {
  429. break
  430. }
  431. }
  432. d.queue.Close()
  433. d.cancel()
  434. wg.Wait()
  435. // If sync failed in the critical section, bump the fail counter
  436. if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
  437. atomic.AddUint32(&d.fsPivotFails, 1)
  438. }
  439. return err
  440. }
  441. // cancel cancels all of the operations and resets the queue. It returns true
  442. // if the cancel operation was completed.
  443. func (d *Downloader) cancel() {
  444. // Close the current cancel channel
  445. d.cancelLock.Lock()
  446. if d.cancelCh != nil {
  447. select {
  448. case <-d.cancelCh:
  449. // Channel was already closed
  450. default:
  451. close(d.cancelCh)
  452. }
  453. }
  454. d.cancelLock.Unlock()
  455. }
  456. // Terminate interrupts the downloader, canceling all pending operations.
  457. // The downloader cannot be reused after calling Terminate.
  458. func (d *Downloader) Terminate() {
  459. // Close the termination channel (make sure double close is allowed)
  460. d.quitLock.Lock()
  461. select {
  462. case <-d.quitCh:
  463. default:
  464. close(d.quitCh)
  465. }
  466. d.quitLock.Unlock()
  467. // Cancel any pending download requests
  468. d.cancel()
  469. }
  470. // fetchHeight retrieves the head header of the remote peer to aid in estimating
  471. // the total time a pending synchronisation would take.
  472. func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
  473. glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
  474. // Request the advertised remote head block and wait for the response
  475. head, _ := p.currentHead()
  476. go p.getRelHeaders(head, 1, 0, false)
  477. timeout := time.After(d.requestTTL())
  478. for {
  479. select {
  480. case <-d.cancelCh:
  481. return nil, errCancelBlockFetch
  482. case packet := <-d.headerCh:
  483. // Discard anything not from the origin peer
  484. if packet.PeerId() != p.id {
  485. glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
  486. break
  487. }
  488. // Make sure the peer actually gave something valid
  489. headers := packet.(*headerPack).headers
  490. if len(headers) != 1 {
  491. glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
  492. return nil, errBadPeer
  493. }
  494. return headers[0], nil
  495. case <-timeout:
  496. glog.V(logger.Debug).Infof("%v: head header timeout", p)
  497. return nil, errTimeout
  498. case <-d.bodyCh:
  499. case <-d.stateCh:
  500. case <-d.receiptCh:
  501. // Out of bounds delivery, ignore
  502. }
  503. }
  504. }
  505. // findAncestor tries to locate the common ancestor link of the local chain and
  506. // a remote peers blockchain. In the general case when our node was in sync and
  507. // on the correct chain, checking the top N links should already get us a match.
  508. // In the rare scenario when we ended up on a long reorganisation (i.e. none of
  509. // the head links match), we do a binary search to find the common ancestor.
  510. func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
  511. glog.V(logger.Debug).Infof("%v: looking for common ancestor (remote height %d)", p, height)
  512. // Figure out the valid ancestor range to prevent rewrite attacks
  513. floor, ceil := int64(-1), d.headHeader().Number.Uint64()
  514. if d.mode == FullSync {
  515. ceil = d.headBlock().NumberU64()
  516. } else if d.mode == FastSync {
  517. ceil = d.headFastBlock().NumberU64()
  518. }
  519. if ceil >= MaxForkAncestry {
  520. floor = int64(ceil - MaxForkAncestry)
  521. }
  522. // Request the topmost blocks to short circuit binary ancestor lookup
  523. head := ceil
  524. if head > height {
  525. head = height
  526. }
  527. from := int64(head) - int64(MaxHeaderFetch)
  528. if from < 0 {
  529. from = 0
  530. }
  531. // Span out with 15 block gaps into the future to catch bad head reports
  532. limit := 2 * MaxHeaderFetch / 16
  533. count := 1 + int((int64(ceil)-from)/16)
  534. if count > limit {
  535. count = limit
  536. }
  537. go p.getAbsHeaders(uint64(from), count, 15, false)
  538. // Wait for the remote response to the head fetch
  539. number, hash := uint64(0), common.Hash{}
  540. timeout := time.After(d.requestTTL())
  541. for finished := false; !finished; {
  542. select {
  543. case <-d.cancelCh:
  544. return 0, errCancelHeaderFetch
  545. case packet := <-d.headerCh:
  546. // Discard anything not from the origin peer
  547. if packet.PeerId() != p.id {
  548. glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
  549. break
  550. }
  551. // Make sure the peer actually gave something valid
  552. headers := packet.(*headerPack).headers
  553. if len(headers) == 0 {
  554. glog.V(logger.Warn).Infof("%v: empty head header set", p)
  555. return 0, errEmptyHeaderSet
  556. }
  557. // Make sure the peer's reply conforms to the request
  558. for i := 0; i < len(headers); i++ {
  559. if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
  560. glog.V(logger.Warn).Infof("%v: head header set (item %d) broke chain ordering: requested %d, got %d", p, i, from+int64(i)*16, number)
  561. return 0, errInvalidChain
  562. }
  563. }
  564. // Check if a common ancestor was found
  565. finished = true
  566. for i := len(headers) - 1; i >= 0; i-- {
  567. // Skip any headers that underflow/overflow our requested set
  568. if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil {
  569. continue
  570. }
  571. // Otherwise check if we already know the header or not
  572. if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
  573. number, hash = headers[i].Number.Uint64(), headers[i].Hash()
  574. // If every header is known, even future ones, the peer straight out lied about its head
  575. if number > height && i == limit-1 {
  576. glog.V(logger.Warn).Infof("%v: lied about chain head: reported %d, found above %d", p, height, number)
  577. return 0, errStallingPeer
  578. }
  579. break
  580. }
  581. }
  582. case <-timeout:
  583. glog.V(logger.Debug).Infof("%v: head header timeout", p)
  584. return 0, errTimeout
  585. case <-d.bodyCh:
  586. case <-d.stateCh:
  587. case <-d.receiptCh:
  588. // Out of bounds delivery, ignore
  589. }
  590. }
  591. // If the head fetch already found an ancestor, return
  592. if !common.EmptyHash(hash) {
  593. if int64(number) <= floor {
  594. glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, number, hash[:4], floor)
  595. return 0, errInvalidAncestor
  596. }
  597. glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4])
  598. return number, nil
  599. }
  600. // Ancestor not found, we need to binary search over our chain
  601. start, end := uint64(0), head
  602. if floor > 0 {
  603. start = uint64(floor)
  604. }
  605. for start+1 < end {
  606. // Split our chain interval in two, and request the hash to cross check
  607. check := (start + end) / 2
  608. timeout := time.After(d.requestTTL())
  609. go p.getAbsHeaders(uint64(check), 1, 0, false)
  610. // Wait until a reply arrives to this request
  611. for arrived := false; !arrived; {
  612. select {
  613. case <-d.cancelCh:
  614. return 0, errCancelHeaderFetch
  615. case packer := <-d.headerCh:
  616. // Discard anything not from the origin peer
  617. if packer.PeerId() != p.id {
  618. glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packer.PeerId())
  619. break
  620. }
  621. // Make sure the peer actually gave something valid
  622. headers := packer.(*headerPack).headers
  623. if len(headers) != 1 {
  624. glog.V(logger.Debug).Infof("%v: invalid search header set (%d)", p, len(headers))
  625. return 0, errBadPeer
  626. }
  627. arrived = true
  628. // Modify the search interval based on the response
  629. if (d.mode == FullSync && !d.hasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) {
  630. end = check
  631. break
  632. }
  633. header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
  634. if header.Number.Uint64() != check {
  635. glog.V(logger.Debug).Infof("%v: non requested header #%d [%x…], instead of #%d", p, header.Number, header.Hash().Bytes()[:4], check)
  636. return 0, errBadPeer
  637. }
  638. start = check
  639. case <-timeout:
  640. glog.V(logger.Debug).Infof("%v: search header timeout", p)
  641. return 0, errTimeout
  642. case <-d.bodyCh:
  643. case <-d.stateCh:
  644. case <-d.receiptCh:
  645. // Out of bounds delivery, ignore
  646. }
  647. }
  648. }
  649. // Ensure valid ancestry and return
  650. if int64(start) <= floor {
  651. glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, start, hash[:4], floor)
  652. return 0, errInvalidAncestor
  653. }
  654. glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, start, hash[:4])
  655. return start, nil
  656. }
  657. // fetchHeaders keeps retrieving headers concurrently from the number
  658. // requested, until no more are returned, potentially throttling on the way. To
  659. // facilitate concurrency but still protect against malicious nodes sending bad
  660. // headers, we construct a header chain skeleton using the "origin" peer we are
  661. // syncing with, and fill in the missing headers using anyone else. Headers from
  662. // other peers are only accepted if they map cleanly to the skeleton. If no one
  663. // can fill in the skeleton - not even the origin peer - it's assumed invalid and
  664. // the origin is dropped.
  665. func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
  666. glog.V(logger.Debug).Infof("%v: directing header downloads from #%d", p, from)
  667. defer glog.V(logger.Debug).Infof("%v: header download terminated", p)
  668. // Create a timeout timer, and the associated header fetcher
  669. skeleton := true // Skeleton assembly phase or finishing up
  670. request := time.Now() // time of the last skeleton fetch request
  671. timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
  672. <-timeout.C // timeout channel should be initially empty
  673. defer timeout.Stop()
  674. getHeaders := func(from uint64) {
  675. request = time.Now()
  676. timeout.Reset(d.requestTTL())
  677. if skeleton {
  678. glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)
  679. go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
  680. } else {
  681. glog.V(logger.Detail).Infof("%v: fetching %d full headers from #%d", p, MaxHeaderFetch, from)
  682. go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
  683. }
  684. }
  685. // Start pulling the header chain skeleton until all is done
  686. getHeaders(from)
  687. for {
  688. select {
  689. case <-d.cancelCh:
  690. return errCancelHeaderFetch
  691. case packet := <-d.headerCh:
  692. // Make sure the active peer is giving us the skeleton headers
  693. if packet.PeerId() != p.id {
  694. glog.V(logger.Debug).Infof("Received skeleton headers from incorrect peer (%s)", packet.PeerId())
  695. break
  696. }
  697. headerReqTimer.UpdateSince(request)
  698. timeout.Stop()
  699. // If the skeleton's finished, pull any remaining head headers directly from the origin
  700. if packet.Items() == 0 && skeleton {
  701. skeleton = false
  702. getHeaders(from)
  703. continue
  704. }
  705. // If no more headers are inbound, notify the content fetchers and return
  706. if packet.Items() == 0 {
  707. glog.V(logger.Debug).Infof("%v: no available headers", p)
  708. select {
  709. case d.headerProcCh <- nil:
  710. return nil
  711. case <-d.cancelCh:
  712. return errCancelHeaderFetch
  713. }
  714. }
  715. headers := packet.(*headerPack).headers
  716. // If we received a skeleton batch, resolve internals concurrently
  717. if skeleton {
  718. filled, proced, err := d.fillHeaderSkeleton(from, headers)
  719. if err != nil {
  720. glog.V(logger.Debug).Infof("%v: skeleton chain invalid: %v", p, err)
  721. return errInvalidChain
  722. }
  723. headers = filled[proced:]
  724. from += uint64(proced)
  725. }
  726. // Insert all the new headers and fetch the next batch
  727. if len(headers) > 0 {
  728. glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
  729. select {
  730. case d.headerProcCh <- headers:
  731. case <-d.cancelCh:
  732. return errCancelHeaderFetch
  733. }
  734. from += uint64(len(headers))
  735. }
  736. getHeaders(from)
  737. case <-timeout.C:
  738. // Header retrieval timed out, consider the peer bad and drop
  739. glog.V(logger.Debug).Infof("%v: header request timed out", p)
  740. headerTimeoutMeter.Mark(1)
  741. d.dropPeer(p.id)
  742. // Finish the sync gracefully instead of dumping the gathered data though
  743. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  744. select {
  745. case ch <- false:
  746. case <-d.cancelCh:
  747. }
  748. }
  749. select {
  750. case d.headerProcCh <- nil:
  751. case <-d.cancelCh:
  752. }
  753. return errBadPeer
  754. }
  755. }
  756. }
  757. // fillHeaderSkeleton concurrently retrieves headers from all our available peers
  758. // and maps them to the provided skeleton header chain.
  759. //
  760. // Any partial results from the beginning of the skeleton is (if possible) forwarded
  761. // immediately to the header processor to keep the rest of the pipeline full even
  762. // in the case of header stalls.
  763. //
  764. // The method returs the entire filled skeleton and also the number of headers
  765. // already forwarded for processing.
  766. func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
  767. glog.V(logger.Debug).Infof("Filling up skeleton from #%d", from)
  768. d.queue.ScheduleSkeleton(from, skeleton)
  769. var (
  770. deliver = func(packet dataPack) (int, error) {
  771. pack := packet.(*headerPack)
  772. return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
  773. }
  774. expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
  775. throttle = func() bool { return false }
  776. reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
  777. return d.queue.ReserveHeaders(p, count), false, nil
  778. }
  779. fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
  780. capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) }
  781. setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
  782. )
  783. err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
  784. d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
  785. nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header")
  786. glog.V(logger.Debug).Infof("Skeleton fill terminated: %v", err)
  787. filled, proced := d.queue.RetrieveHeaders()
  788. return filled, proced, err
  789. }
  790. // fetchBodies iteratively downloads the scheduled block bodies, taking any
  791. // available peers, reserving a chunk of blocks for each, waiting for delivery
  792. // and also periodically checking for timeouts.
  793. func (d *Downloader) fetchBodies(from uint64) error {
  794. glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
  795. var (
  796. deliver = func(packet dataPack) (int, error) {
  797. pack := packet.(*bodyPack)
  798. return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
  799. }
  800. expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
  801. fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
  802. capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) }
  803. setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
  804. )
  805. err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
  806. d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
  807. d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "Body")
  808. glog.V(logger.Debug).Infof("Block body download terminated: %v", err)
  809. return err
  810. }
  811. // fetchReceipts iteratively downloads the scheduled block receipts, taking any
  812. // available peers, reserving a chunk of receipts for each, waiting for delivery
  813. // and also periodically checking for timeouts.
  814. func (d *Downloader) fetchReceipts(from uint64) error {
  815. glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
  816. var (
  817. deliver = func(packet dataPack) (int, error) {
  818. pack := packet.(*receiptPack)
  819. return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
  820. }
  821. expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
  822. fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
  823. capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) }
  824. setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
  825. )
  826. err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
  827. d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
  828. d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt")
  829. glog.V(logger.Debug).Infof("Receipt download terminated: %v", err)
  830. return err
  831. }
  832. // fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
  833. // available peers, reserving a chunk of nodes for each, waiting for delivery and
  834. // also periodically checking for timeouts.
  835. func (d *Downloader) fetchNodeData() error {
  836. glog.V(logger.Debug).Infof("Downloading node state data")
  837. var (
  838. deliver = func(packet dataPack) (int, error) {
  839. start := time.Now()
  840. return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) {
  841. // If the peer returned old-requested data, forgive
  842. if err == trie.ErrNotRequested {
  843. glog.V(logger.Debug).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId())
  844. return
  845. }
  846. if err != nil {
  847. // If the node data processing failed, the root hash is very wrong, abort
  848. glog.V(logger.Error).Infof("peer %d: state processing failed: %v", packet.PeerId(), err)
  849. d.cancel()
  850. return
  851. }
  852. // Processing succeeded, notify state fetcher of continuation
  853. pending := d.queue.PendingNodeData()
  854. if pending > 0 {
  855. select {
  856. case d.stateWakeCh <- true:
  857. default:
  858. }
  859. }
  860. d.syncStatsLock.Lock()
  861. d.syncStatsStateDone += uint64(delivered)
  862. syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below
  863. d.syncStatsLock.Unlock()
  864. // If real database progress was made, reset any fast-sync pivot failure
  865. if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 {
  866. glog.V(logger.Debug).Infof("fast-sync progressed, resetting fail counter from %d", atomic.LoadUint32(&d.fsPivotFails))
  867. atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
  868. }
  869. // Log a message to the user and return
  870. if delivered > 0 {
  871. glog.V(logger.Info).Infof("imported %3d state entries in %9v: processed %d, pending at least %d", delivered, common.PrettyDuration(time.Since(start)), syncStatsStateDone, pending)
  872. }
  873. })
  874. }
  875. expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
  876. throttle = func() bool { return false }
  877. reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
  878. return d.queue.ReserveNodeData(p, count), false, nil
  879. }
  880. fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
  881. capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
  882. setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
  883. )
  884. err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
  885. d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
  886. d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "State")
  887. glog.V(logger.Debug).Infof("Node state data download terminated: %v", err)
  888. return err
  889. }
  890. // fetchParts iteratively downloads scheduled block parts, taking any available
  891. // peers, reserving a chunk of fetch requests for each, waiting for delivery and
  892. // also periodically checking for timeouts.
  893. //
  894. // As the scheduling/timeout logic mostly is the same for all downloaded data
  895. // types, this method is used by each for data gathering and is instrumented with
  896. // various callbacks to handle the slight differences between processing them.
  897. //
  898. // The instrumentation parameters:
  899. // - errCancel: error type to return if the fetch operation is cancelled (mostly makes logging nicer)
  900. // - deliveryCh: channel from which to retrieve downloaded data packets (merged from all concurrent peers)
  901. // - deliver: processing callback to deliver data packets into type specific download queues (usually within `queue`)
  902. // - wakeCh: notification channel for waking the fetcher when new tasks are available (or sync completed)
  903. // - expire: task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
  904. // - pending: task callback for the number of requests still needing download (detect completion/non-completability)
  905. // - inFlight: task callback for the number of in-progress requests (wait for all active downloads to finish)
  906. // - throttle: task callback to check if the processing queue is full and activate throttling (bound memory use)
  907. // - reserve: task callback to reserve new download tasks to a particular peer (also signals partial completions)
  908. // - fetchHook: tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
  909. // - fetch: network callback to actually send a particular download request to a physical remote peer
  910. // - cancel: task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
  911. // - capacity: network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping)
  912. // - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
  913. // - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
  914. // - kind: textual label of the type being downloaded to display in log mesages
  915. func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
  916. expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
  917. fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
  918. idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
  919. // Create a ticker to detect expired retrieval tasks
  920. ticker := time.NewTicker(100 * time.Millisecond)
  921. defer ticker.Stop()
  922. update := make(chan struct{}, 1)
  923. // Prepare the queue and fetch block parts until the block header fetcher's done
  924. finished := false
  925. for {
  926. select {
  927. case <-d.cancelCh:
  928. return errCancel
  929. case packet := <-deliveryCh:
  930. // If the peer was previously banned and failed to deliver it's pack
  931. // in a reasonable time frame, ignore it's message.
  932. if peer := d.peers.Peer(packet.PeerId()); peer != nil {
  933. // Deliver the received chunk of data and check chain validity
  934. accepted, err := deliver(packet)
  935. if err == errInvalidChain {
  936. return err
  937. }
  938. // Unless a peer delivered something completely else than requested (usually
  939. // caused by a timed out request which came through in the end), set it to
  940. // idle. If the delivery's stale, the peer should have already been idled.
  941. if err != errStaleDelivery {
  942. setIdle(peer, accepted)
  943. }
  944. // Issue a log to the user to see what's going on
  945. switch {
  946. case err == nil && packet.Items() == 0:
  947. glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
  948. case err == nil:
  949. glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
  950. default:
  951. glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err)
  952. }
  953. }
  954. // Blocks assembled, try to update the progress
  955. select {
  956. case update <- struct{}{}:
  957. default:
  958. }
  959. case cont := <-wakeCh:
  960. // The header fetcher sent a continuation flag, check if it's done
  961. if !cont {
  962. finished = true
  963. }
  964. // Headers arrive, try to update the progress
  965. select {
  966. case update <- struct{}{}:
  967. default:
  968. }
  969. case <-ticker.C:
  970. // Sanity check update the progress
  971. select {
  972. case update <- struct{}{}:
  973. default:
  974. }
  975. case <-update:
  976. // Short circuit if we lost all our peers
  977. if d.peers.Len() == 0 {
  978. return errNoPeers
  979. }
  980. // Check for fetch request timeouts and demote the responsible peers
  981. for pid, fails := range expire() {
  982. if peer := d.peers.Peer(pid); peer != nil {
  983. // If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
  984. // ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal times
  985. // out that sync wise we need to get rid of the peer.
  986. //
  987. // The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth
  988. // and latency of a peer separately, which requires pushing the measures capacity a bit and seeing
  989. // how response times reacts, to it always requests one more than the minimum (i.e. min 2).
  990. if fails > 2 {
  991. glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
  992. setIdle(peer, 0)
  993. } else {
  994. glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind))
  995. d.dropPeer(pid)
  996. }
  997. }
  998. }
  999. // If there's nothing more to fetch, wait or terminate
  1000. if pending() == 0 {
  1001. if !inFlight() && finished {
  1002. glog.V(logger.Debug).Infof("%s fetching completed", kind)
  1003. return nil
  1004. }
  1005. break
  1006. }
  1007. // Send a download request to all idle peers, until throttled
  1008. progressed, throttled, running := false, false, inFlight()
  1009. idles, total := idle()
  1010. for _, peer := range idles {
  1011. // Short circuit if throttling activated
  1012. if throttle() {
  1013. throttled = true
  1014. break
  1015. }
  1016. // Reserve a chunk of fetches for a peer. A nil can mean either that
  1017. // no more headers are available, or that the peer is known not to
  1018. // have them.
  1019. request, progress, err := reserve(peer, capacity(peer))
  1020. if err != nil {
  1021. return err
  1022. }
  1023. if progress {
  1024. progressed = true
  1025. }
  1026. if request == nil {
  1027. continue
  1028. }
  1029. if glog.V(logger.Detail) {
  1030. if request.From > 0 {
  1031. glog.Infof("%s: requesting %s(s) from #%d", peer, strings.ToLower(kind), request.From)
  1032. } else if len(request.Headers) > 0 {
  1033. glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
  1034. } else {
  1035. glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
  1036. }
  1037. }
  1038. // Fetch the chunk and make sure any errors return the hashes to the queue
  1039. if fetchHook != nil {
  1040. fetchHook(request.Headers)
  1041. }
  1042. if err := fetch(peer, request); err != nil {
  1043. // Although we could try and make an attempt to fix this, this error really
  1044. // means that we've double allocated a fetch task to a peer. If that is the
  1045. // case, the internal state of the downloader and the queue is very wrong so
  1046. // better hard crash and note the error instead of silently accumulating into
  1047. // a much bigger issue.
  1048. panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, strings.ToLower(kind)))
  1049. }
  1050. running = true
  1051. }
  1052. // Make sure that we have peers available for fetching. If all peers have been tried
  1053. // and all failed throw an error
  1054. if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
  1055. return errPeersUnavailable
  1056. }
  1057. }
  1058. }
  1059. }
  1060. // processHeaders takes batches of retrieved headers from an input channel and
  1061. // keeps processing and scheduling them into the header chain and downloader's
  1062. // queue until the stream ends or a failure occurs.
  1063. func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
  1064. // Calculate the pivoting point for switching from fast to slow sync
  1065. pivot := d.queue.FastSyncPivot()
  1066. // Keep a count of uncertain headers to roll back
  1067. rollback := []*types.Header{}
  1068. defer func() {
  1069. if len(rollback) > 0 {
  1070. // Flatten the headers and roll them back
  1071. hashes := make([]common.Hash, len(rollback))
  1072. for i, header := range rollback {
  1073. hashes[i] = header.Hash()
  1074. }
  1075. lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, common.Big0, common.Big0
  1076. if d.headFastBlock != nil {
  1077. lastFastBlock = d.headFastBlock().Number()
  1078. }
  1079. if d.headBlock != nil {
  1080. lastBlock = d.headBlock().Number()
  1081. }
  1082. d.rollback(hashes)
  1083. curFastBlock, curBlock := common.Big0, common.Big0
  1084. if d.headFastBlock != nil {
  1085. curFastBlock = d.headFastBlock().Number()
  1086. }
  1087. if d.headBlock != nil {
  1088. curBlock = d.headBlock().Number()
  1089. }
  1090. glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
  1091. len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, curFastBlock, lastBlock, curBlock)
  1092. // If we're already past the pivot point, this could be an attack, thread carefully
  1093. if rollback[len(rollback)-1].Number.Uint64() > pivot {
  1094. // If we didn't ever fail, lock in te pivot header (must! not! change!)
  1095. if atomic.LoadUint32(&d.fsPivotFails) == 0 {
  1096. for _, header := range rollback {
  1097. if header.Number.Uint64() == pivot {
  1098. glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4])
  1099. d.fsPivotLock = header
  1100. }
  1101. }
  1102. }
  1103. }
  1104. }
  1105. }()
  1106. // Wait for batches of headers to process
  1107. gotHeaders := false
  1108. for {
  1109. select {
  1110. case <-d.cancelCh:
  1111. return errCancelHeaderProcessing
  1112. case headers := <-d.headerProcCh:
  1113. // Terminate header processing if we synced up
  1114. if len(headers) == 0 {
  1115. // Notify everyone that headers are fully processed
  1116. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  1117. select {
  1118. case ch <- false:
  1119. case <-d.cancelCh:
  1120. }
  1121. }
  1122. // If no headers were retrieved at all, the peer violated it's TD promise that it had a
  1123. // better chain compared to ours. The only exception is if it's promised blocks were
  1124. // already imported by other means (e.g. fecher):
  1125. //
  1126. // R <remote peer>, L <local node>: Both at block 10
  1127. // R: Mine block 11, and propagate it to L
  1128. // L: Queue block 11 for import
  1129. // L: Notice that R's head and TD increased compared to ours, start sync
  1130. // L: Import of block 11 finishes
  1131. // L: Sync begins, and finds common ancestor at 11
  1132. // L: Request new headers up from 11 (R's TD was higher, it must have something)
  1133. // R: Nothing to give
  1134. if d.mode != LightSync {
  1135. if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
  1136. return errStallingPeer
  1137. }
  1138. }
  1139. // If fast or light syncing, ensure promised headers are indeed delivered. This is
  1140. // needed to detect scenarios where an attacker feeds a bad pivot and then bails out
  1141. // of delivering the post-pivot blocks that would flag the invalid content.
  1142. //
  1143. // This check cannot be executed "as is" for full imports, since blocks may still be
  1144. // queued for processing when the header download completes. However, as long as the
  1145. // peer gave us something useful, we're already happy/progressed (above check).
  1146. if d.mode == FastSync || d.mode == LightSync {
  1147. if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
  1148. return errStallingPeer
  1149. }
  1150. }
  1151. // Disable any rollback and return
  1152. rollback = nil
  1153. return nil
  1154. }
  1155. // Otherwise split the chunk of headers into batches and process them
  1156. gotHeaders = true
  1157. for len(headers) > 0 {
  1158. // Terminate if something failed in between processing chunks
  1159. select {
  1160. case <-d.cancelCh:
  1161. return errCancelHeaderProcessing
  1162. default:
  1163. }
  1164. // Select the next chunk of headers to import
  1165. limit := maxHeadersProcess
  1166. if limit > len(headers) {
  1167. limit = len(headers)
  1168. }
  1169. chunk := headers[:limit]
  1170. // In case of header only syncing, validate the chunk immediately
  1171. if d.mode == FastSync || d.mode == LightSync {
  1172. // Collect the yet unknown headers to mark them as uncertain
  1173. unknown := make([]*types.Header, 0, len(headers))
  1174. for _, header := range chunk {
  1175. if !d.hasHeader(header.Hash()) {
  1176. unknown = append(unknown, header)
  1177. }
  1178. }
  1179. // If we're importing pure headers, verify based on their recentness
  1180. frequency := fsHeaderCheckFrequency
  1181. if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
  1182. frequency = 1
  1183. }
  1184. if n, err := d.insertHeaders(chunk, frequency); err != nil {
  1185. // If some headers were inserted, add them too to the rollback list
  1186. if n > 0 {
  1187. rollback = append(rollback, chunk[:n]...)
  1188. }
  1189. glog.V(logger.Debug).Infof("invalid header #%d [%x…]: %v", chunk[n].Number, chunk[n].Hash().Bytes()[:4], err)
  1190. return errInvalidChain
  1191. }
  1192. // All verifications passed, store newly found uncertain headers
  1193. rollback = append(rollback, unknown...)
  1194. if len(rollback) > fsHeaderSafetyNet {
  1195. rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
  1196. }
  1197. }
  1198. // If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
  1199. if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
  1200. if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
  1201. glog.V(logger.Warn).Infof("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4])
  1202. return errInvalidChain
  1203. }
  1204. }
  1205. // Unless we're doing light chains, schedule the headers for associated content retrieval
  1206. if d.mode == FullSync || d.mode == FastSync {
  1207. // If we've reached the allowed number of pending headers, stall a bit
  1208. for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
  1209. select {
  1210. case <-d.cancelCh:
  1211. return errCancelHeaderProcessing
  1212. case <-time.After(time.Second):
  1213. }
  1214. }
  1215. // Otherwise insert the headers for content retrieval
  1216. inserts := d.queue.Schedule(chunk, origin)
  1217. if len(inserts) != len(chunk) {
  1218. glog.V(logger.Debug).Infof("stale headers")
  1219. return errBadPeer
  1220. }
  1221. }
  1222. headers = headers[limit:]
  1223. origin += uint64(limit)
  1224. }
  1225. // Signal the content downloaders of the availablility of new tasks
  1226. for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
  1227. select {
  1228. case ch <- true:
  1229. default:
  1230. }
  1231. }
  1232. }
  1233. }
  1234. }
  1235. // processContent takes fetch results from the queue and tries to import them
  1236. // into the chain. The type of import operation will depend on the result contents.
  1237. func (d *Downloader) processContent() error {
  1238. pivot := d.queue.FastSyncPivot()
  1239. for {
  1240. results := d.queue.WaitResults()
  1241. if len(results) == 0 {
  1242. return nil // queue empty
  1243. }
  1244. if d.chainInsertHook != nil {
  1245. d.chainInsertHook(results)
  1246. }
  1247. // Actually import the blocks
  1248. if glog.V(logger.Debug) {
  1249. first, last := results[0].Header, results[len(results)-1].Header
  1250. glog.Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4])
  1251. }
  1252. for len(results) != 0 {
  1253. // Check for any termination requests
  1254. select {
  1255. case <-d.quitCh:
  1256. return errCancelContentProcessing
  1257. default:
  1258. }
  1259. // Retrieve the a batch of results to import
  1260. var (
  1261. blocks = make([]*types.Block, 0, maxResultsProcess)
  1262. receipts = make([]types.Receipts, 0, maxResultsProcess)
  1263. )
  1264. items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
  1265. for _, result := range results[:items] {
  1266. switch {
  1267. case d.mode == FullSync:
  1268. blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
  1269. case d.mode == FastSync:
  1270. blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
  1271. if result.Header.Number.Uint64() <= pivot {
  1272. receipts = append(receipts, result.Receipts)
  1273. }
  1274. }
  1275. }
  1276. // Try to process the results, aborting if there's an error
  1277. var (
  1278. err error
  1279. index int
  1280. )
  1281. switch {
  1282. case len(receipts) > 0:
  1283. index, err = d.insertReceipts(blocks, receipts)
  1284. if err == nil && blocks[len(blocks)-1].NumberU64() == pivot {
  1285. glog.V(logger.Debug).Infof("Committing block #%d [%x…] as the new head", blocks[len(blocks)-1].Number(), blocks[len(blocks)-1].Hash().Bytes()[:4])
  1286. index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
  1287. }
  1288. default:
  1289. index, err = d.insertBlocks(blocks)
  1290. }
  1291. if err != nil {
  1292. glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
  1293. return errInvalidChain
  1294. }
  1295. // Shift the results to the next batch
  1296. results = results[items:]
  1297. }
  1298. }
  1299. }
  1300. // DeliverHeaders injects a new batch of block headers received from a remote
  1301. // node into the download schedule.
  1302. func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
  1303. return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
  1304. }
  1305. // DeliverBodies injects a new batch of block bodies received from a remote node.
  1306. func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
  1307. return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
  1308. }
  1309. // DeliverReceipts injects a new batch of receipts received from a remote node.
  1310. func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
  1311. return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
  1312. }
  1313. // DeliverNodeData injects a new batch of node state data received from a remote node.
  1314. func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
  1315. return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
  1316. }
  1317. // deliver injects a new batch of data received from a remote node.
  1318. func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
  1319. // Update the delivery metrics for both good and failed deliveries
  1320. inMeter.Mark(int64(packet.Items()))
  1321. defer func() {
  1322. if err != nil {
  1323. dropMeter.Mark(int64(packet.Items()))
  1324. }
  1325. }()
  1326. // Deliver or abort if the sync is canceled while queuing
  1327. d.cancelLock.RLock()
  1328. cancel := d.cancelCh
  1329. d.cancelLock.RUnlock()
  1330. if cancel == nil {
  1331. return errNoSyncActive
  1332. }
  1333. select {
  1334. case destCh <- packet:
  1335. return nil
  1336. case <-cancel:
  1337. return errNoSyncActive
  1338. }
  1339. }
  1340. // qosTuner is the quality of service tuning loop that occasionally gathers the
  1341. // peer latency statistics and updates the estimated request round trip time.
  1342. func (d *Downloader) qosTuner() {
  1343. for {
  1344. // Retrieve the current median RTT and integrate into the previoust target RTT
  1345. rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
  1346. atomic.StoreUint64(&d.rttEstimate, uint64(rtt))
  1347. // A new RTT cycle passed, increase our confidence in the estimated RTT
  1348. conf := atomic.LoadUint64(&d.rttConfidence)
  1349. conf = conf + (1000000-conf)/2
  1350. atomic.StoreUint64(&d.rttConfidence, conf)
  1351. // Log the new QoS values and sleep until the next RTT
  1352. glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
  1353. select {
  1354. case <-d.quitCh:
  1355. return
  1356. case <-time.After(rtt):
  1357. }
  1358. }
  1359. }
  1360. // qosReduceConfidence is meant to be called when a new peer joins the downloader's
  1361. // peer set, needing to reduce the confidence we have in out QoS estimates.
  1362. func (d *Downloader) qosReduceConfidence() {
  1363. // If we have a single peer, confidence is always 1
  1364. peers := uint64(d.peers.Len())
  1365. if peers == 1 {
  1366. atomic.StoreUint64(&d.rttConfidence, 1000000)
  1367. return
  1368. }
  1369. // If we have a ton of peers, don't drop confidence)
  1370. if peers >= uint64(qosConfidenceCap) {
  1371. return
  1372. }
  1373. // Otherwise drop the confidence factor
  1374. conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
  1375. if float64(conf)/1000000 < rttMinConfidence {
  1376. conf = uint64(rttMinConfidence * 1000000)
  1377. }
  1378. atomic.StoreUint64(&d.rttConfidence, conf)
  1379. rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
  1380. glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
  1381. }
  1382. // requestRTT returns the current target round trip time for a download request
  1383. // to complete in.
  1384. //
  1385. // Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
  1386. // the downloader tries to adapt queries to the RTT, so multiple RTT values can
  1387. // be adapted to, but smaller ones are preffered (stabler download stream).
  1388. func (d *Downloader) requestRTT() time.Duration {
  1389. return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
  1390. }
  1391. // requestTTL returns the current timeout allowance for a single download request
  1392. // to finish under.
  1393. func (d *Downloader) requestTTL() time.Duration {
  1394. var (
  1395. rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate))
  1396. conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
  1397. )
  1398. ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
  1399. if ttl > ttlLimit {
  1400. ttl = ttlLimit
  1401. }
  1402. return ttl
  1403. }