handler.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "encoding/binary"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "net"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/core"
  28. "github.com/ethereum/go-ethereum/core/state"
  29. "github.com/ethereum/go-ethereum/core/types"
  30. "github.com/ethereum/go-ethereum/eth"
  31. "github.com/ethereum/go-ethereum/eth/downloader"
  32. "github.com/ethereum/go-ethereum/ethdb"
  33. "github.com/ethereum/go-ethereum/event"
  34. "github.com/ethereum/go-ethereum/log"
  35. "github.com/ethereum/go-ethereum/p2p"
  36. "github.com/ethereum/go-ethereum/p2p/discover"
  37. "github.com/ethereum/go-ethereum/p2p/discv5"
  38. "github.com/ethereum/go-ethereum/params"
  39. "github.com/ethereum/go-ethereum/pow"
  40. "github.com/ethereum/go-ethereum/rlp"
  41. "github.com/ethereum/go-ethereum/trie"
  42. )
  43. const (
  44. softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
  45. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
  46. ethVersion = 63 // equivalent eth version for the downloader
  47. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  48. MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
  49. MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
  50. MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
  51. MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  52. MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  53. MaxTxSend = 64 // Amount of transactions to be send per request
  54. disableClientRemovePeer = false
  55. )
  56. // errIncompatibleConfig is returned if the requested protocols and configs are
  57. // not compatible (low protocol version restrictions and high requirements).
  58. var errIncompatibleConfig = errors.New("incompatible configuration")
  59. func errResp(code errCode, format string, v ...interface{}) error {
  60. return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
  61. }
  62. type hashFetcherFn func(common.Hash) error
  63. type BlockChain interface {
  64. HasHeader(hash common.Hash) bool
  65. GetHeader(hash common.Hash, number uint64) *types.Header
  66. GetHeaderByHash(hash common.Hash) *types.Header
  67. CurrentHeader() *types.Header
  68. GetTdByHash(hash common.Hash) *big.Int
  69. InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error)
  70. Rollback(chain []common.Hash)
  71. Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash)
  72. GetHeaderByNumber(number uint64) *types.Header
  73. GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash
  74. LastBlockHash() common.Hash
  75. Genesis() *types.Block
  76. }
  77. type txPool interface {
  78. // AddTransactions should add the given transactions to the pool.
  79. AddBatch([]*types.Transaction) error
  80. }
  81. type ProtocolManager struct {
  82. lightSync bool
  83. txpool txPool
  84. txrelay *LesTxRelay
  85. networkId int
  86. chainConfig *params.ChainConfig
  87. blockchain BlockChain
  88. chainDb ethdb.Database
  89. odr *LesOdr
  90. server *LesServer
  91. serverPool *serverPool
  92. downloader *downloader.Downloader
  93. fetcher *lightFetcher
  94. peers *peerSet
  95. SubProtocols []p2p.Protocol
  96. eventMux *event.TypeMux
  97. // channels for fetcher, syncer, txsyncLoop
  98. newPeerCh chan *peer
  99. quitSync chan struct{}
  100. noMorePeers chan struct{}
  101. syncMu sync.Mutex
  102. syncing bool
  103. syncDone chan struct{}
  104. // wait group is used for graceful shutdowns during downloading
  105. // and processing
  106. wg sync.WaitGroup
  107. }
  108. // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
  109. // with the ethereum network.
  110. func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId int, mux *event.TypeMux, pow pow.PoW, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay) (*ProtocolManager, error) {
  111. // Create the protocol manager with the base fields
  112. manager := &ProtocolManager{
  113. lightSync: lightSync,
  114. eventMux: mux,
  115. blockchain: blockchain,
  116. chainConfig: chainConfig,
  117. chainDb: chainDb,
  118. networkId: networkId,
  119. txpool: txpool,
  120. txrelay: txrelay,
  121. odr: odr,
  122. peers: newPeerSet(),
  123. newPeerCh: make(chan *peer),
  124. quitSync: make(chan struct{}),
  125. noMorePeers: make(chan struct{}),
  126. }
  127. // Initiate a sub-protocol for every implemented version we can handle
  128. manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
  129. for i, version := range ProtocolVersions {
  130. // Compatible, initialize the sub-protocol
  131. version := version // Closure for the run
  132. manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
  133. Name: "les",
  134. Version: version,
  135. Length: ProtocolLengths[i],
  136. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  137. var entry *poolEntry
  138. peer := manager.newPeer(int(version), networkId, p, rw)
  139. if manager.serverPool != nil {
  140. addr := p.RemoteAddr().(*net.TCPAddr)
  141. entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
  142. }
  143. peer.poolEntry = entry
  144. select {
  145. case manager.newPeerCh <- peer:
  146. manager.wg.Add(1)
  147. defer manager.wg.Done()
  148. err := manager.handle(peer)
  149. if entry != nil {
  150. manager.serverPool.disconnect(entry)
  151. }
  152. return err
  153. case <-manager.quitSync:
  154. if entry != nil {
  155. manager.serverPool.disconnect(entry)
  156. }
  157. return p2p.DiscQuitting
  158. }
  159. },
  160. NodeInfo: func() interface{} {
  161. return manager.NodeInfo()
  162. },
  163. PeerInfo: func(id discover.NodeID) interface{} {
  164. if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
  165. return p.Info()
  166. }
  167. return nil
  168. },
  169. })
  170. }
  171. if len(manager.SubProtocols) == 0 {
  172. return nil, errIncompatibleConfig
  173. }
  174. removePeer := manager.removePeer
  175. if disableClientRemovePeer {
  176. removePeer = func(id string) {}
  177. }
  178. if lightSync {
  179. log.Debug(fmt.Sprintf("LES: create downloader"))
  180. manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
  181. nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
  182. blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
  183. }
  184. if odr != nil {
  185. odr.removePeer = removePeer
  186. }
  187. /*validator := func(block *types.Block, parent *types.Block) error {
  188. return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
  189. }
  190. heighter := func() uint64 {
  191. return chainman.LastBlockNumberU64()
  192. }
  193. manager.fetcher = fetcher.New(chainman.GetBlockNoOdr, validator, nil, heighter, chainman.InsertChain, manager.removePeer)
  194. */
  195. return manager, nil
  196. }
  197. func (pm *ProtocolManager) removePeer(id string) {
  198. // Short circuit if the peer was already removed
  199. peer := pm.peers.Peer(id)
  200. if peer == nil {
  201. return
  202. }
  203. if err := pm.peers.Unregister(id); err != nil {
  204. if err == errNotRegistered {
  205. return
  206. }
  207. log.Error(fmt.Sprint("Removal failed:", err))
  208. }
  209. log.Debug(fmt.Sprint("Removing peer", id))
  210. // Unregister the peer from the downloader and Ethereum peer set
  211. log.Debug(fmt.Sprintf("LES: unregister peer %v", id))
  212. if pm.lightSync {
  213. pm.downloader.UnregisterPeer(id)
  214. if pm.txrelay != nil {
  215. pm.txrelay.removePeer(id)
  216. }
  217. if pm.fetcher != nil {
  218. pm.fetcher.removePeer(peer)
  219. }
  220. }
  221. // Hard disconnect at the networking layer
  222. if peer != nil {
  223. peer.Peer.Disconnect(p2p.DiscUselessPeer)
  224. }
  225. }
  226. func (pm *ProtocolManager) Start(srvr *p2p.Server) {
  227. var topicDisc *discv5.Network
  228. if srvr != nil {
  229. topicDisc = srvr.DiscV5
  230. }
  231. lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
  232. if pm.lightSync {
  233. // start sync handler
  234. if srvr != nil { // srvr is nil during testing
  235. pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
  236. pm.odr.serverPool = pm.serverPool
  237. pm.fetcher = newLightFetcher(pm)
  238. }
  239. go pm.syncer()
  240. } else {
  241. if topicDisc != nil {
  242. go func() {
  243. log.Info(fmt.Sprint("Starting registering topic", string(lesTopic)))
  244. topicDisc.RegisterTopic(lesTopic, pm.quitSync)
  245. log.Info(fmt.Sprint("Stopped registering topic", string(lesTopic)))
  246. }()
  247. }
  248. go func() {
  249. for range pm.newPeerCh {
  250. }
  251. }()
  252. }
  253. }
  254. func (pm *ProtocolManager) Stop() {
  255. // Showing a log message. During download / process this could actually
  256. // take between 5 to 10 seconds and therefor feedback is required.
  257. log.Info(fmt.Sprint("Stopping light ethereum protocol handler..."))
  258. // Quit the sync loop.
  259. // After this send has completed, no new peers will be accepted.
  260. pm.noMorePeers <- struct{}{}
  261. close(pm.quitSync) // quits syncer, fetcher
  262. // Disconnect existing sessions.
  263. // This also closes the gate for any new registrations on the peer set.
  264. // sessions which are already established but not added to pm.peers yet
  265. // will exit when they try to register.
  266. pm.peers.Close()
  267. // Wait for any process action
  268. pm.wg.Wait()
  269. log.Info(fmt.Sprint("Light ethereum protocol handler stopped"))
  270. }
  271. func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  272. return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
  273. }
  274. // handle is the callback invoked to manage the life cycle of a les peer. When
  275. // this function terminates, the peer is disconnected.
  276. func (pm *ProtocolManager) handle(p *peer) error {
  277. log.Debug(fmt.Sprintf("%v: peer connected [%s]", p, p.Name()))
  278. // Execute the LES handshake
  279. td, head, genesis := pm.blockchain.Status()
  280. headNum := core.GetBlockNumber(pm.chainDb, head)
  281. if err := p.Handshake(td, head, headNum, genesis, pm.server); err != nil {
  282. log.Debug(fmt.Sprintf("%v: handshake failed: %v", p, err))
  283. return err
  284. }
  285. if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
  286. rw.Init(p.version)
  287. }
  288. // Register the peer locally
  289. log.Trace(fmt.Sprintf("%v: adding peer", p))
  290. if err := pm.peers.Register(p); err != nil {
  291. log.Error(fmt.Sprintf("%v: addition failed: %v", p, err))
  292. return err
  293. }
  294. defer func() {
  295. if pm.server != nil && pm.server.fcManager != nil && p.fcClient != nil {
  296. p.fcClient.Remove(pm.server.fcManager)
  297. }
  298. pm.removePeer(p.id)
  299. }()
  300. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  301. log.Debug(fmt.Sprintf("LES: register peer %v", p.id))
  302. if pm.lightSync {
  303. requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
  304. reqID := getNextReqID()
  305. cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
  306. p.fcServer.MustAssignRequest(reqID)
  307. p.fcServer.SendRequest(reqID, cost)
  308. return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse)
  309. }
  310. requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
  311. reqID := getNextReqID()
  312. cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
  313. p.fcServer.MustAssignRequest(reqID)
  314. p.fcServer.SendRequest(reqID, cost)
  315. return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse)
  316. }
  317. if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd,
  318. requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
  319. return err
  320. }
  321. if pm.txrelay != nil {
  322. pm.txrelay.addPeer(p)
  323. }
  324. p.lock.Lock()
  325. head := p.headInfo
  326. p.lock.Unlock()
  327. if pm.fetcher != nil {
  328. pm.fetcher.addPeer(p)
  329. pm.fetcher.announce(p, head)
  330. }
  331. if p.poolEntry != nil {
  332. pm.serverPool.registered(p.poolEntry)
  333. }
  334. }
  335. stop := make(chan struct{})
  336. defer close(stop)
  337. go func() {
  338. // new block announce loop
  339. for {
  340. select {
  341. case announce := <-p.announceChn:
  342. p.SendAnnounce(announce)
  343. case <-stop:
  344. return
  345. }
  346. }
  347. }()
  348. // main loop. handle incoming messages.
  349. for {
  350. if err := pm.handleMsg(p); err != nil {
  351. log.Debug(fmt.Sprintf("%v: message handling failed: %v", p, err))
  352. return err
  353. }
  354. }
  355. }
  356. var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsMsg, SendTxMsg, GetHeaderProofsMsg}
  357. // handleMsg is invoked whenever an inbound message is received from a remote
  358. // peer. The remote connection is torn down upon returning any error.
  359. func (pm *ProtocolManager) handleMsg(p *peer) error {
  360. // Read the next message from the remote peer, and ensure it's fully consumed
  361. msg, err := p.rw.ReadMsg()
  362. if err != nil {
  363. return err
  364. }
  365. log.Debug(fmt.Sprint("msg:", msg.Code, msg.Size))
  366. costs := p.fcCosts[msg.Code]
  367. reject := func(reqCnt, maxCnt uint64) bool {
  368. if p.fcClient == nil || reqCnt > maxCnt {
  369. return true
  370. }
  371. bufValue, _ := p.fcClient.AcceptRequest()
  372. cost := costs.baseCost + reqCnt*costs.reqCost
  373. if cost > pm.server.defParams.BufLimit {
  374. cost = pm.server.defParams.BufLimit
  375. }
  376. if cost > bufValue {
  377. log.Error(fmt.Sprintf("Request from %v came %v too early", p.id, time.Duration((cost-bufValue)*1000000/pm.server.defParams.MinRecharge)))
  378. return true
  379. }
  380. return false
  381. }
  382. if msg.Size > ProtocolMaxMsgSize {
  383. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  384. }
  385. defer msg.Discard()
  386. var deliverMsg *Msg
  387. // Handle the message depending on its contents
  388. switch msg.Code {
  389. case StatusMsg:
  390. log.Debug(fmt.Sprintf("<=== StatusMsg from peer %v", p.id))
  391. // Status messages should never arrive after the handshake
  392. return errResp(ErrExtraStatusMsg, "uncontrolled status message")
  393. // Block header query, collect the requested headers and reply
  394. case AnnounceMsg:
  395. log.Debug(fmt.Sprint("<=== AnnounceMsg from peer %v:", p.id))
  396. var req announceData
  397. if err := msg.Decode(&req); err != nil {
  398. return errResp(ErrDecode, "%v: %v", msg, err)
  399. }
  400. log.Trace(fmt.Sprint("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth))
  401. if pm.fetcher != nil {
  402. pm.fetcher.announce(p, &req)
  403. }
  404. case GetBlockHeadersMsg:
  405. log.Debug(fmt.Sprintf("<=== GetBlockHeadersMsg from peer %v", p.id))
  406. // Decode the complex header query
  407. var req struct {
  408. ReqID uint64
  409. Query getBlockHeadersData
  410. }
  411. if err := msg.Decode(&req); err != nil {
  412. return errResp(ErrDecode, "%v: %v", msg, err)
  413. }
  414. query := req.Query
  415. if reject(query.Amount, MaxHeaderFetch) {
  416. return errResp(ErrRequestRejected, "")
  417. }
  418. hashMode := query.Origin.Hash != (common.Hash{})
  419. // Gather headers until the fetch or network limits is reached
  420. var (
  421. bytes common.StorageSize
  422. headers []*types.Header
  423. unknown bool
  424. )
  425. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  426. // Retrieve the next header satisfying the query
  427. var origin *types.Header
  428. if hashMode {
  429. origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
  430. } else {
  431. origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
  432. }
  433. if origin == nil {
  434. break
  435. }
  436. number := origin.Number.Uint64()
  437. headers = append(headers, origin)
  438. bytes += estHeaderRlpSize
  439. // Advance to the next header of the query
  440. switch {
  441. case query.Origin.Hash != (common.Hash{}) && query.Reverse:
  442. // Hash based traversal towards the genesis block
  443. for i := 0; i < int(query.Skip)+1; i++ {
  444. if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {
  445. query.Origin.Hash = header.ParentHash
  446. number--
  447. } else {
  448. unknown = true
  449. break
  450. }
  451. }
  452. case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
  453. // Hash based traversal towards the leaf block
  454. if header := pm.blockchain.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil {
  455. if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
  456. query.Origin.Hash = header.Hash()
  457. } else {
  458. unknown = true
  459. }
  460. } else {
  461. unknown = true
  462. }
  463. case query.Reverse:
  464. // Number based traversal towards the genesis block
  465. if query.Origin.Number >= query.Skip+1 {
  466. query.Origin.Number -= (query.Skip + 1)
  467. } else {
  468. unknown = true
  469. }
  470. case !query.Reverse:
  471. // Number based traversal towards the leaf block
  472. query.Origin.Number += (query.Skip + 1)
  473. }
  474. }
  475. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
  476. pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
  477. return p.SendBlockHeaders(req.ReqID, bv, headers)
  478. case BlockHeadersMsg:
  479. if pm.downloader == nil {
  480. return errResp(ErrUnexpectedResponse, "")
  481. }
  482. log.Debug(fmt.Sprintf("<=== BlockHeadersMsg from peer %v", p.id))
  483. // A batch of headers arrived to one of our previous requests
  484. var resp struct {
  485. ReqID, BV uint64
  486. Headers []*types.Header
  487. }
  488. if err := msg.Decode(&resp); err != nil {
  489. return errResp(ErrDecode, "msg %v: %v", msg, err)
  490. }
  491. p.fcServer.GotReply(resp.ReqID, resp.BV)
  492. if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
  493. pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
  494. } else {
  495. err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
  496. if err != nil {
  497. log.Debug(fmt.Sprint(err))
  498. }
  499. }
  500. case GetBlockBodiesMsg:
  501. log.Debug(fmt.Sprintf("<=== GetBlockBodiesMsg from peer %v", p.id))
  502. // Decode the retrieval message
  503. var req struct {
  504. ReqID uint64
  505. Hashes []common.Hash
  506. }
  507. if err := msg.Decode(&req); err != nil {
  508. return errResp(ErrDecode, "msg %v: %v", msg, err)
  509. }
  510. // Gather blocks until the fetch or network limits is reached
  511. var (
  512. bytes int
  513. bodies []rlp.RawValue
  514. )
  515. reqCnt := len(req.Hashes)
  516. if reject(uint64(reqCnt), MaxBodyFetch) {
  517. return errResp(ErrRequestRejected, "")
  518. }
  519. for _, hash := range req.Hashes {
  520. if bytes >= softResponseLimit {
  521. break
  522. }
  523. // Retrieve the requested block body, stopping if enough was found
  524. if data := core.GetBodyRLP(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash)); len(data) != 0 {
  525. bodies = append(bodies, data)
  526. bytes += len(data)
  527. }
  528. }
  529. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  530. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  531. return p.SendBlockBodiesRLP(req.ReqID, bv, bodies)
  532. case BlockBodiesMsg:
  533. if pm.odr == nil {
  534. return errResp(ErrUnexpectedResponse, "")
  535. }
  536. log.Debug(fmt.Sprintf("<=== BlockBodiesMsg from peer %v", p.id))
  537. // A batch of block bodies arrived to one of our previous requests
  538. var resp struct {
  539. ReqID, BV uint64
  540. Data []*types.Body
  541. }
  542. if err := msg.Decode(&resp); err != nil {
  543. return errResp(ErrDecode, "msg %v: %v", msg, err)
  544. }
  545. p.fcServer.GotReply(resp.ReqID, resp.BV)
  546. deliverMsg = &Msg{
  547. MsgType: MsgBlockBodies,
  548. ReqID: resp.ReqID,
  549. Obj: resp.Data,
  550. }
  551. case GetCodeMsg:
  552. log.Debug(fmt.Sprintf("<=== GetCodeMsg from peer %v", p.id))
  553. // Decode the retrieval message
  554. var req struct {
  555. ReqID uint64
  556. Reqs []CodeReq
  557. }
  558. if err := msg.Decode(&req); err != nil {
  559. return errResp(ErrDecode, "msg %v: %v", msg, err)
  560. }
  561. // Gather state data until the fetch or network limits is reached
  562. var (
  563. bytes int
  564. data [][]byte
  565. )
  566. reqCnt := len(req.Reqs)
  567. if reject(uint64(reqCnt), MaxCodeFetch) {
  568. return errResp(ErrRequestRejected, "")
  569. }
  570. for _, req := range req.Reqs {
  571. // Retrieve the requested state entry, stopping if enough was found
  572. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  573. if trie, _ := trie.New(header.Root, pm.chainDb); trie != nil {
  574. sdata := trie.Get(req.AccKey)
  575. var acc state.Account
  576. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  577. entry, _ := pm.chainDb.Get(acc.CodeHash)
  578. if bytes+len(entry) >= softResponseLimit {
  579. break
  580. }
  581. data = append(data, entry)
  582. bytes += len(entry)
  583. }
  584. }
  585. }
  586. }
  587. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  588. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  589. return p.SendCode(req.ReqID, bv, data)
  590. case CodeMsg:
  591. if pm.odr == nil {
  592. return errResp(ErrUnexpectedResponse, "")
  593. }
  594. log.Debug(fmt.Sprintf("<=== CodeMsg from peer %v", p.id))
  595. // A batch of node state data arrived to one of our previous requests
  596. var resp struct {
  597. ReqID, BV uint64
  598. Data [][]byte
  599. }
  600. if err := msg.Decode(&resp); err != nil {
  601. return errResp(ErrDecode, "msg %v: %v", msg, err)
  602. }
  603. p.fcServer.GotReply(resp.ReqID, resp.BV)
  604. deliverMsg = &Msg{
  605. MsgType: MsgCode,
  606. ReqID: resp.ReqID,
  607. Obj: resp.Data,
  608. }
  609. case GetReceiptsMsg:
  610. log.Debug(fmt.Sprintf("<=== GetReceiptsMsg from peer %v", p.id))
  611. // Decode the retrieval message
  612. var req struct {
  613. ReqID uint64
  614. Hashes []common.Hash
  615. }
  616. if err := msg.Decode(&req); err != nil {
  617. return errResp(ErrDecode, "msg %v: %v", msg, err)
  618. }
  619. // Gather state data until the fetch or network limits is reached
  620. var (
  621. bytes int
  622. receipts []rlp.RawValue
  623. )
  624. reqCnt := len(req.Hashes)
  625. if reject(uint64(reqCnt), MaxReceiptFetch) {
  626. return errResp(ErrRequestRejected, "")
  627. }
  628. for _, hash := range req.Hashes {
  629. if bytes >= softResponseLimit {
  630. break
  631. }
  632. // Retrieve the requested block's receipts, skipping if unknown to us
  633. results := core.GetBlockReceipts(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash))
  634. if results == nil {
  635. if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  636. continue
  637. }
  638. }
  639. // If known, encode and queue for response packet
  640. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  641. log.Error(fmt.Sprintf("failed to encode receipt: %v", err))
  642. } else {
  643. receipts = append(receipts, encoded)
  644. bytes += len(encoded)
  645. }
  646. }
  647. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  648. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  649. return p.SendReceiptsRLP(req.ReqID, bv, receipts)
  650. case ReceiptsMsg:
  651. if pm.odr == nil {
  652. return errResp(ErrUnexpectedResponse, "")
  653. }
  654. log.Debug(fmt.Sprintf("<=== ReceiptsMsg from peer %v", p.id))
  655. // A batch of receipts arrived to one of our previous requests
  656. var resp struct {
  657. ReqID, BV uint64
  658. Receipts []types.Receipts
  659. }
  660. if err := msg.Decode(&resp); err != nil {
  661. return errResp(ErrDecode, "msg %v: %v", msg, err)
  662. }
  663. p.fcServer.GotReply(resp.ReqID, resp.BV)
  664. deliverMsg = &Msg{
  665. MsgType: MsgReceipts,
  666. ReqID: resp.ReqID,
  667. Obj: resp.Receipts,
  668. }
  669. case GetProofsMsg:
  670. log.Debug(fmt.Sprintf("<=== GetProofsMsg from peer %v", p.id))
  671. // Decode the retrieval message
  672. var req struct {
  673. ReqID uint64
  674. Reqs []ProofReq
  675. }
  676. if err := msg.Decode(&req); err != nil {
  677. return errResp(ErrDecode, "msg %v: %v", msg, err)
  678. }
  679. // Gather state data until the fetch or network limits is reached
  680. var (
  681. bytes int
  682. proofs proofsData
  683. )
  684. reqCnt := len(req.Reqs)
  685. if reject(uint64(reqCnt), MaxProofsFetch) {
  686. return errResp(ErrRequestRejected, "")
  687. }
  688. for _, req := range req.Reqs {
  689. if bytes >= softResponseLimit {
  690. break
  691. }
  692. // Retrieve the requested state entry, stopping if enough was found
  693. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  694. if tr, _ := trie.New(header.Root, pm.chainDb); tr != nil {
  695. if len(req.AccKey) > 0 {
  696. sdata := tr.Get(req.AccKey)
  697. tr = nil
  698. var acc state.Account
  699. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  700. tr, _ = trie.New(acc.Root, pm.chainDb)
  701. }
  702. }
  703. if tr != nil {
  704. proof := tr.Prove(req.Key)
  705. proofs = append(proofs, proof)
  706. bytes += len(proof)
  707. }
  708. }
  709. }
  710. }
  711. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  712. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  713. return p.SendProofs(req.ReqID, bv, proofs)
  714. case ProofsMsg:
  715. if pm.odr == nil {
  716. return errResp(ErrUnexpectedResponse, "")
  717. }
  718. log.Debug(fmt.Sprintf("<=== ProofsMsg from peer %v", p.id))
  719. // A batch of merkle proofs arrived to one of our previous requests
  720. var resp struct {
  721. ReqID, BV uint64
  722. Data [][]rlp.RawValue
  723. }
  724. if err := msg.Decode(&resp); err != nil {
  725. return errResp(ErrDecode, "msg %v: %v", msg, err)
  726. }
  727. p.fcServer.GotReply(resp.ReqID, resp.BV)
  728. deliverMsg = &Msg{
  729. MsgType: MsgProofs,
  730. ReqID: resp.ReqID,
  731. Obj: resp.Data,
  732. }
  733. case GetHeaderProofsMsg:
  734. log.Debug(fmt.Sprintf("<=== GetHeaderProofsMsg from peer %v", p.id))
  735. // Decode the retrieval message
  736. var req struct {
  737. ReqID uint64
  738. Reqs []ChtReq
  739. }
  740. if err := msg.Decode(&req); err != nil {
  741. return errResp(ErrDecode, "msg %v: %v", msg, err)
  742. }
  743. // Gather state data until the fetch or network limits is reached
  744. var (
  745. bytes int
  746. proofs []ChtResp
  747. )
  748. reqCnt := len(req.Reqs)
  749. if reject(uint64(reqCnt), MaxHeaderProofsFetch) {
  750. return errResp(ErrRequestRejected, "")
  751. }
  752. for _, req := range req.Reqs {
  753. if bytes >= softResponseLimit {
  754. break
  755. }
  756. if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
  757. if root := getChtRoot(pm.chainDb, req.ChtNum); root != (common.Hash{}) {
  758. if tr, _ := trie.New(root, pm.chainDb); tr != nil {
  759. var encNumber [8]byte
  760. binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
  761. proof := tr.Prove(encNumber[:])
  762. proofs = append(proofs, ChtResp{Header: header, Proof: proof})
  763. bytes += len(proof) + estHeaderRlpSize
  764. }
  765. }
  766. }
  767. }
  768. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  769. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  770. return p.SendHeaderProofs(req.ReqID, bv, proofs)
  771. case HeaderProofsMsg:
  772. if pm.odr == nil {
  773. return errResp(ErrUnexpectedResponse, "")
  774. }
  775. log.Debug(fmt.Sprintf("<=== HeaderProofsMsg from peer %v", p.id))
  776. var resp struct {
  777. ReqID, BV uint64
  778. Data []ChtResp
  779. }
  780. if err := msg.Decode(&resp); err != nil {
  781. return errResp(ErrDecode, "msg %v: %v", msg, err)
  782. }
  783. p.fcServer.GotReply(resp.ReqID, resp.BV)
  784. deliverMsg = &Msg{
  785. MsgType: MsgHeaderProofs,
  786. ReqID: resp.ReqID,
  787. Obj: resp.Data,
  788. }
  789. case SendTxMsg:
  790. if pm.txpool == nil {
  791. return errResp(ErrUnexpectedResponse, "")
  792. }
  793. // Transactions arrived, parse all of them and deliver to the pool
  794. var txs []*types.Transaction
  795. if err := msg.Decode(&txs); err != nil {
  796. return errResp(ErrDecode, "msg %v: %v", msg, err)
  797. }
  798. reqCnt := len(txs)
  799. if reject(uint64(reqCnt), MaxTxSend) {
  800. return errResp(ErrRequestRejected, "")
  801. }
  802. if err := pm.txpool.AddBatch(txs); err != nil {
  803. return errResp(ErrUnexpectedResponse, "msg: %v", err)
  804. }
  805. _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  806. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  807. default:
  808. log.Debug(fmt.Sprintf("<=== unknown message with code %d from peer %v", msg.Code, p.id))
  809. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  810. }
  811. if deliverMsg != nil {
  812. return pm.odr.Deliver(p, deliverMsg)
  813. }
  814. return nil
  815. }
  816. // NodeInfo retrieves some protocol metadata about the running host node.
  817. func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo {
  818. return &eth.EthNodeInfo{
  819. Network: self.networkId,
  820. Difficulty: self.blockchain.GetTdByHash(self.blockchain.LastBlockHash()),
  821. Genesis: self.blockchain.Genesis().Hash(),
  822. Head: self.blockchain.LastBlockHash(),
  823. }
  824. }