filter.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. // Copyright 2014 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package filters
  17. import (
  18. "context"
  19. "errors"
  20. "math/big"
  21. "github.com/ethereum/go-ethereum/common"
  22. "github.com/ethereum/go-ethereum/core/bloombits"
  23. "github.com/ethereum/go-ethereum/core/types"
  24. "github.com/ethereum/go-ethereum/rpc"
  25. )
  26. // Filter can be used to retrieve and filter logs.
  27. type Filter struct {
  28. sys *FilterSystem
  29. addresses []common.Address
  30. topics [][]common.Hash
  31. block common.Hash // Block hash if filtering a single block
  32. begin, end int64 // Range interval if filtering multiple blocks
  33. matcher *bloombits.Matcher
  34. }
  35. // NewRangeFilter creates a new filter which uses a bloom filter on blocks to
  36. // figure out whether a particular block is interesting or not.
  37. func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
  38. // Flatten the address and topic filter clauses into a single bloombits filter
  39. // system. Since the bloombits are not positional, nil topics are permitted,
  40. // which get flattened into a nil byte slice.
  41. var filters [][][]byte
  42. if len(addresses) > 0 {
  43. filter := make([][]byte, len(addresses))
  44. for i, address := range addresses {
  45. filter[i] = address.Bytes()
  46. }
  47. filters = append(filters, filter)
  48. }
  49. for _, topicList := range topics {
  50. filter := make([][]byte, len(topicList))
  51. for i, topic := range topicList {
  52. filter[i] = topic.Bytes()
  53. }
  54. filters = append(filters, filter)
  55. }
  56. size, _ := sys.backend.BloomStatus()
  57. // Create a generic filter and convert it into a range filter
  58. filter := newFilter(sys, addresses, topics)
  59. filter.matcher = bloombits.NewMatcher(size, filters)
  60. filter.begin = begin
  61. filter.end = end
  62. return filter
  63. }
  64. // NewBlockFilter creates a new filter which directly inspects the contents of
  65. // a block to figure out whether it is interesting or not.
  66. func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
  67. // Create a generic filter and convert it into a block filter
  68. filter := newFilter(sys, addresses, topics)
  69. filter.block = block
  70. return filter
  71. }
  72. // newFilter creates a generic filter that can either filter based on a block hash,
  73. // or based on range queries. The search criteria needs to be explicitly set.
  74. func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common.Hash) *Filter {
  75. return &Filter{
  76. sys: sys,
  77. addresses: addresses,
  78. topics: topics,
  79. }
  80. }
  81. // Logs searches the blockchain for matching log entries, returning all from the
  82. // first block that contains matches, updating the start of the filter accordingly.
  83. func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
  84. // If we're doing singleton block filtering, execute and return
  85. if f.block != (common.Hash{}) {
  86. header, err := f.sys.backend.HeaderByHash(ctx, f.block)
  87. if err != nil {
  88. return nil, err
  89. }
  90. if header == nil {
  91. return nil, errors.New("unknown block")
  92. }
  93. return f.blockLogs(ctx, header, false)
  94. }
  95. // Short-cut if all we care about is pending logs
  96. if f.begin == rpc.PendingBlockNumber.Int64() {
  97. if f.end != rpc.PendingBlockNumber.Int64() {
  98. return nil, errors.New("invalid block range")
  99. }
  100. return f.pendingLogs()
  101. }
  102. // Figure out the limits of the filter range
  103. header, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
  104. if header == nil {
  105. return nil, nil
  106. }
  107. var (
  108. head = header.Number.Uint64()
  109. end = uint64(f.end)
  110. pending = f.end == rpc.PendingBlockNumber.Int64()
  111. )
  112. if f.begin == rpc.LatestBlockNumber.Int64() {
  113. f.begin = int64(head)
  114. }
  115. if f.end == rpc.LatestBlockNumber.Int64() || f.end == rpc.PendingBlockNumber.Int64() {
  116. end = head
  117. }
  118. // Gather all indexed logs, and finish with non indexed ones
  119. var (
  120. logs []*types.Log
  121. err error
  122. size, sections = f.sys.backend.BloomStatus()
  123. )
  124. if indexed := sections * size; indexed > uint64(f.begin) {
  125. if indexed > end {
  126. logs, err = f.indexedLogs(ctx, end)
  127. } else {
  128. logs, err = f.indexedLogs(ctx, indexed-1)
  129. }
  130. if err != nil {
  131. return logs, err
  132. }
  133. }
  134. rest, err := f.unindexedLogs(ctx, end)
  135. logs = append(logs, rest...)
  136. if pending {
  137. pendingLogs, err := f.pendingLogs()
  138. if err != nil {
  139. return nil, err
  140. }
  141. logs = append(logs, pendingLogs...)
  142. }
  143. return logs, err
  144. }
  145. // indexedLogs returns the logs matching the filter criteria based on the bloom
  146. // bits indexed available locally or via the network.
  147. func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
  148. // Create a matcher session and request servicing from the backend
  149. matches := make(chan uint64, 64)
  150. session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
  151. if err != nil {
  152. return nil, err
  153. }
  154. defer session.Close()
  155. f.sys.backend.ServiceFilter(ctx, session)
  156. // Iterate over the matches until exhausted or context closed
  157. var logs []*types.Log
  158. for {
  159. select {
  160. case number, ok := <-matches:
  161. // Abort if all matches have been fulfilled
  162. if !ok {
  163. err := session.Error()
  164. if err == nil {
  165. f.begin = int64(end) + 1
  166. }
  167. return logs, err
  168. }
  169. f.begin = int64(number) + 1
  170. // Retrieve the suggested block and pull any truly matching logs
  171. header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
  172. if header == nil || err != nil {
  173. return logs, err
  174. }
  175. found, err := f.blockLogs(ctx, header, true)
  176. if err != nil {
  177. return logs, err
  178. }
  179. logs = append(logs, found...)
  180. case <-ctx.Done():
  181. return logs, ctx.Err()
  182. }
  183. }
  184. }
  185. // unindexedLogs returns the logs matching the filter criteria based on raw block
  186. // iteration and bloom matching.
  187. func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
  188. var logs []*types.Log
  189. for ; f.begin <= int64(end); f.begin++ {
  190. header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
  191. if header == nil || err != nil {
  192. return logs, err
  193. }
  194. found, err := f.blockLogs(ctx, header, false)
  195. if err != nil {
  196. return logs, err
  197. }
  198. logs = append(logs, found...)
  199. }
  200. return logs, nil
  201. }
  202. // blockLogs returns the logs matching the filter criteria within a single block.
  203. func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom bool) ([]*types.Log, error) {
  204. // Fast track: no filtering criteria
  205. if len(f.addresses) == 0 && len(f.topics) == 0 {
  206. list, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
  207. if err != nil {
  208. return nil, err
  209. }
  210. return flatten(list), nil
  211. } else if skipBloom || bloomFilter(header.Bloom, f.addresses, f.topics) {
  212. return f.checkMatches(ctx, header)
  213. }
  214. return nil, nil
  215. }
  216. // checkMatches checks if the receipts belonging to the given header contain any log events that
  217. // match the filter criteria. This function is called when the bloom filter signals a potential match.
  218. func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*types.Log, error) {
  219. logsList, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
  220. if err != nil {
  221. return nil, err
  222. }
  223. unfiltered := flatten(logsList)
  224. logs := filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
  225. if len(logs) > 0 {
  226. // We have matching logs, check if we need to resolve full logs via the light client
  227. if logs[0].TxHash == (common.Hash{}) {
  228. receipts, err := f.sys.backend.GetReceipts(ctx, header.Hash())
  229. if err != nil {
  230. return nil, err
  231. }
  232. unfiltered = unfiltered[:0]
  233. for _, receipt := range receipts {
  234. unfiltered = append(unfiltered, receipt.Logs...)
  235. }
  236. logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
  237. }
  238. return logs, nil
  239. }
  240. return nil, nil
  241. }
  242. // pendingLogs returns the logs matching the filter criteria within the pending block.
  243. func (f *Filter) pendingLogs() ([]*types.Log, error) {
  244. block, receipts := f.sys.backend.PendingBlockAndReceipts()
  245. if bloomFilter(block.Bloom(), f.addresses, f.topics) {
  246. var unfiltered []*types.Log
  247. for _, r := range receipts {
  248. unfiltered = append(unfiltered, r.Logs...)
  249. }
  250. return filterLogs(unfiltered, nil, nil, f.addresses, f.topics), nil
  251. }
  252. return nil, nil
  253. }
  254. func includes(addresses []common.Address, a common.Address) bool {
  255. for _, addr := range addresses {
  256. if addr == a {
  257. return true
  258. }
  259. }
  260. return false
  261. }
  262. // filterLogs creates a slice of logs matching the given criteria.
  263. func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log {
  264. var ret []*types.Log
  265. Logs:
  266. for _, log := range logs {
  267. if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber {
  268. continue
  269. }
  270. if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber {
  271. continue
  272. }
  273. if len(addresses) > 0 && !includes(addresses, log.Address) {
  274. continue
  275. }
  276. // If the to filtered topics is greater than the amount of topics in logs, skip.
  277. if len(topics) > len(log.Topics) {
  278. continue
  279. }
  280. for i, sub := range topics {
  281. match := len(sub) == 0 // empty rule set == wildcard
  282. for _, topic := range sub {
  283. if log.Topics[i] == topic {
  284. match = true
  285. break
  286. }
  287. }
  288. if !match {
  289. continue Logs
  290. }
  291. }
  292. ret = append(ret, log)
  293. }
  294. return ret
  295. }
  296. func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
  297. if len(addresses) > 0 {
  298. var included bool
  299. for _, addr := range addresses {
  300. if types.BloomLookup(bloom, addr) {
  301. included = true
  302. break
  303. }
  304. }
  305. if !included {
  306. return false
  307. }
  308. }
  309. for _, sub := range topics {
  310. included := len(sub) == 0 // empty rule set == wildcard
  311. for _, topic := range sub {
  312. if types.BloomLookup(bloom, topic) {
  313. included = true
  314. break
  315. }
  316. }
  317. if !included {
  318. return false
  319. }
  320. }
  321. return true
  322. }
  323. func flatten(list [][]*types.Log) []*types.Log {
  324. var flat []*types.Log
  325. for _, logs := range list {
  326. flat = append(flat, logs...)
  327. }
  328. return flat
  329. }