network_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package swarm
  17. import (
  18. "context"
  19. "flag"
  20. "fmt"
  21. "io/ioutil"
  22. "math/rand"
  23. "os"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "testing"
  28. "time"
  29. "github.com/ethereum/go-ethereum/swarm/sctx"
  30. "github.com/ethereum/go-ethereum/swarm/testutil"
  31. "github.com/ethereum/go-ethereum/crypto"
  32. "github.com/ethereum/go-ethereum/log"
  33. "github.com/ethereum/go-ethereum/node"
  34. "github.com/ethereum/go-ethereum/p2p/enode"
  35. "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
  36. "github.com/ethereum/go-ethereum/swarm/api"
  37. "github.com/ethereum/go-ethereum/swarm/network/simulation"
  38. "github.com/ethereum/go-ethereum/swarm/storage"
  39. "github.com/mattn/go-colorable"
  40. )
  // Command-line flags that tune the Swarm network simulation tests.

  // loglevel is the verbosity handed to the log level filter installed in init.
  var loglevel = flag.Int("loglevel", 2, "verbosity of logs")

  // longrunning, when set, adds the cases from longRunningCases to TestSwarmNetwork.
  var longrunning = flag.Bool("longrunning", false, "do run long-running tests")

  // waitKademlia, when set, makes testSwarmNetwork call sim.WaitTillHealthy
  // before it starts checking file availability.
  var waitKademlia = flag.Bool("waitkademlia", false, "wait for healthy kademlia before checking files availability")
  46. func init() {
  47. rand.Seed(time.Now().UnixNano())
  48. flag.Parse()
  49. log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
  50. }
  51. // TestSwarmNetwork runs a series of test simulations with
  52. // static and dynamic Swarm nodes in network simulation, by
  53. // uploading files to every node and retrieving them.
  54. func TestSwarmNetwork(t *testing.T) {
  55. var tests = []testSwarmNetworkCase{
  56. {
  57. name: "10_nodes",
  58. steps: []testSwarmNetworkStep{
  59. {
  60. nodeCount: 10,
  61. },
  62. },
  63. options: &testSwarmNetworkOptions{
  64. Timeout: 45 * time.Second,
  65. },
  66. },
  67. {
  68. name: "10_nodes_skip_check",
  69. steps: []testSwarmNetworkStep{
  70. {
  71. nodeCount: 10,
  72. },
  73. },
  74. options: &testSwarmNetworkOptions{
  75. Timeout: 45 * time.Second,
  76. SkipCheck: true,
  77. },
  78. },
  79. {
  80. name: "dec_inc_node_count",
  81. steps: []testSwarmNetworkStep{
  82. {
  83. nodeCount: 3,
  84. },
  85. {
  86. nodeCount: 1,
  87. },
  88. {
  89. nodeCount: 5,
  90. },
  91. },
  92. options: &testSwarmNetworkOptions{
  93. Timeout: 90 * time.Second,
  94. },
  95. },
  96. }
  97. if *longrunning {
  98. tests = append(tests, longRunningCases()...)
  99. } else if testutil.RaceEnabled {
  100. tests = shortCaseForRace()
  101. }
  102. for _, tc := range tests {
  103. t.Run(tc.name, func(t *testing.T) {
  104. testSwarmNetwork(t, tc.options, tc.steps...)
  105. })
  106. }
  107. }
  108. type testSwarmNetworkCase struct {
  109. name string
  110. steps []testSwarmNetworkStep
  111. options *testSwarmNetworkOptions
  112. }
  // testSwarmNetworkStep describes the desired state of the simulation
  // network for a single step of a test case.
  type testSwarmNetworkStep struct {
  	// nodeCount is how many swarm nodes must be in the Up state
  	// during this step.
  	nodeCount int
  }
  119. // testSwarmNetworkOptions contains optional parameters for running
  120. // testSwarmNetwork.
  121. type testSwarmNetworkOptions struct {
  122. Timeout time.Duration
  123. SkipCheck bool
  124. }
  125. func longRunningCases() []testSwarmNetworkCase {
  126. return []testSwarmNetworkCase{
  127. {
  128. name: "50_nodes",
  129. steps: []testSwarmNetworkStep{
  130. {
  131. nodeCount: 50,
  132. },
  133. },
  134. options: &testSwarmNetworkOptions{
  135. Timeout: 3 * time.Minute,
  136. },
  137. },
  138. {
  139. name: "50_nodes_skip_check",
  140. steps: []testSwarmNetworkStep{
  141. {
  142. nodeCount: 50,
  143. },
  144. },
  145. options: &testSwarmNetworkOptions{
  146. Timeout: 3 * time.Minute,
  147. SkipCheck: true,
  148. },
  149. },
  150. {
  151. name: "inc_node_count",
  152. steps: []testSwarmNetworkStep{
  153. {
  154. nodeCount: 2,
  155. },
  156. {
  157. nodeCount: 5,
  158. },
  159. {
  160. nodeCount: 10,
  161. },
  162. },
  163. options: &testSwarmNetworkOptions{
  164. Timeout: 90 * time.Second,
  165. },
  166. },
  167. {
  168. name: "dec_node_count",
  169. steps: []testSwarmNetworkStep{
  170. {
  171. nodeCount: 10,
  172. },
  173. {
  174. nodeCount: 6,
  175. },
  176. {
  177. nodeCount: 3,
  178. },
  179. },
  180. options: &testSwarmNetworkOptions{
  181. Timeout: 90 * time.Second,
  182. },
  183. },
  184. {
  185. name: "inc_dec_node_count",
  186. steps: []testSwarmNetworkStep{
  187. {
  188. nodeCount: 3,
  189. },
  190. {
  191. nodeCount: 5,
  192. },
  193. {
  194. nodeCount: 25,
  195. },
  196. {
  197. nodeCount: 10,
  198. },
  199. {
  200. nodeCount: 4,
  201. },
  202. },
  203. options: &testSwarmNetworkOptions{
  204. Timeout: 5 * time.Minute,
  205. },
  206. },
  207. {
  208. name: "inc_dec_node_count_skip_check",
  209. steps: []testSwarmNetworkStep{
  210. {
  211. nodeCount: 3,
  212. },
  213. {
  214. nodeCount: 5,
  215. },
  216. {
  217. nodeCount: 25,
  218. },
  219. {
  220. nodeCount: 10,
  221. },
  222. {
  223. nodeCount: 4,
  224. },
  225. },
  226. options: &testSwarmNetworkOptions{
  227. Timeout: 5 * time.Minute,
  228. SkipCheck: true,
  229. },
  230. },
  231. }
  232. }
  233. func shortCaseForRace() []testSwarmNetworkCase {
  234. // As for now, Travis with -race can only run 8 nodes
  235. return []testSwarmNetworkCase{
  236. {
  237. name: "8_nodes",
  238. steps: []testSwarmNetworkStep{
  239. {
  240. nodeCount: 8,
  241. },
  242. },
  243. options: &testSwarmNetworkOptions{
  244. Timeout: 1 * time.Minute,
  245. },
  246. },
  247. }
  248. }
  249. // file represents the file uploaded on a particular node.
  250. type file struct {
  251. addr storage.Address
  252. data string
  253. nodeID enode.ID
  254. }
  255. // check represents a reference to a file that is retrieved
  256. // from a particular node.
  257. type check struct {
  258. key string
  259. nodeID enode.ID
  260. }
  261. // testSwarmNetwork is a helper function used for testing different
  262. // static and dynamic Swarm network simulations.
  263. // It is responsible for:
  264. // - Setting up a Swarm network simulation, and updates the number of nodes within the network on every step according to steps.
  265. // - Uploading a unique file to every node on every step.
  266. // - May wait for Kademlia on every node to be healthy.
  267. // - Checking if a file is retrievable from all nodes.
  268. func testSwarmNetwork(t *testing.T, o *testSwarmNetworkOptions, steps ...testSwarmNetworkStep) {
  269. t.Helper()
  270. if o == nil {
  271. o = new(testSwarmNetworkOptions)
  272. }
  273. sim := simulation.New(map[string]simulation.ServiceFunc{
  274. "swarm": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
  275. config := api.NewConfig()
  276. dir, err := ioutil.TempDir("", "swarm-network-test-node")
  277. if err != nil {
  278. return nil, nil, err
  279. }
  280. cleanup = func() {
  281. err := os.RemoveAll(dir)
  282. if err != nil {
  283. log.Error("cleaning up swarm temp dir", "err", err)
  284. }
  285. }
  286. config.Path = dir
  287. privkey, err := crypto.GenerateKey()
  288. if err != nil {
  289. return nil, cleanup, err
  290. }
  291. nodekey, err := crypto.GenerateKey()
  292. if err != nil {
  293. return nil, cleanup, err
  294. }
  295. config.Init(privkey, nodekey)
  296. config.DeliverySkipCheck = o.SkipCheck
  297. config.Port = ""
  298. swarm, err := NewSwarm(config, nil)
  299. if err != nil {
  300. return nil, cleanup, err
  301. }
  302. bucket.Store(simulation.BucketKeyKademlia, swarm.bzz.Hive.Kademlia)
  303. log.Info("new swarm", "bzzKey", config.BzzKey, "baseAddr", fmt.Sprintf("%x", swarm.bzz.BaseAddr()))
  304. return swarm, cleanup, nil
  305. },
  306. })
  307. defer sim.Close()
  308. ctx := context.Background()
  309. if o.Timeout > 0 {
  310. var cancel context.CancelFunc
  311. ctx, cancel = context.WithTimeout(ctx, o.Timeout)
  312. defer cancel()
  313. }
  314. files := make([]file, 0)
  315. for i, step := range steps {
  316. log.Debug("test sync step", "n", i+1, "nodes", step.nodeCount)
  317. change := step.nodeCount - len(sim.UpNodeIDs())
  318. if change > 0 {
  319. _, err := sim.AddNodesAndConnectChain(change)
  320. if err != nil {
  321. t.Fatal(err)
  322. }
  323. } else if change < 0 {
  324. _, err := sim.StopRandomNodes(-change)
  325. if err != nil {
  326. t.Fatal(err)
  327. }
  328. } else {
  329. t.Logf("step %v: no change in nodes", i)
  330. continue
  331. }
  332. var checkStatusM sync.Map
  333. var nodeStatusM sync.Map
  334. var totalFoundCount uint64
  335. result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
  336. nodeIDs := sim.UpNodeIDs()
  337. rand.Shuffle(len(nodeIDs), func(i, j int) {
  338. nodeIDs[i], nodeIDs[j] = nodeIDs[j], nodeIDs[i]
  339. })
  340. for _, id := range nodeIDs {
  341. key, data, err := uploadFile(sim.Service("swarm", id).(*Swarm))
  342. if err != nil {
  343. return err
  344. }
  345. log.Trace("file uploaded", "node", id, "key", key.String())
  346. files = append(files, file{
  347. addr: key,
  348. data: data,
  349. nodeID: id,
  350. })
  351. }
  352. if *waitKademlia {
  353. if _, err := sim.WaitTillHealthy(ctx); err != nil {
  354. return err
  355. }
  356. }
  357. // File retrieval check is repeated until all uploaded files are retrieved from all nodes
  358. // or until the timeout is reached.
  359. for {
  360. if retrieve(sim, files, &checkStatusM, &nodeStatusM, &totalFoundCount) == 0 {
  361. return nil
  362. }
  363. }
  364. })
  365. if result.Error != nil {
  366. t.Fatal(result.Error)
  367. }
  368. log.Debug("done: test sync step", "n", i+1, "nodes", step.nodeCount)
  369. }
  370. }
  371. // uploadFile, uploads a short file to the swarm instance
  372. // using the api.Put method.
  373. func uploadFile(swarm *Swarm) (storage.Address, string, error) {
  374. b := make([]byte, 8)
  375. _, err := rand.Read(b)
  376. if err != nil {
  377. return nil, "", err
  378. }
  379. // File data is very short, but it is ensured that its
  380. // uniqueness is very certain.
  381. data := fmt.Sprintf("test content %s %x", time.Now().Round(0), b)
  382. ctx := context.TODO()
  383. k, wait, err := putString(ctx, swarm.api, data, "text/plain", false)
  384. if err != nil {
  385. return nil, "", err
  386. }
  387. if wait != nil {
  388. err = wait(ctx)
  389. }
  390. return k, data, err
  391. }
  392. // retrieve is the function that is used for checking the availability of
  393. // uploaded files in testSwarmNetwork test helper function.
  394. func retrieve(
  395. sim *simulation.Simulation,
  396. files []file,
  397. checkStatusM *sync.Map,
  398. nodeStatusM *sync.Map,
  399. totalFoundCount *uint64,
  400. ) (missing uint64) {
  401. rand.Shuffle(len(files), func(i, j int) {
  402. files[i], files[j] = files[j], files[i]
  403. })
  404. var totalWg sync.WaitGroup
  405. errc := make(chan error)
  406. nodeIDs := sim.UpNodeIDs()
  407. totalCheckCount := len(nodeIDs) * len(files)
  408. for _, id := range nodeIDs {
  409. if _, ok := nodeStatusM.Load(id); ok {
  410. continue
  411. }
  412. start := time.Now()
  413. var checkCount uint64
  414. var foundCount uint64
  415. totalWg.Add(1)
  416. var wg sync.WaitGroup
  417. swarm := sim.Service("swarm", id).(*Swarm)
  418. for _, f := range files {
  419. checkKey := check{
  420. key: f.addr.String(),
  421. nodeID: id,
  422. }
  423. if n, ok := checkStatusM.Load(checkKey); ok && n.(int) == 0 {
  424. continue
  425. }
  426. checkCount++
  427. wg.Add(1)
  428. go func(f file, id enode.ID) {
  429. defer wg.Done()
  430. log.Debug("api get: check file", "node", id.String(), "key", f.addr.String(), "total files found", atomic.LoadUint64(totalFoundCount))
  431. r, _, _, _, err := swarm.api.Get(context.TODO(), api.NOOPDecrypt, f.addr, "/")
  432. if err != nil {
  433. errc <- fmt.Errorf("api get: node %s, key %s, kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err)
  434. return
  435. }
  436. d, err := ioutil.ReadAll(r)
  437. if err != nil {
  438. errc <- fmt.Errorf("api get: read response: node %s, key %s: kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err)
  439. return
  440. }
  441. data := string(d)
  442. if data != f.data {
  443. errc <- fmt.Errorf("file contend missmatch: node %s, key %s, expected %q, got %q", id, f.addr, f.data, data)
  444. return
  445. }
  446. checkStatusM.Store(checkKey, 0)
  447. atomic.AddUint64(&foundCount, 1)
  448. log.Info("api get: file found", "node", id.String(), "key", f.addr.String(), "content", data, "files found", atomic.LoadUint64(&foundCount))
  449. }(f, id)
  450. }
  451. go func(id enode.ID) {
  452. defer totalWg.Done()
  453. wg.Wait()
  454. atomic.AddUint64(totalFoundCount, foundCount)
  455. if foundCount == checkCount {
  456. log.Info("all files are found for node", "id", id.String(), "duration", time.Since(start))
  457. nodeStatusM.Store(id, 0)
  458. return
  459. }
  460. log.Debug("files missing for node", "id", id.String(), "check", checkCount, "found", foundCount)
  461. }(id)
  462. }
  463. go func() {
  464. totalWg.Wait()
  465. close(errc)
  466. }()
  467. var errCount int
  468. for err := range errc {
  469. if err != nil {
  470. errCount++
  471. }
  472. log.Warn(err.Error())
  473. }
  474. log.Info("check stats", "total check count", totalCheckCount, "total files found", atomic.LoadUint64(totalFoundCount), "total errors", errCount)
  475. return uint64(totalCheckCount) - atomic.LoadUint64(totalFoundCount)
  476. }
  477. // putString provides singleton manifest creation on top of api.API
  478. func putString(ctx context.Context, a *api.API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
  479. r := strings.NewReader(content)
  480. tag, err := a.Tags.New("unnamed-tag", 0)
  481. log.Trace("created new tag", "uid", tag.Uid)
  482. cCtx := sctx.SetTag(ctx, tag.Uid)
  483. key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt)
  484. if err != nil {
  485. return nil, nil, err
  486. }
  487. manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
  488. r = strings.NewReader(manifest)
  489. key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
  490. if err != nil {
  491. return nil, nil, err
  492. }
  493. tag.DoneSplit(key)
  494. return key, func(ctx context.Context) error {
  495. err := waitContent(ctx)
  496. if err != nil {
  497. return err
  498. }
  499. return waitManifest(ctx)
  500. }, nil
  501. }