|
|
@@ -13,6 +13,7 @@ import (
|
|
|
"math/big"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
+ "time"
|
|
|
|
|
|
"github.com/ethereum/go-ethereum/core"
|
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
|
@@ -31,13 +32,14 @@ const (
|
|
|
|
|
|
type EthereumApi struct {
|
|
|
xeth *xeth.XEth
|
|
|
+ quit chan struct{}
|
|
|
filterManager *filter.FilterManager
|
|
|
|
|
|
logMut sync.RWMutex
|
|
|
- logs map[int]state.Logs
|
|
|
+ logs map[int]*logFilter
|
|
|
|
|
|
messagesMut sync.RWMutex
|
|
|
- messages map[int][]xeth.WhisperMessage
|
|
|
+ messages map[int]*whisperFilter
|
|
|
// Register keeps a list of accounts and transaction data
|
|
|
regmut sync.Mutex
|
|
|
register map[string][]*NewTxArgs
|
|
|
@@ -49,12 +51,14 @@ func NewEthereumApi(eth *xeth.XEth) *EthereumApi {
|
|
|
db, _ := ethdb.NewLDBDatabase("dapps")
|
|
|
api := &EthereumApi{
|
|
|
xeth: eth,
|
|
|
+ quit: make(chan struct{}),
|
|
|
filterManager: filter.NewFilterManager(eth.Backend().EventMux()),
|
|
|
- logs: make(map[int]state.Logs),
|
|
|
- messages: make(map[int][]xeth.WhisperMessage),
|
|
|
+ logs: make(map[int]*logFilter),
|
|
|
+ messages: make(map[int]*whisperFilter),
|
|
|
db: db,
|
|
|
}
|
|
|
go api.filterManager.Start()
|
|
|
+ go api.start()
|
|
|
|
|
|
return api
|
|
|
}
|
|
|
@@ -97,7 +101,11 @@ func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) erro
|
|
|
self.logMut.Lock()
|
|
|
defer self.logMut.Unlock()
|
|
|
|
|
|
- self.logs[id] = append(self.logs[id], logs...)
|
|
|
+ if self.logs[id] == nil {
|
|
|
+ self.logs[id] = &logFilter{timeout: time.Now()}
|
|
|
+ }
|
|
|
+
|
|
|
+ self.logs[id].add(logs...)
|
|
|
}
|
|
|
id = self.filterManager.InstallFilter(filter)
|
|
|
*reply = id
|
|
|
@@ -113,7 +121,11 @@ func (self *EthereumApi) NewFilterString(args string, reply *interface{}) error
|
|
|
self.logMut.Lock()
|
|
|
defer self.logMut.Unlock()
|
|
|
|
|
|
- self.logs[id] = append(self.logs[id], &state.StateLog{})
|
|
|
+ if self.logs[id] == nil {
|
|
|
+ self.logs[id] = &logFilter{timeout: time.Now()}
|
|
|
+ }
|
|
|
+
|
|
|
+ self.logs[id].add(&state.StateLog{})
|
|
|
}
|
|
|
if args == "pending" {
|
|
|
filter.PendingCallback = callback
|
|
|
@@ -131,9 +143,9 @@ func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error {
|
|
|
self.logMut.Lock()
|
|
|
defer self.logMut.Unlock()
|
|
|
|
|
|
- *reply = toLogs(self.logs[id])
|
|
|
-
|
|
|
- self.logs[id] = nil // empty the logs
|
|
|
+ if self.logs[id] != nil {
|
|
|
+ *reply = toLogs(self.logs[id].get())
|
|
|
+ }
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
@@ -331,7 +343,10 @@ func (p *EthereumApi) NewWhisperFilter(args *xeth.Options, reply *interface{}) e
|
|
|
args.Fn = func(msg xeth.WhisperMessage) {
|
|
|
p.messagesMut.Lock()
|
|
|
defer p.messagesMut.Unlock()
|
|
|
- p.messages[id] = append(p.messages[id], msg)
|
|
|
+ if p.messages[id] == nil {
|
|
|
+ p.messages[id] = &whisperFilter{timeout: time.Now()}
|
|
|
+ }
|
|
|
+ p.messages[id].add(msg) // = append(p.messages[id], msg)
|
|
|
}
|
|
|
id = p.xeth.Whisper().Watch(args)
|
|
|
*reply = id
|
|
|
@@ -342,9 +357,9 @@ func (self *EthereumApi) MessagesChanged(id int, reply *interface{}) error {
|
|
|
self.messagesMut.Lock()
|
|
|
defer self.messagesMut.Unlock()
|
|
|
|
|
|
- *reply = self.messages[id]
|
|
|
-
|
|
|
- self.messages[id] = nil // empty the messages
|
|
|
+ if self.messages[id] != nil {
|
|
|
+ *reply = self.messages[id].get()
|
|
|
+ }
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
@@ -535,3 +550,34 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error
|
|
|
rpclogger.DebugDetailf("Reply: %T %s", reply, reply)
|
|
|
return nil
|
|
|
}
|
|
|
+
|
|
|
+var filterTickerTime = 15 * time.Second
|
|
|
+
|
|
|
+func (self *EthereumApi) start() {
|
|
|
+ timer := time.NewTicker(filterTickerTime)
|
|
|
+done:
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-timer.C:
|
|
|
+ self.logMut.Lock()
|
|
|
+ self.messagesMut.Lock()
|
|
|
+ for id, filter := range self.logs {
|
|
|
+ if time.Since(filter.timeout) > 20*time.Second {
|
|
|
+ delete(self.logs, id)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for id, filter := range self.messages {
|
|
|
+ if time.Since(filter.timeout) > 20*time.Second {
|
|
|
+ delete(self.messages, id)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ case <-self.quit:
|
|
|
+ break done
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (self *EthereumApi) stop() {
|
|
|
+ close(self.quit)
|
|
|
+}
|