handler.go 27 KB


  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "encoding/binary"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "net"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/consensus"
  28. "github.com/ethereum/go-ethereum/core"
  29. "github.com/ethereum/go-ethereum/core/state"
  30. "github.com/ethereum/go-ethereum/core/types"
  31. "github.com/ethereum/go-ethereum/eth"
  32. "github.com/ethereum/go-ethereum/eth/downloader"
  33. "github.com/ethereum/go-ethereum/ethdb"
  34. "github.com/ethereum/go-ethereum/event"
  35. "github.com/ethereum/go-ethereum/log"
  36. "github.com/ethereum/go-ethereum/p2p"
  37. "github.com/ethereum/go-ethereum/p2p/discover"
  38. "github.com/ethereum/go-ethereum/p2p/discv5"
  39. "github.com/ethereum/go-ethereum/params"
  40. "github.com/ethereum/go-ethereum/rlp"
  41. "github.com/ethereum/go-ethereum/trie"
  42. )
// Protocol limits and tuning constants used by the LES message handler.
const (
	softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
	estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
	ethVersion = 63 // equivalent eth version for the downloader
	MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
	MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
	MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
	MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
	MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
	MaxHeaderProofsFetch = 64 // Amount of header proofs (ChtReq entries) allowed per GetHeaderProofsMsg request
	MaxTxSend = 64 // Amount of transactions to be sent per request
	// disableClientRemovePeer, when true, makes NewProtocolManager hand the
	// downloader a no-op peer removal callback instead of removePeer.
	disableClientRemovePeer = false
)
// errIncompatibleConfig is returned if the requested protocols and configs are
// not compatible (low protocol version restrictions and high requirements).
// NewProtocolManager returns it when no sub-protocol could be set up.
var errIncompatibleConfig = errors.New("incompatible configuration")
  59. func errResp(code errCode, format string, v ...interface{}) error {
  60. return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
  61. }
// hashFetcherFn is a callback that takes a hash and reports an error.
// NOTE(review): no user of this type is visible in this part of the file.
type hashFetcherFn func(common.Hash) error
// BlockChain is the chain access interface the protocol manager works against
// (it is the type of ProtocolManager.blockchain). It covers header lookup by
// hash and number, total difficulty, head/genesis information and header
// chain insertion/rollback.
type BlockChain interface {
	HasHeader(hash common.Hash) bool
	GetHeader(hash common.Hash, number uint64) *types.Header
	GetHeaderByHash(hash common.Hash) *types.Header
	CurrentHeader() *types.Header
	GetTdByHash(hash common.Hash) *big.Int
	InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error)
	Rollback(chain []common.Hash)
	// Status returns the values sent in the LES handshake: total difficulty,
	// current head hash and genesis hash.
	Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash)
	GetHeaderByNumber(number uint64) *types.Header
	GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash
	LastBlockHash() common.Hash
	Genesis() *types.Block
}
// txPool is the transaction pool interface that transactions received in
// SendTxMsg messages are delivered to.
type txPool interface {
	// AddBatch should add the given transactions to the pool.
	AddBatch([]*types.Transaction) error
}
// ProtocolManager holds the state shared by all LES peer sessions and exposes
// the p2p sub-protocols (SubProtocols) that run those sessions.
type ProtocolManager struct {
	// lightSync enables the downloader, the light fetcher and the syncer
	// goroutine (see NewProtocolManager and Start).
	lightSync bool
	// txpool receives transactions from SendTxMsg; such messages are
	// rejected when it is nil.
	txpool txPool
	txrelay *LesTxRelay
	networkId uint64
	chainConfig *params.ChainConfig
	blockchain BlockChain
	chainDb ethdb.Database
	// odr may be nil; response messages that are delivered through the
	// retriever are rejected in that case.
	odr *LesOdr
	// server is passed to the handshake and provides the flow control
	// parameters used when serving requests.
	server *LesServer
	// serverPool may be nil; when set, connections are tracked in it.
	serverPool *serverPool
	lesTopic discv5.Topic
	// reqDist and retriever are taken from odr when odr is not nil.
	reqDist *requestDistributor
	retriever *retrieveManager
	// downloader and fetcher are only created when lightSync is true.
	downloader *downloader.Downloader
	fetcher *lightFetcher
	peers *peerSet
	SubProtocols []p2p.Protocol
	eventMux *event.TypeMux
	// channels for fetcher, syncer, txsyncLoop
	newPeerCh chan *peer
	quitSync chan struct{}
	noMorePeers chan struct{}
	syncMu sync.Mutex
	syncing bool
	syncDone chan struct{}
	// wait group is used for graceful shutdowns during downloading
	// and processing
	wg *sync.WaitGroup
}
  111. // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
  112. // with the ethereum network.
  113. func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
  114. // Create the protocol manager with the base fields
  115. manager := &ProtocolManager{
  116. lightSync: lightSync,
  117. eventMux: mux,
  118. blockchain: blockchain,
  119. chainConfig: chainConfig,
  120. chainDb: chainDb,
  121. odr: odr,
  122. networkId: networkId,
  123. txpool: txpool,
  124. txrelay: txrelay,
  125. peers: peers,
  126. newPeerCh: make(chan *peer),
  127. quitSync: quitSync,
  128. wg: wg,
  129. noMorePeers: make(chan struct{}),
  130. }
  131. if odr != nil {
  132. manager.retriever = odr.retriever
  133. manager.reqDist = odr.retriever.dist
  134. }
  135. // Initiate a sub-protocol for every implemented version we can handle
  136. manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
  137. for i, version := range ProtocolVersions {
  138. // Compatible, initialize the sub-protocol
  139. version := version // Closure for the run
  140. manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
  141. Name: "les",
  142. Version: version,
  143. Length: ProtocolLengths[i],
  144. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  145. var entry *poolEntry
  146. peer := manager.newPeer(int(version), networkId, p, rw)
  147. if manager.serverPool != nil {
  148. addr := p.RemoteAddr().(*net.TCPAddr)
  149. entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
  150. }
  151. peer.poolEntry = entry
  152. select {
  153. case manager.newPeerCh <- peer:
  154. manager.wg.Add(1)
  155. defer manager.wg.Done()
  156. err := manager.handle(peer)
  157. if entry != nil {
  158. manager.serverPool.disconnect(entry)
  159. }
  160. return err
  161. case <-manager.quitSync:
  162. if entry != nil {
  163. manager.serverPool.disconnect(entry)
  164. }
  165. return p2p.DiscQuitting
  166. }
  167. },
  168. NodeInfo: func() interface{} {
  169. return manager.NodeInfo()
  170. },
  171. PeerInfo: func(id discover.NodeID) interface{} {
  172. if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
  173. return p.Info()
  174. }
  175. return nil
  176. },
  177. })
  178. }
  179. if len(manager.SubProtocols) == 0 {
  180. return nil, errIncompatibleConfig
  181. }
  182. removePeer := manager.removePeer
  183. if disableClientRemovePeer {
  184. removePeer = func(id string) {}
  185. }
  186. if lightSync {
  187. manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer)
  188. manager.peers.notify((*downloaderPeerNotify)(manager))
  189. manager.fetcher = newLightFetcher(manager)
  190. }
  191. return manager, nil
  192. }
// removePeer initiates disconnection from a peer by removing it from the peer set.
// It is also the removal callback handed to the downloader by
// NewProtocolManager (unless disableClientRemovePeer is set).
func (pm *ProtocolManager) removePeer(id string) {
	pm.peers.Unregister(id)
}
  197. func (pm *ProtocolManager) Start() {
  198. if pm.lightSync {
  199. go pm.syncer()
  200. } else {
  201. go func() {
  202. for range pm.newPeerCh {
  203. }
  204. }()
  205. }
  206. }
// Stop shuts the protocol manager down: it signals that no more peers are
// accepted, closes quitSync, closes the peer set and then waits on the wait
// group until all tracked sessions have finished.
func (pm *ProtocolManager) Stop() {
	// Showing a log message. During download / process this could actually
	// take between 5 to 10 seconds and therefore feedback is required.
	log.Info("Stopping light Ethereum protocol")
	// Quit the sync loop.
	// After this send has completed, no new peers will be accepted.
	// noMorePeers is unbuffered, so this blocks until a receiver takes the
	// value. NOTE(review): the receiving side is not in this part of the file.
	pm.noMorePeers <- struct{}{}
	close(pm.quitSync) // quits syncer, fetcher
	// Disconnect existing sessions.
	// This also closes the gate for any new registrations on the peer set.
	// sessions which are already established but not added to pm.peers yet
	// will exit when they try to register.
	pm.peers.Close()
	// Wait for any process action
	pm.wg.Wait()
	log.Info("Light Ethereum protocol stopped")
}
// newPeer creates the peer object for a new connection with protocol version
// pv and network id nv, wrapping rw with newMeteredMsgWriter.
func (pm *ProtocolManager) newPeer(pv int, nv uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
	return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
}
  227. // handle is the callback invoked to manage the life cycle of a les peer. When
  228. // this function terminates, the peer is disconnected.
  229. func (pm *ProtocolManager) handle(p *peer) error {
  230. p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
  231. // Execute the LES handshake
  232. td, head, genesis := pm.blockchain.Status()
  233. headNum := core.GetBlockNumber(pm.chainDb, head)
  234. if err := p.Handshake(td, head, headNum, genesis, pm.server); err != nil {
  235. p.Log().Debug("Light Ethereum handshake failed", "err", err)
  236. return err
  237. }
  238. if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
  239. rw.Init(p.version)
  240. }
  241. // Register the peer locally
  242. if err := pm.peers.Register(p); err != nil {
  243. p.Log().Error("Light Ethereum peer registration failed", "err", err)
  244. return err
  245. }
  246. defer func() {
  247. if pm.server != nil && pm.server.fcManager != nil && p.fcClient != nil {
  248. p.fcClient.Remove(pm.server.fcManager)
  249. }
  250. pm.removePeer(p.id)
  251. }()
  252. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  253. if pm.lightSync {
  254. p.lock.Lock()
  255. head := p.headInfo
  256. p.lock.Unlock()
  257. if pm.fetcher != nil {
  258. pm.fetcher.announce(p, head)
  259. }
  260. if p.poolEntry != nil {
  261. pm.serverPool.registered(p.poolEntry)
  262. }
  263. }
  264. stop := make(chan struct{})
  265. defer close(stop)
  266. go func() {
  267. // new block announce loop
  268. for {
  269. select {
  270. case announce := <-p.announceChn:
  271. p.SendAnnounce(announce)
  272. case <-stop:
  273. return
  274. }
  275. }
  276. }()
  277. // main loop. handle incoming messages.
  278. for {
  279. if err := pm.handleMsg(p); err != nil {
  280. p.Log().Debug("Light Ethereum message handling failed", "err", err)
  281. return err
  282. }
  283. }
  284. }
// reqList enumerates the request message codes; these are the messages that
// handleMsg subjects to flow control before serving them.
// NOTE(review): the consumers of this list are not in this part of the file.
var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsMsg, SendTxMsg, GetHeaderProofsMsg}
  286. // handleMsg is invoked whenever an inbound message is received from a remote
  287. // peer. The remote connection is torn down upon returning any error.
  288. func (pm *ProtocolManager) handleMsg(p *peer) error {
  289. // Read the next message from the remote peer, and ensure it's fully consumed
  290. msg, err := p.rw.ReadMsg()
  291. if err != nil {
  292. return err
  293. }
  294. p.Log().Trace("Light Ethereum message arrived", "code", msg.Code, "bytes", msg.Size)
  295. costs := p.fcCosts[msg.Code]
  296. reject := func(reqCnt, maxCnt uint64) bool {
  297. if p.fcClient == nil || reqCnt > maxCnt {
  298. return true
  299. }
  300. bufValue, _ := p.fcClient.AcceptRequest()
  301. cost := costs.baseCost + reqCnt*costs.reqCost
  302. if cost > pm.server.defParams.BufLimit {
  303. cost = pm.server.defParams.BufLimit
  304. }
  305. if cost > bufValue {
  306. recharge := time.Duration((cost - bufValue) * 1000000 / pm.server.defParams.MinRecharge)
  307. p.Log().Error("Request came too early", "recharge", common.PrettyDuration(recharge))
  308. return true
  309. }
  310. return false
  311. }
  312. if msg.Size > ProtocolMaxMsgSize {
  313. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  314. }
  315. defer msg.Discard()
  316. var deliverMsg *Msg
  317. // Handle the message depending on its contents
  318. switch msg.Code {
  319. case StatusMsg:
  320. p.Log().Trace("Received status message")
  321. // Status messages should never arrive after the handshake
  322. return errResp(ErrExtraStatusMsg, "uncontrolled status message")
  323. // Block header query, collect the requested headers and reply
  324. case AnnounceMsg:
  325. p.Log().Trace("Received announce message")
  326. var req announceData
  327. if err := msg.Decode(&req); err != nil {
  328. return errResp(ErrDecode, "%v: %v", msg, err)
  329. }
  330. p.Log().Trace("Announce message content", "number", req.Number, "hash", req.Hash, "td", req.Td, "reorg", req.ReorgDepth)
  331. if pm.fetcher != nil {
  332. pm.fetcher.announce(p, &req)
  333. }
  334. case GetBlockHeadersMsg:
  335. p.Log().Trace("Received block header request")
  336. // Decode the complex header query
  337. var req struct {
  338. ReqID uint64
  339. Query getBlockHeadersData
  340. }
  341. if err := msg.Decode(&req); err != nil {
  342. return errResp(ErrDecode, "%v: %v", msg, err)
  343. }
  344. query := req.Query
  345. if reject(query.Amount, MaxHeaderFetch) {
  346. return errResp(ErrRequestRejected, "")
  347. }
  348. hashMode := query.Origin.Hash != (common.Hash{})
  349. // Gather headers until the fetch or network limits is reached
  350. var (
  351. bytes common.StorageSize
  352. headers []*types.Header
  353. unknown bool
  354. )
  355. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  356. // Retrieve the next header satisfying the query
  357. var origin *types.Header
  358. if hashMode {
  359. origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
  360. } else {
  361. origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
  362. }
  363. if origin == nil {
  364. break
  365. }
  366. number := origin.Number.Uint64()
  367. headers = append(headers, origin)
  368. bytes += estHeaderRlpSize
  369. // Advance to the next header of the query
  370. switch {
  371. case query.Origin.Hash != (common.Hash{}) && query.Reverse:
  372. // Hash based traversal towards the genesis block
  373. for i := 0; i < int(query.Skip)+1; i++ {
  374. if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {
  375. query.Origin.Hash = header.ParentHash
  376. number--
  377. } else {
  378. unknown = true
  379. break
  380. }
  381. }
  382. case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
  383. // Hash based traversal towards the leaf block
  384. if header := pm.blockchain.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil {
  385. if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
  386. query.Origin.Hash = header.Hash()
  387. } else {
  388. unknown = true
  389. }
  390. } else {
  391. unknown = true
  392. }
  393. case query.Reverse:
  394. // Number based traversal towards the genesis block
  395. if query.Origin.Number >= query.Skip+1 {
  396. query.Origin.Number -= (query.Skip + 1)
  397. } else {
  398. unknown = true
  399. }
  400. case !query.Reverse:
  401. // Number based traversal towards the leaf block
  402. query.Origin.Number += (query.Skip + 1)
  403. }
  404. }
  405. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
  406. pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
  407. return p.SendBlockHeaders(req.ReqID, bv, headers)
  408. case BlockHeadersMsg:
  409. if pm.downloader == nil {
  410. return errResp(ErrUnexpectedResponse, "")
  411. }
  412. p.Log().Trace("Received block header response message")
  413. // A batch of headers arrived to one of our previous requests
  414. var resp struct {
  415. ReqID, BV uint64
  416. Headers []*types.Header
  417. }
  418. if err := msg.Decode(&resp); err != nil {
  419. return errResp(ErrDecode, "msg %v: %v", msg, err)
  420. }
  421. p.fcServer.GotReply(resp.ReqID, resp.BV)
  422. if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
  423. pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
  424. } else {
  425. err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
  426. if err != nil {
  427. log.Debug(fmt.Sprint(err))
  428. }
  429. }
  430. case GetBlockBodiesMsg:
  431. p.Log().Trace("Received block bodies request")
  432. // Decode the retrieval message
  433. var req struct {
  434. ReqID uint64
  435. Hashes []common.Hash
  436. }
  437. if err := msg.Decode(&req); err != nil {
  438. return errResp(ErrDecode, "msg %v: %v", msg, err)
  439. }
  440. // Gather blocks until the fetch or network limits is reached
  441. var (
  442. bytes int
  443. bodies []rlp.RawValue
  444. )
  445. reqCnt := len(req.Hashes)
  446. if reject(uint64(reqCnt), MaxBodyFetch) {
  447. return errResp(ErrRequestRejected, "")
  448. }
  449. for _, hash := range req.Hashes {
  450. if bytes >= softResponseLimit {
  451. break
  452. }
  453. // Retrieve the requested block body, stopping if enough was found
  454. if data := core.GetBodyRLP(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash)); len(data) != 0 {
  455. bodies = append(bodies, data)
  456. bytes += len(data)
  457. }
  458. }
  459. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  460. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  461. return p.SendBlockBodiesRLP(req.ReqID, bv, bodies)
  462. case BlockBodiesMsg:
  463. if pm.odr == nil {
  464. return errResp(ErrUnexpectedResponse, "")
  465. }
  466. p.Log().Trace("Received block bodies response")
  467. // A batch of block bodies arrived to one of our previous requests
  468. var resp struct {
  469. ReqID, BV uint64
  470. Data []*types.Body
  471. }
  472. if err := msg.Decode(&resp); err != nil {
  473. return errResp(ErrDecode, "msg %v: %v", msg, err)
  474. }
  475. p.fcServer.GotReply(resp.ReqID, resp.BV)
  476. deliverMsg = &Msg{
  477. MsgType: MsgBlockBodies,
  478. ReqID: resp.ReqID,
  479. Obj: resp.Data,
  480. }
  481. case GetCodeMsg:
  482. p.Log().Trace("Received code request")
  483. // Decode the retrieval message
  484. var req struct {
  485. ReqID uint64
  486. Reqs []CodeReq
  487. }
  488. if err := msg.Decode(&req); err != nil {
  489. return errResp(ErrDecode, "msg %v: %v", msg, err)
  490. }
  491. // Gather state data until the fetch or network limits is reached
  492. var (
  493. bytes int
  494. data [][]byte
  495. )
  496. reqCnt := len(req.Reqs)
  497. if reject(uint64(reqCnt), MaxCodeFetch) {
  498. return errResp(ErrRequestRejected, "")
  499. }
  500. for _, req := range req.Reqs {
  501. // Retrieve the requested state entry, stopping if enough was found
  502. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  503. if trie, _ := trie.New(header.Root, pm.chainDb); trie != nil {
  504. sdata := trie.Get(req.AccKey)
  505. var acc state.Account
  506. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  507. entry, _ := pm.chainDb.Get(acc.CodeHash)
  508. if bytes+len(entry) >= softResponseLimit {
  509. break
  510. }
  511. data = append(data, entry)
  512. bytes += len(entry)
  513. }
  514. }
  515. }
  516. }
  517. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  518. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  519. return p.SendCode(req.ReqID, bv, data)
  520. case CodeMsg:
  521. if pm.odr == nil {
  522. return errResp(ErrUnexpectedResponse, "")
  523. }
  524. p.Log().Trace("Received code response")
  525. // A batch of node state data arrived to one of our previous requests
  526. var resp struct {
  527. ReqID, BV uint64
  528. Data [][]byte
  529. }
  530. if err := msg.Decode(&resp); err != nil {
  531. return errResp(ErrDecode, "msg %v: %v", msg, err)
  532. }
  533. p.fcServer.GotReply(resp.ReqID, resp.BV)
  534. deliverMsg = &Msg{
  535. MsgType: MsgCode,
  536. ReqID: resp.ReqID,
  537. Obj: resp.Data,
  538. }
  539. case GetReceiptsMsg:
  540. p.Log().Trace("Received receipts request")
  541. // Decode the retrieval message
  542. var req struct {
  543. ReqID uint64
  544. Hashes []common.Hash
  545. }
  546. if err := msg.Decode(&req); err != nil {
  547. return errResp(ErrDecode, "msg %v: %v", msg, err)
  548. }
  549. // Gather state data until the fetch or network limits is reached
  550. var (
  551. bytes int
  552. receipts []rlp.RawValue
  553. )
  554. reqCnt := len(req.Hashes)
  555. if reject(uint64(reqCnt), MaxReceiptFetch) {
  556. return errResp(ErrRequestRejected, "")
  557. }
  558. for _, hash := range req.Hashes {
  559. if bytes >= softResponseLimit {
  560. break
  561. }
  562. // Retrieve the requested block's receipts, skipping if unknown to us
  563. results := core.GetBlockReceipts(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash))
  564. if results == nil {
  565. if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  566. continue
  567. }
  568. }
  569. // If known, encode and queue for response packet
  570. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  571. log.Error("Failed to encode receipt", "err", err)
  572. } else {
  573. receipts = append(receipts, encoded)
  574. bytes += len(encoded)
  575. }
  576. }
  577. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  578. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  579. return p.SendReceiptsRLP(req.ReqID, bv, receipts)
  580. case ReceiptsMsg:
  581. if pm.odr == nil {
  582. return errResp(ErrUnexpectedResponse, "")
  583. }
  584. p.Log().Trace("Received receipts response")
  585. // A batch of receipts arrived to one of our previous requests
  586. var resp struct {
  587. ReqID, BV uint64
  588. Receipts []types.Receipts
  589. }
  590. if err := msg.Decode(&resp); err != nil {
  591. return errResp(ErrDecode, "msg %v: %v", msg, err)
  592. }
  593. p.fcServer.GotReply(resp.ReqID, resp.BV)
  594. deliverMsg = &Msg{
  595. MsgType: MsgReceipts,
  596. ReqID: resp.ReqID,
  597. Obj: resp.Receipts,
  598. }
  599. case GetProofsMsg:
  600. p.Log().Trace("Received proofs request")
  601. // Decode the retrieval message
  602. var req struct {
  603. ReqID uint64
  604. Reqs []ProofReq
  605. }
  606. if err := msg.Decode(&req); err != nil {
  607. return errResp(ErrDecode, "msg %v: %v", msg, err)
  608. }
  609. // Gather state data until the fetch or network limits is reached
  610. var (
  611. bytes int
  612. proofs proofsData
  613. )
  614. reqCnt := len(req.Reqs)
  615. if reject(uint64(reqCnt), MaxProofsFetch) {
  616. return errResp(ErrRequestRejected, "")
  617. }
  618. for _, req := range req.Reqs {
  619. if bytes >= softResponseLimit {
  620. break
  621. }
  622. // Retrieve the requested state entry, stopping if enough was found
  623. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  624. if tr, _ := trie.New(header.Root, pm.chainDb); tr != nil {
  625. if len(req.AccKey) > 0 {
  626. sdata := tr.Get(req.AccKey)
  627. tr = nil
  628. var acc state.Account
  629. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  630. tr, _ = trie.New(acc.Root, pm.chainDb)
  631. }
  632. }
  633. if tr != nil {
  634. proof := tr.Prove(req.Key)
  635. proofs = append(proofs, proof)
  636. bytes += len(proof)
  637. }
  638. }
  639. }
  640. }
  641. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  642. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  643. return p.SendProofs(req.ReqID, bv, proofs)
  644. case ProofsMsg:
  645. if pm.odr == nil {
  646. return errResp(ErrUnexpectedResponse, "")
  647. }
  648. p.Log().Trace("Received proofs response")
  649. // A batch of merkle proofs arrived to one of our previous requests
  650. var resp struct {
  651. ReqID, BV uint64
  652. Data [][]rlp.RawValue
  653. }
  654. if err := msg.Decode(&resp); err != nil {
  655. return errResp(ErrDecode, "msg %v: %v", msg, err)
  656. }
  657. p.fcServer.GotReply(resp.ReqID, resp.BV)
  658. deliverMsg = &Msg{
  659. MsgType: MsgProofs,
  660. ReqID: resp.ReqID,
  661. Obj: resp.Data,
  662. }
  663. case GetHeaderProofsMsg:
  664. p.Log().Trace("Received headers proof request")
  665. // Decode the retrieval message
  666. var req struct {
  667. ReqID uint64
  668. Reqs []ChtReq
  669. }
  670. if err := msg.Decode(&req); err != nil {
  671. return errResp(ErrDecode, "msg %v: %v", msg, err)
  672. }
  673. // Gather state data until the fetch or network limits is reached
  674. var (
  675. bytes int
  676. proofs []ChtResp
  677. )
  678. reqCnt := len(req.Reqs)
  679. if reject(uint64(reqCnt), MaxHeaderProofsFetch) {
  680. return errResp(ErrRequestRejected, "")
  681. }
  682. for _, req := range req.Reqs {
  683. if bytes >= softResponseLimit {
  684. break
  685. }
  686. if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
  687. if root := getChtRoot(pm.chainDb, req.ChtNum); root != (common.Hash{}) {
  688. if tr, _ := trie.New(root, pm.chainDb); tr != nil {
  689. var encNumber [8]byte
  690. binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
  691. proof := tr.Prove(encNumber[:])
  692. proofs = append(proofs, ChtResp{Header: header, Proof: proof})
  693. bytes += len(proof) + estHeaderRlpSize
  694. }
  695. }
  696. }
  697. }
  698. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  699. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  700. return p.SendHeaderProofs(req.ReqID, bv, proofs)
  701. case HeaderProofsMsg:
  702. if pm.odr == nil {
  703. return errResp(ErrUnexpectedResponse, "")
  704. }
  705. p.Log().Trace("Received headers proof response")
  706. var resp struct {
  707. ReqID, BV uint64
  708. Data []ChtResp
  709. }
  710. if err := msg.Decode(&resp); err != nil {
  711. return errResp(ErrDecode, "msg %v: %v", msg, err)
  712. }
  713. p.fcServer.GotReply(resp.ReqID, resp.BV)
  714. deliverMsg = &Msg{
  715. MsgType: MsgHeaderProofs,
  716. ReqID: resp.ReqID,
  717. Obj: resp.Data,
  718. }
  719. case SendTxMsg:
  720. if pm.txpool == nil {
  721. return errResp(ErrUnexpectedResponse, "")
  722. }
  723. // Transactions arrived, parse all of them and deliver to the pool
  724. var txs []*types.Transaction
  725. if err := msg.Decode(&txs); err != nil {
  726. return errResp(ErrDecode, "msg %v: %v", msg, err)
  727. }
  728. reqCnt := len(txs)
  729. if reject(uint64(reqCnt), MaxTxSend) {
  730. return errResp(ErrRequestRejected, "")
  731. }
  732. if err := pm.txpool.AddBatch(txs); err != nil {
  733. return errResp(ErrUnexpectedResponse, "msg: %v", err)
  734. }
  735. _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  736. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  737. default:
  738. p.Log().Trace("Received unknown message", "code", msg.Code)
  739. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  740. }
  741. if deliverMsg != nil {
  742. err := pm.retriever.deliver(p, deliverMsg)
  743. if err != nil {
  744. p.responseErrors++
  745. if p.responseErrors > maxResponseErrors {
  746. return err
  747. }
  748. }
  749. }
  750. return nil
  751. }
  752. // NodeInfo retrieves some protocol metadata about the running host node.
  753. func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo {
  754. return &eth.EthNodeInfo{
  755. Network: self.networkId,
  756. Difficulty: self.blockchain.GetTdByHash(self.blockchain.LastBlockHash()),
  757. Genesis: self.blockchain.Genesis().Hash(),
  758. Head: self.blockchain.LastBlockHash(),
  759. }
  760. }
// downloaderPeerNotify implements peerSetNotify. It is a ProtocolManager
// under a different name: NewProtocolManager passes the manager, converted to
// this type, to the peer set, and the registerPeer/unregisterPeer methods
// mirror peer set changes into the downloader.
type downloaderPeerNotify ProtocolManager
// peerConnection pairs a peer with its protocol manager. It is the object
// registered with the downloader for the peer (see registerPeer) and sends
// header requests through the manager's request distributor.
type peerConnection struct {
	manager *ProtocolManager
	peer *peer
}
// Head returns the hash and total difficulty reported by the peer's HeadAndTd.
func (pc *peerConnection) Head() (common.Hash, *big.Int) {
	return pc.peer.HeadAndTd()
}
  770. func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
  771. reqID := genReqID()
  772. rq := &distReq{
  773. getCost: func(dp distPeer) uint64 {
  774. peer := dp.(*peer)
  775. return peer.GetRequestCost(GetBlockHeadersMsg, amount)
  776. },
  777. canSend: func(dp distPeer) bool {
  778. return dp.(*peer) == pc.peer
  779. },
  780. request: func(dp distPeer) func() {
  781. peer := dp.(*peer)
  782. cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
  783. peer.fcServer.QueueRequest(reqID, cost)
  784. return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
  785. },
  786. }
  787. _, ok := <-pc.manager.reqDist.queue(rq)
  788. if !ok {
  789. return ErrNoPeers
  790. }
  791. return nil
  792. }
  793. func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
  794. reqID := genReqID()
  795. rq := &distReq{
  796. getCost: func(dp distPeer) uint64 {
  797. peer := dp.(*peer)
  798. return peer.GetRequestCost(GetBlockHeadersMsg, amount)
  799. },
  800. canSend: func(dp distPeer) bool {
  801. return dp.(*peer) == pc.peer
  802. },
  803. request: func(dp distPeer) func() {
  804. peer := dp.(*peer)
  805. cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
  806. peer.fcServer.QueueRequest(reqID, cost)
  807. return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
  808. },
  809. }
  810. _, ok := <-pc.manager.reqDist.queue(rq)
  811. if !ok {
  812. return ErrNoPeers
  813. }
  814. return nil
  815. }
// RequestBodies always panics: block bodies are not requested through the
// downloader in light client mode sync.
func (pc *peerConnection) RequestBodies(hashes []common.Hash) error {
	panic("RequestBodies not supported in light client mode sync")
}
// RequestReceipts always panics: receipts are not requested through the
// downloader in light client mode sync.
func (pc *peerConnection) RequestReceipts(hashes []common.Hash) error {
	panic("RequestReceipts not supported in light client mode sync")
}
// RequestNodeData always panics: node data is not requested through the
// downloader in light client mode sync.
func (pc *peerConnection) RequestNodeData(hashes []common.Hash) error {
	panic("RequestNodeData not supported in light client mode sync")
}
  825. func (d *downloaderPeerNotify) registerPeer(p *peer) {
  826. pm := (*ProtocolManager)(d)
  827. pc := &peerConnection{
  828. manager: pm,
  829. peer: p,
  830. }
  831. pm.downloader.RegisterPeer(p.id, ethVersion, pc)
  832. }
  833. func (d *downloaderPeerNotify) unregisterPeer(p *peer) {
  834. pm := (*ProtocolManager)(d)
  835. pm.downloader.UnregisterPeer(p.id)
  836. }