tx_pool.go 62 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825
  1. // Copyright 2014 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package core
  17. import (
  18. "errors"
  19. "math"
  20. "math/big"
  21. "sort"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/common/prque"
  27. "github.com/ethereum/go-ethereum/consensus/misc"
  28. "github.com/ethereum/go-ethereum/core/state"
  29. "github.com/ethereum/go-ethereum/core/types"
  30. "github.com/ethereum/go-ethereum/event"
  31. "github.com/ethereum/go-ethereum/log"
  32. "github.com/ethereum/go-ethereum/metrics"
  33. "github.com/ethereum/go-ethereum/params"
  34. )
  35. const (
  36. // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
  37. chainHeadChanSize = 10
  38. // txSlotSize is used to calculate how many data slots a single transaction
  39. // takes up based on its size. The slots are used as DoS protection, ensuring
  40. // that validating a new transaction remains a constant operation (in reality
  41. // O(maxslots), where max slots are 4 currently).
  42. txSlotSize = 32 * 1024
  43. // txMaxSize is the maximum size a single transaction can have. This field has
  44. // non-trivial consequences: larger transactions are significantly harder and
  45. // more expensive to propagate; larger transactions also take more resources
  46. // to validate whether they fit into the pool or not.
  47. txMaxSize = 4 * txSlotSize // 128KB
  48. )
  49. var (
  50. // ErrAlreadyKnown is returned if the transactions is already contained
  51. // within the pool.
  52. ErrAlreadyKnown = errors.New("already known")
  53. // ErrInvalidSender is returned if the transaction contains an invalid signature.
  54. ErrInvalidSender = errors.New("invalid sender")
  55. // ErrUnderpriced is returned if a transaction's gas price is below the minimum
  56. // configured for the transaction pool.
  57. ErrUnderpriced = errors.New("transaction underpriced")
  58. // ErrTxPoolOverflow is returned if the transaction pool is full and can't accpet
  59. // another remote transaction.
  60. ErrTxPoolOverflow = errors.New("txpool is full")
  61. // ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
  62. // with a different one without the required price bump.
  63. ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
  64. // ErrGasLimit is returned if a transaction's requested gas limit exceeds the
  65. // maximum allowance of the current block.
  66. ErrGasLimit = errors.New("exceeds block gas limit")
  67. // ErrNegativeValue is a sanity error to ensure no one is able to specify a
  68. // transaction with a negative value.
  69. ErrNegativeValue = errors.New("negative value")
  70. // ErrOversizedData is returned if the input data of a transaction is greater
  71. // than some meaningful limit a user might use. This is not a consensus error
  72. // making the transaction invalid, rather a DOS protection.
  73. ErrOversizedData = errors.New("oversized data")
  74. )
  75. var (
  76. evictionInterval = time.Minute // Time interval to check for evictable transactions
  77. statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
  78. )
  79. var (
  80. // Metrics for the pending pool
  81. pendingDiscardMeter = metrics.NewRegisteredMeter("txpool/pending/discard", nil)
  82. pendingReplaceMeter = metrics.NewRegisteredMeter("txpool/pending/replace", nil)
  83. pendingRateLimitMeter = metrics.NewRegisteredMeter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting
  84. pendingNofundsMeter = metrics.NewRegisteredMeter("txpool/pending/nofunds", nil) // Dropped due to out-of-funds
  85. // Metrics for the queued pool
  86. queuedDiscardMeter = metrics.NewRegisteredMeter("txpool/queued/discard", nil)
  87. queuedReplaceMeter = metrics.NewRegisteredMeter("txpool/queued/replace", nil)
  88. queuedRateLimitMeter = metrics.NewRegisteredMeter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting
  89. queuedNofundsMeter = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds
  90. queuedEvictionMeter = metrics.NewRegisteredMeter("txpool/queued/eviction", nil) // Dropped due to lifetime
  91. // General tx metrics
  92. knownTxMeter = metrics.NewRegisteredMeter("txpool/known", nil)
  93. validTxMeter = metrics.NewRegisteredMeter("txpool/valid", nil)
  94. invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil)
  95. underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil)
  96. overflowedTxMeter = metrics.NewRegisteredMeter("txpool/overflowed", nil)
  97. // throttleTxMeter counts how many transactions are rejected due to too-many-changes between
  98. // txpool reorgs.
  99. throttleTxMeter = metrics.NewRegisteredMeter("txpool/throttle", nil)
  100. // reorgDurationTimer measures how long time a txpool reorg takes.
  101. reorgDurationTimer = metrics.NewRegisteredTimer("txpool/reorgtime", nil)
  102. // dropBetweenReorgHistogram counts how many drops we experience between two reorg runs. It is expected
  103. // that this number is pretty low, since txpool reorgs happen very frequently.
  104. dropBetweenReorgHistogram = metrics.NewRegisteredHistogram("txpool/dropbetweenreorg", nil, metrics.NewExpDecaySample(1028, 0.015))
  105. pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
  106. queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
  107. localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
  108. slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil)
  109. reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil)
  110. )
  111. // TxStatus is the current status of a transaction as seen by the pool.
  112. type TxStatus uint
  113. const (
  114. TxStatusUnknown TxStatus = iota
  115. TxStatusQueued
  116. TxStatusPending
  117. TxStatusIncluded
  118. )
  119. // blockChain provides the state of blockchain and current gas limit to do
  120. // some pre checks in tx pool and event subscribers.
  121. type blockChain interface {
  122. CurrentBlock() *types.Block
  123. GetBlock(hash common.Hash, number uint64) *types.Block
  124. StateAt(root common.Hash) (*state.StateDB, error)
  125. SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
  126. }
  127. // TxPoolConfig are the configuration parameters of the transaction pool.
  128. type TxPoolConfig struct {
  129. Locals []common.Address // Addresses that should be treated by default as local
  130. NoLocals bool // Whether local transaction handling should be disabled
  131. Journal string // Journal of local transactions to survive node restarts
  132. Rejournal time.Duration // Time interval to regenerate the local transaction journal
  133. PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
  134. PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)
  135. AccountSlots uint64 // Number of executable transaction slots guaranteed per account
  136. GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts
  137. AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
  138. GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts
  139. Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
  140. }
  141. // DefaultTxPoolConfig contains the default configurations for the transaction
  142. // pool.
  143. var DefaultTxPoolConfig = TxPoolConfig{
  144. Journal: "transactions.rlp",
  145. Rejournal: time.Hour,
  146. PriceLimit: 1,
  147. PriceBump: 10,
  148. AccountSlots: 16,
  149. GlobalSlots: 4096 + 1024, // urgent + floating queue capacity with 4:1 ratio
  150. AccountQueue: 64,
  151. GlobalQueue: 1024,
  152. Lifetime: 3 * time.Hour,
  153. }
  154. // sanitize checks the provided user configurations and changes anything that's
  155. // unreasonable or unworkable.
  156. func (config *TxPoolConfig) sanitize() TxPoolConfig {
  157. conf := *config
  158. if conf.Rejournal < time.Second {
  159. log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second)
  160. conf.Rejournal = time.Second
  161. }
  162. if conf.PriceLimit < 1 {
  163. log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultTxPoolConfig.PriceLimit)
  164. conf.PriceLimit = DefaultTxPoolConfig.PriceLimit
  165. }
  166. if conf.PriceBump < 1 {
  167. log.Warn("Sanitizing invalid txpool price bump", "provided", conf.PriceBump, "updated", DefaultTxPoolConfig.PriceBump)
  168. conf.PriceBump = DefaultTxPoolConfig.PriceBump
  169. }
  170. if conf.AccountSlots < 1 {
  171. log.Warn("Sanitizing invalid txpool account slots", "provided", conf.AccountSlots, "updated", DefaultTxPoolConfig.AccountSlots)
  172. conf.AccountSlots = DefaultTxPoolConfig.AccountSlots
  173. }
  174. if conf.GlobalSlots < 1 {
  175. log.Warn("Sanitizing invalid txpool global slots", "provided", conf.GlobalSlots, "updated", DefaultTxPoolConfig.GlobalSlots)
  176. conf.GlobalSlots = DefaultTxPoolConfig.GlobalSlots
  177. }
  178. if conf.AccountQueue < 1 {
  179. log.Warn("Sanitizing invalid txpool account queue", "provided", conf.AccountQueue, "updated", DefaultTxPoolConfig.AccountQueue)
  180. conf.AccountQueue = DefaultTxPoolConfig.AccountQueue
  181. }
  182. if conf.GlobalQueue < 1 {
  183. log.Warn("Sanitizing invalid txpool global queue", "provided", conf.GlobalQueue, "updated", DefaultTxPoolConfig.GlobalQueue)
  184. conf.GlobalQueue = DefaultTxPoolConfig.GlobalQueue
  185. }
  186. if conf.Lifetime < 1 {
  187. log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultTxPoolConfig.Lifetime)
  188. conf.Lifetime = DefaultTxPoolConfig.Lifetime
  189. }
  190. return conf
  191. }
  192. // TxPool contains all currently known transactions. Transactions
  193. // enter the pool when they are received from the network or submitted
  194. // locally. They exit the pool when they are included in the blockchain.
  195. //
  196. // The pool separates processable transactions (which can be applied to the
  197. // current state) and future transactions. Transactions move between those
  198. // two states over time as they are received and processed.
  199. type TxPool struct {
  200. config TxPoolConfig
  201. chainconfig *params.ChainConfig
  202. chain blockChain
  203. gasPrice *big.Int
  204. txFeed event.Feed
  205. scope event.SubscriptionScope
  206. signer types.Signer
  207. mu sync.RWMutex
  208. istanbul bool // Fork indicator whether we are in the istanbul stage.
  209. eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions.
  210. eip1559 bool // Fork indicator whether we are using EIP-1559 type transactions.
  211. currentState *state.StateDB // Current state in the blockchain head
  212. pendingNonces *txNoncer // Pending state tracking virtual nonces
  213. currentMaxGas uint64 // Current gas limit for transaction caps
  214. locals *accountSet // Set of local transaction to exempt from eviction rules
  215. journal *txJournal // Journal of local transaction to back up to disk
  216. pending map[common.Address]*txList // All currently processable transactions
  217. queue map[common.Address]*txList // Queued but non-processable transactions
  218. beats map[common.Address]time.Time // Last heartbeat from each known account
  219. all *txLookup // All transactions to allow lookups
  220. priced *txPricedList // All transactions sorted by price
  221. chainHeadCh chan ChainHeadEvent
  222. chainHeadSub event.Subscription
  223. reqResetCh chan *txpoolResetRequest
  224. reqPromoteCh chan *accountSet
  225. queueTxEventCh chan *types.Transaction
  226. reorgDoneCh chan chan struct{}
  227. reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
  228. wg sync.WaitGroup // tracks loop, scheduleReorgLoop
  229. initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
  230. changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
  231. }
  232. type txpoolResetRequest struct {
  233. oldHead, newHead *types.Header
  234. }
  235. // NewTxPool creates a new transaction pool to gather, sort and filter inbound
  236. // transactions from the network.
  237. func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
  238. // Sanitize the input to ensure no vulnerable gas prices are set
  239. config = (&config).sanitize()
  240. // Create the transaction pool with its initial settings
  241. pool := &TxPool{
  242. config: config,
  243. chainconfig: chainconfig,
  244. chain: chain,
  245. signer: types.LatestSigner(chainconfig),
  246. pending: make(map[common.Address]*txList),
  247. queue: make(map[common.Address]*txList),
  248. beats: make(map[common.Address]time.Time),
  249. all: newTxLookup(),
  250. chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
  251. reqResetCh: make(chan *txpoolResetRequest),
  252. reqPromoteCh: make(chan *accountSet),
  253. queueTxEventCh: make(chan *types.Transaction),
  254. reorgDoneCh: make(chan chan struct{}),
  255. reorgShutdownCh: make(chan struct{}),
  256. initDoneCh: make(chan struct{}),
  257. gasPrice: new(big.Int).SetUint64(config.PriceLimit),
  258. }
  259. pool.locals = newAccountSet(pool.signer)
  260. for _, addr := range config.Locals {
  261. log.Info("Setting new local account", "address", addr)
  262. pool.locals.add(addr)
  263. }
  264. pool.priced = newTxPricedList(pool.all)
  265. pool.reset(nil, chain.CurrentBlock().Header())
  266. // Start the reorg loop early so it can handle requests generated during journal loading.
  267. pool.wg.Add(1)
  268. go pool.scheduleReorgLoop()
  269. // If local transactions and journaling is enabled, load from disk
  270. if !config.NoLocals && config.Journal != "" {
  271. pool.journal = newTxJournal(config.Journal)
  272. if err := pool.journal.load(pool.AddLocals); err != nil {
  273. log.Warn("Failed to load transaction journal", "err", err)
  274. }
  275. if err := pool.journal.rotate(pool.local()); err != nil {
  276. log.Warn("Failed to rotate transaction journal", "err", err)
  277. }
  278. }
  279. // Subscribe events from blockchain and start the main event loop.
  280. pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
  281. pool.wg.Add(1)
  282. go pool.loop()
  283. return pool
  284. }
  285. // loop is the transaction pool's main event loop, waiting for and reacting to
  286. // outside blockchain events as well as for various reporting and transaction
  287. // eviction events.
  288. func (pool *TxPool) loop() {
  289. defer pool.wg.Done()
  290. var (
  291. prevPending, prevQueued, prevStales int
  292. // Start the stats reporting and transaction eviction tickers
  293. report = time.NewTicker(statsReportInterval)
  294. evict = time.NewTicker(evictionInterval)
  295. journal = time.NewTicker(pool.config.Rejournal)
  296. // Track the previous head headers for transaction reorgs
  297. head = pool.chain.CurrentBlock()
  298. )
  299. defer report.Stop()
  300. defer evict.Stop()
  301. defer journal.Stop()
  302. // Notify tests that the init phase is done
  303. close(pool.initDoneCh)
  304. for {
  305. select {
  306. // Handle ChainHeadEvent
  307. case ev := <-pool.chainHeadCh:
  308. if ev.Block != nil {
  309. pool.requestReset(head.Header(), ev.Block.Header())
  310. head = ev.Block
  311. }
  312. // System shutdown.
  313. case <-pool.chainHeadSub.Err():
  314. close(pool.reorgShutdownCh)
  315. return
  316. // Handle stats reporting ticks
  317. case <-report.C:
  318. pool.mu.RLock()
  319. pending, queued := pool.stats()
  320. pool.mu.RUnlock()
  321. stales := int(atomic.LoadInt64(&pool.priced.stales))
  322. if pending != prevPending || queued != prevQueued || stales != prevStales {
  323. log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
  324. prevPending, prevQueued, prevStales = pending, queued, stales
  325. }
  326. // Handle inactive account transaction eviction
  327. case <-evict.C:
  328. pool.mu.Lock()
  329. for addr := range pool.queue {
  330. // Skip local transactions from the eviction mechanism
  331. if pool.locals.contains(addr) {
  332. continue
  333. }
  334. // Any non-locals old enough should be removed
  335. if time.Since(pool.beats[addr]) > pool.config.Lifetime {
  336. list := pool.queue[addr].Flatten()
  337. for _, tx := range list {
  338. pool.removeTx(tx.Hash(), true)
  339. }
  340. queuedEvictionMeter.Mark(int64(len(list)))
  341. }
  342. }
  343. pool.mu.Unlock()
  344. // Handle local transaction journal rotation
  345. case <-journal.C:
  346. if pool.journal != nil {
  347. pool.mu.Lock()
  348. if err := pool.journal.rotate(pool.local()); err != nil {
  349. log.Warn("Failed to rotate local tx journal", "err", err)
  350. }
  351. pool.mu.Unlock()
  352. }
  353. }
  354. }
  355. }
  356. // Stop terminates the transaction pool.
  357. func (pool *TxPool) Stop() {
  358. // Unsubscribe all subscriptions registered from txpool
  359. pool.scope.Close()
  360. // Unsubscribe subscriptions registered from blockchain
  361. pool.chainHeadSub.Unsubscribe()
  362. pool.wg.Wait()
  363. if pool.journal != nil {
  364. pool.journal.close()
  365. }
  366. log.Info("Transaction pool stopped")
  367. }
  368. // SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
  369. // starts sending event to the given channel.
  370. func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
  371. return pool.scope.Track(pool.txFeed.Subscribe(ch))
  372. }
  373. // GasPrice returns the current gas price enforced by the transaction pool.
  374. func (pool *TxPool) GasPrice() *big.Int {
  375. pool.mu.RLock()
  376. defer pool.mu.RUnlock()
  377. return new(big.Int).Set(pool.gasPrice)
  378. }
  379. // SetGasPrice updates the minimum price required by the transaction pool for a
  380. // new transaction, and drops all transactions below this threshold.
  381. func (pool *TxPool) SetGasPrice(price *big.Int) {
  382. pool.mu.Lock()
  383. defer pool.mu.Unlock()
  384. old := pool.gasPrice
  385. pool.gasPrice = price
  386. // if the min miner fee increased, remove transactions below the new threshold
  387. if price.Cmp(old) > 0 {
  388. // pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead
  389. drop := pool.all.RemotesBelowTip(price)
  390. for _, tx := range drop {
  391. pool.removeTx(tx.Hash(), false)
  392. }
  393. pool.priced.Removed(len(drop))
  394. }
  395. log.Info("Transaction pool price threshold updated", "price", price)
  396. }
  397. // Nonce returns the next nonce of an account, with all transactions executable
  398. // by the pool already applied on top.
  399. func (pool *TxPool) Nonce(addr common.Address) uint64 {
  400. pool.mu.RLock()
  401. defer pool.mu.RUnlock()
  402. return pool.pendingNonces.get(addr)
  403. }
  404. // Stats retrieves the current pool stats, namely the number of pending and the
  405. // number of queued (non-executable) transactions.
  406. func (pool *TxPool) Stats() (int, int) {
  407. pool.mu.RLock()
  408. defer pool.mu.RUnlock()
  409. return pool.stats()
  410. }
  411. // stats retrieves the current pool stats, namely the number of pending and the
  412. // number of queued (non-executable) transactions.
  413. func (pool *TxPool) stats() (int, int) {
  414. pending := 0
  415. for _, list := range pool.pending {
  416. pending += list.Len()
  417. }
  418. queued := 0
  419. for _, list := range pool.queue {
  420. queued += list.Len()
  421. }
  422. return pending, queued
  423. }
  424. // Content retrieves the data content of the transaction pool, returning all the
  425. // pending as well as queued transactions, grouped by account and sorted by nonce.
  426. func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
  427. pool.mu.Lock()
  428. defer pool.mu.Unlock()
  429. pending := make(map[common.Address]types.Transactions)
  430. for addr, list := range pool.pending {
  431. pending[addr] = list.Flatten()
  432. }
  433. queued := make(map[common.Address]types.Transactions)
  434. for addr, list := range pool.queue {
  435. queued[addr] = list.Flatten()
  436. }
  437. return pending, queued
  438. }
  439. // ContentFrom retrieves the data content of the transaction pool, returning the
  440. // pending as well as queued transactions of this address, grouped by nonce.
  441. func (pool *TxPool) ContentFrom(addr common.Address) (types.Transactions, types.Transactions) {
  442. pool.mu.RLock()
  443. defer pool.mu.RUnlock()
  444. var pending types.Transactions
  445. if list, ok := pool.pending[addr]; ok {
  446. pending = list.Flatten()
  447. }
  448. var queued types.Transactions
  449. if list, ok := pool.queue[addr]; ok {
  450. queued = list.Flatten()
  451. }
  452. return pending, queued
  453. }
  454. // Pending retrieves all currently processable transactions, grouped by origin
  455. // account and sorted by nonce. The returned transaction set is a copy and can be
  456. // freely modified by calling code.
  457. //
  458. // The enforceTips parameter can be used to do an extra filtering on the pending
  459. // transactions and only return those whose **effective** tip is large enough in
  460. // the next pending execution environment.
  461. func (pool *TxPool) Pending(enforceTips bool) map[common.Address]types.Transactions {
  462. pool.mu.Lock()
  463. defer pool.mu.Unlock()
  464. pending := make(map[common.Address]types.Transactions)
  465. for addr, list := range pool.pending {
  466. txs := list.Flatten()
  467. // If the miner requests tip enforcement, cap the lists now
  468. if enforceTips && !pool.locals.contains(addr) {
  469. for i, tx := range txs {
  470. if tx.EffectiveGasTipIntCmp(pool.gasPrice, pool.priced.urgent.baseFee) < 0 {
  471. txs = txs[:i]
  472. break
  473. }
  474. }
  475. }
  476. if len(txs) > 0 {
  477. pending[addr] = txs
  478. }
  479. }
  480. return pending
  481. }
  482. // Locals retrieves the accounts currently considered local by the pool.
  483. func (pool *TxPool) Locals() []common.Address {
  484. pool.mu.Lock()
  485. defer pool.mu.Unlock()
  486. return pool.locals.flatten()
  487. }
  488. // local retrieves all currently known local transactions, grouped by origin
  489. // account and sorted by nonce. The returned transaction set is a copy and can be
  490. // freely modified by calling code.
  491. func (pool *TxPool) local() map[common.Address]types.Transactions {
  492. txs := make(map[common.Address]types.Transactions)
  493. for addr := range pool.locals.accounts {
  494. if pending := pool.pending[addr]; pending != nil {
  495. txs[addr] = append(txs[addr], pending.Flatten()...)
  496. }
  497. if queued := pool.queue[addr]; queued != nil {
  498. txs[addr] = append(txs[addr], queued.Flatten()...)
  499. }
  500. }
  501. return txs
  502. }
  503. // validateTx checks whether a transaction is valid according to the consensus
  504. // rules and adheres to some heuristic limits of the local node (price and size).
  505. func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
  506. // Accept only legacy transactions until EIP-2718/2930 activates.
  507. if !pool.eip2718 && tx.Type() != types.LegacyTxType {
  508. return ErrTxTypeNotSupported
  509. }
  510. // Reject dynamic fee transactions until EIP-1559 activates.
  511. if !pool.eip1559 && tx.Type() == types.DynamicFeeTxType {
  512. return ErrTxTypeNotSupported
  513. }
  514. // Reject transactions over defined size to prevent DOS attacks
  515. if uint64(tx.Size()) > txMaxSize {
  516. return ErrOversizedData
  517. }
  518. // Transactions can't be negative. This may never happen using RLP decoded
  519. // transactions but may occur if you create a transaction using the RPC.
  520. if tx.Value().Sign() < 0 {
  521. return ErrNegativeValue
  522. }
  523. // Ensure the transaction doesn't exceed the current block limit gas.
  524. if pool.currentMaxGas < tx.Gas() {
  525. return ErrGasLimit
  526. }
  527. // Sanity check for extremely large numbers
  528. if tx.GasFeeCap().BitLen() > 256 {
  529. return ErrFeeCapVeryHigh
  530. }
  531. if tx.GasTipCap().BitLen() > 256 {
  532. return ErrTipVeryHigh
  533. }
  534. // Ensure gasFeeCap is greater than or equal to gasTipCap.
  535. if tx.GasFeeCapIntCmp(tx.GasTipCap()) < 0 {
  536. return ErrTipAboveFeeCap
  537. }
  538. // Make sure the transaction is signed properly.
  539. from, err := types.Sender(pool.signer, tx)
  540. if err != nil {
  541. return ErrInvalidSender
  542. }
  543. // Drop non-local transactions under our own minimal accepted gas price or tip
  544. if !local && tx.GasTipCapIntCmp(pool.gasPrice) < 0 {
  545. return ErrUnderpriced
  546. }
  547. // Ensure the transaction adheres to nonce ordering
  548. if pool.currentState.GetNonce(from) > tx.Nonce() {
  549. return ErrNonceTooLow
  550. }
  551. // Transactor should have enough funds to cover the costs
  552. // cost == V + GP * GL
  553. if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
  554. return ErrInsufficientFunds
  555. }
  556. // Ensure the transaction has more gas than the basic tx fee.
  557. intrGas, err := IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul)
  558. if err != nil {
  559. return err
  560. }
  561. if tx.Gas() < intrGas {
  562. return ErrIntrinsicGas
  563. }
  564. return nil
  565. }
  566. // add validates a transaction and inserts it into the non-executable queue for later
  567. // pending promotion and execution. If the transaction is a replacement for an already
  568. // pending or queued one, it overwrites the previous transaction if its price is higher.
  569. //
  570. // If a newly added transaction is marked as local, its sending account will be
  571. // be added to the allowlist, preventing any associated transaction from being dropped
  572. // out of the pool due to pricing constraints.
  573. func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
  574. // If the transaction is already known, discard it
  575. hash := tx.Hash()
  576. if pool.all.Get(hash) != nil {
  577. log.Trace("Discarding already known transaction", "hash", hash)
  578. knownTxMeter.Mark(1)
  579. return false, ErrAlreadyKnown
  580. }
  581. // Make the local flag. If it's from local source or it's from the network but
  582. // the sender is marked as local previously, treat it as the local transaction.
  583. isLocal := local || pool.locals.containsTx(tx)
  584. // If the transaction fails basic validation, discard it
  585. if err := pool.validateTx(tx, isLocal); err != nil {
  586. log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
  587. invalidTxMeter.Mark(1)
  588. return false, err
  589. }
  590. // If the transaction pool is full, discard underpriced transactions
  591. if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
  592. // If the new transaction is underpriced, don't accept it
  593. if !isLocal && pool.priced.Underpriced(tx) {
  594. log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
  595. underpricedTxMeter.Mark(1)
  596. return false, ErrUnderpriced
  597. }
  598. // We're about to replace a transaction. The reorg does a more thorough
  599. // analysis of what to remove and how, but it runs async. We don't want to
  600. // do too many replacements between reorg-runs, so we cap the number of
  601. // replacements to 25% of the slots
  602. if pool.changesSinceReorg > int(pool.config.GlobalSlots/4) {
  603. throttleTxMeter.Mark(1)
  604. return false, ErrTxPoolOverflow
  605. }
  606. // New transaction is better than our worse ones, make room for it.
  607. // If it's a local transaction, forcibly discard all available transactions.
  608. // Otherwise if we can't make enough room for new one, abort the operation.
  609. drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)
  610. // Special case, we still can't make the room for the new remote one.
  611. if !isLocal && !success {
  612. log.Trace("Discarding overflown transaction", "hash", hash)
  613. overflowedTxMeter.Mark(1)
  614. return false, ErrTxPoolOverflow
  615. }
  616. // Bump the counter of rejections-since-reorg
  617. pool.changesSinceReorg += len(drop)
  618. // Kick out the underpriced remote transactions.
  619. for _, tx := range drop {
  620. log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
  621. underpricedTxMeter.Mark(1)
  622. pool.removeTx(tx.Hash(), false)
  623. }
  624. }
  625. // Try to replace an existing transaction in the pending pool
  626. from, _ := types.Sender(pool.signer, tx) // already validated
  627. if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
  628. // Nonce already pending, check if required price bump is met
  629. inserted, old := list.Add(tx, pool.config.PriceBump)
  630. if !inserted {
  631. pendingDiscardMeter.Mark(1)
  632. return false, ErrReplaceUnderpriced
  633. }
  634. // New transaction is better, replace old one
  635. if old != nil {
  636. pool.all.Remove(old.Hash())
  637. pool.priced.Removed(1)
  638. pendingReplaceMeter.Mark(1)
  639. }
  640. pool.all.Add(tx, isLocal)
  641. pool.priced.Put(tx, isLocal)
  642. pool.journalTx(from, tx)
  643. pool.queueTxEvent(tx)
  644. log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
  645. // Successful promotion, bump the heartbeat
  646. pool.beats[from] = time.Now()
  647. return old != nil, nil
  648. }
  649. // New transaction isn't replacing a pending one, push into queue
  650. replaced, err = pool.enqueueTx(hash, tx, isLocal, true)
  651. if err != nil {
  652. return false, err
  653. }
  654. // Mark local addresses and journal local transactions
  655. if local && !pool.locals.contains(from) {
  656. log.Info("Setting new local account", "address", from)
  657. pool.locals.add(from)
  658. pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time.
  659. }
  660. if isLocal {
  661. localGauge.Inc(1)
  662. }
  663. pool.journalTx(from, tx)
  664. log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
  665. return replaced, nil
  666. }
  667. // enqueueTx inserts a new transaction into the non-executable transaction queue.
  668. //
  669. // Note, this method assumes the pool lock is held!
  670. func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local bool, addAll bool) (bool, error) {
  671. // Try to insert the transaction into the future queue
  672. from, _ := types.Sender(pool.signer, tx) // already validated
  673. if pool.queue[from] == nil {
  674. pool.queue[from] = newTxList(false)
  675. }
  676. inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
  677. if !inserted {
  678. // An older transaction was better, discard this
  679. queuedDiscardMeter.Mark(1)
  680. return false, ErrReplaceUnderpriced
  681. }
  682. // Discard any previous transaction and mark this
  683. if old != nil {
  684. pool.all.Remove(old.Hash())
  685. pool.priced.Removed(1)
  686. queuedReplaceMeter.Mark(1)
  687. } else {
  688. // Nothing was replaced, bump the queued counter
  689. queuedGauge.Inc(1)
  690. }
  691. // If the transaction isn't in lookup set but it's expected to be there,
  692. // show the error log.
  693. if pool.all.Get(hash) == nil && !addAll {
  694. log.Error("Missing transaction in lookup set, please report the issue", "hash", hash)
  695. }
  696. if addAll {
  697. pool.all.Add(tx, local)
  698. pool.priced.Put(tx, local)
  699. }
  700. // If we never record the heartbeat, do it right now.
  701. if _, exist := pool.beats[from]; !exist {
  702. pool.beats[from] = time.Now()
  703. }
  704. return old != nil, nil
  705. }
  706. // journalTx adds the specified transaction to the local disk journal if it is
  707. // deemed to have been sent from a local account.
  708. func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
  709. // Only journal if it's enabled and the transaction is local
  710. if pool.journal == nil || !pool.locals.contains(from) {
  711. return
  712. }
  713. if err := pool.journal.insert(tx); err != nil {
  714. log.Warn("Failed to journal local transaction", "err", err)
  715. }
  716. }
  717. // promoteTx adds a transaction to the pending (processable) list of transactions
  718. // and returns whether it was inserted or an older was better.
  719. //
  720. // Note, this method assumes the pool lock is held!
  721. func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
  722. // Try to insert the transaction into the pending queue
  723. if pool.pending[addr] == nil {
  724. pool.pending[addr] = newTxList(true)
  725. }
  726. list := pool.pending[addr]
  727. inserted, old := list.Add(tx, pool.config.PriceBump)
  728. if !inserted {
  729. // An older transaction was better, discard this
  730. pool.all.Remove(hash)
  731. pool.priced.Removed(1)
  732. pendingDiscardMeter.Mark(1)
  733. return false
  734. }
  735. // Otherwise discard any previous transaction and mark this
  736. if old != nil {
  737. pool.all.Remove(old.Hash())
  738. pool.priced.Removed(1)
  739. pendingReplaceMeter.Mark(1)
  740. } else {
  741. // Nothing was replaced, bump the pending counter
  742. pendingGauge.Inc(1)
  743. }
  744. // Set the potentially new pending nonce and notify any subsystems of the new tx
  745. pool.pendingNonces.set(addr, tx.Nonce()+1)
  746. // Successful promotion, bump the heartbeat
  747. pool.beats[addr] = time.Now()
  748. return true
  749. }
  750. // AddLocals enqueues a batch of transactions into the pool if they are valid, marking the
  751. // senders as a local ones, ensuring they go around the local pricing constraints.
  752. //
  753. // This method is used to add transactions from the RPC API and performs synchronous pool
  754. // reorganization and event propagation.
  755. func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
  756. return pool.addTxs(txs, !pool.config.NoLocals, true)
  757. }
  758. // AddLocal enqueues a single local transaction into the pool if it is valid. This is
  759. // a convenience wrapper aroundd AddLocals.
  760. func (pool *TxPool) AddLocal(tx *types.Transaction) error {
  761. errs := pool.AddLocals([]*types.Transaction{tx})
  762. return errs[0]
  763. }
  764. // AddRemotes enqueues a batch of transactions into the pool if they are valid. If the
  765. // senders are not among the locally tracked ones, full pricing constraints will apply.
  766. //
  767. // This method is used to add transactions from the p2p network and does not wait for pool
  768. // reorganization and internal event propagation.
  769. func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
  770. return pool.addTxs(txs, false, false)
  771. }
  772. // This is like AddRemotes, but waits for pool reorganization. Tests use this method.
  773. func (pool *TxPool) AddRemotesSync(txs []*types.Transaction) []error {
  774. return pool.addTxs(txs, false, true)
  775. }
  776. // This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method.
  777. func (pool *TxPool) addRemoteSync(tx *types.Transaction) error {
  778. errs := pool.AddRemotesSync([]*types.Transaction{tx})
  779. return errs[0]
  780. }
  781. // AddRemote enqueues a single transaction into the pool if it is valid. This is a convenience
  782. // wrapper around AddRemotes.
  783. //
  784. // Deprecated: use AddRemotes
  785. func (pool *TxPool) AddRemote(tx *types.Transaction) error {
  786. errs := pool.AddRemotes([]*types.Transaction{tx})
  787. return errs[0]
  788. }
  789. // addTxs attempts to queue a batch of transactions if they are valid.
  790. func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
  791. // Filter out known ones without obtaining the pool lock or recovering signatures
  792. var (
  793. errs = make([]error, len(txs))
  794. news = make([]*types.Transaction, 0, len(txs))
  795. )
  796. for i, tx := range txs {
  797. // If the transaction is known, pre-set the error slot
  798. if pool.all.Get(tx.Hash()) != nil {
  799. errs[i] = ErrAlreadyKnown
  800. knownTxMeter.Mark(1)
  801. continue
  802. }
  803. // Exclude transactions with invalid signatures as soon as
  804. // possible and cache senders in transactions before
  805. // obtaining lock
  806. _, err := types.Sender(pool.signer, tx)
  807. if err != nil {
  808. errs[i] = ErrInvalidSender
  809. invalidTxMeter.Mark(1)
  810. continue
  811. }
  812. // Accumulate all unknown transactions for deeper processing
  813. news = append(news, tx)
  814. }
  815. if len(news) == 0 {
  816. return errs
  817. }
  818. // Process all the new transaction and merge any errors into the original slice
  819. pool.mu.Lock()
  820. newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
  821. pool.mu.Unlock()
  822. var nilSlot = 0
  823. for _, err := range newErrs {
  824. for errs[nilSlot] != nil {
  825. nilSlot++
  826. }
  827. errs[nilSlot] = err
  828. nilSlot++
  829. }
  830. // Reorg the pool internals if needed and return
  831. done := pool.requestPromoteExecutables(dirtyAddrs)
  832. if sync {
  833. <-done
  834. }
  835. return errs
  836. }
  837. // addTxsLocked attempts to queue a batch of transactions if they are valid.
  838. // The transaction pool lock must be held.
  839. func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
  840. dirty := newAccountSet(pool.signer)
  841. errs := make([]error, len(txs))
  842. for i, tx := range txs {
  843. replaced, err := pool.add(tx, local)
  844. errs[i] = err
  845. if err == nil && !replaced {
  846. dirty.addTx(tx)
  847. }
  848. }
  849. validTxMeter.Mark(int64(len(dirty.accounts)))
  850. return errs, dirty
  851. }
  852. // Status returns the status (unknown/pending/queued) of a batch of transactions
  853. // identified by their hashes.
  854. func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
  855. status := make([]TxStatus, len(hashes))
  856. for i, hash := range hashes {
  857. tx := pool.Get(hash)
  858. if tx == nil {
  859. continue
  860. }
  861. from, _ := types.Sender(pool.signer, tx) // already validated
  862. pool.mu.RLock()
  863. if txList := pool.pending[from]; txList != nil && txList.txs.items[tx.Nonce()] != nil {
  864. status[i] = TxStatusPending
  865. } else if txList := pool.queue[from]; txList != nil && txList.txs.items[tx.Nonce()] != nil {
  866. status[i] = TxStatusQueued
  867. }
  868. // implicit else: the tx may have been included into a block between
  869. // checking pool.Get and obtaining the lock. In that case, TxStatusUnknown is correct
  870. pool.mu.RUnlock()
  871. }
  872. return status
  873. }
  874. // Get returns a transaction if it is contained in the pool and nil otherwise.
  875. func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
  876. return pool.all.Get(hash)
  877. }
  878. // Has returns an indicator whether txpool has a transaction cached with the
  879. // given hash.
  880. func (pool *TxPool) Has(hash common.Hash) bool {
  881. return pool.all.Get(hash) != nil
  882. }
  883. // removeTx removes a single transaction from the queue, moving all subsequent
  884. // transactions back to the future queue.
  885. func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
  886. // Fetch the transaction we wish to delete
  887. tx := pool.all.Get(hash)
  888. if tx == nil {
  889. return
  890. }
  891. addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
  892. // Remove it from the list of known transactions
  893. pool.all.Remove(hash)
  894. if outofbound {
  895. pool.priced.Removed(1)
  896. }
  897. if pool.locals.contains(addr) {
  898. localGauge.Dec(1)
  899. }
  900. // Remove the transaction from the pending lists and reset the account nonce
  901. if pending := pool.pending[addr]; pending != nil {
  902. if removed, invalids := pending.Remove(tx); removed {
  903. // If no more pending transactions are left, remove the list
  904. if pending.Empty() {
  905. delete(pool.pending, addr)
  906. }
  907. // Postpone any invalidated transactions
  908. for _, tx := range invalids {
  909. // Internal shuffle shouldn't touch the lookup set.
  910. pool.enqueueTx(tx.Hash(), tx, false, false)
  911. }
  912. // Update the account nonce if needed
  913. pool.pendingNonces.setIfLower(addr, tx.Nonce())
  914. // Reduce the pending counter
  915. pendingGauge.Dec(int64(1 + len(invalids)))
  916. return
  917. }
  918. }
  919. // Transaction is in the future queue
  920. if future := pool.queue[addr]; future != nil {
  921. if removed, _ := future.Remove(tx); removed {
  922. // Reduce the queued counter
  923. queuedGauge.Dec(1)
  924. }
  925. if future.Empty() {
  926. delete(pool.queue, addr)
  927. delete(pool.beats, addr)
  928. }
  929. }
  930. }
  931. // requestReset requests a pool reset to the new head block.
  932. // The returned channel is closed when the reset has occurred.
  933. func (pool *TxPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} {
  934. select {
  935. case pool.reqResetCh <- &txpoolResetRequest{oldHead, newHead}:
  936. return <-pool.reorgDoneCh
  937. case <-pool.reorgShutdownCh:
  938. return pool.reorgShutdownCh
  939. }
  940. }
  941. // requestPromoteExecutables requests transaction promotion checks for the given addresses.
  942. // The returned channel is closed when the promotion checks have occurred.
  943. func (pool *TxPool) requestPromoteExecutables(set *accountSet) chan struct{} {
  944. select {
  945. case pool.reqPromoteCh <- set:
  946. return <-pool.reorgDoneCh
  947. case <-pool.reorgShutdownCh:
  948. return pool.reorgShutdownCh
  949. }
  950. }
  951. // queueTxEvent enqueues a transaction event to be sent in the next reorg run.
  952. func (pool *TxPool) queueTxEvent(tx *types.Transaction) {
  953. select {
  954. case pool.queueTxEventCh <- tx:
  955. case <-pool.reorgShutdownCh:
  956. }
  957. }
  958. // scheduleReorgLoop schedules runs of reset and promoteExecutables. Code above should not
  959. // call those methods directly, but request them being run using requestReset and
  960. // requestPromoteExecutables instead.
  961. func (pool *TxPool) scheduleReorgLoop() {
  962. defer pool.wg.Done()
  963. var (
  964. curDone chan struct{} // non-nil while runReorg is active
  965. nextDone = make(chan struct{})
  966. launchNextRun bool
  967. reset *txpoolResetRequest
  968. dirtyAccounts *accountSet
  969. queuedEvents = make(map[common.Address]*txSortedMap)
  970. )
  971. for {
  972. // Launch next background reorg if needed
  973. if curDone == nil && launchNextRun {
  974. // Run the background reorg and announcements
  975. go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)
  976. // Prepare everything for the next round of reorg
  977. curDone, nextDone = nextDone, make(chan struct{})
  978. launchNextRun = false
  979. reset, dirtyAccounts = nil, nil
  980. queuedEvents = make(map[common.Address]*txSortedMap)
  981. }
  982. select {
  983. case req := <-pool.reqResetCh:
  984. // Reset request: update head if request is already pending.
  985. if reset == nil {
  986. reset = req
  987. } else {
  988. reset.newHead = req.newHead
  989. }
  990. launchNextRun = true
  991. pool.reorgDoneCh <- nextDone
  992. case req := <-pool.reqPromoteCh:
  993. // Promote request: update address set if request is already pending.
  994. if dirtyAccounts == nil {
  995. dirtyAccounts = req
  996. } else {
  997. dirtyAccounts.merge(req)
  998. }
  999. launchNextRun = true
  1000. pool.reorgDoneCh <- nextDone
  1001. case tx := <-pool.queueTxEventCh:
  1002. // Queue up the event, but don't schedule a reorg. It's up to the caller to
  1003. // request one later if they want the events sent.
  1004. addr, _ := types.Sender(pool.signer, tx)
  1005. if _, ok := queuedEvents[addr]; !ok {
  1006. queuedEvents[addr] = newTxSortedMap()
  1007. }
  1008. queuedEvents[addr].Put(tx)
  1009. case <-curDone:
  1010. curDone = nil
  1011. case <-pool.reorgShutdownCh:
  1012. // Wait for current run to finish.
  1013. if curDone != nil {
  1014. <-curDone
  1015. }
  1016. close(nextDone)
  1017. return
  1018. }
  1019. }
  1020. }
  1021. // runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
  1022. func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*txSortedMap) {
  1023. defer func(t0 time.Time) {
  1024. reorgDurationTimer.Update(time.Since(t0))
  1025. }(time.Now())
  1026. defer close(done)
  1027. var promoteAddrs []common.Address
  1028. if dirtyAccounts != nil && reset == nil {
  1029. // Only dirty accounts need to be promoted, unless we're resetting.
  1030. // For resets, all addresses in the tx queue will be promoted and
  1031. // the flatten operation can be avoided.
  1032. promoteAddrs = dirtyAccounts.flatten()
  1033. }
  1034. pool.mu.Lock()
  1035. if reset != nil {
  1036. // Reset from the old head to the new, rescheduling any reorged transactions
  1037. pool.reset(reset.oldHead, reset.newHead)
  1038. // Nonces were reset, discard any events that became stale
  1039. for addr := range events {
  1040. events[addr].Forward(pool.pendingNonces.get(addr))
  1041. if events[addr].Len() == 0 {
  1042. delete(events, addr)
  1043. }
  1044. }
  1045. // Reset needs promote for all addresses
  1046. promoteAddrs = make([]common.Address, 0, len(pool.queue))
  1047. for addr := range pool.queue {
  1048. promoteAddrs = append(promoteAddrs, addr)
  1049. }
  1050. }
  1051. // Check for pending transactions for every account that sent new ones
  1052. promoted := pool.promoteExecutables(promoteAddrs)
  1053. // If a new block appeared, validate the pool of pending transactions. This will
  1054. // remove any transaction that has been included in the block or was invalidated
  1055. // because of another transaction (e.g. higher gas price).
  1056. if reset != nil {
  1057. pool.demoteUnexecutables()
  1058. if reset.newHead != nil && pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
  1059. pendingBaseFee := misc.CalcBaseFee(pool.chainconfig, reset.newHead)
  1060. pool.priced.SetBaseFee(pendingBaseFee)
  1061. }
  1062. // Update all accounts to the latest known pending nonce
  1063. nonces := make(map[common.Address]uint64, len(pool.pending))
  1064. for addr, list := range pool.pending {
  1065. highestPending := list.LastElement()
  1066. nonces[addr] = highestPending.Nonce() + 1
  1067. }
  1068. pool.pendingNonces.setAll(nonces)
  1069. }
  1070. // Ensure pool.queue and pool.pending sizes stay within the configured limits.
  1071. pool.truncatePending()
  1072. pool.truncateQueue()
  1073. dropBetweenReorgHistogram.Update(int64(pool.changesSinceReorg))
  1074. pool.changesSinceReorg = 0 // Reset change counter
  1075. pool.mu.Unlock()
  1076. // Notify subsystems for newly added transactions
  1077. for _, tx := range promoted {
  1078. addr, _ := types.Sender(pool.signer, tx)
  1079. if _, ok := events[addr]; !ok {
  1080. events[addr] = newTxSortedMap()
  1081. }
  1082. events[addr].Put(tx)
  1083. }
  1084. if len(events) > 0 {
  1085. var txs []*types.Transaction
  1086. for _, set := range events {
  1087. txs = append(txs, set.Flatten()...)
  1088. }
  1089. pool.txFeed.Send(NewTxsEvent{txs})
  1090. }
  1091. }
  1092. // reset retrieves the current state of the blockchain and ensures the content
  1093. // of the transaction pool is valid with regard to the chain state.
  1094. func (pool *TxPool) reset(oldHead, newHead *types.Header) {
  1095. // If we're reorging an old state, reinject all dropped transactions
  1096. var reinject types.Transactions
  1097. if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
  1098. // If the reorg is too deep, avoid doing it (will happen during fast sync)
  1099. oldNum := oldHead.Number.Uint64()
  1100. newNum := newHead.Number.Uint64()
  1101. if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
  1102. log.Debug("Skipping deep transaction reorg", "depth", depth)
  1103. } else {
  1104. // Reorg seems shallow enough to pull in all transactions into memory
  1105. var discarded, included types.Transactions
  1106. var (
  1107. rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
  1108. add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
  1109. )
  1110. if rem == nil {
  1111. // This can happen if a setHead is performed, where we simply discard the old
  1112. // head from the chain.
  1113. // If that is the case, we don't have the lost transactions any more, and
  1114. // there's nothing to add
  1115. if newNum >= oldNum {
  1116. // If we reorged to a same or higher number, then it's not a case of setHead
  1117. log.Warn("Transaction pool reset with missing oldhead",
  1118. "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
  1119. return
  1120. }
  1121. // If the reorg ended up on a lower number, it's indicative of setHead being the cause
  1122. log.Debug("Skipping transaction reset caused by setHead",
  1123. "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
  1124. // We still need to update the current state s.th. the lost transactions can be readded by the user
  1125. } else {
  1126. for rem.NumberU64() > add.NumberU64() {
  1127. discarded = append(discarded, rem.Transactions()...)
  1128. if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
  1129. log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
  1130. return
  1131. }
  1132. }
  1133. for add.NumberU64() > rem.NumberU64() {
  1134. included = append(included, add.Transactions()...)
  1135. if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
  1136. log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
  1137. return
  1138. }
  1139. }
  1140. for rem.Hash() != add.Hash() {
  1141. discarded = append(discarded, rem.Transactions()...)
  1142. if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
  1143. log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
  1144. return
  1145. }
  1146. included = append(included, add.Transactions()...)
  1147. if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
  1148. log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
  1149. return
  1150. }
  1151. }
  1152. reinject = types.TxDifference(discarded, included)
  1153. }
  1154. }
  1155. }
  1156. // Initialize the internal state to the current head
  1157. if newHead == nil {
  1158. newHead = pool.chain.CurrentBlock().Header() // Special case during testing
  1159. }
  1160. statedb, err := pool.chain.StateAt(newHead.Root)
  1161. if err != nil {
  1162. log.Error("Failed to reset txpool state", "err", err)
  1163. return
  1164. }
  1165. pool.currentState = statedb
  1166. pool.pendingNonces = newTxNoncer(statedb)
  1167. pool.currentMaxGas = newHead.GasLimit
  1168. // Inject any transactions discarded due to reorgs
  1169. log.Debug("Reinjecting stale transactions", "count", len(reinject))
  1170. senderCacher.recover(pool.signer, reinject)
  1171. pool.addTxsLocked(reinject, false)
  1172. // Update all fork indicator by next pending block number.
  1173. next := new(big.Int).Add(newHead.Number, big.NewInt(1))
  1174. pool.istanbul = pool.chainconfig.IsIstanbul(next)
  1175. pool.eip2718 = pool.chainconfig.IsBerlin(next)
  1176. pool.eip1559 = pool.chainconfig.IsLondon(next)
  1177. }
  1178. // promoteExecutables moves transactions that have become processable from the
  1179. // future queue to the set of pending transactions. During this process, all
  1180. // invalidated transactions (low nonce, low balance) are deleted.
  1181. func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
  1182. // Track the promoted transactions to broadcast them at once
  1183. var promoted []*types.Transaction
  1184. // Iterate over all accounts and promote any executable transactions
  1185. for _, addr := range accounts {
  1186. list := pool.queue[addr]
  1187. if list == nil {
  1188. continue // Just in case someone calls with a non existing account
  1189. }
  1190. // Drop all transactions that are deemed too old (low nonce)
  1191. forwards := list.Forward(pool.currentState.GetNonce(addr))
  1192. for _, tx := range forwards {
  1193. hash := tx.Hash()
  1194. pool.all.Remove(hash)
  1195. }
  1196. log.Trace("Removed old queued transactions", "count", len(forwards))
  1197. // Drop all transactions that are too costly (low balance or out of gas)
  1198. drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
  1199. for _, tx := range drops {
  1200. hash := tx.Hash()
  1201. pool.all.Remove(hash)
  1202. }
  1203. log.Trace("Removed unpayable queued transactions", "count", len(drops))
  1204. queuedNofundsMeter.Mark(int64(len(drops)))
  1205. // Gather all executable transactions and promote them
  1206. readies := list.Ready(pool.pendingNonces.get(addr))
  1207. for _, tx := range readies {
  1208. hash := tx.Hash()
  1209. if pool.promoteTx(addr, hash, tx) {
  1210. promoted = append(promoted, tx)
  1211. }
  1212. }
  1213. log.Trace("Promoted queued transactions", "count", len(promoted))
  1214. queuedGauge.Dec(int64(len(readies)))
  1215. // Drop all transactions over the allowed limit
  1216. var caps types.Transactions
  1217. if !pool.locals.contains(addr) {
  1218. caps = list.Cap(int(pool.config.AccountQueue))
  1219. for _, tx := range caps {
  1220. hash := tx.Hash()
  1221. pool.all.Remove(hash)
  1222. log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
  1223. }
  1224. queuedRateLimitMeter.Mark(int64(len(caps)))
  1225. }
  1226. // Mark all the items dropped as removed
  1227. pool.priced.Removed(len(forwards) + len(drops) + len(caps))
  1228. queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
  1229. if pool.locals.contains(addr) {
  1230. localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
  1231. }
  1232. // Delete the entire queue entry if it became empty.
  1233. if list.Empty() {
  1234. delete(pool.queue, addr)
  1235. delete(pool.beats, addr)
  1236. }
  1237. }
  1238. return promoted
  1239. }
  1240. // truncatePending removes transactions from the pending queue if the pool is above the
  1241. // pending limit. The algorithm tries to reduce transaction counts by an approximately
  1242. // equal number for all for accounts with many pending transactions.
  1243. func (pool *TxPool) truncatePending() {
  1244. pending := uint64(0)
  1245. for _, list := range pool.pending {
  1246. pending += uint64(list.Len())
  1247. }
  1248. if pending <= pool.config.GlobalSlots {
  1249. return
  1250. }
  1251. pendingBeforeCap := pending
  1252. // Assemble a spam order to penalize large transactors first
  1253. spammers := prque.New(nil)
  1254. for addr, list := range pool.pending {
  1255. // Only evict transactions from high rollers
  1256. if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
  1257. spammers.Push(addr, int64(list.Len()))
  1258. }
  1259. }
  1260. // Gradually drop transactions from offenders
  1261. offenders := []common.Address{}
  1262. for pending > pool.config.GlobalSlots && !spammers.Empty() {
  1263. // Retrieve the next offender if not local address
  1264. offender, _ := spammers.Pop()
  1265. offenders = append(offenders, offender.(common.Address))
  1266. // Equalize balances until all the same or below threshold
  1267. if len(offenders) > 1 {
  1268. // Calculate the equalization threshold for all current offenders
  1269. threshold := pool.pending[offender.(common.Address)].Len()
  1270. // Iteratively reduce all offenders until below limit or threshold reached
  1271. for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
  1272. for i := 0; i < len(offenders)-1; i++ {
  1273. list := pool.pending[offenders[i]]
  1274. caps := list.Cap(list.Len() - 1)
  1275. for _, tx := range caps {
  1276. // Drop the transaction from the global pools too
  1277. hash := tx.Hash()
  1278. pool.all.Remove(hash)
  1279. // Update the account nonce to the dropped transaction
  1280. pool.pendingNonces.setIfLower(offenders[i], tx.Nonce())
  1281. log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
  1282. }
  1283. pool.priced.Removed(len(caps))
  1284. pendingGauge.Dec(int64(len(caps)))
  1285. if pool.locals.contains(offenders[i]) {
  1286. localGauge.Dec(int64(len(caps)))
  1287. }
  1288. pending--
  1289. }
  1290. }
  1291. }
  1292. }
  1293. // If still above threshold, reduce to limit or min allowance
  1294. if pending > pool.config.GlobalSlots && len(offenders) > 0 {
  1295. for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
  1296. for _, addr := range offenders {
  1297. list := pool.pending[addr]
  1298. caps := list.Cap(list.Len() - 1)
  1299. for _, tx := range caps {
  1300. // Drop the transaction from the global pools too
  1301. hash := tx.Hash()
  1302. pool.all.Remove(hash)
  1303. // Update the account nonce to the dropped transaction
  1304. pool.pendingNonces.setIfLower(addr, tx.Nonce())
  1305. log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
  1306. }
  1307. pool.priced.Removed(len(caps))
  1308. pendingGauge.Dec(int64(len(caps)))
  1309. if pool.locals.contains(addr) {
  1310. localGauge.Dec(int64(len(caps)))
  1311. }
  1312. pending--
  1313. }
  1314. }
  1315. }
  1316. pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending))
  1317. }
  1318. // truncateQueue drops the oldest transactions in the queue if the pool is above the global queue limit.
  1319. func (pool *TxPool) truncateQueue() {
  1320. queued := uint64(0)
  1321. for _, list := range pool.queue {
  1322. queued += uint64(list.Len())
  1323. }
  1324. if queued <= pool.config.GlobalQueue {
  1325. return
  1326. }
  1327. // Sort all accounts with queued transactions by heartbeat
  1328. addresses := make(addressesByHeartbeat, 0, len(pool.queue))
  1329. for addr := range pool.queue {
  1330. if !pool.locals.contains(addr) { // don't drop locals
  1331. addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
  1332. }
  1333. }
  1334. sort.Sort(sort.Reverse(addresses))
  1335. // Drop transactions until the total is below the limit or only locals remain
  1336. for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
  1337. addr := addresses[len(addresses)-1]
  1338. list := pool.queue[addr.address]
  1339. addresses = addresses[:len(addresses)-1]
  1340. // Drop all transactions if they are less than the overflow
  1341. if size := uint64(list.Len()); size <= drop {
  1342. for _, tx := range list.Flatten() {
  1343. pool.removeTx(tx.Hash(), true)
  1344. }
  1345. drop -= size
  1346. queuedRateLimitMeter.Mark(int64(size))
  1347. continue
  1348. }
  1349. // Otherwise drop only last few transactions
  1350. txs := list.Flatten()
  1351. for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
  1352. pool.removeTx(txs[i].Hash(), true)
  1353. drop--
  1354. queuedRateLimitMeter.Mark(1)
  1355. }
  1356. }
  1357. }
  1358. // demoteUnexecutables removes invalid and processed transactions from the pools
  1359. // executable/pending queue and any subsequent transactions that become unexecutable
  1360. // are moved back into the future queue.
  1361. //
  1362. // Note: transactions are not marked as removed in the priced list because re-heaping
  1363. // is always explicitly triggered by SetBaseFee and it would be unnecessary and wasteful
  1364. // to trigger a re-heap is this function
  1365. func (pool *TxPool) demoteUnexecutables() {
  1366. // Iterate over all accounts and demote any non-executable transactions
  1367. for addr, list := range pool.pending {
  1368. nonce := pool.currentState.GetNonce(addr)
  1369. // Drop all transactions that are deemed too old (low nonce)
  1370. olds := list.Forward(nonce)
  1371. for _, tx := range olds {
  1372. hash := tx.Hash()
  1373. pool.all.Remove(hash)
  1374. log.Trace("Removed old pending transaction", "hash", hash)
  1375. }
  1376. // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
  1377. drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
  1378. for _, tx := range drops {
  1379. hash := tx.Hash()
  1380. log.Trace("Removed unpayable pending transaction", "hash", hash)
  1381. pool.all.Remove(hash)
  1382. }
  1383. pendingNofundsMeter.Mark(int64(len(drops)))
  1384. for _, tx := range invalids {
  1385. hash := tx.Hash()
  1386. log.Trace("Demoting pending transaction", "hash", hash)
  1387. // Internal shuffle shouldn't touch the lookup set.
  1388. pool.enqueueTx(hash, tx, false, false)
  1389. }
  1390. pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
  1391. if pool.locals.contains(addr) {
  1392. localGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
  1393. }
  1394. // If there's a gap in front, alert (should never happen) and postpone all transactions
  1395. if list.Len() > 0 && list.txs.Get(nonce) == nil {
  1396. gapped := list.Cap(0)
  1397. for _, tx := range gapped {
  1398. hash := tx.Hash()
  1399. log.Error("Demoting invalidated transaction", "hash", hash)
  1400. // Internal shuffle shouldn't touch the lookup set.
  1401. pool.enqueueTx(hash, tx, false, false)
  1402. }
  1403. pendingGauge.Dec(int64(len(gapped)))
  1404. // This might happen in a reorg, so log it to the metering
  1405. blockReorgInvalidatedTx.Mark(int64(len(gapped)))
  1406. }
  1407. // Delete the entire pending entry if it became empty.
  1408. if list.Empty() {
  1409. delete(pool.pending, addr)
  1410. }
  1411. }
  1412. }
  1413. // addressByHeartbeat is an account address tagged with its last activity timestamp.
  1414. type addressByHeartbeat struct {
  1415. address common.Address
  1416. heartbeat time.Time
  1417. }
  1418. type addressesByHeartbeat []addressByHeartbeat
  1419. func (a addressesByHeartbeat) Len() int { return len(a) }
  1420. func (a addressesByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) }
  1421. func (a addressesByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  1422. // accountSet is simply a set of addresses to check for existence, and a signer
  1423. // capable of deriving addresses from transactions.
  1424. type accountSet struct {
  1425. accounts map[common.Address]struct{}
  1426. signer types.Signer
  1427. cache *[]common.Address
  1428. }
  1429. // newAccountSet creates a new address set with an associated signer for sender
  1430. // derivations.
  1431. func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet {
  1432. as := &accountSet{
  1433. accounts: make(map[common.Address]struct{}),
  1434. signer: signer,
  1435. }
  1436. for _, addr := range addrs {
  1437. as.add(addr)
  1438. }
  1439. return as
  1440. }
  1441. // contains checks if a given address is contained within the set.
  1442. func (as *accountSet) contains(addr common.Address) bool {
  1443. _, exist := as.accounts[addr]
  1444. return exist
  1445. }
  1446. // containsTx checks if the sender of a given tx is within the set. If the sender
  1447. // cannot be derived, this method returns false.
  1448. func (as *accountSet) containsTx(tx *types.Transaction) bool {
  1449. if addr, err := types.Sender(as.signer, tx); err == nil {
  1450. return as.contains(addr)
  1451. }
  1452. return false
  1453. }
  1454. // add inserts a new address into the set to track.
  1455. func (as *accountSet) add(addr common.Address) {
  1456. as.accounts[addr] = struct{}{}
  1457. as.cache = nil
  1458. }
  1459. // addTx adds the sender of tx into the set.
  1460. func (as *accountSet) addTx(tx *types.Transaction) {
  1461. if addr, err := types.Sender(as.signer, tx); err == nil {
  1462. as.add(addr)
  1463. }
  1464. }
  1465. // flatten returns the list of addresses within this set, also caching it for later
  1466. // reuse. The returned slice should not be changed!
  1467. func (as *accountSet) flatten() []common.Address {
  1468. if as.cache == nil {
  1469. accounts := make([]common.Address, 0, len(as.accounts))
  1470. for account := range as.accounts {
  1471. accounts = append(accounts, account)
  1472. }
  1473. as.cache = &accounts
  1474. }
  1475. return *as.cache
  1476. }
  1477. // merge adds all addresses from the 'other' set into 'as'.
  1478. func (as *accountSet) merge(other *accountSet) {
  1479. for addr := range other.accounts {
  1480. as.accounts[addr] = struct{}{}
  1481. }
  1482. as.cache = nil
  1483. }
  1484. // txLookup is used internally by TxPool to track transactions while allowing
  1485. // lookup without mutex contention.
  1486. //
  1487. // Note, although this type is properly protected against concurrent access, it
  1488. // is **not** a type that should ever be mutated or even exposed outside of the
  1489. // transaction pool, since its internal state is tightly coupled with the pools
  1490. // internal mechanisms. The sole purpose of the type is to permit out-of-bound
  1491. // peeking into the pool in TxPool.Get without having to acquire the widely scoped
  1492. // TxPool.mu mutex.
  1493. //
  1494. // This lookup set combines the notion of "local transactions", which is useful
  1495. // to build upper-level structure.
  1496. type txLookup struct {
  1497. slots int
  1498. lock sync.RWMutex
  1499. locals map[common.Hash]*types.Transaction
  1500. remotes map[common.Hash]*types.Transaction
  1501. }
  1502. // newTxLookup returns a new txLookup structure.
  1503. func newTxLookup() *txLookup {
  1504. return &txLookup{
  1505. locals: make(map[common.Hash]*types.Transaction),
  1506. remotes: make(map[common.Hash]*types.Transaction),
  1507. }
  1508. }
  1509. // Range calls f on each key and value present in the map. The callback passed
  1510. // should return the indicator whether the iteration needs to be continued.
  1511. // Callers need to specify which set (or both) to be iterated.
  1512. func (t *txLookup) Range(f func(hash common.Hash, tx *types.Transaction, local bool) bool, local bool, remote bool) {
  1513. t.lock.RLock()
  1514. defer t.lock.RUnlock()
  1515. if local {
  1516. for key, value := range t.locals {
  1517. if !f(key, value, true) {
  1518. return
  1519. }
  1520. }
  1521. }
  1522. if remote {
  1523. for key, value := range t.remotes {
  1524. if !f(key, value, false) {
  1525. return
  1526. }
  1527. }
  1528. }
  1529. }
  1530. // Get returns a transaction if it exists in the lookup, or nil if not found.
  1531. func (t *txLookup) Get(hash common.Hash) *types.Transaction {
  1532. t.lock.RLock()
  1533. defer t.lock.RUnlock()
  1534. if tx := t.locals[hash]; tx != nil {
  1535. return tx
  1536. }
  1537. return t.remotes[hash]
  1538. }
  1539. // GetLocal returns a transaction if it exists in the lookup, or nil if not found.
  1540. func (t *txLookup) GetLocal(hash common.Hash) *types.Transaction {
  1541. t.lock.RLock()
  1542. defer t.lock.RUnlock()
  1543. return t.locals[hash]
  1544. }
  1545. // GetRemote returns a transaction if it exists in the lookup, or nil if not found.
  1546. func (t *txLookup) GetRemote(hash common.Hash) *types.Transaction {
  1547. t.lock.RLock()
  1548. defer t.lock.RUnlock()
  1549. return t.remotes[hash]
  1550. }
  1551. // Count returns the current number of transactions in the lookup.
  1552. func (t *txLookup) Count() int {
  1553. t.lock.RLock()
  1554. defer t.lock.RUnlock()
  1555. return len(t.locals) + len(t.remotes)
  1556. }
  1557. // LocalCount returns the current number of local transactions in the lookup.
  1558. func (t *txLookup) LocalCount() int {
  1559. t.lock.RLock()
  1560. defer t.lock.RUnlock()
  1561. return len(t.locals)
  1562. }
  1563. // RemoteCount returns the current number of remote transactions in the lookup.
  1564. func (t *txLookup) RemoteCount() int {
  1565. t.lock.RLock()
  1566. defer t.lock.RUnlock()
  1567. return len(t.remotes)
  1568. }
  1569. // Slots returns the current number of slots used in the lookup.
  1570. func (t *txLookup) Slots() int {
  1571. t.lock.RLock()
  1572. defer t.lock.RUnlock()
  1573. return t.slots
  1574. }
  1575. // Add adds a transaction to the lookup.
  1576. func (t *txLookup) Add(tx *types.Transaction, local bool) {
  1577. t.lock.Lock()
  1578. defer t.lock.Unlock()
  1579. t.slots += numSlots(tx)
  1580. slotsGauge.Update(int64(t.slots))
  1581. if local {
  1582. t.locals[tx.Hash()] = tx
  1583. } else {
  1584. t.remotes[tx.Hash()] = tx
  1585. }
  1586. }
  1587. // Remove removes a transaction from the lookup.
  1588. func (t *txLookup) Remove(hash common.Hash) {
  1589. t.lock.Lock()
  1590. defer t.lock.Unlock()
  1591. tx, ok := t.locals[hash]
  1592. if !ok {
  1593. tx, ok = t.remotes[hash]
  1594. }
  1595. if !ok {
  1596. log.Error("No transaction found to be deleted", "hash", hash)
  1597. return
  1598. }
  1599. t.slots -= numSlots(tx)
  1600. slotsGauge.Update(int64(t.slots))
  1601. delete(t.locals, hash)
  1602. delete(t.remotes, hash)
  1603. }
  1604. // RemoteToLocals migrates the transactions belongs to the given locals to locals
  1605. // set. The assumption is held the locals set is thread-safe to be used.
  1606. func (t *txLookup) RemoteToLocals(locals *accountSet) int {
  1607. t.lock.Lock()
  1608. defer t.lock.Unlock()
  1609. var migrated int
  1610. for hash, tx := range t.remotes {
  1611. if locals.containsTx(tx) {
  1612. t.locals[hash] = tx
  1613. delete(t.remotes, hash)
  1614. migrated += 1
  1615. }
  1616. }
  1617. return migrated
  1618. }
  1619. // RemotesBelowTip finds all remote transactions below the given tip threshold.
  1620. func (t *txLookup) RemotesBelowTip(threshold *big.Int) types.Transactions {
  1621. found := make(types.Transactions, 0, 128)
  1622. t.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
  1623. if tx.GasTipCapIntCmp(threshold) < 0 {
  1624. found = append(found, tx)
  1625. }
  1626. return true
  1627. }, false, true) // Only iterate remotes
  1628. return found
  1629. }
  1630. // numSlots calculates the number of slots needed for a single transaction.
  1631. func numSlots(tx *types.Transaction) int {
  1632. return int((tx.Size() + txSlotSize - 1) / txSlotSize)
  1633. }