Эх сурвалжийг харах

miner: regenerate mining work every 3 seconds (#17413)

* miner: regenerate mining work every 3 seconds

* miner: polish
gary rong 7 жил өмнө
parent
commit
54216811a0
3 өөрчлөгдсөн 267 нэмэгдсэн , 133 устгасан
  1. 0 3
      core/events.go
  2. 202 130
      miner/worker.go
  3. 65 0
      miner/worker_test.go

+ 0 - 3
core/events.go

@@ -29,9 +29,6 @@ type PendingLogsEvent struct {
 	Logs []*types.Log
 }
 
-// PendingStateEvent is posted pre mining and notifies of pending state changes.
-type PendingStateEvent struct{}
-
 // NewMinedBlockEvent is posted when a block has been imported.
 type NewMinedBlockEvent struct{ Block *types.Block }
 

+ 202 - 130
miner/worker.go

@@ -40,19 +40,27 @@ import (
 const (
 	// resultQueueSize is the size of channel listening to sealing result.
 	resultQueueSize = 10
+
 	// txChanSize is the size of channel listening to NewTxsEvent.
 	// The number is referenced from the size of tx pool.
 	txChanSize = 4096
+
 	// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
 	chainHeadChanSize = 10
+
 	// chainSideChanSize is the size of channel listening to ChainSideEvent.
 	chainSideChanSize = 10
-	miningLogAtDepth  = 5
+
+	// miningLogAtDepth is the number of confirmations before logging successful mining.
+	miningLogAtDepth = 5
+
+	// blockRecommitInterval is the time interval to recreate the mining block with
+	// any newly arrived transactions.
+	blockRecommitInterval = 3 * time.Second
 )
 
-// Env is the worker's current environment and holds all of the current state information.
-type Env struct {
-	config *params.ChainConfig
+// environment is the worker's current environment and holds all of the current state information.
+type environment struct {
 	signer types.Signer
 
 	state     *state.StateDB // apply state changes here
@@ -67,105 +75,6 @@ type Env struct {
 	receipts []*types.Receipt
 }
 
-func (env *Env) commitTransaction(tx *types.Transaction, bc *core.BlockChain, coinbase common.Address, gp *core.GasPool) (error, []*types.Log) {
-	snap := env.state.Snapshot()
-
-	receipt, _, err := core.ApplyTransaction(env.config, bc, &coinbase, gp, env.state, env.header, tx, &env.header.GasUsed, vm.Config{})
-	if err != nil {
-		env.state.RevertToSnapshot(snap)
-		return err, nil
-	}
-	env.txs = append(env.txs, tx)
-	env.receipts = append(env.receipts, receipt)
-
-	return nil, receipt.Logs
-}
-
-func (env *Env) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) {
-	if env.gasPool == nil {
-		env.gasPool = new(core.GasPool).AddGas(env.header.GasLimit)
-	}
-
-	var coalescedLogs []*types.Log
-
-	for {
-		// If we don't have enough gas for any further transactions then we're done
-		if env.gasPool.Gas() < params.TxGas {
-			log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
-			break
-		}
-		// Retrieve the next transaction and abort if all done
-		tx := txs.Peek()
-		if tx == nil {
-			break
-		}
-		// Error may be ignored here. The error has already been checked
-		// during transaction acceptance is the transaction pool.
-		//
-		// We use the eip155 signer regardless of the current hf.
-		from, _ := types.Sender(env.signer, tx)
-		// Check whether the tx is replay protected. If we're not in the EIP155 hf
-		// phase, start ignoring the sender until we do.
-		if tx.Protected() && !env.config.IsEIP155(env.header.Number) {
-			log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", env.config.EIP155Block)
-
-			txs.Pop()
-			continue
-		}
-		// Start executing the transaction
-		env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount)
-
-		err, logs := env.commitTransaction(tx, bc, coinbase, env.gasPool)
-		switch err {
-		case core.ErrGasLimitReached:
-			// Pop the current out-of-gas transaction without shifting in the next from the account
-			log.Trace("Gas limit exceeded for current block", "sender", from)
-			txs.Pop()
-
-		case core.ErrNonceTooLow:
-			// New head notification data race between the transaction pool and miner, shift
-			log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
-			txs.Shift()
-
-		case core.ErrNonceTooHigh:
-			// Reorg notification data race between the transaction pool and miner, skip account =
-			log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
-			txs.Pop()
-
-		case nil:
-			// Everything ok, collect the logs and shift in the next transaction from the same account
-			coalescedLogs = append(coalescedLogs, logs...)
-			env.tcount++
-			txs.Shift()
-
-		default:
-			// Strange error, discard the transaction and get the next in line (note, the
-			// nonce-too-high clause will prevent us from executing in vain).
-			log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
-			txs.Shift()
-		}
-	}
-
-	if len(coalescedLogs) > 0 || env.tcount > 0 {
-		// make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
-		// logs by filling in the block hash when the block was mined by the local miner. This can
-		// cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
-		cpy := make([]*types.Log, len(coalescedLogs))
-		for i, l := range coalescedLogs {
-			cpy[i] = new(types.Log)
-			*cpy[i] = *l
-		}
-		go func(logs []*types.Log, tcount int) {
-			if len(logs) > 0 {
-				mux.Post(core.PendingLogsEvent{Logs: logs})
-			}
-			if tcount > 0 {
-				mux.Post(core.PendingStateEvent{})
-			}
-		}(cpy, env.tcount)
-	}
-}
-
 // task contains all information for consensus engine sealing and result submitting.
 type task struct {
 	receipts  []*types.Receipt
@@ -174,6 +83,17 @@ type task struct {
 	createdAt time.Time
 }
 
+const (
+	commitInterruptNone int32 = iota
+	commitInterruptNewHead
+	commitInterruptResubmit
+)
+
+type newWorkReq struct {
+	interrupt *int32
+	noempty   bool
+}
+
 // worker is the main object which takes care of submitting new work to consensus engine
 // and gathering the sealing result.
 type worker struct {
@@ -192,12 +112,13 @@ type worker struct {
 	chainSideSub event.Subscription
 
 	// Channels
-	newWork  chan struct{}
-	taskCh   chan *task
-	resultCh chan *task
-	exitCh   chan struct{}
+	newWorkCh chan *newWorkReq
+	taskCh    chan *task
+	resultCh  chan *task
+	startCh   chan struct{}
+	exitCh    chan struct{}
 
-	current        *Env                         // An environment for current running cycle.
+	current        *environment                 // An environment for current running cycle.
 	possibleUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks.
 	unconfirmed    *unconfirmedBlocks           // A set of locally mined blocks pending canonicalness confirmations.
 
@@ -230,10 +151,11 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend,
 		txsCh:          make(chan core.NewTxsEvent, txChanSize),
 		chainHeadCh:    make(chan core.ChainHeadEvent, chainHeadChanSize),
 		chainSideCh:    make(chan core.ChainSideEvent, chainSideChanSize),
-		newWork:        make(chan struct{}, 1),
+		newWorkCh:      make(chan *newWorkReq),
 		taskCh:         make(chan *task),
 		resultCh:       make(chan *task, resultQueueSize),
 		exitCh:         make(chan struct{}),
+		startCh:        make(chan struct{}, 1),
 	}
 	// Subscribe NewTxsEvent for tx pool
 	worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
@@ -242,11 +164,13 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend,
 	worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
 
 	go worker.mainLoop()
+	go worker.newWorkLoop()
 	go worker.resultLoop()
 	go worker.taskLoop()
 
 	// Submit first work to initialize pending state.
-	worker.newWork <- struct{}{}
+	worker.startCh <- struct{}{}
+
 	return worker
 }
 
@@ -286,7 +210,7 @@ func (w *worker) pendingBlock() *types.Block {
 // start sets the running status as 1 and triggers new work submitting.
 func (w *worker) start() {
 	atomic.StoreInt32(&w.running, 1)
-	w.newWork <- struct{}{}
+	w.startCh <- struct{}{}
 }
 
 // stop sets the running status as 0.
@@ -313,6 +237,44 @@ func (w *worker) close() {
 	}
 }
 
+// newWorkLoop is a standalone goroutine to submit new mining work upon received events.
+func (w *worker) newWorkLoop() {
+	var interrupt *int32
+
+	timer := time.NewTimer(0)
+	<-timer.C // discard the initial tick
+
+	// recommit aborts in-flight transaction execution with given signal and resubmits a new one.
+	recommit := func(noempty bool, s int32) {
+		if interrupt != nil {
+			atomic.StoreInt32(interrupt, s)
+		}
+		interrupt = new(int32)
+		w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty}
+		timer.Reset(blockRecommitInterval)
+	}
+
+	for {
+		select {
+		case <-w.startCh:
+			recommit(false, commitInterruptNewHead)
+
+		case <-w.chainHeadCh:
+			recommit(false, commitInterruptNewHead)
+
+		case <-timer.C:
+			// If mining is running resubmit a new work cycle periodically to pull in
+			// higher priced transactions. Disable this overhead for pending blocks.
+			if w.isRunning() && (w.config.Clique == nil || w.config.Clique.Period > 0) {
+				recommit(true, commitInterruptResubmit)
+			}
+
+		case <-w.exitCh:
+			return
+		}
+	}
+}
+
 // mainLoop is a standalone goroutine to regenerate the sealing task based on the received event.
 func (w *worker) mainLoop() {
 	defer w.txsSub.Unsubscribe()
@@ -321,13 +283,8 @@ func (w *worker) mainLoop() {
 
 	for {
 		select {
-		case <-w.newWork:
-			// Submit a work when the worker is created or started.
-			w.commitNewWork()
-
-		case <-w.chainHeadCh:
-			// Resubmit a work for new cycle once worker receives chain head event.
-			w.commitNewWork()
+		case req := <-w.newWorkCh:
+			w.commitNewWork(req.interrupt, req.noempty)
 
 		case ev := <-w.chainSideCh:
 			if _, exist := w.possibleUncles[ev.Block.Hash()]; exist {
@@ -364,9 +321,9 @@ func (w *worker) mainLoop() {
 			// already included in the current mining block. These transactions will
 			// be automatically eliminated.
 			if !w.isRunning() && w.current != nil {
-				w.mu.Lock()
+				w.mu.RLock()
 				coinbase := w.coinbase
-				w.mu.Unlock()
+				w.mu.RUnlock()
 
 				txs := make(map[common.Address]types.Transactions)
 				for _, tx := range ev.Txs {
@@ -374,12 +331,12 @@ func (w *worker) mainLoop() {
 					txs[acc] = append(txs[acc], tx)
 				}
 				txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs)
-				w.current.commitTransactions(w.mux, txset, w.chain, coinbase)
+				w.commitTransactions(txset, coinbase, nil)
 				w.updateSnapshot()
 			} else {
 				// If we're mining, but nothing is being processed, wake on new transactions
 				if w.config.Clique != nil && w.config.Clique.Period == 0 {
-					w.commitNewWork()
+					w.commitNewWork(nil, false)
 				}
 			}
 
@@ -508,8 +465,7 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
 	if err != nil {
 		return err
 	}
-	env := &Env{
-		config:    w.config,
+	env := &environment{
 		signer:    types.NewEIP155Signer(w.config.ChainID),
 		state:     state,
 		ancestors: mapset.NewSet(),
@@ -534,7 +490,7 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
 }
 
 // commitUncle adds the given block to uncle block set, returns error if failed to add.
-func (w *worker) commitUncle(env *Env, uncle *types.Header) error {
+func (w *worker) commitUncle(env *environment, uncle *types.Header) error {
 	hash := uncle.Hash()
 	if env.uncles.Contains(hash) {
 		return fmt.Errorf("uncle not unique")
@@ -579,8 +535,120 @@ func (w *worker) updateSnapshot() {
 	w.snapshotState = w.current.state.Copy()
 }
 
+func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
+	snap := w.current.state.Snapshot()
+
+	receipt, _, err := core.ApplyTransaction(w.config, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, vm.Config{})
+	if err != nil {
+		w.current.state.RevertToSnapshot(snap)
+		return nil, err
+	}
+	w.current.txs = append(w.current.txs, tx)
+	w.current.receipts = append(w.current.receipts, receipt)
+
+	return receipt.Logs, nil
+}
+
+func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool {
+	// Short circuit if current is nil
+	if w.current == nil {
+		return true
+	}
+
+	if w.current.gasPool == nil {
+		w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit)
+	}
+
+	var coalescedLogs []*types.Log
+
+	for {
+		// In the following three cases, we will interrupt the execution of the transaction.
+		// (1) new head block event arrival, the interrupt signal is 1
+		// (2) worker start or restart, the interrupt signal is 1
+		// (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2.
+		// For the first two cases, the semi-finished work will be discarded.
+		// For the third case, the semi-finished work will be submitted to the consensus engine.
+		// TODO(rjl493456442) give feedback to newWorkLoop to adjust resubmit interval if it is too short.
+		if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
+			return atomic.LoadInt32(interrupt) == commitInterruptNewHead
+		}
+		// If we don't have enough gas for any further transactions then we're done
+		if w.current.gasPool.Gas() < params.TxGas {
+			log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas)
+			break
+		}
+		// Retrieve the next transaction and abort if all done
+		tx := txs.Peek()
+		if tx == nil {
+			break
+		}
+		// Error may be ignored here. The error has already been checked
+		// during transaction acceptance is the transaction pool.
+		//
+		// We use the eip155 signer regardless of the current hf.
+		from, _ := types.Sender(w.current.signer, tx)
+		// Check whether the tx is replay protected. If we're not in the EIP155 hf
+		// phase, start ignoring the sender until we do.
+		if tx.Protected() && !w.config.IsEIP155(w.current.header.Number) {
+			log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.config.EIP155Block)
+
+			txs.Pop()
+			continue
+		}
+		// Start executing the transaction
+		w.current.state.Prepare(tx.Hash(), common.Hash{}, w.current.tcount)
+
+		logs, err := w.commitTransaction(tx, coinbase)
+		switch err {
+		case core.ErrGasLimitReached:
+			// Pop the current out-of-gas transaction without shifting in the next from the account
+			log.Trace("Gas limit exceeded for current block", "sender", from)
+			txs.Pop()
+
+		case core.ErrNonceTooLow:
+			// New head notification data race between the transaction pool and miner, shift
+			log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
+			txs.Shift()
+
+		case core.ErrNonceTooHigh:
+			// Reorg notification data race between the transaction pool and miner, skip account =
+			log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
+			txs.Pop()
+
+		case nil:
+			// Everything ok, collect the logs and shift in the next transaction from the same account
+			coalescedLogs = append(coalescedLogs, logs...)
+			w.current.tcount++
+			txs.Shift()
+
+		default:
+			// Strange error, discard the transaction and get the next in line (note, the
+			// nonce-too-high clause will prevent us from executing in vain).
+			log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
+			txs.Shift()
+		}
+	}
+
+	if !w.isRunning() && len(coalescedLogs) > 0 {
+		// We don't push the pendingLogsEvent while we are mining. The reason is that
+		// when we are mining, the worker will regenerate a mining block every 3 seconds.
+		// In order to avoid pushing the repeated pendingLog, we disable the pending log pushing.
+
+		// make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
+		// logs by filling in the block hash when the block was mined by the local miner. This can
+		// cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
+		cpy := make([]*types.Log, len(coalescedLogs))
+		for i, l := range coalescedLogs {
+			cpy[i] = new(types.Log)
+			*cpy[i] = *l
+		}
+		go w.mux.Post(core.PendingLogsEvent{Logs: cpy})
+	}
+	return false
+}
+
 // commitNewWork generates several new sealing tasks based on the parent block.
-func (w *worker) commitNewWork() {
+func (w *worker) commitNewWork(interrupt *int32, noempty bool) {
 	w.mu.RLock()
 	defer w.mu.RUnlock()
 
@@ -666,9 +734,11 @@ func (w *worker) commitNewWork() {
 		delete(w.possibleUncles, hash)
 	}
 
-	// Create an empty block based on temporary copied state for sealing in advance without waiting block
-	// execution finished.
-	w.commit(uncles, nil, false, tstart)
+	if !noempty {
+		// Create an empty block based on temporary copied state for sealing in advance without waiting block
+		// execution finished.
+		w.commit(uncles, nil, false, tstart)
+	}
 
 	// Fill the block with all available pending transactions.
 	pending, err := w.eth.TxPool().Pending()
@@ -682,7 +752,9 @@ func (w *worker) commitNewWork() {
 		return
 	}
 	txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pending)
-	env.commitTransactions(w.mux, txs, w.chain, w.coinbase)
+	if w.commitTransactions(txs, w.coinbase, interrupt) {
+		return
+	}
 
 	w.commit(uncles, w.fullTaskHook, true, tstart)
 }

+ 65 - 0
miner/worker_test.go

@@ -270,3 +270,68 @@ func TestStreamUncleBlock(t *testing.T) {
 		t.Error("new task timeout")
 	}
 }
+
+func TestRegenerateMiningBlockEthash(t *testing.T) {
+	testRegenerateMiningBlock(t, ethashChainConfig, ethash.NewFaker())
+}
+
+func TestRegenerateMiningBlockClique(t *testing.T) {
+	testRegenerateMiningBlock(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, ethdb.NewMemDatabase()))
+}
+
+func testRegenerateMiningBlock(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) {
+	defer engine.Close()
+
+	w, b := newTestWorker(t, chainConfig, engine)
+	defer w.close()
+
+	var taskCh = make(chan struct{})
+
+	taskIndex := 0
+	w.newTaskHook = func(task *task) {
+		if task.block.NumberU64() == 1 {
+			if taskIndex == 2 {
+				receiptLen, balance := 2, big.NewInt(2000)
+				if len(task.receipts) != receiptLen {
+					t.Errorf("receipt number mismatch has %d, want %d", len(task.receipts), receiptLen)
+				}
+				if task.state.GetBalance(acc1Addr).Cmp(balance) != 0 {
+					t.Errorf("account balance mismatch has %d, want %d", task.state.GetBalance(acc1Addr), balance)
+				}
+			}
+			taskCh <- struct{}{}
+			taskIndex += 1
+		}
+	}
+	w.skipSealHook = func(task *task) bool {
+		return true
+	}
+	w.fullTaskHook = func() {
+		time.Sleep(100 * time.Millisecond)
+	}
+	// Ensure worker has finished initialization
+	for {
+		b := w.pendingBlock()
+		if b != nil && b.NumberU64() == 1 {
+			break
+		}
+	}
+
+	w.start()
+	// Ignore the first two works
+	for i := 0; i < 2; i += 1 {
+		select {
+		case <-taskCh:
+		case <-time.NewTimer(time.Second).C:
+			t.Error("new task timeout")
+		}
+	}
+	b.txPool.AddLocals(newTxs)
+	time.Sleep(3 * time.Second)
+
+	select {
+	case <-taskCh:
+	case <-time.NewTimer(time.Second).C:
+		t.Error("new task timeout")
+	}
+}