server_handler.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032
  1. // Copyright 2019 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "crypto/ecdsa"
  19. "encoding/binary"
  20. "encoding/json"
  21. "errors"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/common/mclock"
  27. "github.com/ethereum/go-ethereum/core"
  28. "github.com/ethereum/go-ethereum/core/rawdb"
  29. "github.com/ethereum/go-ethereum/core/state"
  30. "github.com/ethereum/go-ethereum/core/types"
  31. "github.com/ethereum/go-ethereum/ethdb"
  32. lps "github.com/ethereum/go-ethereum/les/lespay/server"
  33. "github.com/ethereum/go-ethereum/light"
  34. "github.com/ethereum/go-ethereum/log"
  35. "github.com/ethereum/go-ethereum/metrics"
  36. "github.com/ethereum/go-ethereum/p2p"
  37. "github.com/ethereum/go-ethereum/p2p/enode"
  38. "github.com/ethereum/go-ethereum/p2p/nodestate"
  39. "github.com/ethereum/go-ethereum/rlp"
  40. "github.com/ethereum/go-ethereum/trie"
  41. )
  42. const (
  43. softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
  44. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
  45. ethVersion = 63 // equivalent eth version for the downloader
  46. MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
  47. MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
  48. MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
  49. MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
  50. MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
  51. MaxHelperTrieProofsFetch = 64 // Amount of helper tries to be fetched per retrieval request
  52. MaxTxSend = 64 // Amount of transactions to be send per request
  53. MaxTxStatus = 256 // Amount of transactions to queried per request
  54. )
  55. var (
  56. errTooManyInvalidRequest = errors.New("too many invalid requests made")
  57. errFullClientPool = errors.New("client pool is full")
  58. )
  59. // serverHandler is responsible for serving light client and process
  60. // all incoming light requests.
  61. type serverHandler struct {
  62. blockchain *core.BlockChain
  63. chainDb ethdb.Database
  64. txpool *core.TxPool
  65. server *LesServer
  66. closeCh chan struct{} // Channel used to exit all background routines of handler.
  67. wg sync.WaitGroup // WaitGroup used to track all background routines of handler.
  68. synced func() bool // Callback function used to determine whether local node is synced.
  69. // Testing fields
  70. addTxsSync bool
  71. }
  72. func newServerHandler(server *LesServer, blockchain *core.BlockChain, chainDb ethdb.Database, txpool *core.TxPool, synced func() bool) *serverHandler {
  73. handler := &serverHandler{
  74. server: server,
  75. blockchain: blockchain,
  76. chainDb: chainDb,
  77. txpool: txpool,
  78. closeCh: make(chan struct{}),
  79. synced: synced,
  80. }
  81. return handler
  82. }
  83. // start starts the server handler.
  84. func (h *serverHandler) start() {
  85. h.wg.Add(1)
  86. go h.broadcastLoop()
  87. }
  88. // stop stops the server handler.
  89. func (h *serverHandler) stop() {
  90. close(h.closeCh)
  91. h.wg.Wait()
  92. }
  93. // runPeer is the p2p protocol run function for the given version.
  94. func (h *serverHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error {
  95. peer := newClientPeer(int(version), h.server.config.NetworkId, p, newMeteredMsgWriter(rw, int(version)))
  96. defer peer.close()
  97. h.wg.Add(1)
  98. defer h.wg.Done()
  99. return h.handle(peer)
  100. }
  101. func (h *serverHandler) handle(p *clientPeer) error {
  102. p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
  103. // Execute the LES handshake
  104. var (
  105. head = h.blockchain.CurrentHeader()
  106. hash = head.Hash()
  107. number = head.Number.Uint64()
  108. td = h.blockchain.GetTd(hash, number)
  109. )
  110. if err := p.Handshake(td, hash, number, h.blockchain.Genesis().Hash(), h.server); err != nil {
  111. p.Log().Debug("Light Ethereum handshake failed", "err", err)
  112. return err
  113. }
  114. // Reject the duplicated peer, otherwise register it to peerset.
  115. var registered bool
  116. if err := h.server.ns.Operation(func() {
  117. if h.server.ns.GetField(p.Node(), clientPeerField) != nil {
  118. registered = true
  119. } else {
  120. h.server.ns.SetFieldSub(p.Node(), clientPeerField, p)
  121. }
  122. }); err != nil {
  123. return err
  124. }
  125. if registered {
  126. return errAlreadyRegistered
  127. }
  128. defer func() {
  129. h.server.ns.SetField(p.Node(), clientPeerField, nil)
  130. if p.fcClient != nil { // is nil when connecting another server
  131. p.fcClient.Disconnect()
  132. }
  133. }()
  134. if p.server {
  135. // connected to another server, no messages expected, just wait for disconnection
  136. _, err := p.rw.ReadMsg()
  137. return err
  138. }
  139. // Reject light clients if server is not synced.
  140. //
  141. // Put this checking here, so that "non-synced" les-server peers are still allowed
  142. // to keep the connection.
  143. if !h.synced() {
  144. p.Log().Debug("Light server not synced, rejecting peer")
  145. return p2p.DiscRequested
  146. }
  147. // Disconnect the inbound peer if it's rejected by clientPool
  148. if cap, err := h.server.clientPool.connect(p); cap != p.fcParams.MinRecharge || err != nil {
  149. p.Log().Debug("Light Ethereum peer rejected", "err", errFullClientPool)
  150. return errFullClientPool
  151. }
  152. p.balance, _ = h.server.ns.GetField(p.Node(), h.server.clientPool.BalanceField).(*lps.NodeBalance)
  153. if p.balance == nil {
  154. return p2p.DiscRequested
  155. }
  156. activeCount, _ := h.server.clientPool.pp.Active()
  157. clientConnectionGauge.Update(int64(activeCount))
  158. var wg sync.WaitGroup // Wait group used to track all in-flight task routines.
  159. connectedAt := mclock.Now()
  160. defer func() {
  161. wg.Wait() // Ensure all background task routines have exited.
  162. h.server.clientPool.disconnect(p)
  163. p.balance = nil
  164. activeCount, _ := h.server.clientPool.pp.Active()
  165. clientConnectionGauge.Update(int64(activeCount))
  166. connectionTimer.Update(time.Duration(mclock.Now() - connectedAt))
  167. }()
  168. // Mark the peer starts to be served.
  169. atomic.StoreUint32(&p.serving, 1)
  170. defer atomic.StoreUint32(&p.serving, 0)
  171. // Spawn a main loop to handle all incoming messages.
  172. for {
  173. select {
  174. case err := <-p.errCh:
  175. p.Log().Debug("Failed to send light ethereum response", "err", err)
  176. return err
  177. default:
  178. }
  179. if err := h.handleMsg(p, &wg); err != nil {
  180. p.Log().Debug("Light Ethereum message handling failed", "err", err)
  181. return err
  182. }
  183. }
  184. }
  185. // handleMsg is invoked whenever an inbound message is received from a remote
  186. // peer. The remote connection is torn down upon returning any error.
  187. func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
  188. // Read the next message from the remote peer, and ensure it's fully consumed
  189. msg, err := p.rw.ReadMsg()
  190. if err != nil {
  191. return err
  192. }
  193. p.Log().Trace("Light Ethereum message arrived", "code", msg.Code, "bytes", msg.Size)
  194. // Discard large message which exceeds the limitation.
  195. if msg.Size > ProtocolMaxMsgSize {
  196. clientErrorMeter.Mark(1)
  197. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  198. }
  199. defer msg.Discard()
  200. var (
  201. maxCost uint64
  202. task *servingTask
  203. )
  204. p.responseCount++
  205. responseCount := p.responseCount
  206. // accept returns an indicator whether the request can be served.
  207. // If so, deduct the max cost from the flow control buffer.
  208. accept := func(reqID, reqCnt, maxCnt uint64) bool {
  209. // Short circuit if the peer is already frozen or the request is invalid.
  210. inSizeCost := h.server.costTracker.realCost(0, msg.Size, 0)
  211. if p.isFrozen() || reqCnt == 0 || reqCnt > maxCnt {
  212. p.fcClient.OneTimeCost(inSizeCost)
  213. return false
  214. }
  215. // Prepaid max cost units before request been serving.
  216. maxCost = p.fcCosts.getMaxCost(msg.Code, reqCnt)
  217. accepted, bufShort, priority := p.fcClient.AcceptRequest(reqID, responseCount, maxCost)
  218. if !accepted {
  219. p.freeze()
  220. p.Log().Error("Request came too early", "remaining", common.PrettyDuration(time.Duration(bufShort*1000000/p.fcParams.MinRecharge)))
  221. p.fcClient.OneTimeCost(inSizeCost)
  222. return false
  223. }
  224. // Create a multi-stage task, estimate the time it takes for the task to
  225. // execute, and cache it in the request service queue.
  226. factor := h.server.costTracker.globalFactor()
  227. if factor < 0.001 {
  228. factor = 1
  229. p.Log().Error("Invalid global cost factor", "factor", factor)
  230. }
  231. maxTime := uint64(float64(maxCost) / factor)
  232. task = h.server.servingQueue.newTask(p, maxTime, priority)
  233. if task.start() {
  234. return true
  235. }
  236. p.fcClient.RequestProcessed(reqID, responseCount, maxCost, inSizeCost)
  237. return false
  238. }
  239. // sendResponse sends back the response and updates the flow control statistic.
  240. sendResponse := func(reqID, amount uint64, reply *reply, servingTime uint64) {
  241. p.responseLock.Lock()
  242. defer p.responseLock.Unlock()
  243. // Short circuit if the client is already frozen.
  244. if p.isFrozen() {
  245. realCost := h.server.costTracker.realCost(servingTime, msg.Size, 0)
  246. p.fcClient.RequestProcessed(reqID, responseCount, maxCost, realCost)
  247. return
  248. }
  249. // Positive correction buffer value with real cost.
  250. var replySize uint32
  251. if reply != nil {
  252. replySize = reply.size()
  253. }
  254. var realCost uint64
  255. if h.server.costTracker.testing {
  256. realCost = maxCost // Assign a fake cost for testing purpose
  257. } else {
  258. realCost = h.server.costTracker.realCost(servingTime, msg.Size, replySize)
  259. if realCost > maxCost {
  260. realCost = maxCost
  261. }
  262. }
  263. bv := p.fcClient.RequestProcessed(reqID, responseCount, maxCost, realCost)
  264. if amount != 0 {
  265. // Feed cost tracker request serving statistic.
  266. h.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost)
  267. // Reduce priority "balance" for the specific peer.
  268. p.balance.RequestServed(realCost)
  269. }
  270. if reply != nil {
  271. p.queueSend(func() {
  272. if err := reply.send(bv); err != nil {
  273. select {
  274. case p.errCh <- err:
  275. default:
  276. }
  277. }
  278. })
  279. }
  280. }
  281. switch msg.Code {
  282. case GetBlockHeadersMsg:
  283. p.Log().Trace("Received block header request")
  284. if metrics.EnabledExpensive {
  285. miscInHeaderPacketsMeter.Mark(1)
  286. miscInHeaderTrafficMeter.Mark(int64(msg.Size))
  287. }
  288. var req struct {
  289. ReqID uint64
  290. Query getBlockHeadersData
  291. }
  292. if err := msg.Decode(&req); err != nil {
  293. clientErrorMeter.Mark(1)
  294. return errResp(ErrDecode, "%v: %v", msg, err)
  295. }
  296. query := req.Query
  297. if accept(req.ReqID, query.Amount, MaxHeaderFetch) {
  298. wg.Add(1)
  299. go func() {
  300. defer wg.Done()
  301. hashMode := query.Origin.Hash != (common.Hash{})
  302. first := true
  303. maxNonCanonical := uint64(100)
  304. // Gather headers until the fetch or network limits is reached
  305. var (
  306. bytes common.StorageSize
  307. headers []*types.Header
  308. unknown bool
  309. )
  310. for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
  311. if !first && !task.waitOrStop() {
  312. sendResponse(req.ReqID, 0, nil, task.servingTime)
  313. return
  314. }
  315. // Retrieve the next header satisfying the query
  316. var origin *types.Header
  317. if hashMode {
  318. if first {
  319. origin = h.blockchain.GetHeaderByHash(query.Origin.Hash)
  320. if origin != nil {
  321. query.Origin.Number = origin.Number.Uint64()
  322. }
  323. } else {
  324. origin = h.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
  325. }
  326. } else {
  327. origin = h.blockchain.GetHeaderByNumber(query.Origin.Number)
  328. }
  329. if origin == nil {
  330. break
  331. }
  332. headers = append(headers, origin)
  333. bytes += estHeaderRlpSize
  334. // Advance to the next header of the query
  335. switch {
  336. case hashMode && query.Reverse:
  337. // Hash based traversal towards the genesis block
  338. ancestor := query.Skip + 1
  339. if ancestor == 0 {
  340. unknown = true
  341. } else {
  342. query.Origin.Hash, query.Origin.Number = h.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
  343. unknown = query.Origin.Hash == common.Hash{}
  344. }
  345. case hashMode && !query.Reverse:
  346. // Hash based traversal towards the leaf block
  347. var (
  348. current = origin.Number.Uint64()
  349. next = current + query.Skip + 1
  350. )
  351. if next <= current {
  352. infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
  353. p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
  354. unknown = true
  355. } else {
  356. if header := h.blockchain.GetHeaderByNumber(next); header != nil {
  357. nextHash := header.Hash()
  358. expOldHash, _ := h.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
  359. if expOldHash == query.Origin.Hash {
  360. query.Origin.Hash, query.Origin.Number = nextHash, next
  361. } else {
  362. unknown = true
  363. }
  364. } else {
  365. unknown = true
  366. }
  367. }
  368. case query.Reverse:
  369. // Number based traversal towards the genesis block
  370. if query.Origin.Number >= query.Skip+1 {
  371. query.Origin.Number -= query.Skip + 1
  372. } else {
  373. unknown = true
  374. }
  375. case !query.Reverse:
  376. // Number based traversal towards the leaf block
  377. query.Origin.Number += query.Skip + 1
  378. }
  379. first = false
  380. }
  381. reply := p.replyBlockHeaders(req.ReqID, headers)
  382. sendResponse(req.ReqID, query.Amount, reply, task.done())
  383. if metrics.EnabledExpensive {
  384. miscOutHeaderPacketsMeter.Mark(1)
  385. miscOutHeaderTrafficMeter.Mark(int64(reply.size()))
  386. miscServingTimeHeaderTimer.Update(time.Duration(task.servingTime))
  387. }
  388. }()
  389. }
  390. case GetBlockBodiesMsg:
  391. p.Log().Trace("Received block bodies request")
  392. if metrics.EnabledExpensive {
  393. miscInBodyPacketsMeter.Mark(1)
  394. miscInBodyTrafficMeter.Mark(int64(msg.Size))
  395. }
  396. var req struct {
  397. ReqID uint64
  398. Hashes []common.Hash
  399. }
  400. if err := msg.Decode(&req); err != nil {
  401. clientErrorMeter.Mark(1)
  402. return errResp(ErrDecode, "msg %v: %v", msg, err)
  403. }
  404. var (
  405. bytes int
  406. bodies []rlp.RawValue
  407. )
  408. reqCnt := len(req.Hashes)
  409. if accept(req.ReqID, uint64(reqCnt), MaxBodyFetch) {
  410. wg.Add(1)
  411. go func() {
  412. defer wg.Done()
  413. for i, hash := range req.Hashes {
  414. if i != 0 && !task.waitOrStop() {
  415. sendResponse(req.ReqID, 0, nil, task.servingTime)
  416. return
  417. }
  418. if bytes >= softResponseLimit {
  419. break
  420. }
  421. body := h.blockchain.GetBodyRLP(hash)
  422. if body == nil {
  423. p.bumpInvalid()
  424. continue
  425. }
  426. bodies = append(bodies, body)
  427. bytes += len(body)
  428. }
  429. reply := p.replyBlockBodiesRLP(req.ReqID, bodies)
  430. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  431. if metrics.EnabledExpensive {
  432. miscOutBodyPacketsMeter.Mark(1)
  433. miscOutBodyTrafficMeter.Mark(int64(reply.size()))
  434. miscServingTimeBodyTimer.Update(time.Duration(task.servingTime))
  435. }
  436. }()
  437. }
  438. case GetCodeMsg:
  439. p.Log().Trace("Received code request")
  440. if metrics.EnabledExpensive {
  441. miscInCodePacketsMeter.Mark(1)
  442. miscInCodeTrafficMeter.Mark(int64(msg.Size))
  443. }
  444. var req struct {
  445. ReqID uint64
  446. Reqs []CodeReq
  447. }
  448. if err := msg.Decode(&req); err != nil {
  449. clientErrorMeter.Mark(1)
  450. return errResp(ErrDecode, "msg %v: %v", msg, err)
  451. }
  452. var (
  453. bytes int
  454. data [][]byte
  455. )
  456. reqCnt := len(req.Reqs)
  457. if accept(req.ReqID, uint64(reqCnt), MaxCodeFetch) {
  458. wg.Add(1)
  459. go func() {
  460. defer wg.Done()
  461. for i, request := range req.Reqs {
  462. if i != 0 && !task.waitOrStop() {
  463. sendResponse(req.ReqID, 0, nil, task.servingTime)
  464. return
  465. }
  466. // Look up the root hash belonging to the request
  467. header := h.blockchain.GetHeaderByHash(request.BHash)
  468. if header == nil {
  469. p.Log().Warn("Failed to retrieve associate header for code", "hash", request.BHash)
  470. p.bumpInvalid()
  471. continue
  472. }
  473. // Refuse to search stale state data in the database since looking for
  474. // a non-exist key is kind of expensive.
  475. local := h.blockchain.CurrentHeader().Number.Uint64()
  476. if !h.server.archiveMode && header.Number.Uint64()+core.TriesInMemory <= local {
  477. p.Log().Debug("Reject stale code request", "number", header.Number.Uint64(), "head", local)
  478. p.bumpInvalid()
  479. continue
  480. }
  481. triedb := h.blockchain.StateCache().TrieDB()
  482. account, err := h.getAccount(triedb, header.Root, common.BytesToHash(request.AccKey))
  483. if err != nil {
  484. p.Log().Warn("Failed to retrieve account for code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err)
  485. p.bumpInvalid()
  486. continue
  487. }
  488. code, err := h.blockchain.StateCache().ContractCode(common.BytesToHash(request.AccKey), common.BytesToHash(account.CodeHash))
  489. if err != nil {
  490. p.Log().Warn("Failed to retrieve account code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "codehash", common.BytesToHash(account.CodeHash), "err", err)
  491. continue
  492. }
  493. // Accumulate the code and abort if enough data was retrieved
  494. data = append(data, code)
  495. if bytes += len(code); bytes >= softResponseLimit {
  496. break
  497. }
  498. }
  499. reply := p.replyCode(req.ReqID, data)
  500. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  501. if metrics.EnabledExpensive {
  502. miscOutCodePacketsMeter.Mark(1)
  503. miscOutCodeTrafficMeter.Mark(int64(reply.size()))
  504. miscServingTimeCodeTimer.Update(time.Duration(task.servingTime))
  505. }
  506. }()
  507. }
  508. case GetReceiptsMsg:
  509. p.Log().Trace("Received receipts request")
  510. if metrics.EnabledExpensive {
  511. miscInReceiptPacketsMeter.Mark(1)
  512. miscInReceiptTrafficMeter.Mark(int64(msg.Size))
  513. }
  514. var req struct {
  515. ReqID uint64
  516. Hashes []common.Hash
  517. }
  518. if err := msg.Decode(&req); err != nil {
  519. clientErrorMeter.Mark(1)
  520. return errResp(ErrDecode, "msg %v: %v", msg, err)
  521. }
  522. var (
  523. bytes int
  524. receipts []rlp.RawValue
  525. )
  526. reqCnt := len(req.Hashes)
  527. if accept(req.ReqID, uint64(reqCnt), MaxReceiptFetch) {
  528. wg.Add(1)
  529. go func() {
  530. defer wg.Done()
  531. for i, hash := range req.Hashes {
  532. if i != 0 && !task.waitOrStop() {
  533. sendResponse(req.ReqID, 0, nil, task.servingTime)
  534. return
  535. }
  536. if bytes >= softResponseLimit {
  537. break
  538. }
  539. // Retrieve the requested block's receipts, skipping if unknown to us
  540. results := h.blockchain.GetReceiptsByHash(hash)
  541. if results == nil {
  542. if header := h.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
  543. p.bumpInvalid()
  544. continue
  545. }
  546. }
  547. // If known, encode and queue for response packet
  548. if encoded, err := rlp.EncodeToBytes(results); err != nil {
  549. log.Error("Failed to encode receipt", "err", err)
  550. } else {
  551. receipts = append(receipts, encoded)
  552. bytes += len(encoded)
  553. }
  554. }
  555. reply := p.replyReceiptsRLP(req.ReqID, receipts)
  556. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  557. if metrics.EnabledExpensive {
  558. miscOutReceiptPacketsMeter.Mark(1)
  559. miscOutReceiptTrafficMeter.Mark(int64(reply.size()))
  560. miscServingTimeReceiptTimer.Update(time.Duration(task.servingTime))
  561. }
  562. }()
  563. }
  564. case GetProofsV2Msg:
  565. p.Log().Trace("Received les/2 proofs request")
  566. if metrics.EnabledExpensive {
  567. miscInTrieProofPacketsMeter.Mark(1)
  568. miscInTrieProofTrafficMeter.Mark(int64(msg.Size))
  569. }
  570. var req struct {
  571. ReqID uint64
  572. Reqs []ProofReq
  573. }
  574. if err := msg.Decode(&req); err != nil {
  575. clientErrorMeter.Mark(1)
  576. return errResp(ErrDecode, "msg %v: %v", msg, err)
  577. }
  578. // Gather state data until the fetch or network limits is reached
  579. var (
  580. lastBHash common.Hash
  581. root common.Hash
  582. header *types.Header
  583. )
  584. reqCnt := len(req.Reqs)
  585. if accept(req.ReqID, uint64(reqCnt), MaxProofsFetch) {
  586. wg.Add(1)
  587. go func() {
  588. defer wg.Done()
  589. nodes := light.NewNodeSet()
  590. for i, request := range req.Reqs {
  591. if i != 0 && !task.waitOrStop() {
  592. sendResponse(req.ReqID, 0, nil, task.servingTime)
  593. return
  594. }
  595. // Look up the root hash belonging to the request
  596. if request.BHash != lastBHash {
  597. root, lastBHash = common.Hash{}, request.BHash
  598. if header = h.blockchain.GetHeaderByHash(request.BHash); header == nil {
  599. p.Log().Warn("Failed to retrieve header for proof", "hash", request.BHash)
  600. p.bumpInvalid()
  601. continue
  602. }
  603. // Refuse to search stale state data in the database since looking for
  604. // a non-exist key is kind of expensive.
  605. local := h.blockchain.CurrentHeader().Number.Uint64()
  606. if !h.server.archiveMode && header.Number.Uint64()+core.TriesInMemory <= local {
  607. p.Log().Debug("Reject stale trie request", "number", header.Number.Uint64(), "head", local)
  608. p.bumpInvalid()
  609. continue
  610. }
  611. root = header.Root
  612. }
  613. // If a header lookup failed (non existent), ignore subsequent requests for the same header
  614. if root == (common.Hash{}) {
  615. p.bumpInvalid()
  616. continue
  617. }
  618. // Open the account or storage trie for the request
  619. statedb := h.blockchain.StateCache()
  620. var trie state.Trie
  621. switch len(request.AccKey) {
  622. case 0:
  623. // No account key specified, open an account trie
  624. trie, err = statedb.OpenTrie(root)
  625. if trie == nil || err != nil {
  626. p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "root", root, "err", err)
  627. continue
  628. }
  629. default:
  630. // Account key specified, open a storage trie
  631. account, err := h.getAccount(statedb.TrieDB(), root, common.BytesToHash(request.AccKey))
  632. if err != nil {
  633. p.Log().Warn("Failed to retrieve account for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err)
  634. p.bumpInvalid()
  635. continue
  636. }
  637. trie, err = statedb.OpenStorageTrie(common.BytesToHash(request.AccKey), account.Root)
  638. if trie == nil || err != nil {
  639. p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "root", account.Root, "err", err)
  640. continue
  641. }
  642. }
  643. // Prove the user's request from the account or storage trie
  644. if err := trie.Prove(request.Key, request.FromLevel, nodes); err != nil {
  645. p.Log().Warn("Failed to prove state request", "block", header.Number, "hash", header.Hash(), "err", err)
  646. continue
  647. }
  648. if nodes.DataSize() >= softResponseLimit {
  649. break
  650. }
  651. }
  652. reply := p.replyProofsV2(req.ReqID, nodes.NodeList())
  653. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  654. if metrics.EnabledExpensive {
  655. miscOutTrieProofPacketsMeter.Mark(1)
  656. miscOutTrieProofTrafficMeter.Mark(int64(reply.size()))
  657. miscServingTimeTrieProofTimer.Update(time.Duration(task.servingTime))
  658. }
  659. }()
  660. }
  661. case GetHelperTrieProofsMsg:
  662. p.Log().Trace("Received helper trie proof request")
  663. if metrics.EnabledExpensive {
  664. miscInHelperTriePacketsMeter.Mark(1)
  665. miscInHelperTrieTrafficMeter.Mark(int64(msg.Size))
  666. }
  667. var req struct {
  668. ReqID uint64
  669. Reqs []HelperTrieReq
  670. }
  671. if err := msg.Decode(&req); err != nil {
  672. clientErrorMeter.Mark(1)
  673. return errResp(ErrDecode, "msg %v: %v", msg, err)
  674. }
  675. // Gather state data until the fetch or network limits is reached
  676. var (
  677. auxBytes int
  678. auxData [][]byte
  679. )
  680. reqCnt := len(req.Reqs)
  681. if accept(req.ReqID, uint64(reqCnt), MaxHelperTrieProofsFetch) {
  682. wg.Add(1)
  683. go func() {
  684. defer wg.Done()
  685. var (
  686. lastIdx uint64
  687. lastType uint
  688. root common.Hash
  689. auxTrie *trie.Trie
  690. )
  691. nodes := light.NewNodeSet()
  692. for i, request := range req.Reqs {
  693. if i != 0 && !task.waitOrStop() {
  694. sendResponse(req.ReqID, 0, nil, task.servingTime)
  695. return
  696. }
  697. if auxTrie == nil || request.Type != lastType || request.TrieIdx != lastIdx {
  698. auxTrie, lastType, lastIdx = nil, request.Type, request.TrieIdx
  699. var prefix string
  700. if root, prefix = h.getHelperTrie(request.Type, request.TrieIdx); root != (common.Hash{}) {
  701. auxTrie, _ = trie.New(root, trie.NewDatabase(rawdb.NewTable(h.chainDb, prefix)))
  702. }
  703. }
  704. if request.AuxReq == auxRoot {
  705. var data []byte
  706. if root != (common.Hash{}) {
  707. data = root[:]
  708. }
  709. auxData = append(auxData, data)
  710. auxBytes += len(data)
  711. } else {
  712. if auxTrie != nil {
  713. auxTrie.Prove(request.Key, request.FromLevel, nodes)
  714. }
  715. if request.AuxReq != 0 {
  716. data := h.getAuxiliaryHeaders(request)
  717. auxData = append(auxData, data)
  718. auxBytes += len(data)
  719. }
  720. }
  721. if nodes.DataSize()+auxBytes >= softResponseLimit {
  722. break
  723. }
  724. }
  725. reply := p.replyHelperTrieProofs(req.ReqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData})
  726. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  727. if metrics.EnabledExpensive {
  728. miscOutHelperTriePacketsMeter.Mark(1)
  729. miscOutHelperTrieTrafficMeter.Mark(int64(reply.size()))
  730. miscServingTimeHelperTrieTimer.Update(time.Duration(task.servingTime))
  731. }
  732. }()
  733. }
  734. case SendTxV2Msg:
  735. p.Log().Trace("Received new transactions")
  736. if metrics.EnabledExpensive {
  737. miscInTxsPacketsMeter.Mark(1)
  738. miscInTxsTrafficMeter.Mark(int64(msg.Size))
  739. }
  740. var req struct {
  741. ReqID uint64
  742. Txs []*types.Transaction
  743. }
  744. if err := msg.Decode(&req); err != nil {
  745. clientErrorMeter.Mark(1)
  746. return errResp(ErrDecode, "msg %v: %v", msg, err)
  747. }
  748. reqCnt := len(req.Txs)
  749. if accept(req.ReqID, uint64(reqCnt), MaxTxSend) {
  750. wg.Add(1)
  751. go func() {
  752. defer wg.Done()
  753. stats := make([]light.TxStatus, len(req.Txs))
  754. for i, tx := range req.Txs {
  755. if i != 0 && !task.waitOrStop() {
  756. return
  757. }
  758. hash := tx.Hash()
  759. stats[i] = h.txStatus(hash)
  760. if stats[i].Status == core.TxStatusUnknown {
  761. addFn := h.txpool.AddRemotes
  762. // Add txs synchronously for testing purpose
  763. if h.addTxsSync {
  764. addFn = h.txpool.AddRemotesSync
  765. }
  766. if errs := addFn([]*types.Transaction{tx}); errs[0] != nil {
  767. stats[i].Error = errs[0].Error()
  768. continue
  769. }
  770. stats[i] = h.txStatus(hash)
  771. }
  772. }
  773. reply := p.replyTxStatus(req.ReqID, stats)
  774. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  775. if metrics.EnabledExpensive {
  776. miscOutTxsPacketsMeter.Mark(1)
  777. miscOutTxsTrafficMeter.Mark(int64(reply.size()))
  778. miscServingTimeTxTimer.Update(time.Duration(task.servingTime))
  779. }
  780. }()
  781. }
  782. case GetTxStatusMsg:
  783. p.Log().Trace("Received transaction status query request")
  784. if metrics.EnabledExpensive {
  785. miscInTxStatusPacketsMeter.Mark(1)
  786. miscInTxStatusTrafficMeter.Mark(int64(msg.Size))
  787. }
  788. var req struct {
  789. ReqID uint64
  790. Hashes []common.Hash
  791. }
  792. if err := msg.Decode(&req); err != nil {
  793. clientErrorMeter.Mark(1)
  794. return errResp(ErrDecode, "msg %v: %v", msg, err)
  795. }
  796. reqCnt := len(req.Hashes)
  797. if accept(req.ReqID, uint64(reqCnt), MaxTxStatus) {
  798. wg.Add(1)
  799. go func() {
  800. defer wg.Done()
  801. stats := make([]light.TxStatus, len(req.Hashes))
  802. for i, hash := range req.Hashes {
  803. if i != 0 && !task.waitOrStop() {
  804. sendResponse(req.ReqID, 0, nil, task.servingTime)
  805. return
  806. }
  807. stats[i] = h.txStatus(hash)
  808. }
  809. reply := p.replyTxStatus(req.ReqID, stats)
  810. sendResponse(req.ReqID, uint64(reqCnt), reply, task.done())
  811. if metrics.EnabledExpensive {
  812. miscOutTxStatusPacketsMeter.Mark(1)
  813. miscOutTxStatusTrafficMeter.Mark(int64(reply.size()))
  814. miscServingTimeTxStatusTimer.Update(time.Duration(task.servingTime))
  815. }
  816. }()
  817. }
  818. default:
  819. p.Log().Trace("Received invalid message", "code", msg.Code)
  820. clientErrorMeter.Mark(1)
  821. return errResp(ErrInvalidMsgCode, "%v", msg.Code)
  822. }
  823. // If the client has made too many invalid requests (e.g. requests for non-existent data),
  824. // reject it to prevent spam attacks.
  825. if p.getInvalid() > maxRequestErrors {
  826. clientErrorMeter.Mark(1)
  827. return errTooManyInvalidRequest
  828. }
  829. return nil
  830. }
  831. // getAccount retrieves an account from the state based on root.
  832. func (h *serverHandler) getAccount(triedb *trie.Database, root, hash common.Hash) (state.Account, error) {
  833. trie, err := trie.New(root, triedb)
  834. if err != nil {
  835. return state.Account{}, err
  836. }
  837. blob, err := trie.TryGet(hash[:])
  838. if err != nil {
  839. return state.Account{}, err
  840. }
  841. var account state.Account
  842. if err = rlp.DecodeBytes(blob, &account); err != nil {
  843. return state.Account{}, err
  844. }
  845. return account, nil
  846. }
  847. // getHelperTrie returns the post-processed trie root for the given trie ID and section index
  848. func (h *serverHandler) getHelperTrie(typ uint, index uint64) (common.Hash, string) {
  849. switch typ {
  850. case htCanonical:
  851. sectionHead := rawdb.ReadCanonicalHash(h.chainDb, (index+1)*h.server.iConfig.ChtSize-1)
  852. return light.GetChtRoot(h.chainDb, index, sectionHead), light.ChtTablePrefix
  853. case htBloomBits:
  854. sectionHead := rawdb.ReadCanonicalHash(h.chainDb, (index+1)*h.server.iConfig.BloomTrieSize-1)
  855. return light.GetBloomTrieRoot(h.chainDb, index, sectionHead), light.BloomTrieTablePrefix
  856. }
  857. return common.Hash{}, ""
  858. }
  859. // getAuxiliaryHeaders returns requested auxiliary headers for the CHT request.
  860. func (h *serverHandler) getAuxiliaryHeaders(req HelperTrieReq) []byte {
  861. if req.Type == htCanonical && req.AuxReq == auxHeader && len(req.Key) == 8 {
  862. blockNum := binary.BigEndian.Uint64(req.Key)
  863. hash := rawdb.ReadCanonicalHash(h.chainDb, blockNum)
  864. return rawdb.ReadHeaderRLP(h.chainDb, hash, blockNum)
  865. }
  866. return nil
  867. }
  868. // txStatus returns the status of a specified transaction.
  869. func (h *serverHandler) txStatus(hash common.Hash) light.TxStatus {
  870. var stat light.TxStatus
  871. // Looking the transaction in txpool first.
  872. stat.Status = h.txpool.Status([]common.Hash{hash})[0]
  873. // If the transaction is unknown to the pool, try looking it up locally.
  874. if stat.Status == core.TxStatusUnknown {
  875. lookup := h.blockchain.GetTransactionLookup(hash)
  876. if lookup != nil {
  877. stat.Status = core.TxStatusIncluded
  878. stat.Lookup = lookup
  879. }
  880. }
  881. return stat
  882. }
  883. // broadcastLoop broadcasts new block information to all connected light
  884. // clients. According to the agreement between client and server, server should
  885. // only broadcast new announcement if the total difficulty is higher than the
  886. // last one. Besides server will add the signature if client requires.
  887. func (h *serverHandler) broadcastLoop() {
  888. defer h.wg.Done()
  889. headCh := make(chan core.ChainHeadEvent, 10)
  890. headSub := h.blockchain.SubscribeChainHeadEvent(headCh)
  891. defer headSub.Unsubscribe()
  892. var (
  893. lastHead *types.Header
  894. lastTd = common.Big0
  895. )
  896. for {
  897. select {
  898. case ev := <-headCh:
  899. header := ev.Block.Header()
  900. hash, number := header.Hash(), header.Number.Uint64()
  901. td := h.blockchain.GetTd(hash, number)
  902. if td == nil || td.Cmp(lastTd) <= 0 {
  903. continue
  904. }
  905. var reorg uint64
  906. if lastHead != nil {
  907. reorg = lastHead.Number.Uint64() - rawdb.FindCommonAncestor(h.chainDb, header, lastHead).Number.Uint64()
  908. }
  909. lastHead, lastTd = header, td
  910. log.Debug("Announcing block to peers", "number", number, "hash", hash, "td", td, "reorg", reorg)
  911. h.server.broadcaster.broadcast(announceData{Hash: hash, Number: number, Td: td, ReorgDepth: reorg})
  912. case <-h.closeCh:
  913. return
  914. }
  915. }
  916. }
  917. // broadcaster sends new header announcements to active client peers
  918. type broadcaster struct {
  919. ns *nodestate.NodeStateMachine
  920. privateKey *ecdsa.PrivateKey
  921. lastAnnounce, signedAnnounce announceData
  922. }
  923. // newBroadcaster creates a new broadcaster
  924. func newBroadcaster(ns *nodestate.NodeStateMachine) *broadcaster {
  925. b := &broadcaster{ns: ns}
  926. ns.SubscribeState(priorityPoolSetup.ActiveFlag, func(node *enode.Node, oldState, newState nodestate.Flags) {
  927. if newState.Equals(priorityPoolSetup.ActiveFlag) {
  928. // send last announcement to activated peers
  929. b.sendTo(node)
  930. }
  931. })
  932. return b
  933. }
  934. // setSignerKey sets the signer key for signed announcements. Should be called before
  935. // starting the protocol handler.
  936. func (b *broadcaster) setSignerKey(privateKey *ecdsa.PrivateKey) {
  937. b.privateKey = privateKey
  938. }
  939. // broadcast sends the given announcements to all active peers
  940. func (b *broadcaster) broadcast(announce announceData) {
  941. b.ns.Operation(func() {
  942. // iterate in an Operation to ensure that the active set does not change while iterating
  943. b.lastAnnounce = announce
  944. b.ns.ForEach(priorityPoolSetup.ActiveFlag, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
  945. b.sendTo(node)
  946. })
  947. })
  948. }
  949. // sendTo sends the most recent announcement to the given node unless the same or higher Td
  950. // announcement has already been sent.
  951. func (b *broadcaster) sendTo(node *enode.Node) {
  952. if b.lastAnnounce.Td == nil {
  953. return
  954. }
  955. if p, _ := b.ns.GetField(node, clientPeerField).(*clientPeer); p != nil {
  956. if p.headInfo.Td == nil || b.lastAnnounce.Td.Cmp(p.headInfo.Td) > 0 {
  957. announce := b.lastAnnounce
  958. switch p.announceType {
  959. case announceTypeSimple:
  960. if !p.queueSend(func() { p.sendAnnounce(announce) }) {
  961. log.Debug("Drop announcement because queue is full", "number", announce.Number, "hash", announce.Hash)
  962. } else {
  963. log.Debug("Sent announcement", "number", announce.Number, "hash", announce.Hash)
  964. }
  965. case announceTypeSigned:
  966. if b.signedAnnounce.Hash != b.lastAnnounce.Hash {
  967. b.signedAnnounce = b.lastAnnounce
  968. b.signedAnnounce.sign(b.privateKey)
  969. }
  970. announce := b.signedAnnounce
  971. if !p.queueSend(func() { p.sendAnnounce(announce) }) {
  972. log.Debug("Drop announcement because queue is full", "number", announce.Number, "hash", announce.Hash)
  973. } else {
  974. log.Debug("Sent announcement", "number", announce.Number, "hash", announce.Hash)
  975. }
  976. }
  977. p.headInfo = blockInfo{b.lastAnnounce.Hash, b.lastAnnounce.Number, b.lastAnnounce.Td}
  978. }
  979. }
  980. }