protocol.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package network
  17. /*
  18. bzz implements the swarm wire protocol [bzz] (sister of eth and shh)
  19. the protocol instance is launched on each peer by the network layer if the
  20. bzz protocol handler is registered on the p2p server.
  21. The bzz protocol component speaks the bzz protocol
  22. * handle the protocol handshake
  23. * register peers in the KΛÐΞMLIΛ table via the hive logistic manager
  24. * dispatch to hive for handling the DHT logic
  25. * encode and decode requests for storage and retrieval
  26. * handle sync protocol messages via the syncer
  27. * talks the SWAP payment protocol (swap accounting is done within NetStore)
  28. */
  29. import (
  30. "fmt"
  31. "net"
  32. "strconv"
  33. "time"
  34. "github.com/ethereum/go-ethereum/contracts/chequebook"
  35. "github.com/ethereum/go-ethereum/errs"
  36. "github.com/ethereum/go-ethereum/logger"
  37. "github.com/ethereum/go-ethereum/logger/glog"
  38. "github.com/ethereum/go-ethereum/p2p"
  39. "github.com/ethereum/go-ethereum/p2p/discover"
  40. bzzswap "github.com/ethereum/go-ethereum/swarm/services/swap"
  41. "github.com/ethereum/go-ethereum/swarm/services/swap/swap"
  42. "github.com/ethereum/go-ethereum/swarm/storage"
  43. )
  44. const (
  45. Version = 0
  46. ProtocolLength = uint64(8)
  47. ProtocolMaxMsgSize = 10 * 1024 * 1024
  48. NetworkId = 322
  49. )
  50. const (
  51. ErrMsgTooLarge = iota
  52. ErrDecode
  53. ErrInvalidMsgCode
  54. ErrVersionMismatch
  55. ErrNetworkIdMismatch
  56. ErrNoStatusMsg
  57. ErrExtraStatusMsg
  58. ErrSwap
  59. ErrSync
  60. ErrUnwanted
  61. )
  62. var errorToString = map[int]string{
  63. ErrMsgTooLarge: "Message too long",
  64. ErrDecode: "Invalid message",
  65. ErrInvalidMsgCode: "Invalid message code",
  66. ErrVersionMismatch: "Protocol version mismatch",
  67. ErrNetworkIdMismatch: "NetworkId mismatch",
  68. ErrNoStatusMsg: "No status message",
  69. ErrExtraStatusMsg: "Extra status message",
  70. ErrSwap: "SWAP error",
  71. ErrSync: "Sync error",
  72. ErrUnwanted: "Unwanted peer",
  73. }
  74. // bzz represents the swarm wire protocol
  75. // an instance is running on each peer
  76. type bzz struct {
  77. selfID discover.NodeID // peer's node id used in peer advertising in handshake
  78. key storage.Key // baseaddress as storage.Key
  79. storage StorageHandler // handler storage/retrieval related requests coming via the bzz wire protocol
  80. hive *Hive // the logistic manager, peerPool, routing service and peer handler
  81. dbAccess *DbAccess // access to db storage counter and iterator for syncing
  82. requestDb *storage.LDBDatabase // db to persist backlog of deliveries to aid syncing
  83. remoteAddr *peerAddr // remote peers address
  84. peer *p2p.Peer // the p2p peer object
  85. rw p2p.MsgReadWriter // messageReadWriter to send messages to
  86. errors *errs.Errors // errors table
  87. backend chequebook.Backend
  88. lastActive time.Time
  89. swap *swap.Swap // swap instance for the peer connection
  90. swapParams *bzzswap.SwapParams // swap settings both local and remote
  91. swapEnabled bool // flag to enable SWAP (will be set via Caps in handshake)
  92. syncEnabled bool // flag to enable SYNC (will be set via Caps in handshake)
  93. syncer *syncer // syncer instance for the peer connection
  94. syncParams *SyncParams // syncer params
  95. syncState *syncState // outgoing syncronisation state (contains reference to remote peers db counter)
  96. }
  97. // interface type for handler of storage/retrieval related requests coming
  98. // via the bzz wire protocol
  99. // messages: UnsyncedKeys, DeliveryRequest, StoreRequest, RetrieveRequest
  100. type StorageHandler interface {
  101. HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error
  102. HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error
  103. HandleStoreRequestMsg(req *storeRequestMsgData, p *peer)
  104. HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer)
  105. }
  106. /*
  107. main entrypoint, wrappers starting a server that will run the bzz protocol
  108. use this constructor to attach the protocol ("class") to server caps
  109. This is done by node.Node#Register(func(node.ServiceContext) (Service, error))
  110. Service implements Protocols() which is an array of protocol constructors
  111. at node startup the protocols are initialised
  112. the Dev p2p layer then calls Run(p *p2p.Peer, rw p2p.MsgReadWriter) error
  113. on each peer connection
  114. The Run function of the Bzz protocol class creates a bzz instance
  115. which will represent the peer for the swarm hive and all peer-aware components
  116. */
  117. func Bzz(cloud StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams) (p2p.Protocol, error) {
  118. // a single global request db is created for all peer connections
  119. // this is to persist delivery backlog and aid syncronisation
  120. requestDb, err := storage.NewLDBDatabase(sy.RequestDbPath)
  121. if err != nil {
  122. return p2p.Protocol{}, fmt.Errorf("error setting up request db: %v", err)
  123. }
  124. return p2p.Protocol{
  125. Name: "bzz",
  126. Version: Version,
  127. Length: ProtocolLength,
  128. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  129. return run(requestDb, cloud, backend, hive, dbaccess, sp, sy, p, rw)
  130. },
  131. }, nil
  132. }
  133. /*
  134. the main protocol loop that
  135. * does the handshake by exchanging statusMsg
  136. * if peer is valid and accepted, registers with the hive
  137. * then enters into a forever loop handling incoming messages
  138. * storage and retrieval related queries coming via bzz are dispatched to StorageHandler
  139. * peer-related messages are dispatched to the hive
  140. * payment related messages are relayed to SWAP service
  141. * on disconnect, unregister the peer in the hive (note RemovePeer in the post-disconnect hook)
  142. * whenever the loop terminates, the peer will disconnect with Subprotocol error
  143. * whenever handlers return an error the loop terminates
  144. */
  145. func run(requestDb *storage.LDBDatabase, depo StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, p *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
  146. self := &bzz{
  147. storage: depo,
  148. backend: backend,
  149. hive: hive,
  150. dbAccess: dbaccess,
  151. requestDb: requestDb,
  152. peer: p,
  153. rw: rw,
  154. errors: &errs.Errors{
  155. Package: "BZZ",
  156. Errors: errorToString,
  157. },
  158. swapParams: sp,
  159. syncParams: sy,
  160. swapEnabled: hive.swapEnabled,
  161. syncEnabled: true,
  162. }
  163. // handle handshake
  164. err = self.handleStatus()
  165. if err != nil {
  166. return err
  167. }
  168. defer func() {
  169. // if the handler loop exits, the peer is disconnecting
  170. // deregister the peer in the hive
  171. self.hive.removePeer(&peer{bzz: self})
  172. if self.syncer != nil {
  173. self.syncer.stop() // quits request db and delivery loops, save requests
  174. }
  175. if self.swap != nil {
  176. self.swap.Stop() // quits chequebox autocash etc
  177. }
  178. }()
  179. // the main forever loop that handles incoming requests
  180. for {
  181. if self.hive.blockRead {
  182. glog.V(logger.Warn).Infof("Cannot read network")
  183. time.Sleep(100 * time.Millisecond)
  184. continue
  185. }
  186. err = self.handle()
  187. if err != nil {
  188. return
  189. }
  190. }
  191. }
  192. // TODO: may need to implement protocol drop only? don't want to kick off the peer
  193. // if they are useful for other protocols
  194. func (self *bzz) Drop() {
  195. self.peer.Disconnect(p2p.DiscSubprotocolError)
  196. }
  197. // one cycle of the main forever loop that handles and dispatches incoming messages
  198. func (self *bzz) handle() error {
  199. msg, err := self.rw.ReadMsg()
  200. glog.V(logger.Debug).Infof("<- %v", msg)
  201. if err != nil {
  202. return err
  203. }
  204. if msg.Size > ProtocolMaxMsgSize {
  205. return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  206. }
  207. // make sure that the payload has been fully consumed
  208. defer msg.Discard()
  209. switch msg.Code {
  210. case statusMsg:
  211. // no extra status message allowed. The one needed already handled by
  212. // handleStatus
  213. glog.V(logger.Debug).Infof("Status message: %v", msg)
  214. return self.protoError(ErrExtraStatusMsg, "")
  215. case storeRequestMsg:
  216. // store requests are dispatched to netStore
  217. var req storeRequestMsgData
  218. if err := msg.Decode(&req); err != nil {
  219. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  220. }
  221. if len(req.SData) < 9 {
  222. return self.protoError(ErrDecode, "<- %v: Data too short (%v)", msg)
  223. }
  224. // last Active time is set only when receiving chunks
  225. self.lastActive = time.Now()
  226. glog.V(logger.Detail).Infof("incoming store request: %s", req.String())
  227. // swap accounting is done within forwarding
  228. self.storage.HandleStoreRequestMsg(&req, &peer{bzz: self})
  229. case retrieveRequestMsg:
  230. // retrieve Requests are dispatched to netStore
  231. var req retrieveRequestMsgData
  232. if err := msg.Decode(&req); err != nil {
  233. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  234. }
  235. req.from = &peer{bzz: self}
  236. // if request is lookup and not to be delivered
  237. if req.isLookup() {
  238. glog.V(logger.Detail).Infof("self lookup for %v: responding with peers only...", req.from)
  239. } else if req.Key == nil {
  240. return self.protoError(ErrDecode, "protocol handler: req.Key == nil || req.Timeout == nil")
  241. } else {
  242. // swap accounting is done within netStore
  243. self.storage.HandleRetrieveRequestMsg(&req, &peer{bzz: self})
  244. }
  245. // direct response with peers, TODO: sort this out
  246. self.hive.peers(&req)
  247. case peersMsg:
  248. // response to lookups and immediate response to retrieve requests
  249. // dispatches new peer data to the hive that adds them to KADDB
  250. var req peersMsgData
  251. if err := msg.Decode(&req); err != nil {
  252. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  253. }
  254. req.from = &peer{bzz: self}
  255. glog.V(logger.Detail).Infof("<- peer addresses: %v", req)
  256. self.hive.HandlePeersMsg(&req, &peer{bzz: self})
  257. case syncRequestMsg:
  258. var req syncRequestMsgData
  259. if err := msg.Decode(&req); err != nil {
  260. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  261. }
  262. glog.V(logger.Debug).Infof("<- sync request: %v", req)
  263. self.lastActive = time.Now()
  264. self.sync(req.SyncState)
  265. case unsyncedKeysMsg:
  266. // coming from parent node offering
  267. var req unsyncedKeysMsgData
  268. if err := msg.Decode(&req); err != nil {
  269. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  270. }
  271. glog.V(logger.Debug).Infof("<- unsynced keys : %s", req.String())
  272. err := self.storage.HandleUnsyncedKeysMsg(&req, &peer{bzz: self})
  273. self.lastActive = time.Now()
  274. if err != nil {
  275. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  276. }
  277. case deliveryRequestMsg:
  278. // response to syncKeysMsg hashes filtered not existing in db
  279. // also relays the last synced state to the source
  280. var req deliveryRequestMsgData
  281. if err := msg.Decode(&req); err != nil {
  282. return self.protoError(ErrDecode, "<-msg %v: %v", msg, err)
  283. }
  284. glog.V(logger.Debug).Infof("<- delivery request: %s", req.String())
  285. err := self.storage.HandleDeliveryRequestMsg(&req, &peer{bzz: self})
  286. self.lastActive = time.Now()
  287. if err != nil {
  288. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  289. }
  290. case paymentMsg:
  291. // swap protocol message for payment, Units paid for, Cheque paid with
  292. if self.swapEnabled {
  293. var req paymentMsgData
  294. if err := msg.Decode(&req); err != nil {
  295. return self.protoError(ErrDecode, "<- %v: %v", msg, err)
  296. }
  297. glog.V(logger.Debug).Infof("<- payment: %s", req.String())
  298. self.swap.Receive(int(req.Units), req.Promise)
  299. }
  300. default:
  301. // no other message is allowed
  302. return self.protoError(ErrInvalidMsgCode, "%v", msg.Code)
  303. }
  304. return nil
  305. }
  306. func (self *bzz) handleStatus() (err error) {
  307. handshake := &statusMsgData{
  308. Version: uint64(Version),
  309. ID: "honey",
  310. Addr: self.selfAddr(),
  311. NetworkId: uint64(NetworkId),
  312. Swap: &bzzswap.SwapProfile{
  313. Profile: self.swapParams.Profile,
  314. PayProfile: self.swapParams.PayProfile,
  315. },
  316. }
  317. err = p2p.Send(self.rw, statusMsg, handshake)
  318. if err != nil {
  319. self.protoError(ErrNoStatusMsg, err.Error())
  320. }
  321. // read and handle remote status
  322. var msg p2p.Msg
  323. msg, err = self.rw.ReadMsg()
  324. if err != nil {
  325. return err
  326. }
  327. if msg.Code != statusMsg {
  328. self.protoError(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, statusMsg)
  329. }
  330. if msg.Size > ProtocolMaxMsgSize {
  331. return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  332. }
  333. var status statusMsgData
  334. if err := msg.Decode(&status); err != nil {
  335. return self.protoError(ErrDecode, " %v: %v", msg, err)
  336. }
  337. if status.NetworkId != NetworkId {
  338. return self.protoError(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, NetworkId)
  339. }
  340. if Version != status.Version {
  341. return self.protoError(ErrVersionMismatch, "%d (!= %d)", status.Version, Version)
  342. }
  343. self.remoteAddr = self.peerAddr(status.Addr)
  344. glog.V(logger.Detail).Infof("self: advertised IP: %v, peer advertised: %v, local address: %v\npeer: advertised IP: %v, remote address: %v\n", self.selfAddr(), self.remoteAddr, self.peer.LocalAddr(), status.Addr.IP, self.peer.RemoteAddr())
  345. if self.swapEnabled {
  346. // set remote profile for accounting
  347. self.swap, err = bzzswap.NewSwap(self.swapParams, status.Swap, self.backend, self)
  348. if err != nil {
  349. return self.protoError(ErrSwap, "%v", err)
  350. }
  351. }
  352. glog.V(logger.Info).Infof("Peer %08x is capable (%d/%d)", self.remoteAddr.Addr[:4], status.Version, status.NetworkId)
  353. err = self.hive.addPeer(&peer{bzz: self})
  354. if err != nil {
  355. return self.protoError(ErrUnwanted, "%v", err)
  356. }
  357. // hive sets syncstate so sync should start after node added
  358. glog.V(logger.Info).Infof("syncronisation request sent with %v", self.syncState)
  359. self.syncRequest()
  360. return nil
  361. }
  362. func (self *bzz) sync(state *syncState) error {
  363. // syncer setup
  364. if self.syncer != nil {
  365. return self.protoError(ErrSync, "sync request can only be sent once")
  366. }
  367. cnt := self.dbAccess.counter()
  368. remoteaddr := self.remoteAddr.Addr
  369. start, stop := self.hive.kad.KeyRange(remoteaddr)
  370. // an explicitly received nil syncstate disables syncronisation
  371. if state == nil {
  372. self.syncEnabled = false
  373. glog.V(logger.Warn).Infof("syncronisation disabled for peer %v", self)
  374. state = &syncState{DbSyncState: &storage.DbSyncState{}, Synced: true}
  375. } else {
  376. state.synced = make(chan bool)
  377. state.SessionAt = cnt
  378. if storage.IsZeroKey(state.Stop) && state.Synced {
  379. state.Start = storage.Key(start[:])
  380. state.Stop = storage.Key(stop[:])
  381. }
  382. glog.V(logger.Debug).Infof("syncronisation requested by peer %v at state %v", self, state)
  383. }
  384. var err error
  385. self.syncer, err = newSyncer(
  386. self.requestDb,
  387. storage.Key(remoteaddr[:]),
  388. self.dbAccess,
  389. self.unsyncedKeys, self.store,
  390. self.syncParams, state, func() bool { return self.syncEnabled },
  391. )
  392. if err != nil {
  393. return self.protoError(ErrSync, "%v", err)
  394. }
  395. glog.V(logger.Detail).Infof("syncer set for peer %v", self)
  396. return nil
  397. }
  398. func (self *bzz) String() string {
  399. return self.remoteAddr.String()
  400. }
  401. // repair reported address if IP missing
  402. func (self *bzz) peerAddr(base *peerAddr) *peerAddr {
  403. if base.IP.IsUnspecified() {
  404. host, _, _ := net.SplitHostPort(self.peer.RemoteAddr().String())
  405. base.IP = net.ParseIP(host)
  406. }
  407. return base
  408. }
  409. // returns self advertised node connection info (listening address w enodes)
  410. // IP will get repaired on the other end if missing
  411. // or resolved via ID by discovery at dialout
  412. func (self *bzz) selfAddr() *peerAddr {
  413. id := self.hive.id
  414. host, port, _ := net.SplitHostPort(self.hive.listenAddr())
  415. intport, _ := strconv.Atoi(port)
  416. addr := &peerAddr{
  417. Addr: self.hive.addr,
  418. ID: id[:],
  419. IP: net.ParseIP(host),
  420. Port: uint16(intport),
  421. }
  422. return addr
  423. }
  424. // outgoing messages
  425. // send retrieveRequestMsg
  426. func (self *bzz) retrieve(req *retrieveRequestMsgData) error {
  427. return self.send(retrieveRequestMsg, req)
  428. }
  429. // send storeRequestMsg
  430. func (self *bzz) store(req *storeRequestMsgData) error {
  431. return self.send(storeRequestMsg, req)
  432. }
  433. func (self *bzz) syncRequest() error {
  434. req := &syncRequestMsgData{}
  435. if self.hive.syncEnabled {
  436. glog.V(logger.Debug).Infof("syncronisation request to peer %v at state %v", self, self.syncState)
  437. req.SyncState = self.syncState
  438. }
  439. if self.syncState == nil {
  440. glog.V(logger.Warn).Infof("syncronisation disabled for peer %v at state %v", self, self.syncState)
  441. }
  442. return self.send(syncRequestMsg, req)
  443. }
  444. // queue storeRequestMsg in request db
  445. func (self *bzz) deliveryRequest(reqs []*syncRequest) error {
  446. req := &deliveryRequestMsgData{
  447. Deliver: reqs,
  448. }
  449. return self.send(deliveryRequestMsg, req)
  450. }
  451. // batch of syncRequests to send off
  452. func (self *bzz) unsyncedKeys(reqs []*syncRequest, state *syncState) error {
  453. req := &unsyncedKeysMsgData{
  454. Unsynced: reqs,
  455. State: state,
  456. }
  457. return self.send(unsyncedKeysMsg, req)
  458. }
  459. // send paymentMsg
  460. func (self *bzz) Pay(units int, promise swap.Promise) {
  461. req := &paymentMsgData{uint(units), promise.(*chequebook.Cheque)}
  462. self.payment(req)
  463. }
  464. // send paymentMsg
  465. func (self *bzz) payment(req *paymentMsgData) error {
  466. return self.send(paymentMsg, req)
  467. }
  468. // sends peersMsg
  469. func (self *bzz) peers(req *peersMsgData) error {
  470. return self.send(peersMsg, req)
  471. }
  472. func (self *bzz) protoError(code int, format string, params ...interface{}) (err *errs.Error) {
  473. err = self.errors.New(code, format, params...)
  474. err.Log(glog.V(logger.Info))
  475. return
  476. }
  477. func (self *bzz) protoErrorDisconnect(err *errs.Error) {
  478. err.Log(glog.V(logger.Info))
  479. if err.Fatal() {
  480. self.peer.Disconnect(p2p.DiscSubprotocolError)
  481. }
  482. }
  483. func (self *bzz) send(msg uint64, data interface{}) error {
  484. if self.hive.blockWrite {
  485. return fmt.Errorf("network write blocked")
  486. }
  487. glog.V(logger.Detail).Infof("-> %v: %v (%T) to %v", msg, data, data, self)
  488. err := p2p.Send(self.rw, msg, data)
  489. if err != nil {
  490. self.Drop()
  491. }
  492. return err
  493. }