peerset.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package eth
import (
	"errors"
	"math/big"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/eth/protocols/eth"
	"github.com/ethereum/go-ethereum/eth/protocols/snap"
	"github.com/ethereum/go-ethereum/p2p"
)
  26. var (
  27. // errPeerSetClosed is returned if a peer is attempted to be added or removed
  28. // from the peer set after it has been terminated.
  29. errPeerSetClosed = errors.New("peerset closed")
  30. // errPeerAlreadyRegistered is returned if a peer is attempted to be added
  31. // to the peer set, but one with the same id already exists.
  32. errPeerAlreadyRegistered = errors.New("peer already registered")
  33. // errPeerNotRegistered is returned if a peer is attempted to be removed from
  34. // a peer set, but no peer with the given id exists.
  35. errPeerNotRegistered = errors.New("peer not registered")
  36. // errSnapWithoutEth is returned if a peer attempts to connect only on the
  37. // snap protocol without advertising the eth main protocol.
  38. errSnapWithoutEth = errors.New("peer connected on snap without compatible eth support")
  39. )
  40. // peerSet represents the collection of active peers currently participating in
  41. // the `eth` protocol, with or without the `snap` extension.
  42. type peerSet struct {
  43. peers map[string]*ethPeer // Peers connected on the `eth` protocol
  44. snapPeers int // Number of `snap` compatible peers for connection prioritization
  45. snapWait map[string]chan *snap.Peer // Peers connected on `eth` waiting for their snap extension
  46. snapPend map[string]*snap.Peer // Peers connected on the `snap` protocol, but not yet on `eth`
  47. lock sync.RWMutex
  48. closed bool
  49. }
  50. // newPeerSet creates a new peer set to track the active participants.
  51. func newPeerSet() *peerSet {
  52. return &peerSet{
  53. peers: make(map[string]*ethPeer),
  54. snapWait: make(map[string]chan *snap.Peer),
  55. snapPend: make(map[string]*snap.Peer),
  56. }
  57. }
  58. // registerSnapExtension unblocks an already connected `eth` peer waiting for its
  59. // `snap` extension, or if no such peer exists, tracks the extension for the time
  60. // being until the `eth` main protocol starts looking for it.
  61. func (ps *peerSet) registerSnapExtension(peer *snap.Peer) error {
  62. // Reject the peer if it advertises `snap` without `eth` as `snap` is only a
  63. // satellite protocol meaningful with the chain selection of `eth`
  64. if !peer.RunningCap(eth.ProtocolName, eth.ProtocolVersions) {
  65. return errSnapWithoutEth
  66. }
  67. // Ensure nobody can double connect
  68. ps.lock.Lock()
  69. defer ps.lock.Unlock()
  70. id := peer.ID()
  71. if _, ok := ps.peers[id]; ok {
  72. return errPeerAlreadyRegistered // avoid connections with the same id as existing ones
  73. }
  74. if _, ok := ps.snapPend[id]; ok {
  75. return errPeerAlreadyRegistered // avoid connections with the same id as pending ones
  76. }
  77. // Inject the peer into an `eth` counterpart is available, otherwise save for later
  78. if wait, ok := ps.snapWait[id]; ok {
  79. delete(ps.snapWait, id)
  80. wait <- peer
  81. return nil
  82. }
  83. ps.snapPend[id] = peer
  84. return nil
  85. }
  86. // waitExtensions blocks until all satellite protocols are connected and tracked
  87. // by the peerset.
  88. func (ps *peerSet) waitSnapExtension(peer *eth.Peer) (*snap.Peer, error) {
  89. // If the peer does not support a compatible `snap`, don't wait
  90. if !peer.RunningCap(snap.ProtocolName, snap.ProtocolVersions) {
  91. return nil, nil
  92. }
  93. // Ensure nobody can double connect
  94. ps.lock.Lock()
  95. id := peer.ID()
  96. if _, ok := ps.peers[id]; ok {
  97. ps.lock.Unlock()
  98. return nil, errPeerAlreadyRegistered // avoid connections with the same id as existing ones
  99. }
  100. if _, ok := ps.snapWait[id]; ok {
  101. ps.lock.Unlock()
  102. return nil, errPeerAlreadyRegistered // avoid connections with the same id as pending ones
  103. }
  104. // If `snap` already connected, retrieve the peer from the pending set
  105. if snap, ok := ps.snapPend[id]; ok {
  106. delete(ps.snapPend, id)
  107. ps.lock.Unlock()
  108. return snap, nil
  109. }
  110. // Otherwise wait for `snap` to connect concurrently
  111. wait := make(chan *snap.Peer)
  112. ps.snapWait[id] = wait
  113. ps.lock.Unlock()
  114. return <-wait, nil
  115. }
  116. // registerPeer injects a new `eth` peer into the working set, or returns an error
  117. // if the peer is already known.
  118. func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer) error {
  119. // Start tracking the new peer
  120. ps.lock.Lock()
  121. defer ps.lock.Unlock()
  122. if ps.closed {
  123. return errPeerSetClosed
  124. }
  125. id := peer.ID()
  126. if _, ok := ps.peers[id]; ok {
  127. return errPeerAlreadyRegistered
  128. }
  129. eth := &ethPeer{
  130. Peer: peer,
  131. }
  132. if ext != nil {
  133. eth.snapExt = &snapPeer{ext}
  134. ps.snapPeers++
  135. }
  136. ps.peers[id] = eth
  137. return nil
  138. }
  139. // unregisterPeer removes a remote peer from the active set, disabling any further
  140. // actions to/from that particular entity.
  141. func (ps *peerSet) unregisterPeer(id string) error {
  142. ps.lock.Lock()
  143. defer ps.lock.Unlock()
  144. peer, ok := ps.peers[id]
  145. if !ok {
  146. return errPeerNotRegistered
  147. }
  148. delete(ps.peers, id)
  149. if peer.snapExt != nil {
  150. ps.snapPeers--
  151. }
  152. return nil
  153. }
  154. // peer retrieves the registered peer with the given id.
  155. func (ps *peerSet) peer(id string) *ethPeer {
  156. ps.lock.RLock()
  157. defer ps.lock.RUnlock()
  158. return ps.peers[id]
  159. }
  160. // peersWithoutBlock retrieves a list of peers that do not have a given block in
  161. // their set of known hashes so it might be propagated to them.
  162. func (ps *peerSet) peersWithoutBlock(hash common.Hash) []*ethPeer {
  163. ps.lock.RLock()
  164. defer ps.lock.RUnlock()
  165. list := make([]*ethPeer, 0, len(ps.peers))
  166. for _, p := range ps.peers {
  167. if !p.KnownBlock(hash) {
  168. list = append(list, p)
  169. }
  170. }
  171. return list
  172. }
  173. // peersWithoutTransaction retrieves a list of peers that do not have a given
  174. // transaction in their set of known hashes.
  175. func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer {
  176. ps.lock.RLock()
  177. defer ps.lock.RUnlock()
  178. list := make([]*ethPeer, 0, len(ps.peers))
  179. for _, p := range ps.peers {
  180. if !p.KnownTransaction(hash) {
  181. list = append(list, p)
  182. }
  183. }
  184. return list
  185. }
  186. // len returns if the current number of `eth` peers in the set. Since the `snap`
  187. // peers are tied to the existence of an `eth` connection, that will always be a
  188. // subset of `eth`.
  189. func (ps *peerSet) len() int {
  190. ps.lock.RLock()
  191. defer ps.lock.RUnlock()
  192. return len(ps.peers)
  193. }
  194. // snapLen returns if the current number of `snap` peers in the set.
  195. func (ps *peerSet) snapLen() int {
  196. ps.lock.RLock()
  197. defer ps.lock.RUnlock()
  198. return ps.snapPeers
  199. }
  200. // peerWithHighestTD retrieves the known peer with the currently highest total
  201. // difficulty, but below the given PoS switchover threshold.
  202. func (ps *peerSet) peerWithHighestTD() *eth.Peer {
  203. ps.lock.RLock()
  204. defer ps.lock.RUnlock()
  205. var (
  206. bestPeer *eth.Peer
  207. bestTd *big.Int
  208. )
  209. for _, p := range ps.peers {
  210. if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
  211. bestPeer, bestTd = p.Peer, td
  212. }
  213. }
  214. return bestPeer
  215. }
  216. // close disconnects all peers.
  217. func (ps *peerSet) close() {
  218. ps.lock.Lock()
  219. defer ps.lock.Unlock()
  220. for _, p := range ps.peers {
  221. p.Disconnect(p2p.DiscQuitting)
  222. }
  223. ps.closed = true
  224. }