Browse Source

miner: seperate state, receipts for different mining work (#17323)

gary rong 7 years ago
parent
commit
941018b570
2 changed files with 67 additions and 64 deletions
  1. 17 12
      miner/agent.go
  2. 50 52
      miner/worker.go

+ 17 - 12
miner/agent.go

@@ -27,10 +27,10 @@ import (
 type CpuAgent struct {
 	mu sync.Mutex
 
-	workCh        chan *Work
+	taskCh        chan *Package
+	returnCh      chan<- *Package
 	stop          chan struct{}
 	quitCurrentOp chan struct{}
-	returnCh      chan<- *Result
 
 	chain  consensus.ChainReader
 	engine consensus.Engine
@@ -43,13 +43,17 @@ func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent
 		chain:  chain,
 		engine: engine,
 		stop:   make(chan struct{}, 1),
-		workCh: make(chan *Work, 1),
+		taskCh: make(chan *Package, 1),
 	}
 	return agent
 }
 
-func (self *CpuAgent) Work() chan<- *Work            { return self.workCh }
-func (self *CpuAgent) SetReturnCh(ch chan<- *Result) { self.returnCh = ch }
+func (self *CpuAgent) AssignTask(p *Package) {
+	if atomic.LoadInt32(&self.started) == 1 {
+		self.taskCh <- p
+	}
+}
+func (self *CpuAgent) DeliverTo(ch chan<- *Package) { self.returnCh = ch }
 
 func (self *CpuAgent) Start() {
 	if !atomic.CompareAndSwapInt32(&self.started, 0, 1) {
@@ -67,7 +71,7 @@ done:
 	// Empty work channel
 	for {
 		select {
-		case <-self.workCh:
+		case <-self.taskCh:
 		default:
 			break done
 		}
@@ -78,13 +82,13 @@ func (self *CpuAgent) update() {
 out:
 	for {
 		select {
-		case work := <-self.workCh:
+		case p := <-self.taskCh:
 			self.mu.Lock()
 			if self.quitCurrentOp != nil {
 				close(self.quitCurrentOp)
 			}
 			self.quitCurrentOp = make(chan struct{})
-			go self.mine(work, self.quitCurrentOp)
+			go self.mine(p, self.quitCurrentOp)
 			self.mu.Unlock()
 		case <-self.stop:
 			self.mu.Lock()
@@ -98,10 +102,11 @@ out:
 	}
 }
 
-func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
-	if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil {
-		log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash())
-		self.returnCh <- &Result{work, result}
+func (self *CpuAgent) mine(p *Package, stop <-chan struct{}) {
+	var err error
+	if p.Block, err = self.engine.Seal(self.chain, p.Block, stop); p.Block != nil {
+		log.Info("Successfully sealed new block", "number", p.Block.Number(), "hash", p.Block.Hash())
+		self.returnCh <- p
 	} else {
 		if err != nil {
 			log.Warn("Block sealing failed", "err", err)

+ 50 - 52
miner/worker.go

@@ -51,17 +51,16 @@ const (
 	chainSideChanSize = 10
 )
 
-// Agent can register themself with the worker
+// Agent can register themselves with the worker
 type Agent interface {
-	Work() chan<- *Work
-	SetReturnCh(chan<- *Result)
+	AssignTask(*Package)
+	DeliverTo(chan<- *Package)
 	Start()
 	Stop()
 }
 
-// Work is the workers current environment and holds
-// all of the current state information
-type Work struct {
+// Env is the workers current environment and holds all of the current state information.
+type Env struct {
 	config *params.ChainConfig
 	signer types.Signer
 
@@ -72,8 +71,6 @@ type Work struct {
 	tcount    int            // tx count in cycle
 	gasPool   *core.GasPool  // available gas used to pack transactions
 
-	Block *types.Block // the new block
-
 	header   *types.Header
 	txs      []*types.Transaction
 	receipts []*types.Receipt
@@ -81,9 +78,11 @@ type Work struct {
 	createdAt time.Time
 }
 
-type Result struct {
-	Work  *Work
-	Block *types.Block
+// Package contains all information for consensus engine sealing and result submitting.
+type Package struct {
+	Receipts []*types.Receipt
+	State    *state.StateDB
+	Block    *types.Block
 }
 
 // worker is the main object which takes care of applying messages to the new state
@@ -103,7 +102,7 @@ type worker struct {
 	chainSideSub event.Subscription
 
 	agents map[Agent]struct{}
-	recv   chan *Result
+	recv   chan *Package
 
 	eth     Backend
 	chain   *core.BlockChain
@@ -114,7 +113,7 @@ type worker struct {
 	extra    []byte
 
 	currentMu sync.Mutex
-	current   *Work
+	current   *Env
 
 	snapshotMu    sync.RWMutex
 	snapshotBlock *types.Block
@@ -126,7 +125,6 @@ type worker struct {
 	unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations
 
 	// atomic status counters
-	atWork  int32 // The number of in-flight consensus engine work.
 	running int32 // The indicator whether the consensus engine is running or not.
 }
 
@@ -140,7 +138,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend,
 		chainHeadCh:    make(chan core.ChainHeadEvent, chainHeadChanSize),
 		chainSideCh:    make(chan core.ChainSideEvent, chainSideChanSize),
 		chainDb:        eth.ChainDb(),
-		recv:           make(chan *Result, resultQueueSize),
+		recv:           make(chan *Package, resultQueueSize),
 		chain:          eth.BlockChain(),
 		proc:           eth.BlockChain().Validator(),
 		possibleUncles: make(map[common.Hash]*types.Block),
@@ -203,7 +201,6 @@ func (self *worker) stop() {
 	for agent := range self.agents {
 		agent.Stop()
 	}
-	atomic.StoreInt32(&self.atWork, 0)
 }
 
 func (self *worker) isRunning() bool {
@@ -214,7 +211,7 @@ func (self *worker) register(agent Agent) {
 	self.mu.Lock()
 	defer self.mu.Unlock()
 	self.agents[agent] = struct{}{}
-	agent.SetReturnCh(self.recv)
+	agent.DeliverTo(self.recv)
 	if self.isRunning() {
 		agent.Start()
 	}
@@ -284,26 +281,24 @@ func (self *worker) update() {
 func (self *worker) wait() {
 	for {
 		for result := range self.recv {
-			atomic.AddInt32(&self.atWork, -1)
 
 			if result == nil {
 				continue
 			}
 			block := result.Block
-			work := result.Work
 
 			// Update the block hash in all logs since it is now available and not when the
 			// receipt/log of individual transactions were created.
-			for _, r := range work.receipts {
+			for _, r := range result.Receipts {
 				for _, l := range r.Logs {
 					l.BlockHash = block.Hash()
 				}
 			}
-			for _, log := range work.state.Logs() {
+			for _, log := range result.State.Logs() {
 				log.BlockHash = block.Hash()
 			}
 			self.currentMu.Lock()
-			stat, err := self.chain.WriteBlockWithState(block, work.receipts, work.state)
+			stat, err := self.chain.WriteBlockWithState(block, result.Receipts, result.State)
 			self.currentMu.Unlock()
 			if err != nil {
 				log.Error("Failed writing block to chain", "err", err)
@@ -313,7 +308,7 @@ func (self *worker) wait() {
 			self.mux.Post(core.NewMinedBlockEvent{Block: block})
 			var (
 				events []interface{}
-				logs   = work.state.Logs()
+				logs   = result.State.Logs()
 			)
 			events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
 			if stat == core.CanonStatTy {
@@ -328,12 +323,9 @@ func (self *worker) wait() {
 }
 
 // push sends a new work task to currently live miner agents.
-func (self *worker) push(work *Work) {
+func (self *worker) push(p *Package) {
 	for agent := range self.agents {
-		atomic.AddInt32(&self.atWork, 1)
-		if ch := agent.Work(); ch != nil {
-			ch <- work
-		}
+		agent.AssignTask(p)
 	}
 }
 
@@ -343,7 +335,7 @@ func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error
 	if err != nil {
 		return err
 	}
-	work := &Work{
+	env := &Env{
 		config:    self.config,
 		signer:    types.NewEIP155Signer(self.config.ChainID),
 		state:     state,
@@ -357,15 +349,15 @@ func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error
 	// when 08 is processed ancestors contain 07 (quick block)
 	for _, ancestor := range self.chain.GetBlocksFromHash(parent.Hash(), 7) {
 		for _, uncle := range ancestor.Uncles() {
-			work.family.Add(uncle.Hash())
+			env.family.Add(uncle.Hash())
 		}
-		work.family.Add(ancestor.Hash())
-		work.ancestors.Add(ancestor.Hash())
+		env.family.Add(ancestor.Hash())
+		env.ancestors.Add(ancestor.Hash())
 	}
 
 	// Keep track of transactions which return errors so they can be removed
-	work.tcount = 0
-	self.current = work
+	env.tcount = 0
+	self.current = env
 	return nil
 }
 
@@ -431,9 +423,9 @@ func (self *worker) commitNewWork() {
 		return
 	}
 	// Create the current work task and check any fork transitions needed
-	work := self.current
+	env := self.current
 	if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
-		misc.ApplyDAOHardFork(work.state)
+		misc.ApplyDAOHardFork(env.state)
 	}
 
 	// compute uncles for the new block.
@@ -445,7 +437,7 @@ func (self *worker) commitNewWork() {
 		if len(uncles) == 2 {
 			break
 		}
-		if err := self.commitUncle(work, uncle.Header()); err != nil {
+		if err := self.commitUncle(env, uncle.Header()); err != nil {
 			log.Trace("Bad uncle found and will be removed", "hash", hash)
 			log.Trace(fmt.Sprint(uncle))
 
@@ -459,17 +451,23 @@ func (self *worker) commitNewWork() {
 		delete(self.possibleUncles, hash)
 	}
 
+	var (
+		emptyBlock *types.Block
+		fullBlock  *types.Block
+	)
+
 	// Create an empty block based on temporary copied state for sealing in advance without waiting block
 	// execution finished.
-	if work.Block, err = self.engine.Finalize(self.chain, header, work.state.Copy(), nil, uncles, nil); err != nil {
+	emptyState := env.state.Copy()
+	if emptyBlock, err = self.engine.Finalize(self.chain, header, emptyState, nil, uncles, nil); err != nil {
 		log.Error("Failed to finalize block for temporary sealing", "err", err)
 	} else {
 		// Push empty work in advance without applying pending transaction.
 		// The reason is transactions execution can cost a lot and sealer need to
 		// take advantage of this part time.
 		if self.isRunning() {
-			log.Info("Commit new empty mining work", "number", work.Block.Number(), "uncles", len(uncles))
-			self.push(work)
+			log.Info("Commit new empty mining work", "number", emptyBlock.Number(), "uncles", len(uncles))
+			self.push(&Package{nil, emptyState, emptyBlock})
 		}
 	}
 
@@ -480,34 +478,34 @@ func (self *worker) commitNewWork() {
 		return
 	}
 	txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
-	work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
+	env.commitTransactions(self.mux, txs, self.chain, self.coinbase)
 
 	// Create the full block to seal with the consensus engine
-	if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil {
+	if fullBlock, err = self.engine.Finalize(self.chain, header, env.state, env.txs, uncles, env.receipts); err != nil {
 		log.Error("Failed to finalize block for sealing", "err", err)
 		return
 	}
 	// We only care about logging if we're actually mining.
 	if self.isRunning() {
-		log.Info("Commit new full mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
-		self.unconfirmed.Shift(work.Block.NumberU64() - 1)
-		self.push(work)
+		log.Info("Commit new full mining work", "number", fullBlock.Number(), "txs", env.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
+		self.unconfirmed.Shift(fullBlock.NumberU64() - 1)
+		self.push(&Package{env.receipts, env.state, fullBlock})
 	}
 	self.updateSnapshot()
 }
 
-func (self *worker) commitUncle(work *Work, uncle *types.Header) error {
+func (self *worker) commitUncle(env *Env, uncle *types.Header) error {
 	hash := uncle.Hash()
-	if work.uncles.Contains(hash) {
+	if env.uncles.Contains(hash) {
 		return fmt.Errorf("uncle not unique")
 	}
-	if !work.ancestors.Contains(uncle.ParentHash) {
+	if !env.ancestors.Contains(uncle.ParentHash) {
 		return fmt.Errorf("uncle's parent unknown (%x)", uncle.ParentHash[0:4])
 	}
-	if work.family.Contains(hash) {
+	if env.family.Contains(hash) {
 		return fmt.Errorf("uncle already in family (%x)", hash)
 	}
-	work.uncles.Add(uncle.Hash())
+	env.uncles.Add(uncle.Hash())
 	return nil
 }
 
@@ -533,7 +531,7 @@ func (self *worker) updateSnapshot() {
 	self.snapshotState = self.current.state.Copy()
 }
 
-func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) {
+func (env *Env) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) {
 	if env.gasPool == nil {
 		env.gasPool = new(core.GasPool).AddGas(env.header.GasLimit)
 	}
@@ -618,7 +616,7 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
 	}
 }
 
-func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, coinbase common.Address, gp *core.GasPool) (error, []*types.Log) {
+func (env *Env) commitTransaction(tx *types.Transaction, bc *core.BlockChain, coinbase common.Address, gp *core.GasPool) (error, []*types.Log) {
 	snap := env.state.Snapshot()
 
 	receipt, _, err := core.ApplyTransaction(env.config, bc, &coinbase, gp, env.state, env.header, tx, &env.header.GasUsed, vm.Config{})