msgrate.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. // Copyright 2021 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package msgrate allows estimating the throughput of peers for more balanced syncs.
  17. package msgrate
  18. import (
  19. "errors"
  20. "math"
  21. "sort"
  22. "sync"
  23. "time"
  24. "github.com/ethereum/go-ethereum/log"
  25. )
  26. // measurementImpact is the impact a single measurement has on a peer's final
  27. // capacity value. A value closer to 0 reacts slower to sudden network changes,
  28. // but it is also more stable against temporary hiccups. 0.1 worked well for
  29. // most of Ethereum's existence, so might as well go with it.
  30. const measurementImpact = 0.1
  31. // capacityOverestimation is the ratio of items to over-estimate when retrieving
  32. // a peer's capacity to avoid locking into a lower value due to never attempting
  33. // to fetch more than some local stable value.
  34. const capacityOverestimation = 1.01
  35. // qosTuningPeers is the number of best peers to tune round trip times based on.
  36. // An Ethereum node doesn't need hundreds of connections to operate correctly,
  37. // so instead of lowering our download speed to the median of potentially many
  38. // bad nodes, we can target a smaller set of vey good nodes. At worse this will
  39. // result in less nodes to sync from, but that's still better than some hogging
  40. // the pipeline.
  41. const qosTuningPeers = 5
  42. // rttMinEstimate is the minimal round trip time to target requests for. Since
  43. // every request entails a 2 way latency + bandwidth + serving database lookups,
  44. // it should be generous enough to permit meaningful work to be done on top of
  45. // the transmission costs.
  46. const rttMinEstimate = 2 * time.Second
  47. // rttMaxEstimate is the maximal round trip time to target requests for. Although
  48. // the expectation is that a well connected node will never reach this, certain
  49. // special connectivity ones might experience significant delays (e.g. satellite
  50. // uplink with 3s RTT). This value should be low enough to forbid stalling the
  51. // pipeline too long, but large enough to cover the worst of the worst links.
  52. const rttMaxEstimate = 20 * time.Second
  53. // rttPushdownFactor is a multiplier to attempt forcing quicker requests than
  54. // what the message rate tracker estimates. The reason is that message rate
  55. // tracking adapts queries to the RTT, but multiple RTT values can be perfectly
  56. // valid, they just result in higher packet sizes. Since smaller packets almost
  57. // always result in stabler download streams, this factor hones in on the lowest
  58. // RTT from all the functional ones.
  59. const rttPushdownFactor = 0.9
  60. // rttMinConfidence is the minimum value the roundtrip confidence factor may drop
  61. // to. Since the target timeouts are based on how confident the tracker is in the
  62. // true roundtrip, it's important to not allow too huge fluctuations.
  63. const rttMinConfidence = 0.1
  64. // ttlScaling is the multiplier that converts the estimated roundtrip time to a
  65. // timeout cap for network requests. The expectation is that peers' response time
  66. // will fluctuate around the estimated roundtrip, but depending in their load at
  67. // request time, it might be higher than anticipated. This scaling factor ensures
  68. // that we allow remote connections some slack but at the same time do enforce a
  69. // behavior similar to our median peers.
  70. const ttlScaling = 3
  71. // ttlLimit is the maximum timeout allowance to prevent reaching crazy numbers
  72. // if some unforeseen network events shappen. As much as we try to hone in on
  73. // the most optimal values, it doesn't make any sense to go above a threshold,
  74. // even if everything is slow and screwy.
  75. const ttlLimit = time.Minute
  76. // tuningConfidenceCap is the number of active peers above which to stop detuning
  77. // the confidence number. The idea here is that once we hone in on the capacity
  78. // of a meaningful number of peers, adding one more should ot have a significant
  79. // impact on things, so just ron with the originals.
  80. const tuningConfidenceCap = 10
  81. // tuningImpact is the influence that a new tuning target has on the previously
  82. // cached value. This number is mostly just an out-of-the-blue heuristic that
  83. // prevents the estimates from jumping around. There's no particular reason for
  84. // the current value.
  85. const tuningImpact = 0.25
  86. // Tracker estimates the throughput capacity of a peer with regard to each data
  87. // type it can deliver. The goal is to dynamically adjust request sizes to max
  88. // out network throughput without overloading either the peer or th elocal node.
  89. //
  90. // By tracking in real time the latencies and bandiwdths peers exhibit for each
  91. // packet type, it's possible to prevent overloading by detecting a slowdown on
  92. // one type when another type is pushed too hard.
  93. //
  94. // Similarly, real time measurements also help avoid overloading the local net
  95. // connection if our peers would otherwise be capable to deliver more, but the
  96. // local link is saturated. In that case, the live measurements will force us
  97. // to reduce request sizes until the throughput gets stable.
  98. //
  99. // Lastly, message rate measurements allows us to detect if a peer is unusually
  100. // slow compared to other peers, in which case we can decide to keep it around
  101. // or free up the slot so someone closer.
  102. //
  103. // Since throughput tracking and estimation adapts dynamically to live network
  104. // conditions, it's fine to have multiple trackers locally track the same peer
  105. // in different subsystem. The throughput will simply be distributed across the
  106. // two trackers if both are highly active.
  107. type Tracker struct {
  108. // capacity is the number of items retrievable per second of a given type.
  109. // It is analogous to bandwidth, but we deliberately avoided using bytes
  110. // as the unit, since serving nodes also spend a lot of time loading data
  111. // from disk, which is linear in the number of items, but mostly constant
  112. // in their sizes.
  113. //
  114. // Callers of course are free to use the item counter as a byte counter if
  115. // or when their protocol of choice if capped by bytes instead of items.
  116. // (eg. eth.getHeaders vs snap.getAccountRange).
  117. capacity map[uint64]float64
  118. // roundtrip is the latency a peer in general responds to data requests.
  119. // This number is not used inside the tracker, but is exposed to compare
  120. // peers to each other and filter out slow ones. Note however, it only
  121. // makes sense to compare RTTs if the caller caters request sizes for
  122. // each peer to target the same RTT. There's no need to make this number
  123. // the real networking RTT, we just need a number to compare peers with.
  124. roundtrip time.Duration
  125. lock sync.RWMutex
  126. }
  127. // NewTracker creates a new message rate tracker for a specific peer. An initial
  128. // RTT is needed to avoid a peer getting marked as an outlier compared to others
  129. // right after joining. It's suggested to use the median rtt across all peers to
  130. // init a new peer tracker.
  131. func NewTracker(caps map[uint64]float64, rtt time.Duration) *Tracker {
  132. if caps == nil {
  133. caps = make(map[uint64]float64)
  134. }
  135. return &Tracker{
  136. capacity: caps,
  137. roundtrip: rtt,
  138. }
  139. }
  140. // Capacity calculates the number of items the peer is estimated to be able to
  141. // retrieve within the allotted time slot. The method will round up any division
  142. // errors and will add an additional overestimation ratio on top. The reason for
  143. // overshooting the capacity is because certain message types might not increase
  144. // the load proportionally to the requested items, so fetching a bit more might
  145. // still take the same RTT. By forcefully overshooting by a small amount, we can
  146. // avoid locking into a lower-that-real capacity.
  147. func (t *Tracker) Capacity(kind uint64, targetRTT time.Duration) int {
  148. t.lock.RLock()
  149. defer t.lock.RUnlock()
  150. // Calculate the actual measured throughput
  151. throughput := t.capacity[kind] * float64(targetRTT) / float64(time.Second)
  152. // Return an overestimation to force the peer out of a stuck minima, adding
  153. // +1 in case the item count is too low for the overestimator to dent
  154. return roundCapacity(1 + capacityOverestimation*throughput)
  155. }
  156. // roundCapacity gives the integer value of a capacity.
  157. // The result fits int32, and is guaranteed to be positive.
  158. func roundCapacity(cap float64) int {
  159. const maxInt32 = float64(1<<31 - 1)
  160. return int(math.Min(maxInt32, math.Max(1, math.Ceil(cap))))
  161. }
  162. // Update modifies the peer's capacity values for a specific data type with a new
  163. // measurement. If the delivery is zero, the peer is assumed to have either timed
  164. // out or to not have the requested data, resulting in a slash to 0 capacity. This
  165. // avoids assigning the peer retrievals that it won't be able to honour.
  166. func (t *Tracker) Update(kind uint64, elapsed time.Duration, items int) {
  167. t.lock.Lock()
  168. defer t.lock.Unlock()
  169. // If nothing was delivered (timeout / unavailable data), reduce throughput
  170. // to minimum
  171. if items == 0 {
  172. t.capacity[kind] = 0
  173. return
  174. }
  175. // Otherwise update the throughput with a new measurement
  176. if elapsed <= 0 {
  177. elapsed = 1 // +1 (ns) to ensure non-zero divisor
  178. }
  179. measured := float64(items) / (float64(elapsed) / float64(time.Second))
  180. t.capacity[kind] = (1-measurementImpact)*(t.capacity[kind]) + measurementImpact*measured
  181. t.roundtrip = time.Duration((1-measurementImpact)*float64(t.roundtrip) + measurementImpact*float64(elapsed))
  182. }
  183. // Trackers is a set of message rate trackers across a number of peers with the
  184. // goal of aggregating certain measurements across the entire set for outlier
  185. // filtering and newly joining initialization.
  186. type Trackers struct {
  187. trackers map[string]*Tracker
  188. // roundtrip is the current best guess as to what is a stable round trip time
  189. // across the entire collection of connected peers. This is derived from the
  190. // various trackers added, but is used as a cache to avoid recomputing on each
  191. // network request. The value is updated once every RTT to avoid fluctuations
  192. // caused by hiccups or peer events.
  193. roundtrip time.Duration
  194. // confidence represents the probability that the estimated roundtrip value
  195. // is the real one across all our peers. The confidence value is used as an
  196. // impact factor of new measurements on old estimates. As our connectivity
  197. // stabilizes, this value gravitates towards 1, new measurements havinng
  198. // almost no impact. If there's a large peer churn and few peers, then new
  199. // measurements will impact it more. The confidence is increased with every
  200. // packet and dropped with every new connection.
  201. confidence float64
  202. // tuned is the time instance the tracker recalculated its cached roundtrip
  203. // value and confidence values. A cleaner way would be to have a heartbeat
  204. // goroutine do it regularly, but that requires a lot of maintenance to just
  205. // run every now and again.
  206. tuned time.Time
  207. // The fields below can be used to override certain default values. Their
  208. // purpose is to allow quicker tests. Don't use them in production.
  209. OverrideTTLLimit time.Duration
  210. log log.Logger
  211. lock sync.RWMutex
  212. }
  213. // NewTrackers creates an empty set of trackers to be filled with peers.
  214. func NewTrackers(log log.Logger) *Trackers {
  215. return &Trackers{
  216. trackers: make(map[string]*Tracker),
  217. roundtrip: rttMaxEstimate,
  218. confidence: 1,
  219. tuned: time.Now(),
  220. OverrideTTLLimit: ttlLimit,
  221. log: log,
  222. }
  223. }
  224. // Track inserts a new tracker into the set.
  225. func (t *Trackers) Track(id string, tracker *Tracker) error {
  226. t.lock.Lock()
  227. defer t.lock.Unlock()
  228. if _, ok := t.trackers[id]; ok {
  229. return errors.New("already tracking")
  230. }
  231. t.trackers[id] = tracker
  232. t.detune()
  233. return nil
  234. }
  235. // Untrack stops tracking a previously added peer.
  236. func (t *Trackers) Untrack(id string) error {
  237. t.lock.Lock()
  238. defer t.lock.Unlock()
  239. if _, ok := t.trackers[id]; !ok {
  240. return errors.New("not tracking")
  241. }
  242. delete(t.trackers, id)
  243. return nil
  244. }
  245. // MedianRoundTrip returns the median RTT across all known trackers. The purpose
  246. // of the median RTT is to initialize a new peer with sane statistics that it will
  247. // hopefully outperform. If it seriously underperforms, there's a risk of dropping
  248. // the peer, but that is ok as we're aiming for a strong median.
  249. func (t *Trackers) MedianRoundTrip() time.Duration {
  250. t.lock.RLock()
  251. defer t.lock.RUnlock()
  252. return t.medianRoundTrip()
  253. }
  254. // medianRoundTrip is the internal lockless version of MedianRoundTrip to be used
  255. // by the QoS tuner.
  256. func (t *Trackers) medianRoundTrip() time.Duration {
  257. // Gather all the currently measured round trip times
  258. rtts := make([]float64, 0, len(t.trackers))
  259. for _, tt := range t.trackers {
  260. tt.lock.RLock()
  261. rtts = append(rtts, float64(tt.roundtrip))
  262. tt.lock.RUnlock()
  263. }
  264. sort.Float64s(rtts)
  265. median := rttMaxEstimate
  266. if qosTuningPeers <= len(rtts) {
  267. median = time.Duration(rtts[qosTuningPeers/2]) // Median of our best few peers
  268. } else if len(rtts) > 0 {
  269. median = time.Duration(rtts[len(rtts)/2]) // Median of all out connected peers
  270. }
  271. // Restrict the RTT into some QoS defaults, irrelevant of true RTT
  272. if median < rttMinEstimate {
  273. median = rttMinEstimate
  274. }
  275. if median > rttMaxEstimate {
  276. median = rttMaxEstimate
  277. }
  278. return median
  279. }
  280. // MeanCapacities returns the capacities averaged across all the added trackers.
  281. // The purpos of the mean capacities are to initialize a new peer with some sane
  282. // starting values that it will hopefully outperform. If the mean overshoots, the
  283. // peer will be cut back to minimal capacity and given another chance.
  284. func (t *Trackers) MeanCapacities() map[uint64]float64 {
  285. t.lock.RLock()
  286. defer t.lock.RUnlock()
  287. return t.meanCapacities()
  288. }
  289. // meanCapacities is the internal lockless version of MeanCapacities used for
  290. // debug logging.
  291. func (t *Trackers) meanCapacities() map[uint64]float64 {
  292. capacities := make(map[uint64]float64)
  293. for _, tt := range t.trackers {
  294. tt.lock.RLock()
  295. for key, val := range tt.capacity {
  296. capacities[key] += val
  297. }
  298. tt.lock.RUnlock()
  299. }
  300. for key, val := range capacities {
  301. capacities[key] = val / float64(len(t.trackers))
  302. }
  303. return capacities
  304. }
  305. // TargetRoundTrip returns the current target round trip time for a request to
  306. // complete in.The returned RTT is slightly under the estimated RTT. The reason
  307. // is that message rate estimation is a 2 dimensional problem which is solvable
  308. // for any RTT. The goal is to gravitate towards smaller RTTs instead of large
  309. // messages, to result in a stabler download stream.
  310. func (t *Trackers) TargetRoundTrip() time.Duration {
  311. // Recalculate the internal caches if it's been a while
  312. t.tune()
  313. // Caches surely recent, return target roundtrip
  314. t.lock.RLock()
  315. defer t.lock.RUnlock()
  316. return time.Duration(float64(t.roundtrip) * rttPushdownFactor)
  317. }
  318. // TargetTimeout returns the timeout allowance for a single request to finish
  319. // under. The timeout is proportional to the roundtrip, but also takes into
  320. // consideration the tracker's confidence in said roundtrip and scales it
  321. // accordingly. The final value is capped to avoid runaway requests.
  322. func (t *Trackers) TargetTimeout() time.Duration {
  323. // Recalculate the internal caches if it's been a while
  324. t.tune()
  325. // Caches surely recent, return target timeout
  326. t.lock.RLock()
  327. defer t.lock.RUnlock()
  328. return t.targetTimeout()
  329. }
  330. // targetTimeout is the internal lockless version of TargetTimeout to be used
  331. // during QoS tuning.
  332. func (t *Trackers) targetTimeout() time.Duration {
  333. timeout := time.Duration(ttlScaling * float64(t.roundtrip) / t.confidence)
  334. if timeout > t.OverrideTTLLimit {
  335. timeout = t.OverrideTTLLimit
  336. }
  337. return timeout
  338. }
  339. // tune gathers the individual tracker statistics and updates the estimated
  340. // request round trip time.
  341. func (t *Trackers) tune() {
  342. // Tune may be called concurrently all over the place, but we only want to
  343. // periodically update and even then only once. First check if it was updated
  344. // recently and abort if so.
  345. t.lock.RLock()
  346. dirty := time.Since(t.tuned) > t.roundtrip
  347. t.lock.RUnlock()
  348. if !dirty {
  349. return
  350. }
  351. // If an update is needed, obtain a write lock but make sure we don't update
  352. // it on all concurrent threads one by one.
  353. t.lock.Lock()
  354. defer t.lock.Unlock()
  355. if dirty := time.Since(t.tuned) > t.roundtrip; !dirty {
  356. return // A concurrent request beat us to the tuning
  357. }
  358. // First thread reaching the tuning point, update the estimates and return
  359. t.roundtrip = time.Duration((1-tuningImpact)*float64(t.roundtrip) + tuningImpact*float64(t.medianRoundTrip()))
  360. t.confidence = t.confidence + (1-t.confidence)/2
  361. t.tuned = time.Now()
  362. t.log.Debug("Recalculated msgrate QoS values", "rtt", t.roundtrip, "confidence", t.confidence, "ttl", t.targetTimeout(), "next", t.tuned.Add(t.roundtrip))
  363. t.log.Trace("Debug dump of mean capacities", "caps", log.Lazy{Fn: t.meanCapacities})
  364. }
  365. // detune reduces the tracker's confidence in order to make fresh measurements
  366. // have a larger impact on the estimates. It is meant to be used during new peer
  367. // connections so they can have a proper impact on the estimates.
  368. func (t *Trackers) detune() {
  369. // If we have a single peer, confidence is always 1
  370. if len(t.trackers) == 1 {
  371. t.confidence = 1
  372. return
  373. }
  374. // If we have a ton of peers, don't drop the confidence since there's enough
  375. // remaining to retain the same throughput
  376. if len(t.trackers) >= tuningConfidenceCap {
  377. return
  378. }
  379. // Otherwise drop the confidence factor
  380. peers := float64(len(t.trackers))
  381. t.confidence = t.confidence * (peers - 1) / peers
  382. if t.confidence < rttMinConfidence {
  383. t.confidence = rttMinConfidence
  384. }
  385. t.log.Debug("Relaxed msgrate QoS values", "rtt", t.roundtrip, "confidence", t.confidence, "ttl", t.targetTimeout())
  386. }
  387. // Capacity is a helper function to access a specific tracker without having to
  388. // track it explicitly outside.
  389. func (t *Trackers) Capacity(id string, kind uint64, targetRTT time.Duration) int {
  390. t.lock.RLock()
  391. defer t.lock.RUnlock()
  392. tracker := t.trackers[id]
  393. if tracker == nil {
  394. return 1 // Unregister race, don't return 0, it's a dangerous number
  395. }
  396. return tracker.Capacity(kind, targetRTT)
  397. }
  398. // Update is a helper function to access a specific tracker without having to
  399. // track it explicitly outside.
  400. func (t *Trackers) Update(id string, kind uint64, elapsed time.Duration, items int) {
  401. t.lock.RLock()
  402. defer t.lock.RUnlock()
  403. if tracker := t.trackers[id]; tracker != nil {
  404. tracker.Update(kind, elapsed, items)
  405. }
  406. }