handler.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "encoding/binary"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "net"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/core"
  28. "github.com/ethereum/go-ethereum/core/state"
  29. "github.com/ethereum/go-ethereum/core/types"
  30. "github.com/ethereum/go-ethereum/eth"
  31. "github.com/ethereum/go-ethereum/eth/downloader"
  32. "github.com/ethereum/go-ethereum/ethdb"
  33. "github.com/ethereum/go-ethereum/event"
  34. "github.com/ethereum/go-ethereum/logger"
  35. "github.com/ethereum/go-ethereum/logger/glog"
  36. "github.com/ethereum/go-ethereum/p2p"
  37. "github.com/ethereum/go-ethereum/p2p/discover"
  38. "github.com/ethereum/go-ethereum/p2p/discv5"
  39. "github.com/ethereum/go-ethereum/params"
  40. "github.com/ethereum/go-ethereum/pow"
  41. "github.com/ethereum/go-ethereum/rlp"
  42. "github.com/ethereum/go-ethereum/trie"
  43. )
  44. const (
  45. softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
  46. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
  47. ethVersion = 63 // equivalent eth version for the downloader
  48. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  49. MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
  50. MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
  51. MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
  52. MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  53. MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  54. MaxTxSend = 64 // Amount of transactions to be send per request
  55. disableClientRemovePeer = false
  56. )
  57. // errIncompatibleConfig is returned if the requested protocols and configs are
  58. // not compatible (low protocol version restrictions and high requirements).
  59. var errIncompatibleConfig = errors.New("incompatible configuration")
  60. func errResp(code errCode, format string, v ...interface{}) error {
  61. return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
  62. }
  63. type hashFetcherFn func(common.Hash) error
  64. type BlockChain interface {
  65. HasHeader(hash common.Hash) bool
  66. GetHeader(hash common.Hash, number uint64) *types.Header
  67. GetHeaderByHash(hash common.Hash) *types.Header
  68. CurrentHeader() *types.Header
  69. GetTdByHash(hash common.Hash) *big.Int
  70. InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error)
  71. Rollback(chain []common.Hash)
  72. Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash)
  73. GetHeaderByNumber(number uint64) *types.Header
  74. GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash
  75. LastBlockHash() common.Hash
  76. Genesis() *types.Block
  77. }
  78. type txPool interface {
  79. // AddTransactions should add the given transactions to the pool.
  80. AddBatch([]*types.Transaction) error
  81. }
  82. type ProtocolManager struct {
  83. lightSync bool
  84. txpool txPool
  85. txrelay *LesTxRelay
  86. networkId int
  87. chainConfig *params.ChainConfig
  88. blockchain BlockChain
  89. chainDb ethdb.Database
  90. odr *LesOdr
  91. server *LesServer
  92. serverPool *serverPool
  93. downloader *downloader.Downloader
  94. fetcher *lightFetcher
  95. peers *peerSet
  96. SubProtocols []p2p.Protocol
  97. eventMux *event.TypeMux
  98. // channels for fetcher, syncer, txsyncLoop
  99. newPeerCh chan *peer
  100. quitSync chan struct{}
  101. noMorePeers chan struct{}
  102. syncMu sync.Mutex
  103. syncing bool
  104. syncDone chan struct{}
  105. // wait group is used for graceful shutdowns during downloading
  106. // and processing
  107. wg sync.WaitGroup
  108. }
  109. // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
  110. // with the ethereum network.
  111. func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId int, mux *event.TypeMux, pow pow.PoW, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay) (*ProtocolManager, error) {
  112. // Create the protocol manager with the base fields
  113. manager := &ProtocolManager{
  114. lightSync: lightSync,
  115. eventMux: mux,
  116. blockchain: blockchain,
  117. chainConfig: chainConfig,
  118. chainDb: chainDb,
  119. networkId: networkId,
  120. txpool: txpool,
  121. txrelay: txrelay,
  122. odr: odr,
  123. peers: newPeerSet(),
  124. newPeerCh: make(chan *peer),
  125. quitSync: make(chan struct{}),
  126. noMorePeers: make(chan struct{}),
  127. }
  128. // Initiate a sub-protocol for every implemented version we can handle
  129. manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
  130. for i, version := range ProtocolVersions {
  131. // Compatible, initialize the sub-protocol
  132. version := version // Closure for the run
  133. manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
  134. Name: "les",
  135. Version: version,
  136. Length: ProtocolLengths[i],
  137. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  138. var entry *poolEntry
  139. peer := manager.newPeer(int(version), networkId, p, rw)
  140. if manager.serverPool != nil {
  141. addr := p.RemoteAddr().(*net.TCPAddr)
  142. entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
  143. }
  144. peer.poolEntry = entry
  145. select {
  146. case manager.newPeerCh <- peer:
  147. manager.wg.Add(1)
  148. defer manager.wg.Done()
  149. err := manager.handle(peer)
  150. if entry != nil {
  151. manager.serverPool.disconnect(entry)
  152. }
  153. return err
  154. case <-manager.quitSync:
  155. if entry != nil {
  156. manager.serverPool.disconnect(entry)
  157. }
  158. return p2p.DiscQuitting
  159. }
  160. },
  161. NodeInfo: func() interface{} {
  162. return manager.NodeInfo()
  163. },
  164. PeerInfo: func(id discover.NodeID) interface{} {
  165. if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
  166. return p.Info()
  167. }
  168. return nil
  169. },
  170. })
  171. }
  172. if len(manager.SubProtocols) == 0 {
  173. return nil, errIncompatibleConfig
  174. }
  175. removePeer := manager.removePeer
  176. if disableClientRemovePeer {
  177. removePeer = func(id string) {}
  178. }
  179. if lightSync {
  180. glog.V(logger.Debug).Infof("LES: create downloader")
  181. manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
  182. nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
  183. blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
  184. }
  185. if odr != nil {
  186. odr.removePeer = removePeer
  187. }
  188. /*validator := func(block *types.Block, parent *types.Block) error {
  189. return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
  190. }
  191. heighter := func() uint64 {
  192. return chainman.LastBlockNumberU64()
  193. }
  194. manager.fetcher = fetcher.New(chainman.GetBlockNoOdr, validator, nil, heighter, chainman.InsertChain, manager.removePeer)
  195. */
  196. return manager, nil
  197. }
  198. func (pm *ProtocolManager) removePeer(id string) {
  199. // Short circuit if the peer was already removed
  200. peer := pm.peers.Peer(id)
  201. if peer == nil {
  202. return
  203. }
  204. if err := pm.peers.Unregister(id); err != nil {
  205. if err == errNotRegistered {
  206. return
  207. }
  208. glog.V(logger.Error).Infoln("Removal failed:", err)
  209. }
  210. glog.V(logger.Debug).Infoln("Removing peer", id)
  211. // Unregister the peer from the downloader and Ethereum peer set
  212. glog.V(logger.Debug).Infof("LES: unregister peer %v", id)
  213. if pm.lightSync {
  214. pm.downloader.UnregisterPeer(id)
  215. if pm.txrelay != nil {
  216. pm.txrelay.removePeer(id)
  217. }
  218. if pm.fetcher != nil {
  219. pm.fetcher.removePeer(peer)
  220. }
  221. }
  222. // Hard disconnect at the networking layer
  223. if peer != nil {
  224. peer.Peer.Disconnect(p2p.DiscUselessPeer)
  225. }
  226. }
  227. func (pm *ProtocolManager) Start(srvr *p2p.Server) {
  228. var topicDisc *discv5.Network
  229. if srvr != nil {
  230. topicDisc = srvr.DiscV5
  231. }
  232. lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
  233. if pm.lightSync {
  234. // start sync handler
  235. if srvr != nil { // srvr is nil during testing
  236. pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
  237. pm.odr.serverPool = pm.serverPool
  238. pm.fetcher = newLightFetcher(pm)
  239. }
  240. go pm.syncer()
  241. } else {
  242. if topicDisc != nil {
  243. go func() {
  244. glog.V(logger.Info).Infoln("Starting registering topic", string(lesTopic))
  245. topicDisc.RegisterTopic(lesTopic, pm.quitSync)
  246. glog.V(logger.Info).Infoln("Stopped registering topic", string(lesTopic))
  247. }()
  248. }
  249. go func() {
  250. for range pm.newPeerCh {
  251. }
  252. }()
  253. }
  254. }
// Stop shuts the protocol manager down: it signals that no more peers are
// accepted, closes quitSync, closes the peer set and then blocks until all
// work tracked by pm.wg has finished.
//
// NOTE(review): the send on noMorePeers is unbuffered, so Stop blocks until
// something receives from that channel; the receiver is not visible in this
// file — confirm against the callers.
func (pm *ProtocolManager) Stop() {
	// Showing a log message. During download / process this could actually
	// take between 5 to 10 seconds and therefore feedback is required.
	glog.V(logger.Info).Infoln("Stopping light ethereum protocol handler...")

	// Quit the sync loop.
	// After this send has completed, no new peers will be accepted.
	pm.noMorePeers <- struct{}{}

	close(pm.quitSync) // quits syncer, fetcher

	// Disconnect existing sessions.
	// This also closes the gate for any new registrations on the peer set.
	// Sessions which are already established but not added to pm.peers yet
	// will exit when they try to register.
	pm.peers.Close()

	// Wait for any process action
	pm.wg.Wait()

	glog.V(logger.Info).Infoln("Light ethereum protocol handler stopped")
}
  272. func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  273. return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
  274. }
  275. // handle is the callback invoked to manage the life cycle of a les peer. When
  276. // this function terminates, the peer is disconnected.
  277. func (pm *ProtocolManager) handle(p *peer) error {
  278. glog.V(logger.Debug).Infof("%v: peer connected [%s]", p, p.Name())
  279. // Execute the LES handshake
  280. td, head, genesis := pm.blockchain.Status()
  281. headNum := core.GetBlockNumber(pm.chainDb, head)
  282. if err := p.Handshake(td, head, headNum, genesis, pm.server); err != nil {
  283. glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err)
  284. return err
  285. }
  286. if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
  287. rw.Init(p.version)
  288. }
  289. // Register the peer locally
  290. glog.V(logger.Detail).Infof("%v: adding peer", p)
  291. if err := pm.peers.Register(p); err != nil {
  292. glog.V(logger.Error).Infof("%v: addition failed: %v", p, err)
  293. return err
  294. }
  295. defer func() {
  296. if pm.server != nil && pm.server.fcManager != nil && p.fcClient != nil {
  297. p.fcClient.Remove(pm.server.fcManager)
  298. }
  299. pm.removePeer(p.id)
  300. }()
  301. // Register the peer in the downloader. If the downloader considers it banned, we disconnect
  302. glog.V(logger.Debug).Infof("LES: register peer %v", p.id)
  303. if pm.lightSync {
  304. requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
  305. reqID := getNextReqID()
  306. cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
  307. p.fcServer.MustAssignRequest(reqID)
  308. p.fcServer.SendRequest(reqID, cost)
  309. return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse)
  310. }
  311. requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
  312. reqID := getNextReqID()
  313. cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
  314. p.fcServer.MustAssignRequest(reqID)
  315. p.fcServer.SendRequest(reqID, cost)
  316. return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse)
  317. }
  318. if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd,
  319. requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
  320. return err
  321. }
  322. if pm.txrelay != nil {
  323. pm.txrelay.addPeer(p)
  324. }
  325. p.lock.Lock()
  326. head := p.headInfo
  327. p.lock.Unlock()
  328. if pm.fetcher != nil {
  329. pm.fetcher.addPeer(p)
  330. pm.fetcher.announce(p, head)
  331. }
  332. if p.poolEntry != nil {
  333. pm.serverPool.registered(p.poolEntry)
  334. }
  335. }
  336. stop := make(chan struct{})
  337. defer close(stop)
  338. go func() {
  339. // new block announce loop
  340. for {
  341. select {
  342. case announce := <-p.announceChn:
  343. p.SendAnnounce(announce)
  344. case <-stop:
  345. return
  346. }
  347. }
  348. }()
  349. // main loop. handle incoming messages.
  350. for {
  351. if err := pm.handleMsg(p); err != nil {
  352. glog.V(logger.Debug).Infof("%v: message handling failed: %v", p, err)
  353. return err
  354. }
  355. }
  356. }
  357. var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsMsg, SendTxMsg, GetHeaderProofsMsg}
  358. // handleMsg is invoked whenever an inbound message is received from a remote
  359. // peer. The remote connection is torn down upon returning any error.
  360. func (pm *ProtocolManager) handleMsg(p *peer) error {
  361. // Read the next message from the remote peer, and ensure it's fully consumed
  362. msg, err := p.rw.ReadMsg()
  363. if err != nil {
  364. return err
  365. }
  366. glog.V(logger.Debug).Infoln("msg:", msg.Code, msg.Size)
  367. costs := p.fcCosts[msg.Code]
  368. reject := func(reqCnt, maxCnt uint64) bool {
  369. if p.fcClient == nil || reqCnt > maxCnt {
  370. return true
  371. }
  372. bufValue, _ := p.fcClient.AcceptRequest()
  373. cost := costs.baseCost + reqCnt*costs.reqCost
  374. if cost > pm.server.defParams.BufLimit {
  375. cost = pm.server.defParams.BufLimit
  376. }
  377. if cost > bufValue {
  378. glog.V(logger.Error).Infof("Request from %v came %v too early", p.id, time.Duration((cost-bufValue)*1000000/pm.server.defParams.MinRecharge))
  379. return true
  380. }
  381. return false
  382. }
  383. if msg.Size > ProtocolMaxMsgSize {
  384. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  385. }
  386. defer msg.Discard()
  387. var deliverMsg *Msg
  388. // Handle the message depending on its contents
  389. switch msg.Code {
  390. case StatusMsg:
  391. glog.V(logger.Debug).Infof("<=== StatusMsg from peer %v", p.id)
  392. // Status messages should never arrive after the handshake
  393. return errResp(ErrExtraStatusMsg, "uncontrolled status message")
  394. // Block header query, collect the requested headers and reply
  395. case AnnounceMsg:
  396. glog.V(logger.Debug).Infoln("<=== AnnounceMsg from peer %v:", p.id)
  397. var req announceData
  398. if err := msg.Decode(&req); err != nil {
  399. return errResp(ErrDecode, "%v: %v", msg, err)
  400. }
  401. glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth)
  402. if pm.fetcher != nil {
  403. pm.fetcher.announce(p, &req)
  404. }
  405. case GetBlockHeadersMsg:
  406. glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id)
  407. // Decode the complex header query
  408. var req struct {
  409. ReqID uint64
  410. Query getBlockHeadersData
  411. }
  412. if err := msg.Decode(&req); err != nil {
  413. return errResp(ErrDecode, "%v: %v", msg, err)
  414. }
  415. query := req.Query
  416. if reject(query.Amount, MaxHeaderFetch) {
  417. return errResp(ErrRequestRejected, "")
  418. }
  419. hashMode := query.Origin.Hash != (common.Hash{})
  420. // Gather headers until the fetch or network limits is reached
  421. var (
  422. bytes common.StorageSize
  423. headers []*types.Header
  424. unknown bool
  425. )
  426. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  427. // Retrieve the next header satisfying the query
  428. var origin *types.Header
  429. if hashMode {
  430. origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
  431. } else {
  432. origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
  433. }
  434. if origin == nil {
  435. break
  436. }
  437. number := origin.Number.Uint64()
  438. headers = append(headers, origin)
  439. bytes += estHeaderRlpSize
  440. // Advance to the next header of the query
  441. switch {
  442. case query.Origin.Hash != (common.Hash{}) && query.Reverse:
  443. // Hash based traversal towards the genesis block
  444. for i := 0; i < int(query.Skip)+1; i++ {
  445. if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {
  446. query.Origin.Hash = header.ParentHash
  447. number--
  448. } else {
  449. unknown = true
  450. break
  451. }
  452. }
  453. case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
  454. // Hash based traversal towards the leaf block
  455. if header := pm.blockchain.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil {
  456. if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
  457. query.Origin.Hash = header.Hash()
  458. } else {
  459. unknown = true
  460. }
  461. } else {
  462. unknown = true
  463. }
  464. case query.Reverse:
  465. // Number based traversal towards the genesis block
  466. if query.Origin.Number >= query.Skip+1 {
  467. query.Origin.Number -= (query.Skip + 1)
  468. } else {
  469. unknown = true
  470. }
  471. case !query.Reverse:
  472. // Number based traversal towards the leaf block
  473. query.Origin.Number += (query.Skip + 1)
  474. }
  475. }
  476. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost)
  477. pm.server.fcCostStats.update(msg.Code, query.Amount, rcost)
  478. return p.SendBlockHeaders(req.ReqID, bv, headers)
  479. case BlockHeadersMsg:
  480. if pm.downloader == nil {
  481. return errResp(ErrUnexpectedResponse, "")
  482. }
  483. glog.V(logger.Debug).Infof("<=== BlockHeadersMsg from peer %v", p.id)
  484. // A batch of headers arrived to one of our previous requests
  485. var resp struct {
  486. ReqID, BV uint64
  487. Headers []*types.Header
  488. }
  489. if err := msg.Decode(&resp); err != nil {
  490. return errResp(ErrDecode, "msg %v: %v", msg, err)
  491. }
  492. p.fcServer.GotReply(resp.ReqID, resp.BV)
  493. if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
  494. pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
  495. } else {
  496. err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
  497. if err != nil {
  498. glog.V(logger.Debug).Infoln(err)
  499. }
  500. }
  501. case GetBlockBodiesMsg:
  502. glog.V(logger.Debug).Infof("<=== GetBlockBodiesMsg from peer %v", p.id)
  503. // Decode the retrieval message
  504. var req struct {
  505. ReqID uint64
  506. Hashes []common.Hash
  507. }
  508. if err := msg.Decode(&req); err != nil {
  509. return errResp(ErrDecode, "msg %v: %v", msg, err)
  510. }
  511. // Gather blocks until the fetch or network limits is reached
  512. var (
  513. bytes int
  514. bodies []rlp.RawValue
  515. )
  516. reqCnt := len(req.Hashes)
  517. if reject(uint64(reqCnt), MaxBodyFetch) {
  518. return errResp(ErrRequestRejected, "")
  519. }
  520. for _, hash := range req.Hashes {
  521. if bytes >= softResponseLimit {
  522. break
  523. }
  524. // Retrieve the requested block body, stopping if enough was found
  525. if data := core.GetBodyRLP(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash)); len(data) != 0 {
  526. bodies = append(bodies, data)
  527. bytes += len(data)
  528. }
  529. }
  530. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  531. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  532. return p.SendBlockBodiesRLP(req.ReqID, bv, bodies)
  533. case BlockBodiesMsg:
  534. if pm.odr == nil {
  535. return errResp(ErrUnexpectedResponse, "")
  536. }
  537. glog.V(logger.Debug).Infof("<=== BlockBodiesMsg from peer %v", p.id)
  538. // A batch of block bodies arrived to one of our previous requests
  539. var resp struct {
  540. ReqID, BV uint64
  541. Data []*types.Body
  542. }
  543. if err := msg.Decode(&resp); err != nil {
  544. return errResp(ErrDecode, "msg %v: %v", msg, err)
  545. }
  546. p.fcServer.GotReply(resp.ReqID, resp.BV)
  547. deliverMsg = &Msg{
  548. MsgType: MsgBlockBodies,
  549. ReqID: resp.ReqID,
  550. Obj: resp.Data,
  551. }
  552. case GetCodeMsg:
  553. glog.V(logger.Debug).Infof("<=== GetCodeMsg from peer %v", p.id)
  554. // Decode the retrieval message
  555. var req struct {
  556. ReqID uint64
  557. Reqs []CodeReq
  558. }
  559. if err := msg.Decode(&req); err != nil {
  560. return errResp(ErrDecode, "msg %v: %v", msg, err)
  561. }
  562. // Gather state data until the fetch or network limits is reached
  563. var (
  564. bytes int
  565. data [][]byte
  566. )
  567. reqCnt := len(req.Reqs)
  568. if reject(uint64(reqCnt), MaxCodeFetch) {
  569. return errResp(ErrRequestRejected, "")
  570. }
  571. for _, req := range req.Reqs {
  572. // Retrieve the requested state entry, stopping if enough was found
  573. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  574. if trie, _ := trie.New(header.Root, pm.chainDb); trie != nil {
  575. sdata := trie.Get(req.AccKey)
  576. var acc state.Account
  577. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  578. entry, _ := pm.chainDb.Get(acc.CodeHash)
  579. if bytes+len(entry) >= softResponseLimit {
  580. break
  581. }
  582. data = append(data, entry)
  583. bytes += len(entry)
  584. }
  585. }
  586. }
  587. }
  588. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  589. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  590. return p.SendCode(req.ReqID, bv, data)
  591. case CodeMsg:
  592. if pm.odr == nil {
  593. return errResp(ErrUnexpectedResponse, "")
  594. }
  595. glog.V(logger.Debug).Infof("<=== CodeMsg from peer %v", p.id)
  596. // A batch of node state data arrived to one of our previous requests
  597. var resp struct {
  598. ReqID, BV uint64
  599. Data [][]byte
  600. }
  601. if err := msg.Decode(&resp); err != nil {
  602. return errResp(ErrDecode, "msg %v: %v", msg, err)
  603. }
  604. p.fcServer.GotReply(resp.ReqID, resp.BV)
  605. deliverMsg = &Msg{
  606. MsgType: MsgCode,
  607. ReqID: resp.ReqID,
  608. Obj: resp.Data,
  609. }
  610. case GetReceiptsMsg:
  611. glog.V(logger.Debug).Infof("<=== GetReceiptsMsg from peer %v", p.id)
  612. // Decode the retrieval message
  613. var req struct {
  614. ReqID uint64
  615. Hashes []common.Hash
  616. }
  617. if err := msg.Decode(&req); err != nil {
  618. return errResp(ErrDecode, "msg %v: %v", msg, err)
  619. }
  620. // Gather state data until the fetch or network limits is reached
  621. var (
  622. bytes int
  623. receipts []rlp.RawValue
  624. )
  625. reqCnt := len(req.Hashes)
  626. if reject(uint64(reqCnt), MaxReceiptFetch) {
  627. return errResp(ErrRequestRejected, "")
  628. }
  629. for _, hash := range req.Hashes {
  630. if bytes >= softResponseLimit {
  631. break
  632. }
  633. // Retrieve the requested block's receipts, skipping if unknown to us
  634. results := core.GetBlockReceipts(pm.chainDb, hash, core.GetBlockNumber(pm.chainDb, hash))
  635. if results == nil {
  636. if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  637. continue
  638. }
  639. }
  640. // If known, encode and queue for response packet
  641. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  642. glog.V(logger.Error).Infof("failed to encode receipt: %v", err)
  643. } else {
  644. receipts = append(receipts, encoded)
  645. bytes += len(encoded)
  646. }
  647. }
  648. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  649. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  650. return p.SendReceiptsRLP(req.ReqID, bv, receipts)
  651. case ReceiptsMsg:
  652. if pm.odr == nil {
  653. return errResp(ErrUnexpectedResponse, "")
  654. }
  655. glog.V(logger.Debug).Infof("<=== ReceiptsMsg from peer %v", p.id)
  656. // A batch of receipts arrived to one of our previous requests
  657. var resp struct {
  658. ReqID, BV uint64
  659. Receipts []types.Receipts
  660. }
  661. if err := msg.Decode(&resp); err != nil {
  662. return errResp(ErrDecode, "msg %v: %v", msg, err)
  663. }
  664. p.fcServer.GotReply(resp.ReqID, resp.BV)
  665. deliverMsg = &Msg{
  666. MsgType: MsgReceipts,
  667. ReqID: resp.ReqID,
  668. Obj: resp.Receipts,
  669. }
  670. case GetProofsMsg:
  671. glog.V(logger.Debug).Infof("<=== GetProofsMsg from peer %v", p.id)
  672. // Decode the retrieval message
  673. var req struct {
  674. ReqID uint64
  675. Reqs []ProofReq
  676. }
  677. if err := msg.Decode(&req); err != nil {
  678. return errResp(ErrDecode, "msg %v: %v", msg, err)
  679. }
  680. // Gather state data until the fetch or network limits is reached
  681. var (
  682. bytes int
  683. proofs proofsData
  684. )
  685. reqCnt := len(req.Reqs)
  686. if reject(uint64(reqCnt), MaxProofsFetch) {
  687. return errResp(ErrRequestRejected, "")
  688. }
  689. for _, req := range req.Reqs {
  690. if bytes >= softResponseLimit {
  691. break
  692. }
  693. // Retrieve the requested state entry, stopping if enough was found
  694. if header := core.GetHeader(pm.chainDb, req.BHash, core.GetBlockNumber(pm.chainDb, req.BHash)); header != nil {
  695. if tr, _ := trie.New(header.Root, pm.chainDb); tr != nil {
  696. if len(req.AccKey) > 0 {
  697. sdata := tr.Get(req.AccKey)
  698. tr = nil
  699. var acc state.Account
  700. if err := rlp.DecodeBytes(sdata, &acc); err == nil {
  701. tr, _ = trie.New(acc.Root, pm.chainDb)
  702. }
  703. }
  704. if tr != nil {
  705. proof := tr.Prove(req.Key)
  706. proofs = append(proofs, proof)
  707. bytes += len(proof)
  708. }
  709. }
  710. }
  711. }
  712. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  713. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  714. return p.SendProofs(req.ReqID, bv, proofs)
  715. case ProofsMsg:
  716. if pm.odr == nil {
  717. return errResp(ErrUnexpectedResponse, "")
  718. }
  719. glog.V(logger.Debug).Infof("<=== ProofsMsg from peer %v", p.id)
  720. // A batch of merkle proofs arrived to one of our previous requests
  721. var resp struct {
  722. ReqID, BV uint64
  723. Data [][]rlp.RawValue
  724. }
  725. if err := msg.Decode(&resp); err != nil {
  726. return errResp(ErrDecode, "msg %v: %v", msg, err)
  727. }
  728. p.fcServer.GotReply(resp.ReqID, resp.BV)
  729. deliverMsg = &Msg{
  730. MsgType: MsgProofs,
  731. ReqID: resp.ReqID,
  732. Obj: resp.Data,
  733. }
  734. case GetHeaderProofsMsg:
  735. glog.V(logger.Debug).Infof("<=== GetHeaderProofsMsg from peer %v", p.id)
  736. // Decode the retrieval message
  737. var req struct {
  738. ReqID uint64
  739. Reqs []ChtReq
  740. }
  741. if err := msg.Decode(&req); err != nil {
  742. return errResp(ErrDecode, "msg %v: %v", msg, err)
  743. }
  744. // Gather state data until the fetch or network limits is reached
  745. var (
  746. bytes int
  747. proofs []ChtResp
  748. )
  749. reqCnt := len(req.Reqs)
  750. if reject(uint64(reqCnt), MaxHeaderProofsFetch) {
  751. return errResp(ErrRequestRejected, "")
  752. }
  753. for _, req := range req.Reqs {
  754. if bytes >= softResponseLimit {
  755. break
  756. }
  757. if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
  758. if root := getChtRoot(pm.chainDb, req.ChtNum); root != (common.Hash{}) {
  759. if tr, _ := trie.New(root, pm.chainDb); tr != nil {
  760. var encNumber [8]byte
  761. binary.BigEndian.PutUint64(encNumber[:], req.BlockNum)
  762. proof := tr.Prove(encNumber[:])
  763. proofs = append(proofs, ChtResp{Header: header, Proof: proof})
  764. bytes += len(proof) + estHeaderRlpSize
  765. }
  766. }
  767. }
  768. }
  769. bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  770. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  771. return p.SendHeaderProofs(req.ReqID, bv, proofs)
  772. case HeaderProofsMsg:
  773. if pm.odr == nil {
  774. return errResp(ErrUnexpectedResponse, "")
  775. }
  776. glog.V(logger.Debug).Infof("<=== HeaderProofsMsg from peer %v", p.id)
  777. var resp struct {
  778. ReqID, BV uint64
  779. Data []ChtResp
  780. }
  781. if err := msg.Decode(&resp); err != nil {
  782. return errResp(ErrDecode, "msg %v: %v", msg, err)
  783. }
  784. p.fcServer.GotReply(resp.ReqID, resp.BV)
  785. deliverMsg = &Msg{
  786. MsgType: MsgHeaderProofs,
  787. ReqID: resp.ReqID,
  788. Obj: resp.Data,
  789. }
  790. case SendTxMsg:
  791. if pm.txpool == nil {
  792. return errResp(ErrUnexpectedResponse, "")
  793. }
  794. // Transactions arrived, parse all of them and deliver to the pool
  795. var txs []*types.Transaction
  796. if err := msg.Decode(&txs); err != nil {
  797. return errResp(ErrDecode, "msg %v: %v", msg, err)
  798. }
  799. reqCnt := len(txs)
  800. if reject(uint64(reqCnt), MaxTxSend) {
  801. return errResp(ErrRequestRejected, "")
  802. }
  803. if err := pm.txpool.AddBatch(txs); err != nil {
  804. return errResp(ErrUnexpectedResponse, "msg: %v", err)
  805. }
  806. _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost)
  807. pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost)
  808. default:
  809. glog.V(logger.Debug).Infof("<=== unknown message with code %d from peer %v", msg.Code, p.id)
  810. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  811. }
  812. if deliverMsg != nil {
  813. return pm.odr.Deliver(p, deliverMsg)
  814. }
  815. return nil
  816. }
  817. // NodeInfo retrieves some protocol metadata about the running host node.
  818. func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo {
  819. return &eth.EthNodeInfo{
  820. Network: self.networkId,
  821. Difficulty: self.blockchain.GetTdByHash(self.blockchain.LastBlockHash()),
  822. Genesis: self.blockchain.Genesis().Hash(),
  823. Head: self.blockchain.LastBlockHash(),
  824. }
  825. }