trie_prefetcher.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package state
  17. import (
  18. "github.com/ethereum/go-ethereum/common"
  19. "github.com/ethereum/go-ethereum/log"
  20. "github.com/ethereum/go-ethereum/metrics"
  21. )
  22. var (
  23. // trieDeliveryMeter counts how many times the prefetcher was unable to supply
  24. // the statedb with a prefilled trie. This meter should be zero -- if it's not, that
  25. // needs to be investigated
  26. trieDeliveryMissMeter = metrics.NewRegisteredMeter("trie/prefetch/deliverymiss", nil)
  27. triePrefetchFetchMeter = metrics.NewRegisteredMeter("trie/prefetch/fetch", nil)
  28. triePrefetchSkipMeter = metrics.NewRegisteredMeter("trie/prefetch/skip", nil)
  29. triePrefetchDropMeter = metrics.NewRegisteredMeter("trie/prefetch/drop", nil)
  30. )
  31. // TriePrefetcher is an active prefetcher, which receives accounts or storage
  32. // items on two channels, and does trie-loading of the items.
  33. // The goal is to get as much useful content into the caches as possible
  34. type TriePrefetcher struct {
  35. requestCh chan (fetchRequest) // Chan to receive requests for data to fetch
  36. cmdCh chan (*cmd) // Chan to control activity, pause/new root
  37. quitCh chan (struct{})
  38. deliveryCh chan (struct{})
  39. db Database
  40. paused bool
  41. storageTries map[common.Hash]Trie
  42. accountTrie Trie
  43. accountTrieRoot common.Hash
  44. }
  45. func NewTriePrefetcher(db Database) *TriePrefetcher {
  46. return &TriePrefetcher{
  47. requestCh: make(chan fetchRequest, 200),
  48. cmdCh: make(chan *cmd),
  49. quitCh: make(chan struct{}),
  50. deliveryCh: make(chan struct{}),
  51. db: db,
  52. }
  53. }
  54. type cmd struct {
  55. root common.Hash
  56. }
  57. type fetchRequest struct {
  58. slots []common.Hash
  59. storageRoot *common.Hash
  60. addresses []common.Address
  61. }
  62. func (p *TriePrefetcher) Loop() {
  63. var (
  64. accountTrieRoot common.Hash
  65. accountTrie Trie
  66. storageTries map[common.Hash]Trie
  67. err error
  68. // Some tracking of performance
  69. skipped int64
  70. fetched int64
  71. paused = true
  72. )
  73. // The prefetcher loop has two distinct phases:
  74. // 1: Paused: when in this state, the accumulated tries are accessible to outside
  75. // callers.
  76. // 2: Active prefetching, awaiting slots and accounts to prefetch
  77. for {
  78. select {
  79. case <-p.quitCh:
  80. return
  81. case cmd := <-p.cmdCh:
  82. // Clear out any old requests
  83. drain:
  84. for {
  85. select {
  86. case req := <-p.requestCh:
  87. if req.slots != nil {
  88. skipped += int64(len(req.slots))
  89. } else {
  90. skipped += int64(len(req.addresses))
  91. }
  92. default:
  93. break drain
  94. }
  95. }
  96. if paused {
  97. // Clear old data
  98. p.storageTries = nil
  99. p.accountTrie = nil
  100. p.accountTrieRoot = common.Hash{}
  101. // Resume again
  102. storageTries = make(map[common.Hash]Trie)
  103. accountTrieRoot = cmd.root
  104. accountTrie, err = p.db.OpenTrie(accountTrieRoot)
  105. if err != nil {
  106. log.Error("Trie prefetcher failed opening trie", "root", accountTrieRoot, "err", err)
  107. }
  108. if accountTrieRoot == (common.Hash{}) {
  109. log.Error("Trie prefetcher unpaused with bad root")
  110. }
  111. paused = false
  112. } else {
  113. // Update metrics at new block events
  114. triePrefetchFetchMeter.Mark(fetched)
  115. triePrefetchSkipMeter.Mark(skipped)
  116. fetched, skipped = 0, 0
  117. // Make the tries accessible
  118. p.accountTrie = accountTrie
  119. p.storageTries = storageTries
  120. p.accountTrieRoot = accountTrieRoot
  121. if cmd.root != (common.Hash{}) {
  122. log.Error("Trie prefetcher paused with non-empty root")
  123. }
  124. paused = true
  125. }
  126. p.deliveryCh <- struct{}{}
  127. case req := <-p.requestCh:
  128. if paused {
  129. continue
  130. }
  131. if sRoot := req.storageRoot; sRoot != nil {
  132. // Storage slots to fetch
  133. var (
  134. storageTrie Trie
  135. err error
  136. )
  137. if storageTrie = storageTries[*sRoot]; storageTrie == nil {
  138. if storageTrie, err = p.db.OpenTrie(*sRoot); err != nil {
  139. log.Warn("trie prefetcher failed opening storage trie", "root", *sRoot, "err", err)
  140. skipped += int64(len(req.slots))
  141. continue
  142. }
  143. storageTries[*sRoot] = storageTrie
  144. }
  145. for _, key := range req.slots {
  146. storageTrie.TryGet(key[:])
  147. }
  148. fetched += int64(len(req.slots))
  149. } else { // an account
  150. for _, addr := range req.addresses {
  151. accountTrie.TryGet(addr[:])
  152. }
  153. fetched += int64(len(req.addresses))
  154. }
  155. }
  156. }
  157. }
  158. // Close stops the prefetcher
  159. func (p *TriePrefetcher) Close() {
  160. if p.quitCh != nil {
  161. close(p.quitCh)
  162. p.quitCh = nil
  163. }
  164. }
  165. // Resume causes the prefetcher to clear out old data, and get ready to
  166. // fetch data concerning the new root
  167. func (p *TriePrefetcher) Resume(root common.Hash) {
  168. p.paused = false
  169. p.cmdCh <- &cmd{
  170. root: root,
  171. }
  172. // Wait for it
  173. <-p.deliveryCh
  174. }
  175. // Pause causes the prefetcher to pause prefetching, and make tries
  176. // accessible to callers via GetTrie
  177. func (p *TriePrefetcher) Pause() {
  178. if p.paused {
  179. return
  180. }
  181. p.paused = true
  182. p.cmdCh <- &cmd{
  183. root: common.Hash{},
  184. }
  185. // Wait for it
  186. <-p.deliveryCh
  187. }
  188. // PrefetchAddresses adds an address for prefetching
  189. func (p *TriePrefetcher) PrefetchAddresses(addresses []common.Address) {
  190. cmd := fetchRequest{
  191. addresses: addresses,
  192. }
  193. // We do an async send here, to not cause the caller to block
  194. //p.requestCh <- cmd
  195. select {
  196. case p.requestCh <- cmd:
  197. default:
  198. triePrefetchDropMeter.Mark(int64(len(addresses)))
  199. }
  200. }
  201. // PrefetchStorage adds a storage root and a set of keys for prefetching
  202. func (p *TriePrefetcher) PrefetchStorage(root common.Hash, slots []common.Hash) {
  203. cmd := fetchRequest{
  204. storageRoot: &root,
  205. slots: slots,
  206. }
  207. // We do an async send here, to not cause the caller to block
  208. //p.requestCh <- cmd
  209. select {
  210. case p.requestCh <- cmd:
  211. default:
  212. triePrefetchDropMeter.Mark(int64(len(slots)))
  213. }
  214. }
  215. // GetTrie returns the trie matching the root hash, or nil if the prefetcher
  216. // doesn't have it.
  217. func (p *TriePrefetcher) GetTrie(root common.Hash) Trie {
  218. if root == p.accountTrieRoot {
  219. return p.accountTrie
  220. }
  221. if storageTrie, ok := p.storageTries[root]; ok {
  222. // Two accounts may well have the same storage root, but we cannot allow
  223. // them both to make updates to the same trie instance. Therefore,
  224. // we need to either delete the trie now, or deliver a copy of the trie.
  225. delete(p.storageTries, root)
  226. return storageTrie
  227. }
  228. trieDeliveryMissMeter.Mark(1)
  229. return nil
  230. }