Bläddra i källkod

Merge pull request #16210 from gluk256/288-filter-optimization

whisper: message filtering optimization

Only run the message through filters who registered their interest.
Guillaume Ballet 7 år sedan
förälder
incheckning
9b4e182ce5

+ 63 - 33
whisper/whisperv6/filter.go

@@ -35,6 +35,7 @@ type Filter struct {
 	PoW        float64           // Proof of work as described in the Whisper spec
 	AllowP2P   bool              // Indicates whether this filter is interested in direct peer-to-peer messages
 	SymKeyHash common.Hash       // The Keccak256Hash of the symmetric key, needed for optimization
+	id         string            // unique identifier
 
 	Messages map[common.Hash]*ReceivedMessage
 	mutex    sync.RWMutex
@@ -43,15 +44,21 @@ type Filter struct {
 // Filters represents a collection of filters
 type Filters struct {
 	watchers map[string]*Filter
-	whisper  *Whisper
-	mutex    sync.RWMutex
+
+	topicMatcher     map[TopicType]map[*Filter]struct{} // map a topic to the filters that are interested in being notified when a message matches that topic
+	allTopicsMatcher map[*Filter]struct{}               // list all the filters that will be notified of a new message, no matter what its topic is
+
+	whisper *Whisper
+	mutex   sync.RWMutex
 }
 
 // NewFilters returns a newly created filter collection
 func NewFilters(w *Whisper) *Filters {
 	return &Filters{
-		watchers: make(map[string]*Filter),
-		whisper:  w,
+		watchers:         make(map[string]*Filter),
+		topicMatcher:     make(map[TopicType]map[*Filter]struct{}),
+		allTopicsMatcher: make(map[*Filter]struct{}),
+		whisper:          w,
 	}
 }
 
@@ -81,7 +88,9 @@ func (fs *Filters) Install(watcher *Filter) (string, error) {
 		watcher.SymKeyHash = crypto.Keccak256Hash(watcher.KeySym)
 	}
 
+	watcher.id = id
 	fs.watchers[id] = watcher
+	fs.addTopicMatcher(watcher)
 	return id, err
 }
 
@@ -91,12 +100,51 @@ func (fs *Filters) Uninstall(id string) bool {
 	fs.mutex.Lock()
 	defer fs.mutex.Unlock()
 	if fs.watchers[id] != nil {
+		fs.removeFromTopicMatchers(fs.watchers[id])
 		delete(fs.watchers, id)
 		return true
 	}
 	return false
 }
 
+// addTopicMatcher adds a filter to the topic matchers.
+// If the filter's Topics array is empty, it will be tried on every topic.
+// Otherwise, it will be tried on the topics specified.
+func (fs *Filters) addTopicMatcher(watcher *Filter) {
+	if len(watcher.Topics) == 0 {
+		fs.allTopicsMatcher[watcher] = struct{}{}
+	} else {
+		for _, t := range watcher.Topics {
+			topic := BytesToTopic(t)
+			if fs.topicMatcher[topic] == nil {
+				fs.topicMatcher[topic] = make(map[*Filter]struct{})
+			}
+			fs.topicMatcher[topic][watcher] = struct{}{}
+		}
+	}
+}
+
+// removeFromTopicMatchers removes a filter from the topic matchers
+func (fs *Filters) removeFromTopicMatchers(watcher *Filter) {
+	delete(fs.allTopicsMatcher, watcher)
+	for _, topic := range watcher.Topics {
+		delete(fs.topicMatcher[BytesToTopic(topic)], watcher)
+	}
+}
+
+// getWatchersByTopic returns a slice containing the filters that
+// match a specific topic
+func (fs *Filters) getWatchersByTopic(topic TopicType) []*Filter {
+	res := make([]*Filter, 0, len(fs.allTopicsMatcher))
+	for watcher := range fs.allTopicsMatcher {
+		res = append(res, watcher)
+	}
+	for watcher := range fs.topicMatcher[topic] {
+		res = append(res, watcher)
+	}
+	return res
+}
+
 // Get returns a filter from the collection with a specific ID
 func (fs *Filters) Get(id string) *Filter {
 	fs.mutex.RLock()
@@ -112,11 +160,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
 	fs.mutex.RLock()
 	defer fs.mutex.RUnlock()
 
-	i := -1 // only used for logging info
-	for _, watcher := range fs.watchers {
-		i++
+	candidates := fs.getWatchersByTopic(env.Topic)
+	for _, watcher := range candidates {
 		if p2pMessage && !watcher.AllowP2P {
-			log.Trace(fmt.Sprintf("msg [%x], filter [%d]: p2p messages are not allowed", env.Hash(), i))
+			log.Trace(fmt.Sprintf("msg [%x], filter [%s]: p2p messages are not allowed", env.Hash(), watcher.id))
 			continue
 		}
 
@@ -128,10 +175,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
 			if match {
 				msg = env.Open(watcher)
 				if msg == nil {
-					log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", i)
+					log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", watcher.id)
 				}
 			} else {
-				log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", i)
+				log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", watcher.id)
 			}
 		}
 
@@ -194,16 +241,17 @@ func (f *Filter) Retrieve() (all []*ReceivedMessage) {
 
 // MatchMessage checks if the filter matches an already decrypted
 // message (i.e. a Message that has already been handled by
-// MatchEnvelope when checked by a previous filter)
+// MatchEnvelope when checked by a previous filter).
+// Topics are not checked here, since this is done by topic matchers.
 func (f *Filter) MatchMessage(msg *ReceivedMessage) bool {
 	if f.PoW > 0 && msg.PoW < f.PoW {
 		return false
 	}
 
 	if f.expectsAsymmetricEncryption() && msg.isAsymmetricEncryption() {
-		return IsPubKeyEqual(&f.KeyAsym.PublicKey, msg.Dst) && f.MatchTopic(msg.Topic)
+		return IsPubKeyEqual(&f.KeyAsym.PublicKey, msg.Dst)
 	} else if f.expectsSymmetricEncryption() && msg.isSymmetricEncryption() {
-		return f.SymKeyHash == msg.SymKeyHash && f.MatchTopic(msg.Topic)
+		return f.SymKeyHash == msg.SymKeyHash
 	}
 	return false
 }
@@ -211,27 +259,9 @@ func (f *Filter) MatchMessage(msg *ReceivedMessage) bool {
 // MatchEnvelope checks if it's worth decrypting the message. If
 // it returns `true`, client code is expected to attempt decrypting
 // the message and subsequently call MatchMessage.
+// Topics are not checked here, since this is done by topic matchers.
 func (f *Filter) MatchEnvelope(envelope *Envelope) bool {
-	if f.PoW > 0 && envelope.pow < f.PoW {
-		return false
-	}
-
-	return f.MatchTopic(envelope.Topic)
-}
-
-// MatchTopic checks that the filter captures a given topic.
-func (f *Filter) MatchTopic(topic TopicType) bool {
-	if len(f.Topics) == 0 {
-		// any topic matches
-		return true
-	}
-
-	for _, bt := range f.Topics {
-		if matchSingleTopic(topic, bt) {
-			return true
-		}
-	}
-	return false
+	return f.PoW <= 0 || envelope.pow >= f.PoW
 }
 
 func matchSingleTopic(topic TopicType, bt []byte) bool {

+ 12 - 17
whisper/whisperv6/filter_test.go

@@ -303,9 +303,8 @@ func TestMatchEnvelope(t *testing.T) {
 		t.Fatalf("failed generateMessageParams with seed %d: %s.", seed, err)
 	}
 
-	params.Topic[0] = 0xFF // ensure mismatch
+	params.Topic[0] = 0xFF // topic mismatch
 
-	// mismatch with pseudo-random data
 	msg, err := NewSentMessage(params)
 	if err != nil {
 		t.Fatalf("failed to create new message with seed %d: %s.", seed, err)
@@ -314,14 +313,6 @@ func TestMatchEnvelope(t *testing.T) {
 	if err != nil {
 		t.Fatalf("failed Wrap with seed %d: %s.", seed, err)
 	}
-	match := fsym.MatchEnvelope(env)
-	if match {
-		t.Fatalf("failed MatchEnvelope symmetric with seed %d.", seed)
-	}
-	match = fasym.MatchEnvelope(env)
-	if match {
-		t.Fatalf("failed MatchEnvelope asymmetric with seed %d.", seed)
-	}
 
 	// encrypt symmetrically
 	i := mrand.Int() % 4
@@ -337,7 +328,7 @@ func TestMatchEnvelope(t *testing.T) {
 	}
 
 	// symmetric + matching topic: match
-	match = fsym.MatchEnvelope(env)
+	match := fsym.MatchEnvelope(env)
 	if !match {
 		t.Fatalf("failed MatchEnvelope() symmetric with seed %d.", seed)
 	}
@@ -396,7 +387,7 @@ func TestMatchEnvelope(t *testing.T) {
 	// asymmetric + matching topic: match
 	fasym.Topics[i] = fasym.Topics[i+1]
 	match = fasym.MatchEnvelope(env)
-	if match {
+	if !match {
 		t.Fatalf("failed MatchEnvelope(asymmetric + matching topic) with seed %d.", seed)
 	}
 
@@ -431,7 +422,8 @@ func TestMatchEnvelope(t *testing.T) {
 	// filter with topic + envelope without topic: mismatch
 	fasym.Topics = fsym.Topics
 	match = fasym.MatchEnvelope(env)
-	if match {
+	if !match {
+		// topic mismatch should have no affect, as topics are handled by topic matchers
 		t.Fatalf("failed MatchEnvelope(filter without topic + envelope without topic) with seed %d.", seed)
 	}
 }
@@ -487,7 +479,8 @@ func TestMatchMessageSym(t *testing.T) {
 
 	// topic mismatch
 	f.Topics[index][0]++
-	if f.MatchMessage(msg) {
+	if !f.MatchMessage(msg) {
+		// topic mismatch should have no affect, as topics are handled by topic matchers
 		t.Fatalf("failed MatchEnvelope(topic mismatch) with seed %d.", seed)
 	}
 	f.Topics[index][0]--
@@ -580,7 +573,8 @@ func TestMatchMessageAsym(t *testing.T) {
 
 	// topic mismatch
 	f.Topics[index][0]++
-	if f.MatchMessage(msg) {
+	if !f.MatchMessage(msg) {
+		// topic mismatch should have no affect, as topics are handled by topic matchers
 		t.Fatalf("failed MatchEnvelope(topic mismatch) with seed %d.", seed)
 	}
 	f.Topics[index][0]--
@@ -829,8 +823,9 @@ func TestVariableTopics(t *testing.T) {
 
 		f.Topics[i][lastTopicByte]++
 		match = f.MatchEnvelope(env)
-		if match {
-			t.Fatalf("MatchEnvelope symmetric with seed %d, step %d: false positive.", seed, i)
+		if !match {
+			// topic mismatch should have no affect, as topics are handled by topic matchers
+			t.Fatalf("MatchEnvelope symmetric with seed %d, step %d.", seed, i)
 		}
 	}
 }

+ 0 - 18
whisper/whisperv6/whisper.go

@@ -928,24 +928,6 @@ func (whisper *Whisper) Envelopes() []*Envelope {
 	return all
 }
 
-// Messages iterates through all currently floating envelopes
-// and retrieves all the messages, that this filter could decrypt.
-func (whisper *Whisper) Messages(id string) []*ReceivedMessage {
-	result := make([]*ReceivedMessage, 0)
-	whisper.poolMu.RLock()
-	defer whisper.poolMu.RUnlock()
-
-	if filter := whisper.filters.Get(id); filter != nil {
-		for _, env := range whisper.envelopes {
-			msg := filter.processEnvelope(env)
-			if msg != nil {
-				result = append(result, msg)
-			}
-		}
-	}
-	return result
-}
-
 // isEnvelopeCached checks if envelope with specific hash has already been received and cached.
 func (whisper *Whisper) isEnvelopeCached(hash common.Hash) bool {
 	whisper.poolMu.Lock()

+ 1 - 10
whisper/whisperv6/whisper_test.go

@@ -75,10 +75,6 @@ func TestWhisperBasic(t *testing.T) {
 	if len(mail) != 0 {
 		t.Fatalf("failed w.Envelopes().")
 	}
-	m := w.Messages("non-existent")
-	if len(m) != 0 {
-		t.Fatalf("failed w.Messages.")
-	}
 
 	derived := pbkdf2.Key([]byte(peerID), nil, 65356, aesKeyLength, sha256.New)
 	if !validateDataIntegrity(derived, aesKeyLength) {
@@ -593,7 +589,7 @@ func TestCustomization(t *testing.T) {
 	}
 
 	// check w.messages()
-	id, err := w.Subscribe(f)
+	_, err = w.Subscribe(f)
 	if err != nil {
 		t.Fatalf("failed subscribe with seed %d: %s.", seed, err)
 	}
@@ -602,11 +598,6 @@ func TestCustomization(t *testing.T) {
 	if len(mail) > 0 {
 		t.Fatalf("received premature mail")
 	}
-
-	mail = w.Messages(id)
-	if len(mail) != 2 {
-		t.Fatalf("failed to get whisper messages")
-	}
 }
 
 func TestSymmetricSendCycle(t *testing.T) {