transaction_pool.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package core
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "github.com/ethereum/go-ethereum/core/types"
  7. "github.com/ethereum/go-ethereum/common"
  8. "github.com/ethereum/go-ethereum/event"
  9. "github.com/ethereum/go-ethereum/logger"
  10. )
  11. var (
  12. txplogger = logger.NewLogger("TXP")
  13. ErrInvalidSender = errors.New("Invalid sender")
  14. )
  // txPoolQueueSize is the buffer capacity NewTxPool gives to the pool's
  // queueing channel.
  const (
  	txPoolQueueSize = 50
  )
  16. type TxPoolHook chan *types.Transaction
  17. type TxMsg struct {
  18. Tx *types.Transaction
  19. }
  // minGasPrice is not referenced by any code visible in this file.
  // NOTE(review): presumably the lowest gas price the pool should accept;
  // the unit is not stated here - confirm before relying on it.
  const minGasPrice = 1000000
  23. type TxProcessor interface {
  24. ProcessTransaction(tx *types.Transaction)
  25. }
  26. // The tx pool a thread safe transaction pool handler. In order to
  27. // guarantee a non blocking pool we use a queue channel which can be
  28. // independently read without needing access to the actual pool.
  29. type TxPool struct {
  30. mu sync.RWMutex
  31. // Queueing channel for reading and writing incoming
  32. // transactions to
  33. queueChan chan *types.Transaction
  34. // Quiting channel
  35. quit chan bool
  36. // The actual pool
  37. //pool *list.List
  38. txs map[string]*types.Transaction
  39. SecondaryProcessor TxProcessor
  40. subscribers []chan TxMsg
  41. eventMux *event.TypeMux
  42. }
  43. func NewTxPool(eventMux *event.TypeMux) *TxPool {
  44. return &TxPool{
  45. txs: make(map[string]*types.Transaction),
  46. queueChan: make(chan *types.Transaction, txPoolQueueSize),
  47. quit: make(chan bool),
  48. eventMux: eventMux,
  49. }
  50. }
  51. func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
  52. if len(tx.To()) != 0 && len(tx.To()) != 20 {
  53. return fmt.Errorf("Invalid recipient. len = %d", len(tx.To()))
  54. }
  55. // Validate curve param
  56. v, _, _ := tx.Curve()
  57. if v > 28 || v < 27 {
  58. return fmt.Errorf("tx.v != (28 || 27) => %v", v)
  59. }
  60. // Validate sender address
  61. senderAddr := tx.From()
  62. if senderAddr == nil || len(senderAddr) != 20 {
  63. return ErrInvalidSender
  64. }
  65. /* XXX this kind of validation needs to happen elsewhere in the gui when sending txs.
  66. Other clients should do their own validation. Value transfer could throw error
  67. but doesn't necessarily invalidate the tx. Gas can still be payed for and miner
  68. can still be rewarded for their inclusion and processing.
  69. sender := pool.stateQuery.GetAccount(senderAddr)
  70. totAmount := new(big.Int).Set(tx.Value())
  71. // Make sure there's enough in the sender's account. Having insufficient
  72. // funds won't invalidate this transaction but simple ignores it.
  73. if sender.Balance().Cmp(totAmount) < 0 {
  74. return fmt.Errorf("Insufficient amount in sender's (%x) account", tx.From())
  75. }
  76. */
  77. return nil
  78. }
  79. func (self *TxPool) addTx(tx *types.Transaction) {
  80. self.txs[string(tx.Hash())] = tx
  81. }
  82. func (self *TxPool) add(tx *types.Transaction) error {
  83. if self.txs[string(tx.Hash())] != nil {
  84. return fmt.Errorf("Known transaction (%x)", tx.Hash()[0:4])
  85. }
  86. err := self.ValidateTransaction(tx)
  87. if err != nil {
  88. return err
  89. }
  90. self.addTx(tx)
  91. var to string
  92. if len(tx.To()) > 0 {
  93. to = common.Bytes2Hex(tx.To()[:4])
  94. } else {
  95. to = "[NEW_CONTRACT]"
  96. }
  97. var from string
  98. if len(tx.From()) > 0 {
  99. from = common.Bytes2Hex(tx.From()[:4])
  100. } else {
  101. return errors.New(fmt.Sprintf("FROM ADDRESS MUST BE POSITIVE (was %v)", tx.From()))
  102. }
  103. txplogger.Debugf("(t) %x => %s (%v) %x\n", from, to, tx.Value, tx.Hash())
  104. // Notify the subscribers
  105. go self.eventMux.Post(TxPreEvent{tx})
  106. return nil
  107. }
  108. func (self *TxPool) Size() int {
  109. return len(self.txs)
  110. }
  111. func (self *TxPool) Add(tx *types.Transaction) error {
  112. self.mu.Lock()
  113. defer self.mu.Unlock()
  114. return self.add(tx)
  115. }
  116. func (self *TxPool) AddTransactions(txs []*types.Transaction) {
  117. self.mu.Lock()
  118. defer self.mu.Unlock()
  119. for _, tx := range txs {
  120. if err := self.add(tx); err != nil {
  121. txplogger.Debugln(err)
  122. } else {
  123. txplogger.Debugf("tx %x\n", tx.Hash()[0:4])
  124. }
  125. }
  126. }
  127. func (self *TxPool) GetTransactions() (txs types.Transactions) {
  128. self.mu.RLock()
  129. defer self.mu.RUnlock()
  130. txs = make(types.Transactions, self.Size())
  131. i := 0
  132. for _, tx := range self.txs {
  133. txs[i] = tx
  134. i++
  135. }
  136. return
  137. }
  138. func (pool *TxPool) RemoveInvalid(query StateQuery) {
  139. pool.mu.Lock()
  140. var removedTxs types.Transactions
  141. for _, tx := range pool.txs {
  142. sender := query.GetAccount(tx.From())
  143. err := pool.ValidateTransaction(tx)
  144. if err != nil || sender.Nonce() >= tx.Nonce() {
  145. removedTxs = append(removedTxs, tx)
  146. }
  147. }
  148. pool.mu.Unlock()
  149. pool.RemoveSet(removedTxs)
  150. }
  151. func (self *TxPool) RemoveSet(txs types.Transactions) {
  152. self.mu.Lock()
  153. defer self.mu.Unlock()
  154. for _, tx := range txs {
  155. delete(self.txs, string(tx.Hash()))
  156. }
  157. }
  158. func (pool *TxPool) Flush() {
  159. pool.txs = make(map[string]*types.Transaction)
  160. }
  161. func (pool *TxPool) Start() {
  162. }
  163. func (pool *TxPool) Stop() {
  164. pool.Flush()
  165. txplogger.Infoln("Stopped")
  166. }