priorityqueue.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package priorityqueue implements a channel-based priority queue
  17. // over arbitrary types. It provides
  18. // an autopop loop applying a function to the items, always respecting
  19. // their priority. The structure is only quasi-consistent, i.e., if a lower
  20. // priority item is autopopped, it is guaranteed that there was a point
  21. // when no higher priority item was present, i.e., it is not guaranteed
  22. // that there was any point where the lower priority item was present
  23. // but the higher was not.
  24. package priorityqueue
  25. import (
  26. "context"
  27. "errors"
  28. )
  // Sentinel errors returned by Push, and the token used to wake the Run loop.
  var (
  	// errContention is returned by a non-blocking Push (nil context) when the
  	// target queue is full.
  	errContention = errors.New("queue contention")
  	// errBadPriority is returned by Push when the priority is negative or not
  	// smaller than the number of queues.
  	errBadPriority = errors.New("bad priority")
  	// wakey is the empty value sent on the wakeup channel.
  	wakey = struct{}{}
  )
  // PriorityQueue is a set of buffered channels, one per priority level,
  // together with a wakeup channel used to rouse a Run loop that is waiting
  // on empty queues.
  type PriorityQueue struct {
  	// queues holds one channel per priority, indexed by priority. Run scans
  	// from the highest index down to 0.
  	queues []chan interface{}
  	// wakeup receives a token from Push after a successful enqueue.
  	wakeup chan struct{}
  }
  39. // New is the constructor for PriorityQueue
  40. func New(n int, l int) *PriorityQueue {
  41. var queues = make([]chan interface{}, n)
  42. for i := range queues {
  43. queues[i] = make(chan interface{}, l)
  44. }
  45. return &PriorityQueue{
  46. queues: queues,
  47. wakeup: make(chan struct{}, 1),
  48. }
  49. }
  50. // Run is a forever loop popping items from the queues
  51. func (pq *PriorityQueue) Run(ctx context.Context, f func(interface{})) {
  52. top := len(pq.queues) - 1
  53. p := top
  54. READ:
  55. for {
  56. q := pq.queues[p]
  57. select {
  58. case <-ctx.Done():
  59. return
  60. case x := <-q:
  61. f(x)
  62. p = top
  63. default:
  64. if p > 0 {
  65. p--
  66. continue READ
  67. }
  68. p = top
  69. select {
  70. case <-ctx.Done():
  71. return
  72. case <-pq.wakeup:
  73. }
  74. }
  75. }
  76. }
  77. // Push pushes an item to the appropriate queue specified in the priority argument
  78. // if context is given it waits until either the item is pushed or the Context aborts
  79. // otherwise returns errContention if the queue is full
  80. func (pq *PriorityQueue) Push(ctx context.Context, x interface{}, p int) error {
  81. if p < 0 || p >= len(pq.queues) {
  82. return errBadPriority
  83. }
  84. if ctx == nil {
  85. select {
  86. case pq.queues[p] <- x:
  87. default:
  88. return errContention
  89. }
  90. } else {
  91. select {
  92. case pq.queues[p] <- x:
  93. case <-ctx.Done():
  94. return ctx.Err()
  95. }
  96. }
  97. select {
  98. case pq.wakeup <- wakey:
  99. default:
  100. }
  101. return nil
  102. }