ethstats.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package ethstats implements the network stats reporting service.
  17. package ethstats
  18. import (
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "regexp"
  24. "runtime"
  25. "strconv"
  26. "strings"
  27. "time"
  28. "github.com/ethereum/go-ethereum/common"
  29. "github.com/ethereum/go-ethereum/core"
  30. "github.com/ethereum/go-ethereum/core/types"
  31. "github.com/ethereum/go-ethereum/eth"
  32. "github.com/ethereum/go-ethereum/event"
  33. "github.com/ethereum/go-ethereum/les"
  34. "github.com/ethereum/go-ethereum/logger"
  35. "github.com/ethereum/go-ethereum/logger/glog"
  36. "github.com/ethereum/go-ethereum/node"
  37. "github.com/ethereum/go-ethereum/p2p"
  38. "github.com/ethereum/go-ethereum/rpc"
  39. "golang.org/x/net/websocket"
  40. )
// historyUpdateRange determines the span of most recent blocks that
// reportHistory covers when it is not given an explicit list of block numbers
// (which is how it is invoked right after login). It is also used as the
// initial capacity of the index list built there.
const historyUpdateRange = 50
// Service implements an Ethereum netstats reporting daemon that pushes local
// chain statistics up to a monitoring server.
//
// The methods in this file branch on eth being non-nil and otherwise fall back
// to les, so one of the two backends is expected to be set.
type Service struct {
	stack  *node.Node         // Temporary workaround, remove when API finalized
	server *p2p.Server        // Peer-to-peer server to retrieve networking infos
	eth    *eth.Ethereum      // Full Ethereum service if monitoring a full node
	les    *les.LightEthereum // Light Ethereum service if monitoring a light node

	node string // Name of the node to display on the monitoring page
	pass string // Password to authorize access to the monitoring page
	host string // Remote address of the monitoring service

	pongCh chan struct{} // Pong notifications are fed into this channel
	histCh chan []uint64 // History request block numbers are fed into this channel
}
  57. // New returns a monitoring service ready for stats reporting.
  58. func New(url string, ethServ *eth.Ethereum, lesServ *les.LightEthereum) (*Service, error) {
  59. // Parse the netstats connection url
  60. re := regexp.MustCompile("([^:@]*)(:([^@]*))?@(.+)")
  61. parts := re.FindStringSubmatch(url)
  62. if len(parts) != 5 {
  63. return nil, fmt.Errorf("invalid netstats url: \"%s\", should be nodename:secret@host:port", url)
  64. }
  65. // Assemble and return the stats service
  66. return &Service{
  67. eth: ethServ,
  68. les: lesServ,
  69. node: parts[1],
  70. pass: parts[3],
  71. host: parts[4],
  72. pongCh: make(chan struct{}),
  73. histCh: make(chan []uint64, 1),
  74. }, nil
  75. }
  76. // Protocols implements node.Service, returning the P2P network protocols used
  77. // by the stats service (nil as it doesn't use the devp2p overlay network).
  78. func (s *Service) Protocols() []p2p.Protocol { return nil }
  79. // APIs implements node.Service, returning the RPC API endpoints provided by the
  80. // stats service (nil as it doesn't provide any user callable APIs).
  81. func (s *Service) APIs() []rpc.API { return nil }
  82. // Start implements node.Service, starting up the monitoring and reporting daemon.
  83. func (s *Service) Start(server *p2p.Server) error {
  84. s.server = server
  85. go s.loop()
  86. glog.V(logger.Info).Infoln("Stats daemon started")
  87. return nil
  88. }
  89. // Stop implements node.Service, terminating the monitoring and reporting daemon.
  90. func (s *Service) Stop() error {
  91. glog.V(logger.Info).Infoln("Stats daemon stopped")
  92. return nil
  93. }
  94. // loop keeps trying to connect to the netstats server, reporting chain events
  95. // until termination.
  96. func (s *Service) loop() {
  97. // Subscribe tso chain events to execute updates on
  98. var emux *event.TypeMux
  99. if s.eth != nil {
  100. emux = s.eth.EventMux()
  101. } else {
  102. emux = s.les.EventMux()
  103. }
  104. headSub := emux.Subscribe(core.ChainHeadEvent{})
  105. defer headSub.Unsubscribe()
  106. txSub := emux.Subscribe(core.TxPreEvent{})
  107. defer txSub.Unsubscribe()
  108. // Loop reporting until termination
  109. for {
  110. // Establish a websocket connection to the server and authenticate the node
  111. url := fmt.Sprintf("%s/api", s.host)
  112. if !strings.Contains(url, "://") {
  113. url = "wss://" + url
  114. }
  115. conn, err := websocket.Dial(url, "", "http://localhost/")
  116. if err != nil {
  117. glog.V(logger.Warn).Infof("Stats server unreachable: %v", err)
  118. time.Sleep(10 * time.Second)
  119. continue
  120. }
  121. in := json.NewDecoder(conn)
  122. out := json.NewEncoder(conn)
  123. if err = s.login(in, out); err != nil {
  124. glog.V(logger.Warn).Infof("Stats login failed: %v", err)
  125. conn.Close()
  126. time.Sleep(10 * time.Second)
  127. continue
  128. }
  129. go s.readLoop(conn, in)
  130. // Send the initial stats so our node looks decent from the get go
  131. if err = s.report(out); err != nil {
  132. glog.V(logger.Warn).Infof("Initial stats report failed: %v", err)
  133. conn.Close()
  134. continue
  135. }
  136. if err = s.reportHistory(out, nil); err != nil {
  137. glog.V(logger.Warn).Infof("History report failed: %v", err)
  138. conn.Close()
  139. continue
  140. }
  141. // Keep sending status updates until the connection breaks
  142. fullReport := time.NewTicker(15 * time.Second)
  143. for err == nil {
  144. select {
  145. case <-fullReport.C:
  146. if err = s.report(out); err != nil {
  147. glog.V(logger.Warn).Infof("Full stats report failed: %v", err)
  148. }
  149. case list := <-s.histCh:
  150. if err = s.reportHistory(out, list); err != nil {
  151. glog.V(logger.Warn).Infof("Block history report failed: %v", err)
  152. }
  153. case head, ok := <-headSub.Chan():
  154. if !ok { // node stopped
  155. conn.Close()
  156. return
  157. }
  158. if err = s.reportBlock(out, head.Data.(core.ChainHeadEvent).Block); err != nil {
  159. glog.V(logger.Warn).Infof("Block stats report failed: %v", err)
  160. }
  161. if err = s.reportPending(out); err != nil {
  162. glog.V(logger.Warn).Infof("Post-block transaction stats report failed: %v", err)
  163. }
  164. case _, ok := <-txSub.Chan():
  165. if !ok { // node stopped
  166. conn.Close()
  167. return
  168. }
  169. // Exhaust events to avoid reporting too frequently
  170. for exhausted := false; !exhausted; {
  171. select {
  172. case <-headSub.Chan():
  173. default:
  174. exhausted = true
  175. }
  176. }
  177. if err = s.reportPending(out); err != nil {
  178. glog.V(logger.Warn).Infof("Transaction stats report failed: %v", err)
  179. }
  180. }
  181. }
  182. // Make sure the connection is closed
  183. conn.Close()
  184. }
  185. }
  186. // readLoop loops as long as the connection is alive and retrieves data packets
  187. // from the network socket. If any of them match an active request, it forwards
  188. // it, if they themselves are requests it initiates a reply, and lastly it drops
  189. // unknown packets.
  190. func (s *Service) readLoop(conn *websocket.Conn, in *json.Decoder) {
  191. // If the read loop exists, close the connection
  192. defer conn.Close()
  193. for {
  194. // Retrieve the next generic network packet and bail out on error
  195. var msg map[string][]interface{}
  196. if err := in.Decode(&msg); err != nil {
  197. glog.V(logger.Warn).Infof("Failed to decode stats server message: %v", err)
  198. return
  199. }
  200. if len(msg["emit"]) == 0 {
  201. glog.V(logger.Warn).Infof("Stats server sent non-broadcast: %v", msg)
  202. return
  203. }
  204. command, ok := msg["emit"][0].(string)
  205. if !ok {
  206. glog.V(logger.Warn).Infof("Invalid stats server message type: %v", msg["emit"][0])
  207. return
  208. }
  209. // If the message is a ping reply, deliver (someone must be listening!)
  210. if len(msg["emit"]) == 2 && command == "node-pong" {
  211. select {
  212. case s.pongCh <- struct{}{}:
  213. // Pong delivered, continue listening
  214. continue
  215. default:
  216. // Ping routine dead, abort
  217. glog.V(logger.Warn).Infof("Stats server pinger seems to have died")
  218. return
  219. }
  220. }
  221. // If the message is a history request, forward to the event processor
  222. if len(msg["emit"]) == 2 && command == "history" {
  223. // Make sure the request is valid and doesn't crash us
  224. request, ok := msg["emit"][1].(map[string]interface{})
  225. if !ok {
  226. glog.V(logger.Warn).Infof("Invalid history request: %v", msg["emit"][1])
  227. return
  228. }
  229. list, ok := request["list"].([]interface{})
  230. if !ok {
  231. glog.V(logger.Warn).Infof("Invalid history block list: %v", request["list"])
  232. return
  233. }
  234. // Convert the block number list to an integer list
  235. numbers := make([]uint64, len(list))
  236. for i, num := range list {
  237. n, ok := num.(float64)
  238. if !ok {
  239. glog.V(logger.Warn).Infof("Invalid history block number: %v", num)
  240. return
  241. }
  242. numbers[i] = uint64(n)
  243. }
  244. select {
  245. case s.histCh <- numbers:
  246. continue
  247. default:
  248. }
  249. }
  250. // Report anything else and continue
  251. glog.V(logger.Info).Infof("Unknown stats message: %v", msg)
  252. }
  253. }
// nodeInfo is the collection of metainformation about a node that is displayed
// on the monitoring page. It is sent once, embedded in the login message.
type nodeInfo struct {
	Name     string `json:"name"`             // Display name of the node (login uses the configured node name)
	Node     string `json:"node"`             // Name reported by the p2p server's node info
	Port     int    `json:"port"`             // Listener port reported by the p2p server
	Network  string `json:"net"`              // Network id, formatted as a decimal string
	Protocol string `json:"protocol"`         // Protocol name and version, "eth/N" or "les/N"
	API      string `json:"api"`              // login always sends "No"
	Os       string `json:"os"`               // login fills this with runtime.GOOS
	OsVer    string `json:"os_v"`             // login fills this with runtime.GOARCH
	Client   string `json:"client"`           // login always sends "0.1.1"
	History  bool   `json:"canUpdateHistory"` // login always sends true
}
// authMsg is the authentication infos needed to login to a monitoring server.
// It is the payload of the "hello" message emitted by login.
type authMsg struct {
	Id     string   `json:"id"`     // Identifier of the node (login uses the configured node name)
	Info   nodeInfo `json:"info"`   // Metainformation shown on the monitoring page
	Secret string   `json:"secret"` // Password taken from the netstats url
}
  274. // login tries to authorize the client at the remote server.
  275. func (s *Service) login(in *json.Decoder, out *json.Encoder) error {
  276. // Construct and send the login authentication
  277. infos := s.server.NodeInfo()
  278. var network, protocol string
  279. if info := infos.Protocols["eth"]; info != nil {
  280. network = strconv.Itoa(info.(*eth.EthNodeInfo).Network)
  281. protocol = fmt.Sprintf("eth/%d", eth.ProtocolVersions[0])
  282. } else {
  283. network = strconv.Itoa(infos.Protocols["les"].(*eth.EthNodeInfo).Network)
  284. protocol = fmt.Sprintf("les/%d", les.ProtocolVersions[0])
  285. }
  286. auth := &authMsg{
  287. Id: s.node,
  288. Info: nodeInfo{
  289. Name: s.node,
  290. Node: infos.Name,
  291. Port: infos.Ports.Listener,
  292. Network: network,
  293. Protocol: protocol,
  294. API: "No",
  295. Os: runtime.GOOS,
  296. OsVer: runtime.GOARCH,
  297. Client: "0.1.1",
  298. History: true,
  299. },
  300. Secret: s.pass,
  301. }
  302. login := map[string][]interface{}{
  303. "emit": {"hello", auth},
  304. }
  305. if err := out.Encode(login); err != nil {
  306. return err
  307. }
  308. // Retrieve the remote ack or connection termination
  309. var ack map[string][]string
  310. if err := in.Decode(&ack); err != nil || len(ack["emit"]) != 1 || ack["emit"][0] != "ready" {
  311. return errors.New("unauthorized")
  312. }
  313. return nil
  314. }
  315. // report collects all possible data to report and send it to the stats server.
  316. // This should only be used on reconnects or rarely to avoid overloading the
  317. // server. Use the individual methods for reporting subscribed events.
  318. func (s *Service) report(out *json.Encoder) error {
  319. if err := s.reportLatency(out); err != nil {
  320. return err
  321. }
  322. if err := s.reportBlock(out, nil); err != nil {
  323. return err
  324. }
  325. if err := s.reportPending(out); err != nil {
  326. return err
  327. }
  328. if err := s.reportStats(out); err != nil {
  329. return err
  330. }
  331. return nil
  332. }
  333. // reportLatency sends a ping request to the server, measures the RTT time and
  334. // finally sends a latency update.
  335. func (s *Service) reportLatency(out *json.Encoder) error {
  336. // Send the current time to the ethstats server
  337. start := time.Now()
  338. ping := map[string][]interface{}{
  339. "emit": {"node-ping", map[string]string{
  340. "id": s.node,
  341. "clientTime": start.String(),
  342. }},
  343. }
  344. if err := out.Encode(ping); err != nil {
  345. return err
  346. }
  347. // Wait for the pong request to arrive back
  348. select {
  349. case <-s.pongCh:
  350. // Pong delivered, report the latency
  351. case <-time.After(3 * time.Second):
  352. // Ping timeout, abort
  353. return errors.New("ping timed out")
  354. }
  355. // Send back the measured latency
  356. latency := map[string][]interface{}{
  357. "emit": {"latency", map[string]string{
  358. "id": s.node,
  359. "latency": strconv.Itoa(int((time.Since(start) / time.Duration(2)).Nanoseconds() / 1000000)),
  360. }},
  361. }
  362. return out.Encode(latency)
  363. }
// blockStats is the information to report about individual blocks. Instances
// are built by assembleBlockStats from a block header plus, on full nodes, the
// block's transactions and uncles.
type blockStats struct {
	Number    *big.Int       `json:"number"`          // Block number from the header
	Hash      common.Hash    `json:"hash"`            // Hash of the header
	Timestamp *big.Int       `json:"timestamp"`       // Header timestamp
	Miner     common.Address `json:"miner"`           // Coinbase address from the header
	GasUsed   *big.Int       `json:"gasUsed"`         // Copy of the header's gas used
	GasLimit  *big.Int       `json:"gasLimit"`        // Copy of the header's gas limit
	Diff      string         `json:"difficulty"`      // Header difficulty, as a decimal string
	TotalDiff string         `json:"totalDifficulty"` // Total difficulty looked up from the chain, as a string
	Txs       txStats        `json:"transactions"`    // Transactions; left empty for light nodes
	Uncles    uncleStats     `json:"uncles"`          // Uncle headers; left empty for light nodes
}
  377. // txStats is a custom wrapper around a transaction array to force serializing
  378. // empty arrays instead of returning null for them.
  379. type txStats []*types.Transaction
  380. func (s txStats) MarshalJSON() ([]byte, error) {
  381. if txs := ([]*types.Transaction)(s); len(txs) > 0 {
  382. return json.Marshal(txs)
  383. }
  384. return []byte("[]"), nil
  385. }
  386. // uncleStats is a custom wrapper around an uncle array to force serializing
  387. // empty arrays instead of returning null for them.
  388. type uncleStats []*types.Header
  389. func (s uncleStats) MarshalJSON() ([]byte, error) {
  390. if uncles := ([]*types.Header)(s); len(uncles) > 0 {
  391. return json.Marshal(uncles)
  392. }
  393. return []byte("[]"), nil
  394. }
  395. // reportBlock retrieves the current chain head and repors it to the stats server.
  396. func (s *Service) reportBlock(out *json.Encoder, block *types.Block) error {
  397. // Assemble the block stats report and send it to the server
  398. stats := map[string]interface{}{
  399. "id": s.node,
  400. "block": s.assembleBlockStats(block),
  401. }
  402. report := map[string][]interface{}{
  403. "emit": {"block", stats},
  404. }
  405. return out.Encode(report)
  406. }
  407. // assembleBlockStats retrieves any required metadata to report a single block
  408. // and assembles the block stats. If block is nil, the current head is processed.
  409. func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
  410. // Gather the block infos from the local blockchain
  411. var (
  412. header *types.Header
  413. td *big.Int
  414. txs []*types.Transaction
  415. uncles []*types.Header
  416. )
  417. if s.eth != nil {
  418. // Full nodes have all needed information available
  419. if block == nil {
  420. block = s.eth.BlockChain().CurrentBlock()
  421. }
  422. header = block.Header()
  423. td = s.eth.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
  424. txs = block.Transactions()
  425. uncles = block.Uncles()
  426. } else {
  427. // Light nodes would need on-demand lookups for transactions/uncles, skip
  428. if block != nil {
  429. header = block.Header()
  430. } else {
  431. header = s.les.BlockChain().CurrentHeader()
  432. }
  433. td = s.les.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
  434. }
  435. // Assemble and return the block stats
  436. return &blockStats{
  437. Number: header.Number,
  438. Hash: header.Hash(),
  439. Timestamp: header.Time,
  440. Miner: header.Coinbase,
  441. GasUsed: new(big.Int).Set(header.GasUsed),
  442. GasLimit: new(big.Int).Set(header.GasLimit),
  443. Diff: header.Difficulty.String(),
  444. TotalDiff: td.String(),
  445. Txs: txs,
  446. Uncles: uncles,
  447. }
  448. }
  449. // reportHistory retrieves the most recent batch of blocks and reports it to the
  450. // stats server.
  451. func (s *Service) reportHistory(out *json.Encoder, list []uint64) error {
  452. // Figure out the indexes that need reporting
  453. indexes := make([]uint64, 0, historyUpdateRange)
  454. if len(list) > 0 {
  455. // Specific indexes requested, send them back in particular
  456. indexes = append(indexes, list...)
  457. } else {
  458. // No indexes requested, send back the top ones
  459. var head *types.Header
  460. if s.eth != nil {
  461. head = s.eth.BlockChain().CurrentHeader()
  462. } else {
  463. head = s.les.BlockChain().CurrentHeader()
  464. }
  465. start := head.Number.Int64() - historyUpdateRange
  466. if start < 0 {
  467. start = 0
  468. }
  469. for i := uint64(start); i <= head.Number.Uint64(); i++ {
  470. indexes = append(indexes, i)
  471. }
  472. }
  473. // Gather the batch of blocks to report
  474. history := make([]*blockStats, len(indexes))
  475. for i, number := range indexes {
  476. if s.eth != nil {
  477. history[i] = s.assembleBlockStats(s.eth.BlockChain().GetBlockByNumber(number))
  478. } else {
  479. history[i] = s.assembleBlockStats(types.NewBlockWithHeader(s.les.BlockChain().GetHeaderByNumber(number)))
  480. }
  481. }
  482. // Assemble the history report and send it to the server
  483. stats := map[string]interface{}{
  484. "id": s.node,
  485. "history": history,
  486. }
  487. report := map[string][]interface{}{
  488. "emit": {"history", stats},
  489. }
  490. return out.Encode(report)
  491. }
// pendStats is the information to report about pending transactions. It is the
// "stats" payload of the "pending" message sent by reportPending.
type pendStats struct {
	Pending int `json:"pending"` // Number of pending transactions in the local pool
}
  496. // reportPending retrieves the current number of pending transactions and reports
  497. // it to the stats server.
  498. func (s *Service) reportPending(out *json.Encoder) error {
  499. // Retrieve the pending count from the local blockchain
  500. var pending int
  501. if s.eth != nil {
  502. pending, _ = s.eth.TxPool().Stats()
  503. } else {
  504. pending = s.les.TxPool().Stats()
  505. }
  506. // Assemble the transaction stats and send it to the server
  507. stats := map[string]interface{}{
  508. "id": s.node,
  509. "stats": &pendStats{
  510. Pending: pending,
  511. },
  512. }
  513. report := map[string][]interface{}{
  514. "emit": {"pending", stats},
  515. }
  516. return out.Encode(report)
  517. }
// nodeStats is the information to report about the local node. It is the
// "stats" payload of the "stats" message sent by reportStats.
type nodeStats struct {
	Active   bool `json:"active"`   // reportStats always sends true
	Syncing  bool `json:"syncing"`  // Sync state derived from the downloader progress
	Mining   bool `json:"mining"`   // Whether the miner is running (full nodes only)
	Hashrate int  `json:"hashrate"` // Miner hash rate (full nodes only)
	Peers    int  `json:"peers"`    // Peer count of the p2p server
	GasPrice int  `json:"gasPrice"` // Miner gas price (full nodes only)
	Uptime   int  `json:"uptime"`   // reportStats always sends 100
}
  528. // reportPending retrieves various stats about the node at the networking and
  529. // mining layer and reports it to the stats server.
  530. func (s *Service) reportStats(out *json.Encoder) error {
  531. // Gather the syncing and mining infos from the local miner instance
  532. var (
  533. mining bool
  534. hashrate int
  535. syncing bool
  536. gasprice int
  537. )
  538. if s.eth != nil {
  539. mining = s.eth.Miner().Mining()
  540. hashrate = int(s.eth.Miner().HashRate())
  541. sync := s.eth.Downloader().Progress()
  542. syncing = s.eth.BlockChain().CurrentHeader().Number.Uint64() >= sync.HighestBlock
  543. gasprice = int(s.eth.Miner().GasPrice().Uint64())
  544. } else {
  545. sync := s.les.Downloader().Progress()
  546. syncing = s.les.BlockChain().CurrentHeader().Number.Uint64() >= sync.HighestBlock
  547. }
  548. stats := map[string]interface{}{
  549. "id": s.node,
  550. "stats": &nodeStats{
  551. Active: true,
  552. Mining: mining,
  553. Hashrate: hashrate,
  554. Peers: s.server.PeerCount(),
  555. GasPrice: gasprice,
  556. Syncing: syncing,
  557. Uptime: 100,
  558. },
  559. }
  560. report := map[string][]interface{}{
  561. "emit": {"stats", stats},
  562. }
  563. return out.Encode(report)
  564. }