snapshot_sync_test.go

// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package stream
import (
	"context"
	crand "crypto/rand"
	"fmt"
	"io"
	"os"
	"sync"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/node"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
	"github.com/ethereum/go-ethereum/swarm/network"
	"github.com/ethereum/go-ethereum/swarm/network/simulation"
	"github.com/ethereum/go-ethereum/swarm/pot"
	"github.com/ethereum/go-ethereum/swarm/state"
	"github.com/ethereum/go-ethereum/swarm/storage"
	mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
)
const testMinProxBinSize = 2
const MaxTimeout = 600

type synctestConfig struct {
	addrs            [][]byte                   //overlay addresses of all nodes in the simulation
	hashes           []storage.Address          //hashes of all chunks uploaded during the test
	idToChunksMap    map[discover.NodeID][]int  //map of discover ID to indexes of chunks expected at that ID
	chunksToNodesMap map[string][]int           //map of chunk hash to indexes of the nodes expected to store it
	addrToIDMap      map[string]discover.NodeID //map of overlay address to discover ID
}
//TestSyncingViaGlobalSync is a syncing test for nodes.
//One node is randomly selected to be the pivot node.
//A configurable number of chunks and nodes can be provided to the test;
//the given number of chunks is uploaded to the pivot node, and we check
//that every node ends up with the chunks it is expected to store
//according to the syncing protocol.
//The number of chunks and nodes can also be provided via the command line.
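//A hypothetical invocation (assuming the package registers the test flags as
//-nodes, -chunks and -longrunning; adjust to the actual flag names) could be:
//
//  go test -run TestSyncingViaGlobalSync -nodes=16 -chunks=8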
func TestSyncingViaGlobalSync(t *testing.T) {
	//if nodes/chunks have been provided via commandline,
	//run the tests with these values
	if *nodes != 0 && *chunks != 0 {
		log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
		testSyncingViaGlobalSync(t, *chunks, *nodes)
	} else {
		var nodeCnt []int
		var chnkCnt []int
		//if the `longrunning` flag has been provided
		//run more test combinations
		if *longrunning {
			chnkCnt = []int{1, 8, 32, 256, 1024}
			nodeCnt = []int{16, 32, 64, 128, 256}
		} else {
			//default test
			chnkCnt = []int{4, 32}
			nodeCnt = []int{32, 16}
		}
		for _, chnk := range chnkCnt {
			for _, n := range nodeCnt {
				log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", chnk, n))
				testSyncingViaGlobalSync(t, chnk, n)
			}
		}
	}
}
func TestSyncingViaDirectSubscribe(t *testing.T) {
	//if nodes/chunks have been provided via commandline,
	//run the tests with these values
	if *nodes != 0 && *chunks != 0 {
		log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
		err := testSyncingViaDirectSubscribe(*chunks, *nodes)
		if err != nil {
			t.Fatal(err)
		}
	} else {
		var nodeCnt []int
		var chnkCnt []int
		//if the `longrunning` flag has been provided
		//run more test combinations
		if *longrunning {
			chnkCnt = []int{1, 8, 32, 256, 1024}
			nodeCnt = []int{32, 16}
		} else {
			//default test
			chnkCnt = []int{4, 32}
			nodeCnt = []int{32, 16}
		}
		for _, chnk := range chnkCnt {
			for _, n := range nodeCnt {
				log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", chnk, n))
				err := testSyncingViaDirectSubscribe(chnk, n)
				if err != nil {
					t.Fatal(err)
				}
			}
		}
	}
}
func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
	sim := simulation.New(map[string]simulation.ServiceFunc{
		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
			id := ctx.Config.ID
			addr := network.NewAddrFromNodeID(id)
			store, datadir, err := createTestLocalStorageForID(id, addr)
			if err != nil {
				return nil, nil, err
			}
			bucket.Store(bucketKeyStore, store)
			cleanup = func() {
				os.RemoveAll(datadir)
				store.Close()
			}
			localStore := store.(*storage.LocalStore)
			db := storage.NewDBAPI(localStore)
			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
			delivery := NewDelivery(kad, db)

			r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
				DoSync:          true,
				SyncUpdateDelay: 3 * time.Second,
			})
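			//with the options above the registry syncs on its own: DoSync enables the
			//automatic SYNC subscriptions, and SyncUpdateDelay is assumed to throttle how
			//often those subscriptions are re-evaluated as the kademlia changes.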
			bucket.Store(bucketKeyRegistry, r)

			return r, cleanup, nil
		},
	})
	defer sim.Close()

	log.Info("Initializing test config")
	conf := &synctestConfig{}
	//map of discover ID to indexes of chunks expected at that ID
	conf.idToChunksMap = make(map[discover.NodeID][]int)
	//map of overlay address to discover ID
	conf.addrToIDMap = make(map[string]discover.NodeID)
	//array where the generated chunk hashes will be stored
	conf.hashes = make([]storage.Address, 0)

	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
	defer cancelSimRun()
	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
		nodeIDs := sim.UpNodeIDs()
		for _, n := range nodeIDs {
			//get the kademlia overlay address from this ID
			a := network.ToOverlayAddr(n.Bytes())
			//append it to the array of all overlay addresses
			conf.addrs = append(conf.addrs, a)
			//the proximity calculation is on overlay addr,
			//the p2p/simulations check func triggers on discover.NodeID,
			//so we need to know which overlay addr maps to which nodeID
			conf.addrToIDMap[string(a)] = n
		}

		//select a random node for upload
		node := sim.RandomUpNode()
		item, ok := sim.NodeItem(node.ID, bucketKeyStore)
		if !ok {
			return fmt.Errorf("No localstore")
		}
		lstore := item.(*storage.LocalStore)
		hashes, err := uploadFileToSingleNodeStore(node.ID, chunkCount, lstore)
		if err != nil {
			return err
		}
		conf.hashes = append(conf.hashes, hashes...)
		mapKeysToNodes(conf)

		if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
			return err
		}

		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
		// or until the timeout is reached.
		allSuccess := false
		var gDir string
		var globalStore *mockdb.GlobalStore
		if *useMockStore {
			gDir, globalStore, err = createGlobalStore()
			if err != nil {
				return fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil: %v", err)
			}
			defer func() {
				os.RemoveAll(gDir)
				err := globalStore.Close()
				if err != nil {
					log.Error("Error closing global store!", "err", err)
				}
			}()
		}
		for !allSuccess {
			allSuccess = true
			for _, id := range nodeIDs {
				//for each expected chunk, check if it is in the local store
				localChunks := conf.idToChunksMap[id]
				localSuccess := true
				for _, ch := range localChunks {
					//get the real chunk by the index in the index array
					chunk := conf.hashes[ch]
					log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
					//check if the expected chunk is indeed in the localstore
					var err error
					if *useMockStore {
						//use the globalStore if the mockStore should be used; in that case,
						//the complete localStore stack is bypassed for getting the chunk
						_, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
					} else {
						//use the actual localstore
						item, ok := sim.NodeItem(id, bucketKeyStore)
						if !ok {
							return fmt.Errorf("Error accessing localstore")
						}
						lstore := item.(*storage.LocalStore)
						_, err = lstore.Get(ctx, chunk)
					}
					if err != nil {
						log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
						localSuccess = false
						// Do not get crazy with logging the warn message
						time.Sleep(500 * time.Millisecond)
					} else {
						log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
					}
				}
				//a single node missing a chunk means another pass is needed
				allSuccess = allSuccess && localSuccess
			}
		}
		if !allSuccess {
			return fmt.Errorf("Not all chunks succeeded!")
		}
		return nil
	})

	if result.Error != nil {
		t.Fatal(result.Error)
	}
}
/*
The test generates the given number of chunks.

For every chunk generated, the nearest node addresses
are identified, and we verify that the nodes closest to the
chunk addresses actually do have the chunks in their local stores.

The test loads a snapshot file to construct the swarm network,
assuming that the snapshot file identifies a healthy
kademlia network. The snapshot should have 'streamer' in its service list.
*/
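//Unlike testSyncingViaGlobalSync, the registry here is created without the DoSync
//option, so SYNC subscriptions are requested explicitly for every node via
//startSyncing below.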
func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
	sim := simulation.New(map[string]simulation.ServiceFunc{
		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
			id := ctx.Config.ID
			addr := network.NewAddrFromNodeID(id)
			store, datadir, err := createTestLocalStorageForID(id, addr)
			if err != nil {
				return nil, nil, err
			}
			bucket.Store(bucketKeyStore, store)
			cleanup = func() {
				os.RemoveAll(datadir)
				store.Close()
			}
			localStore := store.(*storage.LocalStore)
			db := storage.NewDBAPI(localStore)
			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
			delivery := NewDelivery(kad, db)

			r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil)
			bucket.Store(bucketKeyRegistry, r)

			fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
			bucket.Store(bucketKeyFileStore, fileStore)

			return r, cleanup, nil
		},
	})
	defer sim.Close()

	ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
	defer cancelSimRun()

	conf := &synctestConfig{}
	//map of discover ID to indexes of chunks expected at that ID
	conf.idToChunksMap = make(map[discover.NodeID][]int)
	//map of overlay address to discover ID
	conf.addrToIDMap = make(map[string]discover.NodeID)
	//array where the generated chunk hashes will be stored
	conf.hashes = make([]storage.Address, 0)

	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
	if err != nil {
		return err
	}

	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
		nodeIDs := sim.UpNodeIDs()
		for _, n := range nodeIDs {
			//get the kademlia overlay address from this ID
			a := network.ToOverlayAddr(n.Bytes())
			//append it to the array of all overlay addresses
			conf.addrs = append(conf.addrs, a)
			//the proximity calculation is on overlay addr,
			//the p2p/simulations check func triggers on discover.NodeID,
			//so we need to know which overlay addr maps to which nodeID
			conf.addrToIDMap[string(a)] = n
		}

		var subscriptionCount int

		filter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(4)
		eventC := sim.PeerEvents(ctx, nodeIDs, filter)
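		//the subscriptions are counted via p2p message events: the filter above matches
		//messages received with code 4 on the "stream" protocol, which is assumed here
		//to be the message emitted for each subscription. One such event per requested
		//subscription is awaited below before the upload starts.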
		for j, node := range nodeIDs {
			log.Trace(fmt.Sprintf("Start syncing subscriptions: %d", j))
			//start syncing!
			item, ok := sim.NodeItem(node, bucketKeyRegistry)
			if !ok {
				return fmt.Errorf("No registry")
			}
			registry := item.(*Registry)

			var cnt int
			cnt, err = startSyncing(registry, conf)
			if err != nil {
				return err
			}
			//increment the number of subscriptions we need to wait for
			//by the count returned from startSyncing (SYNC subscriptions)
			subscriptionCount += cnt
		}

		for e := range eventC {
			if e.Error != nil {
				return e.Error
			}
			subscriptionCount--
			if subscriptionCount == 0 {
				break
			}
		}

		//select a random node for upload
		node := sim.RandomUpNode()
		item, ok := sim.NodeItem(node.ID, bucketKeyStore)
		if !ok {
			return fmt.Errorf("No localstore")
		}
		lstore := item.(*storage.LocalStore)
		hashes, err := uploadFileToSingleNodeStore(node.ID, chunkCount, lstore)
		if err != nil {
			return err
		}
		conf.hashes = append(conf.hashes, hashes...)
		mapKeysToNodes(conf)

		if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
			return err
		}

		var gDir string
		var globalStore *mockdb.GlobalStore
		if *useMockStore {
			gDir, globalStore, err = createGlobalStore()
			if err != nil {
				return fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil: %v", err)
			}
			defer os.RemoveAll(gDir)
		}
		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
		// or until the timeout is reached.
		allSuccess := false
		for !allSuccess {
			allSuccess = true
			for _, id := range nodeIDs {
				//for each expected chunk, check if it is in the local store
				localChunks := conf.idToChunksMap[id]
				localSuccess := true
				for _, ch := range localChunks {
					//get the real chunk by the index in the index array
					chunk := conf.hashes[ch]
					log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
					//check if the expected chunk is indeed in the localstore
					var err error
					if *useMockStore {
						//use the globalStore if the mockStore should be used; in that case,
						//the complete localStore stack is bypassed for getting the chunk
						_, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
					} else {
						//use the actual localstore
						item, ok := sim.NodeItem(id, bucketKeyStore)
						if !ok {
							return fmt.Errorf("Error accessing localstore")
						}
						lstore := item.(*storage.LocalStore)
						_, err = lstore.Get(ctx, chunk)
					}
					if err != nil {
						log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
						localSuccess = false
						// Do not get crazy with logging the warn message
						time.Sleep(500 * time.Millisecond)
					} else {
						log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
					}
				}
				//a single node missing a chunk means another pass is needed
				allSuccess = allSuccess && localSuccess
			}
		}
		if !allSuccess {
			return fmt.Errorf("Not all chunks succeeded!")
		}
		return nil
	})

	if result.Error != nil {
		return result.Error
	}

	log.Info("Simulation terminated")
	return nil
}
//the server func to start syncing
//issues `RequestSubscriptionMsg` to peers, based on po, by iterating over
//the kademlia's bins with its `EachBin` function.
//returns the number of subscriptions requested
func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
	var err error

	kad, ok := r.delivery.overlay.(*network.Kademlia)
	if !ok {
		return 0, fmt.Errorf("Not a Kademlia!")
	}

	subCnt := 0
	//iterate over each bin and solicit the needed subscriptions
	kad.EachBin(r.addr.Over(), pof, 0, func(conn network.OverlayConn, po int) bool {
		//an empty Range is passed as the history range for the subscription
		histRange := &Range{}

		subCnt++
		err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), histRange, Top)
		if err != nil {
			log.Error(fmt.Sprintf("Error in RequestSubscription! %v", err))
			return false
		}
		return true
	})
	return subCnt, nil
}
//map chunk keys to addresses which are responsible
func mapKeysToNodes(conf *synctestConfig) {
	kmap := make(map[string][]int)
	nodemap := make(map[string][]int)
	//build a pot of the node overlay addresses
	np := pot.NewPot(nil, 0)
	indexmap := make(map[string]int)
	for i, a := range conf.addrs {
		indexmap[string(a)] = i
		np, _, _ = pot.Add(np, a, pof)
	}

	//for each chunk hash, run EachNeighbour on the node address pot to identify the closest nodes
	log.Trace(fmt.Sprintf("Generated hash chunk(s): %v", conf.hashes))
  450. for i := 0; i < len(conf.hashes); i++ {
  451. pl := 256 //highest possible proximity
  452. var nns []int
  453. np.EachNeighbour([]byte(conf.hashes[i]), pof, func(val pot.Val, po int) bool {
  454. a := val.([]byte)
  455. if pl < 256 && pl != po {
  456. return false
  457. }
  458. if pl == 256 || pl == po {
  459. log.Trace(fmt.Sprintf("appending %s", conf.addrToIDMap[string(a)]))
  460. nns = append(nns, indexmap[string(a)])
  461. nodemap[string(a)] = append(nodemap[string(a)], i)
  462. }
  463. if pl == 256 && len(nns) >= testMinProxBinSize {
  464. //maxProxBinSize has been reached at this po, so save it
  465. //we will add all other nodes at the same po
  466. pl = po
  467. }
  468. return true
  469. })
  470. kmap[string(conf.hashes[i])] = nns
  471. }
  472. for addr, chunks := range nodemap {
  473. //this selects which chunks are expected to be found with the given node
  474. conf.idToChunksMap[conf.addrToIDMap[addr]] = chunks
  475. }
  476. log.Debug(fmt.Sprintf("Map of expected chunks by ID: %v", conf.idToChunksMap))
  477. conf.chunksToNodesMap = kmap
  478. }
//upload a file (chunks) to a single local node store
func uploadFileToSingleNodeStore(id discover.NodeID, chunkCount int, lstore *storage.LocalStore) ([]storage.Address, error) {
	log.Debug(fmt.Sprintf("Uploading to node id: %s", id))
	fileStore := storage.NewFileStore(lstore, storage.NewFileStoreParams())
	size := chunkSize
	var rootAddrs []storage.Address
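	//each iteration stores one chunk-sized blob of random data; because the data fits
	//into a single chunk, the returned root address is assumed to be the address of
	//that chunk itself, which is why callers treat these root addresses as chunk hashes.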
  485. for i := 0; i < chunkCount; i++ {
  486. rk, wait, err := fileStore.Store(context.TODO(), io.LimitReader(crand.Reader, int64(size)), int64(size), false)
  487. if err != nil {
  488. return nil, err
  489. }
  490. err = wait(context.TODO())
  491. if err != nil {
  492. return nil, err
  493. }
  494. rootAddrs = append(rootAddrs, (rk))
  495. }
  496. return rootAddrs, nil
  497. }