Forráskód Böngészése

core: concurrent database reinit from freezer dump

* core: reinit chain from freezer in batches

* core/rawdb: concurrent database reinit from freezer dump

* core/rawdb: reinit from freezer in sequential order
Péter Szilágyi 6 éve
szülő
commit
fc85777a21

+ 23 - 2
common/prque/prque.go

@@ -1,5 +1,20 @@
+// CookieJar - A contestant's algorithm toolbox
+// Copyright (c) 2013 Peter Szilagyi. All rights reserved.
+//
+// CookieJar is dual licensed: use of this source code is governed by a BSD
+// license that can be found in the LICENSE file. Alternatively, the CookieJar
+// toolbox may be used in accordance with the terms and conditions contained
+// in a signed written agreement between you and the author(s).
+
 // This is a duplicated and slightly modified version of "gopkg.in/karalabe/cookiejar.v2/collections/prque".
 
+// Package prque implements a priority queue data structure supporting arbitrary
+// value types and int64 priorities.
+//
+// If you would like to use a min-priority queue, simply negate the priorities.
+//
+// Internally the queue is based on the standard heap package working on a
+// sortable version of the block based stack.
 package prque
 
 import (
@@ -11,8 +26,8 @@ type Prque struct {
 	cont *sstack
 }
 
-// Creates a new priority queue.
-func New(setIndex setIndexCallback) *Prque {
+// New creates a new priority queue.
+func New(setIndex SetIndexCallback) *Prque {
 	return &Prque{newSstack(setIndex)}
 }
 
@@ -21,6 +36,12 @@ func (p *Prque) Push(data interface{}, priority int64) {
 	heap.Push(p.cont, &item{data, priority})
 }
 
+// Peek returns the value with the greates priority but does not pop it off.
+func (p *Prque) Peek() (interface{}, int64) {
+	item := p.cont.blocks[0][0]
+	return item.value, item.priority
+}
+
 // Pops the value with the greates priority off the stack and returns it.
 // Currently no shrinking is done.
 func (p *Prque) Pop() (interface{}, int64) {

+ 13 - 5
common/prque/sstack.go

@@ -1,3 +1,11 @@
+// CookieJar - A contestant's algorithm toolbox
+// Copyright (c) 2013 Peter Szilagyi. All rights reserved.
+//
+// CookieJar is dual licensed: use of this source code is governed by a BSD
+// license that can be found in the LICENSE file. Alternatively, the CookieJar
+// toolbox may be used in accordance with the terms and conditions contained
+// in a signed written agreement between you and the author(s).
+
 // This is a duplicated and slightly modified version of "gopkg.in/karalabe/cookiejar.v2/collections/prque".
 
 package prque
@@ -14,16 +22,16 @@ type item struct {
 	priority int64
 }
 
-// setIndexCallback is called when the element is moved to a new index.
-// Providing setIndexCallback is optional, it is needed only if the application needs
+// SetIndexCallback is called when the element is moved to a new index.
+// Providing SetIndexCallback is optional, it is needed only if the application needs
 // to delete elements other than the top one.
-type setIndexCallback func(a interface{}, i int)
+type SetIndexCallback func(data interface{}, index int)
 
 // Internal sortable stack data structure. Implements the Push and Pop ops for
 // the stack (heap) functionality and the Len, Less and Swap methods for the
 // sortability requirements of the heaps.
 type sstack struct {
-	setIndex setIndexCallback
+	setIndex SetIndexCallback
 	size     int
 	capacity int
 	offset   int
@@ -33,7 +41,7 @@ type sstack struct {
 }
 
 // Creates a new, empty stack.
-func newSstack(setIndex setIndexCallback) *sstack {
+func newSstack(setIndex SetIndexCallback) *sstack {
 	result := new(sstack)
 	result.setIndex = setIndex
 	result.active = make([]*item, blockSize)

+ 6 - 37
core/blockchain.go

@@ -220,47 +220,16 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
 	}
 	// Initialize the chain with ancient data if it isn't empty.
 	if bc.empty() {
-		if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
-			var (
-				start  = time.Now()
-				logged time.Time
-			)
-			for i := uint64(0); i < frozen; i++ {
-				// Inject hash<->number mapping.
-				hash := rawdb.ReadCanonicalHash(bc.db, i)
-				if hash == (common.Hash{}) {
-					return nil, errors.New("broken ancient database")
-				}
-				rawdb.WriteHeaderNumber(bc.db, hash, i)
-
-				// Inject txlookup indexes.
-				block := rawdb.ReadBlock(bc.db, hash, i)
-				if block == nil {
-					return nil, errors.New("broken ancient database")
-				}
-				rawdb.WriteTxLookupEntries(bc.db, block)
-
-				// If we've spent too much time already, notify the user of what we're doing
-				if time.Since(logged) > 8*time.Second {
-					log.Info("Initializing chain from ancient data", "number", i, "hash", hash, "total", frozen-1, "elapsed", common.PrettyDuration(time.Since(start)))
-					logged = time.Now()
-				}
-			}
-			hash := rawdb.ReadCanonicalHash(bc.db, frozen-1)
-			rawdb.WriteHeadHeaderHash(bc.db, hash)
-			rawdb.WriteHeadFastBlockHash(bc.db, hash)
-
-			// The first thing the node will do is reconstruct the verification data for
-			// the head block (ethash cache or clique voting snapshot). Might as well do
-			// it in advance.
-			bc.engine.VerifyHeader(bc, rawdb.ReadHeader(bc.db, hash, frozen-1), true)
-
-			log.Info("Initialized chain from ancient data", "number", frozen-1, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
-		}
+		rawdb.InitDatabaseFromFreezer(bc.db)
 	}
 	if err := bc.loadLastState(); err != nil {
 		return nil, err
 	}
+	// The first thing the node will do is reconstruct the verification data for
+	// the head block (ethash cache or clique voting snapshot). Might as well do
+	// it in advance.
+	bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true)
+
 	if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
 		var (
 			needRewind bool

+ 2 - 1
core/rawdb/accessors_indexes.go

@@ -55,8 +55,9 @@ func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 {
 // WriteTxLookupEntries stores a positional metadata for every transaction from
 // a block, enabling hash based transaction and receipt lookups.
 func WriteTxLookupEntries(db ethdb.KeyValueWriter, block *types.Block) {
+	number := block.Number().Bytes()
 	for _, tx := range block.Transactions() {
-		if err := db.Put(txLookupKey(tx.Hash()), block.Number().Bytes()); err != nil {
+		if err := db.Put(txLookupKey(tx.Hash()), number); err != nil {
 			log.Crit("Failed to store transaction lookup entry", "err", err)
 		}
 	}

+ 127 - 0
core/rawdb/freezer_reinit.go

@@ -0,0 +1,127 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rawdb
+
+import (
+	"errors"
+	"runtime"
+	"sync/atomic"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/common/prque"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/ethereum/go-ethereum/log"
+)
+
+// InitDatabaseFromFreezer reinitializes an empty database from a previous batch
+// of frozen ancient blocks. The method iterates over all the frozen blocks and
+// injects into the database the block hash->number mappings and the transaction
+// lookup entries.
+func InitDatabaseFromFreezer(db ethdb.Database) error {
+	// If we can't access the freezer or it's empty, abort
+	frozen, err := db.Ancients()
+	if err != nil || frozen == 0 {
+		return err
+	}
+	// Blocks previously frozen, iterate over- and hash them concurrently
+	var (
+		number  = ^uint64(0) // -1
+		results = make(chan *types.Block, 4*runtime.NumCPU())
+	)
+	abort := make(chan struct{})
+	defer close(abort)
+
+	for i := 0; i < runtime.NumCPU(); i++ {
+		go func() {
+			for {
+				// Fetch the next task number, terminating if everything's done
+				n := atomic.AddUint64(&number, 1)
+				if n >= frozen {
+					return
+				}
+				// Retrieve the block from the freezer (no need for the hash, we pull by
+				// number from the freezer). If successful, pre-cache the block hash and
+				// the individual transaction hashes for storing into the database.
+				block := ReadBlock(db, common.Hash{}, n)
+				if block != nil {
+					block.Hash()
+					for _, tx := range block.Transactions() {
+						tx.Hash()
+					}
+				}
+				// Feed the block to the aggregator, or abort on interrupt
+				select {
+				case results <- block:
+				case <-abort:
+					return
+				}
+			}
+		}()
+	}
+	// Reassemble the blocks into a contiguous stream and push them out to disk
+	var (
+		queue = prque.New(nil)
+		next  = int64(0)
+
+		batch  = db.NewBatch()
+		start  = time.Now()
+		logged time.Time
+	)
+	for i := uint64(0); i < frozen; i++ {
+		// Retrieve the next result and bail if it's nil
+		block := <-results
+		if block == nil {
+			return errors.New("broken ancient database")
+		}
+		// Push the block into the import queue and process contiguous ranges
+		queue.Push(block, -int64(block.NumberU64()))
+		for !queue.Empty() {
+			// If the next available item is gapped, return
+			if _, priority := queue.Peek(); -priority != next {
+				break
+			}
+			// Next block available, pop it off and index it
+			block = queue.PopItem().(*types.Block)
+			next++
+
+			// Inject hash<->number mapping and txlookup indexes
+			WriteHeaderNumber(batch, block.Hash(), block.NumberU64())
+			WriteTxLookupEntries(batch, block)
+
+			// If enough data was accumulated in memory or we're at the last block, dump to disk
+			if batch.ValueSize() > ethdb.IdealBatchSize || uint64(next) == frozen {
+				if err := batch.Write(); err != nil {
+					return err
+				}
+				batch.Reset()
+			}
+			// If we've spent too much time already, notify the user of what we're doing
+			if time.Since(logged) > 8*time.Second {
+				log.Info("Initializing chain from ancient data", "number", block.Number(), "hash", block.Hash(), "total", frozen-1, "elapsed", common.PrettyDuration(time.Since(start)))
+				logged = time.Now()
+			}
+		}
+	}
+	hash := ReadCanonicalHash(db, frozen-1)
+	WriteHeadHeaderHash(db, hash)
+	WriteHeadFastBlockHash(db, hash)
+
+	log.Info("Initialized chain from ancient data", "number", frozen-1, "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
+	return nil
+}