matcher.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package bloombits
  17. import (
  18. "errors"
  19. "math"
  20. "sort"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/common/bitutil"
  26. "github.com/ethereum/go-ethereum/crypto"
  27. )
  28. // bloomIndexes represents the bit indexes inside the bloom filter that belong
  29. // to some key.
  30. type bloomIndexes [3]uint
  31. // calcBloomIndexes returns the bloom filter bit indexes belonging to the given key.
  32. func calcBloomIndexes(b []byte) bloomIndexes {
  33. b = crypto.Keccak256(b)
  34. var idxs bloomIndexes
  35. for i := 0; i < len(idxs); i++ {
  36. idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1])
  37. }
  38. return idxs
  39. }
  // partialMatches is a single section travelling through the daisy-chain of
  // sub-matchers.
  //
  // A non-nil bitset holds one bit per block of the section (most significant
  // bit first). It marks the blocks in which all preceding sub-matchers found a
  // potential match, and subsequent sub-matchers binary AND their own matches
  // into it. A nil bitset marks a section that has not been filtered yet, i.e.
  // one still to be processed by the first sub-matcher.
  type partialMatches struct {
  	section uint64 // Index of the section the bitset belongs to
  	bitset  []byte // Per-block match bits, or nil if not filtered yet
  }
  // Retrieval represents a request for retrieval task assignments for a given
  // bit with the given number of fetch elements, or a response for such a request.
  // It can also have the actual results set to be used as a delivery data struct.
  type Retrieval struct {
  	Bit      uint     // Bloom bit index the retrieval concerns
  	Sections []uint64 // Section indexes requested, assigned or delivered
  	Bitsets  [][]byte // Bit vectors aligned with Sections; an empty entry marks missing data
  }
  56. // Matcher is a pipelined system of schedulers and logic matchers which perform
  57. // binary AND/OR operations on the bit-streams, creating a stream of potential
  58. // blocks to inspect for data content.
  59. type Matcher struct {
  60. sectionSize uint64 // Size of the data batches to filter on
  61. addresses []bloomIndexes // Addresses the system is filtering for
  62. topics [][]bloomIndexes // Topics the system is filtering for
  63. schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits
  64. retrievers chan chan uint // Retriever processes waiting for bit allocations
  65. counters chan chan uint // Retriever processes waiting for task count reports
  66. retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
  67. deliveries chan *Retrieval // Retriever processes waiting for task response deliveries
  68. running uint32 // Atomic flag whether a session is live or not
  69. }
  70. // NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
  71. // address and topic filtering on them.
  72. func NewMatcher(sectionSize uint64, addresses []common.Address, topics [][]common.Hash) *Matcher {
  73. m := &Matcher{
  74. sectionSize: sectionSize,
  75. schedulers: make(map[uint]*scheduler),
  76. retrievers: make(chan chan uint),
  77. counters: make(chan chan uint),
  78. retrievals: make(chan chan *Retrieval),
  79. deliveries: make(chan *Retrieval),
  80. }
  81. m.setAddresses(addresses)
  82. m.setTopics(topics)
  83. return m
  84. }
  85. // setAddresses configures the matcher to only return logs that are generated
  86. // from addresses that are included in the given list.
  87. func (m *Matcher) setAddresses(addresses []common.Address) {
  88. // Calculate the bloom bit indexes for the addresses we're interested in
  89. m.addresses = make([]bloomIndexes, len(addresses))
  90. for i, address := range addresses {
  91. m.addresses[i] = calcBloomIndexes(address.Bytes())
  92. }
  93. // For every bit, create a scheduler to load/download the bit vectors
  94. for _, bloomIndexList := range m.addresses {
  95. for _, bloomIndex := range bloomIndexList {
  96. m.addScheduler(bloomIndex)
  97. }
  98. }
  99. }
  100. // setTopics configures the matcher to only return logs that have topics matching
  101. // the given list.
  102. func (m *Matcher) setTopics(topicsList [][]common.Hash) {
  103. // Calculate the bloom bit indexes for the topics we're interested in
  104. m.topics = nil
  105. for _, topics := range topicsList {
  106. bloomBits := make([]bloomIndexes, len(topics))
  107. for i, topic := range topics {
  108. bloomBits[i] = calcBloomIndexes(topic.Bytes())
  109. }
  110. m.topics = append(m.topics, bloomBits)
  111. }
  112. // For every bit, create a scheduler to load/download the bit vectors
  113. for _, bloomIndexLists := range m.topics {
  114. for _, bloomIndexList := range bloomIndexLists {
  115. for _, bloomIndex := range bloomIndexList {
  116. m.addScheduler(bloomIndex)
  117. }
  118. }
  119. }
  120. }
  121. // addScheduler adds a bit stream retrieval scheduler for the given bit index if
  122. // it has not existed before. If the bit is already selected for filtering, the
  123. // existing scheduler can be used.
  124. func (m *Matcher) addScheduler(idx uint) {
  125. if _, ok := m.schedulers[idx]; ok {
  126. return
  127. }
  128. m.schedulers[idx] = newScheduler(idx)
  129. }
  130. // Start starts the matching process and returns a stream of bloom matches in
  131. // a given range of blocks. If there are no more matches in the range, the result
  132. // channel is closed.
  133. func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession, error) {
  134. // Make sure we're not creating concurrent sessions
  135. if atomic.SwapUint32(&m.running, 1) == 1 {
  136. return nil, errors.New("matcher already running")
  137. }
  138. defer atomic.StoreUint32(&m.running, 0)
  139. // Initiate a new matching round
  140. session := &MatcherSession{
  141. matcher: m,
  142. quit: make(chan struct{}),
  143. kill: make(chan struct{}),
  144. }
  145. for _, scheduler := range m.schedulers {
  146. scheduler.reset()
  147. }
  148. sink := m.run(begin, end, cap(results), session)
  149. // Read the output from the result sink and deliver to the user
  150. session.pend.Add(1)
  151. go func() {
  152. defer session.pend.Done()
  153. defer close(results)
  154. for {
  155. select {
  156. case <-session.quit:
  157. return
  158. case res, ok := <-sink:
  159. // New match result found
  160. if !ok {
  161. return
  162. }
  163. // Calculate the first and last blocks of the section
  164. sectionStart := res.section * m.sectionSize
  165. first := sectionStart
  166. if begin > first {
  167. first = begin
  168. }
  169. last := sectionStart + m.sectionSize - 1
  170. if end < last {
  171. last = end
  172. }
  173. // Iterate over all the blocks in the section and return the matching ones
  174. for i := first; i <= last; i++ {
  175. // If the bitset is nil, we're a special match-all cornercase
  176. if res.bitset == nil {
  177. select {
  178. case <-session.quit:
  179. return
  180. case results <- i:
  181. }
  182. continue
  183. }
  184. // Skip the entire byte if no matches are found inside
  185. next := res.bitset[(i-sectionStart)/8]
  186. if next == 0 {
  187. i += 7
  188. continue
  189. }
  190. // Some bit it set, do the actual submatching
  191. if bit := 7 - i%8; next&(1<<bit) != 0 {
  192. select {
  193. case <-session.quit:
  194. return
  195. case results <- i:
  196. }
  197. }
  198. }
  199. }
  200. }
  201. }()
  202. return session, nil
  203. }
  204. // run creates a daisy-chain of sub-matchers, one for the address set and one
  205. // for each topic set, each sub-matcher receiving a section only if the previous
  206. // ones have all found a potential match in one of the blocks of the section,
  207. // then binary AND-ing its own matches and forwaring the result to the next one.
  208. //
  209. // The method starts feeding the section indexes into the first sub-matcher on a
  210. // new goroutine and returns a sink channel receiving the results.
  211. func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) chan *partialMatches {
  212. // Create the source channel and feed section indexes into
  213. source := make(chan *partialMatches, buffer)
  214. session.pend.Add(1)
  215. go func() {
  216. defer session.pend.Done()
  217. defer close(source)
  218. for i := begin / m.sectionSize; i <= end/m.sectionSize; i++ {
  219. select {
  220. case <-session.quit:
  221. return
  222. case source <- &partialMatches{i, nil}:
  223. }
  224. }
  225. }()
  226. // Assemble the daisy-chained filtering pipeline
  227. blooms := m.topics
  228. if len(m.addresses) > 0 {
  229. blooms = append([][]bloomIndexes{m.addresses}, blooms...)
  230. }
  231. next := source
  232. dist := make(chan *request, buffer)
  233. for _, bloom := range blooms {
  234. next = m.subMatch(next, dist, bloom, session)
  235. }
  236. // Start the request distribution
  237. session.pend.Add(1)
  238. go m.distributor(dist, session)
  239. return next
  240. }
  241. // subMatch creates a sub-matcher that filters for a set of addresses or topics, binary OR-s those matches, then
  242. // binary AND-s the result to the daisy-chain input (source) and forwards it to the daisy-chain output.
  243. // The matches of each address/topic are calculated by fetching the given sections of the three bloom bit indexes belonging to
  244. // that address/topic, and binary AND-ing those vectors together.
  245. func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloom []bloomIndexes, session *MatcherSession) chan *partialMatches {
  246. // Start the concurrent schedulers for each bit required by the bloom filter
  247. sectionSources := make([][3]chan uint64, len(bloom))
  248. sectionSinks := make([][3]chan []byte, len(bloom))
  249. for i, bits := range bloom {
  250. for j, bit := range bits {
  251. sectionSources[i][j] = make(chan uint64, cap(source))
  252. sectionSinks[i][j] = make(chan []byte, cap(source))
  253. m.schedulers[bit].run(sectionSources[i][j], dist, sectionSinks[i][j], session.quit, &session.pend)
  254. }
  255. }
  256. process := make(chan *partialMatches, cap(source)) // entries from source are forwarded here after fetches have been initiated
  257. results := make(chan *partialMatches, cap(source))
  258. session.pend.Add(2)
  259. go func() {
  260. // Tear down the goroutine and terminate all source channels
  261. defer session.pend.Done()
  262. defer close(process)
  263. defer func() {
  264. for _, bloomSources := range sectionSources {
  265. for _, bitSource := range bloomSources {
  266. close(bitSource)
  267. }
  268. }
  269. }()
  270. // Read sections from the source channel and multiplex into all bit-schedulers
  271. for {
  272. select {
  273. case <-session.quit:
  274. return
  275. case subres, ok := <-source:
  276. // New subresult from previous link
  277. if !ok {
  278. return
  279. }
  280. // Multiplex the section index to all bit-schedulers
  281. for _, bloomSources := range sectionSources {
  282. for _, bitSource := range bloomSources {
  283. select {
  284. case <-session.quit:
  285. return
  286. case bitSource <- subres.section:
  287. }
  288. }
  289. }
  290. // Notify the processor that this section will become available
  291. select {
  292. case <-session.quit:
  293. return
  294. case process <- subres:
  295. }
  296. }
  297. }
  298. }()
  299. go func() {
  300. // Tear down the goroutine and terminate the final sink channel
  301. defer session.pend.Done()
  302. defer close(results)
  303. // Read the source notifications and collect the delivered results
  304. for {
  305. select {
  306. case <-session.quit:
  307. return
  308. case subres, ok := <-process:
  309. // Notified of a section being retrieved
  310. if !ok {
  311. return
  312. }
  313. // Gather all the sub-results and merge them together
  314. var orVector []byte
  315. for _, bloomSinks := range sectionSinks {
  316. var andVector []byte
  317. for _, bitSink := range bloomSinks {
  318. var data []byte
  319. select {
  320. case <-session.quit:
  321. return
  322. case data = <-bitSink:
  323. }
  324. if andVector == nil {
  325. andVector = make([]byte, int(m.sectionSize/8))
  326. copy(andVector, data)
  327. } else {
  328. bitutil.ANDBytes(andVector, andVector, data)
  329. }
  330. }
  331. if orVector == nil {
  332. orVector = andVector
  333. } else {
  334. bitutil.ORBytes(orVector, orVector, andVector)
  335. }
  336. }
  337. if orVector == nil {
  338. orVector = make([]byte, int(m.sectionSize/8))
  339. }
  340. if subres.bitset != nil {
  341. bitutil.ANDBytes(orVector, orVector, subres.bitset)
  342. }
  343. if bitutil.TestBytes(orVector) {
  344. select {
  345. case <-session.quit:
  346. return
  347. case results <- &partialMatches{subres.section, orVector}:
  348. }
  349. }
  350. }
  351. }
  352. }()
  353. return results
  354. }
  355. // distributor receives requests from the schedulers and queues them into a set
  356. // of pending requests, which are assigned to retrievers wanting to fulfil them.
  357. func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
  358. defer session.pend.Done()
  359. var (
  360. requests = make(map[uint][]uint64) // Per-bit list of section requests, ordered by section number
  361. unallocs = make(map[uint]struct{}) // Bits with pending requests but not allocated to any retriever
  362. retrievers chan chan uint // Waiting retrievers (toggled to nil if unallocs is empty)
  363. )
  364. var (
  365. allocs int // Number of active allocations to handle graceful shutdown requests
  366. shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests
  367. )
  368. // assign is a helper method fo try to assign a pending bit an an actively
  369. // listening servicer, or schedule it up for later when one arrives.
  370. assign := func(bit uint) {
  371. select {
  372. case fetcher := <-m.retrievers:
  373. allocs++
  374. fetcher <- bit
  375. default:
  376. // No retrievers active, start listening for new ones
  377. retrievers = m.retrievers
  378. unallocs[bit] = struct{}{}
  379. }
  380. }
  381. for {
  382. select {
  383. case <-shutdown:
  384. // Graceful shutdown requested, wait until all pending requests are honoured
  385. if allocs == 0 {
  386. return
  387. }
  388. shutdown = nil
  389. case <-session.kill:
  390. // Pending requests not honoured in time, hard terminate
  391. return
  392. case req := <-dist:
  393. // New retrieval request arrived to be distributed to some fetcher process
  394. queue := requests[req.bit]
  395. index := sort.Search(len(queue), func(i int) bool { return queue[i] >= req.section })
  396. requests[req.bit] = append(queue[:index], append([]uint64{req.section}, queue[index:]...)...)
  397. // If it's a new bit and we have waiting fetchers, allocate to them
  398. if len(queue) == 0 {
  399. assign(req.bit)
  400. }
  401. case fetcher := <-retrievers:
  402. // New retriever arrived, find the lowest section-ed bit to assign
  403. bit, best := uint(0), uint64(math.MaxUint64)
  404. for idx := range unallocs {
  405. if requests[idx][0] < best {
  406. bit, best = idx, requests[idx][0]
  407. }
  408. }
  409. // Stop tracking this bit (and alloc notifications if no more work is available)
  410. delete(unallocs, bit)
  411. if len(unallocs) == 0 {
  412. retrievers = nil
  413. }
  414. allocs++
  415. fetcher <- bit
  416. case fetcher := <-m.counters:
  417. // New task count request arrives, return number of items
  418. fetcher <- uint(len(requests[<-fetcher]))
  419. case fetcher := <-m.retrievals:
  420. // New fetcher waiting for tasks to retrieve, assign
  421. task := <-fetcher
  422. if want := len(task.Sections); want >= len(requests[task.Bit]) {
  423. task.Sections = requests[task.Bit]
  424. delete(requests, task.Bit)
  425. } else {
  426. task.Sections = append(task.Sections[:0], requests[task.Bit][:want]...)
  427. requests[task.Bit] = append(requests[task.Bit][:0], requests[task.Bit][want:]...)
  428. }
  429. fetcher <- task
  430. // If anything was left unallocated, try to assign to someone else
  431. if len(requests[task.Bit]) > 0 {
  432. assign(task.Bit)
  433. }
  434. case result := <-m.deliveries:
  435. // New retrieval task response from fetcher, split out missing sections and
  436. // deliver complete ones
  437. var (
  438. sections = make([]uint64, 0, len(result.Sections))
  439. bitsets = make([][]byte, 0, len(result.Bitsets))
  440. missing = make([]uint64, 0, len(result.Sections))
  441. )
  442. for i, bitset := range result.Bitsets {
  443. if len(bitset) == 0 {
  444. missing = append(missing, result.Sections[i])
  445. continue
  446. }
  447. sections = append(sections, result.Sections[i])
  448. bitsets = append(bitsets, bitset)
  449. }
  450. m.schedulers[result.Bit].deliver(sections, bitsets)
  451. allocs--
  452. // Reschedule missing sections and allocate bit if newly available
  453. if len(missing) > 0 {
  454. queue := requests[result.Bit]
  455. for _, section := range missing {
  456. index := sort.Search(len(queue), func(i int) bool { return queue[i] >= section })
  457. queue = append(queue[:index], append([]uint64{section}, queue[index:]...)...)
  458. }
  459. requests[result.Bit] = queue
  460. if len(queue) == len(missing) {
  461. assign(result.Bit)
  462. }
  463. }
  464. // If we're in the process of shutting down, terminate
  465. if allocs == 0 && shutdown == nil {
  466. return
  467. }
  468. }
  469. }
  470. }
  471. // MatcherSession is returned by a started matcher to be used as a terminator
  472. // for the actively running matching operation.
  473. type MatcherSession struct {
  474. matcher *Matcher
  475. quit chan struct{} // Quit channel to request pipeline termination
  476. kill chan struct{} // Term channel to signal non-graceful forced shutdown
  477. pend sync.WaitGroup
  478. }
  479. // Close stops the matching process and waits for all subprocesses to terminate
  480. // before returning. The timeout may be used for graceful shutdown, allowing the
  481. // currently running retrievals to complete before this time.
  482. func (s *MatcherSession) Close(timeout time.Duration) {
  483. // Bail out if the matcher is not running
  484. select {
  485. case <-s.quit:
  486. return
  487. default:
  488. }
  489. // Signal termination and wait for all goroutines to tear down
  490. close(s.quit)
  491. time.AfterFunc(timeout, func() { close(s.kill) })
  492. s.pend.Wait()
  493. }
  494. // AllocateRetrieval assigns a bloom bit index to a client process that can either
  495. // immediately reuest and fetch the section contents assigned to this bit or wait
  496. // a little while for more sections to be requested.
  497. func (s *MatcherSession) AllocateRetrieval() (uint, bool) {
  498. fetcher := make(chan uint)
  499. select {
  500. case <-s.quit:
  501. return 0, false
  502. case s.matcher.retrievers <- fetcher:
  503. bit, ok := <-fetcher
  504. return bit, ok
  505. }
  506. }
  507. // PendingSections returns the number of pending section retrievals belonging to
  508. // the given bloom bit index.
  509. func (s *MatcherSession) PendingSections(bit uint) int {
  510. fetcher := make(chan uint)
  511. select {
  512. case <-s.quit:
  513. return 0
  514. case s.matcher.counters <- fetcher:
  515. fetcher <- bit
  516. return int(<-fetcher)
  517. }
  518. }
  519. // AllocateSections assigns all or part of an already allocated bit-task queue
  520. // to the requesting process.
  521. func (s *MatcherSession) AllocateSections(bit uint, count int) []uint64 {
  522. fetcher := make(chan *Retrieval)
  523. select {
  524. case <-s.quit:
  525. return nil
  526. case s.matcher.retrievals <- fetcher:
  527. task := &Retrieval{
  528. Bit: bit,
  529. Sections: make([]uint64, count),
  530. }
  531. fetcher <- task
  532. return (<-fetcher).Sections
  533. }
  534. }
  535. // DeliverSections delivers a batch of section bit-vectors for a specific bloom
  536. // bit index to be injected into the processing pipeline.
  537. func (s *MatcherSession) DeliverSections(bit uint, sections []uint64, bitsets [][]byte) {
  538. select {
  539. case <-s.kill:
  540. return
  541. case s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}:
  542. }
  543. }
  544. // Multiplex polls the matcher session for rerieval tasks and multiplexes it into
  545. // the reuested retrieval queue to be serviced together with other sessions.
  546. //
  547. // This method will block for the lifetime of the session. Even after termination
  548. // of the session, any request in-flight need to be responded to! Empty responses
  549. // are fine though in that case.
  550. func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
  551. for {
  552. // Allocate a new bloom bit index to retrieve data for, stopping when done
  553. bit, ok := s.AllocateRetrieval()
  554. if !ok {
  555. return
  556. }
  557. // Bit allocated, throttle a bit if we're below our batch limit
  558. if s.PendingSections(bit) < batch {
  559. select {
  560. case <-s.quit:
  561. // Session terminating, we can't meaningfully service, abort
  562. s.AllocateSections(bit, 0)
  563. s.DeliverSections(bit, []uint64{}, [][]byte{})
  564. return
  565. case <-time.After(wait):
  566. // Throttling up, fetch whatever's available
  567. }
  568. }
  569. // Allocate as much as we can handle and request servicing
  570. sections := s.AllocateSections(bit, batch)
  571. request := make(chan *Retrieval)
  572. select {
  573. case <-s.quit:
  574. // Session terminating, we can't meaningfully service, abort
  575. s.DeliverSections(bit, sections, make([][]byte, len(sections)))
  576. return
  577. case mux <- request:
  578. // Retrieval accepted, something must arrive before we're aborting
  579. request <- &Retrieval{Bit: bit, Sections: sections}
  580. result := <-request
  581. s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
  582. }
  583. }
  584. }