sync_bloom.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package trie
  17. import (
  18. "encoding/binary"
  19. "fmt"
  20. "math"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/core/rawdb"
  26. "github.com/ethereum/go-ethereum/ethdb"
  27. "github.com/ethereum/go-ethereum/log"
  28. "github.com/ethereum/go-ethereum/metrics"
  29. "github.com/steakknife/bloomfilter"
  30. )
  31. var (
  32. bloomAddMeter = metrics.NewRegisteredMeter("trie/bloom/add", nil)
  33. bloomLoadMeter = metrics.NewRegisteredMeter("trie/bloom/load", nil)
  34. bloomTestMeter = metrics.NewRegisteredMeter("trie/bloom/test", nil)
  35. bloomMissMeter = metrics.NewRegisteredMeter("trie/bloom/miss", nil)
  36. bloomFaultMeter = metrics.NewRegisteredMeter("trie/bloom/fault", nil)
  37. bloomErrorGauge = metrics.NewRegisteredGauge("trie/bloom/error", nil)
  38. )
  // syncBloomHasher adapts a raw hash blob (a trie node hash or a contract code
// hash) to the 64-bit hash interface required by the bloom filter library.
//
// Only Size and Sum64 carry meaning. The streaming methods (Write, Sum, Reset,
// BlockSize) are not implemented and panic with "not implemented" if called.
//
// Sum64 reads the first 8 bytes of the blob as a big-endian integer, so the
// blob must be at least 8 bytes long; shorter input panics.
type syncBloomHasher []byte

// Write is not supported and always panics.
func (h syncBloomHasher) Write(p []byte) (n int, err error) {
	panic("not implemented")
}

// Sum is not supported and always panics.
func (h syncBloomHasher) Sum(b []byte) []byte {
	panic("not implemented")
}

// Reset is not supported and always panics.
func (h syncBloomHasher) Reset() {
	panic("not implemented")
}

// BlockSize is not supported and always panics.
func (h syncBloomHasher) BlockSize() int {
	panic("not implemented")
}

// Size reports the width of the mini hash in bytes.
func (h syncBloomHasher) Size() int {
	return 8
}

// Sum64 returns the leading 8 bytes of the blob as a big-endian uint64.
func (h syncBloomHasher) Sum64() uint64 {
	return binary.BigEndian.Uint64(h)
}
  49. // SyncBloom is a bloom filter used during fast sync to quickly decide if a trie
  50. // node or contract code already exists on disk or not. It self populates from the
  51. // provided disk database on creation in a background thread and will only start
  52. // returning live results once that's finished.
  53. type SyncBloom struct {
  54. bloom *bloomfilter.Filter
  55. inited uint32
  56. closer sync.Once
  57. closed uint32
  58. pend sync.WaitGroup
  59. }
  60. // NewSyncBloom creates a new bloom filter of the given size (in megabytes) and
  61. // initializes it from the database. The bloom is hard coded to use 3 filters.
  62. func NewSyncBloom(memory uint64, database ethdb.Iteratee) *SyncBloom {
  63. // Create the bloom filter to track known trie nodes
  64. bloom, err := bloomfilter.New(memory*1024*1024*8, 3)
  65. if err != nil {
  66. panic(fmt.Sprintf("failed to create bloom: %v", err))
  67. }
  68. log.Info("Allocated fast sync bloom", "size", common.StorageSize(memory*1024*1024))
  69. // Assemble the fast sync bloom and init it from previous sessions
  70. b := &SyncBloom{
  71. bloom: bloom,
  72. }
  73. b.pend.Add(2)
  74. go func() {
  75. defer b.pend.Done()
  76. b.init(database)
  77. }()
  78. go func() {
  79. defer b.pend.Done()
  80. b.meter()
  81. }()
  82. return b
  83. }
  84. // init iterates over the database, pushing every trie hash into the bloom filter.
  85. func (b *SyncBloom) init(database ethdb.Iteratee) {
  86. // Iterate over the database, but restart every now and again to avoid holding
  87. // a persistent snapshot since fast sync can push a ton of data concurrently,
  88. // bloating the disk.
  89. //
  90. // Note, this is fine, because everything inserted into leveldb by fast sync is
  91. // also pushed into the bloom directly, so we're not missing anything when the
  92. // iterator is swapped out for a new one.
  93. it := database.NewIterator(nil, nil)
  94. var (
  95. start = time.Now()
  96. swap = time.Now()
  97. )
  98. for it.Next() && atomic.LoadUint32(&b.closed) == 0 {
  99. // If the database entry is a trie node, add it to the bloom
  100. key := it.Key()
  101. if len(key) == common.HashLength {
  102. b.bloom.Add(syncBloomHasher(key))
  103. bloomLoadMeter.Mark(1)
  104. }
  105. // If the database entry is a contract code, add it to the bloom
  106. if ok, hash := rawdb.IsCodeKey(key); ok {
  107. b.bloom.Add(syncBloomHasher(hash))
  108. bloomLoadMeter.Mark(1)
  109. }
  110. // If enough time elapsed since the last iterator swap, restart
  111. if time.Since(swap) > 8*time.Second {
  112. key := common.CopyBytes(it.Key())
  113. it.Release()
  114. it = database.NewIterator(nil, key)
  115. log.Info("Initializing fast sync bloom", "items", b.bloom.N(), "errorrate", b.errorRate(), "elapsed", common.PrettyDuration(time.Since(start)))
  116. swap = time.Now()
  117. }
  118. }
  119. it.Release()
  120. // Mark the bloom filter inited and return
  121. log.Info("Initialized fast sync bloom", "items", b.bloom.N(), "errorrate", b.errorRate(), "elapsed", common.PrettyDuration(time.Since(start)))
  122. atomic.StoreUint32(&b.inited, 1)
  123. }
  124. // meter periodically recalculates the false positive error rate of the bloom
  125. // filter and reports it in a metric.
  126. func (b *SyncBloom) meter() {
  127. for {
  128. // Report the current error ration. No floats, lame, scale it up.
  129. bloomErrorGauge.Update(int64(b.errorRate() * 100000))
  130. // Wait one second, but check termination more frequently
  131. for i := 0; i < 10; i++ {
  132. if atomic.LoadUint32(&b.closed) == 1 {
  133. return
  134. }
  135. time.Sleep(100 * time.Millisecond)
  136. }
  137. }
  138. }
  139. // Close terminates any background initializer still running and releases all the
  140. // memory allocated for the bloom.
  141. func (b *SyncBloom) Close() error {
  142. b.closer.Do(func() {
  143. // Ensure the initializer is stopped
  144. atomic.StoreUint32(&b.closed, 1)
  145. b.pend.Wait()
  146. // Wipe the bloom, but mark it "uninited" just in case someone attempts an access
  147. log.Info("Deallocated fast sync bloom", "items", b.bloom.N(), "errorrate", b.errorRate())
  148. atomic.StoreUint32(&b.inited, 0)
  149. b.bloom = nil
  150. })
  151. return nil
  152. }
  153. // Add inserts a new trie node hash into the bloom filter.
  154. func (b *SyncBloom) Add(hash []byte) {
  155. if atomic.LoadUint32(&b.closed) == 1 {
  156. return
  157. }
  158. b.bloom.Add(syncBloomHasher(hash))
  159. bloomAddMeter.Mark(1)
  160. }
  161. // Contains tests if the bloom filter contains the given hash:
  162. // - false: the bloom definitely does not contain hash
  163. // - true: the bloom maybe contains hash
  164. //
  165. // While the bloom is being initialized, any query will return true.
  166. func (b *SyncBloom) Contains(hash []byte) bool {
  167. bloomTestMeter.Mark(1)
  168. if atomic.LoadUint32(&b.inited) == 0 {
  169. // We didn't load all the trie nodes from the previous run of Geth yet. As
  170. // such, we can't say for sure if a hash is not present for anything. Until
  171. // the init is done, we're faking "possible presence" for everything.
  172. return true
  173. }
  174. // Bloom initialized, check the real one and report any successful misses
  175. maybe := b.bloom.Contains(syncBloomHasher(hash))
  176. if !maybe {
  177. bloomMissMeter.Mark(1)
  178. }
  179. return maybe
  180. }
  181. // errorRate calculates the probability of a random containment test returning a
  182. // false positive.
  183. //
  184. // We're calculating it ourselves because the bloom library we used missed a
  185. // parentheses in the formula and calculates it wrong. And it's discontinued...
  186. func (b *SyncBloom) errorRate() float64 {
  187. k := float64(b.bloom.K())
  188. n := float64(b.bloom.N())
  189. m := float64(b.bloom.M())
  190. return math.Pow(1.0-math.Exp((-k)*(n+0.5)/(m-1)), k)
  191. }