transaction_pool.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. package core
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/big"
  6. "sort"
  7. "sync"
  8. "time"
  9. "github.com/ethereum/go-ethereum/common"
  10. "github.com/ethereum/go-ethereum/core/state"
  11. "github.com/ethereum/go-ethereum/core/types"
  12. "github.com/ethereum/go-ethereum/event"
  13. "github.com/ethereum/go-ethereum/logger"
  14. "github.com/ethereum/go-ethereum/logger/glog"
  15. "gopkg.in/fatih/set.v0"
  16. )
  // Errors reported when a transaction is rejected by the pool.
  var (
  	// ErrInvalidSender is returned when the sender cannot be derived from the transaction.
  	ErrInvalidSender = errors.New("Invalid sender")
  	// ErrNonce is returned when the account nonce is already higher than the transaction nonce.
  	ErrNonce = errors.New("Nonce too low")
  	// ErrBalance is not referenced in this file; presumably used by other code in the package.
  	ErrBalance = errors.New("Insufficient balance")
  	// ErrNonExistentAccount is returned when the sender has no account in the current state.
  	ErrNonExistentAccount = errors.New("Account does not exist")
  	// ErrInsufficientFunds is returned when the sender balance is below gas * price + value.
  	ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value")
  	// ErrIntrinsicGas is returned when the transaction gas limit is below its intrinsic gas.
  	ErrIntrinsicGas = errors.New("Intrinsic gas too low")
  	// ErrGasLimit is returned when the transaction gas limit exceeds the current gas limit.
  	ErrGasLimit = errors.New("Exceeds block gas limit")
  )
  // txPoolQueueSize is the buffer size of the pool's incoming transaction channel.
  const (
  	txPoolQueueSize = 50
  )
  27. type TxPoolHook chan *types.Transaction
  28. type TxMsg struct{ Tx *types.Transaction }
  29. type stateFn func() *state.StateDB
  // minGasPrice is not referenced in this file.
  // NOTE(review): presumably the minimum accepted gas price; confirm the unit at the use site.
  const minGasPrice = 1000000
  33. type TxProcessor interface {
  34. ProcessTransaction(tx *types.Transaction)
  35. }
  36. // The tx pool a thread safe transaction pool handler. In order to
  37. // guarantee a non blocking pool we use a queue channel which can be
  38. // independently read without needing access to the actual pool.
  39. type TxPool struct {
  40. mu sync.RWMutex
  41. // Queueing channel for reading and writing incoming
  42. // transactions to
  43. queueChan chan *types.Transaction
  44. // Quiting channel
  45. quit chan bool
  46. // The state function which will allow us to do some pre checkes
  47. currentState stateFn
  48. // The current gas limit function callback
  49. gasLimit func() *big.Int
  50. // The actual pool
  51. txs map[common.Hash]*types.Transaction
  52. invalidHashes *set.Set
  53. queue map[common.Address]types.Transactions
  54. subscribers []chan TxMsg
  55. eventMux *event.TypeMux
  56. }
  57. func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
  58. txPool := &TxPool{
  59. txs: make(map[common.Hash]*types.Transaction),
  60. queue: make(map[common.Address]types.Transactions),
  61. queueChan: make(chan *types.Transaction, txPoolQueueSize),
  62. quit: make(chan bool),
  63. eventMux: eventMux,
  64. invalidHashes: set.New(),
  65. currentState: currentStateFn,
  66. gasLimit: gasLimitFn,
  67. }
  68. return txPool
  69. }
  70. func (pool *TxPool) Start() {
  71. // Queue timer will tick so we can attempt to move items from the queue to the
  72. // main transaction pool.
  73. queueTimer := time.NewTicker(300 * time.Millisecond)
  74. // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce)
  75. removalTimer := time.NewTicker(1 * time.Second)
  76. done:
  77. for {
  78. select {
  79. case <-queueTimer.C:
  80. pool.checkQueue()
  81. case <-removalTimer.C:
  82. pool.validatePool()
  83. case <-pool.quit:
  84. break done
  85. }
  86. }
  87. }
  88. func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
  89. // Validate sender
  90. var (
  91. from common.Address
  92. err error
  93. )
  94. if from, err = tx.From(); err != nil {
  95. return ErrInvalidSender
  96. }
  97. // Validate curve param
  98. v, _, _ := tx.Curve()
  99. if v > 28 || v < 27 {
  100. return fmt.Errorf("tx.v != (28 || 27) => %v", v)
  101. }
  102. if !pool.currentState().HasAccount(from) {
  103. return ErrNonExistentAccount
  104. }
  105. if pool.gasLimit().Cmp(tx.GasLimit) < 0 {
  106. return ErrGasLimit
  107. }
  108. total := new(big.Int).Mul(tx.Price, tx.GasLimit)
  109. total.Add(total, tx.Value())
  110. if pool.currentState().GetBalance(from).Cmp(total) < 0 {
  111. return ErrInsufficientFunds
  112. }
  113. if tx.GasLimit.Cmp(IntrinsicGas(tx)) < 0 {
  114. return ErrIntrinsicGas
  115. }
  116. if pool.currentState().GetNonce(from) > tx.Nonce() {
  117. return ErrNonce
  118. }
  119. return nil
  120. }
  121. func (self *TxPool) add(tx *types.Transaction) error {
  122. hash := tx.Hash()
  123. /* XXX I'm unsure about this. This is extremely dangerous and may result
  124. in total black listing of certain transactions
  125. if self.invalidHashes.Has(hash) {
  126. return fmt.Errorf("Invalid transaction (%x)", hash[:4])
  127. }
  128. */
  129. if self.txs[hash] != nil {
  130. return fmt.Errorf("Known transaction (%x)", hash[:4])
  131. }
  132. err := self.ValidateTransaction(tx)
  133. if err != nil {
  134. return err
  135. }
  136. self.queueTx(tx)
  137. var toname string
  138. if to := tx.To(); to != nil {
  139. toname = common.Bytes2Hex(to[:4])
  140. } else {
  141. toname = "[NEW_CONTRACT]"
  142. }
  143. // we can ignore the error here because From is
  144. // verified in ValidateTransaction.
  145. f, _ := tx.From()
  146. from := common.Bytes2Hex(f[:4])
  147. if glog.V(logger.Debug) {
  148. glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
  149. }
  150. return nil
  151. }
  152. func (self *TxPool) Size() int {
  153. return len(self.txs)
  154. }
  155. func (self *TxPool) Add(tx *types.Transaction) error {
  156. self.mu.Lock()
  157. defer self.mu.Unlock()
  158. return self.add(tx)
  159. }
  160. func (self *TxPool) AddTransactions(txs []*types.Transaction) {
  161. self.mu.Lock()
  162. defer self.mu.Unlock()
  163. for _, tx := range txs {
  164. if err := self.add(tx); err != nil {
  165. glog.V(logger.Debug).Infoln("tx error:", err)
  166. } else {
  167. h := tx.Hash()
  168. glog.V(logger.Debug).Infof("tx %x\n", h[:4])
  169. }
  170. }
  171. }
  172. func (self *TxPool) GetTransactions() (txs types.Transactions) {
  173. self.mu.RLock()
  174. defer self.mu.RUnlock()
  175. txs = make(types.Transactions, self.Size())
  176. i := 0
  177. for _, tx := range self.txs {
  178. txs[i] = tx
  179. i++
  180. }
  181. return
  182. }
  183. func (self *TxPool) GetQueuedTransactions() types.Transactions {
  184. self.mu.RLock()
  185. defer self.mu.RUnlock()
  186. var txs types.Transactions
  187. for _, ts := range self.queue {
  188. txs = append(txs, ts...)
  189. }
  190. return txs
  191. }
  192. func (self *TxPool) RemoveTransactions(txs types.Transactions) {
  193. self.mu.Lock()
  194. defer self.mu.Unlock()
  195. for _, tx := range txs {
  196. delete(self.txs, tx.Hash())
  197. }
  198. }
  199. func (pool *TxPool) Flush() {
  200. pool.txs = make(map[common.Hash]*types.Transaction)
  201. }
  202. func (pool *TxPool) Stop() {
  203. pool.Flush()
  204. close(pool.quit)
  205. glog.V(logger.Info).Infoln("TX Pool stopped")
  206. }
  207. func (self *TxPool) queueTx(tx *types.Transaction) {
  208. from, _ := tx.From()
  209. self.queue[from] = append(self.queue[from], tx)
  210. }
  211. func (pool *TxPool) addTx(tx *types.Transaction) {
  212. if _, ok := pool.txs[tx.Hash()]; !ok {
  213. pool.txs[tx.Hash()] = tx
  214. // Notify the subscribers. This event is posted in a goroutine
  215. // because it's possible that somewhere during the post "Remove transaction"
  216. // gets called which will then wait for the global tx pool lock and deadlock.
  217. go pool.eventMux.Post(TxPreEvent{tx})
  218. }
  219. }
  220. // check queue will attempt to insert
  221. func (pool *TxPool) checkQueue() {
  222. pool.mu.Lock()
  223. defer pool.mu.Unlock()
  224. statedb := pool.currentState()
  225. for address, txs := range pool.queue {
  226. sort.Sort(types.TxByNonce{txs})
  227. var (
  228. nonce = statedb.GetNonce(address)
  229. start int
  230. )
  231. // Clean up the transactions first and determine the start of the nonces
  232. for _, tx := range txs {
  233. if tx.Nonce() >= nonce {
  234. break
  235. }
  236. start++
  237. }
  238. pool.queue[address] = txs[start:]
  239. // expected nonce
  240. enonce := nonce
  241. for _, tx := range pool.queue[address] {
  242. // If the expected nonce does not match up with the next one
  243. // (i.e. a nonce gap), we stop the loop
  244. if enonce != tx.Nonce() {
  245. break
  246. }
  247. enonce++
  248. pool.addTx(tx)
  249. }
  250. // delete the entire queue entry if it's empty. There's no need to keep it
  251. if len(pool.queue[address]) == 0 {
  252. delete(pool.queue, address)
  253. }
  254. }
  255. }
  256. func (pool *TxPool) removeTx(hash common.Hash) {
  257. // delete from pending pool
  258. delete(pool.txs, hash)
  259. // delete from queue
  260. out:
  261. for address, txs := range pool.queue {
  262. for i, tx := range txs {
  263. if tx.Hash() == hash {
  264. if len(txs) == 1 {
  265. // if only one tx, remove entire address entry
  266. delete(pool.queue, address)
  267. } else {
  268. pool.queue[address][len(txs)-1], pool.queue[address] = nil, append(txs[:i], txs[i+1:]...)
  269. }
  270. break out
  271. }
  272. }
  273. }
  274. }
  275. func (pool *TxPool) validatePool() {
  276. pool.mu.Lock()
  277. defer pool.mu.Unlock()
  278. for hash, tx := range pool.txs {
  279. if err := pool.ValidateTransaction(tx); err != nil {
  280. if glog.V(logger.Info) {
  281. glog.Infof("removed tx (%x) from pool: %v\n", hash[:4], err)
  282. }
  283. pool.removeTx(hash)
  284. }
  285. }
  286. }