
eth/downloader: fix peer idleness tracking when restarting state sync (#21260)

This fixes two issues with state sync restarts:

When sync restarts with a new root, some peers can have in-flight requests.
Since all peers with active requests were marked idle when exiting sync,
the new sync would schedule more requests for those peers. When the
response for the earlier request arrived, the new sync would reject it and
mark the peer idle again, rendering the peer useless until it disconnected.

The other issue was that peers would not be marked idle when they had
delivered a response, but the response hadn't been processed before
restarting the state sync. This also made the peer useless because it
would be permanently marked busy.

Co-authored-by: Felix Lange <fjl@twurst.com>
Martin Holst Swende, 5 years ago
commit 967d8de77a
2 changed files with 67 additions and 35 deletions
  1. eth/downloader/downloader.go (+9, -22)
  2. eth/downloader/statesync.go (+58, -13)

eth/downloader/downloader.go (+9, -22)

@@ -328,7 +328,9 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
 		return err
 	}
 
-	if errors.Is(err, errInvalidChain) {
+	if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) ||
+		errors.Is(err, errStallingPeer) || errors.Is(err, errUnsyncedPeer) || errors.Is(err, errEmptyHeaderSet) ||
+		errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) {
 		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
 		if d.dropPeer == nil {
 			// The dropPeer method is nil when `--copydb` is used for a local copy.
@@ -339,22 +341,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
 		}
 		return err
 	}
-
-	switch err {
-	case errTimeout, errBadPeer, errStallingPeer, errUnsyncedPeer,
-		errEmptyHeaderSet, errPeersUnavailable, errTooOld,
-		errInvalidAncestor:
-		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
-		if d.dropPeer == nil {
-			// The dropPeer method is nil when `--copydb` is used for a local copy.
-			// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
-			log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
-		} else {
-			d.dropPeer(id)
-		}
-	default:
-		log.Warn("Synchronisation failed, retrying", "err", err)
-	}
+	log.Warn("Synchronisation failed, retrying", "err", err)
 	return err
 }
 
@@ -643,7 +630,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
 			headers := packet.(*headerPack).headers
 			if len(headers) != 1 {
 				p.log.Debug("Multiple headers for single request", "headers", len(headers))
-				return nil, errBadPeer
+				return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
 			}
 			head := headers[0]
 			if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint {
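
Returning a wrapped error here (and in the matching hunks below) instead of the bare errBadPeer sentinel is what lets the Synchronise hunk above fold its switch into a single errors.Is chain: errors.Is matches through fmt.Errorf's %w wrapping. A minimal, self-contained sketch of the pattern; the errBadPeer value and message below are stand-ins, not the downloader's actual definitions:

package main

import (
	"errors"
	"fmt"
)

// Stand-in sentinel; the real errBadPeer lives in eth/downloader.
var errBadPeer = errors.New("action from bad peer ignored")

// fetchHeight-style failure: wrap the sentinel with %w to add context
// without breaking callers that match on the sentinel value.
func checkHeaders(headers int) error {
	if headers != 1 {
		return fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, headers)
	}
	return nil
}

func main() {
	err := checkHeaders(3)
	fmt.Println(errors.Is(err, errBadPeer)) // true: the wrapped error still matches
	fmt.Println(err)                        // the message now carries the extra context
}
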
@@ -876,7 +863,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 				headers := packer.(*headerPack).headers
 				if len(headers) != 1 {
 					p.log.Debug("Multiple headers for single request", "headers", len(headers))
-					return 0, errBadPeer
+					return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
 				}
 				arrived = true
 
@@ -900,7 +887,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 				header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
 				if header.Number.Uint64() != check {
 					p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
-					return 0, errBadPeer
+					return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number)
 				}
 				start = check
 				hash = h
@@ -1092,7 +1079,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
 			case d.headerProcCh <- nil:
 			case <-d.cancelCh:
 			}
-			return errBadPeer
+			return fmt.Errorf("%w: header request timed out", errBadPeer)
 		}
 	}
 }
@@ -1520,7 +1507,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 					inserts := d.queue.Schedule(chunk, origin)
 					if len(inserts) != len(chunk) {
 						log.Debug("Stale headers")
-						return errBadPeer
+						return fmt.Errorf("%w: stale headers", errBadPeer)
 					}
 				}
 				headers = headers[limit:]

eth/downloader/statesync.go (+58, -13)

@@ -63,6 +63,10 @@ func (d *Downloader) syncState(root common.Hash) *stateSync {
 	s := newStateSync(d, root)
 	select {
 	case d.stateSyncStart <- s:
+		// If we tell the statesync to restart with a new root, we also need
+		// to wait for it to actually start -- when the old requests have timed
+		// out or been delivered
+		<-s.started
 	case <-d.quitCh:
 		s.err = errCancelStateFetch
 		close(s.done)
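
The <-s.started wait gives syncState a simple handshake: the new stateSync closes started at the top of its loop, and that loop only runs once runStateSync has spun down the previous sync's outstanding requests. A stripped-down sketch of this close-once start signal; all names below are illustrative stand-ins, not the downloader's real types:

package main

import (
	"fmt"
	"time"
)

// syncStub models the relevant part of stateSync: started is closed exactly
// once, when the worker loop actually begins running.
type syncStub struct {
	started chan struct{}
	done    chan struct{}
}

func newSyncStub() *syncStub {
	return &syncStub{started: make(chan struct{}), done: make(chan struct{})}
}

func (s *syncStub) loop() {
	close(s.started)                  // anyone blocked on <-s.started is released here
	time.Sleep(10 * time.Millisecond) // pretend to do some syncing work
	close(s.done)
}

func main() {
	s := newSyncStub()
	go s.loop()
	<-s.started // like syncState: don't hand the sync back before its loop is live
	fmt.Println("state sync loop has started")
	<-s.done
}
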
@@ -95,15 +99,9 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
 		finished []*stateReq                  // Completed or failed requests
 		timeout  = make(chan *stateReq)       // Timed out active requests
 	)
-	defer func() {
-		// Cancel active request timers on exit. Also set peers to idle so they're
-		// available for the next sync.
-		for _, req := range active {
-			req.timer.Stop()
-			req.peer.SetNodeDataIdle(len(req.items))
-		}
-	}()
+
 	// Run the state sync.
+	log.Trace("State sync starting", "root", s.root)
 	go s.run()
 	defer s.Cancel()
 
@@ -126,9 +124,11 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
 		select {
 		// The stateSync lifecycle:
 		case next := <-d.stateSyncStart:
+			d.spindownStateSync(active, finished, timeout, peerDrop)
 			return next
 
 		case <-s.done:
+			d.spindownStateSync(active, finished, timeout, peerDrop)
 			return nil
 
 		// Send the next finished request to the current sync:
@@ -189,11 +189,9 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
 			// causes valid requests to go missing and sync to get stuck.
 			if old := active[req.peer.id]; old != nil {
 				log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
-
-				// Make sure the previous one doesn't get siletly lost
+				// Move the previous request to the finished set
 				old.timer.Stop()
 				old.dropped = true
-
 				finished = append(finished, old)
 			}
 			// Start a timer to notify the sync loop if the peer stalled.
@@ -210,6 +208,46 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
 	}
 }
 
+// spindownStateSync 'drains' the outstanding requests; some will be delivered and others
+// will time out. This is to ensure that when the next stateSync starts working, all peers
+// are marked as idle and de facto _are_ idle.
+func (d *Downloader) spindownStateSync(active map[string]*stateReq, finished []*stateReq, timeout chan *stateReq, peerDrop chan *peerConnection) {
+	log.Trace("State sync spinning down", "active", len(active), "finished", len(finished))
+
+	for len(active) > 0 {
+		var (
+			req    *stateReq
+			reason string
+		)
+		select {
+		// Handle (drop) incoming state packs:
+		case pack := <-d.stateCh:
+			req = active[pack.PeerId()]
+			reason = "delivered"
+		// Handle dropped peer connections:
+		case p := <-peerDrop:
+			req = active[p.id]
+			reason = "peerdrop"
+		// Handle timed-out requests:
+		case req = <-timeout:
+			reason = "timeout"
+		}
+		if req == nil {
+			continue
+		}
+		req.peer.log.Trace("State peer marked idle (spindown)", "req.items", len(req.items), "reason", reason)
+		req.timer.Stop()
+		delete(active, req.peer.id)
+		req.peer.SetNodeDataIdle(len(req.items))
+	}
+	// The 'finished' set contains deliveries that we were going to pass to processing.
+	// Those are now moot, but we still need to set those peers as idle, which would
+	// otherwise have been done after processing
+	for _, req := range finished {
+		req.peer.SetNodeDataIdle(len(req.items))
+	}
+}
+
 // stateSync schedules requests for downloading a particular state trie defined
 // by a given state root.
 type stateSync struct {
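
The invariant spindownStateSync restores is that every peer behind an in-flight or finished-but-unprocessed request ends up idle before the next sync begins. A toy model of that bookkeeping; the peer and request shapes below are invented for illustration, not the downloader's real structs:

package main

import "fmt"

// Illustrative stand-ins for peerConnection and stateReq.
type peerStub struct {
	id   string
	idle bool
}

type reqStub struct{ peer *peerStub }

// spindown mirrors the shape of spindownStateSync: drain the active set and
// also release the peers behind deliveries that will never be processed.
func spindown(active map[string]*reqStub, finished []*reqStub) {
	for id, req := range active {
		req.peer.idle = true // delivered, timed out or dropped: the peer is free again
		delete(active, id)
	}
	for _, req := range finished {
		req.peer.idle = true // unprocessed deliveries still free their peer
	}
}

func main() {
	a, b := &peerStub{id: "a"}, &peerStub{id: "b"}
	active := map[string]*reqStub{"a": {peer: a}}
	finished := []*reqStub{{peer: b}}
	spindown(active, finished)
	fmt.Println(a.idle, b.idle) // true true: both peers are usable by the next sync
}
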
@@ -222,11 +260,15 @@ type stateSync struct {
 	numUncommitted   int
 	bytesUncommitted int
 
+	started chan struct{} // Started is signalled once the sync loop starts
+
 	deliver    chan *stateReq // Delivery channel multiplexing peer responses
 	cancel     chan struct{}  // Channel to signal a termination request
 	cancelOnce sync.Once      // Ensures cancel only ever gets called once
 	done       chan struct{}  // Channel to signal termination completion
 	err        error          // Any error hit during sync (set before completion)
+
+	root common.Hash
 }
 
 // stateTask represents a single trie node download task, containing a set of
@@ -246,6 +288,8 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
 		deliver: make(chan *stateReq),
 		cancel:  make(chan struct{}),
 		done:    make(chan struct{}),
+		started: make(chan struct{}),
+		root:    root,
 	}
 }
 
@@ -276,6 +320,7 @@ func (s *stateSync) Cancel() error {
 // pushed here async. The reason is to decouple processing from data receipt
 // and timeouts.
 func (s *stateSync) loop() (err error) {
+	close(s.started)
 	// Listen for new peer events to assign tasks to them
 	newPeer := make(chan *peerConnection, 1024)
 	peerSub := s.d.peers.SubscribeNewPeers(newPeer)
@@ -331,11 +376,11 @@ func (s *stateSync) loop() (err error) {
 			}
 			// Process all the received blobs and check for stale delivery
 			delivered, err := s.process(req)
+			req.peer.SetNodeDataIdle(delivered)
 			if err != nil {
 				log.Warn("Node data write error", "err", err)
 				return err
 			}
-			req.peer.SetNodeDataIdle(delivered)
 		}
 	}
 	return nil
@@ -372,7 +417,7 @@ func (s *stateSync) assignTasks() {
 
 		// If the peer was assigned tasks to fetch, send the network request
 		if len(req.items) > 0 {
-			req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items))
+			req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items), "root", s.root)
 			select {
 			case s.d.trackStateReq <- req:
 				req.peer.FetchNodeData(req.items)