handler.go 27 KB


  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "encoding/binary"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "sync"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/core"
  27. "github.com/ethereum/go-ethereum/core/state"
  28. "github.com/ethereum/go-ethereum/core/types"
  29. "github.com/ethereum/go-ethereum/eth"
  30. "github.com/ethereum/go-ethereum/eth/downloader"
  31. "github.com/ethereum/go-ethereum/ethdb"
  32. "github.com/ethereum/go-ethereum/event"
  33. "github.com/ethereum/go-ethereum/logger"
  34. "github.com/ethereum/go-ethereum/logger/glog"
  35. "github.com/ethereum/go-ethereum/p2p"
  36. "github.com/ethereum/go-ethereum/p2p/discover"
  37. "github.com/ethereum/go-ethereum/p2p/discv5"
  38. "github.com/ethereum/go-ethereum/pow"
  39. "github.com/ethereum/go-ethereum/rlp"
  40. "github.com/ethereum/go-ethereum/trie"
  41. )
  42. const (
  43. softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
  44. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
  45. ethVersion = 63 // equivalent eth version for the downloader
  46. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  47. MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
  48. MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
  49. MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
  50. MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  51. MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  52. MaxTxSend = 64 // Amount of transactions to be send per request
  53. disableClientRemovePeer = true
  54. )
  55. // errIncompatibleConfig is returned if the requested protocols and configs are
  56. // not compatible (low protocol version restrictions and high requirements).
  57. var errIncompatibleConfig = errors.New("incompatible configuration")
  58. func errResp(code errCode, format string, v ...interface{}) error {
  59. return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
  60. }
  61. type hashFetcherFn func(common.Hash) error
  62. type BlockChain interface {
  63. HasHeader(hash common.Hash) bool
  64. GetHeader(hash common.Hash, number uint64) *types.Header
  65. GetHeaderByHash(hash common.Hash) *types.Header
  66. CurrentHeader() *types.Header
  67. GetTdByHash(hash common.Hash) *big.Int
  68. InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error)
  69. Rollback(chain []common.Hash)
  70. Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash)
  71. GetHeaderByNumber(number uint64) *types.Header
  72. GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash
  73. LastBlockHash() common.Hash
  74. Genesis() *types.Block
  75. }
  76. type txPool interface {
  77. // AddTransactions should add the given transactions to the pool.
  78. AddBatch([]*types.Transaction)
  79. }
  80. type ProtocolManager struct {
  81. lightSync bool
  82. txpool txPool
  83. txrelay *LesTxRelay
  84. networkId int
  85. chainConfig *core.ChainConfig
  86. blockchain BlockChain
  87. chainDb ethdb.Database
  88. odr *LesOdr
  89. server *LesServer
  90. topicDisc *discv5.Network
  91. lesTopic discv5.Topic
  92. p2pServer *p2p.Server
  93. downloader *downloader.Downloader
  94. fetcher *lightFetcher
  95. peers *peerSet
  96. SubProtocols []p2p.Protocol
  97. eventMux *event.TypeMux
  98. // channels for fetcher, syncer, txsyncLoop
  99. newPeerCh chan *peer
  100. quitSync chan struct{}
  101. noMorePeers chan struct{}
  102. syncMu sync.Mutex
  103. syncing bool
  104. syncDone chan struct{}
  105. // wait group is used for graceful shutdowns during downloading
  106. // and processing
  107. wg sync.WaitGroup
  108. }
  109. // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
  110. // with the ethereum network.
  111. func NewProtocolManager(chainConfig *core.ChainConfig, lightSync bool, networkId int, mux *event.TypeMux, pow pow.PoW, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay) (*ProtocolManager, error) {
  112. // Create the protocol manager with the base fields
  113. manager := &ProtocolManager{
  114. lightSync: lightSync,
  115. eventMux: mux,
  116. blockchain: blockchain,
  117. chainConfig: chainConfig,
  118. chainDb: chainDb,
  119. networkId: networkId,
  120. txpool: txpool,
  121. txrelay: txrelay,
  122. odr: odr,
  123. peers: newPeerSet(),
  124. newPeerCh: make(chan *peer),
  125. quitSync: make(chan struct{}),
  126. noMorePeers: make(chan struct{}),
  127. }
  128. // Initiate a sub-protocol for every implemented version we can handle
  129. manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
  130. for i, version := range ProtocolVersions {
  131. // Compatible, initialize the sub-protocol
  132. version := version // Closure for the run
  133. manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
  134. Name: "les",
  135. Version: version,
  136. Length: ProtocolLengths[i],
  137. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  138. peer := manager.newPeer(int(version), networkId, p, rw)
  139. select {
  140. case manager.newPeerCh <- peer:
  141. manager.wg.Add(1)
  142. defer manager.wg.Done()
  143. return manager.handle(peer)
  144. case <-manager.quitSync:
  145. return p2p.DiscQuitting
  146. }
  147. },
  148. NodeInfo: func() interface{} {
  149. return manager.NodeInfo()
  150. },
  151. PeerInfo: func(id discover.NodeID) interface{} {
  152. if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
  153. return p.Info()
  154. }
  155. return nil
  156. },
  157. })
  158. }
  159. if len(manager.SubProtocols) == 0 {
  160. return nil, errIncompatibleConfig
  161. }
  162. removePeer := manager.removePeer
  163. if disableClientRemovePeer {
  164. removePeer = func(id string) {}
  165. }
  166. if lightSync {
  167. glog.V(logger.Debug).Infof("LES: create downloader")
  168. manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
  169. nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
  170. blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
  171. manager.fetcher = newLightFetcher(manager)
  172. }
  173. if odr != nil {
  174. odr.removePeer = removePeer
  175. }
  176. /*validator := func(block *types.Block, parent *types.Block) error {
  177. return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
  178. }
  179. heighter := func() uint64 {
  180. return chainman.LastBlockNumberU64()
  181. }
  182. manager.fetcher = fetcher.New(chainman.GetBlockNoOdr, validator, nil, heighter, chainman.InsertChain, manager.removePeer)
  183. */
  184. return manager, nil
  185. }
  186. func (pm *ProtocolManager) removePeer(id string) {
  187. // Short circuit if the peer was already removed
  188. peer := pm.peers.Peer(id)
  189. if peer == nil {
  190. return
  191. }
  192. glog.V(logger.Debug).Infoln("Removing peer", id)
  193. // Unregister the peer from the downloader and Ethereum peer set
  194. glog.V(logger.Debug).Infof("LES: unregister peer %v", id)
  195. if pm.lightSync {
  196. pm.downloader.UnregisterPeer(id)
  197. pm.odr.UnregisterPeer(peer)
  198. if pm.txrelay != nil {
  199. pm.txrelay.removePeer(id)
  200. }
  201. }
  202. if err := pm.peers.Unregister(id); err != nil {
  203. glog.V(logger.Error).Infoln("Removal failed:", err)
  204. }
  205. // Hard disconnect at the networking layer
  206. if peer != nil {
  207. peer.Peer.Disconnect(p2p.DiscUselessPeer)
  208. }
  209. }
  210. func (pm *ProtocolManager) findServers() {
  211. if pm.p2pServer == nil {
  212. return
  213. }
  214. enodes := make(chan string, 100)
  215. stop := make(chan struct{})
  216. go pm.topicDisc.SearchTopic(pm.lesTopic, stop, enodes)
  217. go func() {
  218. added := make(map[string]bool)
  219. for {
  220. select {
  221. case enode := <-enodes:
  222. if !added[enode] {
  223. fmt.Println("Found LES server:", enode)
  224. added[enode] = true
  225. if node, err := discover.ParseNode(enode); err == nil {
  226. pm.p2pServer.AddPeer(node)
  227. }
  228. }
  229. case <-stop:
  230. return
  231. }
  232. }
  233. }()
  234. time.Sleep(time.Second * 20)
  235. close(stop)
  236. }
  237. func (pm *ProtocolManager) Start(srvr *p2p.Server) {
  238. pm.p2pServer = srvr
  239. if srvr != nil {
  240. pm.topicDisc = srvr.DiscV5
  241. }
  242. pm.lesTopic = discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
  243. if pm.lightSync {
  244. // start sync handler
  245. go pm.findServers()
  246. go pm.syncer()
  247. } else {
  248. if pm.topicDisc != nil {
  249. go func() {
  250. fmt.Println("Starting topic register")
  251. pm.topicDisc.RegisterTopic(pm.lesTopic, pm.quitSync)
  252. fmt.Println("Stopped topic register")
  253. }()
  254. }
  255. go func() {
  256. for range pm.newPeerCh {
  257. }
  258. }()
  259. }
  260. }
  261. func (pm *ProtocolManager) Stop() {
  262. // Showing a log message. During download / process this could actually
  263. // take between 5 to 10 seconds and therefor feedback is required.
  264. glog.V(logger.Info).Infoln("Stopping light ethereum protocol handler...")
  265. // Quit the sync loop.
  266. // After this send has completed, no new peers will be accepted.
  267. pm.noMorePeers <- struct{}{}
  268. close(pm.quitSync) // quits syncer, fetcher
  269. // Disconnect existing sessions.
  270. // This also closes the gate for any new registrations on the peer set.
  271. // sessions which are already established but not added to pm.peers yet
  272. // will exit when they try to register.
  273. pm.peers.Close()
  274. // Wait for any process action
  275. pm.wg.Wait()
  276. glog.V(logger.Info).Infoln("Light ethereum protocol handler stopped")
  277. }
  278. func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  279. return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
  280. }
  281. // handle is the callback invoked to manage the life cycle of a les peer. When
  282. // this function terminates, the peer is disconnected.
  283. func (pm *ProtocolManager) handle(p *peer) error {
  284. glog.V(logger.Debug).Infof("%v: peer connected [%s]", p, p.Name())
  285. // Execute the LES handshake
  286. td, head, genesis := pm.blockchain.Status()
  287. headNum := core.GetBlockNumber(pm.chainDb, head)
  288. if err := p.Handshake(td, head, headNum, genesis, pm.server); err != nil {
  289. glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err)
  290. return err
  291. }
  292. if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
  293. rw.Init(p.version)
  294. }
  295. // Register the peer locally
  296. glog.V(logger.Detail).Infof("%v: adding peer", p)
  297. if err := pm.peers.Register(p); err != nil {
  298. glog.V(logger.Error).Infof("%v: addition failed: %v", p, err)
  299. return err
  300. }
  301. defer func() {
  302. if pm.server != nil && pm.server.fcManager != nil && p.fcClient != nil {
  303. p.fcClient.Remove(pm.server.fcManager)
  304. }
  305. pm.removePeer(p.id)
  306. }()
  307. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  308. glog.V(logger.Debug).Infof("LES: register peer %v", p.id)
  309. if pm.lightSync {
  310. requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
  311. reqID := pm.odr.getNextReqID()
  312. cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
  313. p.fcServer.SendRequest(reqID, cost)
  314. return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse)
  315. }
  316. requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
  317. reqID := pm.odr.getNextReqID()
  318. cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
  319. p.fcServer.SendRequest(reqID, cost)
  320. return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse)
  321. }
  322. if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd,
  323. requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
  324. return err
  325. }
  326. pm.odr.RegisterPeer(p)
  327. if pm.txrelay != nil {
  328. pm.txrelay.addPeer(p)
  329. }
  330. pm.fetcher.notify(p, nil)
  331. }
  332. stop := make(chan struct{})
  333. defer close(stop)
  334. go func() {
  335. // new block announce loop
  336. for {
  337. select {
  338. case announce := <-p.announceChn:
  339. p.SendAnnounce(announce)
  340. //fmt.Println(" BROADCAST sent")
  341. case <-stop:
  342. return
  343. }
  344. }
  345. }()
  346. // main loop. handle incoming messages.
  347. for {
  348. if err := pm.handleMsg(p); err != nil {
  349. glog.V(logger.Debug).Infof("%v: message handling failed: %v", p, err)
  350. //fmt.Println("handleMsg err:", err)
  351. return err
  352. }
  353. }
  354. }
  355. var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsMsg, SendTxMsg, GetHeaderProofsMsg}
  356. // handleMsg is invoked whenever an inbound message is received from a remote
  357. // peer. The remote connection is torn down upon returning any error.
  358. func (pm *ProtocolManager) handleMsg(p *peer) error {
  359. // Read the next message from the remote peer, and ensure it's fully consumed
  360. msg, err := p.rw.ReadMsg()
  361. if err != nil {
  362. return err
  363. }
  364. var costs *requestCosts
  365. var reqCnt, maxReqs int
  366. //fmt.Println("MSG", msg.Code, msg.Size)
  367. if rc, ok := p.fcCosts[msg.Code]; ok { // check if msg is a supported request type
  368. costs = rc
  369. if p.fcClient == nil {
  370. return errResp(ErrRequestRejected, "")
  371. }
  372. bv, ok := p.fcClient.AcceptRequest()
  373. if !ok || bv < costs.baseCost {
  374. return errResp(ErrRequestRejected, "")
  375. }
  376. maxReqs = 10000
  377. if bv < pm.server.defParams.BufLimit {
  378. d := bv - costs.baseCost
  379. if d/10000 < costs.reqCost {
  380. maxReqs = int(d / costs.reqCost)
  381. }
  382. }
  383. }
  384. if msg.Size > ProtocolMaxMsgSize {
  385. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  386. }
  387. defer msg.Discard()
  388. var deliverMsg *Msg
  389. // Handle the message depending on its contents
  390. switch msg.Code {
  391. case StatusMsg:
  392. glog.V(logger.Debug).Infof("LES: received StatusMsg from peer %v", p.id)
  393. // Status messages should never arrive after the handshake
  394. return errResp(ErrExtraStatusMsg, "uncontrolled status message")
  395. // Block header query, collect the requested headers and reply
  396. case AnnounceMsg:
  397. var req announceData
  398. if err := msg.Decode(&req); err != nil {
  399. return errResp(ErrDecode, "%v: %v", msg, err)
  400. }
  401. //fmt.Println("RECEIVED", req.Number, req.Hash, req.Td, req.ReorgDepth)
  402. pm.fetcher.notify(p, &req)
  403. case GetBlockHeadersMsg:
  404. glog.V(logger.Debug).Infof("LES: received GetBlockHeadersMsg from peer %v", p.id)
  405. // Decode the complex header query
  406. var req struct {
  407. ReqID uint64
  408. Query getBlockHeadersData
  409. }
  410. if err := msg.Decode(&req); err != nil {
  411. return errResp(ErrDecode, "%v: %v", msg, err)
  412. }
  413. query := req.Query
  414. if query.Amount > uint64(maxReqs) || query.Amount > MaxHeaderFetch {
  415. return errResp(ErrRequestRejected, "")
  416. }
  417. hashMode := query.Origin.Hash != (common.Hash{})
  418. // Gather headers until the fetch or network limits is reached
  419. var (
  420. bytes common.StorageSize
  421. headers []*types.Header
  422. unknown bool
  423. )
  424. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  425. // Retrieve the next header satisfying the query
  426. var origin *types.Header
  427. if hashMode {
  428. origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
  429. } else {
  430. origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
  431. }
  432. if origin == nil {
  433. break
  434. }
  435. number := origin.Number.Uint64()
  436. headers = append(headers, origin)
  437. bytes += estHeaderRlpSize
  438. // Advance to the next header of the query
  439. switch {
  440. case query.Origin.Hash != (common.Hash{}) && query.Reverse:
  441. // Hash based traversal towards the genesis block
  442. for i := 0; i < int(query.Skip)+1; i++ {
  443. if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {
  444. query.Origin.Hash = header.ParentHash
  445. number--
  446. } else {
  447. unknown = true
  448. break
  449. }
  450. }
  451. case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
  452. // Hash based traversal towards the leaf block
  453. if header := pm.blockchain.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil {
  454. if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
  455. query.Origin.Hash = header.Hash()
  456. } else {
  457. unknown = true
  458. }
  459. } else {
  460. unknown = true
  461. }
  462. case query.Reverse:
  463. // Number based traversal towards the genesis block
  464. if query.Origin.Number >= query.Skip+1 {
  465. query.Origin.Number -= (query.Skip + 1)
  466. } else {
  467. unknown = true
  468. }
  469. case !query.Reverse:
  470. // Number based traversal towards the leaf block
  471. query.Origin.Number += (query.Skip + 1)
  472. }
  473. }
  474. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
  475. pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
  476. return p.SendBlockHeaders(req.ReqID, bv, headers)
  477. case BlockHeadersMsg:
  478. if pm.downloader == nil {
  479. return errResp(ErrUnexpectedResponse, "")
  480. }
  481. glog.V(logger.Debug).Infof("LES: received BlockHeadersMsg from peer %v", p.id)
  482. // A batch of headers arrived to one of our previous requests
  483. var resp struct {
  484. ReqID, BV uint64
  485. Headers []*types.Header
  486. }
  487. if err := msg.Decode(&resp); err != nil {
  488. return errResp(ErrDecode, "msg %v: %v", msg, err)
  489. }
  490. p.fcServer.GotReply(resp.ReqID, resp.BV)
  491. if pm.fetcher.requestedID(resp.ReqID) {
  492. pm.fetcher.deliverHeaders(resp.ReqID, resp.Headers)
  493. } else {
  494. err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
  495. if err != nil {
  496. glog.V(logger.Debug).Infoln(err)
  497. }
  498. }
  499. case GetBlockBodiesMsg:
  500. glog.V(logger.Debug).Infof("LES: received GetBlockBodiesMsg from peer %v", p.id)
  501. // Decode the retrieval message
  502. var req struct {
  503. ReqID uint64
  504. Hashes []common.Hash
  505. }
  506. if err := msg.Decode(&req); err != nil {
  507. return errResp(ErrDecode, "msg %v: %v", msg, err)
  508. }
  509. // Gather blocks until the fetch or network limits is reached
  510. var (
  511. bytes int
  512. bodies []rlp.RawValue
  513. )
  514. reqCnt = len(req.Hashes)
  515. if reqCnt > maxReqs || reqCnt > MaxBodyFetch {
  516. return errResp(ErrRequestRejected, "")
  517. }
  518. for _, hash := range req.Hashes {
  519. if bytes >= softResponseLimit {
  520. break
  521. }
  522. // Retrieve the requested block body, stopping if enough was found
  523. if data := core.GetBodyRLP(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash)); len(data) != 0 {
  524. bodies = append(bodies, data)
  525. bytes += len(data)
  526. }
  527. }
  528. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  529. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  530. return p.SendBlockBodiesRLP(req.ReqID, bv, bodies)
  531. case BlockBodiesMsg:
  532. if pm.odr == nil {
  533. return errResp(ErrUnexpectedResponse, "")
  534. }
  535. glog.V(logger.Debug).Infof("LES: received BlockBodiesMsg from peer %v", p.id)
  536. // A batch of block bodies arrived to one of our previous requests
  537. var resp struct {
  538. ReqID, BV uint64
  539. Data []*types.Body
  540. }
  541. if err := msg.Decode(&resp); err != nil {
  542. return errResp(ErrDecode, "msg %v: %v", msg, err)
  543. }
  544. p.fcServer.GotReply(resp.ReqID, resp.BV)
  545. deliverMsg = &Msg{
  546. MsgType: MsgBlockBodies,
  547. ReqID: resp.ReqID,
  548. Obj: resp.Data,
  549. }
  550. case GetCodeMsg:
  551. glog.V(logger.Debug).Infof("LES: received GetCodeMsg from peer %v", p.id)
  552. // Decode the retrieval message
  553. var req struct {
  554. ReqID uint64
  555. Reqs []CodeReq
  556. }
  557. if err := msg.Decode(&req); err != nil {
  558. return errResp(ErrDecode, "msg %v: %v", msg, err)
  559. }
  560. // Gather state data until the fetch or network limits is reached
  561. var (
  562. bytes int
  563. data [][]byte
  564. )
  565. reqCnt = len(req.Reqs)
  566. if reqCnt > maxReqs || reqCnt > MaxCodeFetch {
  567. return errResp(ErrRequestRejected, "")
  568. }
  569. for _, req := range req.Reqs {
  570. // Retrieve the requested state entry, stopping if enough was found
  571. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  572. if trie, _ := trie.New(header.Root, pm.chainDb); trie != nil {
  573. sdata := trie.Get(req.AccKey)
  574. var acc state.Account
  575. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  576. entry, _ := pm.chainDb.Get(acc.CodeHash)
  577. if bytes+len(entry) >= softResponseLimit {
  578. break
  579. }
  580. data = append(data, entry)
  581. bytes += len(entry)
  582. }
  583. }
  584. }
  585. }
  586. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  587. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  588. return p.SendCode(req.ReqID, bv, data)
  589. case CodeMsg:
  590. if pm.odr == nil {
  591. return errResp(ErrUnexpectedResponse, "")
  592. }
  593. glog.V(logger.Debug).Infof("LES: received CodeMsg from peer %v", p.id)
  594. // A batch of node state data arrived to one of our previous requests
  595. var resp struct {
  596. ReqID, BV uint64
  597. Data [][]byte
  598. }
  599. if err := msg.Decode(&resp); err != nil {
  600. return errResp(ErrDecode, "msg %v: %v", msg, err)
  601. }
  602. p.fcServer.GotReply(resp.ReqID, resp.BV)
  603. deliverMsg = &Msg{
  604. MsgType: MsgCode,
  605. ReqID: resp.ReqID,
  606. Obj: resp.Data,
  607. }
  608. case GetReceiptsMsg:
  609. glog.V(logger.Debug).Infof("LES: received GetReceiptsMsg from peer %v", p.id)
  610. // Decode the retrieval message
  611. var req struct {
  612. ReqID uint64
  613. Hashes []common.Hash
  614. }
  615. if err := msg.Decode(&req); err != nil {
  616. return errResp(ErrDecode, "msg %v: %v", msg, err)
  617. }
  618. // Gather state data until the fetch or network limits is reached
  619. var (
  620. bytes int
  621. receipts []rlp.RawValue
  622. )
  623. reqCnt = len(req.Hashes)
  624. if reqCnt > maxReqs || reqCnt > MaxReceiptFetch {
  625. return errResp(ErrRequestRejected, "")
  626. }
  627. for _, hash := range req.Hashes {
  628. if bytes >= softResponseLimit {
  629. break
  630. }
  631. // Retrieve the requested block's receipts, skipping if unknown to us
  632. results := core.GetBlockReceipts(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash))
  633. if results == nil {
  634. if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  635. continue
  636. }
  637. }
  638. // If known, encode and queue for response packet
  639. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  640. glog.V(logger.Error).Infof("failed to encode receipt: %v", err)
  641. } else {
  642. receipts = append(receipts, encoded)
  643. bytes += len(encoded)
  644. }
  645. }
  646. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  647. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  648. return p.SendReceiptsRLP(req.ReqID, bv, receipts)
  649. case ReceiptsMsg:
  650. if pm.odr == nil {
  651. return errResp(ErrUnexpectedResponse, "")
  652. }
  653. glog.V(logger.Debug).Infof("LES: received ReceiptsMsg from peer %v", p.id)
  654. // A batch of receipts arrived to one of our previous requests
  655. var resp struct {
  656. ReqID, BV uint64
  657. Receipts []types.Receipts
  658. }
  659. if err := msg.Decode(&resp); err != nil {
  660. return errResp(ErrDecode, "msg %v: %v", msg, err)
  661. }
  662. p.fcServer.GotReply(resp.ReqID, resp.BV)
  663. deliverMsg = &Msg{
  664. MsgType: MsgReceipts,
  665. ReqID: resp.ReqID,
  666. Obj: resp.Receipts,
  667. }
  668. case GetProofsMsg:
  669. glog.V(logger.Debug).Infof("LES: received GetProofsMsg from peer %v", p.id)
  670. // Decode the retrieval message
  671. var req struct {
  672. ReqID uint64
  673. Reqs []ProofReq
  674. }
  675. if err := msg.Decode(&req); err != nil {
  676. return errResp(ErrDecode, "msg %v: %v", msg, err)
  677. }
  678. // Gather state data until the fetch or network limits is reached
  679. var (
  680. bytes int
  681. proofs proofsData
  682. )
  683. reqCnt = len(req.Reqs)
  684. if reqCnt > maxReqs || reqCnt > MaxProofsFetch {
  685. return errResp(ErrRequestRejected, "")
  686. }
  687. for _, req := range req.Reqs {
  688. if bytes >= softResponseLimit {
  689. break
  690. }
  691. // Retrieve the requested state entry, stopping if enough was found
  692. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  693. if tr, _ := trie.New(header.Root, pm.chainDb); tr != nil {
  694. if len(req.AccKey) > 0 {
  695. sdata := tr.Get(req.AccKey)
  696. tr = nil
  697. var acc state.Account
  698. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  699. tr, _ = trie.New(acc.Root, pm.chainDb)
  700. }
  701. }
  702. if tr != nil {
  703. proof := tr.Prove(req.Key)
  704. proofs = append(proofs, proof)
  705. bytes += len(proof)
  706. }
  707. }
  708. }
  709. }
  710. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  711. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  712. return p.SendProofs(req.ReqID, bv, proofs)
  713. case ProofsMsg:
  714. if pm.odr == nil {
  715. return errResp(ErrUnexpectedResponse, "")
  716. }
  717. glog.V(logger.Debug).Infof("LES: received ProofsMsg from peer %v", p.id)
  718. // A batch of merkle proofs arrived to one of our previous requests
  719. var resp struct {
  720. ReqID, BV uint64
  721. Data [][]rlp.RawValue
  722. }
  723. if err := msg.Decode(&resp); err != nil {
  724. return errResp(ErrDecode, "msg %v: %v", msg, err)
  725. }
  726. p.fcServer.GotReply(resp.ReqID, resp.BV)
  727. deliverMsg = &Msg{
  728. MsgType: MsgProofs,
  729. ReqID: resp.ReqID,
  730. Obj: resp.Data,
  731. }
  732. case GetHeaderProofsMsg:
  733. glog.V(logger.Debug).Infof("LES: received GetHeaderProofsMsg from peer %v", p.id)
  734. // Decode the retrieval message
  735. var req struct {
  736. ReqID uint64
  737. Reqs []ChtReq
  738. }
  739. if err := msg.Decode(&req); err != nil {
  740. return errResp(ErrDecode, "msg %v: %v", msg, err)
  741. }
  742. // Gather state data until the fetch or network limits is reached
  743. var (
  744. bytes int
  745. proofs []ChtResp
  746. )
  747. reqCnt = len(req.Reqs)
  748. if reqCnt > maxReqs || reqCnt > MaxHeaderProofsFetch {
  749. return errResp(ErrRequestRejected, "")
  750. }
  751. for _, req := range req.Reqs {
  752. if bytes >= softResponseLimit {
  753. break
  754. }
  755. if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
  756. if root := getChtRoot(pm.chainDb, req.ChtNum); root != (common.Hash{}) {
  757. if tr, _ := trie.New(root, pm.chainDb); tr != nil {
  758. var encNumber [8]byte
  759. binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
  760. proof := tr.Prove(encNumber[:])
  761. proofs = append(proofs, ChtResp{Header: header, Proof: proof})
  762. bytes += len(proof) + estHeaderRlpSize
  763. }
  764. }
  765. }
  766. }
  767. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  768. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  769. return p.SendHeaderProofs(req.ReqID, bv, proofs)
  770. case HeaderProofsMsg:
  771. if pm.odr == nil {
  772. return errResp(ErrUnexpectedResponse, "")
  773. }
  774. glog.V(logger.Debug).Infof("LES: received HeaderProofsMsg from peer %v", p.id)
  775. var resp struct {
  776. ReqID, BV uint64
  777. Data []ChtResp
  778. }
  779. if err := msg.Decode(&resp); err != nil {
  780. return errResp(ErrDecode, "msg %v: %v", msg, err)
  781. }
  782. p.fcServer.GotReply(resp.ReqID, resp.BV)
  783. deliverMsg = &Msg{
  784. MsgType: MsgHeaderProofs,
  785. ReqID: resp.ReqID,
  786. Obj: resp.Data,
  787. }
  788. case SendTxMsg:
  789. if pm.txpool == nil {
  790. return errResp(ErrUnexpectedResponse, "")
  791. }
  792. // Transactions arrived, parse all of them and deliver to the pool
  793. var txs []*types.Transaction
  794. if err := msg.Decode(&txs); err != nil {
  795. return errResp(ErrDecode, "msg %v: %v", msg, err)
  796. }
  797. reqCnt = len(txs)
  798. if reqCnt > maxReqs || reqCnt > MaxTxSend {
  799. return errResp(ErrRequestRejected, "")
  800. }
  801. pm.txpool.AddBatch(txs)
  802. _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  803. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  804. default:
  805. glog.V(logger.Debug).Infof("LES: received unknown message with code %d from peer %v", msg.Code, p.id)
  806. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  807. }
  808. if deliverMsg != nil {
  809. return pm.odr.Deliver(p, deliverMsg)
  810. }
  811. return nil
  812. }
  813. // NodeInfo retrieves some protocol metadata about the running host node.
  814. func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo {
  815. return &eth.EthNodeInfo{
  816. Network: self.networkId,
  817. Difficulty: self.blockchain.GetTdByHash(self.blockchain.LastBlockHash()),
  818. Genesis: self.blockchain.Genesis().Hash(),
  819. Head: self.blockchain.LastBlockHash(),
  820. }
  821. }