sync.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package eth
  17. import (
  18. "errors"
  19. "math/big"
  20. "sync/atomic"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/core/rawdb"
  24. "github.com/ethereum/go-ethereum/core/types"
  25. "github.com/ethereum/go-ethereum/eth/downloader"
  26. "github.com/ethereum/go-ethereum/eth/protocols/eth"
  27. "github.com/ethereum/go-ethereum/log"
  28. )
const (
	// forceSyncCycle is the time interval to force syncs, even if few peers are available.
	forceSyncCycle = 10 * time.Second

	// defaultMinSyncPeers is the amount of peers desired to start syncing.
	defaultMinSyncPeers = 5
)
  33. // syncTransactions starts sending all currently pending transactions to the given peer.
  34. func (h *handler) syncTransactions(p *eth.Peer) {
  35. // Assemble the set of transaction to broadcast or announce to the remote
  36. // peer. Fun fact, this is quite an expensive operation as it needs to sort
  37. // the transactions if the sorting is not cached yet. However, with a random
  38. // order, insertions could overflow the non-executable queues and get dropped.
  39. //
  40. // TODO(karalabe): Figure out if we could get away with random order somehow
  41. var txs types.Transactions
  42. pending := h.txpool.Pending(false)
  43. for _, batch := range pending {
  44. txs = append(txs, batch...)
  45. }
  46. if len(txs) == 0 {
  47. return
  48. }
  49. // The eth/65 protocol introduces proper transaction announcements, so instead
  50. // of dripping transactions across multiple peers, just send the entire list as
  51. // an announcement and let the remote side decide what they need (likely nothing).
  52. hashes := make([]common.Hash, len(txs))
  53. for i, tx := range txs {
  54. hashes[i] = tx.Hash()
  55. }
  56. p.AsyncSendPooledTransactionHashes(hashes)
  57. }
// chainSyncer coordinates blockchain sync components.
type chainSyncer struct {
	handler *handler    // back-reference to the protocol handler owning this syncer
	force   *time.Timer // timer used to force a sync even below the peer threshold
	forced  bool        // true when force timer fired
	warned  time.Time   // timestamp of the last post-merge warning, used to rate-limit logging

	peerEventCh chan struct{} // signalled on peer-set changes and new head announcements
	doneCh      chan error    // non-nil when sync is running; receives the sync result
}
// chainSyncOp is a scheduled sync operation.
type chainSyncOp struct {
	mode downloader.SyncMode // sync mode to run (full or snap)
	peer *eth.Peer           // peer to sync against
	td   *big.Int            // total difficulty advertised by the peer
	head common.Hash         // head block hash advertised by the peer
}
  74. // newChainSyncer creates a chainSyncer.
  75. func newChainSyncer(handler *handler) *chainSyncer {
  76. return &chainSyncer{
  77. handler: handler,
  78. peerEventCh: make(chan struct{}),
  79. }
  80. }
// handlePeerEvent notifies the syncer about a change in the peer set.
// This is called for new peers and every time a peer announces a new
// chain head. It returns false when the syncer is shutting down.
//
// NOTE(review): the peer argument is currently unused; all events are
// coalesced into a bare signal on peerEventCh.
func (cs *chainSyncer) handlePeerEvent(peer *eth.Peer) bool {
	select {
	case cs.peerEventCh <- struct{}{}:
		return true
	case <-cs.handler.quitSync:
		// Handler is quitting; don't block the caller on a dead loop.
		return false
	}
}
// loop runs in its own goroutine and launches the sync when necessary.
func (cs *chainSyncer) loop() {
	defer cs.handler.wg.Done()

	// Start the block/tx fetchers for the lifetime of the loop; stop them (and
	// the downloader) in reverse order when the loop exits.
	cs.handler.blockFetcher.Start()
	cs.handler.txFetcher.Start()
	defer cs.handler.blockFetcher.Stop()
	defer cs.handler.txFetcher.Stop()
	defer cs.handler.downloader.Terminate()

	// The force timer lowers the peer count threshold down to one when it fires.
	// This ensures we'll always start sync even if there aren't enough peers.
	cs.force = time.NewTimer(forceSyncCycle)
	defer cs.force.Stop()

	for {
		// Re-evaluate on every wakeup whether a sync should be started.
		if op := cs.nextSyncOp(); op != nil {
			cs.startSync(op)
		}
		select {
		case <-cs.peerEventCh:
			// Peer information changed, recheck.
		case err := <-cs.doneCh:
			// A sync cycle finished (successfully or not); rearm the force
			// timer and clear the lowered-threshold flag.
			cs.doneCh = nil
			cs.force.Reset(forceSyncCycle)
			cs.forced = false

			// If we've reached the merge transition but no beacon client is available, or
			// it has not yet switched us over, keep warning the user that their infra is
			// potentially flaky.
			if errors.Is(err, downloader.ErrMergeTransition) && time.Since(cs.warned) > 10*time.Second {
				log.Warn("Local chain is post-merge, waiting for beacon client sync switch-over...")
				cs.warned = time.Now()
			}
		case <-cs.force.C:
			cs.forced = true

		case <-cs.handler.quitSync:
			// Disable all insertion on the blockchain. This needs to happen before
			// terminating the downloader because the downloader waits for blockchain
			// inserts, and these can take a long time to finish.
			cs.handler.chain.StopInsert()
			cs.handler.downloader.Terminate()
			// Wait for any in-flight sync goroutine to drain before returning.
			if cs.doneCh != nil {
				<-cs.doneCh
			}
			return
		}
	}
}
  137. // nextSyncOp determines whether sync is required at this time.
  138. func (cs *chainSyncer) nextSyncOp() *chainSyncOp {
  139. if cs.doneCh != nil {
  140. return nil // Sync already running
  141. }
  142. // If a beacon client once took over control, disable the entire legacy sync
  143. // path from here on end. Note, there is a slight "race" between reaching TTD
  144. // and the beacon client taking over. The downloader will enforce that nothing
  145. // above the first TTD will be delivered to the chain for import.
  146. //
  147. // An alternative would be to check the local chain for exceeding the TTD and
  148. // avoid triggering a sync in that case, but that could also miss sibling or
  149. // other family TTD block being accepted.
  150. if cs.handler.chain.Config().TerminalTotalDifficultyPassed || cs.handler.merger.TDDReached() {
  151. return nil
  152. }
  153. // Ensure we're at minimum peer count.
  154. minPeers := defaultMinSyncPeers
  155. if cs.forced {
  156. minPeers = 1
  157. } else if minPeers > cs.handler.maxPeers {
  158. minPeers = cs.handler.maxPeers
  159. }
  160. if cs.handler.peers.len() < minPeers {
  161. return nil
  162. }
  163. // We have enough peers, pick the one with the highest TD, but avoid going
  164. // over the terminal total difficulty. Above that we expect the consensus
  165. // clients to direct the chain head to sync to.
  166. peer := cs.handler.peers.peerWithHighestTD()
  167. if peer == nil {
  168. return nil
  169. }
  170. mode, ourTD := cs.modeAndLocalHead()
  171. op := peerToSyncOp(mode, peer)
  172. if op.td.Cmp(ourTD) <= 0 {
  173. // We seem to be in sync according to the legacy rules. In the merge
  174. // world, it can also mean we're stuck on the merge block, waiting for
  175. // a beacon client. In the latter case, notify the user.
  176. if ttd := cs.handler.chain.Config().TerminalTotalDifficulty; ttd != nil && ourTD.Cmp(ttd) >= 0 && time.Since(cs.warned) > 10*time.Second {
  177. log.Warn("Local chain is post-merge, waiting for beacon client sync switch-over...")
  178. cs.warned = time.Now()
  179. }
  180. return nil // We're in sync
  181. }
  182. return op
  183. }
  184. func peerToSyncOp(mode downloader.SyncMode, p *eth.Peer) *chainSyncOp {
  185. peerHead, peerTD := p.Head()
  186. return &chainSyncOp{mode: mode, peer: p, td: peerTD, head: peerHead}
  187. }
  188. func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {
  189. // If we're in snap sync mode, return that directly
  190. if atomic.LoadUint32(&cs.handler.snapSync) == 1 {
  191. block := cs.handler.chain.CurrentFastBlock()
  192. td := cs.handler.chain.GetTd(block.Hash(), block.NumberU64())
  193. return downloader.SnapSync, td
  194. }
  195. // We are probably in full sync, but we might have rewound to before the
  196. // snap sync pivot, check if we should reenable
  197. if pivot := rawdb.ReadLastPivotNumber(cs.handler.database); pivot != nil {
  198. if head := cs.handler.chain.CurrentBlock(); head.NumberU64() < *pivot {
  199. block := cs.handler.chain.CurrentFastBlock()
  200. td := cs.handler.chain.GetTd(block.Hash(), block.NumberU64())
  201. return downloader.SnapSync, td
  202. }
  203. }
  204. // Nope, we're really full syncing
  205. head := cs.handler.chain.CurrentBlock()
  206. td := cs.handler.chain.GetTd(head.Hash(), head.NumberU64())
  207. return downloader.FullSync, td
  208. }
  209. // startSync launches doSync in a new goroutine.
  210. func (cs *chainSyncer) startSync(op *chainSyncOp) {
  211. cs.doneCh = make(chan error, 1)
  212. go func() { cs.doneCh <- cs.handler.doSync(op) }()
  213. }
// doSync synchronizes the local blockchain with a remote peer.
func (h *handler) doSync(op *chainSyncOp) error {
	if op.mode == downloader.SnapSync {
		// Before launch the snap sync, we have to ensure user uses the same
		// txlookup limit.
		// The main concern here is: during the snap sync Geth won't index the
		// block(generate tx indices) before the HEAD-limit. But if user changes
		// the limit in the next snap sync(e.g. user kill Geth manually and
		// restart) then it will be hard for Geth to figure out the oldest block
		// has been indexed. So here for the user-experience wise, it's non-optimal
		// that user can't change limit during the snap sync. If changed, Geth
		// will just blindly use the original one.
		limit := h.chain.TxLookupLimit()
		if stored := rawdb.ReadFastTxLookupLimit(h.database); stored == nil {
			// First snap sync: persist the configured limit for future restarts.
			rawdb.WriteFastTxLookupLimit(h.database, limit)
		} else if *stored != limit {
			// A different limit was persisted by an earlier snap sync; keep it.
			h.chain.SetTxLookupLimit(*stored)
			log.Warn("Update txLookup limit", "provided", limit, "updated", *stored)
		}
	}
	// Run the sync cycle, and disable snap sync if we're past the pivot block
	ttd := h.chain.Config().TerminalTotalDifficulty
	if h.chain.Config().EthPoWForkSupport {
		// NOTE(review): passing a nil TTD presumably disables merge-transition
		// enforcement in the downloader for the EthereumPoW fork, allowing
		// legacy sync to continue past the merge block — confirm against the
		// fork's chain-config semantics.
		ttd = nil
	}
	err := h.downloader.LegacySync(op.peer.ID(), op.head, op.td, ttd, op.mode)
	if err != nil {
		return err
	}
	// If snap sync was running and the cycle completed, it is done for good.
	if atomic.LoadUint32(&h.snapSync) == 1 {
		log.Info("Snap sync complete, auto disabling")
		atomic.StoreUint32(&h.snapSync, 0)
	}
	// If we've successfully finished a sync cycle and passed any required checkpoint,
	// enable accepting transactions from the network.
	head := h.chain.CurrentBlock()
	if head.NumberU64() >= h.checkpointNumber {
		// Checkpoint passed, sanity check the timestamp to have a fallback mechanism
		// for non-checkpointed (number = 0) private networks.
		if head.Time() >= uint64(time.Now().AddDate(0, -1, 0).Unix()) {
			atomic.StoreUint32(&h.acceptTxs, 1)
		}
	}
	if head.NumberU64() > 0 {
		// We've completed a sync cycle, notify all peers of new state. This path is
		// essential in star-topology networks where a gateway node needs to notify
		// all its out-of-date peers of the availability of a new block. This failure
		// scenario will most often crop up in private and hackathon networks with
		// degenerate connectivity, but it should be healthy for the mainnet too to
		// more reliably update peers or the local TD state.
		h.BroadcastBlock(head, false)
	}
	return nil
}