fetcher_test.go 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package fetcher
  17. import (
  18. "errors"
  19. "math/big"
  20. "sync"
  21. "sync/atomic"
  22. "testing"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/core"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/crypto"
  28. "github.com/ethereum/go-ethereum/ethdb"
  29. "github.com/ethereum/go-ethereum/params"
  30. )
  31. var (
  32. testdb, _ = ethdb.NewMemDatabase()
  33. testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
  34. testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
  35. genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000))
  36. unknownBlock = types.NewBlock(&types.Header{GasLimit: params.GenesisGasLimit}, nil, nil, nil)
  37. )
  38. // makeChain creates a chain of n blocks starting at and including parent.
  39. // the returned hash chain is ordered head->parent. In addition, every 3rd block
  40. // contains a transaction and every 5th an uncle to allow testing correct block
  41. // reassembly.
  42. func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) {
  43. blocks, _ := core.GenerateChain(parent, testdb, n, func(i int, block *core.BlockGen) {
  44. block.SetCoinbase(common.Address{seed})
  45. // If the block number is multiple of 3, send a bonus transaction to the miner
  46. if parent == genesis && i%3 == 0 {
  47. tx, err := types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(testKey)
  48. if err != nil {
  49. panic(err)
  50. }
  51. block.AddTx(tx)
  52. }
  53. // If the block number is a multiple of 5, add a bonus uncle to the block
  54. if i%5 == 0 {
  55. block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 1).Hash(), Number: big.NewInt(int64(i - 1))})
  56. }
  57. })
  58. hashes := make([]common.Hash, n+1)
  59. hashes[len(hashes)-1] = parent.Hash()
  60. blockm := make(map[common.Hash]*types.Block, n+1)
  61. blockm[parent.Hash()] = parent
  62. for i, b := range blocks {
  63. hashes[len(hashes)-i-2] = b.Hash()
  64. blockm[b.Hash()] = b
  65. }
  66. return hashes, blockm
  67. }
  68. // fetcherTester is a test simulator for mocking out local block chain.
  69. type fetcherTester struct {
  70. fetcher *Fetcher
  71. hashes []common.Hash // Hash chain belonging to the tester
  72. blocks map[common.Hash]*types.Block // Blocks belonging to the tester
  73. drops map[string]bool // Map of peers dropped by the fetcher
  74. lock sync.RWMutex
  75. }
  76. // newTester creates a new fetcher test mocker.
  77. func newTester() *fetcherTester {
  78. tester := &fetcherTester{
  79. hashes: []common.Hash{genesis.Hash()},
  80. blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
  81. drops: make(map[string]bool),
  82. }
  83. tester.fetcher = New(tester.getBlock, tester.verifyBlock, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer)
  84. tester.fetcher.Start()
  85. return tester
  86. }
  87. // getBlock retrieves a block from the tester's block chain.
  88. func (f *fetcherTester) getBlock(hash common.Hash) *types.Block {
  89. f.lock.RLock()
  90. defer f.lock.RUnlock()
  91. return f.blocks[hash]
  92. }
  93. // verifyBlock is a nop placeholder for the block header verification.
  94. func (f *fetcherTester) verifyBlock(block *types.Block, parent *types.Block) error {
  95. return nil
  96. }
  97. // broadcastBlock is a nop placeholder for the block broadcasting.
  98. func (f *fetcherTester) broadcastBlock(block *types.Block, propagate bool) {
  99. }
  100. // chainHeight retrieves the current height (block number) of the chain.
  101. func (f *fetcherTester) chainHeight() uint64 {
  102. f.lock.RLock()
  103. defer f.lock.RUnlock()
  104. return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64()
  105. }
  106. // insertChain injects a new blocks into the simulated chain.
  107. func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) {
  108. f.lock.Lock()
  109. defer f.lock.Unlock()
  110. for i, block := range blocks {
  111. // Make sure the parent in known
  112. if _, ok := f.blocks[block.ParentHash()]; !ok {
  113. return i, errors.New("unknown parent")
  114. }
  115. // Discard any new blocks if the same height already exists
  116. if block.NumberU64() <= f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() {
  117. return i, nil
  118. }
  119. // Otherwise build our current chain
  120. f.hashes = append(f.hashes, block.Hash())
  121. f.blocks[block.Hash()] = block
  122. }
  123. return 0, nil
  124. }
  125. // dropPeer is an emulator for the peer removal, simply accumulating the various
  126. // peers dropped by the fetcher.
  127. func (f *fetcherTester) dropPeer(peer string) {
  128. f.lock.Lock()
  129. defer f.lock.Unlock()
  130. f.drops[peer] = true
  131. }
  132. // makeBlockFetcher retrieves a block fetcher associated with a simulated peer.
  133. func (f *fetcherTester) makeBlockFetcher(blocks map[common.Hash]*types.Block) blockRequesterFn {
  134. closure := make(map[common.Hash]*types.Block)
  135. for hash, block := range blocks {
  136. closure[hash] = block
  137. }
  138. // Create a function that returns blocks from the closure
  139. return func(hashes []common.Hash) error {
  140. // Gather the blocks to return
  141. blocks := make([]*types.Block, 0, len(hashes))
  142. for _, hash := range hashes {
  143. if block, ok := closure[hash]; ok {
  144. blocks = append(blocks, block)
  145. }
  146. }
  147. // Return on a new thread
  148. go f.fetcher.FilterBlocks(blocks)
  149. return nil
  150. }
  151. }
  152. // makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer.
  153. func (f *fetcherTester) makeHeaderFetcher(blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn {
  154. closure := make(map[common.Hash]*types.Block)
  155. for hash, block := range blocks {
  156. closure[hash] = block
  157. }
  158. // Create a function that return a header from the closure
  159. return func(hash common.Hash) error {
  160. // Gather the blocks to return
  161. headers := make([]*types.Header, 0, 1)
  162. if block, ok := closure[hash]; ok {
  163. headers = append(headers, block.Header())
  164. }
  165. // Return on a new thread
  166. go f.fetcher.FilterHeaders(headers, time.Now().Add(drift))
  167. return nil
  168. }
  169. }
  170. // makeBodyFetcher retrieves a block body fetcher associated with a simulated peer.
  171. func (f *fetcherTester) makeBodyFetcher(blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn {
  172. closure := make(map[common.Hash]*types.Block)
  173. for hash, block := range blocks {
  174. closure[hash] = block
  175. }
  176. // Create a function that returns blocks from the closure
  177. return func(hashes []common.Hash) error {
  178. // Gather the block bodies to return
  179. transactions := make([][]*types.Transaction, 0, len(hashes))
  180. uncles := make([][]*types.Header, 0, len(hashes))
  181. for _, hash := range hashes {
  182. if block, ok := closure[hash]; ok {
  183. transactions = append(transactions, block.Transactions())
  184. uncles = append(uncles, block.Uncles())
  185. }
  186. }
  187. // Return on a new thread
  188. go f.fetcher.FilterBodies(transactions, uncles, time.Now().Add(drift))
  189. return nil
  190. }
  191. }
  192. // verifyFetchingEvent verifies that one single event arrive on an fetching channel.
  193. func verifyFetchingEvent(t *testing.T, fetching chan []common.Hash, arrive bool) {
  194. if arrive {
  195. select {
  196. case <-fetching:
  197. case <-time.After(time.Second):
  198. t.Fatalf("fetching timeout")
  199. }
  200. } else {
  201. select {
  202. case <-fetching:
  203. t.Fatalf("fetching invoked")
  204. case <-time.After(10 * time.Millisecond):
  205. }
  206. }
  207. }
  208. // verifyCompletingEvent verifies that one single event arrive on an completing channel.
  209. func verifyCompletingEvent(t *testing.T, completing chan []common.Hash, arrive bool) {
  210. if arrive {
  211. select {
  212. case <-completing:
  213. case <-time.After(time.Second):
  214. t.Fatalf("completing timeout")
  215. }
  216. } else {
  217. select {
  218. case <-completing:
  219. t.Fatalf("completing invoked")
  220. case <-time.After(10 * time.Millisecond):
  221. }
  222. }
  223. }
  224. // verifyImportEvent verifies that one single event arrive on an import channel.
  225. func verifyImportEvent(t *testing.T, imported chan *types.Block, arrive bool) {
  226. if arrive {
  227. select {
  228. case <-imported:
  229. case <-time.After(time.Second):
  230. t.Fatalf("import timeout")
  231. }
  232. } else {
  233. select {
  234. case <-imported:
  235. t.Fatalf("import invoked")
  236. case <-time.After(10 * time.Millisecond):
  237. }
  238. }
  239. }
  240. // verifyImportCount verifies that exactly count number of events arrive on an
  241. // import hook channel.
  242. func verifyImportCount(t *testing.T, imported chan *types.Block, count int) {
  243. for i := 0; i < count; i++ {
  244. select {
  245. case <-imported:
  246. case <-time.After(time.Second):
  247. t.Fatalf("block %d: import timeout", i+1)
  248. }
  249. }
  250. verifyImportDone(t, imported)
  251. }
  252. // verifyImportDone verifies that no more events are arriving on an import channel.
  253. func verifyImportDone(t *testing.T, imported chan *types.Block) {
  254. select {
  255. case <-imported:
  256. t.Fatalf("extra block imported")
  257. case <-time.After(50 * time.Millisecond):
  258. }
  259. }
  260. // Tests that a fetcher accepts block announcements and initiates retrievals for
  261. // them, successfully importing into the local chain.
  262. func TestSequentialAnnouncements61(t *testing.T) { testSequentialAnnouncements(t, 61) }
  263. func TestSequentialAnnouncements62(t *testing.T) { testSequentialAnnouncements(t, 62) }
  264. func TestSequentialAnnouncements63(t *testing.T) { testSequentialAnnouncements(t, 63) }
  265. func TestSequentialAnnouncements64(t *testing.T) { testSequentialAnnouncements(t, 64) }
  266. func testSequentialAnnouncements(t *testing.T, protocol int) {
  267. // Create a chain of blocks to import
  268. targetBlocks := 4 * hashLimit
  269. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  270. tester := newTester()
  271. blockFetcher := tester.makeBlockFetcher(blocks)
  272. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  273. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  274. // Iteratively announce blocks until all are imported
  275. imported := make(chan *types.Block)
  276. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  277. for i := len(hashes) - 2; i >= 0; i-- {
  278. if protocol < 62 {
  279. tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
  280. } else {
  281. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
  282. }
  283. verifyImportEvent(t, imported, true)
  284. }
  285. verifyImportDone(t, imported)
  286. }
  287. // Tests that if blocks are announced by multiple peers (or even the same buggy
  288. // peer), they will only get downloaded at most once.
  289. func TestConcurrentAnnouncements61(t *testing.T) { testConcurrentAnnouncements(t, 61) }
  290. func TestConcurrentAnnouncements62(t *testing.T) { testConcurrentAnnouncements(t, 62) }
  291. func TestConcurrentAnnouncements63(t *testing.T) { testConcurrentAnnouncements(t, 63) }
  292. func TestConcurrentAnnouncements64(t *testing.T) { testConcurrentAnnouncements(t, 64) }
  293. func testConcurrentAnnouncements(t *testing.T, protocol int) {
  294. // Create a chain of blocks to import
  295. targetBlocks := 4 * hashLimit
  296. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  297. // Assemble a tester with a built in counter for the requests
  298. tester := newTester()
  299. blockFetcher := tester.makeBlockFetcher(blocks)
  300. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  301. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  302. counter := uint32(0)
  303. blockWrapper := func(hashes []common.Hash) error {
  304. atomic.AddUint32(&counter, uint32(len(hashes)))
  305. return blockFetcher(hashes)
  306. }
  307. headerWrapper := func(hash common.Hash) error {
  308. atomic.AddUint32(&counter, 1)
  309. return headerFetcher(hash)
  310. }
  311. // Iteratively announce blocks until all are imported
  312. imported := make(chan *types.Block)
  313. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  314. for i := len(hashes) - 2; i >= 0; i-- {
  315. if protocol < 62 {
  316. tester.fetcher.Notify("first", hashes[i], 0, time.Now().Add(-arriveTimeout), blockWrapper, nil, nil)
  317. tester.fetcher.Notify("second", hashes[i], 0, time.Now().Add(-arriveTimeout+time.Millisecond), blockWrapper, nil, nil)
  318. tester.fetcher.Notify("second", hashes[i], 0, time.Now().Add(-arriveTimeout-time.Millisecond), blockWrapper, nil, nil)
  319. } else {
  320. tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerWrapper, bodyFetcher)
  321. tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), nil, headerWrapper, bodyFetcher)
  322. tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), nil, headerWrapper, bodyFetcher)
  323. }
  324. verifyImportEvent(t, imported, true)
  325. }
  326. verifyImportDone(t, imported)
  327. // Make sure no blocks were retrieved twice
  328. if int(counter) != targetBlocks {
  329. t.Fatalf("retrieval count mismatch: have %v, want %v", counter, targetBlocks)
  330. }
  331. }
  332. // Tests that announcements arriving while a previous is being fetched still
  333. // results in a valid import.
  334. func TestOverlappingAnnouncements61(t *testing.T) { testOverlappingAnnouncements(t, 61) }
  335. func TestOverlappingAnnouncements62(t *testing.T) { testOverlappingAnnouncements(t, 62) }
  336. func TestOverlappingAnnouncements63(t *testing.T) { testOverlappingAnnouncements(t, 63) }
  337. func TestOverlappingAnnouncements64(t *testing.T) { testOverlappingAnnouncements(t, 64) }
  338. func testOverlappingAnnouncements(t *testing.T, protocol int) {
  339. // Create a chain of blocks to import
  340. targetBlocks := 4 * hashLimit
  341. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  342. tester := newTester()
  343. blockFetcher := tester.makeBlockFetcher(blocks)
  344. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  345. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  346. // Iteratively announce blocks, but overlap them continuously
  347. overlap := 16
  348. imported := make(chan *types.Block, len(hashes)-1)
  349. for i := 0; i < overlap; i++ {
  350. imported <- nil
  351. }
  352. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  353. for i := len(hashes) - 2; i >= 0; i-- {
  354. if protocol < 62 {
  355. tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
  356. } else {
  357. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
  358. }
  359. select {
  360. case <-imported:
  361. case <-time.After(time.Second):
  362. t.Fatalf("block %d: import timeout", len(hashes)-i)
  363. }
  364. }
  365. // Wait for all the imports to complete and check count
  366. verifyImportCount(t, imported, overlap)
  367. }
  368. // Tests that announces already being retrieved will not be duplicated.
  369. func TestPendingDeduplication61(t *testing.T) { testPendingDeduplication(t, 61) }
  370. func TestPendingDeduplication62(t *testing.T) { testPendingDeduplication(t, 62) }
  371. func TestPendingDeduplication63(t *testing.T) { testPendingDeduplication(t, 63) }
  372. func TestPendingDeduplication64(t *testing.T) { testPendingDeduplication(t, 64) }
  373. func testPendingDeduplication(t *testing.T, protocol int) {
  374. // Create a hash and corresponding block
  375. hashes, blocks := makeChain(1, 0, genesis)
  376. // Assemble a tester with a built in counter and delayed fetcher
  377. tester := newTester()
  378. blockFetcher := tester.makeBlockFetcher(blocks)
  379. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  380. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  381. delay := 50 * time.Millisecond
  382. counter := uint32(0)
  383. blockWrapper := func(hashes []common.Hash) error {
  384. atomic.AddUint32(&counter, uint32(len(hashes)))
  385. // Simulate a long running fetch
  386. go func() {
  387. time.Sleep(delay)
  388. blockFetcher(hashes)
  389. }()
  390. return nil
  391. }
  392. headerWrapper := func(hash common.Hash) error {
  393. atomic.AddUint32(&counter, 1)
  394. // Simulate a long running fetch
  395. go func() {
  396. time.Sleep(delay)
  397. headerFetcher(hash)
  398. }()
  399. return nil
  400. }
  401. // Announce the same block many times until it's fetched (wait for any pending ops)
  402. for tester.getBlock(hashes[0]) == nil {
  403. if protocol < 62 {
  404. tester.fetcher.Notify("repeater", hashes[0], 0, time.Now().Add(-arriveTimeout), blockWrapper, nil, nil)
  405. } else {
  406. tester.fetcher.Notify("repeater", hashes[0], 1, time.Now().Add(-arriveTimeout), nil, headerWrapper, bodyFetcher)
  407. }
  408. time.Sleep(time.Millisecond)
  409. }
  410. time.Sleep(delay)
  411. // Check that all blocks were imported and none fetched twice
  412. if imported := len(tester.blocks); imported != 2 {
  413. t.Fatalf("synchronised block mismatch: have %v, want %v", imported, 2)
  414. }
  415. if int(counter) != 1 {
  416. t.Fatalf("retrieval count mismatch: have %v, want %v", counter, 1)
  417. }
  418. }
  419. // Tests that announcements retrieved in a random order are cached and eventually
  420. // imported when all the gaps are filled in.
  421. func TestRandomArrivalImport61(t *testing.T) { testRandomArrivalImport(t, 61) }
  422. func TestRandomArrivalImport62(t *testing.T) { testRandomArrivalImport(t, 62) }
  423. func TestRandomArrivalImport63(t *testing.T) { testRandomArrivalImport(t, 63) }
  424. func TestRandomArrivalImport64(t *testing.T) { testRandomArrivalImport(t, 64) }
  425. func testRandomArrivalImport(t *testing.T, protocol int) {
  426. // Create a chain of blocks to import, and choose one to delay
  427. targetBlocks := maxQueueDist
  428. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  429. skip := targetBlocks / 2
  430. tester := newTester()
  431. blockFetcher := tester.makeBlockFetcher(blocks)
  432. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  433. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  434. // Iteratively announce blocks, skipping one entry
  435. imported := make(chan *types.Block, len(hashes)-1)
  436. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  437. for i := len(hashes) - 1; i >= 0; i-- {
  438. if i != skip {
  439. if protocol < 62 {
  440. tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
  441. } else {
  442. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
  443. }
  444. time.Sleep(time.Millisecond)
  445. }
  446. }
  447. // Finally announce the skipped entry and check full import
  448. if protocol < 62 {
  449. tester.fetcher.Notify("valid", hashes[skip], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
  450. } else {
  451. tester.fetcher.Notify("valid", hashes[skip], uint64(len(hashes)-skip-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
  452. }
  453. verifyImportCount(t, imported, len(hashes)-1)
  454. }
  455. // Tests that direct block enqueues (due to block propagation vs. hash announce)
  456. // are correctly schedule, filling and import queue gaps.
  457. func TestQueueGapFill61(t *testing.T) { testQueueGapFill(t, 61) }
  458. func TestQueueGapFill62(t *testing.T) { testQueueGapFill(t, 62) }
  459. func TestQueueGapFill63(t *testing.T) { testQueueGapFill(t, 63) }
  460. func TestQueueGapFill64(t *testing.T) { testQueueGapFill(t, 64) }
  461. func testQueueGapFill(t *testing.T, protocol int) {
  462. // Create a chain of blocks to import, and choose one to not announce at all
  463. targetBlocks := maxQueueDist
  464. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  465. skip := targetBlocks / 2
  466. tester := newTester()
  467. blockFetcher := tester.makeBlockFetcher(blocks)
  468. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  469. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  470. // Iteratively announce blocks, skipping one entry
  471. imported := make(chan *types.Block, len(hashes)-1)
  472. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  473. for i := len(hashes) - 1; i >= 0; i-- {
  474. if i != skip {
  475. if protocol < 62 {
  476. tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
  477. } else {
  478. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
  479. }
  480. time.Sleep(time.Millisecond)
  481. }
  482. }
  483. // Fill the missing block directly as if propagated
  484. tester.fetcher.Enqueue("valid", blocks[hashes[skip]])
  485. verifyImportCount(t, imported, len(hashes)-1)
  486. }
  487. // Tests that blocks arriving from various sources (multiple propagations, hash
  488. // announces, etc) do not get scheduled for import multiple times.
  489. func TestImportDeduplication61(t *testing.T) { testImportDeduplication(t, 61) }
  490. func TestImportDeduplication62(t *testing.T) { testImportDeduplication(t, 62) }
  491. func TestImportDeduplication63(t *testing.T) { testImportDeduplication(t, 63) }
  492. func TestImportDeduplication64(t *testing.T) { testImportDeduplication(t, 64) }
  493. func testImportDeduplication(t *testing.T, protocol int) {
  494. // Create two blocks to import (one for duplication, the other for stalling)
  495. hashes, blocks := makeChain(2, 0, genesis)
  496. // Create the tester and wrap the importer with a counter
  497. tester := newTester()
  498. blockFetcher := tester.makeBlockFetcher(blocks)
  499. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  500. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  501. counter := uint32(0)
  502. tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) {
  503. atomic.AddUint32(&counter, uint32(len(blocks)))
  504. return tester.insertChain(blocks)
  505. }
  506. // Instrument the fetching and imported events
  507. fetching := make(chan []common.Hash)
  508. imported := make(chan *types.Block, len(hashes)-1)
  509. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
  510. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  511. // Announce the duplicating block, wait for retrieval, and also propagate directly
  512. if protocol < 62 {
  513. tester.fetcher.Notify("valid", hashes[0], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
  514. } else {
  515. tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
  516. }
  517. <-fetching
  518. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  519. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  520. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  521. // Fill the missing block directly as if propagated, and check import uniqueness
  522. tester.fetcher.Enqueue("valid", blocks[hashes[1]])
  523. verifyImportCount(t, imported, 2)
  524. if counter != 2 {
  525. t.Fatalf("import invocation count mismatch: have %v, want %v", counter, 2)
  526. }
  527. }
  528. // Tests that blocks with numbers much lower or higher than out current head get
  529. // discarded to prevent wasting resources on useless blocks from faulty peers.
  530. func TestDistantPropagationDiscarding(t *testing.T) {
  531. // Create a long chain to import and define the discard boundaries
  532. hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
  533. head := hashes[len(hashes)/2]
  534. low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
  535. // Create a tester and simulate a head block being the middle of the above chain
  536. tester := newTester()
  537. tester.lock.Lock()
  538. tester.hashes = []common.Hash{head}
  539. tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
  540. tester.lock.Unlock()
  541. // Ensure that a block with a lower number than the threshold is discarded
  542. tester.fetcher.Enqueue("lower", blocks[hashes[low]])
  543. time.Sleep(10 * time.Millisecond)
  544. if !tester.fetcher.queue.Empty() {
  545. t.Fatalf("fetcher queued stale block")
  546. }
  547. // Ensure that a block with a higher number than the threshold is discarded
  548. tester.fetcher.Enqueue("higher", blocks[hashes[high]])
  549. time.Sleep(10 * time.Millisecond)
  550. if !tester.fetcher.queue.Empty() {
  551. t.Fatalf("fetcher queued future block")
  552. }
  553. }
  554. // Tests that announcements with numbers much lower or higher than out current
  555. // head get discarded to prevent wasting resources on useless blocks from faulty
  556. // peers.
  557. func TestDistantAnnouncementDiscarding62(t *testing.T) { testDistantAnnouncementDiscarding(t, 62) }
  558. func TestDistantAnnouncementDiscarding63(t *testing.T) { testDistantAnnouncementDiscarding(t, 63) }
  559. func TestDistantAnnouncementDiscarding64(t *testing.T) { testDistantAnnouncementDiscarding(t, 64) }
  560. func testDistantAnnouncementDiscarding(t *testing.T, protocol int) {
  561. // Create a long chain to import and define the discard boundaries
  562. hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
  563. head := hashes[len(hashes)/2]
  564. low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
  565. // Create a tester and simulate a head block being the middle of the above chain
  566. tester := newTester()
  567. tester.lock.Lock()
  568. tester.hashes = []common.Hash{head}
  569. tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
  570. tester.lock.Unlock()
  571. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  572. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  573. fetching := make(chan struct{}, 2)
  574. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} }
  575. // Ensure that a block with a lower number than the threshold is discarded
  576. tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
  577. select {
  578. case <-time.After(50 * time.Millisecond):
  579. case <-fetching:
  580. t.Fatalf("fetcher requested stale header")
  581. }
  582. // Ensure that a block with a higher number than the threshold is discarded
  583. tester.fetcher.Notify("higher", hashes[high], blocks[hashes[high]].NumberU64(), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
  584. select {
  585. case <-time.After(50 * time.Millisecond):
  586. case <-fetching:
  587. t.Fatalf("fetcher requested future header")
  588. }
  589. }
  590. // Tests that peers announcing blocks with invalid numbers (i.e. not matching
  591. // the headers provided afterwards) get dropped as malicious.
  592. func TestInvalidNumberAnnouncement62(t *testing.T) { testInvalidNumberAnnouncement(t, 62) }
  593. func TestInvalidNumberAnnouncement63(t *testing.T) { testInvalidNumberAnnouncement(t, 63) }
  594. func TestInvalidNumberAnnouncement64(t *testing.T) { testInvalidNumberAnnouncement(t, 64) }
  595. func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
  596. // Create a single block to import and check numbers against
  597. hashes, blocks := makeChain(1, 0, genesis)
  598. tester := newTester()
  599. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  600. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  601. imported := make(chan *types.Block)
  602. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  603. // Announce a block with a bad number, check for immediate drop
  604. tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
  605. verifyImportEvent(t, imported, false)
  606. tester.lock.RLock()
  607. dropped := tester.drops["bad"]
  608. tester.lock.RUnlock()
  609. if !dropped {
  610. t.Fatalf("peer with invalid numbered announcement not dropped")
  611. }
  612. // Make sure a good announcement passes without a drop
  613. tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
  614. verifyImportEvent(t, imported, true)
  615. tester.lock.RLock()
  616. dropped = tester.drops["good"]
  617. tester.lock.RUnlock()
  618. if dropped {
  619. t.Fatalf("peer with valid numbered announcement dropped")
  620. }
  621. verifyImportDone(t, imported)
  622. }
  623. // Tests that if a block is empty (i.e. header only), no body request should be
  624. // made, and instead the header should be assembled into a whole block in itself.
  625. func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) }
  626. func TestEmptyBlockShortCircuit63(t *testing.T) { testEmptyBlockShortCircuit(t, 63) }
  627. func TestEmptyBlockShortCircuit64(t *testing.T) { testEmptyBlockShortCircuit(t, 64) }
  628. func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
  629. // Create a chain of blocks to import
  630. hashes, blocks := makeChain(32, 0, genesis)
  631. tester := newTester()
  632. headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  633. bodyFetcher := tester.makeBodyFetcher(blocks, 0)
  634. // Add a monitoring hook for all internal events
  635. fetching := make(chan []common.Hash)
  636. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
  637. completing := make(chan []common.Hash)
  638. tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes }
  639. imported := make(chan *types.Block)
  640. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  641. // Iteratively announce blocks until all are imported
  642. for i := len(hashes) - 2; i >= 0; i-- {
  643. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
  644. // All announces should fetch the header
  645. verifyFetchingEvent(t, fetching, true)
  646. // Only blocks with data contents should request bodies
  647. verifyCompletingEvent(t, completing, len(blocks[hashes[i]].Transactions()) > 0 || len(blocks[hashes[i]].Uncles()) > 0)
  648. // Irrelevant of the construct, import should succeed
  649. verifyImportEvent(t, imported, true)
  650. }
  651. verifyImportDone(t, imported)
  652. }
  653. // Tests that a peer is unable to use unbounded memory with sending infinite
  654. // block announcements to a node, but that even in the face of such an attack,
  655. // the fetcher remains operational.
  656. func TestHashMemoryExhaustionAttack61(t *testing.T) { testHashMemoryExhaustionAttack(t, 61) }
  657. func TestHashMemoryExhaustionAttack62(t *testing.T) { testHashMemoryExhaustionAttack(t, 62) }
  658. func TestHashMemoryExhaustionAttack63(t *testing.T) { testHashMemoryExhaustionAttack(t, 63) }
  659. func TestHashMemoryExhaustionAttack64(t *testing.T) { testHashMemoryExhaustionAttack(t, 64) }
  660. func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
  661. // Create a tester with instrumented import hooks
  662. tester := newTester()
  663. imported, announces := make(chan *types.Block), int32(0)
  664. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  665. tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) {
  666. if added {
  667. atomic.AddInt32(&announces, 1)
  668. } else {
  669. atomic.AddInt32(&announces, -1)
  670. }
  671. }
  672. // Create a valid chain and an infinite junk chain
  673. targetBlocks := hashLimit + 2*maxQueueDist
  674. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  675. validBlockFetcher := tester.makeBlockFetcher(blocks)
  676. validHeaderFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
  677. validBodyFetcher := tester.makeBodyFetcher(blocks, 0)
  678. attack, _ := makeChain(targetBlocks, 0, unknownBlock)
  679. attackerBlockFetcher := tester.makeBlockFetcher(nil)
  680. attackerHeaderFetcher := tester.makeHeaderFetcher(nil, -gatherSlack)
  681. attackerBodyFetcher := tester.makeBodyFetcher(nil, 0)
  682. // Feed the tester a huge hashset from the attacker, and a limited from the valid peer
  683. for i := 0; i < len(attack); i++ {
  684. if i < maxQueueDist {
  685. if protocol < 62 {
  686. tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], 0, time.Now(), validBlockFetcher, nil, nil)
  687. } else {
  688. tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], uint64(i+1), time.Now(), nil, validHeaderFetcher, validBodyFetcher)
  689. }
  690. }
  691. if protocol < 62 {
  692. tester.fetcher.Notify("attacker", attack[i], 0, time.Now(), attackerBlockFetcher, nil, nil)
  693. } else {
  694. tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), nil, attackerHeaderFetcher, attackerBodyFetcher)
  695. }
  696. }
  697. if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist {
  698. t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist)
  699. }
  700. // Wait for fetches to complete
  701. verifyImportCount(t, imported, maxQueueDist)
  702. // Feed the remaining valid hashes to ensure DOS protection state remains clean
  703. for i := len(hashes) - maxQueueDist - 2; i >= 0; i-- {
  704. if protocol < 62 {
  705. tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), validBlockFetcher, nil, nil)
  706. } else {
  707. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, validHeaderFetcher, validBodyFetcher)
  708. }
  709. verifyImportEvent(t, imported, true)
  710. }
  711. verifyImportDone(t, imported)
  712. }
  713. // Tests that blocks sent to the fetcher (either through propagation or via hash
  714. // announces and retrievals) don't pile up indefinitely, exhausting available
  715. // system memory.
  716. func TestBlockMemoryExhaustionAttack(t *testing.T) {
  717. // Create a tester with instrumented import hooks
  718. tester := newTester()
  719. imported, enqueued := make(chan *types.Block), int32(0)
  720. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  721. tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) {
  722. if added {
  723. atomic.AddInt32(&enqueued, 1)
  724. } else {
  725. atomic.AddInt32(&enqueued, -1)
  726. }
  727. }
  728. // Create a valid chain and a batch of dangling (but in range) blocks
  729. targetBlocks := hashLimit + 2*maxQueueDist
  730. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  731. attack := make(map[common.Hash]*types.Block)
  732. for i := byte(0); len(attack) < blockLimit+2*maxQueueDist; i++ {
  733. hashes, blocks := makeChain(maxQueueDist-1, i, unknownBlock)
  734. for _, hash := range hashes[:maxQueueDist-2] {
  735. attack[hash] = blocks[hash]
  736. }
  737. }
  738. // Try to feed all the attacker blocks make sure only a limited batch is accepted
  739. for _, block := range attack {
  740. tester.fetcher.Enqueue("attacker", block)
  741. }
  742. time.Sleep(200 * time.Millisecond)
  743. if queued := atomic.LoadInt32(&enqueued); queued != blockLimit {
  744. t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit)
  745. }
  746. // Queue up a batch of valid blocks, and check that a new peer is allowed to do so
  747. for i := 0; i < maxQueueDist-1; i++ {
  748. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]])
  749. }
  750. time.Sleep(100 * time.Millisecond)
  751. if queued := atomic.LoadInt32(&enqueued); queued != blockLimit+maxQueueDist-1 {
  752. t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1)
  753. }
  754. // Insert the missing piece (and sanity check the import)
  755. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2]])
  756. verifyImportCount(t, imported, maxQueueDist)
  757. // Insert the remaining blocks in chunks to ensure clean DOS protection
  758. for i := maxQueueDist; i < len(hashes)-1; i++ {
  759. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2-i]])
  760. verifyImportEvent(t, imported, true)
  761. }
  762. verifyImportDone(t, imported)
  763. }