receipt_processor.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package core
  2. import (
  3. "bytes"
  4. "sync"
  5. "github.com/ethereum/go-ethereum/core/types"
  6. )
  7. type ReceiptProcessor interface {
  8. Apply(receipt *types.Receipt)
  9. }
  10. var (
  11. _ ReceiptProcessor = (*ReceiptBloomGenerator)(nil)
  12. _ ReceiptProcessor = (*AsyncReceiptBloomGenerator)(nil)
  13. )
  14. func NewReceiptBloomGenerator() *ReceiptBloomGenerator {
  15. return &ReceiptBloomGenerator{}
  16. }
  17. type ReceiptBloomGenerator struct {
  18. }
  19. func (p *ReceiptBloomGenerator) Apply(receipt *types.Receipt) {
  20. receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
  21. }
  22. func NewAsyncReceiptBloomGenerator(txNums int) *AsyncReceiptBloomGenerator {
  23. generator := &AsyncReceiptBloomGenerator{
  24. receipts: make(chan *types.Receipt, txNums),
  25. }
  26. generator.startWorker()
  27. return generator
  28. }
  29. type AsyncReceiptBloomGenerator struct {
  30. receipts chan *types.Receipt
  31. wg sync.WaitGroup
  32. isClosed bool
  33. }
  34. func (p *AsyncReceiptBloomGenerator) startWorker() {
  35. p.wg.Add(1)
  36. go func() {
  37. defer p.wg.Done()
  38. for receipt := range p.receipts {
  39. if receipt != nil && bytes.Equal(receipt.Bloom[:], types.EmptyBloom[:]) {
  40. receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
  41. }
  42. }
  43. }()
  44. }
  45. func (p *AsyncReceiptBloomGenerator) Apply(receipt *types.Receipt) {
  46. if !p.isClosed {
  47. p.receipts <- receipt
  48. }
  49. }
  50. func (p *AsyncReceiptBloomGenerator) Close() {
  51. close(p.receipts)
  52. p.isClosed = true
  53. p.wg.Wait()
  54. }