fetcher.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package fetcher contains the block announcement based synchronisation.
  17. package fetcher
  18. import (
  19. "errors"
  20. "fmt"
  21. "math/rand"
  22. "time"
  23. "github.com/ethereum/go-ethereum/common"
  24. "github.com/ethereum/go-ethereum/core"
  25. "github.com/ethereum/go-ethereum/core/types"
  26. "github.com/ethereum/go-ethereum/logger"
  27. "github.com/ethereum/go-ethereum/logger/glog"
  28. "gopkg.in/karalabe/cookiejar.v2/collections/prque"
  29. )
// Tuning parameters for the announcement based fetcher.
const (
	arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
	gatherSlack   = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
	fetchTimeout  = 5 * time.Second        // Maximum allotted time to return an explicitly requested block
	maxUncleDist  = 7                      // Maximum allowed backward distance from the chain head
	maxQueueDist  = 32                     // Maximum allowed distance from the chain head to queue
	hashLimit     = 256                    // Maximum number of unique blocks a peer may have announced
	blockLimit    = 64                     // Maximum number of unique blocks a peer may have delivered
)
var (
	// errTerminated is returned by the public entry points (Notify, Enqueue,
	// the Filter* methods) when the fetcher has been asked to quit.
	errTerminated = errors.New("terminated")
)
// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block

// blockRequesterFn is a callback type for sending a block retrieval request.
type blockRequesterFn func([]common.Hash) error

// headerRequesterFn is a callback type for sending a header retrieval request.
type headerRequesterFn func(common.Hash) error

// bodyRequesterFn is a callback type for sending a body retrieval request.
type bodyRequesterFn func([]common.Hash) error

// blockValidatorFn is a callback type to verify a block's header for fast propagation.
type blockValidatorFn func(block *types.Block, parent *types.Block) error

// blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
type blockBroadcasterFn func(block *types.Block, propagate bool)

// chainHeightFn is a callback type to retrieve the current chain height.
type chainHeightFn func() uint64

// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
type chainInsertFn func(types.Blocks) (int, error)

// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
// announce is the hash notification of the availability of a new block in the
// network.
type announce struct {
	hash   common.Hash   // Hash of the block being announced
	number uint64        // Number of the block being announced (0 = unknown | old protocol)
	header *types.Header // Header of the block partially reassembled (new protocol)
	time   time.Time     // Timestamp of the announcement

	origin string // Identifier of the peer originating the notification

	fetch61     blockRequesterFn  // [eth/61] Fetcher function to retrieve an announced block
	fetchHeader headerRequesterFn // [eth/62] Fetcher function to retrieve the header of an announced block
	fetchBodies bodyRequesterFn   // [eth/62] Fetcher function to retrieve the body of an announced block
}

// headerFilterTask represents a batch of headers needing fetcher filtering.
type headerFilterTask struct {
	headers []*types.Header // Collection of headers to filter
	time    time.Time       // Arrival time of the headers
}

// bodyFilterTask represents a batch of block bodies (transactions and uncles)
// needing fetcher filtering.
type bodyFilterTask struct {
	transactions [][]*types.Transaction // Collection of transactions per block bodies
	uncles       [][]*types.Header      // Collection of uncles per block bodies
	time         time.Time              // Arrival time of the blocks' contents
}

// inject represents a scheduled import operation.
type inject struct {
	origin string       // Peer that delivered the block
	block  *types.Block // Block scheduled for import
}
// Fetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval. All internal state below is owned by the
// loop goroutine; the public API communicates with it over the event channels.
type Fetcher struct {
	// Various event channels
	notify       chan *announce              // Incoming hash announcements (from Notify)
	inject       chan *inject                // Incoming directly propagated blocks (from Enqueue)
	blockFilter  chan chan []*types.Block    // [eth/61] Reply-channel handoff for block filtering
	headerFilter chan chan *headerFilterTask // [eth/62] Reply-channel handoff for header filtering
	bodyFilter   chan chan *bodyFilterTask   // [eth/62] Reply-channel handoff for body filtering
	done         chan common.Hash            // Signals a finished import so state can be cleaned up
	quit         chan struct{}               // Closed by Stop to terminate all operations

	// Announce states
	announces  map[string]int              // Per peer announce counts to prevent memory exhaustion
	announced  map[common.Hash][]*announce // Announced blocks, scheduled for fetching
	fetching   map[common.Hash]*announce   // Announced blocks, currently fetching
	fetched    map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
	completing map[common.Hash]*announce   // Blocks with headers, currently body-completing

	// Block cache
	queue  *prque.Prque            // Queue containing the import operations (block number sorted)
	queues map[string]int          // Per peer block counts to prevent memory exhaustion
	queued map[common.Hash]*inject // Set of already queued blocks (to dedup imports)

	// Callbacks
	getBlock       blockRetrievalFn   // Retrieves a block from the local chain
	validateBlock  blockValidatorFn   // Checks if a block's headers have a valid proof of work
	broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
	chainHeight    chainHeightFn      // Retrieves the current chain's height
	insertChain    chainInsertFn      // Injects a batch of blocks into the chain
	dropPeer       peerDropFn         // Drops a peer for misbehaving

	// Testing hooks
	announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
	queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
	fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch
	completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)
	importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)
}
  124. // New creates a block fetcher to retrieve blocks based on hash announcements.
  125. func New(getBlock blockRetrievalFn, validateBlock blockValidatorFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *Fetcher {
  126. return &Fetcher{
  127. notify: make(chan *announce),
  128. inject: make(chan *inject),
  129. blockFilter: make(chan chan []*types.Block),
  130. headerFilter: make(chan chan *headerFilterTask),
  131. bodyFilter: make(chan chan *bodyFilterTask),
  132. done: make(chan common.Hash),
  133. quit: make(chan struct{}),
  134. announces: make(map[string]int),
  135. announced: make(map[common.Hash][]*announce),
  136. fetching: make(map[common.Hash]*announce),
  137. fetched: make(map[common.Hash][]*announce),
  138. completing: make(map[common.Hash]*announce),
  139. queue: prque.New(),
  140. queues: make(map[string]int),
  141. queued: make(map[common.Hash]*inject),
  142. getBlock: getBlock,
  143. validateBlock: validateBlock,
  144. broadcastBlock: broadcastBlock,
  145. chainHeight: chainHeight,
  146. insertChain: insertChain,
  147. dropPeer: dropPeer,
  148. }
  149. }
// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and block fetches until termination requested.
func (f *Fetcher) Start() {
	// All state is owned by the loop goroutine started here.
	go f.loop()
}
// Stop terminates the announcement based synchroniser, canceling all pending
// operations. Closing quit also unblocks any callers waiting in Notify,
// Enqueue or the Filter* methods.
func (f *Fetcher) Stop() {
	close(f.quit)
}
  160. // Notify announces the fetcher of the potential availability of a new block in
  161. // the network.
  162. func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
  163. blockFetcher blockRequesterFn, // eth/61 specific whole block fetcher
  164. headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
  165. block := &announce{
  166. hash: hash,
  167. number: number,
  168. time: time,
  169. origin: peer,
  170. fetch61: blockFetcher,
  171. fetchHeader: headerFetcher,
  172. fetchBodies: bodyFetcher,
  173. }
  174. select {
  175. case f.notify <- block:
  176. return nil
  177. case <-f.quit:
  178. return errTerminated
  179. }
  180. }
  181. // Enqueue tries to fill gaps the the fetcher's future import queue.
  182. func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
  183. op := &inject{
  184. origin: peer,
  185. block: block,
  186. }
  187. select {
  188. case f.inject <- op:
  189. return nil
  190. case <-f.quit:
  191. return errTerminated
  192. }
  193. }
  194. // FilterBlocks extracts all the blocks that were explicitly requested by the fetcher,
  195. // returning those that should be handled differently.
  196. func (f *Fetcher) FilterBlocks(blocks types.Blocks) types.Blocks {
  197. glog.V(logger.Detail).Infof("[eth/61] filtering %d blocks", len(blocks))
  198. // Send the filter channel to the fetcher
  199. filter := make(chan []*types.Block)
  200. select {
  201. case f.blockFilter <- filter:
  202. case <-f.quit:
  203. return nil
  204. }
  205. // Request the filtering of the block list
  206. select {
  207. case filter <- blocks:
  208. case <-f.quit:
  209. return nil
  210. }
  211. // Retrieve the blocks remaining after filtering
  212. select {
  213. case blocks := <-filter:
  214. return blocks
  215. case <-f.quit:
  216. return nil
  217. }
  218. }
  219. // FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
  220. // returning those that should be handled differently.
  221. func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*types.Header {
  222. glog.V(logger.Detail).Infof("[eth/62] filtering %d headers", len(headers))
  223. // Send the filter channel to the fetcher
  224. filter := make(chan *headerFilterTask)
  225. select {
  226. case f.headerFilter <- filter:
  227. case <-f.quit:
  228. return nil
  229. }
  230. // Request the filtering of the header list
  231. select {
  232. case filter <- &headerFilterTask{headers: headers, time: time}:
  233. case <-f.quit:
  234. return nil
  235. }
  236. // Retrieve the headers remaining after filtering
  237. select {
  238. case task := <-filter:
  239. return task.headers
  240. case <-f.quit:
  241. return nil
  242. }
  243. }
  244. // FilterBodies extracts all the block bodies that were explicitly requested by
  245. // the fetcher, returning those that should be handled differently.
  246. func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
  247. glog.V(logger.Detail).Infof("[eth/62] filtering %d:%d bodies", len(transactions), len(uncles))
  248. // Send the filter channel to the fetcher
  249. filter := make(chan *bodyFilterTask)
  250. select {
  251. case f.bodyFilter <- filter:
  252. case <-f.quit:
  253. return nil, nil
  254. }
  255. // Request the filtering of the body list
  256. select {
  257. case filter <- &bodyFilterTask{transactions: transactions, uncles: uncles, time: time}:
  258. case <-f.quit:
  259. return nil, nil
  260. }
  261. // Retrieve the bodies remaining after filtering
  262. select {
  263. case task := <-filter:
  264. return task.transactions, task.uncles
  265. case <-f.quit:
  266. return nil, nil
  267. }
  268. }
// loop is the main fetcher loop, checking and processing various notification
// events. It is the only goroutine allowed to touch the fetcher's maps and
// queue, which is why all external interaction happens through channels.
func (f *Fetcher) loop() {
	// Iterate the block fetching until a quit is requested
	fetchTimer := time.NewTimer(0)
	completeTimer := time.NewTimer(0)

	for {
		// Clean up any expired block fetches
		for hash, announce := range f.fetching {
			if time.Since(announce.time) > fetchTimeout {
				f.forgetHash(hash)
			}
		}
		// Import any queued blocks that could potentially fit
		height := f.chainHeight()
		for !f.queue.Empty() {
			op := f.queue.PopItem().(*inject)
			if f.queueChangeHook != nil {
				f.queueChangeHook(op.block.Hash(), false)
			}
			// If too high up the chain or phase, continue later
			number := op.block.NumberU64()
			if number > height+1 {
				// Push the block back (queue is min-ordered on negated number) and stop importing
				f.queue.Push(op, -float32(op.block.NumberU64()))
				if f.queueChangeHook != nil {
					f.queueChangeHook(op.block.Hash(), true)
				}
				break
			}
			// Otherwise if fresh and still unknown, try and import
			hash := op.block.Hash()
			if number+maxUncleDist < height || f.getBlock(hash) != nil {
				f.forgetBlock(hash)
				continue
			}
			f.insert(op.origin, op.block)
		}
		// Wait for an outside event to occur
		select {
		case <-f.quit:
			// Fetcher terminating, abort all operations
			return

		case notification := <-f.notify:
			// A block was announced, make sure the peer isn't DOSing us
			propAnnounceInMeter.Mark(1)

			count := f.announces[notification.origin] + 1
			if count > hashLimit {
				glog.V(logger.Debug).Infof("Peer %s: exceeded outstanding announces (%d)", notification.origin, hashLimit)
				propAnnounceDOSMeter.Mark(1)
				break
			}
			// If we have a valid block number, check that it's potentially useful
			if notification.number > 0 {
				if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
					glog.V(logger.Debug).Infof("[eth/62] Peer %s: discarded announcement #%d [%x…], distance %d", notification.origin, notification.number, notification.hash[:4], dist)
					propAnnounceDropMeter.Mark(1)
					break
				}
			}
			// All is well, schedule the announce if block's not yet downloading
			if _, ok := f.fetching[notification.hash]; ok {
				break
			}
			if _, ok := f.completing[notification.hash]; ok {
				break
			}
			f.announces[notification.origin] = count
			f.announced[notification.hash] = append(f.announced[notification.hash], notification)
			if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
				f.announceChangeHook(notification.hash, true)
			}
			// First pending announce: the fetch timer needs arming
			if len(f.announced) == 1 {
				f.rescheduleFetch(fetchTimer)
			}

		case op := <-f.inject:
			// A direct block insertion was requested, try and fill any pending gaps
			propBroadcastInMeter.Mark(1)
			f.enqueue(op.origin, op.block)

		case hash := <-f.done:
			// A pending import finished, remove all traces of the notification
			f.forgetHash(hash)
			f.forgetBlock(hash)

		case <-fetchTimer.C:
			// At least one block's timer ran out, check for needing retrieval
			request := make(map[string][]common.Hash)

			for hash, announces := range f.announced {
				if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
					// Pick a random peer to retrieve from, reset all others
					announce := announces[rand.Intn(len(announces))]
					f.forgetHash(hash)

					// If the block still didn't arrive, queue for fetching
					if f.getBlock(hash) == nil {
						request[announce.origin] = append(request[announce.origin], hash)
						f.fetching[hash] = announce
					}
				}
			}
			// Send out all block (eth/61) or header (eth/62) requests
			for peer, hashes := range request {
				if glog.V(logger.Detail) && len(hashes) > 0 {
					list := "["
					for _, hash := range hashes {
						list += fmt.Sprintf("%x…, ", hash[:4])
					}
					list = list[:len(list)-2] + "]"

					if f.fetching[hashes[0]].fetch61 != nil {
						glog.V(logger.Detail).Infof("[eth/61] Peer %s: fetching blocks %s", peer, list)
					} else {
						glog.V(logger.Detail).Infof("[eth/62] Peer %s: fetching headers %s", peer, list)
					}
				}
				// Create a closure of the fetch and schedule in on a new thread;
				// copy the per-peer values so the goroutine doesn't race the loop
				fetchBlocks, fetchHeader, hashes := f.fetching[hashes[0]].fetch61, f.fetching[hashes[0]].fetchHeader, hashes
				go func() {
					if f.fetchingHook != nil {
						f.fetchingHook(hashes)
					}
					if fetchBlocks != nil {
						// Use old eth/61 protocol to retrieve whole blocks
						blockFetchMeter.Mark(int64(len(hashes)))
						fetchBlocks(hashes)
					} else {
						// Use new eth/62 protocol to retrieve headers first
						for _, hash := range hashes {
							headerFetchMeter.Mark(1)
							fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
						}
					}
				}()
			}
			// Schedule the next fetch if blocks are still pending
			f.rescheduleFetch(fetchTimer)

		case <-completeTimer.C:
			// At least one header's timer ran out, retrieve everything
			request := make(map[string][]common.Hash)

			for hash, announces := range f.fetched {
				// Pick a random peer to retrieve from, reset all others
				announce := announces[rand.Intn(len(announces))]
				f.forgetHash(hash)

				// If the block still didn't arrive, queue for completion
				if f.getBlock(hash) == nil {
					request[announce.origin] = append(request[announce.origin], hash)
					f.completing[hash] = announce
				}
			}
			// Send out all block body requests
			for peer, hashes := range request {
				if glog.V(logger.Detail) && len(hashes) > 0 {
					list := "["
					for _, hash := range hashes {
						list += fmt.Sprintf("%x…, ", hash[:4])
					}
					list = list[:len(list)-2] + "]"

					glog.V(logger.Detail).Infof("[eth/62] Peer %s: fetching bodies %s", peer, list)
				}
				// Create a closure of the fetch and schedule in on a new thread
				if f.completingHook != nil {
					f.completingHook(hashes)
				}
				bodyFetchMeter.Mark(int64(len(hashes)))
				go f.completing[hashes[0]].fetchBodies(hashes)
			}
			// Schedule the next fetch if blocks are still pending
			f.rescheduleComplete(completeTimer)

		case filter := <-f.blockFilter:
			// Blocks arrived, extract any explicit fetches, return all else
			var blocks types.Blocks
			select {
			case blocks = <-filter:
			case <-f.quit:
				return
			}
			blockFilterInMeter.Mark(int64(len(blocks)))

			explicit, download := []*types.Block{}, []*types.Block{}
			for _, block := range blocks {
				hash := block.Hash()

				// Filter explicitly requested blocks from hash announcements
				if f.fetching[hash] != nil && f.queued[hash] == nil {
					// Discard if already imported by other means
					if f.getBlock(hash) == nil {
						explicit = append(explicit, block)
					} else {
						f.forgetHash(hash)
					}
				} else {
					download = append(download, block)
				}
			}
			blockFilterOutMeter.Mark(int64(len(download)))
			select {
			case filter <- download:
			case <-f.quit:
				return
			}
			// Schedule the retrieved blocks for ordered import
			for _, block := range explicit {
				if announce := f.fetching[block.Hash()]; announce != nil {
					f.enqueue(announce.origin, block)
				}
			}

		case filter := <-f.headerFilter:
			// Headers arrived from a remote peer. Extract those that were explicitly
			// requested by the fetcher, and return everything else so it's delivered
			// to other parts of the system.
			var task *headerFilterTask
			select {
			case task = <-filter:
			case <-f.quit:
				return
			}
			headerFilterInMeter.Mark(int64(len(task.headers)))

			// Split the batch of headers into unknown ones (to return to the caller),
			// known incomplete ones (requiring body retrievals) and completed blocks.
			unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
			for _, header := range task.headers {
				hash := header.Hash()

				// Filter fetcher-requested headers from other synchronisation algorithms
				if announce := f.fetching[hash]; announce != nil && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
					// If the delivered header does not match the promised number, drop the announcer
					if header.Number.Uint64() != announce.number {
						glog.V(logger.Detail).Infof("[eth/62] Peer %s: invalid block number for [%x…]: announced %d, provided %d", announce.origin, header.Hash().Bytes()[:4], announce.number, header.Number.Uint64())
						f.dropPeer(announce.origin)
						f.forgetHash(hash)
						continue
					}
					// Only keep if not imported by other means
					if f.getBlock(hash) == nil {
						announce.header = header
						announce.time = task.time

						// If the block is empty (header only), short circuit into the final import queue
						if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
							glog.V(logger.Detail).Infof("[eth/62] Peer %s: block #%d [%x…] empty, skipping body retrieval", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4])

							block := types.NewBlockWithHeader(header)
							block.ReceivedAt = task.time

							complete = append(complete, block)
							f.completing[hash] = announce
							continue
						}
						// Otherwise add to the list of blocks needing completion
						incomplete = append(incomplete, announce)
					} else {
						glog.V(logger.Detail).Infof("[eth/62] Peer %s: block #%d [%x…] already imported, discarding header", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4])
						f.forgetHash(hash)
					}
				} else {
					// Fetcher doesn't know about it, add to the return list
					unknown = append(unknown, header)
				}
			}
			headerFilterOutMeter.Mark(int64(len(unknown)))
			select {
			case filter <- &headerFilterTask{headers: unknown, time: task.time}:
			case <-f.quit:
				return
			}
			// Schedule the retrieved headers for body completion
			for _, announce := range incomplete {
				hash := announce.header.Hash()
				if _, ok := f.completing[hash]; ok {
					continue
				}
				f.fetched[hash] = append(f.fetched[hash], announce)
				// First pending body retrieval: the completion timer needs arming
				if len(f.fetched) == 1 {
					f.rescheduleComplete(completeTimer)
				}
			}
			// Schedule the header-only blocks for import
			for _, block := range complete {
				if announce := f.completing[block.Hash()]; announce != nil {
					f.enqueue(announce.origin, block)
				}
			}

		case filter := <-f.bodyFilter:
			// Block bodies arrived, extract any explicitly requested blocks, return the rest
			var task *bodyFilterTask
			select {
			case task = <-filter:
			case <-f.quit:
				return
			}
			bodyFilterInMeter.Mark(int64(len(task.transactions)))

			blocks := []*types.Block{}
			for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
				// Match up a body to any possible completion request
				matched := false

				for hash, announce := range f.completing {
					if f.queued[hash] == nil {
						txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
						uncleHash := types.CalcUncleHash(task.uncles[i])

						if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash {
							// Mark the body matched, reassemble if still unknown
							matched = true

							if f.getBlock(hash) == nil {
								block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
								block.ReceivedAt = task.time

								blocks = append(blocks, block)
							} else {
								f.forgetHash(hash)
							}
						}
					}
				}
				// Matched bodies are consumed out of the task; rewind i to re-check
				// the element shifted into the current slot
				if matched {
					task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
					task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
					i--
					continue
				}
			}
			bodyFilterOutMeter.Mark(int64(len(task.transactions)))
			select {
			case filter <- task:
			case <-f.quit:
				return
			}
			// Schedule the retrieved blocks for ordered import
			for _, block := range blocks {
				if announce := f.completing[block.Hash()]; announce != nil {
					f.enqueue(announce.origin, block)
				}
			}
		}
	}
}
  593. // rescheduleFetch resets the specified fetch timer to the next announce timeout.
  594. func (f *Fetcher) rescheduleFetch(fetch *time.Timer) {
  595. // Short circuit if no blocks are announced
  596. if len(f.announced) == 0 {
  597. return
  598. }
  599. // Otherwise find the earliest expiring announcement
  600. earliest := time.Now()
  601. for _, announces := range f.announced {
  602. if earliest.After(announces[0].time) {
  603. earliest = announces[0].time
  604. }
  605. }
  606. fetch.Reset(arriveTimeout - time.Since(earliest))
  607. }
  608. // rescheduleComplete resets the specified completion timer to the next fetch timeout.
  609. func (f *Fetcher) rescheduleComplete(complete *time.Timer) {
  610. // Short circuit if no headers are fetched
  611. if len(f.fetched) == 0 {
  612. return
  613. }
  614. // Otherwise find the earliest expiring announcement
  615. earliest := time.Now()
  616. for _, announces := range f.fetched {
  617. if earliest.After(announces[0].time) {
  618. earliest = announces[0].time
  619. }
  620. }
  621. complete.Reset(gatherSlack - time.Since(earliest))
  622. }
  623. // enqueue schedules a new future import operation, if the block to be imported
  624. // has not yet been seen.
  625. func (f *Fetcher) enqueue(peer string, block *types.Block) {
  626. hash := block.Hash()
  627. // Ensure the peer isn't DOSing us
  628. count := f.queues[peer] + 1
  629. if count > blockLimit {
  630. glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], exceeded allowance (%d)", peer, block.NumberU64(), hash.Bytes()[:4], blockLimit)
  631. propBroadcastDOSMeter.Mark(1)
  632. f.forgetHash(hash)
  633. return
  634. }
  635. // Discard any past or too distant blocks
  636. if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
  637. glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist)
  638. propBroadcastDropMeter.Mark(1)
  639. f.forgetHash(hash)
  640. return
  641. }
  642. // Schedule the block for future importing
  643. if _, ok := f.queued[hash]; !ok {
  644. op := &inject{
  645. origin: peer,
  646. block: block,
  647. }
  648. f.queues[peer] = count
  649. f.queued[hash] = op
  650. f.queue.Push(op, -float32(block.NumberU64()))
  651. if f.queueChangeHook != nil {
  652. f.queueChangeHook(op.block.Hash(), true)
  653. }
  654. if glog.V(logger.Debug) {
  655. glog.Infof("Peer %s: queued block #%d [%x…], total %v", peer, block.NumberU64(), hash.Bytes()[:4], f.queue.Size())
  656. }
  657. }
  658. }
  659. // insert spawns a new goroutine to run a block insertion into the chain. If the
  660. // block's number is at the same height as the current import phase, if updates
  661. // the phase states accordingly.
  662. func (f *Fetcher) insert(peer string, block *types.Block) {
  663. hash := block.Hash()
  664. // Run the import on a new thread
  665. glog.V(logger.Debug).Infof("Peer %s: importing block #%d [%x…]", peer, block.NumberU64(), hash[:4])
  666. go func() {
  667. defer func() { f.done <- hash }()
  668. // If the parent's unknown, abort insertion
  669. parent := f.getBlock(block.ParentHash())
  670. if parent == nil {
  671. glog.V(logger.Debug).Infof("Peer %s: parent []%x] of block #%d [%x…] unknown", block.ParentHash().Bytes()[:4], peer, block.NumberU64(), hash[:4])
  672. return
  673. }
  674. // Quickly validate the header and propagate the block if it passes
  675. switch err := f.validateBlock(block, parent); err {
  676. case nil:
  677. // All ok, quickly propagate to our peers
  678. propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
  679. go f.broadcastBlock(block, true)
  680. case core.BlockFutureErr:
  681. // Weird future block, don't fail, but neither propagate
  682. default:
  683. // Something went very wrong, drop the peer
  684. glog.V(logger.Debug).Infof("Peer %s: block #%d [%x…] verification failed: %v", peer, block.NumberU64(), hash[:4], err)
  685. f.dropPeer(peer)
  686. return
  687. }
  688. // Run the actual import and log any issues
  689. if _, err := f.insertChain(types.Blocks{block}); err != nil {
  690. glog.V(logger.Warn).Infof("Peer %s: block #%d [%x…] import failed: %v", peer, block.NumberU64(), hash[:4], err)
  691. return
  692. }
  693. // If import succeeded, broadcast the block
  694. propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
  695. go f.broadcastBlock(block, false)
  696. // Invoke the testing hook if needed
  697. if f.importedHook != nil {
  698. f.importedHook(block)
  699. }
  700. }()
  701. }
  702. // forgetHash removes all traces of a block announcement from the fetcher's
  703. // internal state.
  704. func (f *Fetcher) forgetHash(hash common.Hash) {
  705. // Remove all pending announces and decrement DOS counters
  706. for _, announce := range f.announced[hash] {
  707. f.announces[announce.origin]--
  708. if f.announces[announce.origin] == 0 {
  709. delete(f.announces, announce.origin)
  710. }
  711. }
  712. delete(f.announced, hash)
  713. if f.announceChangeHook != nil {
  714. f.announceChangeHook(hash, false)
  715. }
  716. // Remove any pending fetches and decrement the DOS counters
  717. if announce := f.fetching[hash]; announce != nil {
  718. f.announces[announce.origin]--
  719. if f.announces[announce.origin] == 0 {
  720. delete(f.announces, announce.origin)
  721. }
  722. delete(f.fetching, hash)
  723. }
  724. // Remove any pending completion requests and decrement the DOS counters
  725. for _, announce := range f.fetched[hash] {
  726. f.announces[announce.origin]--
  727. if f.announces[announce.origin] == 0 {
  728. delete(f.announces, announce.origin)
  729. }
  730. }
  731. delete(f.fetched, hash)
  732. // Remove any pending completions and decrement the DOS counters
  733. if announce := f.completing[hash]; announce != nil {
  734. f.announces[announce.origin]--
  735. if f.announces[announce.origin] == 0 {
  736. delete(f.announces, announce.origin)
  737. }
  738. delete(f.completing, hash)
  739. }
  740. }
  741. // forgetBlock removes all traces of a queued block from the fetcher's internal
  742. // state.
  743. func (f *Fetcher) forgetBlock(hash common.Hash) {
  744. if insert := f.queued[hash]; insert != nil {
  745. f.queues[insert.origin]--
  746. if f.queues[insert.origin] == 0 {
  747. delete(f.queues, insert.origin)
  748. }
  749. delete(f.queued, hash)
  750. }
  751. }