stream.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package stream
  17. import (
  18. "context"
  19. "errors"
  20. "fmt"
  21. "math"
  22. "reflect"
  23. "sync"
  24. "time"
  25. "github.com/ethereum/go-ethereum/metrics"
  26. "github.com/ethereum/go-ethereum/p2p"
  27. "github.com/ethereum/go-ethereum/p2p/enode"
  28. "github.com/ethereum/go-ethereum/p2p/protocols"
  29. "github.com/ethereum/go-ethereum/rpc"
  30. "github.com/ethereum/go-ethereum/swarm/log"
  31. "github.com/ethereum/go-ethereum/swarm/network"
  32. "github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
  33. "github.com/ethereum/go-ethereum/swarm/state"
  34. "github.com/ethereum/go-ethereum/swarm/storage"
  35. )
  36. const (
  37. Low uint8 = iota
  38. Mid
  39. High
  40. Top
  41. PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top
  42. PriorityQueueCap = 4096 // queue capacity
  43. HashSize = 32
  44. )
  45. // Enumerate options for syncing and retrieval
  46. type SyncingOption int
  47. type RetrievalOption int
  48. // Syncing options
  49. const (
  50. // Syncing disabled
  51. SyncingDisabled SyncingOption = iota
  52. // Register the client and the server but not subscribe
  53. SyncingRegisterOnly
  54. // Both client and server funcs are registered, subscribe sent automatically
  55. SyncingAutoSubscribe
  56. )
  57. const (
  58. // Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
  59. RetrievalDisabled RetrievalOption = iota
  60. // Only the client side of the retrieve request is registered.
  61. // (light nodes do not serve retrieve requests)
  62. // once the client is registered, subscription to retrieve request stream is always sent
  63. RetrievalClientOnly
  64. // Both client and server funcs are registered, subscribe sent automatically
  65. RetrievalEnabled
  66. )
  67. // subscriptionFunc is used to determine what to do in order to perform subscriptions
  68. // usually we would start to really subscribe to nodes, but for tests other functionality may be needed
  69. // (see TestRequestPeerSubscriptions in streamer_test.go)
  70. var subscriptionFunc = doRequestSubscription
  71. // Registry registry for outgoing and incoming streamer constructors
  72. type Registry struct {
  73. addr enode.ID
  74. api *API
  75. skipCheck bool
  76. clientMu sync.RWMutex
  77. serverMu sync.RWMutex
  78. peersMu sync.RWMutex
  79. serverFuncs map[string]func(*Peer, string, bool) (Server, error)
  80. clientFuncs map[string]func(*Peer, string, bool) (Client, error)
  81. peers map[enode.ID]*Peer
  82. delivery *Delivery
  83. intervalsStore state.Store
  84. autoRetrieval bool // automatically subscribe to retrieve request stream
  85. maxPeerServers int
  86. spec *protocols.Spec //this protocol's spec
  87. balance protocols.Balance //implements protocols.Balance, for accounting
  88. prices protocols.Prices //implements protocols.Prices, provides prices to accounting
  89. quit chan struct{} // terminates registry goroutines
  90. }
// RegistryOptions holds optional values for NewRegistry constructor.
// NewRegistry accepts a nil *RegistryOptions and treats it as the zero value.
type RegistryOptions struct {
	// SkipCheck is copied verbatim into the Registry by NewRegistry.
	// NOTE(review): its effect is implemented outside this file — confirm.
	SkipCheck bool
	Syncing   SyncingOption   // Defines syncing behavior
	Retrieval RetrievalOption // Defines retrieval behavior
	// SyncUpdateDelay is the wait before the initial sync subscriptions and
	// the quiet period required before sync subscriptions are updated.
	// NewRegistry uses 15 seconds when the value is not positive.
	SyncUpdateDelay time.Duration
	MaxPeerServers  int // The limit of servers for each peer in registry
}
  99. // NewRegistry is Streamer constructor
  100. func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry {
  101. if options == nil {
  102. options = &RegistryOptions{}
  103. }
  104. if options.SyncUpdateDelay <= 0 {
  105. options.SyncUpdateDelay = 15 * time.Second
  106. }
  107. // check if retrieval has been disabled
  108. retrieval := options.Retrieval != RetrievalDisabled
  109. quit := make(chan struct{})
  110. streamer := &Registry{
  111. addr: localID,
  112. skipCheck: options.SkipCheck,
  113. serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)),
  114. clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)),
  115. peers: make(map[enode.ID]*Peer),
  116. delivery: delivery,
  117. intervalsStore: intervalsStore,
  118. autoRetrieval: retrieval,
  119. maxPeerServers: options.MaxPeerServers,
  120. balance: balance,
  121. quit: quit,
  122. }
  123. streamer.setupSpec()
  124. streamer.api = NewAPI(streamer)
  125. delivery.getPeer = streamer.getPeer
  126. // if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only)
  127. if options.Retrieval == RetrievalEnabled {
  128. streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) {
  129. if !live {
  130. return nil, errors.New("only live retrieval requests supported")
  131. }
  132. return NewSwarmChunkServer(delivery.chunkStore), nil
  133. })
  134. }
  135. // if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests)
  136. if options.Retrieval != RetrievalDisabled {
  137. streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
  138. return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
  139. })
  140. }
  141. // If syncing is not disabled, the syncing functions are registered (both client and server)
  142. if options.Syncing != SyncingDisabled {
  143. RegisterSwarmSyncerServer(streamer, syncChunkStore)
  144. RegisterSwarmSyncerClient(streamer, syncChunkStore)
  145. }
  146. // if syncing is set to automatically subscribe to the syncing stream, start the subscription process
  147. if options.Syncing == SyncingAutoSubscribe {
  148. // latestIntC function ensures that
  149. // - receiving from the in chan is not blocked by processing inside the for loop
  150. // - the latest int value is delivered to the loop after the processing is done
  151. // In context of NeighbourhoodDepthC:
  152. // after the syncing is done updating inside the loop, we do not need to update on the intermediate
  153. // depth changes, only to the latest one
  154. latestIntC := func(in <-chan int) <-chan int {
  155. out := make(chan int, 1)
  156. go func() {
  157. defer close(out)
  158. for {
  159. select {
  160. case i, ok := <-in:
  161. if !ok {
  162. return
  163. }
  164. select {
  165. case <-out:
  166. default:
  167. }
  168. out <- i
  169. case <-quit:
  170. return
  171. }
  172. }
  173. }()
  174. return out
  175. }
  176. kad := streamer.delivery.kad
  177. // get notification channels from Kademlia before returning
  178. // from this function to avoid race with Close method and
  179. // the goroutine created below
  180. depthC := latestIntC(kad.NeighbourhoodDepthC())
  181. addressBookSizeC := latestIntC(kad.AddrCountC())
  182. go func() {
  183. // wait for kademlia table to be healthy
  184. // but return if Registry is closed before
  185. select {
  186. case <-time.After(options.SyncUpdateDelay):
  187. case <-quit:
  188. return
  189. }
  190. // initial requests for syncing subscription to peers
  191. streamer.updateSyncing()
  192. for depth := range depthC {
  193. log.Debug("Kademlia neighbourhood depth change", "depth", depth)
  194. // Prevent too early sync subscriptions by waiting until there are no
  195. // new peers connecting. Sync streams updating will be done after no
  196. // peers are connected for at least SyncUpdateDelay period.
  197. timer := time.NewTimer(options.SyncUpdateDelay)
  198. // Hard limit to sync update delay, preventing long delays
  199. // on a very dynamic network
  200. maxTimer := time.NewTimer(3 * time.Minute)
  201. loop:
  202. for {
  203. select {
  204. case <-maxTimer.C:
  205. // force syncing update when a hard timeout is reached
  206. log.Trace("Sync subscriptions update on hard timeout")
  207. // request for syncing subscription to new peers
  208. streamer.updateSyncing()
  209. break loop
  210. case <-timer.C:
  211. // start syncing as no new peers has been added to kademlia
  212. // for some time
  213. log.Trace("Sync subscriptions update")
  214. // request for syncing subscription to new peers
  215. streamer.updateSyncing()
  216. break loop
  217. case size := <-addressBookSizeC:
  218. log.Trace("Kademlia address book size changed on depth change", "size", size)
  219. // new peers has been added to kademlia,
  220. // reset the timer to prevent early sync subscriptions
  221. if !timer.Stop() {
  222. <-timer.C
  223. }
  224. timer.Reset(options.SyncUpdateDelay)
  225. case <-quit:
  226. break loop
  227. }
  228. }
  229. timer.Stop()
  230. maxTimer.Stop()
  231. }
  232. }()
  233. }
  234. return streamer
  235. }
  236. // This is an accounted protocol, therefore we need to provide a pricing Hook to the spec
  237. // For simulations to be able to run multiple nodes and not override the hook's balance,
  238. // we need to construct a spec instance per node instance
  239. func (r *Registry) setupSpec() {
  240. // first create the "bare" spec
  241. r.createSpec()
  242. // now create the pricing object
  243. r.createPriceOracle()
  244. // if balance is nil, this node has been started without swap support (swapEnabled flag is false)
  245. if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() {
  246. // swap is enabled, so setup the hook
  247. r.spec.Hook = protocols.NewAccounting(r.balance, r.prices)
  248. }
  249. }
  250. // RegisterClient registers an incoming streamer constructor
  251. func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error)) {
  252. r.clientMu.Lock()
  253. defer r.clientMu.Unlock()
  254. r.clientFuncs[stream] = f
  255. }
  256. // RegisterServer registers an outgoing streamer constructor
  257. func (r *Registry) RegisterServerFunc(stream string, f func(*Peer, string, bool) (Server, error)) {
  258. r.serverMu.Lock()
  259. defer r.serverMu.Unlock()
  260. r.serverFuncs[stream] = f
  261. }
  262. // GetClient accessor for incoming streamer constructors
  263. func (r *Registry) GetClientFunc(stream string) (func(*Peer, string, bool) (Client, error), error) {
  264. r.clientMu.RLock()
  265. defer r.clientMu.RUnlock()
  266. f := r.clientFuncs[stream]
  267. if f == nil {
  268. return nil, fmt.Errorf("stream %v not registered", stream)
  269. }
  270. return f, nil
  271. }
  272. // GetServer accessor for incoming streamer constructors
  273. func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Server, error), error) {
  274. r.serverMu.RLock()
  275. defer r.serverMu.RUnlock()
  276. f := r.serverFuncs[stream]
  277. if f == nil {
  278. return nil, fmt.Errorf("stream %v not registered", stream)
  279. }
  280. return f, nil
  281. }
  282. func (r *Registry) RequestSubscription(peerId enode.ID, s Stream, h *Range, prio uint8) error {
  283. // check if the stream is registered
  284. if _, err := r.GetServerFunc(s.Name); err != nil {
  285. return err
  286. }
  287. peer := r.getPeer(peerId)
  288. if peer == nil {
  289. return fmt.Errorf("peer not found %v", peerId)
  290. }
  291. if _, err := peer.getServer(s); err != nil {
  292. if e, ok := err.(*notFoundError); ok && e.t == "server" {
  293. // request subscription only if the server for this stream is not created
  294. log.Debug("RequestSubscription ", "peer", peerId, "stream", s, "history", h)
  295. return peer.Send(context.TODO(), &RequestSubscriptionMsg{
  296. Stream: s,
  297. History: h,
  298. Priority: prio,
  299. })
  300. }
  301. return err
  302. }
  303. log.Trace("RequestSubscription: already subscribed", "peer", peerId, "stream", s, "history", h)
  304. return nil
  305. }
  306. // Subscribe initiates the streamer
  307. func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8) error {
  308. // check if the stream is registered
  309. if _, err := r.GetClientFunc(s.Name); err != nil {
  310. return err
  311. }
  312. peer := r.getPeer(peerId)
  313. if peer == nil {
  314. return fmt.Errorf("peer not found %v", peerId)
  315. }
  316. var to uint64
  317. if !s.Live && h != nil {
  318. to = h.To
  319. }
  320. err := peer.setClientParams(s, newClientParams(priority, to))
  321. if err != nil {
  322. return err
  323. }
  324. if s.Live && h != nil {
  325. if err := peer.setClientParams(
  326. getHistoryStream(s),
  327. newClientParams(getHistoryPriority(priority), h.To),
  328. ); err != nil {
  329. return err
  330. }
  331. }
  332. msg := &SubscribeMsg{
  333. Stream: s,
  334. History: h,
  335. Priority: priority,
  336. }
  337. log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h)
  338. return peer.SendPriority(context.TODO(), msg, priority)
  339. }
  340. func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error {
  341. peer := r.getPeer(peerId)
  342. if peer == nil {
  343. return fmt.Errorf("peer not found %v", peerId)
  344. }
  345. msg := &UnsubscribeMsg{
  346. Stream: s,
  347. }
  348. log.Debug("Unsubscribe ", "peer", peerId, "stream", s)
  349. if err := peer.Send(context.TODO(), msg); err != nil {
  350. return err
  351. }
  352. return peer.removeClient(s)
  353. }
  354. // Quit sends the QuitMsg to the peer to remove the
  355. // stream peer client and terminate the streaming.
  356. func (r *Registry) Quit(peerId enode.ID, s Stream) error {
  357. peer := r.getPeer(peerId)
  358. if peer == nil {
  359. log.Debug("stream quit: peer not found", "peer", peerId, "stream", s)
  360. // if the peer is not found, abort the request
  361. return nil
  362. }
  363. msg := &QuitMsg{
  364. Stream: s,
  365. }
  366. log.Debug("Quit ", "peer", peerId, "stream", s)
  367. return peer.Send(context.TODO(), msg)
  368. }
  369. func (r *Registry) Close() error {
  370. // Stop sending neighborhood depth change and address count
  371. // change from Kademlia that were initiated in NewRegistry constructor.
  372. r.delivery.kad.CloseNeighbourhoodDepthC()
  373. r.delivery.kad.CloseAddrCountC()
  374. close(r.quit)
  375. return r.intervalsStore.Close()
  376. }
  377. func (r *Registry) getPeer(peerId enode.ID) *Peer {
  378. r.peersMu.RLock()
  379. defer r.peersMu.RUnlock()
  380. return r.peers[peerId]
  381. }
  382. func (r *Registry) setPeer(peer *Peer) {
  383. r.peersMu.Lock()
  384. r.peers[peer.ID()] = peer
  385. metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers)))
  386. r.peersMu.Unlock()
  387. }
  388. func (r *Registry) deletePeer(peer *Peer) {
  389. r.peersMu.Lock()
  390. delete(r.peers, peer.ID())
  391. metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers)))
  392. r.peersMu.Unlock()
  393. }
  394. func (r *Registry) peersCount() (c int) {
  395. r.peersMu.Lock()
  396. c = len(r.peers)
  397. r.peersMu.Unlock()
  398. return
  399. }
  400. // Run protocol run function
  401. func (r *Registry) Run(p *network.BzzPeer) error {
  402. sp := NewPeer(p.Peer, r)
  403. r.setPeer(sp)
  404. defer r.deletePeer(sp)
  405. defer close(sp.quit)
  406. defer sp.close()
  407. if r.autoRetrieval && !p.LightNode {
  408. err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top)
  409. if err != nil {
  410. return err
  411. }
  412. }
  413. return sp.Run(sp.HandleMsg)
  414. }
  415. // updateSyncing subscribes to SYNC streams by iterating over the
  416. // kademlia connections and bins. If there are existing SYNC streams
  417. // and they are no longer required after iteration, request to Quit
  418. // them will be send to appropriate peers.
  419. func (r *Registry) updateSyncing() {
  420. kad := r.delivery.kad
  421. // map of all SYNC streams for all peers
  422. // used at the and of the function to remove servers
  423. // that are not needed anymore
  424. subs := make(map[enode.ID]map[Stream]struct{})
  425. r.peersMu.RLock()
  426. for id, peer := range r.peers {
  427. peer.serverMu.RLock()
  428. for stream := range peer.servers {
  429. if stream.Name == "SYNC" {
  430. if _, ok := subs[id]; !ok {
  431. subs[id] = make(map[Stream]struct{})
  432. }
  433. subs[id][stream] = struct{}{}
  434. }
  435. }
  436. peer.serverMu.RUnlock()
  437. }
  438. r.peersMu.RUnlock()
  439. // start requesting subscriptions from peers
  440. r.requestPeerSubscriptions(kad, subs)
  441. // remove SYNC servers that do not need to be subscribed
  442. for id, streams := range subs {
  443. if len(streams) == 0 {
  444. continue
  445. }
  446. peer := r.getPeer(id)
  447. if peer == nil {
  448. continue
  449. }
  450. for stream := range streams {
  451. log.Debug("Remove sync server", "peer", id, "stream", stream)
  452. err := r.Quit(peer.ID(), stream)
  453. if err != nil && err != p2p.ErrShuttingDown {
  454. log.Error("quit", "err", err, "peer", peer.ID(), "stream", stream)
  455. }
  456. }
  457. }
  458. }
  459. // requestPeerSubscriptions calls on each live peer in the kademlia table
  460. // and sends a `RequestSubscription` to peers according to their bin
  461. // and their relationship with kademlia's depth.
  462. // Also check `TestRequestPeerSubscriptions` in order to understand the
  463. // expected behavior.
  464. // The function expects:
  465. // * the kademlia
  466. // * a map of subscriptions
  467. // * the actual function to subscribe
  468. // (in case of the test, it doesn't do real subscriptions)
  469. func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enode.ID]map[Stream]struct{}) {
  470. var startPo int
  471. var endPo int
  472. var ok bool
  473. // kademlia's depth
  474. kadDepth := kad.NeighbourhoodDepth()
  475. // request subscriptions for all nodes and bins
  476. // nil as base takes the node's base; we need to pass 255 as `EachConn` runs
  477. // from deepest bins backwards
  478. kad.EachConn(nil, 255, func(p *network.Peer, po int) bool {
  479. // nodes that do not provide stream protocol
  480. // should not be subscribed, e.g. bootnodes
  481. if !p.HasCap("stream") {
  482. return true
  483. }
  484. //if the peer's bin is shallower than the kademlia depth,
  485. //only the peer's bin should be subscribed
  486. if po < kadDepth {
  487. startPo = po
  488. endPo = po
  489. } else {
  490. //if the peer's bin is equal or deeper than the kademlia depth,
  491. //each bin from the depth up to k.MaxProxDisplay should be subscribed
  492. startPo = kadDepth
  493. endPo = kad.MaxProxDisplay
  494. }
  495. for bin := startPo; bin <= endPo; bin++ {
  496. //do the actual subscription
  497. ok = subscriptionFunc(r, p, uint8(bin), subs)
  498. }
  499. return ok
  500. })
  501. }
  502. // doRequestSubscription sends the actual RequestSubscription to the peer
  503. func doRequestSubscription(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
  504. log.Debug("Requesting subscription by registry:", "registry", r.addr, "peer", p.ID(), "bin", bin)
  505. // bin is always less then 256 and it is safe to convert it to type uint8
  506. stream := NewStream("SYNC", FormatSyncBinKey(bin), true)
  507. if streams, ok := subs[p.ID()]; ok {
  508. // delete live and history streams from the map, so that it won't be removed with a Quit request
  509. delete(streams, stream)
  510. delete(streams, getHistoryStream(stream))
  511. }
  512. err := r.RequestSubscription(p.ID(), stream, NewRange(0, 0), High)
  513. if err != nil {
  514. log.Debug("Request subscription", "err", err, "peer", p.ID(), "stream", stream)
  515. return false
  516. }
  517. return true
  518. }
  519. func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  520. peer := protocols.NewPeer(p, rw, r.spec)
  521. bp := network.NewBzzPeer(peer)
  522. np := network.NewPeer(bp, r.delivery.kad)
  523. r.delivery.kad.On(np)
  524. defer r.delivery.kad.Off(np)
  525. return r.Run(bp)
  526. }
  527. // HandleMsg is the message handler that delegates incoming messages
  528. func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
  529. select {
  530. case <-p.streamer.quit:
  531. log.Trace("message received after the streamer is closed", "peer", p.ID())
  532. // return without an error since streamer is closed and
  533. // no messages should be handled as other subcomponents like
  534. // storage leveldb may be closed
  535. return nil
  536. default:
  537. }
  538. switch msg := msg.(type) {
  539. case *SubscribeMsg:
  540. return p.handleSubscribeMsg(ctx, msg)
  541. case *SubscribeErrorMsg:
  542. return p.handleSubscribeErrorMsg(msg)
  543. case *UnsubscribeMsg:
  544. return p.handleUnsubscribeMsg(msg)
  545. case *OfferedHashesMsg:
  546. return p.handleOfferedHashesMsg(ctx, msg)
  547. case *TakeoverProofMsg:
  548. return p.handleTakeoverProofMsg(ctx, msg)
  549. case *WantedHashesMsg:
  550. return p.handleWantedHashesMsg(ctx, msg)
  551. case *ChunkDeliveryMsgRetrieval:
  552. // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
  553. return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
  554. case *ChunkDeliveryMsgSyncing:
  555. // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
  556. return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
  557. case *RetrieveRequestMsg:
  558. return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg)
  559. case *RequestSubscriptionMsg:
  560. return p.handleRequestSubscription(ctx, msg)
  561. case *QuitMsg:
  562. return p.handleQuitMsg(msg)
  563. default:
  564. return fmt.Errorf("unknown message type: %T", msg)
  565. }
  566. }
  567. type server struct {
  568. Server
  569. stream Stream
  570. priority uint8
  571. currentBatch []byte
  572. sessionIndex uint64
  573. }
  574. // setNextBatch adjusts passed interval based on session index and whether
  575. // stream is live or history. It calls Server SetNextBatch with adjusted
  576. // interval and returns batch hashes and their interval.
  577. func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
  578. if s.stream.Live {
  579. if from == 0 {
  580. from = s.sessionIndex
  581. }
  582. if to <= from || from >= s.sessionIndex {
  583. to = math.MaxUint64
  584. }
  585. } else {
  586. if (to < from && to != 0) || from > s.sessionIndex {
  587. return nil, 0, 0, nil, nil
  588. }
  589. if to == 0 || to > s.sessionIndex {
  590. to = s.sessionIndex
  591. }
  592. }
  593. return s.SetNextBatch(from, to)
  594. }
// Server interface for outgoing peer Streamer
type Server interface {
	// SessionIndex is called when a server is initialized
	// to get the current cursor state of the stream data.
	// Based on this index, live and history stream intervals
	// will be adjusted before calling SetNextBatch.
	SessionIndex() (uint64, error)
	// SetNextBatch is called with the adjusted interval and returns the
	// batch hashes, the interval they cover and an optional handover proof.
	SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
	// GetData presumably returns the data stored for the given hash/key —
	// NOTE(review): confirm against the implementations.
	GetData(context.Context, []byte) ([]byte, error)
	// Close releases the server; NOTE(review): exact semantics are defined
	// by the implementations.
	Close()
}
// client wraps a Client implementation with the per-subscription state kept
// for an incoming stream.
type client struct {
	Client
	stream   Stream
	priority uint8 // priority used when sending takeover proofs
	// sessionAt is the boundary used by nextBatch: requests starting at or
	// after it are not looked up in the intervals store.
	sessionAt uint64
	// to is the upper bound of the subscription; 0 means no upper bound.
	to   uint64
	next chan error
	quit chan struct{} // closed by close()
	// intervalsKey and intervalsStore locate the persisted intervals that
	// AddInterval and NextInterval read and update.
	intervalsKey   string
	intervalsStore state.Store
}
  617. func peerStreamIntervalsKey(p *Peer, s Stream) string {
  618. return p.ID().String() + s.String()
  619. }
  620. func (c *client) AddInterval(start, end uint64) (err error) {
  621. i := &intervals.Intervals{}
  622. if err = c.intervalsStore.Get(c.intervalsKey, i); err != nil {
  623. return err
  624. }
  625. i.Add(start, end)
  626. return c.intervalsStore.Put(c.intervalsKey, i)
  627. }
  628. func (c *client) NextInterval() (start, end uint64, err error) {
  629. i := &intervals.Intervals{}
  630. err = c.intervalsStore.Get(c.intervalsKey, i)
  631. if err != nil {
  632. return 0, 0, err
  633. }
  634. start, end = i.Next()
  635. return start, end, nil
  636. }
// Client interface for incoming peer Streamer
type Client interface {
	// NeedData is given a hash/key; NOTE(review): presumably a nil result
	// means the data is not needed and a non-nil func waits for its arrival
	// — confirm against the implementations.
	NeedData(context.Context, []byte) func(context.Context) error
	// BatchDone is called by client.batchDone with the stream, the batch
	// start, the batch hashes and the root from the offered hashes message.
	// A nil result means no takeover proof is produced and the interval is
	// just recorded; otherwise the returned func yields the proof to send.
	BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
	// Close is called when the client is closed.
	Close()
}
  643. func (c *client) nextBatch(from uint64) (nextFrom uint64, nextTo uint64) {
  644. if c.to > 0 && from >= c.to {
  645. return 0, 0
  646. }
  647. if c.stream.Live {
  648. return from, 0
  649. } else if from >= c.sessionAt {
  650. if c.to > 0 {
  651. return from, c.to
  652. }
  653. return from, math.MaxUint64
  654. }
  655. nextFrom, nextTo, err := c.NextInterval()
  656. if err != nil {
  657. log.Error("next intervals", "stream", c.stream)
  658. return
  659. }
  660. if nextTo > c.to {
  661. nextTo = c.to
  662. }
  663. if nextTo == 0 {
  664. nextTo = c.sessionAt
  665. }
  666. return
  667. }
  668. func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error {
  669. if tf := c.BatchDone(req.Stream, req.From, hashes, req.Root); tf != nil {
  670. tp, err := tf()
  671. if err != nil {
  672. return err
  673. }
  674. if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil {
  675. return err
  676. }
  677. if c.to > 0 && tp.Takeover.End >= c.to {
  678. return p.streamer.Unsubscribe(p.Peer.ID(), req.Stream)
  679. }
  680. return nil
  681. }
  682. return c.AddInterval(req.From, req.To)
  683. }
  684. func (c *client) close() {
  685. select {
  686. case <-c.quit:
  687. default:
  688. close(c.quit)
  689. }
  690. c.Close()
  691. }
  692. // clientParams store parameters for the new client
  693. // between a subscription and initial offered hashes request handling.
  694. type clientParams struct {
  695. priority uint8
  696. to uint64
  697. // signal when the client is created
  698. clientCreatedC chan struct{}
  699. }
  700. func newClientParams(priority uint8, to uint64) *clientParams {
  701. return &clientParams{
  702. priority: priority,
  703. to: to,
  704. clientCreatedC: make(chan struct{}),
  705. }
  706. }
  707. func (c *clientParams) waitClient(ctx context.Context) error {
  708. select {
  709. case <-ctx.Done():
  710. return ctx.Err()
  711. case <-c.clientCreatedC:
  712. return nil
  713. }
  714. }
  715. func (c *clientParams) clientCreated() {
  716. close(c.clientCreatedC)
  717. }
// GetSpec returns the streamer spec to callers.
// This used to be a global variable but for simulations with
// multiple nodes its fields (notably the Hook) would be overwritten.
// The returned pointer is the registry's own spec, not a copy.
func (r *Registry) GetSpec() *protocols.Spec {
	return r.spec
}
  724. func (r *Registry) createSpec() {
  725. // Spec is the spec of the streamer protocol
  726. var spec = &protocols.Spec{
  727. Name: "stream",
  728. Version: 8,
  729. MaxMsgSize: 10 * 1024 * 1024,
  730. Messages: []interface{}{
  731. UnsubscribeMsg{},
  732. OfferedHashesMsg{},
  733. WantedHashesMsg{},
  734. TakeoverProofMsg{},
  735. SubscribeMsg{},
  736. RetrieveRequestMsg{},
  737. ChunkDeliveryMsgRetrieval{},
  738. SubscribeErrorMsg{},
  739. RequestSubscriptionMsg{},
  740. QuitMsg{},
  741. ChunkDeliveryMsgSyncing{},
  742. },
  743. }
  744. r.spec = spec
  745. }
  746. // An accountable message needs some meta information attached to it
  747. // in order to evaluate the correct price
  748. type StreamerPrices struct {
  749. priceMatrix map[reflect.Type]*protocols.Price
  750. registry *Registry
  751. }
  752. // Price implements the accounting interface and returns the price for a specific message
  753. func (sp *StreamerPrices) Price(msg interface{}) *protocols.Price {
  754. t := reflect.TypeOf(msg).Elem()
  755. return sp.priceMatrix[t]
  756. }
  757. // Instead of hardcoding the price, get it
  758. // through a function - it could be quite complex in the future
  759. func (sp *StreamerPrices) getRetrieveRequestMsgPrice() uint64 {
  760. return uint64(1)
  761. }
  762. // Instead of hardcoding the price, get it
  763. // through a function - it could be quite complex in the future
  764. func (sp *StreamerPrices) getChunkDeliveryMsgRetrievalPrice() uint64 {
  765. return uint64(1)
  766. }
  767. // createPriceOracle sets up a matrix which can be queried to get
  768. // the price for a message via the Price method
  769. func (r *Registry) createPriceOracle() {
  770. sp := &StreamerPrices{
  771. registry: r,
  772. }
  773. sp.priceMatrix = map[reflect.Type]*protocols.Price{
  774. reflect.TypeOf(ChunkDeliveryMsgRetrieval{}): {
  775. Value: sp.getChunkDeliveryMsgRetrievalPrice(), // arbitrary price for now
  776. PerByte: true,
  777. Payer: protocols.Receiver,
  778. },
  779. reflect.TypeOf(RetrieveRequestMsg{}): {
  780. Value: sp.getRetrieveRequestMsgPrice(), // arbitrary price for now
  781. PerByte: false,
  782. Payer: protocols.Sender,
  783. },
  784. }
  785. r.prices = sp
  786. }
  787. func (r *Registry) Protocols() []p2p.Protocol {
  788. return []p2p.Protocol{
  789. {
  790. Name: r.spec.Name,
  791. Version: r.spec.Version,
  792. Length: r.spec.Length(),
  793. Run: r.runProtocol,
  794. },
  795. }
  796. }
  797. func (r *Registry) APIs() []rpc.API {
  798. return []rpc.API{
  799. {
  800. Namespace: "stream",
  801. Version: "3.0",
  802. Service: r.api,
  803. Public: false,
  804. },
  805. }
  806. }
// Start only logs that the streamer has started; the server argument is not
// used. It always returns nil.
func (r *Registry) Start(server *p2p.Server) error {
	log.Info("Streamer started")
	return nil
}
// Stop is a no-op that always returns nil. Registry resources are released
// by Close, which is not called from here.
func (r *Registry) Stop() error {
	return nil
}
  814. type Range struct {
  815. From, To uint64
  816. }
  817. func NewRange(from, to uint64) *Range {
  818. return &Range{
  819. From: from,
  820. To: to,
  821. }
  822. }
  823. func (r *Range) String() string {
  824. return fmt.Sprintf("%v-%v", r.From, r.To)
  825. }
  826. func getHistoryPriority(priority uint8) uint8 {
  827. if priority == 0 {
  828. return 0
  829. }
  830. return priority - 1
  831. }
  832. func getHistoryStream(s Stream) Stream {
  833. return NewStream(s.Name, s.Key, false)
  834. }
  835. type API struct {
  836. streamer *Registry
  837. }
  838. func NewAPI(r *Registry) *API {
  839. return &API{
  840. streamer: r,
  841. }
  842. }
  843. func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, priority uint8) error {
  844. return api.streamer.Subscribe(peerId, s, history, priority)
  845. }
  846. func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error {
  847. return api.streamer.Unsubscribe(peerId, s)
  848. }
  849. /*
  850. GetPeerSubscriptions is a API function which allows to query a peer for stream subscriptions it has.
  851. It can be called via RPC.
  852. It returns a map of node IDs with an array of string representations of Stream objects.
  853. */
  854. func (api *API) GetPeerSubscriptions() map[string][]string {
  855. //create the empty map
  856. pstreams := make(map[string][]string)
  857. //iterate all streamer peers
  858. api.streamer.peersMu.RLock()
  859. defer api.streamer.peersMu.RUnlock()
  860. for id, p := range api.streamer.peers {
  861. var streams []string
  862. //every peer has a map of stream servers
  863. //every stream server represents a subscription
  864. p.serverMu.RLock()
  865. for s := range p.servers {
  866. //append the string representation of the stream
  867. //to the list for this peer
  868. streams = append(streams, s.String())
  869. }
  870. p.serverMu.RUnlock()
  871. //set the array of stream servers to the map
  872. pstreams[id.String()] = streams
  873. }
  874. return pstreams
  875. }