leveldb.go

// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

//go:build !js
// +build !js

// Package leveldb implements the key-value database layer based on LevelDB.
package leveldb

import (
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/filter"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"
)

const (
	// degradationWarnInterval specifies how often a warning should be printed if the
	// leveldb database cannot keep up with requested writes.
	degradationWarnInterval = time.Minute

	// minCache is the minimum amount of memory in megabytes to allocate to leveldb
	// read and write caching, split half and half.
	minCache = 16

	// minHandles is the minimum number of file handles to allocate to the open
	// database files.
	minHandles = 16

	// metricsGatheringInterval specifies the interval to retrieve leveldb database
	// compaction, io and pause stats to report to the user.
	metricsGatheringInterval = 3 * time.Second
)

// Database is a persistent key-value store. Apart from basic data storage
// functionality it also supports batch writes and iterating over the keyspace in
// binary-alphabetical order.
type Database struct {
	fn string      // filename for reporting
	db *leveldb.DB // LevelDB instance

	compTimeMeter      metrics.Meter // Meter for measuring the total time spent in database compaction
	compReadMeter      metrics.Meter // Meter for measuring the data read during compaction
	compWriteMeter     metrics.Meter // Meter for measuring the data written during compaction
	writeDelayNMeter   metrics.Meter // Meter for measuring the write delay number due to database compaction
	writeDelayMeter    metrics.Meter // Meter for measuring the write delay duration due to database compaction
	diskSizeGauge      metrics.Gauge // Gauge for tracking the size of all the levels in the database
	diskReadMeter      metrics.Meter // Meter for measuring the effective amount of data read
	diskWriteMeter     metrics.Meter // Meter for measuring the effective amount of data written
	memCompGauge       metrics.Gauge // Gauge for tracking the number of memory compactions
	level0CompGauge    metrics.Gauge // Gauge for tracking the number of table compactions in level0
	nonlevel0CompGauge metrics.Gauge // Gauge for tracking the number of table compactions in non-level0 levels
	seekCompGauge      metrics.Gauge // Gauge for tracking the number of table compactions caused by read opts

	quitLock sync.Mutex      // Mutex protecting the quit channel access
	quitChan chan chan error // Quit channel to stop the metrics collection before closing the database

	log log.Logger // Contextual logger tracking the database path
}

// New returns a wrapped LevelDB object. The namespace is the prefix that the
// metrics reporting should use for surfacing internal stats.
func New(file string, cache int, handles int, namespace string, readonly bool) (*Database, error) {
	return NewCustom(file, namespace, func(options *opt.Options) {
		// Ensure we have some minimal caching and file guarantees
		if cache < minCache {
			cache = minCache
		}
		if handles < minHandles {
			handles = minHandles
		}
		// Set default options
		options.OpenFilesCacheCapacity = handles
		options.BlockCacheCapacity = cache / 2 * opt.MiB
		options.WriteBuffer = cache / 4 * opt.MiB // Two of these are used internally
		if readonly {
			options.ReadOnly = true
		}
	})
}
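
// A usage sketch (illustrative, not part of the original file): open a store
// with 512 MB of cache and 1024 file handles; the path and metrics namespace
// below are hypothetical. Values under minCache/minHandles are raised
// automatically by New.
//
//	db, err := New("/data/chaindata", 512, 1024, "eth/db/chaindata/", false)
//	if err != nil {
//		// handle the error
//	}
//	defer db.Close()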

// NewCustom returns a wrapped LevelDB object. The namespace is the prefix that the
// metrics reporting should use for surfacing internal stats.
// The customize function allows the caller to modify the leveldb options.
func NewCustom(file string, namespace string, customize func(options *opt.Options)) (*Database, error) {
	options := configureOptions(customize)
	logger := log.New("database", file)
	usedCache := options.GetBlockCacheCapacity() + options.GetWriteBuffer()*2
	logCtx := []interface{}{"cache", common.StorageSize(usedCache), "handles", options.GetOpenFilesCacheCapacity()}
	if options.ReadOnly {
		logCtx = append(logCtx, "readonly", "true")
	}
	logger.Info("Allocated cache and file handles", logCtx...)

	// Open the db and recover any potential corruptions
	db, err := leveldb.OpenFile(file, options)
	if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
		db, err = leveldb.RecoverFile(file, nil)
	}
	if err != nil {
		return nil, err
	}
	// Assemble the wrapper with all the registered metrics
	ldb := &Database{
		fn:       file,
		db:       db,
		log:      logger,
		quitChan: make(chan chan error),
	}
	ldb.compTimeMeter = metrics.NewRegisteredMeter(namespace+"compact/time", nil)
	ldb.compReadMeter = metrics.NewRegisteredMeter(namespace+"compact/input", nil)
	ldb.compWriteMeter = metrics.NewRegisteredMeter(namespace+"compact/output", nil)
	ldb.diskSizeGauge = metrics.NewRegisteredGauge(namespace+"disk/size", nil)
	ldb.diskReadMeter = metrics.NewRegisteredMeter(namespace+"disk/read", nil)
	ldb.diskWriteMeter = metrics.NewRegisteredMeter(namespace+"disk/write", nil)
	ldb.writeDelayMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/duration", nil)
	ldb.writeDelayNMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/counter", nil)
	ldb.memCompGauge = metrics.NewRegisteredGauge(namespace+"compact/memory", nil)
	ldb.level0CompGauge = metrics.NewRegisteredGauge(namespace+"compact/level0", nil)
	ldb.nonlevel0CompGauge = metrics.NewRegisteredGauge(namespace+"compact/nonlevel0", nil)
	ldb.seekCompGauge = metrics.NewRegisteredGauge(namespace+"compact/seek", nil)

	// Start up the metrics gathering and return
	go ldb.meter(metricsGatheringInterval)
	return ldb, nil
}
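
// A hedged sketch of NewCustom (not in the original file): the customize
// callback tweaks the defaults from configureOptions in place; disabling
// compression here is just an illustrative choice, and the path and
// namespace are hypothetical.
//
//	db, err := NewCustom("/data/chaindata", "eth/db/chaindata/", func(o *opt.Options) {
//		o.Compression = opt.NoCompression
//		o.OpenFilesCacheCapacity = 1024
//	})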

// configureOptions sets some default options, then runs the provided setter.
func configureOptions(customizeFn func(*opt.Options)) *opt.Options {
	// Set default options
	options := &opt.Options{
		Filter:                 filter.NewBloomFilter(10),
		DisableSeeksCompaction: true,
	}
	// Allow caller to make custom modifications to the options
	if customizeFn != nil {
		customizeFn(options)
	}
	return options
}

// Close stops the metrics collection, flushes any pending data to disk and closes
// all io accesses to the underlying key-value store.
func (db *Database) Close() error {
	db.quitLock.Lock()
	defer db.quitLock.Unlock()

	if db.quitChan != nil {
		errc := make(chan error)
		db.quitChan <- errc
		if err := <-errc; err != nil {
			db.log.Error("Metrics collection failed", "err", err)
		}
		db.quitChan = nil
	}
	return db.db.Close()
}

// Has retrieves if a key is present in the key-value store.
func (db *Database) Has(key []byte) (bool, error) {
	return db.db.Has(key, nil)
}

// Get retrieves the given key if it's present in the key-value store.
func (db *Database) Get(key []byte) ([]byte, error) {
	dat, err := db.db.Get(key, nil)
	if err != nil {
		return nil, err
	}
	return dat, nil
}

// Put inserts the given value into the key-value store.
func (db *Database) Put(key []byte, value []byte) error {
	return db.db.Put(key, value, nil)
}

// Delete removes the key from the key-value store.
func (db *Database) Delete(key []byte) error {
	return db.db.Delete(key, nil)
}

// NewBatch creates a write-only key-value store that buffers changes to its host
// database until a final write is called.
func (db *Database) NewBatch() ethdb.Batch {
	return &batch{
		db: db.db,
		b:  new(leveldb.Batch),
	}
}

// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
func (db *Database) NewBatchWithSize(size int) ethdb.Batch {
	return &batch{
		db: db.db,
		b:  leveldb.MakeBatch(size),
	}
}
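
// A hedged batching sketch (not in the original file): mutations accumulate in
// memory and only reach disk on Write; ValueSize can be consulted to flush once
// a caller-chosen threshold is crossed. The keys below are made up.
//
//	b := db.NewBatch()
//	_ = b.Put([]byte("key-1"), []byte("value-1"))
//	_ = b.Delete([]byte("key-2"))
//	if err := b.Write(); err != nil {
//		// handle the error
//	}
//	b.Reset() // the batch may now be reused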

// NewIterator creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix, starting at a particular
// initial key (or after, if it does not exist).
func (db *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
	return db.db.NewIterator(bytesPrefixRange(prefix, start), nil)
}
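
// A hedged iteration sketch (not in the original file): walk all keys under a
// hypothetical "acct-" prefix, releasing the iterator and checking its
// accumulated error afterwards.
//
//	it := db.NewIterator([]byte("acct-"), nil)
//	for it.Next() {
//		key, value := it.Key(), it.Value() // only valid until the next call to Next
//		_, _ = key, value
//	}
//	it.Release()
//	if err := it.Error(); err != nil {
//		// handle the error
//	}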

// NewSnapshot creates a database snapshot based on the current state.
// The created snapshot will not be affected by any subsequent mutations
// performed on the database.
//
// Note: don't forget to release the snapshot once it's no longer needed,
// otherwise the stale data will never be cleaned up by the underlying
// compactor.
func (db *Database) NewSnapshot() (ethdb.Snapshot, error) {
	snap, err := db.db.GetSnapshot()
	if err != nil {
		return nil, err
	}
	return &snapshot{db: snap}, nil
}
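
// A hedged snapshot sketch (not in the original file): reads through the
// snapshot observe a frozen view of the store, and Release lets the compactor
// reclaim the superseded data.
//
//	snap, err := db.NewSnapshot()
//	if err != nil {
//		// handle the error
//	}
//	defer snap.Release()
//	val, err := snap.Get([]byte("key-1")) // unaffected by later Puts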

// Stat returns a particular internal stat of the database.
func (db *Database) Stat(property string) (string, error) {
	return db.db.GetProperty(property)
}

// Compact flattens the underlying data store for the given key range. In essence,
// deleted and overwritten versions are discarded, and the data is rearranged to
// reduce the cost of operations needed to access them.
//
// A nil start is treated as a key before all keys in the data store; a nil limit
// is treated as a key after all keys in the data store. If both are nil, the
// entire data store will be compacted.
func (db *Database) Compact(start []byte, limit []byte) error {
	return db.db.CompactRange(util.Range{Start: start, Limit: limit})
}
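
// A hedged sketch (not in the original file): passing nil for both bounds
// compacts the entire key space, e.g. after a large batch of deletions.
//
//	if err := db.Compact(nil, nil); err != nil {
//		// handle the error
//	}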

// Path returns the path to the database directory.
func (db *Database) Path() string {
	return db.fn
}

// meter periodically retrieves internal leveldb counters and reports them to
// the metrics subsystem.
//
// This is what a LevelDB stats table looks like (currently):
//
//	Compactions
//	 Level |   Tables   |    Size(MB)   |    Time(sec)  |    Read(MB)   |   Write(MB)
//	-------+------------+---------------+---------------+---------------+---------------
//	   0   |      0     |       0.00000 |       1.27969 |       0.00000 |      12.31098
//	   1   |     85     |     109.27913 |      28.09293 |     213.92493 |     214.26294
//	   2   |    523     |    1000.37159 |       7.26059 |      66.86342 |      66.77884
//	   3   |    570     |    1113.18458 |       0.00000 |       0.00000 |       0.00000
//
// This is what the write delay looks like (currently):
//
//	DelayN:5 Delay:406.604657ms Paused: false
//
// This is what the iostats look like (currently):
//
//	Read(MB):3895.04860 Write(MB):3654.64712
func (db *Database) meter(refresh time.Duration) {
	// Create the counters to store current and previous compaction values
	compactions := make([][]float64, 2)
	for i := 0; i < 2; i++ {
		compactions[i] = make([]float64, 4)
	}
	// Create storage for iostats.
	var iostats [2]float64

	// Create storage and warning log tracer for write delay.
	var (
		delaystats      [2]int64
		lastWritePaused time.Time
	)
	var (
		errc chan error
		merr error
	)
	timer := time.NewTimer(refresh)
	defer timer.Stop()

	// Iterate ad infinitum and collect the stats
	for i := 1; errc == nil && merr == nil; i++ {
		// Retrieve the database stats
		stats, err := db.db.GetProperty("leveldb.stats")
		if err != nil {
			db.log.Error("Failed to read database stats", "err", err)
			merr = err
			continue
		}
		// Find the compaction table, skip the header
		lines := strings.Split(stats, "\n")
		for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" {
			lines = lines[1:]
		}
		if len(lines) <= 3 {
			db.log.Error("Compaction table not found")
			merr = errors.New("compaction table not found")
			continue
		}
		lines = lines[3:]

		// Iterate over all the compaction table rows, and accumulate the entries
		for j := 0; j < len(compactions[i%2]); j++ {
			compactions[i%2][j] = 0
		}
		for _, line := range lines {
			parts := strings.Split(line, "|")
			if len(parts) != 6 {
				break
			}
			for idx, counter := range parts[2:] {
				value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
				if err != nil {
					db.log.Error("Compaction entry parsing failed", "err", err)
					merr = err
					continue
				}
				compactions[i%2][idx] += value
			}
		}
		// Update all the requested meters
		if db.diskSizeGauge != nil {
			db.diskSizeGauge.Update(int64(compactions[i%2][0] * 1024 * 1024))
		}
		if db.compTimeMeter != nil {
			db.compTimeMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1000 * 1000 * 1000))
		}
		if db.compReadMeter != nil {
			db.compReadMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
		}
		if db.compWriteMeter != nil {
			db.compWriteMeter.Mark(int64((compactions[i%2][3] - compactions[(i-1)%2][3]) * 1024 * 1024))
		}
		// Retrieve the write delay statistic
		writedelay, err := db.db.GetProperty("leveldb.writedelay")
		if err != nil {
			db.log.Error("Failed to read database write delay statistic", "err", err)
			merr = err
			continue
		}
		var (
			delayN        int64
			delayDuration string
			duration      time.Duration
			paused        bool
		)
		if n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused); n != 3 || err != nil {
			db.log.Error("Write delay statistic not found")
			merr = err
			continue
		}
		duration, err = time.ParseDuration(delayDuration)
		if err != nil {
			db.log.Error("Failed to parse delay duration", "err", err)
			merr = err
			continue
		}
		if db.writeDelayNMeter != nil {
			db.writeDelayNMeter.Mark(delayN - delaystats[0])
		}
		if db.writeDelayMeter != nil {
			db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
		}
		// If a warning that db is performing compaction has been displayed, any subsequent
		// warnings will be withheld for one minute so as not to overwhelm the user.
		if paused && delayN-delaystats[0] == 0 && duration.Nanoseconds()-delaystats[1] == 0 &&
			time.Now().After(lastWritePaused.Add(degradationWarnInterval)) {
			db.log.Warn("Database compacting, degraded performance")
			lastWritePaused = time.Now()
		}
		delaystats[0], delaystats[1] = delayN, duration.Nanoseconds()

		// Retrieve the database iostats.
		ioStats, err := db.db.GetProperty("leveldb.iostats")
		if err != nil {
			db.log.Error("Failed to read database iostats", "err", err)
			merr = err
			continue
		}
		var nRead, nWrite float64
		parts := strings.Split(ioStats, " ")
		if len(parts) < 2 {
			db.log.Error("Bad syntax of ioStats", "ioStats", ioStats)
			merr = fmt.Errorf("bad syntax of ioStats %s", ioStats)
			continue
		}
		if n, err := fmt.Sscanf(parts[0], "Read(MB):%f", &nRead); n != 1 || err != nil {
			db.log.Error("Bad syntax of read entry", "entry", parts[0])
			merr = err
			continue
		}
		if n, err := fmt.Sscanf(parts[1], "Write(MB):%f", &nWrite); n != 1 || err != nil {
			db.log.Error("Bad syntax of write entry", "entry", parts[1])
			merr = err
			continue
		}
		if db.diskReadMeter != nil {
			db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
		}
		if db.diskWriteMeter != nil {
			db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
		}
		iostats[0], iostats[1] = nRead, nWrite

		compCount, err := db.db.GetProperty("leveldb.compcount")
		if err != nil {
			db.log.Error("Failed to read database compaction count", "err", err)
			merr = err
			continue
		}
		var (
			memComp       uint32
			level0Comp    uint32
			nonLevel0Comp uint32
			seekComp      uint32
		)
		if n, err := fmt.Sscanf(compCount, "MemComp:%d Level0Comp:%d NonLevel0Comp:%d SeekComp:%d", &memComp, &level0Comp, &nonLevel0Comp, &seekComp); n != 4 || err != nil {
			db.log.Error("Compaction count statistic not found")
			merr = err
			continue
		}
		db.memCompGauge.Update(int64(memComp))
		db.level0CompGauge.Update(int64(level0Comp))
		db.nonlevel0CompGauge.Update(int64(nonLevel0Comp))
		db.seekCompGauge.Update(int64(seekComp))

		// Sleep a bit, then repeat the stats collection
		select {
		case errc = <-db.quitChan:
			// Quit requesting, stop hammering the database
		case <-timer.C:
			timer.Reset(refresh)
			// Timeout, gather a new set of stats
		}
	}

	if errc == nil {
		errc = <-db.quitChan
	}
	errc <- merr
}

// batch is a write-only leveldb batch that commits changes to its host database
// when Write is called. A batch cannot be used concurrently.
type batch struct {
	db   *leveldb.DB
	b    *leveldb.Batch
	size int
}

// Put inserts the given value into the batch for later committing.
func (b *batch) Put(key, value []byte) error {
	b.b.Put(key, value)
	b.size += len(key) + len(value)
	return nil
}

// Delete inserts a key removal into the batch for later committing.
func (b *batch) Delete(key []byte) error {
	b.b.Delete(key)
	b.size += len(key)
	return nil
}

// ValueSize retrieves the amount of data queued up for writing.
func (b *batch) ValueSize() int {
	return b.size
}

// Write flushes any accumulated data to disk.
func (b *batch) Write() error {
	return b.db.Write(b.b, nil)
}

// Reset resets the batch for reuse.
func (b *batch) Reset() {
	b.b.Reset()
	b.size = 0
}

// Replay replays the batch contents.
func (b *batch) Replay(w ethdb.KeyValueWriter) error {
	return b.b.Replay(&replayer{writer: w})
}
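
// A hedged replay sketch (not in the original file): copy a batch's buffered
// mutations into any other ethdb.KeyValueWriter, e.g. a second database.
//
//	if err := b.Replay(otherDB); err != nil { // otherDB is hypothetical
//		// handle the error
//	}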

// replayer is a small wrapper to implement the correct replay methods.
type replayer struct {
	writer  ethdb.KeyValueWriter
	failure error
}

// Put inserts the given value into the key-value data store.
func (r *replayer) Put(key, value []byte) {
	// If the replay already failed, stop executing ops
	if r.failure != nil {
		return
	}
	r.failure = r.writer.Put(key, value)
}

// Delete removes the key from the key-value data store.
func (r *replayer) Delete(key []byte) {
	// If the replay already failed, stop executing ops
	if r.failure != nil {
		return
	}
	r.failure = r.writer.Delete(key)
}

// bytesPrefixRange returns a key range that satisfies
//   - the given prefix, and
//   - the given seek position
func bytesPrefixRange(prefix, start []byte) *util.Range {
	r := util.BytesPrefix(prefix)
	r.Start = append(r.Start, start...)
	return r
}
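
// For example (illustrative, not in the original file):
// bytesPrefixRange([]byte("abc"), []byte("d")) yields Start = "abcd" and
// Limit = "abd", so iteration begins at "abcd" and still covers every
// remaining key carrying the "abc" prefix.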

// snapshot wraps a leveldb snapshot for implementing the Snapshot interface.
type snapshot struct {
	db *leveldb.Snapshot
}

// Has retrieves if a key is present in the snapshot backed by a key-value
// data store.
func (snap *snapshot) Has(key []byte) (bool, error) {
	return snap.db.Has(key, nil)
}

// Get retrieves the given key if it's present in the snapshot backed by a
// key-value data store.
func (snap *snapshot) Get(key []byte) ([]byte, error) {
	return snap.db.Get(key, nil)
}

// Release releases associated resources. Release should always succeed and can
// be called multiple times without causing error.
func (snap *snapshot) Release() {
	snap.db.Release()
}