hive.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package network
  17. import (
  18. "fmt"
  19. "math/rand"
  20. "path/filepath"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/logger"
  24. "github.com/ethereum/go-ethereum/logger/glog"
  25. "github.com/ethereum/go-ethereum/p2p/discover"
  26. "github.com/ethereum/go-ethereum/p2p/netutil"
  27. "github.com/ethereum/go-ethereum/swarm/network/kademlia"
  28. "github.com/ethereum/go-ethereum/swarm/storage"
  29. )
  30. // Hive is the logistic manager of the swarm
  31. // it uses a generic kademlia nodetable to find best peer list
  32. // for any target
  33. // this is used by the netstore to search for content in the swarm
  34. // the bzz protocol peersMsgData exchange is relayed to Kademlia
  35. // for db storage and filtering
  36. // connections and disconnections are reported and relayed
  37. // to keep the nodetable uptodate
  38. type Hive struct {
  39. listenAddr func() string
  40. callInterval uint64
  41. id discover.NodeID
  42. addr kademlia.Address
  43. kad *kademlia.Kademlia
  44. path string
  45. quit chan bool
  46. toggle chan bool
  47. more chan bool
  48. // for testing only
  49. swapEnabled bool
  50. syncEnabled bool
  51. blockRead bool
  52. blockWrite bool
  53. }
  const (
  	// callInterval is the default HiveParams.CallInterval. It is later
  	// converted with time.Duration, i.e. it is a nanosecond count (3 seconds).
  	callInterval = 3 * 1000000000

  	// currently disabled overrides:
  	// bucketSize = 3
  	// maxProx = 8
  	// proxBinSize = 4
  )
  60. type HiveParams struct {
  61. CallInterval uint64
  62. KadDbPath string
  63. *kademlia.KadParams
  64. }
  65. func NewHiveParams(path string) *HiveParams {
  66. kad := kademlia.NewKadParams()
  67. // kad.BucketSize = bucketSize
  68. // kad.MaxProx = maxProx
  69. // kad.ProxBinSize = proxBinSize
  70. return &HiveParams{
  71. CallInterval: callInterval,
  72. KadDbPath: filepath.Join(path, "bzz-peers.json"),
  73. KadParams: kad,
  74. }
  75. }
  76. func NewHive(addr common.Hash, params *HiveParams, swapEnabled, syncEnabled bool) *Hive {
  77. kad := kademlia.New(kademlia.Address(addr), params.KadParams)
  78. return &Hive{
  79. callInterval: params.CallInterval,
  80. kad: kad,
  81. addr: kad.Addr(),
  82. path: params.KadDbPath,
  83. swapEnabled: swapEnabled,
  84. syncEnabled: syncEnabled,
  85. }
  86. }
  87. func (self *Hive) SyncEnabled(on bool) {
  88. self.syncEnabled = on
  89. }
  90. func (self *Hive) SwapEnabled(on bool) {
  91. self.swapEnabled = on
  92. }
  93. func (self *Hive) BlockNetworkRead(on bool) {
  94. self.blockRead = on
  95. }
  96. func (self *Hive) BlockNetworkWrite(on bool) {
  97. self.blockWrite = on
  98. }
  99. // public accessor to the hive base address
  100. func (self *Hive) Addr() kademlia.Address {
  101. return self.addr
  102. }
  103. // Start receives network info only at startup
  104. // listedAddr is a function to retrieve listening address to advertise to peers
  105. // connectPeer is a function to connect to a peer based on its NodeID or enode URL
  106. // there are called on the p2p.Server which runs on the node
  107. func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPeer func(string) error) (err error) {
  108. self.toggle = make(chan bool)
  109. self.more = make(chan bool)
  110. self.quit = make(chan bool)
  111. self.id = id
  112. self.listenAddr = listenAddr
  113. err = self.kad.Load(self.path, nil)
  114. if err != nil {
  115. glog.V(logger.Warn).Infof("Warning: error reading kaddb '%s' (skipping): %v", self.path, err)
  116. err = nil
  117. }
  118. // this loop is doing bootstrapping and maintains a healthy table
  119. go self.keepAlive()
  120. go func() {
  121. // whenever toggled ask kademlia about most preferred peer
  122. for alive := range self.more {
  123. if !alive {
  124. // receiving false closes the loop while allowing parallel routines
  125. // to attempt to write to more (remove Peer when shutting down)
  126. return
  127. }
  128. node, need, proxLimit := self.kad.Suggest()
  129. if node != nil && len(node.Url) > 0 {
  130. glog.V(logger.Detail).Infof("call known bee %v", node.Url)
  131. // enode or any lower level connection address is unnecessary in future
  132. // discovery table is used to look it up.
  133. connectPeer(node.Url)
  134. }
  135. if need {
  136. // a random peer is taken from the table
  137. peers := self.kad.FindClosest(kademlia.RandomAddressAt(self.addr, rand.Intn(self.kad.MaxProx)), 1)
  138. if len(peers) > 0 {
  139. // a random address at prox bin 0 is sent for lookup
  140. randAddr := kademlia.RandomAddressAt(self.addr, proxLimit)
  141. req := &retrieveRequestMsgData{
  142. Key: storage.Key(randAddr[:]),
  143. }
  144. glog.V(logger.Detail).Infof("call any bee near %v (PO%03d) - messenger bee: %v", randAddr, proxLimit, peers[0])
  145. peers[0].(*peer).retrieve(req)
  146. } else {
  147. glog.V(logger.Warn).Infof("no peer")
  148. }
  149. glog.V(logger.Detail).Infof("buzz kept alive")
  150. } else {
  151. glog.V(logger.Info).Infof("no need for more bees")
  152. }
  153. select {
  154. case self.toggle <- need:
  155. case <-self.quit:
  156. return
  157. }
  158. glog.V(logger.Debug).Infof("queen's address: %v, population: %d (%d)", self.addr, self.kad.Count(), self.kad.DBCount())
  159. }
  160. }()
  161. return
  162. }
  163. // keepAlive is a forever loop
  164. // in its awake state it periodically triggers connection attempts
  165. // by writing to self.more until Kademlia Table is saturated
  166. // wake state is toggled by writing to self.toggle
  167. // it restarts if the table becomes non-full again due to disconnections
  168. func (self *Hive) keepAlive() {
  169. alarm := time.NewTicker(time.Duration(self.callInterval)).C
  170. for {
  171. select {
  172. case <-alarm:
  173. if self.kad.DBCount() > 0 {
  174. select {
  175. case self.more <- true:
  176. glog.V(logger.Debug).Infof("buzz wakeup")
  177. default:
  178. }
  179. }
  180. case need := <-self.toggle:
  181. if alarm == nil && need {
  182. alarm = time.NewTicker(time.Duration(self.callInterval)).C
  183. }
  184. if alarm != nil && !need {
  185. alarm = nil
  186. }
  187. case <-self.quit:
  188. return
  189. }
  190. }
  191. }
  192. func (self *Hive) Stop() error {
  193. // closing toggle channel quits the updateloop
  194. close(self.quit)
  195. return self.kad.Save(self.path, saveSync)
  196. }
  197. // called at the end of a successful protocol handshake
  198. func (self *Hive) addPeer(p *peer) error {
  199. defer func() {
  200. select {
  201. case self.more <- true:
  202. default:
  203. }
  204. }()
  205. glog.V(logger.Detail).Infof("hi new bee %v", p)
  206. err := self.kad.On(p, loadSync)
  207. if err != nil {
  208. return err
  209. }
  210. // self lookup (can be encoded as nil/zero key since peers addr known) + no id ()
  211. // the most common way of saying hi in bzz is initiation of gossip
  212. // let me know about anyone new from my hood , here is the storageradius
  213. // to send the 6 byte self lookup
  214. // we do not record as request or forward it, just reply with peers
  215. p.retrieve(&retrieveRequestMsgData{})
  216. glog.V(logger.Detail).Infof("'whatsup wheresdaparty' sent to %v", p)
  217. return nil
  218. }
  219. // called after peer disconnected
  220. func (self *Hive) removePeer(p *peer) {
  221. glog.V(logger.Debug).Infof("bee %v removed", p)
  222. self.kad.Off(p, saveSync)
  223. select {
  224. case self.more <- true:
  225. default:
  226. }
  227. if self.kad.Count() == 0 {
  228. glog.V(logger.Debug).Infof("empty, all bees gone")
  229. }
  230. }
  231. // Retrieve a list of live peers that are closer to target than us
  232. func (self *Hive) getPeers(target storage.Key, max int) (peers []*peer) {
  233. var addr kademlia.Address
  234. copy(addr[:], target[:])
  235. for _, node := range self.kad.FindClosest(addr, max) {
  236. peers = append(peers, node.(*peer))
  237. }
  238. return
  239. }
  240. // disconnects all the peers
  241. func (self *Hive) DropAll() {
  242. glog.V(logger.Info).Infof("dropping all bees")
  243. for _, node := range self.kad.FindClosest(kademlia.Address{}, 0) {
  244. node.Drop()
  245. }
  246. }
  247. // contructor for kademlia.NodeRecord based on peer address alone
  248. // TODO: should go away and only addr passed to kademlia
  249. func newNodeRecord(addr *peerAddr) *kademlia.NodeRecord {
  250. now := time.Now()
  251. return &kademlia.NodeRecord{
  252. Addr: addr.Addr,
  253. Url: addr.String(),
  254. Seen: now,
  255. After: now,
  256. }
  257. }
  258. // called by the protocol when receiving peerset (for target address)
  259. // peersMsgData is converted to a slice of NodeRecords for Kademlia
  260. // this is to store all thats needed
  261. func (self *Hive) HandlePeersMsg(req *peersMsgData, from *peer) {
  262. var nrs []*kademlia.NodeRecord
  263. for _, p := range req.Peers {
  264. if err := netutil.CheckRelayIP(from.remoteAddr.IP, p.IP); err != nil {
  265. glog.V(logger.Detail).Infof("invalid peer IP %v from %v: %v", from.remoteAddr.IP, p.IP, err)
  266. continue
  267. }
  268. nrs = append(nrs, newNodeRecord(p))
  269. }
  270. self.kad.Add(nrs)
  271. }
  272. // peer wraps the protocol instance to represent a connected peer
  273. // it implements kademlia.Node interface
  274. type peer struct {
  275. *bzz // protocol instance running on peer connection
  276. }
  277. // protocol instance implements kademlia.Node interface (embedded peer)
  278. func (self *peer) Addr() kademlia.Address {
  279. return self.remoteAddr.Addr
  280. }
  281. func (self *peer) Url() string {
  282. return self.remoteAddr.String()
  283. }
  284. // TODO take into account traffic
  285. func (self *peer) LastActive() time.Time {
  286. return self.lastActive
  287. }
  288. // reads the serialised form of sync state persisted as the 'Meta' attribute
  289. // and sets the decoded syncState on the online node
  290. func loadSync(record *kademlia.NodeRecord, node kademlia.Node) error {
  291. p, ok := node.(*peer)
  292. if !ok {
  293. return fmt.Errorf("invalid type")
  294. }
  295. if record.Meta == nil {
  296. glog.V(logger.Debug).Infof("no sync state for node record %v setting default", record)
  297. p.syncState = &syncState{DbSyncState: &storage.DbSyncState{}}
  298. return nil
  299. }
  300. state, err := decodeSync(record.Meta)
  301. if err != nil {
  302. return fmt.Errorf("error decoding kddb record meta info into a sync state: %v", err)
  303. }
  304. glog.V(logger.Detail).Infof("sync state for node record %v read from Meta: %s", record, string(*(record.Meta)))
  305. p.syncState = state
  306. return err
  307. }
  308. // callback when saving a sync state
  309. func saveSync(record *kademlia.NodeRecord, node kademlia.Node) {
  310. if p, ok := node.(*peer); ok {
  311. meta, err := encodeSync(p.syncState)
  312. if err != nil {
  313. glog.V(logger.Warn).Infof("error saving sync state for %v: %v", node, err)
  314. return
  315. }
  316. glog.V(logger.Detail).Infof("saved sync state for %v: %s", node, string(*meta))
  317. record.Meta = meta
  318. }
  319. }
  320. // the immediate response to a retrieve request,
  321. // sends relevant peer data given by the kademlia hive to the requester
  322. // TODO: remember peers sent for duration of the session, only new peers sent
  323. func (self *Hive) peers(req *retrieveRequestMsgData) {
  324. if req != nil && req.MaxPeers >= 0 {
  325. var addrs []*peerAddr
  326. if req.timeout == nil || time.Now().Before(*(req.timeout)) {
  327. key := req.Key
  328. // self lookup from remote peer
  329. if storage.IsZeroKey(key) {
  330. addr := req.from.Addr()
  331. key = storage.Key(addr[:])
  332. req.Key = nil
  333. }
  334. // get peer addresses from hive
  335. for _, peer := range self.getPeers(key, int(req.MaxPeers)) {
  336. addrs = append(addrs, peer.remoteAddr)
  337. }
  338. glog.V(logger.Debug).Infof("Hive sending %d peer addresses to %v. req.Id: %v, req.Key: %v", len(addrs), req.from, req.Id, req.Key.Log())
  339. peersData := &peersMsgData{
  340. Peers: addrs,
  341. Key: req.Key,
  342. Id: req.Id,
  343. }
  344. peersData.setTimeout(req.timeout)
  345. req.from.peers(peersData)
  346. }
  347. }
  348. }
  349. func (self *Hive) String() string {
  350. return self.kad.String()
  351. }