瀏覽代碼

fix: block fetcher efficiency (#333)

dylanhuang 4 年之前
父節點
當前提交
7e8d9fb51d
共有 1 個文件被更改,包括 36 次插入8 次删除
  1. 36 8
      eth/fetcher/block_fetcher.go

+ 36 - 8
eth/fetcher/block_fetcher.go

@@ -33,10 +33,12 @@ import (
 )
 
 const (
-	lightTimeout  = time.Millisecond       // Time allowance before an announced header is explicitly requested
-	arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
-	gatherSlack   = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
-	fetchTimeout  = 5 * time.Second        // Maximum allotted time to return an explicitly requested block/transaction
+	lightTimeout        = time.Millisecond       // Time allowance before an announced header is explicitly requested
+	arriveTimeout       = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
+	gatherSlack         = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
+	fetchTimeout        = 5 * time.Second        // Maximum allotted time to return an explicitly requested block/transaction
+	reQueueBlockTimeout = 500 * time.Millisecond // Time allowance before blocks are requeued for import
+
 )
 
 const (
@@ -167,6 +169,8 @@ type BlockFetcher struct {
 	done chan common.Hash
 	quit chan struct{}
 
+	requeue chan *blockOrHeaderInject
+
 	// Announce states
 	announces  map[string]int                   // Per peer blockAnnounce counts to prevent memory exhaustion
 	announced  map[common.Hash][]*blockAnnounce // Announced blocks, scheduled for fetching
@@ -207,6 +211,7 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr
 		bodyFilter:     make(chan chan *bodyFilterTask),
 		done:           make(chan common.Hash),
 		quit:           make(chan struct{}),
+		requeue:        make(chan *blockOrHeaderInject),
 		announces:      make(map[string]int),
 		announced:      make(map[common.Hash][]*blockAnnounce),
 		fetching:       make(map[common.Hash]*blockAnnounce),
@@ -371,9 +376,9 @@ func (f *BlockFetcher) loop() {
 				continue
 			}
 			if f.light {
-				f.importHeaders(op.origin, op.header)
+				f.importHeaders(op)
 			} else {
-				f.importBlocks(op.origin, op.block)
+				f.importBlocks(op)
 			}
 		}
 		// Wait for an outside event to occur
@@ -416,6 +421,21 @@ func (f *BlockFetcher) loop() {
 				f.rescheduleFetch(fetchTimer)
 			}
 
+		case op := <-f.requeue:
+			// Re-queue blocks that have not been written due to fork block competition
+			number := int64(0)
+			hash := ""
+			if op.header != nil {
+				number = op.header.Number.Int64()
+				hash = op.header.Hash().String()
+			} else if op.block != nil {
+				number = op.block.Number().Int64()
+				hash = op.block.Hash().String()
+			}
+
+			log.Info("Re-queue blocks", "number", number, "hash", hash)
+			f.enqueue(op.origin, op.header, op.block)
+
 		case op := <-f.inject:
 			// A direct block insertion was requested, try and fill any pending gaps
 			blockBroadcastInMeter.Mark(1)
@@ -751,7 +771,9 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B
 // importHeaders spawns a new goroutine to run a header insertion into the chain.
 // If the header's number is at the same height as the current import phase, it
 // updates the phase states accordingly.
-func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
+func (f *BlockFetcher) importHeaders(op *blockOrHeaderInject) {
+	peer := op.origin
+	header := op.header
 	hash := header.Hash()
 	log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)
 
@@ -761,6 +783,8 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
 		parent := f.getHeader(header.ParentHash)
 		if parent == nil {
 			log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
+			time.Sleep(reQueueBlockTimeout)
+			f.requeue <- op
 			return
 		}
 		// Validate the header and if something went wrong, drop the peer
@@ -784,7 +808,9 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
 // importBlocks spawns a new goroutine to run a block insertion into the chain. If the
 // block's number is at the same height as the current import phase, it updates
 // the phase states accordingly.
-func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
+func (f *BlockFetcher) importBlocks(op *blockOrHeaderInject) {
+	peer := op.origin
+	block := op.block
 	hash := block.Hash()
 
 	// Run the import on a new thread
@@ -796,6 +822,8 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
 		parent := f.getBlock(block.ParentHash())
 		if parent == nil {
 			log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
+			time.Sleep(reQueueBlockTimeout)
+			f.requeue <- op
 			return
 		}
 		// Quickly validate the header and propagate the block if it passes