Forráskód Böngészése

xeth, core, event/filter, rpc: new block and transaction filters

obscuren 10 éve
szülő
commit
7e160a677d
5 módosított fájl, 198 hozzáadás és 83 törlés
  1. 3 3
      core/filter.go
  2. 2 2
      event/filter/eth_filter.go
  3. 16 7
      rpc/api.go
  4. 11 0
      rpc/responses.go
  5. 166 71
      xeth/xeth.go

+ 3 - 3
core/filter.go

@@ -22,9 +22,9 @@ type Filter struct {
 	max      int
 	topics   [][]common.Hash
 
-	BlockCallback   func(*types.Block, state.Logs)
-	PendingCallback func(*types.Transaction)
-	LogsCallback    func(state.Logs)
+	BlockCallback       func(*types.Block, state.Logs)
+	TransactionCallback func(*types.Transaction)
+	LogsCallback        func(state.Logs)
 }
 
 // Create a new filter which uses a bloom filter on blocks to figure out whether a particular block

+ 2 - 2
event/filter/eth_filter.go

@@ -88,8 +88,8 @@ out:
 			case core.TxPreEvent:
 				self.filterMu.RLock()
 				for _, filter := range self.filters {
-					if filter.PendingCallback != nil {
-						filter.PendingCallback(event.Tx)
+					if filter.TransactionCallback != nil {
+						filter.TransactionCallback(event.Tx)
 					}
 				}
 				self.filterMu.RUnlock()

+ 16 - 7
rpc/api.go

@@ -322,14 +322,13 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
 			return err
 		}
 
-		id := api.xeth().RegisterFilter(args.Earliest, args.Latest, args.Skip, args.Max, args.Address, args.Topics)
+		id := api.xeth().NewLogFilter(args.Earliest, args.Latest, args.Skip, args.Max, args.Address, args.Topics)
 		*reply = newHexNum(big.NewInt(int64(id)).Bytes())
+
 	case "eth_newBlockFilter":
-		args := new(FilterStringArgs)
-		if err := json.Unmarshal(req.Params, &args); err != nil {
-			return err
-		}
-		*reply = newHexNum(api.xeth().NewFilterString(args.Word))
+		*reply = newHexNum(api.xeth().NewBlockFilter())
+	case "eth_transactionFilter":
+		*reply = newHexNum(api.xeth().NewTransactionFilter())
 	case "eth_uninstallFilter":
 		args := new(FilterIdArgs)
 		if err := json.Unmarshal(req.Params, &args); err != nil {
@@ -341,7 +340,17 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
 		if err := json.Unmarshal(req.Params, &args); err != nil {
 			return err
 		}
-		*reply = NewLogsRes(api.xeth().FilterChanged(args.Id))
+
+		switch api.xeth().GetFilterType(args.Id) {
+		case xeth.BlockFilterTy:
+			*reply = NewHashesRes(api.xeth().BlockFilterChanged(args.Id))
+		case xeth.TransactionFilterTy:
+			*reply = NewHashesRes(api.xeth().TransactionFilterChanged(args.Id))
+		case xeth.LogFilterTy:
+			*reply = NewLogsRes(api.xeth().LogFilterChanged(args.Id))
+		default:
+			*reply = []string{} // reply empty string slice
+		}
 	case "eth_getFilterLogs":
 		args := new(FilterIdArgs)
 		if err := json.Unmarshal(req.Params, &args); err != nil {

+ 11 - 0
rpc/responses.go

@@ -3,6 +3,7 @@ package rpc
 import (
 	"encoding/json"
 
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/state"
 	"github.com/ethereum/go-ethereum/core/types"
 )
@@ -303,3 +304,13 @@ func NewLogsRes(logs state.Logs) (ls []LogRes) {
 
 	return
 }
+
+func NewHashesRes(hs []common.Hash) []string {
+	hashes := make([]string, len(hs))
+
+	for i, hash := range hs {
+		hashes[i] = hash.Hex()
+	}
+
+	return hashes
+}

+ 166 - 71
xeth/xeth.go

@@ -29,6 +29,14 @@ var (
 	defaultGas       = big.NewInt(90000)          //500000
 )
 
+// byte will be inferred
+const (
+	UnknownFilterTy = iota
+	BlockFilterTy
+	TransactionFilterTy
+	LogFilterTy
+)
+
 func DefaultGas() *big.Int      { return new(big.Int).Set(defaultGas) }
 func DefaultGasPrice() *big.Int { return new(big.Int).Set(defaultGasPrice) }
 
@@ -42,11 +50,17 @@ type XEth struct {
 	quit          chan struct{}
 	filterManager *filter.FilterManager
 
-	logMut sync.RWMutex
-	logs   map[int]*logFilter
+	logMu    sync.RWMutex
+	logQueue map[int]*logQueue
+
+	blockMu    sync.RWMutex
+	blockQueue map[int]*hashQueue
+
+	transactionMu    sync.RWMutex
+	transactionQueue map[int]*hashQueue
 
-	messagesMut sync.RWMutex
-	messages    map[int]*whisperFilter
+	messagesMu sync.RWMutex
+	messages   map[int]*whisperFilter
 
 	// regmut   sync.Mutex
 	// register map[string][]*interface{} // TODO improve return type
@@ -59,14 +73,16 @@ type XEth struct {
 // confirms all transactions will be used.
 func New(eth *eth.Ethereum, frontend Frontend) *XEth {
 	xeth := &XEth{
-		backend:       eth,
-		frontend:      frontend,
-		whisper:       NewWhisper(eth.Whisper()),
-		quit:          make(chan struct{}),
-		filterManager: filter.NewFilterManager(eth.EventMux()),
-		logs:          make(map[int]*logFilter),
-		messages:      make(map[int]*whisperFilter),
-		agent:         miner.NewRemoteAgent(),
+		backend:          eth,
+		frontend:         frontend,
+		whisper:          NewWhisper(eth.Whisper()),
+		quit:             make(chan struct{}),
+		filterManager:    filter.NewFilterManager(eth.EventMux()),
+		logQueue:         make(map[int]*logQueue),
+		blockQueue:       make(map[int]*hashQueue),
+		transactionQueue: make(map[int]*hashQueue),
+		messages:         make(map[int]*whisperFilter),
+		agent:            miner.NewRemoteAgent(),
 	}
 	eth.Miner().Register(xeth.agent)
 
@@ -87,23 +103,41 @@ done:
 	for {
 		select {
 		case <-timer.C:
-			self.logMut.Lock()
-			self.messagesMut.Lock()
-			for id, filter := range self.logs {
+			self.logMu.Lock()
+			for id, filter := range self.logQueue {
+				if time.Since(filter.timeout) > filterTickerTime {
+					self.filterManager.UninstallFilter(id)
+					delete(self.logQueue, id)
+				}
+			}
+			self.logMu.Unlock()
+
+			self.blockMu.Lock()
+			for id, filter := range self.blockQueue {
+				if time.Since(filter.timeout) > filterTickerTime {
+					self.filterManager.UninstallFilter(id)
+					delete(self.blockQueue, id)
+				}
+			}
+			self.blockMu.Unlock()
+
+			self.transactionMu.Lock()
+			for id, filter := range self.transactionQueue {
 				if time.Since(filter.timeout) > filterTickerTime {
 					self.filterManager.UninstallFilter(id)
-					delete(self.logs, id)
+					delete(self.transactionQueue, id)
 				}
 			}
+			self.transactionMu.Unlock()
 
+			self.messagesMu.Lock()
 			for id, filter := range self.messages {
 				if time.Since(filter.activity()) > filterTickerTime {
 					self.Whisper().Unwatch(id)
 					delete(self.messages, id)
 				}
 			}
-			self.messagesMut.Unlock()
-			self.logMut.Unlock()
+			self.messagesMu.Unlock()
 		case <-self.quit:
 			break done
 		}
@@ -360,7 +394,32 @@ func (self *XEth) SecretToAddress(key string) string {
 	return common.ToHex(pair.Address())
 }
 
-func (self *XEth) RegisterFilter(earliest, latest int64, skip, max int, address []string, topics [][]string) int {
+func (self *XEth) UninstallFilter(id int) bool {
+	defer self.filterManager.UninstallFilter(id)
+
+	if _, ok := self.logQueue[id]; ok {
+		self.logMu.Lock()
+		defer self.logMu.Unlock()
+		delete(self.logQueue, id)
+		return true
+	}
+	if _, ok := self.blockQueue[id]; ok {
+		self.blockMu.Lock()
+		defer self.blockMu.Unlock()
+		delete(self.blockQueue, id)
+		return true
+	}
+	if _, ok := self.transactionQueue[id]; ok {
+		self.transactionMu.Lock()
+		defer self.transactionMu.Unlock()
+		delete(self.transactionQueue, id)
+		return true
+	}
+
+	return false
+}
+
+func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []string, topics [][]string) int {
 	var id int
 	filter := core.NewFilter(self.backend)
 	filter.SetEarliestBlock(earliest)
@@ -370,71 +429,90 @@ func (self *XEth) RegisterFilter(earliest, latest int64, skip, max int, address
 	filter.SetAddress(cAddress(address))
 	filter.SetTopics(cTopics(topics))
 	filter.LogsCallback = func(logs state.Logs) {
-		self.logMut.Lock()
-		defer self.logMut.Unlock()
+		self.logMu.Lock()
+		defer self.logMu.Unlock()
 
-		self.logs[id].add(logs...)
+		self.logQueue[id].add(logs...)
 	}
 	id = self.filterManager.InstallFilter(filter)
-	self.logs[id] = &logFilter{timeout: time.Now()}
+	self.logQueue[id] = &logQueue{timeout: time.Now()}
 
 	return id
 }
 
-func (self *XEth) UninstallFilter(id int) bool {
-	if _, ok := self.logs[id]; ok {
-		delete(self.logs, id)
-		self.filterManager.UninstallFilter(id)
-		return true
-	}
+func (self *XEth) NewTransactionFilter() int {
+	var id int
+	filter := core.NewFilter(self.backend)
+	filter.TransactionCallback = func(tx *types.Transaction) {
+		self.transactionMu.Lock()
+		defer self.transactionMu.Unlock()
 
-	return false
+		self.transactionQueue[id].add(tx.Hash())
+	}
+	id = self.filterManager.InstallFilter(filter)
+	self.transactionQueue[id] = &hashQueue{timeout: time.Now()}
+	return id
 }
 
-func (self *XEth) NewFilterString(word string) int {
+func (self *XEth) NewBlockFilter() int {
 	var id int
 	filter := core.NewFilter(self.backend)
+	filter.BlockCallback = func(block *types.Block, logs state.Logs) {
+		self.blockMu.Lock()
+		defer self.blockMu.Unlock()
 
-	switch word {
-	case "pending":
-		filter.PendingCallback = func(tx *types.Transaction) {
-			self.logMut.Lock()
-			defer self.logMut.Unlock()
-
-			self.logs[id].add(&state.Log{})
-		}
-	case "latest":
-		filter.BlockCallback = func(block *types.Block, logs state.Logs) {
-			self.logMut.Lock()
-			defer self.logMut.Unlock()
+		self.blockQueue[id].add(block.Hash())
+	}
+	id = self.filterManager.InstallFilter(filter)
+	self.blockQueue[id] = &hashQueue{timeout: time.Now()}
+	return id
+}
 
-			for _, log := range logs {
-				self.logs[id].add(log)
-			}
-			self.logs[id].add(&state.Log{})
-		}
+func (self *XEth) GetFilterType(id int) byte {
+	if _, ok := self.blockQueue[id]; ok {
+		return BlockFilterTy
+	} else if _, ok := self.transactionQueue[id]; ok {
+		return TransactionFilterTy
+	} else if _, ok := self.logQueue[id]; ok {
+		return LogFilterTy
 	}
 
-	id = self.filterManager.InstallFilter(filter)
-	self.logs[id] = &logFilter{timeout: time.Now()}
+	return UnknownFilterTy
+}
 
-	return id
+func (self *XEth) LogFilterChanged(id int) state.Logs {
+	self.logMu.Lock()
+	defer self.logMu.Unlock()
+
+	if self.logQueue[id] != nil {
+		return self.logQueue[id].get()
+	}
+	return nil
 }
 
-func (self *XEth) FilterChanged(id int) state.Logs {
-	self.logMut.Lock()
-	defer self.logMut.Unlock()
+func (self *XEth) BlockFilterChanged(id int) []common.Hash {
+	self.blockMu.Lock()
+	defer self.blockMu.Unlock()
 
-	if self.logs[id] != nil {
-		return self.logs[id].get()
+	if self.blockQueue[id] != nil {
+		return self.blockQueue[id].get()
 	}
+	return nil
+}
+
+func (self *XEth) TransactionFilterChanged(id int) []common.Hash {
+	self.blockMu.Lock()
+	defer self.blockMu.Unlock()
 
+	if self.blockQueue[id] != nil {
+		return self.transactionQueue[id].get()
+	}
 	return nil
 }
 
 func (self *XEth) Logs(id int) state.Logs {
-	self.logMut.Lock()
-	defer self.logMut.Unlock()
+	self.logMu.Lock()
+	defer self.logMu.Unlock()
 
 	filter := self.filterManager.GetFilter(id)
 	if filter != nil {
@@ -465,24 +543,24 @@ func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int {
 
 	// Callback to delegate core whisper messages to this xeth filter
 	callback := func(msg WhisperMessage) {
-		p.messagesMut.RLock() // Only read lock to the filter pool
-		defer p.messagesMut.RUnlock()
+		p.messagesMu.RLock() // Only read lock to the filter pool
+		defer p.messagesMu.RUnlock()
 		p.messages[id].insert(msg)
 	}
 	// Initialize the core whisper filter and wrap into xeth
 	id = p.Whisper().Watch(to, from, topics, callback)
 
-	p.messagesMut.Lock()
+	p.messagesMu.Lock()
 	p.messages[id] = newWhisperFilter(id, p.Whisper())
-	p.messagesMut.Unlock()
+	p.messagesMu.Unlock()
 
 	return id
 }
 
 // UninstallWhisperFilter disables and removes an existing filter.
 func (p *XEth) UninstallWhisperFilter(id int) bool {
-	p.messagesMut.Lock()
-	defer p.messagesMut.Unlock()
+	p.messagesMu.Lock()
+	defer p.messagesMu.Unlock()
 
 	if _, ok := p.messages[id]; ok {
 		delete(p.messages, id)
@@ -493,8 +571,8 @@ func (p *XEth) UninstallWhisperFilter(id int) bool {
 
 // WhisperMessages retrieves all the known messages that match a specific filter.
 func (self *XEth) WhisperMessages(id int) []WhisperMessage {
-	self.messagesMut.RLock()
-	defer self.messagesMut.RUnlock()
+	self.messagesMu.RLock()
+	defer self.messagesMu.RUnlock()
 
 	if self.messages[id] != nil {
 		return self.messages[id].messages()
@@ -505,8 +583,8 @@ func (self *XEth) WhisperMessages(id int) []WhisperMessage {
 // WhisperMessagesChanged retrieves all the new messages matched by a filter
 // since the last retrieval
 func (self *XEth) WhisperMessagesChanged(id int) []WhisperMessage {
-	self.messagesMut.RLock()
-	defer self.messagesMut.RUnlock()
+	self.messagesMu.RLock()
+	defer self.messagesMu.RUnlock()
 
 	if self.messages[id] != nil {
 		return self.messages[id].retrieve()
@@ -767,19 +845,36 @@ func (m callmsg) Gas() *big.Int                 { return m.gas }
 func (m callmsg) Value() *big.Int               { return m.value }
 func (m callmsg) Data() []byte                  { return m.data }
 
-type logFilter struct {
+type logQueue struct {
 	logs    state.Logs
 	timeout time.Time
 	id      int
 }
 
-func (l *logFilter) add(logs ...*state.Log) {
+func (l *logQueue) add(logs ...*state.Log) {
 	l.logs = append(l.logs, logs...)
 }
 
-func (l *logFilter) get() state.Logs {
+func (l *logQueue) get() state.Logs {
 	l.timeout = time.Now()
 	tmp := l.logs
 	l.logs = nil
 	return tmp
 }
+
+type hashQueue struct {
+	hashes  []common.Hash
+	timeout time.Time
+	id      int
+}
+
+func (l *hashQueue) add(hashes ...common.Hash) {
+	l.hashes = append(l.hashes, hashes...)
+}
+
+func (l *hashQueue) get() []common.Hash {
+	l.timeout = time.Now()
+	tmp := l.hashes
+	l.hashes = nil
+	return tmp
+}