tx_fetcher.go

// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package fetcher

import (
	"bytes"
	"fmt"
	mrand "math/rand"
	"sort"
	"time"

	mapset "github.com/deckarep/golang-set"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/gopool"
	"github.com/ethereum/go-ethereum/common/mclock"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
)

const (
	// maxTxAnnounces is the maximum number of unique transactions a peer
	// can announce in a short time.
	maxTxAnnounces = 4096

	// maxTxRetrievals is the maximum number of transactions that can be fetched
	// in one request. The rationale for picking 256 is:
	//   - In the eth protocol, the softResponseLimit is 2MB. Nowadays according to
	//     Etherscan the average transaction size is around 200B, so in theory
	//     we can include lots of transactions in a single protocol packet.
	//   - However, the maximum size of a single transaction is raised to 128KB,
	//     so pick a middle value here to ensure we can maximize the efficiency
	//     of the retrieval and response size overflow won't happen in most cases.
	maxTxRetrievals = 256
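
	// A back-of-the-envelope check of the numbers above (using the averages the
	// comment cites): 256 txs * ~200B each is ~50KB, far below the 2MB
	// softResponseLimit; only if many transactions approach the 128KB ceiling
	// could a full batch (up to ~32MB) exceed the limit, which is why the
	// comment hedges with "in most cases".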

	// maxTxUnderpricedSetSize is the size of the underpriced transaction set that
	// is used to track recent transactions that have been dropped so we don't
	// re-request them.
	maxTxUnderpricedSetSize = 32768

	// txArriveTimeout is the time allowance before an announced transaction is
	// explicitly requested.
	txArriveTimeout = 500 * time.Millisecond

	// txGatherSlack is the interval used to collate almost-expired announces
	// with network fetches.
	txGatherSlack = 100 * time.Millisecond
)

var (
	// txFetchTimeout is the maximum allotted time to return an explicitly
	// requested transaction.
	txFetchTimeout = 5 * time.Second
)

var (
	txAnnounceInMeter          = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/in", nil)
	txAnnounceKnownMeter       = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/known", nil)
	txAnnounceUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/underpriced", nil)
	txAnnounceDOSMeter         = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/dos", nil)

	txBroadcastInMeter          = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/in", nil)
	txBroadcastKnownMeter       = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/known", nil)
	txBroadcastUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/underpriced", nil)
	txBroadcastOtherRejectMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/otherreject", nil)

	txRequestOutMeter     = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/out", nil)
	txRequestFailMeter    = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/fail", nil)
	txRequestDoneMeter    = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/done", nil)
	txRequestTimeoutMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/timeout", nil)

	txReplyInMeter          = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/in", nil)
	txReplyKnownMeter       = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/known", nil)
	txReplyUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/underpriced", nil)
	txReplyOtherRejectMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/otherreject", nil)

	txFetcherWaitingPeers   = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/peers", nil)
	txFetcherWaitingHashes  = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/hashes", nil)
	txFetcherQueueingPeers  = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/peers", nil)
	txFetcherQueueingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/hashes", nil)
	txFetcherFetchingPeers  = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/peers", nil)
	txFetcherFetchingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/hashes", nil)
)

// txAnnounce is the notification of the availability of a batch
// of new transactions in the network.
type txAnnounce struct {
	origin string        // Identifier of the peer originating the notification
	hashes []common.Hash // Batch of transaction hashes being announced
}

// txRequest represents an in-flight transaction retrieval request destined to
// a specific peer.
type txRequest struct {
	hashes []common.Hash            // Transactions having been requested
	stolen map[common.Hash]struct{} // Deliveries by someone else (don't re-request)
	time   mclock.AbsTime           // Timestamp of the request
}

// txDelivery is the notification that a batch of transactions have been added
// to the pool and should be untracked.
type txDelivery struct {
	origin string        // Identifier of the peer originating the notification
	hashes []common.Hash // Batch of transaction hashes having been delivered
	direct bool          // Whether this is a direct reply or a broadcast
}

// txDrop is the notification that a peer has disconnected.
type txDrop struct {
	peer string
}

// TxFetcher is responsible for retrieving new transactions based on announcements.
//
// The fetcher operates in 3 stages:
//   - Transactions that are newly discovered are moved into a wait list.
//   - After ~500ms passes, transactions from the wait list that have not been
//     broadcast to us in whole are moved into a queueing area.
//   - When a connected peer doesn't have in-flight retrieval requests, any
//     transaction queued up (and announced by the peer) is allocated to the
//     peer and moved into a fetching status until it's fulfilled or fails.
//
// The invariants of the fetcher are:
//   - Each tracked transaction (hash) must only be present in one of the
//     three stages. This ensures that the fetcher operates akin to a finite
//     state automaton and there's no data leak.
//   - Each peer that announced transactions may have retrievals scheduled, but
//     only ever one concurrently. This ensures we can immediately know what is
//     missing from a reply and reschedule it.
type TxFetcher struct {
	notify  chan *txAnnounce
	cleanup chan *txDelivery
	drop    chan *txDrop
	quit    chan struct{}

	underpriced mapset.Set // Transactions discarded as too cheap (don't re-fetch)

	// Stage 1: Waiting lists for newly discovered transactions that might be
	// broadcast without needing explicit request/reply round trips.
	waitlist  map[common.Hash]map[string]struct{} // Transactions waiting for a potential broadcast
	waittime  map[common.Hash]mclock.AbsTime      // Timestamps when transactions were added to the waitlist
	waitslots map[string]map[common.Hash]struct{} // Waiting announcements grouped by peer (DoS protection)

	// Stage 2: Queue of transactions that are waiting to be allocated to some
	// peer to be retrieved directly.
	announces map[string]map[common.Hash]struct{} // Set of announced transactions, grouped by origin peer
	announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash

	// Stage 3: Set of transactions currently being retrieved, some of which may
	// be fulfilled and some rescheduled. Note, this step shares 'announces' from
	// the previous stage to avoid having to duplicate (need it for DoS checks).
	fetching   map[common.Hash]string              // Transaction set currently being retrieved
	requests   map[string]*txRequest               // In-flight transaction retrievals
	alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails

	// Callbacks
	hasTx    func(common.Hash) bool             // Retrieves a tx from the local txpool
	addTxs   func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
	fetchTxs func(string, []common.Hash) error  // Retrieves a set of txs from a remote peer

	step  chan struct{} // Notification channel when the fetcher loop iterates
	clock mclock.Clock  // Time wrapper to simulate in tests
	rand  *mrand.Rand   // Randomizer to use in tests instead of map range loops (soft-random)
}

// NewTxFetcher creates a transaction fetcher to retrieve transactions
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
	return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
}

// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
	hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
	clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
	return &TxFetcher{
		notify:      make(chan *txAnnounce),
		cleanup:     make(chan *txDelivery),
		drop:        make(chan *txDrop),
		quit:        make(chan struct{}),
		waitlist:    make(map[common.Hash]map[string]struct{}),
		waittime:    make(map[common.Hash]mclock.AbsTime),
		waitslots:   make(map[string]map[common.Hash]struct{}),
		announces:   make(map[string]map[common.Hash]struct{}),
		announced:   make(map[common.Hash]map[string]struct{}),
		fetching:    make(map[common.Hash]string),
		requests:    make(map[string]*txRequest),
		alternates:  make(map[common.Hash]map[string]struct{}),
		underpriced: mapset.NewSet(),
		hasTx:       hasTx,
		addTxs:      addTxs,
		fetchTxs:    fetchTxs,
		clock:       clock,
		rand:        rand,
	}
}
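
// A minimal wiring sketch for the constructor. The callback shapes match the
// signatures above; the pool and peerSet names are illustrative only (the real
// integration lives in the eth protocol handler):
//
//	f := NewTxFetcher(pool.Has, pool.AddRemotes, func(peer string, hashes []common.Hash) error {
//		return peerSet[peer].RequestTxs(hashes) // ask the announcing peer for the txs
//	})
//	f.Start()
//	defer f.Stop()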

// Notify announces the fetcher of the potential availability of a new batch of
// transactions in the network.
func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
	// Keep track of all the announced transactions
	txAnnounceInMeter.Mark(int64(len(hashes)))

	// Skip any transaction announcements that we already know of, or that we've
	// previously marked as cheap and discarded. This check is of course racey,
	// because multiple concurrent notifies will still manage to pass it, but it's
	// still valuable to check here because it runs concurrent to the internal
	// loop, so anything caught here is time saved internally.
	var (
		unknowns               = make([]common.Hash, 0, len(hashes))
		duplicate, underpriced int64
	)
	for _, hash := range hashes {
		switch {
		case f.hasTx(hash):
			duplicate++
		case f.underpriced.Contains(hash):
			underpriced++
		default:
			unknowns = append(unknowns, hash)
		}
	}
	txAnnounceKnownMeter.Mark(duplicate)
	txAnnounceUnderpricedMeter.Mark(underpriced)

	// If anything's left to announce, push it into the internal loop
	if len(unknowns) == 0 {
		return nil
	}
	announce := &txAnnounce{
		origin: peer,
		hashes: unknowns,
	}
	select {
	case f.notify <- announce:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

// Enqueue imports a batch of received transactions into the transaction pool
// and the fetcher. This method may be called by both transaction broadcasts and
// direct request replies. The differentiation is important so the fetcher can
// re-schedule missing transactions as soon as possible.
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
	// Keep track of all the propagated transactions
	if direct {
		txReplyInMeter.Mark(int64(len(txs)))
	} else {
		txBroadcastInMeter.Mark(int64(len(txs)))
	}
	// Push all the transactions into the pool, tracking underpriced ones to avoid
	// re-requesting them and dropping the peer in case of malicious transfers.
	var (
		added       = make([]common.Hash, 0, len(txs))
		duplicate   int64
		underpriced int64
		otherreject int64
	)
	errs := f.addTxs(txs)
	for i, err := range errs {
		if err != nil {
			// Track the transaction hash if the price is too low for us.
			// Avoid re-requesting this transaction when we receive another
			// announcement.
			if err == core.ErrUnderpriced || err == core.ErrReplaceUnderpriced {
				for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize {
					f.underpriced.Pop()
				}
				f.underpriced.Add(txs[i].Hash())
			}
			// Track a few interesting failure types
			switch err {
			case nil: // Noop, but need to handle to not count these

			case core.ErrAlreadyKnown:
				duplicate++

			case core.ErrUnderpriced, core.ErrReplaceUnderpriced:
				underpriced++

			default:
				otherreject++
			}
		}
		added = append(added, txs[i].Hash())
	}
	if direct {
		txReplyKnownMeter.Mark(duplicate)
		txReplyUnderpricedMeter.Mark(underpriced)
		txReplyOtherRejectMeter.Mark(otherreject)
	} else {
		txBroadcastKnownMeter.Mark(duplicate)
		txBroadcastUnderpricedMeter.Mark(underpriced)
		txBroadcastOtherRejectMeter.Mark(otherreject)
	}
	select {
	case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

// Drop should be called when a peer disconnects. It cleans up all the internal
// data structures of the given node.
func (f *TxFetcher) Drop(peer string) error {
	select {
	case f.drop <- &txDrop{peer: peer}:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and block fetches until termination is requested.
func (f *TxFetcher) Start() {
	go f.loop()
}

// Stop terminates the announcement based synchroniser, canceling all pending
// operations.
func (f *TxFetcher) Stop() {
	close(f.quit)
}
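
// A sketch of how a protocol handler typically drives the fetcher (the method
// names on the fetcher are the real ones above; the message-dispatch shape is
// illustrative only):
//
//	f.Start()                       // spin up the event loop
//	f.Notify(peer, announcedHashes) // on a NewPooledTransactionHashes message
//	f.Enqueue(peer, txs, true)      // on a PooledTransactions reply
//	f.Enqueue(peer, txs, false)     // on a plain Transactions broadcast
//	f.Drop(peer)                    // on peer disconnect
//	f.Stop()                        // tear down on shutdown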

func (f *TxFetcher) loop() {
	var (
		waitTimer    = new(mclock.Timer)
		timeoutTimer = new(mclock.Timer)

		waitTrigger    = make(chan struct{}, 1)
		timeoutTrigger = make(chan struct{}, 1)
	)
	for {
		select {
		case ann := <-f.notify:
			// Drop part of the new announcements if there are too many accumulated.
			// Note, we could but do not filter already known transactions here as
			// the probability of something arriving between this call and the pre-
			// filter outside is essentially zero.
			used := len(f.waitslots[ann.origin]) + len(f.announces[ann.origin])
			if used >= maxTxAnnounces {
				// This can happen if a set of transactions are requested but not
				// all fulfilled, so the remainder are rescheduled without the cap
				// check. Should be fine as the limit is in the thousands and the
				// request size in the hundreds.
				txAnnounceDOSMeter.Mark(int64(len(ann.hashes)))
				break
			}
			want := used + len(ann.hashes)
			if want > maxTxAnnounces {
				txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces))
				ann.hashes = ann.hashes[:maxTxAnnounces-used]
			}
			// All is well, schedule the remainder of the transactions
			idleWait := len(f.waittime) == 0
			_, oldPeer := f.announces[ann.origin]

			for _, hash := range ann.hashes {
				// If the transaction is already downloading, add it to the list
				// of possible alternates (in case the current retrieval fails) and
				// also account it for the peer.
				if f.alternates[hash] != nil {
					f.alternates[hash][ann.origin] = struct{}{}

					// Stage 2 and 3 share the set of origins per tx
					if announces := f.announces[ann.origin]; announces != nil {
						announces[hash] = struct{}{}
					} else {
						f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
					}
					continue
				}
				// If the transaction is not downloading, but is already queued
				// from a different peer, track it for the new peer too.
				if f.announced[hash] != nil {
					f.announced[hash][ann.origin] = struct{}{}

					// Stage 2 and 3 share the set of origins per tx
					if announces := f.announces[ann.origin]; announces != nil {
						announces[hash] = struct{}{}
					} else {
						f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
					}
					continue
				}
				// If the transaction is already known to the fetcher, but not
				// yet downloading, add the peer as an alternate origin in the
				// waiting list.
				if f.waitlist[hash] != nil {
					f.waitlist[hash][ann.origin] = struct{}{}

					if waitslots := f.waitslots[ann.origin]; waitslots != nil {
						waitslots[hash] = struct{}{}
					} else {
						f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
					}
					continue
				}
				// Transaction unknown to the fetcher, insert it into the waiting list
				f.waitlist[hash] = map[string]struct{}{ann.origin: {}}
				f.waittime[hash] = f.clock.Now()

				if waitslots := f.waitslots[ann.origin]; waitslots != nil {
					waitslots[hash] = struct{}{}
				} else {
					f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
				}
			}
			// If a new item was added to the waitlist, schedule it into the fetcher
			if idleWait && len(f.waittime) > 0 {
				f.rescheduleWait(waitTimer, waitTrigger)
			}
			// If this peer is new and announced something already queued, maybe
			// request transactions from them
			if !oldPeer && len(f.announces[ann.origin]) > 0 {
				f.scheduleFetches(timeoutTimer, timeoutTrigger, map[string]struct{}{ann.origin: {}})
			}

		case <-waitTrigger:
			// At least one transaction's waiting time ran out, push all expired
			// ones into the retrieval queues
			actives := make(map[string]struct{})
			for hash, instance := range f.waittime {
				if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout {
					// Transaction expired without propagation, schedule for retrieval
					if f.announced[hash] != nil {
						panic("announce tracker already contains waitlist item")
					}
					f.announced[hash] = f.waitlist[hash]
					for peer := range f.waitlist[hash] {
						if announces := f.announces[peer]; announces != nil {
							announces[hash] = struct{}{}
						} else {
							f.announces[peer] = map[common.Hash]struct{}{hash: {}}
						}
						delete(f.waitslots[peer], hash)
						if len(f.waitslots[peer]) == 0 {
							delete(f.waitslots, peer)
						}
						actives[peer] = struct{}{}
					}
					delete(f.waittime, hash)
					delete(f.waitlist, hash)
				}
			}
			// If transactions are still waiting for propagation, reschedule the wait timer
			if len(f.waittime) > 0 {
				f.rescheduleWait(waitTimer, waitTrigger)
			}
			// If any peers became active and are idle, request transactions from them
			if len(actives) > 0 {
				f.scheduleFetches(timeoutTimer, timeoutTrigger, actives)
			}

		case <-timeoutTrigger:
			// Clean up any expired retrievals and avoid re-requesting them from the
			// same peer (either overloaded or malicious, useless in both cases). We
			// could also penalize (Drop), but there's nothing to gain, and it could
			// possibly further increase the load on it.
			for peer, req := range f.requests {
				if time.Duration(f.clock.Now()-req.time)+txGatherSlack > txFetchTimeout {
					txRequestTimeoutMeter.Mark(int64(len(req.hashes)))

					// Reschedule all the not-yet-delivered fetches to alternate peers
					for _, hash := range req.hashes {
						// Skip rescheduling hashes already delivered by someone else
						if req.stolen != nil {
							if _, ok := req.stolen[hash]; ok {
								continue
							}
						}
						// Move the delivery back from fetching to queued
						if _, ok := f.announced[hash]; ok {
							panic("announced tracker already contains alternate item")
						}
						if f.alternates[hash] != nil { // nil if tx was broadcast during fetch
							f.announced[hash] = f.alternates[hash]
						}
						delete(f.announced[hash], peer)
						if len(f.announced[hash]) == 0 {
							delete(f.announced, hash)
						}
						delete(f.announces[peer], hash)
						delete(f.alternates, hash)
						delete(f.fetching, hash)
					}
					if len(f.announces[peer]) == 0 {
						delete(f.announces, peer)
					}
					// Keep track of the request as dangling, but never expire
					f.requests[peer].hashes = nil
				}
			}
			// Schedule a new transaction retrieval
			f.scheduleFetches(timeoutTimer, timeoutTrigger, nil)

			// No idea if we scheduled something or not, trigger the timer if needed
			// TODO(karalabe): this is kind of lame, can't we dump it into scheduleFetches somehow?
			f.rescheduleTimeout(timeoutTimer, timeoutTrigger)

		case delivery := <-f.cleanup:
			// Independent if the delivery was direct or broadcast, remove all
			// traces of the hash from internal trackers
			for _, hash := range delivery.hashes {
				if _, ok := f.waitlist[hash]; ok {
					for peer, txset := range f.waitslots {
						delete(txset, hash)
						if len(txset) == 0 {
							delete(f.waitslots, peer)
						}
					}
					delete(f.waitlist, hash)
					delete(f.waittime, hash)
				} else {
					for peer, txset := range f.announces {
						delete(txset, hash)
						if len(txset) == 0 {
							delete(f.announces, peer)
						}
					}
					delete(f.announced, hash)
					delete(f.alternates, hash)

					// If a transaction currently being fetched from a different
					// origin was delivered (delivery stolen), mark it so the
					// actual delivery won't double schedule it.
					if origin, ok := f.fetching[hash]; ok && (origin != delivery.origin || !delivery.direct) {
						stolen := f.requests[origin].stolen
						if stolen == nil {
							f.requests[origin].stolen = make(map[common.Hash]struct{})
							stolen = f.requests[origin].stolen
						}
						stolen[hash] = struct{}{}
					}
					delete(f.fetching, hash)
				}
			}
			// In case of a direct delivery, also reschedule anything missing
			// from the original query
			if delivery.direct {
				// Mark the request successful (independent of individual status)
				txRequestDoneMeter.Mark(int64(len(delivery.hashes)))

				// Make sure something was pending, nuke it
				req := f.requests[delivery.origin]
				if req == nil {
					log.Warn("Unexpected transaction delivery", "peer", delivery.origin)
					break
				}
				delete(f.requests, delivery.origin)

				// Anything not delivered should be re-scheduled (with or without
				// this peer, depending on the response cutoff)
				delivered := make(map[common.Hash]struct{})
				for _, hash := range delivery.hashes {
					delivered[hash] = struct{}{}
				}
				cutoff := len(req.hashes) // If nothing is delivered, assume everything is missing, don't retry!!!
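				// (The loop below leaves cutoff at the index of the last hash the
				// peer did deliver; anything before that index which is still
				// missing was evidently skipped by the peer, so further down the
				// peer is also removed as an alternate origin for those hashes.)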
				for i, hash := range req.hashes {
					if _, ok := delivered[hash]; ok {
						cutoff = i
					}
				}
				// Reschedule missing hashes from alternates, not-fulfilled from alt+self
				for i, hash := range req.hashes {
					// Skip rescheduling hashes already delivered by someone else
					if req.stolen != nil {
						if _, ok := req.stolen[hash]; ok {
							continue
						}
					}
					if _, ok := delivered[hash]; !ok {
						if i < cutoff {
							delete(f.alternates[hash], delivery.origin)
							delete(f.announces[delivery.origin], hash)
							if len(f.announces[delivery.origin]) == 0 {
								delete(f.announces, delivery.origin)
							}
						}
						if len(f.alternates[hash]) > 0 {
							if _, ok := f.announced[hash]; ok {
								panic(fmt.Sprintf("announced tracker already contains alternate item: %v", f.announced[hash]))
							}
							f.announced[hash] = f.alternates[hash]
						}
					}
					delete(f.alternates, hash)
					delete(f.fetching, hash)
				}
				// Something was delivered, try to reschedule requests
				f.scheduleFetches(timeoutTimer, timeoutTrigger, nil) // Partial delivery may enable others to deliver too
			}

		case drop := <-f.drop:
			// A peer was dropped, remove all traces of it
			if _, ok := f.waitslots[drop.peer]; ok {
				for hash := range f.waitslots[drop.peer] {
					delete(f.waitlist[hash], drop.peer)
					if len(f.waitlist[hash]) == 0 {
						delete(f.waitlist, hash)
						delete(f.waittime, hash)
					}
				}
				delete(f.waitslots, drop.peer)
				if len(f.waitlist) > 0 {
					f.rescheduleWait(waitTimer, waitTrigger)
				}
			}
			// Clean up any active requests
			var request *txRequest
			if request = f.requests[drop.peer]; request != nil {
				for _, hash := range request.hashes {
					// Skip rescheduling hashes already delivered by someone else
					if request.stolen != nil {
						if _, ok := request.stolen[hash]; ok {
							continue
						}
					}
					// Undelivered hash, reschedule if there's an alternative origin available
					delete(f.alternates[hash], drop.peer)
					if len(f.alternates[hash]) == 0 {
						delete(f.alternates, hash)
					} else {
						f.announced[hash] = f.alternates[hash]
						delete(f.alternates, hash)
					}
					delete(f.fetching, hash)
				}
				delete(f.requests, drop.peer)
			}
			// Clean up general announcement tracking
			if _, ok := f.announces[drop.peer]; ok {
				for hash := range f.announces[drop.peer] {
					delete(f.announced[hash], drop.peer)
					if len(f.announced[hash]) == 0 {
						delete(f.announced, hash)
					}
				}
				delete(f.announces, drop.peer)
			}
			// If a request was cancelled, check if anything needs to be rescheduled
			if request != nil {
				f.scheduleFetches(timeoutTimer, timeoutTrigger, nil)
				f.rescheduleTimeout(timeoutTimer, timeoutTrigger)
			}

		case <-f.quit:
			return
		}
		// No idea what happened, but bump some sanity metrics
		txFetcherWaitingPeers.Update(int64(len(f.waitslots)))
		txFetcherWaitingHashes.Update(int64(len(f.waitlist)))
		txFetcherQueueingPeers.Update(int64(len(f.announces) - len(f.requests)))
		txFetcherQueueingHashes.Update(int64(len(f.announced)))
		txFetcherFetchingPeers.Update(int64(len(f.requests)))
		txFetcherFetchingHashes.Update(int64(len(f.fetching)))

		// Loop did something, ping the step notifier if needed (tests)
		if f.step != nil {
			f.step <- struct{}{}
		}
	}
}

// rescheduleWait iterates over all the transactions currently in the waitlist
// and schedules the movement into the fetcher for the earliest.
//
// The method has a granularity of 'gatherSlack', since there's not much point in
// spinning over all the transactions just to maybe find one that should trigger
// a few ms earlier.
func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) {
	if *timer != nil {
		(*timer).Stop()
	}
	now := f.clock.Now()

	earliest := now
	for _, instance := range f.waittime {
		if earliest > instance {
			earliest = instance
			if txArriveTimeout-time.Duration(now-earliest) < gatherSlack {
				break
			}
		}
	}
	*timer = f.clock.AfterFunc(txArriveTimeout-time.Duration(now-earliest), func() {
		trigger <- struct{}{}
	})
}
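
// (Worked example of the scheduling arithmetic above: if the oldest waitlist
// entry was added 300ms ago, the timer fires in txArriveTimeout-300ms = 200ms;
// the early exit means anything due within 'gatherSlack' of the earliest entry
// is simply handled in that same expiry run rather than scanned for exactly.)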

// rescheduleTimeout iterates over all the transactions currently in flight and
// schedules a cleanup run when the first would trigger.
//
// The method has a granularity of 'gatherSlack', since there's not much point in
// spinning over all the transactions just to maybe find one that should trigger
// a few ms earlier.
//
// This method is a bit "flaky" "by design". In theory the timeout timer only ever
// should be rescheduled if some request is pending. In practice, a timeout will
// cause the timer to be rescheduled every 5 secs (until the peer comes through or
// disconnects). This is a limitation of the fetcher code because we don't track
// pending requests and timed out requests separately. Without double tracking, if
// we simply didn't reschedule the timer on all-timeout then the timer would never
// be set again since len(request) > 0 => something's running.
func (f *TxFetcher) rescheduleTimeout(timer *mclock.Timer, trigger chan struct{}) {
	if *timer != nil {
		(*timer).Stop()
	}
	now := f.clock.Now()

	earliest := now
	for _, req := range f.requests {
		// If this request already timed out, skip it altogether
		if req.hashes == nil {
			continue
		}
		if earliest > req.time {
			earliest = req.time
			if txFetchTimeout-time.Duration(now-earliest) < gatherSlack {
				break
			}
		}
	}
	*timer = f.clock.AfterFunc(txFetchTimeout-time.Duration(now-earliest), func() {
		trigger <- struct{}{}
	})
}

// scheduleFetches starts a batch of retrievals for all available idle peers.
func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, whitelist map[string]struct{}) {
	// Gather the set of peers we want to retrieve from (default to all)
	actives := whitelist
	if actives == nil {
		actives = make(map[string]struct{})
		for peer := range f.announces {
			actives[peer] = struct{}{}
		}
	}
	if len(actives) == 0 {
		return
	}
	// For each active peer, try to schedule some transaction fetches
	idle := len(f.requests) == 0

	f.forEachPeer(actives, func(peer string) {
		if f.requests[peer] != nil {
			return // continue in the for-each
		}
		if len(f.announces[peer]) == 0 {
			return // continue in the for-each
		}
		hashes := make([]common.Hash, 0, maxTxRetrievals)
		f.forEachHash(f.announces[peer], func(hash common.Hash) bool {
			if _, ok := f.fetching[hash]; !ok {
				// Mark the hash as fetching and stash away possible alternates
				f.fetching[hash] = peer

				if _, ok := f.alternates[hash]; ok {
					panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
				}
				f.alternates[hash] = f.announced[hash]
				delete(f.announced, hash)

				// Accumulate the hash and stop if the limit was reached
				hashes = append(hashes, hash)
				if len(hashes) >= maxTxRetrievals {
					return false // break in the for-each
				}
			}
			return true // continue in the for-each
		})
		// If any hashes were allocated, request them from the peer
		if len(hashes) > 0 {
			f.requests[peer] = &txRequest{hashes: hashes, time: f.clock.Now()}
			txRequestOutMeter.Mark(int64(len(hashes)))

			p := peer
			gopool.Submit(func() {
				// Try to fetch the transactions, but in case of a request
				// failure (e.g. peer disconnected), reschedule the hashes.
				if err := f.fetchTxs(p, hashes); err != nil {
					txRequestFailMeter.Mark(int64(len(hashes)))
					f.Drop(p)
				}
			})
		}
	})
	// If a new request was fired, schedule a timeout timer
	if idle && len(f.requests) > 0 {
		f.rescheduleTimeout(timer, timeout)
	}
}

// forEachPeer does a range loop over a map of peers in production, but during
// testing it does a deterministic sorted random to allow reproducing issues.
func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string)) {
	// If we're running production, use whatever Go's map gives us
	if f.rand == nil {
		for peer := range peers {
			do(peer)
		}
		return
	}
	// We're running the test suite, make iteration deterministic
	list := make([]string, 0, len(peers))
	for peer := range peers {
		list = append(list, peer)
	}
	sort.Strings(list)
	rotateStrings(list, f.rand.Intn(len(list)))
	for _, peer := range list {
		do(peer)
	}
}

// forEachHash does a range loop over a map of hashes in production, but during
// testing it does a deterministic sorted random to allow reproducing issues.
func (f *TxFetcher) forEachHash(hashes map[common.Hash]struct{}, do func(hash common.Hash) bool) {
	// If we're running production, use whatever Go's map gives us
	if f.rand == nil {
		for hash := range hashes {
			if !do(hash) {
				return
			}
		}
		return
	}
	// We're running the test suite, make iteration deterministic
	list := make([]common.Hash, 0, len(hashes))
	for hash := range hashes {
		list = append(list, hash)
	}
	sortHashes(list)
	rotateHashes(list, f.rand.Intn(len(list)))
	for _, hash := range list {
		if !do(hash) {
			return
		}
	}
}

// rotateStrings rotates the contents of a slice by n steps. This method is only
// used in tests to simulate random map iteration but keep it deterministic.
func rotateStrings(slice []string, n int) {
	orig := make([]string, len(slice))
	copy(orig, slice)

	for i := 0; i < len(orig); i++ {
		slice[i] = orig[(i+n)%len(orig)]
	}
}

// sortHashes sorts a slice of hashes. This method is only used in tests in order
// to simulate random map iteration but keep it deterministic.
func sortHashes(slice []common.Hash) {
	for i := 0; i < len(slice); i++ {
		for j := i + 1; j < len(slice); j++ {
			if bytes.Compare(slice[i][:], slice[j][:]) > 0 {
				slice[i], slice[j] = slice[j], slice[i]
			}
		}
	}
}

// rotateHashes rotates the contents of a slice by n steps. This method is only
// used in tests to simulate random map iteration but keep it deterministic.
func rotateHashes(slice []common.Hash, n int) {
	orig := make([]common.Hash, len(slice))
	copy(orig, slice)

	for i := 0; i < len(orig); i++ {
		slice[i] = orig[(i+n)%len(orig)]
	}
}