servingqueue.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "fmt"
  19. "sort"
  20. "sync"
  21. "sync/atomic"
  22. "github.com/ethereum/go-ethereum/common/mclock"
  23. "github.com/ethereum/go-ethereum/common/prque"
  24. "github.com/ethereum/go-ethereum/les/csvlogger"
  25. )
  26. // servingQueue allows running tasks in a limited number of threads and puts the
  27. // waiting tasks in a priority queue
  28. type servingQueue struct {
  29. recentTime, queuedTime, servingTimeDiff uint64
  30. burstLimit, burstDropLimit uint64
  31. burstDecRate float64
  32. lastUpdate mclock.AbsTime
  33. queueAddCh, queueBestCh chan *servingTask
  34. stopThreadCh, quit chan struct{}
  35. setThreadsCh chan int
  36. wg sync.WaitGroup
  37. threadCount int // number of currently running threads
  38. queue *prque.Prque // priority queue for waiting or suspended tasks
  39. best *servingTask // the highest priority task (not included in the queue)
  40. suspendBias int64 // priority bias against suspending an already running task
  41. logger *csvlogger.Logger
  42. logRecentTime *csvlogger.Channel
  43. logQueuedTime *csvlogger.Channel
  44. }
  45. // servingTask represents a request serving task. Tasks can be implemented to
  46. // run in multiple steps, allowing the serving queue to suspend execution between
  47. // steps if higher priority tasks are entered. The creator of the task should
  48. // set the following fields:
  49. //
  50. // - priority: greater value means higher priority; values can wrap around the int64 range
  51. // - run: execute a single step; return true if finished
  52. // - after: executed after run finishes or returns an error, receives the total serving time
  53. type servingTask struct {
  54. sq *servingQueue
  55. servingTime, timeAdded, maxTime, expTime uint64
  56. peer *peer
  57. priority int64
  58. biasAdded bool
  59. token runToken
  60. tokenCh chan runToken
  61. }
  62. // runToken received by servingTask.start allows the task to run. Closing the
  63. // channel by servingTask.stop signals the thread controller to allow a new task
  64. // to start running.
  65. type runToken chan struct{}
  66. // start blocks until the task can start and returns true if it is allowed to run.
  67. // Returning false means that the task should be cancelled.
  68. func (t *servingTask) start() bool {
  69. if t.peer.isFrozen() {
  70. return false
  71. }
  72. t.tokenCh = make(chan runToken, 1)
  73. select {
  74. case t.sq.queueAddCh <- t:
  75. case <-t.sq.quit:
  76. return false
  77. }
  78. select {
  79. case t.token = <-t.tokenCh:
  80. case <-t.sq.quit:
  81. return false
  82. }
  83. if t.token == nil {
  84. return false
  85. }
  86. t.servingTime -= uint64(mclock.Now())
  87. return true
  88. }
  89. // done signals the thread controller about the task being finished and returns
  90. // the total serving time of the task in nanoseconds.
  91. func (t *servingTask) done() uint64 {
  92. t.servingTime += uint64(mclock.Now())
  93. close(t.token)
  94. diff := t.servingTime - t.timeAdded
  95. t.timeAdded = t.servingTime
  96. if t.expTime > diff {
  97. t.expTime -= diff
  98. atomic.AddUint64(&t.sq.servingTimeDiff, t.expTime)
  99. } else {
  100. t.expTime = 0
  101. }
  102. return t.servingTime
  103. }
  104. // waitOrStop can be called during the execution of the task. It blocks if there
  105. // is a higher priority task waiting (a bias is applied in favor of the currently
  106. // running task). Returning true means that the execution can be resumed. False
  107. // means the task should be cancelled.
  108. func (t *servingTask) waitOrStop() bool {
  109. t.done()
  110. if !t.biasAdded {
  111. t.priority += t.sq.suspendBias
  112. t.biasAdded = true
  113. }
  114. return t.start()
  115. }
  116. // newServingQueue returns a new servingQueue
  117. func newServingQueue(suspendBias int64, utilTarget float64, logger *csvlogger.Logger) *servingQueue {
  118. sq := &servingQueue{
  119. queue: prque.New(nil),
  120. suspendBias: suspendBias,
  121. queueAddCh: make(chan *servingTask, 100),
  122. queueBestCh: make(chan *servingTask),
  123. stopThreadCh: make(chan struct{}),
  124. quit: make(chan struct{}),
  125. setThreadsCh: make(chan int, 10),
  126. burstLimit: uint64(utilTarget * bufLimitRatio * 1200000),
  127. burstDropLimit: uint64(utilTarget * bufLimitRatio * 1000000),
  128. burstDecRate: utilTarget,
  129. lastUpdate: mclock.Now(),
  130. logger: logger,
  131. logRecentTime: logger.NewMinMaxChannel("recentTime", false),
  132. logQueuedTime: logger.NewMinMaxChannel("queuedTime", false),
  133. }
  134. sq.wg.Add(2)
  135. go sq.queueLoop()
  136. go sq.threadCountLoop()
  137. return sq
  138. }
  139. // newTask creates a new task with the given priority
  140. func (sq *servingQueue) newTask(peer *peer, maxTime uint64, priority int64) *servingTask {
  141. return &servingTask{
  142. sq: sq,
  143. peer: peer,
  144. maxTime: maxTime,
  145. expTime: maxTime,
  146. priority: priority,
  147. }
  148. }
  149. // threadController is started in multiple goroutines and controls the execution
  150. // of tasks. The number of active thread controllers equals the allowed number of
  151. // concurrently running threads. It tries to fetch the highest priority queued
  152. // task first. If there are no queued tasks waiting then it can directly catch
  153. // run tokens from the token channel and allow the corresponding tasks to run
  154. // without entering the priority queue.
  155. func (sq *servingQueue) threadController() {
  156. for {
  157. token := make(runToken)
  158. select {
  159. case best := <-sq.queueBestCh:
  160. best.tokenCh <- token
  161. case <-sq.stopThreadCh:
  162. sq.wg.Done()
  163. return
  164. case <-sq.quit:
  165. sq.wg.Done()
  166. return
  167. }
  168. <-token
  169. select {
  170. case <-sq.stopThreadCh:
  171. sq.wg.Done()
  172. return
  173. case <-sq.quit:
  174. sq.wg.Done()
  175. return
  176. default:
  177. }
  178. }
  179. }
  180. type (
  181. // peerTasks lists the tasks received from a given peer when selecting peers to freeze
  182. peerTasks struct {
  183. peer *peer
  184. list []*servingTask
  185. sumTime uint64
  186. priority float64
  187. }
  188. // peerList is a sortable list of peerTasks
  189. peerList []*peerTasks
  190. )
  191. func (l peerList) Len() int {
  192. return len(l)
  193. }
  194. func (l peerList) Less(i, j int) bool {
  195. return l[i].priority < l[j].priority
  196. }
  197. func (l peerList) Swap(i, j int) {
  198. l[i], l[j] = l[j], l[i]
  199. }
  200. // freezePeers selects the peers with the worst priority queued tasks and freezes
  201. // them until burstTime goes under burstDropLimit or all peers are frozen
  202. func (sq *servingQueue) freezePeers() {
  203. peerMap := make(map[*peer]*peerTasks)
  204. var peerList peerList
  205. if sq.best != nil {
  206. sq.queue.Push(sq.best, sq.best.priority)
  207. }
  208. sq.best = nil
  209. for sq.queue.Size() > 0 {
  210. task := sq.queue.PopItem().(*servingTask)
  211. tasks := peerMap[task.peer]
  212. if tasks == nil {
  213. bufValue, bufLimit := task.peer.fcClient.BufferStatus()
  214. if bufLimit < 1 {
  215. bufLimit = 1
  216. }
  217. tasks = &peerTasks{
  218. peer: task.peer,
  219. priority: float64(bufValue) / float64(bufLimit), // lower value comes first
  220. }
  221. peerMap[task.peer] = tasks
  222. peerList = append(peerList, tasks)
  223. }
  224. tasks.list = append(tasks.list, task)
  225. tasks.sumTime += task.expTime
  226. }
  227. sort.Sort(peerList)
  228. drop := true
  229. sq.logger.Event("freezing peers")
  230. for _, tasks := range peerList {
  231. if drop {
  232. tasks.peer.freezeClient()
  233. tasks.peer.fcClient.Freeze()
  234. sq.queuedTime -= tasks.sumTime
  235. if sq.logQueuedTime != nil {
  236. sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000)
  237. }
  238. sq.logger.Event(fmt.Sprintf("frozen peer sumTime=%d, %v", tasks.sumTime, tasks.peer.id))
  239. drop = sq.recentTime+sq.queuedTime > sq.burstDropLimit
  240. for _, task := range tasks.list {
  241. task.tokenCh <- nil
  242. }
  243. } else {
  244. for _, task := range tasks.list {
  245. sq.queue.Push(task, task.priority)
  246. }
  247. }
  248. }
  249. if sq.queue.Size() > 0 {
  250. sq.best = sq.queue.PopItem().(*servingTask)
  251. }
  252. }
  253. // updateRecentTime recalculates the recent serving time value
  254. func (sq *servingQueue) updateRecentTime() {
  255. subTime := atomic.SwapUint64(&sq.servingTimeDiff, 0)
  256. now := mclock.Now()
  257. dt := now - sq.lastUpdate
  258. sq.lastUpdate = now
  259. if dt > 0 {
  260. subTime += uint64(float64(dt) * sq.burstDecRate)
  261. }
  262. if sq.recentTime > subTime {
  263. sq.recentTime -= subTime
  264. } else {
  265. sq.recentTime = 0
  266. }
  267. }
  268. // addTask inserts a task into the priority queue
  269. func (sq *servingQueue) addTask(task *servingTask) {
  270. if sq.best == nil {
  271. sq.best = task
  272. } else if task.priority > sq.best.priority {
  273. sq.queue.Push(sq.best, sq.best.priority)
  274. sq.best = task
  275. } else {
  276. sq.queue.Push(task, task.priority)
  277. }
  278. sq.updateRecentTime()
  279. sq.queuedTime += task.expTime
  280. if sq.logQueuedTime != nil {
  281. sq.logRecentTime.Update(float64(sq.recentTime) / 1000)
  282. sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000)
  283. }
  284. if sq.recentTime+sq.queuedTime > sq.burstLimit {
  285. sq.freezePeers()
  286. }
  287. }
  288. // queueLoop is an event loop running in a goroutine. It receives tasks from queueAddCh
  289. // and always tries to send the highest priority task to queueBestCh. Successfully sent
  290. // tasks are removed from the queue.
  291. func (sq *servingQueue) queueLoop() {
  292. for {
  293. if sq.best != nil {
  294. expTime := sq.best.expTime
  295. select {
  296. case task := <-sq.queueAddCh:
  297. sq.addTask(task)
  298. case sq.queueBestCh <- sq.best:
  299. sq.updateRecentTime()
  300. sq.queuedTime -= expTime
  301. sq.recentTime += expTime
  302. if sq.logQueuedTime != nil {
  303. sq.logRecentTime.Update(float64(sq.recentTime) / 1000)
  304. sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000)
  305. }
  306. if sq.queue.Size() == 0 {
  307. sq.best = nil
  308. } else {
  309. sq.best, _ = sq.queue.PopItem().(*servingTask)
  310. }
  311. case <-sq.quit:
  312. sq.wg.Done()
  313. return
  314. }
  315. } else {
  316. select {
  317. case task := <-sq.queueAddCh:
  318. sq.addTask(task)
  319. case <-sq.quit:
  320. sq.wg.Done()
  321. return
  322. }
  323. }
  324. }
  325. }
  326. // threadCountLoop is an event loop running in a goroutine. It adjusts the number
  327. // of active thread controller goroutines.
  328. func (sq *servingQueue) threadCountLoop() {
  329. var threadCountTarget int
  330. for {
  331. for threadCountTarget > sq.threadCount {
  332. sq.wg.Add(1)
  333. go sq.threadController()
  334. sq.threadCount++
  335. }
  336. if threadCountTarget < sq.threadCount {
  337. select {
  338. case threadCountTarget = <-sq.setThreadsCh:
  339. case sq.stopThreadCh <- struct{}{}:
  340. sq.threadCount--
  341. case <-sq.quit:
  342. sq.wg.Done()
  343. return
  344. }
  345. } else {
  346. select {
  347. case threadCountTarget = <-sq.setThreadsCh:
  348. case <-sq.quit:
  349. sq.wg.Done()
  350. return
  351. }
  352. }
  353. }
  354. }
  355. // setThreads sets the allowed processing thread count, suspending tasks as soon as
  356. // possible if necessary.
  357. func (sq *servingQueue) setThreads(threadCount int) {
  358. select {
  359. case sq.setThreadsCh <- threadCount:
  360. case <-sq.quit:
  361. return
  362. }
  363. }
  364. // stop stops task processing as soon as possible and shuts down the serving queue.
  365. func (sq *servingQueue) stop() {
  366. close(sq.quit)
  367. sq.wg.Wait()
  368. }