Explorar el Código

eth, eth/downloader: surface downloaded block origin, drop on error

Péter Szilágyi hace 10 años
padre
commit
eafdc1f8e3
Se han modificado 5 ficheros con 39 adiciones y 23 borrados
  1. 7 1
      eth/downloader/downloader.go
  2. 4 4
      eth/downloader/downloader_test.go
  3. 11 9
      eth/downloader/queue.go
  4. 6 6
      eth/handler.go
  5. 11 3
      eth/sync.go

+ 7 - 1
eth/downloader/downloader.go

@@ -93,6 +93,12 @@ type Downloader struct {
 	cancelLock sync.RWMutex  // Lock to protect the cancel channel in delivers
 }
 
+// Block is an origin-tagged blockchain block.
+type Block struct {
+	RawBlock   *types.Block
+	OriginPeer string
+}
+
 func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
 	downloader := &Downloader{
 		mux:       mux,
@@ -177,7 +183,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
 }
 
 // TakeBlocks takes blocks from the queue and yields them to the caller.
-func (d *Downloader) TakeBlocks() types.Blocks {
+func (d *Downloader) TakeBlocks() []*Block {
 	return d.queue.TakeBlocks()
 }
 

+ 4 - 4
eth/downloader/downloader_test.go

@@ -88,10 +88,10 @@ func (dl *downloadTester) sync(peerId string, head common.Hash) error {
 // syncTake is starts synchronising with a remote peer, but concurrently it also
 // starts fetching blocks that the downloader retrieved. IT blocks until both go
 // routines terminate.
-func (dl *downloadTester) syncTake(peerId string, head common.Hash) (types.Blocks, error) {
+func (dl *downloadTester) syncTake(peerId string, head common.Hash) ([]*Block, error) {
 	// Start a block collector to take blocks as they become available
 	done := make(chan struct{})
-	took := []*types.Block{}
+	took := []*Block{}
 	go func() {
 		for running := true; running; {
 			select {
@@ -349,7 +349,7 @@ func TestNonExistingParentAttack(t *testing.T) {
 	if len(bs) != 1 {
 		t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
 	}
-	if tester.hasBlock(bs[0].ParentHash()) {
+	if tester.hasBlock(bs[0].RawBlock.ParentHash()) {
 		t.Fatalf("tester knows about the unknown hash")
 	}
 	tester.downloader.Cancel()
@@ -364,7 +364,7 @@ func TestNonExistingParentAttack(t *testing.T) {
 	if len(bs) != 1 {
 		t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
 	}
-	if !tester.hasBlock(bs[0].ParentHash()) {
+	if !tester.hasBlock(bs[0].RawBlock.ParentHash()) {
 		t.Fatalf("tester doesn't know about the origin hash")
 	}
 }

+ 11 - 9
eth/downloader/queue.go

@@ -36,7 +36,7 @@ type queue struct {
 	pendPool map[string]*fetchRequest // Currently pending block retrieval operations
 
 	blockPool   map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes
-	blockCache  []*types.Block      // Downloaded but not yet delivered blocks
+	blockCache  []*Block            // Downloaded but not yet delivered blocks
 	blockOffset int                 // Offset of the first cached block in the block-chain
 
 	lock sync.RWMutex
@@ -148,7 +148,7 @@ func (q *queue) Insert(hashes []common.Hash) []common.Hash {
 
 // GetHeadBlock retrieves the first block from the cache, or nil if it hasn't
 // been downloaded yet (or simply non existent).
-func (q *queue) GetHeadBlock() *types.Block {
+func (q *queue) GetHeadBlock() *Block {
 	q.lock.RLock()
 	defer q.lock.RUnlock()
 
@@ -159,7 +159,7 @@ func (q *queue) GetHeadBlock() *types.Block {
 }
 
 // GetBlock retrieves a downloaded block, or nil if non-existent.
-func (q *queue) GetBlock(hash common.Hash) *types.Block {
+func (q *queue) GetBlock(hash common.Hash) *Block {
 	q.lock.RLock()
 	defer q.lock.RUnlock()
 
@@ -176,18 +176,18 @@ func (q *queue) GetBlock(hash common.Hash) *types.Block {
 }
 
 // TakeBlocks retrieves and permanently removes a batch of blocks from the cache.
-func (q *queue) TakeBlocks() types.Blocks {
+func (q *queue) TakeBlocks() []*Block {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
 	// Accumulate all available blocks
-	var blocks types.Blocks
+	blocks := []*Block{}
 	for _, block := range q.blockCache {
 		if block == nil {
 			break
 		}
 		blocks = append(blocks, block)
-		delete(q.blockPool, block.Hash())
+		delete(q.blockPool, block.RawBlock.Hash())
 	}
 	// Delete the blocks from the slice and let them be garbage collected
 	// without this slice trick the blocks would stay in memory until nil
@@ -312,8 +312,10 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
 			return ErrInvalidChain
 		}
 		// Otherwise merge the block and mark the hash block
-		q.blockCache[index] = block
-
+		q.blockCache[index] = &Block{
+			RawBlock:   block,
+			OriginPeer: id,
+		}
 		delete(request.Hashes, hash)
 		delete(q.hashPool, hash)
 		q.blockPool[hash] = int(block.NumberU64())
@@ -342,6 +344,6 @@ func (q *queue) Alloc(offset int) {
 		size = blockCacheLimit
 	}
 	if len(q.blockCache) < size {
-		q.blockCache = append(q.blockCache, make([]*types.Block, size-len(q.blockCache))...)
+		q.blockCache = append(q.blockCache, make([]*Block, size-len(q.blockCache))...)
 	}
 }

+ 6 - 6
eth/handler.go

@@ -92,13 +92,13 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
 	return manager
 }
 
-func (pm *ProtocolManager) removePeer(peer *peer) {
+func (pm *ProtocolManager) removePeer(id string) {
 	// Unregister the peer from the downloader
-	pm.downloader.UnregisterPeer(peer.id)
+	pm.downloader.UnregisterPeer(id)
 
 	// Remove the peer from the Ethereum peer set too
-	glog.V(logger.Detail).Infoln("Removing peer", peer.id)
-	if err := pm.peers.Unregister(peer.id); err != nil {
+	glog.V(logger.Detail).Infoln("Removing peer", id)
+	if err := pm.peers.Unregister(id); err != nil {
 		glog.V(logger.Error).Infoln("Removal failed:", err)
 	}
 }
@@ -148,7 +148,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
 		glog.V(logger.Error).Infoln("Addition failed:", err)
 		return err
 	}
-	defer pm.removePeer(p)
+	defer pm.removePeer(p.id)
 
 	if err := pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks); err != nil {
 		return err
@@ -315,7 +315,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 			if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
 				glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error")
 
-				self.removePeer(p)
+				self.removePeer(p.id)
 
 				return nil
 			}

+ 11 - 3
eth/sync.go

@@ -5,6 +5,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/eth/downloader"
 	"github.com/ethereum/go-ethereum/logger"
 	"github.com/ethereum/go-ethereum/logger/glog"
@@ -57,13 +58,20 @@ func (pm *ProtocolManager) processBlocks() error {
 	if len(blocks) == 0 {
 		return nil
 	}
-	glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
+	glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
 
 	for len(blocks) != 0 && !pm.quit {
+		// Retrieve the first batch of blocks to insert
 		max := int(math.Min(float64(len(blocks)), float64(blockProcAmount)))
-		_, err := pm.chainman.InsertChain(blocks[:max])
+		raw := make(types.Blocks, 0, max)
+		for _, block := range blocks[:max] {
+			raw = append(raw, block.RawBlock)
+		}
+		// Try to inset the blocks, drop the originating peer if there's an error
+		index, err := pm.chainman.InsertChain(raw)
 		if err != nil {
 			glog.V(logger.Warn).Infof("Block insertion failed: %v", err)
+			pm.removePeer(blocks[index].OriginPeer)
 			pm.downloader.Cancel()
 			return err
 		}
@@ -105,7 +113,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
 
 	case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed:
 		glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err)
-		pm.removePeer(peer)
+		pm.removePeer(peer.id)
 
 	case downloader.ErrPendingQueue:
 		glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)