瀏覽代碼

eth, eth/fetcher: use an import queue to store out of order blocks

Péter Szilágyi 10 年之前
父節點
當前提交
057bc237ad
共有 3 個文件被更改,包括 91 次插入33 次删除
  1. 48 30
      eth/fetcher/fetcher.go
  2. 34 1
      eth/fetcher/fetcher_test.go
  3. 9 2
      eth/handler.go

+ 48 - 30
eth/fetcher/fetcher.go

@@ -10,11 +10,13 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/logger"
 	"github.com/ethereum/go-ethereum/logger/glog"
+	"gopkg.in/karalabe/cookiejar.v2/collections/prque"
 )
 
 const (
 	arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
 	fetchTimeout  = 5 * time.Second        // Maximum alloted time to return an explicitly requested block
+	maxQueueDist  = 256                    // Maximum allowed distance from the chain head to queue
 )
 
 var (
@@ -30,6 +32,9 @@ type blockRequesterFn func([]common.Hash) error
 // blockImporterFn is a callback type for trying to inject a block into the local chain.
 type blockImporterFn func(peer string, block *types.Block) error
 
+// chainHeightFn is a callback type to retrieve the current chain height.
+type chainHeightFn func() uint64
+
 // announce is the hash notification of the availability of a new block in the
 // network.
 type announce struct {
@@ -40,6 +45,12 @@ type announce struct {
 	fetch  blockRequesterFn // Fetcher function to retrieve
 }
 
+// inject represents a schedules import operation.
+type inject struct {
+	origin string
+	block  *types.Block
+}
+
 // Fetcher is responsible for accumulating block announcements from various peers
 // and scheduling them for retrieval.
 type Fetcher struct {
@@ -51,16 +62,18 @@ type Fetcher struct {
 	// Callbacks
 	hasBlock    hashCheckFn     // Checks if a block is present in the chain
 	importBlock blockImporterFn // Injects a block from an origin peer into the chain
+	chainHeight chainHeightFn   // Retrieves the current chain's height
 }
 
 // New creates a block fetcher to retrieve blocks based on hash announcements.
-func New(hasBlock hashCheckFn, importBlock blockImporterFn) *Fetcher {
+func New(hasBlock hashCheckFn, importBlock blockImporterFn, chainHeight chainHeightFn) *Fetcher {
 	return &Fetcher{
 		notify:      make(chan *announce),
 		filter:      make(chan chan []*types.Block),
 		quit:        make(chan struct{}),
 		hasBlock:    hasBlock,
 		importBlock: importBlock,
+		chainHeight: chainHeight,
 	}
 }
 
@@ -124,6 +137,7 @@ func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks {
 func (f *Fetcher) loop() {
 	announced := make(map[common.Hash][]*announce)
 	fetching := make(map[common.Hash]*announce)
+	queued := prque.New()
 	fetch := time.NewTimer(0)
 	done := make(chan common.Hash)
 
@@ -136,6 +150,30 @@ func (f *Fetcher) loop() {
 				delete(fetching, hash)
 			}
 		}
+		// Import any queued blocks that could potentially fit
+		height := f.chainHeight()
+		for !queued.Empty() {
+			// Fetch the next block, and skip if already known
+			op := queued.PopItem().(*inject)
+			if f.hasBlock(op.block.Hash()) {
+				continue
+			}
+			// If unknown, but too high up the chain, continue later
+			if number := op.block.NumberU64(); number > height+1 {
+				queued.Push(op, -float32(op.block.NumberU64()))
+				break
+			}
+			// Block may just fit, try to import it
+			glog.V(logger.Debug).Infof("Peer %s: importing block %x", op.origin, op.block.Hash().Bytes()[:4])
+			go func() {
+				defer func() { done <- op.block.Hash() }()
+
+				if err := f.importBlock(op.origin, op.block); err != nil {
+					glog.V(logger.Detail).Infof("Peer %s: block %x import failed: %v", op.origin, op.block.Hash().Bytes()[:4], err)
+					return
+				}
+			}()
+		}
 		// Wait for an outside event to occur
 		select {
 		case <-f.quit:
@@ -221,40 +259,20 @@ func (f *Fetcher) loop() {
 			case <-f.quit:
 				return
 			}
-			// Create a closure with the retrieved blocks and origin peers
-			peers := make([]string, 0, len(explicit))
-			blocks = make([]*types.Block, 0, len(explicit))
+			// Schedule the retrieved blocks for ordered import
+			height := f.chainHeight()
 			for _, block := range explicit {
+				// Skip any blocks too far into the future
+				if height+maxQueueDist < block.NumberU64() {
+					continue
+				}
+				// Otherwise if the announce is still pending, schedule
 				hash := block.Hash()
 				if announce := fetching[hash]; announce != nil {
-					// Drop the block if it surely cannot fit
-					if f.hasBlock(hash) || !f.hasBlock(block.ParentHash()) {
-						// delete(fetching, hash) // if we drop, it will re-fetch it, wait for timeout?
-						continue
-					}
-					// Otherwise accumulate for import
-					peers = append(peers, announce.origin)
-					blocks = append(blocks, block)
+					queued.Push(&inject{origin: announce.origin, block: block}, -float32(block.NumberU64()))
+					glog.V(logger.Detail).Infof("Peer %s: scheduled block %x, total %v", announce.origin, hash[:4], queued.Size())
 				}
 			}
-			// If any explicit fetches were replied to, import them
-			if count := len(blocks); count > 0 {
-				glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", len(blocks))
-				go func() {
-					// Make sure all hashes are cleaned up
-					for _, block := range blocks {
-						hash := block.Hash()
-						defer func() { done <- hash }()
-					}
-					// Try and actually import the blocks
-					for i := 0; i < len(blocks); i++ {
-						if err := f.importBlock(peers[i], blocks[i]); err != nil {
-							glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err)
-							return
-						}
-					}
-				}()
-			}
 		}
 	}
 }

+ 34 - 1
eth/fetcher/fetcher_test.go

@@ -77,7 +77,7 @@ func newTester() *fetcherTester {
 		ownHashes: []common.Hash{knownHash},
 		ownBlocks: map[common.Hash]*types.Block{knownHash: genesis},
 	}
-	tester.fetcher = New(tester.hasBlock, tester.importBlock)
+	tester.fetcher = New(tester.hasBlock, tester.importBlock, tester.chainHeight)
 	tester.fetcher.Start()
 
 	return tester
@@ -99,6 +99,11 @@ func (f *fetcherTester) importBlock(peer string, block *types.Block) error {
 	return nil
 }
 
+// chainHeight retrieves the current height (block number) of the chain.
+func (f *fetcherTester) chainHeight() uint64 {
+	return f.ownBlocks[f.ownHashes[len(f.ownHashes)-1]].NumberU64()
+}
+
 // peerFetcher retrieves a fetcher associated with a simulated peer.
 func (f *fetcherTester) makeFetcher(blocks map[common.Hash]*types.Block) blockRequesterFn {
 	// Copy all the blocks to ensure they are not tampered with
@@ -238,3 +243,31 @@ func TestPendingDeduplication(t *testing.T) {
 		t.Fatalf("retrieval count mismatch: have %v, want %v", counter, 1)
 	}
 }
+
+// Tests that announcements retrieved in a random order are cached and eventually
+// imported when all the gaps are filled in.
+func TestRandomArrivalImport(t *testing.T) {
+	// Create a chain of blocks to import, and choose one to delay
+	targetBlocks := 24
+	hashes := createHashes(targetBlocks, knownHash)
+	blocks := createBlocksFromHashes(hashes)
+	skip := targetBlocks / 2
+
+	tester := newTester()
+	fetcher := tester.makeFetcher(blocks)
+
+	// Iteratively announce blocks, skipping one entry
+	for i := len(hashes) - 1; i >= 0; i-- {
+		if i != skip {
+			tester.fetcher.Notify("valid", hashes[i], time.Now().Add(-arriveTimeout), fetcher)
+			time.Sleep(50 * time.Millisecond)
+		}
+	}
+	// Finally announce the skipped entry and check full import
+	tester.fetcher.Notify("valid", hashes[skip], time.Now().Add(-arriveTimeout), fetcher)
+	time.Sleep(50 * time.Millisecond)
+
+	if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
+		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
+	}
+}

+ 9 - 2
eth/handler.go

@@ -1,6 +1,7 @@
 package eth
 
 import (
+	"errors"
 	"fmt"
 	"math"
 	"math/big"
@@ -94,9 +95,15 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
 	manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer)
 
 	importer := func(peer string, block *types.Block) error {
-		return manager.importBlock(manager.peers.Peer(peer), block, nil)
+		if p := manager.peers.Peer(peer); p != nil {
+			return manager.importBlock(manager.peers.Peer(peer), block, nil)
+		}
+		return errors.New("unknown peer")
+	}
+	heighter := func() uint64 {
+		return manager.chainman.CurrentBlock().NumberU64()
 	}
-	manager.fetcher = fetcher.New(manager.chainman.HasBlock, importer)
+	manager.fetcher = fetcher.New(manager.chainman.HasBlock, importer, heighter)
 
 	return manager
 }