database.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package ethdb
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/ethereum/go-ethereum/compression/rle"
  6. "github.com/ethereum/go-ethereum/logger"
  7. "github.com/ethereum/go-ethereum/logger/glog"
  8. "github.com/syndtr/goleveldb/leveldb"
  9. "github.com/syndtr/goleveldb/leveldb/iterator"
  10. )
  11. type LDBDatabase struct {
  12. fn string
  13. mu sync.Mutex
  14. db *leveldb.DB
  15. queue map[string][]byte
  16. quit chan struct{}
  17. }
  18. func NewLDBDatabase(file string) (*LDBDatabase, error) {
  19. // Open the db
  20. db, err := leveldb.OpenFile(file, nil)
  21. if err != nil {
  22. return nil, err
  23. }
  24. database := &LDBDatabase{
  25. fn: file,
  26. db: db,
  27. quit: make(chan struct{}),
  28. }
  29. database.makeQueue()
  30. go database.update()
  31. return database, nil
  32. }
  33. func (self *LDBDatabase) makeQueue() {
  34. self.queue = make(map[string][]byte)
  35. }
  36. func (self *LDBDatabase) Put(key []byte, value []byte) {
  37. self.mu.Lock()
  38. defer self.mu.Unlock()
  39. self.queue[string(key)] = value
  40. /*
  41. value = rle.Compress(value)
  42. err := self.db.Put(key, value, nil)
  43. if err != nil {
  44. fmt.Println("Error put", err)
  45. }
  46. */
  47. }
  48. func (self *LDBDatabase) Get(key []byte) ([]byte, error) {
  49. self.mu.Lock()
  50. defer self.mu.Unlock()
  51. // Check queue first
  52. if dat, ok := self.queue[string(key)]; ok {
  53. return dat, nil
  54. }
  55. dat, err := self.db.Get(key, nil)
  56. if err != nil {
  57. return nil, err
  58. }
  59. return rle.Decompress(dat)
  60. }
  61. func (self *LDBDatabase) Delete(key []byte) error {
  62. self.mu.Lock()
  63. defer self.mu.Unlock()
  64. // make sure it's not in the queue
  65. delete(self.queue, string(key))
  66. return self.db.Delete(key, nil)
  67. }
  68. func (self *LDBDatabase) LastKnownTD() []byte {
  69. data, _ := self.Get([]byte("LTD"))
  70. if len(data) == 0 {
  71. data = []byte{0x0}
  72. }
  73. return data
  74. }
  75. func (self *LDBDatabase) NewIterator() iterator.Iterator {
  76. return self.db.NewIterator(nil, nil)
  77. }
  78. func (self *LDBDatabase) Flush() error {
  79. self.mu.Lock()
  80. defer self.mu.Unlock()
  81. batch := new(leveldb.Batch)
  82. for key, value := range self.queue {
  83. batch.Put([]byte(key), rle.Compress(value))
  84. }
  85. self.makeQueue() // reset the queue
  86. return self.db.Write(batch, nil)
  87. }
  88. func (self *LDBDatabase) Close() {
  89. self.quit <- struct{}{}
  90. <-self.quit
  91. glog.V(logger.Info).Infoln("flushed and closed db:", self.fn)
  92. }
  93. func (self *LDBDatabase) update() {
  94. ticker := time.NewTicker(1 * time.Minute)
  95. done:
  96. for {
  97. select {
  98. case <-ticker.C:
  99. if err := self.Flush(); err != nil {
  100. glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err)
  101. }
  102. case <-self.quit:
  103. break done
  104. }
  105. }
  106. if err := self.Flush(); err != nil {
  107. glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err)
  108. }
  109. // Close the leveldb database
  110. self.db.Close()
  111. self.quit <- struct{}{}
  112. }