فهرست منبع

Merge pull request #426 from binance-chain/improve_backoff

[R4R] add block proccess backoff time when validator is not in turn and received in turn block
yutianwu 4 سال پیش
والد
کامیت
c737f66839
6فایلهای تغییر یافته به همراه76 افزوده شده و 5 حذف شده
  1. 3 0
      consensus/consensus.go
  2. 27 0
      consensus/parlia/parlia.go
  3. 33 3
      core/blockchain.go
  4. 1 0
      core/chain_makers.go
  5. 6 1
      core/headerchain.go
  6. 6 1
      light/lightchain.go

+ 3 - 0
consensus/consensus.go

@@ -49,6 +49,9 @@ type ChainHeaderReader interface {
 
 	// GetHeaderByHash retrieves a block header from the database by its hash.
 	GetHeaderByHash(hash common.Hash) *types.Header
+
+	// GetHighestVerifiedHeader retrieves the highest header verified.
+	GetHighestVerifiedHeader() *types.Header
 }
 
 // ChainReader defines a small collection of methods needed to access the local

+ 27 - 0
consensus/parlia/parlia.go

@@ -56,6 +56,7 @@ const (
 	validatorBytesLength = common.AddressLength
 	wiggleTime           = uint64(1) // second, Random delay (per signer) to allow concurrent signers
 	initialBackOffTime   = uint64(1) // second
+	processBackOffTime   = uint64(1) // second
 
 	systemRewardPercent = 4 // it means 1/2^4 = 1/16 percentage of gas fee incoming will be distributed to system
 
@@ -868,6 +869,16 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res
 			return
 		case <-time.After(delay):
 		}
+		if p.shouldWaitForCurrentBlockProcess(chain, header, snap) {
+			log.Info("Waiting for received in turn block to process")
+			select {
+			case <-stop:
+				log.Info("Received block process finished, abort block seal")
+				return
+			case <-time.After(time.Duration(processBackOffTime) * time.Second):
+				log.Info("Process backoff time exhausted, start to seal block")
+			}
+		}
 
 		select {
 		case results <- block.WithSeal(header):
@@ -879,6 +890,22 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res
 	return nil
 }
 
+func (p *Parlia) shouldWaitForCurrentBlockProcess(chain consensus.ChainHeaderReader, header *types.Header, snap *Snapshot) bool {
+	if header.Difficulty.Cmp(diffInTurn) == 0 {
+		return false
+	}
+
+	highestVerifiedHeader := chain.GetHighestVerifiedHeader()
+	if highestVerifiedHeader == nil {
+		return false
+	}
+
+	if header.ParentHash == highestVerifiedHeader.ParentHash {
+		return true
+	}
+	return false
+}
+
 func (p *Parlia) EnoughDistance(chain consensus.ChainReader, header *types.Header) bool {
 	snap, err := p.snapshot(chain, header.Number.Uint64()-1, header.ParentHash, nil)
 	if err != nil {

+ 33 - 3
core/blockchain.go

@@ -28,6 +28,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	lru "github.com/hashicorp/golang-lru"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/mclock"
 	"github.com/ethereum/go-ethereum/common/prque"
@@ -44,7 +46,6 @@ import (
 	"github.com/ethereum/go-ethereum/params"
 	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/ethereum/go-ethereum/trie"
-	lru "github.com/hashicorp/golang-lru"
 )
 
 var (
@@ -200,8 +201,9 @@ type BlockChain struct {
 
 	chainmu sync.RWMutex // blockchain insertion lock
 
-	currentBlock     atomic.Value // Current head of the block chain
-	currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
+	currentBlock          atomic.Value // Current head of the block chain
+	currentFastBlock      atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
+	highestVerifiedHeader atomic.Value
 
 	stateCache    state.Database // State database to reuse between imports (contains state cache)
 	bodyCache     *lru.Cache     // Cache for the most recent block bodies
@@ -310,6 +312,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
 	bc.currentBlock.Store(nilBlock)
 	bc.currentFastBlock.Store(nilBlock)
 
+	var nilHeader *types.Header
+	bc.highestVerifiedHeader.Store(nilHeader)
+
 	// Initialize the chain with ancient data if it isn't empty.
 	var txIndexBlock uint64
 
@@ -2039,6 +2044,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
 		if err != nil {
 			return it.index, err
 		}
+		bc.updateHighestVerifiedHeader(block.Header())
+
 		// Enable prefetching to pull in trie node paths while processing transactions
 		statedb.StartPrefetcher("chain")
 
@@ -2144,6 +2151,29 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
 	return it.index, err
 }
 
+func (bc *BlockChain) updateHighestVerifiedHeader(header *types.Header) {
+	if header == nil || header.Number == nil {
+		return
+	}
+	currentHeader := bc.highestVerifiedHeader.Load().(*types.Header)
+	if currentHeader == nil {
+		bc.highestVerifiedHeader.Store(types.CopyHeader(header))
+		return
+	}
+
+	newTD := big.NewInt(0).Add(bc.GetTdByHash(header.ParentHash), header.Difficulty)
+	oldTD := big.NewInt(0).Add(bc.GetTdByHash(currentHeader.ParentHash), currentHeader.Difficulty)
+
+	if newTD.Cmp(oldTD) > 0 {
+		bc.highestVerifiedHeader.Store(types.CopyHeader(header))
+		return
+	}
+}
+
+func (bc *BlockChain) GetHighestVerifiedHeader() *types.Header {
+	return bc.highestVerifiedHeader.Load().(*types.Header)
+}
+
 // insertSideChain is called when an import batch hits upon a pruned ancestor
 // error, which happens when a sidechain with a sufficiently old fork-block is
 // found.

+ 1 - 0
core/chain_makers.go

@@ -303,3 +303,4 @@ func (cr *fakeChainReader) GetHeaderByNumber(number uint64) *types.Header
 func (cr *fakeChainReader) GetHeaderByHash(hash common.Hash) *types.Header          { return nil }
 func (cr *fakeChainReader) GetHeader(hash common.Hash, number uint64) *types.Header { return nil }
 func (cr *fakeChainReader) GetBlock(hash common.Hash, number uint64) *types.Block   { return nil }
+func (cr *fakeChainReader) GetHighestVerifiedHeader() *types.Header                 { return nil }

+ 6 - 1
core/headerchain.go

@@ -26,6 +26,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	lru "github.com/hashicorp/golang-lru"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/consensus"
 	"github.com/ethereum/go-ethereum/core/rawdb"
@@ -33,7 +35,6 @@ import (
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/params"
-	lru "github.com/hashicorp/golang-lru"
 )
 
 const (
@@ -413,6 +414,10 @@ func (hc *HeaderChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []co
 	return chain
 }
 
+func (hc *HeaderChain) GetHighestVerifiedHeader() *types.Header {
+	return nil
+}
+
 // GetAncestor retrieves the Nth ancestor of a given block. It assumes that either the given block or
 // a close ancestor of it is canonical. maxNonCanonical points to a downwards counter limiting the
 // number of blocks to be individually checked before we reach the canonical chain.

+ 6 - 1
light/lightchain.go

@@ -26,6 +26,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	lru "github.com/hashicorp/golang-lru"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/consensus"
 	"github.com/ethereum/go-ethereum/core"
@@ -37,7 +39,6 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/params"
 	"github.com/ethereum/go-ethereum/rlp"
-	lru "github.com/hashicorp/golang-lru"
 )
 
 var (
@@ -148,6 +149,10 @@ func (lc *LightChain) HeaderChain() *core.HeaderChain {
 	return lc.hc
 }
 
+func (lc *LightChain) GetHighestVerifiedHeader() *types.Header {
+	return nil
+}
+
 // loadLastState loads the last known chain state from the database. This method
 // assumes that the chain manager mutex is held.
 func (lc *LightChain) loadLastState() error {