Эх сурвалжийг харах

change timeframes w/o disconnecting from websocket

Berke 1 жил өмнө
parent
commit
5350078b92
1 өөрчлөгдсөн 84 нэмэгдсэн , 14 устгасан
  1. 84 14
      src/main.rs

+ 84 - 14
src/main.rs

@@ -55,7 +55,7 @@ impl std::fmt::Display for Ticker {
         )
     }
 }
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum Ticker {
     BTCUSDT,
     ETHUSDT,
@@ -81,7 +81,7 @@ impl std::fmt::Display for Timeframe {
         )
     }
 }
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum Timeframe {
     M1,
     M3,
@@ -124,6 +124,7 @@ impl From<Icon> for char {
     }
 }
 
+#[derive(Debug)]
 enum WsState {
     Connected(market_data::Connection),
     Disconnected,
@@ -233,6 +234,8 @@ pub enum Message {
     // Slider
     SliderChanged(PaneId, f32),
     SyncWithHeatmap(bool),
+
+    CreateActiveStream(Ticker, Timeframe),
 }
 
 struct State {
@@ -250,6 +253,8 @@ struct State {
     user_ws_state: UserWsState,
     ws_running: bool,
 
+    active_kline_streams: HashMap<(Ticker, Timeframe), (bool, WsState)>,
+
     // pane grid
     panes_open: HashMap<PaneId, bool>,
     panes: pane_grid::State<Pane>,
@@ -294,7 +299,7 @@ impl Application for State {
     fn new(_flags: Self::Flags) -> (Self, Command<Self::Message>) {
         let (panes, first_pane) = pane_grid::State::new(Pane::new(PaneId::CustomChart));
 
-        let mut panes_open = HashMap::new();
+        let mut panes_open: HashMap<PaneId, bool> = HashMap::new();
         panes_open.insert(PaneId::HeatmapChart, true);
         panes_open.insert(PaneId::CandlestickChart, false);
         panes_open.insert(PaneId::TimeAndSales, false);
@@ -315,6 +320,7 @@ impl Application for State {
                 selected_timeframe: Some(Timeframe::M1),
                 selected_exchange: Some("Binance Futures"),
                 ws_state: WsState::Disconnected,
+                active_kline_streams: HashMap::new(),
                 user_ws_state: UserWsState::Disconnected,
                 ws_running: false,
                 panes,
@@ -412,7 +418,47 @@ impl Application for State {
             },
             Message::TimeframeSelected(timeframe) => {
                 self.selected_timeframe = Some(timeframe);
-                Command::none()
+
+                self.active_kline_streams.clear();
+        
+                let selected_ticker = match &self.selected_ticker {
+                    Some(ticker) => ticker,
+                    None => {
+                        eprintln!("No ticker selected");
+                        return Command::none();
+                    }
+                };
+                let selected_timeframe = match &self.selected_timeframe {
+                    Some(timeframe) => timeframe,
+                    None => {
+                        eprintln!("No timeframe selected");
+                        return Command::none();
+                    }
+                };
+            
+                let fetch_klines = Command::perform(
+                    market_data::fetch_klines(*selected_ticker, *selected_timeframe)
+                        .map_err(|err| format!("{}", err)), 
+                    |klines| {
+                        Message::FetchEvent(klines)
+                    }
+                );
+
+                let selected_ticker_clone = selected_ticker.clone();
+                let selected_timeframe_clone = selected_timeframe.clone();
+                
+                // sleep to drop existent stream and create new one
+                let remove_active_stream = Command::perform(
+                    async {
+                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+                    },
+                    move |_| Message::CreateActiveStream(selected_ticker_clone, selected_timeframe_clone)
+                );
+
+                self.custom_line = None;
+                self.candlestick_chart = None;
+
+                Command::batch(vec![fetch_klines, remove_active_stream])
             },
             Message::ExchangeSelected(exchange) => {
                 self.selected_exchange = Some(exchange);
@@ -440,7 +486,7 @@ impl Application for State {
                     };
             
                     let fetch_klines = Command::perform(
-                        market_data::fetch_klines(selected_ticker.to_string(), selected_timeframe.to_string())
+                        market_data::fetch_klines(*selected_ticker, *selected_timeframe)
                             .map_err(|err| format!("{}", err)), 
                         |klines| {
                             Message::FetchEvent(klines)
@@ -530,6 +576,12 @@ impl Application for State {
                                     self.time_and_sales = Some(TimeAndSales::new());
                                 }
                             }
+                            if *pane_id == PaneId::CustomChart {
+                                self.active_kline_streams.insert(
+                                    (selected_ticker.clone(), selected_timeframe.clone()),
+                                    (true, WsState::Disconnected)
+                                );
+                            }
                         }
                     }
 
@@ -572,7 +624,7 @@ impl Application for State {
 
                         self.candlestick_chart = Some(CandlestickChart::new(klines, timeframe_in_minutes));
 
-                        self.custom_line = Some(CustomLine::new(klines_clone, timeframe_in_minutes))
+                        self.custom_line = Some(CustomLine::new(klines_clone, timeframe_in_minutes));
                     },
                     Err(err) => {
                         eprintln!("Error fetching klines: {}", err);
@@ -974,6 +1026,13 @@ impl Application for State {
             
                 Command::none()
             },
+            Message::CreateActiveStream(ticker, timeframe) => {
+                self.active_kline_streams.insert(
+                    (ticker, timeframe),
+                    (true, WsState::Disconnected)
+                );
+                Command::none()
+            },
         }
     }
 
@@ -1095,6 +1154,13 @@ impl Application for State {
             .align_items(Alignment::Center)
             .push(ws_button);
 
+
+        let timeframe_pick_list = pick_list(
+            &Timeframe::ALL[..],
+            self.selected_timeframe,
+            Message::TimeframeSelected,
+        );
+
         if !self.ws_running {
             let symbol_pick_list = pick_list(
                 &Ticker::ALL[..],
@@ -1102,11 +1168,7 @@ impl Application for State {
                 Message::TickerSelected,
             ).placeholder("Choose a ticker...");
             
-            let timeframe_pick_list = pick_list(
-                &Timeframe::ALL[..],
-                self.selected_timeframe,
-                Message::TimeframeSelected,
-            );
+            
             let exchange_selector = pick_list(
                 &["Binance Futures"][..],
                 self.selected_exchange,
@@ -1119,7 +1181,8 @@ impl Application for State {
                 .push(timeframe_pick_list);
                 
         } else {
-            ws_controls = ws_controls.push(Text::new(self.selected_ticker.unwrap_or_else(|| { dbg!("No ticker found"); Ticker::BTCUSDT } ).to_string()).size(20));
+            ws_controls = ws_controls.push(
+                Text::new(self.selected_ticker.unwrap_or_else(|| { dbg!("No ticker found"); Ticker::BTCUSDT } ).to_string()).size(20)).push(timeframe_pick_list);;
         }
 
         let content = Column::new()
@@ -1153,9 +1216,16 @@ impl Application for State {
     
         if self.ws_running {
             self.selected_ticker.and_then(|ticker| {
-                self.selected_timeframe.map(|timeframe| {
-                    let binance_market_stream = market_data::connect_market_stream(ticker, timeframe).map(Message::MarketWsEvent);
+                self.selected_timeframe.map(|_timeframe| {
+                    let binance_market_stream = market_data::connect_market_stream(ticker).map(Message::MarketWsEvent);
                     subscriptions.push(binance_market_stream);
+
+                    for (stream, (active, _state)) in &self.active_kline_streams {
+                        if *active {
+                            let binance_market_stream = market_data::connect_kline_stream(*stream).map(Message::MarketWsEvent);
+                            subscriptions.push(binance_market_stream);
+                        }
+                    }
                 })
             });
         }