block_fetcher.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package fetcher contains the announcement based synchronisation of headers, blocks or transactions.
  17. package fetcher
  18. import (
  19. "errors"
  20. "math/rand"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/common/prque"
  24. "github.com/ethereum/go-ethereum/consensus"
  25. "github.com/ethereum/go-ethereum/core/types"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/metrics"
  28. "github.com/ethereum/go-ethereum/trie"
  29. )
  // Timing parameters that govern when announced items are requested and how
  // long an explicit request may stay outstanding.
  const (
  	// lightTimeout is the time allowance before an announced header is
  	// explicitly requested.
  	lightTimeout = time.Millisecond

  	// arriveTimeout is the time allowance before an announced
  	// block/transaction is explicitly requested.
  	arriveTimeout = 500 * time.Millisecond

  	// gatherSlack is the interval used to collate almost-expired announces
  	// with fetches.
  	gatherSlack = 100 * time.Millisecond

  	// fetchTimeout is the maximum allotted time to return an explicitly
  	// requested block/transaction.
  	fetchTimeout = 5 * time.Second
  )
  // Distance and per-peer limits applied to announcements and deliveries.
  const (
  	// maxUncleDist is the maximum allowed backward distance from the chain head.
  	maxUncleDist = 7

  	// maxQueueDist is the maximum allowed distance from the chain head to queue.
  	maxQueueDist = 32

  	// hashLimit is the maximum number of unique blocks or headers a peer may
  	// have announced.
  	hashLimit = 256

  	// blockLimit is the maximum number of unique blocks a peer may have
  	// delivered.
  	blockLimit = 64
  )
  42. var (
  43. blockAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/in", nil)
  44. blockAnnounceOutTimer = metrics.NewRegisteredTimer("eth/fetcher/block/announces/out", nil)
  45. blockAnnounceDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/drop", nil)
  46. blockAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/dos", nil)
  47. blockBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/in", nil)
  48. blockBroadcastOutTimer = metrics.NewRegisteredTimer("eth/fetcher/block/broadcasts/out", nil)
  49. blockBroadcastDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/drop", nil)
  50. blockBroadcastDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/dos", nil)
  51. headerFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/block/headers", nil)
  52. bodyFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/block/bodies", nil)
  53. headerFilterInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/in", nil)
  54. headerFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/out", nil)
  55. bodyFilterInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/in", nil)
  56. bodyFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/out", nil)
  57. )
  // errTerminated is the error handed back to callers whose request could not
  // be delivered because the fetcher's quit channel was closed.
  var errTerminated = errors.New("terminated")
  59. // HeaderRetrievalFn is a callback type for retrieving a header from the local chain.
  60. type HeaderRetrievalFn func(common.Hash) *types.Header
  61. // blockRetrievalFn is a callback type for retrieving a block from the local chain.
  62. type blockRetrievalFn func(common.Hash) *types.Block
  63. // headerRequesterFn is a callback type for sending a header retrieval request.
  64. type headerRequesterFn func(common.Hash) error
  65. // bodyRequesterFn is a callback type for sending a body retrieval request.
  66. type bodyRequesterFn func([]common.Hash) error
  67. // headerVerifierFn is a callback type to verify a block's header for fast propagation.
  68. type headerVerifierFn func(header *types.Header) error
  69. // blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
  70. type blockBroadcasterFn func(block *types.Block, propagate bool)
  // chainHeightFn is the callback signature used to query the current height
  // of the chain.
  type chainHeightFn func() uint64
  73. // headersInsertFn is a callback type to insert a batch of headers into the local chain.
  74. type headersInsertFn func(headers []*types.Header) (int, error)
  75. // chainInsertFn is a callback type to insert a batch of blocks into the local chain.
  76. type chainInsertFn func(types.Blocks) (int, error)
  // peerDropFn is the callback signature used to drop a peer, identified by
  // id, that was detected as malicious.
  type peerDropFn func(id string)
  79. // blockAnnounce is the hash notification of the availability of a new block in the
  80. // network.
  81. type blockAnnounce struct {
  82. hash common.Hash // Hash of the block being announced
  83. number uint64 // Number of the block being announced (0 = unknown | old protocol)
  84. header *types.Header // Header of the block partially reassembled (new protocol)
  85. time time.Time // Timestamp of the announcement
  86. origin string // Identifier of the peer originating the notification
  87. fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block
  88. fetchBodies bodyRequesterFn // Fetcher function to retrieve the body of an announced block
  89. }
  90. // headerFilterTask represents a batch of headers needing fetcher filtering.
  91. type headerFilterTask struct {
  92. peer string // The source peer of block headers
  93. headers []*types.Header // Collection of headers to filter
  94. time time.Time // Arrival time of the headers
  95. }
  96. // bodyFilterTask represents a batch of block bodies (transactions and uncles)
  97. // needing fetcher filtering.
  98. type bodyFilterTask struct {
  99. peer string // The source peer of block bodies
  100. transactions [][]*types.Transaction // Collection of transactions per block bodies
  101. uncles [][]*types.Header // Collection of uncles per block bodies
  102. time time.Time // Arrival time of the blocks' contents
  103. }
  104. // blockOrHeaderInject represents a schedules import operation.
  105. type blockOrHeaderInject struct {
  106. origin string
  107. header *types.Header // Used for light mode fetcher which only cares about header.
  108. block *types.Block // Used for normal mode fetcher which imports full block.
  109. }
  110. // number returns the block number of the injected object.
  111. func (inject *blockOrHeaderInject) number() uint64 {
  112. if inject.header != nil {
  113. return inject.header.Number.Uint64()
  114. }
  115. return inject.block.NumberU64()
  116. }
  117. // number returns the block hash of the injected object.
  118. func (inject *blockOrHeaderInject) hash() common.Hash {
  119. if inject.header != nil {
  120. return inject.header.Hash()
  121. }
  122. return inject.block.Hash()
  123. }
  124. // BlockFetcher is responsible for accumulating block announcements from various peers
  125. // and scheduling them for retrieval.
  126. type BlockFetcher struct {
  127. light bool // The indicator whether it's a light fetcher or normal one.
  128. // Various event channels
  129. notify chan *blockAnnounce
  130. inject chan *blockOrHeaderInject
  131. headerFilter chan chan *headerFilterTask
  132. bodyFilter chan chan *bodyFilterTask
  133. done chan common.Hash
  134. quit chan struct{}
  135. // Announce states
  136. announces map[string]int // Per peer blockAnnounce counts to prevent memory exhaustion
  137. announced map[common.Hash][]*blockAnnounce // Announced blocks, scheduled for fetching
  138. fetching map[common.Hash]*blockAnnounce // Announced blocks, currently fetching
  139. fetched map[common.Hash][]*blockAnnounce // Blocks with headers fetched, scheduled for body retrieval
  140. completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing
  141. // Block cache
  142. queue *prque.Prque // Queue containing the import operations (block number sorted)
  143. queues map[string]int // Per peer block counts to prevent memory exhaustion
  144. queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)
  145. // Callbacks
  146. getHeader HeaderRetrievalFn // Retrieves a header from the local chain
  147. getBlock blockRetrievalFn // Retrieves a block from the local chain
  148. verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
  149. broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
  150. chainHeight chainHeightFn // Retrieves the current chain's height
  151. insertHeaders headersInsertFn // Injects a batch of headers into the chain
  152. insertChain chainInsertFn // Injects a batch of blocks into the chain
  153. dropPeer peerDropFn // Drops a peer for misbehaving
  154. // Testing hooks
  155. announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list
  156. queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
  157. fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
  158. completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
  159. importedHook func(*types.Header, *types.Block) // Method to call upon successful header or block import (both eth/61 and eth/62)
  160. }
  161. // NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
  162. func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
  163. return &BlockFetcher{
  164. light: light,
  165. notify: make(chan *blockAnnounce),
  166. inject: make(chan *blockOrHeaderInject),
  167. headerFilter: make(chan chan *headerFilterTask),
  168. bodyFilter: make(chan chan *bodyFilterTask),
  169. done: make(chan common.Hash),
  170. quit: make(chan struct{}),
  171. announces: make(map[string]int),
  172. announced: make(map[common.Hash][]*blockAnnounce),
  173. fetching: make(map[common.Hash]*blockAnnounce),
  174. fetched: make(map[common.Hash][]*blockAnnounce),
  175. completing: make(map[common.Hash]*blockAnnounce),
  176. queue: prque.New(nil),
  177. queues: make(map[string]int),
  178. queued: make(map[common.Hash]*blockOrHeaderInject),
  179. getHeader: getHeader,
  180. getBlock: getBlock,
  181. verifyHeader: verifyHeader,
  182. broadcastBlock: broadcastBlock,
  183. chainHeight: chainHeight,
  184. insertHeaders: insertHeaders,
  185. insertChain: insertChain,
  186. dropPeer: dropPeer,
  187. }
  188. }
  189. // Start boots up the announcement based synchroniser, accepting and processing
  190. // hash notifications and block fetches until termination requested.
  191. func (f *BlockFetcher) Start() {
  192. go f.loop()
  193. }
  194. // Stop terminates the announcement based synchroniser, canceling all pending
  195. // operations.
  196. func (f *BlockFetcher) Stop() {
  197. close(f.quit)
  198. }
  199. // Notify announces the fetcher of the potential availability of a new block in
  200. // the network.
  201. func (f *BlockFetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
  202. headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
  203. block := &blockAnnounce{
  204. hash: hash,
  205. number: number,
  206. time: time,
  207. origin: peer,
  208. fetchHeader: headerFetcher,
  209. fetchBodies: bodyFetcher,
  210. }
  211. select {
  212. case f.notify <- block:
  213. return nil
  214. case <-f.quit:
  215. return errTerminated
  216. }
  217. }
  218. // Enqueue tries to fill gaps the fetcher's future import queue.
  219. func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error {
  220. op := &blockOrHeaderInject{
  221. origin: peer,
  222. block: block,
  223. }
  224. select {
  225. case f.inject <- op:
  226. return nil
  227. case <-f.quit:
  228. return errTerminated
  229. }
  230. }
  231. // FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
  232. // returning those that should be handled differently.
  233. func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
  234. log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
  235. // Send the filter channel to the fetcher
  236. filter := make(chan *headerFilterTask)
  237. select {
  238. case f.headerFilter <- filter:
  239. case <-f.quit:
  240. return nil
  241. }
  242. // Request the filtering of the header list
  243. select {
  244. case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
  245. case <-f.quit:
  246. return nil
  247. }
  248. // Retrieve the headers remaining after filtering
  249. select {
  250. case task := <-filter:
  251. return task.headers
  252. case <-f.quit:
  253. return nil
  254. }
  255. }
  256. // FilterBodies extracts all the block bodies that were explicitly requested by
  257. // the fetcher, returning those that should be handled differently.
  258. func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
  259. log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))
  260. // Send the filter channel to the fetcher
  261. filter := make(chan *bodyFilterTask)
  262. select {
  263. case f.bodyFilter <- filter:
  264. case <-f.quit:
  265. return nil, nil
  266. }
  267. // Request the filtering of the body list
  268. select {
  269. case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
  270. case <-f.quit:
  271. return nil, nil
  272. }
  273. // Retrieve the bodies remaining after filtering
  274. select {
  275. case task := <-filter:
  276. return task.transactions, task.uncles
  277. case <-f.quit:
  278. return nil, nil
  279. }
  280. }
  281. // loop is the main fetcher loop, checking and processing various notification
  282. // events.
  //
  // Every iteration first drops fetches older than fetchTimeout and imports the
  // queued items that fit on top of the current chain height, then blocks on
  // exactly one event: quit, a new announcement, a direct injection, a finished
  // import, one of the two timers firing, or a header/body filter request.
  283. func (f *BlockFetcher) loop() {
  284. // Iterate the block fetching until a quit is requested
  285. fetchTimer := time.NewTimer(0)
  286. completeTimer := time.NewTimer(0)
  287. defer fetchTimer.Stop()
  288. defer completeTimer.Stop()
  289. for {
  290. // Clean up any expired block fetches
  291. for hash, announce := range f.fetching {
  292. if time.Since(announce.time) > fetchTimeout {
  293. f.forgetHash(hash)
  294. }
  295. }
  296. // Import any queued blocks that could potentially fit
  297. height := f.chainHeight()
  298. for !f.queue.Empty() {
  299. op := f.queue.PopItem().(*blockOrHeaderInject)
  300. hash := op.hash()
  301. if f.queueChangeHook != nil {
  302. f.queueChangeHook(hash, false)
  303. }
  304. // If too high up the chain or phase, continue later
  305. number := op.number()
  306. if number > height+1 {
  // The item is pushed back under the same priority that enqueue uses
  // (the negated block number) and the import pass ends for this iteration.
  307. f.queue.Push(op, -int64(number))
  308. if f.queueChangeHook != nil {
  309. f.queueChangeHook(hash, true)
  310. }
  311. break
  312. }
  313. // Otherwise if fresh and still unknown, try and import
  314. if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
  315. f.forgetBlock(hash)
  316. continue
  317. }
  318. if f.light {
  319. f.importHeaders(op.origin, op.header)
  320. } else {
  321. f.importBlocks(op.origin, op.block)
  322. }
  323. }
  324. // Wait for an outside event to occur
  325. select {
  326. case <-f.quit:
  327. // BlockFetcher terminating, abort all operations
  328. return
  329. case notification := <-f.notify:
  330. // A block was announced, make sure the peer isn't DOSing us
  331. blockAnnounceInMeter.Mark(1)
  332. count := f.announces[notification.origin] + 1
  333. if count > hashLimit {
  334. log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
  335. blockAnnounceDOSMeter.Mark(1)
  // Each break in this case leaves the select only; the outer for loop
  // then starts its next iteration.
  336. break
  337. }
  338. // If we have a valid block number, check that it's potentially useful
  339. if notification.number > 0 {
  340. if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
  341. log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
  342. blockAnnounceDropMeter.Mark(1)
  343. break
  344. }
  345. }
  346. // All is well, schedule the announce if block's not yet downloading
  347. if _, ok := f.fetching[notification.hash]; ok {
  348. break
  349. }
  350. if _, ok := f.completing[notification.hash]; ok {
  351. break
  352. }
  353. f.announces[notification.origin] = count
  354. f.announced[notification.hash] = append(f.announced[notification.hash], notification)
  355. if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
  356. f.announceChangeHook(notification.hash, true)
  357. }
  // The fetch timer is only re-armed when this is the sole announced hash.
  358. if len(f.announced) == 1 {
  359. f.rescheduleFetch(fetchTimer)
  360. }
  361. case op := <-f.inject:
  362. // A direct block insertion was requested, try and fill any pending gaps
  363. blockBroadcastInMeter.Mark(1)
  364. // Now only direct block injection is allowed, drop the header injection
  365. // here silently if we receive.
  366. if f.light {
  367. continue
  368. }
  369. f.enqueue(op.origin, nil, op.block)
  370. case hash := <-f.done:
  371. // A pending import finished, remove all traces of the notification
  372. f.forgetHash(hash)
  373. f.forgetBlock(hash)
  374. case <-fetchTimer.C:
  375. // At least one block's timer ran out, check for needing retrieval
  376. request := make(map[string][]common.Hash)
  377. for hash, announces := range f.announced {
  378. // In current LES protocol(les2/les3), only header announce is
  379. // available, no need to wait too much time for header broadcast.
  380. timeout := arriveTimeout - gatherSlack
  381. if f.light {
  382. timeout = 0
  383. }
  384. if time.Since(announces[0].time) > timeout {
  385. // Pick a random peer to retrieve from, reset all others
  386. announce := announces[rand.Intn(len(announces))]
  387. f.forgetHash(hash)
  388. // If the block still didn't arrive, queue for fetching
  389. if (f.light && f.getHeader(hash) == nil) || (!f.light && f.getBlock(hash) == nil) {
  390. request[announce.origin] = append(request[announce.origin], hash)
  391. f.fetching[hash] = announce
  392. }
  393. }
  394. }
  395. // Send out all block header requests
  396. for peer, hashes := range request {
  397. log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
  398. // Create a closure of the fetch and schedule in on a new thread
  // fetchHeader and hashes are re-declared here so the goroutine below
  // captures this iteration's values. The error returned by fetchHeader
  // is discarded.
  399. fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
  400. go func() {
  401. if f.fetchingHook != nil {
  402. f.fetchingHook(hashes)
  403. }
  404. for _, hash := range hashes {
  405. headerFetchMeter.Mark(1)
  406. fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
  407. }
  408. }()
  409. }
  410. // Schedule the next fetch if blocks are still pending
  411. f.rescheduleFetch(fetchTimer)
  412. case <-completeTimer.C:
  413. // At least one header's timer ran out, retrieve everything
  414. request := make(map[string][]common.Hash)
  415. for hash, announces := range f.fetched {
  416. // Pick a random peer to retrieve from, reset all others
  417. announce := announces[rand.Intn(len(announces))]
  418. f.forgetHash(hash)
  419. // If the block still didn't arrive, queue for completion
  420. if f.getBlock(hash) == nil {
  421. request[announce.origin] = append(request[announce.origin], hash)
  422. f.completing[hash] = announce
  423. }
  424. }
  425. // Send out all block body requests
  426. for peer, hashes := range request {
  427. log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
  428. // Create a closure of the fetch and schedule in on a new thread
  429. if f.completingHook != nil {
  430. f.completingHook(hashes)
  431. }
  432. bodyFetchMeter.Mark(int64(len(hashes)))
  // The function value and its argument are evaluated here, in the loop
  // goroutine; only the call itself runs on the new goroutine. The
  // returned error is discarded.
  433. go f.completing[hashes[0]].fetchBodies(hashes)
  434. }
  435. // Schedule the next fetch if blocks are still pending
  436. f.rescheduleComplete(completeTimer)
  437. case filter := <-f.headerFilter:
  438. // Headers arrived from a remote peer. Extract those that were explicitly
  439. // requested by the fetcher, and return everything else so it's delivered
  440. // to other parts of the system.
  441. var task *headerFilterTask
  442. select {
  443. case task = <-filter:
  444. case <-f.quit:
  445. return
  446. }
  447. headerFilterInMeter.Mark(int64(len(task.headers)))
  448. // Split the batch of headers into unknown ones (to return to the caller),
  449. // known incomplete ones (requiring body retrievals) and completed blocks.
  450. unknown, incomplete, complete, lightHeaders := []*types.Header{}, []*blockAnnounce{}, []*types.Block{}, []*blockAnnounce{}
  451. for _, header := range task.headers {
  452. hash := header.Hash()
  453. // Filter fetcher-requested headers from other synchronisation algorithms
  454. if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
  455. // If the delivered header does not match the promised number, drop the announcer
  456. if header.Number.Uint64() != announce.number {
  457. log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
  458. f.dropPeer(announce.origin)
  459. f.forgetHash(hash)
  460. continue
  461. }
  462. // Collect all headers only if we are running in light
  463. // mode and the headers are not imported by other means.
  464. if f.light {
  465. if f.getHeader(hash) == nil {
  466. announce.header = header
  467. lightHeaders = append(lightHeaders, announce)
  468. }
  469. f.forgetHash(hash)
  470. continue
  471. }
  472. // Only keep if not imported by other means
  473. if f.getBlock(hash) == nil {
  474. announce.header = header
  // The announce time is overwritten with the header's arrival time;
  // rescheduleComplete later reads it to arm the completion timer.
  475. announce.time = task.time
  476. // If the block is empty (header only), short circuit into the final import queue
  477. if header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash {
  478. log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
  479. block := types.NewBlockWithHeader(header)
  480. block.ReceivedAt = task.time
  481. complete = append(complete, block)
  482. f.completing[hash] = announce
  483. continue
  484. }
  485. // Otherwise add to the list of blocks needing completion
  486. incomplete = append(incomplete, announce)
  487. } else {
  488. log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
  489. f.forgetHash(hash)
  490. }
  491. } else {
  492. // BlockFetcher doesn't know about it, add to the return list
  493. unknown = append(unknown, header)
  494. }
  495. }
  496. headerFilterOutMeter.Mark(int64(len(unknown)))
  // The unclaimed headers are handed back to the caller before any
  // scheduling work below is done.
  497. select {
  498. case filter <- &headerFilterTask{headers: unknown, time: task.time}:
  499. case <-f.quit:
  500. return
  501. }
  502. // Schedule the retrieved headers for body completion
  503. for _, announce := range incomplete {
  504. hash := announce.header.Hash()
  505. if _, ok := f.completing[hash]; ok {
  506. continue
  507. }
  508. f.fetched[hash] = append(f.fetched[hash], announce)
  509. if len(f.fetched) == 1 {
  510. f.rescheduleComplete(completeTimer)
  511. }
  512. }
  513. // Schedule the header for light fetcher import
  514. for _, announce := range lightHeaders {
  515. f.enqueue(announce.origin, announce.header, nil)
  516. }
  517. // Schedule the header-only blocks for import
  518. for _, block := range complete {
  519. if announce := f.completing[block.Hash()]; announce != nil {
  520. f.enqueue(announce.origin, nil, block)
  521. }
  522. }
  523. case filter := <-f.bodyFilter:
  524. // Block bodies arrived, extract any explicitly requested blocks, return the rest
  525. var task *bodyFilterTask
  526. select {
  527. case task = <-filter:
  528. case <-f.quit:
  529. return
  530. }
  531. bodyFilterInMeter.Mark(int64(len(task.transactions)))
  532. blocks := []*types.Block{}
  533. // abort early if there's nothing explicitly requested
  534. if len(f.completing) > 0 {
  535. for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
  536. // Match up a body to any possible completion request
  537. var (
  538. matched = false
  539. uncleHash common.Hash // calculated lazily and reused
  540. txnHash common.Hash // calculated lazily and reused
  541. )
  // There is no break after a match, so one delivered body is compared
  // against every pending completion of this peer.
  542. for hash, announce := range f.completing {
  543. if f.queued[hash] != nil || announce.origin != task.peer {
  544. continue
  545. }
  546. if uncleHash == (common.Hash{}) {
  547. uncleHash = types.CalcUncleHash(task.uncles[i])
  548. }
  549. if uncleHash != announce.header.UncleHash {
  550. continue
  551. }
  552. if txnHash == (common.Hash{}) {
  553. txnHash = types.DeriveSha(types.Transactions(task.transactions[i]), trie.NewStackTrie(nil))
  554. }
  555. if txnHash != announce.header.TxHash {
  556. continue
  557. }
  558. // Mark the body matched, reassemble if still unknown
  559. matched = true
  560. if f.getBlock(hash) == nil {
  561. block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
  562. block.ReceivedAt = task.time
  563. blocks = append(blocks, block)
  564. } else {
  565. f.forgetHash(hash)
  566. }
  567. }
  568. if matched {
  // Entry i is removed from both slices in place, so i is decremented
  // to make the next pass look at the element that moved into slot i.
  569. task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
  570. task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
  571. i--
  572. continue
  573. }
  574. }
  575. }
  576. bodyFilterOutMeter.Mark(int64(len(task.transactions)))
  577. select {
  578. case filter <- task:
  579. case <-f.quit:
  580. return
  581. }
  582. // Schedule the retrieved blocks for ordered import
  583. for _, block := range blocks {
  584. if announce := f.completing[block.Hash()]; announce != nil {
  585. f.enqueue(announce.origin, nil, block)
  586. }
  587. }
  588. }
  589. }
  590. }
  591. // rescheduleFetch resets the specified fetch timer to the next blockAnnounce timeout.
  592. func (f *BlockFetcher) rescheduleFetch(fetch *time.Timer) {
  593. // Short circuit if no blocks are announced
  594. if len(f.announced) == 0 {
  595. return
  596. }
  597. // Schedule announcement retrieval quickly for light mode
  598. // since server won't send any headers to client.
  599. if f.light {
  600. fetch.Reset(lightTimeout)
  601. return
  602. }
  603. // Otherwise find the earliest expiring announcement
  604. earliest := time.Now()
  605. for _, announces := range f.announced {
  606. if earliest.After(announces[0].time) {
  607. earliest = announces[0].time
  608. }
  609. }
  610. fetch.Reset(arriveTimeout - time.Since(earliest))
  611. }
  612. // rescheduleComplete resets the specified completion timer to the next fetch timeout.
  613. func (f *BlockFetcher) rescheduleComplete(complete *time.Timer) {
  614. // Short circuit if no headers are fetched
  615. if len(f.fetched) == 0 {
  616. return
  617. }
  618. // Otherwise find the earliest expiring announcement
  619. earliest := time.Now()
  620. for _, announces := range f.fetched {
  621. if earliest.After(announces[0].time) {
  622. earliest = announces[0].time
  623. }
  624. }
  625. complete.Reset(gatherSlack - time.Since(earliest))
  626. }
  627. // enqueue schedules a new header or block import operation, if the component
  628. // to be imported has not yet been seen.
  629. func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.Block) {
  630. var (
  631. hash common.Hash
  632. number uint64
  633. )
  634. if header != nil {
  635. hash, number = header.Hash(), header.Number.Uint64()
  636. } else {
  637. hash, number = block.Hash(), block.NumberU64()
  638. }
  639. // Ensure the peer isn't DOSing us
  640. count := f.queues[peer] + 1
  641. if count > blockLimit {
  642. log.Debug("Discarded delivered header or block, exceeded allowance", "peer", peer, "number", number, "hash", hash, "limit", blockLimit)
  643. blockBroadcastDOSMeter.Mark(1)
  644. f.forgetHash(hash)
  645. return
  646. }
  647. // Discard any past or too distant blocks
  648. if dist := int64(number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
  649. log.Debug("Discarded delivered header or block, too far away", "peer", peer, "number", number, "hash", hash, "distance", dist)
  650. blockBroadcastDropMeter.Mark(1)
  651. f.forgetHash(hash)
  652. return
  653. }
  654. // Schedule the block for future importing
  655. if _, ok := f.queued[hash]; !ok {
  656. op := &blockOrHeaderInject{origin: peer}
  657. if header != nil {
  658. op.header = header
  659. } else {
  660. op.block = block
  661. }
  662. f.queues[peer] = count
  663. f.queued[hash] = op
  664. f.queue.Push(op, -int64(number))
  665. if f.queueChangeHook != nil {
  666. f.queueChangeHook(hash, true)
  667. }
  668. log.Debug("Queued delivered header or block", "peer", peer, "number", number, "hash", hash, "queued", f.queue.Size())
  669. }
  670. }
  671. // importHeaders spawns a new goroutine to run a header insertion into the chain.
  672. // If the header's number is at the same height as the current import phase, it
  673. // updates the phase states accordingly.
  674. func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
  675. hash := header.Hash()
  676. log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)
  677. go func() {
  678. defer func() { f.done <- hash }()
  679. // If the parent's unknown, abort insertion
  680. parent := f.getHeader(header.ParentHash)
  681. if parent == nil {
  682. log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
  683. return
  684. }
  685. // Validate the header and if something went wrong, drop the peer
  686. if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock {
  687. log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
  688. f.dropPeer(peer)
  689. return
  690. }
  691. // Run the actual import and log any issues
  692. if _, err := f.insertHeaders([]*types.Header{header}); err != nil {
  693. log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
  694. return
  695. }
  696. // Invoke the testing hook if needed
  697. if f.importedHook != nil {
  698. f.importedHook(header, nil)
  699. }
  700. }()
  701. }
  702. // importBlocks spawns a new goroutine to run a block insertion into the chain. If the
  703. // block's number is at the same height as the current import phase, it updates
  704. // the phase states accordingly.
  705. func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
  706. hash := block.Hash()
  707. // Run the import on a new thread
  708. log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
  709. go func() {
  710. defer func() { f.done <- hash }()
  711. // If the parent's unknown, abort insertion
  712. parent := f.getBlock(block.ParentHash())
  713. if parent == nil {
  714. log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
  715. return
  716. }
  717. // Quickly validate the header and propagate the block if it passes
  718. switch err := f.verifyHeader(block.Header()); err {
  719. case nil:
  720. // All ok, quickly propagate to our peers
  721. blockBroadcastOutTimer.UpdateSince(block.ReceivedAt)
  722. go f.broadcastBlock(block, true)
  723. case consensus.ErrFutureBlock:
  724. // Weird future block, don't fail, but neither propagate
  725. default:
  726. // Something went very wrong, drop the peer
  727. log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
  728. f.dropPeer(peer)
  729. return
  730. }
  731. // Run the actual import and log any issues
  732. if _, err := f.insertChain(types.Blocks{block}); err != nil {
  733. log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
  734. return
  735. }
  736. // If import succeeded, broadcast the block
  737. blockAnnounceOutTimer.UpdateSince(block.ReceivedAt)
  738. go f.broadcastBlock(block, false)
  739. // Invoke the testing hook if needed
  740. if f.importedHook != nil {
  741. f.importedHook(nil, block)
  742. }
  743. }()
  744. }
  745. // forgetHash removes all traces of a block announcement from the fetcher's
  746. // internal state.
  747. func (f *BlockFetcher) forgetHash(hash common.Hash) {
  748. // Remove all pending announces and decrement DOS counters
  749. for _, announce := range f.announced[hash] {
  750. f.announces[announce.origin]--
  751. if f.announces[announce.origin] <= 0 {
  752. delete(f.announces, announce.origin)
  753. }
  754. }
  755. delete(f.announced, hash)
  756. if f.announceChangeHook != nil {
  757. f.announceChangeHook(hash, false)
  758. }
  759. // Remove any pending fetches and decrement the DOS counters
  760. if announce := f.fetching[hash]; announce != nil {
  761. f.announces[announce.origin]--
  762. if f.announces[announce.origin] <= 0 {
  763. delete(f.announces, announce.origin)
  764. }
  765. delete(f.fetching, hash)
  766. }
  767. // Remove any pending completion requests and decrement the DOS counters
  768. for _, announce := range f.fetched[hash] {
  769. f.announces[announce.origin]--
  770. if f.announces[announce.origin] <= 0 {
  771. delete(f.announces, announce.origin)
  772. }
  773. }
  774. delete(f.fetched, hash)
  775. // Remove any pending completions and decrement the DOS counters
  776. if announce := f.completing[hash]; announce != nil {
  777. f.announces[announce.origin]--
  778. if f.announces[announce.origin] <= 0 {
  779. delete(f.announces, announce.origin)
  780. }
  781. delete(f.completing, hash)
  782. }
  783. }
  784. // forgetBlock removes all traces of a queued block from the fetcher's internal
  785. // state.
  786. func (f *BlockFetcher) forgetBlock(hash common.Hash) {
  787. if insert := f.queued[hash]; insert != nil {
  788. f.queues[insert.origin]--
  789. if f.queues[insert.origin] == 0 {
  790. delete(f.queues, insert.origin)
  791. }
  792. delete(f.queued, hash)
  793. }
  794. }