handler.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "encoding/binary"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "sync"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/core"
  27. "github.com/ethereum/go-ethereum/core/state"
  28. "github.com/ethereum/go-ethereum/core/types"
  29. "github.com/ethereum/go-ethereum/eth"
  30. "github.com/ethereum/go-ethereum/eth/downloader"
  31. "github.com/ethereum/go-ethereum/ethdb"
  32. "github.com/ethereum/go-ethereum/event"
  33. "github.com/ethereum/go-ethereum/logger"
  34. "github.com/ethereum/go-ethereum/logger/glog"
  35. "github.com/ethereum/go-ethereum/p2p"
  36. "github.com/ethereum/go-ethereum/p2p/discover"
  37. "github.com/ethereum/go-ethereum/p2p/discv5"
  38. "github.com/ethereum/go-ethereum/params"
  39. "github.com/ethereum/go-ethereum/pow"
  40. "github.com/ethereum/go-ethereum/rlp"
  41. "github.com/ethereum/go-ethereum/trie"
  42. )
  43. const (
  44. softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
  45. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
  46. ethVersion = 63 // equivalent eth version for the downloader
  47. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  48. MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
  49. MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
  50. MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
  51. MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  52. MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  53. MaxTxSend = 64 // Amount of transactions to be send per request
  54. disableClientRemovePeer = true
  55. )
  56. // errIncompatibleConfig is returned if the requested protocols and configs are
  57. // not compatible (low protocol version restrictions and high requirements).
  58. var errIncompatibleConfig = errors.New("incompatible configuration")
  59. func errResp(code errCode, format string, v ...interface{}) error {
  60. return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
  61. }
  62. type hashFetcherFn func(common.Hash) error
  63. type BlockChain interface {
  64. HasHeader(hash common.Hash) bool
  65. GetHeader(hash common.Hash, number uint64) *types.Header
  66. GetHeaderByHash(hash common.Hash) *types.Header
  67. CurrentHeader() *types.Header
  68. GetTdByHash(hash common.Hash) *big.Int
  69. InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error)
  70. Rollback(chain []common.Hash)
  71. Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash)
  72. GetHeaderByNumber(number uint64) *types.Header
  73. GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash
  74. LastBlockHash() common.Hash
  75. Genesis() *types.Block
  76. }
  77. type txPool interface {
  78. // AddTransactions should add the given transactions to the pool.
  79. AddBatch([]*types.Transaction) error
  80. }
  81. type ProtocolManager struct {
  82. lightSync bool
  83. txpool txPool
  84. txrelay *LesTxRelay
  85. networkId int
  86. chainConfig *params.ChainConfig
  87. blockchain BlockChain
  88. chainDb ethdb.Database
  89. odr *LesOdr
  90. server *LesServer
  91. topicDisc *discv5.Network
  92. lesTopic discv5.Topic
  93. p2pServer *p2p.Server
  94. downloader *downloader.Downloader
  95. fetcher *lightFetcher
  96. peers *peerSet
  97. SubProtocols []p2p.Protocol
  98. eventMux *event.TypeMux
  99. // channels for fetcher, syncer, txsyncLoop
  100. newPeerCh chan *peer
  101. quitSync chan struct{}
  102. noMorePeers chan struct{}
  103. syncMu sync.Mutex
  104. syncing bool
  105. syncDone chan struct{}
  106. // wait group is used for graceful shutdowns during downloading
  107. // and processing
  108. wg sync.WaitGroup
  109. }
  110. // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
  111. // with the ethereum network.
  112. func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId int, mux *event.TypeMux, pow pow.PoW, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay) (*ProtocolManager, error) {
  113. // Create the protocol manager with the base fields
  114. manager := &ProtocolManager{
  115. lightSync: lightSync,
  116. eventMux: mux,
  117. blockchain: blockchain,
  118. chainConfig: chainConfig,
  119. chainDb: chainDb,
  120. networkId: networkId,
  121. txpool: txpool,
  122. txrelay: txrelay,
  123. odr: odr,
  124. peers: newPeerSet(),
  125. newPeerCh: make(chan *peer),
  126. quitSync: make(chan struct{}),
  127. noMorePeers: make(chan struct{}),
  128. }
  129. // Initiate a sub-protocol for every implemented version we can handle
  130. manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
  131. for i, version := range ProtocolVersions {
  132. // Compatible, initialize the sub-protocol
  133. version := version // Closure for the run
  134. manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
  135. Name: "les",
  136. Version: version,
  137. Length: ProtocolLengths[i],
  138. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  139. peer := manager.newPeer(int(version), networkId, p, rw)
  140. select {
  141. case manager.newPeerCh <- peer:
  142. manager.wg.Add(1)
  143. defer manager.wg.Done()
  144. return manager.handle(peer)
  145. case <-manager.quitSync:
  146. return p2p.DiscQuitting
  147. }
  148. },
  149. NodeInfo: func() interface{} {
  150. return manager.NodeInfo()
  151. },
  152. PeerInfo: func(id discover.NodeID) interface{} {
  153. if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
  154. return p.Info()
  155. }
  156. return nil
  157. },
  158. })
  159. }
  160. if len(manager.SubProtocols) == 0 {
  161. return nil, errIncompatibleConfig
  162. }
  163. removePeer := manager.removePeer
  164. if disableClientRemovePeer {
  165. removePeer = func(id string) {}
  166. }
  167. if lightSync {
  168. glog.V(logger.Debug).Infof("LES: create downloader")
  169. manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
  170. nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
  171. blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
  172. manager.fetcher = newLightFetcher(manager)
  173. }
  174. if odr != nil {
  175. odr.removePeer = removePeer
  176. }
  177. /*validator := func(block *types.Block, parent *types.Block) error {
  178. return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
  179. }
  180. heighter := func() uint64 {
  181. return chainman.LastBlockNumberU64()
  182. }
  183. manager.fetcher = fetcher.New(chainman.GetBlockNoOdr, validator, nil, heighter, chainman.InsertChain, manager.removePeer)
  184. */
  185. return manager, nil
  186. }
  187. func (pm *ProtocolManager) removePeer(id string) {
  188. // Short circuit if the peer was already removed
  189. peer := pm.peers.Peer(id)
  190. if peer == nil {
  191. return
  192. }
  193. glog.V(logger.Debug).Infoln("Removing peer", id)
  194. // Unregister the peer from the downloader and Ethereum peer set
  195. glog.V(logger.Debug).Infof("LES: unregister peer %v", id)
  196. if pm.lightSync {
  197. pm.downloader.UnregisterPeer(id)
  198. pm.odr.UnregisterPeer(peer)
  199. if pm.txrelay != nil {
  200. pm.txrelay.removePeer(id)
  201. }
  202. }
  203. if err := pm.peers.Unregister(id); err != nil {
  204. glog.V(logger.Error).Infoln("Removal failed:", err)
  205. }
  206. // Hard disconnect at the networking layer
  207. if peer != nil {
  208. peer.Peer.Disconnect(p2p.DiscUselessPeer)
  209. }
  210. }
  211. func (pm *ProtocolManager) findServers() {
  212. if pm.p2pServer == nil || pm.topicDisc == nil {
  213. return
  214. }
  215. glog.V(logger.Debug).Infoln("Looking for topic", string(pm.lesTopic))
  216. enodes := make(chan string, 100)
  217. stop := make(chan struct{})
  218. go pm.topicDisc.SearchTopic(pm.lesTopic, stop, enodes)
  219. go func() {
  220. added := make(map[string]bool)
  221. for {
  222. select {
  223. case enode := <-enodes:
  224. if !added[enode] {
  225. glog.V(logger.Info).Infoln("Found LES server:", enode)
  226. added[enode] = true
  227. if node, err := discover.ParseNode(enode); err == nil {
  228. pm.p2pServer.AddPeer(node)
  229. }
  230. }
  231. case <-stop:
  232. return
  233. }
  234. }
  235. }()
  236. select {
  237. case <-time.After(time.Second * 20):
  238. case <-pm.quitSync:
  239. }
  240. close(stop)
  241. }
  242. func (pm *ProtocolManager) Start(srvr *p2p.Server) {
  243. pm.p2pServer = srvr
  244. if srvr != nil {
  245. pm.topicDisc = srvr.DiscV5
  246. }
  247. pm.lesTopic = discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
  248. if pm.lightSync {
  249. // start sync handler
  250. go pm.findServers()
  251. go pm.syncer()
  252. } else {
  253. if pm.topicDisc != nil {
  254. go func() {
  255. glog.V(logger.Debug).Infoln("Starting registering topic", string(pm.lesTopic))
  256. pm.topicDisc.RegisterTopic(pm.lesTopic, pm.quitSync)
  257. glog.V(logger.Debug).Infoln("Stopped registering topic", string(pm.lesTopic))
  258. }()
  259. }
  260. go func() {
  261. for range pm.newPeerCh {
  262. }
  263. }()
  264. }
  265. }
  266. func (pm *ProtocolManager) Stop() {
  267. // Showing a log message. During download / process this could actually
  268. // take between 5 to 10 seconds and therefor feedback is required.
  269. glog.V(logger.Info).Infoln("Stopping light ethereum protocol handler...")
  270. // Quit the sync loop.
  271. // After this send has completed, no new peers will be accepted.
  272. pm.noMorePeers <- struct{}{}
  273. close(pm.quitSync) // quits syncer, fetcher
  274. // Disconnect existing sessions.
  275. // This also closes the gate for any new registrations on the peer set.
  276. // sessions which are already established but not added to pm.peers yet
  277. // will exit when they try to register.
  278. pm.peers.Close()
  279. // Wait for any process action
  280. pm.wg.Wait()
  281. glog.V(logger.Info).Infoln("Light ethereum protocol handler stopped")
  282. }
  283. func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  284. return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
  285. }
  286. // handle is the callback invoked to manage the life cycle of a les peer. When
  287. // this function terminates, the peer is disconnected.
  288. func (pm *ProtocolManager) handle(p *peer) error {
  289. glog.V(logger.Debug).Infof("%v: peer connected [%s]", p, p.Name())
  290. // Execute the LES handshake
  291. td, head, genesis := pm.blockchain.Status()
  292. headNum := core.GetBlockNumber(pm.chainDb, head)
  293. if err := p.Handshake(td, head, headNum, genesis, pm.server); err != nil {
  294. glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err)
  295. return err
  296. }
  297. if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
  298. rw.Init(p.version)
  299. }
  300. // Register the peer locally
  301. glog.V(logger.Detail).Infof("%v: adding peer", p)
  302. if err := pm.peers.Register(p); err != nil {
  303. glog.V(logger.Error).Infof("%v: addition failed: %v", p, err)
  304. return err
  305. }
  306. defer func() {
  307. if pm.server != nil && pm.server.fcManager != nil && p.fcClient != nil {
  308. p.fcClient.Remove(pm.server.fcManager)
  309. }
  310. pm.removePeer(p.id)
  311. }()
  312. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  313. glog.V(logger.Debug).Infof("LES: register peer %v", p.id)
  314. if pm.lightSync {
  315. requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
  316. reqID := pm.odr.getNextReqID()
  317. cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
  318. p.fcServer.SendRequest(reqID, cost)
  319. return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse)
  320. }
  321. requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
  322. reqID := pm.odr.getNextReqID()
  323. cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
  324. p.fcServer.SendRequest(reqID, cost)
  325. return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse)
  326. }
  327. if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd,
  328. requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
  329. return err
  330. }
  331. pm.odr.RegisterPeer(p)
  332. if pm.txrelay != nil {
  333. pm.txrelay.addPeer(p)
  334. }
  335. pm.fetcher.notify(p, nil)
  336. }
  337. stop := make(chan struct{})
  338. defer close(stop)
  339. go func() {
  340. // new block announce loop
  341. for {
  342. select {
  343. case announce := <-p.announceChn:
  344. p.SendAnnounce(announce)
  345. case <-stop:
  346. return
  347. }
  348. }
  349. }()
  350. // main loop. handle incoming messages.
  351. for {
  352. if err := pm.handleMsg(p); err != nil {
  353. glog.V(logger.Debug).Infof("%v: message handling failed: %v", p, err)
  354. return err
  355. }
  356. }
  357. }
  358. var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsMsg, SendTxMsg, GetHeaderProofsMsg}
  359. // handleMsg is invoked whenever an inbound message is received from a remote
  360. // peer. The remote connection is torn down upon returning any error.
  361. func (pm *ProtocolManager) handleMsg(p *peer) error {
  362. // Read the next message from the remote peer, and ensure it's fully consumed
  363. msg, err := p.rw.ReadMsg()
  364. if err != nil {
  365. return err
  366. }
  367. var costs *requestCosts
  368. var reqCnt, maxReqs int
  369. glog.V(logger.Debug).Infoln("msg:", msg.Code, msg.Size)
  370. if rc, ok := p.fcCosts[msg.Code]; ok { // check if msg is a supported request type
  371. costs = rc
  372. if p.fcClient == nil {
  373. return errResp(ErrRequestRejected, "")
  374. }
  375. bv, ok := p.fcClient.AcceptRequest()
  376. if !ok || bv < costs.baseCost {
  377. return errResp(ErrRequestRejected, "")
  378. }
  379. maxReqs = 10000
  380. if bv < pm.server.defParams.BufLimit {
  381. d := bv - costs.baseCost
  382. if d/10000 < costs.reqCost {
  383. maxReqs = int(d / costs.reqCost)
  384. }
  385. }
  386. }
  387. if msg.Size > ProtocolMaxMsgSize {
  388. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  389. }
  390. defer msg.Discard()
  391. var deliverMsg *Msg
  392. // Handle the message depending on its contents
  393. switch msg.Code {
  394. case StatusMsg:
  395. glog.V(logger.Debug).Infof("<=== StatusMsg from peer %v", p.id)
  396. // Status messages should never arrive after the handshake
  397. return errResp(ErrExtraStatusMsg, "uncontrolled status message")
  398. // Block header query, collect the requested headers and reply
  399. case AnnounceMsg:
  400. glog.V(logger.Debug).Infoln("<=== AnnounceMsg from peer %v:", p.id)
  401. var req announceData
  402. if err := msg.Decode(&req); err != nil {
  403. return errResp(ErrDecode, "%v: %v", msg, err)
  404. }
  405. glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth)
  406. pm.fetcher.notify(p, &req)
  407. case GetBlockHeadersMsg:
  408. glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id)
  409. // Decode the complex header query
  410. var req struct {
  411. ReqID uint64
  412. Query getBlockHeadersData
  413. }
  414. if err := msg.Decode(&req); err != nil {
  415. return errResp(ErrDecode, "%v: %v", msg, err)
  416. }
  417. query := req.Query
  418. if query.Amount > uint64(maxReqs) || query.Amount > MaxHeaderFetch {
  419. return errResp(ErrRequestRejected, "")
  420. }
  421. hashMode := query.Origin.Hash != (common.Hash{})
  422. // Gather headers until the fetch or network limits is reached
  423. var (
  424. bytes common.StorageSize
  425. headers []*types.Header
  426. unknown bool
  427. )
  428. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  429. // Retrieve the next header satisfying the query
  430. var origin *types.Header
  431. if hashMode {
  432. origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
  433. } else {
  434. origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
  435. }
  436. if origin == nil {
  437. break
  438. }
  439. number := origin.Number.Uint64()
  440. headers = append(headers, origin)
  441. bytes += estHeaderRlpSize
  442. // Advance to the next header of the query
  443. switch {
  444. case query.Origin.Hash != (common.Hash{}) && query.Reverse:
  445. // Hash based traversal towards the genesis block
  446. for i := 0; i < int(query.Skip)+1; i++ {
  447. if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {
  448. query.Origin.Hash = header.ParentHash
  449. number--
  450. } else {
  451. unknown = true
  452. break
  453. }
  454. }
  455. case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
  456. // Hash based traversal towards the leaf block
  457. if header := pm.blockchain.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil {
  458. if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
  459. query.Origin.Hash = header.Hash()
  460. } else {
  461. unknown = true
  462. }
  463. } else {
  464. unknown = true
  465. }
  466. case query.Reverse:
  467. // Number based traversal towards the genesis block
  468. if query.Origin.Number >= query.Skip+1 {
  469. query.Origin.Number -= (query.Skip + 1)
  470. } else {
  471. unknown = true
  472. }
  473. case !query.Reverse:
  474. // Number based traversal towards the leaf block
  475. query.Origin.Number += (query.Skip + 1)
  476. }
  477. }
  478. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
  479. pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
  480. return p.SendBlockHeaders(req.ReqID, bv, headers)
  481. case BlockHeadersMsg:
  482. if pm.downloader == nil {
  483. return errResp(ErrUnexpectedResponse, "")
  484. }
  485. glog.V(logger.Debug).Infof("<=== BlockHeadersMsg from peer %v", p.id)
  486. // A batch of headers arrived to one of our previous requests
  487. var resp struct {
  488. ReqID, BV uint64
  489. Headers []*types.Header
  490. }
  491. if err := msg.Decode(&resp); err != nil {
  492. return errResp(ErrDecode, "msg %v: %v", msg, err)
  493. }
  494. p.fcServer.GotReply(resp.ReqID, resp.BV)
  495. if pm.fetcher.requestedID(resp.ReqID) {
  496. pm.fetcher.deliverHeaders(resp.ReqID, resp.Headers)
  497. } else {
  498. err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
  499. if err != nil {
  500. glog.V(logger.Debug).Infoln(err)
  501. }
  502. }
  503. case GetBlockBodiesMsg:
  504. glog.V(logger.Debug).Infof("<=== GetBlockBodiesMsg from peer %v", p.id)
  505. // Decode the retrieval message
  506. var req struct {
  507. ReqID uint64
  508. Hashes []common.Hash
  509. }
  510. if err := msg.Decode(&req); err != nil {
  511. return errResp(ErrDecode, "msg %v: %v", msg, err)
  512. }
  513. // Gather blocks until the fetch or network limits is reached
  514. var (
  515. bytes int
  516. bodies []rlp.RawValue
  517. )
  518. reqCnt = len(req.Hashes)
  519. if reqCnt > maxReqs || reqCnt > MaxBodyFetch {
  520. return errResp(ErrRequestRejected, "")
  521. }
  522. for _, hash := range req.Hashes {
  523. if bytes >= softResponseLimit {
  524. break
  525. }
  526. // Retrieve the requested block body, stopping if enough was found
  527. if data := core.GetBodyRLP(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash)); len(data) != 0 {
  528. bodies = append(bodies, data)
  529. bytes += len(data)
  530. }
  531. }
  532. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  533. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  534. return p.SendBlockBodiesRLP(req.ReqID, bv, bodies)
  535. case BlockBodiesMsg:
  536. if pm.odr == nil {
  537. return errResp(ErrUnexpectedResponse, "")
  538. }
  539. glog.V(logger.Debug).Infof("<=== BlockBodiesMsg from peer %v", p.id)
  540. // A batch of block bodies arrived to one of our previous requests
  541. var resp struct {
  542. ReqID, BV uint64
  543. Data []*types.Body
  544. }
  545. if err := msg.Decode(&resp); err != nil {
  546. return errResp(ErrDecode, "msg %v: %v", msg, err)
  547. }
  548. p.fcServer.GotReply(resp.ReqID, resp.BV)
  549. deliverMsg = &Msg{
  550. MsgType: MsgBlockBodies,
  551. ReqID: resp.ReqID,
  552. Obj: resp.Data,
  553. }
  554. case GetCodeMsg:
  555. glog.V(logger.Debug).Infof("<=== GetCodeMsg from peer %v", p.id)
  556. // Decode the retrieval message
  557. var req struct {
  558. ReqID uint64
  559. Reqs []CodeReq
  560. }
  561. if err := msg.Decode(&req); err != nil {
  562. return errResp(ErrDecode, "msg %v: %v", msg, err)
  563. }
  564. // Gather state data until the fetch or network limits is reached
  565. var (
  566. bytes int
  567. data [][]byte
  568. )
  569. reqCnt = len(req.Reqs)
  570. if reqCnt > maxReqs || reqCnt > MaxCodeFetch {
  571. return errResp(ErrRequestRejected, "")
  572. }
  573. for _, req := range req.Reqs {
  574. // Retrieve the requested state entry, stopping if enough was found
  575. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  576. if trie, _ := trie.New(header.Root, pm.chainDb); trie != nil {
  577. sdata := trie.Get(req.AccKey)
  578. var acc state.Account
  579. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  580. entry, _ := pm.chainDb.Get(acc.CodeHash)
  581. if bytes+len(entry) >= softResponseLimit {
  582. break
  583. }
  584. data = append(data, entry)
  585. bytes += len(entry)
  586. }
  587. }
  588. }
  589. }
  590. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  591. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  592. return p.SendCode(req.ReqID, bv, data)
  593. case CodeMsg:
  594. if pm.odr == nil {
  595. return errResp(ErrUnexpectedResponse, "")
  596. }
  597. glog.V(logger.Debug).Infof("<=== CodeMsg from peer %v", p.id)
  598. // A batch of node state data arrived to one of our previous requests
  599. var resp struct {
  600. ReqID, BV uint64
  601. Data [][]byte
  602. }
  603. if err := msg.Decode(&resp); err != nil {
  604. return errResp(ErrDecode, "msg %v: %v", msg, err)
  605. }
  606. p.fcServer.GotReply(resp.ReqID, resp.BV)
  607. deliverMsg = &Msg{
  608. MsgType: MsgCode,
  609. ReqID: resp.ReqID,
  610. Obj: resp.Data,
  611. }
  612. case GetReceiptsMsg:
  613. glog.V(logger.Debug).Infof("<=== GetReceiptsMsg from peer %v", p.id)
  614. // Decode the retrieval message
  615. var req struct {
  616. ReqID uint64
  617. Hashes []common.Hash
  618. }
  619. if err := msg.Decode(&req); err != nil {
  620. return errResp(ErrDecode, "msg %v: %v", msg, err)
  621. }
  622. // Gather state data until the fetch or network limits is reached
  623. var (
  624. bytes int
  625. receipts []rlp.RawValue
  626. )
  627. reqCnt = len(req.Hashes)
  628. if reqCnt > maxReqs || reqCnt > MaxReceiptFetch {
  629. return errResp(ErrRequestRejected, "")
  630. }
  631. for _, hash := range req.Hashes {
  632. if bytes >= softResponseLimit {
  633. break
  634. }
  635. // Retrieve the requested block's receipts, skipping if unknown to us
  636. results := core.GetBlockReceipts(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash))
  637. if results == nil {
  638. if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  639. continue
  640. }
  641. }
  642. // If known, encode and queue for response packet
  643. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  644. glog.V(logger.Error).Infof("failed to encode receipt: %v", err)
  645. } else {
  646. receipts = append(receipts, encoded)
  647. bytes += len(encoded)
  648. }
  649. }
  650. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  651. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  652. return p.SendReceiptsRLP(req.ReqID, bv, receipts)
  653. case ReceiptsMsg:
  654. if pm.odr == nil {
  655. return errResp(ErrUnexpectedResponse, "")
  656. }
  657. glog.V(logger.Debug).Infof("<=== ReceiptsMsg from peer %v", p.id)
  658. // A batch of receipts arrived to one of our previous requests
  659. var resp struct {
  660. ReqID, BV uint64
  661. Receipts []types.Receipts
  662. }
  663. if err := msg.Decode(&resp); err != nil {
  664. return errResp(ErrDecode, "msg %v: %v", msg, err)
  665. }
  666. p.fcServer.GotReply(resp.ReqID, resp.BV)
  667. deliverMsg = &Msg{
  668. MsgType: MsgReceipts,
  669. ReqID: resp.ReqID,
  670. Obj: resp.Receipts,
  671. }
  672. case GetProofsMsg:
  673. glog.V(logger.Debug).Infof("<=== GetProofsMsg from peer %v", p.id)
  674. // Decode the retrieval message
  675. var req struct {
  676. ReqID uint64
  677. Reqs []ProofReq
  678. }
  679. if err := msg.Decode(&req); err != nil {
  680. return errResp(ErrDecode, "msg %v: %v", msg, err)
  681. }
  682. // Gather state data until the fetch or network limits is reached
  683. var (
  684. bytes int
  685. proofs proofsData
  686. )
  687. reqCnt = len(req.Reqs)
  688. if reqCnt > maxReqs || reqCnt > MaxProofsFetch {
  689. return errResp(ErrRequestRejected, "")
  690. }
  691. for _, req := range req.Reqs {
  692. if bytes >= softResponseLimit {
  693. break
  694. }
  695. // Retrieve the requested state entry, stopping if enough was found
  696. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  697. if tr, _ := trie.New(header.Root, pm.chainDb); tr != nil {
  698. if len(req.AccKey) > 0 {
  699. sdata := tr.Get(req.AccKey)
  700. tr = nil
  701. var acc state.Account
  702. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  703. tr, _ = trie.New(acc.Root, pm.chainDb)
  704. }
  705. }
  706. if tr != nil {
  707. proof := tr.Prove(req.Key)
  708. proofs = append(proofs, proof)
  709. bytes += len(proof)
  710. }
  711. }
  712. }
  713. }
  714. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  715. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  716. return p.SendProofs(req.ReqID, bv, proofs)
  717. case ProofsMsg:
  718. if pm.odr == nil {
  719. return errResp(ErrUnexpectedResponse, "")
  720. }
  721. glog.V(logger.Debug).Infof("<=== ProofsMsg from peer %v", p.id)
  722. // A batch of merkle proofs arrived to one of our previous requests
  723. var resp struct {
  724. ReqID, BV uint64
  725. Data [][]rlp.RawValue
  726. }
  727. if err := msg.Decode(&resp); err != nil {
  728. return errResp(ErrDecode, "msg %v: %v", msg, err)
  729. }
  730. p.fcServer.GotReply(resp.ReqID, resp.BV)
  731. deliverMsg = &Msg{
  732. MsgType: MsgProofs,
  733. ReqID: resp.ReqID,
  734. Obj: resp.Data,
  735. }
  736. case GetHeaderProofsMsg:
  737. glog.V(logger.Debug).Infof("<=== GetHeaderProofsMsg from peer %v", p.id)
  738. // Decode the retrieval message
  739. var req struct {
  740. ReqID uint64
  741. Reqs []ChtReq
  742. }
  743. if err := msg.Decode(&req); err != nil {
  744. return errResp(ErrDecode, "msg %v: %v", msg, err)
  745. }
  746. // Gather state data until the fetch or network limits is reached
  747. var (
  748. bytes int
  749. proofs []ChtResp
  750. )
  751. reqCnt = len(req.Reqs)
  752. if reqCnt > maxReqs || reqCnt > MaxHeaderProofsFetch {
  753. return errResp(ErrRequestRejected, "")
  754. }
  755. for _, req := range req.Reqs {
  756. if bytes >= softResponseLimit {
  757. break
  758. }
  759. if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
  760. if root := getChtRoot(pm.chainDb, req.ChtNum); root != (common.Hash{}) {
  761. if tr, _ := trie.New(root, pm.chainDb); tr != nil {
  762. var encNumber [8]byte
  763. binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
  764. proof := tr.Prove(encNumber[:])
  765. proofs = append(proofs, ChtResp{Header: header, Proof: proof})
  766. bytes += len(proof) + estHeaderRlpSize
  767. }
  768. }
  769. }
  770. }
  771. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  772. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  773. return p.SendHeaderProofs(req.ReqID, bv, proofs)
  774. case HeaderProofsMsg:
  775. if pm.odr == nil {
  776. return errResp(ErrUnexpectedResponse, "")
  777. }
  778. glog.V(logger.Debug).Infof("<=== HeaderProofsMsg from peer %v", p.id)
  779. var resp struct {
  780. ReqID, BV uint64
  781. Data []ChtResp
  782. }
  783. if err := msg.Decode(&resp); err != nil {
  784. return errResp(ErrDecode, "msg %v: %v", msg, err)
  785. }
  786. p.fcServer.GotReply(resp.ReqID, resp.BV)
  787. deliverMsg = &Msg{
  788. MsgType: MsgHeaderProofs,
  789. ReqID: resp.ReqID,
  790. Obj: resp.Data,
  791. }
  792. case SendTxMsg:
  793. if pm.txpool == nil {
  794. return errResp(ErrUnexpectedResponse, "")
  795. }
  796. // Transactions arrived, parse all of them and deliver to the pool
  797. var txs []*types.Transaction
  798. if err := msg.Decode(&txs); err != nil {
  799. return errResp(ErrDecode, "msg %v: %v", msg, err)
  800. }
  801. reqCnt = len(txs)
  802. if reqCnt > maxReqs || reqCnt > MaxTxSend {
  803. return errResp(ErrRequestRejected, "")
  804. }
  805. if err := pm.txpool.AddBatch(txs); err != nil {
  806. return errResp(ErrUnexpectedResponse, "msg: %v", err)
  807. }
  808. _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  809. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  810. default:
  811. glog.V(logger.Debug).Infof("<=== unknown message with code %d from peer %v", msg.Code, p.id)
  812. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  813. }
  814. if deliverMsg != nil {
  815. return pm.odr.Deliver(p, deliverMsg)
  816. }
  817. return nil
  818. }
  819. // NodeInfo retrieves some protocol metadata about the running host node.
  820. func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo {
  821. return &eth.EthNodeInfo{
  822. Network: self.networkId,
  823. Difficulty: self.blockchain.GetTdByHash(self.blockchain.LastBlockHash()),
  824. Genesis: self.blockchain.Genesis().Hash(),
  825. Head: self.blockchain.LastBlockHash(),
  826. }
  827. }