filter_system.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package filters implements an ethereum filtering system for block,
  17. // transactions and log events.
  18. package filters
  19. import (
  20. "context"
  21. "fmt"
  22. "sync"
  23. "time"
  24. "github.com/ethereum/go-ethereum"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/core"
  27. "github.com/ethereum/go-ethereum/core/bloombits"
  28. "github.com/ethereum/go-ethereum/core/rawdb"
  29. "github.com/ethereum/go-ethereum/core/types"
  30. "github.com/ethereum/go-ethereum/ethdb"
  31. "github.com/ethereum/go-ethereum/event"
  32. "github.com/ethereum/go-ethereum/log"
  33. "github.com/ethereum/go-ethereum/rpc"
  34. lru "github.com/hashicorp/golang-lru"
  35. )
  36. // Config represents the configuration of the filter system.
  37. type Config struct {
  38. LogCacheSize int // maximum number of cached blocks (default: 32)
  39. Timeout time.Duration // how long filters stay active (default: 5min)
  40. }
  41. func (cfg Config) withDefaults() Config {
  42. if cfg.Timeout == 0 {
  43. cfg.Timeout = 5 * time.Minute
  44. }
  45. if cfg.LogCacheSize == 0 {
  46. cfg.LogCacheSize = 32
  47. }
  48. return cfg
  49. }
// Backend is the interface through which the filter system reaches chain
// data and subscribes to node events.
type Backend interface {
	// Chain data accessors.
	ChainDb() ethdb.Database
	HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
	HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
	GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
	GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
	PendingBlockAndReceipts() (*types.Block, types.Receipts)

	// Event feeds. Each call registers the given channel and returns the
	// subscription handle for it.
	SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
	SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
	SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
	SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
	SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription

	// Bloom-bits support.
	BloomStatus() (uint64, uint64)
	ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
}
// FilterSystem holds resources shared by all filters.
type FilterSystem struct {
	backend   Backend    // source of chain data and events
	logsCache *lru.Cache // block logs keyed by block hash, filled by cachedGetLogs
	cfg       *Config    // configuration with defaults already applied
}
  71. // NewFilterSystem creates a filter system.
  72. func NewFilterSystem(backend Backend, config Config) *FilterSystem {
  73. config = config.withDefaults()
  74. cache, err := lru.New(config.LogCacheSize)
  75. if err != nil {
  76. panic(err)
  77. }
  78. return &FilterSystem{
  79. backend: backend,
  80. logsCache: cache,
  81. cfg: &config,
  82. }
  83. }
  84. // cachedGetLogs loads block logs from the backend and caches the result.
  85. func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) {
  86. cached, ok := sys.logsCache.Get(blockHash)
  87. if ok {
  88. return cached.([][]*types.Log), nil
  89. }
  90. logs, err := sys.backend.GetLogs(ctx, blockHash, number)
  91. if err != nil {
  92. return nil, err
  93. }
  94. if logs == nil {
  95. return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", number, blockHash.TerminalString())
  96. }
  97. sys.logsCache.Add(blockHash, logs)
  98. return logs, nil
  99. }
// Type determines the kind of filter and is used to put the filter into
// the correct bucket when added.
type Type byte

const (
	// UnknownSubscription indicates an unknown subscription type. It is the
	// zero value of Type.
	UnknownSubscription Type = iota
	// LogsSubscription queries for new or removed (chain reorg) logs
	LogsSubscription
	// PendingLogsSubscription queries for logs in pending blocks
	PendingLogsSubscription
	// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
	MinedAndPendingLogsSubscription
	// PendingTransactionsSubscription queries tx hashes for pending
	// transactions entering the pending state
	PendingTransactionsSubscription
	// BlocksSubscription queries hashes for blocks that are imported
	BlocksSubscription
	// LastIndexSubscription keeps track of the last index. It is not a real
	// subscription type; it is the exclusive upper bound used when iterating
	// over all types.
	LastIndexSubscription
)
// Buffer sizes of the channels on which the event system receives backend
// events.
const (
	// txChanSize is the size of channel listening to NewTxsEvent.
	// The number is referenced from the size of tx pool.
	txChanSize = 4096
	// rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
	rmLogsChanSize = 10
	// logsChanSize is the size of channel listening to LogsEvent. It is also
	// used for the pending logs channel.
	logsChanSize = 10
	// chainEvChanSize is the size of channel listening to ChainEvent.
	chainEvChanSize = 10
)
// subscription is the internal record of one installed filter. Only the
// channel matching the subscription type is supplied by the caller; the
// other delivery channels are private placeholders.
type subscription struct {
	id        rpc.ID               // unique identifier, also the key in the filter index
	typ       Type                 // kind of events this subscription receives
	created   time.Time            // time at which the subscription was created
	logsCrit  ethereum.FilterQuery // filter criteria, used by log subscriptions
	logs      chan []*types.Log    // delivery channel for matching logs
	hashes    chan []common.Hash   // delivery channel for pending transaction hashes
	headers   chan *types.Header   // delivery channel for new block headers
	installed chan struct{}        // closed when the filter is installed
	err       chan error           // closed when the filter is uninstalled
}
// EventSystem creates subscriptions, processes events and broadcasts them to the
// subscriptions which match the subscription criteria.
type EventSystem struct {
	backend   Backend
	sys       *FilterSystem
	lightMode bool          // when set, logs are derived from chain events in handleChainEvent
	lastHead  *types.Header // last header seen by lightFilterNewHead

	// Subscriptions
	txsSub         event.Subscription // Subscription for new transaction event
	logsSub        event.Subscription // Subscription for new log event
	rmLogsSub      event.Subscription // Subscription for removed log event
	pendingLogsSub event.Subscription // Subscription for pending log event
	chainSub       event.Subscription // Subscription for new chain event

	// Channels
	install       chan *subscription         // install filter for event notification
	uninstall     chan *subscription         // remove filter for event notification
	txsCh         chan core.NewTxsEvent      // Channel to receive new transactions event
	logsCh        chan []*types.Log          // Channel to receive new log event
	pendingLogsCh chan []*types.Log          // Channel to receive pending log event
	rmLogsCh      chan core.RemovedLogsEvent // Channel to receive removed log event
	chainCh       chan core.ChainEvent       // Channel to receive new chain event
}
  164. // NewEventSystem creates a new manager that listens for event on the given mux,
  165. // parses and filters them. It uses the all map to retrieve filter changes. The
  166. // work loop holds its own index that is used to forward events to filters.
  167. //
  168. // The returned manager has a loop that needs to be stopped with the Stop function
  169. // or by stopping the given mux.
  170. func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem {
  171. m := &EventSystem{
  172. sys: sys,
  173. backend: sys.backend,
  174. lightMode: lightMode,
  175. install: make(chan *subscription),
  176. uninstall: make(chan *subscription),
  177. txsCh: make(chan core.NewTxsEvent, txChanSize),
  178. logsCh: make(chan []*types.Log, logsChanSize),
  179. rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
  180. pendingLogsCh: make(chan []*types.Log, logsChanSize),
  181. chainCh: make(chan core.ChainEvent, chainEvChanSize),
  182. }
  183. // Subscribe events
  184. m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
  185. m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
  186. m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
  187. m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
  188. m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
  189. // Make sure none of the subscriptions are empty
  190. if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
  191. log.Crit("Subscribe for event system failed")
  192. }
  193. go m.eventLoop()
  194. return m
  195. }
// Subscription is created when the client registers itself for a particular event.
type Subscription struct {
	ID        rpc.ID
	f         *subscription // internal filter record installed in the event loop
	es        *EventSystem  // event system the filter is installed in
	unsubOnce sync.Once     // ensures Unsubscribe does its work only once
}
  203. // Err returns a channel that is closed when unsubscribed.
  204. func (sub *Subscription) Err() <-chan error {
  205. return sub.f.err
  206. }
  207. // Unsubscribe uninstalls the subscription from the event broadcast loop.
  208. func (sub *Subscription) Unsubscribe() {
  209. sub.unsubOnce.Do(func() {
  210. uninstallLoop:
  211. for {
  212. // write uninstall request and consume logs/hashes. This prevents
  213. // the eventLoop broadcast method to deadlock when writing to the
  214. // filter event channel while the subscription loop is waiting for
  215. // this method to return (and thus not reading these events).
  216. select {
  217. case sub.es.uninstall <- sub.f:
  218. break uninstallLoop
  219. case <-sub.f.logs:
  220. case <-sub.f.hashes:
  221. case <-sub.f.headers:
  222. }
  223. }
  224. // wait for filter to be uninstalled in work loop before returning
  225. // this ensures that the manager won't use the event channel which
  226. // will probably be closed by the client asap after this method returns.
  227. <-sub.Err()
  228. })
  229. }
  230. // subscribe installs the subscription in the event broadcast loop.
  231. func (es *EventSystem) subscribe(sub *subscription) *Subscription {
  232. es.install <- sub
  233. <-sub.installed
  234. return &Subscription{ID: sub.id, f: sub, es: es}
  235. }
  236. // SubscribeLogs creates a subscription that will write all logs matching the
  237. // given criteria to the given logs channel. Default value for the from and to
  238. // block is "latest". If the fromBlock > toBlock an error is returned.
  239. func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) (*Subscription, error) {
  240. var from, to rpc.BlockNumber
  241. if crit.FromBlock == nil {
  242. from = rpc.LatestBlockNumber
  243. } else {
  244. from = rpc.BlockNumber(crit.FromBlock.Int64())
  245. }
  246. if crit.ToBlock == nil {
  247. to = rpc.LatestBlockNumber
  248. } else {
  249. to = rpc.BlockNumber(crit.ToBlock.Int64())
  250. }
  251. // only interested in pending logs
  252. if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber {
  253. return es.subscribePendingLogs(crit, logs), nil
  254. }
  255. // only interested in new mined logs
  256. if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber {
  257. return es.subscribeLogs(crit, logs), nil
  258. }
  259. // only interested in mined logs within a specific block range
  260. if from >= 0 && to >= 0 && to >= from {
  261. return es.subscribeLogs(crit, logs), nil
  262. }
  263. // interested in mined logs from a specific block number, new logs and pending logs
  264. if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber {
  265. return es.subscribeMinedPendingLogs(crit, logs), nil
  266. }
  267. // interested in logs from a specific block number to new mined blocks
  268. if from >= 0 && to == rpc.LatestBlockNumber {
  269. return es.subscribeLogs(crit, logs), nil
  270. }
  271. return nil, fmt.Errorf("invalid from and to block combination: from > to")
  272. }
  273. // subscribeMinedPendingLogs creates a subscription that returned mined and
  274. // pending logs that match the given criteria.
  275. func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
  276. sub := &subscription{
  277. id: rpc.NewID(),
  278. typ: MinedAndPendingLogsSubscription,
  279. logsCrit: crit,
  280. created: time.Now(),
  281. logs: logs,
  282. hashes: make(chan []common.Hash),
  283. headers: make(chan *types.Header),
  284. installed: make(chan struct{}),
  285. err: make(chan error),
  286. }
  287. return es.subscribe(sub)
  288. }
  289. // subscribeLogs creates a subscription that will write all logs matching the
  290. // given criteria to the given logs channel.
  291. func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
  292. sub := &subscription{
  293. id: rpc.NewID(),
  294. typ: LogsSubscription,
  295. logsCrit: crit,
  296. created: time.Now(),
  297. logs: logs,
  298. hashes: make(chan []common.Hash),
  299. headers: make(chan *types.Header),
  300. installed: make(chan struct{}),
  301. err: make(chan error),
  302. }
  303. return es.subscribe(sub)
  304. }
  305. // subscribePendingLogs creates a subscription that writes contract event logs for
  306. // transactions that enter the transaction pool.
  307. func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
  308. sub := &subscription{
  309. id: rpc.NewID(),
  310. typ: PendingLogsSubscription,
  311. logsCrit: crit,
  312. created: time.Now(),
  313. logs: logs,
  314. hashes: make(chan []common.Hash),
  315. headers: make(chan *types.Header),
  316. installed: make(chan struct{}),
  317. err: make(chan error),
  318. }
  319. return es.subscribe(sub)
  320. }
  321. // SubscribeNewHeads creates a subscription that writes the header of a block that is
  322. // imported in the chain.
  323. func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
  324. sub := &subscription{
  325. id: rpc.NewID(),
  326. typ: BlocksSubscription,
  327. created: time.Now(),
  328. logs: make(chan []*types.Log),
  329. hashes: make(chan []common.Hash),
  330. headers: headers,
  331. installed: make(chan struct{}),
  332. err: make(chan error),
  333. }
  334. return es.subscribe(sub)
  335. }
  336. // SubscribePendingTxs creates a subscription that writes transaction hashes for
  337. // transactions that enter the transaction pool.
  338. func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
  339. sub := &subscription{
  340. id: rpc.NewID(),
  341. typ: PendingTransactionsSubscription,
  342. created: time.Now(),
  343. logs: make(chan []*types.Log),
  344. hashes: hashes,
  345. headers: make(chan *types.Header),
  346. installed: make(chan struct{}),
  347. err: make(chan error),
  348. }
  349. return es.subscribe(sub)
  350. }
// filterIndex groups installed subscriptions first by subscription type and
// then by subscription ID.
type filterIndex map[Type]map[rpc.ID]*subscription
  352. func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
  353. if len(ev) == 0 {
  354. return
  355. }
  356. for _, f := range filters[LogsSubscription] {
  357. matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
  358. if len(matchedLogs) > 0 {
  359. f.logs <- matchedLogs
  360. }
  361. }
  362. }
  363. func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) {
  364. if len(ev) == 0 {
  365. return
  366. }
  367. for _, f := range filters[PendingLogsSubscription] {
  368. matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
  369. if len(matchedLogs) > 0 {
  370. f.logs <- matchedLogs
  371. }
  372. }
  373. }
  374. func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLogsEvent) {
  375. for _, f := range filters[LogsSubscription] {
  376. matchedLogs := filterLogs(ev.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
  377. if len(matchedLogs) > 0 {
  378. f.logs <- matchedLogs
  379. }
  380. }
  381. }
  382. func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
  383. hashes := make([]common.Hash, 0, len(ev.Txs))
  384. for _, tx := range ev.Txs {
  385. hashes = append(hashes, tx.Hash())
  386. }
  387. for _, f := range filters[PendingTransactionsSubscription] {
  388. f.hashes <- hashes
  389. }
  390. }
  391. func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
  392. for _, f := range filters[BlocksSubscription] {
  393. f.headers <- ev.Block.Header()
  394. }
  395. if es.lightMode && len(filters[LogsSubscription]) > 0 {
  396. es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
  397. for _, f := range filters[LogsSubscription] {
  398. if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
  399. f.logs <- matchedLogs
  400. }
  401. }
  402. })
  403. }
  404. }
  405. func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
  406. oldh := es.lastHead
  407. es.lastHead = newHeader
  408. if oldh == nil {
  409. return
  410. }
  411. newh := newHeader
  412. // find common ancestor, create list of rolled back and new block hashes
  413. var oldHeaders, newHeaders []*types.Header
  414. for oldh.Hash() != newh.Hash() {
  415. if oldh.Number.Uint64() >= newh.Number.Uint64() {
  416. oldHeaders = append(oldHeaders, oldh)
  417. oldh = rawdb.ReadHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1)
  418. }
  419. if oldh.Number.Uint64() < newh.Number.Uint64() {
  420. newHeaders = append(newHeaders, newh)
  421. newh = rawdb.ReadHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1)
  422. if newh == nil {
  423. // happens when CHT syncing, nothing to do
  424. newh = oldh
  425. }
  426. }
  427. }
  428. // roll back old blocks
  429. for _, h := range oldHeaders {
  430. callBack(h, true)
  431. }
  432. // check new blocks (array is in reverse order)
  433. for i := len(newHeaders) - 1; i >= 0; i-- {
  434. callBack(newHeaders[i], false)
  435. }
  436. }
  437. // filter logs of a single header in light client mode
  438. func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {
  439. if bloomFilter(header.Bloom, addresses, topics) {
  440. // Get the logs of the block
  441. ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
  442. defer cancel()
  443. logsList, err := es.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64())
  444. if err != nil {
  445. return nil
  446. }
  447. var unfiltered []*types.Log
  448. for _, logs := range logsList {
  449. for _, log := range logs {
  450. logcopy := *log
  451. logcopy.Removed = remove
  452. unfiltered = append(unfiltered, &logcopy)
  453. }
  454. }
  455. logs := filterLogs(unfiltered, nil, nil, addresses, topics)
  456. if len(logs) > 0 && logs[0].TxHash == (common.Hash{}) {
  457. // We have matching but non-derived logs
  458. receipts, err := es.backend.GetReceipts(ctx, header.Hash())
  459. if err != nil {
  460. return nil
  461. }
  462. unfiltered = unfiltered[:0]
  463. for _, receipt := range receipts {
  464. for _, log := range receipt.Logs {
  465. logcopy := *log
  466. logcopy.Removed = remove
  467. unfiltered = append(unfiltered, &logcopy)
  468. }
  469. }
  470. logs = filterLogs(unfiltered, nil, nil, addresses, topics)
  471. }
  472. return logs
  473. }
  474. return nil
  475. }
  476. // eventLoop (un)installs filters and processes mux events.
  477. func (es *EventSystem) eventLoop() {
  478. // Ensure all subscriptions get cleaned up
  479. defer func() {
  480. es.txsSub.Unsubscribe()
  481. es.logsSub.Unsubscribe()
  482. es.rmLogsSub.Unsubscribe()
  483. es.pendingLogsSub.Unsubscribe()
  484. es.chainSub.Unsubscribe()
  485. }()
  486. index := make(filterIndex)
  487. for i := UnknownSubscription; i < LastIndexSubscription; i++ {
  488. index[i] = make(map[rpc.ID]*subscription)
  489. }
  490. for {
  491. select {
  492. case ev := <-es.txsCh:
  493. es.handleTxsEvent(index, ev)
  494. case ev := <-es.logsCh:
  495. es.handleLogs(index, ev)
  496. case ev := <-es.rmLogsCh:
  497. es.handleRemovedLogs(index, ev)
  498. case ev := <-es.pendingLogsCh:
  499. es.handlePendingLogs(index, ev)
  500. case ev := <-es.chainCh:
  501. es.handleChainEvent(index, ev)
  502. case f := <-es.install:
  503. if f.typ == MinedAndPendingLogsSubscription {
  504. // the type are logs and pending logs subscriptions
  505. index[LogsSubscription][f.id] = f
  506. index[PendingLogsSubscription][f.id] = f
  507. } else {
  508. index[f.typ][f.id] = f
  509. }
  510. close(f.installed)
  511. case f := <-es.uninstall:
  512. if f.typ == MinedAndPendingLogsSubscription {
  513. // the type are logs and pending logs subscriptions
  514. delete(index[LogsSubscription], f.id)
  515. delete(index[PendingLogsSubscription], f.id)
  516. } else {
  517. delete(index[f.typ], f.id)
  518. }
  519. close(f.err)
  520. // System stopped
  521. case <-es.txsSub.Err():
  522. return
  523. case <-es.logsSub.Err():
  524. return
  525. case <-es.rmLogsSub.Err():
  526. return
  527. case <-es.chainSub.Err():
  528. return
  529. }
  530. }
  531. }