priorityqueue.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package priorityqueue implements a channel-based priority queue
// over arbitrary types. It provides an autopop loop that applies a
// function to the items, always respecting their priority.
// The structure is only quasi-consistent, i.e., if a lower
// priority item is autopopped, it is guaranteed that there was a point
// when no higher priority item was present; however, it is not guaranteed
// that there was any point where the lower priority item was present
// but the higher was not.
  24. package priorityqueue
  25. import (
  26. "context"
  27. "errors"
  28. "github.com/ethereum/go-ethereum/log"
  29. )
  30. var (
  31. ErrContention = errors.New("contention")
  32. errBadPriority = errors.New("bad priority")
  33. wakey = struct{}{}
  34. )
  35. // PriorityQueue is the basic structure
  36. type PriorityQueue struct {
  37. Queues []chan interface{}
  38. wakeup chan struct{}
  39. }
  40. // New is the constructor for PriorityQueue
  41. func New(n int, l int) *PriorityQueue {
  42. var queues = make([]chan interface{}, n)
  43. for i := range queues {
  44. queues[i] = make(chan interface{}, l)
  45. }
  46. return &PriorityQueue{
  47. Queues: queues,
  48. wakeup: make(chan struct{}, 1),
  49. }
  50. }
  51. // Run is a forever loop popping items from the queues
  52. func (pq *PriorityQueue) Run(ctx context.Context, f func(interface{})) {
  53. top := len(pq.Queues) - 1
  54. p := top
  55. READ:
  56. for {
  57. q := pq.Queues[p]
  58. select {
  59. case <-ctx.Done():
  60. return
  61. case x := <-q:
  62. log.Trace("priority.queue f(x)", "p", p, "len(Queues[p])", len(pq.Queues[p]))
  63. f(x)
  64. p = top
  65. default:
  66. if p > 0 {
  67. p--
  68. log.Trace("priority.queue p > 0", "p", p)
  69. continue READ
  70. }
  71. p = top
  72. select {
  73. case <-ctx.Done():
  74. return
  75. case <-pq.wakeup:
  76. log.Trace("priority.queue wakeup", "p", p)
  77. }
  78. }
  79. }
  80. }
  81. // Push pushes an item to the appropriate queue specified in the priority argument
  82. // if context is given it waits until either the item is pushed or the Context aborts
  83. func (pq *PriorityQueue) Push(x interface{}, p int) error {
  84. if p < 0 || p >= len(pq.Queues) {
  85. return errBadPriority
  86. }
  87. log.Trace("priority.queue push", "p", p, "len(Queues[p])", len(pq.Queues[p]))
  88. select {
  89. case pq.Queues[p] <- x:
  90. default:
  91. return ErrContention
  92. }
  93. select {
  94. case pq.wakeup <- wakey:
  95. default:
  96. }
  97. return nil
  98. }