Kaynağa Gözat

core, miner: added write block method & changed mining propagation

Jeffrey Wilcke 10 yıl önce
ebeveyn
işleme
d1e93db3eb
2 değiştirilmiş dosya ile 95 ekleme ve 73 silme
  1. 67 45
      core/chain_manager.go
  2. 28 28
      miner/worker.go

+ 67 - 45
core/chain_manager.go

@@ -548,6 +548,58 @@ func (self *ChainManager) procFutureBlocks() {
 	}
 }
 
+type writeStatus byte
+
+const (
+	nonStatTy writeStatus = iota
+	canonStatTy
+	splitStatTy
+	sideStatTy
+)
+
+func (self *ChainManager) WriteBlock(block *types.Block) (status writeStatus, err error) {
+	self.wg.Add(1)
+	defer self.wg.Done()
+
+	cblock := self.currentBlock
+	// Compare the TD of the last known block in the canonical chain to make sure it's greater.
+	// At this point it's possible that a different chain (fork) becomes the new canonical chain.
+	if block.Td.Cmp(self.Td()) > 0 {
+		// chain fork
+		if block.ParentHash() != cblock.Hash() {
+			// during split we merge two different chains and create the new canonical chain
+			err := self.merge(cblock, block)
+			if err != nil {
+				return nonStatTy, err
+			}
+
+			status = splitStatTy
+		}
+
+		self.mu.Lock()
+		self.setTotalDifficulty(block.Td)
+		self.insert(block)
+		self.mu.Unlock()
+
+		self.setTransState(state.New(block.Root(), self.stateDb))
+		self.txState.SetState(state.New(block.Root(), self.stateDb))
+
+		status = canonStatTy
+	} else {
+		status = sideStatTy
+	}
+
+	// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
+	// not in the canonical chain.
+	self.mu.Lock()
+	self.write(block)
+	self.mu.Unlock()
+	// Delete from future blocks
+	self.futureBlocks.Delete(block.Hash())
+
+	return
+}
+
 // InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned
 // it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go).
 func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
@@ -641,59 +693,29 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
 
 		txcount += len(block.Transactions())
 
-		cblock := self.currentBlock
-		// Compare the TD of the last known block in the canonical chain to make sure it's greater.
-		// At this point it's possible that a different chain (fork) becomes the new canonical chain.
-		if block.Td.Cmp(self.Td()) > 0 {
-			// chain fork
-			if block.ParentHash() != cblock.Hash() {
-				// during split we merge two different chains and create the new canonical chain
-				err := self.merge(cblock, block)
-				if err != nil {
-					return i, err
-				}
-
-				queue[i] = ChainSplitEvent{block, logs}
-				queueEvent.splitCount++
-			}
-
-			self.mu.Lock()
-			self.setTotalDifficulty(block.Td)
-			self.insert(block)
-			self.mu.Unlock()
-
-			jsonlogger.LogJson(&logger.EthChainNewHead{
-				BlockHash:     block.Hash().Hex(),
-				BlockNumber:   block.Number(),
-				ChainHeadHash: cblock.Hash().Hex(),
-				BlockPrevHash: block.ParentHash().Hex(),
-			})
-
-			self.setTransState(state.New(block.Root(), self.stateDb))
-			self.txState.SetState(state.New(block.Root(), self.stateDb))
-
-			queue[i] = ChainEvent{block, block.Hash(), logs}
-			queueEvent.canonicalCount++
-
+		// write the block to the chain and get the status
+		status, err := self.WriteBlock(block)
+		if err != nil {
+			return i, err
+		}
+		switch status {
+		case canonStatTy:
 			if glog.V(logger.Debug) {
 				glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
 			}
-		} else {
+			queue[i] = ChainEvent{block, block.Hash(), logs}
+			queueEvent.canonicalCount++
+		case sideStatTy:
 			if glog.V(logger.Detail) {
 				glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
 			}
-
 			queue[i] = ChainSideEvent{block, logs}
 			queueEvent.sideCount++
+		case splitStatTy:
+			queue[i] = ChainSplitEvent{block, logs}
+			queueEvent.splitCount++
 		}
-		// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
-		// not in the canonical chain.
-		self.write(block)
-		// Delete from future blocks
-		self.futureBlocks.Delete(block.Hash())
-
 		stats.processed++
-		blockInsertTimer.UpdateSince(bstart)
 	}
 
 	if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) {
@@ -752,9 +774,9 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, e
 		}
 	}
 
-	if glog.V(logger.Info) {
+	if glog.V(logger.Debug) {
 		commonHash := commonBlock.Hash()
-		glog.Infof("Fork detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4])
+		glog.Infof("Chain split detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4])
 	}
 
 	return newChain, nil

+ 28 - 28
miner/worker.go

@@ -239,43 +239,35 @@ func (self *worker) wait() {
 				continue
 			}
 
-			// broadcast before waiting for validation
-			go self.mux.Post(core.NewMinedBlockEvent{block})
-			// insert mined block in to our own chain
-			if _, err := self.chain.InsertChain(types.Blocks{block}); err == nil {
-				// remove uncles we've previously inserted
-				for _, uncle := range block.Uncles() {
-					delete(self.possibleUncles, uncle.Hash())
-				}
-
-				// check staleness and display confirmation
-				var stale, confirm string
-				canonBlock := self.chain.GetBlockByNumber(block.NumberU64())
-				if canonBlock != nil && canonBlock.Hash() != block.Hash() {
-					stale = "stale "
-				} else {
-					confirm = "Wait 5 blocks for confirmation"
-					self.current.localMinedBlocks = newLocalMinedBlock(block.Number().Uint64(), self.current.localMinedBlocks)
-				}
-
-				glog.V(logger.Info).Infof("🔨  Mined %sblock (#%v / %x). %s", stale, block.Number(), block.Hash().Bytes()[:4], confirm)
+			_, err := self.chain.WriteBlock(block)
+			if err != nil {
+				glog.V(logger.Error).Infoln("error writing block to chain", err)
+				continue
+			}
 
-				// XXX remove old structured json logging
-				jsonlogger.LogJson(&logger.EthMinerNewBlock{
-					BlockHash:     block.Hash().Hex(),
-					BlockNumber:   block.Number(),
-					ChainHeadHash: block.ParentHeaderHash.Hex(),
-					BlockPrevHash: block.ParentHeaderHash.Hex(),
-				})
+			// check staleness and display confirmation
+			var stale, confirm string
+			canonBlock := self.chain.GetBlockByNumber(block.NumberU64())
+			if canonBlock != nil && canonBlock.Hash() != block.Hash() {
+				stale = "stale "
 			} else {
-				self.commitNewWork()
+				confirm = "Wait 5 blocks for confirmation"
+				self.current.localMinedBlocks = newLocalMinedBlock(block.Number().Uint64(), self.current.localMinedBlocks)
 			}
+
+			glog.V(logger.Info).Infof("🔨  Mined %sblock (#%v / %x). %s", stale, block.Number(), block.Hash().Bytes()[:4], confirm)
+
+			// broadcast before waiting for validation
+			go self.mux.Post(core.NewMinedBlockEvent{block})
+
+			self.commitNewWork()
 		}
 	}
 }
 
 func (self *worker) push() {
 	if atomic.LoadInt32(&self.mining) == 1 {
+		self.current.state.Sync()
 		self.current.block.SetRoot(self.current.state.Root())
 
 		// push new work to agents
@@ -302,6 +294,13 @@ func (self *worker) makeCurrent() {
 	if block.Time() <= parent.Time() {
 		block.Header().Time = parent.Header().Time + 1
 	}
+	// this will ensure we're not going off too far in the future
+	if now := time.Now().Unix(); block.Time() > now+4 {
+		wait := time.Duration(block.Time()-now) * time.Second
+		glog.V(logger.Info).Infoln("We are too far in the future. Waiting for", wait)
+		time.Sleep(wait)
+	}
+
 	block.Header().Extra = self.extra
 
 	// when 08 is processed ancestors contain 07 (quick block)
@@ -428,6 +427,7 @@ func (self *worker) commitNewWork() {
 	self.current.block.SetUncles(uncles)
 
 	core.AccumulateRewards(self.current.state, self.current.block)
+	self.current.block.Td = new(big.Int).Set(core.CalcTD(self.current.block, self.chain.GetBlock(self.current.block.ParentHash())))
 
 	self.current.state.Update()