transaction_pool.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. package core
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/big"
  6. "sort"
  7. "sync"
  8. "time"
  9. "github.com/ethereum/go-ethereum/common"
  10. "github.com/ethereum/go-ethereum/core/state"
  11. "github.com/ethereum/go-ethereum/core/types"
  12. "github.com/ethereum/go-ethereum/event"
  13. "github.com/ethereum/go-ethereum/logger"
  14. "github.com/ethereum/go-ethereum/logger/glog"
  15. "gopkg.in/fatih/set.v0"
  16. )
  17. var (
  18. ErrInvalidSender = errors.New("Invalid sender")
  19. ErrNonce = errors.New("Nonce too low")
  20. ErrNonExistentAccount = errors.New("Account does not exist")
  21. ErrInsufficientFunds = errors.New("Insufficient funds")
  22. ErrIntrinsicGas = errors.New("Intrinsic gas too low")
  23. ErrGasLimit = errors.New("Exceeds block gas limit")
  24. )
  25. const txPoolQueueSize = 50
  26. type TxPoolHook chan *types.Transaction
  27. type TxMsg struct{ Tx *types.Transaction }
  28. type stateFn func() *state.StateDB
  29. const (
  30. minGasPrice = 1000000
  31. )
  32. type TxProcessor interface {
  33. ProcessTransaction(tx *types.Transaction)
  34. }
  35. // The tx pool a thread safe transaction pool handler. In order to
  36. // guarantee a non blocking pool we use a queue channel which can be
  37. // independently read without needing access to the actual pool.
  38. type TxPool struct {
  39. mu sync.RWMutex
  40. // Queueing channel for reading and writing incoming
  41. // transactions to
  42. queueChan chan *types.Transaction
  43. // Quiting channel
  44. quit chan bool
  45. // The state function which will allow us to do some pre checkes
  46. currentState stateFn
  47. // The current gas limit function callback
  48. gasLimit func() *big.Int
  49. // The actual pool
  50. txs map[common.Hash]*types.Transaction
  51. invalidHashes *set.Set
  52. queue map[common.Address]types.Transactions
  53. subscribers []chan TxMsg
  54. eventMux *event.TypeMux
  55. }
  56. func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
  57. txPool := &TxPool{
  58. txs: make(map[common.Hash]*types.Transaction),
  59. queue: make(map[common.Address]types.Transactions),
  60. queueChan: make(chan *types.Transaction, txPoolQueueSize),
  61. quit: make(chan bool),
  62. eventMux: eventMux,
  63. invalidHashes: set.New(),
  64. currentState: currentStateFn,
  65. gasLimit: gasLimitFn,
  66. }
  67. return txPool
  68. }
  69. func (pool *TxPool) Start() {
  70. // Queue timer will tick so we can attempt to move items from the queue to the
  71. // main transaction pool.
  72. queueTimer := time.NewTicker(300 * time.Millisecond)
  73. // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce)
  74. removalTimer := time.NewTicker(1 * time.Second)
  75. done:
  76. for {
  77. select {
  78. case <-queueTimer.C:
  79. pool.checkQueue()
  80. case <-removalTimer.C:
  81. pool.validatePool()
  82. case <-pool.quit:
  83. break done
  84. }
  85. }
  86. }
  87. func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
  88. // Validate sender
  89. var (
  90. from common.Address
  91. err error
  92. )
  93. if from, err = tx.From(); err != nil {
  94. return ErrInvalidSender
  95. }
  96. // Validate curve param
  97. v, _, _ := tx.Curve()
  98. if v > 28 || v < 27 {
  99. return fmt.Errorf("tx.v != (28 || 27) => %v", v)
  100. }
  101. if !pool.currentState().HasAccount(from) {
  102. return ErrNonExistentAccount
  103. }
  104. if pool.gasLimit().Cmp(tx.GasLimit) < 0 {
  105. return ErrGasLimit
  106. }
  107. if pool.currentState().GetBalance(from).Cmp(new(big.Int).Mul(tx.Price, tx.GasLimit)) < 0 {
  108. return ErrInsufficientFunds
  109. }
  110. if tx.GasLimit.Cmp(IntrinsicGas(tx)) < 0 {
  111. return ErrIntrinsicGas
  112. }
  113. if pool.currentState().GetNonce(from) > tx.Nonce() {
  114. return ErrNonce
  115. }
  116. return nil
  117. }
  118. func (self *TxPool) add(tx *types.Transaction) error {
  119. hash := tx.Hash()
  120. /* XXX I'm unsure about this. This is extremely dangerous and may result
  121. in total black listing of certain transactions
  122. if self.invalidHashes.Has(hash) {
  123. return fmt.Errorf("Invalid transaction (%x)", hash[:4])
  124. }
  125. */
  126. if self.txs[hash] != nil {
  127. return fmt.Errorf("Known transaction (%x)", hash[:4])
  128. }
  129. err := self.ValidateTransaction(tx)
  130. if err != nil {
  131. return err
  132. }
  133. self.queueTx(tx)
  134. var toname string
  135. if to := tx.To(); to != nil {
  136. toname = common.Bytes2Hex(to[:4])
  137. } else {
  138. toname = "[NEW_CONTRACT]"
  139. }
  140. // we can ignore the error here because From is
  141. // verified in ValidateTransaction.
  142. f, _ := tx.From()
  143. from := common.Bytes2Hex(f[:4])
  144. if glog.V(logger.Debug) {
  145. glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
  146. }
  147. return nil
  148. }
  149. func (self *TxPool) Size() int {
  150. return len(self.txs)
  151. }
  152. func (self *TxPool) Add(tx *types.Transaction) error {
  153. self.mu.Lock()
  154. defer self.mu.Unlock()
  155. return self.add(tx)
  156. }
  157. func (self *TxPool) AddTransactions(txs []*types.Transaction) {
  158. self.mu.Lock()
  159. defer self.mu.Unlock()
  160. for _, tx := range txs {
  161. if err := self.add(tx); err != nil {
  162. glog.V(logger.Debug).Infoln(err)
  163. } else {
  164. h := tx.Hash()
  165. glog.V(logger.Debug).Infof("tx %x\n", h[:4])
  166. }
  167. }
  168. }
  169. func (self *TxPool) GetTransactions() (txs types.Transactions) {
  170. self.mu.RLock()
  171. defer self.mu.RUnlock()
  172. txs = make(types.Transactions, self.Size())
  173. i := 0
  174. for _, tx := range self.txs {
  175. txs[i] = tx
  176. i++
  177. }
  178. return
  179. }
  180. func (self *TxPool) GetQueuedTransactions() types.Transactions {
  181. self.mu.RLock()
  182. defer self.mu.RUnlock()
  183. var txs types.Transactions
  184. for _, ts := range self.queue {
  185. txs = append(txs, ts...)
  186. }
  187. return txs
  188. }
  189. func (self *TxPool) RemoveTransactions(txs types.Transactions) {
  190. self.mu.Lock()
  191. defer self.mu.Unlock()
  192. for _, tx := range txs {
  193. delete(self.txs, tx.Hash())
  194. }
  195. }
  196. func (pool *TxPool) Flush() {
  197. pool.txs = make(map[common.Hash]*types.Transaction)
  198. }
  199. func (pool *TxPool) Stop() {
  200. pool.Flush()
  201. close(pool.quit)
  202. glog.V(logger.Info).Infoln("TX Pool stopped")
  203. }
  204. func (self *TxPool) queueTx(tx *types.Transaction) {
  205. from, _ := tx.From()
  206. self.queue[from] = append(self.queue[from], tx)
  207. }
  208. func (pool *TxPool) addTx(tx *types.Transaction) {
  209. if _, ok := pool.txs[tx.Hash()]; !ok {
  210. pool.txs[tx.Hash()] = tx
  211. // Notify the subscribers. This event is posted in a goroutine
  212. // because it's possible that somewhere during the post "Remove transaction"
  213. // gets called which will then wait for the global tx pool lock and deadlock.
  214. go pool.eventMux.Post(TxPreEvent{tx})
  215. }
  216. }
  217. // check queue will attempt to insert
  218. func (pool *TxPool) checkQueue() {
  219. pool.mu.Lock()
  220. defer pool.mu.Unlock()
  221. statedb := pool.currentState()
  222. for address, txs := range pool.queue {
  223. sort.Sort(types.TxByNonce{txs})
  224. var (
  225. nonce = statedb.GetNonce(address)
  226. start int
  227. )
  228. // Clean up the transactions first and determine the start of the nonces
  229. for _, tx := range txs {
  230. if tx.Nonce() >= nonce {
  231. break
  232. }
  233. start++
  234. }
  235. pool.queue[address] = txs[start:]
  236. // expected nonce
  237. enonce := nonce
  238. for _, tx := range pool.queue[address] {
  239. // If the expected nonce does not match up with the next one
  240. // (i.e. a nonce gap), we stop the loop
  241. if enonce != tx.Nonce() {
  242. break
  243. }
  244. enonce++
  245. pool.addTx(tx)
  246. }
  247. //pool.queue[address] = txs[i:]
  248. // delete the entire queue entry if it's empty. There's no need to keep it
  249. if len(pool.queue[address]) == 0 {
  250. delete(pool.queue, address)
  251. }
  252. }
  253. }
  254. func (pool *TxPool) validatePool() {
  255. pool.mu.Lock()
  256. defer pool.mu.Unlock()
  257. statedb := pool.currentState()
  258. for hash, tx := range pool.txs {
  259. from, _ := tx.From()
  260. if nonce := statedb.GetNonce(from); nonce > tx.Nonce() {
  261. if glog.V(logger.Debug) {
  262. glog.Infof("removed tx (%x) from pool due to nonce error. state=%d tx=%d\n", hash[:4], nonce, tx.Nonce())
  263. }
  264. delete(pool.txs, hash)
  265. }
  266. }
  267. }