transaction_pool.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. package core
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/big"
  6. "sort"
  7. "sync"
  8. "github.com/ethereum/go-ethereum/common"
  9. "github.com/ethereum/go-ethereum/core/state"
  10. "github.com/ethereum/go-ethereum/core/types"
  11. "github.com/ethereum/go-ethereum/event"
  12. "github.com/ethereum/go-ethereum/logger"
  13. "github.com/ethereum/go-ethereum/logger/glog"
  14. )
  15. var (
  16. // Transaction Pool Errors
  17. ErrInvalidSender = errors.New("Invalid sender")
  18. ErrNonce = errors.New("Nonce too low")
  19. ErrCheap = errors.New("Gas price too low for acceptance")
  20. ErrBalance = errors.New("Insufficient balance")
  21. ErrNonExistentAccount = errors.New("Account does not exist or account balance too low")
  22. ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value")
  23. ErrIntrinsicGas = errors.New("Intrinsic gas too low")
  24. ErrGasLimit = errors.New("Exceeds block gas limit")
  25. ErrNegativeValue = errors.New("Negative value")
  26. )
  27. const (
  28. maxQueued = 64 // max limit of queued txs per address
  29. )
  30. type stateFn func() *state.StateDB
  31. // TxPool contains all currently known transactions. Transactions
  32. // enter the pool when they are received from the network or submitted
  33. // locally. They exit the pool when they are included in the blockchain.
  34. //
  35. // The pool separates processable transactions (which can be applied to the
  36. // current state) and future transactions. Transactions move between those
  37. // two states over time as they are received and processed.
  38. type TxPool struct {
  39. quit chan bool // Quiting channel
  40. currentState stateFn // The state function which will allow us to do some pre checkes
  41. pendingState *state.ManagedState
  42. gasLimit func() *big.Int // The current gas limit function callback
  43. minGasPrice *big.Int
  44. eventMux *event.TypeMux
  45. events event.Subscription
  46. mu sync.RWMutex
  47. pending map[common.Hash]*types.Transaction // processable transactions
  48. queue map[common.Address]map[common.Hash]*types.Transaction
  49. }
  50. func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
  51. pool := &TxPool{
  52. pending: make(map[common.Hash]*types.Transaction),
  53. queue: make(map[common.Address]map[common.Hash]*types.Transaction),
  54. quit: make(chan bool),
  55. eventMux: eventMux,
  56. currentState: currentStateFn,
  57. gasLimit: gasLimitFn,
  58. minGasPrice: new(big.Int),
  59. pendingState: state.ManageState(currentStateFn()),
  60. events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}),
  61. }
  62. go pool.eventLoop()
  63. return pool
  64. }
  65. func (pool *TxPool) eventLoop() {
  66. // Track chain events. When a chain events occurs (new chain canon block)
  67. // we need to know the new state. The new state will help us determine
  68. // the nonces in the managed state
  69. for ev := range pool.events.Chan() {
  70. pool.mu.Lock()
  71. switch ev := ev.(type) {
  72. case ChainHeadEvent:
  73. pool.resetState()
  74. case GasPriceChanged:
  75. pool.minGasPrice = ev.Price
  76. }
  77. pool.mu.Unlock()
  78. }
  79. }
  80. func (pool *TxPool) resetState() {
  81. pool.pendingState = state.ManageState(pool.currentState())
  82. // validate the pool of pending transactions, this will remove
  83. // any transactions that have been included in the block or
  84. // have been invalidated because of another transaction (e.g.
  85. // higher gas price)
  86. pool.validatePool()
  87. // Loop over the pending transactions and base the nonce of the new
  88. // pending transaction set.
  89. for _, tx := range pool.pending {
  90. if addr, err := tx.From(); err == nil {
  91. // Set the nonce. Transaction nonce can never be lower
  92. // than the state nonce; validatePool took care of that.
  93. if pool.pendingState.GetNonce(addr) < tx.Nonce() {
  94. pool.pendingState.SetNonce(addr, tx.Nonce())
  95. }
  96. }
  97. }
  98. // Check the queue and move transactions over to the pending if possible
  99. // or remove those that have become invalid
  100. pool.checkQueue()
  101. }
  102. func (pool *TxPool) Stop() {
  103. close(pool.quit)
  104. pool.events.Unsubscribe()
  105. glog.V(logger.Info).Infoln("TX Pool stopped")
  106. }
  107. func (pool *TxPool) State() *state.ManagedState {
  108. pool.mu.RLock()
  109. defer pool.mu.RUnlock()
  110. return pool.pendingState
  111. }
  112. func (pool *TxPool) Stats() (pending int, queued int) {
  113. pool.mu.RLock()
  114. defer pool.mu.RUnlock()
  115. pending = len(pool.pending)
  116. for _, txs := range pool.queue {
  117. queued += len(txs)
  118. }
  119. return
  120. }
  121. // validateTx checks whether a transaction is valid according
  122. // to the consensus rules.
  123. func (pool *TxPool) validateTx(tx *types.Transaction) error {
  124. // Validate sender
  125. var (
  126. from common.Address
  127. err error
  128. )
  129. // Drop transactions under our own minimal accepted gas price
  130. if pool.minGasPrice.Cmp(tx.GasPrice()) > 0 {
  131. return ErrCheap
  132. }
  133. // Validate the transaction sender and it's sig. Throw
  134. // if the from fields is invalid.
  135. if from, err = tx.From(); err != nil {
  136. return ErrInvalidSender
  137. }
  138. // Make sure the account exist. Non existent accounts
  139. // haven't got funds and well therefor never pass.
  140. if !pool.currentState().HasAccount(from) {
  141. return ErrNonExistentAccount
  142. }
  143. // Last but not least check for nonce errors
  144. if pool.currentState().GetNonce(from) > tx.Nonce() {
  145. return ErrNonce
  146. }
  147. // Check the transaction doesn't exceed the current
  148. // block limit gas.
  149. if pool.gasLimit().Cmp(tx.Gas()) < 0 {
  150. return ErrGasLimit
  151. }
  152. // Transactions can't be negative. This may never happen
  153. // using RLP decoded transactions but may occur if you create
  154. // a transaction using the RPC for example.
  155. if tx.Value().Cmp(common.Big0) < 0 {
  156. return ErrNegativeValue
  157. }
  158. // Transactor should have enough funds to cover the costs
  159. // cost == V + GP * GL
  160. if pool.currentState().GetBalance(from).Cmp(tx.Cost()) < 0 {
  161. return ErrInsufficientFunds
  162. }
  163. // Should supply enough intrinsic gas
  164. if tx.Gas().Cmp(IntrinsicGas(tx.Data())) < 0 {
  165. return ErrIntrinsicGas
  166. }
  167. return nil
  168. }
  169. // validate and queue transactions.
  170. func (self *TxPool) add(tx *types.Transaction) error {
  171. hash := tx.Hash()
  172. if self.pending[hash] != nil {
  173. return fmt.Errorf("Known transaction (%x)", hash[:4])
  174. }
  175. err := self.validateTx(tx)
  176. if err != nil {
  177. return err
  178. }
  179. self.queueTx(hash, tx)
  180. if glog.V(logger.Debug) {
  181. var toname string
  182. if to := tx.To(); to != nil {
  183. toname = common.Bytes2Hex(to[:4])
  184. } else {
  185. toname = "[NEW_CONTRACT]"
  186. }
  187. // we can ignore the error here because From is
  188. // verified in ValidateTransaction.
  189. f, _ := tx.From()
  190. from := common.Bytes2Hex(f[:4])
  191. glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
  192. }
  193. return nil
  194. }
  195. // queueTx will queue an unknown transaction
  196. func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
  197. from, _ := tx.From() // already validated
  198. if self.queue[from] == nil {
  199. self.queue[from] = make(map[common.Hash]*types.Transaction)
  200. }
  201. self.queue[from][hash] = tx
  202. }
  203. // addTx will add a transaction to the pending (processable queue) list of transactions
  204. func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
  205. if _, ok := pool.pending[hash]; !ok {
  206. pool.pending[hash] = tx
  207. // Increment the nonce on the pending state. This can only happen if
  208. // the nonce is +1 to the previous one.
  209. pool.pendingState.SetNonce(addr, tx.Nonce()+1)
  210. // Notify the subscribers. This event is posted in a goroutine
  211. // because it's possible that somewhere during the post "Remove transaction"
  212. // gets called which will then wait for the global tx pool lock and deadlock.
  213. go pool.eventMux.Post(TxPreEvent{tx})
  214. }
  215. }
  216. // Add queues a single transaction in the pool if it is valid.
  217. func (self *TxPool) Add(tx *types.Transaction) (err error) {
  218. self.mu.Lock()
  219. defer self.mu.Unlock()
  220. err = self.add(tx)
  221. if err == nil {
  222. // check and validate the queueue
  223. self.checkQueue()
  224. }
  225. return
  226. }
  227. // AddTransactions attempts to queue all valid transactions in txs.
  228. func (self *TxPool) AddTransactions(txs []*types.Transaction) {
  229. self.mu.Lock()
  230. defer self.mu.Unlock()
  231. for _, tx := range txs {
  232. if err := self.add(tx); err != nil {
  233. glog.V(logger.Debug).Infoln("tx error:", err)
  234. } else {
  235. h := tx.Hash()
  236. glog.V(logger.Debug).Infof("tx %x\n", h[:4])
  237. }
  238. }
  239. // check and validate the queueue
  240. self.checkQueue()
  241. }
  242. // GetTransaction returns a transaction if it is contained in the pool
  243. // and nil otherwise.
  244. func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
  245. // check the txs first
  246. if tx, ok := tp.pending[hash]; ok {
  247. return tx
  248. }
  249. // check queue
  250. for _, txs := range tp.queue {
  251. if tx, ok := txs[hash]; ok {
  252. return tx
  253. }
  254. }
  255. return nil
  256. }
  257. // GetTransactions returns all currently processable transactions.
  258. // The returned slice may be modified by the caller.
  259. func (self *TxPool) GetTransactions() (txs types.Transactions) {
  260. self.mu.Lock()
  261. defer self.mu.Unlock()
  262. // check queue first
  263. self.checkQueue()
  264. // invalidate any txs
  265. self.validatePool()
  266. txs = make(types.Transactions, len(self.pending))
  267. i := 0
  268. for _, tx := range self.pending {
  269. txs[i] = tx
  270. i++
  271. }
  272. return txs
  273. }
  274. // GetQueuedTransactions returns all non-processable transactions.
  275. func (self *TxPool) GetQueuedTransactions() types.Transactions {
  276. self.mu.RLock()
  277. defer self.mu.RUnlock()
  278. var ret types.Transactions
  279. for _, txs := range self.queue {
  280. for _, tx := range txs {
  281. ret = append(ret, tx)
  282. }
  283. }
  284. sort.Sort(types.TxByNonce{ret})
  285. return ret
  286. }
  287. // RemoveTransactions removes all given transactions from the pool.
  288. func (self *TxPool) RemoveTransactions(txs types.Transactions) {
  289. self.mu.Lock()
  290. defer self.mu.Unlock()
  291. for _, tx := range txs {
  292. self.removeTx(tx.Hash())
  293. }
  294. }
  295. func (pool *TxPool) removeTx(hash common.Hash) {
  296. // delete from pending pool
  297. delete(pool.pending, hash)
  298. // delete from queue
  299. for address, txs := range pool.queue {
  300. if _, ok := txs[hash]; ok {
  301. if len(txs) == 1 {
  302. // if only one tx, remove entire address entry.
  303. delete(pool.queue, address)
  304. } else {
  305. delete(txs, hash)
  306. }
  307. break
  308. }
  309. }
  310. }
  311. // checkQueue moves transactions that have become processable to main pool.
  312. func (pool *TxPool) checkQueue() {
  313. state := pool.pendingState
  314. var addq txQueue
  315. for address, txs := range pool.queue {
  316. // guessed nonce is the nonce currently kept by the tx pool (pending state)
  317. guessedNonce := state.GetNonce(address)
  318. // true nonce is the nonce known by the last state
  319. trueNonce := pool.currentState().GetNonce(address)
  320. addq := addq[:0]
  321. for hash, tx := range txs {
  322. if tx.Nonce() < trueNonce {
  323. // Drop queued transactions whose nonce is lower than
  324. // the account nonce because they have been processed.
  325. delete(txs, hash)
  326. } else {
  327. // Collect the remaining transactions for the next pass.
  328. addq = append(addq, txQueueEntry{hash, address, tx})
  329. }
  330. }
  331. // Find the next consecutive nonce range starting at the
  332. // current account nonce.
  333. sort.Sort(addq)
  334. for i, e := range addq {
  335. // start deleting the transactions from the queue if they exceed the limit
  336. if i > maxQueued {
  337. delete(pool.queue[address], e.hash)
  338. continue
  339. }
  340. if e.Nonce() > guessedNonce {
  341. if len(addq)-i > maxQueued {
  342. if glog.V(logger.Debug) {
  343. glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(e.hash[:]))
  344. }
  345. for j := i + maxQueued; j < len(addq); j++ {
  346. delete(txs, addq[j].hash)
  347. }
  348. }
  349. break
  350. }
  351. delete(txs, e.hash)
  352. pool.addTx(e.hash, address, e.Transaction)
  353. }
  354. // Delete the entire queue entry if it became empty.
  355. if len(txs) == 0 {
  356. delete(pool.queue, address)
  357. }
  358. }
  359. }
  360. // validatePool removes invalid and processed transactions from the main pool.
  361. func (pool *TxPool) validatePool() {
  362. state := pool.currentState()
  363. for hash, tx := range pool.pending {
  364. from, _ := tx.From() // err already checked
  365. // perform light nonce validation
  366. if state.GetNonce(from) > tx.Nonce() {
  367. if glog.V(logger.Core) {
  368. glog.Infof("removed tx (%x) from pool: low tx nonce\n", hash[:4])
  369. }
  370. delete(pool.pending, hash)
  371. }
  372. }
  373. }
  374. type txQueue []txQueueEntry
  375. type txQueueEntry struct {
  376. hash common.Hash
  377. addr common.Address
  378. *types.Transaction
  379. }
  380. func (q txQueue) Len() int { return len(q) }
  381. func (q txQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
  382. func (q txQueue) Less(i, j int) bool { return q[i].Nonce() < q[j].Nonce() }