handler.go 37 KB


  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "encoding/binary"
  20. "encoding/json"
  21. "fmt"
  22. "math/big"
  23. "net"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/common/mclock"
  28. "github.com/ethereum/go-ethereum/consensus"
  29. "github.com/ethereum/go-ethereum/core"
  30. "github.com/ethereum/go-ethereum/core/rawdb"
  31. "github.com/ethereum/go-ethereum/core/state"
  32. "github.com/ethereum/go-ethereum/core/types"
  33. "github.com/ethereum/go-ethereum/eth/downloader"
  34. "github.com/ethereum/go-ethereum/ethdb"
  35. "github.com/ethereum/go-ethereum/event"
  36. "github.com/ethereum/go-ethereum/light"
  37. "github.com/ethereum/go-ethereum/log"
  38. "github.com/ethereum/go-ethereum/p2p"
  39. "github.com/ethereum/go-ethereum/p2p/discv5"
  40. "github.com/ethereum/go-ethereum/params"
  41. "github.com/ethereum/go-ethereum/rlp"
  42. "github.com/ethereum/go-ethereum/trie"
  43. )
  44. const (
  45. softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
  46. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
  47. ethVersion = 63 // equivalent eth version for the downloader
  48. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  49. MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
  50. MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
  51. MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
  52. MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  53. MaxHelperTrieProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  54. MaxTxSend = 64 // Amount of transactions to be send per request
  55. MaxTxStatus = 256 // Amount of transactions to queried per request
  56. disableClientRemovePeer = false
  57. )
  58. func errResp(code errCode, format string, v ...interface{}) error {
  59. return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
  60. }
  61. type BlockChain interface {
  62. Config() *params.ChainConfig
  63. HasHeader(hash common.Hash, number uint64) bool
  64. GetHeader(hash common.Hash, number uint64) *types.Header
  65. GetHeaderByHash(hash common.Hash) *types.Header
  66. CurrentHeader() *types.Header
  67. GetTd(hash common.Hash, number uint64) *big.Int
  68. State() (*state.StateDB, error)
  69. InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error)
  70. Rollback(chain []common.Hash)
  71. GetHeaderByNumber(number uint64) *types.Header
  72. GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64)
  73. Genesis() *types.Block
  74. SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
  75. }
  76. type txPool interface {
  77. AddRemotes(txs []*types.Transaction) []error
  78. Status(hashes []common.Hash) []core.TxStatus
  79. }
  80. type ProtocolManager struct {
  81. lightSync bool
  82. txpool txPool
  83. txrelay *LesTxRelay
  84. networkId uint64
  85. chainConfig *params.ChainConfig
  86. blockchain BlockChain
  87. chainDb ethdb.Database
  88. odr *LesOdr
  89. server *LesServer
  90. serverPool *serverPool
  91. clientPool *freeClientPool
  92. lesTopic discv5.Topic
  93. reqDist *requestDistributor
  94. retriever *retrieveManager
  95. downloader *downloader.Downloader
  96. fetcher *lightFetcher
  97. peers *peerSet
  98. maxPeers int
  99. eventMux *event.TypeMux
  100. // channels for fetcher, syncer, txsyncLoop
  101. newPeerCh chan *peer
  102. quitSync chan struct{}
  103. noMorePeers chan struct{}
  104. // wait group is used for graceful shutdowns during downloading
  105. // and processing
  106. wg *sync.WaitGroup
  107. }
  108. // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
  109. // with the ethereum network.
  110. func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
  111. // Create the protocol manager with the base fields
  112. manager := &ProtocolManager{
  113. lightSync: lightSync,
  114. eventMux: mux,
  115. blockchain: blockchain,
  116. chainConfig: chainConfig,
  117. chainDb: chainDb,
  118. odr: odr,
  119. networkId: networkId,
  120. txpool: txpool,
  121. txrelay: txrelay,
  122. serverPool: serverPool,
  123. peers: peers,
  124. newPeerCh: make(chan *peer),
  125. quitSync: quitSync,
  126. wg: wg,
  127. noMorePeers: make(chan struct{}),
  128. }
  129. if odr != nil {
  130. manager.retriever = odr.retriever
  131. manager.reqDist = odr.retriever.dist
  132. }
  133. removePeer := manager.removePeer
  134. if disableClientRemovePeer {
  135. removePeer = func(id string) {}
  136. }
  137. if lightSync {
  138. manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer)
  139. manager.peers.notify((*downloaderPeerNotify)(manager))
  140. manager.fetcher = newLightFetcher(manager)
  141. }
  142. return manager, nil
  143. }
  144. // removePeer initiates disconnection from a peer by removing it from the peer set
  145. func (pm *ProtocolManager) removePeer(id string) {
  146. pm.peers.Unregister(id)
  147. }
  148. func (pm *ProtocolManager) Start(maxPeers int) {
  149. pm.maxPeers = maxPeers
  150. if pm.lightSync {
  151. go pm.syncer()
  152. } else {
  153. pm.clientPool = newFreeClientPool(pm.chainDb, maxPeers, 10000, mclock.System{})
  154. go func() {
  155. for range pm.newPeerCh {
  156. }
  157. }()
  158. }
  159. }
  160. func (pm *ProtocolManager) Stop() {
  161. // Showing a log message. During download / process this could actually
  162. // take between 5 to 10 seconds and therefor feedback is required.
  163. log.Info("Stopping light Ethereum protocol")
  164. // Quit the sync loop.
  165. // After this send has completed, no new peers will be accepted.
  166. pm.noMorePeers <- struct{}{}
  167. close(pm.quitSync) // quits syncer, fetcher
  168. if pm.clientPool != nil {
  169. pm.clientPool.stop()
  170. }
  171. // Disconnect existing sessions.
  172. // This also closes the gate for any new registrations on the peer set.
  173. // sessions which are already established but not added to pm.peers yet
  174. // will exit when they try to register.
  175. pm.peers.Close()
  176. // Wait for any process action
  177. pm.wg.Wait()
  178. log.Info("Light Ethereum protocol stopped")
  179. }
  180. // runPeer is the p2p protocol run function for the given version.
  181. func (pm *ProtocolManager) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error {
  182. var entry *poolEntry
  183. peer := pm.newPeer(int(version), pm.networkId, p, rw)
  184. if pm.serverPool != nil {
  185. addr := p.RemoteAddr().(*net.TCPAddr)
  186. entry = pm.serverPool.connect(peer, addr.IP, uint16(addr.Port))
  187. }
  188. peer.poolEntry = entry
  189. select {
  190. case pm.newPeerCh <- peer:
  191. pm.wg.Add(1)
  192. defer pm.wg.Done()
  193. err := pm.handle(peer)
  194. if entry != nil {
  195. pm.serverPool.disconnect(entry)
  196. }
  197. return err
  198. case <-pm.quitSync:
  199. if entry != nil {
  200. pm.serverPool.disconnect(entry)
  201. }
  202. return p2p.DiscQuitting
  203. }
  204. }
  205. func (pm *ProtocolManager) newPeer(pv int, nv uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  206. return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
  207. }
  208. // handle is the callback invoked to manage the life cycle of a les peer. When
  209. // this function terminates, the peer is disconnected.
  210. func (pm *ProtocolManager) handle(p *peer) error {
  211. // Ignore maxPeers if this is a trusted peer
  212. // In server mode we try to check into the client pool after handshake
  213. if pm.lightSync && pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
  214. return p2p.DiscTooManyPeers
  215. }
  216. p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
  217. // Execute the LES handshake
  218. var (
  219. genesis = pm.blockchain.Genesis()
  220. head = pm.blockchain.CurrentHeader()
  221. hash = head.Hash()
  222. number = head.Number.Uint64()
  223. td = pm.blockchain.GetTd(hash, number)
  224. )
  225. if err := p.Handshake(td, hash, number, genesis.Hash(), pm.server); err != nil {
  226. p.Log().Debug("Light Ethereum handshake failed", "err", err)
  227. return err
  228. }
  229. if !pm.lightSync && !p.Peer.Info().Network.Trusted {
  230. addr, ok := p.RemoteAddr().(*net.TCPAddr)
  231. // test peer address is not a tcp address, don't use client pool if can not typecast
  232. if ok {
  233. id := addr.IP.String()
  234. if !pm.clientPool.connect(id, func() { go pm.removePeer(p.id) }) {
  235. return p2p.DiscTooManyPeers
  236. }
  237. defer pm.clientPool.disconnect(id)
  238. }
  239. }
  240. if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
  241. rw.Init(p.version)
  242. }
  243. // Register the peer locally
  244. if err := pm.peers.Register(p); err != nil {
  245. p.Log().Error("Light Ethereum peer registration failed", "err", err)
  246. return err
  247. }
  248. defer func() {
  249. if pm.server != nil && pm.server.fcManager != nil && p.fcClient != nil {
  250. p.fcClient.Remove(pm.server.fcManager)
  251. }
  252. pm.removePeer(p.id)
  253. }()
  254. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  255. if pm.lightSync {
  256. p.lock.Lock()
  257. head := p.headInfo
  258. p.lock.Unlock()
  259. if pm.fetcher != nil {
  260. pm.fetcher.announce(p, head)
  261. }
  262. if p.poolEntry != nil {
  263. pm.serverPool.registered(p.poolEntry)
  264. }
  265. }
  266. stop := make(chan struct{})
  267. defer close(stop)
  268. go func() {
  269. // new block announce loop
  270. for {
  271. select {
  272. case announce := <-p.announceChn:
  273. p.SendAnnounce(announce)
  274. case <-stop:
  275. return
  276. }
  277. }
  278. }()
  279. // main loop. handle incoming messages.
  280. for {
  281. if err := pm.handleMsg(p); err != nil {
  282. p.Log().Debug("Light Ethereum message handling failed", "err", err)
  283. return err
  284. }
  285. }
  286. }
  287. var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, SendTxV2Msg, GetTxStatusMsg, GetHeaderProofsMsg, GetProofsV2Msg, GetHelperTrieProofsMsg}
  288. // handleMsg is invoked whenever an inbound message is received from a remote
  289. // peer. The remote connection is torn down upon returning any error.
  290. func (pm *ProtocolManager) handleMsg(p *peer) error {
  291. // Read the next message from the remote peer, and ensure it's fully consumed
  292. msg, err := p.rw.ReadMsg()
  293. if err != nil {
  294. return err
  295. }
  296. p.Log().Trace("Light Ethereum message arrived", "code", msg.Code, "bytes", msg.Size)
  297. costs := p.fcCosts[msg.Code]
  298. reject := func(reqCnt, maxCnt uint64) bool {
  299. if p.fcClient == nil || reqCnt > maxCnt {
  300. return true
  301. }
  302. bufValue, _ := p.fcClient.AcceptRequest()
  303. cost := costs.baseCost + reqCnt*costs.reqCost
  304. if cost > pm.server.defParams.BufLimit {
  305. cost = pm.server.defParams.BufLimit
  306. }
  307. if cost > bufValue {
  308. recharge := time.Duration((cost - bufValue) * 1000000 / pm.server.defParams.MinRecharge)
  309. p.Log().Error("Request came too early", "recharge", common.PrettyDuration(recharge))
  310. return true
  311. }
  312. return false
  313. }
  314. if msg.Size > ProtocolMaxMsgSize {
  315. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  316. }
  317. defer msg.Discard()
  318. var deliverMsg *Msg
  319. // Handle the message depending on its contents
  320. switch msg.Code {
  321. case StatusMsg:
  322. p.Log().Trace("Received status message")
  323. // Status messages should never arrive after the handshake
  324. return errResp(ErrExtraStatusMsg, "uncontrolled status message")
  325. // Block header query, collect the requested headers and reply
  326. case AnnounceMsg:
  327. p.Log().Trace("Received announce message")
  328. if p.requestAnnounceType == announceTypeNone {
  329. return errResp(ErrUnexpectedResponse, "")
  330. }
  331. var req announceData
  332. if err := msg.Decode(&req); err != nil {
  333. return errResp(ErrDecode, "%v: %v", msg, err)
  334. }
  335. if p.requestAnnounceType == announceTypeSigned {
  336. if err := req.checkSignature(p.pubKey); err != nil {
  337. p.Log().Trace("Invalid announcement signature", "err", err)
  338. return err
  339. }
  340. p.Log().Trace("Valid announcement signature")
  341. }
  342. p.Log().Trace("Announce message content", "number", req.Number, "hash", req.Hash, "td", req.Td, "reorg", req.ReorgDepth)
  343. if pm.fetcher != nil {
  344. pm.fetcher.announce(p, &req)
  345. }
  346. case GetBlockHeadersMsg:
  347. p.Log().Trace("Received block header request")
  348. // Decode the complex header query
  349. var req struct {
  350. ReqID uint64
  351. Query getBlockHeadersData
  352. }
  353. if err := msg.Decode(&req); err != nil {
  354. return errResp(ErrDecode, "%v: %v", msg, err)
  355. }
  356. query := req.Query
  357. if reject(query.Amount, MaxHeaderFetch) {
  358. return errResp(ErrRequestRejected, "")
  359. }
  360. hashMode := query.Origin.Hash != (common.Hash{})
  361. first := true
  362. maxNonCanonical := uint64(100)
  363. // Gather headers until the fetch or network limits is reached
  364. var (
  365. bytes common.StorageSize
  366. headers []*types.Header
  367. unknown bool
  368. )
  369. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  370. // Retrieve the next header satisfying the query
  371. var origin *types.Header
  372. if hashMode {
  373. if first {
  374. first = false
  375. origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
  376. if origin != nil {
  377. query.Origin.Number = origin.Number.Uint64()
  378. }
  379. } else {
  380. origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
  381. }
  382. } else {
  383. origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
  384. }
  385. if origin == nil {
  386. break
  387. }
  388. headers = append(headers, origin)
  389. bytes += estHeaderRlpSize
  390. // Advance to the next header of the query
  391. switch {
  392. case hashMode && query.Reverse:
  393. // Hash based traversal towards the genesis block
  394. ancestor := query.Skip + 1
  395. if ancestor == 0 {
  396. unknown = true
  397. } else {
  398. query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
  399. unknown = (query.Origin.Hash == common.Hash{})
  400. }
  401. case hashMode && !query.Reverse:
  402. // Hash based traversal towards the leaf block
  403. var (
  404. current = origin.Number.Uint64()
  405. next = current + query.Skip + 1
  406. )
  407. if next <= current {
  408. infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
  409. p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
  410. unknown = true
  411. } else {
  412. if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
  413. nextHash := header.Hash()
  414. expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
  415. if expOldHash == query.Origin.Hash {
  416. query.Origin.Hash, query.Origin.Number = nextHash, next
  417. } else {
  418. unknown = true
  419. }
  420. } else {
  421. unknown = true
  422. }
  423. }
  424. case query.Reverse:
  425. // Number based traversal towards the genesis block
  426. if query.Origin.Number >= query.Skip+1 {
  427. query.Origin.Number -= query.Skip + 1
  428. } else {
  429. unknown = true
  430. }
  431. case !query.Reverse:
  432. // Number based traversal towards the leaf block
  433. query.Origin.Number += query.Skip + 1
  434. }
  435. }
  436. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
  437. pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
  438. return p.SendBlockHeaders(req.ReqID, bv, headers)
  439. case BlockHeadersMsg:
  440. if pm.downloader == nil {
  441. return errResp(ErrUnexpectedResponse, "")
  442. }
  443. p.Log().Trace("Received block header response message")
  444. // A batch of headers arrived to one of our previous requests
  445. var resp struct {
  446. ReqID, BV uint64
  447. Headers []*types.Header
  448. }
  449. if err := msg.Decode(&resp); err != nil {
  450. return errResp(ErrDecode, "msg %v: %v", msg, err)
  451. }
  452. p.fcServer.GotReply(resp.ReqID, resp.BV)
  453. if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
  454. pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
  455. } else {
  456. err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
  457. if err != nil {
  458. log.Debug(fmt.Sprint(err))
  459. }
  460. }
  461. case GetBlockBodiesMsg:
  462. p.Log().Trace("Received block bodies request")
  463. // Decode the retrieval message
  464. var req struct {
  465. ReqID uint64
  466. Hashes []common.Hash
  467. }
  468. if err := msg.Decode(&req); err != nil {
  469. return errResp(ErrDecode, "msg %v: %v", msg, err)
  470. }
  471. // Gather blocks until the fetch or network limits is reached
  472. var (
  473. bytes int
  474. bodies []rlp.RawValue
  475. )
  476. reqCnt := len(req.Hashes)
  477. if reject(uint64(reqCnt), MaxBodyFetch) {
  478. return errResp(ErrRequestRejected, "")
  479. }
  480. for _, hash := range req.Hashes {
  481. if bytes >= softResponseLimit {
  482. break
  483. }
  484. // Retrieve the requested block body, stopping if enough was found
  485. if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil {
  486. if data := rawdb.ReadBodyRLP(pm.chainDb, hash, *number); len(data) != 0 {
  487. bodies = append(bodies, data)
  488. bytes += len(data)
  489. }
  490. }
  491. }
  492. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  493. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  494. return p.SendBlockBodiesRLP(req.ReqID, bv, bodies)
  495. case BlockBodiesMsg:
  496. if pm.odr == nil {
  497. return errResp(ErrUnexpectedResponse, "")
  498. }
  499. p.Log().Trace("Received block bodies response")
  500. // A batch of block bodies arrived to one of our previous requests
  501. var resp struct {
  502. ReqID, BV uint64
  503. Data []*types.Body
  504. }
  505. if err := msg.Decode(&resp); err != nil {
  506. return errResp(ErrDecode, "msg %v: %v", msg, err)
  507. }
  508. p.fcServer.GotReply(resp.ReqID, resp.BV)
  509. deliverMsg = &Msg{
  510. MsgType: MsgBlockBodies,
  511. ReqID: resp.ReqID,
  512. Obj: resp.Data,
  513. }
  514. case GetCodeMsg:
  515. p.Log().Trace("Received code request")
  516. // Decode the retrieval message
  517. var req struct {
  518. ReqID uint64
  519. Reqs []CodeReq
  520. }
  521. if err := msg.Decode(&req); err != nil {
  522. return errResp(ErrDecode, "msg %v: %v", msg, err)
  523. }
  524. // Gather state data until the fetch or network limits is reached
  525. var (
  526. bytes int
  527. data [][]byte
  528. )
  529. reqCnt := len(req.Reqs)
  530. if reject(uint64(reqCnt), MaxCodeFetch) {
  531. return errResp(ErrRequestRejected, "")
  532. }
  533. for _, req := range req.Reqs {
  534. // Retrieve the requested state entry, stopping if enough was found
  535. if number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number != nil {
  536. if header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header != nil {
  537. statedb, err := pm.blockchain.State()
  538. if err != nil {
  539. continue
  540. }
  541. account, err := pm.getAccount(statedb, header.Root, common.BytesToHash(req.AccKey))
  542. if err != nil {
  543. continue
  544. }
  545. code, _ := statedb.Database().TrieDB().Node(common.BytesToHash(account.CodeHash))
  546. data = append(data, code)
  547. if bytes += len(code); bytes >= softResponseLimit {
  548. break
  549. }
  550. }
  551. }
  552. }
  553. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  554. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  555. return p.SendCode(req.ReqID, bv, data)
  556. case CodeMsg:
  557. if pm.odr == nil {
  558. return errResp(ErrUnexpectedResponse, "")
  559. }
  560. p.Log().Trace("Received code response")
  561. // A batch of node state data arrived to one of our previous requests
  562. var resp struct {
  563. ReqID, BV uint64
  564. Data [][]byte
  565. }
  566. if err := msg.Decode(&resp); err != nil {
  567. return errResp(ErrDecode, "msg %v: %v", msg, err)
  568. }
  569. p.fcServer.GotReply(resp.ReqID, resp.BV)
  570. deliverMsg = &Msg{
  571. MsgType: MsgCode,
  572. ReqID: resp.ReqID,
  573. Obj: resp.Data,
  574. }
  575. case GetReceiptsMsg:
  576. p.Log().Trace("Received receipts request")
  577. // Decode the retrieval message
  578. var req struct {
  579. ReqID uint64
  580. Hashes []common.Hash
  581. }
  582. if err := msg.Decode(&req); err != nil {
  583. return errResp(ErrDecode, "msg %v: %v", msg, err)
  584. }
  585. // Gather state data until the fetch or network limits is reached
  586. var (
  587. bytes int
  588. receipts []rlp.RawValue
  589. )
  590. reqCnt := len(req.Hashes)
  591. if reject(uint64(reqCnt), MaxReceiptFetch) {
  592. return errResp(ErrRequestRejected, "")
  593. }
  594. for _, hash := range req.Hashes {
  595. if bytes >= softResponseLimit {
  596. break
  597. }
  598. // Retrieve the requested block's receipts, skipping if unknown to us
  599. var results types.Receipts
  600. if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil {
  601. results = rawdb.ReadReceipts(pm.chainDb, hash, *number)
  602. }
  603. if results == nil {
  604. if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  605. continue
  606. }
  607. }
  608. // If known, encode and queue for response packet
  609. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  610. log.Error("Failed to encode receipt", "err", err)
  611. } else {
  612. receipts = append(receipts, encoded)
  613. bytes += len(encoded)
  614. }
  615. }
  616. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  617. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  618. return p.SendReceiptsRLP(req.ReqID, bv, receipts)
  619. case ReceiptsMsg:
  620. if pm.odr == nil {
  621. return errResp(ErrUnexpectedResponse, "")
  622. }
  623. p.Log().Trace("Received receipts response")
  624. // A batch of receipts arrived to one of our previous requests
  625. var resp struct {
  626. ReqID, BV uint64
  627. Receipts []types.Receipts
  628. }
  629. if err := msg.Decode(&resp); err != nil {
  630. return errResp(ErrDecode, "msg %v: %v", msg, err)
  631. }
  632. p.fcServer.GotReply(resp.ReqID, resp.BV)
  633. deliverMsg = &Msg{
  634. MsgType: MsgReceipts,
  635. ReqID: resp.ReqID,
  636. Obj: resp.Receipts,
  637. }
  638. case GetProofsV1Msg:
  639. p.Log().Trace("Received proofs request")
  640. // Decode the retrieval message
  641. var req struct {
  642. ReqID uint64
  643. Reqs []ProofReq
  644. }
  645. if err := msg.Decode(&req); err != nil {
  646. return errResp(ErrDecode, "msg %v: %v", msg, err)
  647. }
  648. // Gather state data until the fetch or network limits is reached
  649. var (
  650. bytes int
  651. proofs proofsData
  652. )
  653. reqCnt := len(req.Reqs)
  654. if reject(uint64(reqCnt), MaxProofsFetch) {
  655. return errResp(ErrRequestRejected, "")
  656. }
  657. for _, req := range req.Reqs {
  658. // Retrieve the requested state entry, stopping if enough was found
  659. if number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number != nil {
  660. if header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header != nil {
  661. statedb, err := pm.blockchain.State()
  662. if err != nil {
  663. continue
  664. }
  665. var trie state.Trie
  666. if len(req.AccKey) > 0 {
  667. account, err := pm.getAccount(statedb, header.Root, common.BytesToHash(req.AccKey))
  668. if err != nil {
  669. continue
  670. }
  671. trie, _ = statedb.Database().OpenStorageTrie(common.BytesToHash(req.AccKey), account.Root)
  672. } else {
  673. trie, _ = statedb.Database().OpenTrie(header.Root)
  674. }
  675. if trie != nil {
  676. var proof light.NodeList
  677. trie.Prove(req.Key, 0, &proof)
  678. proofs = append(proofs, proof)
  679. if bytes += proof.DataSize(); bytes >= softResponseLimit {
  680. break
  681. }
  682. }
  683. }
  684. }
  685. }
  686. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  687. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  688. return p.SendProofs(req.ReqID, bv, proofs)
  689. case GetProofsV2Msg:
  690. p.Log().Trace("Received les/2 proofs request")
  691. // Decode the retrieval message
  692. var req struct {
  693. ReqID uint64
  694. Reqs []ProofReq
  695. }
  696. if err := msg.Decode(&req); err != nil {
  697. return errResp(ErrDecode, "msg %v: %v", msg, err)
  698. }
  699. // Gather state data until the fetch or network limits is reached
  700. var (
  701. lastBHash common.Hash
  702. statedb *state.StateDB
  703. root common.Hash
  704. )
  705. reqCnt := len(req.Reqs)
  706. if reject(uint64(reqCnt), MaxProofsFetch) {
  707. return errResp(ErrRequestRejected, "")
  708. }
  709. nodes := light.NewNodeSet()
  710. for _, req := range req.Reqs {
  711. // Look up the state belonging to the request
  712. if statedb == nil || req.BHash != lastBHash {
  713. statedb, root, lastBHash = nil, common.Hash{}, req.BHash
  714. if number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number != nil {
  715. if header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header != nil {
  716. statedb, _ = pm.blockchain.State()
  717. root = header.Root
  718. }
  719. }
  720. }
  721. if statedb == nil {
  722. continue
  723. }
  724. // Pull the account or storage trie of the request
  725. var trie state.Trie
  726. if len(req.AccKey) > 0 {
  727. account, err := pm.getAccount(statedb, root, common.BytesToHash(req.AccKey))
  728. if err != nil {
  729. continue
  730. }
  731. trie, _ = statedb.Database().OpenStorageTrie(common.BytesToHash(req.AccKey), account.Root)
  732. } else {
  733. trie, _ = statedb.Database().OpenTrie(root)
  734. }
  735. if trie == nil {
  736. continue
  737. }
  738. // Prove the user's request from the account or stroage trie
  739. trie.Prove(req.Key, req.FromLevel, nodes)
  740. if nodes.DataSize() >= softResponseLimit {
  741. break
  742. }
  743. }
  744. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  745. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  746. return p.SendProofsV2(req.ReqID, bv, nodes.NodeList())
  747. case ProofsV1Msg:
  748. if pm.odr == nil {
  749. return errResp(ErrUnexpectedResponse, "")
  750. }
  751. p.Log().Trace("Received proofs response")
  752. // A batch of merkle proofs arrived to one of our previous requests
  753. var resp struct {
  754. ReqID, BV uint64
  755. Data []light.NodeList
  756. }
  757. if err := msg.Decode(&resp); err != nil {
  758. return errResp(ErrDecode, "msg %v: %v", msg, err)
  759. }
  760. p.fcServer.GotReply(resp.ReqID, resp.BV)
  761. deliverMsg = &Msg{
  762. MsgType: MsgProofsV1,
  763. ReqID: resp.ReqID,
  764. Obj: resp.Data,
  765. }
  766. case ProofsV2Msg:
  767. if pm.odr == nil {
  768. return errResp(ErrUnexpectedResponse, "")
  769. }
  770. p.Log().Trace("Received les/2 proofs response")
  771. // A batch of merkle proofs arrived to one of our previous requests
  772. var resp struct {
  773. ReqID, BV uint64
  774. Data light.NodeList
  775. }
  776. if err := msg.Decode(&resp); err != nil {
  777. return errResp(ErrDecode, "msg %v: %v", msg, err)
  778. }
  779. p.fcServer.GotReply(resp.ReqID, resp.BV)
  780. deliverMsg = &Msg{
  781. MsgType: MsgProofsV2,
  782. ReqID: resp.ReqID,
  783. Obj: resp.Data,
  784. }
  785. case GetHeaderProofsMsg:
  786. p.Log().Trace("Received headers proof request")
  787. // Decode the retrieval message
  788. var req struct {
  789. ReqID uint64
  790. Reqs []ChtReq
  791. }
  792. if err := msg.Decode(&req); err != nil {
  793. return errResp(ErrDecode, "msg %v: %v", msg, err)
  794. }
  795. // Gather state data until the fetch or network limits is reached
  796. var (
  797. bytes int
  798. proofs []ChtResp
  799. )
  800. reqCnt := len(req.Reqs)
  801. if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) {
  802. return errResp(ErrRequestRejected, "")
  803. }
  804. trieDb := trie.NewDatabase(ethdb.NewTable(pm.chainDb, light.ChtTablePrefix))
  805. for _, req := range req.Reqs {
  806. if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
  807. sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, req.ChtNum*light.CHTFrequencyServer-1)
  808. if root := light.GetChtRoot(pm.chainDb, req.ChtNum-1, sectionHead); root != (common.Hash{}) {
  809. trie, err := trie.New(root, trieDb)
  810. if err != nil {
  811. continue
  812. }
  813. var encNumber [8]byte
  814. binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
  815. var proof light.NodeList
  816. trie.Prove(encNumber[:], 0, &proof)
  817. proofs = append(proofs, ChtResp{Header: header, Proof: proof})
  818. if bytes += proof.DataSize() + estHeaderRlpSize; bytes >= softResponseLimit {
  819. break
  820. }
  821. }
  822. }
  823. }
  824. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  825. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  826. return p.SendHeaderProofs(req.ReqID, bv, proofs)
  827. case GetHelperTrieProofsMsg:
  828. p.Log().Trace("Received helper trie proof request")
  829. // Decode the retrieval message
  830. var req struct {
  831. ReqID uint64
  832. Reqs []HelperTrieReq
  833. }
  834. if err := msg.Decode(&req); err != nil {
  835. return errResp(ErrDecode, "msg %v: %v", msg, err)
  836. }
  837. // Gather state data until the fetch or network limits is reached
  838. var (
  839. auxBytes int
  840. auxData [][]byte
  841. )
  842. reqCnt := len(req.Reqs)
  843. if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) {
  844. return errResp(ErrRequestRejected, "")
  845. }
  846. var (
  847. lastIdx uint64
  848. lastType uint
  849. root common.Hash
  850. auxTrie *trie.Trie
  851. )
  852. nodes := light.NewNodeSet()
  853. for _, req := range req.Reqs {
  854. if auxTrie == nil || req.Type != lastType || req.TrieIdx != lastIdx {
  855. auxTrie, lastType, lastIdx = nil, req.Type, req.TrieIdx
  856. var prefix string
  857. if root, prefix = pm.getHelperTrie(req.Type, req.TrieIdx); root != (common.Hash{}) {
  858. auxTrie, _ = trie.New(root, trie.NewDatabase(ethdb.NewTable(pm.chainDb, prefix)))
  859. }
  860. }
  861. if req.AuxReq == auxRoot {
  862. var data []byte
  863. if root != (common.Hash{}) {
  864. data = root[:]
  865. }
  866. auxData = append(auxData, data)
  867. auxBytes += len(data)
  868. } else {
  869. if auxTrie != nil {
  870. auxTrie.Prove(req.Key, req.FromLevel, nodes)
  871. }
  872. if req.AuxReq != 0 {
  873. data := pm.getHelperTrieAuxData(req)
  874. auxData = append(auxData, data)
  875. auxBytes += len(data)
  876. }
  877. }
  878. if nodes.DataSize()+auxBytes >= softResponseLimit {
  879. break
  880. }
  881. }
  882. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  883. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  884. return p.SendHelperTrieProofs(req.ReqID, bv, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData})
  885. case HeaderProofsMsg:
  886. if pm.odr == nil {
  887. return errResp(ErrUnexpectedResponse, "")
  888. }
  889. p.Log().Trace("Received headers proof response")
  890. var resp struct {
  891. ReqID, BV uint64
  892. Data []ChtResp
  893. }
  894. if err := msg.Decode(&resp); err != nil {
  895. return errResp(ErrDecode, "msg %v: %v", msg, err)
  896. }
  897. p.fcServer.GotReply(resp.ReqID, resp.BV)
  898. deliverMsg = &Msg{
  899. MsgType: MsgHeaderProofs,
  900. ReqID: resp.ReqID,
  901. Obj: resp.Data,
  902. }
  903. case HelperTrieProofsMsg:
  904. if pm.odr == nil {
  905. return errResp(ErrUnexpectedResponse, "")
  906. }
  907. p.Log().Trace("Received helper trie proof response")
  908. var resp struct {
  909. ReqID, BV uint64
  910. Data HelperTrieResps
  911. }
  912. if err := msg.Decode(&resp); err != nil {
  913. return errResp(ErrDecode, "msg %v: %v", msg, err)
  914. }
  915. p.fcServer.GotReply(resp.ReqID, resp.BV)
  916. deliverMsg = &Msg{
  917. MsgType: MsgHelperTrieProofs,
  918. ReqID: resp.ReqID,
  919. Obj: resp.Data,
  920. }
  921. case SendTxMsg:
  922. if pm.txpool == nil {
  923. return errResp(ErrRequestRejected, "")
  924. }
  925. // Transactions arrived, parse all of them and deliver to the pool
  926. var txs []*types.Transaction
  927. if err := msg.Decode(&txs); err != nil {
  928. return errResp(ErrDecode, "msg %v: %v", msg, err)
  929. }
  930. reqCnt := len(txs)
  931. if reject(uint64(reqCnt), MaxTxSend) {
  932. return errResp(ErrRequestRejected, "")
  933. }
  934. pm.txpool.AddRemotes(txs)
  935. _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  936. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  937. case SendTxV2Msg:
  938. if pm.txpool == nil {
  939. return errResp(ErrRequestRejected, "")
  940. }
  941. // Transactions arrived, parse all of them and deliver to the pool
  942. var req struct {
  943. ReqID uint64
  944. Txs []*types.Transaction
  945. }
  946. if err := msg.Decode(&req); err != nil {
  947. return errResp(ErrDecode, "msg %v: %v", msg, err)
  948. }
  949. reqCnt := len(req.Txs)
  950. if reject(uint64(reqCnt), MaxTxSend) {
  951. return errResp(ErrRequestRejected, "")
  952. }
  953. hashes := make([]common.Hash, len(req.Txs))
  954. for i, tx := range req.Txs {
  955. hashes[i] = tx.Hash()
  956. }
  957. stats := pm.txStatus(hashes)
  958. for i, stat := range stats {
  959. if stat.Status == core.TxStatusUnknown {
  960. if errs := pm.txpool.AddRemotes([]*types.Transaction{req.Txs[i]}); errs[0] != nil {
  961. stats[i].Error = errs[0].Error()
  962. continue
  963. }
  964. stats[i] = pm.txStatus([]common.Hash{hashes[i]})[0]
  965. }
  966. }
  967. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  968. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  969. return p.SendTxStatus(req.ReqID, bv, stats)
  970. case GetTxStatusMsg:
  971. if pm.txpool == nil {
  972. return errResp(ErrUnexpectedResponse, "")
  973. }
  974. // Transactions arrived, parse all of them and deliver to the pool
  975. var req struct {
  976. ReqID uint64
  977. Hashes []common.Hash
  978. }
  979. if err := msg.Decode(&req); err != nil {
  980. return errResp(ErrDecode, "msg %v: %v", msg, err)
  981. }
  982. reqCnt := len(req.Hashes)
  983. if reject(uint64(reqCnt), MaxTxStatus) {
  984. return errResp(ErrRequestRejected, "")
  985. }
  986. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  987. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  988. return p.SendTxStatus(req.ReqID, bv, pm.txStatus(req.Hashes))
  989. case TxStatusMsg:
  990. if pm.odr == nil {
  991. return errResp(ErrUnexpectedResponse, "")
  992. }
  993. p.Log().Trace("Received tx status response")
  994. var resp struct {
  995. ReqID, BV uint64
  996. Status []txStatus
  997. }
  998. if err := msg.Decode(&resp); err != nil {
  999. return errResp(ErrDecode, "msg %v: %v", msg, err)
  1000. }
  1001. p.fcServer.GotReply(resp.ReqID, resp.BV)
  1002. default:
  1003. p.Log().Trace("Received unknown message", "code", msg.Code)
  1004. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  1005. }
  1006. if deliverMsg != nil {
  1007. err := pm.retriever.deliver(p, deliverMsg)
  1008. if err != nil {
  1009. p.responseErrors++
  1010. if p.responseErrors > maxResponseErrors {
  1011. return err
  1012. }
  1013. }
  1014. }
  1015. return nil
  1016. }
  1017. // getAccount retrieves an account from the state based at root.
  1018. func (pm *ProtocolManager) getAccount(statedb *state.StateDB, root, hash common.Hash) (state.Account, error) {
  1019. trie, err := trie.New(root, statedb.Database().TrieDB())
  1020. if err != nil {
  1021. return state.Account{}, err
  1022. }
  1023. blob, err := trie.TryGet(hash[:])
  1024. if err != nil {
  1025. return state.Account{}, err
  1026. }
  1027. var account state.Account
  1028. if err = rlp.DecodeBytes(blob, &account); err != nil {
  1029. return state.Account{}, err
  1030. }
  1031. return account, nil
  1032. }
  1033. // getHelperTrie returns the post-processed trie root for the given trie ID and section index
  1034. func (pm *ProtocolManager) getHelperTrie(id uint, idx uint64) (common.Hash, string) {
  1035. switch id {
  1036. case htCanonical:
  1037. sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idx+1)*light.CHTFrequencyClient-1)
  1038. return light.GetChtV2Root(pm.chainDb, idx, sectionHead), light.ChtTablePrefix
  1039. case htBloomBits:
  1040. sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idx+1)*light.BloomTrieFrequency-1)
  1041. return light.GetBloomTrieRoot(pm.chainDb, idx, sectionHead), light.BloomTrieTablePrefix
  1042. }
  1043. return common.Hash{}, ""
  1044. }
  1045. // getHelperTrieAuxData returns requested auxiliary data for the given HelperTrie request
  1046. func (pm *ProtocolManager) getHelperTrieAuxData(req HelperTrieReq) []byte {
  1047. if req.Type == htCanonical && req.AuxReq == auxHeader && len(req.Key) == 8 {
  1048. blockNum := binary.BigEndian.Uint64(req.Key)
  1049. hash := rawdb.ReadCanonicalHash(pm.chainDb, blockNum)
  1050. return rawdb.ReadHeaderRLP(pm.chainDb, hash, blockNum)
  1051. }
  1052. return nil
  1053. }
  1054. func (pm *ProtocolManager) txStatus(hashes []common.Hash) []txStatus {
  1055. stats := make([]txStatus, len(hashes))
  1056. for i, stat := range pm.txpool.Status(hashes) {
  1057. // Save the status we've got from the transaction pool
  1058. stats[i].Status = stat
  1059. // If the transaction is unknown to the pool, try looking it up locally
  1060. if stat == core.TxStatusUnknown {
  1061. if block, number, index := rawdb.ReadTxLookupEntry(pm.chainDb, hashes[i]); block != (common.Hash{}) {
  1062. stats[i].Status = core.TxStatusIncluded
  1063. stats[i].Lookup = &rawdb.TxLookupEntry{BlockHash: block, BlockIndex: number, Index: index}
  1064. }
  1065. }
  1066. }
  1067. return stats
  1068. }
  1069. // downloaderPeerNotify implements peerSetNotify
  1070. type downloaderPeerNotify ProtocolManager
  1071. type peerConnection struct {
  1072. manager *ProtocolManager
  1073. peer *peer
  1074. }
  1075. func (pc *peerConnection) Head() (common.Hash, *big.Int) {
  1076. return pc.peer.HeadAndTd()
  1077. }
  1078. func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
  1079. reqID := genReqID()
  1080. rq := &distReq{
  1081. getCost: func(dp distPeer) uint64 {
  1082. peer := dp.(*peer)
  1083. return peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1084. },
  1085. canSend: func(dp distPeer) bool {
  1086. return dp.(*peer) == pc.peer
  1087. },
  1088. request: func(dp distPeer) func() {
  1089. peer := dp.(*peer)
  1090. cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1091. peer.fcServer.QueueRequest(reqID, cost)
  1092. return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
  1093. },
  1094. }
  1095. _, ok := <-pc.manager.reqDist.queue(rq)
  1096. if !ok {
  1097. return light.ErrNoPeers
  1098. }
  1099. return nil
  1100. }
  1101. func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
  1102. reqID := genReqID()
  1103. rq := &distReq{
  1104. getCost: func(dp distPeer) uint64 {
  1105. peer := dp.(*peer)
  1106. return peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1107. },
  1108. canSend: func(dp distPeer) bool {
  1109. return dp.(*peer) == pc.peer
  1110. },
  1111. request: func(dp distPeer) func() {
  1112. peer := dp.(*peer)
  1113. cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1114. peer.fcServer.QueueRequest(reqID, cost)
  1115. return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
  1116. },
  1117. }
  1118. _, ok := <-pc.manager.reqDist.queue(rq)
  1119. if !ok {
  1120. return light.ErrNoPeers
  1121. }
  1122. return nil
  1123. }
  1124. func (d *downloaderPeerNotify) registerPeer(p *peer) {
  1125. pm := (*ProtocolManager)(d)
  1126. pc := &peerConnection{
  1127. manager: pm,
  1128. peer: p,
  1129. }
  1130. pm.downloader.RegisterLightPeer(p.id, ethVersion, pc)
  1131. }
  1132. func (d *downloaderPeerNotify) unregisterPeer(p *peer) {
  1133. pm := (*ProtocolManager)(d)
  1134. pm.downloader.UnregisterPeer(p.id)
  1135. }