protocol.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package network
  17. import (
  18. "context"
  19. "errors"
  20. "fmt"
  21. "net"
  22. "sync"
  23. "time"
  24. "github.com/ethereum/go-ethereum/crypto"
  25. "github.com/ethereum/go-ethereum/p2p"
  26. "github.com/ethereum/go-ethereum/p2p/enode"
  27. "github.com/ethereum/go-ethereum/p2p/protocols"
  28. "github.com/ethereum/go-ethereum/rpc"
  29. "github.com/ethereum/go-ethereum/swarm/log"
  30. "github.com/ethereum/go-ethereum/swarm/state"
  31. )
  32. const (
  33. DefaultNetworkID = 3
  34. // ProtocolMaxMsgSize maximum allowed message size
  35. ProtocolMaxMsgSize = 10 * 1024 * 1024
  36. // timeout for waiting
  37. bzzHandshakeTimeout = 3000 * time.Millisecond
  38. )
  39. // BzzSpec is the spec of the generic swarm handshake
  40. var BzzSpec = &protocols.Spec{
  41. Name: "bzz",
  42. Version: 8,
  43. MaxMsgSize: 10 * 1024 * 1024,
  44. Messages: []interface{}{
  45. HandshakeMsg{},
  46. },
  47. }
  48. // DiscoverySpec is the spec for the bzz discovery subprotocols
  49. var DiscoverySpec = &protocols.Spec{
  50. Name: "hive",
  51. Version: 8,
  52. MaxMsgSize: 10 * 1024 * 1024,
  53. Messages: []interface{}{
  54. peersMsg{},
  55. subPeersMsg{},
  56. },
  57. }
  58. // BzzConfig captures the config params used by the hive
  59. type BzzConfig struct {
  60. OverlayAddr []byte // base address of the overlay network
  61. UnderlayAddr []byte // node's underlay address
  62. HiveParams *HiveParams
  63. NetworkID uint64
  64. LightNode bool
  65. }
  66. // Bzz is the swarm protocol bundle
  67. type Bzz struct {
  68. *Hive
  69. NetworkID uint64
  70. LightNode bool
  71. localAddr *BzzAddr
  72. mtx sync.Mutex
  73. handshakes map[enode.ID]*HandshakeMsg
  74. streamerSpec *protocols.Spec
  75. streamerRun func(*BzzPeer) error
  76. }
  77. // NewBzz is the swarm protocol constructor
  78. // arguments
  79. // * bzz config
  80. // * overlay driver
  81. // * peer store
  82. func NewBzz(config *BzzConfig, kad *Kademlia, store state.Store, streamerSpec *protocols.Spec, streamerRun func(*BzzPeer) error) *Bzz {
  83. return &Bzz{
  84. Hive: NewHive(config.HiveParams, kad, store),
  85. NetworkID: config.NetworkID,
  86. LightNode: config.LightNode,
  87. localAddr: &BzzAddr{config.OverlayAddr, config.UnderlayAddr},
  88. handshakes: make(map[enode.ID]*HandshakeMsg),
  89. streamerRun: streamerRun,
  90. streamerSpec: streamerSpec,
  91. }
  92. }
  93. // UpdateLocalAddr updates underlayaddress of the running node
  94. func (b *Bzz) UpdateLocalAddr(byteaddr []byte) *BzzAddr {
  95. b.localAddr = b.localAddr.Update(&BzzAddr{
  96. UAddr: byteaddr,
  97. OAddr: b.localAddr.OAddr,
  98. })
  99. return b.localAddr
  100. }
  101. // NodeInfo returns the node's overlay address
  102. func (b *Bzz) NodeInfo() interface{} {
  103. return b.localAddr.Address()
  104. }
  105. // Protocols return the protocols swarm offers
  106. // Bzz implements the node.Service interface
  107. // * handshake/hive
  108. // * discovery
  109. func (b *Bzz) Protocols() []p2p.Protocol {
  110. protocol := []p2p.Protocol{
  111. {
  112. Name: BzzSpec.Name,
  113. Version: BzzSpec.Version,
  114. Length: BzzSpec.Length(),
  115. Run: b.runBzz,
  116. NodeInfo: b.NodeInfo,
  117. },
  118. {
  119. Name: DiscoverySpec.Name,
  120. Version: DiscoverySpec.Version,
  121. Length: DiscoverySpec.Length(),
  122. Run: b.RunProtocol(DiscoverySpec, b.Hive.Run),
  123. NodeInfo: b.Hive.NodeInfo,
  124. PeerInfo: b.Hive.PeerInfo,
  125. },
  126. }
  127. if b.streamerSpec != nil && b.streamerRun != nil {
  128. protocol = append(protocol, p2p.Protocol{
  129. Name: b.streamerSpec.Name,
  130. Version: b.streamerSpec.Version,
  131. Length: b.streamerSpec.Length(),
  132. Run: b.RunProtocol(b.streamerSpec, b.streamerRun),
  133. })
  134. }
  135. return protocol
  136. }
  137. // APIs returns the APIs offered by bzz
  138. // * hive
  139. // Bzz implements the node.Service interface
  140. func (b *Bzz) APIs() []rpc.API {
  141. return []rpc.API{{
  142. Namespace: "hive",
  143. Version: "3.0",
  144. Service: b.Hive,
  145. }}
  146. }
  147. // RunProtocol is a wrapper for swarm subprotocols
  148. // returns a p2p protocol run function that can be assigned to p2p.Protocol#Run field
  149. // arguments:
  150. // * p2p protocol spec
  151. // * run function taking BzzPeer as argument
  152. // this run function is meant to block for the duration of the protocol session
  153. // on return the session is terminated and the peer is disconnected
  154. // the protocol waits for the bzz handshake is negotiated
  155. // the overlay address on the BzzPeer is set from the remote handshake
  156. func (b *Bzz) RunProtocol(spec *protocols.Spec, run func(*BzzPeer) error) func(*p2p.Peer, p2p.MsgReadWriter) error {
  157. return func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  158. // wait for the bzz protocol to perform the handshake
  159. handshake, _ := b.GetHandshake(p.ID())
  160. defer b.removeHandshake(p.ID())
  161. select {
  162. case <-handshake.done:
  163. case <-time.After(bzzHandshakeTimeout):
  164. return fmt.Errorf("%08x: %s protocol timeout waiting for handshake on %08x", b.BaseAddr()[:4], spec.Name, p.ID().Bytes()[:4])
  165. }
  166. if handshake.err != nil {
  167. return fmt.Errorf("%08x: %s protocol closed: %v", b.BaseAddr()[:4], spec.Name, handshake.err)
  168. }
  169. // the handshake has succeeded so construct the BzzPeer and run the protocol
  170. peer := &BzzPeer{
  171. Peer: protocols.NewPeer(p, rw, spec),
  172. BzzAddr: handshake.peerAddr,
  173. lastActive: time.Now(),
  174. LightNode: handshake.LightNode,
  175. }
  176. log.Debug("peer created", "addr", handshake.peerAddr.String())
  177. return run(peer)
  178. }
  179. }
  180. // performHandshake implements the negotiation of the bzz handshake
  181. // shared among swarm subprotocols
  182. func (b *Bzz) performHandshake(p *protocols.Peer, handshake *HandshakeMsg) error {
  183. ctx, cancel := context.WithTimeout(context.Background(), bzzHandshakeTimeout)
  184. defer func() {
  185. close(handshake.done)
  186. cancel()
  187. }()
  188. rsh, err := p.Handshake(ctx, handshake, b.checkHandshake)
  189. if err != nil {
  190. handshake.err = err
  191. return err
  192. }
  193. handshake.peerAddr = rsh.(*HandshakeMsg).Addr
  194. handshake.LightNode = rsh.(*HandshakeMsg).LightNode
  195. return nil
  196. }
  197. // runBzz is the p2p protocol run function for the bzz base protocol
  198. // that negotiates the bzz handshake
  199. func (b *Bzz) runBzz(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  200. handshake, _ := b.GetHandshake(p.ID())
  201. if !<-handshake.init {
  202. return fmt.Errorf("%08x: bzz already started on peer %08x", b.localAddr.Over()[:4], p.ID().Bytes()[:4])
  203. }
  204. close(handshake.init)
  205. defer b.removeHandshake(p.ID())
  206. peer := protocols.NewPeer(p, rw, BzzSpec)
  207. err := b.performHandshake(peer, handshake)
  208. if err != nil {
  209. log.Warn(fmt.Sprintf("%08x: handshake failed with remote peer %08x: %v", b.localAddr.Over()[:4], p.ID().Bytes()[:4], err))
  210. return err
  211. }
  212. // fail if we get another handshake
  213. msg, err := rw.ReadMsg()
  214. if err != nil {
  215. return err
  216. }
  217. msg.Discard()
  218. return errors.New("received multiple handshakes")
  219. }
  220. // BzzPeer is the bzz protocol view of a protocols.Peer (itself an extension of p2p.Peer)
  221. // implements the Peer interface and all interfaces Peer implements: Addr, OverlayPeer
  222. type BzzPeer struct {
  223. *protocols.Peer // represents the connection for online peers
  224. *BzzAddr // remote address -> implements Addr interface = protocols.Peer
  225. lastActive time.Time // time is updated whenever mutexes are releasing
  226. LightNode bool
  227. }
  228. func NewBzzPeer(p *protocols.Peer) *BzzPeer {
  229. return &BzzPeer{Peer: p, BzzAddr: NewAddr(p.Node())}
  230. }
  231. // LastActive returns the time the peer was last active
  232. func (p *BzzPeer) LastActive() time.Time {
  233. return p.lastActive
  234. }
  235. // ID returns the peer's underlay node identifier.
  236. func (p *BzzPeer) ID() enode.ID {
  237. // This is here to resolve a method tie: both protocols.Peer and BzzAddr are embedded
  238. // into the struct and provide ID(). The protocols.Peer version is faster, ensure it
  239. // gets used.
  240. return p.Peer.ID()
  241. }
  242. /*
  243. Handshake
  244. * Version: 8 byte integer version of the protocol
  245. * NetworkID: 8 byte integer network identifier
  246. * Addr: the address advertised by the node including underlay and overlay connecctions
  247. */
  248. type HandshakeMsg struct {
  249. Version uint64
  250. NetworkID uint64
  251. Addr *BzzAddr
  252. LightNode bool
  253. // peerAddr is the address received in the peer handshake
  254. peerAddr *BzzAddr
  255. init chan bool
  256. done chan struct{}
  257. err error
  258. }
  259. // String pretty prints the handshake
  260. func (bh *HandshakeMsg) String() string {
  261. return fmt.Sprintf("Handshake: Version: %v, NetworkID: %v, Addr: %v, LightNode: %v, peerAddr: %v", bh.Version, bh.NetworkID, bh.Addr, bh.LightNode, bh.peerAddr)
  262. }
  263. // Perform initiates the handshake and validates the remote handshake message
  264. func (b *Bzz) checkHandshake(hs interface{}) error {
  265. rhs := hs.(*HandshakeMsg)
  266. if rhs.NetworkID != b.NetworkID {
  267. return fmt.Errorf("network id mismatch %d (!= %d)", rhs.NetworkID, b.NetworkID)
  268. }
  269. if rhs.Version != uint64(BzzSpec.Version) {
  270. return fmt.Errorf("version mismatch %d (!= %d)", rhs.Version, BzzSpec.Version)
  271. }
  272. return nil
  273. }
  274. // removeHandshake removes handshake for peer with peerID
  275. // from the bzz handshake store
  276. func (b *Bzz) removeHandshake(peerID enode.ID) {
  277. b.mtx.Lock()
  278. defer b.mtx.Unlock()
  279. delete(b.handshakes, peerID)
  280. }
  281. // GetHandshake returns the bzz handhake that the remote peer with peerID sent
  282. func (b *Bzz) GetHandshake(peerID enode.ID) (*HandshakeMsg, bool) {
  283. b.mtx.Lock()
  284. defer b.mtx.Unlock()
  285. handshake, found := b.handshakes[peerID]
  286. if !found {
  287. handshake = &HandshakeMsg{
  288. Version: uint64(BzzSpec.Version),
  289. NetworkID: b.NetworkID,
  290. Addr: b.localAddr,
  291. LightNode: b.LightNode,
  292. init: make(chan bool, 1),
  293. done: make(chan struct{}),
  294. }
  295. // when handhsake is first created for a remote peer
  296. // it is initialised with the init
  297. handshake.init <- true
  298. b.handshakes[peerID] = handshake
  299. }
  300. return handshake, found
  301. }
  302. // BzzAddr implements the PeerAddr interface
  303. type BzzAddr struct {
  304. OAddr []byte
  305. UAddr []byte
  306. }
  307. // Address implements OverlayPeer interface to be used in Overlay.
  308. func (a *BzzAddr) Address() []byte {
  309. return a.OAddr
  310. }
  311. // Over returns the overlay address.
  312. func (a *BzzAddr) Over() []byte {
  313. return a.OAddr
  314. }
  315. // Under returns the underlay address.
  316. func (a *BzzAddr) Under() []byte {
  317. return a.UAddr
  318. }
  319. // ID returns the node identifier in the underlay.
  320. func (a *BzzAddr) ID() enode.ID {
  321. n, err := enode.ParseV4(string(a.UAddr))
  322. if err != nil {
  323. return enode.ID{}
  324. }
  325. return n.ID()
  326. }
  327. // Update updates the underlay address of a peer record
  328. func (a *BzzAddr) Update(na *BzzAddr) *BzzAddr {
  329. return &BzzAddr{a.OAddr, na.UAddr}
  330. }
  331. // String pretty prints the address
  332. func (a *BzzAddr) String() string {
  333. return fmt.Sprintf("%x <%s>", a.OAddr, a.UAddr)
  334. }
  335. // RandomAddr is a utility method generating an address from a public key
  336. func RandomAddr() *BzzAddr {
  337. key, err := crypto.GenerateKey()
  338. if err != nil {
  339. panic("unable to generate key")
  340. }
  341. node := enode.NewV4(&key.PublicKey, net.IP{127, 0, 0, 1}, 30303, 30303)
  342. return NewAddr(node)
  343. }
  344. // NewAddr constucts a BzzAddr from a node record.
  345. func NewAddr(node *enode.Node) *BzzAddr {
  346. return &BzzAddr{OAddr: node.ID().Bytes(), UAddr: []byte(node.String())}
  347. }