transaction_pool.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. package core
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/big"
  6. "sort"
  7. "sync"
  8. "github.com/ethereum/go-ethereum/common"
  9. "github.com/ethereum/go-ethereum/core/state"
  10. "github.com/ethereum/go-ethereum/core/types"
  11. "github.com/ethereum/go-ethereum/event"
  12. "github.com/ethereum/go-ethereum/logger"
  13. "github.com/ethereum/go-ethereum/logger/glog"
  14. )
  15. var (
  16. // Transaction Pool Errors
  17. ErrInvalidSender = errors.New("Invalid sender")
  18. ErrNonce = errors.New("Nonce too low")
  19. ErrBalance = errors.New("Insufficient balance")
  20. ErrNonExistentAccount = errors.New("Account does not exist or account balance too low")
  21. ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value")
  22. ErrIntrinsicGas = errors.New("Intrinsic gas too low")
  23. ErrGasLimit = errors.New("Exceeds block gas limit")
  24. ErrNegativeValue = errors.New("Negative value")
  25. )
  26. type stateFn func() *state.StateDB
  27. // TxPool contains all currently known transactions. Transactions
  28. // enter the pool when they are received from the network or submitted
  29. // locally. They exit the pool when they are included in the blockchain.
  30. //
  31. // The pool separates processable transactions (which can be applied to the
  32. // current state) and future transactions. Transactions move between those
  33. // two states over time as they are received and processed.
  34. type TxPool struct {
  35. quit chan bool // Quiting channel
  36. currentState stateFn // The state function which will allow us to do some pre checkes
  37. state *state.ManagedState
  38. gasLimit func() *big.Int // The current gas limit function callback
  39. eventMux *event.TypeMux
  40. events event.Subscription
  41. mu sync.RWMutex
  42. pending map[common.Hash]*types.Transaction // processable transactions
  43. queue map[common.Address]map[common.Hash]*types.Transaction
  44. }
  45. func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
  46. return &TxPool{
  47. pending: make(map[common.Hash]*types.Transaction),
  48. queue: make(map[common.Address]map[common.Hash]*types.Transaction),
  49. quit: make(chan bool),
  50. eventMux: eventMux,
  51. currentState: currentStateFn,
  52. gasLimit: gasLimitFn,
  53. state: state.ManageState(currentStateFn()),
  54. }
  55. }
  56. func (pool *TxPool) Start() {
  57. pool.events = pool.eventMux.Subscribe(ChainEvent{})
  58. for _ = range pool.events.Chan() {
  59. pool.mu.Lock()
  60. pool.state = state.ManageState(pool.currentState())
  61. for _, tx := range pool.pending {
  62. if addr, err := tx.From(); err == nil {
  63. pool.state.SetNonce(addr, tx.Nonce())
  64. }
  65. }
  66. pool.checkQueue()
  67. pool.mu.Unlock()
  68. }
  69. }
  70. func (pool *TxPool) Stop() {
  71. pool.pending = make(map[common.Hash]*types.Transaction)
  72. close(pool.quit)
  73. pool.events.Unsubscribe()
  74. glog.V(logger.Info).Infoln("TX Pool stopped")
  75. }
  76. func (pool *TxPool) State() *state.ManagedState {
  77. pool.mu.RLock()
  78. defer pool.mu.RUnlock()
  79. return pool.state
  80. }
  81. // validateTx checks whether a transaction is valid according
  82. // to the consensus rules.
  83. func (pool *TxPool) validateTx(tx *types.Transaction) error {
  84. // Validate sender
  85. var (
  86. from common.Address
  87. err error
  88. )
  89. if from, err = tx.From(); err != nil {
  90. return ErrInvalidSender
  91. }
  92. if !pool.currentState().HasAccount(from) {
  93. return ErrNonExistentAccount
  94. }
  95. if pool.gasLimit().Cmp(tx.GasLimit) < 0 {
  96. return ErrGasLimit
  97. }
  98. if tx.Amount.Cmp(common.Big0) < 0 {
  99. return ErrNegativeValue
  100. }
  101. total := new(big.Int).Mul(tx.Price, tx.GasLimit)
  102. total.Add(total, tx.Value())
  103. if pool.currentState().GetBalance(from).Cmp(total) < 0 {
  104. return ErrInsufficientFunds
  105. }
  106. if tx.GasLimit.Cmp(IntrinsicGas(tx)) < 0 {
  107. return ErrIntrinsicGas
  108. }
  109. if pool.currentState().GetNonce(from) > tx.Nonce() {
  110. return ErrNonce
  111. }
  112. return nil
  113. }
  114. func (self *TxPool) add(tx *types.Transaction) error {
  115. hash := tx.Hash()
  116. /* XXX I'm unsure about this. This is extremely dangerous and may result
  117. in total black listing of certain transactions
  118. if self.invalidHashes.Has(hash) {
  119. return fmt.Errorf("Invalid transaction (%x)", hash[:4])
  120. }
  121. */
  122. if self.pending[hash] != nil {
  123. return fmt.Errorf("Known transaction (%x)", hash[:4])
  124. }
  125. err := self.validateTx(tx)
  126. if err != nil {
  127. return err
  128. }
  129. self.queueTx(hash, tx)
  130. if glog.V(logger.Debug) {
  131. var toname string
  132. if to := tx.To(); to != nil {
  133. toname = common.Bytes2Hex(to[:4])
  134. } else {
  135. toname = "[NEW_CONTRACT]"
  136. }
  137. // we can ignore the error here because From is
  138. // verified in ValidateTransaction.
  139. f, _ := tx.From()
  140. from := common.Bytes2Hex(f[:4])
  141. glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
  142. }
  143. // check and validate the queueue
  144. self.checkQueue()
  145. return nil
  146. }
  147. // Add queues a single transaction in the pool if it is valid.
  148. func (self *TxPool) Add(tx *types.Transaction) error {
  149. self.mu.Lock()
  150. defer self.mu.Unlock()
  151. return self.add(tx)
  152. }
  153. // AddTransactions attempts to queue all valid transactions in txs.
  154. func (self *TxPool) AddTransactions(txs []*types.Transaction) {
  155. self.mu.Lock()
  156. defer self.mu.Unlock()
  157. for _, tx := range txs {
  158. if err := self.add(tx); err != nil {
  159. glog.V(logger.Debug).Infoln("tx error:", err)
  160. } else {
  161. h := tx.Hash()
  162. glog.V(logger.Debug).Infof("tx %x\n", h[:4])
  163. }
  164. }
  165. }
  166. // GetTransaction returns a transaction if it is contained in the pool
  167. // and nil otherwise.
  168. func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
  169. // check the txs first
  170. if tx, ok := tp.pending[hash]; ok {
  171. return tx
  172. }
  173. // check queue
  174. for _, txs := range tp.queue {
  175. if tx, ok := txs[hash]; ok {
  176. return tx
  177. }
  178. }
  179. return nil
  180. }
  181. // GetTransactions returns all currently processable transactions.
  182. func (self *TxPool) GetTransactions() (txs types.Transactions) {
  183. self.mu.Lock()
  184. defer self.mu.Unlock()
  185. // check queue first
  186. self.checkQueue()
  187. // invalidate any txs
  188. self.validatePool()
  189. txs = make(types.Transactions, len(self.pending))
  190. i := 0
  191. for _, tx := range self.pending {
  192. txs[i] = tx
  193. i++
  194. }
  195. return txs
  196. }
  197. // GetQueuedTransactions returns all non-processable transactions.
  198. func (self *TxPool) GetQueuedTransactions() types.Transactions {
  199. self.mu.RLock()
  200. defer self.mu.RUnlock()
  201. var ret types.Transactions
  202. for _, txs := range self.queue {
  203. for _, tx := range txs {
  204. ret = append(ret, tx)
  205. }
  206. }
  207. sort.Sort(types.TxByNonce{ret})
  208. return ret
  209. }
  210. // RemoveTransactions removes all given transactions from the pool.
  211. func (self *TxPool) RemoveTransactions(txs types.Transactions) {
  212. self.mu.Lock()
  213. defer self.mu.Unlock()
  214. for _, tx := range txs {
  215. self.removeTx(tx.Hash())
  216. }
  217. }
  218. func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
  219. from, _ := tx.From() // already validated
  220. if self.queue[from] == nil {
  221. self.queue[from] = make(map[common.Hash]*types.Transaction)
  222. }
  223. self.queue[from][hash] = tx
  224. }
  225. func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
  226. if _, ok := pool.pending[hash]; !ok {
  227. pool.pending[hash] = tx
  228. pool.state.SetNonce(addr, tx.AccountNonce)
  229. // Notify the subscribers. This event is posted in a goroutine
  230. // because it's possible that somewhere during the post "Remove transaction"
  231. // gets called which will then wait for the global tx pool lock and deadlock.
  232. go pool.eventMux.Post(TxPreEvent{tx})
  233. }
  234. }
  235. // checkQueue moves transactions that have become processable to main pool.
  236. func (pool *TxPool) checkQueue() {
  237. state := pool.state
  238. var addq txQueue
  239. for address, txs := range pool.queue {
  240. curnonce := state.GetNonce(address)
  241. addq := addq[:0]
  242. for hash, tx := range txs {
  243. if tx.AccountNonce < curnonce {
  244. // Drop queued transactions whose nonce is lower than
  245. // the account nonce because they have been processed.
  246. delete(txs, hash)
  247. } else {
  248. // Collect the remaining transactions for the next pass.
  249. addq = append(addq, txQueueEntry{hash, address, tx})
  250. }
  251. }
  252. // Find the next consecutive nonce range starting at the
  253. // current account nonce.
  254. sort.Sort(addq)
  255. for _, e := range addq {
  256. if e.AccountNonce > curnonce+1 {
  257. break
  258. }
  259. delete(txs, e.hash)
  260. pool.addTx(e.hash, address, e.Transaction)
  261. }
  262. // Delete the entire queue entry if it became empty.
  263. if len(txs) == 0 {
  264. delete(pool.queue, address)
  265. }
  266. }
  267. }
  268. func (pool *TxPool) removeTx(hash common.Hash) {
  269. // delete from pending pool
  270. delete(pool.pending, hash)
  271. // delete from queue
  272. for address, txs := range pool.queue {
  273. if _, ok := txs[hash]; ok {
  274. if len(txs) == 1 {
  275. // if only one tx, remove entire address entry.
  276. delete(pool.queue, address)
  277. } else {
  278. delete(txs, hash)
  279. }
  280. break
  281. }
  282. }
  283. }
  284. // validatePool removes invalid and processed transactions from the main pool.
  285. func (pool *TxPool) validatePool() {
  286. for hash, tx := range pool.pending {
  287. if err := pool.validateTx(tx); err != nil {
  288. if glog.V(logger.Core) {
  289. glog.Infof("removed tx (%x) from pool: %v\n", hash[:4], err)
  290. }
  291. delete(pool.pending, hash)
  292. }
  293. }
  294. }
  295. type txQueue []txQueueEntry
  296. type txQueueEntry struct {
  297. hash common.Hash
  298. addr common.Address
  299. *types.Transaction
  300. }
  301. func (q txQueue) Len() int { return len(q) }
  302. func (q txQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
  303. func (q txQueue) Less(i, j int) bool { return q[i].AccountNonce < q[j].AccountNonce }