serverpool.go 25 KB


  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "io"
  20. "math"
  21. "math/rand"
  22. "net"
  23. "strconv"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common/mclock"
  27. "github.com/ethereum/go-ethereum/ethdb"
  28. "github.com/ethereum/go-ethereum/logger"
  29. "github.com/ethereum/go-ethereum/logger/glog"
  30. "github.com/ethereum/go-ethereum/p2p"
  31. "github.com/ethereum/go-ethereum/p2p/discover"
  32. "github.com/ethereum/go-ethereum/p2p/discv5"
  33. "github.com/ethereum/go-ethereum/rlp"
  34. )
  35. const (
  36. // After a connection has been ended or timed out, there is a waiting period
  37. // before it can be selected for connection again.
  38. // waiting period = base delay * (1 + random(1))
  39. // base delay = shortRetryDelay for the first shortRetryCnt times after a
  40. // successful connection, after that longRetryDelay is applied
  41. shortRetryCnt = 5
  42. shortRetryDelay = time.Second * 5
  43. longRetryDelay = time.Minute * 10
  44. // maxNewEntries is the maximum number of newly discovered (never connected) nodes.
  45. // If the limit is reached, the least recently discovered one is thrown out.
  46. maxNewEntries = 1000
  47. // maxKnownEntries is the maximum number of known (already connected) nodes.
  48. // If the limit is reached, the least recently connected one is thrown out.
  49. // (not that unlike new entries, known entries are persistent)
  50. maxKnownEntries = 1000
  51. // target for simultaneously connected servers
  52. targetServerCount = 5
  53. // target for servers selected from the known table
  54. // (we leave room for trying new ones if there is any)
  55. targetKnownSelect = 3
  56. // after dialTimeout, consider the server unavailable and adjust statistics
  57. dialTimeout = time.Second * 30
  58. // targetConnTime is the minimum expected connection duration before a server
  59. // drops a client without any specific reason
  60. targetConnTime = time.Minute * 10
  61. // new entry selection weight calculation based on most recent discovery time:
  62. // unity until discoverExpireStart, then exponential decay with discoverExpireConst
  63. discoverExpireStart = time.Minute * 20
  64. discoverExpireConst = time.Minute * 20
  65. // known entry selection weight is dropped by a factor of exp(-failDropLn) after
  66. // each unsuccessful connection (restored after a successful one)
  67. failDropLn = 0.1
  68. // known node connection success and quality statistics have a long term average
  69. // and a short term value which is adjusted exponentially with a factor of
  70. // pstatRecentAdjust with each dial/connection and also returned exponentially
  71. // to the average with the time constant pstatReturnToMeanTC
  72. pstatRecentAdjust = 0.1
  73. pstatReturnToMeanTC = time.Hour
  74. // node address selection weight is dropped by a factor of exp(-addrFailDropLn) after
  75. // each unsuccessful connection (restored after a successful one)
  76. addrFailDropLn = math.Ln2
  77. // responseScoreTC and delayScoreTC are exponential decay time constants for
  78. // calculating selection chances from response times and block delay times
  79. responseScoreTC = time.Millisecond * 100
  80. delayScoreTC = time.Second * 5
  81. timeoutPow = 10
  82. // peerSelectMinWeight is added to calculated weights at request peer selection
  83. // to give poorly performing peers a little chance of coming back
  84. peerSelectMinWeight = 0.005
  85. // initStatsWeight is used to initialize previously unknown peers with good
  86. // statistics to give a chance to prove themselves
  87. initStatsWeight = 1
  88. )
  89. // serverPool implements a pool for storing and selecting newly discovered and already
  90. // known light server nodes. It received discovered nodes, stores statistics about
  91. // known nodes and takes care of always having enough good quality servers connected.
  92. type serverPool struct {
  93. db ethdb.Database
  94. dbKey []byte
  95. server *p2p.Server
  96. quit chan struct{}
  97. wg *sync.WaitGroup
  98. connWg sync.WaitGroup
  99. discSetPeriod chan time.Duration
  100. discNodes chan *discv5.Node
  101. discLookups chan bool
  102. entries map[discover.NodeID]*poolEntry
  103. lock sync.Mutex
  104. timeout, enableRetry chan *poolEntry
  105. adjustStats chan poolStatAdjust
  106. knownQueue, newQueue poolEntryQueue
  107. knownSelect, newSelect *weightedRandomSelect
  108. knownSelected, newSelected int
  109. fastDiscover bool
  110. }
  111. // newServerPool creates a new serverPool instance
  112. func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic discv5.Topic, quit chan struct{}, wg *sync.WaitGroup) *serverPool {
  113. pool := &serverPool{
  114. db: db,
  115. dbKey: append(dbPrefix, []byte(topic)...),
  116. server: server,
  117. quit: quit,
  118. wg: wg,
  119. entries: make(map[discover.NodeID]*poolEntry),
  120. timeout: make(chan *poolEntry, 1),
  121. adjustStats: make(chan poolStatAdjust, 100),
  122. enableRetry: make(chan *poolEntry, 1),
  123. knownSelect: newWeightedRandomSelect(),
  124. newSelect: newWeightedRandomSelect(),
  125. fastDiscover: true,
  126. }
  127. pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry)
  128. pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry)
  129. wg.Add(1)
  130. pool.loadNodes()
  131. pool.checkDial()
  132. if pool.server.DiscV5 != nil {
  133. pool.discSetPeriod = make(chan time.Duration, 1)
  134. pool.discNodes = make(chan *discv5.Node, 100)
  135. pool.discLookups = make(chan bool, 100)
  136. go pool.server.DiscV5.SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
  137. }
  138. go pool.eventLoop()
  139. return pool
  140. }
  141. // connect should be called upon any incoming connection. If the connection has been
  142. // dialed by the server pool recently, the appropriate pool entry is returned.
  143. // Otherwise, the connection should be rejected.
  144. // Note that whenever a connection has been accepted and a pool entry has been returned,
  145. // disconnect should also always be called.
  146. func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
  147. pool.lock.Lock()
  148. defer pool.lock.Unlock()
  149. entry := pool.entries[p.ID()]
  150. if entry == nil {
  151. return nil
  152. }
  153. glog.V(logger.Debug).Infof("connecting to %v, state: %v", p.id, entry.state)
  154. if entry.state != psDialed {
  155. return nil
  156. }
  157. pool.connWg.Add(1)
  158. entry.peer = p
  159. entry.state = psConnected
  160. addr := &poolEntryAddress{
  161. ip: ip,
  162. port: port,
  163. lastSeen: mclock.Now(),
  164. }
  165. entry.lastConnected = addr
  166. entry.addr = make(map[string]*poolEntryAddress)
  167. entry.addr[addr.strKey()] = addr
  168. entry.addrSelect = *newWeightedRandomSelect()
  169. entry.addrSelect.update(addr)
  170. return entry
  171. }
  172. // registered should be called after a successful handshake
  173. func (pool *serverPool) registered(entry *poolEntry) {
  174. glog.V(logger.Debug).Infof("registered %v", entry.id.String())
  175. pool.lock.Lock()
  176. defer pool.lock.Unlock()
  177. entry.state = psRegistered
  178. entry.regTime = mclock.Now()
  179. if !entry.known {
  180. pool.newQueue.remove(entry)
  181. entry.known = true
  182. }
  183. pool.knownQueue.setLatest(entry)
  184. entry.shortRetry = shortRetryCnt
  185. }
  186. // disconnect should be called when ending a connection. Service quality statistics
  187. // can be updated optionally (not updated if no registration happened, in this case
  188. // only connection statistics are updated, just like in case of timeout)
  189. func (pool *serverPool) disconnect(entry *poolEntry) {
  190. glog.V(logger.Debug).Infof("disconnected %v", entry.id.String())
  191. pool.lock.Lock()
  192. defer pool.lock.Unlock()
  193. if entry.state == psRegistered {
  194. connTime := mclock.Now() - entry.regTime
  195. connAdjust := float64(connTime) / float64(targetConnTime)
  196. if connAdjust > 1 {
  197. connAdjust = 1
  198. }
  199. stopped := false
  200. select {
  201. case <-pool.quit:
  202. stopped = true
  203. default:
  204. }
  205. if stopped {
  206. entry.connectStats.add(1, connAdjust)
  207. } else {
  208. entry.connectStats.add(connAdjust, 1)
  209. }
  210. }
  211. entry.state = psNotConnected
  212. if entry.knownSelected {
  213. pool.knownSelected--
  214. } else {
  215. pool.newSelected--
  216. }
  217. pool.setRetryDial(entry)
  218. pool.connWg.Done()
  219. }
  220. const (
  221. pseBlockDelay = iota
  222. pseResponseTime
  223. pseResponseTimeout
  224. )
  225. // poolStatAdjust records are sent to adjust peer block delay/response time statistics
  226. type poolStatAdjust struct {
  227. adjustType int
  228. entry *poolEntry
  229. time time.Duration
  230. }
  231. // adjustBlockDelay adjusts the block announce delay statistics of a node
  232. func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) {
  233. pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time}
  234. }
  235. // adjustResponseTime adjusts the request response time statistics of a node
  236. func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) {
  237. if timeout {
  238. pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time}
  239. } else {
  240. pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time}
  241. }
  242. }
  243. type selectPeerItem struct {
  244. peer *peer
  245. weight int64
  246. wait time.Duration
  247. }
  248. func (sp selectPeerItem) Weight() int64 {
  249. return sp.weight
  250. }
  251. // selectPeer selects a suitable peer for a request, also returning a necessary waiting time to perform the request
  252. // and a "locked" flag meaning that the request has been assigned to the given peer and its execution is guaranteed
  253. // after the given waiting time. If locked flag is false, selectPeer should be called again after the waiting time.
  254. func (pool *serverPool) selectPeer(reqID uint64, canSend func(*peer) (bool, time.Duration)) (*peer, time.Duration, bool) {
  255. pool.lock.Lock()
  256. type selectPeer struct {
  257. peer *peer
  258. rstat, tstat float64
  259. }
  260. var list []selectPeer
  261. sel := newWeightedRandomSelect()
  262. for _, entry := range pool.entries {
  263. if entry.state == psRegistered {
  264. if !entry.peer.fcServer.IsAssigned() {
  265. list = append(list, selectPeer{entry.peer, entry.responseStats.recentAvg(), entry.timeoutStats.recentAvg()})
  266. }
  267. }
  268. }
  269. pool.lock.Unlock()
  270. for _, sp := range list {
  271. ok, wait := canSend(sp.peer)
  272. if ok {
  273. w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(sp.rstat+float64(wait))/float64(responseScoreTC))*math.Pow((1-sp.tstat), timeoutPow)))
  274. sel.update(selectPeerItem{peer: sp.peer, weight: w, wait: wait})
  275. }
  276. }
  277. choice := sel.choose()
  278. if choice == nil {
  279. return nil, 0, false
  280. }
  281. peer, wait := choice.(selectPeerItem).peer, choice.(selectPeerItem).wait
  282. locked := false
  283. if wait < time.Millisecond*100 {
  284. if peer.fcServer.AssignRequest(reqID) {
  285. ok, w := canSend(peer)
  286. wait = time.Duration(w)
  287. if ok && wait < time.Millisecond*100 {
  288. locked = true
  289. } else {
  290. peer.fcServer.DeassignRequest(reqID)
  291. wait = time.Millisecond * 100
  292. }
  293. }
  294. } else {
  295. wait = time.Millisecond * 100
  296. }
  297. return peer, wait, locked
  298. }
  299. // selectPeer selects a suitable peer for a request, waiting until an assignment to
  300. // the request is guaranteed or the process is aborted.
  301. func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool, time.Duration), abort <-chan struct{}) *peer {
  302. for {
  303. peer, wait, locked := pool.selectPeer(reqID, canSend)
  304. if locked {
  305. return peer
  306. }
  307. select {
  308. case <-abort:
  309. return nil
  310. case <-time.After(wait):
  311. }
  312. }
  313. }
  314. // eventLoop handles pool events and mutex locking for all internal functions
  315. func (pool *serverPool) eventLoop() {
  316. lookupCnt := 0
  317. var convTime mclock.AbsTime
  318. pool.discSetPeriod <- time.Millisecond * 100
  319. for {
  320. select {
  321. case entry := <-pool.timeout:
  322. pool.lock.Lock()
  323. if !entry.removed {
  324. pool.checkDialTimeout(entry)
  325. }
  326. pool.lock.Unlock()
  327. case entry := <-pool.enableRetry:
  328. pool.lock.Lock()
  329. if !entry.removed {
  330. entry.delayedRetry = false
  331. pool.updateCheckDial(entry)
  332. }
  333. pool.lock.Unlock()
  334. case adj := <-pool.adjustStats:
  335. pool.lock.Lock()
  336. switch adj.adjustType {
  337. case pseBlockDelay:
  338. adj.entry.delayStats.add(float64(adj.time), 1)
  339. case pseResponseTime:
  340. adj.entry.responseStats.add(float64(adj.time), 1)
  341. adj.entry.timeoutStats.add(0, 1)
  342. case pseResponseTimeout:
  343. adj.entry.timeoutStats.add(1, 1)
  344. }
  345. pool.lock.Unlock()
  346. case node := <-pool.discNodes:
  347. pool.lock.Lock()
  348. now := mclock.Now()
  349. id := discover.NodeID(node.ID)
  350. entry := pool.entries[id]
  351. if entry == nil {
  352. glog.V(logger.Debug).Infof("discovered %v", node.String())
  353. entry = &poolEntry{
  354. id: id,
  355. addr: make(map[string]*poolEntryAddress),
  356. addrSelect: *newWeightedRandomSelect(),
  357. shortRetry: shortRetryCnt,
  358. }
  359. pool.entries[id] = entry
  360. // initialize previously unknown peers with good statistics to give a chance to prove themselves
  361. entry.connectStats.add(1, initStatsWeight)
  362. entry.delayStats.add(0, initStatsWeight)
  363. entry.responseStats.add(0, initStatsWeight)
  364. entry.timeoutStats.add(0, initStatsWeight)
  365. }
  366. entry.lastDiscovered = now
  367. addr := &poolEntryAddress{
  368. ip: node.IP,
  369. port: node.TCP,
  370. }
  371. if a, ok := entry.addr[addr.strKey()]; ok {
  372. addr = a
  373. } else {
  374. entry.addr[addr.strKey()] = addr
  375. }
  376. addr.lastSeen = now
  377. entry.addrSelect.update(addr)
  378. if !entry.known {
  379. pool.newQueue.setLatest(entry)
  380. }
  381. pool.updateCheckDial(entry)
  382. pool.lock.Unlock()
  383. case conv := <-pool.discLookups:
  384. if conv {
  385. if lookupCnt == 0 {
  386. convTime = mclock.Now()
  387. }
  388. lookupCnt++
  389. if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) {
  390. pool.fastDiscover = false
  391. pool.discSetPeriod <- time.Minute
  392. }
  393. }
  394. case <-pool.quit:
  395. close(pool.discSetPeriod)
  396. pool.connWg.Wait()
  397. pool.saveNodes()
  398. pool.wg.Done()
  399. return
  400. }
  401. }
  402. }
  403. // loadNodes loads known nodes and their statistics from the database
  404. func (pool *serverPool) loadNodes() {
  405. enc, err := pool.db.Get(pool.dbKey)
  406. if err != nil {
  407. return
  408. }
  409. var list []*poolEntry
  410. err = rlp.DecodeBytes(enc, &list)
  411. if err != nil {
  412. glog.V(logger.Debug).Infof("node list decode error: %v", err)
  413. return
  414. }
  415. for _, e := range list {
  416. glog.V(logger.Debug).Infof("loaded server stats %016x fails: %v connStats: %v / %v delayStats: %v / %v responseStats: %v / %v timeoutStats: %v / %v", e.id[0:8], e.lastConnected.fails, e.connectStats.avg, e.connectStats.weight, time.Duration(e.delayStats.avg), e.delayStats.weight, time.Duration(e.responseStats.avg), e.responseStats.weight, e.timeoutStats.avg, e.timeoutStats.weight)
  417. pool.entries[e.id] = e
  418. pool.knownQueue.setLatest(e)
  419. pool.knownSelect.update((*knownEntry)(e))
  420. }
  421. }
  422. // saveNodes saves known nodes and their statistics into the database. Nodes are
  423. // ordered from least to most recently connected.
  424. func (pool *serverPool) saveNodes() {
  425. list := make([]*poolEntry, len(pool.knownQueue.queue))
  426. for i := range list {
  427. list[i] = pool.knownQueue.fetchOldest()
  428. }
  429. enc, err := rlp.EncodeToBytes(list)
  430. if err == nil {
  431. pool.db.Put(pool.dbKey, enc)
  432. }
  433. }
  434. // removeEntry removes a pool entry when the entry count limit is reached.
  435. // Note that it is called by the new/known queues from which the entry has already
  436. // been removed so removing it from the queues is not necessary.
  437. func (pool *serverPool) removeEntry(entry *poolEntry) {
  438. pool.newSelect.remove((*discoveredEntry)(entry))
  439. pool.knownSelect.remove((*knownEntry)(entry))
  440. entry.removed = true
  441. delete(pool.entries, entry.id)
  442. }
  443. // setRetryDial starts the timer which will enable dialing a certain node again
  444. func (pool *serverPool) setRetryDial(entry *poolEntry) {
  445. delay := longRetryDelay
  446. if entry.shortRetry > 0 {
  447. entry.shortRetry--
  448. delay = shortRetryDelay
  449. }
  450. delay += time.Duration(rand.Int63n(int64(delay) + 1))
  451. entry.delayedRetry = true
  452. go func() {
  453. select {
  454. case <-pool.quit:
  455. case <-time.After(delay):
  456. select {
  457. case <-pool.quit:
  458. case pool.enableRetry <- entry:
  459. }
  460. }
  461. }()
  462. }
  463. // updateCheckDial is called when an entry can potentially be dialed again. It updates
  464. // its selection weights and checks if new dials can/should be made.
  465. func (pool *serverPool) updateCheckDial(entry *poolEntry) {
  466. pool.newSelect.update((*discoveredEntry)(entry))
  467. pool.knownSelect.update((*knownEntry)(entry))
  468. pool.checkDial()
  469. }
  470. // checkDial checks if new dials can/should be made. It tries to select servers both
  471. // based on good statistics and recent discovery.
  472. func (pool *serverPool) checkDial() {
  473. fillWithKnownSelects := !pool.fastDiscover
  474. for pool.knownSelected < targetKnownSelect {
  475. entry := pool.knownSelect.choose()
  476. if entry == nil {
  477. fillWithKnownSelects = false
  478. break
  479. }
  480. pool.dial((*poolEntry)(entry.(*knownEntry)), true)
  481. }
  482. for pool.knownSelected+pool.newSelected < targetServerCount {
  483. entry := pool.newSelect.choose()
  484. if entry == nil {
  485. break
  486. }
  487. pool.dial((*poolEntry)(entry.(*discoveredEntry)), false)
  488. }
  489. if fillWithKnownSelects {
  490. // no more newly discovered nodes to select and since fast discover period
  491. // is over, we probably won't find more in the near future so select more
  492. // known entries if possible
  493. for pool.knownSelected < targetServerCount {
  494. entry := pool.knownSelect.choose()
  495. if entry == nil {
  496. break
  497. }
  498. pool.dial((*poolEntry)(entry.(*knownEntry)), true)
  499. }
  500. }
  501. }
  502. // dial initiates a new connection
  503. func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) {
  504. if entry.state != psNotConnected {
  505. return
  506. }
  507. entry.state = psDialed
  508. entry.knownSelected = knownSelected
  509. if knownSelected {
  510. pool.knownSelected++
  511. } else {
  512. pool.newSelected++
  513. }
  514. addr := entry.addrSelect.choose().(*poolEntryAddress)
  515. glog.V(logger.Debug).Infof("dialing %v out of %v, known: %v", entry.id.String()+"@"+addr.strKey(), len(entry.addr), knownSelected)
  516. entry.dialed = addr
  517. go func() {
  518. pool.server.AddPeer(discover.NewNode(entry.id, addr.ip, addr.port, addr.port))
  519. select {
  520. case <-pool.quit:
  521. case <-time.After(dialTimeout):
  522. select {
  523. case <-pool.quit:
  524. case pool.timeout <- entry:
  525. }
  526. }
  527. }()
  528. }
  529. // checkDialTimeout checks if the node is still in dialed state and if so, resets it
  530. // and adjusts connection statistics accordingly.
  531. func (pool *serverPool) checkDialTimeout(entry *poolEntry) {
  532. if entry.state != psDialed {
  533. return
  534. }
  535. glog.V(logger.Debug).Infof("timeout %v", entry.id.String()+"@"+entry.dialed.strKey())
  536. entry.state = psNotConnected
  537. if entry.knownSelected {
  538. pool.knownSelected--
  539. } else {
  540. pool.newSelected--
  541. }
  542. entry.connectStats.add(0, 1)
  543. entry.dialed.fails++
  544. pool.setRetryDial(entry)
  545. }
  546. const (
  547. psNotConnected = iota
  548. psDialed
  549. psConnected
  550. psRegistered
  551. )
  552. // poolEntry represents a server node and stores its current state and statistics.
  553. type poolEntry struct {
  554. peer *peer
  555. id discover.NodeID
  556. addr map[string]*poolEntryAddress
  557. lastConnected, dialed *poolEntryAddress
  558. addrSelect weightedRandomSelect
  559. lastDiscovered mclock.AbsTime
  560. known, knownSelected bool
  561. connectStats, delayStats poolStats
  562. responseStats, timeoutStats poolStats
  563. state int
  564. regTime mclock.AbsTime
  565. queueIdx int
  566. removed bool
  567. delayedRetry bool
  568. shortRetry int
  569. }
  570. func (e *poolEntry) EncodeRLP(w io.Writer) error {
  571. return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.delayStats, &e.responseStats, &e.timeoutStats})
  572. }
  573. func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
  574. var entry struct {
  575. ID discover.NodeID
  576. IP net.IP
  577. Port uint16
  578. Fails uint
  579. CStat, DStat, RStat, TStat poolStats
  580. }
  581. if err := s.Decode(&entry); err != nil {
  582. return err
  583. }
  584. addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()}
  585. e.id = entry.ID
  586. e.addr = make(map[string]*poolEntryAddress)
  587. e.addr[addr.strKey()] = addr
  588. e.addrSelect = *newWeightedRandomSelect()
  589. e.addrSelect.update(addr)
  590. e.lastConnected = addr
  591. e.connectStats = entry.CStat
  592. e.delayStats = entry.DStat
  593. e.responseStats = entry.RStat
  594. e.timeoutStats = entry.TStat
  595. e.shortRetry = shortRetryCnt
  596. e.known = true
  597. return nil
  598. }
  599. // discoveredEntry implements wrsItem
  600. type discoveredEntry poolEntry
  601. // Weight calculates random selection weight for newly discovered entries
  602. func (e *discoveredEntry) Weight() int64 {
  603. if e.state != psNotConnected || e.delayedRetry {
  604. return 0
  605. }
  606. t := time.Duration(mclock.Now() - e.lastDiscovered)
  607. if t <= discoverExpireStart {
  608. return 1000000000
  609. } else {
  610. return int64(1000000000 * math.Exp(-float64(t-discoverExpireStart)/float64(discoverExpireConst)))
  611. }
  612. }
  613. // knownEntry implements wrsItem
  614. type knownEntry poolEntry
  615. // Weight calculates random selection weight for known entries
  616. func (e *knownEntry) Weight() int64 {
  617. if e.state != psNotConnected || !e.known || e.delayedRetry {
  618. return 0
  619. }
  620. return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow((1-e.timeoutStats.recentAvg()), timeoutPow))
  621. }
  622. // poolEntryAddress is a separate object because currently it is necessary to remember
  623. // multiple potential network addresses for a pool entry. This will be removed after
  624. // the final implementation of v5 discovery which will retrieve signed and serial
  625. // numbered advertisements, making it clear which IP/port is the latest one.
  626. type poolEntryAddress struct {
  627. ip net.IP
  628. port uint16
  629. lastSeen mclock.AbsTime // last time it was discovered, connected or loaded from db
  630. fails uint // connection failures since last successful connection (persistent)
  631. }
  632. func (a *poolEntryAddress) Weight() int64 {
  633. t := time.Duration(mclock.Now() - a.lastSeen)
  634. return int64(1000000*math.Exp(-float64(t)/float64(discoverExpireConst)-float64(a.fails)*addrFailDropLn)) + 1
  635. }
  636. func (a *poolEntryAddress) strKey() string {
  637. return a.ip.String() + ":" + strconv.Itoa(int(a.port))
  638. }
  639. // poolStats implement statistics for a certain quantity with a long term average
  640. // and a short term value which is adjusted exponentially with a factor of
  641. // pstatRecentAdjust with each update and also returned exponentially to the
  642. // average with the time constant pstatReturnToMeanTC
  643. type poolStats struct {
  644. sum, weight, avg, recent float64
  645. lastRecalc mclock.AbsTime
  646. }
  647. // init initializes stats with a long term sum/update count pair retrieved from the database
  648. func (s *poolStats) init(sum, weight float64) {
  649. s.sum = sum
  650. s.weight = weight
  651. var avg float64
  652. if weight > 0 {
  653. avg = s.sum / weight
  654. }
  655. s.avg = avg
  656. s.recent = avg
  657. s.lastRecalc = mclock.Now()
  658. }
  659. // recalc recalculates recent value return-to-mean and long term average
  660. func (s *poolStats) recalc() {
  661. now := mclock.Now()
  662. s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC))
  663. if s.sum == 0 {
  664. s.avg = 0
  665. } else {
  666. if s.sum > s.weight*1e30 {
  667. s.avg = 1e30
  668. } else {
  669. s.avg = s.sum / s.weight
  670. }
  671. }
  672. s.lastRecalc = now
  673. }
  674. // add updates the stats with a new value
  675. func (s *poolStats) add(value, weight float64) {
  676. s.weight += weight
  677. s.sum += value * weight
  678. s.recalc()
  679. }
  680. // recentAvg returns the short-term adjusted average
  681. func (s *poolStats) recentAvg() float64 {
  682. s.recalc()
  683. return s.recent
  684. }
  685. func (s *poolStats) EncodeRLP(w io.Writer) error {
  686. return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)})
  687. }
  688. func (s *poolStats) DecodeRLP(st *rlp.Stream) error {
  689. var stats struct {
  690. SumUint, WeightUint uint64
  691. }
  692. if err := st.Decode(&stats); err != nil {
  693. return err
  694. }
  695. s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint))
  696. return nil
  697. }
  698. // poolEntryQueue keeps track of its least recently accessed entries and removes
  699. // them when the number of entries reaches the limit
  700. type poolEntryQueue struct {
  701. queue map[int]*poolEntry // known nodes indexed by their latest lastConnCnt value
  702. newPtr, oldPtr, maxCnt int
  703. removeFromPool func(*poolEntry)
  704. }
  705. // newPoolEntryQueue returns a new poolEntryQueue
  706. func newPoolEntryQueue(maxCnt int, removeFromPool func(*poolEntry)) poolEntryQueue {
  707. return poolEntryQueue{queue: make(map[int]*poolEntry), maxCnt: maxCnt, removeFromPool: removeFromPool}
  708. }
  709. // fetchOldest returns and removes the least recently accessed entry
  710. func (q *poolEntryQueue) fetchOldest() *poolEntry {
  711. if len(q.queue) == 0 {
  712. return nil
  713. }
  714. for {
  715. if e := q.queue[q.oldPtr]; e != nil {
  716. delete(q.queue, q.oldPtr)
  717. q.oldPtr++
  718. return e
  719. }
  720. q.oldPtr++
  721. }
  722. }
  723. // remove removes an entry from the queue
  724. func (q *poolEntryQueue) remove(entry *poolEntry) {
  725. if q.queue[entry.queueIdx] == entry {
  726. delete(q.queue, entry.queueIdx)
  727. }
  728. }
  729. // setLatest adds or updates a recently accessed entry. It also checks if an old entry
  730. // needs to be removed and removes it from the parent pool too with a callback function.
  731. func (q *poolEntryQueue) setLatest(entry *poolEntry) {
  732. if q.queue[entry.queueIdx] == entry {
  733. delete(q.queue, entry.queueIdx)
  734. } else {
  735. if len(q.queue) == q.maxCnt {
  736. e := q.fetchOldest()
  737. q.remove(e)
  738. q.removeFromPool(e)
  739. }
  740. }
  741. entry.queueIdx = q.newPtr
  742. q.queue[entry.queueIdx] = entry
  743. q.newPtr++
  744. }