iter.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package enode
  17. import (
  18. "sync"
  19. "time"
  20. "blockchain-go/common/gopool"
  21. )
  22. // Iterator represents a sequence of nodes. The Next method moves to the next node in the
  23. // sequence. It returns false when the sequence has ended or the iterator is closed. Close
  24. // may be called concurrently with Next and Node, and interrupts Next if it is blocked.
  25. type Iterator interface {
  26. Next() bool // moves to next node
  27. Node() *Node // returns current node
  28. Close() // ends the iterator
  29. }
  30. // ReadNodes reads at most n nodes from the given iterator. The return value contains no
  31. // duplicates and no nil values. To prevent looping indefinitely for small repeating node
  32. // sequences, this function calls Next at most n times.
  33. func ReadNodes(it Iterator, n int) []*Node {
  34. seen := make(map[ID]*Node, n)
  35. for i := 0; i < n && it.Next(); i++ {
  36. // Remove duplicates, keeping the node with higher seq.
  37. node := it.Node()
  38. prevNode, ok := seen[node.ID()]
  39. if ok && prevNode.Seq() > node.Seq() {
  40. continue
  41. }
  42. seen[node.ID()] = node
  43. }
  44. result := make([]*Node, 0, len(seen))
  45. for _, node := range seen {
  46. result = append(result, node)
  47. }
  48. return result
  49. }
  50. // IterNodes makes an iterator which runs through the given nodes once.
  51. func IterNodes(nodes []*Node) Iterator {
  52. return &sliceIter{nodes: nodes, index: -1}
  53. }
  54. // CycleNodes makes an iterator which cycles through the given nodes indefinitely.
  55. func CycleNodes(nodes []*Node) Iterator {
  56. return &sliceIter{nodes: nodes, index: -1, cycle: true}
  57. }
  58. type sliceIter struct {
  59. mu sync.Mutex
  60. nodes []*Node
  61. index int
  62. cycle bool
  63. }
  64. func (it *sliceIter) Next() bool {
  65. it.mu.Lock()
  66. defer it.mu.Unlock()
  67. if len(it.nodes) == 0 {
  68. return false
  69. }
  70. it.index++
  71. if it.index == len(it.nodes) {
  72. if it.cycle {
  73. it.index = 0
  74. } else {
  75. it.nodes = nil
  76. return false
  77. }
  78. }
  79. return true
  80. }
  81. func (it *sliceIter) Node() *Node {
  82. it.mu.Lock()
  83. defer it.mu.Unlock()
  84. if len(it.nodes) == 0 {
  85. return nil
  86. }
  87. return it.nodes[it.index]
  88. }
  89. func (it *sliceIter) Close() {
  90. it.mu.Lock()
  91. defer it.mu.Unlock()
  92. it.nodes = nil
  93. }
  94. // Filter wraps an iterator such that Next only returns nodes for which
  95. // the 'check' function returns true.
  96. func Filter(it Iterator, check func(*Node) bool) Iterator {
  97. return &filterIter{it, check}
  98. }
  99. type filterIter struct {
  100. Iterator
  101. check func(*Node) bool
  102. }
  103. func (f *filterIter) Next() bool {
  104. for f.Iterator.Next() {
  105. if f.check(f.Node()) {
  106. return true
  107. }
  108. }
  109. return false
  110. }
  111. // FairMix aggregates multiple node iterators. The mixer itself is an iterator which ends
  112. // only when Close is called. Source iterators added via AddSource are removed from the
  113. // mix when they end.
  114. //
  115. // The distribution of nodes returned by Next is approximately fair, i.e. FairMix
  116. // attempts to draw from all sources equally often. However, if a certain source is slow
  117. // and doesn't return a node within the configured timeout, a node from any other source
  118. // will be returned.
  119. //
  120. // It's safe to call AddSource and Close concurrently with Next.
  121. type FairMix struct {
  122. wg sync.WaitGroup
  123. fromAny chan *Node
  124. timeout time.Duration
  125. cur *Node
  126. mu sync.Mutex
  127. closed chan struct{}
  128. sources []*mixSource
  129. last int
  130. }
  131. type mixSource struct {
  132. it Iterator
  133. next chan *Node
  134. timeout time.Duration
  135. }
  136. // NewFairMix creates a mixer.
  137. //
  138. // The timeout specifies how long the mixer will wait for the next fairly-chosen source
  139. // before giving up and taking a node from any other source. A good way to set the timeout
  140. // is deciding how long you'd want to wait for a node on average. Passing a negative
  141. // timeout makes the mixer completely fair.
  142. func NewFairMix(timeout time.Duration) *FairMix {
  143. m := &FairMix{
  144. fromAny: make(chan *Node),
  145. closed: make(chan struct{}),
  146. timeout: timeout,
  147. }
  148. return m
  149. }
  150. // AddSource adds a source of nodes.
  151. func (m *FairMix) AddSource(it Iterator) {
  152. m.mu.Lock()
  153. defer m.mu.Unlock()
  154. if m.closed == nil {
  155. return
  156. }
  157. m.wg.Add(1)
  158. source := &mixSource{it, make(chan *Node), m.timeout}
  159. m.sources = append(m.sources, source)
  160. gopool.Submit(func() {
  161. m.runSource(m.closed, source)
  162. })
  163. }
  164. // Close shuts down the mixer and all current sources.
  165. // Calling this is required to release resources associated with the mixer.
  166. func (m *FairMix) Close() {
  167. m.mu.Lock()
  168. defer m.mu.Unlock()
  169. if m.closed == nil {
  170. return
  171. }
  172. for _, s := range m.sources {
  173. s.it.Close()
  174. }
  175. close(m.closed)
  176. m.wg.Wait()
  177. close(m.fromAny)
  178. m.sources = nil
  179. m.closed = nil
  180. }
  181. // Next returns a node from a random source.
  182. func (m *FairMix) Next() bool {
  183. m.cur = nil
  184. var timeout <-chan time.Time
  185. if m.timeout >= 0 {
  186. timer := time.NewTimer(m.timeout)
  187. timeout = timer.C
  188. defer timer.Stop()
  189. }
  190. for {
  191. source := m.pickSource()
  192. if source == nil {
  193. return m.nextFromAny()
  194. }
  195. select {
  196. case n, ok := <-source.next:
  197. if ok {
  198. m.cur = n
  199. source.timeout = m.timeout
  200. return true
  201. }
  202. // This source has ended.
  203. m.deleteSource(source)
  204. case <-timeout:
  205. source.timeout /= 2
  206. return m.nextFromAny()
  207. }
  208. }
  209. }
  210. // Node returns the current node.
  211. func (m *FairMix) Node() *Node {
  212. return m.cur
  213. }
  214. // nextFromAny is used when there are no sources or when the 'fair' choice
  215. // doesn't turn up a node quickly enough.
  216. func (m *FairMix) nextFromAny() bool {
  217. n, ok := <-m.fromAny
  218. if ok {
  219. m.cur = n
  220. }
  221. return ok
  222. }
  223. // pickSource chooses the next source to read from, cycling through them in order.
  224. func (m *FairMix) pickSource() *mixSource {
  225. m.mu.Lock()
  226. defer m.mu.Unlock()
  227. if len(m.sources) == 0 {
  228. return nil
  229. }
  230. m.last = (m.last + 1) % len(m.sources)
  231. return m.sources[m.last]
  232. }
  233. // deleteSource deletes a source.
  234. func (m *FairMix) deleteSource(s *mixSource) {
  235. m.mu.Lock()
  236. defer m.mu.Unlock()
  237. for i := range m.sources {
  238. if m.sources[i] == s {
  239. copy(m.sources[i:], m.sources[i+1:])
  240. m.sources[len(m.sources)-1] = nil
  241. m.sources = m.sources[:len(m.sources)-1]
  242. break
  243. }
  244. }
  245. }
  246. // runSource reads a single source in a loop.
  247. func (m *FairMix) runSource(closed chan struct{}, s *mixSource) {
  248. defer m.wg.Done()
  249. defer close(s.next)
  250. for s.it.Next() {
  251. n := s.it.Node()
  252. select {
  253. case s.next <- n:
  254. case m.fromAny <- n:
  255. case <-closed:
  256. return
  257. }
  258. }
  259. }