|
|
@@ -234,3 +234,170 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
|
|
|
t.Fatal(result.Error)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+//TestSameVersionID just checks that if the version is not changed,
|
|
|
+//then streamer peers see each other
|
|
|
+func TestSameVersionID(t *testing.T) {
|
|
|
+ //test version ID
|
|
|
+ v := uint(1)
|
|
|
+ sim := simulation.New(map[string]simulation.ServiceFunc{
|
|
|
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
|
|
|
+ var store storage.ChunkStore
|
|
|
+ var datadir string
|
|
|
+
|
|
|
+ node := ctx.Config.Node()
|
|
|
+ addr := network.NewAddr(node)
|
|
|
+
|
|
|
+ store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, err
|
|
|
+ }
|
|
|
+ bucket.Store(bucketKeyStore, store)
|
|
|
+ cleanup = func() {
|
|
|
+ store.Close()
|
|
|
+ os.RemoveAll(datadir)
|
|
|
+ }
|
|
|
+ localStore := store.(*storage.LocalStore)
|
|
|
+ netStore, err := storage.NewNetStore(localStore, nil)
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, err
|
|
|
+ }
|
|
|
+ bucket.Store(bucketKeyDB, netStore)
|
|
|
+ kad := network.NewKademlia(addr.Over(), network.NewKadParams())
|
|
|
+ delivery := NewDelivery(kad, netStore)
|
|
|
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
|
|
|
+
|
|
|
+ bucket.Store(bucketKeyDelivery, delivery)
|
|
|
+
|
|
|
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
|
|
|
+ Retrieval: RetrievalDisabled,
|
|
|
+ Syncing: SyncingAutoSubscribe,
|
|
|
+ }, nil)
|
|
|
+ //assign to each node the same version ID
|
|
|
+ r.spec.Version = v
|
|
|
+
|
|
|
+ bucket.Store(bucketKeyRegistry, r)
|
|
|
+
|
|
|
+ return r, cleanup, nil
|
|
|
+
|
|
|
+ },
|
|
|
+ })
|
|
|
+ defer sim.Close()
|
|
|
+
|
|
|
+ //connect just two nodes
|
|
|
+ log.Info("Adding nodes to simulation")
|
|
|
+ _, err := sim.AddNodesAndConnectChain(2)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Info("Starting simulation")
|
|
|
+ ctx := context.Background()
|
|
|
+ //make sure they have time to connect
|
|
|
+ time.Sleep(200 * time.Millisecond)
|
|
|
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
|
|
|
+ //get the pivot node's filestore
|
|
|
+ nodes := sim.UpNodeIDs()
|
|
|
+
|
|
|
+ item, ok := sim.NodeItem(nodes[0], bucketKeyRegistry)
|
|
|
+ if !ok {
|
|
|
+ return fmt.Errorf("No filestore")
|
|
|
+ }
|
|
|
+ registry := item.(*Registry)
|
|
|
+
|
|
|
+ //the peers should connect, thus getting the peer should not return nil
|
|
|
+ if registry.getPeer(nodes[1]) == nil {
|
|
|
+ t.Fatal("Expected the peer to not be nil, but it is")
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+ if result.Error != nil {
|
|
|
+ t.Fatal(result.Error)
|
|
|
+ }
|
|
|
+ log.Info("Simulation ended")
|
|
|
+}
|
|
|
+
|
|
|
+//TestDifferentVersionID proves that if the streamer protocol version doesn't match,
|
|
|
+//then the peers are not connected at streamer level
|
|
|
+func TestDifferentVersionID(t *testing.T) {
|
|
|
+ //create a variable to hold the version ID
|
|
|
+ v := uint(0)
|
|
|
+ sim := simulation.New(map[string]simulation.ServiceFunc{
|
|
|
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
|
|
|
+ var store storage.ChunkStore
|
|
|
+ var datadir string
|
|
|
+
|
|
|
+ node := ctx.Config.Node()
|
|
|
+ addr := network.NewAddr(node)
|
|
|
+
|
|
|
+ store, datadir, err = createTestLocalStorageForID(node.ID(), addr)
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, err
|
|
|
+ }
|
|
|
+ bucket.Store(bucketKeyStore, store)
|
|
|
+ cleanup = func() {
|
|
|
+ store.Close()
|
|
|
+ os.RemoveAll(datadir)
|
|
|
+ }
|
|
|
+ localStore := store.(*storage.LocalStore)
|
|
|
+ netStore, err := storage.NewNetStore(localStore, nil)
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, err
|
|
|
+ }
|
|
|
+ bucket.Store(bucketKeyDB, netStore)
|
|
|
+ kad := network.NewKademlia(addr.Over(), network.NewKadParams())
|
|
|
+ delivery := NewDelivery(kad, netStore)
|
|
|
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
|
|
|
+
|
|
|
+ bucket.Store(bucketKeyDelivery, delivery)
|
|
|
+
|
|
|
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
|
|
|
+ Retrieval: RetrievalDisabled,
|
|
|
+ Syncing: SyncingAutoSubscribe,
|
|
|
+ }, nil)
|
|
|
+
|
|
|
+ //increase the version ID for each node
|
|
|
+ v++
|
|
|
+ r.spec.Version = v
|
|
|
+
|
|
|
+ bucket.Store(bucketKeyRegistry, r)
|
|
|
+
|
|
|
+ return r, cleanup, nil
|
|
|
+
|
|
|
+ },
|
|
|
+ })
|
|
|
+ defer sim.Close()
|
|
|
+
|
|
|
+ //connect the nodes
|
|
|
+ log.Info("Adding nodes to simulation")
|
|
|
+ _, err := sim.AddNodesAndConnectChain(2)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Info("Starting simulation")
|
|
|
+ ctx := context.Background()
|
|
|
+ //make sure they have time to connect
|
|
|
+ time.Sleep(200 * time.Millisecond)
|
|
|
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
|
|
|
+ //get the pivot node's filestore
|
|
|
+ nodes := sim.UpNodeIDs()
|
|
|
+
|
|
|
+ item, ok := sim.NodeItem(nodes[0], bucketKeyRegistry)
|
|
|
+ if !ok {
|
|
|
+ return fmt.Errorf("No filestore")
|
|
|
+ }
|
|
|
+ registry := item.(*Registry)
|
|
|
+
|
|
|
+ //getting the other peer should fail due to the different version numbers
|
|
|
+ if registry.getPeer(nodes[1]) != nil {
|
|
|
+ t.Fatal("Expected the peer to be nil, but it is not")
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+ if result.Error != nil {
|
|
|
+ t.Fatal(result.Error)
|
|
|
+ }
|
|
|
+ log.Info("Simulation ended")
|
|
|
+
|
|
|
+}
|