common_test.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package stream
  17. import (
  18. "context"
  19. crand "crypto/rand"
  20. "errors"
  21. "flag"
  22. "fmt"
  23. "io"
  24. "io/ioutil"
  25. "math/rand"
  26. "os"
  27. "strings"
  28. "sync/atomic"
  29. "testing"
  30. "time"
  31. "github.com/ethereum/go-ethereum/log"
  32. "github.com/ethereum/go-ethereum/p2p/discover"
  33. p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
  34. "github.com/ethereum/go-ethereum/swarm/network"
  35. "github.com/ethereum/go-ethereum/swarm/network/simulation"
  36. "github.com/ethereum/go-ethereum/swarm/pot"
  37. "github.com/ethereum/go-ethereum/swarm/state"
  38. "github.com/ethereum/go-ethereum/swarm/storage"
  39. mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
  40. colorable "github.com/mattn/go-colorable"
  41. )
  42. var (
  43. loglevel = flag.Int("loglevel", 2, "verbosity of logs")
  44. nodes = flag.Int("nodes", 0, "number of nodes")
  45. chunks = flag.Int("chunks", 0, "number of chunks")
  46. useMockStore = flag.Bool("mockstore", false, "disabled mock store (default: enabled)")
  47. longrunning = flag.Bool("longrunning", false, "do run long-running tests")
  48. bucketKeyDB = simulation.BucketKey("db")
  49. bucketKeyStore = simulation.BucketKey("store")
  50. bucketKeyFileStore = simulation.BucketKey("filestore")
  51. bucketKeyNetStore = simulation.BucketKey("netstore")
  52. bucketKeyDelivery = simulation.BucketKey("delivery")
  53. bucketKeyRegistry = simulation.BucketKey("registry")
  54. chunkSize = 4096
  55. pof = pot.DefaultPof(256)
  56. )
  57. func init() {
  58. flag.Parse()
  59. rand.Seed(time.Now().UnixNano())
  60. log.PrintOrigins(true)
  61. log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
  62. }
  63. func createGlobalStore() (string, *mockdb.GlobalStore, error) {
  64. var globalStore *mockdb.GlobalStore
  65. globalStoreDir, err := ioutil.TempDir("", "global.store")
  66. if err != nil {
  67. log.Error("Error initiating global store temp directory!", "err", err)
  68. return "", nil, err
  69. }
  70. globalStore, err = mockdb.NewGlobalStore(globalStoreDir)
  71. if err != nil {
  72. log.Error("Error initiating global store!", "err", err)
  73. return "", nil, err
  74. }
  75. return globalStoreDir, globalStore, nil
  76. }
  77. func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
  78. // setup
  79. addr := network.RandomAddr() // tested peers peer address
  80. to := network.NewKademlia(addr.OAddr, network.NewKadParams())
  81. // temp datadir
  82. datadir, err := ioutil.TempDir("", "streamer")
  83. if err != nil {
  84. return nil, nil, nil, func() {}, err
  85. }
  86. removeDataDir := func() {
  87. os.RemoveAll(datadir)
  88. }
  89. params := storage.NewDefaultLocalStoreParams()
  90. params.Init(datadir)
  91. params.BaseKey = addr.Over()
  92. localStore, err := storage.NewTestLocalStoreForAddr(params)
  93. if err != nil {
  94. return nil, nil, nil, removeDataDir, err
  95. }
  96. netStore, err := storage.NewNetStore(localStore, nil)
  97. if err != nil {
  98. return nil, nil, nil, removeDataDir, err
  99. }
  100. delivery := NewDelivery(to, netStore)
  101. netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
  102. streamer := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil)
  103. teardown := func() {
  104. streamer.Close()
  105. removeDataDir()
  106. }
  107. protocolTester := p2ptest.NewProtocolTester(t, network.NewNodeIDFromAddr(addr), 1, streamer.runProtocol)
  108. err = waitForPeers(streamer, 1*time.Second, 1)
  109. if err != nil {
  110. return nil, nil, nil, nil, errors.New("timeout: peer is not created")
  111. }
  112. return protocolTester, streamer, localStore, teardown, nil
  113. }
  114. func waitForPeers(streamer *Registry, timeout time.Duration, expectedPeers int) error {
  115. ticker := time.NewTicker(10 * time.Millisecond)
  116. timeoutTimer := time.NewTimer(timeout)
  117. for {
  118. select {
  119. case <-ticker.C:
  120. if streamer.peersCount() >= expectedPeers {
  121. return nil
  122. }
  123. case <-timeoutTimer.C:
  124. return errors.New("timeout")
  125. }
  126. }
  127. }
  128. type roundRobinStore struct {
  129. index uint32
  130. stores []storage.ChunkStore
  131. }
  132. func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore {
  133. return &roundRobinStore{
  134. stores: stores,
  135. }
  136. }
  137. func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) {
  138. return nil, errors.New("get not well defined on round robin store")
  139. }
  140. func (rrs *roundRobinStore) Put(ctx context.Context, chunk storage.Chunk) error {
  141. i := atomic.AddUint32(&rrs.index, 1)
  142. idx := int(i) % len(rrs.stores)
  143. return rrs.stores[idx].Put(ctx, chunk)
  144. }
  145. func (rrs *roundRobinStore) Close() {
  146. for _, store := range rrs.stores {
  147. store.Close()
  148. }
  149. }
  150. func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) {
  151. r, _ := fileStore.Retrieve(context.TODO(), hash)
  152. buf := make([]byte, 1024)
  153. var n int
  154. var total int64
  155. var err error
  156. for (total == 0 || n > 0) && err == nil {
  157. n, err = r.ReadAt(buf, total)
  158. total += int64(n)
  159. }
  160. if err != nil && err != io.EOF {
  161. return total, err
  162. }
  163. return total, nil
  164. }
  165. func uploadFilesToNodes(sim *simulation.Simulation) ([]storage.Address, []string, error) {
  166. nodes := sim.UpNodeIDs()
  167. nodeCnt := len(nodes)
  168. log.Debug(fmt.Sprintf("Uploading %d files to nodes", nodeCnt))
  169. //array holding generated files
  170. rfiles := make([]string, nodeCnt)
  171. //array holding the root hashes of the files
  172. rootAddrs := make([]storage.Address, nodeCnt)
  173. var err error
  174. //for every node, generate a file and upload
  175. for i, id := range nodes {
  176. item, ok := sim.NodeItem(id, bucketKeyFileStore)
  177. if !ok {
  178. return nil, nil, fmt.Errorf("Error accessing localstore")
  179. }
  180. fileStore := item.(*storage.FileStore)
  181. //generate a file
  182. rfiles[i], err = generateRandomFile()
  183. if err != nil {
  184. return nil, nil, err
  185. }
  186. //store it (upload it) on the FileStore
  187. ctx := context.TODO()
  188. rk, wait, err := fileStore.Store(ctx, strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false)
  189. log.Debug("Uploaded random string file to node")
  190. if err != nil {
  191. return nil, nil, err
  192. }
  193. err = wait(ctx)
  194. if err != nil {
  195. return nil, nil, err
  196. }
  197. rootAddrs[i] = rk
  198. }
  199. return rootAddrs, rfiles, nil
  200. }
  201. //generate a random file (string)
  202. func generateRandomFile() (string, error) {
  203. //generate a random file size between minFileSize and maxFileSize
  204. fileSize := rand.Intn(maxFileSize-minFileSize) + minFileSize
  205. log.Debug(fmt.Sprintf("Generated file with filesize %d kB", fileSize))
  206. b := make([]byte, fileSize*1024)
  207. _, err := crand.Read(b)
  208. if err != nil {
  209. log.Error("Error generating random file.", "err", err)
  210. return "", err
  211. }
  212. return string(b), nil
  213. }
  214. //create a local store for the given node
  215. func createTestLocalStorageForID(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, string, error) {
  216. var datadir string
  217. var err error
  218. datadir, err = ioutil.TempDir("", fmt.Sprintf("syncer-test-%s", id.TerminalString()))
  219. if err != nil {
  220. return nil, "", err
  221. }
  222. var store storage.ChunkStore
  223. params := storage.NewDefaultLocalStoreParams()
  224. params.ChunkDbPath = datadir
  225. params.BaseKey = addr.Over()
  226. store, err = storage.NewTestLocalStoreForAddr(params)
  227. if err != nil {
  228. os.RemoveAll(datadir)
  229. return nil, "", err
  230. }
  231. return store, datadir, nil
  232. }