trie_prefetcher.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package state
  17. import (
  18. "sync"
  19. "github.com/ethereum/go-ethereum/common"
  20. "github.com/ethereum/go-ethereum/log"
  21. "github.com/ethereum/go-ethereum/metrics"
  22. )
  23. var (
  24. // triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
  25. triePrefetchMetricsPrefix = "trie/prefetch/"
  26. )
  27. // triePrefetcher is an active prefetcher, which receives accounts or storage
  28. // items and does trie-loading of them. The goal is to get as much useful content
  29. // into the caches as possible.
  30. //
  31. // Note, the prefetcher's API is not thread safe.
  32. type triePrefetcher struct {
  33. db Database // Database to fetch trie nodes through
  34. root common.Hash // Root hash of the account trie for metrics
  35. fetches map[string]Trie // Partially or fully fetcher tries
  36. fetchers map[string]*subfetcher // Subfetchers for each trie
  37. deliveryMissMeter metrics.Meter
  38. accountLoadMeter metrics.Meter
  39. accountDupMeter metrics.Meter
  40. accountSkipMeter metrics.Meter
  41. accountWasteMeter metrics.Meter
  42. storageLoadMeter metrics.Meter
  43. storageDupMeter metrics.Meter
  44. storageSkipMeter metrics.Meter
  45. storageWasteMeter metrics.Meter
  46. }
  47. func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
  48. prefix := triePrefetchMetricsPrefix + namespace
  49. p := &triePrefetcher{
  50. db: db,
  51. root: root,
  52. fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map
  53. deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
  54. accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
  55. accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
  56. accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil),
  57. accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
  58. storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
  59. storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
  60. storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
  61. storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
  62. }
  63. return p
  64. }
  65. // close iterates over all the subfetchers, aborts any that were left spinning
  66. // and reports the stats to the metrics subsystem.
  67. func (p *triePrefetcher) close() {
  68. for _, fetcher := range p.fetchers {
  69. fetcher.abort() // safe to do multiple times
  70. if metrics.Enabled {
  71. if fetcher.root == p.root {
  72. p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
  73. p.accountDupMeter.Mark(int64(fetcher.dups))
  74. p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
  75. for _, key := range fetcher.used {
  76. delete(fetcher.seen, string(key))
  77. }
  78. p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
  79. } else {
  80. p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
  81. p.storageDupMeter.Mark(int64(fetcher.dups))
  82. p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))
  83. for _, key := range fetcher.used {
  84. delete(fetcher.seen, string(key))
  85. }
  86. p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
  87. }
  88. }
  89. }
  90. // Clear out all fetchers (will crash on a second call, deliberate)
  91. p.fetchers = nil
  92. }
  93. // copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
  94. // already loaded will be copied over, but no goroutines will be started. This
  95. // is mostly used in the miner which creates a copy of it's actively mutated
  96. // state to be sealed while it may further mutate the state.
  97. func (p *triePrefetcher) copy() *triePrefetcher {
  98. copy := &triePrefetcher{
  99. db: p.db,
  100. root: p.root,
  101. fetches: make(map[string]Trie), // Active prefetchers use the fetches map
  102. deliveryMissMeter: p.deliveryMissMeter,
  103. accountLoadMeter: p.accountLoadMeter,
  104. accountDupMeter: p.accountDupMeter,
  105. accountSkipMeter: p.accountSkipMeter,
  106. accountWasteMeter: p.accountWasteMeter,
  107. storageLoadMeter: p.storageLoadMeter,
  108. storageDupMeter: p.storageDupMeter,
  109. storageSkipMeter: p.storageSkipMeter,
  110. storageWasteMeter: p.storageWasteMeter,
  111. }
  112. // If the prefetcher is already a copy, duplicate the data
  113. if p.fetches != nil {
  114. for root, fetch := range p.fetches {
  115. copy.fetches[root] = p.db.CopyTrie(fetch)
  116. }
  117. return copy
  118. }
  119. // Otherwise we're copying an active fetcher, retrieve the current states
  120. for id, fetcher := range p.fetchers {
  121. copy.fetches[id] = fetcher.peek()
  122. }
  123. return copy
  124. }
  125. // prefetch schedules a batch of trie items to prefetch.
  126. func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, keys [][]byte) {
  127. // If the prefetcher is an inactive one, bail out
  128. if p.fetches != nil {
  129. return
  130. }
  131. // Active fetcher, schedule the retrievals
  132. id := p.trieID(owner, root)
  133. fetcher := p.fetchers[id]
  134. if fetcher == nil {
  135. fetcher = newSubfetcher(p.db, owner, root)
  136. p.fetchers[id] = fetcher
  137. }
  138. fetcher.schedule(keys)
  139. }
  140. // trie returns the trie matching the root hash, or nil if the prefetcher doesn't
  141. // have it.
  142. func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
  143. // If the prefetcher is inactive, return from existing deep copies
  144. id := p.trieID(owner, root)
  145. if p.fetches != nil {
  146. trie := p.fetches[id]
  147. if trie == nil {
  148. p.deliveryMissMeter.Mark(1)
  149. return nil
  150. }
  151. return p.db.CopyTrie(trie)
  152. }
  153. // Otherwise the prefetcher is active, bail if no trie was prefetched for this root
  154. fetcher := p.fetchers[id]
  155. if fetcher == nil {
  156. p.deliveryMissMeter.Mark(1)
  157. return nil
  158. }
  159. // Interrupt the prefetcher if it's by any chance still running and return
  160. // a copy of any pre-loaded trie.
  161. fetcher.abort() // safe to do multiple times
  162. trie := fetcher.peek()
  163. if trie == nil {
  164. p.deliveryMissMeter.Mark(1)
  165. return nil
  166. }
  167. return trie
  168. }
  169. // used marks a batch of state items used to allow creating statistics as to
  170. // how useful or wasteful the prefetcher is.
  171. func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) {
  172. if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil {
  173. fetcher.used = used
  174. }
  175. }
  176. // trieID returns an unique trie identifier consists the trie owner and root hash.
  177. func (p *triePrefetcher) trieID(owner common.Hash, root common.Hash) string {
  178. return string(append(owner.Bytes(), root.Bytes()...))
  179. }
  180. // subfetcher is a trie fetcher goroutine responsible for pulling entries for a
  181. // single trie. It is spawned when a new root is encountered and lives until the
  182. // main prefetcher is paused and either all requested items are processed or if
  183. // the trie being worked on is retrieved from the prefetcher.
  184. type subfetcher struct {
  185. db Database // Database to load trie nodes through
  186. owner common.Hash // Owner of the trie, usually account hash
  187. root common.Hash // Root hash of the trie to prefetch
  188. trie Trie // Trie being populated with nodes
  189. tasks [][]byte // Items queued up for retrieval
  190. lock sync.Mutex // Lock protecting the task queue
  191. wake chan struct{} // Wake channel if a new task is scheduled
  192. stop chan struct{} // Channel to interrupt processing
  193. term chan struct{} // Channel to signal interruption
  194. copy chan chan Trie // Channel to request a copy of the current trie
  195. seen map[string]struct{} // Tracks the entries already loaded
  196. dups int // Number of duplicate preload tasks
  197. used [][]byte // Tracks the entries used in the end
  198. }
  199. // newSubfetcher creates a goroutine to prefetch state items belonging to a
  200. // particular root hash.
  201. func newSubfetcher(db Database, owner common.Hash, root common.Hash) *subfetcher {
  202. sf := &subfetcher{
  203. db: db,
  204. owner: owner,
  205. root: root,
  206. wake: make(chan struct{}, 1),
  207. stop: make(chan struct{}),
  208. term: make(chan struct{}),
  209. copy: make(chan chan Trie),
  210. seen: make(map[string]struct{}),
  211. }
  212. go sf.loop()
  213. return sf
  214. }
  215. // schedule adds a batch of trie keys to the queue to prefetch.
  216. func (sf *subfetcher) schedule(keys [][]byte) {
  217. // Append the tasks to the current queue
  218. sf.lock.Lock()
  219. sf.tasks = append(sf.tasks, keys...)
  220. sf.lock.Unlock()
  221. // Notify the prefetcher, it's fine if it's already terminated
  222. select {
  223. case sf.wake <- struct{}{}:
  224. default:
  225. }
  226. }
  227. // peek tries to retrieve a deep copy of the fetcher's trie in whatever form it
  228. // is currently.
  229. func (sf *subfetcher) peek() Trie {
  230. ch := make(chan Trie)
  231. select {
  232. case sf.copy <- ch:
  233. // Subfetcher still alive, return copy from it
  234. return <-ch
  235. case <-sf.term:
  236. // Subfetcher already terminated, return a copy directly
  237. if sf.trie == nil {
  238. return nil
  239. }
  240. return sf.db.CopyTrie(sf.trie)
  241. }
  242. }
  243. // abort interrupts the subfetcher immediately. It is safe to call abort multiple
  244. // times but it is not thread safe.
  245. func (sf *subfetcher) abort() {
  246. select {
  247. case <-sf.stop:
  248. default:
  249. close(sf.stop)
  250. }
  251. <-sf.term
  252. }
  253. // loop waits for new tasks to be scheduled and keeps loading them until it runs
  254. // out of tasks or its underlying trie is retrieved for committing.
  255. func (sf *subfetcher) loop() {
  256. // No matter how the loop stops, signal anyone waiting that it's terminated
  257. defer close(sf.term)
  258. // Start by opening the trie and stop processing if it fails
  259. if sf.owner == (common.Hash{}) {
  260. trie, err := sf.db.OpenTrie(sf.root)
  261. if err != nil {
  262. log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
  263. return
  264. }
  265. sf.trie = trie
  266. } else {
  267. trie, err := sf.db.OpenStorageTrie(sf.owner, sf.root)
  268. if err != nil {
  269. log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
  270. return
  271. }
  272. sf.trie = trie
  273. }
  274. // Trie opened successfully, keep prefetching items
  275. for {
  276. select {
  277. case <-sf.wake:
  278. // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock
  279. sf.lock.Lock()
  280. tasks := sf.tasks
  281. sf.tasks = nil
  282. sf.lock.Unlock()
  283. // Prefetch any tasks until the loop is interrupted
  284. for i, task := range tasks {
  285. select {
  286. case <-sf.stop:
  287. // If termination is requested, add any leftover back and return
  288. sf.lock.Lock()
  289. sf.tasks = append(sf.tasks, tasks[i:]...)
  290. sf.lock.Unlock()
  291. return
  292. case ch := <-sf.copy:
  293. // Somebody wants a copy of the current trie, grant them
  294. ch <- sf.db.CopyTrie(sf.trie)
  295. default:
  296. // No termination request yet, prefetch the next entry
  297. if _, ok := sf.seen[string(task)]; ok {
  298. sf.dups++
  299. } else {
  300. if len(task) == len(common.Address{}) {
  301. sf.trie.TryGetAccount(task)
  302. } else {
  303. sf.trie.TryGet(task)
  304. }
  305. sf.seen[string(task)] = struct{}{}
  306. }
  307. }
  308. }
  309. case ch := <-sf.copy:
  310. // Somebody wants a copy of the current trie, grant them
  311. ch <- sf.db.CopyTrie(sf.trie)
  312. case <-sf.stop:
  313. // Termination is requested, abort and leave remaining tasks
  314. return
  315. }
  316. }
  317. }