block_fetcher_test.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package fetcher
  17. import (
  18. "errors"
  19. "math/big"
  20. "sync"
  21. "sync/atomic"
  22. "testing"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/consensus/ethash"
  26. "github.com/ethereum/go-ethereum/core"
  27. "github.com/ethereum/go-ethereum/core/rawdb"
  28. "github.com/ethereum/go-ethereum/core/types"
  29. "github.com/ethereum/go-ethereum/crypto"
  30. "github.com/ethereum/go-ethereum/eth/protocols/eth"
  31. "github.com/ethereum/go-ethereum/params"
  32. "github.com/ethereum/go-ethereum/trie"
  33. )
  34. var (
  35. testdb = rawdb.NewMemoryDatabase()
  36. testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
  37. testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
  38. gspec = core.Genesis{
  39. Alloc: core.GenesisAlloc{testAddress: {Balance: big.NewInt(1000000000000000)}},
  40. BaseFee: big.NewInt(params.InitialBaseFee),
  41. }
  42. genesis = gspec.MustCommit(testdb)
  43. unknownBlock = types.NewBlock(&types.Header{GasLimit: params.GenesisGasLimit, BaseFee: big.NewInt(params.InitialBaseFee)}, nil, nil, nil, trie.NewStackTrie(nil))
  44. )
  45. // makeChain creates a chain of n blocks starting at and including parent.
  46. // the returned hash chain is ordered head->parent. In addition, every 3rd block
  47. // contains a transaction and every 5th an uncle to allow testing correct block
  48. // reassembly.
  49. func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) {
  50. blocks, _ := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testdb, n, func(i int, block *core.BlockGen) {
  51. block.SetCoinbase(common.Address{seed})
  52. // If the block number is multiple of 3, send a bonus transaction to the miner
  53. if parent == genesis && i%3 == 0 {
  54. signer := types.MakeSigner(params.TestChainConfig, block.Number())
  55. tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey)
  56. if err != nil {
  57. panic(err)
  58. }
  59. block.AddTx(tx)
  60. }
  61. // If the block number is a multiple of 5, add a bonus uncle to the block
  62. if i > 0 && i%5 == 0 {
  63. block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 2).Hash(), Number: big.NewInt(int64(i - 1))})
  64. }
  65. })
  66. hashes := make([]common.Hash, n+1)
  67. hashes[len(hashes)-1] = parent.Hash()
  68. blockm := make(map[common.Hash]*types.Block, n+1)
  69. blockm[parent.Hash()] = parent
  70. for i, b := range blocks {
  71. hashes[len(hashes)-i-2] = b.Hash()
  72. blockm[b.Hash()] = b
  73. }
  74. return hashes, blockm
  75. }
  76. // fetcherTester is a test simulator for mocking out local block chain.
  77. type fetcherTester struct {
  78. fetcher *BlockFetcher
  79. hashes []common.Hash // Hash chain belonging to the tester
  80. headers map[common.Hash]*types.Header // Headers belonging to the tester
  81. blocks map[common.Hash]*types.Block // Blocks belonging to the tester
  82. drops map[string]bool // Map of peers dropped by the fetcher
  83. lock sync.RWMutex
  84. }
  85. // newTester creates a new fetcher test mocker.
  86. func newTester(light bool) *fetcherTester {
  87. tester := &fetcherTester{
  88. hashes: []common.Hash{genesis.Hash()},
  89. headers: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
  90. blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
  91. drops: make(map[string]bool),
  92. }
  93. tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer)
  94. tester.fetcher.Start()
  95. return tester
  96. }
  97. // getHeader retrieves a header from the tester's block chain.
  98. func (f *fetcherTester) getHeader(hash common.Hash) *types.Header {
  99. f.lock.RLock()
  100. defer f.lock.RUnlock()
  101. return f.headers[hash]
  102. }
  103. // getBlock retrieves a block from the tester's block chain.
  104. func (f *fetcherTester) getBlock(hash common.Hash) *types.Block {
  105. f.lock.RLock()
  106. defer f.lock.RUnlock()
  107. return f.blocks[hash]
  108. }
  109. // verifyHeader is a nop placeholder for the block header verification.
  110. func (f *fetcherTester) verifyHeader(header *types.Header) error {
  111. return nil
  112. }
  113. // broadcastBlock is a nop placeholder for the block broadcasting.
  114. func (f *fetcherTester) broadcastBlock(block *types.Block, propagate bool) {
  115. }
  116. // chainHeight retrieves the current height (block number) of the chain.
  117. func (f *fetcherTester) chainHeight() uint64 {
  118. f.lock.RLock()
  119. defer f.lock.RUnlock()
  120. if f.fetcher.light {
  121. return f.headers[f.hashes[len(f.hashes)-1]].Number.Uint64()
  122. }
  123. return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64()
  124. }
  125. // insertChain injects a new headers into the simulated chain.
  126. func (f *fetcherTester) insertHeaders(headers []*types.Header) (int, error) {
  127. f.lock.Lock()
  128. defer f.lock.Unlock()
  129. for i, header := range headers {
  130. // Make sure the parent in known
  131. if _, ok := f.headers[header.ParentHash]; !ok {
  132. return i, errors.New("unknown parent")
  133. }
  134. // Discard any new blocks if the same height already exists
  135. if header.Number.Uint64() <= f.headers[f.hashes[len(f.hashes)-1]].Number.Uint64() {
  136. return i, nil
  137. }
  138. // Otherwise build our current chain
  139. f.hashes = append(f.hashes, header.Hash())
  140. f.headers[header.Hash()] = header
  141. }
  142. return 0, nil
  143. }
  144. // insertChain injects a new blocks into the simulated chain.
  145. func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) {
  146. f.lock.Lock()
  147. defer f.lock.Unlock()
  148. for i, block := range blocks {
  149. // Make sure the parent in known
  150. if _, ok := f.blocks[block.ParentHash()]; !ok {
  151. return i, errors.New("unknown parent")
  152. }
  153. // Discard any new blocks if the same height already exists
  154. if block.NumberU64() <= f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() {
  155. return i, nil
  156. }
  157. // Otherwise build our current chain
  158. f.hashes = append(f.hashes, block.Hash())
  159. f.blocks[block.Hash()] = block
  160. }
  161. return 0, nil
  162. }
  163. // dropPeer is an emulator for the peer removal, simply accumulating the various
  164. // peers dropped by the fetcher.
  165. func (f *fetcherTester) dropPeer(peer string) {
  166. f.lock.Lock()
  167. defer f.lock.Unlock()
  168. f.drops[peer] = true
  169. }
  170. // makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer.
  171. func (f *fetcherTester) makeHeaderFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn {
  172. closure := make(map[common.Hash]*types.Block)
  173. for hash, block := range blocks {
  174. closure[hash] = block
  175. }
  176. // Create a function that return a header from the closure
  177. return func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) {
  178. // Gather the blocks to return
  179. headers := make([]*types.Header, 0, 1)
  180. if block, ok := closure[hash]; ok {
  181. headers = append(headers, block.Header())
  182. }
  183. // Return on a new thread
  184. req := &eth.Request{
  185. Peer: peer,
  186. }
  187. res := &eth.Response{
  188. Req: req,
  189. Res: (*eth.BlockHeadersPacket)(&headers),
  190. Time: drift,
  191. Done: make(chan error, 1), // Ignore the returned status
  192. }
  193. go func() {
  194. sink <- res
  195. }()
  196. return req, nil
  197. }
  198. }
  199. // makeBodyFetcher retrieves a block body fetcher associated with a simulated peer.
  200. func (f *fetcherTester) makeBodyFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn {
  201. closure := make(map[common.Hash]*types.Block)
  202. for hash, block := range blocks {
  203. closure[hash] = block
  204. }
  205. // Create a function that returns blocks from the closure
  206. return func(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) {
  207. // Gather the block bodies to return
  208. transactions := make([][]*types.Transaction, 0, len(hashes))
  209. uncles := make([][]*types.Header, 0, len(hashes))
  210. for _, hash := range hashes {
  211. if block, ok := closure[hash]; ok {
  212. transactions = append(transactions, block.Transactions())
  213. uncles = append(uncles, block.Uncles())
  214. }
  215. }
  216. // Return on a new thread
  217. bodies := make([]*eth.BlockBody, len(transactions))
  218. for i, txs := range transactions {
  219. bodies[i] = &eth.BlockBody{
  220. Transactions: txs,
  221. Uncles: uncles[i],
  222. }
  223. }
  224. req := &eth.Request{
  225. Peer: peer,
  226. }
  227. res := &eth.Response{
  228. Req: req,
  229. Res: (*eth.BlockBodiesPacket)(&bodies),
  230. Time: drift,
  231. Done: make(chan error, 1), // Ignore the returned status
  232. }
  233. go func() {
  234. sink <- res
  235. }()
  236. return req, nil
  237. }
  238. }
  239. // verifyFetchingEvent verifies that one single event arrive on a fetching channel.
  240. func verifyFetchingEvent(t *testing.T, fetching chan []common.Hash, arrive bool) {
  241. t.Helper()
  242. if arrive {
  243. select {
  244. case <-fetching:
  245. case <-time.After(time.Second):
  246. t.Fatalf("fetching timeout")
  247. }
  248. } else {
  249. select {
  250. case <-fetching:
  251. t.Fatalf("fetching invoked")
  252. case <-time.After(10 * time.Millisecond):
  253. }
  254. }
  255. }
  256. // verifyCompletingEvent verifies that one single event arrive on an completing channel.
  257. func verifyCompletingEvent(t *testing.T, completing chan []common.Hash, arrive bool) {
  258. t.Helper()
  259. if arrive {
  260. select {
  261. case <-completing:
  262. case <-time.After(time.Second):
  263. t.Fatalf("completing timeout")
  264. }
  265. } else {
  266. select {
  267. case <-completing:
  268. t.Fatalf("completing invoked")
  269. case <-time.After(10 * time.Millisecond):
  270. }
  271. }
  272. }
  // verifyImportEvent checks a single expectation on the import channel. With
  // arrive set it requires an event within one second; otherwise it requires
  // that no event shows up within 20 milliseconds. A violated expectation fails
  // the test immediately.
  func verifyImportEvent(t *testing.T, imported chan interface{}, arrive bool) {
  	t.Helper()

  	if !arrive {
  		select {
  		case <-imported:
  			t.Fatalf("import invoked")
  		case <-time.After(20 * time.Millisecond):
  		}
  		return
  	}
  	select {
  	case <-imported:
  	case <-time.After(time.Second):
  		t.Fatalf("import timeout")
  	}
  }
  290. // verifyImportCount verifies that exactly count number of events arrive on an
  291. // import hook channel.
  292. func verifyImportCount(t *testing.T, imported chan interface{}, count int) {
  293. t.Helper()
  294. for i := 0; i < count; i++ {
  295. select {
  296. case <-imported:
  297. case <-time.After(time.Second):
  298. t.Fatalf("block %d: import timeout", i+1)
  299. }
  300. }
  301. verifyImportDone(t, imported)
  302. }
  // verifyImportDone requires that the import channel stays silent for 50
  // milliseconds, failing the test immediately if another event shows up.
  func verifyImportDone(t *testing.T, imported chan interface{}) {
  	t.Helper()

  	quiet := time.After(50 * time.Millisecond)
  	select {
  	case <-quiet:
  	case <-imported:
  		t.Fatalf("extra block imported")
  	}
  }
  312. // verifyChainHeight verifies the chain height is as expected.
  313. func verifyChainHeight(t *testing.T, fetcher *fetcherTester, height uint64) {
  314. t.Helper()
  315. if fetcher.chainHeight() != height {
  316. t.Fatalf("chain height mismatch, got %d, want %d", fetcher.chainHeight(), height)
  317. }
  318. }
  319. // Tests that a fetcher accepts block/header announcements and initiates retrievals
  320. // for them, successfully importing into the local chain.
  321. func TestFullSequentialAnnouncements(t *testing.T) { testSequentialAnnouncements(t, false) }
  322. func TestLightSequentialAnnouncements(t *testing.T) { testSequentialAnnouncements(t, true) }
  323. func testSequentialAnnouncements(t *testing.T, light bool) {
  324. // Create a chain of blocks to import
  325. targetBlocks := 4 * hashLimit
  326. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  327. tester := newTester(light)
  328. defer tester.fetcher.Stop()
  329. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  330. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  331. // Iteratively announce blocks until all are imported
  332. imported := make(chan interface{})
  333. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  334. if light {
  335. if header == nil {
  336. t.Fatalf("Fetcher try to import empty header")
  337. }
  338. imported <- header
  339. } else {
  340. if block == nil {
  341. t.Fatalf("Fetcher try to import empty block")
  342. }
  343. imported <- block
  344. }
  345. }
  346. for i := len(hashes) - 2; i >= 0; i-- {
  347. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  348. verifyImportEvent(t, imported, true)
  349. }
  350. verifyImportDone(t, imported)
  351. verifyChainHeight(t, tester, uint64(len(hashes)-1))
  352. }
  353. // Tests that if blocks are announced by multiple peers (or even the same buggy
  354. // peer), they will only get downloaded at most once.
  355. func TestFullConcurrentAnnouncements(t *testing.T) { testConcurrentAnnouncements(t, false) }
  356. func TestLightConcurrentAnnouncements(t *testing.T) { testConcurrentAnnouncements(t, true) }
  357. func testConcurrentAnnouncements(t *testing.T, light bool) {
  358. // Create a chain of blocks to import
  359. targetBlocks := 4 * hashLimit
  360. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  361. // Assemble a tester with a built in counter for the requests
  362. tester := newTester(light)
  363. firstHeaderFetcher := tester.makeHeaderFetcher("first", blocks, -gatherSlack)
  364. firstBodyFetcher := tester.makeBodyFetcher("first", blocks, 0)
  365. secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack)
  366. secondBodyFetcher := tester.makeBodyFetcher("second", blocks, 0)
  367. counter := uint32(0)
  368. firstHeaderWrapper := func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) {
  369. atomic.AddUint32(&counter, 1)
  370. return firstHeaderFetcher(hash, sink)
  371. }
  372. secondHeaderWrapper := func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) {
  373. atomic.AddUint32(&counter, 1)
  374. return secondHeaderFetcher(hash, sink)
  375. }
  376. // Iteratively announce blocks until all are imported
  377. imported := make(chan interface{})
  378. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  379. if light {
  380. if header == nil {
  381. t.Fatalf("Fetcher try to import empty header")
  382. }
  383. imported <- header
  384. } else {
  385. if block == nil {
  386. t.Fatalf("Fetcher try to import empty block")
  387. }
  388. imported <- block
  389. }
  390. }
  391. for i := len(hashes) - 2; i >= 0; i-- {
  392. tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), firstHeaderWrapper, firstBodyFetcher)
  393. tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
  394. tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
  395. verifyImportEvent(t, imported, true)
  396. }
  397. verifyImportDone(t, imported)
  398. // Make sure no blocks were retrieved twice
  399. if int(counter) != targetBlocks {
  400. t.Fatalf("retrieval count mismatch: have %v, want %v", counter, targetBlocks)
  401. }
  402. verifyChainHeight(t, tester, uint64(len(hashes)-1))
  403. }
  404. // Tests that announcements arriving while a previous is being fetched still
  405. // results in a valid import.
  406. func TestFullOverlappingAnnouncements(t *testing.T) { testOverlappingAnnouncements(t, false) }
  407. func TestLightOverlappingAnnouncements(t *testing.T) { testOverlappingAnnouncements(t, true) }
  408. func testOverlappingAnnouncements(t *testing.T, light bool) {
  409. // Create a chain of blocks to import
  410. targetBlocks := 4 * hashLimit
  411. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  412. tester := newTester(light)
  413. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  414. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  415. // Iteratively announce blocks, but overlap them continuously
  416. overlap := 16
  417. imported := make(chan interface{}, len(hashes)-1)
  418. for i := 0; i < overlap; i++ {
  419. imported <- nil
  420. }
  421. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  422. if light {
  423. if header == nil {
  424. t.Fatalf("Fetcher try to import empty header")
  425. }
  426. imported <- header
  427. } else {
  428. if block == nil {
  429. t.Fatalf("Fetcher try to import empty block")
  430. }
  431. imported <- block
  432. }
  433. }
  434. for i := len(hashes) - 2; i >= 0; i-- {
  435. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  436. select {
  437. case <-imported:
  438. case <-time.After(time.Second):
  439. t.Fatalf("block %d: import timeout", len(hashes)-i)
  440. }
  441. }
  442. // Wait for all the imports to complete and check count
  443. verifyImportCount(t, imported, overlap)
  444. verifyChainHeight(t, tester, uint64(len(hashes)-1))
  445. }
  446. // Tests that announces already being retrieved will not be duplicated.
  447. func TestFullPendingDeduplication(t *testing.T) { testPendingDeduplication(t, false) }
  448. func TestLightPendingDeduplication(t *testing.T) { testPendingDeduplication(t, true) }
  449. func testPendingDeduplication(t *testing.T, light bool) {
  450. // Create a hash and corresponding block
  451. hashes, blocks := makeChain(1, 0, genesis)
  452. // Assemble a tester with a built in counter and delayed fetcher
  453. tester := newTester(light)
  454. headerFetcher := tester.makeHeaderFetcher("repeater", blocks, -gatherSlack)
  455. bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0)
  456. delay := 50 * time.Millisecond
  457. counter := uint32(0)
  458. headerWrapper := func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) {
  459. atomic.AddUint32(&counter, 1)
  460. // Simulate a long running fetch
  461. resink := make(chan *eth.Response)
  462. req, err := headerFetcher(hash, resink)
  463. if err == nil {
  464. go func() {
  465. res := <-resink
  466. time.Sleep(delay)
  467. sink <- res
  468. }()
  469. }
  470. return req, err
  471. }
  472. checkNonExist := func() bool {
  473. return tester.getBlock(hashes[0]) == nil
  474. }
  475. if light {
  476. checkNonExist = func() bool {
  477. return tester.getHeader(hashes[0]) == nil
  478. }
  479. }
  480. // Announce the same block many times until it's fetched (wait for any pending ops)
  481. for checkNonExist() {
  482. tester.fetcher.Notify("repeater", hashes[0], 1, time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher)
  483. time.Sleep(time.Millisecond)
  484. }
  485. time.Sleep(delay)
  486. // Check that all blocks were imported and none fetched twice
  487. if int(counter) != 1 {
  488. t.Fatalf("retrieval count mismatch: have %v, want %v", counter, 1)
  489. }
  490. verifyChainHeight(t, tester, 1)
  491. }
  492. // Tests that announcements retrieved in a random order are cached and eventually
  493. // imported when all the gaps are filled in.
  494. func TestFullRandomArrivalImport(t *testing.T) { testRandomArrivalImport(t, false) }
  495. func TestLightRandomArrivalImport(t *testing.T) { testRandomArrivalImport(t, true) }
  496. func testRandomArrivalImport(t *testing.T, light bool) {
  497. // Create a chain of blocks to import, and choose one to delay
  498. targetBlocks := maxQueueDist
  499. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  500. skip := targetBlocks / 2
  501. tester := newTester(light)
  502. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  503. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  504. // Iteratively announce blocks, skipping one entry
  505. imported := make(chan interface{}, len(hashes)-1)
  506. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  507. if light {
  508. if header == nil {
  509. t.Fatalf("Fetcher try to import empty header")
  510. }
  511. imported <- header
  512. } else {
  513. if block == nil {
  514. t.Fatalf("Fetcher try to import empty block")
  515. }
  516. imported <- block
  517. }
  518. }
  519. for i := len(hashes) - 1; i >= 0; i-- {
  520. if i != skip {
  521. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  522. time.Sleep(time.Millisecond)
  523. }
  524. }
  525. // Finally announce the skipped entry and check full import
  526. tester.fetcher.Notify("valid", hashes[skip], uint64(len(hashes)-skip-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  527. verifyImportCount(t, imported, len(hashes)-1)
  528. verifyChainHeight(t, tester, uint64(len(hashes)-1))
  529. }
  530. // Tests that direct block enqueues (due to block propagation vs. hash announce)
  531. // are correctly schedule, filling and import queue gaps.
  532. func TestQueueGapFill(t *testing.T) {
  533. // Create a chain of blocks to import, and choose one to not announce at all
  534. targetBlocks := maxQueueDist
  535. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  536. skip := targetBlocks / 2
  537. tester := newTester(false)
  538. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  539. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  540. // Iteratively announce blocks, skipping one entry
  541. imported := make(chan interface{}, len(hashes)-1)
  542. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
  543. for i := len(hashes) - 1; i >= 0; i-- {
  544. if i != skip {
  545. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  546. time.Sleep(time.Millisecond)
  547. }
  548. }
  549. // Fill the missing block directly as if propagated
  550. tester.fetcher.Enqueue("valid", blocks[hashes[skip]])
  551. verifyImportCount(t, imported, len(hashes)-1)
  552. verifyChainHeight(t, tester, uint64(len(hashes)-1))
  553. }
  554. // Tests that blocks arriving from various sources (multiple propagations, hash
  555. // announces, etc) do not get scheduled for import multiple times.
  556. func TestImportDeduplication(t *testing.T) {
  557. // Create two blocks to import (one for duplication, the other for stalling)
  558. hashes, blocks := makeChain(2, 0, genesis)
  559. // Create the tester and wrap the importer with a counter
  560. tester := newTester(false)
  561. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  562. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  563. counter := uint32(0)
  564. tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) {
  565. atomic.AddUint32(&counter, uint32(len(blocks)))
  566. return tester.insertChain(blocks)
  567. }
  568. // Instrument the fetching and imported events
  569. fetching := make(chan []common.Hash)
  570. imported := make(chan interface{}, len(hashes)-1)
  571. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
  572. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
  573. // Announce the duplicating block, wait for retrieval, and also propagate directly
  574. tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  575. <-fetching
  576. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  577. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  578. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  579. // Fill the missing block directly as if propagated, and check import uniqueness
  580. tester.fetcher.Enqueue("valid", blocks[hashes[1]])
  581. verifyImportCount(t, imported, 2)
  582. if counter != 2 {
  583. t.Fatalf("import invocation count mismatch: have %v, want %v", counter, 2)
  584. }
  585. }
  586. // Tests that blocks with numbers much lower or higher than out current head get
  587. // discarded to prevent wasting resources on useless blocks from faulty peers.
  588. func TestDistantPropagationDiscarding(t *testing.T) {
  589. // Create a long chain to import and define the discard boundaries
  590. hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
  591. head := hashes[len(hashes)/2]
  592. low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
  593. // Create a tester and simulate a head block being the middle of the above chain
  594. tester := newTester(false)
  595. tester.lock.Lock()
  596. tester.hashes = []common.Hash{head}
  597. tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
  598. tester.lock.Unlock()
  599. // Ensure that a block with a lower number than the threshold is discarded
  600. tester.fetcher.Enqueue("lower", blocks[hashes[low]])
  601. time.Sleep(10 * time.Millisecond)
  602. if !tester.fetcher.queue.Empty() {
  603. t.Fatalf("fetcher queued stale block")
  604. }
  605. // Ensure that a block with a higher number than the threshold is discarded
  606. tester.fetcher.Enqueue("higher", blocks[hashes[high]])
  607. time.Sleep(10 * time.Millisecond)
  608. if !tester.fetcher.queue.Empty() {
  609. t.Fatalf("fetcher queued future block")
  610. }
  611. }
  612. // Tests that announcements with numbers much lower or higher than out current
  613. // head get discarded to prevent wasting resources on useless blocks from faulty
  614. // peers.
  615. func TestFullDistantAnnouncementDiscarding(t *testing.T) { testDistantAnnouncementDiscarding(t, false) }
  616. func TestLightDistantAnnouncementDiscarding(t *testing.T) { testDistantAnnouncementDiscarding(t, true) }
  617. func testDistantAnnouncementDiscarding(t *testing.T, light bool) {
  618. // Create a long chain to import and define the discard boundaries
  619. hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
  620. head := hashes[len(hashes)/2]
  621. low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
  622. // Create a tester and simulate a head block being the middle of the above chain
  623. tester := newTester(light)
  624. tester.lock.Lock()
  625. tester.hashes = []common.Hash{head}
  626. tester.headers = map[common.Hash]*types.Header{head: blocks[head].Header()}
  627. tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
  628. tester.lock.Unlock()
  629. headerFetcher := tester.makeHeaderFetcher("lower", blocks, -gatherSlack)
  630. bodyFetcher := tester.makeBodyFetcher("lower", blocks, 0)
  631. fetching := make(chan struct{}, 2)
  632. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} }
  633. // Ensure that a block with a lower number than the threshold is discarded
  634. tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  635. select {
  636. case <-time.After(50 * time.Millisecond):
  637. case <-fetching:
  638. t.Fatalf("fetcher requested stale header")
  639. }
  640. // Ensure that a block with a higher number than the threshold is discarded
  641. tester.fetcher.Notify("higher", hashes[high], blocks[hashes[high]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  642. select {
  643. case <-time.After(50 * time.Millisecond):
  644. case <-fetching:
  645. t.Fatalf("fetcher requested future header")
  646. }
  647. }
  648. // Tests that peers announcing blocks with invalid numbers (i.e. not matching
  649. // the headers provided afterwards) get dropped as malicious.
  650. func TestFullInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, false) }
  651. func TestLightInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, true) }
  652. func testInvalidNumberAnnouncement(t *testing.T, light bool) {
  653. // Create a single block to import and check numbers against
  654. hashes, blocks := makeChain(1, 0, genesis)
  655. tester := newTester(light)
  656. badHeaderFetcher := tester.makeHeaderFetcher("bad", blocks, -gatherSlack)
  657. badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0)
  658. imported := make(chan interface{})
  659. announced := make(chan interface{}, 2)
  660. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  661. if light {
  662. if header == nil {
  663. t.Fatalf("Fetcher try to import empty header")
  664. }
  665. imported <- header
  666. } else {
  667. if block == nil {
  668. t.Fatalf("Fetcher try to import empty block")
  669. }
  670. imported <- block
  671. }
  672. }
  673. // Announce a block with a bad number, check for immediate drop
  674. tester.fetcher.announceChangeHook = func(hash common.Hash, b bool) {
  675. announced <- nil
  676. }
  677. tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher)
  678. verifyAnnounce := func() {
  679. for i := 0; i < 2; i++ {
  680. select {
  681. case <-announced:
  682. continue
  683. case <-time.After(1 * time.Second):
  684. t.Fatal("announce timeout")
  685. return
  686. }
  687. }
  688. }
  689. verifyAnnounce()
  690. verifyImportEvent(t, imported, false)
  691. tester.lock.RLock()
  692. dropped := tester.drops["bad"]
  693. tester.lock.RUnlock()
  694. if !dropped {
  695. t.Fatalf("peer with invalid numbered announcement not dropped")
  696. }
  697. goodHeaderFetcher := tester.makeHeaderFetcher("good", blocks, -gatherSlack)
  698. goodBodyFetcher := tester.makeBodyFetcher("good", blocks, 0)
  699. // Make sure a good announcement passes without a drop
  700. tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), goodHeaderFetcher, goodBodyFetcher)
  701. verifyAnnounce()
  702. verifyImportEvent(t, imported, true)
  703. tester.lock.RLock()
  704. dropped = tester.drops["good"]
  705. tester.lock.RUnlock()
  706. if dropped {
  707. t.Fatalf("peer with valid numbered announcement dropped")
  708. }
  709. verifyImportDone(t, imported)
  710. }
  711. // Tests that if a block is empty (i.e. header only), no body request should be
  712. // made, and instead the header should be assembled into a whole block in itself.
  713. func TestEmptyBlockShortCircuit(t *testing.T) {
  714. // Create a chain of blocks to import
  715. hashes, blocks := makeChain(32, 0, genesis)
  716. tester := newTester(false)
  717. defer tester.fetcher.Stop()
  718. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  719. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  720. // Add a monitoring hook for all internal events
  721. fetching := make(chan []common.Hash)
  722. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
  723. completing := make(chan []common.Hash)
  724. tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes }
  725. imported := make(chan interface{})
  726. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
  727. if block == nil {
  728. t.Fatalf("Fetcher try to import empty block")
  729. }
  730. imported <- block
  731. }
  732. // Iteratively announce blocks until all are imported
  733. for i := len(hashes) - 2; i >= 0; i-- {
  734. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  735. // All announces should fetch the header
  736. verifyFetchingEvent(t, fetching, true)
  737. // Only blocks with data contents should request bodies
  738. verifyCompletingEvent(t, completing, len(blocks[hashes[i]].Transactions()) > 0 || len(blocks[hashes[i]].Uncles()) > 0)
  739. // Irrelevant of the construct, import should succeed
  740. verifyImportEvent(t, imported, true)
  741. }
  742. verifyImportDone(t, imported)
  743. }
  744. // Tests that a peer is unable to use unbounded memory with sending infinite
  745. // block announcements to a node, but that even in the face of such an attack,
  746. // the fetcher remains operational.
  747. func TestHashMemoryExhaustionAttack(t *testing.T) {
  748. // Create a tester with instrumented import hooks
  749. tester := newTester(false)
  750. imported, announces := make(chan interface{}), int32(0)
  751. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
  752. tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) {
  753. if added {
  754. atomic.AddInt32(&announces, 1)
  755. } else {
  756. atomic.AddInt32(&announces, -1)
  757. }
  758. }
  759. // Create a valid chain and an infinite junk chain
  760. targetBlocks := hashLimit + 2*maxQueueDist
  761. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  762. validHeaderFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  763. validBodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  764. attack, _ := makeChain(targetBlocks, 0, unknownBlock)
  765. attackerHeaderFetcher := tester.makeHeaderFetcher("attacker", nil, -gatherSlack)
  766. attackerBodyFetcher := tester.makeBodyFetcher("attacker", nil, 0)
  767. // Feed the tester a huge hashset from the attacker, and a limited from the valid peer
  768. for i := 0; i < len(attack); i++ {
  769. if i < maxQueueDist {
  770. tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], uint64(i+1), time.Now(), validHeaderFetcher, validBodyFetcher)
  771. }
  772. tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), attackerHeaderFetcher, attackerBodyFetcher)
  773. }
  774. if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist {
  775. t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist)
  776. }
  777. // Wait for fetches to complete
  778. verifyImportCount(t, imported, maxQueueDist)
  779. // Feed the remaining valid hashes to ensure DOS protection state remains clean
  780. for i := len(hashes) - maxQueueDist - 2; i >= 0; i-- {
  781. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), validHeaderFetcher, validBodyFetcher)
  782. verifyImportEvent(t, imported, true)
  783. }
  784. verifyImportDone(t, imported)
  785. }
  786. // Tests that blocks sent to the fetcher (either through propagation or via hash
  787. // announces and retrievals) don't pile up indefinitely, exhausting available
  788. // system memory.
  789. func TestBlockMemoryExhaustionAttack(t *testing.T) {
  790. // Create a tester with instrumented import hooks
  791. tester := newTester(false)
  792. imported, enqueued := make(chan interface{}), int32(0)
  793. tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
  794. tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) {
  795. if added {
  796. atomic.AddInt32(&enqueued, 1)
  797. } else {
  798. atomic.AddInt32(&enqueued, -1)
  799. }
  800. }
  801. // Create a valid chain and a batch of dangling (but in range) blocks
  802. targetBlocks := hashLimit + 2*maxQueueDist
  803. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  804. attack := make(map[common.Hash]*types.Block)
  805. for i := byte(0); len(attack) < blockLimit+2*maxQueueDist; i++ {
  806. hashes, blocks := makeChain(maxQueueDist-1, i, unknownBlock)
  807. for _, hash := range hashes[:maxQueueDist-2] {
  808. attack[hash] = blocks[hash]
  809. }
  810. }
  811. // Try to feed all the attacker blocks make sure only a limited batch is accepted
  812. for _, block := range attack {
  813. tester.fetcher.Enqueue("attacker", block)
  814. }
  815. time.Sleep(200 * time.Millisecond)
  816. if queued := atomic.LoadInt32(&enqueued); queued != blockLimit {
  817. t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit)
  818. }
  819. // Queue up a batch of valid blocks, and check that a new peer is allowed to do so
  820. for i := 0; i < maxQueueDist-1; i++ {
  821. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]])
  822. }
  823. time.Sleep(100 * time.Millisecond)
  824. if queued := atomic.LoadInt32(&enqueued); queued != blockLimit+maxQueueDist-1 {
  825. t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1)
  826. }
  827. // Insert the missing piece (and sanity check the import)
  828. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2]])
  829. verifyImportCount(t, imported, maxQueueDist)
  830. // Insert the remaining blocks in chunks to ensure clean DOS protection
  831. for i := maxQueueDist; i < len(hashes)-1; i++ {
  832. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2-i]])
  833. verifyImportEvent(t, imported, true)
  834. }
  835. verifyImportDone(t, imported)
  836. }