network_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package swarm
  17. import (
  18. "context"
  19. "flag"
  20. "fmt"
  21. "io/ioutil"
  22. "math/rand"
  23. "os"
  24. "sync"
  25. "sync/atomic"
  26. "testing"
  27. "time"
  28. "github.com/ethereum/go-ethereum/common"
  29. "github.com/ethereum/go-ethereum/crypto"
  30. "github.com/ethereum/go-ethereum/log"
  31. "github.com/ethereum/go-ethereum/node"
  32. "github.com/ethereum/go-ethereum/p2p/discover"
  33. "github.com/ethereum/go-ethereum/p2p/simulations"
  34. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  35. "github.com/ethereum/go-ethereum/swarm/api"
  36. "github.com/ethereum/go-ethereum/swarm/network"
  37. "github.com/ethereum/go-ethereum/swarm/storage"
  38. colorable "github.com/mattn/go-colorable"
  39. )
  var (
	// loglevel is the verbosity handed to log.LvlFilterHandler in init.
	loglevel = flag.Int("loglevel", 2, "verbosity of logs")

	// longrunning enables the TestSwarmNetwork cases that are disabled by default.
	longrunning = flag.Bool("longrunning", false, "do run long-running tests")

	// waitKademlia makes testSwarmNetwork poll every node's Kademlia health
	// before it starts checking file availability.
	waitKademlia = flag.Bool("waitkademlia", false, "wait for healthy kademlia before checking files availability")
)
  45. func init() {
  46. rand.Seed(time.Now().UnixNano())
  47. flag.Parse()
  48. log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
  49. }
  50. // TestSwarmNetwork runs a series of test simulations with
  51. // static and dynamic Swarm nodes in network simulation, by
  52. // uploading files to every node and retrieving them.
  53. func TestSwarmNetwork(t *testing.T) {
  54. for _, tc := range []struct {
  55. name string
  56. steps []testSwarmNetworkStep
  57. options *testSwarmNetworkOptions
  58. disabled bool
  59. }{
  60. {
  61. name: "10_nodes",
  62. steps: []testSwarmNetworkStep{
  63. {
  64. nodeCount: 10,
  65. },
  66. },
  67. options: &testSwarmNetworkOptions{
  68. Timeout: 45 * time.Second,
  69. },
  70. },
  71. {
  72. name: "10_nodes_skip_check",
  73. steps: []testSwarmNetworkStep{
  74. {
  75. nodeCount: 10,
  76. },
  77. },
  78. options: &testSwarmNetworkOptions{
  79. Timeout: 45 * time.Second,
  80. SkipCheck: true,
  81. },
  82. },
  83. {
  84. name: "100_nodes",
  85. steps: []testSwarmNetworkStep{
  86. {
  87. nodeCount: 100,
  88. },
  89. },
  90. options: &testSwarmNetworkOptions{
  91. Timeout: 3 * time.Minute,
  92. },
  93. disabled: !*longrunning,
  94. },
  95. {
  96. name: "100_nodes_skip_check",
  97. steps: []testSwarmNetworkStep{
  98. {
  99. nodeCount: 100,
  100. },
  101. },
  102. options: &testSwarmNetworkOptions{
  103. Timeout: 3 * time.Minute,
  104. SkipCheck: true,
  105. },
  106. disabled: !*longrunning,
  107. },
  108. {
  109. name: "inc_node_count",
  110. steps: []testSwarmNetworkStep{
  111. {
  112. nodeCount: 2,
  113. },
  114. {
  115. nodeCount: 5,
  116. },
  117. {
  118. nodeCount: 10,
  119. },
  120. },
  121. options: &testSwarmNetworkOptions{
  122. Timeout: 90 * time.Second,
  123. },
  124. disabled: !*longrunning,
  125. },
  126. {
  127. name: "dec_node_count",
  128. steps: []testSwarmNetworkStep{
  129. {
  130. nodeCount: 10,
  131. },
  132. {
  133. nodeCount: 6,
  134. },
  135. {
  136. nodeCount: 3,
  137. },
  138. },
  139. options: &testSwarmNetworkOptions{
  140. Timeout: 90 * time.Second,
  141. },
  142. disabled: !*longrunning,
  143. },
  144. {
  145. name: "dec_inc_node_count",
  146. steps: []testSwarmNetworkStep{
  147. {
  148. nodeCount: 5,
  149. },
  150. {
  151. nodeCount: 3,
  152. },
  153. {
  154. nodeCount: 10,
  155. },
  156. },
  157. options: &testSwarmNetworkOptions{
  158. Timeout: 90 * time.Second,
  159. },
  160. },
  161. {
  162. name: "inc_dec_node_count",
  163. steps: []testSwarmNetworkStep{
  164. {
  165. nodeCount: 3,
  166. },
  167. {
  168. nodeCount: 5,
  169. },
  170. {
  171. nodeCount: 25,
  172. },
  173. {
  174. nodeCount: 10,
  175. },
  176. {
  177. nodeCount: 4,
  178. },
  179. },
  180. options: &testSwarmNetworkOptions{
  181. Timeout: 5 * time.Minute,
  182. },
  183. disabled: !*longrunning,
  184. },
  185. {
  186. name: "inc_dec_node_count_skip_check",
  187. steps: []testSwarmNetworkStep{
  188. {
  189. nodeCount: 3,
  190. },
  191. {
  192. nodeCount: 5,
  193. },
  194. {
  195. nodeCount: 25,
  196. },
  197. {
  198. nodeCount: 10,
  199. },
  200. {
  201. nodeCount: 4,
  202. },
  203. },
  204. options: &testSwarmNetworkOptions{
  205. Timeout: 5 * time.Minute,
  206. SkipCheck: true,
  207. },
  208. disabled: !*longrunning,
  209. },
  210. } {
  211. if tc.disabled {
  212. continue
  213. }
  214. t.Run(tc.name, func(t *testing.T) {
  215. testSwarmNetwork(t, tc.options, tc.steps...)
  216. })
  217. }
  218. }
  219. // testSwarmNetworkStep is the configuration
  220. // for the state of the simulation network.
  221. type testSwarmNetworkStep struct {
  222. // number of swarm nodes that must be in the Up state
  223. nodeCount int
  224. }
  225. // file represents the file uploaded on a particular node.
  226. type file struct {
  227. addr storage.Address
  228. data string
  229. nodeID discover.NodeID
  230. }
  231. // check represents a reference to a file that is retrieved
  232. // from a particular node.
  233. type check struct {
  234. key string
  235. nodeID discover.NodeID
  236. }
  237. // testSwarmNetworkOptions contains optional parameters for running
  238. // testSwarmNetwork.
  239. type testSwarmNetworkOptions struct {
  240. Timeout time.Duration
  241. SkipCheck bool
  242. }
  243. // testSwarmNetwork is a helper function used for testing different
  244. // static and dynamic Swarm network simulations.
  245. // It is responsible for:
  246. // - Setting up a Swarm network simulation, and updates the number of nodes within the network on every step according to steps.
  247. // - Uploading a unique file to every node on every step.
  248. // - May wait for Kademlia on every node to be healthy.
  249. // - Checking if a file is retrievable from all nodes.
  250. func testSwarmNetwork(t *testing.T, o *testSwarmNetworkOptions, steps ...testSwarmNetworkStep) {
  251. dir, err := ioutil.TempDir("", "swarm-network-test")
  252. if err != nil {
  253. t.Fatal(err)
  254. }
  255. defer os.RemoveAll(dir)
  256. if o == nil {
  257. o = new(testSwarmNetworkOptions)
  258. }
  259. ctx := context.Background()
  260. if o.Timeout > 0 {
  261. var cancel context.CancelFunc
  262. ctx, cancel = context.WithTimeout(ctx, o.Timeout)
  263. defer cancel()
  264. }
  265. swarms := make(map[discover.NodeID]*Swarm)
  266. files := make([]file, 0)
  267. services := map[string]adapters.ServiceFunc{
  268. "swarm": func(ctx *adapters.ServiceContext) (node.Service, error) {
  269. config := api.NewConfig()
  270. dir, err := ioutil.TempDir(dir, "node")
  271. if err != nil {
  272. return nil, err
  273. }
  274. config.Path = dir
  275. privkey, err := crypto.GenerateKey()
  276. if err != nil {
  277. return nil, err
  278. }
  279. config.Init(privkey)
  280. config.DeliverySkipCheck = o.SkipCheck
  281. s, err := NewSwarm(config, nil)
  282. if err != nil {
  283. return nil, err
  284. }
  285. log.Info("new swarm", "bzzKey", config.BzzKey, "baseAddr", fmt.Sprintf("%x", s.bzz.BaseAddr()))
  286. swarms[ctx.Config.ID] = s
  287. return s, nil
  288. },
  289. }
  290. a := adapters.NewSimAdapter(services)
  291. net := simulations.NewNetwork(a, &simulations.NetworkConfig{
  292. ID: "0",
  293. DefaultService: "swarm",
  294. })
  295. defer net.Shutdown()
  296. trigger := make(chan discover.NodeID)
  297. sim := simulations.NewSimulation(net)
  298. for i, step := range steps {
  299. log.Debug("test sync step", "n", i+1, "nodes", step.nodeCount)
  300. change := step.nodeCount - len(allNodeIDs(net))
  301. if change > 0 {
  302. _, err := addNodes(change, net)
  303. if err != nil {
  304. t.Fatal(err)
  305. }
  306. } else if change < 0 {
  307. err := removeNodes(-change, net)
  308. if err != nil {
  309. t.Fatal(err)
  310. }
  311. } else {
  312. t.Logf("step %v: no change in nodes", i)
  313. continue
  314. }
  315. nodeIDs := allNodeIDs(net)
  316. shuffle(len(nodeIDs), func(i, j int) {
  317. nodeIDs[i], nodeIDs[j] = nodeIDs[j], nodeIDs[i]
  318. })
  319. for _, id := range nodeIDs {
  320. key, data, err := uploadFile(swarms[id])
  321. if err != nil {
  322. t.Fatal(err)
  323. }
  324. log.Trace("file uploaded", "node", id, "key", key.String())
  325. files = append(files, file{
  326. addr: key,
  327. data: data,
  328. nodeID: id,
  329. })
  330. }
  331. // Prepare PeerPot map for checking Kademlia health
  332. var ppmap map[string]*network.PeerPot
  333. nIDs := allNodeIDs(net)
  334. addrs := make([][]byte, len(nIDs))
  335. if *waitKademlia {
  336. for i, id := range nIDs {
  337. addrs[i] = swarms[id].bzz.BaseAddr()
  338. }
  339. ppmap = network.NewPeerPotMap(2, addrs)
  340. }
  341. var checkStatusM sync.Map
  342. var nodeStatusM sync.Map
  343. var totalFoundCount uint64
  344. result := sim.Run(ctx, &simulations.Step{
  345. Action: func(ctx context.Context) error {
  346. if *waitKademlia {
  347. // Wait for healthy Kademlia on every node before checking files
  348. ticker := time.NewTicker(200 * time.Millisecond)
  349. defer ticker.Stop()
  350. for range ticker.C {
  351. healthy := true
  352. log.Debug("kademlia health check", "node count", len(nIDs), "addr count", len(addrs))
  353. for i, id := range nIDs {
  354. swarm := swarms[id]
  355. //PeerPot for this node
  356. addr := common.Bytes2Hex(swarm.bzz.BaseAddr())
  357. pp := ppmap[addr]
  358. //call Healthy RPC
  359. h := swarm.bzz.Healthy(pp)
  360. //print info
  361. log.Debug(swarm.bzz.String())
  362. log.Debug("kademlia", "empty bins", pp.EmptyBins, "gotNN", h.GotNN, "knowNN", h.KnowNN, "full", h.Full)
  363. log.Debug("kademlia", "health", h.GotNN && h.KnowNN && h.Full, "addr", fmt.Sprintf("%x", swarm.bzz.BaseAddr()), "id", id, "i", i)
  364. log.Debug("kademlia", "ill condition", !h.GotNN || !h.Full, "addr", fmt.Sprintf("%x", swarm.bzz.BaseAddr()), "id", id, "i", i)
  365. if !h.GotNN || !h.Full {
  366. healthy = false
  367. break
  368. }
  369. }
  370. if healthy {
  371. break
  372. }
  373. }
  374. }
  375. go func() {
  376. // File retrieval check is repeated until all uploaded files are retrieved from all nodes
  377. // or until the timeout is reached.
  378. for {
  379. if retrieve(net, files, swarms, trigger, &checkStatusM, &nodeStatusM, &totalFoundCount) == 0 {
  380. return
  381. }
  382. }
  383. }()
  384. return nil
  385. },
  386. Trigger: trigger,
  387. Expect: &simulations.Expectation{
  388. Nodes: allNodeIDs(net),
  389. Check: func(ctx context.Context, id discover.NodeID) (bool, error) {
  390. // The check is done by a goroutine in the action function.
  391. return true, nil
  392. },
  393. },
  394. })
  395. if result.Error != nil {
  396. t.Fatal(result.Error)
  397. }
  398. log.Debug("done: test sync step", "n", i+1, "nodes", step.nodeCount)
  399. }
  400. }
  401. // allNodeIDs is returning NodeID for every node that is Up.
  402. func allNodeIDs(net *simulations.Network) (nodes []discover.NodeID) {
  403. for _, n := range net.GetNodes() {
  404. if n.Up {
  405. nodes = append(nodes, n.ID())
  406. }
  407. }
  408. return
  409. }
  410. // addNodes adds a number of nodes to the network.
  411. func addNodes(count int, net *simulations.Network) (ids []discover.NodeID, err error) {
  412. for i := 0; i < count; i++ {
  413. nodeIDs := allNodeIDs(net)
  414. l := len(nodeIDs)
  415. nodeconf := adapters.RandomNodeConfig()
  416. node, err := net.NewNodeWithConfig(nodeconf)
  417. if err != nil {
  418. return nil, fmt.Errorf("create node: %v", err)
  419. }
  420. err = net.Start(node.ID())
  421. if err != nil {
  422. return nil, fmt.Errorf("start node: %v", err)
  423. }
  424. log.Debug("created node", "id", node.ID())
  425. // connect nodes in a chain
  426. if l > 0 {
  427. var otherNodeID discover.NodeID
  428. for i := l - 1; i >= 0; i-- {
  429. n := net.GetNode(nodeIDs[i])
  430. if n.Up {
  431. otherNodeID = n.ID()
  432. break
  433. }
  434. }
  435. log.Debug("connect nodes", "one", node.ID(), "other", otherNodeID)
  436. if err := net.Connect(node.ID(), otherNodeID); err != nil {
  437. return nil, err
  438. }
  439. }
  440. ids = append(ids, node.ID())
  441. }
  442. return ids, nil
  443. }
  444. // removeNodes stops a random nodes in the network.
  445. func removeNodes(count int, net *simulations.Network) error {
  446. for i := 0; i < count; i++ {
  447. // allNodeIDs are returning only the Up nodes.
  448. nodeIDs := allNodeIDs(net)
  449. if len(nodeIDs) == 0 {
  450. break
  451. }
  452. node := net.GetNode(nodeIDs[rand.Intn(len(nodeIDs))])
  453. if err := node.Stop(); err != nil {
  454. return err
  455. }
  456. log.Debug("removed node", "id", node.ID())
  457. }
  458. return nil
  459. }
  460. // uploadFile, uploads a short file to the swarm instance
  461. // using the api.Put method.
  462. func uploadFile(swarm *Swarm) (storage.Address, string, error) {
  463. b := make([]byte, 8)
  464. _, err := rand.Read(b)
  465. if err != nil {
  466. return nil, "", err
  467. }
  468. // File data is very short, but it is ensured that its
  469. // uniqueness is very certain.
  470. data := fmt.Sprintf("test content %s %x", time.Now().Round(0), b)
  471. ctx := context.TODO()
  472. k, wait, err := swarm.api.Put(ctx, data, "text/plain", false)
  473. if err != nil {
  474. return nil, "", err
  475. }
  476. if wait != nil {
  477. err = wait(ctx)
  478. }
  479. return k, data, err
  480. }
  481. // retrieve is the function that is used for checking the availability of
  482. // uploaded files in testSwarmNetwork test helper function.
  483. func retrieve(
  484. net *simulations.Network,
  485. files []file,
  486. swarms map[discover.NodeID]*Swarm,
  487. trigger chan discover.NodeID,
  488. checkStatusM *sync.Map,
  489. nodeStatusM *sync.Map,
  490. totalFoundCount *uint64,
  491. ) (missing uint64) {
  492. shuffle(len(files), func(i, j int) {
  493. files[i], files[j] = files[j], files[i]
  494. })
  495. var totalWg sync.WaitGroup
  496. errc := make(chan error)
  497. nodeIDs := allNodeIDs(net)
  498. totalCheckCount := len(nodeIDs) * len(files)
  499. for _, id := range nodeIDs {
  500. if _, ok := nodeStatusM.Load(id); ok {
  501. continue
  502. }
  503. start := time.Now()
  504. var checkCount uint64
  505. var foundCount uint64
  506. totalWg.Add(1)
  507. var wg sync.WaitGroup
  508. for _, f := range files {
  509. swarm := swarms[id]
  510. checkKey := check{
  511. key: f.addr.String(),
  512. nodeID: id,
  513. }
  514. if n, ok := checkStatusM.Load(checkKey); ok && n.(int) == 0 {
  515. continue
  516. }
  517. checkCount++
  518. wg.Add(1)
  519. go func(f file, id discover.NodeID) {
  520. defer wg.Done()
  521. log.Debug("api get: check file", "node", id.String(), "key", f.addr.String(), "total files found", atomic.LoadUint64(totalFoundCount))
  522. r, _, _, _, err := swarm.api.Get(context.TODO(), f.addr, "/")
  523. if err != nil {
  524. errc <- fmt.Errorf("api get: node %s, key %s, kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err)
  525. return
  526. }
  527. d, err := ioutil.ReadAll(r)
  528. if err != nil {
  529. errc <- fmt.Errorf("api get: read response: node %s, key %s: kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err)
  530. return
  531. }
  532. data := string(d)
  533. if data != f.data {
  534. errc <- fmt.Errorf("file contend missmatch: node %s, key %s, expected %q, got %q", id, f.addr, f.data, data)
  535. return
  536. }
  537. checkStatusM.Store(checkKey, 0)
  538. atomic.AddUint64(&foundCount, 1)
  539. log.Info("api get: file found", "node", id.String(), "key", f.addr.String(), "content", data, "files found", atomic.LoadUint64(&foundCount))
  540. }(f, id)
  541. }
  542. go func(id discover.NodeID) {
  543. defer totalWg.Done()
  544. wg.Wait()
  545. atomic.AddUint64(totalFoundCount, foundCount)
  546. if foundCount == checkCount {
  547. log.Info("all files are found for node", "id", id.String(), "duration", time.Since(start))
  548. nodeStatusM.Store(id, 0)
  549. trigger <- id
  550. return
  551. }
  552. log.Debug("files missing for node", "id", id.String(), "check", checkCount, "found", foundCount)
  553. }(id)
  554. }
  555. go func() {
  556. totalWg.Wait()
  557. close(errc)
  558. }()
  559. var errCount int
  560. for err := range errc {
  561. if err != nil {
  562. errCount++
  563. }
  564. log.Warn(err.Error())
  565. }
  566. log.Info("check stats", "total check count", totalCheckCount, "total files found", atomic.LoadUint64(totalFoundCount), "total errors", errCount)
  567. return uint64(totalCheckCount) - atomic.LoadUint64(totalFoundCount)
  568. }
  // shuffle pseudo-randomizes the order of n elements using the global
// math/rand source, calling swap(i, j) to exchange the elements at indexes
// i and j. It panics if n < 0.
//
// Backported from the standard library's rand.Shuffle
// (https://golang.org/src/math/rand/rand.go?s=11175:11215#L333).
// Replace with rand.Shuffle from go 1.10 when go 1.9 support is dropped.
func shuffle(n int, swap func(i, j int)) {
	if n < 0 {
		panic("invalid argument to Shuffle")
	}
	// Fisher-Yates shuffle: walk from the last index down to 1, swapping each
	// position with a uniformly chosen position at or below it.
	// https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
	const int31Max = 1<<31 - 1
	pos := n - 1
	// Positions too large for Int31n's argument fall back to Int63n.
	for pos > int31Max-1 {
		pick := int(rand.Int63n(int64(pos + 1)))
		swap(pos, pick)
		pos--
	}
	for pos > 0 {
		pick := int(rand.Int31n(int32(pos + 1)))
		swap(pos, pick)
		pos--
	}
}