Explorar o código

eth, eth/downloader: better remote head tracking

Péter Szilágyi %!s(int64=9) %!d(string=hai) anos
pai
achega
1dd272080d
Modificáronse 6 ficheiros con 59 adicións e 50 borrados
  1. 4 3
      eth/downloader/downloader.go
  2. 14 3
      eth/downloader/downloader_test.go
  3. 9 4
      eth/downloader/peer.go
  4. 16 14
      eth/handler.go
  5. 11 23
      eth/peer.go
  6. 5 3
      eth/sync.go

+ 4 - 3
eth/downloader/downloader.go

@@ -236,12 +236,12 @@ func (d *Downloader) Synchronising() bool {
 
 // RegisterPeer injects a new download peer into the set of block source to be
 // used for fetching hashes and blocks from.
-func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
+func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn,
 	getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
 	getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
 
 	glog.V(logger.Detail).Infoln("Registering peer", id)
-	if err := d.peers.Register(newPeer(id, version, head, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
+	if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
 		glog.V(logger.Error).Infoln("Register failed:", err)
 		return err
 	}
@@ -501,7 +501,8 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
 	glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
 
 	// Request the advertised remote head block and wait for the response
-	go p.getRelHeaders(p.head, 1, 0, false)
+	head, _ := p.currentHead()
+	go p.getRelHeaders(head, 1, 0, false)
 
 	timeout := time.After(d.requestTTL())
 	for {

+ 14 - 3
eth/downloader/downloader_test.go

@@ -400,11 +400,11 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
 	var err error
 	switch version {
 	case 62:
-		err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil)
+		err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil)
 	case 63:
-		err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
+		err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
 	case 64:
-		err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
+		err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
 	}
 	if err == nil {
 		// Assign the owned hashes, headers and blocks to the peer (deep copy)
@@ -463,6 +463,17 @@ func (dl *downloadTester) dropPeer(id string) {
 	dl.downloader.UnregisterPeer(id)
 }
 
+// peerCurrentHeadFn constructs a function to retrieve a peer's current head hash
+// and total difficulty.
+func (dl *downloadTester) peerCurrentHeadFn(id string) func() (common.Hash, *big.Int) {
+	return func() (common.Hash, *big.Int) {
+		dl.lock.RLock()
+		defer dl.lock.RUnlock()
+
+		return dl.peerHashes[id][0], nil
+	}
+}
+
 // peerGetRelHeadersFn constructs a GetBlockHeaders function based on a hashed
 // origin; associated with a particular peer in the download tester. The returned
 // function can be used to retrieve batches of headers from the particular peer.

+ 9 - 4
eth/downloader/peer.go

@@ -23,6 +23,7 @@ import (
 	"errors"
 	"fmt"
 	"math"
+	"math/big"
 	"sort"
 	"strings"
 	"sync"
@@ -37,6 +38,9 @@ const (
 	measurementImpact = 0.1  // The impact a single measurement has on a peer's final throughput value.
 )
 
+// Head hash and total difficulty retriever for
+type currentHeadRetrievalFn func() (common.Hash, *big.Int)
+
 // Block header and body fetchers belonging to eth/62 and above
 type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
 type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
@@ -52,8 +56,7 @@ var (
 
 // peer represents an active peer from which hashes and blocks are retrieved.
 type peer struct {
-	id   string      // Unique identifier of the peer
-	head common.Hash // Hash of the peers latest known block
+	id string // Unique identifier of the peer
 
 	headerIdle  int32 // Current header activity state of the peer (idle = 0, active = 1)
 	blockIdle   int32 // Current block activity state of the peer (idle = 0, active = 1)
@@ -74,6 +77,8 @@ type peer struct {
 
 	lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
 
+	currentHead currentHeadRetrievalFn // Method to fetch the currently known head of the peer
+
 	getRelHeaders  relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash
 	getAbsHeaders  absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position
 	getBlockBodies blockBodyFetcherFn      // [eth/62] Method to retrieve a batch of block bodies
@@ -87,14 +92,14 @@ type peer struct {
 
 // newPeer create a new downloader peer, with specific hash and block retrieval
 // mechanisms.
-func newPeer(id string, version int, head common.Hash,
+func newPeer(id string, version int, currentHead currentHeadRetrievalFn,
 	getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
 	getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer {
 	return &peer{
 		id:      id,
-		head:    head,
 		lacking: make(map[common.Hash]struct{}),
 
+		currentHead:    currentHead,
 		getRelHeaders:  getRelHeaders,
 		getAbsHeaders:  getAbsHeaders,
 		getBlockBodies: getBlockBodies,

+ 16 - 14
eth/handler.go

@@ -272,11 +272,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
 	defer pm.removePeer(p.id)
 
 	// Register the peer in the downloader. If the downloader considers it banned, we disconnect
-	err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(),
-		p.RequestHeadersByHash, p.RequestHeadersByNumber,
-		p.RequestBodies, p.RequestReceipts, p.RequestNodeData,
-	)
-	if err != nil {
+	if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head, p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil {
 		return err
 	}
 	// Propagate existing transactions. new transactions appearing
@@ -413,7 +409,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 			// If we already have a DAO header, we can check the peer's TD against it. If
 			// the peer's ahead of this, it too must have a reply to the DAO check
 			if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
-				if p.Td().Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
+				if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
 					verifyDAO = false
 				}
 			}
@@ -619,7 +615,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		// Mark the hashes as present at the remote node
 		for _, block := range announces {
 			p.MarkBlock(block.Hash)
-			p.SetHead(block.Hash)
 		}
 		// Schedule all the unknown hashes for retrieval
 		unknown := make([]announce, 0, len(announces))
@@ -646,16 +641,23 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 
 		// Mark the peer as owning the block and schedule it for import
 		p.MarkBlock(request.Block.Hash())
-		p.SetHead(request.Block.Hash())
-
 		pm.fetcher.Enqueue(p.id, request.Block)
 
-		// Update the peers total difficulty if needed, schedule a download if gapped
-		if request.TD.Cmp(p.Td()) > 0 {
-			p.SetTd(request.TD)
+		// Assuming the block is importable by the peer, but possibly not yet done so,
+		// calculate the head hash and TD that the peer truly must have.
+		var (
+			trueHead = request.Block.ParentHash()
+			trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
+		)
+		// Update the peers total difficulty if better than the previous
+		if _, td := p.Head(); trueTD.Cmp(td) > 0 {
+			p.SetHead(trueHead, trueTD)
+
+			// Schedule a sync if above ours. Note, this will not fire a sync for a gap of
+			// a singe block (as the true TD is below the propagated block), however this
+			// scenario should easily be covered by the fetcher.
 			currentBlock := pm.blockchain.CurrentBlock()
-			td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
-			if request.TD.Cmp(new(big.Int).Add(td, request.Block.Difficulty())) > 0 {
+			if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
 				go pm.synchronise(p)
 			}
 		}

+ 11 - 23
eth/peer.go

@@ -84,43 +84,31 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
 
 // Info gathers and returns a collection of metadata known about a peer.
 func (p *peer) Info() *PeerInfo {
+	hash, td := p.Head()
+
 	return &PeerInfo{
 		Version:    p.version,
-		Difficulty: p.Td(),
-		Head:       fmt.Sprintf("%x", p.Head()),
+		Difficulty: td,
+		Head:       hash.Hex(),
 	}
 }
 
-// Head retrieves a copy of the current head (most recent) hash of the peer.
-func (p *peer) Head() (hash common.Hash) {
+// Head retrieves a copy of the current head hash and total difficulty of the
+// peer.
+func (p *peer) Head() (hash common.Hash, td *big.Int) {
 	p.lock.RLock()
 	defer p.lock.RUnlock()
 
 	copy(hash[:], p.head[:])
-	return hash
+	return hash, new(big.Int).Set(p.td)
 }
 
-// SetHead updates the head (most recent) hash of the peer.
-func (p *peer) SetHead(hash common.Hash) {
+// SetHead updates the head hash and total difficulty of the peer.
+func (p *peer) SetHead(hash common.Hash, td *big.Int) {
 	p.lock.Lock()
 	defer p.lock.Unlock()
 
 	copy(p.head[:], hash[:])
-}
-
-// Td retrieves the current total difficulty of a peer.
-func (p *peer) Td() *big.Int {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
-
-	return new(big.Int).Set(p.td)
-}
-
-// SetTd updates the current total difficulty of a peer.
-func (p *peer) SetTd(td *big.Int) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
-
 	p.td.Set(td)
 }
 
@@ -411,7 +399,7 @@ func (ps *peerSet) BestPeer() *peer {
 		bestTd   *big.Int
 	)
 	for _, p := range ps.peers {
-		if td := p.Td(); bestPeer == nil || td.Cmp(bestTd) > 0 {
+		if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
 			bestPeer, bestTd = p, td
 		}
 	}

+ 5 - 3
eth/sync.go

@@ -161,10 +161,12 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
 	if peer == nil {
 		return
 	}
-	// Make sure the peer's TD is higher than our own. If not drop.
+	// Make sure the peer's TD is higher than our own
 	currentBlock := pm.blockchain.CurrentBlock()
 	td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
-	if peer.Td().Cmp(td) <= 0 {
+
+	pHead, pTd := peer.Head()
+	if pTd.Cmp(td) <= 0 {
 		return
 	}
 	// Otherwise try to sync with the downloader
@@ -172,7 +174,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
 	if atomic.LoadUint32(&pm.fastSync) == 1 {
 		mode = downloader.FastSync
 	}
-	if err := pm.downloader.Synchronise(peer.id, peer.Head(), peer.Td(), mode); err != nil {
+	if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
 		return
 	}
 	atomic.StoreUint32(&pm.synced, 1) // Mark initial sync done