handler.go 28 KB


  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "encoding/binary"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "net"
  24. "sync"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/core"
  27. "github.com/ethereum/go-ethereum/core/state"
  28. "github.com/ethereum/go-ethereum/core/types"
  29. "github.com/ethereum/go-ethereum/eth"
  30. "github.com/ethereum/go-ethereum/eth/downloader"
  31. "github.com/ethereum/go-ethereum/ethdb"
  32. "github.com/ethereum/go-ethereum/event"
  33. "github.com/ethereum/go-ethereum/logger"
  34. "github.com/ethereum/go-ethereum/logger/glog"
  35. "github.com/ethereum/go-ethereum/p2p"
  36. "github.com/ethereum/go-ethereum/p2p/discover"
  37. "github.com/ethereum/go-ethereum/p2p/discv5"
  38. "github.com/ethereum/go-ethereum/params"
  39. "github.com/ethereum/go-ethereum/pow"
  40. "github.com/ethereum/go-ethereum/rlp"
  41. "github.com/ethereum/go-ethereum/trie"
  42. )
const (
	softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
	estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header

	ethVersion = 63 // equivalent eth version for the downloader

	// Per-request item limits. handleMsg rejects any request whose item
	// count exceeds the limit for its message type.
	MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
	MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
	MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
	MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
	MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
	MaxHeaderProofsFetch = 64 // Amount of header proofs to be fetched per retrieval request
	MaxTxSend = 64 // Amount of transactions to be sent per request

	// disableClientRemovePeer, when true, makes NewProtocolManager hand a
	// no-op peer removal callback to the downloader and the ODR backend.
	disableClientRemovePeer = false
)
// errIncompatibleConfig is returned if the requested protocols and configs are
// not compatible (low protocol version restrictions and high requirements).
// NewProtocolManager returns it when no sub-protocol could be set up.
var errIncompatibleConfig = errors.New("incompatible configuration")
  59. func errResp(code errCode, format string, v ...interface{}) error {
  60. return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
  61. }
// hashFetcherFn is a callback that takes a hash and reports an error.
// NOTE(review): not referenced in this part of the file; presumably it
// requests retrieval of the item identified by the hash — confirm at its users.
type hashFetcherFn func(common.Hash) error
// BlockChain is the set of chain operations the protocol manager depends on.
// It is used to answer header queries, to report the local chain status
// during the handshake and in NodeInfo, and to supply the header access and
// insertion callbacks handed to the downloader.
type BlockChain interface {
	HasHeader(hash common.Hash) bool
	GetHeader(hash common.Hash, number uint64) *types.Header
	GetHeaderByHash(hash common.Hash) *types.Header
	CurrentHeader() *types.Header
	GetTdByHash(hash common.Hash) *big.Int
	InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error)
	Rollback(chain []common.Hash)
	Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash)
	GetHeaderByNumber(number uint64) *types.Header
	GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash
	LastBlockHash() common.Hash
	Genesis() *types.Block
}
// txPool is the transaction pool that transactions received through SendTxMsg
// are handed to.
type txPool interface {
	// AddBatch should add the given transactions to the pool.
	AddBatch([]*types.Transaction) error
}
// ProtocolManager runs the LES sub-protocol: it accepts peers, performs the
// handshake and dispatches every inbound message in handleMsg.
type ProtocolManager struct {
	lightSync bool // true in light client mode: Start launches the syncer and peers are registered with the downloader
	txpool txPool // receives transactions delivered by SendTxMsg; may be nil
	txrelay *LesTxRelay // informed about added/removed peers in light client mode; may be nil
	networkId int // passed to every new peer and reported by NodeInfo
	chainConfig *params.ChainConfig
	blockchain BlockChain
	chainDb ethdb.Database
	odr *LesOdr // receives replies to on-demand retrieval requests; may be nil
	server *LesServer // passed to the handshake and used for request flow control; NOTE(review): presumably nil on a light client — confirm
	serverPool *serverPool // created in Start when light syncing with a p2p server
	downloader *downloader.Downloader // created in NewProtocolManager when light syncing
	fetcher *lightFetcher // created in Start when light syncing with a p2p server
	peers *peerSet
	SubProtocols []p2p.Protocol // one entry per supported LES protocol version
	eventMux *event.TypeMux
	// channels for fetcher, syncer, txsyncLoop
	newPeerCh chan *peer // every accepted peer is sent here before it is handled
	quitSync chan struct{} // closed by Stop
	noMorePeers chan struct{} // Stop sends on it once before closing quitSync
	syncMu sync.Mutex
	syncing bool
	syncDone chan struct{}
	// wait group is used for graceful shutdowns during downloading
	// and processing
	wg sync.WaitGroup
}
  108. // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
  109. // with the ethereum network.
  110. func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId int, mux *event.TypeMux, pow pow.PoW, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay) (*ProtocolManager, error) {
  111. // Create the protocol manager with the base fields
  112. manager := &ProtocolManager{
  113. lightSync: lightSync,
  114. eventMux: mux,
  115. blockchain: blockchain,
  116. chainConfig: chainConfig,
  117. chainDb: chainDb,
  118. networkId: networkId,
  119. txpool: txpool,
  120. txrelay: txrelay,
  121. odr: odr,
  122. peers: newPeerSet(),
  123. newPeerCh: make(chan *peer),
  124. quitSync: make(chan struct{}),
  125. noMorePeers: make(chan struct{}),
  126. }
  127. // Initiate a sub-protocol for every implemented version we can handle
  128. manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
  129. for i, version := range ProtocolVersions {
  130. // Compatible, initialize the sub-protocol
  131. version := version // Closure for the run
  132. manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
  133. Name: "les",
  134. Version: version,
  135. Length: ProtocolLengths[i],
  136. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  137. var entry *poolEntry
  138. peer := manager.newPeer(int(version), networkId, p, rw)
  139. if manager.serverPool != nil {
  140. addr := p.RemoteAddr().(*net.TCPAddr)
  141. entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
  142. if entry == nil {
  143. return fmt.Errorf("unwanted connection")
  144. }
  145. }
  146. peer.poolEntry = entry
  147. select {
  148. case manager.newPeerCh <- peer:
  149. manager.wg.Add(1)
  150. defer manager.wg.Done()
  151. err := manager.handle(peer)
  152. if entry != nil {
  153. manager.serverPool.disconnect(entry)
  154. }
  155. return err
  156. case <-manager.quitSync:
  157. if entry != nil {
  158. manager.serverPool.disconnect(entry)
  159. }
  160. return p2p.DiscQuitting
  161. }
  162. },
  163. NodeInfo: func() interface{} {
  164. return manager.NodeInfo()
  165. },
  166. PeerInfo: func(id discover.NodeID) interface{} {
  167. if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
  168. return p.Info()
  169. }
  170. return nil
  171. },
  172. })
  173. }
  174. if len(manager.SubProtocols) == 0 {
  175. return nil, errIncompatibleConfig
  176. }
  177. removePeer := manager.removePeer
  178. if disableClientRemovePeer {
  179. removePeer = func(id string) {}
  180. }
  181. if lightSync {
  182. glog.V(logger.Debug).Infof("LES: create downloader")
  183. manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
  184. nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
  185. blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
  186. }
  187. if odr != nil {
  188. odr.removePeer = removePeer
  189. }
  190. /*validator := func(block *types.Block, parent *types.Block) error {
  191. return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
  192. }
  193. heighter := func() uint64 {
  194. return chainman.LastBlockNumberU64()
  195. }
  196. manager.fetcher = fetcher.New(chainman.GetBlockNoOdr, validator, nil, heighter, chainman.InsertChain, manager.removePeer)
  197. */
  198. return manager, nil
  199. }
  200. func (pm *ProtocolManager) removePeer(id string) {
  201. // Short circuit if the peer was already removed
  202. peer := pm.peers.Peer(id)
  203. if peer == nil {
  204. return
  205. }
  206. glog.V(logger.Debug).Infoln("Removing peer", id)
  207. // Unregister the peer from the downloader and Ethereum peer set
  208. glog.V(logger.Debug).Infof("LES: unregister peer %v", id)
  209. if pm.lightSync {
  210. pm.downloader.UnregisterPeer(id)
  211. if pm.txrelay != nil {
  212. pm.txrelay.removePeer(id)
  213. }
  214. if pm.fetcher != nil {
  215. pm.fetcher.removePeer(peer)
  216. }
  217. }
  218. if err := pm.peers.Unregister(id); err != nil {
  219. glog.V(logger.Error).Infoln("Removal failed:", err)
  220. }
  221. // Hard disconnect at the networking layer
  222. if peer != nil {
  223. peer.Peer.Disconnect(p2p.DiscUselessPeer)
  224. }
  225. }
  226. func (pm *ProtocolManager) Start(srvr *p2p.Server) {
  227. var topicDisc *discv5.Network
  228. if srvr != nil {
  229. topicDisc = srvr.DiscV5
  230. }
  231. lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
  232. if pm.lightSync {
  233. // start sync handler
  234. if srvr != nil { // srvr is nil during testing
  235. pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
  236. pm.odr.serverPool = pm.serverPool
  237. pm.fetcher = newLightFetcher(pm)
  238. }
  239. go pm.syncer()
  240. } else {
  241. if topicDisc != nil {
  242. go func() {
  243. glog.V(logger.Info).Infoln("Starting registering topic", string(lesTopic))
  244. topicDisc.RegisterTopic(lesTopic, pm.quitSync)
  245. glog.V(logger.Info).Infoln("Stopped registering topic", string(lesTopic))
  246. }()
  247. }
  248. go func() {
  249. for range pm.newPeerCh {
  250. }
  251. }()
  252. }
  253. }
// Stop shuts the protocol manager down and blocks until all peer handlers
// tracked by the wait group have returned.
func (pm *ProtocolManager) Stop() {
	// Showing a log message. During download / process this could actually
	// take between 5 to 10 seconds and therefore feedback is required.
	glog.V(logger.Info).Infoln("Stopping light ethereum protocol handler...")
	// Quit the sync loop.
	// After this send has completed, no new peers will be accepted.
	// NOTE(review): noMorePeers is unbuffered, so this blocks until something
	// receives from it — confirm a receiver exists in every operating mode.
	pm.noMorePeers <- struct{}{}
	close(pm.quitSync) // quits syncer, fetcher
	// Disconnect existing sessions.
	// This also closes the gate for any new registrations on the peer set.
	// sessions which are already established but not added to pm.peers yet
	// will exit when they try to register.
	pm.peers.Close()
	// Wait for any process action
	pm.wg.Wait()
	glog.V(logger.Info).Infoln("Light ethereum protocol handler stopped")
}
  271. func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  272. return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
  273. }
  274. // handle is the callback invoked to manage the life cycle of a les peer. When
  275. // this function terminates, the peer is disconnected.
  276. func (pm *ProtocolManager) handle(p *peer) error {
  277. glog.V(logger.Debug).Infof("%v: peer connected [%s]", p, p.Name())
  278. // Execute the LES handshake
  279. td, head, genesis := pm.blockchain.Status()
  280. headNum := core.GetBlockNumber(pm.chainDb, head)
  281. if err := p.Handshake(td, head, headNum, genesis, pm.server); err != nil {
  282. glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err)
  283. return err
  284. }
  285. if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
  286. rw.Init(p.version)
  287. }
  288. // Register the peer locally
  289. glog.V(logger.Detail).Infof("%v: adding peer", p)
  290. if err := pm.peers.Register(p); err != nil {
  291. glog.V(logger.Error).Infof("%v: addition failed: %v", p, err)
  292. return err
  293. }
  294. defer func() {
  295. if pm.server != nil && pm.server.fcManager != nil && p.fcClient != nil {
  296. p.fcClient.Remove(pm.server.fcManager)
  297. }
  298. pm.removePeer(p.id)
  299. }()
  300. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  301. glog.V(logger.Debug).Infof("LES: register peer %v", p.id)
  302. if pm.lightSync {
  303. requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
  304. reqID := getNextReqID()
  305. cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
  306. p.fcServer.SendRequest(reqID, cost)
  307. return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse)
  308. }
  309. requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
  310. reqID := getNextReqID()
  311. cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
  312. p.fcServer.SendRequest(reqID, cost)
  313. return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse)
  314. }
  315. if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd,
  316. requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
  317. return err
  318. }
  319. if pm.txrelay != nil {
  320. pm.txrelay.addPeer(p)
  321. }
  322. p.lock.Lock()
  323. head := p.headInfo
  324. p.lock.Unlock()
  325. if pm.fetcher != nil {
  326. pm.fetcher.addPeer(p)
  327. pm.fetcher.announce(p, head)
  328. }
  329. if p.poolEntry != nil {
  330. pm.serverPool.registered(p.poolEntry)
  331. }
  332. }
  333. stop := make(chan struct{})
  334. defer close(stop)
  335. go func() {
  336. // new block announce loop
  337. for {
  338. select {
  339. case announce := <-p.announceChn:
  340. p.SendAnnounce(announce)
  341. case <-stop:
  342. return
  343. }
  344. }
  345. }()
  346. // main loop. handle incoming messages.
  347. for {
  348. if err := pm.handleMsg(p); err != nil {
  349. glog.V(logger.Debug).Infof("%v: message handling failed: %v", p, err)
  350. return err
  351. }
  352. }
  353. }
// reqList lists the message codes of the request types of the protocol.
// NOTE(review): not referenced in this part of the file; presumably used for
// the flow control cost tables — confirm at its users.
var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsMsg, SendTxMsg, GetHeaderProofsMsg}
  355. // handleMsg is invoked whenever an inbound message is received from a remote
  356. // peer. The remote connection is torn down upon returning any error.
  357. func (pm *ProtocolManager) handleMsg(p *peer) error {
  358. // Read the next message from the remote peer, and ensure it's fully consumed
  359. msg, err := p.rw.ReadMsg()
  360. if err != nil {
  361. return err
  362. }
  363. var costs *requestCosts
  364. var reqCnt, maxReqs int
  365. glog.V(logger.Debug).Infoln("msg:", msg.Code, msg.Size)
  366. if rc, ok := p.fcCosts[msg.Code]; ok { // check if msg is a supported request type
  367. costs = rc
  368. if p.fcClient == nil {
  369. return errResp(ErrRequestRejected, "")
  370. }
  371. bv, ok := p.fcClient.AcceptRequest()
  372. if !ok || bv < costs.baseCost {
  373. return errResp(ErrRequestRejected, "")
  374. }
  375. maxReqs = 10000
  376. if bv < pm.server.defParams.BufLimit {
  377. d := bv - costs.baseCost
  378. if d/10000 < costs.reqCost {
  379. maxReqs = int(d / costs.reqCost)
  380. }
  381. }
  382. }
  383. if msg.Size > ProtocolMaxMsgSize {
  384. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  385. }
  386. defer msg.Discard()
  387. var deliverMsg *Msg
  388. // Handle the message depending on its contents
  389. switch msg.Code {
  390. case StatusMsg:
  391. glog.V(logger.Debug).Infof("<=== StatusMsg from peer %v", p.id)
  392. // Status messages should never arrive after the handshake
  393. return errResp(ErrExtraStatusMsg, "uncontrolled status message")
  394. // Block header query, collect the requested headers and reply
  395. case AnnounceMsg:
  396. glog.V(logger.Debug).Infoln("<=== AnnounceMsg from peer %v:", p.id)
  397. var req announceData
  398. if err := msg.Decode(&req); err != nil {
  399. return errResp(ErrDecode, "%v: %v", msg, err)
  400. }
  401. glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth)
  402. if pm.fetcher != nil {
  403. go pm.fetcher.announce(p, &req)
  404. }
  405. case GetBlockHeadersMsg:
  406. glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id)
  407. // Decode the complex header query
  408. var req struct {
  409. ReqID uint64
  410. Query getBlockHeadersData
  411. }
  412. if err := msg.Decode(&req); err != nil {
  413. return errResp(ErrDecode, "%v: %v", msg, err)
  414. }
  415. query := req.Query
  416. if query.Amount > uint64(maxReqs) || query.Amount > MaxHeaderFetch {
  417. return errResp(ErrRequestRejected, "")
  418. }
  419. hashMode := query.Origin.Hash != (common.Hash{})
  420. // Gather headers until the fetch or network limits is reached
  421. var (
  422. bytes common.StorageSize
  423. headers []*types.Header
  424. unknown bool
  425. )
  426. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  427. // Retrieve the next header satisfying the query
  428. var origin *types.Header
  429. if hashMode {
  430. origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
  431. } else {
  432. origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
  433. }
  434. if origin == nil {
  435. break
  436. }
  437. number := origin.Number.Uint64()
  438. headers = append(headers, origin)
  439. bytes += estHeaderRlpSize
  440. // Advance to the next header of the query
  441. switch {
  442. case query.Origin.Hash != (common.Hash{}) && query.Reverse:
  443. // Hash based traversal towards the genesis block
  444. for i := 0; i < int(query.Skip)+1; i++ {
  445. if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {
  446. query.Origin.Hash = header.ParentHash
  447. number--
  448. } else {
  449. unknown = true
  450. break
  451. }
  452. }
  453. case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
  454. // Hash based traversal towards the leaf block
  455. if header := pm.blockchain.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil {
  456. if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
  457. query.Origin.Hash = header.Hash()
  458. } else {
  459. unknown = true
  460. }
  461. } else {
  462. unknown = true
  463. }
  464. case query.Reverse:
  465. // Number based traversal towards the genesis block
  466. if query.Origin.Number >= query.Skip+1 {
  467. query.Origin.Number -= (query.Skip + 1)
  468. } else {
  469. unknown = true
  470. }
  471. case !query.Reverse:
  472. // Number based traversal towards the leaf block
  473. query.Origin.Number += (query.Skip + 1)
  474. }
  475. }
  476. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
  477. pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
  478. return p.SendBlockHeaders(req.ReqID, bv, headers)
  479. case BlockHeadersMsg:
  480. if pm.downloader == nil {
  481. return errResp(ErrUnexpectedResponse, "")
  482. }
  483. glog.V(logger.Debug).Infof("<=== BlockHeadersMsg from peer %v", p.id)
  484. // A batch of headers arrived to one of our previous requests
  485. var resp struct {
  486. ReqID, BV uint64
  487. Headers []*types.Header
  488. }
  489. if err := msg.Decode(&resp); err != nil {
  490. return errResp(ErrDecode, "msg %v: %v", msg, err)
  491. }
  492. p.fcServer.GotReply(resp.ReqID, resp.BV)
  493. if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
  494. pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
  495. } else {
  496. err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
  497. if err != nil {
  498. glog.V(logger.Debug).Infoln(err)
  499. }
  500. }
  501. case GetBlockBodiesMsg:
  502. glog.V(logger.Debug).Infof("<=== GetBlockBodiesMsg from peer %v", p.id)
  503. // Decode the retrieval message
  504. var req struct {
  505. ReqID uint64
  506. Hashes []common.Hash
  507. }
  508. if err := msg.Decode(&req); err != nil {
  509. return errResp(ErrDecode, "msg %v: %v", msg, err)
  510. }
  511. // Gather blocks until the fetch or network limits is reached
  512. var (
  513. bytes int
  514. bodies []rlp.RawValue
  515. )
  516. reqCnt = len(req.Hashes)
  517. if reqCnt > maxReqs || reqCnt > MaxBodyFetch {
  518. return errResp(ErrRequestRejected, "")
  519. }
  520. for _, hash := range req.Hashes {
  521. if bytes >= softResponseLimit {
  522. break
  523. }
  524. // Retrieve the requested block body, stopping if enough was found
  525. if data := core.GetBodyRLP(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash)); len(data) != 0 {
  526. bodies = append(bodies, data)
  527. bytes += len(data)
  528. }
  529. }
  530. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  531. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  532. return p.SendBlockBodiesRLP(req.ReqID, bv, bodies)
  533. case BlockBodiesMsg:
  534. if pm.odr == nil {
  535. return errResp(ErrUnexpectedResponse, "")
  536. }
  537. glog.V(logger.Debug).Infof("<=== BlockBodiesMsg from peer %v", p.id)
  538. // A batch of block bodies arrived to one of our previous requests
  539. var resp struct {
  540. ReqID, BV uint64
  541. Data []*types.Body
  542. }
  543. if err := msg.Decode(&resp); err != nil {
  544. return errResp(ErrDecode, "msg %v: %v", msg, err)
  545. }
  546. p.fcServer.GotReply(resp.ReqID, resp.BV)
  547. deliverMsg = &Msg{
  548. MsgType: MsgBlockBodies,
  549. ReqID: resp.ReqID,
  550. Obj: resp.Data,
  551. }
  552. case GetCodeMsg:
  553. glog.V(logger.Debug).Infof("<=== GetCodeMsg from peer %v", p.id)
  554. // Decode the retrieval message
  555. var req struct {
  556. ReqID uint64
  557. Reqs []CodeReq
  558. }
  559. if err := msg.Decode(&req); err != nil {
  560. return errResp(ErrDecode, "msg %v: %v", msg, err)
  561. }
  562. // Gather state data until the fetch or network limits is reached
  563. var (
  564. bytes int
  565. data [][]byte
  566. )
  567. reqCnt = len(req.Reqs)
  568. if reqCnt > maxReqs || reqCnt > MaxCodeFetch {
  569. return errResp(ErrRequestRejected, "")
  570. }
  571. for _, req := range req.Reqs {
  572. // Retrieve the requested state entry, stopping if enough was found
  573. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  574. if trie, _ := trie.New(header.Root, pm.chainDb); trie != nil {
  575. sdata := trie.Get(req.AccKey)
  576. var acc state.Account
  577. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  578. entry, _ := pm.chainDb.Get(acc.CodeHash)
  579. if bytes+len(entry) >= softResponseLimit {
  580. break
  581. }
  582. data = append(data, entry)
  583. bytes += len(entry)
  584. }
  585. }
  586. }
  587. }
  588. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  589. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  590. return p.SendCode(req.ReqID, bv, data)
  591. case CodeMsg:
  592. if pm.odr == nil {
  593. return errResp(ErrUnexpectedResponse, "")
  594. }
  595. glog.V(logger.Debug).Infof("<=== CodeMsg from peer %v", p.id)
  596. // A batch of node state data arrived to one of our previous requests
  597. var resp struct {
  598. ReqID, BV uint64
  599. Data [][]byte
  600. }
  601. if err := msg.Decode(&resp); err != nil {
  602. return errResp(ErrDecode, "msg %v: %v", msg, err)
  603. }
  604. p.fcServer.GotReply(resp.ReqID, resp.BV)
  605. deliverMsg = &Msg{
  606. MsgType: MsgCode,
  607. ReqID: resp.ReqID,
  608. Obj: resp.Data,
  609. }
  610. case GetReceiptsMsg:
  611. glog.V(logger.Debug).Infof("<=== GetReceiptsMsg from peer %v", p.id)
  612. // Decode the retrieval message
  613. var req struct {
  614. ReqID uint64
  615. Hashes []common.Hash
  616. }
  617. if err := msg.Decode(&req); err != nil {
  618. return errResp(ErrDecode, "msg %v: %v", msg, err)
  619. }
  620. // Gather state data until the fetch or network limits is reached
  621. var (
  622. bytes int
  623. receipts []rlp.RawValue
  624. )
  625. reqCnt = len(req.Hashes)
  626. if reqCnt > maxReqs || reqCnt > MaxReceiptFetch {
  627. return errResp(ErrRequestRejected, "")
  628. }
  629. for _, hash := range req.Hashes {
  630. if bytes >= softResponseLimit {
  631. break
  632. }
  633. // Retrieve the requested block's receipts, skipping if unknown to us
  634. results := core.GetBlockReceipts(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash))
  635. if results == nil {
  636. if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  637. continue
  638. }
  639. }
  640. // If known, encode and queue for response packet
  641. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  642. glog.V(logger.Error).Infof("failed to encode receipt: %v", err)
  643. } else {
  644. receipts = append(receipts, encoded)
  645. bytes += len(encoded)
  646. }
  647. }
  648. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  649. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  650. return p.SendReceiptsRLP(req.ReqID, bv, receipts)
  651. case ReceiptsMsg:
  652. if pm.odr == nil {
  653. return errResp(ErrUnexpectedResponse, "")
  654. }
  655. glog.V(logger.Debug).Infof("<=== ReceiptsMsg from peer %v", p.id)
  656. // A batch of receipts arrived to one of our previous requests
  657. var resp struct {
  658. ReqID, BV uint64
  659. Receipts []types.Receipts
  660. }
  661. if err := msg.Decode(&resp); err != nil {
  662. return errResp(ErrDecode, "msg %v: %v", msg, err)
  663. }
  664. p.fcServer.GotReply(resp.ReqID, resp.BV)
  665. deliverMsg = &Msg{
  666. MsgType: MsgReceipts,
  667. ReqID: resp.ReqID,
  668. Obj: resp.Receipts,
  669. }
  670. case GetProofsMsg:
  671. glog.V(logger.Debug).Infof("<=== GetProofsMsg from peer %v", p.id)
  672. // Decode the retrieval message
  673. var req struct {
  674. ReqID uint64
  675. Reqs []ProofReq
  676. }
  677. if err := msg.Decode(&req); err != nil {
  678. return errResp(ErrDecode, "msg %v: %v", msg, err)
  679. }
  680. // Gather state data until the fetch or network limits is reached
  681. var (
  682. bytes int
  683. proofs proofsData
  684. )
  685. reqCnt = len(req.Reqs)
  686. if reqCnt > maxReqs || reqCnt > MaxProofsFetch {
  687. return errResp(ErrRequestRejected, "")
  688. }
  689. for _, req := range req.Reqs {
  690. if bytes >= softResponseLimit {
  691. break
  692. }
  693. // Retrieve the requested state entry, stopping if enough was found
  694. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  695. if tr, _ := trie.New(header.Root, pm.chainDb); tr != nil {
  696. if len(req.AccKey) > 0 {
  697. sdata := tr.Get(req.AccKey)
  698. tr = nil
  699. var acc state.Account
  700. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  701. tr, _ = trie.New(acc.Root, pm.chainDb)
  702. }
  703. }
  704. if tr != nil {
  705. proof := tr.Prove(req.Key)
  706. proofs = append(proofs, proof)
  707. bytes += len(proof)
  708. }
  709. }
  710. }
  711. }
  712. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  713. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  714. return p.SendProofs(req.ReqID, bv, proofs)
  715. case ProofsMsg:
  716. if pm.odr == nil {
  717. return errResp(ErrUnexpectedResponse, "")
  718. }
  719. glog.V(logger.Debug).Infof("<=== ProofsMsg from peer %v", p.id)
  720. // A batch of merkle proofs arrived to one of our previous requests
  721. var resp struct {
  722. ReqID, BV uint64
  723. Data [][]rlp.RawValue
  724. }
  725. if err := msg.Decode(&resp); err != nil {
  726. return errResp(ErrDecode, "msg %v: %v", msg, err)
  727. }
  728. p.fcServer.GotReply(resp.ReqID, resp.BV)
  729. deliverMsg = &Msg{
  730. MsgType: MsgProofs,
  731. ReqID: resp.ReqID,
  732. Obj: resp.Data,
  733. }
  734. case GetHeaderProofsMsg:
  735. glog.V(logger.Debug).Infof("<=== GetHeaderProofsMsg from peer %v", p.id)
  736. // Decode the retrieval message
  737. var req struct {
  738. ReqID uint64
  739. Reqs []ChtReq
  740. }
  741. if err := msg.Decode(&req); err != nil {
  742. return errResp(ErrDecode, "msg %v: %v", msg, err)
  743. }
  744. // Gather state data until the fetch or network limits is reached
  745. var (
  746. bytes int
  747. proofs []ChtResp
  748. )
  749. reqCnt = len(req.Reqs)
  750. if reqCnt > maxReqs || reqCnt > MaxHeaderProofsFetch {
  751. return errResp(ErrRequestRejected, "")
  752. }
  753. for _, req := range req.Reqs {
  754. if bytes >= softResponseLimit {
  755. break
  756. }
  757. if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
  758. if root := getChtRoot(pm.chainDb, req.ChtNum); root != (common.Hash{}) {
  759. if tr, _ := trie.New(root, pm.chainDb); tr != nil {
  760. var encNumber [8]byte
  761. binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
  762. proof := tr.Prove(encNumber[:])
  763. proofs = append(proofs, ChtResp{Header: header, Proof: proof})
  764. bytes += len(proof) + estHeaderRlpSize
  765. }
  766. }
  767. }
  768. }
  769. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  770. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  771. return p.SendHeaderProofs(req.ReqID, bv, proofs)
  772. case HeaderProofsMsg:
  773. if pm.odr == nil {
  774. return errResp(ErrUnexpectedResponse, "")
  775. }
  776. glog.V(logger.Debug).Infof("<=== HeaderProofsMsg from peer %v", p.id)
  777. var resp struct {
  778. ReqID, BV uint64
  779. Data []ChtResp
  780. }
  781. if err := msg.Decode(&resp); err != nil {
  782. return errResp(ErrDecode, "msg %v: %v", msg, err)
  783. }
  784. p.fcServer.GotReply(resp.ReqID, resp.BV)
  785. deliverMsg = &Msg{
  786. MsgType: MsgHeaderProofs,
  787. ReqID: resp.ReqID,
  788. Obj: resp.Data,
  789. }
  790. case SendTxMsg:
  791. if pm.txpool == nil {
  792. return errResp(ErrUnexpectedResponse, "")
  793. }
  794. // Transactions arrived, parse all of them and deliver to the pool
  795. var txs []*types.Transaction
  796. if err := msg.Decode(&txs); err != nil {
  797. return errResp(ErrDecode, "msg %v: %v", msg, err)
  798. }
  799. reqCnt = len(txs)
  800. if reqCnt > maxReqs || reqCnt > MaxTxSend {
  801. return errResp(ErrRequestRejected, "")
  802. }
  803. if err := pm.txpool.AddBatch(txs); err != nil {
  804. return errResp(ErrUnexpectedResponse, "msg: %v", err)
  805. }
  806. _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  807. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  808. default:
  809. glog.V(logger.Debug).Infof("<=== unknown message with code %d from peer %v", msg.Code, p.id)
  810. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  811. }
  812. if deliverMsg != nil {
  813. return pm.odr.Deliver(p, deliverMsg)
  814. }
  815. return nil
  816. }
  817. // NodeInfo retrieves some protocol metadata about the running host node.
  818. func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo {
  819. return &eth.EthNodeInfo{
  820. Network: self.networkId,
  821. Difficulty: self.blockchain.GetTdByHash(self.blockchain.LastBlockHash()),
  822. Genesis: self.blockchain.Genesis().Hash(),
  823. Head: self.blockchain.LastBlockHash(),
  824. }
  825. }