浏览代码

eth/downloader: more context in errors (#21067)

This PR makes use of go 1.13 error handling, wrapping errors and using
errors.Is to check a wrapped root-cause. It also removes the travis
builders for go 1.11 and go 1.12.
Martin Holst Swende 5 年之前
父节点
当前提交
a5eee8d1dc
共有 4 个文件被更改,包括 49 次插入47 次删除
  1. 0 20
      .travis.yml
  2. 25 10
      eth/downloader/downloader.go
  3. 16 11
      eth/downloader/downloader_test.go
  4. 8 6
      eth/downloader/queue.go

+ 0 - 20
.travis.yml

@@ -24,26 +24,6 @@ jobs:
       script:
       script:
         - go run build/ci.go lint
         - go run build/ci.go lint
 
 
-    - stage: build
-      os: linux
-      dist: xenial
-      go: 1.11.x
-      env:
-        - GO111MODULE=on
-      script:
-        - go run build/ci.go install
-        - go run build/ci.go test -coverage $TEST_PACKAGES
-
-    - stage: build
-      os: linux
-      dist: xenial
-      go: 1.12.x
-      env:
-        - GO111MODULE=on
-      script:
-        - go run build/ci.go install
-        - go run build/ci.go test -coverage $TEST_PACKAGES
-
     - stage: build
     - stage: build
       os: linux
       os: linux
       dist: xenial
       dist: xenial

+ 25 - 10
eth/downloader/downloader.go

@@ -321,13 +321,28 @@ func (d *Downloader) UnregisterPeer(id string) error {
 // adding various sanity checks as well as wrapping it with various log entries.
 // adding various sanity checks as well as wrapping it with various log entries.
 func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
 func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
 	err := d.synchronise(id, head, td, mode)
 	err := d.synchronise(id, head, td, mode)
+
 	switch err {
 	switch err {
-	case nil:
-	case errBusy, errCanceled:
+	case nil, errBusy, errCanceled:
+		return err
+	}
 
 
+	if errors.Is(err, errInvalidChain) {
+		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
+		if d.dropPeer == nil {
+			// The dropPeer method is nil when `--copydb` is used for a local copy.
+			// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
+			log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
+		} else {
+			d.dropPeer(id)
+		}
+		return err
+	}
+
+	switch err {
 	case errTimeout, errBadPeer, errStallingPeer, errUnsyncedPeer,
 	case errTimeout, errBadPeer, errStallingPeer, errUnsyncedPeer,
 		errEmptyHeaderSet, errPeersUnavailable, errTooOld,
 		errEmptyHeaderSet, errPeersUnavailable, errTooOld,
-		errInvalidAncestor, errInvalidChain:
+		errInvalidAncestor:
 		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
 		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
 		if d.dropPeer == nil {
 		if d.dropPeer == nil {
 			// The dropPeer method is nil when `--copydb` is used for a local copy.
 			// The dropPeer method is nil when `--copydb` is used for a local copy.
@@ -774,7 +789,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 				expectNumber := from + int64(i)*int64(skip+1)
 				expectNumber := from + int64(i)*int64(skip+1)
 				if number := header.Number.Int64(); number != expectNumber {
 				if number := header.Number.Int64(); number != expectNumber {
 					p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number)
 					p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number)
-					return 0, errInvalidChain
+					return 0, fmt.Errorf("%w: %v", errInvalidChain, errors.New("head headers broke chain ordering"))
 				}
 				}
 			}
 			}
 			// Check if a common ancestor was found
 			// Check if a common ancestor was found
@@ -988,7 +1003,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
 				filled, proced, err := d.fillHeaderSkeleton(from, headers)
 				filled, proced, err := d.fillHeaderSkeleton(from, headers)
 				if err != nil {
 				if err != nil {
 					p.log.Debug("Skeleton chain invalid", "err", err)
 					p.log.Debug("Skeleton chain invalid", "err", err)
-					return errInvalidChain
+					return fmt.Errorf("%w: %v", errInvalidChain, err)
 				}
 				}
 				headers = filled[proced:]
 				headers = filled[proced:]
 				from += uint64(proced)
 				from += uint64(proced)
@@ -1207,13 +1222,13 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
 			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
 			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
 				// Deliver the received chunk of data and check chain validity
 				// Deliver the received chunk of data and check chain validity
 				accepted, err := deliver(packet)
 				accepted, err := deliver(packet)
-				if err == errInvalidChain {
+				if errors.Is(err, errInvalidChain) {
 					return err
 					return err
 				}
 				}
 				// Unless a peer delivered something completely else than requested (usually
 				// Unless a peer delivered something completely else than requested (usually
 				// caused by a timed out request which came through in the end), set it to
 				// caused by a timed out request which came through in the end), set it to
 				// idle. If the delivery's stale, the peer should have already been idled.
 				// idle. If the delivery's stale, the peer should have already been idled.
-				if err != errStaleDelivery {
+				if !errors.Is(err, errStaleDelivery) {
 					setIdle(peer, accepted)
 					setIdle(peer, accepted)
 				}
 				}
 				// Issue a log to the user to see what's going on
 				// Issue a log to the user to see what's going on
@@ -1473,7 +1488,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 							rollback = append(rollback, chunk[:n]...)
 							rollback = append(rollback, chunk[:n]...)
 						}
 						}
 						log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
 						log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
-						return errInvalidChain
+						return fmt.Errorf("%w: %v", errInvalidChain, err)
 					}
 					}
 					// All verifications passed, store newly found uncertain headers
 					// All verifications passed, store newly found uncertain headers
 					rollback = append(rollback, unknown...)
 					rollback = append(rollback, unknown...)
@@ -1565,7 +1580,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
 			// of the blocks delivered from the downloader, and the indexing will be off.
 			// of the blocks delivered from the downloader, and the indexing will be off.
 			log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err)
 			log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err)
 		}
 		}
-		return errInvalidChain
+		return fmt.Errorf("%w: %v", errInvalidChain, err)
 	}
 	}
 	return nil
 	return nil
 }
 }
@@ -1706,7 +1721,7 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state
 	}
 	}
 	if index, err := d.blockchain.InsertReceiptChain(blocks, receipts, d.ancientLimit); err != nil {
 	if index, err := d.blockchain.InsertReceiptChain(blocks, receipts, d.ancientLimit); err != nil {
 		log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
 		log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
-		return errInvalidChain
+		return fmt.Errorf("%w: %v", errInvalidChain, err)
 	}
 	}
 	return nil
 	return nil
 }
 }

+ 16 - 11
eth/downloader/downloader_test.go

@@ -242,27 +242,32 @@ func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int {
 func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (i int, err error) {
 func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (i int, err error) {
 	dl.lock.Lock()
 	dl.lock.Lock()
 	defer dl.lock.Unlock()
 	defer dl.lock.Unlock()
-
 	// Do a quick check, as the blockchain.InsertHeaderChain doesn't insert anything in case of errors
 	// Do a quick check, as the blockchain.InsertHeaderChain doesn't insert anything in case of errors
 	if _, ok := dl.ownHeaders[headers[0].ParentHash]; !ok {
 	if _, ok := dl.ownHeaders[headers[0].ParentHash]; !ok {
-		return 0, errors.New("unknown parent")
+		return 0, errors.New("InsertHeaderChain: unknown parent at first position")
 	}
 	}
+	var hashes []common.Hash
 	for i := 1; i < len(headers); i++ {
 	for i := 1; i < len(headers); i++ {
+		hash := headers[i-1].Hash()
 		if headers[i].ParentHash != headers[i-1].Hash() {
 		if headers[i].ParentHash != headers[i-1].Hash() {
-			return i, errors.New("unknown parent")
+			return i, fmt.Errorf("non-contiguous import at position %d", i)
 		}
 		}
+		hashes = append(hashes, hash)
 	}
 	}
+	hashes = append(hashes, headers[len(headers)-1].Hash())
 	// Do a full insert if pre-checks passed
 	// Do a full insert if pre-checks passed
 	for i, header := range headers {
 	for i, header := range headers {
-		if _, ok := dl.ownHeaders[header.Hash()]; ok {
+		hash := hashes[i]
+		if _, ok := dl.ownHeaders[hash]; ok {
 			continue
 			continue
 		}
 		}
 		if _, ok := dl.ownHeaders[header.ParentHash]; !ok {
 		if _, ok := dl.ownHeaders[header.ParentHash]; !ok {
-			return i, errors.New("unknown parent")
+			// This _should_ be impossible, due to precheck and induction
+			return i, fmt.Errorf("InsertHeaderChain: unknown parent at position %d", i)
 		}
 		}
-		dl.ownHashes = append(dl.ownHashes, header.Hash())
-		dl.ownHeaders[header.Hash()] = header
-		dl.ownChainTd[header.Hash()] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty)
+		dl.ownHashes = append(dl.ownHashes, hash)
+		dl.ownHeaders[hash] = header
+		dl.ownChainTd[hash] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty)
 	}
 	}
 	return len(headers), nil
 	return len(headers), nil
 }
 }
@@ -274,9 +279,9 @@ func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) {
 
 
 	for i, block := range blocks {
 	for i, block := range blocks {
 		if parent, ok := dl.ownBlocks[block.ParentHash()]; !ok {
 		if parent, ok := dl.ownBlocks[block.ParentHash()]; !ok {
-			return i, errors.New("unknown parent")
+			return i, fmt.Errorf("InsertChain: unknown parent at position %d / %d", i, len(blocks))
 		} else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil {
 		} else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil {
-			return i, fmt.Errorf("unknown parent state %x: %v", parent.Root(), err)
+			return i, fmt.Errorf("InsertChain: unknown parent state %x: %v", parent.Root(), err)
 		}
 		}
 		if _, ok := dl.ownHeaders[block.Hash()]; !ok {
 		if _, ok := dl.ownHeaders[block.Hash()]; !ok {
 			dl.ownHashes = append(dl.ownHashes, block.Hash())
 			dl.ownHashes = append(dl.ownHashes, block.Hash())
@@ -301,7 +306,7 @@ func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []typ
 		}
 		}
 		if _, ok := dl.ancientBlocks[blocks[i].ParentHash()]; !ok {
 		if _, ok := dl.ancientBlocks[blocks[i].ParentHash()]; !ok {
 			if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
 			if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
-				return i, errors.New("unknown parent")
+				return i, errors.New("InsertReceiptChain: unknown parent")
 			}
 			}
 		}
 		}
 		if blocks[i].NumberU64() <= ancientLimit {
 		if blocks[i].NumberU64() <= ancientLimit {

+ 8 - 6
eth/downloader/queue.go

@@ -509,7 +509,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
 		index := int(header.Number.Int64() - int64(q.resultOffset))
 		index := int(header.Number.Int64() - int64(q.resultOffset))
 		if index >= len(q.resultCache) || index < 0 {
 		if index >= len(q.resultCache) || index < 0 {
 			common.Report("index allocation went beyond available resultCache space")
 			common.Report("index allocation went beyond available resultCache space")
-			return nil, false, errInvalidChain
+			return nil, false, fmt.Errorf("%w: index allocation went beyond available resultCache space", errInvalidChain)
 		}
 		}
 		if q.resultCache[index] == nil {
 		if q.resultCache[index] == nil {
 			components := 1
 			components := 1
@@ -863,14 +863,16 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
 		q.active.Signal()
 		q.active.Signal()
 	}
 	}
 	// If none of the data was good, it's a stale delivery
 	// If none of the data was good, it's a stale delivery
-	switch {
-	case failure == nil || failure == errInvalidChain:
+	if failure == nil {
+		return accepted, nil
+	}
+	if errors.Is(failure, errInvalidChain) {
 		return accepted, failure
 		return accepted, failure
-	case useful:
+	}
+	if useful {
 		return accepted, fmt.Errorf("partial failure: %v", failure)
 		return accepted, fmt.Errorf("partial failure: %v", failure)
-	default:
-		return accepted, errStaleDelivery
 	}
 	}
+	return accepted, fmt.Errorf("%w: %v", failure, errStaleDelivery)
 }
 }
 
 
 // Prepare configures the result cache to allow accepting and caching inbound
 // Prepare configures the result cache to allow accepting and caching inbound