api.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package filters
  17. import (
  18. "context"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "sync"
  24. "time"
  25. "github.com/ethereum/go-ethereum"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/common/hexutil"
  28. "github.com/ethereum/go-ethereum/core/types"
  29. "github.com/ethereum/go-ethereum/rpc"
  30. )
  31. // filter is a helper struct that holds meta information over the filter type
  32. // and associated subscription in the event system.
  33. type filter struct {
  34. typ Type
  35. deadline *time.Timer // filter is inactive when deadline triggers
  36. hashes []common.Hash
  37. crit FilterCriteria
  38. logs []*types.Log
  39. s *Subscription // associated subscription in event system
  40. }
  41. // FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
  42. // information related to the Ethereum protocol such als blocks, transactions and logs.
  43. type FilterAPI struct {
  44. sys *FilterSystem
  45. events *EventSystem
  46. filtersMu sync.Mutex
  47. filters map[rpc.ID]*filter
  48. timeout time.Duration
  49. }
  50. // NewFilterAPI returns a new FilterAPI instance.
  51. func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI {
  52. api := &FilterAPI{
  53. sys: system,
  54. events: NewEventSystem(system, lightMode),
  55. filters: make(map[rpc.ID]*filter),
  56. timeout: system.cfg.Timeout,
  57. }
  58. go api.timeoutLoop(system.cfg.Timeout)
  59. return api
  60. }
  61. // timeoutLoop runs at the interval set by 'timeout' and deletes filters
  62. // that have not been recently used. It is started when the API is created.
  63. func (api *FilterAPI) timeoutLoop(timeout time.Duration) {
  64. var toUninstall []*Subscription
  65. ticker := time.NewTicker(timeout)
  66. defer ticker.Stop()
  67. for {
  68. <-ticker.C
  69. api.filtersMu.Lock()
  70. for id, f := range api.filters {
  71. select {
  72. case <-f.deadline.C:
  73. toUninstall = append(toUninstall, f.s)
  74. delete(api.filters, id)
  75. default:
  76. continue
  77. }
  78. }
  79. api.filtersMu.Unlock()
  80. // Unsubscribes are processed outside the lock to avoid the following scenario:
  81. // event loop attempts broadcasting events to still active filters while
  82. // Unsubscribe is waiting for it to process the uninstall request.
  83. for _, s := range toUninstall {
  84. s.Unsubscribe()
  85. }
  86. toUninstall = nil
  87. }
  88. }
  89. // NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
  90. // as transactions enter the pending state.
  91. //
  92. // It is part of the filter package because this filter can be used through the
  93. // `eth_getFilterChanges` polling method that is also used for log filters.
  94. func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID {
  95. var (
  96. pendingTxs = make(chan []common.Hash)
  97. pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
  98. )
  99. api.filtersMu.Lock()
  100. api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub}
  101. api.filtersMu.Unlock()
  102. go func() {
  103. for {
  104. select {
  105. case ph := <-pendingTxs:
  106. api.filtersMu.Lock()
  107. if f, found := api.filters[pendingTxSub.ID]; found {
  108. f.hashes = append(f.hashes, ph...)
  109. }
  110. api.filtersMu.Unlock()
  111. case <-pendingTxSub.Err():
  112. api.filtersMu.Lock()
  113. delete(api.filters, pendingTxSub.ID)
  114. api.filtersMu.Unlock()
  115. return
  116. }
  117. }
  118. }()
  119. return pendingTxSub.ID
  120. }
  121. // NewPendingTransactions creates a subscription that is triggered each time a transaction
  122. // enters the transaction pool and was signed from one of the transactions this nodes manages.
  123. func (api *FilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
  124. notifier, supported := rpc.NotifierFromContext(ctx)
  125. if !supported {
  126. return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
  127. }
  128. rpcSub := notifier.CreateSubscription()
  129. go func() {
  130. txHashes := make(chan []common.Hash, 128)
  131. pendingTxSub := api.events.SubscribePendingTxs(txHashes)
  132. for {
  133. select {
  134. case hashes := <-txHashes:
  135. // To keep the original behaviour, send a single tx hash in one notification.
  136. // TODO(rjl493456442) Send a batch of tx hashes in one notification
  137. for _, h := range hashes {
  138. notifier.Notify(rpcSub.ID, h)
  139. }
  140. case <-rpcSub.Err():
  141. pendingTxSub.Unsubscribe()
  142. return
  143. case <-notifier.Closed():
  144. pendingTxSub.Unsubscribe()
  145. return
  146. }
  147. }
  148. }()
  149. return rpcSub, nil
  150. }
  151. // NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
  152. // It is part of the filter package since polling goes with eth_getFilterChanges.
  153. func (api *FilterAPI) NewBlockFilter() rpc.ID {
  154. var (
  155. headers = make(chan *types.Header)
  156. headerSub = api.events.SubscribeNewHeads(headers)
  157. )
  158. api.filtersMu.Lock()
  159. api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: headerSub}
  160. api.filtersMu.Unlock()
  161. go func() {
  162. for {
  163. select {
  164. case h := <-headers:
  165. api.filtersMu.Lock()
  166. if f, found := api.filters[headerSub.ID]; found {
  167. f.hashes = append(f.hashes, h.Hash())
  168. }
  169. api.filtersMu.Unlock()
  170. case <-headerSub.Err():
  171. api.filtersMu.Lock()
  172. delete(api.filters, headerSub.ID)
  173. api.filtersMu.Unlock()
  174. return
  175. }
  176. }
  177. }()
  178. return headerSub.ID
  179. }
  180. // NewHeads send a notification each time a new (header) block is appended to the chain.
  181. func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
  182. notifier, supported := rpc.NotifierFromContext(ctx)
  183. if !supported {
  184. return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
  185. }
  186. rpcSub := notifier.CreateSubscription()
  187. go func() {
  188. headers := make(chan *types.Header)
  189. headersSub := api.events.SubscribeNewHeads(headers)
  190. for {
  191. select {
  192. case h := <-headers:
  193. notifier.Notify(rpcSub.ID, h)
  194. case <-rpcSub.Err():
  195. headersSub.Unsubscribe()
  196. return
  197. case <-notifier.Closed():
  198. headersSub.Unsubscribe()
  199. return
  200. }
  201. }
  202. }()
  203. return rpcSub, nil
  204. }
  205. // Logs creates a subscription that fires for all new log that match the given filter criteria.
  206. func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
  207. notifier, supported := rpc.NotifierFromContext(ctx)
  208. if !supported {
  209. return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
  210. }
  211. var (
  212. rpcSub = notifier.CreateSubscription()
  213. matchedLogs = make(chan []*types.Log)
  214. )
  215. logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs)
  216. if err != nil {
  217. return nil, err
  218. }
  219. go func() {
  220. for {
  221. select {
  222. case logs := <-matchedLogs:
  223. for _, log := range logs {
  224. log := log
  225. notifier.Notify(rpcSub.ID, &log)
  226. }
  227. case <-rpcSub.Err(): // client send an unsubscribe request
  228. logsSub.Unsubscribe()
  229. return
  230. case <-notifier.Closed(): // connection dropped
  231. logsSub.Unsubscribe()
  232. return
  233. }
  234. }
  235. }()
  236. return rpcSub, nil
  237. }
  238. // FilterCriteria represents a request to create a new filter.
  239. // Same as ethereum.FilterQuery but with UnmarshalJSON() method.
  240. type FilterCriteria ethereum.FilterQuery
  241. // NewFilter creates a new filter and returns the filter id. It can be
  242. // used to retrieve logs when the state changes. This method cannot be
  243. // used to fetch logs that are already stored in the state.
  244. //
  245. // Default criteria for the from and to block are "latest".
  246. // Using "latest" as block number will return logs for mined blocks.
  247. // Using "pending" as block number returns logs for not yet mined (pending) blocks.
  248. // In case logs are removed (chain reorg) previously returned logs are returned
  249. // again but with the removed property set to true.
  250. //
  251. // In case "fromBlock" > "toBlock" an error is returned.
  252. func (api *FilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
  253. logs := make(chan []*types.Log)
  254. logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs)
  255. if err != nil {
  256. return "", err
  257. }
  258. api.filtersMu.Lock()
  259. api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(api.timeout), logs: make([]*types.Log, 0), s: logsSub}
  260. api.filtersMu.Unlock()
  261. go func() {
  262. for {
  263. select {
  264. case l := <-logs:
  265. api.filtersMu.Lock()
  266. if f, found := api.filters[logsSub.ID]; found {
  267. f.logs = append(f.logs, l...)
  268. }
  269. api.filtersMu.Unlock()
  270. case <-logsSub.Err():
  271. api.filtersMu.Lock()
  272. delete(api.filters, logsSub.ID)
  273. api.filtersMu.Unlock()
  274. return
  275. }
  276. }
  277. }()
  278. return logsSub.ID, nil
  279. }
  280. // GetLogs returns logs matching the given argument that are stored within the state.
  281. func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
  282. var filter *Filter
  283. if crit.BlockHash != nil {
  284. // Block filter requested, construct a single-shot filter
  285. filter = api.sys.NewBlockFilter(*crit.BlockHash, crit.Addresses, crit.Topics)
  286. } else {
  287. // Convert the RPC block numbers into internal representations
  288. begin := rpc.LatestBlockNumber.Int64()
  289. if crit.FromBlock != nil {
  290. begin = crit.FromBlock.Int64()
  291. }
  292. end := rpc.LatestBlockNumber.Int64()
  293. if crit.ToBlock != nil {
  294. end = crit.ToBlock.Int64()
  295. }
  296. // Construct the range filter
  297. filter = api.sys.NewRangeFilter(begin, end, crit.Addresses, crit.Topics)
  298. }
  299. // Run the filter and return all the logs
  300. logs, err := filter.Logs(ctx)
  301. if err != nil {
  302. return nil, err
  303. }
  304. return returnLogs(logs), err
  305. }
  306. // UninstallFilter removes the filter with the given filter id.
  307. func (api *FilterAPI) UninstallFilter(id rpc.ID) bool {
  308. api.filtersMu.Lock()
  309. f, found := api.filters[id]
  310. if found {
  311. delete(api.filters, id)
  312. }
  313. api.filtersMu.Unlock()
  314. if found {
  315. f.s.Unsubscribe()
  316. }
  317. return found
  318. }
  319. // GetFilterLogs returns the logs for the filter with the given id.
  320. // If the filter could not be found an empty array of logs is returned.
  321. func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Log, error) {
  322. api.filtersMu.Lock()
  323. f, found := api.filters[id]
  324. api.filtersMu.Unlock()
  325. if !found || f.typ != LogsSubscription {
  326. return nil, fmt.Errorf("filter not found")
  327. }
  328. var filter *Filter
  329. if f.crit.BlockHash != nil {
  330. // Block filter requested, construct a single-shot filter
  331. filter = api.sys.NewBlockFilter(*f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
  332. } else {
  333. // Convert the RPC block numbers into internal representations
  334. begin := rpc.LatestBlockNumber.Int64()
  335. if f.crit.FromBlock != nil {
  336. begin = f.crit.FromBlock.Int64()
  337. }
  338. end := rpc.LatestBlockNumber.Int64()
  339. if f.crit.ToBlock != nil {
  340. end = f.crit.ToBlock.Int64()
  341. }
  342. // Construct the range filter
  343. filter = api.sys.NewRangeFilter(begin, end, f.crit.Addresses, f.crit.Topics)
  344. }
  345. // Run the filter and return all the logs
  346. logs, err := filter.Logs(ctx)
  347. if err != nil {
  348. return nil, err
  349. }
  350. return returnLogs(logs), nil
  351. }
  352. // GetFilterChanges returns the logs for the filter with the given id since
  353. // last time it was called. This can be used for polling.
  354. //
  355. // For pending transaction and block filters the result is []common.Hash.
  356. // (pending)Log filters return []Log.
  357. func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
  358. api.filtersMu.Lock()
  359. defer api.filtersMu.Unlock()
  360. if f, found := api.filters[id]; found {
  361. if !f.deadline.Stop() {
  362. // timer expired but filter is not yet removed in timeout loop
  363. // receive timer value and reset timer
  364. <-f.deadline.C
  365. }
  366. f.deadline.Reset(api.timeout)
  367. switch f.typ {
  368. case PendingTransactionsSubscription, BlocksSubscription:
  369. hashes := f.hashes
  370. f.hashes = nil
  371. return returnHashes(hashes), nil
  372. case LogsSubscription, MinedAndPendingLogsSubscription:
  373. logs := f.logs
  374. f.logs = nil
  375. return returnLogs(logs), nil
  376. }
  377. }
  378. return []interface{}{}, fmt.Errorf("filter not found")
  379. }
  380. // returnHashes is a helper that will return an empty hash array case the given hash array is nil,
  381. // otherwise the given hashes array is returned.
  382. func returnHashes(hashes []common.Hash) []common.Hash {
  383. if hashes == nil {
  384. return []common.Hash{}
  385. }
  386. return hashes
  387. }
  388. // returnLogs is a helper that will return an empty log array in case the given logs array is nil,
  389. // otherwise the given logs array is returned.
  390. func returnLogs(logs []*types.Log) []*types.Log {
  391. if logs == nil {
  392. return []*types.Log{}
  393. }
  394. return logs
  395. }
  396. // UnmarshalJSON sets *args fields with given data.
  397. func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
  398. type input struct {
  399. BlockHash *common.Hash `json:"blockHash"`
  400. FromBlock *rpc.BlockNumber `json:"fromBlock"`
  401. ToBlock *rpc.BlockNumber `json:"toBlock"`
  402. Addresses interface{} `json:"address"`
  403. Topics []interface{} `json:"topics"`
  404. }
  405. var raw input
  406. if err := json.Unmarshal(data, &raw); err != nil {
  407. return err
  408. }
  409. if raw.BlockHash != nil {
  410. if raw.FromBlock != nil || raw.ToBlock != nil {
  411. // BlockHash is mutually exclusive with FromBlock/ToBlock criteria
  412. return fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock, choose one or the other")
  413. }
  414. args.BlockHash = raw.BlockHash
  415. } else {
  416. if raw.FromBlock != nil {
  417. args.FromBlock = big.NewInt(raw.FromBlock.Int64())
  418. }
  419. if raw.ToBlock != nil {
  420. args.ToBlock = big.NewInt(raw.ToBlock.Int64())
  421. }
  422. }
  423. args.Addresses = []common.Address{}
  424. if raw.Addresses != nil {
  425. // raw.Address can contain a single address or an array of addresses
  426. switch rawAddr := raw.Addresses.(type) {
  427. case []interface{}:
  428. for i, addr := range rawAddr {
  429. if strAddr, ok := addr.(string); ok {
  430. addr, err := decodeAddress(strAddr)
  431. if err != nil {
  432. return fmt.Errorf("invalid address at index %d: %v", i, err)
  433. }
  434. args.Addresses = append(args.Addresses, addr)
  435. } else {
  436. return fmt.Errorf("non-string address at index %d", i)
  437. }
  438. }
  439. case string:
  440. addr, err := decodeAddress(rawAddr)
  441. if err != nil {
  442. return fmt.Errorf("invalid address: %v", err)
  443. }
  444. args.Addresses = []common.Address{addr}
  445. default:
  446. return errors.New("invalid addresses in query")
  447. }
  448. }
  449. // topics is an array consisting of strings and/or arrays of strings.
  450. // JSON null values are converted to common.Hash{} and ignored by the filter manager.
  451. if len(raw.Topics) > 0 {
  452. args.Topics = make([][]common.Hash, len(raw.Topics))
  453. for i, t := range raw.Topics {
  454. switch topic := t.(type) {
  455. case nil:
  456. // ignore topic when matching logs
  457. case string:
  458. // match specific topic
  459. top, err := decodeTopic(topic)
  460. if err != nil {
  461. return err
  462. }
  463. args.Topics[i] = []common.Hash{top}
  464. case []interface{}:
  465. // or case e.g. [null, "topic0", "topic1"]
  466. for _, rawTopic := range topic {
  467. if rawTopic == nil {
  468. // null component, match all
  469. args.Topics[i] = nil
  470. break
  471. }
  472. if topic, ok := rawTopic.(string); ok {
  473. parsed, err := decodeTopic(topic)
  474. if err != nil {
  475. return err
  476. }
  477. args.Topics[i] = append(args.Topics[i], parsed)
  478. } else {
  479. return fmt.Errorf("invalid topic(s)")
  480. }
  481. }
  482. default:
  483. return fmt.Errorf("invalid topic(s)")
  484. }
  485. }
  486. }
  487. return nil
  488. }
  489. func decodeAddress(s string) (common.Address, error) {
  490. b, err := hexutil.Decode(s)
  491. if err == nil && len(b) != common.AddressLength {
  492. err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for address", len(b), common.AddressLength)
  493. }
  494. return common.BytesToAddress(b), err
  495. }
  496. func decodeTopic(s string) (common.Hash, error) {
  497. b, err := hexutil.Decode(s)
  498. if err == nil && len(b) != common.HashLength {
  499. err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for topic", len(b), common.HashLength)
  500. }
  501. return common.BytesToHash(b), err
  502. }