costtracker.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more detailct.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "encoding/binary"
  19. "fmt"
  20. "math"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common/mclock"
  25. "github.com/ethereum/go-ethereum/eth"
  26. "github.com/ethereum/go-ethereum/ethdb"
  27. "github.com/ethereum/go-ethereum/les/csvlogger"
  28. "github.com/ethereum/go-ethereum/les/flowcontrol"
  29. "github.com/ethereum/go-ethereum/log"
  30. )
  31. const makeCostStats = false // make request cost statistics during operation
  32. var (
  33. // average request cost estimates based on serving time
  34. reqAvgTimeCost = requestCostTable{
  35. GetBlockHeadersMsg: {150000, 30000},
  36. GetBlockBodiesMsg: {0, 700000},
  37. GetReceiptsMsg: {0, 1000000},
  38. GetCodeMsg: {0, 450000},
  39. GetProofsV2Msg: {0, 600000},
  40. GetHelperTrieProofsMsg: {0, 1000000},
  41. SendTxV2Msg: {0, 450000},
  42. GetTxStatusMsg: {0, 250000},
  43. }
  44. // maximum incoming message size estimates
  45. reqMaxInSize = requestCostTable{
  46. GetBlockHeadersMsg: {40, 0},
  47. GetBlockBodiesMsg: {0, 40},
  48. GetReceiptsMsg: {0, 40},
  49. GetCodeMsg: {0, 80},
  50. GetProofsV2Msg: {0, 80},
  51. GetHelperTrieProofsMsg: {0, 20},
  52. SendTxV2Msg: {0, 16500},
  53. GetTxStatusMsg: {0, 50},
  54. }
  55. // maximum outgoing message size estimates
  56. reqMaxOutSize = requestCostTable{
  57. GetBlockHeadersMsg: {0, 556},
  58. GetBlockBodiesMsg: {0, 100000},
  59. GetReceiptsMsg: {0, 200000},
  60. GetCodeMsg: {0, 50000},
  61. GetProofsV2Msg: {0, 4000},
  62. GetHelperTrieProofsMsg: {0, 4000},
  63. SendTxV2Msg: {0, 100},
  64. GetTxStatusMsg: {0, 100},
  65. }
  66. // request amounts that have to fit into the minimum buffer size minBufferMultiplier times
  67. minBufferReqAmount = map[uint64]uint64{
  68. GetBlockHeadersMsg: 192,
  69. GetBlockBodiesMsg: 1,
  70. GetReceiptsMsg: 1,
  71. GetCodeMsg: 1,
  72. GetProofsV2Msg: 1,
  73. GetHelperTrieProofsMsg: 16,
  74. SendTxV2Msg: 8,
  75. GetTxStatusMsg: 64,
  76. }
  77. minBufferMultiplier = 3
  78. )
  79. const (
  80. maxCostFactor = 2 // ratio of maximum and average cost estimates
  81. gfUsageThreshold = 0.5
  82. gfUsageTC = time.Second
  83. gfRaiseTC = time.Second * 200
  84. gfDropTC = time.Second * 50
  85. gfDbKey = "_globalCostFactorV3"
  86. )
  87. // costTracker is responsible for calculating costs and cost estimates on the
  88. // server side. It continuously updates the global cost factor which is defined
  89. // as the number of cost units per nanosecond of serving time in a single thread.
  90. // It is based on statistics collected during serving requests in high-load periods
  91. // and practically acts as a one-dimension request price scaling factor over the
  92. // pre-defined cost estimate table. Instead of scaling the cost values, the real
  93. // value of cost units is changed by applying the factor to the serving times. This
  94. // is more convenient because the changes in the cost factor can be applied immediately
  95. // without always notifying the clients about the changed cost tables.
  96. type costTracker struct {
  97. db ethdb.Database
  98. stopCh chan chan struct{}
  99. inSizeFactor, outSizeFactor float64
  100. gf, utilTarget float64
  101. minBufLimit uint64
  102. gfUpdateCh chan gfUpdate
  103. gfLock sync.RWMutex
  104. totalRechargeCh chan uint64
  105. stats map[uint64][]uint64
  106. logger *csvlogger.Logger
  107. logRecentTime, logRecentAvg, logTotalRecharge, logRelCost *csvlogger.Channel
  108. }
  109. // newCostTracker creates a cost tracker and loads the cost factor statistics from the database.
  110. // It also returns the minimum capacity that can be assigned to any peer.
  111. func newCostTracker(db ethdb.Database, config *eth.Config, logger *csvlogger.Logger) (*costTracker, uint64) {
  112. utilTarget := float64(config.LightServ) * flowcontrol.FixedPointMultiplier / 100
  113. ct := &costTracker{
  114. db: db,
  115. stopCh: make(chan chan struct{}),
  116. utilTarget: utilTarget,
  117. logger: logger,
  118. logRelCost: logger.NewMinMaxChannel("relativeCost", true),
  119. logRecentTime: logger.NewMinMaxChannel("recentTime", true),
  120. logRecentAvg: logger.NewMinMaxChannel("recentAvg", true),
  121. logTotalRecharge: logger.NewChannel("totalRecharge", 0.01),
  122. }
  123. if config.LightBandwidthIn > 0 {
  124. ct.inSizeFactor = utilTarget / float64(config.LightBandwidthIn)
  125. }
  126. if config.LightBandwidthOut > 0 {
  127. ct.outSizeFactor = utilTarget / float64(config.LightBandwidthOut)
  128. }
  129. if makeCostStats {
  130. ct.stats = make(map[uint64][]uint64)
  131. for code := range reqAvgTimeCost {
  132. ct.stats[code] = make([]uint64, 10)
  133. }
  134. }
  135. ct.gfLoop()
  136. costList := ct.makeCostList(ct.globalFactor() * 1.25)
  137. for _, c := range costList {
  138. amount := minBufferReqAmount[c.MsgCode]
  139. cost := c.BaseCost + amount*c.ReqCost
  140. if cost > ct.minBufLimit {
  141. ct.minBufLimit = cost
  142. }
  143. }
  144. ct.minBufLimit *= uint64(minBufferMultiplier)
  145. return ct, (ct.minBufLimit-1)/bufLimitRatio + 1
  146. }
  147. // stop stops the cost tracker and saves the cost factor statistics to the database
  148. func (ct *costTracker) stop() {
  149. stopCh := make(chan struct{})
  150. ct.stopCh <- stopCh
  151. <-stopCh
  152. if makeCostStats {
  153. ct.printStats()
  154. }
  155. }
  156. // makeCostList returns upper cost estimates based on the hardcoded cost estimate
  157. // tables and the optionally specified incoming/outgoing bandwidth limits
  158. func (ct *costTracker) makeCostList(globalFactor float64) RequestCostList {
  159. maxCost := func(avgTimeCost, inSize, outSize uint64) uint64 {
  160. cost := avgTimeCost * maxCostFactor
  161. inSizeCost := uint64(float64(inSize) * ct.inSizeFactor * globalFactor)
  162. if inSizeCost > cost {
  163. cost = inSizeCost
  164. }
  165. outSizeCost := uint64(float64(outSize) * ct.outSizeFactor * globalFactor)
  166. if outSizeCost > cost {
  167. cost = outSizeCost
  168. }
  169. return cost
  170. }
  171. var list RequestCostList
  172. for code, data := range reqAvgTimeCost {
  173. baseCost := maxCost(data.baseCost, reqMaxInSize[code].baseCost, reqMaxOutSize[code].baseCost)
  174. reqCost := maxCost(data.reqCost, reqMaxInSize[code].reqCost, reqMaxOutSize[code].reqCost)
  175. if ct.minBufLimit != 0 {
  176. // if minBufLimit is set then always enforce maximum request cost <= minBufLimit
  177. maxCost := baseCost + reqCost*minBufferReqAmount[code]
  178. if maxCost > ct.minBufLimit {
  179. mul := 0.999 * float64(ct.minBufLimit) / float64(maxCost)
  180. baseCost = uint64(float64(baseCost) * mul)
  181. reqCost = uint64(float64(reqCost) * mul)
  182. }
  183. }
  184. list = append(list, requestCostListItem{
  185. MsgCode: code,
  186. BaseCost: baseCost,
  187. ReqCost: reqCost,
  188. })
  189. }
  190. return list
  191. }
  192. type gfUpdate struct {
  193. avgTimeCost, servingTime float64
  194. }
  195. // gfLoop starts an event loop which updates the global cost factor which is
  196. // calculated as a weighted average of the average estimate / serving time ratio.
  197. // The applied weight equals the serving time if gfUsage is over a threshold,
  198. // zero otherwise. gfUsage is the recent average serving time per time unit in
  199. // an exponential moving window. This ensures that statistics are collected only
  200. // under high-load circumstances where the measured serving times are relevant.
  201. // The total recharge parameter of the flow control system which controls the
  202. // total allowed serving time per second but nominated in cost units, should
  203. // also be scaled with the cost factor and is also updated by this loop.
  204. func (ct *costTracker) gfLoop() {
  205. var gfLog, recentTime, recentAvg float64
  206. lastUpdate := mclock.Now()
  207. expUpdate := lastUpdate
  208. data, _ := ct.db.Get([]byte(gfDbKey))
  209. if len(data) == 8 {
  210. gfLog = math.Float64frombits(binary.BigEndian.Uint64(data[:]))
  211. }
  212. gf := math.Exp(gfLog)
  213. ct.gf = gf
  214. totalRecharge := ct.utilTarget * gf
  215. ct.gfUpdateCh = make(chan gfUpdate, 100)
  216. threshold := gfUsageThreshold * float64(gfUsageTC) * ct.utilTarget / 1000000
  217. go func() {
  218. saveCostFactor := func() {
  219. var data [8]byte
  220. binary.BigEndian.PutUint64(data[:], math.Float64bits(gfLog))
  221. ct.db.Put([]byte(gfDbKey), data[:])
  222. log.Debug("global cost factor saved", "value", gf)
  223. }
  224. saveTicker := time.NewTicker(time.Minute * 10)
  225. for {
  226. select {
  227. case r := <-ct.gfUpdateCh:
  228. now := mclock.Now()
  229. if ct.logRelCost != nil && r.avgTimeCost > 1e-20 {
  230. ct.logRelCost.Update(r.servingTime * gf / r.avgTimeCost)
  231. }
  232. if r.servingTime > 1000000000 {
  233. ct.logger.Event(fmt.Sprintf("Very long servingTime = %f avgTimeCost = %f costFactor = %f", r.servingTime, r.avgTimeCost, gf))
  234. }
  235. dt := float64(now - expUpdate)
  236. expUpdate = now
  237. exp := math.Exp(-dt / float64(gfUsageTC))
  238. // calculate gf correction until now, based on previous values
  239. var gfCorr float64
  240. max := recentTime
  241. if recentAvg > max {
  242. max = recentAvg
  243. }
  244. // we apply continuous correction when MAX(recentTime, recentAvg) > threshold
  245. if max > threshold {
  246. // calculate correction time between last expUpdate and now
  247. if max*exp >= threshold {
  248. gfCorr = dt
  249. } else {
  250. gfCorr = math.Log(max/threshold) * float64(gfUsageTC)
  251. }
  252. // calculate log(gf) correction with the right direction and time constant
  253. if recentTime > recentAvg {
  254. // drop gf if actual serving times are larger than average estimates
  255. gfCorr /= -float64(gfDropTC)
  256. } else {
  257. // raise gf if actual serving times are smaller than average estimates
  258. gfCorr /= float64(gfRaiseTC)
  259. }
  260. }
  261. // update recent cost values with current request
  262. recentTime = recentTime*exp + r.servingTime
  263. recentAvg = recentAvg*exp + r.avgTimeCost/gf
  264. if gfCorr != 0 {
  265. gfLog += gfCorr
  266. gf = math.Exp(gfLog)
  267. if time.Duration(now-lastUpdate) > time.Second {
  268. totalRecharge = ct.utilTarget * gf
  269. lastUpdate = now
  270. ct.gfLock.Lock()
  271. ct.gf = gf
  272. ch := ct.totalRechargeCh
  273. ct.gfLock.Unlock()
  274. if ch != nil {
  275. select {
  276. case ct.totalRechargeCh <- uint64(totalRecharge):
  277. default:
  278. }
  279. }
  280. log.Debug("global cost factor updated", "gf", gf)
  281. }
  282. }
  283. ct.logRecentTime.Update(recentTime)
  284. ct.logRecentAvg.Update(recentAvg)
  285. ct.logTotalRecharge.Update(totalRecharge)
  286. case <-saveTicker.C:
  287. saveCostFactor()
  288. case stopCh := <-ct.stopCh:
  289. saveCostFactor()
  290. close(stopCh)
  291. return
  292. }
  293. }
  294. }()
  295. }
  296. // globalFactor returns the current value of the global cost factor
  297. func (ct *costTracker) globalFactor() float64 {
  298. ct.gfLock.RLock()
  299. defer ct.gfLock.RUnlock()
  300. return ct.gf
  301. }
  302. // totalRecharge returns the current total recharge parameter which is used by
  303. // flowcontrol.ClientManager and is scaled by the global cost factor
  304. func (ct *costTracker) totalRecharge() uint64 {
  305. ct.gfLock.RLock()
  306. defer ct.gfLock.RUnlock()
  307. return uint64(ct.gf * ct.utilTarget)
  308. }
  309. // subscribeTotalRecharge returns all future updates to the total recharge value
  310. // through a channel and also returns the current value
  311. func (ct *costTracker) subscribeTotalRecharge(ch chan uint64) uint64 {
  312. ct.gfLock.Lock()
  313. defer ct.gfLock.Unlock()
  314. ct.totalRechargeCh = ch
  315. return uint64(ct.gf * ct.utilTarget)
  316. }
  317. // updateStats updates the global cost factor and (if enabled) the real cost vs.
  318. // average estimate statistics
  319. func (ct *costTracker) updateStats(code, amount, servingTime, realCost uint64) {
  320. avg := reqAvgTimeCost[code]
  321. avgTimeCost := avg.baseCost + amount*avg.reqCost
  322. select {
  323. case ct.gfUpdateCh <- gfUpdate{float64(avgTimeCost), float64(servingTime)}:
  324. default:
  325. }
  326. if makeCostStats {
  327. realCost <<= 4
  328. l := 0
  329. for l < 9 && realCost > avgTimeCost {
  330. l++
  331. realCost >>= 1
  332. }
  333. atomic.AddUint64(&ct.stats[code][l], 1)
  334. }
  335. }
  336. // realCost calculates the final cost of a request based on actual serving time,
  337. // incoming and outgoing message size
  338. //
  339. // Note: message size is only taken into account if bandwidth limitation is applied
  340. // and the cost based on either message size is greater than the cost based on
  341. // serving time. A maximum of the three costs is applied instead of their sum
  342. // because the three limited resources (serving thread time and i/o bandwidth) can
  343. // also be maxed out simultaneously.
  344. func (ct *costTracker) realCost(servingTime uint64, inSize, outSize uint32) uint64 {
  345. cost := float64(servingTime)
  346. inSizeCost := float64(inSize) * ct.inSizeFactor
  347. if inSizeCost > cost {
  348. cost = inSizeCost
  349. }
  350. outSizeCost := float64(outSize) * ct.outSizeFactor
  351. if outSizeCost > cost {
  352. cost = outSizeCost
  353. }
  354. return uint64(cost * ct.globalFactor())
  355. }
  356. // printStats prints the distribution of real request cost relative to the average estimates
  357. func (ct *costTracker) printStats() {
  358. if ct.stats == nil {
  359. return
  360. }
  361. for code, arr := range ct.stats {
  362. log.Info("Request cost statistics", "code", code, "1/16", arr[0], "1/8", arr[1], "1/4", arr[2], "1/2", arr[3], "1", arr[4], "2", arr[5], "4", arr[6], "8", arr[7], "16", arr[8], ">16", arr[9])
  363. }
  364. }
  365. type (
  366. // requestCostTable assigns a cost estimate function to each request type
  367. // which is a linear function of the requested amount
  368. // (cost = baseCost + reqCost * amount)
  369. requestCostTable map[uint64]*requestCosts
  370. requestCosts struct {
  371. baseCost, reqCost uint64
  372. }
  373. // RequestCostList is a list representation of request costs which is used for
  374. // database storage and communication through the network
  375. RequestCostList []requestCostListItem
  376. requestCostListItem struct {
  377. MsgCode, BaseCost, ReqCost uint64
  378. }
  379. )
  380. // getMaxCost calculates the estimated cost for a given request type and amount
  381. func (table requestCostTable) getMaxCost(code, amount uint64) uint64 {
  382. costs := table[code]
  383. return costs.baseCost + amount*costs.reqCost
  384. }
  385. // decode converts a cost list to a cost table
  386. func (list RequestCostList) decode(protocolLength uint64) requestCostTable {
  387. table := make(requestCostTable)
  388. for _, e := range list {
  389. if e.MsgCode < protocolLength {
  390. table[e.MsgCode] = &requestCosts{
  391. baseCost: e.BaseCost,
  392. reqCost: e.ReqCost,
  393. }
  394. }
  395. }
  396. return table
  397. }
  398. // testCostList returns a dummy request cost list used by tests
  399. func testCostList(testCost uint64) RequestCostList {
  400. cl := make(RequestCostList, len(reqAvgTimeCost))
  401. var max uint64
  402. for code := range reqAvgTimeCost {
  403. if code > max {
  404. max = code
  405. }
  406. }
  407. i := 0
  408. for code := uint64(0); code <= max; code++ {
  409. if _, ok := reqAvgTimeCost[code]; ok {
  410. cl[i].MsgCode = code
  411. cl[i].BaseCost = testCost
  412. cl[i].ReqCost = 0
  413. i++
  414. }
  415. }
  416. return cl
  417. }