protocol.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package network
  17. /*
  18. bzz implements the swarm wire protocol [bzz] (sister of eth and shh)
  19. the protocol instance is launched on each peer by the network layer if the
  20. bzz protocol handler is registered on the p2p server.
  21. The bzz protocol component speaks the bzz protocol
  22. * handle the protocol handshake
  23. * register peers in the KΛÐΞMLIΛ table via the hive logistic manager
  24. * dispatch to hive for handling the DHT logic
  25. * encode and decode requests for storage and retrieval
  26. * handle sync protocol messages via the syncer
  27. * talks the SWAP payment protocol (swap accounting is done within NetStore)
  28. */
  29. import (
  30. "fmt"
  31. "net"
  32. "strconv"
  33. "time"
  34. "github.com/ethereum/go-ethereum/contracts/chequebook"
  35. "github.com/ethereum/go-ethereum/errs"
  36. "github.com/ethereum/go-ethereum/logger"
  37. "github.com/ethereum/go-ethereum/logger/glog"
  38. "github.com/ethereum/go-ethereum/p2p"
  39. "github.com/ethereum/go-ethereum/p2p/discover"
  40. bzzswap "github.com/ethereum/go-ethereum/swarm/services/swap"
  41. "github.com/ethereum/go-ethereum/swarm/services/swap/swap"
  42. "github.com/ethereum/go-ethereum/swarm/storage"
  43. )
  44. const (
  45. Version = 0
  46. ProtocolLength = uint64(8)
  47. ProtocolMaxMsgSize = 10 * 1024 * 1024
  48. NetworkId = 3
  49. )
  50. const (
  51. ErrMsgTooLarge = iota
  52. ErrDecode
  53. ErrInvalidMsgCode
  54. ErrVersionMismatch
  55. ErrNetworkIdMismatch
  56. ErrNoStatusMsg
  57. ErrExtraStatusMsg
  58. ErrSwap
  59. ErrSync
  60. ErrUnwanted
  61. )
  62. var errorToString = map[int]string{
  63. ErrMsgTooLarge: "Message too long",
  64. ErrDecode: "Invalid message",
  65. ErrInvalidMsgCode: "Invalid message code",
  66. ErrVersionMismatch: "Protocol version mismatch",
  67. ErrNetworkIdMismatch: "NetworkId mismatch",
  68. ErrNoStatusMsg: "No status message",
  69. ErrExtraStatusMsg: "Extra status message",
  70. ErrSwap: "SWAP error",
  71. ErrSync: "Sync error",
  72. ErrUnwanted: "Unwanted peer",
  73. }
  74. // bzz represents the swarm wire protocol
  75. // an instance is running on each peer
  76. type bzz struct {
  77. selfID discover.NodeID // peer's node id used in peer advertising in handshake
  78. key storage.Key // baseaddress as storage.Key
  79. storage StorageHandler // handler storage/retrieval related requests coming via the bzz wire protocol
  80. hive *Hive // the logistic manager, peerPool, routing service and peer handler
  81. dbAccess *DbAccess // access to db storage counter and iterator for syncing
  82. requestDb *storage.LDBDatabase // db to persist backlog of deliveries to aid syncing
  83. remoteAddr *peerAddr // remote peers address
  84. peer *p2p.Peer // the p2p peer object
  85. rw p2p.MsgReadWriter // messageReadWriter to send messages to
  86. errors *errs.Errors // errors table
  87. backend chequebook.Backend
  88. lastActive time.Time
  89. NetworkId uint64
  90. swap *swap.Swap // swap instance for the peer connection
  91. swapParams *bzzswap.SwapParams // swap settings both local and remote
  92. swapEnabled bool // flag to enable SWAP (will be set via Caps in handshake)
  93. syncEnabled bool // flag to enable SYNC (will be set via Caps in handshake)
  94. syncer *syncer // syncer instance for the peer connection
  95. syncParams *SyncParams // syncer params
  96. syncState *syncState // outgoing syncronisation state (contains reference to remote peers db counter)
  97. }
  98. // interface type for handler of storage/retrieval related requests coming
  99. // via the bzz wire protocol
  100. // messages: UnsyncedKeys, DeliveryRequest, StoreRequest, RetrieveRequest
  101. type StorageHandler interface {
  102. HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error
  103. HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error
  104. HandleStoreRequestMsg(req *storeRequestMsgData, p *peer)
  105. HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer)
  106. }
  107. /*
  108. main entrypoint, wrappers starting a server that will run the bzz protocol
  109. use this constructor to attach the protocol ("class") to server caps
  110. This is done by node.Node#Register(func(node.ServiceContext) (Service, error))
  111. Service implements Protocols() which is an array of protocol constructors
  112. at node startup the protocols are initialised
  113. the Dev p2p layer then calls Run(p *p2p.Peer, rw p2p.MsgReadWriter) error
  114. on each peer connection
  115. The Run function of the Bzz protocol class creates a bzz instance
  116. which will represent the peer for the swarm hive and all peer-aware components
  117. */
  118. func Bzz(cloud StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, networkId uint64) (p2p.Protocol, error) {
  119. // a single global request db is created for all peer connections
  120. // this is to persist delivery backlog and aid syncronisation
  121. requestDb, err := storage.NewLDBDatabase(sy.RequestDbPath)
  122. if err != nil {
  123. return p2p.Protocol{}, fmt.Errorf("error setting up request db: %v", err)
  124. }
  125. if networkId == 0 {
  126. networkId = NetworkId
  127. }
  128. return p2p.Protocol{
  129. Name: "bzz",
  130. Version: Version,
  131. Length: ProtocolLength,
  132. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  133. return run(requestDb, cloud, backend, hive, dbaccess, sp, sy, networkId, p, rw)
  134. },
  135. }, nil
  136. }
  137. /*
  138. the main protocol loop that
  139. * does the handshake by exchanging statusMsg
  140. * if peer is valid and accepted, registers with the hive
  141. * then enters into a forever loop handling incoming messages
  142. * storage and retrieval related queries coming via bzz are dispatched to StorageHandler
  143. * peer-related messages are dispatched to the hive
  144. * payment related messages are relayed to SWAP service
  145. * on disconnect, unregister the peer in the hive (note RemovePeer in the post-disconnect hook)
  146. * whenever the loop terminates, the peer will disconnect with Subprotocol error
  147. * whenever handlers return an error the loop terminates
  148. */
  149. func run(requestDb *storage.LDBDatabase, depo StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, networkId uint64, p *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
  150. self := &bzz{
  151. storage: depo,
  152. backend: backend,
  153. hive: hive,
  154. dbAccess: dbaccess,
  155. requestDb: requestDb,
  156. peer: p,
  157. rw: rw,
  158. errors: &errs.Errors{
  159. Package: "BZZ",
  160. Errors: errorToString,
  161. },
  162. swapParams: sp,
  163. syncParams: sy,
  164. swapEnabled: hive.swapEnabled,
  165. syncEnabled: true,
  166. NetworkId: networkId,
  167. }
  168. // handle handshake
  169. err = self.handleStatus()
  170. if err != nil {
  171. return err
  172. }
  173. defer func() {
  174. // if the handler loop exits, the peer is disconnecting
  175. // deregister the peer in the hive
  176. self.hive.removePeer(&peer{bzz: self})
  177. if self.syncer != nil {
  178. self.syncer.stop() // quits request db and delivery loops, save requests
  179. }
  180. if self.swap != nil {
  181. self.swap.Stop() // quits chequebox autocash etc
  182. }
  183. }()
  184. // the main forever loop that handles incoming requests
  185. for {
  186. if self.hive.blockRead {
  187. glog.V(logger.Warn).Infof("Cannot read network")
  188. time.Sleep(100 * time.Millisecond)
  189. continue
  190. }
  191. err = self.handle()
  192. if err != nil {
  193. return
  194. }
  195. }
  196. }
  197. // TODO: may need to implement protocol drop only? don't want to kick off the peer
  198. // if they are useful for other protocols
  199. func (self *bzz) Drop() {
  200. self.peer.Disconnect(p2p.DiscSubprotocolError)
  201. }
  202. // one cycle of the main forever loop that handles and dispatches incoming messages
  203. func (self *bzz) handle() error {
  204. msg, err := self.rw.ReadMsg()
  205. glog.V(logger.Debug).Infof("<- %v", msg)
  206. if err != nil {
  207. return err
  208. }
  209. if msg.Size > ProtocolMaxMsgSize {
  210. return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  211. }
  212. // make sure that the payload has been fully consumed
  213. defer msg.Discard()
  214. switch msg.Code {
  215. case statusMsg:
  216. // no extra status message allowed. The one needed already handled by
  217. // handleStatus
  218. glog.V(logger.Debug).Infof("Status message: %v", msg)
  219. return self.protoError(ErrExtraStatusMsg, "")
  220. case storeRequestMsg:
  221. // store requests are dispatched to netStore
  222. var req storeRequestMsgData
  223. if err := msg.Decode(&req); err != nil {
  224. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  225. }
  226. if len(req.SData) < 9 {
  227. return self.protoError(ErrDecode, "<- %v: Data too short (%v)", msg)
  228. }
  229. // last Active time is set only when receiving chunks
  230. self.lastActive = time.Now()
  231. glog.V(logger.Detail).Infof("incoming store request: %s", req.String())
  232. // swap accounting is done within forwarding
  233. self.storage.HandleStoreRequestMsg(&req, &peer{bzz: self})
  234. case retrieveRequestMsg:
  235. // retrieve Requests are dispatched to netStore
  236. var req retrieveRequestMsgData
  237. if err := msg.Decode(&req); err != nil {
  238. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  239. }
  240. req.from = &peer{bzz: self}
  241. // if request is lookup and not to be delivered
  242. if req.isLookup() {
  243. glog.V(logger.Detail).Infof("self lookup for %v: responding with peers only...", req.from)
  244. } else if req.Key == nil {
  245. return self.protoError(ErrDecode, "protocol handler: req.Key == nil || req.Timeout == nil")
  246. } else {
  247. // swap accounting is done within netStore
  248. self.storage.HandleRetrieveRequestMsg(&req, &peer{bzz: self})
  249. }
  250. // direct response with peers, TODO: sort this out
  251. self.hive.peers(&req)
  252. case peersMsg:
  253. // response to lookups and immediate response to retrieve requests
  254. // dispatches new peer data to the hive that adds them to KADDB
  255. var req peersMsgData
  256. if err := msg.Decode(&req); err != nil {
  257. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  258. }
  259. req.from = &peer{bzz: self}
  260. glog.V(logger.Detail).Infof("<- peer addresses: %v", req)
  261. self.hive.HandlePeersMsg(&req, &peer{bzz: self})
  262. case syncRequestMsg:
  263. var req syncRequestMsgData
  264. if err := msg.Decode(&req); err != nil {
  265. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  266. }
  267. glog.V(logger.Debug).Infof("<- sync request: %v", req)
  268. self.lastActive = time.Now()
  269. self.sync(req.SyncState)
  270. case unsyncedKeysMsg:
  271. // coming from parent node offering
  272. var req unsyncedKeysMsgData
  273. if err := msg.Decode(&req); err != nil {
  274. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  275. }
  276. glog.V(logger.Debug).Infof("<- unsynced keys : %s", req.String())
  277. err := self.storage.HandleUnsyncedKeysMsg(&req, &peer{bzz: self})
  278. self.lastActive = time.Now()
  279. if err != nil {
  280. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  281. }
  282. case deliveryRequestMsg:
  283. // response to syncKeysMsg hashes filtered not existing in db
  284. // also relays the last synced state to the source
  285. var req deliveryRequestMsgData
  286. if err := msg.Decode(&req); err != nil {
  287. return self.protoError(ErrDecode, "<-msg %v: %v", msg, err)
  288. }
  289. glog.V(logger.Debug).Infof("<- delivery request: %s", req.String())
  290. err := self.storage.HandleDeliveryRequestMsg(&req, &peer{bzz: self})
  291. self.lastActive = time.Now()
  292. if err != nil {
  293. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  294. }
  295. case paymentMsg:
  296. // swap protocol message for payment, Units paid for, Cheque paid with
  297. if self.swapEnabled {
  298. var req paymentMsgData
  299. if err := msg.Decode(&req); err != nil {
  300. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  301. }
  302. glog.V(logger.Debug).Infof("<- payment: %s", req.String())
  303. self.swap.Receive(int(req.Units), req.Promise)
  304. }
  305. default:
  306. // no other message is allowed
  307. return self.protoError(ErrInvalidMsgCode, "%v", msg.Code)
  308. }
  309. return nil
  310. }
  311. func (self *bzz) handleStatus() (err error) {
  312. handshake := &statusMsgData{
  313. Version: uint64(Version),
  314. ID: "honey",
  315. Addr: self.selfAddr(),
  316. NetworkId: uint64(self.NetworkId),
  317. Swap: &bzzswap.SwapProfile{
  318. Profile: self.swapParams.Profile,
  319. PayProfile: self.swapParams.PayProfile,
  320. },
  321. }
  322. err = p2p.Send(self.rw, statusMsg, handshake)
  323. if err != nil {
  324. self.protoError(ErrNoStatusMsg, err.Error())
  325. }
  326. // read and handle remote status
  327. var msg p2p.Msg
  328. msg, err = self.rw.ReadMsg()
  329. if err != nil {
  330. return err
  331. }
  332. if msg.Code != statusMsg {
  333. self.protoError(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, statusMsg)
  334. }
  335. if msg.Size > ProtocolMaxMsgSize {
  336. return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  337. }
  338. var status statusMsgData
  339. if err := msg.Decode(&status); err != nil {
  340. return self.protoError(ErrDecode, " %v: %v", msg, err)
  341. }
  342. if status.NetworkId != self.NetworkId {
  343. return self.protoError(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, self.NetworkId)
  344. }
  345. if Version != status.Version {
  346. return self.protoError(ErrVersionMismatch, "%d (!= %d)", status.Version, Version)
  347. }
  348. self.remoteAddr = self.peerAddr(status.Addr)
  349. glog.V(logger.Detail).Infof("self: advertised IP: %v, peer advertised: %v, local address: %v\npeer: advertised IP: %v, remote address: %v\n", self.selfAddr(), self.remoteAddr, self.peer.LocalAddr(), status.Addr.IP, self.peer.RemoteAddr())
  350. if self.swapEnabled {
  351. // set remote profile for accounting
  352. self.swap, err = bzzswap.NewSwap(self.swapParams, status.Swap, self.backend, self)
  353. if err != nil {
  354. return self.protoError(ErrSwap, "%v", err)
  355. }
  356. }
  357. glog.V(logger.Info).Infof("Peer %08x is capable (%d/%d)", self.remoteAddr.Addr[:4], status.Version, status.NetworkId)
  358. err = self.hive.addPeer(&peer{bzz: self})
  359. if err != nil {
  360. return self.protoError(ErrUnwanted, "%v", err)
  361. }
  362. // hive sets syncstate so sync should start after node added
  363. glog.V(logger.Info).Infof("syncronisation request sent with %v", self.syncState)
  364. self.syncRequest()
  365. return nil
  366. }
  367. func (self *bzz) sync(state *syncState) error {
  368. // syncer setup
  369. if self.syncer != nil {
  370. return self.protoError(ErrSync, "sync request can only be sent once")
  371. }
  372. cnt := self.dbAccess.counter()
  373. remoteaddr := self.remoteAddr.Addr
  374. start, stop := self.hive.kad.KeyRange(remoteaddr)
  375. // an explicitly received nil syncstate disables syncronisation
  376. if state == nil {
  377. self.syncEnabled = false
  378. glog.V(logger.Warn).Infof("syncronisation disabled for peer %v", self)
  379. state = &syncState{DbSyncState: &storage.DbSyncState{}, Synced: true}
  380. } else {
  381. state.synced = make(chan bool)
  382. state.SessionAt = cnt
  383. if storage.IsZeroKey(state.Stop) && state.Synced {
  384. state.Start = storage.Key(start[:])
  385. state.Stop = storage.Key(stop[:])
  386. }
  387. glog.V(logger.Debug).Infof("syncronisation requested by peer %v at state %v", self, state)
  388. }
  389. var err error
  390. self.syncer, err = newSyncer(
  391. self.requestDb,
  392. storage.Key(remoteaddr[:]),
  393. self.dbAccess,
  394. self.unsyncedKeys, self.store,
  395. self.syncParams, state, func() bool { return self.syncEnabled },
  396. )
  397. if err != nil {
  398. return self.protoError(ErrSync, "%v", err)
  399. }
  400. glog.V(logger.Detail).Infof("syncer set for peer %v", self)
  401. return nil
  402. }
  403. func (self *bzz) String() string {
  404. return self.remoteAddr.String()
  405. }
  406. // repair reported address if IP missing
  407. func (self *bzz) peerAddr(base *peerAddr) *peerAddr {
  408. if base.IP.IsUnspecified() {
  409. host, _, _ := net.SplitHostPort(self.peer.RemoteAddr().String())
  410. base.IP = net.ParseIP(host)
  411. }
  412. return base
  413. }
  414. // returns self advertised node connection info (listening address w enodes)
  415. // IP will get repaired on the other end if missing
  416. // or resolved via ID by discovery at dialout
  417. func (self *bzz) selfAddr() *peerAddr {
  418. id := self.hive.id
  419. host, port, _ := net.SplitHostPort(self.hive.listenAddr())
  420. intport, _ := strconv.Atoi(port)
  421. addr := &peerAddr{
  422. Addr: self.hive.addr,
  423. ID: id[:],
  424. IP: net.ParseIP(host),
  425. Port: uint16(intport),
  426. }
  427. return addr
  428. }
  429. // outgoing messages
  430. // send retrieveRequestMsg
  431. func (self *bzz) retrieve(req *retrieveRequestMsgData) error {
  432. return self.send(retrieveRequestMsg, req)
  433. }
  434. // send storeRequestMsg
  435. func (self *bzz) store(req *storeRequestMsgData) error {
  436. return self.send(storeRequestMsg, req)
  437. }
  438. func (self *bzz) syncRequest() error {
  439. req := &syncRequestMsgData{}
  440. if self.hive.syncEnabled {
  441. glog.V(logger.Debug).Infof("syncronisation request to peer %v at state %v", self, self.syncState)
  442. req.SyncState = self.syncState
  443. }
  444. if self.syncState == nil {
  445. glog.V(logger.Warn).Infof("syncronisation disabled for peer %v at state %v", self, self.syncState)
  446. }
  447. return self.send(syncRequestMsg, req)
  448. }
  449. // queue storeRequestMsg in request db
  450. func (self *bzz) deliveryRequest(reqs []*syncRequest) error {
  451. req := &deliveryRequestMsgData{
  452. Deliver: reqs,
  453. }
  454. return self.send(deliveryRequestMsg, req)
  455. }
  456. // batch of syncRequests to send off
  457. func (self *bzz) unsyncedKeys(reqs []*syncRequest, state *syncState) error {
  458. req := &unsyncedKeysMsgData{
  459. Unsynced: reqs,
  460. State: state,
  461. }
  462. return self.send(unsyncedKeysMsg, req)
  463. }
  464. // send paymentMsg
  465. func (self *bzz) Pay(units int, promise swap.Promise) {
  466. req := &paymentMsgData{uint(units), promise.(*chequebook.Cheque)}
  467. self.payment(req)
  468. }
  469. // send paymentMsg
  470. func (self *bzz) payment(req *paymentMsgData) error {
  471. return self.send(paymentMsg, req)
  472. }
  473. // sends peersMsg
  474. func (self *bzz) peers(req *peersMsgData) error {
  475. return self.send(peersMsg, req)
  476. }
  477. func (self *bzz) protoError(code int, format string, params ...interface{}) (err *errs.Error) {
  478. err = self.errors.New(code, format, params...)
  479. err.Log(glog.V(logger.Info))
  480. return
  481. }
  482. func (self *bzz) send(msg uint64, data interface{}) error {
  483. if self.hive.blockWrite {
  484. return fmt.Errorf("network write blocked")
  485. }
  486. glog.V(logger.Detail).Infof("-> %v: %v (%T) to %v", msg, data, data, self)
  487. err := p2p.Send(self.rw, msg, data)
  488. if err != nil {
  489. self.Drop()
  490. }
  491. return err
  492. }