  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package downloader
  17. import (
  18. "errors"
  19. "fmt"
  20. "math/big"
  21. "strings"
  22. "sync"
  23. "sync/atomic"
  24. "testing"
  25. "time"
  26. "github.com/ethereum/go-ethereum"
  27. "github.com/ethereum/go-ethereum/common"
  28. "github.com/ethereum/go-ethereum/core/rawdb"
  29. "github.com/ethereum/go-ethereum/core/types"
  30. "github.com/ethereum/go-ethereum/ethdb"
  31. "github.com/ethereum/go-ethereum/event"
  32. "github.com/ethereum/go-ethereum/trie"
  33. )
  34. // Reduce some of the parameters to make the tester faster.
  35. func init() {
  36. maxForkAncestry = 10000
  37. blockCacheItems = 1024
  38. fsHeaderContCheck = 500 * time.Millisecond
  39. }
// downloadTester is a test simulator for mocking out local block chain.
type downloadTester struct {
	downloader *Downloader

	genesis *types.Block   // Genesis blocks used by the tester and peers
	stateDb ethdb.Database // Database used by the tester for syncing from peers
	peerDb  ethdb.Database // Database of the peers containing all data
	peers   map[string]*downloadTesterPeer

	// Active chain segment, still held in the regular (non-ancient) store
	ownHashes   []common.Hash                  // Hash chain belonging to the tester
	ownHeaders  map[common.Hash]*types.Header  // Headers belonging to the tester
	ownBlocks   map[common.Hash]*types.Block   // Blocks belonging to the tester
	ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester
	ownChainTd  map[common.Hash]*big.Int       // Total difficulties of the blocks in the local chain

	// Ancient ("frozen") chain segment, migrated out of the active store by
	// InsertReceiptChain when blocks fall at or below the ancient limit
	ancientHeaders  map[common.Hash]*types.Header  // Ancient headers belonging to the tester
	ancientBlocks   map[common.Hash]*types.Block   // Ancient blocks belonging to the tester
	ancientReceipts map[common.Hash]types.Receipts // Ancient receipts belonging to the tester
	ancientChainTd  map[common.Hash]*big.Int       // Ancient total difficulties of the blocks in the local chain

	lock sync.RWMutex // Protects all of the chain state above
}
  58. // newTester creates a new downloader test mocker.
  59. func newTester() *downloadTester {
  60. tester := &downloadTester{
  61. genesis: testGenesis,
  62. peerDb: testDB,
  63. peers: make(map[string]*downloadTesterPeer),
  64. ownHashes: []common.Hash{testGenesis.Hash()},
  65. ownHeaders: map[common.Hash]*types.Header{testGenesis.Hash(): testGenesis.Header()},
  66. ownBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis},
  67. ownReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil},
  68. ownChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()},
  69. // Initialize ancient store with test genesis block
  70. ancientHeaders: map[common.Hash]*types.Header{testGenesis.Hash(): testGenesis.Header()},
  71. ancientBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis},
  72. ancientReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil},
  73. ancientChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()},
  74. }
  75. tester.stateDb = rawdb.NewMemoryDatabase()
  76. tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00})
  77. tester.downloader = New(0, tester.stateDb, trie.NewSyncBloom(1, tester.stateDb), new(event.TypeMux), tester, nil, tester.dropPeer)
  78. return tester
  79. }
// terminate aborts any operations on the embedded downloader and releases all
// held resources. Typically deferred right after newTester in each test.
func (dl *downloadTester) terminate() {
	dl.downloader.Terminate()
}
  85. // sync starts synchronizing with a remote peer, blocking until it completes.
  86. func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
  87. dl.lock.RLock()
  88. hash := dl.peers[id].chain.headBlock().Hash()
  89. // If no particular TD was requested, load from the peer's blockchain
  90. if td == nil {
  91. td = dl.peers[id].chain.td(hash)
  92. }
  93. dl.lock.RUnlock()
  94. // Synchronise with the chosen peer and ensure proper cleanup afterwards
  95. err := dl.downloader.synchronise(id, hash, td, mode)
  96. select {
  97. case <-dl.downloader.cancelCh:
  98. // Ok, downloader fully cancelled after sync cycle
  99. default:
  100. // Downloader is still accepting packets, can block a peer up
  101. panic("downloader active post sync cycle") // panic will be caught by tester
  102. }
  103. return err
  104. }
  105. // HasHeader checks if a header is present in the testers canonical chain.
  106. func (dl *downloadTester) HasHeader(hash common.Hash, number uint64) bool {
  107. return dl.GetHeaderByHash(hash) != nil
  108. }
  109. // HasBlock checks if a block is present in the testers canonical chain.
  110. func (dl *downloadTester) HasBlock(hash common.Hash, number uint64) bool {
  111. return dl.GetBlockByHash(hash) != nil
  112. }
  113. // HasFastBlock checks if a block is present in the testers canonical chain.
  114. func (dl *downloadTester) HasFastBlock(hash common.Hash, number uint64) bool {
  115. dl.lock.RLock()
  116. defer dl.lock.RUnlock()
  117. if _, ok := dl.ancientReceipts[hash]; ok {
  118. return true
  119. }
  120. _, ok := dl.ownReceipts[hash]
  121. return ok
  122. }
  123. // GetHeader retrieves a header from the testers canonical chain.
  124. func (dl *downloadTester) GetHeaderByHash(hash common.Hash) *types.Header {
  125. dl.lock.RLock()
  126. defer dl.lock.RUnlock()
  127. return dl.getHeaderByHash(hash)
  128. }
  129. // getHeaderByHash returns the header if found either within ancients or own blocks)
  130. // This method assumes that the caller holds at least the read-lock (dl.lock)
  131. func (dl *downloadTester) getHeaderByHash(hash common.Hash) *types.Header {
  132. header := dl.ancientHeaders[hash]
  133. if header != nil {
  134. return header
  135. }
  136. return dl.ownHeaders[hash]
  137. }
  138. // GetBlock retrieves a block from the testers canonical chain.
  139. func (dl *downloadTester) GetBlockByHash(hash common.Hash) *types.Block {
  140. dl.lock.RLock()
  141. defer dl.lock.RUnlock()
  142. block := dl.ancientBlocks[hash]
  143. if block != nil {
  144. return block
  145. }
  146. return dl.ownBlocks[hash]
  147. }
  148. // CurrentHeader retrieves the current head header from the canonical chain.
  149. func (dl *downloadTester) CurrentHeader() *types.Header {
  150. dl.lock.RLock()
  151. defer dl.lock.RUnlock()
  152. for i := len(dl.ownHashes) - 1; i >= 0; i-- {
  153. if header := dl.ancientHeaders[dl.ownHashes[i]]; header != nil {
  154. return header
  155. }
  156. if header := dl.ownHeaders[dl.ownHashes[i]]; header != nil {
  157. return header
  158. }
  159. }
  160. return dl.genesis.Header()
  161. }
  162. // CurrentBlock retrieves the current head block from the canonical chain.
  163. func (dl *downloadTester) CurrentBlock() *types.Block {
  164. dl.lock.RLock()
  165. defer dl.lock.RUnlock()
  166. for i := len(dl.ownHashes) - 1; i >= 0; i-- {
  167. if block := dl.ancientBlocks[dl.ownHashes[i]]; block != nil {
  168. if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil {
  169. return block
  170. }
  171. return block
  172. }
  173. if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil {
  174. if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil {
  175. return block
  176. }
  177. }
  178. }
  179. return dl.genesis
  180. }
  181. // CurrentFastBlock retrieves the current head fast-sync block from the canonical chain.
  182. func (dl *downloadTester) CurrentFastBlock() *types.Block {
  183. dl.lock.RLock()
  184. defer dl.lock.RUnlock()
  185. for i := len(dl.ownHashes) - 1; i >= 0; i-- {
  186. if block := dl.ancientBlocks[dl.ownHashes[i]]; block != nil {
  187. return block
  188. }
  189. if block := dl.ownBlocks[dl.ownHashes[i]]; block != nil {
  190. return block
  191. }
  192. }
  193. return dl.genesis
  194. }
  195. // FastSyncCommitHead manually sets the head block to a given hash.
  196. func (dl *downloadTester) FastSyncCommitHead(hash common.Hash) error {
  197. // For now only check that the state trie is correct
  198. if block := dl.GetBlockByHash(hash); block != nil {
  199. _, err := trie.NewSecure(block.Root(), trie.NewDatabase(dl.stateDb))
  200. return err
  201. }
  202. return fmt.Errorf("non existent block: %x", hash[:4])
  203. }
  204. // GetTd retrieves the block's total difficulty from the canonical chain.
  205. func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int {
  206. dl.lock.RLock()
  207. defer dl.lock.RUnlock()
  208. return dl.getTd(hash)
  209. }
  210. // getTd retrieves the block's total difficulty if found either within
  211. // ancients or own blocks).
  212. // This method assumes that the caller holds at least the read-lock (dl.lock)
  213. func (dl *downloadTester) getTd(hash common.Hash) *big.Int {
  214. if td := dl.ancientChainTd[hash]; td != nil {
  215. return td
  216. }
  217. return dl.ownChainTd[hash]
  218. }
  219. // InsertHeaderChain injects a new batch of headers into the simulated chain.
  220. func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (i int, err error) {
  221. dl.lock.Lock()
  222. defer dl.lock.Unlock()
  223. // Do a quick check, as the blockchain.InsertHeaderChain doesn't insert anything in case of errors
  224. if dl.getHeaderByHash(headers[0].ParentHash) == nil {
  225. return 0, fmt.Errorf("InsertHeaderChain: unknown parent at first position, parent of number %d", headers[0].Number)
  226. }
  227. var hashes []common.Hash
  228. for i := 1; i < len(headers); i++ {
  229. hash := headers[i-1].Hash()
  230. if headers[i].ParentHash != headers[i-1].Hash() {
  231. return i, fmt.Errorf("non-contiguous import at position %d", i)
  232. }
  233. hashes = append(hashes, hash)
  234. }
  235. hashes = append(hashes, headers[len(headers)-1].Hash())
  236. // Do a full insert if pre-checks passed
  237. for i, header := range headers {
  238. hash := hashes[i]
  239. if dl.getHeaderByHash(hash) != nil {
  240. continue
  241. }
  242. if dl.getHeaderByHash(header.ParentHash) == nil {
  243. // This _should_ be impossible, due to precheck and induction
  244. return i, fmt.Errorf("InsertHeaderChain: unknown parent at position %d", i)
  245. }
  246. dl.ownHashes = append(dl.ownHashes, hash)
  247. dl.ownHeaders[hash] = header
  248. td := dl.getTd(header.ParentHash)
  249. dl.ownChainTd[hash] = new(big.Int).Add(td, header.Difficulty)
  250. }
  251. return len(headers), nil
  252. }
  253. // InsertChain injects a new batch of blocks into the simulated chain.
  254. func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) {
  255. dl.lock.Lock()
  256. defer dl.lock.Unlock()
  257. for i, block := range blocks {
  258. if parent, ok := dl.ownBlocks[block.ParentHash()]; !ok {
  259. return i, fmt.Errorf("InsertChain: unknown parent at position %d / %d", i, len(blocks))
  260. } else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil {
  261. return i, fmt.Errorf("InsertChain: unknown parent state %x: %v", parent.Root(), err)
  262. }
  263. if _, ok := dl.ownHeaders[block.Hash()]; !ok {
  264. dl.ownHashes = append(dl.ownHashes, block.Hash())
  265. dl.ownHeaders[block.Hash()] = block.Header()
  266. }
  267. dl.ownBlocks[block.Hash()] = block
  268. dl.ownReceipts[block.Hash()] = make(types.Receipts, 0)
  269. dl.stateDb.Put(block.Root().Bytes(), []byte{0x00})
  270. td := dl.getTd(block.ParentHash())
  271. dl.ownChainTd[block.Hash()] = new(big.Int).Add(td, block.Difficulty())
  272. }
  273. return len(blocks), nil
  274. }
// InsertReceiptChain injects a new batch of receipts into the simulated chain.
// Blocks numbered at or below ancientLimit are migrated — block, receipts,
// header and TD — into the ancient maps and removed from the active header/TD
// maps; newer blocks are stored in the active maps instead.
func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []types.Receipts, ancientLimit uint64) (i int, err error) {
	dl.lock.Lock()
	defer dl.lock.Unlock()
	for i := 0; i < len(blocks) && i < len(receipts); i++ {
		// Receipts may only be attached to blocks whose header is already known
		if _, ok := dl.ownHeaders[blocks[i].Hash()]; !ok {
			return i, errors.New("unknown owner")
		}
		// The parent must exist in either the ancient or the active store
		if _, ok := dl.ancientBlocks[blocks[i].ParentHash()]; !ok {
			if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
				return i, errors.New("InsertReceiptChain: unknown parent")
			}
		}
		if blocks[i].NumberU64() <= ancientLimit {
			dl.ancientBlocks[blocks[i].Hash()] = blocks[i]
			dl.ancientReceipts[blocks[i].Hash()] = receipts[i]
			// Migrate from active db to ancient db
			dl.ancientHeaders[blocks[i].Hash()] = blocks[i].Header()
			dl.ancientChainTd[blocks[i].Hash()] = new(big.Int).Add(dl.ancientChainTd[blocks[i].ParentHash()], blocks[i].Difficulty())
			delete(dl.ownHeaders, blocks[i].Hash())
			delete(dl.ownChainTd, blocks[i].Hash())
		} else {
			dl.ownBlocks[blocks[i].Hash()] = blocks[i]
			dl.ownReceipts[blocks[i].Hash()] = receipts[i]
		}
	}
	// NOTE(review): len(blocks) is reported even when receipts is shorter and
	// truncates the loop — presumably callers always pass matching lengths;
	// confirm before relying on the returned count.
	return len(blocks), nil
}
  303. // Rollback removes some recently added elements from the chain.
  304. func (dl *downloadTester) Rollback(hashes []common.Hash) {
  305. dl.lock.Lock()
  306. defer dl.lock.Unlock()
  307. for i := len(hashes) - 1; i >= 0; i-- {
  308. if dl.ownHashes[len(dl.ownHashes)-1] == hashes[i] {
  309. dl.ownHashes = dl.ownHashes[:len(dl.ownHashes)-1]
  310. }
  311. delete(dl.ownChainTd, hashes[i])
  312. delete(dl.ownHeaders, hashes[i])
  313. delete(dl.ownReceipts, hashes[i])
  314. delete(dl.ownBlocks, hashes[i])
  315. delete(dl.ancientChainTd, hashes[i])
  316. delete(dl.ancientHeaders, hashes[i])
  317. delete(dl.ancientReceipts, hashes[i])
  318. delete(dl.ancientBlocks, hashes[i])
  319. }
  320. }
  321. // newPeer registers a new block download source into the downloader.
  322. func (dl *downloadTester) newPeer(id string, version int, chain *testChain) error {
  323. dl.lock.Lock()
  324. defer dl.lock.Unlock()
  325. peer := &downloadTesterPeer{dl: dl, id: id, chain: chain}
  326. dl.peers[id] = peer
  327. return dl.downloader.RegisterPeer(id, version, peer)
  328. }
  329. // dropPeer simulates a hard peer removal from the connection pool.
  330. func (dl *downloadTester) dropPeer(id string) {
  331. dl.lock.Lock()
  332. defer dl.lock.Unlock()
  333. delete(dl.peers, id)
  334. dl.downloader.UnregisterPeer(id)
  335. }
// downloadTesterPeer is a mock remote peer serving chain data to the tester's
// downloader from a canned testChain.
type downloadTesterPeer struct {
	dl            *downloadTester      // Tester this peer is registered with
	id            string               // Unique identifier of the peer
	lock          sync.RWMutex
	chain         *testChain           // Chain the peer serves data from
	missingStates map[common.Hash]bool // State entries that fast sync should not return
}
  343. // Head constructs a function to retrieve a peer's current head hash
  344. // and total difficulty.
  345. func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
  346. b := dlp.chain.headBlock()
  347. return b.Hash(), dlp.chain.td(b.Hash())
  348. }
  349. // RequestHeadersByHash constructs a GetBlockHeaders function based on a hashed
  350. // origin; associated with a particular peer in the download tester. The returned
  351. // function can be used to retrieve batches of headers from the particular peer.
  352. func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
  353. if reverse {
  354. panic("reverse header requests not supported")
  355. }
  356. result := dlp.chain.headersByHash(origin, amount, skip)
  357. go dlp.dl.downloader.DeliverHeaders(dlp.id, result)
  358. return nil
  359. }
  360. // RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered
  361. // origin; associated with a particular peer in the download tester. The returned
  362. // function can be used to retrieve batches of headers from the particular peer.
  363. func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
  364. if reverse {
  365. panic("reverse header requests not supported")
  366. }
  367. result := dlp.chain.headersByNumber(origin, amount, skip)
  368. go dlp.dl.downloader.DeliverHeaders(dlp.id, result)
  369. return nil
  370. }
  371. // RequestBodies constructs a getBlockBodies method associated with a particular
  372. // peer in the download tester. The returned function can be used to retrieve
  373. // batches of block bodies from the particularly requested peer.
  374. func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash) error {
  375. txs, uncles := dlp.chain.bodies(hashes)
  376. go dlp.dl.downloader.DeliverBodies(dlp.id, txs, uncles)
  377. return nil
  378. }
  379. // RequestReceipts constructs a getReceipts method associated with a particular
  380. // peer in the download tester. The returned function can be used to retrieve
  381. // batches of block receipts from the particularly requested peer.
  382. func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash) error {
  383. receipts := dlp.chain.receipts(hashes)
  384. go dlp.dl.downloader.DeliverReceipts(dlp.id, receipts)
  385. return nil
  386. }
  387. // RequestNodeData constructs a getNodeData method associated with a particular
  388. // peer in the download tester. The returned function can be used to retrieve
  389. // batches of node state data from the particularly requested peer.
  390. func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error {
  391. dlp.dl.lock.RLock()
  392. defer dlp.dl.lock.RUnlock()
  393. results := make([][]byte, 0, len(hashes))
  394. for _, hash := range hashes {
  395. if data, err := dlp.dl.peerDb.Get(hash.Bytes()); err == nil {
  396. if !dlp.missingStates[hash] {
  397. results = append(results, data)
  398. }
  399. }
  400. }
  401. go dlp.dl.downloader.DeliverNodeData(dlp.id, results)
  402. return nil
  403. }
// assertOwnChain checks if the local chain contains the correct number of items
// of the various chain components. A single linear chain is the degenerate
// case of a forked chain with one fork.
func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
	// Mark this method as a helper to report errors at callsite, not in here
	t.Helper()
	assertOwnForkedChain(t, tester, 1, []int{length})
}
  411. // assertOwnForkedChain checks if the local forked chain contains the correct
  412. // number of items of the various chain components.
  413. func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) {
  414. // Mark this method as a helper to report errors at callsite, not in here
  415. t.Helper()
  416. // Initialize the counters for the first fork
  417. headers, blocks, receipts := lengths[0], lengths[0], lengths[0]
  418. // Update the counters for each subsequent fork
  419. for _, length := range lengths[1:] {
  420. headers += length - common
  421. blocks += length - common
  422. receipts += length - common
  423. }
  424. if tester.downloader.mode == LightSync {
  425. blocks, receipts = 1, 1
  426. }
  427. if hs := len(tester.ownHeaders) + len(tester.ancientHeaders) - 1; hs != headers {
  428. t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers)
  429. }
  430. if bs := len(tester.ownBlocks) + len(tester.ancientBlocks) - 1; bs != blocks {
  431. t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks)
  432. }
  433. if rs := len(tester.ownReceipts) + len(tester.ancientReceipts) - 1; rs != receipts {
  434. t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts)
  435. }
  436. }
// Tests that simple synchronization against a canonical chain works correctly.
// In this test common ancestor lookup should be short circuited and not require
// binary searching.
//
// Each wrapper pins one (protocol version, sync mode) combination onto the
// shared testCanonicalSynchronisation body.
func TestCanonicalSynchronisation62(t *testing.T)      { testCanonicalSynchronisation(t, 62, FullSync) }
func TestCanonicalSynchronisation63Full(t *testing.T)  { testCanonicalSynchronisation(t, 63, FullSync) }
func TestCanonicalSynchronisation63Fast(t *testing.T)  { testCanonicalSynchronisation(t, 63, FastSync) }
func TestCanonicalSynchronisation64Full(t *testing.T)  { testCanonicalSynchronisation(t, 64, FullSync) }
func TestCanonicalSynchronisation64Fast(t *testing.T)  { testCanonicalSynchronisation(t, 64, FastSync) }
func TestCanonicalSynchronisation64Light(t *testing.T) {
	testCanonicalSynchronisation(t, 64, LightSync)
}
  448. func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
  449. t.Parallel()
  450. tester := newTester()
  451. defer tester.terminate()
  452. // Create a small enough block chain to download
  453. chain := testChainBase.shorten(blockCacheItems - 15)
  454. tester.newPeer("peer", protocol, chain)
  455. // Synchronise with the peer and make sure all relevant data was retrieved
  456. if err := tester.sync("peer", nil, mode); err != nil {
  457. t.Fatalf("failed to synchronise blocks: %v", err)
  458. }
  459. assertOwnChain(t, tester, chain.len())
  460. }
// Tests that if a large batch of blocks are being downloaded, it is throttled
// until the cached blocks are retrieved.
//
// Each wrapper pins one (protocol version, sync mode) combination onto the
// shared testThrottling body. Light sync is excluded: it imports no blocks.
func TestThrottling62(t *testing.T)     { testThrottling(t, 62, FullSync) }
func TestThrottling63Full(t *testing.T) { testThrottling(t, 63, FullSync) }
func TestThrottling63Fast(t *testing.T) { testThrottling(t, 63, FastSync) }
func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
// testThrottling downloads a long chain while blocking the import hook, so
// the downloader must throttle once its block cache fills. It repeatedly
// samples the cache/import state, asserts the cache actually filled (or the
// chain ran out), then releases one import batch at a time until the whole
// chain is in. The exact lock ordering and atomics below are load-bearing —
// the sampling must be consistent against both the tester and the queue.
func testThrottling(t *testing.T, protocol int, mode SyncMode) {
	t.Parallel()
	tester := newTester()
	defer tester.terminate()
	// Create a long block chain to download and the tester
	targetBlocks := testChainBase.len() - 1
	tester.newPeer("peer", protocol, testChainBase)
	// Wrap the importer to allow stepping: the hook records how many results
	// are pending and parks the importer until proceed is signalled
	blocked, proceed := uint32(0), make(chan struct{})
	tester.downloader.chainInsertHook = func(results []*fetchResult) {
		atomic.StoreUint32(&blocked, uint32(len(results)))
		<-proceed
	}
	// Start a synchronisation concurrently
	errc := make(chan error)
	go func() {
		errc <- tester.sync("peer", nil, mode)
	}()
	// Iteratively take some blocks, always checking the retrieval count
	for {
		// Check the retrieval count synchronously (! reason for this ugly block)
		tester.lock.RLock()
		retrieved := len(tester.ownBlocks)
		tester.lock.RUnlock()
		if retrieved >= targetBlocks+1 {
			break
		}
		// Wait a bit for sync to throttle itself
		var cached, frozen int
		for start := time.Now(); time.Since(start) < 3*time.Second; {
			time.Sleep(25 * time.Millisecond)
			// Both locks are needed for a consistent snapshot of cache + chain
			tester.lock.Lock()
			tester.downloader.queue.lock.Lock()
			cached = len(tester.downloader.queue.blockDonePool)
			if mode == FastSync {
				// Fast sync is only as far along as its slowest component
				if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
					cached = receipts
				}
			}
			frozen = int(atomic.LoadUint32(&blocked))
			retrieved = len(tester.ownBlocks)
			tester.downloader.queue.lock.Unlock()
			tester.lock.Unlock()
			if cached == blockCacheItems || cached == blockCacheItems-reorgProtHeaderDelay || retrieved+cached+frozen == targetBlocks+1 || retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay {
				break
			}
		}
		// Make sure we filled up the cache, then exhaust it
		time.Sleep(25 * time.Millisecond) // give it a chance to screw up
		tester.lock.RLock()
		retrieved = len(tester.ownBlocks)
		tester.lock.RUnlock()
		if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
			t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1)
		}
		// Permit the blocked blocks to import
		if atomic.LoadUint32(&blocked) > 0 {
			atomic.StoreUint32(&blocked, uint32(0))
			proceed <- struct{}{}
		}
	}
	// Check that we haven't pulled more blocks than available
	assertOwnChain(t, tester, targetBlocks+1)
	if err := <-errc; err != nil {
		t.Fatalf("block synchronization failed: %v", err)
	}
}
// Tests that simple synchronization against a forked chain works correctly. In
// this test common ancestor lookup should *not* be short circuited, and a full
// binary search should be executed.
//
// Each wrapper pins one (protocol version, sync mode) combination onto the
// shared testForkedSync body.
func TestForkedSync62(t *testing.T)      { testForkedSync(t, 62, FullSync) }
func TestForkedSync63Full(t *testing.T)  { testForkedSync(t, 63, FullSync) }
func TestForkedSync63Fast(t *testing.T)  { testForkedSync(t, 63, FastSync) }
func TestForkedSync64Full(t *testing.T)  { testForkedSync(t, 64, FullSync) }
func TestForkedSync64Fast(t *testing.T)  { testForkedSync(t, 64, FastSync) }
func TestForkedSync64Light(t *testing.T) { testForkedSync(t, 64, LightSync) }
  544. func testForkedSync(t *testing.T, protocol int, mode SyncMode) {
  545. t.Parallel()
  546. tester := newTester()
  547. defer tester.terminate()
  548. chainA := testChainForkLightA.shorten(testChainBase.len() + 80)
  549. chainB := testChainForkLightB.shorten(testChainBase.len() + 80)
  550. tester.newPeer("fork A", protocol, chainA)
  551. tester.newPeer("fork B", protocol, chainB)
  552. // Synchronise with the peer and make sure all blocks were retrieved
  553. if err := tester.sync("fork A", nil, mode); err != nil {
  554. t.Fatalf("failed to synchronise blocks: %v", err)
  555. }
  556. assertOwnChain(t, tester, chainA.len())
  557. // Synchronise with the second peer and make sure that fork is pulled too
  558. if err := tester.sync("fork B", nil, mode); err != nil {
  559. t.Fatalf("failed to synchronise blocks: %v", err)
  560. }
  561. assertOwnForkedChain(t, tester, testChainBase.len(), []int{chainA.len(), chainB.len()})
  562. }
  563. // Tests that synchronising against a much shorter but much heavyer fork works
  564. // corrently and is not dropped.
  565. func TestHeavyForkedSync62(t *testing.T) { testHeavyForkedSync(t, 62, FullSync) }
  566. func TestHeavyForkedSync63Full(t *testing.T) { testHeavyForkedSync(t, 63, FullSync) }
  567. func TestHeavyForkedSync63Fast(t *testing.T) { testHeavyForkedSync(t, 63, FastSync) }
  568. func TestHeavyForkedSync64Full(t *testing.T) { testHeavyForkedSync(t, 64, FullSync) }
  569. func TestHeavyForkedSync64Fast(t *testing.T) { testHeavyForkedSync(t, 64, FastSync) }
  570. func TestHeavyForkedSync64Light(t *testing.T) { testHeavyForkedSync(t, 64, LightSync) }
  571. func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
  572. t.Parallel()
  573. tester := newTester()
  574. defer tester.terminate()
  575. chainA := testChainForkLightA.shorten(testChainBase.len() + 80)
  576. chainB := testChainForkHeavy.shorten(testChainBase.len() + 80)
  577. tester.newPeer("light", protocol, chainA)
  578. tester.newPeer("heavy", protocol, chainB)
  579. // Synchronise with the peer and make sure all blocks were retrieved
  580. if err := tester.sync("light", nil, mode); err != nil {
  581. t.Fatalf("failed to synchronise blocks: %v", err)
  582. }
  583. assertOwnChain(t, tester, chainA.len())
  584. // Synchronise with the second peer and make sure that fork is pulled too
  585. if err := tester.sync("heavy", nil, mode); err != nil {
  586. t.Fatalf("failed to synchronise blocks: %v", err)
  587. }
  588. assertOwnForkedChain(t, tester, testChainBase.len(), []int{chainA.len(), chainB.len()})
  589. }
  590. // Tests that chain forks are contained within a certain interval of the current
  591. // chain head, ensuring that malicious peers cannot waste resources by feeding
  592. // long dead chains.
  593. func TestBoundedForkedSync62(t *testing.T) { testBoundedForkedSync(t, 62, FullSync) }
  594. func TestBoundedForkedSync63Full(t *testing.T) { testBoundedForkedSync(t, 63, FullSync) }
  595. func TestBoundedForkedSync63Fast(t *testing.T) { testBoundedForkedSync(t, 63, FastSync) }
  596. func TestBoundedForkedSync64Full(t *testing.T) { testBoundedForkedSync(t, 64, FullSync) }
  597. func TestBoundedForkedSync64Fast(t *testing.T) { testBoundedForkedSync(t, 64, FastSync) }
  598. func TestBoundedForkedSync64Light(t *testing.T) { testBoundedForkedSync(t, 64, LightSync) }
  599. func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) {
  600. t.Parallel()
  601. tester := newTester()
  602. defer tester.terminate()
  603. chainA := testChainForkLightA
  604. chainB := testChainForkLightB
  605. tester.newPeer("original", protocol, chainA)
  606. tester.newPeer("rewriter", protocol, chainB)
  607. // Synchronise with the peer and make sure all blocks were retrieved
  608. if err := tester.sync("original", nil, mode); err != nil {
  609. t.Fatalf("failed to synchronise blocks: %v", err)
  610. }
  611. assertOwnChain(t, tester, chainA.len())
  612. // Synchronise with the second peer and ensure that the fork is rejected to being too old
  613. if err := tester.sync("rewriter", nil, mode); err != errInvalidAncestor {
  614. t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor)
  615. }
  616. }
  617. // Tests that chain forks are contained within a certain interval of the current
  618. // chain head for short but heavy forks too. These are a bit special because they
  619. // take different ancestor lookup paths.
  620. func TestBoundedHeavyForkedSync62(t *testing.T) { testBoundedHeavyForkedSync(t, 62, FullSync) }
  621. func TestBoundedHeavyForkedSync63Full(t *testing.T) { testBoundedHeavyForkedSync(t, 63, FullSync) }
  622. func TestBoundedHeavyForkedSync63Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 63, FastSync) }
  623. func TestBoundedHeavyForkedSync64Full(t *testing.T) { testBoundedHeavyForkedSync(t, 64, FullSync) }
  624. func TestBoundedHeavyForkedSync64Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 64, FastSync) }
  625. func TestBoundedHeavyForkedSync64Light(t *testing.T) { testBoundedHeavyForkedSync(t, 64, LightSync) }
  626. func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
  627. t.Parallel()
  628. tester := newTester()
  629. defer tester.terminate()
  630. // Create a long enough forked chain
  631. chainA := testChainForkLightA
  632. chainB := testChainForkHeavy
  633. tester.newPeer("original", protocol, chainA)
  634. tester.newPeer("heavy-rewriter", protocol, chainB)
  635. // Synchronise with the peer and make sure all blocks were retrieved
  636. if err := tester.sync("original", nil, mode); err != nil {
  637. t.Fatalf("failed to synchronise blocks: %v", err)
  638. }
  639. assertOwnChain(t, tester, chainA.len())
  640. // Synchronise with the second peer and ensure that the fork is rejected to being too old
  641. if err := tester.sync("heavy-rewriter", nil, mode); err != errInvalidAncestor {
  642. t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor)
  643. }
  644. }
  645. // Tests that an inactive downloader will not accept incoming block headers and
  646. // bodies.
  647. func TestInactiveDownloader62(t *testing.T) {
  648. t.Parallel()
  649. tester := newTester()
  650. defer tester.terminate()
  651. // Check that neither block headers nor bodies are accepted
  652. if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
  653. t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
  654. }
  655. if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive {
  656. t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
  657. }
  658. }
  659. // Tests that an inactive downloader will not accept incoming block headers,
  660. // bodies and receipts.
  661. func TestInactiveDownloader63(t *testing.T) {
  662. t.Parallel()
  663. tester := newTester()
  664. defer tester.terminate()
  665. // Check that neither block headers nor bodies are accepted
  666. if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
  667. t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
  668. }
  669. if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive {
  670. t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
  671. }
  672. if err := tester.downloader.DeliverReceipts("bad peer", [][]*types.Receipt{}); err != errNoSyncActive {
  673. t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
  674. }
  675. }
  676. // Tests that a canceled download wipes all previously accumulated state.
  677. func TestCancel62(t *testing.T) { testCancel(t, 62, FullSync) }
  678. func TestCancel63Full(t *testing.T) { testCancel(t, 63, FullSync) }
  679. func TestCancel63Fast(t *testing.T) { testCancel(t, 63, FastSync) }
  680. func TestCancel64Full(t *testing.T) { testCancel(t, 64, FullSync) }
  681. func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) }
  682. func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) }
  683. func testCancel(t *testing.T, protocol int, mode SyncMode) {
  684. t.Parallel()
  685. tester := newTester()
  686. defer tester.terminate()
  687. chain := testChainBase.shorten(MaxHeaderFetch)
  688. tester.newPeer("peer", protocol, chain)
  689. // Make sure canceling works with a pristine downloader
  690. tester.downloader.Cancel()
  691. if !tester.downloader.queue.Idle() {
  692. t.Errorf("download queue not idle")
  693. }
  694. // Synchronise with the peer, but cancel afterwards
  695. if err := tester.sync("peer", nil, mode); err != nil {
  696. t.Fatalf("failed to synchronise blocks: %v", err)
  697. }
  698. tester.downloader.Cancel()
  699. if !tester.downloader.queue.Idle() {
  700. t.Errorf("download queue not idle")
  701. }
  702. }
  703. // Tests that synchronisation from multiple peers works as intended (multi thread sanity test).
  704. func TestMultiSynchronisation62(t *testing.T) { testMultiSynchronisation(t, 62, FullSync) }
  705. func TestMultiSynchronisation63Full(t *testing.T) { testMultiSynchronisation(t, 63, FullSync) }
  706. func TestMultiSynchronisation63Fast(t *testing.T) { testMultiSynchronisation(t, 63, FastSync) }
  707. func TestMultiSynchronisation64Full(t *testing.T) { testMultiSynchronisation(t, 64, FullSync) }
  708. func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t, 64, FastSync) }
  709. func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) }
  710. func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
  711. t.Parallel()
  712. tester := newTester()
  713. defer tester.terminate()
  714. // Create various peers with various parts of the chain
  715. targetPeers := 8
  716. chain := testChainBase.shorten(targetPeers * 100)
  717. for i := 0; i < targetPeers; i++ {
  718. id := fmt.Sprintf("peer #%d", i)
  719. tester.newPeer(id, protocol, chain.shorten(chain.len()/(i+1)))
  720. }
  721. if err := tester.sync("peer #0", nil, mode); err != nil {
  722. t.Fatalf("failed to synchronise blocks: %v", err)
  723. }
  724. assertOwnChain(t, tester, chain.len())
  725. }
  726. // Tests that synchronisations behave well in multi-version protocol environments
  727. // and not wreak havoc on other nodes in the network.
  728. func TestMultiProtoSynchronisation62(t *testing.T) { testMultiProtoSync(t, 62, FullSync) }
  729. func TestMultiProtoSynchronisation63Full(t *testing.T) { testMultiProtoSync(t, 63, FullSync) }
  730. func TestMultiProtoSynchronisation63Fast(t *testing.T) { testMultiProtoSync(t, 63, FastSync) }
  731. func TestMultiProtoSynchronisation64Full(t *testing.T) { testMultiProtoSync(t, 64, FullSync) }
  732. func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t, 64, FastSync) }
  733. func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) }
  734. func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
  735. t.Parallel()
  736. tester := newTester()
  737. defer tester.terminate()
  738. // Create a small enough block chain to download
  739. chain := testChainBase.shorten(blockCacheItems - 15)
  740. // Create peers of every type
  741. tester.newPeer("peer 62", 62, chain)
  742. tester.newPeer("peer 63", 63, chain)
  743. tester.newPeer("peer 64", 64, chain)
  744. // Synchronise with the requested peer and make sure all blocks were retrieved
  745. if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil, mode); err != nil {
  746. t.Fatalf("failed to synchronise blocks: %v", err)
  747. }
  748. assertOwnChain(t, tester, chain.len())
  749. // Check that no peers have been dropped off
  750. for _, version := range []int{62, 63, 64} {
  751. peer := fmt.Sprintf("peer %d", version)
  752. if _, ok := tester.peers[peer]; !ok {
  753. t.Errorf("%s dropped", peer)
  754. }
  755. }
  756. }
  757. // Tests that if a block is empty (e.g. header only), no body request should be
  758. // made, and instead the header should be assembled into a whole block in itself.
  759. func TestEmptyShortCircuit62(t *testing.T) { testEmptyShortCircuit(t, 62, FullSync) }
  760. func TestEmptyShortCircuit63Full(t *testing.T) { testEmptyShortCircuit(t, 63, FullSync) }
  761. func TestEmptyShortCircuit63Fast(t *testing.T) { testEmptyShortCircuit(t, 63, FastSync) }
  762. func TestEmptyShortCircuit64Full(t *testing.T) { testEmptyShortCircuit(t, 64, FullSync) }
  763. func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, FastSync) }
  764. func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) }
  765. func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
  766. t.Parallel()
  767. tester := newTester()
  768. defer tester.terminate()
  769. // Create a block chain to download
  770. chain := testChainBase
  771. tester.newPeer("peer", protocol, chain)
  772. // Instrument the downloader to signal body requests
  773. bodiesHave, receiptsHave := int32(0), int32(0)
  774. tester.downloader.bodyFetchHook = func(headers []*types.Header) {
  775. atomic.AddInt32(&bodiesHave, int32(len(headers)))
  776. }
  777. tester.downloader.receiptFetchHook = func(headers []*types.Header) {
  778. atomic.AddInt32(&receiptsHave, int32(len(headers)))
  779. }
  780. // Synchronise with the peer and make sure all blocks were retrieved
  781. if err := tester.sync("peer", nil, mode); err != nil {
  782. t.Fatalf("failed to synchronise blocks: %v", err)
  783. }
  784. assertOwnChain(t, tester, chain.len())
  785. // Validate the number of block bodies that should have been requested
  786. bodiesNeeded, receiptsNeeded := 0, 0
  787. for _, block := range chain.blockm {
  788. if mode != LightSync && block != tester.genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) {
  789. bodiesNeeded++
  790. }
  791. }
  792. for _, receipt := range chain.receiptm {
  793. if mode == FastSync && len(receipt) > 0 {
  794. receiptsNeeded++
  795. }
  796. }
  797. if int(bodiesHave) != bodiesNeeded {
  798. t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave, bodiesNeeded)
  799. }
  800. if int(receiptsHave) != receiptsNeeded {
  801. t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave, receiptsNeeded)
  802. }
  803. }
  804. // Tests that headers are enqueued continuously, preventing malicious nodes from
  805. // stalling the downloader by feeding gapped header chains.
  806. func TestMissingHeaderAttack62(t *testing.T) { testMissingHeaderAttack(t, 62, FullSync) }
  807. func TestMissingHeaderAttack63Full(t *testing.T) { testMissingHeaderAttack(t, 63, FullSync) }
  808. func TestMissingHeaderAttack63Fast(t *testing.T) { testMissingHeaderAttack(t, 63, FastSync) }
  809. func TestMissingHeaderAttack64Full(t *testing.T) { testMissingHeaderAttack(t, 64, FullSync) }
  810. func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 64, FastSync) }
  811. func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) }
  812. func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
  813. t.Parallel()
  814. tester := newTester()
  815. defer tester.terminate()
  816. chain := testChainBase.shorten(blockCacheItems - 15)
  817. brokenChain := chain.shorten(chain.len())
  818. delete(brokenChain.headerm, brokenChain.chain[brokenChain.len()/2])
  819. tester.newPeer("attack", protocol, brokenChain)
  820. if err := tester.sync("attack", nil, mode); err == nil {
  821. t.Fatalf("succeeded attacker synchronisation")
  822. }
  823. // Synchronise with the valid peer and make sure sync succeeds
  824. tester.newPeer("valid", protocol, chain)
  825. if err := tester.sync("valid", nil, mode); err != nil {
  826. t.Fatalf("failed to synchronise blocks: %v", err)
  827. }
  828. assertOwnChain(t, tester, chain.len())
  829. }
  830. // Tests that if requested headers are shifted (i.e. first is missing), the queue
  831. // detects the invalid numbering.
  832. func TestShiftedHeaderAttack62(t *testing.T) { testShiftedHeaderAttack(t, 62, FullSync) }
  833. func TestShiftedHeaderAttack63Full(t *testing.T) { testShiftedHeaderAttack(t, 63, FullSync) }
  834. func TestShiftedHeaderAttack63Fast(t *testing.T) { testShiftedHeaderAttack(t, 63, FastSync) }
  835. func TestShiftedHeaderAttack64Full(t *testing.T) { testShiftedHeaderAttack(t, 64, FullSync) }
  836. func TestShiftedHeaderAttack64Fast(t *testing.T) { testShiftedHeaderAttack(t, 64, FastSync) }
  837. func TestShiftedHeaderAttack64Light(t *testing.T) { testShiftedHeaderAttack(t, 64, LightSync) }
  838. func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
  839. t.Parallel()
  840. tester := newTester()
  841. defer tester.terminate()
  842. chain := testChainBase.shorten(blockCacheItems - 15)
  843. // Attempt a full sync with an attacker feeding shifted headers
  844. brokenChain := chain.shorten(chain.len())
  845. delete(brokenChain.headerm, brokenChain.chain[1])
  846. delete(brokenChain.blockm, brokenChain.chain[1])
  847. delete(brokenChain.receiptm, brokenChain.chain[1])
  848. tester.newPeer("attack", protocol, brokenChain)
  849. if err := tester.sync("attack", nil, mode); err == nil {
  850. t.Fatalf("succeeded attacker synchronisation")
  851. }
  852. // Synchronise with the valid peer and make sure sync succeeds
  853. tester.newPeer("valid", protocol, chain)
  854. if err := tester.sync("valid", nil, mode); err != nil {
  855. t.Fatalf("failed to synchronise blocks: %v", err)
  856. }
  857. assertOwnChain(t, tester, chain.len())
  858. }
  859. // Tests that upon detecting an invalid header, the recent ones are rolled back
  860. // for various failure scenarios. Afterwards a full sync is attempted to make
  861. // sure no state was corrupted.
  862. func TestInvalidHeaderRollback63Fast(t *testing.T) { testInvalidHeaderRollback(t, 63, FastSync) }
  863. func TestInvalidHeaderRollback64Fast(t *testing.T) { testInvalidHeaderRollback(t, 64, FastSync) }
  864. func TestInvalidHeaderRollback64Light(t *testing.T) { testInvalidHeaderRollback(t, 64, LightSync) }
// testInvalidHeaderRollback runs three consecutive attacker scenarios, each of
// which must fail the sync and roll back recently accepted headers, then
// verifies that an honest peer can still complete a full sync afterwards.
func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
	t.Parallel()

	tester := newTester()
	defer tester.terminate()

	// Create a small enough block chain to download
	targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks
	chain := testChainBase.shorten(targetBlocks)

	// Attempt to sync with an attacker that feeds junk during the fast sync phase.
	// This should result in the last fsHeaderSafetyNet headers being rolled back.
	missing := fsHeaderSafetyNet + MaxHeaderFetch + 1

	// The attack chain is a copy of the honest one with a single header deleted
	// one position past the first full header-fetch batch.
	fastAttackChain := chain.shorten(chain.len())
	delete(fastAttackChain.headerm, fastAttackChain.chain[missing])
	tester.newPeer("fast-attack", protocol, fastAttackChain)

	if err := tester.sync("fast-attack", nil, mode); err == nil {
		t.Fatalf("succeeded fast attacker synchronisation")
	}
	// The header head must not have advanced past the first batch, otherwise the
	// rollback did not take effect.
	if head := tester.CurrentHeader().Number.Int64(); int(head) > MaxHeaderFetch {
		t.Errorf("rollback head mismatch: have %v, want at most %v", head, MaxHeaderFetch)
	}
	// Attempt to sync with an attacker that feeds junk during the block import phase.
	// This should result in both the last fsHeaderSafetyNet number of headers being
	// rolled back, and also the pivot point being reverted to a non-block status.
	missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1

	blockAttackChain := chain.shorten(chain.len())
	delete(fastAttackChain.headerm, fastAttackChain.chain[missing]) // Make sure the fast-attacker doesn't fill in
	delete(blockAttackChain.headerm, blockAttackChain.chain[missing])
	tester.newPeer("block-attack", protocol, blockAttackChain)

	if err := tester.sync("block-attack", nil, mode); err == nil {
		t.Fatalf("succeeded block attacker synchronisation")
	}
	if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
		t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
	}
	if mode == FastSync {
		// The imported pivot block itself must have been reverted too.
		if head := tester.CurrentBlock().NumberU64(); head != 0 {
			t.Errorf("fast sync pivot block #%d not rolled back", head)
		}
	}
	// Attempt to sync with an attacker that withholds promised blocks after the
	// fast sync pivot point. This could be a trial to leave the node with a bad
	// but already imported pivot block.
	withholdAttackChain := chain.shorten(chain.len())
	tester.newPeer("withhold-attack", protocol, withholdAttackChain)

	// Only strip the headers once the sync has begun, so the head advertised at
	// handshake time is still the full chain; later retrievals then come up empty.
	tester.downloader.syncInitHook = func(uint64, uint64) {
		for i := missing; i < withholdAttackChain.len(); i++ {
			delete(withholdAttackChain.headerm, withholdAttackChain.chain[i])
		}
		tester.downloader.syncInitHook = nil
	}
	if err := tester.sync("withhold-attack", nil, mode); err == nil {
		t.Fatalf("succeeded withholding attacker synchronisation")
	}
	if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
		t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
	}
	if mode == FastSync {
		if head := tester.CurrentBlock().NumberU64(); head != 0 {
			t.Errorf("fast sync pivot block #%d not rolled back", head)
		}
	}
	// synchronise with the valid peer and make sure sync succeeds. Since the last rollback
	// should also disable fast syncing for this process, verify that we did a fresh full
	// sync. Note, we can't assert anything about the receipts since we won't purge the
	// database of them, hence we can't use assertOwnChain.
	tester.newPeer("valid", protocol, chain)
	if err := tester.sync("valid", nil, mode); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	if hs := len(tester.ownHeaders); hs != chain.len() {
		t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, chain.len())
	}
	if mode != LightSync {
		if bs := len(tester.ownBlocks); bs != chain.len() {
			t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, chain.len())
		}
	}
}
  942. // Tests that a peer advertising a high TD doesn't get to stall the downloader
  943. // afterwards by not sending any useful hashes.
  944. func TestHighTDStarvationAttack62(t *testing.T) { testHighTDStarvationAttack(t, 62, FullSync) }
  945. func TestHighTDStarvationAttack63Full(t *testing.T) { testHighTDStarvationAttack(t, 63, FullSync) }
  946. func TestHighTDStarvationAttack63Fast(t *testing.T) { testHighTDStarvationAttack(t, 63, FastSync) }
  947. func TestHighTDStarvationAttack64Full(t *testing.T) { testHighTDStarvationAttack(t, 64, FullSync) }
  948. func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttack(t, 64, FastSync) }
  949. func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) }
  950. func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
  951. t.Parallel()
  952. tester := newTester()
  953. defer tester.terminate()
  954. chain := testChainBase.shorten(1)
  955. tester.newPeer("attack", protocol, chain)
  956. if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer {
  957. t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
  958. }
  959. }
  960. // Tests that misbehaving peers are disconnected, whilst behaving ones are not.
  961. func TestBlockHeaderAttackerDropping62(t *testing.T) { testBlockHeaderAttackerDropping(t, 62) }
  962. func TestBlockHeaderAttackerDropping63(t *testing.T) { testBlockHeaderAttackerDropping(t, 63) }
  963. func TestBlockHeaderAttackerDropping64(t *testing.T) { testBlockHeaderAttackerDropping(t, 64) }
// testBlockHeaderAttackerDropping table-drives every sync error through a mocked
// synchroniser and checks that exactly the errors marked as hostile cause the
// originating peer to be dropped from the peer set.
func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
	t.Parallel()

	// Define the disconnection requirement for individual hash fetch errors
	tests := []struct {
		result error // error returned by the mocked synchronisation attempt
		drop   bool  // whether the peer must be dropped for that error
	}{
		{nil, false}, // Sync succeeded, all is well
		{errBusy, false}, // Sync is already in progress, no problem
		{errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
		{errBadPeer, true}, // Peer was deemed bad for some reason, drop it
		{errStallingPeer, true}, // Peer was detected to be stalling, drop it
		{errUnsyncedPeer, true}, // Peer was detected to be unsynced, drop it
		{errNoPeers, false}, // No peers to download from, soft race, no issue
		{errTimeout, true}, // No hashes received in due time, drop the peer
		{errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
		{errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
		{errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter
		{errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
		{errInvalidBody, false}, // A bad peer was detected, but not the sync origin
		{errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
		{errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
	}
	// Run the tests and check disconnection status
	tester := newTester()
	defer tester.terminate()

	chain := testChainBase.shorten(1)

	for i, tt := range tests {
		// Register a new peer and ensure its presence
		id := fmt.Sprintf("test %d", i)
		if err := tester.newPeer(id, protocol, chain); err != nil {
			t.Fatalf("test %d: failed to register new peer: %v", i, err)
		}
		if _, ok := tester.peers[id]; !ok {
			t.Fatalf("test %d: registered peer not found", i)
		}
		// Simulate a synchronisation and check the required result
		// (the mock replaces the real sync and simply returns the scripted error).
		tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result }

		tester.downloader.Synchronise(id, tester.genesis.Hash(), big.NewInt(1000), FullSync)
		// The peer's continued presence in the peer set must match the drop flag.
		if _, ok := tester.peers[id]; !ok != tt.drop {
			t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop)
		}
	}
}
  1008. // Tests that synchronisation progress (origin block number, current block number
  1009. // and highest block number) is tracked and updated correctly.
  1010. func TestSyncProgress62(t *testing.T) { testSyncProgress(t, 62, FullSync) }
  1011. func TestSyncProgress63Full(t *testing.T) { testSyncProgress(t, 63, FullSync) }
  1012. func TestSyncProgress63Fast(t *testing.T) { testSyncProgress(t, 63, FastSync) }
  1013. func TestSyncProgress64Full(t *testing.T) { testSyncProgress(t, 64, FullSync) }
  1014. func TestSyncProgress64Fast(t *testing.T) { testSyncProgress(t, 64, FastSync) }
  1015. func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) }
// testSyncProgress performs two consecutive syncs (half chain, then full chain)
// and samples the reported progress at sync start, mid-flight and completion.
// The syncInitHook blocks the sync goroutine on the starting/progress channels,
// which lets the test inspect progress at a deterministic point.
func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
	t.Parallel()

	tester := newTester()
	defer tester.terminate()

	chain := testChainBase.shorten(blockCacheItems - 15)

	// Set a sync init hook to catch progress changes
	starting := make(chan struct{})
	progress := make(chan struct{})

	tester.downloader.syncInitHook = func(origin, latest uint64) {
		// Signal that the sync reached init, then park until released.
		starting <- struct{}{}
		<-progress
	}
	checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})

	// Synchronise half the blocks and check initial progress
	tester.newPeer("peer-half", protocol, chain.shorten(chain.len()/2))
	pending := new(sync.WaitGroup)
	pending.Add(1)

	go func() {
		defer pending.Done()
		if err := tester.sync("peer-half", nil, mode); err != nil {
			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
		}
	}()
	<-starting
	// At init the highest block is known but nothing has been pulled yet.
	checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
		HighestBlock: uint64(chain.len()/2 - 1),
	})
	progress <- struct{}{}
	pending.Wait()

	// Synchronise all the blocks and check continuation progress
	tester.newPeer("peer-full", protocol, chain)
	pending.Add(1)

	go func() {
		defer pending.Done()
		if err := tester.sync("peer-full", nil, mode); err != nil {
			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
		}
	}()
	<-starting
	// The second sync starts from the half-chain head and targets the full head.
	checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{
		StartingBlock: uint64(chain.len()/2 - 1),
		CurrentBlock:  uint64(chain.len()/2 - 1),
		HighestBlock:  uint64(chain.len() - 1),
	})

	// Check final progress after successful sync
	progress <- struct{}{}
	pending.Wait()
	checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
		StartingBlock: uint64(chain.len()/2 - 1),
		CurrentBlock:  uint64(chain.len() - 1),
		HighestBlock:  uint64(chain.len() - 1),
	})
}
  1069. func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.SyncProgress) {
  1070. // Mark this method as a helper to report errors at callsite, not in here
  1071. t.Helper()
  1072. p := d.Progress()
  1073. p.KnownStates, p.PulledStates = 0, 0
  1074. want.KnownStates, want.PulledStates = 0, 0
  1075. if p != want {
  1076. t.Fatalf("%s progress mismatch:\nhave %+v\nwant %+v", stage, p, want)
  1077. }
  1078. }
  1079. // Tests that synchronisation progress (origin block number and highest block
  1080. // number) is tracked and updated correctly in case of a fork (or manual head
  1081. // revertal).
  1082. func TestForkedSyncProgress62(t *testing.T) { testForkedSyncProgress(t, 62, FullSync) }
  1083. func TestForkedSyncProgress63Full(t *testing.T) { testForkedSyncProgress(t, 63, FullSync) }
  1084. func TestForkedSyncProgress63Fast(t *testing.T) { testForkedSyncProgress(t, 63, FastSync) }
  1085. func TestForkedSyncProgress64Full(t *testing.T) { testForkedSyncProgress(t, 64, FullSync) }
  1086. func TestForkedSyncProgress64Fast(t *testing.T) { testForkedSyncProgress(t, 64, FastSync) }
  1087. func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) }
// testForkedSyncProgress syncs one fork, fakes a completed sync above the fork
// point, then syncs the competing fork and checks that the reported progress
// origin resets to the common ancestor. The syncInitHook parks the sync
// goroutine on the starting/progress channels so progress can be sampled at a
// deterministic point.
func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
	t.Parallel()

	tester := newTester()
	defer tester.terminate()

	chainA := testChainForkLightA.shorten(testChainBase.len() + MaxHashFetch)
	chainB := testChainForkLightB.shorten(testChainBase.len() + MaxHashFetch)

	// Set a sync init hook to catch progress changes
	starting := make(chan struct{})
	progress := make(chan struct{})

	tester.downloader.syncInitHook = func(origin, latest uint64) {
		// Signal that the sync reached init, then park until released.
		starting <- struct{}{}
		<-progress
	}
	checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})

	// Synchronise with one of the forks and check progress
	tester.newPeer("fork A", protocol, chainA)
	pending := new(sync.WaitGroup)
	pending.Add(1)

	go func() {
		defer pending.Done()
		if err := tester.sync("fork A", nil, mode); err != nil {
			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
		}
	}()
	<-starting
	checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
		HighestBlock: uint64(chainA.len() - 1),
	})
	progress <- struct{}{}
	pending.Wait()

	// Simulate a successful sync above the fork
	tester.downloader.syncStatsChainOrigin = tester.downloader.syncStatsChainHeight

	// Synchronise with the second fork and check progress resets
	tester.newPeer("fork B", protocol, chainB)
	pending.Add(1)

	go func() {
		defer pending.Done()
		if err := tester.sync("fork B", nil, mode); err != nil {
			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
		}
	}()
	<-starting
	// The origin must have been reset down to the fork's common ancestor.
	checkProgress(t, tester.downloader, "forking", ethereum.SyncProgress{
		StartingBlock: uint64(testChainBase.len()) - 1,
		CurrentBlock:  uint64(chainA.len() - 1),
		HighestBlock:  uint64(chainB.len() - 1),
	})

	// Check final progress after successful sync
	progress <- struct{}{}
	pending.Wait()
	checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
		StartingBlock: uint64(testChainBase.len()) - 1,
		CurrentBlock:  uint64(chainB.len() - 1),
		HighestBlock:  uint64(chainB.len() - 1),
	})
}
// Tests that if synchronisation is aborted due to some failure, then the progress
// origin is not updated in the next sync cycle, as it should be considered the
// continuation of the previous sync and not a new instance.
func TestFailedSyncProgress62(t *testing.T)      { testFailedSyncProgress(t, 62, FullSync) }
func TestFailedSyncProgress63Full(t *testing.T)  { testFailedSyncProgress(t, 63, FullSync) }
func TestFailedSyncProgress63Fast(t *testing.T)  { testFailedSyncProgress(t, 63, FastSync) }
func TestFailedSyncProgress64Full(t *testing.T)  { testFailedSyncProgress(t, 64, FullSync) }
func TestFailedSyncProgress64Fast(t *testing.T)  { testFailedSyncProgress(t, 64, FastSync) }
func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) }

func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
	t.Parallel()

	tester := newTester()
	defer tester.terminate()
	chain := testChainBase.shorten(blockCacheItems - 15)

	// Set a sync init hook to catch progress changes
	starting := make(chan struct{})
	progress := make(chan struct{})
	tester.downloader.syncInitHook = func(origin, latest uint64) {
		starting <- struct{}{} // signal that sync initialisation began...
		<-progress             // ...and pause until the test has inspected the progress
	}
	checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})

	// Attempt a full sync with a faulty peer: remove the middle header (and its
	// block/receipts) so the download is guaranteed to fail part way through.
	brokenChain := chain.shorten(chain.len())
	missing := brokenChain.len() / 2
	delete(brokenChain.headerm, brokenChain.chain[missing])
	delete(brokenChain.blockm, brokenChain.chain[missing])
	delete(brokenChain.receiptm, brokenChain.chain[missing])
	tester.newPeer("faulty", protocol, brokenChain)

	pending := new(sync.WaitGroup)
	pending.Add(1)
	go func() {
		defer pending.Done()
		if err := tester.sync("faulty", nil, mode); err == nil {
			panic("succeeded faulty synchronisation")
		}
	}()
	<-starting
	checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
		HighestBlock: uint64(brokenChain.len() - 1),
	})
	progress <- struct{}{}
	pending.Wait()
	afterFailedSync := tester.downloader.Progress()

	// Synchronise with a good peer and check that the progress origin remains the
	// same after a failure
	tester.newPeer("valid", protocol, chain)
	pending.Add(1)
	go func() {
		defer pending.Done()
		if err := tester.sync("valid", nil, mode); err != nil {
			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
		}
	}()
	<-starting
	checkProgress(t, tester.downloader, "completing", afterFailedSync)

	// Check final progress after successful sync
	progress <- struct{}{}
	pending.Wait()
	checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
		CurrentBlock: uint64(chain.len() - 1),
		HighestBlock: uint64(chain.len() - 1),
	})
}
// Tests that if an attacker fakes a chain height, after the attack is detected,
// the progress height is successfully reduced at the next sync invocation.
func TestFakedSyncProgress62(t *testing.T)      { testFakedSyncProgress(t, 62, FullSync) }
func TestFakedSyncProgress63Full(t *testing.T)  { testFakedSyncProgress(t, 63, FullSync) }
func TestFakedSyncProgress63Fast(t *testing.T)  { testFakedSyncProgress(t, 63, FastSync) }
func TestFakedSyncProgress64Full(t *testing.T)  { testFakedSyncProgress(t, 64, FullSync) }
func TestFakedSyncProgress64Fast(t *testing.T)  { testFakedSyncProgress(t, 64, FastSync) }
func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) }

func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
	t.Parallel()

	tester := newTester()
	defer tester.terminate()
	chain := testChainBase.shorten(blockCacheItems - 15)

	// Set a sync init hook to catch progress changes
	starting := make(chan struct{})
	progress := make(chan struct{})
	tester.downloader.syncInitHook = func(origin, latest uint64) {
		starting <- struct{}{} // signal that sync initialisation began...
		<-progress             // ...and pause until the test has inspected the progress
	}
	checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})

	// Create and sync with an attacker that promises a higher chain than available.
	// The head header is kept, but the ones just below it are deleted, so the
	// advertised height stays high while the blocks are unretrievable.
	brokenChain := chain.shorten(chain.len())
	numMissing := 5
	for i := brokenChain.len() - 2; i > brokenChain.len()-numMissing; i-- {
		delete(brokenChain.headerm, brokenChain.chain[i])
	}
	tester.newPeer("attack", protocol, brokenChain)

	pending := new(sync.WaitGroup)
	pending.Add(1)
	go func() {
		defer pending.Done()
		if err := tester.sync("attack", nil, mode); err == nil {
			panic("succeeded attacker synchronisation")
		}
	}()
	<-starting
	checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
		HighestBlock: uint64(brokenChain.len() - 1),
	})
	progress <- struct{}{}
	pending.Wait()
	afterFailedSync := tester.downloader.Progress()

	// Synchronise with a good peer and check that the progress height has been reduced to
	// the true value.
	validChain := chain.shorten(chain.len() - numMissing)
	tester.newPeer("valid", protocol, validChain)
	pending.Add(1)
	go func() {
		defer pending.Done()
		if err := tester.sync("valid", nil, mode); err != nil {
			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
		}
	}()
	<-starting
	checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{
		CurrentBlock: afterFailedSync.CurrentBlock,
		HighestBlock: uint64(validChain.len() - 1),
	})

	// Check final progress after successful sync.
	progress <- struct{}{}
	pending.Wait()
	checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
		CurrentBlock: uint64(validChain.len() - 1),
		HighestBlock: uint64(validChain.len() - 1),
	})
}
  1275. // This test reproduces an issue where unexpected deliveries would
  1276. // block indefinitely if they arrived at the right time.
  1277. func TestDeliverHeadersHang(t *testing.T) {
  1278. t.Parallel()
  1279. testCases := []struct {
  1280. protocol int
  1281. syncMode SyncMode
  1282. }{
  1283. {62, FullSync},
  1284. {63, FullSync},
  1285. {63, FastSync},
  1286. {64, FullSync},
  1287. {64, FastSync},
  1288. {64, LightSync},
  1289. }
  1290. for _, tc := range testCases {
  1291. t.Run(fmt.Sprintf("protocol %d mode %v", tc.protocol, tc.syncMode), func(t *testing.T) {
  1292. t.Parallel()
  1293. testDeliverHeadersHang(t, tc.protocol, tc.syncMode)
  1294. })
  1295. }
  1296. }
// testDeliverHeadersHang repeatedly syncs against a peer whose header requests
// trigger a flood of unrequested deliveries (via floodingTestPeer), iterating
// many times to hit the delivery timing that used to cause a hang.
func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
	master := newTester()
	defer master.terminate()
	chain := testChainBase.shorten(15)

	for i := 0; i < 200; i++ {
		tester := newTester()
		tester.peerDb = master.peerDb // share the peer database across iterations
		tester.newPeer("peer", protocol, chain)

		// Whenever the downloader requests headers, flood it with
		// a lot of unrequested header deliveries.
		tester.downloader.peers.peers["peer"].peer = &floodingTestPeer{
			peer:   tester.downloader.peers.peers["peer"].peer,
			tester: tester,
		}
		if err := tester.sync("peer", nil, mode); err != nil {
			t.Errorf("test %d: sync failed: %v", i, err)
		}
		tester.terminate()
	}
}
// floodingTestPeer wraps a downloader peer so that header-by-number requests
// can be accompanied by a flood of unrequested deliveries; all other request
// types are forwarded to the wrapped peer untouched.
type floodingTestPeer struct {
	peer   Peer            // the real peer being wrapped
	tester *downloadTester // tester whose downloader receives the flood
}

// Head forwards to the wrapped peer.
func (ftp *floodingTestPeer) Head() (common.Hash, *big.Int) { return ftp.peer.Head() }

// RequestHeadersByHash forwards to the wrapped peer.
func (ftp *floodingTestPeer) RequestHeadersByHash(hash common.Hash, count int, skip int, reverse bool) error {
	return ftp.peer.RequestHeadersByHash(hash, count, skip, reverse)
}

// RequestBodies forwards to the wrapped peer.
func (ftp *floodingTestPeer) RequestBodies(hashes []common.Hash) error {
	return ftp.peer.RequestBodies(hashes)
}

// RequestReceipts forwards to the wrapped peer.
func (ftp *floodingTestPeer) RequestReceipts(hashes []common.Hash) error {
	return ftp.peer.RequestReceipts(hashes)
}

// RequestNodeData forwards to the wrapped peer.
func (ftp *floodingTestPeer) RequestNodeData(hashes []common.Hash) error {
	return ftp.peer.RequestNodeData(hashes)
}
// RequestHeadersByNumber floods the downloader with 499 unsolicited header
// deliveries from fake peers, then serves the real request once the first
// flood response has landed, verifying that no delivery blocks for a minute.
func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int, reverse bool) error {
	deliveriesDone := make(chan struct{}, 500)
	// Fire off cap-1 (499) bogus deliveries of empty headers, each from a
	// distinct fake peer; the 500th slot is reserved for the real request below.
	for i := 0; i < cap(deliveriesDone)-1; i++ {
		peer := fmt.Sprintf("fake-peer%d", i)
		go func() {
			ftp.tester.downloader.DeliverHeaders(peer, []*types.Header{{}, {}, {}, {}})
			deliveriesDone <- struct{}{}
		}()
	}
	// None of the extra deliveries should block.
	timeout := time.After(60 * time.Second)
	launched := false
	// Drain all 500 completion signals (499 floods + 1 real request).
	for i := 0; i < cap(deliveriesDone); i++ {
		select {
		case <-deliveriesDone:
			if !launched {
				// Start delivering the requested headers
				// after one of the flooding responses has arrived.
				go func() {
					ftp.peer.RequestHeadersByNumber(from, count, skip, reverse)
					deliveriesDone <- struct{}{}
				}()
				launched = true
			}
		case <-timeout:
			// A delivery failed to complete within a minute: the hang resurfaced.
			panic("blocked")
		}
	}
	return nil
}
  1364. func TestRemoteHeaderRequestSpan(t *testing.T) {
  1365. testCases := []struct {
  1366. remoteHeight uint64
  1367. localHeight uint64
  1368. expected []int
  1369. }{
  1370. // Remote is way higher. We should ask for the remote head and go backwards
  1371. {1500, 1000,
  1372. []int{1323, 1339, 1355, 1371, 1387, 1403, 1419, 1435, 1451, 1467, 1483, 1499},
  1373. },
  1374. {15000, 13006,
  1375. []int{14823, 14839, 14855, 14871, 14887, 14903, 14919, 14935, 14951, 14967, 14983, 14999},
  1376. },
  1377. //Remote is pretty close to us. We don't have to fetch as many
  1378. {1200, 1150,
  1379. []int{1149, 1154, 1159, 1164, 1169, 1174, 1179, 1184, 1189, 1194, 1199},
  1380. },
  1381. // Remote is equal to us (so on a fork with higher td)
  1382. // We should get the closest couple of ancestors
  1383. {1500, 1500,
  1384. []int{1497, 1499},
  1385. },
  1386. // We're higher than the remote! Odd
  1387. {1000, 1500,
  1388. []int{997, 999},
  1389. },
  1390. // Check some weird edgecases that it behaves somewhat rationally
  1391. {0, 1500,
  1392. []int{0, 2},
  1393. },
  1394. {6000000, 0,
  1395. []int{5999823, 5999839, 5999855, 5999871, 5999887, 5999903, 5999919, 5999935, 5999951, 5999967, 5999983, 5999999},
  1396. },
  1397. {0, 0,
  1398. []int{0, 2},
  1399. },
  1400. }
  1401. reqs := func(from, count, span int) []int {
  1402. var r []int
  1403. num := from
  1404. for len(r) < count {
  1405. r = append(r, num)
  1406. num += span + 1
  1407. }
  1408. return r
  1409. }
  1410. for i, tt := range testCases {
  1411. from, count, span, max := calculateRequestSpan(tt.remoteHeight, tt.localHeight)
  1412. data := reqs(int(from), count, span)
  1413. if max != uint64(data[len(data)-1]) {
  1414. t.Errorf("test %d: wrong last value %d != %d", i, data[len(data)-1], max)
  1415. }
  1416. failed := false
  1417. if len(data) != len(tt.expected) {
  1418. failed = true
  1419. t.Errorf("test %d: length wrong, expected %d got %d", i, len(tt.expected), len(data))
  1420. } else {
  1421. for j, n := range data {
  1422. if n != tt.expected[j] {
  1423. failed = true
  1424. break
  1425. }
  1426. }
  1427. }
  1428. if failed {
  1429. res := strings.Replace(fmt.Sprint(data), " ", ",", -1)
  1430. exp := strings.Replace(fmt.Sprint(tt.expected), " ", ",", -1)
  1431. t.Logf("got: %v\n", res)
  1432. t.Logf("exp: %v\n", exp)
  1433. t.Errorf("test %d: wrong values", i)
  1434. }
  1435. }
  1436. }
// Tests that peers below a pre-configured checkpoint block are prevented from
// being fast-synced from, avoiding potential cheap eclipse attacks.
// Full sync is unaffected; see testCheckpointEnforcement for the per-mode logic.
func TestCheckpointEnforcement62(t *testing.T)      { testCheckpointEnforcement(t, 62, FullSync) }
func TestCheckpointEnforcement63Full(t *testing.T)  { testCheckpointEnforcement(t, 63, FullSync) }
func TestCheckpointEnforcement63Fast(t *testing.T)  { testCheckpointEnforcement(t, 63, FastSync) }
func TestCheckpointEnforcement64Full(t *testing.T)  { testCheckpointEnforcement(t, 64, FullSync) }
func TestCheckpointEnforcement64Fast(t *testing.T)  { testCheckpointEnforcement(t, 64, FastSync) }
func TestCheckpointEnforcement64Light(t *testing.T) { testCheckpointEnforcement(t, 64, LightSync) }
  1445. func testCheckpointEnforcement(t *testing.T, protocol int, mode SyncMode) {
  1446. t.Parallel()
  1447. // Create a new tester with a particular hard coded checkpoint block
  1448. tester := newTester()
  1449. defer tester.terminate()
  1450. tester.downloader.checkpoint = uint64(fsMinFullBlocks) + 256
  1451. chain := testChainBase.shorten(int(tester.downloader.checkpoint) - 1)
  1452. // Attempt to sync with the peer and validate the result
  1453. tester.newPeer("peer", protocol, chain)
  1454. var expect error
  1455. if mode == FastSync || mode == LightSync {
  1456. expect = errUnsyncedPeer
  1457. }
  1458. if err := tester.sync("peer", nil, mode); err != expect {
  1459. t.Fatalf("block sync error mismatch: have %v, want %v", err, expect)
  1460. }
  1461. if mode == FastSync || mode == LightSync {
  1462. assertOwnChain(t, tester, 1)
  1463. } else {
  1464. assertOwnChain(t, tester, chain.len())
  1465. }
  1466. }