common_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package stream
  17. import (
  18. "context"
  19. "errors"
  20. "flag"
  21. "fmt"
  22. "io"
  23. "io/ioutil"
  24. "math/rand"
  25. "os"
  26. "strings"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "github.com/ethereum/go-ethereum/common"
  31. "github.com/ethereum/go-ethereum/crypto"
  32. "github.com/ethereum/go-ethereum/log"
  33. "github.com/ethereum/go-ethereum/p2p/enode"
  34. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  35. p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
  36. "github.com/ethereum/go-ethereum/swarm/chunk"
  37. "github.com/ethereum/go-ethereum/swarm/network"
  38. "github.com/ethereum/go-ethereum/swarm/network/simulation"
  39. "github.com/ethereum/go-ethereum/swarm/state"
  40. "github.com/ethereum/go-ethereum/swarm/storage"
  41. "github.com/ethereum/go-ethereum/swarm/storage/localstore"
  42. "github.com/ethereum/go-ethereum/swarm/storage/mock"
  43. "github.com/ethereum/go-ethereum/swarm/testutil"
  44. colorable "github.com/mattn/go-colorable"
  45. )
  46. var (
  47. loglevel = flag.Int("loglevel", 2, "verbosity of logs")
  48. nodes = flag.Int("nodes", 0, "number of nodes")
  49. chunks = flag.Int("chunks", 0, "number of chunks")
  50. useMockStore = flag.Bool("mockstore", false, "disabled mock store (default: enabled)")
  51. longrunning = flag.Bool("longrunning", false, "do run long-running tests")
  52. bucketKeyStore = simulation.BucketKey("store")
  53. bucketKeyFileStore = simulation.BucketKey("filestore")
  54. bucketKeyNetStore = simulation.BucketKey("netstore")
  55. bucketKeyDelivery = simulation.BucketKey("delivery")
  56. bucketKeyRegistry = simulation.BucketKey("registry")
  57. chunkSize = 4096
  58. pof = network.Pof
  59. )
  60. func init() {
  61. flag.Parse()
  62. rand.Seed(time.Now().UnixNano())
  63. log.PrintOrigins(true)
  64. log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
  65. }
  66. // newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations
  67. func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
  68. addr := network.NewAddr(ctx.Config.Node())
  69. netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
  70. if err != nil {
  71. return nil, nil, nil, nil, err
  72. }
  73. netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
  74. return addr, netStore, delivery, cleanup, nil
  75. }
  76. // newNetStoreAndDeliveryWithBzzAddr is a constructor for NetStore and Delivery, used in Simulations, accepting any BzzAddr
  77. func newNetStoreAndDeliveryWithBzzAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
  78. netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
  79. if err != nil {
  80. return nil, nil, nil, err
  81. }
  82. netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
  83. return netStore, delivery, cleanup, nil
  84. }
  85. // newNetStoreAndDeliveryWithRequestFunc is a constructor for NetStore and Delivery, used in Simulations, accepting any NetStore.RequestFunc
  86. func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf network.RequestFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
  87. addr := network.NewAddr(ctx.Config.Node())
  88. netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
  89. if err != nil {
  90. return nil, nil, nil, nil, err
  91. }
  92. netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New
  93. return addr, netStore, delivery, cleanup, nil
  94. }
  95. func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
  96. n := ctx.Config.Node()
  97. localStore, localStoreCleanup, err := newTestLocalStore(n.ID(), addr, nil)
  98. if err != nil {
  99. return nil, nil, nil, err
  100. }
  101. netStore, err := storage.NewNetStore(localStore, nil)
  102. if err != nil {
  103. localStore.Close()
  104. localStoreCleanup()
  105. return nil, nil, nil, err
  106. }
  107. fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams(), chunk.NewTags())
  108. kad := network.NewKademlia(addr.Over(), network.NewKadParams())
  109. delivery := NewDelivery(kad, netStore)
  110. bucket.Store(bucketKeyStore, localStore)
  111. bucket.Store(bucketKeyDelivery, delivery)
  112. bucket.Store(bucketKeyFileStore, fileStore)
  113. // for the kademlia object, we use the global key from the simulation package,
  114. // as the simulation will try to access it in the WaitTillHealthy with that key
  115. bucket.Store(simulation.BucketKeyKademlia, kad)
  116. cleanup := func() {
  117. netStore.Close()
  118. localStoreCleanup()
  119. }
  120. return netStore, delivery, cleanup, nil
  121. }
  122. func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *localstore.DB, func(), error) {
  123. // setup
  124. addr := network.RandomAddr() // tested peers peer address
  125. to := network.NewKademlia(addr.OAddr, network.NewKadParams())
  126. // temp datadir
  127. datadir, err := ioutil.TempDir("", "streamer")
  128. if err != nil {
  129. return nil, nil, nil, nil, err
  130. }
  131. removeDataDir := func() {
  132. os.RemoveAll(datadir)
  133. }
  134. localStore, err := localstore.New(datadir, addr.Over(), nil)
  135. if err != nil {
  136. removeDataDir()
  137. return nil, nil, nil, nil, err
  138. }
  139. netStore, err := storage.NewNetStore(localStore, nil)
  140. if err != nil {
  141. localStore.Close()
  142. removeDataDir()
  143. return nil, nil, nil, nil, err
  144. }
  145. delivery := NewDelivery(to, netStore)
  146. netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
  147. intervalsStore := state.NewInmemoryStore()
  148. streamer := NewRegistry(addr.ID(), delivery, netStore, intervalsStore, registryOptions, nil)
  149. prvkey, err := crypto.GenerateKey()
  150. if err != nil {
  151. removeDataDir()
  152. return nil, nil, nil, nil, err
  153. }
  154. protocolTester := p2ptest.NewProtocolTester(prvkey, 1, streamer.runProtocol)
  155. teardown := func() {
  156. protocolTester.Stop()
  157. streamer.Close()
  158. intervalsStore.Close()
  159. netStore.Close()
  160. removeDataDir()
  161. }
  162. err = waitForPeers(streamer, 10*time.Second, 1)
  163. if err != nil {
  164. teardown()
  165. return nil, nil, nil, nil, errors.New("timeout: peer is not created")
  166. }
  167. return protocolTester, streamer, localStore, teardown, nil
  168. }
  169. func waitForPeers(streamer *Registry, timeout time.Duration, expectedPeers int) error {
  170. ticker := time.NewTicker(10 * time.Millisecond)
  171. timeoutTimer := time.NewTimer(timeout)
  172. for {
  173. select {
  174. case <-ticker.C:
  175. if streamer.peersCount() >= expectedPeers {
  176. return nil
  177. }
  178. case <-timeoutTimer.C:
  179. return errors.New("timeout")
  180. }
  181. }
  182. }
  183. type roundRobinStore struct {
  184. index uint32
  185. stores []storage.ChunkStore
  186. }
  187. func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore {
  188. return &roundRobinStore{
  189. stores: stores,
  190. }
  191. }
  192. // not used in this context, only to fulfill ChunkStore interface
  193. func (rrs *roundRobinStore) Has(_ context.Context, _ storage.Address) (bool, error) {
  194. return false, errors.New("roundRobinStore doesn't support Has")
  195. }
  196. func (rrs *roundRobinStore) Get(_ context.Context, _ chunk.ModeGet, _ storage.Address) (storage.Chunk, error) {
  197. return nil, errors.New("roundRobinStore doesn't support Get")
  198. }
  199. func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, ch storage.Chunk) (bool, error) {
  200. i := atomic.AddUint32(&rrs.index, 1)
  201. idx := int(i) % len(rrs.stores)
  202. return rrs.stores[idx].Put(ctx, mode, ch)
  203. }
  204. func (rrs *roundRobinStore) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
  205. return errors.New("roundRobinStore doesn't support Set")
  206. }
  207. func (rrs *roundRobinStore) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
  208. return 0, errors.New("roundRobinStore doesn't support LastPullSubscriptionBinID")
  209. }
  210. func (rrs *roundRobinStore) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) {
  211. return nil, nil
  212. }
  213. func (rrs *roundRobinStore) Close() error {
  214. for _, store := range rrs.stores {
  215. store.Close()
  216. }
  217. return nil
  218. }
  219. func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) {
  220. r, _ := fileStore.Retrieve(context.TODO(), hash)
  221. buf := make([]byte, 1024)
  222. var n int
  223. var total int64
  224. var err error
  225. for (total == 0 || n > 0) && err == nil {
  226. n, err = r.ReadAt(buf, total)
  227. total += int64(n)
  228. }
  229. if err != nil && err != io.EOF {
  230. return total, err
  231. }
  232. return total, nil
  233. }
  234. func uploadFilesToNodes(sim *simulation.Simulation) ([]storage.Address, []string, error) {
  235. nodes := sim.UpNodeIDs()
  236. nodeCnt := len(nodes)
  237. log.Debug(fmt.Sprintf("Uploading %d files to nodes", nodeCnt))
  238. //array holding generated files
  239. rfiles := make([]string, nodeCnt)
  240. //array holding the root hashes of the files
  241. rootAddrs := make([]storage.Address, nodeCnt)
  242. var err error
  243. //for every node, generate a file and upload
  244. for i, id := range nodes {
  245. item, ok := sim.NodeItem(id, bucketKeyFileStore)
  246. if !ok {
  247. return nil, nil, fmt.Errorf("Error accessing localstore")
  248. }
  249. fileStore := item.(*storage.FileStore)
  250. //generate a file
  251. rfiles[i], err = generateRandomFile()
  252. if err != nil {
  253. return nil, nil, err
  254. }
  255. //store it (upload it) on the FileStore
  256. ctx := context.TODO()
  257. rk, wait, err := fileStore.Store(ctx, strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false)
  258. log.Debug("Uploaded random string file to node")
  259. if err != nil {
  260. return nil, nil, err
  261. }
  262. err = wait(ctx)
  263. if err != nil {
  264. return nil, nil, err
  265. }
  266. rootAddrs[i] = rk
  267. }
  268. return rootAddrs, rfiles, nil
  269. }
  270. //generate a random file (string)
  271. func generateRandomFile() (string, error) {
  272. //generate a random file size between minFileSize and maxFileSize
  273. fileSize := rand.Intn(maxFileSize-minFileSize) + minFileSize
  274. log.Debug(fmt.Sprintf("Generated file with filesize %d kB", fileSize))
  275. b := testutil.RandomBytes(1, fileSize*1024)
  276. return string(b), nil
  277. }
  278. func newTestLocalStore(id enode.ID, addr *network.BzzAddr, globalStore mock.GlobalStorer) (localStore *localstore.DB, cleanup func(), err error) {
  279. dir, err := ioutil.TempDir("", "swarm-stream-")
  280. if err != nil {
  281. return nil, nil, err
  282. }
  283. cleanup = func() {
  284. os.RemoveAll(dir)
  285. }
  286. var mockStore *mock.NodeStore
  287. if globalStore != nil {
  288. mockStore = globalStore.NewNodeStore(common.BytesToAddress(id.Bytes()))
  289. }
  290. localStore, err = localstore.New(dir, addr.Over(), &localstore.Options{
  291. MockStore: mockStore,
  292. })
  293. if err != nil {
  294. cleanup()
  295. return nil, nil, err
  296. }
  297. return localStore, cleanup, nil
  298. }
  299. // watchDisconnections receives simulation peer events in a new goroutine and sets atomic value
  300. // disconnected to true in case of a disconnect event.
  301. func watchDisconnections(ctx context.Context, sim *simulation.Simulation) (disconnected *boolean) {
  302. log.Debug("Watching for disconnections")
  303. disconnections := sim.PeerEvents(
  304. ctx,
  305. sim.NodeIDs(),
  306. simulation.NewPeerEventsFilter().Drop(),
  307. )
  308. disconnected = new(boolean)
  309. go func() {
  310. for {
  311. select {
  312. case <-ctx.Done():
  313. return
  314. case d := <-disconnections:
  315. if d.Error != nil {
  316. log.Error("peer drop event error", "node", d.NodeID, "peer", d.PeerID, "err", d.Error)
  317. } else {
  318. log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
  319. }
  320. disconnected.set(true)
  321. }
  322. }
  323. }()
  324. return disconnected
  325. }
  // boolean is a bool that can be set and read from multiple goroutines.
  // Its zero value holds false.
  type boolean struct {
  	v  bool
  	mu sync.RWMutex // guards v
  }

  // set stores v as the current value under the write lock.
  func (b *boolean) set(v bool) {
  	b.mu.Lock()
  	b.v = v
  	b.mu.Unlock()
  }

  // bool returns the current value under the read lock.
  func (b *boolean) bool() bool {
  	b.mu.RLock()
  	current := b.v
  	b.mu.RUnlock()
  	return current
  }