downloader_test.go 62 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package downloader
  17. import (
  18. "errors"
  19. "fmt"
  20. "math/big"
  21. "strings"
  22. "sync"
  23. "sync/atomic"
  24. "testing"
  25. "time"
  26. "github.com/ethereum/go-ethereum"
  27. "github.com/ethereum/go-ethereum/common"
  28. "github.com/ethereum/go-ethereum/core/rawdb"
  29. "github.com/ethereum/go-ethereum/core/types"
  30. "github.com/ethereum/go-ethereum/ethdb"
  31. "github.com/ethereum/go-ethereum/event"
  32. "github.com/ethereum/go-ethereum/trie"
  33. )
// Reduce some of the parameters to make the tester faster.
func init() {
	maxForkAncestry = 10000 // shrink the fork-ancestry window so fork tests finish quickly
	blockCacheItems = 1024  // smaller download cache makes throttling kick in sooner
	fsHeaderContCheck = 500 * time.Millisecond // poll for header continuations more often
}
// downloadTester is a test simulator for mocking out local block chain. It
// implements the chain interfaces the downloader expects, backed by plain
// maps, and splits storage into an "own" (active) and an "ancient" (frozen)
// store to mimic the freezer.
type downloadTester struct {
	downloader *Downloader // Downloader under test, wired back to this tester

	genesis *types.Block   // Genesis blocks used by the tester and peers
	stateDb ethdb.Database // Database used by the tester for syncing from peers
	peerDb  ethdb.Database // Database of the peers containing all data
	peers   map[string]*downloadTesterPeer // Registered simulated peers by id

	ownHashes   []common.Hash                  // Hash chain belonging to the tester
	ownHeaders  map[common.Hash]*types.Header  // Headers belonging to the tester
	ownBlocks   map[common.Hash]*types.Block   // Blocks belonging to the tester
	ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester
	ownChainTd  map[common.Hash]*big.Int       // Total difficulties of the blocks in the local chain

	ancientHeaders  map[common.Hash]*types.Header  // Ancient headers belonging to the tester
	ancientBlocks   map[common.Hash]*types.Block   // Ancient blocks belonging to the tester
	ancientReceipts map[common.Hash]types.Receipts // Ancient receipts belonging to the tester
	ancientChainTd  map[common.Hash]*big.Int       // Ancient total difficulties of the blocks in the local chain

	lock sync.RWMutex // Protects all of the maps and slices above
}
  58. // newTester creates a new downloader test mocker.
  59. func newTester() *downloadTester {
  60. tester := &downloadTester{
  61. genesis: testGenesis,
  62. peerDb: testDB,
  63. peers: make(map[string]*downloadTesterPeer),
  64. ownHashes: []common.Hash{testGenesis.Hash()},
  65. ownHeaders: map[common.Hash]*types.Header{testGenesis.Hash(): testGenesis.Header()},
  66. ownBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis},
  67. ownReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil},
  68. ownChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()},
  69. // Initialize ancient store with test genesis block
  70. ancientHeaders: map[common.Hash]*types.Header{testGenesis.Hash(): testGenesis.Header()},
  71. ancientBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis},
  72. ancientReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil},
  73. ancientChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()},
  74. }
  75. tester.stateDb = rawdb.NewMemoryDatabase()
  76. tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00})
  77. tester.downloader = New(0, tester.stateDb, trie.NewSyncBloom(1, tester.stateDb), new(event.TypeMux), tester, nil, tester.dropPeer)
  78. return tester
  79. }
// terminate aborts any operations on the embedded downloader and releases all
// held resources.
func (dl *downloadTester) terminate() {
	dl.downloader.Terminate()
}
  85. // sync starts synchronizing with a remote peer, blocking until it completes.
  86. func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
  87. dl.lock.RLock()
  88. hash := dl.peers[id].chain.headBlock().Hash()
  89. // If no particular TD was requested, load from the peer's blockchain
  90. if td == nil {
  91. td = dl.peers[id].chain.td(hash)
  92. }
  93. dl.lock.RUnlock()
  94. // Synchronise with the chosen peer and ensure proper cleanup afterwards
  95. err := dl.downloader.synchronise(id, hash, td, mode)
  96. select {
  97. case <-dl.downloader.cancelCh:
  98. // Ok, downloader fully cancelled after sync cycle
  99. default:
  100. // Downloader is still accepting packets, can block a peer up
  101. panic("downloader active post sync cycle") // panic will be caught by tester
  102. }
  103. return err
  104. }
// HasHeader checks if a header is present in the testers canonical chain
// (either the active or the ancient store).
func (dl *downloadTester) HasHeader(hash common.Hash, number uint64) bool {
	return dl.GetHeaderByHash(hash) != nil
}
// HasBlock checks if a block is present in the testers canonical chain
// (either the active or the ancient store).
func (dl *downloadTester) HasBlock(hash common.Hash, number uint64) bool {
	return dl.GetBlockByHash(hash) != nil
}
  113. // HasFastBlock checks if a block is present in the testers canonical chain.
  114. func (dl *downloadTester) HasFastBlock(hash common.Hash, number uint64) bool {
  115. dl.lock.RLock()
  116. defer dl.lock.RUnlock()
  117. if _, ok := dl.ancientReceipts[hash]; ok {
  118. return true
  119. }
  120. _, ok := dl.ownReceipts[hash]
  121. return ok
  122. }
  123. // GetHeader retrieves a header from the testers canonical chain.
  124. func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header {
  125. dl.lock.RLock()
  126. defer dl.lock.RUnlock()
  127. header := dl.ancientHeaders[hash]
  128. if header != nil {
  129. return header
  130. }
  131. return dl.ownHeaders[hash]
  132. }
  133. // GetBlock retrieves a block from the testers canonical chain.
  134. func (dl *downloadTester) GetBlockByHash(hash common.Hash) *types.Block {
  135. dl.lock.RLock()
  136. defer dl.lock.RUnlock()
  137. block := dl.ancientBlocks[hash]
  138. if block != nil {
  139. return block
  140. }
  141. return dl.ownBlocks[hash]
  142. }
  143. // CurrentHeader retrieves the current head header from the canonical chain.
  144. func (dl *downloadTester) CurrentHeader() *types.Header {
  145. dl.lock.RLock()
  146. defer dl.lock.RUnlock()
  147. for i := len(dl.ownHashes) - 1; i >= 0; i-- {
  148. if header := dl.ancientHeaders[dl.ownHashes[i]]; header != nil {
  149. return header
  150. }
  151. if header := dl.ownHeaders[dl.ownHashes[i]]; header != nil {
  152. return header
  153. }
  154. }
  155. return dl.genesis.Header()
  156. }
  157. // CurrentBlock retrieves the current head block from the canonical chain.
  158. func (dl *downloadTester) CurrentBlock() *types.Block {
  159. dl.lock.RLock()
  160. defer dl.lock.RUnlock()
  161. for i := len(dl.ownHashes) - 1; i >= 0; i-- {
  162. if block := dl.ancientBlocks[dl.ownHashes[i]]; block != nil {
  163. if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil {
  164. return block
  165. }
  166. return block
  167. }
  168. if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil {
  169. if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil {
  170. return block
  171. }
  172. }
  173. }
  174. return dl.genesis
  175. }
  176. // CurrentFastBlock retrieves the current head fast-sync block from the canonical chain.
  177. func (dl *downloadTester) CurrentFastBlock() *types.Block {
  178. dl.lock.RLock()
  179. defer dl.lock.RUnlock()
  180. for i := len(dl.ownHashes) - 1; i >= 0; i-- {
  181. if block := dl.ancientBlocks[dl.ownHashes[i]]; block != nil {
  182. return block
  183. }
  184. if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil {
  185. return block
  186. }
  187. }
  188. return dl.genesis
  189. }
  190. // FastSyncCommitHead manually sets the head block to a given hash.
  191. func (dl *downloadTester) FastSyncCommitHead(hash common.Hash) error {
  192. // For now only check that the state trie is correct
  193. if block := dl.GetBlockByHash(hash); block != nil {
  194. _, err := trie.NewSecure(block.Root(), trie.NewDatabase(dl.stateDb))
  195. return err
  196. }
  197. return fmt.Errorf("non existent block: %x", hash[:4])
  198. }
  199. // GetTd retrieves the block's total difficulty from the canonical chain.
  200. func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int {
  201. dl.lock.RLock()
  202. defer dl.lock.RUnlock()
  203. if td := dl.ancientChainTd[hash]; td != nil {
  204. return td
  205. }
  206. return dl.ownChainTd[hash]
  207. }
  208. // InsertHeaderChain injects a new batch of headers into the simulated chain.
  209. func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (i int, err error) {
  210. dl.lock.Lock()
  211. defer dl.lock.Unlock()
  212. // Do a quick check, as the blockchain.InsertHeaderChain doesn't insert anything in case of errors
  213. if _, ok := dl.ownHeaders[headers[0].ParentHash]; !ok {
  214. return 0, errors.New("unknown parent")
  215. }
  216. for i := 1; i < len(headers); i++ {
  217. if headers[i].ParentHash != headers[i-1].Hash() {
  218. return i, errors.New("unknown parent")
  219. }
  220. }
  221. // Do a full insert if pre-checks passed
  222. for i, header := range headers {
  223. if _, ok := dl.ownHeaders[header.Hash()]; ok {
  224. continue
  225. }
  226. if _, ok := dl.ownHeaders[header.ParentHash]; !ok {
  227. return i, errors.New("unknown parent")
  228. }
  229. dl.ownHashes = append(dl.ownHashes, header.Hash())
  230. dl.ownHeaders[header.Hash()] = header
  231. dl.ownChainTd[header.Hash()] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty)
  232. }
  233. return len(headers), nil
  234. }
  235. // InsertChain injects a new batch of blocks into the simulated chain.
  236. func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) {
  237. dl.lock.Lock()
  238. defer dl.lock.Unlock()
  239. for i, block := range blocks {
  240. if parent, ok := dl.ownBlocks[block.ParentHash()]; !ok {
  241. return i, errors.New("unknown parent")
  242. } else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil {
  243. return i, fmt.Errorf("unknown parent state %x: %v", parent.Root(), err)
  244. }
  245. if _, ok := dl.ownHeaders[block.Hash()]; !ok {
  246. dl.ownHashes = append(dl.ownHashes, block.Hash())
  247. dl.ownHeaders[block.Hash()] = block.Header()
  248. }
  249. dl.ownBlocks[block.Hash()] = block
  250. dl.ownReceipts[block.Hash()] = make(types.Receipts, 0)
  251. dl.stateDb.Put(block.Root().Bytes(), []byte{0x00})
  252. dl.ownChainTd[block.Hash()] = new(big.Int).Add(dl.ownChainTd[block.ParentHash()], block.Difficulty())
  253. }
  254. return len(blocks), nil
  255. }
// InsertReceiptChain injects a new batch of receipts into the simulated chain.
// Blocks at or below ancientLimit are moved into the ancient store (mimicking
// the freezer) and their active-store entries are removed; newer blocks stay
// in the active store.
func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts, ancientLimit uint64) (i int, err error) {
	dl.lock.Lock()
	defer dl.lock.Unlock()

	for i := 0; i < len(blocks) && i < len(receipts); i++ {
		// The owning header must already have been synced
		if _, ok := dl.ownHeaders[blocks[i].Hash()]; !ok {
			return i, errors.New("unknown owner")
		}
		// The parent must exist in either the ancient or the active store
		if _, ok := dl.ancientBlocks[blocks[i].ParentHash()]; !ok {
			if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
				return i, errors.New("unknown parent")
			}
		}
		if blocks[i].NumberU64() <= ancientLimit {
			dl.ancientBlocks[blocks[i].Hash()] = blocks[i]
			dl.ancientReceipts[blocks[i].Hash()] = receipts[i]

			// Migrate from active db to ancient db
			dl.ancientHeaders[blocks[i].Hash()] = blocks[i].Header()
			dl.ancientChainTd[blocks[i].Hash()] = new(big.Int).Add(dl.ancientChainTd[blocks[i].ParentHash()], blocks[i].Difficulty())

			delete(dl.ownHeaders, blocks[i].Hash())
			delete(dl.ownChainTd, blocks[i].Hash())
		} else {
			dl.ownBlocks[blocks[i].Hash()] = blocks[i]
			dl.ownReceipts[blocks[i].Hash()] = receipts[i]
		}
	}
	return len(blocks), nil
}
  284. // Rollback removes some recently added elements from the chain.
  285. func (dl *downloadTester) Rollback(hashes []common.Hash) {
  286. dl.lock.Lock()
  287. defer dl.lock.Unlock()
  288. for i := len(hashes) - 1; i >= 0; i-- {
  289. if dl.ownHashes[len(dl.ownHashes)-1] == hashes[i] {
  290. dl.ownHashes = dl.ownHashes[:len(dl.ownHashes)-1]
  291. }
  292. delete(dl.ownChainTd, hashes[i])
  293. delete(dl.ownHeaders, hashes[i])
  294. delete(dl.ownReceipts, hashes[i])
  295. delete(dl.ownBlocks, hashes[i])
  296. delete(dl.ancientChainTd, hashes[i])
  297. delete(dl.ancientHeaders, hashes[i])
  298. delete(dl.ancientReceipts, hashes[i])
  299. delete(dl.ancientBlocks, hashes[i])
  300. }
  301. }
  302. // newPeer registers a new block download source into the downloader.
  303. func (dl *downloadTester) newPeer(id string, version int, chain *testChain) error {
  304. dl.lock.Lock()
  305. defer dl.lock.Unlock()
  306. peer := &downloadTesterPeer{dl: dl, id: id, chain: chain}
  307. dl.peers[id] = peer
  308. return dl.downloader.RegisterPeer(id, version, peer)
  309. }
// dropPeer simulates a hard peer removal from the connection pool: the peer is
// removed from the tester's registry first, then unregistered from the
// downloader.
func (dl *downloadTester) dropPeer(id string) {
	dl.lock.Lock()
	defer dl.lock.Unlock()

	delete(dl.peers, id)
	dl.downloader.UnregisterPeer(id)
}
// downloadTesterPeer is a simulated remote peer serving data from a testChain.
type downloadTesterPeer struct {
	dl    *downloadTester // Tester this peer is registered with
	id    string          // Unique identifier of the simulated peer
	lock  sync.RWMutex
	chain *testChain // Block chain served by this simulated peer

	missingStates map[common.Hash]bool // State entries that fast sync should not return
}
  324. // Head constructs a function to retrieve a peer's current head hash
  325. // and total difficulty.
  326. func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
  327. b := dlp.chain.headBlock()
  328. return b.Hash(), dlp.chain.td(b.Hash())
  329. }
  330. // RequestHeadersByHash constructs a GetBlockHeaders function based on a hashed
  331. // origin; associated with a particular peer in the download tester. The returned
  332. // function can be used to retrieve batches of headers from the particular peer.
  333. func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
  334. if reverse {
  335. panic("reverse header requests not supported")
  336. }
  337. result := dlp.chain.headersByHash(origin, amount, skip)
  338. go dlp.dl.downloader.DeliverHeaders(dlp.id, result)
  339. return nil
  340. }
  341. // RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered
  342. // origin; associated with a particular peer in the download tester. The returned
  343. // function can be used to retrieve batches of headers from the particular peer.
  344. func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
  345. if reverse {
  346. panic("reverse header requests not supported")
  347. }
  348. result := dlp.chain.headersByNumber(origin, amount, skip)
  349. go dlp.dl.downloader.DeliverHeaders(dlp.id, result)
  350. return nil
  351. }
  352. // RequestBodies constructs a getBlockBodies method associated with a particular
  353. // peer in the download tester. The returned function can be used to retrieve
  354. // batches of block bodies from the particularly requested peer.
  355. func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash) error {
  356. txs, uncles := dlp.chain.bodies(hashes)
  357. go dlp.dl.downloader.DeliverBodies(dlp.id, txs, uncles)
  358. return nil
  359. }
  360. // RequestReceipts constructs a getReceipts method associated with a particular
  361. // peer in the download tester. The returned function can be used to retrieve
  362. // batches of block receipts from the particularly requested peer.
  363. func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash) error {
  364. receipts := dlp.chain.receipts(hashes)
  365. go dlp.dl.downloader.DeliverReceipts(dlp.id, receipts)
  366. return nil
  367. }
  368. // RequestNodeData constructs a getNodeData method associated with a particular
  369. // peer in the download tester. The returned function can be used to retrieve
  370. // batches of node state data from the particularly requested peer.
  371. func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error {
  372. dlp.dl.lock.RLock()
  373. defer dlp.dl.lock.RUnlock()
  374. results := make([][]byte, 0, len(hashes))
  375. for _, hash := range hashes {
  376. if data, err := dlp.dl.peerDb.Get(hash.Bytes()); err == nil {
  377. if !dlp.missingStates[hash] {
  378. results = append(results, data)
  379. }
  380. }
  381. }
  382. go dlp.dl.downloader.DeliverNodeData(dlp.id, results)
  383. return nil
  384. }
// assertOwnChain checks if the local chain contains the correct number of items
// of the various chain components. It is the single-fork convenience wrapper
// around assertOwnForkedChain.
func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
	// Mark this method as a helper to report errors at callsite, not in here
	t.Helper()

	assertOwnForkedChain(t, tester, 1, []int{length})
}
  392. // assertOwnForkedChain checks if the local forked chain contains the correct
  393. // number of items of the various chain components.
  394. func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) {
  395. // Mark this method as a helper to report errors at callsite, not in here
  396. t.Helper()
  397. // Initialize the counters for the first fork
  398. headers, blocks, receipts := lengths[0], lengths[0], lengths[0]
  399. // Update the counters for each subsequent fork
  400. for _, length := range lengths[1:] {
  401. headers += length - common
  402. blocks += length - common
  403. receipts += length - common
  404. }
  405. if tester.downloader.mode == LightSync {
  406. blocks, receipts = 1, 1
  407. }
  408. if hs := len(tester.ownHeaders) + len(tester.ancientHeaders) - 1; hs != headers {
  409. t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers)
  410. }
  411. if bs := len(tester.ownBlocks) + len(tester.ancientBlocks) - 1; bs != blocks {
  412. t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks)
  413. }
  414. if rs := len(tester.ownReceipts) + len(tester.ancientReceipts) - 1; rs != receipts {
  415. t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts)
  416. }
  417. }
// Tests that simple synchronization against a canonical chain works correctly.
// In this test common ancestor lookup should be short circuited and not require
// binary searching. One wrapper per protocol/mode pair (eth/62 has no fast sync).
func TestCanonicalSynchronisation62(t *testing.T)     { testCanonicalSynchronisation(t, 62, FullSync) }
func TestCanonicalSynchronisation63Full(t *testing.T) { testCanonicalSynchronisation(t, 63, FullSync) }
func TestCanonicalSynchronisation63Fast(t *testing.T) { testCanonicalSynchronisation(t, 63, FastSync) }
func TestCanonicalSynchronisation64Full(t *testing.T) { testCanonicalSynchronisation(t, 64, FullSync) }
func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonicalSynchronisation(t, 64, FastSync) }
func TestCanonicalSynchronisation64Light(t *testing.T) {
	testCanonicalSynchronisation(t, 64, LightSync)
}
  429. func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
  430. t.Parallel()
  431. tester := newTester()
  432. defer tester.terminate()
  433. // Create a small enough block chain to download
  434. chain := testChainBase.shorten(blockCacheItems - 15)
  435. tester.newPeer("peer", protocol, chain)
  436. // Synchronise with the peer and make sure all relevant data was retrieved
  437. if err := tester.sync("peer", nil, mode); err != nil {
  438. t.Fatalf("failed to synchronise blocks: %v", err)
  439. }
  440. assertOwnChain(t, tester, chain.len())
  441. }
// Tests that if a large batch of blocks are being downloaded, it is throttled
// until the cached blocks are retrieved. No light-sync variant exists since
// light sync does not download block bodies.
func TestThrottling62(t *testing.T)     { testThrottling(t, 62, FullSync) }
func TestThrottling63Full(t *testing.T) { testThrottling(t, 63, FullSync) }
func TestThrottling63Fast(t *testing.T) { testThrottling(t, 63, FastSync) }
func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
// testThrottling blocks the chain import hook and repeatedly samples the
// download queue to verify that the downloader throttles itself once the
// block cache is full, instead of fetching unboundedly ahead of the importer.
func testThrottling(t *testing.T, protocol int, mode SyncMode) {
	t.Parallel()
	tester := newTester()
	defer tester.terminate()

	// Create a long block chain to download and the tester
	targetBlocks := testChainBase.len() - 1
	tester.newPeer("peer", protocol, testChainBase)

	// Wrap the importer to allow stepping
	blocked, proceed := uint32(0), make(chan struct{})
	tester.downloader.chainInsertHook = func(results []*fetchResult) {
		atomic.StoreUint32(&blocked, uint32(len(results)))
		<-proceed // hold the import until the test releases it
	}
	// Start a synchronisation concurrently
	errc := make(chan error)
	go func() {
		errc <- tester.sync("peer", nil, mode)
	}()
	// Iteratively take some blocks, always checking the retrieval count
	for {
		// Check the retrieval count synchronously (! reason for this ugly block)
		tester.lock.RLock()
		retrieved := len(tester.ownBlocks)
		tester.lock.RUnlock()
		if retrieved >= targetBlocks+1 {
			break
		}
		// Wait a bit for sync to throttle itself
		var cached, frozen int
		for start := time.Now(); time.Since(start) < 3*time.Second; {
			time.Sleep(25 * time.Millisecond)

			// Lock order matters: tester lock before queue lock, unlock in reverse
			tester.lock.Lock()
			tester.downloader.queue.lock.Lock()
			cached = len(tester.downloader.queue.blockDonePool)
			if mode == FastSync {
				// In fast sync both bodies and receipts must arrive; the
				// effective cache fill is the smaller of the two pools
				if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
					cached = receipts
				}
			}
			frozen = int(atomic.LoadUint32(&blocked))
			retrieved = len(tester.ownBlocks)
			tester.downloader.queue.lock.Unlock()
			tester.lock.Unlock()

			if cached == blockCacheItems || cached == blockCacheItems-reorgProtHeaderDelay || retrieved+cached+frozen == targetBlocks+1 || retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay {
				break
			}
		}
		// Make sure we filled up the cache, then exhaust it
		time.Sleep(25 * time.Millisecond) // give it a chance to screw up

		tester.lock.RLock()
		retrieved = len(tester.ownBlocks)
		tester.lock.RUnlock()
		if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
			t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1)
		}
		// Permit the blocked blocks to import
		if atomic.LoadUint32(&blocked) > 0 {
			atomic.StoreUint32(&blocked, uint32(0))
			proceed <- struct{}{}
		}
	}
	// Check that we haven't pulled more blocks than available
	assertOwnChain(t, tester, targetBlocks+1)
	if err := <-errc; err != nil {
		t.Fatalf("block synchronization failed: %v", err)
	}
}
// Tests that simple synchronization against a forked chain works correctly. In
// this test common ancestor lookup should *not* be short circuited, and a full
// binary search should be executed.
func TestForkedSync62(t *testing.T)      { testForkedSync(t, 62, FullSync) }
func TestForkedSync63Full(t *testing.T)  { testForkedSync(t, 63, FullSync) }
func TestForkedSync63Fast(t *testing.T)  { testForkedSync(t, 63, FastSync) }
func TestForkedSync64Full(t *testing.T)  { testForkedSync(t, 64, FullSync) }
func TestForkedSync64Fast(t *testing.T)  { testForkedSync(t, 64, FastSync) }
func TestForkedSync64Light(t *testing.T) { testForkedSync(t, 64, LightSync) }
  525. func testForkedSync(t *testing.T, protocol int, mode SyncMode) {
  526. t.Parallel()
  527. tester := newTester()
  528. defer tester.terminate()
  529. chainA := testChainForkLightA.shorten(testChainBase.len() + 80)
  530. chainB := testChainForkLightB.shorten(testChainBase.len() + 80)
  531. tester.newPeer("fork A", protocol, chainA)
  532. tester.newPeer("fork B", protocol, chainB)
  533. // Synchronise with the peer and make sure all blocks were retrieved
  534. if err := tester.sync("fork A", nil, mode); err != nil {
  535. t.Fatalf("failed to synchronise blocks: %v", err)
  536. }
  537. assertOwnChain(t, tester, chainA.len())
  538. // Synchronise with the second peer and make sure that fork is pulled too
  539. if err := tester.sync("fork B", nil, mode); err != nil {
  540. t.Fatalf("failed to synchronise blocks: %v", err)
  541. }
  542. assertOwnForkedChain(t, tester, testChainBase.len(), []int{chainA.len(), chainB.len()})
  543. }
// Tests that synchronising against a much shorter but much heavier fork works
// correctly and is not dropped.
func TestHeavyForkedSync62(t *testing.T)      { testHeavyForkedSync(t, 62, FullSync) }
func TestHeavyForkedSync63Full(t *testing.T)  { testHeavyForkedSync(t, 63, FullSync) }
func TestHeavyForkedSync63Fast(t *testing.T)  { testHeavyForkedSync(t, 63, FastSync) }
func TestHeavyForkedSync64Full(t *testing.T)  { testHeavyForkedSync(t, 64, FullSync) }
func TestHeavyForkedSync64Fast(t *testing.T)  { testHeavyForkedSync(t, 64, FastSync) }
func TestHeavyForkedSync64Light(t *testing.T) { testHeavyForkedSync(t, 64, LightSync) }
  552. func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
  553. t.Parallel()
  554. tester := newTester()
  555. defer tester.terminate()
  556. chainA := testChainForkLightA.shorten(testChainBase.len() + 80)
  557. chainB := testChainForkHeavy.shorten(testChainBase.len() + 80)
  558. tester.newPeer("light", protocol, chainA)
  559. tester.newPeer("heavy", protocol, chainB)
  560. // Synchronise with the peer and make sure all blocks were retrieved
  561. if err := tester.sync("light", nil, mode); err != nil {
  562. t.Fatalf("failed to synchronise blocks: %v", err)
  563. }
  564. assertOwnChain(t, tester, chainA.len())
  565. // Synchronise with the second peer and make sure that fork is pulled too
  566. if err := tester.sync("heavy", nil, mode); err != nil {
  567. t.Fatalf("failed to synchronise blocks: %v", err)
  568. }
  569. assertOwnForkedChain(t, tester, testChainBase.len(), []int{chainA.len(), chainB.len()})
  570. }
  571. // Tests that chain forks are contained within a certain interval of the current
  572. // chain head, ensuring that malicious peers cannot waste resources by feeding
  573. // long dead chains.
  574. func TestBoundedForkedSync62(t *testing.T) { testBoundedForkedSync(t, 62, FullSync) }
  575. func TestBoundedForkedSync63Full(t *testing.T) { testBoundedForkedSync(t, 63, FullSync) }
  576. func TestBoundedForkedSync63Fast(t *testing.T) { testBoundedForkedSync(t, 63, FastSync) }
  577. func TestBoundedForkedSync64Full(t *testing.T) { testBoundedForkedSync(t, 64, FullSync) }
  578. func TestBoundedForkedSync64Fast(t *testing.T) { testBoundedForkedSync(t, 64, FastSync) }
  579. func TestBoundedForkedSync64Light(t *testing.T) { testBoundedForkedSync(t, 64, LightSync) }
  580. func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) {
  581. t.Parallel()
  582. tester := newTester()
  583. defer tester.terminate()
  584. chainA := testChainForkLightA
  585. chainB := testChainForkLightB
  586. tester.newPeer("original", protocol, chainA)
  587. tester.newPeer("rewriter", protocol, chainB)
  588. // Synchronise with the peer and make sure all blocks were retrieved
  589. if err := tester.sync("original", nil, mode); err != nil {
  590. t.Fatalf("failed to synchronise blocks: %v", err)
  591. }
  592. assertOwnChain(t, tester, chainA.len())
  593. // Synchronise with the second peer and ensure that the fork is rejected to being too old
  594. if err := tester.sync("rewriter", nil, mode); err != errInvalidAncestor {
  595. t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor)
  596. }
  597. }
  598. // Tests that chain forks are contained within a certain interval of the current
  599. // chain head for short but heavy forks too. These are a bit special because they
  600. // take different ancestor lookup paths.
  601. func TestBoundedHeavyForkedSync62(t *testing.T) { testBoundedHeavyForkedSync(t, 62, FullSync) }
  602. func TestBoundedHeavyForkedSync63Full(t *testing.T) { testBoundedHeavyForkedSync(t, 63, FullSync) }
  603. func TestBoundedHeavyForkedSync63Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 63, FastSync) }
  604. func TestBoundedHeavyForkedSync64Full(t *testing.T) { testBoundedHeavyForkedSync(t, 64, FullSync) }
  605. func TestBoundedHeavyForkedSync64Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 64, FastSync) }
  606. func TestBoundedHeavyForkedSync64Light(t *testing.T) { testBoundedHeavyForkedSync(t, 64, LightSync) }
  607. func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
  608. t.Parallel()
  609. tester := newTester()
  610. defer tester.terminate()
  611. // Create a long enough forked chain
  612. chainA := testChainForkLightA
  613. chainB := testChainForkHeavy
  614. tester.newPeer("original", protocol, chainA)
  615. tester.newPeer("heavy-rewriter", protocol, chainB)
  616. // Synchronise with the peer and make sure all blocks were retrieved
  617. if err := tester.sync("original", nil, mode); err != nil {
  618. t.Fatalf("failed to synchronise blocks: %v", err)
  619. }
  620. assertOwnChain(t, tester, chainA.len())
  621. // Synchronise with the second peer and ensure that the fork is rejected to being too old
  622. if err := tester.sync("heavy-rewriter", nil, mode); err != errInvalidAncestor {
  623. t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor)
  624. }
  625. }
  626. // Tests that an inactive downloader will not accept incoming block headers and
  627. // bodies.
  628. func TestInactiveDownloader62(t *testing.T) {
  629. t.Parallel()
  630. tester := newTester()
  631. defer tester.terminate()
  632. // Check that neither block headers nor bodies are accepted
  633. if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
  634. t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
  635. }
  636. if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive {
  637. t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
  638. }
  639. }
  640. // Tests that an inactive downloader will not accept incoming block headers,
  641. // bodies and receipts.
  642. func TestInactiveDownloader63(t *testing.T) {
  643. t.Parallel()
  644. tester := newTester()
  645. defer tester.terminate()
  646. // Check that neither block headers nor bodies are accepted
  647. if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
  648. t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
  649. }
  650. if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive {
  651. t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
  652. }
  653. if err := tester.downloader.DeliverReceipts("bad peer", [][]*types.Receipt{}); err != errNoSyncActive {
  654. t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
  655. }
  656. }
  657. // Tests that a canceled download wipes all previously accumulated state.
  658. func TestCancel62(t *testing.T) { testCancel(t, 62, FullSync) }
  659. func TestCancel63Full(t *testing.T) { testCancel(t, 63, FullSync) }
  660. func TestCancel63Fast(t *testing.T) { testCancel(t, 63, FastSync) }
  661. func TestCancel64Full(t *testing.T) { testCancel(t, 64, FullSync) }
  662. func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) }
  663. func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) }
  664. func testCancel(t *testing.T, protocol int, mode SyncMode) {
  665. t.Parallel()
  666. tester := newTester()
  667. defer tester.terminate()
  668. chain := testChainBase.shorten(MaxHeaderFetch)
  669. tester.newPeer("peer", protocol, chain)
  670. // Make sure canceling works with a pristine downloader
  671. tester.downloader.Cancel()
  672. if !tester.downloader.queue.Idle() {
  673. t.Errorf("download queue not idle")
  674. }
  675. // Synchronise with the peer, but cancel afterwards
  676. if err := tester.sync("peer", nil, mode); err != nil {
  677. t.Fatalf("failed to synchronise blocks: %v", err)
  678. }
  679. tester.downloader.Cancel()
  680. if !tester.downloader.queue.Idle() {
  681. t.Errorf("download queue not idle")
  682. }
  683. }
  684. // Tests that synchronisation from multiple peers works as intended (multi thread sanity test).
  685. func TestMultiSynchronisation62(t *testing.T) { testMultiSynchronisation(t, 62, FullSync) }
  686. func TestMultiSynchronisation63Full(t *testing.T) { testMultiSynchronisation(t, 63, FullSync) }
  687. func TestMultiSynchronisation63Fast(t *testing.T) { testMultiSynchronisation(t, 63, FastSync) }
  688. func TestMultiSynchronisation64Full(t *testing.T) { testMultiSynchronisation(t, 64, FullSync) }
  689. func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t, 64, FastSync) }
  690. func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) }
  691. func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
  692. t.Parallel()
  693. tester := newTester()
  694. defer tester.terminate()
  695. // Create various peers with various parts of the chain
  696. targetPeers := 8
  697. chain := testChainBase.shorten(targetPeers * 100)
  698. for i := 0; i < targetPeers; i++ {
  699. id := fmt.Sprintf("peer #%d", i)
  700. tester.newPeer(id, protocol, chain.shorten(chain.len()/(i+1)))
  701. }
  702. if err := tester.sync("peer #0", nil, mode); err != nil {
  703. t.Fatalf("failed to synchronise blocks: %v", err)
  704. }
  705. assertOwnChain(t, tester, chain.len())
  706. }
  707. // Tests that synchronisations behave well in multi-version protocol environments
  708. // and not wreak havoc on other nodes in the network.
  709. func TestMultiProtoSynchronisation62(t *testing.T) { testMultiProtoSync(t, 62, FullSync) }
  710. func TestMultiProtoSynchronisation63Full(t *testing.T) { testMultiProtoSync(t, 63, FullSync) }
  711. func TestMultiProtoSynchronisation63Fast(t *testing.T) { testMultiProtoSync(t, 63, FastSync) }
  712. func TestMultiProtoSynchronisation64Full(t *testing.T) { testMultiProtoSync(t, 64, FullSync) }
  713. func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t, 64, FastSync) }
  714. func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) }
  715. func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
  716. t.Parallel()
  717. tester := newTester()
  718. defer tester.terminate()
  719. // Create a small enough block chain to download
  720. chain := testChainBase.shorten(blockCacheItems - 15)
  721. // Create peers of every type
  722. tester.newPeer("peer 62", 62, chain)
  723. tester.newPeer("peer 63", 63, chain)
  724. tester.newPeer("peer 64", 64, chain)
  725. // Synchronise with the requested peer and make sure all blocks were retrieved
  726. if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil, mode); err != nil {
  727. t.Fatalf("failed to synchronise blocks: %v", err)
  728. }
  729. assertOwnChain(t, tester, chain.len())
  730. // Check that no peers have been dropped off
  731. for _, version := range []int{62, 63, 64} {
  732. peer := fmt.Sprintf("peer %d", version)
  733. if _, ok := tester.peers[peer]; !ok {
  734. t.Errorf("%s dropped", peer)
  735. }
  736. }
  737. }
  738. // Tests that if a block is empty (e.g. header only), no body request should be
  739. // made, and instead the header should be assembled into a whole block in itself.
  740. func TestEmptyShortCircuit62(t *testing.T) { testEmptyShortCircuit(t, 62, FullSync) }
  741. func TestEmptyShortCircuit63Full(t *testing.T) { testEmptyShortCircuit(t, 63, FullSync) }
  742. func TestEmptyShortCircuit63Fast(t *testing.T) { testEmptyShortCircuit(t, 63, FastSync) }
  743. func TestEmptyShortCircuit64Full(t *testing.T) { testEmptyShortCircuit(t, 64, FullSync) }
  744. func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, FastSync) }
  745. func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) }
  746. func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
  747. t.Parallel()
  748. tester := newTester()
  749. defer tester.terminate()
  750. // Create a block chain to download
  751. chain := testChainBase
  752. tester.newPeer("peer", protocol, chain)
  753. // Instrument the downloader to signal body requests
  754. bodiesHave, receiptsHave := int32(0), int32(0)
  755. tester.downloader.bodyFetchHook = func(headers []*types.Header) {
  756. atomic.AddInt32(&bodiesHave, int32(len(headers)))
  757. }
  758. tester.downloader.receiptFetchHook = func(headers []*types.Header) {
  759. atomic.AddInt32(&receiptsHave, int32(len(headers)))
  760. }
  761. // Synchronise with the peer and make sure all blocks were retrieved
  762. if err := tester.sync("peer", nil, mode); err != nil {
  763. t.Fatalf("failed to synchronise blocks: %v", err)
  764. }
  765. assertOwnChain(t, tester, chain.len())
  766. // Validate the number of block bodies that should have been requested
  767. bodiesNeeded, receiptsNeeded := 0, 0
  768. for _, block := range chain.blockm {
  769. if mode != LightSync && block != tester.genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) {
  770. bodiesNeeded++
  771. }
  772. }
  773. for _, receipt := range chain.receiptm {
  774. if mode == FastSync && len(receipt) > 0 {
  775. receiptsNeeded++
  776. }
  777. }
  778. if int(bodiesHave) != bodiesNeeded {
  779. t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave, bodiesNeeded)
  780. }
  781. if int(receiptsHave) != receiptsNeeded {
  782. t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave, receiptsNeeded)
  783. }
  784. }
  785. // Tests that headers are enqueued continuously, preventing malicious nodes from
  786. // stalling the downloader by feeding gapped header chains.
  787. func TestMissingHeaderAttack62(t *testing.T) { testMissingHeaderAttack(t, 62, FullSync) }
  788. func TestMissingHeaderAttack63Full(t *testing.T) { testMissingHeaderAttack(t, 63, FullSync) }
  789. func TestMissingHeaderAttack63Fast(t *testing.T) { testMissingHeaderAttack(t, 63, FastSync) }
  790. func TestMissingHeaderAttack64Full(t *testing.T) { testMissingHeaderAttack(t, 64, FullSync) }
  791. func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 64, FastSync) }
  792. func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) }
  793. func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
  794. t.Parallel()
  795. tester := newTester()
  796. defer tester.terminate()
  797. chain := testChainBase.shorten(blockCacheItems - 15)
  798. brokenChain := chain.shorten(chain.len())
  799. delete(brokenChain.headerm, brokenChain.chain[brokenChain.len()/2])
  800. tester.newPeer("attack", protocol, brokenChain)
  801. if err := tester.sync("attack", nil, mode); err == nil {
  802. t.Fatalf("succeeded attacker synchronisation")
  803. }
  804. // Synchronise with the valid peer and make sure sync succeeds
  805. tester.newPeer("valid", protocol, chain)
  806. if err := tester.sync("valid", nil, mode); err != nil {
  807. t.Fatalf("failed to synchronise blocks: %v", err)
  808. }
  809. assertOwnChain(t, tester, chain.len())
  810. }
  811. // Tests that if requested headers are shifted (i.e. first is missing), the queue
  812. // detects the invalid numbering.
  813. func TestShiftedHeaderAttack62(t *testing.T) { testShiftedHeaderAttack(t, 62, FullSync) }
  814. func TestShiftedHeaderAttack63Full(t *testing.T) { testShiftedHeaderAttack(t, 63, FullSync) }
  815. func TestShiftedHeaderAttack63Fast(t *testing.T) { testShiftedHeaderAttack(t, 63, FastSync) }
  816. func TestShiftedHeaderAttack64Full(t *testing.T) { testShiftedHeaderAttack(t, 64, FullSync) }
  817. func TestShiftedHeaderAttack64Fast(t *testing.T) { testShiftedHeaderAttack(t, 64, FastSync) }
  818. func TestShiftedHeaderAttack64Light(t *testing.T) { testShiftedHeaderAttack(t, 64, LightSync) }
  819. func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
  820. t.Parallel()
  821. tester := newTester()
  822. defer tester.terminate()
  823. chain := testChainBase.shorten(blockCacheItems - 15)
  824. // Attempt a full sync with an attacker feeding shifted headers
  825. brokenChain := chain.shorten(chain.len())
  826. delete(brokenChain.headerm, brokenChain.chain[1])
  827. delete(brokenChain.blockm, brokenChain.chain[1])
  828. delete(brokenChain.receiptm, brokenChain.chain[1])
  829. tester.newPeer("attack", protocol, brokenChain)
  830. if err := tester.sync("attack", nil, mode); err == nil {
  831. t.Fatalf("succeeded attacker synchronisation")
  832. }
  833. // Synchronise with the valid peer and make sure sync succeeds
  834. tester.newPeer("valid", protocol, chain)
  835. if err := tester.sync("valid", nil, mode); err != nil {
  836. t.Fatalf("failed to synchronise blocks: %v", err)
  837. }
  838. assertOwnChain(t, tester, chain.len())
  839. }
// Tests that upon detecting an invalid header, the recent ones are rolled back
// for various failure scenarios. Afterwards a full sync is attempted to make
// sure no state was corrupted.
func TestInvalidHeaderRollback63Fast(t *testing.T) { testInvalidHeaderRollback(t, 63, FastSync) }
func TestInvalidHeaderRollback64Fast(t *testing.T) { testInvalidHeaderRollback(t, 64, FastSync) }
func TestInvalidHeaderRollback64Light(t *testing.T) { testInvalidHeaderRollback(t, 64, LightSync) }

// testInvalidHeaderRollback runs three attacker scenarios (gap in the header
// download phase, gap in the block import phase, withheld post-pivot blocks),
// checking after each that the local head was rolled back far enough, and
// finally performs a clean sync against a valid peer.
func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
	t.Parallel()

	tester := newTester()
	defer tester.terminate()

	// Create a small enough block chain to download
	targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks
	chain := testChainBase.shorten(targetBlocks)

	// Attempt to sync with an attacker that feeds junk during the fast sync phase.
	// This should result in the last fsHeaderSafetyNet headers being rolled back.
	missing := fsHeaderSafetyNet + MaxHeaderFetch + 1
	fastAttackChain := chain.shorten(chain.len())
	delete(fastAttackChain.headerm, fastAttackChain.chain[missing])
	tester.newPeer("fast-attack", protocol, fastAttackChain)

	if err := tester.sync("fast-attack", nil, mode); err == nil {
		t.Fatalf("succeeded fast attacker synchronisation")
	}
	// The gap sits past the first header batch, so after the rollback the head
	// must not have advanced beyond MaxHeaderFetch.
	if head := tester.CurrentHeader().Number.Int64(); int(head) > MaxHeaderFetch {
		t.Errorf("rollback head mismatch: have %v, want at most %v", head, MaxHeaderFetch)
	}
	// Attempt to sync with an attacker that feeds junk during the block import phase.
	// This should result in both the last fsHeaderSafetyNet number of headers being
	// rolled back, and also the pivot point being reverted to a non-block status.
	missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1
	blockAttackChain := chain.shorten(chain.len())
	delete(fastAttackChain.headerm, fastAttackChain.chain[missing]) // Make sure the fast-attacker doesn't fill in
	delete(blockAttackChain.headerm, blockAttackChain.chain[missing])
	tester.newPeer("block-attack", protocol, blockAttackChain)

	if err := tester.sync("block-attack", nil, mode); err == nil {
		t.Fatalf("succeeded block attacker synchronisation")
	}
	if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
		t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
	}
	if mode == FastSync {
		// Fast sync must also have reverted the pivot block back to non-block status
		if head := tester.CurrentBlock().NumberU64(); head != 0 {
			t.Errorf("fast sync pivot block #%d not rolled back", head)
		}
	}
	// Attempt to sync with an attacker that withholds promised blocks after the
	// fast sync pivot point. This could be a trial to leave the node with a bad
	// but already imported pivot block.
	withholdAttackChain := chain.shorten(chain.len())
	tester.newPeer("withhold-attack", protocol, withholdAttackChain)
	// The headers are removed from inside the sync-init hook so the peer only
	// starts withholding once the download is already underway.
	tester.downloader.syncInitHook = func(uint64, uint64) {
		for i := missing; i < withholdAttackChain.len(); i++ {
			delete(withholdAttackChain.headerm, withholdAttackChain.chain[i])
		}
		tester.downloader.syncInitHook = nil
	}
	if err := tester.sync("withhold-attack", nil, mode); err == nil {
		t.Fatalf("succeeded withholding attacker synchronisation")
	}
	if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
		t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
	}
	if mode == FastSync {
		if head := tester.CurrentBlock().NumberU64(); head != 0 {
			t.Errorf("fast sync pivot block #%d not rolled back", head)
		}
	}
	// synchronise with the valid peer and make sure sync succeeds. Since the last rollback
	// should also disable fast syncing for this process, verify that we did a fresh full
	// sync. Note, we can't assert anything about the receipts since we won't purge the
	// database of them, hence we can't use assertOwnChain.
	tester.newPeer("valid", protocol, chain)
	if err := tester.sync("valid", nil, mode); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	if hs := len(tester.ownHeaders); hs != chain.len() {
		t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, chain.len())
	}
	if mode != LightSync {
		if bs := len(tester.ownBlocks); bs != chain.len() {
			t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, chain.len())
		}
	}
}
  923. // Tests that a peer advertising a high TD doesn't get to stall the downloader
  924. // afterwards by not sending any useful hashes.
  925. func TestHighTDStarvationAttack62(t *testing.T) { testHighTDStarvationAttack(t, 62, FullSync) }
  926. func TestHighTDStarvationAttack63Full(t *testing.T) { testHighTDStarvationAttack(t, 63, FullSync) }
  927. func TestHighTDStarvationAttack63Fast(t *testing.T) { testHighTDStarvationAttack(t, 63, FastSync) }
  928. func TestHighTDStarvationAttack64Full(t *testing.T) { testHighTDStarvationAttack(t, 64, FullSync) }
  929. func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttack(t, 64, FastSync) }
  930. func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) }
  931. func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
  932. t.Parallel()
  933. tester := newTester()
  934. defer tester.terminate()
  935. chain := testChainBase.shorten(1)
  936. tester.newPeer("attack", protocol, chain)
  937. if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer {
  938. t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
  939. }
  940. }
  941. // Tests that misbehaving peers are disconnected, whilst behaving ones are not.
  942. func TestBlockHeaderAttackerDropping62(t *testing.T) { testBlockHeaderAttackerDropping(t, 62) }
  943. func TestBlockHeaderAttackerDropping63(t *testing.T) { testBlockHeaderAttackerDropping(t, 63) }
  944. func TestBlockHeaderAttackerDropping64(t *testing.T) { testBlockHeaderAttackerDropping(t, 64) }
  945. func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
  946. t.Parallel()
  947. // Define the disconnection requirement for individual hash fetch errors
  948. tests := []struct {
  949. result error
  950. drop bool
  951. }{
  952. {nil, false}, // Sync succeeded, all is well
  953. {errBusy, false}, // Sync is already in progress, no problem
  954. {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
  955. {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
  956. {errStallingPeer, true}, // Peer was detected to be stalling, drop it
  957. {errUnsyncedPeer, true}, // Peer was detected to be unsynced, drop it
  958. {errNoPeers, false}, // No peers to download from, soft race, no issue
  959. {errTimeout, true}, // No hashes received in due time, drop the peer
  960. {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
  961. {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
  962. {errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter
  963. {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
  964. {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
  965. {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
  966. {errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
  967. }
  968. // Run the tests and check disconnection status
  969. tester := newTester()
  970. defer tester.terminate()
  971. chain := testChainBase.shorten(1)
  972. for i, tt := range tests {
  973. // Register a new peer and ensure its presence
  974. id := fmt.Sprintf("test %d", i)
  975. if err := tester.newPeer(id, protocol, chain); err != nil {
  976. t.Fatalf("test %d: failed to register new peer: %v", i, err)
  977. }
  978. if _, ok := tester.peers[id]; !ok {
  979. t.Fatalf("test %d: registered peer not found", i)
  980. }
  981. // Simulate a synchronisation and check the required result
  982. tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result }
  983. tester.downloader.Synchronise(id, tester.genesis.Hash(), big.NewInt(1000), FullSync)
  984. if _, ok := tester.peers[id]; !ok != tt.drop {
  985. t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop)
  986. }
  987. }
  988. }
  989. // Tests that synchronisation progress (origin block number, current block number
  990. // and highest block number) is tracked and updated correctly.
  991. func TestSyncProgress62(t *testing.T) { testSyncProgress(t, 62, FullSync) }
  992. func TestSyncProgress63Full(t *testing.T) { testSyncProgress(t, 63, FullSync) }
  993. func TestSyncProgress63Fast(t *testing.T) { testSyncProgress(t, 63, FastSync) }
  994. func TestSyncProgress64Full(t *testing.T) { testSyncProgress(t, 64, FullSync) }
  995. func TestSyncProgress64Fast(t *testing.T) { testSyncProgress(t, 64, FastSync) }
  996. func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) }
  997. func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
  998. t.Parallel()
  999. tester := newTester()
  1000. defer tester.terminate()
  1001. chain := testChainBase.shorten(blockCacheItems - 15)
  1002. // Set a sync init hook to catch progress changes
  1003. starting := make(chan struct{})
  1004. progress := make(chan struct{})
  1005. tester.downloader.syncInitHook = func(origin, latest uint64) {
  1006. starting <- struct{}{}
  1007. <-progress
  1008. }
  1009. checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
  1010. // Synchronise half the blocks and check initial progress
  1011. tester.newPeer("peer-half", protocol, chain.shorten(chain.len()/2))
  1012. pending := new(sync.WaitGroup)
  1013. pending.Add(1)
  1014. go func() {
  1015. defer pending.Done()
  1016. if err := tester.sync("peer-half", nil, mode); err != nil {
  1017. panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
  1018. }
  1019. }()
  1020. <-starting
  1021. checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
  1022. HighestBlock: uint64(chain.len()/2 - 1),
  1023. })
  1024. progress <- struct{}{}
  1025. pending.Wait()
  1026. // Synchronise all the blocks and check continuation progress
  1027. tester.newPeer("peer-full", protocol, chain)
  1028. pending.Add(1)
  1029. go func() {
  1030. defer pending.Done()
  1031. if err := tester.sync("peer-full", nil, mode); err != nil {
  1032. panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
  1033. }
  1034. }()
  1035. <-starting
  1036. checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{
  1037. StartingBlock: uint64(chain.len()/2 - 1),
  1038. CurrentBlock: uint64(chain.len()/2 - 1),
  1039. HighestBlock: uint64(chain.len() - 1),
  1040. })
  1041. // Check final progress after successful sync
  1042. progress <- struct{}{}
  1043. pending.Wait()
  1044. checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
  1045. StartingBlock: uint64(chain.len()/2 - 1),
  1046. CurrentBlock: uint64(chain.len() - 1),
  1047. HighestBlock: uint64(chain.len() - 1),
  1048. })
  1049. }
  1050. func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.SyncProgress) {
  1051. // Mark this method as a helper to report errors at callsite, not in here
  1052. t.Helper()
  1053. p := d.Progress()
  1054. p.KnownStates, p.PulledStates = 0, 0
  1055. want.KnownStates, want.PulledStates = 0, 0
  1056. if p != want {
  1057. t.Fatalf("%s progress mismatch:\nhave %+v\nwant %+v", stage, p, want)
  1058. }
  1059. }
  1060. // Tests that synchronisation progress (origin block number and highest block
  1061. // number) is tracked and updated correctly in case of a fork (or manual head
  1062. // revertal).
  1063. func TestForkedSyncProgress62(t *testing.T) { testForkedSyncProgress(t, 62, FullSync) }
  1064. func TestForkedSyncProgress63Full(t *testing.T) { testForkedSyncProgress(t, 63, FullSync) }
  1065. func TestForkedSyncProgress63Fast(t *testing.T) { testForkedSyncProgress(t, 63, FastSync) }
  1066. func TestForkedSyncProgress64Full(t *testing.T) { testForkedSyncProgress(t, 64, FullSync) }
  1067. func TestForkedSyncProgress64Fast(t *testing.T) { testForkedSyncProgress(t, 64, FastSync) }
  1068. func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) }
  1069. func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
  1070. t.Parallel()
  1071. tester := newTester()
  1072. defer tester.terminate()
  1073. chainA := testChainForkLightA.shorten(testChainBase.len() + MaxHashFetch)
  1074. chainB := testChainForkLightB.shorten(testChainBase.len() + MaxHashFetch)
  1075. // Set a sync init hook to catch progress changes
  1076. starting := make(chan struct{})
  1077. progress := make(chan struct{})
  1078. tester.downloader.syncInitHook = func(origin, latest uint64) {
  1079. starting <- struct{}{}
  1080. <-progress
  1081. }
  1082. checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
  1083. // Synchronise with one of the forks and check progress
  1084. tester.newPeer("fork A", protocol, chainA)
  1085. pending := new(sync.WaitGroup)
  1086. pending.Add(1)
  1087. go func() {
  1088. defer pending.Done()
  1089. if err := tester.sync("fork A", nil, mode); err != nil {
  1090. panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
  1091. }
  1092. }()
  1093. <-starting
  1094. checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
  1095. HighestBlock: uint64(chainA.len() - 1),
  1096. })
  1097. progress <- struct{}{}
  1098. pending.Wait()
  1099. // Simulate a successful sync above the fork
  1100. tester.downloader.syncStatsChainOrigin = tester.downloader.syncStatsChainHeight
  1101. // Synchronise with the second fork and check progress resets
  1102. tester.newPeer("fork B", protocol, chainB)
  1103. pending.Add(1)
  1104. go func() {
  1105. defer pending.Done()
  1106. if err := tester.sync("fork B", nil, mode); err != nil {
  1107. panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
  1108. }
  1109. }()
  1110. <-starting
  1111. checkProgress(t, tester.downloader, "forking", ethereum.SyncProgress{
  1112. StartingBlock: uint64(testChainBase.len()) - 1,
  1113. CurrentBlock: uint64(chainA.len() - 1),
  1114. HighestBlock: uint64(chainB.len() - 1),
  1115. })
  1116. // Check final progress after successful sync
  1117. progress <- struct{}{}
  1118. pending.Wait()
  1119. checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
  1120. StartingBlock: uint64(testChainBase.len()) - 1,
  1121. CurrentBlock: uint64(chainB.len() - 1),
  1122. HighestBlock: uint64(chainB.len() - 1),
  1123. })
  1124. }
// Tests that if synchronisation is aborted due to some failure, then the progress
// origin is not updated in the next sync cycle, as it should be considered the
// continuation of the previous sync and not a new instance.
// Each wrapper exercises one supported protocol version / sync mode pairing.
func TestFailedSyncProgress62(t *testing.T) { testFailedSyncProgress(t, 62, FullSync) }
func TestFailedSyncProgress63Full(t *testing.T) { testFailedSyncProgress(t, 63, FullSync) }
func TestFailedSyncProgress63Fast(t *testing.T) { testFailedSyncProgress(t, 63, FastSync) }
func TestFailedSyncProgress64Full(t *testing.T) { testFailedSyncProgress(t, 64, FullSync) }
func TestFailedSyncProgress64Fast(t *testing.T) { testFailedSyncProgress(t, 64, FastSync) }
func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) }
// testFailedSyncProgress checks that a failed sync does not reset the progress
// origin: a follow-up sync must report the same origin as the aborted one.
func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
	t.Parallel()

	tester := newTester()
	defer tester.terminate()
	chain := testChainBase.shorten(blockCacheItems - 15)

	// Set a sync init hook to catch progress changes. The hook blocks the sync
	// until the test has inspected the reported progress.
	starting := make(chan struct{})
	progress := make(chan struct{})

	tester.downloader.syncInitHook = func(origin, latest uint64) {
		starting <- struct{}{} // announce that initialisation reached the hook
		<-progress             // hold the sync until the test releases it
	}
	checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})

	// Attempt a full sync with a faulty peer: the header, block and receipt at
	// the chain midpoint are withheld, so retrieval must fail partway through.
	brokenChain := chain.shorten(chain.len())
	missing := brokenChain.len() / 2
	delete(brokenChain.headerm, brokenChain.chain[missing])
	delete(brokenChain.blockm, brokenChain.chain[missing])
	delete(brokenChain.receiptm, brokenChain.chain[missing])
	tester.newPeer("faulty", protocol, brokenChain)

	pending := new(sync.WaitGroup)
	pending.Add(1)
	go func() {
		defer pending.Done()
		if err := tester.sync("faulty", nil, mode); err == nil {
			panic("succeeded faulty synchronisation")
		}
	}()
	<-starting
	checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
		HighestBlock: uint64(brokenChain.len() - 1),
	})
	progress <- struct{}{} // let the sync proceed (and fail)
	pending.Wait()
	afterFailedSync := tester.downloader.Progress()

	// Synchronise with a good peer and check that the progress origin remains
	// the same after a failure
	tester.newPeer("valid", protocol, chain)
	pending.Add(1)
	go func() {
		defer pending.Done()
		if err := tester.sync("valid", nil, mode); err != nil {
			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
		}
	}()
	<-starting
	checkProgress(t, tester.downloader, "completing", afterFailedSync)

	// Check final progress after successful sync
	progress <- struct{}{}
	pending.Wait()
	checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
		CurrentBlock: uint64(chain.len() - 1),
		HighestBlock: uint64(chain.len() - 1),
	})
}
// Tests that if an attacker fakes a chain height, after the attack is detected,
// the progress height is successfully reduced at the next sync invocation.
//
// One wrapper per supported protocol version and sync mode permutation.
func TestFakedSyncProgress62(t *testing.T)      { testFakedSyncProgress(t, 62, FullSync) }
func TestFakedSyncProgress63Full(t *testing.T)  { testFakedSyncProgress(t, 63, FullSync) }
func TestFakedSyncProgress63Fast(t *testing.T)  { testFakedSyncProgress(t, 63, FastSync) }
func TestFakedSyncProgress64Full(t *testing.T)  { testFakedSyncProgress(t, 64, FullSync) }
func TestFakedSyncProgress64Fast(t *testing.T)  { testFakedSyncProgress(t, 64, FastSync) }
func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) }
// testFakedSyncProgress checks that after syncing against an attacker that
// advertises an unobtainable chain height, a subsequent sync against an honest
// peer reduces the reported highest block to the true value.
func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
	t.Parallel()

	tester := newTester()
	defer tester.terminate()
	chain := testChainBase.shorten(blockCacheItems - 15)

	// Set a sync init hook to catch progress changes. The hook blocks the sync
	// until the test has inspected the reported progress.
	starting := make(chan struct{})
	progress := make(chan struct{})
	tester.downloader.syncInitHook = func(origin, latest uint64) {
		starting <- struct{}{} // announce that initialisation reached the hook
		<-progress             // hold the sync until the test releases it
	}
	checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})

	// Create and sync with an attacker that promises a higher chain than available:
	// the headers just below the advertised head are withheld, so the sync fails.
	brokenChain := chain.shorten(chain.len())
	numMissing := 5
	for i := brokenChain.len() - 2; i > brokenChain.len()-numMissing; i-- {
		delete(brokenChain.headerm, brokenChain.chain[i])
	}
	tester.newPeer("attack", protocol, brokenChain)

	pending := new(sync.WaitGroup)
	pending.Add(1)
	go func() {
		defer pending.Done()
		if err := tester.sync("attack", nil, mode); err == nil {
			panic("succeeded attacker synchronisation")
		}
	}()
	<-starting
	checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
		HighestBlock: uint64(brokenChain.len() - 1),
	})
	progress <- struct{}{} // let the sync proceed (and fail)
	pending.Wait()
	afterFailedSync := tester.downloader.Progress()

	// Synchronise with a good peer and check that the progress height has been
	// reduced to the true value.
	validChain := chain.shorten(chain.len() - numMissing)
	tester.newPeer("valid", protocol, validChain)
	pending.Add(1)
	go func() {
		defer pending.Done()
		if err := tester.sync("valid", nil, mode); err != nil {
			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
		}
	}()
	<-starting
	checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{
		CurrentBlock: afterFailedSync.CurrentBlock,
		HighestBlock: uint64(validChain.len() - 1),
	})

	// Check final progress after successful sync.
	progress <- struct{}{}
	pending.Wait()
	checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
		CurrentBlock: uint64(validChain.len() - 1),
		HighestBlock: uint64(validChain.len() - 1),
	})
}
  1256. // This test reproduces an issue where unexpected deliveries would
  1257. // block indefinitely if they arrived at the right time.
  1258. func TestDeliverHeadersHang(t *testing.T) {
  1259. t.Parallel()
  1260. testCases := []struct {
  1261. protocol int
  1262. syncMode SyncMode
  1263. }{
  1264. {62, FullSync},
  1265. {63, FullSync},
  1266. {63, FastSync},
  1267. {64, FullSync},
  1268. {64, FastSync},
  1269. {64, LightSync},
  1270. }
  1271. for _, tc := range testCases {
  1272. t.Run(fmt.Sprintf("protocol %d mode %v", tc.protocol, tc.syncMode), func(t *testing.T) {
  1273. t.Parallel()
  1274. testDeliverHeadersHang(t, tc.protocol, tc.syncMode)
  1275. })
  1276. }
  1277. }
  1278. func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
  1279. master := newTester()
  1280. defer master.terminate()
  1281. chain := testChainBase.shorten(15)
  1282. for i := 0; i < 200; i++ {
  1283. tester := newTester()
  1284. tester.peerDb = master.peerDb
  1285. tester.newPeer("peer", protocol, chain)
  1286. // Whenever the downloader requests headers, flood it with
  1287. // a lot of unrequested header deliveries.
  1288. tester.downloader.peers.peers["peer"].peer = &floodingTestPeer{
  1289. peer: tester.downloader.peers.peers["peer"].peer,
  1290. tester: tester,
  1291. }
  1292. if err := tester.sync("peer", nil, mode); err != nil {
  1293. t.Errorf("test %d: sync failed: %v", i, err)
  1294. }
  1295. tester.terminate()
  1296. }
  1297. }
// floodingTestPeer wraps a download peer and floods the downloader with a
// burst of unrequested header deliveries whenever headers are requested by
// number (see RequestHeadersByNumber); all other calls are plain delegations.
type floodingTestPeer struct {
	peer   Peer            // Underlying peer performing the real retrievals
	tester *downloadTester // Tester whose downloader receives the flood
}
// The methods below are straight delegations to the wrapped peer.
func (ftp *floodingTestPeer) Head() (common.Hash, *big.Int) { return ftp.peer.Head() }
func (ftp *floodingTestPeer) RequestHeadersByHash(hash common.Hash, count int, skip int, reverse bool) error {
	return ftp.peer.RequestHeadersByHash(hash, count, skip, reverse)
}
func (ftp *floodingTestPeer) RequestBodies(hashes []common.Hash) error {
	return ftp.peer.RequestBodies(hashes)
}
func (ftp *floodingTestPeer) RequestReceipts(hashes []common.Hash) error {
	return ftp.peer.RequestReceipts(hashes)
}
func (ftp *floodingTestPeer) RequestNodeData(hashes []common.Hash) error {
	return ftp.peer.RequestNodeData(hashes)
}
// RequestHeadersByNumber floods the downloader with bogus header deliveries
// from many fake peer ids, then forwards the real request once the first
// flood response has been consumed. Every delivery must complete within the
// timeout, otherwise the hang being tested for has reproduced.
func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int, reverse bool) error {
	deliveriesDone := make(chan struct{}, 500)
	for i := 0; i < cap(deliveriesDone)-1; i++ {
		// Deliver an unrequested batch of empty headers under a unique fake id.
		peer := fmt.Sprintf("fake-peer%d", i)
		go func() {
			ftp.tester.downloader.DeliverHeaders(peer, []*types.Header{{}, {}, {}, {}})
			deliveriesDone <- struct{}{}
		}()
	}

	// None of the extra deliveries should block.
	timeout := time.After(60 * time.Second)
	launched := false
	for i := 0; i < cap(deliveriesDone); i++ {
		select {
		case <-deliveriesDone:
			if !launched {
				// Start delivering the requested headers
				// after one of the flooding responses has arrived.
				go func() {
					ftp.peer.RequestHeadersByNumber(from, count, skip, reverse)
					deliveriesDone <- struct{}{} // the real request counts as the final delivery
				}()
				launched = true
			}
		case <-timeout:
			panic("blocked")
		}
	}
	return nil
}
  1345. func TestRemoteHeaderRequestSpan(t *testing.T) {
  1346. testCases := []struct {
  1347. remoteHeight uint64
  1348. localHeight uint64
  1349. expected []int
  1350. }{
  1351. // Remote is way higher. We should ask for the remote head and go backwards
  1352. {1500, 1000,
  1353. []int{1323, 1339, 1355, 1371, 1387, 1403, 1419, 1435, 1451, 1467, 1483, 1499},
  1354. },
  1355. {15000, 13006,
  1356. []int{14823, 14839, 14855, 14871, 14887, 14903, 14919, 14935, 14951, 14967, 14983, 14999},
  1357. },
  1358. //Remote is pretty close to us. We don't have to fetch as many
  1359. {1200, 1150,
  1360. []int{1149, 1154, 1159, 1164, 1169, 1174, 1179, 1184, 1189, 1194, 1199},
  1361. },
  1362. // Remote is equal to us (so on a fork with higher td)
  1363. // We should get the closest couple of ancestors
  1364. {1500, 1500,
  1365. []int{1497, 1499},
  1366. },
  1367. // We're higher than the remote! Odd
  1368. {1000, 1500,
  1369. []int{997, 999},
  1370. },
  1371. // Check some weird edgecases that it behaves somewhat rationally
  1372. {0, 1500,
  1373. []int{0, 2},
  1374. },
  1375. {6000000, 0,
  1376. []int{5999823, 5999839, 5999855, 5999871, 5999887, 5999903, 5999919, 5999935, 5999951, 5999967, 5999983, 5999999},
  1377. },
  1378. {0, 0,
  1379. []int{0, 2},
  1380. },
  1381. }
  1382. reqs := func(from, count, span int) []int {
  1383. var r []int
  1384. num := from
  1385. for len(r) < count {
  1386. r = append(r, num)
  1387. num += span + 1
  1388. }
  1389. return r
  1390. }
  1391. for i, tt := range testCases {
  1392. from, count, span, max := calculateRequestSpan(tt.remoteHeight, tt.localHeight)
  1393. data := reqs(int(from), count, span)
  1394. if max != uint64(data[len(data)-1]) {
  1395. t.Errorf("test %d: wrong last value %d != %d", i, data[len(data)-1], max)
  1396. }
  1397. failed := false
  1398. if len(data) != len(tt.expected) {
  1399. failed = true
  1400. t.Errorf("test %d: length wrong, expected %d got %d", i, len(tt.expected), len(data))
  1401. } else {
  1402. for j, n := range data {
  1403. if n != tt.expected[j] {
  1404. failed = true
  1405. break
  1406. }
  1407. }
  1408. }
  1409. if failed {
  1410. res := strings.Replace(fmt.Sprint(data), " ", ",", -1)
  1411. exp := strings.Replace(fmt.Sprint(tt.expected), " ", ",", -1)
  1412. t.Logf("got: %v\n", res)
  1413. t.Logf("exp: %v\n", exp)
  1414. t.Errorf("test %d: wrong values", i)
  1415. }
  1416. }
  1417. }
// Tests that peers below a pre-configured checkpoint block are prevented from
// being fast-synced from, avoiding potential cheap eclipse attacks.
//
// One wrapper per supported protocol version and sync mode permutation.
func TestCheckpointEnforcement62(t *testing.T)      { testCheckpointEnforcement(t, 62, FullSync) }
func TestCheckpointEnforcement63Full(t *testing.T)  { testCheckpointEnforcement(t, 63, FullSync) }
func TestCheckpointEnforcement63Fast(t *testing.T)  { testCheckpointEnforcement(t, 63, FastSync) }
func TestCheckpointEnforcement64Full(t *testing.T)  { testCheckpointEnforcement(t, 64, FullSync) }
func TestCheckpointEnforcement64Fast(t *testing.T)  { testCheckpointEnforcement(t, 64, FastSync) }
func TestCheckpointEnforcement64Light(t *testing.T) { testCheckpointEnforcement(t, 64, LightSync) }
  1426. func testCheckpointEnforcement(t *testing.T, protocol int, mode SyncMode) {
  1427. t.Parallel()
  1428. // Create a new tester with a particular hard coded checkpoint block
  1429. tester := newTester()
  1430. defer tester.terminate()
  1431. tester.downloader.checkpoint = uint64(fsMinFullBlocks) + 256
  1432. chain := testChainBase.shorten(int(tester.downloader.checkpoint) - 1)
  1433. // Attempt to sync with the peer and validate the result
  1434. tester.newPeer("peer", protocol, chain)
  1435. var expect error
  1436. if mode == FastSync || mode == LightSync {
  1437. expect = errUnsyncedPeer
  1438. }
  1439. if err := tester.sync("peer", nil, mode); err != expect {
  1440. t.Fatalf("block sync error mismatch: have %v, want %v", err, expect)
  1441. }
  1442. if mode == FastSync || mode == LightSync {
  1443. assertOwnChain(t, tester, 1)
  1444. } else {
  1445. assertOwnChain(t, tester, chain.len())
  1446. }
  1447. }