
les: fix UDP connection query (#22451)

This PR fixes multiple issues with the UDP connection pre-negotiation feature:

- the enable condition was wrong: it checked for the DiscV5 struct before it was initialized, which disabled the feature even when discv5 was enabled
- the server pool re-queried nodes that were already connected whenever the discovery iterators returned them again
- servers responded positively to connection queries before they were synced and actually able to accept connections

Metrics are also added on the server side that count the positive and negative replies to served connection queries.
Felföldi Zsolt, 4 years ago
commit 62d8022b51
8 changed files with 110 additions and 76 deletions
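For illustration, here is a minimal, self-contained sketch of the server-side sync gate and reply counting described above. It is not the actual geth code: capQueryServer, syncedFn, zeroMeter and nonZeroMeter are placeholder names, and the real change lives in les/clientpool.go and les/metrics.go below.

package main

import "fmt"

// capQueryServer stands in for the LES client pool; syncedFn, zeroMeter and
// nonZeroMeter are illustrative placeholders for the synced callback and the
// capacityQueryZero/NonZero meters added by this change.
type capQueryServer struct {
	syncedFn     func() bool
	zeroMeter    func()
	nonZeroMeter func()
}

// serveCapQuery answers a pre-negotiation query for a list of token amounts.
func (s *capQueryServer) serveCapQuery(addTokens []uint64) []uint64 {
	result := make([]uint64, len(addTokens))
	if !s.syncedFn() {
		// Not synced yet: reply with all-zero capacities so clients do not
		// dial a server that would reject them anyway.
		s.zeroMeter()
		return result
	}
	for i := range addTokens {
		result[i] = 1 // real capacity estimation omitted in this sketch
	}
	// Count the first answer as a positive or negative reply.
	if len(result) > 0 {
		if result[0] == 0 {
			s.zeroMeter()
		} else {
			s.nonZeroMeter()
		}
	}
	return result
}

func main() {
	srv := &capQueryServer{
		syncedFn:     func() bool { return false }, // pretend we are still syncing
		zeroMeter:    func() { fmt.Println("negative reply counted") },
		nonZeroMeter: func() { fmt.Println("positive reply counted") },
	}
	fmt.Println(srv.serveCapQuery([]uint64{100})) // negative reply counted, then [0]
}

On the client side, the corresponding fix in les/vflux/client/serverpool.go removes the query flag from nodes that are already in the dial process, so already connected servers are not queried again.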
  1. les/client.go (+12, -6)
  2. les/clientpool.go (+16, -2)
  3. les/clientpool_test.go (+12, -12)
  4. les/enr_entry.go (+2, -3)
  5. les/metrics.go (+6, -4)
  6. les/server.go (+1, -1)
  7. les/test_helper.go (+5, -1)
  8. les/vflux/client/serverpool.go (+56, -47)

+ 12 - 6
les/client.go

@@ -73,8 +73,9 @@ type LightEthereum struct {
 	accountManager *accounts.Manager
 	netRPCService  *ethapi.PublicNetAPI
 
-	p2pServer *p2p.Server
-	p2pConfig *p2p.Config
+	p2pServer  *p2p.Server
+	p2pConfig  *p2p.Config
+	udpEnabled bool
 }
 
 
 // New creates an instance of the light client.
 // New creates an instance of the light client.
@@ -113,10 +114,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
 		bloomIndexer:   core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
 		p2pServer:      stack.Server(),
 		p2pConfig:      &stack.Config().P2P,
+		udpEnabled:     stack.Config().P2P.DiscoveryV5,
 	}
 
 	var prenegQuery vfc.QueryFunc
-	if leth.p2pServer.DiscV5 != nil {
+	if leth.udpEnabled {
 		prenegQuery = leth.prenegQuery
 	}
 	leth.serverPool, leth.serverPoolIterator = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, prenegQuery, &mclock.System{}, config.UltraLightServers, requestList)
@@ -198,7 +200,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
 
 // VfluxRequest sends a batch of requests to the given node through discv5 UDP TalkRequest and returns the responses
 func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.Replies {
-	if s.p2pServer.DiscV5 == nil {
+	if !s.udpEnabled {
 		return nil
 	}
 	reqsEnc, _ := rlp.EncodeToBytes(&reqs)
@@ -215,7 +217,7 @@ func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.R
 func (s *LightEthereum) vfxVersion(n *enode.Node) uint {
 	if n.Seq() == 0 {
 		var err error
-		if s.p2pServer.DiscV5 == nil {
+		if !s.udpEnabled {
 			return 0
 		}
 		if n, err = s.p2pServer.DiscV5.RequestENR(n); n != nil && err == nil && n.Seq() != 0 {
@@ -346,7 +348,11 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
 func (s *LightEthereum) Start() error {
 	log.Warn("Light client mode is an experimental feature")
 
-	discovery, err := s.setupDiscovery(s.p2pConfig)
+	if s.udpEnabled && s.p2pServer.DiscV5 == nil {
+		s.udpEnabled = false
+		log.Error("Discovery v5 is not initialized")
+	}
+	discovery, err := s.setupDiscovery()
 	if err != nil {
 		return err
 	}

+ 16 - 2
les/clientpool.go

@@ -72,6 +72,7 @@ type clientPool struct {
 	clock      mclock.Clock
 	closed     bool
 	removePeer func(enode.ID)
+	synced     func() bool
 	ns         *nodestate.NodeStateMachine
 	pp         *vfs.PriorityPool
 	bt         *vfs.BalanceTracker
@@ -107,7 +108,7 @@ type clientInfo struct {
 }
 
 // newClientPool creates a new client pool
-func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
+func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID), synced func() bool) *clientPool {
 	pool := &clientPool{
 		ns:                  ns,
 		BalanceTrackerSetup: balanceTrackerSetup,
@@ -116,6 +117,7 @@ func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap
 		minCap:              minCap,
 		connectedBias:       connectedBias,
 		removePeer:          removePeer,
+		synced:              synced,
 	}
 	pool.bt = vfs.NewBalanceTracker(ns, balanceTrackerSetup, lesDb, clock, &utils.Expirer{}, &utils.Expirer{})
 	pool.pp = vfs.NewPriorityPool(ns, priorityPoolSetup, clock, minCap, connectedBias, 4)
@@ -396,6 +398,13 @@ func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []by
 	if l := len(req.AddTokens); l == 0 || l > vflux.CapacityQueryMaxLen {
 		return nil
 	}
+	result := make(vflux.CapacityQueryReply, len(req.AddTokens))
+	if !f.synced() {
+		capacityQueryZeroMeter.Mark(1)
+		reply, _ := rlp.EncodeToBytes(&result)
+		return reply
+	}
+
 	node := f.ns.GetNode(id)
 	if node == nil {
 		node = enode.SignNull(&enr.Record{}, id)
@@ -416,7 +425,6 @@ func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []by
 	}
 	// use vfs.CapacityCurve to answer request for multiple newly bought token amounts
 	curve := f.pp.GetCapacityCurve().Exclude(id)
-	result := make(vflux.CapacityQueryReply, len(req.AddTokens))
 	bias := time.Second * time.Duration(req.Bias)
 	if f.connectedBias > bias {
 		bias = f.connectedBias
@@ -434,6 +442,12 @@ func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []by
 			result[i] = 0
 		}
 	}
+	// add first result to metrics (don't care about priority client multi-queries yet)
+	if result[0] == 0 {
+		capacityQueryZeroMeter.Mark(1)
+	} else {
+		capacityQueryNonZeroMeter.Mark(1)
+	}
 	reply, _ := rlp.EncodeToBytes(&result)
 	return reply
 }

+ 12 - 12
les/clientpool_test.go

@@ -133,7 +133,7 @@ func testClientPool(t *testing.T, activeLimit, clientCount, paidCount int, rando
 		disconnFn = func(id enode.ID) {
 			disconnCh <- int(id[0]) + int(id[1])<<8
 		}
-		pool = newClientPool(testStateMachine(), db, 1, 0, &clock, disconnFn)
+		pool = newClientPool(testStateMachine(), db, 1, 0, &clock, disconnFn, alwaysTrueFn)
 	)
 	pool.ns.Start()
 
@@ -239,7 +239,7 @@ func TestConnectPaidClient(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10))
@@ -255,7 +255,7 @@ func TestConnectPaidClientToSmallPool(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -274,7 +274,7 @@ func TestConnectPaidClientToFullPool(t *testing.T) {
 		db    = rawdb.NewMemoryDatabase()
 	)
 	removeFn := func(enode.ID) {} // Noop
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -304,7 +304,7 @@ func TestPaidClientKickedOut(t *testing.T) {
 	removeFn := func(id enode.ID) {
 		kickedCh <- int(id[0])
 	}
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	pool.bt.SetExpirationTCs(0, 0)
 	defer pool.stop()
@@ -335,7 +335,7 @@ func TestConnectFreeClient(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10))
@@ -352,7 +352,7 @@ func TestConnectFreeClientToFullPool(t *testing.T) {
 		db    = rawdb.NewMemoryDatabase()
 	)
 	removeFn := func(enode.ID) {} // Noop
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -382,7 +382,7 @@ func TestFreeClientKickedOut(t *testing.T) {
 		kicked = make(chan int, 100)
 	)
 	removeFn := func(id enode.ID) { kicked <- int(id[0]) }
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -424,7 +424,7 @@ func TestPositiveBalanceCalculation(t *testing.T) {
 		kicked = make(chan int, 10)
 	)
 	removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -448,7 +448,7 @@ func TestDowngradePriorityClient(t *testing.T) {
 		kicked = make(chan int, 10)
 	)
 	removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -483,7 +483,7 @@ func TestNegativeBalanceCalculation(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -521,7 +521,7 @@ func TestInactiveClient(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(2, uint64(2))

+ 2 - 3
les/enr_entry.go

@@ -18,7 +18,6 @@ package les
 
 import (
 	"github.com/ethereum/go-ethereum/core/forkid"
-	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/dnsdisc"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/rlp"
@@ -42,7 +41,7 @@ type ethEntry struct {
 func (ethEntry) ENRKey() string { return "eth" }
 
 // setupDiscovery creates the node discovery source for the eth protocol.
-func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error) {
+func (eth *LightEthereum) setupDiscovery() (enode.Iterator, error) {
 	it := enode.NewFairMix(0)
 
 	// Enable DNS discovery.
@@ -56,7 +55,7 @@ func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error
 	}
 
 	// Enable DHT.
-	if cfg.DiscoveryV5 && eth.p2pServer.DiscV5 != nil {
+	if eth.udpEnabled {
 		it.AddSource(eth.p2pServer.DiscV5.RandomNodes())
 	}
 

+ 6 - 4
les/metrics.go

@@ -73,10 +73,12 @@ var (
 	serverConnectionGauge = metrics.NewRegisteredGauge("les/connection/server", nil)
 	clientConnectionGauge = metrics.NewRegisteredGauge("les/connection/client", nil)
 
-	totalCapacityGauge   = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
-	totalRechargeGauge   = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
-	totalConnectedGauge  = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
-	blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)
+	totalCapacityGauge        = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
+	totalRechargeGauge        = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
+	totalConnectedGauge       = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
+	blockProcessingTimer      = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)
+	capacityQueryZeroMeter    = metrics.NewRegisteredMeter("les/server/capQueryZero", nil)
+	capacityQueryNonZeroMeter = metrics.NewRegisteredMeter("les/server/capQueryNonZero", nil)
 
 	requestServedMeter               = metrics.NewRegisteredMeter("les/server/req/avgServedTime", nil)
 	requestServedTimer               = metrics.NewRegisteredTimer("les/server/req/servedTime", nil)

+ 1 - 1
les/server.go

@@ -149,7 +149,7 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
 		srv.maxCapacity = totalRecharge
 	}
 	srv.fcManager.SetCapacityLimits(srv.minCapacity, srv.maxCapacity, srv.minCapacity*2)
-	srv.clientPool = newClientPool(ns, lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient)
+	srv.clientPool = newClientPool(ns, lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient, issync)
 	srv.clientPool.setDefaultFactors(vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}, vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1})
 
 	checkpoint := srv.latestLocalCheckpoint()

+ 5 - 1
les/test_helper.go

@@ -307,7 +307,7 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
 	}
 	server.costTracker, server.minCapacity = newCostTracker(db, server.config)
 	server.costTracker.testCostList = testCostList(0) // Disable flow control mechanism.
-	server.clientPool = newClientPool(ns, db, testBufRecharge, defaultConnectedBias, clock, func(id enode.ID) {})
+	server.clientPool = newClientPool(ns, db, testBufRecharge, defaultConnectedBias, clock, func(id enode.ID) {}, alwaysTrueFn)
 	server.clientPool.setLimits(10000, 10000) // Assign enough capacity for clientpool
 	server.handler = newServerHandler(server, simulation.Blockchain(), db, txpool, func() bool { return true })
 	if server.oracle != nil {
@@ -319,6 +319,10 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
 	return server.handler, simulation
 }
 
+func alwaysTrueFn() bool {
+	return true
+}
+
 // testPeer is a simulated peer to allow testing direct network calls.
 type testPeer struct {
 	cpeer *clientPeer

+ 56 - 47
les/vflux/client/serverpool.go

@@ -47,7 +47,8 @@ const (
 	nodeWeightThreshold = 100                    // minimum weight for keeping a node in the the known (valuable) set
 	minRedialWait       = 10                     // minimum redial wait time in seconds
 	preNegLimit         = 5                      // maximum number of simultaneous pre-negotiation queries
-	maxQueryFails       = 100                    // number of consecutive UDP query failures before we print a warning
+	warnQueryFails      = 20                     // number of consecutive UDP query failures before we print a warning
+	maxQueryFails       = 100                    // number of consecutive UDP query failures when the chance of skipping a query reaches 50%
 )
 
 // ServerPool provides a node iterator for dial candidates. The output is a mix of newly discovered
@@ -94,16 +95,16 @@ type nodeHistoryEnc struct {
 type QueryFunc func(*enode.Node) int
 
 var (
-	clientSetup        = &nodestate.Setup{Version: 2}
-	sfHasValue         = clientSetup.NewPersistentFlag("hasValue")
-	sfQueried          = clientSetup.NewFlag("queried")
-	sfCanDial          = clientSetup.NewFlag("canDial")
-	sfDialing          = clientSetup.NewFlag("dialed")
-	sfWaitDialTimeout  = clientSetup.NewFlag("dialTimeout")
-	sfConnected        = clientSetup.NewFlag("connected")
-	sfRedialWait       = clientSetup.NewFlag("redialWait")
-	sfAlwaysConnect    = clientSetup.NewFlag("alwaysConnect")
-	sfDisableSelection = nodestate.MergeFlags(sfQueried, sfCanDial, sfDialing, sfConnected, sfRedialWait)
+	clientSetup       = &nodestate.Setup{Version: 2}
+	sfHasValue        = clientSetup.NewPersistentFlag("hasValue")
+	sfQuery           = clientSetup.NewFlag("query")
+	sfCanDial         = clientSetup.NewFlag("canDial")
+	sfDialing         = clientSetup.NewFlag("dialed")
+	sfWaitDialTimeout = clientSetup.NewFlag("dialTimeout")
+	sfConnected       = clientSetup.NewFlag("connected")
+	sfRedialWait      = clientSetup.NewFlag("redialWait")
+	sfAlwaysConnect   = clientSetup.NewFlag("alwaysConnect")
+	sfDialProcess     = nodestate.MergeFlags(sfQuery, sfCanDial, sfDialing, sfConnected, sfRedialWait)
 
 	sfiNodeHistory = clientSetup.NewPersistentField("nodeHistory", reflect.TypeOf(nodeHistory{}),
 		func(field interface{}) ([]byte, error) {
@@ -162,8 +163,8 @@ func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duratio
 	}
 	s.recalTimeout()
 	s.mixer = enode.NewFairMix(mixTimeout)
-	knownSelector := NewWrsIterator(s.ns, sfHasValue, sfDisableSelection, sfiNodeWeight)
-	alwaysConnect := NewQueueIterator(s.ns, sfAlwaysConnect, sfDisableSelection, true, nil)
+	knownSelector := NewWrsIterator(s.ns, sfHasValue, sfDialProcess, sfiNodeWeight)
+	alwaysConnect := NewQueueIterator(s.ns, sfAlwaysConnect, sfDialProcess, true, nil)
 	s.mixSources = append(s.mixSources, knownSelector)
 	s.mixSources = append(s.mixSources, alwaysConnect)
 
@@ -226,7 +227,7 @@ func (s *ServerPool) AddMetrics(
 	s.totalValueGauge = totalValueGauge
 	s.sessionValueMeter = sessionValueMeter
 	if serverSelectableGauge != nil {
-		s.ns.AddLogMetrics(sfHasValue, sfDisableSelection, "selectable", nil, nil, serverSelectableGauge)
+		s.ns.AddLogMetrics(sfHasValue, sfDialProcess, "selectable", nil, nil, serverSelectableGauge)
 	}
 	if serverDialedMeter != nil {
 		s.ns.AddLogMetrics(sfDialing, nodestate.Flags{}, "dialed", serverDialedMeter, nil, nil)
@@ -247,43 +248,51 @@ func (s *ServerPool) AddSource(source enode.Iterator) {
 // Nodes that are filtered out and does not appear on the output iterator are put back
 // into redialWait state.
 func (s *ServerPool) addPreNegFilter(input enode.Iterator, query QueryFunc) enode.Iterator {
-	s.fillSet = NewFillSet(s.ns, input, sfQueried)
-	s.ns.SubscribeState(sfQueried, func(n *enode.Node, oldState, newState nodestate.Flags) {
-		if newState.Equals(sfQueried) {
-			fails := atomic.LoadUint32(&s.queryFails)
-			if fails == maxQueryFails {
-				log.Warn("UDP pre-negotiation query does not seem to work")
+	s.fillSet = NewFillSet(s.ns, input, sfQuery)
+	s.ns.SubscribeState(sfDialProcess, func(n *enode.Node, oldState, newState nodestate.Flags) {
+		if !newState.Equals(sfQuery) {
+			if newState.HasAll(sfQuery) {
+				// remove query flag if the node is already somewhere in the dial process
+				s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0)
 			}
-			if fails > maxQueryFails {
-				fails = maxQueryFails
-			}
-			if rand.Intn(maxQueryFails*2) < int(fails) {
-				// skip pre-negotiation with increasing chance, max 50%
-				// this ensures that the client can operate even if UDP is not working at all
-				s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
-				// set canDial before resetting queried so that FillSet will not read more
-				// candidates unnecessarily
-				s.ns.SetStateSub(n, nodestate.Flags{}, sfQueried, 0)
-				return
+			return
+		}
+		fails := atomic.LoadUint32(&s.queryFails)
+		failMax := fails
+		if failMax > maxQueryFails {
+			failMax = maxQueryFails
+		}
+		if rand.Intn(maxQueryFails*2) < int(failMax) {
+			// skip pre-negotiation with increasing chance, max 50%
+			// this ensures that the client can operate even if UDP is not working at all
+			s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
+			// set canDial before resetting queried so that FillSet will not read more
+			// candidates unnecessarily
+			s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0)
+			return
+		}
+		go func() {
+			q := query(n)
+			if q == -1 {
+				atomic.AddUint32(&s.queryFails, 1)
+				fails++
+				if fails%warnQueryFails == 0 {
+					// warn if a large number of consecutive queries have failed
+					log.Warn("UDP connection queries failed", "count", fails)
+				}
+			} else {
+				atomic.StoreUint32(&s.queryFails, 0)
 			}
-			go func() {
-				q := query(n)
-				if q == -1 {
-					atomic.AddUint32(&s.queryFails, 1)
+			s.ns.Operation(func() {
+				// we are no longer running in the operation that the callback belongs to, start a new one because of setRedialWait
+				if q == 1 {
+					s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
 				} else {
-					atomic.StoreUint32(&s.queryFails, 0)
+					s.setRedialWait(n, queryCost, queryWaitStep)
 				}
-				s.ns.Operation(func() {
-					// we are no longer running in the operation that the callback belongs to, start a new one because of setRedialWait
-					if q == 1 {
-						s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
-					} else {
-						s.setRedialWait(n, queryCost, queryWaitStep)
-					}
-					s.ns.SetStateSub(n, nodestate.Flags{}, sfQueried, 0)
-				})
-			}()
-		}
+				s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0)
+			})
+		}()
 	})
 	return NewQueueIterator(s.ns, sfCanDial, nodestate.Flags{}, false, func(waiting bool) {
 		if waiting {