tx_journal.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package core
  17. import (
  18. "errors"
  19. "io"
  20. "io/fs"
  21. "os"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/core/types"
  24. "github.com/ethereum/go-ethereum/log"
  25. "github.com/ethereum/go-ethereum/rlp"
  26. )
  27. // errNoActiveJournal is returned if a transaction is attempted to be inserted
  28. // into the journal, but no such file is currently open.
  29. var errNoActiveJournal = errors.New("no active journal")
  30. // devNull is a WriteCloser that just discards anything written into it. Its
  31. // goal is to allow the transaction journal to write into a fake journal when
  32. // loading transactions on startup without printing warnings due to no file
  33. // being read for write.
  34. type devNull struct{}
  35. func (*devNull) Write(p []byte) (n int, err error) { return len(p), nil }
  36. func (*devNull) Close() error { return nil }
  37. // txJournal is a rotating log of transactions with the aim of storing locally
  38. // created transactions to allow non-executed ones to survive node restarts.
  39. type txJournal struct {
  40. path string // Filesystem path to store the transactions at
  41. writer io.WriteCloser // Output stream to write new transactions into
  42. }
  43. // newTxJournal creates a new transaction journal to
  44. func newTxJournal(path string) *txJournal {
  45. return &txJournal{
  46. path: path,
  47. }
  48. }
  49. // load parses a transaction journal dump from disk, loading its contents into
  50. // the specified pool.
  51. func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
  52. // Open the journal for loading any past transactions
  53. input, err := os.Open(journal.path)
  54. if errors.Is(err, fs.ErrNotExist) {
  55. // Skip the parsing if the journal file doesn't exist at all
  56. return nil
  57. }
  58. if err != nil {
  59. return err
  60. }
  61. defer input.Close()
  62. // Temporarily discard any journal additions (don't double add on load)
  63. journal.writer = new(devNull)
  64. defer func() { journal.writer = nil }()
  65. // Inject all transactions from the journal into the pool
  66. stream := rlp.NewStream(input, 0)
  67. total, dropped := 0, 0
  68. // Create a method to load a limited batch of transactions and bump the
  69. // appropriate progress counters. Then use this method to load all the
  70. // journaled transactions in small-ish batches.
  71. loadBatch := func(txs types.Transactions) {
  72. for _, err := range add(txs) {
  73. if err != nil {
  74. log.Debug("Failed to add journaled transaction", "err", err)
  75. dropped++
  76. }
  77. }
  78. }
  79. var (
  80. failure error
  81. batch types.Transactions
  82. )
  83. for {
  84. // Parse the next transaction and terminate on error
  85. tx := new(types.Transaction)
  86. if err = stream.Decode(tx); err != nil {
  87. if err != io.EOF {
  88. failure = err
  89. }
  90. if batch.Len() > 0 {
  91. loadBatch(batch)
  92. }
  93. break
  94. }
  95. // New transaction parsed, queue up for later, import if threshold is reached
  96. total++
  97. if batch = append(batch, tx); batch.Len() > 1024 {
  98. loadBatch(batch)
  99. batch = batch[:0]
  100. }
  101. }
  102. log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped)
  103. return failure
  104. }
  105. // insert adds the specified transaction to the local disk journal.
  106. func (journal *txJournal) insert(tx *types.Transaction) error {
  107. if journal.writer == nil {
  108. return errNoActiveJournal
  109. }
  110. if err := rlp.Encode(journal.writer, tx); err != nil {
  111. return err
  112. }
  113. return nil
  114. }
  115. // rotate regenerates the transaction journal based on the current contents of
  116. // the transaction pool.
  117. func (journal *txJournal) rotate(all map[common.Address]types.Transactions) error {
  118. // Close the current journal (if any is open)
  119. if journal.writer != nil {
  120. if err := journal.writer.Close(); err != nil {
  121. return err
  122. }
  123. journal.writer = nil
  124. }
  125. // Generate a new journal with the contents of the current pool
  126. replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  127. if err != nil {
  128. return err
  129. }
  130. journaled := 0
  131. for _, txs := range all {
  132. for _, tx := range txs {
  133. if err = rlp.Encode(replacement, tx); err != nil {
  134. replacement.Close()
  135. return err
  136. }
  137. }
  138. journaled += len(txs)
  139. }
  140. replacement.Close()
  141. // Replace the live journal with the newly generated one
  142. if err = os.Rename(journal.path+".new", journal.path); err != nil {
  143. return err
  144. }
  145. sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0644)
  146. if err != nil {
  147. return err
  148. }
  149. journal.writer = sink
  150. log.Info("Regenerated local transaction journal", "transactions", journaled, "accounts", len(all))
  151. return nil
  152. }
  153. // close flushes the transaction journal contents to disk and closes the file.
  154. func (journal *txJournal) close() error {
  155. var err error
  156. if journal.writer != nil {
  157. err = journal.writer.Close()
  158. journal.writer = nil
  159. }
  160. return err
  161. }