manager.go 5.6 KB


  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package flowcontrol implements a client side flow control mechanism
  17. package flowcontrol
  18. import (
  19. "sync"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common/mclock"
  22. )
  23. const rcConst = 1000000
  24. type cmNode struct {
  25. node *ClientNode
  26. lastUpdate mclock.AbsTime
  27. serving, recharging bool
  28. rcWeight uint64
  29. rcValue, rcDelta, startValue int64
  30. finishRecharge mclock.AbsTime
  31. }
  32. func (node *cmNode) update(time mclock.AbsTime) {
  33. dt := int64(time - node.lastUpdate)
  34. node.rcValue += node.rcDelta * dt / rcConst
  35. node.lastUpdate = time
  36. if node.recharging && time >= node.finishRecharge {
  37. node.recharging = false
  38. node.rcDelta = 0
  39. node.rcValue = 0
  40. }
  41. }
  42. func (node *cmNode) set(serving bool, simReqCnt, sumWeight uint64) {
  43. if node.serving && !serving {
  44. node.recharging = true
  45. sumWeight += node.rcWeight
  46. }
  47. node.serving = serving
  48. if node.recharging && serving {
  49. node.recharging = false
  50. sumWeight -= node.rcWeight
  51. }
  52. node.rcDelta = 0
  53. if serving {
  54. node.rcDelta = int64(rcConst / simReqCnt)
  55. }
  56. if node.recharging {
  57. node.rcDelta = -int64(node.node.cm.rcRecharge * node.rcWeight / sumWeight)
  58. node.finishRecharge = node.lastUpdate + mclock.AbsTime(node.rcValue*rcConst/(-node.rcDelta))
  59. }
  60. }
  61. type ClientManager struct {
  62. lock sync.Mutex
  63. nodes map[*cmNode]struct{}
  64. simReqCnt, sumWeight, rcSumValue uint64
  65. maxSimReq, maxRcSum uint64
  66. rcRecharge uint64
  67. resumeQueue chan chan bool
  68. time mclock.AbsTime
  69. }
  70. func NewClientManager(rcTarget, maxSimReq, maxRcSum uint64) *ClientManager {
  71. cm := &ClientManager{
  72. nodes: make(map[*cmNode]struct{}),
  73. resumeQueue: make(chan chan bool),
  74. rcRecharge: rcConst * rcConst / (100*rcConst/rcTarget - rcConst),
  75. maxSimReq: maxSimReq,
  76. maxRcSum: maxRcSum,
  77. }
  78. go cm.queueProc()
  79. return cm
  80. }
  81. func (self *ClientManager) Stop() {
  82. self.lock.Lock()
  83. defer self.lock.Unlock()
  84. // signal any waiting accept routines to return false
  85. self.nodes = make(map[*cmNode]struct{})
  86. close(self.resumeQueue)
  87. }
  88. func (self *ClientManager) addNode(cnode *ClientNode) *cmNode {
  89. time := mclock.Now()
  90. node := &cmNode{
  91. node: cnode,
  92. lastUpdate: time,
  93. finishRecharge: time,
  94. rcWeight: 1,
  95. }
  96. self.lock.Lock()
  97. defer self.lock.Unlock()
  98. self.nodes[node] = struct{}{}
  99. self.update(mclock.Now())
  100. return node
  101. }
  102. func (self *ClientManager) removeNode(node *cmNode) {
  103. self.lock.Lock()
  104. defer self.lock.Unlock()
  105. time := mclock.Now()
  106. self.stop(node, time)
  107. delete(self.nodes, node)
  108. self.update(time)
  109. }
  110. // recalc sumWeight
  111. func (self *ClientManager) updateNodes(time mclock.AbsTime) (rce bool) {
  112. var sumWeight, rcSum uint64
  113. for node := range self.nodes {
  114. rc := node.recharging
  115. node.update(time)
  116. if rc && !node.recharging {
  117. rce = true
  118. }
  119. if node.recharging {
  120. sumWeight += node.rcWeight
  121. }
  122. rcSum += uint64(node.rcValue)
  123. }
  124. self.sumWeight = sumWeight
  125. self.rcSumValue = rcSum
  126. return
  127. }
  128. func (self *ClientManager) update(time mclock.AbsTime) {
  129. for {
  130. firstTime := time
  131. for node := range self.nodes {
  132. if node.recharging && node.finishRecharge < firstTime {
  133. firstTime = node.finishRecharge
  134. }
  135. }
  136. if self.updateNodes(firstTime) {
  137. for node := range self.nodes {
  138. if node.recharging {
  139. node.set(node.serving, self.simReqCnt, self.sumWeight)
  140. }
  141. }
  142. } else {
  143. self.time = time
  144. return
  145. }
  146. }
  147. }
  148. func (self *ClientManager) canStartReq() bool {
  149. return self.simReqCnt < self.maxSimReq && self.rcSumValue < self.maxRcSum
  150. }
  151. func (self *ClientManager) queueProc() {
  152. for rc := range self.resumeQueue {
  153. for {
  154. time.Sleep(time.Millisecond * 10)
  155. self.lock.Lock()
  156. self.update(mclock.Now())
  157. cs := self.canStartReq()
  158. self.lock.Unlock()
  159. if cs {
  160. break
  161. }
  162. }
  163. close(rc)
  164. }
  165. }
  166. func (self *ClientManager) accept(node *cmNode, time mclock.AbsTime) bool {
  167. self.lock.Lock()
  168. defer self.lock.Unlock()
  169. self.update(time)
  170. if !self.canStartReq() {
  171. resume := make(chan bool)
  172. self.lock.Unlock()
  173. self.resumeQueue <- resume
  174. <-resume
  175. self.lock.Lock()
  176. if _, ok := self.nodes[node]; !ok {
  177. return false // reject if node has been removed or manager has been stopped
  178. }
  179. }
  180. self.simReqCnt++
  181. node.set(true, self.simReqCnt, self.sumWeight)
  182. node.startValue = node.rcValue
  183. self.update(self.time)
  184. return true
  185. }
  186. func (self *ClientManager) stop(node *cmNode, time mclock.AbsTime) {
  187. if node.serving {
  188. self.update(time)
  189. self.simReqCnt--
  190. node.set(false, self.simReqCnt, self.sumWeight)
  191. self.update(time)
  192. }
  193. }
  194. func (self *ClientManager) processed(node *cmNode, time mclock.AbsTime) (rcValue, rcCost uint64) {
  195. self.lock.Lock()
  196. defer self.lock.Unlock()
  197. self.stop(node, time)
  198. return uint64(node.rcValue), uint64(node.rcValue - node.startValue)
  199. }