peer.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "errors"
  19. "fmt"
  20. "math/big"
  21. "math/rand"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/common/mclock"
  27. "github.com/ethereum/go-ethereum/core"
  28. "github.com/ethereum/go-ethereum/core/types"
  29. "github.com/ethereum/go-ethereum/eth"
  30. "github.com/ethereum/go-ethereum/les/flowcontrol"
  31. "github.com/ethereum/go-ethereum/light"
  32. "github.com/ethereum/go-ethereum/p2p"
  33. "github.com/ethereum/go-ethereum/rlp"
  34. )
// Sentinel errors returned by the peerSet registration methods.
var (
	// errClosed is returned by Register once the peer set has been closed.
	errClosed = errors.New("peer set is closed")
	// errAlreadyRegistered is returned by Register when the peer id is already present.
	errAlreadyRegistered = errors.New("peer is already registered")
	// errNotRegistered is returned by Unregister when the peer id is unknown.
	errNotRegistered = errors.New("peer is not registered")
)
// maxResponseErrors is the number of invalid responses tolerated (makes the
// protocol less brittle but still avoids spam).
const maxResponseErrors = 50
// Capacity limitation for parameter updates; these values drive the
// allowance accounting performed by rejectUpdate.
const (
	allowedUpdateBytes = 100000                // initial/maximum allowed update size
	allowedUpdateRate  = time.Millisecond * 10 // time constant for recharging one byte of allowance
)
// Timing parameters of the client freeze mechanism (see freezeClient).
const (
	freezeTimeBase    = time.Millisecond * 700 // fixed component of client freeze time
	freezeTimeRandom  = time.Millisecond * 600 // random component of client freeze time
	freezeCheckPeriod = time.Millisecond * 100 // buffer value recheck period after initial freeze time has elapsed
)
// txSizeCostLimit is the per-transaction encoded size threshold used by
// GetTxRelayCost: if the total encoded size of a sent transaction batch is over
// txSizeCostLimit per transaction then the request cost is calculated as
// proportional to the encoded size instead of the transaction count.
const txSizeCostLimit = 0x4000
// Announcement types stored in peer.announceType. During Handshake a client
// asks for announceTypeSimple by default and for announceTypeSigned when the
// remote peer is trusted.
const (
	announceTypeNone = iota
	announceTypeSimple
	announceTypeSigned
)
// peer holds the per-connection state of a remote node speaking the les
// protocol. Depending on the local role it carries either client-side
// (fcServer) or server-side (fcClient) flow control state.
type peer struct {
	*p2p.Peer
	rw p2p.MsgReadWriter

	version int    // Protocol version negotiated
	network uint64 // Network ID being on

	announceType uint64 // one of the announceType* constants, settled in Handshake

	id string // hex encoding of the p2p peer ID, computed in newPeer

	headInfo *announceData // head advertised by the remote side; first set by Handshake
	lock     sync.RWMutex

	sendQueue *execQueue // created when the peer is registered in a peerSet

	errCh chan error
	// responseLock ensures that responses are queued in the same order as
	// RequestProcessed is called
	responseLock  sync.Mutex
	responseCount uint64

	poolEntry      *poolEntry
	hasBlock       func(common.Hash, uint64, bool) bool // extra availability check consulted by HasBlock; nil makes HasBlock return false
	responseErrors int
	updateCounter  uint64         // allowance currently consumed by parameter updates (see rejectUpdate)
	updateTime     mclock.AbsTime // reference time used to recharge updateCounter

	frozen uint32 // 1 if client is in frozen state

	fcClient *flowcontrol.ClientNode // nil if the peer is server only
	fcServer *flowcontrol.ServerNode // nil if the peer is client only
	fcParams flowcontrol.ServerParams
	fcCosts  requestCostTable

	isTrusted      bool
	isOnlyAnnounce bool // set in Handshake when the remote server omits the serving fields

	// Availability ranges taken from the remote handshake ("serveChainSince",
	// "serveRecentChain", "serveStateSince", "serveRecentState"); used by HasBlock.
	chainSince, chainRecent uint64
	stateSince, stateRecent uint64
}
  90. func newPeer(version int, network uint64, isTrusted bool, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  91. return &peer{
  92. Peer: p,
  93. rw: rw,
  94. version: version,
  95. network: network,
  96. id: fmt.Sprintf("%x", p.ID().Bytes()),
  97. isTrusted: isTrusted,
  98. errCh: make(chan error, 1),
  99. }
  100. }
  101. // rejectUpdate returns true if a parameter update has to be rejected because
  102. // the size and/or rate of updates exceed the capacity limitation
  103. func (p *peer) rejectUpdate(size uint64) bool {
  104. now := mclock.Now()
  105. if p.updateCounter == 0 {
  106. p.updateTime = now
  107. } else {
  108. dt := now - p.updateTime
  109. r := uint64(dt / mclock.AbsTime(allowedUpdateRate))
  110. if p.updateCounter > r {
  111. p.updateCounter -= r
  112. p.updateTime += mclock.AbsTime(allowedUpdateRate * time.Duration(r))
  113. } else {
  114. p.updateCounter = 0
  115. p.updateTime = now
  116. }
  117. }
  118. p.updateCounter += size
  119. return p.updateCounter > allowedUpdateBytes
  120. }
  121. // freezeClient temporarily puts the client in a frozen state which means all
  122. // unprocessed and subsequent requests are dropped. Unfreezing happens automatically
  123. // after a short time if the client's buffer value is at least in the slightly positive
  124. // region. The client is also notified about being frozen/unfrozen with a Stop/Resume
  125. // message.
  126. func (p *peer) freezeClient() {
  127. if p.version < lpv3 {
  128. // if Stop/Resume is not supported then just drop the peer after setting
  129. // its frozen status permanently
  130. atomic.StoreUint32(&p.frozen, 1)
  131. p.Peer.Disconnect(p2p.DiscUselessPeer)
  132. return
  133. }
  134. if atomic.SwapUint32(&p.frozen, 1) == 0 {
  135. go func() {
  136. p.SendStop()
  137. time.Sleep(freezeTimeBase + time.Duration(rand.Int63n(int64(freezeTimeRandom))))
  138. for {
  139. bufValue, bufLimit := p.fcClient.BufferStatus()
  140. if bufLimit == 0 {
  141. return
  142. }
  143. if bufValue <= bufLimit/8 {
  144. time.Sleep(freezeCheckPeriod)
  145. } else {
  146. atomic.StoreUint32(&p.frozen, 0)
  147. p.SendResume(bufValue)
  148. break
  149. }
  150. }
  151. }()
  152. }
  153. }
  154. // freezeServer processes Stop/Resume messages from the given server
  155. func (p *peer) freezeServer(frozen bool) {
  156. var f uint32
  157. if frozen {
  158. f = 1
  159. }
  160. if atomic.SwapUint32(&p.frozen, f) != f && frozen {
  161. p.sendQueue.clear()
  162. }
  163. }
  164. // isFrozen returns true if the client is frozen or the server has put our
  165. // client in frozen state
  166. func (p *peer) isFrozen() bool {
  167. return atomic.LoadUint32(&p.frozen) != 0
  168. }
  169. func (p *peer) canQueue() bool {
  170. return p.sendQueue.canQueue() && !p.isFrozen()
  171. }
  172. func (p *peer) queueSend(f func()) {
  173. p.sendQueue.queue(f)
  174. }
  175. // Info gathers and returns a collection of metadata known about a peer.
  176. func (p *peer) Info() *eth.PeerInfo {
  177. return &eth.PeerInfo{
  178. Version: p.version,
  179. Difficulty: p.Td(),
  180. Head: fmt.Sprintf("%x", p.Head()),
  181. }
  182. }
  183. // Head retrieves a copy of the current head (most recent) hash of the peer.
  184. func (p *peer) Head() (hash common.Hash) {
  185. p.lock.RLock()
  186. defer p.lock.RUnlock()
  187. copy(hash[:], p.headInfo.Hash[:])
  188. return hash
  189. }
  190. func (p *peer) HeadAndTd() (hash common.Hash, td *big.Int) {
  191. p.lock.RLock()
  192. defer p.lock.RUnlock()
  193. copy(hash[:], p.headInfo.Hash[:])
  194. return hash, p.headInfo.Td
  195. }
  196. func (p *peer) headBlockInfo() blockInfo {
  197. p.lock.RLock()
  198. defer p.lock.RUnlock()
  199. return blockInfo{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td}
  200. }
  201. // Td retrieves the current total difficulty of a peer.
  202. func (p *peer) Td() *big.Int {
  203. p.lock.RLock()
  204. defer p.lock.RUnlock()
  205. return new(big.Int).Set(p.headInfo.Td)
  206. }
  207. // waitBefore implements distPeer interface
  208. func (p *peer) waitBefore(maxCost uint64) (time.Duration, float64) {
  209. return p.fcServer.CanSend(maxCost)
  210. }
  211. // updateCapacity updates the request serving capacity assigned to a given client
  212. // and also sends an announcement about the updated flow control parameters
  213. func (p *peer) updateCapacity(cap uint64) {
  214. p.responseLock.Lock()
  215. defer p.responseLock.Unlock()
  216. p.fcParams = flowcontrol.ServerParams{MinRecharge: cap, BufLimit: cap * bufLimitRatio}
  217. p.fcClient.UpdateParams(p.fcParams)
  218. var kvList keyValueList
  219. kvList = kvList.add("flowControl/MRR", cap)
  220. kvList = kvList.add("flowControl/BL", cap*bufLimitRatio)
  221. p.queueSend(func() { p.SendAnnounce(announceData{Update: kvList}) })
  222. }
  223. func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) error {
  224. type req struct {
  225. ReqID uint64
  226. Data interface{}
  227. }
  228. return p2p.Send(w, msgcode, req{reqID, data})
  229. }
// reply struct represents a reply with the actual data already RLP encoded and
// only the bv (buffer value) missing. This allows the serving mechanism to
// calculate the bv value which depends on the data size before sending the reply.
type reply struct {
	w              p2p.MsgWriter // destination the reply is written to by send
	msgcode, reqID uint64        // message code and ID of the request being answered
	data           rlp.RawValue  // pre-encoded payload
}
  238. // send sends the reply with the calculated buffer value
  239. func (r *reply) send(bv uint64) error {
  240. type resp struct {
  241. ReqID, BV uint64
  242. Data rlp.RawValue
  243. }
  244. return p2p.Send(r.w, r.msgcode, resp{r.reqID, bv, r.data})
  245. }
  246. // size returns the RLP encoded size of the message data
  247. func (r *reply) size() uint32 {
  248. return uint32(len(r.data))
  249. }
  250. func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
  251. p.lock.RLock()
  252. defer p.lock.RUnlock()
  253. costs := p.fcCosts[msgcode]
  254. if costs == nil {
  255. return 0
  256. }
  257. cost := costs.baseCost + costs.reqCost*uint64(amount)
  258. if cost > p.fcParams.BufLimit {
  259. cost = p.fcParams.BufLimit
  260. }
  261. return cost
  262. }
  263. func (p *peer) GetTxRelayCost(amount, size int) uint64 {
  264. p.lock.RLock()
  265. defer p.lock.RUnlock()
  266. costs := p.fcCosts[SendTxV2Msg]
  267. if costs == nil {
  268. return 0
  269. }
  270. cost := costs.baseCost + costs.reqCost*uint64(amount)
  271. sizeCost := costs.baseCost + costs.reqCost*uint64(size)/txSizeCostLimit
  272. if sizeCost > cost {
  273. cost = sizeCost
  274. }
  275. if cost > p.fcParams.BufLimit {
  276. cost = p.fcParams.BufLimit
  277. }
  278. return cost
  279. }
  280. // HasBlock checks if the peer has a given block
  281. func (p *peer) HasBlock(hash common.Hash, number uint64, hasState bool) bool {
  282. var head, since, recent uint64
  283. p.lock.RLock()
  284. if p.headInfo != nil {
  285. head = p.headInfo.Number
  286. }
  287. if hasState {
  288. since = p.stateSince
  289. recent = p.stateRecent
  290. } else {
  291. since = p.chainSince
  292. recent = p.chainRecent
  293. }
  294. hasBlock := p.hasBlock
  295. p.lock.RUnlock()
  296. return head >= number && number >= since && (recent == 0 || number+recent+4 > head) && hasBlock != nil && hasBlock(hash, number, hasState)
  297. }
  298. // SendAnnounce announces the availability of a number of blocks through
  299. // a hash notification.
  300. func (p *peer) SendAnnounce(request announceData) error {
  301. return p2p.Send(p.rw, AnnounceMsg, request)
  302. }
  303. // SendStop notifies the client about being in frozen state
  304. func (p *peer) SendStop() error {
  305. return p2p.Send(p.rw, StopMsg, struct{}{})
  306. }
  307. // SendResume notifies the client about getting out of frozen state
  308. func (p *peer) SendResume(bv uint64) error {
  309. return p2p.Send(p.rw, ResumeMsg, bv)
  310. }
  311. // ReplyBlockHeaders creates a reply with a batch of block headers
  312. func (p *peer) ReplyBlockHeaders(reqID uint64, headers []*types.Header) *reply {
  313. data, _ := rlp.EncodeToBytes(headers)
  314. return &reply{p.rw, BlockHeadersMsg, reqID, data}
  315. }
  316. // ReplyBlockBodiesRLP creates a reply with a batch of block contents from
  317. // an already RLP encoded format.
  318. func (p *peer) ReplyBlockBodiesRLP(reqID uint64, bodies []rlp.RawValue) *reply {
  319. data, _ := rlp.EncodeToBytes(bodies)
  320. return &reply{p.rw, BlockBodiesMsg, reqID, data}
  321. }
  322. // ReplyCode creates a reply with a batch of arbitrary internal data, corresponding to the
  323. // hashes requested.
  324. func (p *peer) ReplyCode(reqID uint64, codes [][]byte) *reply {
  325. data, _ := rlp.EncodeToBytes(codes)
  326. return &reply{p.rw, CodeMsg, reqID, data}
  327. }
  328. // ReplyReceiptsRLP creates a reply with a batch of transaction receipts, corresponding to the
  329. // ones requested from an already RLP encoded format.
  330. func (p *peer) ReplyReceiptsRLP(reqID uint64, receipts []rlp.RawValue) *reply {
  331. data, _ := rlp.EncodeToBytes(receipts)
  332. return &reply{p.rw, ReceiptsMsg, reqID, data}
  333. }
  334. // ReplyProofsV2 creates a reply with a batch of merkle proofs, corresponding to the ones requested.
  335. func (p *peer) ReplyProofsV2(reqID uint64, proofs light.NodeList) *reply {
  336. data, _ := rlp.EncodeToBytes(proofs)
  337. return &reply{p.rw, ProofsV2Msg, reqID, data}
  338. }
  339. // ReplyHelperTrieProofs creates a reply with a batch of HelperTrie proofs, corresponding to the ones requested.
  340. func (p *peer) ReplyHelperTrieProofs(reqID uint64, resp HelperTrieResps) *reply {
  341. data, _ := rlp.EncodeToBytes(resp)
  342. return &reply{p.rw, HelperTrieProofsMsg, reqID, data}
  343. }
  344. // ReplyTxStatus creates a reply with a batch of transaction status records, corresponding to the ones requested.
  345. func (p *peer) ReplyTxStatus(reqID uint64, stats []light.TxStatus) *reply {
  346. data, _ := rlp.EncodeToBytes(stats)
  347. return &reply{p.rw, TxStatusMsg, reqID, data}
  348. }
  349. // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
  350. // specified header query, based on the hash of an origin block.
  351. func (p *peer) RequestHeadersByHash(reqID, cost uint64, origin common.Hash, amount int, skip int, reverse bool) error {
  352. p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
  353. return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
  354. }
  355. // RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
  356. // specified header query, based on the number of an origin block.
  357. func (p *peer) RequestHeadersByNumber(reqID, cost, origin uint64, amount int, skip int, reverse bool) error {
  358. p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
  359. return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
  360. }
  361. // RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
  362. // specified.
  363. func (p *peer) RequestBodies(reqID, cost uint64, hashes []common.Hash) error {
  364. p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
  365. return sendRequest(p.rw, GetBlockBodiesMsg, reqID, cost, hashes)
  366. }
  367. // RequestCode fetches a batch of arbitrary data from a node's known state
  368. // data, corresponding to the specified hashes.
  369. func (p *peer) RequestCode(reqID, cost uint64, reqs []CodeReq) error {
  370. p.Log().Debug("Fetching batch of codes", "count", len(reqs))
  371. return sendRequest(p.rw, GetCodeMsg, reqID, cost, reqs)
  372. }
  373. // RequestReceipts fetches a batch of transaction receipts from a remote node.
  374. func (p *peer) RequestReceipts(reqID, cost uint64, hashes []common.Hash) error {
  375. p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
  376. return sendRequest(p.rw, GetReceiptsMsg, reqID, cost, hashes)
  377. }
  378. // RequestProofs fetches a batch of merkle proofs from a remote node.
  379. func (p *peer) RequestProofs(reqID, cost uint64, reqs []ProofReq) error {
  380. p.Log().Debug("Fetching batch of proofs", "count", len(reqs))
  381. return sendRequest(p.rw, GetProofsV2Msg, reqID, cost, reqs)
  382. }
  383. // RequestHelperTrieProofs fetches a batch of HelperTrie merkle proofs from a remote node.
  384. func (p *peer) RequestHelperTrieProofs(reqID, cost uint64, reqs []HelperTrieReq) error {
  385. p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs))
  386. return sendRequest(p.rw, GetHelperTrieProofsMsg, reqID, cost, reqs)
  387. }
  388. // RequestTxStatus fetches a batch of transaction status records from a remote node.
  389. func (p *peer) RequestTxStatus(reqID, cost uint64, txHashes []common.Hash) error {
  390. p.Log().Debug("Requesting transaction status", "count", len(txHashes))
  391. return sendRequest(p.rw, GetTxStatusMsg, reqID, cost, txHashes)
  392. }
  393. // SendTxStatus creates a reply with a batch of transactions to be added to the remote transaction pool.
  394. func (p *peer) SendTxs(reqID, cost uint64, txs rlp.RawValue) error {
  395. p.Log().Debug("Sending batch of transactions", "size", len(txs))
  396. return sendRequest(p.rw, SendTxV2Msg, reqID, cost, txs)
  397. }
// keyValueEntry is a single key/value pair with the value kept in RLP encoded
// form.
type keyValueEntry struct {
	Key   string
	Value rlp.RawValue
}

// keyValueList is an ordered list of key/value pairs; it is the form in which
// handshake and update parameters are sent and received.
type keyValueList []keyValueEntry

// keyValueMap is the lookup form of a keyValueList, produced by
// keyValueList.decode and queried with get.
type keyValueMap map[string]rlp.RawValue
  404. func (l keyValueList) add(key string, val interface{}) keyValueList {
  405. var entry keyValueEntry
  406. entry.Key = key
  407. if val == nil {
  408. val = uint64(0)
  409. }
  410. enc, err := rlp.EncodeToBytes(val)
  411. if err == nil {
  412. entry.Value = enc
  413. }
  414. return append(l, entry)
  415. }
  416. func (l keyValueList) decode() (keyValueMap, uint64) {
  417. m := make(keyValueMap)
  418. var size uint64
  419. for _, entry := range l {
  420. m[entry.Key] = entry.Value
  421. size += uint64(len(entry.Key)) + uint64(len(entry.Value)) + 8
  422. }
  423. return m, size
  424. }
  425. func (m keyValueMap) get(key string, val interface{}) error {
  426. enc, ok := m[key]
  427. if !ok {
  428. return errResp(ErrMissingKey, "%s", key)
  429. }
  430. if val == nil {
  431. return nil
  432. }
  433. return rlp.DecodeBytes(enc, val)
  434. }
  435. func (p *peer) sendReceiveHandshake(sendList keyValueList) (keyValueList, error) {
  436. // Send out own handshake in a new thread
  437. errc := make(chan error, 1)
  438. go func() {
  439. errc <- p2p.Send(p.rw, StatusMsg, sendList)
  440. }()
  441. // In the mean time retrieve the remote status message
  442. msg, err := p.rw.ReadMsg()
  443. if err != nil {
  444. return nil, err
  445. }
  446. if msg.Code != StatusMsg {
  447. return nil, errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
  448. }
  449. if msg.Size > ProtocolMaxMsgSize {
  450. return nil, errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  451. }
  452. // Decode the handshake
  453. var recvList keyValueList
  454. if err := msg.Decode(&recvList); err != nil {
  455. return nil, errResp(ErrDecode, "msg %v: %v", msg, err)
  456. }
  457. if err := <-errc; err != nil {
  458. return nil, err
  459. }
  460. return recvList, nil
  461. }
// Handshake executes the les protocol handshake, negotiating version number,
// network IDs, difficulties, head and genesis blocks.
//
// td, head, headNum and genesis describe the local chain and are advertised to
// the remote side. A non-nil server selects the server role: serving
// capabilities and flow control parameters are advertised and a flow control
// client node is created for the peer. A nil server selects the client role:
// the desired announce type is advertised and the remote serving capabilities
// and flow control parameters are read and validated.
//
// An error is returned if the message exchange fails, the received list
// exceeds the update allowance, a mandatory field is missing or undecodable,
// the genesis hash, network ID or protocol version differ, or (client role)
// the remote peer cannot serve requests. On success p.headInfo is set to the
// remote head. p.lock is held for the whole call, including the network
// exchange.
func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, server *LesServer) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	// Fields sent in both roles.
	var send keyValueList
	send = send.add("protocolVersion", uint64(p.version))
	send = send.add("networkId", p.network)
	send = send.add("headTd", td)
	send = send.add("headHash", head)
	send = send.add("headNum", headNum)
	send = send.add("genesisHash", genesis)
	if server != nil {
		// Server role: serving fields are only advertised when the server is
		// not announce-only.
		if !server.onlyAnnounce {
			send = send.add("serveHeaders", nil)
			send = send.add("serveChainSince", uint64(0))
			send = send.add("serveStateSince", uint64(0))
			send = send.add("serveRecentState", uint64(core.TriesInMemory-4))
			send = send.add("txRelay", nil)
		}
		send = send.add("flowControl/BL", server.defParams.BufLimit)
		send = send.add("flowControl/MRR", server.defParams.MinRecharge)
		// Without a cost tracker the cost list is built from server.testCost.
		var costList RequestCostList
		if server.costTracker != nil {
			costList = server.costTracker.makeCostList(server.costTracker.globalFactor())
		} else {
			costList = testCostList(server.testCost)
		}
		send = send.add("flowControl/MRC", costList)
		p.fcCosts = costList.decode(ProtocolLengths[uint(p.version)])
		p.fcParams = server.defParams
	} else {
		// Client role: ask for signed announcements from trusted peers and
		// simple ones from everybody else.
		p.announceType = announceTypeSimple
		if p.isTrusted {
			p.announceType = announceTypeSigned
		}
		send = send.add("announceType", p.announceType)
	}

	recvList, err := p.sendReceiveHandshake(send)
	if err != nil {
		return err
	}
	// The received list counts against the parameter update allowance.
	recv, size := recvList.decode()
	if p.rejectUpdate(size) {
		return errResp(ErrRequestRejected, "")
	}

	// Mandatory fields: any missing or undecodable one aborts the handshake.
	var rGenesis, rHash common.Hash
	var rVersion, rNetwork, rNum uint64
	var rTd *big.Int

	if err := recv.get("protocolVersion", &rVersion); err != nil {
		return err
	}
	if err := recv.get("networkId", &rNetwork); err != nil {
		return err
	}
	if err := recv.get("headTd", &rTd); err != nil {
		return err
	}
	if err := recv.get("headHash", &rHash); err != nil {
		return err
	}
	if err := recv.get("headNum", &rNum); err != nil {
		return err
	}
	if err := recv.get("genesisHash", &rGenesis); err != nil {
		return err
	}

	// Both sides must agree on genesis block, network and protocol version.
	if rGenesis != genesis {
		return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", rGenesis[:8], genesis[:8])
	}
	if rNetwork != p.network {
		return errResp(ErrNetworkIdMismatch, "%d (!= %d)", rNetwork, p.network)
	}
	if int(rVersion) != p.version {
		return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", rVersion, p.version)
	}

	if server != nil {
		// until we have a proper peer connectivity API, allow LES connection to other servers
		/*if recv.get("serveStateSince", nil) == nil {
			return errResp(ErrUselessPeer, "wanted client, got server")
		}*/
		if recv.get("announceType", &p.announceType) != nil {
			// The client did not send a usable announce type: fall back to
			// the default on the server side.
			p.announceType = announceTypeSimple
		}
		p.fcClient = flowcontrol.NewClientNode(server.fcManager, server.defParams)
	} else {
		// Mark the server as announce-only if any of the "serveChainSince",
		// "serveStateSince" or "txRelay" fields is missing or undecodable.
		// The "recent" fields are optional and default to 0.
		if recv.get("serveChainSince", &p.chainSince) != nil {
			p.isOnlyAnnounce = true
		}
		if recv.get("serveRecentChain", &p.chainRecent) != nil {
			p.chainRecent = 0
		}
		if recv.get("serveStateSince", &p.stateSince) != nil {
			p.isOnlyAnnounce = true
		}
		if recv.get("serveRecentState", &p.stateRecent) != nil {
			p.stateRecent = 0
		}
		if recv.get("txRelay", nil) != nil {
			p.isOnlyAnnounce = true
		}

		// Announce-only servers are only accepted when they are trusted.
		if p.isOnlyAnnounce && !p.isTrusted {
			return errResp(ErrUselessPeer, "peer cannot serve requests")
		}

		// Flow control parameters and the cost list are mandatory for servers.
		var params flowcontrol.ServerParams
		if err := recv.get("flowControl/BL", &params.BufLimit); err != nil {
			return err
		}
		if err := recv.get("flowControl/MRR", &params.MinRecharge); err != nil {
			return err
		}
		var MRC RequestCostList
		if err := recv.get("flowControl/MRC", &MRC); err != nil {
			return err
		}
		p.fcParams = params
		p.fcServer = flowcontrol.NewServerNode(params, &mclock.System{})
		p.fcCosts = MRC.decode(ProtocolLengths[uint(p.version)])
		if !p.isOnlyAnnounce {
			// A serving peer must have a cost entry for every message code
			// listed in reqAvgTimeCost.
			for msgCode := range reqAvgTimeCost {
				if p.fcCosts[msgCode] == nil {
					return errResp(ErrUselessPeer, "peer does not support message %d", msgCode)
				}
			}
		}
	}
	p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum}
	return nil
}
  594. // updateFlowControl updates the flow control parameters belonging to the server
  595. // node if the announced key/value set contains relevant fields
  596. func (p *peer) updateFlowControl(update keyValueMap) {
  597. if p.fcServer == nil {
  598. return
  599. }
  600. params := p.fcParams
  601. updateParams := false
  602. if update.get("flowControl/BL", &params.BufLimit) == nil {
  603. updateParams = true
  604. }
  605. if update.get("flowControl/MRR", &params.MinRecharge) == nil {
  606. updateParams = true
  607. }
  608. if updateParams {
  609. p.fcParams = params
  610. p.fcServer.UpdateParams(params)
  611. }
  612. var MRC RequestCostList
  613. if update.get("flowControl/MRC", &MRC) == nil {
  614. costUpdate := MRC.decode(ProtocolLengths[uint(p.version)])
  615. for code, cost := range costUpdate {
  616. p.fcCosts[code] = cost
  617. }
  618. }
  619. }
  620. // String implements fmt.Stringer.
  621. func (p *peer) String() string {
  622. return fmt.Sprintf("Peer %s [%s]", p.id,
  623. fmt.Sprintf("les/%d", p.version),
  624. )
  625. }
// peerSetNotify is a callback interface to notify services about added or
// removed peers. The peerSet invokes these callbacks without holding its lock.
type peerSetNotify interface {
	// registerPeer is called for every peer added to the set, and for every
	// peer already present when the service subscribes via notify.
	registerPeer(*peer)
	// unregisterPeer is called for every peer removed from the set.
	unregisterPeer(*peer)
}
// peerSet represents the collection of active peers currently participating in
// the Light Ethereum sub-protocol.
type peerSet struct {
	peers      map[string]*peer // registered peers keyed by peer id
	lock       sync.RWMutex     // guards all fields of the set
	notifyList []peerSetNotify  // services notified about added/removed peers
	closed     bool             // set by Close; Register fails afterwards
}
  640. // newPeerSet creates a new peer set to track the active participants.
  641. func newPeerSet() *peerSet {
  642. return &peerSet{
  643. peers: make(map[string]*peer),
  644. }
  645. }
  646. // notify adds a service to be notified about added or removed peers
  647. func (ps *peerSet) notify(n peerSetNotify) {
  648. ps.lock.Lock()
  649. ps.notifyList = append(ps.notifyList, n)
  650. peers := make([]*peer, 0, len(ps.peers))
  651. for _, p := range ps.peers {
  652. peers = append(peers, p)
  653. }
  654. ps.lock.Unlock()
  655. for _, p := range peers {
  656. n.registerPeer(p)
  657. }
  658. }
  659. // Register injects a new peer into the working set, or returns an error if the
  660. // peer is already known.
  661. func (ps *peerSet) Register(p *peer) error {
  662. ps.lock.Lock()
  663. if ps.closed {
  664. ps.lock.Unlock()
  665. return errClosed
  666. }
  667. if _, ok := ps.peers[p.id]; ok {
  668. ps.lock.Unlock()
  669. return errAlreadyRegistered
  670. }
  671. ps.peers[p.id] = p
  672. p.sendQueue = newExecQueue(100)
  673. peers := make([]peerSetNotify, len(ps.notifyList))
  674. copy(peers, ps.notifyList)
  675. ps.lock.Unlock()
  676. for _, n := range peers {
  677. n.registerPeer(p)
  678. }
  679. return nil
  680. }
  681. // Unregister removes a remote peer from the active set, disabling any further
  682. // actions to/from that particular entity. It also initiates disconnection at the networking layer.
  683. func (ps *peerSet) Unregister(id string) error {
  684. ps.lock.Lock()
  685. if p, ok := ps.peers[id]; !ok {
  686. ps.lock.Unlock()
  687. return errNotRegistered
  688. } else {
  689. delete(ps.peers, id)
  690. peers := make([]peerSetNotify, len(ps.notifyList))
  691. copy(peers, ps.notifyList)
  692. ps.lock.Unlock()
  693. for _, n := range peers {
  694. n.unregisterPeer(p)
  695. }
  696. p.sendQueue.quit()
  697. p.Peer.Disconnect(p2p.DiscUselessPeer)
  698. return nil
  699. }
  700. }
  701. // AllPeerIDs returns a list of all registered peer IDs
  702. func (ps *peerSet) AllPeerIDs() []string {
  703. ps.lock.RLock()
  704. defer ps.lock.RUnlock()
  705. res := make([]string, len(ps.peers))
  706. idx := 0
  707. for id := range ps.peers {
  708. res[idx] = id
  709. idx++
  710. }
  711. return res
  712. }
  713. // Peer retrieves the registered peer with the given id.
  714. func (ps *peerSet) Peer(id string) *peer {
  715. ps.lock.RLock()
  716. defer ps.lock.RUnlock()
  717. return ps.peers[id]
  718. }
  719. // Len returns if the current number of peers in the set.
  720. func (ps *peerSet) Len() int {
  721. ps.lock.RLock()
  722. defer ps.lock.RUnlock()
  723. return len(ps.peers)
  724. }
  725. // BestPeer retrieves the known peer with the currently highest total difficulty.
  726. func (ps *peerSet) BestPeer() *peer {
  727. ps.lock.RLock()
  728. defer ps.lock.RUnlock()
  729. var (
  730. bestPeer *peer
  731. bestTd *big.Int
  732. )
  733. for _, p := range ps.peers {
  734. if td := p.Td(); bestPeer == nil || td.Cmp(bestTd) > 0 {
  735. bestPeer, bestTd = p, td
  736. }
  737. }
  738. return bestPeer
  739. }
  740. // AllPeers returns all peers in a list
  741. func (ps *peerSet) AllPeers() []*peer {
  742. ps.lock.RLock()
  743. defer ps.lock.RUnlock()
  744. list := make([]*peer, len(ps.peers))
  745. i := 0
  746. for _, peer := range ps.peers {
  747. list[i] = peer
  748. i++
  749. }
  750. return list
  751. }
  752. // Close disconnects all peers.
  753. // No new peers can be registered after Close has returned.
  754. func (ps *peerSet) Close() {
  755. ps.lock.Lock()
  756. defer ps.lock.Unlock()
  757. for _, p := range ps.peers {
  758. p.Disconnect(p2p.DiscQuitting)
  759. }
  760. ps.closed = true
  761. }