dashboard.rs 58 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553
  1. pub mod pane;
  2. use futures::TryFutureExt;
  3. pub use pane::{PaneState, PaneContent, PaneSettings};
  4. use crate::{
  5. charts::{
  6. candlestick::CandlestickChart, footprint::FootprintChart, Message as ChartMessage
  7. },
  8. data_providers::{
  9. binance, bybit, fetcher::FetchRange, Depth, Exchange, Kline, OpenInterest, TickMultiplier, Ticker, TickerInfo, Timeframe, Trade
  10. },
  11. screen::InfoType,
  12. style,
  13. window::{self, Window},
  14. StreamType,
  15. };
  16. use super::{
  17. create_notis_column, modal::dashboard_notification, DashboardError, Notification,
  18. NotificationManager, UserTimezone,
  19. };
  20. use std::{
  21. collections::{HashMap, HashSet},
  22. vec,
  23. };
  24. use iced::{
  25. widget::{
  26. center, container,
  27. pane_grid::{self, Configuration},
  28. PaneGrid,
  29. },
  30. Element, Length, Point, Size, Task, Vector,
  31. };
  32. #[derive(Debug, Clone)]
  33. pub enum Message {
  34. Pane(window::Id, pane::Message),
  35. SavePopoutSpecs(HashMap<window::Id, (Point, Size)>),
  36. ResetLayout,
  37. ErrorOccurred(window::Id, Option<pane_grid::Pane>, DashboardError),
  38. ClearLastNotification(window::Id, pane_grid::Pane),
  39. ClearLastGlobalNotification,
  40. LayoutFetchAll,
  41. RefreshStreams,
  42. // Kline fetching
  43. FetchEvent(
  44. Option<uuid::Uuid>,
  45. Result<Vec<Kline>, String>,
  46. StreamType,
  47. pane_grid::Pane,
  48. window::Id,
  49. ),
  50. OIFetchEvent(
  51. Option<uuid::Uuid>,
  52. Result<Vec<OpenInterest>, String>,
  53. StreamType,
  54. pane_grid::Pane,
  55. window::Id,
  56. ),
  57. DistributeFetchedKlines(StreamType, Result<Vec<Kline>, String>),
  58. ChartMessage(pane_grid::Pane, window::Id, ChartMessage),
  59. // Batched trade fetching
  60. FetchTrades(
  61. window::Id,
  62. pane_grid::Pane,
  63. i64,
  64. i64,
  65. StreamType,
  66. ),
  67. DistributeFetchedTrades(
  68. window::Id,
  69. pane_grid::Pane,
  70. Vec<Trade>,
  71. StreamType,
  72. i64,
  73. ),
  74. }
  75. pub struct Dashboard {
  76. pub panes: pane_grid::State<PaneState>,
  77. pub focus: Option<(window::Id, pane_grid::Pane)>,
  78. pub popout: HashMap<window::Id, (pane_grid::State<PaneState>, (Point, Size))>,
  79. pub layout_lock: bool,
  80. pub pane_streams: HashMap<Exchange, HashMap<Ticker, HashSet<StreamType>>>,
  81. notification_manager: NotificationManager,
  82. tickers_info: HashMap<Exchange, HashMap<Ticker, Option<TickerInfo>>>,
  83. timezone: UserTimezone,
  84. pub trade_fetch_enabled: bool,
  85. }
  86. impl Default for Dashboard {
  87. fn default() -> Self {
  88. Self::empty()
  89. }
  90. }
  91. impl Dashboard {
  92. fn empty() -> Self {
  93. Self {
  94. panes: pane_grid::State::with_configuration(Self::default_pane_config()),
  95. focus: None,
  96. layout_lock: false,
  97. pane_streams: HashMap::new(),
  98. notification_manager: NotificationManager::new(),
  99. tickers_info: HashMap::new(),
  100. popout: HashMap::new(),
  101. timezone: UserTimezone::default(),
  102. trade_fetch_enabled: false,
  103. }
  104. }
  105. fn default_pane_config() -> Configuration<PaneState> {
  106. Configuration::Split {
  107. axis: pane_grid::Axis::Vertical,
  108. ratio: 0.8,
  109. a: Box::new(Configuration::Split {
  110. axis: pane_grid::Axis::Horizontal,
  111. ratio: 0.4,
  112. a: Box::new(Configuration::Split {
  113. axis: pane_grid::Axis::Vertical,
  114. ratio: 0.5,
  115. a: Box::new(Configuration::Pane(PaneState {
  116. modal: pane::PaneModal::None,
  117. stream: vec![],
  118. content: PaneContent::Starter,
  119. settings: PaneSettings::default(),
  120. })),
  121. b: Box::new(Configuration::Pane(PaneState {
  122. modal: pane::PaneModal::None,
  123. stream: vec![],
  124. content: PaneContent::Starter,
  125. settings: PaneSettings::default(),
  126. })),
  127. }),
  128. b: Box::new(Configuration::Split {
  129. axis: pane_grid::Axis::Vertical,
  130. ratio: 0.5,
  131. a: Box::new(Configuration::Pane(PaneState {
  132. modal: pane::PaneModal::None,
  133. stream: vec![],
  134. content: PaneContent::Starter,
  135. settings: PaneSettings::default(),
  136. })),
  137. b: Box::new(Configuration::Pane(PaneState {
  138. modal: pane::PaneModal::None,
  139. stream: vec![],
  140. content: PaneContent::Starter,
  141. settings: PaneSettings::default(),
  142. })),
  143. }),
  144. }),
  145. b: Box::new(Configuration::Pane(PaneState {
  146. modal: pane::PaneModal::None,
  147. stream: vec![],
  148. content: PaneContent::Starter,
  149. settings: PaneSettings::default(),
  150. })),
  151. }
  152. }
  153. pub fn from_config(
  154. panes: Configuration<PaneState>,
  155. popout_windows: Vec<(Configuration<PaneState>, (Point, Size))>,
  156. trade_fetch_enabled: bool,
  157. ) -> Self {
  158. let panes = pane_grid::State::with_configuration(panes);
  159. let mut popout = HashMap::new();
  160. for (pane, specs) in popout_windows {
  161. popout.insert(
  162. window::Id::unique(),
  163. (pane_grid::State::with_configuration(pane), specs),
  164. );
  165. }
  166. Self {
  167. panes,
  168. focus: None,
  169. layout_lock: false,
  170. pane_streams: HashMap::new(),
  171. notification_manager: NotificationManager::new(),
  172. tickers_info: HashMap::new(),
  173. popout,
  174. timezone: UserTimezone::default(),
  175. trade_fetch_enabled,
  176. }
  177. }
  178. pub fn load_layout(&mut self) -> Task<Message> {
  179. let mut open_popouts_tasks: Vec<Task<Message>> = vec![];
  180. let mut new_popout: Vec<(
  181. iced::window::Id,
  182. (pane_grid::State<PaneState>, (Point, Size)),
  183. )> = Vec::new();
  184. let mut keys_to_remove: Vec<(iced::window::Id, (Point, Size))> = Vec::new();
  185. for (old_window_id, (_, specs)) in &self.popout {
  186. keys_to_remove.push((*old_window_id, *specs));
  187. }
  188. // remove keys and open new windows
  189. for (old_window_id, (pos, size)) in keys_to_remove {
  190. let (window, task) = window::open(window::Settings {
  191. position: window::Position::Specific(pos),
  192. size,
  193. exit_on_close_request: false,
  194. ..window::settings()
  195. });
  196. open_popouts_tasks.push(task.then(|_| Task::none()));
  197. if let Some((removed_pane, specs)) = self.popout.remove(&old_window_id) {
  198. new_popout.push((window, (removed_pane, specs)));
  199. }
  200. }
  201. // assign new windows to old panes
  202. for (window, (pane, specs)) in new_popout {
  203. self.popout.insert(window, (pane, specs));
  204. }
  205. Task::batch(open_popouts_tasks).chain(Task::batch(vec![
  206. Task::done(Message::RefreshStreams),
  207. Task::done(Message::LayoutFetchAll),
  208. ]))
  209. }
  210. pub fn reset_layout(&mut self) -> Task<Message> {
  211. Task::done(Message::ResetLayout)
  212. }
  213. pub fn update(&mut self, message: Message, main_window: &Window) -> Task<Message> {
  214. match message {
  215. Message::ResetLayout => {
  216. self.panes = pane_grid::State::with_configuration(Self::default_pane_config());
  217. self.focus = None;
  218. self.layout_lock = false;
  219. (self.popout, self.pane_streams) = (HashMap::new(), HashMap::new());
  220. }
  221. Message::SavePopoutSpecs(specs) => {
  222. for (window_id, (position, size)) in specs {
  223. if let Some((_, specs)) = self.popout.get_mut(&window_id) {
  224. *specs = (position, size);
  225. }
  226. }
  227. }
  228. Message::ClearLastNotification(window, pane) => {
  229. self.notification_manager.remove_last(&window, &pane);
  230. }
  231. Message::ClearLastGlobalNotification => {
  232. self.notification_manager.global_notifications.pop();
  233. }
  234. Message::ErrorOccurred(window, pane, err) => {
  235. if let Some(pane) = pane {
  236. self.notification_manager.handle_error(window, pane, err);
  237. return Task::perform(
  238. async { std::thread::sleep(std::time::Duration::from_secs(15)) },
  239. move |()| Message::ClearLastNotification(window, pane),
  240. );
  241. }
  242. }
  243. Message::Pane(window, message) => {
  244. match message {
  245. pane::Message::PaneClicked(pane) => {
  246. self.focus = Some((window, pane));
  247. }
  248. pane::Message::PaneResized(pane_grid::ResizeEvent { split, ratio }) => {
  249. self.panes.resize(split, ratio);
  250. }
  251. pane::Message::PaneDragged(event) => {
  252. if let pane_grid::DragEvent::Dropped { pane, target } = event {
  253. self.panes.drop(pane, target);
  254. self.focus = None;
  255. }
  256. }
  257. pane::Message::SplitPane(axis, pane) => {
  258. let focus_pane = if let Some((new_pane, _)) = self.panes.split(
  259. axis,
  260. pane,
  261. PaneState::new(vec![], PaneSettings::default()),
  262. ) {
  263. Some(new_pane)
  264. } else {
  265. None
  266. };
  267. if Some(focus_pane).is_some() {
  268. self.focus = Some((window, focus_pane.unwrap()));
  269. }
  270. }
  271. pane::Message::ClosePane(pane) => {
  272. if let Some((_, sibling)) = self.panes.close(pane) {
  273. self.focus = Some((window, sibling));
  274. }
  275. }
  276. pane::Message::MaximizePane(pane) => {
  277. self.panes.maximize(pane);
  278. }
  279. pane::Message::Restore => {
  280. self.panes.restore();
  281. }
  282. pane::Message::ReplacePane(pane) => {
  283. if let Some(pane) = self.panes.get_mut(pane) {
  284. *pane = PaneState::new(vec![], PaneSettings::default());
  285. }
  286. }
  287. pane::Message::ToggleModal(pane, modal_type) => {
  288. if let Some(pane) = self.get_mut_pane(main_window.id, window, pane) {
  289. if modal_type == pane.modal {
  290. pane.modal = pane::PaneModal::None;
  291. } else {
  292. pane.modal = modal_type;
  293. }
  294. };
  295. }
  296. pane::Message::ChartUserUpdate(pane, chart_message) => {
  297. return self.update_chart_state(
  298. pane,
  299. window,
  300. &chart_message,
  301. main_window.id,
  302. );
  303. }
  304. pane::Message::SliderChanged(pane, value, is_trade_filter) => {
  305. return self.set_pane_size_filter(
  306. window,
  307. pane,
  308. value,
  309. is_trade_filter,
  310. main_window.id,
  311. );
  312. }
  313. pane::Message::InitPaneContent(window, content_str, is_pane, pane_stream) => {
  314. let pane;
  315. if let Some(parent_pane) = is_pane {
  316. pane = parent_pane;
  317. } else {
  318. pane = self.panes.iter().next().map(|(pane, _)| *pane).unwrap();
  319. }
  320. let err_occurred = |err| {
  321. Task::done(Message::ErrorOccurred(window, Some(pane), err))
  322. };
  323. let timezone = self.timezone;
  324. let ticker_info = match self.get_ticker_info(&pane_stream) {
  325. Some(info) => info,
  326. None => {
  327. return err_occurred(DashboardError::PaneSet(
  328. "No ticker info found".to_string(),
  329. ));
  330. }
  331. };
  332. // set pane's stream and content identifiers
  333. if let Some(pane_state) = self.get_mut_pane(main_window.id, window, pane) {
  334. if let Err(err) = pane_state.set_content(
  335. ticker_info,
  336. &content_str,
  337. timezone
  338. ) {
  339. return err_occurred(err);
  340. }
  341. } else {
  342. return err_occurred(DashboardError::PaneSet(
  343. "No pane found".to_string()
  344. ));
  345. }
  346. // prepare unique streams for websocket
  347. for stream in &pane_stream {
  348. match stream {
  349. StreamType::Kline {
  350. exchange, ticker, ..
  351. }
  352. | StreamType::DepthAndTrades { exchange, ticker } => {
  353. self.pane_streams
  354. .entry(*exchange)
  355. .or_default()
  356. .entry(*ticker)
  357. .or_default()
  358. .insert(*stream);
  359. }
  360. _ => {}
  361. }
  362. }
  363. log::info!("{:?}", &self.pane_streams);
  364. // get fetch tasks for pane's content
  365. if ["footprint", "candlestick", "heatmap"]
  366. .contains(&content_str.as_str())
  367. {
  368. for stream in &pane_stream {
  369. if let StreamType::Kline { .. } = stream {
  370. if ["candlestick", "footprint"]
  371. .contains(&content_str.as_str())
  372. {
  373. return get_kline_fetch_task(
  374. window, pane, *stream, None, None,
  375. );
  376. }
  377. }
  378. }
  379. }
  380. }
  381. pane::Message::TimeframeSelected(timeframe, pane) => {
  382. self.notification_manager.clear(&window, &pane);
  383. match self.set_pane_timeframe(main_window.id, window, pane, timeframe) {
  384. Ok(stream_type) => {
  385. if let StreamType::Kline { .. } = stream_type {
  386. let task = get_kline_fetch_task(
  387. window,
  388. pane,
  389. *stream_type,
  390. None,
  391. None,
  392. );
  393. self.notification_manager.push(
  394. window,
  395. pane,
  396. Notification::Info(InfoType::FetchingKlines),
  397. );
  398. return Task::done(Message::RefreshStreams)
  399. .chain(task);
  400. }
  401. }
  402. Err(err) => {
  403. return Task::done(
  404. Message::ErrorOccurred(window, Some(pane), err)
  405. );
  406. }
  407. }
  408. }
  409. pane::Message::TicksizeSelected(tick_multiply, pane) => {
  410. self.notification_manager.clear(&window, &pane);
  411. return self.set_pane_ticksize(main_window.id, window, pane, tick_multiply);
  412. }
  413. pane::Message::Popout => return self.popout_pane(main_window),
  414. pane::Message::Merge => return self.merge_pane(main_window),
  415. pane::Message::ToggleIndicator(pane, indicator_str) => {
  416. if let Some(pane_state) = self.get_mut_pane(main_window.id, window, pane) {
  417. pane_state.content.toggle_indicator(indicator_str);
  418. }
  419. }
  420. pane::Message::HideNotification(pane, notification) => {
  421. self.notification_manager.find_and_remove(window, pane, notification);
  422. }
  423. }
  424. }
  425. Message::FetchEvent(req_id, klines, pane_stream, pane_id, window) => {
  426. self.notification_manager.remove_info_type(
  427. window,
  428. &pane_id,
  429. &InfoType::FetchingKlines,
  430. );
  431. match klines {
  432. Ok(klines) => {
  433. if let StreamType::Kline { timeframe, .. } = pane_stream {
  434. let timezone = self.timezone;
  435. if let Some(pane_state) =
  436. self.get_mut_pane(main_window.id, window, pane_id)
  437. {
  438. pane_state.insert_klines_vec(req_id, timeframe, &klines, timezone);
  439. }
  440. }
  441. }
  442. Err(err) => {
  443. return Task::done(Message::ErrorOccurred(
  444. window,
  445. Some(pane_id),
  446. DashboardError::Fetch(err)
  447. ));
  448. }
  449. }
  450. }
  451. Message::OIFetchEvent(req_id, oi, pane_stream, pane_id, window) => {
  452. self.notification_manager.remove_info_type(
  453. window,
  454. &pane_id,
  455. &InfoType::FetchingOI,
  456. );
  457. if let Some(pane_state) =
  458. self.get_mut_pane(main_window.id, window, pane_id)
  459. {
  460. match oi {
  461. Ok(oi) => {
  462. if let StreamType::Kline { .. } = pane_stream {
  463. pane_state.insert_oi_vec(req_id, oi);
  464. }
  465. }
  466. Err(err) => {
  467. return Task::done(Message::ErrorOccurred(
  468. window,
  469. Some(pane_id),
  470. DashboardError::Fetch(err),
  471. ))
  472. }
  473. }
  474. }
  475. }
  476. Message::LayoutFetchAll => {
  477. let mut fetched_panes = vec![];
  478. self.iter_all_panes(main_window.id)
  479. .for_each(|(window, pane, pane_state)| match pane_state.content {
  480. PaneContent::Candlestick(_, _) | PaneContent::Footprint(_, _) => {
  481. fetched_panes.push((window, pane));
  482. }
  483. _ => {}
  484. });
  485. for (window, pane) in fetched_panes {
  486. self.notification_manager.push(
  487. window,
  488. pane,
  489. Notification::Info(InfoType::FetchingKlines),
  490. );
  491. }
  492. return Task::batch(klines_fetch_all_task(&self.pane_streams));
  493. }
  494. Message::DistributeFetchedKlines(stream_type, klines) => match klines {
  495. Ok(klines) => {
  496. let mut inserted_panes = vec![];
  497. let timezone = self.timezone;
  498. self.iter_all_panes_mut(main_window.id)
  499. .for_each(|(window, pane, state)| {
  500. if state.matches_stream(&stream_type) {
  501. if let StreamType::Kline { timeframe, .. } = stream_type {
  502. match &mut state.content {
  503. PaneContent::Candlestick(chart, indicators) => {
  504. let tick_size = chart.get_tick_size();
  505. *chart = CandlestickChart::new(
  506. chart.get_chart_layout(),
  507. klines.clone(),
  508. timeframe,
  509. tick_size,
  510. timezone,
  511. indicators,
  512. );
  513. }
  514. PaneContent::Footprint(chart, indicators) => {
  515. let (raw_trades, tick_size) =
  516. (chart.get_raw_trades(), chart.get_tick_size());
  517. *chart = FootprintChart::new(
  518. chart.get_chart_layout(),
  519. timeframe,
  520. tick_size,
  521. klines.clone(),
  522. raw_trades,
  523. timezone,
  524. indicators,
  525. );
  526. }
  527. _ => {}
  528. }
  529. inserted_panes.push((window, pane));
  530. }
  531. }
  532. });
  533. for (window, pane) in inserted_panes {
  534. self.notification_manager.remove_info_type(
  535. window,
  536. &pane,
  537. &InfoType::FetchingKlines,
  538. );
  539. }
  540. }
  541. Err(err) => {
  542. log::error!("{err}");
  543. }
  544. }
  545. Message::FetchTrades(
  546. window_id,
  547. pane,
  548. from_time,
  549. to_time,
  550. stream_type,
  551. ) => {
  552. if let StreamType::DepthAndTrades { exchange, ticker } = stream_type {
  553. if exchange == Exchange::BinanceFutures || exchange == Exchange::BinanceSpot {
  554. return Task::perform(
  555. binance::fetch_trades(ticker, from_time),
  556. move |result| match result {
  557. Ok(trades) => Message::DistributeFetchedTrades(
  558. window_id,
  559. pane,
  560. trades,
  561. stream_type,
  562. to_time,
  563. ),
  564. Err(err) => Message::ErrorOccurred(
  565. window_id,
  566. Some(pane),
  567. DashboardError::Fetch(err.to_string()),
  568. ),
  569. },
  570. );
  571. } else {
  572. self.notification_manager.remove_info_type(
  573. window_id,
  574. &pane,
  575. &InfoType::FetchingTrades(0),
  576. );
  577. return Task::done(Message::ErrorOccurred(
  578. window_id,
  579. Some(pane),
  580. DashboardError::Fetch(format!(
  581. "No trade fetch support for {exchange:?}"
  582. )),
  583. ));
  584. }
  585. }
  586. }
  587. Message::DistributeFetchedTrades(
  588. window_id,
  589. pane,
  590. trades,
  591. stream_type,
  592. to_time,
  593. ) => {
  594. let last_trade_time = trades.last()
  595. .map_or(0, |trade| trade.time);
  596. self.notification_manager.increment_fetching_trades(
  597. window_id,
  598. &pane,
  599. trades.len(),
  600. );
  601. if last_trade_time < to_time {
  602. match self.insert_fetched_trades(
  603. main_window.id,
  604. window_id,
  605. pane,
  606. &trades,
  607. false,
  608. ) {
  609. Ok(_) => {
  610. return Task::done(Message::FetchTrades(
  611. window_id,
  612. pane,
  613. last_trade_time,
  614. to_time,
  615. stream_type,
  616. ));
  617. }
  618. Err(err) => {
  619. self.notification_manager.remove_info_type(
  620. window_id,
  621. &pane,
  622. &InfoType::FetchingTrades(0),
  623. );
  624. return Task::done(
  625. Message::ErrorOccurred(window_id, Some(pane), err)
  626. );
  627. }
  628. }
  629. } else {
  630. self.notification_manager.remove_info_type(
  631. window_id,
  632. &pane,
  633. &InfoType::FetchingTrades(0),
  634. );
  635. match self.insert_fetched_trades(
  636. main_window.id,
  637. window_id,
  638. pane,
  639. &trades,
  640. true,
  641. ) {
  642. Ok(_) => {}
  643. Err(err) => {
  644. return Task::done(
  645. Message::ErrorOccurred(window_id, Some(pane), err)
  646. );
  647. }
  648. }
  649. }
  650. }
  651. Message::RefreshStreams => {
  652. self.pane_streams = self.get_all_diff_streams(main_window.id);
  653. }
  654. Message::ChartMessage(pane, window, message) => {
  655. if let ChartMessage::NewDataRange(req_id, fetch) = message {
  656. match fetch {
  657. FetchRange::Kline(from, to) => {
  658. let kline_stream = self
  659. .get_pane(main_window.id, window, pane)
  660. .and_then(|pane| {
  661. pane.stream
  662. .iter()
  663. .find(|stream| matches!(stream, StreamType::Kline { .. }))
  664. });
  665. if let Some(stream) = kline_stream {
  666. let stream = *stream;
  667. self.notification_manager.push(
  668. window,
  669. pane,
  670. Notification::Info(InfoType::FetchingKlines),
  671. );
  672. return get_kline_fetch_task(
  673. window,
  674. pane,
  675. stream,
  676. Some(req_id),
  677. Some((from, to)),
  678. );
  679. }
  680. }
  681. FetchRange::OpenInterest(from, to) => {
  682. let kline_stream = self
  683. .get_pane(main_window.id, window, pane)
  684. .and_then(|pane| {
  685. pane.stream
  686. .iter()
  687. .find(|stream| matches!(stream, StreamType::Kline { .. }))
  688. });
  689. if let Some(stream) = kline_stream {
  690. let stream = *stream;
  691. self.notification_manager.push(
  692. window,
  693. pane,
  694. Notification::Info(InfoType::FetchingOI),
  695. );
  696. return get_oi_fetch_task(
  697. window,
  698. pane,
  699. stream,
  700. Some(req_id),
  701. Some((from, to)),
  702. );
  703. }
  704. }
  705. FetchRange::Trades(from, to) => {
  706. if !self.trade_fetch_enabled {
  707. return Task::none();
  708. }
  709. let trade_stream = self
  710. .get_pane(main_window.id, window, pane)
  711. .and_then(|pane| {
  712. pane.stream.iter().find(|stream| {
  713. matches!(stream, StreamType::DepthAndTrades { .. })
  714. })
  715. });
  716. if let Some(stream) = trade_stream {
  717. let stream = *stream;
  718. self.notification_manager.push(
  719. window,
  720. pane,
  721. Notification::Info(InfoType::FetchingTrades(0)),
  722. );
  723. return Task::done(Message::FetchTrades(
  724. window,
  725. pane,
  726. from,
  727. to,
  728. stream,
  729. ));
  730. }
  731. }
  732. }
  733. }
  734. }
  735. }
  736. Task::none()
  737. }
  738. fn new_pane(
  739. &mut self,
  740. axis: pane_grid::Axis,
  741. main_window: &Window,
  742. pane_state: Option<PaneState>,
  743. ) -> Task<Message> {
  744. if self
  745. .focus
  746. .filter(|(window, _)| *window == main_window.id)
  747. .is_some()
  748. {
  749. // If there is any focused pane on main window, split it
  750. return self.split_pane(axis, main_window);
  751. } else {
  752. // If there is no focused pane, split the last pane or create a new empty grid
  753. let pane = self.panes.iter().last().map(|(pane, _)| pane).copied();
  754. if let Some(pane) = pane {
  755. let result = self.panes.split(
  756. axis,
  757. pane,
  758. pane_state.unwrap_or(PaneState::new(vec![], PaneSettings::default())),
  759. );
  760. if let Some((pane, _)) = result {
  761. return self.focus_pane(main_window.id, pane);
  762. }
  763. } else {
  764. let (state, pane) = pane_grid::State::new(
  765. pane_state.unwrap_or(PaneState::new(vec![], PaneSettings::default())),
  766. );
  767. self.panes = state;
  768. return self.focus_pane(main_window.id, pane);
  769. }
  770. }
  771. Task::none()
  772. }
  773. fn focus_pane(&mut self, window: window::Id, pane: pane_grid::Pane) -> Task<Message> {
  774. if self.focus != Some((window, pane)) {
  775. self.focus = Some((window, pane));
  776. }
  777. Task::none()
  778. }
  779. fn split_pane(&mut self, axis: pane_grid::Axis, main_window: &Window) -> Task<Message> {
  780. if let Some((window, pane)) = self.focus {
  781. if window == main_window.id {
  782. let result =
  783. self.panes
  784. .split(axis, pane, PaneState::new(vec![], PaneSettings::default()));
  785. if let Some((pane, _)) = result {
  786. return self.focus_pane(main_window.id, pane);
  787. }
  788. }
  789. }
  790. Task::none()
  791. }
  792. fn popout_pane(&mut self, main_window: &Window) -> Task<Message> {
  793. if let Some((_, id)) = self.focus.take() {
  794. if let Some((pane, _)) = self.panes.close(id) {
  795. let (window, task) = window::open(window::Settings {
  796. position: main_window
  797. .position
  798. .map(|point| window::Position::Specific(point + Vector::new(20.0, 20.0)))
  799. .unwrap_or_default(),
  800. exit_on_close_request: false,
  801. ..window::settings()
  802. });
  803. let (state, id) = pane_grid::State::new(pane);
  804. self.popout.insert(
  805. window,
  806. (state, (Point::new(0.0, 0.0), Size::new(1024.0, 768.0))),
  807. );
  808. return task.then(move |window| {
  809. Task::done(Message::Pane(window, pane::Message::PaneClicked(id)))
  810. });
  811. }
  812. }
  813. Task::none()
  814. }
  815. fn merge_pane(&mut self, main_window: &Window) -> Task<Message> {
  816. if let Some((window, pane)) = self.focus.take() {
  817. if let Some(pane_state) = self
  818. .popout
  819. .remove(&window)
  820. .and_then(|(mut panes, _)| panes.panes.remove(&pane))
  821. {
  822. let task =
  823. self.new_pane(pane_grid::Axis::Horizontal, main_window, Some(pane_state));
  824. return Task::batch(vec![window::close(window), task]);
  825. }
  826. }
  827. Task::none()
  828. }
  829. fn get_pane(
  830. &self,
  831. main_window: window::Id,
  832. window: window::Id,
  833. pane: pane_grid::Pane,
  834. ) -> Option<&PaneState> {
  835. if main_window == window {
  836. self.panes.get(pane)
  837. } else {
  838. self.popout
  839. .get(&window)
  840. .and_then(|(panes, _)| panes.get(pane))
  841. }
  842. }
  843. fn get_mut_pane(
  844. &mut self,
  845. main_window: window::Id,
  846. window: window::Id,
  847. pane: pane_grid::Pane,
  848. ) -> Option<&mut PaneState> {
  849. if main_window == window {
  850. self.panes.get_mut(pane)
  851. } else {
  852. self.popout
  853. .get_mut(&window)
  854. .and_then(|(panes, _)| panes.get_mut(pane))
  855. }
  856. }
  857. fn iter_all_panes(
  858. &self,
  859. main_window: window::Id,
  860. ) -> impl Iterator<Item = (window::Id, pane_grid::Pane, &PaneState)> {
  861. self.panes
  862. .iter()
  863. .map(move |(pane, state)| (main_window, *pane, state))
  864. .chain(self.popout.iter().flat_map(|(window_id, (panes, _))| {
  865. panes.iter().map(|(pane, state)| (*window_id, *pane, state))
  866. }))
  867. }
  868. fn iter_all_panes_mut(
  869. &mut self,
  870. main_window: window::Id,
  871. ) -> impl Iterator<Item = (window::Id, pane_grid::Pane, &mut PaneState)> {
  872. self.panes
  873. .iter_mut()
  874. .map(move |(pane, state)| (main_window, *pane, state))
  875. .chain(self.popout.iter_mut().flat_map(|(window_id, (panes, _))| {
  876. panes
  877. .iter_mut()
  878. .map(|(pane, state)| (*window_id, *pane, state))
  879. }))
  880. }
  881. pub fn view<'a>(&'a self, main_window: &'a Window) -> Element<'_, Message> {
  882. let focus = self.focus;
  883. let pane_locked = self.layout_lock;
  884. let mut pane_grid = PaneGrid::new(&self.panes, |id, pane, maximized| {
  885. let is_focused = !pane_locked && focus == Some((main_window.id, id));
  886. pane.view(
  887. id,
  888. self.panes.len(),
  889. is_focused,
  890. maximized,
  891. main_window.id,
  892. main_window,
  893. self.notification_manager.get(&main_window.id, &id),
  894. )
  895. })
  896. .spacing(6)
  897. .style(style::pane_grid);
  898. if !pane_locked {
  899. pane_grid = pane_grid
  900. .on_click(pane::Message::PaneClicked)
  901. .on_resize(8, pane::Message::PaneResized)
  902. .on_drag(pane::Message::PaneDragged);
  903. }
  904. let pane_grid: Element<_> = pane_grid.into();
  905. let base = container(pane_grid.map(move |message| Message::Pane(main_window.id, message)));
  906. if !self.notification_manager.global_notifications.is_empty() {
  907. dashboard_notification(
  908. base,
  909. create_notis_column(
  910. &self.notification_manager.global_notifications,
  911. Message::ClearLastGlobalNotification,
  912. ),
  913. )
  914. } else {
  915. base.into()
  916. }
  917. }
  918. pub fn view_window<'a>(
  919. &'a self,
  920. window: window::Id,
  921. main_window: &'a Window,
  922. ) -> Element<'a, Message> {
  923. if let Some((state, _)) = self.popout.get(&window) {
  924. let content = container({
  925. let mut pane_grid = PaneGrid::new(state, |id, pane, _maximized| {
  926. let is_focused = self.focus == Some((window, id));
  927. pane.view(
  928. id,
  929. state.len(),
  930. is_focused,
  931. false,
  932. window,
  933. main_window,
  934. self.notification_manager.get(&window, &id),
  935. )
  936. });
  937. if !self.layout_lock {
  938. pane_grid = pane_grid.on_click(pane::Message::PaneClicked);
  939. }
  940. pane_grid
  941. })
  942. .width(Length::Fill)
  943. .height(Length::Fill)
  944. .padding(8);
  945. return Element::new(content).map(move |message| Message::Pane(window, message));
  946. } else {
  947. return Element::new(center("No pane found for window"))
  948. .map(move |message| Message::Pane(window, message));
  949. }
  950. }
  951. pub fn set_tickers_info(
  952. &mut self,
  953. tickers_info_map: HashMap<Exchange, HashMap<Ticker, Option<TickerInfo>>>,
  954. ) {
  955. self.tickers_info = tickers_info_map;
  956. }
  957. fn get_ticker_info(&self, pane_stream: &[StreamType]) -> Option<TickerInfo> {
  958. pane_stream
  959. .iter()
  960. .filter_map(|stream| match stream {
  961. StreamType::Kline {exchange, ticker, ..}
  962. | StreamType::DepthAndTrades { exchange, ticker } => {
  963. self.tickers_info.get(exchange).and_then(|exchange_map| {
  964. exchange_map
  965. .get(ticker)
  966. .and_then(|ticker_info| *ticker_info)
  967. })
  968. }
  969. _ => None,
  970. })
  971. .next()
  972. }
  973. fn set_pane_ticksize(
  974. &mut self,
  975. main_window: window::Id,
  976. window: window::Id,
  977. pane: pane_grid::Pane,
  978. new_tick_multiply: TickMultiplier,
  979. ) -> Task<Message> {
  980. if let Some(pane_state) = self.get_mut_pane(main_window, window, pane) {
  981. pane_state.settings.tick_multiply = Some(new_tick_multiply);
  982. if let Some(ticker_info) = pane_state.settings.ticker_info {
  983. match pane_state.content {
  984. PaneContent::Footprint(ref mut chart, _) => {
  985. chart.change_tick_size(
  986. new_tick_multiply.multiply_with_min_tick_size(ticker_info),
  987. );
  988. chart.reset_request_handler();
  989. }
  990. PaneContent::Heatmap(ref mut chart, _) => {
  991. chart.change_tick_size(
  992. new_tick_multiply.multiply_with_min_tick_size(ticker_info),
  993. );
  994. }
  995. _ => {
  996. return Task::done(Message::ErrorOccurred(
  997. window,
  998. Some(pane),
  999. DashboardError::PaneSet(
  1000. "No chart found to change ticksize".to_string(),
  1001. ),
  1002. ));
  1003. }
  1004. }
  1005. } else {
  1006. return Task::done(Message::ErrorOccurred(
  1007. window,
  1008. Some(pane),
  1009. DashboardError::PaneSet("No min ticksize found".to_string()),
  1010. ));
  1011. }
  1012. } else {
  1013. return Task::done(Message::ErrorOccurred(
  1014. window,
  1015. Some(pane),
  1016. DashboardError::PaneSet("No pane found to change ticksize".to_string()),
  1017. ));
  1018. }
  1019. Task::none()
  1020. }
  1021. fn set_pane_timeframe(
  1022. &mut self,
  1023. main_window: window::Id,
  1024. window: window::Id,
  1025. pane: pane_grid::Pane,
  1026. new_timeframe: Timeframe,
  1027. ) -> Result<&StreamType, DashboardError> {
  1028. if let Some(pane_state) = self.get_mut_pane(main_window, window, pane) {
  1029. pane_state.settings.selected_timeframe = Some(new_timeframe);
  1030. if let Some(stream_type) = pane_state
  1031. .stream
  1032. .iter_mut()
  1033. .find(|stream_type| matches!(stream_type, StreamType::Kline { .. }))
  1034. {
  1035. if let StreamType::Kline { timeframe, .. } = stream_type {
  1036. *timeframe = new_timeframe;
  1037. }
  1038. match &mut pane_state.content {
  1039. PaneContent::Candlestick(chart, _) => {
  1040. chart.set_loading_state(true);
  1041. return Ok(stream_type);
  1042. }
  1043. PaneContent::Footprint(chart, _) => {
  1044. chart.set_loading_state(true);
  1045. return Ok(stream_type);
  1046. }
  1047. _ => {}
  1048. }
  1049. }
  1050. }
  1051. Err(DashboardError::Unknown(
  1052. "Couldn't get the pane to change its timeframe".to_string(),
  1053. ))
  1054. }
  1055. fn set_pane_size_filter(
  1056. &mut self,
  1057. window: window::Id,
  1058. pane: pane_grid::Pane,
  1059. new_size_filter: f32,
  1060. is_trade_filter: bool,
  1061. main_window: window::Id,
  1062. ) -> Task<Message> {
  1063. if let Some(pane_state) = self.get_mut_pane(main_window, window, pane) {
  1064. pane_state.settings.trade_size_filter = Some(new_size_filter);
  1065. match pane_state.content {
  1066. PaneContent::Heatmap(ref mut chart, _) => {
  1067. chart.set_size_filter(new_size_filter, is_trade_filter);
  1068. }
  1069. PaneContent::TimeAndSales(ref mut chart) => {
  1070. chart.set_size_filter(new_size_filter);
  1071. }
  1072. _ => {
  1073. return Task::done(Message::ErrorOccurred(
  1074. window,
  1075. Some(pane),
  1076. DashboardError::Unknown("No chart found to set size filter".to_string()),
  1077. ));
  1078. }
  1079. }
  1080. Task::none()
  1081. } else {
  1082. Task::done(Message::ErrorOccurred(
  1083. window,
  1084. Some(pane),
  1085. DashboardError::Unknown("No pane found to set size filter".to_string()),
  1086. ))
  1087. }
  1088. }
  1089. pub fn init_pane_task(
  1090. &mut self,
  1091. main_window: window::Id,
  1092. ticker: Ticker,
  1093. exchange: Exchange,
  1094. content: &str,
  1095. ) -> Task<Message> {
  1096. if let Some((window, selected_pane)) = self.focus {
  1097. if let Some(pane_state) = self.get_mut_pane(main_window, window, selected_pane) {
  1098. return pane_state
  1099. .init_content_task(content, exchange, ticker, selected_pane, window)
  1100. .map(move |message| Message::Pane(window, message));
  1101. }
  1102. } else {
  1103. self.notification_manager
  1104. .global_notifications
  1105. .push(Notification::Warn("Select a pane first".to_string()));
  1106. return Task::perform(
  1107. async { std::thread::sleep(std::time::Duration::from_secs(8)) },
  1108. move |()| Message::ClearLastGlobalNotification,
  1109. );
  1110. }
  1111. Task::none()
  1112. }
  1113. pub fn toggle_trade_fetch(&mut self, is_enabled: bool, main_window: &Window) {
  1114. self.trade_fetch_enabled = is_enabled;
  1115. self.iter_all_panes_mut(main_window.id)
  1116. .for_each(|(_, _, pane_state)| {
  1117. if let PaneContent::Footprint(chart, _) = &mut pane_state.content {
  1118. chart.reset_request_handler();
  1119. }
  1120. });
  1121. }
  1122. fn insert_fetched_trades(
  1123. &mut self,
  1124. main_window: window::Id,
  1125. window: window::Id,
  1126. pane: pane_grid::Pane,
  1127. trades: &[Trade],
  1128. is_batches_done: bool,
  1129. ) -> Result<(), DashboardError> {
  1130. self.get_mut_pane(main_window, window, pane)
  1131. .map_or_else(
  1132. || Err(
  1133. DashboardError::Unknown("Couldnt get the pane for fetched trades".to_string())
  1134. ),
  1135. |pane_state| match &mut pane_state.content {
  1136. PaneContent::Footprint(chart, _) => {
  1137. chart.insert_trades(trades.to_owned(), is_batches_done);
  1138. Ok(())
  1139. }
  1140. _ => Err(
  1141. DashboardError::Unknown("No matching chart found for fetched trades".to_string())
  1142. ),
  1143. }
  1144. )
  1145. }
  1146. pub fn update_latest_klines(
  1147. &mut self,
  1148. stream_type: &StreamType,
  1149. kline: &Kline,
  1150. main_window: window::Id,
  1151. ) -> Task<Message> {
  1152. let mut tasks = vec![];
  1153. let mut found_match = false;
  1154. self.iter_all_panes_mut(main_window)
  1155. .for_each(|(window, pane, pane_state)| {
  1156. if pane_state.matches_stream(stream_type) {
  1157. match &mut pane_state.content {
  1158. PaneContent::Candlestick(chart, _) => tasks.push(
  1159. chart
  1160. .update_latest_kline(kline)
  1161. .map(move |message| Message::ChartMessage(pane, window, message)),
  1162. ),
  1163. PaneContent::Footprint(chart, _) => tasks.push(
  1164. chart
  1165. .update_latest_kline(kline)
  1166. .map(move |message| Message::ChartMessage(pane, window, message)),
  1167. ),
  1168. _ => {}
  1169. }
  1170. found_match = true;
  1171. }
  1172. });
  1173. if !found_match {
  1174. log::error!("No matching pane found for the stream: {stream_type:?}");
  1175. tasks.push(Task::done(Message::RefreshStreams));
  1176. }
  1177. Task::batch(tasks)
  1178. }
  1179. pub fn update_depth_and_trades(
  1180. &mut self,
  1181. stream_type: &StreamType,
  1182. depth_update_t: i64,
  1183. depth: Depth,
  1184. trades_buffer: Vec<Trade>,
  1185. main_window: window::Id,
  1186. ) -> Task<Message> {
  1187. let mut found_match = false;
  1188. let trades_buffer = trades_buffer.into_boxed_slice();
  1189. self.iter_all_panes_mut(main_window)
  1190. .for_each(|(_, _, pane_state)| {
  1191. if pane_state.matches_stream(stream_type) {
  1192. match &mut pane_state.content {
  1193. PaneContent::Heatmap(chart, _) => {
  1194. chart.insert_datapoint(&trades_buffer, depth_update_t, &depth);
  1195. }
  1196. PaneContent::Footprint(chart, _) => {
  1197. chart.insert_datapoint(&trades_buffer, depth_update_t);
  1198. }
  1199. PaneContent::TimeAndSales(chart) => {
  1200. chart.update(&trades_buffer);
  1201. }
  1202. _ => {
  1203. log::error!("No chart found for the stream: {stream_type:?}");
  1204. }
  1205. }
  1206. found_match = true;
  1207. }
  1208. });
  1209. if found_match {
  1210. Task::none()
  1211. } else {
  1212. log::error!("No matching pane found for the stream: {stream_type:?}");
  1213. Task::done(Message::RefreshStreams)
  1214. }
  1215. }
  1216. fn update_chart_state(
  1217. &mut self,
  1218. pane: pane_grid::Pane,
  1219. window: window::Id,
  1220. chart_message: &ChartMessage,
  1221. main_window: window::Id,
  1222. ) -> Task<Message> {
  1223. if let Some(pane_state) = self.get_mut_pane(main_window, window, pane) {
  1224. match pane_state.content {
  1225. PaneContent::Heatmap(ref mut chart, _) => chart
  1226. .update(chart_message)
  1227. .map(move |message| Message::ChartMessage(pane, window, message)),
  1228. PaneContent::Footprint(ref mut chart, _) => chart
  1229. .update(chart_message)
  1230. .map(move |message| Message::ChartMessage(pane, window, message)),
  1231. PaneContent::Candlestick(ref mut chart, _) => chart
  1232. .update(chart_message)
  1233. .map(move |message| Message::ChartMessage(pane, window, message)),
  1234. _ => Task::done(Message::ErrorOccurred(
  1235. window,
  1236. Some(pane),
  1237. DashboardError::Unknown("No chart found".to_string()),
  1238. )),
  1239. }
  1240. } else {
  1241. Task::done(Message::ErrorOccurred(
  1242. window,
  1243. Some(pane),
  1244. DashboardError::Unknown("No pane found to update its state".to_string()),
  1245. ))
  1246. }
  1247. }
  1248. fn get_all_diff_streams(
  1249. &mut self,
  1250. main_window: window::Id,
  1251. ) -> HashMap<Exchange, HashMap<Ticker, HashSet<StreamType>>> {
  1252. let mut pane_streams = HashMap::new();
  1253. self.iter_all_panes_mut(main_window)
  1254. .for_each(|(_, _, pane_state)| {
  1255. for stream_type in &pane_state.stream {
  1256. match stream_type {
  1257. StreamType::Kline {
  1258. exchange,
  1259. ticker,
  1260. timeframe,
  1261. } => {
  1262. let exchange = *exchange;
  1263. let ticker = *ticker;
  1264. let timeframe = *timeframe;
  1265. let exchange_map =
  1266. pane_streams.entry(exchange).or_insert(HashMap::new());
  1267. let ticker_map = exchange_map.entry(ticker).or_insert(HashSet::new());
  1268. ticker_map.insert(StreamType::Kline {
  1269. exchange,
  1270. ticker,
  1271. timeframe,
  1272. });
  1273. }
  1274. StreamType::DepthAndTrades { exchange, ticker } => {
  1275. let exchange = *exchange;
  1276. let ticker = *ticker;
  1277. let exchange_map =
  1278. pane_streams.entry(exchange).or_insert(HashMap::new());
  1279. let ticker_map = exchange_map.entry(ticker).or_insert(HashSet::new());
  1280. ticker_map.insert(StreamType::DepthAndTrades { exchange, ticker });
  1281. }
  1282. _ => {}
  1283. }
  1284. }
  1285. });
  1286. self.pane_streams.clone_from(&pane_streams);
  1287. pane_streams
  1288. }
  1289. pub fn get_timezone(&self) -> UserTimezone {
  1290. self.timezone
  1291. }
  1292. pub fn set_timezone(&mut self, main_window: window::Id, timezone: UserTimezone) {
  1293. self.timezone = timezone;
  1294. self.iter_all_panes_mut(main_window)
  1295. .for_each(|(_, _, pane)| {
  1296. pane.content.change_timezone(timezone);
  1297. });
  1298. }
  1299. }
  1300. fn get_oi_fetch_task(
  1301. window_id: window::Id,
  1302. pane: pane_grid::Pane,
  1303. stream: StreamType,
  1304. req_id: Option<uuid::Uuid>,
  1305. from_to_time: Option<(i64, i64)>,
  1306. ) -> Task<Message> {
  1307. match stream {
  1308. StreamType::Kline {
  1309. exchange,
  1310. ticker,
  1311. timeframe,
  1312. } => match exchange {
  1313. Exchange::BinanceFutures => Task::perform(
  1314. binance::fetch_historical_oi(ticker, from_to_time, timeframe)
  1315. .map_err(|err| format!("{err}")),
  1316. move |oi| Message::OIFetchEvent(req_id, oi, stream, pane, window_id),
  1317. ),
  1318. Exchange::BybitLinear => Task::perform(
  1319. bybit::fetch_historical_oi(ticker, from_to_time, timeframe)
  1320. .map_err(|err| format!("{err}")),
  1321. move |oi| Message::OIFetchEvent(req_id, oi, stream, pane, window_id),
  1322. ),
  1323. _ => {
  1324. log::error!("No OI fetch support for {exchange:?}");
  1325. Task::none()
  1326. },
  1327. },
  1328. _ => Task::none(),
  1329. }
  1330. }
  1331. fn get_kline_fetch_task(
  1332. window_id: window::Id,
  1333. pane: pane_grid::Pane,
  1334. stream: StreamType,
  1335. req_id: Option<uuid::Uuid>,
  1336. range: Option<(i64, i64)>,
  1337. ) -> Task<Message> {
  1338. match stream {
  1339. StreamType::Kline {
  1340. exchange,
  1341. ticker,
  1342. timeframe,
  1343. } => match exchange {
  1344. Exchange::BinanceFutures | Exchange::BinanceSpot => Task::perform(
  1345. binance::fetch_klines(ticker, timeframe, range)
  1346. .map_err(|err| format!("{err}")),
  1347. move |klines| Message::FetchEvent(req_id, klines, stream, pane, window_id),
  1348. ),
  1349. Exchange::BybitLinear | Exchange::BybitSpot => Task::perform(
  1350. bybit::fetch_klines(ticker, timeframe, range)
  1351. .map_err(|err| format!("{err}")),
  1352. move |klines| Message::FetchEvent(req_id, klines, stream, pane, window_id),
  1353. ),
  1354. },
  1355. _ => Task::none(),
  1356. }
  1357. }
  1358. fn klines_fetch_all_task(
  1359. streams: &HashMap<Exchange, HashMap<Ticker, HashSet<StreamType>>>,
  1360. ) -> Vec<Task<Message>> {
  1361. let mut tasks: Vec<Task<Message>> = vec![];
  1362. for (exchange, stream) in streams {
  1363. let mut kline_fetches = Vec::new();
  1364. for stream_types in stream.values() {
  1365. for stream_type in stream_types {
  1366. if let StreamType::Kline {
  1367. ticker, timeframe, ..
  1368. } = stream_type
  1369. {
  1370. kline_fetches.push((*ticker, *timeframe));
  1371. }
  1372. }
  1373. }
  1374. for (ticker, timeframe) in kline_fetches {
  1375. let (ticker, timeframe) = (ticker, timeframe);
  1376. let exchange = *exchange;
  1377. match exchange {
  1378. Exchange::BinanceFutures | Exchange::BinanceSpot => {
  1379. let fetch_klines = Task::perform(
  1380. binance::fetch_klines(ticker, timeframe, None)
  1381. .map_err(|err| format!("{err}")),
  1382. move |klines| {
  1383. Message::DistributeFetchedKlines(
  1384. StreamType::Kline {
  1385. exchange,
  1386. ticker,
  1387. timeframe,
  1388. },
  1389. klines,
  1390. )
  1391. },
  1392. );
  1393. tasks.push(fetch_klines);
  1394. }
  1395. Exchange::BybitLinear | Exchange::BybitSpot => {
  1396. let fetch_klines = Task::perform(
  1397. bybit::fetch_klines(ticker, timeframe, None)
  1398. .map_err(|err| format!("{err}")),
  1399. move |klines| {
  1400. Message::DistributeFetchedKlines(
  1401. StreamType::Kline {
  1402. exchange,
  1403. ticker,
  1404. timeframe,
  1405. },
  1406. klines,
  1407. )
  1408. },
  1409. );
  1410. tasks.push(fetch_klines);
  1411. }
  1412. }
  1413. }
  1414. }
  1415. tasks
  1416. }