Browse Source

core: added LRU caching and added batch writing when LDB is used

obscuren 10 years ago
parent
commit
4460dc9d1a
1 changed files with 81 additions and 21 deletions
  1. 81 21
      core/chain_manager.go

+ 81 - 21
core/chain_manager.go

@@ -11,15 +11,18 @@ import (
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/compression/rle"
 	"github.com/ethereum/go-ethereum/core/state"
 	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/logger"
 	"github.com/ethereum/go-ethereum/logger/glog"
 	"github.com/ethereum/go-ethereum/params"
 	"github.com/ethereum/go-ethereum/pow"
 	"github.com/ethereum/go-ethereum/rlp"
-	"github.com/rcrowley/go-metrics"
+	"github.com/golang/groupcache/lru"
+	"github.com/syndtr/goleveldb/leveldb"
 )
 
 var (
@@ -105,8 +108,9 @@ type ChainManager struct {
 	transState *state.StateDB
 	txState    *state.ManagedState
 
-	cache        *BlockCache
-	futureBlocks *BlockCache
+	cache        *lru.Cache  // cache is the LRU caching
+	futureBlocks *BlockCache // future blocks are blocks added for later processing
+	nextBlocks   *BlockCache // next blocks is used during large inserts
 
 	quit chan struct{}
 	// procInterrupt must be atomically called
@@ -123,7 +127,7 @@ func NewChainManager(genesis *types.Block, blockDb, stateDb common.Database, pow
 		genesisBlock: GenesisBlock(42, stateDb),
 		eventMux:     mux,
 		quit:         make(chan struct{}),
-		cache:        NewBlockCache(blockCacheLimit),
+		cache:        lru.New(blockCacheLimit),
 		pow:          pow,
 	}
 	// Check the genesis block given to the chain manager. If the genesis block mismatches block number 0
@@ -168,7 +172,7 @@ func (bc *ChainManager) SetHead(head *types.Block) {
 		bc.removeBlock(block)
 	}
 
-	bc.cache = NewBlockCache(blockCacheLimit)
+	bc.cache = lru.New(blockCacheLimit)
 	bc.currentBlock = head
 	bc.makeCache()
 
@@ -257,11 +261,13 @@ func (bc *ChainManager) setLastState() {
 
 func (bc *ChainManager) makeCache() {
 	if bc.cache == nil {
-		bc.cache = NewBlockCache(blockCacheLimit)
+		bc.cache = lru.New(blockCacheLimit)
 	}
 	// load in last `blockCacheLimit` - 1 blocks. Last block is the current.
-	for _, block := range bc.GetBlocksFromHash(bc.currentBlock.Hash(), blockCacheLimit) {
-		bc.cache.Push(block)
+	ancestors := bc.GetAncestors(bc.currentBlock, blockCacheLimit-1)
+	ancestors = append(ancestors, bc.currentBlock)
+	for _, block := range ancestors {
+		bc.cache.Add(block.Hash(), block)
 	}
 }
 
@@ -274,7 +280,7 @@ func (bc *ChainManager) Reset() {
 	}
 
 	if bc.cache == nil {
-		bc.cache = NewBlockCache(blockCacheLimit)
+		bc.cache = lru.New(blockCacheLimit)
 	}
 
 	// Prepare the genesis block
@@ -359,15 +365,20 @@ func (bc *ChainManager) insert(block *types.Block) {
 }
 
 func (bc *ChainManager) write(block *types.Block) {
-	enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block))
-	key := append(blockHashPre, block.Hash().Bytes()...)
-	err := bc.blockDb.Put(key, enc)
-	if err != nil {
-		glog.Fatal("db write fail:", err)
-	}
+	tstart := time.Now()
+
+	go func() {
+		enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block))
+		key := append(blockHashPre, block.Hash().Bytes()...)
+		err := bc.blockDb.Put(key, enc)
+		if err != nil {
+			glog.Fatal("db write fail:", err)
+		}
+	}()
 
-	// Push block to cache
-	bc.cache.Push(block)
+	if glog.V(logger.Debug) {
+		glog.Infof("wrote block #%v %s. Took %v\n", block.Number(), common.PP(block.Hash().Bytes()), time.Since(tstart))
+	}
 }
 
 // Accessors
@@ -377,6 +388,16 @@ func (bc *ChainManager) Genesis() *types.Block {
 
 // Block fetching methods
 func (bc *ChainManager) HasBlock(hash common.Hash) bool {
+	if bc.cache.Contains(hash) {
+		return true
+	}
+
+	if bc.nextBlocks != nil {
+		if block := bc.nextBlocks.Get(hash); block != nil {
+			return true
+		}
+	}
+
 	data, _ := bc.blockDb.Get(append(blockHashPre, hash[:]...))
 	return len(data) != 0
 }
@@ -403,11 +424,15 @@ func (self *ChainManager) GetBlockHashesFromHash(hash common.Hash, max uint64) (
 }
 
 func (self *ChainManager) GetBlock(hash common.Hash) *types.Block {
-	/*
-		if block := self.cache.Get(hash); block != nil {
+	if block, ok := self.cache.Get(hash); ok {
+		return block.(*types.Block)
+	}
+
+	if self.nextBlocks != nil {
+		if block := self.nextBlocks.Get(hash); block != nil {
 			return block
 		}
-	*/
+	}
 
 	data, _ := self.blockDb.Get(append(blockHashPre, hash[:]...))
 	if len(data) == 0 {
@@ -418,6 +443,10 @@ func (self *ChainManager) GetBlock(hash common.Hash) *types.Block {
 		glog.V(logger.Error).Infof("invalid block RLP for hash %x: %v", hash, err)
 		return nil
 	}
+
+	// Add the block to the cache
+	self.cache.Add(hash, (*types.Block)(&block))
+
 	return (*types.Block)(&block)
 }
 
@@ -494,6 +523,31 @@ func (self *ChainManager) procFutureBlocks() {
 	}
 }
 
+func (self *ChainManager) enqueueForWrite(block *types.Block) {
+	self.nextBlocks.Push(block)
+}
+
+func (self *ChainManager) flushQueuedBlocks() {
+	db, batchWrite := self.blockDb.(*ethdb.LDBDatabase)
+	batch := new(leveldb.Batch)
+	self.nextBlocks.Each(func(i int, block *types.Block) {
+		enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block))
+		key := append(blockHashPre, block.Hash().Bytes()...)
+		if batchWrite {
+			batch.Put(key, rle.Compress(enc))
+		} else {
+			self.blockDb.Put(key, enc)
+		}
+	})
+	if batchWrite {
+		db.LDB().Write(batch, nil)
+	}
+
+	// reset the next blocks cache
+	self.nextBlocks = nil
+
+}
+
 // InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned
 // it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go).
 func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
@@ -503,6 +557,8 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
 	self.chainmu.Lock()
 	defer self.chainmu.Unlock()
 
+	self.nextBlocks = NewBlockCache(len(chain))
+
 	// A queued approach to delivering events. This is generally
 	// faster than direct delivery and requires much less mutex
 	// acquiring.
@@ -520,6 +576,10 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
 	// Start the parallel nonce verifier.
 	go verifyNonces(self.pow, chain, nonceQuit, nonceDone)
 	defer close(nonceQuit)
+	defer self.flushQueuedBlocks()
+
+	defer func() {
+	}()
 
 	txcount := 0
 	for i, block := range chain {
@@ -632,7 +692,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
 		}
 		// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
 		// not in the canonical chain.
-		self.write(block)
+		self.enqueueForWrite(block)
 		// Delete from future blocks
 		self.futureBlocks.Delete(block.Hash())