protocol.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package network
  17. import (
  18. "context"
  19. "errors"
  20. "fmt"
  21. "net"
  22. "sync"
  23. "time"
  24. "github.com/ethereum/go-ethereum/crypto"
  25. "github.com/ethereum/go-ethereum/p2p"
  26. "github.com/ethereum/go-ethereum/p2p/discover"
  27. "github.com/ethereum/go-ethereum/p2p/protocols"
  28. "github.com/ethereum/go-ethereum/rpc"
  29. "github.com/ethereum/go-ethereum/swarm/log"
  30. "github.com/ethereum/go-ethereum/swarm/state"
  31. )
  32. const (
  33. DefaultNetworkID = 3
  34. // ProtocolMaxMsgSize maximum allowed message size
  35. ProtocolMaxMsgSize = 10 * 1024 * 1024
  36. // timeout for waiting
  37. bzzHandshakeTimeout = 3000 * time.Millisecond
  38. )
  39. // BzzSpec is the spec of the generic swarm handshake
  40. var BzzSpec = &protocols.Spec{
  41. Name: "bzz",
  42. Version: 5,
  43. MaxMsgSize: 10 * 1024 * 1024,
  44. Messages: []interface{}{
  45. HandshakeMsg{},
  46. },
  47. }
  48. // DiscoverySpec is the spec for the bzz discovery subprotocols
  49. var DiscoverySpec = &protocols.Spec{
  50. Name: "hive",
  51. Version: 5,
  52. MaxMsgSize: 10 * 1024 * 1024,
  53. Messages: []interface{}{
  54. peersMsg{},
  55. subPeersMsg{},
  56. },
  57. }
  58. // Addr interface that peerPool needs
  59. type Addr interface {
  60. OverlayPeer
  61. Over() []byte
  62. Under() []byte
  63. String() string
  64. Update(OverlayAddr) OverlayAddr
  65. }
  66. // Peer interface represents an live peer connection
  67. type Peer interface {
  68. Addr // the address of a peer
  69. Conn // the live connection (protocols.Peer)
  70. LastActive() time.Time // last time active
  71. }
  72. // Conn interface represents an live peer connection
  73. type Conn interface {
  74. ID() discover.NodeID // the key that uniquely identifies the Node for the peerPool
  75. Handshake(context.Context, interface{}, func(interface{}) error) (interface{}, error) // can send messages
  76. Send(context.Context, interface{}) error // can send messages
  77. Drop(error) // disconnect this peer
  78. Run(func(context.Context, interface{}) error) error // the run function to run a protocol
  79. Off() OverlayAddr
  80. }
  81. // BzzConfig captures the config params used by the hive
  82. type BzzConfig struct {
  83. OverlayAddr []byte // base address of the overlay network
  84. UnderlayAddr []byte // node's underlay address
  85. HiveParams *HiveParams
  86. NetworkID uint64
  87. }
// Bzz is the swarm protocol bundle.
// It embeds the Hive and keeps one handshake record per remote peer, which
// is shared between the bzz base protocol and the subprotocols started via
// RunProtocol.
type Bzz struct {
	*Hive
	// NetworkID is sent in the local handshake and compared against the
	// remote handshake in checkHandshake.
	NetworkID uint64
	// localAddr is the address advertised in the local handshake.
	localAddr *BzzAddr
	// mtx guards the handshakes map.
	mtx sync.Mutex
	// handshakes holds the pending handshake record for each peer ID.
	handshakes map[discover.NodeID]*HandshakeMsg
	// streamerSpec and streamerRun describe an optional extra subprotocol;
	// Protocols only registers it when both are non-nil.
	streamerSpec *protocols.Spec
	streamerRun  func(*BzzPeer) error
}
  98. // NewBzz is the swarm protocol constructor
  99. // arguments
  100. // * bzz config
  101. // * overlay driver
  102. // * peer store
  103. func NewBzz(config *BzzConfig, kad Overlay, store state.Store, streamerSpec *protocols.Spec, streamerRun func(*BzzPeer) error) *Bzz {
  104. return &Bzz{
  105. Hive: NewHive(config.HiveParams, kad, store),
  106. NetworkID: config.NetworkID,
  107. localAddr: &BzzAddr{config.OverlayAddr, config.UnderlayAddr},
  108. handshakes: make(map[discover.NodeID]*HandshakeMsg),
  109. streamerRun: streamerRun,
  110. streamerSpec: streamerSpec,
  111. }
  112. }
  113. // UpdateLocalAddr updates underlayaddress of the running node
  114. func (b *Bzz) UpdateLocalAddr(byteaddr []byte) *BzzAddr {
  115. b.localAddr = b.localAddr.Update(&BzzAddr{
  116. UAddr: byteaddr,
  117. OAddr: b.localAddr.OAddr,
  118. }).(*BzzAddr)
  119. return b.localAddr
  120. }
  121. // NodeInfo returns the node's overlay address
  122. func (b *Bzz) NodeInfo() interface{} {
  123. return b.localAddr.Address()
  124. }
  125. // Protocols return the protocols swarm offers
  126. // Bzz implements the node.Service interface
  127. // * handshake/hive
  128. // * discovery
  129. func (b *Bzz) Protocols() []p2p.Protocol {
  130. protocol := []p2p.Protocol{
  131. {
  132. Name: BzzSpec.Name,
  133. Version: BzzSpec.Version,
  134. Length: BzzSpec.Length(),
  135. Run: b.runBzz,
  136. NodeInfo: b.NodeInfo,
  137. },
  138. {
  139. Name: DiscoverySpec.Name,
  140. Version: DiscoverySpec.Version,
  141. Length: DiscoverySpec.Length(),
  142. Run: b.RunProtocol(DiscoverySpec, b.Hive.Run),
  143. NodeInfo: b.Hive.NodeInfo,
  144. PeerInfo: b.Hive.PeerInfo,
  145. },
  146. }
  147. if b.streamerSpec != nil && b.streamerRun != nil {
  148. protocol = append(protocol, p2p.Protocol{
  149. Name: b.streamerSpec.Name,
  150. Version: b.streamerSpec.Version,
  151. Length: b.streamerSpec.Length(),
  152. Run: b.RunProtocol(b.streamerSpec, b.streamerRun),
  153. })
  154. }
  155. return protocol
  156. }
  157. // APIs returns the APIs offered by bzz
  158. // * hive
  159. // Bzz implements the node.Service interface
  160. func (b *Bzz) APIs() []rpc.API {
  161. return []rpc.API{{
  162. Namespace: "hive",
  163. Version: "3.0",
  164. Service: b.Hive,
  165. }}
  166. }
  167. // RunProtocol is a wrapper for swarm subprotocols
  168. // returns a p2p protocol run function that can be assigned to p2p.Protocol#Run field
  169. // arguments:
  170. // * p2p protocol spec
  171. // * run function taking BzzPeer as argument
  172. // this run function is meant to block for the duration of the protocol session
  173. // on return the session is terminated and the peer is disconnected
  174. // the protocol waits for the bzz handshake is negotiated
  175. // the overlay address on the BzzPeer is set from the remote handshake
  176. func (b *Bzz) RunProtocol(spec *protocols.Spec, run func(*BzzPeer) error) func(*p2p.Peer, p2p.MsgReadWriter) error {
  177. return func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  178. // wait for the bzz protocol to perform the handshake
  179. handshake, _ := b.GetHandshake(p.ID())
  180. defer b.removeHandshake(p.ID())
  181. select {
  182. case <-handshake.done:
  183. case <-time.After(bzzHandshakeTimeout):
  184. return fmt.Errorf("%08x: %s protocol timeout waiting for handshake on %08x", b.BaseAddr()[:4], spec.Name, p.ID().Bytes()[:4])
  185. }
  186. if handshake.err != nil {
  187. return fmt.Errorf("%08x: %s protocol closed: %v", b.BaseAddr()[:4], spec.Name, handshake.err)
  188. }
  189. // the handshake has succeeded so construct the BzzPeer and run the protocol
  190. peer := &BzzPeer{
  191. Peer: protocols.NewPeer(p, rw, spec),
  192. localAddr: b.localAddr,
  193. BzzAddr: handshake.peerAddr,
  194. lastActive: time.Now(),
  195. }
  196. return run(peer)
  197. }
  198. }
  199. // performHandshake implements the negotiation of the bzz handshake
  200. // shared among swarm subprotocols
  201. func (b *Bzz) performHandshake(p *protocols.Peer, handshake *HandshakeMsg) error {
  202. ctx, cancel := context.WithTimeout(context.Background(), bzzHandshakeTimeout)
  203. defer func() {
  204. close(handshake.done)
  205. cancel()
  206. }()
  207. rsh, err := p.Handshake(ctx, handshake, b.checkHandshake)
  208. if err != nil {
  209. handshake.err = err
  210. return err
  211. }
  212. handshake.peerAddr = rsh.(*HandshakeMsg).Addr
  213. return nil
  214. }
  215. // runBzz is the p2p protocol run function for the bzz base protocol
  216. // that negotiates the bzz handshake
  217. func (b *Bzz) runBzz(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  218. handshake, _ := b.GetHandshake(p.ID())
  219. if !<-handshake.init {
  220. return fmt.Errorf("%08x: bzz already started on peer %08x", b.localAddr.Over()[:4], ToOverlayAddr(p.ID().Bytes())[:4])
  221. }
  222. close(handshake.init)
  223. defer b.removeHandshake(p.ID())
  224. peer := protocols.NewPeer(p, rw, BzzSpec)
  225. err := b.performHandshake(peer, handshake)
  226. if err != nil {
  227. log.Warn(fmt.Sprintf("%08x: handshake failed with remote peer %08x: %v", b.localAddr.Over()[:4], ToOverlayAddr(p.ID().Bytes())[:4], err))
  228. return err
  229. }
  230. // fail if we get another handshake
  231. msg, err := rw.ReadMsg()
  232. if err != nil {
  233. return err
  234. }
  235. msg.Discard()
  236. return errors.New("received multiple handshakes")
  237. }
// BzzPeer is the bzz protocol view of a protocols.Peer (itself an extension of p2p.Peer)
// implements the Peer interface and all interfaces Peer implements: Addr, OverlayPeer
type BzzPeer struct {
	*protocols.Peer // represents the connection for online peers
	// localAddr is the local node's address as seen by this peer session
	localAddr *BzzAddr
	*BzzAddr  // remote address -> implements Addr interface = protocols.Peer
	// lastActive is the value returned by LastActive; in this file it is
	// only set (to time.Now()) when RunProtocol constructs the peer.
	// NOTE(review): the original note said it is "updated whenever mutexes
	// are releasing" - no such update is visible here, confirm elsewhere.
	lastActive time.Time
}
  246. func NewBzzTestPeer(p *protocols.Peer, addr *BzzAddr) *BzzPeer {
  247. return &BzzPeer{
  248. Peer: p,
  249. localAddr: addr,
  250. BzzAddr: NewAddrFromNodeID(p.ID()),
  251. }
  252. }
  253. // Off returns the overlay peer record for offline persistence
  254. func (p *BzzPeer) Off() OverlayAddr {
  255. return p.BzzAddr
  256. }
  257. // LastActive returns the time the peer was last active
  258. func (p *BzzPeer) LastActive() time.Time {
  259. return p.lastActive
  260. }
  261. /*
  262. Handshake
  263. * Version: 8 byte integer version of the protocol
  264. * NetworkID: 8 byte integer network identifier
  265. * Addr: the address advertised by the node including underlay and overlay connecctions
  266. */
  267. type HandshakeMsg struct {
  268. Version uint64
  269. NetworkID uint64
  270. Addr *BzzAddr
  271. // peerAddr is the address received in the peer handshake
  272. peerAddr *BzzAddr
  273. init chan bool
  274. done chan struct{}
  275. err error
  276. }
  277. // String pretty prints the handshake
  278. func (bh *HandshakeMsg) String() string {
  279. return fmt.Sprintf("Handshake: Version: %v, NetworkID: %v, Addr: %v", bh.Version, bh.NetworkID, bh.Addr)
  280. }
  281. // Perform initiates the handshake and validates the remote handshake message
  282. func (b *Bzz) checkHandshake(hs interface{}) error {
  283. rhs := hs.(*HandshakeMsg)
  284. if rhs.NetworkID != b.NetworkID {
  285. return fmt.Errorf("network id mismatch %d (!= %d)", rhs.NetworkID, b.NetworkID)
  286. }
  287. if rhs.Version != uint64(BzzSpec.Version) {
  288. return fmt.Errorf("version mismatch %d (!= %d)", rhs.Version, BzzSpec.Version)
  289. }
  290. return nil
  291. }
  292. // removeHandshake removes handshake for peer with peerID
  293. // from the bzz handshake store
  294. func (b *Bzz) removeHandshake(peerID discover.NodeID) {
  295. b.mtx.Lock()
  296. defer b.mtx.Unlock()
  297. delete(b.handshakes, peerID)
  298. }
  299. // GetHandshake returns the bzz handhake that the remote peer with peerID sent
  300. func (b *Bzz) GetHandshake(peerID discover.NodeID) (*HandshakeMsg, bool) {
  301. b.mtx.Lock()
  302. defer b.mtx.Unlock()
  303. handshake, found := b.handshakes[peerID]
  304. if !found {
  305. handshake = &HandshakeMsg{
  306. Version: uint64(BzzSpec.Version),
  307. NetworkID: b.NetworkID,
  308. Addr: b.localAddr,
  309. init: make(chan bool, 1),
  310. done: make(chan struct{}),
  311. }
  312. // when handhsake is first created for a remote peer
  313. // it is initialised with the init
  314. handshake.init <- true
  315. b.handshakes[peerID] = handshake
  316. }
  317. return handshake, found
  318. }
  319. // BzzAddr implements the PeerAddr interface
  320. type BzzAddr struct {
  321. OAddr []byte
  322. UAddr []byte
  323. }
  324. // Address implements OverlayPeer interface to be used in Overlay
  325. func (a *BzzAddr) Address() []byte {
  326. return a.OAddr
  327. }
  328. // Over returns the overlay address
  329. func (a *BzzAddr) Over() []byte {
  330. return a.OAddr
  331. }
  332. // Under returns the underlay address
  333. func (a *BzzAddr) Under() []byte {
  334. return a.UAddr
  335. }
  336. // ID returns the nodeID from the underlay enode address
  337. func (a *BzzAddr) ID() discover.NodeID {
  338. return discover.MustParseNode(string(a.UAddr)).ID
  339. }
  340. // Update updates the underlay address of a peer record
  341. func (a *BzzAddr) Update(na OverlayAddr) OverlayAddr {
  342. return &BzzAddr{a.OAddr, na.(Addr).Under()}
  343. }
  344. // String pretty prints the address
  345. func (a *BzzAddr) String() string {
  346. return fmt.Sprintf("%x <%s>", a.OAddr, a.UAddr)
  347. }
  348. // RandomAddr is a utility method generating an address from a public key
  349. func RandomAddr() *BzzAddr {
  350. key, err := crypto.GenerateKey()
  351. if err != nil {
  352. panic("unable to generate key")
  353. }
  354. pubkey := crypto.FromECDSAPub(&key.PublicKey)
  355. var id discover.NodeID
  356. copy(id[:], pubkey[1:])
  357. return NewAddrFromNodeID(id)
  358. }
  359. // NewNodeIDFromAddr transforms the underlay address to an adapters.NodeID
  360. func NewNodeIDFromAddr(addr Addr) discover.NodeID {
  361. log.Info(fmt.Sprintf("uaddr=%s", string(addr.Under())))
  362. node := discover.MustParseNode(string(addr.Under()))
  363. return node.ID
  364. }
  365. // NewAddrFromNodeID constucts a BzzAddr from a discover.NodeID
  366. // the overlay address is derived as the hash of the nodeID
  367. func NewAddrFromNodeID(id discover.NodeID) *BzzAddr {
  368. return &BzzAddr{
  369. OAddr: ToOverlayAddr(id.Bytes()),
  370. UAddr: []byte(discover.NewNode(id, net.IP{127, 0, 0, 1}, 30303, 30303).String()),
  371. }
  372. }
  373. // NewAddrFromNodeIDAndPort constucts a BzzAddr from a discover.NodeID and port uint16
  374. // the overlay address is derived as the hash of the nodeID
  375. func NewAddrFromNodeIDAndPort(id discover.NodeID, host net.IP, port uint16) *BzzAddr {
  376. return &BzzAddr{
  377. OAddr: ToOverlayAddr(id.Bytes()),
  378. UAddr: []byte(discover.NewNode(id, host, port, port).String()),
  379. }
  380. }
  381. // ToOverlayAddr creates an overlayaddress from a byte slice
  382. func ToOverlayAddr(id []byte) []byte {
  383. return crypto.Keccak256(id)
  384. }