matcher_test.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package bloombits
  17. import (
  18. "math/rand"
  19. "sync/atomic"
  20. "testing"
  21. "time"
  22. )
  23. const testSectionSize = 4096
  24. // Tests the matcher pipeline on a single continuous workflow without interrupts.
  25. func TestMatcherContinuous(t *testing.T) {
  26. testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 100000, false, 75)
  27. testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 100000, false, 81)
  28. testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, false, 36)
  29. }
  30. // Tests the matcher pipeline on a constantly interrupted and resumed work pattern
  31. // with the aim of ensuring data items are requested only once.
  32. func TestMatcherIntermittent(t *testing.T) {
  33. testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 100000, true, 75)
  34. testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 100000, true, 81)
  35. testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, true, 36)
  36. }
  37. // Tests the matcher pipeline on random input to hopefully catch anomalies.
  38. func TestMatcherRandom(t *testing.T) {
  39. for i := 0; i < 10; i++ {
  40. testMatcherBothModes(t, makeRandomIndexes([]int{1}, 50), 10000, 0)
  41. testMatcherBothModes(t, makeRandomIndexes([]int{3}, 50), 10000, 0)
  42. testMatcherBothModes(t, makeRandomIndexes([]int{2, 2, 2}, 20), 10000, 0)
  43. testMatcherBothModes(t, makeRandomIndexes([]int{5, 5, 5}, 50), 10000, 0)
  44. testMatcherBothModes(t, makeRandomIndexes([]int{4, 4, 4}, 20), 10000, 0)
  45. }
  46. }
  47. // makeRandomIndexes generates a random filter system, composed on multiple filter
  48. // criteria, each having one bloom list component for the address and arbitrarilly
  49. // many topic bloom list components.
  50. func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes {
  51. res := make([][]bloomIndexes, len(lengths))
  52. for i, topics := range lengths {
  53. res[i] = make([]bloomIndexes, topics)
  54. for j := 0; j < topics; j++ {
  55. for k := 0; k < len(res[i][j]); k++ {
  56. res[i][j][k] = uint(rand.Intn(max-1) + 2)
  57. }
  58. }
  59. }
  60. return res
  61. }
  62. // testMatcherDiffBatches runs the given matches test in single-delivery and also
  63. // in batches delivery mode, verifying that all kinds of deliveries are handled
  64. // correctly withn.
  65. func testMatcherDiffBatches(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermittent bool, retrievals uint32) {
  66. singleton := testMatcher(t, filter, blocks, intermittent, retrievals, 1)
  67. batched := testMatcher(t, filter, blocks, intermittent, retrievals, 16)
  68. if singleton != batched {
  69. t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, %v in signleton vs. %v in batched mode", filter, blocks, intermittent, singleton, batched)
  70. }
  71. }
  72. // testMatcherBothModes runs the given matcher test in both continuous as well as
  73. // in intermittent mode, verifying that the request counts match each other.
  74. func testMatcherBothModes(t *testing.T, filter [][]bloomIndexes, blocks uint64, retrievals uint32) {
  75. continuous := testMatcher(t, filter, blocks, false, retrievals, 16)
  76. intermittent := testMatcher(t, filter, blocks, true, retrievals, 16)
  77. if continuous != intermittent {
  78. t.Errorf("filter = %v blocks = %v: request count mismatch, %v in continuous vs. %v in intermittent mode", filter, blocks, continuous, intermittent)
  79. }
  80. }
  81. // testMatcher is a generic tester to run the given matcher test and return the
  82. // number of requests made for cross validation between different modes.
  83. func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermittent bool, retrievals uint32, maxReqCount int) uint32 {
  84. // Create a new matcher an simulate our explicit random bitsets
  85. matcher := NewMatcher(testSectionSize, nil, nil)
  86. matcher.addresses = filter[0]
  87. matcher.topics = filter[1:]
  88. for _, rule := range filter {
  89. for _, topic := range rule {
  90. for _, bit := range topic {
  91. matcher.addScheduler(bit)
  92. }
  93. }
  94. }
  95. // Track the number of retrieval requests made
  96. var requested uint32
  97. // Start the matching session for the filter and the retriver goroutines
  98. quit := make(chan struct{})
  99. matches := make(chan uint64, 16)
  100. session, err := matcher.Start(0, blocks-1, matches)
  101. if err != nil {
  102. t.Fatalf("failed to stat matcher session: %v", err)
  103. }
  104. startRetrievers(session, quit, &requested, maxReqCount)
  105. // Iterate over all the blocks and verify that the pipeline produces the correct matches
  106. for i := uint64(0); i < blocks; i++ {
  107. if expMatch3(filter, i) {
  108. match, ok := <-matches
  109. if !ok {
  110. t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, results channel closed", filter, blocks, intermittent, i)
  111. return 0
  112. }
  113. if match != i {
  114. t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, got #%v", filter, blocks, intermittent, i, match)
  115. }
  116. // If we're testing intermittent mode, abort and restart the pipeline
  117. if intermittent {
  118. session.Close(time.Second)
  119. close(quit)
  120. quit = make(chan struct{})
  121. matches = make(chan uint64, 16)
  122. session, err = matcher.Start(i+1, blocks-1, matches)
  123. if err != nil {
  124. t.Fatalf("failed to stat matcher session: %v", err)
  125. }
  126. startRetrievers(session, quit, &requested, maxReqCount)
  127. }
  128. }
  129. }
  130. // Ensure the result channel is torn down after the last block
  131. match, ok := <-matches
  132. if ok {
  133. t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match)
  134. }
  135. // Clean up the session and ensure we match the expected retrieval count
  136. session.Close(time.Second)
  137. close(quit)
  138. if retrievals != 0 && requested != retrievals {
  139. t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested, retrievals)
  140. }
  141. return requested
  142. }
  143. // startRetrievers starts a batch of goroutines listening for section requests
  144. // and serving them.
  145. func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *uint32, batch int) {
  146. requests := make(chan chan *Retrieval)
  147. for i := 0; i < 10; i++ {
  148. // Start a multiplexer to test multiple threaded execution
  149. go session.Multiplex(batch, 100*time.Microsecond, requests)
  150. // Start a services to match the above multiplexer
  151. go func() {
  152. for {
  153. // Wait for a service request or a shutdown
  154. select {
  155. case <-quit:
  156. return
  157. case request := <-requests:
  158. task := <-request
  159. task.Bitsets = make([][]byte, len(task.Sections))
  160. for i, section := range task.Sections {
  161. if rand.Int()%4 != 0 { // Handle occasional missing deliveries
  162. task.Bitsets[i] = generateBitset(task.Bit, section)
  163. atomic.AddUint32(retrievals, 1)
  164. }
  165. }
  166. request <- task
  167. }
  168. }
  169. }()
  170. }
  171. }
  172. // generateBitset generates the rotated bitset for the given bloom bit and section
  173. // numbers.
  174. func generateBitset(bit uint, section uint64) []byte {
  175. bitset := make([]byte, testSectionSize/8)
  176. for i := 0; i < len(bitset); i++ {
  177. for b := 0; b < 8; b++ {
  178. blockIdx := section*testSectionSize + uint64(i*8+b)
  179. bitset[i] += bitset[i]
  180. if (blockIdx % uint64(bit)) == 0 {
  181. bitset[i]++
  182. }
  183. }
  184. }
  185. return bitset
  186. }
  187. func expMatch1(filter bloomIndexes, i uint64) bool {
  188. for _, ii := range filter {
  189. if (i % uint64(ii)) != 0 {
  190. return false
  191. }
  192. }
  193. return true
  194. }
  195. func expMatch2(filter []bloomIndexes, i uint64) bool {
  196. for _, ii := range filter {
  197. if expMatch1(ii, i) {
  198. return true
  199. }
  200. }
  201. return false
  202. }
  203. func expMatch3(filter [][]bloomIndexes, i uint64) bool {
  204. for _, ii := range filter {
  205. if !expMatch2(ii, i) {
  206. return false
  207. }
  208. }
  209. return true
  210. }