peer.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package les
  17. import (
  18. "errors"
  19. "fmt"
  20. "math/big"
  21. "math/rand"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/common/mclock"
  27. "github.com/ethereum/go-ethereum/core"
  28. "github.com/ethereum/go-ethereum/core/types"
  29. "github.com/ethereum/go-ethereum/eth"
  30. "github.com/ethereum/go-ethereum/les/flowcontrol"
  31. "github.com/ethereum/go-ethereum/light"
  32. "github.com/ethereum/go-ethereum/p2p"
  33. "github.com/ethereum/go-ethereum/params"
  34. "github.com/ethereum/go-ethereum/rlp"
  35. )
  36. var (
  37. errClosed = errors.New("peer set is closed")
  38. errAlreadyRegistered = errors.New("peer is already registered")
  39. errNotRegistered = errors.New("peer is not registered")
  40. )
  41. const (
  42. maxRequestErrors = 20 // number of invalid requests tolerated (makes the protocol less brittle but still avoids spam)
  43. maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam)
  44. )
  45. // capacity limitation for parameter updates
  46. const (
  47. allowedUpdateBytes = 100000 // initial/maximum allowed update size
  48. allowedUpdateRate = time.Millisecond * 10 // time constant for recharging one byte of allowance
  49. )
  50. const (
  51. freezeTimeBase = time.Millisecond * 700 // fixed component of client freeze time
  52. freezeTimeRandom = time.Millisecond * 600 // random component of client freeze time
  53. freezeCheckPeriod = time.Millisecond * 100 // buffer value recheck period after initial freeze time has elapsed
  54. )
  55. // if the total encoded size of a sent transaction batch is over txSizeCostLimit
  56. // per transaction then the request cost is calculated as proportional to the
  57. // encoded size instead of the transaction count
  58. const txSizeCostLimit = 0x4000
  59. const (
  60. announceTypeNone = iota
  61. announceTypeSimple
  62. announceTypeSigned
  63. )
  64. type peer struct {
  65. *p2p.Peer
  66. rw p2p.MsgReadWriter
  67. version int // Protocol version negotiated
  68. network uint64 // Network ID being on
  69. announceType uint64
  70. // Checkpoint relative fields
  71. checkpoint params.TrustedCheckpoint
  72. checkpointNumber uint64
  73. id string
  74. headInfo *announceData
  75. lock sync.RWMutex
  76. sendQueue *execQueue
  77. errCh chan error
  78. // responseLock ensures that responses are queued in the same order as
  79. // RequestProcessed is called
  80. responseLock sync.Mutex
  81. responseCount uint64
  82. invalidCount uint32
  83. poolEntry *poolEntry
  84. hasBlock func(common.Hash, uint64, bool) bool
  85. responseErrors int
  86. updateCounter uint64
  87. updateTime mclock.AbsTime
  88. frozen uint32 // 1 if client is in frozen state
  89. fcClient *flowcontrol.ClientNode // nil if the peer is server only
  90. fcServer *flowcontrol.ServerNode // nil if the peer is client only
  91. fcParams flowcontrol.ServerParams
  92. fcCosts requestCostTable
  93. trusted bool
  94. onlyAnnounce bool
  95. chainSince, chainRecent uint64
  96. stateSince, stateRecent uint64
  97. }
  98. func newPeer(version int, network uint64, trusted bool, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  99. return &peer{
  100. Peer: p,
  101. rw: rw,
  102. version: version,
  103. network: network,
  104. id: fmt.Sprintf("%x", p.ID().Bytes()),
  105. trusted: trusted,
  106. errCh: make(chan error, 1),
  107. }
  108. }
  109. // rejectUpdate returns true if a parameter update has to be rejected because
  110. // the size and/or rate of updates exceed the capacity limitation
  111. func (p *peer) rejectUpdate(size uint64) bool {
  112. now := mclock.Now()
  113. if p.updateCounter == 0 {
  114. p.updateTime = now
  115. } else {
  116. dt := now - p.updateTime
  117. r := uint64(dt / mclock.AbsTime(allowedUpdateRate))
  118. if p.updateCounter > r {
  119. p.updateCounter -= r
  120. p.updateTime += mclock.AbsTime(allowedUpdateRate * time.Duration(r))
  121. } else {
  122. p.updateCounter = 0
  123. p.updateTime = now
  124. }
  125. }
  126. p.updateCounter += size
  127. return p.updateCounter > allowedUpdateBytes
  128. }
  129. // freezeClient temporarily puts the client in a frozen state which means all
  130. // unprocessed and subsequent requests are dropped. Unfreezing happens automatically
  131. // after a short time if the client's buffer value is at least in the slightly positive
  132. // region. The client is also notified about being frozen/unfrozen with a Stop/Resume
  133. // message.
  134. func (p *peer) freezeClient() {
  135. if p.version < lpv3 {
  136. // if Stop/Resume is not supported then just drop the peer after setting
  137. // its frozen status permanently
  138. atomic.StoreUint32(&p.frozen, 1)
  139. p.Peer.Disconnect(p2p.DiscUselessPeer)
  140. return
  141. }
  142. if atomic.SwapUint32(&p.frozen, 1) == 0 {
  143. go func() {
  144. p.SendStop()
  145. time.Sleep(freezeTimeBase + time.Duration(rand.Int63n(int64(freezeTimeRandom))))
  146. for {
  147. bufValue, bufLimit := p.fcClient.BufferStatus()
  148. if bufLimit == 0 {
  149. return
  150. }
  151. if bufValue <= bufLimit/8 {
  152. time.Sleep(freezeCheckPeriod)
  153. } else {
  154. atomic.StoreUint32(&p.frozen, 0)
  155. p.SendResume(bufValue)
  156. break
  157. }
  158. }
  159. }()
  160. }
  161. }
  162. // freezeServer processes Stop/Resume messages from the given server
  163. func (p *peer) freezeServer(frozen bool) {
  164. var f uint32
  165. if frozen {
  166. f = 1
  167. }
  168. if atomic.SwapUint32(&p.frozen, f) != f && frozen {
  169. p.sendQueue.clear()
  170. }
  171. }
  172. // isFrozen returns true if the client is frozen or the server has put our
  173. // client in frozen state
  174. func (p *peer) isFrozen() bool {
  175. return atomic.LoadUint32(&p.frozen) != 0
  176. }
  177. func (p *peer) canQueue() bool {
  178. return p.sendQueue.canQueue() && !p.isFrozen()
  179. }
  180. func (p *peer) queueSend(f func()) {
  181. p.sendQueue.queue(f)
  182. }
  183. // Info gathers and returns a collection of metadata known about a peer.
  184. func (p *peer) Info() *eth.PeerInfo {
  185. return &eth.PeerInfo{
  186. Version: p.version,
  187. Difficulty: p.Td(),
  188. Head: fmt.Sprintf("%x", p.Head()),
  189. }
  190. }
  191. // Head retrieves a copy of the current head (most recent) hash of the peer.
  192. func (p *peer) Head() (hash common.Hash) {
  193. p.lock.RLock()
  194. defer p.lock.RUnlock()
  195. copy(hash[:], p.headInfo.Hash[:])
  196. return hash
  197. }
  198. func (p *peer) HeadAndTd() (hash common.Hash, td *big.Int) {
  199. p.lock.RLock()
  200. defer p.lock.RUnlock()
  201. copy(hash[:], p.headInfo.Hash[:])
  202. return hash, p.headInfo.Td
  203. }
  204. func (p *peer) headBlockInfo() blockInfo {
  205. p.lock.RLock()
  206. defer p.lock.RUnlock()
  207. return blockInfo{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td}
  208. }
  209. // Td retrieves the current total difficulty of a peer.
  210. func (p *peer) Td() *big.Int {
  211. p.lock.RLock()
  212. defer p.lock.RUnlock()
  213. return new(big.Int).Set(p.headInfo.Td)
  214. }
  215. // waitBefore implements distPeer interface
  216. func (p *peer) waitBefore(maxCost uint64) (time.Duration, float64) {
  217. return p.fcServer.CanSend(maxCost)
  218. }
  219. // updateCapacity updates the request serving capacity assigned to a given client
  220. // and also sends an announcement about the updated flow control parameters
  221. func (p *peer) updateCapacity(cap uint64) {
  222. p.responseLock.Lock()
  223. defer p.responseLock.Unlock()
  224. p.fcParams = flowcontrol.ServerParams{MinRecharge: cap, BufLimit: cap * bufLimitRatio}
  225. p.fcClient.UpdateParams(p.fcParams)
  226. var kvList keyValueList
  227. kvList = kvList.add("flowControl/MRR", cap)
  228. kvList = kvList.add("flowControl/BL", cap*bufLimitRatio)
  229. p.queueSend(func() { p.SendAnnounce(announceData{Update: kvList}) })
  230. }
  231. func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) error {
  232. type req struct {
  233. ReqID uint64
  234. Data interface{}
  235. }
  236. return p2p.Send(w, msgcode, req{reqID, data})
  237. }
  238. // reply struct represents a reply with the actual data already RLP encoded and
  239. // only the bv (buffer value) missing. This allows the serving mechanism to
  240. // calculate the bv value which depends on the data size before sending the reply.
  241. type reply struct {
  242. w p2p.MsgWriter
  243. msgcode, reqID uint64
  244. data rlp.RawValue
  245. }
  246. // send sends the reply with the calculated buffer value
  247. func (r *reply) send(bv uint64) error {
  248. type resp struct {
  249. ReqID, BV uint64
  250. Data rlp.RawValue
  251. }
  252. return p2p.Send(r.w, r.msgcode, resp{r.reqID, bv, r.data})
  253. }
  254. // size returns the RLP encoded size of the message data
  255. func (r *reply) size() uint32 {
  256. return uint32(len(r.data))
  257. }
  258. func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
  259. p.lock.RLock()
  260. defer p.lock.RUnlock()
  261. costs := p.fcCosts[msgcode]
  262. if costs == nil {
  263. return 0
  264. }
  265. cost := costs.baseCost + costs.reqCost*uint64(amount)
  266. if cost > p.fcParams.BufLimit {
  267. cost = p.fcParams.BufLimit
  268. }
  269. return cost
  270. }
  271. func (p *peer) GetTxRelayCost(amount, size int) uint64 {
  272. p.lock.RLock()
  273. defer p.lock.RUnlock()
  274. costs := p.fcCosts[SendTxV2Msg]
  275. if costs == nil {
  276. return 0
  277. }
  278. cost := costs.baseCost + costs.reqCost*uint64(amount)
  279. sizeCost := costs.baseCost + costs.reqCost*uint64(size)/txSizeCostLimit
  280. if sizeCost > cost {
  281. cost = sizeCost
  282. }
  283. if cost > p.fcParams.BufLimit {
  284. cost = p.fcParams.BufLimit
  285. }
  286. return cost
  287. }
  288. // HasBlock checks if the peer has a given block
  289. func (p *peer) HasBlock(hash common.Hash, number uint64, hasState bool) bool {
  290. var head, since, recent uint64
  291. p.lock.RLock()
  292. if p.headInfo != nil {
  293. head = p.headInfo.Number
  294. }
  295. if hasState {
  296. since = p.stateSince
  297. recent = p.stateRecent
  298. } else {
  299. since = p.chainSince
  300. recent = p.chainRecent
  301. }
  302. hasBlock := p.hasBlock
  303. p.lock.RUnlock()
  304. return head >= number && number >= since && (recent == 0 || number+recent+4 > head) && hasBlock != nil && hasBlock(hash, number, hasState)
  305. }
  306. // SendAnnounce announces the availability of a number of blocks through
  307. // a hash notification.
  308. func (p *peer) SendAnnounce(request announceData) error {
  309. return p2p.Send(p.rw, AnnounceMsg, request)
  310. }
  311. // SendStop notifies the client about being in frozen state
  312. func (p *peer) SendStop() error {
  313. return p2p.Send(p.rw, StopMsg, struct{}{})
  314. }
  315. // SendResume notifies the client about getting out of frozen state
  316. func (p *peer) SendResume(bv uint64) error {
  317. return p2p.Send(p.rw, ResumeMsg, bv)
  318. }
  319. // ReplyBlockHeaders creates a reply with a batch of block headers
  320. func (p *peer) ReplyBlockHeaders(reqID uint64, headers []*types.Header) *reply {
  321. data, _ := rlp.EncodeToBytes(headers)
  322. return &reply{p.rw, BlockHeadersMsg, reqID, data}
  323. }
  324. // ReplyBlockBodiesRLP creates a reply with a batch of block contents from
  325. // an already RLP encoded format.
  326. func (p *peer) ReplyBlockBodiesRLP(reqID uint64, bodies []rlp.RawValue) *reply {
  327. data, _ := rlp.EncodeToBytes(bodies)
  328. return &reply{p.rw, BlockBodiesMsg, reqID, data}
  329. }
  330. // ReplyCode creates a reply with a batch of arbitrary internal data, corresponding to the
  331. // hashes requested.
  332. func (p *peer) ReplyCode(reqID uint64, codes [][]byte) *reply {
  333. data, _ := rlp.EncodeToBytes(codes)
  334. return &reply{p.rw, CodeMsg, reqID, data}
  335. }
  336. // ReplyReceiptsRLP creates a reply with a batch of transaction receipts, corresponding to the
  337. // ones requested from an already RLP encoded format.
  338. func (p *peer) ReplyReceiptsRLP(reqID uint64, receipts []rlp.RawValue) *reply {
  339. data, _ := rlp.EncodeToBytes(receipts)
  340. return &reply{p.rw, ReceiptsMsg, reqID, data}
  341. }
  342. // ReplyProofsV2 creates a reply with a batch of merkle proofs, corresponding to the ones requested.
  343. func (p *peer) ReplyProofsV2(reqID uint64, proofs light.NodeList) *reply {
  344. data, _ := rlp.EncodeToBytes(proofs)
  345. return &reply{p.rw, ProofsV2Msg, reqID, data}
  346. }
  347. // ReplyHelperTrieProofs creates a reply with a batch of HelperTrie proofs, corresponding to the ones requested.
  348. func (p *peer) ReplyHelperTrieProofs(reqID uint64, resp HelperTrieResps) *reply {
  349. data, _ := rlp.EncodeToBytes(resp)
  350. return &reply{p.rw, HelperTrieProofsMsg, reqID, data}
  351. }
  352. // ReplyTxStatus creates a reply with a batch of transaction status records, corresponding to the ones requested.
  353. func (p *peer) ReplyTxStatus(reqID uint64, stats []light.TxStatus) *reply {
  354. data, _ := rlp.EncodeToBytes(stats)
  355. return &reply{p.rw, TxStatusMsg, reqID, data}
  356. }
  357. // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
  358. // specified header query, based on the hash of an origin block.
  359. func (p *peer) RequestHeadersByHash(reqID, cost uint64, origin common.Hash, amount int, skip int, reverse bool) error {
  360. p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
  361. return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
  362. }
  363. // RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
  364. // specified header query, based on the number of an origin block.
  365. func (p *peer) RequestHeadersByNumber(reqID, cost, origin uint64, amount int, skip int, reverse bool) error {
  366. p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
  367. return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
  368. }
  369. // RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
  370. // specified.
  371. func (p *peer) RequestBodies(reqID, cost uint64, hashes []common.Hash) error {
  372. p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
  373. return sendRequest(p.rw, GetBlockBodiesMsg, reqID, cost, hashes)
  374. }
  375. // RequestCode fetches a batch of arbitrary data from a node's known state
  376. // data, corresponding to the specified hashes.
  377. func (p *peer) RequestCode(reqID, cost uint64, reqs []CodeReq) error {
  378. p.Log().Debug("Fetching batch of codes", "count", len(reqs))
  379. return sendRequest(p.rw, GetCodeMsg, reqID, cost, reqs)
  380. }
  381. // RequestReceipts fetches a batch of transaction receipts from a remote node.
  382. func (p *peer) RequestReceipts(reqID, cost uint64, hashes []common.Hash) error {
  383. p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
  384. return sendRequest(p.rw, GetReceiptsMsg, reqID, cost, hashes)
  385. }
  386. // RequestProofs fetches a batch of merkle proofs from a remote node.
  387. func (p *peer) RequestProofs(reqID, cost uint64, reqs []ProofReq) error {
  388. p.Log().Debug("Fetching batch of proofs", "count", len(reqs))
  389. return sendRequest(p.rw, GetProofsV2Msg, reqID, cost, reqs)
  390. }
  391. // RequestHelperTrieProofs fetches a batch of HelperTrie merkle proofs from a remote node.
  392. func (p *peer) RequestHelperTrieProofs(reqID, cost uint64, reqs []HelperTrieReq) error {
  393. p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs))
  394. return sendRequest(p.rw, GetHelperTrieProofsMsg, reqID, cost, reqs)
  395. }
  396. // RequestTxStatus fetches a batch of transaction status records from a remote node.
  397. func (p *peer) RequestTxStatus(reqID, cost uint64, txHashes []common.Hash) error {
  398. p.Log().Debug("Requesting transaction status", "count", len(txHashes))
  399. return sendRequest(p.rw, GetTxStatusMsg, reqID, cost, txHashes)
  400. }
  401. // SendTxStatus creates a reply with a batch of transactions to be added to the remote transaction pool.
  402. func (p *peer) SendTxs(reqID, cost uint64, txs rlp.RawValue) error {
  403. p.Log().Debug("Sending batch of transactions", "size", len(txs))
  404. return sendRequest(p.rw, SendTxV2Msg, reqID, cost, txs)
  405. }
  406. type keyValueEntry struct {
  407. Key string
  408. Value rlp.RawValue
  409. }
  410. type keyValueList []keyValueEntry
  411. type keyValueMap map[string]rlp.RawValue
  412. func (l keyValueList) add(key string, val interface{}) keyValueList {
  413. var entry keyValueEntry
  414. entry.Key = key
  415. if val == nil {
  416. val = uint64(0)
  417. }
  418. enc, err := rlp.EncodeToBytes(val)
  419. if err == nil {
  420. entry.Value = enc
  421. }
  422. return append(l, entry)
  423. }
  424. func (l keyValueList) decode() (keyValueMap, uint64) {
  425. m := make(keyValueMap)
  426. var size uint64
  427. for _, entry := range l {
  428. m[entry.Key] = entry.Value
  429. size += uint64(len(entry.Key)) + uint64(len(entry.Value)) + 8
  430. }
  431. return m, size
  432. }
  433. func (m keyValueMap) get(key string, val interface{}) error {
  434. enc, ok := m[key]
  435. if !ok {
  436. return errResp(ErrMissingKey, "%s", key)
  437. }
  438. if val == nil {
  439. return nil
  440. }
  441. return rlp.DecodeBytes(enc, val)
  442. }
  443. func (p *peer) sendReceiveHandshake(sendList keyValueList) (keyValueList, error) {
  444. // Send out own handshake in a new thread
  445. errc := make(chan error, 1)
  446. go func() {
  447. errc <- p2p.Send(p.rw, StatusMsg, sendList)
  448. }()
  449. // In the mean time retrieve the remote status message
  450. msg, err := p.rw.ReadMsg()
  451. if err != nil {
  452. return nil, err
  453. }
  454. if msg.Code != StatusMsg {
  455. return nil, errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
  456. }
  457. if msg.Size > ProtocolMaxMsgSize {
  458. return nil, errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  459. }
  460. // Decode the handshake
  461. var recvList keyValueList
  462. if err := msg.Decode(&recvList); err != nil {
  463. return nil, errResp(ErrDecode, "msg %v: %v", msg, err)
  464. }
  465. if err := <-errc; err != nil {
  466. return nil, err
  467. }
  468. return recvList, nil
  469. }
  470. // Handshake executes the les protocol handshake, negotiating version number,
  471. // network IDs, difficulties, head and genesis blocks.
  472. func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, server *LesServer) error {
  473. p.lock.Lock()
  474. defer p.lock.Unlock()
  475. var send keyValueList
  476. send = send.add("protocolVersion", uint64(p.version))
  477. send = send.add("networkId", p.network)
  478. send = send.add("headTd", td)
  479. send = send.add("headHash", head)
  480. send = send.add("headNum", headNum)
  481. send = send.add("genesisHash", genesis)
  482. if server != nil {
  483. if !server.onlyAnnounce {
  484. send = send.add("serveHeaders", nil)
  485. send = send.add("serveChainSince", uint64(0))
  486. send = send.add("serveStateSince", uint64(0))
  487. // If local ethereum node is running in archive mode, advertise ourselves we have
  488. // all version state data. Otherwise only recent state is available.
  489. stateRecent := uint64(core.TriesInMemory - 4)
  490. if server.archiveMode {
  491. stateRecent = 0
  492. }
  493. send = send.add("serveRecentState", stateRecent)
  494. send = send.add("txRelay", nil)
  495. }
  496. send = send.add("flowControl/BL", server.defParams.BufLimit)
  497. send = send.add("flowControl/MRR", server.defParams.MinRecharge)
  498. var costList RequestCostList
  499. if server.costTracker != nil {
  500. costList = server.costTracker.makeCostList(server.costTracker.globalFactor())
  501. } else {
  502. costList = testCostList(server.testCost)
  503. }
  504. send = send.add("flowControl/MRC", costList)
  505. p.fcCosts = costList.decode(ProtocolLengths[uint(p.version)])
  506. p.fcParams = server.defParams
  507. if server.protocolManager != nil && server.protocolManager.reg != nil && server.protocolManager.reg.isRunning() {
  508. cp, height := server.protocolManager.reg.stableCheckpoint()
  509. if cp != nil {
  510. send = send.add("checkpoint/value", cp)
  511. send = send.add("checkpoint/registerHeight", height)
  512. }
  513. }
  514. } else {
  515. //on client node
  516. p.announceType = announceTypeSimple
  517. if p.trusted {
  518. p.announceType = announceTypeSigned
  519. }
  520. send = send.add("announceType", p.announceType)
  521. }
  522. recvList, err := p.sendReceiveHandshake(send)
  523. if err != nil {
  524. return err
  525. }
  526. recv, size := recvList.decode()
  527. if p.rejectUpdate(size) {
  528. return errResp(ErrRequestRejected, "")
  529. }
  530. var rGenesis, rHash common.Hash
  531. var rVersion, rNetwork, rNum uint64
  532. var rTd *big.Int
  533. if err := recv.get("protocolVersion", &rVersion); err != nil {
  534. return err
  535. }
  536. if err := recv.get("networkId", &rNetwork); err != nil {
  537. return err
  538. }
  539. if err := recv.get("headTd", &rTd); err != nil {
  540. return err
  541. }
  542. if err := recv.get("headHash", &rHash); err != nil {
  543. return err
  544. }
  545. if err := recv.get("headNum", &rNum); err != nil {
  546. return err
  547. }
  548. if err := recv.get("genesisHash", &rGenesis); err != nil {
  549. return err
  550. }
  551. if rGenesis != genesis {
  552. return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", rGenesis[:8], genesis[:8])
  553. }
  554. if rNetwork != p.network {
  555. return errResp(ErrNetworkIdMismatch, "%d (!= %d)", rNetwork, p.network)
  556. }
  557. if int(rVersion) != p.version {
  558. return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", rVersion, p.version)
  559. }
  560. if server != nil {
  561. // until we have a proper peer connectivity API, allow LES connection to other servers
  562. /*if recv.get("serveStateSince", nil) == nil {
  563. return errResp(ErrUselessPeer, "wanted client, got server")
  564. }*/
  565. if recv.get("announceType", &p.announceType) != nil {
  566. //set default announceType on server side
  567. p.announceType = announceTypeSimple
  568. }
  569. p.fcClient = flowcontrol.NewClientNode(server.fcManager, server.defParams)
  570. } else {
  571. //mark OnlyAnnounce server if "serveHeaders", "serveChainSince", "serveStateSince" or "txRelay" fields don't exist
  572. if recv.get("serveChainSince", &p.chainSince) != nil {
  573. p.onlyAnnounce = true
  574. }
  575. if recv.get("serveRecentChain", &p.chainRecent) != nil {
  576. p.chainRecent = 0
  577. }
  578. if recv.get("serveStateSince", &p.stateSince) != nil {
  579. p.onlyAnnounce = true
  580. }
  581. if recv.get("serveRecentState", &p.stateRecent) != nil {
  582. p.stateRecent = 0
  583. }
  584. if recv.get("txRelay", nil) != nil {
  585. p.onlyAnnounce = true
  586. }
  587. if p.onlyAnnounce && !p.trusted {
  588. return errResp(ErrUselessPeer, "peer cannot serve requests")
  589. }
  590. var sParams flowcontrol.ServerParams
  591. if err := recv.get("flowControl/BL", &sParams.BufLimit); err != nil {
  592. return err
  593. }
  594. if err := recv.get("flowControl/MRR", &sParams.MinRecharge); err != nil {
  595. return err
  596. }
  597. var MRC RequestCostList
  598. if err := recv.get("flowControl/MRC", &MRC); err != nil {
  599. return err
  600. }
  601. p.fcParams = sParams
  602. p.fcServer = flowcontrol.NewServerNode(sParams, &mclock.System{})
  603. p.fcCosts = MRC.decode(ProtocolLengths[uint(p.version)])
  604. recv.get("checkpoint/value", &p.checkpoint)
  605. recv.get("checkpoint/registerHeight", &p.checkpointNumber)
  606. if !p.onlyAnnounce {
  607. for msgCode := range reqAvgTimeCost {
  608. if p.fcCosts[msgCode] == nil {
  609. return errResp(ErrUselessPeer, "peer does not support message %d", msgCode)
  610. }
  611. }
  612. }
  613. }
  614. p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum}
  615. return nil
  616. }
  617. // updateFlowControl updates the flow control parameters belonging to the server
  618. // node if the announced key/value set contains relevant fields
  619. func (p *peer) updateFlowControl(update keyValueMap) {
  620. if p.fcServer == nil {
  621. return
  622. }
  623. params := p.fcParams
  624. updateParams := false
  625. if update.get("flowControl/BL", &params.BufLimit) == nil {
  626. updateParams = true
  627. }
  628. if update.get("flowControl/MRR", &params.MinRecharge) == nil {
  629. updateParams = true
  630. }
  631. if updateParams {
  632. p.fcParams = params
  633. p.fcServer.UpdateParams(params)
  634. }
  635. var MRC RequestCostList
  636. if update.get("flowControl/MRC", &MRC) == nil {
  637. costUpdate := MRC.decode(ProtocolLengths[uint(p.version)])
  638. for code, cost := range costUpdate {
  639. p.fcCosts[code] = cost
  640. }
  641. }
  642. }
  643. // String implements fmt.Stringer.
  644. func (p *peer) String() string {
  645. return fmt.Sprintf("Peer %s [%s]", p.id,
  646. fmt.Sprintf("les/%d", p.version),
  647. )
  648. }
  649. // peerSetNotify is a callback interface to notify services about added or
  650. // removed peers
  651. type peerSetNotify interface {
  652. registerPeer(*peer)
  653. unregisterPeer(*peer)
  654. }
  655. // peerSet represents the collection of active peers currently participating in
  656. // the Light Ethereum sub-protocol.
  657. type peerSet struct {
  658. peers map[string]*peer
  659. lock sync.RWMutex
  660. notifyList []peerSetNotify
  661. closed bool
  662. }
  663. // newPeerSet creates a new peer set to track the active participants.
  664. func newPeerSet() *peerSet {
  665. return &peerSet{
  666. peers: make(map[string]*peer),
  667. }
  668. }
  669. // notify adds a service to be notified about added or removed peers
  670. func (ps *peerSet) notify(n peerSetNotify) {
  671. ps.lock.Lock()
  672. ps.notifyList = append(ps.notifyList, n)
  673. peers := make([]*peer, 0, len(ps.peers))
  674. for _, p := range ps.peers {
  675. peers = append(peers, p)
  676. }
  677. ps.lock.Unlock()
  678. for _, p := range peers {
  679. n.registerPeer(p)
  680. }
  681. }
  682. // Register injects a new peer into the working set, or returns an error if the
  683. // peer is already known.
  684. func (ps *peerSet) Register(p *peer) error {
  685. ps.lock.Lock()
  686. if ps.closed {
  687. ps.lock.Unlock()
  688. return errClosed
  689. }
  690. if _, ok := ps.peers[p.id]; ok {
  691. ps.lock.Unlock()
  692. return errAlreadyRegistered
  693. }
  694. ps.peers[p.id] = p
  695. p.sendQueue = newExecQueue(100)
  696. peers := make([]peerSetNotify, len(ps.notifyList))
  697. copy(peers, ps.notifyList)
  698. ps.lock.Unlock()
  699. for _, n := range peers {
  700. n.registerPeer(p)
  701. }
  702. return nil
  703. }
  704. // Unregister removes a remote peer from the active set, disabling any further
  705. // actions to/from that particular entity. It also initiates disconnection at the networking layer.
  706. func (ps *peerSet) Unregister(id string) error {
  707. ps.lock.Lock()
  708. if p, ok := ps.peers[id]; !ok {
  709. ps.lock.Unlock()
  710. return errNotRegistered
  711. } else {
  712. delete(ps.peers, id)
  713. peers := make([]peerSetNotify, len(ps.notifyList))
  714. copy(peers, ps.notifyList)
  715. ps.lock.Unlock()
  716. for _, n := range peers {
  717. n.unregisterPeer(p)
  718. }
  719. p.sendQueue.quit()
  720. p.Peer.Disconnect(p2p.DiscUselessPeer)
  721. return nil
  722. }
  723. }
  724. // AllPeerIDs returns a list of all registered peer IDs
  725. func (ps *peerSet) AllPeerIDs() []string {
  726. ps.lock.RLock()
  727. defer ps.lock.RUnlock()
  728. res := make([]string, len(ps.peers))
  729. idx := 0
  730. for id := range ps.peers {
  731. res[idx] = id
  732. idx++
  733. }
  734. return res
  735. }
  736. // Peer retrieves the registered peer with the given id.
  737. func (ps *peerSet) Peer(id string) *peer {
  738. ps.lock.RLock()
  739. defer ps.lock.RUnlock()
  740. return ps.peers[id]
  741. }
  742. // Len returns if the current number of peers in the set.
  743. func (ps *peerSet) Len() int {
  744. ps.lock.RLock()
  745. defer ps.lock.RUnlock()
  746. return len(ps.peers)
  747. }
  748. // BestPeer retrieves the known peer with the currently highest total difficulty.
  749. func (ps *peerSet) BestPeer() *peer {
  750. ps.lock.RLock()
  751. defer ps.lock.RUnlock()
  752. var (
  753. bestPeer *peer
  754. bestTd *big.Int
  755. )
  756. for _, p := range ps.peers {
  757. if td := p.Td(); bestPeer == nil || td.Cmp(bestTd) > 0 {
  758. bestPeer, bestTd = p, td
  759. }
  760. }
  761. return bestPeer
  762. }
  763. // AllPeers returns all peers in a list
  764. func (ps *peerSet) AllPeers() []*peer {
  765. ps.lock.RLock()
  766. defer ps.lock.RUnlock()
  767. list := make([]*peer, len(ps.peers))
  768. i := 0
  769. for _, peer := range ps.peers {
  770. list[i] = peer
  771. i++
  772. }
  773. return list
  774. }
  775. // Close disconnects all peers.
  776. // No new peers can be registered after Close has returned.
  777. func (ps *peerSet) Close() {
  778. ps.lock.Lock()
  779. defer ps.lock.Unlock()
  780. for _, p := range ps.peers {
  781. p.Disconnect(p2p.DiscQuitting)
  782. }
  783. ps.closed = true
  784. }