serverpool.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "crypto/ecdsa"
  19. "fmt"
  20. "io"
  21. "math"
  22. "math/rand"
  23. "net"
  24. "strconv"
  25. "sync"
  26. "time"
  27. "github.com/ethereum/go-ethereum/common/mclock"
  28. "github.com/ethereum/go-ethereum/crypto"
  29. "github.com/ethereum/go-ethereum/ethdb"
  30. "github.com/ethereum/go-ethereum/les/utils"
  31. "github.com/ethereum/go-ethereum/log"
  32. "github.com/ethereum/go-ethereum/p2p"
  33. "github.com/ethereum/go-ethereum/p2p/discv5"
  34. "github.com/ethereum/go-ethereum/p2p/enode"
  35. "github.com/ethereum/go-ethereum/rlp"
  36. )
  const (
  	// After a connection has been ended or timed out, there is a waiting period
  	// before the node can be selected for connection again:
  	// waiting period = base delay * (1 + random(1)).
  	// The base delay is shortRetryDelay for the first shortRetryCnt retries after
  	// a successful connection; after that longRetryDelay is applied.
  	shortRetryCnt   = 5
  	shortRetryDelay = time.Second * 5
  	longRetryDelay  = time.Minute * 10

  	// maxNewEntries is the maximum number of newly discovered (never connected) nodes.
  	// If the limit is reached, the least recently discovered one is thrown out.
  	maxNewEntries = 1000

  	// maxKnownEntries is the maximum number of known (already connected) nodes.
  	// If the limit is reached, the least recently connected one is thrown out.
  	// (Note that unlike new entries, known entries are persisted in the database.)
  	maxKnownEntries = 1000

  	// targetServerCount is the target number of simultaneously connected servers.
  	targetServerCount = 5

  	// targetKnownSelect is the target number of servers selected from the known
  	// table (leaving room for trying newly discovered ones, if there are any).
  	targetKnownSelect = 3

  	// dialTimeout is the time after which a pending dial is considered failed:
  	// the server is treated as unavailable and its statistics are adjusted.
  	dialTimeout = time.Second * 30

  	// targetConnTime is the minimum expected connection duration before a server
  	// drops a client without any specific reason.
  	targetConnTime = time.Minute * 10

  	// New entry selection weight based on the most recent discovery time: unity
  	// until discoverExpireStart, then exponential decay with time constant
  	// discoverExpireConst.
  	discoverExpireStart = time.Minute * 20
  	discoverExpireConst = time.Minute * 20

  	// Known entry selection weight is dropped by a factor of exp(-failDropLn)
  	// after each unsuccessful connection (restored after a successful one).
  	failDropLn = 0.1

  	// Known node connection success and quality statistics have a long term
  	// average and a short term value. Whenever the statistics are recalculated,
  	// the short term value returns exponentially towards the long term average
  	// with the time constant pstatReturnToMeanTC.
  	pstatReturnToMeanTC = time.Hour

  	// Node address selection weight is dropped by a factor of exp(-addrFailDropLn)
  	// (i.e. halved) after each unsuccessful connection to that address
  	// (restored after a successful one).
  	addrFailDropLn = math.Ln2

  	// responseScoreTC and delayScoreTC are exponential decay time constants for
  	// calculating selection chances from response times and block delay times.
  	responseScoreTC = time.Millisecond * 100
  	delayScoreTC    = time.Second * 5

  	// timeoutPow is the exponent applied to (1 - recent timeout ratio) when
  	// calculating the selection weight of a known entry.
  	timeoutPow = 10

  	// initStatsWeight is used to initialize previously unknown peers with good
  	// statistics to give them a chance to prove themselves.
  	initStatsWeight = 1
  )
  87. // connReq represents a request for peer connection.
  88. type connReq struct {
  89. p *serverPeer
  90. node *enode.Node
  91. result chan *poolEntry
  92. }
  93. // disconnReq represents a request for peer disconnection.
  94. type disconnReq struct {
  95. entry *poolEntry
  96. stopped bool
  97. done chan struct{}
  98. }
  99. // registerReq represents a request for peer registration.
  100. type registerReq struct {
  101. entry *poolEntry
  102. done chan struct{}
  103. }
  104. // serverPool implements a pool for storing and selecting newly discovered and already
  105. // known light server nodes. It received discovered nodes, stores statistics about
  106. // known nodes and takes care of always having enough good quality servers connected.
  107. type serverPool struct {
  108. db ethdb.Database
  109. dbKey []byte
  110. server *p2p.Server
  111. connWg sync.WaitGroup
  112. topic discv5.Topic
  113. discSetPeriod chan time.Duration
  114. discNodes chan *enode.Node
  115. discLookups chan bool
  116. trustedNodes map[enode.ID]*enode.Node
  117. entries map[enode.ID]*poolEntry
  118. timeout, enableRetry chan *poolEntry
  119. adjustStats chan poolStatAdjust
  120. knownQueue, newQueue poolEntryQueue
  121. knownSelect, newSelect *utils.WeightedRandomSelect
  122. knownSelected, newSelected int
  123. fastDiscover bool
  124. connCh chan *connReq
  125. disconnCh chan *disconnReq
  126. registerCh chan *registerReq
  127. closeCh chan struct{}
  128. wg sync.WaitGroup
  129. }
  130. // newServerPool creates a new serverPool instance
  131. func newServerPool(db ethdb.Database, ulcServers []string) *serverPool {
  132. pool := &serverPool{
  133. db: db,
  134. entries: make(map[enode.ID]*poolEntry),
  135. timeout: make(chan *poolEntry, 1),
  136. adjustStats: make(chan poolStatAdjust, 100),
  137. enableRetry: make(chan *poolEntry, 1),
  138. connCh: make(chan *connReq),
  139. disconnCh: make(chan *disconnReq),
  140. registerCh: make(chan *registerReq),
  141. closeCh: make(chan struct{}),
  142. knownSelect: utils.NewWeightedRandomSelect(),
  143. newSelect: utils.NewWeightedRandomSelect(),
  144. fastDiscover: true,
  145. trustedNodes: parseTrustedNodes(ulcServers),
  146. }
  147. pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry)
  148. pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry)
  149. return pool
  150. }
  151. func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
  152. pool.server = server
  153. pool.topic = topic
  154. pool.dbKey = append([]byte("serverPool/"), []byte(topic)...)
  155. pool.loadNodes()
  156. pool.connectToTrustedNodes()
  157. if pool.server.DiscV5 != nil {
  158. pool.discSetPeriod = make(chan time.Duration, 1)
  159. pool.discNodes = make(chan *enode.Node, 100)
  160. pool.discLookups = make(chan bool, 100)
  161. go pool.discoverNodes()
  162. }
  163. pool.checkDial()
  164. pool.wg.Add(1)
  165. go pool.eventLoop()
  166. // Inject the bootstrap nodes as initial dial candiates.
  167. pool.wg.Add(1)
  168. go func() {
  169. defer pool.wg.Done()
  170. for _, n := range server.BootstrapNodes {
  171. select {
  172. case pool.discNodes <- n:
  173. case <-pool.closeCh:
  174. return
  175. }
  176. }
  177. }()
  178. }
  179. func (pool *serverPool) stop() {
  180. close(pool.closeCh)
  181. pool.wg.Wait()
  182. }
  183. // discoverNodes wraps SearchTopic, converting result nodes to enode.Node.
  184. func (pool *serverPool) discoverNodes() {
  185. ch := make(chan *discv5.Node)
  186. go func() {
  187. pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, ch, pool.discLookups)
  188. close(ch)
  189. }()
  190. for n := range ch {
  191. pubkey, err := decodePubkey64(n.ID[:])
  192. if err != nil {
  193. continue
  194. }
  195. pool.discNodes <- enode.NewV4(pubkey, n.IP, int(n.TCP), int(n.UDP))
  196. }
  197. }
  198. // connect should be called upon any incoming connection. If the connection has been
  199. // dialed by the server pool recently, the appropriate pool entry is returned.
  200. // Otherwise, the connection should be rejected.
  201. // Note that whenever a connection has been accepted and a pool entry has been returned,
  202. // disconnect should also always be called.
  203. func (pool *serverPool) connect(p *serverPeer, node *enode.Node) *poolEntry {
  204. log.Debug("Connect new entry", "enode", p.id)
  205. req := &connReq{p: p, node: node, result: make(chan *poolEntry, 1)}
  206. select {
  207. case pool.connCh <- req:
  208. case <-pool.closeCh:
  209. return nil
  210. }
  211. return <-req.result
  212. }
  213. // registered should be called after a successful handshake
  214. func (pool *serverPool) registered(entry *poolEntry) {
  215. log.Debug("Registered new entry", "enode", entry.node.ID())
  216. req := &registerReq{entry: entry, done: make(chan struct{})}
  217. select {
  218. case pool.registerCh <- req:
  219. case <-pool.closeCh:
  220. return
  221. }
  222. <-req.done
  223. }
  224. // disconnect should be called when ending a connection. Service quality statistics
  225. // can be updated optionally (not updated if no registration happened, in this case
  226. // only connection statistics are updated, just like in case of timeout)
  227. func (pool *serverPool) disconnect(entry *poolEntry) {
  228. stopped := false
  229. select {
  230. case <-pool.closeCh:
  231. stopped = true
  232. default:
  233. }
  234. log.Debug("Disconnected old entry", "enode", entry.node.ID())
  235. req := &disconnReq{entry: entry, stopped: stopped, done: make(chan struct{})}
  236. // Block until disconnection request is served.
  237. pool.disconnCh <- req
  238. <-req.done
  239. }
  // Kinds of statistics adjustment carried in poolStatAdjust.adjustType.
  const (
  	pseBlockDelay      = 0 // block announcement delay sample
  	pseResponseTime    = 1 // answered request; time is the response time
  	pseResponseTimeout = 2 // request timed out
  )
  245. // poolStatAdjust records are sent to adjust peer block delay/response time statistics
  246. type poolStatAdjust struct {
  247. adjustType int
  248. entry *poolEntry
  249. time time.Duration
  250. }
  251. // adjustBlockDelay adjusts the block announce delay statistics of a node
  252. func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) {
  253. if entry == nil {
  254. return
  255. }
  256. pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time}
  257. }
  258. // adjustResponseTime adjusts the request response time statistics of a node
  259. func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) {
  260. if entry == nil {
  261. return
  262. }
  263. if timeout {
  264. pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time}
  265. } else {
  266. pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time}
  267. }
  268. }
  269. // eventLoop handles pool events and mutex locking for all internal functions
  270. func (pool *serverPool) eventLoop() {
  271. defer pool.wg.Done()
  272. lookupCnt := 0
  273. var convTime mclock.AbsTime
  274. if pool.discSetPeriod != nil {
  275. pool.discSetPeriod <- time.Millisecond * 100
  276. }
  277. // disconnect updates service quality statistics depending on the connection time
  278. // and disconnection initiator.
  279. disconnect := func(req *disconnReq, stopped bool) {
  280. // Handle peer disconnection requests.
  281. entry := req.entry
  282. if entry.state == psRegistered {
  283. connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime)
  284. if connAdjust > 1 {
  285. connAdjust = 1
  286. }
  287. if stopped {
  288. // disconnect requested by ourselves.
  289. entry.connectStats.add(1, connAdjust)
  290. } else {
  291. // disconnect requested by server side.
  292. entry.connectStats.add(connAdjust, 1)
  293. }
  294. }
  295. entry.state = psNotConnected
  296. if entry.knownSelected {
  297. pool.knownSelected--
  298. } else {
  299. pool.newSelected--
  300. }
  301. pool.setRetryDial(entry)
  302. pool.connWg.Done()
  303. close(req.done)
  304. }
  305. for {
  306. select {
  307. case entry := <-pool.timeout:
  308. if !entry.removed {
  309. pool.checkDialTimeout(entry)
  310. }
  311. case entry := <-pool.enableRetry:
  312. if !entry.removed {
  313. entry.delayedRetry = false
  314. pool.updateCheckDial(entry)
  315. }
  316. case adj := <-pool.adjustStats:
  317. switch adj.adjustType {
  318. case pseBlockDelay:
  319. adj.entry.delayStats.add(float64(adj.time), 1)
  320. case pseResponseTime:
  321. adj.entry.responseStats.add(float64(adj.time), 1)
  322. adj.entry.timeoutStats.add(0, 1)
  323. case pseResponseTimeout:
  324. adj.entry.timeoutStats.add(1, 1)
  325. }
  326. case node := <-pool.discNodes:
  327. if pool.trustedNodes[node.ID()] == nil {
  328. entry := pool.findOrNewNode(node)
  329. pool.updateCheckDial(entry)
  330. }
  331. case conv := <-pool.discLookups:
  332. if conv {
  333. if lookupCnt == 0 {
  334. convTime = mclock.Now()
  335. }
  336. lookupCnt++
  337. if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) {
  338. pool.fastDiscover = false
  339. if pool.discSetPeriod != nil {
  340. pool.discSetPeriod <- time.Minute
  341. }
  342. }
  343. }
  344. case req := <-pool.connCh:
  345. if pool.trustedNodes[req.p.ID()] != nil {
  346. // ignore trusted nodes
  347. req.result <- &poolEntry{trusted: true}
  348. } else {
  349. // Handle peer connection requests.
  350. entry := pool.entries[req.p.ID()]
  351. if entry == nil {
  352. entry = pool.findOrNewNode(req.node)
  353. }
  354. if entry.state == psConnected || entry.state == psRegistered {
  355. req.result <- nil
  356. continue
  357. }
  358. pool.connWg.Add(1)
  359. entry.peer = req.p
  360. entry.state = psConnected
  361. addr := &poolEntryAddress{
  362. ip: req.node.IP(),
  363. port: uint16(req.node.TCP()),
  364. lastSeen: mclock.Now(),
  365. }
  366. entry.lastConnected = addr
  367. entry.addr = make(map[string]*poolEntryAddress)
  368. entry.addr[addr.strKey()] = addr
  369. entry.addrSelect = *utils.NewWeightedRandomSelect()
  370. entry.addrSelect.Update(addr)
  371. req.result <- entry
  372. }
  373. case req := <-pool.registerCh:
  374. if req.entry.trusted {
  375. continue
  376. }
  377. // Handle peer registration requests.
  378. entry := req.entry
  379. entry.state = psRegistered
  380. entry.regTime = mclock.Now()
  381. if !entry.known {
  382. pool.newQueue.remove(entry)
  383. entry.known = true
  384. }
  385. pool.knownQueue.setLatest(entry)
  386. entry.shortRetry = shortRetryCnt
  387. close(req.done)
  388. case req := <-pool.disconnCh:
  389. if req.entry.trusted {
  390. continue
  391. }
  392. // Handle peer disconnection requests.
  393. disconnect(req, req.stopped)
  394. case <-pool.closeCh:
  395. if pool.discSetPeriod != nil {
  396. close(pool.discSetPeriod)
  397. }
  398. // Spawn a goroutine to close the disconnCh after all connections are disconnected.
  399. go func() {
  400. pool.connWg.Wait()
  401. close(pool.disconnCh)
  402. }()
  403. // Handle all remaining disconnection requests before exit.
  404. for req := range pool.disconnCh {
  405. disconnect(req, true)
  406. }
  407. pool.saveNodes()
  408. return
  409. }
  410. }
  411. }
  412. func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry {
  413. now := mclock.Now()
  414. entry := pool.entries[node.ID()]
  415. if entry == nil {
  416. log.Debug("Discovered new entry", "id", node.ID())
  417. entry = &poolEntry{
  418. node: node,
  419. addr: make(map[string]*poolEntryAddress),
  420. addrSelect: *utils.NewWeightedRandomSelect(),
  421. shortRetry: shortRetryCnt,
  422. }
  423. pool.entries[node.ID()] = entry
  424. // initialize previously unknown peers with good statistics to give a chance to prove themselves
  425. entry.connectStats.add(1, initStatsWeight)
  426. entry.delayStats.add(0, initStatsWeight)
  427. entry.responseStats.add(0, initStatsWeight)
  428. entry.timeoutStats.add(0, initStatsWeight)
  429. }
  430. entry.lastDiscovered = now
  431. addr := &poolEntryAddress{ip: node.IP(), port: uint16(node.TCP())}
  432. if a, ok := entry.addr[addr.strKey()]; ok {
  433. addr = a
  434. } else {
  435. entry.addr[addr.strKey()] = addr
  436. }
  437. addr.lastSeen = now
  438. entry.addrSelect.Update(addr)
  439. if !entry.known {
  440. pool.newQueue.setLatest(entry)
  441. }
  442. return entry
  443. }
  444. // loadNodes loads known nodes and their statistics from the database
  445. func (pool *serverPool) loadNodes() {
  446. enc, err := pool.db.Get(pool.dbKey)
  447. if err != nil {
  448. return
  449. }
  450. var list []*poolEntry
  451. err = rlp.DecodeBytes(enc, &list)
  452. if err != nil {
  453. log.Debug("Failed to decode node list", "err", err)
  454. return
  455. }
  456. for _, e := range list {
  457. log.Debug("Loaded server stats", "id", e.node.ID(), "fails", e.lastConnected.fails,
  458. "conn", fmt.Sprintf("%v/%v", e.connectStats.avg, e.connectStats.weight),
  459. "delay", fmt.Sprintf("%v/%v", time.Duration(e.delayStats.avg), e.delayStats.weight),
  460. "response", fmt.Sprintf("%v/%v", time.Duration(e.responseStats.avg), e.responseStats.weight),
  461. "timeout", fmt.Sprintf("%v/%v", e.timeoutStats.avg, e.timeoutStats.weight))
  462. pool.entries[e.node.ID()] = e
  463. if pool.trustedNodes[e.node.ID()] == nil {
  464. pool.knownQueue.setLatest(e)
  465. pool.knownSelect.Update((*knownEntry)(e))
  466. }
  467. }
  468. }
  469. // connectToTrustedNodes adds trusted server nodes as static trusted peers.
  470. //
  471. // Note: trusted nodes are not handled by the server pool logic, they are not
  472. // added to either the known or new selection pools. They are connected/reconnected
  473. // by p2p.Server whenever possible.
  474. func (pool *serverPool) connectToTrustedNodes() {
  475. //connect to trusted nodes
  476. for _, node := range pool.trustedNodes {
  477. pool.server.AddTrustedPeer(node)
  478. pool.server.AddPeer(node)
  479. log.Debug("Added trusted node", "id", node.ID().String())
  480. }
  481. }
  482. // parseTrustedNodes returns valid and parsed enodes
  483. func parseTrustedNodes(trustedNodes []string) map[enode.ID]*enode.Node {
  484. nodes := make(map[enode.ID]*enode.Node)
  485. for _, node := range trustedNodes {
  486. node, err := enode.Parse(enode.ValidSchemes, node)
  487. if err != nil {
  488. log.Warn("Trusted node URL invalid", "enode", node, "err", err)
  489. continue
  490. }
  491. nodes[node.ID()] = node
  492. }
  493. return nodes
  494. }
  495. // saveNodes saves known nodes and their statistics into the database. Nodes are
  496. // ordered from least to most recently connected.
  497. func (pool *serverPool) saveNodes() {
  498. list := make([]*poolEntry, len(pool.knownQueue.queue))
  499. for i := range list {
  500. list[i] = pool.knownQueue.fetchOldest()
  501. }
  502. enc, err := rlp.EncodeToBytes(list)
  503. if err == nil {
  504. pool.db.Put(pool.dbKey, enc)
  505. }
  506. }
  507. // removeEntry removes a pool entry when the entry count limit is reached.
  508. // Note that it is called by the new/known queues from which the entry has already
  509. // been removed so removing it from the queues is not necessary.
  510. func (pool *serverPool) removeEntry(entry *poolEntry) {
  511. pool.newSelect.Remove((*discoveredEntry)(entry))
  512. pool.knownSelect.Remove((*knownEntry)(entry))
  513. entry.removed = true
  514. delete(pool.entries, entry.node.ID())
  515. }
  516. // setRetryDial starts the timer which will enable dialing a certain node again
  517. func (pool *serverPool) setRetryDial(entry *poolEntry) {
  518. delay := longRetryDelay
  519. if entry.shortRetry > 0 {
  520. entry.shortRetry--
  521. delay = shortRetryDelay
  522. }
  523. delay += time.Duration(rand.Int63n(int64(delay) + 1))
  524. entry.delayedRetry = true
  525. go func() {
  526. select {
  527. case <-pool.closeCh:
  528. case <-time.After(delay):
  529. select {
  530. case <-pool.closeCh:
  531. case pool.enableRetry <- entry:
  532. }
  533. }
  534. }()
  535. }
  536. // updateCheckDial is called when an entry can potentially be dialed again. It updates
  537. // its selection weights and checks if new dials can/should be made.
  538. func (pool *serverPool) updateCheckDial(entry *poolEntry) {
  539. pool.newSelect.Update((*discoveredEntry)(entry))
  540. pool.knownSelect.Update((*knownEntry)(entry))
  541. pool.checkDial()
  542. }
  543. // checkDial checks if new dials can/should be made. It tries to select servers both
  544. // based on good statistics and recent discovery.
  545. func (pool *serverPool) checkDial() {
  546. fillWithKnownSelects := !pool.fastDiscover
  547. for pool.knownSelected < targetKnownSelect {
  548. entry := pool.knownSelect.Choose()
  549. if entry == nil {
  550. fillWithKnownSelects = false
  551. break
  552. }
  553. pool.dial((*poolEntry)(entry.(*knownEntry)), true)
  554. }
  555. for pool.knownSelected+pool.newSelected < targetServerCount {
  556. entry := pool.newSelect.Choose()
  557. if entry == nil {
  558. break
  559. }
  560. pool.dial((*poolEntry)(entry.(*discoveredEntry)), false)
  561. }
  562. if fillWithKnownSelects {
  563. // no more newly discovered nodes to select and since fast discover period
  564. // is over, we probably won't find more in the near future so select more
  565. // known entries if possible
  566. for pool.knownSelected < targetServerCount {
  567. entry := pool.knownSelect.Choose()
  568. if entry == nil {
  569. break
  570. }
  571. pool.dial((*poolEntry)(entry.(*knownEntry)), true)
  572. }
  573. }
  574. }
  575. // dial initiates a new connection
  576. func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) {
  577. if pool.server == nil || entry.state != psNotConnected {
  578. return
  579. }
  580. entry.state = psDialed
  581. entry.knownSelected = knownSelected
  582. if knownSelected {
  583. pool.knownSelected++
  584. } else {
  585. pool.newSelected++
  586. }
  587. addr := entry.addrSelect.Choose().(*poolEntryAddress)
  588. log.Debug("Dialing new peer", "lesaddr", entry.node.ID().String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected)
  589. entry.dialed = addr
  590. go func() {
  591. pool.server.AddPeer(entry.node)
  592. select {
  593. case <-pool.closeCh:
  594. case <-time.After(dialTimeout):
  595. select {
  596. case <-pool.closeCh:
  597. case pool.timeout <- entry:
  598. }
  599. }
  600. }()
  601. }
  602. // checkDialTimeout checks if the node is still in dialed state and if so, resets it
  603. // and adjusts connection statistics accordingly.
  604. func (pool *serverPool) checkDialTimeout(entry *poolEntry) {
  605. if entry.state != psDialed {
  606. return
  607. }
  608. log.Debug("Dial timeout", "lesaddr", entry.node.ID().String()+"@"+entry.dialed.strKey())
  609. entry.state = psNotConnected
  610. if entry.knownSelected {
  611. pool.knownSelected--
  612. } else {
  613. pool.newSelected--
  614. }
  615. entry.connectStats.add(0, 1)
  616. entry.dialed.fails++
  617. pool.setRetryDial(entry)
  618. }
  // Connection states of a poolEntry (poolEntry.state).
  const (
  	psNotConnected = 0 // idle; may be selected for dialing
  	psDialed       = 1 // dial issued by the pool, no connection reported yet
  	psConnected    = 2 // connection accepted by connect, not yet registered
  	psRegistered   = 3 // registered after a successful handshake
  )
  625. // poolEntry represents a server node and stores its current state and statistics.
  626. type poolEntry struct {
  627. peer *serverPeer
  628. pubkey [64]byte // secp256k1 key of the node
  629. addr map[string]*poolEntryAddress
  630. node *enode.Node
  631. lastConnected, dialed *poolEntryAddress
  632. addrSelect utils.WeightedRandomSelect
  633. lastDiscovered mclock.AbsTime
  634. known, knownSelected, trusted bool
  635. connectStats, delayStats poolStats
  636. responseStats, timeoutStats poolStats
  637. state int
  638. regTime mclock.AbsTime
  639. queueIdx int
  640. removed bool
  641. delayedRetry bool
  642. shortRetry int
  643. }
  644. // poolEntryEnc is the RLP encoding of poolEntry.
  645. type poolEntryEnc struct {
  646. Pubkey []byte
  647. IP net.IP
  648. Port uint16
  649. Fails uint
  650. CStat, DStat, RStat, TStat poolStats
  651. }
  652. func (e *poolEntry) EncodeRLP(w io.Writer) error {
  653. return rlp.Encode(w, &poolEntryEnc{
  654. Pubkey: encodePubkey64(e.node.Pubkey()),
  655. IP: e.lastConnected.ip,
  656. Port: e.lastConnected.port,
  657. Fails: e.lastConnected.fails,
  658. CStat: e.connectStats,
  659. DStat: e.delayStats,
  660. RStat: e.responseStats,
  661. TStat: e.timeoutStats,
  662. })
  663. }
  664. func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
  665. var entry poolEntryEnc
  666. if err := s.Decode(&entry); err != nil {
  667. return err
  668. }
  669. pubkey, err := decodePubkey64(entry.Pubkey)
  670. if err != nil {
  671. return err
  672. }
  673. addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()}
  674. e.node = enode.NewV4(pubkey, entry.IP, int(entry.Port), int(entry.Port))
  675. e.addr = make(map[string]*poolEntryAddress)
  676. e.addr[addr.strKey()] = addr
  677. e.addrSelect = *utils.NewWeightedRandomSelect()
  678. e.addrSelect.Update(addr)
  679. e.lastConnected = addr
  680. e.connectStats = entry.CStat
  681. e.delayStats = entry.DStat
  682. e.responseStats = entry.RStat
  683. e.timeoutStats = entry.TStat
  684. e.shortRetry = shortRetryCnt
  685. e.known = true
  686. return nil
  687. }
  688. func encodePubkey64(pub *ecdsa.PublicKey) []byte {
  689. return crypto.FromECDSAPub(pub)[1:]
  690. }
  691. func decodePubkey64(b []byte) (*ecdsa.PublicKey, error) {
  692. return crypto.UnmarshalPubkey(append([]byte{0x04}, b...))
  693. }
  694. // discoveredEntry implements wrsItem
  695. type discoveredEntry poolEntry
  696. // Weight calculates random selection weight for newly discovered entries
  697. func (e *discoveredEntry) Weight() int64 {
  698. if e.state != psNotConnected || e.delayedRetry {
  699. return 0
  700. }
  701. t := time.Duration(mclock.Now() - e.lastDiscovered)
  702. if t <= discoverExpireStart {
  703. return 1000000000
  704. }
  705. return int64(1000000000 * math.Exp(-float64(t-discoverExpireStart)/float64(discoverExpireConst)))
  706. }
  707. // knownEntry implements wrsItem
  708. type knownEntry poolEntry
  709. // Weight calculates random selection weight for known entries
  710. func (e *knownEntry) Weight() int64 {
  711. if e.state != psNotConnected || !e.known || e.delayedRetry {
  712. return 0
  713. }
  714. return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow(1-e.timeoutStats.recentAvg(), timeoutPow))
  715. }
  716. // poolEntryAddress is a separate object because currently it is necessary to remember
  717. // multiple potential network addresses for a pool entry. This will be removed after
  718. // the final implementation of v5 discovery which will retrieve signed and serial
  719. // numbered advertisements, making it clear which IP/port is the latest one.
  720. type poolEntryAddress struct {
  721. ip net.IP
  722. port uint16
  723. lastSeen mclock.AbsTime // last time it was discovered, connected or loaded from db
  724. fails uint // connection failures since last successful connection (persistent)
  725. }
  726. func (a *poolEntryAddress) Weight() int64 {
  727. t := time.Duration(mclock.Now() - a.lastSeen)
  728. return int64(1000000*math.Exp(-float64(t)/float64(discoverExpireConst)-float64(a.fails)*addrFailDropLn)) + 1
  729. }
  730. func (a *poolEntryAddress) strKey() string {
  731. return a.ip.String() + ":" + strconv.Itoa(int(a.port))
  732. }
  733. // poolStats implement statistics for a certain quantity with a long term average
  734. // and a short term value which is adjusted exponentially with a factor of
  735. // pstatRecentAdjust with each update and also returned exponentially to the
  736. // average with the time constant pstatReturnToMeanTC
  737. type poolStats struct {
  738. sum, weight, avg, recent float64
  739. lastRecalc mclock.AbsTime
  740. }
  741. // init initializes stats with a long term sum/update count pair retrieved from the database
  742. func (s *poolStats) init(sum, weight float64) {
  743. s.sum = sum
  744. s.weight = weight
  745. var avg float64
  746. if weight > 0 {
  747. avg = s.sum / weight
  748. }
  749. s.avg = avg
  750. s.recent = avg
  751. s.lastRecalc = mclock.Now()
  752. }
  753. // recalc recalculates recent value return-to-mean and long term average
  754. func (s *poolStats) recalc() {
  755. now := mclock.Now()
  756. s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC))
  757. if s.sum == 0 {
  758. s.avg = 0
  759. } else {
  760. if s.sum > s.weight*1e30 {
  761. s.avg = 1e30
  762. } else {
  763. s.avg = s.sum / s.weight
  764. }
  765. }
  766. s.lastRecalc = now
  767. }
  768. // add updates the stats with a new value
  769. func (s *poolStats) add(value, weight float64) {
  770. s.weight += weight
  771. s.sum += value * weight
  772. s.recalc()
  773. }
  774. // recentAvg returns the short-term adjusted average
  775. func (s *poolStats) recentAvg() float64 {
  776. s.recalc()
  777. return s.recent
  778. }
  779. func (s *poolStats) EncodeRLP(w io.Writer) error {
  780. return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)})
  781. }
  782. func (s *poolStats) DecodeRLP(st *rlp.Stream) error {
  783. var stats struct {
  784. SumUint, WeightUint uint64
  785. }
  786. if err := st.Decode(&stats); err != nil {
  787. return err
  788. }
  789. s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint))
  790. return nil
  791. }
  792. // poolEntryQueue keeps track of its least recently accessed entries and removes
  793. // them when the number of entries reaches the limit
  794. type poolEntryQueue struct {
  795. queue map[int]*poolEntry // known nodes indexed by their latest lastConnCnt value
  796. newPtr, oldPtr, maxCnt int
  797. removeFromPool func(*poolEntry)
  798. }
  799. // newPoolEntryQueue returns a new poolEntryQueue
  800. func newPoolEntryQueue(maxCnt int, removeFromPool func(*poolEntry)) poolEntryQueue {
  801. return poolEntryQueue{queue: make(map[int]*poolEntry), maxCnt: maxCnt, removeFromPool: removeFromPool}
  802. }
  803. // fetchOldest returns and removes the least recently accessed entry
  804. func (q *poolEntryQueue) fetchOldest() *poolEntry {
  805. if len(q.queue) == 0 {
  806. return nil
  807. }
  808. for {
  809. if e := q.queue[q.oldPtr]; e != nil {
  810. delete(q.queue, q.oldPtr)
  811. q.oldPtr++
  812. return e
  813. }
  814. q.oldPtr++
  815. }
  816. }
  817. // remove removes an entry from the queue
  818. func (q *poolEntryQueue) remove(entry *poolEntry) {
  819. if q.queue[entry.queueIdx] == entry {
  820. delete(q.queue, entry.queueIdx)
  821. }
  822. }
  823. // setLatest adds or updates a recently accessed entry. It also checks if an old entry
  824. // needs to be removed and removes it from the parent pool too with a callback function.
  825. func (q *poolEntryQueue) setLatest(entry *poolEntry) {
  826. if q.queue[entry.queueIdx] == entry {
  827. delete(q.queue, entry.queueIdx)
  828. } else {
  829. if len(q.queue) == q.maxCnt {
  830. e := q.fetchOldest()
  831. q.remove(e)
  832. q.removeFromPool(e)
  833. }
  834. }
  835. entry.queueIdx = q.newPtr
  836. q.queue[entry.queueIdx] = entry
  837. q.newPtr++
  838. }