chain_indexer.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package core implements the Ethereum consensus protocol.
  17. package core
  18. import (
  19. "encoding/binary"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/core/types"
  24. "github.com/ethereum/go-ethereum/ethdb"
  25. "github.com/ethereum/go-ethereum/event"
  26. )
  27. // ChainIndexer does a post-processing job for equally sized sections of the canonical
  28. // chain (like BlooomBits and CHT structures). A ChainIndexer is connected to the blockchain
  29. // through the event system by starting a ChainEventLoop in a goroutine.
  30. // Further child ChainIndexers can be added which use the output of the parent section
  31. // indexer. These child indexers receive new head notifications only after an entire section
  32. // has been finished or in case of rollbacks that might affect already finished sections.
  33. type ChainIndexer struct {
  34. chainDb, indexDb ethdb.Database
  35. backend ChainIndexerBackend
  36. sectionSize, confirmReq uint64
  37. stop chan struct{}
  38. lock sync.Mutex
  39. procWait time.Duration
  40. tryUpdate chan struct{}
  41. stored, targetCount, calcIdx, lastForwarded uint64
  42. updating bool
  43. children []*ChainIndexer
  44. }
  45. // ChainIndexerBackend interface is a backend for the indexer doing the actual post-processing job
  46. type ChainIndexerBackend interface {
  47. Reset(section uint64) // start processing a new section
  48. Process(header *types.Header) // process a single block (called for each block in the section)
  49. Commit(db ethdb.Database) error // do some more processing if necessary and store the results in the database
  50. UpdateMsg(done, all uint64) // print a progress update message if necessary (only called when multiple sections need to be processed)
  51. }
  52. // NewChainIndexer creates a new ChainIndexer
  53. // db: database where the index of available processed sections is stored (the index is stored by the
  54. // indexer, the actual processed chain data is stored by the backend)
  55. // dbKey: key prefix where the index is stored
  56. // backend: an implementation of ChainIndexerBackend
  57. // sectionSize: the size of processable sections
  58. // confirmReq: required number of confirmation blocks before a new section is being processed
  59. // procWait: waiting time between processing sections (simple way of limiting the resource usage of a db upgrade)
  60. // stop: quit channel
  61. func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, sectionSize, confirmReq uint64, procWait time.Duration, stop chan struct{}) *ChainIndexer {
  62. c := &ChainIndexer{
  63. chainDb: chainDb,
  64. indexDb: indexDb,
  65. backend: backend,
  66. sectionSize: sectionSize,
  67. confirmReq: confirmReq,
  68. tryUpdate: make(chan struct{}, 1),
  69. stop: stop,
  70. procWait: procWait,
  71. }
  72. c.stored = c.getValidSections()
  73. go c.updateLoop()
  74. return c
  75. }
  76. // updateLoop is the main event loop of the indexer
  77. func (c *ChainIndexer) updateLoop() {
  78. updateMsg := false
  79. for {
  80. select {
  81. case <-c.stop:
  82. return
  83. case <-c.tryUpdate:
  84. c.lock.Lock()
  85. if c.targetCount > c.stored {
  86. if !updateMsg && c.targetCount > c.stored+1 {
  87. updateMsg = true
  88. c.backend.UpdateMsg(c.stored, c.targetCount)
  89. }
  90. c.calcIdx = c.stored
  91. var lastSectionHead common.Hash
  92. if c.calcIdx > 0 {
  93. lastSectionHead = c.getSectionHead(c.calcIdx - 1)
  94. }
  95. c.lock.Unlock()
  96. sectionHead, ok := c.processSection(c.calcIdx, lastSectionHead)
  97. c.lock.Lock()
  98. if ok && lastSectionHead == c.getSectionHead(c.calcIdx-1) {
  99. c.stored = c.calcIdx + 1
  100. c.setSectionHead(c.calcIdx, sectionHead)
  101. c.setValidSections(c.stored)
  102. if updateMsg {
  103. c.backend.UpdateMsg(c.stored, c.targetCount)
  104. if c.stored >= c.targetCount {
  105. updateMsg = false
  106. }
  107. }
  108. c.lastForwarded = c.stored*c.sectionSize - 1
  109. for _, cp := range c.children {
  110. cp.newHead(c.lastForwarded, false)
  111. }
  112. } else {
  113. // if processing has failed, do not retry until further notification
  114. c.targetCount = c.stored
  115. }
  116. }
  117. if c.targetCount > c.stored {
  118. go func() {
  119. time.Sleep(c.procWait)
  120. c.tryUpdate <- struct{}{}
  121. }()
  122. } else {
  123. c.updating = false
  124. }
  125. c.lock.Unlock()
  126. }
  127. }
  128. }
  129. // ChainEventLoop runs in a goroutine and feeds blockchain events to the indexer by calling newHead
  130. // (not needed for child indexers where the parent calls newHead)
  131. func (c *ChainIndexer) ChainEventLoop(currentHeader *types.Header, eventMux *event.TypeMux) {
  132. sub := eventMux.Subscribe(ChainEvent{})
  133. c.newHead(currentHeader.Number.Uint64(), false)
  134. lastHead := currentHeader.Hash()
  135. for {
  136. select {
  137. case <-c.stop:
  138. return
  139. case ev := <-sub.Chan():
  140. header := ev.Data.(ChainEvent).Block.Header()
  141. c.newHead(header.Number.Uint64(), header.ParentHash != lastHead)
  142. lastHead = header.Hash()
  143. }
  144. }
  145. }
  146. // AddChildIndexer adds a child ChainIndexer that can use the output of this one
  147. func (c *ChainIndexer) AddChildIndexer(ci *ChainIndexer) {
  148. c.children = append(c.children, ci)
  149. }
  150. // newHead notifies the indexer about new chain heads or rollbacks
  151. func (c *ChainIndexer) newHead(headNum uint64, rollback bool) {
  152. c.lock.Lock()
  153. defer c.lock.Unlock()
  154. if rollback {
  155. firstChanged := headNum / c.sectionSize
  156. if firstChanged < c.targetCount {
  157. c.targetCount = firstChanged
  158. }
  159. if firstChanged < c.stored {
  160. c.stored = firstChanged
  161. c.setValidSections(c.stored)
  162. }
  163. headNum = firstChanged * c.sectionSize
  164. if headNum < c.lastForwarded {
  165. c.lastForwarded = headNum
  166. for _, cp := range c.children {
  167. cp.newHead(c.lastForwarded, true)
  168. }
  169. }
  170. } else {
  171. var newCount uint64
  172. if headNum >= c.confirmReq {
  173. newCount = (headNum + 1 - c.confirmReq) / c.sectionSize
  174. if newCount > c.targetCount {
  175. c.targetCount = newCount
  176. if !c.updating {
  177. c.updating = true
  178. c.tryUpdate <- struct{}{}
  179. }
  180. }
  181. }
  182. }
  183. }
  184. // processSection processes an entire section by calling backend functions while ensuring
  185. // the continuity of the passed headers. Since the chain mutex is not held while processing,
  186. // the continuity can be broken by a long reorg, in which case the function returns with ok == false.
  187. func (c *ChainIndexer) processSection(section uint64, lastSectionHead common.Hash) (sectionHead common.Hash, ok bool) {
  188. c.backend.Reset(section)
  189. head := lastSectionHead
  190. for i := section * c.sectionSize; i < (section+1)*c.sectionSize; i++ {
  191. hash := GetCanonicalHash(c.chainDb, i)
  192. if hash == (common.Hash{}) {
  193. return common.Hash{}, false
  194. }
  195. header := GetHeader(c.chainDb, hash, i)
  196. if header == nil || header.ParentHash != head {
  197. return common.Hash{}, false
  198. }
  199. c.backend.Process(header)
  200. head = header.Hash()
  201. }
  202. if err := c.backend.Commit(c.chainDb); err != nil {
  203. return common.Hash{}, false
  204. }
  205. return head, true
  206. }
  207. // CanonicalSections returns the number of processed sections that are consistent with
  208. // the current canonical chain
  209. func (c *ChainIndexer) CanonicalSections() uint64 {
  210. c.lock.Lock()
  211. defer c.lock.Unlock()
  212. cnt := c.getValidSections()
  213. for cnt > 0 {
  214. if c.getSectionHead(cnt-1) == GetCanonicalHash(c.chainDb, cnt*c.sectionSize-1) {
  215. break
  216. }
  217. cnt--
  218. c.setValidSections(cnt)
  219. }
  220. return cnt
  221. }
  222. // getValidSections reads the number of valid sections from the index database
  223. func (c *ChainIndexer) getValidSections() uint64 {
  224. data, _ := c.indexDb.Get([]byte("count"))
  225. if len(data) == 8 {
  226. return binary.BigEndian.Uint64(data[:])
  227. }
  228. return 0
  229. }
  230. // setValidSections writes the number of valid sections to the index database
  231. func (c *ChainIndexer) setValidSections(cnt uint64) {
  232. oldCnt := c.getValidSections()
  233. if cnt < oldCnt {
  234. for i := cnt; i < oldCnt; i++ {
  235. c.removeSectionHead(i)
  236. }
  237. }
  238. var data [8]byte
  239. binary.BigEndian.PutUint64(data[:], cnt)
  240. c.indexDb.Put([]byte("count"), data[:])
  241. }
  242. // getSectionHead reads the last block hash of a processed section from the index database
  243. func (c *ChainIndexer) getSectionHead(idx uint64) common.Hash {
  244. var data [8]byte
  245. binary.BigEndian.PutUint64(data[:], idx)
  246. hash, _ := c.indexDb.Get(append([]byte("shead"), data[:]...))
  247. if len(hash) == len(common.Hash{}) {
  248. return common.BytesToHash(hash)
  249. }
  250. return common.Hash{}
  251. }
  252. // setSectionHead writes the last block hash of a processed section to the index database
  253. func (c *ChainIndexer) setSectionHead(idx uint64, shead common.Hash) {
  254. var data [8]byte
  255. binary.BigEndian.PutUint64(data[:], idx)
  256. c.indexDb.Put(append([]byte("shead"), data[:]...), shead.Bytes())
  257. }
  258. // removeSectionHead removes the reference to a processed section from the index database
  259. func (c *ChainIndexer) removeSectionHead(idx uint64) {
  260. var data [8]byte
  261. binary.BigEndian.PutUint64(data[:], idx)
  262. c.indexDb.Delete(append([]byte("shead"), data[:]...))
  263. }