serverpool.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "crypto/ecdsa"
  19. "fmt"
  20. "io"
  21. "math"
  22. "math/rand"
  23. "net"
  24. "strconv"
  25. "sync"
  26. "time"
  27. "github.com/ethereum/go-ethereum/common/mclock"
  28. "github.com/ethereum/go-ethereum/crypto"
  29. "github.com/ethereum/go-ethereum/ethdb"
  30. "github.com/ethereum/go-ethereum/log"
  31. "github.com/ethereum/go-ethereum/p2p"
  32. "github.com/ethereum/go-ethereum/p2p/discv5"
  33. "github.com/ethereum/go-ethereum/p2p/enode"
  34. "github.com/ethereum/go-ethereum/rlp"
  35. )
const (
	// After a connection has been ended or timed out, there is a waiting period
	// before it can be selected for connection again.
	// waiting period = base delay * (1 + random(1))
	// base delay = shortRetryDelay for the first shortRetryCnt times after a
	// successful connection, after that longRetryDelay is applied
	shortRetryCnt   = 5
	shortRetryDelay = time.Second * 5
	longRetryDelay  = time.Minute * 10
	// maxNewEntries is the maximum number of newly discovered (never connected) nodes.
	// If the limit is reached, the least recently discovered one is thrown out.
	maxNewEntries = 1000
	// maxKnownEntries is the maximum number of known (already connected) nodes.
	// If the limit is reached, the least recently connected one is thrown out.
	// (note that unlike new entries, known entries are persistent)
	maxKnownEntries = 1000
	// target for simultaneously connected servers
	targetServerCount = 5
	// target for servers selected from the known table
	// (we leave room for trying new ones if there are any)
	targetKnownSelect = 3
	// after dialTimeout, consider the server unavailable and adjust statistics
	dialTimeout = time.Second * 30
	// targetConnTime is the minimum expected connection duration before a server
	// drops a client without any specific reason
	targetConnTime = time.Minute * 10
	// new entry selection weight calculation based on most recent discovery time:
	// unity until discoverExpireStart, then exponential decay with discoverExpireConst
	discoverExpireStart = time.Minute * 20
	discoverExpireConst = time.Minute * 20
	// known entry selection weight is dropped by a factor of exp(-failDropLn) after
	// each unsuccessful connection (restored after a successful one)
	failDropLn = 0.1
	// known node connection success and quality statistics have a long term average
	// and a short term value which is returned exponentially to the average with
	// the time constant pstatReturnToMeanTC.
	// NOTE(review): older comments also mention a per-update factor called
	// pstatRecentAdjust; no such constant is declared in the visible source.
	pstatReturnToMeanTC = time.Hour
	// node address selection weight is dropped by a factor of exp(-addrFailDropLn) after
	// each unsuccessful connection (restored after a successful one)
	addrFailDropLn = math.Ln2
	// responseScoreTC and delayScoreTC are exponential decay time constants for
	// calculating selection chances from response times and block delay times
	responseScoreTC = time.Millisecond * 100
	delayScoreTC    = time.Second * 5
	// timeoutPow is the exponent applied to (1 - recent timeout ratio) when
	// calculating the selection weight of a known entry
	timeoutPow = 10
	// initStatsWeight is used to initialize previously unknown peers with good
	// statistics to give a chance to prove themselves
	initStatsWeight = 1
)
// connReq represents a request for peer connection. It is sent by connect and
// answered by the event loop.
type connReq struct {
	// p is the peer whose connection is being announced to the pool.
	p *peer
	// node supplies the IP and TCP port recorded as the entry's connected address.
	node *enode.Node
	// result receives the pool entry for an accepted connection, or nil if the
	// node is already connected or registered.
	result chan *poolEntry
}
// disconnReq represents a request for peer disconnection. It is sent by
// disconnect and processed by the event loop.
type disconnReq struct {
	// entry is the pool entry whose connection has ended.
	entry *poolEntry
	// stopped is true if the pool was already closing when the request was created.
	stopped bool
	// done is closed by the event loop once the request has been processed.
	done chan struct{}
}
// registerReq represents a request for peer registration. It is sent by
// registered and processed by the event loop.
type registerReq struct {
	// entry is the pool entry that completed its handshake.
	entry *poolEntry
	// done is closed by the event loop once the registration has been recorded.
	done chan struct{}
}
// serverPool implements a pool for storing and selecting newly discovered and already
// known light server nodes. It receives discovered nodes, stores statistics about
// known nodes and takes care of always having enough good quality servers connected.
type serverPool struct {
	// db and dbKey locate the persisted list of known nodes.
	db ethdb.Database
	dbKey []byte
	// server is the p2p server used for dialing and for topic discovery.
	server *p2p.Server
	// connWg counts accepted connections that have not been disconnected yet.
	connWg sync.WaitGroup

	// topic is the discovery topic searched for servers.
	topic discv5.Topic

	// discSetPeriod carries the desired topic search period to the discovery code.
	discSetPeriod chan time.Duration
	// discNodes delivers discovered nodes to the event loop.
	discNodes chan *enode.Node
	// discLookups delivers lookup notifications used to leave fast discovery mode.
	discLookups chan bool

	// trustedNodes are the configured trusted servers; they bypass the pool logic.
	trustedNodes map[enode.ID]*enode.Node
	// entries holds every tracked (new or known) node by ID.
	entries map[enode.ID]*poolEntry
	// timeout receives entries whose dial timeout elapsed; enableRetry receives
	// entries whose retry delay elapsed.
	timeout, enableRetry chan *poolEntry
	// adjustStats carries statistics updates to the event loop.
	adjustStats chan poolStatAdjust

	// knownQueue and newQueue bound the number of known and new entries.
	knownQueue, newQueue poolEntryQueue
	// knownSelect and newSelect perform weighted random selection of dial candidates.
	knownSelect, newSelect *weightedRandomSelect
	// knownSelected and newSelected count entries currently dialed or connected,
	// split by the selector they were chosen from.
	knownSelected, newSelected int
	// fastDiscover is true during the initial fast discovery phase.
	fastDiscover bool
	// connCh, disconnCh and registerCh carry connection lifecycle requests.
	connCh chan *connReq
	disconnCh chan *disconnReq
	registerCh chan *registerReq

	// closeCh is closed by stop; wg waits for the event loop to exit.
	closeCh chan struct{}
	wg sync.WaitGroup
}
  129. // newServerPool creates a new serverPool instance
  130. func newServerPool(db ethdb.Database, ulcServers []string) *serverPool {
  131. pool := &serverPool{
  132. db: db,
  133. entries: make(map[enode.ID]*poolEntry),
  134. timeout: make(chan *poolEntry, 1),
  135. adjustStats: make(chan poolStatAdjust, 100),
  136. enableRetry: make(chan *poolEntry, 1),
  137. connCh: make(chan *connReq),
  138. disconnCh: make(chan *disconnReq),
  139. registerCh: make(chan *registerReq),
  140. closeCh: make(chan struct{}),
  141. knownSelect: newWeightedRandomSelect(),
  142. newSelect: newWeightedRandomSelect(),
  143. fastDiscover: true,
  144. trustedNodes: parseTrustedNodes(ulcServers),
  145. }
  146. pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry)
  147. pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry)
  148. return pool
  149. }
  150. func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
  151. pool.server = server
  152. pool.topic = topic
  153. pool.dbKey = append([]byte("serverPool/"), []byte(topic)...)
  154. pool.loadNodes()
  155. pool.connectToTrustedNodes()
  156. if pool.server.DiscV5 != nil {
  157. pool.discSetPeriod = make(chan time.Duration, 1)
  158. pool.discNodes = make(chan *enode.Node, 100)
  159. pool.discLookups = make(chan bool, 100)
  160. go pool.discoverNodes()
  161. }
  162. pool.checkDial()
  163. pool.wg.Add(1)
  164. go pool.eventLoop()
  165. }
// stop signals shutdown by closing closeCh and blocks until the event loop has
// exited. It must be called at most once, since closing closeCh twice panics.
func (pool *serverPool) stop() {
	close(pool.closeCh)
	pool.wg.Wait()
}
  170. // discoverNodes wraps SearchTopic, converting result nodes to enode.Node.
  171. func (pool *serverPool) discoverNodes() {
  172. ch := make(chan *discv5.Node)
  173. go func() {
  174. pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, ch, pool.discLookups)
  175. close(ch)
  176. }()
  177. for n := range ch {
  178. pubkey, err := decodePubkey64(n.ID[:])
  179. if err != nil {
  180. continue
  181. }
  182. pool.discNodes <- enode.NewV4(pubkey, n.IP, int(n.TCP), int(n.UDP))
  183. }
  184. }
  185. // connect should be called upon any incoming connection. If the connection has been
  186. // dialed by the server pool recently, the appropriate pool entry is returned.
  187. // Otherwise, the connection should be rejected.
  188. // Note that whenever a connection has been accepted and a pool entry has been returned,
  189. // disconnect should also always be called.
  190. func (pool *serverPool) connect(p *peer, node *enode.Node) *poolEntry {
  191. log.Debug("Connect new entry", "enode", p.id)
  192. req := &connReq{p: p, node: node, result: make(chan *poolEntry, 1)}
  193. select {
  194. case pool.connCh <- req:
  195. case <-pool.closeCh:
  196. return nil
  197. }
  198. return <-req.result
  199. }
  200. // registered should be called after a successful handshake
  201. func (pool *serverPool) registered(entry *poolEntry) {
  202. log.Debug("Registered new entry", "enode", entry.node.ID())
  203. req := &registerReq{entry: entry, done: make(chan struct{})}
  204. select {
  205. case pool.registerCh <- req:
  206. case <-pool.closeCh:
  207. return
  208. }
  209. <-req.done
  210. }
  211. // disconnect should be called when ending a connection. Service quality statistics
  212. // can be updated optionally (not updated if no registration happened, in this case
  213. // only connection statistics are updated, just like in case of timeout)
  214. func (pool *serverPool) disconnect(entry *poolEntry) {
  215. stopped := false
  216. select {
  217. case <-pool.closeCh:
  218. stopped = true
  219. default:
  220. }
  221. log.Debug("Disconnected old entry", "enode", entry.node.ID())
  222. req := &disconnReq{entry: entry, stopped: stopped, done: make(chan struct{})}
  223. // Block until disconnection request is served.
  224. pool.disconnCh <- req
  225. <-req.done
  226. }
// Kinds of statistics adjustment carried by poolStatAdjust.adjustType.
const (
	// pseBlockDelay reports a block announcement delay sample.
	pseBlockDelay = iota
	// pseResponseTime reports the response time of an answered request.
	pseResponseTime
	// pseResponseTimeout reports a request that timed out.
	pseResponseTimeout
)
// poolStatAdjust records are sent to adjust peer block delay/response time statistics.
// The field order matters: the senders build values with positional literals.
type poolStatAdjust struct {
	// adjustType is one of the pse* constants.
	adjustType int
	// entry is the pool entry whose statistics are adjusted.
	entry *poolEntry
	// time is the measured duration (unused by the timeout adjustment).
	time time.Duration
}
  238. // adjustBlockDelay adjusts the block announce delay statistics of a node
  239. func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) {
  240. if entry == nil {
  241. return
  242. }
  243. pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time}
  244. }
  245. // adjustResponseTime adjusts the request response time statistics of a node
  246. func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) {
  247. if entry == nil {
  248. return
  249. }
  250. if timeout {
  251. pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time}
  252. } else {
  253. pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time}
  254. }
  255. }
  256. // eventLoop handles pool events and mutex locking for all internal functions
  257. func (pool *serverPool) eventLoop() {
  258. defer pool.wg.Done()
  259. lookupCnt := 0
  260. var convTime mclock.AbsTime
  261. if pool.discSetPeriod != nil {
  262. pool.discSetPeriod <- time.Millisecond * 100
  263. }
  264. // disconnect updates service quality statistics depending on the connection time
  265. // and disconnection initiator.
  266. disconnect := func(req *disconnReq, stopped bool) {
  267. // Handle peer disconnection requests.
  268. entry := req.entry
  269. if entry.state == psRegistered {
  270. connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime)
  271. if connAdjust > 1 {
  272. connAdjust = 1
  273. }
  274. if stopped {
  275. // disconnect requested by ourselves.
  276. entry.connectStats.add(1, connAdjust)
  277. } else {
  278. // disconnect requested by server side.
  279. entry.connectStats.add(connAdjust, 1)
  280. }
  281. }
  282. entry.state = psNotConnected
  283. if entry.knownSelected {
  284. pool.knownSelected--
  285. } else {
  286. pool.newSelected--
  287. }
  288. pool.setRetryDial(entry)
  289. pool.connWg.Done()
  290. close(req.done)
  291. }
  292. for {
  293. select {
  294. case entry := <-pool.timeout:
  295. if !entry.removed {
  296. pool.checkDialTimeout(entry)
  297. }
  298. case entry := <-pool.enableRetry:
  299. if !entry.removed {
  300. entry.delayedRetry = false
  301. pool.updateCheckDial(entry)
  302. }
  303. case adj := <-pool.adjustStats:
  304. switch adj.adjustType {
  305. case pseBlockDelay:
  306. adj.entry.delayStats.add(float64(adj.time), 1)
  307. case pseResponseTime:
  308. adj.entry.responseStats.add(float64(adj.time), 1)
  309. adj.entry.timeoutStats.add(0, 1)
  310. case pseResponseTimeout:
  311. adj.entry.timeoutStats.add(1, 1)
  312. }
  313. case node := <-pool.discNodes:
  314. if pool.trustedNodes[node.ID()] == nil {
  315. entry := pool.findOrNewNode(node)
  316. pool.updateCheckDial(entry)
  317. }
  318. case conv := <-pool.discLookups:
  319. if conv {
  320. if lookupCnt == 0 {
  321. convTime = mclock.Now()
  322. }
  323. lookupCnt++
  324. if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) {
  325. pool.fastDiscover = false
  326. if pool.discSetPeriod != nil {
  327. pool.discSetPeriod <- time.Minute
  328. }
  329. }
  330. }
  331. case req := <-pool.connCh:
  332. if pool.trustedNodes[req.p.ID()] != nil {
  333. // ignore trusted nodes
  334. req.result <- &poolEntry{trusted: true}
  335. } else {
  336. // Handle peer connection requests.
  337. entry := pool.entries[req.p.ID()]
  338. if entry == nil {
  339. entry = pool.findOrNewNode(req.node)
  340. }
  341. if entry.state == psConnected || entry.state == psRegistered {
  342. req.result <- nil
  343. continue
  344. }
  345. pool.connWg.Add(1)
  346. entry.peer = req.p
  347. entry.state = psConnected
  348. addr := &poolEntryAddress{
  349. ip: req.node.IP(),
  350. port: uint16(req.node.TCP()),
  351. lastSeen: mclock.Now(),
  352. }
  353. entry.lastConnected = addr
  354. entry.addr = make(map[string]*poolEntryAddress)
  355. entry.addr[addr.strKey()] = addr
  356. entry.addrSelect = *newWeightedRandomSelect()
  357. entry.addrSelect.update(addr)
  358. req.result <- entry
  359. }
  360. case req := <-pool.registerCh:
  361. if req.entry.trusted {
  362. continue
  363. }
  364. // Handle peer registration requests.
  365. entry := req.entry
  366. entry.state = psRegistered
  367. entry.regTime = mclock.Now()
  368. if !entry.known {
  369. pool.newQueue.remove(entry)
  370. entry.known = true
  371. }
  372. pool.knownQueue.setLatest(entry)
  373. entry.shortRetry = shortRetryCnt
  374. close(req.done)
  375. case req := <-pool.disconnCh:
  376. if req.entry.trusted {
  377. continue
  378. }
  379. // Handle peer disconnection requests.
  380. disconnect(req, req.stopped)
  381. case <-pool.closeCh:
  382. if pool.discSetPeriod != nil {
  383. close(pool.discSetPeriod)
  384. }
  385. // Spawn a goroutine to close the disconnCh after all connections are disconnected.
  386. go func() {
  387. pool.connWg.Wait()
  388. close(pool.disconnCh)
  389. }()
  390. // Handle all remaining disconnection requests before exit.
  391. for req := range pool.disconnCh {
  392. disconnect(req, true)
  393. }
  394. pool.saveNodes()
  395. return
  396. }
  397. }
  398. }
  399. func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry {
  400. now := mclock.Now()
  401. entry := pool.entries[node.ID()]
  402. if entry == nil {
  403. log.Debug("Discovered new entry", "id", node.ID())
  404. entry = &poolEntry{
  405. node: node,
  406. addr: make(map[string]*poolEntryAddress),
  407. addrSelect: *newWeightedRandomSelect(),
  408. shortRetry: shortRetryCnt,
  409. }
  410. pool.entries[node.ID()] = entry
  411. // initialize previously unknown peers with good statistics to give a chance to prove themselves
  412. entry.connectStats.add(1, initStatsWeight)
  413. entry.delayStats.add(0, initStatsWeight)
  414. entry.responseStats.add(0, initStatsWeight)
  415. entry.timeoutStats.add(0, initStatsWeight)
  416. }
  417. entry.lastDiscovered = now
  418. addr := &poolEntryAddress{ip: node.IP(), port: uint16(node.TCP())}
  419. if a, ok := entry.addr[addr.strKey()]; ok {
  420. addr = a
  421. } else {
  422. entry.addr[addr.strKey()] = addr
  423. }
  424. addr.lastSeen = now
  425. entry.addrSelect.update(addr)
  426. if !entry.known {
  427. pool.newQueue.setLatest(entry)
  428. }
  429. return entry
  430. }
  431. // loadNodes loads known nodes and their statistics from the database
  432. func (pool *serverPool) loadNodes() {
  433. enc, err := pool.db.Get(pool.dbKey)
  434. if err != nil {
  435. return
  436. }
  437. var list []*poolEntry
  438. err = rlp.DecodeBytes(enc, &list)
  439. if err != nil {
  440. log.Debug("Failed to decode node list", "err", err)
  441. return
  442. }
  443. for _, e := range list {
  444. log.Debug("Loaded server stats", "id", e.node.ID(), "fails", e.lastConnected.fails,
  445. "conn", fmt.Sprintf("%v/%v", e.connectStats.avg, e.connectStats.weight),
  446. "delay", fmt.Sprintf("%v/%v", time.Duration(e.delayStats.avg), e.delayStats.weight),
  447. "response", fmt.Sprintf("%v/%v", time.Duration(e.responseStats.avg), e.responseStats.weight),
  448. "timeout", fmt.Sprintf("%v/%v", e.timeoutStats.avg, e.timeoutStats.weight))
  449. pool.entries[e.node.ID()] = e
  450. if pool.trustedNodes[e.node.ID()] == nil {
  451. pool.knownQueue.setLatest(e)
  452. pool.knownSelect.update((*knownEntry)(e))
  453. }
  454. }
  455. }
  456. // connectToTrustedNodes adds trusted server nodes as static trusted peers.
  457. //
  458. // Note: trusted nodes are not handled by the server pool logic, they are not
  459. // added to either the known or new selection pools. They are connected/reconnected
  460. // by p2p.Server whenever possible.
  461. func (pool *serverPool) connectToTrustedNodes() {
  462. //connect to trusted nodes
  463. for _, node := range pool.trustedNodes {
  464. pool.server.AddTrustedPeer(node)
  465. pool.server.AddPeer(node)
  466. log.Debug("Added trusted node", "id", node.ID().String())
  467. }
  468. }
  469. // parseTrustedNodes returns valid and parsed enodes
  470. func parseTrustedNodes(trustedNodes []string) map[enode.ID]*enode.Node {
  471. nodes := make(map[enode.ID]*enode.Node)
  472. for _, node := range trustedNodes {
  473. node, err := enode.Parse(enode.ValidSchemes, node)
  474. if err != nil {
  475. log.Warn("Trusted node URL invalid", "enode", node, "err", err)
  476. continue
  477. }
  478. nodes[node.ID()] = node
  479. }
  480. return nodes
  481. }
  482. // saveNodes saves known nodes and their statistics into the database. Nodes are
  483. // ordered from least to most recently connected.
  484. func (pool *serverPool) saveNodes() {
  485. list := make([]*poolEntry, len(pool.knownQueue.queue))
  486. for i := range list {
  487. list[i] = pool.knownQueue.fetchOldest()
  488. }
  489. enc, err := rlp.EncodeToBytes(list)
  490. if err == nil {
  491. pool.db.Put(pool.dbKey, enc)
  492. }
  493. }
// removeEntry removes a pool entry when the entry count limit is reached.
// Note that it is called by the new/known queues from which the entry has already
// been removed so removing it from the queues is not necessary.
func (pool *serverPool) removeEntry(entry *poolEntry) {
	// Drop the entry from both selectors, whichever one it was in.
	pool.newSelect.remove((*discoveredEntry)(entry))
	pool.knownSelect.remove((*knownEntry)(entry))
	// Mark it so that timer events still in flight for this entry are ignored
	// by the event loop.
	entry.removed = true
	delete(pool.entries, entry.node.ID())
}
  503. // setRetryDial starts the timer which will enable dialing a certain node again
  504. func (pool *serverPool) setRetryDial(entry *poolEntry) {
  505. delay := longRetryDelay
  506. if entry.shortRetry > 0 {
  507. entry.shortRetry--
  508. delay = shortRetryDelay
  509. }
  510. delay += time.Duration(rand.Int63n(int64(delay) + 1))
  511. entry.delayedRetry = true
  512. go func() {
  513. select {
  514. case <-pool.closeCh:
  515. case <-time.After(delay):
  516. select {
  517. case <-pool.closeCh:
  518. case pool.enableRetry <- entry:
  519. }
  520. }
  521. }()
  522. }
// updateCheckDial is called when an entry can potentially be dialed again. It updates
// its selection weights and checks if new dials can/should be made.
func (pool *serverPool) updateCheckDial(entry *poolEntry) {
	// The entry is offered to both selectors, viewed once as a discovered
	// entry and once as a known entry.
	pool.newSelect.update((*discoveredEntry)(entry))
	pool.knownSelect.update((*knownEntry)(entry))
	pool.checkDial()
}
  530. // checkDial checks if new dials can/should be made. It tries to select servers both
  531. // based on good statistics and recent discovery.
  532. func (pool *serverPool) checkDial() {
  533. fillWithKnownSelects := !pool.fastDiscover
  534. for pool.knownSelected < targetKnownSelect {
  535. entry := pool.knownSelect.choose()
  536. if entry == nil {
  537. fillWithKnownSelects = false
  538. break
  539. }
  540. pool.dial((*poolEntry)(entry.(*knownEntry)), true)
  541. }
  542. for pool.knownSelected+pool.newSelected < targetServerCount {
  543. entry := pool.newSelect.choose()
  544. if entry == nil {
  545. break
  546. }
  547. pool.dial((*poolEntry)(entry.(*discoveredEntry)), false)
  548. }
  549. if fillWithKnownSelects {
  550. // no more newly discovered nodes to select and since fast discover period
  551. // is over, we probably won't find more in the near future so select more
  552. // known entries if possible
  553. for pool.knownSelected < targetServerCount {
  554. entry := pool.knownSelect.choose()
  555. if entry == nil {
  556. break
  557. }
  558. pool.dial((*poolEntry)(entry.(*knownEntry)), true)
  559. }
  560. }
  561. }
  562. // dial initiates a new connection
  563. func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) {
  564. if pool.server == nil || entry.state != psNotConnected {
  565. return
  566. }
  567. entry.state = psDialed
  568. entry.knownSelected = knownSelected
  569. if knownSelected {
  570. pool.knownSelected++
  571. } else {
  572. pool.newSelected++
  573. }
  574. addr := entry.addrSelect.choose().(*poolEntryAddress)
  575. log.Debug("Dialing new peer", "lesaddr", entry.node.ID().String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected)
  576. entry.dialed = addr
  577. go func() {
  578. pool.server.AddPeer(entry.node)
  579. select {
  580. case <-pool.closeCh:
  581. case <-time.After(dialTimeout):
  582. select {
  583. case <-pool.closeCh:
  584. case pool.timeout <- entry:
  585. }
  586. }
  587. }()
  588. }
  589. // checkDialTimeout checks if the node is still in dialed state and if so, resets it
  590. // and adjusts connection statistics accordingly.
  591. func (pool *serverPool) checkDialTimeout(entry *poolEntry) {
  592. if entry.state != psDialed {
  593. return
  594. }
  595. log.Debug("Dial timeout", "lesaddr", entry.node.ID().String()+"@"+entry.dialed.strKey())
  596. entry.state = psNotConnected
  597. if entry.knownSelected {
  598. pool.knownSelected--
  599. } else {
  600. pool.newSelected--
  601. }
  602. entry.connectStats.add(0, 1)
  603. entry.dialed.fails++
  604. pool.setRetryDial(entry)
  605. }
// Connection states of a pool entry (poolEntry.state).
const (
	// psNotConnected: no dial or connection in progress; the zero value.
	psNotConnected = iota
	// psDialed: a dial has been started and has not timed out or connected yet.
	psDialed
	// psConnected: the connection was accepted by the pool.
	psConnected
	// psRegistered: the handshake succeeded and the peer was registered.
	psRegistered
)
// poolEntry represents a server node and stores its current state and statistics.
type poolEntry struct {
	// peer is the connected peer, set when a connection is accepted.
	peer *peer
	pubkey [64]byte // secp256k1 key of the node
	// addr holds the known addresses of the node, keyed by "ip:port".
	addr map[string]*poolEntryAddress
	// node is the node record used for dialing and identification. It is nil
	// for the placeholder entries returned for trusted nodes.
	node *enode.Node
	// lastConnected is the address of the most recent accepted connection (or
	// the one loaded from the database); dialed is the address chosen for the
	// most recent dial.
	lastConnected, dialed *poolEntryAddress
	// addrSelect selects a dial address at random, weighted by address quality.
	addrSelect weightedRandomSelect

	// lastDiscovered is the last time the node was reported by discovery.
	lastDiscovered mclock.AbsTime
	// known is set once the node has registered successfully (or was loaded
	// from the database); knownSelected tells whether the current dial or
	// connection was selected from the known set; trusted marks placeholder
	// entries of trusted nodes.
	known, knownSelected, trusted bool
	// Connection success, block delay, response time and timeout statistics.
	connectStats, delayStats poolStats
	responseStats, timeoutStats poolStats
	// state is one of the ps* constants.
	state int
	// regTime is the time of the last successful registration.
	regTime mclock.AbsTime
	// queueIdx is the entry's index in the poolEntryQueue it belongs to.
	queueIdx int
	// removed is set when the entry has been evicted from the pool.
	removed bool

	// delayedRetry is true while the entry is waiting for its retry timer.
	delayedRetry bool
	// shortRetry is the number of remaining retries using the short delay.
	shortRetry int
}
// poolEntryEnc is the RLP encoding of poolEntry. The fields are encoded in
// declaration order, so reordering them changes the stored format.
type poolEntryEnc struct {
	// Pubkey is the 64 byte public key of the node (without the format prefix).
	Pubkey []byte
	// IP and Port are the last connected address of the node.
	IP net.IP
	Port uint16
	// Fails is the failure counter of the last connected address.
	Fails uint
	// Connection, delay, response time and timeout statistics.
	CStat, DStat, RStat, TStat poolStats
}
  639. func (e *poolEntry) EncodeRLP(w io.Writer) error {
  640. return rlp.Encode(w, &poolEntryEnc{
  641. Pubkey: encodePubkey64(e.node.Pubkey()),
  642. IP: e.lastConnected.ip,
  643. Port: e.lastConnected.port,
  644. Fails: e.lastConnected.fails,
  645. CStat: e.connectStats,
  646. DStat: e.delayStats,
  647. RStat: e.responseStats,
  648. TStat: e.timeoutStats,
  649. })
  650. }
  651. func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
  652. var entry poolEntryEnc
  653. if err := s.Decode(&entry); err != nil {
  654. return err
  655. }
  656. pubkey, err := decodePubkey64(entry.Pubkey)
  657. if err != nil {
  658. return err
  659. }
  660. addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()}
  661. e.node = enode.NewV4(pubkey, entry.IP, int(entry.Port), int(entry.Port))
  662. e.addr = make(map[string]*poolEntryAddress)
  663. e.addr[addr.strKey()] = addr
  664. e.addrSelect = *newWeightedRandomSelect()
  665. e.addrSelect.update(addr)
  666. e.lastConnected = addr
  667. e.connectStats = entry.CStat
  668. e.delayStats = entry.DStat
  669. e.responseStats = entry.RStat
  670. e.timeoutStats = entry.TStat
  671. e.shortRetry = shortRetryCnt
  672. e.known = true
  673. return nil
  674. }
  675. func encodePubkey64(pub *ecdsa.PublicKey) []byte {
  676. return crypto.FromECDSAPub(pub)[1:]
  677. }
  678. func decodePubkey64(b []byte) (*ecdsa.PublicKey, error) {
  679. return crypto.UnmarshalPubkey(append([]byte{0x04}, b...))
  680. }
  681. // discoveredEntry implements wrsItem
  682. type discoveredEntry poolEntry
  683. // Weight calculates random selection weight for newly discovered entries
  684. func (e *discoveredEntry) Weight() int64 {
  685. if e.state != psNotConnected || e.delayedRetry {
  686. return 0
  687. }
  688. t := time.Duration(mclock.Now() - e.lastDiscovered)
  689. if t <= discoverExpireStart {
  690. return 1000000000
  691. }
  692. return int64(1000000000 * math.Exp(-float64(t-discoverExpireStart)/float64(discoverExpireConst)))
  693. }
  694. // knownEntry implements wrsItem
  695. type knownEntry poolEntry
  696. // Weight calculates random selection weight for known entries
  697. func (e *knownEntry) Weight() int64 {
  698. if e.state != psNotConnected || !e.known || e.delayedRetry {
  699. return 0
  700. }
  701. return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow(1-e.timeoutStats.recentAvg(), timeoutPow))
  702. }
// poolEntryAddress is a separate object because currently it is necessary to remember
// multiple potential network addresses for a pool entry. This will be removed after
// the final implementation of v5 discovery which will retrieve signed and serial
// numbered advertisements, making it clear which IP/port is the latest one.
type poolEntryAddress struct {
	// ip and port identify the address; together they form the map key (see strKey).
	ip net.IP
	port uint16
	lastSeen mclock.AbsTime // last time it was discovered, connected or loaded from db
	fails uint // connection failures since last successful connection (persistent)
}
  713. func (a *poolEntryAddress) Weight() int64 {
  714. t := time.Duration(mclock.Now() - a.lastSeen)
  715. return int64(1000000*math.Exp(-float64(t)/float64(discoverExpireConst)-float64(a.fails)*addrFailDropLn)) + 1
  716. }
  717. func (a *poolEntryAddress) strKey() string {
  718. return a.ip.String() + ":" + strconv.Itoa(int(a.port))
  719. }
// poolStats implement statistics for a certain quantity with a long term average
// and a short term value which is returned exponentially to the average with
// the time constant pstatReturnToMeanTC.
// NOTE(review): older comments also mention a per-update factor called
// pstatRecentAdjust; no such adjustment is visible in the methods below.
type poolStats struct {
	// sum is the weighted sum of the samples and weight the sum of their
	// weights; avg is the long term average and recent the short term value.
	sum, weight, avg, recent float64
	// lastRecalc is the time of the last recalculation.
	lastRecalc mclock.AbsTime
}
  728. // init initializes stats with a long term sum/update count pair retrieved from the database
  729. func (s *poolStats) init(sum, weight float64) {
  730. s.sum = sum
  731. s.weight = weight
  732. var avg float64
  733. if weight > 0 {
  734. avg = s.sum / weight
  735. }
  736. s.avg = avg
  737. s.recent = avg
  738. s.lastRecalc = mclock.Now()
  739. }
  740. // recalc recalculates recent value return-to-mean and long term average
  741. func (s *poolStats) recalc() {
  742. now := mclock.Now()
  743. s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC))
  744. if s.sum == 0 {
  745. s.avg = 0
  746. } else {
  747. if s.sum > s.weight*1e30 {
  748. s.avg = 1e30
  749. } else {
  750. s.avg = s.sum / s.weight
  751. }
  752. }
  753. s.lastRecalc = now
  754. }
// add updates the stats with a new value of the given weight: the weight is
// added to the total weight, value*weight to the sum, and the averages are
// recalculated.
func (s *poolStats) add(value, weight float64) {
	s.weight += weight
	s.sum += value * weight
	s.recalc()
}
// recentAvg returns the short-term adjusted average. Note that it is not a
// pure getter: it recalculates (and therefore modifies) the stats first.
func (s *poolStats) recentAvg() float64 {
	s.recalc()
	return s.recent
}
  766. func (s *poolStats) EncodeRLP(w io.Writer) error {
  767. return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)})
  768. }
  769. func (s *poolStats) DecodeRLP(st *rlp.Stream) error {
  770. var stats struct {
  771. SumUint, WeightUint uint64
  772. }
  773. if err := st.Decode(&stats); err != nil {
  774. return err
  775. }
  776. s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint))
  777. return nil
  778. }
// poolEntryQueue keeps track of its least recently accessed entries and removes
// them when the number of entries reaches the limit
type poolEntryQueue struct {
	// queue holds the entries indexed by their queueIdx, which is the value
	// of newPtr at the time the entry was last set as latest.
	queue map[int]*poolEntry
	// newPtr is the index assigned to the next latest entry; oldPtr is where
	// the search for the oldest entry starts; maxCnt is the size limit.
	newPtr, oldPtr, maxCnt int
	// removeFromPool is called with each entry evicted because of the limit.
	removeFromPool func(*poolEntry)
}
  786. // newPoolEntryQueue returns a new poolEntryQueue
  787. func newPoolEntryQueue(maxCnt int, removeFromPool func(*poolEntry)) poolEntryQueue {
  788. return poolEntryQueue{queue: make(map[int]*poolEntry), maxCnt: maxCnt, removeFromPool: removeFromPool}
  789. }
  790. // fetchOldest returns and removes the least recently accessed entry
  791. func (q *poolEntryQueue) fetchOldest() *poolEntry {
  792. if len(q.queue) == 0 {
  793. return nil
  794. }
  795. for {
  796. if e := q.queue[q.oldPtr]; e != nil {
  797. delete(q.queue, q.oldPtr)
  798. q.oldPtr++
  799. return e
  800. }
  801. q.oldPtr++
  802. }
  803. }
// remove removes an entry from the queue. It does nothing if the slot at the
// entry's queueIdx does not hold this entry (for example because the entry is
// not in this queue).
func (q *poolEntryQueue) remove(entry *poolEntry) {
	if q.queue[entry.queueIdx] == entry {
		delete(q.queue, entry.queueIdx)
	}
}
  810. // setLatest adds or updates a recently accessed entry. It also checks if an old entry
  811. // needs to be removed and removes it from the parent pool too with a callback function.
  812. func (q *poolEntryQueue) setLatest(entry *poolEntry) {
  813. if q.queue[entry.queueIdx] == entry {
  814. delete(q.queue, entry.queueIdx)
  815. } else {
  816. if len(q.queue) == q.maxCnt {
  817. e := q.fetchOldest()
  818. q.remove(e)
  819. q.removeFromPool(e)
  820. }
  821. }
  822. entry.queueIdx = q.newPtr
  823. q.queue[entry.queueIdx] = entry
  824. q.newPtr++
  825. }