peer.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Contains the active peer-set of the downloader, maintaining both failures
  17. // as well as reputation metrics to prioritize the block retrievals.
  18. package downloader
  19. import (
  20. "errors"
  21. "fmt"
  22. "math"
  23. "sync"
  24. "sync/atomic"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "gopkg.in/fatih/set.v0"
  28. )
  29. // Hash and block fetchers belonging to eth/61 and below
  30. type relativeHashFetcherFn func(common.Hash) error
  31. type absoluteHashFetcherFn func(uint64, int) error
  32. type blockFetcherFn func([]common.Hash) error
  33. // Block header and body fetchers belonging to eth/62 and above
  34. type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
  35. type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
  36. type blockBodyFetcherFn func([]common.Hash) error
  37. type receiptFetcherFn func([]common.Hash) error
  38. type stateFetcherFn func([]common.Hash) error
  39. var (
  40. errAlreadyFetching = errors.New("already fetching blocks from peer")
  41. errAlreadyRegistered = errors.New("peer is already registered")
  42. errNotRegistered = errors.New("peer is not registered")
  43. )
  44. // peer represents an active peer from which hashes and blocks are retrieved.
  45. type peer struct {
  46. id string // Unique identifier of the peer
  47. head common.Hash // Hash of the peers latest known block
  48. blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
  49. receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
  50. stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
  51. rep int32 // Simple peer reputation
  52. blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request
  53. receiptCapacity int32 // Number of receipts allowed to fetch per request
  54. stateCapacity int32 // Number of node data pieces allowed to fetch per request
  55. blockStarted time.Time // Time instance when the last block (body)fetch was started
  56. receiptStarted time.Time // Time instance when the last receipt fetch was started
  57. stateStarted time.Time // Time instance when the last node data fetch was started
  58. ignored *set.Set // Set of hashes not to request (didn't have previously)
  59. getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash
  60. getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position
  61. getBlocks blockFetcherFn // [eth/61] Method to retrieve a batch of blocks
  62. getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash
  63. getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position
  64. getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
  65. getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts
  66. getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data
  67. version int // Eth protocol version number to switch strategies
  68. }
  69. // newPeer create a new downloader peer, with specific hash and block retrieval
  70. // mechanisms.
  71. func newPeer(id string, version int, head common.Hash,
  72. getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
  73. getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
  74. getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer {
  75. return &peer{
  76. id: id,
  77. head: head,
  78. blockCapacity: 1,
  79. receiptCapacity: 1,
  80. stateCapacity: 1,
  81. ignored: set.New(),
  82. getRelHashes: getRelHashes,
  83. getAbsHashes: getAbsHashes,
  84. getBlocks: getBlocks,
  85. getRelHeaders: getRelHeaders,
  86. getAbsHeaders: getAbsHeaders,
  87. getBlockBodies: getBlockBodies,
  88. getReceipts: getReceipts,
  89. getNodeData: getNodeData,
  90. version: version,
  91. }
  92. }
  93. // Reset clears the internal state of a peer entity.
  94. func (p *peer) Reset() {
  95. atomic.StoreInt32(&p.blockIdle, 0)
  96. atomic.StoreInt32(&p.receiptIdle, 0)
  97. atomic.StoreInt32(&p.blockCapacity, 1)
  98. atomic.StoreInt32(&p.receiptCapacity, 1)
  99. atomic.StoreInt32(&p.stateCapacity, 1)
  100. p.ignored.Clear()
  101. }
  102. // Fetch61 sends a block retrieval request to the remote peer.
  103. func (p *peer) Fetch61(request *fetchRequest) error {
  104. // Sanity check the protocol version
  105. if p.version != 61 {
  106. panic(fmt.Sprintf("block fetch [eth/61] requested on eth/%d", p.version))
  107. }
  108. // Short circuit if the peer is already fetching
  109. if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
  110. return errAlreadyFetching
  111. }
  112. p.blockStarted = time.Now()
  113. // Convert the hash set to a retrievable slice
  114. hashes := make([]common.Hash, 0, len(request.Hashes))
  115. for hash, _ := range request.Hashes {
  116. hashes = append(hashes, hash)
  117. }
  118. go p.getBlocks(hashes)
  119. return nil
  120. }
  121. // FetchBodies sends a block body retrieval request to the remote peer.
  122. func (p *peer) FetchBodies(request *fetchRequest) error {
  123. // Sanity check the protocol version
  124. if p.version < 62 {
  125. panic(fmt.Sprintf("body fetch [eth/62+] requested on eth/%d", p.version))
  126. }
  127. // Short circuit if the peer is already fetching
  128. if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
  129. return errAlreadyFetching
  130. }
  131. p.blockStarted = time.Now()
  132. // Convert the header set to a retrievable slice
  133. hashes := make([]common.Hash, 0, len(request.Headers))
  134. for _, header := range request.Headers {
  135. hashes = append(hashes, header.Hash())
  136. }
  137. go p.getBlockBodies(hashes)
  138. return nil
  139. }
  140. // FetchReceipts sends a receipt retrieval request to the remote peer.
  141. func (p *peer) FetchReceipts(request *fetchRequest) error {
  142. // Sanity check the protocol version
  143. if p.version < 63 {
  144. panic(fmt.Sprintf("body fetch [eth/63+] requested on eth/%d", p.version))
  145. }
  146. // Short circuit if the peer is already fetching
  147. if !atomic.CompareAndSwapInt32(&p.receiptIdle, 0, 1) {
  148. return errAlreadyFetching
  149. }
  150. p.receiptStarted = time.Now()
  151. // Convert the header set to a retrievable slice
  152. hashes := make([]common.Hash, 0, len(request.Headers))
  153. for _, header := range request.Headers {
  154. hashes = append(hashes, header.Hash())
  155. }
  156. go p.getReceipts(hashes)
  157. return nil
  158. }
  159. // FetchNodeData sends a node state data retrieval request to the remote peer.
  160. func (p *peer) FetchNodeData(request *fetchRequest) error {
  161. // Sanity check the protocol version
  162. if p.version < 63 {
  163. panic(fmt.Sprintf("node data fetch [eth/63+] requested on eth/%d", p.version))
  164. }
  165. // Short circuit if the peer is already fetching
  166. if !atomic.CompareAndSwapInt32(&p.stateIdle, 0, 1) {
  167. return errAlreadyFetching
  168. }
  169. p.stateStarted = time.Now()
  170. // Convert the hash set to a retrievable slice
  171. hashes := make([]common.Hash, 0, len(request.Hashes))
  172. for hash, _ := range request.Hashes {
  173. hashes = append(hashes, hash)
  174. }
  175. go p.getNodeData(hashes)
  176. return nil
  177. }
  178. // SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests.
  179. // Its block retrieval allowance will also be updated either up- or downwards,
  180. // depending on whether the previous fetch completed in time.
  181. func (p *peer) SetBlocksIdle() {
  182. p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle)
  183. }
  184. // SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests.
  185. // Its block body retrieval allowance will also be updated either up- or downwards,
  186. // depending on whether the previous fetch completed in time.
  187. func (p *peer) SetBodiesIdle() {
  188. p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBodyFetch, &p.blockCapacity, &p.blockIdle)
  189. }
  190. // SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests.
  191. // Its receipt retrieval allowance will also be updated either up- or downwards,
  192. // depending on whether the previous fetch completed in time.
  193. func (p *peer) SetReceiptsIdle() {
  194. p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle)
  195. }
  196. // SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval
  197. // requests. Its node data retrieval allowance will also be updated either up- or
  198. // downwards, depending on whether the previous fetch completed in time.
  199. func (p *peer) SetNodeDataIdle() {
  200. p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle)
  201. }
  202. // setIdle sets the peer to idle, allowing it to execute new retrieval requests.
  203. // Its data retrieval allowance will also be updated either up- or downwards,
  204. // depending on whether the previous fetch completed in time.
  205. func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) {
  206. // Update the peer's download allowance based on previous performance
  207. scale := 2.0
  208. if time.Since(started) > softTTL {
  209. scale = 0.5
  210. if time.Since(started) > hardTTL {
  211. scale = 1 / float64(maxFetch) // reduces capacity to 1
  212. }
  213. }
  214. for {
  215. // Calculate the new download bandwidth allowance
  216. prev := atomic.LoadInt32(capacity)
  217. next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale)))
  218. // Try to update the old value
  219. if atomic.CompareAndSwapInt32(capacity, prev, next) {
  220. // If we're having problems at 1 capacity, try to find better peers
  221. if next == 1 {
  222. p.Demote()
  223. }
  224. break
  225. }
  226. }
  227. // Set the peer to idle to allow further fetch requests
  228. atomic.StoreInt32(idle, 0)
  229. }
  230. // BlockCapacity retrieves the peers block download allowance based on its
  231. // previously discovered bandwidth capacity.
  232. func (p *peer) BlockCapacity() int {
  233. return int(atomic.LoadInt32(&p.blockCapacity))
  234. }
  235. // ReceiptCapacity retrieves the peers block download allowance based on its
  236. // previously discovered bandwidth capacity.
  237. func (p *peer) ReceiptCapacity() int {
  238. return int(atomic.LoadInt32(&p.receiptCapacity))
  239. }
  240. // NodeDataCapacity retrieves the peers block download allowance based on its
  241. // previously discovered bandwidth capacity.
  242. func (p *peer) NodeDataCapacity() int {
  243. return int(atomic.LoadInt32(&p.stateCapacity))
  244. }
  245. // Promote increases the peer's reputation.
  246. func (p *peer) Promote() {
  247. atomic.AddInt32(&p.rep, 1)
  248. }
  249. // Demote decreases the peer's reputation or leaves it at 0.
  250. func (p *peer) Demote() {
  251. for {
  252. // Calculate the new reputation value
  253. prev := atomic.LoadInt32(&p.rep)
  254. next := prev / 2
  255. // Try to update the old value
  256. if atomic.CompareAndSwapInt32(&p.rep, prev, next) {
  257. return
  258. }
  259. }
  260. }
  261. // String implements fmt.Stringer.
  262. func (p *peer) String() string {
  263. return fmt.Sprintf("Peer %s [%s]", p.id,
  264. fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+
  265. fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+
  266. fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+
  267. fmt.Sprintf("ignored %4d", p.ignored.Size()),
  268. )
  269. }
  270. // peerSet represents the collection of active peer participating in the block
  271. // download procedure.
  272. type peerSet struct {
  273. peers map[string]*peer
  274. lock sync.RWMutex
  275. }
  276. // newPeerSet creates a new peer set top track the active download sources.
  277. func newPeerSet() *peerSet {
  278. return &peerSet{
  279. peers: make(map[string]*peer),
  280. }
  281. }
  282. // Reset iterates over the current peer set, and resets each of the known peers
  283. // to prepare for a next batch of block retrieval.
  284. func (ps *peerSet) Reset() {
  285. ps.lock.RLock()
  286. defer ps.lock.RUnlock()
  287. for _, peer := range ps.peers {
  288. peer.Reset()
  289. }
  290. }
  291. // Register injects a new peer into the working set, or returns an error if the
  292. // peer is already known.
  293. func (ps *peerSet) Register(p *peer) error {
  294. ps.lock.Lock()
  295. defer ps.lock.Unlock()
  296. if _, ok := ps.peers[p.id]; ok {
  297. return errAlreadyRegistered
  298. }
  299. ps.peers[p.id] = p
  300. return nil
  301. }
  302. // Unregister removes a remote peer from the active set, disabling any further
  303. // actions to/from that particular entity.
  304. func (ps *peerSet) Unregister(id string) error {
  305. ps.lock.Lock()
  306. defer ps.lock.Unlock()
  307. if _, ok := ps.peers[id]; !ok {
  308. return errNotRegistered
  309. }
  310. delete(ps.peers, id)
  311. return nil
  312. }
  313. // Peer retrieves the registered peer with the given id.
  314. func (ps *peerSet) Peer(id string) *peer {
  315. ps.lock.RLock()
  316. defer ps.lock.RUnlock()
  317. return ps.peers[id]
  318. }
  319. // Len returns if the current number of peers in the set.
  320. func (ps *peerSet) Len() int {
  321. ps.lock.RLock()
  322. defer ps.lock.RUnlock()
  323. return len(ps.peers)
  324. }
  325. // AllPeers retrieves a flat list of all the peers within the set.
  326. func (ps *peerSet) AllPeers() []*peer {
  327. ps.lock.RLock()
  328. defer ps.lock.RUnlock()
  329. list := make([]*peer, 0, len(ps.peers))
  330. for _, p := range ps.peers {
  331. list = append(list, p)
  332. }
  333. return list
  334. }
  335. // BlockIdlePeers retrieves a flat list of all the currently idle peers within the
  336. // active peer set, ordered by their reputation.
  337. func (ps *peerSet) BlockIdlePeers() ([]*peer, int) {
  338. idle := func(p *peer) bool {
  339. return atomic.LoadInt32(&p.blockIdle) == 0
  340. }
  341. return ps.idlePeers(61, 61, idle)
  342. }
  343. // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
  344. // the active peer set, ordered by their reputation.
  345. func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {
  346. idle := func(p *peer) bool {
  347. return atomic.LoadInt32(&p.blockIdle) == 0
  348. }
  349. return ps.idlePeers(62, 64, idle)
  350. }
  351. // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
  352. // within the active peer set, ordered by their reputation.
  353. func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
  354. idle := func(p *peer) bool {
  355. return atomic.LoadInt32(&p.receiptIdle) == 0
  356. }
  357. return ps.idlePeers(63, 64, idle)
  358. }
  359. // NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
  360. // peers within the active peer set, ordered by their reputation.
  361. func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) {
  362. idle := func(p *peer) bool {
  363. return atomic.LoadInt32(&p.stateIdle) == 0
  364. }
  365. return ps.idlePeers(63, 64, idle)
  366. }
  367. // idlePeers retrieves a flat list of all currently idle peers satisfying the
  368. // protocol version constraints, using the provided function to check idleness.
  369. func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) {
  370. ps.lock.RLock()
  371. defer ps.lock.RUnlock()
  372. idle, total := make([]*peer, 0, len(ps.peers)), 0
  373. for _, p := range ps.peers {
  374. if p.version >= minProtocol && p.version <= maxProtocol {
  375. if idleCheck(p) {
  376. idle = append(idle, p)
  377. }
  378. total++
  379. }
  380. }
  381. for i := 0; i < len(idle); i++ {
  382. for j := i + 1; j < len(idle); j++ {
  383. if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
  384. idle[i], idle[j] = idle[j], idle[i]
  385. }
  386. }
  387. }
  388. return idle, total
  389. }