  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package trie
  17. import (
  18. "bytes"
  19. "fmt"
  20. "testing"
  21. "github.com/ethereum/go-ethereum/common"
  22. "github.com/ethereum/go-ethereum/crypto"
  23. "github.com/ethereum/go-ethereum/ethdb/memorydb"
  24. )
  25. // makeTestTrie create a sample test trie to test node-wise reconstruction.
  26. func makeTestTrie() (*Database, *StateTrie, map[string][]byte) {
  27. // Create an empty trie
  28. triedb := NewDatabase(memorydb.New())
  29. trie, _ := NewStateTrie(common.Hash{}, common.Hash{}, triedb)
  30. // Fill it with some arbitrary data
  31. content := make(map[string][]byte)
  32. for i := byte(0); i < 255; i++ {
  33. // Map the same data under multiple keys
  34. key, val := common.LeftPadBytes([]byte{1, i}, 32), []byte{i}
  35. content[string(key)] = val
  36. trie.Update(key, val)
  37. key, val = common.LeftPadBytes([]byte{2, i}, 32), []byte{i}
  38. content[string(key)] = val
  39. trie.Update(key, val)
  40. // Add some other data to inflate the trie
  41. for j := byte(3); j < 13; j++ {
  42. key, val = common.LeftPadBytes([]byte{j, i}, 32), []byte{j, i}
  43. content[string(key)] = val
  44. trie.Update(key, val)
  45. }
  46. }
  47. root, nodes, err := trie.Commit(false)
  48. if err != nil {
  49. panic(fmt.Errorf("failed to commit trie %v", err))
  50. }
  51. if err := triedb.Update(NewWithNodeSet(nodes)); err != nil {
  52. panic(fmt.Errorf("failed to commit db %v", err))
  53. }
  54. // Re-create the trie based on the new state
  55. trie, _ = NewSecure(common.Hash{}, root, triedb)
  56. return triedb, trie, content
  57. }
  58. // checkTrieContents cross references a reconstructed trie with an expected data
  59. // content map.
  60. func checkTrieContents(t *testing.T, db *Database, root []byte, content map[string][]byte) {
  61. // Check root availability and trie contents
  62. trie, err := NewStateTrie(common.Hash{}, common.BytesToHash(root), db)
  63. if err != nil {
  64. t.Fatalf("failed to create trie at %x: %v", root, err)
  65. }
  66. if err := checkTrieConsistency(db, common.BytesToHash(root)); err != nil {
  67. t.Fatalf("inconsistent trie at %x: %v", root, err)
  68. }
  69. for key, val := range content {
  70. if have := trie.Get([]byte(key)); !bytes.Equal(have, val) {
  71. t.Errorf("entry %x: content mismatch: have %x, want %x", key, have, val)
  72. }
  73. }
  74. }
  75. // checkTrieConsistency checks that all nodes in a trie are indeed present.
  76. func checkTrieConsistency(db *Database, root common.Hash) error {
  77. // Create and iterate a trie rooted in a subnode
  78. trie, err := NewStateTrie(common.Hash{}, root, db)
  79. if err != nil {
  80. return nil // Consider a non existent state consistent
  81. }
  82. it := trie.NodeIterator(nil)
  83. for it.Next(true) {
  84. }
  85. return it.Error()
  86. }
// trieElement represents the element in the state trie(bytecode or trie node).
type trieElement struct {
	path     string      // sync path of the node, as returned by Sync.Missing
	hash     common.Hash // hash of the node, used to fetch it from the source db
	syncPath SyncPath    // scheduler path tuple derived from path via NewSyncPath
}
  93. // Tests that an empty trie is not scheduled for syncing.
  94. func TestEmptySync(t *testing.T) {
  95. dbA := NewDatabase(memorydb.New())
  96. dbB := NewDatabase(memorydb.New())
  97. emptyA := NewEmpty(dbA)
  98. emptyB, _ := New(common.Hash{}, emptyRoot, dbB)
  99. for i, trie := range []*Trie{emptyA, emptyB} {
  100. sync := NewSync(trie.Hash(), memorydb.New(), nil)
  101. if paths, nodes, codes := sync.Missing(1); len(paths) != 0 || len(nodes) != 0 || len(codes) != 0 {
  102. t.Errorf("test %d: content requested for empty trie: %v, %v, %v", i, paths, nodes, codes)
  103. }
  104. }
  105. }
// Tests that given a root hash, a trie can sync iteratively on a single thread,
// requesting retrieval tasks and returning all of them in one go.
func TestIterativeSyncIndividual(t *testing.T)       { testIterativeSync(t, 1, false) }   // one task at a time, fetch by hash
func TestIterativeSyncBatched(t *testing.T)          { testIterativeSync(t, 100, false) } // batched tasks, fetch by hash
func TestIterativeSyncIndividualByPath(t *testing.T) { testIterativeSync(t, 1, true) }    // one task at a time, fetch by path
func TestIterativeSyncBatchedByPath(t *testing.T)    { testIterativeSync(t, 100, true) }  // batched tasks, fetch by path
  112. func testIterativeSync(t *testing.T, count int, bypath bool) {
  113. // Create a random trie to copy
  114. srcDb, srcTrie, srcData := makeTestTrie()
  115. // Create a destination trie and sync with the scheduler
  116. diskdb := memorydb.New()
  117. triedb := NewDatabase(diskdb)
  118. sched := NewSync(srcTrie.Hash(), diskdb, nil)
  119. // The code requests are ignored here since there is no code
  120. // at the testing trie.
  121. paths, nodes, _ := sched.Missing(count)
  122. var elements []trieElement
  123. for i := 0; i < len(paths); i++ {
  124. elements = append(elements, trieElement{
  125. path: paths[i],
  126. hash: nodes[i],
  127. syncPath: NewSyncPath([]byte(paths[i])),
  128. })
  129. }
  130. for len(elements) > 0 {
  131. results := make([]NodeSyncResult, len(elements))
  132. if !bypath {
  133. for i, element := range elements {
  134. data, err := srcDb.Node(element.hash)
  135. if err != nil {
  136. t.Fatalf("failed to retrieve node data for hash %x: %v", element.hash, err)
  137. }
  138. results[i] = NodeSyncResult{element.path, data}
  139. }
  140. } else {
  141. for i, element := range elements {
  142. data, _, err := srcTrie.TryGetNode(element.syncPath[len(element.syncPath)-1])
  143. if err != nil {
  144. t.Fatalf("failed to retrieve node data for path %x: %v", element.path, err)
  145. }
  146. results[i] = NodeSyncResult{element.path, data}
  147. }
  148. }
  149. for _, result := range results {
  150. if err := sched.ProcessNode(result); err != nil {
  151. t.Fatalf("failed to process result %v", err)
  152. }
  153. }
  154. batch := diskdb.NewBatch()
  155. if err := sched.Commit(batch); err != nil {
  156. t.Fatalf("failed to commit data: %v", err)
  157. }
  158. batch.Write()
  159. paths, nodes, _ = sched.Missing(count)
  160. elements = elements[:0]
  161. for i := 0; i < len(paths); i++ {
  162. elements = append(elements, trieElement{
  163. path: paths[i],
  164. hash: nodes[i],
  165. syncPath: NewSyncPath([]byte(paths[i])),
  166. })
  167. }
  168. }
  169. // Cross check that the two tries are in sync
  170. checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
  171. }
  172. // Tests that the trie scheduler can correctly reconstruct the state even if only
  173. // partial results are returned, and the others sent only later.
  174. func TestIterativeDelayedSync(t *testing.T) {
  175. // Create a random trie to copy
  176. srcDb, srcTrie, srcData := makeTestTrie()
  177. // Create a destination trie and sync with the scheduler
  178. diskdb := memorydb.New()
  179. triedb := NewDatabase(diskdb)
  180. sched := NewSync(srcTrie.Hash(), diskdb, nil)
  181. // The code requests are ignored here since there is no code
  182. // at the testing trie.
  183. paths, nodes, _ := sched.Missing(10000)
  184. var elements []trieElement
  185. for i := 0; i < len(paths); i++ {
  186. elements = append(elements, trieElement{
  187. path: paths[i],
  188. hash: nodes[i],
  189. syncPath: NewSyncPath([]byte(paths[i])),
  190. })
  191. }
  192. for len(elements) > 0 {
  193. // Sync only half of the scheduled nodes
  194. results := make([]NodeSyncResult, len(elements)/2+1)
  195. for i, element := range elements[:len(results)] {
  196. data, err := srcDb.Node(element.hash)
  197. if err != nil {
  198. t.Fatalf("failed to retrieve node data for %x: %v", element.hash, err)
  199. }
  200. results[i] = NodeSyncResult{element.path, data}
  201. }
  202. for _, result := range results {
  203. if err := sched.ProcessNode(result); err != nil {
  204. t.Fatalf("failed to process result %v", err)
  205. }
  206. }
  207. batch := diskdb.NewBatch()
  208. if err := sched.Commit(batch); err != nil {
  209. t.Fatalf("failed to commit data: %v", err)
  210. }
  211. batch.Write()
  212. paths, nodes, _ = sched.Missing(10000)
  213. elements = elements[len(results):]
  214. for i := 0; i < len(paths); i++ {
  215. elements = append(elements, trieElement{
  216. path: paths[i],
  217. hash: nodes[i],
  218. syncPath: NewSyncPath([]byte(paths[i])),
  219. })
  220. }
  221. }
  222. // Cross check that the two tries are in sync
  223. checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
  224. }
// Tests that given a root hash, a trie can sync iteratively on a single thread,
// requesting retrieval tasks and returning all of them in one go, however in a
// random order.
func TestIterativeRandomSyncIndividual(t *testing.T) { testIterativeRandomSync(t, 1) }   // one task at a time
func TestIterativeRandomSyncBatched(t *testing.T)    { testIterativeRandomSync(t, 100) } // batched tasks
  230. func testIterativeRandomSync(t *testing.T, count int) {
  231. // Create a random trie to copy
  232. srcDb, srcTrie, srcData := makeTestTrie()
  233. // Create a destination trie and sync with the scheduler
  234. diskdb := memorydb.New()
  235. triedb := NewDatabase(diskdb)
  236. sched := NewSync(srcTrie.Hash(), diskdb, nil)
  237. // The code requests are ignored here since there is no code
  238. // at the testing trie.
  239. paths, nodes, _ := sched.Missing(count)
  240. queue := make(map[string]trieElement)
  241. for i, path := range paths {
  242. queue[path] = trieElement{
  243. path: paths[i],
  244. hash: nodes[i],
  245. syncPath: NewSyncPath([]byte(paths[i])),
  246. }
  247. }
  248. for len(queue) > 0 {
  249. // Fetch all the queued nodes in a random order
  250. results := make([]NodeSyncResult, 0, len(queue))
  251. for path, element := range queue {
  252. data, err := srcDb.Node(element.hash)
  253. if err != nil {
  254. t.Fatalf("failed to retrieve node data for %x: %v", element.hash, err)
  255. }
  256. results = append(results, NodeSyncResult{path, data})
  257. }
  258. // Feed the retrieved results back and queue new tasks
  259. for _, result := range results {
  260. if err := sched.ProcessNode(result); err != nil {
  261. t.Fatalf("failed to process result %v", err)
  262. }
  263. }
  264. batch := diskdb.NewBatch()
  265. if err := sched.Commit(batch); err != nil {
  266. t.Fatalf("failed to commit data: %v", err)
  267. }
  268. batch.Write()
  269. paths, nodes, _ = sched.Missing(count)
  270. queue = make(map[string]trieElement)
  271. for i, path := range paths {
  272. queue[path] = trieElement{
  273. path: path,
  274. hash: nodes[i],
  275. syncPath: NewSyncPath([]byte(path)),
  276. }
  277. }
  278. }
  279. // Cross check that the two tries are in sync
  280. checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
  281. }
  282. // Tests that the trie scheduler can correctly reconstruct the state even if only
  283. // partial results are returned (Even those randomly), others sent only later.
  284. func TestIterativeRandomDelayedSync(t *testing.T) {
  285. // Create a random trie to copy
  286. srcDb, srcTrie, srcData := makeTestTrie()
  287. // Create a destination trie and sync with the scheduler
  288. diskdb := memorydb.New()
  289. triedb := NewDatabase(diskdb)
  290. sched := NewSync(srcTrie.Hash(), diskdb, nil)
  291. // The code requests are ignored here since there is no code
  292. // at the testing trie.
  293. paths, nodes, _ := sched.Missing(10000)
  294. queue := make(map[string]trieElement)
  295. for i, path := range paths {
  296. queue[path] = trieElement{
  297. path: path,
  298. hash: nodes[i],
  299. syncPath: NewSyncPath([]byte(path)),
  300. }
  301. }
  302. for len(queue) > 0 {
  303. // Sync only half of the scheduled nodes, even those in random order
  304. results := make([]NodeSyncResult, 0, len(queue)/2+1)
  305. for path, element := range queue {
  306. data, err := srcDb.Node(element.hash)
  307. if err != nil {
  308. t.Fatalf("failed to retrieve node data for %x: %v", element.hash, err)
  309. }
  310. results = append(results, NodeSyncResult{path, data})
  311. if len(results) >= cap(results) {
  312. break
  313. }
  314. }
  315. // Feed the retrieved results back and queue new tasks
  316. for _, result := range results {
  317. if err := sched.ProcessNode(result); err != nil {
  318. t.Fatalf("failed to process result %v", err)
  319. }
  320. }
  321. batch := diskdb.NewBatch()
  322. if err := sched.Commit(batch); err != nil {
  323. t.Fatalf("failed to commit data: %v", err)
  324. }
  325. batch.Write()
  326. for _, result := range results {
  327. delete(queue, result.Path)
  328. }
  329. paths, nodes, _ = sched.Missing(10000)
  330. for i, path := range paths {
  331. queue[path] = trieElement{
  332. path: path,
  333. hash: nodes[i],
  334. syncPath: NewSyncPath([]byte(path)),
  335. }
  336. }
  337. }
  338. // Cross check that the two tries are in sync
  339. checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
  340. }
  341. // Tests that a trie sync will not request nodes multiple times, even if they
  342. // have such references.
  343. func TestDuplicateAvoidanceSync(t *testing.T) {
  344. // Create a random trie to copy
  345. srcDb, srcTrie, srcData := makeTestTrie()
  346. // Create a destination trie and sync with the scheduler
  347. diskdb := memorydb.New()
  348. triedb := NewDatabase(diskdb)
  349. sched := NewSync(srcTrie.Hash(), diskdb, nil)
  350. // The code requests are ignored here since there is no code
  351. // at the testing trie.
  352. paths, nodes, _ := sched.Missing(0)
  353. var elements []trieElement
  354. for i := 0; i < len(paths); i++ {
  355. elements = append(elements, trieElement{
  356. path: paths[i],
  357. hash: nodes[i],
  358. syncPath: NewSyncPath([]byte(paths[i])),
  359. })
  360. }
  361. requested := make(map[common.Hash]struct{})
  362. for len(elements) > 0 {
  363. results := make([]NodeSyncResult, len(elements))
  364. for i, element := range elements {
  365. data, err := srcDb.Node(element.hash)
  366. if err != nil {
  367. t.Fatalf("failed to retrieve node data for %x: %v", element.hash, err)
  368. }
  369. if _, ok := requested[element.hash]; ok {
  370. t.Errorf("hash %x already requested once", element.hash)
  371. }
  372. requested[element.hash] = struct{}{}
  373. results[i] = NodeSyncResult{element.path, data}
  374. }
  375. for _, result := range results {
  376. if err := sched.ProcessNode(result); err != nil {
  377. t.Fatalf("failed to process result %v", err)
  378. }
  379. }
  380. batch := diskdb.NewBatch()
  381. if err := sched.Commit(batch); err != nil {
  382. t.Fatalf("failed to commit data: %v", err)
  383. }
  384. batch.Write()
  385. paths, nodes, _ = sched.Missing(0)
  386. elements = elements[:0]
  387. for i := 0; i < len(paths); i++ {
  388. elements = append(elements, trieElement{
  389. path: paths[i],
  390. hash: nodes[i],
  391. syncPath: NewSyncPath([]byte(paths[i])),
  392. })
  393. }
  394. }
  395. // Cross check that the two tries are in sync
  396. checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
  397. }
  398. // Tests that at any point in time during a sync, only complete sub-tries are in
  399. // the database.
  400. func TestIncompleteSync(t *testing.T) {
  401. // Create a random trie to copy
  402. srcDb, srcTrie, _ := makeTestTrie()
  403. // Create a destination trie and sync with the scheduler
  404. diskdb := memorydb.New()
  405. triedb := NewDatabase(diskdb)
  406. sched := NewSync(srcTrie.Hash(), diskdb, nil)
  407. // The code requests are ignored here since there is no code
  408. // at the testing trie.
  409. var (
  410. added []common.Hash
  411. elements []trieElement
  412. root = srcTrie.Hash()
  413. )
  414. paths, nodes, _ := sched.Missing(1)
  415. for i := 0; i < len(paths); i++ {
  416. elements = append(elements, trieElement{
  417. path: paths[i],
  418. hash: nodes[i],
  419. syncPath: NewSyncPath([]byte(paths[i])),
  420. })
  421. }
  422. for len(elements) > 0 {
  423. // Fetch a batch of trie nodes
  424. results := make([]NodeSyncResult, len(elements))
  425. for i, element := range elements {
  426. data, err := srcDb.Node(element.hash)
  427. if err != nil {
  428. t.Fatalf("failed to retrieve node data for %x: %v", element.hash, err)
  429. }
  430. results[i] = NodeSyncResult{element.path, data}
  431. }
  432. // Process each of the trie nodes
  433. for _, result := range results {
  434. if err := sched.ProcessNode(result); err != nil {
  435. t.Fatalf("failed to process result %v", err)
  436. }
  437. }
  438. batch := diskdb.NewBatch()
  439. if err := sched.Commit(batch); err != nil {
  440. t.Fatalf("failed to commit data: %v", err)
  441. }
  442. batch.Write()
  443. for _, result := range results {
  444. hash := crypto.Keccak256Hash(result.Data)
  445. if hash != root {
  446. added = append(added, hash)
  447. }
  448. // Check that all known sub-tries in the synced trie are complete
  449. if err := checkTrieConsistency(triedb, hash); err != nil {
  450. t.Fatalf("trie inconsistent: %v", err)
  451. }
  452. }
  453. // Fetch the next batch to retrieve
  454. paths, nodes, _ = sched.Missing(1)
  455. elements = elements[:0]
  456. for i := 0; i < len(paths); i++ {
  457. elements = append(elements, trieElement{
  458. path: paths[i],
  459. hash: nodes[i],
  460. syncPath: NewSyncPath([]byte(paths[i])),
  461. })
  462. }
  463. }
  464. // Sanity check that removing any node from the database is detected
  465. for _, hash := range added {
  466. value, _ := diskdb.Get(hash.Bytes())
  467. diskdb.Delete(hash.Bytes())
  468. if err := checkTrieConsistency(triedb, root); err == nil {
  469. t.Fatalf("trie inconsistency not caught, missing: %x", hash)
  470. }
  471. diskdb.Put(hash.Bytes(), value)
  472. }
  473. }
  474. // Tests that trie nodes get scheduled lexicographically when having the same
  475. // depth.
  476. func TestSyncOrdering(t *testing.T) {
  477. // Create a random trie to copy
  478. srcDb, srcTrie, srcData := makeTestTrie()
  479. // Create a destination trie and sync with the scheduler, tracking the requests
  480. diskdb := memorydb.New()
  481. triedb := NewDatabase(diskdb)
  482. sched := NewSync(srcTrie.Hash(), diskdb, nil)
  483. // The code requests are ignored here since there is no code
  484. // at the testing trie.
  485. var (
  486. reqs []SyncPath
  487. elements []trieElement
  488. )
  489. paths, nodes, _ := sched.Missing(1)
  490. for i := 0; i < len(paths); i++ {
  491. elements = append(elements, trieElement{
  492. path: paths[i],
  493. hash: nodes[i],
  494. syncPath: NewSyncPath([]byte(paths[i])),
  495. })
  496. reqs = append(reqs, NewSyncPath([]byte(paths[i])))
  497. }
  498. for len(elements) > 0 {
  499. results := make([]NodeSyncResult, len(elements))
  500. for i, element := range elements {
  501. data, err := srcDb.Node(element.hash)
  502. if err != nil {
  503. t.Fatalf("failed to retrieve node data for %x: %v", element.hash, err)
  504. }
  505. results[i] = NodeSyncResult{element.path, data}
  506. }
  507. for _, result := range results {
  508. if err := sched.ProcessNode(result); err != nil {
  509. t.Fatalf("failed to process result %v", err)
  510. }
  511. }
  512. batch := diskdb.NewBatch()
  513. if err := sched.Commit(batch); err != nil {
  514. t.Fatalf("failed to commit data: %v", err)
  515. }
  516. batch.Write()
  517. paths, nodes, _ = sched.Missing(1)
  518. elements = elements[:0]
  519. for i := 0; i < len(paths); i++ {
  520. elements = append(elements, trieElement{
  521. path: paths[i],
  522. hash: nodes[i],
  523. syncPath: NewSyncPath([]byte(paths[i])),
  524. })
  525. reqs = append(reqs, NewSyncPath([]byte(paths[i])))
  526. }
  527. }
  528. // Cross check that the two tries are in sync
  529. checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
  530. // Check that the trie nodes have been requested path-ordered
  531. for i := 0; i < len(reqs)-1; i++ {
  532. if len(reqs[i]) > 1 || len(reqs[i+1]) > 1 {
  533. // In the case of the trie tests, there's no storage so the tuples
  534. // must always be single items. 2-tuples should be tested in state.
  535. t.Errorf("Invalid request tuples: len(%v) or len(%v) > 1", reqs[i], reqs[i+1])
  536. }
  537. if bytes.Compare(compactToHex(reqs[i][0]), compactToHex(reqs[i+1][0])) > 0 {
  538. t.Errorf("Invalid request order: %v before %v", compactToHex(reqs[i][0]), compactToHex(reqs[i+1][0]))
  539. }
  540. }
  541. }