handler.go 27 KB


  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "encoding/binary"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "net"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/consensus"
  28. "github.com/ethereum/go-ethereum/core"
  29. "github.com/ethereum/go-ethereum/core/state"
  30. "github.com/ethereum/go-ethereum/core/types"
  31. "github.com/ethereum/go-ethereum/eth"
  32. "github.com/ethereum/go-ethereum/eth/downloader"
  33. "github.com/ethereum/go-ethereum/ethdb"
  34. "github.com/ethereum/go-ethereum/event"
  35. "github.com/ethereum/go-ethereum/log"
  36. "github.com/ethereum/go-ethereum/p2p"
  37. "github.com/ethereum/go-ethereum/p2p/discover"
  38. "github.com/ethereum/go-ethereum/p2p/discv5"
  39. "github.com/ethereum/go-ethereum/params"
  40. "github.com/ethereum/go-ethereum/rlp"
  41. "github.com/ethereum/go-ethereum/trie"
  42. )
  43. const (
  44. softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
  45. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
  46. ethVersion = 63 // equivalent eth version for the downloader
  47. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  48. MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
  49. MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
  50. MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
  51. MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  52. MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  53. MaxTxSend = 64 // Amount of transactions to be send per request
  54. disableClientRemovePeer = false
  55. )
  56. // errIncompatibleConfig is returned if the requested protocols and configs are
  57. // not compatible (low protocol version restrictions and high requirements).
  58. var errIncompatibleConfig = errors.New("incompatible configuration")
  59. func errResp(code errCode, format string, v ...interface{}) error {
  60. return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
  61. }
  62. type BlockChain interface {
  63. HasHeader(hash common.Hash, number uint64) bool
  64. GetHeader(hash common.Hash, number uint64) *types.Header
  65. GetHeaderByHash(hash common.Hash) *types.Header
  66. CurrentHeader() *types.Header
  67. GetTdByHash(hash common.Hash) *big.Int
  68. InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error)
  69. Rollback(chain []common.Hash)
  70. Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash)
  71. GetHeaderByNumber(number uint64) *types.Header
  72. GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash
  73. LastBlockHash() common.Hash
  74. Genesis() *types.Block
  75. SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
  76. }
  77. type txPool interface {
  78. // AddRemotes should add the given transactions to the pool.
  79. AddRemotes([]*types.Transaction) error
  80. }
  81. type ProtocolManager struct {
  82. lightSync bool
  83. txpool txPool
  84. txrelay *LesTxRelay
  85. networkId uint64
  86. chainConfig *params.ChainConfig
  87. blockchain BlockChain
  88. chainDb ethdb.Database
  89. odr *LesOdr
  90. server *LesServer
  91. serverPool *serverPool
  92. lesTopic discv5.Topic
  93. reqDist *requestDistributor
  94. retriever *retrieveManager
  95. downloader *downloader.Downloader
  96. fetcher *lightFetcher
  97. peers *peerSet
  98. SubProtocols []p2p.Protocol
  99. eventMux *event.TypeMux
  100. // channels for fetcher, syncer, txsyncLoop
  101. newPeerCh chan *peer
  102. quitSync chan struct{}
  103. noMorePeers chan struct{}
  104. // wait group is used for graceful shutdowns during downloading
  105. // and processing
  106. wg *sync.WaitGroup
  107. }
  108. // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
  109. // with the ethereum network.
  110. func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
  111. // Create the protocol manager with the base fields
  112. manager := &ProtocolManager{
  113. lightSync: lightSync,
  114. eventMux: mux,
  115. blockchain: blockchain,
  116. chainConfig: chainConfig,
  117. chainDb: chainDb,
  118. odr: odr,
  119. networkId: networkId,
  120. txpool: txpool,
  121. txrelay: txrelay,
  122. peers: peers,
  123. newPeerCh: make(chan *peer),
  124. quitSync: quitSync,
  125. wg: wg,
  126. noMorePeers: make(chan struct{}),
  127. }
  128. if odr != nil {
  129. manager.retriever = odr.retriever
  130. manager.reqDist = odr.retriever.dist
  131. }
  132. // Initiate a sub-protocol for every implemented version we can handle
  133. manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
  134. for i, version := range ProtocolVersions {
  135. // Compatible, initialize the sub-protocol
  136. version := version // Closure for the run
  137. manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
  138. Name: "les",
  139. Version: version,
  140. Length: ProtocolLengths[i],
  141. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  142. var entry *poolEntry
  143. peer := manager.newPeer(int(version), networkId, p, rw)
  144. if manager.serverPool != nil {
  145. addr := p.RemoteAddr().(*net.TCPAddr)
  146. entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
  147. }
  148. peer.poolEntry = entry
  149. select {
  150. case manager.newPeerCh <- peer:
  151. manager.wg.Add(1)
  152. defer manager.wg.Done()
  153. err := manager.handle(peer)
  154. if entry != nil {
  155. manager.serverPool.disconnect(entry)
  156. }
  157. return err
  158. case <-manager.quitSync:
  159. if entry != nil {
  160. manager.serverPool.disconnect(entry)
  161. }
  162. return p2p.DiscQuitting
  163. }
  164. },
  165. NodeInfo: func() interface{} {
  166. return manager.NodeInfo()
  167. },
  168. PeerInfo: func(id discover.NodeID) interface{} {
  169. if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
  170. return p.Info()
  171. }
  172. return nil
  173. },
  174. })
  175. }
  176. if len(manager.SubProtocols) == 0 {
  177. return nil, errIncompatibleConfig
  178. }
  179. removePeer := manager.removePeer
  180. if disableClientRemovePeer {
  181. removePeer = func(id string) {}
  182. }
  183. if lightSync {
  184. manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer)
  185. manager.peers.notify((*downloaderPeerNotify)(manager))
  186. manager.fetcher = newLightFetcher(manager)
  187. }
  188. return manager, nil
  189. }
  190. // removePeer initiates disconnection from a peer by removing it from the peer set
  191. func (pm *ProtocolManager) removePeer(id string) {
  192. pm.peers.Unregister(id)
  193. }
  194. func (pm *ProtocolManager) Start() {
  195. if pm.lightSync {
  196. go pm.syncer()
  197. } else {
  198. go func() {
  199. for range pm.newPeerCh {
  200. }
  201. }()
  202. }
  203. }
  204. func (pm *ProtocolManager) Stop() {
  205. // Showing a log message. During download / process this could actually
  206. // take between 5 to 10 seconds and therefor feedback is required.
  207. log.Info("Stopping light Ethereum protocol")
  208. // Quit the sync loop.
  209. // After this send has completed, no new peers will be accepted.
  210. pm.noMorePeers <- struct{}{}
  211. close(pm.quitSync) // quits syncer, fetcher
  212. // Disconnect existing sessions.
  213. // This also closes the gate for any new registrations on the peer set.
  214. // sessions which are already established but not added to pm.peers yet
  215. // will exit when they try to register.
  216. pm.peers.Close()
  217. // Wait for any process action
  218. pm.wg.Wait()
  219. log.Info("Light Ethereum protocol stopped")
  220. }
  221. func (pm *ProtocolManager) newPeer(pv int, nv uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  222. return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
  223. }
  224. // handle is the callback invoked to manage the life cycle of a les peer. When
  225. // this function terminates, the peer is disconnected.
  226. func (pm *ProtocolManager) handle(p *peer) error {
  227. p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
  228. // Execute the LES handshake
  229. td, head, genesis := pm.blockchain.Status()
  230. headNum := core.GetBlockNumber(pm.chainDb, head)
  231. if err := p.Handshake(td, head, headNum, genesis, pm.server); err != nil {
  232. p.Log().Debug("Light Ethereum handshake failed", "err", err)
  233. return err
  234. }
  235. if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
  236. rw.Init(p.version)
  237. }
  238. // Register the peer locally
  239. if err := pm.peers.Register(p); err != nil {
  240. p.Log().Error("Light Ethereum peer registration failed", "err", err)
  241. return err
  242. }
  243. defer func() {
  244. if pm.server != nil && pm.server.fcManager != nil && p.fcClient != nil {
  245. p.fcClient.Remove(pm.server.fcManager)
  246. }
  247. pm.removePeer(p.id)
  248. }()
  249. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  250. if pm.lightSync {
  251. p.lock.Lock()
  252. head := p.headInfo
  253. p.lock.Unlock()
  254. if pm.fetcher != nil {
  255. pm.fetcher.announce(p, head)
  256. }
  257. if p.poolEntry != nil {
  258. pm.serverPool.registered(p.poolEntry)
  259. }
  260. }
  261. stop := make(chan struct{})
  262. defer close(stop)
  263. go func() {
  264. // new block announce loop
  265. for {
  266. select {
  267. case announce := <-p.announceChn:
  268. p.SendAnnounce(announce)
  269. case <-stop:
  270. return
  271. }
  272. }
  273. }()
  274. // main loop. handle incoming messages.
  275. for {
  276. if err := pm.handleMsg(p); err != nil {
  277. p.Log().Debug("Light Ethereum message handling failed", "err", err)
  278. return err
  279. }
  280. }
  281. }
  282. var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsMsg, SendTxMsg, GetHeaderProofsMsg}
  283. // handleMsg is invoked whenever an inbound message is received from a remote
  284. // peer. The remote connection is torn down upon returning any error.
  285. func (pm *ProtocolManager) handleMsg(p *peer) error {
  286. // Read the next message from the remote peer, and ensure it's fully consumed
  287. msg, err := p.rw.ReadMsg()
  288. if err != nil {
  289. return err
  290. }
  291. p.Log().Trace("Light Ethereum message arrived", "code", msg.Code, "bytes", msg.Size)
  292. costs := p.fcCosts[msg.Code]
  293. reject := func(reqCnt, maxCnt uint64) bool {
  294. if p.fcClient == nil || reqCnt > maxCnt {
  295. return true
  296. }
  297. bufValue, _ := p.fcClient.AcceptRequest()
  298. cost := costs.baseCost + reqCnt*costs.reqCost
  299. if cost > pm.server.defParams.BufLimit {
  300. cost = pm.server.defParams.BufLimit
  301. }
  302. if cost > bufValue {
  303. recharge := time.Duration((cost - bufValue) * 1000000 / pm.server.defParams.MinRecharge)
  304. p.Log().Error("Request came too early", "recharge", common.PrettyDuration(recharge))
  305. return true
  306. }
  307. return false
  308. }
  309. if msg.Size > ProtocolMaxMsgSize {
  310. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  311. }
  312. defer msg.Discard()
  313. var deliverMsg *Msg
  314. // Handle the message depending on its contents
  315. switch msg.Code {
  316. case StatusMsg:
  317. p.Log().Trace("Received status message")
  318. // Status messages should never arrive after the handshake
  319. return errResp(ErrExtraStatusMsg, "uncontrolled status message")
  320. // Block header query, collect the requested headers and reply
  321. case AnnounceMsg:
  322. p.Log().Trace("Received announce message")
  323. var req announceData
  324. if err := msg.Decode(&req); err != nil {
  325. return errResp(ErrDecode, "%v: %v", msg, err)
  326. }
  327. p.Log().Trace("Announce message content", "number", req.Number, "hash", req.Hash, "td", req.Td, "reorg", req.ReorgDepth)
  328. if pm.fetcher != nil {
  329. pm.fetcher.announce(p, &req)
  330. }
  331. case GetBlockHeadersMsg:
  332. p.Log().Trace("Received block header request")
  333. // Decode the complex header query
  334. var req struct {
  335. ReqID uint64
  336. Query getBlockHeadersData
  337. }
  338. if err := msg.Decode(&req); err != nil {
  339. return errResp(ErrDecode, "%v: %v", msg, err)
  340. }
  341. query := req.Query
  342. if reject(query.Amount, MaxHeaderFetch) {
  343. return errResp(ErrRequestRejected, "")
  344. }
  345. hashMode := query.Origin.Hash != (common.Hash{})
  346. // Gather headers until the fetch or network limits is reached
  347. var (
  348. bytes common.StorageSize
  349. headers []*types.Header
  350. unknown bool
  351. )
  352. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  353. // Retrieve the next header satisfying the query
  354. var origin *types.Header
  355. if hashMode {
  356. origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
  357. } else {
  358. origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
  359. }
  360. if origin == nil {
  361. break
  362. }
  363. number := origin.Number.Uint64()
  364. headers = append(headers, origin)
  365. bytes += estHeaderRlpSize
  366. // Advance to the next header of the query
  367. switch {
  368. case query.Origin.Hash != (common.Hash{}) && query.Reverse:
  369. // Hash based traversal towards the genesis block
  370. for i := 0; i < int(query.Skip)+1; i++ {
  371. if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {
  372. query.Origin.Hash = header.ParentHash
  373. number--
  374. } else {
  375. unknown = true
  376. break
  377. }
  378. }
  379. case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
  380. // Hash based traversal towards the leaf block
  381. if header := pm.blockchain.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil {
  382. if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
  383. query.Origin.Hash = header.Hash()
  384. } else {
  385. unknown = true
  386. }
  387. } else {
  388. unknown = true
  389. }
  390. case query.Reverse:
  391. // Number based traversal towards the genesis block
  392. if query.Origin.Number >= query.Skip+1 {
  393. query.Origin.Number -= (query.Skip + 1)
  394. } else {
  395. unknown = true
  396. }
  397. case !query.Reverse:
  398. // Number based traversal towards the leaf block
  399. query.Origin.Number += (query.Skip + 1)
  400. }
  401. }
  402. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
  403. pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
  404. return p.SendBlockHeaders(req.ReqID, bv, headers)
  405. case BlockHeadersMsg:
  406. if pm.downloader == nil {
  407. return errResp(ErrUnexpectedResponse, "")
  408. }
  409. p.Log().Trace("Received block header response message")
  410. // A batch of headers arrived to one of our previous requests
  411. var resp struct {
  412. ReqID, BV uint64
  413. Headers []*types.Header
  414. }
  415. if err := msg.Decode(&resp); err != nil {
  416. return errResp(ErrDecode, "msg %v: %v", msg, err)
  417. }
  418. p.fcServer.GotReply(resp.ReqID, resp.BV)
  419. if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
  420. pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
  421. } else {
  422. err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
  423. if err != nil {
  424. log.Debug(fmt.Sprint(err))
  425. }
  426. }
  427. case GetBlockBodiesMsg:
  428. p.Log().Trace("Received block bodies request")
  429. // Decode the retrieval message
  430. var req struct {
  431. ReqID uint64
  432. Hashes []common.Hash
  433. }
  434. if err := msg.Decode(&req); err != nil {
  435. return errResp(ErrDecode, "msg %v: %v", msg, err)
  436. }
  437. // Gather blocks until the fetch or network limits is reached
  438. var (
  439. bytes int
  440. bodies []rlp.RawValue
  441. )
  442. reqCnt := len(req.Hashes)
  443. if reject(uint64(reqCnt), MaxBodyFetch) {
  444. return errResp(ErrRequestRejected, "")
  445. }
  446. for _, hash := range req.Hashes {
  447. if bytes >= softResponseLimit {
  448. break
  449. }
  450. // Retrieve the requested block body, stopping if enough was found
  451. if data := core.GetBodyRLP(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash)); len(data) != 0 {
  452. bodies = append(bodies, data)
  453. bytes += len(data)
  454. }
  455. }
  456. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  457. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  458. return p.SendBlockBodiesRLP(req.ReqID, bv, bodies)
  459. case BlockBodiesMsg:
  460. if pm.odr == nil {
  461. return errResp(ErrUnexpectedResponse, "")
  462. }
  463. p.Log().Trace("Received block bodies response")
  464. // A batch of block bodies arrived to one of our previous requests
  465. var resp struct {
  466. ReqID, BV uint64
  467. Data []*types.Body
  468. }
  469. if err := msg.Decode(&resp); err != nil {
  470. return errResp(ErrDecode, "msg %v: %v", msg, err)
  471. }
  472. p.fcServer.GotReply(resp.ReqID, resp.BV)
  473. deliverMsg = &Msg{
  474. MsgType: MsgBlockBodies,
  475. ReqID: resp.ReqID,
  476. Obj: resp.Data,
  477. }
  478. case GetCodeMsg:
  479. p.Log().Trace("Received code request")
  480. // Decode the retrieval message
  481. var req struct {
  482. ReqID uint64
  483. Reqs []CodeReq
  484. }
  485. if err := msg.Decode(&req); err != nil {
  486. return errResp(ErrDecode, "msg %v: %v", msg, err)
  487. }
  488. // Gather state data until the fetch or network limits is reached
  489. var (
  490. bytes int
  491. data [][]byte
  492. )
  493. reqCnt := len(req.Reqs)
  494. if reject(uint64(reqCnt), MaxCodeFetch) {
  495. return errResp(ErrRequestRejected, "")
  496. }
  497. for _, req := range req.Reqs {
  498. // Retrieve the requested state entry, stopping if enough was found
  499. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  500. if trie, _ := trie.New(header.Root, pm.chainDb); trie != nil {
  501. sdata := trie.Get(req.AccKey)
  502. var acc state.Account
  503. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  504. entry, _ := pm.chainDb.Get(acc.CodeHash)
  505. if bytes+len(entry) >= softResponseLimit {
  506. break
  507. }
  508. data = append(data, entry)
  509. bytes += len(entry)
  510. }
  511. }
  512. }
  513. }
  514. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  515. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  516. return p.SendCode(req.ReqID, bv, data)
  517. case CodeMsg:
  518. if pm.odr == nil {
  519. return errResp(ErrUnexpectedResponse, "")
  520. }
  521. p.Log().Trace("Received code response")
  522. // A batch of node state data arrived to one of our previous requests
  523. var resp struct {
  524. ReqID, BV uint64
  525. Data [][]byte
  526. }
  527. if err := msg.Decode(&resp); err != nil {
  528. return errResp(ErrDecode, "msg %v: %v", msg, err)
  529. }
  530. p.fcServer.GotReply(resp.ReqID, resp.BV)
  531. deliverMsg = &Msg{
  532. MsgType: MsgCode,
  533. ReqID: resp.ReqID,
  534. Obj: resp.Data,
  535. }
  536. case GetReceiptsMsg:
  537. p.Log().Trace("Received receipts request")
  538. // Decode the retrieval message
  539. var req struct {
  540. ReqID uint64
  541. Hashes []common.Hash
  542. }
  543. if err := msg.Decode(&req); err != nil {
  544. return errResp(ErrDecode, "msg %v: %v", msg, err)
  545. }
  546. // Gather state data until the fetch or network limits is reached
  547. var (
  548. bytes int
  549. receipts []rlp.RawValue
  550. )
  551. reqCnt := len(req.Hashes)
  552. if reject(uint64(reqCnt), MaxReceiptFetch) {
  553. return errResp(ErrRequestRejected, "")
  554. }
  555. for _, hash := range req.Hashes {
  556. if bytes >= softResponseLimit {
  557. break
  558. }
  559. // Retrieve the requested block's receipts, skipping if unknown to us
  560. results := core.GetBlockReceipts(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash))
  561. if results == nil {
  562. if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  563. continue
  564. }
  565. }
  566. // If known, encode and queue for response packet
  567. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  568. log.Error("Failed to encode receipt", "err", err)
  569. } else {
  570. receipts = append(receipts, encoded)
  571. bytes += len(encoded)
  572. }
  573. }
  574. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  575. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  576. return p.SendReceiptsRLP(req.ReqID, bv, receipts)
  577. case ReceiptsMsg:
  578. if pm.odr == nil {
  579. return errResp(ErrUnexpectedResponse, "")
  580. }
  581. p.Log().Trace("Received receipts response")
  582. // A batch of receipts arrived to one of our previous requests
  583. var resp struct {
  584. ReqID, BV uint64
  585. Receipts []types.Receipts
  586. }
  587. if err := msg.Decode(&resp); err != nil {
  588. return errResp(ErrDecode, "msg %v: %v", msg, err)
  589. }
  590. p.fcServer.GotReply(resp.ReqID, resp.BV)
  591. deliverMsg = &Msg{
  592. MsgType: MsgReceipts,
  593. ReqID: resp.ReqID,
  594. Obj: resp.Receipts,
  595. }
  596. case GetProofsMsg:
  597. p.Log().Trace("Received proofs request")
  598. // Decode the retrieval message
  599. var req struct {
  600. ReqID uint64
  601. Reqs []ProofReq
  602. }
  603. if err := msg.Decode(&req); err != nil {
  604. return errResp(ErrDecode, "msg %v: %v", msg, err)
  605. }
  606. // Gather state data until the fetch or network limits is reached
  607. var (
  608. bytes int
  609. proofs proofsData
  610. )
  611. reqCnt := len(req.Reqs)
  612. if reject(uint64(reqCnt), MaxProofsFetch) {
  613. return errResp(ErrRequestRejected, "")
  614. }
  615. for _, req := range req.Reqs {
  616. if bytes >= softResponseLimit {
  617. break
  618. }
  619. // Retrieve the requested state entry, stopping if enough was found
  620. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  621. if tr, _ := trie.New(header.Root, pm.chainDb); tr != nil {
  622. if len(req.AccKey) > 0 {
  623. sdata := tr.Get(req.AccKey)
  624. tr = nil
  625. var acc state.Account
  626. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  627. tr, _ = trie.New(acc.Root, pm.chainDb)
  628. }
  629. }
  630. if tr != nil {
  631. proof := tr.Prove(req.Key)
  632. proofs = append(proofs, proof)
  633. bytes += len(proof)
  634. }
  635. }
  636. }
  637. }
  638. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  639. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  640. return p.SendProofs(req.ReqID, bv, proofs)
  641. case ProofsMsg:
  642. if pm.odr == nil {
  643. return errResp(ErrUnexpectedResponse, "")
  644. }
  645. p.Log().Trace("Received proofs response")
  646. // A batch of merkle proofs arrived to one of our previous requests
  647. var resp struct {
  648. ReqID, BV uint64
  649. Data [][]rlp.RawValue
  650. }
  651. if err := msg.Decode(&resp); err != nil {
  652. return errResp(ErrDecode, "msg %v: %v", msg, err)
  653. }
  654. p.fcServer.GotReply(resp.ReqID, resp.BV)
  655. deliverMsg = &Msg{
  656. MsgType: MsgProofs,
  657. ReqID: resp.ReqID,
  658. Obj: resp.Data,
  659. }
  660. case GetHeaderProofsMsg:
  661. p.Log().Trace("Received headers proof request")
  662. // Decode the retrieval message
  663. var req struct {
  664. ReqID uint64
  665. Reqs []ChtReq
  666. }
  667. if err := msg.Decode(&req); err != nil {
  668. return errResp(ErrDecode, "msg %v: %v", msg, err)
  669. }
  670. // Gather state data until the fetch or network limits is reached
  671. var (
  672. bytes int
  673. proofs []ChtResp
  674. )
  675. reqCnt := len(req.Reqs)
  676. if reject(uint64(reqCnt), MaxHeaderProofsFetch) {
  677. return errResp(ErrRequestRejected, "")
  678. }
  679. for _, req := range req.Reqs {
  680. if bytes >= softResponseLimit {
  681. break
  682. }
  683. if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
  684. if root := getChtRoot(pm.chainDb, req.ChtNum); root != (common.Hash{}) {
  685. if tr, _ := trie.New(root, pm.chainDb); tr != nil {
  686. var encNumber [8]byte
  687. binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
  688. proof := tr.Prove(encNumber[:])
  689. proofs = append(proofs, ChtResp{Header: header, Proof: proof})
  690. bytes += len(proof) + estHeaderRlpSize
  691. }
  692. }
  693. }
  694. }
  695. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  696. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  697. return p.SendHeaderProofs(req.ReqID, bv, proofs)
  698. case HeaderProofsMsg:
  699. if pm.odr == nil {
  700. return errResp(ErrUnexpectedResponse, "")
  701. }
  702. p.Log().Trace("Received headers proof response")
  703. var resp struct {
  704. ReqID, BV uint64
  705. Data []ChtResp
  706. }
  707. if err := msg.Decode(&resp); err != nil {
  708. return errResp(ErrDecode, "msg %v: %v", msg, err)
  709. }
  710. p.fcServer.GotReply(resp.ReqID, resp.BV)
  711. deliverMsg = &Msg{
  712. MsgType: MsgHeaderProofs,
  713. ReqID: resp.ReqID,
  714. Obj: resp.Data,
  715. }
  716. case SendTxMsg:
  717. if pm.txpool == nil {
  718. return errResp(ErrUnexpectedResponse, "")
  719. }
  720. // Transactions arrived, parse all of them and deliver to the pool
  721. var txs []*types.Transaction
  722. if err := msg.Decode(&txs); err != nil {
  723. return errResp(ErrDecode, "msg %v: %v", msg, err)
  724. }
  725. reqCnt := len(txs)
  726. if reject(uint64(reqCnt), MaxTxSend) {
  727. return errResp(ErrRequestRejected, "")
  728. }
  729. if err := pm.txpool.AddRemotes(txs); err != nil {
  730. return errResp(ErrUnexpectedResponse, "msg: %v", err)
  731. }
  732. _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  733. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  734. default:
  735. p.Log().Trace("Received unknown message", "code", msg.Code)
  736. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  737. }
  738. if deliverMsg != nil {
  739. err := pm.retriever.deliver(p, deliverMsg)
  740. if err != nil {
  741. p.responseErrors++
  742. if p.responseErrors > maxResponseErrors {
  743. return err
  744. }
  745. }
  746. }
  747. return nil
  748. }
  749. // NodeInfo retrieves some protocol metadata about the running host node.
  750. func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo {
  751. return &eth.EthNodeInfo{
  752. Network: self.networkId,
  753. Difficulty: self.blockchain.GetTdByHash(self.blockchain.LastBlockHash()),
  754. Genesis: self.blockchain.Genesis().Hash(),
  755. Head: self.blockchain.LastBlockHash(),
  756. }
  757. }
  758. // downloaderPeerNotify implements peerSetNotify
  759. type downloaderPeerNotify ProtocolManager
  760. type peerConnection struct {
  761. manager *ProtocolManager
  762. peer *peer
  763. }
  764. func (pc *peerConnection) Head() (common.Hash, *big.Int) {
  765. return pc.peer.HeadAndTd()
  766. }
  767. func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
  768. reqID := genReqID()
  769. rq := &distReq{
  770. getCost: func(dp distPeer) uint64 {
  771. peer := dp.(*peer)
  772. return peer.GetRequestCost(GetBlockHeadersMsg, amount)
  773. },
  774. canSend: func(dp distPeer) bool {
  775. return dp.(*peer) == pc.peer
  776. },
  777. request: func(dp distPeer) func() {
  778. peer := dp.(*peer)
  779. cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
  780. peer.fcServer.QueueRequest(reqID, cost)
  781. return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
  782. },
  783. }
  784. _, ok := <-pc.manager.reqDist.queue(rq)
  785. if !ok {
  786. return ErrNoPeers
  787. }
  788. return nil
  789. }
  790. func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
  791. reqID := genReqID()
  792. rq := &distReq{
  793. getCost: func(dp distPeer) uint64 {
  794. peer := dp.(*peer)
  795. return peer.GetRequestCost(GetBlockHeadersMsg, amount)
  796. },
  797. canSend: func(dp distPeer) bool {
  798. return dp.(*peer) == pc.peer
  799. },
  800. request: func(dp distPeer) func() {
  801. peer := dp.(*peer)
  802. cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
  803. peer.fcServer.QueueRequest(reqID, cost)
  804. return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
  805. },
  806. }
  807. _, ok := <-pc.manager.reqDist.queue(rq)
  808. if !ok {
  809. return ErrNoPeers
  810. }
  811. return nil
  812. }
  813. func (d *downloaderPeerNotify) registerPeer(p *peer) {
  814. pm := (*ProtocolManager)(d)
  815. pc := &peerConnection{
  816. manager: pm,
  817. peer: p,
  818. }
  819. pm.downloader.RegisterLightPeer(p.id, ethVersion, pc)
  820. }
  821. func (d *downloaderPeerNotify) unregisterPeer(p *peer) {
  822. pm := (*ProtocolManager)(d)
  823. pm.downloader.UnregisterPeer(p.id)
  824. }