sync_bloom.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package trie
  17. import (
  18. "encoding/binary"
  19. "fmt"
  20. "math"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/ethdb"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/metrics"
  28. "github.com/steakknife/bloomfilter"
  29. )
  30. var (
  31. bloomAddMeter = metrics.NewRegisteredMeter("trie/bloom/add", nil)
  32. bloomLoadMeter = metrics.NewRegisteredMeter("trie/bloom/load", nil)
  33. bloomTestMeter = metrics.NewRegisteredMeter("trie/bloom/test", nil)
  34. bloomMissMeter = metrics.NewRegisteredMeter("trie/bloom/miss", nil)
  35. bloomFaultMeter = metrics.NewRegisteredMeter("trie/bloom/fault", nil)
  36. bloomErrorGauge = metrics.NewRegisteredGauge("trie/bloom/error", nil)
  37. )
  // syncBloomHasher wraps a raw byte blob so that it can be handed to the bloom
// library, which expects a 64 bit hasher. Only Size and Sum64 are functional: the
// "hash" is simply the first 8 bytes of the wrapped blob, read as a big endian
// integer. Every other method panics.
type syncBloomHasher []byte

// Write is not supported and always panics.
func (syncBloomHasher) Write(p []byte) (n int, err error) {
	panic("not implemented")
}

// Sum is not supported and always panics.
func (syncBloomHasher) Sum(b []byte) []byte {
	panic("not implemented")
}

// Reset is not supported and always panics.
func (syncBloomHasher) Reset() {
	panic("not implemented")
}

// BlockSize is not supported and always panics.
func (syncBloomHasher) BlockSize() int {
	panic("not implemented")
}

// Size reports the number of bytes making up the mini hash.
func (syncBloomHasher) Size() int {
	return 8
}

// Sum64 interprets the leading 8 bytes of the blob as a big endian uint64. It
// panics if the blob is shorter than 8 bytes.
func (h syncBloomHasher) Sum64() uint64 {
	return binary.BigEndian.Uint64([]byte(h))
}
  48. // SyncBloom is a bloom filter used during fast sync to quickly decide if a trie
  49. // node already exists on disk or not. It self populates from the provided disk
  50. // database on creation in a background thread and will only start returning live
  51. // results once that's finished.
  52. type SyncBloom struct {
  53. bloom *bloomfilter.Filter
  54. inited uint32
  55. closer sync.Once
  56. closed uint32
  57. pend sync.WaitGroup
  58. }
  59. // NewSyncBloom creates a new bloom filter of the given size (in megabytes) and
  60. // initializes it from the database. The bloom is hard coded to use 3 filters.
  61. func NewSyncBloom(memory uint64, database ethdb.Iteratee) *SyncBloom {
  62. // Create the bloom filter to track known trie nodes
  63. bloom, err := bloomfilter.New(memory*1024*1024*8, 3)
  64. if err != nil {
  65. panic(fmt.Sprintf("failed to create bloom: %v", err)) // Can't happen, here for sanity
  66. }
  67. log.Info("Allocated fast sync bloom", "size", common.StorageSize(memory*1024*1024))
  68. // Assemble the fast sync bloom and init it from previous sessions
  69. b := &SyncBloom{
  70. bloom: bloom,
  71. }
  72. b.pend.Add(2)
  73. go func() {
  74. defer b.pend.Done()
  75. b.init(database)
  76. }()
  77. go func() {
  78. defer b.pend.Done()
  79. b.meter()
  80. }()
  81. return b
  82. }
  83. // init iterates over the database, pushing every trie hash into the bloom filter.
  84. func (b *SyncBloom) init(database ethdb.Iteratee) {
  85. // Iterate over the database, but restart every now and again to avoid holding
  86. // a persistent snapshot since fast sync can push a ton of data concurrently,
  87. // bloating the disk.
  88. //
  89. // Note, this is fine, because everything inserted into leveldb by fast sync is
  90. // also pushed into the bloom directly, so we're not missing anything when the
  91. // iterator is swapped out for a new one.
  92. it := database.NewIterator()
  93. var (
  94. start = time.Now()
  95. swap = time.Now()
  96. )
  97. for it.Next() && atomic.LoadUint32(&b.closed) == 0 {
  98. // If the database entry is a trie node, add it to the bloom
  99. if key := it.Key(); len(key) == common.HashLength {
  100. b.bloom.Add(syncBloomHasher(key))
  101. bloomLoadMeter.Mark(1)
  102. }
  103. // If enough time elapsed since the last iterator swap, restart
  104. if time.Since(swap) > 8*time.Second {
  105. key := common.CopyBytes(it.Key())
  106. it.Release()
  107. it = database.NewIteratorWithStart(key)
  108. log.Info("Initializing fast sync bloom", "items", b.bloom.N(), "errorrate", b.errorRate(), "elapsed", common.PrettyDuration(time.Since(start)))
  109. swap = time.Now()
  110. }
  111. }
  112. it.Release()
  113. // Mark the bloom filter inited and return
  114. log.Info("Initialized fast sync bloom", "items", b.bloom.N(), "errorrate", b.errorRate(), "elapsed", common.PrettyDuration(time.Since(start)))
  115. atomic.StoreUint32(&b.inited, 1)
  116. }
  117. // meter periodically recalculates the false positive error rate of the bloom
  118. // filter and reports it in a metric.
  119. func (b *SyncBloom) meter() {
  120. for {
  121. // Report the current error ration. No floats, lame, scale it up.
  122. bloomErrorGauge.Update(int64(b.errorRate() * 100000))
  123. // Wait one second, but check termination more frequently
  124. for i := 0; i < 10; i++ {
  125. if atomic.LoadUint32(&b.closed) == 1 {
  126. return
  127. }
  128. time.Sleep(100 * time.Millisecond)
  129. }
  130. }
  131. }
  132. // Close terminates any background initializer still running and releases all the
  133. // memory allocated for the bloom.
  134. func (b *SyncBloom) Close() error {
  135. b.closer.Do(func() {
  136. // Ensure the initializer is stopped
  137. atomic.StoreUint32(&b.closed, 1)
  138. b.pend.Wait()
  139. // Wipe the bloom, but mark it "uninited" just in case someone attempts an access
  140. log.Info("Deallocated fast sync bloom", "items", b.bloom.N(), "errorrate", b.errorRate())
  141. atomic.StoreUint32(&b.inited, 0)
  142. b.bloom = nil
  143. })
  144. return nil
  145. }
  146. // Add inserts a new trie node hash into the bloom filter.
  147. func (b *SyncBloom) Add(hash []byte) {
  148. if atomic.LoadUint32(&b.closed) == 1 {
  149. return
  150. }
  151. b.bloom.Add(syncBloomHasher(hash))
  152. bloomAddMeter.Mark(1)
  153. }
  154. // Contains tests if the bloom filter contains the given hash:
  155. // - false: the bloom definitely does not contain hash
  156. // - true: the bloom maybe contains hash
  157. //
  158. // While the bloom is being initialized, any query will return true.
  159. func (b *SyncBloom) Contains(hash []byte) bool {
  160. bloomTestMeter.Mark(1)
  161. if atomic.LoadUint32(&b.inited) == 0 {
  162. // We didn't load all the trie nodes from the previous run of Geth yet. As
  163. // such, we can't say for sure if a hash is not present for anything. Until
  164. // the init is done, we're faking "possible presence" for everything.
  165. return true
  166. }
  167. // Bloom initialized, check the real one and report any successful misses
  168. maybe := b.bloom.Contains(syncBloomHasher(hash))
  169. if !maybe {
  170. bloomMissMeter.Mark(1)
  171. }
  172. return maybe
  173. }
  174. // errorRate calculates the probability of a random containment test returning a
  175. // false positive.
  176. //
  177. // We're calculating it ourselves because the bloom library we used missed a
  178. // parentheses in the formula and calculates it wrong. And it's discontinued...
  179. func (b *SyncBloom) errorRate() float64 {
  180. k := float64(b.bloom.K())
  181. n := float64(b.bloom.N())
  182. m := float64(b.bloom.M())
  183. return math.Pow(1.0-math.Exp((-k)*(n+0.5)/(m-1)), k)
  184. }