Selaa lähdekoodia

eth/protocols/snap: use ephemeral channels to avoid cross-sync delveries

Péter Szilágyi 4 vuotta sitten
vanhempi
commit
9553c98de8
1 muutettua tiedostoa jossa 112 lisäystä ja 101 poistoa
  1. 112 101
      eth/protocols/snap/sync.go

+ 112 - 101
eth/protocols/snap/sync.go

@@ -106,9 +106,11 @@ type accountRequest struct {
 	peer string // Peer to which this request is assigned
 	id   uint64 // Request ID of this request
 
-	cancel  chan struct{} // Channel to track sync cancellation
-	timeout *time.Timer   // Timer to track delivery timeout
-	stale   chan struct{} // Channel to signal the request was dropped
+	deliver chan *accountResponse // Channel to deliver successful response on
+	revert  chan *accountRequest  // Channel to deliver request failure on
+	cancel  chan struct{}         // Channel to track sync cancellation
+	timeout *time.Timer           // Timer to track delivery timeout
+	stale   chan struct{}         // Channel to signal the request was dropped
 
 	origin common.Hash // First account requested to allow continuation checks
 	limit  common.Hash // Last account requested to allow non-overlapping chunking
@@ -147,9 +149,11 @@ type bytecodeRequest struct {
 	peer string // Peer to which this request is assigned
 	id   uint64 // Request ID of this request
 
-	cancel  chan struct{} // Channel to track sync cancellation
-	timeout *time.Timer   // Timer to track delivery timeout
-	stale   chan struct{} // Channel to signal the request was dropped
+	deliver chan *bytecodeResponse // Channel to deliver successful response on
+	revert  chan *bytecodeRequest  // Channel to deliver request failure on
+	cancel  chan struct{}          // Channel to track sync cancellation
+	timeout *time.Timer            // Timer to track delivery timeout
+	stale   chan struct{}          // Channel to signal the request was dropped
 
 	hashes []common.Hash // Bytecode hashes to validate responses
 	task   *accountTask  // Task which this request is filling (only access fields through the runloop!!)
@@ -176,9 +180,11 @@ type storageRequest struct {
 	peer string // Peer to which this request is assigned
 	id   uint64 // Request ID of this request
 
-	cancel  chan struct{} // Channel to track sync cancellation
-	timeout *time.Timer   // Timer to track delivery timeout
-	stale   chan struct{} // Channel to signal the request was dropped
+	deliver chan *storageResponse // Channel to deliver successful response on
+	revert  chan *storageRequest  // Channel to deliver request failure on
+	cancel  chan struct{}         // Channel to track sync cancellation
+	timeout *time.Timer           // Timer to track delivery timeout
+	stale   chan struct{}         // Channel to signal the request was dropped
 
 	accounts []common.Hash // Account hashes to validate responses
 	roots    []common.Hash // Storage roots to validate responses
@@ -224,9 +230,11 @@ type trienodeHealRequest struct {
 	peer string // Peer to which this request is assigned
 	id   uint64 // Request ID of this request
 
-	cancel  chan struct{} // Channel to track sync cancellation
-	timeout *time.Timer   // Timer to track delivery timeout
-	stale   chan struct{} // Channel to signal the request was dropped
+	deliver chan *trienodeHealResponse // Channel to deliver successful response on
+	revert  chan *trienodeHealRequest  // Channel to deliver request failure on
+	cancel  chan struct{}              // Channel to track sync cancellation
+	timeout *time.Timer                // Timer to track delivery timeout
+	stale   chan struct{}              // Channel to signal the request was dropped
 
 	hashes []common.Hash   // Trie node hashes to validate responses
 	paths  []trie.SyncPath // Trie node paths requested for rescheduling
@@ -256,9 +264,11 @@ type bytecodeHealRequest struct {
 	peer string // Peer to which this request is assigned
 	id   uint64 // Request ID of this request
 
-	cancel  chan struct{} // Channel to track sync cancellation
-	timeout *time.Timer   // Timer to track delivery timeout
-	stale   chan struct{} // Channel to signal the request was dropped
+	deliver chan *bytecodeHealResponse // Channel to deliver successful response on
+	revert  chan *bytecodeHealRequest  // Channel to deliver request failure on
+	cancel  chan struct{}              // Channel to track sync cancellation
+	timeout *time.Timer                // Timer to track delivery timeout
+	stale   chan struct{}              // Channel to signal the request was dropped
 
 	hashes []common.Hash // Bytecode hashes to validate responses
 	task   *healTask     // Task which this request is filling (only access fields through the runloop!!)
@@ -399,14 +409,6 @@ type Syncer struct {
 	bytecodeReqs map[uint64]*bytecodeRequest // Bytecode requests currently running
 	storageReqs  map[uint64]*storageRequest  // Storage requests currently running
 
-	accountReqFails  chan *accountRequest  // Failed account range requests to revert
-	bytecodeReqFails chan *bytecodeRequest // Failed bytecode requests to revert
-	storageReqFails  chan *storageRequest  // Failed storage requests to revert
-
-	accountResps  chan *accountResponse  // Account sub-tries to integrate into the database
-	bytecodeResps chan *bytecodeResponse // Bytecodes to integrate into the database
-	storageResps  chan *storageResponse  // Storage sub-tries to integrate into the database
-
 	accountSynced  uint64             // Number of accounts downloaded
 	accountBytes   common.StorageSize // Number of account trie bytes persisted to disk
 	bytecodeSynced uint64             // Number of bytecodes downloaded
@@ -421,12 +423,6 @@ type Syncer struct {
 	trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running
 	bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running
 
-	trienodeHealReqFails chan *trienodeHealRequest // Failed trienode requests to revert
-	bytecodeHealReqFails chan *bytecodeHealRequest // Failed bytecode requests to revert
-
-	trienodeHealResps chan *trienodeHealResponse // Trie nodes to integrate into the database
-	bytecodeHealResps chan *bytecodeHealResponse // Bytecodes to integrate into the database
-
 	trienodeHealSynced uint64             // Number of state trie nodes downloaded
 	trienodeHealBytes  common.StorageSize // Number of state trie bytes persisted to disk
 	trienodeHealDups   uint64             // Number of state trie nodes already processed
@@ -464,26 +460,16 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer {
 		storageIdlers:  make(map[string]struct{}),
 		bytecodeIdlers: make(map[string]struct{}),
 
-		accountReqs:      make(map[uint64]*accountRequest),
-		storageReqs:      make(map[uint64]*storageRequest),
-		bytecodeReqs:     make(map[uint64]*bytecodeRequest),
-		accountReqFails:  make(chan *accountRequest),
-		storageReqFails:  make(chan *storageRequest),
-		bytecodeReqFails: make(chan *bytecodeRequest),
-		accountResps:     make(chan *accountResponse),
-		storageResps:     make(chan *storageResponse),
-		bytecodeResps:    make(chan *bytecodeResponse),
+		accountReqs:  make(map[uint64]*accountRequest),
+		storageReqs:  make(map[uint64]*storageRequest),
+		bytecodeReqs: make(map[uint64]*bytecodeRequest),
 
 		trienodeHealIdlers: make(map[string]struct{}),
 		bytecodeHealIdlers: make(map[string]struct{}),
 
-		trienodeHealReqs:     make(map[uint64]*trienodeHealRequest),
-		bytecodeHealReqs:     make(map[uint64]*bytecodeHealRequest),
-		trienodeHealReqFails: make(chan *trienodeHealRequest),
-		bytecodeHealReqFails: make(chan *bytecodeHealRequest),
-		trienodeHealResps:    make(chan *trienodeHealResponse),
-		bytecodeHealResps:    make(chan *bytecodeHealResponse),
-		stateWriter:          db.NewBatch(),
+		trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
+		bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
+		stateWriter:      db.NewBatch(),
 	}
 }
 
@@ -611,6 +597,21 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
 	peerDropSub := s.peerDrop.Subscribe(peerDrop)
 	defer peerDropSub.Unsubscribe()
 
+	// Create a set of unique channels for this sync cycle. We need these to be
+	// ephemeral so a data race doesn't accidentally deliver something stale on
+	// a persistent channel across syncs (yup, this happened)
+	var (
+		accountReqFails      = make(chan *accountRequest)
+		storageReqFails      = make(chan *storageRequest)
+		bytecodeReqFails     = make(chan *bytecodeRequest)
+		accountResps         = make(chan *accountResponse)
+		storageResps         = make(chan *storageResponse)
+		bytecodeResps        = make(chan *bytecodeResponse)
+		trienodeHealReqFails = make(chan *trienodeHealRequest)
+		bytecodeHealReqFails = make(chan *bytecodeHealRequest)
+		trienodeHealResps    = make(chan *trienodeHealResponse)
+		bytecodeHealResps    = make(chan *bytecodeHealResponse)
+	)
 	for {
 		// Remove all completed tasks and terminate sync if everything's done
 		s.cleanStorageTasks()
@@ -619,14 +620,14 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
 			return nil
 		}
 		// Assign all the data retrieval tasks to any free peers
-		s.assignAccountTasks(cancel)
-		s.assignBytecodeTasks(cancel)
-		s.assignStorageTasks(cancel)
+		s.assignAccountTasks(accountResps, accountReqFails, cancel)
+		s.assignBytecodeTasks(bytecodeResps, bytecodeReqFails, cancel)
+		s.assignStorageTasks(storageResps, storageReqFails, cancel)
 
 		if len(s.tasks) == 0 {
 			// Sync phase done, run heal phase
-			s.assignTrienodeHealTasks(cancel)
-			s.assignBytecodeHealTasks(cancel)
+			s.assignTrienodeHealTasks(trienodeHealResps, trienodeHealReqFails, cancel)
+			s.assignBytecodeHealTasks(bytecodeHealResps, bytecodeHealReqFails, cancel)
 		}
 		// Wait for something to happen
 		select {
@@ -639,26 +640,26 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
 		case <-cancel:
 			return ErrCancelled
 
-		case req := <-s.accountReqFails:
+		case req := <-accountReqFails:
 			s.revertAccountRequest(req)
-		case req := <-s.bytecodeReqFails:
+		case req := <-bytecodeReqFails:
 			s.revertBytecodeRequest(req)
-		case req := <-s.storageReqFails:
+		case req := <-storageReqFails:
 			s.revertStorageRequest(req)
-		case req := <-s.trienodeHealReqFails:
+		case req := <-trienodeHealReqFails:
 			s.revertTrienodeHealRequest(req)
-		case req := <-s.bytecodeHealReqFails:
+		case req := <-bytecodeHealReqFails:
 			s.revertBytecodeHealRequest(req)
 
-		case res := <-s.accountResps:
+		case res := <-accountResps:
 			s.processAccountResponse(res)
-		case res := <-s.bytecodeResps:
+		case res := <-bytecodeResps:
 			s.processBytecodeResponse(res)
-		case res := <-s.storageResps:
+		case res := <-storageResps:
 			s.processStorageResponse(res)
-		case res := <-s.trienodeHealResps:
+		case res := <-trienodeHealResps:
 			s.processTrienodeHealResponse(res)
-		case res := <-s.bytecodeHealResps:
+		case res := <-bytecodeHealResps:
 			s.processBytecodeHealResponse(res)
 		}
 		// Report stats if something meaningful happened
@@ -801,7 +802,7 @@ func (s *Syncer) cleanStorageTasks() {
 
 // assignAccountTasks attempts to match idle peers to pending account range
 // retrievals.
-func (s *Syncer) assignAccountTasks(cancel chan struct{}) {
+func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *accountRequest, cancel chan struct{}) {
 	s.lock.Lock()
 	defer s.lock.Unlock()
 
@@ -847,13 +848,15 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) {
 		}
 		// Generate the network query and send it to the peer
 		req := &accountRequest{
-			peer:   idle,
-			id:     reqid,
-			cancel: cancel,
-			stale:  make(chan struct{}),
-			origin: task.Next,
-			limit:  task.Last,
-			task:   task,
+			peer:    idle,
+			id:      reqid,
+			deliver: success,
+			revert:  fail,
+			cancel:  cancel,
+			stale:   make(chan struct{}),
+			origin:  task.Next,
+			limit:   task.Last,
+			task:    task,
 		}
 		req.timeout = time.AfterFunc(requestTimeout, func() {
 			peer.Log().Debug("Account range request timed out", "reqid", reqid)
@@ -879,7 +882,7 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) {
 }
 
 // assignBytecodeTasks attempts to match idle peers to pending code retrievals.
-func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) {
+func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *bytecodeRequest, cancel chan struct{}) {
 	s.lock.Lock()
 	defer s.lock.Unlock()
 
@@ -937,12 +940,14 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) {
 			}
 		}
 		req := &bytecodeRequest{
-			peer:   idle,
-			id:     reqid,
-			cancel: cancel,
-			stale:  make(chan struct{}),
-			hashes: hashes,
-			task:   task,
+			peer:    idle,
+			id:      reqid,
+			deliver: success,
+			revert:  fail,
+			cancel:  cancel,
+			stale:   make(chan struct{}),
+			hashes:  hashes,
+			task:    task,
 		}
 		req.timeout = time.AfterFunc(requestTimeout, func() {
 			peer.Log().Debug("Bytecode request timed out", "reqid", reqid)
@@ -966,7 +971,7 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) {
 
 // assignStorageTasks attempts to match idle peers to pending storage range
 // retrievals.
-func (s *Syncer) assignStorageTasks(cancel chan struct{}) {
+func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *storageRequest, cancel chan struct{}) {
 	s.lock.Lock()
 	defer s.lock.Unlock()
 
@@ -1059,6 +1064,8 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) {
 		req := &storageRequest{
 			peer:     idle,
 			id:       reqid,
+			deliver:  success,
+			revert:   fail,
 			cancel:   cancel,
 			stale:    make(chan struct{}),
 			accounts: accounts,
@@ -1101,7 +1108,7 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) {
 
 // assignTrienodeHealTasks attempts to match idle peers to trie node requests to
 // heal any trie errors caused by the snap sync's chunked retrieval model.
-func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) {
+func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fail chan *trienodeHealRequest, cancel chan struct{}) {
 	s.lock.Lock()
 	defer s.lock.Unlock()
 
@@ -1179,13 +1186,15 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) {
 			}
 		}
 		req := &trienodeHealRequest{
-			peer:   idle,
-			id:     reqid,
-			cancel: cancel,
-			stale:  make(chan struct{}),
-			hashes: hashes,
-			paths:  paths,
-			task:   s.healer,
+			peer:    idle,
+			id:      reqid,
+			deliver: success,
+			revert:  fail,
+			cancel:  cancel,
+			stale:   make(chan struct{}),
+			hashes:  hashes,
+			paths:   paths,
+			task:    s.healer,
 		}
 		req.timeout = time.AfterFunc(requestTimeout, func() {
 			peer.Log().Debug("Trienode heal request timed out", "reqid", reqid)
@@ -1209,7 +1218,7 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) {
 
 // assignBytecodeHealTasks attempts to match idle peers to bytecode requests to
 // heal any trie errors caused by the snap sync's chunked retrieval model.
-func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) {
+func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fail chan *bytecodeHealRequest, cancel chan struct{}) {
 	s.lock.Lock()
 	defer s.lock.Unlock()
 
@@ -1280,12 +1289,14 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) {
 			}
 		}
 		req := &bytecodeHealRequest{
-			peer:   idle,
-			id:     reqid,
-			cancel: cancel,
-			stale:  make(chan struct{}),
-			hashes: hashes,
-			task:   s.healer,
+			peer:    idle,
+			id:      reqid,
+			deliver: success,
+			revert:  fail,
+			cancel:  cancel,
+			stale:   make(chan struct{}),
+			hashes:  hashes,
+			task:    s.healer,
 		}
 		req.timeout = time.AfterFunc(requestTimeout, func() {
 			peer.Log().Debug("Bytecode heal request timed out", "reqid", reqid)
@@ -1366,7 +1377,7 @@ func (s *Syncer) revertRequests(peer string) {
 // request and return all failed retrieval tasks to the scheduler for reassignment.
 func (s *Syncer) scheduleRevertAccountRequest(req *accountRequest) {
 	select {
-	case s.accountReqFails <- req:
+	case req.revert <- req:
 		// Sync event loop notified
 	case <-req.cancel:
 		// Sync cycle got cancelled
@@ -1407,7 +1418,7 @@ func (s *Syncer) revertAccountRequest(req *accountRequest) {
 // and return all failed retrieval tasks to the scheduler for reassignment.
 func (s *Syncer) scheduleRevertBytecodeRequest(req *bytecodeRequest) {
 	select {
-	case s.bytecodeReqFails <- req:
+	case req.revert <- req:
 		// Sync event loop notified
 	case <-req.cancel:
 		// Sync cycle got cancelled
@@ -1448,7 +1459,7 @@ func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) {
 // request and return all failed retrieval tasks to the scheduler for reassignment.
 func (s *Syncer) scheduleRevertStorageRequest(req *storageRequest) {
 	select {
-	case s.storageReqFails <- req:
+	case req.revert <- req:
 		// Sync event loop notified
 	case <-req.cancel:
 		// Sync cycle got cancelled
@@ -1493,7 +1504,7 @@ func (s *Syncer) revertStorageRequest(req *storageRequest) {
 // request and return all failed retrieval tasks to the scheduler for reassignment.
 func (s *Syncer) scheduleRevertTrienodeHealRequest(req *trienodeHealRequest) {
 	select {
-	case s.trienodeHealReqFails <- req:
+	case req.revert <- req:
 		// Sync event loop notified
 	case <-req.cancel:
 		// Sync cycle got cancelled
@@ -1534,7 +1545,7 @@ func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) {
 // request and return all failed retrieval tasks to the scheduler for reassignment.
 func (s *Syncer) scheduleRevertBytecodeHealRequest(req *bytecodeHealRequest) {
 	select {
-	case s.bytecodeHealReqFails <- req:
+	case req.revert <- req:
 		// Sync event loop notified
 	case <-req.cancel:
 		// Sync cycle got cancelled
@@ -2147,7 +2158,7 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
 		cont:     cont,
 	}
 	select {
-	case s.accountResps <- response:
+	case req.deliver <- response:
 	case <-req.cancel:
 	case <-req.stale:
 	}
@@ -2253,7 +2264,7 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
 		codes:  codes,
 	}
 	select {
-	case s.bytecodeResps <- response:
+	case req.deliver <- response:
 	case <-req.cancel:
 	case <-req.stale:
 	}
@@ -2411,7 +2422,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
 		cont:     cont,
 	}
 	select {
-	case s.storageResps <- response:
+	case req.deliver <- response:
 	case <-req.cancel:
 	case <-req.stale:
 	}
@@ -2505,7 +2516,7 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
 		nodes:  nodes,
 	}
 	select {
-	case s.trienodeHealResps <- response:
+	case req.deliver <- response:
 	case <-req.cancel:
 	case <-req.stale:
 	}
@@ -2598,7 +2609,7 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
 		codes:  codes,
 	}
 	select {
-	case s.bytecodeHealResps <- response:
+	case req.deliver <- response:
 	case <-req.cancel:
 	case <-req.stale:
 	}