| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552 |
- // Copyright 2019 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package dashboard
- import (
- "container/list"
- "strings"
- "time"
- "github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/p2p"
- )
const (
	// eventBufferLimit is the capacity of the channel buffering the metered peer events.
	eventBufferLimit = 128
	// knownPeerLimit bounds the number of stored peers that completed the handshake.
	knownPeerLimit = 100
	// attemptLimit bounds the number of stored connection attempts of the peers
	// that failed before or during the handshake.
	attemptLimit = 200

	// eventLimit is the capacity reserved for the dashboard's custom peer events
	// collected between two metering periods and sent to the clients as one
	// message. It is not enforced as a hard cap yet.
	// TODO (kurkomisi): Limit the number of events.
	eventLimit = 4 * knownPeerLimit
)
// peerContainer contains information about the node's peers. This data structure
// maintains the metered peer data based on the different behaviours of the peers.
//
// Every peer has an IP address, and the peers that manage to make the handshake
// (known peers) have node IDs too. Multiple peers can share the same IP,
// therefore the peer container data structure is a tree consisting of a map of
// maps, where the first key groups the peers by IP, while the second one groups
// them by the node ID. The known peers can be active if their connection is still
// open, or inactive otherwise. The peers failing before the handshake (unknown
// peers) only have IP addresses, so their connection attempts are stored as part
// of the value of the outer map.
//
// Another criterion is to limit the number of metered peers so that they don't
// fill the memory. The eviction order is based on the peers' activity: the peers
// that have been inactive for the longest time are evicted first. For the
// selection a FIFO list is used, which is linked to the bottom of the peer tree:
// every disconnection of a peer moves it to the end of the list, so the
// longest-inactive ones come to the front. When the number of known peers
// (active plus inactive) exceeds the limit, the first element is removed from
// the list, as well as from the tree.
//
// Only the inactive peers are kept in that list; a reconnecting peer is taken
// out of it and the active peers are merely counted. This makes sure that the
// inactive peers are always removed before the active ones.
//
// The peers that don't manage to make the handshake are not inserted into the
// list; only their connection attempts are appended to the array belonging to
// their IP. In order to keep the FIFO principle, a super array contains the order
// of the attempts, and when the overall count exceeds the limit, the earliest
// attempt is removed from the beginning of its array.
//
// This data structure makes it possible to marshal the peer
// history simply by passing it to the JSON marshaler.
type peerContainer struct {
	// Bundles is the outer map using the peer's IP address as key.
	Bundles map[string]*peerBundle `json:"bundles,omitempty"`

	// activeCount is the number of the still connected known peers
	// (incremented on connect and decremented on disconnect events).
	activeCount int

	// inactivePeers contains the known peers with closed connection, ordered
	// by the time of their last disconnection (oldest at the front).
	inactivePeers *list.List

	// attemptOrder is the super array containing the IP addresses, from which
	// the peers attempted to connect then failed before/during the handshake.
	// Its values are appended in chronological order, which means that the
	// oldest attempt is at the beginning of the array. When the first element
	// is removed, the first element of the related bundle's attempt array is
	// removed too, ensuring that always the latest attempts are stored.
	attemptOrder []string

	// geodb is the geoip database used to retrieve the peers' geographical location.
	geodb *geoDB
}
- // newPeerContainer returns a new instance of the peer container.
- func newPeerContainer(geodb *geoDB) *peerContainer {
- return &peerContainer{
- Bundles: make(map[string]*peerBundle),
- inactivePeers: list.New(),
- attemptOrder: make([]string, 0, attemptLimit),
- geodb: geodb,
- }
- }
- // bundle inserts a new peer bundle into the map, if the peer belonging
- // to the given IP wasn't metered so far. In this case retrieves the location of
- // the IP address from the database and creates a corresponding peer event.
- // Returns the bundle belonging to the given IP and the events occurring during
- // the initialization.
- func (pc *peerContainer) bundle(ip string) (*peerBundle, []*peerEvent) {
- var events []*peerEvent
- if _, ok := pc.Bundles[ip]; !ok {
- location := pc.geodb.location(ip)
- events = append(events, &peerEvent{
- IP: ip,
- Location: location,
- })
- pc.Bundles[ip] = &peerBundle{
- Location: location,
- KnownPeers: make(map[string]*knownPeer),
- }
- }
- return pc.Bundles[ip], events
- }
- // extendKnown handles the events of the successfully connected peers.
- // Returns the events occurring during the extension.
- func (pc *peerContainer) extendKnown(event *peerEvent) []*peerEvent {
- bundle, events := pc.bundle(event.IP)
- peer, peerEvents := bundle.knownPeer(event.IP, event.ID)
- events = append(events, peerEvents...)
- // Append the connect and the disconnect events to
- // the corresponding arrays keeping the limit.
- switch {
- case event.Connected != nil:
- peer.Connected = append(peer.Connected, event.Connected)
- if first := len(peer.Connected) - sampleLimit; first > 0 {
- peer.Connected = peer.Connected[first:]
- }
- peer.Active = true
- events = append(events, &peerEvent{
- Activity: Active,
- IP: peer.ip,
- ID: peer.id,
- })
- pc.activeCount++
- if peer.listElement != nil {
- _ = pc.inactivePeers.Remove(peer.listElement)
- peer.listElement = nil
- }
- case event.Disconnected != nil:
- peer.Disconnected = append(peer.Disconnected, event.Disconnected)
- if first := len(peer.Disconnected) - sampleLimit; first > 0 {
- peer.Disconnected = peer.Disconnected[first:]
- }
- peer.Active = false
- events = append(events, &peerEvent{
- Activity: Inactive,
- IP: peer.ip,
- ID: peer.id,
- })
- pc.activeCount--
- if peer.listElement != nil {
- // If the peer is already in the list, remove and reinsert it.
- _ = pc.inactivePeers.Remove(peer.listElement)
- }
- // Insert the peer into the list.
- peer.listElement = pc.inactivePeers.PushBack(peer)
- }
- for pc.inactivePeers.Len() > 0 && pc.activeCount+pc.inactivePeers.Len() > knownPeerLimit {
- // While the count of the known peers is greater than the limit,
- // remove the first element from the inactive peer list and from the map.
- if removedPeer, ok := pc.inactivePeers.Remove(pc.inactivePeers.Front()).(*knownPeer); ok {
- events = append(events, pc.removeKnown(removedPeer.ip, removedPeer.id)...)
- } else {
- log.Warn("Failed to parse the removed peer")
- }
- }
- if pc.activeCount > knownPeerLimit {
- log.Warn("Number of active peers is greater than the limit")
- }
- return events
- }
- // handleAttempt handles the events of the peers failing before/during the handshake.
- // Returns the events occurring during the extension.
- func (pc *peerContainer) handleAttempt(event *peerEvent) []*peerEvent {
- bundle, events := pc.bundle(event.IP)
- bundle.Attempts = append(bundle.Attempts, &peerAttempt{
- Connected: *event.Connected,
- Disconnected: *event.Disconnected,
- })
- pc.attemptOrder = append(pc.attemptOrder, event.IP)
- for len(pc.attemptOrder) > attemptLimit {
- // While the length of the connection attempt order array is greater
- // than the limit, remove the first element from the involved peer's
- // array and also from the super array.
- events = append(events, pc.removeAttempt(pc.attemptOrder[0])...)
- pc.attemptOrder = pc.attemptOrder[1:]
- }
- return events
- }
// peerBundle contains the peers belonging to a given IP address.
//
// A bundle is dropped from the peer tree as soon as it holds neither known
// peers nor connection attempts.
type peerBundle struct {
	// Location contains the geographical location based on the bundle's IP address.
	Location *geoLocation `json:"location,omitempty"`

	// KnownPeers is the inner map of the metered peer
	// maintainer data structure using the node ID as key.
	KnownPeers map[string]*knownPeer `json:"knownPeers,omitempty"`

	// Attempts contains the failed connection attempts of the peers belonging
	// to a given IP address in chronological order (oldest first).
	Attempts []*peerAttempt `json:"attempts,omitempty"`
}
- // removeKnown removes the known peer belonging to the
- // given IP address and node ID from the peer tree.
- func (pc *peerContainer) removeKnown(ip, id string) (events []*peerEvent) {
- // TODO (kurkomisi): Remove peers that don't have traffic samples anymore.
- if bundle, ok := pc.Bundles[ip]; ok {
- if _, ok := bundle.KnownPeers[id]; ok {
- events = append(events, &peerEvent{
- Remove: RemoveKnown,
- IP: ip,
- ID: id,
- })
- delete(bundle.KnownPeers, id)
- } else {
- log.Warn("No peer to remove", "ip", ip, "id", id)
- }
- if len(bundle.KnownPeers) < 1 && len(bundle.Attempts) < 1 {
- events = append(events, &peerEvent{
- Remove: RemoveBundle,
- IP: ip,
- })
- delete(pc.Bundles, ip)
- }
- } else {
- log.Warn("No bundle to remove", "ip", ip)
- }
- return events
- }
- // removeAttempt removes the peer attempt belonging to the
- // given IP address and node ID from the peer tree.
- func (pc *peerContainer) removeAttempt(ip string) (events []*peerEvent) {
- if bundle, ok := pc.Bundles[ip]; ok {
- if len(bundle.Attempts) > 0 {
- events = append(events, &peerEvent{
- Remove: RemoveAttempt,
- IP: ip,
- })
- bundle.Attempts = bundle.Attempts[1:]
- }
- if len(bundle.Attempts) < 1 && len(bundle.KnownPeers) < 1 {
- events = append(events, &peerEvent{
- Remove: RemoveBundle,
- IP: ip,
- })
- delete(pc.Bundles, ip)
- }
- }
- return events
- }
- // knownPeer inserts a new peer into the map, if the peer belonging
- // to the given IP address and node ID wasn't metered so far. Returns the peer
- // belonging to the given IP and ID as well as the events occurring during the
- // initialization.
- func (bundle *peerBundle) knownPeer(ip, id string) (*knownPeer, []*peerEvent) {
- var events []*peerEvent
- if _, ok := bundle.KnownPeers[id]; !ok {
- now := time.Now()
- ingress := emptyChartEntries(now, sampleLimit)
- egress := emptyChartEntries(now, sampleLimit)
- events = append(events, &peerEvent{
- IP: ip,
- ID: id,
- Ingress: append([]*ChartEntry{}, ingress...),
- Egress: append([]*ChartEntry{}, egress...),
- })
- bundle.KnownPeers[id] = &knownPeer{
- ip: ip,
- id: id,
- Ingress: ingress,
- Egress: egress,
- }
- }
- return bundle.KnownPeers[id], events
- }
// knownPeer contains the metered data of a particular peer that completed the
// handshake, addressed in the peer tree by its IP address and node ID.
type knownPeer struct {
	// Connected contains the timestamps of the peer's connection events.
	// Only the latest sampleLimit timestamps are kept.
	Connected []*time.Time `json:"connected,omitempty"`

	// Disconnected contains the timestamps of the peer's disconnection events.
	// Only the latest sampleLimit timestamps are kept.
	Disconnected []*time.Time `json:"disconnected,omitempty"`

	// Ingress and Egress contain the peer's traffic samples, which are collected
	// periodically from the metrics registry. Only the latest sampleLimit
	// samples are kept.
	//
	// A peer can connect multiple times, and we want to visualize the time
	// passed between two connections, so after the first connection a 0 value
	// is appended to the traffic arrays even if the peer is inactive until the
	// peer is removed.
	Ingress ChartEntries `json:"ingress,omitempty"`
	Egress  ChartEntries `json:"egress,omitempty"`

	Active bool `json:"active"` // Denotes if the peer is still connected.

	// listElement points to the peer's element in the container's inactive
	// peer list; it is reset to nil when the peer reconnects.
	listElement *list.Element
	ip, id      string // The IP and the ID by which the peer can be accessed in the tree.

	// prevIngress and prevEgress hold the meter counts seen at the previous
	// metering, used to turn cumulative counts into per-period samples.
	prevIngress float64
	prevEgress  float64
}
// peerAttempt contains a failed peer connection attempt's attributes. Such a
// peer has no node ID, so its attempts are stored per IP address in the
// corresponding peer bundle.
type peerAttempt struct {
	// Connected contains the timestamp of the connection attempt's moment.
	Connected time.Time `json:"connected"`

	// Disconnected contains the timestamp of the
	// moment when the connection attempt failed.
	Disconnected time.Time `json:"disconnected"`
}
// RemovedPeerType identifies which part of the peer tree a removal event refers to.
type RemovedPeerType string

// Values of the Remove field of a peer event.
const (
	RemoveKnown   RemovedPeerType = "known"   // A known peer was removed from its bundle.
	RemoveAttempt RemovedPeerType = "attempt" // The oldest connection attempt of a bundle was removed.
	RemoveBundle  RemovedPeerType = "bundle"  // A whole bundle (IP address) was removed.
)

// ActivityType describes a connection status change of a known peer.
type ActivityType string

// Values of the Activity field of a peer event.
const (
	Active   ActivityType = "active"   // The peer connected.
	Inactive ActivityType = "inactive" // The peer disconnected.
)
// peerEvent contains the attributes of a peer event.
//
// The same type describes every change of the peer tree that is sent to the
// clients (new bundle with location, new peer with initial samples, activity
// change, removal, traffic samples), as well as the raw connect/disconnect
// events buffered from the p2p feed. Only the fields relevant to the given
// change are set; the others are omitted from the JSON encoding.
type peerEvent struct {
	IP           string          `json:"ip,omitempty"`           // IP address of the peer.
	ID           string          `json:"id,omitempty"`           // Node ID of the peer.
	Remove       RemovedPeerType `json:"remove,omitempty"`       // Type of the peer that is to be removed.
	Location     *geoLocation    `json:"location,omitempty"`     // Geographical location of the peer.
	Connected    *time.Time      `json:"connected,omitempty"`    // Timestamp of the connection moment.
	Disconnected *time.Time      `json:"disconnected,omitempty"` // Timestamp of the disconnection moment.
	Ingress      ChartEntries    `json:"ingress,omitempty"`      // Ingress samples.
	Egress       ChartEntries    `json:"egress,omitempty"`       // Egress samples.
	Activity     ActivityType    `json:"activity,omitempty"`     // Connection status change.
}
// trafficMap is a container for the periodically collected peer traffic.
// The outer key is the peer's IP address, the inner key its node ID, and the
// value the metered traffic count of that peer.
type trafficMap map[string]map[string]float64

// insert stores val for the peer identified by ip and id, overwriting the
// value already present at that position. Missing maps (including a nil
// trafficMap or a nil inner map) are allocated on demand, so inserting never
// panics on a nil map write.
func (m *trafficMap) insert(ip, id string, val float64) {
	if *m == nil {
		*m = make(trafficMap)
	}
	peers := (*m)[ip]
	if peers == nil {
		peers = make(map[string]float64)
		(*m)[ip] = peers
	}
	peers[id] = val
}
// collectPeerData gathers data about the peers and sends it to the clients.
//
// It opens the geoip database, subscribes to the metered peer event feed of
// the p2p package and publishes a fresh peer container as the network part of
// db.history. Then it loops:
//   - every metered peer event is converted into a peerEvent and buffered
//     (for disconnections the final traffic counts are buffered too);
//   - on every tick of db.config.Refresh the current meter counts are read
//     from the peer registries, the buffered events are applied to the peer
//     tree under db.peerLock, one traffic sample per known peer is appended,
//     and the resulting diff is sent to all clients;
//   - a subscription error or a quit request on db.quit ends the loop.
//
// NOTE(review): the early returns (geodb failure, subscription error) stop
// serving db.quit; confirm that the shutdown path tolerates this goroutine no
// longer answering quit requests.
func (db *Dashboard) collectPeerData() {
	defer db.wg.Done()

	// Open the geodb database for IP to geographical information conversions.
	var err error
	db.geodb, err = openGeoDB()
	if err != nil {
		log.Warn("Failed to open geodb", "err", err)
		return
	}
	defer db.geodb.close()

	peerCh := make(chan p2p.MeteredPeerEvent, eventBufferLimit) // Peer event channel.
	subPeer := p2p.SubscribeMeteredPeerEvent(peerCh)            // Subscribe to peer events.
	defer subPeer.Unsubscribe()                                 // Unsubscribe at the end.

	ticker := time.NewTicker(db.config.Refresh)
	defer ticker.Stop()

	type registryFunc func(name string, i interface{})
	type collectorFunc func(traffic *trafficMap) registryFunc

	// trafficCollector generates a function that can be passed to
	// the prefixed peer registry in order to collect the metered
	// traffic data from each peer meter.
	trafficCollector := func(prefix string) collectorFunc {
		// This part makes it possible to collect the
		// traffic data into a map from outside.
		return func(traffic *trafficMap) registryFunc {
			// The function which can be passed to the registry.
			return func(name string, i interface{}) {
				if m, ok := i.(metrics.Meter); ok {
					// The name of the meter has the format: <common traffic prefix><IP>/<ID>
					if k := strings.Split(strings.TrimPrefix(name, prefix), "/"); len(k) == 2 {
						traffic.insert(k[0], k[1], float64(m.Count()))
					} else {
						log.Warn("Invalid meter name", "name", name, "prefix", prefix)
					}
				} else {
					log.Warn("Invalid meter type", "name", name)
				}
			}
		}
	}
	collectIngress := trafficCollector(p2p.MetricsInboundTraffic + "/")
	collectEgress := trafficCollector(p2p.MetricsOutboundTraffic + "/")

	// The container is shared with the history, so it is published under the lock.
	peers := newPeerContainer(db.geodb)
	db.peerLock.Lock()
	db.history.Network = &NetworkMessage{
		Peers: peers,
	}
	db.peerLock.Unlock()

	// newPeerEvents contains peer events, which trigger operations that
	// will be executed on the peer tree after a metering period.
	newPeerEvents := make([]*peerEvent, 0, eventLimit)
	ingress, egress := new(trafficMap), new(trafficMap)
	*ingress, *egress = make(trafficMap), make(trafficMap)

	for {
		select {
		case event := <-peerCh:
			now := time.Now()
			switch event.Type {
			case p2p.PeerConnected:
				// The connection moment is reconstructed from the elapsed time reported by the event.
				connected := now.Add(-event.Elapsed)
				newPeerEvents = append(newPeerEvents, &peerEvent{
					IP:        event.IP.String(),
					ID:        event.ID.String(),
					Connected: &connected,
				})
			case p2p.PeerDisconnected:
				ip, id := event.IP.String(), event.ID.String()
				newPeerEvents = append(newPeerEvents, &peerEvent{
					IP:           ip,
					ID:           id,
					Disconnected: &now,
				})
				// The disconnect event comes with the last metered traffic count,
				// because after the disconnection the peer's meter is removed
				// from the registry. It can happen, that between two metering
				// period the same peer disconnects multiple times, and appending
				// all the samples to the traffic arrays would shift the metering,
				// so only the last metering is stored, overwriting the previous one.
				ingress.insert(ip, id, float64(event.Ingress))
				egress.insert(ip, id, float64(event.Egress))
			case p2p.PeerHandshakeFailed:
				// No node ID is recorded, so the event is handled as a connection attempt.
				connected := now.Add(-event.Elapsed)
				newPeerEvents = append(newPeerEvents, &peerEvent{
					IP:           event.IP.String(),
					Connected:    &connected,
					Disconnected: &now,
				})
			default:
				log.Error("Unknown metered peer event type", "type", event.Type)
			}
		case <-ticker.C:
			// Collect the traffic samples from the registry.
			p2p.PeerIngressRegistry.Each(collectIngress(ingress))
			p2p.PeerEgressRegistry.Each(collectEgress(egress))

			// Protect 'peers', because it is part of the history.
			db.peerLock.Lock()

			// Apply the buffered events in arrival order. Every event is part of
			// the diff, followed by the events its application produced.
			var diff []*peerEvent
			for i := 0; i < len(newPeerEvents); i++ {
				if newPeerEvents[i].IP == "" {
					log.Warn("Peer event without IP", "event", *newPeerEvents[i])
					continue
				}
				diff = append(diff, newPeerEvents[i])
				// There are two main branches of peer events coming from the event
				// feed, one belongs to the known peers, one to the unknown peers.
				// If the event has node ID, it belongs to a known peer, otherwise
				// to an unknown one, which is considered as connection attempt.
				//
				// The extension can produce additional peer events, such
				// as remove, location and initial samples events.
				if newPeerEvents[i].ID == "" {
					diff = append(diff, peers.handleAttempt(newPeerEvents[i])...)
					continue
				}
				diff = append(diff, peers.extendKnown(newPeerEvents[i])...)
			}

			// Update the peer tree using the traffic maps.
			for ip, bundle := range peers.Bundles {
				for id, peer := range bundle.KnownPeers {
					// Value is 0 if the traffic map doesn't have the
					// entry corresponding to the given IP and ID.
					curIngress, curEgress := (*ingress)[ip][id], (*egress)[ip][id]
					// The counts are cumulative, so the sample is the difference from
					// the previous count. A count below the previous one (no meter
					// anymore, or presumably a fresh meter after a reconnect) is used as is.
					deltaIngress, deltaEgress := curIngress, curEgress
					if deltaIngress >= peer.prevIngress {
						deltaIngress -= peer.prevIngress
					}
					if deltaEgress >= peer.prevEgress {
						deltaEgress -= peer.prevEgress
					}
					peer.prevIngress, peer.prevEgress = curIngress, curEgress
					i := &ChartEntry{
						Value: deltaIngress,
					}
					e := &ChartEntry{
						Value: deltaEgress,
					}
					peer.Ingress = append(peer.Ingress, i)
					peer.Egress = append(peer.Egress, e)
					// Keep only the latest sampleLimit samples.
					if first := len(peer.Ingress) - sampleLimit; first > 0 {
						peer.Ingress = peer.Ingress[first:]
					}
					if first := len(peer.Egress) - sampleLimit; first > 0 {
						peer.Egress = peer.Egress[first:]
					}
					// Creating the traffic sample events.
					diff = append(diff, &peerEvent{
						IP:      ip,
						ID:      id,
						Ingress: ChartEntries{i},
						Egress:  ChartEntries{e},
					})
				}
			}
			db.peerLock.Unlock()

			if len(diff) > 0 {
				db.sendToAll(&Message{Network: &NetworkMessage{
					Diff: diff,
				}})
			}
			// Clear the traffic maps, and the event array,
			// prepare them for the next metering.
			*ingress, *egress = make(trafficMap), make(trafficMap)
			newPeerEvents = newPeerEvents[:0]
		case err := <-subPeer.Err():
			log.Warn("Peer subscription error", "err", err)
			return
		case errc := <-db.quit:
			errc <- nil
			return
		}
	}
}
|