server.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "crypto/ecdsa"
  20. "encoding/binary"
  21. "math"
  22. "sync"
  23. "github.com/ethereum/go-ethereum/common"
  24. "github.com/ethereum/go-ethereum/core"
  25. "github.com/ethereum/go-ethereum/core/rawdb"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/eth"
  28. "github.com/ethereum/go-ethereum/ethdb"
  29. "github.com/ethereum/go-ethereum/les/flowcontrol"
  30. "github.com/ethereum/go-ethereum/light"
  31. "github.com/ethereum/go-ethereum/log"
  32. "github.com/ethereum/go-ethereum/p2p"
  33. "github.com/ethereum/go-ethereum/p2p/discv5"
  34. "github.com/ethereum/go-ethereum/rlp"
  35. )
  36. type LesServer struct {
  37. lesCommons
  38. fcManager *flowcontrol.ClientManager // nil if our node is client only
  39. fcCostStats *requestCostStats
  40. defParams *flowcontrol.ServerParams
  41. lesTopics []discv5.Topic
  42. privateKey *ecdsa.PrivateKey
  43. quitSync chan struct{}
  44. }
  45. func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
  46. quitSync := make(chan struct{})
  47. pm, err := NewProtocolManager(eth.BlockChain().Config(), false, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup))
  48. if err != nil {
  49. return nil, err
  50. }
  51. lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions))
  52. for i, pv := range AdvertiseProtocolVersions {
  53. lesTopics[i] = lesTopic(eth.BlockChain().Genesis().Hash(), pv)
  54. }
  55. srv := &LesServer{
  56. lesCommons: lesCommons{
  57. config: config,
  58. chainDb: eth.ChainDb(),
  59. chtIndexer: light.NewChtIndexer(eth.ChainDb(), false, nil),
  60. bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), false, nil),
  61. protocolManager: pm,
  62. },
  63. quitSync: quitSync,
  64. lesTopics: lesTopics,
  65. }
  66. logger := log.New()
  67. chtV1SectionCount, _, _ := srv.chtIndexer.Sections() // indexer still uses LES/1 4k section size for backwards server compatibility
  68. chtV2SectionCount := chtV1SectionCount / (light.CHTFrequencyClient / light.CHTFrequencyServer)
  69. if chtV2SectionCount != 0 {
  70. // convert to LES/2 section
  71. chtLastSection := chtV2SectionCount - 1
  72. // convert last LES/2 section index back to LES/1 index for chtIndexer.SectionHead
  73. chtLastSectionV1 := (chtLastSection+1)*(light.CHTFrequencyClient/light.CHTFrequencyServer) - 1
  74. chtSectionHead := srv.chtIndexer.SectionHead(chtLastSectionV1)
  75. chtRoot := light.GetChtV2Root(pm.chainDb, chtLastSection, chtSectionHead)
  76. logger.Info("Loaded CHT", "section", chtLastSection, "head", chtSectionHead, "root", chtRoot)
  77. }
  78. bloomTrieSectionCount, _, _ := srv.bloomTrieIndexer.Sections()
  79. if bloomTrieSectionCount != 0 {
  80. bloomTrieLastSection := bloomTrieSectionCount - 1
  81. bloomTrieSectionHead := srv.bloomTrieIndexer.SectionHead(bloomTrieLastSection)
  82. bloomTrieRoot := light.GetBloomTrieRoot(pm.chainDb, bloomTrieLastSection, bloomTrieSectionHead)
  83. logger.Info("Loaded bloom trie", "section", bloomTrieLastSection, "head", bloomTrieSectionHead, "root", bloomTrieRoot)
  84. }
  85. srv.chtIndexer.Start(eth.BlockChain())
  86. pm.server = srv
  87. srv.defParams = &flowcontrol.ServerParams{
  88. BufLimit: 300000000,
  89. MinRecharge: 50000,
  90. }
  91. srv.fcManager = flowcontrol.NewClientManager(uint64(config.LightServ), 10, 1000000000)
  92. srv.fcCostStats = newCostStats(eth.ChainDb())
  93. return srv, nil
  94. }
  95. func (s *LesServer) Protocols() []p2p.Protocol {
  96. return s.makeProtocols(ServerProtocolVersions)
  97. }
  98. // Start starts the LES server
  99. func (s *LesServer) Start(srvr *p2p.Server) {
  100. s.protocolManager.Start(s.config.LightPeers)
  101. if srvr.DiscV5 != nil {
  102. for _, topic := range s.lesTopics {
  103. topic := topic
  104. go func() {
  105. logger := log.New("topic", topic)
  106. logger.Info("Starting topic registration")
  107. defer logger.Info("Terminated topic registration")
  108. srvr.DiscV5.RegisterTopic(topic, s.quitSync)
  109. }()
  110. }
  111. }
  112. s.privateKey = srvr.PrivateKey
  113. s.protocolManager.blockLoop()
  114. }
  115. func (s *LesServer) SetBloomBitsIndexer(bloomIndexer *core.ChainIndexer) {
  116. bloomIndexer.AddChildIndexer(s.bloomTrieIndexer)
  117. }
  118. // Stop stops the LES service
  119. func (s *LesServer) Stop() {
  120. s.chtIndexer.Close()
  121. // bloom trie indexer is closed by parent bloombits indexer
  122. s.fcCostStats.store()
  123. s.fcManager.Stop()
  124. go func() {
  125. <-s.protocolManager.noMorePeers
  126. }()
  127. s.protocolManager.Stop()
  128. }
  129. type requestCosts struct {
  130. baseCost, reqCost uint64
  131. }
  132. type requestCostTable map[uint64]*requestCosts
  133. type RequestCostList []struct {
  134. MsgCode, BaseCost, ReqCost uint64
  135. }
  136. func (list RequestCostList) decode() requestCostTable {
  137. table := make(requestCostTable)
  138. for _, e := range list {
  139. table[e.MsgCode] = &requestCosts{
  140. baseCost: e.BaseCost,
  141. reqCost: e.ReqCost,
  142. }
  143. }
  144. return table
  145. }
  146. type linReg struct {
  147. sumX, sumY, sumXX, sumXY float64
  148. cnt uint64
  149. }
  150. const linRegMaxCnt = 100000
  151. func (l *linReg) add(x, y float64) {
  152. if l.cnt >= linRegMaxCnt {
  153. sub := float64(l.cnt+1-linRegMaxCnt) / linRegMaxCnt
  154. l.sumX -= l.sumX * sub
  155. l.sumY -= l.sumY * sub
  156. l.sumXX -= l.sumXX * sub
  157. l.sumXY -= l.sumXY * sub
  158. l.cnt = linRegMaxCnt - 1
  159. }
  160. l.cnt++
  161. l.sumX += x
  162. l.sumY += y
  163. l.sumXX += x * x
  164. l.sumXY += x * y
  165. }
  166. func (l *linReg) calc() (b, m float64) {
  167. if l.cnt == 0 {
  168. return 0, 0
  169. }
  170. cnt := float64(l.cnt)
  171. d := cnt*l.sumXX - l.sumX*l.sumX
  172. if d < 0.001 {
  173. return l.sumY / cnt, 0
  174. }
  175. m = (cnt*l.sumXY - l.sumX*l.sumY) / d
  176. b = (l.sumY / cnt) - (m * l.sumX / cnt)
  177. return b, m
  178. }
  179. func (l *linReg) toBytes() []byte {
  180. var arr [40]byte
  181. binary.BigEndian.PutUint64(arr[0:8], math.Float64bits(l.sumX))
  182. binary.BigEndian.PutUint64(arr[8:16], math.Float64bits(l.sumY))
  183. binary.BigEndian.PutUint64(arr[16:24], math.Float64bits(l.sumXX))
  184. binary.BigEndian.PutUint64(arr[24:32], math.Float64bits(l.sumXY))
  185. binary.BigEndian.PutUint64(arr[32:40], l.cnt)
  186. return arr[:]
  187. }
  188. func linRegFromBytes(data []byte) *linReg {
  189. if len(data) != 40 {
  190. return nil
  191. }
  192. l := &linReg{}
  193. l.sumX = math.Float64frombits(binary.BigEndian.Uint64(data[0:8]))
  194. l.sumY = math.Float64frombits(binary.BigEndian.Uint64(data[8:16]))
  195. l.sumXX = math.Float64frombits(binary.BigEndian.Uint64(data[16:24]))
  196. l.sumXY = math.Float64frombits(binary.BigEndian.Uint64(data[24:32]))
  197. l.cnt = binary.BigEndian.Uint64(data[32:40])
  198. return l
  199. }
  200. type requestCostStats struct {
  201. lock sync.RWMutex
  202. db ethdb.Database
  203. stats map[uint64]*linReg
  204. }
  205. type requestCostStatsRlp []struct {
  206. MsgCode uint64
  207. Data []byte
  208. }
  209. var rcStatsKey = []byte("_requestCostStats")
  210. func newCostStats(db ethdb.Database) *requestCostStats {
  211. stats := make(map[uint64]*linReg)
  212. for _, code := range reqList {
  213. stats[code] = &linReg{cnt: 100}
  214. }
  215. if db != nil {
  216. data, err := db.Get(rcStatsKey)
  217. var statsRlp requestCostStatsRlp
  218. if err == nil {
  219. err = rlp.DecodeBytes(data, &statsRlp)
  220. }
  221. if err == nil {
  222. for _, r := range statsRlp {
  223. if stats[r.MsgCode] != nil {
  224. if l := linRegFromBytes(r.Data); l != nil {
  225. stats[r.MsgCode] = l
  226. }
  227. }
  228. }
  229. }
  230. }
  231. return &requestCostStats{
  232. db: db,
  233. stats: stats,
  234. }
  235. }
  236. func (s *requestCostStats) store() {
  237. s.lock.Lock()
  238. defer s.lock.Unlock()
  239. statsRlp := make(requestCostStatsRlp, len(reqList))
  240. for i, code := range reqList {
  241. statsRlp[i].MsgCode = code
  242. statsRlp[i].Data = s.stats[code].toBytes()
  243. }
  244. if data, err := rlp.EncodeToBytes(statsRlp); err == nil {
  245. s.db.Put(rcStatsKey, data)
  246. }
  247. }
  248. func (s *requestCostStats) getCurrentList() RequestCostList {
  249. s.lock.Lock()
  250. defer s.lock.Unlock()
  251. list := make(RequestCostList, len(reqList))
  252. //fmt.Println("RequestCostList")
  253. for idx, code := range reqList {
  254. b, m := s.stats[code].calc()
  255. //fmt.Println(code, s.stats[code].cnt, b/1000000, m/1000000)
  256. if m < 0 {
  257. b += m
  258. m = 0
  259. }
  260. if b < 0 {
  261. b = 0
  262. }
  263. list[idx].MsgCode = code
  264. list[idx].BaseCost = uint64(b * 2)
  265. list[idx].ReqCost = uint64(m * 2)
  266. }
  267. return list
  268. }
  269. func (s *requestCostStats) update(msgCode, reqCnt, cost uint64) {
  270. s.lock.Lock()
  271. defer s.lock.Unlock()
  272. c, ok := s.stats[msgCode]
  273. if !ok || reqCnt == 0 {
  274. return
  275. }
  276. c.add(float64(reqCnt), float64(cost))
  277. }
  278. func (pm *ProtocolManager) blockLoop() {
  279. pm.wg.Add(1)
  280. headCh := make(chan core.ChainHeadEvent, 10)
  281. headSub := pm.blockchain.SubscribeChainHeadEvent(headCh)
  282. go func() {
  283. var lastHead *types.Header
  284. lastBroadcastTd := common.Big0
  285. for {
  286. select {
  287. case ev := <-headCh:
  288. peers := pm.peers.AllPeers()
  289. if len(peers) > 0 {
  290. header := ev.Block.Header()
  291. hash := header.Hash()
  292. number := header.Number.Uint64()
  293. td := rawdb.ReadTd(pm.chainDb, hash, number)
  294. if td != nil && td.Cmp(lastBroadcastTd) > 0 {
  295. var reorg uint64
  296. if lastHead != nil {
  297. reorg = lastHead.Number.Uint64() - rawdb.FindCommonAncestor(pm.chainDb, header, lastHead).Number.Uint64()
  298. }
  299. lastHead = header
  300. lastBroadcastTd = td
  301. log.Debug("Announcing block to peers", "number", number, "hash", hash, "td", td, "reorg", reorg)
  302. announce := announceData{Hash: hash, Number: number, Td: td, ReorgDepth: reorg}
  303. var (
  304. signed bool
  305. signedAnnounce announceData
  306. )
  307. for _, p := range peers {
  308. switch p.announceType {
  309. case announceTypeSimple:
  310. select {
  311. case p.announceChn <- announce:
  312. default:
  313. pm.removePeer(p.id)
  314. }
  315. case announceTypeSigned:
  316. if !signed {
  317. signedAnnounce = announce
  318. signedAnnounce.sign(pm.server.privateKey)
  319. signed = true
  320. }
  321. select {
  322. case p.announceChn <- signedAnnounce:
  323. default:
  324. pm.removePeer(p.id)
  325. }
  326. }
  327. }
  328. }
  329. }
  330. case <-pm.quitSync:
  331. headSub.Unsubscribe()
  332. pm.wg.Done()
  333. return
  334. }
  335. }
  336. }()
  337. }