sync.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package trie
  17. import (
  18. "errors"
  19. "fmt"
  20. "github.com/ethereum/go-ethereum/common"
  21. "github.com/ethereum/go-ethereum/common/prque"
  22. "github.com/ethereum/go-ethereum/core/rawdb"
  23. "github.com/ethereum/go-ethereum/ethdb"
  24. "github.com/ethereum/go-ethereum/log"
  25. )
  26. // ErrNotRequested is returned by the trie sync when it's requested to process a
  27. // node it did not request.
  28. var ErrNotRequested = errors.New("not requested")
  29. // ErrAlreadyProcessed is returned by the trie sync when it's requested to process a
  30. // node it already processed previously.
  31. var ErrAlreadyProcessed = errors.New("already processed")
  32. // maxFetchesPerDepth is the maximum number of pending trie nodes per depth. The
  33. // role of this value is to limit the number of trie nodes that get expanded in
  34. // memory if the node was configured with a significant number of peers.
  35. const maxFetchesPerDepth = 16384
// SyncPath is a path tuple identifying a particular trie node either in a single
// trie (account) or a layered trie (account -> storage).
//
// Content wise the tuple has either 1 element if it addresses a node in a single
// trie, or 2 elements if it addresses a node in a stacked trie.
//
// To support addressing arbitrary trie nodes, the path needs to support odd
// nibble lengths. To avoid transferring the expanded hex form over the network,
// the last part of the tuple (which needs to index into the middle of a trie) is
// compact encoded. In case of a 2-tuple, the first item is always 32 bytes, so
// that one is simply binary encoded.
//
// Examples:
//   - Path 0x9 -> {0x19}
//   - Path 0x99 -> {0x0099}
//   - Path 0x01234567890123456789012345678901012345678901234567890123456789019 -> {0x0123456789012345678901234567890101234567890123456789012345678901, 0x19}
//   - Path 0x012345678901234567890123456789010123456789012345678901234567890199 -> {0x0123456789012345678901234567890101234567890123456789012345678901, 0x0099}
type SyncPath [][]byte
  54. // NewSyncPath converts an expanded trie path from nibble form into a compact
  55. // version that can be sent over the network.
  56. func NewSyncPath(path []byte) SyncPath {
  57. // If the hash is from the account trie, append a single item, if it
  58. // is from the a storage trie, append a tuple. Note, the length 64 is
  59. // clashing between account leaf and storage root. It's fine though
  60. // because having a trie node at 64 depth means a hash collision was
  61. // found and we're long dead.
  62. if len(path) < 64 {
  63. return SyncPath{hexToCompact(path)}
  64. }
  65. return SyncPath{hexToKeybytes(path[:64]), hexToCompact(path[64:])}
  66. }
// nodeRequest represents a scheduled or already in-flight trie node retrieval request.
//
// Requests are tracked by the scheduler under string(path), and a request is
// only committed once its deps counter has dropped back to zero.
type nodeRequest struct {
	hash     common.Hash  // Hash of the trie node to retrieve
	path     []byte       // Merkle path leading to this node; used for prioritization and as the tracking key
	data     []byte       // Data content of the node, cached until all subtrees complete
	parent   *nodeRequest // Parent state node referencing this entry (nil if there is none)
	deps     int          // Number of dependencies before allowed to commit this node
	callback LeafCallback // Callback to invoke if a leaf node is reached on this branch
}
// codeRequest represents a scheduled or already in-flight bytecode retrieval request.
//
// Requests are tracked by the scheduler under the code hash, so a single
// request may end up being referenced by several parent node requests.
type codeRequest struct {
	hash    common.Hash    // Hash of the contract bytecode to retrieve
	path    []byte         // Merkle path leading to this entry, used for prioritization
	data    []byte         // Data content of the bytecode, set once the response is processed
	parents []*nodeRequest // Parent state nodes referencing this entry (notify all upon completion)
}
// NodeSyncResult is a response with the requested trie node along with its node path.
type NodeSyncResult struct {
	Path string // Path of the originally unknown trie node, used to look up the pending request
	Data []byte // Data content of the retrieved trie node
}
// CodeSyncResult is a response with the requested bytecode along with its hash.
type CodeSyncResult struct {
	Hash common.Hash // Hash of the originally unknown bytecode, used to look up the pending request
	Data []byte      // Data content of the retrieved bytecode
}
// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
// persisted data items.
//
// The nodes and hashes maps share the same key, namely string(path) of the
// completed node request.
type syncMemBatch struct {
	nodes  map[string][]byte      // In-memory membatch of recently completed nodes, keyed by node path
	hashes map[string]common.Hash // Hashes of recently completed nodes, keyed by node path
	codes  map[common.Hash][]byte // In-memory membatch of recently completed codes, keyed by code hash
}
  100. // newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
  101. func newSyncMemBatch() *syncMemBatch {
  102. return &syncMemBatch{
  103. nodes: make(map[string][]byte),
  104. hashes: make(map[string]common.Hash),
  105. codes: make(map[common.Hash][]byte),
  106. }
  107. }
  108. // hasNode reports the trie node with specific path is already cached.
  109. func (batch *syncMemBatch) hasNode(path []byte) bool {
  110. _, ok := batch.nodes[string(path)]
  111. return ok
  112. }
  113. // hasCode reports the contract code with specific hash is already cached.
  114. func (batch *syncMemBatch) hasCode(hash common.Hash) bool {
  115. _, ok := batch.codes[hash]
  116. return ok
  117. }
// Sync is the main state trie synchronisation scheduler, which provides yet
// unknown trie hashes to retrieve, accepts node data associated with said hashes
// and reconstructs the trie step by step until all is done.
type Sync struct {
	database ethdb.KeyValueReader         // Persistent database to check for existing entries
	membatch *syncMemBatch                // Memory buffer to avoid frequent database writes
	nodeReqs map[string]*nodeRequest      // Pending requests pertaining to a trie node path
	codeReqs map[common.Hash]*codeRequest // Pending requests pertaining to a code hash
	queue    *prque.Prque                 // Priority queue with the pending requests (node paths and code hashes)
	fetches  map[int]int                  // Number of active fetches per trie node depth (path length)
}
  129. // NewSync creates a new trie data download scheduler.
  130. func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallback) *Sync {
  131. ts := &Sync{
  132. database: database,
  133. membatch: newSyncMemBatch(),
  134. nodeReqs: make(map[string]*nodeRequest),
  135. codeReqs: make(map[common.Hash]*codeRequest),
  136. queue: prque.New(nil),
  137. fetches: make(map[int]int),
  138. }
  139. ts.AddSubTrie(root, nil, common.Hash{}, nil, callback)
  140. return ts
  141. }
  142. // AddSubTrie registers a new trie to the sync code, rooted at the designated
  143. // parent for completion tracking. The given path is a unique node path in
  144. // hex format and contain all the parent path if it's layered trie node.
  145. func (s *Sync) AddSubTrie(root common.Hash, path []byte, parent common.Hash, parentPath []byte, callback LeafCallback) {
  146. // Short circuit if the trie is empty or already known
  147. if root == emptyRoot {
  148. return
  149. }
  150. if s.membatch.hasNode(path) {
  151. return
  152. }
  153. if rawdb.HasTrieNode(s.database, root) {
  154. return
  155. }
  156. // Assemble the new sub-trie sync request
  157. req := &nodeRequest{
  158. hash: root,
  159. path: path,
  160. callback: callback,
  161. }
  162. // If this sub-trie has a designated parent, link them together
  163. if parent != (common.Hash{}) {
  164. ancestor := s.nodeReqs[string(parentPath)]
  165. if ancestor == nil {
  166. panic(fmt.Sprintf("sub-trie ancestor not found: %x", parent))
  167. }
  168. ancestor.deps++
  169. req.parent = ancestor
  170. }
  171. s.scheduleNodeRequest(req)
  172. }
  173. // AddCodeEntry schedules the direct retrieval of a contract code that should not
  174. // be interpreted as a trie node, but rather accepted and stored into the database
  175. // as is.
  176. func (s *Sync) AddCodeEntry(hash common.Hash, path []byte, parent common.Hash, parentPath []byte) {
  177. // Short circuit if the entry is empty or already known
  178. if hash == emptyState {
  179. return
  180. }
  181. if s.membatch.hasCode(hash) {
  182. return
  183. }
  184. // If database says duplicate, the blob is present for sure.
  185. // Note we only check the existence with new code scheme, fast
  186. // sync is expected to run with a fresh new node. Even there
  187. // exists the code with legacy format, fetch and store with
  188. // new scheme anyway.
  189. if rawdb.HasCodeWithPrefix(s.database, hash) {
  190. return
  191. }
  192. // Assemble the new sub-trie sync request
  193. req := &codeRequest{
  194. path: path,
  195. hash: hash,
  196. }
  197. // If this sub-trie has a designated parent, link them together
  198. if parent != (common.Hash{}) {
  199. ancestor := s.nodeReqs[string(parentPath)] // the parent of codereq can ONLY be nodereq
  200. if ancestor == nil {
  201. panic(fmt.Sprintf("raw-entry ancestor not found: %x", parent))
  202. }
  203. ancestor.deps++
  204. req.parents = append(req.parents, ancestor)
  205. }
  206. s.scheduleCodeRequest(req)
  207. }
  208. // Missing retrieves the known missing nodes from the trie for retrieval. To aid
  209. // both eth/6x style fast sync and snap/1x style state sync, the paths of trie
  210. // nodes are returned too, as well as separate hash list for codes.
  211. func (s *Sync) Missing(max int) ([]string, []common.Hash, []common.Hash) {
  212. var (
  213. nodePaths []string
  214. nodeHashes []common.Hash
  215. codeHashes []common.Hash
  216. )
  217. for !s.queue.Empty() && (max == 0 || len(nodeHashes)+len(codeHashes) < max) {
  218. // Retrieve the next item in line
  219. item, prio := s.queue.Peek()
  220. // If we have too many already-pending tasks for this depth, throttle
  221. depth := int(prio >> 56)
  222. if s.fetches[depth] > maxFetchesPerDepth {
  223. break
  224. }
  225. // Item is allowed to be scheduled, add it to the task list
  226. s.queue.Pop()
  227. s.fetches[depth]++
  228. switch item := item.(type) {
  229. case common.Hash:
  230. codeHashes = append(codeHashes, item)
  231. case string:
  232. req, ok := s.nodeReqs[item]
  233. if !ok {
  234. log.Error("Missing node request", "path", item)
  235. continue // System very wrong, shouldn't happen
  236. }
  237. nodePaths = append(nodePaths, item)
  238. nodeHashes = append(nodeHashes, req.hash)
  239. }
  240. }
  241. return nodePaths, nodeHashes, codeHashes
  242. }
  243. // ProcessCode injects the received data for requested item. Note it can
  244. // happpen that the single response commits two pending requests(e.g.
  245. // there are two requests one for code and one for node but the hash
  246. // is same). In this case the second response for the same hash will
  247. // be treated as "non-requested" item or "already-processed" item but
  248. // there is no downside.
  249. func (s *Sync) ProcessCode(result CodeSyncResult) error {
  250. // If the code was not requested or it's already processed, bail out
  251. req := s.codeReqs[result.Hash]
  252. if req == nil {
  253. return ErrNotRequested
  254. }
  255. if req.data != nil {
  256. return ErrAlreadyProcessed
  257. }
  258. req.data = result.Data
  259. return s.commitCodeRequest(req)
  260. }
  261. // ProcessNode injects the received data for requested item. Note it can
  262. // happen that the single response commits two pending requests(e.g.
  263. // there are two requests one for code and one for node but the hash
  264. // is same). In this case the second response for the same hash will
  265. // be treated as "non-requested" item or "already-processed" item but
  266. // there is no downside.
  267. func (s *Sync) ProcessNode(result NodeSyncResult) error {
  268. // If the trie node was not requested or it's already processed, bail out
  269. req := s.nodeReqs[result.Path]
  270. if req == nil {
  271. return ErrNotRequested
  272. }
  273. if req.data != nil {
  274. return ErrAlreadyProcessed
  275. }
  276. // Decode the node data content and update the request
  277. node, err := decodeNode(req.hash.Bytes(), result.Data)
  278. if err != nil {
  279. return err
  280. }
  281. req.data = result.Data
  282. // Create and schedule a request for all the children nodes
  283. requests, err := s.children(req, node)
  284. if err != nil {
  285. return err
  286. }
  287. if len(requests) == 0 && req.deps == 0 {
  288. s.commitNodeRequest(req)
  289. } else {
  290. req.deps += len(requests)
  291. for _, child := range requests {
  292. s.scheduleNodeRequest(child)
  293. }
  294. }
  295. return nil
  296. }
  297. // Commit flushes the data stored in the internal membatch out to persistent
  298. // storage, returning any occurred error.
  299. func (s *Sync) Commit(dbw ethdb.Batch) error {
  300. // Dump the membatch into a database dbw
  301. for path, value := range s.membatch.nodes {
  302. rawdb.WriteTrieNode(dbw, s.membatch.hashes[path], value)
  303. }
  304. for hash, value := range s.membatch.codes {
  305. rawdb.WriteCode(dbw, hash, value)
  306. }
  307. // Drop the membatch data and return
  308. s.membatch = newSyncMemBatch()
  309. return nil
  310. }
  311. // Pending returns the number of state entries currently pending for download.
  312. func (s *Sync) Pending() int {
  313. return len(s.nodeReqs) + len(s.codeReqs)
  314. }
  315. // schedule inserts a new state retrieval request into the fetch queue. If there
  316. // is already a pending request for this node, the new request will be discarded
  317. // and only a parent reference added to the old one.
  318. func (s *Sync) scheduleNodeRequest(req *nodeRequest) {
  319. s.nodeReqs[string(req.path)] = req
  320. // Schedule the request for future retrieval. This queue is shared
  321. // by both node requests and code requests.
  322. prio := int64(len(req.path)) << 56 // depth >= 128 will never happen, storage leaves will be included in their parents
  323. for i := 0; i < 14 && i < len(req.path); i++ {
  324. prio |= int64(15-req.path[i]) << (52 - i*4) // 15-nibble => lexicographic order
  325. }
  326. s.queue.Push(string(req.path), prio)
  327. }
  328. // schedule inserts a new state retrieval request into the fetch queue. If there
  329. // is already a pending request for this node, the new request will be discarded
  330. // and only a parent reference added to the old one.
  331. func (s *Sync) scheduleCodeRequest(req *codeRequest) {
  332. // If we're already requesting this node, add a new reference and stop
  333. if old, ok := s.codeReqs[req.hash]; ok {
  334. old.parents = append(old.parents, req.parents...)
  335. return
  336. }
  337. s.codeReqs[req.hash] = req
  338. // Schedule the request for future retrieval. This queue is shared
  339. // by both node requests and code requests.
  340. prio := int64(len(req.path)) << 56 // depth >= 128 will never happen, storage leaves will be included in their parents
  341. for i := 0; i < 14 && i < len(req.path); i++ {
  342. prio |= int64(15-req.path[i]) << (52 - i*4) // 15-nibble => lexicographic order
  343. }
  344. s.queue.Push(req.hash, prio)
  345. }
  346. // children retrieves all the missing children of a state trie entry for future
  347. // retrieval scheduling.
  348. func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
  349. // Gather all the children of the node, irrelevant whether known or not
  350. type child struct {
  351. path []byte
  352. node node
  353. }
  354. var children []child
  355. switch node := (object).(type) {
  356. case *shortNode:
  357. key := node.Key
  358. if hasTerm(key) {
  359. key = key[:len(key)-1]
  360. }
  361. children = []child{{
  362. node: node.Val,
  363. path: append(append([]byte(nil), req.path...), key...),
  364. }}
  365. case *fullNode:
  366. for i := 0; i < 17; i++ {
  367. if node.Children[i] != nil {
  368. children = append(children, child{
  369. node: node.Children[i],
  370. path: append(append([]byte(nil), req.path...), byte(i)),
  371. })
  372. }
  373. }
  374. default:
  375. panic(fmt.Sprintf("unknown node: %+v", node))
  376. }
  377. // Iterate over the children, and request all unknown ones
  378. requests := make([]*nodeRequest, 0, len(children))
  379. for _, child := range children {
  380. // Notify any external watcher of a new key/value node
  381. if req.callback != nil {
  382. if node, ok := (child.node).(valueNode); ok {
  383. var paths [][]byte
  384. if len(child.path) == 2*common.HashLength {
  385. paths = append(paths, hexToKeybytes(child.path))
  386. } else if len(child.path) == 4*common.HashLength {
  387. paths = append(paths, hexToKeybytes(child.path[:2*common.HashLength]))
  388. paths = append(paths, hexToKeybytes(child.path[2*common.HashLength:]))
  389. }
  390. if err := req.callback(paths, child.path, node, req.hash, req.path); err != nil {
  391. return nil, err
  392. }
  393. }
  394. }
  395. // If the child references another node, resolve or schedule
  396. if node, ok := (child.node).(hashNode); ok {
  397. // Try to resolve the node from the local database
  398. if s.membatch.hasNode(child.path) {
  399. continue
  400. }
  401. // If database says duplicate, then at least the trie node is present
  402. // and we hold the assumption that it's NOT legacy contract code.
  403. chash := common.BytesToHash(node)
  404. if rawdb.HasTrieNode(s.database, chash) {
  405. continue
  406. }
  407. // Locally unknown node, schedule for retrieval
  408. requests = append(requests, &nodeRequest{
  409. path: child.path,
  410. hash: chash,
  411. parent: req,
  412. callback: req.callback,
  413. })
  414. }
  415. }
  416. return requests, nil
  417. }
  418. // commit finalizes a retrieval request and stores it into the membatch. If any
  419. // of the referencing parent requests complete due to this commit, they are also
  420. // committed themselves.
  421. func (s *Sync) commitNodeRequest(req *nodeRequest) error {
  422. // Write the node content to the membatch
  423. s.membatch.nodes[string(req.path)] = req.data
  424. s.membatch.hashes[string(req.path)] = req.hash
  425. delete(s.nodeReqs, string(req.path))
  426. s.fetches[len(req.path)]--
  427. // Check parent for completion
  428. if req.parent != nil {
  429. req.parent.deps--
  430. if req.parent.deps == 0 {
  431. if err := s.commitNodeRequest(req.parent); err != nil {
  432. return err
  433. }
  434. }
  435. }
  436. return nil
  437. }
  438. // commit finalizes a retrieval request and stores it into the membatch. If any
  439. // of the referencing parent requests complete due to this commit, they are also
  440. // committed themselves.
  441. func (s *Sync) commitCodeRequest(req *codeRequest) error {
  442. // Write the node content to the membatch
  443. s.membatch.codes[req.hash] = req.data
  444. delete(s.codeReqs, req.hash)
  445. s.fetches[len(req.path)]--
  446. // Check all parents for completion
  447. for _, parent := range req.parents {
  448. parent.deps--
  449. if parent.deps == 0 {
  450. if err := s.commitNodeRequest(parent); err != nil {
  451. return err
  452. }
  453. }
  454. }
  455. return nil
  456. }