block_fetcher_test.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package fetcher
  17. import (
  18. "errors"
  19. "math/big"
  20. "sync"
  21. "sync/atomic"
  22. "testing"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/consensus/ethash"
  26. "github.com/ethereum/go-ethereum/core"
  27. "github.com/ethereum/go-ethereum/core/rawdb"
  28. "github.com/ethereum/go-ethereum/core/types"
  29. "github.com/ethereum/go-ethereum/crypto"
  30. "github.com/ethereum/go-ethereum/params"
  31. "github.com/ethereum/go-ethereum/trie"
  32. )
  33. var (
  34. testdb = rawdb.NewMemoryDatabase()
  35. testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
  36. testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
  37. gspec = core.Genesis{
  38. Alloc: core.GenesisAlloc{testAddress: {Balance: big.NewInt(1000000000000000)}},
  39. BaseFee: big.NewInt(params.InitialBaseFee),
  40. }
  41. genesis = gspec.MustCommit(testdb)
  42. unknownBlock = types.NewBlock(&types.Header{GasLimit: params.GenesisGasLimit, BaseFee: big.NewInt(params.InitialBaseFee)}, nil, nil, nil, trie.NewStackTrie(nil))
  43. )
  44. // makeChain creates a chain of n blocks starting at and including parent.
  45. // the returned hash chain is ordered head->parent. In addition, every 3rd block
  46. // contains a transaction and every 5th an uncle to allow testing correct block
  47. // reassembly.
  48. func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) {
  49. blocks, _ := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testdb, n, func(i int, block *core.BlockGen) {
  50. block.SetCoinbase(common.Address{seed})
  51. // If the block number is multiple of 3, send a bonus transaction to the miner
  52. if parent == genesis && i%3 == 0 {
  53. signer := types.MakeSigner(params.TestChainConfig, block.Number())
  54. tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey)
  55. if err != nil {
  56. panic(err)
  57. }
  58. block.AddTx(tx)
  59. }
  60. // If the block number is a multiple of 5, add a bonus uncle to the block
  61. if i > 0 && i%5 == 0 {
  62. block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 2).Hash(), Number: big.NewInt(int64(i - 1))})
  63. }
  64. })
  65. hashes := make([]common.Hash, n+1)
  66. hashes[len(hashes)-1] = parent.Hash()
  67. blockm := make(map[common.Hash]*types.Block, n+1)
  68. blockm[parent.Hash()] = parent
  69. for i, b := range blocks {
  70. hashes[len(hashes)-i-2] = b.Hash()
  71. blockm[b.Hash()] = b
  72. }
  73. return hashes, blockm
  74. }
  75. // fetcherTester is a test simulator for mocking out local block chain.
  76. type fetcherTester struct {
  77. fetcher *BlockFetcher
  78. hashes []common.Hash // Hash chain belonging to the tester
  79. headers map[common.Hash]*types.Header // Headers belonging to the tester
  80. blocks map[common.Hash]*types.Block // Blocks belonging to the tester
  81. drops map[string]bool // Map of peers dropped by the fetcher
  82. lock sync.RWMutex
  83. }
  84. // newTester creates a new fetcher test mocker.
  85. func newTester(light bool) *fetcherTester {
  86. tester := &fetcherTester{
  87. hashes: []common.Hash{genesis.Hash()},
  88. headers: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
  89. blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
  90. drops: make(map[string]bool),
  91. }
  92. tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer)
  93. tester.fetcher.Start()
  94. return tester
  95. }
  96. // getHeader retrieves a header from the tester's block chain.
  97. func (f *fetcherTester) getHeader(hash common.Hash) *types.Header {
  98. f.lock.RLock()
  99. defer f.lock.RUnlock()
  100. return f.headers[hash]
  101. }
  102. // getBlock retrieves a block from the tester's block chain.
  103. func (f *fetcherTester) getBlock(hash common.Hash) *types.Block {
  104. f.lock.RLock()
  105. defer f.lock.RUnlock()
  106. return f.blocks[hash]
  107. }
  108. // verifyHeader is a nop placeholder for the block header verification.
  109. func (f *fetcherTester) verifyHeader(header *types.Header) error {
  110. return nil
  111. }
  112. // broadcastBlock is a nop placeholder for the block broadcasting.
  113. func (f *fetcherTester) broadcastBlock(block *types.Block, propagate bool) {
  114. }
  115. // chainHeight retrieves the current height (block number) of the chain.
  116. func (f *fetcherTester) chainHeight() uint64 {
  117. f.lock.RLock()
  118. defer f.lock.RUnlock()
  119. if f.fetcher.light {
  120. return f.headers[f.hashes[len(f.hashes)-1]].Number.Uint64()
  121. }
  122. return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64()
  123. }
  124. // insertChain injects a new headers into the simulated chain.
  125. func (f *fetcherTester) insertHeaders(headers []*types.Header) (int, error) {
  126. f.lock.Lock()
  127. defer f.lock.Unlock()
  128. for i, header := range headers {
  129. // Make sure the parent in known
  130. if _, ok := f.headers[header.ParentHash]; !ok {
  131. return i, errors.New("unknown parent")
  132. }
  133. // Discard any new blocks if the same height already exists
  134. if header.Number.Uint64() <= f.headers[f.hashes[len(f.hashes)-1]].Number.Uint64() {
  135. return i, nil
  136. }
  137. // Otherwise build our current chain
  138. f.hashes = append(f.hashes, header.Hash())
  139. f.headers[header.Hash()] = header
  140. }
  141. return 0, nil
  142. }
  143. // insertChain injects a new blocks into the simulated chain.
  144. func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) {
  145. f.lock.Lock()
  146. defer f.lock.Unlock()
  147. for i, block := range blocks {
  148. // Make sure the parent in known
  149. if _, ok := f.blocks[block.ParentHash()]; !ok {
  150. return i, errors.New("unknown parent")
  151. }
  152. // Discard any new blocks if the same height already exists
  153. if block.NumberU64() <= f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() {
  154. return i, nil
  155. }
  156. // Otherwise build our current chain
  157. f.hashes = append(f.hashes, block.Hash())
  158. f.blocks[block.Hash()] = block
  159. }
  160. return 0, nil
  161. }
  162. // dropPeer is an emulator for the peer removal, simply accumulating the various
  163. // peers dropped by the fetcher.
  164. func (f *fetcherTester) dropPeer(peer string) {
  165. f.lock.Lock()
  166. defer f.lock.Unlock()
  167. f.drops[peer] = true
  168. }
  169. // makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer.
  170. func (f *fetcherTester) makeHeaderFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn {
  171. closure := make(map[common.Hash]*types.Block)
  172. for hash, block := range blocks {
  173. closure[hash] = block
  174. }
  175. // Create a function that return a header from the closure
  176. return func(hash common.Hash) error {
  177. // Gather the blocks to return
  178. headers := make([]*types.Header, 0, 1)
  179. if block, ok := closure[hash]; ok {
  180. headers = append(headers, block.Header())
  181. }
  182. // Return on a new thread
  183. go f.fetcher.FilterHeaders(peer, headers, time.Now().Add(drift))
  184. return nil
  185. }
  186. }
  187. // makeBodyFetcher retrieves a block body fetcher associated with a simulated peer.
  188. func (f *fetcherTester) makeBodyFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn {
  189. closure := make(map[common.Hash]*types.Block)
  190. for hash, block := range blocks {
  191. closure[hash] = block
  192. }
  193. // Create a function that returns blocks from the closure
  194. return func(hashes []common.Hash) error {
  195. // Gather the block bodies to return
  196. transactions := make([][]*types.Transaction, 0, len(hashes))
  197. uncles := make([][]*types.Header, 0, len(hashes))
  198. for _, hash := range hashes {
  199. if block, ok := closure[hash]; ok {
  200. transactions = append(transactions, block.Transactions())
  201. uncles = append(uncles, block.Uncles())
  202. }
  203. }
  204. // Return on a new thread
  205. go f.fetcher.FilterBodies(peer, transactions, uncles, time.Now().Add(drift))
  206. return nil
  207. }
  208. }
  209. // verifyFetchingEvent verifies that one single event arrive on a fetching channel.
  210. func verifyFetchingEvent(t *testing.T, fetching chan []common.Hash, arrive bool) {
  211. if arrive {
  212. select {
  213. case <-fetching:
  214. case <-time.After(time.Second):
  215. t.Fatalf("fetching timeout")
  216. }
  217. } else {
  218. select {
  219. case <-fetching:
  220. t.Fatalf("fetching invoked")
  221. case <-time.After(10 * time.Millisecond):
  222. }
  223. }
  224. }
  225. // verifyCompletingEvent verifies that one single event arrive on an completing channel.
  226. func verifyCompletingEvent(t *testing.T, completing chan []common.Hash, arrive bool) {
  227. if arrive {
  228. select {
  229. case <-completing:
  230. case <-time.After(time.Second):
  231. t.Fatalf("completing timeout")
  232. }
  233. } else {
  234. select {
  235. case <-completing:
  236. t.Fatalf("completing invoked")
  237. case <-time.After(10 * time.Millisecond):
  238. }
  239. }
  240. }
  // verifyImportEvent checks the import channel for a single event. With arrive
  // set, an event must show up within one second; otherwise no event may show
  // up within 20 milliseconds. A violation fails the test immediately.
  func verifyImportEvent(t *testing.T, imported chan interface{}, arrive bool) {
  	if !arrive {
  		select {
  		case <-time.After(20 * time.Millisecond):
  		case <-imported:
  			t.Fatalf("import invoked")
  		}
  		return
  	}
  	select {
  	case <-time.After(time.Second):
  		t.Fatalf("import timeout")
  	case <-imported:
  	}
  }
  257. // verifyImportCount verifies that exactly count number of events arrive on an
  258. // import hook channel.
  259. func verifyImportCount(t *testing.T, imported chan interface{}, count int) {
  260. for i := 0; i < count; i++ {
  261. select {
  262. case <-imported:
  263. case <-time.After(time.Second):
  264. t.Fatalf("block %d: import timeout", i+1)
  265. }
  266. }
  267. verifyImportDone(t, imported)
  268. }
  // verifyImportDone checks that the import channel stays quiet: the test fails
  // if another event arrives within 50 milliseconds.
  func verifyImportDone(t *testing.T, imported chan interface{}) {
  	quiet := time.After(50 * time.Millisecond)
  	select {
  	case <-quiet:
  	case <-imported:
  		t.Fatalf("extra block imported")
  	}
  }
  277. // verifyChainHeight verifies the chain height is as expected.
  278. func verifyChainHeight(t *testing.T, fetcher *fetcherTester, height uint64) {
  279. if fetcher.chainHeight() != height {
  280. t.Fatalf("chain height mismatch, got %d, want %d", fetcher.chainHeight(), height)
  281. }
  282. }
  283. // Tests that a fetcher accepts block/header announcements and initiates retrievals
  284. // for them, successfully importing into the local chain.
  285. func TestFullSequentialAnnouncements(t *testing.T) { testSequentialAnnouncements(t, false) }
  286. func TestLightSequentialAnnouncements(t *testing.T) { testSequentialAnnouncements(t, true) }
  287. func testSequentialAnnouncements(t *testing.T, light bool) {
  288. // Create a chain of blocks to import
  289. targetBlocks := 4 * hashLimit
  290. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  291. tester := newTester(light)
  292. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  293. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  294. // Iteratively announce blocks until all are imported
  295. imported := make(chan interface{})
  296. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  297. if light {
  298. if header == nil {
  299. t.Fatalf("Fetcher try to import empty header")
  300. }
  301. imported <- header
  302. } else {
  303. if block == nil {
  304. t.Fatalf("Fetcher try to import empty block")
  305. }
  306. imported <- block
  307. }
  308. }
  309. for i := len(hashes) - 2; i >= 0; i-- {
  310. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  311. verifyImportEvent(t, imported, true)
  312. }
  313. verifyImportDone(t, imported)
  314. verifyChainHeight(t, tester, uint64(len(hashes)-1))
  315. }
  316. // Tests that if blocks are announced by multiple peers (or even the same buggy
  317. // peer), they will only get downloaded at most once.
  318. func TestFullConcurrentAnnouncements(t *testing.T) { testConcurrentAnnouncements(t, false) }
  319. func TestLightConcurrentAnnouncements(t *testing.T) { testConcurrentAnnouncements(t, true) }
  320. func testConcurrentAnnouncements(t *testing.T, light bool) {
  321. // Create a chain of blocks to import
  322. targetBlocks := 4 * hashLimit
  323. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  324. // Assemble a tester with a built in counter for the requests
  325. tester := newTester(light)
  326. firstHeaderFetcher := tester.makeHeaderFetcher("first", blocks, -gatherSlack)
  327. firstBodyFetcher := tester.makeBodyFetcher("first", blocks, 0)
  328. secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack)
  329. secondBodyFetcher := tester.makeBodyFetcher("second", blocks, 0)
  330. counter := uint32(0)
  331. firstHeaderWrapper := func(hash common.Hash) error {
  332. atomic.AddUint32(&counter, 1)
  333. return firstHeaderFetcher(hash)
  334. }
  335. secondHeaderWrapper := func(hash common.Hash) error {
  336. atomic.AddUint32(&counter, 1)
  337. return secondHeaderFetcher(hash)
  338. }
  339. // Iteratively announce blocks until all are imported
  340. imported := make(chan interface{})
  341. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  342. if light {
  343. if header == nil {
  344. t.Fatalf("Fetcher try to import empty header")
  345. }
  346. imported <- header
  347. } else {
  348. if block == nil {
  349. t.Fatalf("Fetcher try to import empty block")
  350. }
  351. imported <- block
  352. }
  353. }
  354. for i := len(hashes) - 2; i >= 0; i-- {
  355. tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), firstHeaderWrapper, firstBodyFetcher)
  356. tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
  357. tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
  358. verifyImportEvent(t, imported, true)
  359. }
  360. verifyImportDone(t, imported)
  361. // Make sure no blocks were retrieved twice
  362. if int(counter) != targetBlocks {
  363. t.Fatalf("retrieval count mismatch: have %v, want %v", counter, targetBlocks)
  364. }
  365. verifyChainHeight(t, tester, uint64(len(hashes)-1))
  366. }
  367. // Tests that announcements arriving while a previous is being fetched still
  368. // results in a valid import.
  369. func TestFullOverlappingAnnouncements(t *testing.T) { testOverlappingAnnouncements(t, false) }
  370. func TestLightOverlappingAnnouncements(t *testing.T) { testOverlappingAnnouncements(t, true) }
  371. func testOverlappingAnnouncements(t *testing.T, light bool) {
  372. // Create a chain of blocks to import
  373. targetBlocks := 4 * hashLimit
  374. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  375. tester := newTester(light)
  376. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  377. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  378. // Iteratively announce blocks, but overlap them continuously
  379. overlap := 16
  380. imported := make(chan interface{}, len(hashes)-1)
  381. for i := 0; i < overlap; i++ {
  382. imported <- nil
  383. }
  384. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  385. if light {
  386. if header == nil {
  387. t.Fatalf("Fetcher try to import empty header")
  388. }
  389. imported <- header
  390. } else {
  391. if block == nil {
  392. t.Fatalf("Fetcher try to import empty block")
  393. }
  394. imported <- block
  395. }
  396. }
  397. for i := len(hashes) - 2; i >= 0; i-- {
  398. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  399. select {
  400. case <-imported:
  401. case <-time.After(time.Second):
  402. t.Fatalf("block %d: import timeout", len(hashes)-i)
  403. }
  404. }
  405. // Wait for all the imports to complete and check count
  406. verifyImportCount(t, imported, overlap)
  407. verifyChainHeight(t, tester, uint64(len(hashes)-1))
  408. }
  409. // Tests that announces already being retrieved will not be duplicated.
  410. func TestFullPendingDeduplication(t *testing.T) { testPendingDeduplication(t, false) }
  411. func TestLightPendingDeduplication(t *testing.T) { testPendingDeduplication(t, true) }
  412. func testPendingDeduplication(t *testing.T, light bool) {
  413. // Create a hash and corresponding block
  414. hashes, blocks := makeChain(1, 0, genesis)
  415. // Assemble a tester with a built in counter and delayed fetcher
  416. tester := newTester(light)
  417. headerFetcher := tester.makeHeaderFetcher("repeater", blocks, -gatherSlack)
  418. bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0)
  419. delay := 50 * time.Millisecond
  420. counter := uint32(0)
  421. headerWrapper := func(hash common.Hash) error {
  422. atomic.AddUint32(&counter, 1)
  423. // Simulate a long running fetch
  424. go func() {
  425. time.Sleep(delay)
  426. headerFetcher(hash)
  427. }()
  428. return nil
  429. }
  430. checkNonExist := func() bool {
  431. return tester.getBlock(hashes[0]) == nil
  432. }
  433. if light {
  434. checkNonExist = func() bool {
  435. return tester.getHeader(hashes[0]) == nil
  436. }
  437. }
  438. // Announce the same block many times until it's fetched (wait for any pending ops)
  439. for checkNonExist() {
  440. tester.fetcher.Notify("repeater", hashes[0], 1, time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher)
  441. time.Sleep(time.Millisecond)
  442. }
  443. time.Sleep(delay)
  444. // Check that all blocks were imported and none fetched twice
  445. if int(counter) != 1 {
  446. t.Fatalf("retrieval count mismatch: have %v, want %v", counter, 1)
  447. }
  448. verifyChainHeight(t, tester, 1)
  449. }
  450. // Tests that announcements retrieved in a random order are cached and eventually
  451. // imported when all the gaps are filled in.
  452. func TestFullRandomArrivalImport(t *testing.T) { testRandomArrivalImport(t, false) }
  453. func TestLightRandomArrivalImport(t *testing.T) { testRandomArrivalImport(t, true) }
  454. func testRandomArrivalImport(t *testing.T, light bool) {
  455. // Create a chain of blocks to import, and choose one to delay
  456. targetBlocks := maxQueueDist
  457. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  458. skip := targetBlocks / 2
  459. tester := newTester(light)
  460. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  461. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  462. // Iteratively announce blocks, skipping one entry
  463. imported := make(chan interface{}, len(hashes)-1)
  464. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  465. if light {
  466. if header == nil {
  467. t.Fatalf("Fetcher try to import empty header")
  468. }
  469. imported <- header
  470. } else {
  471. if block == nil {
  472. t.Fatalf("Fetcher try to import empty block")
  473. }
  474. imported <- block
  475. }
  476. }
  477. for i := len(hashes) - 1; i >= 0; i-- {
  478. if i != skip {
  479. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  480. time.Sleep(time.Millisecond)
  481. }
  482. }
  483. // Finally announce the skipped entry and check full import
  484. tester.fetcher.Notify("valid", hashes[skip], uint64(len(hashes)-skip-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  485. verifyImportCount(t, imported, len(hashes)-1)
  486. verifyChainHeight(t, tester, uint64(len(hashes)-1))
  487. }
  488. // Tests that direct block enqueues (due to block propagation vs. hash announce)
  489. // are correctly schedule, filling and import queue gaps.
  490. func TestQueueGapFill(t *testing.T) {
  491. // Create a chain of blocks to import, and choose one to not announce at all
  492. targetBlocks := maxQueueDist
  493. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  494. skip := targetBlocks / 2
  495. tester := newTester(false)
  496. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  497. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  498. // Iteratively announce blocks, skipping one entry
  499. imported := make(chan interface{}, len(hashes)-1)
  500. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
  501. for i := len(hashes) - 1; i >= 0; i-- {
  502. if i != skip {
  503. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  504. time.Sleep(time.Millisecond)
  505. }
  506. }
  507. // Fill the missing block directly as if propagated
  508. tester.fetcher.Enqueue("valid", blocks[hashes[skip]])
  509. verifyImportCount(t, imported, len(hashes)-1)
  510. verifyChainHeight(t, tester, uint64(len(hashes)-1))
  511. }
  512. // Tests that blocks arriving from various sources (multiple propagations, hash
  513. // announces, etc) do not get scheduled for import multiple times.
  514. func TestImportDeduplication(t *testing.T) {
  515. // Create two blocks to import (one for duplication, the other for stalling)
  516. hashes, blocks := makeChain(2, 0, genesis)
  517. // Create the tester and wrap the importer with a counter
  518. tester := newTester(false)
  519. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  520. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  521. counter := uint32(0)
  522. tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) {
  523. atomic.AddUint32(&counter, uint32(len(blocks)))
  524. return tester.insertChain(blocks)
  525. }
  526. // Instrument the fetching and imported events
  527. fetching := make(chan []common.Hash)
  528. imported := make(chan interface{}, len(hashes)-1)
  529. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
  530. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
  531. // Announce the duplicating block, wait for retrieval, and also propagate directly
  532. tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  533. <-fetching
  534. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  535. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  536. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  537. // Fill the missing block directly as if propagated, and check import uniqueness
  538. tester.fetcher.Enqueue("valid", blocks[hashes[1]])
  539. verifyImportCount(t, imported, 2)
  540. if counter != 2 {
  541. t.Fatalf("import invocation count mismatch: have %v, want %v", counter, 2)
  542. }
  543. }
  544. // Tests that blocks with numbers much lower or higher than out current head get
  545. // discarded to prevent wasting resources on useless blocks from faulty peers.
  546. func TestDistantPropagationDiscarding(t *testing.T) {
  547. // Create a long chain to import and define the discard boundaries
  548. hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
  549. head := hashes[len(hashes)/2]
  550. low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
  551. // Create a tester and simulate a head block being the middle of the above chain
  552. tester := newTester(false)
  553. tester.lock.Lock()
  554. tester.hashes = []common.Hash{head}
  555. tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
  556. tester.lock.Unlock()
  557. // Ensure that a block with a lower number than the threshold is discarded
  558. tester.fetcher.Enqueue("lower", blocks[hashes[low]])
  559. time.Sleep(10 * time.Millisecond)
  560. if !tester.fetcher.queue.Empty() {
  561. t.Fatalf("fetcher queued stale block")
  562. }
  563. // Ensure that a block with a higher number than the threshold is discarded
  564. tester.fetcher.Enqueue("higher", blocks[hashes[high]])
  565. time.Sleep(10 * time.Millisecond)
  566. if !tester.fetcher.queue.Empty() {
  567. t.Fatalf("fetcher queued future block")
  568. }
  569. }
  570. // Tests that announcements with numbers much lower or higher than out current
  571. // head get discarded to prevent wasting resources on useless blocks from faulty
  572. // peers.
  573. func TestFullDistantAnnouncementDiscarding(t *testing.T) { testDistantAnnouncementDiscarding(t, false) }
  574. func TestLightDistantAnnouncementDiscarding(t *testing.T) { testDistantAnnouncementDiscarding(t, true) }
  575. func testDistantAnnouncementDiscarding(t *testing.T, light bool) {
  576. // Create a long chain to import and define the discard boundaries
  577. hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
  578. head := hashes[len(hashes)/2]
  579. low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
  580. // Create a tester and simulate a head block being the middle of the above chain
  581. tester := newTester(light)
  582. tester.lock.Lock()
  583. tester.hashes = []common.Hash{head}
  584. tester.headers = map[common.Hash]*types.Header{head: blocks[head].Header()}
  585. tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
  586. tester.lock.Unlock()
  587. headerFetcher := tester.makeHeaderFetcher("lower", blocks, -gatherSlack)
  588. bodyFetcher := tester.makeBodyFetcher("lower", blocks, 0)
  589. fetching := make(chan struct{}, 2)
  590. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} }
  591. // Ensure that a block with a lower number than the threshold is discarded
  592. tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  593. select {
  594. case <-time.After(50 * time.Millisecond):
  595. case <-fetching:
  596. t.Fatalf("fetcher requested stale header")
  597. }
  598. // Ensure that a block with a higher number than the threshold is discarded
  599. tester.fetcher.Notify("higher", hashes[high], blocks[hashes[high]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  600. select {
  601. case <-time.After(50 * time.Millisecond):
  602. case <-fetching:
  603. t.Fatalf("fetcher requested future header")
  604. }
  605. }
  606. // Tests that peers announcing blocks with invalid numbers (i.e. not matching
  607. // the headers provided afterwards) get dropped as malicious.
  608. func TestFullInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, false) }
  609. func TestLightInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, true) }
  610. func testInvalidNumberAnnouncement(t *testing.T, light bool) {
  611. // Create a single block to import and check numbers against
  612. hashes, blocks := makeChain(1, 0, genesis)
  613. tester := newTester(light)
  614. badHeaderFetcher := tester.makeHeaderFetcher("bad", blocks, -gatherSlack)
  615. badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0)
  616. imported := make(chan interface{})
  617. announced := make(chan interface{})
  618. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  619. if light {
  620. if header == nil {
  621. t.Fatalf("Fetcher try to import empty header")
  622. }
  623. imported <- header
  624. } else {
  625. if block == nil {
  626. t.Fatalf("Fetcher try to import empty block")
  627. }
  628. imported <- block
  629. }
  630. }
  631. // Announce a block with a bad number, check for immediate drop
  632. tester.fetcher.announceChangeHook = func(hash common.Hash, b bool) {
  633. announced <- nil
  634. }
  635. tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher)
  636. verifyAnnounce := func() {
  637. for i := 0; i < 2; i++ {
  638. select {
  639. case <-announced:
  640. continue
  641. case <-time.After(1 * time.Second):
  642. t.Fatal("announce timeout")
  643. return
  644. }
  645. }
  646. }
  647. verifyAnnounce()
  648. verifyImportEvent(t, imported, false)
  649. tester.lock.RLock()
  650. dropped := tester.drops["bad"]
  651. tester.lock.RUnlock()
  652. if !dropped {
  653. t.Fatalf("peer with invalid numbered announcement not dropped")
  654. }
  655. goodHeaderFetcher := tester.makeHeaderFetcher("good", blocks, -gatherSlack)
  656. goodBodyFetcher := tester.makeBodyFetcher("good", blocks, 0)
  657. // Make sure a good announcement passes without a drop
  658. tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), goodHeaderFetcher, goodBodyFetcher)
  659. verifyAnnounce()
  660. verifyImportEvent(t, imported, true)
  661. tester.lock.RLock()
  662. dropped = tester.drops["good"]
  663. tester.lock.RUnlock()
  664. if dropped {
  665. t.Fatalf("peer with valid numbered announcement dropped")
  666. }
  667. verifyImportDone(t, imported)
  668. }
  669. // Tests that if a block is empty (i.e. header only), no body request should be
  670. // made, and instead the header should be assembled into a whole block in itself.
  671. func TestEmptyBlockShortCircuit(t *testing.T) {
  672. // Create a chain of blocks to import
  673. hashes, blocks := makeChain(32, 0, genesis)
  674. tester := newTester(false)
  675. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  676. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  677. // Add a monitoring hook for all internal events
  678. fetching := make(chan []common.Hash)
  679. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
  680. completing := make(chan []common.Hash)
  681. tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes }
  682. imported := make(chan interface{})
  683. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  684. if block == nil {
  685. t.Fatalf("Fetcher try to import empty block")
  686. }
  687. imported <- block
  688. }
  689. // Iteratively announce blocks until all are imported
  690. for i := len(hashes) - 2; i >= 0; i-- {
  691. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  692. // All announces should fetch the header
  693. verifyFetchingEvent(t, fetching, true)
  694. // Only blocks with data contents should request bodies
  695. verifyCompletingEvent(t, completing, len(blocks[hashes[i]].Transactions()) > 0 || len(blocks[hashes[i]].Uncles()) > 0)
  696. // Irrelevant of the construct, import should succeed
  697. verifyImportEvent(t, imported, true)
  698. }
  699. verifyImportDone(t, imported)
  700. }
  701. // Tests that a peer is unable to use unbounded memory with sending infinite
  702. // block announcements to a node, but that even in the face of such an attack,
  703. // the fetcher remains operational.
  704. func TestHashMemoryExhaustionAttack(t *testing.T) {
  705. // Create a tester with instrumented import hooks
  706. tester := newTester(false)
  707. imported, announces := make(chan interface{}), int32(0)
  708. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
  709. tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) {
  710. if added {
  711. atomic.AddInt32(&announces, 1)
  712. } else {
  713. atomic.AddInt32(&announces, -1)
  714. }
  715. }
  716. // Create a valid chain and an infinite junk chain
  717. targetBlocks := hashLimit + 2*maxQueueDist
  718. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  719. validHeaderFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  720. validBodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  721. attack, _ := makeChain(targetBlocks, 0, unknownBlock)
  722. attackerHeaderFetcher := tester.makeHeaderFetcher("attacker", nil, -gatherSlack)
  723. attackerBodyFetcher := tester.makeBodyFetcher("attacker", nil, 0)
  724. // Feed the tester a huge hashset from the attacker, and a limited from the valid peer
  725. for i := 0; i < len(attack); i++ {
  726. if i < maxQueueDist {
  727. tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], uint64(i+1), time.Now(), validHeaderFetcher, validBodyFetcher)
  728. }
  729. tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), attackerHeaderFetcher, attackerBodyFetcher)
  730. }
  731. if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist {
  732. t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist)
  733. }
  734. // Wait for fetches to complete
  735. verifyImportCount(t, imported, maxQueueDist)
  736. // Feed the remaining valid hashes to ensure DOS protection state remains clean
  737. for i := len(hashes) - maxQueueDist - 2; i >= 0; i-- {
  738. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), validHeaderFetcher, validBodyFetcher)
  739. verifyImportEvent(t, imported, true)
  740. }
  741. verifyImportDone(t, imported)
  742. }
  743. // Tests that blocks sent to the fetcher (either through propagation or via hash
  744. // announces and retrievals) don't pile up indefinitely, exhausting available
  745. // system memory.
  746. func TestBlockMemoryExhaustionAttack(t *testing.T) {
  747. // Create a tester with instrumented import hooks
  748. tester := newTester(false)
  749. imported, enqueued := make(chan interface{}), int32(0)
  750. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
  751. tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) {
  752. if added {
  753. atomic.AddInt32(&enqueued, 1)
  754. } else {
  755. atomic.AddInt32(&enqueued, -1)
  756. }
  757. }
  758. // Create a valid chain and a batch of dangling (but in range) blocks
  759. targetBlocks := hashLimit + 2*maxQueueDist
  760. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  761. attack := make(map[common.Hash]*types.Block)
  762. for i := byte(0); len(attack) < blockLimit+2*maxQueueDist; i++ {
  763. hashes, blocks := makeChain(maxQueueDist-1, i, unknownBlock)
  764. for _, hash := range hashes[:maxQueueDist-2] {
  765. attack[hash] = blocks[hash]
  766. }
  767. }
  768. // Try to feed all the attacker blocks make sure only a limited batch is accepted
  769. for _, block := range attack {
  770. tester.fetcher.Enqueue("attacker", block)
  771. }
  772. time.Sleep(200 * time.Millisecond)
  773. if queued := atomic.LoadInt32(&enqueued); queued != blockLimit {
  774. t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit)
  775. }
  776. // Queue up a batch of valid blocks, and check that a new peer is allowed to do so
  777. for i := 0; i < maxQueueDist-1; i++ {
  778. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]])
  779. }
  780. time.Sleep(100 * time.Millisecond)
  781. if queued := atomic.LoadInt32(&enqueued); queued != blockLimit+maxQueueDist-1 {
  782. t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1)
  783. }
  784. // Insert the missing piece (and sanity check the import)
  785. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2]])
  786. verifyImportCount(t, imported, maxQueueDist)
  787. // Insert the remaining blocks in chunks to ensure clean DOS protection
  788. for i := maxQueueDist; i < len(hashes)-1; i++ {
  789. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2-i]])
  790. verifyImportEvent(t, imported, true)
  791. }
  792. verifyImportDone(t, imported)
  793. }