serverpool.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package client
  17. import (
  18. "errors"
  19. "math/rand"
  20. "reflect"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common/mclock"
  25. "github.com/ethereum/go-ethereum/ethdb"
  26. "github.com/ethereum/go-ethereum/les/utils"
  27. "github.com/ethereum/go-ethereum/log"
  28. "github.com/ethereum/go-ethereum/metrics"
  29. "github.com/ethereum/go-ethereum/p2p/enode"
  30. "github.com/ethereum/go-ethereum/p2p/enr"
  31. "github.com/ethereum/go-ethereum/p2p/nodestate"
  32. "github.com/ethereum/go-ethereum/rlp"
  33. )
  const (
	// Request timeout suggestion.
	minTimeout     = time.Millisecond * 500 // minimum request timeout suggested by the server pool
	timeoutRefresh = time.Second * 5        // recalculate timeout if older than this

	// Redial wait calculation.
	dialCost      = 10000            // cost of a TCP dial (used for known node selection weight calculation)
	dialWaitStep  = 1.5              // exponential multiplier of redial wait time when no value was provided by the server
	queryCost     = 500              // cost of a UDP pre-negotiation query
	queryWaitStep = 1.02             // exponential multiplier of redial wait time when the pre-negotiation query did not return a positive result
	waitThreshold = time.Hour * 2000 // discard known node statistics if the redial wait time reaches this threshold
	minRedialWait = 10               // minimum redial wait time in seconds

	// Node weight calculation.
	nodeWeightMul       = 1000000 // multiplier constant for node weight calculation
	nodeWeightThreshold = 100     // minimum weight for keeping a node in the known (valuable) set

	// Pre-negotiation query limits.
	preNegLimit   = 5   // maximum number of simultaneous pre-negotiation queries
	maxQueryFails = 100 // consecutive UDP query failures before a warning is printed; also caps the query skip probability at 50%
)
  48. // ServerPool provides a node iterator for dial candidates. The output is a mix of newly discovered
  49. // nodes, a weighted random selection of known (previously valuable) nodes and trusted/paid nodes.
  50. type ServerPool struct {
  51. clock mclock.Clock
  52. unixTime func() int64
  53. db ethdb.KeyValueStore
  54. ns *nodestate.NodeStateMachine
  55. vt *ValueTracker
  56. mixer *enode.FairMix
  57. mixSources []enode.Iterator
  58. dialIterator enode.Iterator
  59. validSchemes enr.IdentityScheme
  60. trustedURLs []string
  61. fillSet *FillSet
  62. started, queryFails uint32
  63. timeoutLock sync.RWMutex
  64. timeout time.Duration
  65. timeWeights ResponseTimeWeights
  66. timeoutRefreshed mclock.AbsTime
  67. suggestedTimeoutGauge, totalValueGauge metrics.Gauge
  68. sessionValueMeter metrics.Meter
  69. }
  70. // nodeHistory keeps track of dial costs which determine node weight together with the
  71. // service value calculated by ValueTracker.
  72. type nodeHistory struct {
  73. dialCost utils.ExpiredValue
  74. redialWaitStart, redialWaitEnd int64 // unix time (seconds)
  75. }
  76. type nodeHistoryEnc struct {
  77. DialCost utils.ExpiredValue
  78. RedialWaitStart, RedialWaitEnd uint64
  79. }
  80. // queryFunc sends a pre-negotiation query and blocks until a response arrives or timeout occurs.
  81. // It returns 1 if the remote node has confirmed that connection is possible, 0 if not
  82. // possible and -1 if no response arrived (timeout).
  83. type queryFunc func(*enode.Node) int
  84. var (
  85. clientSetup = &nodestate.Setup{Version: 2}
  86. sfHasValue = clientSetup.NewPersistentFlag("hasValue")
  87. sfQueried = clientSetup.NewFlag("queried")
  88. sfCanDial = clientSetup.NewFlag("canDial")
  89. sfDialing = clientSetup.NewFlag("dialed")
  90. sfWaitDialTimeout = clientSetup.NewFlag("dialTimeout")
  91. sfConnected = clientSetup.NewFlag("connected")
  92. sfRedialWait = clientSetup.NewFlag("redialWait")
  93. sfAlwaysConnect = clientSetup.NewFlag("alwaysConnect")
  94. sfDisableSelection = nodestate.MergeFlags(sfQueried, sfCanDial, sfDialing, sfConnected, sfRedialWait)
  95. sfiNodeHistory = clientSetup.NewPersistentField("nodeHistory", reflect.TypeOf(nodeHistory{}),
  96. func(field interface{}) ([]byte, error) {
  97. if n, ok := field.(nodeHistory); ok {
  98. ne := nodeHistoryEnc{
  99. DialCost: n.dialCost,
  100. RedialWaitStart: uint64(n.redialWaitStart),
  101. RedialWaitEnd: uint64(n.redialWaitEnd),
  102. }
  103. enc, err := rlp.EncodeToBytes(&ne)
  104. return enc, err
  105. }
  106. return nil, errors.New("invalid field type")
  107. },
  108. func(enc []byte) (interface{}, error) {
  109. var ne nodeHistoryEnc
  110. err := rlp.DecodeBytes(enc, &ne)
  111. n := nodeHistory{
  112. dialCost: ne.DialCost,
  113. redialWaitStart: int64(ne.RedialWaitStart),
  114. redialWaitEnd: int64(ne.RedialWaitEnd),
  115. }
  116. return n, err
  117. },
  118. )
  119. sfiNodeWeight = clientSetup.NewField("nodeWeight", reflect.TypeOf(uint64(0)))
  120. sfiConnectedStats = clientSetup.NewField("connectedStats", reflect.TypeOf(ResponseTimeStats{}))
  121. sfiLocalAddress = clientSetup.NewPersistentField("localAddress", reflect.TypeOf(&enr.Record{}),
  122. func(field interface{}) ([]byte, error) {
  123. if enr, ok := field.(*enr.Record); ok {
  124. enc, err := rlp.EncodeToBytes(enr)
  125. return enc, err
  126. }
  127. return nil, errors.New("invalid field type")
  128. },
  129. func(enc []byte) (interface{}, error) {
  130. var enr enr.Record
  131. if err := rlp.DecodeBytes(enc, &enr); err != nil {
  132. return nil, err
  133. }
  134. return &enr, nil
  135. },
  136. )
  137. )
  138. // NewServerPool creates a new server pool
  139. func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duration, query queryFunc, clock mclock.Clock, trustedURLs []string, requestList []RequestInfo) (*ServerPool, enode.Iterator) {
  140. s := &ServerPool{
  141. db: db,
  142. clock: clock,
  143. unixTime: func() int64 { return time.Now().Unix() },
  144. validSchemes: enode.ValidSchemes,
  145. trustedURLs: trustedURLs,
  146. vt: NewValueTracker(db, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
  147. ns: nodestate.NewNodeStateMachine(db, []byte(string(dbKey)+"ns:"), clock, clientSetup),
  148. }
  149. s.recalTimeout()
  150. s.mixer = enode.NewFairMix(mixTimeout)
  151. knownSelector := NewWrsIterator(s.ns, sfHasValue, sfDisableSelection, sfiNodeWeight)
  152. alwaysConnect := NewQueueIterator(s.ns, sfAlwaysConnect, sfDisableSelection, true, nil)
  153. s.mixSources = append(s.mixSources, knownSelector)
  154. s.mixSources = append(s.mixSources, alwaysConnect)
  155. s.dialIterator = s.mixer
  156. if query != nil {
  157. s.dialIterator = s.addPreNegFilter(s.dialIterator, query)
  158. }
  159. s.ns.SubscribeState(nodestate.MergeFlags(sfWaitDialTimeout, sfConnected), func(n *enode.Node, oldState, newState nodestate.Flags) {
  160. if oldState.Equals(sfWaitDialTimeout) && newState.IsEmpty() {
  161. // dial timeout, no connection
  162. s.setRedialWait(n, dialCost, dialWaitStep)
  163. s.ns.SetStateSub(n, nodestate.Flags{}, sfDialing, 0)
  164. }
  165. })
  166. return s, &serverPoolIterator{
  167. dialIterator: s.dialIterator,
  168. nextFn: func(node *enode.Node) {
  169. s.ns.Operation(func() {
  170. s.ns.SetStateSub(node, sfDialing, sfCanDial, 0)
  171. s.ns.SetStateSub(node, sfWaitDialTimeout, nodestate.Flags{}, time.Second*10)
  172. })
  173. },
  174. nodeFn: s.DialNode,
  175. }
  176. }
  177. type serverPoolIterator struct {
  178. dialIterator enode.Iterator
  179. nextFn func(*enode.Node)
  180. nodeFn func(*enode.Node) *enode.Node
  181. }
  182. // Next implements enode.Iterator
  183. func (s *serverPoolIterator) Next() bool {
  184. if s.dialIterator.Next() {
  185. s.nextFn(s.dialIterator.Node())
  186. return true
  187. }
  188. return false
  189. }
  190. // Node implements enode.Iterator
  191. func (s *serverPoolIterator) Node() *enode.Node {
  192. return s.nodeFn(s.dialIterator.Node())
  193. }
  194. // Close implements enode.Iterator
  195. func (s *serverPoolIterator) Close() {
  196. s.dialIterator.Close()
  197. }
  198. // AddMetrics adds metrics to the server pool. Should be called before Start().
  199. func (s *ServerPool) AddMetrics(
  200. suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge metrics.Gauge,
  201. sessionValueMeter, serverDialedMeter metrics.Meter) {
  202. s.suggestedTimeoutGauge = suggestedTimeoutGauge
  203. s.totalValueGauge = totalValueGauge
  204. s.sessionValueMeter = sessionValueMeter
  205. if serverSelectableGauge != nil {
  206. s.ns.AddLogMetrics(sfHasValue, sfDisableSelection, "selectable", nil, nil, serverSelectableGauge)
  207. }
  208. if serverDialedMeter != nil {
  209. s.ns.AddLogMetrics(sfDialing, nodestate.Flags{}, "dialed", serverDialedMeter, nil, nil)
  210. }
  211. if serverConnectedGauge != nil {
  212. s.ns.AddLogMetrics(sfConnected, nodestate.Flags{}, "connected", nil, nil, serverConnectedGauge)
  213. }
  214. }
  215. // AddSource adds a node discovery source to the server pool (should be called before start)
  216. func (s *ServerPool) AddSource(source enode.Iterator) {
  217. if source != nil {
  218. s.mixSources = append(s.mixSources, source)
  219. }
  220. }
  221. // addPreNegFilter installs a node filter mechanism that performs a pre-negotiation query.
  222. // Nodes that are filtered out and does not appear on the output iterator are put back
  223. // into redialWait state.
  224. func (s *ServerPool) addPreNegFilter(input enode.Iterator, query queryFunc) enode.Iterator {
  225. s.fillSet = NewFillSet(s.ns, input, sfQueried)
  226. s.ns.SubscribeState(sfQueried, func(n *enode.Node, oldState, newState nodestate.Flags) {
  227. if newState.Equals(sfQueried) {
  228. fails := atomic.LoadUint32(&s.queryFails)
  229. if fails == maxQueryFails {
  230. log.Warn("UDP pre-negotiation query does not seem to work")
  231. }
  232. if fails > maxQueryFails {
  233. fails = maxQueryFails
  234. }
  235. if rand.Intn(maxQueryFails*2) < int(fails) {
  236. // skip pre-negotiation with increasing chance, max 50%
  237. // this ensures that the client can operate even if UDP is not working at all
  238. s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
  239. // set canDial before resetting queried so that FillSet will not read more
  240. // candidates unnecessarily
  241. s.ns.SetStateSub(n, nodestate.Flags{}, sfQueried, 0)
  242. return
  243. }
  244. go func() {
  245. q := query(n)
  246. if q == -1 {
  247. atomic.AddUint32(&s.queryFails, 1)
  248. } else {
  249. atomic.StoreUint32(&s.queryFails, 0)
  250. }
  251. s.ns.Operation(func() {
  252. // we are no longer running in the operation that the callback belongs to, start a new one because of setRedialWait
  253. if q == 1 {
  254. s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
  255. } else {
  256. s.setRedialWait(n, queryCost, queryWaitStep)
  257. }
  258. s.ns.SetStateSub(n, nodestate.Flags{}, sfQueried, 0)
  259. })
  260. }()
  261. }
  262. })
  263. return NewQueueIterator(s.ns, sfCanDial, nodestate.Flags{}, false, func(waiting bool) {
  264. if waiting {
  265. s.fillSet.SetTarget(preNegLimit)
  266. } else {
  267. s.fillSet.SetTarget(0)
  268. }
  269. })
  270. }
  271. // start starts the server pool. Note that NodeStateMachine should be started first.
  272. func (s *ServerPool) Start() {
  273. s.ns.Start()
  274. for _, iter := range s.mixSources {
  275. // add sources to mixer at startup because the mixer instantly tries to read them
  276. // which should only happen after NodeStateMachine has been started
  277. s.mixer.AddSource(iter)
  278. }
  279. for _, url := range s.trustedURLs {
  280. if node, err := enode.Parse(s.validSchemes, url); err == nil {
  281. s.ns.SetState(node, sfAlwaysConnect, nodestate.Flags{}, 0)
  282. } else {
  283. log.Error("Invalid trusted server URL", "url", url, "error", err)
  284. }
  285. }
  286. unixTime := s.unixTime()
  287. s.ns.Operation(func() {
  288. s.ns.ForEach(sfHasValue, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
  289. s.calculateWeight(node)
  290. if n, ok := s.ns.GetField(node, sfiNodeHistory).(nodeHistory); ok && n.redialWaitEnd > unixTime {
  291. wait := n.redialWaitEnd - unixTime
  292. lastWait := n.redialWaitEnd - n.redialWaitStart
  293. if wait > lastWait {
  294. // if the time until expiration is larger than the last suggested
  295. // waiting time then the system clock was probably adjusted
  296. wait = lastWait
  297. }
  298. s.ns.SetStateSub(node, sfRedialWait, nodestate.Flags{}, time.Duration(wait)*time.Second)
  299. }
  300. })
  301. })
  302. atomic.StoreUint32(&s.started, 1)
  303. }
  304. // stop stops the server pool
  305. func (s *ServerPool) Stop() {
  306. if s.fillSet != nil {
  307. s.fillSet.Close()
  308. }
  309. s.ns.Operation(func() {
  310. s.ns.ForEach(sfConnected, nodestate.Flags{}, func(n *enode.Node, state nodestate.Flags) {
  311. // recalculate weight of connected nodes in order to update hasValue flag if necessary
  312. s.calculateWeight(n)
  313. })
  314. })
  315. s.ns.Stop()
  316. s.vt.Stop()
  317. }
  318. // RegisterNode implements serverPeerSubscriber
  319. func (s *ServerPool) RegisterNode(node *enode.Node) (*NodeValueTracker, error) {
  320. if atomic.LoadUint32(&s.started) == 0 {
  321. return nil, errors.New("server pool not started yet")
  322. }
  323. nvt := s.vt.Register(node.ID())
  324. s.ns.Operation(func() {
  325. s.ns.SetStateSub(node, sfConnected, sfDialing.Or(sfWaitDialTimeout), 0)
  326. s.ns.SetFieldSub(node, sfiConnectedStats, nvt.RtStats())
  327. if node.IP().IsLoopback() {
  328. s.ns.SetFieldSub(node, sfiLocalAddress, node.Record())
  329. }
  330. })
  331. return nvt, nil
  332. }
  333. // UnregisterNode implements serverPeerSubscriber
  334. func (s *ServerPool) UnregisterNode(node *enode.Node) {
  335. s.ns.Operation(func() {
  336. s.setRedialWait(node, dialCost, dialWaitStep)
  337. s.ns.SetStateSub(node, nodestate.Flags{}, sfConnected, 0)
  338. s.ns.SetFieldSub(node, sfiConnectedStats, nil)
  339. })
  340. s.vt.Unregister(node.ID())
  341. }
  342. // recalTimeout calculates the current recommended timeout. This value is used by
  343. // the client as a "soft timeout" value. It also affects the service value calculation
  344. // of individual nodes.
  345. func (s *ServerPool) recalTimeout() {
  346. // Use cached result if possible, avoid recalculating too frequently.
  347. s.timeoutLock.RLock()
  348. refreshed := s.timeoutRefreshed
  349. s.timeoutLock.RUnlock()
  350. now := s.clock.Now()
  351. if refreshed != 0 && time.Duration(now-refreshed) < timeoutRefresh {
  352. return
  353. }
  354. // Cached result is stale, recalculate a new one.
  355. rts := s.vt.RtStats()
  356. // Add a fake statistic here. It is an easy way to initialize with some
  357. // conservative values when the database is new. As soon as we have a
  358. // considerable amount of real stats this small value won't matter.
  359. rts.Add(time.Second*2, 10, s.vt.StatsExpFactor())
  360. // Use either 10% failure rate timeout or twice the median response time
  361. // as the recommended timeout.
  362. timeout := minTimeout
  363. if t := rts.Timeout(0.1); t > timeout {
  364. timeout = t
  365. }
  366. if t := rts.Timeout(0.5) * 2; t > timeout {
  367. timeout = t
  368. }
  369. s.timeoutLock.Lock()
  370. if s.timeout != timeout {
  371. s.timeout = timeout
  372. s.timeWeights = TimeoutWeights(s.timeout)
  373. if s.suggestedTimeoutGauge != nil {
  374. s.suggestedTimeoutGauge.Update(int64(s.timeout / time.Millisecond))
  375. }
  376. if s.totalValueGauge != nil {
  377. s.totalValueGauge.Update(int64(rts.Value(s.timeWeights, s.vt.StatsExpFactor())))
  378. }
  379. }
  380. s.timeoutRefreshed = now
  381. s.timeoutLock.Unlock()
  382. }
  383. // GetTimeout returns the recommended request timeout.
  384. func (s *ServerPool) GetTimeout() time.Duration {
  385. s.recalTimeout()
  386. s.timeoutLock.RLock()
  387. defer s.timeoutLock.RUnlock()
  388. return s.timeout
  389. }
  390. // getTimeoutAndWeight returns the recommended request timeout as well as the
  391. // response time weight which is necessary to calculate service value.
  392. func (s *ServerPool) getTimeoutAndWeight() (time.Duration, ResponseTimeWeights) {
  393. s.recalTimeout()
  394. s.timeoutLock.RLock()
  395. defer s.timeoutLock.RUnlock()
  396. return s.timeout, s.timeWeights
  397. }
  398. // addDialCost adds the given amount of dial cost to the node history and returns the current
  399. // amount of total dial cost
  400. func (s *ServerPool) addDialCost(n *nodeHistory, amount int64) uint64 {
  401. logOffset := s.vt.StatsExpirer().LogOffset(s.clock.Now())
  402. if amount > 0 {
  403. n.dialCost.Add(amount, logOffset)
  404. }
  405. totalDialCost := n.dialCost.Value(logOffset)
  406. if totalDialCost < dialCost {
  407. totalDialCost = dialCost
  408. }
  409. return totalDialCost
  410. }
  411. // serviceValue returns the service value accumulated in this session and in total
  412. func (s *ServerPool) serviceValue(node *enode.Node) (sessionValue, totalValue float64) {
  413. nvt := s.vt.GetNode(node.ID())
  414. if nvt == nil {
  415. return 0, 0
  416. }
  417. currentStats := nvt.RtStats()
  418. _, timeWeights := s.getTimeoutAndWeight()
  419. expFactor := s.vt.StatsExpFactor()
  420. totalValue = currentStats.Value(timeWeights, expFactor)
  421. if connStats, ok := s.ns.GetField(node, sfiConnectedStats).(ResponseTimeStats); ok {
  422. diff := currentStats
  423. diff.SubStats(&connStats)
  424. sessionValue = diff.Value(timeWeights, expFactor)
  425. if s.sessionValueMeter != nil {
  426. s.sessionValueMeter.Mark(int64(sessionValue))
  427. }
  428. }
  429. return
  430. }
  431. // updateWeight calculates the node weight and updates the nodeWeight field and the
  432. // hasValue flag. It also saves the node state if necessary.
  433. // Note: this function should run inside a NodeStateMachine operation
  434. func (s *ServerPool) updateWeight(node *enode.Node, totalValue float64, totalDialCost uint64) {
  435. weight := uint64(totalValue * nodeWeightMul / float64(totalDialCost))
  436. if weight >= nodeWeightThreshold {
  437. s.ns.SetStateSub(node, sfHasValue, nodestate.Flags{}, 0)
  438. s.ns.SetFieldSub(node, sfiNodeWeight, weight)
  439. } else {
  440. s.ns.SetStateSub(node, nodestate.Flags{}, sfHasValue, 0)
  441. s.ns.SetFieldSub(node, sfiNodeWeight, nil)
  442. s.ns.SetFieldSub(node, sfiNodeHistory, nil)
  443. s.ns.SetFieldSub(node, sfiLocalAddress, nil)
  444. }
  445. s.ns.Persist(node) // saved if node history or hasValue changed
  446. }
  447. // setRedialWait calculates and sets the redialWait timeout based on the service value
  448. // and dial cost accumulated during the last session/attempt and in total.
  449. // The waiting time is raised exponentially if no service value has been received in order
  450. // to prevent dialing an unresponsive node frequently for a very long time just because it
  451. // was useful in the past. It can still be occasionally dialed though and once it provides
  452. // a significant amount of service value again its waiting time is quickly reduced or reset
  453. // to the minimum.
  454. // Note: node weight is also recalculated and updated by this function.
  455. // Note 2: this function should run inside a NodeStateMachine operation
  456. func (s *ServerPool) setRedialWait(node *enode.Node, addDialCost int64, waitStep float64) {
  457. n, _ := s.ns.GetField(node, sfiNodeHistory).(nodeHistory)
  458. sessionValue, totalValue := s.serviceValue(node)
  459. totalDialCost := s.addDialCost(&n, addDialCost)
  460. // if the current dial session has yielded at least the average value/dial cost ratio
  461. // then the waiting time should be reset to the minimum. If the session value
  462. // is below average but still positive then timeout is limited to the ratio of
  463. // average / current service value multiplied by the minimum timeout. If the attempt
  464. // was unsuccessful then timeout is raised exponentially without limitation.
  465. // Note: dialCost is used in the formula below even if dial was not attempted at all
  466. // because the pre-negotiation query did not return a positive result. In this case
  467. // the ratio has no meaning anyway and waitFactor is always raised, though in smaller
  468. // steps because queries are cheaper and therefore we can allow more failed attempts.
  469. unixTime := s.unixTime()
  470. plannedTimeout := float64(n.redialWaitEnd - n.redialWaitStart) // last planned redialWait timeout
  471. var actualWait float64 // actual waiting time elapsed
  472. if unixTime > n.redialWaitEnd {
  473. // the planned timeout has elapsed
  474. actualWait = plannedTimeout
  475. } else {
  476. // if the node was redialed earlier then we do not raise the planned timeout
  477. // exponentially because that could lead to the timeout rising very high in
  478. // a short amount of time
  479. // Note that in case of an early redial actualWait also includes the dial
  480. // timeout or connection time of the last attempt but it still serves its
  481. // purpose of preventing the timeout rising quicker than linearly as a function
  482. // of total time elapsed without a successful connection.
  483. actualWait = float64(unixTime - n.redialWaitStart)
  484. }
  485. // raise timeout exponentially if the last planned timeout has elapsed
  486. // (use at least the last planned timeout otherwise)
  487. nextTimeout := actualWait * waitStep
  488. if plannedTimeout > nextTimeout {
  489. nextTimeout = plannedTimeout
  490. }
  491. // we reduce the waiting time if the server has provided service value during the
  492. // connection (but never under the minimum)
  493. a := totalValue * dialCost * float64(minRedialWait)
  494. b := float64(totalDialCost) * sessionValue
  495. if a < b*nextTimeout {
  496. nextTimeout = a / b
  497. }
  498. if nextTimeout < minRedialWait {
  499. nextTimeout = minRedialWait
  500. }
  501. wait := time.Duration(float64(time.Second) * nextTimeout)
  502. if wait < waitThreshold {
  503. n.redialWaitStart = unixTime
  504. n.redialWaitEnd = unixTime + int64(nextTimeout)
  505. s.ns.SetFieldSub(node, sfiNodeHistory, n)
  506. s.ns.SetStateSub(node, sfRedialWait, nodestate.Flags{}, wait)
  507. s.updateWeight(node, totalValue, totalDialCost)
  508. } else {
  509. // discard known node statistics if waiting time is very long because the node
  510. // hasn't been responsive for a very long time
  511. s.ns.SetFieldSub(node, sfiNodeHistory, nil)
  512. s.ns.SetFieldSub(node, sfiNodeWeight, nil)
  513. s.ns.SetStateSub(node, nodestate.Flags{}, sfHasValue, 0)
  514. }
  515. }
  516. // calculateWeight calculates and sets the node weight without altering the node history.
  517. // This function should be called during startup and shutdown only, otherwise setRedialWait
  518. // will keep the weights updated as the underlying statistics are adjusted.
  519. // Note: this function should run inside a NodeStateMachine operation
  520. func (s *ServerPool) calculateWeight(node *enode.Node) {
  521. n, _ := s.ns.GetField(node, sfiNodeHistory).(nodeHistory)
  522. _, totalValue := s.serviceValue(node)
  523. totalDialCost := s.addDialCost(&n, 0)
  524. s.updateWeight(node, totalValue, totalDialCost)
  525. }
  526. // API returns the vflux client API
  527. func (s *ServerPool) API() *PrivateClientAPI {
  528. return NewPrivateClientAPI(s.vt)
  529. }
  530. type dummyIdentity enode.ID
  531. func (id dummyIdentity) Verify(r *enr.Record, sig []byte) error { return nil }
  532. func (id dummyIdentity) NodeAddr(r *enr.Record) []byte { return id[:] }
  533. // DialNode replaces the given enode with a locally generated one containing the ENR
  534. // stored in the sfiLocalAddress field if present. This workaround ensures that nodes
  535. // on the local network can be dialed at the local address if a connection has been
  536. // successfully established previously.
  537. // Note that NodeStateMachine always remembers the enode with the latest version of
  538. // the remote signed ENR. ENR filtering should be performed on that version while
  539. // dialNode should be used for dialing the node over TCP or UDP.
  540. func (s *ServerPool) DialNode(n *enode.Node) *enode.Node {
  541. if enr, ok := s.ns.GetField(n, sfiLocalAddress).(*enr.Record); ok {
  542. n, _ := enode.New(dummyIdentity(n.ID()), enr)
  543. return n
  544. }
  545. return n
  546. }
  547. // Persist immediately stores the state of a node in the node database
  548. func (s *ServerPool) Persist(n *enode.Node) {
  549. s.ns.Persist(n)
  550. }