server.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "crypto/ecdsa"
  19. "time"
  20. "github.com/ethereum/go-ethereum/common/mclock"
  21. "github.com/ethereum/go-ethereum/eth"
  22. "github.com/ethereum/go-ethereum/les/flowcontrol"
  23. lps "github.com/ethereum/go-ethereum/les/lespay/server"
  24. "github.com/ethereum/go-ethereum/light"
  25. "github.com/ethereum/go-ethereum/log"
  26. "github.com/ethereum/go-ethereum/node"
  27. "github.com/ethereum/go-ethereum/p2p"
  28. "github.com/ethereum/go-ethereum/p2p/discv5"
  29. "github.com/ethereum/go-ethereum/p2p/enode"
  30. "github.com/ethereum/go-ethereum/p2p/enr"
  31. "github.com/ethereum/go-ethereum/params"
  32. "github.com/ethereum/go-ethereum/rpc"
  33. )
  34. type LesServer struct {
  35. lesCommons
  36. archiveMode bool // Flag whether the ethereum node runs in archive mode.
  37. peers *clientPeerSet
  38. serverset *serverSet
  39. handler *serverHandler
  40. lesTopics []discv5.Topic
  41. privateKey *ecdsa.PrivateKey
  42. // Flow control and capacity management
  43. fcManager *flowcontrol.ClientManager
  44. costTracker *costTracker
  45. defParams flowcontrol.ServerParams
  46. servingQueue *servingQueue
  47. clientPool *clientPool
  48. minCapacity, maxCapacity uint64
  49. threadsIdle int // Request serving threads count when system is idle.
  50. threadsBusy int // Request serving threads count when system is busy(block insertion).
  51. p2pSrv *p2p.Server
  52. }
  53. func NewLesServer(node *node.Node, e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
  54. // Collect les protocol version information supported by local node.
  55. lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions))
  56. for i, pv := range AdvertiseProtocolVersions {
  57. lesTopics[i] = lesTopic(e.BlockChain().Genesis().Hash(), pv)
  58. }
  59. // Calculate the number of threads used to service the light client
  60. // requests based on the user-specified value.
  61. threads := config.LightServ * 4 / 100
  62. if threads < 4 {
  63. threads = 4
  64. }
  65. srv := &LesServer{
  66. lesCommons: lesCommons{
  67. genesis: e.BlockChain().Genesis().Hash(),
  68. config: config,
  69. chainConfig: e.BlockChain().Config(),
  70. iConfig: light.DefaultServerIndexerConfig,
  71. chainDb: e.ChainDb(),
  72. chainReader: e.BlockChain(),
  73. chtIndexer: light.NewChtIndexer(e.ChainDb(), nil, params.CHTFrequency, params.HelperTrieProcessConfirmations, true),
  74. bloomTrieIndexer: light.NewBloomTrieIndexer(e.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency, true),
  75. closeCh: make(chan struct{}),
  76. },
  77. archiveMode: e.ArchiveMode(),
  78. peers: newClientPeerSet(),
  79. serverset: newServerSet(),
  80. lesTopics: lesTopics,
  81. fcManager: flowcontrol.NewClientManager(nil, &mclock.System{}),
  82. servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100),
  83. threadsBusy: config.LightServ/100 + 1,
  84. threadsIdle: threads,
  85. p2pSrv: node.Server(),
  86. }
  87. srv.handler = newServerHandler(srv, e.BlockChain(), e.ChainDb(), e.TxPool(), e.Synced)
  88. srv.costTracker, srv.minCapacity = newCostTracker(e.ChainDb(), config)
  89. srv.oracle = srv.setupOracle(node, e.BlockChain().Genesis().Hash(), config)
  90. // Initialize the bloom trie indexer.
  91. e.BloomIndexer().AddChildIndexer(srv.bloomTrieIndexer)
  92. // Initialize server capacity management fields.
  93. srv.defParams = flowcontrol.ServerParams{
  94. BufLimit: srv.minCapacity * bufLimitRatio,
  95. MinRecharge: srv.minCapacity,
  96. }
  97. // LES flow control tries to more or less guarantee the possibility for the
  98. // clients to send a certain amount of requests at any time and get a quick
  99. // response. Most of the clients want this guarantee but don't actually need
  100. // to send requests most of the time. Our goal is to serve as many clients as
  101. // possible while the actually used server capacity does not exceed the limits
  102. totalRecharge := srv.costTracker.totalRecharge()
  103. srv.maxCapacity = srv.minCapacity * uint64(srv.config.LightPeers)
  104. if totalRecharge > srv.maxCapacity {
  105. srv.maxCapacity = totalRecharge
  106. }
  107. srv.fcManager.SetCapacityLimits(srv.minCapacity, srv.maxCapacity, srv.minCapacity*2)
  108. srv.clientPool = newClientPool(srv.chainDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, func(id enode.ID) { go srv.peers.unregister(id.String()) })
  109. srv.clientPool.setDefaultFactors(lps.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1})
  110. checkpoint := srv.latestLocalCheckpoint()
  111. if !checkpoint.Empty() {
  112. log.Info("Loaded latest checkpoint", "section", checkpoint.SectionIndex, "head", checkpoint.SectionHead,
  113. "chtroot", checkpoint.CHTRoot, "bloomroot", checkpoint.BloomRoot)
  114. }
  115. srv.chtIndexer.Start(e.BlockChain())
  116. node.RegisterProtocols(srv.Protocols())
  117. node.RegisterAPIs(srv.APIs())
  118. node.RegisterLifecycle(srv)
  119. return srv, nil
  120. }
  121. func (s *LesServer) APIs() []rpc.API {
  122. return []rpc.API{
  123. {
  124. Namespace: "les",
  125. Version: "1.0",
  126. Service: NewPrivateLightAPI(&s.lesCommons),
  127. Public: false,
  128. },
  129. {
  130. Namespace: "les",
  131. Version: "1.0",
  132. Service: NewPrivateLightServerAPI(s),
  133. Public: false,
  134. },
  135. {
  136. Namespace: "debug",
  137. Version: "1.0",
  138. Service: NewPrivateDebugAPI(s),
  139. Public: false,
  140. },
  141. }
  142. }
  143. func (s *LesServer) Protocols() []p2p.Protocol {
  144. ps := s.makeProtocols(ServerProtocolVersions, s.handler.runPeer, func(id enode.ID) interface{} {
  145. if p := s.peers.peer(id.String()); p != nil {
  146. return p.Info()
  147. }
  148. return nil
  149. }, nil)
  150. // Add "les" ENR entries.
  151. for i := range ps {
  152. ps[i].Attributes = []enr.Entry{&lesEntry{}}
  153. }
  154. return ps
  155. }
  156. // Start starts the LES server
  157. func (s *LesServer) Start() error {
  158. s.privateKey = s.p2pSrv.PrivateKey
  159. s.handler.start()
  160. s.wg.Add(1)
  161. go s.capacityManagement()
  162. if s.p2pSrv.DiscV5 != nil {
  163. for _, topic := range s.lesTopics {
  164. topic := topic
  165. go func() {
  166. logger := log.New("topic", topic)
  167. logger.Info("Starting topic registration")
  168. defer logger.Info("Terminated topic registration")
  169. s.p2pSrv.DiscV5.RegisterTopic(topic, s.closeCh)
  170. }()
  171. }
  172. }
  173. return nil
  174. }
// Stop stops the LES service. It signals shutdown through closeCh, tears the
// server components down in a fixed order, waits for the goroutines tracked
// by the wait group and always returns nil.
func (s *LesServer) Stop() error {
	// Closing closeCh ends the capacityManagement loop; the same channel is
	// passed to the topic registrations started in Start.
	close(s.closeCh)

	// Disconnect existing connections with other LES servers.
	s.serverset.close()

	// Disconnect existing sessions.
	// This also closes the gate for any new registrations on the peer set.
	// Sessions which are already established but not added to the peer set
	// yet will exit when they try to register.
	s.peers.close()

	s.fcManager.Stop()
	s.costTracker.stop()
	s.handler.stop()
	s.clientPool.stop() // client pool should be closed after handler.
	s.servingQueue.stop()

	// Note, bloom trie indexer is closed by parent bloombits indexer.
	s.chtIndexer.Close()

	// Wait for the goroutines registered on the wait group (at least the
	// capacityManagement loop launched in Start).
	s.wg.Wait()
	log.Info("Les server stopped")

	return nil
}
  196. // capacityManagement starts an event handler loop that updates the recharge curve of
  197. // the client manager and adjusts the client pool's size according to the total
  198. // capacity updates coming from the client manager
  199. func (s *LesServer) capacityManagement() {
  200. defer s.wg.Done()
  201. processCh := make(chan bool, 100)
  202. sub := s.handler.blockchain.SubscribeBlockProcessingEvent(processCh)
  203. defer sub.Unsubscribe()
  204. totalRechargeCh := make(chan uint64, 100)
  205. totalRecharge := s.costTracker.subscribeTotalRecharge(totalRechargeCh)
  206. totalCapacityCh := make(chan uint64, 100)
  207. totalCapacity := s.fcManager.SubscribeTotalCapacity(totalCapacityCh)
  208. s.clientPool.setLimits(s.config.LightPeers, totalCapacity)
  209. var (
  210. busy bool
  211. freePeers uint64
  212. blockProcess mclock.AbsTime
  213. )
  214. updateRecharge := func() {
  215. if busy {
  216. s.servingQueue.setThreads(s.threadsBusy)
  217. s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge, totalRecharge}})
  218. } else {
  219. s.servingQueue.setThreads(s.threadsIdle)
  220. s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge / 10, totalRecharge}, {totalRecharge, totalRecharge}})
  221. }
  222. }
  223. updateRecharge()
  224. for {
  225. select {
  226. case busy = <-processCh:
  227. if busy {
  228. blockProcess = mclock.Now()
  229. } else {
  230. blockProcessingTimer.Update(time.Duration(mclock.Now() - blockProcess))
  231. }
  232. updateRecharge()
  233. case totalRecharge = <-totalRechargeCh:
  234. totalRechargeGauge.Update(int64(totalRecharge))
  235. updateRecharge()
  236. case totalCapacity = <-totalCapacityCh:
  237. totalCapacityGauge.Update(int64(totalCapacity))
  238. newFreePeers := totalCapacity / s.minCapacity
  239. if newFreePeers < freePeers && newFreePeers < uint64(s.config.LightPeers) {
  240. log.Warn("Reduced free peer connections", "from", freePeers, "to", newFreePeers)
  241. }
  242. freePeers = newFreePeers
  243. s.clientPool.setLimits(s.config.LightPeers, totalCapacity)
  244. case <-s.closeCh:
  245. return
  246. }
  247. }
  248. }