Преглед изворни кода

eth/downloader: fix dysfunctional ignore list hidden by generic set

Péter Szilágyi пре 10 година
родитељ
комит
b658a73ed5
2 измењених фајлова са 46 додато и 11 уклоњено
  1. 40 5
      eth/downloader/peer.go
  2. 6 6
      eth/downloader/queue.go

+ 40 - 5
eth/downloader/peer.go

@@ -28,9 +28,11 @@ import (
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
-	"gopkg.in/fatih/set.v0"
 )
 
+// Maximum number of entries allowed on the list or lacking items.
+const maxLackingHashes = 4096
+
 // Hash and block fetchers belonging to eth/61 and below
 type relativeHashFetcherFn func(common.Hash) error
 type absoluteHashFetcherFn func(uint64, int) error
@@ -67,7 +69,8 @@ type peer struct {
 	receiptStarted time.Time // Time instance when the last receipt fetch was started
 	stateStarted   time.Time // Time instance when the last node data fetch was started
 
-	ignored *set.Set // Set of hashes not to request (didn't have previously)
+	lacking     map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
+	lackingLock sync.RWMutex             // Lock protecting the lacking hashes list
 
 	getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash
 	getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position
@@ -95,7 +98,7 @@ func newPeer(id string, version int, head common.Hash,
 		blockCapacity:   1,
 		receiptCapacity: 1,
 		stateCapacity:   1,
-		ignored:         set.New(),
+		lacking:         make(map[common.Hash]struct{}),
 
 		getRelHashes: getRelHashes,
 		getAbsHashes: getAbsHashes,
@@ -119,7 +122,10 @@ func (p *peer) Reset() {
 	atomic.StoreInt32(&p.blockCapacity, 1)
 	atomic.StoreInt32(&p.receiptCapacity, 1)
 	atomic.StoreInt32(&p.stateCapacity, 1)
-	p.ignored.Clear()
+
+	p.lackingLock.Lock()
+	p.lacking = make(map[common.Hash]struct{})
+	p.lackingLock.Unlock()
 }
 
 // Fetch61 sends a block retrieval request to the remote peer.
@@ -305,13 +311,42 @@ func (p *peer) Demote() {
 	}
 }
 
+// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
+// that a peer is known not to have (i.e. have been requested before). If the
+// set reaches its maximum allowed capacity, items are randomly dropped off.
+func (p *peer) MarkLacking(hash common.Hash) {
+	p.lackingLock.Lock()
+	defer p.lackingLock.Unlock()
+
+	for len(p.lacking) >= maxLackingHashes {
+		for drop, _ := range p.lacking {
+			delete(p.lacking, drop)
+			break
+		}
+	}
+	p.lacking[hash] = struct{}{}
+}
+
+// Lacks retrieves whether the hash of a blockchain item is on the peers lacking
+// list (i.e. whether we know that the peer does not have it).
+func (p *peer) Lacks(hash common.Hash) bool {
+	p.lackingLock.RLock()
+	defer p.lackingLock.RUnlock()
+
+	_, ok := p.lacking[hash]
+	return ok
+}
+
 // String implements fmt.Stringer.
 func (p *peer) String() string {
+	p.lackingLock.RLock()
+	defer p.lackingLock.RUnlock()
+
 	return fmt.Sprintf("Peer %s [%s]", p.id,
 		fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+
 			fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+
 			fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+
-			fmt.Sprintf("ignored %4d", p.ignored.Size()),
+			fmt.Sprintf("lacking %4d", len(p.lacking)),
 	)
 }
 

+ 6 - 6
eth/downloader/queue.go

@@ -501,7 +501,7 @@ func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGe
 
 	for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ {
 		hash, priority := taskQueue.Pop()
-		if p.ignored.Has(hash) {
+		if p.Lacks(hash.(common.Hash)) {
 			skip[hash.(common.Hash)] = int(priority)
 		} else {
 			send[hash.(common.Hash)] = int(priority)
@@ -607,7 +607,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
 			continue
 		}
 		// Otherwise unless the peer is known not to have the data, add to the retrieve list
-		if p.ignored.Has(header.Hash()) {
+		if p.Lacks(header.Hash()) {
 			skip = append(skip, header)
 		} else {
 			send = append(send, header)
@@ -781,7 +781,7 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
 	// If no blocks were retrieved, mark them as unavailable for the origin peer
 	if len(blocks) == 0 {
 		for hash, _ := range request.Hashes {
-			request.Peer.ignored.Add(hash)
+			request.Peer.MarkLacking(hash)
 		}
 	}
 	// Iterate over the downloaded blocks and add each of them
@@ -877,8 +877,8 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
 
 	// If no data items were retrieved, mark them as unavailable for the origin peer
 	if results == 0 {
-		for hash, _ := range request.Headers {
-			request.Peer.ignored.Add(hash)
+		for _, header := range request.Headers {
+			request.Peer.MarkLacking(header.Hash())
 		}
 	}
 	// Assemble each of the results with their headers and retrieved data parts
@@ -944,7 +944,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
 	// If no data was retrieved, mark their hashes as unavailable for the origin peer
 	if len(data) == 0 {
 		for hash, _ := range request.Hashes {
-			request.Peer.ignored.Add(hash)
+			request.Peer.MarkLacking(hash)
 		}
 	}
 	// Iterate over the downloaded data and verify each of them