Эх сурвалжийг харах

add timeframe to an Event enum for multiple stream support

Berke 1 жил өмнө
parent
commit
bf387f444e

+ 35 - 42
src/data_providers/binance/market_data.rs

@@ -2,7 +2,6 @@ use iced::futures;
 use iced::subscription::{self, Subscription};
 use serde::Deserialize;
 use serde_json::json;
-use hmac::{Hmac, Mac};
 
 mod string_to_f32 {
     use serde::{self, Deserialize, Deserializer};
@@ -175,30 +174,31 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
     )
 }
 
-pub fn connect_kline_stream(stream: (Ticker, Timeframe)) -> Subscription<Event> {
+pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event> {
     struct Connect;
 
     subscription::channel(
         std::any::TypeId::of::<Connect>(),
         100,
         move |mut output| async move {
-            let mut state = State::Disconnected;     
-
-            let symbol_str = match stream.0 {
-                Ticker::BTCUSDT => "btcusdt",
-                Ticker::ETHUSDT => "ethusdt",
-                Ticker::SOLUSDT => "solusdt",
-                Ticker::LTCUSDT => "ltcusdt",
-            };
-            let timeframe_str = match stream.1 {
-                Timeframe::M1 => "1m",
-                Timeframe::M3 => "3m",
-                Timeframe::M5 => "5m",
-                Timeframe::M15 => "15m",
-                Timeframe::M30 => "30m",
-            };
+            let mut state = State::Disconnected;    
 
-            let stream_str = format!("{}@kline_{}", symbol_str, timeframe_str);
+            let stream_str = vec.iter().map(|(ticker, timeframe)| {
+                let symbol_str = match ticker {
+                    Ticker::BTCUSDT => "btcusdt",
+                    Ticker::ETHUSDT => "ethusdt",
+                    Ticker::SOLUSDT => "solusdt",
+                    Ticker::LTCUSDT => "ltcusdt",
+                };
+                let timeframe_str = match timeframe {
+                    Timeframe::M1 => "1m",
+                    Timeframe::M3 => "3m",
+                    Timeframe::M5 => "5m",
+                    Timeframe::M15 => "15m",
+                    Timeframe::M30 => "30m",
+                };
+                format!("{}@kline_{}", symbol_str, timeframe_str)
+            }).collect::<Vec<String>>().join("/");
  
             loop {
                 match &mut state {
@@ -230,39 +230,29 @@ pub fn connect_kline_stream(stream: (Ticker, Timeframe)) -> Subscription<Event>
                             received = fused_websocket.select_next_some() => {
                                 match received {
                                     Ok(tungstenite::Message::Text(message)) => {
-                                        let parsed_message: Result<serde_json::Value, _> = serde_json::from_str(&message);
-                                        match parsed_message {
+                                        match serde_json::from_str::<serde_json::Value>(&message) {
                                             Ok(data) => {
-                                                if let Some(inner_data) = data.get("data") {
-                                                    if let Some(event_type) = inner_data["e"].as_str() {
-                                                        match event_type {
-                                                            "kline" => {
-                                                                if let Some(kline) = data["data"]["k"].as_object() {
-                                                                    let kline: Result<Kline, _> = serde_json::from_value(json!(kline));
-                                                                    match kline {
-                                                                        Ok(kline) => {
-                                                                            let _ = output.send(Event::KlineReceived(kline)).await;
-                                                                        },
-                                                                        Err(e) => {
-                                                                            dbg!(e);
-                                                                        }
-                                                                    }
+                                                match (data.get("data"), data["data"]["k"]["i"].as_str(), data["data"]["k"].as_object()) {
+                                                    (Some(inner_data), Some(interval), Some(kline_obj)) if inner_data["e"].as_str() == Some("kline") => {
+                                                        match serde_json::from_value::<Kline>(json!(kline_obj)) {
+                                                            Ok(kline) => {
+                                                                if let Some(timeframe) = vec.iter().find(|(_, tf)| tf.to_string() == interval) {
+                                                                    let _ = output.send(Event::KlineReceived(kline, timeframe.1)).await;
                                                                 }
                                                             },
-                                                            _ => {}
+                                                            Err(_) => continue,
                                                         }
-                                                    }
+                                                    },
+                                                    _ => continue,
                                                 }
                                             },
-                                            Err(e) => {
-                                                dbg!(e, message);
-                                            }
+                                            Err(_) => continue,
                                         }
-                                    }
+                                    },
                                     Err(_) => {
                                         let _ = output.send(Event::Disconnected).await;
                                         state = State::Disconnected;
-                                    }
+                                    },
                                     Ok(_) => continue,
                                 }
                             }
@@ -290,7 +280,7 @@ pub enum Event {
     Connected(Connection),
     Disconnected,
     DepthReceived(u64, Vec<(f32, f32)>, Vec<(f32, f32)>, Vec<Trade>),
-    KlineReceived(Kline),
+    KlineReceived(Kline, Timeframe),
 }
 
 #[derive(Debug, Clone)]
@@ -340,6 +330,9 @@ pub async fn fetch_klines(ticker: Ticker, timeframe: Timeframe) -> Result<Vec<Kl
     };
 
     let url = format!("https://fapi.binance.com/fapi/v1/klines?symbol={}&interval={}&limit=720", symbol_str, timeframe_str);
+
+    dbg!("fetching klines", &url);
+
     let response = reqwest::get(&url).await?;
     let value: serde_json::Value = response.json().await?;
     let fetched_klines: Result<Vec<FetchedKlines>, _> = serde_json::from_value(value);