peer.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package stream
  17. import (
  18. "context"
  19. "errors"
  20. "fmt"
  21. "sync"
  22. "time"
  23. "github.com/ethereum/go-ethereum/metrics"
  24. "github.com/ethereum/go-ethereum/p2p/protocols"
  25. "github.com/ethereum/go-ethereum/swarm/log"
  26. pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
  27. "github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
  28. "github.com/ethereum/go-ethereum/swarm/spancontext"
  29. "github.com/ethereum/go-ethereum/swarm/state"
  30. "github.com/ethereum/go-ethereum/swarm/storage"
  31. "github.com/ethereum/go-ethereum/swarm/tracing"
  32. opentracing "github.com/opentracing/opentracing-go"
  33. )
  34. type notFoundError struct {
  35. t string
  36. s Stream
  37. }
  38. func newNotFoundError(t string, s Stream) *notFoundError {
  39. return &notFoundError{t: t, s: s}
  40. }
  41. func (e *notFoundError) Error() string {
  42. return fmt.Sprintf("%s not found for stream %q", e.t, e.s)
  43. }
  // ErrMaxPeerServers is the error returned once the configured limit of
  // servers for a single peer has been reached.
  // It will be sent in the SubscribeErrorMsg.
  var ErrMaxPeerServers = errors.New("max peer servers")
  47. // Peer is the Peer extension for the streaming protocol
  48. type Peer struct {
  49. *protocols.Peer
  50. streamer *Registry
  51. pq *pq.PriorityQueue
  52. serverMu sync.RWMutex
  53. clientMu sync.RWMutex // protects both clients and clientParams
  54. servers map[Stream]*server
  55. clients map[Stream]*client
  56. // clientParams map keeps required client arguments
  57. // that are set on Registry.Subscribe and used
  58. // on creating a new client in offered hashes handler.
  59. clientParams map[Stream]*clientParams
  60. quit chan struct{}
  61. }
  // WrappedPriorityMsg is the item type pushed onto a Peer's priority queue:
  // it pairs an outgoing message with the context it has to be sent with.
  type WrappedPriorityMsg struct {
  	Context context.Context // context handed to Send when the message is dequeued
  	Msg     interface{}     // protocol message to send
  }
  66. // NewPeer is the constructor for Peer
  67. func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
  68. p := &Peer{
  69. Peer: peer,
  70. pq: pq.New(int(PriorityQueue), PriorityQueueCap),
  71. streamer: streamer,
  72. servers: make(map[Stream]*server),
  73. clients: make(map[Stream]*client),
  74. clientParams: make(map[Stream]*clientParams),
  75. quit: make(chan struct{}),
  76. }
  77. ctx, cancel := context.WithCancel(context.Background())
  78. go p.pq.Run(ctx, func(i interface{}) {
  79. wmsg := i.(WrappedPriorityMsg)
  80. err := p.Send(wmsg.Context, wmsg.Msg)
  81. if err != nil {
  82. log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
  83. p.Drop(err)
  84. }
  85. })
  86. // basic monitoring for pq contention
  87. go func(pq *pq.PriorityQueue) {
  88. ticker := time.NewTicker(5 * time.Second)
  89. defer ticker.Stop()
  90. for {
  91. select {
  92. case <-ticker.C:
  93. var len_maxi int
  94. var cap_maxi int
  95. for k := range pq.Queues {
  96. if len_maxi < len(pq.Queues[k]) {
  97. len_maxi = len(pq.Queues[k])
  98. }
  99. if cap_maxi < cap(pq.Queues[k]) {
  100. cap_maxi = cap(pq.Queues[k])
  101. }
  102. }
  103. metrics.GetOrRegisterGauge(fmt.Sprintf("pq_len_%s", p.ID().TerminalString()), nil).Update(int64(len_maxi))
  104. metrics.GetOrRegisterGauge(fmt.Sprintf("pq_cap_%s", p.ID().TerminalString()), nil).Update(int64(cap_maxi))
  105. case <-p.quit:
  106. return
  107. }
  108. }
  109. }(p.pq)
  110. go func() {
  111. <-p.quit
  112. cancel()
  113. }()
  114. return p
  115. }
  116. // Deliver sends a storeRequestMsg protocol message to the peer
  117. // Depending on the `syncing` parameter we send different message types
  118. func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
  119. var msg interface{}
  120. spanName := "send.chunk.delivery"
  121. //we send different types of messages if delivery is for syncing or retrievals,
  122. //even if handling and content of the message are the same,
  123. //because swap accounting decides which messages need accounting based on the message type
  124. if syncing {
  125. msg = &ChunkDeliveryMsgSyncing{
  126. Addr: chunk.Address(),
  127. SData: chunk.Data(),
  128. }
  129. spanName += ".syncing"
  130. } else {
  131. msg = &ChunkDeliveryMsgRetrieval{
  132. Addr: chunk.Address(),
  133. SData: chunk.Data(),
  134. }
  135. spanName += ".retrieval"
  136. }
  137. ctx = context.WithValue(ctx, "stream_send_tag", nil)
  138. return p.SendPriority(ctx, msg, priority)
  139. }
  140. // SendPriority sends message to the peer using the outgoing priority queue
  141. func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
  142. defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
  143. tracing.StartSaveSpan(ctx)
  144. metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
  145. wmsg := WrappedPriorityMsg{
  146. Context: ctx,
  147. Msg: msg,
  148. }
  149. err := p.pq.Push(wmsg, int(priority))
  150. if err == pq.ErrContention {
  151. log.Warn("dropping peer on priority queue contention", "peer", p.ID())
  152. p.Drop(err)
  153. }
  154. return err
  155. }
  156. // SendOfferedHashes sends OfferedHashesMsg protocol msg
  157. func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
  158. var sp opentracing.Span
  159. ctx, sp := spancontext.StartSpan(
  160. context.TODO(),
  161. "send.offered.hashes",
  162. )
  163. defer sp.Finish()
  164. hashes, from, to, proof, err := s.setNextBatch(f, t)
  165. if err != nil {
  166. return err
  167. }
  168. // true only when quitting
  169. if len(hashes) == 0 {
  170. return nil
  171. }
  172. if proof == nil {
  173. proof = &HandoverProof{
  174. Handover: &Handover{},
  175. }
  176. }
  177. s.currentBatch = hashes
  178. msg := &OfferedHashesMsg{
  179. HandoverProof: proof,
  180. Hashes: hashes,
  181. From: from,
  182. To: to,
  183. Stream: s.stream,
  184. }
  185. log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
  186. ctx = context.WithValue(ctx, "stream_send_tag", "send.offered.hashes")
  187. return p.SendPriority(ctx, msg, s.priority)
  188. }
  189. func (p *Peer) getServer(s Stream) (*server, error) {
  190. p.serverMu.RLock()
  191. defer p.serverMu.RUnlock()
  192. server := p.servers[s]
  193. if server == nil {
  194. return nil, newNotFoundError("server", s)
  195. }
  196. return server, nil
  197. }
  198. func (p *Peer) setServer(s Stream, o Server, priority uint8) (*server, error) {
  199. p.serverMu.Lock()
  200. defer p.serverMu.Unlock()
  201. if p.servers[s] != nil {
  202. return nil, fmt.Errorf("server %s already registered", s)
  203. }
  204. if p.streamer.maxPeerServers > 0 && len(p.servers) >= p.streamer.maxPeerServers {
  205. return nil, ErrMaxPeerServers
  206. }
  207. sessionIndex, err := o.SessionIndex()
  208. if err != nil {
  209. return nil, err
  210. }
  211. os := &server{
  212. Server: o,
  213. stream: s,
  214. priority: priority,
  215. sessionIndex: sessionIndex,
  216. }
  217. p.servers[s] = os
  218. return os, nil
  219. }
  220. func (p *Peer) removeServer(s Stream) error {
  221. p.serverMu.Lock()
  222. defer p.serverMu.Unlock()
  223. server, ok := p.servers[s]
  224. if !ok {
  225. return newNotFoundError("server", s)
  226. }
  227. server.Close()
  228. delete(p.servers, s)
  229. return nil
  230. }
  231. func (p *Peer) getClient(ctx context.Context, s Stream) (c *client, err error) {
  232. var params *clientParams
  233. func() {
  234. p.clientMu.RLock()
  235. defer p.clientMu.RUnlock()
  236. c = p.clients[s]
  237. if c != nil {
  238. return
  239. }
  240. params = p.clientParams[s]
  241. }()
  242. if c != nil {
  243. return c, nil
  244. }
  245. if params != nil {
  246. //debug.PrintStack()
  247. if err := params.waitClient(ctx); err != nil {
  248. return nil, err
  249. }
  250. }
  251. p.clientMu.RLock()
  252. defer p.clientMu.RUnlock()
  253. c = p.clients[s]
  254. if c != nil {
  255. return c, nil
  256. }
  257. return nil, newNotFoundError("client", s)
  258. }
  259. func (p *Peer) getOrSetClient(s Stream, from, to uint64) (c *client, created bool, err error) {
  260. p.clientMu.Lock()
  261. defer p.clientMu.Unlock()
  262. c = p.clients[s]
  263. if c != nil {
  264. return c, false, nil
  265. }
  266. f, err := p.streamer.GetClientFunc(s.Name)
  267. if err != nil {
  268. return nil, false, err
  269. }
  270. is, err := f(p, s.Key, s.Live)
  271. if err != nil {
  272. return nil, false, err
  273. }
  274. cp, err := p.getClientParams(s)
  275. if err != nil {
  276. return nil, false, err
  277. }
  278. defer func() {
  279. if err == nil {
  280. if err := p.removeClientParams(s); err != nil {
  281. log.Error("stream set client: remove client params", "stream", s, "peer", p, "err", err)
  282. }
  283. }
  284. }()
  285. intervalsKey := peerStreamIntervalsKey(p, s)
  286. if s.Live {
  287. // try to find previous history and live intervals and merge live into history
  288. historyKey := peerStreamIntervalsKey(p, NewStream(s.Name, s.Key, false))
  289. historyIntervals := &intervals.Intervals{}
  290. err := p.streamer.intervalsStore.Get(historyKey, historyIntervals)
  291. switch err {
  292. case nil:
  293. liveIntervals := &intervals.Intervals{}
  294. err := p.streamer.intervalsStore.Get(intervalsKey, liveIntervals)
  295. switch err {
  296. case nil:
  297. historyIntervals.Merge(liveIntervals)
  298. if err := p.streamer.intervalsStore.Put(historyKey, historyIntervals); err != nil {
  299. log.Error("stream set client: put history intervals", "stream", s, "peer", p, "err", err)
  300. }
  301. case state.ErrNotFound:
  302. default:
  303. log.Error("stream set client: get live intervals", "stream", s, "peer", p, "err", err)
  304. }
  305. case state.ErrNotFound:
  306. default:
  307. log.Error("stream set client: get history intervals", "stream", s, "peer", p, "err", err)
  308. }
  309. }
  310. if err := p.streamer.intervalsStore.Put(intervalsKey, intervals.NewIntervals(from)); err != nil {
  311. return nil, false, err
  312. }
  313. next := make(chan error, 1)
  314. c = &client{
  315. Client: is,
  316. stream: s,
  317. priority: cp.priority,
  318. to: cp.to,
  319. next: next,
  320. quit: make(chan struct{}),
  321. intervalsStore: p.streamer.intervalsStore,
  322. intervalsKey: intervalsKey,
  323. }
  324. p.clients[s] = c
  325. cp.clientCreated() // unblock all possible getClient calls that are waiting
  326. next <- nil // this is to allow wantedKeysMsg before first batch arrives
  327. return c, true, nil
  328. }
  329. func (p *Peer) removeClient(s Stream) error {
  330. p.clientMu.Lock()
  331. defer p.clientMu.Unlock()
  332. client, ok := p.clients[s]
  333. if !ok {
  334. return newNotFoundError("client", s)
  335. }
  336. client.close()
  337. delete(p.clients, s)
  338. return nil
  339. }
  340. func (p *Peer) setClientParams(s Stream, params *clientParams) error {
  341. p.clientMu.Lock()
  342. defer p.clientMu.Unlock()
  343. if p.clients[s] != nil {
  344. return fmt.Errorf("client %s already exists", s)
  345. }
  346. if p.clientParams[s] != nil {
  347. return fmt.Errorf("client params %s already set", s)
  348. }
  349. p.clientParams[s] = params
  350. return nil
  351. }
  352. func (p *Peer) getClientParams(s Stream) (*clientParams, error) {
  353. params := p.clientParams[s]
  354. if params == nil {
  355. return nil, fmt.Errorf("client params '%v' not provided to peer %v", s, p.ID())
  356. }
  357. return params, nil
  358. }
  359. func (p *Peer) removeClientParams(s Stream) error {
  360. _, ok := p.clientParams[s]
  361. if !ok {
  362. return newNotFoundError("client params", s)
  363. }
  364. delete(p.clientParams, s)
  365. return nil
  366. }
  367. func (p *Peer) close() {
  368. for _, s := range p.servers {
  369. s.Close()
  370. }
  371. }