소스 검색

miner: streaming uncle blocks (#17320)

* miner: stream uncle block

* miner: polish
gary rong 7 년 전
부모
커밋
040aa2bb10
2개의 변경된 파일125개의 추가작업 그리고 50개의 파일을 삭제
  1. 57 42
      miner/worker.go
  2. 68 8
      miner/worker_test.go

+ 57 - 42
miner/worker.go

@@ -213,8 +213,9 @@ type worker struct {
 	running int32 // The indicator whether the consensus engine is running or not.
 
 	// Test hooks
-	newTaskHook      func(*task) // Method to call upon receiving a new sealing task
-	fullTaskInterval func()      // Method to call before pushing the full sealing task
+	newTaskHook  func(*task)      // Method to call upon receiving a new sealing task
+	skipSealHook func(*task) bool // Method to decide whether skipping the sealing.
+	fullTaskHook func()           // Method to call before pushing the full sealing task
 }
 
 func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux) *worker {
@@ -329,8 +330,32 @@ func (w *worker) mainLoop() {
 			w.commitNewWork()
 
 		case ev := <-w.chainSideCh:
+			if _, exist := w.possibleUncles[ev.Block.Hash()]; exist {
+				continue
+			}
 			// Add side block to possible uncle block set.
 			w.possibleUncles[ev.Block.Hash()] = ev.Block
+			// If our mining block contains less than 2 uncle blocks,
+			// add the new uncle block if valid and regenerate a mining block.
+			if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 {
+				start := time.Now()
+				if err := w.commitUncle(w.current, ev.Block.Header()); err == nil {
+					var uncles []*types.Header
+					w.current.uncles.Each(func(item interface{}) bool {
+						hash, ok := item.(common.Hash)
+						if !ok {
+							return false
+						}
+						uncle, exist := w.possibleUncles[hash]
+						if !exist {
+							return false
+						}
+						uncles = append(uncles, uncle.Header())
+						return true
+					})
+					w.commit(uncles, nil, true, start)
+				}
+			}
 
 		case ev := <-w.txsCh:
 			// Apply transactions to the pending state if we're not mining.
@@ -378,6 +403,10 @@ func (w *worker) seal(t *task, stop <-chan struct{}) {
 		res *task
 	)
 
+	if w.skipSealHook != nil && w.skipSealHook(t) {
+		return
+	}
+
 	if t.block, err = w.engine.Seal(w.chain, t.block, stop); t.block != nil {
 		log.Info("Successfully sealed new block", "number", t.block.Number(), "hash", t.block.Hash(),
 			"elapsed", common.PrettyDuration(time.Since(t.createdAt)))
@@ -637,30 +666,9 @@ func (w *worker) commitNewWork() {
 		delete(w.possibleUncles, hash)
 	}
 
-	var (
-		emptyBlock, fullBlock *types.Block
-		emptyState, fullState *state.StateDB
-	)
-
 	// Create an empty block based on temporary copied state for sealing in advance without waiting block
 	// execution finished.
-	emptyState = env.state.Copy()
-	if emptyBlock, err = w.engine.Finalize(w.chain, header, emptyState, nil, uncles, nil); err != nil {
-		log.Error("Failed to finalize block for temporary sealing", "err", err)
-	} else {
-		// Push empty work in advance without applying pending transaction.
-		// The reason is transactions execution can cost a lot and sealer need to
-		// take advantage of this part time.
-		if w.isRunning() {
-			select {
-			case w.taskCh <- &task{receipts: nil, state: emptyState, block: emptyBlock, createdAt: time.Now()}:
-				log.Info("Commit new empty mining work", "number", emptyBlock.Number(), "uncles", len(uncles))
-			case <-w.exitCh:
-				log.Info("Worker has exited")
-				return
-			}
-		}
-	}
+	w.commit(uncles, nil, false, tstart)
 
 	// Fill the block with all available pending transactions.
 	pending, err := w.eth.TxPool().Pending()
@@ -676,31 +684,38 @@ func (w *worker) commitNewWork() {
 	txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pending)
 	env.commitTransactions(w.mux, txs, w.chain, w.coinbase)
 
-	// Create the full block to seal with the consensus engine
-	fullState = env.state.Copy()
-	if fullBlock, err = w.engine.Finalize(w.chain, header, fullState, env.txs, uncles, env.receipts); err != nil {
-		log.Error("Failed to finalize block for sealing", "err", err)
-		return
-	}
+	w.commit(uncles, w.fullTaskHook, true, tstart)
+}
+
+// commit runs any post-transaction state modifications, assembles the final block
+// and commits new work if consensus engine is running.
+func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
 	// Deep copy receipts here to avoid interaction between different tasks.
-	cpy := make([]*types.Receipt, len(env.receipts))
-	for i, l := range env.receipts {
-		cpy[i] = new(types.Receipt)
-		*cpy[i] = *l
+	receipts := make([]*types.Receipt, len(w.current.receipts))
+	for i, l := range w.current.receipts {
+		receipts[i] = new(types.Receipt)
+		*receipts[i] = *l
+	}
+	s := w.current.state.Copy()
+	block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, uncles, w.current.receipts)
+	if err != nil {
+		return err
 	}
-	// We only care about logging if we're actually mining.
 	if w.isRunning() {
-		if w.fullTaskInterval != nil {
-			w.fullTaskInterval()
+		if interval != nil {
+			interval()
 		}
-
 		select {
-		case w.taskCh <- &task{receipts: cpy, state: fullState, block: fullBlock, createdAt: time.Now()}:
-			w.unconfirmed.Shift(fullBlock.NumberU64() - 1)
-			log.Info("Commit new full mining work", "number", fullBlock.Number(), "txs", env.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
+		case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
+			w.unconfirmed.Shift(block.NumberU64() - 1)
+			log.Info("Commit new mining work", "number", block.Number(), "txs", w.current.tcount, "uncles", len(uncles),
+				"elapsed", common.PrettyDuration(time.Since(start)))
 		case <-w.exitCh:
 			log.Info("Worker has exited")
 		}
 	}
-	w.updateSnapshot()
+	if update {
+		w.updateSnapshot()
+	}
+	return nil
 }

+ 68 - 8
miner/worker_test.go

@@ -59,7 +59,7 @@ func init() {
 	ethashChainConfig = params.TestChainConfig
 	cliqueChainConfig = params.TestChainConfig
 	cliqueChainConfig.Clique = &params.CliqueConfig{
-		Period: 1,
+		Period: 10,
 		Epoch:  30000,
 	}
 	tx1, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), types.HomesteadSigner{}, testBankKey)
@@ -74,6 +74,7 @@ type testWorkerBackend struct {
 	txPool     *core.TxPool
 	chain      *core.BlockChain
 	testTxFeed event.Feed
+	uncleBlock *types.Block
 }
 
 func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) *testWorkerBackend {
@@ -93,15 +94,19 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
 	default:
 		t.Fatal("unexpect consensus engine type")
 	}
-	gspec.MustCommit(db)
+	genesis := gspec.MustCommit(db)
 
 	chain, _ := core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{})
 	txpool := core.NewTxPool(testTxPoolConfig, chainConfig, chain)
+	blocks, _ := core.GenerateChain(chainConfig, genesis, engine, db, 1, func(i int, gen *core.BlockGen) {
+		gen.SetCoinbase(acc1Addr)
+	})
 
 	return &testWorkerBackend{
-		db:     db,
-		chain:  chain,
-		txPool: txpool,
+		db:         db,
+		chain:      chain,
+		txPool:     txpool,
+		uncleBlock: blocks[0],
 	}
 }
 
@@ -188,7 +193,7 @@ func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consens
 			taskCh <- struct{}{}
 		}
 	}
-	w.fullTaskInterval = func() {
+	w.fullTaskHook = func() {
 		time.Sleep(100 * time.Millisecond)
 	}
 
@@ -202,11 +207,66 @@ func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consens
 
 	w.start()
 	for i := 0; i < 2; i += 1 {
-		to := time.NewTimer(time.Second)
 		select {
 		case <-taskCh:
-		case <-to.C:
+		case <-time.NewTimer(time.Second).C:
 			t.Error("new task timeout")
 		}
 	}
 }
+
+func TestStreamUncleBlock(t *testing.T) {
+	ethash := ethash.NewFaker()
+	defer ethash.Close()
+
+	w, b := newTestWorker(t, ethashChainConfig, ethash)
+	defer w.close()
+
+	var taskCh = make(chan struct{})
+
+	taskIndex := 0
+	w.newTaskHook = func(task *task) {
+		if task.block.NumberU64() == 1 {
+			if taskIndex == 2 {
+				has := task.block.Header().UncleHash
+				want := types.CalcUncleHash([]*types.Header{b.uncleBlock.Header()})
+				if has != want {
+					t.Errorf("uncle hash mismatch, has %s, want %s", has.Hex(), want.Hex())
+				}
+			}
+			taskCh <- struct{}{}
+			taskIndex += 1
+		}
+	}
+	w.skipSealHook = func(task *task) bool {
+		return true
+	}
+	w.fullTaskHook = func() {
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	// Ensure worker has finished initialization
+	for {
+		b := w.pendingBlock()
+		if b != nil && b.NumberU64() == 1 {
+			break
+		}
+	}
+
+	w.start()
+	// Ignore the first two works
+	for i := 0; i < 2; i += 1 {
+		select {
+		case <-taskCh:
+		case <-time.NewTimer(time.Second).C:
+			t.Error("new task timeout")
+		}
+	}
+	b.PostChainEvents([]interface{}{core.ChainSideEvent{Block: b.uncleBlock}})
+
+	select {
+	case <-taskCh:
+	case <-time.NewTimer(time.Second).C:
+		t.Error("new task timeout")
+	}
+}