// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package downloader contains the manual full chain synchronisation.
package downloader

import (
	"bytes"
	"errors"
	"math"
	"math/big"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/logger"
	"github.com/ethereum/go-ethereum/logger/glog"
	"gopkg.in/fatih/set.v0"
)

const (
	eth60 = 60 // Constant to check for old protocol support
	eth61 = 61 // Constant to check for new protocol support
	eth62 = 62 // Constant to check for experimental protocol support
)

var (
	MinHashFetch  = 512 // Minimum amount of hashes to not consider a peer stalling
	MaxHashFetch  = 512 // Amount of hashes to be fetched per retrieval request
	MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request

	hashTTL         = 5 * time.Second  // Time it takes for a hash request to time out
	blockSoftTTL    = 3 * time.Second  // Request completion threshold for increasing or decreasing a peer's bandwidth
	blockHardTTL    = 3 * blockSoftTTL // Maximum time allowance before a block request is considered expired
	crossCheckCycle = time.Second      // Period after which to check for expired cross checks

	maxQueuedHashes = 256 * 1024 // Maximum number of hashes to queue for import (DOS protection)
	maxBannedHashes = 4096       // Number of bannable hashes before phasing old ones out
	maxBlockProcess = 256        // Number of blocks to import at once into the chain
)

var (
	errBusy             = errors.New("busy")
	errUnknownPeer      = errors.New("peer is unknown or unhealthy")
	errBadPeer          = errors.New("action from bad peer ignored")
	errStallingPeer     = errors.New("peer is stalling")
	errBannedHead       = errors.New("peer head hash already banned")
	errNoPeers          = errors.New("no peers to keep download active")
	errPendingQueue     = errors.New("pending items in queue")
	errTimeout          = errors.New("timeout")
	errEmptyHashSet     = errors.New("empty hash set by peer")
	errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
	errAlreadyInPool    = errors.New("hash already in pool")
	errInvalidChain     = errors.New("retrieved hash chain is invalid")
	errCrossCheckFailed = errors.New("block cross-check failed")
	errCancelHashFetch  = errors.New("hash fetching canceled (requested)")
	errCancelBlockFetch = errors.New("block downloading canceled (requested)")
	errNoSyncActive     = errors.New("no sync active")
)

// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
type hashCheckFn func(common.Hash) bool

// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block

// headRetrievalFn is a callback type for retrieving the head block from the local chain.
type headRetrievalFn func() *types.Block

// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
type chainInsertFn func(types.Blocks) (int, error)

// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)

type blockPack struct {
	peerId string
	blocks []*types.Block
}

type hashPack struct {
	peerId string
	hashes []common.Hash
}

type crossCheck struct {
	expire time.Time
	parent common.Hash
}

type Downloader struct {
	mux *event.TypeMux

	queue  *queue                      // Scheduler for selecting the hashes to download
	peers  *peerSet                    // Set of active peers from which download can proceed
	checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain
	banned *set.Set                    // Set of hashes we've received and banned

	interrupt int32 // Atomic boolean to signal termination

	// Statistics
	importStart time.Time // Instance when the last blocks were taken from the cache
	importQueue []*Block  // Previously taken blocks to check import progress
	importDone  int       // Number of taken blocks already imported from the last batch
	importLock  sync.Mutex

	// Callbacks
	hasBlock    hashCheckFn      // Checks if a block is present in the chain
	getBlock    blockRetrievalFn // Retrieves a block from the chain
	headBlock   headRetrievalFn  // Retrieves the head block from the chain
	insertChain chainInsertFn    // Injects a batch of blocks into the chain
	dropPeer    peerDropFn       // Drops a peer for misbehaving

	// Status
	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
	synchronising   int32
	processing      int32
	notified        int32

	// Channels
	newPeerCh chan *peer
	hashCh    chan hashPack  // Channel receiving inbound hashes
	blockCh   chan blockPack // Channel receiving inbound blocks
	processCh chan bool      // Channel to signal the block fetcher of new or finished work

	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
	cancelLock sync.RWMutex  // Lock to protect the cancel channel in delivers
}

// Block is an origin-tagged blockchain block.
type Block struct {
	RawBlock   *types.Block
	OriginPeer string
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, headBlock headRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader {
	// Create the base downloader
	downloader := &Downloader{
		mux:         mux,
		queue:       newQueue(),
		peers:       newPeerSet(),
		hasBlock:    hasBlock,
		getBlock:    getBlock,
		headBlock:   headBlock,
		insertChain: insertChain,
		dropPeer:    dropPeer,
		newPeerCh:   make(chan *peer, 1),
		hashCh:      make(chan hashPack, 1),
		blockCh:     make(chan blockPack, 1),
		processCh:   make(chan bool, 1),
	}
	// Inject all the known bad hashes
	downloader.banned = set.New()
	for hash := range core.BadHashes {
		downloader.banned.Add(hash)
	}
	return downloader
}
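
// Illustrative sketch only (not part of the original file): the constructor
// takes the chain accessors as plain callbacks, so a caller can wire up a
// downloader with stand-in hooks. The stub bodies below are placeholders for
// whatever chain manager the embedding protocol handler actually uses.
func exampleNewDownloader(mux *event.TypeMux) *Downloader {
	hasBlock := func(common.Hash) bool { return false }              // stub: nothing is known locally
	getBlock := func(common.Hash) *types.Block { return nil }        // stub: no local blocks
	headBlock := func() *types.Block { return nil }                  // stub: head lookup placeholder
	insertChain := func(types.Blocks) (int, error) { return 0, nil } // stub: accept every batch
	dropPeer := func(id string) {}                                   // stub: ignore misbehaviour
	return New(mux, hasBlock, getBlock, headBlock, insertChain, dropPeer)
}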

// Stats retrieves the current status of the downloader.
func (d *Downloader) Stats() (pending int, cached int, importing int, estimate time.Duration) {
	// Fetch the download status
	pending, cached = d.queue.Size()

	// Figure out the import progress
	d.importLock.Lock()
	defer d.importLock.Unlock()

	for len(d.importQueue) > 0 && d.hasBlock(d.importQueue[0].RawBlock.Hash()) {
		d.importQueue = d.importQueue[1:]
		d.importDone++
	}
	importing = len(d.importQueue)

	// Make an estimate on the total sync
	estimate = 0
	if d.importDone > 0 {
		estimate = time.Since(d.importStart) / time.Duration(d.importDone) * time.Duration(pending+cached+importing)
	}
	return
}
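
// The estimate returned by Stats is simply the average import time per block so
// far, multiplied by the number of blocks still pending, cached or importing. A
// minimal polling sketch (illustrative only; this helper is not part of the
// original file):
func logSyncProgress(d *Downloader) {
	pending, cached, importing, eta := d.Stats()
	glog.V(logger.Info).Infof("sync progress: %d pending, %d cached, %d importing, ~%v left", pending, cached, importing, eta)
}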

// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
	return atomic.LoadInt32(&d.synchronising) > 0
}

// RegisterPeer injects a new download peer into the set of block sources to be
// used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn) error {
	// If the peer wants to send a banned hash, reject
	if d.banned.Has(head) {
		glog.V(logger.Debug).Infoln("Register rejected, head hash banned:", id)
		return errBannedHead
	}
	// Otherwise try to construct and register the peer
	glog.V(logger.Detail).Infoln("Registering peer", id)
	if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks)); err != nil {
		glog.V(logger.Error).Infoln("Register failed:", err)
		return err
	}
	return nil
}

// UnregisterPeer removes a peer from the known list, preventing any action from
// the specified peer.
func (d *Downloader) UnregisterPeer(id string) error {
	glog.V(logger.Detail).Infoln("Unregistering peer", id)
	if err := d.peers.Unregister(id); err != nil {
		glog.V(logger.Error).Infoln("Unregister failed:", err)
		return err
	}
	return nil
}

// Synchronise tries to sync up our local block chain with a remote peer, adding
// various sanity checks and wrapping the attempt with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int) {
	glog.V(logger.Detail).Infof("Attempting synchronisation: %v, head 0x%x, TD %v", id, head[:4], td)

	switch err := d.synchronise(id, head, td); err {
	case nil:
		glog.V(logger.Detail).Infof("Synchronisation completed")

	case errBusy:
		glog.V(logger.Detail).Infof("Synchronisation already in progress")

	case errTimeout, errBadPeer, errStallingPeer, errBannedHead, errEmptyHashSet, errPeersUnavailable, errInvalidChain, errCrossCheckFailed:
		glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
		d.dropPeer(id)

	case errPendingQueue:
		glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)

	default:
		glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
	}
}

// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous.
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error {
	// Mock out the synchronisation if testing
	if d.synchroniseMock != nil {
		return d.synchroniseMock(id, hash)
	}
	// Make sure only one goroutine is ever allowed past this point at once
	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
		return errBusy
	}
	defer atomic.StoreInt32(&d.synchronising, 0)

	// If the head hash is banned, terminate immediately
	if d.banned.Has(hash) {
		return errBannedHead
	}
	// Post a user notification of the sync (only once per session)
	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
		glog.V(logger.Info).Infoln("Block synchronisation started")
	}
	// Abort if the queue still contains some leftover data
	if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
		return errPendingQueue
	}
	// Reset the queue and peer set to clean any internal leftover state
	d.queue.Reset()
	d.peers.Reset()
	d.checks = make(map[common.Hash]*crossCheck)

	// Create cancel channel for aborting mid-flight
	d.cancelLock.Lock()
	d.cancelCh = make(chan struct{})
	d.cancelLock.Unlock()

	// Retrieve the origin peer and initiate the downloading process
	p := d.peers.Peer(id)
	if p == nil {
		return errUnknownPeer
	}
	return d.syncWithPeer(p, hash, td)
}

// Has checks if the downloader knows about a particular hash, meaning that it's
// either already downloaded or pending retrieval.
func (d *Downloader) Has(hash common.Hash) bool {
	return d.queue.Has(hash)
}

// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
	d.mux.Post(StartEvent{})
	defer func() {
		// reset on error
		if err != nil {
			d.cancel()
			d.mux.Post(FailedEvent{err})
		} else {
			d.mux.Post(DoneEvent{})
		}
	}()

	glog.V(logger.Debug).Infof("Synchronizing with the network using: %s, eth/%d", p.id, p.version)
	switch p.version {
	case eth60:
		// Old eth/60 version, use reverse hash retrieval algorithm
		if err = d.fetchHashes60(p, hash); err != nil {
			return err
		}
		if err = d.fetchBlocks60(); err != nil {
			return err
		}
	case eth61, eth62:
		// New eth/61+, use forward, concurrent hash and block retrieval algorithm
		number, err := d.findAncestor(p)
		if err != nil {
			return err
		}
		errc := make(chan error, 2)
		go func() { errc <- d.fetchHashes(p, td, number+1) }()
		go func() { errc <- d.fetchBlocks(number + 1) }()

		// If any fetcher fails, cancel the other
		if err := <-errc; err != nil {
			d.cancel()
			<-errc
			return err
		}
		return <-errc
	default:
		// Something very wrong, stop right here
		glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
		return errBadPeer
	}
	glog.V(logger.Debug).Infoln("Synchronization completed")
	return nil
}

// cancel cancels all of the operations and resets the queue.
func (d *Downloader) cancel() {
	// Close the current cancel channel
	d.cancelLock.Lock()
	if d.cancelCh != nil {
		select {
		case <-d.cancelCh:
			// Channel was already closed
		default:
			close(d.cancelCh)
		}
	}
	d.cancelLock.Unlock()

	// Reset the queue
	d.queue.Reset()
}

// Terminate interrupts the downloader, canceling all pending operations.
func (d *Downloader) Terminate() {
	atomic.StoreInt32(&d.interrupt, 1)
	d.cancel()
}

// fetchHashes60 starts retrieving hashes backwards from a specific peer and hash,
// up until it finds a common ancestor. If the source peer times out, alternative
// ones are tried for continuation.
func (d *Downloader) fetchHashes60(p *peer, h common.Hash) error {
	var (
		start  = time.Now()
		active = p             // active peer will help determine the current active peer
		head   = common.Hash{} // common and last hash

		timeout     = time.NewTimer(0)                // timer to dump a non-responsive active peer
		attempted   = make(map[string]bool)           // attempted peers will help with retries
		crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks
	)
	defer crossTicker.Stop()
	defer timeout.Stop()

	glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
	<-timeout.C // timeout channel should be initially empty.

	getHashes := func(from common.Hash) {
		go active.getRelHashes(from)
		timeout.Reset(hashTTL)
	}
	// Add the hash to the queue, and start hash retrieval.
	d.queue.Insert([]common.Hash{h}, false)
	getHashes(h)

	attempted[p.id] = true
	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
			return errCancelHashFetch

		case hashPack := <-d.hashCh:
			// Make sure the active peer is giving us the hashes
			if hashPack.peerId != active.id {
				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
				break
			}
			timeout.Stop()

			// Make sure the peer actually gave something valid
			if len(hashPack.hashes) == 0 {
				glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id)
				return errEmptyHashSet
			}
			for index, hash := range hashPack.hashes {
				if d.banned.Has(hash) {
					glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain", active.id)

					d.queue.Insert(hashPack.hashes[:index+1], false)
					if err := d.banBlocks(active.id, hash); err != nil {
						glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err)
					}
					return errInvalidChain
				}
			}
			// Determine if we're done fetching hashes (queue up all pending), and continue if not done
			done, index := false, 0
			for index, head = range hashPack.hashes {
				if d.hasBlock(head) || d.queue.GetBlock(head) != nil {
					glog.V(logger.Debug).Infof("Found common hash %x", head[:4])
					hashPack.hashes = hashPack.hashes[:index]
					done = true
					break
				}
			}
			// Insert all the new hashes, but only continue if we got something useful
			inserts := d.queue.Insert(hashPack.hashes, false)
			if len(inserts) == 0 && !done {
				glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id)
				return errBadPeer
			}
			if !done {
				// Check that the peer is not stalling the sync
				if len(inserts) < MinHashFetch {
					return errStallingPeer
				}
				// Try and fetch a random block to verify the hash batch
				// Skip the last hash as the cross check races with the next hash fetch
				cross := rand.Intn(len(inserts) - 1)
				origin, parent := inserts[cross], inserts[cross+1]
				glog.V(logger.Detail).Infof("Cross checking (%s) with %x/%x", active.id, origin, parent)

				d.checks[origin] = &crossCheck{
					expire: time.Now().Add(blockSoftTTL),
					parent: parent,
				}
				go active.getBlocks([]common.Hash{origin})

				// Also fetch a fresh batch of hashes
				getHashes(head)
				continue
			}
			// We're done, prepare the download cache and proceed pulling the blocks
			offset := uint64(0)
			if block := d.getBlock(head); block != nil {
				offset = block.NumberU64() + 1
			}
			d.queue.Prepare(offset)
			finished = true

		case blockPack := <-d.blockCh:
			// Cross check the block with the random verifications
			if blockPack.peerId != active.id || len(blockPack.blocks) != 1 {
				continue
			}
			block := blockPack.blocks[0]
			if check, ok := d.checks[block.Hash()]; ok {
				if block.ParentHash() != check.parent {
					return errCrossCheckFailed
				}
				delete(d.checks, block.Hash())
			}

		case <-crossTicker.C:
			// Iterate over all the cross checks and fail the hash chain if they're not verified
			for hash, check := range d.checks {
				if time.Now().After(check.expire) {
					glog.V(logger.Debug).Infof("Cross check timeout for %x", hash)
					return errCrossCheckFailed
				}
			}

		case <-timeout.C:
			glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request", p.id)

			var p *peer // p will be set if a peer can be found
			// Attempt to find a new peer by checking inclusion of the peers' best hash in our
			// already fetched hash list. This can't guarantee 100% correctness, but does a
			// fair job: the result is either correct or a false negative.
			for _, peer := range d.peers.AllPeers() {
				if d.queue.Has(peer.head) && !attempted[peer.id] {
					p = peer
					break
				}
			}
			// If all peers have been tried, or the common head hash is the zero hash,
			// abort the process entirely.
			if p == nil || (head == common.Hash{}) {
				return errTimeout
			}
			// Set p to the active peer. This will invalidate any hashes that may be returned
			// by our previous (delayed) peer.
			active = p
			getHashes(head)
			glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)", p.id)
		}
	}
	glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v", d.queue.Pending(), time.Since(start))
	return nil
}

// fetchBlocks60 iteratively downloads the entire scheduled block chain, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery and
// periodically checking for timeouts.
func (d *Downloader) fetchBlocks60() error {
	glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)")
	start := time.Now()

	// Start a ticker to continue throttled downloads and check for bad peers
	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()

out:
	for {
		select {
		case <-d.cancelCh:
			return errCancelBlockFetch

		case <-d.hashCh:
			// Out of bounds hashes received, ignore them

		case blockPack := <-d.blockCh:
			// Short circuit if it's a stale cross check
			if len(blockPack.blocks) == 1 {
				block := blockPack.blocks[0]
				if _, ok := d.checks[block.Hash()]; ok {
					delete(d.checks, block.Hash())
					break
				}
			}
			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
			if peer := d.peers.Peer(blockPack.peerId); peer != nil {
				// Deliver the received chunk of blocks, and demote in case of errors
				err := d.queue.Deliver(blockPack.peerId, blockPack.blocks)
				switch err {
				case nil:
					// If no blocks were delivered, demote the peer (need the delivery above)
					if len(blockPack.blocks) == 0 {
						peer.Demote()
						peer.SetIdle()
						glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
						break
					}
					// All was successful, promote the peer and potentially start processing
					peer.Promote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
					go d.process()

				case errInvalidChain:
					// The hash chain is invalid (blocks are not ordered properly), abort
					return err

				case errNoFetchesPending:
					// Peer probably timed out with its delivery but came through
					// in the end; demote, but allow it to be used for further pulls.
					peer.Demote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)

				case errStaleDelivery:
					// Delivered something completely different from what was requested,
					// usually caused by a timeout and delivery during a new sync cycle.
					// Don't set it to idle as the original request should still be
					// in flight.
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: stale delivery", peer)

				default:
					// Peer did something semi-useful, demote but keep it around
					peer.Demote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
					go d.process()
				}
			}

		case <-ticker.C:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
			// Check for block request timeouts and demote the responsible peers
			badPeers := d.queue.Expire(blockHardTTL)
			for _, pid := range badPeers {
				if peer := d.peers.Peer(pid); peer != nil {
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
				}
			}
			// If there are unrequested hashes left, start fetching from the available peers
			if d.queue.Pending() > 0 {
				// Throttle the download if the block cache is full and waiting for processing
				if d.queue.Throttle() {
					break
				}
				// Send a download request to all idle peers, until throttled
				idlePeers := d.peers.IdlePeers()
				for _, peer := range idlePeers {
					// Short circuit if throttling activated since above
					if d.queue.Throttle() {
						break
					}
					// Get a possible chunk. If nil is returned no chunk
					// could be returned due to no hashes available.
					request := d.queue.Reserve(peer, peer.Capacity())
					if request == nil {
						continue
					}
					if glog.V(logger.Detail) {
						glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
					}
					// Fetch the chunk and check for error. If the peer was somehow
					// already fetching a chunk due to a bug, it will be returned to
					// the queue
					if err := peer.Fetch(request); err != nil {
						glog.V(logger.Error).Infof("Peer %s received double work", peer.id)
						d.queue.Cancel(request)
					}
				}
				// Make sure that we have peers available for fetching. If all peers have been tried
				// and all failed, throw an error
				if d.queue.InFlight() == 0 {
					return errPeersUnavailable
				}
			} else if d.queue.InFlight() == 0 {
				// When the queue is empty and nothing is in flight, we can
				// safely assume we're done. Another part of the process will check
				// for parent errors and will re-request anything that's missing
				break out
			}
		}
	}
	glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start))
	return nil
}

// findAncestor tries to locate the common ancestor block of the local chain and
// a remote peer's blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N blocks should already get us a match.
// In the rare scenario when we ended up on a long soft fork (i.e. none of the
// head blocks match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor(p *peer) (uint64, error) {
	glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)

	// Request our head blocks to short circuit ancestor location
	head := d.headBlock().NumberU64()
	from := int64(head) - int64(MaxHashFetch)
	if from < 0 {
		from = 0
	}
	go p.getAbsHashes(uint64(from), MaxHashFetch)

	// Wait for the remote response to the head fetch
	number, hash := uint64(0), common.Hash{}
	timeout := time.After(hashTTL)

	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
			return 0, errCancelHashFetch

		case hashPack := <-d.hashCh:
			// Discard anything not from the origin peer
			if hashPack.peerId != p.id {
				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
				break
			}
			// Make sure the peer actually gave something valid
			hashes := hashPack.hashes
			if len(hashes) == 0 {
				glog.V(logger.Debug).Infof("%v: empty head hash set", p)
				return 0, errEmptyHashSet
			}
			// Check if a common ancestor was found
			finished = true
			for i := len(hashes) - 1; i >= 0; i-- {
				if d.hasBlock(hashes[i]) {
					number, hash = uint64(from)+uint64(i), hashes[i]
					break
				}
			}

		case <-d.blockCh:
			// Out of bounds blocks received, ignore them

		case <-timeout:
			glog.V(logger.Debug).Infof("%v: head hash timeout", p)
			return 0, errTimeout
		}
	}
	// If the head fetch already found an ancestor, return
	if !common.EmptyHash(hash) {
		glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x]", p, number, hash[:4])
		return number, nil
	}
	// Ancestor not found, we need to binary search over our chain
	start, end := uint64(0), head
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2
		timeout := time.After(hashTTL)
		go p.getAbsHashes(uint64(check), 1)

		// Wait until a reply arrives to this request
		for arrived := false; !arrived; {
			select {
			case <-d.cancelCh:
				return 0, errCancelHashFetch

			case hashPack := <-d.hashCh:
				// Discard anything not from the origin peer
				if hashPack.peerId != p.id {
					glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
					break
				}
				// Make sure the peer actually gave something valid
				hashes := hashPack.hashes
				if len(hashes) != 1 {
					glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes))
					return 0, errBadPeer
				}
				arrived = true

				// Modify the search interval based on the response
				block := d.getBlock(hashes[0])
				if block == nil {
					end = check
					break
				}
				if block.NumberU64() != check {
					glog.V(logger.Debug).Infof("%v: non requested hash #%d [%x], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check)
					return 0, errBadPeer
				}
				start = check

			case <-d.blockCh:
				// Out of bounds blocks received, ignore them

			case <-timeout:
				glog.V(logger.Debug).Infof("%v: search hash timeout", p)
				return 0, errTimeout
			}
		}
	}
	return start, nil
}
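
// findAncestor first checks whether any of the most recent MaxHashFetch blocks
// is already known locally and, failing that, falls back to the binary search
// above. The search invariant, reduced to a pure helper (illustrative sketch,
// not part of the original file): `known` reports whether the block at a given
// height is part of the local chain, and the result is the highest height that
// is still known, assuming knowledge is contiguous below the fork point.
func binarySearchAncestorSketch(head uint64, known func(uint64) bool) uint64 {
	start, end := uint64(0), head
	for start+1 < end {
		check := (start + end) / 2
		if known(check) {
			start = check // the ancestor lies at or above this height
		} else {
			end = check // the ancestor lies strictly below this height
		}
	}
	return start
}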

// fetchHashes keeps retrieving hashes from the requested number, until no more
// are returned, potentially throttling on the way.
func (d *Downloader) fetchHashes(p *peer, td *big.Int, from uint64) error {
	glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from)

	// Create a timeout timer, and the associated hash fetcher
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

	getHashes := func(from uint64) {
		glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from)
		go p.getAbsHashes(from, MaxHashFetch)
		timeout.Reset(hashTTL)
	}
	// Start pulling hashes, until all are exhausted
	getHashes(from)
	gotHashes := false

	for {
		select {
		case <-d.cancelCh:
			return errCancelHashFetch

		case hashPack := <-d.hashCh:
			// Make sure the active peer is giving us the hashes
			if hashPack.peerId != p.id {
				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
				break
			}
			timeout.Stop()

			// If no more hashes are inbound, notify the block fetcher and return
			if len(hashPack.hashes) == 0 {
				glog.V(logger.Debug).Infof("%v: no available hashes", p)

				select {
				case d.processCh <- false:
				case <-d.cancelCh:
				}
				// If no hashes were retrieved at all, the peer violated its TD promise that it had a
				// better chain compared to ours. The only exception is if its promised blocks were
				// already imported by other means (e.g. fetcher):
				//
				// R <remote peer>, L <local node>: Both at block 10
				// R: Mine block 11, and propagate it to L
				// L: Queue block 11 for import
				// L: Notice that R's head and TD increased compared to ours, start sync
				// L: Import of block 11 finishes
				// L: Sync begins, and finds common ancestor at 11
				// L: Request new hashes up from 11 (R's TD was higher, it must have something)
				// R: Nothing to give
				if !gotHashes && td.Cmp(d.headBlock().Td) > 0 {
					return errStallingPeer
				}
				return nil
			}
			gotHashes = true

			// Otherwise insert all the new hashes, aborting in case of junk
			glog.V(logger.Detail).Infof("%v: inserting %d hashes from #%d", p, len(hashPack.hashes), from)
			inserts := d.queue.Insert(hashPack.hashes, true)
			if len(inserts) != len(hashPack.hashes) {
				glog.V(logger.Debug).Infof("%v: stale hashes", p)
				return errBadPeer
			}
			// Notify the block fetcher of new hashes, but stop if queue is full
			cont := d.queue.Pending() < maxQueuedHashes
			select {
			case d.processCh <- cont:
			default:
			}
			if !cont {
				return nil
			}
			// Queue not yet full, fetch the next batch
			from += uint64(len(hashPack.hashes))
			getHashes(from)

		case <-timeout.C:
			glog.V(logger.Debug).Infof("%v: hash request timed out", p)
			return errTimeout
		}
	}
}

// fetchBlocks iteratively downloads the scheduled hashes, taking any available
// peers, reserving a chunk of blocks for each, waiting for delivery and also
// periodically checking for timeouts.
func (d *Downloader) fetchBlocks(from uint64) error {
	glog.V(logger.Debug).Infof("Downloading blocks from #%d", from)
	defer glog.V(logger.Debug).Infof("Block download terminated")

	// Create a timeout timer for scheduling expiration tasks
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

	// Prepare the queue and fetch blocks until the hash fetcher's done
	d.queue.Prepare(from)
	finished := false

	for {
		select {
		case <-d.cancelCh:
			return errCancelBlockFetch

		case blockPack := <-d.blockCh:
			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
			if peer := d.peers.Peer(blockPack.peerId); peer != nil {
				// Deliver the received chunk of blocks, and demote in case of errors
				err := d.queue.Deliver(blockPack.peerId, blockPack.blocks)
				switch err {
				case nil:
					// If no blocks were delivered, demote the peer (need the delivery above)
					if len(blockPack.blocks) == 0 {
						peer.Demote()
						peer.SetIdle()
						glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
						break
					}
					// All was successful, promote the peer and potentially start processing
					peer.Promote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
					go d.process()

				case errInvalidChain:
					// The hash chain is invalid (blocks are not ordered properly), abort
					return err

				case errNoFetchesPending:
					// Peer probably timed out with its delivery but came through
					// in the end; demote, but allow it to be used for further pulls.
					peer.Demote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)

				case errStaleDelivery:
					// Delivered something completely different from what was requested,
					// usually caused by a timeout and delivery during a new sync cycle.
					// Don't set it to idle as the original request should still be
					// in flight.
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: stale delivery", peer)

				default:
					// Peer did something semi-useful, demote but keep it around
					peer.Demote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
					go d.process()
				}
			}
			// Blocks arrived, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case cont := <-d.processCh:
			// The hash fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Hashes arrived, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
			// Check for block request timeouts and demote the responsible peers
			for _, pid := range d.queue.Expire(blockHardTTL) {
				if peer := d.peers.Peer(pid); peer != nil {
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
				}
			}
			// If there's nothing more to fetch, wait or terminate
			if d.queue.Pending() == 0 {
				if d.queue.InFlight() == 0 && finished {
					glog.V(logger.Debug).Infof("Block fetching completed")
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
			for _, peer := range d.peers.IdlePeers() {
				// Short circuit if throttling activated
				if d.queue.Throttle() {
					break
				}
				// Reserve a chunk of hashes for a peer. A nil can mean either that
				// no more hashes are available, or that the peer is known not to
				// have them.
				request := d.queue.Reserve(peer, peer.Capacity())
				if request == nil {
					continue
				}
				if glog.V(logger.Detail) {
					glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
				}
				// Fetch the chunk and make sure any errors return the hashes to the queue
				if err := peer.Fetch(request); err != nil {
					glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer)
					d.queue.Cancel(request)
				}
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed, throw an error
			if !d.queue.Throttle() && d.queue.InFlight() == 0 {
				return errPeersUnavailable
			}
		}
	}
}
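
// fetchBlocks coalesces its wake-up signals through the 1-buffered `update`
// channel: any number of triggers while one is already pending collapse into a
// single pass over the queue. The idiom in isolation (illustrative sketch, not
// part of the original file):
func notifyNonBlocking(update chan struct{}) {
	select {
	case update <- struct{}{}: // queue a wake-up if none is pending yet
	default: // a wake-up is already pending; the next pass will see the new state
	}
}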

// banBlocks retrieves a batch of blocks from a peer feeding us invalid hashes,
// and bans the head of the retrieved batch.
//
// This method only fetches one single batch as the goal is not to ban an entire
// (potentially long) invalid chain, wasting a lot of time in the meanwhile, but
// rather to gradually build up a blacklist if the peer keeps reconnecting.
func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
	glog.V(logger.Debug).Infof("Banning a batch out of %d blocks from %s", d.queue.Pending(), peerId)

	// Ask the peer being banned for a batch of blocks from the banning point
	peer := d.peers.Peer(peerId)
	if peer == nil {
		return nil
	}
	request := d.queue.Reserve(peer, MaxBlockFetch)
	if request == nil {
		return nil
	}
	if err := peer.Fetch(request); err != nil {
		return err
	}
	// Wait a bit for the reply to arrive, and ban the head of the batch if it does
	timeout := time.After(blockHardTTL)
	for {
		select {
		case <-d.cancelCh:
			return errCancelBlockFetch

		case <-timeout:
			return errTimeout

		case <-d.hashCh:
			// Out of bounds hashes received, ignore them

		case blockPack := <-d.blockCh:
			blocks := blockPack.blocks

			// Short circuit if it's a stale cross check
			if len(blocks) == 1 {
				block := blocks[0]
				if _, ok := d.checks[block.Hash()]; ok {
					delete(d.checks, block.Hash())
					break
				}
			}
			// Short circuit if it's not from the peer being banned
			if blockPack.peerId != peerId {
				break
			}
			// Short circuit if no blocks were returned
			if len(blocks) == 0 {
				return errors.New("no blocks returned to ban")
			}
			// Reconstruct the original chain order and ensure we're banning the correct blocks
			types.BlockBy(types.Number).Sort(blocks)
			if bytes.Compare(blocks[0].Hash().Bytes(), head.Bytes()) != 0 {
				return errors.New("head block not the banned one")
			}
			index := 0
			for _, block := range blocks[1:] {
				if bytes.Compare(block.ParentHash().Bytes(), blocks[index].Hash().Bytes()) != 0 {
					break
				}
				index++
			}
			// Ban the head hash and phase out any excess
			d.banned.Add(blocks[index].Hash())
			for d.banned.Size() > maxBannedHashes {
				var evacuate common.Hash

				d.banned.Each(func(item interface{}) bool {
					// Skip any hard coded bans
					if core.BadHashes[item.(common.Hash)] {
						return true
					}
					evacuate = item.(common.Hash)
					return false
				})
				d.banned.Remove(evacuate)
			}
			glog.V(logger.Debug).Infof("Banned %d blocks from: %s", index+1, peerId)
			return nil
		}
	}
}

// process takes blocks from the queue and tries to import them into the chain.
//
// The algorithmic flow is as follows:
//  - The `processing` flag is swapped to 1 to ensure singleton access
//  - The current `cancel` channel is retrieved to detect sync abortions
//  - Blocks are iteratively taken from the cache and inserted into the chain
//  - When the cache becomes empty, insertion stops
//  - The `processing` flag is swapped back to 0
//  - A post-exit check is made whether new blocks became available
//  - This step is important: it handles a potential race condition between
//    checking for no more work, and releasing the processing "mutex". In
//    between these state changes, a block may have arrived, but a processing
//    attempt denied, so we need to re-enter to ensure the block isn't left
//    to idle in the cache.
func (d *Downloader) process() {
	// Make sure only one goroutine is ever allowed to process blocks at once
	if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
		return
	}
	// If the processor just exited, but there are freshly pending items, try to
	// reenter. This is needed because the goroutine spun up for processing
	// the fresh blocks might have been denied entry due to this present thread
	// not yet releasing the `processing` state.
	defer func() {
		if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadBlock() != nil {
			d.process()
		}
	}()
	// Release the lock upon exit (note, before checking for reentry!), and set
	// the import statistics to zero.
	defer func() {
		d.importLock.Lock()
		d.importQueue = nil
		d.importDone = 0
		d.importLock.Unlock()

		atomic.StoreInt32(&d.processing, 0)
	}()
	// Repeat the processing as long as there are blocks to import
	for {
		// Fetch the next batch of blocks
		blocks := d.queue.TakeBlocks()
		if len(blocks) == 0 {
			return
		}
		// Reset the import statistics
		d.importLock.Lock()
		d.importStart = time.Now()
		d.importQueue = blocks
		d.importDone = 0
		d.importLock.Unlock()

		// Actually import the blocks
		glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
		for len(blocks) != 0 {
			// Check for any termination requests
			if atomic.LoadInt32(&d.interrupt) == 1 {
				return
			}
			// Retrieve the first batch of blocks to insert
			max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
			raw := make(types.Blocks, 0, max)
			for _, block := range blocks[:max] {
				raw = append(raw, block.RawBlock)
			}
			// Try to insert the blocks, drop the originating peer if there's an error
			index, err := d.insertChain(raw)
			if err != nil {
				glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err)
				d.dropPeer(blocks[index].OriginPeer)
				d.cancel()
				return
			}
			blocks = blocks[max:]
		}
	}
}
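
// The singleton/reentry dance in process boils down to the following pattern
// (illustrative sketch, not part of the original file): the reentry check is
// registered first so that, by LIFO defer order, the flag is released before
// the re-check runs, letting work that arrived mid-run be picked up instead of
// being stranded in the cache.
func exampleReentrantWorker(flag *int32, hasWork func() bool, work func()) {
	if !atomic.CompareAndSwapInt32(flag, 0, 1) {
		return // another run holds the flag; it will re-check for this work
	}
	defer func() { // registered first, runs last: re-enter if work arrived meanwhile
		if hasWork() {
			exampleReentrantWorker(flag, hasWork, work)
		}
	}()
	defer atomic.StoreInt32(flag, 0) // registered second, runs first: release before re-checking
	work()
}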

// DeliverBlocks injects a new batch of blocks received from a remote node.
// This is usually invoked through the BlocksMsg by the protocol handler.
func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {
	// Make sure the downloader is active
	if atomic.LoadInt32(&d.synchronising) == 0 {
		return errNoSyncActive
	}
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()

	select {
	case d.blockCh <- blockPack{id, blocks}:
		return nil
	case <-cancel:
		return errNoSyncActive
	}
}

// DeliverHashes injects a new batch of hashes received from a remote node into
// the download schedule. This is usually invoked through the BlockHashesMsg by
// the protocol handler.
func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error {
	// Make sure the downloader is active
	if atomic.LoadInt32(&d.synchronising) == 0 {
		return errNoSyncActive
	}
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()

	select {
	case d.hashCh <- hashPack{id, hashes}:
		return nil
	case <-cancel:
		return errNoSyncActive
	}
}