server.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "crypto/ecdsa"
  19. "sync"
  20. "time"
  21. "github.com/ethereum/go-ethereum/accounts/abi/bind"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/common/mclock"
  24. "github.com/ethereum/go-ethereum/core"
  25. "github.com/ethereum/go-ethereum/core/rawdb"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/eth"
  28. "github.com/ethereum/go-ethereum/les/flowcontrol"
  29. "github.com/ethereum/go-ethereum/light"
  30. "github.com/ethereum/go-ethereum/log"
  31. "github.com/ethereum/go-ethereum/p2p"
  32. "github.com/ethereum/go-ethereum/p2p/discv5"
  33. "github.com/ethereum/go-ethereum/params"
  34. "github.com/ethereum/go-ethereum/rpc"
  35. )
  // bufLimitRatio is the fixed ratio between the flow control buffer limit and
  // the minimum recharge rate (bufLimit/MRR) used for the default server params.
  const bufLimitRatio = 6000
  37. type LesServer struct {
  38. lesCommons
  39. archiveMode bool // Flag whether the ethereum node runs in archive mode.
  40. fcManager *flowcontrol.ClientManager // nil if our node is client only
  41. costTracker *costTracker
  42. testCost uint64
  43. defParams flowcontrol.ServerParams
  44. lesTopics []discv5.Topic
  45. privateKey *ecdsa.PrivateKey
  46. quitSync chan struct{}
  47. onlyAnnounce bool
  48. thcNormal, thcBlockProcessing int // serving thread count for normal operation and block processing mode
  49. maxPeers int
  50. minCapacity, freeClientCap uint64
  51. freeClientPool *freeClientPool
  52. }
  53. func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
  54. lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions))
  55. for i, pv := range AdvertiseProtocolVersions {
  56. lesTopics[i] = lesTopic(e.BlockChain().Genesis().Hash(), pv)
  57. }
  58. quitSync := make(chan struct{})
  59. srv := &LesServer{
  60. lesCommons: lesCommons{
  61. config: config,
  62. iConfig: light.DefaultServerIndexerConfig,
  63. chainDb: e.ChainDb(),
  64. chtIndexer: light.NewChtIndexer(e.ChainDb(), nil, params.CHTFrequency, params.HelperTrieProcessConfirmations),
  65. bloomTrieIndexer: light.NewBloomTrieIndexer(e.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency),
  66. },
  67. archiveMode: e.ArchiveMode(),
  68. quitSync: quitSync,
  69. lesTopics: lesTopics,
  70. onlyAnnounce: config.UltraLightOnlyAnnounce,
  71. }
  72. srv.costTracker, srv.minCapacity = newCostTracker(e.ChainDb(), config)
  73. logger := log.New()
  74. srv.thcNormal = config.LightServ * 4 / 100
  75. if srv.thcNormal < 4 {
  76. srv.thcNormal = 4
  77. }
  78. srv.thcBlockProcessing = config.LightServ/100 + 1
  79. srv.fcManager = flowcontrol.NewClientManager(nil, &mclock.System{})
  80. checkpoint := srv.latestLocalCheckpoint()
  81. if !checkpoint.Empty() {
  82. logger.Info("Loaded latest checkpoint", "section", checkpoint.SectionIndex, "head", checkpoint.SectionHead,
  83. "chtroot", checkpoint.CHTRoot, "bloomroot", checkpoint.BloomRoot)
  84. }
  85. srv.chtIndexer.Start(e.BlockChain())
  86. oracle := config.CheckpointOracle
  87. if oracle == nil {
  88. oracle = params.CheckpointOracles[e.BlockChain().Genesis().Hash()]
  89. }
  90. registrar := newCheckpointOracle(oracle, srv.getLocalCheckpoint)
  91. // TODO(rjl493456442) Checkpoint is useless for les server, separate handler for client and server.
  92. pm, err := NewProtocolManager(e.BlockChain().Config(), nil, light.DefaultServerIndexerConfig, config.UltraLightServers, config.UltraLightFraction, false, config.NetworkId, e.EventMux(), newPeerSet(), e.BlockChain(), e.TxPool(), e.ChainDb(), nil, nil, registrar, quitSync, new(sync.WaitGroup), e.Synced)
  93. if err != nil {
  94. return nil, err
  95. }
  96. srv.protocolManager = pm
  97. pm.servingQueue = newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100)
  98. pm.server = srv
  99. return srv, nil
  100. }
  101. func (s *LesServer) APIs() []rpc.API {
  102. return []rpc.API{
  103. {
  104. Namespace: "les",
  105. Version: "1.0",
  106. Service: NewPrivateLightAPI(&s.lesCommons, s.protocolManager.reg),
  107. Public: false,
  108. },
  109. }
  110. }
  111. // startEventLoop starts an event handler loop that updates the recharge curve of
  112. // the client manager and adjusts the client pool's size according to the total
  113. // capacity updates coming from the client manager
  114. func (s *LesServer) startEventLoop() {
  115. s.protocolManager.wg.Add(1)
  116. var (
  117. processing, procLast bool
  118. procStarted time.Time
  119. )
  120. blockProcFeed := make(chan bool, 100)
  121. s.protocolManager.blockchain.(*core.BlockChain).SubscribeBlockProcessingEvent(blockProcFeed)
  122. totalRechargeCh := make(chan uint64, 100)
  123. totalRecharge := s.costTracker.subscribeTotalRecharge(totalRechargeCh)
  124. totalCapacityCh := make(chan uint64, 100)
  125. updateRecharge := func() {
  126. if processing {
  127. if !procLast {
  128. procStarted = time.Now()
  129. }
  130. s.protocolManager.servingQueue.setThreads(s.thcBlockProcessing)
  131. s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge, totalRecharge}})
  132. } else {
  133. if procLast {
  134. blockProcessingTimer.UpdateSince(procStarted)
  135. }
  136. s.protocolManager.servingQueue.setThreads(s.thcNormal)
  137. s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge / 16, totalRecharge / 2}, {totalRecharge / 2, totalRecharge / 2}, {totalRecharge, totalRecharge}})
  138. }
  139. procLast = processing
  140. }
  141. updateRecharge()
  142. totalCapacity := s.fcManager.SubscribeTotalCapacity(totalCapacityCh)
  143. s.freeClientPool.setLimits(s.maxPeers, totalCapacity)
  144. var maxFreePeers uint64
  145. go func() {
  146. for {
  147. select {
  148. case processing = <-blockProcFeed:
  149. updateRecharge()
  150. case totalRecharge = <-totalRechargeCh:
  151. updateRecharge()
  152. case totalCapacity = <-totalCapacityCh:
  153. totalCapacityGauge.Update(int64(totalCapacity))
  154. newFreePeers := totalCapacity / s.freeClientCap
  155. if newFreePeers < maxFreePeers && newFreePeers < uint64(s.maxPeers) {
  156. log.Warn("Reduced total capacity", "maxFreePeers", newFreePeers)
  157. }
  158. maxFreePeers = newFreePeers
  159. s.freeClientPool.setLimits(s.maxPeers, totalCapacity)
  160. case <-s.protocolManager.quitSync:
  161. s.protocolManager.wg.Done()
  162. return
  163. }
  164. }
  165. }()
  166. }
  167. func (s *LesServer) Protocols() []p2p.Protocol {
  168. return s.makeProtocols(ServerProtocolVersions)
  169. }
  170. // Start starts the LES server
  171. func (s *LesServer) Start(srvr *p2p.Server) {
  172. s.maxPeers = s.config.LightPeers
  173. totalRecharge := s.costTracker.totalRecharge()
  174. if s.maxPeers > 0 {
  175. s.freeClientCap = s.minCapacity //totalRecharge / uint64(s.maxPeers)
  176. if s.freeClientCap < s.minCapacity {
  177. s.freeClientCap = s.minCapacity
  178. }
  179. if s.freeClientCap > 0 {
  180. s.defParams = flowcontrol.ServerParams{
  181. BufLimit: s.freeClientCap * bufLimitRatio,
  182. MinRecharge: s.freeClientCap,
  183. }
  184. }
  185. }
  186. maxCapacity := s.freeClientCap * uint64(s.maxPeers)
  187. if totalRecharge > maxCapacity {
  188. maxCapacity = totalRecharge
  189. }
  190. s.fcManager.SetCapacityLimits(s.freeClientCap, maxCapacity, s.freeClientCap*2)
  191. s.freeClientPool = newFreeClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id string) { go s.protocolManager.removePeer(id) })
  192. s.protocolManager.peers.notify(s.freeClientPool)
  193. s.startEventLoop()
  194. s.protocolManager.Start(s.config.LightPeers)
  195. if srvr.DiscV5 != nil {
  196. for _, topic := range s.lesTopics {
  197. topic := topic
  198. go func() {
  199. logger := log.New("topic", topic)
  200. logger.Info("Starting topic registration")
  201. defer logger.Info("Terminated topic registration")
  202. srvr.DiscV5.RegisterTopic(topic, s.quitSync)
  203. }()
  204. }
  205. }
  206. s.privateKey = srvr.PrivateKey
  207. s.protocolManager.blockLoop()
  208. }
  209. func (s *LesServer) SetBloomBitsIndexer(bloomIndexer *core.ChainIndexer) {
  210. bloomIndexer.AddChildIndexer(s.bloomTrieIndexer)
  211. }
  212. // SetClient sets the rpc client and starts running checkpoint contract if it is not yet watched.
  213. func (s *LesServer) SetContractBackend(backend bind.ContractBackend) {
  214. if s.protocolManager.reg != nil {
  215. s.protocolManager.reg.start(backend)
  216. }
  217. }
  218. // Stop stops the LES service
  219. func (s *LesServer) Stop() {
  220. s.fcManager.Stop()
  221. s.chtIndexer.Close()
  222. // bloom trie indexer is closed by parent bloombits indexer
  223. go func() {
  224. <-s.protocolManager.noMorePeers
  225. }()
  226. s.freeClientPool.stop()
  227. s.costTracker.stop()
  228. s.protocolManager.Stop()
  229. }
  230. // todo(rjl493456442) separate client and server implementation.
  231. func (pm *ProtocolManager) blockLoop() {
  232. pm.wg.Add(1)
  233. headCh := make(chan core.ChainHeadEvent, 10)
  234. headSub := pm.blockchain.SubscribeChainHeadEvent(headCh)
  235. go func() {
  236. var lastHead *types.Header
  237. lastBroadcastTd := common.Big0
  238. for {
  239. select {
  240. case ev := <-headCh:
  241. peers := pm.peers.AllPeers()
  242. if len(peers) > 0 {
  243. header := ev.Block.Header()
  244. hash := header.Hash()
  245. number := header.Number.Uint64()
  246. td := rawdb.ReadTd(pm.chainDb, hash, number)
  247. if td != nil && td.Cmp(lastBroadcastTd) > 0 {
  248. var reorg uint64
  249. if lastHead != nil {
  250. reorg = lastHead.Number.Uint64() - rawdb.FindCommonAncestor(pm.chainDb, header, lastHead).Number.Uint64()
  251. }
  252. lastHead = header
  253. lastBroadcastTd = td
  254. log.Debug("Announcing block to peers", "number", number, "hash", hash, "td", td, "reorg", reorg)
  255. announce := announceData{Hash: hash, Number: number, Td: td, ReorgDepth: reorg}
  256. var (
  257. signed bool
  258. signedAnnounce announceData
  259. )
  260. for _, p := range peers {
  261. p := p
  262. switch p.announceType {
  263. case announceTypeSimple:
  264. p.queueSend(func() { p.SendAnnounce(announce) })
  265. case announceTypeSigned:
  266. if !signed {
  267. signedAnnounce = announce
  268. signedAnnounce.sign(pm.server.privateKey)
  269. signed = true
  270. }
  271. p.queueSend(func() { p.SendAnnounce(signedAnnounce) })
  272. }
  273. }
  274. }
  275. }
  276. case <-pm.quitSync:
  277. headSub.Unsubscribe()
  278. pm.wg.Done()
  279. return
  280. }
  281. }
  282. }()
  283. }