server.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "crypto/ecdsa"
  19. "sync"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common"
  22. "github.com/ethereum/go-ethereum/common/mclock"
  23. "github.com/ethereum/go-ethereum/core"
  24. "github.com/ethereum/go-ethereum/core/rawdb"
  25. "github.com/ethereum/go-ethereum/core/types"
  26. "github.com/ethereum/go-ethereum/eth"
  27. "github.com/ethereum/go-ethereum/les/csvlogger"
  28. "github.com/ethereum/go-ethereum/les/flowcontrol"
  29. "github.com/ethereum/go-ethereum/light"
  30. "github.com/ethereum/go-ethereum/log"
  31. "github.com/ethereum/go-ethereum/p2p"
  32. "github.com/ethereum/go-ethereum/p2p/discv5"
  33. "github.com/ethereum/go-ethereum/params"
  34. "github.com/ethereum/go-ethereum/rpc"
  35. )
  36. const bufLimitRatio = 6000 // fixed bufLimit/MRR ratio
  37. const (
  38. logFileName = "" // csv log file name (disabled if empty)
  39. logClientPoolMetrics = true // log client pool metrics
  40. logClientPoolEvents = false // detailed client pool event logging
  41. logRequestServing = true // log request serving metrics and events
  42. logBlockProcEvents = true // log block processing events
  43. logProtocolHandler = true // log protocol handler events
  44. )
  45. type LesServer struct {
  46. lesCommons
  47. archiveMode bool // Flag whether the ethereum node runs in archive mode.
  48. fcManager *flowcontrol.ClientManager // nil if our node is client only
  49. costTracker *costTracker
  50. testCost uint64
  51. defParams flowcontrol.ServerParams
  52. lesTopics []discv5.Topic
  53. privateKey *ecdsa.PrivateKey
  54. quitSync chan struct{}
  55. onlyAnnounce bool
  56. csvLogger *csvlogger.Logger
  57. logTotalCap *csvlogger.Channel
  58. thcNormal, thcBlockProcessing int // serving thread count for normal operation and block processing mode
  59. maxPeers int
  60. minCapacity, freeClientCap uint64
  61. freeClientPool *freeClientPool
  62. priorityClientPool *priorityClientPool
  63. }
  64. func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
  65. var csvLogger *csvlogger.Logger
  66. if logFileName != "" {
  67. csvLogger = csvlogger.NewLogger(logFileName, time.Second*10, "event, peerId")
  68. }
  69. quitSync := make(chan struct{})
  70. pm, err := NewProtocolManager(
  71. eth.BlockChain().Config(),
  72. light.DefaultServerIndexerConfig,
  73. false,
  74. config.NetworkId,
  75. eth.EventMux(),
  76. eth.Engine(),
  77. newPeerSet(),
  78. eth.BlockChain(),
  79. eth.TxPool(),
  80. eth.ChainDb(),
  81. nil,
  82. nil,
  83. nil,
  84. quitSync,
  85. new(sync.WaitGroup),
  86. config.ULC,
  87. eth.Synced)
  88. if err != nil {
  89. return nil, err
  90. }
  91. if logProtocolHandler {
  92. pm.logger = csvLogger
  93. }
  94. requestLogger := csvLogger
  95. if !logRequestServing {
  96. requestLogger = nil
  97. }
  98. pm.servingQueue = newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100, requestLogger)
  99. lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions))
  100. for i, pv := range AdvertiseProtocolVersions {
  101. lesTopics[i] = lesTopic(eth.BlockChain().Genesis().Hash(), pv)
  102. }
  103. srv := &LesServer{
  104. lesCommons: lesCommons{
  105. config: config,
  106. chainDb: eth.ChainDb(),
  107. iConfig: light.DefaultServerIndexerConfig,
  108. chtIndexer: light.NewChtIndexer(eth.ChainDb(), nil, params.CHTFrequency, params.HelperTrieProcessConfirmations),
  109. bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency),
  110. protocolManager: pm,
  111. },
  112. archiveMode: eth.ArchiveMode(),
  113. quitSync: quitSync,
  114. lesTopics: lesTopics,
  115. onlyAnnounce: config.OnlyAnnounce,
  116. csvLogger: csvLogger,
  117. logTotalCap: requestLogger.NewChannel("totalCapacity", 0.01),
  118. }
  119. srv.costTracker, srv.minCapacity = newCostTracker(eth.ChainDb(), config, requestLogger)
  120. logger := log.New()
  121. pm.server = srv
  122. srv.thcNormal = config.LightServ * 4 / 100
  123. if srv.thcNormal < 4 {
  124. srv.thcNormal = 4
  125. }
  126. srv.thcBlockProcessing = config.LightServ/100 + 1
  127. srv.fcManager = flowcontrol.NewClientManager(nil, &mclock.System{})
  128. chtSectionCount, _, _ := srv.chtIndexer.Sections()
  129. if chtSectionCount != 0 {
  130. chtLastSection := chtSectionCount - 1
  131. chtSectionHead := srv.chtIndexer.SectionHead(chtLastSection)
  132. chtRoot := light.GetChtRoot(pm.chainDb, chtLastSection, chtSectionHead)
  133. logger.Info("Loaded CHT", "section", chtLastSection, "head", chtSectionHead, "root", chtRoot)
  134. }
  135. bloomTrieSectionCount, _, _ := srv.bloomTrieIndexer.Sections()
  136. if bloomTrieSectionCount != 0 {
  137. bloomTrieLastSection := bloomTrieSectionCount - 1
  138. bloomTrieSectionHead := srv.bloomTrieIndexer.SectionHead(bloomTrieLastSection)
  139. bloomTrieRoot := light.GetBloomTrieRoot(pm.chainDb, bloomTrieLastSection, bloomTrieSectionHead)
  140. logger.Info("Loaded bloom trie", "section", bloomTrieLastSection, "head", bloomTrieSectionHead, "root", bloomTrieRoot)
  141. }
  142. srv.chtIndexer.Start(eth.BlockChain())
  143. return srv, nil
  144. }
  145. func (s *LesServer) APIs() []rpc.API {
  146. return []rpc.API{
  147. {
  148. Namespace: "les",
  149. Version: "1.0",
  150. Service: NewPrivateLightServerAPI(s),
  151. Public: false,
  152. },
  153. }
  154. }
  155. // startEventLoop starts an event handler loop that updates the recharge curve of
  156. // the client manager and adjusts the client pool's size according to the total
  157. // capacity updates coming from the client manager
  158. func (s *LesServer) startEventLoop() {
  159. s.protocolManager.wg.Add(1)
  160. blockProcLogger := s.csvLogger
  161. if !logBlockProcEvents {
  162. blockProcLogger = nil
  163. }
  164. var processing, procLast bool
  165. blockProcFeed := make(chan bool, 100)
  166. s.protocolManager.blockchain.(*core.BlockChain).SubscribeBlockProcessingEvent(blockProcFeed)
  167. totalRechargeCh := make(chan uint64, 100)
  168. totalRecharge := s.costTracker.subscribeTotalRecharge(totalRechargeCh)
  169. totalCapacityCh := make(chan uint64, 100)
  170. updateRecharge := func() {
  171. if processing {
  172. if !procLast {
  173. blockProcLogger.Event("block processing started")
  174. }
  175. s.protocolManager.servingQueue.setThreads(s.thcBlockProcessing)
  176. s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge, totalRecharge}})
  177. } else {
  178. if procLast {
  179. blockProcLogger.Event("block processing finished")
  180. }
  181. s.protocolManager.servingQueue.setThreads(s.thcNormal)
  182. s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge / 16, totalRecharge / 2}, {totalRecharge / 2, totalRecharge / 2}, {totalRecharge, totalRecharge}})
  183. }
  184. procLast = processing
  185. }
  186. updateRecharge()
  187. totalCapacity := s.fcManager.SubscribeTotalCapacity(totalCapacityCh)
  188. s.priorityClientPool.setLimits(s.maxPeers, totalCapacity)
  189. var maxFreePeers uint64
  190. go func() {
  191. for {
  192. select {
  193. case processing = <-blockProcFeed:
  194. updateRecharge()
  195. case totalRecharge = <-totalRechargeCh:
  196. updateRecharge()
  197. case totalCapacity = <-totalCapacityCh:
  198. s.logTotalCap.Update(float64(totalCapacity))
  199. newFreePeers := totalCapacity / s.freeClientCap
  200. if newFreePeers < maxFreePeers && newFreePeers < uint64(s.maxPeers) {
  201. log.Warn("Reduced total capacity", "maxFreePeers", newFreePeers)
  202. }
  203. maxFreePeers = newFreePeers
  204. s.priorityClientPool.setLimits(s.maxPeers, totalCapacity)
  205. case <-s.protocolManager.quitSync:
  206. s.protocolManager.wg.Done()
  207. return
  208. }
  209. }
  210. }()
  211. }
  212. func (s *LesServer) Protocols() []p2p.Protocol {
  213. return s.makeProtocols(ServerProtocolVersions)
  214. }
  215. // Start starts the LES server
  216. func (s *LesServer) Start(srvr *p2p.Server) {
  217. s.maxPeers = s.config.LightPeers
  218. totalRecharge := s.costTracker.totalRecharge()
  219. if s.maxPeers > 0 {
  220. s.freeClientCap = s.minCapacity //totalRecharge / uint64(s.maxPeers)
  221. if s.freeClientCap < s.minCapacity {
  222. s.freeClientCap = s.minCapacity
  223. }
  224. if s.freeClientCap > 0 {
  225. s.defParams = flowcontrol.ServerParams{
  226. BufLimit: s.freeClientCap * bufLimitRatio,
  227. MinRecharge: s.freeClientCap,
  228. }
  229. }
  230. }
  231. maxCapacity := s.freeClientCap * uint64(s.maxPeers)
  232. if totalRecharge > maxCapacity {
  233. maxCapacity = totalRecharge
  234. }
  235. s.fcManager.SetCapacityLimits(s.freeClientCap, maxCapacity, s.freeClientCap*2)
  236. poolMetricsLogger := s.csvLogger
  237. if !logClientPoolMetrics {
  238. poolMetricsLogger = nil
  239. }
  240. poolEventLogger := s.csvLogger
  241. if !logClientPoolEvents {
  242. poolEventLogger = nil
  243. }
  244. s.freeClientPool = newFreeClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id string) { go s.protocolManager.removePeer(id) }, poolMetricsLogger, poolEventLogger)
  245. s.priorityClientPool = newPriorityClientPool(s.freeClientCap, s.protocolManager.peers, s.freeClientPool, poolMetricsLogger, poolEventLogger)
  246. s.protocolManager.peers.notify(s.priorityClientPool)
  247. s.csvLogger.Start()
  248. s.startEventLoop()
  249. s.protocolManager.Start(s.config.LightPeers)
  250. if srvr.DiscV5 != nil {
  251. for _, topic := range s.lesTopics {
  252. topic := topic
  253. go func() {
  254. logger := log.New("topic", topic)
  255. logger.Info("Starting topic registration")
  256. defer logger.Info("Terminated topic registration")
  257. srvr.DiscV5.RegisterTopic(topic, s.quitSync)
  258. }()
  259. }
  260. }
  261. s.privateKey = srvr.PrivateKey
  262. s.protocolManager.blockLoop()
  263. }
  264. func (s *LesServer) SetBloomBitsIndexer(bloomIndexer *core.ChainIndexer) {
  265. bloomIndexer.AddChildIndexer(s.bloomTrieIndexer)
  266. }
  267. // Stop stops the LES service
  268. func (s *LesServer) Stop() {
  269. s.fcManager.Stop()
  270. s.chtIndexer.Close()
  271. // bloom trie indexer is closed by parent bloombits indexer
  272. go func() {
  273. <-s.protocolManager.noMorePeers
  274. }()
  275. s.freeClientPool.stop()
  276. s.costTracker.stop()
  277. s.protocolManager.Stop()
  278. s.csvLogger.Stop()
  279. }
  280. // todo(rjl493456442) separate client and server implementation.
  281. func (pm *ProtocolManager) blockLoop() {
  282. pm.wg.Add(1)
  283. headCh := make(chan core.ChainHeadEvent, 10)
  284. headSub := pm.blockchain.SubscribeChainHeadEvent(headCh)
  285. go func() {
  286. var lastHead *types.Header
  287. lastBroadcastTd := common.Big0
  288. for {
  289. select {
  290. case ev := <-headCh:
  291. peers := pm.peers.AllPeers()
  292. if len(peers) > 0 {
  293. header := ev.Block.Header()
  294. hash := header.Hash()
  295. number := header.Number.Uint64()
  296. td := rawdb.ReadTd(pm.chainDb, hash, number)
  297. if td != nil && td.Cmp(lastBroadcastTd) > 0 {
  298. var reorg uint64
  299. if lastHead != nil {
  300. reorg = lastHead.Number.Uint64() - rawdb.FindCommonAncestor(pm.chainDb, header, lastHead).Number.Uint64()
  301. }
  302. lastHead = header
  303. lastBroadcastTd = td
  304. log.Debug("Announcing block to peers", "number", number, "hash", hash, "td", td, "reorg", reorg)
  305. announce := announceData{Hash: hash, Number: number, Td: td, ReorgDepth: reorg}
  306. var (
  307. signed bool
  308. signedAnnounce announceData
  309. )
  310. for _, p := range peers {
  311. p := p
  312. switch p.announceType {
  313. case announceTypeSimple:
  314. p.queueSend(func() { p.SendAnnounce(announce) })
  315. case announceTypeSigned:
  316. if !signed {
  317. signedAnnounce = announce
  318. signedAnnounce.sign(pm.server.privateKey)
  319. signed = true
  320. }
  321. p.queueSend(func() { p.SendAnnounce(signedAnnounce) })
  322. }
  323. }
  324. }
  325. }
  326. case <-pm.quitSync:
  327. headSub.Unsubscribe()
  328. pm.wg.Done()
  329. return
  330. }
  331. }
  332. }()
  333. }