hive.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package network
  17. import (
  18. "fmt"
  19. "sync"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common/hexutil"
  22. "github.com/ethereum/go-ethereum/p2p"
  23. "github.com/ethereum/go-ethereum/p2p/discover"
  24. "github.com/ethereum/go-ethereum/swarm/log"
  25. "github.com/ethereum/go-ethereum/swarm/state"
  26. )
  27. /*
  28. Hive is the logistic manager of the swarm
  29. When the hive is started, a forever loop is launched that
  30. asks the Overlay Topology driver (e.g., generic kademlia nodetable)
  31. to suggest peers to bootstrap connectivity
  32. */
  33. // Overlay is the interface for kademlia (or other topology drivers)
  34. type Overlay interface {
  35. // suggest peers to connect to
  36. SuggestPeer() (OverlayAddr, int, bool)
  37. // register and deregister peer connections
  38. On(OverlayConn) (depth uint8, changed bool)
  39. Off(OverlayConn)
  40. // register peer addresses
  41. Register([]OverlayAddr) error
  42. // iterate over connected peers
  43. EachConn([]byte, int, func(OverlayConn, int, bool) bool)
  44. // iterate over known peers (address records)
  45. EachAddr([]byte, int, func(OverlayAddr, int, bool) bool)
  46. // pretty print the connectivity
  47. String() string
  48. // base Overlay address of the node itself
  49. BaseAddr() []byte
  50. // connectivity health check used for testing
  51. Healthy(*PeerPot) *Health
  52. }
  53. // HiveParams holds the config options to hive
  54. type HiveParams struct {
  55. Discovery bool // if want discovery of not
  56. PeersBroadcastSetSize uint8 // how many peers to use when relaying
  57. MaxPeersPerRequest uint8 // max size for peer address batches
  58. KeepAliveInterval time.Duration
  59. }
  60. // NewHiveParams returns hive config with only the
  61. func NewHiveParams() *HiveParams {
  62. return &HiveParams{
  63. Discovery: true,
  64. PeersBroadcastSetSize: 3,
  65. MaxPeersPerRequest: 5,
  66. KeepAliveInterval: 500 * time.Millisecond,
  67. }
  68. }
  69. // Hive manages network connections of the swarm node
  70. type Hive struct {
  71. *HiveParams // settings
  72. Overlay // the overlay connectiviy driver
  73. Store state.Store // storage interface to save peers across sessions
  74. addPeer func(*discover.Node) // server callback to connect to a peer
  75. // bookkeeping
  76. lock sync.Mutex
  77. ticker *time.Ticker
  78. }
  79. // NewHive constructs a new hive
  80. // HiveParams: config parameters
  81. // Overlay: connectivity driver using a network topology
  82. // StateStore: to save peers across sessions
  83. func NewHive(params *HiveParams, overlay Overlay, store state.Store) *Hive {
  84. return &Hive{
  85. HiveParams: params,
  86. Overlay: overlay,
  87. Store: store,
  88. }
  89. }
  90. // Start stars the hive, receives p2p.Server only at startup
  91. // server is used to connect to a peer based on its NodeID or enode URL
  92. // these are called on the p2p.Server which runs on the node
  93. func (h *Hive) Start(server *p2p.Server) error {
  94. log.Info("Starting hive", "baseaddr", fmt.Sprintf("%x", h.BaseAddr()[:4]))
  95. // if state store is specified, load peers to prepopulate the overlay address book
  96. if h.Store != nil {
  97. log.Info("Detected an existing store. trying to load peers")
  98. if err := h.loadPeers(); err != nil {
  99. log.Error(fmt.Sprintf("%08x hive encoutered an error trying to load peers", h.BaseAddr()[:4]))
  100. return err
  101. }
  102. }
  103. // assigns the p2p.Server#AddPeer function to connect to peers
  104. h.addPeer = server.AddPeer
  105. // ticker to keep the hive alive
  106. h.ticker = time.NewTicker(h.KeepAliveInterval)
  107. // this loop is doing bootstrapping and maintains a healthy table
  108. go h.connect()
  109. return nil
  110. }
  111. // Stop terminates the updateloop and saves the peers
  112. func (h *Hive) Stop() error {
  113. log.Info(fmt.Sprintf("%08x hive stopping, saving peers", h.BaseAddr()[:4]))
  114. h.ticker.Stop()
  115. if h.Store != nil {
  116. if err := h.savePeers(); err != nil {
  117. return fmt.Errorf("could not save peers to persistence store: %v", err)
  118. }
  119. if err := h.Store.Close(); err != nil {
  120. return fmt.Errorf("could not close file handle to persistence store: %v", err)
  121. }
  122. }
  123. log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4]))
  124. h.EachConn(nil, 255, func(p OverlayConn, _ int, _ bool) bool {
  125. log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4]))
  126. p.Drop(nil)
  127. return true
  128. })
  129. log.Info(fmt.Sprintf("%08x all peers dropped", h.BaseAddr()[:4]))
  130. return nil
  131. }
  132. // connect is a forever loop
  133. // at each iteration, ask the overlay driver to suggest the most preferred peer to connect to
  134. // as well as advertises saturation depth if needed
  135. func (h *Hive) connect() {
  136. for range h.ticker.C {
  137. addr, depth, changed := h.SuggestPeer()
  138. if h.Discovery && changed {
  139. NotifyDepth(uint8(depth), h)
  140. }
  141. if addr == nil {
  142. continue
  143. }
  144. log.Trace(fmt.Sprintf("%08x hive connect() suggested %08x", h.BaseAddr()[:4], addr.Address()[:4]))
  145. under, err := discover.ParseNode(string(addr.(Addr).Under()))
  146. if err != nil {
  147. log.Warn(fmt.Sprintf("%08x unable to connect to bee %08x: invalid node URL: %v", h.BaseAddr()[:4], addr.Address()[:4], err))
  148. continue
  149. }
  150. log.Trace(fmt.Sprintf("%08x attempt to connect to bee %08x", h.BaseAddr()[:4], addr.Address()[:4]))
  151. h.addPeer(under)
  152. }
  153. }
  154. // Run protocol run function
  155. func (h *Hive) Run(p *BzzPeer) error {
  156. dp := newDiscovery(p, h)
  157. depth, changed := h.On(dp)
  158. // if we want discovery, advertise change of depth
  159. if h.Discovery {
  160. if changed {
  161. // if depth changed, send to all peers
  162. NotifyDepth(depth, h)
  163. } else {
  164. // otherwise just send depth to new peer
  165. dp.NotifyDepth(depth)
  166. }
  167. }
  168. NotifyPeer(p.Off(), h)
  169. defer h.Off(dp)
  170. return dp.Run(dp.HandleMsg)
  171. }
  172. // NodeInfo function is used by the p2p.server RPC interface to display
  173. // protocol specific node information
  174. func (h *Hive) NodeInfo() interface{} {
  175. return h.String()
  176. }
  177. // PeerInfo function is used by the p2p.server RPC interface to display
  178. // protocol specific information any connected peer referred to by their NodeID
  179. func (h *Hive) PeerInfo(id discover.NodeID) interface{} {
  180. addr := NewAddrFromNodeID(id)
  181. return struct {
  182. OAddr hexutil.Bytes
  183. UAddr hexutil.Bytes
  184. }{
  185. OAddr: addr.OAddr,
  186. UAddr: addr.UAddr,
  187. }
  188. }
  189. // ToAddr returns the serialisable version of u
  190. func ToAddr(pa OverlayPeer) *BzzAddr {
  191. if addr, ok := pa.(*BzzAddr); ok {
  192. return addr
  193. }
  194. if p, ok := pa.(*discPeer); ok {
  195. return p.BzzAddr
  196. }
  197. return pa.(*BzzPeer).BzzAddr
  198. }
  199. // loadPeers, savePeer implement persistence callback/
  200. func (h *Hive) loadPeers() error {
  201. var as []*BzzAddr
  202. err := h.Store.Get("peers", &as)
  203. if err != nil {
  204. if err == state.ErrNotFound {
  205. log.Info(fmt.Sprintf("hive %08x: no persisted peers found", h.BaseAddr()[:4]))
  206. return nil
  207. }
  208. return err
  209. }
  210. log.Info(fmt.Sprintf("hive %08x: peers loaded", h.BaseAddr()[:4]))
  211. return h.Register(toOverlayAddrs(as...))
  212. }
  213. // toOverlayAddrs transforms an array of BzzAddr to OverlayAddr
  214. func toOverlayAddrs(as ...*BzzAddr) (oas []OverlayAddr) {
  215. for _, a := range as {
  216. oas = append(oas, OverlayAddr(a))
  217. }
  218. return
  219. }
  220. // savePeers, savePeer implement persistence callback/
  221. func (h *Hive) savePeers() error {
  222. var peers []*BzzAddr
  223. h.Overlay.EachAddr(nil, 256, func(pa OverlayAddr, i int, _ bool) bool {
  224. if pa == nil {
  225. log.Warn(fmt.Sprintf("empty addr: %v", i))
  226. return true
  227. }
  228. apa := ToAddr(pa)
  229. log.Trace("saving peer", "peer", apa)
  230. peers = append(peers, apa)
  231. return true
  232. })
  233. if err := h.Store.Put("peers", peers); err != nil {
  234. return fmt.Errorf("could not save peers: %v", err)
  235. }
  236. return nil
  237. }