Переглянути джерело

core, eth, les: support resuming fast sync on heavy rollback (#3743)

Péter Szilágyi 8 роки тому
батько
коміт
9b84caf3a5

+ 49 - 24
core/blockchain.go

@@ -182,16 +182,25 @@ func (self *BlockChain) loadLastState() error {
 	head := GetHeadBlockHash(self.chainDb)
 	if head == (common.Hash{}) {
 		// Corrupt or empty database, init from scratch
-		self.Reset()
-	} else {
-		if block := self.GetBlockByHash(head); block != nil {
-			// Block found, set as the current head
-			self.currentBlock = block
-		} else {
-			// Corrupt or empty database, init from scratch
-			self.Reset()
-		}
+		log.Warn("Empty database, resetting chain")
+		return self.Reset()
+	}
+	// Make sure the entire head block is available
+	currentBlock := self.GetBlockByHash(head)
+	if currentBlock == nil {
+		// Corrupt or empty database, init from scratch
+		log.Warn("Head block missing, resetting chain", "hash", head)
+		return self.Reset()
 	}
+	// Make sure the state associated with the block is available
+	if _, err := state.New(currentBlock.Root(), self.chainDb); err != nil {
+		// Dangling block without a state associated, init from scratch
+		log.Warn("Head state missing, resetting chain", "number", currentBlock.Number(), "hash", currentBlock.Hash())
+		return self.Reset()
+	}
+	// Everything seems to be fine, set as the head block
+	self.currentBlock = currentBlock
+
 	// Restore the last known head header
 	currentHeader := self.currentBlock.Header()
 	if head := GetHeadHeaderHash(self.chainDb); head != (common.Hash{}) {
@@ -200,6 +209,7 @@ func (self *BlockChain) loadLastState() error {
 		}
 	}
 	self.hc.SetCurrentHeader(currentHeader)
+
 	// Restore the last known head fast block
 	self.currentFastBlock = self.currentBlock
 	if head := GetHeadFastBlockHash(self.chainDb); head != (common.Hash{}) {
@@ -233,14 +243,18 @@ func (self *BlockChain) loadLastState() error {
 // above the new head will be deleted and the new one set. In the case of blocks
 // though, the head may be further rewound if block bodies are missing (non-archive
 // nodes after a fast sync).
-func (bc *BlockChain) SetHead(head uint64) {
+func (bc *BlockChain) SetHead(head uint64) error {
+	log.Warn("Rewinding blockchain", "target", head)
+
 	bc.mu.Lock()
 	defer bc.mu.Unlock()
 
+	// Rewind the header chain, deleting all block bodies until then
 	delFn := func(hash common.Hash, num uint64) {
 		DeleteBody(bc.chainDb, hash, num)
 	}
 	bc.hc.SetHead(head, delFn)
+	currentHeader := bc.hc.CurrentHeader()
 
 	// Clear out any stale content from the caches
 	bc.bodyCache.Purge()
@@ -248,29 +262,34 @@ func (bc *BlockChain) SetHead(head uint64) {
 	bc.blockCache.Purge()
 	bc.futureBlocks.Purge()
 
-	// Update all computed fields to the new head
-	currentHeader := bc.hc.CurrentHeader()
+	// Rewind the block chain, ensuring we don't end up with a stateless head block
 	if bc.currentBlock != nil && currentHeader.Number.Uint64() < bc.currentBlock.NumberU64() {
 		bc.currentBlock = bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())
 	}
+	if bc.currentBlock != nil {
+		if _, err := state.New(bc.currentBlock.Root(), bc.chainDb); err != nil {
+			// Rewound state missing, rolled back to before pivot, reset to genesis
+			bc.currentBlock = nil
+		}
+	}
+	// Rewind the fast block in a simpleton way to the target head
 	if bc.currentFastBlock != nil && currentHeader.Number.Uint64() < bc.currentFastBlock.NumberU64() {
 		bc.currentFastBlock = bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())
 	}
-
+	// If either blocks reached nil, reset to the genesis state
 	if bc.currentBlock == nil {
 		bc.currentBlock = bc.genesisBlock
 	}
 	if bc.currentFastBlock == nil {
 		bc.currentFastBlock = bc.genesisBlock
 	}
-
 	if err := WriteHeadBlockHash(bc.chainDb, bc.currentBlock.Hash()); err != nil {
 		log.Crit("Failed to reset head full block", "err", err)
 	}
 	if err := WriteHeadFastBlockHash(bc.chainDb, bc.currentFastBlock.Hash()); err != nil {
 		log.Crit("Failed to reset head fast block", "err", err)
 	}
-	bc.loadLastState()
+	return bc.loadLastState()
 }
 
 // FastSyncCommitHead sets the current head block to the one defined by the hash
@@ -378,16 +397,17 @@ func (self *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
 }
 
 // Reset purges the entire blockchain, restoring it to its genesis state.
-func (bc *BlockChain) Reset() {
-	bc.ResetWithGenesisBlock(bc.genesisBlock)
+func (bc *BlockChain) Reset() error {
+	return bc.ResetWithGenesisBlock(bc.genesisBlock)
 }
 
 // ResetWithGenesisBlock purges the entire blockchain, restoring it to the
 // specified genesis state.
-func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) {
+func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
 	// Dump the entire block chain and purge the caches
-	bc.SetHead(0)
-
+	if err := bc.SetHead(0); err != nil {
+		return err
+	}
 	bc.mu.Lock()
 	defer bc.mu.Unlock()
 
@@ -404,6 +424,8 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) {
 	bc.hc.SetGenesis(bc.genesisBlock.Header())
 	bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
 	bc.currentFastBlock = bc.genesisBlock
+
+	return nil
 }
 
 // Export writes the active chain to the given writer.
@@ -790,12 +812,15 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain
 	}
 	// Update the head fast sync block if better
 	self.mu.Lock()
+
 	head := blockChain[len(errs)-1]
-	if self.GetTd(self.currentFastBlock.Hash(), self.currentFastBlock.NumberU64()).Cmp(self.GetTd(head.Hash(), head.NumberU64())) < 0 {
-		if err := WriteHeadFastBlockHash(self.chainDb, head.Hash()); err != nil {
-			log.Crit("Failed to update head fast block hash", "err", err)
+	if td := self.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case
+		if self.GetTd(self.currentFastBlock.Hash(), self.currentFastBlock.NumberU64()).Cmp(td) < 0 {
+			if err := WriteHeadFastBlockHash(self.chainDb, head.Hash()); err != nil {
+				log.Crit("Failed to update head fast block hash", "err", err)
+			}
+			self.currentFastBlock = head
 		}
-		self.currentFastBlock = head
 	}
 	self.mu.Unlock()
 

+ 1 - 0
eth/api_backend.go

@@ -51,6 +51,7 @@ func (b *EthApiBackend) CurrentBlock() *types.Block {
 }
 
 func (b *EthApiBackend) SetHead(number uint64) {
+	b.eth.protocolManager.downloader.Cancel()
 	b.eth.blockchain.SetHead(number)
 }
 

+ 8 - 8
eth/downloader/downloader.go

@@ -277,7 +277,7 @@ func (d *Downloader) UnregisterPeer(id string) error {
 	d.cancelLock.RUnlock()
 
 	if master {
-		d.cancel()
+		d.Cancel()
 	}
 	return nil
 }
@@ -352,7 +352,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 	d.cancelPeer = id
 	d.cancelLock.Unlock()
 
-	defer d.cancel() // No matter what, we can't leave the cancel channel open
+	defer d.Cancel() // No matter what, we can't leave the cancel channel open
 
 	// Set the requested sync mode, unless it's forbidden
 	d.mode = mode
@@ -473,7 +473,7 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
 		}
 	}
 	d.queue.Close()
-	d.cancel()
+	d.Cancel()
 	wg.Wait()
 
 	// If sync failed in the critical section, bump the fail counter
@@ -483,9 +483,9 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
 	return err
 }
 
-// cancel cancels all of the operations and resets the queue. It returns true
+// Cancel cancels all of the operations and resets the queue. It returns true
 // if the cancel operation was completed.
-func (d *Downloader) cancel() {
+func (d *Downloader) Cancel() {
 	// Close the current cancel channel
 	d.cancelLock.Lock()
 	if d.cancelCh != nil {
@@ -512,7 +512,7 @@ func (d *Downloader) Terminate() {
 	d.quitLock.Unlock()
 
 	// Cancel any pending download requests
-	d.cancel()
+	d.Cancel()
 }
 
 // fetchHeight retrieves the head header of the remote peer to aid in estimating
@@ -945,7 +945,7 @@ func (d *Downloader) fetchNodeData() error {
 				if err != nil {
 					// If the node data processing failed, the root hash is very wrong, abort
 					log.Error("State processing failed", "peer", packet.PeerId(), "err", err)
-					d.cancel()
+					d.Cancel()
 					return
 				}
 				// Processing succeeded, notify state fetcher of continuation
@@ -1208,7 +1208,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 				if atomic.LoadUint32(&d.fsPivotFails) == 0 {
 					for _, header := range rollback {
 						if header.Number.Uint64() == pivot {
-							log.Warn("Fast-sync critical section failure, locked pivot to header", "number", pivot, "hash", header.Hash())
+							log.Warn("Fast-sync pivot locked in", "number", pivot, "hash", header.Hash())
 							d.fsPivotLock = header
 						}
 					}

+ 2 - 2
eth/downloader/downloader_test.go

@@ -982,7 +982,7 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) {
 	tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
 
 	// Make sure canceling works with a pristine downloader
-	tester.downloader.cancel()
+	tester.downloader.Cancel()
 	if !tester.downloader.queue.Idle() {
 		t.Errorf("download queue not idle")
 	}
@@ -990,7 +990,7 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) {
 	if err := tester.sync("peer", nil, mode); err != nil {
 		t.Fatalf("failed to synchronise blocks: %v", err)
 	}
-	tester.downloader.cancel()
+	tester.downloader.Cancel()
 	if !tester.downloader.queue.Idle() {
 		t.Errorf("download queue not idle")
 	}

+ 8 - 0
eth/sync.go

@@ -175,6 +175,14 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
 	// Otherwise try to sync with the downloader
 	mode := downloader.FullSync
 	if atomic.LoadUint32(&pm.fastSync) == 1 {
+		// Fast sync was explicitly requested, and explicitly granted
+		mode = downloader.FastSync
+	} else if currentBlock.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0 {
+		// The database seems empty as the current block is the genesis. Yet the fast
+		// block is ahead, so fast sync was enabled for this node at a certain point.
+		// The only scenario where this can happen is if the user manually (or via a
+		// bad block) rolled back a fast sync node below the sync point. In this case
+		// however it's safe to reenable fast sync.
 		mode = downloader.FastSync
 	}
 	if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {

+ 1 - 0
les/api_backend.go

@@ -50,6 +50,7 @@ func (b *LesApiBackend) CurrentBlock() *types.Block {
 }
 
 func (b *LesApiBackend) SetHead(number uint64) {
+	b.eth.protocolManager.downloader.Cancel()
 	b.eth.blockchain.SetHead(number)
 }