handler.go 28 KB


  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "encoding/binary"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "net"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/consensus"
  28. "github.com/ethereum/go-ethereum/core"
  29. "github.com/ethereum/go-ethereum/core/state"
  30. "github.com/ethereum/go-ethereum/core/types"
  31. "github.com/ethereum/go-ethereum/eth"
  32. "github.com/ethereum/go-ethereum/eth/downloader"
  33. "github.com/ethereum/go-ethereum/ethdb"
  34. "github.com/ethereum/go-ethereum/event"
  35. "github.com/ethereum/go-ethereum/log"
  36. "github.com/ethereum/go-ethereum/p2p"
  37. "github.com/ethereum/go-ethereum/p2p/discover"
  38. "github.com/ethereum/go-ethereum/p2p/discv5"
  39. "github.com/ethereum/go-ethereum/params"
  40. "github.com/ethereum/go-ethereum/rlp"
  41. "github.com/ethereum/go-ethereum/trie"
  42. )
  43. const (
  44. softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
  45. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
  46. ethVersion = 63 // equivalent eth version for the downloader
  47. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  48. MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
  49. MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
  50. MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
  51. MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  52. MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  53. MaxTxSend = 64 // Amount of transactions to be send per request
  54. disableClientRemovePeer = false
  55. )
  56. // errIncompatibleConfig is returned if the requested protocols and configs are
  57. // not compatible (low protocol version restrictions and high requirements).
  58. var errIncompatibleConfig = errors.New("incompatible configuration")
  59. func errResp(code errCode, format string, v ...interface{}) error {
  60. return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
  61. }
  62. type hashFetcherFn func(common.Hash) error
  63. type BlockChain interface {
  64. HasHeader(hash common.Hash) bool
  65. GetHeader(hash common.Hash, number uint64) *types.Header
  66. GetHeaderByHash(hash common.Hash) *types.Header
  67. CurrentHeader() *types.Header
  68. GetTdByHash(hash common.Hash) *big.Int
  69. InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error)
  70. Rollback(chain []common.Hash)
  71. Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash)
  72. GetHeaderByNumber(number uint64) *types.Header
  73. GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash
  74. LastBlockHash() common.Hash
  75. Genesis() *types.Block
  76. }
  77. type txPool interface {
  78. // AddTransactions should add the given transactions to the pool.
  79. AddBatch([]*types.Transaction) error
  80. }
  81. type ProtocolManager struct {
  82. lightSync bool
  83. txpool txPool
  84. txrelay *LesTxRelay
  85. networkId int
  86. chainConfig *params.ChainConfig
  87. blockchain BlockChain
  88. chainDb ethdb.Database
  89. odr *LesOdr
  90. server *LesServer
  91. serverPool *serverPool
  92. reqDist *requestDistributor
  93. downloader *downloader.Downloader
  94. fetcher *lightFetcher
  95. peers *peerSet
  96. SubProtocols []p2p.Protocol
  97. eventMux *event.TypeMux
  98. // channels for fetcher, syncer, txsyncLoop
  99. newPeerCh chan *peer
  100. quitSync chan struct{}
  101. noMorePeers chan struct{}
  102. syncMu sync.Mutex
  103. syncing bool
  104. syncDone chan struct{}
  105. // wait group is used for graceful shutdowns during downloading
  106. // and processing
  107. wg sync.WaitGroup
  108. }
  109. // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
  110. // with the ethereum network.
  111. func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId int, mux *event.TypeMux, engine consensus.Engine, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay) (*ProtocolManager, error) {
  112. // Create the protocol manager with the base fields
  113. manager := &ProtocolManager{
  114. lightSync: lightSync,
  115. eventMux: mux,
  116. blockchain: blockchain,
  117. chainConfig: chainConfig,
  118. chainDb: chainDb,
  119. networkId: networkId,
  120. txpool: txpool,
  121. txrelay: txrelay,
  122. odr: odr,
  123. peers: newPeerSet(),
  124. newPeerCh: make(chan *peer),
  125. quitSync: make(chan struct{}),
  126. noMorePeers: make(chan struct{}),
  127. }
  128. // Initiate a sub-protocol for every implemented version we can handle
  129. manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
  130. for i, version := range ProtocolVersions {
  131. // Compatible, initialize the sub-protocol
  132. version := version // Closure for the run
  133. manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
  134. Name: "les",
  135. Version: version,
  136. Length: ProtocolLengths[i],
  137. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  138. var entry *poolEntry
  139. peer := manager.newPeer(int(version), networkId, p, rw)
  140. if manager.serverPool != nil {
  141. addr := p.RemoteAddr().(*net.TCPAddr)
  142. entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
  143. }
  144. peer.poolEntry = entry
  145. select {
  146. case manager.newPeerCh <- peer:
  147. manager.wg.Add(1)
  148. defer manager.wg.Done()
  149. err := manager.handle(peer)
  150. if entry != nil {
  151. manager.serverPool.disconnect(entry)
  152. }
  153. return err
  154. case <-manager.quitSync:
  155. if entry != nil {
  156. manager.serverPool.disconnect(entry)
  157. }
  158. return p2p.DiscQuitting
  159. }
  160. },
  161. NodeInfo: func() interface{} {
  162. return manager.NodeInfo()
  163. },
  164. PeerInfo: func(id discover.NodeID) interface{} {
  165. if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
  166. return p.Info()
  167. }
  168. return nil
  169. },
  170. })
  171. }
  172. if len(manager.SubProtocols) == 0 {
  173. return nil, errIncompatibleConfig
  174. }
  175. removePeer := manager.removePeer
  176. if disableClientRemovePeer {
  177. removePeer = func(id string) {}
  178. }
  179. if lightSync {
  180. manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
  181. nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
  182. blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
  183. }
  184. manager.reqDist = newRequestDistributor(func() map[distPeer]struct{} {
  185. m := make(map[distPeer]struct{})
  186. peers := manager.peers.AllPeers()
  187. for _, peer := range peers {
  188. m[peer] = struct{}{}
  189. }
  190. return m
  191. }, manager.quitSync)
  192. if odr != nil {
  193. odr.removePeer = removePeer
  194. odr.reqDist = manager.reqDist
  195. }
  196. /*validator := func(block *types.Block, parent *types.Block) error {
  197. return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
  198. }
  199. heighter := func() uint64 {
  200. return chainman.LastBlockNumberU64()
  201. }
  202. manager.fetcher = fetcher.New(chainman.GetBlockNoOdr, validator, nil, heighter, chainman.InsertChain, manager.removePeer)
  203. */
  204. return manager, nil
  205. }
  206. func (pm *ProtocolManager) removePeer(id string) {
  207. // Short circuit if the peer was already removed
  208. peer := pm.peers.Peer(id)
  209. if peer == nil {
  210. return
  211. }
  212. log.Debug("Removing light Ethereum peer", "peer", id)
  213. if err := pm.peers.Unregister(id); err != nil {
  214. if err == errNotRegistered {
  215. return
  216. }
  217. }
  218. // Unregister the peer from the downloader and Ethereum peer set
  219. if pm.lightSync {
  220. pm.downloader.UnregisterPeer(id)
  221. if pm.txrelay != nil {
  222. pm.txrelay.removePeer(id)
  223. }
  224. if pm.fetcher != nil {
  225. pm.fetcher.removePeer(peer)
  226. }
  227. }
  228. // Hard disconnect at the networking layer
  229. if peer != nil {
  230. peer.Peer.Disconnect(p2p.DiscUselessPeer)
  231. }
  232. }
  233. func (pm *ProtocolManager) Start(srvr *p2p.Server) {
  234. var topicDisc *discv5.Network
  235. if srvr != nil {
  236. topicDisc = srvr.DiscV5
  237. }
  238. lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
  239. if pm.lightSync {
  240. // start sync handler
  241. if srvr != nil { // srvr is nil during testing
  242. pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
  243. pm.odr.serverPool = pm.serverPool
  244. pm.fetcher = newLightFetcher(pm)
  245. }
  246. go pm.syncer()
  247. } else {
  248. if topicDisc != nil {
  249. go func() {
  250. logger := log.New("topic", lesTopic)
  251. logger.Info("Starting topic registration")
  252. defer logger.Info("Terminated topic registration")
  253. topicDisc.RegisterTopic(lesTopic, pm.quitSync)
  254. }()
  255. }
  256. go func() {
  257. for range pm.newPeerCh {
  258. }
  259. }()
  260. }
  261. }
  262. func (pm *ProtocolManager) Stop() {
  263. // Showing a log message. During download / process this could actually
  264. // take between 5 to 10 seconds and therefor feedback is required.
  265. log.Info("Stopping light Ethereum protocol")
  266. // Quit the sync loop.
  267. // After this send has completed, no new peers will be accepted.
  268. pm.noMorePeers <- struct{}{}
  269. close(pm.quitSync) // quits syncer, fetcher
  270. // Disconnect existing sessions.
  271. // This also closes the gate for any new registrations on the peer set.
  272. // sessions which are already established but not added to pm.peers yet
  273. // will exit when they try to register.
  274. pm.peers.Close()
  275. // Wait for any process action
  276. pm.wg.Wait()
  277. log.Info("Light Ethereum protocol stopped")
  278. }
  279. func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  280. return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
  281. }
  282. // handle is the callback invoked to manage the life cycle of a les peer. When
  283. // this function terminates, the peer is disconnected.
  284. func (pm *ProtocolManager) handle(p *peer) error {
  285. p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
  286. // Execute the LES handshake
  287. td, head, genesis := pm.blockchain.Status()
  288. headNum := core.GetBlockNumber(pm.chainDb, head)
  289. if err := p.Handshake(td, head, headNum, genesis, pm.server); err != nil {
  290. p.Log().Debug("Light Ethereum handshake failed", "err", err)
  291. return err
  292. }
  293. if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
  294. rw.Init(p.version)
  295. }
  296. // Register the peer locally
  297. if err := pm.peers.Register(p); err != nil {
  298. p.Log().Error("Light Ethereum peer registration failed", "err", err)
  299. return err
  300. }
  301. defer func() {
  302. if pm.server != nil && pm.server.fcManager != nil && p.fcClient != nil {
  303. p.fcClient.Remove(pm.server.fcManager)
  304. }
  305. pm.removePeer(p.id)
  306. }()
  307. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  308. if pm.lightSync {
  309. requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
  310. reqID := getNextReqID()
  311. rq := &distReq{
  312. getCost: func(dp distPeer) uint64 {
  313. peer := dp.(*peer)
  314. return peer.GetRequestCost(GetBlockHeadersMsg, amount)
  315. },
  316. canSend: func(dp distPeer) bool {
  317. return dp.(*peer) == p
  318. },
  319. request: func(dp distPeer) func() {
  320. peer := dp.(*peer)
  321. cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
  322. peer.fcServer.QueueRequest(reqID, cost)
  323. return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
  324. },
  325. }
  326. _, ok := <-pm.reqDist.queue(rq)
  327. if !ok {
  328. return ErrNoPeers
  329. }
  330. return nil
  331. }
  332. requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
  333. reqID := getNextReqID()
  334. rq := &distReq{
  335. getCost: func(dp distPeer) uint64 {
  336. peer := dp.(*peer)
  337. return peer.GetRequestCost(GetBlockHeadersMsg, amount)
  338. },
  339. canSend: func(dp distPeer) bool {
  340. return dp.(*peer) == p
  341. },
  342. request: func(dp distPeer) func() {
  343. peer := dp.(*peer)
  344. cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
  345. peer.fcServer.QueueRequest(reqID, cost)
  346. return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
  347. },
  348. }
  349. _, ok := <-pm.reqDist.queue(rq)
  350. if !ok {
  351. return ErrNoPeers
  352. }
  353. return nil
  354. }
  355. if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd,
  356. requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
  357. return err
  358. }
  359. if pm.txrelay != nil {
  360. pm.txrelay.addPeer(p)
  361. }
  362. p.lock.Lock()
  363. head := p.headInfo
  364. p.lock.Unlock()
  365. if pm.fetcher != nil {
  366. pm.fetcher.addPeer(p)
  367. pm.fetcher.announce(p, head)
  368. }
  369. if p.poolEntry != nil {
  370. pm.serverPool.registered(p.poolEntry)
  371. }
  372. }
  373. stop := make(chan struct{})
  374. defer close(stop)
  375. go func() {
  376. // new block announce loop
  377. for {
  378. select {
  379. case announce := <-p.announceChn:
  380. p.SendAnnounce(announce)
  381. case <-stop:
  382. return
  383. }
  384. }
  385. }()
  386. // main loop. handle incoming messages.
  387. for {
  388. if err := pm.handleMsg(p); err != nil {
  389. p.Log().Debug("Light Ethereum message handling failed", "err", err)
  390. return err
  391. }
  392. }
  393. }
  394. var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsMsg, SendTxMsg, GetHeaderProofsMsg}
  395. // handleMsg is invoked whenever an inbound message is received from a remote
  396. // peer. The remote connection is torn down upon returning any error.
  397. func (pm *ProtocolManager) handleMsg(p *peer) error {
  398. // Read the next message from the remote peer, and ensure it's fully consumed
  399. msg, err := p.rw.ReadMsg()
  400. if err != nil {
  401. return err
  402. }
  403. p.Log().Trace("Light Ethereum message arrived", "code", msg.Code, "bytes", msg.Size)
  404. costs := p.fcCosts[msg.Code]
  405. reject := func(reqCnt, maxCnt uint64) bool {
  406. if p.fcClient == nil || reqCnt > maxCnt {
  407. return true
  408. }
  409. bufValue, _ := p.fcClient.AcceptRequest()
  410. cost := costs.baseCost + reqCnt*costs.reqCost
  411. if cost > pm.server.defParams.BufLimit {
  412. cost = pm.server.defParams.BufLimit
  413. }
  414. if cost > bufValue {
  415. recharge := time.Duration((cost - bufValue) * 1000000 / pm.server.defParams.MinRecharge)
  416. p.Log().Error("Request came too early", "recharge", common.PrettyDuration(recharge))
  417. return true
  418. }
  419. return false
  420. }
  421. if msg.Size > ProtocolMaxMsgSize {
  422. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  423. }
  424. defer msg.Discard()
  425. var deliverMsg *Msg
  426. // Handle the message depending on its contents
  427. switch msg.Code {
  428. case StatusMsg:
  429. p.Log().Trace("Received status message")
  430. // Status messages should never arrive after the handshake
  431. return errResp(ErrExtraStatusMsg, "uncontrolled status message")
  432. // Block header query, collect the requested headers and reply
  433. case AnnounceMsg:
  434. p.Log().Trace("Received announce message")
  435. var req announceData
  436. if err := msg.Decode(&req); err != nil {
  437. return errResp(ErrDecode, "%v: %v", msg, err)
  438. }
  439. p.Log().Trace("Announce message content", "number", req.Number, "hash", req.Hash, "td", req.Td, "reorg", req.ReorgDepth)
  440. if pm.fetcher != nil {
  441. pm.fetcher.announce(p, &req)
  442. }
  443. case GetBlockHeadersMsg:
  444. p.Log().Trace("Received block header request")
  445. // Decode the complex header query
  446. var req struct {
  447. ReqID uint64
  448. Query getBlockHeadersData
  449. }
  450. if err := msg.Decode(&req); err != nil {
  451. return errResp(ErrDecode, "%v: %v", msg, err)
  452. }
  453. query := req.Query
  454. if reject(query.Amount, MaxHeaderFetch) {
  455. return errResp(ErrRequestRejected, "")
  456. }
  457. hashMode := query.Origin.Hash != (common.Hash{})
  458. // Gather headers until the fetch or network limits is reached
  459. var (
  460. bytes common.StorageSize
  461. headers []*types.Header
  462. unknown bool
  463. )
  464. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  465. // Retrieve the next header satisfying the query
  466. var origin *types.Header
  467. if hashMode {
  468. origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
  469. } else {
  470. origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
  471. }
  472. if origin == nil {
  473. break
  474. }
  475. number := origin.Number.Uint64()
  476. headers = append(headers, origin)
  477. bytes += estHeaderRlpSize
  478. // Advance to the next header of the query
  479. switch {
  480. case query.Origin.Hash != (common.Hash{}) && query.Reverse:
  481. // Hash based traversal towards the genesis block
  482. for i := 0; i < int(query.Skip)+1; i++ {
  483. if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {
  484. query.Origin.Hash = header.ParentHash
  485. number--
  486. } else {
  487. unknown = true
  488. break
  489. }
  490. }
  491. case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
  492. // Hash based traversal towards the leaf block
  493. if header := pm.blockchain.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil {
  494. if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
  495. query.Origin.Hash = header.Hash()
  496. } else {
  497. unknown = true
  498. }
  499. } else {
  500. unknown = true
  501. }
  502. case query.Reverse:
  503. // Number based traversal towards the genesis block
  504. if query.Origin.Number >= query.Skip+1 {
  505. query.Origin.Number -= (query.Skip + 1)
  506. } else {
  507. unknown = true
  508. }
  509. case !query.Reverse:
  510. // Number based traversal towards the leaf block
  511. query.Origin.Number += (query.Skip + 1)
  512. }
  513. }
  514. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
  515. pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
  516. return p.SendBlockHeaders(req.ReqID, bv, headers)
  517. case BlockHeadersMsg:
  518. if pm.downloader == nil {
  519. return errResp(ErrUnexpectedResponse, "")
  520. }
  521. p.Log().Trace("Received block header response message")
  522. // A batch of headers arrived to one of our previous requests
  523. var resp struct {
  524. ReqID, BV uint64
  525. Headers []*types.Header
  526. }
  527. if err := msg.Decode(&resp); err != nil {
  528. return errResp(ErrDecode, "msg %v: %v", msg, err)
  529. }
  530. p.fcServer.GotReply(resp.ReqID, resp.BV)
  531. if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
  532. pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
  533. } else {
  534. err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
  535. if err != nil {
  536. log.Debug(fmt.Sprint(err))
  537. }
  538. }
  539. case GetBlockBodiesMsg:
  540. p.Log().Trace("Received block bodies request")
  541. // Decode the retrieval message
  542. var req struct {
  543. ReqID uint64
  544. Hashes []common.Hash
  545. }
  546. if err := msg.Decode(&req); err != nil {
  547. return errResp(ErrDecode, "msg %v: %v", msg, err)
  548. }
  549. // Gather blocks until the fetch or network limits is reached
  550. var (
  551. bytes int
  552. bodies []rlp.RawValue
  553. )
  554. reqCnt := len(req.Hashes)
  555. if reject(uint64(reqCnt), MaxBodyFetch) {
  556. return errResp(ErrRequestRejected, "")
  557. }
  558. for _, hash := range req.Hashes {
  559. if bytes >= softResponseLimit {
  560. break
  561. }
  562. // Retrieve the requested block body, stopping if enough was found
  563. if data := core.GetBodyRLP(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash)); len(data) != 0 {
  564. bodies = append(bodies, data)
  565. bytes += len(data)
  566. }
  567. }
  568. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  569. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  570. return p.SendBlockBodiesRLP(req.ReqID, bv, bodies)
  571. case BlockBodiesMsg:
  572. if pm.odr == nil {
  573. return errResp(ErrUnexpectedResponse, "")
  574. }
  575. p.Log().Trace("Received block bodies response")
  576. // A batch of block bodies arrived to one of our previous requests
  577. var resp struct {
  578. ReqID, BV uint64
  579. Data []*types.Body
  580. }
  581. if err := msg.Decode(&resp); err != nil {
  582. return errResp(ErrDecode, "msg %v: %v", msg, err)
  583. }
  584. p.fcServer.GotReply(resp.ReqID, resp.BV)
  585. deliverMsg = &Msg{
  586. MsgType: MsgBlockBodies,
  587. ReqID: resp.ReqID,
  588. Obj: resp.Data,
  589. }
  590. case GetCodeMsg:
  591. p.Log().Trace("Received code request")
  592. // Decode the retrieval message
  593. var req struct {
  594. ReqID uint64
  595. Reqs []CodeReq
  596. }
  597. if err := msg.Decode(&req); err != nil {
  598. return errResp(ErrDecode, "msg %v: %v", msg, err)
  599. }
  600. // Gather state data until the fetch or network limits is reached
  601. var (
  602. bytes int
  603. data [][]byte
  604. )
  605. reqCnt := len(req.Reqs)
  606. if reject(uint64(reqCnt), MaxCodeFetch) {
  607. return errResp(ErrRequestRejected, "")
  608. }
  609. for _, req := range req.Reqs {
  610. // Retrieve the requested state entry, stopping if enough was found
  611. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  612. if trie, _ := trie.New(header.Root, pm.chainDb); trie != nil {
  613. sdata := trie.Get(req.AccKey)
  614. var acc state.Account
  615. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  616. entry, _ := pm.chainDb.Get(acc.CodeHash)
  617. if bytes+len(entry) >= softResponseLimit {
  618. break
  619. }
  620. data = append(data, entry)
  621. bytes += len(entry)
  622. }
  623. }
  624. }
  625. }
  626. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  627. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  628. return p.SendCode(req.ReqID, bv, data)
  629. case CodeMsg:
  630. if pm.odr == nil {
  631. return errResp(ErrUnexpectedResponse, "")
  632. }
  633. p.Log().Trace("Received code response")
  634. // A batch of node state data arrived to one of our previous requests
  635. var resp struct {
  636. ReqID, BV uint64
  637. Data [][]byte
  638. }
  639. if err := msg.Decode(&resp); err != nil {
  640. return errResp(ErrDecode, "msg %v: %v", msg, err)
  641. }
  642. p.fcServer.GotReply(resp.ReqID, resp.BV)
  643. deliverMsg = &Msg{
  644. MsgType: MsgCode,
  645. ReqID: resp.ReqID,
  646. Obj: resp.Data,
  647. }
  648. case GetReceiptsMsg:
  649. p.Log().Trace("Received receipts request")
  650. // Decode the retrieval message
  651. var req struct {
  652. ReqID uint64
  653. Hashes []common.Hash
  654. }
  655. if err := msg.Decode(&req); err != nil {
  656. return errResp(ErrDecode, "msg %v: %v", msg, err)
  657. }
  658. // Gather state data until the fetch or network limits is reached
  659. var (
  660. bytes int
  661. receipts []rlp.RawValue
  662. )
  663. reqCnt := len(req.Hashes)
  664. if reject(uint64(reqCnt), MaxReceiptFetch) {
  665. return errResp(ErrRequestRejected, "")
  666. }
  667. for _, hash := range req.Hashes {
  668. if bytes >= softResponseLimit {
  669. break
  670. }
  671. // Retrieve the requested block's receipts, skipping if unknown to us
  672. results := core.GetBlockReceipts(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash))
  673. if results == nil {
  674. if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  675. continue
  676. }
  677. }
  678. // If known, encode and queue for response packet
  679. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  680. log.Error("Failed to encode receipt", "err", err)
  681. } else {
  682. receipts = append(receipts, encoded)
  683. bytes += len(encoded)
  684. }
  685. }
  686. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  687. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  688. return p.SendReceiptsRLP(req.ReqID, bv, receipts)
  689. case ReceiptsMsg:
  690. if pm.odr == nil {
  691. return errResp(ErrUnexpectedResponse, "")
  692. }
  693. p.Log().Trace("Received receipts response")
  694. // A batch of receipts arrived to one of our previous requests
  695. var resp struct {
  696. ReqID, BV uint64
  697. Receipts []types.Receipts
  698. }
  699. if err := msg.Decode(&resp); err != nil {
  700. return errResp(ErrDecode, "msg %v: %v", msg, err)
  701. }
  702. p.fcServer.GotReply(resp.ReqID, resp.BV)
  703. deliverMsg = &Msg{
  704. MsgType: MsgReceipts,
  705. ReqID: resp.ReqID,
  706. Obj: resp.Receipts,
  707. }
  708. case GetProofsMsg:
  709. p.Log().Trace("Received proofs request")
  710. // Decode the retrieval message
  711. var req struct {
  712. ReqID uint64
  713. Reqs []ProofReq
  714. }
  715. if err := msg.Decode(&req); err != nil {
  716. return errResp(ErrDecode, "msg %v: %v", msg, err)
  717. }
  718. // Gather state data until the fetch or network limits is reached
  719. var (
  720. bytes int
  721. proofs proofsData
  722. )
  723. reqCnt := len(req.Reqs)
  724. if reject(uint64(reqCnt), MaxProofsFetch) {
  725. return errResp(ErrRequestRejected, "")
  726. }
  727. for _, req := range req.Reqs {
  728. if bytes >= softResponseLimit {
  729. break
  730. }
  731. // Retrieve the requested state entry, stopping if enough was found
  732. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  733. if tr, _ := trie.New(header.Root, pm.chainDb); tr != nil {
  734. if len(req.AccKey) > 0 {
  735. sdata := tr.Get(req.AccKey)
  736. tr = nil
  737. var acc state.Account
  738. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  739. tr, _ = trie.New(acc.Root, pm.chainDb)
  740. }
  741. }
  742. if tr != nil {
  743. proof := tr.Prove(req.Key)
  744. proofs = append(proofs, proof)
  745. bytes += len(proof)
  746. }
  747. }
  748. }
  749. }
  750. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  751. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  752. return p.SendProofs(req.ReqID, bv, proofs)
  753. case ProofsMsg:
  754. if pm.odr == nil {
  755. return errResp(ErrUnexpectedResponse, "")
  756. }
  757. p.Log().Trace("Received proofs response")
  758. // A batch of merkle proofs arrived to one of our previous requests
  759. var resp struct {
  760. ReqID, BV uint64
  761. Data [][]rlp.RawValue
  762. }
  763. if err := msg.Decode(&resp); err != nil {
  764. return errResp(ErrDecode, "msg %v: %v", msg, err)
  765. }
  766. p.fcServer.GotReply(resp.ReqID, resp.BV)
  767. deliverMsg = &Msg{
  768. MsgType: MsgProofs,
  769. ReqID: resp.ReqID,
  770. Obj: resp.Data,
  771. }
  772. case GetHeaderProofsMsg:
  773. p.Log().Trace("Received headers proof request")
  774. // Decode the retrieval message
  775. var req struct {
  776. ReqID uint64
  777. Reqs []ChtReq
  778. }
  779. if err := msg.Decode(&req); err != nil {
  780. return errResp(ErrDecode, "msg %v: %v", msg, err)
  781. }
  782. // Gather state data until the fetch or network limits is reached
  783. var (
  784. bytes int
  785. proofs []ChtResp
  786. )
  787. reqCnt := len(req.Reqs)
  788. if reject(uint64(reqCnt), MaxHeaderProofsFetch) {
  789. return errResp(ErrRequestRejected, "")
  790. }
  791. for _, req := range req.Reqs {
  792. if bytes >= softResponseLimit {
  793. break
  794. }
  795. if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
  796. if root := getChtRoot(pm.chainDb, req.ChtNum); root != (common.Hash{}) {
  797. if tr, _ := trie.New(root, pm.chainDb); tr != nil {
  798. var encNumber [8]byte
  799. binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
  800. proof := tr.Prove(encNumber[:])
  801. proofs = append(proofs, ChtResp{Header: header, Proof: proof})
  802. bytes += len(proof) + estHeaderRlpSize
  803. }
  804. }
  805. }
  806. }
  807. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  808. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  809. return p.SendHeaderProofs(req.ReqID, bv, proofs)
  810. case HeaderProofsMsg:
  811. if pm.odr == nil {
  812. return errResp(ErrUnexpectedResponse, "")
  813. }
  814. p.Log().Trace("Received headers proof response")
  815. var resp struct {
  816. ReqID, BV uint64
  817. Data []ChtResp
  818. }
  819. if err := msg.Decode(&resp); err != nil {
  820. return errResp(ErrDecode, "msg %v: %v", msg, err)
  821. }
  822. p.fcServer.GotReply(resp.ReqID, resp.BV)
  823. deliverMsg = &Msg{
  824. MsgType: MsgHeaderProofs,
  825. ReqID: resp.ReqID,
  826. Obj: resp.Data,
  827. }
  828. case SendTxMsg:
  829. if pm.txpool == nil {
  830. return errResp(ErrUnexpectedResponse, "")
  831. }
  832. // Transactions arrived, parse all of them and deliver to the pool
  833. var txs []*types.Transaction
  834. if err := msg.Decode(&txs); err != nil {
  835. return errResp(ErrDecode, "msg %v: %v", msg, err)
  836. }
  837. reqCnt := len(txs)
  838. if reject(uint64(reqCnt), MaxTxSend) {
  839. return errResp(ErrRequestRejected, "")
  840. }
  841. if err := pm.txpool.AddBatch(txs); err != nil {
  842. return errResp(ErrUnexpectedResponse, "msg: %v", err)
  843. }
  844. _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  845. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  846. default:
  847. p.Log().Trace("Received unknown message", "code", msg.Code)
  848. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  849. }
  850. if deliverMsg != nil {
  851. err := pm.odr.Deliver(p, deliverMsg)
  852. if err != nil {
  853. p.responseErrors++
  854. if p.responseErrors > maxResponseErrors {
  855. return err
  856. }
  857. }
  858. }
  859. return nil
  860. }
  861. // NodeInfo retrieves some protocol metadata about the running host node.
  862. func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo {
  863. return &eth.EthNodeInfo{
  864. Network: self.networkId,
  865. Difficulty: self.blockchain.GetTdByHash(self.blockchain.LastBlockHash()),
  866. Genesis: self.blockchain.Genesis().Hash(),
  867. Head: self.blockchain.LastBlockHash(),
  868. }
  869. }