| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package network
- import (
- "fmt"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/common/hexutil"
- "github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/discover"
- "github.com/ethereum/go-ethereum/swarm/log"
- "github.com/ethereum/go-ethereum/swarm/state"
- )
/*
Hive is the logistics manager of the swarm.

When the hive is started, a forever loop is launched that
asks the Overlay topology driver (e.g., a generic kademlia node table)
to suggest peers in order to bootstrap connectivity.
*/
- // Overlay is the interface for kademlia (or other topology drivers)
- type Overlay interface {
- // suggest peers to connect to
- SuggestPeer() (OverlayAddr, int, bool)
- // register and deregister peer connections
- On(OverlayConn) (depth uint8, changed bool)
- Off(OverlayConn)
- // register peer addresses
- Register([]OverlayAddr) error
- // iterate over connected peers
- EachConn([]byte, int, func(OverlayConn, int, bool) bool)
- // iterate over known peers (address records)
- EachAddr([]byte, int, func(OverlayAddr, int, bool) bool)
- // pretty print the connectivity
- String() string
- // base Overlay address of the node itself
- BaseAddr() []byte
- // connectivity health check used for testing
- Healthy(*PeerPot) *Health
- }
- // HiveParams holds the config options to hive
- type HiveParams struct {
- Discovery bool // if want discovery of not
- PeersBroadcastSetSize uint8 // how many peers to use when relaying
- MaxPeersPerRequest uint8 // max size for peer address batches
- KeepAliveInterval time.Duration
- }
- // NewHiveParams returns hive config with only the
- func NewHiveParams() *HiveParams {
- return &HiveParams{
- Discovery: true,
- PeersBroadcastSetSize: 3,
- MaxPeersPerRequest: 5,
- KeepAliveInterval: 500 * time.Millisecond,
- }
- }
- // Hive manages network connections of the swarm node
- type Hive struct {
- *HiveParams // settings
- Overlay // the overlay connectiviy driver
- Store state.Store // storage interface to save peers across sessions
- addPeer func(*discover.Node) // server callback to connect to a peer
- // bookkeeping
- lock sync.Mutex
- ticker *time.Ticker
- }
- // NewHive constructs a new hive
- // HiveParams: config parameters
- // Overlay: connectivity driver using a network topology
- // StateStore: to save peers across sessions
- func NewHive(params *HiveParams, overlay Overlay, store state.Store) *Hive {
- return &Hive{
- HiveParams: params,
- Overlay: overlay,
- Store: store,
- }
- }
- // Start stars the hive, receives p2p.Server only at startup
- // server is used to connect to a peer based on its NodeID or enode URL
- // these are called on the p2p.Server which runs on the node
- func (h *Hive) Start(server *p2p.Server) error {
- log.Info("Starting hive", "baseaddr", fmt.Sprintf("%x", h.BaseAddr()[:4]))
- // if state store is specified, load peers to prepopulate the overlay address book
- if h.Store != nil {
- log.Info("Detected an existing store. trying to load peers")
- if err := h.loadPeers(); err != nil {
- log.Error(fmt.Sprintf("%08x hive encoutered an error trying to load peers", h.BaseAddr()[:4]))
- return err
- }
- }
- // assigns the p2p.Server#AddPeer function to connect to peers
- h.addPeer = server.AddPeer
- // ticker to keep the hive alive
- h.ticker = time.NewTicker(h.KeepAliveInterval)
- // this loop is doing bootstrapping and maintains a healthy table
- go h.connect()
- return nil
- }
- // Stop terminates the updateloop and saves the peers
- func (h *Hive) Stop() error {
- log.Info(fmt.Sprintf("%08x hive stopping, saving peers", h.BaseAddr()[:4]))
- h.ticker.Stop()
- if h.Store != nil {
- if err := h.savePeers(); err != nil {
- return fmt.Errorf("could not save peers to persistence store: %v", err)
- }
- if err := h.Store.Close(); err != nil {
- return fmt.Errorf("could not close file handle to persistence store: %v", err)
- }
- }
- log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4]))
- h.EachConn(nil, 255, func(p OverlayConn, _ int, _ bool) bool {
- log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4]))
- p.Drop(nil)
- return true
- })
- log.Info(fmt.Sprintf("%08x all peers dropped", h.BaseAddr()[:4]))
- return nil
- }
- // connect is a forever loop
- // at each iteration, ask the overlay driver to suggest the most preferred peer to connect to
- // as well as advertises saturation depth if needed
- func (h *Hive) connect() {
- for range h.ticker.C {
- addr, depth, changed := h.SuggestPeer()
- if h.Discovery && changed {
- NotifyDepth(uint8(depth), h)
- }
- if addr == nil {
- continue
- }
- log.Trace(fmt.Sprintf("%08x hive connect() suggested %08x", h.BaseAddr()[:4], addr.Address()[:4]))
- under, err := discover.ParseNode(string(addr.(Addr).Under()))
- if err != nil {
- log.Warn(fmt.Sprintf("%08x unable to connect to bee %08x: invalid node URL: %v", h.BaseAddr()[:4], addr.Address()[:4], err))
- continue
- }
- log.Trace(fmt.Sprintf("%08x attempt to connect to bee %08x", h.BaseAddr()[:4], addr.Address()[:4]))
- h.addPeer(under)
- }
- }
- // Run protocol run function
- func (h *Hive) Run(p *BzzPeer) error {
- dp := newDiscovery(p, h)
- depth, changed := h.On(dp)
- // if we want discovery, advertise change of depth
- if h.Discovery {
- if changed {
- // if depth changed, send to all peers
- NotifyDepth(depth, h)
- } else {
- // otherwise just send depth to new peer
- dp.NotifyDepth(depth)
- }
- }
- NotifyPeer(p.Off(), h)
- defer h.Off(dp)
- return dp.Run(dp.HandleMsg)
- }
- // NodeInfo function is used by the p2p.server RPC interface to display
- // protocol specific node information
- func (h *Hive) NodeInfo() interface{} {
- return h.String()
- }
- // PeerInfo function is used by the p2p.server RPC interface to display
- // protocol specific information any connected peer referred to by their NodeID
- func (h *Hive) PeerInfo(id discover.NodeID) interface{} {
- addr := NewAddrFromNodeID(id)
- return struct {
- OAddr hexutil.Bytes
- UAddr hexutil.Bytes
- }{
- OAddr: addr.OAddr,
- UAddr: addr.UAddr,
- }
- }
- // ToAddr returns the serialisable version of u
- func ToAddr(pa OverlayPeer) *BzzAddr {
- if addr, ok := pa.(*BzzAddr); ok {
- return addr
- }
- if p, ok := pa.(*discPeer); ok {
- return p.BzzAddr
- }
- return pa.(*BzzPeer).BzzAddr
- }
- // loadPeers, savePeer implement persistence callback/
- func (h *Hive) loadPeers() error {
- var as []*BzzAddr
- err := h.Store.Get("peers", &as)
- if err != nil {
- if err == state.ErrNotFound {
- log.Info(fmt.Sprintf("hive %08x: no persisted peers found", h.BaseAddr()[:4]))
- return nil
- }
- return err
- }
- log.Info(fmt.Sprintf("hive %08x: peers loaded", h.BaseAddr()[:4]))
- return h.Register(toOverlayAddrs(as...))
- }
- // toOverlayAddrs transforms an array of BzzAddr to OverlayAddr
- func toOverlayAddrs(as ...*BzzAddr) (oas []OverlayAddr) {
- for _, a := range as {
- oas = append(oas, OverlayAddr(a))
- }
- return
- }
- // savePeers, savePeer implement persistence callback/
- func (h *Hive) savePeers() error {
- var peers []*BzzAddr
- h.Overlay.EachAddr(nil, 256, func(pa OverlayAddr, i int, _ bool) bool {
- if pa == nil {
- log.Warn(fmt.Sprintf("empty addr: %v", i))
- return true
- }
- apa := ToAddr(pa)
- log.Trace("saving peer", "peer", apa)
- peers = append(peers, apa)
- return true
- })
- if err := h.Store.Put("peers", peers); err != nil {
- return fmt.Errorf("could not save peers: %v", err)
- }
- return nil
- }
|