Browse Source

eth, eth/downloader: move block processing into the downlaoder

Péter Szilágyi 10 years ago
parent
commit
fc7abd9886
4 changed files with 253 additions and 231 deletions
  1. 119 58
      eth/downloader/downloader.go
  2. 130 122
      eth/downloader/downloader_test.go
  3. 1 1
      eth/handler.go
  4. 3 50
      eth/sync.go

+ 119 - 58
eth/downloader/downloader.go

@@ -3,6 +3,7 @@ package downloader
 import (
 import (
 	"bytes"
 	"bytes"
 	"errors"
 	"errors"
+	"math"
 	"math/rand"
 	"math/rand"
 	"sync"
 	"sync"
 	"sync/atomic"
 	"sync/atomic"
@@ -28,25 +29,27 @@ var (
 	crossCheckCycle = time.Second      // Period after which to check for expired cross checks
 	crossCheckCycle = time.Second      // Period after which to check for expired cross checks
 
 
 	maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out
 	maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out
+	maxBlockProcess = 256  // Number of blocks to import at once into the chain
 )
 )
 
 
 var (
 var (
-	errBusy             = errors.New("busy")
-	errUnknownPeer      = errors.New("peer is unknown or unhealthy")
-	errBadPeer          = errors.New("action from bad peer ignored")
-	errStallingPeer     = errors.New("peer is stalling")
-	errBannedHead       = errors.New("peer head hash already banned")
-	errNoPeers          = errors.New("no peers to keep download active")
-	errPendingQueue     = errors.New("pending items in queue")
-	errTimeout          = errors.New("timeout")
-	errEmptyHashSet     = errors.New("empty hash set by peer")
-	errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
-	errAlreadyInPool    = errors.New("hash already in pool")
-	errInvalidChain     = errors.New("retrieved hash chain is invalid")
-	errCrossCheckFailed = errors.New("block cross-check failed")
-	errCancelHashFetch  = errors.New("hash fetching cancelled (requested)")
-	errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
-	errNoSyncActive     = errors.New("no sync active")
+	errBusy              = errors.New("busy")
+	errUnknownPeer       = errors.New("peer is unknown or unhealthy")
+	errBadPeer           = errors.New("action from bad peer ignored")
+	errStallingPeer      = errors.New("peer is stalling")
+	errBannedHead        = errors.New("peer head hash already banned")
+	errNoPeers           = errors.New("no peers to keep download active")
+	errPendingQueue      = errors.New("pending items in queue")
+	errTimeout           = errors.New("timeout")
+	errEmptyHashSet      = errors.New("empty hash set by peer")
+	errPeersUnavailable  = errors.New("no peers available or all peers tried for block download process")
+	errAlreadyInPool     = errors.New("hash already in pool")
+	errInvalidChain      = errors.New("retrieved hash chain is invalid")
+	errCrossCheckFailed  = errors.New("block cross-check failed")
+	errCancelHashFetch   = errors.New("hash fetching canceled (requested)")
+	errCancelBlockFetch  = errors.New("block downloading canceled (requested)")
+	errCancelChainImport = errors.New("chain importing canceled (requested)")
+	errNoSyncActive      = errors.New("no sync active")
 )
 )
 
 
 // hashCheckFn is a callback type for verifying a hash's presence in the local chain.
 // hashCheckFn is a callback type for verifying a hash's presence in the local chain.
@@ -55,6 +58,9 @@ type hashCheckFn func(common.Hash) bool
 // blockRetrievalFn is a callback type for retrieving a block from the local chain.
 // blockRetrievalFn is a callback type for retrieving a block from the local chain.
 type blockRetrievalFn func(common.Hash) *types.Block
 type blockRetrievalFn func(common.Hash) *types.Block
 
 
+// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
+type chainInsertFn func(types.Blocks) (int, error)
+
 // peerDropFn is a callback type for dropping a peer detected as malicious.
 // peerDropFn is a callback type for dropping a peer detected as malicious.
 type peerDropFn func(id string)
 type peerDropFn func(id string)
 
 
@@ -88,13 +94,15 @@ type Downloader struct {
 	importLock  sync.Mutex
 	importLock  sync.Mutex
 
 
 	// Callbacks
 	// Callbacks
-	hasBlock hashCheckFn      // Checks if a block is present in the chain
-	getBlock blockRetrievalFn // Retrieves a block from the chain
-	dropPeer peerDropFn       // Retrieved the TD of our own chain
+	hasBlock    hashCheckFn      // Checks if a block is present in the chain
+	getBlock    blockRetrievalFn // Retrieves a block from the chain
+	insertChain chainInsertFn    // Injects a batch of blocks into the chain
+	dropPeer    peerDropFn       // Retrieved the TD of our own chain
 
 
 	// Status
 	// Status
 	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
 	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
 	synchronising   int32
 	synchronising   int32
+	processing      int32
 	notified        int32
 	notified        int32
 
 
 	// Channels
 	// Channels
@@ -113,18 +121,19 @@ type Block struct {
 }
 }
 
 
 // New creates a new downloader to fetch hashes and blocks from remote peers.
 // New creates a new downloader to fetch hashes and blocks from remote peers.
-func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, dropPeer peerDropFn) *Downloader {
+func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader {
 	// Create the base downloader
 	// Create the base downloader
 	downloader := &Downloader{
 	downloader := &Downloader{
-		mux:       mux,
-		queue:     newQueue(),
-		peers:     newPeerSet(),
-		hasBlock:  hasBlock,
-		getBlock:  getBlock,
-		dropPeer:  dropPeer,
-		newPeerCh: make(chan *peer, 1),
-		hashCh:    make(chan hashPack, 1),
-		blockCh:   make(chan blockPack, 1),
+		mux:         mux,
+		queue:       newQueue(),
+		peers:       newPeerSet(),
+		hasBlock:    hasBlock,
+		getBlock:    getBlock,
+		insertChain: insertChain,
+		dropPeer:    dropPeer,
+		newPeerCh:   make(chan *peer, 1),
+		hashCh:      make(chan hashPack, 1),
+		blockCh:     make(chan blockPack, 1),
 	}
 	}
 	// Inject all the known bad hashes
 	// Inject all the known bad hashes
 	downloader.banned = set.New()
 	downloader.banned = set.New()
@@ -157,7 +166,7 @@ func (d *Downloader) Stats() (pending int, cached int, importing int, estimate t
 	return
 	return
 }
 }
 
 
-// Synchronising returns the state of the downloader
+// Synchronising returns whether the downloader is currently retrieving blocks.
 func (d *Downloader) Synchronising() bool {
 func (d *Downloader) Synchronising() bool {
 	return atomic.LoadInt32(&d.synchronising) > 0
 	return atomic.LoadInt32(&d.synchronising) > 0
 }
 }
@@ -260,19 +269,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error {
 	return d.syncWithPeer(p, hash)
 	return d.syncWithPeer(p, hash)
 }
 }
 
 
-// TakeBlocks takes blocks from the queue and yields them to the caller.
-func (d *Downloader) TakeBlocks() []*Block {
-	blocks := d.queue.TakeBlocks()
-	if len(blocks) > 0 {
-		d.importLock.Lock()
-		d.importStart = time.Now()
-		d.importQueue = blocks
-		d.importDone = 0
-		d.importLock.Unlock()
-	}
-	return blocks
-}
-
 // Has checks if the downloader knows about a particular hash, meaning that its
 // Has checks if the downloader knows about a particular hash, meaning that its
 // either already downloaded of pending retrieval.
 // either already downloaded of pending retrieval.
 func (d *Downloader) Has(hash common.Hash) bool {
 func (d *Downloader) Has(hash common.Hash) bool {
@@ -307,19 +303,16 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
 
 
 // Cancel cancels all of the operations and resets the queue. It returns true
 // Cancel cancels all of the operations and resets the queue. It returns true
 // if the cancel operation was completed.
 // if the cancel operation was completed.
-func (d *Downloader) Cancel() bool {
-	// If we're not syncing just return.
-	hs, bs := d.queue.Size()
-	if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 {
-		return false
-	}
+func (d *Downloader) Cancel() {
 	// Close the current cancel channel
 	// Close the current cancel channel
 	d.cancelLock.Lock()
 	d.cancelLock.Lock()
-	select {
-	case <-d.cancelCh:
-		// Channel was already closed
-	default:
-		close(d.cancelCh)
+	if d.cancelCh != nil {
+		select {
+		case <-d.cancelCh:
+			// Channel was already closed
+		default:
+			close(d.cancelCh)
+		}
 	}
 	}
 	d.cancelLock.Unlock()
 	d.cancelLock.Unlock()
 
 
@@ -330,11 +323,11 @@ func (d *Downloader) Cancel() bool {
 	d.importQueue = nil
 	d.importQueue = nil
 	d.importDone = 0
 	d.importDone = 0
 	d.importLock.Unlock()
 	d.importLock.Unlock()
-
-	return true
 }
 }
 
 
-// XXX Make synchronous
+// fetchHahes starts retrieving hashes backwards from a specific peer and hash,
+// up until it finds a common ancestor. If the source peer times out, alternative
+// ones are tried for continuation.
 func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
 func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
 	var (
 	var (
 		start  = time.Now()
 		start  = time.Now()
@@ -530,10 +523,13 @@ out:
 						glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
 						glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
 						break
 						break
 					}
 					}
-					// All was successful, promote the peer
+					// All was successful, promote the peer and potentially start processing
 					peer.Promote()
 					peer.Promote()
 					peer.SetIdle()
 					peer.SetIdle()
 					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
 					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
+					if atomic.LoadInt32(&d.processing) == 0 {
+						go d.process()
+					}
 
 
 				case errInvalidChain:
 				case errInvalidChain:
 					// The hash chain is invalid (blocks are not ordered properly), abort
 					// The hash chain is invalid (blocks are not ordered properly), abort
@@ -709,6 +705,71 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
 	}
 	}
 }
 }
 
 
+// process takes blocks from the queue and tries to import them into the chain.
+func (d *Downloader) process() (err error) {
+	// Make sure only one goroutine is ever allowed to process blocks at once
+	if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
+		return
+	}
+	// If the processor just exited, but there are freshly pending items, try to
+	// reenter. This is needed because the goroutine spinned up for processing
+	// the fresh blocks might have been rejected entry to to this present thread
+	// not yet releasing the `processing` state.
+	defer func() {
+		if err == nil && d.queue.GetHeadBlock() != nil {
+			err = d.process()
+		}
+	}()
+	// Release the lock upon exit (note, before checking for reentry!)
+	defer atomic.StoreInt32(&d.processing, 0)
+
+	// Fetch the current cancel channel to allow termination
+	d.cancelLock.RLock()
+	cancel := d.cancelCh
+	d.cancelLock.RUnlock()
+
+	// Repeat the processing as long as there are blocks to import
+	for {
+		// Fetch the next batch of blocks
+		blocks := d.queue.TakeBlocks()
+		if len(blocks) == 0 {
+			return nil
+		}
+		// Reset the import statistics
+		d.importLock.Lock()
+		d.importStart = time.Now()
+		d.importQueue = blocks
+		d.importDone = 0
+		d.importLock.Unlock()
+
+		// Actually import the blocks
+		glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
+		for len(blocks) != 0 { // TODO: quit
+			// Check for any termination requests
+			select {
+			case <-cancel:
+				return errCancelChainImport
+			default:
+			}
+			// Retrieve the first batch of blocks to insert
+			max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
+			raw := make(types.Blocks, 0, max)
+			for _, block := range blocks[:max] {
+				raw = append(raw, block.RawBlock)
+			}
+			// Try to inset the blocks, drop the originating peer if there's an error
+			index, err := d.insertChain(raw)
+			if err != nil {
+				glog.V(logger.Debug).Infoln("Block #%d import failed:", raw[index].NumberU64(), err)
+				d.dropPeer(blocks[index].OriginPeer)
+				d.Cancel()
+				return errCancelChainImport
+			}
+			blocks = blocks[max:]
+		}
+	}
+}
+
 // DeliverBlocks injects a new batch of blocks received from a remote node.
 // DeliverBlocks injects a new batch of blocks received from a remote node.
 // This is usually invoked through the BlocksMsg by the protocol handler.
 // This is usually invoked through the BlocksMsg by the protocol handler.
 func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {
 func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {

+ 130 - 122
eth/downloader/downloader_test.go

@@ -2,8 +2,10 @@ package downloader
 
 
 import (
 import (
 	"encoding/binary"
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"fmt"
 	"math/big"
 	"math/big"
+	"sync/atomic"
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
@@ -81,7 +83,7 @@ func newTester() *downloadTester {
 		peerBlocks: make(map[string]map[common.Hash]*types.Block),
 		peerBlocks: make(map[string]map[common.Hash]*types.Block),
 	}
 	}
 	var mux event.TypeMux
 	var mux event.TypeMux
-	downloader := New(&mux, tester.hasBlock, tester.getBlock, tester.dropPeer)
+	downloader := New(&mux, tester.hasBlock, tester.getBlock, tester.insertChain, tester.dropPeer)
 	tester.downloader = downloader
 	tester.downloader = downloader
 
 
 	return tester
 	return tester
@@ -89,44 +91,14 @@ func newTester() *downloadTester {
 
 
 // sync starts synchronizing with a remote peer, blocking until it completes.
 // sync starts synchronizing with a remote peer, blocking until it completes.
 func (dl *downloadTester) sync(id string) error {
 func (dl *downloadTester) sync(id string) error {
-	return dl.downloader.synchronise(id, dl.peerHashes[id][0])
-}
-
-// syncTake is starts synchronising with a remote peer, but concurrently it also
-// starts fetching blocks that the downloader retrieved. IT blocks until both go
-// routines terminate.
-func (dl *downloadTester) syncTake(id string) ([]*Block, error) {
-	// Start a block collector to take blocks as they become available
-	done := make(chan struct{})
-	took := []*Block{}
-	go func() {
-		for running := true; running; {
-			select {
-			case <-done:
-				running = false
-			default:
-				time.Sleep(time.Millisecond)
-			}
-			// Take a batch of blocks and accumulate
-			blocks := dl.downloader.TakeBlocks()
-			for _, block := range blocks {
-				dl.ownHashes = append(dl.ownHashes, block.RawBlock.Hash())
-				dl.ownBlocks[block.RawBlock.Hash()] = block.RawBlock
-			}
-			took = append(took, blocks...)
-		}
-		done <- struct{}{}
-	}()
-	// Start the downloading, sync the taker and return
-	err := dl.sync(id)
-
-	done <- struct{}{}
-	<-done
-
-	return took, err
+	err := dl.downloader.synchronise(id, dl.peerHashes[id][0])
+	for atomic.LoadInt32(&dl.downloader.processing) == 1 {
+		time.Sleep(time.Millisecond)
+	}
+	return err
 }
 }
 
 
-// hasBlock checks if a block is present in the testers canonical chain.
+// hasBlock checks if a block is pres	ent in the testers canonical chain.
 func (dl *downloadTester) hasBlock(hash common.Hash) bool {
 func (dl *downloadTester) hasBlock(hash common.Hash) bool {
 	return dl.getBlock(hash) != nil
 	return dl.getBlock(hash) != nil
 }
 }
@@ -136,6 +108,18 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
 	return dl.ownBlocks[hash]
 	return dl.ownBlocks[hash]
 }
 }
 
 
+// insertChain injects a new batch of blocks into the simulated chain.
+func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) {
+	for i, block := range blocks {
+		if _, ok := dl.ownBlocks[block.ParentHash()]; !ok {
+			return i, errors.New("unknown parent")
+		}
+		dl.ownHashes = append(dl.ownHashes, block.Hash())
+		dl.ownBlocks[block.Hash()] = block
+	}
+	return len(blocks), nil
+}
+
 // newPeer registers a new block download source into the downloader.
 // newPeer registers a new block download source into the downloader.
 func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error {
 func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error {
 	err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id), dl.peerGetBlocksFn(id))
 	err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id), dl.peerGetBlocksFn(id))
@@ -223,27 +207,8 @@ func TestSynchronisation(t *testing.T) {
 	if err := tester.sync("peer"); err != nil {
 	if err := tester.sync("peer"); err != nil {
 		t.Fatalf("failed to synchronise blocks: %v", err)
 		t.Fatalf("failed to synchronise blocks: %v", err)
 	}
 	}
-	if queued := len(tester.downloader.queue.blockPool); queued != targetBlocks {
-		t.Fatalf("synchronised block mismatch: have %v, want %v", queued, targetBlocks)
-	}
-}
-
-// Tests that the synchronized blocks can be correctly retrieved.
-func TestBlockTaking(t *testing.T) {
-	// Create a small enough block chain to download and the tester
-	targetBlocks := blockCacheLimit - 15
-	hashes := createHashes(targetBlocks, knownHash)
-	blocks := createBlocksFromHashes(hashes)
-
-	tester := newTester()
-	tester.newPeer("peer", hashes, blocks)
-
-	// Synchronise with the peer and test block retrieval
-	if err := tester.sync("peer"); err != nil {
-		t.Fatalf("failed to synchronise blocks: %v", err)
-	}
-	if took := tester.downloader.TakeBlocks(); len(took) != targetBlocks {
-		t.Fatalf("took block mismatch: have %v, want %v", len(took), targetBlocks)
+	if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
+		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
 	}
 	}
 }
 }
 
 
@@ -270,21 +235,21 @@ func TestCancel(t *testing.T) {
 	tester := newTester()
 	tester := newTester()
 	tester.newPeer("peer", hashes, blocks)
 	tester.newPeer("peer", hashes, blocks)
 
 
+	// Make sure canceling works with a pristine downloader
+	tester.downloader.Cancel()
+	hashCount, blockCount := tester.downloader.queue.Size()
+	if hashCount > 0 || blockCount > 0 {
+		t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
+	}
 	// Synchronise with the peer, but cancel afterwards
 	// Synchronise with the peer, but cancel afterwards
 	if err := tester.sync("peer"); err != nil {
 	if err := tester.sync("peer"); err != nil {
 		t.Fatalf("failed to synchronise blocks: %v", err)
 		t.Fatalf("failed to synchronise blocks: %v", err)
 	}
 	}
-	if !tester.downloader.Cancel() {
-		t.Fatalf("cancel operation failed")
-	}
-	// Make sure the queue reports empty and no blocks can be taken
-	hashCount, blockCount := tester.downloader.queue.Size()
+	tester.downloader.Cancel()
+	hashCount, blockCount = tester.downloader.queue.Size()
 	if hashCount > 0 || blockCount > 0 {
 	if hashCount > 0 || blockCount > 0 {
 		t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
 		t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
 	}
 	}
-	if took := tester.downloader.TakeBlocks(); len(took) != 0 {
-		t.Errorf("taken blocks mismatch: have %d, want %d", len(took), 0)
-	}
 }
 }
 
 
 // Tests that if a large batch of blocks are being downloaded, it is throttled
 // Tests that if a large batch of blocks are being downloaded, it is throttled
@@ -298,29 +263,46 @@ func TestThrottling(t *testing.T) {
 	tester := newTester()
 	tester := newTester()
 	tester.newPeer("peer", hashes, blocks)
 	tester.newPeer("peer", hashes, blocks)
 
 
+	// Wrap the importer to allow stepping
+	done := make(chan int)
+	tester.downloader.insertChain = func(blocks types.Blocks) (int, error) {
+		n, err := tester.insertChain(blocks)
+		done <- n
+		return n, err
+	}
 	// Start a synchronisation concurrently
 	// Start a synchronisation concurrently
 	errc := make(chan error)
 	errc := make(chan error)
 	go func() {
 	go func() {
 		errc <- tester.sync("peer")
 		errc <- tester.sync("peer")
 	}()
 	}()
 	// Iteratively take some blocks, always checking the retrieval count
 	// Iteratively take some blocks, always checking the retrieval count
-	for total := 0; total < targetBlocks; {
-		// Wait a bit for sync to complete
+	for len(tester.ownBlocks) < targetBlocks+1 {
+		// Wait a bit for sync to throttle itself
+		var cached int
 		for start := time.Now(); time.Since(start) < 3*time.Second; {
 		for start := time.Now(); time.Since(start) < 3*time.Second; {
 			time.Sleep(25 * time.Millisecond)
 			time.Sleep(25 * time.Millisecond)
-			if len(tester.downloader.queue.blockPool) == blockCacheLimit {
+
+			cached = len(tester.downloader.queue.blockPool)
+			if cached == blockCacheLimit || len(tester.ownBlocks)+cached == targetBlocks+1 {
 				break
 				break
 			}
 			}
 		}
 		}
-		// Fetch the next batch of blocks
-		took := tester.downloader.TakeBlocks()
-		if len(took) != blockCacheLimit {
-			t.Fatalf("block count mismatch: have %v, want %v", len(took), blockCacheLimit)
+		// Make sure we filled up the cache, then exhaust it
+		time.Sleep(25 * time.Millisecond) // give it a chance to screw up
+		if cached != blockCacheLimit && len(tester.ownBlocks)+cached < targetBlocks+1 {
+			t.Fatalf("block count mismatch: have %v, want %v", cached, blockCacheLimit)
 		}
 		}
-		total += len(took)
-		if total > targetBlocks {
-			t.Fatalf("target block count mismatch: have %v, want %v", total, targetBlocks)
+		<-done // finish previous blocking import
+		for cached > maxBlockProcess {
+			cached -= <-done
 		}
 		}
+		time.Sleep(25 * time.Millisecond) // yield to the insertion
+	}
+	<-done // finish the last blocking import
+
+	// Check that we haven't pulled more blocks than available
+	if len(tester.ownBlocks) > targetBlocks+1 {
+		t.Fatalf("target block count mismatch: have %v, want %v", len(tester.ownBlocks), targetBlocks+1)
 	}
 	}
 	if err := <-errc; err != nil {
 	if err := <-errc; err != nil {
 		t.Fatalf("block synchronization failed: %v", err)
 		t.Fatalf("block synchronization failed: %v", err)
@@ -343,28 +325,18 @@ func TestNonExistingParentAttack(t *testing.T) {
 	tester.newPeer("attack", hashes, blocks)
 	tester.newPeer("attack", hashes, blocks)
 
 
 	// Try and sync with the malicious node and check that it fails
 	// Try and sync with the malicious node and check that it fails
-	if err := tester.sync("attack"); err != nil {
-		t.Fatalf("failed to synchronise blocks: %v", err)
+	if err := tester.sync("attack"); err == nil {
+		t.Fatalf("block synchronization succeeded")
 	}
 	}
-	bs := tester.downloader.TakeBlocks()
-	if len(bs) != 1 {
-		t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
+	if tester.hasBlock(hashes[0]) {
+		t.Fatalf("tester accepted unknown-parent block: %v", blocks[hashes[0]])
 	}
 	}
-	if tester.hasBlock(bs[0].RawBlock.ParentHash()) {
-		t.Fatalf("tester knows about the unknown hash")
-	}
-	tester.downloader.Cancel()
-
 	// Try to synchronize with the valid chain and make sure it succeeds
 	// Try to synchronize with the valid chain and make sure it succeeds
 	if err := tester.sync("valid"); err != nil {
 	if err := tester.sync("valid"); err != nil {
 		t.Fatalf("failed to synchronise blocks: %v", err)
 		t.Fatalf("failed to synchronise blocks: %v", err)
 	}
 	}
-	bs = tester.downloader.TakeBlocks()
-	if len(bs) != 1 {
-		t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
-	}
-	if !tester.hasBlock(bs[0].RawBlock.ParentHash()) {
-		t.Fatalf("tester doesn't know about the origin hash")
+	if !tester.hasBlock(tester.peerHashes["valid"][0]) {
+		t.Fatalf("tester didn't accept known-parent block: %v", tester.peerBlocks["valid"][hashes[0]])
 	}
 	}
 }
 }
 
 
@@ -442,11 +414,11 @@ func TestInvalidHashOrderAttack(t *testing.T) {
 	tester.newPeer("attack", hashes, blocks)
 	tester.newPeer("attack", hashes, blocks)
 
 
 	// Try and sync with the malicious node and check that it fails
 	// Try and sync with the malicious node and check that it fails
-	if _, err := tester.syncTake("attack"); err != errInvalidChain {
+	if err := tester.sync("attack"); err != errInvalidChain {
 		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain)
 		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain)
 	}
 	}
 	// Ensure that a valid chain can still pass sync
 	// Ensure that a valid chain can still pass sync
-	if _, err := tester.syncTake("valid"); err != nil {
+	if err := tester.sync("valid"); err != nil {
 		t.Fatalf("failed to synchronise blocks: %v", err)
 		t.Fatalf("failed to synchronise blocks: %v", err)
 	}
 	}
 }
 }
@@ -466,11 +438,11 @@ func TestMadeupHashChainAttack(t *testing.T) {
 	tester.newPeer("attack", createHashes(1024*blockCacheLimit, knownHash), nil)
 	tester.newPeer("attack", createHashes(1024*blockCacheLimit, knownHash), nil)
 
 
 	// Try and sync with the malicious node and check that it fails
 	// Try and sync with the malicious node and check that it fails
-	if _, err := tester.syncTake("attack"); err != errCrossCheckFailed {
+	if err := tester.sync("attack"); err != errCrossCheckFailed {
 		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
 		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
 	}
 	}
 	// Ensure that a valid chain can still pass sync
 	// Ensure that a valid chain can still pass sync
-	if _, err := tester.syncTake("valid"); err != nil {
+	if err := tester.sync("valid"); err != nil {
 		t.Fatalf("failed to synchronise blocks: %v", err)
 		t.Fatalf("failed to synchronise blocks: %v", err)
 	}
 	}
 }
 }
@@ -487,7 +459,7 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) {
 	// Try and sync with the attacker, one hash at a time
 	// Try and sync with the attacker, one hash at a time
 	tester.maxHashFetch = 1
 	tester.maxHashFetch = 1
 	tester.newPeer("attack", hashes, nil)
 	tester.newPeer("attack", hashes, nil)
-	if _, err := tester.syncTake("attack"); err != errStallingPeer {
+	if err := tester.sync("attack"); err != errStallingPeer {
 		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
 		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
 	}
 	}
 }
 }
@@ -512,7 +484,7 @@ func TestMadeupBlockChainAttack(t *testing.T) {
 	// Try and sync with the malicious node and check that it fails
 	// Try and sync with the malicious node and check that it fails
 	tester := newTester()
 	tester := newTester()
 	tester.newPeer("attack", gapped, blocks)
 	tester.newPeer("attack", gapped, blocks)
-	if _, err := tester.syncTake("attack"); err != errCrossCheckFailed {
+	if err := tester.sync("attack"); err != errCrossCheckFailed {
 		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
 		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
 	}
 	}
 	// Ensure that a valid chain can still pass sync
 	// Ensure that a valid chain can still pass sync
@@ -520,7 +492,7 @@ func TestMadeupBlockChainAttack(t *testing.T) {
 	crossCheckCycle = defaultCrossCheckCycle
 	crossCheckCycle = defaultCrossCheckCycle
 
 
 	tester.newPeer("valid", hashes, blocks)
 	tester.newPeer("valid", hashes, blocks)
-	if _, err := tester.syncTake("valid"); err != nil {
+	if err := tester.sync("valid"); err != nil {
 		t.Fatalf("failed to synchronise blocks: %v", err)
 		t.Fatalf("failed to synchronise blocks: %v", err)
 	}
 	}
 }
 }
@@ -548,14 +520,14 @@ func TestMadeupParentBlockChainAttack(t *testing.T) {
 	tester.newPeer("attack", hashes, blocks)
 	tester.newPeer("attack", hashes, blocks)
 
 
 	// Try and sync with the malicious node and check that it fails
 	// Try and sync with the malicious node and check that it fails
-	if _, err := tester.syncTake("attack"); err != errCrossCheckFailed {
+	if err := tester.sync("attack"); err != errCrossCheckFailed {
 		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
 		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
 	}
 	}
 	// Ensure that a valid chain can still pass sync
 	// Ensure that a valid chain can still pass sync
 	blockSoftTTL = defaultBlockTTL
 	blockSoftTTL = defaultBlockTTL
 	crossCheckCycle = defaultCrossCheckCycle
 	crossCheckCycle = defaultCrossCheckCycle
 
 
-	if _, err := tester.syncTake("valid"); err != nil {
+	if err := tester.sync("valid"); err != nil {
 		t.Fatalf("failed to synchronise blocks: %v", err)
 		t.Fatalf("failed to synchronise blocks: %v", err)
 	}
 	}
 }
 }
@@ -582,7 +554,7 @@ func TestBannedChainStarvationAttack(t *testing.T) {
 	// the head of the invalid chain is blocked too.
 	// the head of the invalid chain is blocked too.
 	for banned := tester.downloader.banned.Size(); ; {
 	for banned := tester.downloader.banned.Size(); ; {
 		// Try to sync with the attacker, check hash chain failure
 		// Try to sync with the attacker, check hash chain failure
-		if _, err := tester.syncTake("attack"); err != errInvalidChain {
+		if err := tester.sync("attack"); err != errInvalidChain {
 			if tester.downloader.banned.Has(hashes[0]) && err == errBannedHead {
 			if tester.downloader.banned.Has(hashes[0]) && err == errBannedHead {
 				break
 				break
 			}
 			}
@@ -603,7 +575,7 @@ func TestBannedChainStarvationAttack(t *testing.T) {
 		t.Fatalf("banned attacker registered: %v", peer)
 		t.Fatalf("banned attacker registered: %v", peer)
 	}
 	}
 	// Ensure that a valid chain can still pass sync
 	// Ensure that a valid chain can still pass sync
-	if _, err := tester.syncTake("valid"); err != nil {
+	if err := tester.sync("valid"); err != nil {
 		t.Fatalf("failed to synchronise blocks: %v", err)
 		t.Fatalf("failed to synchronise blocks: %v", err)
 	}
 	}
 }
 }
@@ -637,7 +609,7 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) {
 	// the head of the invalid chain is blocked too.
 	// the head of the invalid chain is blocked too.
 	for {
 	for {
 		// Try to sync with the attacker, check hash chain failure
 		// Try to sync with the attacker, check hash chain failure
-		if _, err := tester.syncTake("attack"); err != errInvalidChain {
+		if err := tester.sync("attack"); err != errInvalidChain {
 			t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain)
 			t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain)
 		}
 		}
 		// Short circuit if the entire chain was banned
 		// Short circuit if the entire chain was banned
@@ -658,33 +630,34 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) {
 	MaxBlockFetch = defaultMaxBlockFetch
 	MaxBlockFetch = defaultMaxBlockFetch
 	maxBannedHashes = defaultMaxBannedHashes
 	maxBannedHashes = defaultMaxBannedHashes
 
 
-	if _, err := tester.syncTake("valid"); err != nil {
+	if err := tester.sync("valid"); err != nil {
 		t.Fatalf("failed to synchronise blocks: %v", err)
 		t.Fatalf("failed to synchronise blocks: %v", err)
 	}
 	}
 }
 }
 
 
 // Tests that misbehaving peers are disconnected, whilst behaving ones are not.
 // Tests that misbehaving peers are disconnected, whilst behaving ones are not.
-func TestAttackerDropping(t *testing.T) {
-	// Define the disconnection requirement for individual errors
+func TestHashAttackerDropping(t *testing.T) {
+	// Define the disconnection requirement for individual hash fetch errors
 	tests := []struct {
 	tests := []struct {
 		result error
 		result error
 		drop   bool
 		drop   bool
 	}{
 	}{
-		{nil, false},                 // Sync succeeded, all is well
-		{errBusy, false},             // Sync is already in progress, no problem
-		{errUnknownPeer, false},      // Peer is unknown, was already dropped, don't double drop
-		{errBadPeer, true},           // Peer was deemed bad for some reason, drop it
-		{errStallingPeer, true},      // Peer was detected to be stalling, drop it
-		{errBannedHead, true},        // Peer's head hash is a known bad hash, drop it
-		{errNoPeers, false},          // No peers to download from, soft race, no issue
-		{errPendingQueue, false},     // There are blocks still cached, wait to exhaust, no issue
-		{errTimeout, true},           // No hashes received in due time, drop the peer
-		{errEmptyHashSet, true},      // No hashes were returned as a response, drop as it's a dead end
-		{errPeersUnavailable, true},  // Nobody had the advertised blocks, drop the advertiser
-		{errInvalidChain, true},      // Hash chain was detected as invalid, definitely drop
-		{errCrossCheckFailed, true},  // Hash-origin failed to pass a block cross check, drop
-		{errCancelHashFetch, false},  // Synchronisation was canceled, origin may be innocent, don't drop
-		{errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+		{nil, false},                  // Sync succeeded, all is well
+		{errBusy, false},              // Sync is already in progress, no problem
+		{errUnknownPeer, false},       // Peer is unknown, was already dropped, don't double drop
+		{errBadPeer, true},            // Peer was deemed bad for some reason, drop it
+		{errStallingPeer, true},       // Peer was detected to be stalling, drop it
+		{errBannedHead, true},         // Peer's head hash is a known bad hash, drop it
+		{errNoPeers, false},           // No peers to download from, soft race, no issue
+		{errPendingQueue, false},      // There are blocks still cached, wait to exhaust, no issue
+		{errTimeout, true},            // No hashes received in due time, drop the peer
+		{errEmptyHashSet, true},       // No hashes were returned as a response, drop as it's a dead end
+		{errPeersUnavailable, true},   // Nobody had the advertised blocks, drop the advertiser
+		{errInvalidChain, true},       // Hash chain was detected as invalid, definitely drop
+		{errCrossCheckFailed, true},   // Hash-origin failed to pass a block cross check, drop
+		{errCancelHashFetch, false},   // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelBlockFetch, false},  // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelChainImport, false}, // Synchronisation was canceled, origin may be innocent, don't drop
 	}
 	}
 	// Run the tests and check disconnection status
 	// Run the tests and check disconnection status
 	tester := newTester()
 	tester := newTester()
@@ -706,3 +679,38 @@ func TestAttackerDropping(t *testing.T) {
 		}
 		}
 	}
 	}
 }
 }
+
+// Tests that feeding bad blocks will result in a peer drop.
+func TestBlockAttackerDropping(t *testing.T) {
+	// Define the disconnection requirement for individual block import errors
+	tests := []struct {
+		failure bool
+		drop    bool
+	}{{true, true}, {false, false}}
+
+	// Run the tests and check disconnection status
+	tester := newTester()
+	for i, tt := range tests {
+		// Register a new peer and ensure it's presence
+		id := fmt.Sprintf("test %d", i)
+		if err := tester.newPeer(id, []common.Hash{common.Hash{}}, nil); err != nil {
+			t.Fatalf("test %d: failed to register new peer: %v", i, err)
+		}
+		if _, ok := tester.peerHashes[id]; !ok {
+			t.Fatalf("test %d: registered peer not found", i)
+		}
+		// Assemble a good or bad block, depending of the test
+		raw := createBlock(1, knownHash, common.Hash{})
+		if tt.failure {
+			raw = createBlock(1, unknownHash, common.Hash{})
+		}
+		block := &Block{OriginPeer: id, RawBlock: raw}
+
+		// Simulate block processing and check the result
+		tester.downloader.queue.blockCache[0] = block
+		tester.downloader.process()
+		if _, ok := tester.peerHashes[id]; !ok != tt.drop {
+			t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.failure, !ok, tt.drop)
+		}
+	}
+}

+ 1 - 1
eth/handler.go

@@ -80,7 +80,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
 		txsyncCh:   make(chan *txsync),
 		txsyncCh:   make(chan *txsync),
 		quitSync:   make(chan struct{}),
 		quitSync:   make(chan struct{}),
 	}
 	}
-	manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.removePeer)
+	manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer)
 	manager.SubProtocol = p2p.Protocol{
 	manager.SubProtocol = p2p.Protocol{
 		Name:    "eth",
 		Name:    "eth",
 		Version: uint(protocolVersion),
 		Version: uint(protocolVersion),

+ 3 - 50
eth/sync.go

@@ -1,9 +1,7 @@
 package eth
 package eth
 
 
 import (
 import (
-	"math"
 	"math/rand"
 	"math/rand"
-	"sync/atomic"
 	"time"
 	"time"
 
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common"
@@ -15,12 +13,10 @@ import (
 
 
 const (
 const (
 	forceSyncCycle      = 10 * time.Second       // Time interval to force syncs, even if few peers are available
 	forceSyncCycle      = 10 * time.Second       // Time interval to force syncs, even if few peers are available
-	blockProcCycle      = 500 * time.Millisecond // Time interval to check for new blocks to process
 	notifyCheckCycle    = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
 	notifyCheckCycle    = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
 	notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
 	notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
 	notifyFetchTimeout  = 5 * time.Second        // Maximum alloted time to return an explicitly requested block
 	notifyFetchTimeout  = 5 * time.Second        // Maximum alloted time to return an explicitly requested block
 	minDesiredPeerCount = 5                      // Amount of peers desired to start syncing
 	minDesiredPeerCount = 5                      // Amount of peers desired to start syncing
-	blockProcAmount     = 256
 
 
 	// This is the target size for the packs of transactions sent by txsyncLoop.
 	// This is the target size for the packs of transactions sent by txsyncLoop.
 	// A pack can get larger than this if a single transactions exceeds this size.
 	// A pack can get larger than this if a single transactions exceeds this size.
@@ -254,10 +250,10 @@ func (pm *ProtocolManager) fetcher() {
 // syncer is responsible for periodically synchronising with the network, both
 // syncer is responsible for periodically synchronising with the network, both
 // downloading hashes and blocks as well as retrieving cached ones.
 // downloading hashes and blocks as well as retrieving cached ones.
 func (pm *ProtocolManager) syncer() {
 func (pm *ProtocolManager) syncer() {
-	forceSync := time.Tick(forceSyncCycle)
-	blockProc := time.Tick(blockProcCycle)
-	blockProcPend := int32(0)
+	// Abort any pending syncs if we terminate
+	defer pm.downloader.Cancel()
 
 
+	forceSync := time.Tick(forceSyncCycle)
 	for {
 	for {
 		select {
 		select {
 		case <-pm.newPeerCh:
 		case <-pm.newPeerCh:
@@ -271,55 +267,12 @@ func (pm *ProtocolManager) syncer() {
 			// Force a sync even if not enough peers are present
 			// Force a sync even if not enough peers are present
 			go pm.synchronise(pm.peers.BestPeer())
 			go pm.synchronise(pm.peers.BestPeer())
 
 
-		case <-blockProc:
-			// Try to pull some blocks from the downloaded
-			if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) {
-				go func() {
-					pm.processBlocks()
-					atomic.StoreInt32(&blockProcPend, 0)
-				}()
-			}
-
 		case <-pm.quitSync:
 		case <-pm.quitSync:
 			return
 			return
 		}
 		}
 	}
 	}
 }
 }
 
 
-// processBlocks retrieves downloaded blocks from the download cache and tries
-// to construct the local block chain with it. Note, since the block retrieval
-// order matters, access to this function *must* be synchronized/serialized.
-func (pm *ProtocolManager) processBlocks() error {
-	pm.wg.Add(1)
-	defer pm.wg.Done()
-
-	// Short circuit if no blocks are available for insertion
-	blocks := pm.downloader.TakeBlocks()
-	if len(blocks) == 0 {
-		return nil
-	}
-	glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
-
-	for len(blocks) != 0 && !pm.quit {
-		// Retrieve the first batch of blocks to insert
-		max := int(math.Min(float64(len(blocks)), float64(blockProcAmount)))
-		raw := make(types.Blocks, 0, max)
-		for _, block := range blocks[:max] {
-			raw = append(raw, block.RawBlock)
-		}
-		// Try to inset the blocks, drop the originating peer if there's an error
-		index, err := pm.chainman.InsertChain(raw)
-		if err != nil {
-			glog.V(logger.Debug).Infoln("Downloaded block import failed:", err)
-			pm.removePeer(blocks[index].OriginPeer)
-			pm.downloader.Cancel()
-			return err
-		}
-		blocks = blocks[max:]
-	}
-	return nil
-}
-
 // synchronise tries to sync up our local block chain with a remote peer, both
 // synchronise tries to sync up our local block chain with a remote peer, both
 // adding various sanity checks as well as wrapping it with various log entries.
 // adding various sanity checks as well as wrapping it with various log entries.
 func (pm *ProtocolManager) synchronise(peer *peer) {
 func (pm *ProtocolManager) synchronise(peer *peer) {