valuetracker.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package client
  17. import (
  18. "bytes"
  19. "fmt"
  20. "math"
  21. "sync"
  22. "time"
  23. "github.com/ethereum/go-ethereum/common/mclock"
  24. "github.com/ethereum/go-ethereum/ethdb"
  25. "github.com/ethereum/go-ethereum/les/utils"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/p2p/enode"
  28. "github.com/ethereum/go-ethereum/rlp"
  29. )
  30. const (
  31. vtVersion = 1 // database encoding format for ValueTracker
  32. nvtVersion = 1 // database encoding format for NodeValueTracker
  33. )
  34. var (
  35. vtKey = []byte("vt:")
  36. vtNodeKey = []byte("vtNode:")
  37. )
  38. // NodeValueTracker collects service value statistics for a specific server node
  39. type NodeValueTracker struct {
  40. lock sync.Mutex
  41. rtStats, lastRtStats ResponseTimeStats
  42. lastTransfer mclock.AbsTime
  43. basket serverBasket
  44. reqCosts []uint64
  45. reqValues *[]float64
  46. }
  47. // init initializes a NodeValueTracker.
  48. // Note that the contents of the referenced reqValues slice will not change; a new
  49. // reference is passed if the values are updated by ValueTracker.
  50. func (nv *NodeValueTracker) init(now mclock.AbsTime, reqValues *[]float64) {
  51. reqTypeCount := len(*reqValues)
  52. nv.reqCosts = make([]uint64, reqTypeCount)
  53. nv.lastTransfer = now
  54. nv.reqValues = reqValues
  55. nv.basket.init(reqTypeCount)
  56. }
  57. // updateCosts updates the request cost table of the server. The request value factor
  58. // is also updated based on the given cost table and the current reference basket.
  59. // Note that the contents of the referenced reqValues slice will not change; a new
  60. // reference is passed if the values are updated by ValueTracker.
  61. func (nv *NodeValueTracker) updateCosts(reqCosts []uint64, reqValues *[]float64, rvFactor float64) {
  62. nv.lock.Lock()
  63. defer nv.lock.Unlock()
  64. nv.reqCosts = reqCosts
  65. nv.reqValues = reqValues
  66. nv.basket.updateRvFactor(rvFactor)
  67. }
  68. // transferStats returns request basket and response time statistics that should be
  69. // added to the global statistics. The contents of the server's own request basket are
  70. // gradually transferred to the main reference basket and removed from the server basket
  71. // with the specified transfer rate.
  72. // The response time statistics are retained at both places and therefore the global
  73. // distribution is always the sum of the individual server distributions.
  74. func (nv *NodeValueTracker) transferStats(now mclock.AbsTime, transferRate float64) (requestBasket, ResponseTimeStats) {
  75. nv.lock.Lock()
  76. defer nv.lock.Unlock()
  77. dt := now - nv.lastTransfer
  78. nv.lastTransfer = now
  79. if dt < 0 {
  80. dt = 0
  81. }
  82. recentRtStats := nv.rtStats
  83. recentRtStats.SubStats(&nv.lastRtStats)
  84. nv.lastRtStats = nv.rtStats
  85. return nv.basket.transfer(-math.Expm1(-transferRate * float64(dt))), recentRtStats
  86. }
  87. // RtStats returns the node's own response time distribution statistics
  88. func (nv *NodeValueTracker) RtStats() ResponseTimeStats {
  89. nv.lock.Lock()
  90. defer nv.lock.Unlock()
  91. return nv.rtStats
  92. }
  93. // ValueTracker coordinates service value calculation for individual servers and updates
  94. // global statistics
  95. type ValueTracker struct {
  96. clock mclock.Clock
  97. lock sync.Mutex
  98. quit chan chan struct{}
  99. db ethdb.KeyValueStore
  100. connected map[enode.ID]*NodeValueTracker
  101. reqTypeCount int
  102. refBasket referenceBasket
  103. mappings [][]string
  104. currentMapping int
  105. initRefBasket requestBasket
  106. rtStats ResponseTimeStats
  107. transferRate float64
  108. statsExpLock sync.RWMutex
  109. statsExpRate, offlineExpRate float64
  110. statsExpirer utils.Expirer
  111. statsExpFactor utils.ExpirationFactor
  112. }
  113. type valueTrackerEncV1 struct {
  114. Mappings [][]string
  115. RefBasketMapping uint
  116. RefBasket requestBasket
  117. RtStats ResponseTimeStats
  118. ExpOffset, SavedAt uint64
  119. }
  120. type nodeValueTrackerEncV1 struct {
  121. RtStats ResponseTimeStats
  122. ServerBasketMapping uint
  123. ServerBasket requestBasket
  124. }
  125. // RequestInfo is an initializer structure for the service vector.
  126. type RequestInfo struct {
  127. // Name identifies the request type and is used for re-mapping the service vector if necessary
  128. Name string
  129. // InitAmount and InitValue are used to initialize the reference basket
  130. InitAmount, InitValue float64
  131. }
  132. // NewValueTracker creates a new ValueTracker and loads its previously saved state from
  133. // the database if possible.
  134. func NewValueTracker(db ethdb.KeyValueStore, clock mclock.Clock, reqInfo []RequestInfo, updatePeriod time.Duration, transferRate, statsExpRate, offlineExpRate float64) *ValueTracker {
  135. now := clock.Now()
  136. initRefBasket := requestBasket{items: make([]basketItem, len(reqInfo))}
  137. mapping := make([]string, len(reqInfo))
  138. var sumAmount, sumValue float64
  139. for _, req := range reqInfo {
  140. sumAmount += req.InitAmount
  141. sumValue += req.InitAmount * req.InitValue
  142. }
  143. scaleValues := sumAmount * basketFactor / sumValue
  144. for i, req := range reqInfo {
  145. mapping[i] = req.Name
  146. initRefBasket.items[i].amount = uint64(req.InitAmount * basketFactor)
  147. initRefBasket.items[i].value = uint64(req.InitAmount * req.InitValue * scaleValues)
  148. }
  149. vt := &ValueTracker{
  150. clock: clock,
  151. connected: make(map[enode.ID]*NodeValueTracker),
  152. quit: make(chan chan struct{}),
  153. db: db,
  154. reqTypeCount: len(initRefBasket.items),
  155. initRefBasket: initRefBasket,
  156. transferRate: transferRate,
  157. statsExpRate: statsExpRate,
  158. offlineExpRate: offlineExpRate,
  159. }
  160. if vt.loadFromDb(mapping) != nil {
  161. // previous state not saved or invalid, init with default values
  162. vt.refBasket.basket = initRefBasket
  163. vt.mappings = [][]string{mapping}
  164. vt.currentMapping = 0
  165. }
  166. vt.statsExpirer.SetRate(now, statsExpRate)
  167. vt.refBasket.init(vt.reqTypeCount)
  168. vt.periodicUpdate()
  169. go func() {
  170. for {
  171. select {
  172. case <-clock.After(updatePeriod):
  173. vt.lock.Lock()
  174. vt.periodicUpdate()
  175. vt.lock.Unlock()
  176. case quit := <-vt.quit:
  177. close(quit)
  178. return
  179. }
  180. }
  181. }()
  182. return vt
  183. }
  184. // StatsExpirer returns the statistics expirer so that other values can be expired
  185. // with the same rate as the service value statistics.
  186. func (vt *ValueTracker) StatsExpirer() *utils.Expirer {
  187. return &vt.statsExpirer
  188. }
  189. // StatsExpirer returns the current expiration factor so that other values can be expired
  190. // with the same rate as the service value statistics.
  191. func (vt *ValueTracker) StatsExpFactor() utils.ExpirationFactor {
  192. vt.statsExpLock.RLock()
  193. defer vt.statsExpLock.RUnlock()
  194. return vt.statsExpFactor
  195. }
  196. // loadFromDb loads the value tracker's state from the database and converts saved
  197. // request basket index mapping if it does not match the specified index to name mapping.
  198. func (vt *ValueTracker) loadFromDb(mapping []string) error {
  199. enc, err := vt.db.Get(vtKey)
  200. if err != nil {
  201. return err
  202. }
  203. r := bytes.NewReader(enc)
  204. var version uint
  205. if err := rlp.Decode(r, &version); err != nil {
  206. log.Error("Decoding value tracker state failed", "err", err)
  207. return err
  208. }
  209. if version != vtVersion {
  210. log.Error("Unknown ValueTracker version", "stored", version, "current", nvtVersion)
  211. return fmt.Errorf("Unknown ValueTracker version %d (current version is %d)", version, vtVersion)
  212. }
  213. var vte valueTrackerEncV1
  214. if err := rlp.Decode(r, &vte); err != nil {
  215. log.Error("Decoding value tracker state failed", "err", err)
  216. return err
  217. }
  218. logOffset := utils.Fixed64(vte.ExpOffset)
  219. dt := time.Now().UnixNano() - int64(vte.SavedAt)
  220. if dt > 0 {
  221. logOffset += utils.Float64ToFixed64(float64(dt) * vt.offlineExpRate / math.Log(2))
  222. }
  223. vt.statsExpirer.SetLogOffset(vt.clock.Now(), logOffset)
  224. vt.rtStats = vte.RtStats
  225. vt.mappings = vte.Mappings
  226. vt.currentMapping = -1
  227. loop:
  228. for i, m := range vt.mappings {
  229. if len(m) != len(mapping) {
  230. continue loop
  231. }
  232. for j, s := range mapping {
  233. if m[j] != s {
  234. continue loop
  235. }
  236. }
  237. vt.currentMapping = i
  238. break
  239. }
  240. if vt.currentMapping == -1 {
  241. vt.currentMapping = len(vt.mappings)
  242. vt.mappings = append(vt.mappings, mapping)
  243. }
  244. if int(vte.RefBasketMapping) == vt.currentMapping {
  245. vt.refBasket.basket = vte.RefBasket
  246. } else {
  247. if vte.RefBasketMapping >= uint(len(vt.mappings)) {
  248. log.Error("Unknown request basket mapping", "stored", vte.RefBasketMapping, "current", vt.currentMapping)
  249. return fmt.Errorf("Unknown request basket mapping %d (current version is %d)", vte.RefBasketMapping, vt.currentMapping)
  250. }
  251. vt.refBasket.basket = vte.RefBasket.convertMapping(vt.mappings[vte.RefBasketMapping], mapping, vt.initRefBasket)
  252. }
  253. return nil
  254. }
  255. // saveToDb saves the value tracker's state to the database
  256. func (vt *ValueTracker) saveToDb() {
  257. vte := valueTrackerEncV1{
  258. Mappings: vt.mappings,
  259. RefBasketMapping: uint(vt.currentMapping),
  260. RefBasket: vt.refBasket.basket,
  261. RtStats: vt.rtStats,
  262. ExpOffset: uint64(vt.statsExpirer.LogOffset(vt.clock.Now())),
  263. SavedAt: uint64(time.Now().UnixNano()),
  264. }
  265. enc1, err := rlp.EncodeToBytes(uint(vtVersion))
  266. if err != nil {
  267. log.Error("Encoding value tracker state failed", "err", err)
  268. return
  269. }
  270. enc2, err := rlp.EncodeToBytes(&vte)
  271. if err != nil {
  272. log.Error("Encoding value tracker state failed", "err", err)
  273. return
  274. }
  275. if err := vt.db.Put(vtKey, append(enc1, enc2...)); err != nil {
  276. log.Error("Saving value tracker state failed", "err", err)
  277. }
  278. }
  279. // Stop saves the value tracker's state and each loaded node's individual state and
  280. // returns after shutting the internal goroutines down.
  281. func (vt *ValueTracker) Stop() {
  282. quit := make(chan struct{})
  283. vt.quit <- quit
  284. <-quit
  285. vt.lock.Lock()
  286. vt.periodicUpdate()
  287. for id, nv := range vt.connected {
  288. vt.saveNode(id, nv)
  289. }
  290. vt.connected = nil
  291. vt.saveToDb()
  292. vt.lock.Unlock()
  293. }
  294. // Register adds a server node to the value tracker
  295. func (vt *ValueTracker) Register(id enode.ID) *NodeValueTracker {
  296. vt.lock.Lock()
  297. defer vt.lock.Unlock()
  298. if vt.connected == nil {
  299. // ValueTracker has already been stopped
  300. return nil
  301. }
  302. nv := vt.loadOrNewNode(id)
  303. nv.init(vt.clock.Now(), &vt.refBasket.reqValues)
  304. vt.connected[id] = nv
  305. return nv
  306. }
  307. // Unregister removes a server node from the value tracker
  308. func (vt *ValueTracker) Unregister(id enode.ID) {
  309. vt.lock.Lock()
  310. defer vt.lock.Unlock()
  311. if nv := vt.connected[id]; nv != nil {
  312. vt.saveNode(id, nv)
  313. delete(vt.connected, id)
  314. }
  315. }
  316. // GetNode returns an individual server node's value tracker. If it did not exist before
  317. // then a new node is created.
  318. func (vt *ValueTracker) GetNode(id enode.ID) *NodeValueTracker {
  319. vt.lock.Lock()
  320. defer vt.lock.Unlock()
  321. return vt.loadOrNewNode(id)
  322. }
  323. // loadOrNewNode returns an individual server node's value tracker. If it did not exist before
  324. // then a new node is created.
  325. func (vt *ValueTracker) loadOrNewNode(id enode.ID) *NodeValueTracker {
  326. if nv, ok := vt.connected[id]; ok {
  327. return nv
  328. }
  329. nv := &NodeValueTracker{lastTransfer: vt.clock.Now()}
  330. enc, err := vt.db.Get(append(vtNodeKey, id[:]...))
  331. if err != nil {
  332. return nv
  333. }
  334. r := bytes.NewReader(enc)
  335. var version uint
  336. if err := rlp.Decode(r, &version); err != nil {
  337. log.Error("Failed to decode node value tracker", "id", id, "err", err)
  338. return nv
  339. }
  340. if version != nvtVersion {
  341. log.Error("Unknown NodeValueTracker version", "stored", version, "current", nvtVersion)
  342. return nv
  343. }
  344. var nve nodeValueTrackerEncV1
  345. if err := rlp.Decode(r, &nve); err != nil {
  346. log.Error("Failed to decode node value tracker", "id", id, "err", err)
  347. return nv
  348. }
  349. nv.rtStats = nve.RtStats
  350. nv.lastRtStats = nve.RtStats
  351. if int(nve.ServerBasketMapping) == vt.currentMapping {
  352. nv.basket.basket = nve.ServerBasket
  353. } else {
  354. if nve.ServerBasketMapping >= uint(len(vt.mappings)) {
  355. log.Error("Unknown request basket mapping", "stored", nve.ServerBasketMapping, "current", vt.currentMapping)
  356. return nv
  357. }
  358. nv.basket.basket = nve.ServerBasket.convertMapping(vt.mappings[nve.ServerBasketMapping], vt.mappings[vt.currentMapping], vt.initRefBasket)
  359. }
  360. return nv
  361. }
  362. // saveNode saves a server node's value tracker to the database
  363. func (vt *ValueTracker) saveNode(id enode.ID, nv *NodeValueTracker) {
  364. recentRtStats := nv.rtStats
  365. recentRtStats.SubStats(&nv.lastRtStats)
  366. vt.rtStats.AddStats(&recentRtStats)
  367. nv.lastRtStats = nv.rtStats
  368. nve := nodeValueTrackerEncV1{
  369. RtStats: nv.rtStats,
  370. ServerBasketMapping: uint(vt.currentMapping),
  371. ServerBasket: nv.basket.basket,
  372. }
  373. enc1, err := rlp.EncodeToBytes(uint(nvtVersion))
  374. if err != nil {
  375. log.Error("Failed to encode service value information", "id", id, "err", err)
  376. return
  377. }
  378. enc2, err := rlp.EncodeToBytes(&nve)
  379. if err != nil {
  380. log.Error("Failed to encode service value information", "id", id, "err", err)
  381. return
  382. }
  383. if err := vt.db.Put(append(vtNodeKey, id[:]...), append(enc1, enc2...)); err != nil {
  384. log.Error("Failed to save service value information", "id", id, "err", err)
  385. }
  386. }
  387. // UpdateCosts updates the node value tracker's request cost table
  388. func (vt *ValueTracker) UpdateCosts(nv *NodeValueTracker, reqCosts []uint64) {
  389. vt.lock.Lock()
  390. defer vt.lock.Unlock()
  391. nv.updateCosts(reqCosts, &vt.refBasket.reqValues, vt.refBasket.reqValueFactor(reqCosts))
  392. }
  393. // RtStats returns the global response time distribution statistics
  394. func (vt *ValueTracker) RtStats() ResponseTimeStats {
  395. vt.lock.Lock()
  396. defer vt.lock.Unlock()
  397. vt.periodicUpdate()
  398. return vt.rtStats
  399. }
  400. // periodicUpdate transfers individual node data to the global statistics, normalizes
  401. // the reference basket and updates request values. The global state is also saved to
  402. // the database with each update.
  403. func (vt *ValueTracker) periodicUpdate() {
  404. now := vt.clock.Now()
  405. vt.statsExpLock.Lock()
  406. vt.statsExpFactor = utils.ExpFactor(vt.statsExpirer.LogOffset(now))
  407. vt.statsExpLock.Unlock()
  408. for _, nv := range vt.connected {
  409. basket, rtStats := nv.transferStats(now, vt.transferRate)
  410. vt.refBasket.add(basket)
  411. vt.rtStats.AddStats(&rtStats)
  412. }
  413. vt.refBasket.normalize()
  414. vt.refBasket.updateReqValues()
  415. for _, nv := range vt.connected {
  416. nv.updateCosts(nv.reqCosts, &vt.refBasket.reqValues, vt.refBasket.reqValueFactor(nv.reqCosts))
  417. }
  418. vt.saveToDb()
  419. }
  420. type ServedRequest struct {
  421. ReqType, Amount uint32
  422. }
  423. // Served adds a served request to the node's statistics. An actual request may be composed
  424. // of one or more request types (service vector indices).
  425. func (vt *ValueTracker) Served(nv *NodeValueTracker, reqs []ServedRequest, respTime time.Duration) {
  426. vt.statsExpLock.RLock()
  427. expFactor := vt.statsExpFactor
  428. vt.statsExpLock.RUnlock()
  429. nv.lock.Lock()
  430. defer nv.lock.Unlock()
  431. var value float64
  432. for _, r := range reqs {
  433. nv.basket.add(r.ReqType, r.Amount, nv.reqCosts[r.ReqType]*uint64(r.Amount), expFactor)
  434. value += (*nv.reqValues)[r.ReqType] * float64(r.Amount)
  435. }
  436. nv.rtStats.Add(respTime, value, vt.statsExpFactor)
  437. }
  438. type RequestStatsItem struct {
  439. Name string
  440. ReqAmount, ReqValue float64
  441. }
  442. // RequestStats returns the current contents of the reference request basket, with
  443. // request values meaning average per request rather than total.
  444. func (vt *ValueTracker) RequestStats() []RequestStatsItem {
  445. vt.statsExpLock.RLock()
  446. expFactor := vt.statsExpFactor
  447. vt.statsExpLock.RUnlock()
  448. vt.lock.Lock()
  449. defer vt.lock.Unlock()
  450. vt.periodicUpdate()
  451. res := make([]RequestStatsItem, len(vt.refBasket.basket.items))
  452. for i, item := range vt.refBasket.basket.items {
  453. res[i].Name = vt.mappings[vt.currentMapping][i]
  454. res[i].ReqAmount = expFactor.Value(float64(item.amount)/basketFactor, vt.refBasket.basket.exp)
  455. res[i].ReqValue = vt.refBasket.reqValues[i]
  456. }
  457. return res
  458. }