瀏覽代碼

swarm/storage/netstore: add fetcher cancellation on shutdown (#19049)

swarm/network/stream: remove netstore internal wg
swarm/network/stream: run individual tests with t.Run
Elad 6 年之前
父節點
當前提交
3ee09ba035
共有 3 個文件被更改,包括 144 次插入120 次删除
  1. 116 113
      swarm/network/stream/delivery_test.go
  2. 9 6
      swarm/network/stream/snapshot_retrieval_test.go
  3. 19 1
      swarm/storage/netstore.go

+ 116 - 113
swarm/network/stream/delivery_test.go

@@ -453,133 +453,136 @@ func TestDeliveryFromNodes(t *testing.T) {
 }
 
 func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
-	sim := simulation.New(map[string]simulation.ServiceFunc{
-		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
-			if err != nil {
-				return nil, nil, err
-			}
-
-			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-				SkipCheck: skipCheck,
-				Syncing:   SyncingDisabled,
-				Retrieval: RetrievalEnabled,
-			}, nil)
-			bucket.Store(bucketKeyRegistry, r)
+	t.Helper()
+	t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) {
+		sim := simulation.New(map[string]simulation.ServiceFunc{
+			"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+				addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
+				if err != nil {
+					return nil, nil, err
+				}
 
-			cleanup = func() {
-				r.Close()
-				clean()
-			}
+				r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+					SkipCheck: skipCheck,
+					Syncing:   SyncingDisabled,
+					Retrieval: RetrievalEnabled,
+				}, nil)
+				bucket.Store(bucketKeyRegistry, r)
 
-			return r, cleanup, nil
-		},
-	})
-	defer sim.Close()
+				cleanup = func() {
+					r.Close()
+					clean()
+				}
 
-	log.Info("Adding nodes to simulation")
-	_, err := sim.AddNodesAndConnectChain(nodes)
-	if err != nil {
-		t.Fatal(err)
-	}
+				return r, cleanup, nil
+			},
+		})
+		defer sim.Close()
 
-	log.Info("Starting simulation")
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
-		nodeIDs := sim.UpNodeIDs()
-		//determine the pivot node to be the first node of the simulation
-		pivot := nodeIDs[0]
-
-		//distribute chunks of a random file into Stores of nodes 1 to nodes
-		//we will do this by creating a file store with an underlying round-robin store:
-		//the file store will create a hash for the uploaded file, but every chunk will be
-		//distributed to different nodes via round-robin scheduling
-		log.Debug("Writing file to round-robin file store")
-		//to do this, we create an array for chunkstores (length minus one, the pivot node)
-		stores := make([]storage.ChunkStore, len(nodeIDs)-1)
-		//we then need to get all stores from the sim....
-		lStores := sim.NodesItems(bucketKeyStore)
-		i := 0
-		//...iterate the buckets...
-		for id, bucketVal := range lStores {
-			//...and remove the one which is the pivot node
-			if id == pivot {
-				continue
-			}
-			//the other ones are added to the array...
-			stores[i] = bucketVal.(storage.ChunkStore)
-			i++
-		}
-		//...which then gets passed to the round-robin file store
-		roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
-		//now we can actually upload a (random) file to the round-robin store
-		size := chunkCount * chunkSize
-		log.Debug("Storing data to file store")
-		fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
-		// wait until all chunks stored
+		log.Info("Adding nodes to simulation")
+		_, err := sim.AddNodesAndConnectChain(nodes)
 		if err != nil {
-			return err
-		}
-		err = wait(ctx)
-		if err != nil {
-			return err
+			t.Fatal(err)
 		}
 
-		log.Debug("Waiting for kademlia")
-		// TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
-		if _, err := sim.WaitTillHealthy(ctx); err != nil {
-			return err
-		}
+		log.Info("Starting simulation")
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+			nodeIDs := sim.UpNodeIDs()
+			//determine the pivot node to be the first node of the simulation
+			pivot := nodeIDs[0]
+
+			//distribute chunks of a random file into Stores of nodes 1 to nodes
+			//we will do this by creating a file store with an underlying round-robin store:
+			//the file store will create a hash for the uploaded file, but every chunk will be
+			//distributed to different nodes via round-robin scheduling
+			log.Debug("Writing file to round-robin file store")
+			//to do this, we create an array for chunkstores (length minus one, the pivot node)
+			stores := make([]storage.ChunkStore, len(nodeIDs)-1)
+			//we then need to get all stores from the sim....
+			lStores := sim.NodesItems(bucketKeyStore)
+			i := 0
+			//...iterate the buckets...
+			for id, bucketVal := range lStores {
+				//...and remove the one which is the pivot node
+				if id == pivot {
+					continue
+				}
+				//the other ones are added to the array...
+				stores[i] = bucketVal.(storage.ChunkStore)
+				i++
+			}
+			//...which then gets passed to the round-robin file store
+			roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams())
+			//now we can actually upload a (random) file to the round-robin store
+			size := chunkCount * chunkSize
+			log.Debug("Storing data to file store")
+			fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
+			// wait until all chunks stored
+			if err != nil {
+				return err
+			}
+			err = wait(ctx)
+			if err != nil {
+				return err
+			}
 
-		//get the pivot node's filestore
-		item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
-		if !ok {
-			return fmt.Errorf("No filestore")
-		}
-		pivotFileStore := item.(*storage.FileStore)
-		log.Debug("Starting retrieval routine")
-		retErrC := make(chan error)
-		go func() {
-			// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
-			// we must wait for the peer connections to have started before requesting
-			n, err := readAll(pivotFileStore, fileHash)
-			log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
-			retErrC <- err
-		}()
+			log.Debug("Waiting for kademlia")
+			// TODO this does not seem to be correct usage of the function, as the simulation may have no kademlias
+			if _, err := sim.WaitTillHealthy(ctx); err != nil {
+				return err
+			}
 
-		disconnected := watchDisconnections(ctx, sim)
-		defer func() {
-			if err != nil && disconnected.bool() {
-				err = errors.New("disconnect events received")
+			//get the pivot node's filestore
+			item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
+			if !ok {
+				return fmt.Errorf("No filestore")
 			}
-		}()
+			pivotFileStore := item.(*storage.FileStore)
+			log.Debug("Starting retrieval routine")
+			retErrC := make(chan error)
+			go func() {
+				// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
+				// we must wait for the peer connections to have started before requesting
+				n, err := readAll(pivotFileStore, fileHash)
+				log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
+				retErrC <- err
+			}()
+
+			disconnected := watchDisconnections(ctx, sim)
+			defer func() {
+				if err != nil && disconnected.bool() {
+					err = errors.New("disconnect events received")
+				}
+			}()
 
-		//finally check that the pivot node gets all chunks via the root hash
-		log.Debug("Check retrieval")
-		success := true
-		var total int64
-		total, err = readAll(pivotFileStore, fileHash)
-		if err != nil {
-			return err
-		}
-		log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
-		if err != nil || total != int64(size) {
-			success = false
-		}
+			//finally check that the pivot node gets all chunks via the root hash
+			log.Debug("Check retrieval")
+			success := true
+			var total int64
+			total, err = readAll(pivotFileStore, fileHash)
+			if err != nil {
+				return err
+			}
+			log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
+			if err != nil || total != int64(size) {
+				success = false
+			}
 
-		if !success {
-			return fmt.Errorf("Test failed, chunks not available on all nodes")
-		}
-		if err := <-retErrC; err != nil {
-			return fmt.Errorf("requesting chunks: %v", err)
+			if !success {
+				return fmt.Errorf("Test failed, chunks not available on all nodes")
+			}
+			if err := <-retErrC; err != nil {
+				return fmt.Errorf("requesting chunks: %v", err)
+			}
+			log.Debug("Test terminated successfully")
+			return nil
+		})
+		if result.Error != nil {
+			t.Fatal(result.Error)
 		}
-		log.Debug("Test terminated successfully")
-		return nil
 	})
-	if result.Error != nil {
-		t.Fatal(result.Error)
-	}
 }
 
 func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {

+ 9 - 6
swarm/network/stream/snapshot_retrieval_test.go

@@ -74,7 +74,7 @@ func TestRetrieval(t *testing.T) {
 	//if nodes/chunks have been provided via commandline,
 	//run the tests with these values
 	if *nodes != 0 && *chunks != 0 {
-		err := runRetrievalTest(*chunks, *nodes)
+		err := runRetrievalTest(t, *chunks, *nodes)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -93,10 +93,12 @@ func TestRetrieval(t *testing.T) {
 		}
 		for _, n := range nodeCnt {
 			for _, c := range chnkCnt {
-				err := runRetrievalTest(c, n)
-				if err != nil {
-					t.Fatal(err)
-				}
+				t.Run(fmt.Sprintf("TestRetrieval_%d_%d", n, c), func(t *testing.T) {
+					err := runRetrievalTest(t, c, n)
+					if err != nil {
+						t.Fatal(err)
+					}
+				})
 			}
 		}
 	}
@@ -225,7 +227,8 @@ simulation's `action` function.
 
 The snapshot should have 'streamer' in its service list.
 */
-func runRetrievalTest(chunkCount int, nodeCount int) error {
+func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
+	t.Helper()
 	sim := simulation.New(retrievalSimServiceMap)
 	defer sim.Close()
 

+ 19 - 1
swarm/storage/netstore.go

@@ -128,7 +128,25 @@ func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Cont
 func (n *NetStore) Close() {
 	close(n.closeC)
 	n.store.Close()
-	// TODO: loop through fetchers to cancel them
+
+	wg := sync.WaitGroup{}
+	for _, key := range n.fetchers.Keys() {
+		if f, ok := n.fetchers.Get(key); ok {
+			if fetch, ok := f.(*fetcher); ok {
+				wg.Add(1)
+				go func(fetch *fetcher) {
+					defer wg.Done()
+					fetch.cancel()
+
+					select {
+					case <-fetch.deliveredC:
+					case <-fetch.cancelledC:
+					}
+				}(fetch)
+			}
+		}
+	}
+	wg.Wait()
 }
 
 // get attempts at retrieving the chunk from LocalStore