// syncer_test.go
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package stream
  17. import (
  18. "context"
  19. crand "crypto/rand"
  20. "fmt"
  21. "io"
  22. "io/ioutil"
  23. "math"
  24. "os"
  25. "sync"
  26. "testing"
  27. "time"
  28. "github.com/ethereum/go-ethereum/common"
  29. "github.com/ethereum/go-ethereum/node"
  30. "github.com/ethereum/go-ethereum/p2p"
  31. "github.com/ethereum/go-ethereum/p2p/enode"
  32. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  33. "github.com/ethereum/go-ethereum/swarm/log"
  34. "github.com/ethereum/go-ethereum/swarm/network"
  35. "github.com/ethereum/go-ethereum/swarm/network/simulation"
  36. "github.com/ethereum/go-ethereum/swarm/state"
  37. "github.com/ethereum/go-ethereum/swarm/storage"
  38. mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
  39. )
  40. const dataChunkCount = 200
  41. func TestSyncerSimulation(t *testing.T) {
  42. testSyncBetweenNodes(t, 2, 1, dataChunkCount, true, 1)
  43. testSyncBetweenNodes(t, 4, 1, dataChunkCount, true, 1)
  44. testSyncBetweenNodes(t, 8, 1, dataChunkCount, true, 1)
  45. testSyncBetweenNodes(t, 16, 1, dataChunkCount, true, 1)
  46. }
  47. func createMockStore(globalStore *mockdb.GlobalStore, id enode.ID, addr *network.BzzAddr) (lstore storage.ChunkStore, datadir string, err error) {
  48. address := common.BytesToAddress(id.Bytes())
  49. mockStore := globalStore.NewNodeStore(address)
  50. params := storage.NewDefaultLocalStoreParams()
  51. datadir, err = ioutil.TempDir("", "localMockStore-"+id.TerminalString())
  52. if err != nil {
  53. return nil, "", err
  54. }
  55. params.Init(datadir)
  56. params.BaseKey = addr.Over()
  57. lstore, err = storage.NewLocalStore(params, mockStore)
  58. return lstore, datadir, nil
  59. }
  60. func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bool, po uint8) {
  61. sim := simulation.New(map[string]simulation.ServiceFunc{
  62. "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
  63. var store storage.ChunkStore
  64. var globalStore *mockdb.GlobalStore
  65. var gDir, datadir string
  66. node := ctx.Config.Node()
  67. addr := network.NewAddr(node)
  68. //hack to put addresses in same space
  69. addr.OAddr[0] = byte(0)
  70. if *useMockStore {
  71. gDir, globalStore, err = createGlobalStore()
  72. if err != nil {
  73. return nil, nil, fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil")
  74. }
  75. store, datadir, err = createMockStore(globalStore, node.ID(), addr)
  76. } else {
  77. store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
  78. }
  79. if err != nil {
  80. return nil, nil, err
  81. }
  82. bucket.Store(bucketKeyStore, store)
  83. cleanup = func() {
  84. store.Close()
  85. os.RemoveAll(datadir)
  86. if *useMockStore {
  87. err := globalStore.Close()
  88. if err != nil {
  89. log.Error("Error closing global store! %v", "err", err)
  90. }
  91. os.RemoveAll(gDir)
  92. }
  93. }
  94. localStore := store.(*storage.LocalStore)
  95. netStore, err := storage.NewNetStore(localStore, nil)
  96. if err != nil {
  97. return nil, nil, err
  98. }
  99. bucket.Store(bucketKeyDB, netStore)
  100. kad := network.NewKademlia(addr.Over(), network.NewKadParams())
  101. delivery := NewDelivery(kad, netStore)
  102. netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
  103. bucket.Store(bucketKeyDelivery, delivery)
  104. r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
  105. SkipCheck: skipCheck,
  106. })
  107. fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
  108. bucket.Store(bucketKeyFileStore, fileStore)
  109. return r, cleanup, nil
  110. },
  111. })
  112. defer sim.Close()
  113. // create context for simulation run
  114. timeout := 30 * time.Second
  115. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  116. // defer cancel should come before defer simulation teardown
  117. defer cancel()
  118. _, err := sim.AddNodesAndConnectChain(nodes)
  119. if err != nil {
  120. t.Fatal(err)
  121. }
  122. result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
  123. nodeIDs := sim.UpNodeIDs()
  124. nodeIndex := make(map[enode.ID]int)
  125. for i, id := range nodeIDs {
  126. nodeIndex[id] = i
  127. }
  128. disconnections := sim.PeerEvents(
  129. context.Background(),
  130. sim.NodeIDs(),
  131. simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
  132. )
  133. go func() {
  134. for d := range disconnections {
  135. if d.Error != nil {
  136. log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
  137. t.Fatal(d.Error)
  138. }
  139. }
  140. }()
  141. // each node Subscribes to each other's swarmChunkServerStreamName
  142. for j := 0; j < nodes-1; j++ {
  143. id := nodeIDs[j]
  144. client, err := sim.Net.GetNode(id).Client()
  145. if err != nil {
  146. t.Fatal(err)
  147. }
  148. sid := nodeIDs[j+1]
  149. client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream("SYNC", FormatSyncBinKey(1), false), NewRange(0, 0), Top)
  150. if err != nil {
  151. return err
  152. }
  153. if j > 0 || nodes == 2 {
  154. item, ok := sim.NodeItem(nodeIDs[j], bucketKeyFileStore)
  155. if !ok {
  156. return fmt.Errorf("No filestore")
  157. }
  158. fileStore := item.(*storage.FileStore)
  159. size := chunkCount * chunkSize
  160. _, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false)
  161. if err != nil {
  162. t.Fatal(err.Error())
  163. }
  164. wait(ctx)
  165. }
  166. }
  167. // here we distribute chunks of a random file into stores 1...nodes
  168. if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
  169. return err
  170. }
  171. // collect hashes in po 1 bin for each node
  172. hashes := make([][]storage.Address, nodes)
  173. totalHashes := 0
  174. hashCounts := make([]int, nodes)
  175. for i := nodes - 1; i >= 0; i-- {
  176. if i < nodes-1 {
  177. hashCounts[i] = hashCounts[i+1]
  178. }
  179. item, ok := sim.NodeItem(nodeIDs[i], bucketKeyDB)
  180. if !ok {
  181. return fmt.Errorf("No DB")
  182. }
  183. netStore := item.(*storage.NetStore)
  184. netStore.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
  185. hashes[i] = append(hashes[i], addr)
  186. totalHashes++
  187. hashCounts[i]++
  188. return true
  189. })
  190. }
  191. var total, found int
  192. for _, node := range nodeIDs {
  193. i := nodeIndex[node]
  194. for j := i; j < nodes; j++ {
  195. total += len(hashes[j])
  196. for _, key := range hashes[j] {
  197. item, ok := sim.NodeItem(nodeIDs[j], bucketKeyDB)
  198. if !ok {
  199. return fmt.Errorf("No DB")
  200. }
  201. db := item.(*storage.NetStore)
  202. _, err := db.Get(ctx, key)
  203. if err == nil {
  204. found++
  205. }
  206. }
  207. }
  208. log.Debug("sync check", "node", node, "index", i, "bin", po, "found", found, "total", total)
  209. }
  210. if total == found && total > 0 {
  211. return nil
  212. }
  213. return fmt.Errorf("Total not equallying found: total is %d", total)
  214. })
  215. if result.Error != nil {
  216. t.Fatal(result.Error)
  217. }
  218. }