trie_prefetcher.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package state
  17. import (
  18. "sync"
  19. "github.com/ethereum/go-ethereum/common"
  20. "github.com/ethereum/go-ethereum/common/gopool"
  21. "github.com/ethereum/go-ethereum/log"
  22. "github.com/ethereum/go-ethereum/metrics"
  23. )
  24. const abortChanSize = 64
  25. var (
  26. // triePrefetchMetricsPrefix is the prefix under which to publis the metrics.
  27. triePrefetchMetricsPrefix = "trie/prefetch/"
  28. )
  29. // triePrefetcher is an active prefetcher, which receives accounts or storage
  30. // items and does trie-loading of them. The goal is to get as much useful content
  31. // into the caches as possible.
  32. //
  33. // Note, the prefetcher's API is not thread safe.
  34. type triePrefetcher struct {
  35. db Database // Database to fetch trie nodes through
  36. root common.Hash // Root hash of theaccount trie for metrics
  37. fetches map[common.Hash]Trie // Partially or fully fetcher tries
  38. fetchers map[common.Hash]*subfetcher // Subfetchers for each trie
  39. abortChan chan *subfetcher
  40. closeChan chan struct{}
  41. deliveryMissMeter metrics.Meter
  42. accountLoadMeter metrics.Meter
  43. accountDupMeter metrics.Meter
  44. accountSkipMeter metrics.Meter
  45. accountWasteMeter metrics.Meter
  46. storageLoadMeter metrics.Meter
  47. storageDupMeter metrics.Meter
  48. storageSkipMeter metrics.Meter
  49. storageWasteMeter metrics.Meter
  50. }
  51. // newTriePrefetcher
  52. func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
  53. prefix := triePrefetchMetricsPrefix + namespace
  54. p := &triePrefetcher{
  55. db: db,
  56. root: root,
  57. fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
  58. abortChan: make(chan *subfetcher, abortChanSize),
  59. closeChan: make(chan struct{}),
  60. deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
  61. accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
  62. accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
  63. accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil),
  64. accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
  65. storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
  66. storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
  67. storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
  68. storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
  69. }
  70. go p.abortLoop()
  71. return p
  72. }
  73. func (p *triePrefetcher) abortLoop() {
  74. for {
  75. select {
  76. case fetcher := <-p.abortChan:
  77. fetcher.abort()
  78. case <-p.closeChan:
  79. // drain fetcher channel
  80. for {
  81. select {
  82. case fetcher := <-p.abortChan:
  83. fetcher.abort()
  84. default:
  85. return
  86. }
  87. }
  88. }
  89. }
  90. }
  91. // close iterates over all the subfetchers, aborts any that were left spinning
  92. // and reports the stats to the metrics subsystem.
  93. func (p *triePrefetcher) close() {
  94. for _, fetcher := range p.fetchers {
  95. p.abortChan <- fetcher // safe to do multiple times
  96. <-fetcher.term
  97. if metrics.Enabled {
  98. if fetcher.root == p.root {
  99. p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
  100. p.accountDupMeter.Mark(int64(fetcher.dups))
  101. p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
  102. for _, key := range fetcher.used {
  103. delete(fetcher.seen, string(key))
  104. }
  105. p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
  106. } else {
  107. p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
  108. p.storageDupMeter.Mark(int64(fetcher.dups))
  109. p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))
  110. for _, key := range fetcher.used {
  111. delete(fetcher.seen, string(key))
  112. }
  113. p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
  114. }
  115. }
  116. }
  117. close(p.closeChan)
  118. // Clear out all fetchers (will crash on a second call, deliberate)
  119. p.fetchers = nil
  120. }
  121. // copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
  122. // already loaded will be copied over, but no goroutines will be started. This
  123. // is mostly used in the miner which creates a copy of it's actively mutated
  124. // state to be sealed while it may further mutate the state.
  125. func (p *triePrefetcher) copy() *triePrefetcher {
  126. copy := &triePrefetcher{
  127. db: p.db,
  128. root: p.root,
  129. fetches: make(map[common.Hash]Trie), // Active prefetchers use the fetches map
  130. deliveryMissMeter: p.deliveryMissMeter,
  131. accountLoadMeter: p.accountLoadMeter,
  132. accountDupMeter: p.accountDupMeter,
  133. accountSkipMeter: p.accountSkipMeter,
  134. accountWasteMeter: p.accountWasteMeter,
  135. storageLoadMeter: p.storageLoadMeter,
  136. storageDupMeter: p.storageDupMeter,
  137. storageSkipMeter: p.storageSkipMeter,
  138. storageWasteMeter: p.storageWasteMeter,
  139. }
  140. // If the prefetcher is already a copy, duplicate the data
  141. if p.fetches != nil {
  142. for root, fetch := range p.fetches {
  143. copy.fetches[root] = p.db.CopyTrie(fetch)
  144. }
  145. return copy
  146. }
  147. // Otherwise we're copying an active fetcher, retrieve the current states
  148. for root, fetcher := range p.fetchers {
  149. copy.fetches[root] = fetcher.peek()
  150. }
  151. return copy
  152. }
  153. // prefetch schedules a batch of trie items to prefetch.
  154. func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash common.Hash) {
  155. // If the prefetcher is an inactive one, bail out
  156. if p.fetches != nil {
  157. return
  158. }
  159. // Active fetcher, schedule the retrievals
  160. fetcher := p.fetchers[root]
  161. if fetcher == nil {
  162. fetcher = newSubfetcher(p.db, root, accountHash)
  163. p.fetchers[root] = fetcher
  164. }
  165. fetcher.schedule(keys)
  166. }
  167. // trie returns the trie matching the root hash, or nil if the prefetcher doesn't
  168. // have it.
  169. func (p *triePrefetcher) trie(root common.Hash) Trie {
  170. // If the prefetcher is inactive, return from existing deep copies
  171. if p.fetches != nil {
  172. trie := p.fetches[root]
  173. if trie == nil {
  174. p.deliveryMissMeter.Mark(1)
  175. return nil
  176. }
  177. return p.db.CopyTrie(trie)
  178. }
  179. // Otherwise the prefetcher is active, bail if no trie was prefetched for this root
  180. fetcher := p.fetchers[root]
  181. if fetcher == nil {
  182. p.deliveryMissMeter.Mark(1)
  183. return nil
  184. }
  185. // Interrupt the prefetcher if it's by any chance still running and return
  186. // a copy of any pre-loaded trie.
  187. p.abortChan <- fetcher // safe to do multiple times
  188. trie := fetcher.peek()
  189. if trie == nil {
  190. p.deliveryMissMeter.Mark(1)
  191. return nil
  192. }
  193. return trie
  194. }
  195. // used marks a batch of state items used to allow creating statistics as to
  196. // how useful or wasteful the prefetcher is.
  197. func (p *triePrefetcher) used(root common.Hash, used [][]byte) {
  198. if fetcher := p.fetchers[root]; fetcher != nil {
  199. fetcher.used = used
  200. }
  201. }
  202. // subfetcher is a trie fetcher goroutine responsible for pulling entries for a
  203. // single trie. It is spawned when a new root is encountered and lives until the
  204. // main prefetcher is paused and either all requested items are processed or if
  205. // the trie being worked on is retrieved from the prefetcher.
  206. type subfetcher struct {
  207. db Database // Database to load trie nodes through
  208. root common.Hash // Root hash of the trie to prefetch
  209. trie Trie // Trie being populated with nodes
  210. tasks [][]byte // Items queued up for retrieval
  211. lock sync.Mutex // Lock protecting the task queue
  212. wake chan struct{} // Wake channel if a new task is scheduled
  213. stop chan struct{} // Channel to interrupt processing
  214. term chan struct{} // Channel to signal iterruption
  215. copy chan chan Trie // Channel to request a copy of the current trie
  216. seen map[string]struct{} // Tracks the entries already loaded
  217. dups int // Number of duplicate preload tasks
  218. used [][]byte // Tracks the entries used in the end
  219. accountHash common.Hash
  220. }
  221. // newSubfetcher creates a goroutine to prefetch state items belonging to a
  222. // particular root hash.
  223. func newSubfetcher(db Database, root common.Hash, accountHash common.Hash) *subfetcher {
  224. sf := &subfetcher{
  225. db: db,
  226. root: root,
  227. wake: make(chan struct{}, 1),
  228. stop: make(chan struct{}),
  229. term: make(chan struct{}),
  230. copy: make(chan chan Trie),
  231. seen: make(map[string]struct{}),
  232. accountHash: accountHash,
  233. }
  234. gopool.Submit(func() {
  235. sf.loop()
  236. })
  237. return sf
  238. }
  239. // schedule adds a batch of trie keys to the queue to prefetch.
  240. func (sf *subfetcher) schedule(keys [][]byte) {
  241. // Append the tasks to the current queue
  242. sf.lock.Lock()
  243. sf.tasks = append(sf.tasks, keys...)
  244. sf.lock.Unlock()
  245. // Notify the prefetcher, it's fine if it's already terminated
  246. select {
  247. case sf.wake <- struct{}{}:
  248. default:
  249. }
  250. }
  251. // peek tries to retrieve a deep copy of the fetcher's trie in whatever form it
  252. // is currently.
  253. func (sf *subfetcher) peek() Trie {
  254. ch := make(chan Trie)
  255. select {
  256. case sf.copy <- ch:
  257. // Subfetcher still alive, return copy from it
  258. return <-ch
  259. case <-sf.term:
  260. // Subfetcher already terminated, return a copy directly
  261. if sf.trie == nil {
  262. return nil
  263. }
  264. return sf.db.CopyTrie(sf.trie)
  265. }
  266. }
  267. // abort interrupts the subfetcher immediately. It is safe to call abort multiple
  268. // times but it is not thread safe.
  269. func (sf *subfetcher) abort() {
  270. select {
  271. case <-sf.stop:
  272. default:
  273. close(sf.stop)
  274. }
  275. <-sf.term
  276. }
  277. // loop waits for new tasks to be scheduled and keeps loading them until it runs
  278. // out of tasks or its underlying trie is retrieved for committing.
  279. func (sf *subfetcher) loop() {
  280. // No matter how the loop stops, signal anyone waiting that it's terminated
  281. defer close(sf.term)
  282. // Start by opening the trie and stop processing if it fails
  283. var trie Trie
  284. var err error
  285. if sf.accountHash == emptyAddr {
  286. trie, err = sf.db.OpenTrie(sf.root)
  287. } else {
  288. // address is useless
  289. trie, err = sf.db.OpenStorageTrie(sf.accountHash, sf.root)
  290. }
  291. if err != nil {
  292. log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
  293. return
  294. }
  295. sf.trie = trie
  296. // Trie opened successfully, keep prefetching items
  297. for {
  298. select {
  299. case <-sf.wake:
  300. // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock
  301. sf.lock.Lock()
  302. tasks := sf.tasks
  303. sf.tasks = nil
  304. sf.lock.Unlock()
  305. // Prefetch any tasks until the loop is interrupted
  306. for i, task := range tasks {
  307. select {
  308. case <-sf.stop:
  309. // If termination is requested, add any leftover back and return
  310. sf.lock.Lock()
  311. sf.tasks = append(sf.tasks, tasks[i:]...)
  312. sf.lock.Unlock()
  313. return
  314. case ch := <-sf.copy:
  315. // Somebody wants a copy of the current trie, grant them
  316. ch <- sf.db.CopyTrie(sf.trie)
  317. default:
  318. // No termination request yet, prefetch the next entry
  319. taskid := string(task)
  320. if _, ok := sf.seen[taskid]; ok {
  321. sf.dups++
  322. } else {
  323. sf.trie.TryGet(task)
  324. sf.seen[taskid] = struct{}{}
  325. }
  326. }
  327. }
  328. case ch := <-sf.copy:
  329. // Somebody wants a copy of the current trie, grant them
  330. ch <- sf.db.CopyTrie(sf.trie)
  331. case <-sf.stop:
  332. // Termination is requested, abort and leave remaining tasks
  333. return
  334. }
  335. }
  336. }