server_handler.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "encoding/binary"
  19. "encoding/json"
  20. "errors"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/common/mclock"
  26. "github.com/ethereum/go-ethereum/core"
  27. "github.com/ethereum/go-ethereum/core/rawdb"
  28. "github.com/ethereum/go-ethereum/core/state"
  29. "github.com/ethereum/go-ethereum/core/types"
  30. "github.com/ethereum/go-ethereum/ethdb"
  31. "github.com/ethereum/go-ethereum/light"
  32. "github.com/ethereum/go-ethereum/log"
  33. "github.com/ethereum/go-ethereum/metrics"
  34. "github.com/ethereum/go-ethereum/p2p"
  35. "github.com/ethereum/go-ethereum/rlp"
  36. "github.com/ethereum/go-ethereum/trie"
  37. )
  38. const (
  39. softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
  40. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
  41. ethVersion = 63 // equivalent eth version for the downloader
  42. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  43. MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
  44. MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
  45. MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
  46. MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  47. MaxHelperTrieProofsFetch = 64 // Amount of helper tries to be fetched per retrieval request
  48. MaxTxSend = 64 // Amount of transactions to be send per request
  49. MaxTxStatus = 256 // Amount of transactions to queried per request
  50. )
  51. var (
  52. errTooManyInvalidRequest = errors.New("too many invalid requests made")
  53. errFullClientPool = errors.New("client pool is full")
  54. )
// serverHandler is responsible for serving light clients and processing
// all incoming light requests.
type serverHandler struct {
	blockchain *core.BlockChain // Chain queried for headers, bodies, receipts and state.
	chainDb    ethdb.Database   // Database backing the helper tries.
	txpool     *core.TxPool     // Pool that relayed transactions are added to.
	server     *LesServer       // Owning server: config, peer set, client pool, cost tracker, serving queue.

	closeCh chan struct{}  // Channel used to exit all background routines of handler.
	wg      sync.WaitGroup // WaitGroup used to track all background routines of handler.
	synced  func() bool    // Callback function used to determine whether local node is synced.

	// Testing fields
	addTxsSync bool // When set, transactions are added via the synchronous pool API.
}
  68. func newServerHandler(server *LesServer, blockchain *core.BlockChain, chainDb ethdb.Database, txpool *core.TxPool, synced func() bool) *serverHandler {
  69. handler := &serverHandler{
  70. server: server,
  71. blockchain: blockchain,
  72. chainDb: chainDb,
  73. txpool: txpool,
  74. closeCh: make(chan struct{}),
  75. synced: synced,
  76. }
  77. return handler
  78. }
// start starts the server handler by launching the header broadcast routine
// in the background. The routine is registered on the handler's wait group
// before it is spawned so that stop can wait for it.
func (h *serverHandler) start() {
	h.wg.Add(1)
	go h.broadcastHeaders()
}
// stop stops the server handler. It closes the close channel and then blocks
// until every routine tracked by the handler's wait group has finished.
// NOTE(review): closing closeCh twice would panic, so stop presumably must be
// called at most once — confirm against callers.
func (h *serverHandler) stop() {
	close(h.closeCh)
	h.wg.Wait()
}
  89. // runPeer is the p2p protocol run function for the given version.
  90. func (h *serverHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error {
  91. peer := newClientPeer(int(version), h.server.config.NetworkId, p, newMeteredMsgWriter(rw, int(version)))
  92. defer peer.close()
  93. h.wg.Add(1)
  94. defer h.wg.Done()
  95. return h.handle(peer)
  96. }
  97. func (h *serverHandler) handle(p *clientPeer) error {
  98. p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
  99. // Execute the LES handshake
  100. var (
  101. head = h.blockchain.CurrentHeader()
  102. hash = head.Hash()
  103. number = head.Number.Uint64()
  104. td = h.blockchain.GetTd(hash, number)
  105. )
  106. if err := p.Handshake(td, hash, number, h.blockchain.Genesis().Hash(), h.server); err != nil {
  107. p.Log().Debug("Light Ethereum handshake failed", "err", err)
  108. return err
  109. }
  110. if p.server {
  111. // connected to another server, no messages expected, just wait for disconnection
  112. _, err := p.rw.ReadMsg()
  113. return err
  114. }
  115. // Reject light clients if server is not synced.
  116. if !h.synced() {
  117. p.Log().Debug("Light server not synced, rejecting peer")
  118. return p2p.DiscRequested
  119. }
  120. defer p.fcClient.Disconnect()
  121. // Disconnect the inbound peer if it's rejected by clientPool
  122. if !h.server.clientPool.connect(p, 0) {
  123. p.Log().Debug("Light Ethereum peer registration failed", "err", errFullClientPool)
  124. return errFullClientPool
  125. }
  126. // Register the peer locally
  127. if err := h.server.peers.register(p); err != nil {
  128. h.server.clientPool.disconnect(p)
  129. p.Log().Error("Light Ethereum peer registration failed", "err", err)
  130. return err
  131. }
  132. clientConnectionGauge.Update(int64(h.server.peers.len()))
  133. var wg sync.WaitGroup // Wait group used to track all in-flight task routines.
  134. connectedAt := mclock.Now()
  135. defer func() {
  136. wg.Wait() // Ensure all background task routines have exited.
  137. h.server.peers.unregister(p.id)
  138. h.server.clientPool.disconnect(p)
  139. clientConnectionGauge.Update(int64(h.server.peers.len()))
  140. connectionTimer.Update(time.Duration(mclock.Now() - connectedAt))
  141. }()
  142. // Mark the peer starts to be served.
  143. atomic.StoreUint32(&p.serving, 1)
  144. defer atomic.StoreUint32(&p.serving, 0)
  145. // Spawn a main loop to handle all incoming messages.
  146. for {
  147. select {
  148. case err := <-p.errCh:
  149. p.Log().Debug("Failed to send light ethereum response", "err", err)
  150. return err
  151. default:
  152. }
  153. if err := h.handleMsg(p, &wg); err != nil {
  154. p.Log().Debug("Light Ethereum message handling failed", "err", err)
  155. return err
  156. }
  157. }
  158. }
  159. // handleMsg is invoked whenever an inbound message is received from a remote
  160. // peer. The remote connection is torn down upon returning any error.
  161. func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
  162. // Read the next message from the remote peer, and ensure it's fully consumed
  163. msg, err := p.rw.ReadMsg()
  164. if err != nil {
  165. return err
  166. }
  167. p.Log().Trace("Light Ethereum message arrived", "code", msg.Code, "bytes", msg.Size)
  168. // Discard large message which exceeds the limitation.
  169. if msg.Size > ProtocolMaxMsgSize {
  170. clientErrorMeter.Mark(1)
  171. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  172. }
  173. defer msg.Discard()
  174. var (
  175. maxCost uint64
  176. task *servingTask
  177. )
  178. p.responseCount++
  179. responseCount := p.responseCount
  180. // accept returns an indicator whether the request can be served.
  181. // If so, deduct the max cost from the flow control buffer.
  182. accept := func(reqID, reqCnt, maxCnt uint64) bool {
  183. // Short circuit if the peer is already frozen or the request is invalid.
  184. inSizeCost := h.server.costTracker.realCost(0, msg.Size, 0)
  185. if p.isFrozen() || reqCnt == 0 || reqCnt > maxCnt {
  186. p.fcClient.OneTimeCost(inSizeCost)
  187. return false
  188. }
  189. // Prepaid max cost units before request been serving.
  190. maxCost = p.fcCosts.getMaxCost(msg.Code, reqCnt)
  191. accepted, bufShort, priority := p.fcClient.AcceptRequest(reqID, responseCount, maxCost)
  192. if !accepted {
  193. p.freeze()
  194. p.Log().Error("Request came too early", "remaining", common.PrettyDuration(time.Duration(bufShort*1000000/p.fcParams.MinRecharge)))
  195. p.fcClient.OneTimeCost(inSizeCost)
  196. return false
  197. }
  198. // Create a multi-stage task, estimate the time it takes for the task to
  199. // execute, and cache it in the request service queue.
  200. factor := h.server.costTracker.globalFactor()
  201. if factor < 0.001 {
  202. factor = 1
  203. p.Log().Error("Invalid global cost factor", "factor", factor)
  204. }
  205. maxTime := uint64(float64(maxCost) / factor)
  206. task = h.server.servingQueue.newTask(p, maxTime, priority)
  207. if task.start() {
  208. return true
  209. }
  210. p.fcClient.RequestProcessed(reqID, responseCount, maxCost, inSizeCost)
  211. return false
  212. }
  213. // sendResponse sends back the response and updates the flow control statistic.
  214. sendResponse := func(reqID, amount uint64, reply *reply, servingTime uint64) {
  215. p.responseLock.Lock()
  216. defer p.responseLock.Unlock()
  217. // Short circuit if the client is already frozen.
  218. if p.isFrozen() {
  219. realCost := h.server.costTracker.realCost(servingTime, msg.Size, 0)
  220. p.fcClient.RequestProcessed(reqID, responseCount, maxCost, realCost)
  221. return
  222. }
  223. // Positive correction buffer value with real cost.
  224. var replySize uint32
  225. if reply != nil {
  226. replySize = reply.size()
  227. }
  228. var realCost uint64
  229. if h.server.costTracker.testing {
  230. realCost = maxCost // Assign a fake cost for testing purpose
  231. } else {
  232. realCost = h.server.costTracker.realCost(servingTime, msg.Size, replySize)
  233. }
  234. bv := p.fcClient.RequestProcessed(reqID, responseCount, maxCost, realCost)
  235. if amount != 0 {
  236. // Feed cost tracker request serving statistic.
  237. h.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost)
  238. // Reduce priority "balance" for the specific peer.
  239. h.server.clientPool.requestCost(p, realCost)
  240. }
  241. if reply != nil {
  242. p.queueSend(func() {
  243. if err := reply.send(bv); err != nil {
  244. select {
  245. case p.errCh <- err:
  246. default:
  247. }
  248. }
  249. })
  250. }
  251. }
  252. switch msg.Code {
  253. case GetBlockHeadersMsg:
  254. p.Log().Trace("Received block header request")
  255. if metrics.EnabledExpensive {
  256. miscInHeaderPacketsMeter.Mark(1)
  257. miscInHeaderTrafficMeter.Mark(int64(msg.Size))
  258. }
  259. var req struct {
  260. ReqID uint64
  261. Query getBlockHeadersData
  262. }
  263. if err := msg.Decode(&req); err != nil {
  264. clientErrorMeter.Mark(1)
  265. return errResp(ErrDecode, "%v: %v", msg, err)
  266. }
  267. query := req.Query
  268. if accept(req.ReqID, query.Amount, MaxHeaderFetch) {
  269. wg.Add(1)
  270. go func() {
  271. defer wg.Done()
  272. hashMode := query.Origin.Hash != (common.Hash{})
  273. first := true
  274. maxNonCanonical := uint64(100)
  275. // Gather headers until the fetch or network limits is reached
  276. var (
  277. bytes common.StorageSize
  278. headers []*types.Header
  279. unknown bool
  280. )
  281. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  282. if !first && !task.waitOrStop() {
  283. sendResponse(req.ReqID, 0, nil, task.servingTime)
  284. return
  285. }
  286. // Retrieve the next header satisfying the query
  287. var origin *types.Header
  288. if hashMode {
  289. if first {
  290. origin = h.blockchain.GetHeaderByHash(query.Origin.Hash)
  291. if origin != nil {
  292. query.Origin.Number = origin.Number.Uint64()
  293. }
  294. } else {
  295. origin = h.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
  296. }
  297. } else {
  298. origin = h.blockchain.GetHeaderByNumber(query.Origin.Number)
  299. }
  300. if origin == nil {
  301. atomic.AddUint32(&p.invalidCount, 1)
  302. break
  303. }
  304. headers = append(headers, origin)
  305. bytes += estHeaderRlpSize
  306. // Advance to the next header of the query
  307. switch {
  308. case hashMode && query.Reverse:
  309. // Hash based traversal towards the genesis block
  310. ancestor := query.Skip + 1
  311. if ancestor == 0 {
  312. unknown = true
  313. } else {
  314. query.Origin.Hash, query.Origin.Number = h.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
  315. unknown = query.Origin.Hash == common.Hash{}
  316. }
  317. case hashMode && !query.Reverse:
  318. // Hash based traversal towards the leaf block
  319. var (
  320. current = origin.Number.Uint64()
  321. next = current + query.Skip + 1
  322. )
  323. if next <= current {
  324. infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
  325. p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
  326. unknown = true
  327. } else {
  328. if header := h.blockchain.GetHeaderByNumber(next); header != nil {
  329. nextHash := header.Hash()
  330. expOldHash, _ := h.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
  331. if expOldHash == query.Origin.Hash {
  332. query.Origin.Hash, query.Origin.Number = nextHash, next
  333. } else {
  334. unknown = true
  335. }
  336. } else {
  337. unknown = true
  338. }
  339. }
  340. case query.Reverse:
  341. // Number based traversal towards the genesis block
  342. if query.Origin.Number >= query.Skip+1 {
  343. query.Origin.Number -= query.Skip + 1
  344. } else {
  345. unknown = true
  346. }
  347. case !query.Reverse:
  348. // Number based traversal towards the leaf block
  349. query.Origin.Number += query.Skip + 1
  350. }
  351. first = false
  352. }
  353. reply := p.replyBlockHeaders(req.ReqID, headers)
  354. sendResponse(req.ReqID, query.Amount, p.replyBlockHeaders(req.ReqID, headers), task.done())
  355. if metrics.EnabledExpensive {
  356. miscOutHeaderPacketsMeter.Mark(1)
  357. miscOutHeaderTrafficMeter.Mark(int64(reply.size()))
  358. miscServingTimeHeaderTimer.Update(time.Duration(task.servingTime))
  359. }
  360. }()
  361. }
  362. case GetBlockBodiesMsg:
  363. p.Log().Trace("Received block bodies request")
  364. if metrics.EnabledExpensive {
  365. miscInBodyPacketsMeter.Mark(1)
  366. miscInBodyTrafficMeter.Mark(int64(msg.Size))
  367. }
  368. var req struct {
  369. ReqID uint64
  370. Hashes []common.Hash
  371. }
  372. if err := msg.Decode(&req); err != nil {
  373. clientErrorMeter.Mark(1)
  374. return errResp(ErrDecode, "msg %v: %v", msg, err)
  375. }
  376. var (
  377. bytes int
  378. bodies []rlp.RawValue
  379. )
  380. reqCnt := len(req.Hashes)
  381. if accept(req.ReqID, uint64(reqCnt), MaxBodyFetch) {
  382. wg.Add(1)
  383. go func() {
  384. defer wg.Done()
  385. for i, hash := range req.Hashes {
  386. if i != 0 && !task.waitOrStop() {
  387. sendResponse(req.ReqID, 0, nil, task.servingTime)
  388. return
  389. }
  390. if bytes >= softResponseLimit {
  391. break
  392. }
  393. body := h.blockchain.GetBodyRLP(hash)
  394. if body == nil {
  395. atomic.AddUint32(&p.invalidCount, 1)
  396. continue
  397. }
  398. bodies = append(bodies, body)
  399. bytes += len(body)
  400. }
  401. reply := p.replyBlockBodiesRLP(req.ReqID, bodies)
  402. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  403. if metrics.EnabledExpensive {
  404. miscOutBodyPacketsMeter.Mark(1)
  405. miscOutBodyTrafficMeter.Mark(int64(reply.size()))
  406. miscServingTimeBodyTimer.Update(time.Duration(task.servingTime))
  407. }
  408. }()
  409. }
  410. case GetCodeMsg:
  411. p.Log().Trace("Received code request")
  412. if metrics.EnabledExpensive {
  413. miscInCodePacketsMeter.Mark(1)
  414. miscInCodeTrafficMeter.Mark(int64(msg.Size))
  415. }
  416. var req struct {
  417. ReqID uint64
  418. Reqs []CodeReq
  419. }
  420. if err := msg.Decode(&req); err != nil {
  421. clientErrorMeter.Mark(1)
  422. return errResp(ErrDecode, "msg %v: %v", msg, err)
  423. }
  424. var (
  425. bytes int
  426. data [][]byte
  427. )
  428. reqCnt := len(req.Reqs)
  429. if accept(req.ReqID, uint64(reqCnt), MaxCodeFetch) {
  430. wg.Add(1)
  431. go func() {
  432. defer wg.Done()
  433. for i, request := range req.Reqs {
  434. if i != 0 && !task.waitOrStop() {
  435. sendResponse(req.ReqID, 0, nil, task.servingTime)
  436. return
  437. }
  438. // Look up the root hash belonging to the request
  439. header := h.blockchain.GetHeaderByHash(request.BHash)
  440. if header == nil {
  441. p.Log().Warn("Failed to retrieve associate header for code", "hash", request.BHash)
  442. atomic.AddUint32(&p.invalidCount, 1)
  443. continue
  444. }
  445. // Refuse to search stale state data in the database since looking for
  446. // a non-exist key is kind of expensive.
  447. local := h.blockchain.CurrentHeader().Number.Uint64()
  448. if !h.server.archiveMode && header.Number.Uint64()+core.TriesInMemory <= local {
  449. p.Log().Debug("Reject stale code request", "number", header.Number.Uint64(), "head", local)
  450. atomic.AddUint32(&p.invalidCount, 1)
  451. continue
  452. }
  453. triedb := h.blockchain.StateCache().TrieDB()
  454. account, err := h.getAccount(triedb, header.Root, common.BytesToHash(request.AccKey))
  455. if err != nil {
  456. p.Log().Warn("Failed to retrieve account for code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err)
  457. atomic.AddUint32(&p.invalidCount, 1)
  458. continue
  459. }
  460. code, err := triedb.Node(common.BytesToHash(account.CodeHash))
  461. if err != nil {
  462. p.Log().Warn("Failed to retrieve account code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "codehash", common.BytesToHash(account.CodeHash), "err", err)
  463. continue
  464. }
  465. // Accumulate the code and abort if enough data was retrieved
  466. data = append(data, code)
  467. if bytes += len(code); bytes >= softResponseLimit {
  468. break
  469. }
  470. }
  471. reply := p.replyCode(req.ReqID, data)
  472. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  473. if metrics.EnabledExpensive {
  474. miscOutCodePacketsMeter.Mark(1)
  475. miscOutCodeTrafficMeter.Mark(int64(reply.size()))
  476. miscServingTimeCodeTimer.Update(time.Duration(task.servingTime))
  477. }
  478. }()
  479. }
  480. case GetReceiptsMsg:
  481. p.Log().Trace("Received receipts request")
  482. if metrics.EnabledExpensive {
  483. miscInReceiptPacketsMeter.Mark(1)
  484. miscInReceiptTrafficMeter.Mark(int64(msg.Size))
  485. }
  486. var req struct {
  487. ReqID uint64
  488. Hashes []common.Hash
  489. }
  490. if err := msg.Decode(&req); err != nil {
  491. clientErrorMeter.Mark(1)
  492. return errResp(ErrDecode, "msg %v: %v", msg, err)
  493. }
  494. var (
  495. bytes int
  496. receipts []rlp.RawValue
  497. )
  498. reqCnt := len(req.Hashes)
  499. if accept(req.ReqID, uint64(reqCnt), MaxReceiptFetch) {
  500. wg.Add(1)
  501. go func() {
  502. defer wg.Done()
  503. for i, hash := range req.Hashes {
  504. if i != 0 && !task.waitOrStop() {
  505. sendResponse(req.ReqID, 0, nil, task.servingTime)
  506. return
  507. }
  508. if bytes >= softResponseLimit {
  509. break
  510. }
  511. // Retrieve the requested block's receipts, skipping if unknown to us
  512. results := h.blockchain.GetReceiptsByHash(hash)
  513. if results == nil {
  514. if header := h.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  515. atomic.AddUint32(&p.invalidCount, 1)
  516. continue
  517. }
  518. }
  519. // If known, encode and queue for response packet
  520. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  521. log.Error("Failed to encode receipt", "err", err)
  522. } else {
  523. receipts = append(receipts, encoded)
  524. bytes += len(encoded)
  525. }
  526. }
  527. reply := p.replyReceiptsRLP(req.ReqID, receipts)
  528. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  529. if metrics.EnabledExpensive {
  530. miscOutReceiptPacketsMeter.Mark(1)
  531. miscOutReceiptTrafficMeter.Mark(int64(reply.size()))
  532. miscServingTimeReceiptTimer.Update(time.Duration(task.servingTime))
  533. }
  534. }()
  535. }
  536. case GetProofsV2Msg:
  537. p.Log().Trace("Received les/2 proofs request")
  538. if metrics.EnabledExpensive {
  539. miscInTrieProofPacketsMeter.Mark(1)
  540. miscInTrieProofTrafficMeter.Mark(int64(msg.Size))
  541. }
  542. var req struct {
  543. ReqID uint64
  544. Reqs []ProofReq
  545. }
  546. if err := msg.Decode(&req); err != nil {
  547. clientErrorMeter.Mark(1)
  548. return errResp(ErrDecode, "msg %v: %v", msg, err)
  549. }
  550. // Gather state data until the fetch or network limits is reached
  551. var (
  552. lastBHash common.Hash
  553. root common.Hash
  554. )
  555. reqCnt := len(req.Reqs)
  556. if accept(req.ReqID, uint64(reqCnt), MaxProofsFetch) {
  557. wg.Add(1)
  558. go func() {
  559. defer wg.Done()
  560. nodes := light.NewNodeSet()
  561. for i, request := range req.Reqs {
  562. if i != 0 && !task.waitOrStop() {
  563. sendResponse(req.ReqID, 0, nil, task.servingTime)
  564. return
  565. }
  566. // Look up the root hash belonging to the request
  567. var (
  568. header *types.Header
  569. trie state.Trie
  570. )
  571. if request.BHash != lastBHash {
  572. root, lastBHash = common.Hash{}, request.BHash
  573. if header = h.blockchain.GetHeaderByHash(request.BHash); header == nil {
  574. p.Log().Warn("Failed to retrieve header for proof", "hash", request.BHash)
  575. atomic.AddUint32(&p.invalidCount, 1)
  576. continue
  577. }
  578. // Refuse to search stale state data in the database since looking for
  579. // a non-exist key is kind of expensive.
  580. local := h.blockchain.CurrentHeader().Number.Uint64()
  581. if !h.server.archiveMode && header.Number.Uint64()+core.TriesInMemory <= local {
  582. p.Log().Debug("Reject stale trie request", "number", header.Number.Uint64(), "head", local)
  583. atomic.AddUint32(&p.invalidCount, 1)
  584. continue
  585. }
  586. root = header.Root
  587. }
  588. // If a header lookup failed (non existent), ignore subsequent requests for the same header
  589. if root == (common.Hash{}) {
  590. atomic.AddUint32(&p.invalidCount, 1)
  591. continue
  592. }
  593. // Open the account or storage trie for the request
  594. statedb := h.blockchain.StateCache()
  595. switch len(request.AccKey) {
  596. case 0:
  597. // No account key specified, open an account trie
  598. trie, err = statedb.OpenTrie(root)
  599. if trie == nil || err != nil {
  600. p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "root", root, "err", err)
  601. continue
  602. }
  603. default:
  604. // Account key specified, open a storage trie
  605. account, err := h.getAccount(statedb.TrieDB(), root, common.BytesToHash(request.AccKey))
  606. if err != nil {
  607. p.Log().Warn("Failed to retrieve account for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err)
  608. atomic.AddUint32(&p.invalidCount, 1)
  609. continue
  610. }
  611. trie, err = statedb.OpenStorageTrie(common.BytesToHash(request.AccKey), account.Root)
  612. if trie == nil || err != nil {
  613. p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "root", account.Root, "err", err)
  614. continue
  615. }
  616. }
  617. // Prove the user's request from the account or stroage trie
  618. if err := trie.Prove(request.Key, request.FromLevel, nodes); err != nil {
  619. p.Log().Warn("Failed to prove state request", "block", header.Number, "hash", header.Hash(), "err", err)
  620. continue
  621. }
  622. if nodes.DataSize() >= softResponseLimit {
  623. break
  624. }
  625. }
  626. reply := p.replyProofsV2(req.ReqID, nodes.NodeList())
  627. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  628. if metrics.EnabledExpensive {
  629. miscOutTrieProofPacketsMeter.Mark(1)
  630. miscOutTrieProofTrafficMeter.Mark(int64(reply.size()))
  631. miscServingTimeTrieProofTimer.Update(time.Duration(task.servingTime))
  632. }
  633. }()
  634. }
  635. case GetHelperTrieProofsMsg:
  636. p.Log().Trace("Received helper trie proof request")
  637. if metrics.EnabledExpensive {
  638. miscInHelperTriePacketsMeter.Mark(1)
  639. miscInHelperTrieTrafficMeter.Mark(int64(msg.Size))
  640. }
  641. var req struct {
  642. ReqID uint64
  643. Reqs []HelperTrieReq
  644. }
  645. if err := msg.Decode(&req); err != nil {
  646. clientErrorMeter.Mark(1)
  647. return errResp(ErrDecode, "msg %v: %v", msg, err)
  648. }
  649. // Gather state data until the fetch or network limits is reached
  650. var (
  651. auxBytes int
  652. auxData [][]byte
  653. )
  654. reqCnt := len(req.Reqs)
  655. if accept(req.ReqID, uint64(reqCnt), MaxHelperTrieProofsFetch) {
  656. wg.Add(1)
  657. go func() {
  658. defer wg.Done()
  659. var (
  660. lastIdx uint64
  661. lastType uint
  662. root common.Hash
  663. auxTrie *trie.Trie
  664. )
  665. nodes := light.NewNodeSet()
  666. for i, request := range req.Reqs {
  667. if i != 0 && !task.waitOrStop() {
  668. sendResponse(req.ReqID, 0, nil, task.servingTime)
  669. return
  670. }
  671. if auxTrie == nil || request.Type != lastType || request.TrieIdx != lastIdx {
  672. auxTrie, lastType, lastIdx = nil, request.Type, request.TrieIdx
  673. var prefix string
  674. if root, prefix = h.getHelperTrie(request.Type, request.TrieIdx); root != (common.Hash{}) {
  675. auxTrie, _ = trie.New(root, trie.NewDatabase(rawdb.NewTable(h.chainDb, prefix)))
  676. }
  677. }
  678. if request.AuxReq == auxRoot {
  679. var data []byte
  680. if root != (common.Hash{}) {
  681. data = root[:]
  682. }
  683. auxData = append(auxData, data)
  684. auxBytes += len(data)
  685. } else {
  686. if auxTrie != nil {
  687. auxTrie.Prove(request.Key, request.FromLevel, nodes)
  688. }
  689. if request.AuxReq != 0 {
  690. data := h.getAuxiliaryHeaders(request)
  691. auxData = append(auxData, data)
  692. auxBytes += len(data)
  693. }
  694. }
  695. if nodes.DataSize()+auxBytes >= softResponseLimit {
  696. break
  697. }
  698. }
  699. reply := p.replyHelperTrieProofs(req.ReqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData})
  700. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  701. if metrics.EnabledExpensive {
  702. miscOutHelperTriePacketsMeter.Mark(1)
  703. miscOutHelperTrieTrafficMeter.Mark(int64(reply.size()))
  704. miscServingTimeHelperTrieTimer.Update(time.Duration(task.servingTime))
  705. }
  706. }()
  707. }
  708. case SendTxV2Msg:
  709. p.Log().Trace("Received new transactions")
  710. if metrics.EnabledExpensive {
  711. miscInTxsPacketsMeter.Mark(1)
  712. miscInTxsTrafficMeter.Mark(int64(msg.Size))
  713. }
  714. var req struct {
  715. ReqID uint64
  716. Txs []*types.Transaction
  717. }
  718. if err := msg.Decode(&req); err != nil {
  719. clientErrorMeter.Mark(1)
  720. return errResp(ErrDecode, "msg %v: %v", msg, err)
  721. }
  722. reqCnt := len(req.Txs)
  723. if accept(req.ReqID, uint64(reqCnt), MaxTxSend) {
  724. wg.Add(1)
  725. go func() {
  726. defer wg.Done()
  727. stats := make([]light.TxStatus, len(req.Txs))
  728. for i, tx := range req.Txs {
  729. if i != 0 && !task.waitOrStop() {
  730. return
  731. }
  732. hash := tx.Hash()
  733. stats[i] = h.txStatus(hash)
  734. if stats[i].Status == core.TxStatusUnknown {
  735. addFn := h.txpool.AddRemotes
  736. // Add txs synchronously for testing purpose
  737. if h.addTxsSync {
  738. addFn = h.txpool.AddRemotesSync
  739. }
  740. if errs := addFn([]*types.Transaction{tx}); errs[0] != nil {
  741. stats[i].Error = errs[0].Error()
  742. continue
  743. }
  744. stats[i] = h.txStatus(hash)
  745. }
  746. }
  747. reply := p.replyTxStatus(req.ReqID, stats)
  748. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  749. if metrics.EnabledExpensive {
  750. miscOutTxsPacketsMeter.Mark(1)
  751. miscOutTxsTrafficMeter.Mark(int64(reply.size()))
  752. miscServingTimeTxTimer.Update(time.Duration(task.servingTime))
  753. }
  754. }()
  755. }
  756. case GetTxStatusMsg:
  757. p.Log().Trace("Received transaction status query request")
  758. if metrics.EnabledExpensive {
  759. miscInTxStatusPacketsMeter.Mark(1)
  760. miscInTxStatusTrafficMeter.Mark(int64(msg.Size))
  761. }
  762. var req struct {
  763. ReqID uint64
  764. Hashes []common.Hash
  765. }
  766. if err := msg.Decode(&req); err != nil {
  767. clientErrorMeter.Mark(1)
  768. return errResp(ErrDecode, "msg %v: %v", msg, err)
  769. }
  770. reqCnt := len(req.Hashes)
  771. if accept(req.ReqID, uint64(reqCnt), MaxTxStatus) {
  772. wg.Add(1)
  773. go func() {
  774. defer wg.Done()
  775. stats := make([]light.TxStatus, len(req.Hashes))
  776. for i, hash := range req.Hashes {
  777. if i != 0 && !task.waitOrStop() {
  778. sendResponse(req.ReqID, 0, nil, task.servingTime)
  779. return
  780. }
  781. stats[i] = h.txStatus(hash)
  782. }
  783. reply := p.replyTxStatus(req.ReqID, stats)
  784. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  785. if metrics.EnabledExpensive {
  786. miscOutTxStatusPacketsMeter.Mark(1)
  787. miscOutTxStatusTrafficMeter.Mark(int64(reply.size()))
  788. miscServingTimeTxStatusTimer.Update(time.Duration(task.servingTime))
  789. }
  790. }()
  791. }
  792. default:
  793. p.Log().Trace("Received invalid message", "code", msg.Code)
  794. clientErrorMeter.Mark(1)
  795. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  796. }
  797. // If the client has made too much invalid request(e.g. request a non-exist data),
  798. // reject them to prevent SPAM attack.
  799. if atomic.LoadUint32(&p.invalidCount) > maxRequestErrors {
  800. clientErrorMeter.Mark(1)
  801. return errTooManyInvalidRequest
  802. }
  803. return nil
  804. }
  805. // getAccount retrieves an account from the state based on root.
  806. func (h *serverHandler) getAccount(triedb *trie.Database, root, hash common.Hash) (state.Account, error) {
  807. trie, err := trie.New(root, triedb)
  808. if err != nil {
  809. return state.Account{}, err
  810. }
  811. blob, err := trie.TryGet(hash[:])
  812. if err != nil {
  813. return state.Account{}, err
  814. }
  815. var account state.Account
  816. if err = rlp.DecodeBytes(blob, &account); err != nil {
  817. return state.Account{}, err
  818. }
  819. return account, nil
  820. }
  821. // getHelperTrie returns the post-processed trie root for the given trie ID and section index
  822. func (h *serverHandler) getHelperTrie(typ uint, index uint64) (common.Hash, string) {
  823. switch typ {
  824. case htCanonical:
  825. sectionHead := rawdb.ReadCanonicalHash(h.chainDb, (index+1)*h.server.iConfig.ChtSize-1)
  826. return light.GetChtRoot(h.chainDb, index, sectionHead), light.ChtTablePrefix
  827. case htBloomBits:
  828. sectionHead := rawdb.ReadCanonicalHash(h.chainDb, (index+1)*h.server.iConfig.BloomTrieSize-1)
  829. return light.GetBloomTrieRoot(h.chainDb, index, sectionHead), light.BloomTrieTablePrefix
  830. }
  831. return common.Hash{}, ""
  832. }
  833. // getAuxiliaryHeaders returns requested auxiliary headers for the CHT request.
  834. func (h *serverHandler) getAuxiliaryHeaders(req HelperTrieReq) []byte {
  835. if req.Type == htCanonical && req.AuxReq == auxHeader && len(req.Key) == 8 {
  836. blockNum := binary.BigEndian.Uint64(req.Key)
  837. hash := rawdb.ReadCanonicalHash(h.chainDb, blockNum)
  838. return rawdb.ReadHeaderRLP(h.chainDb, hash, blockNum)
  839. }
  840. return nil
  841. }
  842. // txStatus returns the status of a specified transaction.
  843. func (h *serverHandler) txStatus(hash common.Hash) light.TxStatus {
  844. var stat light.TxStatus
  845. // Looking the transaction in txpool first.
  846. stat.Status = h.txpool.Status([]common.Hash{hash})[0]
  847. // If the transaction is unknown to the pool, try looking it up locally.
  848. if stat.Status == core.TxStatusUnknown {
  849. lookup := h.blockchain.GetTransactionLookup(hash)
  850. if lookup != nil {
  851. stat.Status = core.TxStatusIncluded
  852. stat.Lookup = lookup
  853. }
  854. }
  855. return stat
  856. }
  857. // broadcastHeaders broadcasts new block information to all connected light
  858. // clients. According to the agreement between client and server, server should
  859. // only broadcast new announcement if the total difficulty is higher than the
  860. // last one. Besides server will add the signature if client requires.
  861. func (h *serverHandler) broadcastHeaders() {
  862. defer h.wg.Done()
  863. headCh := make(chan core.ChainHeadEvent, 10)
  864. headSub := h.blockchain.SubscribeChainHeadEvent(headCh)
  865. defer headSub.Unsubscribe()
  866. var (
  867. lastHead *types.Header
  868. lastTd = common.Big0
  869. )
  870. for {
  871. select {
  872. case ev := <-headCh:
  873. peers := h.server.peers.allPeers()
  874. if len(peers) == 0 {
  875. continue
  876. }
  877. header := ev.Block.Header()
  878. hash, number := header.Hash(), header.Number.Uint64()
  879. td := h.blockchain.GetTd(hash, number)
  880. if td == nil || td.Cmp(lastTd) <= 0 {
  881. continue
  882. }
  883. var reorg uint64
  884. if lastHead != nil {
  885. reorg = lastHead.Number.Uint64() - rawdb.FindCommonAncestor(h.chainDb, header, lastHead).Number.Uint64()
  886. }
  887. lastHead, lastTd = header, td
  888. log.Debug("Announcing block to peers", "number", number, "hash", hash, "td", td, "reorg", reorg)
  889. var (
  890. signed bool
  891. signedAnnounce announceData
  892. )
  893. announce := announceData{Hash: hash, Number: number, Td: td, ReorgDepth: reorg}
  894. for _, p := range peers {
  895. p := p
  896. switch p.announceType {
  897. case announceTypeSimple:
  898. if !p.queueSend(func() { p.sendAnnounce(announce) }) {
  899. log.Debug("Drop announcement because queue is full", "number", number, "hash", hash)
  900. }
  901. case announceTypeSigned:
  902. if !signed {
  903. signedAnnounce = announce
  904. signedAnnounce.sign(h.server.privateKey)
  905. signed = true
  906. }
  907. if !p.queueSend(func() { p.sendAnnounce(signedAnnounce) }) {
  908. log.Debug("Drop announcement because queue is full", "number", number, "hash", hash)
  909. }
  910. }
  911. }
  912. case <-h.closeCh:
  913. return
  914. }
  915. }
  916. }