|
|
@@ -53,6 +53,9 @@ var (
|
|
|
ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
|
|
|
ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts
|
|
|
|
|
|
+ diffFetchTick = 10 * time.Millisecond
|
|
|
+ diffFetchLimit = 5
|
|
|
+
|
|
|
qosTuningPeers = 5 // Number of peers to tune based on (best peers)
|
|
|
qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
|
|
|
qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
|
|
|
@@ -161,10 +164,10 @@ type Downloader struct {
|
|
|
quitLock sync.Mutex // Lock to prevent double closes
|
|
|
|
|
|
// Testing hooks
|
|
|
- syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
|
|
|
- bodyFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a block body fetch
|
|
|
- receiptFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a receipt fetch
|
|
|
- chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
|
|
|
+ syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
|
|
|
+ bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
|
|
|
+ receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
|
|
|
+ chainInsertHook func([]*fetchResult, chan struct{}) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
|
|
|
}
|
|
|
|
|
|
// LightChain encapsulates functions required to synchronise a light chain.
|
|
|
@@ -232,27 +235,35 @@ type IPeerSet interface {
|
|
|
|
|
|
func EnableDiffFetchOp(peers IPeerSet) DownloadOption {
|
|
|
return func(dl *Downloader) *Downloader {
|
|
|
- var hook = func(headers []*types.Header, args ...interface{}) {
|
|
|
- if len(args) < 2 {
|
|
|
- return
|
|
|
- }
|
|
|
- peerID, ok := args[1].(string)
|
|
|
- if !ok {
|
|
|
- return
|
|
|
- }
|
|
|
- mode, ok := args[0].(SyncMode)
|
|
|
- if !ok {
|
|
|
- return
|
|
|
- }
|
|
|
- if ep := peers.GetDiffPeer(peerID); mode == FullSync && ep != nil {
|
|
|
- hashes := make([]common.Hash, 0, len(headers))
|
|
|
- for _, header := range headers {
|
|
|
- hashes = append(hashes, header.Hash())
|
|
|
- }
|
|
|
- ep.RequestDiffLayers(hashes)
|
|
|
+ var hook = func(results []*fetchResult, stop chan struct{}) {
|
|
|
+ if dl.getMode() == FullSync {
|
|
|
+ go func() {
|
|
|
+ ticker := time.NewTicker(diffFetchTick)
|
|
|
+ defer ticker.Stop()
|
|
|
+ for _, r := range results {
|
|
|
+ Wait:
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-stop:
|
|
|
+ return
|
|
|
+ case <-ticker.C:
|
|
|
+ if dl.blockchain.CurrentHeader().Number.Int64()+int64(diffFetchLimit) > r.Header.Number.Int64() {
|
|
|
+ break Wait
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if ep := peers.GetDiffPeer(r.pid); ep != nil {
|
|
|
+ // It turns out a diff layer is 5x larger than block, we just request one diffLayer each time
|
|
|
+ err := ep.RequestDiffLayers([]common.Hash{r.Header.Hash()})
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
}
|
|
|
}
|
|
|
- dl.bodyFetchHook = hook
|
|
|
+ dl.chainInsertHook = hook
|
|
|
return dl
|
|
|
}
|
|
|
}
|
|
|
@@ -1405,7 +1416,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
|
|
|
// - kind: textual label of the type being downloaded to display in log messages
|
|
|
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
|
|
|
expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
|
|
|
- fetchHook func([]*types.Header, ...interface{}), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
|
|
|
+ fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
|
|
|
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {
|
|
|
|
|
|
// Create a ticker to detect expired retrieval tasks
|
|
|
@@ -1554,7 +1565,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
|
|
|
}
|
|
|
// Fetch the chunk and make sure any errors return the hashes to the queue
|
|
|
if fetchHook != nil {
|
|
|
- fetchHook(request.Headers, d.getMode(), peer.id)
|
|
|
+ fetchHook(request.Headers)
|
|
|
}
|
|
|
if err := fetch(peer, request); err != nil {
|
|
|
// Although we could try and make an attempt to fix this, this error really
|
|
|
@@ -1759,12 +1770,15 @@ func (d *Downloader) processFullSyncContent() error {
|
|
|
if len(results) == 0 {
|
|
|
return nil
|
|
|
}
|
|
|
+ stop := make(chan struct{})
|
|
|
if d.chainInsertHook != nil {
|
|
|
- d.chainInsertHook(results)
|
|
|
+ d.chainInsertHook(results, stop)
|
|
|
}
|
|
|
if err := d.importBlockResults(results); err != nil {
|
|
|
+ close(stop)
|
|
|
return err
|
|
|
}
|
|
|
+ close(stop)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -1850,7 +1864,7 @@ func (d *Downloader) processFastSyncContent() error {
|
|
|
}
|
|
|
}
|
|
|
if d.chainInsertHook != nil {
|
|
|
- d.chainInsertHook(results)
|
|
|
+ d.chainInsertHook(results, nil)
|
|
|
}
|
|
|
// If we haven't downloaded the pivot block yet, check pivot staleness
|
|
|
// notifications from the header downloader
|