balance_tracker.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package server
  17. import (
  18. "reflect"
  19. "sync"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common/mclock"
  22. "github.com/ethereum/go-ethereum/ethdb"
  23. "github.com/ethereum/go-ethereum/les/utils"
  24. "github.com/ethereum/go-ethereum/p2p/enode"
  25. "github.com/ethereum/go-ethereum/p2p/nodestate"
  26. )
  27. const (
  28. posThreshold = 1000000 // minimum positive balance that is persisted in the database
  29. negThreshold = 1000000 // minimum negative balance that is persisted in the database
  30. persistExpirationRefresh = time.Minute * 5 // refresh period of the token expiration persistence
  31. )
  32. // BalanceTrackerSetup contains node state flags and fields used by BalanceTracker
  33. type BalanceTrackerSetup struct {
  34. // controlled by PriorityPool
  35. PriorityFlag, UpdateFlag nodestate.Flags
  36. BalanceField nodestate.Field
  37. // external connections
  38. connAddressField, capacityField nodestate.Field
  39. }
  40. // NewBalanceTrackerSetup creates a new BalanceTrackerSetup and initializes the fields
  41. // and flags controlled by BalanceTracker
  42. func NewBalanceTrackerSetup(setup *nodestate.Setup) BalanceTrackerSetup {
  43. return BalanceTrackerSetup{
  44. // PriorityFlag is set if the node has a positive balance
  45. PriorityFlag: setup.NewFlag("priorityNode"),
  46. // UpdateFlag set and then immediately reset if the balance has been updated and
  47. // therefore priority is suddenly changed
  48. UpdateFlag: setup.NewFlag("balanceUpdate"),
  49. // BalanceField contains the NodeBalance struct which implements nodePriority,
  50. // allowing on-demand priority calculation and future priority estimation
  51. BalanceField: setup.NewField("balance", reflect.TypeOf(&NodeBalance{})),
  52. }
  53. }
  54. // Connect sets the fields used by BalanceTracker as an input
  55. func (bts *BalanceTrackerSetup) Connect(connAddressField, capacityField nodestate.Field) {
  56. bts.connAddressField = connAddressField
  57. bts.capacityField = capacityField
  58. }
  59. // BalanceTracker tracks positive and negative balances for connected nodes.
  60. // After connAddressField is set externally, a NodeBalance is created and previous
  61. // balance values are loaded from the database. Both balances are exponentially expired
  62. // values. Costs are deducted from the positive balance if present, otherwise added to
  63. // the negative balance. If the capacity is non-zero then a time cost is applied
  64. // continuously while individual request costs are applied immediately.
  65. // The two balances are translated into a single priority value that also depends
  66. // on the actual capacity.
  67. type BalanceTracker struct {
  68. BalanceTrackerSetup
  69. clock mclock.Clock
  70. lock sync.Mutex
  71. ns *nodestate.NodeStateMachine
  72. ndb *nodeDB
  73. posExp, negExp utils.ValueExpirer
  74. posExpTC, negExpTC uint64
  75. active, inactive utils.ExpiredValue
  76. balanceTimer *utils.UpdateTimer
  77. quit chan struct{}
  78. }
  79. // NewBalanceTracker creates a new BalanceTracker
  80. func NewBalanceTracker(ns *nodestate.NodeStateMachine, setup BalanceTrackerSetup, db ethdb.KeyValueStore, clock mclock.Clock, posExp, negExp utils.ValueExpirer) *BalanceTracker {
  81. ndb := newNodeDB(db, clock)
  82. bt := &BalanceTracker{
  83. ns: ns,
  84. BalanceTrackerSetup: setup,
  85. ndb: ndb,
  86. clock: clock,
  87. posExp: posExp,
  88. negExp: negExp,
  89. balanceTimer: utils.NewUpdateTimer(clock, time.Second*10),
  90. quit: make(chan struct{}),
  91. }
  92. bt.ndb.forEachBalance(false, func(id enode.ID, balance utils.ExpiredValue) bool {
  93. bt.inactive.AddExp(balance)
  94. return true
  95. })
  96. ns.SubscribeField(bt.capacityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
  97. n, _ := ns.GetField(node, bt.BalanceField).(*NodeBalance)
  98. if n == nil {
  99. return
  100. }
  101. ov, _ := oldValue.(uint64)
  102. nv, _ := newValue.(uint64)
  103. if ov == 0 && nv != 0 {
  104. n.activate()
  105. }
  106. if nv != 0 {
  107. n.setCapacity(nv)
  108. }
  109. if ov != 0 && nv == 0 {
  110. n.deactivate()
  111. }
  112. })
  113. ns.SubscribeField(bt.connAddressField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
  114. if newValue != nil {
  115. ns.SetFieldSub(node, bt.BalanceField, bt.newNodeBalance(node, newValue.(string)))
  116. } else {
  117. ns.SetStateSub(node, nodestate.Flags{}, bt.PriorityFlag, 0)
  118. if b, _ := ns.GetField(node, bt.BalanceField).(*NodeBalance); b != nil {
  119. b.deactivate()
  120. }
  121. ns.SetFieldSub(node, bt.BalanceField, nil)
  122. }
  123. })
  124. // The positive and negative balances of clients are stored in database
  125. // and both of these decay exponentially over time. Delete them if the
  126. // value is small enough.
  127. bt.ndb.evictCallBack = bt.canDropBalance
  128. go func() {
  129. for {
  130. select {
  131. case <-clock.After(persistExpirationRefresh):
  132. now := clock.Now()
  133. bt.ndb.setExpiration(posExp.LogOffset(now), negExp.LogOffset(now))
  134. case <-bt.quit:
  135. return
  136. }
  137. }
  138. }()
  139. return bt
  140. }
  141. // Stop saves expiration offset and unsaved node balances and shuts BalanceTracker down
  142. func (bt *BalanceTracker) Stop() {
  143. now := bt.clock.Now()
  144. bt.ndb.setExpiration(bt.posExp.LogOffset(now), bt.negExp.LogOffset(now))
  145. close(bt.quit)
  146. bt.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
  147. if n, ok := bt.ns.GetField(node, bt.BalanceField).(*NodeBalance); ok {
  148. n.lock.Lock()
  149. n.storeBalance(true, true)
  150. n.lock.Unlock()
  151. bt.ns.SetField(node, bt.BalanceField, nil)
  152. }
  153. })
  154. bt.ndb.close()
  155. }
  156. // TotalTokenAmount returns the current total amount of service tokens in existence
  157. func (bt *BalanceTracker) TotalTokenAmount() uint64 {
  158. bt.lock.Lock()
  159. defer bt.lock.Unlock()
  160. bt.balanceTimer.Update(func(_ time.Duration) bool {
  161. bt.active = utils.ExpiredValue{}
  162. bt.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
  163. if n, ok := bt.ns.GetField(node, bt.BalanceField).(*NodeBalance); ok {
  164. pos, _ := n.GetRawBalance()
  165. bt.active.AddExp(pos)
  166. }
  167. })
  168. return true
  169. })
  170. total := bt.active
  171. total.AddExp(bt.inactive)
  172. return total.Value(bt.posExp.LogOffset(bt.clock.Now()))
  173. }
  174. // GetPosBalanceIDs lists node IDs with an associated positive balance
  175. func (bt *BalanceTracker) GetPosBalanceIDs(start, stop enode.ID, maxCount int) (result []enode.ID) {
  176. return bt.ndb.getPosBalanceIDs(start, stop, maxCount)
  177. }
  178. // SetExpirationTCs sets positive and negative token expiration time constants.
  179. // Specified in seconds, 0 means infinite (no expiration).
  180. func (bt *BalanceTracker) SetExpirationTCs(pos, neg uint64) {
  181. bt.lock.Lock()
  182. defer bt.lock.Unlock()
  183. bt.posExpTC, bt.negExpTC = pos, neg
  184. now := bt.clock.Now()
  185. if pos > 0 {
  186. bt.posExp.SetRate(now, 1/float64(pos*uint64(time.Second)))
  187. } else {
  188. bt.posExp.SetRate(now, 0)
  189. }
  190. if neg > 0 {
  191. bt.negExp.SetRate(now, 1/float64(neg*uint64(time.Second)))
  192. } else {
  193. bt.negExp.SetRate(now, 0)
  194. }
  195. }
  196. // GetExpirationTCs returns the current positive and negative token expiration
  197. // time constants
  198. func (bt *BalanceTracker) GetExpirationTCs() (pos, neg uint64) {
  199. bt.lock.Lock()
  200. defer bt.lock.Unlock()
  201. return bt.posExpTC, bt.negExpTC
  202. }
  203. // newNodeBalance loads balances from the database and creates a NodeBalance instance
  204. // for the given node. It also sets the PriorityFlag and adds balanceCallbackZero if
  205. // the node has a positive balance.
  206. // Note: this function should run inside a NodeStateMachine operation
  207. func (bt *BalanceTracker) newNodeBalance(node *enode.Node, negBalanceKey string) *NodeBalance {
  208. pb := bt.ndb.getOrNewBalance(node.ID().Bytes(), false)
  209. nb := bt.ndb.getOrNewBalance([]byte(negBalanceKey), true)
  210. n := &NodeBalance{
  211. bt: bt,
  212. node: node,
  213. connAddress: negBalanceKey,
  214. balance: balance{pos: pb, neg: nb},
  215. initTime: bt.clock.Now(),
  216. lastUpdate: bt.clock.Now(),
  217. }
  218. for i := range n.callbackIndex {
  219. n.callbackIndex[i] = -1
  220. }
  221. if n.checkPriorityStatus() {
  222. n.bt.ns.SetStateSub(n.node, n.bt.PriorityFlag, nodestate.Flags{}, 0)
  223. }
  224. return n
  225. }
  226. // storeBalance stores either a positive or a negative balance in the database
  227. func (bt *BalanceTracker) storeBalance(id []byte, neg bool, value utils.ExpiredValue) {
  228. if bt.canDropBalance(bt.clock.Now(), neg, value) {
  229. bt.ndb.delBalance(id, neg) // balance is small enough, drop it directly.
  230. } else {
  231. bt.ndb.setBalance(id, neg, value)
  232. }
  233. }
  234. // canDropBalance tells whether a positive or negative balance is below the threshold
  235. // and therefore can be dropped from the database
  236. func (bt *BalanceTracker) canDropBalance(now mclock.AbsTime, neg bool, b utils.ExpiredValue) bool {
  237. if neg {
  238. return b.Value(bt.negExp.LogOffset(now)) <= negThreshold
  239. }
  240. return b.Value(bt.posExp.LogOffset(now)) <= posThreshold
  241. }
  242. // updateTotalBalance adjusts the total balance after executing given callback.
  243. func (bt *BalanceTracker) updateTotalBalance(n *NodeBalance, callback func() bool) {
  244. bt.lock.Lock()
  245. defer bt.lock.Unlock()
  246. n.lock.Lock()
  247. defer n.lock.Unlock()
  248. original, active := n.balance.pos, n.active
  249. if !callback() {
  250. return
  251. }
  252. if active {
  253. bt.active.SubExp(original)
  254. } else {
  255. bt.inactive.SubExp(original)
  256. }
  257. if n.active {
  258. bt.active.AddExp(n.balance.pos)
  259. } else {
  260. bt.inactive.AddExp(n.balance.pos)
  261. }
  262. }