handler.go 37 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "encoding/binary"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "net"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/consensus"
  28. "github.com/ethereum/go-ethereum/core"
  29. "github.com/ethereum/go-ethereum/core/state"
  30. "github.com/ethereum/go-ethereum/core/types"
  31. "github.com/ethereum/go-ethereum/eth/downloader"
  32. "github.com/ethereum/go-ethereum/ethdb"
  33. "github.com/ethereum/go-ethereum/event"
  34. "github.com/ethereum/go-ethereum/light"
  35. "github.com/ethereum/go-ethereum/log"
  36. "github.com/ethereum/go-ethereum/p2p"
  37. "github.com/ethereum/go-ethereum/p2p/discover"
  38. "github.com/ethereum/go-ethereum/p2p/discv5"
  39. "github.com/ethereum/go-ethereum/params"
  40. "github.com/ethereum/go-ethereum/rlp"
  41. "github.com/ethereum/go-ethereum/trie"
  42. )
  43. const (
  44. softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
  45. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
  46. ethVersion = 63 // equivalent eth version for the downloader
  47. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  48. MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
  49. MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
  50. MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
  51. MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  52. MaxHelperTrieProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  53. MaxTxSend = 64 // Amount of transactions to be send per request
  54. MaxTxStatus = 256 // Amount of transactions to queried per request
  55. disableClientRemovePeer = false
  56. )
  57. // errIncompatibleConfig is returned if the requested protocols and configs are
  58. // not compatible (low protocol version restrictions and high requirements).
  59. var errIncompatibleConfig = errors.New("incompatible configuration")
  60. func errResp(code errCode, format string, v ...interface{}) error {
  61. return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
  62. }
  63. type BlockChain interface {
  64. Config() *params.ChainConfig
  65. HasHeader(hash common.Hash, number uint64) bool
  66. GetHeader(hash common.Hash, number uint64) *types.Header
  67. GetHeaderByHash(hash common.Hash) *types.Header
  68. CurrentHeader() *types.Header
  69. GetTd(hash common.Hash, number uint64) *big.Int
  70. State() (*state.StateDB, error)
  71. InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error)
  72. Rollback(chain []common.Hash)
  73. GetHeaderByNumber(number uint64) *types.Header
  74. GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash
  75. Genesis() *types.Block
  76. SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
  77. }
  78. type txPool interface {
  79. AddRemotes(txs []*types.Transaction) []error
  80. Status(hashes []common.Hash) []core.TxStatus
  81. }
  82. type ProtocolManager struct {
  83. lightSync bool
  84. txpool txPool
  85. txrelay *LesTxRelay
  86. networkId uint64
  87. chainConfig *params.ChainConfig
  88. blockchain BlockChain
  89. chainDb ethdb.Database
  90. odr *LesOdr
  91. server *LesServer
  92. serverPool *serverPool
  93. lesTopic discv5.Topic
  94. reqDist *requestDistributor
  95. retriever *retrieveManager
  96. downloader *downloader.Downloader
  97. fetcher *lightFetcher
  98. peers *peerSet
  99. maxPeers int
  100. SubProtocols []p2p.Protocol
  101. eventMux *event.TypeMux
  102. // channels for fetcher, syncer, txsyncLoop
  103. newPeerCh chan *peer
  104. quitSync chan struct{}
  105. noMorePeers chan struct{}
  106. // wait group is used for graceful shutdowns during downloading
  107. // and processing
  108. wg *sync.WaitGroup
  109. }
  110. // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
  111. // with the ethereum network.
  112. func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protocolVersions []uint, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
  113. // Create the protocol manager with the base fields
  114. manager := &ProtocolManager{
  115. lightSync: lightSync,
  116. eventMux: mux,
  117. blockchain: blockchain,
  118. chainConfig: chainConfig,
  119. chainDb: chainDb,
  120. odr: odr,
  121. networkId: networkId,
  122. txpool: txpool,
  123. txrelay: txrelay,
  124. peers: peers,
  125. newPeerCh: make(chan *peer),
  126. quitSync: quitSync,
  127. wg: wg,
  128. noMorePeers: make(chan struct{}),
  129. }
  130. if odr != nil {
  131. manager.retriever = odr.retriever
  132. manager.reqDist = odr.retriever.dist
  133. }
  134. // Initiate a sub-protocol for every implemented version we can handle
  135. manager.SubProtocols = make([]p2p.Protocol, 0, len(protocolVersions))
  136. for _, version := range protocolVersions {
  137. // Compatible, initialize the sub-protocol
  138. version := version // Closure for the run
  139. manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
  140. Name: "les",
  141. Version: version,
  142. Length: ProtocolLengths[version],
  143. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  144. var entry *poolEntry
  145. peer := manager.newPeer(int(version), networkId, p, rw)
  146. if manager.serverPool != nil {
  147. addr := p.RemoteAddr().(*net.TCPAddr)
  148. entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
  149. }
  150. peer.poolEntry = entry
  151. select {
  152. case manager.newPeerCh <- peer:
  153. manager.wg.Add(1)
  154. defer manager.wg.Done()
  155. err := manager.handle(peer)
  156. if entry != nil {
  157. manager.serverPool.disconnect(entry)
  158. }
  159. return err
  160. case <-manager.quitSync:
  161. if entry != nil {
  162. manager.serverPool.disconnect(entry)
  163. }
  164. return p2p.DiscQuitting
  165. }
  166. },
  167. NodeInfo: func() interface{} {
  168. return manager.NodeInfo()
  169. },
  170. PeerInfo: func(id discover.NodeID) interface{} {
  171. if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
  172. return p.Info()
  173. }
  174. return nil
  175. },
  176. })
  177. }
  178. if len(manager.SubProtocols) == 0 {
  179. return nil, errIncompatibleConfig
  180. }
  181. removePeer := manager.removePeer
  182. if disableClientRemovePeer {
  183. removePeer = func(id string) {}
  184. }
  185. if lightSync {
  186. manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer)
  187. manager.peers.notify((*downloaderPeerNotify)(manager))
  188. manager.fetcher = newLightFetcher(manager)
  189. }
  190. return manager, nil
  191. }
  192. // removePeer initiates disconnection from a peer by removing it from the peer set
  193. func (pm *ProtocolManager) removePeer(id string) {
  194. pm.peers.Unregister(id)
  195. }
  196. func (pm *ProtocolManager) Start(maxPeers int) {
  197. pm.maxPeers = maxPeers
  198. if pm.lightSync {
  199. go pm.syncer()
  200. } else {
  201. go func() {
  202. for range pm.newPeerCh {
  203. }
  204. }()
  205. }
  206. }
  207. func (pm *ProtocolManager) Stop() {
  208. // Showing a log message. During download / process this could actually
  209. // take between 5 to 10 seconds and therefor feedback is required.
  210. log.Info("Stopping light Ethereum protocol")
  211. // Quit the sync loop.
  212. // After this send has completed, no new peers will be accepted.
  213. pm.noMorePeers <- struct{}{}
  214. close(pm.quitSync) // quits syncer, fetcher
  215. // Disconnect existing sessions.
  216. // This also closes the gate for any new registrations on the peer set.
  217. // sessions which are already established but not added to pm.peers yet
  218. // will exit when they try to register.
  219. pm.peers.Close()
  220. // Wait for any process action
  221. pm.wg.Wait()
  222. log.Info("Light Ethereum protocol stopped")
  223. }
  224. func (pm *ProtocolManager) newPeer(pv int, nv uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  225. return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
  226. }
  227. // handle is the callback invoked to manage the life cycle of a les peer. When
  228. // this function terminates, the peer is disconnected.
  229. func (pm *ProtocolManager) handle(p *peer) error {
  230. if pm.peers.Len() >= pm.maxPeers {
  231. return p2p.DiscTooManyPeers
  232. }
  233. p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
  234. // Execute the LES handshake
  235. var (
  236. genesis = pm.blockchain.Genesis()
  237. head = pm.blockchain.CurrentHeader()
  238. hash = head.Hash()
  239. number = head.Number.Uint64()
  240. td = pm.blockchain.GetTd(hash, number)
  241. )
  242. if err := p.Handshake(td, hash, number, genesis.Hash(), pm.server); err != nil {
  243. p.Log().Debug("Light Ethereum handshake failed", "err", err)
  244. return err
  245. }
  246. if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
  247. rw.Init(p.version)
  248. }
  249. // Register the peer locally
  250. if err := pm.peers.Register(p); err != nil {
  251. p.Log().Error("Light Ethereum peer registration failed", "err", err)
  252. return err
  253. }
  254. defer func() {
  255. if pm.server != nil && pm.server.fcManager != nil && p.fcClient != nil {
  256. p.fcClient.Remove(pm.server.fcManager)
  257. }
  258. pm.removePeer(p.id)
  259. }()
  260. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  261. if pm.lightSync {
  262. p.lock.Lock()
  263. head := p.headInfo
  264. p.lock.Unlock()
  265. if pm.fetcher != nil {
  266. pm.fetcher.announce(p, head)
  267. }
  268. if p.poolEntry != nil {
  269. pm.serverPool.registered(p.poolEntry)
  270. }
  271. }
  272. stop := make(chan struct{})
  273. defer close(stop)
  274. go func() {
  275. // new block announce loop
  276. for {
  277. select {
  278. case announce := <-p.announceChn:
  279. p.SendAnnounce(announce)
  280. case <-stop:
  281. return
  282. }
  283. }
  284. }()
  285. // main loop. handle incoming messages.
  286. for {
  287. if err := pm.handleMsg(p); err != nil {
  288. p.Log().Debug("Light Ethereum message handling failed", "err", err)
  289. return err
  290. }
  291. }
  292. }
  293. var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, SendTxV2Msg, GetTxStatusMsg, GetHeaderProofsMsg, GetProofsV2Msg, GetHelperTrieProofsMsg}
  294. // handleMsg is invoked whenever an inbound message is received from a remote
  295. // peer. The remote connection is torn down upon returning any error.
  296. func (pm *ProtocolManager) handleMsg(p *peer) error {
  297. // Read the next message from the remote peer, and ensure it's fully consumed
  298. msg, err := p.rw.ReadMsg()
  299. if err != nil {
  300. return err
  301. }
  302. p.Log().Trace("Light Ethereum message arrived", "code", msg.Code, "bytes", msg.Size)
  303. costs := p.fcCosts[msg.Code]
  304. reject := func(reqCnt, maxCnt uint64) bool {
  305. if p.fcClient == nil || reqCnt > maxCnt {
  306. return true
  307. }
  308. bufValue, _ := p.fcClient.AcceptRequest()
  309. cost := costs.baseCost + reqCnt*costs.reqCost
  310. if cost > pm.server.defParams.BufLimit {
  311. cost = pm.server.defParams.BufLimit
  312. }
  313. if cost > bufValue {
  314. recharge := time.Duration((cost - bufValue) * 1000000 / pm.server.defParams.MinRecharge)
  315. p.Log().Error("Request came too early", "recharge", common.PrettyDuration(recharge))
  316. return true
  317. }
  318. return false
  319. }
  320. if msg.Size > ProtocolMaxMsgSize {
  321. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  322. }
  323. defer msg.Discard()
  324. var deliverMsg *Msg
  325. // Handle the message depending on its contents
  326. switch msg.Code {
  327. case StatusMsg:
  328. p.Log().Trace("Received status message")
  329. // Status messages should never arrive after the handshake
  330. return errResp(ErrExtraStatusMsg, "uncontrolled status message")
  331. // Block header query, collect the requested headers and reply
  332. case AnnounceMsg:
  333. p.Log().Trace("Received announce message")
  334. if p.requestAnnounceType == announceTypeNone {
  335. return errResp(ErrUnexpectedResponse, "")
  336. }
  337. var req announceData
  338. if err := msg.Decode(&req); err != nil {
  339. return errResp(ErrDecode, "%v: %v", msg, err)
  340. }
  341. if p.requestAnnounceType == announceTypeSigned {
  342. if err := req.checkSignature(p.pubKey); err != nil {
  343. p.Log().Trace("Invalid announcement signature", "err", err)
  344. return err
  345. }
  346. p.Log().Trace("Valid announcement signature")
  347. }
  348. p.Log().Trace("Announce message content", "number", req.Number, "hash", req.Hash, "td", req.Td, "reorg", req.ReorgDepth)
  349. if pm.fetcher != nil {
  350. pm.fetcher.announce(p, &req)
  351. }
  352. case GetBlockHeadersMsg:
  353. p.Log().Trace("Received block header request")
  354. // Decode the complex header query
  355. var req struct {
  356. ReqID uint64
  357. Query getBlockHeadersData
  358. }
  359. if err := msg.Decode(&req); err != nil {
  360. return errResp(ErrDecode, "%v: %v", msg, err)
  361. }
  362. query := req.Query
  363. if reject(query.Amount, MaxHeaderFetch) {
  364. return errResp(ErrRequestRejected, "")
  365. }
  366. hashMode := query.Origin.Hash != (common.Hash{})
  367. // Gather headers until the fetch or network limits is reached
  368. var (
  369. bytes common.StorageSize
  370. headers []*types.Header
  371. unknown bool
  372. )
  373. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  374. // Retrieve the next header satisfying the query
  375. var origin *types.Header
  376. if hashMode {
  377. origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
  378. } else {
  379. origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
  380. }
  381. if origin == nil {
  382. break
  383. }
  384. number := origin.Number.Uint64()
  385. headers = append(headers, origin)
  386. bytes += estHeaderRlpSize
  387. // Advance to the next header of the query
  388. switch {
  389. case query.Origin.Hash != (common.Hash{}) && query.Reverse:
  390. // Hash based traversal towards the genesis block
  391. for i := 0; i < int(query.Skip)+1; i++ {
  392. if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {
  393. query.Origin.Hash = header.ParentHash
  394. number--
  395. } else {
  396. unknown = true
  397. break
  398. }
  399. }
  400. case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
  401. // Hash based traversal towards the leaf block
  402. if header := pm.blockchain.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil {
  403. if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
  404. query.Origin.Hash = header.Hash()
  405. } else {
  406. unknown = true
  407. }
  408. } else {
  409. unknown = true
  410. }
  411. case query.Reverse:
  412. // Number based traversal towards the genesis block
  413. if query.Origin.Number >= query.Skip+1 {
  414. query.Origin.Number -= query.Skip + 1
  415. } else {
  416. unknown = true
  417. }
  418. case !query.Reverse:
  419. // Number based traversal towards the leaf block
  420. query.Origin.Number += query.Skip + 1
  421. }
  422. }
  423. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
  424. pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
  425. return p.SendBlockHeaders(req.ReqID, bv, headers)
  426. case BlockHeadersMsg:
  427. if pm.downloader == nil {
  428. return errResp(ErrUnexpectedResponse, "")
  429. }
  430. p.Log().Trace("Received block header response message")
  431. // A batch of headers arrived to one of our previous requests
  432. var resp struct {
  433. ReqID, BV uint64
  434. Headers []*types.Header
  435. }
  436. if err := msg.Decode(&resp); err != nil {
  437. return errResp(ErrDecode, "msg %v: %v", msg, err)
  438. }
  439. p.fcServer.GotReply(resp.ReqID, resp.BV)
  440. if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
  441. pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
  442. } else {
  443. err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
  444. if err != nil {
  445. log.Debug(fmt.Sprint(err))
  446. }
  447. }
  448. case GetBlockBodiesMsg:
  449. p.Log().Trace("Received block bodies request")
  450. // Decode the retrieval message
  451. var req struct {
  452. ReqID uint64
  453. Hashes []common.Hash
  454. }
  455. if err := msg.Decode(&req); err != nil {
  456. return errResp(ErrDecode, "msg %v: %v", msg, err)
  457. }
  458. // Gather blocks until the fetch or network limits is reached
  459. var (
  460. bytes int
  461. bodies []rlp.RawValue
  462. )
  463. reqCnt := len(req.Hashes)
  464. if reject(uint64(reqCnt), MaxBodyFetch) {
  465. return errResp(ErrRequestRejected, "")
  466. }
  467. for _, hash := range req.Hashes {
  468. if bytes >= softResponseLimit {
  469. break
  470. }
  471. // Retrieve the requested block body, stopping if enough was found
  472. if data := core.GetBodyRLP(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash)); len(data) != 0 {
  473. bodies = append(bodies, data)
  474. bytes += len(data)
  475. }
  476. }
  477. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  478. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  479. return p.SendBlockBodiesRLP(req.ReqID, bv, bodies)
  480. case BlockBodiesMsg:
  481. if pm.odr == nil {
  482. return errResp(ErrUnexpectedResponse, "")
  483. }
  484. p.Log().Trace("Received block bodies response")
  485. // A batch of block bodies arrived to one of our previous requests
  486. var resp struct {
  487. ReqID, BV uint64
  488. Data []*types.Body
  489. }
  490. if err := msg.Decode(&resp); err != nil {
  491. return errResp(ErrDecode, "msg %v: %v", msg, err)
  492. }
  493. p.fcServer.GotReply(resp.ReqID, resp.BV)
  494. deliverMsg = &Msg{
  495. MsgType: MsgBlockBodies,
  496. ReqID: resp.ReqID,
  497. Obj: resp.Data,
  498. }
  499. case GetCodeMsg:
  500. p.Log().Trace("Received code request")
  501. // Decode the retrieval message
  502. var req struct {
  503. ReqID uint64
  504. Reqs []CodeReq
  505. }
  506. if err := msg.Decode(&req); err != nil {
  507. return errResp(ErrDecode, "msg %v: %v", msg, err)
  508. }
  509. // Gather state data until the fetch or network limits is reached
  510. var (
  511. bytes int
  512. data [][]byte
  513. )
  514. reqCnt := len(req.Reqs)
  515. if reject(uint64(reqCnt), MaxCodeFetch) {
  516. return errResp(ErrRequestRejected, "")
  517. }
  518. for _, req := range req.Reqs {
  519. // Retrieve the requested state entry, stopping if enough was found
  520. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  521. statedb, err := pm.blockchain.State()
  522. if err != nil {
  523. continue
  524. }
  525. account, err := pm.getAccount(statedb, header.Root, common.BytesToHash(req.AccKey))
  526. if err != nil {
  527. continue
  528. }
  529. code, _ := statedb.Database().TrieDB().Node(common.BytesToHash(account.CodeHash))
  530. data = append(data, code)
  531. if bytes += len(code); bytes >= softResponseLimit {
  532. break
  533. }
  534. }
  535. }
  536. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  537. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  538. return p.SendCode(req.ReqID, bv, data)
  539. case CodeMsg:
  540. if pm.odr == nil {
  541. return errResp(ErrUnexpectedResponse, "")
  542. }
  543. p.Log().Trace("Received code response")
  544. // A batch of node state data arrived to one of our previous requests
  545. var resp struct {
  546. ReqID, BV uint64
  547. Data [][]byte
  548. }
  549. if err := msg.Decode(&resp); err != nil {
  550. return errResp(ErrDecode, "msg %v: %v", msg, err)
  551. }
  552. p.fcServer.GotReply(resp.ReqID, resp.BV)
  553. deliverMsg = &Msg{
  554. MsgType: MsgCode,
  555. ReqID: resp.ReqID,
  556. Obj: resp.Data,
  557. }
  558. case GetReceiptsMsg:
  559. p.Log().Trace("Received receipts request")
  560. // Decode the retrieval message
  561. var req struct {
  562. ReqID uint64
  563. Hashes []common.Hash
  564. }
  565. if err := msg.Decode(&req); err != nil {
  566. return errResp(ErrDecode, "msg %v: %v", msg, err)
  567. }
  568. // Gather state data until the fetch or network limits is reached
  569. var (
  570. bytes int
  571. receipts []rlp.RawValue
  572. )
  573. reqCnt := len(req.Hashes)
  574. if reject(uint64(reqCnt), MaxReceiptFetch) {
  575. return errResp(ErrRequestRejected, "")
  576. }
  577. for _, hash := range req.Hashes {
  578. if bytes >= softResponseLimit {
  579. break
  580. }
  581. // Retrieve the requested block's receipts, skipping if unknown to us
  582. results := core.GetBlockReceipts(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash))
  583. if results == nil {
  584. if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  585. continue
  586. }
  587. }
  588. // If known, encode and queue for response packet
  589. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  590. log.Error("Failed to encode receipt", "err", err)
  591. } else {
  592. receipts = append(receipts, encoded)
  593. bytes += len(encoded)
  594. }
  595. }
  596. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  597. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  598. return p.SendReceiptsRLP(req.ReqID, bv, receipts)
  599. case ReceiptsMsg:
  600. if pm.odr == nil {
  601. return errResp(ErrUnexpectedResponse, "")
  602. }
  603. p.Log().Trace("Received receipts response")
  604. // A batch of receipts arrived to one of our previous requests
  605. var resp struct {
  606. ReqID, BV uint64
  607. Receipts []types.Receipts
  608. }
  609. if err := msg.Decode(&resp); err != nil {
  610. return errResp(ErrDecode, "msg %v: %v", msg, err)
  611. }
  612. p.fcServer.GotReply(resp.ReqID, resp.BV)
  613. deliverMsg = &Msg{
  614. MsgType: MsgReceipts,
  615. ReqID: resp.ReqID,
  616. Obj: resp.Receipts,
  617. }
  618. case GetProofsV1Msg:
  619. p.Log().Trace("Received proofs request")
  620. // Decode the retrieval message
  621. var req struct {
  622. ReqID uint64
  623. Reqs []ProofReq
  624. }
  625. if err := msg.Decode(&req); err != nil {
  626. return errResp(ErrDecode, "msg %v: %v", msg, err)
  627. }
  628. // Gather state data until the fetch or network limits is reached
  629. var (
  630. bytes int
  631. proofs proofsData
  632. )
  633. reqCnt := len(req.Reqs)
  634. if reject(uint64(reqCnt), MaxProofsFetch) {
  635. return errResp(ErrRequestRejected, "")
  636. }
  637. for _, req := range req.Reqs {
  638. // Retrieve the requested state entry, stopping if enough was found
  639. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  640. statedb, err := pm.blockchain.State()
  641. if err != nil {
  642. continue
  643. }
  644. var trie state.Trie
  645. if len(req.AccKey) > 0 {
  646. account, err := pm.getAccount(statedb, header.Root, common.BytesToHash(req.AccKey))
  647. if err != nil {
  648. continue
  649. }
  650. trie, _ = statedb.Database().OpenStorageTrie(common.BytesToHash(req.AccKey), account.Root)
  651. } else {
  652. trie, _ = statedb.Database().OpenTrie(header.Root)
  653. }
  654. if trie != nil {
  655. var proof light.NodeList
  656. trie.Prove(req.Key, 0, &proof)
  657. proofs = append(proofs, proof)
  658. if bytes += proof.DataSize(); bytes >= softResponseLimit {
  659. break
  660. }
  661. }
  662. }
  663. }
  664. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  665. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  666. return p.SendProofs(req.ReqID, bv, proofs)
  667. case GetProofsV2Msg:
  668. p.Log().Trace("Received les/2 proofs request")
  669. // Decode the retrieval message
  670. var req struct {
  671. ReqID uint64
  672. Reqs []ProofReq
  673. }
  674. if err := msg.Decode(&req); err != nil {
  675. return errResp(ErrDecode, "msg %v: %v", msg, err)
  676. }
  677. // Gather state data until the fetch or network limits is reached
  678. var (
  679. lastBHash common.Hash
  680. statedb *state.StateDB
  681. root common.Hash
  682. )
  683. reqCnt := len(req.Reqs)
  684. if reject(uint64(reqCnt), MaxProofsFetch) {
  685. return errResp(ErrRequestRejected, "")
  686. }
  687. nodes := light.NewNodeSet()
  688. for _, req := range req.Reqs {
  689. // Look up the state belonging to the request
  690. if statedb == nil || req.BHash != lastBHash {
  691. statedb, root, lastBHash = nil, common.Hash{}, req.BHash
  692. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  693. statedb, _ = pm.blockchain.State()
  694. root = header.Root
  695. }
  696. }
  697. if statedb == nil {
  698. continue
  699. }
  700. // Pull the account or storage trie of the request
  701. var trie state.Trie
  702. if len(req.AccKey) > 0 {
  703. account, err := pm.getAccount(statedb, root, common.BytesToHash(req.AccKey))
  704. if err != nil {
  705. continue
  706. }
  707. trie, _ = statedb.Database().OpenStorageTrie(common.BytesToHash(req.AccKey), account.Root)
  708. } else {
  709. trie, _ = statedb.Database().OpenTrie(root)
  710. }
  711. if trie == nil {
  712. continue
  713. }
  714. // Prove the user's request from the account or stroage trie
  715. trie.Prove(req.Key, req.FromLevel, nodes)
  716. if nodes.DataSize() >= softResponseLimit {
  717. break
  718. }
  719. }
  720. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  721. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  722. return p.SendProofsV2(req.ReqID, bv, nodes.NodeList())
  723. case ProofsV1Msg:
  724. if pm.odr == nil {
  725. return errResp(ErrUnexpectedResponse, "")
  726. }
  727. p.Log().Trace("Received proofs response")
  728. // A batch of merkle proofs arrived to one of our previous requests
  729. var resp struct {
  730. ReqID, BV uint64
  731. Data []light.NodeList
  732. }
  733. if err := msg.Decode(&resp); err != nil {
  734. return errResp(ErrDecode, "msg %v: %v", msg, err)
  735. }
  736. p.fcServer.GotReply(resp.ReqID, resp.BV)
  737. deliverMsg = &Msg{
  738. MsgType: MsgProofsV1,
  739. ReqID: resp.ReqID,
  740. Obj: resp.Data,
  741. }
  742. case ProofsV2Msg:
  743. if pm.odr == nil {
  744. return errResp(ErrUnexpectedResponse, "")
  745. }
  746. p.Log().Trace("Received les/2 proofs response")
  747. // A batch of merkle proofs arrived to one of our previous requests
  748. var resp struct {
  749. ReqID, BV uint64
  750. Data light.NodeList
  751. }
  752. if err := msg.Decode(&resp); err != nil {
  753. return errResp(ErrDecode, "msg %v: %v", msg, err)
  754. }
  755. p.fcServer.GotReply(resp.ReqID, resp.BV)
  756. deliverMsg = &Msg{
  757. MsgType: MsgProofsV2,
  758. ReqID: resp.ReqID,
  759. Obj: resp.Data,
  760. }
  761. case GetHeaderProofsMsg:
  762. p.Log().Trace("Received headers proof request")
  763. // Decode the retrieval message
  764. var req struct {
  765. ReqID uint64
  766. Reqs []ChtReq
  767. }
  768. if err := msg.Decode(&req); err != nil {
  769. return errResp(ErrDecode, "msg %v: %v", msg, err)
  770. }
  771. // Gather state data until the fetch or network limits is reached
  772. var (
  773. bytes int
  774. proofs []ChtResp
  775. )
  776. reqCnt := len(req.Reqs)
  777. if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) {
  778. return errResp(ErrRequestRejected, "")
  779. }
  780. trieDb := trie.NewDatabase(ethdb.NewTable(pm.chainDb, light.ChtTablePrefix))
  781. for _, req := range req.Reqs {
  782. if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
  783. sectionHead := core.GetCanonicalHash(pm.chainDb, req.ChtNum*light.CHTFrequencyServer-1)
  784. if root := light.GetChtRoot(pm.chainDb, req.ChtNum-1, sectionHead); root != (common.Hash{}) {
  785. trie, err := trie.New(root, trieDb)
  786. if err != nil {
  787. continue
  788. }
  789. var encNumber [8]byte
  790. binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
  791. var proof light.NodeList
  792. trie.Prove(encNumber[:], 0, &proof)
  793. proofs = append(proofs, ChtResp{Header: header, Proof: proof})
  794. if bytes += proof.DataSize() + estHeaderRlpSize; bytes >= softResponseLimit {
  795. break
  796. }
  797. }
  798. }
  799. }
  800. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  801. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  802. return p.SendHeaderProofs(req.ReqID, bv, proofs)
  803. case GetHelperTrieProofsMsg:
  804. p.Log().Trace("Received helper trie proof request")
  805. // Decode the retrieval message
  806. var req struct {
  807. ReqID uint64
  808. Reqs []HelperTrieReq
  809. }
  810. if err := msg.Decode(&req); err != nil {
  811. return errResp(ErrDecode, "msg %v: %v", msg, err)
  812. }
  813. // Gather state data until the fetch or network limits is reached
  814. var (
  815. auxBytes int
  816. auxData [][]byte
  817. )
  818. reqCnt := len(req.Reqs)
  819. if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) {
  820. return errResp(ErrRequestRejected, "")
  821. }
  822. var (
  823. lastIdx uint64
  824. lastType uint
  825. root common.Hash
  826. auxTrie *trie.Trie
  827. )
  828. nodes := light.NewNodeSet()
  829. for _, req := range req.Reqs {
  830. if auxTrie == nil || req.Type != lastType || req.TrieIdx != lastIdx {
  831. auxTrie, lastType, lastIdx = nil, req.Type, req.TrieIdx
  832. var prefix string
  833. if root, prefix = pm.getHelperTrie(req.Type, req.TrieIdx); root != (common.Hash{}) {
  834. auxTrie, _ = trie.New(root, trie.NewDatabase(ethdb.NewTable(pm.chainDb, prefix)))
  835. }
  836. }
  837. if req.AuxReq == auxRoot {
  838. var data []byte
  839. if root != (common.Hash{}) {
  840. data = root[:]
  841. }
  842. auxData = append(auxData, data)
  843. auxBytes += len(data)
  844. } else {
  845. if auxTrie != nil {
  846. auxTrie.Prove(req.Key, req.FromLevel, nodes)
  847. }
  848. if req.AuxReq != 0 {
  849. data := pm.getHelperTrieAuxData(req)
  850. auxData = append(auxData, data)
  851. auxBytes += len(data)
  852. }
  853. }
  854. if nodes.DataSize()+auxBytes >= softResponseLimit {
  855. break
  856. }
  857. }
  858. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  859. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  860. return p.SendHelperTrieProofs(req.ReqID, bv, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData})
  861. case HeaderProofsMsg:
  862. if pm.odr == nil {
  863. return errResp(ErrUnexpectedResponse, "")
  864. }
  865. p.Log().Trace("Received headers proof response")
  866. var resp struct {
  867. ReqID, BV uint64
  868. Data []ChtResp
  869. }
  870. if err := msg.Decode(&resp); err != nil {
  871. return errResp(ErrDecode, "msg %v: %v", msg, err)
  872. }
  873. p.fcServer.GotReply(resp.ReqID, resp.BV)
  874. deliverMsg = &Msg{
  875. MsgType: MsgHeaderProofs,
  876. ReqID: resp.ReqID,
  877. Obj: resp.Data,
  878. }
  879. case HelperTrieProofsMsg:
  880. if pm.odr == nil {
  881. return errResp(ErrUnexpectedResponse, "")
  882. }
  883. p.Log().Trace("Received helper trie proof response")
  884. var resp struct {
  885. ReqID, BV uint64
  886. Data HelperTrieResps
  887. }
  888. if err := msg.Decode(&resp); err != nil {
  889. return errResp(ErrDecode, "msg %v: %v", msg, err)
  890. }
  891. p.fcServer.GotReply(resp.ReqID, resp.BV)
  892. deliverMsg = &Msg{
  893. MsgType: MsgHelperTrieProofs,
  894. ReqID: resp.ReqID,
  895. Obj: resp.Data,
  896. }
  897. case SendTxMsg:
  898. if pm.txpool == nil {
  899. return errResp(ErrRequestRejected, "")
  900. }
  901. // Transactions arrived, parse all of them and deliver to the pool
  902. var txs []*types.Transaction
  903. if err := msg.Decode(&txs); err != nil {
  904. return errResp(ErrDecode, "msg %v: %v", msg, err)
  905. }
  906. reqCnt := len(txs)
  907. if reject(uint64(reqCnt), MaxTxSend) {
  908. return errResp(ErrRequestRejected, "")
  909. }
  910. pm.txpool.AddRemotes(txs)
  911. _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  912. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  913. case SendTxV2Msg:
  914. if pm.txpool == nil {
  915. return errResp(ErrRequestRejected, "")
  916. }
  917. // Transactions arrived, parse all of them and deliver to the pool
  918. var req struct {
  919. ReqID uint64
  920. Txs []*types.Transaction
  921. }
  922. if err := msg.Decode(&req); err != nil {
  923. return errResp(ErrDecode, "msg %v: %v", msg, err)
  924. }
  925. reqCnt := len(req.Txs)
  926. if reject(uint64(reqCnt), MaxTxSend) {
  927. return errResp(ErrRequestRejected, "")
  928. }
  929. hashes := make([]common.Hash, len(req.Txs))
  930. for i, tx := range req.Txs {
  931. hashes[i] = tx.Hash()
  932. }
  933. stats := pm.txStatus(hashes)
  934. for i, stat := range stats {
  935. if stat.Status == core.TxStatusUnknown {
  936. if errs := pm.txpool.AddRemotes([]*types.Transaction{req.Txs[i]}); errs[0] != nil {
  937. stats[i].Error = errs[0].Error()
  938. continue
  939. }
  940. stats[i] = pm.txStatus([]common.Hash{hashes[i]})[0]
  941. }
  942. }
  943. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  944. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  945. return p.SendTxStatus(req.ReqID, bv, stats)
  946. case GetTxStatusMsg:
  947. if pm.txpool == nil {
  948. return errResp(ErrUnexpectedResponse, "")
  949. }
  950. // Transactions arrived, parse all of them and deliver to the pool
  951. var req struct {
  952. ReqID uint64
  953. Hashes []common.Hash
  954. }
  955. if err := msg.Decode(&req); err != nil {
  956. return errResp(ErrDecode, "msg %v: %v", msg, err)
  957. }
  958. reqCnt := len(req.Hashes)
  959. if reject(uint64(reqCnt), MaxTxStatus) {
  960. return errResp(ErrRequestRejected, "")
  961. }
  962. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  963. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  964. return p.SendTxStatus(req.ReqID, bv, pm.txStatus(req.Hashes))
  965. case TxStatusMsg:
  966. if pm.odr == nil {
  967. return errResp(ErrUnexpectedResponse, "")
  968. }
  969. p.Log().Trace("Received tx status response")
  970. var resp struct {
  971. ReqID, BV uint64
  972. Status []txStatus
  973. }
  974. if err := msg.Decode(&resp); err != nil {
  975. return errResp(ErrDecode, "msg %v: %v", msg, err)
  976. }
  977. p.fcServer.GotReply(resp.ReqID, resp.BV)
  978. default:
  979. p.Log().Trace("Received unknown message", "code", msg.Code)
  980. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  981. }
  982. if deliverMsg != nil {
  983. err := pm.retriever.deliver(p, deliverMsg)
  984. if err != nil {
  985. p.responseErrors++
  986. if p.responseErrors > maxResponseErrors {
  987. return err
  988. }
  989. }
  990. }
  991. return nil
  992. }
  993. // getAccount retrieves an account from the state based at root.
  994. func (pm *ProtocolManager) getAccount(statedb *state.StateDB, root, hash common.Hash) (state.Account, error) {
  995. trie, err := trie.New(root, statedb.Database().TrieDB())
  996. if err != nil {
  997. return state.Account{}, err
  998. }
  999. blob, err := trie.TryGet(hash[:])
  1000. if err != nil {
  1001. return state.Account{}, err
  1002. }
  1003. var account state.Account
  1004. if err = rlp.DecodeBytes(blob, &account); err != nil {
  1005. return state.Account{}, err
  1006. }
  1007. return account, nil
  1008. }
  1009. // getHelperTrie returns the post-processed trie root for the given trie ID and section index
  1010. func (pm *ProtocolManager) getHelperTrie(id uint, idx uint64) (common.Hash, string) {
  1011. switch id {
  1012. case htCanonical:
  1013. sectionHead := core.GetCanonicalHash(pm.chainDb, (idx+1)*light.CHTFrequencyClient-1)
  1014. return light.GetChtV2Root(pm.chainDb, idx, sectionHead), light.ChtTablePrefix
  1015. case htBloomBits:
  1016. sectionHead := core.GetCanonicalHash(pm.chainDb, (idx+1)*light.BloomTrieFrequency-1)
  1017. return light.GetBloomTrieRoot(pm.chainDb, idx, sectionHead), light.BloomTrieTablePrefix
  1018. }
  1019. return common.Hash{}, ""
  1020. }
  1021. // getHelperTrieAuxData returns requested auxiliary data for the given HelperTrie request
  1022. func (pm *ProtocolManager) getHelperTrieAuxData(req HelperTrieReq) []byte {
  1023. switch {
  1024. case req.Type == htCanonical && req.AuxReq == auxHeader && len(req.Key) == 8:
  1025. blockNum := binary.BigEndian.Uint64(req.Key)
  1026. hash := core.GetCanonicalHash(pm.chainDb, blockNum)
  1027. return core.GetHeaderRLP(pm.chainDb, hash, blockNum)
  1028. }
  1029. return nil
  1030. }
  1031. func (pm *ProtocolManager) txStatus(hashes []common.Hash) []txStatus {
  1032. stats := make([]txStatus, len(hashes))
  1033. for i, stat := range pm.txpool.Status(hashes) {
  1034. // Save the status we've got from the transaction pool
  1035. stats[i].Status = stat
  1036. // If the transaction is unknown to the pool, try looking it up locally
  1037. if stat == core.TxStatusUnknown {
  1038. if block, number, index := core.GetTxLookupEntry(pm.chainDb, hashes[i]); block != (common.Hash{}) {
  1039. stats[i].Status = core.TxStatusIncluded
  1040. stats[i].Lookup = &core.TxLookupEntry{BlockHash: block, BlockIndex: number, Index: index}
  1041. }
  1042. }
  1043. }
  1044. return stats
  1045. }
  1046. // NodeInfo represents a short summary of the Ethereum sub-protocol metadata
  1047. // known about the host peer.
  1048. type NodeInfo struct {
  1049. Network uint64 `json:"network"` // Ethereum network ID (1=Frontier, 2=Morden, Ropsten=3, Rinkeby=4)
  1050. Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain
  1051. Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block
  1052. Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules
  1053. Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block
  1054. }
  1055. // NodeInfo retrieves some protocol metadata about the running host node.
  1056. func (self *ProtocolManager) NodeInfo() *NodeInfo {
  1057. head := self.blockchain.CurrentHeader()
  1058. hash := head.Hash()
  1059. return &NodeInfo{
  1060. Network: self.networkId,
  1061. Difficulty: self.blockchain.GetTd(hash, head.Number.Uint64()),
  1062. Genesis: self.blockchain.Genesis().Hash(),
  1063. Config: self.blockchain.Config(),
  1064. Head: hash,
  1065. }
  1066. }
  1067. // downloaderPeerNotify implements peerSetNotify
  1068. type downloaderPeerNotify ProtocolManager
  1069. type peerConnection struct {
  1070. manager *ProtocolManager
  1071. peer *peer
  1072. }
  1073. func (pc *peerConnection) Head() (common.Hash, *big.Int) {
  1074. return pc.peer.HeadAndTd()
  1075. }
  1076. func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
  1077. reqID := genReqID()
  1078. rq := &distReq{
  1079. getCost: func(dp distPeer) uint64 {
  1080. peer := dp.(*peer)
  1081. return peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1082. },
  1083. canSend: func(dp distPeer) bool {
  1084. return dp.(*peer) == pc.peer
  1085. },
  1086. request: func(dp distPeer) func() {
  1087. peer := dp.(*peer)
  1088. cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1089. peer.fcServer.QueueRequest(reqID, cost)
  1090. return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
  1091. },
  1092. }
  1093. _, ok := <-pc.manager.reqDist.queue(rq)
  1094. if !ok {
  1095. return ErrNoPeers
  1096. }
  1097. return nil
  1098. }
  1099. func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
  1100. reqID := genReqID()
  1101. rq := &distReq{
  1102. getCost: func(dp distPeer) uint64 {
  1103. peer := dp.(*peer)
  1104. return peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1105. },
  1106. canSend: func(dp distPeer) bool {
  1107. return dp.(*peer) == pc.peer
  1108. },
  1109. request: func(dp distPeer) func() {
  1110. peer := dp.(*peer)
  1111. cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
  1112. peer.fcServer.QueueRequest(reqID, cost)
  1113. return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
  1114. },
  1115. }
  1116. _, ok := <-pc.manager.reqDist.queue(rq)
  1117. if !ok {
  1118. return ErrNoPeers
  1119. }
  1120. return nil
  1121. }
  1122. func (d *downloaderPeerNotify) registerPeer(p *peer) {
  1123. pm := (*ProtocolManager)(d)
  1124. pc := &peerConnection{
  1125. manager: pm,
  1126. peer: p,
  1127. }
  1128. pm.downloader.RegisterLightPeer(p.id, ethVersion, pc)
  1129. }
  1130. func (d *downloaderPeerNotify) unregisterPeer(p *peer) {
  1131. pm := (*ProtocolManager)(d)
  1132. pm.downloader.UnregisterPeer(p.id)
  1133. }