block_fetcher.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package fetcher contains the announcement based header, blocks or transaction synchronisation.
  17. package fetcher
  18. import (
  19. "errors"
  20. "math/rand"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/common/gopool"
  24. "github.com/ethereum/go-ethereum/common/prque"
  25. "github.com/ethereum/go-ethereum/consensus"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/log"
  28. "github.com/ethereum/go-ethereum/metrics"
  29. "github.com/ethereum/go-ethereum/trie"
  30. )
  31. const (
  32. lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested
  33. arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
  34. gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
  35. fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction
  36. reQueueBlockTimeout = 500 * time.Millisecond // Time allowance before blocks are requeued for import
  37. )
  38. const (
  39. maxUncleDist = 11 // Maximum allowed backward distance from the chain head
  40. maxQueueDist = 32 // Maximum allowed distance from the chain head to queue
  41. hashLimit = 256 // Maximum number of unique blocks or headers a peer may have announced
  42. blockLimit = 64 // Maximum number of unique blocks a peer may have delivered
  43. )
// Metrics registered by the block fetcher. The string passed to the metrics
// package is the name each metric is registered under.
var (
	// Announcement metrics: "in" is marked for every notification the loop
	// receives, "dos" when a peer exceeds hashLimit and "drop" when the
	// announced number is too far from the chain head.
	blockAnnounceInMeter   = metrics.NewRegisteredMeter("eth/fetcher/block/announces/in", nil)
	blockAnnounceOutTimer  = metrics.NewRegisteredTimer("eth/fetcher/block/announces/out", nil)
	blockAnnounceDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/drop", nil)
	blockAnnounceDOSMeter  = metrics.NewRegisteredMeter("eth/fetcher/block/announces/dos", nil)

	// Broadcast metrics: "in" is marked for every injected block, "dos" and
	// "drop" when enqueue discards a delivery.
	blockBroadcastInMeter   = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/in", nil)
	blockBroadcastOutTimer  = metrics.NewRegisteredTimer("eth/fetcher/block/broadcasts/out", nil)
	blockBroadcastDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/drop", nil)
	blockBroadcastDOSMeter  = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/dos", nil)

	// Counts of header and body retrievals requested by the fetcher.
	headerFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/block/headers", nil)
	bodyFetchMeter   = metrics.NewRegisteredMeter("eth/fetcher/block/bodies", nil)

	// Sizes of the header/body batches entering a filter ("in") and of the
	// remainder handed back to the caller ("out").
	headerFilterInMeter  = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/in", nil)
	headerFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/out", nil)
	bodyFilterInMeter    = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/in", nil)
	bodyFilterOutMeter   = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/out", nil)
)

// errTerminated is returned by Notify and Enqueue when the fetcher's quit
// channel has been closed before the request could be handed to the loop.
var errTerminated = errors.New("terminated")
  61. // HeaderRetrievalFn is a callback type for retrieving a header from the local chain.
  62. type HeaderRetrievalFn func(common.Hash) *types.Header
  63. // blockRetrievalFn is a callback type for retrieving a block from the local chain.
  64. type blockRetrievalFn func(common.Hash) *types.Block
  65. // headerRequesterFn is a callback type for sending a header retrieval request.
  66. type headerRequesterFn func(common.Hash) error
  67. // bodyRequesterFn is a callback type for sending a body retrieval request.
  68. type bodyRequesterFn func([]common.Hash) error
  69. // DiffRequesterFn is a callback type for sending a diff layer retrieval request.
  70. type DiffRequesterFn func([]common.Hash) error
  71. // headerVerifierFn is a callback type to verify a block's header for fast propagation.
  72. type headerVerifierFn func(header *types.Header) error
  73. // blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
  74. type blockBroadcasterFn func(block *types.Block, propagate bool)
  75. // chainHeightFn is a callback type to retrieve the current chain height.
  76. type chainHeightFn func() uint64
  77. // headersInsertFn is a callback type to insert a batch of headers into the local chain.
  78. type headersInsertFn func(headers []*types.Header) (int, error)
  79. // chainInsertFn is a callback type to insert a batch of blocks into the local chain.
  80. type chainInsertFn func(types.Blocks) (int, error)
  81. // peerDropFn is a callback type for dropping a peer detected as malicious.
  82. type peerDropFn func(id string)
// blockAnnounce is the hash notification of the availability of a new block in the
// network. It also carries the callbacks used to retrieve the announced data
// from the announcing peer.
type blockAnnounce struct {
	hash   common.Hash   // Hash of the block being announced
	number uint64        // Number of the block being announced (0 = unknown | old protocol)
	header *types.Header // Header of the block partially reassembled (new protocol); set once the header has been delivered
	time   time.Time     // Timestamp of the announcement; overwritten with the delivery time when the header arrives

	origin string // Identifier of the peer originating the notification

	fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block
	fetchBodies bodyRequesterFn   // Fetcher function to retrieve the body of an announced block
	fetchDiffs  DiffRequesterFn   // Fetcher function to retrieve the diff layer of an announced block (may be nil)
}
// headerFilterTask represents a batch of headers needing fetcher filtering.
// The same type is used for the reply, which carries only the headers the
// fetcher did not claim.
type headerFilterTask struct {
	peer    string          // The source peer of block headers
	headers []*types.Header // Collection of headers to filter
	time    time.Time       // Arrival time of the headers
}
// bodyFilterTask represents a batch of block bodies (transactions and uncles)
// needing fetcher filtering. transactions and uncles are parallel slices:
// index i of each describes the same body.
type bodyFilterTask struct {
	peer         string                 // The source peer of block bodies
	transactions [][]*types.Transaction // Collection of transactions per block bodies
	uncles       [][]*types.Header      // Collection of uncles per block bodies
	time         time.Time              // Arrival time of the blocks' contents
}
// blockOrHeaderInject represents a scheduled import operation. Its accessor
// methods prefer header when it is set and fall back to block otherwise.
type blockOrHeaderInject struct {
	origin string // Identifier of the peer the header or block came from

	header *types.Header // Used for light mode fetcher which only cares about header.
	block  *types.Block  // Used for normal mode fetcher which imports full block.
}
  115. // number returns the block number of the injected object.
  116. func (inject *blockOrHeaderInject) number() uint64 {
  117. if inject.header != nil {
  118. return inject.header.Number.Uint64()
  119. }
  120. return inject.block.NumberU64()
  121. }
  122. // number returns the block hash of the injected object.
  123. func (inject *blockOrHeaderInject) hash() common.Hash {
  124. if inject.header != nil {
  125. return inject.header.Hash()
  126. }
  127. return inject.block.Hash()
  128. }
// BlockFetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
//
// The maps and the queue below are read and written by the fetcher's loop
// goroutine; the exported methods hand their work to it over the event channels.
type BlockFetcher struct {
	light bool // The indicator whether it's a light fetcher or normal one.

	// Various event channels
	notify chan *blockAnnounce       // Announcements submitted through Notify
	inject chan *blockOrHeaderInject // Blocks submitted through Enqueue

	headerFilter chan chan *headerFilterTask // Per-call exchange channels submitted through FilterHeaders
	bodyFilter   chan chan *bodyFilterTask   // Per-call exchange channels submitted through FilterBodies

	done    chan common.Hash          // Hashes whose pending import finished
	quit    chan struct{}             // Closed by Stop to terminate the loop and unblock callers
	requeue chan *blockOrHeaderInject // Import operations handed back to be queued again

	// Announce states
	announces  map[string]int                   // Per peer blockAnnounce counts to prevent memory exhaustion
	announced  map[common.Hash][]*blockAnnounce // Announced blocks, scheduled for fetching
	fetching   map[common.Hash]*blockAnnounce   // Announced blocks, currently fetching
	fetched    map[common.Hash][]*blockAnnounce // Blocks with headers fetched, scheduled for body retrieval
	completing map[common.Hash]*blockAnnounce   // Blocks with headers, currently body-completing

	// Block cache
	queue  *prque.Prque                         // Queue containing the import operations (block number sorted)
	queues map[string]int                       // Per peer block counts to prevent memory exhaustion
	queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)

	// Callbacks
	getHeader      HeaderRetrievalFn  // Retrieves a header from the local chain
	getBlock       blockRetrievalFn   // Retrieves a block from the local chain
	verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work
	broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
	chainHeight    chainHeightFn      // Retrieves the current chain's height
	insertHeaders  headersInsertFn    // Injects a batch of headers into the chain
	insertChain    chainInsertFn      // Injects a batch of blocks into the chain
	dropPeer       peerDropFn         // Drops a peer for misbehaving

	// Testing hooks (each is skipped when nil)
	announceChangeHook func(common.Hash, bool)           // Method to call upon adding or deleting a hash from the blockAnnounce list
	queueChangeHook    func(common.Hash, bool)           // Method to call upon adding or deleting a block from the import queue
	fetchingHook       func([]common.Hash)               // Method to call upon starting a block (eth/61) or header (eth/62) fetch
	completingHook     func([]common.Hash)               // Method to call upon starting a block body fetch (eth/62)
	importedHook       func(*types.Header, *types.Block) // Method to call upon successful header or block import (both eth/61 and eth/62)
}
  167. // NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
  168. func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
  169. return &BlockFetcher{
  170. light: light,
  171. notify: make(chan *blockAnnounce),
  172. inject: make(chan *blockOrHeaderInject),
  173. headerFilter: make(chan chan *headerFilterTask),
  174. bodyFilter: make(chan chan *bodyFilterTask),
  175. done: make(chan common.Hash),
  176. quit: make(chan struct{}),
  177. requeue: make(chan *blockOrHeaderInject),
  178. announces: make(map[string]int),
  179. announced: make(map[common.Hash][]*blockAnnounce),
  180. fetching: make(map[common.Hash]*blockAnnounce),
  181. fetched: make(map[common.Hash][]*blockAnnounce),
  182. completing: make(map[common.Hash]*blockAnnounce),
  183. queue: prque.New(nil),
  184. queues: make(map[string]int),
  185. queued: make(map[common.Hash]*blockOrHeaderInject),
  186. getHeader: getHeader,
  187. getBlock: getBlock,
  188. verifyHeader: verifyHeader,
  189. broadcastBlock: broadcastBlock,
  190. chainHeight: chainHeight,
  191. insertHeaders: insertHeaders,
  192. insertChain: insertChain,
  193. dropPeer: dropPeer,
  194. }
  195. }
// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and block fetches until termination requested. It returns
// immediately; the event loop runs on its own goroutine.
func (f *BlockFetcher) Start() {
	go f.loop()
}
// Stop terminates the announcement based synchroniser, canceling all pending
// operations. It does so by closing the quit channel, which makes the event
// loop return and unblocks callers waiting in Notify, Enqueue and the filter
// methods.
//
// NOTE(review): Stop must be called at most once; closing an already closed
// channel panics.
func (f *BlockFetcher) Stop() {
	close(f.quit)
}
  206. // Notify announces the fetcher of the potential availability of a new block in
  207. // the network.
  208. func (f *BlockFetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
  209. headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn, diffFetcher DiffRequesterFn) error {
  210. block := &blockAnnounce{
  211. hash: hash,
  212. number: number,
  213. time: time,
  214. origin: peer,
  215. fetchHeader: headerFetcher,
  216. fetchBodies: bodyFetcher,
  217. fetchDiffs: diffFetcher,
  218. }
  219. select {
  220. case f.notify <- block:
  221. return nil
  222. case <-f.quit:
  223. return errTerminated
  224. }
  225. }
  226. // Enqueue tries to fill gaps the fetcher's future import queue.
  227. func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error {
  228. op := &blockOrHeaderInject{
  229. origin: peer,
  230. block: block,
  231. }
  232. select {
  233. case f.inject <- op:
  234. return nil
  235. case <-f.quit:
  236. return errTerminated
  237. }
  238. }
  239. // FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
  240. // returning those that should be handled differently.
  241. func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
  242. log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
  243. // Send the filter channel to the fetcher
  244. filter := make(chan *headerFilterTask)
  245. select {
  246. case f.headerFilter <- filter:
  247. case <-f.quit:
  248. return nil
  249. }
  250. // Request the filtering of the header list
  251. select {
  252. case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
  253. case <-f.quit:
  254. return nil
  255. }
  256. // Retrieve the headers remaining after filtering
  257. select {
  258. case task := <-filter:
  259. return task.headers
  260. case <-f.quit:
  261. return nil
  262. }
  263. }
  264. // FilterBodies extracts all the block bodies that were explicitly requested by
  265. // the fetcher, returning those that should be handled differently.
  266. func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
  267. log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))
  268. // Send the filter channel to the fetcher
  269. filter := make(chan *bodyFilterTask)
  270. select {
  271. case f.bodyFilter <- filter:
  272. case <-f.quit:
  273. return nil, nil
  274. }
  275. // Request the filtering of the body list
  276. select {
  277. case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
  278. case <-f.quit:
  279. return nil, nil
  280. }
  281. // Retrieve the bodies remaining after filtering
  282. select {
  283. case task := <-filter:
  284. return task.transactions, task.uncles
  285. case <-f.quit:
  286. return nil, nil
  287. }
  288. }
  289. // Loop is the main fetcher loop, checking and processing various notification
  290. // events.
  291. func (f *BlockFetcher) loop() {
  292. // Iterate the block fetching until a quit is requested
  293. var (
  294. fetchTimer = time.NewTimer(0)
  295. completeTimer = time.NewTimer(0)
  296. )
  297. <-fetchTimer.C // clear out the channel
  298. <-completeTimer.C
  299. defer fetchTimer.Stop()
  300. defer completeTimer.Stop()
  301. for {
  302. // Clean up any expired block fetches
  303. for hash, announce := range f.fetching {
  304. if time.Since(announce.time) > fetchTimeout {
  305. f.forgetHash(hash)
  306. }
  307. }
  308. // Import any queued blocks that could potentially fit
  309. height := f.chainHeight()
  310. for !f.queue.Empty() {
  311. op := f.queue.PopItem().(*blockOrHeaderInject)
  312. hash := op.hash()
  313. if f.queueChangeHook != nil {
  314. f.queueChangeHook(hash, false)
  315. }
  316. // If too high up the chain or phase, continue later
  317. number := op.number()
  318. if number > height+1 {
  319. f.queue.Push(op, -int64(number))
  320. if f.queueChangeHook != nil {
  321. f.queueChangeHook(hash, true)
  322. }
  323. break
  324. }
  325. // Otherwise if fresh and still unknown, try and import
  326. if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
  327. f.forgetBlock(hash)
  328. continue
  329. }
  330. if f.light {
  331. f.importHeaders(op)
  332. } else {
  333. f.importBlocks(op)
  334. }
  335. }
  336. // Wait for an outside event to occur
  337. select {
  338. case <-f.quit:
  339. // BlockFetcher terminating, abort all operations
  340. return
  341. case notification := <-f.notify:
  342. // A block was announced, make sure the peer isn't DOSing us
  343. blockAnnounceInMeter.Mark(1)
  344. count := f.announces[notification.origin] + 1
  345. if count > hashLimit {
  346. log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
  347. blockAnnounceDOSMeter.Mark(1)
  348. break
  349. }
  350. // If we have a valid block number, check that it's potentially useful
  351. if notification.number > 0 {
  352. if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
  353. log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
  354. blockAnnounceDropMeter.Mark(1)
  355. break
  356. }
  357. }
  358. // All is well, schedule the announce if block's not yet downloading
  359. if _, ok := f.fetching[notification.hash]; ok {
  360. break
  361. }
  362. if _, ok := f.completing[notification.hash]; ok {
  363. break
  364. }
  365. f.announces[notification.origin] = count
  366. f.announced[notification.hash] = append(f.announced[notification.hash], notification)
  367. if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
  368. f.announceChangeHook(notification.hash, true)
  369. }
  370. if len(f.announced) == 1 {
  371. f.rescheduleFetch(fetchTimer)
  372. }
  373. case op := <-f.requeue:
  374. // Re-queue blocks that have not been written due to fork block competition
  375. number := int64(0)
  376. hash := ""
  377. if op.header != nil {
  378. number = op.header.Number.Int64()
  379. hash = op.header.Hash().String()
  380. } else if op.block != nil {
  381. number = op.block.Number().Int64()
  382. hash = op.block.Hash().String()
  383. }
  384. log.Info("Re-queue blocks", "number", number, "hash", hash)
  385. f.enqueue(op.origin, op.header, op.block)
  386. case op := <-f.inject:
  387. // A direct block insertion was requested, try and fill any pending gaps
  388. blockBroadcastInMeter.Mark(1)
  389. // Now only direct block injection is allowed, drop the header injection
  390. // here silently if we receive.
  391. if f.light {
  392. continue
  393. }
  394. f.enqueue(op.origin, nil, op.block)
  395. case hash := <-f.done:
  396. // A pending import finished, remove all traces of the notification
  397. f.forgetHash(hash)
  398. f.forgetBlock(hash)
  399. case <-fetchTimer.C:
  400. // At least one block's timer ran out, check for needing retrieval
  401. request := make(map[string][]common.Hash)
  402. for hash, announces := range f.announced {
  403. // In current LES protocol(les2/les3), only header announce is
  404. // available, no need to wait too much time for header broadcast.
  405. timeout := arriveTimeout - gatherSlack
  406. if f.light {
  407. timeout = 0
  408. }
  409. if time.Since(announces[0].time) > timeout {
  410. // Pick a random peer to retrieve from, reset all others
  411. announce := announces[rand.Intn(len(announces))]
  412. f.forgetHash(hash)
  413. // If the block still didn't arrive, queue for fetching
  414. if (f.light && f.getHeader(hash) == nil) || (!f.light && f.getBlock(hash) == nil) {
  415. request[announce.origin] = append(request[announce.origin], hash)
  416. f.fetching[hash] = announce
  417. }
  418. }
  419. }
  420. // Send out all block header requests
  421. for peer, hashes := range request {
  422. log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
  423. // Create a closure of the fetch and schedule in on a new thread
  424. fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
  425. fetchDiff := f.fetching[hashes[0]].fetchDiffs
  426. gopool.Submit(func() {
  427. if f.fetchingHook != nil {
  428. f.fetchingHook(hashes)
  429. }
  430. if fetchDiff != nil {
  431. fetchDiff(hashes)
  432. }
  433. for _, hash := range hashes {
  434. headerFetchMeter.Mark(1)
  435. fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
  436. }
  437. })
  438. }
  439. // Schedule the next fetch if blocks are still pending
  440. f.rescheduleFetch(fetchTimer)
  441. case <-completeTimer.C:
  442. // At least one header's timer ran out, retrieve everything
  443. request := make(map[string][]common.Hash)
  444. for hash, announces := range f.fetched {
  445. // Pick a random peer to retrieve from, reset all others
  446. announce := announces[rand.Intn(len(announces))]
  447. f.forgetHash(hash)
  448. // If the block still didn't arrive, queue for completion
  449. if f.getBlock(hash) == nil {
  450. request[announce.origin] = append(request[announce.origin], hash)
  451. f.completing[hash] = announce
  452. }
  453. }
  454. // Send out all block body requests
  455. for peer, hashes := range request {
  456. log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
  457. // Create a closure of the fetch and schedule in on a new thread
  458. if f.completingHook != nil {
  459. f.completingHook(hashes)
  460. }
  461. bodyFetchMeter.Mark(int64(len(hashes)))
  462. go f.completing[hashes[0]].fetchBodies(hashes)
  463. }
  464. // Schedule the next fetch if blocks are still pending
  465. f.rescheduleComplete(completeTimer)
  466. case filter := <-f.headerFilter:
  467. // Headers arrived from a remote peer. Extract those that were explicitly
  468. // requested by the fetcher, and return everything else so it's delivered
  469. // to other parts of the system.
  470. var task *headerFilterTask
  471. select {
  472. case task = <-filter:
  473. case <-f.quit:
  474. return
  475. }
  476. headerFilterInMeter.Mark(int64(len(task.headers)))
  477. // Split the batch of headers into unknown ones (to return to the caller),
  478. // known incomplete ones (requiring body retrievals) and completed blocks.
  479. unknown, incomplete, complete, lightHeaders := []*types.Header{}, []*blockAnnounce{}, []*types.Block{}, []*blockAnnounce{}
  480. for _, header := range task.headers {
  481. hash := header.Hash()
  482. // Filter fetcher-requested headers from other synchronisation algorithms
  483. if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
  484. // If the delivered header does not match the promised number, drop the announcer
  485. if header.Number.Uint64() != announce.number {
  486. log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
  487. f.dropPeer(announce.origin)
  488. f.forgetHash(hash)
  489. continue
  490. }
  491. // Collect all headers only if we are running in light
  492. // mode and the headers are not imported by other means.
  493. if f.light {
  494. if f.getHeader(hash) == nil {
  495. announce.header = header
  496. lightHeaders = append(lightHeaders, announce)
  497. }
  498. f.forgetHash(hash)
  499. continue
  500. }
  501. // Only keep if not imported by other means
  502. if f.getBlock(hash) == nil {
  503. announce.header = header
  504. announce.time = task.time
  505. // If the block is empty (header only), short circuit into the final import queue
  506. if header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash {
  507. log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
  508. block := types.NewBlockWithHeader(header)
  509. block.ReceivedAt = task.time
  510. complete = append(complete, block)
  511. f.completing[hash] = announce
  512. continue
  513. }
  514. // Otherwise add to the list of blocks needing completion
  515. incomplete = append(incomplete, announce)
  516. } else {
  517. log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
  518. f.forgetHash(hash)
  519. }
  520. } else {
  521. // BlockFetcher doesn't know about it, add to the return list
  522. unknown = append(unknown, header)
  523. }
  524. }
  525. headerFilterOutMeter.Mark(int64(len(unknown)))
  526. select {
  527. case filter <- &headerFilterTask{headers: unknown, time: task.time}:
  528. case <-f.quit:
  529. return
  530. }
  531. // Schedule the retrieved headers for body completion
  532. for _, announce := range incomplete {
  533. hash := announce.header.Hash()
  534. if _, ok := f.completing[hash]; ok {
  535. continue
  536. }
  537. f.fetched[hash] = append(f.fetched[hash], announce)
  538. if len(f.fetched) == 1 {
  539. f.rescheduleComplete(completeTimer)
  540. }
  541. }
  542. // Schedule the header for light fetcher import
  543. for _, announce := range lightHeaders {
  544. f.enqueue(announce.origin, announce.header, nil)
  545. }
  546. // Schedule the header-only blocks for import
  547. for _, block := range complete {
  548. if announce := f.completing[block.Hash()]; announce != nil {
  549. f.enqueue(announce.origin, nil, block)
  550. }
  551. }
  552. case filter := <-f.bodyFilter:
  553. // Block bodies arrived, extract any explicitly requested blocks, return the rest
  554. var task *bodyFilterTask
  555. select {
  556. case task = <-filter:
  557. case <-f.quit:
  558. return
  559. }
  560. bodyFilterInMeter.Mark(int64(len(task.transactions)))
  561. blocks := []*types.Block{}
  562. // abort early if there's nothing explicitly requested
  563. if len(f.completing) > 0 {
  564. for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
  565. // Match up a body to any possible completion request
  566. var (
  567. matched = false
  568. uncleHash common.Hash // calculated lazily and reused
  569. txnHash common.Hash // calculated lazily and reused
  570. )
  571. for hash, announce := range f.completing {
  572. if f.queued[hash] != nil || announce.origin != task.peer {
  573. continue
  574. }
  575. if uncleHash == (common.Hash{}) {
  576. uncleHash = types.CalcUncleHash(task.uncles[i])
  577. }
  578. if uncleHash != announce.header.UncleHash {
  579. continue
  580. }
  581. if txnHash == (common.Hash{}) {
  582. txnHash = types.DeriveSha(types.Transactions(task.transactions[i]), trie.NewStackTrie(nil))
  583. }
  584. if txnHash != announce.header.TxHash {
  585. continue
  586. }
  587. // Mark the body matched, reassemble if still unknown
  588. matched = true
  589. if f.getBlock(hash) == nil {
  590. block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
  591. block.ReceivedAt = task.time
  592. blocks = append(blocks, block)
  593. } else {
  594. f.forgetHash(hash)
  595. }
  596. }
  597. if matched {
  598. task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
  599. task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
  600. i--
  601. continue
  602. }
  603. }
  604. }
  605. bodyFilterOutMeter.Mark(int64(len(task.transactions)))
  606. select {
  607. case filter <- task:
  608. case <-f.quit:
  609. return
  610. }
  611. // Schedule the retrieved blocks for ordered import
  612. for _, block := range blocks {
  613. if announce := f.completing[block.Hash()]; announce != nil {
  614. f.enqueue(announce.origin, nil, block)
  615. }
  616. }
  617. }
  618. }
  619. }
  620. // rescheduleFetch resets the specified fetch timer to the next blockAnnounce timeout.
  621. func (f *BlockFetcher) rescheduleFetch(fetch *time.Timer) {
  622. // Short circuit if no blocks are announced
  623. if len(f.announced) == 0 {
  624. return
  625. }
  626. // Schedule announcement retrieval quickly for light mode
  627. // since server won't send any headers to client.
  628. if f.light {
  629. fetch.Reset(lightTimeout)
  630. return
  631. }
  632. // Otherwise find the earliest expiring announcement
  633. earliest := time.Now()
  634. for _, announces := range f.announced {
  635. if earliest.After(announces[0].time) {
  636. earliest = announces[0].time
  637. }
  638. }
  639. fetch.Reset(arriveTimeout - time.Since(earliest))
  640. }
  641. // rescheduleComplete resets the specified completion timer to the next fetch timeout.
  642. func (f *BlockFetcher) rescheduleComplete(complete *time.Timer) {
  643. // Short circuit if no headers are fetched
  644. if len(f.fetched) == 0 {
  645. return
  646. }
  647. // Otherwise find the earliest expiring announcement
  648. earliest := time.Now()
  649. for _, announces := range f.fetched {
  650. if earliest.After(announces[0].time) {
  651. earliest = announces[0].time
  652. }
  653. }
  654. complete.Reset(gatherSlack - time.Since(earliest))
  655. }
  656. // enqueue schedules a new header or block import operation, if the component
  657. // to be imported has not yet been seen.
  658. func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.Block) {
  659. var (
  660. hash common.Hash
  661. number uint64
  662. )
  663. if header != nil {
  664. hash, number = header.Hash(), header.Number.Uint64()
  665. } else {
  666. hash, number = block.Hash(), block.NumberU64()
  667. }
  668. // Ensure the peer isn't DOSing us
  669. count := f.queues[peer] + 1
  670. if count > blockLimit {
  671. log.Debug("Discarded delivered header or block, exceeded allowance", "peer", peer, "number", number, "hash", hash, "limit", blockLimit)
  672. blockBroadcastDOSMeter.Mark(1)
  673. f.forgetHash(hash)
  674. return
  675. }
  676. // Discard any past or too distant blocks
  677. if dist := int64(number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
  678. log.Debug("Discarded delivered header or block, too far away", "peer", peer, "number", number, "hash", hash, "distance", dist)
  679. blockBroadcastDropMeter.Mark(1)
  680. f.forgetHash(hash)
  681. return
  682. }
  683. // Schedule the block for future importing
  684. if _, ok := f.queued[hash]; !ok {
  685. op := &blockOrHeaderInject{origin: peer}
  686. if header != nil {
  687. op.header = header
  688. } else {
  689. op.block = block
  690. }
  691. f.queues[peer] = count
  692. f.queued[hash] = op
  693. f.queue.Push(op, -int64(number))
  694. if f.queueChangeHook != nil {
  695. f.queueChangeHook(hash, true)
  696. }
  697. log.Debug("Queued delivered header or block", "peer", peer, "number", number, "hash", hash, "queued", f.queue.Size())
  698. }
  699. }
  700. // importHeaders spawns a new goroutine to run a header insertion into the chain.
  701. // If the header's number is at the same height as the current import phase, it
  702. // updates the phase states accordingly.
  703. func (f *BlockFetcher) importHeaders(op *blockOrHeaderInject) {
  704. peer := op.origin
  705. header := op.header
  706. hash := header.Hash()
  707. log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)
  708. go func() {
  709. defer func() { f.done <- hash }()
  710. // If the parent's unknown, abort insertion
  711. parent := f.getHeader(header.ParentHash)
  712. if parent == nil {
  713. log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
  714. time.Sleep(reQueueBlockTimeout)
  715. f.requeue <- op
  716. return
  717. }
  718. // Validate the header and if something went wrong, drop the peer
  719. if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock {
  720. log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
  721. f.dropPeer(peer)
  722. return
  723. }
  724. // Run the actual import and log any issues
  725. if _, err := f.insertHeaders([]*types.Header{header}); err != nil {
  726. log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
  727. return
  728. }
  729. // Invoke the testing hook if needed
  730. if f.importedHook != nil {
  731. f.importedHook(header, nil)
  732. }
  733. }()
  734. }
  735. // importBlocks spawns a new goroutine to run a block insertion into the chain. If the
  736. // block's number is at the same height as the current import phase, it updates
  737. // the phase states accordingly.
  738. func (f *BlockFetcher) importBlocks(op *blockOrHeaderInject) {
  739. peer := op.origin
  740. block := op.block
  741. hash := block.Hash()
  742. // Run the import on a new thread
  743. log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
  744. go func() {
  745. defer func() { f.done <- hash }()
  746. // If the parent's unknown, abort insertion
  747. parent := f.getBlock(block.ParentHash())
  748. if parent == nil {
  749. log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
  750. time.Sleep(reQueueBlockTimeout)
  751. f.requeue <- op
  752. return
  753. }
  754. // Quickly validate the header and propagate the block if it passes
  755. switch err := f.verifyHeader(block.Header()); err {
  756. case nil:
  757. // All ok, quickly propagate to our peers
  758. blockBroadcastOutTimer.UpdateSince(block.ReceivedAt)
  759. go f.broadcastBlock(block, true)
  760. case consensus.ErrFutureBlock:
  761. log.Error("Received future block", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
  762. f.dropPeer(peer)
  763. default:
  764. // Something went very wrong, drop the peer
  765. log.Error("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
  766. f.dropPeer(peer)
  767. return
  768. }
  769. // Run the actual import and log any issues
  770. if _, err := f.insertChain(types.Blocks{block}); err != nil {
  771. log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
  772. return
  773. }
  774. // If import succeeded, broadcast the block
  775. blockAnnounceOutTimer.UpdateSince(block.ReceivedAt)
  776. go f.broadcastBlock(block, false)
  777. // Invoke the testing hook if needed
  778. if f.importedHook != nil {
  779. f.importedHook(nil, block)
  780. }
  781. }()
  782. }
  783. // forgetHash removes all traces of a block announcement from the fetcher's
  784. // internal state.
  785. func (f *BlockFetcher) forgetHash(hash common.Hash) {
  786. // Remove all pending announces and decrement DOS counters
  787. for _, announce := range f.announced[hash] {
  788. f.announces[announce.origin]--
  789. if f.announces[announce.origin] <= 0 {
  790. delete(f.announces, announce.origin)
  791. }
  792. }
  793. delete(f.announced, hash)
  794. if f.announceChangeHook != nil {
  795. f.announceChangeHook(hash, false)
  796. }
  797. // Remove any pending fetches and decrement the DOS counters
  798. if announce := f.fetching[hash]; announce != nil {
  799. f.announces[announce.origin]--
  800. if f.announces[announce.origin] <= 0 {
  801. delete(f.announces, announce.origin)
  802. }
  803. delete(f.fetching, hash)
  804. }
  805. // Remove any pending completion requests and decrement the DOS counters
  806. for _, announce := range f.fetched[hash] {
  807. f.announces[announce.origin]--
  808. if f.announces[announce.origin] <= 0 {
  809. delete(f.announces, announce.origin)
  810. }
  811. }
  812. delete(f.fetched, hash)
  813. // Remove any pending completions and decrement the DOS counters
  814. if announce := f.completing[hash]; announce != nil {
  815. f.announces[announce.origin]--
  816. if f.announces[announce.origin] <= 0 {
  817. delete(f.announces, announce.origin)
  818. }
  819. delete(f.completing, hash)
  820. }
  821. }
  822. // forgetBlock removes all traces of a queued block from the fetcher's internal
  823. // state.
  824. func (f *BlockFetcher) forgetBlock(hash common.Hash) {
  825. if insert := f.queued[hash]; insert != nil {
  826. f.queues[insert.origin]--
  827. if f.queues[insert.origin] == 0 {
  828. delete(f.queues, insert.origin)
  829. }
  830. delete(f.queued, hash)
  831. }
  832. }