handler.go 25 KB


  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package eth
  17. import (
  18. "errors"
  19. "fmt"
  20. "math"
  21. "math/big"
  22. "sync"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/core"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/eth/downloader"
  28. "github.com/ethereum/go-ethereum/eth/fetcher"
  29. "github.com/ethereum/go-ethereum/ethdb"
  30. "github.com/ethereum/go-ethereum/event"
  31. "github.com/ethereum/go-ethereum/logger"
  32. "github.com/ethereum/go-ethereum/logger/glog"
  33. "github.com/ethereum/go-ethereum/p2p"
  34. "github.com/ethereum/go-ethereum/pow"
  35. "github.com/ethereum/go-ethereum/rlp"
  36. )
  37. const (
  38. softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
  39. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
  40. )
  41. // errIncompatibleConfig is returned if the requested protocols and configs are
  42. // not compatible (low protocol version restrictions and high requirements).
  43. var errIncompatibleConfig = errors.New("incompatible configuration")
  44. func errResp(code errCode, format string, v ...interface{}) error {
  45. return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
  46. }
  47. type hashFetcherFn func(common.Hash) error
  48. type blockFetcherFn func([]common.Hash) error
  49. type ProtocolManager struct {
  50. fastSync bool
  51. txpool txPool
  52. blockchain *core.BlockChain
  53. chaindb ethdb.Database
  54. downloader *downloader.Downloader
  55. fetcher *fetcher.Fetcher
  56. peers *peerSet
  57. SubProtocols []p2p.Protocol
  58. eventMux *event.TypeMux
  59. txSub event.Subscription
  60. minedBlockSub event.Subscription
  61. // channels for fetcher, syncer, txsyncLoop
  62. newPeerCh chan *peer
  63. txsyncCh chan *txsync
  64. quitSync chan struct{}
  65. // wait group is used for graceful shutdowns during downloading
  66. // and processing
  67. wg sync.WaitGroup
  68. quit bool
  69. }
  70. // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
  71. // with the ethereum network.
  72. func NewProtocolManager(fastSync bool, networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
  73. // Figure out whether to allow fast sync or not
  74. if fastSync && blockchain.CurrentBlock().NumberU64() > 0 {
  75. glog.V(logger.Info).Infof("blockchain not empty, fast sync disabled")
  76. fastSync = false
  77. }
  78. // Create the protocol manager with the base fields
  79. manager := &ProtocolManager{
  80. fastSync: fastSync,
  81. eventMux: mux,
  82. txpool: txpool,
  83. blockchain: blockchain,
  84. chaindb: chaindb,
  85. peers: newPeerSet(),
  86. newPeerCh: make(chan *peer, 1),
  87. txsyncCh: make(chan *txsync),
  88. quitSync: make(chan struct{}),
  89. }
  90. // Initiate a sub-protocol for every implemented version we can handle
  91. manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
  92. for i, version := range ProtocolVersions {
  93. // Skip protocol version if incompatible with the mode of operation
  94. if fastSync && version < eth63 {
  95. continue
  96. }
  97. // Compatible; initialise the sub-protocol
  98. version := version // Closure for the run
  99. manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
  100. Name: "eth",
  101. Version: version,
  102. Length: ProtocolLengths[i],
  103. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  104. peer := manager.newPeer(int(version), networkId, p, rw)
  105. manager.newPeerCh <- peer
  106. return manager.handle(peer)
  107. },
  108. })
  109. }
  110. if len(manager.SubProtocols) == 0 {
  111. return nil, errIncompatibleConfig
  112. }
  113. // Construct the different synchronisation mechanisms
  114. manager.downloader = downloader.New(chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader, blockchain.GetBlock,
  115. blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.FastSyncCommitHead, blockchain.GetTd,
  116. blockchain.InsertHeaderChain, blockchain.InsertChain, blockchain.InsertReceiptChain, blockchain.Rollback, manager.removePeer)
  117. validator := func(block *types.Block, parent *types.Block) error {
  118. return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
  119. }
  120. heighter := func() uint64 {
  121. return blockchain.CurrentBlock().NumberU64()
  122. }
  123. manager.fetcher = fetcher.New(blockchain.GetBlock, validator, manager.BroadcastBlock, heighter, blockchain.InsertChain, manager.removePeer)
  124. return manager, nil
  125. }
  126. func (pm *ProtocolManager) removePeer(id string) {
  127. // Short circuit if the peer was already removed
  128. peer := pm.peers.Peer(id)
  129. if peer == nil {
  130. return
  131. }
  132. glog.V(logger.Debug).Infoln("Removing peer", id)
  133. // Unregister the peer from the downloader and Ethereum peer set
  134. pm.downloader.UnregisterPeer(id)
  135. if err := pm.peers.Unregister(id); err != nil {
  136. glog.V(logger.Error).Infoln("Removal failed:", err)
  137. }
  138. // Hard disconnect at the networking layer
  139. if peer != nil {
  140. peer.Peer.Disconnect(p2p.DiscUselessPeer)
  141. }
  142. }
  143. func (pm *ProtocolManager) Start() {
  144. // broadcast transactions
  145. pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
  146. go pm.txBroadcastLoop()
  147. // broadcast mined blocks
  148. pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
  149. go pm.minedBroadcastLoop()
  150. // start sync handlers
  151. go pm.syncer()
  152. go pm.txsyncLoop()
  153. }
  154. func (pm *ProtocolManager) Stop() {
  155. // Showing a log message. During download / process this could actually
  156. // take between 5 to 10 seconds and therefor feedback is required.
  157. glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...")
  158. pm.quit = true
  159. pm.txSub.Unsubscribe() // quits txBroadcastLoop
  160. pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
  161. close(pm.quitSync) // quits syncer, fetcher, txsyncLoop
  162. // Wait for any process action
  163. pm.wg.Wait()
  164. glog.V(logger.Info).Infoln("Ethereum protocol handler stopped")
  165. }
  166. func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  167. return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
  168. }
  169. // handle is the callback invoked to manage the life cycle of an eth peer. When
  170. // this function terminates, the peer is disconnected.
  171. func (pm *ProtocolManager) handle(p *peer) error {
  172. glog.V(logger.Debug).Infof("%v: peer connected [%s]", p, p.Name())
  173. // Execute the Ethereum handshake
  174. td, head, genesis := pm.blockchain.Status()
  175. if err := p.Handshake(td, head, genesis); err != nil {
  176. glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err)
  177. return err
  178. }
  179. if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
  180. rw.Init(p.version)
  181. }
  182. // Register the peer locally
  183. glog.V(logger.Detail).Infof("%v: adding peer", p)
  184. if err := pm.peers.Register(p); err != nil {
  185. glog.V(logger.Error).Infof("%v: addition failed: %v", p, err)
  186. return err
  187. }
  188. defer pm.removePeer(p.id)
  189. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  190. if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(),
  191. p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, p.RequestHeadersByHash,
  192. p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil {
  193. return err
  194. }
  195. // Propagate existing transactions. new transactions appearing
  196. // after this will be sent via broadcasts.
  197. pm.syncTransactions(p)
  198. // main loop. handle incoming messages.
  199. for {
  200. if err := pm.handleMsg(p); err != nil {
  201. glog.V(logger.Debug).Infof("%v: message handling failed: %v", p, err)
  202. return err
  203. }
  204. }
  205. return nil
  206. }
  207. // handleMsg is invoked whenever an inbound message is received from a remote
  208. // peer. The remote connection is torn down upon returning any error.
  209. func (pm *ProtocolManager) handleMsg(p *peer) error {
  210. // Read the next message from the remote peer, and ensure it's fully consumed
  211. msg, err := p.rw.ReadMsg()
  212. if err != nil {
  213. return err
  214. }
  215. if msg.Size > ProtocolMaxMsgSize {
  216. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  217. }
  218. defer msg.Discard()
  219. // Handle the message depending on its contents
  220. switch {
  221. case msg.Code == StatusMsg:
  222. // Status messages should never arrive after the handshake
  223. return errResp(ErrExtraStatusMsg, "uncontrolled status message")
  224. case p.version < eth62 && msg.Code == GetBlockHashesMsg:
  225. // Retrieve the number of hashes to return and from which origin hash
  226. var request getBlockHashesData
  227. if err := msg.Decode(&request); err != nil {
  228. return errResp(ErrDecode, "%v: %v", msg, err)
  229. }
  230. if request.Amount > uint64(downloader.MaxHashFetch) {
  231. request.Amount = uint64(downloader.MaxHashFetch)
  232. }
  233. // Retrieve the hashes from the block chain and return them
  234. hashes := pm.blockchain.GetBlockHashesFromHash(request.Hash, request.Amount)
  235. if len(hashes) == 0 {
  236. glog.V(logger.Debug).Infof("invalid block hash %x", request.Hash.Bytes()[:4])
  237. }
  238. return p.SendBlockHashes(hashes)
  239. case p.version < eth62 && msg.Code == GetBlockHashesFromNumberMsg:
  240. // Retrieve and decode the number of hashes to return and from which origin number
  241. var request getBlockHashesFromNumberData
  242. if err := msg.Decode(&request); err != nil {
  243. return errResp(ErrDecode, "%v: %v", msg, err)
  244. }
  245. if request.Amount > uint64(downloader.MaxHashFetch) {
  246. request.Amount = uint64(downloader.MaxHashFetch)
  247. }
  248. // Calculate the last block that should be retrieved, and short circuit if unavailable
  249. last := pm.blockchain.GetBlockByNumber(request.Number + request.Amount - 1)
  250. if last == nil {
  251. last = pm.blockchain.CurrentBlock()
  252. request.Amount = last.NumberU64() - request.Number + 1
  253. }
  254. if last.NumberU64() < request.Number {
  255. return p.SendBlockHashes(nil)
  256. }
  257. // Retrieve the hashes from the last block backwards, reverse and return
  258. hashes := []common.Hash{last.Hash()}
  259. hashes = append(hashes, pm.blockchain.GetBlockHashesFromHash(last.Hash(), request.Amount-1)...)
  260. for i := 0; i < len(hashes)/2; i++ {
  261. hashes[i], hashes[len(hashes)-1-i] = hashes[len(hashes)-1-i], hashes[i]
  262. }
  263. return p.SendBlockHashes(hashes)
  264. case p.version < eth62 && msg.Code == BlockHashesMsg:
  265. // A batch of hashes arrived to one of our previous requests
  266. var hashes []common.Hash
  267. if err := msg.Decode(&hashes); err != nil {
  268. break
  269. }
  270. // Deliver them all to the downloader for queuing
  271. err := pm.downloader.DeliverHashes(p.id, hashes)
  272. if err != nil {
  273. glog.V(logger.Debug).Infoln(err)
  274. }
  275. case p.version < eth62 && msg.Code == GetBlocksMsg:
  276. // Decode the retrieval message
  277. msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
  278. if _, err := msgStream.List(); err != nil {
  279. return err
  280. }
  281. // Gather blocks until the fetch or network limits is reached
  282. var (
  283. hash common.Hash
  284. bytes common.StorageSize
  285. blocks []*types.Block
  286. )
  287. for len(blocks) < downloader.MaxBlockFetch && bytes < softResponseLimit {
  288. //Retrieve the hash of the next block
  289. err := msgStream.Decode(&hash)
  290. if err == rlp.EOL {
  291. break
  292. } else if err != nil {
  293. return errResp(ErrDecode, "msg %v: %v", msg, err)
  294. }
  295. // Retrieve the requested block, stopping if enough was found
  296. if block := pm.blockchain.GetBlock(hash); block != nil {
  297. blocks = append(blocks, block)
  298. bytes += block.Size()
  299. }
  300. }
  301. return p.SendBlocks(blocks)
  302. case p.version < eth62 && msg.Code == BlocksMsg:
  303. // Decode the arrived block message
  304. var blocks []*types.Block
  305. if err := msg.Decode(&blocks); err != nil {
  306. glog.V(logger.Detail).Infoln("Decode error", err)
  307. blocks = nil
  308. }
  309. // Update the receive timestamp of each block
  310. for _, block := range blocks {
  311. block.ReceivedAt = msg.ReceivedAt
  312. }
  313. // Filter out any explicitly requested blocks, deliver the rest to the downloader
  314. if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 {
  315. pm.downloader.DeliverBlocks(p.id, blocks)
  316. }
  317. // Block header query, collect the requested headers and reply
  318. case p.version >= eth62 && msg.Code == GetBlockHeadersMsg:
  319. // Decode the complex header query
  320. var query getBlockHeadersData
  321. if err := msg.Decode(&query); err != nil {
  322. return errResp(ErrDecode, "%v: %v", msg, err)
  323. }
  324. // Gather headers until the fetch or network limits is reached
  325. var (
  326. bytes common.StorageSize
  327. headers []*types.Header
  328. unknown bool
  329. )
  330. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
  331. // Retrieve the next header satisfying the query
  332. var origin *types.Header
  333. if query.Origin.Hash != (common.Hash{}) {
  334. origin = pm.blockchain.GetHeader(query.Origin.Hash)
  335. } else {
  336. origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
  337. }
  338. if origin == nil {
  339. break
  340. }
  341. headers = append(headers, origin)
  342. bytes += estHeaderRlpSize
  343. // Advance to the next header of the query
  344. switch {
  345. case query.Origin.Hash != (common.Hash{}) && query.Reverse:
  346. // Hash based traversal towards the genesis block
  347. for i := 0; i < int(query.Skip)+1; i++ {
  348. if header := pm.blockchain.GetHeader(query.Origin.Hash); header != nil {
  349. query.Origin.Hash = header.ParentHash
  350. } else {
  351. unknown = true
  352. break
  353. }
  354. }
  355. case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
  356. // Hash based traversal towards the leaf block
  357. if header := pm.blockchain.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil {
  358. if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
  359. query.Origin.Hash = header.Hash()
  360. } else {
  361. unknown = true
  362. }
  363. } else {
  364. unknown = true
  365. }
  366. case query.Reverse:
  367. // Number based traversal towards the genesis block
  368. if query.Origin.Number >= query.Skip+1 {
  369. query.Origin.Number -= (query.Skip + 1)
  370. } else {
  371. unknown = true
  372. }
  373. case !query.Reverse:
  374. // Number based traversal towards the leaf block
  375. query.Origin.Number += (query.Skip + 1)
  376. }
  377. }
  378. return p.SendBlockHeaders(headers)
  379. case p.version >= eth62 && msg.Code == BlockHeadersMsg:
  380. // A batch of headers arrived to one of our previous requests
  381. var headers []*types.Header
  382. if err := msg.Decode(&headers); err != nil {
  383. return errResp(ErrDecode, "msg %v: %v", msg, err)
  384. }
  385. // Filter out any explicitly requested headers, deliver the rest to the downloader
  386. filter := len(headers) == 1
  387. if filter {
  388. headers = pm.fetcher.FilterHeaders(headers, time.Now())
  389. }
  390. if len(headers) > 0 || !filter {
  391. err := pm.downloader.DeliverHeaders(p.id, headers)
  392. if err != nil {
  393. glog.V(logger.Debug).Infoln(err)
  394. }
  395. }
  396. case p.version >= eth62 && msg.Code == GetBlockBodiesMsg:
  397. // Decode the retrieval message
  398. msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
  399. if _, err := msgStream.List(); err != nil {
  400. return err
  401. }
  402. // Gather blocks until the fetch or network limits is reached
  403. var (
  404. hash common.Hash
  405. bytes int
  406. bodies []rlp.RawValue
  407. )
  408. for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
  409. // Retrieve the hash of the next block
  410. if err := msgStream.Decode(&hash); err == rlp.EOL {
  411. break
  412. } else if err != nil {
  413. return errResp(ErrDecode, "msg %v: %v", msg, err)
  414. }
  415. // Retrieve the requested block body, stopping if enough was found
  416. if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
  417. bodies = append(bodies, data)
  418. bytes += len(data)
  419. }
  420. }
  421. return p.SendBlockBodiesRLP(bodies)
  422. case p.version >= eth62 && msg.Code == BlockBodiesMsg:
  423. // A batch of block bodies arrived to one of our previous requests
  424. var request blockBodiesData
  425. if err := msg.Decode(&request); err != nil {
  426. return errResp(ErrDecode, "msg %v: %v", msg, err)
  427. }
  428. // Deliver them all to the downloader for queuing
  429. trasactions := make([][]*types.Transaction, len(request))
  430. uncles := make([][]*types.Header, len(request))
  431. for i, body := range request {
  432. trasactions[i] = body.Transactions
  433. uncles[i] = body.Uncles
  434. }
  435. // Filter out any explicitly requested bodies, deliver the rest to the downloader
  436. if trasactions, uncles := pm.fetcher.FilterBodies(trasactions, uncles, time.Now()); len(trasactions) > 0 || len(uncles) > 0 {
  437. err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
  438. if err != nil {
  439. glog.V(logger.Debug).Infoln(err)
  440. }
  441. }
  442. case p.version >= eth63 && msg.Code == GetNodeDataMsg:
  443. // Decode the retrieval message
  444. msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
  445. if _, err := msgStream.List(); err != nil {
  446. return err
  447. }
  448. // Gather state data until the fetch or network limits is reached
  449. var (
  450. hash common.Hash
  451. bytes int
  452. data [][]byte
  453. )
  454. for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch {
  455. // Retrieve the hash of the next state entry
  456. if err := msgStream.Decode(&hash); err == rlp.EOL {
  457. break
  458. } else if err != nil {
  459. return errResp(ErrDecode, "msg %v: %v", msg, err)
  460. }
  461. // Retrieve the requested state entry, stopping if enough was found
  462. if entry, err := pm.chaindb.Get(hash.Bytes()); err == nil {
  463. data = append(data, entry)
  464. bytes += len(entry)
  465. }
  466. }
  467. return p.SendNodeData(data)
  468. case p.version >= eth63 && msg.Code == NodeDataMsg:
  469. // A batch of node state data arrived to one of our previous requests
  470. var data [][]byte
  471. if err := msg.Decode(&data); err != nil {
  472. return errResp(ErrDecode, "msg %v: %v", msg, err)
  473. }
  474. // Deliver all to the downloader
  475. if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
  476. glog.V(logger.Debug).Infof("failed to deliver node state data: %v", err)
  477. }
  478. case p.version >= eth63 && msg.Code == GetReceiptsMsg:
  479. // Decode the retrieval message
  480. msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
  481. if _, err := msgStream.List(); err != nil {
  482. return err
  483. }
  484. // Gather state data until the fetch or network limits is reached
  485. var (
  486. hash common.Hash
  487. bytes int
  488. receipts []rlp.RawValue
  489. )
  490. for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch {
  491. // Retrieve the hash of the next block
  492. if err := msgStream.Decode(&hash); err == rlp.EOL {
  493. break
  494. } else if err != nil {
  495. return errResp(ErrDecode, "msg %v: %v", msg, err)
  496. }
  497. // Retrieve the requested block's receipts, skipping if unknown to us
  498. results := core.GetBlockReceipts(pm.chaindb, hash)
  499. if results == nil {
  500. if header := pm.blockchain.GetHeader(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  501. continue
  502. }
  503. }
  504. // If known, encode and queue for response packet
  505. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  506. glog.V(logger.Error).Infof("failed to encode receipt: %v", err)
  507. } else {
  508. receipts = append(receipts, encoded)
  509. bytes += len(encoded)
  510. }
  511. }
  512. return p.SendReceiptsRLP(receipts)
  513. case p.version >= eth63 && msg.Code == ReceiptsMsg:
  514. // A batch of receipts arrived to one of our previous requests
  515. var receipts [][]*types.Receipt
  516. if err := msg.Decode(&receipts); err != nil {
  517. return errResp(ErrDecode, "msg %v: %v", msg, err)
  518. }
  519. // Deliver all to the downloader
  520. if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil {
  521. glog.V(logger.Debug).Infof("failed to deliver receipts: %v", err)
  522. }
  523. case msg.Code == NewBlockHashesMsg:
  524. // Retrieve and deseralize the remote new block hashes notification
  525. type announce struct {
  526. Hash common.Hash
  527. Number uint64
  528. }
  529. var announces = []announce{}
  530. if p.version < eth62 {
  531. // We're running the old protocol, make block number unknown (0)
  532. var hashes []common.Hash
  533. if err := msg.Decode(&hashes); err != nil {
  534. return errResp(ErrDecode, "%v: %v", msg, err)
  535. }
  536. for _, hash := range hashes {
  537. announces = append(announces, announce{hash, 0})
  538. }
  539. } else {
  540. // Otherwise extract both block hash and number
  541. var request newBlockHashesData
  542. if err := msg.Decode(&request); err != nil {
  543. return errResp(ErrDecode, "%v: %v", msg, err)
  544. }
  545. for _, block := range request {
  546. announces = append(announces, announce{block.Hash, block.Number})
  547. }
  548. }
  549. // Mark the hashes as present at the remote node
  550. for _, block := range announces {
  551. p.MarkBlock(block.Hash)
  552. p.SetHead(block.Hash)
  553. }
  554. // Schedule all the unknown hashes for retrieval
  555. unknown := make([]announce, 0, len(announces))
  556. for _, block := range announces {
  557. if !pm.blockchain.HasBlock(block.Hash) {
  558. unknown = append(unknown, block)
  559. }
  560. }
  561. for _, block := range unknown {
  562. if p.version < eth62 {
  563. pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestBlocks, nil, nil)
  564. } else {
  565. pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), nil, p.RequestOneHeader, p.RequestBodies)
  566. }
  567. }
  568. case msg.Code == NewBlockMsg:
  569. // Retrieve and decode the propagated block
  570. var request newBlockData
  571. if err := msg.Decode(&request); err != nil {
  572. return errResp(ErrDecode, "%v: %v", msg, err)
  573. }
  574. if err := request.Block.ValidateFields(); err != nil {
  575. return errResp(ErrDecode, "block validation %v: %v", msg, err)
  576. }
  577. request.Block.ReceivedAt = msg.ReceivedAt
  578. // Mark the peer as owning the block and schedule it for import
  579. p.MarkBlock(request.Block.Hash())
  580. p.SetHead(request.Block.Hash())
  581. pm.fetcher.Enqueue(p.id, request.Block)
  582. // Update the peers total difficulty if needed, schedule a download if gapped
  583. if request.TD.Cmp(p.Td()) > 0 {
  584. p.SetTd(request.TD)
  585. td := pm.blockchain.GetTd(pm.blockchain.CurrentBlock().Hash())
  586. if request.TD.Cmp(new(big.Int).Add(td, request.Block.Difficulty())) > 0 {
  587. go pm.synchronise(p)
  588. }
  589. }
  590. case msg.Code == TxMsg:
  591. // Transactions arrived, parse all of them and deliver to the pool
  592. var txs []*types.Transaction
  593. if err := msg.Decode(&txs); err != nil {
  594. return errResp(ErrDecode, "msg %v: %v", msg, err)
  595. }
  596. for i, tx := range txs {
  597. // Validate and mark the remote transaction
  598. if tx == nil {
  599. return errResp(ErrDecode, "transaction %d is nil", i)
  600. }
  601. p.MarkTransaction(tx.Hash())
  602. }
  603. pm.txpool.AddTransactions(txs)
  604. default:
  605. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  606. }
  607. return nil
  608. }
  609. // BroadcastBlock will either propagate a block to a subset of it's peers, or
  610. // will only announce it's availability (depending what's requested).
  611. func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
  612. hash := block.Hash()
  613. peers := pm.peers.PeersWithoutBlock(hash)
  614. // If propagation is requested, send to a subset of the peer
  615. if propagate {
  616. // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
  617. var td *big.Int
  618. if parent := pm.blockchain.GetBlock(block.ParentHash()); parent != nil {
  619. td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash()))
  620. } else {
  621. glog.V(logger.Error).Infof("propagating dangling block #%d [%x]", block.NumberU64(), hash[:4])
  622. return
  623. }
  624. // Send the block to a subset of our peers
  625. transfer := peers[:int(math.Sqrt(float64(len(peers))))]
  626. for _, peer := range transfer {
  627. peer.SendNewBlock(block, td)
  628. }
  629. glog.V(logger.Detail).Infof("propagated block %x to %d peers in %v", hash[:4], len(transfer), time.Since(block.ReceivedAt))
  630. }
  631. // Otherwise if the block is indeed in out own chain, announce it
  632. if pm.blockchain.HasBlock(hash) {
  633. for _, peer := range peers {
  634. if peer.version < eth62 {
  635. peer.SendNewBlockHashes61([]common.Hash{hash})
  636. } else {
  637. peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
  638. }
  639. }
  640. glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt))
  641. }
  642. }
  643. // BroadcastTx will propagate a transaction to all peers which are not known to
  644. // already have the given transaction.
  645. func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
  646. // Broadcast transaction to a batch of peers not knowing about it
  647. peers := pm.peers.PeersWithoutTx(hash)
  648. //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
  649. for _, peer := range peers {
  650. peer.SendTransactions(types.Transactions{tx})
  651. }
  652. glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers")
  653. }
  654. // Mined broadcast loop
  655. func (self *ProtocolManager) minedBroadcastLoop() {
  656. // automatically stops if unsubscribe
  657. for obj := range self.minedBlockSub.Chan() {
  658. switch ev := obj.Data.(type) {
  659. case core.NewMinedBlockEvent:
  660. self.BroadcastBlock(ev.Block, true) // First propagate block to peers
  661. self.BroadcastBlock(ev.Block, false) // Only then announce to the rest
  662. }
  663. }
  664. }
  665. func (self *ProtocolManager) txBroadcastLoop() {
  666. // automatically stops if unsubscribe
  667. for obj := range self.txSub.Chan() {
  668. event := obj.Data.(core.TxPreEvent)
  669. self.BroadcastTx(event.Tx.Hash(), event.Tx)
  670. }
  671. }