Browse Source

eth/downloader: fix #910, thread safe peers & polishes

Péter Szilágyi 10 years ago
parent
commit
685862d2ce
4 changed files with 210 additions and 161 deletions
  1. 56 84
      eth/downloader/downloader.go
  2. 2 1
      eth/downloader/downloader_test.go
  3. 143 76
      eth/downloader/peer.go
  4. 9 0
      eth/downloader/queue.go

+ 56 - 84
eth/downloader/downloader.go

@@ -49,12 +49,6 @@ type blockPack struct {
 	blocks []*types.Block
 }
 
-type syncPack struct {
-	peer          *peer
-	hash          common.Hash
-	ignoreInitial bool
-}
-
 type hashPack struct {
 	peerId string
 	hashes []common.Hash
@@ -63,7 +57,7 @@ type hashPack struct {
 type Downloader struct {
 	mu         sync.RWMutex
 	queue      *queue
-	peers      peers
+	peers      *peerSet
 	activePeer string
 
 	// Callbacks
@@ -83,7 +77,7 @@ type Downloader struct {
 func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
 	downloader := &Downloader{
 		queue:     newQueue(),
-		peers:     make(peers),
+		peers:     newPeerSet(),
 		hasBlock:  hasBlock,
 		getBlock:  getBlock,
 		newPeerCh: make(chan *peer, 1),
@@ -98,29 +92,26 @@ func (d *Downloader) Stats() (current int, max int) {
 	return d.queue.Size()
 }
 
-func (d *Downloader) RegisterPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-
-	glog.V(logger.Detail).Infoln("Register peer", id)
-
-	// Create a new peer and add it to the list of known peers
-	peer := newPeer(id, hash, getHashes, getBlocks)
-	// add peer to our peer set
-	d.peers[id] = peer
-	// broadcast new peer
-
+// RegisterPeer wraps the given id, head hash and fetcher callbacks in a new
+// peer and adds it to the peer set, logging and returning any Register error.
+func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error {
+	glog.V(logger.Detail).Infoln("Registering peer", id)
+	if err := d.peers.Register(newPeer(id, head, getHashes, getBlocks)); err != nil {
+		glog.V(logger.Error).Infoln("Register failed:", err)
+		return err
+	}
 	return nil
 }
 
-// UnregisterPeer unregisters a peer. This will prevent any action from the specified peer.
-func (d *Downloader) UnregisterPeer(id string) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-
-	glog.V(logger.Detail).Infoln("Unregister peer", id)
-
-	delete(d.peers, id)
+// UnregisterPeer removes the peer with the given id from the peer set, logging
+// and returning the error if Unregister fails.
+func (d *Downloader) UnregisterPeer(id string) error {
+	glog.V(logger.Detail).Infoln("Unregistering peer", id)
+	if err := d.peers.Unregister(id); err != nil {
+		glog.V(logger.Error).Infoln("Unregister failed:", err)
+		return err
+	}
+	return nil
 }
 
 // Synchronise will select the peer and use it for synchronising. If an empty string is given
@@ -140,15 +131,16 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
 	if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
 		return errPendingQueue
 	}
-	// Reset the queue to clean any internal leftover state
+	// Reset the queue and peer set to clean any internal leftover state
 	d.queue.Reset()
+	d.peers.Reset()
 
 	// Retrieve the origin peer and initiate the downloading process
-	p := d.peers[id]
+	p := d.peers.Peer(id)
 	if p == nil {
 		return errUnknownPeer
 	}
-	return d.getFromPeer(p, hash, false)
+	return d.syncWithPeer(p, hash)
 }
 
 // TakeBlocks takes blocks from the queue and yields them to the blockTaker handler
@@ -167,7 +159,9 @@ func (d *Downloader) Has(hash common.Hash) bool {
 	return d.queue.Has(hash)
 }
 
-func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) {
+// syncWithPeer starts a block synchronization based on the hash chain from the
+// specified peer and head hash.
+func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
 	d.activePeer = p.id
 	defer func() {
 		// reset on error
@@ -177,21 +171,12 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
 	}()
 
 	glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id)
-	// Start the fetcher. This will block the update entirely
-	// interupts need to be send to the appropriate channels
-	// respectively.
-	if err = d.startFetchingHashes(p, hash, ignoreInitial); err != nil {
+	if err = d.fetchHashes(p, hash); err != nil {
 		return err
 	}
-
-	// Start fetching blocks in paralel. The strategy is simple
-	// take any available peers, seserve a chunk for each peer available,
-	// let the peer deliver the chunkn and periodically check if a peer
-	// has timedout.
-	if err = d.startFetchingBlocks(p); err != nil {
+	if err = d.fetchBlocks(); err != nil {
 		return err
 	}
-
 	glog.V(logger.Debug).Infoln("Synchronization completed")
 
 	return nil
@@ -234,17 +219,14 @@ blockDone:
 }
 
 // XXX Make synchronous
-func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error {
+func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
 	glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
 
 	start := time.Now()
 
-	// We ignore the initial hash in some cases (e.g. we received a block without it's parent)
-	// In such circumstances we don't need to download the block so don't add it to the queue.
-	if !ignoreInitial {
-		// Add the hash to the queue first
-		d.queue.Insert([]common.Hash{h})
-	}
+	// Add the hash to the queue first
+	d.queue.Insert([]common.Hash{h})
+
 	// Get the first batch of hashes
 	p.getHashes(h)
 
@@ -308,20 +290,18 @@ out:
 			// Attempt to find a new peer by checking inclusion of peers best hash in our
 			// already fetched hash list. This can't guarantee 100% correctness but does
 			// a fair job. This is always either correct or false incorrect.
-			for id, peer := range d.peers {
-				if d.queue.Has(peer.recentHash) && !attemptedPeers[id] {
+			for _, peer := range d.peers.AllPeers() {
+				if d.queue.Has(peer.head) && !attemptedPeers[p.id] {
 					p = peer
 					break
 				}
 			}
-
 			// if all peers have been tried, abort the process entirely or if the hash is
 			// the zero hash.
 			if p == nil || (hash == common.Hash{}) {
 				d.queue.Reset()
 				return ErrTimeout
 			}
-
 			// set p to the active peer. this will invalidate any hashes that may be returned
 			// by our previous (delayed) peer.
 			activePeer = p
@@ -334,14 +314,11 @@ out:
 	return nil
 }
 
-func (d *Downloader) startFetchingBlocks(p *peer) error {
+// fetchBlocks iteratively downloads the entire scheduled block chain, taking
+// any available peers, reserving a chunk of blocks for each, waiting for
+// delivery and periodically checking for timeouts.
+func (d *Downloader) fetchBlocks() error {
 	glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)")
-
-	// Defer the peer reset. This will empty the peer requested set
-	// and makes sure there are no lingering peers with an incorrect
-	// state
-	defer d.peers.reset()
-
 	start := time.Now()
 
 	// default ticker for re-fetching blocks every now and then
@@ -354,19 +331,19 @@ out:
 		case blockPack := <-d.blockCh:
 			// If the peer was previously banned and failed to deliver it's pack
 			// in a reasonable time frame, ignore it's message.
-			if d.peers[blockPack.peerId] != nil {
-				err := d.queue.Deliver(blockPack.peerId, blockPack.blocks)
-				if err != nil {
-					glog.V(logger.Debug).Infof("deliver failed for peer %s: %v\n", blockPack.peerId, err)
-					// FIXME d.UnregisterPeer(blockPack.peerId)
+			if peer := d.peers.Peer(blockPack.peerId); peer != nil {
+				// Deliver the received chunk of blocks, but drop the peer if invalid
+				if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil {
+					glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err)
+					d.peers.Unregister(blockPack.peerId)
 					break
 				}
-
 				if glog.V(logger.Debug) {
-					glog.Infof("adding %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId)
+					glog.Infof("Added %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId)
 				}
-				d.peers[blockPack.peerId].promote()
-				d.peers.setState(blockPack.peerId, idleState)
+				// Promote the peer and update its idle state
+				peer.Promote()
+				peer.SetIdle()
 			}
 		case <-ticker.C:
 			// Check for bad peers. Bad peers may indicate a peer not responding
@@ -381,13 +358,10 @@ out:
 				// 1) Time for them to respond;
 				// 2) Measure their speed;
 				// 3) Amount and availability.
-				if peer := d.peers[pid]; peer != nil {
-					peer.demote()
-					peer.reset()
-				}
+				d.peers.Unregister(pid)
 			}
 			// After removing bad peers make sure we actually have sufficient peer left to keep downloading
-			if len(d.peers) == 0 {
+			if d.peers.Peers() == 0 {
 				d.queue.Reset()
 				return errNoPeers
 			}
@@ -398,31 +372,29 @@ out:
 				if d.queue.Throttle() {
 					continue
 				}
-
-				availablePeers := d.peers.get(idleState)
-				for _, peer := range availablePeers {
+				// Send a download request to all idle peers
+				idlePeers := d.peers.IdlePeers()
+				for _, peer := range idlePeers {
 					// Get a possible chunk. If nil is returned no chunk
 					// could be returned due to no hashes available.
 					request := d.queue.Reserve(peer, maxBlockFetch)
 					if request == nil {
 						continue
 					}
-					// XXX make fetch blocking.
 					// Fetch the chunk and check for error. If the peer was somehow
 					// already fetching a chunk due to a bug, it will be returned to
 					// the queue
-					if err := peer.fetch(request); err != nil {
-						// log for tracing
-						glog.V(logger.Debug).Infof("peer %s received double work (state = %v)\n", peer.id, peer.state)
+					if err := peer.Fetch(request); err != nil {
+						glog.V(logger.Error).Infof("Peer %s received double work\n", peer.id)
 						d.queue.Cancel(request)
 					}
 				}
-				// make sure that we have peers available for fetching. If all peers have been tried
+				// Make sure that we have peers available for fetching. If all peers have been tried
 				// and all failed throw an error
 				if d.queue.InFlight() == 0 {
 					d.queue.Reset()
 
-					return fmt.Errorf("%v peers avaialable = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(availablePeers), len(d.peers), d.queue.Pending())
+					return fmt.Errorf("%v peers available = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(idlePeers), d.peers.Peers(), d.queue.Pending())
 				}
 
 			} else if d.queue.InFlight() == 0 {

+ 2 - 1
eth/downloader/downloader_test.go

@@ -229,7 +229,7 @@ func TestThrottling(t *testing.T) {
 	minDesiredPeerCount = 4
 	blockTtl = 1 * time.Second
 
-	targetBlocks := 4 * blockCacheLimit
+	targetBlocks := 16 * blockCacheLimit
 	hashes := createHashes(0, targetBlocks)
 	blocks := createBlocksFromHashes(hashes)
 	tester := newTester(t, hashes, blocks)
@@ -256,6 +256,7 @@ func TestThrottling(t *testing.T) {
 				return
 			default:
 				took = append(took, tester.downloader.TakeBlocks()...)
+				time.Sleep(time.Millisecond)
 			}
 		}
 	}()

+ 143 - 76
eth/downloader/peer.go

@@ -1,125 +1,192 @@
+// Contains the active peer-set of the downloader, maintaining both failures
+// as well as reputation metrics to prioritize the block retrievals.
+
 package downloader
 
 import (
 	"errors"
 	"sync"
+	"sync/atomic"
 
 	"github.com/ethereum/go-ethereum/common"
 	"gopkg.in/fatih/set.v0"
 )
 
-const (
-	workingState = 2
-	idleState    = 4
-)
-
 type hashFetcherFn func(common.Hash) error
 type blockFetcherFn func([]common.Hash) error
 
-// XXX make threadsafe!!!!
-type peers map[string]*peer
+var (
+	errAlreadyFetching   = errors.New("already fetching blocks from peer")
+	errAlreadyRegistered = errors.New("peer is already registered")
+	errNotRegistered     = errors.New("peer is not registered")
+)
 
-func (p peers) reset() {
-	for _, peer := range p {
-		peer.reset()
-	}
+// peer represents an active peer from which hashes and blocks are retrieved.
+type peer struct {
+	id   string      // Unique identifier of the peer
+	head common.Hash // Hash of the peer's latest known block
+
+	idle int32 // Current activity state of the peer (idle = 0, active = 1), accessed atomically
+	rep  int32 // Simple peer reputation (not used currently), accessed atomically
+
+	mu sync.RWMutex // NOTE(review): not locked by any peer method shown here; confirm it is still needed
+
+	ignored *set.Set // Emptied by Reset; NOTE(review): populated outside this file, confirm what it stores
+
+	getHashes hashFetcherFn  // Callback taking a single hash, used to request hashes from the peer
+	getBlocks blockFetcherFn // Callback taking a slice of hashes, invoked by Fetch to request blocks
 }
 
-func (p peers) get(state int) []*peer {
-	var peers []*peer
-	for _, peer := range p {
-		peer.mu.RLock()
-		if peer.state == state {
-			peers = append(peers, peer)
-		}
-		peer.mu.RUnlock()
+// newPeer creates a new downloader peer with the given id, head hash and
+// retrieval callbacks; idle and rep are left at their zero values.
+func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
+	return &peer{
+		id:        id,
+		head:      head,
+		getHashes: getHashes,
+		getBlocks: getBlocks,
+		ignored:   set.New(),
 	}
+}
 
-	return peers
+// Reset marks the peer idle again (idle = 0) and empties its ignored set.
+func (p *peer) Reset() {
+	atomic.StoreInt32(&p.idle, 0)
+	p.ignored.Clear()
 }
 
-func (p peers) setState(id string, state int) {
-	if peer, exist := p[id]; exist {
-		peer.mu.Lock()
-		defer peer.mu.Unlock()
-		peer.state = state
+// Fetch marks the peer busy and hands the requested hashes to getBlocks; it returns errAlreadyFetching if the peer was not idle.
+func (p *peer) Fetch(request *fetchRequest) error {
+	// Atomically flip idle from 0 to 1; failure means a fetch is already in flight
+	if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
+		return errAlreadyFetching
 	}
-}
+	// Flatten the keys of the request's hash set into a slice for the callback
+	hashes := make([]common.Hash, 0, len(request.Hashes))
+	for hash, _ := range request.Hashes {
+		hashes = append(hashes, hash)
+	}
+	p.getBlocks(hashes) // NOTE(review): the error returned by getBlocks is discarded
 
-func (p peers) getPeer(id string) *peer {
-	return p[id]
+	return nil
 }
 
-// peer represents an active peer
-type peer struct {
-	state int // Peer state (working, idle)
-	rep   int // TODO peer reputation
+// SetIdle atomically marks the peer idle (idle = 0), allowing a later Fetch to succeed.
+func (p *peer) SetIdle() {
+	atomic.StoreInt32(&p.idle, 0)
+}
 
-	mu         sync.RWMutex
-	id         string
-	recentHash common.Hash
+// Promote atomically increments the peer's reputation by one.
+func (p *peer) Promote() {
+	atomic.AddInt32(&p.rep, 1)
+}
 
-	ignored *set.Set
+// Demote lowers the peer's reputation by two, never letting it drop below zero.
+func (p *peer) Demote() {
+	for {
+		// Compute the new reputation from the current one, clamped at zero
+		prev := atomic.LoadInt32(&p.rep)
+		next := prev - 2
+		if next < 0 {
+			next = 0
+		}
+		// Store it only if rep is unchanged since the load; otherwise retry
+		if atomic.CompareAndSwapInt32(&p.rep, prev, next) {
+			return
+		}
+	}
+}
 
-	getHashes hashFetcherFn
-	getBlocks blockFetcherFn
+// peerSet represents the collection of active peers participating in the block
+// download procedure.
+type peerSet struct {
+	peers map[string]*peer // Registered peers, keyed by peer id
+	lock  sync.RWMutex // Guards access to peers
 }
 
-// create a new peer
-func newPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
-	return &peer{
-		id:         id,
-		recentHash: hash,
-		getHashes:  getHashes,
-		getBlocks:  getBlocks,
-		state:      idleState,
-		ignored:    set.New(),
+// newPeerSet creates a new, empty peer set to track the active download sources.
+func newPeerSet() *peerSet {
+	return &peerSet{
+		peers: make(map[string]*peer),
 	}
 }
 
-// fetch a chunk using the peer
-func (p *peer) fetch(request *fetchRequest) error {
-	p.mu.Lock()
-	defer p.mu.Unlock()
+// Reset calls Reset on every registered peer while holding the read lock, to
+// prepare for the next batch of block retrieval.
+func (ps *peerSet) Reset() {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
 
-	if p.state == workingState {
-		return errors.New("peer already fetching chunk")
+	for _, peer := range ps.peers {
+		peer.Reset()
 	}
+}
 
-	// set working state
-	p.state = workingState
+// Register adds p to the set under its id, or returns errAlreadyRegistered if
+// a peer with that id is already present.
+func (ps *peerSet) Register(p *peer) error {
+	ps.lock.Lock()
+	defer ps.lock.Unlock()
 
-	// Convert the hash set to a fetchable slice
-	hashes := make([]common.Hash, 0, len(request.Hashes))
-	for hash, _ := range request.Hashes {
-		hashes = append(hashes, hash)
+	if _, ok := ps.peers[p.id]; ok {
+		return errAlreadyRegistered
 	}
-	p.getBlocks(hashes)
+	ps.peers[p.id] = p
+	return nil
+}
 
+// Unregister deletes the peer with the given id from the set, or returns
+// errNotRegistered if no such peer is present.
+func (ps *peerSet) Unregister(id string) error {
+	ps.lock.Lock()
+	defer ps.lock.Unlock()
+
+	if _, ok := ps.peers[id]; !ok {
+		return errNotRegistered
+	}
+	delete(ps.peers, id)
 	return nil
 }
 
-// promote increases the peer's reputation
-func (p *peer) promote() {
-	p.mu.Lock()
-	defer p.mu.Unlock()
+// Peer returns the registered peer with the given id, or nil if there is none.
+func (ps *peerSet) Peer(id string) *peer {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
+
+	return ps.peers[id]
+}
+
+// Peers returns the current number of peers in the set.
+func (ps *peerSet) Peers() int {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
 
-	p.rep++
+	return len(ps.peers)
 }
 
-// demote decreases the peer's reputation or leaves it at 0
-func (p *peer) demote() {
-	p.mu.Lock()
-	defer p.mu.Unlock()
+// AllPeers returns a new slice containing every peer in the set, in no particular order.
+func (ps *peerSet) AllPeers() []*peer {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
 
-	if p.rep > 1 {
-		p.rep -= 2
-	} else {
-		p.rep = 0
+	list := make([]*peer, 0, len(ps.peers))
+	for _, p := range ps.peers {
+		list = append(list, p)
 	}
+	return list
 }
 
-func (p *peer) reset() {
-	p.state = idleState
-	p.ignored.Clear()
+// IdlePeers returns a new slice of the peers whose idle flag is zero at the
+// time it is read.
+func (ps *peerSet) IdlePeers() []*peer {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
+
+	list := make([]*peer, 0, len(ps.peers))
+	for _, p := range ps.peers {
+		if atomic.LoadInt32(&p.idle) == 0 {
+			list = append(list, p)
+		}
+	}
+	return list
 }

+ 9 - 0
eth/downloader/queue.go

@@ -1,3 +1,6 @@
+// Contains the block download scheduler to collect download tasks and schedule
+// them in an ordered, and throttled way.
+
 package downloader
 
 import (
@@ -8,6 +11,8 @@ import (
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/logger"
+	"github.com/ethereum/go-ethereum/logger/glog"
 	"gopkg.in/karalabe/cookiejar.v2/collections/prque"
 )
 
@@ -126,6 +131,10 @@ func (q *queue) Insert(hashes []common.Hash) {
 	for i, hash := range hashes {
 		index := q.hashCounter + i
 
+		if old, ok := q.hashPool[hash]; ok {
+			glog.V(logger.Warn).Infof("Hash %x already scheduled at index %v", hash, old)
+			continue
+		}
 		q.hashPool[hash] = index
 		q.hashQueue.Push(hash, float32(index)) // Highest gets schedules first
 	}