handler.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "bytes"
  20. "encoding/binary"
  21. "errors"
  22. "fmt"
  23. "math/big"
  24. "net"
  25. "sync"
  26. "time"
  27. "github.com/ethereum/go-ethereum/common"
  28. "github.com/ethereum/go-ethereum/consensus"
  29. "github.com/ethereum/go-ethereum/core"
  30. "github.com/ethereum/go-ethereum/core/state"
  31. "github.com/ethereum/go-ethereum/core/types"
  32. "github.com/ethereum/go-ethereum/eth/downloader"
  33. "github.com/ethereum/go-ethereum/ethdb"
  34. "github.com/ethereum/go-ethereum/event"
  35. "github.com/ethereum/go-ethereum/light"
  36. "github.com/ethereum/go-ethereum/log"
  37. "github.com/ethereum/go-ethereum/p2p"
  38. "github.com/ethereum/go-ethereum/p2p/discover"
  39. "github.com/ethereum/go-ethereum/p2p/discv5"
  40. "github.com/ethereum/go-ethereum/params"
  41. "github.com/ethereum/go-ethereum/rlp"
  42. "github.com/ethereum/go-ethereum/trie"
  43. )
  // Limits and tunables shared by the LES request handlers in this file.
  const (
  	softResponseLimit = 2 << 20 // Target maximum size (2 MiB) of returned blocks, headers or node data.
  	estHeaderRlpSize  = 500     // Approximate size of an RLP encoded block header.

  	ethVersion = 63 // Equivalent eth version for the downloader.

  	MaxHeaderFetch           = 192 // Amount of block headers to be fetched per retrieval request.
  	MaxBodyFetch             = 32  // Amount of block bodies to be fetched per retrieval request.
  	MaxReceiptFetch          = 128 // Amount of transaction receipts to allow fetching per request.
  	MaxCodeFetch             = 64  // Amount of contract codes to allow fetching per request.
  	MaxProofsFetch           = 64  // Amount of merkle proofs to be fetched per retrieval request.
  	MaxHelperTrieProofsFetch = 64  // Amount of header / helper trie proofs to be fetched per retrieval request.
  	MaxTxSend                = 64  // Amount of transactions to be sent per request.
  	MaxTxStatus              = 256 // Amount of transactions to be queried per request.

  	// disableClientRemovePeer, when true, replaces the peer removal callback
  	// handed to the downloader with a no-op.
  	disableClientRemovePeer = false
  )
  // errIncompatibleConfig is returned by NewProtocolManager when none of the
  // requested protocol versions yields a usable sub-protocol (low protocol
  // version restrictions combined with high requirements).
  var (
  	errIncompatibleConfig = errors.New("incompatible configuration")
  )
  61. func errResp(code errCode, format string, v ...interface{}) error {
  62. return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
  63. }
  64. type BlockChain interface {
  65. Config() *params.ChainConfig
  66. HasHeader(hash common.Hash, number uint64) bool
  67. GetHeader(hash common.Hash, number uint64) *types.Header
  68. GetHeaderByHash(hash common.Hash) *types.Header
  69. CurrentHeader() *types.Header
  70. GetTdByHash(hash common.Hash) *big.Int
  71. InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error)
  72. Rollback(chain []common.Hash)
  73. Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash)
  74. GetHeaderByNumber(number uint64) *types.Header
  75. GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash
  76. LastBlockHash() common.Hash
  77. Genesis() *types.Block
  78. SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
  79. }
  80. type txPool interface {
  81. AddRemotes(txs []*types.Transaction) []error
  82. Status(hashes []common.Hash) []core.TxStatus
  83. }
  84. type ProtocolManager struct {
  85. lightSync bool
  86. txpool txPool
  87. txrelay *LesTxRelay
  88. networkId uint64
  89. chainConfig *params.ChainConfig
  90. blockchain BlockChain
  91. chainDb ethdb.Database
  92. odr *LesOdr
  93. server *LesServer
  94. serverPool *serverPool
  95. lesTopic discv5.Topic
  96. reqDist *requestDistributor
  97. retriever *retrieveManager
  98. downloader *downloader.Downloader
  99. fetcher *lightFetcher
  100. peers *peerSet
  101. SubProtocols []p2p.Protocol
  102. eventMux *event.TypeMux
  103. // channels for fetcher, syncer, txsyncLoop
  104. newPeerCh chan *peer
  105. quitSync chan struct{}
  106. noMorePeers chan struct{}
  107. // wait group is used for graceful shutdowns during downloading
  108. // and processing
  109. wg *sync.WaitGroup
  110. }
  111. // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
  112. // with the ethereum network.
  113. func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protocolVersions []uint, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
  114. // Create the protocol manager with the base fields
  115. manager := &ProtocolManager{
  116. lightSync: lightSync,
  117. eventMux: mux,
  118. blockchain: blockchain,
  119. chainConfig: chainConfig,
  120. chainDb: chainDb,
  121. odr: odr,
  122. networkId: networkId,
  123. txpool: txpool,
  124. txrelay: txrelay,
  125. peers: peers,
  126. newPeerCh: make(chan *peer),
  127. quitSync: quitSync,
  128. wg: wg,
  129. noMorePeers: make(chan struct{}),
  130. }
  131. if odr != nil {
  132. manager.retriever = odr.retriever
  133. manager.reqDist = odr.retriever.dist
  134. }
  135. // Initiate a sub-protocol for every implemented version we can handle
  136. manager.SubProtocols = make([]p2p.Protocol, 0, len(protocolVersions))
  137. for _, version := range protocolVersions {
  138. // Compatible, initialize the sub-protocol
  139. version := version // Closure for the run
  140. manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
  141. Name: "les",
  142. Version: version,
  143. Length: ProtocolLengths[version],
  144. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  145. var entry *poolEntry
  146. peer := manager.newPeer(int(version), networkId, p, rw)
  147. if manager.serverPool != nil {
  148. addr := p.RemoteAddr().(*net.TCPAddr)
  149. entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
  150. }
  151. peer.poolEntry = entry
  152. select {
  153. case manager.newPeerCh <- peer:
  154. manager.wg.Add(1)
  155. defer manager.wg.Done()
  156. err := manager.handle(peer)
  157. if entry != nil {
  158. manager.serverPool.disconnect(entry)
  159. }
  160. return err
  161. case <-manager.quitSync:
  162. if entry != nil {
  163. manager.serverPool.disconnect(entry)
  164. }
  165. return p2p.DiscQuitting
  166. }
  167. },
  168. NodeInfo: func() interface{} {
  169. return manager.NodeInfo()
  170. },
  171. PeerInfo: func(id discover.NodeID) interface{} {
  172. if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
  173. return p.Info()
  174. }
  175. return nil
  176. },
  177. })
  178. }
  179. if len(manager.SubProtocols) == 0 {
  180. return nil, errIncompatibleConfig
  181. }
  182. removePeer := manager.removePeer
  183. if disableClientRemovePeer {
  184. removePeer = func(id string) {}
  185. }
  186. if lightSync {
  187. manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer)
  188. manager.peers.notify((*downloaderPeerNotify)(manager))
  189. manager.fetcher = newLightFetcher(manager)
  190. }
  191. return manager, nil
  192. }
  193. // removePeer initiates disconnection from a peer by removing it from the peer set
  194. func (pm *ProtocolManager) removePeer(id string) {
  195. pm.peers.Unregister(id)
  196. }
  197. func (pm *ProtocolManager) Start() {
  198. if pm.lightSync {
  199. go pm.syncer()
  200. } else {
  201. go func() {
  202. for range pm.newPeerCh {
  203. }
  204. }()
  205. }
  206. }
  207. func (pm *ProtocolManager) Stop() {
  208. // Showing a log message. During download / process this could actually
  209. // take between 5 to 10 seconds and therefor feedback is required.
  210. log.Info("Stopping light Ethereum protocol")
  211. // Quit the sync loop.
  212. // After this send has completed, no new peers will be accepted.
  213. pm.noMorePeers <- struct{}{}
  214. close(pm.quitSync) // quits syncer, fetcher
  215. // Disconnect existing sessions.
  216. // This also closes the gate for any new registrations on the peer set.
  217. // sessions which are already established but not added to pm.peers yet
  218. // will exit when they try to register.
  219. pm.peers.Close()
  220. // Wait for any process action
  221. pm.wg.Wait()
  222. log.Info("Light Ethereum protocol stopped")
  223. }
  224. func (pm *ProtocolManager) newPeer(pv int, nv uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  225. return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
  226. }
  227. // handle is the callback invoked to manage the life cycle of a les peer. When
  228. // this function terminates, the peer is disconnected.
  229. func (pm *ProtocolManager) handle(p *peer) error {
  230. p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
  231. // Execute the LES handshake
  232. td, head, genesis := pm.blockchain.Status()
  233. headNum := core.GetBlockNumber(pm.chainDb, head)
  234. if err := p.Handshake(td, head, headNum, genesis, pm.server); err != nil {
  235. p.Log().Debug("Light Ethereum handshake failed", "err", err)
  236. return err
  237. }
  238. if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
  239. rw.Init(p.version)
  240. }
  241. // Register the peer locally
  242. if err := pm.peers.Register(p); err != nil {
  243. p.Log().Error("Light Ethereum peer registration failed", "err", err)
  244. return err
  245. }
  246. defer func() {
  247. if pm.server != nil && pm.server.fcManager != nil && p.fcClient != nil {
  248. p.fcClient.Remove(pm.server.fcManager)
  249. }
  250. pm.removePeer(p.id)
  251. }()
  252. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  253. if pm.lightSync {
  254. p.lock.Lock()
  255. head := p.headInfo
  256. p.lock.Unlock()
  257. if pm.fetcher != nil {
  258. pm.fetcher.announce(p, head)
  259. }
  260. if p.poolEntry != nil {
  261. pm.serverPool.registered(p.poolEntry)
  262. }
  263. }
  264. stop := make(chan struct{})
  265. defer close(stop)
  266. go func() {
  267. // new block announce loop
  268. for {
  269. select {
  270. case announce := <-p.announceChn:
  271. p.SendAnnounce(announce)
  272. case <-stop:
  273. return
  274. }
  275. }
  276. }()
  277. // main loop. handle incoming messages.
  278. for {
  279. if err := pm.handleMsg(p); err != nil {
  280. p.Log().Debug("Light Ethereum message handling failed", "err", err)
  281. return err
  282. }
  283. }
  284. }
  285. var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, SendTxV2Msg, GetTxStatusMsg, GetHeaderProofsMsg, GetProofsV2Msg, GetHelperTrieProofsMsg}
  286. // handleMsg is invoked whenever an inbound message is received from a remote
  287. // peer. The remote connection is torn down upon returning any error.
  288. func (pm *ProtocolManager) handleMsg(p *peer) error {
  289. // Read the next message from the remote peer, and ensure it's fully consumed
  290. msg, err := p.rw.ReadMsg()
  291. if err != nil {
  292. return err
  293. }
  294. p.Log().Trace("Light Ethereum message arrived", "code", msg.Code, "bytes", msg.Size)
  295. costs := p.fcCosts[msg.Code]
  296. reject := func(reqCnt, maxCnt uint64) bool {
  297. if p.fcClient == nil || reqCnt > maxCnt {
  298. return true
  299. }
  300. bufValue, _ := p.fcClient.AcceptRequest()
  301. cost := costs.baseCost + reqCnt*costs.reqCost
  302. if cost > pm.server.defParams.BufLimit {
  303. cost = pm.server.defParams.BufLimit
  304. }
  305. if cost > bufValue {
  306. recharge := time.Duration((cost - bufValue) * 1000000 / pm.server.defParams.MinRecharge)
  307. p.Log().Error("Request came too early", "recharge", common.PrettyDuration(recharge))
  308. return true
  309. }
  310. return false
  311. }
  312. if msg.Size > ProtocolMaxMsgSize {
  313. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  314. }
  315. defer msg.Discard()
  316. var deliverMsg *Msg
  317. // Handle the message depending on its contents
  318. switch msg.Code {
  319. case StatusMsg:
  320. p.Log().Trace("Received status message")
  321. // Status messages should never arrive after the handshake
  322. return errResp(ErrExtraStatusMsg, "uncontrolled status message")
  323. // Block header query, collect the requested headers and reply
  324. case AnnounceMsg:
  325. p.Log().Trace("Received announce message")
  326. if p.requestAnnounceType == announceTypeNone {
  327. return errResp(ErrUnexpectedResponse, "")
  328. }
  329. var req announceData
  330. if err := msg.Decode(&req); err != nil {
  331. return errResp(ErrDecode, "%v: %v", msg, err)
  332. }
  333. if p.requestAnnounceType == announceTypeSigned {
  334. if err := req.checkSignature(p.pubKey); err != nil {
  335. p.Log().Trace("Invalid announcement signature", "err", err)
  336. return err
  337. }
  338. p.Log().Trace("Valid announcement signature")
  339. }
  340. p.Log().Trace("Announce message content", "number", req.Number, "hash", req.Hash, "td", req.Td, "reorg", req.ReorgDepth)
  341. if pm.fetcher != nil {
  342. pm.fetcher.announce(p, &req)
  343. }
  344. case GetBlockHeadersMsg:
  345. p.Log().Trace("Received block header request")
  346. // Decode the complex header query
  347. var req struct {
  348. ReqID uint64
  349. Query getBlockHeadersData
  350. }
  351. if err := msg.Decode(&req); err != nil {
  352. return errResp(ErrDecode, "%v: %v", msg, err)
  353. }
  354. query := req.Query
  355. if reject(query.Amount, MaxHeaderFetch) {
  356. return errResp(ErrRequestRejected, "")
  357. }
  358. hashMode := query.Origin.Hash != (common.Hash{})
  359. // Gather headers until the fetch or network limits is reached
  360. var (
  361. bytes common.StorageSize
  362. headers []*types.Header
  363. unknown bool
  364. )
  365. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  366. // Retrieve the next header satisfying the query
  367. var origin *types.Header
  368. if hashMode {
  369. origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
  370. } else {
  371. origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
  372. }
  373. if origin == nil {
  374. break
  375. }
  376. number := origin.Number.Uint64()
  377. headers = append(headers, origin)
  378. bytes += estHeaderRlpSize
  379. // Advance to the next header of the query
  380. switch {
  381. case query.Origin.Hash != (common.Hash{}) && query.Reverse:
  382. // Hash based traversal towards the genesis block
  383. for i := 0; i < int(query.Skip)+1; i++ {
  384. if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {
  385. query.Origin.Hash = header.ParentHash
  386. number--
  387. } else {
  388. unknown = true
  389. break
  390. }
  391. }
  392. case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
  393. // Hash based traversal towards the leaf block
  394. if header := pm.blockchain.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil {
  395. if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
  396. query.Origin.Hash = header.Hash()
  397. } else {
  398. unknown = true
  399. }
  400. } else {
  401. unknown = true
  402. }
  403. case query.Reverse:
  404. // Number based traversal towards the genesis block
  405. if query.Origin.Number >= query.Skip+1 {
  406. query.Origin.Number -= query.Skip + 1
  407. } else {
  408. unknown = true
  409. }
  410. case !query.Reverse:
  411. // Number based traversal towards the leaf block
  412. query.Origin.Number += query.Skip + 1
  413. }
  414. }
  415. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
  416. pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
  417. return p.SendBlockHeaders(req.ReqID, bv, headers)
  418. case BlockHeadersMsg:
  419. if pm.downloader == nil {
  420. return errResp(ErrUnexpectedResponse, "")
  421. }
  422. p.Log().Trace("Received block header response message")
  423. // A batch of headers arrived to one of our previous requests
  424. var resp struct {
  425. ReqID, BV uint64
  426. Headers []*types.Header
  427. }
  428. if err := msg.Decode(&resp); err != nil {
  429. return errResp(ErrDecode, "msg %v: %v", msg, err)
  430. }
  431. p.fcServer.GotReply(resp.ReqID, resp.BV)
  432. if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
  433. pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
  434. } else {
  435. err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
  436. if err != nil {
  437. log.Debug(fmt.Sprint(err))
  438. }
  439. }
  440. case GetBlockBodiesMsg:
  441. p.Log().Trace("Received block bodies request")
  442. // Decode the retrieval message
  443. var req struct {
  444. ReqID uint64
  445. Hashes []common.Hash
  446. }
  447. if err := msg.Decode(&req); err != nil {
  448. return errResp(ErrDecode, "msg %v: %v", msg, err)
  449. }
  450. // Gather blocks until the fetch or network limits is reached
  451. var (
  452. bytes int
  453. bodies []rlp.RawValue
  454. )
  455. reqCnt := len(req.Hashes)
  456. if reject(uint64(reqCnt), MaxBodyFetch) {
  457. return errResp(ErrRequestRejected, "")
  458. }
  459. for _, hash := range req.Hashes {
  460. if bytes >= softResponseLimit {
  461. break
  462. }
  463. // Retrieve the requested block body, stopping if enough was found
  464. if data := core.GetBodyRLP(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash)); len(data) != 0 {
  465. bodies = append(bodies, data)
  466. bytes += len(data)
  467. }
  468. }
  469. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  470. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  471. return p.SendBlockBodiesRLP(req.ReqID, bv, bodies)
  472. case BlockBodiesMsg:
  473. if pm.odr == nil {
  474. return errResp(ErrUnexpectedResponse, "")
  475. }
  476. p.Log().Trace("Received block bodies response")
  477. // A batch of block bodies arrived to one of our previous requests
  478. var resp struct {
  479. ReqID, BV uint64
  480. Data []*types.Body
  481. }
  482. if err := msg.Decode(&resp); err != nil {
  483. return errResp(ErrDecode, "msg %v: %v", msg, err)
  484. }
  485. p.fcServer.GotReply(resp.ReqID, resp.BV)
  486. deliverMsg = &Msg{
  487. MsgType: MsgBlockBodies,
  488. ReqID: resp.ReqID,
  489. Obj: resp.Data,
  490. }
  491. case GetCodeMsg:
  492. p.Log().Trace("Received code request")
  493. // Decode the retrieval message
  494. var req struct {
  495. ReqID uint64
  496. Reqs []CodeReq
  497. }
  498. if err := msg.Decode(&req); err != nil {
  499. return errResp(ErrDecode, "msg %v: %v", msg, err)
  500. }
  501. // Gather state data until the fetch or network limits is reached
  502. var (
  503. bytes int
  504. data [][]byte
  505. )
  506. reqCnt := len(req.Reqs)
  507. if reject(uint64(reqCnt), MaxCodeFetch) {
  508. return errResp(ErrRequestRejected, "")
  509. }
  510. for _, req := range req.Reqs {
  511. // Retrieve the requested state entry, stopping if enough was found
  512. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  513. if trie, _ := trie.New(header.Root, pm.chainDb); trie != nil {
  514. sdata := trie.Get(req.AccKey)
  515. var acc state.Account
  516. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  517. entry, _ := pm.chainDb.Get(acc.CodeHash)
  518. if bytes+len(entry) >= softResponseLimit {
  519. break
  520. }
  521. data = append(data, entry)
  522. bytes += len(entry)
  523. }
  524. }
  525. }
  526. }
  527. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  528. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  529. return p.SendCode(req.ReqID, bv, data)
  530. case CodeMsg:
  531. if pm.odr == nil {
  532. return errResp(ErrUnexpectedResponse, "")
  533. }
  534. p.Log().Trace("Received code response")
  535. // A batch of node state data arrived to one of our previous requests
  536. var resp struct {
  537. ReqID, BV uint64
  538. Data [][]byte
  539. }
  540. if err := msg.Decode(&resp); err != nil {
  541. return errResp(ErrDecode, "msg %v: %v", msg, err)
  542. }
  543. p.fcServer.GotReply(resp.ReqID, resp.BV)
  544. deliverMsg = &Msg{
  545. MsgType: MsgCode,
  546. ReqID: resp.ReqID,
  547. Obj: resp.Data,
  548. }
  549. case GetReceiptsMsg:
  550. p.Log().Trace("Received receipts request")
  551. // Decode the retrieval message
  552. var req struct {
  553. ReqID uint64
  554. Hashes []common.Hash
  555. }
  556. if err := msg.Decode(&req); err != nil {
  557. return errResp(ErrDecode, "msg %v: %v", msg, err)
  558. }
  559. // Gather state data until the fetch or network limits is reached
  560. var (
  561. bytes int
  562. receipts []rlp.RawValue
  563. )
  564. reqCnt := len(req.Hashes)
  565. if reject(uint64(reqCnt), MaxReceiptFetch) {
  566. return errResp(ErrRequestRejected, "")
  567. }
  568. for _, hash := range req.Hashes {
  569. if bytes >= softResponseLimit {
  570. break
  571. }
  572. // Retrieve the requested block's receipts, skipping if unknown to us
  573. results := core.GetBlockReceipts(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash))
  574. if results == nil {
  575. if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  576. continue
  577. }
  578. }
  579. // If known, encode and queue for response packet
  580. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  581. log.Error("Failed to encode receipt", "err", err)
  582. } else {
  583. receipts = append(receipts, encoded)
  584. bytes += len(encoded)
  585. }
  586. }
  587. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  588. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  589. return p.SendReceiptsRLP(req.ReqID, bv, receipts)
  590. case ReceiptsMsg:
  591. if pm.odr == nil {
  592. return errResp(ErrUnexpectedResponse, "")
  593. }
  594. p.Log().Trace("Received receipts response")
  595. // A batch of receipts arrived to one of our previous requests
  596. var resp struct {
  597. ReqID, BV uint64
  598. Receipts []types.Receipts
  599. }
  600. if err := msg.Decode(&resp); err != nil {
  601. return errResp(ErrDecode, "msg %v: %v", msg, err)
  602. }
  603. p.fcServer.GotReply(resp.ReqID, resp.BV)
  604. deliverMsg = &Msg{
  605. MsgType: MsgReceipts,
  606. ReqID: resp.ReqID,
  607. Obj: resp.Receipts,
  608. }
  609. case GetProofsV1Msg:
  610. p.Log().Trace("Received proofs request")
  611. // Decode the retrieval message
  612. var req struct {
  613. ReqID uint64
  614. Reqs []ProofReq
  615. }
  616. if err := msg.Decode(&req); err != nil {
  617. return errResp(ErrDecode, "msg %v: %v", msg, err)
  618. }
  619. // Gather state data until the fetch or network limits is reached
  620. var (
  621. bytes int
  622. proofs proofsData
  623. )
  624. reqCnt := len(req.Reqs)
  625. if reject(uint64(reqCnt), MaxProofsFetch) {
  626. return errResp(ErrRequestRejected, "")
  627. }
  628. for _, req := range req.Reqs {
  629. if bytes >= softResponseLimit {
  630. break
  631. }
  632. // Retrieve the requested state entry, stopping if enough was found
  633. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  634. if tr, _ := trie.New(header.Root, pm.chainDb); tr != nil {
  635. if len(req.AccKey) > 0 {
  636. sdata := tr.Get(req.AccKey)
  637. tr = nil
  638. var acc state.Account
  639. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  640. tr, _ = trie.New(acc.Root, pm.chainDb)
  641. }
  642. }
  643. if tr != nil {
  644. var proof light.NodeList
  645. tr.Prove(req.Key, 0, &proof)
  646. proofs = append(proofs, proof)
  647. bytes += proof.DataSize()
  648. }
  649. }
  650. }
  651. }
  652. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  653. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  654. return p.SendProofs(req.ReqID, bv, proofs)
  655. case GetProofsV2Msg:
  656. p.Log().Trace("Received les/2 proofs request")
  657. // Decode the retrieval message
  658. var req struct {
  659. ReqID uint64
  660. Reqs []ProofReq
  661. }
  662. if err := msg.Decode(&req); err != nil {
  663. return errResp(ErrDecode, "msg %v: %v", msg, err)
  664. }
  665. // Gather state data until the fetch or network limits is reached
  666. var (
  667. lastBHash common.Hash
  668. lastAccKey []byte
  669. tr, str *trie.Trie
  670. )
  671. reqCnt := len(req.Reqs)
  672. if reject(uint64(reqCnt), MaxProofsFetch) {
  673. return errResp(ErrRequestRejected, "")
  674. }
  675. nodes := light.NewNodeSet()
  676. for _, req := range req.Reqs {
  677. if nodes.DataSize() >= softResponseLimit {
  678. break
  679. }
  680. if tr == nil || req.BHash != lastBHash {
  681. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  682. tr, _ = trie.New(header.Root, pm.chainDb)
  683. } else {
  684. tr = nil
  685. }
  686. lastBHash = req.BHash
  687. str = nil
  688. }
  689. if tr != nil {
  690. if len(req.AccKey) > 0 {
  691. if str == nil || !bytes.Equal(req.AccKey, lastAccKey) {
  692. sdata := tr.Get(req.AccKey)
  693. str = nil
  694. var acc state.Account
  695. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  696. str, _ = trie.New(acc.Root, pm.chainDb)
  697. }
  698. lastAccKey = common.CopyBytes(req.AccKey)
  699. }
  700. if str != nil {
  701. str.Prove(req.Key, req.FromLevel, nodes)
  702. }
  703. } else {
  704. tr.Prove(req.Key, req.FromLevel, nodes)
  705. }
  706. }
  707. }
  708. proofs := nodes.NodeList()
  709. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  710. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  711. return p.SendProofsV2(req.ReqID, bv, proofs)
  712. case ProofsV1Msg:
  713. if pm.odr == nil {
  714. return errResp(ErrUnexpectedResponse, "")
  715. }
  716. p.Log().Trace("Received proofs response")
  717. // A batch of merkle proofs arrived to one of our previous requests
  718. var resp struct {
  719. ReqID, BV uint64
  720. Data []light.NodeList
  721. }
  722. if err := msg.Decode(&resp); err != nil {
  723. return errResp(ErrDecode, "msg %v: %v", msg, err)
  724. }
  725. p.fcServer.GotReply(resp.ReqID, resp.BV)
  726. deliverMsg = &Msg{
  727. MsgType: MsgProofsV1,
  728. ReqID: resp.ReqID,
  729. Obj: resp.Data,
  730. }
  731. case ProofsV2Msg:
  732. if pm.odr == nil {
  733. return errResp(ErrUnexpectedResponse, "")
  734. }
  735. p.Log().Trace("Received les/2 proofs response")
  736. // A batch of merkle proofs arrived to one of our previous requests
  737. var resp struct {
  738. ReqID, BV uint64
  739. Data light.NodeList
  740. }
  741. if err := msg.Decode(&resp); err != nil {
  742. return errResp(ErrDecode, "msg %v: %v", msg, err)
  743. }
  744. p.fcServer.GotReply(resp.ReqID, resp.BV)
  745. deliverMsg = &Msg{
  746. MsgType: MsgProofsV2,
  747. ReqID: resp.ReqID,
  748. Obj: resp.Data,
  749. }
  750. case GetHeaderProofsMsg:
  751. p.Log().Trace("Received headers proof request")
  752. // Decode the retrieval message
  753. var req struct {
  754. ReqID uint64
  755. Reqs []ChtReq
  756. }
  757. if err := msg.Decode(&req); err != nil {
  758. return errResp(ErrDecode, "msg %v: %v", msg, err)
  759. }
  760. // Gather state data until the fetch or network limits is reached
  761. var (
  762. bytes int
  763. proofs []ChtResp
  764. )
  765. reqCnt := len(req.Reqs)
  766. if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) {
  767. return errResp(ErrRequestRejected, "")
  768. }
  769. trieDb := ethdb.NewTable(pm.chainDb, light.ChtTablePrefix)
  770. for _, req := range req.Reqs {
  771. if bytes >= softResponseLimit {
  772. break
  773. }
  774. if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
  775. sectionHead := core.GetCanonicalHash(pm.chainDb, (req.ChtNum+1)*light.ChtV1Frequency-1)
  776. if root := light.GetChtRoot(pm.chainDb, req.ChtNum, sectionHead); root != (common.Hash{}) {
  777. if tr, _ := trie.New(root, trieDb); tr != nil {
  778. var encNumber [8]byte
  779. binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
  780. var proof light.NodeList
  781. tr.Prove(encNumber[:], 0, &proof)
  782. proofs = append(proofs, ChtResp{Header: header, Proof: proof})
  783. bytes += proof.DataSize() + estHeaderRlpSize
  784. }
  785. }
  786. }
  787. }
  788. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  789. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  790. return p.SendHeaderProofs(req.ReqID, bv, proofs)
  791. case GetHelperTrieProofsMsg:
  792. p.Log().Trace("Received helper trie proof request")
  793. // Decode the retrieval message
  794. var req struct {
  795. ReqID uint64
  796. Reqs []HelperTrieReq
  797. }
  798. if err := msg.Decode(&req); err != nil {
  799. return errResp(ErrDecode, "msg %v: %v", msg, err)
  800. }
  801. // Gather state data until the fetch or network limits is reached
  802. var (
  803. auxBytes int
  804. auxData [][]byte
  805. )
  806. reqCnt := len(req.Reqs)
  807. if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) {
  808. return errResp(ErrRequestRejected, "")
  809. }
  810. var (
  811. lastIdx uint64
  812. lastType uint
  813. root common.Hash
  814. tr *trie.Trie
  815. )
  816. nodes := light.NewNodeSet()
  817. for _, req := range req.Reqs {
  818. if nodes.DataSize()+auxBytes >= softResponseLimit {
  819. break
  820. }
  821. if tr == nil || req.HelperTrieType != lastType || req.TrieIdx != lastIdx {
  822. var prefix string
  823. root, prefix = pm.getHelperTrie(req.HelperTrieType, req.TrieIdx)
  824. if root != (common.Hash{}) {
  825. if t, err := trie.New(root, ethdb.NewTable(pm.chainDb, prefix)); err == nil {
  826. tr = t
  827. }
  828. }
  829. lastType = req.HelperTrieType
  830. lastIdx = req.TrieIdx
  831. }
  832. if req.AuxReq == auxRoot {
  833. var data []byte
  834. if root != (common.Hash{}) {
  835. data = root[:]
  836. }
  837. auxData = append(auxData, data)
  838. auxBytes += len(data)
  839. } else {
  840. if tr != nil {
  841. tr.Prove(req.Key, req.FromLevel, nodes)
  842. }
  843. if req.AuxReq != 0 {
  844. data := pm.getHelperTrieAuxData(req)
  845. auxData = append(auxData, data)
  846. auxBytes += len(data)
  847. }
  848. }
  849. }
  850. proofs := nodes.NodeList()
  851. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  852. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  853. return p.SendHelperTrieProofs(req.ReqID, bv, HelperTrieResps{Proofs: proofs, AuxData: auxData})
  854. case HeaderProofsMsg:
  855. if pm.odr == nil {
  856. return errResp(ErrUnexpectedResponse, "")
  857. }
  858. p.Log().Trace("Received headers proof response")
  859. var resp struct {
  860. ReqID, BV uint64
  861. Data []ChtResp
  862. }
  863. if err := msg.Decode(&resp); err != nil {
  864. return errResp(ErrDecode, "msg %v: %v", msg, err)
  865. }
  866. p.fcServer.GotReply(resp.ReqID, resp.BV)
  867. deliverMsg = &Msg{
  868. MsgType: MsgHeaderProofs,
  869. ReqID: resp.ReqID,
  870. Obj: resp.Data,
  871. }
  872. case HelperTrieProofsMsg:
  873. if pm.odr == nil {
  874. return errResp(ErrUnexpectedResponse, "")
  875. }
  876. p.Log().Trace("Received helper trie proof response")
  877. var resp struct {
  878. ReqID, BV uint64
  879. Data HelperTrieResps
  880. }
  881. if err := msg.Decode(&resp); err != nil {
  882. return errResp(ErrDecode, "msg %v: %v", msg, err)
  883. }
  884. p.fcServer.GotReply(resp.ReqID, resp.BV)
  885. deliverMsg = &Msg{
  886. MsgType: MsgHelperTrieProofs,
  887. ReqID: resp.ReqID,
  888. Obj: resp.Data,
  889. }
  890. case SendTxMsg:
  891. if pm.txpool == nil {
  892. return errResp(ErrRequestRejected, "")
  893. }
  894. // Transactions arrived, parse all of them and deliver to the pool
  895. var txs []*types.Transaction
  896. if err := msg.Decode(&txs); err != nil {
  897. return errResp(ErrDecode, "msg %v: %v", msg, err)
  898. }
  899. reqCnt := len(txs)
  900. if reject(uint64(reqCnt), MaxTxSend) {
  901. return errResp(ErrRequestRejected, "")
  902. }
  903. pm.txpool.AddRemotes(txs)
  904. _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  905. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  906. case SendTxV2Msg:
  907. if pm.txpool == nil {
  908. return errResp(ErrRequestRejected, "")
  909. }
  910. // Transactions arrived, parse all of them and deliver to the pool
  911. var req struct {
  912. ReqID uint64
  913. Txs []*types.Transaction
  914. }
  915. if err := msg.Decode(&req); err != nil {
  916. return errResp(ErrDecode, "msg %v: %v", msg, err)
  917. }
  918. reqCnt := len(req.Txs)
  919. if reject(uint64(reqCnt), MaxTxSend) {
  920. return errResp(ErrRequestRejected, "")
  921. }
  922. hashes := make([]common.Hash, len(req.Txs))
  923. for i, tx := range req.Txs {
  924. hashes[i] = tx.Hash()
  925. }
  926. stats := pm.txStatus(hashes)
  927. for i, stat := range stats {
  928. if stat.Status == core.TxStatusUnknown {
  929. if errs := pm.txpool.AddRemotes([]*types.Transaction{req.Txs[i]}); errs[0] != nil {
  930. stats[i].Error = errs[0]
  931. continue
  932. }
  933. stats[i] = pm.txStatus([]common.Hash{hashes[i]})[0]
  934. }
  935. }
  936. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  937. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  938. return p.SendTxStatus(req.ReqID, bv, stats)
  939. case GetTxStatusMsg:
  940. if pm.txpool == nil {
  941. return errResp(ErrUnexpectedResponse, "")
  942. }
		// A transaction status query arrived; look up the status of each requested hash
  944. var req struct {
  945. ReqID uint64
  946. Hashes []common.Hash
  947. }
  948. if err := msg.Decode(&req); err != nil {
  949. return errResp(ErrDecode, "msg %v: %v", msg, err)
  950. }
  951. reqCnt := len(req.Hashes)
  952. if reject(uint64(reqCnt), MaxTxStatus) {
  953. return errResp(ErrRequestRejected, "")
  954. }
  955. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  956. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  957. return p.SendTxStatus(req.ReqID, bv, pm.txStatus(req.Hashes))
  958. case TxStatusMsg:
  959. if pm.odr == nil {
  960. return errResp(ErrUnexpectedResponse, "")
  961. }
  962. p.Log().Trace("Received tx status response")
  963. var resp struct {
  964. ReqID, BV uint64
  965. Status []core.TxStatus
  966. }
  967. if err := msg.Decode(&resp); err != nil {
  968. return errResp(ErrDecode, "msg %v: %v", msg, err)
  969. }
  970. p.fcServer.GotReply(resp.ReqID, resp.BV)
  971. default:
  972. p.Log().Trace("Received unknown message", "code", msg.Code)
  973. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  974. }
  975. if deliverMsg != nil {
  976. err := pm.retriever.deliver(p, deliverMsg)
  977. if err != nil {
  978. p.responseErrors++
  979. if p.responseErrors > maxResponseErrors {
  980. return err
  981. }
  982. }
  983. }
  984. return nil
  985. }
  986. // getHelperTrie returns the post-processed trie root for the given trie ID and section index
  987. func (pm *ProtocolManager) getHelperTrie(id uint, idx uint64) (common.Hash, string) {
  988. switch id {
  989. case htCanonical:
  990. sectionHead := core.GetCanonicalHash(pm.chainDb, (idx+1)*light.ChtFrequency-1)
  991. return light.GetChtV2Root(pm.chainDb, idx, sectionHead), light.ChtTablePrefix
  992. case htBloomBits:
  993. sectionHead := core.GetCanonicalHash(pm.chainDb, (idx+1)*light.BloomTrieFrequency-1)
  994. return light.GetBloomTrieRoot(pm.chainDb, idx, sectionHead), light.BloomTrieTablePrefix
  995. }
  996. return common.Hash{}, ""
  997. }
  998. // getHelperTrieAuxData returns requested auxiliary data for the given HelperTrie request
  999. func (pm *ProtocolManager) getHelperTrieAuxData(req HelperTrieReq) []byte {
  1000. if req.HelperTrieType == htCanonical && req.AuxReq == auxHeader {
  1001. if len(req.Key) != 8 {
  1002. return nil
  1003. }
  1004. blockNum := binary.BigEndian.Uint64(req.Key)
  1005. hash := core.GetCanonicalHash(pm.chainDb, blockNum)
  1006. return core.GetHeaderRLP(pm.chainDb, hash, blockNum)
  1007. }
  1008. return nil
  1009. }
  1010. func (pm *ProtocolManager) txStatus(hashes []common.Hash) []txStatus {
  1011. stats := make([]txStatus, len(hashes))
  1012. for i, stat := range pm.txpool.Status(hashes) {
  1013. // Save the status we've got from the transaction pool
  1014. stats[i].Status = stat
  1015. // If the transaction is unknown to the pool, try looking it up locally
  1016. if stat == core.TxStatusUnknown {
  1017. if block, number, index := core.GetTxLookupEntry(pm.chainDb, hashes[i]); block != (common.Hash{}) {
  1018. stats[i].Status = core.TxStatusIncluded
  1019. stats[i].Lookup = &core.TxLookupEntry{BlockHash: block, BlockIndex: number, Index: index}
  1020. }
  1021. }
  1022. }
  1023. return stats
  1024. }
  1025. // NodeInfo represents a short summary of the Ethereum sub-protocol metadata
  1026. // known about the host peer.
  1027. type NodeInfo struct {
  1028. Network uint64 `json:"network"` // Ethereum network ID (1=Frontier, 2=Morden, Ropsten=3, Rinkeby=4)
  1029. Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain
  1030. Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block
  1031. Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules
  1032. Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block
  1033. }
  1034. // NodeInfo retrieves some protocol metadata about the running host node.
  1035. func (self *ProtocolManager) NodeInfo() *NodeInfo {
  1036. return &NodeInfo{
  1037. Network: self.networkId,
  1038. Difficulty: self.blockchain.GetTdByHash(self.blockchain.LastBlockHash()),
  1039. Genesis: self.blockchain.Genesis().Hash(),
  1040. Config: self.blockchain.Config(),
  1041. Head: self.blockchain.LastBlockHash(),
  1042. }
  1043. }
  1044. // downloaderPeerNotify implements peerSetNotify
  1045. type downloaderPeerNotify ProtocolManager
  1046. type peerConnection struct {
  1047. manager *ProtocolManager
  1048. peer *peer
  1049. }
  1050. func (pc *peerConnection) Head() (common.Hash, *big.Int) {
  1051. return pc.peer.HeadAndTd()
  1052. }
  1053. func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
  1054. reqID := genReqID()
  1055. rq := &distReq{
  1056. getCost: func(dp distPeer) uint64 {
  1057. peer := dp.(*peer)
  1058. return peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1059. },
  1060. canSend: func(dp distPeer) bool {
  1061. return dp.(*peer) == pc.peer
  1062. },
  1063. request: func(dp distPeer) func() {
  1064. peer := dp.(*peer)
  1065. cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1066. peer.fcServer.QueueRequest(reqID, cost)
  1067. return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
  1068. },
  1069. }
  1070. _, ok := <-pc.manager.reqDist.queue(rq)
  1071. if !ok {
  1072. return ErrNoPeers
  1073. }
  1074. return nil
  1075. }
  1076. func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
  1077. reqID := genReqID()
  1078. rq := &distReq{
  1079. getCost: func(dp distPeer) uint64 {
  1080. peer := dp.(*peer)
  1081. return peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1082. },
  1083. canSend: func(dp distPeer) bool {
  1084. return dp.(*peer) == pc.peer
  1085. },
  1086. request: func(dp distPeer) func() {
  1087. peer := dp.(*peer)
  1088. cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1089. peer.fcServer.QueueRequest(reqID, cost)
  1090. return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
  1091. },
  1092. }
  1093. _, ok := <-pc.manager.reqDist.queue(rq)
  1094. if !ok {
  1095. return ErrNoPeers
  1096. }
  1097. return nil
  1098. }
  1099. func (d *downloaderPeerNotify) registerPeer(p *peer) {
  1100. pm := (*ProtocolManager)(d)
  1101. pc := &peerConnection{
  1102. manager: pm,
  1103. peer: p,
  1104. }
  1105. pm.downloader.RegisterLightPeer(p.id, ethVersion, pc)
  1106. }
  1107. func (d *downloaderPeerNotify) unregisterPeer(p *peer) {
  1108. pm := (*ProtocolManager)(d)
  1109. pm.downloader.UnregisterPeer(p.id)
  1110. }