serverpool.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "io"
  20. "math"
  21. "math/rand"
  22. "net"
  23. "strconv"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common/mclock"
  27. "github.com/ethereum/go-ethereum/ethdb"
  28. "github.com/ethereum/go-ethereum/logger"
  29. "github.com/ethereum/go-ethereum/logger/glog"
  30. "github.com/ethereum/go-ethereum/p2p"
  31. "github.com/ethereum/go-ethereum/p2p/discover"
  32. "github.com/ethereum/go-ethereum/p2p/discv5"
  33. "github.com/ethereum/go-ethereum/rlp"
  34. )
  35. const (
  36. // After a connection has been ended or timed out, there is a waiting period
  37. // before it can be selected for connection again.
  38. // waiting period = base delay * (1 + random(1))
  39. // base delay = shortRetryDelay for the first shortRetryCnt times after a
  40. // successful connection, after that longRetryDelay is applied
  41. shortRetryCnt = 5
  42. shortRetryDelay = time.Second * 5
  43. longRetryDelay = time.Minute * 10
  44. // maxNewEntries is the maximum number of newly discovered (never connected) nodes.
  45. // If the limit is reached, the least recently discovered one is thrown out.
  46. maxNewEntries = 1000
  47. // maxKnownEntries is the maximum number of known (already connected) nodes.
  48. // If the limit is reached, the least recently connected one is thrown out.
  49. // (not that unlike new entries, known entries are persistent)
  50. maxKnownEntries = 1000
  51. // target for simultaneously connected servers
  52. targetServerCount = 5
  53. // target for servers selected from the known table
  54. // (we leave room for trying new ones if there is any)
  55. targetKnownSelect = 3
  56. // after dialTimeout, consider the server unavailable and adjust statistics
  57. dialTimeout = time.Second * 30
  58. // targetConnTime is the minimum expected connection duration before a server
  59. // drops a client without any specific reason
  60. targetConnTime = time.Minute * 10
  61. // new entry selection weight calculation based on most recent discovery time:
  62. // unity until discoverExpireStart, then exponential decay with discoverExpireConst
  63. discoverExpireStart = time.Minute * 20
  64. discoverExpireConst = time.Minute * 20
  65. // known entry selection weight is dropped by a factor of exp(-failDropLn) after
  66. // each unsuccessful connection (restored after a successful one)
  67. failDropLn = 0.1
  68. // known node connection success and quality statistics have a long term average
  69. // and a short term value which is adjusted exponentially with a factor of
  70. // pstatRecentAdjust with each dial/connection and also returned exponentially
  71. // to the average with the time constant pstatReturnToMeanTC
  72. pstatRecentAdjust = 0.1
  73. pstatReturnToMeanTC = time.Hour
  74. // node address selection weight is dropped by a factor of exp(-addrFailDropLn) after
  75. // each unsuccessful connection (restored after a successful one)
  76. addrFailDropLn = math.Ln2
  77. // responseScoreTC and delayScoreTC are exponential decay time constants for
  78. // calculating selection chances from response times and block delay times
  79. responseScoreTC = time.Millisecond * 100
  80. delayScoreTC = time.Second * 5
  81. timeoutPow = 10
  82. // peerSelectMinWeight is added to calculated weights at request peer selection
  83. // to give poorly performing peers a little chance of coming back
  84. peerSelectMinWeight = 0.005
  85. // initStatsWeight is used to initialize previously unknown peers with good
  86. // statistics to give a chance to prove themselves
  87. initStatsWeight = 1
  88. )
  89. // serverPool implements a pool for storing and selecting newly discovered and already
  90. // known light server nodes. It received discovered nodes, stores statistics about
  91. // known nodes and takes care of always having enough good quality servers connected.
  92. type serverPool struct {
  93. db ethdb.Database
  94. dbKey []byte
  95. server *p2p.Server
  96. quit chan struct{}
  97. wg *sync.WaitGroup
  98. connWg sync.WaitGroup
  99. discSetPeriod chan time.Duration
  100. discNodes chan *discv5.Node
  101. discLookups chan bool
  102. entries map[discover.NodeID]*poolEntry
  103. lock sync.Mutex
  104. timeout, enableRetry chan *poolEntry
  105. adjustStats chan poolStatAdjust
  106. knownQueue, newQueue poolEntryQueue
  107. knownSelect, newSelect *weightedRandomSelect
  108. knownSelected, newSelected int
  109. fastDiscover bool
  110. }
  111. // newServerPool creates a new serverPool instance
  112. func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic discv5.Topic, quit chan struct{}, wg *sync.WaitGroup) *serverPool {
  113. pool := &serverPool{
  114. db: db,
  115. dbKey: append(dbPrefix, []byte(topic)...),
  116. server: server,
  117. quit: quit,
  118. wg: wg,
  119. entries: make(map[discover.NodeID]*poolEntry),
  120. timeout: make(chan *poolEntry, 1),
  121. adjustStats: make(chan poolStatAdjust, 100),
  122. enableRetry: make(chan *poolEntry, 1),
  123. knownSelect: newWeightedRandomSelect(),
  124. newSelect: newWeightedRandomSelect(),
  125. fastDiscover: true,
  126. }
  127. pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry)
  128. pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry)
  129. wg.Add(1)
  130. pool.loadNodes()
  131. pool.checkDial()
  132. if pool.server.DiscV5 != nil {
  133. pool.discSetPeriod = make(chan time.Duration, 1)
  134. pool.discNodes = make(chan *discv5.Node, 100)
  135. pool.discLookups = make(chan bool, 100)
  136. go pool.server.DiscV5.SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
  137. }
  138. go pool.eventLoop()
  139. return pool
  140. }
  141. // connect should be called upon any incoming connection. If the connection has been
  142. // dialed by the server pool recently, the appropriate pool entry is returned.
  143. // Otherwise, the connection should be rejected.
  144. // Note that whenever a connection has been accepted and a pool entry has been returned,
  145. // disconnect should also always be called.
  146. func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
  147. pool.lock.Lock()
  148. defer pool.lock.Unlock()
  149. entry := pool.entries[p.ID()]
  150. if entry == nil {
  151. return nil
  152. }
  153. glog.V(logger.Debug).Infof("connecting to %v, state: %v", p.id, entry.state)
  154. if entry.state != psDialed {
  155. return nil
  156. }
  157. pool.connWg.Add(1)
  158. entry.peer = p
  159. entry.state = psConnected
  160. addr := &poolEntryAddress{
  161. ip: ip,
  162. port: port,
  163. lastSeen: mclock.Now(),
  164. }
  165. entry.lastConnected = addr
  166. entry.addr = make(map[string]*poolEntryAddress)
  167. entry.addr[addr.strKey()] = addr
  168. entry.addrSelect = *newWeightedRandomSelect()
  169. entry.addrSelect.update(addr)
  170. return entry
  171. }
  172. // registered should be called after a successful handshake
  173. func (pool *serverPool) registered(entry *poolEntry) {
  174. glog.V(logger.Debug).Infof("registered %v", entry.id.String())
  175. pool.lock.Lock()
  176. defer pool.lock.Unlock()
  177. entry.state = psRegistered
  178. entry.regTime = mclock.Now()
  179. if !entry.known {
  180. pool.newQueue.remove(entry)
  181. entry.known = true
  182. }
  183. pool.knownQueue.setLatest(entry)
  184. entry.shortRetry = shortRetryCnt
  185. }
  186. // disconnect should be called when ending a connection. Service quality statistics
  187. // can be updated optionally (not updated if no registration happened, in this case
  188. // only connection statistics are updated, just like in case of timeout)
  189. func (pool *serverPool) disconnect(entry *poolEntry) {
  190. glog.V(logger.Debug).Infof("disconnected %v", entry.id.String())
  191. pool.lock.Lock()
  192. defer pool.lock.Unlock()
  193. if entry.state == psRegistered {
  194. connTime := mclock.Now() - entry.regTime
  195. connAdjust := float64(connTime) / float64(targetConnTime)
  196. if connAdjust > 1 {
  197. connAdjust = 1
  198. }
  199. stopped := false
  200. select {
  201. case <-pool.quit:
  202. stopped = true
  203. default:
  204. }
  205. if stopped {
  206. entry.connectStats.add(1, connAdjust)
  207. } else {
  208. entry.connectStats.add(connAdjust, 1)
  209. }
  210. }
  211. entry.state = psNotConnected
  212. if entry.knownSelected {
  213. pool.knownSelected--
  214. } else {
  215. pool.newSelected--
  216. }
  217. pool.setRetryDial(entry)
  218. pool.connWg.Done()
  219. }
  220. const (
  221. pseBlockDelay = iota
  222. pseResponseTime
  223. pseResponseTimeout
  224. )
  225. // poolStatAdjust records are sent to adjust peer block delay/response time statistics
  226. type poolStatAdjust struct {
  227. adjustType int
  228. entry *poolEntry
  229. time time.Duration
  230. }
  231. // adjustBlockDelay adjusts the block announce delay statistics of a node
  232. func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) {
  233. pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time}
  234. }
  235. // adjustResponseTime adjusts the request response time statistics of a node
  236. func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) {
  237. if timeout {
  238. pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time}
  239. } else {
  240. pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time}
  241. }
  242. }
  243. type selectPeerItem struct {
  244. peer *peer
  245. weight int64
  246. }
  247. func (sp selectPeerItem) Weight() int64 {
  248. return sp.weight
  249. }
  250. // selectPeer selects a suitable peer for a request
  251. func (pool *serverPool) selectPeer(canSend func(*peer) (bool, uint64)) *peer {
  252. pool.lock.Lock()
  253. defer pool.lock.Unlock()
  254. sel := newWeightedRandomSelect()
  255. for _, entry := range pool.entries {
  256. if entry.state == psRegistered {
  257. p := entry.peer
  258. ok, cost := canSend(p)
  259. if ok {
  260. w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(entry.responseStats.recentAvg()+float64(cost))/float64(responseScoreTC))*math.Pow((1-entry.timeoutStats.recentAvg()), timeoutPow)))
  261. sel.update(selectPeerItem{peer: p, weight: w})
  262. }
  263. }
  264. }
  265. choice := sel.choose()
  266. if choice == nil {
  267. return nil
  268. }
  269. return choice.(selectPeerItem).peer
  270. }
  271. // eventLoop handles pool events and mutex locking for all internal functions
  272. func (pool *serverPool) eventLoop() {
  273. lookupCnt := 0
  274. var convTime mclock.AbsTime
  275. pool.discSetPeriod <- time.Millisecond * 100
  276. for {
  277. select {
  278. case entry := <-pool.timeout:
  279. pool.lock.Lock()
  280. if !entry.removed {
  281. pool.checkDialTimeout(entry)
  282. }
  283. pool.lock.Unlock()
  284. case entry := <-pool.enableRetry:
  285. pool.lock.Lock()
  286. if !entry.removed {
  287. entry.delayedRetry = false
  288. pool.updateCheckDial(entry)
  289. }
  290. pool.lock.Unlock()
  291. case adj := <-pool.adjustStats:
  292. pool.lock.Lock()
  293. switch adj.adjustType {
  294. case pseBlockDelay:
  295. adj.entry.delayStats.add(float64(adj.time), 1)
  296. case pseResponseTime:
  297. adj.entry.responseStats.add(float64(adj.time), 1)
  298. adj.entry.timeoutStats.add(0, 1)
  299. case pseResponseTimeout:
  300. adj.entry.timeoutStats.add(1, 1)
  301. }
  302. pool.lock.Unlock()
  303. case node := <-pool.discNodes:
  304. pool.lock.Lock()
  305. now := mclock.Now()
  306. id := discover.NodeID(node.ID)
  307. entry := pool.entries[id]
  308. if entry == nil {
  309. glog.V(logger.Debug).Infof("discovered %v", node.String())
  310. entry = &poolEntry{
  311. id: id,
  312. addr: make(map[string]*poolEntryAddress),
  313. addrSelect: *newWeightedRandomSelect(),
  314. shortRetry: shortRetryCnt,
  315. }
  316. pool.entries[id] = entry
  317. // initialize previously unknown peers with good statistics to give a chance to prove themselves
  318. entry.connectStats.add(1, initStatsWeight)
  319. entry.delayStats.add(0, initStatsWeight)
  320. entry.responseStats.add(0, initStatsWeight)
  321. entry.timeoutStats.add(0, initStatsWeight)
  322. }
  323. entry.lastDiscovered = now
  324. addr := &poolEntryAddress{
  325. ip: node.IP,
  326. port: node.TCP,
  327. }
  328. if a, ok := entry.addr[addr.strKey()]; ok {
  329. addr = a
  330. } else {
  331. entry.addr[addr.strKey()] = addr
  332. }
  333. addr.lastSeen = now
  334. entry.addrSelect.update(addr)
  335. if !entry.known {
  336. pool.newQueue.setLatest(entry)
  337. }
  338. pool.updateCheckDial(entry)
  339. pool.lock.Unlock()
  340. case conv := <-pool.discLookups:
  341. if conv {
  342. if lookupCnt == 0 {
  343. convTime = mclock.Now()
  344. }
  345. lookupCnt++
  346. if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) {
  347. pool.fastDiscover = false
  348. pool.discSetPeriod <- time.Minute
  349. }
  350. }
  351. case <-pool.quit:
  352. close(pool.discSetPeriod)
  353. pool.connWg.Wait()
  354. pool.saveNodes()
  355. pool.wg.Done()
  356. return
  357. }
  358. }
  359. }
  360. // loadNodes loads known nodes and their statistics from the database
  361. func (pool *serverPool) loadNodes() {
  362. enc, err := pool.db.Get(pool.dbKey)
  363. if err != nil {
  364. return
  365. }
  366. var list []*poolEntry
  367. err = rlp.DecodeBytes(enc, &list)
  368. if err != nil {
  369. glog.V(logger.Debug).Infof("node list decode error: %v", err)
  370. return
  371. }
  372. for _, e := range list {
  373. glog.V(logger.Debug).Infof("loaded server stats %016x fails: %v connStats: %v / %v delayStats: %v / %v responseStats: %v / %v timeoutStats: %v / %v", e.id[0:8], e.lastConnected.fails, e.connectStats.avg, e.connectStats.weight, time.Duration(e.delayStats.avg), e.delayStats.weight, time.Duration(e.responseStats.avg), e.responseStats.weight, e.timeoutStats.avg, e.timeoutStats.weight)
  374. pool.entries[e.id] = e
  375. pool.knownQueue.setLatest(e)
  376. pool.knownSelect.update((*knownEntry)(e))
  377. }
  378. }
  379. // saveNodes saves known nodes and their statistics into the database. Nodes are
  380. // ordered from least to most recently connected.
  381. func (pool *serverPool) saveNodes() {
  382. list := make([]*poolEntry, len(pool.knownQueue.queue))
  383. for i, _ := range list {
  384. list[i] = pool.knownQueue.fetchOldest()
  385. }
  386. enc, err := rlp.EncodeToBytes(list)
  387. if err == nil {
  388. pool.db.Put(pool.dbKey, enc)
  389. }
  390. }
  391. // removeEntry removes a pool entry when the entry count limit is reached.
  392. // Note that it is called by the new/known queues from which the entry has already
  393. // been removed so removing it from the queues is not necessary.
  394. func (pool *serverPool) removeEntry(entry *poolEntry) {
  395. pool.newSelect.remove((*discoveredEntry)(entry))
  396. pool.knownSelect.remove((*knownEntry)(entry))
  397. entry.removed = true
  398. delete(pool.entries, entry.id)
  399. }
  400. // setRetryDial starts the timer which will enable dialing a certain node again
  401. func (pool *serverPool) setRetryDial(entry *poolEntry) {
  402. delay := longRetryDelay
  403. if entry.shortRetry > 0 {
  404. entry.shortRetry--
  405. delay = shortRetryDelay
  406. }
  407. delay += time.Duration(rand.Int63n(int64(delay) + 1))
  408. entry.delayedRetry = true
  409. go func() {
  410. select {
  411. case <-pool.quit:
  412. case <-time.After(delay):
  413. select {
  414. case <-pool.quit:
  415. case pool.enableRetry <- entry:
  416. }
  417. }
  418. }()
  419. }
  420. // updateCheckDial is called when an entry can potentially be dialed again. It updates
  421. // its selection weights and checks if new dials can/should be made.
  422. func (pool *serverPool) updateCheckDial(entry *poolEntry) {
  423. pool.newSelect.update((*discoveredEntry)(entry))
  424. pool.knownSelect.update((*knownEntry)(entry))
  425. pool.checkDial()
  426. }
  427. // checkDial checks if new dials can/should be made. It tries to select servers both
  428. // based on good statistics and recent discovery.
  429. func (pool *serverPool) checkDial() {
  430. fillWithKnownSelects := !pool.fastDiscover
  431. for pool.knownSelected < targetKnownSelect {
  432. entry := pool.knownSelect.choose()
  433. if entry == nil {
  434. fillWithKnownSelects = false
  435. break
  436. }
  437. pool.dial((*poolEntry)(entry.(*knownEntry)), true)
  438. }
  439. for pool.knownSelected+pool.newSelected < targetServerCount {
  440. entry := pool.newSelect.choose()
  441. if entry == nil {
  442. break
  443. }
  444. pool.dial((*poolEntry)(entry.(*discoveredEntry)), false)
  445. }
  446. if fillWithKnownSelects {
  447. // no more newly discovered nodes to select and since fast discover period
  448. // is over, we probably won't find more in the near future so select more
  449. // known entries if possible
  450. for pool.knownSelected < targetServerCount {
  451. entry := pool.knownSelect.choose()
  452. if entry == nil {
  453. break
  454. }
  455. pool.dial((*poolEntry)(entry.(*knownEntry)), true)
  456. }
  457. }
  458. }
  459. // dial initiates a new connection
  460. func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) {
  461. if entry.state != psNotConnected {
  462. return
  463. }
  464. entry.state = psDialed
  465. entry.knownSelected = knownSelected
  466. if knownSelected {
  467. pool.knownSelected++
  468. } else {
  469. pool.newSelected++
  470. }
  471. addr := entry.addrSelect.choose().(*poolEntryAddress)
  472. glog.V(logger.Debug).Infof("dialing %v out of %v, known: %v", entry.id.String()+"@"+addr.strKey(), len(entry.addr), knownSelected)
  473. entry.dialed = addr
  474. go func() {
  475. pool.server.AddPeer(discover.NewNode(entry.id, addr.ip, addr.port, addr.port))
  476. select {
  477. case <-pool.quit:
  478. case <-time.After(dialTimeout):
  479. select {
  480. case <-pool.quit:
  481. case pool.timeout <- entry:
  482. }
  483. }
  484. }()
  485. }
  486. // checkDialTimeout checks if the node is still in dialed state and if so, resets it
  487. // and adjusts connection statistics accordingly.
  488. func (pool *serverPool) checkDialTimeout(entry *poolEntry) {
  489. if entry.state != psDialed {
  490. return
  491. }
  492. glog.V(logger.Debug).Infof("timeout %v", entry.id.String()+"@"+entry.dialed.strKey())
  493. entry.state = psNotConnected
  494. if entry.knownSelected {
  495. pool.knownSelected--
  496. } else {
  497. pool.newSelected--
  498. }
  499. entry.connectStats.add(0, 1)
  500. entry.dialed.fails++
  501. pool.setRetryDial(entry)
  502. }
  503. const (
  504. psNotConnected = iota
  505. psDialed
  506. psConnected
  507. psRegistered
  508. )
  509. // poolEntry represents a server node and stores its current state and statistics.
  510. type poolEntry struct {
  511. peer *peer
  512. id discover.NodeID
  513. addr map[string]*poolEntryAddress
  514. lastConnected, dialed *poolEntryAddress
  515. addrSelect weightedRandomSelect
  516. lastDiscovered mclock.AbsTime
  517. known, knownSelected bool
  518. connectStats, delayStats poolStats
  519. responseStats, timeoutStats poolStats
  520. state int
  521. regTime mclock.AbsTime
  522. queueIdx int
  523. removed bool
  524. delayedRetry bool
  525. shortRetry int
  526. }
  527. func (e *poolEntry) EncodeRLP(w io.Writer) error {
  528. return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.delayStats, &e.responseStats, &e.timeoutStats})
  529. }
  530. func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
  531. var entry struct {
  532. ID discover.NodeID
  533. IP net.IP
  534. Port uint16
  535. Fails uint
  536. CStat, DStat, RStat, TStat poolStats
  537. }
  538. if err := s.Decode(&entry); err != nil {
  539. return err
  540. }
  541. addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()}
  542. e.id = entry.ID
  543. e.addr = make(map[string]*poolEntryAddress)
  544. e.addr[addr.strKey()] = addr
  545. e.addrSelect = *newWeightedRandomSelect()
  546. e.addrSelect.update(addr)
  547. e.lastConnected = addr
  548. e.connectStats = entry.CStat
  549. e.delayStats = entry.DStat
  550. e.responseStats = entry.RStat
  551. e.timeoutStats = entry.TStat
  552. e.shortRetry = shortRetryCnt
  553. e.known = true
  554. return nil
  555. }
  556. // discoveredEntry implements wrsItem
  557. type discoveredEntry poolEntry
  558. // Weight calculates random selection weight for newly discovered entries
  559. func (e *discoveredEntry) Weight() int64 {
  560. if e.state != psNotConnected || e.delayedRetry {
  561. return 0
  562. }
  563. t := time.Duration(mclock.Now() - e.lastDiscovered)
  564. if t <= discoverExpireStart {
  565. return 1000000000
  566. } else {
  567. return int64(1000000000 * math.Exp(-float64(t-discoverExpireStart)/float64(discoverExpireConst)))
  568. }
  569. }
  570. // knownEntry implements wrsItem
  571. type knownEntry poolEntry
  572. // Weight calculates random selection weight for known entries
  573. func (e *knownEntry) Weight() int64 {
  574. if e.state != psNotConnected || !e.known || e.delayedRetry {
  575. return 0
  576. }
  577. return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow((1-e.timeoutStats.recentAvg()), timeoutPow))
  578. }
  579. // poolEntryAddress is a separate object because currently it is necessary to remember
  580. // multiple potential network addresses for a pool entry. This will be removed after
  581. // the final implementation of v5 discovery which will retrieve signed and serial
  582. // numbered advertisements, making it clear which IP/port is the latest one.
  583. type poolEntryAddress struct {
  584. ip net.IP
  585. port uint16
  586. lastSeen mclock.AbsTime // last time it was discovered, connected or loaded from db
  587. fails uint // connection failures since last successful connection (persistent)
  588. }
  589. func (a *poolEntryAddress) Weight() int64 {
  590. t := time.Duration(mclock.Now() - a.lastSeen)
  591. return int64(1000000*math.Exp(-float64(t)/float64(discoverExpireConst)-float64(a.fails)*addrFailDropLn)) + 1
  592. }
  593. func (a *poolEntryAddress) strKey() string {
  594. return a.ip.String() + ":" + strconv.Itoa(int(a.port))
  595. }
  596. // poolStats implement statistics for a certain quantity with a long term average
  597. // and a short term value which is adjusted exponentially with a factor of
  598. // pstatRecentAdjust with each update and also returned exponentially to the
  599. // average with the time constant pstatReturnToMeanTC
  600. type poolStats struct {
  601. sum, weight, avg, recent float64
  602. lastRecalc mclock.AbsTime
  603. }
  604. // init initializes stats with a long term sum/update count pair retrieved from the database
  605. func (s *poolStats) init(sum, weight float64) {
  606. s.sum = sum
  607. s.weight = weight
  608. var avg float64
  609. if weight > 0 {
  610. avg = s.sum / weight
  611. }
  612. s.avg = avg
  613. s.recent = avg
  614. s.lastRecalc = mclock.Now()
  615. }
  616. // recalc recalculates recent value return-to-mean and long term average
  617. func (s *poolStats) recalc() {
  618. now := mclock.Now()
  619. s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC))
  620. if s.sum == 0 {
  621. s.avg = 0
  622. } else {
  623. if s.sum > s.weight*1e30 {
  624. s.avg = 1e30
  625. } else {
  626. s.avg = s.sum / s.weight
  627. }
  628. }
  629. s.lastRecalc = now
  630. }
  631. // add updates the stats with a new value
  632. func (s *poolStats) add(value, weight float64) {
  633. s.weight += weight
  634. s.sum += value * weight
  635. s.recalc()
  636. }
  637. // recentAvg returns the short-term adjusted average
  638. func (s *poolStats) recentAvg() float64 {
  639. s.recalc()
  640. return s.recent
  641. }
  642. func (s *poolStats) EncodeRLP(w io.Writer) error {
  643. return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)})
  644. }
  645. func (s *poolStats) DecodeRLP(st *rlp.Stream) error {
  646. var stats struct {
  647. SumUint, WeightUint uint64
  648. }
  649. if err := st.Decode(&stats); err != nil {
  650. return err
  651. }
  652. s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint))
  653. return nil
  654. }
  655. // poolEntryQueue keeps track of its least recently accessed entries and removes
  656. // them when the number of entries reaches the limit
  657. type poolEntryQueue struct {
  658. queue map[int]*poolEntry // known nodes indexed by their latest lastConnCnt value
  659. newPtr, oldPtr, maxCnt int
  660. removeFromPool func(*poolEntry)
  661. }
  662. // newPoolEntryQueue returns a new poolEntryQueue
  663. func newPoolEntryQueue(maxCnt int, removeFromPool func(*poolEntry)) poolEntryQueue {
  664. return poolEntryQueue{queue: make(map[int]*poolEntry), maxCnt: maxCnt, removeFromPool: removeFromPool}
  665. }
  666. // fetchOldest returns and removes the least recently accessed entry
  667. func (q *poolEntryQueue) fetchOldest() *poolEntry {
  668. if len(q.queue) == 0 {
  669. return nil
  670. }
  671. for {
  672. if e := q.queue[q.oldPtr]; e != nil {
  673. delete(q.queue, q.oldPtr)
  674. q.oldPtr++
  675. return e
  676. }
  677. q.oldPtr++
  678. }
  679. }
  680. // remove removes an entry from the queue
  681. func (q *poolEntryQueue) remove(entry *poolEntry) {
  682. if q.queue[entry.queueIdx] == entry {
  683. delete(q.queue, entry.queueIdx)
  684. }
  685. }
  686. // setLatest adds or updates a recently accessed entry. It also checks if an old entry
  687. // needs to be removed and removes it from the parent pool too with a callback function.
  688. func (q *poolEntryQueue) setLatest(entry *poolEntry) {
  689. if q.queue[entry.queueIdx] == entry {
  690. delete(q.queue, entry.queueIdx)
  691. } else {
  692. if len(q.queue) == q.maxCnt {
  693. e := q.fetchOldest()
  694. q.remove(e)
  695. q.removeFromPool(e)
  696. }
  697. }
  698. entry.queueIdx = q.newPtr
  699. q.queue[entry.queueIdx] = entry
  700. q.newPtr++
  701. }