Selaa lähdekoodia

Reorg filter logic to XEth

Taylor Gerring 10 vuotta sitten
vanhempi
commit
6c04c19eb4
6 muutettua tiedostoa jossa 273 lisäystä ja 203 poistoa
  1. 1 1
      core/filter.go
  2. 3 1
      event/filter/eth_filter.go
  3. 23 168
      rpc/api.go
  4. 32 32
      rpc/api_test.go
  5. 10 0
      rpc/args.go
  6. 204 1
      xeth/xeth.go

+ 1 - 1
core/filter.go

@@ -46,7 +46,7 @@ func NewFilter(eth Backend) *Filter {
 
 // SetOptions copies the filter options to the filter it self. The reason for this "silly" copy
 // is simply because named arguments in this case is extremely nice and readable.
-func (self *Filter) SetOptions(options FilterOptions) {
+func (self *Filter) SetOptions(options *FilterOptions) {
 	self.earliest = options.Earliest
 	self.latest = options.Latest
 	self.skip = options.Skip

+ 3 - 1
event/filter/eth_filter.go

@@ -48,7 +48,9 @@ func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) {
 func (self *FilterManager) UninstallFilter(id int) {
 	self.filterMu.Lock()
 	defer self.filterMu.Unlock()
-	delete(self.filters, id)
+	if _, ok := self.filters[id]; ok {
+		delete(self.filters, id)
+	}
 }
 
 // GetFilter retrieves a filter installed using InstallFilter.

+ 23 - 168
rpc/api.go

@@ -7,7 +7,6 @@ import (
 	"path"
 	"strings"
 	"sync"
-	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core"
@@ -15,15 +14,13 @@ import (
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/event"
-	"github.com/ethereum/go-ethereum/event/filter"
 	"github.com/ethereum/go-ethereum/state"
 	"github.com/ethereum/go-ethereum/xeth"
 )
 
 var (
-	defaultGasPrice  = big.NewInt(150000000000)
-	defaultGas       = big.NewInt(500000)
-	filterTickerTime = 5 * time.Minute
+	defaultGasPrice = big.NewInt(150000000000)
+	defaultGas      = big.NewInt(500000)
 )
 
 type EthereumApi struct {
@@ -31,17 +28,9 @@ type EthereumApi struct {
 	xethMu sync.RWMutex
 	mux    *event.TypeMux
 
-	quit          chan struct{}
-	filterManager *filter.FilterManager
-
-	logMut sync.RWMutex
-	logs   map[int]*logFilter
-
-	messagesMut sync.RWMutex
-	messages    map[int]*whisperFilter
-	// Register keeps a list of accounts and transaction data
-	regmut   sync.Mutex
-	register map[string][]*NewTxArgs
+	// // Register keeps a list of accounts and transaction data
+	// regmut   sync.Mutex
+	// register map[string][]*NewTxArgs
 
 	db common.Database
 }
@@ -49,16 +38,10 @@ type EthereumApi struct {
 func NewEthereumApi(eth *xeth.XEth, dataDir string) *EthereumApi {
 	db, _ := ethdb.NewLDBDatabase(path.Join(dataDir, "dapps"))
 	api := &EthereumApi{
-		eth:           eth,
-		mux:           eth.Backend().EventMux(),
-		quit:          make(chan struct{}),
-		filterManager: filter.NewFilterManager(eth.Backend().EventMux()),
-		logs:          make(map[int]*logFilter),
-		messages:      make(map[int]*whisperFilter),
-		db:            db,
+		eth: eth,
+		mux: eth.Backend().EventMux(),
+		db:  db,
 	}
-	go api.filterManager.Start()
-	go api.start()
 
 	return api
 }
@@ -85,39 +68,6 @@ func (self *EthereumApi) getStateWithNum(num int64) *xeth.State {
 	return self.xethWithStateNum(num).State()
 }
 
-func (self *EthereumApi) start() {
-	timer := time.NewTicker(2 * time.Second)
-done:
-	for {
-		select {
-		case <-timer.C:
-			self.logMut.Lock()
-			self.messagesMut.Lock()
-			for id, filter := range self.logs {
-				if time.Since(filter.timeout) > filterTickerTime {
-					self.filterManager.UninstallFilter(id)
-					delete(self.logs, id)
-				}
-			}
-
-			for id, filter := range self.messages {
-				if time.Since(filter.timeout) > filterTickerTime {
-					self.xeth().Whisper().Unwatch(id)
-					delete(self.messages, id)
-				}
-			}
-			self.messagesMut.Unlock()
-			self.logMut.Unlock()
-		case <-self.quit:
-			break done
-		}
-	}
-}
-
-func (self *EthereumApi) stop() {
-	close(self.quit)
-}
-
 // func (self *EthereumApi) Register(args string, reply *interface{}) error {
 // 	self.regmut.Lock()
 // 	defer self.regmut.Unlock()
@@ -149,91 +99,43 @@ func (self *EthereumApi) stop() {
 // }
 
 func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) error {
-	var id int
-	filter := core.NewFilter(self.xeth().Backend())
-	filter.SetOptions(toFilterOptions(args))
-	filter.LogsCallback = func(logs state.Logs) {
-		self.logMut.Lock()
-		defer self.logMut.Unlock()
-
-		self.logs[id].add(logs...)
-	}
-	id = self.filterManager.InstallFilter(filter)
-	self.logs[id] = &logFilter{timeout: time.Now()}
-
+	opts := toFilterOptions(args)
+	id := self.xeth().RegisterFilter(opts)
 	*reply = common.ToHex(big.NewInt(int64(id)).Bytes())
 
 	return nil
 }
 
 func (self *EthereumApi) UninstallFilter(id int, reply *interface{}) error {
-	if _, ok := self.logs[id]; ok {
-		delete(self.logs, id)
-	}
+	*reply = self.xeth().UninstallFilter(id)
 
-	self.filterManager.UninstallFilter(id)
-	*reply = true
 	return nil
 }
 
 func (self *EthereumApi) NewFilterString(args *FilterStringArgs, reply *interface{}) error {
-	var id int
-	filter := core.NewFilter(self.xeth().Backend())
-
-	callback := func(block *types.Block, logs state.Logs) {
-		self.logMut.Lock()
-		defer self.logMut.Unlock()
-
-		for _, log := range logs {
-			self.logs[id].add(log)
-		}
-		self.logs[id].add(&state.StateLog{})
-	}
-
-	switch args.Word {
-	case "pending":
-		filter.PendingCallback = callback
-	case "latest":
-		filter.BlockCallback = callback
-	default:
-		return NewValidationError("Word", "Must be `latest` or `pending`")
+	if err := args.requirements(); err != nil {
+		return err
 	}
 
-	id = self.filterManager.InstallFilter(filter)
-	self.logs[id] = &logFilter{timeout: time.Now()}
+	id := self.xeth().NewFilterString(args.Word)
 	*reply = common.ToHex(big.NewInt(int64(id)).Bytes())
-
 	return nil
 }
 
 func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error {
-	self.logMut.Lock()
-	defer self.logMut.Unlock()
-
-	if self.logs[id] != nil {
-		*reply = NewLogsRes(self.logs[id].get())
-	}
-
+	*reply = NewLogsRes(self.xeth().FilterChanged(id))
 	return nil
 }
 
 func (self *EthereumApi) Logs(id int, reply *interface{}) error {
-	self.logMut.Lock()
-	defer self.logMut.Unlock()
-
-	filter := self.filterManager.GetFilter(id)
-	if filter != nil {
-		*reply = NewLogsRes(filter.Find())
-	}
+	*reply = NewLogsRes(self.xeth().Logs(id))
 
 	return nil
 }
 
 func (self *EthereumApi) AllLogs(args *FilterOptions, reply *interface{}) error {
-	filter := core.NewFilter(self.xeth().Backend())
-	filter.SetOptions(toFilterOptions(args))
-
-	*reply = NewLogsRes(filter.Find())
+	opts := toFilterOptions(args)
+	*reply = NewLogsRes(self.xeth().AllLogs(opts))
 
 	return nil
 }
@@ -385,36 +287,22 @@ func (p *EthereumApi) NewWhisperIdentity(reply *interface{}) error {
 // }
 
 func (p *EthereumApi) NewWhisperFilter(args *WhisperFilterArgs, reply *interface{}) error {
-	var id int
 	opts := new(xeth.Options)
 	opts.From = args.From
 	opts.To = args.To
 	opts.Topics = args.Topics
-	opts.Fn = func(msg xeth.WhisperMessage) {
-		p.messagesMut.Lock()
-		defer p.messagesMut.Unlock()
-		p.messages[id].add(msg) // = append(p.messages[id], msg)
-	}
-	id = p.xeth().Whisper().Watch(opts)
-	p.messages[id] = &whisperFilter{timeout: time.Now()}
+	id := p.xeth().NewWhisperFilter(opts)
 	*reply = common.ToHex(big.NewInt(int64(id)).Bytes())
 	return nil
 }
 
 func (p *EthereumApi) UninstallWhisperFilter(id int, reply *interface{}) error {
-	delete(p.messages, id)
-	*reply = true
+	*reply = p.xeth().UninstallWhisperFilter(id)
 	return nil
 }
 
 func (self *EthereumApi) MessagesChanged(id int, reply *interface{}) error {
-	self.messagesMut.Lock()
-	defer self.messagesMut.Unlock()
-
-	if self.messages[id] != nil {
-		*reply = self.messages[id].get()
-	}
-
+	*reply = self.xeth().MessagesChanged(id)
 	return nil
 }
 
@@ -835,7 +723,7 @@ func (self *EthereumApi) xeth() *xeth.XEth {
 	return self.eth
 }
 
-func toFilterOptions(options *FilterOptions) core.FilterOptions {
+func toFilterOptions(options *FilterOptions) *core.FilterOptions {
 	var opts core.FilterOptions
 
 	// Convert optional address slice/string to byte slice
@@ -868,38 +756,5 @@ func toFilterOptions(options *FilterOptions) core.FilterOptions {
 	}
 	opts.Topics = topics
 
-	return opts
-}
-
-type whisperFilter struct {
-	messages []xeth.WhisperMessage
-	timeout  time.Time
-	id       int
-}
-
-func (w *whisperFilter) add(msgs ...xeth.WhisperMessage) {
-	w.messages = append(w.messages, msgs...)
-}
-func (w *whisperFilter) get() []xeth.WhisperMessage {
-	w.timeout = time.Now()
-	tmp := w.messages
-	w.messages = nil
-	return tmp
-}
-
-type logFilter struct {
-	logs    state.Logs
-	timeout time.Time
-	id      int
-}
-
-func (l *logFilter) add(logs ...state.Log) {
-	l.logs = append(l.logs, logs...)
-}
-
-func (l *logFilter) get() state.Logs {
-	l.timeout = time.Now()
-	tmp := l.logs
-	l.logs = nil
-	return tmp
+	return &opts
 }

+ 32 - 32
rpc/api_test.go

@@ -2,9 +2,9 @@ package rpc
 
 import (
 	"encoding/json"
-	"sync"
+	// "sync"
 	"testing"
-	"time"
+	// "time"
 )
 
 func TestWeb3Sha3(t *testing.T) {
@@ -24,33 +24,33 @@ func TestWeb3Sha3(t *testing.T) {
 	}
 }
 
-func TestFilterClose(t *testing.T) {
-	t.Skip()
-	api := &EthereumApi{
-		logs:     make(map[int]*logFilter),
-		messages: make(map[int]*whisperFilter),
-		quit:     make(chan struct{}),
-	}
-
-	filterTickerTime = 1
-	api.logs[0] = &logFilter{}
-	api.messages[0] = &whisperFilter{}
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go api.start()
-	go func() {
-		select {
-		case <-time.After(500 * time.Millisecond):
-			api.stop()
-			wg.Done()
-		}
-	}()
-	wg.Wait()
-	if len(api.logs) != 0 {
-		t.Error("expected logs to be empty")
-	}
-
-	if len(api.messages) != 0 {
-		t.Error("expected messages to be empty")
-	}
-}
+// func TestFilterClose(t *testing.T) {
+// 	t.Skip()
+// 	api := &EthereumApi{
+// 		logs:     make(map[int]*logFilter),
+// 		messages: make(map[int]*whisperFilter),
+// 		quit:     make(chan struct{}),
+// 	}
+
+// 	filterTickerTime = 1
+// 	api.logs[0] = &logFilter{}
+// 	api.messages[0] = &whisperFilter{}
+// 	var wg sync.WaitGroup
+// 	wg.Add(1)
+// 	go api.start()
+// 	go func() {
+// 		select {
+// 		case <-time.After(500 * time.Millisecond):
+// 			api.stop()
+// 			wg.Done()
+// 		}
+// 	}()
+// 	wg.Wait()
+// 	if len(api.logs) != 0 {
+// 		t.Error("expected logs to be empty")
+// 	}
+
+// 	if len(api.messages) != 0 {
+// 		t.Error("expected messages to be empty")
+// 	}
+// }

+ 10 - 0
rpc/args.go

@@ -609,6 +609,16 @@ func (args *FilterStringArgs) UnmarshalJSON(b []byte) (err error) {
 	return nil
 }
 
+func (args *FilterStringArgs) requirements() error {
+	switch args.Word {
+	case "latest", "pending":
+		break
+	default:
+		return NewValidationError("Word", "Must be `latest` or `pending`")
+	}
+	return nil
+}
+
 type FilterIdArgs struct {
 	Id int
 }

+ 204 - 1
xeth/xeth.go

@@ -6,6 +6,8 @@ import (
 	"encoding/json"
 	"fmt"
 	"math/big"
+	"sync"
+	"time"
 
 	"github.com/ethereum/go-ethereum/accounts"
 	"github.com/ethereum/go-ethereum/common"
@@ -13,13 +15,17 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/event"
+	"github.com/ethereum/go-ethereum/event/filter"
 	"github.com/ethereum/go-ethereum/logger"
 	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/state"
 	"github.com/ethereum/go-ethereum/whisper"
 )
 
-var pipelogger = logger.NewLogger("XETH")
+var (
+	pipelogger       = logger.NewLogger("XETH")
+	filterTickerTime = 5 * time.Minute
+)
 
 // to resolve the import cycle
 type Backend interface {
@@ -71,6 +77,15 @@ type XEth struct {
 	whisper        *Whisper
 
 	frontend Frontend
+
+	quit          chan struct{}
+	filterManager *filter.FilterManager
+
+	logMut sync.RWMutex
+	logs   map[int]*logFilter
+
+	messagesMut sync.RWMutex
+	messages    map[int]*whisperFilter
 }
 
 // dummyFrontend is a non-interactive frontend that allows all
@@ -90,15 +105,55 @@ func New(eth Backend, frontend Frontend) *XEth {
 		chainManager:   eth.ChainManager(),
 		accountManager: eth.AccountManager(),
 		whisper:        NewWhisper(eth.Whisper()),
+		quit:           make(chan struct{}),
+		filterManager:  filter.NewFilterManager(eth.EventMux()),
 		frontend:       frontend,
+		logs:           make(map[int]*logFilter),
+		messages:       make(map[int]*whisperFilter),
 	}
 	if frontend == nil {
 		xeth.frontend = dummyFrontend{}
 	}
 	xeth.state = NewState(xeth, xeth.chainManager.TransState())
+	go xeth.start()
+	go xeth.filterManager.Start()
+
 	return xeth
 }
 
+func (self *XEth) start() {
+	timer := time.NewTicker(2 * time.Second)
+done:
+	for {
+		select {
+		case <-timer.C:
+			self.logMut.Lock()
+			self.messagesMut.Lock()
+			for id, filter := range self.logs {
+				if time.Since(filter.timeout) > filterTickerTime {
+					self.filterManager.UninstallFilter(id)
+					delete(self.logs, id)
+				}
+			}
+
+			for id, filter := range self.messages {
+				if time.Since(filter.timeout) > filterTickerTime {
+					self.Whisper().Unwatch(id)
+					delete(self.messages, id)
+				}
+			}
+			self.messagesMut.Unlock()
+			self.logMut.Unlock()
+		case <-self.quit:
+			break done
+		}
+	}
+}
+
+func (self *XEth) stop() {
+	close(self.quit)
+}
+
 func (self *XEth) Backend() Backend { return self.eth }
 func (self *XEth) WithState(statedb *state.StateDB) *XEth {
 	xeth := &XEth{
@@ -241,6 +296,121 @@ func (self *XEth) SecretToAddress(key string) string {
 	return common.ToHex(pair.Address())
 }
 
+func (self *XEth) RegisterFilter(args *core.FilterOptions) int {
+	var id int
+	filter := core.NewFilter(self.Backend())
+	filter.SetOptions(args)
+	filter.LogsCallback = func(logs state.Logs) {
+		self.logMut.Lock()
+		defer self.logMut.Unlock()
+
+		self.logs[id].add(logs...)
+	}
+	id = self.filterManager.InstallFilter(filter)
+	self.logs[id] = &logFilter{timeout: time.Now()}
+
+	return id
+}
+
+func (self *XEth) UninstallFilter(id int) bool {
+	if _, ok := self.logs[id]; ok {
+		delete(self.logs, id)
+		self.filterManager.UninstallFilter(id)
+		return true
+	}
+
+	return false
+}
+
+func (self *XEth) NewFilterString(word string) int {
+	var id int
+	filter := core.NewFilter(self.Backend())
+
+	callback := func(block *types.Block, logs state.Logs) {
+		self.logMut.Lock()
+		defer self.logMut.Unlock()
+
+		for _, log := range logs {
+			self.logs[id].add(log)
+		}
+		self.logs[id].add(&state.StateLog{})
+	}
+
+	switch word {
+	case "pending":
+		filter.PendingCallback = callback
+	case "latest":
+		filter.BlockCallback = callback
+	}
+
+	id = self.filterManager.InstallFilter(filter)
+	self.logs[id] = &logFilter{timeout: time.Now()}
+
+	return id
+}
+
+func (self *XEth) FilterChanged(id int) state.Logs {
+	self.logMut.Lock()
+	defer self.logMut.Unlock()
+
+	if self.logs[id] != nil {
+		return self.logs[id].get()
+	}
+
+	return nil
+}
+
+func (self *XEth) Logs(id int) state.Logs {
+	self.logMut.Lock()
+	defer self.logMut.Unlock()
+
+	filter := self.filterManager.GetFilter(id)
+	if filter != nil {
+		return filter.Find()
+	}
+
+	return nil
+}
+
+func (self *XEth) AllLogs(args *core.FilterOptions) state.Logs {
+	filter := core.NewFilter(self.Backend())
+	filter.SetOptions(args)
+
+	return filter.Find()
+}
+
+func (p *XEth) NewWhisperFilter(opts *Options) int {
+	var id int
+	opts.Fn = func(msg WhisperMessage) {
+		p.messagesMut.Lock()
+		defer p.messagesMut.Unlock()
+		p.messages[id].add(msg) // = append(p.messages[id], msg)
+	}
+	id = p.Whisper().Watch(opts)
+	p.messages[id] = &whisperFilter{timeout: time.Now()}
+	return id
+}
+
+func (p *XEth) UninstallWhisperFilter(id int) bool {
+	if _, ok := p.messages[id]; ok {
+		delete(p.messages, id)
+		return true
+	}
+
+	return false
+}
+
+func (self *XEth) MessagesChanged(id int) []WhisperMessage {
+	self.messagesMut.Lock()
+	defer self.messagesMut.Unlock()
+
+	if self.messages[id] != nil {
+		return self.messages[id].get()
+	}
+
+	return nil
+}
+
 type KeyVal struct {
 	Key   string `json:"key"`
 	Value string `json:"value"`
@@ -411,3 +581,36 @@ func (m callmsg) GasPrice() *big.Int { return m.gasPrice }
 func (m callmsg) Gas() *big.Int      { return m.gas }
 func (m callmsg) Value() *big.Int    { return m.value }
 func (m callmsg) Data() []byte       { return m.data }
+
+type whisperFilter struct {
+	messages []WhisperMessage
+	timeout  time.Time
+	id       int
+}
+
+func (w *whisperFilter) add(msgs ...WhisperMessage) {
+	w.messages = append(w.messages, msgs...)
+}
+func (w *whisperFilter) get() []WhisperMessage {
+	w.timeout = time.Now()
+	tmp := w.messages
+	w.messages = nil
+	return tmp
+}
+
+type logFilter struct {
+	logs    state.Logs
+	timeout time.Time
+	id      int
+}
+
+func (l *logFilter) add(logs ...state.Log) {
+	l.logs = append(l.logs, logs...)
+}
+
+func (l *logFilter) get() state.Logs {
+	l.timeout = time.Now()
+	tmp := l.logs
+	l.logs = nil
+	return tmp
+}