  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package server
  17. import (
  18. "math"
  19. "reflect"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common/mclock"
  23. "github.com/ethereum/go-ethereum/common/prque"
  24. "github.com/ethereum/go-ethereum/log"
  25. "github.com/ethereum/go-ethereum/p2p/enode"
  26. "github.com/ethereum/go-ethereum/p2p/nodestate"
  27. )
const (
	// lazyQueueRefresh is the refresh period of the active queue: the
	// LazyQueue re-evaluates its item priorities at most this often.
	lazyQueueRefresh = time.Second * 10
)
// PriorityPoolSetup contains node state flags and fields used by PriorityPool.
// Note: ActiveFlag and InactiveFlag can be controlled both externally and by the pool,
// see PriorityPool description for details.
type PriorityPoolSetup struct {
	// controlled by PriorityPool
	ActiveFlag, InactiveFlag       nodestate.Flags
	CapacityField, ppNodeInfoField nodestate.Field
	// external connections, set through Connect
	updateFlag    nodestate.Flags // signals an immediate priority update
	priorityField nodestate.Field // expected to hold a nodePriority implementation
}
  42. // NewPriorityPoolSetup creates a new PriorityPoolSetup and initializes the fields
  43. // and flags controlled by PriorityPool
  44. func NewPriorityPoolSetup(setup *nodestate.Setup) PriorityPoolSetup {
  45. return PriorityPoolSetup{
  46. ActiveFlag: setup.NewFlag("active"),
  47. InactiveFlag: setup.NewFlag("inactive"),
  48. CapacityField: setup.NewField("capacity", reflect.TypeOf(uint64(0))),
  49. ppNodeInfoField: setup.NewField("ppNodeInfo", reflect.TypeOf(&ppNodeInfo{})),
  50. }
  51. }
  52. // Connect sets the fields and flags used by PriorityPool as an input
  53. func (pps *PriorityPoolSetup) Connect(priorityField nodestate.Field, updateFlag nodestate.Flags) {
  54. pps.priorityField = priorityField // should implement nodePriority
  55. pps.updateFlag = updateFlag // triggers an immediate priority update
  56. }
// PriorityPool handles a set of nodes where each node has a capacity (a scalar value)
// and a priority (which can change over time and can also depend on the capacity).
// A node is active if it has at least the necessary minimal amount of capacity while
// inactive nodes have 0 capacity (values between 0 and the minimum are not allowed).
// The pool ensures that the number and total capacity of all active nodes are limited
// and the highest priority nodes are active at all times (limits can be changed
// during operation with immediate effect).
//
// When activating clients a priority bias is applied in favor of the already active
// nodes in order to avoid nodes quickly alternating between active and inactive states
// when their priorities are close to each other. The bias is specified in terms of
// duration (time) because priorities are expected to usually get lower over time and
// therefore a future minimum prediction (see EstMinPriority) should monotonously
// decrease with the specified time parameter.
// This time bias can be interpreted as minimum expected active time at the given
// capacity (if the threshold priority stays the same).
//
// Nodes in the pool always have either InactiveFlag or ActiveFlag set. A new node is
// added to the pool by externally setting InactiveFlag. PriorityPool can switch a node
// between InactiveFlag and ActiveFlag at any time. Nodes can be removed from the pool
// by externally resetting both flags. ActiveFlag should not be set externally.
//
// The highest priority nodes in "inactive" state are moved to "active" state as soon as
// the minimum capacity can be granted for them. The capacity of lower priority active
// nodes is reduced or they are demoted to "inactive" state if their priority is
// insufficient even at minimal capacity.
type PriorityPool struct {
	PriorityPoolSetup
	ns    *nodestate.NodeStateMachine
	clock mclock.Clock
	lock  sync.Mutex // guards all mutable state below
	// activeQueue holds active nodes ordered by inverted priority (the lowest
	// priority node is on top, first to be demoted); inactiveQueue holds
	// inactive nodes ordered by priority (highest on top, first to activate).
	activeQueue   *prque.LazyQueue
	inactiveQueue *prque.Prque
	// changed collects nodes with pending temporary changes, see markForChange
	changed                []*ppNodeInfo
	activeCount, activeCap uint64 // current number / total capacity of active nodes
	maxCount, maxCap       uint64 // limits enforced by enforceLimits
	minCap                 uint64 // minimum capacity of an active node
	activeBias             time.Duration // bias applied when activating inactive nodes
	capacityStepDiv        uint64        // divisor for stepwise capacity reduction in enforceLimits
}
// nodePriority interface provides current and estimated future priorities on demand.
// Implementations are supplied externally through the priorityField.
type nodePriority interface {
	// Priority should return the current priority of the node (higher is better)
	Priority(now mclock.AbsTime, cap uint64) int64
	// EstMinPriority should return a lower estimate for the minimum of the node priority
	// value starting from the current moment until the given time. If the priority goes
	// under the returned estimate before the specified moment then it is the caller's
	// responsibility to signal with updateFlag.
	EstMinPriority(until mclock.AbsTime, cap uint64, update bool) int64
}
// ppNodeInfo is the internal node descriptor of PriorityPool
type ppNodeInfo struct {
	nodePriority nodePriority // priority callbacks obtained from priorityField
	node         *enode.Node
	connected    bool // true while the node is tracked in one of the queues
	// capacity is the current (possibly temporary) capacity; origCap preserves
	// the committed value while a temporary change is pending (markForChange).
	capacity, origCap uint64
	bias              time.Duration // activation bias set by tryActivate
	// forced pins the queue priority to the minimum during RequestCapacity;
	// changed marks membership in PriorityPool.changed.
	forced, changed            bool
	activeIndex, inactiveIndex int // queue position callbacks, -1 when not queued
}
// NewPriorityPool creates a new PriorityPool
func NewPriorityPool(ns *nodestate.NodeStateMachine, setup PriorityPoolSetup, clock mclock.Clock, minCap uint64, activeBias time.Duration, capacityStepDiv uint64) *PriorityPool {
	pp := &PriorityPool{
		ns:                ns,
		PriorityPoolSetup: setup,
		clock:             clock,
		activeQueue:       prque.NewLazyQueue(activeSetIndex, activePriority, activeMaxPriority, clock, lazyQueueRefresh),
		inactiveQueue:     prque.New(inactiveSetIndex),
		minCap:            minCap,
		activeBias:        activeBias,
		capacityStepDiv:   capacityStepDiv,
	}
	// Create/remove the internal node descriptor whenever the externally
	// controlled priority field is set/reset.
	ns.SubscribeField(pp.priorityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
		if newValue != nil {
			c := &ppNodeInfo{
				node:          node,
				nodePriority:  newValue.(nodePriority),
				activeIndex:   -1,
				inactiveIndex: -1,
			}
			ns.SetFieldSub(node, pp.ppNodeInfoField, c)
		} else {
			// Priority interface removed: drop the node from the pool and
			// clear all pool-controlled flags and fields.
			ns.SetStateSub(node, nodestate.Flags{}, pp.ActiveFlag.Or(pp.InactiveFlag), 0)
			if n, _ := pp.ns.GetField(node, pp.ppNodeInfoField).(*ppNodeInfo); n != nil {
				pp.disconnectedNode(n)
			}
			ns.SetFieldSub(node, pp.CapacityField, nil)
			ns.SetFieldSub(node, pp.ppNodeInfoField, nil)
		}
	})
	// Track pool membership: a node joins when either flag appears from the
	// empty state and leaves when both flags are gone.
	ns.SubscribeState(pp.ActiveFlag.Or(pp.InactiveFlag), func(node *enode.Node, oldState, newState nodestate.Flags) {
		if c, _ := pp.ns.GetField(node, pp.ppNodeInfoField).(*ppNodeInfo); c != nil {
			if oldState.IsEmpty() {
				pp.connectedNode(c)
			}
			if newState.IsEmpty() {
				pp.disconnectedNode(c)
			}
		}
	})
	// updateFlag signals that a node's priority dropped below a previous
	// estimate and its queue position must be re-evaluated.
	ns.SubscribeState(pp.updateFlag, func(node *enode.Node, oldState, newState nodestate.Flags) {
		if !newState.IsEmpty() {
			pp.updatePriority(node)
		}
	})
	return pp
}
// RequestCapacity checks whether changing the capacity of a node to the given target
// is possible (bias is applied in favor of other active nodes if the target is higher
// than the current capacity).
// If setCap is true then it also performs the change if possible. The function returns
// the minimum priority needed to do the change and whether it is currently allowed.
// If setCap and allowed are both true then the caller can assume that the change was
// successful.
// Note: priorityField should always be set before calling RequestCapacity. If setCap
// is false then both InactiveFlag and ActiveFlag can be unset and they are not changed
// by this function call either.
// Note 2: this function should run inside a NodeStateMachine operation
func (pp *PriorityPool) RequestCapacity(node *enode.Node, targetCap uint64, bias time.Duration, setCap bool) (minPriority int64, allowed bool) {
	pp.lock.Lock()
	pp.activeQueue.Refresh()
	var updates []capUpdate
	defer func() {
		// flag/field updates are applied after releasing the pool mutex
		pp.lock.Unlock()
		pp.updateFlags(updates)
	}()
	if targetCap < pp.minCap {
		targetCap = pp.minCap // capacities between 0 and minCap are not allowed
	}
	c, _ := pp.ns.GetField(node, pp.ppNodeInfoField).(*ppNodeInfo)
	if c == nil {
		log.Error("RequestCapacity called for unknown node", "id", node.ID())
		return math.MaxInt64, false
	}
	var priority int64
	if targetCap > c.capacity {
		// raising capacity: use a future minimum estimate so the bias works
		// in favor of the other, already active nodes
		priority = c.nodePriority.EstMinPriority(pp.clock.Now()+mclock.AbsTime(bias), targetCap, false)
	} else {
		priority = c.nodePriority.Priority(pp.clock.Now(), targetCap)
	}
	// Temporarily apply the target capacity with forced (minimum) queue
	// priority so that enforceLimits shrinks the other nodes first; the
	// change is reverted by finalizeChanges unless setCap && allowed.
	pp.markForChange(c)
	pp.setCapacity(c, targetCap)
	c.forced = true
	pp.activeQueue.Remove(c.activeIndex)
	pp.inactiveQueue.Remove(c.inactiveIndex)
	pp.activeQueue.Push(c)
	minPriority = pp.enforceLimits()
	// if capacity update is possible now then minPriority == math.MinInt64
	// if it is not possible at all then minPriority == math.MaxInt64
	allowed = priority > minPriority
	updates = pp.finalizeChanges(setCap && allowed)
	return
}
  210. // SetLimits sets the maximum number and total capacity of simultaneously active nodes
  211. func (pp *PriorityPool) SetLimits(maxCount, maxCap uint64) {
  212. pp.lock.Lock()
  213. pp.activeQueue.Refresh()
  214. var updates []capUpdate
  215. defer func() {
  216. pp.lock.Unlock()
  217. pp.ns.Operation(func() { pp.updateFlags(updates) })
  218. }()
  219. inc := (maxCount > pp.maxCount) || (maxCap > pp.maxCap)
  220. dec := (maxCount < pp.maxCount) || (maxCap < pp.maxCap)
  221. pp.maxCount, pp.maxCap = maxCount, maxCap
  222. if dec {
  223. pp.enforceLimits()
  224. updates = pp.finalizeChanges(true)
  225. }
  226. if inc {
  227. updates = pp.tryActivate()
  228. }
  229. }
  230. // SetActiveBias sets the bias applied when trying to activate inactive nodes
  231. func (pp *PriorityPool) SetActiveBias(bias time.Duration) {
  232. pp.lock.Lock()
  233. defer pp.lock.Unlock()
  234. pp.activeBias = bias
  235. pp.tryActivate()
  236. }
  237. // Active returns the number and total capacity of currently active nodes
  238. func (pp *PriorityPool) Active() (uint64, uint64) {
  239. pp.lock.Lock()
  240. defer pp.lock.Unlock()
  241. return pp.activeCount, pp.activeCap
  242. }
  243. // inactiveSetIndex callback updates ppNodeInfo item index in inactiveQueue
  244. func inactiveSetIndex(a interface{}, index int) {
  245. a.(*ppNodeInfo).inactiveIndex = index
  246. }
  247. // activeSetIndex callback updates ppNodeInfo item index in activeQueue
  248. func activeSetIndex(a interface{}, index int) {
  249. a.(*ppNodeInfo).activeIndex = index
  250. }
  251. // invertPriority inverts a priority value. The active queue uses inverted priorities
  252. // because the node on the top is the first to be deactivated.
  253. func invertPriority(p int64) int64 {
  254. if p == math.MinInt64 {
  255. return math.MaxInt64
  256. }
  257. return -p
  258. }
  259. // activePriority callback returns actual priority of ppNodeInfo item in activeQueue
  260. func activePriority(a interface{}, now mclock.AbsTime) int64 {
  261. c := a.(*ppNodeInfo)
  262. if c.forced {
  263. return math.MinInt64
  264. }
  265. if c.bias == 0 {
  266. return invertPriority(c.nodePriority.Priority(now, c.capacity))
  267. }
  268. return invertPriority(c.nodePriority.EstMinPriority(now+mclock.AbsTime(c.bias), c.capacity, true))
  269. }
  270. // activeMaxPriority callback returns estimated maximum priority of ppNodeInfo item in activeQueue
  271. func activeMaxPriority(a interface{}, until mclock.AbsTime) int64 {
  272. c := a.(*ppNodeInfo)
  273. if c.forced {
  274. return math.MinInt64
  275. }
  276. return invertPriority(c.nodePriority.EstMinPriority(until+mclock.AbsTime(c.bias), c.capacity, false))
  277. }
  278. // inactivePriority callback returns actual priority of ppNodeInfo item in inactiveQueue
  279. func (pp *PriorityPool) inactivePriority(p *ppNodeInfo) int64 {
  280. return p.nodePriority.Priority(pp.clock.Now(), pp.minCap)
  281. }
// connectedNode is called when a new node has been added to the pool (InactiveFlag set)
// Note: this function should run inside a NodeStateMachine operation
func (pp *PriorityPool) connectedNode(c *ppNodeInfo) {
	pp.lock.Lock()
	pp.activeQueue.Refresh()
	var updates []capUpdate
	defer func() {
		// flag/field updates are applied after releasing the pool mutex
		pp.lock.Unlock()
		pp.updateFlags(updates)
	}()
	if c.connected {
		return // already in the pool; nothing to do
	}
	c.connected = true
	// new nodes start inactive; tryActivate may promote them immediately
	pp.inactiveQueue.Push(c, pp.inactivePriority(c))
	updates = pp.tryActivate()
}
// disconnectedNode is called when a node has been removed from the pool (both InactiveFlag
// and ActiveFlag reset)
// Note: this function should run inside a NodeStateMachine operation
func (pp *PriorityPool) disconnectedNode(c *ppNodeInfo) {
	pp.lock.Lock()
	pp.activeQueue.Refresh()
	var updates []capUpdate
	defer func() {
		// flag/field updates are applied after releasing the pool mutex
		pp.lock.Unlock()
		pp.updateFlags(updates)
	}()
	if !c.connected {
		return // not in the pool; nothing to do
	}
	c.connected = false
	pp.activeQueue.Remove(c.activeIndex)
	pp.inactiveQueue.Remove(c.inactiveIndex)
	if c.capacity != 0 {
		// the freed capacity may allow other inactive nodes to activate
		pp.setCapacity(c, 0)
		updates = pp.tryActivate()
	}
}
  321. // markForChange internally puts a node in a temporary state that can either be reverted
  322. // or confirmed later. This temporary state allows changing the capacity of a node and
  323. // moving it between the active and inactive queue. ActiveFlag/InactiveFlag and
  324. // CapacityField are not changed while the changes are still temporary.
  325. func (pp *PriorityPool) markForChange(c *ppNodeInfo) {
  326. if c.changed {
  327. return
  328. }
  329. c.changed = true
  330. c.origCap = c.capacity
  331. pp.changed = append(pp.changed, c)
  332. }
  333. // setCapacity changes the capacity of a node and adjusts activeCap and activeCount
  334. // accordingly. Note that this change is performed in the temporary state so it should
  335. // be called after markForChange and before finalizeChanges.
  336. func (pp *PriorityPool) setCapacity(n *ppNodeInfo, cap uint64) {
  337. pp.activeCap += cap - n.capacity
  338. if n.capacity == 0 {
  339. pp.activeCount++
  340. }
  341. if cap == 0 {
  342. pp.activeCount--
  343. }
  344. n.capacity = cap
  345. }
// enforceLimits enforces active node count and total capacity limits. It returns the
// lowest active node priority. Note that this function is performed on the temporary
// internal state.
func (pp *PriorityPool) enforceLimits() int64 {
	if pp.activeCap <= pp.maxCap && pp.activeCount <= pp.maxCount {
		return math.MinInt64 // within limits; any priority is sufficient
	}
	var maxActivePriority int64
	// Pop the lowest-priority active nodes (the queue stores inverted
	// priorities, so the top item is the first to be demoted) and shrink or
	// deactivate them until both limits are satisfied.
	pp.activeQueue.MultiPop(func(data interface{}, priority int64) bool {
		c := data.(*ppNodeInfo)
		pp.markForChange(c)
		maxActivePriority = priority
		if c.capacity == pp.minCap {
			// already at minimal capacity: deactivate entirely
			pp.setCapacity(c, 0)
		} else {
			// reduce capacity by 1/capacityStepDiv, clamped so it never
			// drops below minCap, and re-queue at the new capacity
			sub := c.capacity / pp.capacityStepDiv
			if c.capacity-sub < pp.minCap {
				sub = c.capacity - pp.minCap
			}
			pp.setCapacity(c, c.capacity-sub)
			pp.activeQueue.Push(c)
		}
		// keep popping while still over either limit
		return pp.activeCap > pp.maxCap || pp.activeCount > pp.maxCount
	})
	return invertPriority(maxActivePriority)
}
// finalizeChanges either commits or reverts temporary changes. The necessary capacity
// field and according flag updates are not performed here but returned in a list because
// they should be performed while the mutex is not held.
func (pp *PriorityPool) finalizeChanges(commit bool) (updates []capUpdate) {
	for _, c := range pp.changed {
		// always remove and push back in order to update biased/forced priority
		pp.activeQueue.Remove(c.activeIndex)
		pp.inactiveQueue.Remove(c.inactiveIndex)
		c.bias = 0
		c.forced = false
		c.changed = false
		if !commit {
			// revert to the capacity saved by markForChange
			pp.setCapacity(c, c.origCap)
		}
		if c.connected {
			// re-queue according to the now-final capacity
			if c.capacity != 0 {
				pp.activeQueue.Push(c)
			} else {
				pp.inactiveQueue.Push(c, pp.inactivePriority(c))
			}
			// only committed, real changes produce an external update
			if c.capacity != c.origCap && commit {
				updates = append(updates, capUpdate{c.node, c.origCap, c.capacity})
			}
		}
		c.origCap = 0
	}
	pp.changed = nil
	return
}
// capUpdate describes a CapacityField and ActiveFlag/InactiveFlag update,
// produced under the pool mutex by finalizeChanges and applied without it
// by updateFlags.
type capUpdate struct {
	node           *enode.Node
	oldCap, newCap uint64 // committed capacity before/after the change
}
  406. // updateFlags performs CapacityField and ActiveFlag/InactiveFlag updates while the
  407. // pool mutex is not held
  408. // Note: this function should run inside a NodeStateMachine operation
  409. func (pp *PriorityPool) updateFlags(updates []capUpdate) {
  410. for _, f := range updates {
  411. if f.oldCap == 0 {
  412. pp.ns.SetStateSub(f.node, pp.ActiveFlag, pp.InactiveFlag, 0)
  413. }
  414. if f.newCap == 0 {
  415. pp.ns.SetStateSub(f.node, pp.InactiveFlag, pp.ActiveFlag, 0)
  416. pp.ns.SetFieldSub(f.node, pp.CapacityField, nil)
  417. } else {
  418. pp.ns.SetFieldSub(f.node, pp.CapacityField, f.newCap)
  419. }
  420. }
  421. }
// tryActivate tries to activate inactive nodes if possible
func (pp *PriorityPool) tryActivate() []capUpdate {
	var commit bool
	for pp.inactiveQueue.Size() > 0 {
		// Tentatively activate the highest-priority inactive node at minimal
		// capacity with the activation bias applied, then re-check the limits.
		c := pp.inactiveQueue.PopItem().(*ppNodeInfo)
		pp.markForChange(c)
		pp.setCapacity(c, pp.minCap)
		c.bias = pp.activeBias
		pp.activeQueue.Push(c)
		pp.enforceLimits()
		if c.capacity > 0 {
			// the candidate survived enforceLimits: keep it and try the next
			commit = true
		} else {
			// the candidate was pushed out again; lower-priority nodes would
			// fare no better, so stop
			break
		}
	}
	// commit all temporary changes if at least one activation succeeded;
	// otherwise finalizeChanges reverts everything (including the failed
	// candidate's re-queueing)
	return pp.finalizeChanges(commit)
}
  440. // updatePriority gets the current priority value of the given node from the nodePriority
  441. // interface and performs the necessary changes. It is triggered by updateFlag.
  442. // Note: this function should run inside a NodeStateMachine operation
  443. func (pp *PriorityPool) updatePriority(node *enode.Node) {
  444. pp.lock.Lock()
  445. pp.activeQueue.Refresh()
  446. var updates []capUpdate
  447. defer func() {
  448. pp.lock.Unlock()
  449. pp.updateFlags(updates)
  450. }()
  451. c, _ := pp.ns.GetField(node, pp.ppNodeInfoField).(*ppNodeInfo)
  452. if c == nil || !c.connected {
  453. return
  454. }
  455. pp.activeQueue.Remove(c.activeIndex)
  456. pp.inactiveQueue.Remove(c.inactiveIndex)
  457. if c.capacity != 0 {
  458. pp.activeQueue.Push(c)
  459. } else {
  460. pp.inactiveQueue.Push(c, pp.inactivePriority(c))
  461. }
  462. updates = pp.tryActivate()
  463. }