// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package fetcher

import (
	"bytes"
	"errors"
	"fmt"
	mrand "math/rand"
	"sort"
	"time"

	mapset "github.com/deckarep/golang-set"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/mclock"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
)

const (
	// maxTxAnnounces is the maximum number of unique transactions a peer
	// can announce in a short time.
	maxTxAnnounces = 4096

	// maxTxRetrievals is the maximum number of transactions that can be fetched
	// in one request. The rationale for picking 256 is:
	//   - In the eth protocol, the softResponseLimit is 2MB. Nowadays, according
	//     to Etherscan, the average transaction size is around 200B, so in theory
	//     we can include lots of transactions in a single protocol packet.
	//   - However, the maximum size of a single transaction is raised to 128KB,
	//     so pick a middle value here to ensure we can maximize the efficiency
	//     of the retrieval and that response size overflow won't happen in most
	//     cases.
	maxTxRetrievals = 256

	// maxTxUnderpricedSetSize is the size of the underpriced transaction set that
	// is used to track recent transactions that have been dropped so we don't
	// re-request them.
	maxTxUnderpricedSetSize = 32768

	// txArriveTimeout is the time allowance before an announced transaction is
	// explicitly requested.
	txArriveTimeout = 500 * time.Millisecond

	// txGatherSlack is the interval used to collate almost-expired announces
	// with network fetches.
	txGatherSlack = 100 * time.Millisecond
)
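
// Back-of-the-envelope check on the retrieval cap above (derived from the
// figures quoted in the comment, not measured here): 2MB / 200B ≈ 10000
// average-sized transactions would fit a single reply, whereas 256 worst-case
// 128KB transactions would total 32MB. 256 is the compromise: batches stay
// large enough to keep retrievals efficient, yet with typical transaction
// sizes overflowing the soft response limit stays rare.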

var (
	// txFetchTimeout is the maximum allotted time to return an explicitly
	// requested transaction.
	txFetchTimeout = 5 * time.Second
)

var (
	txAnnounceInMeter          = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/in", nil)
	txAnnounceKnownMeter       = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/known", nil)
	txAnnounceUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/underpriced", nil)
	txAnnounceDOSMeter         = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/dos", nil)

	txBroadcastInMeter          = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/in", nil)
	txBroadcastKnownMeter       = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/known", nil)
	txBroadcastUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/underpriced", nil)
	txBroadcastOtherRejectMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/otherreject", nil)

	txRequestOutMeter     = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/out", nil)
	txRequestFailMeter    = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/fail", nil)
	txRequestDoneMeter    = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/done", nil)
	txRequestTimeoutMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/timeout", nil)

	txReplyInMeter          = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/in", nil)
	txReplyKnownMeter       = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/known", nil)
	txReplyUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/underpriced", nil)
	txReplyOtherRejectMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/otherreject", nil)

	txFetcherWaitingPeers   = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/peers", nil)
	txFetcherWaitingHashes  = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/hashes", nil)
	txFetcherQueueingPeers  = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/peers", nil)
	txFetcherQueueingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/hashes", nil)
	txFetcherFetchingPeers  = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/peers", nil)
	txFetcherFetchingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/hashes", nil)
)

// txAnnounce is the notification of the availability of a batch
// of new transactions in the network.
type txAnnounce struct {
	origin string        // Identifier of the peer originating the notification
	hashes []common.Hash // Batch of transaction hashes being announced
}

// txRequest represents an in-flight transaction retrieval request destined to
// a specific peer.
type txRequest struct {
	hashes []common.Hash            // Transactions having been requested
	stolen map[common.Hash]struct{} // Deliveries by someone else (don't re-request)
	time   mclock.AbsTime           // Timestamp of the request
}

// txDelivery is the notification that a batch of transactions have been added
// to the pool and should be untracked.
type txDelivery struct {
	origin string        // Identifier of the peer originating the notification
	hashes []common.Hash // Batch of transaction hashes having been delivered
	direct bool          // Whether this is a direct reply or a broadcast
}

// txDrop is the notification that a peer has disconnected.
type txDrop struct {
	peer string
}

// TxFetcher is responsible for retrieving new transactions based on announcements.
//
// The fetcher operates in 3 stages:
//   - Transactions that are newly discovered are moved into a wait list.
//   - After ~500ms passes, transactions from the wait list that have not been
//     broadcast to us in full are moved into a queueing area.
//   - When a connected peer doesn't have in-flight retrieval requests, any
//     transactions queued up (and announced by the peer) are allocated to the
//     peer and moved into a fetching status until fulfilled or failed.
//
// The invariants of the fetcher are:
//   - Each tracked transaction (hash) must only be present in one of the
//     three stages. This ensures that the fetcher operates akin to a finite
//     state automaton and there's no data leak.
//   - Each peer that announced transactions may be scheduled retrievals, but
//     only ever one concurrently. This ensures we can immediately know what is
//     missing from a reply and reschedule it.
type TxFetcher struct {
	notify  chan *txAnnounce
	cleanup chan *txDelivery
	drop    chan *txDrop
	quit    chan struct{}

	underpriced mapset.Set // Transactions discarded as too cheap (don't re-fetch)

	// Stage 1: Waiting lists for newly discovered transactions that might be
	// broadcast without needing explicit request/reply round trips.
	waitlist  map[common.Hash]map[string]struct{} // Transactions waiting for a potential broadcast
	waittime  map[common.Hash]mclock.AbsTime      // Timestamps when transactions were added to the waitlist
	waitslots map[string]map[common.Hash]struct{} // Waiting announcements grouped by peer (DoS protection)

	// Stage 2: Queue of transactions that are waiting to be allocated to some
	// peer to be retrieved directly.
	announces map[string]map[common.Hash]struct{} // Set of announced transactions, grouped by origin peer
	announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash

	// Stage 3: Set of transactions currently being retrieved, some of which may
	// be fulfilled and some rescheduled. Note, this step shares 'announces' from
	// the previous stage to avoid having to duplicate it (need it for DoS checks).
	fetching   map[common.Hash]string              // Transaction set currently being retrieved
	requests   map[string]*txRequest               // In-flight transaction retrievals
	alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails

	// Callbacks
	hasTx    func(common.Hash) bool             // Retrieves a tx from the local txpool
	addTxs   func([]*types.Transaction) []error // Insert a batch of transactions into the local txpool
	fetchTxs func(string, []common.Hash) error  // Retrieves a set of txs from a remote peer

	step  chan struct{} // Notification channel when the fetcher loop iterates
	clock mclock.Clock  // Time wrapper to simulate in tests
	rand  *mrand.Rand   // Randomizer to use in tests instead of map range loops (soft-random)
}

// NewTxFetcher creates a transaction fetcher to retrieve transactions
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
	return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
}

// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
	hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
	clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
	return &TxFetcher{
		notify:      make(chan *txAnnounce),
		cleanup:     make(chan *txDelivery),
		drop:        make(chan *txDrop),
		quit:        make(chan struct{}),
		waitlist:    make(map[common.Hash]map[string]struct{}),
		waittime:    make(map[common.Hash]mclock.AbsTime),
		waitslots:   make(map[string]map[common.Hash]struct{}),
		announces:   make(map[string]map[common.Hash]struct{}),
		announced:   make(map[common.Hash]map[string]struct{}),
		fetching:    make(map[common.Hash]string),
		requests:    make(map[string]*txRequest),
		alternates:  make(map[common.Hash]map[string]struct{}),
		underpriced: mapset.NewSet(),
		hasTx:       hasTx,
		addTxs:      addTxs,
		fetchTxs:    fetchTxs,
		clock:       clock,
		rand:        rand,
	}
}
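
// A minimal wiring sketch (the `pool` and `peers` values below are
// hypothetical, shown purely for illustration; the real wiring lives in the
// eth protocol handler):
//
//	f := fetcher.NewTxFetcher(
//		pool.Has,        // assumed txpool lookup: func(common.Hash) bool
//		pool.AddRemotes, // assumed txpool insert: func([]*types.Transaction) []error
//		func(peer string, hashes []common.Hash) error {
//			return peers.peer(peer).RequestTxs(hashes) // assumed network hook
//		},
//	)
//	f.Start()
//	defer f.Stop()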

// Notify announces the fetcher of the potential availability of a new batch of
// transactions in the network.
func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
	// Keep track of all the announced transactions
	txAnnounceInMeter.Mark(int64(len(hashes)))

	// Skip any transaction announcements that we already know of, or that we've
	// previously marked as cheap and discarded. This check is of course racy,
	// because multiple concurrent notifies will still manage to pass it, but it's
	// still valuable to check here because it runs concurrently to the internal
	// loop, so anything caught here is time saved internally.
	var (
		unknowns               = make([]common.Hash, 0, len(hashes))
		duplicate, underpriced int64
	)
	for _, hash := range hashes {
		switch {
		case f.hasTx(hash):
			duplicate++
		case f.underpriced.Contains(hash):
			underpriced++
		default:
			unknowns = append(unknowns, hash)
		}
	}
	txAnnounceKnownMeter.Mark(duplicate)
	txAnnounceUnderpricedMeter.Mark(underpriced)

	// If anything's left to announce, push it into the internal loop
	if len(unknowns) == 0 {
		return nil
	}
	announce := &txAnnounce{
		origin: peer,
		hashes: unknowns,
	}
	select {
	case f.notify <- announce:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

// Enqueue imports a batch of received transactions into the transaction pool
// and the fetcher. This method may be called by both transaction broadcasts and
// direct request replies. The differentiation is important so the fetcher can
// re-schedule missing transactions as soon as possible.
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
	// Keep track of all the propagated transactions
	if direct {
		txReplyInMeter.Mark(int64(len(txs)))
	} else {
		txBroadcastInMeter.Mark(int64(len(txs)))
	}
	// Push all the transactions into the pool, tracking underpriced ones to avoid
	// re-requesting them and dropping the peer in case of malicious transfers.
	var (
		added       = make([]common.Hash, 0, len(txs))
		duplicate   int64
		underpriced int64
		otherreject int64
	)
	errs := f.addTxs(txs)
	for i, err := range errs {
		// Track the transaction hash if the price is too low for us.
		// Avoid re-requesting this transaction when we receive another
		// announcement.
		if errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced) {
			for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize {
				f.underpriced.Pop()
			}
			f.underpriced.Add(txs[i].Hash())
		}
		// Track a few interesting failure types
		switch {
		case err == nil: // Noop, but need to handle to not count these
		case errors.Is(err, core.ErrAlreadyKnown):
			duplicate++
		case errors.Is(err, core.ErrUnderpriced) || errors.Is(err, core.ErrReplaceUnderpriced):
			underpriced++
		default:
			otherreject++
		}
		added = append(added, txs[i].Hash())
	}
	if direct {
		txReplyKnownMeter.Mark(duplicate)
		txReplyUnderpricedMeter.Mark(underpriced)
		txReplyOtherRejectMeter.Mark(otherreject)
	} else {
		txBroadcastKnownMeter.Mark(duplicate)
		txBroadcastUnderpricedMeter.Mark(underpriced)
		txBroadcastOtherRejectMeter.Mark(otherreject)
	}
	select {
	case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}:
		return nil
	case <-f.quit:
		return errTerminated
	}
}
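
// All inbound transaction traffic funnels into Notify and Enqueue; only the
// `direct` flag distinguishes the two Enqueue paths. A sketch of the intended
// handler routing, assuming the eth/65 message set:
//
//	// NewPooledTransactionHashes announcement -> f.Notify(peer, hashes)
//	// Transactions broadcast                  -> f.Enqueue(peer, txs, false)
//	// PooledTransactions request reply        -> f.Enqueue(peer, txs, true)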

// Drop should be called when a peer disconnects. It cleans up all the internal
// data structures of the given node.
func (f *TxFetcher) Drop(peer string) error {
	select {
	case f.drop <- &txDrop{peer: peer}:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and transaction fetches until termination is requested.
func (f *TxFetcher) Start() {
	go f.loop()
}

// Stop terminates the announcement based synchroniser, canceling all pending
// operations.
func (f *TxFetcher) Stop() {
	close(f.quit)
}

func (f *TxFetcher) loop() {
	var (
		waitTimer    = new(mclock.Timer)
		timeoutTimer = new(mclock.Timer)

		waitTrigger    = make(chan struct{}, 1)
		timeoutTrigger = make(chan struct{}, 1)
	)
	for {
		select {
		case ann := <-f.notify:
			// Drop part of the new announcements if there are too many accumulated.
			// Note, we could but do not filter already known transactions here as
			// the probability of something arriving between this call and the pre-
			// filter outside is essentially zero.
			used := len(f.waitslots[ann.origin]) + len(f.announces[ann.origin])
			if used >= maxTxAnnounces {
				// This can happen if a set of transactions are requested but not
				// all fulfilled, so the remainder are rescheduled without the cap
				// check. Should be fine as the limit is in the thousands and the
				// request size in the hundreds.
				txAnnounceDOSMeter.Mark(int64(len(ann.hashes)))
				break
			}
			want := used + len(ann.hashes)
			if want > maxTxAnnounces {
				txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces))
				ann.hashes = ann.hashes[:len(ann.hashes)-(want-maxTxAnnounces)]
			}
			// All is well, schedule the remainder of the transactions
			idleWait := len(f.waittime) == 0
			_, oldPeer := f.announces[ann.origin]

			for _, hash := range ann.hashes {
				// If the transaction is already downloading, add it to the list
				// of possible alternates (in case the current retrieval fails) and
				// also account it for the peer.
				if f.alternates[hash] != nil {
					f.alternates[hash][ann.origin] = struct{}{}

					// Stage 2 and 3 share the set of origins per tx
					if announces := f.announces[ann.origin]; announces != nil {
						announces[hash] = struct{}{}
					} else {
						f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
					}
					continue
				}
				// If the transaction is not downloading, but is already queued
				// from a different peer, track it for the new peer too.
				if f.announced[hash] != nil {
					f.announced[hash][ann.origin] = struct{}{}

					// Stage 2 and 3 share the set of origins per tx
					if announces := f.announces[ann.origin]; announces != nil {
						announces[hash] = struct{}{}
					} else {
						f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
					}
					continue
				}
				// If the transaction is already known to the fetcher, but not
				// yet downloading, add the peer as an alternate origin in the
				// waiting list.
				if f.waitlist[hash] != nil {
					f.waitlist[hash][ann.origin] = struct{}{}

					if waitslots := f.waitslots[ann.origin]; waitslots != nil {
						waitslots[hash] = struct{}{}
					} else {
						f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
					}
					continue
				}
				// Transaction unknown to the fetcher, insert it into the waiting list
				f.waitlist[hash] = map[string]struct{}{ann.origin: {}}
				f.waittime[hash] = f.clock.Now()

				if waitslots := f.waitslots[ann.origin]; waitslots != nil {
					waitslots[hash] = struct{}{}
				} else {
					f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
				}
			}
			// If a new item was added to the waitlist, schedule it into the fetcher
			if idleWait && len(f.waittime) > 0 {
				f.rescheduleWait(waitTimer, waitTrigger)
			}
			// If this peer is new and announced something already queued, maybe
			// request transactions from them
			if !oldPeer && len(f.announces[ann.origin]) > 0 {
				f.scheduleFetches(timeoutTimer, timeoutTrigger, map[string]struct{}{ann.origin: {}})
			}

		case <-waitTrigger:
			// At least one transaction's waiting time ran out, push all expired
			// ones into the retrieval queues
			actives := make(map[string]struct{})
			for hash, instance := range f.waittime {
				if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout {
					// Transaction expired without propagation, schedule for retrieval
					if f.announced[hash] != nil {
						panic("announce tracker already contains waitlist item")
					}
					f.announced[hash] = f.waitlist[hash]
					for peer := range f.waitlist[hash] {
						if announces := f.announces[peer]; announces != nil {
							announces[hash] = struct{}{}
						} else {
							f.announces[peer] = map[common.Hash]struct{}{hash: {}}
						}
						delete(f.waitslots[peer], hash)
						if len(f.waitslots[peer]) == 0 {
							delete(f.waitslots, peer)
						}
						actives[peer] = struct{}{}
					}
					delete(f.waittime, hash)
					delete(f.waitlist, hash)
				}
			}
			// If transactions are still waiting for propagation, reschedule the wait timer
			if len(f.waittime) > 0 {
				f.rescheduleWait(waitTimer, waitTrigger)
			}
			// If any peers became active and are idle, request transactions from them
			if len(actives) > 0 {
				f.scheduleFetches(timeoutTimer, timeoutTrigger, actives)
			}

		case <-timeoutTrigger:
			// Clean up any expired retrievals and avoid re-requesting them from the
			// same peer (either overloaded or malicious, useless in both cases). We
			// could also penalize (Drop), but there's nothing to gain, and it could
			// possibly further increase the load on it.
			for peer, req := range f.requests {
				if time.Duration(f.clock.Now()-req.time)+txGatherSlack > txFetchTimeout {
					txRequestTimeoutMeter.Mark(int64(len(req.hashes)))

					// Reschedule all the not-yet-delivered fetches to alternate peers
					for _, hash := range req.hashes {
						// Skip rescheduling hashes already delivered by someone else
						if req.stolen != nil {
							if _, ok := req.stolen[hash]; ok {
								continue
							}
						}
						// Move the delivery back from fetching to queued
						if _, ok := f.announced[hash]; ok {
							panic("announced tracker already contains alternate item")
						}
						if f.alternates[hash] != nil { // nil if tx was broadcast during fetch
							f.announced[hash] = f.alternates[hash]
						}
						delete(f.announced[hash], peer)
						if len(f.announced[hash]) == 0 {
							delete(f.announced, hash)
						}
						delete(f.announces[peer], hash)
						delete(f.alternates, hash)
						delete(f.fetching, hash)
					}
					if len(f.announces[peer]) == 0 {
						delete(f.announces, peer)
					}
					// Keep track of the request as dangling, but never expire
					f.requests[peer].hashes = nil
				}
			}
			// Schedule a new transaction retrieval
			f.scheduleFetches(timeoutTimer, timeoutTrigger, nil)

			// No idea if we scheduled something or not, trigger the timer if needed
			// TODO(karalabe): this is kind of lame, can't we dump it into scheduleFetches somehow?
			f.rescheduleTimeout(timeoutTimer, timeoutTrigger)

		case delivery := <-f.cleanup:
			// Independent of whether the delivery was direct or broadcast, remove
			// all traces of the hash from the internal trackers
			for _, hash := range delivery.hashes {
				if _, ok := f.waitlist[hash]; ok {
					for peer, txset := range f.waitslots {
						delete(txset, hash)
						if len(txset) == 0 {
							delete(f.waitslots, peer)
						}
					}
					delete(f.waitlist, hash)
					delete(f.waittime, hash)
				} else {
					for peer, txset := range f.announces {
						delete(txset, hash)
						if len(txset) == 0 {
							delete(f.announces, peer)
						}
					}
					delete(f.announced, hash)
					delete(f.alternates, hash)

					// If a transaction currently being fetched from a different
					// origin was delivered (delivery stolen), mark it so the
					// actual delivery won't double schedule it.
					if origin, ok := f.fetching[hash]; ok && (origin != delivery.origin || !delivery.direct) {
						stolen := f.requests[origin].stolen
						if stolen == nil {
							f.requests[origin].stolen = make(map[common.Hash]struct{})
							stolen = f.requests[origin].stolen
						}
						stolen[hash] = struct{}{}
					}
					delete(f.fetching, hash)
				}
			}
			// In case of a direct delivery, also reschedule anything missing
			// from the original query
			if delivery.direct {
				// Mark the request as successful (independent of individual status)
				txRequestDoneMeter.Mark(int64(len(delivery.hashes)))

				// Make sure something was pending, nuke it
				req := f.requests[delivery.origin]
				if req == nil {
					log.Warn("Unexpected transaction delivery", "peer", delivery.origin)
					break
				}
				delete(f.requests, delivery.origin)

				// Anything not delivered should be re-scheduled (with or without
				// this peer, depending on the response cutoff)
				delivered := make(map[common.Hash]struct{})
				for _, hash := range delivery.hashes {
					delivered[hash] = struct{}{}
				}
				cutoff := len(req.hashes) // If nothing is delivered, assume everything is missing, don't retry!!!
				for i, hash := range req.hashes {
					if _, ok := delivered[hash]; ok {
						cutoff = i
					}
				}
				// Reschedule missing hashes from alternates, not-fulfilled from alt+self
				for i, hash := range req.hashes {
					// Skip rescheduling hashes already delivered by someone else
					if req.stolen != nil {
						if _, ok := req.stolen[hash]; ok {
							continue
						}
					}
					if _, ok := delivered[hash]; !ok {
						if i < cutoff {
							delete(f.alternates[hash], delivery.origin)
							delete(f.announces[delivery.origin], hash)
							if len(f.announces[delivery.origin]) == 0 {
								delete(f.announces, delivery.origin)
							}
						}
						if len(f.alternates[hash]) > 0 {
							if _, ok := f.announced[hash]; ok {
								panic(fmt.Sprintf("announced tracker already contains alternate item: %v", f.announced[hash]))
							}
							f.announced[hash] = f.alternates[hash]
						}
					}
					delete(f.alternates, hash)
					delete(f.fetching, hash)
				}
				// Something was delivered, try to reschedule requests
				f.scheduleFetches(timeoutTimer, timeoutTrigger, nil) // Partial delivery may enable others to deliver too
			}

		case drop := <-f.drop:
			// A peer was dropped, remove all traces of it
			if _, ok := f.waitslots[drop.peer]; ok {
				for hash := range f.waitslots[drop.peer] {
					delete(f.waitlist[hash], drop.peer)
					if len(f.waitlist[hash]) == 0 {
						delete(f.waitlist, hash)
						delete(f.waittime, hash)
					}
				}
				delete(f.waitslots, drop.peer)
				if len(f.waitlist) > 0 {
					f.rescheduleWait(waitTimer, waitTrigger)
				}
			}
			// Clean up any active requests
			var request *txRequest
			if request = f.requests[drop.peer]; request != nil {
				for _, hash := range request.hashes {
					// Skip rescheduling hashes already delivered by someone else
					if request.stolen != nil {
						if _, ok := request.stolen[hash]; ok {
							continue
						}
					}
					// Undelivered hash, reschedule if there's an alternative origin available
					delete(f.alternates[hash], drop.peer)
					if len(f.alternates[hash]) == 0 {
						delete(f.alternates, hash)
					} else {
						f.announced[hash] = f.alternates[hash]
						delete(f.alternates, hash)
					}
					delete(f.fetching, hash)
				}
				delete(f.requests, drop.peer)
			}
			// Clean up general announcement tracking
			if _, ok := f.announces[drop.peer]; ok {
				for hash := range f.announces[drop.peer] {
					delete(f.announced[hash], drop.peer)
					if len(f.announced[hash]) == 0 {
						delete(f.announced, hash)
					}
				}
				delete(f.announces, drop.peer)
			}
			// If a request was cancelled, check if anything needs to be rescheduled
			if request != nil {
				f.scheduleFetches(timeoutTimer, timeoutTrigger, nil)
				f.rescheduleTimeout(timeoutTimer, timeoutTrigger)
			}

		case <-f.quit:
			return
		}
		// No idea what happened, but bump some sanity metrics
		txFetcherWaitingPeers.Update(int64(len(f.waitslots)))
		txFetcherWaitingHashes.Update(int64(len(f.waitlist)))
		txFetcherQueueingPeers.Update(int64(len(f.announces) - len(f.requests)))
		txFetcherQueueingHashes.Update(int64(len(f.announced)))
		txFetcherFetchingPeers.Update(int64(len(f.requests)))
		txFetcherFetchingHashes.Update(int64(len(f.fetching)))

		// Loop did something, ping the step notifier if needed (tests)
		if f.step != nil {
			f.step <- struct{}{}
		}
	}
}

// rescheduleWait iterates over all the transactions currently in the waitlist
// and schedules the movement into the fetcher for the earliest.
//
// The method has a granularity of 'gatherSlack', since there's not much point in
// spinning over all the transactions just to maybe find one that should trigger
// a few ms earlier.
func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) {
	if *timer != nil {
		(*timer).Stop()
	}
	now := f.clock.Now()

	earliest := now
	for _, instance := range f.waittime {
		if earliest > instance {
			earliest = instance
			if txArriveTimeout-time.Duration(now-earliest) < gatherSlack {
				break
			}
		}
	}
	*timer = f.clock.AfterFunc(txArriveTimeout-time.Duration(now-earliest), func() {
		trigger <- struct{}{}
	})
}
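
// Worked example of the slack above (illustrative numbers only): with
// txArriveTimeout = 500ms and a 100ms slack, an entry that has already waited
// 420ms is only 80ms from expiry (under the slack), so the scan stops looking
// for an even earlier deadline and a single trigger services the whole
// near-expired batch.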

// rescheduleTimeout iterates over all the transactions currently in flight and
// schedules a cleanup run when the first would trigger.
//
// The method has a granularity of 'gatherSlack', since there's not much point in
// spinning over all the transactions just to maybe find one that should trigger
// a few ms earlier.
//
// This method is a bit "flaky" "by design". In theory the timeout timer only ever
// should be rescheduled if some request is pending. In practice, a timeout will
// cause the timer to be rescheduled every 5 secs (until the peer comes through or
// disconnects). This is a limitation of the fetcher code because we don't track
// pending requests and timed out requests separately. Without double tracking, if
// we simply didn't reschedule the timer on all-timeout then the timer would never
// be set again since len(requests) > 0 => something's running.
func (f *TxFetcher) rescheduleTimeout(timer *mclock.Timer, trigger chan struct{}) {
	if *timer != nil {
		(*timer).Stop()
	}
	now := f.clock.Now()

	earliest := now
	for _, req := range f.requests {
		// If this request already timed out, skip it altogether
		if req.hashes == nil {
			continue
		}
		if earliest > req.time {
			earliest = req.time
			if txFetchTimeout-time.Duration(now-earliest) < gatherSlack {
				break
			}
		}
	}
	*timer = f.clock.AfterFunc(txFetchTimeout-time.Duration(now-earliest), func() {
		trigger <- struct{}{}
	})
}

// scheduleFetches starts a batch of retrievals for all available idle peers.
func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, whitelist map[string]struct{}) {
	// Gather the set of peers we want to retrieve from (default to all)
	actives := whitelist
	if actives == nil {
		actives = make(map[string]struct{})
		for peer := range f.announces {
			actives[peer] = struct{}{}
		}
	}
	if len(actives) == 0 {
		return
	}
	// For each active peer, try to schedule some transaction fetches
	idle := len(f.requests) == 0

	f.forEachPeer(actives, func(peer string) {
		if f.requests[peer] != nil {
			return // continue in the for-each
		}
		if len(f.announces[peer]) == 0 {
			return // continue in the for-each
		}
		hashes := make([]common.Hash, 0, maxTxRetrievals)
		f.forEachHash(f.announces[peer], func(hash common.Hash) bool {
			if _, ok := f.fetching[hash]; !ok {
				// Mark the hash as fetching and stash away possible alternates
				f.fetching[hash] = peer

				if _, ok := f.alternates[hash]; ok {
					panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
				}
				f.alternates[hash] = f.announced[hash]
				delete(f.announced, hash)

				// Accumulate the hash and stop if the limit was reached
				hashes = append(hashes, hash)
				if len(hashes) >= maxTxRetrievals {
					return false // break in the for-each
				}
			}
			return true // continue in the for-each
		})
		// If any hashes were allocated, request them from the peer
		if len(hashes) > 0 {
			f.requests[peer] = &txRequest{hashes: hashes, time: f.clock.Now()}
			txRequestOutMeter.Mark(int64(len(hashes)))

			go func(peer string, hashes []common.Hash) {
				// Try to fetch the transactions, but in case of a request
				// failure (e.g. peer disconnected), reschedule the hashes.
				if err := f.fetchTxs(peer, hashes); err != nil {
					txRequestFailMeter.Mark(int64(len(hashes)))
					f.Drop(peer)
				}
			}(peer, hashes)
		}
	})
	// If a new request was fired, schedule a timeout timer
	if idle && len(f.requests) > 0 {
		f.rescheduleTimeout(timer, timeout)
	}
}

// forEachPeer does a range loop over a map of peers in production, but during
// testing it does a deterministic sorted random to allow reproducing issues.
func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string)) {
	// If we're running production, use whatever Go's map gives us
	if f.rand == nil {
		for peer := range peers {
			do(peer)
		}
		return
	}
	// We're running the test suite, make iteration deterministic
	list := make([]string, 0, len(peers))
	for peer := range peers {
		list = append(list, peer)
	}
	sort.Strings(list)
	rotateStrings(list, f.rand.Intn(len(list)))
	for _, peer := range list {
		do(peer)
	}
}

// forEachHash does a range loop over a map of hashes in production, but during
// testing it does a deterministic sorted random to allow reproducing issues.
func (f *TxFetcher) forEachHash(hashes map[common.Hash]struct{}, do func(hash common.Hash) bool) {
	// If we're running production, use whatever Go's map gives us
	if f.rand == nil {
		for hash := range hashes {
			if !do(hash) {
				return
			}
		}
		return
	}
	// We're running the test suite, make iteration deterministic
	list := make([]common.Hash, 0, len(hashes))
	for hash := range hashes {
		list = append(list, hash)
	}
	sortHashes(list)
	rotateHashes(list, f.rand.Intn(len(list)))
	for _, hash := range list {
		if !do(hash) {
			return
		}
	}
}

// rotateStrings rotates the contents of a slice by n steps. This method is only
// used in tests to simulate random map iteration but keep it deterministic.
func rotateStrings(slice []string, n int) {
	orig := make([]string, len(slice))
	copy(orig, slice)

	for i := 0; i < len(orig); i++ {
		slice[i] = orig[(i+n)%len(orig)]
	}
}

// sortHashes sorts a slice of hashes. This method is only used in tests in order
// to simulate random map iteration but keep it deterministic.
func sortHashes(slice []common.Hash) {
	for i := 0; i < len(slice); i++ {
		for j := i + 1; j < len(slice); j++ {
			if bytes.Compare(slice[i][:], slice[j][:]) > 0 {
				slice[i], slice[j] = slice[j], slice[i]
			}
		}
	}
}
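
// A behavior-equivalent alternative to sortHashes using the standard library,
// shown only as an illustrative sketch (the hand-rolled loop above is what the
// tests actually use):
//
//	sort.Slice(slice, func(i, j int) bool {
//		return bytes.Compare(slice[i][:], slice[j][:]) < 0
//	})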

// rotateHashes rotates the contents of a slice by n steps. This method is only
// used in tests to simulate random map iteration but keep it deterministic.
func rotateHashes(slice []common.Hash, n int) {
	orig := make([]common.Hash, len(slice))
	copy(orig, slice)

	for i := 0; i < len(orig); i++ {
		slice[i] = orig[(i+n)%len(orig)]
	}
}