server.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "crypto/ecdsa"
  19. "reflect"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common/mclock"
  22. "github.com/ethereum/go-ethereum/core"
  23. "github.com/ethereum/go-ethereum/eth/ethconfig"
  24. "github.com/ethereum/go-ethereum/ethdb"
  25. "github.com/ethereum/go-ethereum/les/flowcontrol"
  26. "github.com/ethereum/go-ethereum/les/vflux"
  27. vfs "github.com/ethereum/go-ethereum/les/vflux/server"
  28. "github.com/ethereum/go-ethereum/light"
  29. "github.com/ethereum/go-ethereum/log"
  30. "github.com/ethereum/go-ethereum/node"
  31. "github.com/ethereum/go-ethereum/p2p"
  32. "github.com/ethereum/go-ethereum/p2p/enode"
  33. "github.com/ethereum/go-ethereum/p2p/enr"
  34. "github.com/ethereum/go-ethereum/p2p/nodestate"
  35. "github.com/ethereum/go-ethereum/params"
  36. "github.com/ethereum/go-ethereum/rpc"
  37. )
  38. var (
  39. serverSetup = &nodestate.Setup{}
  40. clientPeerField = serverSetup.NewField("clientPeer", reflect.TypeOf(&clientPeer{}))
  41. clientInfoField = serverSetup.NewField("clientInfo", reflect.TypeOf(&clientInfo{}))
  42. connAddressField = serverSetup.NewField("connAddr", reflect.TypeOf(""))
  43. balanceTrackerSetup = vfs.NewBalanceTrackerSetup(serverSetup)
  44. priorityPoolSetup = vfs.NewPriorityPoolSetup(serverSetup)
  45. )
  46. func init() {
  47. balanceTrackerSetup.Connect(connAddressField, priorityPoolSetup.CapacityField)
  48. priorityPoolSetup.Connect(balanceTrackerSetup.BalanceField, balanceTrackerSetup.UpdateFlag) // NodeBalance implements nodePriority
  49. }
  50. type ethBackend interface {
  51. ArchiveMode() bool
  52. BlockChain() *core.BlockChain
  53. BloomIndexer() *core.ChainIndexer
  54. ChainDb() ethdb.Database
  55. Synced() bool
  56. TxPool() *core.TxPool
  57. }
  58. type LesServer struct {
  59. lesCommons
  60. ns *nodestate.NodeStateMachine
  61. archiveMode bool // Flag whether the ethereum node runs in archive mode.
  62. handler *serverHandler
  63. broadcaster *broadcaster
  64. vfluxServer *vfs.Server
  65. privateKey *ecdsa.PrivateKey
  66. // Flow control and capacity management
  67. fcManager *flowcontrol.ClientManager
  68. costTracker *costTracker
  69. defParams flowcontrol.ServerParams
  70. servingQueue *servingQueue
  71. clientPool *clientPool
  72. minCapacity, maxCapacity uint64
  73. threadsIdle int // Request serving threads count when system is idle.
  74. threadsBusy int // Request serving threads count when system is busy(block insertion).
  75. p2pSrv *p2p.Server
  76. }
  77. func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*LesServer, error) {
  78. lesDb, err := node.OpenDatabase("les.server", 0, 0, "eth/db/lesserver/", false)
  79. if err != nil {
  80. return nil, err
  81. }
  82. ns := nodestate.NewNodeStateMachine(nil, nil, mclock.System{}, serverSetup)
  83. // Calculate the number of threads used to service the light client
  84. // requests based on the user-specified value.
  85. threads := config.LightServ * 4 / 100
  86. if threads < 4 {
  87. threads = 4
  88. }
  89. srv := &LesServer{
  90. lesCommons: lesCommons{
  91. genesis: e.BlockChain().Genesis().Hash(),
  92. config: config,
  93. chainConfig: e.BlockChain().Config(),
  94. iConfig: light.DefaultServerIndexerConfig,
  95. chainDb: e.ChainDb(),
  96. lesDb: lesDb,
  97. chainReader: e.BlockChain(),
  98. chtIndexer: light.NewChtIndexer(e.ChainDb(), nil, params.CHTFrequency, params.HelperTrieProcessConfirmations, true),
  99. bloomTrieIndexer: light.NewBloomTrieIndexer(e.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency, true),
  100. closeCh: make(chan struct{}),
  101. },
  102. ns: ns,
  103. archiveMode: e.ArchiveMode(),
  104. broadcaster: newBroadcaster(ns),
  105. vfluxServer: vfs.NewServer(time.Millisecond * 10),
  106. fcManager: flowcontrol.NewClientManager(nil, &mclock.System{}),
  107. servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100),
  108. threadsBusy: config.LightServ/100 + 1,
  109. threadsIdle: threads,
  110. p2pSrv: node.Server(),
  111. }
  112. srv.vfluxServer.Register(srv)
  113. issync := e.Synced
  114. if config.LightNoSyncServe {
  115. issync = func() bool { return true }
  116. }
  117. srv.handler = newServerHandler(srv, e.BlockChain(), e.ChainDb(), e.TxPool(), issync)
  118. srv.costTracker, srv.minCapacity = newCostTracker(e.ChainDb(), config)
  119. srv.oracle = srv.setupOracle(node, e.BlockChain().Genesis().Hash(), config)
  120. // Initialize the bloom trie indexer.
  121. e.BloomIndexer().AddChildIndexer(srv.bloomTrieIndexer)
  122. // Initialize server capacity management fields.
  123. srv.defParams = flowcontrol.ServerParams{
  124. BufLimit: srv.minCapacity * bufLimitRatio,
  125. MinRecharge: srv.minCapacity,
  126. }
  127. // LES flow control tries to more or less guarantee the possibility for the
  128. // clients to send a certain amount of requests at any time and get a quick
  129. // response. Most of the clients want this guarantee but don't actually need
  130. // to send requests most of the time. Our goal is to serve as many clients as
  131. // possible while the actually used server capacity does not exceed the limits
  132. totalRecharge := srv.costTracker.totalRecharge()
  133. srv.maxCapacity = srv.minCapacity * uint64(srv.config.LightPeers)
  134. if totalRecharge > srv.maxCapacity {
  135. srv.maxCapacity = totalRecharge
  136. }
  137. srv.fcManager.SetCapacityLimits(srv.minCapacity, srv.maxCapacity, srv.minCapacity*2)
  138. srv.clientPool = newClientPool(ns, lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient, issync)
  139. srv.clientPool.setDefaultFactors(vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}, vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1})
  140. checkpoint := srv.latestLocalCheckpoint()
  141. if !checkpoint.Empty() {
  142. log.Info("Loaded latest checkpoint", "section", checkpoint.SectionIndex, "head", checkpoint.SectionHead,
  143. "chtroot", checkpoint.CHTRoot, "bloomroot", checkpoint.BloomRoot)
  144. }
  145. srv.chtIndexer.Start(e.BlockChain())
  146. node.RegisterProtocols(srv.Protocols())
  147. node.RegisterAPIs(srv.APIs())
  148. node.RegisterLifecycle(srv)
  149. // disconnect all peers at nsm shutdown
  150. ns.SubscribeField(clientPeerField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
  151. if state.Equals(serverSetup.OfflineFlag()) && oldValue != nil {
  152. oldValue.(*clientPeer).Peer.Disconnect(p2p.DiscRequested)
  153. }
  154. })
  155. ns.Start()
  156. return srv, nil
  157. }
  158. func (s *LesServer) APIs() []rpc.API {
  159. return []rpc.API{
  160. {
  161. Namespace: "les",
  162. Version: "1.0",
  163. Service: NewPrivateLightAPI(&s.lesCommons),
  164. Public: false,
  165. },
  166. {
  167. Namespace: "les",
  168. Version: "1.0",
  169. Service: NewPrivateLightServerAPI(s),
  170. Public: false,
  171. },
  172. {
  173. Namespace: "debug",
  174. Version: "1.0",
  175. Service: NewPrivateDebugAPI(s),
  176. Public: false,
  177. },
  178. }
  179. }
  180. func (s *LesServer) Protocols() []p2p.Protocol {
  181. ps := s.makeProtocols(ServerProtocolVersions, s.handler.runPeer, func(id enode.ID) interface{} {
  182. if p := s.getClient(id); p != nil {
  183. return p.Info()
  184. }
  185. return nil
  186. }, nil)
  187. // Add "les" ENR entries.
  188. for i := range ps {
  189. ps[i].Attributes = []enr.Entry{&lesEntry{
  190. VfxVersion: 1,
  191. }}
  192. }
  193. return ps
  194. }
  195. // Start starts the LES server
  196. func (s *LesServer) Start() error {
  197. s.privateKey = s.p2pSrv.PrivateKey
  198. s.broadcaster.setSignerKey(s.privateKey)
  199. s.handler.start()
  200. s.wg.Add(1)
  201. go s.capacityManagement()
  202. if s.p2pSrv.DiscV5 != nil {
  203. s.p2pSrv.DiscV5.RegisterTalkHandler("vfx", s.vfluxServer.ServeEncoded)
  204. }
  205. return nil
  206. }
  207. // Stop stops the LES service
  208. func (s *LesServer) Stop() error {
  209. close(s.closeCh)
  210. s.clientPool.stop()
  211. s.ns.Stop()
  212. s.fcManager.Stop()
  213. s.costTracker.stop()
  214. s.handler.stop()
  215. s.servingQueue.stop()
  216. s.vfluxServer.Stop()
  217. // Note, bloom trie indexer is closed by parent bloombits indexer.
  218. s.chtIndexer.Close()
  219. s.lesDb.Close()
  220. s.wg.Wait()
  221. log.Info("Les server stopped")
  222. return nil
  223. }
  224. // capacityManagement starts an event handler loop that updates the recharge curve of
  225. // the client manager and adjusts the client pool's size according to the total
  226. // capacity updates coming from the client manager
  227. func (s *LesServer) capacityManagement() {
  228. defer s.wg.Done()
  229. processCh := make(chan bool, 100)
  230. sub := s.handler.blockchain.SubscribeBlockProcessingEvent(processCh)
  231. defer sub.Unsubscribe()
  232. totalRechargeCh := make(chan uint64, 100)
  233. totalRecharge := s.costTracker.subscribeTotalRecharge(totalRechargeCh)
  234. totalCapacityCh := make(chan uint64, 100)
  235. totalCapacity := s.fcManager.SubscribeTotalCapacity(totalCapacityCh)
  236. s.clientPool.setLimits(s.config.LightPeers, totalCapacity)
  237. var (
  238. busy bool
  239. freePeers uint64
  240. blockProcess mclock.AbsTime
  241. )
  242. updateRecharge := func() {
  243. if busy {
  244. s.servingQueue.setThreads(s.threadsBusy)
  245. s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge, totalRecharge}})
  246. } else {
  247. s.servingQueue.setThreads(s.threadsIdle)
  248. s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge / 10, totalRecharge}, {totalRecharge, totalRecharge}})
  249. }
  250. }
  251. updateRecharge()
  252. for {
  253. select {
  254. case busy = <-processCh:
  255. if busy {
  256. blockProcess = mclock.Now()
  257. } else {
  258. blockProcessingTimer.Update(time.Duration(mclock.Now() - blockProcess))
  259. }
  260. updateRecharge()
  261. case totalRecharge = <-totalRechargeCh:
  262. totalRechargeGauge.Update(int64(totalRecharge))
  263. updateRecharge()
  264. case totalCapacity = <-totalCapacityCh:
  265. totalCapacityGauge.Update(int64(totalCapacity))
  266. newFreePeers := totalCapacity / s.minCapacity
  267. if newFreePeers < freePeers && newFreePeers < uint64(s.config.LightPeers) {
  268. log.Warn("Reduced free peer connections", "from", freePeers, "to", newFreePeers)
  269. }
  270. freePeers = newFreePeers
  271. s.clientPool.setLimits(s.config.LightPeers, totalCapacity)
  272. case <-s.closeCh:
  273. return
  274. }
  275. }
  276. }
  277. func (s *LesServer) getClient(id enode.ID) *clientPeer {
  278. if node := s.ns.GetNode(id); node != nil {
  279. if p, ok := s.ns.GetField(node, clientPeerField).(*clientPeer); ok {
  280. return p
  281. }
  282. }
  283. return nil
  284. }
  285. func (s *LesServer) dropClient(id enode.ID) {
  286. if p := s.getClient(id); p != nil {
  287. p.Peer.Disconnect(p2p.DiscRequested)
  288. }
  289. }
  290. // ServiceInfo implements vfs.Service
  291. func (s *LesServer) ServiceInfo() (string, string) {
  292. return "les", "Ethereum light client service"
  293. }
  294. // Handle implements vfs.Service
  295. func (s *LesServer) Handle(id enode.ID, address string, name string, data []byte) []byte {
  296. switch name {
  297. case vflux.CapacityQueryName:
  298. return s.clientPool.serveCapQuery(id, address, data)
  299. default:
  300. return nil
  301. }
  302. }