tx_pool.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925
  1. // Copyright 2014 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package core
  17. import (
  18. "errors"
  19. "fmt"
  20. "math/big"
  21. "sort"
  22. "sync"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/core/state"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/event"
  28. "github.com/ethereum/go-ethereum/log"
  29. "github.com/ethereum/go-ethereum/metrics"
  30. "github.com/ethereum/go-ethereum/params"
  31. "gopkg.in/karalabe/cookiejar.v2/collections/prque"
  32. )
  // Errors reported by the transaction pool when a transaction is rejected.
  var (
  	// ErrInvalidSender is returned when the sender cannot be recovered from
  	// the transaction with the pool's signer.
  	ErrInvalidSender = errors.New("invalid sender")

  	// ErrNonce is returned when the account nonce in the current state is
  	// already higher than the transaction's nonce.
  	ErrNonce = errors.New("nonce too low")

  	// ErrUnderpriced is returned when a transaction's gas price is below the
  	// pool's minimum, or when the pool is full and the transaction is priced
  	// below what is already pooled.
  	ErrUnderpriced = errors.New("transaction underpriced")

  	// ErrReplaceUnderpriced is returned when a transaction tries to replace
  	// one with the same nonce but the list refuses the insertion.
  	ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")

  	// ErrBalance is an insufficient-balance error; it is not referenced in
  	// the visible part of this file.
  	ErrBalance = errors.New("insufficient balance")

  	// ErrInsufficientFunds is returned when the sender's balance is lower
  	// than the total cost of the transaction.
  	ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value")

  	// ErrIntrinsicGas is returned when the transaction's gas is below its
  	// intrinsic gas requirement.
  	ErrIntrinsicGas = errors.New("intrinsic gas too low")

  	// ErrGasLimit is returned when the transaction's gas exceeds the current
  	// gas limit reported by the pool's gas limit callback.
  	ErrGasLimit = errors.New("exceeds block gas limit")

  	// ErrNegativeValue is returned when the transaction carries a negative
  	// value.
  	ErrNegativeValue = errors.New("negative value")
  )
  // Periods used by the pool's background loops.
  var (
  	// evictionInterval is the time interval to check for evictable transactions.
  	evictionInterval = time.Minute

  	// statsReportInterval is the time interval to report transaction pool stats.
  	statsReportInterval = 8 * time.Second
  )
  49. var (
  50. // Metrics for the pending pool
  51. pendingDiscardCounter = metrics.NewCounter("txpool/pending/discard")
  52. pendingReplaceCounter = metrics.NewCounter("txpool/pending/replace")
  53. pendingRLCounter = metrics.NewCounter("txpool/pending/ratelimit") // Dropped due to rate limiting
  54. pendingNofundsCounter = metrics.NewCounter("txpool/pending/nofunds") // Dropped due to out-of-funds
  55. // Metrics for the queued pool
  56. queuedDiscardCounter = metrics.NewCounter("txpool/queued/discard")
  57. queuedReplaceCounter = metrics.NewCounter("txpool/queued/replace")
  58. queuedRLCounter = metrics.NewCounter("txpool/queued/ratelimit") // Dropped due to rate limiting
  59. queuedNofundsCounter = metrics.NewCounter("txpool/queued/nofunds") // Dropped due to out-of-funds
  60. // General tx metrics
  61. invalidTxCounter = metrics.NewCounter("txpool/invalid")
  62. underpricedTxCounter = metrics.NewCounter("txpool/underpriced")
  63. )
  64. type stateFn func() (*state.StateDB, error)
  // TxPoolConfig are the configuration parameters of the transaction pool.
  type TxPoolConfig struct {
  	// PriceLimit is the minimum gas price to enforce for acceptance into the pool.
  	PriceLimit uint64
  	// PriceBump is the minimum price bump percentage required to replace an
  	// already existing transaction (same nonce).
  	PriceBump uint64

  	// AccountSlots is the minimum number of executable transaction slots
  	// guaranteed per account.
  	AccountSlots uint64
  	// GlobalSlots is the maximum number of executable transaction slots for
  	// all accounts.
  	GlobalSlots uint64
  	// AccountQueue is the maximum number of non-executable transaction slots
  	// permitted per account.
  	AccountQueue uint64
  	// GlobalQueue is the maximum number of non-executable transaction slots
  	// for all accounts.
  	GlobalQueue uint64

  	// Lifetime is the maximum amount of time non-executable transactions are
  	// queued.
  	Lifetime time.Duration
  }

  // DefaultTxPoolConfig contains the default configurations for the transaction
  // pool. sanitize falls back to its price settings when the supplied ones are
  // below one.
  var DefaultTxPoolConfig = TxPoolConfig{
  	// Pricing.
  	PriceLimit: 1,
  	PriceBump:  10,

  	// Capacity of the executable and non-executable sets.
  	AccountSlots: 16,
  	GlobalSlots:  4096,
  	AccountQueue: 64,
  	GlobalQueue:  1024,

  	// Queue retention.
  	Lifetime: 3 * time.Hour,
  }
  86. // sanitize checks the provided user configurations and changes anything that's
  87. // unreasonable or unworkable.
  88. func (config *TxPoolConfig) sanitize() TxPoolConfig {
  89. conf := *config
  90. if conf.PriceLimit < 1 {
  91. log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultTxPoolConfig.PriceLimit)
  92. conf.PriceLimit = DefaultTxPoolConfig.PriceLimit
  93. }
  94. if conf.PriceBump < 1 {
  95. log.Warn("Sanitizing invalid txpool price bump", "provided", conf.PriceBump, "updated", DefaultTxPoolConfig.PriceBump)
  96. conf.PriceBump = DefaultTxPoolConfig.PriceBump
  97. }
  98. return conf
  99. }
  100. // TxPool contains all currently known transactions. Transactions
  101. // enter the pool when they are received from the network or submitted
  102. // locally. They exit the pool when they are included in the blockchain.
  103. //
  104. // The pool separates processable transactions (which can be applied to the
  105. // current state) and future transactions. Transactions move between those
  106. // two states over time as they are received and processed.
  107. type TxPool struct {
  108. config TxPoolConfig
  109. chainconfig *params.ChainConfig
  110. currentState stateFn // The state function which will allow us to do some pre checks
  111. pendingState *state.ManagedState
  112. gasLimit func() *big.Int // The current gas limit function callback
  113. gasPrice *big.Int
  114. eventMux *event.TypeMux
  115. events *event.TypeMuxSubscription
  116. locals *txSet
  117. signer types.Signer
  118. mu sync.RWMutex
  119. pending map[common.Address]*txList // All currently processable transactions
  120. queue map[common.Address]*txList // Queued but non-processable transactions
  121. beats map[common.Address]time.Time // Last heartbeat from each known account
  122. all map[common.Hash]*types.Transaction // All transactions to allow lookups
  123. priced *txPricedList // All transactions sorted by price
  124. wg sync.WaitGroup // for shutdown sync
  125. quit chan struct{}
  126. homestead bool
  127. }
  128. // NewTxPool creates a new transaction pool to gather, sort and filter inbound
  129. // trnsactions from the network.
  130. func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
  131. // Sanitize the input to ensure no vulnerable gas prices are set
  132. config = (&config).sanitize()
  133. // Create the transaction pool with its initial settings
  134. pool := &TxPool{
  135. config: config,
  136. chainconfig: chainconfig,
  137. signer: types.NewEIP155Signer(chainconfig.ChainId),
  138. pending: make(map[common.Address]*txList),
  139. queue: make(map[common.Address]*txList),
  140. beats: make(map[common.Address]time.Time),
  141. all: make(map[common.Hash]*types.Transaction),
  142. eventMux: eventMux,
  143. currentState: currentStateFn,
  144. gasLimit: gasLimitFn,
  145. gasPrice: new(big.Int).SetUint64(config.PriceLimit),
  146. pendingState: nil,
  147. locals: newTxSet(),
  148. events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}),
  149. quit: make(chan struct{}),
  150. }
  151. pool.priced = newTxPricedList(&pool.all)
  152. pool.resetState()
  153. // Start the various events loops and return
  154. pool.wg.Add(2)
  155. go pool.eventLoop()
  156. go pool.expirationLoop()
  157. return pool
  158. }
  159. func (pool *TxPool) eventLoop() {
  160. defer pool.wg.Done()
  161. // Start a ticker and keep track of interesting pool stats to report
  162. var prevPending, prevQueued, prevStales int
  163. report := time.NewTicker(statsReportInterval)
  164. defer report.Stop()
  165. // Track chain events. When a chain events occurs (new chain canon block)
  166. // we need to know the new state. The new state will help us determine
  167. // the nonces in the managed state
  168. for {
  169. select {
  170. // Handle any events fired by the system
  171. case ev, ok := <-pool.events.Chan():
  172. if !ok {
  173. return
  174. }
  175. switch ev := ev.Data.(type) {
  176. case ChainHeadEvent:
  177. pool.mu.Lock()
  178. if ev.Block != nil {
  179. if pool.chainconfig.IsHomestead(ev.Block.Number()) {
  180. pool.homestead = true
  181. }
  182. }
  183. pool.resetState()
  184. pool.mu.Unlock()
  185. case RemovedTransactionEvent:
  186. pool.AddBatch(ev.Txs)
  187. }
  188. // Handle stats reporting ticks
  189. case <-report.C:
  190. pool.mu.RLock()
  191. pending, queued := pool.stats()
  192. stales := pool.priced.stales
  193. pool.mu.RUnlock()
  194. if pending != prevPending || queued != prevQueued || stales != prevStales {
  195. log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
  196. prevPending, prevQueued, prevStales = pending, queued, stales
  197. }
  198. }
  199. }
  200. }
  201. func (pool *TxPool) resetState() {
  202. currentState, err := pool.currentState()
  203. if err != nil {
  204. log.Error("Failed reset txpool state", "err", err)
  205. return
  206. }
  207. pool.pendingState = state.ManageState(currentState)
  208. // validate the pool of pending transactions, this will remove
  209. // any transactions that have been included in the block or
  210. // have been invalidated because of another transaction (e.g.
  211. // higher gas price)
  212. pool.demoteUnexecutables(currentState)
  213. // Update all accounts to the latest known pending nonce
  214. for addr, list := range pool.pending {
  215. txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
  216. pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
  217. }
  218. // Check the queue and move transactions over to the pending if possible
  219. // or remove those that have become invalid
  220. pool.promoteExecutables(currentState, nil)
  221. }
  222. // Stop terminates the transaction pool.
  223. func (pool *TxPool) Stop() {
  224. pool.events.Unsubscribe()
  225. close(pool.quit)
  226. pool.wg.Wait()
  227. log.Info("Transaction pool stopped")
  228. }
  229. // GasPrice returns the current gas price enforced by the transaction pool.
  230. func (pool *TxPool) GasPrice() *big.Int {
  231. pool.mu.RLock()
  232. defer pool.mu.RUnlock()
  233. return new(big.Int).Set(pool.gasPrice)
  234. }
  235. // SetGasPrice updates the minimum price required by the transaction pool for a
  236. // new transaction, and drops all transactions below this threshold.
  237. func (pool *TxPool) SetGasPrice(price *big.Int) {
  238. pool.mu.Lock()
  239. defer pool.mu.Unlock()
  240. pool.gasPrice = price
  241. for _, tx := range pool.priced.Cap(price, pool.locals) {
  242. pool.removeTx(tx.Hash())
  243. }
  244. log.Info("Transaction pool price threshold updated", "price", price)
  245. }
  246. // State returns the virtual managed state of the transaction pool.
  247. func (pool *TxPool) State() *state.ManagedState {
  248. pool.mu.RLock()
  249. defer pool.mu.RUnlock()
  250. return pool.pendingState
  251. }
  252. // Stats retrieves the current pool stats, namely the number of pending and the
  253. // number of queued (non-executable) transactions.
  254. func (pool *TxPool) Stats() (int, int) {
  255. pool.mu.RLock()
  256. defer pool.mu.RUnlock()
  257. return pool.stats()
  258. }
  259. // stats retrieves the current pool stats, namely the number of pending and the
  260. // number of queued (non-executable) transactions.
  261. func (pool *TxPool) stats() (int, int) {
  262. pending := 0
  263. for _, list := range pool.pending {
  264. pending += list.Len()
  265. }
  266. queued := 0
  267. for _, list := range pool.queue {
  268. queued += list.Len()
  269. }
  270. return pending, queued
  271. }
  272. // Content retrieves the data content of the transaction pool, returning all the
  273. // pending as well as queued transactions, grouped by account and sorted by nonce.
  274. func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
  275. pool.mu.RLock()
  276. defer pool.mu.RUnlock()
  277. pending := make(map[common.Address]types.Transactions)
  278. for addr, list := range pool.pending {
  279. pending[addr] = list.Flatten()
  280. }
  281. queued := make(map[common.Address]types.Transactions)
  282. for addr, list := range pool.queue {
  283. queued[addr] = list.Flatten()
  284. }
  285. return pending, queued
  286. }
  287. // Pending retrieves all currently processable transactions, groupped by origin
  288. // account and sorted by nonce. The returned transaction set is a copy and can be
  289. // freely modified by calling code.
  290. func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
  291. pool.mu.Lock()
  292. defer pool.mu.Unlock()
  293. pending := make(map[common.Address]types.Transactions)
  294. for addr, list := range pool.pending {
  295. pending[addr] = list.Flatten()
  296. }
  297. return pending, nil
  298. }
  299. // SetLocal marks a transaction as local, skipping gas price
  300. // check against local miner minimum in the future
  301. func (pool *TxPool) SetLocal(tx *types.Transaction) {
  302. pool.mu.Lock()
  303. defer pool.mu.Unlock()
  304. pool.locals.add(tx.Hash())
  305. }
  306. // validateTx checks whether a transaction is valid according
  307. // to the consensus rules.
  308. func (pool *TxPool) validateTx(tx *types.Transaction) error {
  309. local := pool.locals.contains(tx.Hash())
  310. // Drop transactions under our own minimal accepted gas price
  311. if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
  312. return ErrUnderpriced
  313. }
  314. currentState, err := pool.currentState()
  315. if err != nil {
  316. return err
  317. }
  318. from, err := types.Sender(pool.signer, tx)
  319. if err != nil {
  320. return ErrInvalidSender
  321. }
  322. // Last but not least check for nonce errors
  323. if currentState.GetNonce(from) > tx.Nonce() {
  324. return ErrNonce
  325. }
  326. // Check the transaction doesn't exceed the current
  327. // block limit gas.
  328. if pool.gasLimit().Cmp(tx.Gas()) < 0 {
  329. return ErrGasLimit
  330. }
  331. // Transactions can't be negative. This may never happen
  332. // using RLP decoded transactions but may occur if you create
  333. // a transaction using the RPC for example.
  334. if tx.Value().Sign() < 0 {
  335. return ErrNegativeValue
  336. }
  337. // Transactor should have enough funds to cover the costs
  338. // cost == V + GP * GL
  339. if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
  340. return ErrInsufficientFunds
  341. }
  342. intrGas := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
  343. if tx.Gas().Cmp(intrGas) < 0 {
  344. return ErrIntrinsicGas
  345. }
  346. return nil
  347. }
  348. // add validates a transaction and inserts it into the non-executable queue for
  349. // later pending promotion and execution. If the transaction is a replacement for
  350. // an already pending or queued one, it overwrites the previous and returns this
  351. // so outer code doesn't uselessly call promote.
  352. func (pool *TxPool) add(tx *types.Transaction) (bool, error) {
  353. // If the transaction is already known, discard it
  354. hash := tx.Hash()
  355. if pool.all[hash] != nil {
  356. log.Trace("Discarding already known transaction", "hash", hash)
  357. return false, fmt.Errorf("known transaction: %x", hash)
  358. }
  359. // If the transaction fails basic validation, discard it
  360. if err := pool.validateTx(tx); err != nil {
  361. log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
  362. invalidTxCounter.Inc(1)
  363. return false, err
  364. }
  365. // If the transaction pool is full, discard underpriced transactions
  366. if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
  367. // If the new transaction is underpriced, don't accept it
  368. if pool.priced.Underpriced(tx, pool.locals) {
  369. log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
  370. underpricedTxCounter.Inc(1)
  371. return false, ErrUnderpriced
  372. }
  373. // New transaction is better than our worse ones, make room for it
  374. drop := pool.priced.Discard(len(pool.all)-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
  375. for _, tx := range drop {
  376. log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
  377. underpricedTxCounter.Inc(1)
  378. pool.removeTx(tx.Hash())
  379. }
  380. }
  381. // If the transaction is replacing an already pending one, do directly
  382. from, _ := types.Sender(pool.signer, tx) // already validated
  383. if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
  384. // Nonce already pending, check if required price bump is met
  385. inserted, old := list.Add(tx, pool.config.PriceBump)
  386. if !inserted {
  387. pendingDiscardCounter.Inc(1)
  388. return false, ErrReplaceUnderpriced
  389. }
  390. // New transaction is better, replace old one
  391. if old != nil {
  392. delete(pool.all, old.Hash())
  393. pool.priced.Removed()
  394. pendingReplaceCounter.Inc(1)
  395. }
  396. pool.all[tx.Hash()] = tx
  397. pool.priced.Put(tx)
  398. log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
  399. return old != nil, nil
  400. }
  401. // New transaction isn't replacing a pending one, push into queue
  402. replace, err := pool.enqueueTx(hash, tx)
  403. if err != nil {
  404. return false, err
  405. }
  406. log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
  407. return replace, nil
  408. }
  409. // enqueueTx inserts a new transaction into the non-executable transaction queue.
  410. //
  411. // Note, this method assumes the pool lock is held!
  412. func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
  413. // Try to insert the transaction into the future queue
  414. from, _ := types.Sender(pool.signer, tx) // already validated
  415. if pool.queue[from] == nil {
  416. pool.queue[from] = newTxList(false)
  417. }
  418. inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
  419. if !inserted {
  420. // An older transaction was better, discard this
  421. queuedDiscardCounter.Inc(1)
  422. return false, ErrReplaceUnderpriced
  423. }
  424. // Discard any previous transaction and mark this
  425. if old != nil {
  426. delete(pool.all, old.Hash())
  427. pool.priced.Removed()
  428. queuedReplaceCounter.Inc(1)
  429. }
  430. pool.all[hash] = tx
  431. pool.priced.Put(tx)
  432. return old != nil, nil
  433. }
  434. // promoteTx adds a transaction to the pending (processable) list of transactions.
  435. //
  436. // Note, this method assumes the pool lock is held!
  437. func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) {
  438. // Try to insert the transaction into the pending queue
  439. if pool.pending[addr] == nil {
  440. pool.pending[addr] = newTxList(true)
  441. }
  442. list := pool.pending[addr]
  443. inserted, old := list.Add(tx, pool.config.PriceBump)
  444. if !inserted {
  445. // An older transaction was better, discard this
  446. delete(pool.all, hash)
  447. pool.priced.Removed()
  448. pendingDiscardCounter.Inc(1)
  449. return
  450. }
  451. // Otherwise discard any previous transaction and mark this
  452. if old != nil {
  453. delete(pool.all, old.Hash())
  454. pool.priced.Removed()
  455. pendingReplaceCounter.Inc(1)
  456. }
  457. // Failsafe to work around direct pending inserts (tests)
  458. if pool.all[hash] == nil {
  459. pool.all[hash] = tx
  460. pool.priced.Put(tx)
  461. }
  462. // Set the potentially new pending nonce and notify any subsystems of the new tx
  463. pool.beats[addr] = time.Now()
  464. pool.pendingState.SetNonce(addr, tx.Nonce()+1)
  465. go pool.eventMux.Post(TxPreEvent{tx})
  466. }
  467. // Add queues a single transaction in the pool if it is valid.
  468. func (pool *TxPool) Add(tx *types.Transaction) error {
  469. pool.mu.Lock()
  470. defer pool.mu.Unlock()
  471. // Try to inject the transaction and update any state
  472. replace, err := pool.add(tx)
  473. if err != nil {
  474. return err
  475. }
  476. // If we added a new transaction, run promotion checks and return
  477. if !replace {
  478. state, err := pool.currentState()
  479. if err != nil {
  480. return err
  481. }
  482. from, _ := types.Sender(pool.signer, tx) // already validated
  483. pool.promoteExecutables(state, []common.Address{from})
  484. }
  485. return nil
  486. }
  487. // AddBatch attempts to queue a batch of transactions.
  488. func (pool *TxPool) AddBatch(txs []*types.Transaction) error {
  489. pool.mu.Lock()
  490. defer pool.mu.Unlock()
  491. // Add the batch of transaction, tracking the accepted ones
  492. dirty := make(map[common.Address]struct{})
  493. for _, tx := range txs {
  494. if replace, err := pool.add(tx); err == nil {
  495. if !replace {
  496. from, _ := types.Sender(pool.signer, tx) // already validated
  497. dirty[from] = struct{}{}
  498. }
  499. }
  500. }
  501. // Only reprocess the internal state if something was actually added
  502. if len(dirty) > 0 {
  503. state, err := pool.currentState()
  504. if err != nil {
  505. return err
  506. }
  507. addrs := make([]common.Address, 0, len(dirty))
  508. for addr, _ := range dirty {
  509. addrs = append(addrs, addr)
  510. }
  511. pool.promoteExecutables(state, addrs)
  512. }
  513. return nil
  514. }
  515. // Get returns a transaction if it is contained in the pool
  516. // and nil otherwise.
  517. func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
  518. pool.mu.RLock()
  519. defer pool.mu.RUnlock()
  520. return pool.all[hash]
  521. }
  522. // Remove removes the transaction with the given hash from the pool.
  523. func (pool *TxPool) Remove(hash common.Hash) {
  524. pool.mu.Lock()
  525. defer pool.mu.Unlock()
  526. pool.removeTx(hash)
  527. }
  528. // RemoveBatch removes all given transactions from the pool.
  529. func (pool *TxPool) RemoveBatch(txs types.Transactions) {
  530. pool.mu.Lock()
  531. defer pool.mu.Unlock()
  532. for _, tx := range txs {
  533. pool.removeTx(tx.Hash())
  534. }
  535. }
  536. // removeTx removes a single transaction from the queue, moving all subsequent
  537. // transactions back to the future queue.
  538. func (pool *TxPool) removeTx(hash common.Hash) {
  539. // Fetch the transaction we wish to delete
  540. tx, ok := pool.all[hash]
  541. if !ok {
  542. return
  543. }
  544. addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
  545. // Remove it from the list of known transactions
  546. delete(pool.all, hash)
  547. pool.priced.Removed()
  548. // Remove the transaction from the pending lists and reset the account nonce
  549. if pending := pool.pending[addr]; pending != nil {
  550. if removed, invalids := pending.Remove(tx); removed {
  551. // If no more transactions are left, remove the list
  552. if pending.Empty() {
  553. delete(pool.pending, addr)
  554. delete(pool.beats, addr)
  555. } else {
  556. // Otherwise postpone any invalidated transactions
  557. for _, tx := range invalids {
  558. pool.enqueueTx(tx.Hash(), tx)
  559. }
  560. }
  561. // Update the account nonce if needed
  562. if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
  563. pool.pendingState.SetNonce(addr, tx.Nonce())
  564. }
  565. }
  566. }
  567. // Transaction is in the future queue
  568. if future := pool.queue[addr]; future != nil {
  569. future.Remove(tx)
  570. if future.Empty() {
  571. delete(pool.queue, addr)
  572. }
  573. }
  574. }
  575. // promoteExecutables moves transactions that have become processable from the
  576. // future queue to the set of pending transactions. During this process, all
  577. // invalidated transactions (low nonce, low balance) are deleted.
  578. func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) {
  579. gaslimit := pool.gasLimit()
  580. // Gather all the accounts potentially needing updates
  581. if accounts == nil {
  582. accounts = make([]common.Address, 0, len(pool.queue))
  583. for addr, _ := range pool.queue {
  584. accounts = append(accounts, addr)
  585. }
  586. }
  587. // Iterate over all accounts and promote any executable transactions
  588. queued := uint64(0)
  589. for _, addr := range accounts {
  590. list := pool.queue[addr]
  591. if list == nil {
  592. continue // Just in case someone calls with a non existing account
  593. }
  594. // Drop all transactions that are deemed too old (low nonce)
  595. for _, tx := range list.Forward(state.GetNonce(addr)) {
  596. hash := tx.Hash()
  597. log.Trace("Removed old queued transaction", "hash", hash)
  598. delete(pool.all, hash)
  599. pool.priced.Removed()
  600. }
  601. // Drop all transactions that are too costly (low balance or out of gas)
  602. drops, _ := list.Filter(state.GetBalance(addr), gaslimit)
  603. for _, tx := range drops {
  604. hash := tx.Hash()
  605. log.Trace("Removed unpayable queued transaction", "hash", hash)
  606. delete(pool.all, hash)
  607. pool.priced.Removed()
  608. queuedNofundsCounter.Inc(1)
  609. }
  610. // Gather all executable transactions and promote them
  611. for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
  612. hash := tx.Hash()
  613. log.Trace("Promoting queued transaction", "hash", hash)
  614. pool.promoteTx(addr, hash, tx)
  615. }
  616. // Drop all transactions over the allowed limit
  617. for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
  618. hash := tx.Hash()
  619. log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
  620. delete(pool.all, hash)
  621. pool.priced.Removed()
  622. queuedRLCounter.Inc(1)
  623. }
  624. queued += uint64(list.Len())
  625. // Delete the entire queue entry if it became empty.
  626. if list.Empty() {
  627. delete(pool.queue, addr)
  628. }
  629. }
  630. // If the pending limit is overflown, start equalizing allowances
  631. pending := uint64(0)
  632. for _, list := range pool.pending {
  633. pending += uint64(list.Len())
  634. }
  635. if pending > pool.config.GlobalSlots {
  636. pendingBeforeCap := pending
  637. // Assemble a spam order to penalize large transactors first
  638. spammers := prque.New()
  639. for addr, list := range pool.pending {
  640. // Only evict transactions from high rollers
  641. if uint64(list.Len()) > pool.config.AccountSlots {
  642. // Skip local accounts as pools should maintain backlogs for themselves
  643. for _, tx := range list.txs.items {
  644. if !pool.locals.contains(tx.Hash()) {
  645. spammers.Push(addr, float32(list.Len()))
  646. }
  647. break // Checking on transaction for locality is enough
  648. }
  649. }
  650. }
  651. // Gradually drop transactions from offenders
  652. offenders := []common.Address{}
  653. for pending > pool.config.GlobalSlots && !spammers.Empty() {
  654. // Retrieve the next offender if not local address
  655. offender, _ := spammers.Pop()
  656. offenders = append(offenders, offender.(common.Address))
  657. // Equalize balances until all the same or below threshold
  658. if len(offenders) > 1 {
  659. // Calculate the equalization threshold for all current offenders
  660. threshold := pool.pending[offender.(common.Address)].Len()
  661. // Iteratively reduce all offenders until below limit or threshold reached
  662. for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
  663. for i := 0; i < len(offenders)-1; i++ {
  664. list := pool.pending[offenders[i]]
  665. list.Cap(list.Len() - 1)
  666. pending--
  667. }
  668. }
  669. }
  670. }
  671. // If still above threshold, reduce to limit or min allowance
  672. if pending > pool.config.GlobalSlots && len(offenders) > 0 {
  673. for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
  674. for _, addr := range offenders {
  675. list := pool.pending[addr]
  676. list.Cap(list.Len() - 1)
  677. pending--
  678. }
  679. }
  680. }
  681. pendingRLCounter.Inc(int64(pendingBeforeCap - pending))
  682. }
  683. // If we've queued more transactions than the hard limit, drop oldest ones
  684. if queued > pool.config.GlobalQueue {
  685. // Sort all accounts with queued transactions by heartbeat
  686. addresses := make(addresssByHeartbeat, 0, len(pool.queue))
  687. for addr := range pool.queue {
  688. addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
  689. }
  690. sort.Sort(addresses)
  691. // Drop transactions until the total is below the limit
  692. for drop := queued - pool.config.GlobalQueue; drop > 0; {
  693. addr := addresses[len(addresses)-1]
  694. list := pool.queue[addr.address]
  695. addresses = addresses[:len(addresses)-1]
  696. // Drop all transactions if they are less than the overflow
  697. if size := uint64(list.Len()); size <= drop {
  698. for _, tx := range list.Flatten() {
  699. pool.removeTx(tx.Hash())
  700. }
  701. drop -= size
  702. queuedRLCounter.Inc(int64(size))
  703. continue
  704. }
  705. // Otherwise drop only last few transactions
  706. txs := list.Flatten()
  707. for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
  708. pool.removeTx(txs[i].Hash())
  709. drop--
  710. queuedRLCounter.Inc(1)
  711. }
  712. }
  713. }
  714. }
  715. // demoteUnexecutables removes invalid and processed transactions from the pools
  716. // executable/pending queue and any subsequent transactions that become unexecutable
  717. // are moved back into the future queue.
  718. func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
  719. gaslimit := pool.gasLimit()
  720. // Iterate over all accounts and demote any non-executable transactions
  721. for addr, list := range pool.pending {
  722. nonce := state.GetNonce(addr)
  723. // Drop all transactions that are deemed too old (low nonce)
  724. for _, tx := range list.Forward(nonce) {
  725. hash := tx.Hash()
  726. log.Trace("Removed old pending transaction", "hash", hash)
  727. delete(pool.all, hash)
  728. pool.priced.Removed()
  729. }
  730. // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
  731. drops, invalids := list.Filter(state.GetBalance(addr), gaslimit)
  732. for _, tx := range drops {
  733. hash := tx.Hash()
  734. log.Trace("Removed unpayable pending transaction", "hash", hash)
  735. delete(pool.all, hash)
  736. pool.priced.Removed()
  737. pendingNofundsCounter.Inc(1)
  738. }
  739. for _, tx := range invalids {
  740. hash := tx.Hash()
  741. log.Trace("Demoting pending transaction", "hash", hash)
  742. pool.enqueueTx(hash, tx)
  743. }
  744. // Delete the entire queue entry if it became empty.
  745. if list.Empty() {
  746. delete(pool.pending, addr)
  747. delete(pool.beats, addr)
  748. }
  749. }
  750. }
  751. // expirationLoop is a loop that periodically iterates over all accounts with
  752. // queued transactions and drop all that have been inactive for a prolonged amount
  753. // of time.
  754. func (pool *TxPool) expirationLoop() {
  755. defer pool.wg.Done()
  756. evict := time.NewTicker(evictionInterval)
  757. defer evict.Stop()
  758. for {
  759. select {
  760. case <-evict.C:
  761. pool.mu.Lock()
  762. for addr := range pool.queue {
  763. if time.Since(pool.beats[addr]) > pool.config.Lifetime {
  764. for _, tx := range pool.queue[addr].Flatten() {
  765. pool.removeTx(tx.Hash())
  766. }
  767. }
  768. }
  769. pool.mu.Unlock()
  770. case <-pool.quit:
  771. return
  772. }
  773. }
  774. }
  775. // addressByHeartbeat is an account address tagged with its last activity timestamp.
  776. type addressByHeartbeat struct {
  777. address common.Address
  778. heartbeat time.Time
  779. }
  780. type addresssByHeartbeat []addressByHeartbeat
  781. func (a addresssByHeartbeat) Len() int { return len(a) }
  782. func (a addresssByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) }
  783. func (a addresssByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  784. // txSet represents a set of transaction hashes in which entries
  785. // are automatically dropped after txSetDuration time
  786. type txSet struct {
  787. txMap map[common.Hash]struct{}
  788. txOrd map[uint64]txOrdType
  789. addPtr, delPtr uint64
  790. }
  // txSetDuration is the age beyond which an entry of a txSet is evicted.
  const txSetDuration = 2 * time.Hour
  792. // txOrdType represents an entry in the time-ordered list of transaction hashes
  793. type txOrdType struct {
  794. hash common.Hash
  795. time time.Time
  796. }
  797. // newTxSet creates a new transaction set
  798. func newTxSet() *txSet {
  799. return &txSet{
  800. txMap: make(map[common.Hash]struct{}),
  801. txOrd: make(map[uint64]txOrdType),
  802. }
  803. }
  804. // contains returns true if the set contains the given transaction hash
  805. // (not thread safe, should be called from a locked environment)
  806. func (ts *txSet) contains(hash common.Hash) bool {
  807. _, ok := ts.txMap[hash]
  808. return ok
  809. }
  810. // add adds a transaction hash to the set, then removes entries older than txSetDuration
  811. // (not thread safe, should be called from a locked environment)
  812. func (ts *txSet) add(hash common.Hash) {
  813. ts.txMap[hash] = struct{}{}
  814. now := time.Now()
  815. ts.txOrd[ts.addPtr] = txOrdType{hash: hash, time: now}
  816. ts.addPtr++
  817. delBefore := now.Add(-txSetDuration)
  818. for ts.delPtr < ts.addPtr && ts.txOrd[ts.delPtr].time.Before(delBefore) {
  819. delete(ts.txMap, ts.txOrd[ts.delPtr].hash)
  820. delete(ts.txOrd, ts.delPtr)
  821. ts.delPtr++
  822. }
  823. }