瀏覽代碼

Added old filter. Needs some refactoring

obscuren 11 年之前
父節點
當前提交
cdb2ebbdfa
共有 2 個文件被更改,包括 100 次插入57 次删除
  1. 6 57
      eth/backend.go
  2. 94 0
      event/filter/old_filter.go

+ 6 - 57
eth/backend.go

@@ -14,7 +14,6 @@ import (
 	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/pow/ezp"
 	"github.com/ethereum/go-ethereum/rpc"
-	"github.com/ethereum/go-ethereum/state"
 	"github.com/ethereum/go-ethereum/whisper"
 )
 
@@ -75,7 +74,6 @@ func New(db ethutil.Database, identity p2p.ClientIdentity, keyManager *crypto.Ke
 		clientIdentity: identity,
 		blacklist:      p2p.NewBlacklist(),
 		eventMux:       &event.TypeMux{},
-		filters:        make(map[int]*core.Filter),
 	}
 
 	eth.txPool = core.NewTxPool(eth)
@@ -83,6 +81,7 @@ func New(db ethutil.Database, identity p2p.ClientIdentity, keyManager *crypto.Ke
 	eth.blockManager = core.NewBlockManager(eth)
 	eth.chainManager.SetProcessor(eth.blockManager)
 	eth.whisper = whisper.New()
+	eth.filterManager = filter.NewFilterManager(eth.EventMux())
 
 	hasBlock := eth.chainManager.HasBlock
 	insertChain := eth.chainManager.InsertChain
@@ -164,8 +163,7 @@ func (s *Ethereum) Start(seed bool) error {
 	}
 	s.blockPool.Start()
 	s.whisper.Start()
-
-	go s.filterLoop()
+	s.filterManager.Start()
 
 	// broadcast transactions
 	s.txSub = s.eventMux.Subscribe(core.TxPreEvent{})
@@ -267,58 +265,9 @@ func saveProtocolVersion(db ethutil.Database) {
 	}
 }
 
-// InstallFilter adds filter for blockchain events.
-// The filter's callbacks will run for matching blocks and messages.
-// The filter should not be modified after it has been installed.
+// XXX Refactor me & MOVE
 func (self *Ethereum) InstallFilter(filter *core.Filter) (id int) {
-	self.filterMu.Lock()
-	id = self.filterId
-	self.filters[id] = filter
-	self.filterId++
-	self.filterMu.Unlock()
-	return id
-}
-
-func (self *Ethereum) UninstallFilter(id int) {
-	self.filterMu.Lock()
-	delete(self.filters, id)
-	self.filterMu.Unlock()
-}
-
-// GetFilter retrieves a filter installed using InstallFilter.
-// The filter may not be modified.
-func (self *Ethereum) GetFilter(id int) *core.Filter {
-	self.filterMu.RLock()
-	defer self.filterMu.RUnlock()
-	return self.filters[id]
-}
-
-func (self *Ethereum) filterLoop() {
-	// Subscribe to events
-	events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil))
-	for event := range events.Chan() {
-		switch event.(type) {
-		case core.NewBlockEvent:
-			self.filterMu.RLock()
-			for _, filter := range self.filters {
-				if filter.BlockCallback != nil {
-					e := event.(core.NewBlockEvent)
-					filter.BlockCallback(e.Block)
-				}
-			}
-			self.filterMu.RUnlock()
-		case state.Messages:
-			self.filterMu.RLock()
-			for _, filter := range self.filters {
-				if filter.MessageCallback != nil {
-					e := event.(state.Messages)
-					msgs := filter.FilterMessages(e)
-					if len(msgs) > 0 {
-						filter.MessageCallback(msgs)
-					}
-				}
-			}
-			self.filterMu.RUnlock()
-		}
-	}
+	return self.filterManager.InstallFilter(filter)
 }
+func (self *Ethereum) UninstallFilter(id int)        { self.filterManager.UninstallFilter(id) }
+func (self *Ethereum) GetFilter(id int) *core.Filter { return self.filterManager.GetFilter(id) }

+ 94 - 0
event/filter/old_filter.go

@@ -0,0 +1,94 @@
+// XXX This is the old filter system specifically for messages. This is till in used and could use some refactoring
+package filter
+
+import (
+	"sync"
+
+	"github.com/ethereum/go-ethereum/core"
+	"github.com/ethereum/go-ethereum/event"
+	"github.com/ethereum/go-ethereum/state"
+)
+
+type FilterManager struct {
+	eventMux *event.TypeMux
+
+	filterMu sync.RWMutex
+	filterId int
+	filters  map[int]*core.Filter
+
+	quit chan struct{}
+}
+
+func NewFilterManager(mux *event.TypeMux) *FilterManager {
+	return &FilterManager{
+		eventMux: mux,
+		filters:  make(map[int]*core.Filter),
+	}
+}
+
+func (self *FilterManager) Start() {
+	go self.filterLoop()
+}
+
+func (self *FilterManager) Stop() {
+	close(self.quit)
+}
+
+func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) {
+	self.filterMu.Lock()
+	id = self.filterId
+	self.filters[id] = filter
+	self.filterId++
+	self.filterMu.Unlock()
+	return id
+}
+
+func (self *FilterManager) UninstallFilter(id int) {
+	self.filterMu.Lock()
+	delete(self.filters, id)
+	self.filterMu.Unlock()
+}
+
+// GetFilter retrieves a filter installed using InstallFilter.
+// The filter may not be modified.
+func (self *FilterManager) GetFilter(id int) *core.Filter {
+	self.filterMu.RLock()
+	defer self.filterMu.RUnlock()
+	return self.filters[id]
+}
+
+func (self *FilterManager) filterLoop() {
+	// Subscribe to events
+	events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil))
+
+out:
+	for {
+		select {
+		case <-self.quit:
+			break out
+		case event := <-events.Chan():
+			switch event := event.(type) {
+			case core.NewBlockEvent:
+				self.filterMu.RLock()
+				for _, filter := range self.filters {
+					if filter.BlockCallback != nil {
+						filter.BlockCallback(event.Block)
+					}
+				}
+				self.filterMu.RUnlock()
+
+			case state.Messages:
+				self.filterMu.RLock()
+				for _, filter := range self.filters {
+					if filter.MessageCallback != nil {
+						msgs := filter.FilterMessages(event)
+						if len(msgs) > 0 {
+							filter.MessageCallback(msgs)
+						}
+					}
+				}
+				self.filterMu.RUnlock()
+			}
+		}
+	}
+}