downloader_test.go 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package downloader
  17. import (
  18. "errors"
  19. "fmt"
  20. "math/big"
  21. "strings"
  22. "sync"
  23. "sync/atomic"
  24. "testing"
  25. "time"
  26. "github.com/ethereum/go-ethereum"
  27. "github.com/ethereum/go-ethereum/common"
  28. "github.com/ethereum/go-ethereum/core/rawdb"
  29. "github.com/ethereum/go-ethereum/core/types"
  30. "github.com/ethereum/go-ethereum/ethdb"
  31. "github.com/ethereum/go-ethereum/event"
  32. "github.com/ethereum/go-ethereum/trie"
  33. )
  34. // Reduce some of the parameters to make the tester faster.
// Reduce some of the parameters to make the tester faster.
func init() {
	MaxForkAncestry = uint64(10000)             // shrink the fork window so ancestor searches stay cheap
	blockCacheItems = 1024                      // smaller download cache fills up (and throttles) quickly
	fsHeaderContCheck = 500 * time.Millisecond  // poll for header continuations more frequently
}
  40. // downloadTester is a test simulator for mocking out local block chain.
// downloadTester is a test simulator for mocking out local block chain.
type downloadTester struct {
	downloader *Downloader

	genesis *types.Block   // Genesis blocks used by the tester and peers
	stateDb ethdb.Database // Database used by the tester for syncing from peers
	peerDb  ethdb.Database // Database of the peers containing all data
	peers   map[string]*downloadTesterPeer

	// Active ("own") chain segment, mirroring a live blockchain.
	ownHashes   []common.Hash                  // Hash chain belonging to the tester
	ownHeaders  map[common.Hash]*types.Header  // Headers belonging to the tester
	ownBlocks   map[common.Hash]*types.Block   // Blocks belonging to the tester
	ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester
	ownChainTd  map[common.Hash]*big.Int       // Total difficulties of the blocks in the local chain

	// Frozen ("ancient") chain segment, mirroring the freezer database.
	ancientHeaders  map[common.Hash]*types.Header  // Ancient headers belonging to the tester
	ancientBlocks   map[common.Hash]*types.Block   // Ancient blocks belonging to the tester
	ancientReceipts map[common.Hash]types.Receipts // Ancient receipts belonging to the tester
	ancientChainTd  map[common.Hash]*big.Int       // Ancient total difficulties of the blocks in the local chain

	lock sync.RWMutex // Guards the hash chain and all of the maps above
}
  58. // newTester creates a new downloader test mocker.
// newTester creates a new downloader test mocker.
func newTester() *downloadTester {
	tester := &downloadTester{
		genesis:     testGenesis,
		peerDb:      testDB,
		peers:       make(map[string]*downloadTesterPeer),
		ownHashes:   []common.Hash{testGenesis.Hash()},
		ownHeaders:  map[common.Hash]*types.Header{testGenesis.Hash(): testGenesis.Header()},
		ownBlocks:   map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis},
		ownReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil},
		ownChainTd:  map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()},

		// Initialize ancient store with test genesis block
		ancientHeaders:  map[common.Hash]*types.Header{testGenesis.Hash(): testGenesis.Header()},
		ancientBlocks:   map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis},
		ancientReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil},
		ancientChainTd:  map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()},
	}
	tester.stateDb = rawdb.NewMemoryDatabase()
	// Mark the genesis state root as present so state sync can anchor on it
	tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00})

	// The tester itself acts as both BlockChain and LightChain for the downloader
	tester.downloader = New(0, tester.stateDb, trie.NewSyncBloom(1, tester.stateDb), new(event.TypeMux), tester, nil, tester.dropPeer)
	return tester
}
// terminate aborts any operations on the embedded downloader and releases all
// held resources.
func (dl *downloadTester) terminate() {
	dl.downloader.Terminate()
}
  85. // sync starts synchronizing with a remote peer, blocking until it completes.
  86. func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
  87. dl.lock.RLock()
  88. hash := dl.peers[id].chain.headBlock().Hash()
  89. // If no particular TD was requested, load from the peer's blockchain
  90. if td == nil {
  91. td = dl.peers[id].chain.td(hash)
  92. }
  93. dl.lock.RUnlock()
  94. // Synchronise with the chosen peer and ensure proper cleanup afterwards
  95. err := dl.downloader.synchronise(id, hash, td, mode)
  96. select {
  97. case <-dl.downloader.cancelCh:
  98. // Ok, downloader fully cancelled after sync cycle
  99. default:
  100. // Downloader is still accepting packets, can block a peer up
  101. panic("downloader active post sync cycle") // panic will be caught by tester
  102. }
  103. return err
  104. }
  105. // HasHeader checks if a header is present in the testers canonical chain.
  106. func (dl *downloadTester) HasHeader(hash common.Hash, number uint64) bool {
  107. return dl.GetHeaderByHash(hash) != nil
  108. }
  109. // HasBlock checks if a block is present in the testers canonical chain.
  110. func (dl *downloadTester) HasBlock(hash common.Hash, number uint64) bool {
  111. return dl.GetBlockByHash(hash) != nil
  112. }
  113. // HasFastBlock checks if a block is present in the testers canonical chain.
  114. func (dl *downloadTester) HasFastBlock(hash common.Hash, number uint64) bool {
  115. dl.lock.RLock()
  116. defer dl.lock.RUnlock()
  117. if _, ok := dl.ancientReceipts[hash]; ok {
  118. return true
  119. }
  120. _, ok := dl.ownReceipts[hash]
  121. return ok
  122. }
  123. // GetHeader retrieves a header from the testers canonical chain.
  124. func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header {
  125. dl.lock.RLock()
  126. defer dl.lock.RUnlock()
  127. header := dl.ancientHeaders[hash]
  128. if header != nil {
  129. return header
  130. }
  131. return dl.ownHeaders[hash]
  132. }
  133. // GetBlock retrieves a block from the testers canonical chain.
  134. func (dl *downloadTester) GetBlockByHash(hash common.Hash) *types.Block {
  135. dl.lock.RLock()
  136. defer dl.lock.RUnlock()
  137. block := dl.ancientBlocks[hash]
  138. if block != nil {
  139. return block
  140. }
  141. return dl.ownBlocks[hash]
  142. }
  143. // CurrentHeader retrieves the current head header from the canonical chain.
  144. func (dl *downloadTester) CurrentHeader() *types.Header {
  145. dl.lock.RLock()
  146. defer dl.lock.RUnlock()
  147. for i := len(dl.ownHashes) - 1; i >= 0; i-- {
  148. if header := dl.ancientHeaders[dl.ownHashes[i]]; header != nil {
  149. return header
  150. }
  151. if header := dl.ownHeaders[dl.ownHashes[i]]; header != nil {
  152. return header
  153. }
  154. }
  155. return dl.genesis.Header()
  156. }
  157. // CurrentBlock retrieves the current head block from the canonical chain.
  158. func (dl *downloadTester) CurrentBlock() *types.Block {
  159. dl.lock.RLock()
  160. defer dl.lock.RUnlock()
  161. for i := len(dl.ownHashes) - 1; i >= 0; i-- {
  162. if block := dl.ancientBlocks[dl.ownHashes[i]]; block != nil {
  163. if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil {
  164. return block
  165. }
  166. return block
  167. }
  168. if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil {
  169. if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil {
  170. return block
  171. }
  172. }
  173. }
  174. return dl.genesis
  175. }
  176. // CurrentFastBlock retrieves the current head fast-sync block from the canonical chain.
  177. func (dl *downloadTester) CurrentFastBlock() *types.Block {
  178. dl.lock.RLock()
  179. defer dl.lock.RUnlock()
  180. for i := len(dl.ownHashes) - 1; i >= 0; i-- {
  181. if block := dl.ancientBlocks[dl.ownHashes[i]]; block != nil {
  182. return block
  183. }
  184. if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil {
  185. return block
  186. }
  187. }
  188. return dl.genesis
  189. }
  190. // FastSyncCommitHead manually sets the head block to a given hash.
  191. func (dl *downloadTester) FastSyncCommitHead(hash common.Hash) error {
  192. // For now only check that the state trie is correct
  193. if block := dl.GetBlockByHash(hash); block != nil {
  194. _, err := trie.NewSecure(block.Root(), trie.NewDatabase(dl.stateDb))
  195. return err
  196. }
  197. return fmt.Errorf("non existent block: %x", hash[:4])
  198. }
  199. // GetTd retrieves the block's total difficulty from the canonical chain.
  200. func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int {
  201. dl.lock.RLock()
  202. defer dl.lock.RUnlock()
  203. if td := dl.ancientChainTd[hash]; td != nil {
  204. return td
  205. }
  206. return dl.ownChainTd[hash]
  207. }
  208. // InsertHeaderChain injects a new batch of headers into the simulated chain.
  209. func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (i int, err error) {
  210. dl.lock.Lock()
  211. defer dl.lock.Unlock()
  212. // Do a quick check, as the blockchain.InsertHeaderChain doesn't insert anything in case of errors
  213. if _, ok := dl.ownHeaders[headers[0].ParentHash]; !ok {
  214. return 0, errors.New("unknown parent")
  215. }
  216. for i := 1; i < len(headers); i++ {
  217. if headers[i].ParentHash != headers[i-1].Hash() {
  218. return i, errors.New("unknown parent")
  219. }
  220. }
  221. // Do a full insert if pre-checks passed
  222. for i, header := range headers {
  223. if _, ok := dl.ownHeaders[header.Hash()]; ok {
  224. continue
  225. }
  226. if _, ok := dl.ownHeaders[header.ParentHash]; !ok {
  227. return i, errors.New("unknown parent")
  228. }
  229. dl.ownHashes = append(dl.ownHashes, header.Hash())
  230. dl.ownHeaders[header.Hash()] = header
  231. dl.ownChainTd[header.Hash()] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty)
  232. }
  233. return len(headers), nil
  234. }
  235. // InsertChain injects a new batch of blocks into the simulated chain.
  236. func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) {
  237. dl.lock.Lock()
  238. defer dl.lock.Unlock()
  239. for i, block := range blocks {
  240. if parent, ok := dl.ownBlocks[block.ParentHash()]; !ok {
  241. return i, errors.New("unknown parent")
  242. } else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil {
  243. return i, fmt.Errorf("unknown parent state %x: %v", parent.Root(), err)
  244. }
  245. if _, ok := dl.ownHeaders[block.Hash()]; !ok {
  246. dl.ownHashes = append(dl.ownHashes, block.Hash())
  247. dl.ownHeaders[block.Hash()] = block.Header()
  248. }
  249. dl.ownBlocks[block.Hash()] = block
  250. dl.ownReceipts[block.Hash()] = make(types.Receipts, 0)
  251. dl.stateDb.Put(block.Root().Bytes(), []byte{0x00})
  252. dl.ownChainTd[block.Hash()] = new(big.Int).Add(dl.ownChainTd[block.ParentHash()], block.Difficulty())
  253. }
  254. return len(blocks), nil
  255. }
// InsertReceiptChain injects a new batch of receipts into the simulated chain.
//
// Blocks at or below ancientLimit are written into the ancient store and their
// header/TD entries migrated out of the active store; newer blocks stay active.
func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts, ancientLimit uint64) (i int, err error) {
	dl.lock.Lock()
	defer dl.lock.Unlock()

	for i := 0; i < len(blocks) && i < len(receipts); i++ {
		// The owning header must already be known (fast sync imports headers first)
		if _, ok := dl.ownHeaders[blocks[i].Hash()]; !ok {
			return i, errors.New("unknown owner")
		}
		// The parent block must be present in either the ancient or active store
		if _, ok := dl.ancientBlocks[blocks[i].ParentHash()]; !ok {
			if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
				return i, errors.New("unknown parent")
			}
		}
		if blocks[i].NumberU64() <= ancientLimit {
			dl.ancientBlocks[blocks[i].Hash()] = blocks[i]
			dl.ancientReceipts[blocks[i].Hash()] = receipts[i]

			// Migrate from active db to ancient db
			dl.ancientHeaders[blocks[i].Hash()] = blocks[i].Header()
			// Parent TD is read from the ancient store: blocks are frozen in
			// order, so the parent has either been migrated already or is genesis
			dl.ancientChainTd[blocks[i].Hash()] = new(big.Int).Add(dl.ancientChainTd[blocks[i].ParentHash()], blocks[i].Difficulty())

			delete(dl.ownHeaders, blocks[i].Hash())
			delete(dl.ownChainTd, blocks[i].Hash())
		} else {
			dl.ownBlocks[blocks[i].Hash()] = blocks[i]
			dl.ownReceipts[blocks[i].Hash()] = receipts[i]
		}
	}
	return len(blocks), nil
}
  284. // Rollback removes some recently added elements from the chain.
  285. func (dl *downloadTester) Rollback(hashes []common.Hash) {
  286. dl.lock.Lock()
  287. defer dl.lock.Unlock()
  288. for i := len(hashes) - 1; i >= 0; i-- {
  289. if dl.ownHashes[len(dl.ownHashes)-1] == hashes[i] {
  290. dl.ownHashes = dl.ownHashes[:len(dl.ownHashes)-1]
  291. }
  292. delete(dl.ownChainTd, hashes[i])
  293. delete(dl.ownHeaders, hashes[i])
  294. delete(dl.ownReceipts, hashes[i])
  295. delete(dl.ownBlocks, hashes[i])
  296. delete(dl.ancientChainTd, hashes[i])
  297. delete(dl.ancientHeaders, hashes[i])
  298. delete(dl.ancientReceipts, hashes[i])
  299. delete(dl.ancientBlocks, hashes[i])
  300. }
  301. }
  302. // newPeer registers a new block download source into the downloader.
  303. func (dl *downloadTester) newPeer(id string, version int, chain *testChain) error {
  304. dl.lock.Lock()
  305. defer dl.lock.Unlock()
  306. peer := &downloadTesterPeer{dl: dl, id: id, chain: chain}
  307. dl.peers[id] = peer
  308. return dl.downloader.RegisterPeer(id, version, peer)
  309. }
  310. // dropPeer simulates a hard peer removal from the connection pool.
  311. func (dl *downloadTester) dropPeer(id string) {
  312. dl.lock.Lock()
  313. defer dl.lock.Unlock()
  314. delete(dl.peers, id)
  315. dl.downloader.UnregisterPeer(id)
  316. }
// downloadTesterPeer is a simulated remote peer serving chain data to the
// download tester.
type downloadTesterPeer struct {
	dl            *downloadTester // Tester this peer is registered with
	id            string          // Unique peer identifier
	lock          sync.RWMutex
	chain         *testChain           // Canonical chain served by this peer
	missingStates map[common.Hash]bool // State entries that fast sync should not return
}
  324. // Head constructs a function to retrieve a peer's current head hash
  325. // and total difficulty.
  326. func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
  327. b := dlp.chain.headBlock()
  328. return b.Hash(), dlp.chain.td(b.Hash())
  329. }
  330. // RequestHeadersByHash constructs a GetBlockHeaders function based on a hashed
  331. // origin; associated with a particular peer in the download tester. The returned
  332. // function can be used to retrieve batches of headers from the particular peer.
  333. func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
  334. if reverse {
  335. panic("reverse header requests not supported")
  336. }
  337. result := dlp.chain.headersByHash(origin, amount, skip)
  338. go dlp.dl.downloader.DeliverHeaders(dlp.id, result)
  339. return nil
  340. }
  341. // RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered
  342. // origin; associated with a particular peer in the download tester. The returned
  343. // function can be used to retrieve batches of headers from the particular peer.
  344. func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
  345. if reverse {
  346. panic("reverse header requests not supported")
  347. }
  348. result := dlp.chain.headersByNumber(origin, amount, skip)
  349. go dlp.dl.downloader.DeliverHeaders(dlp.id, result)
  350. return nil
  351. }
  352. // RequestBodies constructs a getBlockBodies method associated with a particular
  353. // peer in the download tester. The returned function can be used to retrieve
  354. // batches of block bodies from the particularly requested peer.
  355. func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash) error {
  356. txs, uncles := dlp.chain.bodies(hashes)
  357. go dlp.dl.downloader.DeliverBodies(dlp.id, txs, uncles)
  358. return nil
  359. }
  360. // RequestReceipts constructs a getReceipts method associated with a particular
  361. // peer in the download tester. The returned function can be used to retrieve
  362. // batches of block receipts from the particularly requested peer.
  363. func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash) error {
  364. receipts := dlp.chain.receipts(hashes)
  365. go dlp.dl.downloader.DeliverReceipts(dlp.id, receipts)
  366. return nil
  367. }
  368. // RequestNodeData constructs a getNodeData method associated with a particular
  369. // peer in the download tester. The returned function can be used to retrieve
  370. // batches of node state data from the particularly requested peer.
  371. func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error {
  372. dlp.dl.lock.RLock()
  373. defer dlp.dl.lock.RUnlock()
  374. results := make([][]byte, 0, len(hashes))
  375. for _, hash := range hashes {
  376. if data, err := dlp.dl.peerDb.Get(hash.Bytes()); err == nil {
  377. if !dlp.missingStates[hash] {
  378. results = append(results, data)
  379. }
  380. }
  381. }
  382. go dlp.dl.downloader.DeliverNodeData(dlp.id, results)
  383. return nil
  384. }
  385. // assertOwnChain checks if the local chain contains the correct number of items
  386. // of the various chain components.
  387. func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
  388. // Mark this method as a helper to report errors at callsite, not in here
  389. t.Helper()
  390. assertOwnForkedChain(t, tester, 1, []int{length})
  391. }
  392. // assertOwnForkedChain checks if the local forked chain contains the correct
  393. // number of items of the various chain components.
  394. func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) {
  395. // Mark this method as a helper to report errors at callsite, not in here
  396. t.Helper()
  397. // Initialize the counters for the first fork
  398. headers, blocks, receipts := lengths[0], lengths[0], lengths[0]
  399. // Update the counters for each subsequent fork
  400. for _, length := range lengths[1:] {
  401. headers += length - common
  402. blocks += length - common
  403. receipts += length - common
  404. }
  405. if tester.downloader.mode == LightSync {
  406. blocks, receipts = 1, 1
  407. }
  408. if hs := len(tester.ownHeaders) + len(tester.ancientHeaders) - 1; hs != headers {
  409. t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers)
  410. }
  411. if bs := len(tester.ownBlocks) + len(tester.ancientBlocks) - 1; bs != blocks {
  412. t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks)
  413. }
  414. if rs := len(tester.ownReceipts) + len(tester.ancientReceipts) - 1; rs != receipts {
  415. t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts)
  416. }
  417. }
  418. // Tests that simple synchronization against a canonical chain works correctly.
  419. // In this test common ancestor lookup should be short circuited and not require
  420. // binary searching.
  421. func TestCanonicalSynchronisation62(t *testing.T) { testCanonicalSynchronisation(t, 62, FullSync) }
  422. func TestCanonicalSynchronisation63Full(t *testing.T) { testCanonicalSynchronisation(t, 63, FullSync) }
  423. func TestCanonicalSynchronisation63Fast(t *testing.T) { testCanonicalSynchronisation(t, 63, FastSync) }
  424. func TestCanonicalSynchronisation64Full(t *testing.T) { testCanonicalSynchronisation(t, 64, FullSync) }
  425. func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonicalSynchronisation(t, 64, FastSync) }
  426. func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronisation(t, 64, LightSync) }
  427. func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
  428. t.Parallel()
  429. tester := newTester()
  430. defer tester.terminate()
  431. // Create a small enough block chain to download
  432. chain := testChainBase.shorten(blockCacheItems - 15)
  433. tester.newPeer("peer", protocol, chain)
  434. // Synchronise with the peer and make sure all relevant data was retrieved
  435. if err := tester.sync("peer", nil, mode); err != nil {
  436. t.Fatalf("failed to synchronise blocks: %v", err)
  437. }
  438. assertOwnChain(t, tester, chain.len())
  439. }
// Tests that if a large batch of blocks are being downloaded, it is throttled
// until the cached blocks are retrieved.
func TestThrottling62(t *testing.T)     { testThrottling(t, 62, FullSync) }
func TestThrottling63Full(t *testing.T) { testThrottling(t, 63, FullSync) }
func TestThrottling63Fast(t *testing.T) { testThrottling(t, 63, FastSync) }
func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }

func testThrottling(t *testing.T, protocol int, mode SyncMode) {
	t.Parallel()
	tester := newTester()
	defer tester.terminate()

	// Create a long block chain to download and the tester
	targetBlocks := testChainBase.len() - 1
	tester.newPeer("peer", protocol, testChainBase)

	// Wrap the importer to allow stepping: the hook parks the import goroutine
	// on <-proceed so the test controls exactly when cached blocks drain
	blocked, proceed := uint32(0), make(chan struct{})
	tester.downloader.chainInsertHook = func(results []*fetchResult) {
		atomic.StoreUint32(&blocked, uint32(len(results)))
		<-proceed
	}
	// Start a synchronisation concurrently
	errc := make(chan error)
	go func() {
		errc <- tester.sync("peer", nil, mode)
	}()
	// Iteratively take some blocks, always checking the retrieval count
	for {
		// Check the retrieval count synchronously (! reason for this ugly block)
		tester.lock.RLock()
		retrieved := len(tester.ownBlocks)
		tester.lock.RUnlock()
		if retrieved >= targetBlocks+1 {
			break
		}
		// Wait a bit for sync to throttle itself
		var cached, frozen int
		for start := time.Now(); time.Since(start) < 3*time.Second; {
			time.Sleep(25 * time.Millisecond)

			// NOTE(review): tester.lock is acquired before queue.lock here; any
			// other path taking both must use the same order to avoid deadlock.
			tester.lock.Lock()
			tester.downloader.queue.lock.Lock()
			cached = len(tester.downloader.queue.blockDonePool)
			if mode == FastSync {
				// In fast sync a result is only complete once receipts arrived too
				if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
					cached = receipts
				}
			}
			frozen = int(atomic.LoadUint32(&blocked))
			retrieved = len(tester.ownBlocks)
			tester.downloader.queue.lock.Unlock()
			tester.lock.Unlock()

			// Stop polling once the cache is full (possibly minus the reorg
			// protection delay) or the whole target has been accounted for
			if cached == blockCacheItems || cached == blockCacheItems-reorgProtHeaderDelay || retrieved+cached+frozen == targetBlocks+1 || retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay {
				break
			}
		}
		// Make sure we filled up the cache, then exhaust it
		time.Sleep(25 * time.Millisecond) // give it a chance to screw up

		tester.lock.RLock()
		retrieved = len(tester.ownBlocks)
		tester.lock.RUnlock()
		if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
			t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1)
		}
		// Permit the blocked blocks to import
		if atomic.LoadUint32(&blocked) > 0 {
			atomic.StoreUint32(&blocked, uint32(0))
			proceed <- struct{}{}
		}
	}
	// Check that we haven't pulled more blocks than available
	assertOwnChain(t, tester, targetBlocks+1)
	if err := <-errc; err != nil {
		t.Fatalf("block synchronization failed: %v", err)
	}
}
  514. // Tests that simple synchronization against a forked chain works correctly. In
  515. // this test common ancestor lookup should *not* be short circuited, and a full
  516. // binary search should be executed.
  517. func TestForkedSync62(t *testing.T) { testForkedSync(t, 62, FullSync) }
  518. func TestForkedSync63Full(t *testing.T) { testForkedSync(t, 63, FullSync) }
  519. func TestForkedSync63Fast(t *testing.T) { testForkedSync(t, 63, FastSync) }
  520. func TestForkedSync64Full(t *testing.T) { testForkedSync(t, 64, FullSync) }
  521. func TestForkedSync64Fast(t *testing.T) { testForkedSync(t, 64, FastSync) }
  522. func TestForkedSync64Light(t *testing.T) { testForkedSync(t, 64, LightSync) }
  523. func testForkedSync(t *testing.T, protocol int, mode SyncMode) {
  524. t.Parallel()
  525. tester := newTester()
  526. defer tester.terminate()
  527. chainA := testChainForkLightA.shorten(testChainBase.len() + 80)
  528. chainB := testChainForkLightB.shorten(testChainBase.len() + 80)
  529. tester.newPeer("fork A", protocol, chainA)
  530. tester.newPeer("fork B", protocol, chainB)
  531. // Synchronise with the peer and make sure all blocks were retrieved
  532. if err := tester.sync("fork A", nil, mode); err != nil {
  533. t.Fatalf("failed to synchronise blocks: %v", err)
  534. }
  535. assertOwnChain(t, tester, chainA.len())
  536. // Synchronise with the second peer and make sure that fork is pulled too
  537. if err := tester.sync("fork B", nil, mode); err != nil {
  538. t.Fatalf("failed to synchronise blocks: %v", err)
  539. }
  540. assertOwnForkedChain(t, tester, testChainBase.len(), []int{chainA.len(), chainB.len()})
  541. }
  542. // Tests that synchronising against a much shorter but much heavyer fork works
  543. // corrently and is not dropped.
  544. func TestHeavyForkedSync62(t *testing.T) { testHeavyForkedSync(t, 62, FullSync) }
  545. func TestHeavyForkedSync63Full(t *testing.T) { testHeavyForkedSync(t, 63, FullSync) }
  546. func TestHeavyForkedSync63Fast(t *testing.T) { testHeavyForkedSync(t, 63, FastSync) }
  547. func TestHeavyForkedSync64Full(t *testing.T) { testHeavyForkedSync(t, 64, FullSync) }
  548. func TestHeavyForkedSync64Fast(t *testing.T) { testHeavyForkedSync(t, 64, FastSync) }
  549. func TestHeavyForkedSync64Light(t *testing.T) { testHeavyForkedSync(t, 64, LightSync) }
  550. func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
  551. t.Parallel()
  552. tester := newTester()
  553. defer tester.terminate()
  554. chainA := testChainForkLightA.shorten(testChainBase.len() + 80)
  555. chainB := testChainForkHeavy.shorten(testChainBase.len() + 80)
  556. tester.newPeer("light", protocol, chainA)
  557. tester.newPeer("heavy", protocol, chainB)
  558. // Synchronise with the peer and make sure all blocks were retrieved
  559. if err := tester.sync("light", nil, mode); err != nil {
  560. t.Fatalf("failed to synchronise blocks: %v", err)
  561. }
  562. assertOwnChain(t, tester, chainA.len())
  563. // Synchronise with the second peer and make sure that fork is pulled too
  564. if err := tester.sync("heavy", nil, mode); err != nil {
  565. t.Fatalf("failed to synchronise blocks: %v", err)
  566. }
  567. assertOwnForkedChain(t, tester, testChainBase.len(), []int{chainA.len(), chainB.len()})
  568. }
  569. // Tests that chain forks are contained within a certain interval of the current
  570. // chain head, ensuring that malicious peers cannot waste resources by feeding
  571. // long dead chains.
// Per protocol-version / sync-mode instantiations of testBoundedForkedSync.
func TestBoundedForkedSync62(t *testing.T)      { testBoundedForkedSync(t, 62, FullSync) }
func TestBoundedForkedSync63Full(t *testing.T)  { testBoundedForkedSync(t, 63, FullSync) }
func TestBoundedForkedSync63Fast(t *testing.T)  { testBoundedForkedSync(t, 63, FastSync) }
func TestBoundedForkedSync64Full(t *testing.T)  { testBoundedForkedSync(t, 64, FullSync) }
func TestBoundedForkedSync64Fast(t *testing.T)  { testBoundedForkedSync(t, 64, FastSync) }
func TestBoundedForkedSync64Light(t *testing.T) { testBoundedForkedSync(t, 64, LightSync) }
  578. func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) {
  579. t.Parallel()
  580. tester := newTester()
  581. defer tester.terminate()
  582. chainA := testChainForkLightA
  583. chainB := testChainForkLightB
  584. tester.newPeer("original", protocol, chainA)
  585. tester.newPeer("rewriter", protocol, chainB)
  586. // Synchronise with the peer and make sure all blocks were retrieved
  587. if err := tester.sync("original", nil, mode); err != nil {
  588. t.Fatalf("failed to synchronise blocks: %v", err)
  589. }
  590. assertOwnChain(t, tester, chainA.len())
  591. // Synchronise with the second peer and ensure that the fork is rejected to being too old
  592. if err := tester.sync("rewriter", nil, mode); err != errInvalidAncestor {
  593. t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor)
  594. }
  595. }
  596. // Tests that chain forks are contained within a certain interval of the current
  597. // chain head for short but heavy forks too. These are a bit special because they
  598. // take different ancestor lookup paths.
// Per protocol-version / sync-mode instantiations of testBoundedHeavyForkedSync.
func TestBoundedHeavyForkedSync62(t *testing.T)      { testBoundedHeavyForkedSync(t, 62, FullSync) }
func TestBoundedHeavyForkedSync63Full(t *testing.T)  { testBoundedHeavyForkedSync(t, 63, FullSync) }
func TestBoundedHeavyForkedSync63Fast(t *testing.T)  { testBoundedHeavyForkedSync(t, 63, FastSync) }
func TestBoundedHeavyForkedSync64Full(t *testing.T)  { testBoundedHeavyForkedSync(t, 64, FullSync) }
func TestBoundedHeavyForkedSync64Fast(t *testing.T)  { testBoundedHeavyForkedSync(t, 64, FastSync) }
func TestBoundedHeavyForkedSync64Light(t *testing.T) { testBoundedHeavyForkedSync(t, 64, LightSync) }
  605. func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
  606. t.Parallel()
  607. tester := newTester()
  608. defer tester.terminate()
  609. // Create a long enough forked chain
  610. chainA := testChainForkLightA
  611. chainB := testChainForkHeavy
  612. tester.newPeer("original", protocol, chainA)
  613. tester.newPeer("heavy-rewriter", protocol, chainB)
  614. // Synchronise with the peer and make sure all blocks were retrieved
  615. if err := tester.sync("original", nil, mode); err != nil {
  616. t.Fatalf("failed to synchronise blocks: %v", err)
  617. }
  618. assertOwnChain(t, tester, chainA.len())
  619. // Synchronise with the second peer and ensure that the fork is rejected to being too old
  620. if err := tester.sync("heavy-rewriter", nil, mode); err != errInvalidAncestor {
  621. t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor)
  622. }
  623. }
  624. // Tests that an inactive downloader will not accept incoming block headers and
  625. // bodies.
  626. func TestInactiveDownloader62(t *testing.T) {
  627. t.Parallel()
  628. tester := newTester()
  629. defer tester.terminate()
  630. // Check that neither block headers nor bodies are accepted
  631. if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
  632. t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
  633. }
  634. if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive {
  635. t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
  636. }
  637. }
  638. // Tests that an inactive downloader will not accept incoming block headers,
  639. // bodies and receipts.
  640. func TestInactiveDownloader63(t *testing.T) {
  641. t.Parallel()
  642. tester := newTester()
  643. defer tester.terminate()
  644. // Check that neither block headers nor bodies are accepted
  645. if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
  646. t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
  647. }
  648. if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive {
  649. t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
  650. }
  651. if err := tester.downloader.DeliverReceipts("bad peer", [][]*types.Receipt{}); err != errNoSyncActive {
  652. t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
  653. }
  654. }
  655. // Tests that a canceled download wipes all previously accumulated state.
// Per protocol-version / sync-mode instantiations of testCancel.
func TestCancel62(t *testing.T)      { testCancel(t, 62, FullSync) }
func TestCancel63Full(t *testing.T)  { testCancel(t, 63, FullSync) }
func TestCancel63Fast(t *testing.T)  { testCancel(t, 63, FastSync) }
func TestCancel64Full(t *testing.T)  { testCancel(t, 64, FullSync) }
func TestCancel64Fast(t *testing.T)  { testCancel(t, 64, FastSync) }
func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) }
  662. func testCancel(t *testing.T, protocol int, mode SyncMode) {
  663. t.Parallel()
  664. tester := newTester()
  665. defer tester.terminate()
  666. chain := testChainBase.shorten(MaxHeaderFetch)
  667. tester.newPeer("peer", protocol, chain)
  668. // Make sure canceling works with a pristine downloader
  669. tester.downloader.Cancel()
  670. if !tester.downloader.queue.Idle() {
  671. t.Errorf("download queue not idle")
  672. }
  673. // Synchronise with the peer, but cancel afterwards
  674. if err := tester.sync("peer", nil, mode); err != nil {
  675. t.Fatalf("failed to synchronise blocks: %v", err)
  676. }
  677. tester.downloader.Cancel()
  678. if !tester.downloader.queue.Idle() {
  679. t.Errorf("download queue not idle")
  680. }
  681. }
  682. // Tests that synchronisation from multiple peers works as intended (multi thread sanity test).
// Per protocol-version / sync-mode instantiations of testMultiSynchronisation.
func TestMultiSynchronisation62(t *testing.T)      { testMultiSynchronisation(t, 62, FullSync) }
func TestMultiSynchronisation63Full(t *testing.T)  { testMultiSynchronisation(t, 63, FullSync) }
func TestMultiSynchronisation63Fast(t *testing.T)  { testMultiSynchronisation(t, 63, FastSync) }
func TestMultiSynchronisation64Full(t *testing.T)  { testMultiSynchronisation(t, 64, FullSync) }
func TestMultiSynchronisation64Fast(t *testing.T)  { testMultiSynchronisation(t, 64, FastSync) }
func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) }
  689. func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
  690. t.Parallel()
  691. tester := newTester()
  692. defer tester.terminate()
  693. // Create various peers with various parts of the chain
  694. targetPeers := 8
  695. chain := testChainBase.shorten(targetPeers * 100)
  696. for i := 0; i < targetPeers; i++ {
  697. id := fmt.Sprintf("peer #%d", i)
  698. tester.newPeer(id, protocol, chain.shorten(chain.len()/(i+1)))
  699. }
  700. if err := tester.sync("peer #0", nil, mode); err != nil {
  701. t.Fatalf("failed to synchronise blocks: %v", err)
  702. }
  703. assertOwnChain(t, tester, chain.len())
  704. }
  705. // Tests that synchronisations behave well in multi-version protocol environments
  706. // and not wreak havoc on other nodes in the network.
// Per protocol-version / sync-mode instantiations of testMultiProtoSync.
func TestMultiProtoSynchronisation62(t *testing.T)      { testMultiProtoSync(t, 62, FullSync) }
func TestMultiProtoSynchronisation63Full(t *testing.T)  { testMultiProtoSync(t, 63, FullSync) }
func TestMultiProtoSynchronisation63Fast(t *testing.T)  { testMultiProtoSync(t, 63, FastSync) }
func TestMultiProtoSynchronisation64Full(t *testing.T)  { testMultiProtoSync(t, 64, FullSync) }
func TestMultiProtoSynchronisation64Fast(t *testing.T)  { testMultiProtoSync(t, 64, FastSync) }
func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) }
  713. func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
  714. t.Parallel()
  715. tester := newTester()
  716. defer tester.terminate()
  717. // Create a small enough block chain to download
  718. chain := testChainBase.shorten(blockCacheItems - 15)
  719. // Create peers of every type
  720. tester.newPeer("peer 62", 62, chain)
  721. tester.newPeer("peer 63", 63, chain)
  722. tester.newPeer("peer 64", 64, chain)
  723. // Synchronise with the requested peer and make sure all blocks were retrieved
  724. if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil, mode); err != nil {
  725. t.Fatalf("failed to synchronise blocks: %v", err)
  726. }
  727. assertOwnChain(t, tester, chain.len())
  728. // Check that no peers have been dropped off
  729. for _, version := range []int{62, 63, 64} {
  730. peer := fmt.Sprintf("peer %d", version)
  731. if _, ok := tester.peers[peer]; !ok {
  732. t.Errorf("%s dropped", peer)
  733. }
  734. }
  735. }
  736. // Tests that if a block is empty (e.g. header only), no body request should be
  737. // made, and instead the header should be assembled into a whole block in itself.
// Per protocol-version / sync-mode instantiations of testEmptyShortCircuit.
func TestEmptyShortCircuit62(t *testing.T)      { testEmptyShortCircuit(t, 62, FullSync) }
func TestEmptyShortCircuit63Full(t *testing.T)  { testEmptyShortCircuit(t, 63, FullSync) }
func TestEmptyShortCircuit63Fast(t *testing.T)  { testEmptyShortCircuit(t, 63, FastSync) }
func TestEmptyShortCircuit64Full(t *testing.T)  { testEmptyShortCircuit(t, 64, FullSync) }
func TestEmptyShortCircuit64Fast(t *testing.T)  { testEmptyShortCircuit(t, 64, FastSync) }
func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) }
  744. func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
  745. t.Parallel()
  746. tester := newTester()
  747. defer tester.terminate()
  748. // Create a block chain to download
  749. chain := testChainBase
  750. tester.newPeer("peer", protocol, chain)
  751. // Instrument the downloader to signal body requests
  752. bodiesHave, receiptsHave := int32(0), int32(0)
  753. tester.downloader.bodyFetchHook = func(headers []*types.Header) {
  754. atomic.AddInt32(&bodiesHave, int32(len(headers)))
  755. }
  756. tester.downloader.receiptFetchHook = func(headers []*types.Header) {
  757. atomic.AddInt32(&receiptsHave, int32(len(headers)))
  758. }
  759. // Synchronise with the peer and make sure all blocks were retrieved
  760. if err := tester.sync("peer", nil, mode); err != nil {
  761. t.Fatalf("failed to synchronise blocks: %v", err)
  762. }
  763. assertOwnChain(t, tester, chain.len())
  764. // Validate the number of block bodies that should have been requested
  765. bodiesNeeded, receiptsNeeded := 0, 0
  766. for _, block := range chain.blockm {
  767. if mode != LightSync && block != tester.genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) {
  768. bodiesNeeded++
  769. }
  770. }
  771. for _, receipt := range chain.receiptm {
  772. if mode == FastSync && len(receipt) > 0 {
  773. receiptsNeeded++
  774. }
  775. }
  776. if int(bodiesHave) != bodiesNeeded {
  777. t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave, bodiesNeeded)
  778. }
  779. if int(receiptsHave) != receiptsNeeded {
  780. t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave, receiptsNeeded)
  781. }
  782. }
  783. // Tests that headers are enqueued continuously, preventing malicious nodes from
  784. // stalling the downloader by feeding gapped header chains.
// Per protocol-version / sync-mode instantiations of testMissingHeaderAttack.
func TestMissingHeaderAttack62(t *testing.T)      { testMissingHeaderAttack(t, 62, FullSync) }
func TestMissingHeaderAttack63Full(t *testing.T)  { testMissingHeaderAttack(t, 63, FullSync) }
func TestMissingHeaderAttack63Fast(t *testing.T)  { testMissingHeaderAttack(t, 63, FastSync) }
func TestMissingHeaderAttack64Full(t *testing.T)  { testMissingHeaderAttack(t, 64, FullSync) }
func TestMissingHeaderAttack64Fast(t *testing.T)  { testMissingHeaderAttack(t, 64, FastSync) }
func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) }
  791. func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
  792. t.Parallel()
  793. tester := newTester()
  794. defer tester.terminate()
  795. chain := testChainBase.shorten(blockCacheItems - 15)
  796. brokenChain := chain.shorten(chain.len())
  797. delete(brokenChain.headerm, brokenChain.chain[brokenChain.len()/2])
  798. tester.newPeer("attack", protocol, brokenChain)
  799. if err := tester.sync("attack", nil, mode); err == nil {
  800. t.Fatalf("succeeded attacker synchronisation")
  801. }
  802. // Synchronise with the valid peer and make sure sync succeeds
  803. tester.newPeer("valid", protocol, chain)
  804. if err := tester.sync("valid", nil, mode); err != nil {
  805. t.Fatalf("failed to synchronise blocks: %v", err)
  806. }
  807. assertOwnChain(t, tester, chain.len())
  808. }
  809. // Tests that if requested headers are shifted (i.e. first is missing), the queue
  810. // detects the invalid numbering.
// Per protocol-version / sync-mode instantiations of testShiftedHeaderAttack.
func TestShiftedHeaderAttack62(t *testing.T)      { testShiftedHeaderAttack(t, 62, FullSync) }
func TestShiftedHeaderAttack63Full(t *testing.T)  { testShiftedHeaderAttack(t, 63, FullSync) }
func TestShiftedHeaderAttack63Fast(t *testing.T)  { testShiftedHeaderAttack(t, 63, FastSync) }
func TestShiftedHeaderAttack64Full(t *testing.T)  { testShiftedHeaderAttack(t, 64, FullSync) }
func TestShiftedHeaderAttack64Fast(t *testing.T)  { testShiftedHeaderAttack(t, 64, FastSync) }
func TestShiftedHeaderAttack64Light(t *testing.T) { testShiftedHeaderAttack(t, 64, LightSync) }
  817. func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
  818. t.Parallel()
  819. tester := newTester()
  820. defer tester.terminate()
  821. chain := testChainBase.shorten(blockCacheItems - 15)
  822. // Attempt a full sync with an attacker feeding shifted headers
  823. brokenChain := chain.shorten(chain.len())
  824. delete(brokenChain.headerm, brokenChain.chain[1])
  825. delete(brokenChain.blockm, brokenChain.chain[1])
  826. delete(brokenChain.receiptm, brokenChain.chain[1])
  827. tester.newPeer("attack", protocol, brokenChain)
  828. if err := tester.sync("attack", nil, mode); err == nil {
  829. t.Fatalf("succeeded attacker synchronisation")
  830. }
  831. // Synchronise with the valid peer and make sure sync succeeds
  832. tester.newPeer("valid", protocol, chain)
  833. if err := tester.sync("valid", nil, mode); err != nil {
  834. t.Fatalf("failed to synchronise blocks: %v", err)
  835. }
  836. assertOwnChain(t, tester, chain.len())
  837. }
  838. // Tests that upon detecting an invalid header, the recent ones are rolled back
  839. // for various failure scenarios. Afterwards a full sync is attempted to make
  840. // sure no state was corrupted.
// Per protocol-version / sync-mode instantiations of testInvalidHeaderRollback
// (only fast and light modes perform header rollbacks).
func TestInvalidHeaderRollback63Fast(t *testing.T)  { testInvalidHeaderRollback(t, 63, FastSync) }
func TestInvalidHeaderRollback64Fast(t *testing.T)  { testInvalidHeaderRollback(t, 64, FastSync) }
func TestInvalidHeaderRollback64Light(t *testing.T) { testInvalidHeaderRollback(t, 64, LightSync) }
// testInvalidHeaderRollback runs three consecutive attack scenarios against the
// same tester, checking after each that the header chain was rolled back to a
// safe height, then finishes with an honest peer to prove the state recovered.
func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
	t.Parallel()

	tester := newTester()
	defer tester.terminate()

	// Create a small enough block chain to download
	targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks
	chain := testChainBase.shorten(targetBlocks)

	// Attempt to sync with an attacker that feeds junk during the fast sync phase.
	// This should result in the last fsHeaderSafetyNet headers being rolled back.
	missing := fsHeaderSafetyNet + MaxHeaderFetch + 1
	fastAttackChain := chain.shorten(chain.len())
	delete(fastAttackChain.headerm, fastAttackChain.chain[missing])
	tester.newPeer("fast-attack", protocol, fastAttackChain)
	if err := tester.sync("fast-attack", nil, mode); err == nil {
		t.Fatalf("succeeded fast attacker synchronisation")
	}
	if head := tester.CurrentHeader().Number.Int64(); int(head) > MaxHeaderFetch {
		t.Errorf("rollback head mismatch: have %v, want at most %v", head, MaxHeaderFetch)
	}
	// Attempt to sync with an attacker that feeds junk during the block import phase.
	// This should result in both the last fsHeaderSafetyNet number of headers being
	// rolled back, and also the pivot point being reverted to a non-block status.
	missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1
	blockAttackChain := chain.shorten(chain.len())
	delete(fastAttackChain.headerm, fastAttackChain.chain[missing]) // Make sure the fast-attacker doesn't fill in
	delete(blockAttackChain.headerm, blockAttackChain.chain[missing])
	tester.newPeer("block-attack", protocol, blockAttackChain)
	if err := tester.sync("block-attack", nil, mode); err == nil {
		t.Fatalf("succeeded block attacker synchronisation")
	}
	if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
		t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
	}
	if mode == FastSync {
		// The pivot block itself must have been reverted too
		if head := tester.CurrentBlock().NumberU64(); head != 0 {
			t.Errorf("fast sync pivot block #%d not rolled back", head)
		}
	}
	// Attempt to sync with an attacker that withholds promised blocks after the
	// fast sync pivot point. This could be a trial to leave the node with a bad
	// but already imported pivot block.
	withholdAttackChain := chain.shorten(chain.len())
	tester.newPeer("withhold-attack", protocol, withholdAttackChain)
	tester.downloader.syncInitHook = func(uint64, uint64) {
		// Drop the tail headers only once the sync is already underway, then
		// disarm the hook so later syncs run unmodified
		for i := missing; i < withholdAttackChain.len(); i++ {
			delete(withholdAttackChain.headerm, withholdAttackChain.chain[i])
		}
		tester.downloader.syncInitHook = nil
	}
	if err := tester.sync("withhold-attack", nil, mode); err == nil {
		t.Fatalf("succeeded withholding attacker synchronisation")
	}
	if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
		t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
	}
	if mode == FastSync {
		if head := tester.CurrentBlock().NumberU64(); head != 0 {
			t.Errorf("fast sync pivot block #%d not rolled back", head)
		}
	}
	// synchronise with the valid peer and make sure sync succeeds. Since the last rollback
	// should also disable fast syncing for this process, verify that we did a fresh full
	// sync. Note, we can't assert anything about the receipts since we won't purge the
	// database of them, hence we can't use assertOwnChain.
	tester.newPeer("valid", protocol, chain)
	if err := tester.sync("valid", nil, mode); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	if hs := len(tester.ownHeaders); hs != chain.len() {
		t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, chain.len())
	}
	if mode != LightSync {
		if bs := len(tester.ownBlocks); bs != chain.len() {
			t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, chain.len())
		}
	}
}
// Tests that a peer advertising a high TD doesn't get to stall the downloader
// afterwards by not sending any useful hashes.
// Per protocol-version / sync-mode instantiations of testHighTDStarvationAttack.
func TestHighTDStarvationAttack62(t *testing.T)      { testHighTDStarvationAttack(t, 62, FullSync) }
func TestHighTDStarvationAttack63Full(t *testing.T)  { testHighTDStarvationAttack(t, 63, FullSync) }
func TestHighTDStarvationAttack63Fast(t *testing.T)  { testHighTDStarvationAttack(t, 63, FastSync) }
func TestHighTDStarvationAttack64Full(t *testing.T)  { testHighTDStarvationAttack(t, 64, FullSync) }
func TestHighTDStarvationAttack64Fast(t *testing.T)  { testHighTDStarvationAttack(t, 64, FastSync) }
func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) }
  929. func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
  930. t.Parallel()
  931. tester := newTester()
  932. defer tester.terminate()
  933. chain := testChainBase.shorten(1)
  934. tester.newPeer("attack", protocol, chain)
  935. if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer {
  936. t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
  937. }
  938. }
  939. // Tests that misbehaving peers are disconnected, whilst behaving ones are not.
// Per protocol-version instantiations of testBlockHeaderAttackerDropping.
func TestBlockHeaderAttackerDropping62(t *testing.T) { testBlockHeaderAttackerDropping(t, 62) }
func TestBlockHeaderAttackerDropping63(t *testing.T) { testBlockHeaderAttackerDropping(t, 63) }
func TestBlockHeaderAttackerDropping64(t *testing.T) { testBlockHeaderAttackerDropping(t, 64) }
  943. func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
  944. t.Parallel()
  945. // Define the disconnection requirement for individual hash fetch errors
  946. tests := []struct {
  947. result error
  948. drop bool
  949. }{
  950. {nil, false}, // Sync succeeded, all is well
  951. {errBusy, false}, // Sync is already in progress, no problem
  952. {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
  953. {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
  954. {errStallingPeer, true}, // Peer was detected to be stalling, drop it
  955. {errUnsyncedPeer, true}, // Peer was detected to be unsynced, drop it
  956. {errNoPeers, false}, // No peers to download from, soft race, no issue
  957. {errTimeout, true}, // No hashes received in due time, drop the peer
  958. {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
  959. {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
  960. {errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter
  961. {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
  962. {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
  963. {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
  964. {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
  965. {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
  966. {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
  967. {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
  968. {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
  969. {errCancelHeaderProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
  970. {errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
  971. }
  972. // Run the tests and check disconnection status
  973. tester := newTester()
  974. defer tester.terminate()
  975. chain := testChainBase.shorten(1)
  976. for i, tt := range tests {
  977. // Register a new peer and ensure it's presence
  978. id := fmt.Sprintf("test %d", i)
  979. if err := tester.newPeer(id, protocol, chain); err != nil {
  980. t.Fatalf("test %d: failed to register new peer: %v", i, err)
  981. }
  982. if _, ok := tester.peers[id]; !ok {
  983. t.Fatalf("test %d: registered peer not found", i)
  984. }
  985. // Simulate a synchronisation and check the required result
  986. tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result }
  987. tester.downloader.Synchronise(id, tester.genesis.Hash(), big.NewInt(1000), FullSync)
  988. if _, ok := tester.peers[id]; !ok != tt.drop {
  989. t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop)
  990. }
  991. }
  992. }
  993. // Tests that synchronisation progress (origin block number, current block number
  994. // and highest block number) is tracked and updated correctly.
// Per protocol-version / sync-mode instantiations of testSyncProgress.
func TestSyncProgress62(t *testing.T)      { testSyncProgress(t, 62, FullSync) }
func TestSyncProgress63Full(t *testing.T)  { testSyncProgress(t, 63, FullSync) }
func TestSyncProgress63Fast(t *testing.T)  { testSyncProgress(t, 63, FastSync) }
func TestSyncProgress64Full(t *testing.T)  { testSyncProgress(t, 64, FullSync) }
func TestSyncProgress64Fast(t *testing.T)  { testSyncProgress(t, 64, FastSync) }
func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) }
// testSyncProgress checks the reported sync progress at four stages: pristine,
// mid-way through a half-chain sync, resuming onto the full chain, and final.
// The syncInitHook handshake pauses each sync so progress can be inspected.
func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
	t.Parallel()

	tester := newTester()
	defer tester.terminate()

	chain := testChainBase.shorten(blockCacheItems - 15)

	// Set a sync init hook to catch progress changes; the hook blocks each
	// sync until the test has inspected its intermediate progress report
	starting := make(chan struct{})
	progress := make(chan struct{})
	tester.downloader.syncInitHook = func(origin, latest uint64) {
		starting <- struct{}{}
		<-progress
	}
	checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})

	// Synchronise half the blocks and check initial progress
	tester.newPeer("peer-half", protocol, chain.shorten(chain.len()/2))
	pending := new(sync.WaitGroup)
	pending.Add(1)
	go func() {
		defer pending.Done()
		if err := tester.sync("peer-half", nil, mode); err != nil {
			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
		}
	}()
	<-starting
	checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
		HighestBlock: uint64(chain.len()/2 - 1),
	})
	progress <- struct{}{}
	pending.Wait()

	// Synchronise all the blocks and check continuation progress
	tester.newPeer("peer-full", protocol, chain)
	pending.Add(1)
	go func() {
		defer pending.Done()
		if err := tester.sync("peer-full", nil, mode); err != nil {
			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
		}
	}()
	<-starting
	checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{
		StartingBlock: uint64(chain.len()/2 - 1),
		CurrentBlock:  uint64(chain.len()/2 - 1),
		HighestBlock:  uint64(chain.len() - 1),
	})

	// Check final progress after successful sync
	progress <- struct{}{}
	pending.Wait()
	checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
		StartingBlock: uint64(chain.len()/2 - 1),
		CurrentBlock:  uint64(chain.len() - 1),
		HighestBlock:  uint64(chain.len() - 1),
	})
}
  1054. func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.SyncProgress) {
  1055. // Mark this method as a helper to report errors at callsite, not in here
  1056. t.Helper()
  1057. p := d.Progress()
  1058. p.KnownStates, p.PulledStates = 0, 0
  1059. want.KnownStates, want.PulledStates = 0, 0
  1060. if p != want {
  1061. t.Fatalf("%s progress mismatch:\nhave %+v\nwant %+v", stage, p, want)
  1062. }
  1063. }
  1064. // Tests that synchronisation progress (origin block number and highest block
  1065. // number) is tracked and updated correctly in case of a fork (or manual head
  1066. // revertal).
// Per protocol-version / sync-mode instantiations of testForkedSyncProgress.
func TestForkedSyncProgress62(t *testing.T)      { testForkedSyncProgress(t, 62, FullSync) }
func TestForkedSyncProgress63Full(t *testing.T)  { testForkedSyncProgress(t, 63, FullSync) }
func TestForkedSyncProgress63Fast(t *testing.T)  { testForkedSyncProgress(t, 63, FastSync) }
func TestForkedSyncProgress64Full(t *testing.T)  { testForkedSyncProgress(t, 64, FullSync) }
func TestForkedSyncProgress64Fast(t *testing.T)  { testForkedSyncProgress(t, 64, FastSync) }
func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) }
// testForkedSyncProgress checks that progress reporting resets correctly when
// switching from one fork to another: the starting block must rewind to the
// common ancestor rather than continuing from the abandoned fork's head.
func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
	t.Parallel()

	tester := newTester()
	defer tester.terminate()

	chainA := testChainForkLightA.shorten(testChainBase.len() + MaxHashFetch)
	chainB := testChainForkLightB.shorten(testChainBase.len() + MaxHashFetch)

	// Set a sync init hook to catch progress changes; the hook blocks each
	// sync until the test has inspected its intermediate progress report
	starting := make(chan struct{})
	progress := make(chan struct{})
	tester.downloader.syncInitHook = func(origin, latest uint64) {
		starting <- struct{}{}
		<-progress
	}
	checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})

	// Synchronise with one of the forks and check progress
	tester.newPeer("fork A", protocol, chainA)
	pending := new(sync.WaitGroup)
	pending.Add(1)
	go func() {
		defer pending.Done()
		if err := tester.sync("fork A", nil, mode); err != nil {
			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
		}
	}()
	<-starting
	checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
		HighestBlock: uint64(chainA.len() - 1),
	})
	progress <- struct{}{}
	pending.Wait()

	// Simulate a successful sync above the fork
	tester.downloader.syncStatsChainOrigin = tester.downloader.syncStatsChainHeight

	// Synchronise with the second fork and check progress resets
	tester.newPeer("fork B", protocol, chainB)
	pending.Add(1)
	go func() {
		defer pending.Done()
		if err := tester.sync("fork B", nil, mode); err != nil {
			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
		}
	}()
	<-starting
	checkProgress(t, tester.downloader, "forking", ethereum.SyncProgress{
		StartingBlock: uint64(testChainBase.len()) - 1,
		CurrentBlock:  uint64(chainA.len() - 1),
		HighestBlock:  uint64(chainB.len() - 1),
	})

	// Check final progress after successful sync
	progress <- struct{}{}
	pending.Wait()
	checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
		StartingBlock: uint64(testChainBase.len()) - 1,
		CurrentBlock:  uint64(chainB.len() - 1),
		HighestBlock:  uint64(chainB.len() - 1),
	})
}
  1129. // Tests that if synchronisation is aborted due to some failure, then the progress
  1130. // origin is not updated in the next sync cycle, as it should be considered the
  1131. // continuation of the previous sync and not a new instance.
// Each variant below runs testFailedSyncProgress for one combination of
// eth protocol version (62/63/64) and sync mode (full/fast/light).
func TestFailedSyncProgress62(t *testing.T) { testFailedSyncProgress(t, 62, FullSync) }
func TestFailedSyncProgress63Full(t *testing.T) { testFailedSyncProgress(t, 63, FullSync) }
func TestFailedSyncProgress63Fast(t *testing.T) { testFailedSyncProgress(t, 63, FastSync) }
func TestFailedSyncProgress64Full(t *testing.T) { testFailedSyncProgress(t, 64, FullSync) }
func TestFailedSyncProgress64Fast(t *testing.T) { testFailedSyncProgress(t, 64, FastSync) }
func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) }
  1138. func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
  1139. t.Parallel()
  1140. tester := newTester()
  1141. defer tester.terminate()
  1142. chain := testChainBase.shorten(blockCacheItems - 15)
  1143. // Set a sync init hook to catch progress changes
  1144. starting := make(chan struct{})
  1145. progress := make(chan struct{})
  1146. tester.downloader.syncInitHook = func(origin, latest uint64) {
  1147. starting <- struct{}{}
  1148. <-progress
  1149. }
  1150. checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
  1151. // Attempt a full sync with a faulty peer
  1152. brokenChain := chain.shorten(chain.len())
  1153. missing := brokenChain.len() / 2
  1154. delete(brokenChain.headerm, brokenChain.chain[missing])
  1155. delete(brokenChain.blockm, brokenChain.chain[missing])
  1156. delete(brokenChain.receiptm, brokenChain.chain[missing])
  1157. tester.newPeer("faulty", protocol, brokenChain)
  1158. pending := new(sync.WaitGroup)
  1159. pending.Add(1)
  1160. go func() {
  1161. defer pending.Done()
  1162. if err := tester.sync("faulty", nil, mode); err == nil {
  1163. panic("succeeded faulty synchronisation")
  1164. }
  1165. }()
  1166. <-starting
  1167. checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
  1168. HighestBlock: uint64(brokenChain.len() - 1),
  1169. })
  1170. progress <- struct{}{}
  1171. pending.Wait()
  1172. afterFailedSync := tester.downloader.Progress()
  1173. // Synchronise with a good peer and check that the progress origin remind the same
  1174. // after a failure
  1175. tester.newPeer("valid", protocol, chain)
  1176. pending.Add(1)
  1177. go func() {
  1178. defer pending.Done()
  1179. if err := tester.sync("valid", nil, mode); err != nil {
  1180. panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
  1181. }
  1182. }()
  1183. <-starting
  1184. checkProgress(t, tester.downloader, "completing", afterFailedSync)
  1185. // Check final progress after successful sync
  1186. progress <- struct{}{}
  1187. pending.Wait()
  1188. checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
  1189. CurrentBlock: uint64(chain.len() - 1),
  1190. HighestBlock: uint64(chain.len() - 1),
  1191. })
  1192. }
  1193. // Tests that if an attacker fakes a chain height, after the attack is detected,
  1194. // the progress height is successfully reduced at the next sync invocation.
// Each variant below runs testFakedSyncProgress for one combination of
// eth protocol version (62/63/64) and sync mode (full/fast/light).
func TestFakedSyncProgress62(t *testing.T) { testFakedSyncProgress(t, 62, FullSync) }
func TestFakedSyncProgress63Full(t *testing.T) { testFakedSyncProgress(t, 63, FullSync) }
func TestFakedSyncProgress63Fast(t *testing.T) { testFakedSyncProgress(t, 63, FastSync) }
func TestFakedSyncProgress64Full(t *testing.T) { testFakedSyncProgress(t, 64, FullSync) }
func TestFakedSyncProgress64Fast(t *testing.T) { testFakedSyncProgress(t, 64, FastSync) }
func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) }
  1201. func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
  1202. t.Parallel()
  1203. tester := newTester()
  1204. defer tester.terminate()
  1205. chain := testChainBase.shorten(blockCacheItems - 15)
  1206. // Set a sync init hook to catch progress changes
  1207. starting := make(chan struct{})
  1208. progress := make(chan struct{})
  1209. tester.downloader.syncInitHook = func(origin, latest uint64) {
  1210. starting <- struct{}{}
  1211. <-progress
  1212. }
  1213. checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
  1214. // Create and sync with an attacker that promises a higher chain than available.
  1215. brokenChain := chain.shorten(chain.len())
  1216. numMissing := 5
  1217. for i := brokenChain.len() - 2; i > brokenChain.len()-numMissing; i-- {
  1218. delete(brokenChain.headerm, brokenChain.chain[i])
  1219. }
  1220. tester.newPeer("attack", protocol, brokenChain)
  1221. pending := new(sync.WaitGroup)
  1222. pending.Add(1)
  1223. go func() {
  1224. defer pending.Done()
  1225. if err := tester.sync("attack", nil, mode); err == nil {
  1226. panic("succeeded attacker synchronisation")
  1227. }
  1228. }()
  1229. <-starting
  1230. checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
  1231. HighestBlock: uint64(brokenChain.len() - 1),
  1232. })
  1233. progress <- struct{}{}
  1234. pending.Wait()
  1235. afterFailedSync := tester.downloader.Progress()
  1236. // Synchronise with a good peer and check that the progress height has been reduced to
  1237. // the true value.
  1238. validChain := chain.shorten(chain.len() - numMissing)
  1239. tester.newPeer("valid", protocol, validChain)
  1240. pending.Add(1)
  1241. go func() {
  1242. defer pending.Done()
  1243. if err := tester.sync("valid", nil, mode); err != nil {
  1244. panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
  1245. }
  1246. }()
  1247. <-starting
  1248. checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{
  1249. CurrentBlock: afterFailedSync.CurrentBlock,
  1250. HighestBlock: uint64(validChain.len() - 1),
  1251. })
  1252. // Check final progress after successful sync.
  1253. progress <- struct{}{}
  1254. pending.Wait()
  1255. checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
  1256. CurrentBlock: uint64(validChain.len() - 1),
  1257. HighestBlock: uint64(validChain.len() - 1),
  1258. })
  1259. }
  1260. // This test reproduces an issue where unexpected deliveries would
  1261. // block indefinitely if they arrived at the right time.
  1262. func TestDeliverHeadersHang(t *testing.T) {
  1263. t.Parallel()
  1264. testCases := []struct {
  1265. protocol int
  1266. syncMode SyncMode
  1267. }{
  1268. {62, FullSync},
  1269. {63, FullSync},
  1270. {63, FastSync},
  1271. {64, FullSync},
  1272. {64, FastSync},
  1273. {64, LightSync},
  1274. }
  1275. for _, tc := range testCases {
  1276. t.Run(fmt.Sprintf("protocol %d mode %v", tc.protocol, tc.syncMode), func(t *testing.T) {
  1277. t.Parallel()
  1278. testDeliverHeadersHang(t, tc.protocol, tc.syncMode)
  1279. })
  1280. }
  1281. }
  1282. func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
  1283. master := newTester()
  1284. defer master.terminate()
  1285. chain := testChainBase.shorten(15)
  1286. for i := 0; i < 200; i++ {
  1287. tester := newTester()
  1288. tester.peerDb = master.peerDb
  1289. tester.newPeer("peer", protocol, chain)
  1290. // Whenever the downloader requests headers, flood it with
  1291. // a lot of unrequested header deliveries.
  1292. tester.downloader.peers.peers["peer"].peer = &floodingTestPeer{
  1293. peer: tester.downloader.peers.peers["peer"].peer,
  1294. tester: tester,
  1295. }
  1296. if err := tester.sync("peer", nil, mode); err != nil {
  1297. t.Errorf("test %d: sync failed: %v", i, err)
  1298. }
  1299. tester.terminate()
  1300. }
  1301. }
// floodingTestPeer wraps a regular test peer and, whenever headers are
// requested by number, first floods the downloader with unrequested header
// deliveries from many fake peers (see RequestHeadersByNumber below).
type floodingTestPeer struct {
	peer   Peer            // the real peer every other request is forwarded to
	tester *downloadTester // tester whose downloader is flooded with deliveries
}
// Head delegates to the wrapped peer.
func (ftp *floodingTestPeer) Head() (common.Hash, *big.Int) { return ftp.peer.Head() }

// RequestHeadersByHash delegates to the wrapped peer unchanged.
func (ftp *floodingTestPeer) RequestHeadersByHash(hash common.Hash, count int, skip int, reverse bool) error {
	return ftp.peer.RequestHeadersByHash(hash, count, skip, reverse)
}

// RequestBodies delegates to the wrapped peer unchanged.
func (ftp *floodingTestPeer) RequestBodies(hashes []common.Hash) error {
	return ftp.peer.RequestBodies(hashes)
}

// RequestReceipts delegates to the wrapped peer unchanged.
func (ftp *floodingTestPeer) RequestReceipts(hashes []common.Hash) error {
	return ftp.peer.RequestReceipts(hashes)
}

// RequestNodeData delegates to the wrapped peer unchanged.
func (ftp *floodingTestPeer) RequestNodeData(hashes []common.Hash) error {
	return ftp.peer.RequestNodeData(hashes)
}
// RequestHeadersByNumber floods the downloader with 499 unrequested header
// deliveries from distinct fake peers, and only forwards the real request to
// the wrapped peer after the first flood delivery has completed. It then
// drains exactly 500 completion signals (499 floods + 1 real request),
// panicking if any of them fails to arrive within 60 seconds — i.e. if an
// unexpected delivery managed to block the downloader.
func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int, reverse bool) error {
	// Buffered to 500 so every sender below can signal without blocking.
	deliveriesDone := make(chan struct{}, 500)
	for i := 0; i < cap(deliveriesDone)-1; i++ {
		peer := fmt.Sprintf("fake-peer%d", i)
		go func() {
			ftp.tester.downloader.DeliverHeaders(peer, []*types.Header{{}, {}, {}, {}})
			deliveriesDone <- struct{}{}
		}()
	}
	// None of the extra deliveries should block.
	timeout := time.After(60 * time.Second)
	launched := false
	for i := 0; i < cap(deliveriesDone); i++ {
		select {
		case <-deliveriesDone:
			if !launched {
				// Start delivering the requested headers
				// after one of the flooding responses has arrived.
				go func() {
					ftp.peer.RequestHeadersByNumber(from, count, skip, reverse)
					deliveriesDone <- struct{}{}
				}()
				launched = true
			}
		case <-timeout:
			panic("blocked")
		}
	}
	return nil
}
  1349. func TestRemoteHeaderRequestSpan(t *testing.T) {
  1350. testCases := []struct {
  1351. remoteHeight uint64
  1352. localHeight uint64
  1353. expected []int
  1354. }{
  1355. // Remote is way higher. We should ask for the remote head and go backwards
  1356. {1500, 1000,
  1357. []int{1323, 1339, 1355, 1371, 1387, 1403, 1419, 1435, 1451, 1467, 1483, 1499},
  1358. },
  1359. {15000, 13006,
  1360. []int{14823, 14839, 14855, 14871, 14887, 14903, 14919, 14935, 14951, 14967, 14983, 14999},
  1361. },
  1362. //Remote is pretty close to us. We don't have to fetch as many
  1363. {1200, 1150,
  1364. []int{1149, 1154, 1159, 1164, 1169, 1174, 1179, 1184, 1189, 1194, 1199},
  1365. },
  1366. // Remote is equal to us (so on a fork with higher td)
  1367. // We should get the closest couple of ancestors
  1368. {1500, 1500,
  1369. []int{1497, 1499},
  1370. },
  1371. // We're higher than the remote! Odd
  1372. {1000, 1500,
  1373. []int{997, 999},
  1374. },
  1375. // Check some weird edgecases that it behaves somewhat rationally
  1376. {0, 1500,
  1377. []int{0, 2},
  1378. },
  1379. {6000000, 0,
  1380. []int{5999823, 5999839, 5999855, 5999871, 5999887, 5999903, 5999919, 5999935, 5999951, 5999967, 5999983, 5999999},
  1381. },
  1382. {0, 0,
  1383. []int{0, 2},
  1384. },
  1385. }
  1386. reqs := func(from, count, span int) []int {
  1387. var r []int
  1388. num := from
  1389. for len(r) < count {
  1390. r = append(r, num)
  1391. num += span + 1
  1392. }
  1393. return r
  1394. }
  1395. for i, tt := range testCases {
  1396. from, count, span, max := calculateRequestSpan(tt.remoteHeight, tt.localHeight)
  1397. data := reqs(int(from), count, span)
  1398. if max != uint64(data[len(data)-1]) {
  1399. t.Errorf("test %d: wrong last value %d != %d", i, data[len(data)-1], max)
  1400. }
  1401. failed := false
  1402. if len(data) != len(tt.expected) {
  1403. failed = true
  1404. t.Errorf("test %d: length wrong, expected %d got %d", i, len(tt.expected), len(data))
  1405. } else {
  1406. for j, n := range data {
  1407. if n != tt.expected[j] {
  1408. failed = true
  1409. break
  1410. }
  1411. }
  1412. }
  1413. if failed {
  1414. res := strings.Replace(fmt.Sprint(data), " ", ",", -1)
  1415. exp := strings.Replace(fmt.Sprint(tt.expected), " ", ",", -1)
  1416. fmt.Printf("got: %v\n", res)
  1417. fmt.Printf("exp: %v\n", exp)
  1418. t.Errorf("test %d: wrong values", i)
  1419. }
  1420. }
  1421. }
  1422. // Tests that peers below a pre-configured checkpoint block are prevented from
  1423. // being fast-synced from, avoiding potential cheap eclipse attacks.
// Each variant below runs testCheckpointEnforcement for one combination of
// eth protocol version (62/63/64) and sync mode (full/fast/light).
func TestCheckpointEnforcement62(t *testing.T) { testCheckpointEnforcement(t, 62, FullSync) }
func TestCheckpointEnforcement63Full(t *testing.T) { testCheckpointEnforcement(t, 63, FullSync) }
func TestCheckpointEnforcement63Fast(t *testing.T) { testCheckpointEnforcement(t, 63, FastSync) }
func TestCheckpointEnforcement64Full(t *testing.T) { testCheckpointEnforcement(t, 64, FullSync) }
func TestCheckpointEnforcement64Fast(t *testing.T) { testCheckpointEnforcement(t, 64, FastSync) }
func TestCheckpointEnforcement64Light(t *testing.T) { testCheckpointEnforcement(t, 64, LightSync) }
  1430. func testCheckpointEnforcement(t *testing.T, protocol int, mode SyncMode) {
  1431. t.Parallel()
  1432. // Create a new tester with a particular hard coded checkpoint block
  1433. tester := newTester()
  1434. defer tester.terminate()
  1435. tester.downloader.checkpoint = uint64(fsMinFullBlocks) + 256
  1436. chain := testChainBase.shorten(int(tester.downloader.checkpoint) - 1)
  1437. // Attempt to sync with the peer and validate the result
  1438. tester.newPeer("peer", protocol, chain)
  1439. var expect error
  1440. if mode == FastSync || mode == LightSync {
  1441. expect = errUnsyncedPeer
  1442. }
  1443. if err := tester.sync("peer", nil, mode); err != expect {
  1444. t.Fatalf("block sync error mismatch: have %v, want %v", err, expect)
  1445. }
  1446. if mode == FastSync || mode == LightSync {
  1447. assertOwnChain(t, tester, 1)
  1448. } else {
  1449. assertOwnChain(t, tester, chain.len())
  1450. }
  1451. }