handler.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package eth
  17. import (
  18. "errors"
  19. "math"
  20. "math/big"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/consensus"
  26. "github.com/ethereum/go-ethereum/consensus/beacon"
  27. "github.com/ethereum/go-ethereum/core"
  28. "github.com/ethereum/go-ethereum/core/forkid"
  29. "github.com/ethereum/go-ethereum/core/types"
  30. "github.com/ethereum/go-ethereum/eth/downloader"
  31. "github.com/ethereum/go-ethereum/eth/fetcher"
  32. "github.com/ethereum/go-ethereum/eth/protocols/eth"
  33. "github.com/ethereum/go-ethereum/eth/protocols/snap"
  34. "github.com/ethereum/go-ethereum/ethdb"
  35. "github.com/ethereum/go-ethereum/event"
  36. "github.com/ethereum/go-ethereum/log"
  37. "github.com/ethereum/go-ethereum/p2p"
  38. "github.com/ethereum/go-ethereum/params"
  39. )
  40. const (
  41. // txChanSize is the size of channel listening to NewTxsEvent.
  42. // The number is referenced from the size of tx pool.
  43. txChanSize = 4096
  44. )
  45. var (
  46. syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge
  47. )
// txPool defines the methods needed from a transaction pool implementation to
// support all the operations needed by the Ethereum chain protocols.
//
// Within this file, Has and AddRemotes are handed to the transaction fetcher
// as callbacks and SubscribeNewTxsEvent feeds the transaction broadcast loop.
type txPool interface {
	// Has returns an indicator whether txpool has a transaction
	// cached with the given hash.
	Has(hash common.Hash) bool

	// Get retrieves the transaction from local txpool with given
	// tx hash.
	Get(hash common.Hash) *types.Transaction

	// AddRemotes should add the given transactions to the pool. The returned
	// slice carries the per-transaction errors.
	AddRemotes([]*types.Transaction) []error

	// Pending should return pending transactions, grouped by sender address.
	// The slice should be modifiable by the caller.
	// NOTE(review): the exact meaning of enforceTips is defined by the pool
	// implementation and is not visible from this file.
	Pending(enforceTips bool) map[common.Address]types.Transactions

	// SubscribeNewTxsEvent should return an event subscription of
	// NewTxsEvent and send events to the given channel.
	SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
}
// handlerConfig is the collection of initialization parameters to create a full
// node network handler. It is consumed by newHandler.
type handlerConfig struct {
	Database       ethdb.Database            // Database for direct sync insertions
	Chain          *core.BlockChain          // Blockchain to serve data from
	TxPool         txPool                    // Transaction pool to propagate from
	Merger         *consensus.Merger         // The manager for eth1/2 transition
	Network        uint64                    // Network identifier to advertise
	Sync           downloader.SyncMode       // Whether to snap or full sync
	BloomCache     uint64                    // Megabytes to alloc for snap sync bloom
	EventMux       *event.TypeMux            // Legacy event mux, deprecate for `feed`; newHandler creates one if nil
	Checkpoint     *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges (optional, may be nil)
	RequiredBlocks map[uint64]common.Hash    // Hard coded map of required block hashes for sync challenges
}
// handler is the network-facing manager of the Ethereum chain protocols. It
// owns the peer set, the downloader and the block/transaction fetchers, and
// runs the broadcast loops that are started in Start and torn down in Stop.
type handler struct {
	networkID  uint64        // Network identifier sent during the eth handshake
	forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node

	// The two flags below are read and written through sync/atomic only.
	snapSync  uint32 // Flag whether snap sync is enabled (gets disabled if we already have blocks)
	acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)

	checkpointNumber uint64      // Block number for the sync progress validator to cross reference
	checkpointHash   common.Hash // Block hash for the sync progress validator to cross reference

	database ethdb.Database
	txpool   txPool
	chain    *core.BlockChain
	maxPeers int // Peer limit, assigned in Start; ignored for trusted peers

	downloader   *downloader.Downloader
	blockFetcher *fetcher.BlockFetcher
	txFetcher    *fetcher.TxFetcher
	peers        *peerSet
	merger       *consensus.Merger

	eventMux      *event.TypeMux
	txsCh         chan core.NewTxsEvent       // Receives pool events; created in Start
	txsSub        event.Subscription          // Subscription feeding txsCh; unsubscribed in Stop
	minedBlockSub *event.TypeMuxSubscription  // Subscription for locally mined blocks; unsubscribed in Stop

	requiredBlocks map[uint64]common.Hash // Number -> hash pairs every peer is challenged with

	// channels for fetcher, syncer, txsyncLoop
	quitSync chan struct{} // Closed in Stop

	chainSync *chainSyncer
	wg        sync.WaitGroup // Tracks the background loops launched in Start
	peerWG    sync.WaitGroup // Tracks running eth/snap peer handlers
}
  107. // newHandler returns a handler for all Ethereum chain management protocol.
  108. func newHandler(config *handlerConfig) (*handler, error) {
  109. // Create the protocol manager with the base fields
  110. if config.EventMux == nil {
  111. config.EventMux = new(event.TypeMux) // Nicety initialization for tests
  112. }
  113. h := &handler{
  114. networkID: config.Network,
  115. forkFilter: forkid.NewFilter(config.Chain),
  116. eventMux: config.EventMux,
  117. database: config.Database,
  118. txpool: config.TxPool,
  119. chain: config.Chain,
  120. peers: newPeerSet(),
  121. merger: config.Merger,
  122. requiredBlocks: config.RequiredBlocks,
  123. quitSync: make(chan struct{}),
  124. }
  125. if config.Sync == downloader.FullSync {
  126. // The database seems empty as the current block is the genesis. Yet the snap
  127. // block is ahead, so snap sync was enabled for this node at a certain point.
  128. // The scenarios where this can happen is
  129. // * if the user manually (or via a bad block) rolled back a snap sync node
  130. // below the sync point.
  131. // * the last snap sync is not finished while user specifies a full sync this
  132. // time. But we don't have any recent state for full sync.
  133. // In these cases however it's safe to reenable snap sync.
  134. fullBlock, fastBlock := h.chain.CurrentBlock(), h.chain.CurrentFastBlock()
  135. if fullBlock.NumberU64() == 0 && fastBlock.NumberU64() > 0 {
  136. h.snapSync = uint32(1)
  137. log.Warn("Switch sync mode from full sync to snap sync")
  138. }
  139. } else {
  140. if h.chain.CurrentBlock().NumberU64() > 0 {
  141. // Print warning log if database is not empty to run snap sync.
  142. log.Warn("Switch sync mode from snap sync to full sync")
  143. } else {
  144. // If snap sync was requested and our database is empty, grant it
  145. h.snapSync = uint32(1)
  146. }
  147. }
  148. // If we have trusted checkpoints, enforce them on the chain
  149. if config.Checkpoint != nil {
  150. h.checkpointNumber = (config.Checkpoint.SectionIndex+1)*params.CHTFrequency - 1
  151. h.checkpointHash = config.Checkpoint.SectionHead
  152. }
  153. // If sync succeeds, pass a callback to potentially disable snap sync mode
  154. // and enable transaction propagation.
  155. success := func() {
  156. // If we were running snap sync and it finished, disable doing another
  157. // round on next sync cycle
  158. if atomic.LoadUint32(&h.snapSync) == 1 {
  159. log.Info("Snap sync complete, auto disabling")
  160. atomic.StoreUint32(&h.snapSync, 0)
  161. }
  162. // If we've successfully finished a sync cycle and passed any required
  163. // checkpoint, enable accepting transactions from the network
  164. head := h.chain.CurrentBlock()
  165. if head.NumberU64() >= h.checkpointNumber {
  166. // Checkpoint passed, sanity check the timestamp to have a fallback mechanism
  167. // for non-checkpointed (number = 0) private networks.
  168. if head.Time() >= uint64(time.Now().AddDate(0, -1, 0).Unix()) {
  169. atomic.StoreUint32(&h.acceptTxs, 1)
  170. }
  171. }
  172. }
  173. // Construct the downloader (long sync)
  174. h.downloader = downloader.New(h.checkpointNumber, config.Database, h.eventMux, h.chain, nil, h.removePeer, success)
  175. if ttd := h.chain.Config().TerminalTotalDifficulty; ttd != nil {
  176. if h.chain.Config().TerminalTotalDifficultyPassed {
  177. log.Info("Chain post-merge, sync via beacon client")
  178. } else {
  179. head := h.chain.CurrentBlock()
  180. if td := h.chain.GetTd(head.Hash(), head.NumberU64()); td.Cmp(ttd) >= 0 {
  181. log.Info("Chain post-TTD, sync via beacon client")
  182. } else {
  183. log.Warn("Chain pre-merge, sync via PoW (ensure beacon client is ready)")
  184. }
  185. }
  186. } else if h.chain.Config().TerminalTotalDifficultyPassed {
  187. log.Error("Chain configured post-merge, but without TTD. Are you debugging sync?")
  188. }
  189. // Construct the fetcher (short sync)
  190. validator := func(header *types.Header) error {
  191. // All the block fetcher activities should be disabled
  192. // after the transition. Print the warning log.
  193. if h.merger.PoSFinalized() {
  194. log.Warn("Unexpected validation activity", "hash", header.Hash(), "number", header.Number)
  195. return errors.New("unexpected behavior after transition")
  196. }
  197. // Reject all the PoS style headers in the first place. No matter
  198. // the chain has finished the transition or not, the PoS headers
  199. // should only come from the trusted consensus layer instead of
  200. // p2p network.
  201. if beacon, ok := h.chain.Engine().(*beacon.Beacon); ok {
  202. if beacon.IsPoSHeader(header) {
  203. return errors.New("unexpected post-merge header")
  204. }
  205. }
  206. return h.chain.Engine().VerifyHeader(h.chain, header, true)
  207. }
  208. heighter := func() uint64 {
  209. return h.chain.CurrentBlock().NumberU64()
  210. }
  211. inserter := func(blocks types.Blocks) (int, error) {
  212. // All the block fetcher activities should be disabled
  213. // after the transition. Print the warning log.
  214. if h.merger.PoSFinalized() {
  215. var ctx []interface{}
  216. ctx = append(ctx, "blocks", len(blocks))
  217. if len(blocks) > 0 {
  218. ctx = append(ctx, "firsthash", blocks[0].Hash())
  219. ctx = append(ctx, "firstnumber", blocks[0].Number())
  220. ctx = append(ctx, "lasthash", blocks[len(blocks)-1].Hash())
  221. ctx = append(ctx, "lastnumber", blocks[len(blocks)-1].Number())
  222. }
  223. log.Warn("Unexpected insertion activity", ctx...)
  224. return 0, errors.New("unexpected behavior after transition")
  225. }
  226. // If sync hasn't reached the checkpoint yet, deny importing weird blocks.
  227. //
  228. // Ideally we would also compare the head block's timestamp and similarly reject
  229. // the propagated block if the head is too old. Unfortunately there is a corner
  230. // case when starting new networks, where the genesis might be ancient (0 unix)
  231. // which would prevent full nodes from accepting it.
  232. if h.chain.CurrentBlock().NumberU64() < h.checkpointNumber {
  233. log.Warn("Unsynced yet, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
  234. return 0, nil
  235. }
  236. // If snap sync is running, deny importing weird blocks. This is a problematic
  237. // clause when starting up a new network, because snap-syncing miners might not
  238. // accept each others' blocks until a restart. Unfortunately we haven't figured
  239. // out a way yet where nodes can decide unilaterally whether the network is new
  240. // or not. This should be fixed if we figure out a solution.
  241. if atomic.LoadUint32(&h.snapSync) == 1 {
  242. log.Warn("Snap syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
  243. return 0, nil
  244. }
  245. if h.merger.TDDReached() {
  246. // The blocks from the p2p network is regarded as untrusted
  247. // after the transition. In theory block gossip should be disabled
  248. // entirely whenever the transition is started. But in order to
  249. // handle the transition boundary reorg in the consensus-layer,
  250. // the legacy blocks are still accepted, but only for the terminal
  251. // pow blocks. Spec: https://github.com/ethereum/EIPs/blob/master/EIPS/eip-3675.md#halt-the-importing-of-pow-blocks
  252. for i, block := range blocks {
  253. ptd := h.chain.GetTd(block.ParentHash(), block.NumberU64()-1)
  254. if ptd == nil {
  255. return 0, nil
  256. }
  257. td := new(big.Int).Add(ptd, block.Difficulty())
  258. if !h.chain.Config().IsTerminalPoWBlock(ptd, td) {
  259. log.Info("Filtered out non-termimal pow block", "number", block.NumberU64(), "hash", block.Hash())
  260. return 0, nil
  261. }
  262. if err := h.chain.InsertBlockWithoutSetHead(block); err != nil {
  263. return i, err
  264. }
  265. }
  266. return 0, nil
  267. }
  268. n, err := h.chain.InsertChain(blocks)
  269. if err == nil {
  270. atomic.StoreUint32(&h.acceptTxs, 1) // Mark initial sync done on any fetcher import
  271. }
  272. return n, err
  273. }
  274. h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer)
  275. fetchTx := func(peer string, hashes []common.Hash) error {
  276. p := h.peers.peer(peer)
  277. if p == nil {
  278. return errors.New("unknown peer")
  279. }
  280. return p.RequestTxs(hashes)
  281. }
  282. h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, h.txpool.AddRemotes, fetchTx)
  283. h.chainSync = newChainSyncer(h)
  284. return h, nil
  285. }
  286. // runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to
  287. // various subsystems and starts handling messages.
  288. func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
  289. // If the peer has a `snap` extension, wait for it to connect so we can have
  290. // a uniform initialization/teardown mechanism
  291. snap, err := h.peers.waitSnapExtension(peer)
  292. if err != nil {
  293. peer.Log().Error("Snapshot extension barrier failed", "err", err)
  294. return err
  295. }
  296. // TODO(karalabe): Not sure why this is needed
  297. if !h.chainSync.handlePeerEvent(peer) {
  298. return p2p.DiscQuitting
  299. }
  300. h.peerWG.Add(1)
  301. defer h.peerWG.Done()
  302. // Execute the Ethereum handshake
  303. var (
  304. genesis = h.chain.Genesis()
  305. head = h.chain.CurrentHeader()
  306. hash = head.Hash()
  307. number = head.Number.Uint64()
  308. td = h.chain.GetTd(hash, number)
  309. )
  310. forkID := forkid.NewID(h.chain.Config(), h.chain.Genesis().Hash(), h.chain.CurrentHeader().Number.Uint64())
  311. if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter); err != nil {
  312. peer.Log().Debug("Ethereum handshake failed", "err", err)
  313. return err
  314. }
  315. reject := false // reserved peer slots
  316. if atomic.LoadUint32(&h.snapSync) == 1 {
  317. if snap == nil {
  318. // If we are running snap-sync, we want to reserve roughly half the peer
  319. // slots for peers supporting the snap protocol.
  320. // The logic here is; we only allow up to 5 more non-snap peers than snap-peers.
  321. if all, snp := h.peers.len(), h.peers.snapLen(); all-snp > snp+5 {
  322. reject = true
  323. }
  324. }
  325. }
  326. // Ignore maxPeers if this is a trusted peer
  327. if !peer.Peer.Info().Network.Trusted {
  328. if reject || h.peers.len() >= h.maxPeers {
  329. return p2p.DiscTooManyPeers
  330. }
  331. }
  332. peer.Log().Debug("Ethereum peer connected", "name", peer.Name())
  333. // Register the peer locally
  334. if err := h.peers.registerPeer(peer, snap); err != nil {
  335. peer.Log().Error("Ethereum peer registration failed", "err", err)
  336. return err
  337. }
  338. defer h.unregisterPeer(peer.ID())
  339. p := h.peers.peer(peer.ID())
  340. if p == nil {
  341. return errors.New("peer dropped during handling")
  342. }
  343. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  344. if err := h.downloader.RegisterPeer(peer.ID(), peer.Version(), peer); err != nil {
  345. peer.Log().Error("Failed to register peer in eth syncer", "err", err)
  346. return err
  347. }
  348. if snap != nil {
  349. if err := h.downloader.SnapSyncer.Register(snap); err != nil {
  350. peer.Log().Error("Failed to register peer in snap syncer", "err", err)
  351. return err
  352. }
  353. }
  354. h.chainSync.handlePeerEvent(peer)
  355. // Propagate existing transactions. new transactions appearing
  356. // after this will be sent via broadcasts.
  357. h.syncTransactions(peer)
  358. // Create a notification channel for pending requests if the peer goes down
  359. dead := make(chan struct{})
  360. defer close(dead)
  361. // If we have a trusted CHT, reject all peers below that (avoid fast sync eclipse)
  362. if h.checkpointHash != (common.Hash{}) {
  363. // Request the peer's checkpoint header for chain height/weight validation
  364. resCh := make(chan *eth.Response)
  365. if _, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh); err != nil {
  366. return err
  367. }
  368. // Start a timer to disconnect if the peer doesn't reply in time
  369. go func() {
  370. timeout := time.NewTimer(syncChallengeTimeout)
  371. defer timeout.Stop()
  372. select {
  373. case res := <-resCh:
  374. headers := ([]*types.Header)(*res.Res.(*eth.BlockHeadersPacket))
  375. if len(headers) == 0 {
  376. // If we're doing a snap sync, we must enforce the checkpoint
  377. // block to avoid eclipse attacks. Unsynced nodes are welcome
  378. // to connect after we're done joining the network.
  379. if atomic.LoadUint32(&h.snapSync) == 1 {
  380. peer.Log().Warn("Dropping unsynced node during sync", "addr", peer.RemoteAddr(), "type", peer.Name())
  381. res.Done <- errors.New("unsynced node cannot serve sync")
  382. return
  383. }
  384. res.Done <- nil
  385. return
  386. }
  387. // Validate the header and either drop the peer or continue
  388. if len(headers) > 1 {
  389. res.Done <- errors.New("too many headers in checkpoint response")
  390. return
  391. }
  392. if headers[0].Hash() != h.checkpointHash {
  393. res.Done <- errors.New("checkpoint hash mismatch")
  394. return
  395. }
  396. res.Done <- nil
  397. case <-timeout.C:
  398. peer.Log().Warn("Checkpoint challenge timed out, dropping", "addr", peer.RemoteAddr(), "type", peer.Name())
  399. h.removePeer(peer.ID())
  400. case <-dead:
  401. // Peer handler terminated, abort all goroutines
  402. }
  403. }()
  404. }
  405. // If we have any explicit peer required block hashes, request them
  406. for number, hash := range h.requiredBlocks {
  407. resCh := make(chan *eth.Response)
  408. if _, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh); err != nil {
  409. return err
  410. }
  411. go func(number uint64, hash common.Hash) {
  412. timeout := time.NewTimer(syncChallengeTimeout)
  413. defer timeout.Stop()
  414. select {
  415. case res := <-resCh:
  416. headers := ([]*types.Header)(*res.Res.(*eth.BlockHeadersPacket))
  417. if len(headers) == 0 {
  418. // Required blocks are allowed to be missing if the remote
  419. // node is not yet synced
  420. res.Done <- nil
  421. return
  422. }
  423. // Validate the header and either drop the peer or continue
  424. if len(headers) > 1 {
  425. res.Done <- errors.New("too many headers in required block response")
  426. return
  427. }
  428. if headers[0].Number.Uint64() != number || headers[0].Hash() != hash {
  429. peer.Log().Info("Required block mismatch, dropping peer", "number", number, "hash", headers[0].Hash(), "want", hash)
  430. res.Done <- errors.New("required block mismatch")
  431. return
  432. }
  433. peer.Log().Debug("Peer required block verified", "number", number, "hash", hash)
  434. res.Done <- nil
  435. case <-timeout.C:
  436. peer.Log().Warn("Required block challenge timed out, dropping", "addr", peer.RemoteAddr(), "type", peer.Name())
  437. h.removePeer(peer.ID())
  438. }
  439. }(number, hash)
  440. }
  441. // Handle incoming messages until the connection is torn down
  442. return handler(peer)
  443. }
  444. // runSnapExtension registers a `snap` peer into the joint eth/snap peerset and
  445. // starts handling inbound messages. As `snap` is only a satellite protocol to
  446. // `eth`, all subsystem registrations and lifecycle management will be done by
  447. // the main `eth` handler to prevent strange races.
  448. func (h *handler) runSnapExtension(peer *snap.Peer, handler snap.Handler) error {
  449. h.peerWG.Add(1)
  450. defer h.peerWG.Done()
  451. if err := h.peers.registerSnapExtension(peer); err != nil {
  452. peer.Log().Warn("Snapshot extension registration failed", "err", err)
  453. return err
  454. }
  455. return handler(peer)
  456. }
  457. // removePeer requests disconnection of a peer.
  458. func (h *handler) removePeer(id string) {
  459. peer := h.peers.peer(id)
  460. if peer != nil {
  461. peer.Peer.Disconnect(p2p.DiscUselessPeer)
  462. }
  463. }
  464. // unregisterPeer removes a peer from the downloader, fetchers and main peer set.
  465. func (h *handler) unregisterPeer(id string) {
  466. // Create a custom logger to avoid printing the entire id
  467. var logger log.Logger
  468. if len(id) < 16 {
  469. // Tests use short IDs, don't choke on them
  470. logger = log.New("peer", id)
  471. } else {
  472. logger = log.New("peer", id[:8])
  473. }
  474. // Abort if the peer does not exist
  475. peer := h.peers.peer(id)
  476. if peer == nil {
  477. logger.Error("Ethereum peer removal failed", "err", errPeerNotRegistered)
  478. return
  479. }
  480. // Remove the `eth` peer if it exists
  481. logger.Debug("Removing Ethereum peer", "snap", peer.snapExt != nil)
  482. // Remove the `snap` extension if it exists
  483. if peer.snapExt != nil {
  484. h.downloader.SnapSyncer.Unregister(id)
  485. }
  486. h.downloader.UnregisterPeer(id)
  487. h.txFetcher.Drop(id)
  488. if err := h.peers.unregisterPeer(id); err != nil {
  489. logger.Error("Ethereum peer removal failed", "err", err)
  490. }
  491. }
  492. func (h *handler) Start(maxPeers int) {
  493. h.maxPeers = maxPeers
  494. // broadcast transactions
  495. h.wg.Add(1)
  496. h.txsCh = make(chan core.NewTxsEvent, txChanSize)
  497. h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh)
  498. go h.txBroadcastLoop()
  499. // broadcast mined blocks
  500. h.wg.Add(1)
  501. h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{})
  502. go h.minedBroadcastLoop()
  503. // start sync handlers
  504. h.wg.Add(1)
  505. go h.chainSync.loop()
  506. }
// Stop shuts the handler down: it cancels the broadcast subscriptions, closes
// quitSync, waits for the background loops tracked by h.wg, then closes the
// peer set and waits for all peer handlers tracked by h.peerWG to return.
// The order of the steps matters and must be preserved.
func (h *handler) Stop() {
	h.txsSub.Unsubscribe()        // quits txBroadcastLoop
	h.minedBlockSub.Unsubscribe() // quits minedBroadcastLoop

	// Signal the sync goroutine(s) listening on quitSync to exit.
	// After this is done, no new peers will be accepted.
	// NOTE(review): the consumers of quitSync live outside this file — confirm
	// which loops select on it.
	close(h.quitSync)
	h.wg.Wait()

	// Disconnect existing sessions.
	// This also closes the gate for any new registrations on the peer set.
	// sessions which are already established but not added to h.peers yet
	// will exit when they try to register.
	h.peers.close()
	h.peerWG.Wait()

	log.Info("Ethereum protocol stopped")
}
  522. // BroadcastBlock will either propagate a block to a subset of its peers, or
  523. // will only announce its availability (depending what's requested).
  524. func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
  525. // Disable the block propagation if the chain has already entered the PoS
  526. // stage. The block propagation is delegated to the consensus layer.
  527. if h.merger.PoSFinalized() {
  528. return
  529. }
  530. // Disable the block propagation if it's the post-merge block.
  531. if beacon, ok := h.chain.Engine().(*beacon.Beacon); ok {
  532. if beacon.IsPoSHeader(block.Header()) {
  533. return
  534. }
  535. }
  536. hash := block.Hash()
  537. peers := h.peers.peersWithoutBlock(hash)
  538. // If propagation is requested, send to a subset of the peer
  539. if propagate {
  540. // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
  541. var td *big.Int
  542. if parent := h.chain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
  543. td = new(big.Int).Add(block.Difficulty(), h.chain.GetTd(block.ParentHash(), block.NumberU64()-1))
  544. } else {
  545. log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
  546. return
  547. }
  548. // Send the block to a subset of our peers
  549. transfer := peers[:int(math.Sqrt(float64(len(peers))))]
  550. for _, peer := range transfer {
  551. peer.AsyncSendNewBlock(block, td)
  552. }
  553. log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
  554. return
  555. }
  556. // Otherwise if the block is indeed in out own chain, announce it
  557. if h.chain.HasBlock(hash, block.NumberU64()) {
  558. for _, peer := range peers {
  559. peer.AsyncSendNewBlockHash(block)
  560. }
  561. log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
  562. }
  563. }
  564. // BroadcastTransactions will propagate a batch of transactions
  565. // - To a square root of all peers
  566. // - And, separately, as announcements to all peers which are not known to
  567. // already have the given transaction.
  568. func (h *handler) BroadcastTransactions(txs types.Transactions) {
  569. var (
  570. annoCount int // Count of announcements made
  571. annoPeers int
  572. directCount int // Count of the txs sent directly to peers
  573. directPeers int // Count of the peers that were sent transactions directly
  574. txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
  575. annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
  576. )
  577. // Broadcast transactions to a batch of peers not knowing about it
  578. for _, tx := range txs {
  579. peers := h.peers.peersWithoutTransaction(tx.Hash())
  580. // Send the tx unconditionally to a subset of our peers
  581. numDirect := int(math.Sqrt(float64(len(peers))))
  582. for _, peer := range peers[:numDirect] {
  583. txset[peer] = append(txset[peer], tx.Hash())
  584. }
  585. // For the remaining peers, send announcement only
  586. for _, peer := range peers[numDirect:] {
  587. annos[peer] = append(annos[peer], tx.Hash())
  588. }
  589. }
  590. for peer, hashes := range txset {
  591. directPeers++
  592. directCount += len(hashes)
  593. peer.AsyncSendTransactions(hashes)
  594. }
  595. for peer, hashes := range annos {
  596. annoPeers++
  597. annoCount += len(hashes)
  598. peer.AsyncSendPooledTransactionHashes(hashes)
  599. }
  600. log.Debug("Transaction broadcast", "txs", len(txs),
  601. "announce packs", annoPeers, "announced hashes", annoCount,
  602. "tx packs", directPeers, "broadcast txs", directCount)
  603. }
  604. // minedBroadcastLoop sends mined blocks to connected peers.
  605. func (h *handler) minedBroadcastLoop() {
  606. defer h.wg.Done()
  607. for obj := range h.minedBlockSub.Chan() {
  608. if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
  609. h.BroadcastBlock(ev.Block, true) // First propagate block to peers
  610. h.BroadcastBlock(ev.Block, false) // Only then announce to the rest
  611. }
  612. }
  613. }
  614. // txBroadcastLoop announces new transactions to connected peers.
  615. func (h *handler) txBroadcastLoop() {
  616. defer h.wg.Done()
  617. for {
  618. select {
  619. case event := <-h.txsCh:
  620. h.BroadcastTransactions(event.Txs)
  621. case <-h.txsSub.Err():
  622. return
  623. }
  624. }
  625. }