transaction_pool.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. package core
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/big"
  6. "sort"
  7. "sync"
  8. "time"
  9. "github.com/ethereum/go-ethereum/common"
  10. "github.com/ethereum/go-ethereum/core/state"
  11. "github.com/ethereum/go-ethereum/core/types"
  12. "github.com/ethereum/go-ethereum/event"
  13. "github.com/ethereum/go-ethereum/logger"
  14. "github.com/ethereum/go-ethereum/logger/glog"
  15. "gopkg.in/fatih/set.v0"
  16. )
  // Errors reported by the transaction pool. They are listed in the order in
  // which ValidateTransaction performs the corresponding checks.
  var (
  	// ErrInvalidSender is returned when the sender cannot be recovered from the transaction.
  	ErrInvalidSender = errors.New("Invalid sender")
  	// ErrNonExistentAccount is returned when the current state has no account for the sender.
  	ErrNonExistentAccount = errors.New("Account does not exist or account balance too low")
  	// ErrGasLimit is returned when the transaction's gas limit exceeds the pool's gas limit.
  	ErrGasLimit = errors.New("Exceeds block gas limit")
  	// ErrNegativeValue is returned when the transferred amount is below zero.
  	ErrNegativeValue = errors.New("Negative value")
  	// ErrInsufficientFunds is returned when the sender's balance is below gas * price + value.
  	ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value")
  	// ErrIntrinsicGas is returned when the gas limit is below the transaction's intrinsic gas.
  	ErrIntrinsicGas = errors.New("Intrinsic gas too low")
  	// ErrNonce is returned when the sender's state nonce is already past the transaction's nonce.
  	ErrNonce = errors.New("Nonce too low")
  	// ErrBalance is not referenced by the pool code in this file.
  	ErrBalance = errors.New("Insufficient balance")
  )
  // txPoolQueueSize is the buffer size of the pool's incoming transaction channel.
  const (
  	txPoolQueueSize = 50
  )
  28. type TxPoolHook chan *types.Transaction
  29. type TxMsg struct{ Tx *types.Transaction }
  30. type stateFn func() *state.StateDB
  // minGasPrice is a gas price floor constant; it is not referenced by the
  // pool code in this file.
  const minGasPrice = 1000000
  34. type TxProcessor interface {
  35. ProcessTransaction(tx *types.Transaction)
  36. }
  37. // The tx pool a thread safe transaction pool handler. In order to
  38. // guarantee a non blocking pool we use a queue channel which can be
  39. // independently read without needing access to the actual pool.
  40. type TxPool struct {
  41. mu sync.RWMutex
  42. // Queueing channel for reading and writing incoming
  43. // transactions to
  44. queueChan chan *types.Transaction
  45. // Quiting channel
  46. quit chan bool
  47. // The state function which will allow us to do some pre checkes
  48. currentState stateFn
  49. // The current gas limit function callback
  50. gasLimit func() *big.Int
  51. // The actual pool
  52. txs map[common.Hash]*types.Transaction
  53. invalidHashes *set.Set
  54. queue map[common.Address]types.Transactions
  55. subscribers []chan TxMsg
  56. eventMux *event.TypeMux
  57. }
  58. func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
  59. txPool := &TxPool{
  60. txs: make(map[common.Hash]*types.Transaction),
  61. queue: make(map[common.Address]types.Transactions),
  62. queueChan: make(chan *types.Transaction, txPoolQueueSize),
  63. quit: make(chan bool),
  64. eventMux: eventMux,
  65. invalidHashes: set.New(),
  66. currentState: currentStateFn,
  67. gasLimit: gasLimitFn,
  68. }
  69. return txPool
  70. }
  71. func (pool *TxPool) Start() {
  72. // Queue timer will tick so we can attempt to move items from the queue to the
  73. // main transaction pool.
  74. queueTimer := time.NewTicker(300 * time.Millisecond)
  75. // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce)
  76. removalTimer := time.NewTicker(1 * time.Second)
  77. done:
  78. for {
  79. select {
  80. case <-queueTimer.C:
  81. pool.checkQueue()
  82. case <-removalTimer.C:
  83. pool.validatePool()
  84. case <-pool.quit:
  85. break done
  86. }
  87. }
  88. }
  89. func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
  90. // Validate sender
  91. var (
  92. from common.Address
  93. err error
  94. )
  95. if from, err = tx.From(); err != nil {
  96. return ErrInvalidSender
  97. }
  98. // Validate curve param
  99. v, _, _ := tx.Curve()
  100. if v > 28 || v < 27 {
  101. return fmt.Errorf("tx.v != (28 || 27) => %v", v)
  102. }
  103. if !pool.currentState().HasAccount(from) {
  104. return ErrNonExistentAccount
  105. }
  106. if pool.gasLimit().Cmp(tx.GasLimit) < 0 {
  107. return ErrGasLimit
  108. }
  109. if tx.Amount.Cmp(common.Big0) < 0 {
  110. return ErrNegativeValue
  111. }
  112. total := new(big.Int).Mul(tx.Price, tx.GasLimit)
  113. total.Add(total, tx.Value())
  114. if pool.currentState().GetBalance(from).Cmp(total) < 0 {
  115. return ErrInsufficientFunds
  116. }
  117. if tx.GasLimit.Cmp(IntrinsicGas(tx)) < 0 {
  118. return ErrIntrinsicGas
  119. }
  120. if pool.currentState().GetNonce(from) > tx.Nonce() {
  121. return ErrNonce
  122. }
  123. return nil
  124. }
  125. func (self *TxPool) add(tx *types.Transaction) error {
  126. hash := tx.Hash()
  127. /* XXX I'm unsure about this. This is extremely dangerous and may result
  128. in total black listing of certain transactions
  129. if self.invalidHashes.Has(hash) {
  130. return fmt.Errorf("Invalid transaction (%x)", hash[:4])
  131. }
  132. */
  133. if self.txs[hash] != nil {
  134. return fmt.Errorf("Known transaction (%x)", hash[:4])
  135. }
  136. err := self.ValidateTransaction(tx)
  137. if err != nil {
  138. return err
  139. }
  140. self.queueTx(tx)
  141. var toname string
  142. if to := tx.To(); to != nil {
  143. toname = common.Bytes2Hex(to[:4])
  144. } else {
  145. toname = "[NEW_CONTRACT]"
  146. }
  147. // we can ignore the error here because From is
  148. // verified in ValidateTransaction.
  149. f, _ := tx.From()
  150. from := common.Bytes2Hex(f[:4])
  151. if glog.V(logger.Debug) {
  152. glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
  153. }
  154. return nil
  155. }
  156. func (self *TxPool) Size() int {
  157. return len(self.txs)
  158. }
  159. func (self *TxPool) Add(tx *types.Transaction) error {
  160. self.mu.Lock()
  161. defer self.mu.Unlock()
  162. return self.add(tx)
  163. }
  164. func (self *TxPool) AddTransactions(txs []*types.Transaction) {
  165. self.mu.Lock()
  166. defer self.mu.Unlock()
  167. for _, tx := range txs {
  168. if err := self.add(tx); err != nil {
  169. glog.V(logger.Debug).Infoln("tx error:", err)
  170. } else {
  171. h := tx.Hash()
  172. glog.V(logger.Debug).Infof("tx %x\n", h[:4])
  173. }
  174. }
  175. }
  176. // GetTransaction allows you to check the pending and queued transaction in the
  177. // transaction pool.
  178. // It has two stategies, first check the pool (map) then check the queue
  179. func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
  180. // check the txs first
  181. if tx, ok := tp.txs[hash]; ok {
  182. return tx
  183. }
  184. // check queue
  185. for _, txs := range tp.queue {
  186. for _, tx := range txs {
  187. if tx.Hash() == hash {
  188. return tx
  189. }
  190. }
  191. }
  192. return nil
  193. }
  194. func (self *TxPool) GetTransactions() (txs types.Transactions) {
  195. self.mu.RLock()
  196. defer self.mu.RUnlock()
  197. txs = make(types.Transactions, self.Size())
  198. i := 0
  199. for _, tx := range self.txs {
  200. txs[i] = tx
  201. i++
  202. }
  203. return
  204. }
  205. func (self *TxPool) GetQueuedTransactions() types.Transactions {
  206. self.mu.RLock()
  207. defer self.mu.RUnlock()
  208. var txs types.Transactions
  209. for _, ts := range self.queue {
  210. txs = append(txs, ts...)
  211. }
  212. return txs
  213. }
  214. func (self *TxPool) RemoveTransactions(txs types.Transactions) {
  215. self.mu.Lock()
  216. defer self.mu.Unlock()
  217. for _, tx := range txs {
  218. self.removeTx(tx.Hash())
  219. }
  220. }
  221. func (pool *TxPool) Flush() {
  222. pool.txs = make(map[common.Hash]*types.Transaction)
  223. }
  224. func (pool *TxPool) Stop() {
  225. pool.Flush()
  226. close(pool.quit)
  227. glog.V(logger.Info).Infoln("TX Pool stopped")
  228. }
  229. func (self *TxPool) queueTx(tx *types.Transaction) {
  230. from, _ := tx.From()
  231. self.queue[from] = append(self.queue[from], tx)
  232. }
  233. func (pool *TxPool) addTx(tx *types.Transaction) {
  234. if _, ok := pool.txs[tx.Hash()]; !ok {
  235. pool.txs[tx.Hash()] = tx
  236. // Notify the subscribers. This event is posted in a goroutine
  237. // because it's possible that somewhere during the post "Remove transaction"
  238. // gets called which will then wait for the global tx pool lock and deadlock.
  239. go pool.eventMux.Post(TxPreEvent{tx})
  240. }
  241. }
  242. // check queue will attempt to insert
  243. func (pool *TxPool) checkQueue() {
  244. pool.mu.Lock()
  245. defer pool.mu.Unlock()
  246. statedb := pool.currentState()
  247. for address, txs := range pool.queue {
  248. sort.Sort(types.TxByNonce{txs})
  249. var (
  250. nonce = statedb.GetNonce(address)
  251. start int
  252. )
  253. // Clean up the transactions first and determine the start of the nonces
  254. for _, tx := range txs {
  255. if tx.Nonce() >= nonce {
  256. break
  257. }
  258. start++
  259. }
  260. pool.queue[address] = txs[start:]
  261. // expected nonce
  262. enonce := nonce
  263. for _, tx := range pool.queue[address] {
  264. // If the expected nonce does not match up with the next one
  265. // (i.e. a nonce gap), we stop the loop
  266. if enonce != tx.Nonce() {
  267. break
  268. }
  269. enonce++
  270. pool.addTx(tx)
  271. }
  272. // delete the entire queue entry if it's empty. There's no need to keep it
  273. if len(pool.queue[address]) == 0 {
  274. delete(pool.queue, address)
  275. }
  276. }
  277. }
  278. func (pool *TxPool) removeTx(hash common.Hash) {
  279. // delete from pending pool
  280. delete(pool.txs, hash)
  281. // delete from queue
  282. out:
  283. for address, txs := range pool.queue {
  284. for i, tx := range txs {
  285. if tx.Hash() == hash {
  286. if len(txs) == 1 {
  287. // if only one tx, remove entire address entry
  288. delete(pool.queue, address)
  289. } else {
  290. pool.queue[address][len(txs)-1], pool.queue[address] = nil, append(txs[:i], txs[i+1:]...)
  291. }
  292. break out
  293. }
  294. }
  295. }
  296. }
  297. func (pool *TxPool) validatePool() {
  298. pool.mu.Lock()
  299. defer pool.mu.Unlock()
  300. for hash, tx := range pool.txs {
  301. if err := pool.ValidateTransaction(tx); err != nil {
  302. if glog.V(logger.Info) {
  303. glog.Infof("removed tx (%x) from pool: %v\n", hash[:4], err)
  304. }
  305. pool.removeTx(hash)
  306. }
  307. }
  308. }