block_fetcher.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package fetcher contains the announcement-based synchronisation of headers, blocks or transactions.
  17. package fetcher
  18. import (
  19. "errors"
  20. "math/rand"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/common/prque"
  24. "github.com/ethereum/go-ethereum/consensus"
  25. "github.com/ethereum/go-ethereum/core/types"
  26. "github.com/ethereum/go-ethereum/eth/protocols/eth"
  27. "github.com/ethereum/go-ethereum/log"
  28. "github.com/ethereum/go-ethereum/metrics"
  29. "github.com/ethereum/go-ethereum/trie"
  30. )
  // Timing parameters (time.Duration values) governing when announced items are
  // explicitly requested and how long an explicit request may take.
  31. const (
  32. lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested (light mode fetch timer, see rescheduleFetch)
  33. arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
  34. gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
  35. fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction
  36. )
  // Distance and per-peer limits applied to announcements and queued imports.
  37. const (
  38. maxUncleDist = 7 // Maximum allowed backward distance from the chain head
  39. maxQueueDist = 32 // Maximum allowed distance from the chain head to queue
  40. hashLimit = 256 // Maximum number of unique blocks or headers a peer may have announced
  41. blockLimit = 64 // Maximum number of unique blocks a peer may have delivered
  42. )
  // Meters and timers registered under the "eth/fetcher/block/" metric namespace.
  // They are grouped by event source: announcements, broadcasts, outgoing
  // header/body fetches, and the header/body filters.
  43. var (
  44. blockAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/in", nil)
  45. blockAnnounceOutTimer = metrics.NewRegisteredTimer("eth/fetcher/block/announces/out", nil)
  46. blockAnnounceDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/drop", nil)
  47. blockAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/block/announces/dos", nil)
  48. blockBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/in", nil)
  49. blockBroadcastOutTimer = metrics.NewRegisteredTimer("eth/fetcher/block/broadcasts/out", nil)
  50. blockBroadcastDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/drop", nil)
  51. blockBroadcastDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/dos", nil)
  52. headerFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/block/headers", nil)
  53. bodyFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/block/bodies", nil)
  54. headerFilterInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/in", nil)
  55. headerFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/headers/out", nil)
  56. bodyFilterInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/in", nil)
  57. bodyFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/out", nil)
  58. )
  // errTerminated is returned by Notify and Enqueue when the fetcher's quit
  // channel becomes readable before the event could be handed to the loop.
  59. var errTerminated = errors.New("terminated")
  60. // HeaderRetrievalFn is a callback type for retrieving a header from the local chain.
  // The fetcher treats a nil result as "header not known locally".
  61. type HeaderRetrievalFn func(common.Hash) *types.Header
  62. // blockRetrievalFn is a callback type for retrieving a block from the local chain.
  // The fetcher treats a nil result as "block not known locally".
  63. type blockRetrievalFn func(common.Hash) *types.Block
  64. // headerRequesterFn is a callback type for sending a header retrieval request.
  // The fetcher reads the response from the supplied channel and closes the
  // returned request once it is done with it.
  65. type headerRequesterFn func(common.Hash, chan *eth.Response) (*eth.Request, error)
  66. // bodyRequesterFn is a callback type for sending a body retrieval request.
  // The fetcher reads the response from the supplied channel and closes the
  // returned request once it is done with it.
  67. type bodyRequesterFn func([]common.Hash, chan *eth.Response) (*eth.Request, error)
  68. // headerVerifierFn is a callback type to verify a block's header for fast propagation.
  69. type headerVerifierFn func(header *types.Header) error
  70. // blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
  71. type blockBroadcasterFn func(block *types.Block, propagate bool)
  72. // chainHeightFn is a callback type to retrieve the current chain height.
  73. type chainHeightFn func() uint64
  74. // headersInsertFn is a callback type to insert a batch of headers into the local chain.
  75. type headersInsertFn func(headers []*types.Header) (int, error)
  76. // chainInsertFn is a callback type to insert a batch of blocks into the local chain.
  77. type chainInsertFn func(types.Blocks) (int, error)
  78. // peerDropFn is a callback type for dropping a peer detected as malicious.
  79. type peerDropFn func(id string)
  80. // blockAnnounce is the hash notification of the availability of a new block in the
  81. // network. One value is created per Notify call and then moves through the
  // fetcher's announced, fetching, fetched and completing maps.
  82. type blockAnnounce struct {
  83. hash common.Hash // Hash of the block being announced
  84. number uint64 // Number of the block being announced (0 = unknown | old protocol); the loop ignores announcements with number 0
  85. header *types.Header // Header of the block partially reassembled (new protocol); filled in once the header is delivered
  86. time time.Time // Timestamp of the announcement; overwritten with the header arrival time in full mode
  87. origin string // Identifier of the peer originating the notification
  88. fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block
  89. fetchBodies bodyRequesterFn // Fetcher function to retrieve the body of an announced block
  90. }
  91. // headerFilterTask represents a batch of headers needing fetcher filtering.
  // FilterHeaders sends one task to the loop with the full batch, and the loop
  // sends one back holding only the headers the fetcher did not claim.
  92. type headerFilterTask struct {
  93. peer string // The source peer of block headers
  94. headers []*types.Header // Collection of headers to filter
  95. time time.Time // Arrival time of the headers
  96. }
  97. // bodyFilterTask represents a batch of block bodies (transactions and uncles)
  98. // needing fetcher filtering. The transactions and uncles slices are indexed
  // in parallel: entry i of each belongs to the same block body. The loop
  // removes claimed bodies from the task and sends the same task back.
  99. type bodyFilterTask struct {
  100. peer string // The source peer of block bodies
  101. transactions [][]*types.Transaction // Collection of transactions per block bodies
  102. uncles [][]*types.Header // Collection of uncles per block bodies
  103. time time.Time // Arrival time of the blocks' contents
  104. }
  105. // blockOrHeaderInject represents a scheduled import operation. Its number and
  // hash methods read the header when it is set and the block otherwise.
  106. type blockOrHeaderInject struct {
  107. origin string // Identifier of the peer that delivered the header or block
  108. header *types.Header // Used for light mode fetcher which only cares about header.
  109. block *types.Block // Used for normal mode fetcher which imports full block.
  110. }
  111. // number returns the block number of the injected object.
  112. func (inject *blockOrHeaderInject) number() uint64 {
  113. if inject.header != nil {
  114. return inject.header.Number.Uint64()
  115. }
  116. return inject.block.NumberU64()
  117. }
  118. // number returns the block hash of the injected object.
  119. func (inject *blockOrHeaderInject) hash() common.Hash {
  120. if inject.header != nil {
  121. return inject.header.Hash()
  122. }
  123. return inject.block.Hash()
  124. }
  125. // BlockFetcher is responsible for accumulating block announcements from various peers
  126. // and scheduling them for retrieval.
  127. type BlockFetcher struct {
  128. light bool // The indicator whether it's a light fetcher or normal one.
  129. // Various event channels
  130. notify chan *blockAnnounce // Announcements submitted through Notify
  131. inject chan *blockOrHeaderInject // Direct block injections submitted through Enqueue
  132. headerFilter chan chan *headerFilterTask // Per-call exchange channels handed over by FilterHeaders
  133. bodyFilter chan chan *bodyFilterTask // Per-call exchange channels handed over by FilterBodies
  134. done chan common.Hash // Hashes whose pending import finished
  135. quit chan struct{} // Closed by Stop to terminate the loop and unblock callers
  136. // Announce states
  137. announces map[string]int // Per peer blockAnnounce counts to prevent memory exhaustion
  138. announced map[common.Hash][]*blockAnnounce // Announced blocks, scheduled for fetching
  139. fetching map[common.Hash]*blockAnnounce // Announced blocks, currently fetching
  140. fetched map[common.Hash][]*blockAnnounce // Blocks with headers fetched, scheduled for body retrieval
  141. completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing
  142. // Block cache
  143. queue *prque.Prque // Queue containing the import operations (block number sorted)
  144. queues map[string]int // Per peer block counts to prevent memory exhaustion
  145. queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)
  146. // Callbacks
  147. getHeader HeaderRetrievalFn // Retrieves a header from the local chain
  148. getBlock blockRetrievalFn // Retrieves a block from the local chain
  149. verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
  150. broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
  151. chainHeight chainHeightFn // Retrieves the current chain's height
  152. insertHeaders headersInsertFn // Injects a batch of headers into the chain
  153. insertChain chainInsertFn // Injects a batch of blocks into the chain
  154. dropPeer peerDropFn // Drops a peer for misbehaving
  155. // Testing hooks (each is only invoked when non-nil)
  156. announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list
  157. queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
  158. fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
  159. completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
  160. importedHook func(*types.Header, *types.Block) // Method to call upon successful header or block import (both eth/61 and eth/62)
  161. }
  162. // NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
  163. func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
  164. return &BlockFetcher{
  165. light: light,
  166. notify: make(chan *blockAnnounce),
  167. inject: make(chan *blockOrHeaderInject),
  168. headerFilter: make(chan chan *headerFilterTask),
  169. bodyFilter: make(chan chan *bodyFilterTask),
  170. done: make(chan common.Hash),
  171. quit: make(chan struct{}),
  172. announces: make(map[string]int),
  173. announced: make(map[common.Hash][]*blockAnnounce),
  174. fetching: make(map[common.Hash]*blockAnnounce),
  175. fetched: make(map[common.Hash][]*blockAnnounce),
  176. completing: make(map[common.Hash]*blockAnnounce),
  177. queue: prque.New(nil),
  178. queues: make(map[string]int),
  179. queued: make(map[common.Hash]*blockOrHeaderInject),
  180. getHeader: getHeader,
  181. getBlock: getBlock,
  182. verifyHeader: verifyHeader,
  183. broadcastBlock: broadcastBlock,
  184. chainHeight: chainHeight,
  185. insertHeaders: insertHeaders,
  186. insertChain: insertChain,
  187. dropPeer: dropPeer,
  188. }
  189. }
  190. // Start boots up the announcement based synchroniser, accepting and processing
  191. // hash notifications and block fetches until termination requested.
  192. func (f *BlockFetcher) Start() {
  193. go f.loop()
  194. }
  195. // Stop terminates the announcement based synchroniser, canceling all pending
  196. // operations.
  197. func (f *BlockFetcher) Stop() {
  198. close(f.quit)
  199. }
  200. // Notify announces the fetcher of the potential availability of a new block in
  201. // the network.
  202. func (f *BlockFetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
  203. headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
  204. block := &blockAnnounce{
  205. hash: hash,
  206. number: number,
  207. time: time,
  208. origin: peer,
  209. fetchHeader: headerFetcher,
  210. fetchBodies: bodyFetcher,
  211. }
  212. select {
  213. case f.notify <- block:
  214. return nil
  215. case <-f.quit:
  216. return errTerminated
  217. }
  218. }
  219. // Enqueue tries to fill gaps the fetcher's future import queue.
  220. func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error {
  221. op := &blockOrHeaderInject{
  222. origin: peer,
  223. block: block,
  224. }
  225. select {
  226. case f.inject <- op:
  227. return nil
  228. case <-f.quit:
  229. return errTerminated
  230. }
  231. }
  232. // FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
  233. // returning those that should be handled differently.
  234. func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
  235. log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
  236. // Send the filter channel to the fetcher
  237. filter := make(chan *headerFilterTask)
  238. select {
  239. case f.headerFilter <- filter:
  240. case <-f.quit:
  241. return nil
  242. }
  243. // Request the filtering of the header list
  244. select {
  245. case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
  246. case <-f.quit:
  247. return nil
  248. }
  249. // Retrieve the headers remaining after filtering
  250. select {
  251. case task := <-filter:
  252. return task.headers
  253. case <-f.quit:
  254. return nil
  255. }
  256. }
  257. // FilterBodies extracts all the block bodies that were explicitly requested by
  258. // the fetcher, returning those that should be handled differently.
  259. func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
  260. log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))
  261. // Send the filter channel to the fetcher
  262. filter := make(chan *bodyFilterTask)
  263. select {
  264. case f.bodyFilter <- filter:
  265. case <-f.quit:
  266. return nil, nil
  267. }
  268. // Request the filtering of the body list
  269. select {
  270. case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
  271. case <-f.quit:
  272. return nil, nil
  273. }
  274. // Retrieve the bodies remaining after filtering
  275. select {
  276. case task := <-filter:
  277. return task.transactions, task.uncles
  278. case <-f.quit:
  279. return nil, nil
  280. }
  281. }
  282. // loop is the main fetcher loop, checking and processing various notification
  283. // events. Start launches it on its own goroutine and it returns once f.quit
  // becomes readable. Every iteration first expires stale fetches and imports
  // whatever queued operations fit the current chain height, then blocks on a
  // single event: an announcement, an injection, a finished import, one of the
  // two scheduling timers, or a header/body filter request.
  284. func (f *BlockFetcher) loop() {
  285. // Iterate the block fetching until a quit is requested
  286. var (
  287. fetchTimer = time.NewTimer(0)
  288. completeTimer = time.NewTimer(0)
  289. )
  290. <-fetchTimer.C // clear out the channel
  291. <-completeTimer.C
  292. defer fetchTimer.Stop()
  293. defer completeTimer.Stop()
  294. for {
  295. // Clean up any expired block fetches
  296. for hash, announce := range f.fetching {
  297. if time.Since(announce.time) > fetchTimeout {
  298. f.forgetHash(hash)
  299. }
  300. }
  301. // Import any queued blocks that could potentially fit
  302. height := f.chainHeight()
  303. for !f.queue.Empty() {
  304. op := f.queue.PopItem().(*blockOrHeaderInject)
  305. hash := op.hash()
  306. if f.queueChangeHook != nil {
  307. f.queueChangeHook(hash, false)
  308. }
  309. // If too high up the chain or phase, continue later
  310. number := op.number()
  311. if number > height+1 {
  // Push the operation back (priority is the negated block number) and
  // stop draining the queue for this iteration
  312. f.queue.Push(op, -int64(number))
  313. if f.queueChangeHook != nil {
  314. f.queueChangeHook(hash, true)
  315. }
  316. break
  317. }
  318. // Otherwise if fresh and still unknown, try and import
  // (operations that are too far behind the head or already known locally
  // are forgotten instead)
  319. if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
  320. f.forgetBlock(hash)
  321. continue
  322. }
  323. if f.light {
  324. f.importHeaders(op.origin, op.header)
  325. } else {
  326. f.importBlocks(op.origin, op.block)
  327. }
  328. }
  329. // Wait for an outside event to occur
  330. select {
  331. case <-f.quit:
  332. // BlockFetcher terminating, abort all operations
  333. return
  334. case notification := <-f.notify:
  335. // A block was announced, make sure the peer isn't DOSing us
  336. blockAnnounceInMeter.Mark(1)
  337. count := f.announces[notification.origin] + 1
  338. if count > hashLimit {
  339. log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
  340. blockAnnounceDOSMeter.Mark(1)
  341. break // leaves the select only; the outer loop keeps running
  342. }
  // Announcements that carry no block number are ignored
  343. if notification.number == 0 {
  344. break
  345. }
  346. // If we have a valid block number, check that it's potentially useful
  347. if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
  348. log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
  349. blockAnnounceDropMeter.Mark(1)
  350. break
  351. }
  352. // All is well, schedule the announce if block's not yet downloading
  353. if _, ok := f.fetching[notification.hash]; ok {
  354. break
  355. }
  356. if _, ok := f.completing[notification.hash]; ok {
  357. break
  358. }
  359. f.announces[notification.origin] = count
  360. f.announced[notification.hash] = append(f.announced[notification.hash], notification)
  361. if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
  362. f.announceChangeHook(notification.hash, true)
  363. }
  // Arm the fetch timer when this is the only announced hash
  364. if len(f.announced) == 1 {
  365. f.rescheduleFetch(fetchTimer)
  366. }
  367. case op := <-f.inject:
  368. // A direct block insertion was requested, try and fill any pending gaps
  369. blockBroadcastInMeter.Mark(1)
  370. // Now only direct block injection is allowed, drop the header injection
  371. // here silently if we receive.
  372. if f.light {
  373. continue // restarts the outer for loop
  374. }
  375. f.enqueue(op.origin, nil, op.block)
  376. case hash := <-f.done:
  377. // A pending import finished, remove all traces of the notification
  378. f.forgetHash(hash)
  379. f.forgetBlock(hash)
  380. case <-fetchTimer.C:
  381. // At least one block's timer ran out, check for needing retrieval
  382. request := make(map[string][]common.Hash)
  383. for hash, announces := range f.announced {
  384. // In current LES protocol(les2/les3), only header announce is
  385. // available, no need to wait too much time for header broadcast.
  386. timeout := arriveTimeout - gatherSlack
  387. if f.light {
  388. timeout = 0
  389. }
  390. if time.Since(announces[0].time) > timeout {
  391. // Pick a random peer to retrieve from, reset all others
  392. announce := announces[rand.Intn(len(announces))]
  393. f.forgetHash(hash)
  394. // If the block still didn't arrive, queue for fetching
  395. if (f.light && f.getHeader(hash) == nil) || (!f.light && f.getBlock(hash) == nil) {
  396. request[announce.origin] = append(request[announce.origin], hash)
  397. f.fetching[hash] = announce
  398. }
  399. }
  400. }
  401. // Send out all block header requests
  402. for peer, hashes := range request {
  403. log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
  404. // Create a closure of the fetch and schedule it on a new goroutine
  // (the requester is taken from the announce selected for the first hash;
  // every hash in this batch was assigned to the same peer above)
  405. fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
  406. go func(peer string) {
  407. if f.fetchingHook != nil {
  408. f.fetchingHook(hashes)
  409. }
  // One request goroutine per hash
  410. for _, hash := range hashes {
  411. headerFetchMeter.Mark(1)
  412. go func(hash common.Hash) {
  413. resCh := make(chan *eth.Response)
  414. req, err := fetchHeader(hash, resCh)
  415. if err != nil {
  416. return // Request could not be sent; give up silently (legacy behaviour)
  417. }
  418. defer req.Close()
  419. timeout := time.NewTimer(2 * fetchTimeout) // 2x leeway before dropping the peer
  420. defer timeout.Stop()
  421. select {
  422. case res := <-resCh:
  // Acknowledge the response, then route the headers through the filter
  423. res.Done <- nil
  424. f.FilterHeaders(peer, *res.Res.(*eth.BlockHeadersPacket), time.Now().Add(res.Time))
  425. case <-timeout.C:
  426. // The peer didn't respond in time. The request
  427. // was already rescheduled at this point, we were
  428. // waiting for a catchup. With an unresponsive
  429. // peer however, it's a protocol violation.
  430. f.dropPeer(peer)
  431. }
  432. }(hash)
  433. }
  434. }(peer)
  435. }
  436. // Schedule the next fetch if blocks are still pending
  437. f.rescheduleFetch(fetchTimer)
  438. case <-completeTimer.C:
  439. // At least one header's timer ran out, retrieve everything
  440. request := make(map[string][]common.Hash)
  441. for hash, announces := range f.fetched {
  442. // Pick a random peer to retrieve from, reset all others
  443. announce := announces[rand.Intn(len(announces))]
  444. f.forgetHash(hash)
  445. // If the block still didn't arrive, queue for completion
  446. if f.getBlock(hash) == nil {
  447. request[announce.origin] = append(request[announce.origin], hash)
  448. f.completing[hash] = announce
  449. }
  450. }
  451. // Send out all block body requests
  452. for peer, hashes := range request {
  453. log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
  454. // Create a closure of the fetch and schedule it on a new goroutine
  455. if f.completingHook != nil {
  456. f.completingHook(hashes)
  457. }
  458. fetchBodies := f.completing[hashes[0]].fetchBodies
  459. bodyFetchMeter.Mark(int64(len(hashes)))
  // One request goroutine per peer, covering all of that peer's hashes
  460. go func(peer string, hashes []common.Hash) {
  461. resCh := make(chan *eth.Response)
  462. req, err := fetchBodies(hashes, resCh)
  463. if err != nil {
  464. return // Request could not be sent; give up silently (legacy behaviour)
  465. }
  466. defer req.Close()
  467. timeout := time.NewTimer(2 * fetchTimeout) // 2x leeway before dropping the peer
  468. defer timeout.Stop()
  469. select {
  470. case res := <-resCh:
  // Acknowledge the response, then route the bodies through the filter
  471. res.Done <- nil
  472. txs, uncles := res.Res.(*eth.BlockBodiesPacket).Unpack()
  473. f.FilterBodies(peer, txs, uncles, time.Now())
  474. case <-timeout.C:
  475. // The peer didn't respond in time. The request
  476. // was already rescheduled at this point, we were
  477. // waiting for a catchup. With an unresponsive
  478. // peer however, it's a protocol violation.
  479. f.dropPeer(peer)
  480. }
  481. }(peer, hashes)
  482. }
  483. // Schedule the next fetch if blocks are still pending
  484. f.rescheduleComplete(completeTimer)
  485. case filter := <-f.headerFilter:
  486. // Headers arrived from a remote peer. Extract those that were explicitly
  487. // requested by the fetcher, and return everything else so it's delivered
  488. // to other parts of the system.
  489. var task *headerFilterTask
  490. select {
  491. case task = <-filter:
  492. case <-f.quit:
  493. return
  494. }
  495. headerFilterInMeter.Mark(int64(len(task.headers)))
  496. // Split the batch of headers into unknown ones (to return to the caller),
  497. // known incomplete ones (requiring body retrievals) and completed blocks.
  // In light mode the claimed headers are collected in lightHeaders instead.
  498. unknown, incomplete, complete, lightHeaders := []*types.Header{}, []*blockAnnounce{}, []*types.Block{}, []*blockAnnounce{}
  499. for _, header := range task.headers {
  500. hash := header.Hash()
  501. // Filter fetcher-requested headers from other synchronisation algorithms
  // (the header is claimed only if it is being fetched from this very peer
  // and is not already fetched, completing or queued)
  502. if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
  503. // If the delivered header does not match the promised number, drop the announcer
  504. if header.Number.Uint64() != announce.number {
  505. log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
  506. f.dropPeer(announce.origin)
  507. f.forgetHash(hash)
  508. continue
  509. }
  510. // Collect all headers only if we are running in light
  511. // mode and the headers are not imported by other means.
  512. if f.light {
  513. if f.getHeader(hash) == nil {
  514. announce.header = header
  515. lightHeaders = append(lightHeaders, announce)
  516. }
  517. f.forgetHash(hash)
  518. continue
  519. }
  520. // Only keep if not imported by other means
  521. if f.getBlock(hash) == nil {
  522. announce.header = header
  523. announce.time = task.time
  524. // If the block is empty (header only), short circuit into the final import queue
  525. if header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash {
  526. log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
  527. block := types.NewBlockWithHeader(header)
  528. block.ReceivedAt = task.time
  529. complete = append(complete, block)
  530. f.completing[hash] = announce
  531. continue
  532. }
  533. // Otherwise add to the list of blocks needing completion
  534. incomplete = append(incomplete, announce)
  535. } else {
  536. log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
  537. f.forgetHash(hash)
  538. }
  539. } else {
  540. // BlockFetcher doesn't know about it, add to the return list
  541. unknown = append(unknown, header)
  542. }
  543. }
  544. headerFilterOutMeter.Mark(int64(len(unknown)))
  // Hand the unclaimed headers back to the FilterHeaders caller
  545. select {
  546. case filter <- &headerFilterTask{headers: unknown, time: task.time}:
  547. case <-f.quit:
  548. return
  549. }
  550. // Schedule the retrieved headers for body completion
  551. for _, announce := range incomplete {
  552. hash := announce.header.Hash()
  553. if _, ok := f.completing[hash]; ok {
  554. continue
  555. }
  556. f.fetched[hash] = append(f.fetched[hash], announce)
  // Arm the completion timer when this is the only fetched hash
  557. if len(f.fetched) == 1 {
  558. f.rescheduleComplete(completeTimer)
  559. }
  560. }
  561. // Schedule the header for light fetcher import
  562. for _, announce := range lightHeaders {
  563. f.enqueue(announce.origin, announce.header, nil)
  564. }
  565. // Schedule the header-only blocks for import
  566. for _, block := range complete {
  567. if announce := f.completing[block.Hash()]; announce != nil {
  568. f.enqueue(announce.origin, nil, block)
  569. }
  570. }
  571. case filter := <-f.bodyFilter:
  572. // Block bodies arrived, extract any explicitly requested blocks, return the rest
  573. var task *bodyFilterTask
  574. select {
  575. case task = <-filter:
  576. case <-f.quit:
  577. return
  578. }
  579. bodyFilterInMeter.Mark(int64(len(task.transactions)))
  580. blocks := []*types.Block{}
  581. // abort early if there's nothing explicitly requested
  582. if len(f.completing) > 0 {
  583. for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
  584. // Match up a body to any possible completion request
  585. var (
  586. matched = false
  587. uncleHash common.Hash // calculated lazily and reused
  588. txnHash common.Hash // calculated lazily and reused
  589. )
  590. for hash, announce := range f.completing {
  // Skip requests already queued for import or issued to another peer
  591. if f.queued[hash] != nil || announce.origin != task.peer {
  592. continue
  593. }
  594. if uncleHash == (common.Hash{}) {
  595. uncleHash = types.CalcUncleHash(task.uncles[i])
  596. }
  597. if uncleHash != announce.header.UncleHash {
  598. continue
  599. }
  600. if txnHash == (common.Hash{}) {
  601. txnHash = types.DeriveSha(types.Transactions(task.transactions[i]), trie.NewStackTrie(nil))
  602. }
  603. if txnHash != announce.header.TxHash {
  604. continue
  605. }
  606. // Mark the body matched, reassemble if still unknown
  607. matched = true
  608. if f.getBlock(hash) == nil {
  609. block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
  610. block.ReceivedAt = task.time
  611. blocks = append(blocks, block)
  612. } else {
  613. f.forgetHash(hash)
  614. }
  615. }
  // Remove a claimed body from the task so that only unclaimed bodies are
  // returned, and step the index back to revisit the shifted slot
  616. if matched {
  617. task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
  618. task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
  619. i--
  620. continue
  621. }
  622. }
  623. }
  624. bodyFilterOutMeter.Mark(int64(len(task.transactions)))
  // Hand the unclaimed bodies back to the FilterBodies caller
  625. select {
  626. case filter <- task:
  627. case <-f.quit:
  628. return
  629. }
  630. // Schedule the retrieved blocks for ordered import
  631. for _, block := range blocks {
  632. if announce := f.completing[block.Hash()]; announce != nil {
  633. f.enqueue(announce.origin, nil, block)
  634. }
  635. }
  636. }
  637. }
  638. }
  639. // rescheduleFetch resets the specified fetch timer to the next blockAnnounce timeout.
  640. func (f *BlockFetcher) rescheduleFetch(fetch *time.Timer) {
  641. // Short circuit if no blocks are announced
  642. if len(f.announced) == 0 {
  643. return
  644. }
  645. // Schedule announcement retrieval quickly for light mode
  646. // since server won't send any headers to client.
  647. if f.light {
  648. fetch.Reset(lightTimeout)
  649. return
  650. }
  651. // Otherwise find the earliest expiring announcement
  652. earliest := time.Now()
  653. for _, announces := range f.announced {
  654. if earliest.After(announces[0].time) {
  655. earliest = announces[0].time
  656. }
  657. }
  658. fetch.Reset(arriveTimeout - time.Since(earliest))
  659. }
  660. // rescheduleComplete resets the specified completion timer to the next fetch timeout.
  661. func (f *BlockFetcher) rescheduleComplete(complete *time.Timer) {
  662. // Short circuit if no headers are fetched
  663. if len(f.fetched) == 0 {
  664. return
  665. }
  666. // Otherwise find the earliest expiring announcement
  667. earliest := time.Now()
  668. for _, announces := range f.fetched {
  669. if earliest.After(announces[0].time) {
  670. earliest = announces[0].time
  671. }
  672. }
  673. complete.Reset(gatherSlack - time.Since(earliest))
  674. }
  675. // enqueue schedules a new header or block import operation, if the component
  676. // to be imported has not yet been seen.
  677. func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.Block) {
  678. var (
  679. hash common.Hash
  680. number uint64
  681. )
  682. if header != nil {
  683. hash, number = header.Hash(), header.Number.Uint64()
  684. } else {
  685. hash, number = block.Hash(), block.NumberU64()
  686. }
  687. // Ensure the peer isn't DOSing us
  688. count := f.queues[peer] + 1
  689. if count > blockLimit {
  690. log.Debug("Discarded delivered header or block, exceeded allowance", "peer", peer, "number", number, "hash", hash, "limit", blockLimit)
  691. blockBroadcastDOSMeter.Mark(1)
  692. f.forgetHash(hash)
  693. return
  694. }
  695. // Discard any past or too distant blocks
  696. if dist := int64(number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
  697. log.Debug("Discarded delivered header or block, too far away", "peer", peer, "number", number, "hash", hash, "distance", dist)
  698. blockBroadcastDropMeter.Mark(1)
  699. f.forgetHash(hash)
  700. return
  701. }
  702. // Schedule the block for future importing
  703. if _, ok := f.queued[hash]; !ok {
  704. op := &blockOrHeaderInject{origin: peer}
  705. if header != nil {
  706. op.header = header
  707. } else {
  708. op.block = block
  709. }
  710. f.queues[peer] = count
  711. f.queued[hash] = op
  712. f.queue.Push(op, -int64(number))
  713. if f.queueChangeHook != nil {
  714. f.queueChangeHook(hash, true)
  715. }
  716. log.Debug("Queued delivered header or block", "peer", peer, "number", number, "hash", hash, "queued", f.queue.Size())
  717. }
  718. }
  719. // importHeaders spawns a new goroutine to run a header insertion into the chain.
  720. // If the header's number is at the same height as the current import phase, it
  721. // updates the phase states accordingly.
  722. func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
  723. hash := header.Hash()
  724. log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)
  725. go func() {
  726. defer func() { f.done <- hash }()
  727. // If the parent's unknown, abort insertion
  728. parent := f.getHeader(header.ParentHash)
  729. if parent == nil {
  730. log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
  731. return
  732. }
  733. // Validate the header and if something went wrong, drop the peer
  734. if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock {
  735. log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
  736. f.dropPeer(peer)
  737. return
  738. }
  739. // Run the actual import and log any issues
  740. if _, err := f.insertHeaders([]*types.Header{header}); err != nil {
  741. log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
  742. return
  743. }
  744. // Invoke the testing hook if needed
  745. if f.importedHook != nil {
  746. f.importedHook(header, nil)
  747. }
  748. }()
  749. }
  750. // importBlocks spawns a new goroutine to run a block insertion into the chain. If the
  751. // block's number is at the same height as the current import phase, it updates
  752. // the phase states accordingly.
  753. func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
  754. hash := block.Hash()
  755. // Run the import on a new thread
  756. log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
  757. go func() {
  758. defer func() { f.done <- hash }()
  759. // If the parent's unknown, abort insertion
  760. parent := f.getBlock(block.ParentHash())
  761. if parent == nil {
  762. log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
  763. return
  764. }
  765. // Quickly validate the header and propagate the block if it passes
  766. switch err := f.verifyHeader(block.Header()); err {
  767. case nil:
  768. // All ok, quickly propagate to our peers
  769. blockBroadcastOutTimer.UpdateSince(block.ReceivedAt)
  770. go f.broadcastBlock(block, true)
  771. case consensus.ErrFutureBlock:
  772. // Weird future block, don't fail, but neither propagate
  773. default:
  774. // Something went very wrong, drop the peer
  775. log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
  776. f.dropPeer(peer)
  777. return
  778. }
  779. // Run the actual import and log any issues
  780. if _, err := f.insertChain(types.Blocks{block}); err != nil {
  781. log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
  782. return
  783. }
  784. // If import succeeded, broadcast the block
  785. blockAnnounceOutTimer.UpdateSince(block.ReceivedAt)
  786. go f.broadcastBlock(block, false)
  787. // Invoke the testing hook if needed
  788. if f.importedHook != nil {
  789. f.importedHook(nil, block)
  790. }
  791. }()
  792. }
  793. // forgetHash removes all traces of a block announcement from the fetcher's
  794. // internal state.
  795. func (f *BlockFetcher) forgetHash(hash common.Hash) {
  796. // Remove all pending announces and decrement DOS counters
  797. if announceMap, ok := f.announced[hash]; ok {
  798. for _, announce := range announceMap {
  799. f.announces[announce.origin]--
  800. if f.announces[announce.origin] <= 0 {
  801. delete(f.announces, announce.origin)
  802. }
  803. }
  804. delete(f.announced, hash)
  805. if f.announceChangeHook != nil {
  806. f.announceChangeHook(hash, false)
  807. }
  808. }
  809. // Remove any pending fetches and decrement the DOS counters
  810. if announce := f.fetching[hash]; announce != nil {
  811. f.announces[announce.origin]--
  812. if f.announces[announce.origin] <= 0 {
  813. delete(f.announces, announce.origin)
  814. }
  815. delete(f.fetching, hash)
  816. }
  817. // Remove any pending completion requests and decrement the DOS counters
  818. for _, announce := range f.fetched[hash] {
  819. f.announces[announce.origin]--
  820. if f.announces[announce.origin] <= 0 {
  821. delete(f.announces, announce.origin)
  822. }
  823. }
  824. delete(f.fetched, hash)
  825. // Remove any pending completions and decrement the DOS counters
  826. if announce := f.completing[hash]; announce != nil {
  827. f.announces[announce.origin]--
  828. if f.announces[announce.origin] <= 0 {
  829. delete(f.announces, announce.origin)
  830. }
  831. delete(f.completing, hash)
  832. }
  833. }
  834. // forgetBlock removes all traces of a queued block from the fetcher's internal
  835. // state.
  836. func (f *BlockFetcher) forgetBlock(hash common.Hash) {
  837. if insert := f.queued[hash]; insert != nil {
  838. f.queues[insert.origin]--
  839. if f.queues[insert.origin] == 0 {
  840. delete(f.queues, insert.origin)
  841. }
  842. delete(f.queued, hash)
  843. }
  844. }