fetcher_test.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package fetcher
  17. import (
  18. "errors"
  19. "math/big"
  20. "sync"
  21. "sync/atomic"
  22. "testing"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/core"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/crypto"
  28. "github.com/ethereum/go-ethereum/ethdb"
  29. "github.com/ethereum/go-ethereum/params"
  30. )
  31. var (
  32. testdb, _ = ethdb.NewMemDatabase()
  33. testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
  34. testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
  35. genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000))
  36. unknownBlock = types.NewBlock(&types.Header{GasLimit: params.GenesisGasLimit}, nil, nil, nil)
  37. )
  38. // makeChain creates a chain of n blocks starting at and including parent.
  39. // the returned hash chain is ordered head->parent. In addition, every 3rd block
  40. // contains a transaction and every 5th an uncle to allow testing correct block
  41. // reassembly.
  42. func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) {
  43. blocks, _ := core.GenerateChain(params.TestChainConfig, parent, testdb, n, func(i int, block *core.BlockGen) {
  44. block.SetCoinbase(common.Address{seed})
  45. // If the block number is multiple of 3, send a bonus transaction to the miner
  46. if parent == genesis && i%3 == 0 {
  47. signer := types.MakeSigner(params.TestChainConfig, block.Number())
  48. tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey)
  49. if err != nil {
  50. panic(err)
  51. }
  52. block.AddTx(tx)
  53. }
  54. // If the block number is a multiple of 5, add a bonus uncle to the block
  55. if i%5 == 0 {
  56. block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 1).Hash(), Number: big.NewInt(int64(i - 1))})
  57. }
  58. })
  59. hashes := make([]common.Hash, n+1)
  60. hashes[len(hashes)-1] = parent.Hash()
  61. blockm := make(map[common.Hash]*types.Block, n+1)
  62. blockm[parent.Hash()] = parent
  63. for i, b := range blocks {
  64. hashes[len(hashes)-i-2] = b.Hash()
  65. blockm[b.Hash()] = b
  66. }
  67. return hashes, blockm
  68. }
  69. // fetcherTester is a test simulator for mocking out local block chain.
  70. type fetcherTester struct {
  71. fetcher *Fetcher
  72. hashes []common.Hash // Hash chain belonging to the tester
  73. blocks map[common.Hash]*types.Block // Blocks belonging to the tester
  74. drops map[string]bool // Map of peers dropped by the fetcher
  75. lock sync.RWMutex
  76. }
  77. // newTester creates a new fetcher test mocker.
  78. func newTester() *fetcherTester {
  79. tester := &fetcherTester{
  80. hashes: []common.Hash{genesis.Hash()},
  81. blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
  82. drops: make(map[string]bool),
  83. }
  84. tester.fetcher = New(tester.getBlock, tester.verifyBlock, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer)
  85. tester.fetcher.Start()
  86. return tester
  87. }
  88. // getBlock retrieves a block from the tester's block chain.
  89. func (f *fetcherTester) getBlock(hash common.Hash) *types.Block {
  90. f.lock.RLock()
  91. defer f.lock.RUnlock()
  92. return f.blocks[hash]
  93. }
  94. // verifyBlock is a nop placeholder for the block header verification.
  95. func (f *fetcherTester) verifyBlock(block *types.Block, parent *types.Block) error {
  96. return nil
  97. }
  98. // broadcastBlock is a nop placeholder for the block broadcasting.
  99. func (f *fetcherTester) broadcastBlock(block *types.Block, propagate bool) {
  100. }
  101. // chainHeight retrieves the current height (block number) of the chain.
  102. func (f *fetcherTester) chainHeight() uint64 {
  103. f.lock.RLock()
  104. defer f.lock.RUnlock()
  105. return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64()
  106. }
  107. // insertChain injects a new blocks into the simulated chain.
  108. func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) {
  109. f.lock.Lock()
  110. defer f.lock.Unlock()
  111. for i, block := range blocks {
  112. // Make sure the parent in known
  113. if _, ok := f.blocks[block.ParentHash()]; !ok {
  114. return i, errors.New("unknown parent")
  115. }
  116. // Discard any new blocks if the same height already exists
  117. if block.NumberU64() <= f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() {
  118. return i, nil
  119. }
  120. // Otherwise build our current chain
  121. f.hashes = append(f.hashes, block.Hash())
  122. f.blocks[block.Hash()] = block
  123. }
  124. return 0, nil
  125. }
  126. // dropPeer is an emulator for the peer removal, simply accumulating the various
  127. // peers dropped by the fetcher.
  128. func (f *fetcherTester) dropPeer(peer string) {
  129. f.lock.Lock()
  130. defer f.lock.Unlock()
  131. f.drops[peer] = true
  132. }
  133. // makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer.
  134. func (f *fetcherTester) makeHeaderFetcher(blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn {
  135. closure := make(map[common.Hash]*types.Block)
  136. for hash, block := range blocks {
  137. closure[hash] = block
  138. }
  139. // Create a function that return a header from the closure
  140. return func(hash common.Hash) error {
  141. // Gather the blocks to return
  142. headers := make([]*types.Header, 0, 1)
  143. if block, ok := closure[hash]; ok {
  144. headers = append(headers, block.Header())
  145. }
  146. // Return on a new thread
  147. go f.fetcher.FilterHeaders(headers, time.Now().Add(drift))
  148. return nil
  149. }
  150. }
  151. // makeBodyFetcher retrieves a block body fetcher associated with a simulated peer.
  152. func (f *fetcherTester) makeBodyFetcher(blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn {
  153. closure := make(map[common.Hash]*types.Block)
  154. for hash, block := range blocks {
  155. closure[hash] = block
  156. }
  157. // Create a function that returns blocks from the closure
  158. return func(hashes []common.Hash) error {
  159. // Gather the block bodies to return
  160. transactions := make([][]*types.Transaction, 0, len(hashes))
  161. uncles := make([][]*types.Header, 0, len(hashes))
  162. for _, hash := range hashes {
  163. if block, ok := closure[hash]; ok {
  164. transactions = append(transactions, block.Transactions())
  165. uncles = append(uncles, block.Uncles())
  166. }
  167. }
  168. // Return on a new thread
  169. go f.fetcher.FilterBodies(transactions, uncles, time.Now().Add(drift))
  170. return nil
  171. }
  172. }
  173. // verifyFetchingEvent verifies that one single event arrive on an fetching channel.
  174. func verifyFetchingEvent(t *testing.T, fetching chan []common.Hash, arrive bool) {
  175. if arrive {
  176. select {
  177. case <-fetching:
  178. case <-time.After(time.Second):
  179. t.Fatalf("fetching timeout")
  180. }
  181. } else {
  182. select {
  183. case <-fetching:
  184. t.Fatalf("fetching invoked")
  185. case <-time.After(10 * time.Millisecond):
  186. }
  187. }
  188. }
  189. // verifyCompletingEvent verifies that one single event arrive on an completing channel.
  190. func verifyCompletingEvent(t *testing.T, completing chan []common.Hash, arrive bool) {
  191. if arrive {
  192. select {
  193. case <-completing:
  194. case <-time.After(time.Second):
  195. t.Fatalf("completing timeout")
  196. }
  197. } else {
  198. select {
  199. case <-completing:
  200. t.Fatalf("completing invoked")
  201. case <-time.After(10 * time.Millisecond):
  202. }
  203. }
  204. }
  205. // verifyImportEvent verifies that one single event arrive on an import channel.
  206. func verifyImportEvent(t *testing.T, imported chan *types.Block, arrive bool) {
  207. if arrive {
  208. select {
  209. case <-imported:
  210. case <-time.After(time.Second):
  211. t.Fatalf("import timeout")
  212. }
  213. } else {
  214. select {
  215. case <-imported:
  216. t.Fatalf("import invoked")
  217. case <-time.After(10 * time.Millisecond):
  218. }
  219. }
  220. }
  221. // verifyImportCount verifies that exactly count number of events arrive on an
  222. // import hook channel.
  223. func verifyImportCount(t *testing.T, imported chan *types.Block, count int) {
  224. for i := 0; i < count; i++ {
  225. select {
  226. case <-imported:
  227. case <-time.After(time.Second):
  228. t.Fatalf("block %d: import timeout", i+1)
  229. }
  230. }
  231. verifyImportDone(t, imported)
  232. }
  233. // verifyImportDone verifies that no more events are arriving on an import channel.
  234. func verifyImportDone(t *testing.T, imported chan *types.Block) {
  235. select {
  236. case <-imported:
  237. t.Fatalf("extra block imported")
  238. case <-time.After(50 * time.Millisecond):
  239. }
  240. }
  241. // Tests that a fetcher accepts block announcements and initiates retrievals for
  242. // them, successfully importing into the local chain.
  243. func TestSequentialAnnouncements62(t *testing.T) { testSequentialAnnouncements(t, 62) }
  244. func TestSequentialAnnouncements63(t *testing.T) { testSequentialAnnouncements(t, 63) }
  245. func TestSequentialAnnouncements64(t *testing.T) { testSequentialAnnouncements(t, 64) }
  246. func testSequentialAnnouncements(t *testing.T, protocol int) {
  247. // Create a chain of blocks to import
  248. targetBlocks := 4 * hashLimit
  249. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  250. tester := newTester()
  251. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  252. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  253. // Iteratively announce blocks until all are imported
  254. imported := make(chan *types.Block)
  255. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  256. for i := len(hashes) - 2; i >= 0; i-- {
  257. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  258. verifyImportEvent(t, imported, true)
  259. }
  260. verifyImportDone(t, imported)
  261. }
  262. // Tests that if blocks are announced by multiple peers (or even the same buggy
  263. // peer), they will only get downloaded at most once.
  264. func TestConcurrentAnnouncements62(t *testing.T) { testConcurrentAnnouncements(t, 62) }
  265. func TestConcurrentAnnouncements63(t *testing.T) { testConcurrentAnnouncements(t, 63) }
  266. func TestConcurrentAnnouncements64(t *testing.T) { testConcurrentAnnouncements(t, 64) }
  267. func testConcurrentAnnouncements(t *testing.T, protocol int) {
  268. // Create a chain of blocks to import
  269. targetBlocks := 4 * hashLimit
  270. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  271. // Assemble a tester with a built in counter for the requests
  272. tester := newTester()
  273. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  274. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  275. counter := uint32(0)
  276. headerWrapper := func(hash common.Hash) error {
  277. atomic.AddUint32(&counter, 1)
  278. return headerFetcher(hash)
  279. }
  280. // Iteratively announce blocks until all are imported
  281. imported := make(chan *types.Block)
  282. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  283. for i := len(hashes) - 2; i >= 0; i-- {
  284. tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher)
  285. tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), headerWrapper, bodyFetcher)
  286. tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), headerWrapper, bodyFetcher)
  287. verifyImportEvent(t, imported, true)
  288. }
  289. verifyImportDone(t, imported)
  290. // Make sure no blocks were retrieved twice
  291. if int(counter) != targetBlocks {
  292. t.Fatalf("retrieval count mismatch: have %v, want %v", counter, targetBlocks)
  293. }
  294. }
  295. // Tests that announcements arriving while a previous is being fetched still
  296. // results in a valid import.
  297. func TestOverlappingAnnouncements62(t *testing.T) { testOverlappingAnnouncements(t, 62) }
  298. func TestOverlappingAnnouncements63(t *testing.T) { testOverlappingAnnouncements(t, 63) }
  299. func TestOverlappingAnnouncements64(t *testing.T) { testOverlappingAnnouncements(t, 64) }
  300. func testOverlappingAnnouncements(t *testing.T, protocol int) {
  301. // Create a chain of blocks to import
  302. targetBlocks := 4 * hashLimit
  303. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  304. tester := newTester()
  305. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  306. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  307. // Iteratively announce blocks, but overlap them continuously
  308. overlap := 16
  309. imported := make(chan *types.Block, len(hashes)-1)
  310. for i := 0; i < overlap; i++ {
  311. imported <- nil
  312. }
  313. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  314. for i := len(hashes) - 2; i >= 0; i-- {
  315. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  316. select {
  317. case <-imported:
  318. case <-time.After(time.Second):
  319. t.Fatalf("block %d: import timeout", len(hashes)-i)
  320. }
  321. }
  322. // Wait for all the imports to complete and check count
  323. verifyImportCount(t, imported, overlap)
  324. }
  325. // Tests that announces already being retrieved will not be duplicated.
  326. func TestPendingDeduplication62(t *testing.T) { testPendingDeduplication(t, 62) }
  327. func TestPendingDeduplication63(t *testing.T) { testPendingDeduplication(t, 63) }
  328. func TestPendingDeduplication64(t *testing.T) { testPendingDeduplication(t, 64) }
  329. func testPendingDeduplication(t *testing.T, protocol int) {
  330. // Create a hash and corresponding block
  331. hashes, blocks := makeChain(1, 0, genesis)
  332. // Assemble a tester with a built in counter and delayed fetcher
  333. tester := newTester()
  334. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  335. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  336. delay := 50 * time.Millisecond
  337. counter := uint32(0)
  338. headerWrapper := func(hash common.Hash) error {
  339. atomic.AddUint32(&counter, 1)
  340. // Simulate a long running fetch
  341. go func() {
  342. time.Sleep(delay)
  343. headerFetcher(hash)
  344. }()
  345. return nil
  346. }
  347. // Announce the same block many times until it's fetched (wait for any pending ops)
  348. for tester.getBlock(hashes[0]) == nil {
  349. tester.fetcher.Notify("repeater", hashes[0], 1, time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher)
  350. time.Sleep(time.Millisecond)
  351. }
  352. time.Sleep(delay)
  353. // Check that all blocks were imported and none fetched twice
  354. if imported := len(tester.blocks); imported != 2 {
  355. t.Fatalf("synchronised block mismatch: have %v, want %v", imported, 2)
  356. }
  357. if int(counter) != 1 {
  358. t.Fatalf("retrieval count mismatch: have %v, want %v", counter, 1)
  359. }
  360. }
  361. // Tests that announcements retrieved in a random order are cached and eventually
  362. // imported when all the gaps are filled in.
  363. func TestRandomArrivalImport62(t *testing.T) { testRandomArrivalImport(t, 62) }
  364. func TestRandomArrivalImport63(t *testing.T) { testRandomArrivalImport(t, 63) }
  365. func TestRandomArrivalImport64(t *testing.T) { testRandomArrivalImport(t, 64) }
  366. func testRandomArrivalImport(t *testing.T, protocol int) {
  367. // Create a chain of blocks to import, and choose one to delay
  368. targetBlocks := maxQueueDist
  369. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  370. skip := targetBlocks / 2
  371. tester := newTester()
  372. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  373. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  374. // Iteratively announce blocks, skipping one entry
  375. imported := make(chan *types.Block, len(hashes)-1)
  376. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  377. for i := len(hashes) - 1; i >= 0; i-- {
  378. if i != skip {
  379. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  380. time.Sleep(time.Millisecond)
  381. }
  382. }
  383. // Finally announce the skipped entry and check full import
  384. tester.fetcher.Notify("valid", hashes[skip], uint64(len(hashes)-skip-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  385. verifyImportCount(t, imported, len(hashes)-1)
  386. }
  387. // Tests that direct block enqueues (due to block propagation vs. hash announce)
  388. // are correctly schedule, filling and import queue gaps.
  389. func TestQueueGapFill62(t *testing.T) { testQueueGapFill(t, 62) }
  390. func TestQueueGapFill63(t *testing.T) { testQueueGapFill(t, 63) }
  391. func TestQueueGapFill64(t *testing.T) { testQueueGapFill(t, 64) }
  392. func testQueueGapFill(t *testing.T, protocol int) {
  393. // Create a chain of blocks to import, and choose one to not announce at all
  394. targetBlocks := maxQueueDist
  395. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  396. skip := targetBlocks / 2
  397. tester := newTester()
  398. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  399. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  400. // Iteratively announce blocks, skipping one entry
  401. imported := make(chan *types.Block, len(hashes)-1)
  402. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  403. for i := len(hashes) - 1; i >= 0; i-- {
  404. if i != skip {
  405. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  406. time.Sleep(time.Millisecond)
  407. }
  408. }
  409. // Fill the missing block directly as if propagated
  410. tester.fetcher.Enqueue("valid", blocks[hashes[skip]])
  411. verifyImportCount(t, imported, len(hashes)-1)
  412. }
  413. // Tests that blocks arriving from various sources (multiple propagations, hash
  414. // announces, etc) do not get scheduled for import multiple times.
  415. func TestImportDeduplication62(t *testing.T) { testImportDeduplication(t, 62) }
  416. func TestImportDeduplication63(t *testing.T) { testImportDeduplication(t, 63) }
  417. func TestImportDeduplication64(t *testing.T) { testImportDeduplication(t, 64) }
  418. func testImportDeduplication(t *testing.T, protocol int) {
  419. // Create two blocks to import (one for duplication, the other for stalling)
  420. hashes, blocks := makeChain(2, 0, genesis)
  421. // Create the tester and wrap the importer with a counter
  422. tester := newTester()
  423. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  424. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  425. counter := uint32(0)
  426. tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) {
  427. atomic.AddUint32(&counter, uint32(len(blocks)))
  428. return tester.insertChain(blocks)
  429. }
  430. // Instrument the fetching and imported events
  431. fetching := make(chan []common.Hash)
  432. imported := make(chan *types.Block, len(hashes)-1)
  433. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
  434. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  435. // Announce the duplicating block, wait for retrieval, and also propagate directly
  436. tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  437. <-fetching
  438. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  439. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  440. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  441. // Fill the missing block directly as if propagated, and check import uniqueness
  442. tester.fetcher.Enqueue("valid", blocks[hashes[1]])
  443. verifyImportCount(t, imported, 2)
  444. if counter != 2 {
  445. t.Fatalf("import invocation count mismatch: have %v, want %v", counter, 2)
  446. }
  447. }
  448. // Tests that blocks with numbers much lower or higher than out current head get
  449. // discarded to prevent wasting resources on useless blocks from faulty peers.
  450. func TestDistantPropagationDiscarding(t *testing.T) {
  451. // Create a long chain to import and define the discard boundaries
  452. hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
  453. head := hashes[len(hashes)/2]
  454. low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
  455. // Create a tester and simulate a head block being the middle of the above chain
  456. tester := newTester()
  457. tester.lock.Lock()
  458. tester.hashes = []common.Hash{head}
  459. tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
  460. tester.lock.Unlock()
  461. // Ensure that a block with a lower number than the threshold is discarded
  462. tester.fetcher.Enqueue("lower", blocks[hashes[low]])
  463. time.Sleep(10 * time.Millisecond)
  464. if !tester.fetcher.queue.Empty() {
  465. t.Fatalf("fetcher queued stale block")
  466. }
  467. // Ensure that a block with a higher number than the threshold is discarded
  468. tester.fetcher.Enqueue("higher", blocks[hashes[high]])
  469. time.Sleep(10 * time.Millisecond)
  470. if !tester.fetcher.queue.Empty() {
  471. t.Fatalf("fetcher queued future block")
  472. }
  473. }
  474. // Tests that announcements with numbers much lower or higher than out current
  475. // head get discarded to prevent wasting resources on useless blocks from faulty
  476. // peers.
  477. func TestDistantAnnouncementDiscarding62(t *testing.T) { testDistantAnnouncementDiscarding(t, 62) }
  478. func TestDistantAnnouncementDiscarding63(t *testing.T) { testDistantAnnouncementDiscarding(t, 63) }
  479. func TestDistantAnnouncementDiscarding64(t *testing.T) { testDistantAnnouncementDiscarding(t, 64) }
  480. func testDistantAnnouncementDiscarding(t *testing.T, protocol int) {
  481. // Create a long chain to import and define the discard boundaries
  482. hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
  483. head := hashes[len(hashes)/2]
  484. low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
  485. // Create a tester and simulate a head block being the middle of the above chain
  486. tester := newTester()
  487. tester.lock.Lock()
  488. tester.hashes = []common.Hash{head}
  489. tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
  490. tester.lock.Unlock()
  491. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  492. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  493. fetching := make(chan struct{}, 2)
  494. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} }
  495. // Ensure that a block with a lower number than the threshold is discarded
  496. tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  497. select {
  498. case <-time.After(50 * time.Millisecond):
  499. case <-fetching:
  500. t.Fatalf("fetcher requested stale header")
  501. }
  502. // Ensure that a block with a higher number than the threshold is discarded
  503. tester.fetcher.Notify("higher", hashes[high], blocks[hashes[high]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  504. select {
  505. case <-time.After(50 * time.Millisecond):
  506. case <-fetching:
  507. t.Fatalf("fetcher requested future header")
  508. }
  509. }
  510. // Tests that peers announcing blocks with invalid numbers (i.e. not matching
  511. // the headers provided afterwards) get dropped as malicious.
  512. func TestInvalidNumberAnnouncement62(t *testing.T) { testInvalidNumberAnnouncement(t, 62) }
  513. func TestInvalidNumberAnnouncement63(t *testing.T) { testInvalidNumberAnnouncement(t, 63) }
  514. func TestInvalidNumberAnnouncement64(t *testing.T) { testInvalidNumberAnnouncement(t, 64) }
  515. func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
  516. // Create a single block to import and check numbers against
  517. hashes, blocks := makeChain(1, 0, genesis)
  518. tester := newTester()
  519. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  520. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  521. imported := make(chan *types.Block)
  522. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  523. // Announce a block with a bad number, check for immediate drop
  524. tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  525. verifyImportEvent(t, imported, false)
  526. tester.lock.RLock()
  527. dropped := tester.drops["bad"]
  528. tester.lock.RUnlock()
  529. if !dropped {
  530. t.Fatalf("peer with invalid numbered announcement not dropped")
  531. }
  532. // Make sure a good announcement passes without a drop
  533. tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  534. verifyImportEvent(t, imported, true)
  535. tester.lock.RLock()
  536. dropped = tester.drops["good"]
  537. tester.lock.RUnlock()
  538. if dropped {
  539. t.Fatalf("peer with valid numbered announcement dropped")
  540. }
  541. verifyImportDone(t, imported)
  542. }
  543. // Tests that if a block is empty (i.e. header only), no body request should be
  544. // made, and instead the header should be assembled into a whole block in itself.
  545. func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) }
  546. func TestEmptyBlockShortCircuit63(t *testing.T) { testEmptyBlockShortCircuit(t, 63) }
  547. func TestEmptyBlockShortCircuit64(t *testing.T) { testEmptyBlockShortCircuit(t, 64) }
  548. func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
  549. // Create a chain of blocks to import
  550. hashes, blocks := makeChain(32, 0, genesis)
  551. tester := newTester()
  552. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  553. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  554. // Add a monitoring hook for all internal events
  555. fetching := make(chan []common.Hash)
  556. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
  557. completing := make(chan []common.Hash)
  558. tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes }
  559. imported := make(chan *types.Block)
  560. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  561. // Iteratively announce blocks until all are imported
  562. for i := len(hashes) - 2; i >= 0; i-- {
  563. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  564. // All announces should fetch the header
  565. verifyFetchingEvent(t, fetching, true)
  566. // Only blocks with data contents should request bodies
  567. verifyCompletingEvent(t, completing, len(blocks[hashes[i]].Transactions()) > 0 || len(blocks[hashes[i]].Uncles()) > 0)
  568. // Irrelevant of the construct, import should succeed
  569. verifyImportEvent(t, imported, true)
  570. }
  571. verifyImportDone(t, imported)
  572. }
  573. // Tests that a peer is unable to use unbounded memory with sending infinite
  574. // block announcements to a node, but that even in the face of such an attack,
  575. // the fetcher remains operational.
  576. func TestHashMemoryExhaustionAttack62(t *testing.T) { testHashMemoryExhaustionAttack(t, 62) }
  577. func TestHashMemoryExhaustionAttack63(t *testing.T) { testHashMemoryExhaustionAttack(t, 63) }
  578. func TestHashMemoryExhaustionAttack64(t *testing.T) { testHashMemoryExhaustionAttack(t, 64) }
  579. func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
  580. // Create a tester with instrumented import hooks
  581. tester := newTester()
  582. imported, announces := make(chan *types.Block), int32(0)
  583. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  584. tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) {
  585. if added {
  586. atomic.AddInt32(&announces, 1)
  587. } else {
  588. atomic.AddInt32(&announces, -1)
  589. }
  590. }
  591. // Create a valid chain and an infinite junk chain
  592. targetBlocks := hashLimit + 2*maxQueueDist
  593. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  594. validHeaderFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  595. validBodyFetcher := tester.makeBodyFetcher(blocks, 0)
  596. attack, _ := makeChain(targetBlocks, 0, unknownBlock)
  597. attackerHeaderFetcher := tester.makeHeaderFetcher(nil, -gatherSlack)
  598. attackerBodyFetcher := tester.makeBodyFetcher(nil, 0)
  599. // Feed the tester a huge hashset from the attacker, and a limited from the valid peer
  600. for i := 0; i < len(attack); i++ {
  601. if i < maxQueueDist {
  602. tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], uint64(i+1), time.Now(), validHeaderFetcher, validBodyFetcher)
  603. }
  604. tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), attackerHeaderFetcher, attackerBodyFetcher)
  605. }
  606. if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist {
  607. t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist)
  608. }
  609. // Wait for fetches to complete
  610. verifyImportCount(t, imported, maxQueueDist)
  611. // Feed the remaining valid hashes to ensure DOS protection state remains clean
  612. for i := len(hashes) - maxQueueDist - 2; i >= 0; i-- {
  613. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), validHeaderFetcher, validBodyFetcher)
  614. verifyImportEvent(t, imported, true)
  615. }
  616. verifyImportDone(t, imported)
  617. }
  618. // Tests that blocks sent to the fetcher (either through propagation or via hash
  619. // announces and retrievals) don't pile up indefinitely, exhausting available
  620. // system memory.
  621. func TestBlockMemoryExhaustionAttack(t *testing.T) {
  622. // Create a tester with instrumented import hooks
  623. tester := newTester()
  624. imported, enqueued := make(chan *types.Block), int32(0)
  625. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  626. tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) {
  627. if added {
  628. atomic.AddInt32(&enqueued, 1)
  629. } else {
  630. atomic.AddInt32(&enqueued, -1)
  631. }
  632. }
  633. // Create a valid chain and a batch of dangling (but in range) blocks
  634. targetBlocks := hashLimit + 2*maxQueueDist
  635. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  636. attack := make(map[common.Hash]*types.Block)
  637. for i := byte(0); len(attack) < blockLimit+2*maxQueueDist; i++ {
  638. hashes, blocks := makeChain(maxQueueDist-1, i, unknownBlock)
  639. for _, hash := range hashes[:maxQueueDist-2] {
  640. attack[hash] = blocks[hash]
  641. }
  642. }
  643. // Try to feed all the attacker blocks make sure only a limited batch is accepted
  644. for _, block := range attack {
  645. tester.fetcher.Enqueue("attacker", block)
  646. }
  647. time.Sleep(200 * time.Millisecond)
  648. if queued := atomic.LoadInt32(&enqueued); queued != blockLimit {
  649. t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit)
  650. }
  651. // Queue up a batch of valid blocks, and check that a new peer is allowed to do so
  652. for i := 0; i < maxQueueDist-1; i++ {
  653. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]])
  654. }
  655. time.Sleep(100 * time.Millisecond)
  656. if queued := atomic.LoadInt32(&enqueued); queued != blockLimit+maxQueueDist-1 {
  657. t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1)
  658. }
  659. // Insert the missing piece (and sanity check the import)
  660. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2]])
  661. verifyImportCount(t, imported, maxQueueDist)
  662. // Insert the remaining blocks in chunks to ensure clean DOS protection
  663. for i := maxQueueDist; i < len(hashes)-1; i++ {
  664. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2-i]])
  665. verifyImportEvent(t, imported, true)
  666. }
  667. verifyImportDone(t, imported)
  668. }