costtracker.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more detailct.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "encoding/binary"
  19. "math"
  20. "sync"
  21. "sync/atomic"
  22. "time"
  23. "github.com/ethereum/go-ethereum/common/mclock"
  24. "github.com/ethereum/go-ethereum/eth"
  25. "github.com/ethereum/go-ethereum/ethdb"
  26. "github.com/ethereum/go-ethereum/les/flowcontrol"
  27. "github.com/ethereum/go-ethereum/log"
  28. )
  29. const makeCostStats = false // make request cost statistics during operation
  30. var (
  31. // average request cost estimates based on serving time
  32. reqAvgTimeCost = requestCostTable{
  33. GetBlockHeadersMsg: {150000, 30000},
  34. GetBlockBodiesMsg: {0, 700000},
  35. GetReceiptsMsg: {0, 1000000},
  36. GetCodeMsg: {0, 450000},
  37. GetProofsV2Msg: {0, 600000},
  38. GetHelperTrieProofsMsg: {0, 1000000},
  39. SendTxV2Msg: {0, 450000},
  40. GetTxStatusMsg: {0, 250000},
  41. }
  42. // maximum incoming message size estimates
  43. reqMaxInSize = requestCostTable{
  44. GetBlockHeadersMsg: {40, 0},
  45. GetBlockBodiesMsg: {0, 40},
  46. GetReceiptsMsg: {0, 40},
  47. GetCodeMsg: {0, 80},
  48. GetProofsV2Msg: {0, 80},
  49. GetHelperTrieProofsMsg: {0, 20},
  50. SendTxV2Msg: {0, 66000},
  51. GetTxStatusMsg: {0, 50},
  52. }
  53. // maximum outgoing message size estimates
  54. reqMaxOutSize = requestCostTable{
  55. GetBlockHeadersMsg: {0, 556},
  56. GetBlockBodiesMsg: {0, 100000},
  57. GetReceiptsMsg: {0, 200000},
  58. GetCodeMsg: {0, 50000},
  59. GetProofsV2Msg: {0, 4000},
  60. GetHelperTrieProofsMsg: {0, 4000},
  61. SendTxV2Msg: {0, 100},
  62. GetTxStatusMsg: {0, 100},
  63. }
  64. minBufLimit = uint64(50000000 * maxCostFactor) // minimum buffer limit allowed for a client
  65. minCapacity = (minBufLimit-1)/bufLimitRatio + 1 // minimum capacity allowed for a client
  66. )
  67. const (
  68. maxCostFactor = 2 // ratio of maximum and average cost estimates
  69. gfInitWeight = time.Second * 10
  70. gfMaxWeight = time.Hour
  71. gfUsageThreshold = 0.5
  72. gfUsageTC = time.Second
  73. gfDbKey = "_globalCostFactor"
  74. )
  75. // costTracker is responsible for calculating costs and cost estimates on the
  76. // server side. It continuously updates the global cost factor which is defined
  77. // as the number of cost units per nanosecond of serving time in a single thread.
  78. // It is based on statistics collected during serving requests in high-load periods
  79. // and practically acts as a one-dimension request price scaling factor over the
  80. // pre-defined cost estimate table. Instead of scaling the cost values, the real
  81. // value of cost units is changed by applying the factor to the serving times. This
  82. // is more convenient because the changes in the cost factor can be applied immediately
  83. // without always notifying the clients about the changed cost tables.
  84. type costTracker struct {
  85. db ethdb.Database
  86. stopCh chan chan struct{}
  87. inSizeFactor, outSizeFactor float64
  88. gf, utilTarget float64
  89. gfUpdateCh chan gfUpdate
  90. gfLock sync.RWMutex
  91. totalRechargeCh chan uint64
  92. stats map[uint64][]uint64
  93. }
  94. // newCostTracker creates a cost tracker and loads the cost factor statistics from the database
  95. func newCostTracker(db ethdb.Database, config *eth.Config) *costTracker {
  96. utilTarget := float64(config.LightServ) * flowcontrol.FixedPointMultiplier / 100
  97. ct := &costTracker{
  98. db: db,
  99. stopCh: make(chan chan struct{}),
  100. utilTarget: utilTarget,
  101. }
  102. if config.LightBandwidthIn > 0 {
  103. ct.inSizeFactor = utilTarget / float64(config.LightBandwidthIn)
  104. }
  105. if config.LightBandwidthOut > 0 {
  106. ct.outSizeFactor = utilTarget / float64(config.LightBandwidthOut)
  107. }
  108. if makeCostStats {
  109. ct.stats = make(map[uint64][]uint64)
  110. for code := range reqAvgTimeCost {
  111. ct.stats[code] = make([]uint64, 10)
  112. }
  113. }
  114. ct.gfLoop()
  115. return ct
  116. }
  117. // stop stops the cost tracker and saves the cost factor statistics to the database
  118. func (ct *costTracker) stop() {
  119. stopCh := make(chan struct{})
  120. ct.stopCh <- stopCh
  121. <-stopCh
  122. if makeCostStats {
  123. ct.printStats()
  124. }
  125. }
  126. // makeCostList returns upper cost estimates based on the hardcoded cost estimate
  127. // tables and the optionally specified incoming/outgoing bandwidth limits
  128. func (ct *costTracker) makeCostList() RequestCostList {
  129. maxCost := func(avgTime, inSize, outSize uint64) uint64 {
  130. globalFactor := ct.globalFactor()
  131. cost := avgTime * maxCostFactor
  132. inSizeCost := uint64(float64(inSize) * ct.inSizeFactor * globalFactor * maxCostFactor)
  133. if inSizeCost > cost {
  134. cost = inSizeCost
  135. }
  136. outSizeCost := uint64(float64(outSize) * ct.outSizeFactor * globalFactor * maxCostFactor)
  137. if outSizeCost > cost {
  138. cost = outSizeCost
  139. }
  140. return cost
  141. }
  142. var list RequestCostList
  143. for code, data := range reqAvgTimeCost {
  144. list = append(list, requestCostListItem{
  145. MsgCode: code,
  146. BaseCost: maxCost(data.baseCost, reqMaxInSize[code].baseCost, reqMaxOutSize[code].baseCost),
  147. ReqCost: maxCost(data.reqCost, reqMaxInSize[code].reqCost, reqMaxOutSize[code].reqCost),
  148. })
  149. }
  150. return list
  151. }
  152. type gfUpdate struct {
  153. avgTime, servingTime float64
  154. }
  155. // gfLoop starts an event loop which updates the global cost factor which is
  156. // calculated as a weighted average of the average estimate / serving time ratio.
  157. // The applied weight equals the serving time if gfUsage is over a threshold,
  158. // zero otherwise. gfUsage is the recent average serving time per time unit in
  159. // an exponential moving window. This ensures that statistics are collected only
  160. // under high-load circumstances where the measured serving times are relevant.
  161. // The total recharge parameter of the flow control system which controls the
  162. // total allowed serving time per second but nominated in cost units, should
  163. // also be scaled with the cost factor and is also updated by this loop.
  164. func (ct *costTracker) gfLoop() {
  165. var gfUsage, gfSum, gfWeight float64
  166. lastUpdate := mclock.Now()
  167. expUpdate := lastUpdate
  168. data, _ := ct.db.Get([]byte(gfDbKey))
  169. if len(data) == 16 {
  170. gfSum = math.Float64frombits(binary.BigEndian.Uint64(data[0:8]))
  171. gfWeight = math.Float64frombits(binary.BigEndian.Uint64(data[8:16]))
  172. }
  173. if gfWeight < float64(gfInitWeight) {
  174. gfSum = float64(gfInitWeight)
  175. gfWeight = float64(gfInitWeight)
  176. }
  177. gf := gfSum / gfWeight
  178. ct.gf = gf
  179. ct.gfUpdateCh = make(chan gfUpdate, 100)
  180. go func() {
  181. for {
  182. select {
  183. case r := <-ct.gfUpdateCh:
  184. now := mclock.Now()
  185. max := r.servingTime * gf
  186. if r.avgTime > max {
  187. max = r.avgTime
  188. }
  189. dt := float64(now - expUpdate)
  190. expUpdate = now
  191. gfUsage = gfUsage*math.Exp(-dt/float64(gfUsageTC)) + max*1000000/float64(gfUsageTC)
  192. if gfUsage >= gfUsageThreshold*ct.utilTarget*gf {
  193. gfSum += r.avgTime
  194. gfWeight += r.servingTime
  195. if time.Duration(now-lastUpdate) > time.Second {
  196. gf = gfSum / gfWeight
  197. if gfWeight >= float64(gfMaxWeight) {
  198. gfSum = gf * float64(gfMaxWeight)
  199. gfWeight = float64(gfMaxWeight)
  200. }
  201. lastUpdate = now
  202. ct.gfLock.Lock()
  203. ct.gf = gf
  204. ch := ct.totalRechargeCh
  205. ct.gfLock.Unlock()
  206. if ch != nil {
  207. select {
  208. case ct.totalRechargeCh <- uint64(ct.utilTarget * gf):
  209. default:
  210. }
  211. }
  212. log.Debug("global cost factor updated", "gf", gf, "weight", time.Duration(gfWeight))
  213. }
  214. }
  215. case stopCh := <-ct.stopCh:
  216. var data [16]byte
  217. binary.BigEndian.PutUint64(data[0:8], math.Float64bits(gfSum))
  218. binary.BigEndian.PutUint64(data[8:16], math.Float64bits(gfWeight))
  219. ct.db.Put([]byte(gfDbKey), data[:])
  220. log.Debug("global cost factor saved", "sum", time.Duration(gfSum), "weight", time.Duration(gfWeight))
  221. close(stopCh)
  222. return
  223. }
  224. }
  225. }()
  226. }
  227. // globalFactor returns the current value of the global cost factor
  228. func (ct *costTracker) globalFactor() float64 {
  229. ct.gfLock.RLock()
  230. defer ct.gfLock.RUnlock()
  231. return ct.gf
  232. }
  233. // totalRecharge returns the current total recharge parameter which is used by
  234. // flowcontrol.ClientManager and is scaled by the global cost factor
  235. func (ct *costTracker) totalRecharge() uint64 {
  236. ct.gfLock.RLock()
  237. defer ct.gfLock.RUnlock()
  238. return uint64(ct.gf * ct.utilTarget)
  239. }
  240. // subscribeTotalRecharge returns all future updates to the total recharge value
  241. // through a channel and also returns the current value
  242. func (ct *costTracker) subscribeTotalRecharge(ch chan uint64) uint64 {
  243. ct.gfLock.Lock()
  244. defer ct.gfLock.Unlock()
  245. ct.totalRechargeCh = ch
  246. return uint64(ct.gf * ct.utilTarget)
  247. }
  248. // updateStats updates the global cost factor and (if enabled) the real cost vs.
  249. // average estimate statistics
  250. func (ct *costTracker) updateStats(code, amount, servingTime, realCost uint64) {
  251. avg := reqAvgTimeCost[code]
  252. avgTime := avg.baseCost + amount*avg.reqCost
  253. select {
  254. case ct.gfUpdateCh <- gfUpdate{float64(avgTime), float64(servingTime)}:
  255. default:
  256. }
  257. if makeCostStats {
  258. realCost <<= 4
  259. l := 0
  260. for l < 9 && realCost > avgTime {
  261. l++
  262. realCost >>= 1
  263. }
  264. atomic.AddUint64(&ct.stats[code][l], 1)
  265. }
  266. }
  267. // realCost calculates the final cost of a request based on actual serving time,
  268. // incoming and outgoing message size
  269. //
  270. // Note: message size is only taken into account if bandwidth limitation is applied
  271. // and the cost based on either message size is greater than the cost based on
  272. // serving time. A maximum of the three costs is applied instead of their sum
  273. // because the three limited resources (serving thread time and i/o bandwidth) can
  274. // also be maxed out simultaneously.
  275. func (ct *costTracker) realCost(servingTime uint64, inSize, outSize uint32) uint64 {
  276. cost := float64(servingTime)
  277. inSizeCost := float64(inSize) * ct.inSizeFactor
  278. if inSizeCost > cost {
  279. cost = inSizeCost
  280. }
  281. outSizeCost := float64(outSize) * ct.outSizeFactor
  282. if outSizeCost > cost {
  283. cost = outSizeCost
  284. }
  285. return uint64(cost * ct.globalFactor())
  286. }
  287. // printStats prints the distribution of real request cost relative to the average estimates
  288. func (ct *costTracker) printStats() {
  289. if ct.stats == nil {
  290. return
  291. }
  292. for code, arr := range ct.stats {
  293. log.Info("Request cost statistics", "code", code, "1/16", arr[0], "1/8", arr[1], "1/4", arr[2], "1/2", arr[3], "1", arr[4], "2", arr[5], "4", arr[6], "8", arr[7], "16", arr[8], ">16", arr[9])
  294. }
  295. }
  296. type (
  297. // requestCostTable assigns a cost estimate function to each request type
  298. // which is a linear function of the requested amount
  299. // (cost = baseCost + reqCost * amount)
  300. requestCostTable map[uint64]*requestCosts
  301. requestCosts struct {
  302. baseCost, reqCost uint64
  303. }
  304. // RequestCostList is a list representation of request costs which is used for
  305. // database storage and communication through the network
  306. RequestCostList []requestCostListItem
  307. requestCostListItem struct {
  308. MsgCode, BaseCost, ReqCost uint64
  309. }
  310. )
  311. // getCost calculates the estimated cost for a given request type and amount
  312. func (table requestCostTable) getCost(code, amount uint64) uint64 {
  313. costs := table[code]
  314. return costs.baseCost + amount*costs.reqCost
  315. }
  316. // decode converts a cost list to a cost table
  317. func (list RequestCostList) decode() requestCostTable {
  318. table := make(requestCostTable)
  319. for _, e := range list {
  320. table[e.MsgCode] = &requestCosts{
  321. baseCost: e.BaseCost,
  322. reqCost: e.ReqCost,
  323. }
  324. }
  325. return table
  326. }
  327. // testCostList returns a dummy request cost list used by tests
  328. func testCostList() RequestCostList {
  329. cl := make(RequestCostList, len(reqAvgTimeCost))
  330. var max uint64
  331. for code := range reqAvgTimeCost {
  332. if code > max {
  333. max = code
  334. }
  335. }
  336. i := 0
  337. for code := uint64(0); code <= max; code++ {
  338. if _, ok := reqAvgTimeCost[code]; ok {
  339. cl[i].MsgCode = code
  340. cl[i].BaseCost = 0
  341. cl[i].ReqCost = 0
  342. i++
  343. }
  344. }
  345. return cl
  346. }