servingqueue.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "sort"
  19. "sync"
  20. "sync/atomic"
  21. "github.com/ethereum/go-ethereum/common/mclock"
  22. "github.com/ethereum/go-ethereum/common/prque"
  23. )
  24. // servingQueue allows running tasks in a limited number of threads and puts the
  25. // waiting tasks in a priority queue
  26. type servingQueue struct {
  27. recentTime, queuedTime, servingTimeDiff uint64
  28. burstLimit, burstDropLimit uint64
  29. burstDecRate float64
  30. lastUpdate mclock.AbsTime
  31. queueAddCh, queueBestCh chan *servingTask
  32. stopThreadCh, quit chan struct{}
  33. setThreadsCh chan int
  34. wg sync.WaitGroup
  35. threadCount int // number of currently running threads
  36. queue *prque.Prque // priority queue for waiting or suspended tasks
  37. best *servingTask // the highest priority task (not included in the queue)
  38. suspendBias int64 // priority bias against suspending an already running task
  39. }
  40. // servingTask represents a request serving task. Tasks can be implemented to
  41. // run in multiple steps, allowing the serving queue to suspend execution between
  42. // steps if higher priority tasks are entered. The creator of the task should
  43. // set the following fields:
  44. //
  45. // - priority: greater value means higher priority; values can wrap around the int64 range
  46. // - run: execute a single step; return true if finished
  47. // - after: executed after run finishes or returns an error, receives the total serving time
  48. type servingTask struct {
  49. sq *servingQueue
  50. servingTime, timeAdded, maxTime, expTime uint64
  51. peer *clientPeer
  52. priority int64
  53. biasAdded bool
  54. token runToken
  55. tokenCh chan runToken
  56. }
  57. // runToken received by servingTask.start allows the task to run. Closing the
  58. // channel by servingTask.stop signals the thread controller to allow a new task
  59. // to start running.
  60. type runToken chan struct{}
  61. // start blocks until the task can start and returns true if it is allowed to run.
  62. // Returning false means that the task should be cancelled.
  63. func (t *servingTask) start() bool {
  64. if t.peer.isFrozen() {
  65. return false
  66. }
  67. t.tokenCh = make(chan runToken, 1)
  68. select {
  69. case t.sq.queueAddCh <- t:
  70. case <-t.sq.quit:
  71. return false
  72. }
  73. select {
  74. case t.token = <-t.tokenCh:
  75. case <-t.sq.quit:
  76. return false
  77. }
  78. if t.token == nil {
  79. return false
  80. }
  81. t.servingTime -= uint64(mclock.Now())
  82. return true
  83. }
  84. // done signals the thread controller about the task being finished and returns
  85. // the total serving time of the task in nanoseconds.
  86. func (t *servingTask) done() uint64 {
  87. t.servingTime += uint64(mclock.Now())
  88. close(t.token)
  89. diff := t.servingTime - t.timeAdded
  90. t.timeAdded = t.servingTime
  91. if t.expTime > diff {
  92. t.expTime -= diff
  93. atomic.AddUint64(&t.sq.servingTimeDiff, t.expTime)
  94. } else {
  95. t.expTime = 0
  96. }
  97. return t.servingTime
  98. }
  99. // waitOrStop can be called during the execution of the task. It blocks if there
  100. // is a higher priority task waiting (a bias is applied in favor of the currently
  101. // running task). Returning true means that the execution can be resumed. False
  102. // means the task should be cancelled.
  103. func (t *servingTask) waitOrStop() bool {
  104. t.done()
  105. if !t.biasAdded {
  106. t.priority += t.sq.suspendBias
  107. t.biasAdded = true
  108. }
  109. return t.start()
  110. }
  111. // newServingQueue returns a new servingQueue
  112. func newServingQueue(suspendBias int64, utilTarget float64) *servingQueue {
  113. sq := &servingQueue{
  114. queue: prque.NewWrapAround(nil),
  115. suspendBias: suspendBias,
  116. queueAddCh: make(chan *servingTask, 100),
  117. queueBestCh: make(chan *servingTask),
  118. stopThreadCh: make(chan struct{}),
  119. quit: make(chan struct{}),
  120. setThreadsCh: make(chan int, 10),
  121. burstLimit: uint64(utilTarget * bufLimitRatio * 1200000),
  122. burstDropLimit: uint64(utilTarget * bufLimitRatio * 1000000),
  123. burstDecRate: utilTarget,
  124. lastUpdate: mclock.Now(),
  125. }
  126. sq.wg.Add(2)
  127. go sq.queueLoop()
  128. go sq.threadCountLoop()
  129. return sq
  130. }
  131. // newTask creates a new task with the given priority
  132. func (sq *servingQueue) newTask(peer *clientPeer, maxTime uint64, priority int64) *servingTask {
  133. return &servingTask{
  134. sq: sq,
  135. peer: peer,
  136. maxTime: maxTime,
  137. expTime: maxTime,
  138. priority: priority,
  139. }
  140. }
  141. // threadController is started in multiple goroutines and controls the execution
  142. // of tasks. The number of active thread controllers equals the allowed number of
  143. // concurrently running threads. It tries to fetch the highest priority queued
  144. // task first. If there are no queued tasks waiting then it can directly catch
  145. // run tokens from the token channel and allow the corresponding tasks to run
  146. // without entering the priority queue.
  147. func (sq *servingQueue) threadController() {
  148. defer sq.wg.Done()
  149. for {
  150. token := make(runToken)
  151. select {
  152. case best := <-sq.queueBestCh:
  153. best.tokenCh <- token
  154. case <-sq.stopThreadCh:
  155. return
  156. case <-sq.quit:
  157. return
  158. }
  159. select {
  160. case <-sq.stopThreadCh:
  161. return
  162. case <-sq.quit:
  163. return
  164. case <-token:
  165. }
  166. }
  167. }
  168. type (
  169. // peerTasks lists the tasks received from a given peer when selecting peers to freeze
  170. peerTasks struct {
  171. peer *clientPeer
  172. list []*servingTask
  173. sumTime uint64
  174. priority float64
  175. }
  176. // peerList is a sortable list of peerTasks
  177. peerList []*peerTasks
  178. )
  179. func (l peerList) Len() int {
  180. return len(l)
  181. }
  182. func (l peerList) Less(i, j int) bool {
  183. return l[i].priority < l[j].priority
  184. }
  185. func (l peerList) Swap(i, j int) {
  186. l[i], l[j] = l[j], l[i]
  187. }
  188. // freezePeers selects the peers with the worst priority queued tasks and freezes
  189. // them until burstTime goes under burstDropLimit or all peers are frozen
  190. func (sq *servingQueue) freezePeers() {
  191. peerMap := make(map[*clientPeer]*peerTasks)
  192. var peerList peerList
  193. if sq.best != nil {
  194. sq.queue.Push(sq.best, sq.best.priority)
  195. }
  196. sq.best = nil
  197. for sq.queue.Size() > 0 {
  198. task := sq.queue.PopItem().(*servingTask)
  199. tasks := peerMap[task.peer]
  200. if tasks == nil {
  201. bufValue, bufLimit := task.peer.fcClient.BufferStatus()
  202. if bufLimit < 1 {
  203. bufLimit = 1
  204. }
  205. tasks = &peerTasks{
  206. peer: task.peer,
  207. priority: float64(bufValue) / float64(bufLimit), // lower value comes first
  208. }
  209. peerMap[task.peer] = tasks
  210. peerList = append(peerList, tasks)
  211. }
  212. tasks.list = append(tasks.list, task)
  213. tasks.sumTime += task.expTime
  214. }
  215. sort.Sort(peerList)
  216. drop := true
  217. for _, tasks := range peerList {
  218. if drop {
  219. tasks.peer.freeze()
  220. tasks.peer.fcClient.Freeze()
  221. sq.queuedTime -= tasks.sumTime
  222. sqQueuedGauge.Update(int64(sq.queuedTime))
  223. clientFreezeMeter.Mark(1)
  224. drop = sq.recentTime+sq.queuedTime > sq.burstDropLimit
  225. for _, task := range tasks.list {
  226. task.tokenCh <- nil
  227. }
  228. } else {
  229. for _, task := range tasks.list {
  230. sq.queue.Push(task, task.priority)
  231. }
  232. }
  233. }
  234. if sq.queue.Size() > 0 {
  235. sq.best = sq.queue.PopItem().(*servingTask)
  236. }
  237. }
  238. // updateRecentTime recalculates the recent serving time value
  239. func (sq *servingQueue) updateRecentTime() {
  240. subTime := atomic.SwapUint64(&sq.servingTimeDiff, 0)
  241. now := mclock.Now()
  242. dt := now - sq.lastUpdate
  243. sq.lastUpdate = now
  244. if dt > 0 {
  245. subTime += uint64(float64(dt) * sq.burstDecRate)
  246. }
  247. if sq.recentTime > subTime {
  248. sq.recentTime -= subTime
  249. } else {
  250. sq.recentTime = 0
  251. }
  252. }
  253. // addTask inserts a task into the priority queue
  254. func (sq *servingQueue) addTask(task *servingTask) {
  255. if sq.best == nil {
  256. sq.best = task
  257. } else if task.priority-sq.best.priority > 0 {
  258. sq.queue.Push(sq.best, sq.best.priority)
  259. sq.best = task
  260. } else {
  261. sq.queue.Push(task, task.priority)
  262. }
  263. sq.updateRecentTime()
  264. sq.queuedTime += task.expTime
  265. sqServedGauge.Update(int64(sq.recentTime))
  266. sqQueuedGauge.Update(int64(sq.queuedTime))
  267. if sq.recentTime+sq.queuedTime > sq.burstLimit {
  268. sq.freezePeers()
  269. }
  270. }
  271. // queueLoop is an event loop running in a goroutine. It receives tasks from queueAddCh
  272. // and always tries to send the highest priority task to queueBestCh. Successfully sent
  273. // tasks are removed from the queue.
  274. func (sq *servingQueue) queueLoop() {
  275. defer sq.wg.Done()
  276. for {
  277. if sq.best != nil {
  278. expTime := sq.best.expTime
  279. select {
  280. case task := <-sq.queueAddCh:
  281. sq.addTask(task)
  282. case sq.queueBestCh <- sq.best:
  283. sq.updateRecentTime()
  284. sq.queuedTime -= expTime
  285. sq.recentTime += expTime
  286. sqServedGauge.Update(int64(sq.recentTime))
  287. sqQueuedGauge.Update(int64(sq.queuedTime))
  288. if sq.queue.Size() == 0 {
  289. sq.best = nil
  290. } else {
  291. sq.best, _ = sq.queue.PopItem().(*servingTask)
  292. }
  293. case <-sq.quit:
  294. return
  295. }
  296. } else {
  297. select {
  298. case task := <-sq.queueAddCh:
  299. sq.addTask(task)
  300. case <-sq.quit:
  301. return
  302. }
  303. }
  304. }
  305. }
  306. // threadCountLoop is an event loop running in a goroutine. It adjusts the number
  307. // of active thread controller goroutines.
  308. func (sq *servingQueue) threadCountLoop() {
  309. var threadCountTarget int
  310. defer sq.wg.Done()
  311. for {
  312. for threadCountTarget > sq.threadCount {
  313. sq.wg.Add(1)
  314. go sq.threadController()
  315. sq.threadCount++
  316. }
  317. if threadCountTarget < sq.threadCount {
  318. select {
  319. case threadCountTarget = <-sq.setThreadsCh:
  320. case sq.stopThreadCh <- struct{}{}:
  321. sq.threadCount--
  322. case <-sq.quit:
  323. return
  324. }
  325. } else {
  326. select {
  327. case threadCountTarget = <-sq.setThreadsCh:
  328. case <-sq.quit:
  329. return
  330. }
  331. }
  332. }
  333. }
  334. // setThreads sets the allowed processing thread count, suspending tasks as soon as
  335. // possible if necessary.
  336. func (sq *servingQueue) setThreads(threadCount int) {
  337. select {
  338. case sq.setThreadsCh <- threadCount:
  339. case <-sq.quit:
  340. return
  341. }
  342. }
  343. // stop stops task processing as soon as possible and shuts down the serving queue.
  344. func (sq *servingQueue) stop() {
  345. close(sq.quit)
  346. sq.wg.Wait()
  347. }