server_handler.go 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "encoding/binary"
  19. "encoding/json"
  20. "errors"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/common/mclock"
  26. "github.com/ethereum/go-ethereum/core"
  27. "github.com/ethereum/go-ethereum/core/rawdb"
  28. "github.com/ethereum/go-ethereum/core/state"
  29. "github.com/ethereum/go-ethereum/core/types"
  30. "github.com/ethereum/go-ethereum/ethdb"
  31. lps "github.com/ethereum/go-ethereum/les/lespay/server"
  32. "github.com/ethereum/go-ethereum/light"
  33. "github.com/ethereum/go-ethereum/log"
  34. "github.com/ethereum/go-ethereum/metrics"
  35. "github.com/ethereum/go-ethereum/p2p"
  36. "github.com/ethereum/go-ethereum/rlp"
  37. "github.com/ethereum/go-ethereum/trie"
  38. )
  39. const (
  40. softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
  41. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
  42. ethVersion = 63 // equivalent eth version for the downloader
  43. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  44. MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
  45. MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
  46. MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
  47. MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  48. MaxHelperTrieProofsFetch = 64 // Amount of helper tries to be fetched per retrieval request
  49. MaxTxSend = 64 // Amount of transactions to be send per request
  50. MaxTxStatus = 256 // Amount of transactions to queried per request
  51. )
  52. var (
  53. errTooManyInvalidRequest = errors.New("too many invalid requests made")
  54. errFullClientPool = errors.New("client pool is full")
  55. )
  56. // serverHandler is responsible for serving light client and process
  57. // all incoming light requests.
  58. type serverHandler struct {
  59. blockchain *core.BlockChain
  60. chainDb ethdb.Database
  61. txpool *core.TxPool
  62. server *LesServer
  63. closeCh chan struct{} // Channel used to exit all background routines of handler.
  64. wg sync.WaitGroup // WaitGroup used to track all background routines of handler.
  65. synced func() bool // Callback function used to determine whether local node is synced.
  66. // Testing fields
  67. addTxsSync bool
  68. }
  69. func newServerHandler(server *LesServer, blockchain *core.BlockChain, chainDb ethdb.Database, txpool *core.TxPool, synced func() bool) *serverHandler {
  70. handler := &serverHandler{
  71. server: server,
  72. blockchain: blockchain,
  73. chainDb: chainDb,
  74. txpool: txpool,
  75. closeCh: make(chan struct{}),
  76. synced: synced,
  77. }
  78. return handler
  79. }
  80. // start starts the server handler.
  81. func (h *serverHandler) start() {
  82. h.wg.Add(1)
  83. go h.broadcastHeaders()
  84. }
  85. // stop stops the server handler.
  86. func (h *serverHandler) stop() {
  87. close(h.closeCh)
  88. h.wg.Wait()
  89. }
  90. // runPeer is the p2p protocol run function for the given version.
  91. func (h *serverHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error {
  92. peer := newClientPeer(int(version), h.server.config.NetworkId, p, newMeteredMsgWriter(rw, int(version)))
  93. defer peer.close()
  94. h.wg.Add(1)
  95. defer h.wg.Done()
  96. return h.handle(peer)
  97. }
  98. func (h *serverHandler) handle(p *clientPeer) error {
  99. p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
  100. // Execute the LES handshake
  101. var (
  102. head = h.blockchain.CurrentHeader()
  103. hash = head.Hash()
  104. number = head.Number.Uint64()
  105. td = h.blockchain.GetTd(hash, number)
  106. )
  107. if err := p.Handshake(td, hash, number, h.blockchain.Genesis().Hash(), h.server); err != nil {
  108. p.Log().Debug("Light Ethereum handshake failed", "err", err)
  109. return err
  110. }
  111. if p.server {
  112. if err := h.server.serverset.register(p); err != nil {
  113. return err
  114. }
  115. // connected to another server, no messages expected, just wait for disconnection
  116. _, err := p.rw.ReadMsg()
  117. return err
  118. }
  119. // Reject light clients if server is not synced.
  120. if !h.synced() {
  121. p.Log().Debug("Light server not synced, rejecting peer")
  122. return p2p.DiscRequested
  123. }
  124. defer p.fcClient.Disconnect()
  125. // Disconnect the inbound peer if it's rejected by clientPool
  126. if cap, err := h.server.clientPool.connect(p); cap != p.fcParams.MinRecharge || err != nil {
  127. p.Log().Debug("Light Ethereum peer rejected", "err", errFullClientPool)
  128. return errFullClientPool
  129. }
  130. p.balance, _ = h.server.clientPool.ns.GetField(p.Node(), h.server.clientPool.BalanceField).(*lps.NodeBalance)
  131. if p.balance == nil {
  132. return p2p.DiscRequested
  133. }
  134. // Register the peer locally
  135. if err := h.server.peers.register(p); err != nil {
  136. h.server.clientPool.disconnect(p)
  137. p.Log().Error("Light Ethereum peer registration failed", "err", err)
  138. return err
  139. }
  140. clientConnectionGauge.Update(int64(h.server.peers.len()))
  141. var wg sync.WaitGroup // Wait group used to track all in-flight task routines.
  142. connectedAt := mclock.Now()
  143. defer func() {
  144. wg.Wait() // Ensure all background task routines have exited.
  145. h.server.peers.unregister(p.id)
  146. h.server.clientPool.disconnect(p)
  147. p.balance = nil
  148. clientConnectionGauge.Update(int64(h.server.peers.len()))
  149. connectionTimer.Update(time.Duration(mclock.Now() - connectedAt))
  150. }()
  151. // Mark the peer starts to be served.
  152. atomic.StoreUint32(&p.serving, 1)
  153. defer atomic.StoreUint32(&p.serving, 0)
  154. // Spawn a main loop to handle all incoming messages.
  155. for {
  156. select {
  157. case err := <-p.errCh:
  158. p.Log().Debug("Failed to send light ethereum response", "err", err)
  159. return err
  160. default:
  161. }
  162. if err := h.handleMsg(p, &wg); err != nil {
  163. p.Log().Debug("Light Ethereum message handling failed", "err", err)
  164. return err
  165. }
  166. }
  167. }
  168. // handleMsg is invoked whenever an inbound message is received from a remote
  169. // peer. The remote connection is torn down upon returning any error.
  170. func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
  171. // Read the next message from the remote peer, and ensure it's fully consumed
  172. msg, err := p.rw.ReadMsg()
  173. if err != nil {
  174. return err
  175. }
  176. p.Log().Trace("Light Ethereum message arrived", "code", msg.Code, "bytes", msg.Size)
  177. // Discard large message which exceeds the limitation.
  178. if msg.Size > ProtocolMaxMsgSize {
  179. clientErrorMeter.Mark(1)
  180. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  181. }
  182. defer msg.Discard()
  183. var (
  184. maxCost uint64
  185. task *servingTask
  186. )
  187. p.responseCount++
  188. responseCount := p.responseCount
  189. // accept returns an indicator whether the request can be served.
  190. // If so, deduct the max cost from the flow control buffer.
  191. accept := func(reqID, reqCnt, maxCnt uint64) bool {
  192. // Short circuit if the peer is already frozen or the request is invalid.
  193. inSizeCost := h.server.costTracker.realCost(0, msg.Size, 0)
  194. if p.isFrozen() || reqCnt == 0 || reqCnt > maxCnt {
  195. p.fcClient.OneTimeCost(inSizeCost)
  196. return false
  197. }
  198. // Prepaid max cost units before request been serving.
  199. maxCost = p.fcCosts.getMaxCost(msg.Code, reqCnt)
  200. accepted, bufShort, priority := p.fcClient.AcceptRequest(reqID, responseCount, maxCost)
  201. if !accepted {
  202. p.freeze()
  203. p.Log().Error("Request came too early", "remaining", common.PrettyDuration(time.Duration(bufShort*1000000/p.fcParams.MinRecharge)))
  204. p.fcClient.OneTimeCost(inSizeCost)
  205. return false
  206. }
  207. // Create a multi-stage task, estimate the time it takes for the task to
  208. // execute, and cache it in the request service queue.
  209. factor := h.server.costTracker.globalFactor()
  210. if factor < 0.001 {
  211. factor = 1
  212. p.Log().Error("Invalid global cost factor", "factor", factor)
  213. }
  214. maxTime := uint64(float64(maxCost) / factor)
  215. task = h.server.servingQueue.newTask(p, maxTime, priority)
  216. if task.start() {
  217. return true
  218. }
  219. p.fcClient.RequestProcessed(reqID, responseCount, maxCost, inSizeCost)
  220. return false
  221. }
  222. // sendResponse sends back the response and updates the flow control statistic.
  223. sendResponse := func(reqID, amount uint64, reply *reply, servingTime uint64) {
  224. p.responseLock.Lock()
  225. defer p.responseLock.Unlock()
  226. // Short circuit if the client is already frozen.
  227. if p.isFrozen() {
  228. realCost := h.server.costTracker.realCost(servingTime, msg.Size, 0)
  229. p.fcClient.RequestProcessed(reqID, responseCount, maxCost, realCost)
  230. return
  231. }
  232. // Positive correction buffer value with real cost.
  233. var replySize uint32
  234. if reply != nil {
  235. replySize = reply.size()
  236. }
  237. var realCost uint64
  238. if h.server.costTracker.testing {
  239. realCost = maxCost // Assign a fake cost for testing purpose
  240. } else {
  241. realCost = h.server.costTracker.realCost(servingTime, msg.Size, replySize)
  242. if realCost > maxCost {
  243. realCost = maxCost
  244. }
  245. }
  246. bv := p.fcClient.RequestProcessed(reqID, responseCount, maxCost, realCost)
  247. if amount != 0 {
  248. // Feed cost tracker request serving statistic.
  249. h.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost)
  250. // Reduce priority "balance" for the specific peer.
  251. p.balance.RequestServed(realCost)
  252. }
  253. if reply != nil {
  254. p.queueSend(func() {
  255. if err := reply.send(bv); err != nil {
  256. select {
  257. case p.errCh <- err:
  258. default:
  259. }
  260. }
  261. })
  262. }
  263. }
  264. switch msg.Code {
  265. case GetBlockHeadersMsg:
  266. p.Log().Trace("Received block header request")
  267. if metrics.EnabledExpensive {
  268. miscInHeaderPacketsMeter.Mark(1)
  269. miscInHeaderTrafficMeter.Mark(int64(msg.Size))
  270. }
  271. var req struct {
  272. ReqID uint64
  273. Query getBlockHeadersData
  274. }
  275. if err := msg.Decode(&req); err != nil {
  276. clientErrorMeter.Mark(1)
  277. return errResp(ErrDecode, "%v: %v", msg, err)
  278. }
  279. query := req.Query
  280. if accept(req.ReqID, query.Amount, MaxHeaderFetch) {
  281. wg.Add(1)
  282. go func() {
  283. defer wg.Done()
  284. hashMode := query.Origin.Hash != (common.Hash{})
  285. first := true
  286. maxNonCanonical := uint64(100)
  287. // Gather headers until the fetch or network limits is reached
  288. var (
  289. bytes common.StorageSize
  290. headers []*types.Header
  291. unknown bool
  292. )
  293. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  294. if !first && !task.waitOrStop() {
  295. sendResponse(req.ReqID, 0, nil, task.servingTime)
  296. return
  297. }
  298. // Retrieve the next header satisfying the query
  299. var origin *types.Header
  300. if hashMode {
  301. if first {
  302. origin = h.blockchain.GetHeaderByHash(query.Origin.Hash)
  303. if origin != nil {
  304. query.Origin.Number = origin.Number.Uint64()
  305. }
  306. } else {
  307. origin = h.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
  308. }
  309. } else {
  310. origin = h.blockchain.GetHeaderByNumber(query.Origin.Number)
  311. }
  312. if origin == nil {
  313. p.bumpInvalid()
  314. break
  315. }
  316. headers = append(headers, origin)
  317. bytes += estHeaderRlpSize
  318. // Advance to the next header of the query
  319. switch {
  320. case hashMode && query.Reverse:
  321. // Hash based traversal towards the genesis block
  322. ancestor := query.Skip + 1
  323. if ancestor == 0 {
  324. unknown = true
  325. } else {
  326. query.Origin.Hash, query.Origin.Number = h.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
  327. unknown = query.Origin.Hash == common.Hash{}
  328. }
  329. case hashMode && !query.Reverse:
  330. // Hash based traversal towards the leaf block
  331. var (
  332. current = origin.Number.Uint64()
  333. next = current + query.Skip + 1
  334. )
  335. if next <= current {
  336. infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
  337. p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
  338. unknown = true
  339. } else {
  340. if header := h.blockchain.GetHeaderByNumber(next); header != nil {
  341. nextHash := header.Hash()
  342. expOldHash, _ := h.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
  343. if expOldHash == query.Origin.Hash {
  344. query.Origin.Hash, query.Origin.Number = nextHash, next
  345. } else {
  346. unknown = true
  347. }
  348. } else {
  349. unknown = true
  350. }
  351. }
  352. case query.Reverse:
  353. // Number based traversal towards the genesis block
  354. if query.Origin.Number >= query.Skip+1 {
  355. query.Origin.Number -= query.Skip + 1
  356. } else {
  357. unknown = true
  358. }
  359. case !query.Reverse:
  360. // Number based traversal towards the leaf block
  361. query.Origin.Number += query.Skip + 1
  362. }
  363. first = false
  364. }
  365. reply := p.replyBlockHeaders(req.ReqID, headers)
  366. sendResponse(req.ReqID, query.Amount, reply, task.done())
  367. if metrics.EnabledExpensive {
  368. miscOutHeaderPacketsMeter.Mark(1)
  369. miscOutHeaderTrafficMeter.Mark(int64(reply.size()))
  370. miscServingTimeHeaderTimer.Update(time.Duration(task.servingTime))
  371. }
  372. }()
  373. }
  374. case GetBlockBodiesMsg:
  375. p.Log().Trace("Received block bodies request")
  376. if metrics.EnabledExpensive {
  377. miscInBodyPacketsMeter.Mark(1)
  378. miscInBodyTrafficMeter.Mark(int64(msg.Size))
  379. }
  380. var req struct {
  381. ReqID uint64
  382. Hashes []common.Hash
  383. }
  384. if err := msg.Decode(&req); err != nil {
  385. clientErrorMeter.Mark(1)
  386. return errResp(ErrDecode, "msg %v: %v", msg, err)
  387. }
  388. var (
  389. bytes int
  390. bodies []rlp.RawValue
  391. )
  392. reqCnt := len(req.Hashes)
  393. if accept(req.ReqID, uint64(reqCnt), MaxBodyFetch) {
  394. wg.Add(1)
  395. go func() {
  396. defer wg.Done()
  397. for i, hash := range req.Hashes {
  398. if i != 0 && !task.waitOrStop() {
  399. sendResponse(req.ReqID, 0, nil, task.servingTime)
  400. return
  401. }
  402. if bytes >= softResponseLimit {
  403. break
  404. }
  405. body := h.blockchain.GetBodyRLP(hash)
  406. if body == nil {
  407. p.bumpInvalid()
  408. continue
  409. }
  410. bodies = append(bodies, body)
  411. bytes += len(body)
  412. }
  413. reply := p.replyBlockBodiesRLP(req.ReqID, bodies)
  414. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  415. if metrics.EnabledExpensive {
  416. miscOutBodyPacketsMeter.Mark(1)
  417. miscOutBodyTrafficMeter.Mark(int64(reply.size()))
  418. miscServingTimeBodyTimer.Update(time.Duration(task.servingTime))
  419. }
  420. }()
  421. }
  422. case GetCodeMsg:
  423. p.Log().Trace("Received code request")
  424. if metrics.EnabledExpensive {
  425. miscInCodePacketsMeter.Mark(1)
  426. miscInCodeTrafficMeter.Mark(int64(msg.Size))
  427. }
  428. var req struct {
  429. ReqID uint64
  430. Reqs []CodeReq
  431. }
  432. if err := msg.Decode(&req); err != nil {
  433. clientErrorMeter.Mark(1)
  434. return errResp(ErrDecode, "msg %v: %v", msg, err)
  435. }
  436. var (
  437. bytes int
  438. data [][]byte
  439. )
  440. reqCnt := len(req.Reqs)
  441. if accept(req.ReqID, uint64(reqCnt), MaxCodeFetch) {
  442. wg.Add(1)
  443. go func() {
  444. defer wg.Done()
  445. for i, request := range req.Reqs {
  446. if i != 0 && !task.waitOrStop() {
  447. sendResponse(req.ReqID, 0, nil, task.servingTime)
  448. return
  449. }
  450. // Look up the root hash belonging to the request
  451. header := h.blockchain.GetHeaderByHash(request.BHash)
  452. if header == nil {
  453. p.Log().Warn("Failed to retrieve associate header for code", "hash", request.BHash)
  454. p.bumpInvalid()
  455. continue
  456. }
  457. // Refuse to search stale state data in the database since looking for
  458. // a non-exist key is kind of expensive.
  459. local := h.blockchain.CurrentHeader().Number.Uint64()
  460. if !h.server.archiveMode && header.Number.Uint64()+core.TriesInMemory <= local {
  461. p.Log().Debug("Reject stale code request", "number", header.Number.Uint64(), "head", local)
  462. p.bumpInvalid()
  463. continue
  464. }
  465. triedb := h.blockchain.StateCache().TrieDB()
  466. account, err := h.getAccount(triedb, header.Root, common.BytesToHash(request.AccKey))
  467. if err != nil {
  468. p.Log().Warn("Failed to retrieve account for code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err)
  469. p.bumpInvalid()
  470. continue
  471. }
  472. code, err := h.blockchain.StateCache().ContractCode(common.BytesToHash(request.AccKey), common.BytesToHash(account.CodeHash))
  473. if err != nil {
  474. p.Log().Warn("Failed to retrieve account code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "codehash", common.BytesToHash(account.CodeHash), "err", err)
  475. continue
  476. }
  477. // Accumulate the code and abort if enough data was retrieved
  478. data = append(data, code)
  479. if bytes += len(code); bytes >= softResponseLimit {
  480. break
  481. }
  482. }
  483. reply := p.replyCode(req.ReqID, data)
  484. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  485. if metrics.EnabledExpensive {
  486. miscOutCodePacketsMeter.Mark(1)
  487. miscOutCodeTrafficMeter.Mark(int64(reply.size()))
  488. miscServingTimeCodeTimer.Update(time.Duration(task.servingTime))
  489. }
  490. }()
  491. }
  492. case GetReceiptsMsg:
  493. p.Log().Trace("Received receipts request")
  494. if metrics.EnabledExpensive {
  495. miscInReceiptPacketsMeter.Mark(1)
  496. miscInReceiptTrafficMeter.Mark(int64(msg.Size))
  497. }
  498. var req struct {
  499. ReqID uint64
  500. Hashes []common.Hash
  501. }
  502. if err := msg.Decode(&req); err != nil {
  503. clientErrorMeter.Mark(1)
  504. return errResp(ErrDecode, "msg %v: %v", msg, err)
  505. }
  506. var (
  507. bytes int
  508. receipts []rlp.RawValue
  509. )
  510. reqCnt := len(req.Hashes)
  511. if accept(req.ReqID, uint64(reqCnt), MaxReceiptFetch) {
  512. wg.Add(1)
  513. go func() {
  514. defer wg.Done()
  515. for i, hash := range req.Hashes {
  516. if i != 0 && !task.waitOrStop() {
  517. sendResponse(req.ReqID, 0, nil, task.servingTime)
  518. return
  519. }
  520. if bytes >= softResponseLimit {
  521. break
  522. }
  523. // Retrieve the requested block's receipts, skipping if unknown to us
  524. results := h.blockchain.GetReceiptsByHash(hash)
  525. if results == nil {
  526. if header := h.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  527. p.bumpInvalid()
  528. continue
  529. }
  530. }
  531. // If known, encode and queue for response packet
  532. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  533. log.Error("Failed to encode receipt", "err", err)
  534. } else {
  535. receipts = append(receipts, encoded)
  536. bytes += len(encoded)
  537. }
  538. }
  539. reply := p.replyReceiptsRLP(req.ReqID, receipts)
  540. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  541. if metrics.EnabledExpensive {
  542. miscOutReceiptPacketsMeter.Mark(1)
  543. miscOutReceiptTrafficMeter.Mark(int64(reply.size()))
  544. miscServingTimeReceiptTimer.Update(time.Duration(task.servingTime))
  545. }
  546. }()
  547. }
  548. case GetProofsV2Msg:
  549. p.Log().Trace("Received les/2 proofs request")
  550. if metrics.EnabledExpensive {
  551. miscInTrieProofPacketsMeter.Mark(1)
  552. miscInTrieProofTrafficMeter.Mark(int64(msg.Size))
  553. }
  554. var req struct {
  555. ReqID uint64
  556. Reqs []ProofReq
  557. }
  558. if err := msg.Decode(&req); err != nil {
  559. clientErrorMeter.Mark(1)
  560. return errResp(ErrDecode, "msg %v: %v", msg, err)
  561. }
  562. // Gather state data until the fetch or network limits is reached
  563. var (
  564. lastBHash common.Hash
  565. root common.Hash
  566. )
  567. reqCnt := len(req.Reqs)
  568. if accept(req.ReqID, uint64(reqCnt), MaxProofsFetch) {
  569. wg.Add(1)
  570. go func() {
  571. defer wg.Done()
  572. nodes := light.NewNodeSet()
  573. for i, request := range req.Reqs {
  574. if i != 0 && !task.waitOrStop() {
  575. sendResponse(req.ReqID, 0, nil, task.servingTime)
  576. return
  577. }
  578. // Look up the root hash belonging to the request
  579. var (
  580. header *types.Header
  581. trie state.Trie
  582. )
  583. if request.BHash != lastBHash {
  584. root, lastBHash = common.Hash{}, request.BHash
  585. if header = h.blockchain.GetHeaderByHash(request.BHash); header == nil {
  586. p.Log().Warn("Failed to retrieve header for proof", "hash", request.BHash)
  587. p.bumpInvalid()
  588. continue
  589. }
  590. // Refuse to search stale state data in the database since looking for
  591. // a non-exist key is kind of expensive.
  592. local := h.blockchain.CurrentHeader().Number.Uint64()
  593. if !h.server.archiveMode && header.Number.Uint64()+core.TriesInMemory <= local {
  594. p.Log().Debug("Reject stale trie request", "number", header.Number.Uint64(), "head", local)
  595. p.bumpInvalid()
  596. continue
  597. }
  598. root = header.Root
  599. }
  600. // If a header lookup failed (non existent), ignore subsequent requests for the same header
  601. if root == (common.Hash{}) {
  602. p.bumpInvalid()
  603. continue
  604. }
  605. // Open the account or storage trie for the request
  606. statedb := h.blockchain.StateCache()
  607. switch len(request.AccKey) {
  608. case 0:
  609. // No account key specified, open an account trie
  610. trie, err = statedb.OpenTrie(root)
  611. if trie == nil || err != nil {
  612. p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "root", root, "err", err)
  613. continue
  614. }
  615. default:
  616. // Account key specified, open a storage trie
  617. account, err := h.getAccount(statedb.TrieDB(), root, common.BytesToHash(request.AccKey))
  618. if err != nil {
  619. p.Log().Warn("Failed to retrieve account for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err)
  620. p.bumpInvalid()
  621. continue
  622. }
  623. trie, err = statedb.OpenStorageTrie(common.BytesToHash(request.AccKey), account.Root)
  624. if trie == nil || err != nil {
  625. p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "root", account.Root, "err", err)
  626. continue
  627. }
  628. }
  629. // Prove the user's request from the account or stroage trie
  630. if err := trie.Prove(request.Key, request.FromLevel, nodes); err != nil {
  631. p.Log().Warn("Failed to prove state request", "block", header.Number, "hash", header.Hash(), "err", err)
  632. continue
  633. }
  634. if nodes.DataSize() >= softResponseLimit {
  635. break
  636. }
  637. }
  638. reply := p.replyProofsV2(req.ReqID, nodes.NodeList())
  639. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  640. if metrics.EnabledExpensive {
  641. miscOutTrieProofPacketsMeter.Mark(1)
  642. miscOutTrieProofTrafficMeter.Mark(int64(reply.size()))
  643. miscServingTimeTrieProofTimer.Update(time.Duration(task.servingTime))
  644. }
  645. }()
  646. }
  647. case GetHelperTrieProofsMsg:
  648. p.Log().Trace("Received helper trie proof request")
  649. if metrics.EnabledExpensive {
  650. miscInHelperTriePacketsMeter.Mark(1)
  651. miscInHelperTrieTrafficMeter.Mark(int64(msg.Size))
  652. }
  653. var req struct {
  654. ReqID uint64
  655. Reqs []HelperTrieReq
  656. }
  657. if err := msg.Decode(&req); err != nil {
  658. clientErrorMeter.Mark(1)
  659. return errResp(ErrDecode, "msg %v: %v", msg, err)
  660. }
  661. // Gather state data until the fetch or network limits is reached
  662. var (
  663. auxBytes int
  664. auxData [][]byte
  665. )
  666. reqCnt := len(req.Reqs)
  667. if accept(req.ReqID, uint64(reqCnt), MaxHelperTrieProofsFetch) {
  668. wg.Add(1)
  669. go func() {
  670. defer wg.Done()
  671. var (
  672. lastIdx uint64
  673. lastType uint
  674. root common.Hash
  675. auxTrie *trie.Trie
  676. )
  677. nodes := light.NewNodeSet()
  678. for i, request := range req.Reqs {
  679. if i != 0 && !task.waitOrStop() {
  680. sendResponse(req.ReqID, 0, nil, task.servingTime)
  681. return
  682. }
  683. if auxTrie == nil || request.Type != lastType || request.TrieIdx != lastIdx {
  684. auxTrie, lastType, lastIdx = nil, request.Type, request.TrieIdx
  685. var prefix string
  686. if root, prefix = h.getHelperTrie(request.Type, request.TrieIdx); root != (common.Hash{}) {
  687. auxTrie, _ = trie.New(root, trie.NewDatabase(rawdb.NewTable(h.chainDb, prefix)))
  688. }
  689. }
  690. if request.AuxReq == auxRoot {
  691. var data []byte
  692. if root != (common.Hash{}) {
  693. data = root[:]
  694. }
  695. auxData = append(auxData, data)
  696. auxBytes += len(data)
  697. } else {
  698. if auxTrie != nil {
  699. auxTrie.Prove(request.Key, request.FromLevel, nodes)
  700. }
  701. if request.AuxReq != 0 {
  702. data := h.getAuxiliaryHeaders(request)
  703. auxData = append(auxData, data)
  704. auxBytes += len(data)
  705. }
  706. }
  707. if nodes.DataSize()+auxBytes >= softResponseLimit {
  708. break
  709. }
  710. }
  711. reply := p.replyHelperTrieProofs(req.ReqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData})
  712. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  713. if metrics.EnabledExpensive {
  714. miscOutHelperTriePacketsMeter.Mark(1)
  715. miscOutHelperTrieTrafficMeter.Mark(int64(reply.size()))
  716. miscServingTimeHelperTrieTimer.Update(time.Duration(task.servingTime))
  717. }
  718. }()
  719. }
  720. case SendTxV2Msg:
  721. p.Log().Trace("Received new transactions")
  722. if metrics.EnabledExpensive {
  723. miscInTxsPacketsMeter.Mark(1)
  724. miscInTxsTrafficMeter.Mark(int64(msg.Size))
  725. }
  726. var req struct {
  727. ReqID uint64
  728. Txs []*types.Transaction
  729. }
  730. if err := msg.Decode(&req); err != nil {
  731. clientErrorMeter.Mark(1)
  732. return errResp(ErrDecode, "msg %v: %v", msg, err)
  733. }
  734. reqCnt := len(req.Txs)
  735. if accept(req.ReqID, uint64(reqCnt), MaxTxSend) {
  736. wg.Add(1)
  737. go func() {
  738. defer wg.Done()
  739. stats := make([]light.TxStatus, len(req.Txs))
  740. for i, tx := range req.Txs {
  741. if i != 0 && !task.waitOrStop() {
  742. return
  743. }
  744. hash := tx.Hash()
  745. stats[i] = h.txStatus(hash)
  746. if stats[i].Status == core.TxStatusUnknown {
  747. addFn := h.txpool.AddRemotes
  748. // Add txs synchronously for testing purpose
  749. if h.addTxsSync {
  750. addFn = h.txpool.AddRemotesSync
  751. }
  752. if errs := addFn([]*types.Transaction{tx}); errs[0] != nil {
  753. stats[i].Error = errs[0].Error()
  754. continue
  755. }
  756. stats[i] = h.txStatus(hash)
  757. }
  758. }
  759. reply := p.replyTxStatus(req.ReqID, stats)
  760. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  761. if metrics.EnabledExpensive {
  762. miscOutTxsPacketsMeter.Mark(1)
  763. miscOutTxsTrafficMeter.Mark(int64(reply.size()))
  764. miscServingTimeTxTimer.Update(time.Duration(task.servingTime))
  765. }
  766. }()
  767. }
  768. case GetTxStatusMsg:
  769. p.Log().Trace("Received transaction status query request")
  770. if metrics.EnabledExpensive {
  771. miscInTxStatusPacketsMeter.Mark(1)
  772. miscInTxStatusTrafficMeter.Mark(int64(msg.Size))
  773. }
  774. var req struct {
  775. ReqID uint64
  776. Hashes []common.Hash
  777. }
  778. if err := msg.Decode(&req); err != nil {
  779. clientErrorMeter.Mark(1)
  780. return errResp(ErrDecode, "msg %v: %v", msg, err)
  781. }
  782. reqCnt := len(req.Hashes)
  783. if accept(req.ReqID, uint64(reqCnt), MaxTxStatus) {
  784. wg.Add(1)
  785. go func() {
  786. defer wg.Done()
  787. stats := make([]light.TxStatus, len(req.Hashes))
  788. for i, hash := range req.Hashes {
  789. if i != 0 && !task.waitOrStop() {
  790. sendResponse(req.ReqID, 0, nil, task.servingTime)
  791. return
  792. }
  793. stats[i] = h.txStatus(hash)
  794. }
  795. reply := p.replyTxStatus(req.ReqID, stats)
  796. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  797. if metrics.EnabledExpensive {
  798. miscOutTxStatusPacketsMeter.Mark(1)
  799. miscOutTxStatusTrafficMeter.Mark(int64(reply.size()))
  800. miscServingTimeTxStatusTimer.Update(time.Duration(task.servingTime))
  801. }
  802. }()
  803. }
  804. default:
  805. p.Log().Trace("Received invalid message", "code", msg.Code)
  806. clientErrorMeter.Mark(1)
  807. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  808. }
  809. // If the client has made too much invalid request(e.g. request a non-existent data),
  810. // reject them to prevent SPAM attack.
  811. if p.getInvalid() > maxRequestErrors {
  812. clientErrorMeter.Mark(1)
  813. return errTooManyInvalidRequest
  814. }
  815. return nil
  816. }
  817. // getAccount retrieves an account from the state based on root.
  818. func (h *serverHandler) getAccount(triedb *trie.Database, root, hash common.Hash) (state.Account, error) {
  819. trie, err := trie.New(root, triedb)
  820. if err != nil {
  821. return state.Account{}, err
  822. }
  823. blob, err := trie.TryGet(hash[:])
  824. if err != nil {
  825. return state.Account{}, err
  826. }
  827. var account state.Account
  828. if err = rlp.DecodeBytes(blob, &account); err != nil {
  829. return state.Account{}, err
  830. }
  831. return account, nil
  832. }
  833. // getHelperTrie returns the post-processed trie root for the given trie ID and section index
  834. func (h *serverHandler) getHelperTrie(typ uint, index uint64) (common.Hash, string) {
  835. switch typ {
  836. case htCanonical:
  837. sectionHead := rawdb.ReadCanonicalHash(h.chainDb, (index+1)*h.server.iConfig.ChtSize-1)
  838. return light.GetChtRoot(h.chainDb, index, sectionHead), light.ChtTablePrefix
  839. case htBloomBits:
  840. sectionHead := rawdb.ReadCanonicalHash(h.chainDb, (index+1)*h.server.iConfig.BloomTrieSize-1)
  841. return light.GetBloomTrieRoot(h.chainDb, index, sectionHead), light.BloomTrieTablePrefix
  842. }
  843. return common.Hash{}, ""
  844. }
  845. // getAuxiliaryHeaders returns requested auxiliary headers for the CHT request.
  846. func (h *serverHandler) getAuxiliaryHeaders(req HelperTrieReq) []byte {
  847. if req.Type == htCanonical && req.AuxReq == auxHeader && len(req.Key) == 8 {
  848. blockNum := binary.BigEndian.Uint64(req.Key)
  849. hash := rawdb.ReadCanonicalHash(h.chainDb, blockNum)
  850. return rawdb.ReadHeaderRLP(h.chainDb, hash, blockNum)
  851. }
  852. return nil
  853. }
  854. // txStatus returns the status of a specified transaction.
  855. func (h *serverHandler) txStatus(hash common.Hash) light.TxStatus {
  856. var stat light.TxStatus
  857. // Looking the transaction in txpool first.
  858. stat.Status = h.txpool.Status([]common.Hash{hash})[0]
  859. // If the transaction is unknown to the pool, try looking it up locally.
  860. if stat.Status == core.TxStatusUnknown {
  861. lookup := h.blockchain.GetTransactionLookup(hash)
  862. if lookup != nil {
  863. stat.Status = core.TxStatusIncluded
  864. stat.Lookup = lookup
  865. }
  866. }
  867. return stat
  868. }
  869. // broadcastHeaders broadcasts new block information to all connected light
  870. // clients. According to the agreement between client and server, server should
  871. // only broadcast new announcement if the total difficulty is higher than the
  872. // last one. Besides server will add the signature if client requires.
  873. func (h *serverHandler) broadcastHeaders() {
  874. defer h.wg.Done()
  875. headCh := make(chan core.ChainHeadEvent, 10)
  876. headSub := h.blockchain.SubscribeChainHeadEvent(headCh)
  877. defer headSub.Unsubscribe()
  878. var (
  879. lastHead *types.Header
  880. lastTd = common.Big0
  881. )
  882. for {
  883. select {
  884. case ev := <-headCh:
  885. peers := h.server.peers.allPeers()
  886. if len(peers) == 0 {
  887. continue
  888. }
  889. header := ev.Block.Header()
  890. hash, number := header.Hash(), header.Number.Uint64()
  891. td := h.blockchain.GetTd(hash, number)
  892. if td == nil || td.Cmp(lastTd) <= 0 {
  893. continue
  894. }
  895. var reorg uint64
  896. if lastHead != nil {
  897. reorg = lastHead.Number.Uint64() - rawdb.FindCommonAncestor(h.chainDb, header, lastHead).Number.Uint64()
  898. }
  899. lastHead, lastTd = header, td
  900. log.Debug("Announcing block to peers", "number", number, "hash", hash, "td", td, "reorg", reorg)
  901. var (
  902. signed bool
  903. signedAnnounce announceData
  904. )
  905. announce := announceData{Hash: hash, Number: number, Td: td, ReorgDepth: reorg}
  906. for _, p := range peers {
  907. p := p
  908. switch p.announceType {
  909. case announceTypeSimple:
  910. if !p.queueSend(func() { p.sendAnnounce(announce) }) {
  911. log.Debug("Drop announcement because queue is full", "number", number, "hash", hash)
  912. }
  913. case announceTypeSigned:
  914. if !signed {
  915. signedAnnounce = announce
  916. signedAnnounce.sign(h.server.privateKey)
  917. signed = true
  918. }
  919. if !p.queueSend(func() { p.sendAnnounce(signedAnnounce) }) {
  920. log.Debug("Drop announcement because queue is full", "number", number, "hash", hash)
  921. }
  922. }
  923. }
  924. case <-h.closeCh:
  925. return
  926. }
  927. }
  928. }