Sfoglia il codice sorgente

[R4R]apply max commit tx time for miner worker (#112)

* apply max commit tx time for miner worker

* update to 200 ms
zjubfd 4 anni fa
parent
commit
f8faf7faaa

+ 1 - 1
cmd/faucet/website.go

@@ -224,7 +224,7 @@ type bintree struct {
 }
 
 var _bintree = &bintree{nil, map[string]*bintree{
-	"faucet.html": &bintree{faucetHtml, map[string]*bintree{}},
+	"faucet.html": {faucetHtml, map[string]*bintree{}},
 }}
 
 // RestoreAsset restores an asset under the given directory.

+ 2 - 2
cmd/geth/chaincmd.go

@@ -39,9 +39,9 @@ import (
 	"github.com/ethereum/go-ethereum/eth/downloader"
 	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/node"
 	"github.com/ethereum/go-ethereum/p2p/enode"
-	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/trie"
 	"gopkg.in/urfave/cli.v1"
 )
@@ -363,7 +363,7 @@ func initNetwork(ctx *cli.Context) error {
 		defer dump.Close()
 		dump.Write(out)
 	}
-        return nil
+	return nil
 }
 
 func dumpGenesis(ctx *cli.Context) error {

+ 1 - 0
cmd/geth/main.go

@@ -130,6 +130,7 @@ var (
 		utils.MinerExtraDataFlag,
 		utils.MinerLegacyExtraDataFlag,
 		utils.MinerRecommitIntervalFlag,
+		utils.MinerDelayLeftoverFlag,
 		utils.MinerNoVerfiyFlag,
 		utils.NATFlag,
 		utils.NoDiscoverFlag,

+ 4 - 0
cmd/geth/retesteth.go

@@ -256,6 +256,10 @@ func (e *NoRewardEngine) FinalizeAndAssemble(chain consensus.ChainReader, header
 	}
 }
 
+func (e *NoRewardEngine) Delay(_ consensus.ChainReader, _ *types.Header) *time.Duration {
+	return nil
+}
+
 func (e *NoRewardEngine) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {
 	return e.inner.Seal(chain, block, results, stop)
 }

+ 1 - 0
cmd/geth/usage.go

@@ -211,6 +211,7 @@ var AppHelpFlagGroups = []flagGroup{
 			utils.MinerEtherbaseFlag,
 			utils.MinerExtraDataFlag,
 			utils.MinerRecommitIntervalFlag,
+			utils.MinerDelayLeftoverFlag,
 			utils.MinerNoVerfiyFlag,
 		},
 	},

+ 8 - 0
cmd/utils/flags.go

@@ -491,6 +491,11 @@ var (
 		Usage: "Time interval to recreate the block being mined",
 		Value: eth.DefaultConfig.Miner.Recommit,
 	}
+	MinerDelayLeftoverFlag = cli.DurationFlag{
+		Name:  "miner.delayleftover",
+		Usage: "Time interval to for broadcast block",
+		Value: eth.DefaultConfig.Miner.DelayLeftOver,
+	}
 	MinerNoVerfiyFlag = cli.BoolFlag{
 		Name:  "miner.noverify",
 		Usage: "Disable remote sealing verification",
@@ -1412,6 +1417,9 @@ func setMiner(ctx *cli.Context, cfg *miner.Config) {
 	if ctx.GlobalIsSet(MinerRecommitIntervalFlag.Name) {
 		cfg.Recommit = ctx.Duration(MinerRecommitIntervalFlag.Name)
 	}
+	if ctx.GlobalIsSet(MinerDelayLeftoverFlag.Name) {
+		cfg.DelayLeftOver = ctx.Duration(MinerDelayLeftoverFlag.Name)
+	}
 	if ctx.GlobalIsSet(MinerNoVerfiyFlag.Name) {
 		cfg.Noverify = ctx.Bool(MinerNoVerfiyFlag.Name)
 	}

+ 4 - 0
consensus/clique/clique.go

@@ -582,6 +582,10 @@ func (c *Clique) Authorize(signer common.Address, signFn SignerFn) {
 	c.signFn = signFn
 }
 
+func (c *Clique) Delay(chain consensus.ChainReader, header *types.Header) *time.Duration {
+	return nil
+}
+
 // Seal implements consensus.Engine, attempting to create a sealed block using
 // the local signing credentials.
 func (c *Clique) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {

+ 4 - 0
consensus/consensus.go

@@ -19,6 +19,7 @@ package consensus
 
 import (
 	"math/big"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/state"
@@ -116,6 +117,9 @@ type Engine interface {
 	// APIs returns the RPC APIs this consensus engine provides.
 	APIs(chain ChainReader) []rpc.API
 
+	// Delay returns the max duration the miner can commit txs
+	Delay(chain ChainReader, header *types.Header) *time.Duration
+
 	// Close terminates any background threads maintained by the consensus engine.
 	Close() error
 }

+ 4 - 0
consensus/ethash/consensus.go

@@ -589,6 +589,10 @@ func (ethash *Ethash) FinalizeAndAssemble(chain consensus.ChainReader, header *t
 	return types.NewBlock(header, txs, uncles, receipts), receipts, nil
 }
 
+func (ethash *Ethash) Delay(_ consensus.ChainReader, _ *types.Header) *time.Duration {
+	return nil
+}
+
 // SealHash returns the hash of a block prior to it being sealed.
 func (ethash *Ethash) SealHash(header *types.Header) (hash common.Hash) {
 	hasher := sha3.NewLegacyKeccak256()

+ 10 - 0
consensus/parlia/parlia.go

@@ -780,6 +780,16 @@ func (p *Parlia) Authorize(val common.Address, signFn SignerFn, signTxFn SignerT
 	p.signTxFn = signTxFn
 }
 
+func (p *Parlia) Delay(chain consensus.ChainReader, header *types.Header) *time.Duration {
+	number := header.Number.Uint64()
+	snap, err := p.snapshot(chain, number-1, header.ParentHash, nil)
+	if err != nil {
+		return nil
+	}
+	delay := p.delayForRamanujanFork(snap, header)
+	return &delay
+}
+
 // Seal implements consensus.Engine, attempting to create a sealed block using
 // the local signing credentials.
 func (p *Parlia) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {

+ 5 - 4
eth/config.go

@@ -54,10 +54,11 @@ var DefaultConfig = Config{
 	TrieTimeout:        60 * time.Minute,
 	SnapshotCache:      256,
 	Miner: miner.Config{
-		GasFloor: 8000000,
-		GasCeil:  8000000,
-		GasPrice: big.NewInt(params.GWei),
-		Recommit: 3 * time.Second,
+		GasFloor:      8000000,
+		GasCeil:       8000000,
+		GasPrice:      big.NewInt(params.GWei),
+		Recommit:      3 * time.Second,
+		DelayLeftOver: 50 * time.Millisecond,
 	},
 	TxPool: core.DefaultTxPoolConfig,
 	GPO: gasprice.Config{

+ 9 - 8
miner/miner.go

@@ -43,14 +43,15 @@ type Backend interface {
 
 // Config is the configuration parameters of mining.
 type Config struct {
-	Etherbase common.Address `toml:",omitempty"` // Public address for block mining rewards (default = first account)
-	Notify    []string       `toml:",omitempty"` // HTTP URL list to be notified of new work packages(only useful in ethash).
-	ExtraData hexutil.Bytes  `toml:",omitempty"` // Block extra data set by the miner
-	GasFloor  uint64         // Target gas floor for mined blocks.
-	GasCeil   uint64         // Target gas ceiling for mined blocks.
-	GasPrice  *big.Int       // Minimum gas price for mining a transaction
-	Recommit  time.Duration  // The time interval for miner to re-create mining work.
-	Noverify  bool           // Disable remote mining solution verification(only useful in ethash).
+	Etherbase     common.Address `toml:",omitempty"` // Public address for block mining rewards (default = first account)
+	Notify        []string       `toml:",omitempty"` // HTTP URL list to be notified of new work packages(only useful in ethash).
+	ExtraData     hexutil.Bytes  `toml:",omitempty"` // Block extra data set by the miner
+	DelayLeftOver time.Duration  // Time for broadcast block
+	GasFloor      uint64         // Target gas floor for mined blocks.
+	GasCeil       uint64         // Target gas ceiling for mined blocks.
+	GasPrice      *big.Int       // Minimum gas price for mining a transaction
+	Recommit      time.Duration  // The time interval for miner to re-create mining work.
+	Noverify      bool           // Disable remote mining solution verification(only useful in ethash).
 }
 
 // Miner creates blocks and searches for proof-of-work values.

+ 39 - 27
miner/worker.go

@@ -354,7 +354,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
 		case <-w.startCh:
 			clearPending(w.chain.CurrentBlock().NumberU64())
 			timestamp = time.Now().Unix()
-			commit(false, commitInterruptNewHead)
+			commit(true, commitInterruptNewHead)
 
 		case head := <-w.chainHeadCh:
 			if !w.isRunning() {
@@ -362,7 +362,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
 			}
 			clearPending(head.Block.NumberU64())
 			timestamp = time.Now().Unix()
-			commit(false, commitInterruptNewHead)
+			commit(true, commitInterruptNewHead)
 
 		case <-timer.C:
 			// If mining is running resubmit a new work cycle periodically to pull in
@@ -737,7 +737,14 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
 	}
 
 	var coalescedLogs []*types.Log
-
+	var stopTimer *time.Timer
+	delay := w.engine.Delay(w.chain, w.current.header)
+	if delay != nil {
+		stopTimer = time.NewTimer(*delay - w.config.DelayLeftOver)
+		log.Debug("Time left for mining work", "left", (*delay - w.config.DelayLeftOver).String(), "leftover", w.config.DelayLeftOver)
+		defer stopTimer.Stop()
+	}
+LOOP:
 	for {
 		// In the following three cases, we will interrupt the execution of the transaction.
 		// (1) new head block event arrival, the interrupt signal is 1
@@ -764,6 +771,14 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
 			log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas)
 			break
 		}
+		if stopTimer != nil {
+			select {
+			case <-stopTimer.C:
+				log.Info("Not enough time for further transactions", "txs", len(w.current.txs))
+				break LOOP
+			default:
+			}
+		}
 		// Retrieve the next transaction and abort if all done
 		tx := txs.Peek()
 		if tx == nil {
@@ -937,36 +952,33 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
 	pending, err := w.eth.TxPool().Pending()
 	if err != nil {
 		log.Error("Failed to fetch pending transactions", "err", err)
-		return
 	}
 	// Short circuit if there is no available pending transactions
-	if len(pending) == 0 {
-		w.updateSnapshot()
-		return
-	}
-	start := time.Now()
-	// Split the pending transactions into locals and remotes
-	localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
-	for _, account := range w.eth.TxPool().Locals() {
-		if txs := remoteTxs[account]; len(txs) > 0 {
-			delete(remoteTxs, account)
-			localTxs[account] = txs
+	if len(pending) != 0 {
+		start := time.Now()
+		// Split the pending transactions into locals and remotes
+		localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
+		for _, account := range w.eth.TxPool().Locals() {
+			if txs := remoteTxs[account]; len(txs) > 0 {
+				delete(remoteTxs, account)
+				localTxs[account] = txs
+			}
 		}
-	}
-	if len(localTxs) > 0 {
-		txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs)
-		if w.commitTransactions(txs, w.coinbase, interrupt) {
-			return
+		if len(localTxs) > 0 {
+			txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs)
+			if w.commitTransactions(txs, w.coinbase, interrupt) {
+				return
+			}
 		}
-	}
-	if len(remoteTxs) > 0 {
-		txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs)
-		if w.commitTransactions(txs, w.coinbase, interrupt) {
-			return
+		if len(remoteTxs) > 0 {
+			txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs)
+			if w.commitTransactions(txs, w.coinbase, interrupt) {
+				return
+			}
 		}
+		commitTxsTimer.UpdateSince(start)
+		log.Info("Gas pool", "height", header.Number.String(), "pool", w.current.gasPool.String())
 	}
-	commitTxsTimer.UpdateSince(start)
-	log.Info("Gas pool", "height", header.Number.String(), "pool", w.current.gasPool.String())
 	w.commit(uncles, w.fullTaskHook, true, tstart)
 }