// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package downloader

import (
	"fmt"
	"hash"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/state"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/trie"
	"golang.org/x/crypto/sha3"
)

// stateReq represents a batch of state fetch requests grouped together into
// a single data retrieval network packet.
type stateReq struct {
	items    []common.Hash              // Hashes of the state items to download
	tasks    map[common.Hash]*stateTask // Download tasks to track previous attempts
	timeout  time.Duration              // Maximum round trip time for this to complete
	timer    *time.Timer                // Timer to fire when the RTT timeout expires
	peer     *peerConnection            // Peer that we're requesting from
	response [][]byte                   // Response data of the peer (nil for timeouts)
	dropped  bool                       // Flag whether the peer dropped off early
}

// timedOut reports whether this request timed out.
func (req *stateReq) timedOut() bool {
	return req.response == nil
}

// stateSyncStats is a collection of progress stats to report during a state trie
// sync to RPC requests as well as to display in user logs.
type stateSyncStats struct {
	processed  uint64 // Number of state entries processed
	duplicate  uint64 // Number of state entries downloaded twice
	unexpected uint64 // Number of non-requested state entries received
	pending    uint64 // Number of still pending state entries
}

// syncState starts downloading state with the given root hash.
func (d *Downloader) syncState(root common.Hash) *stateSync {
	// Create the state sync
	s := newStateSync(d, root)
	select {
	case d.stateSyncStart <- s:
	case <-d.quitCh:
		s.err = errCancelStateFetch
		close(s.done)
	}
	return s
}
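
// A minimal usage sketch of the returned handle (illustrative only; within
// the downloader itself the handle is driven by the fast-sync code):
//
//	sync := d.syncState(root)
//	defer sync.Cancel()
//	if err := sync.Wait(); err != nil {
//		// handle a failed or cancelled state sync
//	}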

// stateFetcher manages the active state sync and accepts requests
// on its behalf.
func (d *Downloader) stateFetcher() {
	for {
		select {
		case s := <-d.stateSyncStart:
			for next := s; next != nil; {
				next = d.runStateSync(next)
			}
		case <-d.stateCh:
			// Ignore state responses while no sync is running.
		case <-d.quitCh:
			return
		}
	}
}

// runStateSync runs a state synchronisation until it completes or another root
// hash is requested to be switched over to.
func (d *Downloader) runStateSync(s *stateSync) *stateSync {
	var (
		active   = make(map[string]*stateReq) // Currently in-flight requests
		finished []*stateReq                  // Completed or failed requests
		timeout  = make(chan *stateReq)       // Timed out active requests
	)
	defer func() {
		// Cancel active request timers on exit. Also set peers to idle so they're
		// available for the next sync.
		for _, req := range active {
			req.timer.Stop()
			req.peer.SetNodeDataIdle(len(req.items))
		}
	}()
	// Run the state sync.
	go s.run()
	defer s.Cancel()

	// Listen for peer departure events to cancel assigned tasks
	peerDrop := make(chan *peerConnection, 1024)
	peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
	defer peerSub.Unsubscribe()

	for {
		// Enable sending of the first buffered element if there is one.
		var (
			deliverReq   *stateReq
			deliverReqCh chan *stateReq
		)
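		// A nil deliverReqCh blocks forever in the select below, so the delivery
		// case stays disabled until at least one finished request is buffered.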
		if len(finished) > 0 {
			deliverReq = finished[0]
			deliverReqCh = s.deliver
		}

		select {
		// The stateSync lifecycle:
		case next := <-d.stateSyncStart:
			return next

		case <-s.done:
			return nil

		// Send the next finished request to the current sync:
		case deliverReqCh <- deliverReq:
			// Shift out the first request, but also set the emptied slot to nil for GC
			copy(finished, finished[1:])
			finished[len(finished)-1] = nil
			finished = finished[:len(finished)-1]

		// Handle incoming state packs:
		case pack := <-d.stateCh:
			// Discard any data not requested (or previously timed out)
			req := active[pack.PeerId()]
			if req == nil {
				log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
				continue
			}
			// Finalize the request and queue up for processing
			req.timer.Stop()
			req.response = pack.(*statePack).states
			finished = append(finished, req)
			delete(active, pack.PeerId())

		// Handle dropped peer connections:
		case p := <-peerDrop:
			// Skip if no request is currently pending
			req := active[p.id]
			if req == nil {
				continue
			}
			// Finalize the request and queue up for processing
			req.timer.Stop()
			req.dropped = true
			finished = append(finished, req)
			delete(active, p.id)

		// Handle timed-out requests:
		case req := <-timeout:
			// If the peer is already requesting something else, ignore the stale timeout.
			// This can happen when the timeout and the delivery happen simultaneously,
			// causing both pathways to trigger.
			if active[req.peer.id] != req {
				continue
			}
			// Move the timed out data back into the download queue
			finished = append(finished, req)
			delete(active, req.peer.id)

		// Track outgoing state requests:
		case req := <-d.trackStateReq:
			// If an active request already exists for this peer, we have a problem. In
			// theory the trie node schedule must never assign two requests to the same
			// peer. In practice however, a peer might receive a request, disconnect and
			// immediately reconnect before the previous one times out. In this case the
			// first request is never honored, yet we must not silently overwrite it, as
			// that causes valid requests to go missing and sync to get stuck.
			if old := active[req.peer.id]; old != nil {
				log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)

				// Make sure the previous one doesn't get silently lost
				old.timer.Stop()
				old.dropped = true
				finished = append(finished, old)
			}
			// Start a timer to notify the sync loop if the peer stalled.
			req.timer = time.AfterFunc(req.timeout, func() {
				select {
				case timeout <- req:
				case <-s.done:
					// Prevent leaking of timer goroutines in the unlikely case where a
					// timer is fired just before exiting runStateSync.
				}
			})
			active[req.peer.id] = req
		}
	}
}

// stateSync schedules requests for downloading a particular state trie defined
// by a given state root.
type stateSync struct {
	d      *Downloader                // Downloader instance to access and manage current peerset
	sched  *trie.Sync                 // State trie sync scheduler defining the tasks
	keccak hash.Hash                  // Keccak256 hasher to verify deliveries with
	tasks  map[common.Hash]*stateTask // Set of tasks currently queued for retrieval

	numUncommitted   int // Number of trie nodes processed but not yet committed
	bytesUncommitted int // Number of bytes processed but not yet committed

	deliver    chan *stateReq // Delivery channel multiplexing peer responses
	cancel     chan struct{}  // Channel to signal a termination request
	cancelOnce sync.Once      // Ensures cancel only ever gets called once
	done       chan struct{}  // Channel to signal termination completion
	err        error          // Any error hit during sync (set before completion)
}

// stateTask represents a single trie node download task, containing the set of
// peers that retrieval has already been attempted from, used to detect stalled
// syncs and abort.
type stateTask struct {
	attempts map[string]struct{}
}

// newStateSync creates a new state trie download scheduler. This method does
// not yet start the sync; the user needs to call run to initiate it.
func newStateSync(d *Downloader, root common.Hash) *stateSync {
	return &stateSync{
		d:       d,
		sched:   state.NewStateSync(root, d.stateDB, d.stateBloom),
		keccak:  sha3.NewLegacyKeccak256(),
		tasks:   make(map[common.Hash]*stateTask),
		deliver: make(chan *stateReq),
		cancel:  make(chan struct{}),
		done:    make(chan struct{}),
	}
}

// run starts the task assignment and response processing loop, blocking until
// it finishes, and finally notifying any goroutines waiting for the loop to
// finish.
func (s *stateSync) run() {
	s.err = s.loop()
	close(s.done)
}

// Wait blocks until the sync is done or canceled.
func (s *stateSync) Wait() error {
	<-s.done
	return s.err
}

// Cancel cancels the sync and waits until it has shut down.
func (s *stateSync) Cancel() error {
	s.cancelOnce.Do(func() { close(s.cancel) })
	return s.Wait()
}

// loop is the main event loop of a state trie sync. It is responsible for the
// assignment of new tasks to peers (including sending them the requests) as
// well as for the processing of inbound data. Note that the loop does not
// directly receive data from peers, rather those are buffered up in the
// downloader and pushed here asynchronously. The reason is to decouple
// processing from data receipt and timeouts.
func (s *stateSync) loop() (err error) {
	// Listen for new peer events to assign tasks to them
	newPeer := make(chan *peerConnection, 1024)
	peerSub := s.d.peers.SubscribeNewPeers(newPeer)
	defer peerSub.Unsubscribe()
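
	// Flush any batched writes left over when the loop exits, keeping the
	// loop's own error if one was already set.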
	defer func() {
		cerr := s.commit(true)
		if err == nil {
			err = cerr
		}
	}()

	// Keep assigning new tasks until the sync completes or aborts
	for s.sched.Pending() > 0 {
		if err = s.commit(false); err != nil {
			return err
		}
		s.assignTasks()

		// Tasks assigned, wait for something to happen
		select {
		case <-newPeer:
			// New peer arrived, try to assign it download tasks

		case <-s.cancel:
			return errCancelStateFetch

		case <-s.d.cancelCh:
			return errCanceled

		case req := <-s.deliver:
			// Response, disconnect or timeout triggered, drop the peer if stalling
			log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
			if len(req.items) <= 2 && !req.dropped && req.timedOut() {
				// 2 items are the minimum requested; if even that times out, we have
				// no use for this peer at the moment.
				log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
				if s.d.dropPeer == nil {
					// The dropPeer method is nil when `--copydb` is used for a local copy.
					// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
					req.peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", req.peer.id)
				} else {
					s.d.dropPeer(req.peer.id)

					// If this peer was the master peer, abort sync immediately
					s.d.cancelLock.RLock()
					master := req.peer.id == s.d.cancelPeer
					s.d.cancelLock.RUnlock()

					if master {
						s.d.cancel()
						return errTimeout
					}
				}
			}
			// Process all the received blobs and check for stale delivery
			delivered, err := s.process(req)
			if err != nil {
				log.Warn("Node data write error", "err", err)
				return err
			}
			req.peer.SetNodeDataIdle(delivered)
		}
	}
	return nil
}
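
// commit flushes the trie nodes downloaded so far into a database batch and
// writes it out. Unless forced, the flush is deferred until the uncommitted
// data crosses ethdb.IdealBatchSize, amortizing the write overhead.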
func (s *stateSync) commit(force bool) error {
	if !force && s.bytesUncommitted < ethdb.IdealBatchSize {
		return nil
	}
	start := time.Now()
	b := s.d.stateDB.NewBatch()
	if err := s.sched.Commit(b); err != nil {
		return err
	}
	if err := b.Write(); err != nil {
		return fmt.Errorf("DB write error: %v", err)
	}
	s.updateStats(s.numUncommitted, 0, 0, time.Since(start))
	s.numUncommitted = 0
	s.bytesUncommitted = 0
	return nil
}

// assignTasks attempts to assign new tasks to all idle peers, either from the
// batch currently being retried, or fetching new data from the trie sync itself.
func (s *stateSync) assignTasks() {
	// Iterate over all idle peers and try to assign them state fetches
	peers, _ := s.d.peers.NodeDataIdlePeers()
	for _, p := range peers {
		// Assign a batch of fetches proportional to the estimated latency/bandwidth
		cap := p.NodeDataCapacity(s.d.requestRTT())
		req := &stateReq{peer: p, timeout: s.d.requestTTL()}
		s.fillTasks(cap, req)

		// If the peer was assigned tasks to fetch, send the network request
		if len(req.items) > 0 {
			req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items))
			select {
			case s.d.trackStateReq <- req:
				req.peer.FetchNodeData(req.items)
			case <-s.cancel:
			case <-s.d.cancelCh:
			}
		}
	}
}

// fillTasks fills the given request object with a maximum of n state download
// tasks to send to the remote peer.
func (s *stateSync) fillTasks(n int, req *stateReq) {
	// Refill available tasks from the scheduler.
	if len(s.tasks) < n {
		new := s.sched.Missing(n - len(s.tasks))
		for _, hash := range new {
			s.tasks[hash] = &stateTask{make(map[string]struct{})}
		}
	}
	// Find tasks that haven't been tried with the request's peer.
	req.items = make([]common.Hash, 0, n)
	req.tasks = make(map[common.Hash]*stateTask, n)
	for hash, t := range s.tasks {
		// Stop when we've gathered enough requests
		if len(req.items) == n {
			break
		}
		// Skip any requests we've already tried from this peer
		if _, ok := t.attempts[req.peer.id]; ok {
			continue
		}
		// Assign the request to this peer
		t.attempts[req.peer.id] = struct{}{}
		req.items = append(req.items, hash)
		req.tasks[hash] = t
		delete(s.tasks, hash)
	}
}

// process iterates over a batch of delivered state data, injecting each item
// into a running state sync, re-queuing any items that were requested but not
// delivered. Returns the number of items that were successfully processed, and
// any error that occurred.
func (s *stateSync) process(req *stateReq) (int, error) {
	// Collect processing stats and update progress if valid data was received
	duplicate, unexpected, successful := 0, 0, 0

	defer func(start time.Time) {
		if duplicate > 0 || unexpected > 0 {
			s.updateStats(0, duplicate, unexpected, time.Since(start))
		}
	}(time.Now())

	// Iterate over all the delivered data and inject one-by-one into the trie
	for _, blob := range req.response {
		_, hash, err := s.processNodeData(blob)
		switch err {
		case nil:
			s.numUncommitted++
			s.bytesUncommitted += len(blob)
			successful++
		case trie.ErrNotRequested:
			unexpected++
		case trie.ErrAlreadyProcessed:
			duplicate++
		default:
			return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
		}
		delete(req.tasks, hash)
	}
	// Put unfulfilled tasks back into the retry queue
	npeers := s.d.peers.Len()
	for hash, task := range req.tasks {
		// If the peer did deliver something, missing items may be due to a protocol
		// limit or a previous timeout + delayed delivery. Both cases should permit
		// the peer to retry the missing items (to avoid single-peer stalls).
		if len(req.response) > 0 || req.timedOut() {
			delete(task.attempts, req.peer.id)
		}
		// If we've requested the node too many times already, it may be a malicious
		// sync where nobody has the right data. Abort.
		if len(task.attempts) >= npeers {
			return successful, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
		}
		// Missing item, place into the retry queue.
		s.tasks[hash] = task
	}
	return successful, nil
}

// processNodeData tries to inject a trie node data blob delivered from a remote
// peer into the state trie, returning whether anything useful was written, the
// blob's hash, and any error that occurred.
func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) {
	res := trie.SyncResult{Data: blob}
	s.keccak.Reset()
	s.keccak.Write(blob)
	s.keccak.Sum(res.Hash[:0])
	committed, _, err := s.sched.Process([]trie.SyncResult{res})
	return committed, res.Hash, err
}
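
// Note: the hash computed in processNodeData is the node's identity in the
// trie, i.e. the plain Keccak256 of the raw blob. An equivalent standalone
// sketch (illustrative only):
//
//	h := sha3.NewLegacyKeccak256()
//	h.Write(blob)
//	var nodeHash common.Hash
//	h.Sum(nodeHash[:0])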

// updateStats bumps the various state sync progress counters and displays a log
// message for the user to see.
func (s *stateSync) updateStats(written, duplicate, unexpected int, duration time.Duration) {
	s.d.syncStatsLock.Lock()
	defer s.d.syncStatsLock.Unlock()

	s.d.syncStatsState.pending = uint64(s.sched.Pending())
	s.d.syncStatsState.processed += uint64(written)
	s.d.syncStatsState.duplicate += uint64(duplicate)
	s.d.syncStatsState.unexpected += uint64(unexpected)

	if written > 0 || duplicate > 0 || unexpected > 0 {
		log.Info("Imported new state entries", "count", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "retry", len(s.tasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected)
	}
	if written > 0 {
		rawdb.WriteFastTrieProgress(s.d.stateDB, s.d.syncStatsState.processed)
	}
}