tx_pool.go 45 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308
  1. // Copyright 2014 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package core
  17. import (
  18. "errors"
  19. "fmt"
  20. "math"
  21. "math/big"
  22. "sort"
  23. "sync"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/common/prque"
  27. "github.com/ethereum/go-ethereum/core/state"
  28. "github.com/ethereum/go-ethereum/core/types"
  29. "github.com/ethereum/go-ethereum/event"
  30. "github.com/ethereum/go-ethereum/log"
  31. "github.com/ethereum/go-ethereum/metrics"
  32. "github.com/ethereum/go-ethereum/params"
  33. )
  34. const (
  35. // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
  36. chainHeadChanSize = 10
  37. )
  38. var (
  39. // ErrInvalidSender is returned if the transaction contains an invalid signature.
  40. ErrInvalidSender = errors.New("invalid sender")
  41. // ErrNonceTooLow is returned if the nonce of a transaction is lower than the
  42. // one present in the local chain.
  43. ErrNonceTooLow = errors.New("nonce too low")
  44. // ErrUnderpriced is returned if a transaction's gas price is below the minimum
  45. // configured for the transaction pool.
  46. ErrUnderpriced = errors.New("transaction underpriced")
  47. // ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
  48. // with a different one without the required price bump.
  49. ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
  50. // ErrInsufficientFunds is returned if the total cost of executing a transaction
  51. // is higher than the balance of the user's account.
  52. ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value")
  53. // ErrIntrinsicGas is returned if the transaction is specified to use less gas
  54. // than required to start the invocation.
  55. ErrIntrinsicGas = errors.New("intrinsic gas too low")
  56. // ErrGasLimit is returned if a transaction's requested gas limit exceeds the
  57. // maximum allowance of the current block.
  58. ErrGasLimit = errors.New("exceeds block gas limit")
  59. // ErrNegativeValue is a sanity error to ensure noone is able to specify a
  60. // transaction with a negative value.
  61. ErrNegativeValue = errors.New("negative value")
  62. // ErrOversizedData is returned if the input data of a transaction is greater
  63. // than some meaningful limit a user might use. This is not a consensus error
  64. // making the transaction invalid, rather a DOS protection.
  65. ErrOversizedData = errors.New("oversized data")
  66. )
  67. var (
  68. evictionInterval = time.Minute // Time interval to check for evictable transactions
  69. statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
  70. )
  71. var (
  72. // Metrics for the pending pool
  73. pendingDiscardCounter = metrics.NewRegisteredCounter("txpool/pending/discard", nil)
  74. pendingReplaceCounter = metrics.NewRegisteredCounter("txpool/pending/replace", nil)
  75. pendingRateLimitCounter = metrics.NewRegisteredCounter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting
  76. pendingNofundsCounter = metrics.NewRegisteredCounter("txpool/pending/nofunds", nil) // Dropped due to out-of-funds
  77. // Metrics for the queued pool
  78. queuedDiscardCounter = metrics.NewRegisteredCounter("txpool/queued/discard", nil)
  79. queuedReplaceCounter = metrics.NewRegisteredCounter("txpool/queued/replace", nil)
  80. queuedRateLimitCounter = metrics.NewRegisteredCounter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting
  81. queuedNofundsCounter = metrics.NewRegisteredCounter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds
  82. // General tx metrics
  83. invalidTxCounter = metrics.NewRegisteredCounter("txpool/invalid", nil)
  84. underpricedTxCounter = metrics.NewRegisteredCounter("txpool/underpriced", nil)
  85. )
  86. // TxStatus is the current status of a transaction as seen by the pool.
  87. type TxStatus uint
  88. const (
  89. TxStatusUnknown TxStatus = iota
  90. TxStatusQueued
  91. TxStatusPending
  92. TxStatusIncluded
  93. )
  94. // blockChain provides the state of blockchain and current gas limit to do
  95. // some pre checks in tx pool and event subscribers.
  96. type blockChain interface {
  97. CurrentBlock() *types.Block
  98. GetBlock(hash common.Hash, number uint64) *types.Block
  99. StateAt(root common.Hash) (*state.StateDB, error)
  100. SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
  101. }
  102. // TxPoolConfig are the configuration parameters of the transaction pool.
  103. type TxPoolConfig struct {
  104. Locals []common.Address // Addresses that should be treated by default as local
  105. NoLocals bool // Whether local transaction handling should be disabled
  106. Journal string // Journal of local transactions to survive node restarts
  107. Rejournal time.Duration // Time interval to regenerate the local transaction journal
  108. PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
  109. PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)
  110. AccountSlots uint64 // Number of executable transaction slots guaranteed per account
  111. GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts
  112. AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
  113. GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts
  114. Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
  115. }
  116. // DefaultTxPoolConfig contains the default configurations for the transaction
  117. // pool.
  118. var DefaultTxPoolConfig = TxPoolConfig{
  119. Journal: "transactions.rlp",
  120. Rejournal: time.Hour,
  121. PriceLimit: 1,
  122. PriceBump: 10,
  123. AccountSlots: 16,
  124. GlobalSlots: 4096,
  125. AccountQueue: 64,
  126. GlobalQueue: 1024,
  127. Lifetime: 3 * time.Hour,
  128. }
  129. // sanitize checks the provided user configurations and changes anything that's
  130. // unreasonable or unworkable.
  131. func (config *TxPoolConfig) sanitize() TxPoolConfig {
  132. conf := *config
  133. if conf.Rejournal < time.Second {
  134. log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second)
  135. conf.Rejournal = time.Second
  136. }
  137. if conf.PriceLimit < 1 {
  138. log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultTxPoolConfig.PriceLimit)
  139. conf.PriceLimit = DefaultTxPoolConfig.PriceLimit
  140. }
  141. if conf.PriceBump < 1 {
  142. log.Warn("Sanitizing invalid txpool price bump", "provided", conf.PriceBump, "updated", DefaultTxPoolConfig.PriceBump)
  143. conf.PriceBump = DefaultTxPoolConfig.PriceBump
  144. }
  145. if conf.AccountSlots < 1 {
  146. log.Warn("Sanitizing invalid txpool account slots", "provided", conf.AccountSlots, "updated", DefaultTxPoolConfig.AccountSlots)
  147. conf.AccountSlots = DefaultTxPoolConfig.AccountSlots
  148. }
  149. if conf.GlobalSlots < 1 {
  150. log.Warn("Sanitizing invalid txpool global slots", "provided", conf.GlobalSlots, "updated", DefaultTxPoolConfig.GlobalSlots)
  151. conf.GlobalSlots = DefaultTxPoolConfig.GlobalSlots
  152. }
  153. if conf.AccountQueue < 1 {
  154. log.Warn("Sanitizing invalid txpool account queue", "provided", conf.AccountQueue, "updated", DefaultTxPoolConfig.AccountQueue)
  155. conf.AccountQueue = DefaultTxPoolConfig.AccountQueue
  156. }
  157. if conf.GlobalQueue < 1 {
  158. log.Warn("Sanitizing invalid txpool global queue", "provided", conf.GlobalQueue, "updated", DefaultTxPoolConfig.GlobalQueue)
  159. conf.GlobalQueue = DefaultTxPoolConfig.GlobalQueue
  160. }
  161. if conf.Lifetime < 1 {
  162. log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime)
  163. conf.Lifetime = DefaultTxPoolConfig.Lifetime
  164. }
  165. return conf
  166. }
  167. // TxPool contains all currently known transactions. Transactions
  168. // enter the pool when they are received from the network or submitted
  169. // locally. They exit the pool when they are included in the blockchain.
  170. //
  171. // The pool separates processable transactions (which can be applied to the
  172. // current state) and future transactions. Transactions move between those
  173. // two states over time as they are received and processed.
  174. type TxPool struct {
  175. config TxPoolConfig
  176. chainconfig *params.ChainConfig
  177. chain blockChain
  178. gasPrice *big.Int
  179. txFeed event.Feed
  180. scope event.SubscriptionScope
  181. chainHeadCh chan ChainHeadEvent
  182. chainHeadSub event.Subscription
  183. signer types.Signer
  184. mu sync.RWMutex
  185. currentState *state.StateDB // Current state in the blockchain head
  186. pendingState *state.ManagedState // Pending state tracking virtual nonces
  187. currentMaxGas uint64 // Current gas limit for transaction caps
  188. locals *accountSet // Set of local transaction to exempt from eviction rules
  189. journal *txJournal // Journal of local transaction to back up to disk
  190. pending map[common.Address]*txList // All currently processable transactions
  191. queue map[common.Address]*txList // Queued but non-processable transactions
  192. beats map[common.Address]time.Time // Last heartbeat from each known account
  193. all *txLookup // All transactions to allow lookups
  194. priced *txPricedList // All transactions sorted by price
  195. wg sync.WaitGroup // for shutdown sync
  196. homestead bool
  197. }
  198. // NewTxPool creates a new transaction pool to gather, sort and filter inbound
  199. // transactions from the network.
  200. func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
  201. // Sanitize the input to ensure no vulnerable gas prices are set
  202. config = (&config).sanitize()
  203. // Create the transaction pool with its initial settings
  204. pool := &TxPool{
  205. config: config,
  206. chainconfig: chainconfig,
  207. chain: chain,
  208. signer: types.NewEIP155Signer(chainconfig.ChainID),
  209. pending: make(map[common.Address]*txList),
  210. queue: make(map[common.Address]*txList),
  211. beats: make(map[common.Address]time.Time),
  212. all: newTxLookup(),
  213. chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
  214. gasPrice: new(big.Int).SetUint64(config.PriceLimit),
  215. }
  216. pool.locals = newAccountSet(pool.signer)
  217. for _, addr := range config.Locals {
  218. log.Info("Setting new local account", "address", addr)
  219. pool.locals.add(addr)
  220. }
  221. pool.priced = newTxPricedList(pool.all)
  222. pool.reset(nil, chain.CurrentBlock().Header())
  223. // If local transactions and journaling is enabled, load from disk
  224. if !config.NoLocals && config.Journal != "" {
  225. pool.journal = newTxJournal(config.Journal)
  226. if err := pool.journal.load(pool.AddLocals); err != nil {
  227. log.Warn("Failed to load transaction journal", "err", err)
  228. }
  229. if err := pool.journal.rotate(pool.local()); err != nil {
  230. log.Warn("Failed to rotate transaction journal", "err", err)
  231. }
  232. }
  233. // Subscribe events from blockchain
  234. pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
  235. // Start the event loop and return
  236. pool.wg.Add(1)
  237. go pool.loop()
  238. return pool
  239. }
  240. // loop is the transaction pool's main event loop, waiting for and reacting to
  241. // outside blockchain events as well as for various reporting and transaction
  242. // eviction events.
  243. func (pool *TxPool) loop() {
  244. defer pool.wg.Done()
  245. // Start the stats reporting and transaction eviction tickers
  246. var prevPending, prevQueued, prevStales int
  247. report := time.NewTicker(statsReportInterval)
  248. defer report.Stop()
  249. evict := time.NewTicker(evictionInterval)
  250. defer evict.Stop()
  251. journal := time.NewTicker(pool.config.Rejournal)
  252. defer journal.Stop()
  253. // Track the previous head headers for transaction reorgs
  254. head := pool.chain.CurrentBlock()
  255. // Keep waiting for and reacting to the various events
  256. for {
  257. select {
  258. // Handle ChainHeadEvent
  259. case ev := <-pool.chainHeadCh:
  260. if ev.Block != nil {
  261. pool.mu.Lock()
  262. if pool.chainconfig.IsHomestead(ev.Block.Number()) {
  263. pool.homestead = true
  264. }
  265. pool.reset(head.Header(), ev.Block.Header())
  266. head = ev.Block
  267. pool.mu.Unlock()
  268. }
  269. // Be unsubscribed due to system stopped
  270. case <-pool.chainHeadSub.Err():
  271. return
  272. // Handle stats reporting ticks
  273. case <-report.C:
  274. pool.mu.RLock()
  275. pending, queued := pool.stats()
  276. stales := pool.priced.stales
  277. pool.mu.RUnlock()
  278. if pending != prevPending || queued != prevQueued || stales != prevStales {
  279. log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
  280. prevPending, prevQueued, prevStales = pending, queued, stales
  281. }
  282. // Handle inactive account transaction eviction
  283. case <-evict.C:
  284. pool.mu.Lock()
  285. for addr := range pool.queue {
  286. // Skip local transactions from the eviction mechanism
  287. if pool.locals.contains(addr) {
  288. continue
  289. }
  290. // Any non-locals old enough should be removed
  291. if time.Since(pool.beats[addr]) > pool.config.Lifetime {
  292. for _, tx := range pool.queue[addr].Flatten() {
  293. pool.removeTx(tx.Hash(), true)
  294. }
  295. }
  296. }
  297. pool.mu.Unlock()
  298. // Handle local transaction journal rotation
  299. case <-journal.C:
  300. if pool.journal != nil {
  301. pool.mu.Lock()
  302. if err := pool.journal.rotate(pool.local()); err != nil {
  303. log.Warn("Failed to rotate local tx journal", "err", err)
  304. }
  305. pool.mu.Unlock()
  306. }
  307. }
  308. }
  309. }
  310. // lockedReset is a wrapper around reset to allow calling it in a thread safe
  311. // manner. This method is only ever used in the tester!
  312. func (pool *TxPool) lockedReset(oldHead, newHead *types.Header) {
  313. pool.mu.Lock()
  314. defer pool.mu.Unlock()
  315. pool.reset(oldHead, newHead)
  316. }
  317. // reset retrieves the current state of the blockchain and ensures the content
  318. // of the transaction pool is valid with regard to the chain state.
  319. func (pool *TxPool) reset(oldHead, newHead *types.Header) {
  320. // If we're reorging an old state, reinject all dropped transactions
  321. var reinject types.Transactions
  322. if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
  323. // If the reorg is too deep, avoid doing it (will happen during fast sync)
  324. oldNum := oldHead.Number.Uint64()
  325. newNum := newHead.Number.Uint64()
  326. if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
  327. log.Debug("Skipping deep transaction reorg", "depth", depth)
  328. } else {
  329. // Reorg seems shallow enough to pull in all transactions into memory
  330. var discarded, included types.Transactions
  331. var (
  332. rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
  333. add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
  334. )
  335. if rem == nil {
  336. // This can happen if a setHead is performed, where we simply discard the old
  337. // head from the chain.
  338. // If that is the case, we don't have the lost transactions any more, and
  339. // there's nothing to add
  340. if newNum < oldNum {
  341. // If the reorg ended up on a lower number, it's indicative of setHead being the cause
  342. log.Debug("Skipping transaction reset caused by setHead",
  343. "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
  344. } else {
  345. // If we reorged to a same or higher number, then it's not a case of setHead
  346. log.Warn("Transaction pool reset with missing oldhead",
  347. "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
  348. }
  349. return
  350. }
  351. for rem.NumberU64() > add.NumberU64() {
  352. discarded = append(discarded, rem.Transactions()...)
  353. if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
  354. log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
  355. return
  356. }
  357. }
  358. for add.NumberU64() > rem.NumberU64() {
  359. included = append(included, add.Transactions()...)
  360. if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
  361. log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
  362. return
  363. }
  364. }
  365. for rem.Hash() != add.Hash() {
  366. discarded = append(discarded, rem.Transactions()...)
  367. if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
  368. log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
  369. return
  370. }
  371. included = append(included, add.Transactions()...)
  372. if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
  373. log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
  374. return
  375. }
  376. }
  377. reinject = types.TxDifference(discarded, included)
  378. }
  379. }
  380. // Initialize the internal state to the current head
  381. if newHead == nil {
  382. newHead = pool.chain.CurrentBlock().Header() // Special case during testing
  383. }
  384. statedb, err := pool.chain.StateAt(newHead.Root)
  385. if err != nil {
  386. log.Error("Failed to reset txpool state", "err", err)
  387. return
  388. }
  389. pool.currentState = statedb
  390. pool.pendingState = state.ManageState(statedb)
  391. pool.currentMaxGas = newHead.GasLimit
  392. // Inject any transactions discarded due to reorgs
  393. log.Debug("Reinjecting stale transactions", "count", len(reinject))
  394. senderCacher.recover(pool.signer, reinject)
  395. pool.addTxsLocked(reinject, false)
  396. // validate the pool of pending transactions, this will remove
  397. // any transactions that have been included in the block or
  398. // have been invalidated because of another transaction (e.g.
  399. // higher gas price)
  400. pool.demoteUnexecutables()
  401. // Update all accounts to the latest known pending nonce
  402. for addr, list := range pool.pending {
  403. txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
  404. pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
  405. }
  406. // Check the queue and move transactions over to the pending if possible
  407. // or remove those that have become invalid
  408. pool.promoteExecutables(nil)
  409. }
  410. // Stop terminates the transaction pool.
  411. func (pool *TxPool) Stop() {
  412. // Unsubscribe all subscriptions registered from txpool
  413. pool.scope.Close()
  414. // Unsubscribe subscriptions registered from blockchain
  415. pool.chainHeadSub.Unsubscribe()
  416. pool.wg.Wait()
  417. if pool.journal != nil {
  418. pool.journal.close()
  419. }
  420. log.Info("Transaction pool stopped")
  421. }
  422. // SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
  423. // starts sending event to the given channel.
  424. func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
  425. return pool.scope.Track(pool.txFeed.Subscribe(ch))
  426. }
  427. // GasPrice returns the current gas price enforced by the transaction pool.
  428. func (pool *TxPool) GasPrice() *big.Int {
  429. pool.mu.RLock()
  430. defer pool.mu.RUnlock()
  431. return new(big.Int).Set(pool.gasPrice)
  432. }
  433. // SetGasPrice updates the minimum price required by the transaction pool for a
  434. // new transaction, and drops all transactions below this threshold.
  435. func (pool *TxPool) SetGasPrice(price *big.Int) {
  436. pool.mu.Lock()
  437. defer pool.mu.Unlock()
  438. pool.gasPrice = price
  439. for _, tx := range pool.priced.Cap(price, pool.locals) {
  440. pool.removeTx(tx.Hash(), false)
  441. }
  442. log.Info("Transaction pool price threshold updated", "price", price)
  443. }
  444. // State returns the virtual managed state of the transaction pool.
  445. func (pool *TxPool) State() *state.ManagedState {
  446. pool.mu.RLock()
  447. defer pool.mu.RUnlock()
  448. return pool.pendingState
  449. }
  450. // Stats retrieves the current pool stats, namely the number of pending and the
  451. // number of queued (non-executable) transactions.
  452. func (pool *TxPool) Stats() (int, int) {
  453. pool.mu.RLock()
  454. defer pool.mu.RUnlock()
  455. return pool.stats()
  456. }
  457. // stats retrieves the current pool stats, namely the number of pending and the
  458. // number of queued (non-executable) transactions.
  459. func (pool *TxPool) stats() (int, int) {
  460. pending := 0
  461. for _, list := range pool.pending {
  462. pending += list.Len()
  463. }
  464. queued := 0
  465. for _, list := range pool.queue {
  466. queued += list.Len()
  467. }
  468. return pending, queued
  469. }
  470. // Content retrieves the data content of the transaction pool, returning all the
  471. // pending as well as queued transactions, grouped by account and sorted by nonce.
  472. func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
  473. pool.mu.Lock()
  474. defer pool.mu.Unlock()
  475. pending := make(map[common.Address]types.Transactions)
  476. for addr, list := range pool.pending {
  477. pending[addr] = list.Flatten()
  478. }
  479. queued := make(map[common.Address]types.Transactions)
  480. for addr, list := range pool.queue {
  481. queued[addr] = list.Flatten()
  482. }
  483. return pending, queued
  484. }
  485. // Pending retrieves all currently processable transactions, grouped by origin
  486. // account and sorted by nonce. The returned transaction set is a copy and can be
  487. // freely modified by calling code.
  488. func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
  489. pool.mu.Lock()
  490. defer pool.mu.Unlock()
  491. pending := make(map[common.Address]types.Transactions)
  492. for addr, list := range pool.pending {
  493. pending[addr] = list.Flatten()
  494. }
  495. return pending, nil
  496. }
  497. // Locals retrieves the accounts currently considered local by the pool.
  498. func (pool *TxPool) Locals() []common.Address {
  499. pool.mu.Lock()
  500. defer pool.mu.Unlock()
  501. return pool.locals.flatten()
  502. }
  503. // local retrieves all currently known local transactions, grouped by origin
  504. // account and sorted by nonce. The returned transaction set is a copy and can be
  505. // freely modified by calling code.
  506. func (pool *TxPool) local() map[common.Address]types.Transactions {
  507. txs := make(map[common.Address]types.Transactions)
  508. for addr := range pool.locals.accounts {
  509. if pending := pool.pending[addr]; pending != nil {
  510. txs[addr] = append(txs[addr], pending.Flatten()...)
  511. }
  512. if queued := pool.queue[addr]; queued != nil {
  513. txs[addr] = append(txs[addr], queued.Flatten()...)
  514. }
  515. }
  516. return txs
  517. }
  518. // validateTx checks whether a transaction is valid according to the consensus
  519. // rules and adheres to some heuristic limits of the local node (price and size).
  520. func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
  521. // Heuristic limit, reject transactions over 32KB to prevent DOS attacks
  522. if tx.Size() > 32*1024 {
  523. return ErrOversizedData
  524. }
  525. // Transactions can't be negative. This may never happen using RLP decoded
  526. // transactions but may occur if you create a transaction using the RPC.
  527. if tx.Value().Sign() < 0 {
  528. return ErrNegativeValue
  529. }
  530. // Ensure the transaction doesn't exceed the current block limit gas.
  531. if pool.currentMaxGas < tx.Gas() {
  532. return ErrGasLimit
  533. }
  534. // Make sure the transaction is signed properly
  535. from, err := types.Sender(pool.signer, tx)
  536. if err != nil {
  537. return ErrInvalidSender
  538. }
  539. // Drop non-local transactions under our own minimal accepted gas price
  540. local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
  541. if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
  542. return ErrUnderpriced
  543. }
  544. // Ensure the transaction adheres to nonce ordering
  545. if pool.currentState.GetNonce(from) > tx.Nonce() {
  546. return ErrNonceTooLow
  547. }
  548. // Transactor should have enough funds to cover the costs
  549. // cost == V + GP * GL
  550. if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
  551. return ErrInsufficientFunds
  552. }
  553. intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
  554. if err != nil {
  555. return err
  556. }
  557. if tx.Gas() < intrGas {
  558. return ErrIntrinsicGas
  559. }
  560. return nil
  561. }
  562. // add validates a transaction and inserts it into the non-executable queue for
  563. // later pending promotion and execution. If the transaction is a replacement for
  564. // an already pending or queued one, it overwrites the previous and returns this
  565. // so outer code doesn't uselessly call promote.
  566. //
  567. // If a newly added transaction is marked as local, its sending account will be
  568. // whitelisted, preventing any associated transaction from being dropped out of
  569. // the pool due to pricing constraints.
  570. func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
  571. // If the transaction is already known, discard it
  572. hash := tx.Hash()
  573. if pool.all.Get(hash) != nil {
  574. log.Trace("Discarding already known transaction", "hash", hash)
  575. return false, fmt.Errorf("known transaction: %x", hash)
  576. }
  577. // If the transaction fails basic validation, discard it
  578. if err := pool.validateTx(tx, local); err != nil {
  579. log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
  580. invalidTxCounter.Inc(1)
  581. return false, err
  582. }
  583. // If the transaction pool is full, discard underpriced transactions
  584. if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
  585. // If the new transaction is underpriced, don't accept it
  586. if !local && pool.priced.Underpriced(tx, pool.locals) {
  587. log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
  588. underpricedTxCounter.Inc(1)
  589. return false, ErrUnderpriced
  590. }
  591. // New transaction is better than our worse ones, make room for it
  592. drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
  593. for _, tx := range drop {
  594. log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
  595. underpricedTxCounter.Inc(1)
  596. pool.removeTx(tx.Hash(), false)
  597. }
  598. }
  599. // If the transaction is replacing an already pending one, do directly
  600. from, _ := types.Sender(pool.signer, tx) // already validated
  601. if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
  602. // Nonce already pending, check if required price bump is met
  603. inserted, old := list.Add(tx, pool.config.PriceBump)
  604. if !inserted {
  605. pendingDiscardCounter.Inc(1)
  606. return false, ErrReplaceUnderpriced
  607. }
  608. // New transaction is better, replace old one
  609. if old != nil {
  610. pool.all.Remove(old.Hash())
  611. pool.priced.Removed()
  612. pendingReplaceCounter.Inc(1)
  613. }
  614. pool.all.Add(tx)
  615. pool.priced.Put(tx)
  616. pool.journalTx(from, tx)
  617. log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
  618. // We've directly injected a replacement transaction, notify subsystems
  619. go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})
  620. return old != nil, nil
  621. }
  622. // New transaction isn't replacing a pending one, push into queue
  623. replace, err := pool.enqueueTx(hash, tx)
  624. if err != nil {
  625. return false, err
  626. }
  627. // Mark local addresses and journal local transactions
  628. if local {
  629. if !pool.locals.contains(from) {
  630. log.Info("Setting new local account", "address", from)
  631. pool.locals.add(from)
  632. }
  633. }
  634. pool.journalTx(from, tx)
  635. log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
  636. return replace, nil
  637. }
  638. // enqueueTx inserts a new transaction into the non-executable transaction queue.
  639. //
  640. // Note, this method assumes the pool lock is held!
  641. func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
  642. // Try to insert the transaction into the future queue
  643. from, _ := types.Sender(pool.signer, tx) // already validated
  644. if pool.queue[from] == nil {
  645. pool.queue[from] = newTxList(false)
  646. }
  647. inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
  648. if !inserted {
  649. // An older transaction was better, discard this
  650. queuedDiscardCounter.Inc(1)
  651. return false, ErrReplaceUnderpriced
  652. }
  653. // Discard any previous transaction and mark this
  654. if old != nil {
  655. pool.all.Remove(old.Hash())
  656. pool.priced.Removed()
  657. queuedReplaceCounter.Inc(1)
  658. }
  659. if pool.all.Get(hash) == nil {
  660. pool.all.Add(tx)
  661. pool.priced.Put(tx)
  662. }
  663. return old != nil, nil
  664. }
  665. // journalTx adds the specified transaction to the local disk journal if it is
  666. // deemed to have been sent from a local account.
  667. func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
  668. // Only journal if it's enabled and the transaction is local
  669. if pool.journal == nil || !pool.locals.contains(from) {
  670. return
  671. }
  672. if err := pool.journal.insert(tx); err != nil {
  673. log.Warn("Failed to journal local transaction", "err", err)
  674. }
  675. }
  676. // promoteTx adds a transaction to the pending (processable) list of transactions
  677. // and returns whether it was inserted or an older was better.
  678. //
  679. // Note, this method assumes the pool lock is held!
  680. func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
  681. // Try to insert the transaction into the pending queue
  682. if pool.pending[addr] == nil {
  683. pool.pending[addr] = newTxList(true)
  684. }
  685. list := pool.pending[addr]
  686. inserted, old := list.Add(tx, pool.config.PriceBump)
  687. if !inserted {
  688. // An older transaction was better, discard this
  689. pool.all.Remove(hash)
  690. pool.priced.Removed()
  691. pendingDiscardCounter.Inc(1)
  692. return false
  693. }
  694. // Otherwise discard any previous transaction and mark this
  695. if old != nil {
  696. pool.all.Remove(old.Hash())
  697. pool.priced.Removed()
  698. pendingReplaceCounter.Inc(1)
  699. }
  700. // Failsafe to work around direct pending inserts (tests)
  701. if pool.all.Get(hash) == nil {
  702. pool.all.Add(tx)
  703. pool.priced.Put(tx)
  704. }
  705. // Set the potentially new pending nonce and notify any subsystems of the new tx
  706. pool.beats[addr] = time.Now()
  707. pool.pendingState.SetNonce(addr, tx.Nonce()+1)
  708. return true
  709. }
  710. // AddLocal enqueues a single transaction into the pool if it is valid, marking
  711. // the sender as a local one in the mean time, ensuring it goes around the local
  712. // pricing constraints.
  713. func (pool *TxPool) AddLocal(tx *types.Transaction) error {
  714. return pool.addTx(tx, !pool.config.NoLocals)
  715. }
  716. // AddRemote enqueues a single transaction into the pool if it is valid. If the
  717. // sender is not among the locally tracked ones, full pricing constraints will
  718. // apply.
  719. func (pool *TxPool) AddRemote(tx *types.Transaction) error {
  720. return pool.addTx(tx, false)
  721. }
  722. // AddLocals enqueues a batch of transactions into the pool if they are valid,
  723. // marking the senders as a local ones in the mean time, ensuring they go around
  724. // the local pricing constraints.
  725. func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
  726. return pool.addTxs(txs, !pool.config.NoLocals)
  727. }
  728. // AddRemotes enqueues a batch of transactions into the pool if they are valid.
  729. // If the senders are not among the locally tracked ones, full pricing constraints
  730. // will apply.
  731. func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
  732. return pool.addTxs(txs, false)
  733. }
  734. // addTx enqueues a single transaction into the pool if it is valid.
  735. func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
  736. // Cache sender in transaction before obtaining lock (pool.signer is immutable)
  737. types.Sender(pool.signer, tx)
  738. pool.mu.Lock()
  739. defer pool.mu.Unlock()
  740. // Try to inject the transaction and update any state
  741. replace, err := pool.add(tx, local)
  742. if err != nil {
  743. return err
  744. }
  745. // If we added a new transaction, run promotion checks and return
  746. if !replace {
  747. from, _ := types.Sender(pool.signer, tx) // already validated
  748. pool.promoteExecutables([]common.Address{from})
  749. }
  750. return nil
  751. }
  752. // addTxs attempts to queue a batch of transactions if they are valid.
  753. func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) []error {
  754. // Cache senders in transactions before obtaining lock (pool.signer is immutable)
  755. for _, tx := range txs {
  756. types.Sender(pool.signer, tx)
  757. }
  758. pool.mu.Lock()
  759. defer pool.mu.Unlock()
  760. return pool.addTxsLocked(txs, local)
  761. }
  762. // addTxsLocked attempts to queue a batch of transactions if they are valid,
  763. // whilst assuming the transaction pool lock is already held.
  764. func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error {
  765. // Add the batch of transactions, tracking the accepted ones
  766. dirty := make(map[common.Address]struct{})
  767. errs := make([]error, len(txs))
  768. for i, tx := range txs {
  769. var replace bool
  770. if replace, errs[i] = pool.add(tx, local); errs[i] == nil && !replace {
  771. from, _ := types.Sender(pool.signer, tx) // already validated
  772. dirty[from] = struct{}{}
  773. }
  774. }
  775. // Only reprocess the internal state if something was actually added
  776. if len(dirty) > 0 {
  777. addrs := make([]common.Address, 0, len(dirty))
  778. for addr := range dirty {
  779. addrs = append(addrs, addr)
  780. }
  781. pool.promoteExecutables(addrs)
  782. }
  783. return errs
  784. }
  785. // Status returns the status (unknown/pending/queued) of a batch of transactions
  786. // identified by their hashes.
  787. func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
  788. pool.mu.RLock()
  789. defer pool.mu.RUnlock()
  790. status := make([]TxStatus, len(hashes))
  791. for i, hash := range hashes {
  792. if tx := pool.all.Get(hash); tx != nil {
  793. from, _ := types.Sender(pool.signer, tx) // already validated
  794. if pool.pending[from] != nil && pool.pending[from].txs.items[tx.Nonce()] != nil {
  795. status[i] = TxStatusPending
  796. } else {
  797. status[i] = TxStatusQueued
  798. }
  799. }
  800. }
  801. return status
  802. }
  803. // Get returns a transaction if it is contained in the pool
  804. // and nil otherwise.
  805. func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
  806. return pool.all.Get(hash)
  807. }
  808. // removeTx removes a single transaction from the queue, moving all subsequent
  809. // transactions back to the future queue.
  810. func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
  811. // Fetch the transaction we wish to delete
  812. tx := pool.all.Get(hash)
  813. if tx == nil {
  814. return
  815. }
  816. addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
  817. // Remove it from the list of known transactions
  818. pool.all.Remove(hash)
  819. if outofbound {
  820. pool.priced.Removed()
  821. }
  822. // Remove the transaction from the pending lists and reset the account nonce
  823. if pending := pool.pending[addr]; pending != nil {
  824. if removed, invalids := pending.Remove(tx); removed {
  825. // If no more pending transactions are left, remove the list
  826. if pending.Empty() {
  827. delete(pool.pending, addr)
  828. delete(pool.beats, addr)
  829. }
  830. // Postpone any invalidated transactions
  831. for _, tx := range invalids {
  832. pool.enqueueTx(tx.Hash(), tx)
  833. }
  834. // Update the account nonce if needed
  835. if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
  836. pool.pendingState.SetNonce(addr, nonce)
  837. }
  838. return
  839. }
  840. }
  841. // Transaction is in the future queue
  842. if future := pool.queue[addr]; future != nil {
  843. future.Remove(tx)
  844. if future.Empty() {
  845. delete(pool.queue, addr)
  846. }
  847. }
  848. }
  849. // promoteExecutables moves transactions that have become processable from the
  850. // future queue to the set of pending transactions. During this process, all
  851. // invalidated transactions (low nonce, low balance) are deleted.
  852. func (pool *TxPool) promoteExecutables(accounts []common.Address) {
  853. // Track the promoted transactions to broadcast them at once
  854. var promoted []*types.Transaction
  855. // Gather all the accounts potentially needing updates
  856. if accounts == nil {
  857. accounts = make([]common.Address, 0, len(pool.queue))
  858. for addr := range pool.queue {
  859. accounts = append(accounts, addr)
  860. }
  861. }
  862. // Iterate over all accounts and promote any executable transactions
  863. for _, addr := range accounts {
  864. list := pool.queue[addr]
  865. if list == nil {
  866. continue // Just in case someone calls with a non existing account
  867. }
  868. // Drop all transactions that are deemed too old (low nonce)
  869. for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
  870. hash := tx.Hash()
  871. log.Trace("Removed old queued transaction", "hash", hash)
  872. pool.all.Remove(hash)
  873. pool.priced.Removed()
  874. }
  875. // Drop all transactions that are too costly (low balance or out of gas)
  876. drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
  877. for _, tx := range drops {
  878. hash := tx.Hash()
  879. log.Trace("Removed unpayable queued transaction", "hash", hash)
  880. pool.all.Remove(hash)
  881. pool.priced.Removed()
  882. queuedNofundsCounter.Inc(1)
  883. }
  884. // Gather all executable transactions and promote them
  885. for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
  886. hash := tx.Hash()
  887. if pool.promoteTx(addr, hash, tx) {
  888. log.Trace("Promoting queued transaction", "hash", hash)
  889. promoted = append(promoted, tx)
  890. }
  891. }
  892. // Drop all transactions over the allowed limit
  893. if !pool.locals.contains(addr) {
  894. for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
  895. hash := tx.Hash()
  896. pool.all.Remove(hash)
  897. pool.priced.Removed()
  898. queuedRateLimitCounter.Inc(1)
  899. log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
  900. }
  901. }
  902. // Delete the entire queue entry if it became empty.
  903. if list.Empty() {
  904. delete(pool.queue, addr)
  905. }
  906. }
  907. // Notify subsystem for new promoted transactions.
  908. if len(promoted) > 0 {
  909. go pool.txFeed.Send(NewTxsEvent{promoted})
  910. }
  911. // If the pending limit is overflown, start equalizing allowances
  912. pending := uint64(0)
  913. for _, list := range pool.pending {
  914. pending += uint64(list.Len())
  915. }
  916. if pending > pool.config.GlobalSlots {
  917. pendingBeforeCap := pending
  918. // Assemble a spam order to penalize large transactors first
  919. spammers := prque.New(nil)
  920. for addr, list := range pool.pending {
  921. // Only evict transactions from high rollers
  922. if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
  923. spammers.Push(addr, int64(list.Len()))
  924. }
  925. }
  926. // Gradually drop transactions from offenders
  927. offenders := []common.Address{}
  928. for pending > pool.config.GlobalSlots && !spammers.Empty() {
  929. // Retrieve the next offender if not local address
  930. offender, _ := spammers.Pop()
  931. offenders = append(offenders, offender.(common.Address))
  932. // Equalize balances until all the same or below threshold
  933. if len(offenders) > 1 {
  934. // Calculate the equalization threshold for all current offenders
  935. threshold := pool.pending[offender.(common.Address)].Len()
  936. // Iteratively reduce all offenders until below limit or threshold reached
  937. for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
  938. for i := 0; i < len(offenders)-1; i++ {
  939. list := pool.pending[offenders[i]]
  940. for _, tx := range list.Cap(list.Len() - 1) {
  941. // Drop the transaction from the global pools too
  942. hash := tx.Hash()
  943. pool.all.Remove(hash)
  944. pool.priced.Removed()
  945. // Update the account nonce to the dropped transaction
  946. if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
  947. pool.pendingState.SetNonce(offenders[i], nonce)
  948. }
  949. log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
  950. }
  951. pending--
  952. }
  953. }
  954. }
  955. }
  956. // If still above threshold, reduce to limit or min allowance
  957. if pending > pool.config.GlobalSlots && len(offenders) > 0 {
  958. for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
  959. for _, addr := range offenders {
  960. list := pool.pending[addr]
  961. for _, tx := range list.Cap(list.Len() - 1) {
  962. // Drop the transaction from the global pools too
  963. hash := tx.Hash()
  964. pool.all.Remove(hash)
  965. pool.priced.Removed()
  966. // Update the account nonce to the dropped transaction
  967. if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
  968. pool.pendingState.SetNonce(addr, nonce)
  969. }
  970. log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
  971. }
  972. pending--
  973. }
  974. }
  975. }
  976. pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
  977. }
  978. // If we've queued more transactions than the hard limit, drop oldest ones
  979. queued := uint64(0)
  980. for _, list := range pool.queue {
  981. queued += uint64(list.Len())
  982. }
  983. if queued > pool.config.GlobalQueue {
  984. // Sort all accounts with queued transactions by heartbeat
  985. addresses := make(addressesByHeartbeat, 0, len(pool.queue))
  986. for addr := range pool.queue {
  987. if !pool.locals.contains(addr) { // don't drop locals
  988. addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
  989. }
  990. }
  991. sort.Sort(addresses)
  992. // Drop transactions until the total is below the limit or only locals remain
  993. for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
  994. addr := addresses[len(addresses)-1]
  995. list := pool.queue[addr.address]
  996. addresses = addresses[:len(addresses)-1]
  997. // Drop all transactions if they are less than the overflow
  998. if size := uint64(list.Len()); size <= drop {
  999. for _, tx := range list.Flatten() {
  1000. pool.removeTx(tx.Hash(), true)
  1001. }
  1002. drop -= size
  1003. queuedRateLimitCounter.Inc(int64(size))
  1004. continue
  1005. }
  1006. // Otherwise drop only last few transactions
  1007. txs := list.Flatten()
  1008. for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
  1009. pool.removeTx(txs[i].Hash(), true)
  1010. drop--
  1011. queuedRateLimitCounter.Inc(1)
  1012. }
  1013. }
  1014. }
  1015. }
  1016. // demoteUnexecutables removes invalid and processed transactions from the pools
  1017. // executable/pending queue and any subsequent transactions that become unexecutable
  1018. // are moved back into the future queue.
  1019. func (pool *TxPool) demoteUnexecutables() {
  1020. // Iterate over all accounts and demote any non-executable transactions
  1021. for addr, list := range pool.pending {
  1022. nonce := pool.currentState.GetNonce(addr)
  1023. // Drop all transactions that are deemed too old (low nonce)
  1024. for _, tx := range list.Forward(nonce) {
  1025. hash := tx.Hash()
  1026. log.Trace("Removed old pending transaction", "hash", hash)
  1027. pool.all.Remove(hash)
  1028. pool.priced.Removed()
  1029. }
  1030. // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
  1031. drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
  1032. for _, tx := range drops {
  1033. hash := tx.Hash()
  1034. log.Trace("Removed unpayable pending transaction", "hash", hash)
  1035. pool.all.Remove(hash)
  1036. pool.priced.Removed()
  1037. pendingNofundsCounter.Inc(1)
  1038. }
  1039. for _, tx := range invalids {
  1040. hash := tx.Hash()
  1041. log.Trace("Demoting pending transaction", "hash", hash)
  1042. pool.enqueueTx(hash, tx)
  1043. }
  1044. // If there's a gap in front, alert (should never happen) and postpone all transactions
  1045. if list.Len() > 0 && list.txs.Get(nonce) == nil {
  1046. for _, tx := range list.Cap(0) {
  1047. hash := tx.Hash()
  1048. log.Error("Demoting invalidated transaction", "hash", hash)
  1049. pool.enqueueTx(hash, tx)
  1050. }
  1051. }
  1052. // Delete the entire queue entry if it became empty.
  1053. if list.Empty() {
  1054. delete(pool.pending, addr)
  1055. delete(pool.beats, addr)
  1056. }
  1057. }
  1058. }
  1059. // addressByHeartbeat is an account address tagged with its last activity timestamp.
  1060. type addressByHeartbeat struct {
  1061. address common.Address
  1062. heartbeat time.Time
  1063. }
  1064. type addressesByHeartbeat []addressByHeartbeat
  1065. func (a addressesByHeartbeat) Len() int { return len(a) }
  1066. func (a addressesByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) }
  1067. func (a addressesByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  1068. // accountSet is simply a set of addresses to check for existence, and a signer
  1069. // capable of deriving addresses from transactions.
  1070. type accountSet struct {
  1071. accounts map[common.Address]struct{}
  1072. signer types.Signer
  1073. cache *[]common.Address
  1074. }
  1075. // newAccountSet creates a new address set with an associated signer for sender
  1076. // derivations.
  1077. func newAccountSet(signer types.Signer) *accountSet {
  1078. return &accountSet{
  1079. accounts: make(map[common.Address]struct{}),
  1080. signer: signer,
  1081. }
  1082. }
  1083. // contains checks if a given address is contained within the set.
  1084. func (as *accountSet) contains(addr common.Address) bool {
  1085. _, exist := as.accounts[addr]
  1086. return exist
  1087. }
  1088. // containsTx checks if the sender of a given tx is within the set. If the sender
  1089. // cannot be derived, this method returns false.
  1090. func (as *accountSet) containsTx(tx *types.Transaction) bool {
  1091. if addr, err := types.Sender(as.signer, tx); err == nil {
  1092. return as.contains(addr)
  1093. }
  1094. return false
  1095. }
  1096. // add inserts a new address into the set to track.
  1097. func (as *accountSet) add(addr common.Address) {
  1098. as.accounts[addr] = struct{}{}
  1099. as.cache = nil
  1100. }
  1101. // flatten returns the list of addresses within this set, also caching it for later
  1102. // reuse. The returned slice should not be changed!
  1103. func (as *accountSet) flatten() []common.Address {
  1104. if as.cache == nil {
  1105. accounts := make([]common.Address, 0, len(as.accounts))
  1106. for account := range as.accounts {
  1107. accounts = append(accounts, account)
  1108. }
  1109. as.cache = &accounts
  1110. }
  1111. return *as.cache
  1112. }
  1113. // txLookup is used internally by TxPool to track transactions while allowing lookup without
  1114. // mutex contention.
  1115. //
  1116. // Note, although this type is properly protected against concurrent access, it
  1117. // is **not** a type that should ever be mutated or even exposed outside of the
  1118. // transaction pool, since its internal state is tightly coupled with the pools
  1119. // internal mechanisms. The sole purpose of the type is to permit out-of-bound
  1120. // peeking into the pool in TxPool.Get without having to acquire the widely scoped
  1121. // TxPool.mu mutex.
  1122. type txLookup struct {
  1123. all map[common.Hash]*types.Transaction
  1124. lock sync.RWMutex
  1125. }
  1126. // newTxLookup returns a new txLookup structure.
  1127. func newTxLookup() *txLookup {
  1128. return &txLookup{
  1129. all: make(map[common.Hash]*types.Transaction),
  1130. }
  1131. }
  1132. // Range calls f on each key and value present in the map.
  1133. func (t *txLookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) {
  1134. t.lock.RLock()
  1135. defer t.lock.RUnlock()
  1136. for key, value := range t.all {
  1137. if !f(key, value) {
  1138. break
  1139. }
  1140. }
  1141. }
  1142. // Get returns a transaction if it exists in the lookup, or nil if not found.
  1143. func (t *txLookup) Get(hash common.Hash) *types.Transaction {
  1144. t.lock.RLock()
  1145. defer t.lock.RUnlock()
  1146. return t.all[hash]
  1147. }
  1148. // Count returns the current number of items in the lookup.
  1149. func (t *txLookup) Count() int {
  1150. t.lock.RLock()
  1151. defer t.lock.RUnlock()
  1152. return len(t.all)
  1153. }
  1154. // Add adds a transaction to the lookup.
  1155. func (t *txLookup) Add(tx *types.Transaction) {
  1156. t.lock.Lock()
  1157. defer t.lock.Unlock()
  1158. t.all[tx.Hash()] = tx
  1159. }
  1160. // Remove removes a transaction from the lookup.
  1161. func (t *txLookup) Remove(hash common.Hash) {
  1162. t.lock.Lock()
  1163. defer t.lock.Unlock()
  1164. delete(t.all, hash)
  1165. }