Эх сурвалжийг харах

core: added a new RemovedLogEvent

When a chain reorganisation occurs we collect the logs that were deleted
during the chain reorganisation. The removed logs are posted to the
event mux indicating that those were deleted during the reorg.
Jeffrey Wilcke 10 жил өмнө
parent
commit
9901a40f04

+ 20 - 2
core/blockchain.go

@@ -1240,6 +1240,17 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 		oldStart    = oldBlock
 		oldStart    = oldBlock
 		newStart    = newBlock
 		newStart    = newBlock
 		deletedTxs  types.Transactions
 		deletedTxs  types.Transactions
+		deletedLogs vm.Logs
+		// collectLogs collects the logs that were generated during the
+		// processing of the block that corresponds with the given hash.
+		// These logs are later announced as deleted.
+		collectLogs = func(h common.Hash) {
+			// Coalesce logs
+			receipts := GetBlockReceipts(self.chainDb, h)
+			for _, receipt := range receipts {
+				deletedLogs = append(deletedLogs, receipt.Logs...)
+			}
+		}
 	)
 	)
 
 
 	// first reduce whoever is higher bound
 	// first reduce whoever is higher bound
@@ -1247,6 +1258,8 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 		// reduce old chain
 		// reduce old chain
 		for oldBlock = oldBlock; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = self.GetBlock(oldBlock.ParentHash()) {
 		for oldBlock = oldBlock; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = self.GetBlock(oldBlock.ParentHash()) {
 			deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
 			deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
+
+			collectLogs(oldBlock.Hash())
 		}
 		}
 	} else {
 	} else {
 		// reduce new chain and append new chain blocks for inserting later on
 		// reduce new chain and append new chain blocks for inserting later on
@@ -1269,6 +1282,7 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 		}
 		}
 		newChain = append(newChain, newBlock)
 		newChain = append(newChain, newBlock)
 		deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
 		deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
+		collectLogs(oldBlock.Hash())
 
 
 		oldBlock, newBlock = self.GetBlock(oldBlock.ParentHash()), self.GetBlock(newBlock.ParentHash())
 		oldBlock, newBlock = self.GetBlock(oldBlock.ParentHash()), self.GetBlock(newBlock.ParentHash())
 		if oldBlock == nil {
 		if oldBlock == nil {
@@ -1302,7 +1316,6 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 		if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
 		if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
 			return err
 			return err
 		}
 		}
-
 		addedTxs = append(addedTxs, block.Transactions()...)
 		addedTxs = append(addedTxs, block.Transactions()...)
 	}
 	}
 
 
@@ -1316,7 +1329,12 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 	}
 	}
 	// Must be posted in a goroutine because of the transaction pool trying
 	// Must be posted in a goroutine because of the transaction pool trying
 	// to acquire the chain manager lock
 	// to acquire the chain manager lock
-	go self.eventMux.Post(RemovedTransactionEvent{diff})
+	if len(diff) > 0 {
+		go self.eventMux.Post(RemovedTransactionEvent{diff})
+	}
+	if len(deletedLogs) > 0 {
+		go self.eventMux.Post(RemovedLogEvent{deletedLogs})
+	}
 
 
 	return nil
 	return nil
 }
 }

+ 43 - 0
core/blockchain_test.go

@@ -963,3 +963,46 @@ func TestChainTxReorgs(t *testing.T) {
 		}
 		}
 	}
 	}
 }
 }
+
+func TestLogReorgs(t *testing.T) {
+	params.MinGasLimit = big.NewInt(125000)      // Minimum the gas limit may ever be.
+	params.GenesisGasLimit = big.NewInt(3141592) // Gas limit of the Genesis block.
+
+	var (
+		key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+		addr1   = crypto.PubkeyToAddress(key1.PublicKey)
+		db, _   = ethdb.NewMemDatabase()
+		// this code generates a log
+		code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
+	)
+	genesis := WriteGenesisBlockForTesting(db,
+		GenesisAccount{addr1, big.NewInt(10000000000000)},
+	)
+
+	evmux := &event.TypeMux{}
+	blockchain, _ := NewBlockChain(db, FakePow{}, evmux)
+
+	subs := evmux.Subscribe(RemovedLogEvent{})
+	chain, _ := GenerateChain(genesis, db, 2, func(i int, gen *BlockGen) {
+		if i == 1 {
+			tx, err := types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), big.NewInt(1000000), new(big.Int), code).SignECDSA(key1)
+			if err != nil {
+				t.Fatalf("failed to create tx: %v", err)
+			}
+			gen.AddTx(tx)
+		}
+	})
+	if _, err := blockchain.InsertChain(chain); err != nil {
+		t.Fatalf("failed to insert chain: %v", err)
+	}
+
+	chain, _ = GenerateChain(genesis, db, 3, func(i int, gen *BlockGen) {})
+	if _, err := blockchain.InsertChain(chain); err != nil {
+		t.Fatalf("failed to insert forked chain: %v", err)
+	}
+
+	ev := <-subs.Chan()
+	if len(ev.Data.(RemovedLogEvent).Logs) == 0 {
+		t.Error("expected logs")
+	}
+}

+ 2 - 2
core/chain_makers.go

@@ -90,6 +90,7 @@ func (b *BlockGen) AddTx(tx *types.Transaction) {
 	if b.gasPool == nil {
 	if b.gasPool == nil {
 		b.SetCoinbase(common.Address{})
 		b.SetCoinbase(common.Address{})
 	}
 	}
+	b.statedb.StartRecord(tx.Hash(), common.Hash{}, len(b.txs))
 	_, gas, err := ApplyMessage(NewEnv(b.statedb, nil, tx, b.header), tx, b.gasPool)
 	_, gas, err := ApplyMessage(NewEnv(b.statedb, nil, tx, b.header), tx, b.gasPool)
 	if err != nil {
 	if err != nil {
 		panic(err)
 		panic(err)
@@ -97,8 +98,7 @@ func (b *BlockGen) AddTx(tx *types.Transaction) {
 	root := b.statedb.IntermediateRoot()
 	root := b.statedb.IntermediateRoot()
 	b.header.GasUsed.Add(b.header.GasUsed, gas)
 	b.header.GasUsed.Add(b.header.GasUsed, gas)
 	receipt := types.NewReceipt(root.Bytes(), b.header.GasUsed)
 	receipt := types.NewReceipt(root.Bytes(), b.header.GasUsed)
-	logs := b.statedb.GetLogs(tx.Hash())
-	receipt.Logs = logs
+	receipt.Logs = b.statedb.GetLogs(tx.Hash())
 	receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
 	receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
 	b.txs = append(b.txs, tx)
 	b.txs = append(b.txs, tx)
 	b.receipts = append(b.receipts, receipt)
 	b.receipts = append(b.receipts, receipt)

+ 3 - 0
core/events.go

@@ -39,6 +39,9 @@ type NewMinedBlockEvent struct{ Block *types.Block }
 // RemovedTransactionEvent is posted when a reorg happens
 // RemovedTransactionEvent is posted when a reorg happens
 type RemovedTransactionEvent struct{ Txs types.Transactions }
 type RemovedTransactionEvent struct{ Txs types.Transactions }
 
 
+// RemovedLogEvent is posted when a reorg happens
+type RemovedLogEvent struct{ Logs vm.Logs }
+
 // ChainSplit is posted when a new head is detected
 // ChainSplit is posted when a new head is detected
 type ChainSplitEvent struct {
 type ChainSplitEvent struct {
 	Block *types.Block
 	Block *types.Block