Browse Source

parallel bloom calculation (#445)

* parallel bloom calculation

* indent

* add condition if bloomJobs not nil

* add handler for worker

* fix format

* bloomWorker should exit when all txs have been processed

* rename BloomPair => BloomHash

* add size to map

* rename & unique variable

* bloomJobs => bloomProcessors

* fix

* only assign bloom if empty

* abstraction method for processing receipt bloom

* remove duplicate receipt_processor

* rename Processor

* fix  ReceiptProcessor

* fix ReceiptBloomGenertor typo

* reduce worker to 1

* remove empty wg

* add defence code to check if channel is closed

* remove nil

* format fix

* remove thread pool

* use max 100 worker capacity

* reduce worker size

* refactor startWorker
Steven Tran 4 years ago
parent
commit
31463f8dd1

+ 1 - 1
core/chain_makers.go

@@ -104,7 +104,7 @@ func (b *BlockGen) AddTxWithChain(bc *BlockChain, tx *types.Transaction) {
 		b.SetCoinbase(common.Address{})
 	}
 	b.statedb.Prepare(tx.Hash(), common.Hash{}, len(b.txs))
-	receipt, err := ApplyTransaction(b.config, bc, &b.header.Coinbase, b.gasPool, b.statedb, b.header, tx, &b.header.GasUsed, vm.Config{})
+	receipt, err := ApplyTransaction(b.config, bc, &b.header.Coinbase, b.gasPool, b.statedb, b.header, tx, &b.header.GasUsed, vm.Config{}, NewReceiptBloomGenerator())
 	if err != nil {
 		panic(err)
 	}

+ 66 - 0
core/receipt_processor.go

@@ -0,0 +1,66 @@
+package core
+
+import (
+	"bytes"
+	"sync"
+
+	"github.com/ethereum/go-ethereum/core/types"
+)
+
+type ReceiptProcessor interface {
+	Apply(receipt *types.Receipt)
+}
+
+var (
+	_ ReceiptProcessor = (*ReceiptBloomGenerator)(nil)
+	_ ReceiptProcessor = (*AsyncReceiptBloomGenerator)(nil)
+)
+
+func NewReceiptBloomGenerator() *ReceiptBloomGenerator {
+	return &ReceiptBloomGenerator{}
+}
+
+type ReceiptBloomGenerator struct {
+}
+
+func (p *ReceiptBloomGenerator) Apply(receipt *types.Receipt) {
+	receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
+}
+
+func NewAsyncReceiptBloomGenerator(txNums int) *AsyncReceiptBloomGenerator {
+	generator := &AsyncReceiptBloomGenerator{
+		receipts: make(chan *types.Receipt, txNums),
+	}
+	generator.startWorker()
+	return generator
+}
+
+type AsyncReceiptBloomGenerator struct {
+	receipts chan *types.Receipt
+	wg       sync.WaitGroup
+	isClosed bool
+}
+
+func (p *AsyncReceiptBloomGenerator) startWorker() {
+	p.wg.Add(1)
+	go func() {
+		defer p.wg.Done()
+		for receipt := range p.receipts {
+			if receipt != nil && bytes.Equal(receipt.Bloom[:], types.EmptyBloom[:]) {
+				receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
+			}
+		}
+	}()
+}
+
+func (p *AsyncReceiptBloomGenerator) Apply(receipt *types.Receipt) {
+	if !p.isClosed {
+		p.receipts <- receipt
+	}
+}
+
+func (p *AsyncReceiptBloomGenerator) Close() {
+	close(p.receipts)
+	p.isClosed = true
+	p.wg.Wait()
+}

+ 14 - 6
core/state_processor.go

@@ -390,9 +390,14 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
 	blockContext := NewEVMBlockContext(header, p.bc, nil)
 	vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
 
+	txNum := len(block.Transactions())
 	// Iterate over and process the individual transactions
 	posa, isPoSA := p.engine.(consensus.PoSA)
-	commonTxs := make([]*types.Transaction, 0, len(block.Transactions()))
+	commonTxs := make([]*types.Transaction, 0, txNum)
+
+	// initilise bloom processors
+	bloomProcessors := NewAsyncReceiptBloomGenerator(txNum)
+
 	// usually do have two tx, one for validator set contract, another for system reward contract.
 	systemTxs := make([]*types.Transaction, 0, 2)
 	for i, tx := range block.Transactions() {
@@ -410,7 +415,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
 			return statedb, nil, nil, 0, err
 		}
 		statedb.Prepare(tx.Hash(), block.Hash(), i)
-		receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, statedb, header, tx, usedGas, vmenv)
+		receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, statedb, header, tx, usedGas, vmenv, bloomProcessors)
 		if err != nil {
 			return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
 		}
@@ -418,6 +423,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
 		commonTxs = append(commonTxs, tx)
 		receipts = append(receipts, receipt)
 	}
+	bloomProcessors.Close()
 
 	// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
 	err := p.engine.Finalize(p.bc, header, statedb, &commonTxs, block.Uncles(), &receipts, &systemTxs, usedGas)
@@ -431,7 +437,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
 	return statedb, receipts, allLogs, *usedGas, nil
 }
 
-func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, evm *vm.EVM) (*types.Receipt, error) {
+func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, evm *vm.EVM, receiptProcessors ...ReceiptProcessor) (*types.Receipt, error) {
 	// Create a new context to be used in the EVM environment.
 	txContext := NewEVMTxContext(msg)
 	evm.Reset(txContext, statedb)
@@ -469,10 +475,12 @@ func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainCon
 
 	// Set the receipt logs and create the bloom filter.
 	receipt.Logs = statedb.GetLogs(tx.Hash())
-	receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
 	receipt.BlockHash = statedb.BlockHash()
 	receipt.BlockNumber = header.Number
 	receipt.TransactionIndex = uint(statedb.TxIndex())
+	for _, receiptProcessor := range receiptProcessors {
+		receiptProcessor.Apply(receipt)
+	}
 	return receipt, err
 }
 
@@ -480,7 +488,7 @@ func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainCon
 // and uses the input parameters for its environment. It returns the receipt
 // for the transaction, gas used and an error if the transaction failed,
 // indicating the block was invalid.
-func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config) (*types.Receipt, error) {
+func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config, receiptProcessors ...ReceiptProcessor) (*types.Receipt, error) {
 	msg, err := tx.AsMessage(types.MakeSigner(config, header.Number))
 	if err != nil {
 		return nil, err
@@ -493,5 +501,5 @@ func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *commo
 		vm.EVMInterpreterPool.Put(ite)
 		vm.EvmPool.Put(vmenv)
 	}()
-	return applyTransaction(msg, config, bc, author, gp, statedb, header, tx, usedGas, vmenv)
+	return applyTransaction(msg, config, bc, author, gp, statedb, header, tx, usedGas, vmenv, receiptProcessors...)
 }

+ 2 - 0
core/types/bloom9.go

@@ -37,6 +37,8 @@ const (
 	BloomBitLength = 8 * BloomByteLength
 )
 
+var EmptyBloom = Bloom{}
+
 // Bloom represents a 2048 bit bloom filter.
 type Bloom [BloomByteLength]byte
 

+ 4 - 0
core/types/transaction.go

@@ -479,6 +479,10 @@ func (t *TransactionsByPriceAndNonce) Pop() {
 	heap.Pop(&t.heads)
 }
 
+func (t *TransactionsByPriceAndNonce) CurrentSize() int {
+	return len(t.heads)
+}
+
 // Message is a fully derived transaction and implements core.Message
 //
 // NOTE: In a future PR this will be removed.

+ 1 - 1
eth/catalyst/api.go

@@ -79,7 +79,7 @@ type blockExecutionEnv struct {
 
 func (env *blockExecutionEnv) commitTransaction(tx *types.Transaction, coinbase common.Address) error {
 	vmconfig := *env.chain.GetVMConfig()
-	receipt, err := core.ApplyTransaction(env.chain.Config(), env.chain, &coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, vmconfig)
+	receipt, err := core.ApplyTransaction(env.chain.Config(), env.chain, &coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, vmconfig, core.NewReceiptBloomGenerator())
 	if err != nil {
 		return err
 	}

+ 12 - 3
miner/worker.go

@@ -736,10 +736,10 @@ func (w *worker) updateSnapshot() {
 	w.snapshotState = w.current.state.Copy()
 }
 
-func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
+func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address, receiptProcessors ...core.ReceiptProcessor) ([]*types.Log, error) {
 	snap := w.current.state.Snapshot()
 
-	receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig())
+	receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig(), receiptProcessors...)
 	if err != nil {
 		w.current.state.RevertToSnapshot(snap)
 		return nil, err
@@ -769,6 +769,14 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
 		log.Debug("Time left for mining work", "left", (*delay - w.config.DelayLeftOver).String(), "leftover", w.config.DelayLeftOver)
 		defer stopTimer.Stop()
 	}
+
+	// initilise bloom processors
+	processorCapacity := 100
+	if txs.CurrentSize() < processorCapacity {
+		processorCapacity = txs.CurrentSize()
+	}
+	bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity)
+
 LOOP:
 	for {
 		// In the following three cases, we will interrupt the execution of the transaction.
@@ -824,7 +832,7 @@ LOOP:
 		// Start executing the transaction
 		w.current.state.Prepare(tx.Hash(), common.Hash{}, w.current.tcount)
 
-		logs, err := w.commitTransaction(tx, coinbase)
+		logs, err := w.commitTransaction(tx, coinbase, bloomProcessors)
 		switch {
 		case errors.Is(err, core.ErrGasLimitReached):
 			// Pop the current out-of-gas transaction without shifting in the next from the account
@@ -859,6 +867,7 @@ LOOP:
 			txs.Shift()
 		}
 	}
+	bloomProcessors.Close()
 
 	if !w.isRunning() && len(coalescedLogs) > 0 {
 		// We don't push the pendingLogsEvent while we are mining. The reason is that