peer.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package eth
  17. import (
  18. "math/big"
  19. "math/rand"
  20. "sync"
  21. mapset "github.com/deckarep/golang-set"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/core/types"
  24. "github.com/ethereum/go-ethereum/p2p"
  25. "github.com/ethereum/go-ethereum/rlp"
  26. )
  27. const (
  28. // maxKnownTxs is the maximum transactions hashes to keep in the known list
  29. // before starting to randomly evict them.
  30. maxKnownTxs = 32768
  31. // maxKnownBlocks is the maximum block hashes to keep in the known list
  32. // before starting to randomly evict them.
  33. maxKnownBlocks = 1024
  34. // maxQueuedTxs is the maximum number of transactions to queue up before dropping
  35. // older broadcasts.
  36. maxQueuedTxs = 4096
  37. // maxQueuedTxAnns is the maximum number of transaction announcements to queue up
  38. // before dropping older announcements.
  39. maxQueuedTxAnns = 4096
  40. // maxQueuedBlocks is the maximum number of block propagations to queue up before
  41. // dropping broadcasts. There's not much point in queueing stale blocks, so a few
  42. // that might cover uncles should be enough.
  43. maxQueuedBlocks = 4
  44. // maxQueuedBlockAnns is the maximum number of block announcements to queue up before
  45. // dropping broadcasts. Similarly to block propagations, there's no point to queue
  46. // above some healthy uncle limit, so use that.
  47. maxQueuedBlockAnns = 4
  48. )
  49. // max is a helper function which returns the larger of the two given integers.
  50. func max(a, b int) int {
  51. if a > b {
  52. return a
  53. }
  54. return b
  55. }
  56. // Peer is a collection of relevant information we have about a `eth` peer.
  57. type Peer struct {
  58. id string // Unique ID for the peer, cached
  59. *p2p.Peer // The embedded P2P package peer
  60. rw p2p.MsgReadWriter // Input/output streams for snap
  61. version uint // Protocol version negotiated
  62. head common.Hash // Latest advertised head block hash
  63. td *big.Int // Latest advertised head block total difficulty
  64. knownBlocks mapset.Set // Set of block hashes known to be known by this peer
  65. queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer
  66. queuedBlockAnns chan *types.Block // Queue of blocks to announce to the peer
  67. txpool TxPool // Transaction pool used by the broadcasters for liveness checks
  68. knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
  69. txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests
  70. txAnnounce chan []common.Hash // Channel used to queue transaction announcement requests
  71. term chan struct{} // Termination channel to stop the broadcasters
  72. lock sync.RWMutex // Mutex protecting the internal fields
  73. }
  74. // NewPeer create a wrapper for a network connection and negotiated protocol
  75. // version.
  76. func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer {
  77. peer := &Peer{
  78. id: p.ID().String(),
  79. Peer: p,
  80. rw: rw,
  81. version: version,
  82. knownTxs: mapset.NewSet(),
  83. knownBlocks: mapset.NewSet(),
  84. queuedBlocks: make(chan *blockPropagation, maxQueuedBlocks),
  85. queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns),
  86. txBroadcast: make(chan []common.Hash),
  87. txAnnounce: make(chan []common.Hash),
  88. txpool: txpool,
  89. term: make(chan struct{}),
  90. }
  91. // Start up all the broadcasters
  92. go peer.broadcastBlocks()
  93. go peer.broadcastTransactions()
  94. if version >= ETH65 {
  95. go peer.announceTransactions()
  96. }
  97. return peer
  98. }
  99. // Close signals the broadcast goroutine to terminate. Only ever call this if
  100. // you created the peer yourself via NewPeer. Otherwise let whoever created it
  101. // clean it up!
  102. func (p *Peer) Close() {
  103. close(p.term)
  104. }
  105. // ID retrieves the peer's unique identifier.
  106. func (p *Peer) ID() string {
  107. return p.id
  108. }
  109. // Version retrieves the peer's negoatiated `eth` protocol version.
  110. func (p *Peer) Version() uint {
  111. return p.version
  112. }
  113. // Head retrieves the current head hash and total difficulty of the peer.
  114. func (p *Peer) Head() (hash common.Hash, td *big.Int) {
  115. p.lock.RLock()
  116. defer p.lock.RUnlock()
  117. copy(hash[:], p.head[:])
  118. return hash, new(big.Int).Set(p.td)
  119. }
  120. // SetHead updates the head hash and total difficulty of the peer.
  121. func (p *Peer) SetHead(hash common.Hash, td *big.Int) {
  122. p.lock.Lock()
  123. defer p.lock.Unlock()
  124. copy(p.head[:], hash[:])
  125. p.td.Set(td)
  126. }
  127. // KnownBlock returns whether peer is known to already have a block.
  128. func (p *Peer) KnownBlock(hash common.Hash) bool {
  129. return p.knownBlocks.Contains(hash)
  130. }
  131. // KnownTransaction returns whether peer is known to already have a transaction.
  132. func (p *Peer) KnownTransaction(hash common.Hash) bool {
  133. return p.knownTxs.Contains(hash)
  134. }
  135. // markBlock marks a block as known for the peer, ensuring that the block will
  136. // never be propagated to this particular peer.
  137. func (p *Peer) markBlock(hash common.Hash) {
  138. // If we reached the memory allowance, drop a previously known block hash
  139. for p.knownBlocks.Cardinality() >= maxKnownBlocks {
  140. p.knownBlocks.Pop()
  141. }
  142. p.knownBlocks.Add(hash)
  143. }
  144. // markTransaction marks a transaction as known for the peer, ensuring that it
  145. // will never be propagated to this particular peer.
  146. func (p *Peer) markTransaction(hash common.Hash) {
  147. // If we reached the memory allowance, drop a previously known transaction hash
  148. for p.knownTxs.Cardinality() >= maxKnownTxs {
  149. p.knownTxs.Pop()
  150. }
  151. p.knownTxs.Add(hash)
  152. }
  153. // SendTransactions sends transactions to the peer and includes the hashes
  154. // in its transaction hash set for future reference.
  155. //
  156. // This method is a helper used by the async transaction sender. Don't call it
  157. // directly as the queueing (memory) and transmission (bandwidth) costs should
  158. // not be managed directly.
  159. //
  160. // The reasons this is public is to allow packages using this protocol to write
  161. // tests that directly send messages without having to do the asyn queueing.
  162. func (p *Peer) SendTransactions(txs types.Transactions) error {
  163. // Mark all the transactions as known, but ensure we don't overflow our limits
  164. for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(txs)) {
  165. p.knownTxs.Pop()
  166. }
  167. for _, tx := range txs {
  168. p.knownTxs.Add(tx.Hash())
  169. }
  170. return p2p.Send(p.rw, TransactionsMsg, txs)
  171. }
  172. // AsyncSendTransactions queues a list of transactions (by hash) to eventually
  173. // propagate to a remote peer. The number of pending sends are capped (new ones
  174. // will force old sends to be dropped)
  175. func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
  176. select {
  177. case p.txBroadcast <- hashes:
  178. // Mark all the transactions as known, but ensure we don't overflow our limits
  179. for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
  180. p.knownTxs.Pop()
  181. }
  182. for _, hash := range hashes {
  183. p.knownTxs.Add(hash)
  184. }
  185. case <-p.term:
  186. p.Log().Debug("Dropping transaction propagation", "count", len(hashes))
  187. }
  188. }
  189. // sendPooledTransactionHashes sends transaction hashes to the peer and includes
  190. // them in its transaction hash set for future reference.
  191. //
  192. // This method is a helper used by the async transaction announcer. Don't call it
  193. // directly as the queueing (memory) and transmission (bandwidth) costs should
  194. // not be managed directly.
  195. func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error {
  196. // Mark all the transactions as known, but ensure we don't overflow our limits
  197. for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
  198. p.knownTxs.Pop()
  199. }
  200. for _, hash := range hashes {
  201. p.knownTxs.Add(hash)
  202. }
  203. return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes))
  204. }
  205. // AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually
  206. // announce to a remote peer. The number of pending sends are capped (new ones
  207. // will force old sends to be dropped)
  208. func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
  209. select {
  210. case p.txAnnounce <- hashes:
  211. // Mark all the transactions as known, but ensure we don't overflow our limits
  212. for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
  213. p.knownTxs.Pop()
  214. }
  215. for _, hash := range hashes {
  216. p.knownTxs.Add(hash)
  217. }
  218. case <-p.term:
  219. p.Log().Debug("Dropping transaction announcement", "count", len(hashes))
  220. }
  221. }
  222. // SendPooledTransactionsRLP sends requested transactions to the peer and adds the
  223. // hashes in its transaction hash set for future reference.
  224. //
  225. // Note, the method assumes the hashes are correct and correspond to the list of
  226. // transactions being sent.
  227. func (p *Peer) SendPooledTransactionsRLP(hashes []common.Hash, txs []rlp.RawValue) error {
  228. // Mark all the transactions as known, but ensure we don't overflow our limits
  229. for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
  230. p.knownTxs.Pop()
  231. }
  232. for _, hash := range hashes {
  233. p.knownTxs.Add(hash)
  234. }
  235. return p2p.Send(p.rw, PooledTransactionsMsg, txs) // Not packed into PooledTransactionsPacket to avoid RLP decoding
  236. }
  237. // ReplyPooledTransactionsRLP is the eth/66 version of SendPooledTransactionsRLP.
  238. func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs []rlp.RawValue) error {
  239. // Mark all the transactions as known, but ensure we don't overflow our limits
  240. for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
  241. p.knownTxs.Pop()
  242. }
  243. for _, hash := range hashes {
  244. p.knownTxs.Add(hash)
  245. }
  246. // Not packed into PooledTransactionsPacket to avoid RLP decoding
  247. return p2p.Send(p.rw, PooledTransactionsMsg, PooledTransactionsRLPPacket66{
  248. RequestId: id,
  249. PooledTransactionsRLPPacket: txs,
  250. })
  251. }
  252. // SendNewBlockHashes announces the availability of a number of blocks through
  253. // a hash notification.
  254. func (p *Peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
  255. // Mark all the block hashes as known, but ensure we don't overflow our limits
  256. for p.knownBlocks.Cardinality() > max(0, maxKnownBlocks-len(hashes)) {
  257. p.knownBlocks.Pop()
  258. }
  259. for _, hash := range hashes {
  260. p.knownBlocks.Add(hash)
  261. }
  262. request := make(NewBlockHashesPacket, len(hashes))
  263. for i := 0; i < len(hashes); i++ {
  264. request[i].Hash = hashes[i]
  265. request[i].Number = numbers[i]
  266. }
  267. return p2p.Send(p.rw, NewBlockHashesMsg, request)
  268. }
  269. // AsyncSendNewBlockHash queues the availability of a block for propagation to a
  270. // remote peer. If the peer's broadcast queue is full, the event is silently
  271. // dropped.
  272. func (p *Peer) AsyncSendNewBlockHash(block *types.Block) {
  273. select {
  274. case p.queuedBlockAnns <- block:
  275. // Mark all the block hash as known, but ensure we don't overflow our limits
  276. for p.knownBlocks.Cardinality() >= maxKnownBlocks {
  277. p.knownBlocks.Pop()
  278. }
  279. p.knownBlocks.Add(block.Hash())
  280. default:
  281. p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
  282. }
  283. }
  284. // SendNewBlock propagates an entire block to a remote peer.
  285. func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error {
  286. // Mark all the block hash as known, but ensure we don't overflow our limits
  287. for p.knownBlocks.Cardinality() >= maxKnownBlocks {
  288. p.knownBlocks.Pop()
  289. }
  290. p.knownBlocks.Add(block.Hash())
  291. return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{
  292. Block: block,
  293. TD: td,
  294. })
  295. }
  296. // AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
  297. // the peer's broadcast queue is full, the event is silently dropped.
  298. func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
  299. select {
  300. case p.queuedBlocks <- &blockPropagation{block: block, td: td}:
  301. // Mark all the block hash as known, but ensure we don't overflow our limits
  302. for p.knownBlocks.Cardinality() >= maxKnownBlocks {
  303. p.knownBlocks.Pop()
  304. }
  305. p.knownBlocks.Add(block.Hash())
  306. default:
  307. p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
  308. }
  309. }
  310. // SendBlockHeaders sends a batch of block headers to the remote peer.
  311. func (p *Peer) SendBlockHeaders(headers []*types.Header) error {
  312. return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket(headers))
  313. }
  314. // ReplyBlockHeaders is the eth/66 version of SendBlockHeaders.
  315. func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error {
  316. return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket66{
  317. RequestId: id,
  318. BlockHeadersPacket: headers,
  319. })
  320. }
  321. // SendBlockBodiesRLP sends a batch of block contents to the remote peer from
  322. // an already RLP encoded format.
  323. func (p *Peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error {
  324. return p2p.Send(p.rw, BlockBodiesMsg, bodies) // Not packed into BlockBodiesPacket to avoid RLP decoding
  325. }
  326. // ReplyBlockBodiesRLP is the eth/66 version of SendBlockBodiesRLP.
  327. func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error {
  328. // Not packed into BlockBodiesPacket to avoid RLP decoding
  329. return p2p.Send(p.rw, BlockBodiesMsg, BlockBodiesRLPPacket66{
  330. RequestId: id,
  331. BlockBodiesRLPPacket: bodies,
  332. })
  333. }
  334. // SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the
  335. // hashes requested.
  336. func (p *Peer) SendNodeData(data [][]byte) error {
  337. return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket(data))
  338. }
  339. // ReplyNodeData is the eth/66 response to GetNodeData.
  340. func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error {
  341. return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket66{
  342. RequestId: id,
  343. NodeDataPacket: data,
  344. })
  345. }
  346. // SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
  347. // ones requested from an already RLP encoded format.
  348. func (p *Peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
  349. return p2p.Send(p.rw, ReceiptsMsg, receipts) // Not packed into ReceiptsPacket to avoid RLP decoding
  350. }
  351. // ReplyReceiptsRLP is the eth/66 response to GetReceipts.
  352. func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error {
  353. return p2p.Send(p.rw, ReceiptsMsg, ReceiptsRLPPacket66{
  354. RequestId: id,
  355. ReceiptsRLPPacket: receipts,
  356. })
  357. }
  358. // RequestOneHeader is a wrapper around the header query functions to fetch a
  359. // single header. It is used solely by the fetcher.
  360. func (p *Peer) RequestOneHeader(hash common.Hash) error {
  361. p.Log().Debug("Fetching single header", "hash", hash)
  362. query := GetBlockHeadersPacket{
  363. Origin: HashOrNumber{Hash: hash},
  364. Amount: uint64(1),
  365. Skip: uint64(0),
  366. Reverse: false,
  367. }
  368. if p.Version() >= ETH66 {
  369. id := rand.Uint64()
  370. requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
  371. return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
  372. RequestId: id,
  373. GetBlockHeadersPacket: &query,
  374. })
  375. }
  376. return p2p.Send(p.rw, GetBlockHeadersMsg, &query)
  377. }
  378. // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
  379. // specified header query, based on the hash of an origin block.
  380. func (p *Peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
  381. p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
  382. query := GetBlockHeadersPacket{
  383. Origin: HashOrNumber{Hash: origin},
  384. Amount: uint64(amount),
  385. Skip: uint64(skip),
  386. Reverse: reverse,
  387. }
  388. if p.Version() >= ETH66 {
  389. id := rand.Uint64()
  390. requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
  391. return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
  392. RequestId: id,
  393. GetBlockHeadersPacket: &query,
  394. })
  395. }
  396. return p2p.Send(p.rw, GetBlockHeadersMsg, &query)
  397. }
  398. // RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
  399. // specified header query, based on the number of an origin block.
  400. func (p *Peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
  401. p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
  402. query := GetBlockHeadersPacket{
  403. Origin: HashOrNumber{Number: origin},
  404. Amount: uint64(amount),
  405. Skip: uint64(skip),
  406. Reverse: reverse,
  407. }
  408. if p.Version() >= ETH66 {
  409. id := rand.Uint64()
  410. requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
  411. return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
  412. RequestId: id,
  413. GetBlockHeadersPacket: &query,
  414. })
  415. }
  416. return p2p.Send(p.rw, GetBlockHeadersMsg, &query)
  417. }
  418. // ExpectRequestHeadersByNumber is a testing method to mirror the recipient side
  419. // of the RequestHeadersByNumber operation.
  420. func (p *Peer) ExpectRequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
  421. req := &GetBlockHeadersPacket{
  422. Origin: HashOrNumber{Number: origin},
  423. Amount: uint64(amount),
  424. Skip: uint64(skip),
  425. Reverse: reverse,
  426. }
  427. return p2p.ExpectMsg(p.rw, GetBlockHeadersMsg, req)
  428. }
  429. // RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
  430. // specified.
  431. func (p *Peer) RequestBodies(hashes []common.Hash) error {
  432. p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
  433. if p.Version() >= ETH66 {
  434. id := rand.Uint64()
  435. requestTracker.Track(p.id, p.version, GetBlockBodiesMsg, BlockBodiesMsg, id)
  436. return p2p.Send(p.rw, GetBlockBodiesMsg, &GetBlockBodiesPacket66{
  437. RequestId: id,
  438. GetBlockBodiesPacket: hashes,
  439. })
  440. }
  441. return p2p.Send(p.rw, GetBlockBodiesMsg, GetBlockBodiesPacket(hashes))
  442. }
  443. // RequestNodeData fetches a batch of arbitrary data from a node's known state
  444. // data, corresponding to the specified hashes.
  445. func (p *Peer) RequestNodeData(hashes []common.Hash) error {
  446. p.Log().Debug("Fetching batch of state data", "count", len(hashes))
  447. if p.Version() >= ETH66 {
  448. id := rand.Uint64()
  449. requestTracker.Track(p.id, p.version, GetNodeDataMsg, NodeDataMsg, id)
  450. return p2p.Send(p.rw, GetNodeDataMsg, &GetNodeDataPacket66{
  451. RequestId: id,
  452. GetNodeDataPacket: hashes,
  453. })
  454. }
  455. return p2p.Send(p.rw, GetNodeDataMsg, GetNodeDataPacket(hashes))
  456. }
  457. // RequestReceipts fetches a batch of transaction receipts from a remote node.
  458. func (p *Peer) RequestReceipts(hashes []common.Hash) error {
  459. p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
  460. if p.Version() >= ETH66 {
  461. id := rand.Uint64()
  462. requestTracker.Track(p.id, p.version, GetReceiptsMsg, ReceiptsMsg, id)
  463. return p2p.Send(p.rw, GetReceiptsMsg, &GetReceiptsPacket66{
  464. RequestId: id,
  465. GetReceiptsPacket: hashes,
  466. })
  467. }
  468. return p2p.Send(p.rw, GetReceiptsMsg, GetReceiptsPacket(hashes))
  469. }
  470. // RequestTxs fetches a batch of transactions from a remote node.
  471. func (p *Peer) RequestTxs(hashes []common.Hash) error {
  472. p.Log().Debug("Fetching batch of transactions", "count", len(hashes))
  473. if p.Version() >= ETH66 {
  474. id := rand.Uint64()
  475. requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id)
  476. return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{
  477. RequestId: id,
  478. GetPooledTransactionsPacket: hashes,
  479. })
  480. }
  481. return p2p.Send(p.rw, GetPooledTransactionsMsg, GetPooledTransactionsPacket(hashes))
  482. }