Browse Source

eth: interrupt chain insertion on shutdown (#21114)

This adds a new API method on core.BlockChain to allow interrupting
running data inserts, and calls the method before shutting down the
downloader.

The BlockChain interrupt checks are now done through a method instead
of inlining the atomic load everywhere. There is no loss of efficiency from
this and it makes the interrupt protocol a lot clearer because the check is
defined next to the method that sets the flag.
Felix Lange 5 years ago
parent
commit
9219e0fba4
3 changed files with 38 additions and 22 deletions
  1. 23 17
      core/blockchain.go
  2. 6 2
      eth/sync.go
  3. 9 3
      light/lightchain.go

+ 23 - 17
core/blockchain.go

@@ -178,11 +178,10 @@ type BlockChain struct {
 	txLookupCache *lru.Cache     // Cache for the most recent transaction lookup data.
 	futureBlocks  *lru.Cache     // future blocks are blocks added for later processing
 
-	quit    chan struct{} // blockchain quit channel
-	running int32         // running must be called atomically
-	// procInterrupt must be atomically called
-	procInterrupt int32          // interrupt signaler for block processing
+	quit          chan struct{}  // blockchain quit channel
 	wg            sync.WaitGroup // chain processing wait group for shutting down
+	running       int32          // 0 if chain is running, 1 when stopped
+	procInterrupt int32          // interrupt signaler for block processing
 
 	engine     consensus.Engine
 	validator  Validator  // Block and state validator interface
@@ -239,7 +238,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
 	bc.processor = NewStateProcessor(chainConfig, bc, engine)
 
 	var err error
-	bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt)
+	bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped)
 	if err != nil {
 		return nil, err
 	}
@@ -332,10 +331,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
 	return bc, nil
 }
 
-func (bc *BlockChain) getProcInterrupt() bool {
-	return atomic.LoadInt32(&bc.procInterrupt) == 1
-}
-
 // GetVMConfig returns the block chain VM config.
 func (bc *BlockChain) GetVMConfig() *vm.Config {
 	return &bc.vmConfig
@@ -882,8 +877,7 @@ func (bc *BlockChain) Stop() {
 	// Unsubscribe all subscriptions registered from blockchain
 	bc.scope.Close()
 	close(bc.quit)
-	atomic.StoreInt32(&bc.procInterrupt, 1)
-
+	bc.StopInsert()
 	bc.wg.Wait()
 
 	// Ensure that the entirety of the state snapshot is journalled to disk.
@@ -928,6 +922,18 @@ func (bc *BlockChain) Stop() {
 	log.Info("Blockchain stopped")
 }
 
+// StopInsert interrupts all insertion methods, causing them to return
+// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
+// calling this method.
+func (bc *BlockChain) StopInsert() {
+	atomic.StoreInt32(&bc.procInterrupt, 1)
+}
+
+// insertStopped returns true after StopInsert has been called.
+func (bc *BlockChain) insertStopped() bool {
+	return atomic.LoadInt32(&bc.procInterrupt) == 1
+}
+
 func (bc *BlockChain) procFutureBlocks() {
 	blocks := make([]*types.Block, 0, bc.futureBlocks.Len())
 	for _, hash := range bc.futureBlocks.Keys() {
@@ -1113,7 +1119,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
 		var deleted []*numberHash
 		for i, block := range blockChain {
 			// Short circuit insertion if shutting down or processing failed
-			if atomic.LoadInt32(&bc.procInterrupt) == 1 {
+			if bc.insertStopped() {
 				return 0, errInsertionInterrupted
 			}
 			// Short circuit insertion if it is required(used in testing only)
@@ -1260,7 +1266,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
 		batch := bc.db.NewBatch()
 		for i, block := range blockChain {
 			// Short circuit insertion if shutting down or processing failed
-			if atomic.LoadInt32(&bc.procInterrupt) == 1 {
+			if bc.insertStopped() {
 				return 0, errInsertionInterrupted
 			}
 			// Short circuit if the owner header is unknown
@@ -1708,8 +1714,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
 	// No validation errors for the first block (or chain prefix skipped)
 	for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() {
 		// If the chain is terminating, stop processing blocks
-		if atomic.LoadInt32(&bc.procInterrupt) == 1 {
-			log.Debug("Premature abort during blocks processing")
+		if bc.insertStopped() {
+			log.Debug("Abort during block processing")
 			break
 		}
 		// If the header is a banned one, straight out abort
@@ -1996,8 +2002,8 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
 			blocks, memory = blocks[:0], 0
 
 			// If the chain is terminating, stop processing blocks
-			if atomic.LoadInt32(&bc.procInterrupt) == 1 {
-				log.Debug("Premature abort during blocks processing")
+			if bc.insertStopped() {
+				log.Debug("Abort during blocks processing")
 				return 0, nil
 			}
 		}

+ 6 - 2
eth/sync.go

@@ -199,7 +199,6 @@ func (cs *chainSyncer) loop() {
 	cs.pm.txFetcher.Start()
 	defer cs.pm.blockFetcher.Stop()
 	defer cs.pm.txFetcher.Stop()
-	defer cs.pm.downloader.Terminate()
 
 	// The force timer lowers the peer count threshold down to one when it fires.
 	// This ensures we'll always start sync even if there aren't enough peers.
@@ -222,8 +221,13 @@ func (cs *chainSyncer) loop() {
 			cs.forced = true
 
 		case <-cs.pm.quitSync:
+			// Disable all insertion on the blockchain. This needs to happen before
+			// terminating the downloader because the downloader waits for blockchain
+			// inserts, and these can take a long time to finish.
+			cs.pm.blockchain.StopInsert()
+			cs.pm.downloader.Terminate()
 			if cs.doneCh != nil {
-				cs.pm.downloader.Terminate() // Double term is fine, Cancel would block until queue is emptied
+				// Wait for the current sync to end.
 				<-cs.doneCh
 			}
 			return

+ 9 - 3
light/lightchain.go

@@ -314,10 +314,16 @@ func (lc *LightChain) Stop() {
 		return
 	}
 	close(lc.quit)
-	atomic.StoreInt32(&lc.procInterrupt, 1)
-
+	lc.StopInsert()
 	lc.wg.Wait()
-	log.Info("Blockchain manager stopped")
+	log.Info("Blockchain stopped")
+}
+
+// StopInsert interrupts all insertion methods, causing them to return
+// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
+// calling this method.
+func (lc *LightChain) StopInsert() {
+	atomic.StoreInt32(&lc.procInterrupt, 1)
 }
 
 // Rollback is designed to remove a chain of links from the database that aren't