| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837 |
- // Copyright 2015 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- // Package fetcher contains the block announcement based synchonisation.
- package fetcher
- import (
- "errors"
- "fmt"
- "math/rand"
- "time"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
- "gopkg.in/karalabe/cookiejar.v2/collections/prque"
- )
- const (
- arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
- gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
- fetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block
- maxUncleDist = 7 // Maximum allowed backward distance from the chain head
- maxQueueDist = 32 // Maximum allowed distance from the chain head to queue
- hashLimit = 256 // Maximum number of unique blocks a peer may have announced
- blockLimit = 64 // Maximum number of unique blocks a per may have delivered
- )
- var (
- errTerminated = errors.New("terminated")
- )
- // blockRetrievalFn is a callback type for retrieving a block from the local chain.
- type blockRetrievalFn func(common.Hash) *types.Block
- // blockRequesterFn is a callback type for sending a block retrieval request.
- type blockRequesterFn func([]common.Hash) error
- // headerRequesterFn is a callback type for sending a header retrieval request.
- type headerRequesterFn func(common.Hash) error
- // bodyRequesterFn is a callback type for sending a body retrieval request.
- type bodyRequesterFn func([]common.Hash) error
- // blockValidatorFn is a callback type to verify a block's header for fast propagation.
- type blockValidatorFn func(block *types.Block, parent *types.Block) error
- // blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
- type blockBroadcasterFn func(block *types.Block, propagate bool)
- // chainHeightFn is a callback type to retrieve the current chain height.
- type chainHeightFn func() uint64
- // chainInsertFn is a callback type to insert a batch of blocks into the local chain.
- type chainInsertFn func(types.Blocks) (int, error)
- // peerDropFn is a callback type for dropping a peer detected as malicious.
- type peerDropFn func(id string)
- // announce is the hash notification of the availability of a new block in the
- // network.
- type announce struct {
- hash common.Hash // Hash of the block being announced
- number uint64 // Number of the block being announced (0 = unknown | old protocol)
- header *types.Header // Header of the block partially reassembled (new protocol)
- time time.Time // Timestamp of the announcement
- origin string // Identifier of the peer originating the notification
- fetch61 blockRequesterFn // [eth/61] Fetcher function to retrieve an announced block
- fetchHeader headerRequesterFn // [eth/62] Fetcher function to retrieve the header of an announced block
- fetchBodies bodyRequesterFn // [eth/62] Fetcher function to retrieve the body of an announced block
- }
- // headerFilterTask represents a batch of headers needing fetcher filtering.
- type headerFilterTask struct {
- headers []*types.Header // Collection of headers to filter
- time time.Time // Arrival time of the headers
- }
- // headerFilterTask represents a batch of block bodies (transactions and uncles)
- // needing fetcher filtering.
- type bodyFilterTask struct {
- transactions [][]*types.Transaction // Collection of transactions per block bodies
- uncles [][]*types.Header // Collection of uncles per block bodies
- time time.Time // Arrival time of the blocks' contents
- }
- // inject represents a schedules import operation.
- type inject struct {
- origin string
- block *types.Block
- }
- // Fetcher is responsible for accumulating block announcements from various peers
- // and scheduling them for retrieval.
- type Fetcher struct {
- // Various event channels
- notify chan *announce
- inject chan *inject
- blockFilter chan chan []*types.Block
- headerFilter chan chan *headerFilterTask
- bodyFilter chan chan *bodyFilterTask
- done chan common.Hash
- quit chan struct{}
- // Announce states
- announces map[string]int // Per peer announce counts to prevent memory exhaustion
- announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
- fetching map[common.Hash]*announce // Announced blocks, currently fetching
- fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
- completing map[common.Hash]*announce // Blocks with headers, currently body-completing
- // Block cache
- queue *prque.Prque // Queue containing the import operations (block number sorted)
- queues map[string]int // Per peer block counts to prevent memory exhaustion
- queued map[common.Hash]*inject // Set of already queued blocks (to dedup imports)
- // Callbacks
- getBlock blockRetrievalFn // Retrieves a block from the local chain
- validateBlock blockValidatorFn // Checks if a block's headers have a valid proof of work
- broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
- chainHeight chainHeightFn // Retrieves the current chain's height
- insertChain chainInsertFn // Injects a batch of blocks into the chain
- dropPeer peerDropFn // Drops a peer for misbehaving
- // Testing hooks
- announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
- queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
- fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
- completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
- importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62)
- }
- // New creates a block fetcher to retrieve blocks based on hash announcements.
- func New(getBlock blockRetrievalFn, validateBlock blockValidatorFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *Fetcher {
- return &Fetcher{
- notify: make(chan *announce),
- inject: make(chan *inject),
- blockFilter: make(chan chan []*types.Block),
- headerFilter: make(chan chan *headerFilterTask),
- bodyFilter: make(chan chan *bodyFilterTask),
- done: make(chan common.Hash),
- quit: make(chan struct{}),
- announces: make(map[string]int),
- announced: make(map[common.Hash][]*announce),
- fetching: make(map[common.Hash]*announce),
- fetched: make(map[common.Hash][]*announce),
- completing: make(map[common.Hash]*announce),
- queue: prque.New(),
- queues: make(map[string]int),
- queued: make(map[common.Hash]*inject),
- getBlock: getBlock,
- validateBlock: validateBlock,
- broadcastBlock: broadcastBlock,
- chainHeight: chainHeight,
- insertChain: insertChain,
- dropPeer: dropPeer,
- }
- }
- // Start boots up the announcement based synchoniser, accepting and processing
- // hash notifications and block fetches until termination requested.
- func (f *Fetcher) Start() {
- go f.loop()
- }
- // Stop terminates the announcement based synchroniser, canceling all pending
- // operations.
- func (f *Fetcher) Stop() {
- close(f.quit)
- }
- // Notify announces the fetcher of the potential availability of a new block in
- // the network.
- func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
- blockFetcher blockRequesterFn, // eth/61 specific whole block fetcher
- headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
- block := &announce{
- hash: hash,
- number: number,
- time: time,
- origin: peer,
- fetch61: blockFetcher,
- fetchHeader: headerFetcher,
- fetchBodies: bodyFetcher,
- }
- select {
- case f.notify <- block:
- return nil
- case <-f.quit:
- return errTerminated
- }
- }
- // Enqueue tries to fill gaps the the fetcher's future import queue.
- func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
- op := &inject{
- origin: peer,
- block: block,
- }
- select {
- case f.inject <- op:
- return nil
- case <-f.quit:
- return errTerminated
- }
- }
- // FilterBlocks extracts all the blocks that were explicitly requested by the fetcher,
- // returning those that should be handled differently.
- func (f *Fetcher) FilterBlocks(blocks types.Blocks) types.Blocks {
- glog.V(logger.Detail).Infof("[eth/61] filtering %d blocks", len(blocks))
- // Send the filter channel to the fetcher
- filter := make(chan []*types.Block)
- select {
- case f.blockFilter <- filter:
- case <-f.quit:
- return nil
- }
- // Request the filtering of the block list
- select {
- case filter <- blocks:
- case <-f.quit:
- return nil
- }
- // Retrieve the blocks remaining after filtering
- select {
- case blocks := <-filter:
- return blocks
- case <-f.quit:
- return nil
- }
- }
- // FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
- // returning those that should be handled differently.
- func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*types.Header {
- glog.V(logger.Detail).Infof("[eth/62] filtering %d headers", len(headers))
- // Send the filter channel to the fetcher
- filter := make(chan *headerFilterTask)
- select {
- case f.headerFilter <- filter:
- case <-f.quit:
- return nil
- }
- // Request the filtering of the header list
- select {
- case filter <- &headerFilterTask{headers: headers, time: time}:
- case <-f.quit:
- return nil
- }
- // Retrieve the headers remaining after filtering
- select {
- case task := <-filter:
- return task.headers
- case <-f.quit:
- return nil
- }
- }
- // FilterBodies extracts all the block bodies that were explicitly requested by
- // the fetcher, returning those that should be handled differently.
- func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
- glog.V(logger.Detail).Infof("[eth/62] filtering %d:%d bodies", len(transactions), len(uncles))
- // Send the filter channel to the fetcher
- filter := make(chan *bodyFilterTask)
- select {
- case f.bodyFilter <- filter:
- case <-f.quit:
- return nil, nil
- }
- // Request the filtering of the body list
- select {
- case filter <- &bodyFilterTask{transactions: transactions, uncles: uncles, time: time}:
- case <-f.quit:
- return nil, nil
- }
- // Retrieve the bodies remaining after filtering
- select {
- case task := <-filter:
- return task.transactions, task.uncles
- case <-f.quit:
- return nil, nil
- }
- }
- // Loop is the main fetcher loop, checking and processing various notification
- // events.
- func (f *Fetcher) loop() {
- // Iterate the block fetching until a quit is requested
- fetchTimer := time.NewTimer(0)
- completeTimer := time.NewTimer(0)
- for {
- // Clean up any expired block fetches
- for hash, announce := range f.fetching {
- if time.Since(announce.time) > fetchTimeout {
- f.forgetHash(hash)
- }
- }
- // Import any queued blocks that could potentially fit
- height := f.chainHeight()
- for !f.queue.Empty() {
- op := f.queue.PopItem().(*inject)
- if f.queueChangeHook != nil {
- f.queueChangeHook(op.block.Hash(), false)
- }
- // If too high up the chain or phase, continue later
- number := op.block.NumberU64()
- if number > height+1 {
- f.queue.Push(op, -float32(op.block.NumberU64()))
- if f.queueChangeHook != nil {
- f.queueChangeHook(op.block.Hash(), true)
- }
- break
- }
- // Otherwise if fresh and still unknown, try and import
- hash := op.block.Hash()
- if number+maxUncleDist < height || f.getBlock(hash) != nil {
- f.forgetBlock(hash)
- continue
- }
- f.insert(op.origin, op.block)
- }
- // Wait for an outside event to occur
- select {
- case <-f.quit:
- // Fetcher terminating, abort all operations
- return
- case notification := <-f.notify:
- // A block was announced, make sure the peer isn't DOSing us
- propAnnounceInMeter.Mark(1)
- count := f.announces[notification.origin] + 1
- if count > hashLimit {
- glog.V(logger.Debug).Infof("Peer %s: exceeded outstanding announces (%d)", notification.origin, hashLimit)
- propAnnounceDOSMeter.Mark(1)
- break
- }
- // If we have a valid block number, check that it's potentially useful
- if notification.number > 0 {
- if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
- glog.V(logger.Debug).Infof("[eth/62] Peer %s: discarded announcement #%d [%x…], distance %d", notification.origin, notification.number, notification.hash[:4], dist)
- propAnnounceDropMeter.Mark(1)
- break
- }
- }
- // All is well, schedule the announce if block's not yet downloading
- if _, ok := f.fetching[notification.hash]; ok {
- break
- }
- if _, ok := f.completing[notification.hash]; ok {
- break
- }
- f.announces[notification.origin] = count
- f.announced[notification.hash] = append(f.announced[notification.hash], notification)
- if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
- f.announceChangeHook(notification.hash, true)
- }
- if len(f.announced) == 1 {
- f.rescheduleFetch(fetchTimer)
- }
- case op := <-f.inject:
- // A direct block insertion was requested, try and fill any pending gaps
- propBroadcastInMeter.Mark(1)
- f.enqueue(op.origin, op.block)
- case hash := <-f.done:
- // A pending import finished, remove all traces of the notification
- f.forgetHash(hash)
- f.forgetBlock(hash)
- case <-fetchTimer.C:
- // At least one block's timer ran out, check for needing retrieval
- request := make(map[string][]common.Hash)
- for hash, announces := range f.announced {
- if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
- // Pick a random peer to retrieve from, reset all others
- announce := announces[rand.Intn(len(announces))]
- f.forgetHash(hash)
- // If the block still didn't arrive, queue for fetching
- if f.getBlock(hash) == nil {
- request[announce.origin] = append(request[announce.origin], hash)
- f.fetching[hash] = announce
- }
- }
- }
- // Send out all block (eth/61) or header (eth/62) requests
- for peer, hashes := range request {
- if glog.V(logger.Detail) && len(hashes) > 0 {
- list := "["
- for _, hash := range hashes {
- list += fmt.Sprintf("%x…, ", hash[:4])
- }
- list = list[:len(list)-2] + "]"
- if f.fetching[hashes[0]].fetch61 != nil {
- glog.V(logger.Detail).Infof("[eth/61] Peer %s: fetching blocks %s", peer, list)
- } else {
- glog.V(logger.Detail).Infof("[eth/62] Peer %s: fetching headers %s", peer, list)
- }
- }
- // Create a closure of the fetch and schedule in on a new thread
- fetchBlocks, fetchHeader, hashes := f.fetching[hashes[0]].fetch61, f.fetching[hashes[0]].fetchHeader, hashes
- go func() {
- if f.fetchingHook != nil {
- f.fetchingHook(hashes)
- }
- if fetchBlocks != nil {
- // Use old eth/61 protocol to retrieve whole blocks
- blockFetchMeter.Mark(int64(len(hashes)))
- fetchBlocks(hashes)
- } else {
- // Use new eth/62 protocol to retrieve headers first
- for _, hash := range hashes {
- headerFetchMeter.Mark(1)
- fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
- }
- }
- }()
- }
- // Schedule the next fetch if blocks are still pending
- f.rescheduleFetch(fetchTimer)
- case <-completeTimer.C:
- // At least one header's timer ran out, retrieve everything
- request := make(map[string][]common.Hash)
- for hash, announces := range f.fetched {
- // Pick a random peer to retrieve from, reset all others
- announce := announces[rand.Intn(len(announces))]
- f.forgetHash(hash)
- // If the block still didn't arrive, queue for completion
- if f.getBlock(hash) == nil {
- request[announce.origin] = append(request[announce.origin], hash)
- f.completing[hash] = announce
- }
- }
- // Send out all block body requests
- for peer, hashes := range request {
- if glog.V(logger.Detail) && len(hashes) > 0 {
- list := "["
- for _, hash := range hashes {
- list += fmt.Sprintf("%x…, ", hash[:4])
- }
- list = list[:len(list)-2] + "]"
- glog.V(logger.Detail).Infof("[eth/62] Peer %s: fetching bodies %s", peer, list)
- }
- // Create a closure of the fetch and schedule in on a new thread
- if f.completingHook != nil {
- f.completingHook(hashes)
- }
- bodyFetchMeter.Mark(int64(len(hashes)))
- go f.completing[hashes[0]].fetchBodies(hashes)
- }
- // Schedule the next fetch if blocks are still pending
- f.rescheduleComplete(completeTimer)
- case filter := <-f.blockFilter:
- // Blocks arrived, extract any explicit fetches, return all else
- var blocks types.Blocks
- select {
- case blocks = <-filter:
- case <-f.quit:
- return
- }
- blockFilterInMeter.Mark(int64(len(blocks)))
- explicit, download := []*types.Block{}, []*types.Block{}
- for _, block := range blocks {
- hash := block.Hash()
- // Filter explicitly requested blocks from hash announcements
- if f.fetching[hash] != nil && f.queued[hash] == nil {
- // Discard if already imported by other means
- if f.getBlock(hash) == nil {
- explicit = append(explicit, block)
- } else {
- f.forgetHash(hash)
- }
- } else {
- download = append(download, block)
- }
- }
- blockFilterOutMeter.Mark(int64(len(download)))
- select {
- case filter <- download:
- case <-f.quit:
- return
- }
- // Schedule the retrieved blocks for ordered import
- for _, block := range explicit {
- if announce := f.fetching[block.Hash()]; announce != nil {
- f.enqueue(announce.origin, block)
- }
- }
- case filter := <-f.headerFilter:
- // Headers arrived from a remote peer. Extract those that were explicitly
- // requested by the fetcher, and return everything else so it's delivered
- // to other parts of the system.
- var task *headerFilterTask
- select {
- case task = <-filter:
- case <-f.quit:
- return
- }
- headerFilterInMeter.Mark(int64(len(task.headers)))
- // Split the batch of headers into unknown ones (to return to the caller),
- // known incomplete ones (requiring body retrievals) and completed blocks.
- unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
- for _, header := range task.headers {
- hash := header.Hash()
- // Filter fetcher-requested headers from other synchronisation algorithms
- if announce := f.fetching[hash]; announce != nil && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
- // If the delivered header does not match the promised number, drop the announcer
- if header.Number.Uint64() != announce.number {
- glog.V(logger.Detail).Infof("[eth/62] Peer %s: invalid block number for [%x…]: announced %d, provided %d", announce.origin, header.Hash().Bytes()[:4], announce.number, header.Number.Uint64())
- f.dropPeer(announce.origin)
- f.forgetHash(hash)
- continue
- }
- // Only keep if not imported by other means
- if f.getBlock(hash) == nil {
- announce.header = header
- announce.time = task.time
- // If the block is empty (header only), short circuit into the final import queue
- if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
- glog.V(logger.Detail).Infof("[eth/62] Peer %s: block #%d [%x…] empty, skipping body retrieval", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4])
- block := types.NewBlockWithHeader(header)
- block.ReceivedAt = task.time
- complete = append(complete, block)
- f.completing[hash] = announce
- continue
- }
- // Otherwise add to the list of blocks needing completion
- incomplete = append(incomplete, announce)
- } else {
- glog.V(logger.Detail).Infof("[eth/62] Peer %s: block #%d [%x…] already imported, discarding header", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4])
- f.forgetHash(hash)
- }
- } else {
- // Fetcher doesn't know about it, add to the return list
- unknown = append(unknown, header)
- }
- }
- headerFilterOutMeter.Mark(int64(len(unknown)))
- select {
- case filter <- &headerFilterTask{headers: unknown, time: task.time}:
- case <-f.quit:
- return
- }
- // Schedule the retrieved headers for body completion
- for _, announce := range incomplete {
- hash := announce.header.Hash()
- if _, ok := f.completing[hash]; ok {
- continue
- }
- f.fetched[hash] = append(f.fetched[hash], announce)
- if len(f.fetched) == 1 {
- f.rescheduleComplete(completeTimer)
- }
- }
- // Schedule the header-only blocks for import
- for _, block := range complete {
- if announce := f.completing[block.Hash()]; announce != nil {
- f.enqueue(announce.origin, block)
- }
- }
- case filter := <-f.bodyFilter:
- // Block bodies arrived, extract any explicitly requested blocks, return the rest
- var task *bodyFilterTask
- select {
- case task = <-filter:
- case <-f.quit:
- return
- }
- bodyFilterInMeter.Mark(int64(len(task.transactions)))
- blocks := []*types.Block{}
- for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
- // Match up a body to any possible completion request
- matched := false
- for hash, announce := range f.completing {
- if f.queued[hash] == nil {
- txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
- uncleHash := types.CalcUncleHash(task.uncles[i])
- if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash {
- // Mark the body matched, reassemble if still unknown
- matched = true
- if f.getBlock(hash) == nil {
- block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
- block.ReceivedAt = task.time
- blocks = append(blocks, block)
- } else {
- f.forgetHash(hash)
- }
- }
- }
- }
- if matched {
- task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
- task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
- i--
- continue
- }
- }
- bodyFilterOutMeter.Mark(int64(len(task.transactions)))
- select {
- case filter <- task:
- case <-f.quit:
- return
- }
- // Schedule the retrieved blocks for ordered import
- for _, block := range blocks {
- if announce := f.completing[block.Hash()]; announce != nil {
- f.enqueue(announce.origin, block)
- }
- }
- }
- }
- }
- // rescheduleFetch resets the specified fetch timer to the next announce timeout.
- func (f *Fetcher) rescheduleFetch(fetch *time.Timer) {
- // Short circuit if no blocks are announced
- if len(f.announced) == 0 {
- return
- }
- // Otherwise find the earliest expiring announcement
- earliest := time.Now()
- for _, announces := range f.announced {
- if earliest.After(announces[0].time) {
- earliest = announces[0].time
- }
- }
- fetch.Reset(arriveTimeout - time.Since(earliest))
- }
- // rescheduleComplete resets the specified completion timer to the next fetch timeout.
- func (f *Fetcher) rescheduleComplete(complete *time.Timer) {
- // Short circuit if no headers are fetched
- if len(f.fetched) == 0 {
- return
- }
- // Otherwise find the earliest expiring announcement
- earliest := time.Now()
- for _, announces := range f.fetched {
- if earliest.After(announces[0].time) {
- earliest = announces[0].time
- }
- }
- complete.Reset(gatherSlack - time.Since(earliest))
- }
- // enqueue schedules a new future import operation, if the block to be imported
- // has not yet been seen.
- func (f *Fetcher) enqueue(peer string, block *types.Block) {
- hash := block.Hash()
- // Ensure the peer isn't DOSing us
- count := f.queues[peer] + 1
- if count > blockLimit {
- glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], exceeded allowance (%d)", peer, block.NumberU64(), hash.Bytes()[:4], blockLimit)
- propBroadcastDOSMeter.Mark(1)
- f.forgetHash(hash)
- return
- }
- // Discard any past or too distant blocks
- if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
- glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist)
- propBroadcastDropMeter.Mark(1)
- f.forgetHash(hash)
- return
- }
- // Schedule the block for future importing
- if _, ok := f.queued[hash]; !ok {
- op := &inject{
- origin: peer,
- block: block,
- }
- f.queues[peer] = count
- f.queued[hash] = op
- f.queue.Push(op, -float32(block.NumberU64()))
- if f.queueChangeHook != nil {
- f.queueChangeHook(op.block.Hash(), true)
- }
- if glog.V(logger.Debug) {
- glog.Infof("Peer %s: queued block #%d [%x…], total %v", peer, block.NumberU64(), hash.Bytes()[:4], f.queue.Size())
- }
- }
- }
- // insert spawns a new goroutine to run a block insertion into the chain. If the
- // block's number is at the same height as the current import phase, if updates
- // the phase states accordingly.
- func (f *Fetcher) insert(peer string, block *types.Block) {
- hash := block.Hash()
- // Run the import on a new thread
- glog.V(logger.Debug).Infof("Peer %s: importing block #%d [%x…]", peer, block.NumberU64(), hash[:4])
- go func() {
- defer func() { f.done <- hash }()
- // If the parent's unknown, abort insertion
- parent := f.getBlock(block.ParentHash())
- if parent == nil {
- glog.V(logger.Debug).Infof("Peer %s: parent []%x] of block #%d [%x…] unknown", block.ParentHash().Bytes()[:4], peer, block.NumberU64(), hash[:4])
- return
- }
- // Quickly validate the header and propagate the block if it passes
- switch err := f.validateBlock(block, parent); err {
- case nil:
- // All ok, quickly propagate to our peers
- propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
- go f.broadcastBlock(block, true)
- case core.BlockFutureErr:
- // Weird future block, don't fail, but neither propagate
- default:
- // Something went very wrong, drop the peer
- glog.V(logger.Debug).Infof("Peer %s: block #%d [%x…] verification failed: %v", peer, block.NumberU64(), hash[:4], err)
- f.dropPeer(peer)
- return
- }
- // Run the actual import and log any issues
- if _, err := f.insertChain(types.Blocks{block}); err != nil {
- glog.V(logger.Warn).Infof("Peer %s: block #%d [%x…] import failed: %v", peer, block.NumberU64(), hash[:4], err)
- return
- }
- // If import succeeded, broadcast the block
- propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
- go f.broadcastBlock(block, false)
- // Invoke the testing hook if needed
- if f.importedHook != nil {
- f.importedHook(block)
- }
- }()
- }
- // forgetHash removes all traces of a block announcement from the fetcher's
- // internal state.
- func (f *Fetcher) forgetHash(hash common.Hash) {
- // Remove all pending announces and decrement DOS counters
- for _, announce := range f.announced[hash] {
- f.announces[announce.origin]--
- if f.announces[announce.origin] == 0 {
- delete(f.announces, announce.origin)
- }
- }
- delete(f.announced, hash)
- if f.announceChangeHook != nil {
- f.announceChangeHook(hash, false)
- }
- // Remove any pending fetches and decrement the DOS counters
- if announce := f.fetching[hash]; announce != nil {
- f.announces[announce.origin]--
- if f.announces[announce.origin] == 0 {
- delete(f.announces, announce.origin)
- }
- delete(f.fetching, hash)
- }
- // Remove any pending completion requests and decrement the DOS counters
- for _, announce := range f.fetched[hash] {
- f.announces[announce.origin]--
- if f.announces[announce.origin] == 0 {
- delete(f.announces, announce.origin)
- }
- }
- delete(f.fetched, hash)
- // Remove any pending completions and decrement the DOS counters
- if announce := f.completing[hash]; announce != nil {
- f.announces[announce.origin]--
- if f.announces[announce.origin] == 0 {
- delete(f.announces, announce.origin)
- }
- delete(f.completing, hash)
- }
- }
- // forgetBlock removes all traces of a queued block from the fetcher's internal
- // state.
- func (f *Fetcher) forgetBlock(hash common.Hash) {
- if insert := f.queued[hash]; insert != nil {
- f.queues[insert.origin]--
- if f.queues[insert.origin] == 0 {
- delete(f.queues, insert.origin)
- }
- delete(f.queued, hash)
- }
- }
|