txpool.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package light
  17. import (
  18. "fmt"
  19. "sync"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common"
  22. "github.com/ethereum/go-ethereum/core"
  23. "github.com/ethereum/go-ethereum/core/types"
  24. "github.com/ethereum/go-ethereum/ethdb"
  25. "github.com/ethereum/go-ethereum/event"
  26. "github.com/ethereum/go-ethereum/logger"
  27. "github.com/ethereum/go-ethereum/logger/glog"
  28. "github.com/ethereum/go-ethereum/rlp"
  29. "golang.org/x/net/context"
  30. )
  // txPermanent is the confirmation depth, counted in mined blocks, beyond
  // which a mined transaction is treated as permanent and no rollback of it is
  // expected any more.
  var txPermanent uint64 = 500
  // TxPool implements the transaction pool for light clients, which keeps track
  // of the status of locally created transactions, detecting if they are included
  // in a block (mined) or rolled back. There are no queued transactions since we
  // always receive all locally signed transactions in the same order as they are
  // created.
  type TxPool struct {
  	config   *core.ChainConfig  // chain configuration, consulted for the Homestead switch
  	quit     chan bool          // closed by Stop
  	eventMux *event.TypeMux     // mux used to subscribe to head events and to post TxPreEvent
  	events   event.Subscription // ChainHeadEvent subscription consumed by eventLoop
  	mu       sync.RWMutex       // taken by eventLoop and most exported methods around the maps below
  	chain    *LightChain        // header chain the pool follows
  	odr      OdrBackend         // on-demand retrieval backend (chain.Odr())
  	chainDb  ethdb.Database     // local database (chain.Odr().Database())
  	relay    TxRelayBackend     // backend that forwards transactions to the network

  	head      common.Hash                          // hash of the last header processed by setNewHead
  	nonce     map[common.Address]uint64            // "pending" nonce
  	pending   map[common.Hash]*types.Transaction   // pending transactions by tx hash
  	mined     map[common.Hash][]*types.Transaction // mined transactions by block hash
  	clearIdx  uint64                               // earliest block nr that can contain mined tx info
  	homestead bool                                 // result of config.IsHomestead for the last processed head
  }
  // TxRelayBackend provides an interface to the mechanism that forwards transactions
  // to the ETH network. The implementations of the functions should be non-blocking;
  // in this file they are invoked while the pool's mutex is held.
  type TxRelayBackend interface {
  	// Send instructs the backend to forward new transactions.
  	Send(txs types.Transactions)
  	// NewHead notifies the backend about a new head after it has been processed
  	// by the tx pool, including the transactions mined and rolled back since the
  	// last event.
  	NewHead(head common.Hash, mined []common.Hash, rollback []common.Hash)
  	// Discard notifies the backend about transactions that should be discarded,
  	// either because they have been replaced by a re-send or because they have
  	// been mined long ago and no rollback is expected.
  	Discard(hashes []common.Hash)
  }
  70. // NewTxPool creates a new light transaction pool
  71. func NewTxPool(config *core.ChainConfig, eventMux *event.TypeMux, chain *LightChain, relay TxRelayBackend) *TxPool {
  72. pool := &TxPool{
  73. config: config,
  74. nonce: make(map[common.Address]uint64),
  75. pending: make(map[common.Hash]*types.Transaction),
  76. mined: make(map[common.Hash][]*types.Transaction),
  77. quit: make(chan bool),
  78. eventMux: eventMux,
  79. events: eventMux.Subscribe(core.ChainHeadEvent{}),
  80. chain: chain,
  81. relay: relay,
  82. odr: chain.Odr(),
  83. chainDb: chain.Odr().Database(),
  84. head: chain.CurrentHeader().Hash(),
  85. clearIdx: chain.CurrentHeader().Number.Uint64(),
  86. }
  87. go pool.eventLoop()
  88. return pool
  89. }
  90. // currentState returns the light state of the current head header
  91. func (pool *TxPool) currentState() *LightState {
  92. return NewLightState(StateTrieID(pool.chain.CurrentHeader()), pool.odr)
  93. }
  94. // GetNonce returns the "pending" nonce of a given address. It always queries
  95. // the nonce belonging to the latest header too in order to detect if another
  96. // client using the same key sent a transaction.
  97. func (pool *TxPool) GetNonce(ctx context.Context, addr common.Address) (uint64, error) {
  98. nonce, err := pool.currentState().GetNonce(ctx, addr)
  99. if err != nil {
  100. return 0, err
  101. }
  102. sn, ok := pool.nonce[addr]
  103. if ok && sn > nonce {
  104. nonce = sn
  105. }
  106. if !ok || sn < nonce {
  107. pool.nonce[addr] = nonce
  108. }
  109. return nonce, nil
  110. }
  // txBlockData is the block position of a mined transaction as persisted by
  // storeTxBlockData. It is built positionally in checkMinedTxs and passed to
  // rlp.EncodeToBytes, so the field order must not be changed.
  type txBlockData struct {
  	BlockHash  common.Hash // hash of the block containing the transaction
  	BlockIndex uint64      // number of that block
  	Index      uint64      // position of the transaction inside the block
  }
  116. // storeTxBlockData stores the block position of a mined tx in the local db
  117. func (pool *TxPool) storeTxBlockData(txh common.Hash, tbd txBlockData) {
  118. //fmt.Println("storeTxBlockData", txh, tbd)
  119. data, _ := rlp.EncodeToBytes(tbd)
  120. pool.chainDb.Put(append(txh[:], byte(1)), data)
  121. }
  122. // removeTxBlockData removes the stored block position of a rolled back tx
  123. func (pool *TxPool) removeTxBlockData(txh common.Hash) {
  124. //fmt.Println("removeTxBlockData", txh)
  125. pool.chainDb.Delete(append(txh[:], byte(1)))
  126. }
  // txStateChanges stores the recent changes between pending/mined states of
  // transactions, keyed by transaction hash. True means mined, false means
  // rolled back, no entry means no change.
  type txStateChanges map[common.Hash]bool
  130. // setState sets the status of a tx to either recently mined or recently rolled back
  131. func (txc txStateChanges) setState(txHash common.Hash, mined bool) {
  132. val, ent := txc[txHash]
  133. if ent && (val != mined) {
  134. delete(txc, txHash)
  135. } else {
  136. txc[txHash] = mined
  137. }
  138. }
  139. // getLists creates lists of mined and rolled back tx hashes
  140. func (txc txStateChanges) getLists() (mined []common.Hash, rollback []common.Hash) {
  141. for hash, val := range txc {
  142. if val {
  143. mined = append(mined, hash)
  144. } else {
  145. rollback = append(rollback, hash)
  146. }
  147. }
  148. return
  149. }
  150. // checkMinedTxs checks newly added blocks for the currently pending transactions
  151. // and marks them as mined if necessary. It also stores block position in the db
  152. // and adds them to the received txStateChanges map.
  153. func (pool *TxPool) checkMinedTxs(ctx context.Context, hash common.Hash, idx uint64, txc txStateChanges) error {
  154. //fmt.Println("checkMinedTxs")
  155. if len(pool.pending) == 0 {
  156. return nil
  157. }
  158. //fmt.Println("len(pool) =", len(pool.pending))
  159. block, err := GetBlock(ctx, pool.odr, hash, idx)
  160. var receipts types.Receipts
  161. if err != nil {
  162. //fmt.Println(err)
  163. return err
  164. }
  165. //fmt.Println("len(block.Transactions()) =", len(block.Transactions()))
  166. list := pool.mined[hash]
  167. for i, tx := range block.Transactions() {
  168. txHash := tx.Hash()
  169. //fmt.Println(" txHash:", txHash)
  170. if tx, ok := pool.pending[txHash]; ok {
  171. //fmt.Println("TX FOUND")
  172. if receipts == nil {
  173. receipts, err = GetBlockReceipts(ctx, pool.odr, hash, idx)
  174. if err != nil {
  175. return err
  176. }
  177. if len(receipts) != len(block.Transactions()) {
  178. panic(nil) // should never happen if hashes did match
  179. }
  180. core.SetReceiptsData(block, receipts)
  181. }
  182. //fmt.Println("WriteReceipt", receipts[i].TxHash)
  183. core.WriteReceipt(pool.chainDb, receipts[i])
  184. pool.storeTxBlockData(txHash, txBlockData{hash, idx, uint64(i)})
  185. delete(pool.pending, txHash)
  186. list = append(list, tx)
  187. txc.setState(txHash, true)
  188. }
  189. }
  190. if list != nil {
  191. pool.mined[hash] = list
  192. }
  193. return nil
  194. }
  195. // rollbackTxs marks the transactions contained in recently rolled back blocks
  196. // as rolled back. It also removes block position info from the db and adds them
  197. // to the received txStateChanges map.
  198. func (pool *TxPool) rollbackTxs(hash common.Hash, txc txStateChanges) {
  199. if list, ok := pool.mined[hash]; ok {
  200. for _, tx := range list {
  201. txHash := tx.Hash()
  202. pool.removeTxBlockData(txHash)
  203. pool.pending[txHash] = tx
  204. txc.setState(txHash, false)
  205. }
  206. delete(pool.mined, hash)
  207. }
  208. }
  // setNewHead sets a new head header, processing (and rolling back if necessary)
  // the blocks since the last known head and returns a txStateChanges map containing
  // the recently mined and rolled back transaction hashes. If an error (context
  // timeout) occurs during checking new blocks, it leaves the locally known head
  // at the latest checked block and still returns a valid txStateChanges, making it
  // possible to continue checking the missing blocks at the next chain head event.
  //
  // The function mutates pool.head, pool.clearIdx, pool.pending and pool.mined
  // without locking; its only caller in this file (eventLoop) holds pool.mu.
  func (pool *TxPool) setNewHead(ctx context.Context, newHeader *types.Header) (txStateChanges, error) {
  	txc := make(txStateChanges)
  	// NOTE(review): the result is dereferenced below without a nil check —
  	// confirm the header of pool.head can always be found.
  	oldh := pool.chain.GetHeaderByHash(pool.head)
  	newh := newHeader
  	// find common ancestor, create list of rolled back and new block hashes
  	var oldHashes, newHashes []common.Hash
  	for oldh.Hash() != newh.Hash() {
  		// Step the old branch back while it is at least as high as the new one.
  		// NOTE(review): a missing parent header (nil) is not handled here.
  		if oldh.Number.Uint64() >= newh.Number.Uint64() {
  			oldHashes = append(oldHashes, oldh.Hash())
  			oldh = pool.chain.GetHeader(oldh.ParentHash, oldh.Number.Uint64()-1)
  		}
  		// Step the new branch back while it is higher than the old one.
  		if oldh.Number.Uint64() < newh.Number.Uint64() {
  			newHashes = append(newHashes, newh.Hash())
  			newh = pool.chain.GetHeader(newh.ParentHash, newh.Number.Uint64()-1)
  			if newh == nil {
  				// happens when CHT syncing, nothing to do
  				// (setting newh to oldh makes the loop condition false)
  				newh = oldh
  			}
  		}
  	}
  	// oldh is now the point both branches were walked back to; mined tx info
  	// may exist from there on, so clearIdx must not be above it.
  	if oldh.Number.Uint64() < pool.clearIdx {
  		pool.clearIdx = oldh.Number.Uint64()
  	}
  	// roll back old blocks
  	for _, hash := range oldHashes {
  		pool.rollbackTxs(hash, txc)
  	}
  	pool.head = oldh.Hash()
  	// check mined txs of new blocks (array is in reversed order)
  	for i := len(newHashes) - 1; i >= 0; i-- {
  		hash := newHashes[i]
  		// newHashes[0] belongs to newHeader, so entry i is i blocks below it.
  		if err := pool.checkMinedTxs(ctx, hash, newHeader.Number.Uint64()-uint64(i), txc); err != nil {
  			// pool.head stays at the last successfully checked block.
  			return txc, err
  		}
  		pool.head = hash
  	}
  	// clear old mined tx entries of old blocks
  	if idx := newHeader.Number.Uint64(); idx > pool.clearIdx+txPermanent {
  		idx2 := idx - txPermanent
  		for i := pool.clearIdx; i < idx2; i++ {
  			hash := core.GetCanonicalHash(pool.chainDb, i)
  			if list, ok := pool.mined[hash]; ok {
  				hashes := make([]common.Hash, len(list))
  				// this i is the index within list and shadows the block number above
  				for i, tx := range list {
  					hashes[i] = tx.Hash()
  				}
  				pool.relay.Discard(hashes)
  				delete(pool.mined, hash)
  			}
  		}
  		pool.clearIdx = idx2
  	}
  	return txc, nil
  }
  // blockCheckTimeout bounds the time spent checking new blocks for mined
  // transactions after a chain head event. When it expires, checking resumes at
  // the next chain head event.
  const blockCheckTimeout = 3 * time.Second
  272. // eventLoop processes chain head events and also notifies the tx relay backend
  273. // about the new head hash and tx state changes
  274. func (pool *TxPool) eventLoop() {
  275. for ev := range pool.events.Chan() {
  276. switch ev.Data.(type) {
  277. case core.ChainHeadEvent:
  278. pool.mu.Lock()
  279. ctx, _ := context.WithTimeout(context.Background(), blockCheckTimeout)
  280. head := pool.chain.CurrentHeader()
  281. txc, _ := pool.setNewHead(ctx, head)
  282. m, r := txc.getLists()
  283. pool.relay.NewHead(pool.head, m, r)
  284. pool.homestead = pool.config.IsHomestead(head.Number)
  285. pool.mu.Unlock()
  286. }
  287. }
  288. }
  // Stop stops the light transaction pool: it closes the quit channel, cancels
  // the chain head subscription and logs the shutdown.
  //
  // Stop must be called at most once; a second call would close the already
  // closed quit channel and panic.
  func (pool *TxPool) Stop() {
  	// No reader of quit is visible in this file.
  	close(pool.quit)
  	// Presumably this closes the channel eventLoop ranges over, ending that
  	// goroutine — confirm against the event package.
  	pool.events.Unsubscribe()
  	glog.V(logger.Info).Infoln("Transaction pool stopped")
  }
  295. // Stats returns the number of currently pending (locally created) transactions
  296. func (pool *TxPool) Stats() (pending int) {
  297. pool.mu.RLock()
  298. defer pool.mu.RUnlock()
  299. pending = len(pool.pending)
  300. return
  301. }
  302. // validateTx checks whether a transaction is valid according to the consensus rules.
  303. func (pool *TxPool) validateTx(ctx context.Context, tx *types.Transaction) error {
  304. // Validate sender
  305. var (
  306. from common.Address
  307. err error
  308. )
  309. // Validate the transaction sender and it's sig. Throw
  310. // if the from fields is invalid.
  311. if from, err = tx.From(); err != nil {
  312. return core.ErrInvalidSender
  313. }
  314. // Make sure the account exist. Non existent accounts
  315. // haven't got funds and well therefor never pass.
  316. currentState := pool.currentState()
  317. if h, err := currentState.HasAccount(ctx, from); err == nil {
  318. if !h {
  319. return core.ErrNonExistentAccount
  320. }
  321. } else {
  322. return err
  323. }
  324. // Last but not least check for nonce errors
  325. if n, err := currentState.GetNonce(ctx, from); err == nil {
  326. if n > tx.Nonce() {
  327. return core.ErrNonce
  328. }
  329. } else {
  330. return err
  331. }
  332. // Check the transaction doesn't exceed the current
  333. // block limit gas.
  334. header := pool.chain.GetHeaderByHash(pool.head)
  335. if header.GasLimit.Cmp(tx.Gas()) < 0 {
  336. return core.ErrGasLimit
  337. }
  338. // Transactions can't be negative. This may never happen
  339. // using RLP decoded transactions but may occur if you create
  340. // a transaction using the RPC for example.
  341. if tx.Value().Cmp(common.Big0) < 0 {
  342. return core.ErrNegativeValue
  343. }
  344. // Transactor should have enough funds to cover the costs
  345. // cost == V + GP * GL
  346. if b, err := currentState.GetBalance(ctx, from); err == nil {
  347. if b.Cmp(tx.Cost()) < 0 {
  348. return core.ErrInsufficientFunds
  349. }
  350. } else {
  351. return err
  352. }
  353. // Should supply enough intrinsic gas
  354. if tx.Gas().Cmp(core.IntrinsicGas(tx.Data(), core.MessageCreatesContract(tx), pool.homestead)) < 0 {
  355. return core.ErrIntrinsicGas
  356. }
  357. return nil
  358. }
  359. // add validates a new transaction and sets its state pending if processable.
  360. // It also updates the locally stored nonce if necessary.
  361. func (self *TxPool) add(ctx context.Context, tx *types.Transaction) error {
  362. hash := tx.Hash()
  363. if self.pending[hash] != nil {
  364. return fmt.Errorf("Known transaction (%x)", hash[:4])
  365. }
  366. err := self.validateTx(ctx, tx)
  367. if err != nil {
  368. return err
  369. }
  370. if _, ok := self.pending[hash]; !ok {
  371. self.pending[hash] = tx
  372. nonce := tx.Nonce() + 1
  373. addr, _ := tx.From()
  374. if nonce > self.nonce[addr] {
  375. self.nonce[addr] = nonce
  376. }
  377. // Notify the subscribers. This event is posted in a goroutine
  378. // because it's possible that somewhere during the post "Remove transaction"
  379. // gets called which will then wait for the global tx pool lock and deadlock.
  380. go self.eventMux.Post(core.TxPreEvent{Tx: tx})
  381. }
  382. if glog.V(logger.Debug) {
  383. var toname string
  384. if to := tx.To(); to != nil {
  385. toname = common.Bytes2Hex(to[:4])
  386. } else {
  387. toname = "[NEW_CONTRACT]"
  388. }
  389. // we can ignore the error here because From is
  390. // verified in ValidateTransaction.
  391. f, _ := tx.From()
  392. from := common.Bytes2Hex(f[:4])
  393. glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
  394. }
  395. return nil
  396. }
  397. // Add adds a transaction to the pool if valid and passes it to the tx relay
  398. // backend
  399. func (self *TxPool) Add(ctx context.Context, tx *types.Transaction) error {
  400. self.mu.Lock()
  401. defer self.mu.Unlock()
  402. data, err := rlp.EncodeToBytes(tx)
  403. if err != nil {
  404. return err
  405. }
  406. if err := self.add(ctx, tx); err != nil {
  407. return err
  408. }
  409. //fmt.Println("Send", tx.Hash())
  410. self.relay.Send(types.Transactions{tx})
  411. self.chainDb.Put(tx.Hash().Bytes(), data)
  412. return nil
  413. }
  414. // AddTransactions adds all valid transactions to the pool and passes them to
  415. // the tx relay backend
  416. func (self *TxPool) AddBatch(ctx context.Context, txs []*types.Transaction) {
  417. self.mu.Lock()
  418. defer self.mu.Unlock()
  419. var sendTx types.Transactions
  420. for _, tx := range txs {
  421. if err := self.add(ctx, tx); err != nil {
  422. glog.V(logger.Debug).Infoln("tx error:", err)
  423. } else {
  424. sendTx = append(sendTx, tx)
  425. h := tx.Hash()
  426. glog.V(logger.Debug).Infof("tx %x\n", h[:4])
  427. }
  428. }
  429. if len(sendTx) > 0 {
  430. self.relay.Send(sendTx)
  431. }
  432. }
  433. // GetTransaction returns a transaction if it is contained in the pool
  434. // and nil otherwise.
  435. func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
  436. // check the txs first
  437. if tx, ok := tp.pending[hash]; ok {
  438. return tx
  439. }
  440. return nil
  441. }
  442. // GetTransactions returns all currently processable transactions.
  443. // The returned slice may be modified by the caller.
  444. func (self *TxPool) GetTransactions() (txs types.Transactions) {
  445. self.mu.RLock()
  446. defer self.mu.RUnlock()
  447. txs = make(types.Transactions, len(self.pending))
  448. i := 0
  449. for _, tx := range self.pending {
  450. txs[i] = tx
  451. i++
  452. }
  453. return txs
  454. }
  455. // Content retrieves the data content of the transaction pool, returning all the
  456. // pending as well as queued transactions, grouped by account and nonce.
  457. func (self *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
  458. self.mu.RLock()
  459. defer self.mu.RUnlock()
  460. // Retrieve all the pending transactions and sort by account and by nonce
  461. pending := make(map[common.Address]types.Transactions)
  462. for _, tx := range self.pending {
  463. account, _ := tx.From()
  464. pending[account] = append(pending[account], tx)
  465. }
  466. // There are no queued transactions in a light pool, just return an empty map
  467. queued := make(map[common.Address]types.Transactions)
  468. return pending, queued
  469. }
  470. // RemoveTransactions removes all given transactions from the pool.
  471. func (self *TxPool) RemoveTransactions(txs types.Transactions) {
  472. self.mu.Lock()
  473. defer self.mu.Unlock()
  474. var hashes []common.Hash
  475. for _, tx := range txs {
  476. //self.RemoveTx(tx.Hash())
  477. hash := tx.Hash()
  478. delete(self.pending, hash)
  479. self.chainDb.Delete(hash[:])
  480. hashes = append(hashes, hash)
  481. }
  482. self.relay.Discard(hashes)
  483. }
  484. // RemoveTx removes the transaction with the given hash from the pool.
  485. func (pool *TxPool) RemoveTx(hash common.Hash) {
  486. pool.mu.Lock()
  487. defer pool.mu.Unlock()
  488. // delete from pending pool
  489. delete(pool.pending, hash)
  490. pool.chainDb.Delete(hash[:])
  491. pool.relay.Discard([]common.Hash{hash})
  492. }