serverpool.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "crypto/ecdsa"
  19. "fmt"
  20. "io"
  21. "math"
  22. "math/rand"
  23. "net"
  24. "strconv"
  25. "sync"
  26. "time"
  27. "github.com/ethereum/go-ethereum/common/mclock"
  28. "github.com/ethereum/go-ethereum/crypto"
  29. "github.com/ethereum/go-ethereum/ethdb"
  30. "github.com/ethereum/go-ethereum/log"
  31. "github.com/ethereum/go-ethereum/p2p"
  32. "github.com/ethereum/go-ethereum/p2p/discv5"
  33. "github.com/ethereum/go-ethereum/p2p/enode"
  34. "github.com/ethereum/go-ethereum/rlp"
  35. )
const (
	// After a connection has been ended or timed out, there is a waiting period
	// before it can be selected for connection again.
	// waiting period = base delay * (1 + random(1))
	// base delay = shortRetryDelay for the first shortRetryCnt times after a
	// successful connection, after that longRetryDelay is applied
	shortRetryCnt   = 5
	shortRetryDelay = time.Second * 5
	longRetryDelay  = time.Minute * 10
	// maxNewEntries is the maximum number of newly discovered (never connected) nodes.
	// If the limit is reached, the least recently discovered one is thrown out.
	maxNewEntries = 1000
	// maxKnownEntries is the maximum number of known (already connected) nodes.
	// If the limit is reached, the least recently connected one is thrown out.
	// (note that unlike new entries, known entries are persistent)
	maxKnownEntries = 1000
	// target for simultaneously connected servers
	targetServerCount = 5
	// target for servers selected from the known table
	// (we leave room for trying new ones if there is any)
	targetKnownSelect = 3
	// after dialTimeout, consider the server unavailable and adjust statistics
	dialTimeout = time.Second * 30
	// targetConnTime is the minimum expected connection duration before a server
	// drops a client without any specific reason
	targetConnTime = time.Minute * 10
	// new entry selection weight calculation based on most recent discovery time:
	// unity until discoverExpireStart, then exponential decay with discoverExpireConst
	discoverExpireStart = time.Minute * 20
	discoverExpireConst = time.Minute * 20
	// known entry selection weight is dropped by a factor of exp(-failDropLn) after
	// each unsuccessful connection (restored after a successful one)
	failDropLn = 0.1
	// known node connection success and quality statistics have a long term average
	// and a short term value which returns exponentially to the long term average
	// with the time constant pstatReturnToMeanTC (see poolStats.recalc)
	pstatReturnToMeanTC = time.Hour
	// node address selection weight is dropped by a factor of exp(-addrFailDropLn) after
	// each unsuccessful connection (restored after a successful one)
	addrFailDropLn = math.Ln2
	// responseScoreTC and delayScoreTC are exponential decay time constants for
	// calculating selection chances from response times and block delay times
	responseScoreTC = time.Millisecond * 100
	delayScoreTC    = time.Second * 5
	// timeoutPow is the exponent applied to (1 - recent timeout ratio) in the
	// known entry selection weight (see knownEntry.Weight)
	timeoutPow = 10
	// initStatsWeight is used to initialize previously unknown peers with good
	// statistics to give a chance to prove themselves
	initStatsWeight = 1
)
// connReq represents a request for peer connection. It is sent by connect to
// the event loop, which answers on result with the entry to use (or nil).
type connReq struct {
	p      *serverPeer
	node   *enode.Node
	result chan *poolEntry // buffered (size 1) by connect, so the event loop never blocks on it
}

// disconnReq represents a request for peer disconnection, sent by disconnect.
type disconnReq struct {
	entry *poolEntry
	// stopped reports whether the pool was already shutting down when the
	// request was created; it selects how connection statistics are adjusted.
	stopped bool
	// done is waited on by the sender; the event loop closes it after
	// processing the request.
	done chan struct{}
}

// registerReq represents a request for peer registration, sent by registered.
type registerReq struct {
	entry *poolEntry
	// done is waited on by the sender; the event loop closes it after
	// processing the request.
	done chan struct{}
}
// serverPool implements a pool for storing and selecting newly discovered and already
// known light server nodes. It receives discovered nodes, stores statistics about
// known nodes and takes care of always having enough good quality servers connected.
//
// Once started, the pool state is mutated by eventLoop; other goroutines interact
// with the pool by sending requests over its channels.
type serverPool struct {
	db ethdb.Database
	// dbKey is the database key of the persisted known node list ("serverPool/" + topic).
	dbKey  []byte
	server *p2p.Server
	// connWg counts connections accepted by the event loop until they are disconnected.
	connWg sync.WaitGroup
	topic  discv5.Topic
	// The discovery channels are only created by start when discv5 is available.
	discSetPeriod chan time.Duration
	discNodes     chan *enode.Node
	discLookups   chan bool
	trustedNodes  map[enode.ID]*enode.Node
	entries       map[enode.ID]*poolEntry
	// timeout receives entries whose dial timer expired, enableRetry entries
	// whose retry delay elapsed.
	timeout, enableRetry chan *poolEntry
	adjustStats          chan poolStatAdjust
	knownQueue, newQueue poolEntryQueue
	// knownSelect and newSelect are the weighted random selection sets of
	// known and newly discovered entries.
	knownSelect, newSelect *weightedRandomSelect
	// knownSelected and newSelected count the entries currently dialed or
	// connected that were chosen from the known / new set.
	knownSelected, newSelected int
	// fastDiscover is true until enough discovery lookups have converged.
	fastDiscover bool
	connCh       chan *connReq
	disconnCh    chan *disconnReq
	registerCh   chan *registerReq
	// closeCh is closed by stop to shut the pool down.
	closeCh chan struct{}
	// wg tracks the event loop and the bootstrap node injector started by start.
	wg sync.WaitGroup
}
  129. // newServerPool creates a new serverPool instance
  130. func newServerPool(db ethdb.Database, ulcServers []string) *serverPool {
  131. pool := &serverPool{
  132. db: db,
  133. entries: make(map[enode.ID]*poolEntry),
  134. timeout: make(chan *poolEntry, 1),
  135. adjustStats: make(chan poolStatAdjust, 100),
  136. enableRetry: make(chan *poolEntry, 1),
  137. connCh: make(chan *connReq),
  138. disconnCh: make(chan *disconnReq),
  139. registerCh: make(chan *registerReq),
  140. closeCh: make(chan struct{}),
  141. knownSelect: newWeightedRandomSelect(),
  142. newSelect: newWeightedRandomSelect(),
  143. fastDiscover: true,
  144. trustedNodes: parseTrustedNodes(ulcServers),
  145. }
  146. pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry)
  147. pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry)
  148. return pool
  149. }
  150. func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
  151. pool.server = server
  152. pool.topic = topic
  153. pool.dbKey = append([]byte("serverPool/"), []byte(topic)...)
  154. pool.loadNodes()
  155. pool.connectToTrustedNodes()
  156. if pool.server.DiscV5 != nil {
  157. pool.discSetPeriod = make(chan time.Duration, 1)
  158. pool.discNodes = make(chan *enode.Node, 100)
  159. pool.discLookups = make(chan bool, 100)
  160. go pool.discoverNodes()
  161. }
  162. pool.checkDial()
  163. pool.wg.Add(1)
  164. go pool.eventLoop()
  165. // Inject the bootstrap nodes as initial dial candiates.
  166. pool.wg.Add(1)
  167. go func() {
  168. defer pool.wg.Done()
  169. for _, n := range server.BootstrapNodes {
  170. select {
  171. case pool.discNodes <- n:
  172. case <-pool.closeCh:
  173. return
  174. }
  175. }
  176. }()
  177. }
  178. func (pool *serverPool) stop() {
  179. close(pool.closeCh)
  180. pool.wg.Wait()
  181. }
  182. // discoverNodes wraps SearchTopic, converting result nodes to enode.Node.
  183. func (pool *serverPool) discoverNodes() {
  184. ch := make(chan *discv5.Node)
  185. go func() {
  186. pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, ch, pool.discLookups)
  187. close(ch)
  188. }()
  189. for n := range ch {
  190. pubkey, err := decodePubkey64(n.ID[:])
  191. if err != nil {
  192. continue
  193. }
  194. pool.discNodes <- enode.NewV4(pubkey, n.IP, int(n.TCP), int(n.UDP))
  195. }
  196. }
  197. // connect should be called upon any incoming connection. If the connection has been
  198. // dialed by the server pool recently, the appropriate pool entry is returned.
  199. // Otherwise, the connection should be rejected.
  200. // Note that whenever a connection has been accepted and a pool entry has been returned,
  201. // disconnect should also always be called.
  202. func (pool *serverPool) connect(p *serverPeer, node *enode.Node) *poolEntry {
  203. log.Debug("Connect new entry", "enode", p.id)
  204. req := &connReq{p: p, node: node, result: make(chan *poolEntry, 1)}
  205. select {
  206. case pool.connCh <- req:
  207. case <-pool.closeCh:
  208. return nil
  209. }
  210. return <-req.result
  211. }
  212. // registered should be called after a successful handshake
  213. func (pool *serverPool) registered(entry *poolEntry) {
  214. log.Debug("Registered new entry", "enode", entry.node.ID())
  215. req := &registerReq{entry: entry, done: make(chan struct{})}
  216. select {
  217. case pool.registerCh <- req:
  218. case <-pool.closeCh:
  219. return
  220. }
  221. <-req.done
  222. }
  223. // disconnect should be called when ending a connection. Service quality statistics
  224. // can be updated optionally (not updated if no registration happened, in this case
  225. // only connection statistics are updated, just like in case of timeout)
  226. func (pool *serverPool) disconnect(entry *poolEntry) {
  227. stopped := false
  228. select {
  229. case <-pool.closeCh:
  230. stopped = true
  231. default:
  232. }
  233. log.Debug("Disconnected old entry", "enode", entry.node.ID())
  234. req := &disconnReq{entry: entry, stopped: stopped, done: make(chan struct{})}
  235. // Block until disconnection request is served.
  236. pool.disconnCh <- req
  237. <-req.done
  238. }
// Kinds of statistics adjustments carried in poolStatAdjust.adjustType.
const (
	pseBlockDelay      = iota // block announce delay sample (adjustBlockDelay)
	pseResponseTime           // answered request, with its response time (adjustResponseTime)
	pseResponseTimeout        // timed out request (adjustResponseTime)
)
// poolStatAdjust records are sent to adjust peer block delay/response time statistics.
// The senders build it with positional composite literals, so the field order
// must not change.
type poolStatAdjust struct {
	adjustType int           // one of the pse* constants
	entry      *poolEntry    // entry whose statistics are adjusted
	time       time.Duration // measured block delay or response time
}
  250. // adjustBlockDelay adjusts the block announce delay statistics of a node
  251. func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) {
  252. if entry == nil {
  253. return
  254. }
  255. pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time}
  256. }
  257. // adjustResponseTime adjusts the request response time statistics of a node
  258. func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) {
  259. if entry == nil {
  260. return
  261. }
  262. if timeout {
  263. pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time}
  264. } else {
  265. pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time}
  266. }
  267. }
  268. // eventLoop handles pool events and mutex locking for all internal functions
  269. func (pool *serverPool) eventLoop() {
  270. defer pool.wg.Done()
  271. lookupCnt := 0
  272. var convTime mclock.AbsTime
  273. if pool.discSetPeriod != nil {
  274. pool.discSetPeriod <- time.Millisecond * 100
  275. }
  276. // disconnect updates service quality statistics depending on the connection time
  277. // and disconnection initiator.
  278. disconnect := func(req *disconnReq, stopped bool) {
  279. // Handle peer disconnection requests.
  280. entry := req.entry
  281. if entry.state == psRegistered {
  282. connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime)
  283. if connAdjust > 1 {
  284. connAdjust = 1
  285. }
  286. if stopped {
  287. // disconnect requested by ourselves.
  288. entry.connectStats.add(1, connAdjust)
  289. } else {
  290. // disconnect requested by server side.
  291. entry.connectStats.add(connAdjust, 1)
  292. }
  293. }
  294. entry.state = psNotConnected
  295. if entry.knownSelected {
  296. pool.knownSelected--
  297. } else {
  298. pool.newSelected--
  299. }
  300. pool.setRetryDial(entry)
  301. pool.connWg.Done()
  302. close(req.done)
  303. }
  304. for {
  305. select {
  306. case entry := <-pool.timeout:
  307. if !entry.removed {
  308. pool.checkDialTimeout(entry)
  309. }
  310. case entry := <-pool.enableRetry:
  311. if !entry.removed {
  312. entry.delayedRetry = false
  313. pool.updateCheckDial(entry)
  314. }
  315. case adj := <-pool.adjustStats:
  316. switch adj.adjustType {
  317. case pseBlockDelay:
  318. adj.entry.delayStats.add(float64(adj.time), 1)
  319. case pseResponseTime:
  320. adj.entry.responseStats.add(float64(adj.time), 1)
  321. adj.entry.timeoutStats.add(0, 1)
  322. case pseResponseTimeout:
  323. adj.entry.timeoutStats.add(1, 1)
  324. }
  325. case node := <-pool.discNodes:
  326. if pool.trustedNodes[node.ID()] == nil {
  327. entry := pool.findOrNewNode(node)
  328. pool.updateCheckDial(entry)
  329. }
  330. case conv := <-pool.discLookups:
  331. if conv {
  332. if lookupCnt == 0 {
  333. convTime = mclock.Now()
  334. }
  335. lookupCnt++
  336. if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) {
  337. pool.fastDiscover = false
  338. if pool.discSetPeriod != nil {
  339. pool.discSetPeriod <- time.Minute
  340. }
  341. }
  342. }
  343. case req := <-pool.connCh:
  344. if pool.trustedNodes[req.p.ID()] != nil {
  345. // ignore trusted nodes
  346. req.result <- &poolEntry{trusted: true}
  347. } else {
  348. // Handle peer connection requests.
  349. entry := pool.entries[req.p.ID()]
  350. if entry == nil {
  351. entry = pool.findOrNewNode(req.node)
  352. }
  353. if entry.state == psConnected || entry.state == psRegistered {
  354. req.result <- nil
  355. continue
  356. }
  357. pool.connWg.Add(1)
  358. entry.peer = req.p
  359. entry.state = psConnected
  360. addr := &poolEntryAddress{
  361. ip: req.node.IP(),
  362. port: uint16(req.node.TCP()),
  363. lastSeen: mclock.Now(),
  364. }
  365. entry.lastConnected = addr
  366. entry.addr = make(map[string]*poolEntryAddress)
  367. entry.addr[addr.strKey()] = addr
  368. entry.addrSelect = *newWeightedRandomSelect()
  369. entry.addrSelect.update(addr)
  370. req.result <- entry
  371. }
  372. case req := <-pool.registerCh:
  373. if req.entry.trusted {
  374. continue
  375. }
  376. // Handle peer registration requests.
  377. entry := req.entry
  378. entry.state = psRegistered
  379. entry.regTime = mclock.Now()
  380. if !entry.known {
  381. pool.newQueue.remove(entry)
  382. entry.known = true
  383. }
  384. pool.knownQueue.setLatest(entry)
  385. entry.shortRetry = shortRetryCnt
  386. close(req.done)
  387. case req := <-pool.disconnCh:
  388. if req.entry.trusted {
  389. continue
  390. }
  391. // Handle peer disconnection requests.
  392. disconnect(req, req.stopped)
  393. case <-pool.closeCh:
  394. if pool.discSetPeriod != nil {
  395. close(pool.discSetPeriod)
  396. }
  397. // Spawn a goroutine to close the disconnCh after all connections are disconnected.
  398. go func() {
  399. pool.connWg.Wait()
  400. close(pool.disconnCh)
  401. }()
  402. // Handle all remaining disconnection requests before exit.
  403. for req := range pool.disconnCh {
  404. disconnect(req, true)
  405. }
  406. pool.saveNodes()
  407. return
  408. }
  409. }
  410. }
  411. func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry {
  412. now := mclock.Now()
  413. entry := pool.entries[node.ID()]
  414. if entry == nil {
  415. log.Debug("Discovered new entry", "id", node.ID())
  416. entry = &poolEntry{
  417. node: node,
  418. addr: make(map[string]*poolEntryAddress),
  419. addrSelect: *newWeightedRandomSelect(),
  420. shortRetry: shortRetryCnt,
  421. }
  422. pool.entries[node.ID()] = entry
  423. // initialize previously unknown peers with good statistics to give a chance to prove themselves
  424. entry.connectStats.add(1, initStatsWeight)
  425. entry.delayStats.add(0, initStatsWeight)
  426. entry.responseStats.add(0, initStatsWeight)
  427. entry.timeoutStats.add(0, initStatsWeight)
  428. }
  429. entry.lastDiscovered = now
  430. addr := &poolEntryAddress{ip: node.IP(), port: uint16(node.TCP())}
  431. if a, ok := entry.addr[addr.strKey()]; ok {
  432. addr = a
  433. } else {
  434. entry.addr[addr.strKey()] = addr
  435. }
  436. addr.lastSeen = now
  437. entry.addrSelect.update(addr)
  438. if !entry.known {
  439. pool.newQueue.setLatest(entry)
  440. }
  441. return entry
  442. }
  443. // loadNodes loads known nodes and their statistics from the database
  444. func (pool *serverPool) loadNodes() {
  445. enc, err := pool.db.Get(pool.dbKey)
  446. if err != nil {
  447. return
  448. }
  449. var list []*poolEntry
  450. err = rlp.DecodeBytes(enc, &list)
  451. if err != nil {
  452. log.Debug("Failed to decode node list", "err", err)
  453. return
  454. }
  455. for _, e := range list {
  456. log.Debug("Loaded server stats", "id", e.node.ID(), "fails", e.lastConnected.fails,
  457. "conn", fmt.Sprintf("%v/%v", e.connectStats.avg, e.connectStats.weight),
  458. "delay", fmt.Sprintf("%v/%v", time.Duration(e.delayStats.avg), e.delayStats.weight),
  459. "response", fmt.Sprintf("%v/%v", time.Duration(e.responseStats.avg), e.responseStats.weight),
  460. "timeout", fmt.Sprintf("%v/%v", e.timeoutStats.avg, e.timeoutStats.weight))
  461. pool.entries[e.node.ID()] = e
  462. if pool.trustedNodes[e.node.ID()] == nil {
  463. pool.knownQueue.setLatest(e)
  464. pool.knownSelect.update((*knownEntry)(e))
  465. }
  466. }
  467. }
  468. // connectToTrustedNodes adds trusted server nodes as static trusted peers.
  469. //
  470. // Note: trusted nodes are not handled by the server pool logic, they are not
  471. // added to either the known or new selection pools. They are connected/reconnected
  472. // by p2p.Server whenever possible.
  473. func (pool *serverPool) connectToTrustedNodes() {
  474. //connect to trusted nodes
  475. for _, node := range pool.trustedNodes {
  476. pool.server.AddTrustedPeer(node)
  477. pool.server.AddPeer(node)
  478. log.Debug("Added trusted node", "id", node.ID().String())
  479. }
  480. }
  481. // parseTrustedNodes returns valid and parsed enodes
  482. func parseTrustedNodes(trustedNodes []string) map[enode.ID]*enode.Node {
  483. nodes := make(map[enode.ID]*enode.Node)
  484. for _, node := range trustedNodes {
  485. node, err := enode.Parse(enode.ValidSchemes, node)
  486. if err != nil {
  487. log.Warn("Trusted node URL invalid", "enode", node, "err", err)
  488. continue
  489. }
  490. nodes[node.ID()] = node
  491. }
  492. return nodes
  493. }
  494. // saveNodes saves known nodes and their statistics into the database. Nodes are
  495. // ordered from least to most recently connected.
  496. func (pool *serverPool) saveNodes() {
  497. list := make([]*poolEntry, len(pool.knownQueue.queue))
  498. for i := range list {
  499. list[i] = pool.knownQueue.fetchOldest()
  500. }
  501. enc, err := rlp.EncodeToBytes(list)
  502. if err == nil {
  503. pool.db.Put(pool.dbKey, enc)
  504. }
  505. }
  506. // removeEntry removes a pool entry when the entry count limit is reached.
  507. // Note that it is called by the new/known queues from which the entry has already
  508. // been removed so removing it from the queues is not necessary.
  509. func (pool *serverPool) removeEntry(entry *poolEntry) {
  510. pool.newSelect.remove((*discoveredEntry)(entry))
  511. pool.knownSelect.remove((*knownEntry)(entry))
  512. entry.removed = true
  513. delete(pool.entries, entry.node.ID())
  514. }
  515. // setRetryDial starts the timer which will enable dialing a certain node again
  516. func (pool *serverPool) setRetryDial(entry *poolEntry) {
  517. delay := longRetryDelay
  518. if entry.shortRetry > 0 {
  519. entry.shortRetry--
  520. delay = shortRetryDelay
  521. }
  522. delay += time.Duration(rand.Int63n(int64(delay) + 1))
  523. entry.delayedRetry = true
  524. go func() {
  525. select {
  526. case <-pool.closeCh:
  527. case <-time.After(delay):
  528. select {
  529. case <-pool.closeCh:
  530. case pool.enableRetry <- entry:
  531. }
  532. }
  533. }()
  534. }
  535. // updateCheckDial is called when an entry can potentially be dialed again. It updates
  536. // its selection weights and checks if new dials can/should be made.
  537. func (pool *serverPool) updateCheckDial(entry *poolEntry) {
  538. pool.newSelect.update((*discoveredEntry)(entry))
  539. pool.knownSelect.update((*knownEntry)(entry))
  540. pool.checkDial()
  541. }
// checkDial checks if new dials can/should be made. It tries to select servers both
// based on good statistics and recent discovery:
//  1. dial known entries until targetKnownSelect of them are selected;
//  2. dial newly discovered entries until targetServerCount servers are
//     selected in total;
//  3. once the fast discovery phase is over, and only if the known set did not
//     run out in step 1, dial more known entries until knownSelected reaches
//     targetServerCount.
//
// The loops rely on dial moving the chosen entry out of psNotConnected and
// incrementing a selected counter. Outside psNotConnected, both Weight
// implementations return 0.
//
// NOTE(review): the step 3 loop bounds only knownSelected, not
// knownSelected+newSelected. If step 2 already reached targetServerCount,
// step 3 can still dial more servers. Its inline comment suggests it was meant
// for the case where no new nodes were left — confirm the intended bound.
func (pool *serverPool) checkDial() {
	fillWithKnownSelects := !pool.fastDiscover
	for pool.knownSelected < targetKnownSelect {
		entry := pool.knownSelect.choose()
		if entry == nil {
			// The known set is exhausted; don't try to fill from it below.
			fillWithKnownSelects = false
			break
		}
		pool.dial((*poolEntry)(entry.(*knownEntry)), true)
	}
	for pool.knownSelected+pool.newSelected < targetServerCount {
		entry := pool.newSelect.choose()
		if entry == nil {
			break
		}
		pool.dial((*poolEntry)(entry.(*discoveredEntry)), false)
	}
	if fillWithKnownSelects {
		// no more newly discovered nodes to select and since fast discover period
		// is over, we probably won't find more in the near future so select more
		// known entries if possible
		for pool.knownSelected < targetServerCount {
			entry := pool.knownSelect.choose()
			if entry == nil {
				break
			}
			pool.dial((*poolEntry)(entry.(*knownEntry)), true)
		}
	}
}
  574. // dial initiates a new connection
  575. func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) {
  576. if pool.server == nil || entry.state != psNotConnected {
  577. return
  578. }
  579. entry.state = psDialed
  580. entry.knownSelected = knownSelected
  581. if knownSelected {
  582. pool.knownSelected++
  583. } else {
  584. pool.newSelected++
  585. }
  586. addr := entry.addrSelect.choose().(*poolEntryAddress)
  587. log.Debug("Dialing new peer", "lesaddr", entry.node.ID().String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected)
  588. entry.dialed = addr
  589. go func() {
  590. pool.server.AddPeer(entry.node)
  591. select {
  592. case <-pool.closeCh:
  593. case <-time.After(dialTimeout):
  594. select {
  595. case <-pool.closeCh:
  596. case pool.timeout <- entry:
  597. }
  598. }
  599. }()
  600. }
  601. // checkDialTimeout checks if the node is still in dialed state and if so, resets it
  602. // and adjusts connection statistics accordingly.
  603. func (pool *serverPool) checkDialTimeout(entry *poolEntry) {
  604. if entry.state != psDialed {
  605. return
  606. }
  607. log.Debug("Dial timeout", "lesaddr", entry.node.ID().String()+"@"+entry.dialed.strKey())
  608. entry.state = psNotConnected
  609. if entry.knownSelected {
  610. pool.knownSelected--
  611. } else {
  612. pool.newSelected--
  613. }
  614. entry.connectStats.add(0, 1)
  615. entry.dialed.fails++
  616. pool.setRetryDial(entry)
  617. }
// Connection states of a poolEntry (poolEntry.state).
const (
	psNotConnected = iota // idle; only entries in this state get a non-zero selection weight
	psDialed              // dial started by the pool, no connection accepted yet
	psConnected           // connection accepted by the event loop, not yet registered
	psRegistered          // handshake completed and registered
)
// poolEntry represents a server node and stores its current state and statistics.
type poolEntry struct {
	// peer is set when the event loop accepts a connection for the entry.
	peer *serverPeer
	// NOTE(review): pubkey is not referenced in the visible code (the key is
	// taken from node when encoding); possibly unused.
	pubkey [64]byte // secp256k1 key of the node
	// addr holds the node's candidate network addresses, keyed by strKey.
	addr map[string]*poolEntryAddress
	node *enode.Node
	// lastConnected is the address of the last accepted connection (this is
	// what EncodeRLP persists); dialed is the address chosen by the last dial.
	lastConnected, dialed *poolEntryAddress
	// addrSelect picks among addr by address weight when dialing.
	addrSelect weightedRandomSelect
	// lastDiscovered is refreshed whenever findOrNewNode sees the node.
	lastDiscovered mclock.AbsTime
	// known: registered at least once or loaded from the database (kept in the
	// known queue); knownSelected: the current dial/connection is counted in
	// knownSelected rather than newSelected; trusted: placeholder entry handed
	// out for trusted nodes.
	known, knownSelected, trusted bool
	connectStats, delayStats      poolStats
	responseStats, timeoutStats   poolStats
	// state is one of the ps* connection states.
	state int
	// regTime is the time of the last registration.
	regTime mclock.AbsTime
	// queueIdx is the entry's index in the new or known poolEntryQueue.
	queueIdx int
	// removed is set when the entry is evicted from the pool; pending timer
	// events for it are then ignored by the event loop.
	removed bool
	// delayedRetry is set while a retry timer is pending; the entry has zero
	// selection weight meanwhile.
	delayedRetry bool
	// shortRetry is the number of remaining retries that use shortRetryDelay.
	shortRetry int
}
// poolEntryEnc is the RLP encoding of poolEntry, as persisted by saveNodes.
// The field order defines the stored format and must not change.
type poolEntryEnc struct {
	Pubkey []byte // 64-byte public key without the 0x04 prefix (see encodePubkey64)
	IP     net.IP // last connected address
	Port   uint16
	Fails  uint // failure count of that address
	// Connect, block delay, response time and timeout statistics.
	CStat, DStat, RStat, TStat poolStats
}
  651. func (e *poolEntry) EncodeRLP(w io.Writer) error {
  652. return rlp.Encode(w, &poolEntryEnc{
  653. Pubkey: encodePubkey64(e.node.Pubkey()),
  654. IP: e.lastConnected.ip,
  655. Port: e.lastConnected.port,
  656. Fails: e.lastConnected.fails,
  657. CStat: e.connectStats,
  658. DStat: e.delayStats,
  659. RStat: e.responseStats,
  660. TStat: e.timeoutStats,
  661. })
  662. }
  663. func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
  664. var entry poolEntryEnc
  665. if err := s.Decode(&entry); err != nil {
  666. return err
  667. }
  668. pubkey, err := decodePubkey64(entry.Pubkey)
  669. if err != nil {
  670. return err
  671. }
  672. addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()}
  673. e.node = enode.NewV4(pubkey, entry.IP, int(entry.Port), int(entry.Port))
  674. e.addr = make(map[string]*poolEntryAddress)
  675. e.addr[addr.strKey()] = addr
  676. e.addrSelect = *newWeightedRandomSelect()
  677. e.addrSelect.update(addr)
  678. e.lastConnected = addr
  679. e.connectStats = entry.CStat
  680. e.delayStats = entry.DStat
  681. e.responseStats = entry.RStat
  682. e.timeoutStats = entry.TStat
  683. e.shortRetry = shortRetryCnt
  684. e.known = true
  685. return nil
  686. }
  687. func encodePubkey64(pub *ecdsa.PublicKey) []byte {
  688. return crypto.FromECDSAPub(pub)[1:]
  689. }
  690. func decodePubkey64(b []byte) (*ecdsa.PublicKey, error) {
  691. return crypto.UnmarshalPubkey(append([]byte{0x04}, b...))
  692. }
  693. // discoveredEntry implements wrsItem
  694. type discoveredEntry poolEntry
  695. // Weight calculates random selection weight for newly discovered entries
  696. func (e *discoveredEntry) Weight() int64 {
  697. if e.state != psNotConnected || e.delayedRetry {
  698. return 0
  699. }
  700. t := time.Duration(mclock.Now() - e.lastDiscovered)
  701. if t <= discoverExpireStart {
  702. return 1000000000
  703. }
  704. return int64(1000000000 * math.Exp(-float64(t-discoverExpireStart)/float64(discoverExpireConst)))
  705. }
// knownEntry is a poolEntry viewed as an item of the known-node selection set
// (it implements wrsItem).
type knownEntry poolEntry

// Weight calculates random selection weight for known entries. It is zero
// unless the entry is idle (psNotConnected), known and not waiting for a
// retry. Otherwise it is 1e9 multiplied by:
//   - the recent connection success average;
//   - exp(-fails*failDropLn), fails counting failures of the last connected address;
//   - exp(-recent response time/responseScoreTC - recent block delay/delayScoreTC);
//   - (1 - recent timeout ratio)^timeoutPow.
//
// Reading the recent averages also recalculates them (see poolStats.recentAvg).
func (e *knownEntry) Weight() int64 {
	if e.state != psNotConnected || !e.known || e.delayedRetry {
		return 0
	}
	return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow(1-e.timeoutStats.recentAvg(), timeoutPow))
}
// poolEntryAddress is a separate object because currently it is necessary to remember
// multiple potential network addresses for a pool entry. This will be removed after
// the final implementation of v5 discovery which will retrieve signed and serial
// numbered advertisements, making it clear which IP/port is the latest one.
type poolEntryAddress struct {
	ip       net.IP
	port     uint16         // TCP port
	lastSeen mclock.AbsTime // last time it was discovered, connected or loaded from db
	fails    uint           // connection failures since last successful connection (persistent)
}
// Weight calculates the random selection weight of an address:
// 1e6 * exp(-age/discoverExpireConst - fails*addrFailDropLn) + 1, where age is
// the time since lastSeen. The +1 keeps the weight strictly positive.
func (a *poolEntryAddress) Weight() int64 {
	t := time.Duration(mclock.Now() - a.lastSeen)
	return int64(1000000*math.Exp(-float64(t)/float64(discoverExpireConst)-float64(a.fails)*addrFailDropLn)) + 1
}
  729. func (a *poolEntryAddress) strKey() string {
  730. return a.ip.String() + ":" + strconv.Itoa(int(a.port))
  731. }
// poolStats implements statistics for a certain quantity with a long term
// average (avg = sum/weight) and a short term value (recent). The short term
// value returns exponentially to the long term average with the time constant
// pstatReturnToMeanTC (see recalc).
type poolStats struct {
	sum, weight, avg, recent float64
	lastRecalc               mclock.AbsTime // time of the last recalc
}
  740. // init initializes stats with a long term sum/update count pair retrieved from the database
  741. func (s *poolStats) init(sum, weight float64) {
  742. s.sum = sum
  743. s.weight = weight
  744. var avg float64
  745. if weight > 0 {
  746. avg = s.sum / weight
  747. }
  748. s.avg = avg
  749. s.recent = avg
  750. s.lastRecalc = mclock.Now()
  751. }
// recalc recalculates recent value return-to-mean and long term average.
// First, recent moves towards the current long term average by the factor
// exp(-elapsed/pstatReturnToMeanTC), where elapsed is the time since the last
// recalc. Then avg is recomputed as sum/weight: it is 0 when sum is 0, and
// capped at 1e30 when sum exceeds weight*1e30.
// Because recent is pulled towards avg as it was before this call, a newly
// added sample only reaches recent through later recalculations.
// NOTE(review): for a zero-value poolStats (lastRecalc == 0, e.g. the entries
// created by findOrNewNode), elapsed is presumably large. The first add then
// leaves recent at the old avg of 0 rather than at the new average, which
// seems at odds with seeding "good" initial statistics — confirm intended.
func (s *poolStats) recalc() {
	now := mclock.Now()
	s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC))
	if s.sum == 0 {
		s.avg = 0
	} else {
		if s.sum > s.weight*1e30 {
			s.avg = 1e30
		} else {
			s.avg = s.sum / s.weight
		}
	}
	s.lastRecalc = now
}
  767. // add updates the stats with a new value
  768. func (s *poolStats) add(value, weight float64) {
  769. s.weight += weight
  770. s.sum += value * weight
  771. s.recalc()
  772. }
  773. // recentAvg returns the short-term adjusted average
  774. func (s *poolStats) recentAvg() float64 {
  775. s.recalc()
  776. return s.recent
  777. }
  778. func (s *poolStats) EncodeRLP(w io.Writer) error {
  779. return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)})
  780. }
  781. func (s *poolStats) DecodeRLP(st *rlp.Stream) error {
  782. var stats struct {
  783. SumUint, WeightUint uint64
  784. }
  785. if err := st.Decode(&stats); err != nil {
  786. return err
  787. }
  788. s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint))
  789. return nil
  790. }
// poolEntryQueue keeps track of its least recently accessed entries and removes
// them when the number of entries reaches the limit
type poolEntryQueue struct {
	queue map[int]*poolEntry // entries indexed by their queueIdx; a larger index means more recently accessed
	// newPtr is the next index to assign, oldPtr the index where fetchOldest
	// starts scanning, and maxCnt the capacity.
	newPtr, oldPtr, maxCnt int
	removeFromPool         func(*poolEntry) // called with entries evicted because the queue was full
}
  798. // newPoolEntryQueue returns a new poolEntryQueue
  799. func newPoolEntryQueue(maxCnt int, removeFromPool func(*poolEntry)) poolEntryQueue {
  800. return poolEntryQueue{queue: make(map[int]*poolEntry), maxCnt: maxCnt, removeFromPool: removeFromPool}
  801. }
  802. // fetchOldest returns and removes the least recently accessed entry
  803. func (q *poolEntryQueue) fetchOldest() *poolEntry {
  804. if len(q.queue) == 0 {
  805. return nil
  806. }
  807. for {
  808. if e := q.queue[q.oldPtr]; e != nil {
  809. delete(q.queue, q.oldPtr)
  810. q.oldPtr++
  811. return e
  812. }
  813. q.oldPtr++
  814. }
  815. }
  816. // remove removes an entry from the queue
  817. func (q *poolEntryQueue) remove(entry *poolEntry) {
  818. if q.queue[entry.queueIdx] == entry {
  819. delete(q.queue, entry.queueIdx)
  820. }
  821. }
  822. // setLatest adds or updates a recently accessed entry. It also checks if an old entry
  823. // needs to be removed and removes it from the parent pool too with a callback function.
  824. func (q *poolEntryQueue) setLatest(entry *poolEntry) {
  825. if q.queue[entry.queueIdx] == entry {
  826. delete(q.queue, entry.queueIdx)
  827. } else {
  828. if len(q.queue) == q.maxCnt {
  829. e := q.fetchOldest()
  830. q.remove(e)
  831. q.removeFromPool(e)
  832. }
  833. }
  834. entry.queueIdx = q.newPtr
  835. q.queue[entry.queueIdx] = entry
  836. q.newPtr++
  837. }