matcher.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package bloombits
  17. import (
  18. "bytes"
  19. "context"
  20. "errors"
  21. "math"
  22. "sort"
  23. "sync"
  24. "sync/atomic"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common/bitutil"
  27. "github.com/ethereum/go-ethereum/common/gopool"
  28. "github.com/ethereum/go-ethereum/crypto"
  29. )
  30. // bloomIndexes represents the bit indexes inside the bloom filter that belong
  31. // to some key.
  32. type bloomIndexes [3]uint
  33. // calcBloomIndexes returns the bloom filter bit indexes belonging to the given key.
  34. func calcBloomIndexes(b []byte) bloomIndexes {
  35. b = crypto.Keccak256(b)
  36. var idxs bloomIndexes
  37. for i := 0; i < len(idxs); i++ {
  38. idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1])
  39. }
  40. return idxs
  41. }
  42. // partialMatches with a non-nil vector represents a section in which some sub-
  43. // matchers have already found potential matches. Subsequent sub-matchers will
  44. // binary AND their matches with this vector. If vector is nil, it represents a
  45. // section to be processed by the first sub-matcher.
  46. type partialMatches struct {
  47. section uint64
  48. bitset []byte
  49. }
  50. // Retrieval represents a request for retrieval task assignments for a given
  51. // bit with the given number of fetch elements, or a response for such a request.
  52. // It can also have the actual results set to be used as a delivery data struct.
  53. //
  54. // The contest and error fields are used by the light client to terminate matching
  55. // early if an error is encountered on some path of the pipeline.
  56. type Retrieval struct {
  57. Bit uint
  58. Sections []uint64
  59. Bitsets [][]byte
  60. Context context.Context
  61. Error error
  62. }
  63. // Matcher is a pipelined system of schedulers and logic matchers which perform
  64. // binary AND/OR operations on the bit-streams, creating a stream of potential
  65. // blocks to inspect for data content.
  66. type Matcher struct {
  67. sectionSize uint64 // Size of the data batches to filter on
  68. filters [][]bloomIndexes // Filter the system is matching for
  69. schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits
  70. retrievers chan chan uint // Retriever processes waiting for bit allocations
  71. counters chan chan uint // Retriever processes waiting for task count reports
  72. retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
  73. deliveries chan *Retrieval // Retriever processes waiting for task response deliveries
  74. running uint32 // Atomic flag whether a session is live or not
  75. }
  76. // NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
  77. // address and topic filtering on them. Setting a filter component to `nil` is
  78. // allowed and will result in that filter rule being skipped (OR 0x11...1).
  79. func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher {
  80. // Create the matcher instance
  81. m := &Matcher{
  82. sectionSize: sectionSize,
  83. schedulers: make(map[uint]*scheduler),
  84. retrievers: make(chan chan uint),
  85. counters: make(chan chan uint),
  86. retrievals: make(chan chan *Retrieval),
  87. deliveries: make(chan *Retrieval),
  88. }
  89. // Calculate the bloom bit indexes for the groups we're interested in
  90. m.filters = nil
  91. for _, filter := range filters {
  92. // Gather the bit indexes of the filter rule, special casing the nil filter
  93. if len(filter) == 0 {
  94. continue
  95. }
  96. bloomBits := make([]bloomIndexes, len(filter))
  97. for i, clause := range filter {
  98. if clause == nil {
  99. bloomBits = nil
  100. break
  101. }
  102. bloomBits[i] = calcBloomIndexes(clause)
  103. }
  104. // Accumulate the filter rules if no nil rule was within
  105. if bloomBits != nil {
  106. m.filters = append(m.filters, bloomBits)
  107. }
  108. }
  109. // For every bit, create a scheduler to load/download the bit vectors
  110. for _, bloomIndexLists := range m.filters {
  111. for _, bloomIndexList := range bloomIndexLists {
  112. for _, bloomIndex := range bloomIndexList {
  113. m.addScheduler(bloomIndex)
  114. }
  115. }
  116. }
  117. return m
  118. }
  119. // addScheduler adds a bit stream retrieval scheduler for the given bit index if
  120. // it has not existed before. If the bit is already selected for filtering, the
  121. // existing scheduler can be used.
  122. func (m *Matcher) addScheduler(idx uint) {
  123. if _, ok := m.schedulers[idx]; ok {
  124. return
  125. }
  126. m.schedulers[idx] = newScheduler(idx)
  127. }
  128. // Start starts the matching process and returns a stream of bloom matches in
  129. // a given range of blocks. If there are no more matches in the range, the result
  130. // channel is closed.
  131. func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
  132. // Make sure we're not creating concurrent sessions
  133. if atomic.SwapUint32(&m.running, 1) == 1 {
  134. return nil, errors.New("matcher already running")
  135. }
  136. defer atomic.StoreUint32(&m.running, 0)
  137. // Initiate a new matching round
  138. session := &MatcherSession{
  139. matcher: m,
  140. quit: make(chan struct{}),
  141. ctx: ctx,
  142. }
  143. for _, scheduler := range m.schedulers {
  144. scheduler.reset()
  145. }
  146. sink := m.run(begin, end, cap(results), session)
  147. // Read the output from the result sink and deliver to the user
  148. session.pend.Add(1)
  149. gopool.Submit(func() {
  150. defer session.pend.Done()
  151. defer close(results)
  152. for {
  153. select {
  154. case <-session.quit:
  155. return
  156. case res, ok := <-sink:
  157. // New match result found
  158. if !ok {
  159. return
  160. }
  161. // Calculate the first and last blocks of the section
  162. sectionStart := res.section * m.sectionSize
  163. first := sectionStart
  164. if begin > first {
  165. first = begin
  166. }
  167. last := sectionStart + m.sectionSize - 1
  168. if end < last {
  169. last = end
  170. }
  171. // Iterate over all the blocks in the section and return the matching ones
  172. for i := first; i <= last; i++ {
  173. // Skip the entire byte if no matches are found inside (and we're processing an entire byte!)
  174. next := res.bitset[(i-sectionStart)/8]
  175. if next == 0 {
  176. if i%8 == 0 {
  177. i += 7
  178. }
  179. continue
  180. }
  181. // Some bit it set, do the actual submatching
  182. if bit := 7 - i%8; next&(1<<bit) != 0 {
  183. select {
  184. case <-session.quit:
  185. return
  186. case results <- i:
  187. }
  188. }
  189. }
  190. }
  191. }
  192. })
  193. return session, nil
  194. }
  195. // run creates a daisy-chain of sub-matchers, one for the address set and one
  196. // for each topic set, each sub-matcher receiving a section only if the previous
  197. // ones have all found a potential match in one of the blocks of the section,
  198. // then binary AND-ing its own matches and forwarding the result to the next one.
  199. //
  200. // The method starts feeding the section indexes into the first sub-matcher on a
  201. // new goroutine and returns a sink channel receiving the results.
  202. func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) chan *partialMatches {
  203. // Create the source channel and feed section indexes into
  204. source := make(chan *partialMatches, buffer)
  205. session.pend.Add(1)
  206. gopool.Submit(func() {
  207. defer session.pend.Done()
  208. defer close(source)
  209. for i := begin / m.sectionSize; i <= end/m.sectionSize; i++ {
  210. select {
  211. case <-session.quit:
  212. return
  213. case source <- &partialMatches{i, bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}:
  214. }
  215. }
  216. })
  217. // Assemble the daisy-chained filtering pipeline
  218. next := source
  219. dist := make(chan *request, buffer)
  220. for _, bloom := range m.filters {
  221. next = m.subMatch(next, dist, bloom, session)
  222. }
  223. // Start the request distribution
  224. session.pend.Add(1)
  225. gopool.Submit(func() {
  226. m.distributor(dist, session)
  227. })
  228. return next
  229. }
  230. // subMatch creates a sub-matcher that filters for a set of addresses or topics, binary OR-s those matches, then
  231. // binary AND-s the result to the daisy-chain input (source) and forwards it to the daisy-chain output.
  232. // The matches of each address/topic are calculated by fetching the given sections of the three bloom bit indexes belonging to
  233. // that address/topic, and binary AND-ing those vectors together.
  234. func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloom []bloomIndexes, session *MatcherSession) chan *partialMatches {
  235. // Start the concurrent schedulers for each bit required by the bloom filter
  236. sectionSources := make([][3]chan uint64, len(bloom))
  237. sectionSinks := make([][3]chan []byte, len(bloom))
  238. for i, bits := range bloom {
  239. for j, bit := range bits {
  240. sectionSources[i][j] = make(chan uint64, cap(source))
  241. sectionSinks[i][j] = make(chan []byte, cap(source))
  242. m.schedulers[bit].run(sectionSources[i][j], dist, sectionSinks[i][j], session.quit, &session.pend)
  243. }
  244. }
  245. process := make(chan *partialMatches, cap(source)) // entries from source are forwarded here after fetches have been initiated
  246. results := make(chan *partialMatches, cap(source))
  247. session.pend.Add(2)
  248. gopool.Submit(func() {
  249. // Tear down the goroutine and terminate all source channels
  250. defer session.pend.Done()
  251. defer close(process)
  252. defer func() {
  253. for _, bloomSources := range sectionSources {
  254. for _, bitSource := range bloomSources {
  255. close(bitSource)
  256. }
  257. }
  258. }()
  259. // Read sections from the source channel and multiplex into all bit-schedulers
  260. for {
  261. select {
  262. case <-session.quit:
  263. return
  264. case subres, ok := <-source:
  265. // New subresult from previous link
  266. if !ok {
  267. return
  268. }
  269. // Multiplex the section index to all bit-schedulers
  270. for _, bloomSources := range sectionSources {
  271. for _, bitSource := range bloomSources {
  272. select {
  273. case <-session.quit:
  274. return
  275. case bitSource <- subres.section:
  276. }
  277. }
  278. }
  279. // Notify the processor that this section will become available
  280. select {
  281. case <-session.quit:
  282. return
  283. case process <- subres:
  284. }
  285. }
  286. }
  287. })
  288. gopool.Submit(func() {
  289. // Tear down the goroutine and terminate the final sink channel
  290. defer session.pend.Done()
  291. defer close(results)
  292. // Read the source notifications and collect the delivered results
  293. for {
  294. select {
  295. case <-session.quit:
  296. return
  297. case subres, ok := <-process:
  298. // Notified of a section being retrieved
  299. if !ok {
  300. return
  301. }
  302. // Gather all the sub-results and merge them together
  303. var orVector []byte
  304. for _, bloomSinks := range sectionSinks {
  305. var andVector []byte
  306. for _, bitSink := range bloomSinks {
  307. var data []byte
  308. select {
  309. case <-session.quit:
  310. return
  311. case data = <-bitSink:
  312. }
  313. if andVector == nil {
  314. andVector = make([]byte, int(m.sectionSize/8))
  315. copy(andVector, data)
  316. } else {
  317. bitutil.ANDBytes(andVector, andVector, data)
  318. }
  319. }
  320. if orVector == nil {
  321. orVector = andVector
  322. } else {
  323. bitutil.ORBytes(orVector, orVector, andVector)
  324. }
  325. }
  326. if orVector == nil {
  327. orVector = make([]byte, int(m.sectionSize/8))
  328. }
  329. if subres.bitset != nil {
  330. bitutil.ANDBytes(orVector, orVector, subres.bitset)
  331. }
  332. if bitutil.TestBytes(orVector) {
  333. select {
  334. case <-session.quit:
  335. return
  336. case results <- &partialMatches{subres.section, orVector}:
  337. }
  338. }
  339. }
  340. }
  341. })
  342. return results
  343. }
  344. // distributor receives requests from the schedulers and queues them into a set
  345. // of pending requests, which are assigned to retrievers wanting to fulfil them.
  346. func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
  347. defer session.pend.Done()
  348. var (
  349. requests = make(map[uint][]uint64) // Per-bit list of section requests, ordered by section number
  350. unallocs = make(map[uint]struct{}) // Bits with pending requests but not allocated to any retriever
  351. retrievers chan chan uint // Waiting retrievers (toggled to nil if unallocs is empty)
  352. allocs int // Number of active allocations to handle graceful shutdown requests
  353. shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests
  354. )
  355. // assign is a helper method fo try to assign a pending bit an actively
  356. // listening servicer, or schedule it up for later when one arrives.
  357. assign := func(bit uint) {
  358. select {
  359. case fetcher := <-m.retrievers:
  360. allocs++
  361. fetcher <- bit
  362. default:
  363. // No retrievers active, start listening for new ones
  364. retrievers = m.retrievers
  365. unallocs[bit] = struct{}{}
  366. }
  367. }
  368. for {
  369. select {
  370. case <-shutdown:
  371. // Shutdown requested. No more retrievers can be allocated,
  372. // but we still need to wait until all pending requests have returned.
  373. shutdown = nil
  374. if allocs == 0 {
  375. return
  376. }
  377. case req := <-dist:
  378. // New retrieval request arrived to be distributed to some fetcher process
  379. queue := requests[req.bit]
  380. index := sort.Search(len(queue), func(i int) bool { return queue[i] >= req.section })
  381. requests[req.bit] = append(queue[:index], append([]uint64{req.section}, queue[index:]...)...)
  382. // If it's a new bit and we have waiting fetchers, allocate to them
  383. if len(queue) == 0 {
  384. assign(req.bit)
  385. }
  386. case fetcher := <-retrievers:
  387. // New retriever arrived, find the lowest section-ed bit to assign
  388. bit, best := uint(0), uint64(math.MaxUint64)
  389. for idx := range unallocs {
  390. if requests[idx][0] < best {
  391. bit, best = idx, requests[idx][0]
  392. }
  393. }
  394. // Stop tracking this bit (and alloc notifications if no more work is available)
  395. delete(unallocs, bit)
  396. if len(unallocs) == 0 {
  397. retrievers = nil
  398. }
  399. allocs++
  400. fetcher <- bit
  401. case fetcher := <-m.counters:
  402. // New task count request arrives, return number of items
  403. fetcher <- uint(len(requests[<-fetcher]))
  404. case fetcher := <-m.retrievals:
  405. // New fetcher waiting for tasks to retrieve, assign
  406. task := <-fetcher
  407. if want := len(task.Sections); want >= len(requests[task.Bit]) {
  408. task.Sections = requests[task.Bit]
  409. delete(requests, task.Bit)
  410. } else {
  411. task.Sections = append(task.Sections[:0], requests[task.Bit][:want]...)
  412. requests[task.Bit] = append(requests[task.Bit][:0], requests[task.Bit][want:]...)
  413. }
  414. fetcher <- task
  415. // If anything was left unallocated, try to assign to someone else
  416. if len(requests[task.Bit]) > 0 {
  417. assign(task.Bit)
  418. }
  419. case result := <-m.deliveries:
  420. // New retrieval task response from fetcher, split out missing sections and
  421. // deliver complete ones
  422. var (
  423. sections = make([]uint64, 0, len(result.Sections))
  424. bitsets = make([][]byte, 0, len(result.Bitsets))
  425. missing = make([]uint64, 0, len(result.Sections))
  426. )
  427. for i, bitset := range result.Bitsets {
  428. if len(bitset) == 0 {
  429. missing = append(missing, result.Sections[i])
  430. continue
  431. }
  432. sections = append(sections, result.Sections[i])
  433. bitsets = append(bitsets, bitset)
  434. }
  435. m.schedulers[result.Bit].deliver(sections, bitsets)
  436. allocs--
  437. // Reschedule missing sections and allocate bit if newly available
  438. if len(missing) > 0 {
  439. queue := requests[result.Bit]
  440. for _, section := range missing {
  441. index := sort.Search(len(queue), func(i int) bool { return queue[i] >= section })
  442. queue = append(queue[:index], append([]uint64{section}, queue[index:]...)...)
  443. }
  444. requests[result.Bit] = queue
  445. if len(queue) == len(missing) {
  446. assign(result.Bit)
  447. }
  448. }
  449. // End the session when all pending deliveries have arrived.
  450. if shutdown == nil && allocs == 0 {
  451. return
  452. }
  453. }
  454. }
  455. }
  456. // MatcherSession is returned by a started matcher to be used as a terminator
  457. // for the actively running matching operation.
  458. type MatcherSession struct {
  459. matcher *Matcher
  460. closer sync.Once // Sync object to ensure we only ever close once
  461. quit chan struct{} // Quit channel to request pipeline termination
  462. ctx context.Context // Context used by the light client to abort filtering
  463. err atomic.Value // Global error to track retrieval failures deep in the chain
  464. pend sync.WaitGroup
  465. }
  466. // Close stops the matching process and waits for all subprocesses to terminate
  467. // before returning. The timeout may be used for graceful shutdown, allowing the
  468. // currently running retrievals to complete before this time.
  469. func (s *MatcherSession) Close() {
  470. s.closer.Do(func() {
  471. // Signal termination and wait for all goroutines to tear down
  472. close(s.quit)
  473. s.pend.Wait()
  474. })
  475. }
  476. // Error returns any failure encountered during the matching session.
  477. func (s *MatcherSession) Error() error {
  478. if err := s.err.Load(); err != nil {
  479. return err.(error)
  480. }
  481. return nil
  482. }
  483. // allocateRetrieval assigns a bloom bit index to a client process that can either
  484. // immediately request and fetch the section contents assigned to this bit or wait
  485. // a little while for more sections to be requested.
  486. func (s *MatcherSession) allocateRetrieval() (uint, bool) {
  487. fetcher := make(chan uint)
  488. select {
  489. case <-s.quit:
  490. return 0, false
  491. case s.matcher.retrievers <- fetcher:
  492. bit, ok := <-fetcher
  493. return bit, ok
  494. }
  495. }
  496. // pendingSections returns the number of pending section retrievals belonging to
  497. // the given bloom bit index.
  498. func (s *MatcherSession) pendingSections(bit uint) int {
  499. fetcher := make(chan uint)
  500. select {
  501. case <-s.quit:
  502. return 0
  503. case s.matcher.counters <- fetcher:
  504. fetcher <- bit
  505. return int(<-fetcher)
  506. }
  507. }
  508. // allocateSections assigns all or part of an already allocated bit-task queue
  509. // to the requesting process.
  510. func (s *MatcherSession) allocateSections(bit uint, count int) []uint64 {
  511. fetcher := make(chan *Retrieval)
  512. select {
  513. case <-s.quit:
  514. return nil
  515. case s.matcher.retrievals <- fetcher:
  516. task := &Retrieval{
  517. Bit: bit,
  518. Sections: make([]uint64, count),
  519. }
  520. fetcher <- task
  521. return (<-fetcher).Sections
  522. }
  523. }
  524. // deliverSections delivers a batch of section bit-vectors for a specific bloom
  525. // bit index to be injected into the processing pipeline.
  526. func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [][]byte) {
  527. s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}
  528. }
  529. // Multiplex polls the matcher session for retrieval tasks and multiplexes it into
  530. // the requested retrieval queue to be serviced together with other sessions.
  531. //
  532. // This method will block for the lifetime of the session. Even after termination
  533. // of the session, any request in-flight need to be responded to! Empty responses
  534. // are fine though in that case.
  535. func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
  536. for {
  537. // Allocate a new bloom bit index to retrieve data for, stopping when done
  538. bit, ok := s.allocateRetrieval()
  539. if !ok {
  540. return
  541. }
  542. // Bit allocated, throttle a bit if we're below our batch limit
  543. if s.pendingSections(bit) < batch {
  544. select {
  545. case <-s.quit:
  546. // Session terminating, we can't meaningfully service, abort
  547. s.allocateSections(bit, 0)
  548. s.deliverSections(bit, []uint64{}, [][]byte{})
  549. return
  550. case <-time.After(wait):
  551. // Throttling up, fetch whatever's available
  552. }
  553. }
  554. // Allocate as much as we can handle and request servicing
  555. sections := s.allocateSections(bit, batch)
  556. request := make(chan *Retrieval)
  557. select {
  558. case <-s.quit:
  559. // Session terminating, we can't meaningfully service, abort
  560. s.deliverSections(bit, sections, make([][]byte, len(sections)))
  561. return
  562. case mux <- request:
  563. // Retrieval accepted, something must arrive before we're aborting
  564. request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx}
  565. result := <-request
  566. if result.Error != nil {
  567. s.err.Store(result.Error)
  568. s.Close()
  569. }
  570. s.deliverSections(result.Bit, result.Sections, result.Bitsets)
  571. }
  572. }
  573. }