server.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "crypto/ecdsa"
  19. "time"
  20. "github.com/ethereum/go-ethereum/accounts/abi/bind"
  21. "github.com/ethereum/go-ethereum/common/mclock"
  22. "github.com/ethereum/go-ethereum/core"
  23. "github.com/ethereum/go-ethereum/eth"
  24. "github.com/ethereum/go-ethereum/les/flowcontrol"
  25. "github.com/ethereum/go-ethereum/light"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/p2p"
  28. "github.com/ethereum/go-ethereum/p2p/discv5"
  29. "github.com/ethereum/go-ethereum/p2p/enode"
  30. "github.com/ethereum/go-ethereum/params"
  31. "github.com/ethereum/go-ethereum/rpc"
  32. )
// LesServer holds the state of the LES server: the shared lesCommons fields,
// the request handler, the discovery topics and the flow control / capacity
// management components created by NewLesServer.
type LesServer struct {
	lesCommons

	archiveMode bool // Flag whether the ethereum node runs in archive mode.

	handler    *serverHandler    // Server handler; created in NewLesServer, started in Start and stopped in Stop.
	lesTopics  []discv5.Topic    // One topic per entry of AdvertiseProtocolVersions, registered with discv5 in Start.
	privateKey *ecdsa.PrivateKey // Copied from the p2p server when Start is called.

	// Flow control and capacity management
	fcManager    *flowcontrol.ClientManager // Receives the capacity limits and the recharge curve.
	costTracker  *costTracker               // Source of freeCapacity and of the total recharge value.
	defParams    flowcontrol.ServerParams   // BufLimit and MinRecharge derived from freeCapacity.
	servingQueue *servingQueue              // Request serving queue; its thread count follows threadsIdle/threadsBusy.
	clientPool   *clientPool                // Client pool whose limits follow the total capacity.
	freeCapacity uint64                     // The minimal client capacity used for free client.

	threadsIdle int // Request serving threads count when system is idle.
	threadsBusy int // Request serving threads count when system is busy(block insertion).
}
  49. func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
  50. // Collect les protocol version information supported by local node.
  51. lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions))
  52. for i, pv := range AdvertiseProtocolVersions {
  53. lesTopics[i] = lesTopic(e.BlockChain().Genesis().Hash(), pv)
  54. }
  55. // Calculate the number of threads used to service the light client
  56. // requests based on the user-specified value.
  57. threads := config.LightServ * 4 / 100
  58. if threads < 4 {
  59. threads = 4
  60. }
  61. srv := &LesServer{
  62. lesCommons: lesCommons{
  63. genesis: e.BlockChain().Genesis().Hash(),
  64. config: config,
  65. chainConfig: e.BlockChain().Config(),
  66. iConfig: light.DefaultServerIndexerConfig,
  67. chainDb: e.ChainDb(),
  68. peers: newPeerSet(),
  69. chainReader: e.BlockChain(),
  70. chtIndexer: light.NewChtIndexer(e.ChainDb(), nil, params.CHTFrequency, params.HelperTrieProcessConfirmations),
  71. bloomTrieIndexer: light.NewBloomTrieIndexer(e.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency),
  72. closeCh: make(chan struct{}),
  73. },
  74. archiveMode: e.ArchiveMode(),
  75. lesTopics: lesTopics,
  76. fcManager: flowcontrol.NewClientManager(nil, &mclock.System{}),
  77. servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100),
  78. threadsBusy: config.LightServ/100 + 1,
  79. threadsIdle: threads,
  80. }
  81. srv.handler = newServerHandler(srv, e.BlockChain(), e.ChainDb(), e.TxPool(), e.Synced)
  82. srv.costTracker, srv.freeCapacity = newCostTracker(e.ChainDb(), config)
  83. // Set up checkpoint oracle.
  84. oracle := config.CheckpointOracle
  85. if oracle == nil {
  86. oracle = params.CheckpointOracles[e.BlockChain().Genesis().Hash()]
  87. }
  88. srv.oracle = newCheckpointOracle(oracle, srv.localCheckpoint)
  89. // Initialize server capacity management fields.
  90. srv.defParams = flowcontrol.ServerParams{
  91. BufLimit: srv.freeCapacity * bufLimitRatio,
  92. MinRecharge: srv.freeCapacity,
  93. }
  94. // LES flow control tries to more or less guarantee the possibility for the
  95. // clients to send a certain amount of requests at any time and get a quick
  96. // response. Most of the clients want this guarantee but don't actually need
  97. // to send requests most of the time. Our goal is to serve as many clients as
  98. // possible while the actually used server capacity does not exceed the limits
  99. totalRecharge := srv.costTracker.totalRecharge()
  100. maxCapacity := srv.freeCapacity * uint64(srv.config.LightPeers)
  101. if totalRecharge > maxCapacity {
  102. maxCapacity = totalRecharge
  103. }
  104. srv.fcManager.SetCapacityLimits(srv.freeCapacity, maxCapacity, srv.freeCapacity*2)
  105. srv.clientPool = newClientPool(srv.chainDb, srv.freeCapacity, 10000, mclock.System{}, func(id enode.ID) { go srv.peers.Unregister(peerIdToString(id)) })
  106. srv.clientPool.setPriceFactors(priceFactors{0, 1, 1}, priceFactors{0, 1, 1})
  107. checkpoint := srv.latestLocalCheckpoint()
  108. if !checkpoint.Empty() {
  109. log.Info("Loaded latest checkpoint", "section", checkpoint.SectionIndex, "head", checkpoint.SectionHead,
  110. "chtroot", checkpoint.CHTRoot, "bloomroot", checkpoint.BloomRoot)
  111. }
  112. srv.chtIndexer.Start(e.BlockChain())
  113. return srv, nil
  114. }
  115. func (s *LesServer) APIs() []rpc.API {
  116. return []rpc.API{
  117. {
  118. Namespace: "les",
  119. Version: "1.0",
  120. Service: NewPrivateLightAPI(&s.lesCommons),
  121. Public: false,
  122. },
  123. }
  124. }
  125. func (s *LesServer) Protocols() []p2p.Protocol {
  126. return s.makeProtocols(ServerProtocolVersions, s.handler.runPeer, func(id enode.ID) interface{} {
  127. if p := s.peers.Peer(peerIdToString(id)); p != nil {
  128. return p.Info()
  129. }
  130. return nil
  131. })
  132. }
  133. // Start starts the LES server
  134. func (s *LesServer) Start(srvr *p2p.Server) {
  135. s.privateKey = srvr.PrivateKey
  136. s.handler.start()
  137. s.wg.Add(1)
  138. go s.capacityManagement()
  139. if srvr.DiscV5 != nil {
  140. for _, topic := range s.lesTopics {
  141. topic := topic
  142. go func() {
  143. logger := log.New("topic", topic)
  144. logger.Info("Starting topic registration")
  145. defer logger.Info("Terminated topic registration")
  146. srvr.DiscV5.RegisterTopic(topic, s.closeCh)
  147. }()
  148. }
  149. }
  150. }
// Stop stops the LES service. It closes closeCh, shuts down the peer set and
// the server components one after another, and then blocks until the
// goroutines tracked by the wait group have returned.
func (s *LesServer) Stop() {
	// closeCh terminates the capacity management loop and is the stop channel
	// given to the topic registrations launched in Start.
	close(s.closeCh)
	// Disconnect existing sessions.
	// This also closes the gate for any new registrations on the peer set.
	// Sessions which are already established but not added to s.peers yet
	// will exit when they try to register.
	s.peers.Close()
	s.fcManager.Stop()
	s.clientPool.stop()
	s.costTracker.stop()
	s.handler.stop()
	s.servingQueue.stop()
	// Note, bloom trie indexer is closed by parent bloombits indexer.
	s.chtIndexer.Close()
	// Wait for the goroutines registered on the wait group (the capacity
	// management loop is added in Start).
	s.wg.Wait()
	log.Info("Les server stopped")
}
  169. func (s *LesServer) SetBloomBitsIndexer(bloomIndexer *core.ChainIndexer) {
  170. bloomIndexer.AddChildIndexer(s.bloomTrieIndexer)
  171. }
  172. // SetClient sets the rpc client and starts running checkpoint contract if it is not yet watched.
  173. func (s *LesServer) SetContractBackend(backend bind.ContractBackend) {
  174. if s.oracle == nil {
  175. return
  176. }
  177. s.oracle.start(backend)
  178. }
  179. // capacityManagement starts an event handler loop that updates the recharge curve of
  180. // the client manager and adjusts the client pool's size according to the total
  181. // capacity updates coming from the client manager
  182. func (s *LesServer) capacityManagement() {
  183. defer s.wg.Done()
  184. processCh := make(chan bool, 100)
  185. sub := s.handler.blockchain.SubscribeBlockProcessingEvent(processCh)
  186. defer sub.Unsubscribe()
  187. totalRechargeCh := make(chan uint64, 100)
  188. totalRecharge := s.costTracker.subscribeTotalRecharge(totalRechargeCh)
  189. totalCapacityCh := make(chan uint64, 100)
  190. totalCapacity := s.fcManager.SubscribeTotalCapacity(totalCapacityCh)
  191. s.clientPool.setLimits(s.config.LightPeers, totalCapacity)
  192. var (
  193. busy bool
  194. freePeers uint64
  195. blockProcess mclock.AbsTime
  196. )
  197. updateRecharge := func() {
  198. if busy {
  199. s.servingQueue.setThreads(s.threadsBusy)
  200. s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge, totalRecharge}})
  201. } else {
  202. s.servingQueue.setThreads(s.threadsIdle)
  203. s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge / 10, totalRecharge}, {totalRecharge, totalRecharge}})
  204. }
  205. }
  206. updateRecharge()
  207. for {
  208. select {
  209. case busy = <-processCh:
  210. if busy {
  211. blockProcess = mclock.Now()
  212. } else {
  213. blockProcessingTimer.Update(time.Duration(mclock.Now() - blockProcess))
  214. }
  215. updateRecharge()
  216. case totalRecharge = <-totalRechargeCh:
  217. totalRechargeGauge.Update(int64(totalRecharge))
  218. updateRecharge()
  219. case totalCapacity = <-totalCapacityCh:
  220. totalCapacityGauge.Update(int64(totalCapacity))
  221. newFreePeers := totalCapacity / s.freeCapacity
  222. if newFreePeers < freePeers && newFreePeers < uint64(s.config.LightPeers) {
  223. log.Warn("Reduced free peer connections", "from", freePeers, "to", newFreePeers)
  224. }
  225. freePeers = newFreePeers
  226. s.clientPool.setLimits(s.config.LightPeers, totalCapacity)
  227. case <-s.closeCh:
  228. return
  229. }
  230. }
  231. }