peer.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "errors"
  20. "fmt"
  21. "math/big"
  22. "sync"
  23. "github.com/ethereum/go-ethereum/common"
  24. "github.com/ethereum/go-ethereum/core/types"
  25. "github.com/ethereum/go-ethereum/eth"
  26. "github.com/ethereum/go-ethereum/les/flowcontrol"
  27. "github.com/ethereum/go-ethereum/logger"
  28. "github.com/ethereum/go-ethereum/logger/glog"
  29. "github.com/ethereum/go-ethereum/p2p"
  30. "github.com/ethereum/go-ethereum/rlp"
  31. )
  32. var (
  33. errClosed = errors.New("peer set is closed")
  34. errAlreadyRegistered = errors.New("peer is already registered")
  35. errNotRegistered = errors.New("peer is not registered")
  36. )
  37. const maxHeadInfoLen = 20
  38. type peer struct {
  39. *p2p.Peer
  40. rw p2p.MsgReadWriter
  41. version int // Protocol version negotiated
  42. network int // Network ID being on
  43. id string
  44. headInfo *announceData
  45. lock sync.RWMutex
  46. announceChn chan announceData
  47. poolEntry *poolEntry
  48. hasBlock func(common.Hash, uint64) bool
  49. fcClient *flowcontrol.ClientNode // nil if the peer is server only
  50. fcServer *flowcontrol.ServerNode // nil if the peer is client only
  51. fcServerParams *flowcontrol.ServerParams
  52. fcCosts requestCostTable
  53. }
  54. func newPeer(version, network int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
  55. id := p.ID()
  56. return &peer{
  57. Peer: p,
  58. rw: rw,
  59. version: version,
  60. network: network,
  61. id: fmt.Sprintf("%x", id[:8]),
  62. announceChn: make(chan announceData, 20),
  63. }
  64. }
  65. // Info gathers and returns a collection of metadata known about a peer.
  66. func (p *peer) Info() *eth.PeerInfo {
  67. return &eth.PeerInfo{
  68. Version: p.version,
  69. Difficulty: p.Td(),
  70. Head: fmt.Sprintf("%x", p.Head()),
  71. }
  72. }
  73. // Head retrieves a copy of the current head (most recent) hash of the peer.
  74. func (p *peer) Head() (hash common.Hash) {
  75. p.lock.RLock()
  76. defer p.lock.RUnlock()
  77. copy(hash[:], p.headInfo.Hash[:])
  78. return hash
  79. }
  80. func (p *peer) HeadAndTd() (hash common.Hash, td *big.Int) {
  81. p.lock.RLock()
  82. defer p.lock.RUnlock()
  83. copy(hash[:], p.headInfo.Hash[:])
  84. return hash, p.headInfo.Td
  85. }
  86. func (p *peer) headBlockInfo() blockInfo {
  87. p.lock.RLock()
  88. defer p.lock.RUnlock()
  89. return blockInfo{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td}
  90. }
  91. // Td retrieves the current total difficulty of a peer.
  92. func (p *peer) Td() *big.Int {
  93. p.lock.RLock()
  94. defer p.lock.RUnlock()
  95. return new(big.Int).Set(p.headInfo.Td)
  96. }
  97. func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) error {
  98. type req struct {
  99. ReqID uint64
  100. Data interface{}
  101. }
  102. return p2p.Send(w, msgcode, req{reqID, data})
  103. }
  104. func sendResponse(w p2p.MsgWriter, msgcode, reqID, bv uint64, data interface{}) error {
  105. type resp struct {
  106. ReqID, BV uint64
  107. Data interface{}
  108. }
  109. return p2p.Send(w, msgcode, resp{reqID, bv, data})
  110. }
  111. func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
  112. p.lock.RLock()
  113. defer p.lock.RUnlock()
  114. cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount)
  115. if cost > p.fcServerParams.BufLimit {
  116. cost = p.fcServerParams.BufLimit
  117. }
  118. return cost
  119. }
  120. // HasBlock checks if the peer has a given block
  121. func (p *peer) HasBlock(hash common.Hash, number uint64) bool {
  122. p.lock.RLock()
  123. hashBlock := p.hasBlock
  124. p.lock.RUnlock()
  125. return hashBlock != nil && hashBlock(hash, number)
  126. }
  127. // SendAnnounce announces the availability of a number of blocks through
  128. // a hash notification.
  129. func (p *peer) SendAnnounce(request announceData) error {
  130. return p2p.Send(p.rw, AnnounceMsg, request)
  131. }
  132. // SendBlockHeaders sends a batch of block headers to the remote peer.
  133. func (p *peer) SendBlockHeaders(reqID, bv uint64, headers []*types.Header) error {
  134. return sendResponse(p.rw, BlockHeadersMsg, reqID, bv, headers)
  135. }
  136. // SendBlockBodiesRLP sends a batch of block contents to the remote peer from
  137. // an already RLP encoded format.
  138. func (p *peer) SendBlockBodiesRLP(reqID, bv uint64, bodies []rlp.RawValue) error {
  139. return sendResponse(p.rw, BlockBodiesMsg, reqID, bv, bodies)
  140. }
  141. // SendCodeRLP sends a batch of arbitrary internal data, corresponding to the
  142. // hashes requested.
  143. func (p *peer) SendCode(reqID, bv uint64, data [][]byte) error {
  144. return sendResponse(p.rw, CodeMsg, reqID, bv, data)
  145. }
  146. // SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
  147. // ones requested from an already RLP encoded format.
  148. func (p *peer) SendReceiptsRLP(reqID, bv uint64, receipts []rlp.RawValue) error {
  149. return sendResponse(p.rw, ReceiptsMsg, reqID, bv, receipts)
  150. }
  151. // SendProofs sends a batch of merkle proofs, corresponding to the ones requested.
  152. func (p *peer) SendProofs(reqID, bv uint64, proofs proofsData) error {
  153. return sendResponse(p.rw, ProofsMsg, reqID, bv, proofs)
  154. }
  155. // SendHeaderProofs sends a batch of header proofs, corresponding to the ones requested.
  156. func (p *peer) SendHeaderProofs(reqID, bv uint64, proofs []ChtResp) error {
  157. return sendResponse(p.rw, HeaderProofsMsg, reqID, bv, proofs)
  158. }
  159. // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
  160. // specified header query, based on the hash of an origin block.
  161. func (p *peer) RequestHeadersByHash(reqID, cost uint64, origin common.Hash, amount int, skip int, reverse bool) error {
  162. glog.V(logger.Debug).Infof("%v fetching %d headers from %x, skipping %d (reverse = %v)", p, amount, origin[:4], skip, reverse)
  163. return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
  164. }
  165. // RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
  166. // specified header query, based on the number of an origin block.
  167. func (p *peer) RequestHeadersByNumber(reqID, cost, origin uint64, amount int, skip int, reverse bool) error {
  168. glog.V(logger.Debug).Infof("%v fetching %d headers from #%d, skipping %d (reverse = %v)", p, amount, origin, skip, reverse)
  169. return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
  170. }
  171. // RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
  172. // specified.
  173. func (p *peer) RequestBodies(reqID, cost uint64, hashes []common.Hash) error {
  174. glog.V(logger.Debug).Infof("%v fetching %d block bodies", p, len(hashes))
  175. return sendRequest(p.rw, GetBlockBodiesMsg, reqID, cost, hashes)
  176. }
  177. // RequestCode fetches a batch of arbitrary data from a node's known state
  178. // data, corresponding to the specified hashes.
  179. func (p *peer) RequestCode(reqID, cost uint64, reqs []*CodeReq) error {
  180. glog.V(logger.Debug).Infof("%v fetching %v state data", p, len(reqs))
  181. return sendRequest(p.rw, GetCodeMsg, reqID, cost, reqs)
  182. }
  183. // RequestReceipts fetches a batch of transaction receipts from a remote node.
  184. func (p *peer) RequestReceipts(reqID, cost uint64, hashes []common.Hash) error {
  185. glog.V(logger.Debug).Infof("%v fetching %v receipts", p, len(hashes))
  186. return sendRequest(p.rw, GetReceiptsMsg, reqID, cost, hashes)
  187. }
  188. // RequestProofs fetches a batch of merkle proofs from a remote node.
  189. func (p *peer) RequestProofs(reqID, cost uint64, reqs []*ProofReq) error {
  190. glog.V(logger.Debug).Infof("%v fetching %v proofs", p, len(reqs))
  191. return sendRequest(p.rw, GetProofsMsg, reqID, cost, reqs)
  192. }
  193. // RequestHeaderProofs fetches a batch of header merkle proofs from a remote node.
  194. func (p *peer) RequestHeaderProofs(reqID, cost uint64, reqs []*ChtReq) error {
  195. glog.V(logger.Debug).Infof("%v fetching %v header proofs", p, len(reqs))
  196. return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqs)
  197. }
  198. func (p *peer) SendTxs(cost uint64, txs types.Transactions) error {
  199. glog.V(logger.Debug).Infof("%v relaying %v txs", p, len(txs))
  200. reqID := getNextReqID()
  201. p.fcServer.MustAssignRequest(reqID)
  202. p.fcServer.SendRequest(reqID, cost)
  203. return p2p.Send(p.rw, SendTxMsg, txs)
  204. }
  205. type keyValueEntry struct {
  206. Key string
  207. Value rlp.RawValue
  208. }
  209. type keyValueList []keyValueEntry
  210. type keyValueMap map[string]rlp.RawValue
  211. func (l keyValueList) add(key string, val interface{}) keyValueList {
  212. var entry keyValueEntry
  213. entry.Key = key
  214. if val == nil {
  215. val = uint64(0)
  216. }
  217. enc, err := rlp.EncodeToBytes(val)
  218. if err == nil {
  219. entry.Value = enc
  220. }
  221. return append(l, entry)
  222. }
  223. func (l keyValueList) decode() keyValueMap {
  224. m := make(keyValueMap)
  225. for _, entry := range l {
  226. m[entry.Key] = entry.Value
  227. }
  228. return m
  229. }
  230. func (m keyValueMap) get(key string, val interface{}) error {
  231. enc, ok := m[key]
  232. if !ok {
  233. return errResp(ErrHandshakeMissingKey, "%s", key)
  234. }
  235. if val == nil {
  236. return nil
  237. }
  238. return rlp.DecodeBytes(enc, val)
  239. }
  240. func (p *peer) sendReceiveHandshake(sendList keyValueList) (keyValueList, error) {
  241. // Send out own handshake in a new thread
  242. errc := make(chan error, 1)
  243. go func() {
  244. errc <- p2p.Send(p.rw, StatusMsg, sendList)
  245. }()
  246. // In the mean time retrieve the remote status message
  247. msg, err := p.rw.ReadMsg()
  248. if err != nil {
  249. return nil, err
  250. }
  251. if msg.Code != StatusMsg {
  252. return nil, errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
  253. }
  254. if msg.Size > ProtocolMaxMsgSize {
  255. return nil, errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  256. }
  257. // Decode the handshake
  258. var recvList keyValueList
  259. if err := msg.Decode(&recvList); err != nil {
  260. return nil, errResp(ErrDecode, "msg %v: %v", msg, err)
  261. }
  262. if err := <-errc; err != nil {
  263. return nil, err
  264. }
  265. return recvList, nil
  266. }
  267. // Handshake executes the les protocol handshake, negotiating version number,
  268. // network IDs, difficulties, head and genesis blocks.
  269. func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, server *LesServer) error {
  270. p.lock.Lock()
  271. defer p.lock.Unlock()
  272. var send keyValueList
  273. send = send.add("protocolVersion", uint64(p.version))
  274. send = send.add("networkId", uint64(p.network))
  275. send = send.add("headTd", td)
  276. send = send.add("headHash", head)
  277. send = send.add("headNum", headNum)
  278. send = send.add("genesisHash", genesis)
  279. if server != nil {
  280. send = send.add("serveHeaders", nil)
  281. send = send.add("serveChainSince", uint64(0))
  282. send = send.add("serveStateSince", uint64(0))
  283. send = send.add("txRelay", nil)
  284. send = send.add("flowControl/BL", server.defParams.BufLimit)
  285. send = send.add("flowControl/MRR", server.defParams.MinRecharge)
  286. list := server.fcCostStats.getCurrentList()
  287. send = send.add("flowControl/MRC", list)
  288. p.fcCosts = list.decode()
  289. }
  290. recvList, err := p.sendReceiveHandshake(send)
  291. if err != nil {
  292. return err
  293. }
  294. recv := recvList.decode()
  295. var rGenesis, rHash common.Hash
  296. var rVersion, rNetwork, rNum uint64
  297. var rTd *big.Int
  298. if err := recv.get("protocolVersion", &rVersion); err != nil {
  299. return err
  300. }
  301. if err := recv.get("networkId", &rNetwork); err != nil {
  302. return err
  303. }
  304. if err := recv.get("headTd", &rTd); err != nil {
  305. return err
  306. }
  307. if err := recv.get("headHash", &rHash); err != nil {
  308. return err
  309. }
  310. if err := recv.get("headNum", &rNum); err != nil {
  311. return err
  312. }
  313. if err := recv.get("genesisHash", &rGenesis); err != nil {
  314. return err
  315. }
  316. if rGenesis != genesis {
  317. return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", rGenesis, genesis)
  318. }
  319. if int(rNetwork) != p.network {
  320. return errResp(ErrNetworkIdMismatch, "%d (!= %d)", rNetwork, p.network)
  321. }
  322. if int(rVersion) != p.version {
  323. return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", rVersion, p.version)
  324. }
  325. if server != nil {
  326. if recv.get("serveStateSince", nil) == nil {
  327. return errResp(ErrUselessPeer, "wanted client, got server")
  328. }
  329. p.fcClient = flowcontrol.NewClientNode(server.fcManager, server.defParams)
  330. } else {
  331. if recv.get("serveChainSince", nil) != nil {
  332. return errResp(ErrUselessPeer, "peer cannot serve chain")
  333. }
  334. if recv.get("serveStateSince", nil) != nil {
  335. return errResp(ErrUselessPeer, "peer cannot serve state")
  336. }
  337. if recv.get("txRelay", nil) != nil {
  338. return errResp(ErrUselessPeer, "peer cannot relay transactions")
  339. }
  340. params := &flowcontrol.ServerParams{}
  341. if err := recv.get("flowControl/BL", &params.BufLimit); err != nil {
  342. return err
  343. }
  344. if err := recv.get("flowControl/MRR", &params.MinRecharge); err != nil {
  345. return err
  346. }
  347. var MRC RequestCostList
  348. if err := recv.get("flowControl/MRC", &MRC); err != nil {
  349. return err
  350. }
  351. p.fcServerParams = params
  352. p.fcServer = flowcontrol.NewServerNode(params)
  353. p.fcCosts = MRC.decode()
  354. }
  355. p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum}
  356. return nil
  357. }
  358. // String implements fmt.Stringer.
  359. func (p *peer) String() string {
  360. return fmt.Sprintf("Peer %s [%s]", p.id,
  361. fmt.Sprintf("les/%d", p.version),
  362. )
  363. }
  364. // peerSet represents the collection of active peers currently participating in
  365. // the Light Ethereum sub-protocol.
  366. type peerSet struct {
  367. peers map[string]*peer
  368. lock sync.RWMutex
  369. closed bool
  370. }
  371. // newPeerSet creates a new peer set to track the active participants.
  372. func newPeerSet() *peerSet {
  373. return &peerSet{
  374. peers: make(map[string]*peer),
  375. }
  376. }
  377. // Register injects a new peer into the working set, or returns an error if the
  378. // peer is already known.
  379. func (ps *peerSet) Register(p *peer) error {
  380. ps.lock.Lock()
  381. defer ps.lock.Unlock()
  382. if ps.closed {
  383. return errClosed
  384. }
  385. if _, ok := ps.peers[p.id]; ok {
  386. return errAlreadyRegistered
  387. }
  388. ps.peers[p.id] = p
  389. return nil
  390. }
  391. // Unregister removes a remote peer from the active set, disabling any further
  392. // actions to/from that particular entity.
  393. func (ps *peerSet) Unregister(id string) error {
  394. ps.lock.Lock()
  395. defer ps.lock.Unlock()
  396. if _, ok := ps.peers[id]; !ok {
  397. return errNotRegistered
  398. }
  399. delete(ps.peers, id)
  400. return nil
  401. }
  402. // AllPeerIDs returns a list of all registered peer IDs
  403. func (ps *peerSet) AllPeerIDs() []string {
  404. ps.lock.RLock()
  405. defer ps.lock.RUnlock()
  406. res := make([]string, len(ps.peers))
  407. idx := 0
  408. for id := range ps.peers {
  409. res[idx] = id
  410. idx++
  411. }
  412. return res
  413. }
  414. // Peer retrieves the registered peer with the given id.
  415. func (ps *peerSet) Peer(id string) *peer {
  416. ps.lock.RLock()
  417. defer ps.lock.RUnlock()
  418. return ps.peers[id]
  419. }
  420. // Len returns if the current number of peers in the set.
  421. func (ps *peerSet) Len() int {
  422. ps.lock.RLock()
  423. defer ps.lock.RUnlock()
  424. return len(ps.peers)
  425. }
  426. // BestPeer retrieves the known peer with the currently highest total difficulty.
  427. func (ps *peerSet) BestPeer() *peer {
  428. ps.lock.RLock()
  429. defer ps.lock.RUnlock()
  430. var (
  431. bestPeer *peer
  432. bestTd *big.Int
  433. )
  434. for _, p := range ps.peers {
  435. if td := p.Td(); bestPeer == nil || td.Cmp(bestTd) > 0 {
  436. bestPeer, bestTd = p, td
  437. }
  438. }
  439. return bestPeer
  440. }
  441. // AllPeers returns all peers in a list
  442. func (ps *peerSet) AllPeers() []*peer {
  443. ps.lock.RLock()
  444. defer ps.lock.RUnlock()
  445. list := make([]*peer, len(ps.peers))
  446. i := 0
  447. for _, peer := range ps.peers {
  448. list[i] = peer
  449. i++
  450. }
  451. return list
  452. }
  453. // Close disconnects all peers.
  454. // No new peers can be registered after Close has returned.
  455. func (ps *peerSet) Close() {
  456. ps.lock.Lock()
  457. defer ps.lock.Unlock()
  458. for _, p := range ps.peers {
  459. p.Disconnect(p2p.DiscQuitting)
  460. }
  461. ps.closed = true
  462. }