فهرست منبع

Moved leveldb update loop to eth/backend

change order of block insert and update LastBlock

bugfix, wrong hash stored in blockDb
Bas van Kervel 10 سال پیش
والد
کامیت
c273ed7d82
5فایلهای تغییر یافته به همراه62 افزوده شده و 48 حذف شده
  1. 1 0
      common/db.go
  2. 5 5
      core/chain_manager.go
  3. 49 18
      eth/backend.go
  4. 3 25
      ethdb/database.go
  5. 4 0
      ethdb/memory_database.go

+ 1 - 0
common/db.go

@@ -7,4 +7,5 @@ type Database interface {
 	Delete(key []byte) error
 	LastKnownTD() []byte
 	Close()
+	Flush() error
 }

+ 5 - 5
core/chain_manager.go

@@ -342,14 +342,14 @@ func (self *ChainManager) Export(w io.Writer) error {
 }
 
 func (bc *ChainManager) insert(block *types.Block) {
-	bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes())
-	bc.currentBlock = block
-	bc.lastBlockHash = block.Hash()
-
 	key := append(blockNumPre, block.Number().Bytes()...)
-	bc.blockDb.Put(key, bc.lastBlockHash.Bytes())
+	bc.blockDb.Put(key, block.Hash().Bytes())
 	// Push block to cache
 	bc.cache.Push(block)
+
+	bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes())
+	bc.currentBlock = block
+	bc.lastBlockHash = block.Hash()
 }
 
 func (bc *ChainManager) write(block *types.Block) {

+ 49 - 18
eth/backend.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"path"
 	"strings"
+	"time"
 
 	"github.com/ethereum/ethash"
 	"github.com/ethereum/go-ethereum/accounts"
@@ -123,6 +124,8 @@ type Ethereum struct {
 	blockDb common.Database // Block chain database
 	stateDb common.Database // State changes database
 	extraDb common.Database // Extra database (txs, etc)
+	// Closed when databases are flushed and closed
+	databasesClosed chan bool
 
 	//*** SERVICES ***
 	// State manager for processing new blocks and managing the over all states
@@ -197,18 +200,19 @@ func New(config *Config) (*Ethereum, error) {
 	glog.V(logger.Info).Infof("Blockchain DB Version: %d", config.BlockChainVersion)
 
 	eth := &Ethereum{
-		shutdownChan:   make(chan bool),
-		blockDb:        blockDb,
-		stateDb:        stateDb,
-		extraDb:        extraDb,
-		eventMux:       &event.TypeMux{},
-		accountManager: config.AccountManager,
-		DataDir:        config.DataDir,
-		etherbase:      common.HexToAddress(config.Etherbase),
-		clientVersion:  config.Name, // TODO should separate from Name
-		ethVersionId:   config.ProtocolVersion,
-		netVersionId:   config.NetworkId,
-		NatSpec:        config.NatSpec,
+		shutdownChan:    make(chan bool),
+		databasesClosed: make(chan bool),
+		blockDb:         blockDb,
+		stateDb:         stateDb,
+		extraDb:         extraDb,
+		eventMux:        &event.TypeMux{},
+		accountManager:  config.AccountManager,
+		DataDir:         config.DataDir,
+		etherbase:       common.HexToAddress(config.Etherbase),
+		clientVersion:   config.Name, // TODO should separate from Name
+		ethVersionId:    config.ProtocolVersion,
+		netVersionId:    config.NetworkId,
+		NatSpec:         config.NatSpec,
 	}
 
 	eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux())
@@ -376,6 +380,9 @@ func (s *Ethereum) Start() error {
 		}
 	}
 
+	// periodically flush databases
+	go s.syncDatabases()
+
 	// Start services
 	go s.txPool.Start()
 	s.protocolManager.Start()
@@ -392,6 +399,34 @@ func (s *Ethereum) Start() error {
 	return nil
 }
 
+func (s *Ethereum) syncDatabases() {
+	ticker := time.NewTicker(1 * time.Minute)
+done:
+	for {
+		select {
+		case <-ticker.C:
+			// don't change the order of database flushes
+			if err := s.extraDb.Flush(); err != nil {
+				glog.V(logger.Error).Infof("error: flush extraDb: %v\n", err)
+			}
+			if err := s.stateDb.Flush(); err != nil {
+				glog.V(logger.Error).Infof("error: flush stateDb: %v\n", err)
+			}
+			if err := s.blockDb.Flush(); err != nil {
+				glog.V(logger.Error).Infof("error: flush blockDb: %v\n", err)
+			}
+		case <-s.shutdownChan:
+			break done
+		}
+	}
+
+	s.blockDb.Close()
+	s.stateDb.Close()
+	s.extraDb.Close()
+
+	close(s.databasesClosed)
+}
+
 func (s *Ethereum) StartForTest() {
 	jsonlogger.LogJson(&logger.LogStarting{
 		ClientString:    s.net.Name,
@@ -412,12 +447,7 @@ func (self *Ethereum) SuggestPeer(nodeURL string) error {
 }
 
 func (s *Ethereum) Stop() {
-	// Close the database
-	defer s.blockDb.Close()
-	defer s.stateDb.Close()
-	defer s.extraDb.Close()
-
-	s.txSub.Unsubscribe() // quits txBroadcastLoop
+	s.txSub.Unsubscribe()         // quits txBroadcastLoop
 
 	s.protocolManager.Stop()
 	s.txPool.Stop()
@@ -432,6 +462,7 @@ func (s *Ethereum) Stop() {
 
 // This function will wait for a shutdown and resumes main thread execution
 func (s *Ethereum) WaitForShutdown() {
+	<-s.databasesClosed
 	<-s.shutdownChan
 }
 

+ 3 - 25
ethdb/database.go

@@ -2,7 +2,6 @@ package ethdb
 
 import (
 	"sync"
-	"time"
 
 	"github.com/ethereum/go-ethereum/compression/rle"
 	"github.com/ethereum/go-ethereum/logger"
@@ -35,8 +34,6 @@ func NewLDBDatabase(file string) (*LDBDatabase, error) {
 	}
 	database.makeQueue()
 
-	go database.update()
-
 	return database, nil
 }
 
@@ -111,35 +108,16 @@ func (self *LDBDatabase) Flush() error {
 	}
 	self.makeQueue() // reset the queue
 
+	glog.V(logger.Detail).Infoln("Flush database: ", self.fn)
+
 	return self.db.Write(batch, nil)
 }
 
 func (self *LDBDatabase) Close() {
-	self.quit <- struct{}{}
-	<-self.quit
-	glog.V(logger.Info).Infoln("flushed and closed db:", self.fn)
-}
-
-func (self *LDBDatabase) update() {
-	ticker := time.NewTicker(1 * time.Minute)
-done:
-	for {
-		select {
-		case <-ticker.C:
-			if err := self.Flush(); err != nil {
-				glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err)
-			}
-		case <-self.quit:
-			break done
-		}
-	}
-
 	if err := self.Flush(); err != nil {
 		glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err)
 	}
 
-	// Close the leveldb database
 	self.db.Close()
-
-	self.quit <- struct{}{}
+	glog.V(logger.Error).Infoln("flushed and closed db:", self.fn)
 }

+ 4 - 0
ethdb/memory_database.go

@@ -65,3 +65,7 @@ func (db *MemDatabase) LastKnownTD() []byte {
 
 	return data
 }
+
+func (db *MemDatabase) Flush() error {
+	return nil
+}