peerset.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package eth
  17. import (
  18. "errors"
  19. "math/big"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/eth/downloader"
  24. "github.com/ethereum/go-ethereum/eth/protocols/diff"
  25. "github.com/ethereum/go-ethereum/eth/protocols/eth"
  26. "github.com/ethereum/go-ethereum/eth/protocols/snap"
  27. "github.com/ethereum/go-ethereum/p2p"
  28. )
  29. var (
  30. // errPeerSetClosed is returned if a peer is attempted to be added or removed
  31. // from the peer set after it has been terminated.
  32. errPeerSetClosed = errors.New("peerset closed")
  33. // errPeerAlreadyRegistered is returned if a peer is attempted to be added
  34. // to the peer set, but one with the same id already exists.
  35. errPeerAlreadyRegistered = errors.New("peer already registered")
  36. // errPeerWaitTimeout is returned if a peer waits extension for too long
  37. errPeerWaitTimeout = errors.New("peer wait timeout")
  38. // errPeerNotRegistered is returned if a peer is attempted to be removed from
  39. // a peer set, but no peer with the given id exists.
  40. errPeerNotRegistered = errors.New("peer not registered")
  41. // errSnapWithoutEth is returned if a peer attempts to connect only on the
  42. // snap protocol without advertising the eth main protocol.
  43. errSnapWithoutEth = errors.New("peer connected on snap without compatible eth support")
  44. // errDiffWithoutEth is returned if a peer attempts to connect only on the
  45. // diff protocol without advertising the eth main protocol.
  46. errDiffWithoutEth = errors.New("peer connected on diff without compatible eth support")
  47. )
  48. const (
  49. // extensionWaitTimeout is the maximum allowed time for the extension wait to
  50. // complete before dropping the connection as malicious.
  51. extensionWaitTimeout = 10 * time.Second
  52. )
  53. // peerSet represents the collection of active peers currently participating in
  54. // the `eth` protocol, with or without the `snap` extension.
  55. type peerSet struct {
  56. peers map[string]*ethPeer // Peers connected on the `eth` protocol
  57. snapPeers int // Number of `snap` compatible peers for connection prioritization
  58. snapWait map[string]chan *snap.Peer // Peers connected on `eth` waiting for their snap extension
  59. snapPend map[string]*snap.Peer // Peers connected on the `snap` protocol, but not yet on `eth`
  60. diffWait map[string]chan *diff.Peer // Peers connected on `eth` waiting for their diff extension
  61. diffPend map[string]*diff.Peer // Peers connected on the `diff` protocol, but not yet on `eth`
  62. lock sync.RWMutex
  63. closed bool
  64. }
  65. // newPeerSet creates a new peer set to track the active participants.
  66. func newPeerSet() *peerSet {
  67. return &peerSet{
  68. peers: make(map[string]*ethPeer),
  69. snapWait: make(map[string]chan *snap.Peer),
  70. snapPend: make(map[string]*snap.Peer),
  71. diffWait: make(map[string]chan *diff.Peer),
  72. diffPend: make(map[string]*diff.Peer),
  73. }
  74. }
  75. // registerSnapExtension unblocks an already connected `eth` peer waiting for its
  76. // `snap` extension, or if no such peer exists, tracks the extension for the time
  77. // being until the `eth` main protocol starts looking for it.
  78. func (ps *peerSet) registerSnapExtension(peer *snap.Peer) error {
  79. // Reject the peer if it advertises `snap` without `eth` as `snap` is only a
  80. // satellite protocol meaningful with the chain selection of `eth`
  81. if !peer.RunningCap(eth.ProtocolName, eth.ProtocolVersions) {
  82. return errSnapWithoutEth
  83. }
  84. // Ensure nobody can double connect
  85. ps.lock.Lock()
  86. defer ps.lock.Unlock()
  87. id := peer.ID()
  88. if _, ok := ps.peers[id]; ok {
  89. return errPeerAlreadyRegistered // avoid connections with the same id as existing ones
  90. }
  91. if _, ok := ps.snapPend[id]; ok {
  92. return errPeerAlreadyRegistered // avoid connections with the same id as pending ones
  93. }
  94. // Inject the peer into an `eth` counterpart is available, otherwise save for later
  95. if wait, ok := ps.snapWait[id]; ok {
  96. delete(ps.snapWait, id)
  97. wait <- peer
  98. return nil
  99. }
  100. ps.snapPend[id] = peer
  101. return nil
  102. }
  103. // registerDiffExtension unblocks an already connected `eth` peer waiting for its
  104. // `diff` extension, or if no such peer exists, tracks the extension for the time
  105. // being until the `eth` main protocol starts looking for it.
  106. func (ps *peerSet) registerDiffExtension(peer *diff.Peer) error {
  107. // Reject the peer if it advertises `diff` without `eth` as `diff` is only a
  108. // satellite protocol meaningful with the chain selection of `eth`
  109. if !peer.RunningCap(eth.ProtocolName, eth.ProtocolVersions) {
  110. return errDiffWithoutEth
  111. }
  112. // Ensure nobody can double connect
  113. ps.lock.Lock()
  114. defer ps.lock.Unlock()
  115. id := peer.ID()
  116. if _, ok := ps.peers[id]; ok {
  117. return errPeerAlreadyRegistered // avoid connections with the same id as existing ones
  118. }
  119. if _, ok := ps.diffPend[id]; ok {
  120. return errPeerAlreadyRegistered // avoid connections with the same id as pending ones
  121. }
  122. // Inject the peer into an `eth` counterpart is available, otherwise save for later
  123. if wait, ok := ps.diffWait[id]; ok {
  124. delete(ps.diffWait, id)
  125. wait <- peer
  126. return nil
  127. }
  128. ps.diffPend[id] = peer
  129. return nil
  130. }
  131. // waitExtensions blocks until all satellite protocols are connected and tracked
  132. // by the peerset.
  133. func (ps *peerSet) waitSnapExtension(peer *eth.Peer) (*snap.Peer, error) {
  134. // If the peer does not support a compatible `snap`, don't wait
  135. if !peer.RunningCap(snap.ProtocolName, snap.ProtocolVersions) {
  136. return nil, nil
  137. }
  138. // Ensure nobody can double connect
  139. ps.lock.Lock()
  140. id := peer.ID()
  141. if _, ok := ps.peers[id]; ok {
  142. ps.lock.Unlock()
  143. return nil, errPeerAlreadyRegistered // avoid connections with the same id as existing ones
  144. }
  145. if _, ok := ps.snapWait[id]; ok {
  146. ps.lock.Unlock()
  147. return nil, errPeerAlreadyRegistered // avoid connections with the same id as pending ones
  148. }
  149. // If `snap` already connected, retrieve the peer from the pending set
  150. if snap, ok := ps.snapPend[id]; ok {
  151. delete(ps.snapPend, id)
  152. ps.lock.Unlock()
  153. return snap, nil
  154. }
  155. // Otherwise wait for `snap` to connect concurrently
  156. wait := make(chan *snap.Peer)
  157. ps.snapWait[id] = wait
  158. ps.lock.Unlock()
  159. select {
  160. case peer := <-wait:
  161. return peer, nil
  162. case <-time.After(extensionWaitTimeout):
  163. ps.lock.Lock()
  164. delete(ps.snapWait, id)
  165. ps.lock.Unlock()
  166. return nil, errPeerWaitTimeout
  167. }
  168. }
  169. // waitDiffExtension blocks until all satellite protocols are connected and tracked
  170. // by the peerset.
  171. func (ps *peerSet) waitDiffExtension(peer *eth.Peer) (*diff.Peer, error) {
  172. // If the peer does not support a compatible `diff`, don't wait
  173. if !peer.RunningCap(diff.ProtocolName, diff.ProtocolVersions) {
  174. return nil, nil
  175. }
  176. // Ensure nobody can double connect
  177. ps.lock.Lock()
  178. id := peer.ID()
  179. if _, ok := ps.peers[id]; ok {
  180. ps.lock.Unlock()
  181. return nil, errPeerAlreadyRegistered // avoid connections with the same id as existing ones
  182. }
  183. if _, ok := ps.diffWait[id]; ok {
  184. ps.lock.Unlock()
  185. return nil, errPeerAlreadyRegistered // avoid connections with the same id as pending ones
  186. }
  187. // If `diff` already connected, retrieve the peer from the pending set
  188. if diff, ok := ps.diffPend[id]; ok {
  189. delete(ps.diffPend, id)
  190. ps.lock.Unlock()
  191. return diff, nil
  192. }
  193. // Otherwise wait for `diff` to connect concurrently
  194. wait := make(chan *diff.Peer)
  195. ps.diffWait[id] = wait
  196. ps.lock.Unlock()
  197. select {
  198. case peer := <-wait:
  199. return peer, nil
  200. case <-time.After(extensionWaitTimeout):
  201. ps.lock.Lock()
  202. delete(ps.diffWait, id)
  203. ps.lock.Unlock()
  204. return nil, errPeerWaitTimeout
  205. }
  206. }
  207. func (ps *peerSet) GetDiffPeer(pid string) downloader.IDiffPeer {
  208. if p := ps.peer(pid); p != nil && p.diffExt != nil {
  209. return p.diffExt
  210. }
  211. return nil
  212. }
  213. // registerPeer injects a new `eth` peer into the working set, or returns an error
  214. // if the peer is already known.
  215. func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Peer) error {
  216. // Start tracking the new peer
  217. ps.lock.Lock()
  218. defer ps.lock.Unlock()
  219. if ps.closed {
  220. return errPeerSetClosed
  221. }
  222. id := peer.ID()
  223. if _, ok := ps.peers[id]; ok {
  224. return errPeerAlreadyRegistered
  225. }
  226. eth := &ethPeer{
  227. Peer: peer,
  228. }
  229. if ext != nil {
  230. eth.snapExt = &snapPeer{ext}
  231. ps.snapPeers++
  232. }
  233. if diffExt != nil {
  234. eth.diffExt = &diffPeer{diffExt}
  235. }
  236. ps.peers[id] = eth
  237. return nil
  238. }
  239. // unregisterPeer removes a remote peer from the active set, disabling any further
  240. // actions to/from that particular entity.
  241. func (ps *peerSet) unregisterPeer(id string) error {
  242. ps.lock.Lock()
  243. defer ps.lock.Unlock()
  244. peer, ok := ps.peers[id]
  245. if !ok {
  246. return errPeerNotRegistered
  247. }
  248. delete(ps.peers, id)
  249. if peer.snapExt != nil {
  250. ps.snapPeers--
  251. }
  252. return nil
  253. }
  254. // peer retrieves the registered peer with the given id.
  255. func (ps *peerSet) peer(id string) *ethPeer {
  256. ps.lock.RLock()
  257. defer ps.lock.RUnlock()
  258. return ps.peers[id]
  259. }
  260. // headPeers retrieves a specified number list of peers.
  261. func (ps *peerSet) headPeers(num uint) []*ethPeer {
  262. ps.lock.RLock()
  263. defer ps.lock.RUnlock()
  264. if num > uint(len(ps.peers)) {
  265. num = uint(len(ps.peers))
  266. }
  267. list := make([]*ethPeer, 0, num)
  268. for _, p := range ps.peers {
  269. list = append(list, p)
  270. }
  271. return list
  272. }
  273. // peersWithoutBlock retrieves a list of peers that do not have a given block in
  274. // their set of known hashes so it might be propagated to them.
  275. func (ps *peerSet) peersWithoutBlock(hash common.Hash) []*ethPeer {
  276. ps.lock.RLock()
  277. defer ps.lock.RUnlock()
  278. list := make([]*ethPeer, 0, len(ps.peers))
  279. for _, p := range ps.peers {
  280. if !p.KnownBlock(hash) {
  281. list = append(list, p)
  282. }
  283. }
  284. return list
  285. }
  286. // peersWithoutTransaction retrieves a list of peers that do not have a given
  287. // transaction in their set of known hashes.
  288. func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer {
  289. ps.lock.RLock()
  290. defer ps.lock.RUnlock()
  291. list := make([]*ethPeer, 0, len(ps.peers))
  292. for _, p := range ps.peers {
  293. if !p.KnownTransaction(hash) {
  294. list = append(list, p)
  295. }
  296. }
  297. return list
  298. }
  299. // len returns if the current number of `eth` peers in the set. Since the `snap`
  300. // peers are tied to the existence of an `eth` connection, that will always be a
  301. // subset of `eth`.
  302. func (ps *peerSet) len() int {
  303. ps.lock.RLock()
  304. defer ps.lock.RUnlock()
  305. return len(ps.peers)
  306. }
  307. // snapLen returns if the current number of `snap` peers in the set.
  308. func (ps *peerSet) snapLen() int {
  309. ps.lock.RLock()
  310. defer ps.lock.RUnlock()
  311. return ps.snapPeers
  312. }
  313. // peerWithHighestTD retrieves the known peer with the currently highest total
  314. // difficulty.
  315. func (ps *peerSet) peerWithHighestTD() *eth.Peer {
  316. ps.lock.RLock()
  317. defer ps.lock.RUnlock()
  318. var (
  319. bestPeer *eth.Peer
  320. bestTd *big.Int
  321. )
  322. for _, p := range ps.peers {
  323. if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
  324. bestPeer, bestTd = p.Peer, td
  325. }
  326. }
  327. return bestPeer
  328. }
  329. // close disconnects all peers.
  330. func (ps *peerSet) close() {
  331. ps.lock.Lock()
  332. defer ps.lock.Unlock()
  333. for _, p := range ps.peers {
  334. p.Disconnect(p2p.DiscQuitting)
  335. }
  336. ps.closed = true
  337. }