trie_prefetcher.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package state
  17. import (
  18. "sync"
  19. "github.com/ethereum/go-ethereum/common"
  20. "github.com/ethereum/go-ethereum/log"
  21. "github.com/ethereum/go-ethereum/metrics"
  22. )
  // triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
  var triePrefetchMetricsPrefix = "trie/prefetch/"
  27. // triePrefetcher is an active prefetcher, which receives accounts or storage
  28. // items and does trie-loading of them. The goal is to get as much useful content
  29. // into the caches as possible.
  30. //
  31. // Note, the prefetcher's API is not thread safe.
  32. type triePrefetcher struct {
  33. db Database // Database to fetch trie nodes through
  34. root common.Hash // Root hash of theaccount trie for metrics
  35. fetches map[common.Hash]Trie // Partially or fully fetcher tries
  36. fetchers map[common.Hash]*subfetcher // Subfetchers for each trie
  37. deliveryMissMeter metrics.Meter
  38. accountLoadMeter metrics.Meter
  39. accountDupMeter metrics.Meter
  40. accountSkipMeter metrics.Meter
  41. accountWasteMeter metrics.Meter
  42. storageLoadMeter metrics.Meter
  43. storageDupMeter metrics.Meter
  44. storageSkipMeter metrics.Meter
  45. storageWasteMeter metrics.Meter
  46. }
  47. // newTriePrefetcher
  48. func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
  49. prefix := triePrefetchMetricsPrefix + namespace
  50. p := &triePrefetcher{
  51. db: db,
  52. root: root,
  53. fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
  54. deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
  55. accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
  56. accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
  57. accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil),
  58. accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
  59. storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
  60. storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
  61. storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
  62. storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
  63. }
  64. return p
  65. }
  66. // close iterates over all the subfetchers, aborts any that were left spinning
  67. // and reports the stats to the metrics subsystem.
  68. func (p *triePrefetcher) close() {
  69. for _, fetcher := range p.fetchers {
  70. fetcher.abort() // safe to do multiple times
  71. if metrics.Enabled {
  72. if fetcher.root == p.root {
  73. p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
  74. p.accountDupMeter.Mark(int64(fetcher.dups))
  75. p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
  76. for _, key := range fetcher.used {
  77. delete(fetcher.seen, string(key))
  78. }
  79. p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
  80. } else {
  81. p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
  82. p.storageDupMeter.Mark(int64(fetcher.dups))
  83. p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))
  84. for _, key := range fetcher.used {
  85. delete(fetcher.seen, string(key))
  86. }
  87. p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
  88. }
  89. }
  90. }
  91. // Clear out all fetchers (will crash on a second call, deliberate)
  92. p.fetchers = nil
  93. }
  94. // copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
  95. // already loaded will be copied over, but no goroutines will be started. This
  96. // is mostly used in the miner which creates a copy of it's actively mutated
  97. // state to be sealed while it may further mutate the state.
  98. func (p *triePrefetcher) copy() *triePrefetcher {
  99. copy := &triePrefetcher{
  100. db: p.db,
  101. root: p.root,
  102. fetches: make(map[common.Hash]Trie), // Active prefetchers use the fetches map
  103. deliveryMissMeter: p.deliveryMissMeter,
  104. accountLoadMeter: p.accountLoadMeter,
  105. accountDupMeter: p.accountDupMeter,
  106. accountSkipMeter: p.accountSkipMeter,
  107. accountWasteMeter: p.accountWasteMeter,
  108. storageLoadMeter: p.storageLoadMeter,
  109. storageDupMeter: p.storageDupMeter,
  110. storageSkipMeter: p.storageSkipMeter,
  111. storageWasteMeter: p.storageWasteMeter,
  112. }
  113. // If the prefetcher is already a copy, duplicate the data
  114. if p.fetches != nil {
  115. for root, fetch := range p.fetches {
  116. copy.fetches[root] = p.db.CopyTrie(fetch)
  117. }
  118. return copy
  119. }
  120. // Otherwise we're copying an active fetcher, retrieve the current states
  121. for root, fetcher := range p.fetchers {
  122. copy.fetches[root] = fetcher.peek()
  123. }
  124. return copy
  125. }
  126. // prefetch schedules a batch of trie items to prefetch.
  127. func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte) {
  128. // If the prefetcher is an inactive one, bail out
  129. if p.fetches != nil {
  130. return
  131. }
  132. // Active fetcher, schedule the retrievals
  133. fetcher := p.fetchers[root]
  134. if fetcher == nil {
  135. fetcher = newSubfetcher(p.db, root)
  136. p.fetchers[root] = fetcher
  137. }
  138. fetcher.schedule(keys)
  139. }
  140. // trie returns the trie matching the root hash, or nil if the prefetcher doesn't
  141. // have it.
  142. func (p *triePrefetcher) trie(root common.Hash) Trie {
  143. // If the prefetcher is inactive, return from existing deep copies
  144. if p.fetches != nil {
  145. trie := p.fetches[root]
  146. if trie == nil {
  147. p.deliveryMissMeter.Mark(1)
  148. return nil
  149. }
  150. return p.db.CopyTrie(trie)
  151. }
  152. // Otherwise the prefetcher is active, bail if no trie was prefetched for this root
  153. fetcher := p.fetchers[root]
  154. if fetcher == nil {
  155. p.deliveryMissMeter.Mark(1)
  156. return nil
  157. }
  158. // Interrupt the prefetcher if it's by any chance still running and return
  159. // a copy of any pre-loaded trie.
  160. fetcher.abort() // safe to do multiple times
  161. trie := fetcher.peek()
  162. if trie == nil {
  163. p.deliveryMissMeter.Mark(1)
  164. return nil
  165. }
  166. return trie
  167. }
  168. // used marks a batch of state items used to allow creating statistics as to
  169. // how useful or wasteful the prefetcher is.
  170. func (p *triePrefetcher) used(root common.Hash, used [][]byte) {
  171. if fetcher := p.fetchers[root]; fetcher != nil {
  172. fetcher.used = used
  173. }
  174. }
  175. // subfetcher is a trie fetcher goroutine responsible for pulling entries for a
  176. // single trie. It is spawned when a new root is encountered and lives until the
  177. // main prefetcher is paused and either all requested items are processed or if
  178. // the trie being worked on is retrieved from the prefetcher.
  179. type subfetcher struct {
  180. db Database // Database to load trie nodes through
  181. root common.Hash // Root hash of the trie to prefetch
  182. trie Trie // Trie being populated with nodes
  183. tasks [][]byte // Items queued up for retrieval
  184. lock sync.Mutex // Lock protecting the task queue
  185. wake chan struct{} // Wake channel if a new task is scheduled
  186. stop chan struct{} // Channel to interrupt processing
  187. term chan struct{} // Channel to signal iterruption
  188. copy chan chan Trie // Channel to request a copy of the current trie
  189. seen map[string]struct{} // Tracks the entries already loaded
  190. dups int // Number of duplicate preload tasks
  191. used [][]byte // Tracks the entries used in the end
  192. }
  193. // newSubfetcher creates a goroutine to prefetch state items belonging to a
  194. // particular root hash.
  195. func newSubfetcher(db Database, root common.Hash) *subfetcher {
  196. sf := &subfetcher{
  197. db: db,
  198. root: root,
  199. wake: make(chan struct{}, 1),
  200. stop: make(chan struct{}),
  201. term: make(chan struct{}),
  202. copy: make(chan chan Trie),
  203. seen: make(map[string]struct{}),
  204. }
  205. go sf.loop()
  206. return sf
  207. }
  208. // schedule adds a batch of trie keys to the queue to prefetch.
  209. func (sf *subfetcher) schedule(keys [][]byte) {
  210. // Append the tasks to the current queue
  211. sf.lock.Lock()
  212. sf.tasks = append(sf.tasks, keys...)
  213. sf.lock.Unlock()
  214. // Notify the prefetcher, it's fine if it's already terminated
  215. select {
  216. case sf.wake <- struct{}{}:
  217. default:
  218. }
  219. }
  220. // peek tries to retrieve a deep copy of the fetcher's trie in whatever form it
  221. // is currently.
  222. func (sf *subfetcher) peek() Trie {
  223. ch := make(chan Trie)
  224. select {
  225. case sf.copy <- ch:
  226. // Subfetcher still alive, return copy from it
  227. return <-ch
  228. case <-sf.term:
  229. // Subfetcher already terminated, return a copy directly
  230. if sf.trie == nil {
  231. return nil
  232. }
  233. return sf.db.CopyTrie(sf.trie)
  234. }
  235. }
  236. // abort interrupts the subfetcher immediately. It is safe to call abort multiple
  237. // times but it is not thread safe.
  238. func (sf *subfetcher) abort() {
  239. select {
  240. case <-sf.stop:
  241. default:
  242. close(sf.stop)
  243. }
  244. <-sf.term
  245. }
  246. // loop waits for new tasks to be scheduled and keeps loading them until it runs
  247. // out of tasks or its underlying trie is retrieved for committing.
  248. func (sf *subfetcher) loop() {
  249. // No matter how the loop stops, signal anyone waiting that it's terminated
  250. defer close(sf.term)
  251. // Start by opening the trie and stop processing if it fails
  252. trie, err := sf.db.OpenTrie(sf.root)
  253. if err != nil {
  254. log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
  255. return
  256. }
  257. sf.trie = trie
  258. // Trie opened successfully, keep prefetching items
  259. for {
  260. select {
  261. case <-sf.wake:
  262. // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock
  263. sf.lock.Lock()
  264. tasks := sf.tasks
  265. sf.tasks = nil
  266. sf.lock.Unlock()
  267. // Prefetch any tasks until the loop is interrupted
  268. for i, task := range tasks {
  269. select {
  270. case <-sf.stop:
  271. // If termination is requested, add any leftover back and return
  272. sf.lock.Lock()
  273. sf.tasks = append(sf.tasks, tasks[i:]...)
  274. sf.lock.Unlock()
  275. return
  276. case ch := <-sf.copy:
  277. // Somebody wants a copy of the current trie, grant them
  278. ch <- sf.db.CopyTrie(sf.trie)
  279. default:
  280. // No termination request yet, prefetch the next entry
  281. taskid := string(task)
  282. if _, ok := sf.seen[taskid]; ok {
  283. sf.dups++
  284. } else {
  285. sf.trie.TryGet(task)
  286. sf.seen[taskid] = struct{}{}
  287. }
  288. }
  289. }
  290. case ch := <-sf.copy:
  291. // Somebody wants a copy of the current trie, grant them
  292. ch <- sf.db.CopyTrie(sf.trie)
  293. case <-sf.stop:
  294. // Termination is requested, abort and leave remaining tasks
  295. return
  296. }
  297. }
  298. }