peer.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package stream
  17. import (
  18. "context"
  19. "errors"
  20. "fmt"
  21. "sync"
  22. "time"
  23. "github.com/ethereum/go-ethereum/metrics"
  24. "github.com/ethereum/go-ethereum/p2p/protocols"
  25. "github.com/ethereum/go-ethereum/swarm/log"
  26. pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
  27. "github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
  28. "github.com/ethereum/go-ethereum/swarm/spancontext"
  29. "github.com/ethereum/go-ethereum/swarm/state"
  30. "github.com/ethereum/go-ethereum/swarm/storage"
  31. "github.com/ethereum/go-ethereum/swarm/tracing"
  32. opentracing "github.com/opentracing/opentracing-go"
  33. )
  34. type notFoundError struct {
  35. t string
  36. s Stream
  37. }
  38. func newNotFoundError(t string, s Stream) *notFoundError {
  39. return &notFoundError{t: t, s: s}
  40. }
  41. func (e *notFoundError) Error() string {
  42. return fmt.Sprintf("%s not found for stream %q", e.t, e.s)
  43. }
  44. // ErrMaxPeerServers will be returned if peer server limit is reached.
  45. // It will be sent in the SubscribeErrorMsg.
  46. var ErrMaxPeerServers = errors.New("max peer servers")
  47. // Peer is the Peer extension for the streaming protocol
  48. type Peer struct {
  49. *protocols.Peer
  50. streamer *Registry
  51. pq *pq.PriorityQueue
  52. serverMu sync.RWMutex
  53. clientMu sync.RWMutex // protects both clients and clientParams
  54. servers map[Stream]*server
  55. clients map[Stream]*client
  56. // clientParams map keeps required client arguments
  57. // that are set on Registry.Subscribe and used
  58. // on creating a new client in offered hashes handler.
  59. clientParams map[Stream]*clientParams
  60. quit chan struct{}
  61. }
  62. type WrappedPriorityMsg struct {
  63. Context context.Context
  64. Msg interface{}
  65. }
  66. // NewPeer is the constructor for Peer
  67. func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
  68. p := &Peer{
  69. Peer: peer,
  70. pq: pq.New(int(PriorityQueue), PriorityQueueCap),
  71. streamer: streamer,
  72. servers: make(map[Stream]*server),
  73. clients: make(map[Stream]*client),
  74. clientParams: make(map[Stream]*clientParams),
  75. quit: make(chan struct{}),
  76. }
  77. ctx, cancel := context.WithCancel(context.Background())
  78. go p.pq.Run(ctx, func(i interface{}) {
  79. wmsg := i.(WrappedPriorityMsg)
  80. err := p.Send(wmsg.Context, wmsg.Msg)
  81. if err != nil {
  82. log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
  83. p.Drop()
  84. }
  85. })
  86. // basic monitoring for pq contention
  87. go func(pq *pq.PriorityQueue) {
  88. ticker := time.NewTicker(5 * time.Second)
  89. defer ticker.Stop()
  90. for {
  91. select {
  92. case <-ticker.C:
  93. var lenMaxi int
  94. var capMaxi int
  95. for k := range pq.Queues {
  96. if lenMaxi < len(pq.Queues[k]) {
  97. lenMaxi = len(pq.Queues[k])
  98. }
  99. if capMaxi < cap(pq.Queues[k]) {
  100. capMaxi = cap(pq.Queues[k])
  101. }
  102. }
  103. metrics.GetOrRegisterGauge(fmt.Sprintf("pq_len_%s", p.ID().TerminalString()), nil).Update(int64(lenMaxi))
  104. metrics.GetOrRegisterGauge(fmt.Sprintf("pq_cap_%s", p.ID().TerminalString()), nil).Update(int64(capMaxi))
  105. case <-p.quit:
  106. return
  107. }
  108. }
  109. }(p.pq)
  110. go func() {
  111. <-p.quit
  112. cancel()
  113. }()
  114. return p
  115. }
  116. // Deliver sends a storeRequestMsg protocol message to the peer
  117. // Depending on the `syncing` parameter we send different message types
  118. func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error {
  119. var msg interface{}
  120. metrics.GetOrRegisterCounter("peer.deliver", nil).Inc(1)
  121. //we send different types of messages if delivery is for syncing or retrievals,
  122. //even if handling and content of the message are the same,
  123. //because swap accounting decides which messages need accounting based on the message type
  124. if syncing {
  125. msg = &ChunkDeliveryMsgSyncing{
  126. Addr: chunk.Address(),
  127. SData: chunk.Data(),
  128. }
  129. } else {
  130. msg = &ChunkDeliveryMsgRetrieval{
  131. Addr: chunk.Address(),
  132. SData: chunk.Data(),
  133. }
  134. }
  135. return p.SendPriority(ctx, msg, priority)
  136. }
  137. // SendPriority sends message to the peer using the outgoing priority queue
  138. func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
  139. defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
  140. ctx = tracing.StartSaveSpan(ctx)
  141. metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
  142. wmsg := WrappedPriorityMsg{
  143. Context: ctx,
  144. Msg: msg,
  145. }
  146. err := p.pq.Push(wmsg, int(priority))
  147. if err != nil {
  148. log.Error("err on p.pq.Push", "err", err, "peer", p.ID())
  149. }
  150. return err
  151. }
  152. // SendOfferedHashes sends OfferedHashesMsg protocol msg
  153. func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
  154. var sp opentracing.Span
  155. ctx, sp := spancontext.StartSpan(
  156. context.TODO(),
  157. "send.offered.hashes",
  158. )
  159. defer sp.Finish()
  160. defer metrics.GetOrRegisterResettingTimer("send.offered.hashes", nil).UpdateSince(time.Now())
  161. hashes, from, to, proof, err := s.setNextBatch(f, t)
  162. if err != nil {
  163. return err
  164. }
  165. // true only when quitting
  166. if len(hashes) == 0 {
  167. return nil
  168. }
  169. if proof == nil {
  170. proof = &HandoverProof{
  171. Handover: &Handover{},
  172. }
  173. }
  174. s.currentBatch = hashes
  175. msg := &OfferedHashesMsg{
  176. HandoverProof: proof,
  177. Hashes: hashes,
  178. From: from,
  179. To: to,
  180. Stream: s.stream,
  181. }
  182. log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
  183. ctx = context.WithValue(ctx, "stream_send_tag", "send.offered.hashes")
  184. return p.SendPriority(ctx, msg, s.priority)
  185. }
  186. func (p *Peer) getServer(s Stream) (*server, error) {
  187. p.serverMu.RLock()
  188. defer p.serverMu.RUnlock()
  189. server := p.servers[s]
  190. if server == nil {
  191. return nil, newNotFoundError("server", s)
  192. }
  193. return server, nil
  194. }
  195. func (p *Peer) setServer(s Stream, o Server, priority uint8) (*server, error) {
  196. p.serverMu.Lock()
  197. defer p.serverMu.Unlock()
  198. if p.servers[s] != nil {
  199. return nil, fmt.Errorf("server %s already registered", s)
  200. }
  201. if p.streamer.maxPeerServers > 0 && len(p.servers) >= p.streamer.maxPeerServers {
  202. return nil, ErrMaxPeerServers
  203. }
  204. sessionIndex, err := o.SessionIndex()
  205. if err != nil {
  206. return nil, err
  207. }
  208. os := &server{
  209. Server: o,
  210. stream: s,
  211. priority: priority,
  212. sessionIndex: sessionIndex,
  213. }
  214. p.servers[s] = os
  215. return os, nil
  216. }
  217. func (p *Peer) removeServer(s Stream) error {
  218. p.serverMu.Lock()
  219. defer p.serverMu.Unlock()
  220. server, ok := p.servers[s]
  221. if !ok {
  222. return newNotFoundError("server", s)
  223. }
  224. server.Close()
  225. delete(p.servers, s)
  226. return nil
  227. }
  228. func (p *Peer) getClient(ctx context.Context, s Stream) (c *client, err error) {
  229. var params *clientParams
  230. func() {
  231. p.clientMu.RLock()
  232. defer p.clientMu.RUnlock()
  233. c = p.clients[s]
  234. if c != nil {
  235. return
  236. }
  237. params = p.clientParams[s]
  238. }()
  239. if c != nil {
  240. return c, nil
  241. }
  242. if params != nil {
  243. //debug.PrintStack()
  244. if err := params.waitClient(ctx); err != nil {
  245. return nil, err
  246. }
  247. }
  248. p.clientMu.RLock()
  249. defer p.clientMu.RUnlock()
  250. c = p.clients[s]
  251. if c != nil {
  252. return c, nil
  253. }
  254. return nil, newNotFoundError("client", s)
  255. }
  256. func (p *Peer) getOrSetClient(s Stream, from, to uint64) (c *client, created bool, err error) {
  257. p.clientMu.Lock()
  258. defer p.clientMu.Unlock()
  259. c = p.clients[s]
  260. if c != nil {
  261. return c, false, nil
  262. }
  263. f, err := p.streamer.GetClientFunc(s.Name)
  264. if err != nil {
  265. return nil, false, err
  266. }
  267. is, err := f(p, s.Key, s.Live)
  268. if err != nil {
  269. return nil, false, err
  270. }
  271. cp, err := p.getClientParams(s)
  272. if err != nil {
  273. return nil, false, err
  274. }
  275. defer func() {
  276. if err == nil {
  277. if err := p.removeClientParams(s); err != nil {
  278. log.Error("stream set client: remove client params", "stream", s, "peer", p, "err", err)
  279. }
  280. }
  281. }()
  282. intervalsKey := peerStreamIntervalsKey(p, s)
  283. if s.Live {
  284. // try to find previous history and live intervals and merge live into history
  285. historyKey := peerStreamIntervalsKey(p, NewStream(s.Name, s.Key, false))
  286. historyIntervals := &intervals.Intervals{}
  287. err := p.streamer.intervalsStore.Get(historyKey, historyIntervals)
  288. switch err {
  289. case nil:
  290. liveIntervals := &intervals.Intervals{}
  291. err := p.streamer.intervalsStore.Get(intervalsKey, liveIntervals)
  292. switch err {
  293. case nil:
  294. historyIntervals.Merge(liveIntervals)
  295. if err := p.streamer.intervalsStore.Put(historyKey, historyIntervals); err != nil {
  296. log.Error("stream set client: put history intervals", "stream", s, "peer", p, "err", err)
  297. }
  298. case state.ErrNotFound:
  299. default:
  300. log.Error("stream set client: get live intervals", "stream", s, "peer", p, "err", err)
  301. }
  302. case state.ErrNotFound:
  303. default:
  304. log.Error("stream set client: get history intervals", "stream", s, "peer", p, "err", err)
  305. }
  306. }
  307. if err := p.streamer.intervalsStore.Put(intervalsKey, intervals.NewIntervals(from)); err != nil {
  308. return nil, false, err
  309. }
  310. next := make(chan error, 1)
  311. c = &client{
  312. Client: is,
  313. stream: s,
  314. priority: cp.priority,
  315. to: cp.to,
  316. next: next,
  317. quit: make(chan struct{}),
  318. intervalsStore: p.streamer.intervalsStore,
  319. intervalsKey: intervalsKey,
  320. }
  321. p.clients[s] = c
  322. cp.clientCreated() // unblock all possible getClient calls that are waiting
  323. next <- nil // this is to allow wantedKeysMsg before first batch arrives
  324. return c, true, nil
  325. }
  326. func (p *Peer) removeClient(s Stream) error {
  327. p.clientMu.Lock()
  328. defer p.clientMu.Unlock()
  329. client, ok := p.clients[s]
  330. if !ok {
  331. return newNotFoundError("client", s)
  332. }
  333. client.close()
  334. delete(p.clients, s)
  335. return nil
  336. }
  337. func (p *Peer) setClientParams(s Stream, params *clientParams) error {
  338. p.clientMu.Lock()
  339. defer p.clientMu.Unlock()
  340. if p.clients[s] != nil {
  341. return fmt.Errorf("client %s already exists", s)
  342. }
  343. if p.clientParams[s] != nil {
  344. return fmt.Errorf("client params %s already set", s)
  345. }
  346. p.clientParams[s] = params
  347. return nil
  348. }
  349. func (p *Peer) getClientParams(s Stream) (*clientParams, error) {
  350. params := p.clientParams[s]
  351. if params == nil {
  352. return nil, fmt.Errorf("client params '%v' not provided to peer %v", s, p.ID())
  353. }
  354. return params, nil
  355. }
  356. func (p *Peer) removeClientParams(s Stream) error {
  357. _, ok := p.clientParams[s]
  358. if !ok {
  359. return newNotFoundError("client params", s)
  360. }
  361. delete(p.clientParams, s)
  362. return nil
  363. }
  364. func (p *Peer) close() {
  365. for _, s := range p.servers {
  366. s.Close()
  367. }
  368. }