block_fetcher.go

// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// This is a temporary package whilst working on the eth/66 blocking refactors.
// After that work is done, les needs to be refactored to use the new package,
// or alternatively use a stripped down version of it. Either way, we need to
// keep the changes scoped so duplicating temporarily seems the sanest.
package fetcher

import (
	"errors"
	"math/rand"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/prque"
	"github.com/ethereum/go-ethereum/consensus"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/trie"
)

const (
	lightTimeout  = time.Millisecond       // Time allowance before an announced header is explicitly requested
	arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
	gatherSlack   = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
	fetchTimeout  = 5 * time.Second        // Maximum allotted time to return an explicitly requested block/transaction
)

const (
	maxUncleDist = 7   // Maximum allowed backward distance from the chain head
	maxQueueDist = 32  // Maximum allowed distance from the chain head to queue
	hashLimit    = 256 // Maximum number of unique blocks or headers a peer may have announced
	blockLimit   = 64  // Maximum number of unique blocks a peer may have delivered
)

var (
	blockAnnounceInMeter   = metrics.NewRegisteredMeter("eth/fetcher/block/announces/in", nil)
	blockAnnounceOutTimer  = metrics.NewRegisteredTimer("eth/fetcher/block/announces/out", nil)
	blockAnnounceDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/drop", nil)
	blockAnnounceDOSMeter  = metrics.NewRegisteredMeter("eth/fetcher/block/announces/dos", nil)

	blockBroadcastInMeter   = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/in", nil)
	blockBroadcastOutTimer  = metrics.NewRegisteredTimer("eth/fetcher/block/broadcasts/out", nil)
	blockBroadcastDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/drop", nil)
	blockBroadcastDOSMeter  = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/dos", nil)

	headerFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/block/headers", nil)
	bodyFetchMeter   = metrics.NewRegisteredMeter("eth/fetcher/block/bodies", nil)

	headerFilterInMeter  = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/in", nil)
	headerFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/out", nil)
	bodyFilterInMeter    = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/in", nil)
	bodyFilterOutMeter   = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/out", nil)
)

var errTerminated = errors.New("terminated")

// HeaderRetrievalFn is a callback type for retrieving a header from the local chain.
type HeaderRetrievalFn func(common.Hash) *types.Header

// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block

// headerRequesterFn is a callback type for sending a header retrieval request.
type headerRequesterFn func(common.Hash) error

// bodyRequesterFn is a callback type for sending a body retrieval request.
type bodyRequesterFn func([]common.Hash) error

// headerVerifierFn is a callback type to verify a block's header for fast propagation.
type headerVerifierFn func(header *types.Header) error

// blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
type blockBroadcasterFn func(block *types.Block, propagate bool)

// chainHeightFn is a callback type to retrieve the current chain height.
type chainHeightFn func() uint64

// headersInsertFn is a callback type to insert a batch of headers into the local chain.
type headersInsertFn func(headers []*types.Header) (int, error)

// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
type chainInsertFn func(types.Blocks) (int, error)

// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)

// blockAnnounce is the hash notification of the availability of a new block in the
// network.
type blockAnnounce struct {
	hash   common.Hash   // Hash of the block being announced
	number uint64        // Number of the block being announced (0 = unknown | old protocol)
	header *types.Header // Header of the block partially reassembled (new protocol)
	time   time.Time     // Timestamp of the announcement

	origin string // Identifier of the peer originating the notification

	fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block
	fetchBodies bodyRequesterFn   // Fetcher function to retrieve the body of an announced block
}

// headerFilterTask represents a batch of headers needing fetcher filtering.
type headerFilterTask struct {
	peer    string          // The source peer of block headers
	headers []*types.Header // Collection of headers to filter
	time    time.Time       // Arrival time of the headers
}

// bodyFilterTask represents a batch of block bodies (transactions and uncles)
// needing fetcher filtering.
type bodyFilterTask struct {
	peer         string                 // The source peer of block bodies
	transactions [][]*types.Transaction // Collection of transactions per block body
	uncles       [][]*types.Header      // Collection of uncles per block body
	time         time.Time              // Arrival time of the blocks' contents
}

// blockOrHeaderInject represents a scheduled import operation.
type blockOrHeaderInject struct {
	origin string

	header *types.Header // Used for light mode fetcher which only cares about header.
	block  *types.Block  // Used for normal mode fetcher which imports full block.
}

// number returns the block number of the injected object.
func (inject *blockOrHeaderInject) number() uint64 {
	if inject.header != nil {
		return inject.header.Number.Uint64()
	}
	return inject.block.NumberU64()
}

// hash returns the block hash of the injected object.
func (inject *blockOrHeaderInject) hash() common.Hash {
	if inject.header != nil {
		return inject.header.Hash()
	}
	return inject.block.Hash()
}

// BlockFetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
type BlockFetcher struct {
	light bool // The indicator whether it's a light fetcher or normal one.

	// Various event channels
	notify       chan *blockAnnounce
	inject       chan *blockOrHeaderInject
	headerFilter chan chan *headerFilterTask
	bodyFilter   chan chan *bodyFilterTask
	done         chan common.Hash
	quit         chan struct{}

	// Announce states
	announces  map[string]int                   // Per peer blockAnnounce counts to prevent memory exhaustion
	announced  map[common.Hash][]*blockAnnounce // Announced blocks, scheduled for fetching
	fetching   map[common.Hash]*blockAnnounce   // Announced blocks, currently fetching
	fetched    map[common.Hash][]*blockAnnounce // Blocks with headers fetched, scheduled for body retrieval
	completing map[common.Hash]*blockAnnounce   // Blocks with headers, currently body-completing

	// Block cache
	queue  *prque.Prque                         // Queue containing the import operations (block number sorted)
	queues map[string]int                       // Per peer block counts to prevent memory exhaustion
	queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)

	// Callbacks
	getHeader      HeaderRetrievalFn  // Retrieves a header from the local chain
	getBlock       blockRetrievalFn   // Retrieves a block from the local chain
	verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work
	broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
	chainHeight    chainHeightFn      // Retrieves the current chain's height
	insertHeaders  headersInsertFn    // Injects a batch of headers into the chain
	insertChain    chainInsertFn      // Injects a batch of blocks into the chain
	dropPeer       peerDropFn         // Drops a peer for misbehaving

	// Testing hooks
	announceChangeHook func(common.Hash, bool)           // Method to call upon adding or deleting a hash from the blockAnnounce list
	queueChangeHook    func(common.Hash, bool)           // Method to call upon adding or deleting a block from the import queue
	fetchingHook       func([]common.Hash)               // Method to call upon starting a block (eth/61) or header (eth/62) fetch
	completingHook     func([]common.Hash)               // Method to call upon starting a block body fetch (eth/62)
	importedHook       func(*types.Header, *types.Block) // Method to call upon successful header or block import (both eth/61 and eth/62)
}

// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
	return &BlockFetcher{
		light:          light,
		notify:         make(chan *blockAnnounce),
		inject:         make(chan *blockOrHeaderInject),
		headerFilter:   make(chan chan *headerFilterTask),
		bodyFilter:     make(chan chan *bodyFilterTask),
		done:           make(chan common.Hash),
		quit:           make(chan struct{}),
		announces:      make(map[string]int),
		announced:      make(map[common.Hash][]*blockAnnounce),
		fetching:       make(map[common.Hash]*blockAnnounce),
		fetched:        make(map[common.Hash][]*blockAnnounce),
		completing:     make(map[common.Hash]*blockAnnounce),
		queue:          prque.New(nil),
		queues:         make(map[string]int),
		queued:         make(map[common.Hash]*blockOrHeaderInject),
		getHeader:      getHeader,
		getBlock:       getBlock,
		verifyHeader:   verifyHeader,
		broadcastBlock: broadcastBlock,
		chainHeight:    chainHeight,
		insertHeaders:  insertHeaders,
		insertChain:    insertChain,
		dropPeer:       dropPeer,
	}
}
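
// Illustrative usage sketch (an editor's addition, not part of the upstream
// file): the embedding protocol handler constructs the fetcher once with its
// own callback implementations and runs it until shutdown. The callback names
// below (getHeader, getBlock, verifyHeader, ...) are placeholders for whatever
// the caller provides; their required signatures are the *Fn types defined
// above.
//
//	f := NewBlockFetcher(false, getHeader, getBlock, verifyHeader,
//		broadcastBlock, chainHeight, insertHeaders, insertChain, dropPeer)
//	f.Start()
//	defer f.Stop()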

// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and block fetches until termination requested.
func (f *BlockFetcher) Start() {
	go f.loop()
}

// Stop terminates the announcement based synchroniser, canceling all pending
// operations.
func (f *BlockFetcher) Stop() {
	close(f.quit)
}

// Notify announces to the fetcher the potential availability of a new block in
// the network.
func (f *BlockFetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
	headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
	block := &blockAnnounce{
		hash:        hash,
		number:      number,
		time:        time,
		origin:      peer,
		fetchHeader: headerFetcher,
		fetchBodies: bodyFetcher,
	}
	select {
	case f.notify <- block:
		return nil
	case <-f.quit:
		return errTerminated
	}
}
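
// Illustrative sketch (an assumption, not from the upstream file): a peer
// message handler receiving a NewBlockHashes-style announcement forwards each
// entry to the fetcher, passing closures that know how to request the header
// and body from that specific peer. requestHeader and requestBodies are
// hypothetical stand-ins for the real request plumbing.
//
//	err := f.Notify(peerID, hash, number, time.Now(),
//		func(h common.Hash) error { return requestHeader(peerID, h) },
//		func(hs []common.Hash) error { return requestBodies(peerID, hs) })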

// Enqueue tries to fill gaps in the fetcher's future import queue.
func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error {
	op := &blockOrHeaderInject{
		origin: peer,
		block:  block,
	}
	select {
	case f.inject <- op:
		return nil
	case <-f.quit:
		return errTerminated
	}
}
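
// Illustrative sketch (an assumption): a handler that decoded a propagated
// block from the wire hands the fully assembled block to the fetcher, which
// queues it for import once it links up with the local chain head.
//
//	if err := f.Enqueue(peerID, block); err != nil {
//		// errTerminated: the fetcher has already been stopped
//	}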

// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
// returning those that should be handled differently.
func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
	log.Trace("Filtering headers", "peer", peer, "headers", len(headers))

	// Send the filter channel to the fetcher
	filter := make(chan *headerFilterTask)

	select {
	case f.headerFilter <- filter:
	case <-f.quit:
		return nil
	}
	// Request the filtering of the header list
	select {
	case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
	case <-f.quit:
		return nil
	}
	// Retrieve the headers remaining after filtering
	select {
	case task := <-filter:
		return task.headers
	case <-f.quit:
		return nil
	}
}
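
// Illustrative sketch (an assumption): on receiving a batch of headers from a
// peer, the protocol handler runs them through the fetcher first; whatever the
// fetcher did not explicitly request comes back and can be delivered to other
// components instead. deliverToDownloader is a hypothetical hand-off.
//
//	unrequested := f.FilterHeaders(peerID, headers, time.Now())
//	if len(unrequested) > 0 {
//		deliverToDownloader(peerID, unrequested)
//	}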

// FilterBodies extracts all the block bodies that were explicitly requested by
// the fetcher, returning those that should be handled differently.
func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
	log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))

	// Send the filter channel to the fetcher
	filter := make(chan *bodyFilterTask)

	select {
	case f.bodyFilter <- filter:
	case <-f.quit:
		return nil, nil
	}
	// Request the filtering of the body list
	select {
	case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
	case <-f.quit:
		return nil, nil
	}
	// Retrieve the bodies remaining after filtering
	select {
	case task := <-filter:
		return task.transactions, task.uncles
	case <-f.quit:
		return nil, nil
	}
}
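
// Illustrative sketch (an assumption): body deliveries follow the same pattern
// as header deliveries, with the fetcher consuming the bodies it asked for and
// returning the remainder to the caller.
//
//	txs, uncles := f.FilterBodies(peerID, txLists, uncleLists, time.Now())
//	// txs/uncles now only contain bodies the fetcher was not waiting for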

// loop is the main fetcher loop, checking and processing various notification
// events.
func (f *BlockFetcher) loop() {
	// Iterate the block fetching until a quit is requested
	var (
		fetchTimer    = time.NewTimer(0)
		completeTimer = time.NewTimer(0)
	)
	<-fetchTimer.C // clear out the channel
	<-completeTimer.C
	defer fetchTimer.Stop()
	defer completeTimer.Stop()

	for {
		// Clean up any expired block fetches
		for hash, announce := range f.fetching {
			if time.Since(announce.time) > fetchTimeout {
				f.forgetHash(hash)
			}
		}
		// Import any queued blocks that could potentially fit
		height := f.chainHeight()
		for !f.queue.Empty() {
			op := f.queue.PopItem().(*blockOrHeaderInject)
			hash := op.hash()
			if f.queueChangeHook != nil {
				f.queueChangeHook(hash, false)
			}
			// If too high up the chain or phase, continue later
			number := op.number()
			if number > height+1 {
				f.queue.Push(op, -int64(number))
				if f.queueChangeHook != nil {
					f.queueChangeHook(hash, true)
				}
				break
			}
			// Otherwise if fresh and still unknown, try and import
			if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
				f.forgetBlock(hash)
				continue
			}
			if f.light {
				f.importHeaders(op.origin, op.header)
			} else {
				f.importBlocks(op.origin, op.block)
			}
		}
		// Wait for an outside event to occur
		select {
		case <-f.quit:
			// BlockFetcher terminating, abort all operations
			return

		case notification := <-f.notify:
			// A block was announced, make sure the peer isn't DOSing us
			blockAnnounceInMeter.Mark(1)

			count := f.announces[notification.origin] + 1
			if count > hashLimit {
				log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
				blockAnnounceDOSMeter.Mark(1)
				break
			}
			// If we have a valid block number, check that it's potentially useful
			if notification.number > 0 {
				if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
					log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
					blockAnnounceDropMeter.Mark(1)
					break
				}
			}
			// All is well, schedule the announce if block's not yet downloading
			if _, ok := f.fetching[notification.hash]; ok {
				break
			}
			if _, ok := f.completing[notification.hash]; ok {
				break
			}
			f.announces[notification.origin] = count
			f.announced[notification.hash] = append(f.announced[notification.hash], notification)
			if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
				f.announceChangeHook(notification.hash, true)
			}
			if len(f.announced) == 1 {
				f.rescheduleFetch(fetchTimer)
			}

		case op := <-f.inject:
			// A direct block insertion was requested, try and fill any pending gaps
			blockBroadcastInMeter.Mark(1)

			// Now only direct block injection is allowed, drop the header injection
			// here silently if we receive one.
			if f.light {
				continue
			}
			f.enqueue(op.origin, nil, op.block)

		case hash := <-f.done:
			// A pending import finished, remove all traces of the notification
			f.forgetHash(hash)
			f.forgetBlock(hash)

		case <-fetchTimer.C:
			// At least one block's timer ran out, check for needing retrieval
			request := make(map[string][]common.Hash)

			for hash, announces := range f.announced {
				// In the current LES protocol (les2/les3), only header announces are
				// available, so there is no need to wait long for a header broadcast.
				timeout := arriveTimeout - gatherSlack
				if f.light {
					timeout = 0
				}
				if time.Since(announces[0].time) > timeout {
					// Pick a random peer to retrieve from, reset all others
					announce := announces[rand.Intn(len(announces))]
					f.forgetHash(hash)

					// If the block still didn't arrive, queue for fetching
					if (f.light && f.getHeader(hash) == nil) || (!f.light && f.getBlock(hash) == nil) {
						request[announce.origin] = append(request[announce.origin], hash)
						f.fetching[hash] = announce
					}
				}
			}
			// Send out all block header requests
			for peer, hashes := range request {
				log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)

				// Create a closure of the fetch and schedule it on a new thread
				fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
				go func() {
					if f.fetchingHook != nil {
						f.fetchingHook(hashes)
					}
					for _, hash := range hashes {
						headerFetchMeter.Mark(1)
						fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
					}
				}()
			}
			// Schedule the next fetch if blocks are still pending
			f.rescheduleFetch(fetchTimer)

		case <-completeTimer.C:
			// At least one header's timer ran out, retrieve everything
			request := make(map[string][]common.Hash)

			for hash, announces := range f.fetched {
				// Pick a random peer to retrieve from, reset all others
				announce := announces[rand.Intn(len(announces))]
				f.forgetHash(hash)

				// If the block still didn't arrive, queue for completion
				if f.getBlock(hash) == nil {
					request[announce.origin] = append(request[announce.origin], hash)
					f.completing[hash] = announce
				}
			}
			// Send out all block body requests
			for peer, hashes := range request {
				log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)

				// Create a closure of the fetch and schedule it on a new thread
				if f.completingHook != nil {
					f.completingHook(hashes)
				}
				bodyFetchMeter.Mark(int64(len(hashes)))
				go f.completing[hashes[0]].fetchBodies(hashes)
			}
			// Schedule the next fetch if blocks are still pending
			f.rescheduleComplete(completeTimer)

		case filter := <-f.headerFilter:
			// Headers arrived from a remote peer. Extract those that were explicitly
			// requested by the fetcher, and return everything else so it's delivered
			// to other parts of the system.
			var task *headerFilterTask
			select {
			case task = <-filter:
			case <-f.quit:
				return
			}
			headerFilterInMeter.Mark(int64(len(task.headers)))

			// Split the batch of headers into unknown ones (to return to the caller),
			// known incomplete ones (requiring body retrievals) and completed blocks.
			unknown, incomplete, complete, lightHeaders := []*types.Header{}, []*blockAnnounce{}, []*types.Block{}, []*blockAnnounce{}
			for _, header := range task.headers {
				hash := header.Hash()

				// Filter fetcher-requested headers from other synchronisation algorithms
				if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
					// If the delivered header does not match the promised number, drop the announcer
					if header.Number.Uint64() != announce.number {
						log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
						f.dropPeer(announce.origin)
						f.forgetHash(hash)
						continue
					}
					// Collect all headers only if we are running in light
					// mode and the headers are not imported by other means.
					if f.light {
						if f.getHeader(hash) == nil {
							announce.header = header
							lightHeaders = append(lightHeaders, announce)
						}
						f.forgetHash(hash)
						continue
					}
					// Only keep if not imported by other means
					if f.getBlock(hash) == nil {
						announce.header = header
						announce.time = task.time

						// If the block is empty (header only), short circuit into the final import queue
						if header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash {
							log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())

							block := types.NewBlockWithHeader(header)
							block.ReceivedAt = task.time

							complete = append(complete, block)
							f.completing[hash] = announce
							continue
						}
						// Otherwise add to the list of blocks needing completion
						incomplete = append(incomplete, announce)
					} else {
						log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
						f.forgetHash(hash)
					}
				} else {
					// BlockFetcher doesn't know about it, add to the return list
					unknown = append(unknown, header)
				}
			}
			headerFilterOutMeter.Mark(int64(len(unknown)))
			select {
			case filter <- &headerFilterTask{headers: unknown, time: task.time}:
			case <-f.quit:
				return
			}
			// Schedule the retrieved headers for body completion
			for _, announce := range incomplete {
				hash := announce.header.Hash()
				if _, ok := f.completing[hash]; ok {
					continue
				}
				f.fetched[hash] = append(f.fetched[hash], announce)
				if len(f.fetched) == 1 {
					f.rescheduleComplete(completeTimer)
				}
			}
			// Schedule the headers for light fetcher import
			for _, announce := range lightHeaders {
				f.enqueue(announce.origin, announce.header, nil)
			}
			// Schedule the header-only blocks for import
			for _, block := range complete {
				if announce := f.completing[block.Hash()]; announce != nil {
					f.enqueue(announce.origin, nil, block)
				}
			}

		case filter := <-f.bodyFilter:
			// Block bodies arrived, extract any explicitly requested blocks, return the rest
			var task *bodyFilterTask
			select {
			case task = <-filter:
			case <-f.quit:
				return
			}
			bodyFilterInMeter.Mark(int64(len(task.transactions)))
			blocks := []*types.Block{}

			// abort early if there's nothing explicitly requested
			if len(f.completing) > 0 {
				for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
					// Match up a body to any possible completion request
					var (
						matched   = false
						uncleHash common.Hash // calculated lazily and reused
						txnHash   common.Hash // calculated lazily and reused
					)
					for hash, announce := range f.completing {
						if f.queued[hash] != nil || announce.origin != task.peer {
							continue
						}
						if uncleHash == (common.Hash{}) {
							uncleHash = types.CalcUncleHash(task.uncles[i])
						}
						if uncleHash != announce.header.UncleHash {
							continue
						}
						if txnHash == (common.Hash{}) {
							txnHash = types.DeriveSha(types.Transactions(task.transactions[i]), trie.NewStackTrie(nil))
						}
						if txnHash != announce.header.TxHash {
							continue
						}
						// Mark the body matched, reassemble if still unknown
						matched = true
						if f.getBlock(hash) == nil {
							block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
							block.ReceivedAt = task.time
							blocks = append(blocks, block)
						} else {
							f.forgetHash(hash)
						}
					}
					if matched {
						task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
						task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
						i--
						continue
					}
				}
			}
			bodyFilterOutMeter.Mark(int64(len(task.transactions)))
			select {
			case filter <- task:
			case <-f.quit:
				return
			}
			// Schedule the retrieved blocks for ordered import
			for _, block := range blocks {
				if announce := f.completing[block.Hash()]; announce != nil {
					f.enqueue(announce.origin, nil, block)
				}
			}
		}
	}
}

// rescheduleFetch resets the specified fetch timer to the next blockAnnounce timeout.
func (f *BlockFetcher) rescheduleFetch(fetch *time.Timer) {
	// Short circuit if no blocks are announced
	if len(f.announced) == 0 {
		return
	}
	// Schedule announcement retrieval quickly for light mode
	// since the server won't send any headers to the client.
	if f.light {
		fetch.Reset(lightTimeout)
		return
	}
	// Otherwise find the earliest expiring announcement
	earliest := time.Now()
	for _, announces := range f.announced {
		if earliest.After(announces[0].time) {
			earliest = announces[0].time
		}
	}
	fetch.Reset(arriveTimeout - time.Since(earliest))
}

// rescheduleComplete resets the specified completion timer to the next fetch timeout.
func (f *BlockFetcher) rescheduleComplete(complete *time.Timer) {
	// Short circuit if no headers are fetched
	if len(f.fetched) == 0 {
		return
	}
	// Otherwise find the earliest expiring announcement
	earliest := time.Now()
	for _, announces := range f.fetched {
		if earliest.After(announces[0].time) {
			earliest = announces[0].time
		}
	}
	complete.Reset(gatherSlack - time.Since(earliest))
}

// enqueue schedules a new header or block import operation, if the component
// to be imported has not yet been seen.
func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.Block) {
	var (
		hash   common.Hash
		number uint64
	)
	if header != nil {
		hash, number = header.Hash(), header.Number.Uint64()
	} else {
		hash, number = block.Hash(), block.NumberU64()
	}
	// Ensure the peer isn't DOSing us
	count := f.queues[peer] + 1
	if count > blockLimit {
		log.Debug("Discarded delivered header or block, exceeded allowance", "peer", peer, "number", number, "hash", hash, "limit", blockLimit)
		blockBroadcastDOSMeter.Mark(1)
		f.forgetHash(hash)
		return
	}
	// Discard any past or too distant blocks
	if dist := int64(number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
		log.Debug("Discarded delivered header or block, too far away", "peer", peer, "number", number, "hash", hash, "distance", dist)
		blockBroadcastDropMeter.Mark(1)
		f.forgetHash(hash)
		return
	}
	// Schedule the block for future importing
	if _, ok := f.queued[hash]; !ok {
		op := &blockOrHeaderInject{origin: peer}
		if header != nil {
			op.header = header
		} else {
			op.block = block
		}
		f.queues[peer] = count
		f.queued[hash] = op
		f.queue.Push(op, -int64(number))
		if f.queueChangeHook != nil {
			f.queueChangeHook(hash, true)
		}
		log.Debug("Queued delivered header or block", "peer", peer, "number", number, "hash", hash, "queued", f.queue.Size())
	}
}

// importHeaders spawns a new goroutine to run a header insertion into the chain.
// If the header's number is at the same height as the current import phase, it
// updates the phase states accordingly.
func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
	hash := header.Hash()
	log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)

	go func() {
		defer func() { f.done <- hash }()
		// If the parent's unknown, abort insertion
		parent := f.getHeader(header.ParentHash)
		if parent == nil {
			log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
			return
		}
		// Validate the header and if something went wrong, drop the peer
		if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock {
			log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
			f.dropPeer(peer)
			return
		}
		// Run the actual import and log any issues
		if _, err := f.insertHeaders([]*types.Header{header}); err != nil {
			log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
			return
		}
		// Invoke the testing hook if needed
		if f.importedHook != nil {
			f.importedHook(header, nil)
		}
	}()
}

// importBlocks spawns a new goroutine to run a block insertion into the chain. If the
// block's number is at the same height as the current import phase, it updates
// the phase states accordingly.
func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
	hash := block.Hash()

	// Run the import on a new thread
	log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
	go func() {
		defer func() { f.done <- hash }()

		// If the parent's unknown, abort insertion
		parent := f.getBlock(block.ParentHash())
		if parent == nil {
			log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
			return
		}
		// Quickly validate the header and propagate the block if it passes
		switch err := f.verifyHeader(block.Header()); err {
		case nil:
			// All ok, quickly propagate to our peers
			blockBroadcastOutTimer.UpdateSince(block.ReceivedAt)
			go f.broadcastBlock(block, true)

		case consensus.ErrFutureBlock:
			// Weird future block, don't fail, but neither propagate

		default:
			// Something went very wrong, drop the peer
			log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
			f.dropPeer(peer)
			return
		}
		// Run the actual import and log any issues
		if _, err := f.insertChain(types.Blocks{block}); err != nil {
			log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
			return
		}
		// If import succeeded, broadcast the block
		blockAnnounceOutTimer.UpdateSince(block.ReceivedAt)
		go f.broadcastBlock(block, false)

		// Invoke the testing hook if needed
		if f.importedHook != nil {
			f.importedHook(nil, block)
		}
	}()
}

// forgetHash removes all traces of a block announcement from the fetcher's
// internal state.
func (f *BlockFetcher) forgetHash(hash common.Hash) {
	// Remove all pending announces and decrement DOS counters
	if announceMap, ok := f.announced[hash]; ok {
		for _, announce := range announceMap {
			f.announces[announce.origin]--
			if f.announces[announce.origin] <= 0 {
				delete(f.announces, announce.origin)
			}
		}
		delete(f.announced, hash)
		if f.announceChangeHook != nil {
			f.announceChangeHook(hash, false)
		}
	}
	// Remove any pending fetches and decrement the DOS counters
	if announce := f.fetching[hash]; announce != nil {
		f.announces[announce.origin]--
		if f.announces[announce.origin] <= 0 {
			delete(f.announces, announce.origin)
		}
		delete(f.fetching, hash)
	}
	// Remove any pending completion requests and decrement the DOS counters
	for _, announce := range f.fetched[hash] {
		f.announces[announce.origin]--
		if f.announces[announce.origin] <= 0 {
			delete(f.announces, announce.origin)
		}
	}
	delete(f.fetched, hash)

	// Remove any pending completions and decrement the DOS counters
	if announce := f.completing[hash]; announce != nil {
		f.announces[announce.origin]--
		if f.announces[announce.origin] <= 0 {
			delete(f.announces, announce.origin)
		}
		delete(f.completing, hash)
	}
}

// forgetBlock removes all traces of a queued block from the fetcher's internal
// state.
func (f *BlockFetcher) forgetBlock(hash common.Hash) {
	if insert := f.queued[hash]; insert != nil {
		f.queues[insert.origin]--
		if f.queues[insert.origin] == 0 {
			delete(f.queues, insert.origin)
		}
		delete(f.queued, hash)
	}
}