peerset.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package eth
  17. import (
  18. "errors"
  19. "math/big"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/eth/protocols/eth"
  24. "github.com/ethereum/go-ethereum/eth/protocols/snap"
  25. "github.com/ethereum/go-ethereum/event"
  26. "github.com/ethereum/go-ethereum/p2p"
  27. )
  28. var (
  29. // errPeerSetClosed is returned if a peer is attempted to be added or removed
  30. // from the peer set after it has been terminated.
  31. errPeerSetClosed = errors.New("peerset closed")
  32. // errPeerAlreadyRegistered is returned if a peer is attempted to be added
  33. // to the peer set, but one with the same id already exists.
  34. errPeerAlreadyRegistered = errors.New("peer already registered")
  35. // errPeerNotRegistered is returned if a peer is attempted to be removed from
  36. // a peer set, but no peer with the given id exists.
  37. errPeerNotRegistered = errors.New("peer not registered")
  38. // ethConnectTimeout is the `snap` timeout for `eth` to connect too.
  39. ethConnectTimeout = 3 * time.Second
  40. )
  41. // peerSet represents the collection of active peers currently participating in
  42. // the `eth` or `snap` protocols.
  43. type peerSet struct {
  44. ethPeers map[string]*ethPeer // Peers connected on the `eth` protocol
  45. snapPeers map[string]*snapPeer // Peers connected on the `snap` protocol
  46. ethJoinFeed event.Feed // Events when an `eth` peer successfully joins
  47. ethDropFeed event.Feed // Events when an `eth` peer gets dropped
  48. snapJoinFeed event.Feed // Events when a `snap` peer joins on both `eth` and `snap`
  49. snapDropFeed event.Feed // Events when a `snap` peer gets dropped (only if fully joined)
  50. scope event.SubscriptionScope // Subscription group to unsubscribe everyone at once
  51. lock sync.RWMutex
  52. closed bool
  53. }
  54. // newPeerSet creates a new peer set to track the active participants.
  55. func newPeerSet() *peerSet {
  56. return &peerSet{
  57. ethPeers: make(map[string]*ethPeer),
  58. snapPeers: make(map[string]*snapPeer),
  59. }
  60. }
  61. // subscribeEthJoin registers a subscription for peers joining (and completing
  62. // the handshake) on the `eth` protocol.
  63. func (ps *peerSet) subscribeEthJoin(ch chan<- *eth.Peer) event.Subscription {
  64. return ps.scope.Track(ps.ethJoinFeed.Subscribe(ch))
  65. }
  66. // subscribeEthDrop registers a subscription for peers being dropped from the
  67. // `eth` protocol.
  68. func (ps *peerSet) subscribeEthDrop(ch chan<- *eth.Peer) event.Subscription {
  69. return ps.scope.Track(ps.ethDropFeed.Subscribe(ch))
  70. }
  71. // subscribeSnapJoin registers a subscription for peers joining (and completing
  72. // the `eth` join) on the `snap` protocol.
  73. func (ps *peerSet) subscribeSnapJoin(ch chan<- *snap.Peer) event.Subscription {
  74. return ps.scope.Track(ps.snapJoinFeed.Subscribe(ch))
  75. }
  76. // subscribeSnapDrop registers a subscription for peers being dropped from the
  77. // `snap` protocol.
  78. func (ps *peerSet) subscribeSnapDrop(ch chan<- *snap.Peer) event.Subscription {
  79. return ps.scope.Track(ps.snapDropFeed.Subscribe(ch))
  80. }
  81. // registerEthPeer injects a new `eth` peer into the working set, or returns an
  82. // error if the peer is already known. The peer is announced on the `eth` join
  83. // feed and if it completes a pending `snap` peer, also on that feed.
  84. func (ps *peerSet) registerEthPeer(peer *eth.Peer) error {
  85. ps.lock.Lock()
  86. if ps.closed {
  87. ps.lock.Unlock()
  88. return errPeerSetClosed
  89. }
  90. id := peer.ID()
  91. if _, ok := ps.ethPeers[id]; ok {
  92. ps.lock.Unlock()
  93. return errPeerAlreadyRegistered
  94. }
  95. ps.ethPeers[id] = &ethPeer{Peer: peer}
  96. snap, ok := ps.snapPeers[id]
  97. ps.lock.Unlock()
  98. if ok {
  99. // Previously dangling `snap` peer, stop it's timer since `eth` connected
  100. snap.lock.Lock()
  101. if snap.ethDrop != nil {
  102. snap.ethDrop.Stop()
  103. snap.ethDrop = nil
  104. }
  105. snap.lock.Unlock()
  106. }
  107. ps.ethJoinFeed.Send(peer)
  108. if ok {
  109. ps.snapJoinFeed.Send(snap.Peer)
  110. }
  111. return nil
  112. }
  113. // unregisterEthPeer removes a remote peer from the active set, disabling any further
  114. // actions to/from that particular entity. The drop is announced on the `eth` drop
  115. // feed and also on the `snap` feed if the eth/snap duality was broken just now.
  116. func (ps *peerSet) unregisterEthPeer(id string) error {
  117. ps.lock.Lock()
  118. eth, ok := ps.ethPeers[id]
  119. if !ok {
  120. ps.lock.Unlock()
  121. return errPeerNotRegistered
  122. }
  123. delete(ps.ethPeers, id)
  124. snap, ok := ps.snapPeers[id]
  125. ps.lock.Unlock()
  126. ps.ethDropFeed.Send(eth)
  127. if ok {
  128. ps.snapDropFeed.Send(snap)
  129. }
  130. return nil
  131. }
  132. // registerSnapPeer injects a new `snap` peer into the working set, or returns
  133. // an error if the peer is already known. The peer is announced on the `snap`
  134. // join feed if it completes an existing `eth` peer.
  135. //
  136. // If the peer isn't yet connected on `eth` and fails to do so within a given
  137. // amount of time, it is dropped. This enforces that `snap` is an extension to
  138. // `eth`, not a standalone leeching protocol.
  139. func (ps *peerSet) registerSnapPeer(peer *snap.Peer) error {
  140. ps.lock.Lock()
  141. if ps.closed {
  142. ps.lock.Unlock()
  143. return errPeerSetClosed
  144. }
  145. id := peer.ID()
  146. if _, ok := ps.snapPeers[id]; ok {
  147. ps.lock.Unlock()
  148. return errPeerAlreadyRegistered
  149. }
  150. ps.snapPeers[id] = &snapPeer{Peer: peer}
  151. _, ok := ps.ethPeers[id]
  152. if !ok {
  153. // Dangling `snap` peer, start a timer to drop if `eth` doesn't connect
  154. ps.snapPeers[id].ethDrop = time.AfterFunc(ethConnectTimeout, func() {
  155. peer.Log().Warn("Snapshot peer missing eth, dropping", "addr", peer.RemoteAddr(), "type", peer.Name())
  156. peer.Disconnect(p2p.DiscUselessPeer)
  157. })
  158. }
  159. ps.lock.Unlock()
  160. if ok {
  161. ps.snapJoinFeed.Send(peer)
  162. }
  163. return nil
  164. }
  165. // unregisterSnapPeer removes a remote peer from the active set, disabling any
  166. // further actions to/from that particular entity. The drop is announced on the
  167. // `snap` drop feed.
  168. func (ps *peerSet) unregisterSnapPeer(id string) error {
  169. ps.lock.Lock()
  170. peer, ok := ps.snapPeers[id]
  171. if !ok {
  172. ps.lock.Unlock()
  173. return errPeerNotRegistered
  174. }
  175. delete(ps.snapPeers, id)
  176. ps.lock.Unlock()
  177. peer.lock.Lock()
  178. if peer.ethDrop != nil {
  179. peer.ethDrop.Stop()
  180. peer.ethDrop = nil
  181. }
  182. peer.lock.Unlock()
  183. ps.snapDropFeed.Send(peer)
  184. return nil
  185. }
  186. // ethPeer retrieves the registered `eth` peer with the given id.
  187. func (ps *peerSet) ethPeer(id string) *ethPeer {
  188. ps.lock.RLock()
  189. defer ps.lock.RUnlock()
  190. return ps.ethPeers[id]
  191. }
  192. // snapPeer retrieves the registered `snap` peer with the given id.
  193. func (ps *peerSet) snapPeer(id string) *snapPeer {
  194. ps.lock.RLock()
  195. defer ps.lock.RUnlock()
  196. return ps.snapPeers[id]
  197. }
  198. // ethPeersWithoutBlock retrieves a list of `eth` peers that do not have a given
  199. // block in their set of known hashes so it might be propagated to them.
  200. func (ps *peerSet) ethPeersWithoutBlock(hash common.Hash) []*ethPeer {
  201. ps.lock.RLock()
  202. defer ps.lock.RUnlock()
  203. list := make([]*ethPeer, 0, len(ps.ethPeers))
  204. for _, p := range ps.ethPeers {
  205. if !p.KnownBlock(hash) {
  206. list = append(list, p)
  207. }
  208. }
  209. return list
  210. }
  211. // ethPeersWithoutTransaction retrieves a list of `eth` peers that do not have a
  212. // given transaction in their set of known hashes.
  213. func (ps *peerSet) ethPeersWithoutTransaction(hash common.Hash) []*ethPeer {
  214. ps.lock.RLock()
  215. defer ps.lock.RUnlock()
  216. list := make([]*ethPeer, 0, len(ps.ethPeers))
  217. for _, p := range ps.ethPeers {
  218. if !p.KnownTransaction(hash) {
  219. list = append(list, p)
  220. }
  221. }
  222. return list
  223. }
  224. // Len returns if the current number of `eth` peers in the set. Since the `snap`
  225. // peers are tied to the existence of an `eth` connection, that will always be a
  226. // subset of `eth`.
  227. func (ps *peerSet) Len() int {
  228. ps.lock.RLock()
  229. defer ps.lock.RUnlock()
  230. return len(ps.ethPeers)
  231. }
  232. // SnapLen returns if the current number of `snap` peers in the set. Since the `snap`
  233. // peers are tied to the existence of an `eth` connection, that will always be a
  234. // subset of `eth`.
  235. func (ps *peerSet) SnapLen() int {
  236. ps.lock.RLock()
  237. defer ps.lock.RUnlock()
  238. return len(ps.snapPeers)
  239. }
  240. // ethPeerWithHighestTD retrieves the known peer with the currently highest total
  241. // difficulty.
  242. func (ps *peerSet) ethPeerWithHighestTD() *eth.Peer {
  243. ps.lock.RLock()
  244. defer ps.lock.RUnlock()
  245. var (
  246. bestPeer *eth.Peer
  247. bestTd *big.Int
  248. )
  249. for _, p := range ps.ethPeers {
  250. if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
  251. bestPeer, bestTd = p.Peer, td
  252. }
  253. }
  254. return bestPeer
  255. }
  256. // close disconnects all peers.
  257. func (ps *peerSet) close() {
  258. ps.lock.Lock()
  259. defer ps.lock.Unlock()
  260. for _, p := range ps.ethPeers {
  261. p.Disconnect(p2p.DiscQuitting)
  262. }
  263. for _, p := range ps.snapPeers {
  264. p.Disconnect(p2p.DiscQuitting)
  265. }
  266. ps.closed = true
  267. }