Browse Source

eth, core: interrupt the chain processing on stop

Added an additional channel which is used to interrupt the chain manager
when it's processing blocks.
obscuren 10 years ago
parent
commit
90c4493a10
2 changed files with 105 additions and 95 deletions
  1. 104 94
      core/chain_manager.go
  2. 1 1
      eth/backend.go

+ 104 - 94
core/chain_manager.go

@@ -100,8 +100,9 @@ type ChainManager struct {
 	cache        *BlockCache
 	futureBlocks *BlockCache
 
-	quit chan struct{}
-	wg   sync.WaitGroup
+	quit         chan struct{}
+	procInterupt chan struct{} // interupt signaler for block processing
+	wg           sync.WaitGroup
 
 	pow pow.PoW
 }
@@ -113,6 +114,7 @@ func NewChainManager(genesis *types.Block, blockDb, stateDb common.Database, pow
 		genesisBlock: GenesisBlock(42, stateDb),
 		eventMux:     mux,
 		quit:         make(chan struct{}),
+		procInterupt: make(chan struct{}),
 		cache:        NewBlockCache(blockCacheLimit),
 		pow:          pow,
 	}
@@ -516,6 +518,7 @@ func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) {
 
 func (bc *ChainManager) Stop() {
 	close(bc.quit)
+	close(bc.procInterupt)
 
 	bc.wg.Wait()
 
@@ -568,119 +571,126 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
 	defer close(nonceQuit)
 
 	txcount := 0
+done:
 	for i, block := range chain {
-		bstart := time.Now()
-		// Wait for block i's nonce to be verified before processing
-		// its state transition.
-		for !nonceChecked[i] {
-			r := <-nonceDone
-			nonceChecked[r.i] = true
-			if !r.valid {
-				block := chain[r.i]
-				return r.i, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()}
+		select {
+		case <-self.procInterupt:
+			glog.V(logger.Debug).Infoln("Premature abort during chain processing")
+			break done
+		default:
+			bstart := time.Now()
+			// Wait for block i's nonce to be verified before processing
+			// its state transition.
+			for !nonceChecked[i] {
+				r := <-nonceDone
+				nonceChecked[r.i] = true
+				if !r.valid {
+					block := chain[r.i]
+					return r.i, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()}
+				}
 			}
-		}
 
-		if BadHashes[block.Hash()] {
-			err := fmt.Errorf("Found known bad hash in chain %x", block.Hash())
-			blockErr(block, err)
-			return i, err
-		}
-
-		// Setting block.Td regardless of error (known for example) prevents errors down the line
-		// in the protocol handler
-		block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
-
-		// Call in to the block processor and check for errors. It's likely that if one block fails
-		// all others will fail too (unless a known block is returned).
-		logs, err := self.processor.Process(block)
-		if err != nil {
-			if IsKnownBlockErr(err) {
-				stats.ignored++
-				continue
+			if BadHashes[block.Hash()] {
+				err := fmt.Errorf("Found known bad hash in chain %x", block.Hash())
+				blockErr(block, err)
+				return i, err
 			}
 
-			if err == BlockFutureErr {
-				// Allow up to MaxFuture second in the future blocks. If this limit
-				// is exceeded the chain is discarded and processed at a later time
-				// if given.
-				if max := time.Now().Unix() + maxTimeFutureBlocks; block.Time() > max {
-					return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
+			// Setting block.Td regardless of error (known for example) prevents errors down the line
+			// in the protocol handler
+			block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
+
+			// Call in to the block processor and check for errors. It's likely that if one block fails
+			// all others will fail too (unless a known block is returned).
+			logs, err := self.processor.Process(block)
+			if err != nil {
+				if IsKnownBlockErr(err) {
+					stats.ignored++
+					continue
 				}
 
-				block.SetQueued(true)
-				self.futureBlocks.Push(block)
-				stats.queued++
-				continue
-			}
+				if err == BlockFutureErr {
+					// Allow up to MaxFuture second in the future blocks. If this limit
+					// is exceeded the chain is discarded and processed at a later time
+					// if given.
+					if max := time.Now().Unix() + maxTimeFutureBlocks; block.Time() > max {
+						return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
+					}
 
-			if IsParentErr(err) && self.futureBlocks.Has(block.ParentHash()) {
-				block.SetQueued(true)
-				self.futureBlocks.Push(block)
-				stats.queued++
-				continue
-			}
+					block.SetQueued(true)
+					self.futureBlocks.Push(block)
+					stats.queued++
+					continue
+				}
 
-			blockErr(block, err)
+				if IsParentErr(err) && self.futureBlocks.Has(block.ParentHash()) {
+					block.SetQueued(true)
+					self.futureBlocks.Push(block)
+					stats.queued++
+					continue
+				}
 
-			return i, err
-		}
+				blockErr(block, err)
 
-		txcount += len(block.Transactions())
-
-		cblock := self.currentBlock
-		// Compare the TD of the last known block in the canonical chain to make sure it's greater.
-		// At this point it's possible that a different chain (fork) becomes the new canonical chain.
-		if block.Td.Cmp(self.Td()) > 0 {
-			// chain fork
-			if block.ParentHash() != cblock.Hash() {
-				// during split we merge two different chains and create the new canonical chain
-				err := self.merge(cblock, block)
-				if err != nil {
-					return i, err
+				return i, err
+			}
+
+			txcount += len(block.Transactions())
+
+			cblock := self.currentBlock
+			// Compare the TD of the last known block in the canonical chain to make sure it's greater.
+			// At this point it's possible that a different chain (fork) becomes the new canonical chain.
+			if block.Td.Cmp(self.Td()) > 0 {
+				// chain fork
+				if block.ParentHash() != cblock.Hash() {
+					// during split we merge two different chains and create the new canonical chain
+					err := self.merge(cblock, block)
+					if err != nil {
+						return i, err
+					}
+
+					queue[i] = ChainSplitEvent{block, logs}
+					queueEvent.splitCount++
 				}
 
-				queue[i] = ChainSplitEvent{block, logs}
-				queueEvent.splitCount++
-			}
+				self.mu.Lock()
+				self.setTotalDifficulty(block.Td)
+				self.insert(block)
+				self.mu.Unlock()
 
-			self.mu.Lock()
-			self.setTotalDifficulty(block.Td)
-			self.insert(block)
-			self.mu.Unlock()
+				jsonlogger.LogJson(&logger.EthChainNewHead{
+					BlockHash:     block.Hash().Hex(),
+					BlockNumber:   block.Number(),
+					ChainHeadHash: cblock.Hash().Hex(),
+					BlockPrevHash: block.ParentHash().Hex(),
+				})
 
-			jsonlogger.LogJson(&logger.EthChainNewHead{
-				BlockHash:     block.Hash().Hex(),
-				BlockNumber:   block.Number(),
-				ChainHeadHash: cblock.Hash().Hex(),
-				BlockPrevHash: block.ParentHash().Hex(),
-			})
+				self.setTransState(state.New(block.Root(), self.stateDb))
+				self.txState.SetState(state.New(block.Root(), self.stateDb))
 
-			self.setTransState(state.New(block.Root(), self.stateDb))
-			self.txState.SetState(state.New(block.Root(), self.stateDb))
+				queue[i] = ChainEvent{block, block.Hash(), logs}
+				queueEvent.canonicalCount++
 
-			queue[i] = ChainEvent{block, block.Hash(), logs}
-			queueEvent.canonicalCount++
+				if glog.V(logger.Debug) {
+					glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
+				}
+			} else {
+				if glog.V(logger.Detail) {
+					glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
+				}
 
-			if glog.V(logger.Debug) {
-				glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
-			}
-		} else {
-			if glog.V(logger.Detail) {
-				glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
+				queue[i] = ChainSideEvent{block, logs}
+				queueEvent.sideCount++
 			}
+			// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
+			// not in the canonical chain.
+			self.write(block)
+			// Delete from future blocks
+			self.futureBlocks.Delete(block.Hash())
 
-			queue[i] = ChainSideEvent{block, logs}
-			queueEvent.sideCount++
-		}
-		// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
-		// not in the canonical chain.
-		self.write(block)
-		// Delete from future blocks
-		self.futureBlocks.Delete(block.Hash())
-
-		stats.processed++
+			stats.processed++
 
+		}
 	}
 
 	if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) {

+ 1 - 1
eth/backend.go

@@ -527,8 +527,8 @@ func (self *Ethereum) AddPeer(nodeURL string) error {
 
 func (s *Ethereum) Stop() {
 	s.net.Stop()
-	s.protocolManager.Stop()
 	s.chainManager.Stop()
+	s.protocolManager.Stop()
 	s.txPool.Stop()
 	s.eventMux.Stop()
 	if s.whisper != nil {