peers.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package dashboard
import (
	"container/list"
	"net"
	"reflect"
	"strings"
	"time"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/p2p"
)
  26. const (
  27. knownPeerLimit = 100 // Maximum number of stored peers, which successfully made the handshake.
  28. // eventLimit is the maximum number of the dashboard's custom peer events,
  29. // that are collected between two metering period and sent to the clients
  30. // as one message.
  31. // TODO (kurkomisi): Limit the number of events.
  32. eventLimit = knownPeerLimit << 2
  33. )
  34. // peerContainer contains information about the node's peers. This data structure
  35. // maintains the metered peer data based on the different behaviours of the peers.
  36. //
  37. // Every peer has an IP address, and the peers that manage to make the handshake
  38. // (known peers) have node IDs too. There can appear more peers with the same IP,
  39. // therefore the peer container data structure is a tree consisting of a map of
  40. // maps, where the first key groups the peers by IP, while the second one groups
  41. // them by the node ID. The known peers can be active if their connection is still
  42. // open, or inactive otherwise. The peers failing before the handshake (unknown
  43. // peers) only have IP addresses, so their connection attempts are stored as part
  44. // of the value of the outer map.
  45. //
  46. // Another criteria is to limit the number of metered peers so that
  47. // they don't fill the memory. The selection order is based on the
  48. // peers activity: the peers that are inactive for the longest time
  49. // are thrown first. For the selection a fifo list is used which is
  50. // linked to the bottom of the peer tree in a way that every activity
  51. // of the peer pushes the peer to the end of the list, so the inactive
  52. // ones come to the front. When a peer has some activity, it is removed
  53. // from and reinserted into the list. When the length of the list reaches
  54. // the limit, the first element is removed from the list, as well as from
  55. // the tree.
  56. //
  57. // The active peers have priority over the inactive ones, therefore
  58. // they have their own list. The separation makes it sure that the
  59. // inactive peers are always removed before the active ones.
  60. //
  61. // The peers that don't manage to make handshake are not inserted into the list,
  62. // only their connection attempts are appended to the array belonging to their IP.
  63. // In order to keep the fifo principle, a super array contains the order of the
  64. // attempts, and when the overall count reaches the limit, the earliest attempt is
  65. // removed from the beginning of its array.
  66. //
  67. // This data structure makes it possible to marshal the peer
  68. // history simply by passing it to the JSON marshaler.
  69. type peerContainer struct {
  70. // Bundles is the outer map using the peer's IP address as key.
  71. Bundles map[string]*peerBundle `json:"bundles,omitempty"`
  72. activeCount int // Number of the still connected peers
  73. // inactivePeers contains the peers with closed connection in chronological order.
  74. inactivePeers *list.List
  75. // geodb is the geoip database used to retrieve the peers' geographical location.
  76. geodb *geoDB
  77. }
  78. // newPeerContainer returns a new instance of the peer container.
  79. func newPeerContainer(geodb *geoDB) *peerContainer {
  80. return &peerContainer{
  81. Bundles: make(map[string]*peerBundle),
  82. inactivePeers: list.New(),
  83. geodb: geodb,
  84. }
  85. }
  86. // bundle inserts a new peer bundle into the map, if the peer belonging
  87. // to the given IP wasn't metered so far. In this case retrieves the location of
  88. // the IP address from the database and creates a corresponding peer event.
  89. // Returns the bundle belonging to the given IP and the events occurring during
  90. // the initialization.
  91. func (pc *peerContainer) bundle(addr string) (*peerBundle, []*peerEvent) {
  92. var events []*peerEvent
  93. if _, ok := pc.Bundles[addr]; !ok {
  94. i := strings.IndexByte(addr, ':')
  95. if i < 0 {
  96. i = len(addr)
  97. }
  98. location := pc.geodb.location(addr[:i])
  99. events = append(events, &peerEvent{
  100. Addr: addr,
  101. Location: location,
  102. })
  103. pc.Bundles[addr] = &peerBundle{
  104. Location: location,
  105. KnownPeers: make(map[string]*knownPeer),
  106. }
  107. }
  108. return pc.Bundles[addr], events
  109. }
  110. // extendKnown handles the events of the successfully connected peers.
  111. // Returns the events occurring during the extension.
  112. func (pc *peerContainer) extendKnown(event *peerEvent) []*peerEvent {
  113. bundle, events := pc.bundle(event.Addr)
  114. peer, peerEvents := bundle.knownPeer(event.Addr, event.Enode)
  115. events = append(events, peerEvents...)
  116. // Append the connect and the disconnect events to
  117. // the corresponding arrays keeping the limit.
  118. switch {
  119. case event.Connected != nil: // Handshake succeeded
  120. peer.Connected = append(peer.Connected, event.Connected)
  121. if first := len(peer.Connected) - sampleLimit; first > 0 {
  122. peer.Connected = peer.Connected[first:]
  123. }
  124. if event.peer == nil {
  125. log.Warn("Peer handshake succeeded event without peer instance", "addr", event.Addr, "enode", event.Enode)
  126. }
  127. peer.peer = event.peer
  128. info := event.peer.Info()
  129. peer.Name = info.Name
  130. peer.Protocols = info.Protocols
  131. peer.Active = true
  132. e := &peerEvent{
  133. Activity: Active,
  134. Name: info.Name,
  135. Addr: peer.addr,
  136. Enode: peer.enode,
  137. Protocols: peer.Protocols,
  138. }
  139. events = append(events, e)
  140. pc.activeCount++
  141. if peer.listElement != nil {
  142. _ = pc.inactivePeers.Remove(peer.listElement)
  143. peer.listElement = nil
  144. }
  145. case event.Disconnected != nil: // Peer disconnected
  146. peer.Disconnected = append(peer.Disconnected, event.Disconnected)
  147. if first := len(peer.Disconnected) - sampleLimit; first > 0 {
  148. peer.Disconnected = peer.Disconnected[first:]
  149. }
  150. peer.Active = false
  151. events = append(events, &peerEvent{
  152. Activity: Inactive,
  153. Addr: peer.addr,
  154. Enode: peer.enode,
  155. })
  156. pc.activeCount--
  157. if peer.listElement != nil {
  158. // If the peer is already in the list, remove and reinsert it.
  159. _ = pc.inactivePeers.Remove(peer.listElement)
  160. }
  161. // Insert the peer into the list.
  162. peer.listElement = pc.inactivePeers.PushBack(peer)
  163. default:
  164. log.Warn("Unexpected known peer event", "event", *event)
  165. }
  166. for pc.inactivePeers.Len() > 0 && pc.activeCount+pc.inactivePeers.Len() > knownPeerLimit {
  167. // While the count of the known peers is greater than the limit,
  168. // remove the first element from the inactive peer list and from the map.
  169. if removedPeer, ok := pc.inactivePeers.Remove(pc.inactivePeers.Front()).(*knownPeer); ok {
  170. events = append(events, pc.removeKnown(removedPeer.addr, removedPeer.enode)...)
  171. } else {
  172. log.Warn("Failed to parse the removed peer")
  173. }
  174. }
  175. if pc.activeCount > knownPeerLimit {
  176. log.Warn("Number of active peers is greater than the limit")
  177. }
  178. return events
  179. }
  180. // peerBundle contains the peers belonging to a given IP address.
  181. type peerBundle struct {
  182. // Location contains the geographical location based on the bundle's IP address.
  183. Location *geoLocation `json:"location,omitempty"`
  184. // KnownPeers is the inner map of the metered peer
  185. // maintainer data structure using the node ID as key.
  186. KnownPeers map[string]*knownPeer `json:"knownPeers,omitempty"`
  187. // Attempts contains the count of the failed connection
  188. // attempts of the peers belonging to a given IP address.
  189. Attempts uint `json:"attempts,omitempty"`
  190. }
  191. // removeKnown removes the known peer belonging to the
  192. // given IP address and node ID from the peer tree.
  193. func (pc *peerContainer) removeKnown(addr, enode string) (events []*peerEvent) {
  194. // TODO (kurkomisi): Remove peers that don't have traffic samples anymore.
  195. if bundle, ok := pc.Bundles[addr]; ok {
  196. if _, ok := bundle.KnownPeers[enode]; ok {
  197. events = append(events, &peerEvent{
  198. Remove: RemoveKnown,
  199. Addr: addr,
  200. Enode: enode,
  201. })
  202. delete(bundle.KnownPeers, enode)
  203. } else {
  204. log.Warn("No peer to remove", "addr", addr, "enode", enode)
  205. }
  206. if len(bundle.KnownPeers) < 1 && bundle.Attempts < 1 {
  207. events = append(events, &peerEvent{
  208. Remove: RemoveBundle,
  209. Addr: addr,
  210. })
  211. delete(pc.Bundles, addr)
  212. }
  213. } else {
  214. log.Warn("No bundle to remove", "addr", addr)
  215. }
  216. return events
  217. }
  218. // knownPeer inserts a new peer into the map, if the peer belonging
  219. // to the given IP address and node ID wasn't metered so far. Returns the peer
  220. // belonging to the given IP and ID as well as the events occurring during the
  221. // initialization.
  222. func (bundle *peerBundle) knownPeer(addr, enode string) (*knownPeer, []*peerEvent) {
  223. var events []*peerEvent
  224. if _, ok := bundle.KnownPeers[enode]; !ok {
  225. ingress := emptyChartEntries(sampleLimit)
  226. egress := emptyChartEntries(sampleLimit)
  227. events = append(events, &peerEvent{
  228. Addr: addr,
  229. Enode: enode,
  230. Ingress: append([]*ChartEntry{}, ingress...),
  231. Egress: append([]*ChartEntry{}, egress...),
  232. })
  233. bundle.KnownPeers[enode] = &knownPeer{
  234. addr: addr,
  235. enode: enode,
  236. Ingress: ingress,
  237. Egress: egress,
  238. }
  239. }
  240. return bundle.KnownPeers[enode], events
  241. }
  242. // knownPeer contains the metered data of a particular peer.
  243. type knownPeer struct {
  244. // Connected contains the timestamps of the peer's connection events.
  245. Connected []*time.Time `json:"connected,omitempty"`
  246. // Disconnected contains the timestamps of the peer's disconnection events.
  247. Disconnected []*time.Time `json:"disconnected,omitempty"`
  248. // Ingress and Egress contain the peer's traffic samples, which are collected
  249. // periodically from the metrics registry.
  250. //
  251. // A peer can connect multiple times, and we want to visualize the time
  252. // passed between two connections, so after the first connection a 0 value
  253. // is appended to the traffic arrays even if the peer is inactive until the
  254. // peer is removed.
  255. Ingress ChartEntries `json:"ingress,omitempty"`
  256. Egress ChartEntries `json:"egress,omitempty"`
  257. Name string `json:"name,omitempty"` // Name of the node, including client type, version, OS, custom data
  258. Enode string `json:"enode,omitempty"` // Node URL
  259. Protocols map[string]interface{} `json:"protocols,omitempty"` // Sub-protocol specific metadata fields
  260. Active bool `json:"active"` // Denotes if the peer is still connected.
  261. listElement *list.Element // Pointer to the peer element in the list.
  262. addr, enode string // The IP and the ID by which the peer can be accessed in the tree.
  263. prevIngress float64
  264. prevEgress float64
  265. peer *p2p.Peer // Connected remote node instance
  266. }
  267. type RemovedPeerType string
  268. type ActivityType string
  269. const (
  270. RemoveKnown RemovedPeerType = "known"
  271. RemoveBundle RemovedPeerType = "bundle"
  272. Active ActivityType = "active"
  273. Inactive ActivityType = "inactive"
  274. )
  275. // peerEvent contains the attributes of a peer event.
  276. type peerEvent struct {
  277. Name string `json:"name,omitempty"` // Name of the node, including client type, version, OS, custom data
  278. Addr string `json:"addr,omitempty"` // TCP address of the peer.
  279. Enode string `json:"enode,omitempty"` // Node URL
  280. Protocols map[string]interface{} `json:"protocols,omitempty"` // Sub-protocol specific metadata fields
  281. Remove RemovedPeerType `json:"remove,omitempty"` // Type of the peer that is to be removed.
  282. Location *geoLocation `json:"location,omitempty"` // Geographical location of the peer.
  283. Connected *time.Time `json:"connected,omitempty"` // Timestamp of the connection moment.
  284. Disconnected *time.Time `json:"disconnected,omitempty"` // Timestamp of the disonnection moment.
  285. Ingress ChartEntries `json:"ingress,omitempty"` // Ingress samples.
  286. Egress ChartEntries `json:"egress,omitempty"` // Egress samples.
  287. Activity ActivityType `json:"activity,omitempty"` // Connection status change.
  288. peer *p2p.Peer // Connected remote node instance.
  289. }
  290. // trafficMap is a container for the periodically collected peer traffic.
  291. type trafficMap map[string]map[string]float64
  292. // insert inserts a new value to the traffic map. Overwrites
  293. // the value at the given ip and id if that already exists.
  294. func (m *trafficMap) insert(ip, id string, val float64) {
  295. if _, ok := (*m)[ip]; !ok {
  296. (*m)[ip] = make(map[string]float64)
  297. }
  298. (*m)[ip][id] = val
  299. }
  300. // collectPeerData gathers data about the peers and sends it to the clients.
  301. func (db *Dashboard) collectPeerData() {
  302. defer db.wg.Done()
  303. // Open the geodb database for IP to geographical information conversions.
  304. var err error
  305. db.geodb, err = openGeoDB()
  306. if err != nil {
  307. log.Warn("Failed to open geodb", "err", err)
  308. errc := <-db.quit
  309. errc <- nil
  310. return
  311. }
  312. defer db.geodb.close()
  313. ticker := time.NewTicker(db.config.Refresh)
  314. defer ticker.Stop()
  315. type registryFunc func(name string, i interface{})
  316. type collectorFunc func(traffic *trafficMap) registryFunc
  317. // trafficCollector generates a function that can be passed to
  318. // the prefixed peer registry in order to collect the metered
  319. // traffic data from each peer meter.
  320. trafficCollector := func(prefix string) collectorFunc {
  321. // This part makes is possible to collect the
  322. // traffic data into a map from outside.
  323. return func(traffic *trafficMap) registryFunc {
  324. // The function which can be passed to the registry.
  325. return func(name string, i interface{}) {
  326. if m, ok := i.(metrics.Meter); ok {
  327. enode := strings.TrimPrefix(name, prefix)
  328. if addr := strings.Split(enode, "@"); len(addr) == 2 {
  329. traffic.insert(addr[1], enode, float64(m.Count()))
  330. } else {
  331. log.Warn("Invalid enode", "enode", enode)
  332. }
  333. } else {
  334. log.Warn("Invalid meter type", "name", name)
  335. }
  336. }
  337. }
  338. }
  339. collectIngress := trafficCollector(p2p.MetricsInboundTraffic + "/")
  340. collectEgress := trafficCollector(p2p.MetricsOutboundTraffic + "/")
  341. peers := newPeerContainer(db.geodb)
  342. db.peerLock.Lock()
  343. db.history.Network = &NetworkMessage{
  344. Peers: peers,
  345. }
  346. db.peerLock.Unlock()
  347. // newPeerEvents contains peer events, which trigger operations that
  348. // will be executed on the peer tree after a metering period.
  349. newPeerEvents := make([]*peerEvent, 0, eventLimit)
  350. ingress, egress := new(trafficMap), new(trafficMap)
  351. *ingress, *egress = make(trafficMap), make(trafficMap)
  352. defer db.subPeer.Unsubscribe()
  353. for {
  354. select {
  355. case event := <-db.peerCh:
  356. now := time.Now()
  357. switch event.Type {
  358. case p2p.PeerHandshakeFailed:
  359. connected := now.Add(-event.Elapsed)
  360. newPeerEvents = append(newPeerEvents, &peerEvent{
  361. Addr: event.Addr,
  362. Connected: &connected,
  363. Disconnected: &now,
  364. })
  365. case p2p.PeerHandshakeSucceeded:
  366. connected := now.Add(-event.Elapsed)
  367. newPeerEvents = append(newPeerEvents, &peerEvent{
  368. Addr: event.Addr,
  369. Enode: event.Peer.Node().String(),
  370. peer: event.Peer,
  371. Connected: &connected,
  372. })
  373. case p2p.PeerDisconnected:
  374. addr, enode := event.Addr, event.Peer.Node().String()
  375. newPeerEvents = append(newPeerEvents, &peerEvent{
  376. Addr: addr,
  377. Enode: enode,
  378. Disconnected: &now,
  379. })
  380. // The disconnect event comes with the last metered traffic count,
  381. // because after the disconnection the peer's meter is removed
  382. // from the registry. It can happen, that between two metering
  383. // period the same peer disconnects multiple times, and appending
  384. // all the samples to the traffic arrays would shift the metering,
  385. // so only the last metering is stored, overwriting the previous one.
  386. ingress.insert(addr, enode, float64(event.Ingress))
  387. egress.insert(addr, enode, float64(event.Egress))
  388. default:
  389. log.Error("Unknown metered peer event type", "type", event.Type)
  390. }
  391. case <-ticker.C:
  392. // Collect the traffic samples from the registry.
  393. p2p.PeerIngressRegistry.Each(collectIngress(ingress))
  394. p2p.PeerEgressRegistry.Each(collectEgress(egress))
  395. // Protect 'peers', because it is part of the history.
  396. db.peerLock.Lock()
  397. var diff []*peerEvent
  398. for i := 0; i < len(newPeerEvents); i++ {
  399. if newPeerEvents[i].Addr == "" {
  400. log.Warn("Peer event without IP", "event", *newPeerEvents[i])
  401. continue
  402. }
  403. diff = append(diff, newPeerEvents[i])
  404. // There are two main branches of peer events coming from the event
  405. // feed, one belongs to the known peers, one to the unknown peers.
  406. // If the event has node ID, it belongs to a known peer, otherwise
  407. // to an unknown one, which is considered as connection attempt.
  408. //
  409. // The extension can produce additional peer events, such
  410. // as remove, location and initial samples events.
  411. if newPeerEvents[i].Enode == "" {
  412. bundle, events := peers.bundle(newPeerEvents[i].Addr)
  413. bundle.Attempts++
  414. diff = append(diff, events...)
  415. continue
  416. }
  417. diff = append(diff, peers.extendKnown(newPeerEvents[i])...)
  418. }
  419. // Update the peer tree using the traffic maps.
  420. for addr, bundle := range peers.Bundles {
  421. for enode, peer := range bundle.KnownPeers {
  422. // Value is 0 if the traffic map doesn't have the
  423. // entry corresponding to the given IP and ID.
  424. curIngress, curEgress := (*ingress)[addr][enode], (*egress)[addr][enode]
  425. deltaIngress, deltaEgress := curIngress, curEgress
  426. if deltaIngress >= peer.prevIngress {
  427. deltaIngress -= peer.prevIngress
  428. }
  429. if deltaEgress >= peer.prevEgress {
  430. deltaEgress -= peer.prevEgress
  431. }
  432. peer.prevIngress, peer.prevEgress = curIngress, curEgress
  433. i := &ChartEntry{
  434. Value: deltaIngress,
  435. }
  436. e := &ChartEntry{
  437. Value: deltaEgress,
  438. }
  439. peer.Ingress = append(peer.Ingress, i)
  440. peer.Egress = append(peer.Egress, e)
  441. if first := len(peer.Ingress) - sampleLimit; first > 0 {
  442. peer.Ingress = peer.Ingress[first:]
  443. }
  444. if first := len(peer.Egress) - sampleLimit; first > 0 {
  445. peer.Egress = peer.Egress[first:]
  446. }
  447. // Creating the traffic sample events.
  448. diff = append(diff, &peerEvent{
  449. Addr: addr,
  450. Enode: enode,
  451. Ingress: ChartEntries{i},
  452. Egress: ChartEntries{e},
  453. })
  454. if peer.peer != nil {
  455. info := peer.peer.Info()
  456. if !reflect.DeepEqual(peer.Protocols, info.Protocols) {
  457. peer.Protocols = info.Protocols
  458. diff = append(diff, &peerEvent{
  459. Addr: addr,
  460. Enode: enode,
  461. Protocols: peer.Protocols,
  462. })
  463. }
  464. }
  465. }
  466. }
  467. db.peerLock.Unlock()
  468. if len(diff) > 0 {
  469. db.sendToAll(&Message{Network: &NetworkMessage{
  470. Diff: diff,
  471. }})
  472. }
  473. // Clear the traffic maps, and the event array,
  474. // prepare them for the next metering.
  475. *ingress, *egress = make(trafficMap), make(trafficMap)
  476. newPeerEvents = newPeerEvents[:0]
  477. case err := <-db.subPeer.Err():
  478. log.Warn("Peer subscription error", "err", err)
  479. errc := <-db.quit
  480. errc <- nil
  481. return
  482. case errc := <-db.quit:
  483. errc <- nil
  484. return
  485. }
  486. }
  487. }