freezer.go 14 KB


  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rawdb
  17. import (
  18. "errors"
  19. "fmt"
  20. "math"
  21. "os"
  22. "path/filepath"
  23. "sync"
  24. "sync/atomic"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/ethdb"
  28. "github.com/ethereum/go-ethereum/log"
  29. "github.com/ethereum/go-ethereum/metrics"
  30. "github.com/prometheus/tsdb/fileutil"
  31. )
  32. var (
  33. // errReadOnly is returned if the freezer is opened in read only mode. All the
  34. // mutations are disallowed.
  35. errReadOnly = errors.New("read only")
  36. // errUnknownTable is returned if the user attempts to read from a table that is
  37. // not tracked by the freezer.
  38. errUnknownTable = errors.New("unknown table")
  39. // errOutOrderInsertion is returned if the user attempts to inject out-of-order
  40. // binary blobs into the freezer.
  41. errOutOrderInsertion = errors.New("the append operation is out-order")
  42. // errSymlinkDatadir is returned if the ancient directory specified by user
  43. // is a symbolic link.
  44. errSymlinkDatadir = errors.New("symbolic link datadir is not supported")
  45. )
  46. // freezerTableSize defines the maximum size of freezer data files.
  47. const freezerTableSize = 2 * 1000 * 1000 * 1000
  48. // Freezer is a memory mapped append-only database to store immutable ordered
  49. // data into flat files:
  50. //
  51. // - The append-only nature ensures that disk writes are minimized.
  52. // - The memory mapping ensures we can max out system memory for caching without
  53. // reserving it for go-ethereum. This would also reduce the memory requirements
  54. // of Geth, and thus also GC overhead.
  55. type Freezer struct {
  56. // WARNING: The `frozen` and `tail` fields are accessed atomically. On 32 bit platforms, only
  57. // 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
  58. // so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
  59. frozen uint64 // Number of blocks already frozen
  60. tail uint64 // Number of the first stored item in the freezer
  61. // This lock synchronizes writers and the truncate operation, as well as
  62. // the "atomic" (batched) read operations.
  63. writeLock sync.RWMutex
  64. writeBatch *freezerBatch
  65. readonly bool
  66. tables map[string]*freezerTable // Data tables for storing everything
  67. instanceLock fileutil.Releaser // File-system lock to prevent double opens
  68. closeOnce sync.Once
  69. }
  70. // NewFreezer creates a freezer instance for maintaining immutable ordered
  71. // data according to the given parameters.
  72. //
  73. // The 'tables' argument defines the data tables. If the value of a map
  74. // entry is true, snappy compression is disabled for the table.
  75. func NewFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*Freezer, error) {
  76. // Create the initial freezer object
  77. var (
  78. readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
  79. writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil)
  80. sizeGauge = metrics.NewRegisteredGauge(namespace+"ancient/size", nil)
  81. )
  82. // Ensure the datadir is not a symbolic link if it exists.
  83. if info, err := os.Lstat(datadir); !os.IsNotExist(err) {
  84. if info.Mode()&os.ModeSymlink != 0 {
  85. log.Warn("Symbolic link ancient database is not supported", "path", datadir)
  86. return nil, errSymlinkDatadir
  87. }
  88. }
  89. // Leveldb uses LOCK as the filelock filename. To prevent the
  90. // name collision, we use FLOCK as the lock name.
  91. lock, _, err := fileutil.Flock(filepath.Join(datadir, "FLOCK"))
  92. if err != nil {
  93. return nil, err
  94. }
  95. // Open all the supported data tables
  96. freezer := &Freezer{
  97. readonly: readonly,
  98. tables: make(map[string]*freezerTable),
  99. instanceLock: lock,
  100. }
  101. // Create the tables.
  102. for name, disableSnappy := range tables {
  103. table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, maxTableSize, disableSnappy, readonly)
  104. if err != nil {
  105. for _, table := range freezer.tables {
  106. table.Close()
  107. }
  108. lock.Release()
  109. return nil, err
  110. }
  111. freezer.tables[name] = table
  112. }
  113. if freezer.readonly {
  114. // In readonly mode only validate, don't truncate.
  115. // validate also sets `freezer.frozen`.
  116. err = freezer.validate()
  117. } else {
  118. // Truncate all tables to common length.
  119. err = freezer.repair()
  120. }
  121. if err != nil {
  122. for _, table := range freezer.tables {
  123. table.Close()
  124. }
  125. lock.Release()
  126. return nil, err
  127. }
  128. // Create the write batch.
  129. freezer.writeBatch = newFreezerBatch(freezer)
  130. log.Info("Opened ancient database", "database", datadir, "readonly", readonly)
  131. return freezer, nil
  132. }
  133. // Close terminates the chain freezer, unmapping all the data files.
  134. func (f *Freezer) Close() error {
  135. f.writeLock.Lock()
  136. defer f.writeLock.Unlock()
  137. var errs []error
  138. f.closeOnce.Do(func() {
  139. for _, table := range f.tables {
  140. if err := table.Close(); err != nil {
  141. errs = append(errs, err)
  142. }
  143. }
  144. if err := f.instanceLock.Release(); err != nil {
  145. errs = append(errs, err)
  146. }
  147. })
  148. if errs != nil {
  149. return fmt.Errorf("%v", errs)
  150. }
  151. return nil
  152. }
  153. // HasAncient returns an indicator whether the specified ancient data exists
  154. // in the freezer.
  155. func (f *Freezer) HasAncient(kind string, number uint64) (bool, error) {
  156. if table := f.tables[kind]; table != nil {
  157. return table.has(number), nil
  158. }
  159. return false, nil
  160. }
  161. // Ancient retrieves an ancient binary blob from the append-only immutable files.
  162. func (f *Freezer) Ancient(kind string, number uint64) ([]byte, error) {
  163. if table := f.tables[kind]; table != nil {
  164. return table.Retrieve(number)
  165. }
  166. return nil, errUnknownTable
  167. }
  168. // AncientRange retrieves multiple items in sequence, starting from the index 'start'.
  169. // It will return
  170. // - at most 'max' items,
  171. // - at least 1 item (even if exceeding the maxByteSize), but will otherwise
  172. // return as many items as fit into maxByteSize.
  173. func (f *Freezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
  174. if table := f.tables[kind]; table != nil {
  175. return table.RetrieveItems(start, count, maxBytes)
  176. }
  177. return nil, errUnknownTable
  178. }
  179. // Ancients returns the length of the frozen items.
  180. func (f *Freezer) Ancients() (uint64, error) {
  181. return atomic.LoadUint64(&f.frozen), nil
  182. }
  183. // Tail returns the number of first stored item in the freezer.
  184. func (f *Freezer) Tail() (uint64, error) {
  185. return atomic.LoadUint64(&f.tail), nil
  186. }
  187. // AncientSize returns the ancient size of the specified category.
  188. func (f *Freezer) AncientSize(kind string) (uint64, error) {
  189. // This needs the write lock to avoid data races on table fields.
  190. // Speed doesn't matter here, AncientSize is for debugging.
  191. f.writeLock.RLock()
  192. defer f.writeLock.RUnlock()
  193. if table := f.tables[kind]; table != nil {
  194. return table.size()
  195. }
  196. return 0, errUnknownTable
  197. }
  198. // ReadAncients runs the given read operation while ensuring that no writes take place
  199. // on the underlying freezer.
  200. func (f *Freezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
  201. f.writeLock.RLock()
  202. defer f.writeLock.RUnlock()
  203. return fn(f)
  204. }
  205. // ModifyAncients runs the given write operation.
  206. func (f *Freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize int64, err error) {
  207. if f.readonly {
  208. return 0, errReadOnly
  209. }
  210. f.writeLock.Lock()
  211. defer f.writeLock.Unlock()
  212. // Roll back all tables to the starting position in case of error.
  213. prevItem := atomic.LoadUint64(&f.frozen)
  214. defer func() {
  215. if err != nil {
  216. // The write operation has failed. Go back to the previous item position.
  217. for name, table := range f.tables {
  218. err := table.truncateHead(prevItem)
  219. if err != nil {
  220. log.Error("Freezer table roll-back failed", "table", name, "index", prevItem, "err", err)
  221. }
  222. }
  223. }
  224. }()
  225. f.writeBatch.reset()
  226. if err := fn(f.writeBatch); err != nil {
  227. return 0, err
  228. }
  229. item, writeSize, err := f.writeBatch.commit()
  230. if err != nil {
  231. return 0, err
  232. }
  233. atomic.StoreUint64(&f.frozen, item)
  234. return writeSize, nil
  235. }
  236. // TruncateHead discards any recent data above the provided threshold number.
  237. func (f *Freezer) TruncateHead(items uint64) error {
  238. if f.readonly {
  239. return errReadOnly
  240. }
  241. f.writeLock.Lock()
  242. defer f.writeLock.Unlock()
  243. if atomic.LoadUint64(&f.frozen) <= items {
  244. return nil
  245. }
  246. for _, table := range f.tables {
  247. if err := table.truncateHead(items); err != nil {
  248. return err
  249. }
  250. }
  251. atomic.StoreUint64(&f.frozen, items)
  252. return nil
  253. }
  254. // TruncateTail discards any recent data below the provided threshold number.
  255. func (f *Freezer) TruncateTail(tail uint64) error {
  256. if f.readonly {
  257. return errReadOnly
  258. }
  259. f.writeLock.Lock()
  260. defer f.writeLock.Unlock()
  261. if atomic.LoadUint64(&f.tail) >= tail {
  262. return nil
  263. }
  264. for _, table := range f.tables {
  265. if err := table.truncateTail(tail); err != nil {
  266. return err
  267. }
  268. }
  269. atomic.StoreUint64(&f.tail, tail)
  270. return nil
  271. }
  272. // Sync flushes all data tables to disk.
  273. func (f *Freezer) Sync() error {
  274. var errs []error
  275. for _, table := range f.tables {
  276. if err := table.Sync(); err != nil {
  277. errs = append(errs, err)
  278. }
  279. }
  280. if errs != nil {
  281. return fmt.Errorf("%v", errs)
  282. }
  283. return nil
  284. }
  285. // validate checks that every table has the same length.
  286. // Used instead of `repair` in readonly mode.
  287. func (f *Freezer) validate() error {
  288. if len(f.tables) == 0 {
  289. return nil
  290. }
  291. var (
  292. length uint64
  293. name string
  294. )
  295. // Hack to get length of any table
  296. for kind, table := range f.tables {
  297. length = atomic.LoadUint64(&table.items)
  298. name = kind
  299. break
  300. }
  301. // Now check every table against that length
  302. for kind, table := range f.tables {
  303. items := atomic.LoadUint64(&table.items)
  304. if length != items {
  305. return fmt.Errorf("freezer tables %s and %s have differing lengths: %d != %d", kind, name, items, length)
  306. }
  307. }
  308. atomic.StoreUint64(&f.frozen, length)
  309. return nil
  310. }
  311. // repair truncates all data tables to the same length.
  312. func (f *Freezer) repair() error {
  313. var (
  314. head = uint64(math.MaxUint64)
  315. tail = uint64(0)
  316. )
  317. for _, table := range f.tables {
  318. items := atomic.LoadUint64(&table.items)
  319. if head > items {
  320. head = items
  321. }
  322. hidden := atomic.LoadUint64(&table.itemHidden)
  323. if hidden > tail {
  324. tail = hidden
  325. }
  326. }
  327. for _, table := range f.tables {
  328. if err := table.truncateHead(head); err != nil {
  329. return err
  330. }
  331. if err := table.truncateTail(tail); err != nil {
  332. return err
  333. }
  334. }
  335. atomic.StoreUint64(&f.frozen, head)
  336. atomic.StoreUint64(&f.tail, tail)
  337. return nil
  338. }
  339. // convertLegacyFn takes a raw freezer entry in an older format and
  340. // returns it in the new format.
  341. type convertLegacyFn = func([]byte) ([]byte, error)
  342. // MigrateTable processes the entries in a given table in sequence
  343. // converting them to a new format if they're of an old format.
  344. func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error {
  345. if f.readonly {
  346. return errReadOnly
  347. }
  348. f.writeLock.Lock()
  349. defer f.writeLock.Unlock()
  350. table, ok := f.tables[kind]
  351. if !ok {
  352. return errUnknownTable
  353. }
  354. // forEach iterates every entry in the table serially and in order, calling `fn`
  355. // with the item as argument. If `fn` returns an error the iteration stops
  356. // and that error will be returned.
  357. forEach := func(t *freezerTable, offset uint64, fn func(uint64, []byte) error) error {
  358. var (
  359. items = atomic.LoadUint64(&t.items)
  360. batchSize = uint64(1024)
  361. maxBytes = uint64(1024 * 1024)
  362. )
  363. for i := offset; i < items; {
  364. if i+batchSize > items {
  365. batchSize = items - i
  366. }
  367. data, err := t.RetrieveItems(i, batchSize, maxBytes)
  368. if err != nil {
  369. return err
  370. }
  371. for j, item := range data {
  372. if err := fn(i+uint64(j), item); err != nil {
  373. return err
  374. }
  375. }
  376. i += uint64(len(data))
  377. }
  378. return nil
  379. }
  380. // TODO(s1na): This is a sanity-check since as of now no process does tail-deletion. But the migration
  381. // process assumes no deletion at tail and needs to be modified to account for that.
  382. if table.itemOffset > 0 || table.itemHidden > 0 {
  383. return fmt.Errorf("migration not supported for tail-deleted freezers")
  384. }
  385. ancientsPath := filepath.Dir(table.index.Name())
  386. // Set up new dir for the migrated table, the content of which
  387. // we'll at the end move over to the ancients dir.
  388. migrationPath := filepath.Join(ancientsPath, "migration")
  389. newTable, err := newFreezerTable(migrationPath, kind, table.noCompression, false)
  390. if err != nil {
  391. return err
  392. }
  393. var (
  394. batch = newTable.newBatch()
  395. out []byte
  396. start = time.Now()
  397. logged = time.Now()
  398. offset = newTable.items
  399. )
  400. if offset > 0 {
  401. log.Info("found previous migration attempt", "migrated", offset)
  402. }
  403. // Iterate through entries and transform them
  404. if err := forEach(table, offset, func(i uint64, blob []byte) error {
  405. if i%10000 == 0 && time.Since(logged) > 16*time.Second {
  406. log.Info("Processing legacy elements", "count", i, "elapsed", common.PrettyDuration(time.Since(start)))
  407. logged = time.Now()
  408. }
  409. out, err = convert(blob)
  410. if err != nil {
  411. return err
  412. }
  413. if err := batch.AppendRaw(i, out); err != nil {
  414. return err
  415. }
  416. return nil
  417. }); err != nil {
  418. return err
  419. }
  420. if err := batch.commit(); err != nil {
  421. return err
  422. }
  423. log.Info("Replacing old table files with migrated ones", "elapsed", common.PrettyDuration(time.Since(start)))
  424. // Release and delete old table files. Note this won't
  425. // delete the index file.
  426. table.releaseFilesAfter(0, true)
  427. if err := newTable.Close(); err != nil {
  428. return err
  429. }
  430. files, err := os.ReadDir(migrationPath)
  431. if err != nil {
  432. return err
  433. }
  434. // Move migrated files to ancients dir.
  435. for _, f := range files {
  436. // This will replace the old index file as a side-effect.
  437. if err := os.Rename(filepath.Join(migrationPath, f.Name()), filepath.Join(ancientsPath, f.Name())); err != nil {
  438. return err
  439. }
  440. }
  441. // Delete by now empty dir.
  442. if err := os.Remove(migrationPath); err != nil {
  443. return err
  444. }
  445. return nil
  446. }