server_handler.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "crypto/ecdsa"
  19. "encoding/binary"
  20. "encoding/json"
  21. "errors"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/common/mclock"
  27. "github.com/ethereum/go-ethereum/core"
  28. "github.com/ethereum/go-ethereum/core/forkid"
  29. "github.com/ethereum/go-ethereum/core/rawdb"
  30. "github.com/ethereum/go-ethereum/core/state"
  31. "github.com/ethereum/go-ethereum/core/types"
  32. "github.com/ethereum/go-ethereum/ethdb"
  33. lps "github.com/ethereum/go-ethereum/les/lespay/server"
  34. "github.com/ethereum/go-ethereum/light"
  35. "github.com/ethereum/go-ethereum/log"
  36. "github.com/ethereum/go-ethereum/metrics"
  37. "github.com/ethereum/go-ethereum/p2p"
  38. "github.com/ethereum/go-ethereum/p2p/enode"
  39. "github.com/ethereum/go-ethereum/p2p/nodestate"
  40. "github.com/ethereum/go-ethereum/rlp"
  41. "github.com/ethereum/go-ethereum/trie"
  42. )
  // Internal tuning values used while serving light client requests.
  const (
  	// softResponseLimit is the target maximum size (2 MiB) of returned
  	// blocks, headers or node data.
  	softResponseLimit = 2 << 20

  	// estHeaderRlpSize is the approximate size of an RLP encoded block header.
  	estHeaderRlpSize = 500

  	// ethVersion is the equivalent eth version for the downloader.
  	ethVersion = 64
  )

  // Per-request item limits enforced on incoming light client requests.
  const (
  	MaxHeaderFetch           = 192 // Number of block headers that may be fetched per retrieval request
  	MaxBodyFetch             = 32  // Number of block bodies that may be fetched per retrieval request
  	MaxReceiptFetch          = 128 // Number of transaction receipts that may be fetched per request
  	MaxCodeFetch             = 64  // Number of contract codes that may be fetched per request
  	MaxProofsFetch           = 64  // Number of merkle proofs that may be fetched per retrieval request
  	MaxHelperTrieProofsFetch = 64  // Number of helper trie proofs that may be fetched per retrieval request
  	MaxTxSend                = 64  // Number of transactions that may be sent per request
  	MaxTxStatus              = 256 // Number of transactions that may be queried per request
  )
  // Sentinel errors used by the light server handler.
  var (
  	// errTooManyInvalidRequest signals that a peer made too many invalid requests.
  	errTooManyInvalidRequest = errors.New("too many invalid requests made")

  	// errFullClientPool is returned when the client pool does not accept a
  	// connecting peer.
  	errFullClientPool = errors.New("client pool is full")
  )
  60. // serverHandler is responsible for serving light client and process
  61. // all incoming light requests.
  62. type serverHandler struct {
  63. forkFilter forkid.Filter
  64. blockchain *core.BlockChain
  65. chainDb ethdb.Database
  66. txpool *core.TxPool
  67. server *LesServer
  68. closeCh chan struct{} // Channel used to exit all background routines of handler.
  69. wg sync.WaitGroup // WaitGroup used to track all background routines of handler.
  70. synced func() bool // Callback function used to determine whether local node is synced.
  71. // Testing fields
  72. addTxsSync bool
  73. }
  74. func newServerHandler(server *LesServer, blockchain *core.BlockChain, chainDb ethdb.Database, txpool *core.TxPool, synced func() bool) *serverHandler {
  75. handler := &serverHandler{
  76. forkFilter: forkid.NewFilter(blockchain),
  77. server: server,
  78. blockchain: blockchain,
  79. chainDb: chainDb,
  80. txpool: txpool,
  81. closeCh: make(chan struct{}),
  82. synced: synced,
  83. }
  84. return handler
  85. }
  86. // start starts the server handler.
  87. func (h *serverHandler) start() {
  88. h.wg.Add(1)
  89. go h.broadcastLoop()
  90. }
  91. // stop stops the server handler.
  92. func (h *serverHandler) stop() {
  93. close(h.closeCh)
  94. h.wg.Wait()
  95. }
  96. // runPeer is the p2p protocol run function for the given version.
  97. func (h *serverHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error {
  98. peer := newClientPeer(int(version), h.server.config.NetworkId, p, newMeteredMsgWriter(rw, int(version)))
  99. defer peer.close()
  100. h.wg.Add(1)
  101. defer h.wg.Done()
  102. return h.handle(peer)
  103. }
  104. func (h *serverHandler) handle(p *clientPeer) error {
  105. p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
  106. // Execute the LES handshake
  107. var (
  108. head = h.blockchain.CurrentHeader()
  109. hash = head.Hash()
  110. number = head.Number.Uint64()
  111. td = h.blockchain.GetTd(hash, number)
  112. forkID = forkid.NewID(h.blockchain.Config(), h.blockchain.Genesis().Hash(), h.blockchain.CurrentBlock().NumberU64())
  113. )
  114. if err := p.Handshake(td, hash, number, h.blockchain.Genesis().Hash(), forkID, h.forkFilter, h.server); err != nil {
  115. p.Log().Debug("Light Ethereum handshake failed", "err", err)
  116. return err
  117. }
  118. // Reject the duplicated peer, otherwise register it to peerset.
  119. var registered bool
  120. if err := h.server.ns.Operation(func() {
  121. if h.server.ns.GetField(p.Node(), clientPeerField) != nil {
  122. registered = true
  123. } else {
  124. h.server.ns.SetFieldSub(p.Node(), clientPeerField, p)
  125. }
  126. }); err != nil {
  127. return err
  128. }
  129. if registered {
  130. return errAlreadyRegistered
  131. }
  132. defer func() {
  133. h.server.ns.SetField(p.Node(), clientPeerField, nil)
  134. if p.fcClient != nil { // is nil when connecting another server
  135. p.fcClient.Disconnect()
  136. }
  137. }()
  138. if p.server {
  139. // connected to another server, no messages expected, just wait for disconnection
  140. _, err := p.rw.ReadMsg()
  141. return err
  142. }
  143. // Reject light clients if server is not synced.
  144. //
  145. // Put this checking here, so that "non-synced" les-server peers are still allowed
  146. // to keep the connection.
  147. if !h.synced() {
  148. p.Log().Debug("Light server not synced, rejecting peer")
  149. return p2p.DiscRequested
  150. }
  151. // Disconnect the inbound peer if it's rejected by clientPool
  152. if cap, err := h.server.clientPool.connect(p); cap != p.fcParams.MinRecharge || err != nil {
  153. p.Log().Debug("Light Ethereum peer rejected", "err", errFullClientPool)
  154. return errFullClientPool
  155. }
  156. p.balance, _ = h.server.ns.GetField(p.Node(), h.server.clientPool.BalanceField).(*lps.NodeBalance)
  157. if p.balance == nil {
  158. return p2p.DiscRequested
  159. }
  160. activeCount, _ := h.server.clientPool.pp.Active()
  161. clientConnectionGauge.Update(int64(activeCount))
  162. var wg sync.WaitGroup // Wait group used to track all in-flight task routines.
  163. connectedAt := mclock.Now()
  164. defer func() {
  165. wg.Wait() // Ensure all background task routines have exited.
  166. h.server.clientPool.disconnect(p)
  167. p.balance = nil
  168. activeCount, _ := h.server.clientPool.pp.Active()
  169. clientConnectionGauge.Update(int64(activeCount))
  170. connectionTimer.Update(time.Duration(mclock.Now() - connectedAt))
  171. }()
  172. // Mark the peer starts to be served.
  173. atomic.StoreUint32(&p.serving, 1)
  174. defer atomic.StoreUint32(&p.serving, 0)
  175. // Spawn a main loop to handle all incoming messages.
  176. for {
  177. select {
  178. case err := <-p.errCh:
  179. p.Log().Debug("Failed to send light ethereum response", "err", err)
  180. return err
  181. default:
  182. }
  183. if err := h.handleMsg(p, &wg); err != nil {
  184. p.Log().Debug("Light Ethereum message handling failed", "err", err)
  185. return err
  186. }
  187. }
  188. }
  189. // handleMsg is invoked whenever an inbound message is received from a remote
  190. // peer. The remote connection is torn down upon returning any error.
  191. func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
  192. // Read the next message from the remote peer, and ensure it's fully consumed
  193. msg, err := p.rw.ReadMsg()
  194. if err != nil {
  195. return err
  196. }
  197. p.Log().Trace("Light Ethereum message arrived", "code", msg.Code, "bytes", msg.Size)
  198. // Discard large message which exceeds the limitation.
  199. if msg.Size > ProtocolMaxMsgSize {
  200. clientErrorMeter.Mark(1)
  201. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  202. }
  203. defer msg.Discard()
  204. var (
  205. maxCost uint64
  206. task *servingTask
  207. )
  208. p.responseCount++
  209. responseCount := p.responseCount
  210. // accept returns an indicator whether the request can be served.
  211. // If so, deduct the max cost from the flow control buffer.
  212. accept := func(reqID, reqCnt, maxCnt uint64) bool {
  213. // Short circuit if the peer is already frozen or the request is invalid.
  214. inSizeCost := h.server.costTracker.realCost(0, msg.Size, 0)
  215. if p.isFrozen() || reqCnt == 0 || reqCnt > maxCnt {
  216. p.fcClient.OneTimeCost(inSizeCost)
  217. return false
  218. }
  219. // Prepaid max cost units before request been serving.
  220. maxCost = p.fcCosts.getMaxCost(msg.Code, reqCnt)
  221. accepted, bufShort, priority := p.fcClient.AcceptRequest(reqID, responseCount, maxCost)
  222. if !accepted {
  223. p.freeze()
  224. p.Log().Error("Request came too early", "remaining", common.PrettyDuration(time.Duration(bufShort*1000000/p.fcParams.MinRecharge)))
  225. p.fcClient.OneTimeCost(inSizeCost)
  226. return false
  227. }
  228. // Create a multi-stage task, estimate the time it takes for the task to
  229. // execute, and cache it in the request service queue.
  230. factor := h.server.costTracker.globalFactor()
  231. if factor < 0.001 {
  232. factor = 1
  233. p.Log().Error("Invalid global cost factor", "factor", factor)
  234. }
  235. maxTime := uint64(float64(maxCost) / factor)
  236. task = h.server.servingQueue.newTask(p, maxTime, priority)
  237. if task.start() {
  238. return true
  239. }
  240. p.fcClient.RequestProcessed(reqID, responseCount, maxCost, inSizeCost)
  241. return false
  242. }
  243. // sendResponse sends back the response and updates the flow control statistic.
  244. sendResponse := func(reqID, amount uint64, reply *reply, servingTime uint64) {
  245. p.responseLock.Lock()
  246. defer p.responseLock.Unlock()
  247. // Short circuit if the client is already frozen.
  248. if p.isFrozen() {
  249. realCost := h.server.costTracker.realCost(servingTime, msg.Size, 0)
  250. p.fcClient.RequestProcessed(reqID, responseCount, maxCost, realCost)
  251. return
  252. }
  253. // Positive correction buffer value with real cost.
  254. var replySize uint32
  255. if reply != nil {
  256. replySize = reply.size()
  257. }
  258. var realCost uint64
  259. if h.server.costTracker.testing {
  260. realCost = maxCost // Assign a fake cost for testing purpose
  261. } else {
  262. realCost = h.server.costTracker.realCost(servingTime, msg.Size, replySize)
  263. if realCost > maxCost {
  264. realCost = maxCost
  265. }
  266. }
  267. bv := p.fcClient.RequestProcessed(reqID, responseCount, maxCost, realCost)
  268. if amount != 0 {
  269. // Feed cost tracker request serving statistic.
  270. h.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost)
  271. // Reduce priority "balance" for the specific peer.
  272. p.balance.RequestServed(realCost)
  273. }
  274. if reply != nil {
  275. p.queueSend(func() {
  276. if err := reply.send(bv); err != nil {
  277. select {
  278. case p.errCh <- err:
  279. default:
  280. }
  281. }
  282. })
  283. }
  284. }
  285. switch msg.Code {
  286. case GetBlockHeadersMsg:
  287. p.Log().Trace("Received block header request")
  288. if metrics.EnabledExpensive {
  289. miscInHeaderPacketsMeter.Mark(1)
  290. miscInHeaderTrafficMeter.Mark(int64(msg.Size))
  291. }
  292. var req struct {
  293. ReqID uint64
  294. Query getBlockHeadersData
  295. }
  296. if err := msg.Decode(&req); err != nil {
  297. clientErrorMeter.Mark(1)
  298. return errResp(ErrDecode, "%v: %v", msg, err)
  299. }
  300. query := req.Query
  301. if accept(req.ReqID, query.Amount, MaxHeaderFetch) {
  302. wg.Add(1)
  303. go func() {
  304. defer wg.Done()
  305. hashMode := query.Origin.Hash != (common.Hash{})
  306. first := true
  307. maxNonCanonical := uint64(100)
  308. // Gather headers until the fetch or network limits is reached
  309. var (
  310. bytes common.StorageSize
  311. headers []*types.Header
  312. unknown bool
  313. )
  314. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  315. if !first && !task.waitOrStop() {
  316. sendResponse(req.ReqID, 0, nil, task.servingTime)
  317. return
  318. }
  319. // Retrieve the next header satisfying the query
  320. var origin *types.Header
  321. if hashMode {
  322. if first {
  323. origin = h.blockchain.GetHeaderByHash(query.Origin.Hash)
  324. if origin != nil {
  325. query.Origin.Number = origin.Number.Uint64()
  326. }
  327. } else {
  328. origin = h.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
  329. }
  330. } else {
  331. origin = h.blockchain.GetHeaderByNumber(query.Origin.Number)
  332. }
  333. if origin == nil {
  334. break
  335. }
  336. headers = append(headers, origin)
  337. bytes += estHeaderRlpSize
  338. // Advance to the next header of the query
  339. switch {
  340. case hashMode && query.Reverse:
  341. // Hash based traversal towards the genesis block
  342. ancestor := query.Skip + 1
  343. if ancestor == 0 {
  344. unknown = true
  345. } else {
  346. query.Origin.Hash, query.Origin.Number = h.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
  347. unknown = query.Origin.Hash == common.Hash{}
  348. }
  349. case hashMode && !query.Reverse:
  350. // Hash based traversal towards the leaf block
  351. var (
  352. current = origin.Number.Uint64()
  353. next = current + query.Skip + 1
  354. )
  355. if next <= current {
  356. infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
  357. p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
  358. unknown = true
  359. } else {
  360. if header := h.blockchain.GetHeaderByNumber(next); header != nil {
  361. nextHash := header.Hash()
  362. expOldHash, _ := h.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
  363. if expOldHash == query.Origin.Hash {
  364. query.Origin.Hash, query.Origin.Number = nextHash, next
  365. } else {
  366. unknown = true
  367. }
  368. } else {
  369. unknown = true
  370. }
  371. }
  372. case query.Reverse:
  373. // Number based traversal towards the genesis block
  374. if query.Origin.Number >= query.Skip+1 {
  375. query.Origin.Number -= query.Skip + 1
  376. } else {
  377. unknown = true
  378. }
  379. case !query.Reverse:
  380. // Number based traversal towards the leaf block
  381. query.Origin.Number += query.Skip + 1
  382. }
  383. first = false
  384. }
  385. reply := p.replyBlockHeaders(req.ReqID, headers)
  386. sendResponse(req.ReqID, query.Amount, reply, task.done())
  387. if metrics.EnabledExpensive {
  388. miscOutHeaderPacketsMeter.Mark(1)
  389. miscOutHeaderTrafficMeter.Mark(int64(reply.size()))
  390. miscServingTimeHeaderTimer.Update(time.Duration(task.servingTime))
  391. }
  392. }()
  393. }
  394. case GetBlockBodiesMsg:
  395. p.Log().Trace("Received block bodies request")
  396. if metrics.EnabledExpensive {
  397. miscInBodyPacketsMeter.Mark(1)
  398. miscInBodyTrafficMeter.Mark(int64(msg.Size))
  399. }
  400. var req struct {
  401. ReqID uint64
  402. Hashes []common.Hash
  403. }
  404. if err := msg.Decode(&req); err != nil {
  405. clientErrorMeter.Mark(1)
  406. return errResp(ErrDecode, "msg %v: %v", msg, err)
  407. }
  408. var (
  409. bytes int
  410. bodies []rlp.RawValue
  411. )
  412. reqCnt := len(req.Hashes)
  413. if accept(req.ReqID, uint64(reqCnt), MaxBodyFetch) {
  414. wg.Add(1)
  415. go func() {
  416. defer wg.Done()
  417. for i, hash := range req.Hashes {
  418. if i != 0 && !task.waitOrStop() {
  419. sendResponse(req.ReqID, 0, nil, task.servingTime)
  420. return
  421. }
  422. if bytes >= softResponseLimit {
  423. break
  424. }
  425. body := h.blockchain.GetBodyRLP(hash)
  426. if body == nil {
  427. p.bumpInvalid()
  428. continue
  429. }
  430. bodies = append(bodies, body)
  431. bytes += len(body)
  432. }
  433. reply := p.replyBlockBodiesRLP(req.ReqID, bodies)
  434. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  435. if metrics.EnabledExpensive {
  436. miscOutBodyPacketsMeter.Mark(1)
  437. miscOutBodyTrafficMeter.Mark(int64(reply.size()))
  438. miscServingTimeBodyTimer.Update(time.Duration(task.servingTime))
  439. }
  440. }()
  441. }
  442. case GetCodeMsg:
  443. p.Log().Trace("Received code request")
  444. if metrics.EnabledExpensive {
  445. miscInCodePacketsMeter.Mark(1)
  446. miscInCodeTrafficMeter.Mark(int64(msg.Size))
  447. }
  448. var req struct {
  449. ReqID uint64
  450. Reqs []CodeReq
  451. }
  452. if err := msg.Decode(&req); err != nil {
  453. clientErrorMeter.Mark(1)
  454. return errResp(ErrDecode, "msg %v: %v", msg, err)
  455. }
  456. var (
  457. bytes int
  458. data [][]byte
  459. )
  460. reqCnt := len(req.Reqs)
  461. if accept(req.ReqID, uint64(reqCnt), MaxCodeFetch) {
  462. wg.Add(1)
  463. go func() {
  464. defer wg.Done()
  465. for i, request := range req.Reqs {
  466. if i != 0 && !task.waitOrStop() {
  467. sendResponse(req.ReqID, 0, nil, task.servingTime)
  468. return
  469. }
  470. // Look up the root hash belonging to the request
  471. header := h.blockchain.GetHeaderByHash(request.BHash)
  472. if header == nil {
  473. p.Log().Warn("Failed to retrieve associate header for code", "hash", request.BHash)
  474. p.bumpInvalid()
  475. continue
  476. }
  477. // Refuse to search stale state data in the database since looking for
  478. // a non-exist key is kind of expensive.
  479. local := h.blockchain.CurrentHeader().Number.Uint64()
  480. if !h.server.archiveMode && header.Number.Uint64()+core.TriesInMemory <= local {
  481. p.Log().Debug("Reject stale code request", "number", header.Number.Uint64(), "head", local)
  482. p.bumpInvalid()
  483. continue
  484. }
  485. triedb := h.blockchain.StateCache().TrieDB()
  486. account, err := h.getAccount(triedb, header.Root, common.BytesToHash(request.AccKey))
  487. if err != nil {
  488. p.Log().Warn("Failed to retrieve account for code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err)
  489. p.bumpInvalid()
  490. continue
  491. }
  492. code, err := h.blockchain.StateCache().ContractCode(common.BytesToHash(request.AccKey), common.BytesToHash(account.CodeHash))
  493. if err != nil {
  494. p.Log().Warn("Failed to retrieve account code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "codehash", common.BytesToHash(account.CodeHash), "err", err)
  495. continue
  496. }
  497. // Accumulate the code and abort if enough data was retrieved
  498. data = append(data, code)
  499. if bytes += len(code); bytes >= softResponseLimit {
  500. break
  501. }
  502. }
  503. reply := p.replyCode(req.ReqID, data)
  504. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  505. if metrics.EnabledExpensive {
  506. miscOutCodePacketsMeter.Mark(1)
  507. miscOutCodeTrafficMeter.Mark(int64(reply.size()))
  508. miscServingTimeCodeTimer.Update(time.Duration(task.servingTime))
  509. }
  510. }()
  511. }
  512. case GetReceiptsMsg:
  513. p.Log().Trace("Received receipts request")
  514. if metrics.EnabledExpensive {
  515. miscInReceiptPacketsMeter.Mark(1)
  516. miscInReceiptTrafficMeter.Mark(int64(msg.Size))
  517. }
  518. var req struct {
  519. ReqID uint64
  520. Hashes []common.Hash
  521. }
  522. if err := msg.Decode(&req); err != nil {
  523. clientErrorMeter.Mark(1)
  524. return errResp(ErrDecode, "msg %v: %v", msg, err)
  525. }
  526. var (
  527. bytes int
  528. receipts []rlp.RawValue
  529. )
  530. reqCnt := len(req.Hashes)
  531. if accept(req.ReqID, uint64(reqCnt), MaxReceiptFetch) {
  532. wg.Add(1)
  533. go func() {
  534. defer wg.Done()
  535. for i, hash := range req.Hashes {
  536. if i != 0 && !task.waitOrStop() {
  537. sendResponse(req.ReqID, 0, nil, task.servingTime)
  538. return
  539. }
  540. if bytes >= softResponseLimit {
  541. break
  542. }
  543. // Retrieve the requested block's receipts, skipping if unknown to us
  544. results := h.blockchain.GetReceiptsByHash(hash)
  545. if results == nil {
  546. if header := h.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  547. p.bumpInvalid()
  548. continue
  549. }
  550. }
  551. // If known, encode and queue for response packet
  552. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  553. log.Error("Failed to encode receipt", "err", err)
  554. } else {
  555. receipts = append(receipts, encoded)
  556. bytes += len(encoded)
  557. }
  558. }
  559. reply := p.replyReceiptsRLP(req.ReqID, receipts)
  560. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  561. if metrics.EnabledExpensive {
  562. miscOutReceiptPacketsMeter.Mark(1)
  563. miscOutReceiptTrafficMeter.Mark(int64(reply.size()))
  564. miscServingTimeReceiptTimer.Update(time.Duration(task.servingTime))
  565. }
  566. }()
  567. }
  568. case GetProofsV2Msg:
  569. p.Log().Trace("Received les/2 proofs request")
  570. if metrics.EnabledExpensive {
  571. miscInTrieProofPacketsMeter.Mark(1)
  572. miscInTrieProofTrafficMeter.Mark(int64(msg.Size))
  573. }
  574. var req struct {
  575. ReqID uint64
  576. Reqs []ProofReq
  577. }
  578. if err := msg.Decode(&req); err != nil {
  579. clientErrorMeter.Mark(1)
  580. return errResp(ErrDecode, "msg %v: %v", msg, err)
  581. }
  582. // Gather state data until the fetch or network limits is reached
  583. var (
  584. lastBHash common.Hash
  585. root common.Hash
  586. header *types.Header
  587. )
  588. reqCnt := len(req.Reqs)
  589. if accept(req.ReqID, uint64(reqCnt), MaxProofsFetch) {
  590. wg.Add(1)
  591. go func() {
  592. defer wg.Done()
  593. nodes := light.NewNodeSet()
  594. for i, request := range req.Reqs {
  595. if i != 0 && !task.waitOrStop() {
  596. sendResponse(req.ReqID, 0, nil, task.servingTime)
  597. return
  598. }
  599. // Look up the root hash belonging to the request
  600. if request.BHash != lastBHash {
  601. root, lastBHash = common.Hash{}, request.BHash
  602. if header = h.blockchain.GetHeaderByHash(request.BHash); header == nil {
  603. p.Log().Warn("Failed to retrieve header for proof", "hash", request.BHash)
  604. p.bumpInvalid()
  605. continue
  606. }
  607. // Refuse to search stale state data in the database since looking for
  608. // a non-exist key is kind of expensive.
  609. local := h.blockchain.CurrentHeader().Number.Uint64()
  610. if !h.server.archiveMode && header.Number.Uint64()+core.TriesInMemory <= local {
  611. p.Log().Debug("Reject stale trie request", "number", header.Number.Uint64(), "head", local)
  612. p.bumpInvalid()
  613. continue
  614. }
  615. root = header.Root
  616. }
  617. // If a header lookup failed (non existent), ignore subsequent requests for the same header
  618. if root == (common.Hash{}) {
  619. p.bumpInvalid()
  620. continue
  621. }
  622. // Open the account or storage trie for the request
  623. statedb := h.blockchain.StateCache()
  624. var trie state.Trie
  625. switch len(request.AccKey) {
  626. case 0:
  627. // No account key specified, open an account trie
  628. trie, err = statedb.OpenTrie(root)
  629. if trie == nil || err != nil {
  630. p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "root", root, "err", err)
  631. continue
  632. }
  633. default:
  634. // Account key specified, open a storage trie
  635. account, err := h.getAccount(statedb.TrieDB(), root, common.BytesToHash(request.AccKey))
  636. if err != nil {
  637. p.Log().Warn("Failed to retrieve account for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err)
  638. p.bumpInvalid()
  639. continue
  640. }
  641. trie, err = statedb.OpenStorageTrie(common.BytesToHash(request.AccKey), account.Root)
  642. if trie == nil || err != nil {
  643. p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "root", account.Root, "err", err)
  644. continue
  645. }
  646. }
  647. // Prove the user's request from the account or stroage trie
  648. if err := trie.Prove(request.Key, request.FromLevel, nodes); err != nil {
  649. p.Log().Warn("Failed to prove state request", "block", header.Number, "hash", header.Hash(), "err", err)
  650. continue
  651. }
  652. if nodes.DataSize() >= softResponseLimit {
  653. break
  654. }
  655. }
  656. reply := p.replyProofsV2(req.ReqID, nodes.NodeList())
  657. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  658. if metrics.EnabledExpensive {
  659. miscOutTrieProofPacketsMeter.Mark(1)
  660. miscOutTrieProofTrafficMeter.Mark(int64(reply.size()))
  661. miscServingTimeTrieProofTimer.Update(time.Duration(task.servingTime))
  662. }
  663. }()
  664. }
  665. case GetHelperTrieProofsMsg:
  666. p.Log().Trace("Received helper trie proof request")
  667. if metrics.EnabledExpensive {
  668. miscInHelperTriePacketsMeter.Mark(1)
  669. miscInHelperTrieTrafficMeter.Mark(int64(msg.Size))
  670. }
  671. var req struct {
  672. ReqID uint64
  673. Reqs []HelperTrieReq
  674. }
  675. if err := msg.Decode(&req); err != nil {
  676. clientErrorMeter.Mark(1)
  677. return errResp(ErrDecode, "msg %v: %v", msg, err)
  678. }
  679. // Gather state data until the fetch or network limits is reached
  680. var (
  681. auxBytes int
  682. auxData [][]byte
  683. )
  684. reqCnt := len(req.Reqs)
  685. if accept(req.ReqID, uint64(reqCnt), MaxHelperTrieProofsFetch) {
  686. wg.Add(1)
  687. go func() {
  688. defer wg.Done()
  689. var (
  690. lastIdx uint64
  691. lastType uint
  692. root common.Hash
  693. auxTrie *trie.Trie
  694. )
  695. nodes := light.NewNodeSet()
  696. for i, request := range req.Reqs {
  697. if i != 0 && !task.waitOrStop() {
  698. sendResponse(req.ReqID, 0, nil, task.servingTime)
  699. return
  700. }
  701. if auxTrie == nil || request.Type != lastType || request.TrieIdx != lastIdx {
  702. auxTrie, lastType, lastIdx = nil, request.Type, request.TrieIdx
  703. var prefix string
  704. if root, prefix = h.getHelperTrie(request.Type, request.TrieIdx); root != (common.Hash{}) {
  705. auxTrie, _ = trie.New(root, trie.NewDatabase(rawdb.NewTable(h.chainDb, prefix)))
  706. }
  707. }
  708. if request.AuxReq == auxRoot {
  709. var data []byte
  710. if root != (common.Hash{}) {
  711. data = root[:]
  712. }
  713. auxData = append(auxData, data)
  714. auxBytes += len(data)
  715. } else {
  716. if auxTrie != nil {
  717. auxTrie.Prove(request.Key, request.FromLevel, nodes)
  718. }
  719. if request.AuxReq != 0 {
  720. data := h.getAuxiliaryHeaders(request)
  721. auxData = append(auxData, data)
  722. auxBytes += len(data)
  723. }
  724. }
  725. if nodes.DataSize()+auxBytes >= softResponseLimit {
  726. break
  727. }
  728. }
  729. reply := p.replyHelperTrieProofs(req.ReqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData})
  730. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  731. if metrics.EnabledExpensive {
  732. miscOutHelperTriePacketsMeter.Mark(1)
  733. miscOutHelperTrieTrafficMeter.Mark(int64(reply.size()))
  734. miscServingTimeHelperTrieTimer.Update(time.Duration(task.servingTime))
  735. }
  736. }()
  737. }
  738. case SendTxV2Msg:
  739. p.Log().Trace("Received new transactions")
  740. if metrics.EnabledExpensive {
  741. miscInTxsPacketsMeter.Mark(1)
  742. miscInTxsTrafficMeter.Mark(int64(msg.Size))
  743. }
  744. var req struct {
  745. ReqID uint64
  746. Txs []*types.Transaction
  747. }
  748. if err := msg.Decode(&req); err != nil {
  749. clientErrorMeter.Mark(1)
  750. return errResp(ErrDecode, "msg %v: %v", msg, err)
  751. }
  752. reqCnt := len(req.Txs)
  753. if accept(req.ReqID, uint64(reqCnt), MaxTxSend) {
  754. wg.Add(1)
  755. go func() {
  756. defer wg.Done()
  757. stats := make([]light.TxStatus, len(req.Txs))
  758. for i, tx := range req.Txs {
  759. if i != 0 && !task.waitOrStop() {
  760. return
  761. }
  762. hash := tx.Hash()
  763. stats[i] = h.txStatus(hash)
  764. if stats[i].Status == core.TxStatusUnknown {
  765. addFn := h.txpool.AddRemotes
  766. // Add txs synchronously for testing purpose
  767. if h.addTxsSync {
  768. addFn = h.txpool.AddRemotesSync
  769. }
  770. if errs := addFn([]*types.Transaction{tx}); errs[0] != nil {
  771. stats[i].Error = errs[0].Error()
  772. continue
  773. }
  774. stats[i] = h.txStatus(hash)
  775. }
  776. }
  777. reply := p.replyTxStatus(req.ReqID, stats)
  778. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  779. if metrics.EnabledExpensive {
  780. miscOutTxsPacketsMeter.Mark(1)
  781. miscOutTxsTrafficMeter.Mark(int64(reply.size()))
  782. miscServingTimeTxTimer.Update(time.Duration(task.servingTime))
  783. }
  784. }()
  785. }
  786. case GetTxStatusMsg:
  787. p.Log().Trace("Received transaction status query request")
  788. if metrics.EnabledExpensive {
  789. miscInTxStatusPacketsMeter.Mark(1)
  790. miscInTxStatusTrafficMeter.Mark(int64(msg.Size))
  791. }
  792. var req struct {
  793. ReqID uint64
  794. Hashes []common.Hash
  795. }
  796. if err := msg.Decode(&req); err != nil {
  797. clientErrorMeter.Mark(1)
  798. return errResp(ErrDecode, "msg %v: %v", msg, err)
  799. }
  800. reqCnt := len(req.Hashes)
  801. if accept(req.ReqID, uint64(reqCnt), MaxTxStatus) {
  802. wg.Add(1)
  803. go func() {
  804. defer wg.Done()
  805. stats := make([]light.TxStatus, len(req.Hashes))
  806. for i, hash := range req.Hashes {
  807. if i != 0 && !task.waitOrStop() {
  808. sendResponse(req.ReqID, 0, nil, task.servingTime)
  809. return
  810. }
  811. stats[i] = h.txStatus(hash)
  812. }
  813. reply := p.replyTxStatus(req.ReqID, stats)
  814. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  815. if metrics.EnabledExpensive {
  816. miscOutTxStatusPacketsMeter.Mark(1)
  817. miscOutTxStatusTrafficMeter.Mark(int64(reply.size()))
  818. miscServingTimeTxStatusTimer.Update(time.Duration(task.servingTime))
  819. }
  820. }()
  821. }
  822. default:
  823. p.Log().Trace("Received invalid message", "code", msg.Code)
  824. clientErrorMeter.Mark(1)
  825. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  826. }
  827. // If the client has made too much invalid request(e.g. request a non-existent data),
  828. // reject them to prevent SPAM attack.
  829. if p.getInvalid() > maxRequestErrors {
  830. clientErrorMeter.Mark(1)
  831. return errTooManyInvalidRequest
  832. }
  833. return nil
  834. }
  835. // getAccount retrieves an account from the state based on root.
  836. func (h *serverHandler) getAccount(triedb *trie.Database, root, hash common.Hash) (state.Account, error) {
  837. trie, err := trie.New(root, triedb)
  838. if err != nil {
  839. return state.Account{}, err
  840. }
  841. blob, err := trie.TryGet(hash[:])
  842. if err != nil {
  843. return state.Account{}, err
  844. }
  845. var account state.Account
  846. if err = rlp.DecodeBytes(blob, &account); err != nil {
  847. return state.Account{}, err
  848. }
  849. return account, nil
  850. }
  851. // getHelperTrie returns the post-processed trie root for the given trie ID and section index
  852. func (h *serverHandler) getHelperTrie(typ uint, index uint64) (common.Hash, string) {
  853. switch typ {
  854. case htCanonical:
  855. sectionHead := rawdb.ReadCanonicalHash(h.chainDb, (index+1)*h.server.iConfig.ChtSize-1)
  856. return light.GetChtRoot(h.chainDb, index, sectionHead), light.ChtTablePrefix
  857. case htBloomBits:
  858. sectionHead := rawdb.ReadCanonicalHash(h.chainDb, (index+1)*h.server.iConfig.BloomTrieSize-1)
  859. return light.GetBloomTrieRoot(h.chainDb, index, sectionHead), light.BloomTrieTablePrefix
  860. }
  861. return common.Hash{}, ""
  862. }
  863. // getAuxiliaryHeaders returns requested auxiliary headers for the CHT request.
  864. func (h *serverHandler) getAuxiliaryHeaders(req HelperTrieReq) []byte {
  865. if req.Type == htCanonical && req.AuxReq == auxHeader && len(req.Key) == 8 {
  866. blockNum := binary.BigEndian.Uint64(req.Key)
  867. hash := rawdb.ReadCanonicalHash(h.chainDb, blockNum)
  868. return rawdb.ReadHeaderRLP(h.chainDb, hash, blockNum)
  869. }
  870. return nil
  871. }
  872. // txStatus returns the status of a specified transaction.
  873. func (h *serverHandler) txStatus(hash common.Hash) light.TxStatus {
  874. var stat light.TxStatus
  875. // Looking the transaction in txpool first.
  876. stat.Status = h.txpool.Status([]common.Hash{hash})[0]
  877. // If the transaction is unknown to the pool, try looking it up locally.
  878. if stat.Status == core.TxStatusUnknown {
  879. lookup := h.blockchain.GetTransactionLookup(hash)
  880. if lookup != nil {
  881. stat.Status = core.TxStatusIncluded
  882. stat.Lookup = lookup
  883. }
  884. }
  885. return stat
  886. }
  887. // broadcastLoop broadcasts new block information to all connected light
  888. // clients. According to the agreement between client and server, server should
  889. // only broadcast new announcement if the total difficulty is higher than the
  890. // last one. Besides server will add the signature if client requires.
  891. func (h *serverHandler) broadcastLoop() {
  892. defer h.wg.Done()
  893. headCh := make(chan core.ChainHeadEvent, 10)
  894. headSub := h.blockchain.SubscribeChainHeadEvent(headCh)
  895. defer headSub.Unsubscribe()
  896. var (
  897. lastHead *types.Header
  898. lastTd = common.Big0
  899. )
  900. for {
  901. select {
  902. case ev := <-headCh:
  903. header := ev.Block.Header()
  904. hash, number := header.Hash(), header.Number.Uint64()
  905. td := h.blockchain.GetTd(hash, number)
  906. if td == nil || td.Cmp(lastTd) <= 0 {
  907. continue
  908. }
  909. var reorg uint64
  910. if lastHead != nil {
  911. reorg = lastHead.Number.Uint64() - rawdb.FindCommonAncestor(h.chainDb, header, lastHead).Number.Uint64()
  912. }
  913. lastHead, lastTd = header, td
  914. log.Debug("Announcing block to peers", "number", number, "hash", hash, "td", td, "reorg", reorg)
  915. h.server.broadcaster.broadcast(announceData{Hash: hash, Number: number, Td: td, ReorgDepth: reorg})
  916. case <-h.closeCh:
  917. return
  918. }
  919. }
  920. }
  921. // broadcaster sends new header announcements to active client peers
  922. type broadcaster struct {
  923. ns *nodestate.NodeStateMachine
  924. privateKey *ecdsa.PrivateKey
  925. lastAnnounce, signedAnnounce announceData
  926. }
  927. // newBroadcaster creates a new broadcaster
  928. func newBroadcaster(ns *nodestate.NodeStateMachine) *broadcaster {
  929. b := &broadcaster{ns: ns}
  930. ns.SubscribeState(priorityPoolSetup.ActiveFlag, func(node *enode.Node, oldState, newState nodestate.Flags) {
  931. if newState.Equals(priorityPoolSetup.ActiveFlag) {
  932. // send last announcement to activated peers
  933. b.sendTo(node)
  934. }
  935. })
  936. return b
  937. }
  938. // setSignerKey sets the signer key for signed announcements. Should be called before
  939. // starting the protocol handler.
  940. func (b *broadcaster) setSignerKey(privateKey *ecdsa.PrivateKey) {
  941. b.privateKey = privateKey
  942. }
  943. // broadcast sends the given announcements to all active peers
  944. func (b *broadcaster) broadcast(announce announceData) {
  945. b.ns.Operation(func() {
  946. // iterate in an Operation to ensure that the active set does not change while iterating
  947. b.lastAnnounce = announce
  948. b.ns.ForEach(priorityPoolSetup.ActiveFlag, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
  949. b.sendTo(node)
  950. })
  951. })
  952. }
  953. // sendTo sends the most recent announcement to the given node unless the same or higher Td
  954. // announcement has already been sent.
  955. func (b *broadcaster) sendTo(node *enode.Node) {
  956. if b.lastAnnounce.Td == nil {
  957. return
  958. }
  959. if p, _ := b.ns.GetField(node, clientPeerField).(*clientPeer); p != nil {
  960. if p.headInfo.Td == nil || b.lastAnnounce.Td.Cmp(p.headInfo.Td) > 0 {
  961. announce := b.lastAnnounce
  962. switch p.announceType {
  963. case announceTypeSimple:
  964. if !p.queueSend(func() { p.sendAnnounce(announce) }) {
  965. log.Debug("Drop announcement because queue is full", "number", announce.Number, "hash", announce.Hash)
  966. } else {
  967. log.Debug("Sent announcement", "number", announce.Number, "hash", announce.Hash)
  968. }
  969. case announceTypeSigned:
  970. if b.signedAnnounce.Hash != b.lastAnnounce.Hash {
  971. b.signedAnnounce = b.lastAnnounce
  972. b.signedAnnounce.sign(b.privateKey)
  973. }
  974. announce := b.signedAnnounce
  975. if !p.queueSend(func() { p.sendAnnounce(announce) }) {
  976. log.Debug("Drop announcement because queue is full", "number", announce.Number, "hash", announce.Hash)
  977. } else {
  978. log.Debug("Sent announcement", "number", announce.Number, "hash", announce.Hash)
  979. }
  980. }
  981. p.headInfo = blockInfo{b.lastAnnounce.Hash, b.lastAnnounce.Number, b.lastAnnounce.Td}
  982. }
  983. }
  984. }