downloader_test.go 57 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package downloader
  17. import (
  18. "errors"
  19. "fmt"
  20. "math/big"
  21. "os"
  22. "strings"
  23. "sync"
  24. "sync/atomic"
  25. "testing"
  26. "time"
  27. "github.com/ethereum/go-ethereum"
  28. "github.com/ethereum/go-ethereum/common"
  29. "github.com/ethereum/go-ethereum/consensus/ethash"
  30. "github.com/ethereum/go-ethereum/core"
  31. "github.com/ethereum/go-ethereum/core/rawdb"
  32. "github.com/ethereum/go-ethereum/core/types"
  33. "github.com/ethereum/go-ethereum/core/vm"
  34. "github.com/ethereum/go-ethereum/eth/protocols/eth"
  35. "github.com/ethereum/go-ethereum/eth/protocols/snap"
  36. "github.com/ethereum/go-ethereum/event"
  37. "github.com/ethereum/go-ethereum/log"
  38. "github.com/ethereum/go-ethereum/params"
  39. "github.com/ethereum/go-ethereum/rlp"
  40. "github.com/ethereum/go-ethereum/trie"
  41. )
  42. // downloadTester is a test simulator for mocking out local block chain.
  43. type downloadTester struct {
  44. freezer string
  45. chain *core.BlockChain
  46. downloader *Downloader
  47. peers map[string]*downloadTesterPeer
  48. lock sync.RWMutex
  49. }
  50. // newTester creates a new downloader test mocker.
  51. func newTester(t *testing.T) *downloadTester {
  52. return newTesterWithNotification(t, nil)
  53. }
  54. // newTester creates a new downloader test mocker.
  55. func newTesterWithNotification(t *testing.T, success func()) *downloadTester {
  56. freezer := t.TempDir()
  57. db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), freezer, "", false)
  58. if err != nil {
  59. panic(err)
  60. }
  61. t.Cleanup(func() {
  62. db.Close()
  63. })
  64. gspec := core.Genesis{
  65. Alloc: core.GenesisAlloc{testAddress: {Balance: big.NewInt(1000000000000000)}},
  66. BaseFee: big.NewInt(params.InitialBaseFee),
  67. }
  68. gspec.MustCommit(db)
  69. chain, err := core.NewBlockChain(db, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil, nil)
  70. if err != nil {
  71. panic(err)
  72. }
  73. tester := &downloadTester{
  74. freezer: freezer,
  75. chain: chain,
  76. peers: make(map[string]*downloadTesterPeer),
  77. }
  78. tester.downloader = New(0, db, new(event.TypeMux), tester.chain, nil, tester.dropPeer, success)
  79. return tester
  80. }
  81. // terminate aborts any operations on the embedded downloader and releases all
  82. // held resources.
  83. func (dl *downloadTester) terminate() {
  84. dl.downloader.Terminate()
  85. dl.chain.Stop()
  86. os.RemoveAll(dl.freezer)
  87. }
  88. // sync starts synchronizing with a remote peer, blocking until it completes.
  89. func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
  90. head := dl.peers[id].chain.CurrentBlock()
  91. if td == nil {
  92. // If no particular TD was requested, load from the peer's blockchain
  93. td = dl.peers[id].chain.GetTd(head.Hash(), head.NumberU64())
  94. }
  95. // Synchronise with the chosen peer and ensure proper cleanup afterwards
  96. err := dl.downloader.synchronise(id, head.Hash(), td, nil, mode, false, nil)
  97. select {
  98. case <-dl.downloader.cancelCh:
  99. // Ok, downloader fully cancelled after sync cycle
  100. default:
  101. // Downloader is still accepting packets, can block a peer up
  102. panic("downloader active post sync cycle") // panic will be caught by tester
  103. }
  104. return err
  105. }
  106. // newPeer registers a new block download source into the downloader.
  107. func (dl *downloadTester) newPeer(id string, version uint, blocks []*types.Block) *downloadTesterPeer {
  108. dl.lock.Lock()
  109. defer dl.lock.Unlock()
  110. peer := &downloadTesterPeer{
  111. dl: dl,
  112. id: id,
  113. chain: newTestBlockchain(blocks),
  114. withholdHeaders: make(map[common.Hash]struct{}),
  115. }
  116. dl.peers[id] = peer
  117. if err := dl.downloader.RegisterPeer(id, version, peer); err != nil {
  118. panic(err)
  119. }
  120. if err := dl.downloader.SnapSyncer.Register(peer); err != nil {
  121. panic(err)
  122. }
  123. return peer
  124. }
  125. // dropPeer simulates a hard peer removal from the connection pool.
  126. func (dl *downloadTester) dropPeer(id string) {
  127. dl.lock.Lock()
  128. defer dl.lock.Unlock()
  129. delete(dl.peers, id)
  130. dl.downloader.SnapSyncer.Unregister(id)
  131. dl.downloader.UnregisterPeer(id)
  132. }
  133. type downloadTesterPeer struct {
  134. dl *downloadTester
  135. id string
  136. chain *core.BlockChain
  137. withholdHeaders map[common.Hash]struct{}
  138. }
  139. // Head constructs a function to retrieve a peer's current head hash
  140. // and total difficulty.
  141. func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
  142. head := dlp.chain.CurrentBlock()
  143. return head.Hash(), dlp.chain.GetTd(head.Hash(), head.NumberU64())
  144. }
  145. func unmarshalRlpHeaders(rlpdata []rlp.RawValue) []*types.Header {
  146. var headers = make([]*types.Header, len(rlpdata))
  147. for i, data := range rlpdata {
  148. var h types.Header
  149. if err := rlp.DecodeBytes(data, &h); err != nil {
  150. panic(err)
  151. }
  152. headers[i] = &h
  153. }
  154. return headers
  155. }
  156. // RequestHeadersByHash constructs a GetBlockHeaders function based on a hashed
  157. // origin; associated with a particular peer in the download tester. The returned
  158. // function can be used to retrieve batches of headers from the particular peer.
  159. func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool, sink chan *eth.Response) (*eth.Request, error) {
  160. // Service the header query via the live handler code
  161. rlpHeaders := eth.ServiceGetBlockHeadersQuery(dlp.chain, &eth.GetBlockHeadersPacket{
  162. Origin: eth.HashOrNumber{
  163. Hash: origin,
  164. },
  165. Amount: uint64(amount),
  166. Skip: uint64(skip),
  167. Reverse: reverse,
  168. }, nil)
  169. headers := unmarshalRlpHeaders(rlpHeaders)
  170. // If a malicious peer is simulated withholding headers, delete them
  171. for hash := range dlp.withholdHeaders {
  172. for i, header := range headers {
  173. if header.Hash() == hash {
  174. headers = append(headers[:i], headers[i+1:]...)
  175. break
  176. }
  177. }
  178. }
  179. hashes := make([]common.Hash, len(headers))
  180. for i, header := range headers {
  181. hashes[i] = header.Hash()
  182. }
  183. // Deliver the headers to the downloader
  184. req := &eth.Request{
  185. Peer: dlp.id,
  186. }
  187. res := &eth.Response{
  188. Req: req,
  189. Res: (*eth.BlockHeadersPacket)(&headers),
  190. Meta: hashes,
  191. Time: 1,
  192. Done: make(chan error, 1), // Ignore the returned status
  193. }
  194. go func() {
  195. sink <- res
  196. }()
  197. return req, nil
  198. }
  199. // RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered
  200. // origin; associated with a particular peer in the download tester. The returned
  201. // function can be used to retrieve batches of headers from the particular peer.
  202. func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool, sink chan *eth.Response) (*eth.Request, error) {
  203. // Service the header query via the live handler code
  204. rlpHeaders := eth.ServiceGetBlockHeadersQuery(dlp.chain, &eth.GetBlockHeadersPacket{
  205. Origin: eth.HashOrNumber{
  206. Number: origin,
  207. },
  208. Amount: uint64(amount),
  209. Skip: uint64(skip),
  210. Reverse: reverse,
  211. }, nil)
  212. headers := unmarshalRlpHeaders(rlpHeaders)
  213. // If a malicious peer is simulated withholding headers, delete them
  214. for hash := range dlp.withholdHeaders {
  215. for i, header := range headers {
  216. if header.Hash() == hash {
  217. headers = append(headers[:i], headers[i+1:]...)
  218. break
  219. }
  220. }
  221. }
  222. hashes := make([]common.Hash, len(headers))
  223. for i, header := range headers {
  224. hashes[i] = header.Hash()
  225. }
  226. // Deliver the headers to the downloader
  227. req := &eth.Request{
  228. Peer: dlp.id,
  229. }
  230. res := &eth.Response{
  231. Req: req,
  232. Res: (*eth.BlockHeadersPacket)(&headers),
  233. Meta: hashes,
  234. Time: 1,
  235. Done: make(chan error, 1), // Ignore the returned status
  236. }
  237. go func() {
  238. sink <- res
  239. }()
  240. return req, nil
  241. }
  242. // RequestBodies constructs a getBlockBodies method associated with a particular
  243. // peer in the download tester. The returned function can be used to retrieve
  244. // batches of block bodies from the particularly requested peer.
  245. func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) {
  246. blobs := eth.ServiceGetBlockBodiesQuery(dlp.chain, hashes)
  247. bodies := make([]*eth.BlockBody, len(blobs))
  248. for i, blob := range blobs {
  249. bodies[i] = new(eth.BlockBody)
  250. rlp.DecodeBytes(blob, bodies[i])
  251. }
  252. var (
  253. txsHashes = make([]common.Hash, len(bodies))
  254. uncleHashes = make([]common.Hash, len(bodies))
  255. )
  256. hasher := trie.NewStackTrie(nil)
  257. for i, body := range bodies {
  258. txsHashes[i] = types.DeriveSha(types.Transactions(body.Transactions), hasher)
  259. uncleHashes[i] = types.CalcUncleHash(body.Uncles)
  260. }
  261. req := &eth.Request{
  262. Peer: dlp.id,
  263. }
  264. res := &eth.Response{
  265. Req: req,
  266. Res: (*eth.BlockBodiesPacket)(&bodies),
  267. Meta: [][]common.Hash{txsHashes, uncleHashes},
  268. Time: 1,
  269. Done: make(chan error, 1), // Ignore the returned status
  270. }
  271. go func() {
  272. sink <- res
  273. }()
  274. return req, nil
  275. }
  276. // RequestReceipts constructs a getReceipts method associated with a particular
  277. // peer in the download tester. The returned function can be used to retrieve
  278. // batches of block receipts from the particularly requested peer.
  279. func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) {
  280. blobs := eth.ServiceGetReceiptsQuery(dlp.chain, hashes)
  281. receipts := make([][]*types.Receipt, len(blobs))
  282. for i, blob := range blobs {
  283. rlp.DecodeBytes(blob, &receipts[i])
  284. }
  285. hasher := trie.NewStackTrie(nil)
  286. hashes = make([]common.Hash, len(receipts))
  287. for i, receipt := range receipts {
  288. hashes[i] = types.DeriveSha(types.Receipts(receipt), hasher)
  289. }
  290. req := &eth.Request{
  291. Peer: dlp.id,
  292. }
  293. res := &eth.Response{
  294. Req: req,
  295. Res: (*eth.ReceiptsPacket)(&receipts),
  296. Meta: hashes,
  297. Time: 1,
  298. Done: make(chan error, 1), // Ignore the returned status
  299. }
  300. go func() {
  301. sink <- res
  302. }()
  303. return req, nil
  304. }
  305. // ID retrieves the peer's unique identifier.
  306. func (dlp *downloadTesterPeer) ID() string {
  307. return dlp.id
  308. }
  309. // RequestAccountRange fetches a batch of accounts rooted in a specific account
  310. // trie, starting with the origin.
  311. func (dlp *downloadTesterPeer) RequestAccountRange(id uint64, root, origin, limit common.Hash, bytes uint64) error {
  312. // Create the request and service it
  313. req := &snap.GetAccountRangePacket{
  314. ID: id,
  315. Root: root,
  316. Origin: origin,
  317. Limit: limit,
  318. Bytes: bytes,
  319. }
  320. slimaccs, proofs := snap.ServiceGetAccountRangeQuery(dlp.chain, req)
  321. // We need to convert to non-slim format, delegate to the packet code
  322. res := &snap.AccountRangePacket{
  323. ID: id,
  324. Accounts: slimaccs,
  325. Proof: proofs,
  326. }
  327. hashes, accounts, _ := res.Unpack()
  328. go dlp.dl.downloader.SnapSyncer.OnAccounts(dlp, id, hashes, accounts, proofs)
  329. return nil
  330. }
  331. // RequestStorageRanges fetches a batch of storage slots belonging to one or
  332. // more accounts. If slots from only one account is requested, an origin marker
  333. // may also be used to retrieve from there.
  334. func (dlp *downloadTesterPeer) RequestStorageRanges(id uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, bytes uint64) error {
  335. // Create the request and service it
  336. req := &snap.GetStorageRangesPacket{
  337. ID: id,
  338. Accounts: accounts,
  339. Root: root,
  340. Origin: origin,
  341. Limit: limit,
  342. Bytes: bytes,
  343. }
  344. storage, proofs := snap.ServiceGetStorageRangesQuery(dlp.chain, req)
  345. // We need to convert to demultiplex, delegate to the packet code
  346. res := &snap.StorageRangesPacket{
  347. ID: id,
  348. Slots: storage,
  349. Proof: proofs,
  350. }
  351. hashes, slots := res.Unpack()
  352. go dlp.dl.downloader.SnapSyncer.OnStorage(dlp, id, hashes, slots, proofs)
  353. return nil
  354. }
  355. // RequestByteCodes fetches a batch of bytecodes by hash.
  356. func (dlp *downloadTesterPeer) RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) error {
  357. req := &snap.GetByteCodesPacket{
  358. ID: id,
  359. Hashes: hashes,
  360. Bytes: bytes,
  361. }
  362. codes := snap.ServiceGetByteCodesQuery(dlp.chain, req)
  363. go dlp.dl.downloader.SnapSyncer.OnByteCodes(dlp, id, codes)
  364. return nil
  365. }
  366. // RequestTrieNodes fetches a batch of account or storage trie nodes rooted in
  367. // a specific state trie.
  368. func (dlp *downloadTesterPeer) RequestTrieNodes(id uint64, root common.Hash, paths []snap.TrieNodePathSet, bytes uint64) error {
  369. req := &snap.GetTrieNodesPacket{
  370. ID: id,
  371. Root: root,
  372. Paths: paths,
  373. Bytes: bytes,
  374. }
  375. nodes, _ := snap.ServiceGetTrieNodesQuery(dlp.chain, req, time.Now())
  376. go dlp.dl.downloader.SnapSyncer.OnTrieNodes(dlp, id, nodes)
  377. return nil
  378. }
  379. // Log retrieves the peer's own contextual logger.
  380. func (dlp *downloadTesterPeer) Log() log.Logger {
  381. return log.New("peer", dlp.id)
  382. }
  383. // assertOwnChain checks if the local chain contains the correct number of items
  384. // of the various chain components.
  385. func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
  386. // Mark this method as a helper to report errors at callsite, not in here
  387. t.Helper()
  388. headers, blocks, receipts := length, length, length
  389. if tester.downloader.getMode() == LightSync {
  390. blocks, receipts = 1, 1
  391. }
  392. if hs := int(tester.chain.CurrentHeader().Number.Uint64()) + 1; hs != headers {
  393. t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers)
  394. }
  395. if bs := int(tester.chain.CurrentBlock().NumberU64()) + 1; bs != blocks {
  396. t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, blocks)
  397. }
  398. if rs := int(tester.chain.CurrentFastBlock().NumberU64()) + 1; rs != receipts {
  399. t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts)
  400. }
  401. }
  402. func TestCanonicalSynchronisation66Full(t *testing.T) { testCanonSync(t, eth.ETH66, FullSync) }
  403. func TestCanonicalSynchronisation66Snap(t *testing.T) { testCanonSync(t, eth.ETH66, SnapSync) }
  404. func TestCanonicalSynchronisation66Light(t *testing.T) { testCanonSync(t, eth.ETH66, LightSync) }
  405. func TestCanonicalSynchronisation67Full(t *testing.T) { testCanonSync(t, eth.ETH67, FullSync) }
  406. func TestCanonicalSynchronisation67Snap(t *testing.T) { testCanonSync(t, eth.ETH67, SnapSync) }
  407. func TestCanonicalSynchronisation67Light(t *testing.T) { testCanonSync(t, eth.ETH67, LightSync) }
  408. func testCanonSync(t *testing.T, protocol uint, mode SyncMode) {
  409. tester := newTester(t)
  410. defer tester.terminate()
  411. // Create a small enough block chain to download
  412. chain := testChainBase.shorten(blockCacheMaxItems - 15)
  413. tester.newPeer("peer", protocol, chain.blocks[1:])
  414. // Synchronise with the peer and make sure all relevant data was retrieved
  415. if err := tester.sync("peer", nil, mode); err != nil {
  416. t.Fatalf("failed to synchronise blocks: %v", err)
  417. }
  418. assertOwnChain(t, tester, len(chain.blocks))
  419. }
  420. // Tests that if a large batch of blocks are being downloaded, it is throttled
  421. // until the cached blocks are retrieved.
  422. func TestThrottling66Full(t *testing.T) { testThrottling(t, eth.ETH66, FullSync) }
  423. func TestThrottling66Snap(t *testing.T) { testThrottling(t, eth.ETH66, SnapSync) }
  424. func TestThrottling67Full(t *testing.T) { testThrottling(t, eth.ETH67, FullSync) }
  425. func TestThrottling67Snap(t *testing.T) { testThrottling(t, eth.ETH67, SnapSync) }
  426. func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
  427. tester := newTester(t)
  428. defer tester.terminate()
  429. // Create a long block chain to download and the tester
  430. targetBlocks := len(testChainBase.blocks) - 1
  431. tester.newPeer("peer", protocol, testChainBase.blocks[1:])
  432. // Wrap the importer to allow stepping
  433. blocked, proceed := uint32(0), make(chan struct{})
  434. tester.downloader.chainInsertHook = func(results []*fetchResult) {
  435. atomic.StoreUint32(&blocked, uint32(len(results)))
  436. <-proceed
  437. }
  438. // Start a synchronisation concurrently
  439. errc := make(chan error, 1)
  440. go func() {
  441. errc <- tester.sync("peer", nil, mode)
  442. }()
  443. // Iteratively take some blocks, always checking the retrieval count
  444. for {
  445. // Check the retrieval count synchronously (! reason for this ugly block)
  446. tester.lock.RLock()
  447. retrieved := int(tester.chain.CurrentFastBlock().Number().Uint64()) + 1
  448. tester.lock.RUnlock()
  449. if retrieved >= targetBlocks+1 {
  450. break
  451. }
  452. // Wait a bit for sync to throttle itself
  453. var cached, frozen int
  454. for start := time.Now(); time.Since(start) < 3*time.Second; {
  455. time.Sleep(25 * time.Millisecond)
  456. tester.lock.Lock()
  457. tester.downloader.queue.lock.Lock()
  458. tester.downloader.queue.resultCache.lock.Lock()
  459. {
  460. cached = tester.downloader.queue.resultCache.countCompleted()
  461. frozen = int(atomic.LoadUint32(&blocked))
  462. retrieved = int(tester.chain.CurrentFastBlock().Number().Uint64()) + 1
  463. }
  464. tester.downloader.queue.resultCache.lock.Unlock()
  465. tester.downloader.queue.lock.Unlock()
  466. tester.lock.Unlock()
  467. if cached == blockCacheMaxItems ||
  468. cached == blockCacheMaxItems-reorgProtHeaderDelay ||
  469. retrieved+cached+frozen == targetBlocks+1 ||
  470. retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay {
  471. break
  472. }
  473. }
  474. // Make sure we filled up the cache, then exhaust it
  475. time.Sleep(25 * time.Millisecond) // give it a chance to screw up
  476. tester.lock.RLock()
  477. retrieved = int(tester.chain.CurrentFastBlock().Number().Uint64()) + 1
  478. tester.lock.RUnlock()
  479. if cached != blockCacheMaxItems && cached != blockCacheMaxItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
  480. t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheMaxItems, retrieved, frozen, targetBlocks+1)
  481. }
  482. // Permit the blocked blocks to import
  483. if atomic.LoadUint32(&blocked) > 0 {
  484. atomic.StoreUint32(&blocked, uint32(0))
  485. proceed <- struct{}{}
  486. }
  487. }
  488. // Check that we haven't pulled more blocks than available
  489. assertOwnChain(t, tester, targetBlocks+1)
  490. if err := <-errc; err != nil {
  491. t.Fatalf("block synchronization failed: %v", err)
  492. }
  493. }
  494. // Tests that simple synchronization against a forked chain works correctly. In
  495. // this test common ancestor lookup should *not* be short circuited, and a full
  496. // binary search should be executed.
  497. func TestForkedSync66Full(t *testing.T) { testForkedSync(t, eth.ETH66, FullSync) }
  498. func TestForkedSync66Snap(t *testing.T) { testForkedSync(t, eth.ETH66, SnapSync) }
  499. func TestForkedSync66Light(t *testing.T) { testForkedSync(t, eth.ETH66, LightSync) }
  500. func TestForkedSync67Full(t *testing.T) { testForkedSync(t, eth.ETH67, FullSync) }
  501. func TestForkedSync67Snap(t *testing.T) { testForkedSync(t, eth.ETH67, SnapSync) }
  502. func TestForkedSync67Light(t *testing.T) { testForkedSync(t, eth.ETH67, LightSync) }
  503. func testForkedSync(t *testing.T, protocol uint, mode SyncMode) {
  504. tester := newTester(t)
  505. defer tester.terminate()
  506. chainA := testChainForkLightA.shorten(len(testChainBase.blocks) + 80)
  507. chainB := testChainForkLightB.shorten(len(testChainBase.blocks) + 81)
  508. tester.newPeer("fork A", protocol, chainA.blocks[1:])
  509. tester.newPeer("fork B", protocol, chainB.blocks[1:])
  510. // Synchronise with the peer and make sure all blocks were retrieved
  511. if err := tester.sync("fork A", nil, mode); err != nil {
  512. t.Fatalf("failed to synchronise blocks: %v", err)
  513. }
  514. assertOwnChain(t, tester, len(chainA.blocks))
  515. // Synchronise with the second peer and make sure that fork is pulled too
  516. if err := tester.sync("fork B", nil, mode); err != nil {
  517. t.Fatalf("failed to synchronise blocks: %v", err)
  518. }
  519. assertOwnChain(t, tester, len(chainB.blocks))
  520. }
  521. // Tests that synchronising against a much shorter but much heavier fork works
  522. // currently and is not dropped.
  523. func TestHeavyForkedSync66Full(t *testing.T) { testHeavyForkedSync(t, eth.ETH66, FullSync) }
  524. func TestHeavyForkedSync66Snap(t *testing.T) { testHeavyForkedSync(t, eth.ETH66, SnapSync) }
  525. func TestHeavyForkedSync66Light(t *testing.T) { testHeavyForkedSync(t, eth.ETH66, LightSync) }
  526. func TestHeavyForkedSync67Full(t *testing.T) { testHeavyForkedSync(t, eth.ETH67, FullSync) }
  527. func TestHeavyForkedSync67Snap(t *testing.T) { testHeavyForkedSync(t, eth.ETH67, SnapSync) }
  528. func TestHeavyForkedSync67Light(t *testing.T) { testHeavyForkedSync(t, eth.ETH67, LightSync) }
  529. func testHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) {
  530. tester := newTester(t)
  531. defer tester.terminate()
  532. chainA := testChainForkLightA.shorten(len(testChainBase.blocks) + 80)
  533. chainB := testChainForkHeavy.shorten(len(testChainBase.blocks) + 79)
  534. tester.newPeer("light", protocol, chainA.blocks[1:])
  535. tester.newPeer("heavy", protocol, chainB.blocks[1:])
  536. // Synchronise with the peer and make sure all blocks were retrieved
  537. if err := tester.sync("light", nil, mode); err != nil {
  538. t.Fatalf("failed to synchronise blocks: %v", err)
  539. }
  540. assertOwnChain(t, tester, len(chainA.blocks))
  541. // Synchronise with the second peer and make sure that fork is pulled too
  542. if err := tester.sync("heavy", nil, mode); err != nil {
  543. t.Fatalf("failed to synchronise blocks: %v", err)
  544. }
  545. assertOwnChain(t, tester, len(chainB.blocks))
  546. }
  547. // Tests that chain forks are contained within a certain interval of the current
  548. // chain head, ensuring that malicious peers cannot waste resources by feeding
  549. // long dead chains.
  550. func TestBoundedForkedSync66Full(t *testing.T) { testBoundedForkedSync(t, eth.ETH66, FullSync) }
  551. func TestBoundedForkedSync66Snap(t *testing.T) { testBoundedForkedSync(t, eth.ETH66, SnapSync) }
  552. func TestBoundedForkedSync66Light(t *testing.T) { testBoundedForkedSync(t, eth.ETH66, LightSync) }
  553. func TestBoundedForkedSync67Full(t *testing.T) { testBoundedForkedSync(t, eth.ETH67, FullSync) }
  554. func TestBoundedForkedSync67Snap(t *testing.T) { testBoundedForkedSync(t, eth.ETH67, SnapSync) }
  555. func TestBoundedForkedSync67Light(t *testing.T) { testBoundedForkedSync(t, eth.ETH67, LightSync) }
  556. func testBoundedForkedSync(t *testing.T, protocol uint, mode SyncMode) {
  557. tester := newTester(t)
  558. defer tester.terminate()
  559. chainA := testChainForkLightA
  560. chainB := testChainForkLightB
  561. tester.newPeer("original", protocol, chainA.blocks[1:])
  562. tester.newPeer("rewriter", protocol, chainB.blocks[1:])
  563. // Synchronise with the peer and make sure all blocks were retrieved
  564. if err := tester.sync("original", nil, mode); err != nil {
  565. t.Fatalf("failed to synchronise blocks: %v", err)
  566. }
  567. assertOwnChain(t, tester, len(chainA.blocks))
  568. // Synchronise with the second peer and ensure that the fork is rejected to being too old
  569. if err := tester.sync("rewriter", nil, mode); err != errInvalidAncestor {
  570. t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor)
  571. }
  572. }
  573. // Tests that chain forks are contained within a certain interval of the current
  574. // chain head for short but heavy forks too. These are a bit special because they
  575. // take different ancestor lookup paths.
  576. func TestBoundedHeavyForkedSync66Full(t *testing.T) {
  577. testBoundedHeavyForkedSync(t, eth.ETH66, FullSync)
  578. }
  579. func TestBoundedHeavyForkedSync66Snap(t *testing.T) {
  580. testBoundedHeavyForkedSync(t, eth.ETH66, SnapSync)
  581. }
  582. func TestBoundedHeavyForkedSync66Light(t *testing.T) {
  583. testBoundedHeavyForkedSync(t, eth.ETH66, LightSync)
  584. }
  585. func TestBoundedHeavyForkedSync67Full(t *testing.T) {
  586. testBoundedHeavyForkedSync(t, eth.ETH67, FullSync)
  587. }
  588. func TestBoundedHeavyForkedSync67Snap(t *testing.T) {
  589. testBoundedHeavyForkedSync(t, eth.ETH67, SnapSync)
  590. }
  591. func TestBoundedHeavyForkedSync67Light(t *testing.T) {
  592. testBoundedHeavyForkedSync(t, eth.ETH67, LightSync)
  593. }
  594. func testBoundedHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) {
  595. tester := newTester(t)
  596. defer tester.terminate()
  597. // Create a long enough forked chain
  598. chainA := testChainForkLightA
  599. chainB := testChainForkHeavy
  600. tester.newPeer("original", protocol, chainA.blocks[1:])
  601. // Synchronise with the peer and make sure all blocks were retrieved
  602. if err := tester.sync("original", nil, mode); err != nil {
  603. t.Fatalf("failed to synchronise blocks: %v", err)
  604. }
  605. assertOwnChain(t, tester, len(chainA.blocks))
  606. tester.newPeer("heavy-rewriter", protocol, chainB.blocks[1:])
  607. // Synchronise with the second peer and ensure that the fork is rejected to being too old
  608. if err := tester.sync("heavy-rewriter", nil, mode); err != errInvalidAncestor {
  609. t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor)
  610. }
  611. }
  612. // Tests that a canceled download wipes all previously accumulated state.
  613. func TestCancel66Full(t *testing.T) { testCancel(t, eth.ETH66, FullSync) }
  614. func TestCancel66Snap(t *testing.T) { testCancel(t, eth.ETH66, SnapSync) }
  615. func TestCancel66Light(t *testing.T) { testCancel(t, eth.ETH66, LightSync) }
  616. func TestCancel67Full(t *testing.T) { testCancel(t, eth.ETH67, FullSync) }
  617. func TestCancel67Snap(t *testing.T) { testCancel(t, eth.ETH67, SnapSync) }
  618. func TestCancel67Light(t *testing.T) { testCancel(t, eth.ETH67, LightSync) }
  619. func testCancel(t *testing.T, protocol uint, mode SyncMode) {
  620. tester := newTester(t)
  621. defer tester.terminate()
  622. chain := testChainBase.shorten(MaxHeaderFetch)
  623. tester.newPeer("peer", protocol, chain.blocks[1:])
  624. // Make sure canceling works with a pristine downloader
  625. tester.downloader.Cancel()
  626. if !tester.downloader.queue.Idle() {
  627. t.Errorf("download queue not idle")
  628. }
  629. // Synchronise with the peer, but cancel afterwards
  630. if err := tester.sync("peer", nil, mode); err != nil {
  631. t.Fatalf("failed to synchronise blocks: %v", err)
  632. }
  633. tester.downloader.Cancel()
  634. if !tester.downloader.queue.Idle() {
  635. t.Errorf("download queue not idle")
  636. }
  637. }
  638. // Tests that synchronisation from multiple peers works as intended (multi thread sanity test).
  639. func TestMultiSynchronisation66Full(t *testing.T) { testMultiSynchronisation(t, eth.ETH66, FullSync) }
  640. func TestMultiSynchronisation66Snap(t *testing.T) { testMultiSynchronisation(t, eth.ETH66, SnapSync) }
  641. func TestMultiSynchronisation66Light(t *testing.T) { testMultiSynchronisation(t, eth.ETH66, LightSync) }
  642. func TestMultiSynchronisation67Full(t *testing.T) { testMultiSynchronisation(t, eth.ETH67, FullSync) }
  643. func TestMultiSynchronisation67Snap(t *testing.T) { testMultiSynchronisation(t, eth.ETH67, SnapSync) }
  644. func TestMultiSynchronisation67Light(t *testing.T) { testMultiSynchronisation(t, eth.ETH67, LightSync) }
  645. func testMultiSynchronisation(t *testing.T, protocol uint, mode SyncMode) {
  646. tester := newTester(t)
  647. defer tester.terminate()
  648. // Create various peers with various parts of the chain
  649. targetPeers := 8
  650. chain := testChainBase.shorten(targetPeers * 100)
  651. for i := 0; i < targetPeers; i++ {
  652. id := fmt.Sprintf("peer #%d", i)
  653. tester.newPeer(id, protocol, chain.shorten(len(chain.blocks) / (i + 1)).blocks[1:])
  654. }
  655. if err := tester.sync("peer #0", nil, mode); err != nil {
  656. t.Fatalf("failed to synchronise blocks: %v", err)
  657. }
  658. assertOwnChain(t, tester, len(chain.blocks))
  659. }
  660. // Tests that synchronisations behave well in multi-version protocol environments
  661. // and not wreak havoc on other nodes in the network.
  662. func TestMultiProtoSynchronisation66Full(t *testing.T) { testMultiProtoSync(t, eth.ETH66, FullSync) }
  663. func TestMultiProtoSynchronisation66Snap(t *testing.T) { testMultiProtoSync(t, eth.ETH66, SnapSync) }
  664. func TestMultiProtoSynchronisation66Light(t *testing.T) { testMultiProtoSync(t, eth.ETH66, LightSync) }
  665. func TestMultiProtoSynchronisation67Full(t *testing.T) { testMultiProtoSync(t, eth.ETH67, FullSync) }
  666. func TestMultiProtoSynchronisation67Snap(t *testing.T) { testMultiProtoSync(t, eth.ETH67, SnapSync) }
  667. func TestMultiProtoSynchronisation67Light(t *testing.T) { testMultiProtoSync(t, eth.ETH67, LightSync) }
  668. func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) {
  669. tester := newTester(t)
  670. defer tester.terminate()
  671. // Create a small enough block chain to download
  672. chain := testChainBase.shorten(blockCacheMaxItems - 15)
  673. // Create peers of every type
  674. tester.newPeer("peer 66", eth.ETH66, chain.blocks[1:])
  675. tester.newPeer("peer 67", eth.ETH67, chain.blocks[1:])
  676. // Synchronise with the requested peer and make sure all blocks were retrieved
  677. if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil, mode); err != nil {
  678. t.Fatalf("failed to synchronise blocks: %v", err)
  679. }
  680. assertOwnChain(t, tester, len(chain.blocks))
  681. // Check that no peers have been dropped off
  682. for _, version := range []int{66, 67} {
  683. peer := fmt.Sprintf("peer %d", version)
  684. if _, ok := tester.peers[peer]; !ok {
  685. t.Errorf("%s dropped", peer)
  686. }
  687. }
  688. }
  689. // Tests that if a block is empty (e.g. header only), no body request should be
  690. // made, and instead the header should be assembled into a whole block in itself.
  691. func TestEmptyShortCircuit66Full(t *testing.T) { testEmptyShortCircuit(t, eth.ETH66, FullSync) }
  692. func TestEmptyShortCircuit66Snap(t *testing.T) { testEmptyShortCircuit(t, eth.ETH66, SnapSync) }
  693. func TestEmptyShortCircuit66Light(t *testing.T) { testEmptyShortCircuit(t, eth.ETH66, LightSync) }
  694. func TestEmptyShortCircuit67Full(t *testing.T) { testEmptyShortCircuit(t, eth.ETH67, FullSync) }
  695. func TestEmptyShortCircuit67Snap(t *testing.T) { testEmptyShortCircuit(t, eth.ETH67, SnapSync) }
  696. func TestEmptyShortCircuit67Light(t *testing.T) { testEmptyShortCircuit(t, eth.ETH67, LightSync) }
  697. func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {
  698. tester := newTester(t)
  699. defer tester.terminate()
  700. // Create a block chain to download
  701. chain := testChainBase
  702. tester.newPeer("peer", protocol, chain.blocks[1:])
  703. // Instrument the downloader to signal body requests
  704. bodiesHave, receiptsHave := int32(0), int32(0)
  705. tester.downloader.bodyFetchHook = func(headers []*types.Header) {
  706. atomic.AddInt32(&bodiesHave, int32(len(headers)))
  707. }
  708. tester.downloader.receiptFetchHook = func(headers []*types.Header) {
  709. atomic.AddInt32(&receiptsHave, int32(len(headers)))
  710. }
  711. // Synchronise with the peer and make sure all blocks were retrieved
  712. if err := tester.sync("peer", nil, mode); err != nil {
  713. t.Fatalf("failed to synchronise blocks: %v", err)
  714. }
  715. assertOwnChain(t, tester, len(chain.blocks))
  716. // Validate the number of block bodies that should have been requested
  717. bodiesNeeded, receiptsNeeded := 0, 0
  718. for _, block := range chain.blocks[1:] {
  719. if mode != LightSync && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) {
  720. bodiesNeeded++
  721. }
  722. }
  723. for _, block := range chain.blocks[1:] {
  724. if mode == SnapSync && len(block.Transactions()) > 0 {
  725. receiptsNeeded++
  726. }
  727. }
  728. if int(bodiesHave) != bodiesNeeded {
  729. t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave, bodiesNeeded)
  730. }
  731. if int(receiptsHave) != receiptsNeeded {
  732. t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave, receiptsNeeded)
  733. }
  734. }
  735. // Tests that headers are enqueued continuously, preventing malicious nodes from
  736. // stalling the downloader by feeding gapped header chains.
  737. func TestMissingHeaderAttack66Full(t *testing.T) { testMissingHeaderAttack(t, eth.ETH66, FullSync) }
  738. func TestMissingHeaderAttack66Snap(t *testing.T) { testMissingHeaderAttack(t, eth.ETH66, SnapSync) }
  739. func TestMissingHeaderAttack66Light(t *testing.T) { testMissingHeaderAttack(t, eth.ETH66, LightSync) }
  740. func TestMissingHeaderAttack67Full(t *testing.T) { testMissingHeaderAttack(t, eth.ETH67, FullSync) }
  741. func TestMissingHeaderAttack67Snap(t *testing.T) { testMissingHeaderAttack(t, eth.ETH67, SnapSync) }
  742. func TestMissingHeaderAttack67Light(t *testing.T) { testMissingHeaderAttack(t, eth.ETH67, LightSync) }
  743. func testMissingHeaderAttack(t *testing.T, protocol uint, mode SyncMode) {
  744. tester := newTester(t)
  745. defer tester.terminate()
  746. chain := testChainBase.shorten(blockCacheMaxItems - 15)
  747. attacker := tester.newPeer("attack", protocol, chain.blocks[1:])
  748. attacker.withholdHeaders[chain.blocks[len(chain.blocks)/2-1].Hash()] = struct{}{}
  749. if err := tester.sync("attack", nil, mode); err == nil {
  750. t.Fatalf("succeeded attacker synchronisation")
  751. }
  752. // Synchronise with the valid peer and make sure sync succeeds
  753. tester.newPeer("valid", protocol, chain.blocks[1:])
  754. if err := tester.sync("valid", nil, mode); err != nil {
  755. t.Fatalf("failed to synchronise blocks: %v", err)
  756. }
  757. assertOwnChain(t, tester, len(chain.blocks))
  758. }
  759. // Tests that if requested headers are shifted (i.e. first is missing), the queue
  760. // detects the invalid numbering.
  761. func TestShiftedHeaderAttack66Full(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH66, FullSync) }
  762. func TestShiftedHeaderAttack66Snap(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH66, SnapSync) }
  763. func TestShiftedHeaderAttack66Light(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH66, LightSync) }
  764. func TestShiftedHeaderAttack67Full(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH67, FullSync) }
  765. func TestShiftedHeaderAttack67Snap(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH67, SnapSync) }
  766. func TestShiftedHeaderAttack67Light(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH67, LightSync) }
  767. func testShiftedHeaderAttack(t *testing.T, protocol uint, mode SyncMode) {
  768. tester := newTester(t)
  769. defer tester.terminate()
  770. chain := testChainBase.shorten(blockCacheMaxItems - 15)
  771. // Attempt a full sync with an attacker feeding shifted headers
  772. attacker := tester.newPeer("attack", protocol, chain.blocks[1:])
  773. attacker.withholdHeaders[chain.blocks[1].Hash()] = struct{}{}
  774. if err := tester.sync("attack", nil, mode); err == nil {
  775. t.Fatalf("succeeded attacker synchronisation")
  776. }
  777. // Synchronise with the valid peer and make sure sync succeeds
  778. tester.newPeer("valid", protocol, chain.blocks[1:])
  779. if err := tester.sync("valid", nil, mode); err != nil {
  780. t.Fatalf("failed to synchronise blocks: %v", err)
  781. }
  782. assertOwnChain(t, tester, len(chain.blocks))
  783. }
  784. // Tests that upon detecting an invalid header, the recent ones are rolled back
  785. // for various failure scenarios. Afterwards a full sync is attempted to make
  786. // sure no state was corrupted.
  787. func TestInvalidHeaderRollback66Snap(t *testing.T) { testInvalidHeaderRollback(t, eth.ETH66, SnapSync) }
  788. func TestInvalidHeaderRollback67Snap(t *testing.T) { testInvalidHeaderRollback(t, eth.ETH67, SnapSync) }
  789. func testInvalidHeaderRollback(t *testing.T, protocol uint, mode SyncMode) {
  790. tester := newTester(t)
  791. defer tester.terminate()
  792. // Create a small enough block chain to download
  793. targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks
  794. chain := testChainBase.shorten(targetBlocks)
  795. // Attempt to sync with an attacker that feeds junk during the fast sync phase.
  796. // This should result in the last fsHeaderSafetyNet headers being rolled back.
  797. missing := fsHeaderSafetyNet + MaxHeaderFetch + 1
  798. fastAttacker := tester.newPeer("fast-attack", protocol, chain.blocks[1:])
  799. fastAttacker.withholdHeaders[chain.blocks[missing].Hash()] = struct{}{}
  800. if err := tester.sync("fast-attack", nil, mode); err == nil {
  801. t.Fatalf("succeeded fast attacker synchronisation")
  802. }
  803. if head := tester.chain.CurrentHeader().Number.Int64(); int(head) > MaxHeaderFetch {
  804. t.Errorf("rollback head mismatch: have %v, want at most %v", head, MaxHeaderFetch)
  805. }
  806. // Attempt to sync with an attacker that feeds junk during the block import phase.
  807. // This should result in both the last fsHeaderSafetyNet number of headers being
  808. // rolled back, and also the pivot point being reverted to a non-block status.
  809. missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1
  810. blockAttacker := tester.newPeer("block-attack", protocol, chain.blocks[1:])
  811. fastAttacker.withholdHeaders[chain.blocks[missing].Hash()] = struct{}{} // Make sure the fast-attacker doesn't fill in
  812. blockAttacker.withholdHeaders[chain.blocks[missing].Hash()] = struct{}{}
  813. if err := tester.sync("block-attack", nil, mode); err == nil {
  814. t.Fatalf("succeeded block attacker synchronisation")
  815. }
  816. if head := tester.chain.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
  817. t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
  818. }
  819. if mode == SnapSync {
  820. if head := tester.chain.CurrentBlock().NumberU64(); head != 0 {
  821. t.Errorf("fast sync pivot block #%d not rolled back", head)
  822. }
  823. }
  824. // Attempt to sync with an attacker that withholds promised blocks after the
  825. // fast sync pivot point. This could be a trial to leave the node with a bad
  826. // but already imported pivot block.
  827. withholdAttacker := tester.newPeer("withhold-attack", protocol, chain.blocks[1:])
  828. tester.downloader.syncInitHook = func(uint64, uint64) {
  829. for i := missing; i < len(chain.blocks); i++ {
  830. withholdAttacker.withholdHeaders[chain.blocks[i].Hash()] = struct{}{}
  831. }
  832. tester.downloader.syncInitHook = nil
  833. }
  834. if err := tester.sync("withhold-attack", nil, mode); err == nil {
  835. t.Fatalf("succeeded withholding attacker synchronisation")
  836. }
  837. if head := tester.chain.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
  838. t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
  839. }
  840. if mode == SnapSync {
  841. if head := tester.chain.CurrentBlock().NumberU64(); head != 0 {
  842. t.Errorf("fast sync pivot block #%d not rolled back", head)
  843. }
  844. }
  845. // Synchronise with the valid peer and make sure sync succeeds. Since the last rollback
  846. // should also disable fast syncing for this process, verify that we did a fresh full
  847. // sync. Note, we can't assert anything about the receipts since we won't purge the
  848. // database of them, hence we can't use assertOwnChain.
  849. tester.newPeer("valid", protocol, chain.blocks[1:])
  850. if err := tester.sync("valid", nil, mode); err != nil {
  851. t.Fatalf("failed to synchronise blocks: %v", err)
  852. }
  853. assertOwnChain(t, tester, len(chain.blocks))
  854. }
  855. // Tests that a peer advertising a high TD doesn't get to stall the downloader
  856. // afterwards by not sending any useful hashes.
  857. func TestHighTDStarvationAttack66Full(t *testing.T) {
  858. testHighTDStarvationAttack(t, eth.ETH66, FullSync)
  859. }
  860. func TestHighTDStarvationAttack66Snap(t *testing.T) {
  861. testHighTDStarvationAttack(t, eth.ETH66, SnapSync)
  862. }
  863. func TestHighTDStarvationAttack66Light(t *testing.T) {
  864. testHighTDStarvationAttack(t, eth.ETH66, LightSync)
  865. }
  866. func TestHighTDStarvationAttack67Full(t *testing.T) {
  867. testHighTDStarvationAttack(t, eth.ETH67, FullSync)
  868. }
  869. func TestHighTDStarvationAttack67Snap(t *testing.T) {
  870. testHighTDStarvationAttack(t, eth.ETH67, SnapSync)
  871. }
  872. func TestHighTDStarvationAttack67Light(t *testing.T) {
  873. testHighTDStarvationAttack(t, eth.ETH67, LightSync)
  874. }
  875. func testHighTDStarvationAttack(t *testing.T, protocol uint, mode SyncMode) {
  876. tester := newTester(t)
  877. defer tester.terminate()
  878. chain := testChainBase.shorten(1)
  879. tester.newPeer("attack", protocol, chain.blocks[1:])
  880. if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer {
  881. t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
  882. }
  883. }
  884. // Tests that misbehaving peers are disconnected, whilst behaving ones are not.
  885. func TestBlockHeaderAttackerDropping66(t *testing.T) { testBlockHeaderAttackerDropping(t, eth.ETH66) }
  886. func TestBlockHeaderAttackerDropping67(t *testing.T) { testBlockHeaderAttackerDropping(t, eth.ETH67) }
  887. func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) {
  888. // Define the disconnection requirement for individual hash fetch errors
  889. tests := []struct {
  890. result error
  891. drop bool
  892. }{
  893. {nil, false}, // Sync succeeded, all is well
  894. {errBusy, false}, // Sync is already in progress, no problem
  895. {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
  896. {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
  897. {errStallingPeer, true}, // Peer was detected to be stalling, drop it
  898. {errUnsyncedPeer, true}, // Peer was detected to be unsynced, drop it
  899. {errNoPeers, false}, // No peers to download from, soft race, no issue
  900. {errTimeout, true}, // No hashes received in due time, drop the peer
  901. {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
  902. {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
  903. {errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter
  904. {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
  905. {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
  906. {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
  907. {errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
  908. }
  909. // Run the tests and check disconnection status
  910. tester := newTester(t)
  911. defer tester.terminate()
  912. chain := testChainBase.shorten(1)
  913. for i, tt := range tests {
  914. // Register a new peer and ensure its presence
  915. id := fmt.Sprintf("test %d", i)
  916. tester.newPeer(id, protocol, chain.blocks[1:])
  917. if _, ok := tester.peers[id]; !ok {
  918. t.Fatalf("test %d: registered peer not found", i)
  919. }
  920. // Simulate a synchronisation and check the required result
  921. tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result }
  922. tester.downloader.LegacySync(id, tester.chain.Genesis().Hash(), big.NewInt(1000), nil, FullSync)
  923. if _, ok := tester.peers[id]; !ok != tt.drop {
  924. t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop)
  925. }
  926. }
  927. }
  928. // Tests that synchronisation progress (origin block number, current block number
  929. // and highest block number) is tracked and updated correctly.
  930. func TestSyncProgress66Full(t *testing.T) { testSyncProgress(t, eth.ETH66, FullSync) }
  931. func TestSyncProgress66Snap(t *testing.T) { testSyncProgress(t, eth.ETH66, SnapSync) }
  932. func TestSyncProgress66Light(t *testing.T) { testSyncProgress(t, eth.ETH66, LightSync) }
  933. func TestSyncProgress67Full(t *testing.T) { testSyncProgress(t, eth.ETH67, FullSync) }
  934. func TestSyncProgress67Snap(t *testing.T) { testSyncProgress(t, eth.ETH67, SnapSync) }
  935. func TestSyncProgress67Light(t *testing.T) { testSyncProgress(t, eth.ETH67, LightSync) }
  936. func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
  937. tester := newTester(t)
  938. defer tester.terminate()
  939. chain := testChainBase.shorten(blockCacheMaxItems - 15)
  940. // Set a sync init hook to catch progress changes
  941. starting := make(chan struct{})
  942. progress := make(chan struct{})
  943. tester.downloader.syncInitHook = func(origin, latest uint64) {
  944. starting <- struct{}{}
  945. <-progress
  946. }
  947. checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
  948. // Synchronise half the blocks and check initial progress
  949. tester.newPeer("peer-half", protocol, chain.shorten(len(chain.blocks) / 2).blocks[1:])
  950. pending := new(sync.WaitGroup)
  951. pending.Add(1)
  952. go func() {
  953. defer pending.Done()
  954. if err := tester.sync("peer-half", nil, mode); err != nil {
  955. panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
  956. }
  957. }()
  958. <-starting
  959. checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
  960. HighestBlock: uint64(len(chain.blocks)/2 - 1),
  961. })
  962. progress <- struct{}{}
  963. pending.Wait()
  964. // Synchronise all the blocks and check continuation progress
  965. tester.newPeer("peer-full", protocol, chain.blocks[1:])
  966. pending.Add(1)
  967. go func() {
  968. defer pending.Done()
  969. if err := tester.sync("peer-full", nil, mode); err != nil {
  970. panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
  971. }
  972. }()
  973. <-starting
  974. checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{
  975. StartingBlock: uint64(len(chain.blocks)/2 - 1),
  976. CurrentBlock: uint64(len(chain.blocks)/2 - 1),
  977. HighestBlock: uint64(len(chain.blocks) - 1),
  978. })
  979. // Check final progress after successful sync
  980. progress <- struct{}{}
  981. pending.Wait()
  982. checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
  983. StartingBlock: uint64(len(chain.blocks)/2 - 1),
  984. CurrentBlock: uint64(len(chain.blocks) - 1),
  985. HighestBlock: uint64(len(chain.blocks) - 1),
  986. })
  987. }
  988. func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.SyncProgress) {
  989. // Mark this method as a helper to report errors at callsite, not in here
  990. t.Helper()
  991. p := d.Progress()
  992. if p.StartingBlock != want.StartingBlock || p.CurrentBlock != want.CurrentBlock || p.HighestBlock != want.HighestBlock {
  993. t.Fatalf("%s progress mismatch:\nhave %+v\nwant %+v", stage, p, want)
  994. }
  995. }
  996. // Tests that synchronisation progress (origin block number and highest block
  997. // number) is tracked and updated correctly in case of a fork (or manual head
  998. // revertal).
  999. func TestForkedSyncProgress66Full(t *testing.T) { testForkedSyncProgress(t, eth.ETH66, FullSync) }
  1000. func TestForkedSyncProgress66Snap(t *testing.T) { testForkedSyncProgress(t, eth.ETH66, SnapSync) }
  1001. func TestForkedSyncProgress66Light(t *testing.T) { testForkedSyncProgress(t, eth.ETH66, LightSync) }
  1002. func TestForkedSyncProgress67Full(t *testing.T) { testForkedSyncProgress(t, eth.ETH67, FullSync) }
  1003. func TestForkedSyncProgress67Snap(t *testing.T) { testForkedSyncProgress(t, eth.ETH67, SnapSync) }
  1004. func TestForkedSyncProgress67Light(t *testing.T) { testForkedSyncProgress(t, eth.ETH67, LightSync) }
  1005. func testForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
  1006. tester := newTester(t)
  1007. defer tester.terminate()
  1008. chainA := testChainForkLightA.shorten(len(testChainBase.blocks) + MaxHeaderFetch)
  1009. chainB := testChainForkLightB.shorten(len(testChainBase.blocks) + MaxHeaderFetch)
  1010. // Set a sync init hook to catch progress changes
  1011. starting := make(chan struct{})
  1012. progress := make(chan struct{})
  1013. tester.downloader.syncInitHook = func(origin, latest uint64) {
  1014. starting <- struct{}{}
  1015. <-progress
  1016. }
  1017. checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
  1018. // Synchronise with one of the forks and check progress
  1019. tester.newPeer("fork A", protocol, chainA.blocks[1:])
  1020. pending := new(sync.WaitGroup)
  1021. pending.Add(1)
  1022. go func() {
  1023. defer pending.Done()
  1024. if err := tester.sync("fork A", nil, mode); err != nil {
  1025. panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
  1026. }
  1027. }()
  1028. <-starting
  1029. checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
  1030. HighestBlock: uint64(len(chainA.blocks) - 1),
  1031. })
  1032. progress <- struct{}{}
  1033. pending.Wait()
  1034. // Simulate a successful sync above the fork
  1035. tester.downloader.syncStatsChainOrigin = tester.downloader.syncStatsChainHeight
  1036. // Synchronise with the second fork and check progress resets
  1037. tester.newPeer("fork B", protocol, chainB.blocks[1:])
  1038. pending.Add(1)
  1039. go func() {
  1040. defer pending.Done()
  1041. if err := tester.sync("fork B", nil, mode); err != nil {
  1042. panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
  1043. }
  1044. }()
  1045. <-starting
  1046. checkProgress(t, tester.downloader, "forking", ethereum.SyncProgress{
  1047. StartingBlock: uint64(len(testChainBase.blocks)) - 1,
  1048. CurrentBlock: uint64(len(chainA.blocks) - 1),
  1049. HighestBlock: uint64(len(chainB.blocks) - 1),
  1050. })
  1051. // Check final progress after successful sync
  1052. progress <- struct{}{}
  1053. pending.Wait()
  1054. checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
  1055. StartingBlock: uint64(len(testChainBase.blocks)) - 1,
  1056. CurrentBlock: uint64(len(chainB.blocks) - 1),
  1057. HighestBlock: uint64(len(chainB.blocks) - 1),
  1058. })
  1059. }
  1060. // Tests that if synchronisation is aborted due to some failure, then the progress
  1061. // origin is not updated in the next sync cycle, as it should be considered the
  1062. // continuation of the previous sync and not a new instance.
  1063. func TestFailedSyncProgress66Full(t *testing.T) { testFailedSyncProgress(t, eth.ETH66, FullSync) }
  1064. func TestFailedSyncProgress66Snap(t *testing.T) { testFailedSyncProgress(t, eth.ETH66, SnapSync) }
  1065. func TestFailedSyncProgress66Light(t *testing.T) { testFailedSyncProgress(t, eth.ETH66, LightSync) }
  1066. func TestFailedSyncProgress67Full(t *testing.T) { testFailedSyncProgress(t, eth.ETH67, FullSync) }
  1067. func TestFailedSyncProgress67Snap(t *testing.T) { testFailedSyncProgress(t, eth.ETH67, SnapSync) }
  1068. func TestFailedSyncProgress67Light(t *testing.T) { testFailedSyncProgress(t, eth.ETH67, LightSync) }
  1069. func testFailedSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
  1070. tester := newTester(t)
  1071. defer tester.terminate()
  1072. chain := testChainBase.shorten(blockCacheMaxItems - 15)
  1073. // Set a sync init hook to catch progress changes
  1074. starting := make(chan struct{})
  1075. progress := make(chan struct{})
  1076. tester.downloader.syncInitHook = func(origin, latest uint64) {
  1077. starting <- struct{}{}
  1078. <-progress
  1079. }
  1080. checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
  1081. // Attempt a full sync with a faulty peer
  1082. missing := len(chain.blocks)/2 - 1
  1083. faulter := tester.newPeer("faulty", protocol, chain.blocks[1:])
  1084. faulter.withholdHeaders[chain.blocks[missing].Hash()] = struct{}{}
  1085. pending := new(sync.WaitGroup)
  1086. pending.Add(1)
  1087. go func() {
  1088. defer pending.Done()
  1089. if err := tester.sync("faulty", nil, mode); err == nil {
  1090. panic("succeeded faulty synchronisation")
  1091. }
  1092. }()
  1093. <-starting
  1094. checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
  1095. HighestBlock: uint64(len(chain.blocks) - 1),
  1096. })
  1097. progress <- struct{}{}
  1098. pending.Wait()
  1099. afterFailedSync := tester.downloader.Progress()
  1100. // Synchronise with a good peer and check that the progress origin remind the same
  1101. // after a failure
  1102. tester.newPeer("valid", protocol, chain.blocks[1:])
  1103. pending.Add(1)
  1104. go func() {
  1105. defer pending.Done()
  1106. if err := tester.sync("valid", nil, mode); err != nil {
  1107. panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
  1108. }
  1109. }()
  1110. <-starting
  1111. checkProgress(t, tester.downloader, "completing", afterFailedSync)
  1112. // Check final progress after successful sync
  1113. progress <- struct{}{}
  1114. pending.Wait()
  1115. checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
  1116. CurrentBlock: uint64(len(chain.blocks) - 1),
  1117. HighestBlock: uint64(len(chain.blocks) - 1),
  1118. })
  1119. }
  1120. // Tests that if an attacker fakes a chain height, after the attack is detected,
  1121. // the progress height is successfully reduced at the next sync invocation.
  1122. func TestFakedSyncProgress66Full(t *testing.T) { testFakedSyncProgress(t, eth.ETH66, FullSync) }
  1123. func TestFakedSyncProgress66Snap(t *testing.T) { testFakedSyncProgress(t, eth.ETH66, SnapSync) }
  1124. func TestFakedSyncProgress66Light(t *testing.T) { testFakedSyncProgress(t, eth.ETH66, LightSync) }
  1125. func TestFakedSyncProgress67Full(t *testing.T) { testFakedSyncProgress(t, eth.ETH67, FullSync) }
  1126. func TestFakedSyncProgress67Snap(t *testing.T) { testFakedSyncProgress(t, eth.ETH67, SnapSync) }
  1127. func TestFakedSyncProgress67Light(t *testing.T) { testFakedSyncProgress(t, eth.ETH67, LightSync) }
  1128. func testFakedSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
  1129. tester := newTester(t)
  1130. defer tester.terminate()
  1131. chain := testChainBase.shorten(blockCacheMaxItems - 15)
  1132. // Set a sync init hook to catch progress changes
  1133. starting := make(chan struct{})
  1134. progress := make(chan struct{})
  1135. tester.downloader.syncInitHook = func(origin, latest uint64) {
  1136. starting <- struct{}{}
  1137. <-progress
  1138. }
  1139. checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})
  1140. // Create and sync with an attacker that promises a higher chain than available.
  1141. attacker := tester.newPeer("attack", protocol, chain.blocks[1:])
  1142. numMissing := 5
  1143. for i := len(chain.blocks) - 2; i > len(chain.blocks)-numMissing; i-- {
  1144. attacker.withholdHeaders[chain.blocks[i].Hash()] = struct{}{}
  1145. }
  1146. pending := new(sync.WaitGroup)
  1147. pending.Add(1)
  1148. go func() {
  1149. defer pending.Done()
  1150. if err := tester.sync("attack", nil, mode); err == nil {
  1151. panic("succeeded attacker synchronisation")
  1152. }
  1153. }()
  1154. <-starting
  1155. checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
  1156. HighestBlock: uint64(len(chain.blocks) - 1),
  1157. })
  1158. progress <- struct{}{}
  1159. pending.Wait()
  1160. afterFailedSync := tester.downloader.Progress()
  1161. // Synchronise with a good peer and check that the progress height has been reduced to
  1162. // the true value.
  1163. validChain := chain.shorten(len(chain.blocks) - numMissing)
  1164. tester.newPeer("valid", protocol, validChain.blocks[1:])
  1165. pending.Add(1)
  1166. go func() {
  1167. defer pending.Done()
  1168. if err := tester.sync("valid", nil, mode); err != nil {
  1169. panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
  1170. }
  1171. }()
  1172. <-starting
  1173. checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{
  1174. CurrentBlock: afterFailedSync.CurrentBlock,
  1175. HighestBlock: uint64(len(validChain.blocks) - 1),
  1176. })
  1177. // Check final progress after successful sync.
  1178. progress <- struct{}{}
  1179. pending.Wait()
  1180. checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
  1181. CurrentBlock: uint64(len(validChain.blocks) - 1),
  1182. HighestBlock: uint64(len(validChain.blocks) - 1),
  1183. })
  1184. }
  1185. func TestRemoteHeaderRequestSpan(t *testing.T) {
  1186. testCases := []struct {
  1187. remoteHeight uint64
  1188. localHeight uint64
  1189. expected []int
  1190. }{
  1191. // Remote is way higher. We should ask for the remote head and go backwards
  1192. {1500, 1000,
  1193. []int{1323, 1339, 1355, 1371, 1387, 1403, 1419, 1435, 1451, 1467, 1483, 1499},
  1194. },
  1195. {15000, 13006,
  1196. []int{14823, 14839, 14855, 14871, 14887, 14903, 14919, 14935, 14951, 14967, 14983, 14999},
  1197. },
  1198. // Remote is pretty close to us. We don't have to fetch as many
  1199. {1200, 1150,
  1200. []int{1149, 1154, 1159, 1164, 1169, 1174, 1179, 1184, 1189, 1194, 1199},
  1201. },
  1202. // Remote is equal to us (so on a fork with higher td)
  1203. // We should get the closest couple of ancestors
  1204. {1500, 1500,
  1205. []int{1497, 1499},
  1206. },
  1207. // We're higher than the remote! Odd
  1208. {1000, 1500,
  1209. []int{997, 999},
  1210. },
  1211. // Check some weird edgecases that it behaves somewhat rationally
  1212. {0, 1500,
  1213. []int{0, 2},
  1214. },
  1215. {6000000, 0,
  1216. []int{5999823, 5999839, 5999855, 5999871, 5999887, 5999903, 5999919, 5999935, 5999951, 5999967, 5999983, 5999999},
  1217. },
  1218. {0, 0,
  1219. []int{0, 2},
  1220. },
  1221. }
  1222. reqs := func(from, count, span int) []int {
  1223. var r []int
  1224. num := from
  1225. for len(r) < count {
  1226. r = append(r, num)
  1227. num += span + 1
  1228. }
  1229. return r
  1230. }
  1231. for i, tt := range testCases {
  1232. from, count, span, max := calculateRequestSpan(tt.remoteHeight, tt.localHeight)
  1233. data := reqs(int(from), count, span)
  1234. if max != uint64(data[len(data)-1]) {
  1235. t.Errorf("test %d: wrong last value %d != %d", i, data[len(data)-1], max)
  1236. }
  1237. failed := false
  1238. if len(data) != len(tt.expected) {
  1239. failed = true
  1240. t.Errorf("test %d: length wrong, expected %d got %d", i, len(tt.expected), len(data))
  1241. } else {
  1242. for j, n := range data {
  1243. if n != tt.expected[j] {
  1244. failed = true
  1245. break
  1246. }
  1247. }
  1248. }
  1249. if failed {
  1250. res := strings.ReplaceAll(fmt.Sprint(data), " ", ",")
  1251. exp := strings.ReplaceAll(fmt.Sprint(tt.expected), " ", ",")
  1252. t.Logf("got: %v\n", res)
  1253. t.Logf("exp: %v\n", exp)
  1254. t.Errorf("test %d: wrong values", i)
  1255. }
  1256. }
  1257. }
  1258. // Tests that peers below a pre-configured checkpoint block are prevented from
  1259. // being fast-synced from, avoiding potential cheap eclipse attacks.
  1260. func TestCheckpointEnforcement66Full(t *testing.T) { testCheckpointEnforcement(t, eth.ETH66, FullSync) }
  1261. func TestCheckpointEnforcement66Snap(t *testing.T) { testCheckpointEnforcement(t, eth.ETH66, SnapSync) }
  1262. func TestCheckpointEnforcement66Light(t *testing.T) {
  1263. testCheckpointEnforcement(t, eth.ETH66, LightSync)
  1264. }
  1265. func TestCheckpointEnforcement67Full(t *testing.T) { testCheckpointEnforcement(t, eth.ETH67, FullSync) }
  1266. func TestCheckpointEnforcement67Snap(t *testing.T) { testCheckpointEnforcement(t, eth.ETH67, SnapSync) }
  1267. func TestCheckpointEnforcement67Light(t *testing.T) {
  1268. testCheckpointEnforcement(t, eth.ETH67, LightSync)
  1269. }
  1270. func testCheckpointEnforcement(t *testing.T, protocol uint, mode SyncMode) {
  1271. // Create a new tester with a particular hard coded checkpoint block
  1272. tester := newTester(t)
  1273. defer tester.terminate()
  1274. tester.downloader.checkpoint = uint64(fsMinFullBlocks) + 256
  1275. chain := testChainBase.shorten(int(tester.downloader.checkpoint) - 1)
  1276. // Attempt to sync with the peer and validate the result
  1277. tester.newPeer("peer", protocol, chain.blocks[1:])
  1278. var expect error
  1279. if mode == SnapSync || mode == LightSync {
  1280. expect = errUnsyncedPeer
  1281. }
  1282. if err := tester.sync("peer", nil, mode); !errors.Is(err, expect) {
  1283. t.Fatalf("block sync error mismatch: have %v, want %v", err, expect)
  1284. }
  1285. if mode == SnapSync || mode == LightSync {
  1286. assertOwnChain(t, tester, 1)
  1287. } else {
  1288. assertOwnChain(t, tester, len(chain.blocks))
  1289. }
  1290. }
  1291. // Tests that peers below a pre-configured checkpoint block are prevented from
  1292. // being fast-synced from, avoiding potential cheap eclipse attacks.
  1293. func TestBeaconSync66Full(t *testing.T) { testBeaconSync(t, eth.ETH66, FullSync) }
  1294. func TestBeaconSync66Snap(t *testing.T) { testBeaconSync(t, eth.ETH66, SnapSync) }
  1295. func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) {
  1296. //log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
  1297. var cases = []struct {
  1298. name string // The name of testing scenario
  1299. local int // The length of local chain(canonical chain assumed), 0 means genesis is the head
  1300. }{
  1301. {name: "Beacon sync since genesis", local: 0},
  1302. {name: "Beacon sync with short local chain", local: 1},
  1303. {name: "Beacon sync with long local chain", local: blockCacheMaxItems - 15 - fsMinFullBlocks/2},
  1304. {name: "Beacon sync with full local chain", local: blockCacheMaxItems - 15 - 1},
  1305. }
  1306. for _, c := range cases {
  1307. t.Run(c.name, func(t *testing.T) {
  1308. success := make(chan struct{})
  1309. tester := newTesterWithNotification(t, func() {
  1310. close(success)
  1311. })
  1312. defer tester.terminate()
  1313. chain := testChainBase.shorten(blockCacheMaxItems - 15)
  1314. tester.newPeer("peer", protocol, chain.blocks[1:])
  1315. // Build the local chain segment if it's required
  1316. if c.local > 0 {
  1317. tester.chain.InsertChain(chain.blocks[1 : c.local+1])
  1318. }
  1319. if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header()); err != nil {
  1320. t.Fatalf("Failed to beacon sync chain %v %v", c.name, err)
  1321. }
  1322. select {
  1323. case <-success:
  1324. // Ok, downloader fully cancelled after sync cycle
  1325. if bs := int(tester.chain.CurrentBlock().NumberU64()) + 1; bs != len(chain.blocks) {
  1326. t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, len(chain.blocks))
  1327. }
  1328. case <-time.NewTimer(time.Second * 3).C:
  1329. t.Fatalf("Failed to sync chain in three seconds")
  1330. }
  1331. })
  1332. }
  1333. }