trie_prefetcher.go

// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package state

import (
	"sync"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
)

// abortChanSize is the buffer size of the channel on which subfetchers are
// handed over to the abort loop for termination.
const abortChanSize = 64

var (
	// triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
	triePrefetchMetricsPrefix = "trie/prefetch/"
)

// triePrefetcher is an active prefetcher, which receives accounts or storage
// items and does trie-loading of them. The goal is to get as much useful content
// into the caches as possible.
//
// Note, the prefetcher's API is not thread safe.
type triePrefetcher struct {
	db       Database                    // Database to fetch trie nodes through
	root     common.Hash                 // Root hash of the account trie for metrics
	fetches  map[common.Hash]Trie        // Partially or fully fetched tries
	fetchers map[common.Hash]*subfetcher // Subfetchers for each trie

	abortChan chan *subfetcher // Channel on which subfetchers are handed to the abort loop
	closeChan chan struct{}    // Channel to signal that the prefetcher is closing

	deliveryMissMeter metrics.Meter
	accountLoadMeter  metrics.Meter
	accountDupMeter   metrics.Meter
	accountSkipMeter  metrics.Meter
	accountWasteMeter metrics.Meter
	storageLoadMeter  metrics.Meter
	storageDupMeter   metrics.Meter
	storageSkipMeter  metrics.Meter
	storageWasteMeter metrics.Meter
}

// newTriePrefetcher creates an active prefetcher rooted at the given state
// root, registering its metrics under the given namespace.
func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
	prefix := triePrefetchMetricsPrefix + namespace
	p := &triePrefetcher{
		db:        db,
		root:      root,
		fetchers:  make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
		abortChan: make(chan *subfetcher, abortChanSize),
		closeChan: make(chan struct{}),

		deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
		accountLoadMeter:  metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
		accountDupMeter:   metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
		accountSkipMeter:  metrics.GetOrRegisterMeter(prefix+"/account/skip", nil),
		accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
		storageLoadMeter:  metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
		storageDupMeter:   metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
		storageSkipMeter:  metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
		storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
	}
	go p.abortLoop()
	return p
}

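// Illustrative only, not part of the original file: a rough lifecycle sketch
// of the prefetcher API defined here, assuming db, stateRoot and addrHashes
// are supplied by the caller and that the zero account hash selects the
// account trie (see subfetcher.loop below).
//
//	p := newTriePrefetcher(db, stateRoot, "example")
//	p.prefetch(stateRoot, addrHashes, common.Hash{}) // schedule account loads in the background
//	// ... execute transactions, touching more state ...
//	if tr := p.trie(stateRoot); tr != nil {
//		// the returned trie already has many of its nodes cached
//	}
//	p.close() // abort any spinning subfetchers and report metrics
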
// abortLoop waits for subfetchers to be handed over on the abort channel and
// terminates them, until the prefetcher itself is closed.
func (p *triePrefetcher) abortLoop() {
	for {
		select {
		case fetcher := <-p.abortChan:
			fetcher.abort()
		case <-p.closeChan:
			// Drain the abort channel before returning
			for {
				select {
				case fetcher := <-p.abortChan:
					fetcher.abort()
				default:
					return
				}
			}
		}
	}
}

// close iterates over all the subfetchers, aborts any that were left spinning
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
	for _, fetcher := range p.fetchers {
		p.abortChan <- fetcher // safe to do multiple times
		<-fetcher.term

		if metrics.EnabledExpensive {
			if fetcher.root == p.root {
				p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
				p.accountDupMeter.Mark(int64(fetcher.dups))
				p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))

				for _, key := range fetcher.used {
					delete(fetcher.seen, string(key))
				}
				p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
			} else {
				p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
				p.storageDupMeter.Mark(int64(fetcher.dups))
				p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))

				for _, key := range fetcher.used {
					delete(fetcher.seen, string(key))
				}
				p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
			}
		}
	}
	close(p.closeChan)

	// Clear out all fetchers (will crash on a second call, deliberate)
	p.fetchers = nil
}

// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
// already loaded will be copied over, but no goroutines will be started. This
// is mostly used in the miner which creates a copy of its actively mutated
// state to be sealed while it may further mutate the state.
func (p *triePrefetcher) copy() *triePrefetcher {
	copy := &triePrefetcher{
		db:      p.db,
		root:    p.root,
		fetches: make(map[common.Hash]Trie), // Inactive (copied) prefetchers use the fetches map

		deliveryMissMeter: p.deliveryMissMeter,
		accountLoadMeter:  p.accountLoadMeter,
		accountDupMeter:   p.accountDupMeter,
		accountSkipMeter:  p.accountSkipMeter,
		accountWasteMeter: p.accountWasteMeter,
		storageLoadMeter:  p.storageLoadMeter,
		storageDupMeter:   p.storageDupMeter,
		storageSkipMeter:  p.storageSkipMeter,
		storageWasteMeter: p.storageWasteMeter,
	}
	// If the prefetcher is already a copy, duplicate the data
	if p.fetches != nil {
		for root, fetch := range p.fetches {
			copy.fetches[root] = p.db.CopyTrie(fetch)
		}
		return copy
	}
	// Otherwise we're copying an active fetcher, retrieve the current states
	for root, fetcher := range p.fetchers {
		copy.fetches[root] = fetcher.peek()
	}
	return copy
}

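// Illustrative only, not part of the original file: a sketch of how a copy
// behaves compared to the active prefetcher it was taken from; keys is a
// hypothetical key batch.
//
//	snapshot := p.copy()                         // deep-but-inactive, no goroutines started
//	_ = snapshot.trie(root)                      // served purely from the copied fetches map
//	snapshot.prefetch(root, keys, common.Hash{}) // a no-op, inactive copies never schedule work
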
// prefetch schedules a batch of trie items to prefetch.
func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash common.Hash) {
	// If the prefetcher is an inactive one, bail out
	if p.fetches != nil {
		return
	}
	// Active fetcher, schedule the retrievals
	fetcher := p.fetchers[root]
	if fetcher == nil {
		fetcher = newSubfetcher(p.db, root, accountHash)
		p.fetchers[root] = fetcher
	}
	fetcher.schedule(keys)
}

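// Illustrative only, not part of the original file: account-trie keys and
// storage-slot keys are scheduled through the same method, distinguished by
// the accountHash argument; stateRoot, addrHashes, storageRoot, slotHashes
// and accountHash are assumed to come from the caller.
//
//	p.prefetch(stateRoot, addrHashes, common.Hash{}) // account trie (zero owner hash)
//	p.prefetch(storageRoot, slotHashes, accountHash) // storage trie of one account
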
// trie returns the trie matching the root hash, or nil if the prefetcher doesn't
// have it.
func (p *triePrefetcher) trie(root common.Hash) Trie {
	// If the prefetcher is inactive, return from existing deep copies
	if p.fetches != nil {
		trie := p.fetches[root]
		if trie == nil {
			p.deliveryMissMeter.Mark(1)
			return nil
		}
		return p.db.CopyTrie(trie)
	}
	// Otherwise the prefetcher is active, bail if no trie was prefetched for this root
	fetcher := p.fetchers[root]
	if fetcher == nil {
		p.deliveryMissMeter.Mark(1)
		return nil
	}
	// Interrupt the prefetcher if it's by any chance still running and return
	// a copy of any pre-loaded trie.
	p.abortChan <- fetcher // safe to do multiple times
	trie := fetcher.peek()
	if trie == nil {
		p.deliveryMissMeter.Mark(1)
		return nil
	}
	return trie
}

// used marks a batch of state items used to allow creating statistics as to
// how useful or wasteful the prefetcher is.
func (p *triePrefetcher) used(root common.Hash, used [][]byte) {
	if fetcher := p.fetchers[root]; fetcher != nil {
		fetcher.used = used
	}
}

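// Illustrative only, not part of the original file: reporting the keys that
// were actually read lets close() treat prefetched-but-unread entries as
// waste; usedKeys is assumed to be collected by the caller.
//
//	p.used(stateRoot, usedKeys)
//	p.close() // waste meters count seen entries that were never in usedKeys
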
// subfetcher is a trie fetcher goroutine responsible for pulling entries for a
// single trie. It is spawned when a new root is encountered and lives until the
// main prefetcher is paused and either all requested items are processed or the
// trie being worked on is retrieved from the prefetcher.
type subfetcher struct {
	db   Database    // Database to load trie nodes through
	root common.Hash // Root hash of the trie to prefetch
	trie Trie        // Trie being populated with nodes

	tasks [][]byte   // Items queued up for retrieval
	lock  sync.Mutex // Lock protecting the task queue

	wake chan struct{}  // Wake channel if a new task is scheduled
	stop chan struct{}  // Channel to interrupt processing
	term chan struct{}  // Channel to signal interruption
	copy chan chan Trie // Channel to request a copy of the current trie

	seen map[string]struct{} // Tracks the entries already loaded
	dups int                 // Number of duplicate preload tasks
	used [][]byte            // Tracks the entries used in the end

	accountHash common.Hash // Owner account hash for storage tries, empty for the account trie
}

// newSubfetcher creates a goroutine to prefetch state items belonging to a
// particular root hash.
func newSubfetcher(db Database, root common.Hash, accountHash common.Hash) *subfetcher {
	sf := &subfetcher{
		db:          db,
		root:        root,
		wake:        make(chan struct{}, 1),
		stop:        make(chan struct{}),
		term:        make(chan struct{}),
		copy:        make(chan chan Trie),
		seen:        make(map[string]struct{}),
		accountHash: accountHash,
	}
	go sf.loop()
	return sf
}

// schedule adds a batch of trie keys to the queue to prefetch.
func (sf *subfetcher) schedule(keys [][]byte) {
	// Append the tasks to the current queue
	sf.lock.Lock()
	sf.tasks = append(sf.tasks, keys...)
	sf.lock.Unlock()

	// Notify the prefetcher, it's fine if it's already terminated
	select {
	case sf.wake <- struct{}{}:
	default:
	}
}

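// Illustrative only, not part of the original file: the wake channel above is
// buffered with capacity one, so repeated schedule calls coalesce into a
// single pending notification; the same pattern shown in isolation:
//
//	wake := make(chan struct{}, 1)
//	select {
//	case wake <- struct{}{}: // queue a notification if none is pending
//	default:                 // one is already queued, nothing to do
//	}
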
// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it
// is currently.
func (sf *subfetcher) peek() Trie {
	ch := make(chan Trie)
	select {
	case sf.copy <- ch:
		// Subfetcher still alive, return copy from it
		return <-ch

	case <-sf.term:
		// Subfetcher already terminated, return a copy directly
		if sf.trie == nil {
			return nil
		}
		return sf.db.CopyTrie(sf.trie)
	}
}

// abort interrupts the subfetcher immediately. It is safe to call abort multiple
// times but it is not thread safe.
func (sf *subfetcher) abort() {
	select {
	case <-sf.stop:
	default:
		close(sf.stop)
	}
	<-sf.term
}

// loop waits for new tasks to be scheduled and keeps loading them until it runs
// out of tasks or its underlying trie is retrieved for committing.
func (sf *subfetcher) loop() {
	// No matter how the loop stops, signal anyone waiting that it's terminated
	defer close(sf.term)

	// Start by opening the trie and stop processing if it fails
	var trie Trie
	var err error
	if sf.accountHash == emptyAddr {
		trie, err = sf.db.OpenTrie(sf.root)
	} else {
		// Only the owner account hash is needed to open the storage trie
		trie, err = sf.db.OpenStorageTrie(sf.accountHash, sf.root)
	}
	if err != nil {
		log.Debug("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
	}
	sf.trie = trie

	// Keep prefetching items; if opening the trie failed, retry on the next wake-up
	for {
		select {
		case <-sf.wake:
			// Subfetcher was woken up, retrieve any tasks to avoid spinning the lock
			if sf.trie == nil {
				if sf.accountHash == emptyAddr {
					sf.trie, err = sf.db.OpenTrie(sf.root)
				} else {
					// Only the owner account hash is needed to open the storage trie
					sf.trie, err = sf.db.OpenStorageTrie(sf.accountHash, sf.root)
				}
				if err != nil {
					continue
				}
			}
			sf.lock.Lock()
			tasks := sf.tasks
			sf.tasks = nil
			sf.lock.Unlock()

			// Prefetch any tasks until the loop is interrupted
			for i, task := range tasks {
				select {
				case <-sf.stop:
					// If termination is requested, add any leftover back and return
					sf.lock.Lock()
					sf.tasks = append(sf.tasks, tasks[i:]...)
					sf.lock.Unlock()
					return

				case ch := <-sf.copy:
					// Somebody wants a copy of the current trie, grant them
					ch <- sf.db.CopyTrie(sf.trie)

				default:
					// No termination request yet, prefetch the next entry
					taskid := string(task)
					if _, ok := sf.seen[taskid]; ok {
						sf.dups++
					} else {
						sf.trie.TryGet(task)
						sf.seen[taskid] = struct{}{}
					}
				}
			}

		case ch := <-sf.copy:
			// Somebody wants a copy of the current trie, grant them
			ch <- sf.db.CopyTrie(sf.trie)

		case <-sf.stop:
			// Termination is requested, abort and leave remaining tasks
			return
		}
	}
}