Jelajahi Sumber

eth/protocols/snap: track reverts when peer rejects request (#22016)

* eth/protocols/snap: reschedule missed deliveries

* eth/protocols/snap: clarify log message

* eth/protocols/snap: revert failures async and update runloop

Co-authored-by: Péter Szilágyi <peterke@gmail.com>
Martin Holst Swende 4 tahun lalu
induk
melakukan
58b9db5f7c
1 mengubah file dengan 116 tambahan dan 56 penghapusan
  1. 116 56
      eth/protocols/snap/sync.go

+ 116 - 56
eth/protocols/snap/sync.go

@@ -792,10 +792,7 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) {
 		}
 		req.timeout = time.AfterFunc(requestTimeout, func() {
 			log.Debug("Account range request timed out")
-			select {
-			case s.accountReqFails <- req:
-			default:
-			}
+			s.scheduleRevertAccountRequest(req)
 		})
 		s.accountReqs[reqid] = req
 		delete(s.accountIdlers, idle)
@@ -807,12 +804,8 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) {
 			// Attempt to send the remote request and revert if it fails
 			if err := peer.RequestAccountRange(reqid, root, req.origin, req.limit, maxRequestSize); err != nil {
 				peer.Log().Debug("Failed to request account range", "err", err)
-				select {
-				case s.accountReqFails <- req:
-				default:
-				}
+				s.scheduleRevertAccountRequest(req)
 			}
-			// Request successfully sent, start a
 		}(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists
 
 		// Inject the request into the task to block further assignments
@@ -886,10 +879,7 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) {
 		}
 		req.timeout = time.AfterFunc(requestTimeout, func() {
 			log.Debug("Bytecode request timed out")
-			select {
-			case s.bytecodeReqFails <- req:
-			default:
-			}
+			s.scheduleRevertBytecodeRequest(req)
 		})
 		s.bytecodeReqs[reqid] = req
 		delete(s.bytecodeIdlers, idle)
@@ -901,12 +891,8 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) {
 			// Attempt to send the remote request and revert if it fails
 			if err := peer.RequestByteCodes(reqid, hashes, maxRequestSize); err != nil {
 				log.Debug("Failed to request bytecodes", "err", err)
-				select {
-				case s.bytecodeReqFails <- req:
-				default:
-				}
+				s.scheduleRevertBytecodeRequest(req)
 			}
-			// Request successfully sent, start a
 		}(s.peers[idle]) // We're in the lock, peers[id] surely exists
 	}
 }
@@ -1018,10 +1004,7 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) {
 		}
 		req.timeout = time.AfterFunc(requestTimeout, func() {
 			log.Debug("Storage request timed out")
-			select {
-			case s.storageReqFails <- req:
-			default:
-			}
+			s.scheduleRevertStorageRequest(req)
 		})
 		s.storageReqs[reqid] = req
 		delete(s.storageIdlers, idle)
@@ -1037,12 +1020,8 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) {
 			}
 			if err := peer.RequestStorageRanges(reqid, root, accounts, origin, limit, maxRequestSize); err != nil {
 				log.Debug("Failed to request storage", "err", err)
-				select {
-				case s.storageReqFails <- req:
-				default:
-				}
+				s.scheduleRevertStorageRequest(req)
 			}
-			// Request successfully sent, start a
 		}(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists
 
 		// Inject the request into the subtask to block further assignments
@@ -1140,10 +1119,7 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) {
 		}
 		req.timeout = time.AfterFunc(requestTimeout, func() {
 			log.Debug("Trienode heal request timed out")
-			select {
-			case s.trienodeHealReqFails <- req:
-			default:
-			}
+			s.scheduleRevertTrienodeHealRequest(req)
 		})
 		s.trienodeHealReqs[reqid] = req
 		delete(s.trienodeHealIdlers, idle)
@@ -1155,12 +1131,8 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) {
 			// Attempt to send the remote request and revert if it fails
 			if err := peer.RequestTrieNodes(reqid, root, pathsets, maxRequestSize); err != nil {
 				log.Debug("Failed to request trienode healers", "err", err)
-				select {
-				case s.trienodeHealReqFails <- req:
-				default:
-				}
+				s.scheduleRevertTrienodeHealRequest(req)
 			}
-			// Request successfully sent, start a
 		}(s.peers[idle], s.root) // We're in the lock, peers[id] surely exists
 	}
 }
@@ -1245,10 +1217,7 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) {
 		}
 		req.timeout = time.AfterFunc(requestTimeout, func() {
 			log.Debug("Bytecode heal request timed out")
-			select {
-			case s.bytecodeHealReqFails <- req:
-			default:
-			}
+			s.scheduleRevertBytecodeHealRequest(req)
 		})
 		s.bytecodeHealReqs[reqid] = req
 		delete(s.bytecodeHealIdlers, idle)
@@ -1260,12 +1229,8 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) {
 			// Attempt to send the remote request and revert if it fails
 			if err := peer.RequestByteCodes(reqid, hashes, maxRequestSize); err != nil {
 				log.Debug("Failed to request bytecode healers", "err", err)
-				select {
-				case s.bytecodeHealReqFails <- req:
-				default:
-				}
+				s.scheduleRevertBytecodeHealRequest(req)
 			}
-			// Request successfully sent, start a
 		}(s.peers[idle]) // We're in the lock, peers[id] surely exists
 	}
 }
@@ -1325,10 +1290,26 @@ func (s *Syncer) revertRequests(peer string) {
 	}
 }
 
+// scheduleRevertAccountRequest asks the event loop to clean up an account range
+// request and return all failed retrieval tasks to the scheduler for reassignment.
+func (s *Syncer) scheduleRevertAccountRequest(req *accountRequest) {
+	select {
+	case s.accountReqFails <- req:
+		// Sync event loop notified
+	case <-req.cancel:
+		// Sync cycle got cancelled
+	case <-req.stale:
+		// Request already reverted
+	}
+}
+
 // revertAccountRequest cleans up an account range request and returns all failed
 // retrieval tasks to the scheduler for reassignment.
+//
+// Note, this needs to run on the event runloop thread to reschedule to idle peers.
+// On peer threads, use scheduleRevertAccountRequest.
 func (s *Syncer) revertAccountRequest(req *accountRequest) {
-	log.Trace("Reverting account request", "peer", req.peer, "reqid", req.id)
+	log.Debug("Reverting account request", "peer", req.peer, "reqid", req.id)
 	select {
 	case <-req.stale:
 		log.Trace("Account request already reverted", "peer", req.peer, "reqid", req.id)
@@ -1350,10 +1331,26 @@ func (s *Syncer) revertAccountRequest(req *accountRequest) {
 	}
 }
 
-// revertBytecodeRequest cleans up an bytecode request and returns all failed
+// scheduleRevertBytecodeRequest asks the event loop to clean up a bytecode request
+// and return all failed retrieval tasks to the scheduler for reassignment.
+func (s *Syncer) scheduleRevertBytecodeRequest(req *bytecodeRequest) {
+	select {
+	case s.bytecodeReqFails <- req:
+		// Sync event loop notified
+	case <-req.cancel:
+		// Sync cycle got cancelled
+	case <-req.stale:
+		// Request already reverted
+	}
+}
+
+// revertBytecodeRequest cleans up a bytecode request and returns all failed
 // retrieval tasks to the scheduler for reassignment.
+//
+// Note, this needs to run on the event runloop thread to reschedule to idle peers.
+// On peer threads, use scheduleRevertBytecodeRequest.
 func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) {
-	log.Trace("Reverting bytecode request", "peer", req.peer)
+	log.Debug("Reverting bytecode request", "peer", req.peer)
 	select {
 	case <-req.stale:
 		log.Trace("Bytecode request already reverted", "peer", req.peer, "reqid", req.id)
@@ -1375,10 +1372,26 @@ func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) {
 	}
 }
 
+// scheduleRevertStorageRequest asks the event loop to clean up a storage range
+// request and return all failed retrieval tasks to the scheduler for reassignment.
+func (s *Syncer) scheduleRevertStorageRequest(req *storageRequest) {
+	select {
+	case s.storageReqFails <- req:
+		// Sync event loop notified
+	case <-req.cancel:
+		// Sync cycle got cancelled
+	case <-req.stale:
+		// Request already reverted
+	}
+}
+
 // revertStorageRequest cleans up a storage range request and returns all failed
 // retrieval tasks to the scheduler for reassignment.
+//
+// Note, this needs to run on the event runloop thread to reschedule to idle peers.
+// On peer threads, use scheduleRevertStorageRequest.
 func (s *Syncer) revertStorageRequest(req *storageRequest) {
-	log.Trace("Reverting storage request", "peer", req.peer)
+	log.Debug("Reverting storage request", "peer", req.peer)
 	select {
 	case <-req.stale:
 		log.Trace("Storage request already reverted", "peer", req.peer, "reqid", req.id)
@@ -1404,10 +1417,26 @@ func (s *Syncer) revertStorageRequest(req *storageRequest) {
 	}
 }
 
-// revertTrienodeHealRequest cleans up an trienode heal request and returns all
+// scheduleRevertTrienodeHealRequest asks the event loop to clean up a trienode heal
+// request and return all failed retrieval tasks to the scheduler for reassignment.
+func (s *Syncer) scheduleRevertTrienodeHealRequest(req *trienodeHealRequest) {
+	select {
+	case s.trienodeHealReqFails <- req:
+		// Sync event loop notified
+	case <-req.cancel:
+		// Sync cycle got cancelled
+	case <-req.stale:
+		// Request already reverted
+	}
+}
+
+// revertTrienodeHealRequest cleans up a trienode heal request and returns all
 // failed retrieval tasks to the scheduler for reassignment.
+//
+// Note, this needs to run on the event runloop thread to reschedule to idle peers.
+// On peer threads, use scheduleRevertTrienodeHealRequest.
 func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) {
-	log.Trace("Reverting trienode heal request", "peer", req.peer)
+	log.Debug("Reverting trienode heal request", "peer", req.peer)
 	select {
 	case <-req.stale:
 		log.Trace("Trienode heal request already reverted", "peer", req.peer, "reqid", req.id)
@@ -1429,10 +1458,26 @@ func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) {
 	}
 }
 
-// revertBytecodeHealRequest cleans up an bytecode request and returns all failed
-// retrieval tasks to the scheduler for reassignment.
+// scheduleRevertBytecodeHealRequest asks the event loop to clean up a bytecode heal
+// request and return all failed retrieval tasks to the scheduler for reassignment.
+func (s *Syncer) scheduleRevertBytecodeHealRequest(req *bytecodeHealRequest) {
+	select {
+	case s.bytecodeHealReqFails <- req:
+		// Sync event loop notified
+	case <-req.cancel:
+		// Sync cycle got cancelled
+	case <-req.stale:
+		// Request already reverted
+	}
+}
+
+// revertBytecodeHealRequest cleans up a bytecode heal request and returns all
+// failed retrieval tasks to the scheduler for reassignment.
+//
+// Note, this needs to run on the event runloop thread to reschedule to idle peers.
+// On peer threads, use scheduleRevertBytecodeHealRequest.
 func (s *Syncer) revertBytecodeHealRequest(req *bytecodeHealRequest) {
-	log.Trace("Reverting bytecode heal request", "peer", req.peer)
+	log.Debug("Reverting bytecode heal request", "peer", req.peer)
 	select {
 	case <-req.stale:
 		log.Trace("Bytecode heal request already reverted", "peer", req.peer, "reqid", req.id)
@@ -1768,7 +1813,7 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
 	if err := batch.Write(); err != nil {
 		log.Crit("Failed to persist healing data", "err", err)
 	}
-	log.Debug("Persisted set of healing data", "bytes", common.StorageSize(batch.ValueSize()))
+	log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize()))
 }
 
 // processBytecodeHealResponse integrates an already validated bytecode response
@@ -1804,7 +1849,7 @@ func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) {
 	if err := batch.Write(); err != nil {
 		log.Crit("Failed to persist healing data", "err", err)
 	}
-	log.Debug("Persisted set of healing data", "bytes", common.StorageSize(batch.ValueSize()))
+	log.Debug("Persisted set of healing data", "type", "bytecode", "bytes", common.StorageSize(batch.ValueSize()))
 }
 
 // forwardAccountTask takes a filled account task and persists anything available
@@ -1940,6 +1985,9 @@ func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, account
 		logger.Debug("Peer rejected account range request", "root", s.root)
 		s.statelessPeers[peer.id] = struct{}{}
 		s.lock.Unlock()
+
+		// Signal this request as failed, and ready for rescheduling
+		s.scheduleRevertAccountRequest(req)
 		return nil
 	}
 	root := s.root
@@ -2055,6 +2103,9 @@ func (s *Syncer) onByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
 		logger.Debug("Peer rejected bytecode request")
 		s.statelessPeers[peer.id] = struct{}{}
 		s.lock.Unlock()
+
+		// Signal this request as failed, and ready for rescheduling
+		s.scheduleRevertBytecodeRequest(req)
 		return nil
 	}
 	s.lock.Unlock()
@@ -2166,6 +2217,9 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots
 		logger.Debug("Peer rejected storage request")
 		s.statelessPeers[peer.id] = struct{}{}
 		s.lock.Unlock()
+
+		// Signal this request as failed, and ready for rescheduling
+		s.scheduleRevertStorageRequest(req)
 		return nil
 	}
 	s.lock.Unlock()
@@ -2287,6 +2341,9 @@ func (s *Syncer) OnTrieNodes(peer *Peer, id uint64, trienodes [][]byte) error {
 		logger.Debug("Peer rejected trienode heal request")
 		s.statelessPeers[peer.id] = struct{}{}
 		s.lock.Unlock()
+
+		// Signal this request as failed, and ready for rescheduling
+		s.scheduleRevertTrienodeHealRequest(req)
 		return nil
 	}
 	s.lock.Unlock()
@@ -2371,6 +2428,9 @@ func (s *Syncer) onHealByteCodes(peer *Peer, id uint64, bytecodes [][]byte) erro
 		logger.Debug("Peer rejected bytecode heal request")
 		s.statelessPeers[peer.id] = struct{}{}
 		s.lock.Unlock()
+
+		// Signal this request as failed, and ready for rescheduling
+		s.scheduleRevertBytecodeHealRequest(req)
 		return nil
 	}
 	s.lock.Unlock()