trie_prefetcher.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package state
  17. import (
  18. "sync"
  19. "github.com/ethereum/go-ethereum/common"
  20. "github.com/ethereum/go-ethereum/common/gopool"
  21. "github.com/ethereum/go-ethereum/log"
  22. "github.com/ethereum/go-ethereum/metrics"
  23. )
  var (
	// triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
	// newTriePrefetcher appends its namespace to it, followed by a "/<metric>"
	// suffix per meter.
	triePrefetchMetricsPrefix = "trie/prefetch/"
)
  28. // triePrefetcher is an active prefetcher, which receives accounts or storage
  29. // items and does trie-loading of them. The goal is to get as much useful content
  30. // into the caches as possible.
  31. //
  32. // Note, the prefetcher's API is not thread safe.
  33. type triePrefetcher struct {
  34. db Database // Database to fetch trie nodes through
  35. root common.Hash // Root hash of theaccount trie for metrics
  36. fetches map[common.Hash]Trie // Partially or fully fetcher tries
  37. fetchers map[common.Hash]*subfetcher // Subfetchers for each trie
  38. deliveryMissMeter metrics.Meter
  39. accountLoadMeter metrics.Meter
  40. accountDupMeter metrics.Meter
  41. accountSkipMeter metrics.Meter
  42. accountWasteMeter metrics.Meter
  43. storageLoadMeter metrics.Meter
  44. storageDupMeter metrics.Meter
  45. storageSkipMeter metrics.Meter
  46. storageWasteMeter metrics.Meter
  47. }
  48. // newTriePrefetcher
  49. func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
  50. prefix := triePrefetchMetricsPrefix + namespace
  51. p := &triePrefetcher{
  52. db: db,
  53. root: root,
  54. fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
  55. deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
  56. accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
  57. accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
  58. accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil),
  59. accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
  60. storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
  61. storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
  62. storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
  63. storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
  64. }
  65. return p
  66. }
  67. // close iterates over all the subfetchers, aborts any that were left spinning
  68. // and reports the stats to the metrics subsystem.
  69. func (p *triePrefetcher) close() {
  70. for _, fetcher := range p.fetchers {
  71. fetcher.abort() // safe to do multiple times
  72. if metrics.Enabled {
  73. if fetcher.root == p.root {
  74. p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
  75. p.accountDupMeter.Mark(int64(fetcher.dups))
  76. p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
  77. for _, key := range fetcher.used {
  78. delete(fetcher.seen, string(key))
  79. }
  80. p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
  81. } else {
  82. p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
  83. p.storageDupMeter.Mark(int64(fetcher.dups))
  84. p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))
  85. for _, key := range fetcher.used {
  86. delete(fetcher.seen, string(key))
  87. }
  88. p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
  89. }
  90. }
  91. }
  92. // Clear out all fetchers (will crash on a second call, deliberate)
  93. p.fetchers = nil
  94. }
  95. // copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
  96. // already loaded will be copied over, but no goroutines will be started. This
  97. // is mostly used in the miner which creates a copy of it's actively mutated
  98. // state to be sealed while it may further mutate the state.
  99. func (p *triePrefetcher) copy() *triePrefetcher {
  100. copy := &triePrefetcher{
  101. db: p.db,
  102. root: p.root,
  103. fetches: make(map[common.Hash]Trie), // Active prefetchers use the fetches map
  104. deliveryMissMeter: p.deliveryMissMeter,
  105. accountLoadMeter: p.accountLoadMeter,
  106. accountDupMeter: p.accountDupMeter,
  107. accountSkipMeter: p.accountSkipMeter,
  108. accountWasteMeter: p.accountWasteMeter,
  109. storageLoadMeter: p.storageLoadMeter,
  110. storageDupMeter: p.storageDupMeter,
  111. storageSkipMeter: p.storageSkipMeter,
  112. storageWasteMeter: p.storageWasteMeter,
  113. }
  114. // If the prefetcher is already a copy, duplicate the data
  115. if p.fetches != nil {
  116. for root, fetch := range p.fetches {
  117. copy.fetches[root] = p.db.CopyTrie(fetch)
  118. }
  119. return copy
  120. }
  121. // Otherwise we're copying an active fetcher, retrieve the current states
  122. for root, fetcher := range p.fetchers {
  123. copy.fetches[root] = fetcher.peek()
  124. }
  125. return copy
  126. }
  127. // prefetch schedules a batch of trie items to prefetch.
  128. func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash common.Hash) {
  129. // If the prefetcher is an inactive one, bail out
  130. if p.fetches != nil {
  131. return
  132. }
  133. // Active fetcher, schedule the retrievals
  134. fetcher := p.fetchers[root]
  135. if fetcher == nil {
  136. fetcher = newSubfetcher(p.db, root, accountHash)
  137. p.fetchers[root] = fetcher
  138. }
  139. fetcher.schedule(keys)
  140. }
  141. // trie returns the trie matching the root hash, or nil if the prefetcher doesn't
  142. // have it.
  143. func (p *triePrefetcher) trie(root common.Hash) Trie {
  144. // If the prefetcher is inactive, return from existing deep copies
  145. if p.fetches != nil {
  146. trie := p.fetches[root]
  147. if trie == nil {
  148. p.deliveryMissMeter.Mark(1)
  149. return nil
  150. }
  151. return p.db.CopyTrie(trie)
  152. }
  153. // Otherwise the prefetcher is active, bail if no trie was prefetched for this root
  154. fetcher := p.fetchers[root]
  155. if fetcher == nil {
  156. p.deliveryMissMeter.Mark(1)
  157. return nil
  158. }
  159. // Interrupt the prefetcher if it's by any chance still running and return
  160. // a copy of any pre-loaded trie.
  161. fetcher.abort() // safe to do multiple times
  162. trie := fetcher.peek()
  163. if trie == nil {
  164. p.deliveryMissMeter.Mark(1)
  165. return nil
  166. }
  167. return trie
  168. }
  169. // used marks a batch of state items used to allow creating statistics as to
  170. // how useful or wasteful the prefetcher is.
  171. func (p *triePrefetcher) used(root common.Hash, used [][]byte) {
  172. if fetcher := p.fetchers[root]; fetcher != nil {
  173. fetcher.used = used
  174. }
  175. }
  176. // subfetcher is a trie fetcher goroutine responsible for pulling entries for a
  177. // single trie. It is spawned when a new root is encountered and lives until the
  178. // main prefetcher is paused and either all requested items are processed or if
  179. // the trie being worked on is retrieved from the prefetcher.
  180. type subfetcher struct {
  181. db Database // Database to load trie nodes through
  182. root common.Hash // Root hash of the trie to prefetch
  183. trie Trie // Trie being populated with nodes
  184. tasks [][]byte // Items queued up for retrieval
  185. lock sync.Mutex // Lock protecting the task queue
  186. wake chan struct{} // Wake channel if a new task is scheduled
  187. stop chan struct{} // Channel to interrupt processing
  188. term chan struct{} // Channel to signal iterruption
  189. copy chan chan Trie // Channel to request a copy of the current trie
  190. seen map[string]struct{} // Tracks the entries already loaded
  191. dups int // Number of duplicate preload tasks
  192. used [][]byte // Tracks the entries used in the end
  193. accountHash common.Hash
  194. }
  195. // newSubfetcher creates a goroutine to prefetch state items belonging to a
  196. // particular root hash.
  197. func newSubfetcher(db Database, root common.Hash, accountHash common.Hash) *subfetcher {
  198. sf := &subfetcher{
  199. db: db,
  200. root: root,
  201. wake: make(chan struct{}, 1),
  202. stop: make(chan struct{}),
  203. term: make(chan struct{}),
  204. copy: make(chan chan Trie),
  205. seen: make(map[string]struct{}),
  206. accountHash: accountHash,
  207. }
  208. gopool.Submit(func() {
  209. sf.loop()
  210. })
  211. return sf
  212. }
  213. // schedule adds a batch of trie keys to the queue to prefetch.
  214. func (sf *subfetcher) schedule(keys [][]byte) {
  215. // Append the tasks to the current queue
  216. sf.lock.Lock()
  217. sf.tasks = append(sf.tasks, keys...)
  218. sf.lock.Unlock()
  219. // Notify the prefetcher, it's fine if it's already terminated
  220. select {
  221. case sf.wake <- struct{}{}:
  222. default:
  223. }
  224. }
  225. // peek tries to retrieve a deep copy of the fetcher's trie in whatever form it
  226. // is currently.
  227. func (sf *subfetcher) peek() Trie {
  228. ch := make(chan Trie)
  229. select {
  230. case sf.copy <- ch:
  231. // Subfetcher still alive, return copy from it
  232. return <-ch
  233. case <-sf.term:
  234. // Subfetcher already terminated, return a copy directly
  235. if sf.trie == nil {
  236. return nil
  237. }
  238. return sf.db.CopyTrie(sf.trie)
  239. }
  240. }
  241. // abort interrupts the subfetcher immediately. It is safe to call abort multiple
  242. // times but it is not thread safe.
  243. func (sf *subfetcher) abort() {
  244. select {
  245. case <-sf.stop:
  246. default:
  247. close(sf.stop)
  248. }
  249. <-sf.term
  250. }
  251. // loop waits for new tasks to be scheduled and keeps loading them until it runs
  252. // out of tasks or its underlying trie is retrieved for committing.
  253. func (sf *subfetcher) loop() {
  254. // No matter how the loop stops, signal anyone waiting that it's terminated
  255. defer close(sf.term)
  256. // Start by opening the trie and stop processing if it fails
  257. var trie Trie
  258. var err error
  259. if sf.accountHash == emptyAddr {
  260. trie, err = sf.db.OpenTrie(sf.root)
  261. } else {
  262. // address is useless
  263. trie, err = sf.db.OpenStorageTrie(sf.accountHash, sf.root)
  264. }
  265. if err != nil {
  266. log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
  267. return
  268. }
  269. sf.trie = trie
  270. // Trie opened successfully, keep prefetching items
  271. for {
  272. select {
  273. case <-sf.wake:
  274. // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock
  275. sf.lock.Lock()
  276. tasks := sf.tasks
  277. sf.tasks = nil
  278. sf.lock.Unlock()
  279. // Prefetch any tasks until the loop is interrupted
  280. for i, task := range tasks {
  281. select {
  282. case <-sf.stop:
  283. // If termination is requested, add any leftover back and return
  284. sf.lock.Lock()
  285. sf.tasks = append(sf.tasks, tasks[i:]...)
  286. sf.lock.Unlock()
  287. return
  288. case ch := <-sf.copy:
  289. // Somebody wants a copy of the current trie, grant them
  290. ch <- sf.db.CopyTrie(sf.trie)
  291. default:
  292. // No termination request yet, prefetch the next entry
  293. taskid := string(task)
  294. if _, ok := sf.seen[taskid]; ok {
  295. sf.dups++
  296. } else {
  297. sf.trie.TryGet(task)
  298. sf.seen[taskid] = struct{}{}
  299. }
  300. }
  301. }
  302. case ch := <-sf.copy:
  303. // Somebody wants a copy of the current trie, grant them
  304. ch <- sf.db.CopyTrie(sf.trie)
  305. case <-sf.stop:
  306. // Termination is requested, abort and leave remaining tasks
  307. return
  308. }
  309. }
  310. }