peers.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package dashboard
  17. import (
  18. "container/list"
  19. "strings"
  20. "time"
  21. "github.com/ethereum/go-ethereum/metrics"
  22. "github.com/ethereum/go-ethereum/log"
  23. "github.com/ethereum/go-ethereum/p2p"
  24. )
  25. const (
  26. eventBufferLimit = 128 // Maximum number of buffered peer events.
  27. knownPeerLimit = 100 // Maximum number of stored peers, which successfully made the handshake.
  28. attemptLimit = 200 // Maximum number of stored peers, which failed to make the handshake.
  29. // eventLimit is the maximum number of the dashboard's custom peer events,
  30. // that are collected between two metering period and sent to the clients
  31. // as one message.
  32. // TODO (kurkomisi): Limit the number of events.
  33. eventLimit = knownPeerLimit << 2
  34. )
// peerContainer contains information about the node's peers. This data structure
// maintains the metered peer data based on the different behaviours of the peers.
//
// Every peer has an IP address, and the peers that manage to make the handshake
// (known peers) have node IDs too. Several peers can appear with the same IP,
// therefore the peer container data structure is a tree consisting of a map of
// maps, where the first key groups the peers by IP, while the second one groups
// them by the node ID. The known peers can be active if their connection is still
// open, or inactive otherwise. The peers failing before the handshake (unknown
// peers) only have IP addresses, so their connection attempts are stored as part
// of the value of the outer map.
//
// Another criterion is to limit the number of metered peers so that
// they don't fill the memory. The selection order is based on the
// peers' activity: the peers that have been inactive for the longest
// time are thrown out first. For the selection a FIFO list is used,
// which is linked to the bottom of the peer tree in a way that every
// disconnection of a peer pushes it to the end of the list, so the
// longest inactive ones come to the front. When the number of known
// peers exceeds the limit, the first element is removed from the list,
// as well as from the tree.
//
// The active peers have priority over the inactive ones: they are only
// counted (activeCount) and never stored in the eviction list, which
// makes sure that only inactive peers are ever removed.
//
// The peers that don't manage to make the handshake are not inserted into the
// list, only their connection attempts are appended to the array belonging to
// their IP. In order to keep the FIFO principle, a super array contains the
// order of the attempts, and when the overall count exceeds the limit, the
// earliest attempt is removed from the beginning of its array.
//
// This data structure makes it possible to marshal the peer
// history simply by passing it to the JSON marshaler.
type peerContainer struct {
	// Bundles is the outer map using the peer's IP address as key.
	// It is the only field that is part of the JSON output.
	Bundles map[string]*peerBundle `json:"bundles,omitempty"`

	activeCount int // Number of the still connected peers

	// inactivePeers contains the peers with closed connection in chronological
	// order: the peer that disconnected the longest time ago is at the front.
	inactivePeers *list.List

	// attemptOrder is the super array containing the IP addresses, from which
	// the peers attempted to connect then failed before/during the handshake.
	// Its values are appended in chronological order, which means that the
	// oldest attempt is at the beginning of the array. When the first element
	// is removed, the first element of the related bundle's attempt array is
	// removed too, ensuring that always the latest attempts are stored.
	attemptOrder []string

	// geodb is the geoip database used to retrieve the peers' geographical location.
	geodb *geoDB
}
  86. // newPeerContainer returns a new instance of the peer container.
  87. func newPeerContainer(geodb *geoDB) *peerContainer {
  88. return &peerContainer{
  89. Bundles: make(map[string]*peerBundle),
  90. inactivePeers: list.New(),
  91. attemptOrder: make([]string, 0, attemptLimit),
  92. geodb: geodb,
  93. }
  94. }
  95. // bundle inserts a new peer bundle into the map, if the peer belonging
  96. // to the given IP wasn't metered so far. In this case retrieves the location of
  97. // the IP address from the database and creates a corresponding peer event.
  98. // Returns the bundle belonging to the given IP and the events occurring during
  99. // the initialization.
  100. func (pc *peerContainer) bundle(ip string) (*peerBundle, []*peerEvent) {
  101. var events []*peerEvent
  102. if _, ok := pc.Bundles[ip]; !ok {
  103. location := pc.geodb.location(ip)
  104. events = append(events, &peerEvent{
  105. IP: ip,
  106. Location: location,
  107. })
  108. pc.Bundles[ip] = &peerBundle{
  109. Location: location,
  110. KnownPeers: make(map[string]*knownPeer),
  111. }
  112. }
  113. return pc.Bundles[ip], events
  114. }
  115. // extendKnown handles the events of the successfully connected peers.
  116. // Returns the events occurring during the extension.
  117. func (pc *peerContainer) extendKnown(event *peerEvent) []*peerEvent {
  118. bundle, events := pc.bundle(event.IP)
  119. peer, peerEvents := bundle.knownPeer(event.IP, event.ID)
  120. events = append(events, peerEvents...)
  121. // Append the connect and the disconnect events to
  122. // the corresponding arrays keeping the limit.
  123. switch {
  124. case event.Connected != nil:
  125. peer.Connected = append(peer.Connected, event.Connected)
  126. if first := len(peer.Connected) - sampleLimit; first > 0 {
  127. peer.Connected = peer.Connected[first:]
  128. }
  129. peer.Active = true
  130. events = append(events, &peerEvent{
  131. Activity: Active,
  132. IP: peer.ip,
  133. ID: peer.id,
  134. })
  135. pc.activeCount++
  136. if peer.listElement != nil {
  137. _ = pc.inactivePeers.Remove(peer.listElement)
  138. peer.listElement = nil
  139. }
  140. case event.Disconnected != nil:
  141. peer.Disconnected = append(peer.Disconnected, event.Disconnected)
  142. if first := len(peer.Disconnected) - sampleLimit; first > 0 {
  143. peer.Disconnected = peer.Disconnected[first:]
  144. }
  145. peer.Active = false
  146. events = append(events, &peerEvent{
  147. Activity: Inactive,
  148. IP: peer.ip,
  149. ID: peer.id,
  150. })
  151. pc.activeCount--
  152. if peer.listElement != nil {
  153. // If the peer is already in the list, remove and reinsert it.
  154. _ = pc.inactivePeers.Remove(peer.listElement)
  155. }
  156. // Insert the peer into the list.
  157. peer.listElement = pc.inactivePeers.PushBack(peer)
  158. }
  159. for pc.inactivePeers.Len() > 0 && pc.activeCount+pc.inactivePeers.Len() > knownPeerLimit {
  160. // While the count of the known peers is greater than the limit,
  161. // remove the first element from the inactive peer list and from the map.
  162. if removedPeer, ok := pc.inactivePeers.Remove(pc.inactivePeers.Front()).(*knownPeer); ok {
  163. events = append(events, pc.removeKnown(removedPeer.ip, removedPeer.id)...)
  164. } else {
  165. log.Warn("Failed to parse the removed peer")
  166. }
  167. }
  168. if pc.activeCount > knownPeerLimit {
  169. log.Warn("Number of active peers is greater than the limit")
  170. }
  171. return events
  172. }
  173. // handleAttempt handles the events of the peers failing before/during the handshake.
  174. // Returns the events occurring during the extension.
  175. func (pc *peerContainer) handleAttempt(event *peerEvent) []*peerEvent {
  176. bundle, events := pc.bundle(event.IP)
  177. bundle.Attempts = append(bundle.Attempts, &peerAttempt{
  178. Connected: *event.Connected,
  179. Disconnected: *event.Disconnected,
  180. })
  181. pc.attemptOrder = append(pc.attemptOrder, event.IP)
  182. for len(pc.attemptOrder) > attemptLimit {
  183. // While the length of the connection attempt order array is greater
  184. // than the limit, remove the first element from the involved peer's
  185. // array and also from the super array.
  186. events = append(events, pc.removeAttempt(pc.attemptOrder[0])...)
  187. pc.attemptOrder = pc.attemptOrder[1:]
  188. }
  189. return events
  190. }
// peerBundle contains the peers belonging to a given IP address: the known
// peers keyed by their node ID and the failed connection attempts.
type peerBundle struct {
	// Location contains the geographical location based on the bundle's IP address.
	Location *geoLocation `json:"location,omitempty"`

	// KnownPeers is the inner map of the metered peer
	// maintainer data structure using the node ID as key.
	KnownPeers map[string]*knownPeer `json:"knownPeers,omitempty"`

	// Attempts contains the failed connection attempts of the
	// peers belonging to a given IP address in chronological order,
	// i.e. the oldest attempt is the first element.
	Attempts []*peerAttempt `json:"attempts,omitempty"`
}
  202. // removeKnown removes the known peer belonging to the
  203. // given IP address and node ID from the peer tree.
  204. func (pc *peerContainer) removeKnown(ip, id string) (events []*peerEvent) {
  205. // TODO (kurkomisi): Remove peers that don't have traffic samples anymore.
  206. if bundle, ok := pc.Bundles[ip]; ok {
  207. if _, ok := bundle.KnownPeers[id]; ok {
  208. events = append(events, &peerEvent{
  209. Remove: RemoveKnown,
  210. IP: ip,
  211. ID: id,
  212. })
  213. delete(bundle.KnownPeers, id)
  214. } else {
  215. log.Warn("No peer to remove", "ip", ip, "id", id)
  216. }
  217. if len(bundle.KnownPeers) < 1 && len(bundle.Attempts) < 1 {
  218. events = append(events, &peerEvent{
  219. Remove: RemoveBundle,
  220. IP: ip,
  221. })
  222. delete(pc.Bundles, ip)
  223. }
  224. } else {
  225. log.Warn("No bundle to remove", "ip", ip)
  226. }
  227. return events
  228. }
  229. // removeAttempt removes the peer attempt belonging to the
  230. // given IP address and node ID from the peer tree.
  231. func (pc *peerContainer) removeAttempt(ip string) (events []*peerEvent) {
  232. if bundle, ok := pc.Bundles[ip]; ok {
  233. if len(bundle.Attempts) > 0 {
  234. events = append(events, &peerEvent{
  235. Remove: RemoveAttempt,
  236. IP: ip,
  237. })
  238. bundle.Attempts = bundle.Attempts[1:]
  239. }
  240. if len(bundle.Attempts) < 1 && len(bundle.KnownPeers) < 1 {
  241. events = append(events, &peerEvent{
  242. Remove: RemoveBundle,
  243. IP: ip,
  244. })
  245. delete(pc.Bundles, ip)
  246. }
  247. }
  248. return events
  249. }
  250. // knownPeer inserts a new peer into the map, if the peer belonging
  251. // to the given IP address and node ID wasn't metered so far. Returns the peer
  252. // belonging to the given IP and ID as well as the events occurring during the
  253. // initialization.
  254. func (bundle *peerBundle) knownPeer(ip, id string) (*knownPeer, []*peerEvent) {
  255. var events []*peerEvent
  256. if _, ok := bundle.KnownPeers[id]; !ok {
  257. now := time.Now()
  258. ingress := emptyChartEntries(now, sampleLimit)
  259. egress := emptyChartEntries(now, sampleLimit)
  260. events = append(events, &peerEvent{
  261. IP: ip,
  262. ID: id,
  263. Ingress: append([]*ChartEntry{}, ingress...),
  264. Egress: append([]*ChartEntry{}, egress...),
  265. })
  266. bundle.KnownPeers[id] = &knownPeer{
  267. ip: ip,
  268. id: id,
  269. Ingress: ingress,
  270. Egress: egress,
  271. }
  272. }
  273. return bundle.KnownPeers[id], events
  274. }
// knownPeer contains the metered data of a particular peer, i.e. a peer
// that has a node ID besides its IP address.
type knownPeer struct {
	// Connected contains the timestamps of the peer's connection events.
	Connected []*time.Time `json:"connected,omitempty"`

	// Disconnected contains the timestamps of the peer's disconnection events.
	Disconnected []*time.Time `json:"disconnected,omitempty"`

	// Ingress and Egress contain the peer's traffic samples, which are collected
	// periodically from the metrics registry.
	//
	// A peer can connect multiple times, and we want to visualize the time
	// passed between two connections, so after the first connection a sample
	// is appended to the traffic arrays in every metering period, even while
	// the peer is inactive, until the peer is removed.
	Ingress ChartEntries `json:"ingress,omitempty"`
	Egress  ChartEntries `json:"egress,omitempty"`

	Active bool `json:"active"` // Denotes if the peer is still connected.

	listElement *list.Element // Pointer to the peer element in the inactive peer list; nil if the peer isn't in the list.
	ip, id      string        // The IP and the ID by which the peer can be accessed in the tree.

	// prevIngress and prevEgress hold the traffic counts read at the previous
	// metering; they are used to turn the cumulative counts into per-period deltas.
	prevIngress float64
	prevEgress  float64
}
// peerAttempt contains the attributes of a failed peer connection attempt,
// i.e. a connection that was closed before/during the handshake.
type peerAttempt struct {
	// Connected contains the timestamp of the connection attempt's moment.
	Connected time.Time `json:"connected"`

	// Disconnected contains the timestamp of the
	// moment when the connection attempt failed.
	Disconnected time.Time `json:"disconnected"`
}
// RemovedPeerType denotes what kind of element a remove event
// asks to delete from the peer tree.
type RemovedPeerType string

// ActivityType denotes the connection status change of a known peer.
type ActivityType string

const (
	RemoveKnown   RemovedPeerType = "known"   // A known peer (IP and node ID) is removed.
	RemoveAttempt RemovedPeerType = "attempt" // The oldest connection attempt of an IP is removed.
	RemoveBundle  RemovedPeerType = "bundle"  // The whole bundle of an IP is removed.

	Active   ActivityType = "active"   // The peer got connected.
	Inactive ActivityType = "inactive" // The peer got disconnected.
)
// peerEvent contains the attributes of a peer event. Every field is optional
// in the JSON output; an event only carries the attributes relevant to it.
type peerEvent struct {
	IP           string          `json:"ip,omitempty"`           // IP address of the peer.
	ID           string          `json:"id,omitempty"`           // Node ID of the peer; empty for connection attempts.
	Remove       RemovedPeerType `json:"remove,omitempty"`       // Type of the peer that is to be removed.
	Location     *geoLocation    `json:"location,omitempty"`     // Geographical location of the peer.
	Connected    *time.Time      `json:"connected,omitempty"`    // Timestamp of the connection moment.
	Disconnected *time.Time      `json:"disconnected,omitempty"` // Timestamp of the disconnection moment.
	Ingress      ChartEntries    `json:"ingress,omitempty"`      // Ingress samples.
	Egress       ChartEntries    `json:"egress,omitempty"`       // Egress samples.
	Activity     ActivityType    `json:"activity,omitempty"`     // Connection status change.
}
  325. // trafficMap is a container for the periodically collected peer traffic.
  326. type trafficMap map[string]map[string]float64
  327. // insert inserts a new value to the traffic map. Overwrites
  328. // the value at the given ip and id if that already exists.
  329. func (m *trafficMap) insert(ip, id string, val float64) {
  330. if _, ok := (*m)[ip]; !ok {
  331. (*m)[ip] = make(map[string]float64)
  332. }
  333. (*m)[ip][id] = val
  334. }
// collectPeerData gathers data about the peers and sends it to the clients.
//
// The function runs a loop until the peer event subscription fails or a quit
// request arrives. Between two ticks of the refresh ticker it buffers the
// metered peer events coming from the p2p package; on every tick it reads the
// peers' traffic counters, applies the buffered events and the traffic samples
// to the peer tree stored in the dashboard's history, and broadcasts the
// resulting changes to the clients as one diff message.
//
// It returns early, without entering the loop, if the geoip database can't be
// opened. The wait group of the dashboard is notified on return.
func (db *Dashboard) collectPeerData() {
	defer db.wg.Done()

	// Open the geodb database for IP to geographical information conversions.
	var err error
	db.geodb, err = openGeoDB()
	if err != nil {
		log.Warn("Failed to open geodb", "err", err)
		return
	}
	defer db.geodb.close()

	peerCh := make(chan p2p.MeteredPeerEvent, eventBufferLimit) // Peer event channel.
	subPeer := p2p.SubscribeMeteredPeerEvent(peerCh)            // Subscribe to peer events.
	defer subPeer.Unsubscribe()                                 // Unsubscribe at the end.

	ticker := time.NewTicker(db.config.Refresh)
	defer ticker.Stop()

	type registryFunc func(name string, i interface{})
	type collectorFunc func(traffic *trafficMap) registryFunc

	// trafficCollector generates a function that can be passed to
	// the prefixed peer registry in order to collect the metered
	// traffic data from each peer meter.
	trafficCollector := func(prefix string) collectorFunc {
		// This part makes it possible to collect the
		// traffic data into a map from outside.
		return func(traffic *trafficMap) registryFunc {
			// The function which can be passed to the registry.
			return func(name string, i interface{}) {
				if m, ok := i.(metrics.Meter); ok {
					// The name of the meter has the format: <common traffic prefix><IP>/<ID>
					if k := strings.Split(strings.TrimPrefix(name, prefix), "/"); len(k) == 2 {
						traffic.insert(k[0], k[1], float64(m.Count()))
					} else {
						log.Warn("Invalid meter name", "name", name, "prefix", prefix)
					}
				} else {
					log.Warn("Invalid meter type", "name", name)
				}
			}
		}
	}
	collectIngress := trafficCollector(p2p.MetricsInboundTraffic + "/")
	collectEgress := trafficCollector(p2p.MetricsOutboundTraffic + "/")

	// Publish the (still empty) peer tree as part of the history.
	peers := newPeerContainer(db.geodb)
	db.peerLock.Lock()
	db.history.Network = &NetworkMessage{
		Peers: peers,
	}
	db.peerLock.Unlock()

	// newPeerEvents contains peer events, which trigger operations that
	// will be executed on the peer tree after a metering period.
	// Note that eventLimit is only the initial capacity here, the number
	// of the buffered events isn't limited.
	newPeerEvents := make([]*peerEvent, 0, eventLimit)
	ingress, egress := new(trafficMap), new(trafficMap)
	*ingress, *egress = make(trafficMap), make(trafficMap)

	for {
		select {
		case event := <-peerCh:
			// Only buffer the event, the peer tree is updated on the next tick.
			now := time.Now()
			switch event.Type {
			case p2p.PeerConnected:
				// The moment of the connection is calculated
				// from the elapsed time reported by the event.
				connected := now.Add(-event.Elapsed)
				newPeerEvents = append(newPeerEvents, &peerEvent{
					IP:        event.IP.String(),
					ID:        event.ID.String(),
					Connected: &connected,
				})
			case p2p.PeerDisconnected:
				ip, id := event.IP.String(), event.ID.String()
				newPeerEvents = append(newPeerEvents, &peerEvent{
					IP:           ip,
					ID:           id,
					Disconnected: &now,
				})
				// The disconnect event comes with the last metered traffic count,
				// because after the disconnection the peer's meter is removed
				// from the registry. It can happen that between two metering
				// periods the same peer disconnects multiple times, and appending
				// all the samples to the traffic arrays would shift the metering,
				// so only the last metering is stored, overwriting the previous one.
				ingress.insert(ip, id, float64(event.Ingress))
				egress.insert(ip, id, float64(event.Egress))
			case p2p.PeerHandshakeFailed:
				// No node ID is set, which marks the event as a connection attempt.
				connected := now.Add(-event.Elapsed)
				newPeerEvents = append(newPeerEvents, &peerEvent{
					IP:           event.IP.String(),
					Connected:    &connected,
					Disconnected: &now,
				})
			default:
				log.Error("Unknown metered peer event type", "type", event.Type)
			}
		case <-ticker.C:
			// Collect the traffic samples from the registry.
			p2p.PeerIngressRegistry.Each(collectIngress(ingress))
			p2p.PeerEgressRegistry.Each(collectEgress(egress))

			// Protect 'peers', because it is part of the history.
			db.peerLock.Lock()

			// diff collects every change of this metering period for the clients.
			var diff []*peerEvent
			for i := 0; i < len(newPeerEvents); i++ {
				if newPeerEvents[i].IP == "" {
					log.Warn("Peer event without IP", "event", *newPeerEvents[i])
					continue
				}
				diff = append(diff, newPeerEvents[i])
				// There are two main branches of peer events coming from the event
				// feed, one belongs to the known peers, one to the unknown peers.
				// If the event has node ID, it belongs to a known peer, otherwise
				// to an unknown one, which is considered as connection attempt.
				//
				// The extension can produce additional peer events, such
				// as remove, location and initial samples events.
				if newPeerEvents[i].ID == "" {
					diff = append(diff, peers.handleAttempt(newPeerEvents[i])...)
					continue
				}
				diff = append(diff, peers.extendKnown(newPeerEvents[i])...)
			}
			// Update the peer tree using the traffic maps.
			for ip, bundle := range peers.Bundles {
				for id, peer := range bundle.KnownPeers {
					// Value is 0 if the traffic map doesn't have the
					// entry corresponding to the given IP and ID.
					curIngress, curEgress := (*ingress)[ip][id], (*egress)[ip][id]

					// The sample is the difference from the previous count. If the
					// current count is lower than the previous one, the current
					// count itself is used as the sample (presumably the meter was
					// recreated in the meantime - TODO confirm).
					deltaIngress, deltaEgress := curIngress, curEgress
					if deltaIngress >= peer.prevIngress {
						deltaIngress -= peer.prevIngress
					}
					if deltaEgress >= peer.prevEgress {
						deltaEgress -= peer.prevEgress
					}
					peer.prevIngress, peer.prevEgress = curIngress, curEgress
					i := &ChartEntry{
						Value: deltaIngress,
					}
					e := &ChartEntry{
						Value: deltaEgress,
					}
					// Append the new samples keeping the sample limit.
					peer.Ingress = append(peer.Ingress, i)
					peer.Egress = append(peer.Egress, e)
					if first := len(peer.Ingress) - sampleLimit; first > 0 {
						peer.Ingress = peer.Ingress[first:]
					}
					if first := len(peer.Egress) - sampleLimit; first > 0 {
						peer.Egress = peer.Egress[first:]
					}
					// Creating the traffic sample events.
					diff = append(diff, &peerEvent{
						IP:      ip,
						ID:      id,
						Ingress: ChartEntries{i},
						Egress:  ChartEntries{e},
					})
				}
			}
			db.peerLock.Unlock()

			// The broadcast happens outside of the lock.
			if len(diff) > 0 {
				db.sendToAll(&Message{Network: &NetworkMessage{
					Diff: diff,
				}})
			}
			// Clear the traffic maps, and the event array,
			// prepare them for the next metering.
			*ingress, *egress = make(trafficMap), make(trafficMap)
			newPeerEvents = newPeerEvents[:0]
		case err := <-subPeer.Err():
			log.Warn("Peer subscription error", "err", err)
			return
		case errc := <-db.quit:
			// Acknowledge the quit request on the received channel.
			errc <- nil
			return
		}
	}
}