dashboard.rs 56 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503
  1. pub mod pane;
  2. use futures::TryFutureExt;
  3. pub use pane::{PaneState, PaneContent, PaneSettings};
  4. use crate::{
  5. charts::{
  6. candlestick::CandlestickChart, footprint::FootprintChart, Message as ChartMessage
  7. },
  8. data_providers::{
  9. binance, bybit, fetcher::FetchRange, Depth, Exchange, Kline, OpenInterest, TickMultiplier, Ticker, TickerInfo, Timeframe, Trade
  10. },
  11. screen::InfoType,
  12. style,
  13. window::{self, Window},
  14. StreamType,
  15. };
  16. use super::{
  17. create_notis_column, modal::dashboard_notification,
  18. DashboardError, Notification,
  19. NotificationManager, UserTimezone,
  20. };
  21. use std::{
  22. collections::{HashMap, HashSet},
  23. vec,
  24. };
  25. use iced::{
  26. widget::{
  27. center, container,
  28. pane_grid::{self, Configuration},
  29. PaneGrid,
  30. },
  31. Element, Length, Point, Size, Task, Vector,
  32. };
/// Messages handled by `Dashboard::update`.
#[derive(Debug, Clone)]
pub enum Message {
    /// A pane-level message together with the id of the window it belongs to.
    Pane(window::Id, pane::Message),
    /// New `(position, size)` specs for popout windows, keyed by window id.
    /// Ids that are not known popouts are ignored by the handler.
    SavePopoutSpecs(HashMap<window::Id, (Point, Size)>),
    /// Replace the main grid with the default layout and drop focus,
    /// popout state and the tracked streams.
    ResetLayout,
    /// An error for the given window and, optionally, a specific pane.
    /// The handler only acts when a pane is supplied.
    ErrorOccurred(window::Id, Option<pane_grid::Pane>, DashboardError),
    /// Remove the most recent notification of the given pane.
    ClearLastNotification(window::Id, pane_grid::Pane),
    /// Remove the most recent global notification.
    ClearLastGlobalNotification,
    /// Mark every candlestick/footprint pane as fetching klines and start
    /// the kline fetch for all tracked streams.
    LayoutFetchAll,
    /// Recompute the set of tracked streams from the current panes.
    RefreshStreams,
    // Kline fetching
    /// Result of a kline fetch for one pane:
    /// `(request id, klines or error text, stream, pane, window)`.
    FetchEvent(
        Option<uuid::Uuid>,
        Result<Vec<Kline>, String>,
        StreamType,
        pane_grid::Pane,
        window::Id,
    ),
    /// Result of an open-interest fetch for one pane:
    /// `(request id, data or error text, stream, pane, window)`.
    OIFetchEvent(
        Option<uuid::Uuid>,
        Result<Vec<OpenInterest>, String>,
        StreamType,
        pane_grid::Pane,
        window::Id,
    ),
    /// Klines fetched for a stream; applied to every pane matching that stream.
    DistributeFetchedKlines(StreamType, Result<Vec<Kline>, String>),
    /// A chart-originated message for the given pane and window.
    ChartMessage(pane_grid::Pane, window::Id, ChartMessage),
    // Batched trade fetching
    /// Request one batch of trades: `(window, pane, from_time, to_time, stream)`.
    FetchTrades(
        window::Id,
        pane_grid::Pane,
        i64,
        i64,
        StreamType,
    ),
    /// One fetched batch of trades: `(window, pane, trades, stream, to_time)`.
    /// The handler requests another batch while the last trade is older than `to_time`.
    DistributeFetchedTrades(
        window::Id,
        pane_grid::Pane,
        Vec<Trade>,
        StreamType,
        i64,
    ),
}
/// State of the dashboard: the main pane grid, popout windows, focus,
/// tracked streams and notifications.
pub struct Dashboard {
    /// Pane grid shown in the main window.
    pub panes: pane_grid::State<PaneState>,
    /// Currently focused pane and the window it lives in, if any.
    pub focus: Option<(window::Id, pane_grid::Pane)>,
    /// Popout windows: each window id maps to its own pane grid and the
    /// window's `(position, size)` specs.
    pub popout: HashMap<window::Id, (pane_grid::State<PaneState>, (Point, Size))>,
    /// Streams in use by panes, grouped by exchange and then by ticker.
    pub pane_streams: HashMap<Exchange, HashMap<Ticker, HashSet<StreamType>>>,
    /// Per-pane and global notifications.
    notification_manager: NotificationManager,
    /// When `false`, chart requests for historical trades are ignored.
    pub trade_fetch_enabled: bool,
}
  84. impl Default for Dashboard {
  85. fn default() -> Self {
  86. Self::empty()
  87. }
  88. }
  89. impl Dashboard {
  90. fn empty() -> Self {
  91. Self {
  92. panes: pane_grid::State::with_configuration(Self::default_pane_config()),
  93. focus: None,
  94. pane_streams: HashMap::new(),
  95. notification_manager: NotificationManager::new(),
  96. popout: HashMap::new(),
  97. trade_fetch_enabled: false,
  98. }
  99. }
  100. fn default_pane_config() -> Configuration<PaneState> {
  101. Configuration::Split {
  102. axis: pane_grid::Axis::Vertical,
  103. ratio: 0.8,
  104. a: Box::new(Configuration::Split {
  105. axis: pane_grid::Axis::Horizontal,
  106. ratio: 0.4,
  107. a: Box::new(Configuration::Split {
  108. axis: pane_grid::Axis::Vertical,
  109. ratio: 0.5,
  110. a: Box::new(Configuration::Pane(PaneState {
  111. modal: pane::PaneModal::None,
  112. stream: vec![],
  113. content: PaneContent::Starter,
  114. settings: PaneSettings::default(),
  115. })),
  116. b: Box::new(Configuration::Pane(PaneState {
  117. modal: pane::PaneModal::None,
  118. stream: vec![],
  119. content: PaneContent::Starter,
  120. settings: PaneSettings::default(),
  121. })),
  122. }),
  123. b: Box::new(Configuration::Split {
  124. axis: pane_grid::Axis::Vertical,
  125. ratio: 0.5,
  126. a: Box::new(Configuration::Pane(PaneState {
  127. modal: pane::PaneModal::None,
  128. stream: vec![],
  129. content: PaneContent::Starter,
  130. settings: PaneSettings::default(),
  131. })),
  132. b: Box::new(Configuration::Pane(PaneState {
  133. modal: pane::PaneModal::None,
  134. stream: vec![],
  135. content: PaneContent::Starter,
  136. settings: PaneSettings::default(),
  137. })),
  138. }),
  139. }),
  140. b: Box::new(Configuration::Pane(PaneState {
  141. modal: pane::PaneModal::None,
  142. stream: vec![],
  143. content: PaneContent::Starter,
  144. settings: PaneSettings::default(),
  145. })),
  146. }
  147. }
  148. pub fn from_config(
  149. panes: Configuration<PaneState>,
  150. popout_windows: Vec<(Configuration<PaneState>, (Point, Size))>,
  151. trade_fetch_enabled: bool,
  152. ) -> Self {
  153. let panes = pane_grid::State::with_configuration(panes);
  154. let mut popout = HashMap::new();
  155. for (pane, specs) in popout_windows {
  156. popout.insert(
  157. window::Id::unique(),
  158. (pane_grid::State::with_configuration(pane), specs),
  159. );
  160. }
  161. Self {
  162. panes,
  163. focus: None,
  164. pane_streams: HashMap::new(),
  165. notification_manager: NotificationManager::new(),
  166. popout,
  167. trade_fetch_enabled,
  168. }
  169. }
  170. pub fn load_layout(&mut self) -> Task<Message> {
  171. let mut open_popouts_tasks: Vec<Task<Message>> = vec![];
  172. let mut new_popout: Vec<(
  173. iced::window::Id,
  174. (pane_grid::State<PaneState>, (Point, Size)),
  175. )> = Vec::new();
  176. let mut keys_to_remove: Vec<(iced::window::Id, (Point, Size))> = Vec::new();
  177. for (old_window_id, (_, specs)) in &self.popout {
  178. keys_to_remove.push((*old_window_id, *specs));
  179. }
  180. // remove keys and open new windows
  181. for (old_window_id, (pos, size)) in keys_to_remove {
  182. let (window, task) = window::open(window::Settings {
  183. position: window::Position::Specific(pos),
  184. size,
  185. exit_on_close_request: false,
  186. ..window::settings()
  187. });
  188. open_popouts_tasks.push(task.then(|_| Task::none()));
  189. if let Some((removed_pane, specs)) = self.popout.remove(&old_window_id) {
  190. new_popout.push((window, (removed_pane, specs)));
  191. }
  192. }
  193. // assign new windows to old panes
  194. for (window, (pane, specs)) in new_popout {
  195. self.popout.insert(window, (pane, specs));
  196. }
  197. Task::batch(open_popouts_tasks).chain(Task::batch(vec![
  198. Task::done(Message::RefreshStreams),
  199. Task::done(Message::LayoutFetchAll),
  200. ]))
  201. }
  202. pub fn reset_layout(&mut self) -> Task<Message> {
  203. Task::done(Message::ResetLayout)
  204. }
  205. pub fn update(&mut self, message: Message, main_window: &Window) -> Task<Message> {
  206. match message {
  207. Message::ResetLayout => {
  208. self.panes = pane_grid::State::with_configuration(Self::default_pane_config());
  209. self.focus = None;
  210. (self.popout, self.pane_streams) = (HashMap::new(), HashMap::new());
  211. }
  212. Message::SavePopoutSpecs(specs) => {
  213. for (window_id, (position, size)) in specs {
  214. if let Some((_, specs)) = self.popout.get_mut(&window_id) {
  215. *specs = (position, size);
  216. }
  217. }
  218. }
  219. Message::ClearLastNotification(window, pane) => {
  220. self.notification_manager.remove_last(&window, &pane);
  221. }
  222. Message::ClearLastGlobalNotification => {
  223. self.notification_manager.global_notifications.pop();
  224. }
  225. Message::ErrorOccurred(window, pane, err) => {
  226. if let Some(pane) = pane {
  227. self.notification_manager.handle_error(window, pane, err);
  228. return Task::perform(
  229. async { std::thread::sleep(std::time::Duration::from_secs(15)) },
  230. move |()| Message::ClearLastNotification(window, pane),
  231. );
  232. }
  233. }
  234. Message::Pane(window, message) => {
  235. match message {
  236. pane::Message::PaneClicked(pane) => {
  237. self.focus = Some((window, pane));
  238. }
  239. pane::Message::PaneResized(pane_grid::ResizeEvent { split, ratio }) => {
  240. self.panes.resize(split, ratio);
  241. }
  242. pane::Message::PaneDragged(event) => {
  243. if let pane_grid::DragEvent::Dropped { pane, target } = event {
  244. self.panes.drop(pane, target);
  245. self.focus = None;
  246. }
  247. }
  248. pane::Message::SplitPane(axis, pane) => {
  249. let focus_pane = if let Some((new_pane, _)) = self.panes.split(
  250. axis,
  251. pane,
  252. PaneState::new(vec![], PaneSettings::default()),
  253. ) {
  254. Some(new_pane)
  255. } else {
  256. None
  257. };
  258. if Some(focus_pane).is_some() {
  259. self.focus = Some((window, focus_pane.unwrap()));
  260. }
  261. }
  262. pane::Message::ClosePane(pane) => {
  263. if let Some((_, sibling)) = self.panes.close(pane) {
  264. self.focus = Some((window, sibling));
  265. }
  266. }
  267. pane::Message::MaximizePane(pane) => {
  268. self.panes.maximize(pane);
  269. }
  270. pane::Message::Restore => {
  271. self.panes.restore();
  272. }
  273. pane::Message::ReplacePane(pane) => {
  274. if let Some(pane) = self.panes.get_mut(pane) {
  275. *pane = PaneState::new(vec![], PaneSettings::default());
  276. }
  277. }
  278. pane::Message::ToggleModal(pane, modal_type) => {
  279. if let Some(pane) = self.get_mut_pane(main_window.id, window, pane) {
  280. if modal_type == pane.modal {
  281. pane.modal = pane::PaneModal::None;
  282. } else {
  283. pane.modal = modal_type;
  284. }
  285. };
  286. }
  287. pane::Message::ChartUserUpdate(pane, chart_message) => {
  288. return self.update_chart_state(
  289. pane,
  290. window,
  291. &chart_message,
  292. main_window.id,
  293. );
  294. }
  295. pane::Message::SliderChanged(pane, value, is_trade_filter) => {
  296. return self.set_pane_size_filter(
  297. window,
  298. pane,
  299. value,
  300. is_trade_filter,
  301. main_window.id,
  302. );
  303. }
  304. pane::Message::InitPaneContent(
  305. window,
  306. content_str,
  307. is_pane,
  308. pane_stream,
  309. ticker_info,
  310. ) => {
  311. let pane;
  312. if let Some(parent_pane) = is_pane {
  313. pane = parent_pane;
  314. } else {
  315. pane = self.panes.iter().next().map(|(pane, _)| *pane).unwrap();
  316. }
  317. let err_occurred = |err| {
  318. Task::done(Message::ErrorOccurred(window, Some(pane), err))
  319. };
  320. // set pane's stream and content identifiers
  321. if let Some(pane_state) = self.get_mut_pane(main_window.id, window, pane) {
  322. if let Err(err) = pane_state.set_content(
  323. ticker_info,
  324. &content_str,
  325. ) {
  326. return err_occurred(err);
  327. }
  328. } else {
  329. return err_occurred(DashboardError::PaneSet(
  330. "No pane found".to_string()
  331. ));
  332. }
  333. // prepare unique streams for websocket
  334. for stream in &pane_stream {
  335. match stream {
  336. StreamType::Kline {
  337. exchange, ticker, ..
  338. }
  339. | StreamType::DepthAndTrades { exchange, ticker } => {
  340. self.pane_streams
  341. .entry(*exchange)
  342. .or_default()
  343. .entry(*ticker)
  344. .or_default()
  345. .insert(*stream);
  346. }
  347. _ => {}
  348. }
  349. }
  350. log::info!("{:?}", &self.pane_streams);
  351. // get fetch tasks for pane's content
  352. if ["footprint", "candlestick", "heatmap"]
  353. .contains(&content_str.as_str())
  354. {
  355. for stream in &pane_stream {
  356. if let StreamType::Kline { .. } = stream {
  357. if ["candlestick", "footprint"]
  358. .contains(&content_str.as_str())
  359. {
  360. return get_kline_fetch_task(
  361. window, pane, *stream, None, None,
  362. );
  363. }
  364. }
  365. }
  366. }
  367. }
  368. pane::Message::TimeframeSelected(timeframe, pane) => {
  369. self.notification_manager.clear(&window, &pane);
  370. match self.set_pane_timeframe(main_window.id, window, pane, timeframe) {
  371. Ok(stream_type) => {
  372. if let StreamType::Kline { .. } = stream_type {
  373. let task = get_kline_fetch_task(
  374. window,
  375. pane,
  376. *stream_type,
  377. None,
  378. None,
  379. );
  380. self.notification_manager.push(
  381. window,
  382. pane,
  383. Notification::Info(InfoType::FetchingKlines),
  384. );
  385. return Task::done(Message::RefreshStreams)
  386. .chain(task);
  387. }
  388. }
  389. Err(err) => {
  390. return Task::done(
  391. Message::ErrorOccurred(window, Some(pane), err)
  392. );
  393. }
  394. }
  395. }
  396. pane::Message::TicksizeSelected(tick_multiply, pane) => {
  397. self.notification_manager.clear(&window, &pane);
  398. return self.set_pane_ticksize(main_window.id, window, pane, tick_multiply);
  399. }
  400. pane::Message::Popout => return self.popout_pane(main_window),
  401. pane::Message::Merge => return self.merge_pane(main_window),
  402. pane::Message::ToggleIndicator(pane, indicator_str) => {
  403. if let Some(pane_state) = self.get_mut_pane(main_window.id, window, pane) {
  404. pane_state.content.toggle_indicator(indicator_str);
  405. }
  406. }
  407. pane::Message::HideNotification(pane, notification) => {
  408. self.notification_manager.find_and_remove(window, pane, notification);
  409. }
  410. }
  411. }
  412. Message::FetchEvent(req_id, klines, pane_stream, pane_id, window) => {
  413. self.notification_manager.remove_info_type(
  414. window,
  415. &pane_id,
  416. &InfoType::FetchingKlines,
  417. );
  418. match klines {
  419. Ok(klines) => {
  420. if let StreamType::Kline { timeframe, .. } = pane_stream {
  421. if let Some(pane_state) =
  422. self.get_mut_pane(main_window.id, window, pane_id)
  423. {
  424. pane_state.insert_klines_vec(req_id, timeframe, &klines);
  425. }
  426. }
  427. }
  428. Err(err) => {
  429. return Task::done(Message::ErrorOccurred(
  430. window,
  431. Some(pane_id),
  432. DashboardError::Fetch(err)
  433. ));
  434. }
  435. }
  436. }
  437. Message::OIFetchEvent(req_id, oi, pane_stream, pane_id, window) => {
  438. self.notification_manager.remove_info_type(
  439. window,
  440. &pane_id,
  441. &InfoType::FetchingOI,
  442. );
  443. if let Some(pane_state) =
  444. self.get_mut_pane(main_window.id, window, pane_id)
  445. {
  446. match oi {
  447. Ok(oi) => {
  448. if let StreamType::Kline { .. } = pane_stream {
  449. pane_state.insert_oi_vec(req_id, oi);
  450. }
  451. }
  452. Err(err) => {
  453. return Task::done(Message::ErrorOccurred(
  454. window,
  455. Some(pane_id),
  456. DashboardError::Fetch(err),
  457. ))
  458. }
  459. }
  460. }
  461. }
  462. Message::LayoutFetchAll => {
  463. let mut fetched_panes = vec![];
  464. self.iter_all_panes(main_window.id)
  465. .for_each(|(window, pane, pane_state)| match pane_state.content {
  466. PaneContent::Candlestick(_, _) | PaneContent::Footprint(_, _) => {
  467. fetched_panes.push((window, pane));
  468. }
  469. _ => {}
  470. });
  471. for (window, pane) in fetched_panes {
  472. self.notification_manager.push(
  473. window,
  474. pane,
  475. Notification::Info(InfoType::FetchingKlines),
  476. );
  477. }
  478. return Task::batch(klines_fetch_all_task(&self.pane_streams));
  479. }
  480. Message::DistributeFetchedKlines(stream_type, klines) => match klines {
  481. Ok(klines) => {
  482. let mut inserted_panes = vec![];
  483. self.iter_all_panes_mut(main_window.id)
  484. .for_each(|(window, pane, state)| {
  485. if state.matches_stream(&stream_type) {
  486. if let StreamType::Kline { timeframe, .. } = stream_type {
  487. match &mut state.content {
  488. PaneContent::Candlestick(chart, indicators) => {
  489. let tick_size = chart.get_tick_size();
  490. *chart = CandlestickChart::new(
  491. chart.get_chart_layout(),
  492. klines.clone(),
  493. timeframe,
  494. tick_size,
  495. indicators,
  496. );
  497. }
  498. PaneContent::Footprint(chart, indicators) => {
  499. let (raw_trades, tick_size) =
  500. (chart.get_raw_trades(), chart.get_tick_size());
  501. *chart = FootprintChart::new(
  502. chart.get_chart_layout(),
  503. timeframe,
  504. tick_size,
  505. klines.clone(),
  506. raw_trades,
  507. indicators,
  508. );
  509. }
  510. _ => {}
  511. }
  512. inserted_panes.push((window, pane));
  513. }
  514. }
  515. });
  516. for (window, pane) in inserted_panes {
  517. self.notification_manager.remove_info_type(
  518. window,
  519. &pane,
  520. &InfoType::FetchingKlines,
  521. );
  522. }
  523. }
  524. Err(err) => {
  525. log::error!("{err}");
  526. }
  527. }
  528. Message::FetchTrades(
  529. window_id,
  530. pane,
  531. from_time,
  532. to_time,
  533. stream_type,
  534. ) => {
  535. if let StreamType::DepthAndTrades { exchange, ticker } = stream_type {
  536. if exchange == Exchange::BinanceFutures || exchange == Exchange::BinanceSpot {
  537. return Task::perform(
  538. binance::fetch_trades(ticker, from_time),
  539. move |result| match result {
  540. Ok(trades) => Message::DistributeFetchedTrades(
  541. window_id,
  542. pane,
  543. trades,
  544. stream_type,
  545. to_time,
  546. ),
  547. Err(err) => Message::ErrorOccurred(
  548. window_id,
  549. Some(pane),
  550. DashboardError::Fetch(err.to_string()),
  551. ),
  552. },
  553. );
  554. } else {
  555. self.notification_manager.remove_info_type(
  556. window_id,
  557. &pane,
  558. &InfoType::FetchingTrades(0),
  559. );
  560. return Task::done(Message::ErrorOccurred(
  561. window_id,
  562. Some(pane),
  563. DashboardError::Fetch(format!(
  564. "No trade fetch support for {exchange:?}"
  565. )),
  566. ));
  567. }
  568. }
  569. }
  570. Message::DistributeFetchedTrades(
  571. window_id,
  572. pane,
  573. trades,
  574. stream_type,
  575. to_time,
  576. ) => {
  577. let last_trade_time = trades.last()
  578. .map_or(0, |trade| trade.time);
  579. self.notification_manager.increment_fetching_trades(
  580. window_id,
  581. &pane,
  582. trades.len(),
  583. );
  584. if last_trade_time < to_time {
  585. match self.insert_fetched_trades(
  586. main_window.id,
  587. window_id,
  588. pane,
  589. &trades,
  590. false,
  591. ) {
  592. Ok(_) => {
  593. return Task::done(Message::FetchTrades(
  594. window_id,
  595. pane,
  596. last_trade_time,
  597. to_time,
  598. stream_type,
  599. ));
  600. }
  601. Err(err) => {
  602. self.notification_manager.remove_info_type(
  603. window_id,
  604. &pane,
  605. &InfoType::FetchingTrades(0),
  606. );
  607. return Task::done(
  608. Message::ErrorOccurred(window_id, Some(pane), err)
  609. );
  610. }
  611. }
  612. } else {
  613. self.notification_manager.remove_info_type(
  614. window_id,
  615. &pane,
  616. &InfoType::FetchingTrades(0),
  617. );
  618. match self.insert_fetched_trades(
  619. main_window.id,
  620. window_id,
  621. pane,
  622. &trades,
  623. true,
  624. ) {
  625. Ok(_) => {}
  626. Err(err) => {
  627. return Task::done(
  628. Message::ErrorOccurred(window_id, Some(pane), err)
  629. );
  630. }
  631. }
  632. }
  633. }
  634. Message::RefreshStreams => {
  635. self.pane_streams = self.get_all_diff_streams(main_window.id);
  636. }
  637. Message::ChartMessage(pane, window, message) => {
  638. if let ChartMessage::NewDataRange(req_id, fetch) = message {
  639. match fetch {
  640. FetchRange::Kline(from, to) => {
  641. let kline_stream = self
  642. .get_pane(main_window.id, window, pane)
  643. .and_then(|pane| {
  644. pane.stream
  645. .iter()
  646. .find(|stream| matches!(stream, StreamType::Kline { .. }))
  647. });
  648. if let Some(stream) = kline_stream {
  649. let stream = *stream;
  650. self.notification_manager.push(
  651. window,
  652. pane,
  653. Notification::Info(InfoType::FetchingKlines),
  654. );
  655. return get_kline_fetch_task(
  656. window,
  657. pane,
  658. stream,
  659. Some(req_id),
  660. Some((from, to)),
  661. );
  662. }
  663. }
  664. FetchRange::OpenInterest(from, to) => {
  665. let kline_stream = self
  666. .get_pane(main_window.id, window, pane)
  667. .and_then(|pane| {
  668. pane.stream
  669. .iter()
  670. .find(|stream| matches!(stream, StreamType::Kline { .. }))
  671. });
  672. if let Some(stream) = kline_stream {
  673. let stream = *stream;
  674. self.notification_manager.push(
  675. window,
  676. pane,
  677. Notification::Info(InfoType::FetchingOI),
  678. );
  679. return get_oi_fetch_task(
  680. window,
  681. pane,
  682. stream,
  683. Some(req_id),
  684. Some((from, to)),
  685. );
  686. }
  687. }
  688. FetchRange::Trades(from, to) => {
  689. if !self.trade_fetch_enabled {
  690. return Task::none();
  691. }
  692. let trade_stream = self
  693. .get_pane(main_window.id, window, pane)
  694. .and_then(|pane| {
  695. pane.stream.iter().find(|stream| {
  696. matches!(stream, StreamType::DepthAndTrades { .. })
  697. })
  698. });
  699. if let Some(stream) = trade_stream {
  700. let stream = *stream;
  701. self.notification_manager.push(
  702. window,
  703. pane,
  704. Notification::Info(InfoType::FetchingTrades(0)),
  705. );
  706. return Task::done(Message::FetchTrades(
  707. window,
  708. pane,
  709. from,
  710. to,
  711. stream,
  712. ));
  713. }
  714. }
  715. }
  716. }
  717. }
  718. }
  719. Task::none()
  720. }
  721. fn new_pane(
  722. &mut self,
  723. axis: pane_grid::Axis,
  724. main_window: &Window,
  725. pane_state: Option<PaneState>,
  726. ) -> Task<Message> {
  727. if self
  728. .focus
  729. .filter(|(window, _)| *window == main_window.id)
  730. .is_some()
  731. {
  732. // If there is any focused pane on main window, split it
  733. return self.split_pane(axis, main_window);
  734. } else {
  735. // If there is no focused pane, split the last pane or create a new empty grid
  736. let pane = self.panes.iter().last().map(|(pane, _)| pane).copied();
  737. if let Some(pane) = pane {
  738. let result = self.panes.split(
  739. axis,
  740. pane,
  741. pane_state.unwrap_or(PaneState::new(vec![], PaneSettings::default())),
  742. );
  743. if let Some((pane, _)) = result {
  744. return self.focus_pane(main_window.id, pane);
  745. }
  746. } else {
  747. let (state, pane) = pane_grid::State::new(
  748. pane_state.unwrap_or(PaneState::new(vec![], PaneSettings::default())),
  749. );
  750. self.panes = state;
  751. return self.focus_pane(main_window.id, pane);
  752. }
  753. }
  754. Task::none()
  755. }
  756. fn focus_pane(&mut self, window: window::Id, pane: pane_grid::Pane) -> Task<Message> {
  757. if self.focus != Some((window, pane)) {
  758. self.focus = Some((window, pane));
  759. }
  760. Task::none()
  761. }
  762. fn split_pane(&mut self, axis: pane_grid::Axis, main_window: &Window) -> Task<Message> {
  763. if let Some((window, pane)) = self.focus {
  764. if window == main_window.id {
  765. let result =
  766. self.panes
  767. .split(axis, pane, PaneState::new(vec![], PaneSettings::default()));
  768. if let Some((pane, _)) = result {
  769. return self.focus_pane(main_window.id, pane);
  770. }
  771. }
  772. }
  773. Task::none()
  774. }
  775. fn popout_pane(&mut self, main_window: &Window) -> Task<Message> {
  776. if let Some((_, id)) = self.focus.take() {
  777. if let Some((pane, _)) = self.panes.close(id) {
  778. let (window, task) = window::open(window::Settings {
  779. position: main_window
  780. .position
  781. .map(|point| window::Position::Specific(point + Vector::new(20.0, 20.0)))
  782. .unwrap_or_default(),
  783. exit_on_close_request: false,
  784. ..window::settings()
  785. });
  786. let (state, id) = pane_grid::State::new(pane);
  787. self.popout.insert(
  788. window,
  789. (state, (Point::new(0.0, 0.0), Size::new(1024.0, 768.0))),
  790. );
  791. return task.then(move |window| {
  792. Task::done(Message::Pane(window, pane::Message::PaneClicked(id)))
  793. });
  794. }
  795. }
  796. Task::none()
  797. }
  798. fn merge_pane(&mut self, main_window: &Window) -> Task<Message> {
  799. if let Some((window, pane)) = self.focus.take() {
  800. if let Some(pane_state) = self
  801. .popout
  802. .remove(&window)
  803. .and_then(|(mut panes, _)| panes.panes.remove(&pane))
  804. {
  805. let task =
  806. self.new_pane(pane_grid::Axis::Horizontal, main_window, Some(pane_state));
  807. return Task::batch(vec![window::close(window), task]);
  808. }
  809. }
  810. Task::none()
  811. }
  812. fn get_pane(
  813. &self,
  814. main_window: window::Id,
  815. window: window::Id,
  816. pane: pane_grid::Pane,
  817. ) -> Option<&PaneState> {
  818. if main_window == window {
  819. self.panes.get(pane)
  820. } else {
  821. self.popout
  822. .get(&window)
  823. .and_then(|(panes, _)| panes.get(pane))
  824. }
  825. }
  826. fn get_mut_pane(
  827. &mut self,
  828. main_window: window::Id,
  829. window: window::Id,
  830. pane: pane_grid::Pane,
  831. ) -> Option<&mut PaneState> {
  832. if main_window == window {
  833. self.panes.get_mut(pane)
  834. } else {
  835. self.popout
  836. .get_mut(&window)
  837. .and_then(|(panes, _)| panes.get_mut(pane))
  838. }
  839. }
  840. fn iter_all_panes(
  841. &self,
  842. main_window: window::Id,
  843. ) -> impl Iterator<Item = (window::Id, pane_grid::Pane, &PaneState)> {
  844. self.panes
  845. .iter()
  846. .map(move |(pane, state)| (main_window, *pane, state))
  847. .chain(self.popout.iter().flat_map(|(window_id, (panes, _))| {
  848. panes.iter().map(|(pane, state)| (*window_id, *pane, state))
  849. }))
  850. }
  851. fn iter_all_panes_mut(
  852. &mut self,
  853. main_window: window::Id,
  854. ) -> impl Iterator<Item = (window::Id, pane_grid::Pane, &mut PaneState)> {
  855. self.panes
  856. .iter_mut()
  857. .map(move |(pane, state)| (main_window, *pane, state))
  858. .chain(self.popout.iter_mut().flat_map(|(window_id, (panes, _))| {
  859. panes
  860. .iter_mut()
  861. .map(|(pane, state)| (*window_id, *pane, state))
  862. }))
  863. }
  864. pub fn view<'a>(
  865. &'a self,
  866. main_window: &'a Window,
  867. layout_locked: bool,
  868. timezone: &'a UserTimezone,
  869. ) -> Element<'a, Message> {
  870. let focus = self.focus;
  871. let mut pane_grid = PaneGrid::new(&self.panes, |id, pane, maximized| {
  872. let is_focused = !layout_locked && focus == Some((main_window.id, id));
  873. pane.view(
  874. id,
  875. self.panes.len(),
  876. is_focused,
  877. maximized,
  878. main_window.id,
  879. main_window,
  880. timezone,
  881. self.notification_manager.get(&main_window.id, &id),
  882. )
  883. })
  884. .spacing(6)
  885. .style(style::pane_grid);
  886. if !layout_locked {
  887. pane_grid = pane_grid
  888. .on_click(pane::Message::PaneClicked)
  889. .on_resize(8, pane::Message::PaneResized)
  890. .on_drag(pane::Message::PaneDragged);
  891. }
  892. let pane_grid: Element<_> = pane_grid.into();
  893. let base = container(pane_grid.map(move |message| Message::Pane(main_window.id, message)));
  894. if !self.notification_manager.global_notifications.is_empty() {
  895. dashboard_notification(
  896. base,
  897. create_notis_column(
  898. &self.notification_manager.global_notifications,
  899. Message::ClearLastGlobalNotification,
  900. ),
  901. )
  902. } else {
  903. base.into()
  904. }
  905. }
  906. pub fn view_window<'a>(
  907. &'a self,
  908. window: window::Id,
  909. main_window: &'a Window,
  910. layout_locked: bool,
  911. timezone: &'a UserTimezone,
  912. ) -> Element<'a, Message> {
  913. if let Some((state, _)) = self.popout.get(&window) {
  914. let content = container({
  915. let mut pane_grid = PaneGrid::new(state, |id, pane, _maximized| {
  916. let is_focused = self.focus == Some((window, id));
  917. pane.view(
  918. id,
  919. state.len(),
  920. is_focused,
  921. false,
  922. window,
  923. main_window,
  924. timezone,
  925. self.notification_manager.get(&window, &id),
  926. )
  927. });
  928. if !layout_locked {
  929. pane_grid = pane_grid.on_click(pane::Message::PaneClicked);
  930. }
  931. pane_grid
  932. })
  933. .width(Length::Fill)
  934. .height(Length::Fill)
  935. .padding(8);
  936. return Element::new(content).map(move |message| Message::Pane(window, message));
  937. } else {
  938. return Element::new(center("No pane found for window"))
  939. .map(move |message| Message::Pane(window, message));
  940. }
  941. }
  942. fn set_pane_ticksize(
  943. &mut self,
  944. main_window: window::Id,
  945. window: window::Id,
  946. pane: pane_grid::Pane,
  947. new_tick_multiply: TickMultiplier,
  948. ) -> Task<Message> {
  949. if let Some(pane_state) = self.get_mut_pane(main_window, window, pane) {
  950. pane_state.settings.tick_multiply = Some(new_tick_multiply);
  951. if let Some(ticker_info) = pane_state.settings.ticker_info {
  952. match pane_state.content {
  953. PaneContent::Footprint(ref mut chart, _) => {
  954. chart.change_tick_size(
  955. new_tick_multiply.multiply_with_min_tick_size(ticker_info),
  956. );
  957. chart.reset_request_handler();
  958. }
  959. PaneContent::Heatmap(ref mut chart, _) => {
  960. chart.change_tick_size(
  961. new_tick_multiply.multiply_with_min_tick_size(ticker_info),
  962. );
  963. }
  964. _ => {
  965. return Task::done(Message::ErrorOccurred(
  966. window,
  967. Some(pane),
  968. DashboardError::PaneSet(
  969. "No chart found to change ticksize".to_string(),
  970. ),
  971. ));
  972. }
  973. }
  974. } else {
  975. return Task::done(Message::ErrorOccurred(
  976. window,
  977. Some(pane),
  978. DashboardError::PaneSet("No min ticksize found".to_string()),
  979. ));
  980. }
  981. } else {
  982. return Task::done(Message::ErrorOccurred(
  983. window,
  984. Some(pane),
  985. DashboardError::PaneSet("No pane found to change ticksize".to_string()),
  986. ));
  987. }
  988. Task::none()
  989. }
  990. fn set_pane_timeframe(
  991. &mut self,
  992. main_window: window::Id,
  993. window: window::Id,
  994. pane: pane_grid::Pane,
  995. new_timeframe: Timeframe,
  996. ) -> Result<&StreamType, DashboardError> {
  997. if let Some(pane_state) = self.get_mut_pane(main_window, window, pane) {
  998. pane_state.settings.selected_timeframe = Some(new_timeframe);
  999. if let Some(stream_type) = pane_state
  1000. .stream
  1001. .iter_mut()
  1002. .find(|stream_type| matches!(stream_type, StreamType::Kline { .. }))
  1003. {
  1004. if let StreamType::Kline { timeframe, .. } = stream_type {
  1005. *timeframe = new_timeframe;
  1006. }
  1007. match &mut pane_state.content {
  1008. PaneContent::Candlestick(chart, _) => {
  1009. chart.set_loading_state(true);
  1010. return Ok(stream_type);
  1011. }
  1012. PaneContent::Footprint(chart, _) => {
  1013. chart.set_loading_state(true);
  1014. return Ok(stream_type);
  1015. }
  1016. _ => {}
  1017. }
  1018. }
  1019. }
  1020. Err(DashboardError::Unknown(
  1021. "Couldn't get the pane to change its timeframe".to_string(),
  1022. ))
  1023. }
  1024. fn set_pane_size_filter(
  1025. &mut self,
  1026. window: window::Id,
  1027. pane: pane_grid::Pane,
  1028. new_size_filter: f32,
  1029. is_trade_filter: bool,
  1030. main_window: window::Id,
  1031. ) -> Task<Message> {
  1032. if let Some(pane_state) = self.get_mut_pane(main_window, window, pane) {
  1033. pane_state.settings.trade_size_filter = Some(new_size_filter);
  1034. match pane_state.content {
  1035. PaneContent::Heatmap(ref mut chart, _) => {
  1036. chart.set_size_filter(new_size_filter, is_trade_filter);
  1037. }
  1038. PaneContent::TimeAndSales(ref mut chart) => {
  1039. chart.set_size_filter(new_size_filter);
  1040. }
  1041. _ => {
  1042. return Task::done(Message::ErrorOccurred(
  1043. window,
  1044. Some(pane),
  1045. DashboardError::Unknown("No chart found to set size filter".to_string()),
  1046. ));
  1047. }
  1048. }
  1049. Task::none()
  1050. } else {
  1051. Task::done(Message::ErrorOccurred(
  1052. window,
  1053. Some(pane),
  1054. DashboardError::Unknown("No pane found to set size filter".to_string()),
  1055. ))
  1056. }
  1057. }
  1058. pub fn init_pane_task(
  1059. &mut self,
  1060. main_window: window::Id,
  1061. ticker: (Ticker, TickerInfo),
  1062. exchange: Exchange,
  1063. content: &str,
  1064. ) -> Task<Message> {
  1065. if let Some((window, selected_pane)) = self.focus {
  1066. if let Some(pane_state) = self.get_mut_pane(main_window, window, selected_pane) {
  1067. return pane_state
  1068. .init_content_task(content, exchange, ticker, selected_pane, window)
  1069. .map(move |message| Message::Pane(window, message));
  1070. }
  1071. } else {
  1072. self.notification_manager
  1073. .global_notifications
  1074. .push(Notification::Warn("Select a pane first".to_string()));
  1075. return Task::perform(
  1076. async { std::thread::sleep(std::time::Duration::from_secs(8)) },
  1077. move |()| Message::ClearLastGlobalNotification,
  1078. );
  1079. }
  1080. Task::none()
  1081. }
  1082. pub fn toggle_trade_fetch(&mut self, is_enabled: bool, main_window: &Window) {
  1083. self.trade_fetch_enabled = is_enabled;
  1084. self.iter_all_panes_mut(main_window.id)
  1085. .for_each(|(_, _, pane_state)| {
  1086. if let PaneContent::Footprint(chart, _) = &mut pane_state.content {
  1087. chart.reset_request_handler();
  1088. }
  1089. });
  1090. }
  1091. fn insert_fetched_trades(
  1092. &mut self,
  1093. main_window: window::Id,
  1094. window: window::Id,
  1095. pane: pane_grid::Pane,
  1096. trades: &[Trade],
  1097. is_batches_done: bool,
  1098. ) -> Result<(), DashboardError> {
  1099. self.get_mut_pane(main_window, window, pane)
  1100. .map_or_else(
  1101. || Err(
  1102. DashboardError::Unknown("Couldnt get the pane for fetched trades".to_string())
  1103. ),
  1104. |pane_state| match &mut pane_state.content {
  1105. PaneContent::Footprint(chart, _) => {
  1106. chart.insert_trades(trades.to_owned(), is_batches_done);
  1107. Ok(())
  1108. }
  1109. _ => Err(
  1110. DashboardError::Unknown("No matching chart found for fetched trades".to_string())
  1111. ),
  1112. }
  1113. )
  1114. }
  1115. pub fn update_latest_klines(
  1116. &mut self,
  1117. stream_type: &StreamType,
  1118. kline: &Kline,
  1119. main_window: window::Id,
  1120. ) -> Task<Message> {
  1121. let mut tasks = vec![];
  1122. let mut found_match = false;
  1123. self.iter_all_panes_mut(main_window)
  1124. .for_each(|(window, pane, pane_state)| {
  1125. if pane_state.matches_stream(stream_type) {
  1126. match &mut pane_state.content {
  1127. PaneContent::Candlestick(chart, _) => tasks.push(
  1128. chart
  1129. .update_latest_kline(kline)
  1130. .map(move |message| Message::ChartMessage(pane, window, message)),
  1131. ),
  1132. PaneContent::Footprint(chart, _) => tasks.push(
  1133. chart
  1134. .update_latest_kline(kline)
  1135. .map(move |message| Message::ChartMessage(pane, window, message)),
  1136. ),
  1137. _ => {}
  1138. }
  1139. found_match = true;
  1140. }
  1141. });
  1142. if !found_match {
  1143. log::error!("No matching pane found for the stream: {stream_type:?}");
  1144. tasks.push(Task::done(Message::RefreshStreams));
  1145. }
  1146. Task::batch(tasks)
  1147. }
  1148. pub fn update_depth_and_trades(
  1149. &mut self,
  1150. stream_type: &StreamType,
  1151. depth_update_t: i64,
  1152. depth: Depth,
  1153. trades_buffer: Vec<Trade>,
  1154. main_window: window::Id,
  1155. ) -> Task<Message> {
  1156. let mut found_match = false;
  1157. let trades_buffer = trades_buffer.into_boxed_slice();
  1158. self.iter_all_panes_mut(main_window)
  1159. .for_each(|(_, _, pane_state)| {
  1160. if pane_state.matches_stream(stream_type) {
  1161. match &mut pane_state.content {
  1162. PaneContent::Heatmap(chart, _) => {
  1163. chart.insert_datapoint(&trades_buffer, depth_update_t, &depth);
  1164. }
  1165. PaneContent::Footprint(chart, _) => {
  1166. chart.insert_datapoint(&trades_buffer, depth_update_t);
  1167. }
  1168. PaneContent::TimeAndSales(chart) => {
  1169. chart.update(&trades_buffer);
  1170. }
  1171. _ => {
  1172. log::error!("No chart found for the stream: {stream_type:?}");
  1173. }
  1174. }
  1175. found_match = true;
  1176. }
  1177. });
  1178. if found_match {
  1179. Task::none()
  1180. } else {
  1181. log::error!("No matching pane found for the stream: {stream_type:?}");
  1182. Task::done(Message::RefreshStreams)
  1183. }
  1184. }
  1185. fn update_chart_state(
  1186. &mut self,
  1187. pane: pane_grid::Pane,
  1188. window: window::Id,
  1189. chart_message: &ChartMessage,
  1190. main_window: window::Id,
  1191. ) -> Task<Message> {
  1192. if let Some(pane_state) = self.get_mut_pane(main_window, window, pane) {
  1193. match pane_state.content {
  1194. PaneContent::Heatmap(ref mut chart, _) => chart
  1195. .update(chart_message)
  1196. .map(move |message| Message::ChartMessage(pane, window, message)),
  1197. PaneContent::Footprint(ref mut chart, _) => chart
  1198. .update(chart_message)
  1199. .map(move |message| Message::ChartMessage(pane, window, message)),
  1200. PaneContent::Candlestick(ref mut chart, _) => chart
  1201. .update(chart_message)
  1202. .map(move |message| Message::ChartMessage(pane, window, message)),
  1203. _ => Task::done(Message::ErrorOccurred(
  1204. window,
  1205. Some(pane),
  1206. DashboardError::Unknown("No chart found".to_string()),
  1207. )),
  1208. }
  1209. } else {
  1210. Task::done(Message::ErrorOccurred(
  1211. window,
  1212. Some(pane),
  1213. DashboardError::Unknown("No pane found to update its state".to_string()),
  1214. ))
  1215. }
  1216. }
  1217. fn get_all_diff_streams(
  1218. &mut self,
  1219. main_window: window::Id,
  1220. ) -> HashMap<Exchange, HashMap<Ticker, HashSet<StreamType>>> {
  1221. let mut pane_streams = HashMap::new();
  1222. self.iter_all_panes_mut(main_window)
  1223. .for_each(|(_, _, pane_state)| {
  1224. for stream_type in &pane_state.stream {
  1225. match stream_type {
  1226. StreamType::Kline {
  1227. exchange,
  1228. ticker,
  1229. timeframe,
  1230. } => {
  1231. let exchange = *exchange;
  1232. let ticker = *ticker;
  1233. let timeframe = *timeframe;
  1234. let exchange_map =
  1235. pane_streams.entry(exchange).or_insert(HashMap::new());
  1236. let ticker_map = exchange_map.entry(ticker).or_insert(HashSet::new());
  1237. ticker_map.insert(StreamType::Kline {
  1238. exchange,
  1239. ticker,
  1240. timeframe,
  1241. });
  1242. }
  1243. StreamType::DepthAndTrades { exchange, ticker } => {
  1244. let exchange = *exchange;
  1245. let ticker = *ticker;
  1246. let exchange_map =
  1247. pane_streams.entry(exchange).or_insert(HashMap::new());
  1248. let ticker_map = exchange_map.entry(ticker).or_insert(HashSet::new());
  1249. ticker_map.insert(StreamType::DepthAndTrades { exchange, ticker });
  1250. }
  1251. _ => {}
  1252. }
  1253. }
  1254. });
  1255. self.pane_streams.clone_from(&pane_streams);
  1256. pane_streams
  1257. }
  1258. }
  1259. fn get_oi_fetch_task(
  1260. window_id: window::Id,
  1261. pane: pane_grid::Pane,
  1262. stream: StreamType,
  1263. req_id: Option<uuid::Uuid>,
  1264. from_to_time: Option<(i64, i64)>,
  1265. ) -> Task<Message> {
  1266. match stream {
  1267. StreamType::Kline {
  1268. exchange,
  1269. ticker,
  1270. timeframe,
  1271. } => match exchange {
  1272. Exchange::BinanceFutures => Task::perform(
  1273. binance::fetch_historical_oi(ticker, from_to_time, timeframe)
  1274. .map_err(|err| format!("{err}")),
  1275. move |oi| Message::OIFetchEvent(req_id, oi, stream, pane, window_id),
  1276. ),
  1277. Exchange::BybitLinear => Task::perform(
  1278. bybit::fetch_historical_oi(ticker, from_to_time, timeframe)
  1279. .map_err(|err| format!("{err}")),
  1280. move |oi| Message::OIFetchEvent(req_id, oi, stream, pane, window_id),
  1281. ),
  1282. _ => {
  1283. log::error!("No OI fetch support for {exchange:?}");
  1284. Task::none()
  1285. },
  1286. },
  1287. _ => Task::none(),
  1288. }
  1289. }
  1290. fn get_kline_fetch_task(
  1291. window_id: window::Id,
  1292. pane: pane_grid::Pane,
  1293. stream: StreamType,
  1294. req_id: Option<uuid::Uuid>,
  1295. range: Option<(i64, i64)>,
  1296. ) -> Task<Message> {
  1297. match stream {
  1298. StreamType::Kline {
  1299. exchange,
  1300. ticker,
  1301. timeframe,
  1302. } => match exchange {
  1303. Exchange::BinanceFutures | Exchange::BinanceSpot => Task::perform(
  1304. binance::fetch_klines(ticker, timeframe, range)
  1305. .map_err(|err| format!("{err}")),
  1306. move |klines| Message::FetchEvent(req_id, klines, stream, pane, window_id),
  1307. ),
  1308. Exchange::BybitLinear | Exchange::BybitSpot => Task::perform(
  1309. bybit::fetch_klines(ticker, timeframe, range)
  1310. .map_err(|err| format!("{err}")),
  1311. move |klines| Message::FetchEvent(req_id, klines, stream, pane, window_id),
  1312. ),
  1313. },
  1314. _ => Task::none(),
  1315. }
  1316. }
  1317. fn klines_fetch_all_task(
  1318. streams: &HashMap<Exchange, HashMap<Ticker, HashSet<StreamType>>>,
  1319. ) -> Vec<Task<Message>> {
  1320. let mut tasks: Vec<Task<Message>> = vec![];
  1321. for (exchange, stream) in streams {
  1322. let mut kline_fetches = Vec::new();
  1323. for stream_types in stream.values() {
  1324. for stream_type in stream_types {
  1325. if let StreamType::Kline {
  1326. ticker, timeframe, ..
  1327. } = stream_type
  1328. {
  1329. kline_fetches.push((*ticker, *timeframe));
  1330. }
  1331. }
  1332. }
  1333. for (ticker, timeframe) in kline_fetches {
  1334. let (ticker, timeframe) = (ticker, timeframe);
  1335. let exchange = *exchange;
  1336. match exchange {
  1337. Exchange::BinanceFutures | Exchange::BinanceSpot => {
  1338. let fetch_klines = Task::perform(
  1339. binance::fetch_klines(ticker, timeframe, None)
  1340. .map_err(|err| format!("{err}")),
  1341. move |klines| {
  1342. Message::DistributeFetchedKlines(
  1343. StreamType::Kline {
  1344. exchange,
  1345. ticker,
  1346. timeframe,
  1347. },
  1348. klines,
  1349. )
  1350. },
  1351. );
  1352. tasks.push(fetch_klines);
  1353. }
  1354. Exchange::BybitLinear | Exchange::BybitSpot => {
  1355. let fetch_klines = Task::perform(
  1356. bybit::fetch_klines(ticker, timeframe, None)
  1357. .map_err(|err| format!("{err}")),
  1358. move |klines| {
  1359. Message::DistributeFetchedKlines(
  1360. StreamType::Kline {
  1361. exchange,
  1362. ticker,
  1363. timeframe,
  1364. },
  1365. klines,
  1366. )
  1367. },
  1368. );
  1369. tasks.push(fetch_klines);
  1370. }
  1371. }
  1372. }
  1373. }
  1374. tasks
  1375. }