peerset.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package eth
  17. import (
  18. "errors"
  19. "math/big"
  20. "sync"
  21. "github.com/ethereum/go-ethereum/common"
  22. "github.com/ethereum/go-ethereum/eth/downloader"
  23. "github.com/ethereum/go-ethereum/eth/protocols/diff"
  24. "github.com/ethereum/go-ethereum/eth/protocols/eth"
  25. "github.com/ethereum/go-ethereum/eth/protocols/snap"
  26. "github.com/ethereum/go-ethereum/p2p"
  27. )
  28. var (
  29. // errPeerSetClosed is returned if a peer is attempted to be added or removed
  30. // from the peer set after it has been terminated.
  31. errPeerSetClosed = errors.New("peerset closed")
  32. // errPeerAlreadyRegistered is returned if a peer is attempted to be added
  33. // to the peer set, but one with the same id already exists.
  34. errPeerAlreadyRegistered = errors.New("peer already registered")
  35. // errPeerNotRegistered is returned if a peer is attempted to be removed from
  36. // a peer set, but no peer with the given id exists.
  37. errPeerNotRegistered = errors.New("peer not registered")
  38. // errSnapWithoutEth is returned if a peer attempts to connect only on the
  39. // snap protocol without advertizing the eth main protocol.
  40. errSnapWithoutEth = errors.New("peer connected on snap without compatible eth support")
  41. // errDiffWithoutEth is returned if a peer attempts to connect only on the
  42. // diff protocol without advertizing the eth main protocol.
  43. errDiffWithoutEth = errors.New("peer connected on diff without compatible eth support")
  44. )
  45. // peerSet represents the collection of active peers currently participating in
  46. // the `eth` protocol, with or without the `snap` extension.
  47. type peerSet struct {
  48. peers map[string]*ethPeer // Peers connected on the `eth` protocol
  49. snapPeers int // Number of `snap` compatible peers for connection prioritization
  50. snapWait map[string]chan *snap.Peer // Peers connected on `eth` waiting for their snap extension
  51. snapPend map[string]*snap.Peer // Peers connected on the `snap` protocol, but not yet on `eth`
  52. diffWait map[string]chan *diff.Peer // Peers connected on `eth` waiting for their diff extension
  53. diffPend map[string]*diff.Peer // Peers connected on the `diff` protocol, but not yet on `eth`
  54. lock sync.RWMutex
  55. closed bool
  56. }
  57. // newPeerSet creates a new peer set to track the active participants.
  58. func newPeerSet() *peerSet {
  59. return &peerSet{
  60. peers: make(map[string]*ethPeer),
  61. snapWait: make(map[string]chan *snap.Peer),
  62. snapPend: make(map[string]*snap.Peer),
  63. diffWait: make(map[string]chan *diff.Peer),
  64. diffPend: make(map[string]*diff.Peer),
  65. }
  66. }
  67. // registerSnapExtension unblocks an already connected `eth` peer waiting for its
  68. // `snap` extension, or if no such peer exists, tracks the extension for the time
  69. // being until the `eth` main protocol starts looking for it.
  70. func (ps *peerSet) registerSnapExtension(peer *snap.Peer) error {
  71. // Reject the peer if it advertises `snap` without `eth` as `snap` is only a
  72. // satellite protocol meaningful with the chain selection of `eth`
  73. if !peer.RunningCap(eth.ProtocolName, eth.ProtocolVersions) {
  74. return errSnapWithoutEth
  75. }
  76. // Ensure nobody can double connect
  77. ps.lock.Lock()
  78. defer ps.lock.Unlock()
  79. id := peer.ID()
  80. if _, ok := ps.peers[id]; ok {
  81. return errPeerAlreadyRegistered // avoid connections with the same id as existing ones
  82. }
  83. if _, ok := ps.snapPend[id]; ok {
  84. return errPeerAlreadyRegistered // avoid connections with the same id as pending ones
  85. }
  86. // Inject the peer into an `eth` counterpart is available, otherwise save for later
  87. if wait, ok := ps.snapWait[id]; ok {
  88. delete(ps.snapWait, id)
  89. wait <- peer
  90. return nil
  91. }
  92. ps.snapPend[id] = peer
  93. return nil
  94. }
  95. // registerDiffExtension unblocks an already connected `eth` peer waiting for its
  96. // `diff` extension, or if no such peer exists, tracks the extension for the time
  97. // being until the `eth` main protocol starts looking for it.
  98. func (ps *peerSet) registerDiffExtension(peer *diff.Peer) error {
  99. // Reject the peer if it advertises `diff` without `eth` as `diff` is only a
  100. // satellite protocol meaningful with the chain selection of `eth`
  101. if !peer.RunningCap(eth.ProtocolName, eth.ProtocolVersions) {
  102. return errDiffWithoutEth
  103. }
  104. // Ensure nobody can double connect
  105. ps.lock.Lock()
  106. defer ps.lock.Unlock()
  107. id := peer.ID()
  108. if _, ok := ps.peers[id]; ok {
  109. return errPeerAlreadyRegistered // avoid connections with the same id as existing ones
  110. }
  111. if _, ok := ps.diffPend[id]; ok {
  112. return errPeerAlreadyRegistered // avoid connections with the same id as pending ones
  113. }
  114. // Inject the peer into an `eth` counterpart is available, otherwise save for later
  115. if wait, ok := ps.diffWait[id]; ok {
  116. delete(ps.diffWait, id)
  117. wait <- peer
  118. return nil
  119. }
  120. ps.diffPend[id] = peer
  121. return nil
  122. }
  123. // waitExtensions blocks until all satellite protocols are connected and tracked
  124. // by the peerset.
  125. func (ps *peerSet) waitSnapExtension(peer *eth.Peer) (*snap.Peer, error) {
  126. // If the peer does not support a compatible `snap`, don't wait
  127. if !peer.RunningCap(snap.ProtocolName, snap.ProtocolVersions) {
  128. return nil, nil
  129. }
  130. // Ensure nobody can double connect
  131. ps.lock.Lock()
  132. id := peer.ID()
  133. if _, ok := ps.peers[id]; ok {
  134. ps.lock.Unlock()
  135. return nil, errPeerAlreadyRegistered // avoid connections with the same id as existing ones
  136. }
  137. if _, ok := ps.snapWait[id]; ok {
  138. ps.lock.Unlock()
  139. return nil, errPeerAlreadyRegistered // avoid connections with the same id as pending ones
  140. }
  141. // If `snap` already connected, retrieve the peer from the pending set
  142. if snap, ok := ps.snapPend[id]; ok {
  143. delete(ps.snapPend, id)
  144. ps.lock.Unlock()
  145. return snap, nil
  146. }
  147. // Otherwise wait for `snap` to connect concurrently
  148. wait := make(chan *snap.Peer)
  149. ps.snapWait[id] = wait
  150. ps.lock.Unlock()
  151. return <-wait, nil
  152. }
  153. // waitDiffExtension blocks until all satellite protocols are connected and tracked
  154. // by the peerset.
  155. func (ps *peerSet) waitDiffExtension(peer *eth.Peer) (*diff.Peer, error) {
  156. // If the peer does not support a compatible `diff`, don't wait
  157. if !peer.RunningCap(diff.ProtocolName, diff.ProtocolVersions) {
  158. return nil, nil
  159. }
  160. // Ensure nobody can double connect
  161. ps.lock.Lock()
  162. id := peer.ID()
  163. if _, ok := ps.peers[id]; ok {
  164. ps.lock.Unlock()
  165. return nil, errPeerAlreadyRegistered // avoid connections with the same id as existing ones
  166. }
  167. if _, ok := ps.diffWait[id]; ok {
  168. ps.lock.Unlock()
  169. return nil, errPeerAlreadyRegistered // avoid connections with the same id as pending ones
  170. }
  171. // If `diff` already connected, retrieve the peer from the pending set
  172. if diff, ok := ps.diffPend[id]; ok {
  173. delete(ps.diffPend, id)
  174. ps.lock.Unlock()
  175. return diff, nil
  176. }
  177. // Otherwise wait for `diff` to connect concurrently
  178. wait := make(chan *diff.Peer)
  179. ps.diffWait[id] = wait
  180. ps.lock.Unlock()
  181. return <-wait, nil
  182. }
  183. func (ps *peerSet) GetDiffPeer(pid string) downloader.IDiffPeer {
  184. if p := ps.peer(pid); p != nil && p.diffExt != nil {
  185. return p.diffExt
  186. }
  187. return nil
  188. }
  189. // registerPeer injects a new `eth` peer into the working set, or returns an error
  190. // if the peer is already known.
  191. func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Peer) error {
  192. // Start tracking the new peer
  193. ps.lock.Lock()
  194. defer ps.lock.Unlock()
  195. if ps.closed {
  196. return errPeerSetClosed
  197. }
  198. id := peer.ID()
  199. if _, ok := ps.peers[id]; ok {
  200. return errPeerAlreadyRegistered
  201. }
  202. eth := &ethPeer{
  203. Peer: peer,
  204. }
  205. if ext != nil {
  206. eth.snapExt = &snapPeer{ext}
  207. ps.snapPeers++
  208. }
  209. if diffExt != nil {
  210. eth.diffExt = &diffPeer{diffExt}
  211. }
  212. ps.peers[id] = eth
  213. return nil
  214. }
  215. // unregisterPeer removes a remote peer from the active set, disabling any further
  216. // actions to/from that particular entity.
  217. func (ps *peerSet) unregisterPeer(id string) error {
  218. ps.lock.Lock()
  219. defer ps.lock.Unlock()
  220. peer, ok := ps.peers[id]
  221. if !ok {
  222. return errPeerNotRegistered
  223. }
  224. delete(ps.peers, id)
  225. if peer.snapExt != nil {
  226. ps.snapPeers--
  227. }
  228. return nil
  229. }
  230. // peer retrieves the registered peer with the given id.
  231. func (ps *peerSet) peer(id string) *ethPeer {
  232. ps.lock.RLock()
  233. defer ps.lock.RUnlock()
  234. return ps.peers[id]
  235. }
  236. // peersWithoutBlock retrieves a list of peers that do not have a given block in
  237. // their set of known hashes so it might be propagated to them.
  238. func (ps *peerSet) peersWithoutBlock(hash common.Hash) []*ethPeer {
  239. ps.lock.RLock()
  240. defer ps.lock.RUnlock()
  241. list := make([]*ethPeer, 0, len(ps.peers))
  242. for _, p := range ps.peers {
  243. if !p.KnownBlock(hash) {
  244. list = append(list, p)
  245. }
  246. }
  247. return list
  248. }
  249. // peersWithoutTransaction retrieves a list of peers that do not have a given
  250. // transaction in their set of known hashes.
  251. func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer {
  252. ps.lock.RLock()
  253. defer ps.lock.RUnlock()
  254. list := make([]*ethPeer, 0, len(ps.peers))
  255. for _, p := range ps.peers {
  256. if !p.KnownTransaction(hash) {
  257. list = append(list, p)
  258. }
  259. }
  260. return list
  261. }
  262. // len returns if the current number of `eth` peers in the set. Since the `snap`
  263. // peers are tied to the existence of an `eth` connection, that will always be a
  264. // subset of `eth`.
  265. func (ps *peerSet) len() int {
  266. ps.lock.RLock()
  267. defer ps.lock.RUnlock()
  268. return len(ps.peers)
  269. }
  270. // snapLen returns if the current number of `snap` peers in the set.
  271. func (ps *peerSet) snapLen() int {
  272. ps.lock.RLock()
  273. defer ps.lock.RUnlock()
  274. return ps.snapPeers
  275. }
  276. // peerWithHighestTD retrieves the known peer with the currently highest total
  277. // difficulty.
  278. func (ps *peerSet) peerWithHighestTD() *eth.Peer {
  279. ps.lock.RLock()
  280. defer ps.lock.RUnlock()
  281. var (
  282. bestPeer *eth.Peer
  283. bestTd *big.Int
  284. )
  285. for _, p := range ps.peers {
  286. if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
  287. bestPeer, bestTd = p.Peer, td
  288. }
  289. }
  290. return bestPeer
  291. }
  292. // close disconnects all peers.
  293. func (ps *peerSet) close() {
  294. ps.lock.Lock()
  295. defer ps.lock.Unlock()
  296. for _, p := range ps.peers {
  297. p.Disconnect(p2p.DiscQuitting)
  298. }
  299. ps.closed = true
  300. }