浏览代码

core/bloombits, eth/filters: handle null topics (#15195)

When implementing the new bloombits based filter, I've accidentally broke null
topics by removing the special casing of common.Hash{} filter rules, which
acted as the wildcard topic until now.

This PR fixes the regression, but instead of using the magic hash
common.Hash{} as the null wildcard, the PR reworks the code to handle nil
topics during parsing, converting a JSON null into nil []common.Hash topic.
Péter Szilágyi 8 年之前
父节点
当前提交
2ab2a9f131

+ 14 - 2
core/bloombits/matcher.go

@@ -80,7 +80,8 @@ type Matcher struct {
 }
 
 // NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
-// address and topic filtering on them.
+// address and topic filtering on them. Setting a filter component to `nil` is
+// allowed and will result in that filter rule being skipped (OR 0x11...1).
 func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher {
 	// Create the matcher instance
 	m := &Matcher{
@@ -95,11 +96,22 @@ func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher {
 	m.filters = nil
 
 	for _, filter := range filters {
+		// Gather the bit indexes of the filter rule, special casing the nil filter
+		if len(filter) == 0 {
+			continue
+		}
 		bloomBits := make([]bloomIndexes, len(filter))
 		for i, clause := range filter {
+			if clause == nil {
+				bloomBits = nil
+				break
+			}
 			bloomBits[i] = calcBloomIndexes(clause)
 		}
-		m.filters = append(m.filters, bloomBits)
+		// Accumulate the filter rules if no nil rule was within
+		if bloomBits != nil {
+			m.filters = append(m.filters, bloomBits)
+		}
 	}
 	// For every bit, create a scheduler to load/download the bit vectors
 	for _, bloomIndexLists := range m.filters {

+ 28 - 0
core/bloombits/matcher_test.go

@@ -21,10 +21,38 @@ import (
 	"sync/atomic"
 	"testing"
 	"time"
+
+	"github.com/ethereum/go-ethereum/common"
 )
 
 const testSectionSize = 4096
 
+// Tests that wildcard filter rules (nil) can be specified and are handled well.
+func TestMatcherWildcards(t *testing.T) {
+	matcher := NewMatcher(testSectionSize, [][][]byte{
+		[][]byte{common.Address{}.Bytes(), common.Address{0x01}.Bytes()}, // Default address is not a wildcard
+		[][]byte{common.Hash{}.Bytes(), common.Hash{0x01}.Bytes()},       // Default hash is not a wildcard
+		[][]byte{common.Hash{0x01}.Bytes()},                              // Plain rule, sanity check
+		[][]byte{common.Hash{0x01}.Bytes(), nil},                         // Wildcard suffix, drop rule
+		[][]byte{nil, common.Hash{0x01}.Bytes()},                         // Wildcard prefix, drop rule
+		[][]byte{nil, nil},                                               // Wildcard combo, drop rule
+		[][]byte{},                                                       // Inited wildcard rule, drop rule
+		nil,                                                              // Proper wildcard rule, drop rule
+	})
+	if len(matcher.filters) != 3 {
+		t.Fatalf("filter system size mismatch: have %d, want %d", len(matcher.filters), 3)
+	}
+	if len(matcher.filters[0]) != 2 {
+		t.Fatalf("address clause size mismatch: have %d, want %d", len(matcher.filters[0]), 2)
+	}
+	if len(matcher.filters[1]) != 2 {
+		t.Fatalf("combo topic clause size mismatch: have %d, want %d", len(matcher.filters[1]), 2)
+	}
+	if len(matcher.filters[2]) != 1 {
+		t.Fatalf("singletone topic clause size mismatch: have %d, want %d", len(matcher.filters[2]), 1)
+	}
+}
+
 // Tests the matcher pipeline on a single continuous workflow without interrupts.
 func TestMatcherContinuous(t *testing.T) {
 	testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 100000, false, 75)

+ 6 - 3
eth/filters/api.go

@@ -498,7 +498,6 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
 			switch topic := t.(type) {
 			case nil:
 				// ignore topic when matching logs
-				args.Topics[i] = []common.Hash{{}}
 
 			case string:
 				// match specific topic
@@ -507,12 +506,16 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
 					return err
 				}
 				args.Topics[i] = []common.Hash{top}
+
 			case []interface{}:
 				// or case e.g. [null, "topic0", "topic1"]
 				for _, rawTopic := range topic {
 					if rawTopic == nil {
-						args.Topics[i] = append(args.Topics[i], common.Hash{})
-					} else if topic, ok := rawTopic.(string); ok {
+						// null component, match all
+						args.Topics[i] = nil
+						break
+					}
+					if topic, ok := rawTopic.(string); ok {
 						parsed, err := decodeTopic(topic)
 						if err != nil {
 							return err

+ 6 - 18
eth/filters/api_test.go

@@ -34,7 +34,6 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
 		topic0                    = common.HexToHash("3ac225168df54212a25c1c01fd35bebfea408fdac2e31ddd6f80a4bbf9a5f1ca")
 		topic1                    = common.HexToHash("9084a792d2f8b16a62b882fd56f7860c07bf5fa91dd8a2ae7e809e5180fef0b3")
 		topic2                    = common.HexToHash("6ccae1c4af4152f460ff510e573399795dfab5dcf1fa60d1f33ac8fdc1e480ce")
-		nullTopic                 = common.Hash{}
 	)
 
 	// default values
@@ -150,11 +149,8 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
 	if test6.Topics[0][0] != topic0 {
 		t.Fatalf("got %x, expected %x", test6.Topics[0][0], topic0)
 	}
-	if len(test6.Topics[1]) != 1 {
-		t.Fatalf("expected 1 topic, got %d", len(test6.Topics[1]))
-	}
-	if test6.Topics[1][0] != nullTopic {
-		t.Fatalf("got %x, expected empty hash", test6.Topics[1][0])
+	if len(test6.Topics[1]) != 0 {
+		t.Fatalf("expected 0 topic, got %d", len(test6.Topics[1]))
 	}
 	if len(test6.Topics[2]) != 1 {
 		t.Fatalf("expected 1 topic, got %d", len(test6.Topics[2]))
@@ -180,18 +176,10 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
 			topic0, topic1, test7.Topics[0][0], test7.Topics[0][1],
 		)
 	}
-	if len(test7.Topics[1]) != 1 {
-		t.Fatalf("expected 1 topic, got %d topics", len(test7.Topics[1]))
-	}
-	if test7.Topics[1][0] != nullTopic {
-		t.Fatalf("expected empty hash, got %x", test7.Topics[1][0])
-	}
-	if len(test7.Topics[2]) != 2 {
-		t.Fatalf("expected 2 topics, got %d topics", len(test7.Topics[2]))
+	if len(test7.Topics[1]) != 0 {
+		t.Fatalf("expected 0 topic, got %d topics", len(test7.Topics[1]))
 	}
-	if test7.Topics[2][0] != topic2 || test7.Topics[2][1] != nullTopic {
-		t.Fatalf("invalid topics expected [%x,%x], got [%x,%x]",
-			topic2, nullTopic, test7.Topics[2][0], test7.Topics[2][1],
-		)
+	if len(test7.Topics[2]) != 0 {
+		t.Fatalf("expected 0 topics, got %d topics", len(test7.Topics[2]))
 	}
 }

+ 7 - 15
eth/filters/filter.go

@@ -60,7 +60,9 @@ type Filter struct {
 // New creates a new filter which uses a bloom filter on blocks to figure out whether
 // a particular block is interesting or not.
 func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
-	// Flatten the address and topic filter clauses into a single filter system
+	// Flatten the address and topic filter clauses into a single bloombits filter
+	// system. Since the bloombits are not positional, nil topics are permitted,
+	// which get flattened into a nil byte slice.
 	var filters [][][]byte
 	if len(addresses) > 0 {
 		filter := make([][]byte, len(addresses))
@@ -235,32 +237,24 @@ Logs:
 		if len(addresses) > 0 && !includes(addresses, log.Address) {
 			continue
 		}
-
-		logTopics := make([]common.Hash, len(topics))
-		copy(logTopics, log.Topics)
-
 		// If the to filtered topics is greater than the amount of topics in logs, skip.
 		if len(topics) > len(log.Topics) {
 			continue Logs
 		}
-
 		for i, topics := range topics {
-			var match bool
+			match := len(topics) == 0 // empty rule set == wildcard
 			for _, topic := range topics {
-				// common.Hash{} is a match all (wildcard)
-				if (topic == common.Hash{}) || log.Topics[i] == topic {
+				if log.Topics[i] == topic {
 					match = true
 					break
 				}
 			}
-
 			if !match {
 				continue Logs
 			}
 		}
 		ret = append(ret, log)
 	}
-
 	return ret
 }
 
@@ -273,16 +267,15 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo
 				break
 			}
 		}
-
 		if !included {
 			return false
 		}
 	}
 
 	for _, sub := range topics {
-		var included bool
+		included := len(sub) == 0 // empty rule set == wildcard
 		for _, topic := range sub {
-			if (topic == common.Hash{}) || types.BloomLookup(bloom, topic) {
+			if types.BloomLookup(bloom, topic) {
 				included = true
 				break
 			}
@@ -291,6 +284,5 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo
 			return false
 		}
 	}
-
 	return true
 }

+ 0 - 5
eth/filters/filter_system.go

@@ -212,7 +212,6 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan
 		installed: make(chan struct{}),
 		err:       make(chan error),
 	}
-
 	return es.subscribe(sub)
 }
 
@@ -230,7 +229,6 @@ func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []*types.Log
 		installed: make(chan struct{}),
 		err:       make(chan error),
 	}
-
 	return es.subscribe(sub)
 }
 
@@ -248,7 +246,6 @@ func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []*ty
 		installed: make(chan struct{}),
 		err:       make(chan error),
 	}
-
 	return es.subscribe(sub)
 }
 
@@ -265,7 +262,6 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
 		installed: make(chan struct{}),
 		err:       make(chan error),
 	}
-
 	return es.subscribe(sub)
 }
 
@@ -282,7 +278,6 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr
 		installed: make(chan struct{}),
 		err:       make(chan error),
 	}
-
 	return es.subscribe(sub)
 }
 

+ 7 - 5
eth/filters/filter_system_test.go

@@ -363,7 +363,7 @@ func TestLogFilter(t *testing.T) {
 			// match all
 			0: {FilterCriteria{}, allLogs, ""},
 			// match none due to no matching addresses
-			1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, []*types.Log{}, ""},
+			1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, ""},
 			// match logs based on addresses, ignore topics
 			2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""},
 			// match none due to no matching topics (match with address)
@@ -384,6 +384,8 @@ func TestLogFilter(t *testing.T) {
 			10: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""},
 			// all "mined" and pending logs with topic firstTopic
 			11: {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), Topics: [][]common.Hash{{firstTopic}}}, expectedCase11, ""},
+			// match all logs due to wildcard topic
+			12: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""},
 		}
 	)
 
@@ -459,7 +461,7 @@ func TestPendingLogsSubscription(t *testing.T) {
 		firstTopic     = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
 		secondTopic    = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
 		thirdTopic     = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333")
-		forthTopic     = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444")
+		fourthTopic    = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444")
 		notUsedTopic   = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
 
 		allLogs = []core.PendingLogsEvent{
@@ -471,7 +473,7 @@ func TestPendingLogsSubscription(t *testing.T) {
 			{Logs: []*types.Log{
 				{Address: thirdAddress, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
 				{Address: thirdAddress, Topics: []common.Hash{thirdTopic}, BlockNumber: 5},
-				{Address: thirdAddress, Topics: []common.Hash{forthTopic}, BlockNumber: 5},
+				{Address: thirdAddress, Topics: []common.Hash{fourthTopic}, BlockNumber: 5},
 				{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
 			}},
 		}
@@ -493,7 +495,7 @@ func TestPendingLogsSubscription(t *testing.T) {
 			// match all
 			{FilterCriteria{}, convertLogs(allLogs), nil, nil},
 			// match none due to no matching addresses
-			{FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{{}}}, []*types.Log{}, nil, nil},
+			{FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, nil, nil},
 			// match logs based on addresses, ignore topics
 			{FilterCriteria{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
 			// match none due to no matching topics (match with address)
@@ -505,7 +507,7 @@ func TestPendingLogsSubscription(t *testing.T) {
 			// block numbers are ignored for filters created with New***Filter, these return all logs that match the given criteria when the state changes
 			{FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
 			// multiple pending logs, should match only 2 topics from the logs in block 5
-			{FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, forthTopic}}}, []*types.Log{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil},
+			{FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}}, []*types.Log{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil},
 		}
 	)