handler.go 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "encoding/binary"
  20. "encoding/json"
  21. "fmt"
  22. "math/big"
  23. "net"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/common/mclock"
  28. "github.com/ethereum/go-ethereum/consensus"
  29. "github.com/ethereum/go-ethereum/core"
  30. "github.com/ethereum/go-ethereum/core/rawdb"
  31. "github.com/ethereum/go-ethereum/core/state"
  32. "github.com/ethereum/go-ethereum/core/types"
  33. "github.com/ethereum/go-ethereum/eth/downloader"
  34. "github.com/ethereum/go-ethereum/ethdb"
  35. "github.com/ethereum/go-ethereum/event"
  36. "github.com/ethereum/go-ethereum/light"
  37. "github.com/ethereum/go-ethereum/log"
  38. "github.com/ethereum/go-ethereum/p2p"
  39. "github.com/ethereum/go-ethereum/p2p/discv5"
  40. "github.com/ethereum/go-ethereum/params"
  41. "github.com/ethereum/go-ethereum/rlp"
  42. "github.com/ethereum/go-ethereum/trie"
  43. )
  // Size limits and per-request item caps used by the LES protocol handler.
  const (
  	// Response sizing.
  	softResponseLimit = 2 << 20 // Target maximum size (2 MiB) of returned blocks, headers or node data.
  	estHeaderRlpSize  = 500     // Approximate size of an RLP encoded block header.

  	ethVersion = 63 // Equivalent eth version for the downloader.

  	// Per-request item caps.
  	MaxHeaderFetch           = 192 // Amount of block headers to be fetched per retrieval request.
  	MaxBodyFetch             = 32  // Amount of block bodies to be fetched per retrieval request.
  	MaxReceiptFetch          = 128 // Amount of transaction receipts to allow fetching per request.
  	MaxCodeFetch             = 64  // Amount of contract codes to allow fetching per request.
  	MaxProofsFetch           = 64  // Amount of merkle proofs to be fetched per retrieval request.
  	MaxHelperTrieProofsFetch = 64  // Amount of merkle proofs to be fetched per retrieval request.
  	MaxTxSend                = 64  // Amount of transactions to be sent per request.
  	MaxTxStatus              = 256 // Amount of transactions to be queried per request.

  	// disableClientRemovePeer, when true, makes NewProtocolManager hand the
  	// downloader a no-op callback instead of ProtocolManager.removePeer.
  	disableClientRemovePeer = false
  )
  58. func errResp(code errCode, format string, v ...interface{}) error {
  59. return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
  60. }
  61. type BlockChain interface {
  62. Config() *params.ChainConfig
  63. HasHeader(hash common.Hash, number uint64) bool
  64. GetHeader(hash common.Hash, number uint64) *types.Header
  65. GetHeaderByHash(hash common.Hash) *types.Header
  66. CurrentHeader() *types.Header
  67. GetTd(hash common.Hash, number uint64) *big.Int
  68. State() (*state.StateDB, error)
  69. InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error)
  70. Rollback(chain []common.Hash)
  71. GetHeaderByNumber(number uint64) *types.Header
  72. GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64)
  73. Genesis() *types.Block
  74. SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
  75. }
  76. type txPool interface {
  77. AddRemotes(txs []*types.Transaction) []error
  78. Status(hashes []common.Hash) []core.TxStatus
  79. }
  80. type ProtocolManager struct {
  81. lightSync bool
  82. txpool txPool
  83. txrelay *LesTxRelay
  84. networkId uint64
  85. chainConfig *params.ChainConfig
  86. iConfig *light.IndexerConfig
  87. blockchain BlockChain
  88. chainDb ethdb.Database
  89. odr *LesOdr
  90. server *LesServer
  91. serverPool *serverPool
  92. clientPool *freeClientPool
  93. lesTopic discv5.Topic
  94. reqDist *requestDistributor
  95. retriever *retrieveManager
  96. downloader *downloader.Downloader
  97. fetcher *lightFetcher
  98. peers *peerSet
  99. maxPeers int
  100. eventMux *event.TypeMux
  101. // channels for fetcher, syncer, txsyncLoop
  102. newPeerCh chan *peer
  103. quitSync chan struct{}
  104. noMorePeers chan struct{}
  105. // wait group is used for graceful shutdowns during downloading
  106. // and processing
  107. wg *sync.WaitGroup
  108. }
  109. // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
  110. // with the ethereum network.
  111. func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
  112. // Create the protocol manager with the base fields
  113. manager := &ProtocolManager{
  114. lightSync: lightSync,
  115. eventMux: mux,
  116. blockchain: blockchain,
  117. chainConfig: chainConfig,
  118. iConfig: indexerConfig,
  119. chainDb: chainDb,
  120. odr: odr,
  121. networkId: networkId,
  122. txpool: txpool,
  123. txrelay: txrelay,
  124. serverPool: serverPool,
  125. peers: peers,
  126. newPeerCh: make(chan *peer),
  127. quitSync: quitSync,
  128. wg: wg,
  129. noMorePeers: make(chan struct{}),
  130. }
  131. if odr != nil {
  132. manager.retriever = odr.retriever
  133. manager.reqDist = odr.retriever.dist
  134. }
  135. removePeer := manager.removePeer
  136. if disableClientRemovePeer {
  137. removePeer = func(id string) {}
  138. }
  139. if lightSync {
  140. manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer)
  141. manager.peers.notify((*downloaderPeerNotify)(manager))
  142. manager.fetcher = newLightFetcher(manager)
  143. }
  144. return manager, nil
  145. }
  146. // removePeer initiates disconnection from a peer by removing it from the peer set
  147. func (pm *ProtocolManager) removePeer(id string) {
  148. pm.peers.Unregister(id)
  149. }
  150. func (pm *ProtocolManager) Start(maxPeers int) {
  151. pm.maxPeers = maxPeers
  152. if pm.lightSync {
  153. go pm.syncer()
  154. } else {
  155. pm.clientPool = newFreeClientPool(pm.chainDb, maxPeers, 10000, mclock.System{})
  156. go func() {
  157. for range pm.newPeerCh {
  158. }
  159. }()
  160. }
  161. }
  162. func (pm *ProtocolManager) Stop() {
  163. // Showing a log message. During download / process this could actually
  164. // take between 5 to 10 seconds and therefor feedback is required.
  165. log.Info("Stopping light Ethereum protocol")
  166. // Quit the sync loop.
  167. // After this send has completed, no new peers will be accepted.
  168. pm.noMorePeers <- struct{}{}
  169. close(pm.quitSync) // quits syncer, fetcher
  170. if pm.clientPool != nil {
  171. pm.clientPool.stop()
  172. }
  173. // Disconnect existing sessions.
  174. // This also closes the gate for any new registrations on the peer set.
  175. // sessions which are already established but not added to pm.peers yet
  176. // will exit when they try to register.
  177. pm.peers.Close()
  178. // Wait for any process action
  179. pm.wg.Wait()
  180. log.Info("Light Ethereum protocol stopped")
  181. }
  182. // runPeer is the p2p protocol run function for the given version.
  183. func (pm *ProtocolManager) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error {
  184. var entry *poolEntry
  185. peer := pm.newPeer(int(version), pm.networkId, p, rw)
  186. if pm.serverPool != nil {
  187. entry = pm.serverPool.connect(peer, peer.Node())
  188. }
  189. peer.poolEntry = entry
  190. select {
  191. case pm.newPeerCh <- peer:
  192. pm.wg.Add(1)
  193. defer pm.wg.Done()
  194. err := pm.handle(peer)
  195. if entry != nil {
  196. pm.serverPool.disconnect(entry)
  197. }
  198. return err
  199. case <-pm.quitSync:
  200. if entry != nil {
  201. pm.serverPool.disconnect(entry)
  202. }
  203. return p2p.DiscQuitting
  204. }
  205. }
  206. func (pm *ProtocolManager) newPeer(pv int, nv uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  207. return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
  208. }
  209. // handle is the callback invoked to manage the life cycle of a les peer. When
  210. // this function terminates, the peer is disconnected.
  211. func (pm *ProtocolManager) handle(p *peer) error {
  212. // Ignore maxPeers if this is a trusted peer
  213. // In server mode we try to check into the client pool after handshake
  214. if pm.lightSync && pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
  215. return p2p.DiscTooManyPeers
  216. }
  217. p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
  218. // Execute the LES handshake
  219. var (
  220. genesis = pm.blockchain.Genesis()
  221. head = pm.blockchain.CurrentHeader()
  222. hash = head.Hash()
  223. number = head.Number.Uint64()
  224. td = pm.blockchain.GetTd(hash, number)
  225. )
  226. if err := p.Handshake(td, hash, number, genesis.Hash(), pm.server); err != nil {
  227. p.Log().Debug("Light Ethereum handshake failed", "err", err)
  228. return err
  229. }
  230. if !pm.lightSync && !p.Peer.Info().Network.Trusted {
  231. addr, ok := p.RemoteAddr().(*net.TCPAddr)
  232. // test peer address is not a tcp address, don't use client pool if can not typecast
  233. if ok {
  234. id := addr.IP.String()
  235. if !pm.clientPool.connect(id, func() { go pm.removePeer(p.id) }) {
  236. return p2p.DiscTooManyPeers
  237. }
  238. defer pm.clientPool.disconnect(id)
  239. }
  240. }
  241. if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
  242. rw.Init(p.version)
  243. }
  244. // Register the peer locally
  245. if err := pm.peers.Register(p); err != nil {
  246. p.Log().Error("Light Ethereum peer registration failed", "err", err)
  247. return err
  248. }
  249. defer func() {
  250. if pm.server != nil && pm.server.fcManager != nil && p.fcClient != nil {
  251. p.fcClient.Remove(pm.server.fcManager)
  252. }
  253. pm.removePeer(p.id)
  254. }()
  255. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  256. if pm.lightSync {
  257. p.lock.Lock()
  258. head := p.headInfo
  259. p.lock.Unlock()
  260. if pm.fetcher != nil {
  261. pm.fetcher.announce(p, head)
  262. }
  263. if p.poolEntry != nil {
  264. pm.serverPool.registered(p.poolEntry)
  265. }
  266. }
  267. stop := make(chan struct{})
  268. defer close(stop)
  269. go func() {
  270. // new block announce loop
  271. for {
  272. select {
  273. case announce := <-p.announceChn:
  274. p.SendAnnounce(announce)
  275. case <-stop:
  276. return
  277. }
  278. }
  279. }()
  280. // main loop. handle incoming messages.
  281. for {
  282. if err := pm.handleMsg(p); err != nil {
  283. p.Log().Debug("Light Ethereum message handling failed", "err", err)
  284. return err
  285. }
  286. }
  287. }
  288. var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, SendTxV2Msg, GetTxStatusMsg, GetHeaderProofsMsg, GetProofsV2Msg, GetHelperTrieProofsMsg}
  289. // handleMsg is invoked whenever an inbound message is received from a remote
  290. // peer. The remote connection is torn down upon returning any error.
  291. func (pm *ProtocolManager) handleMsg(p *peer) error {
  292. // Read the next message from the remote peer, and ensure it's fully consumed
  293. msg, err := p.rw.ReadMsg()
  294. if err != nil {
  295. return err
  296. }
  297. p.Log().Trace("Light Ethereum message arrived", "code", msg.Code, "bytes", msg.Size)
  298. costs := p.fcCosts[msg.Code]
  299. reject := func(reqCnt, maxCnt uint64) bool {
  300. if p.fcClient == nil || reqCnt > maxCnt {
  301. return true
  302. }
  303. bufValue, _ := p.fcClient.AcceptRequest()
  304. cost := costs.baseCost + reqCnt*costs.reqCost
  305. if cost > pm.server.defParams.BufLimit {
  306. cost = pm.server.defParams.BufLimit
  307. }
  308. if cost > bufValue {
  309. recharge := time.Duration((cost - bufValue) * 1000000 / pm.server.defParams.MinRecharge)
  310. p.Log().Error("Request came too early", "recharge", common.PrettyDuration(recharge))
  311. return true
  312. }
  313. return false
  314. }
  315. if msg.Size > ProtocolMaxMsgSize {
  316. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  317. }
  318. defer msg.Discard()
  319. var deliverMsg *Msg
  320. // Handle the message depending on its contents
  321. switch msg.Code {
  322. case StatusMsg:
  323. p.Log().Trace("Received status message")
  324. // Status messages should never arrive after the handshake
  325. return errResp(ErrExtraStatusMsg, "uncontrolled status message")
  326. // Block header query, collect the requested headers and reply
  327. case AnnounceMsg:
  328. p.Log().Trace("Received announce message")
  329. if p.requestAnnounceType == announceTypeNone {
  330. return errResp(ErrUnexpectedResponse, "")
  331. }
  332. var req announceData
  333. if err := msg.Decode(&req); err != nil {
  334. return errResp(ErrDecode, "%v: %v", msg, err)
  335. }
  336. if p.requestAnnounceType == announceTypeSigned {
  337. if err := req.checkSignature(p.ID()); err != nil {
  338. p.Log().Trace("Invalid announcement signature", "err", err)
  339. return err
  340. }
  341. p.Log().Trace("Valid announcement signature")
  342. }
  343. p.Log().Trace("Announce message content", "number", req.Number, "hash", req.Hash, "td", req.Td, "reorg", req.ReorgDepth)
  344. if pm.fetcher != nil {
  345. pm.fetcher.announce(p, &req)
  346. }
  347. case GetBlockHeadersMsg:
  348. p.Log().Trace("Received block header request")
  349. // Decode the complex header query
  350. var req struct {
  351. ReqID uint64
  352. Query getBlockHeadersData
  353. }
  354. if err := msg.Decode(&req); err != nil {
  355. return errResp(ErrDecode, "%v: %v", msg, err)
  356. }
  357. query := req.Query
  358. if reject(query.Amount, MaxHeaderFetch) {
  359. return errResp(ErrRequestRejected, "")
  360. }
  361. hashMode := query.Origin.Hash != (common.Hash{})
  362. first := true
  363. maxNonCanonical := uint64(100)
  364. // Gather headers until the fetch or network limits is reached
  365. var (
  366. bytes common.StorageSize
  367. headers []*types.Header
  368. unknown bool
  369. )
  370. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  371. // Retrieve the next header satisfying the query
  372. var origin *types.Header
  373. if hashMode {
  374. if first {
  375. first = false
  376. origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
  377. if origin != nil {
  378. query.Origin.Number = origin.Number.Uint64()
  379. }
  380. } else {
  381. origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
  382. }
  383. } else {
  384. origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
  385. }
  386. if origin == nil {
  387. break
  388. }
  389. headers = append(headers, origin)
  390. bytes += estHeaderRlpSize
  391. // Advance to the next header of the query
  392. switch {
  393. case hashMode && query.Reverse:
  394. // Hash based traversal towards the genesis block
  395. ancestor := query.Skip + 1
  396. if ancestor == 0 {
  397. unknown = true
  398. } else {
  399. query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
  400. unknown = (query.Origin.Hash == common.Hash{})
  401. }
  402. case hashMode && !query.Reverse:
  403. // Hash based traversal towards the leaf block
  404. var (
  405. current = origin.Number.Uint64()
  406. next = current + query.Skip + 1
  407. )
  408. if next <= current {
  409. infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
  410. p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
  411. unknown = true
  412. } else {
  413. if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
  414. nextHash := header.Hash()
  415. expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
  416. if expOldHash == query.Origin.Hash {
  417. query.Origin.Hash, query.Origin.Number = nextHash, next
  418. } else {
  419. unknown = true
  420. }
  421. } else {
  422. unknown = true
  423. }
  424. }
  425. case query.Reverse:
  426. // Number based traversal towards the genesis block
  427. if query.Origin.Number >= query.Skip+1 {
  428. query.Origin.Number -= query.Skip + 1
  429. } else {
  430. unknown = true
  431. }
  432. case !query.Reverse:
  433. // Number based traversal towards the leaf block
  434. query.Origin.Number += query.Skip + 1
  435. }
  436. }
  437. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
  438. pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
  439. return p.SendBlockHeaders(req.ReqID, bv, headers)
  440. case BlockHeadersMsg:
  441. if pm.downloader == nil {
  442. return errResp(ErrUnexpectedResponse, "")
  443. }
  444. p.Log().Trace("Received block header response message")
  445. // A batch of headers arrived to one of our previous requests
  446. var resp struct {
  447. ReqID, BV uint64
  448. Headers []*types.Header
  449. }
  450. if err := msg.Decode(&resp); err != nil {
  451. return errResp(ErrDecode, "msg %v: %v", msg, err)
  452. }
  453. p.fcServer.GotReply(resp.ReqID, resp.BV)
  454. if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
  455. pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
  456. } else {
  457. err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
  458. if err != nil {
  459. log.Debug(fmt.Sprint(err))
  460. }
  461. }
  462. case GetBlockBodiesMsg:
  463. p.Log().Trace("Received block bodies request")
  464. // Decode the retrieval message
  465. var req struct {
  466. ReqID uint64
  467. Hashes []common.Hash
  468. }
  469. if err := msg.Decode(&req); err != nil {
  470. return errResp(ErrDecode, "msg %v: %v", msg, err)
  471. }
  472. // Gather blocks until the fetch or network limits is reached
  473. var (
  474. bytes int
  475. bodies []rlp.RawValue
  476. )
  477. reqCnt := len(req.Hashes)
  478. if reject(uint64(reqCnt), MaxBodyFetch) {
  479. return errResp(ErrRequestRejected, "")
  480. }
  481. for _, hash := range req.Hashes {
  482. if bytes >= softResponseLimit {
  483. break
  484. }
  485. // Retrieve the requested block body, stopping if enough was found
  486. if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil {
  487. if data := rawdb.ReadBodyRLP(pm.chainDb, hash, *number); len(data) != 0 {
  488. bodies = append(bodies, data)
  489. bytes += len(data)
  490. }
  491. }
  492. }
  493. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  494. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  495. return p.SendBlockBodiesRLP(req.ReqID, bv, bodies)
  496. case BlockBodiesMsg:
  497. if pm.odr == nil {
  498. return errResp(ErrUnexpectedResponse, "")
  499. }
  500. p.Log().Trace("Received block bodies response")
  501. // A batch of block bodies arrived to one of our previous requests
  502. var resp struct {
  503. ReqID, BV uint64
  504. Data []*types.Body
  505. }
  506. if err := msg.Decode(&resp); err != nil {
  507. return errResp(ErrDecode, "msg %v: %v", msg, err)
  508. }
  509. p.fcServer.GotReply(resp.ReqID, resp.BV)
  510. deliverMsg = &Msg{
  511. MsgType: MsgBlockBodies,
  512. ReqID: resp.ReqID,
  513. Obj: resp.Data,
  514. }
  515. case GetCodeMsg:
  516. p.Log().Trace("Received code request")
  517. // Decode the retrieval message
  518. var req struct {
  519. ReqID uint64
  520. Reqs []CodeReq
  521. }
  522. if err := msg.Decode(&req); err != nil {
  523. return errResp(ErrDecode, "msg %v: %v", msg, err)
  524. }
  525. // Gather state data until the fetch or network limits is reached
  526. var (
  527. bytes int
  528. data [][]byte
  529. )
  530. reqCnt := len(req.Reqs)
  531. if reject(uint64(reqCnt), MaxCodeFetch) {
  532. return errResp(ErrRequestRejected, "")
  533. }
  534. for _, req := range req.Reqs {
  535. // Retrieve the requested state entry, stopping if enough was found
  536. if number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number != nil {
  537. if header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header != nil {
  538. statedb, err := pm.blockchain.State()
  539. if err != nil {
  540. continue
  541. }
  542. account, err := pm.getAccount(statedb, header.Root, common.BytesToHash(req.AccKey))
  543. if err != nil {
  544. continue
  545. }
  546. code, _ := statedb.Database().TrieDB().Node(common.BytesToHash(account.CodeHash))
  547. data = append(data, code)
  548. if bytes += len(code); bytes >= softResponseLimit {
  549. break
  550. }
  551. }
  552. }
  553. }
  554. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  555. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  556. return p.SendCode(req.ReqID, bv, data)
  557. case CodeMsg:
  558. if pm.odr == nil {
  559. return errResp(ErrUnexpectedResponse, "")
  560. }
  561. p.Log().Trace("Received code response")
  562. // A batch of node state data arrived to one of our previous requests
  563. var resp struct {
  564. ReqID, BV uint64
  565. Data [][]byte
  566. }
  567. if err := msg.Decode(&resp); err != nil {
  568. return errResp(ErrDecode, "msg %v: %v", msg, err)
  569. }
  570. p.fcServer.GotReply(resp.ReqID, resp.BV)
  571. deliverMsg = &Msg{
  572. MsgType: MsgCode,
  573. ReqID: resp.ReqID,
  574. Obj: resp.Data,
  575. }
  576. case GetReceiptsMsg:
  577. p.Log().Trace("Received receipts request")
  578. // Decode the retrieval message
  579. var req struct {
  580. ReqID uint64
  581. Hashes []common.Hash
  582. }
  583. if err := msg.Decode(&req); err != nil {
  584. return errResp(ErrDecode, "msg %v: %v", msg, err)
  585. }
  586. // Gather state data until the fetch or network limits is reached
  587. var (
  588. bytes int
  589. receipts []rlp.RawValue
  590. )
  591. reqCnt := len(req.Hashes)
  592. if reject(uint64(reqCnt), MaxReceiptFetch) {
  593. return errResp(ErrRequestRejected, "")
  594. }
  595. for _, hash := range req.Hashes {
  596. if bytes >= softResponseLimit {
  597. break
  598. }
  599. // Retrieve the requested block's receipts, skipping if unknown to us
  600. var results types.Receipts
  601. if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil {
  602. results = rawdb.ReadReceipts(pm.chainDb, hash, *number)
  603. }
  604. if results == nil {
  605. if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  606. continue
  607. }
  608. }
  609. // If known, encode and queue for response packet
  610. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  611. log.Error("Failed to encode receipt", "err", err)
  612. } else {
  613. receipts = append(receipts, encoded)
  614. bytes += len(encoded)
  615. }
  616. }
  617. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  618. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  619. return p.SendReceiptsRLP(req.ReqID, bv, receipts)
  620. case ReceiptsMsg:
  621. if pm.odr == nil {
  622. return errResp(ErrUnexpectedResponse, "")
  623. }
  624. p.Log().Trace("Received receipts response")
  625. // A batch of receipts arrived to one of our previous requests
  626. var resp struct {
  627. ReqID, BV uint64
  628. Receipts []types.Receipts
  629. }
  630. if err := msg.Decode(&resp); err != nil {
  631. return errResp(ErrDecode, "msg %v: %v", msg, err)
  632. }
  633. p.fcServer.GotReply(resp.ReqID, resp.BV)
  634. deliverMsg = &Msg{
  635. MsgType: MsgReceipts,
  636. ReqID: resp.ReqID,
  637. Obj: resp.Receipts,
  638. }
  639. case GetProofsV1Msg:
  640. p.Log().Trace("Received proofs request")
  641. // Decode the retrieval message
  642. var req struct {
  643. ReqID uint64
  644. Reqs []ProofReq
  645. }
  646. if err := msg.Decode(&req); err != nil {
  647. return errResp(ErrDecode, "msg %v: %v", msg, err)
  648. }
  649. // Gather state data until the fetch or network limits is reached
  650. var (
  651. bytes int
  652. proofs proofsData
  653. )
  654. reqCnt := len(req.Reqs)
  655. if reject(uint64(reqCnt), MaxProofsFetch) {
  656. return errResp(ErrRequestRejected, "")
  657. }
  658. for _, req := range req.Reqs {
  659. // Retrieve the requested state entry, stopping if enough was found
  660. if number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number != nil {
  661. if header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header != nil {
  662. statedb, err := pm.blockchain.State()
  663. if err != nil {
  664. continue
  665. }
  666. var trie state.Trie
  667. if len(req.AccKey) > 0 {
  668. account, err := pm.getAccount(statedb, header.Root, common.BytesToHash(req.AccKey))
  669. if err != nil {
  670. continue
  671. }
  672. trie, _ = statedb.Database().OpenStorageTrie(common.BytesToHash(req.AccKey), account.Root)
  673. } else {
  674. trie, _ = statedb.Database().OpenTrie(header.Root)
  675. }
  676. if trie != nil {
  677. var proof light.NodeList
  678. trie.Prove(req.Key, 0, &proof)
  679. proofs = append(proofs, proof)
  680. if bytes += proof.DataSize(); bytes >= softResponseLimit {
  681. break
  682. }
  683. }
  684. }
  685. }
  686. }
  687. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  688. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  689. return p.SendProofs(req.ReqID, bv, proofs)
  690. case GetProofsV2Msg:
  691. p.Log().Trace("Received les/2 proofs request")
  692. // Decode the retrieval message
  693. var req struct {
  694. ReqID uint64
  695. Reqs []ProofReq
  696. }
  697. if err := msg.Decode(&req); err != nil {
  698. return errResp(ErrDecode, "msg %v: %v", msg, err)
  699. }
  700. // Gather state data until the fetch or network limits is reached
  701. var (
  702. lastBHash common.Hash
  703. statedb *state.StateDB
  704. root common.Hash
  705. )
  706. reqCnt := len(req.Reqs)
  707. if reject(uint64(reqCnt), MaxProofsFetch) {
  708. return errResp(ErrRequestRejected, "")
  709. }
  710. nodes := light.NewNodeSet()
  711. for _, req := range req.Reqs {
  712. // Look up the state belonging to the request
  713. if statedb == nil || req.BHash != lastBHash {
  714. statedb, root, lastBHash = nil, common.Hash{}, req.BHash
  715. if number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number != nil {
  716. if header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header != nil {
  717. statedb, _ = pm.blockchain.State()
  718. root = header.Root
  719. }
  720. }
  721. }
  722. if statedb == nil {
  723. continue
  724. }
  725. // Pull the account or storage trie of the request
  726. var trie state.Trie
  727. if len(req.AccKey) > 0 {
  728. account, err := pm.getAccount(statedb, root, common.BytesToHash(req.AccKey))
  729. if err != nil {
  730. continue
  731. }
  732. trie, _ = statedb.Database().OpenStorageTrie(common.BytesToHash(req.AccKey), account.Root)
  733. } else {
  734. trie, _ = statedb.Database().OpenTrie(root)
  735. }
  736. if trie == nil {
  737. continue
  738. }
  739. // Prove the user's request from the account or stroage trie
  740. trie.Prove(req.Key, req.FromLevel, nodes)
  741. if nodes.DataSize() >= softResponseLimit {
  742. break
  743. }
  744. }
  745. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  746. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  747. return p.SendProofsV2(req.ReqID, bv, nodes.NodeList())
  748. case ProofsV1Msg:
  749. if pm.odr == nil {
  750. return errResp(ErrUnexpectedResponse, "")
  751. }
  752. p.Log().Trace("Received proofs response")
  753. // A batch of merkle proofs arrived to one of our previous requests
  754. var resp struct {
  755. ReqID, BV uint64
  756. Data []light.NodeList
  757. }
  758. if err := msg.Decode(&resp); err != nil {
  759. return errResp(ErrDecode, "msg %v: %v", msg, err)
  760. }
  761. p.fcServer.GotReply(resp.ReqID, resp.BV)
  762. deliverMsg = &Msg{
  763. MsgType: MsgProofsV1,
  764. ReqID: resp.ReqID,
  765. Obj: resp.Data,
  766. }
  767. case ProofsV2Msg:
  768. if pm.odr == nil {
  769. return errResp(ErrUnexpectedResponse, "")
  770. }
  771. p.Log().Trace("Received les/2 proofs response")
  772. // A batch of merkle proofs arrived to one of our previous requests
  773. var resp struct {
  774. ReqID, BV uint64
  775. Data light.NodeList
  776. }
  777. if err := msg.Decode(&resp); err != nil {
  778. return errResp(ErrDecode, "msg %v: %v", msg, err)
  779. }
  780. p.fcServer.GotReply(resp.ReqID, resp.BV)
  781. deliverMsg = &Msg{
  782. MsgType: MsgProofsV2,
  783. ReqID: resp.ReqID,
  784. Obj: resp.Data,
  785. }
  786. case GetHeaderProofsMsg:
  787. p.Log().Trace("Received headers proof request")
  788. // Decode the retrieval message
  789. var req struct {
  790. ReqID uint64
  791. Reqs []ChtReq
  792. }
  793. if err := msg.Decode(&req); err != nil {
  794. return errResp(ErrDecode, "msg %v: %v", msg, err)
  795. }
  796. // Gather state data until the fetch or network limits is reached
  797. var (
  798. bytes int
  799. proofs []ChtResp
  800. )
  801. reqCnt := len(req.Reqs)
  802. if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) {
  803. return errResp(ErrRequestRejected, "")
  804. }
  805. trieDb := trie.NewDatabase(ethdb.NewTable(pm.chainDb, light.ChtTablePrefix))
  806. for _, req := range req.Reqs {
  807. if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
  808. sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, req.ChtNum*pm.iConfig.ChtSize-1)
  809. if root := light.GetChtRoot(pm.chainDb, req.ChtNum-1, sectionHead); root != (common.Hash{}) {
  810. trie, err := trie.New(root, trieDb)
  811. if err != nil {
  812. continue
  813. }
  814. var encNumber [8]byte
  815. binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
  816. var proof light.NodeList
  817. trie.Prove(encNumber[:], 0, &proof)
  818. proofs = append(proofs, ChtResp{Header: header, Proof: proof})
  819. if bytes += proof.DataSize() + estHeaderRlpSize; bytes >= softResponseLimit {
  820. break
  821. }
  822. }
  823. }
  824. }
  825. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  826. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  827. return p.SendHeaderProofs(req.ReqID, bv, proofs)
  828. case GetHelperTrieProofsMsg:
  829. p.Log().Trace("Received helper trie proof request")
  830. // Decode the retrieval message
  831. var req struct {
  832. ReqID uint64
  833. Reqs []HelperTrieReq
  834. }
  835. if err := msg.Decode(&req); err != nil {
  836. return errResp(ErrDecode, "msg %v: %v", msg, err)
  837. }
  838. // Gather state data until the fetch or network limits is reached
  839. var (
  840. auxBytes int
  841. auxData [][]byte
  842. )
  843. reqCnt := len(req.Reqs)
  844. if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) {
  845. return errResp(ErrRequestRejected, "")
  846. }
  847. var (
  848. lastIdx uint64
  849. lastType uint
  850. root common.Hash
  851. auxTrie *trie.Trie
  852. )
  853. nodes := light.NewNodeSet()
  854. for _, req := range req.Reqs {
  855. if auxTrie == nil || req.Type != lastType || req.TrieIdx != lastIdx {
  856. auxTrie, lastType, lastIdx = nil, req.Type, req.TrieIdx
  857. var prefix string
  858. if root, prefix = pm.getHelperTrie(req.Type, req.TrieIdx); root != (common.Hash{}) {
  859. auxTrie, _ = trie.New(root, trie.NewDatabase(ethdb.NewTable(pm.chainDb, prefix)))
  860. }
  861. }
  862. if req.AuxReq == auxRoot {
  863. var data []byte
  864. if root != (common.Hash{}) {
  865. data = root[:]
  866. }
  867. auxData = append(auxData, data)
  868. auxBytes += len(data)
  869. } else {
  870. if auxTrie != nil {
  871. auxTrie.Prove(req.Key, req.FromLevel, nodes)
  872. }
  873. if req.AuxReq != 0 {
  874. data := pm.getHelperTrieAuxData(req)
  875. auxData = append(auxData, data)
  876. auxBytes += len(data)
  877. }
  878. }
  879. if nodes.DataSize()+auxBytes >= softResponseLimit {
  880. break
  881. }
  882. }
  883. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  884. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  885. return p.SendHelperTrieProofs(req.ReqID, bv, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData})
  886. case HeaderProofsMsg:
  887. if pm.odr == nil {
  888. return errResp(ErrUnexpectedResponse, "")
  889. }
  890. p.Log().Trace("Received headers proof response")
  891. var resp struct {
  892. ReqID, BV uint64
  893. Data []ChtResp
  894. }
  895. if err := msg.Decode(&resp); err != nil {
  896. return errResp(ErrDecode, "msg %v: %v", msg, err)
  897. }
  898. p.fcServer.GotReply(resp.ReqID, resp.BV)
  899. deliverMsg = &Msg{
  900. MsgType: MsgHeaderProofs,
  901. ReqID: resp.ReqID,
  902. Obj: resp.Data,
  903. }
  904. case HelperTrieProofsMsg:
  905. if pm.odr == nil {
  906. return errResp(ErrUnexpectedResponse, "")
  907. }
  908. p.Log().Trace("Received helper trie proof response")
  909. var resp struct {
  910. ReqID, BV uint64
  911. Data HelperTrieResps
  912. }
  913. if err := msg.Decode(&resp); err != nil {
  914. return errResp(ErrDecode, "msg %v: %v", msg, err)
  915. }
  916. p.fcServer.GotReply(resp.ReqID, resp.BV)
  917. deliverMsg = &Msg{
  918. MsgType: MsgHelperTrieProofs,
  919. ReqID: resp.ReqID,
  920. Obj: resp.Data,
  921. }
  922. case SendTxMsg:
  923. if pm.txpool == nil {
  924. return errResp(ErrRequestRejected, "")
  925. }
  926. // Transactions arrived, parse all of them and deliver to the pool
  927. var txs []*types.Transaction
  928. if err := msg.Decode(&txs); err != nil {
  929. return errResp(ErrDecode, "msg %v: %v", msg, err)
  930. }
  931. reqCnt := len(txs)
  932. if reject(uint64(reqCnt), MaxTxSend) {
  933. return errResp(ErrRequestRejected, "")
  934. }
  935. pm.txpool.AddRemotes(txs)
  936. _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  937. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  938. case SendTxV2Msg:
  939. if pm.txpool == nil {
  940. return errResp(ErrRequestRejected, "")
  941. }
  942. // Transactions arrived, parse all of them and deliver to the pool
  943. var req struct {
  944. ReqID uint64
  945. Txs []*types.Transaction
  946. }
  947. if err := msg.Decode(&req); err != nil {
  948. return errResp(ErrDecode, "msg %v: %v", msg, err)
  949. }
  950. reqCnt := len(req.Txs)
  951. if reject(uint64(reqCnt), MaxTxSend) {
  952. return errResp(ErrRequestRejected, "")
  953. }
  954. hashes := make([]common.Hash, len(req.Txs))
  955. for i, tx := range req.Txs {
  956. hashes[i] = tx.Hash()
  957. }
  958. stats := pm.txStatus(hashes)
  959. for i, stat := range stats {
  960. if stat.Status == core.TxStatusUnknown {
  961. if errs := pm.txpool.AddRemotes([]*types.Transaction{req.Txs[i]}); errs[0] != nil {
  962. stats[i].Error = errs[0].Error()
  963. continue
  964. }
  965. stats[i] = pm.txStatus([]common.Hash{hashes[i]})[0]
  966. }
  967. }
  968. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  969. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  970. return p.SendTxStatus(req.ReqID, bv, stats)
  971. case GetTxStatusMsg:
  972. if pm.txpool == nil {
  973. return errResp(ErrUnexpectedResponse, "")
  974. }
  975. // Transaction status query arrived, decode the hashes to look up
  976. var req struct {
  977. ReqID uint64
  978. Hashes []common.Hash
  979. }
  980. if err := msg.Decode(&req); err != nil {
  981. return errResp(ErrDecode, "msg %v: %v", msg, err)
  982. }
  983. reqCnt := len(req.Hashes)
  984. if reject(uint64(reqCnt), MaxTxStatus) {
  985. return errResp(ErrRequestRejected, "")
  986. }
  987. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  988. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  989. return p.SendTxStatus(req.ReqID, bv, pm.txStatus(req.Hashes))
  990. case TxStatusMsg:
  991. if pm.odr == nil {
  992. return errResp(ErrUnexpectedResponse, "")
  993. }
  994. p.Log().Trace("Received tx status response")
  995. var resp struct {
  996. ReqID, BV uint64
  997. Status []txStatus
  998. }
  999. if err := msg.Decode(&resp); err != nil {
  1000. return errResp(ErrDecode, "msg %v: %v", msg, err)
  1001. }
  1002. p.fcServer.GotReply(resp.ReqID, resp.BV)
  1003. default:
  1004. p.Log().Trace("Received unknown message", "code", msg.Code)
  1005. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  1006. }
  1007. if deliverMsg != nil {
  1008. err := pm.retriever.deliver(p, deliverMsg)
  1009. if err != nil {
  1010. p.responseErrors++
  1011. if p.responseErrors > maxResponseErrors {
  1012. return err
  1013. }
  1014. }
  1015. }
  1016. return nil
  1017. }
  1018. // getAccount retrieves an account from the state based at root.
  1019. func (pm *ProtocolManager) getAccount(statedb *state.StateDB, root, hash common.Hash) (state.Account, error) {
  1020. trie, err := trie.New(root, statedb.Database().TrieDB())
  1021. if err != nil {
  1022. return state.Account{}, err
  1023. }
  1024. blob, err := trie.TryGet(hash[:])
  1025. if err != nil {
  1026. return state.Account{}, err
  1027. }
  1028. var account state.Account
  1029. if err = rlp.DecodeBytes(blob, &account); err != nil {
  1030. return state.Account{}, err
  1031. }
  1032. return account, nil
  1033. }
  1034. // getHelperTrie returns the post-processed trie root for the given trie ID and section index
  1035. func (pm *ProtocolManager) getHelperTrie(id uint, idx uint64) (common.Hash, string) {
  1036. switch id {
  1037. case htCanonical:
  1038. idxV1 := (idx+1)*(pm.iConfig.PairChtSize/pm.iConfig.ChtSize) - 1
  1039. sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idxV1+1)*pm.iConfig.ChtSize-1)
  1040. return light.GetChtRoot(pm.chainDb, idxV1, sectionHead), light.ChtTablePrefix
  1041. case htBloomBits:
  1042. sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idx+1)*pm.iConfig.BloomTrieSize-1)
  1043. return light.GetBloomTrieRoot(pm.chainDb, idx, sectionHead), light.BloomTrieTablePrefix
  1044. }
  1045. return common.Hash{}, ""
  1046. }
  1047. // getHelperTrieAuxData returns requested auxiliary data for the given HelperTrie request
  1048. func (pm *ProtocolManager) getHelperTrieAuxData(req HelperTrieReq) []byte {
  1049. if req.Type == htCanonical && req.AuxReq == auxHeader && len(req.Key) == 8 {
  1050. blockNum := binary.BigEndian.Uint64(req.Key)
  1051. hash := rawdb.ReadCanonicalHash(pm.chainDb, blockNum)
  1052. return rawdb.ReadHeaderRLP(pm.chainDb, hash, blockNum)
  1053. }
  1054. return nil
  1055. }
  1056. func (pm *ProtocolManager) txStatus(hashes []common.Hash) []txStatus {
  1057. stats := make([]txStatus, len(hashes))
  1058. for i, stat := range pm.txpool.Status(hashes) {
  1059. // Save the status we've got from the transaction pool
  1060. stats[i].Status = stat
  1061. // If the transaction is unknown to the pool, try looking it up locally
  1062. if stat == core.TxStatusUnknown {
  1063. if block, number, index := rawdb.ReadTxLookupEntry(pm.chainDb, hashes[i]); block != (common.Hash{}) {
  1064. stats[i].Status = core.TxStatusIncluded
  1065. stats[i].Lookup = &rawdb.TxLookupEntry{BlockHash: block, BlockIndex: number, Index: index}
  1066. }
  1067. }
  1068. }
  1069. return stats
  1070. }
  1071. // downloaderPeerNotify implements peerSetNotify
  1072. type downloaderPeerNotify ProtocolManager
  1073. type peerConnection struct {
  1074. manager *ProtocolManager
  1075. peer *peer
  1076. }
  1077. func (pc *peerConnection) Head() (common.Hash, *big.Int) {
  1078. return pc.peer.HeadAndTd()
  1079. }
  1080. func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
  1081. reqID := genReqID()
  1082. rq := &distReq{
  1083. getCost: func(dp distPeer) uint64 {
  1084. peer := dp.(*peer)
  1085. return peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1086. },
  1087. canSend: func(dp distPeer) bool {
  1088. return dp.(*peer) == pc.peer
  1089. },
  1090. request: func(dp distPeer) func() {
  1091. peer := dp.(*peer)
  1092. cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1093. peer.fcServer.QueueRequest(reqID, cost)
  1094. return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
  1095. },
  1096. }
  1097. _, ok := <-pc.manager.reqDist.queue(rq)
  1098. if !ok {
  1099. return light.ErrNoPeers
  1100. }
  1101. return nil
  1102. }
  1103. func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
  1104. reqID := genReqID()
  1105. rq := &distReq{
  1106. getCost: func(dp distPeer) uint64 {
  1107. peer := dp.(*peer)
  1108. return peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1109. },
  1110. canSend: func(dp distPeer) bool {
  1111. return dp.(*peer) == pc.peer
  1112. },
  1113. request: func(dp distPeer) func() {
  1114. peer := dp.(*peer)
  1115. cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1116. peer.fcServer.QueueRequest(reqID, cost)
  1117. return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
  1118. },
  1119. }
  1120. _, ok := <-pc.manager.reqDist.queue(rq)
  1121. if !ok {
  1122. return light.ErrNoPeers
  1123. }
  1124. return nil
  1125. }
  1126. func (d *downloaderPeerNotify) registerPeer(p *peer) {
  1127. pm := (*ProtocolManager)(d)
  1128. pc := &peerConnection{
  1129. manager: pm,
  1130. peer: p,
  1131. }
  1132. pm.downloader.RegisterLightPeer(p.id, ethVersion, pc)
  1133. }
  1134. func (d *downloaderPeerNotify) unregisterPeer(p *peer) {
  1135. pm := (*ProtocolManager)(d)
  1136. pm.downloader.UnregisterPeer(p.id)
  1137. }