| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657 |
- // Copyright 2018 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package swarm
- import (
- "context"
- "flag"
- "fmt"
- "io/ioutil"
- "math/rand"
- "os"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/node"
- "github.com/ethereum/go-ethereum/p2p/discover"
- "github.com/ethereum/go-ethereum/p2p/simulations"
- "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
- "github.com/ethereum/go-ethereum/swarm/api"
- "github.com/ethereum/go-ethereum/swarm/network"
- "github.com/ethereum/go-ethereum/swarm/storage"
- colorable "github.com/mattn/go-colorable"
- )
- var (
- loglevel = flag.Int("loglevel", 2, "verbosity of logs")
- longrunning = flag.Bool("longrunning", false, "do run long-running tests")
- waitKademlia = flag.Bool("waitkademlia", false, "wait for healthy kademlia before checking files availability")
- )
- func init() {
- rand.Seed(time.Now().UnixNano())
- flag.Parse()
- log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
- }
- // TestSwarmNetwork runs a series of test simulations with
- // static and dynamic Swarm nodes in network simulation, by
- // uploading files to every node and retrieving them.
- func TestSwarmNetwork(t *testing.T) {
- for _, tc := range []struct {
- name string
- steps []testSwarmNetworkStep
- options *testSwarmNetworkOptions
- disabled bool
- }{
- {
- name: "10_nodes",
- steps: []testSwarmNetworkStep{
- {
- nodeCount: 10,
- },
- },
- options: &testSwarmNetworkOptions{
- Timeout: 45 * time.Second,
- },
- },
- {
- name: "10_nodes_skip_check",
- steps: []testSwarmNetworkStep{
- {
- nodeCount: 10,
- },
- },
- options: &testSwarmNetworkOptions{
- Timeout: 45 * time.Second,
- SkipCheck: true,
- },
- },
- {
- name: "100_nodes",
- steps: []testSwarmNetworkStep{
- {
- nodeCount: 100,
- },
- },
- options: &testSwarmNetworkOptions{
- Timeout: 3 * time.Minute,
- },
- disabled: !*longrunning,
- },
- {
- name: "100_nodes_skip_check",
- steps: []testSwarmNetworkStep{
- {
- nodeCount: 100,
- },
- },
- options: &testSwarmNetworkOptions{
- Timeout: 3 * time.Minute,
- SkipCheck: true,
- },
- disabled: !*longrunning,
- },
- {
- name: "inc_node_count",
- steps: []testSwarmNetworkStep{
- {
- nodeCount: 2,
- },
- {
- nodeCount: 5,
- },
- {
- nodeCount: 10,
- },
- },
- options: &testSwarmNetworkOptions{
- Timeout: 90 * time.Second,
- },
- disabled: !*longrunning,
- },
- {
- name: "dec_node_count",
- steps: []testSwarmNetworkStep{
- {
- nodeCount: 10,
- },
- {
- nodeCount: 6,
- },
- {
- nodeCount: 3,
- },
- },
- options: &testSwarmNetworkOptions{
- Timeout: 90 * time.Second,
- },
- disabled: !*longrunning,
- },
- {
- name: "dec_inc_node_count",
- steps: []testSwarmNetworkStep{
- {
- nodeCount: 5,
- },
- {
- nodeCount: 3,
- },
- {
- nodeCount: 10,
- },
- },
- options: &testSwarmNetworkOptions{
- Timeout: 90 * time.Second,
- },
- },
- {
- name: "inc_dec_node_count",
- steps: []testSwarmNetworkStep{
- {
- nodeCount: 3,
- },
- {
- nodeCount: 5,
- },
- {
- nodeCount: 25,
- },
- {
- nodeCount: 10,
- },
- {
- nodeCount: 4,
- },
- },
- options: &testSwarmNetworkOptions{
- Timeout: 5 * time.Minute,
- },
- disabled: !*longrunning,
- },
- {
- name: "inc_dec_node_count_skip_check",
- steps: []testSwarmNetworkStep{
- {
- nodeCount: 3,
- },
- {
- nodeCount: 5,
- },
- {
- nodeCount: 25,
- },
- {
- nodeCount: 10,
- },
- {
- nodeCount: 4,
- },
- },
- options: &testSwarmNetworkOptions{
- Timeout: 5 * time.Minute,
- SkipCheck: true,
- },
- disabled: !*longrunning,
- },
- } {
- if tc.disabled {
- continue
- }
- t.Run(tc.name, func(t *testing.T) {
- testSwarmNetwork(t, tc.options, tc.steps...)
- })
- }
- }
- // testSwarmNetworkStep is the configuration
- // for the state of the simulation network.
- type testSwarmNetworkStep struct {
- // number of swarm nodes that must be in the Up state
- nodeCount int
- }
- // file represents the file uploaded on a particular node.
- type file struct {
- addr storage.Address
- data string
- nodeID discover.NodeID
- }
- // check represents a reference to a file that is retrieved
- // from a particular node.
- type check struct {
- key string
- nodeID discover.NodeID
- }
- // testSwarmNetworkOptions contains optional parameters for running
- // testSwarmNetwork.
- type testSwarmNetworkOptions struct {
- Timeout time.Duration
- SkipCheck bool
- }
- // testSwarmNetwork is a helper function used for testing different
- // static and dynamic Swarm network simulations.
- // It is responsible for:
- // - Setting up a Swarm network simulation, and updates the number of nodes within the network on every step according to steps.
- // - Uploading a unique file to every node on every step.
- // - May wait for Kademlia on every node to be healthy.
- // - Checking if a file is retrievable from all nodes.
- func testSwarmNetwork(t *testing.T, o *testSwarmNetworkOptions, steps ...testSwarmNetworkStep) {
- dir, err := ioutil.TempDir("", "swarm-network-test")
- if err != nil {
- t.Fatal(err)
- }
- defer os.RemoveAll(dir)
- if o == nil {
- o = new(testSwarmNetworkOptions)
- }
- ctx := context.Background()
- if o.Timeout > 0 {
- var cancel context.CancelFunc
- ctx, cancel = context.WithTimeout(ctx, o.Timeout)
- defer cancel()
- }
- swarms := make(map[discover.NodeID]*Swarm)
- files := make([]file, 0)
- services := map[string]adapters.ServiceFunc{
- "swarm": func(ctx *adapters.ServiceContext) (node.Service, error) {
- config := api.NewConfig()
- dir, err := ioutil.TempDir(dir, "node")
- if err != nil {
- return nil, err
- }
- config.Path = dir
- privkey, err := crypto.GenerateKey()
- if err != nil {
- return nil, err
- }
- config.Init(privkey)
- config.DeliverySkipCheck = o.SkipCheck
- s, err := NewSwarm(config, nil)
- if err != nil {
- return nil, err
- }
- log.Info("new swarm", "bzzKey", config.BzzKey, "baseAddr", fmt.Sprintf("%x", s.bzz.BaseAddr()))
- swarms[ctx.Config.ID] = s
- return s, nil
- },
- }
- a := adapters.NewSimAdapter(services)
- net := simulations.NewNetwork(a, &simulations.NetworkConfig{
- ID: "0",
- DefaultService: "swarm",
- })
- defer net.Shutdown()
- trigger := make(chan discover.NodeID)
- sim := simulations.NewSimulation(net)
- for i, step := range steps {
- log.Debug("test sync step", "n", i+1, "nodes", step.nodeCount)
- change := step.nodeCount - len(allNodeIDs(net))
- if change > 0 {
- _, err := addNodes(change, net)
- if err != nil {
- t.Fatal(err)
- }
- } else if change < 0 {
- err := removeNodes(-change, net)
- if err != nil {
- t.Fatal(err)
- }
- } else {
- t.Logf("step %v: no change in nodes", i)
- continue
- }
- nodeIDs := allNodeIDs(net)
- shuffle(len(nodeIDs), func(i, j int) {
- nodeIDs[i], nodeIDs[j] = nodeIDs[j], nodeIDs[i]
- })
- for _, id := range nodeIDs {
- key, data, err := uploadFile(swarms[id])
- if err != nil {
- t.Fatal(err)
- }
- log.Trace("file uploaded", "node", id, "key", key.String())
- files = append(files, file{
- addr: key,
- data: data,
- nodeID: id,
- })
- }
- // Prepare PeerPot map for checking Kademlia health
- var ppmap map[string]*network.PeerPot
- nIDs := allNodeIDs(net)
- addrs := make([][]byte, len(nIDs))
- if *waitKademlia {
- for i, id := range nIDs {
- addrs[i] = swarms[id].bzz.BaseAddr()
- }
- ppmap = network.NewPeerPotMap(2, addrs)
- }
- var checkStatusM sync.Map
- var nodeStatusM sync.Map
- var totalFoundCount uint64
- result := sim.Run(ctx, &simulations.Step{
- Action: func(ctx context.Context) error {
- if *waitKademlia {
- // Wait for healthy Kademlia on every node before checking files
- ticker := time.NewTicker(200 * time.Millisecond)
- defer ticker.Stop()
- for range ticker.C {
- healthy := true
- log.Debug("kademlia health check", "node count", len(nIDs), "addr count", len(addrs))
- for i, id := range nIDs {
- swarm := swarms[id]
- //PeerPot for this node
- addr := common.Bytes2Hex(swarm.bzz.BaseAddr())
- pp := ppmap[addr]
- //call Healthy RPC
- h := swarm.bzz.Healthy(pp)
- //print info
- log.Debug(swarm.bzz.String())
- log.Debug("kademlia", "empty bins", pp.EmptyBins, "gotNN", h.GotNN, "knowNN", h.KnowNN, "full", h.Full)
- log.Debug("kademlia", "health", h.GotNN && h.KnowNN && h.Full, "addr", fmt.Sprintf("%x", swarm.bzz.BaseAddr()), "id", id, "i", i)
- log.Debug("kademlia", "ill condition", !h.GotNN || !h.Full, "addr", fmt.Sprintf("%x", swarm.bzz.BaseAddr()), "id", id, "i", i)
- if !h.GotNN || !h.Full {
- healthy = false
- break
- }
- }
- if healthy {
- break
- }
- }
- }
- go func() {
- // File retrieval check is repeated until all uploaded files are retrieved from all nodes
- // or until the timeout is reached.
- for {
- if retrieve(net, files, swarms, trigger, &checkStatusM, &nodeStatusM, &totalFoundCount) == 0 {
- return
- }
- }
- }()
- return nil
- },
- Trigger: trigger,
- Expect: &simulations.Expectation{
- Nodes: allNodeIDs(net),
- Check: func(ctx context.Context, id discover.NodeID) (bool, error) {
- // The check is done by a goroutine in the action function.
- return true, nil
- },
- },
- })
- if result.Error != nil {
- t.Fatal(result.Error)
- }
- log.Debug("done: test sync step", "n", i+1, "nodes", step.nodeCount)
- }
- }
- // allNodeIDs is returning NodeID for every node that is Up.
- func allNodeIDs(net *simulations.Network) (nodes []discover.NodeID) {
- for _, n := range net.GetNodes() {
- if n.Up {
- nodes = append(nodes, n.ID())
- }
- }
- return
- }
- // addNodes adds a number of nodes to the network.
- func addNodes(count int, net *simulations.Network) (ids []discover.NodeID, err error) {
- for i := 0; i < count; i++ {
- nodeIDs := allNodeIDs(net)
- l := len(nodeIDs)
- nodeconf := adapters.RandomNodeConfig()
- node, err := net.NewNodeWithConfig(nodeconf)
- if err != nil {
- return nil, fmt.Errorf("create node: %v", err)
- }
- err = net.Start(node.ID())
- if err != nil {
- return nil, fmt.Errorf("start node: %v", err)
- }
- log.Debug("created node", "id", node.ID())
- // connect nodes in a chain
- if l > 0 {
- var otherNodeID discover.NodeID
- for i := l - 1; i >= 0; i-- {
- n := net.GetNode(nodeIDs[i])
- if n.Up {
- otherNodeID = n.ID()
- break
- }
- }
- log.Debug("connect nodes", "one", node.ID(), "other", otherNodeID)
- if err := net.Connect(node.ID(), otherNodeID); err != nil {
- return nil, err
- }
- }
- ids = append(ids, node.ID())
- }
- return ids, nil
- }
- // removeNodes stops a random nodes in the network.
- func removeNodes(count int, net *simulations.Network) error {
- for i := 0; i < count; i++ {
- // allNodeIDs are returning only the Up nodes.
- nodeIDs := allNodeIDs(net)
- if len(nodeIDs) == 0 {
- break
- }
- node := net.GetNode(nodeIDs[rand.Intn(len(nodeIDs))])
- if err := node.Stop(); err != nil {
- return err
- }
- log.Debug("removed node", "id", node.ID())
- }
- return nil
- }
- // uploadFile, uploads a short file to the swarm instance
- // using the api.Put method.
- func uploadFile(swarm *Swarm) (storage.Address, string, error) {
- b := make([]byte, 8)
- _, err := rand.Read(b)
- if err != nil {
- return nil, "", err
- }
- // File data is very short, but it is ensured that its
- // uniqueness is very certain.
- data := fmt.Sprintf("test content %s %x", time.Now().Round(0), b)
- ctx := context.TODO()
- k, wait, err := swarm.api.Put(ctx, data, "text/plain", false)
- if err != nil {
- return nil, "", err
- }
- if wait != nil {
- err = wait(ctx)
- }
- return k, data, err
- }
- // retrieve is the function that is used for checking the availability of
- // uploaded files in testSwarmNetwork test helper function.
- func retrieve(
- net *simulations.Network,
- files []file,
- swarms map[discover.NodeID]*Swarm,
- trigger chan discover.NodeID,
- checkStatusM *sync.Map,
- nodeStatusM *sync.Map,
- totalFoundCount *uint64,
- ) (missing uint64) {
- shuffle(len(files), func(i, j int) {
- files[i], files[j] = files[j], files[i]
- })
- var totalWg sync.WaitGroup
- errc := make(chan error)
- nodeIDs := allNodeIDs(net)
- totalCheckCount := len(nodeIDs) * len(files)
- for _, id := range nodeIDs {
- if _, ok := nodeStatusM.Load(id); ok {
- continue
- }
- start := time.Now()
- var checkCount uint64
- var foundCount uint64
- totalWg.Add(1)
- var wg sync.WaitGroup
- for _, f := range files {
- swarm := swarms[id]
- checkKey := check{
- key: f.addr.String(),
- nodeID: id,
- }
- if n, ok := checkStatusM.Load(checkKey); ok && n.(int) == 0 {
- continue
- }
- checkCount++
- wg.Add(1)
- go func(f file, id discover.NodeID) {
- defer wg.Done()
- log.Debug("api get: check file", "node", id.String(), "key", f.addr.String(), "total files found", atomic.LoadUint64(totalFoundCount))
- r, _, _, _, err := swarm.api.Get(context.TODO(), f.addr, "/")
- if err != nil {
- errc <- fmt.Errorf("api get: node %s, key %s, kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err)
- return
- }
- d, err := ioutil.ReadAll(r)
- if err != nil {
- errc <- fmt.Errorf("api get: read response: node %s, key %s: kademlia %s: %v", id, f.addr, swarm.bzz.Hive, err)
- return
- }
- data := string(d)
- if data != f.data {
- errc <- fmt.Errorf("file contend missmatch: node %s, key %s, expected %q, got %q", id, f.addr, f.data, data)
- return
- }
- checkStatusM.Store(checkKey, 0)
- atomic.AddUint64(&foundCount, 1)
- log.Info("api get: file found", "node", id.String(), "key", f.addr.String(), "content", data, "files found", atomic.LoadUint64(&foundCount))
- }(f, id)
- }
- go func(id discover.NodeID) {
- defer totalWg.Done()
- wg.Wait()
- atomic.AddUint64(totalFoundCount, foundCount)
- if foundCount == checkCount {
- log.Info("all files are found for node", "id", id.String(), "duration", time.Since(start))
- nodeStatusM.Store(id, 0)
- trigger <- id
- return
- }
- log.Debug("files missing for node", "id", id.String(), "check", checkCount, "found", foundCount)
- }(id)
- }
- go func() {
- totalWg.Wait()
- close(errc)
- }()
- var errCount int
- for err := range errc {
- if err != nil {
- errCount++
- }
- log.Warn(err.Error())
- }
- log.Info("check stats", "total check count", totalCheckCount, "total files found", atomic.LoadUint64(totalFoundCount), "total errors", errCount)
- return uint64(totalCheckCount) - atomic.LoadUint64(totalFoundCount)
- }
- // Backported from stdlib https://golang.org/src/math/rand/rand.go?s=11175:11215#L333
- //
- // Replace with rand.Shuffle from go 1.10 when go 1.9 support is dropped.
- //
- // shuffle pseudo-randomizes the order of elements.
- // n is the number of elements. Shuffle panics if n < 0.
- // swap swaps the elements with indexes i and j.
- func shuffle(n int, swap func(i, j int)) {
- if n < 0 {
- panic("invalid argument to Shuffle")
- }
- // Fisher-Yates shuffle: https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
- // Shuffle really ought not be called with n that doesn't fit in 32 bits.
- // Not only will it take a very long time, but with 2³¹! possible permutations,
- // there's no way that any PRNG can have a big enough internal state to
- // generate even a minuscule percentage of the possible permutations.
- // Nevertheless, the right API signature accepts an int n, so handle it as best we can.
- i := n - 1
- for ; i > 1<<31-1-1; i-- {
- j := int(rand.Int63n(int64(i + 1)))
- swap(i, j)
- }
- for ; i > 0; i-- {
- j := int(rand.Int31n(int32(i + 1)))
- swap(i, j)
- }
- }
|