handler.go 28 KB


  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "encoding/binary"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "net"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/core"
  28. "github.com/ethereum/go-ethereum/core/state"
  29. "github.com/ethereum/go-ethereum/core/types"
  30. "github.com/ethereum/go-ethereum/eth"
  31. "github.com/ethereum/go-ethereum/eth/downloader"
  32. "github.com/ethereum/go-ethereum/ethdb"
  33. "github.com/ethereum/go-ethereum/event"
  34. "github.com/ethereum/go-ethereum/logger"
  35. "github.com/ethereum/go-ethereum/logger/glog"
  36. "github.com/ethereum/go-ethereum/p2p"
  37. "github.com/ethereum/go-ethereum/p2p/discover"
  38. "github.com/ethereum/go-ethereum/p2p/discv5"
  39. "github.com/ethereum/go-ethereum/params"
  40. "github.com/ethereum/go-ethereum/pow"
  41. "github.com/ethereum/go-ethereum/rlp"
  42. "github.com/ethereum/go-ethereum/trie"
  43. )
  44. const (
  45. softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
  46. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
  47. ethVersion = 63 // equivalent eth version for the downloader
  48. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  49. MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
  50. MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
  51. MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
  52. MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  53. MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  54. MaxTxSend = 64 // Amount of transactions to be send per request
  55. disableClientRemovePeer = false
  56. )
  57. // errIncompatibleConfig is returned if the requested protocols and configs are
  58. // not compatible (low protocol version restrictions and high requirements).
  59. var errIncompatibleConfig = errors.New("incompatible configuration")
  60. func errResp(code errCode, format string, v ...interface{}) error {
  61. return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
  62. }
  63. type hashFetcherFn func(common.Hash) error
  64. type BlockChain interface {
  65. HasHeader(hash common.Hash) bool
  66. GetHeader(hash common.Hash, number uint64) *types.Header
  67. GetHeaderByHash(hash common.Hash) *types.Header
  68. CurrentHeader() *types.Header
  69. GetTdByHash(hash common.Hash) *big.Int
  70. InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error)
  71. Rollback(chain []common.Hash)
  72. Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash)
  73. GetHeaderByNumber(number uint64) *types.Header
  74. GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash
  75. LastBlockHash() common.Hash
  76. Genesis() *types.Block
  77. }
  78. type txPool interface {
  79. // AddTransactions should add the given transactions to the pool.
  80. AddBatch([]*types.Transaction) error
  81. }
  82. type ProtocolManager struct {
  83. lightSync bool
  84. txpool txPool
  85. txrelay *LesTxRelay
  86. networkId int
  87. chainConfig *params.ChainConfig
  88. blockchain BlockChain
  89. chainDb ethdb.Database
  90. odr *LesOdr
  91. server *LesServer
  92. serverPool *serverPool
  93. downloader *downloader.Downloader
  94. fetcher *lightFetcher
  95. peers *peerSet
  96. SubProtocols []p2p.Protocol
  97. eventMux *event.TypeMux
  98. // channels for fetcher, syncer, txsyncLoop
  99. newPeerCh chan *peer
  100. quitSync chan struct{}
  101. noMorePeers chan struct{}
  102. syncMu sync.Mutex
  103. syncing bool
  104. syncDone chan struct{}
  105. // wait group is used for graceful shutdowns during downloading
  106. // and processing
  107. wg sync.WaitGroup
  108. }
  109. // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
  110. // with the ethereum network.
  111. func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId int, mux *event.TypeMux, pow pow.PoW, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay) (*ProtocolManager, error) {
  112. // Create the protocol manager with the base fields
  113. manager := &ProtocolManager{
  114. lightSync: lightSync,
  115. eventMux: mux,
  116. blockchain: blockchain,
  117. chainConfig: chainConfig,
  118. chainDb: chainDb,
  119. networkId: networkId,
  120. txpool: txpool,
  121. txrelay: txrelay,
  122. odr: odr,
  123. peers: newPeerSet(),
  124. newPeerCh: make(chan *peer),
  125. quitSync: make(chan struct{}),
  126. noMorePeers: make(chan struct{}),
  127. }
  128. // Initiate a sub-protocol for every implemented version we can handle
  129. manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
  130. for i, version := range ProtocolVersions {
  131. // Compatible, initialize the sub-protocol
  132. version := version // Closure for the run
  133. manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
  134. Name: "les",
  135. Version: version,
  136. Length: ProtocolLengths[i],
  137. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  138. var entry *poolEntry
  139. peer := manager.newPeer(int(version), networkId, p, rw)
  140. if manager.serverPool != nil {
  141. addr := p.RemoteAddr().(*net.TCPAddr)
  142. entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
  143. if entry == nil {
  144. return fmt.Errorf("unwanted connection")
  145. }
  146. }
  147. peer.poolEntry = entry
  148. select {
  149. case manager.newPeerCh <- peer:
  150. manager.wg.Add(1)
  151. defer manager.wg.Done()
  152. err := manager.handle(peer)
  153. if entry != nil {
  154. manager.serverPool.disconnect(entry)
  155. }
  156. return err
  157. case <-manager.quitSync:
  158. if entry != nil {
  159. manager.serverPool.disconnect(entry)
  160. }
  161. return p2p.DiscQuitting
  162. }
  163. },
  164. NodeInfo: func() interface{} {
  165. return manager.NodeInfo()
  166. },
  167. PeerInfo: func(id discover.NodeID) interface{} {
  168. if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
  169. return p.Info()
  170. }
  171. return nil
  172. },
  173. })
  174. }
  175. if len(manager.SubProtocols) == 0 {
  176. return nil, errIncompatibleConfig
  177. }
  178. removePeer := manager.removePeer
  179. if disableClientRemovePeer {
  180. removePeer = func(id string) {}
  181. }
  182. if lightSync {
  183. glog.V(logger.Debug).Infof("LES: create downloader")
  184. manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
  185. nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
  186. blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
  187. }
  188. if odr != nil {
  189. odr.removePeer = removePeer
  190. }
  191. /*validator := func(block *types.Block, parent *types.Block) error {
  192. return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
  193. }
  194. heighter := func() uint64 {
  195. return chainman.LastBlockNumberU64()
  196. }
  197. manager.fetcher = fetcher.New(chainman.GetBlockNoOdr, validator, nil, heighter, chainman.InsertChain, manager.removePeer)
  198. */
  199. return manager, nil
  200. }
  201. func (pm *ProtocolManager) removePeer(id string) {
  202. // Short circuit if the peer was already removed
  203. peer := pm.peers.Peer(id)
  204. if peer == nil {
  205. return
  206. }
  207. if err := pm.peers.Unregister(id); err != nil {
  208. if err == errNotRegistered {
  209. return
  210. }
  211. glog.V(logger.Error).Infoln("Removal failed:", err)
  212. }
  213. glog.V(logger.Debug).Infoln("Removing peer", id)
  214. // Unregister the peer from the downloader and Ethereum peer set
  215. glog.V(logger.Debug).Infof("LES: unregister peer %v", id)
  216. if pm.lightSync {
  217. pm.downloader.UnregisterPeer(id)
  218. if pm.txrelay != nil {
  219. pm.txrelay.removePeer(id)
  220. }
  221. if pm.fetcher != nil {
  222. pm.fetcher.removePeer(peer)
  223. }
  224. }
  225. // Hard disconnect at the networking layer
  226. if peer != nil {
  227. peer.Peer.Disconnect(p2p.DiscUselessPeer)
  228. }
  229. }
  230. func (pm *ProtocolManager) Start(srvr *p2p.Server) {
  231. var topicDisc *discv5.Network
  232. if srvr != nil {
  233. topicDisc = srvr.DiscV5
  234. }
  235. lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
  236. if pm.lightSync {
  237. // start sync handler
  238. if srvr != nil { // srvr is nil during testing
  239. pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
  240. pm.odr.serverPool = pm.serverPool
  241. pm.fetcher = newLightFetcher(pm)
  242. }
  243. go pm.syncer()
  244. } else {
  245. if topicDisc != nil {
  246. go func() {
  247. glog.V(logger.Info).Infoln("Starting registering topic", string(lesTopic))
  248. topicDisc.RegisterTopic(lesTopic, pm.quitSync)
  249. glog.V(logger.Info).Infoln("Stopped registering topic", string(lesTopic))
  250. }()
  251. }
  252. go func() {
  253. for range pm.newPeerCh {
  254. }
  255. }()
  256. }
  257. }
  258. func (pm *ProtocolManager) Stop() {
  259. // Showing a log message. During download / process this could actually
  260. // take between 5 to 10 seconds and therefor feedback is required.
  261. glog.V(logger.Info).Infoln("Stopping light ethereum protocol handler...")
  262. // Quit the sync loop.
  263. // After this send has completed, no new peers will be accepted.
  264. pm.noMorePeers <- struct{}{}
  265. close(pm.quitSync) // quits syncer, fetcher
  266. // Disconnect existing sessions.
  267. // This also closes the gate for any new registrations on the peer set.
  268. // sessions which are already established but not added to pm.peers yet
  269. // will exit when they try to register.
  270. pm.peers.Close()
  271. // Wait for any process action
  272. pm.wg.Wait()
  273. glog.V(logger.Info).Infoln("Light ethereum protocol handler stopped")
  274. }
  275. func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  276. return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
  277. }
  278. // handle is the callback invoked to manage the life cycle of a les peer. When
  279. // this function terminates, the peer is disconnected.
  280. func (pm *ProtocolManager) handle(p *peer) error {
  281. glog.V(logger.Debug).Infof("%v: peer connected [%s]", p, p.Name())
  282. // Execute the LES handshake
  283. td, head, genesis := pm.blockchain.Status()
  284. headNum := core.GetBlockNumber(pm.chainDb, head)
  285. if err := p.Handshake(td, head, headNum, genesis, pm.server); err != nil {
  286. glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err)
  287. return err
  288. }
  289. if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
  290. rw.Init(p.version)
  291. }
  292. // Register the peer locally
  293. glog.V(logger.Detail).Infof("%v: adding peer", p)
  294. if err := pm.peers.Register(p); err != nil {
  295. glog.V(logger.Error).Infof("%v: addition failed: %v", p, err)
  296. return err
  297. }
  298. defer func() {
  299. if pm.server != nil && pm.server.fcManager != nil && p.fcClient != nil {
  300. p.fcClient.Remove(pm.server.fcManager)
  301. }
  302. pm.removePeer(p.id)
  303. }()
  304. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  305. glog.V(logger.Debug).Infof("LES: register peer %v", p.id)
  306. if pm.lightSync {
  307. requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
  308. reqID := getNextReqID()
  309. cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
  310. p.fcServer.MustAssignRequest(reqID)
  311. p.fcServer.SendRequest(reqID, cost)
  312. return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse)
  313. }
  314. requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
  315. reqID := getNextReqID()
  316. cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
  317. p.fcServer.MustAssignRequest(reqID)
  318. p.fcServer.SendRequest(reqID, cost)
  319. return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse)
  320. }
  321. if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd,
  322. requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
  323. return err
  324. }
  325. if pm.txrelay != nil {
  326. pm.txrelay.addPeer(p)
  327. }
  328. p.lock.Lock()
  329. head := p.headInfo
  330. p.lock.Unlock()
  331. if pm.fetcher != nil {
  332. pm.fetcher.addPeer(p)
  333. pm.fetcher.announce(p, head)
  334. }
  335. if p.poolEntry != nil {
  336. pm.serverPool.registered(p.poolEntry)
  337. }
  338. }
  339. stop := make(chan struct{})
  340. defer close(stop)
  341. go func() {
  342. // new block announce loop
  343. for {
  344. select {
  345. case announce := <-p.announceChn:
  346. p.SendAnnounce(announce)
  347. case <-stop:
  348. return
  349. }
  350. }
  351. }()
  352. // main loop. handle incoming messages.
  353. for {
  354. if err := pm.handleMsg(p); err != nil {
  355. glog.V(logger.Debug).Infof("%v: message handling failed: %v", p, err)
  356. return err
  357. }
  358. }
  359. }
  360. var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsMsg, SendTxMsg, GetHeaderProofsMsg}
  361. // handleMsg is invoked whenever an inbound message is received from a remote
  362. // peer. The remote connection is torn down upon returning any error.
  363. func (pm *ProtocolManager) handleMsg(p *peer) error {
  364. // Read the next message from the remote peer, and ensure it's fully consumed
  365. msg, err := p.rw.ReadMsg()
  366. if err != nil {
  367. return err
  368. }
  369. glog.V(logger.Debug).Infoln("msg:", msg.Code, msg.Size)
  370. costs := p.fcCosts[msg.Code]
  371. reject := func(reqCnt, maxCnt uint64) bool {
  372. if p.fcClient == nil || reqCnt > maxCnt {
  373. return true
  374. }
  375. bufValue, _ := p.fcClient.AcceptRequest()
  376. cost := costs.baseCost + reqCnt*costs.reqCost
  377. if cost > pm.server.defParams.BufLimit {
  378. cost = pm.server.defParams.BufLimit
  379. }
  380. if cost > bufValue {
  381. glog.V(logger.Error).Infof("Request from %v came %v too early", p.id, time.Duration((cost-bufValue)*1000000/pm.server.defParams.MinRecharge))
  382. return true
  383. }
  384. return false
  385. }
  386. if msg.Size > ProtocolMaxMsgSize {
  387. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  388. }
  389. defer msg.Discard()
  390. var deliverMsg *Msg
  391. // Handle the message depending on its contents
  392. switch msg.Code {
  393. case StatusMsg:
  394. glog.V(logger.Debug).Infof("<=== StatusMsg from peer %v", p.id)
  395. // Status messages should never arrive after the handshake
  396. return errResp(ErrExtraStatusMsg, "uncontrolled status message")
  397. // Block header query, collect the requested headers and reply
  398. case AnnounceMsg:
  399. glog.V(logger.Debug).Infoln("<=== AnnounceMsg from peer %v:", p.id)
  400. var req announceData
  401. if err := msg.Decode(&req); err != nil {
  402. return errResp(ErrDecode, "%v: %v", msg, err)
  403. }
  404. glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth)
  405. if pm.fetcher != nil {
  406. pm.fetcher.announce(p, &req)
  407. }
  408. case GetBlockHeadersMsg:
  409. glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id)
  410. // Decode the complex header query
  411. var req struct {
  412. ReqID uint64
  413. Query getBlockHeadersData
  414. }
  415. if err := msg.Decode(&req); err != nil {
  416. return errResp(ErrDecode, "%v: %v", msg, err)
  417. }
  418. query := req.Query
  419. if reject(query.Amount, MaxHeaderFetch) {
  420. return errResp(ErrRequestRejected, "")
  421. }
  422. hashMode := query.Origin.Hash != (common.Hash{})
  423. // Gather headers until the fetch or network limits is reached
  424. var (
  425. bytes common.StorageSize
  426. headers []*types.Header
  427. unknown bool
  428. )
  429. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  430. // Retrieve the next header satisfying the query
  431. var origin *types.Header
  432. if hashMode {
  433. origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
  434. } else {
  435. origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
  436. }
  437. if origin == nil {
  438. break
  439. }
  440. number := origin.Number.Uint64()
  441. headers = append(headers, origin)
  442. bytes += estHeaderRlpSize
  443. // Advance to the next header of the query
  444. switch {
  445. case query.Origin.Hash != (common.Hash{}) && query.Reverse:
  446. // Hash based traversal towards the genesis block
  447. for i := 0; i < int(query.Skip)+1; i++ {
  448. if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {
  449. query.Origin.Hash = header.ParentHash
  450. number--
  451. } else {
  452. unknown = true
  453. break
  454. }
  455. }
  456. case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
  457. // Hash based traversal towards the leaf block
  458. if header := pm.blockchain.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil {
  459. if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
  460. query.Origin.Hash = header.Hash()
  461. } else {
  462. unknown = true
  463. }
  464. } else {
  465. unknown = true
  466. }
  467. case query.Reverse:
  468. // Number based traversal towards the genesis block
  469. if query.Origin.Number >= query.Skip+1 {
  470. query.Origin.Number -= (query.Skip + 1)
  471. } else {
  472. unknown = true
  473. }
  474. case !query.Reverse:
  475. // Number based traversal towards the leaf block
  476. query.Origin.Number += (query.Skip + 1)
  477. }
  478. }
  479. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
  480. pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
  481. return p.SendBlockHeaders(req.ReqID, bv, headers)
  482. case BlockHeadersMsg:
  483. if pm.downloader == nil {
  484. return errResp(ErrUnexpectedResponse, "")
  485. }
  486. glog.V(logger.Debug).Infof("<=== BlockHeadersMsg from peer %v", p.id)
  487. // A batch of headers arrived to one of our previous requests
  488. var resp struct {
  489. ReqID, BV uint64
  490. Headers []*types.Header
  491. }
  492. if err := msg.Decode(&resp); err != nil {
  493. return errResp(ErrDecode, "msg %v: %v", msg, err)
  494. }
  495. p.fcServer.GotReply(resp.ReqID, resp.BV)
  496. if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
  497. pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
  498. } else {
  499. err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
  500. if err != nil {
  501. glog.V(logger.Debug).Infoln(err)
  502. }
  503. }
  504. case GetBlockBodiesMsg:
  505. glog.V(logger.Debug).Infof("<=== GetBlockBodiesMsg from peer %v", p.id)
  506. // Decode the retrieval message
  507. var req struct {
  508. ReqID uint64
  509. Hashes []common.Hash
  510. }
  511. if err := msg.Decode(&req); err != nil {
  512. return errResp(ErrDecode, "msg %v: %v", msg, err)
  513. }
  514. // Gather blocks until the fetch or network limits is reached
  515. var (
  516. bytes int
  517. bodies []rlp.RawValue
  518. )
  519. reqCnt := len(req.Hashes)
  520. if reject(uint64(reqCnt), MaxBodyFetch) {
  521. return errResp(ErrRequestRejected, "")
  522. }
  523. for _, hash := range req.Hashes {
  524. if bytes >= softResponseLimit {
  525. break
  526. }
  527. // Retrieve the requested block body, stopping if enough was found
  528. if data := core.GetBodyRLP(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash)); len(data) != 0 {
  529. bodies = append(bodies, data)
  530. bytes += len(data)
  531. }
  532. }
  533. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  534. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  535. return p.SendBlockBodiesRLP(req.ReqID, bv, bodies)
  536. case BlockBodiesMsg:
  537. if pm.odr == nil {
  538. return errResp(ErrUnexpectedResponse, "")
  539. }
  540. glog.V(logger.Debug).Infof("<=== BlockBodiesMsg from peer %v", p.id)
  541. // A batch of block bodies arrived to one of our previous requests
  542. var resp struct {
  543. ReqID, BV uint64
  544. Data []*types.Body
  545. }
  546. if err := msg.Decode(&resp); err != nil {
  547. return errResp(ErrDecode, "msg %v: %v", msg, err)
  548. }
  549. p.fcServer.GotReply(resp.ReqID, resp.BV)
  550. deliverMsg = &Msg{
  551. MsgType: MsgBlockBodies,
  552. ReqID: resp.ReqID,
  553. Obj: resp.Data,
  554. }
  555. case GetCodeMsg:
  556. glog.V(logger.Debug).Infof("<=== GetCodeMsg from peer %v", p.id)
  557. // Decode the retrieval message
  558. var req struct {
  559. ReqID uint64
  560. Reqs []CodeReq
  561. }
  562. if err := msg.Decode(&req); err != nil {
  563. return errResp(ErrDecode, "msg %v: %v", msg, err)
  564. }
  565. // Gather state data until the fetch or network limits is reached
  566. var (
  567. bytes int
  568. data [][]byte
  569. )
  570. reqCnt := len(req.Reqs)
  571. if reject(uint64(reqCnt), MaxCodeFetch) {
  572. return errResp(ErrRequestRejected, "")
  573. }
  574. for _, req := range req.Reqs {
  575. // Retrieve the requested state entry, stopping if enough was found
  576. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  577. if trie, _ := trie.New(header.Root, pm.chainDb); trie != nil {
  578. sdata := trie.Get(req.AccKey)
  579. var acc state.Account
  580. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  581. entry, _ := pm.chainDb.Get(acc.CodeHash)
  582. if bytes+len(entry) >= softResponseLimit {
  583. break
  584. }
  585. data = append(data, entry)
  586. bytes += len(entry)
  587. }
  588. }
  589. }
  590. }
  591. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  592. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  593. return p.SendCode(req.ReqID, bv, data)
  594. case CodeMsg:
  595. if pm.odr == nil {
  596. return errResp(ErrUnexpectedResponse, "")
  597. }
  598. glog.V(logger.Debug).Infof("<=== CodeMsg from peer %v", p.id)
  599. // A batch of node state data arrived to one of our previous requests
  600. var resp struct {
  601. ReqID, BV uint64
  602. Data [][]byte
  603. }
  604. if err := msg.Decode(&resp); err != nil {
  605. return errResp(ErrDecode, "msg %v: %v", msg, err)
  606. }
  607. p.fcServer.GotReply(resp.ReqID, resp.BV)
  608. deliverMsg = &Msg{
  609. MsgType: MsgCode,
  610. ReqID: resp.ReqID,
  611. Obj: resp.Data,
  612. }
  613. case GetReceiptsMsg:
  614. glog.V(logger.Debug).Infof("<=== GetReceiptsMsg from peer %v", p.id)
  615. // Decode the retrieval message
  616. var req struct {
  617. ReqID uint64
  618. Hashes []common.Hash
  619. }
  620. if err := msg.Decode(&req); err != nil {
  621. return errResp(ErrDecode, "msg %v: %v", msg, err)
  622. }
  623. // Gather state data until the fetch or network limits is reached
  624. var (
  625. bytes int
  626. receipts []rlp.RawValue
  627. )
  628. reqCnt := len(req.Hashes)
  629. if reject(uint64(reqCnt), MaxReceiptFetch) {
  630. return errResp(ErrRequestRejected, "")
  631. }
  632. for _, hash := range req.Hashes {
  633. if bytes >= softResponseLimit {
  634. break
  635. }
  636. // Retrieve the requested block's receipts, skipping if unknown to us
  637. results := core.GetBlockReceipts(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash))
  638. if results == nil {
  639. if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  640. continue
  641. }
  642. }
  643. // If known, encode and queue for response packet
  644. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  645. glog.V(logger.Error).Infof("failed to encode receipt: %v", err)
  646. } else {
  647. receipts = append(receipts, encoded)
  648. bytes += len(encoded)
  649. }
  650. }
  651. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  652. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  653. return p.SendReceiptsRLP(req.ReqID, bv, receipts)
  654. case ReceiptsMsg:
  655. if pm.odr == nil {
  656. return errResp(ErrUnexpectedResponse, "")
  657. }
  658. glog.V(logger.Debug).Infof("<=== ReceiptsMsg from peer %v", p.id)
  659. // A batch of receipts arrived to one of our previous requests
  660. var resp struct {
  661. ReqID, BV uint64
  662. Receipts []types.Receipts
  663. }
  664. if err := msg.Decode(&resp); err != nil {
  665. return errResp(ErrDecode, "msg %v: %v", msg, err)
  666. }
  667. p.fcServer.GotReply(resp.ReqID, resp.BV)
  668. deliverMsg = &Msg{
  669. MsgType: MsgReceipts,
  670. ReqID: resp.ReqID,
  671. Obj: resp.Receipts,
  672. }
  673. case GetProofsMsg:
  674. glog.V(logger.Debug).Infof("<=== GetProofsMsg from peer %v", p.id)
  675. // Decode the retrieval message
  676. var req struct {
  677. ReqID uint64
  678. Reqs []ProofReq
  679. }
  680. if err := msg.Decode(&req); err != nil {
  681. return errResp(ErrDecode, "msg %v: %v", msg, err)
  682. }
  683. // Gather state data until the fetch or network limits is reached
  684. var (
  685. bytes int
  686. proofs proofsData
  687. )
  688. reqCnt := len(req.Reqs)
  689. if reject(uint64(reqCnt), MaxProofsFetch) {
  690. return errResp(ErrRequestRejected, "")
  691. }
  692. for _, req := range req.Reqs {
  693. if bytes >= softResponseLimit {
  694. break
  695. }
  696. // Retrieve the requested state entry, stopping if enough was found
  697. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  698. if tr, _ := trie.New(header.Root, pm.chainDb); tr != nil {
  699. if len(req.AccKey) > 0 {
  700. sdata := tr.Get(req.AccKey)
  701. tr = nil
  702. var acc state.Account
  703. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  704. tr, _ = trie.New(acc.Root, pm.chainDb)
  705. }
  706. }
  707. if tr != nil {
  708. proof := tr.Prove(req.Key)
  709. proofs = append(proofs, proof)
  710. bytes += len(proof)
  711. }
  712. }
  713. }
  714. }
  715. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  716. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  717. return p.SendProofs(req.ReqID, bv, proofs)
  718. case ProofsMsg:
  719. if pm.odr == nil {
  720. return errResp(ErrUnexpectedResponse, "")
  721. }
  722. glog.V(logger.Debug).Infof("<=== ProofsMsg from peer %v", p.id)
  723. // A batch of merkle proofs arrived to one of our previous requests
  724. var resp struct {
  725. ReqID, BV uint64
  726. Data [][]rlp.RawValue
  727. }
  728. if err := msg.Decode(&resp); err != nil {
  729. return errResp(ErrDecode, "msg %v: %v", msg, err)
  730. }
  731. p.fcServer.GotReply(resp.ReqID, resp.BV)
  732. deliverMsg = &Msg{
  733. MsgType: MsgProofs,
  734. ReqID: resp.ReqID,
  735. Obj: resp.Data,
  736. }
  737. case GetHeaderProofsMsg:
  738. glog.V(logger.Debug).Infof("<=== GetHeaderProofsMsg from peer %v", p.id)
  739. // Decode the retrieval message
  740. var req struct {
  741. ReqID uint64
  742. Reqs []ChtReq
  743. }
  744. if err := msg.Decode(&req); err != nil {
  745. return errResp(ErrDecode, "msg %v: %v", msg, err)
  746. }
  747. // Gather state data until the fetch or network limits is reached
  748. var (
  749. bytes int
  750. proofs []ChtResp
  751. )
  752. reqCnt := len(req.Reqs)
  753. if reject(uint64(reqCnt), MaxHeaderProofsFetch) {
  754. return errResp(ErrRequestRejected, "")
  755. }
  756. for _, req := range req.Reqs {
  757. if bytes >= softResponseLimit {
  758. break
  759. }
  760. if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
  761. if root := getChtRoot(pm.chainDb, req.ChtNum); root != (common.Hash{}) {
  762. if tr, _ := trie.New(root, pm.chainDb); tr != nil {
  763. var encNumber [8]byte
  764. binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
  765. proof := tr.Prove(encNumber[:])
  766. proofs = append(proofs, ChtResp{Header: header, Proof: proof})
  767. bytes += len(proof) + estHeaderRlpSize
  768. }
  769. }
  770. }
  771. }
  772. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  773. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  774. return p.SendHeaderProofs(req.ReqID, bv, proofs)
  775. case HeaderProofsMsg:
  776. if pm.odr == nil {
  777. return errResp(ErrUnexpectedResponse, "")
  778. }
  779. glog.V(logger.Debug).Infof("<=== HeaderProofsMsg from peer %v", p.id)
  780. var resp struct {
  781. ReqID, BV uint64
  782. Data []ChtResp
  783. }
  784. if err := msg.Decode(&resp); err != nil {
  785. return errResp(ErrDecode, "msg %v: %v", msg, err)
  786. }
  787. p.fcServer.GotReply(resp.ReqID, resp.BV)
  788. deliverMsg = &Msg{
  789. MsgType: MsgHeaderProofs,
  790. ReqID: resp.ReqID,
  791. Obj: resp.Data,
  792. }
  793. case SendTxMsg:
  794. if pm.txpool == nil {
  795. return errResp(ErrUnexpectedResponse, "")
  796. }
  797. // Transactions arrived, parse all of them and deliver to the pool
  798. var txs []*types.Transaction
  799. if err := msg.Decode(&txs); err != nil {
  800. return errResp(ErrDecode, "msg %v: %v", msg, err)
  801. }
  802. reqCnt := len(txs)
  803. if reject(uint64(reqCnt), MaxTxSend) {
  804. return errResp(ErrRequestRejected, "")
  805. }
  806. if err := pm.txpool.AddBatch(txs); err != nil {
  807. return errResp(ErrUnexpectedResponse, "msg: %v", err)
  808. }
  809. _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  810. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  811. default:
  812. glog.V(logger.Debug).Infof("<=== unknown message with code %d from peer %v", msg.Code, p.id)
  813. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  814. }
  815. if deliverMsg != nil {
  816. return pm.odr.Deliver(p, deliverMsg)
  817. }
  818. return nil
  819. }
  820. // NodeInfo retrieves some protocol metadata about the running host node.
  821. func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo {
  822. return &eth.EthNodeInfo{
  823. Network: self.networkId,
  824. Difficulty: self.blockchain.GetTdByHash(self.blockchain.LastBlockHash()),
  825. Genesis: self.blockchain.Genesis().Hash(),
  826. Head: self.blockchain.LastBlockHash(),
  827. }
  828. }