peer.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package eth
  17. import (
  18. "errors"
  19. "fmt"
  20. "math/big"
  21. "sync"
  22. "time"
  23. mapset "github.com/deckarep/golang-set"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/core/forkid"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/p2p"
  28. "github.com/ethereum/go-ethereum/rlp"
  29. )
  30. var (
  31. errClosed = errors.New("peer set is closed")
  32. errAlreadyRegistered = errors.New("peer is already registered")
  33. errNotRegistered = errors.New("peer is not registered")
  34. )
  35. const (
  36. maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
  37. maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
  38. // maxQueuedTxs is the maximum number of transactions to queue up before dropping
  39. // older broadcasts.
  40. maxQueuedTxs = 4096
  41. // maxQueuedTxAnns is the maximum number of transaction announcements to queue up
  42. // before dropping older announcements.
  43. maxQueuedTxAnns = 4096
  44. // maxQueuedBlocks is the maximum number of block propagations to queue up before
  45. // dropping broadcasts. There's not much point in queueing stale blocks, so a few
  46. // that might cover uncles should be enough.
  47. maxQueuedBlocks = 4
  48. // maxQueuedBlockAnns is the maximum number of block announcements to queue up before
  49. // dropping broadcasts. Similarly to block propagations, there's no point to queue
  50. // above some healthy uncle limit, so use that.
  51. maxQueuedBlockAnns = 4
  52. handshakeTimeout = 5 * time.Second
  53. )
  54. // max is a helper function which returns the larger of the two given integers.
  55. func max(a, b int) int {
  56. if a > b {
  57. return a
  58. }
  59. return b
  60. }
  61. // PeerInfo represents a short summary of the Ethereum sub-protocol metadata known
  62. // about a connected peer.
  63. type PeerInfo struct {
  64. Version int `json:"version"` // Ethereum protocol version negotiated
  65. Difficulty *big.Int `json:"difficulty"` // Total difficulty of the peer's blockchain
  66. Head string `json:"head"` // SHA3 hash of the peer's best owned block
  67. }
  68. // propEvent is a block propagation, waiting for its turn in the broadcast queue.
  69. type propEvent struct {
  70. block *types.Block
  71. td *big.Int
  72. }
  73. type peer struct {
  74. id string
  75. *p2p.Peer
  76. rw p2p.MsgReadWriter
  77. version int // Protocol version negotiated
  78. syncDrop *time.Timer // Timed connection dropper if sync progress isn't validated in time
  79. head common.Hash
  80. td *big.Int
  81. lock sync.RWMutex
  82. knownBlocks mapset.Set // Set of block hashes known to be known by this peer
  83. queuedBlocks chan *propEvent // Queue of blocks to broadcast to the peer
  84. queuedBlockAnns chan *types.Block // Queue of blocks to announce to the peer
  85. knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
  86. txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests
  87. txAnnounce chan []common.Hash // Channel used to queue transaction announcement requests
  88. getPooledTx func(common.Hash) *types.Transaction // Callback used to retrieve transaction from txpool
  89. term chan struct{} // Termination channel to stop the broadcaster
  90. }
  91. func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter, getPooledTx func(hash common.Hash) *types.Transaction) *peer {
  92. return &peer{
  93. Peer: p,
  94. rw: rw,
  95. version: version,
  96. id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
  97. knownTxs: mapset.NewSet(),
  98. knownBlocks: mapset.NewSet(),
  99. queuedBlocks: make(chan *propEvent, maxQueuedBlocks),
  100. queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns),
  101. txBroadcast: make(chan []common.Hash),
  102. txAnnounce: make(chan []common.Hash),
  103. getPooledTx: getPooledTx,
  104. term: make(chan struct{}),
  105. }
  106. }
  107. // broadcastBlocks is a write loop that multiplexes blocks and block accouncements
  108. // to the remote peer. The goal is to have an async writer that does not lock up
  109. // node internals and at the same time rate limits queued data.
  110. func (p *peer) broadcastBlocks() {
  111. for {
  112. select {
  113. case prop := <-p.queuedBlocks:
  114. if err := p.SendNewBlock(prop.block, prop.td); err != nil {
  115. return
  116. }
  117. p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)
  118. case block := <-p.queuedBlockAnns:
  119. if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
  120. return
  121. }
  122. p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
  123. case <-p.term:
  124. return
  125. }
  126. }
  127. }
  128. // broadcastTransactions is a write loop that schedules transaction broadcasts
  129. // to the remote peer. The goal is to have an async writer that does not lock up
  130. // node internals and at the same time rate limits queued data.
  131. func (p *peer) broadcastTransactions() {
  132. var (
  133. queue []common.Hash // Queue of hashes to broadcast as full transactions
  134. done chan struct{} // Non-nil if background broadcaster is running
  135. fail = make(chan error, 1) // Channel used to receive network error
  136. )
  137. for {
  138. // If there's no in-flight broadcast running, check if a new one is needed
  139. if done == nil && len(queue) > 0 {
  140. // Pile transaction until we reach our allowed network limit
  141. var (
  142. hashes []common.Hash
  143. txs []*types.Transaction
  144. size common.StorageSize
  145. )
  146. for i := 0; i < len(queue) && size < txsyncPackSize; i++ {
  147. if tx := p.getPooledTx(queue[i]); tx != nil {
  148. txs = append(txs, tx)
  149. size += tx.Size()
  150. }
  151. hashes = append(hashes, queue[i])
  152. }
  153. queue = queue[:copy(queue, queue[len(hashes):])]
  154. // If there's anything available to transfer, fire up an async writer
  155. if len(txs) > 0 {
  156. done = make(chan struct{})
  157. go func() {
  158. if err := p.sendTransactions(txs); err != nil {
  159. fail <- err
  160. return
  161. }
  162. close(done)
  163. p.Log().Trace("Sent transactions", "count", len(txs))
  164. }()
  165. }
  166. }
  167. // Transfer goroutine may or may not have been started, listen for events
  168. select {
  169. case hashes := <-p.txBroadcast:
  170. // New batch of transactions to be broadcast, queue them (with cap)
  171. queue = append(queue, hashes...)
  172. if len(queue) > maxQueuedTxs {
  173. // Fancy copy and resize to ensure buffer doesn't grow indefinitely
  174. queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])]
  175. }
  176. case <-done:
  177. done = nil
  178. case <-fail:
  179. return
  180. case <-p.term:
  181. return
  182. }
  183. }
  184. }
  185. // announceTransactions is a write loop that schedules transaction broadcasts
  186. // to the remote peer. The goal is to have an async writer that does not lock up
  187. // node internals and at the same time rate limits queued data.
  188. func (p *peer) announceTransactions() {
  189. var (
  190. queue []common.Hash // Queue of hashes to announce as transaction stubs
  191. done chan struct{} // Non-nil if background announcer is running
  192. fail = make(chan error, 1) // Channel used to receive network error
  193. )
  194. for {
  195. // If there's no in-flight announce running, check if a new one is needed
  196. if done == nil && len(queue) > 0 {
  197. // Pile transaction hashes until we reach our allowed network limit
  198. var (
  199. hashes []common.Hash
  200. pending []common.Hash
  201. size common.StorageSize
  202. )
  203. for i := 0; i < len(queue) && size < txsyncPackSize; i++ {
  204. if p.getPooledTx(queue[i]) != nil {
  205. pending = append(pending, queue[i])
  206. size += common.HashLength
  207. }
  208. hashes = append(hashes, queue[i])
  209. }
  210. queue = queue[:copy(queue, queue[len(hashes):])]
  211. // If there's anything available to transfer, fire up an async writer
  212. if len(pending) > 0 {
  213. done = make(chan struct{})
  214. go func() {
  215. if err := p.sendPooledTransactionHashes(pending); err != nil {
  216. fail <- err
  217. return
  218. }
  219. close(done)
  220. p.Log().Trace("Sent transaction announcements", "count", len(pending))
  221. }()
  222. }
  223. }
  224. // Transfer goroutine may or may not have been started, listen for events
  225. select {
  226. case hashes := <-p.txAnnounce:
  227. // New batch of transactions to be broadcast, queue them (with cap)
  228. queue = append(queue, hashes...)
  229. if len(queue) > maxQueuedTxAnns {
  230. // Fancy copy and resize to ensure buffer doesn't grow indefinitely
  231. queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])]
  232. }
  233. case <-done:
  234. done = nil
  235. case <-fail:
  236. return
  237. case <-p.term:
  238. return
  239. }
  240. }
  241. }
  242. // close signals the broadcast goroutine to terminate.
  243. func (p *peer) close() {
  244. close(p.term)
  245. }
  246. // Info gathers and returns a collection of metadata known about a peer.
  247. func (p *peer) Info() *PeerInfo {
  248. hash, td := p.Head()
  249. return &PeerInfo{
  250. Version: p.version,
  251. Difficulty: td,
  252. Head: hash.Hex(),
  253. }
  254. }
  255. // Head retrieves a copy of the current head hash and total difficulty of the
  256. // peer.
  257. func (p *peer) Head() (hash common.Hash, td *big.Int) {
  258. p.lock.RLock()
  259. defer p.lock.RUnlock()
  260. copy(hash[:], p.head[:])
  261. return hash, new(big.Int).Set(p.td)
  262. }
  263. // SetHead updates the head hash and total difficulty of the peer.
  264. func (p *peer) SetHead(hash common.Hash, td *big.Int) {
  265. p.lock.Lock()
  266. defer p.lock.Unlock()
  267. copy(p.head[:], hash[:])
  268. p.td.Set(td)
  269. }
  270. // MarkBlock marks a block as known for the peer, ensuring that the block will
  271. // never be propagated to this particular peer.
  272. func (p *peer) MarkBlock(hash common.Hash) {
  273. // If we reached the memory allowance, drop a previously known block hash
  274. for p.knownBlocks.Cardinality() >= maxKnownBlocks {
  275. p.knownBlocks.Pop()
  276. }
  277. p.knownBlocks.Add(hash)
  278. }
  279. // MarkTransaction marks a transaction as known for the peer, ensuring that it
  280. // will never be propagated to this particular peer.
  281. func (p *peer) MarkTransaction(hash common.Hash) {
  282. // If we reached the memory allowance, drop a previously known transaction hash
  283. for p.knownTxs.Cardinality() >= maxKnownTxs {
  284. p.knownTxs.Pop()
  285. }
  286. p.knownTxs.Add(hash)
  287. }
  288. // SendTransactions64 sends transactions to the peer and includes the hashes
  289. // in its transaction hash set for future reference.
  290. //
  291. // This method is legacy support for initial transaction exchange in eth/64 and
  292. // prior. For eth/65 and higher use SendPooledTransactionHashes.
  293. func (p *peer) SendTransactions64(txs types.Transactions) error {
  294. return p.sendTransactions(txs)
  295. }
  296. // sendTransactions sends transactions to the peer and includes the hashes
  297. // in its transaction hash set for future reference.
  298. //
  299. // This method is a helper used by the async transaction sender. Don't call it
  300. // directly as the queueing (memory) and transmission (bandwidth) costs should
  301. // not be managed directly.
  302. func (p *peer) sendTransactions(txs types.Transactions) error {
  303. // Mark all the transactions as known, but ensure we don't overflow our limits
  304. for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(txs)) {
  305. p.knownTxs.Pop()
  306. }
  307. for _, tx := range txs {
  308. p.knownTxs.Add(tx.Hash())
  309. }
  310. return p2p.Send(p.rw, TransactionMsg, txs)
  311. }
  312. // AsyncSendTransactions queues a list of transactions (by hash) to eventually
  313. // propagate to a remote peer. The number of pending sends are capped (new ones
  314. // will force old sends to be dropped)
  315. func (p *peer) AsyncSendTransactions(hashes []common.Hash) {
  316. select {
  317. case p.txBroadcast <- hashes:
  318. // Mark all the transactions as known, but ensure we don't overflow our limits
  319. for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
  320. p.knownTxs.Pop()
  321. }
  322. for _, hash := range hashes {
  323. p.knownTxs.Add(hash)
  324. }
  325. case <-p.term:
  326. p.Log().Debug("Dropping transaction propagation", "count", len(hashes))
  327. }
  328. }
  329. // sendPooledTransactionHashes sends transaction hashes to the peer and includes
  330. // them in its transaction hash set for future reference.
  331. //
  332. // This method is a helper used by the async transaction announcer. Don't call it
  333. // directly as the queueing (memory) and transmission (bandwidth) costs should
  334. // not be managed directly.
  335. func (p *peer) sendPooledTransactionHashes(hashes []common.Hash) error {
  336. // Mark all the transactions as known, but ensure we don't overflow our limits
  337. for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
  338. p.knownTxs.Pop()
  339. }
  340. for _, hash := range hashes {
  341. p.knownTxs.Add(hash)
  342. }
  343. return p2p.Send(p.rw, NewPooledTransactionHashesMsg, hashes)
  344. }
  345. // AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually
  346. // announce to a remote peer. The number of pending sends are capped (new ones
  347. // will force old sends to be dropped)
  348. func (p *peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
  349. select {
  350. case p.txAnnounce <- hashes:
  351. // Mark all the transactions as known, but ensure we don't overflow our limits
  352. for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
  353. p.knownTxs.Pop()
  354. }
  355. for _, hash := range hashes {
  356. p.knownTxs.Add(hash)
  357. }
  358. case <-p.term:
  359. p.Log().Debug("Dropping transaction announcement", "count", len(hashes))
  360. }
  361. }
  362. // SendPooledTransactionsRLP sends requested transactions to the peer and adds the
  363. // hashes in its transaction hash set for future reference.
  364. //
  365. // Note, the method assumes the hashes are correct and correspond to the list of
  366. // transactions being sent.
  367. func (p *peer) SendPooledTransactionsRLP(hashes []common.Hash, txs []rlp.RawValue) error {
  368. // Mark all the transactions as known, but ensure we don't overflow our limits
  369. for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
  370. p.knownTxs.Pop()
  371. }
  372. for _, hash := range hashes {
  373. p.knownTxs.Add(hash)
  374. }
  375. return p2p.Send(p.rw, PooledTransactionsMsg, txs)
  376. }
  377. // SendNewBlockHashes announces the availability of a number of blocks through
  378. // a hash notification.
  379. func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
  380. // Mark all the block hashes as known, but ensure we don't overflow our limits
  381. for p.knownBlocks.Cardinality() > max(0, maxKnownBlocks-len(hashes)) {
  382. p.knownBlocks.Pop()
  383. }
  384. for _, hash := range hashes {
  385. p.knownBlocks.Add(hash)
  386. }
  387. request := make(newBlockHashesData, len(hashes))
  388. for i := 0; i < len(hashes); i++ {
  389. request[i].Hash = hashes[i]
  390. request[i].Number = numbers[i]
  391. }
  392. return p2p.Send(p.rw, NewBlockHashesMsg, request)
  393. }
  394. // AsyncSendNewBlockHash queues the availability of a block for propagation to a
  395. // remote peer. If the peer's broadcast queue is full, the event is silently
  396. // dropped.
  397. func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
  398. select {
  399. case p.queuedBlockAnns <- block:
  400. // Mark all the block hash as known, but ensure we don't overflow our limits
  401. for p.knownBlocks.Cardinality() >= maxKnownBlocks {
  402. p.knownBlocks.Pop()
  403. }
  404. p.knownBlocks.Add(block.Hash())
  405. default:
  406. p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
  407. }
  408. }
  409. // SendNewBlock propagates an entire block to a remote peer.
  410. func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
  411. // Mark all the block hash as known, but ensure we don't overflow our limits
  412. for p.knownBlocks.Cardinality() >= maxKnownBlocks {
  413. p.knownBlocks.Pop()
  414. }
  415. p.knownBlocks.Add(block.Hash())
  416. return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
  417. }
  418. // AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
  419. // the peer's broadcast queue is full, the event is silently dropped.
  420. func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
  421. select {
  422. case p.queuedBlocks <- &propEvent{block: block, td: td}:
  423. // Mark all the block hash as known, but ensure we don't overflow our limits
  424. for p.knownBlocks.Cardinality() >= maxKnownBlocks {
  425. p.knownBlocks.Pop()
  426. }
  427. p.knownBlocks.Add(block.Hash())
  428. default:
  429. p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
  430. }
  431. }
  432. // SendBlockHeaders sends a batch of block headers to the remote peer.
  433. func (p *peer) SendBlockHeaders(headers []*types.Header) error {
  434. return p2p.Send(p.rw, BlockHeadersMsg, headers)
  435. }
  436. // SendBlockBodies sends a batch of block contents to the remote peer.
  437. func (p *peer) SendBlockBodies(bodies []*blockBody) error {
  438. return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesData(bodies))
  439. }
  440. // SendBlockBodiesRLP sends a batch of block contents to the remote peer from
  441. // an already RLP encoded format.
  442. func (p *peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error {
  443. return p2p.Send(p.rw, BlockBodiesMsg, bodies)
  444. }
  445. // SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the
  446. // hashes requested.
  447. func (p *peer) SendNodeData(data [][]byte) error {
  448. return p2p.Send(p.rw, NodeDataMsg, data)
  449. }
  450. // SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
  451. // ones requested from an already RLP encoded format.
  452. func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
  453. return p2p.Send(p.rw, ReceiptsMsg, receipts)
  454. }
  455. // RequestOneHeader is a wrapper around the header query functions to fetch a
  456. // single header. It is used solely by the fetcher.
  457. func (p *peer) RequestOneHeader(hash common.Hash) error {
  458. p.Log().Debug("Fetching single header", "hash", hash)
  459. return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
  460. }
  461. // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
  462. // specified header query, based on the hash of an origin block.
  463. func (p *peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
  464. p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
  465. return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
  466. }
  467. // RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
  468. // specified header query, based on the number of an origin block.
  469. func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
  470. p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
  471. return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
  472. }
  473. // RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
  474. // specified.
  475. func (p *peer) RequestBodies(hashes []common.Hash) error {
  476. p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
  477. return p2p.Send(p.rw, GetBlockBodiesMsg, hashes)
  478. }
  479. // RequestNodeData fetches a batch of arbitrary data from a node's known state
  480. // data, corresponding to the specified hashes.
  481. func (p *peer) RequestNodeData(hashes []common.Hash) error {
  482. p.Log().Debug("Fetching batch of state data", "count", len(hashes))
  483. return p2p.Send(p.rw, GetNodeDataMsg, hashes)
  484. }
  485. // RequestReceipts fetches a batch of transaction receipts from a remote node.
  486. func (p *peer) RequestReceipts(hashes []common.Hash) error {
  487. p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
  488. return p2p.Send(p.rw, GetReceiptsMsg, hashes)
  489. }
  490. // RequestTxs fetches a batch of transactions from a remote node.
  491. func (p *peer) RequestTxs(hashes []common.Hash) error {
  492. p.Log().Debug("Fetching batch of transactions", "count", len(hashes))
  493. return p2p.Send(p.rw, GetPooledTransactionsMsg, hashes)
  494. }
  495. // Handshake executes the eth protocol handshake, negotiating version number,
  496. // network IDs, difficulties, head and genesis blocks.
  497. func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter) error {
  498. // Send out own handshake in a new thread
  499. errc := make(chan error, 2)
  500. var (
  501. status63 statusData63 // safe to read after two values have been received from errc
  502. status statusData // safe to read after two values have been received from errc
  503. )
  504. go func() {
  505. switch {
  506. case p.version == eth63:
  507. errc <- p2p.Send(p.rw, StatusMsg, &statusData63{
  508. ProtocolVersion: uint32(p.version),
  509. NetworkId: network,
  510. TD: td,
  511. CurrentBlock: head,
  512. GenesisBlock: genesis,
  513. })
  514. case p.version >= eth64:
  515. errc <- p2p.Send(p.rw, StatusMsg, &statusData{
  516. ProtocolVersion: uint32(p.version),
  517. NetworkID: network,
  518. TD: td,
  519. Head: head,
  520. Genesis: genesis,
  521. ForkID: forkID,
  522. })
  523. default:
  524. panic(fmt.Sprintf("unsupported eth protocol version: %d", p.version))
  525. }
  526. }()
  527. go func() {
  528. switch {
  529. case p.version == eth63:
  530. errc <- p.readStatusLegacy(network, &status63, genesis)
  531. case p.version >= eth64:
  532. errc <- p.readStatus(network, &status, genesis, forkFilter)
  533. default:
  534. panic(fmt.Sprintf("unsupported eth protocol version: %d", p.version))
  535. }
  536. }()
  537. timeout := time.NewTimer(handshakeTimeout)
  538. defer timeout.Stop()
  539. for i := 0; i < 2; i++ {
  540. select {
  541. case err := <-errc:
  542. if err != nil {
  543. return err
  544. }
  545. case <-timeout.C:
  546. return p2p.DiscReadTimeout
  547. }
  548. }
  549. switch {
  550. case p.version == eth63:
  551. p.td, p.head = status63.TD, status63.CurrentBlock
  552. case p.version >= eth64:
  553. p.td, p.head = status.TD, status.Head
  554. default:
  555. panic(fmt.Sprintf("unsupported eth protocol version: %d", p.version))
  556. }
  557. return nil
  558. }
  559. func (p *peer) readStatusLegacy(network uint64, status *statusData63, genesis common.Hash) error {
  560. msg, err := p.rw.ReadMsg()
  561. if err != nil {
  562. return err
  563. }
  564. if msg.Code != StatusMsg {
  565. return errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
  566. }
  567. if msg.Size > protocolMaxMsgSize {
  568. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, protocolMaxMsgSize)
  569. }
  570. // Decode the handshake and make sure everything matches
  571. if err := msg.Decode(&status); err != nil {
  572. return errResp(ErrDecode, "msg %v: %v", msg, err)
  573. }
  574. if status.GenesisBlock != genesis {
  575. return errResp(ErrGenesisMismatch, "%x (!= %x)", status.GenesisBlock[:8], genesis[:8])
  576. }
  577. if status.NetworkId != network {
  578. return errResp(ErrNetworkIDMismatch, "%d (!= %d)", status.NetworkId, network)
  579. }
  580. if int(status.ProtocolVersion) != p.version {
  581. return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, p.version)
  582. }
  583. return nil
  584. }
  585. func (p *peer) readStatus(network uint64, status *statusData, genesis common.Hash, forkFilter forkid.Filter) error {
  586. msg, err := p.rw.ReadMsg()
  587. if err != nil {
  588. return err
  589. }
  590. if msg.Code != StatusMsg {
  591. return errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
  592. }
  593. if msg.Size > protocolMaxMsgSize {
  594. return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, protocolMaxMsgSize)
  595. }
  596. // Decode the handshake and make sure everything matches
  597. if err := msg.Decode(&status); err != nil {
  598. return errResp(ErrDecode, "msg %v: %v", msg, err)
  599. }
  600. if status.NetworkID != network {
  601. return errResp(ErrNetworkIDMismatch, "%d (!= %d)", status.NetworkID, network)
  602. }
  603. if int(status.ProtocolVersion) != p.version {
  604. return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, p.version)
  605. }
  606. if status.Genesis != genesis {
  607. return errResp(ErrGenesisMismatch, "%x (!= %x)", status.Genesis, genesis)
  608. }
  609. if err := forkFilter(status.ForkID); err != nil {
  610. return errResp(ErrForkIDRejected, "%v", err)
  611. }
  612. return nil
  613. }
  614. // String implements fmt.Stringer.
  615. func (p *peer) String() string {
  616. return fmt.Sprintf("Peer %s [%s]", p.id,
  617. fmt.Sprintf("eth/%2d", p.version),
  618. )
  619. }
  620. // peerSet represents the collection of active peers currently participating in
  621. // the Ethereum sub-protocol.
  622. type peerSet struct {
  623. peers map[string]*peer
  624. lock sync.RWMutex
  625. closed bool
  626. }
  627. // newPeerSet creates a new peer set to track the active participants.
  628. func newPeerSet() *peerSet {
  629. return &peerSet{
  630. peers: make(map[string]*peer),
  631. }
  632. }
  633. // Register injects a new peer into the working set, or returns an error if the
  634. // peer is already known. If a new peer it registered, its broadcast loop is also
  635. // started.
  636. func (ps *peerSet) Register(p *peer) error {
  637. ps.lock.Lock()
  638. defer ps.lock.Unlock()
  639. if ps.closed {
  640. return errClosed
  641. }
  642. if _, ok := ps.peers[p.id]; ok {
  643. return errAlreadyRegistered
  644. }
  645. ps.peers[p.id] = p
  646. go p.broadcastBlocks()
  647. go p.broadcastTransactions()
  648. if p.version >= eth65 {
  649. go p.announceTransactions()
  650. }
  651. return nil
  652. }
  653. // Unregister removes a remote peer from the active set, disabling any further
  654. // actions to/from that particular entity.
  655. func (ps *peerSet) Unregister(id string) error {
  656. ps.lock.Lock()
  657. defer ps.lock.Unlock()
  658. p, ok := ps.peers[id]
  659. if !ok {
  660. return errNotRegistered
  661. }
  662. delete(ps.peers, id)
  663. p.close()
  664. return nil
  665. }
  666. // Peer retrieves the registered peer with the given id.
  667. func (ps *peerSet) Peer(id string) *peer {
  668. ps.lock.RLock()
  669. defer ps.lock.RUnlock()
  670. return ps.peers[id]
  671. }
  672. // Len returns if the current number of peers in the set.
  673. func (ps *peerSet) Len() int {
  674. ps.lock.RLock()
  675. defer ps.lock.RUnlock()
  676. return len(ps.peers)
  677. }
  678. // PeersWithoutBlock retrieves a list of peers that do not have a given block in
  679. // their set of known hashes.
  680. func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer {
  681. ps.lock.RLock()
  682. defer ps.lock.RUnlock()
  683. list := make([]*peer, 0, len(ps.peers))
  684. for _, p := range ps.peers {
  685. if !p.knownBlocks.Contains(hash) {
  686. list = append(list, p)
  687. }
  688. }
  689. return list
  690. }
  691. // PeersWithoutTx retrieves a list of peers that do not have a given transaction
  692. // in their set of known hashes.
  693. func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
  694. ps.lock.RLock()
  695. defer ps.lock.RUnlock()
  696. list := make([]*peer, 0, len(ps.peers))
  697. for _, p := range ps.peers {
  698. if !p.knownTxs.Contains(hash) {
  699. list = append(list, p)
  700. }
  701. }
  702. return list
  703. }
  704. // BestPeer retrieves the known peer with the currently highest total difficulty.
  705. func (ps *peerSet) BestPeer() *peer {
  706. ps.lock.RLock()
  707. defer ps.lock.RUnlock()
  708. var (
  709. bestPeer *peer
  710. bestTd *big.Int
  711. )
  712. for _, p := range ps.peers {
  713. if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
  714. bestPeer, bestTd = p, td
  715. }
  716. }
  717. return bestPeer
  718. }
  719. // Close disconnects all peers.
  720. // No new peers can be registered after Close has returned.
  721. func (ps *peerSet) Close() {
  722. ps.lock.Lock()
  723. defer ps.lock.Unlock()
  724. for _, p := range ps.peers {
  725. p.Disconnect(p2p.DiscQuitting)
  726. }
  727. ps.closed = true
  728. }