Bläddra i källkod

separated kline stream to have more control on streams w/o distruption

Berke 1 år sedan
förälder
incheckning
7cc78f7bf8
1 ändrade filer med 109 tillägg och 14 borttagningar
  1. 109 14
      src/data_providers/binance/market_data.rs

+ 109 - 14
src/data_providers/binance/market_data.rs

@@ -63,7 +63,7 @@ pub struct Kline {
     pub taker_buy_base_asset_volume: f32,
 }
 
-pub fn connect_market_stream(selected_ticker: Ticker, timeframe: Timeframe) -> Subscription<Event> {
+pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
     struct Connect;
 
     subscription::channel(
@@ -79,19 +79,14 @@ pub fn connect_market_stream(selected_ticker: Ticker, timeframe: Timeframe) -> S
                 Ticker::SOLUSDT => "solusdt",
                 Ticker::LTCUSDT => "ltcusdt",
             };
-            let timeframe_str = match timeframe {
-                Timeframe::M1 => "1m",
-                Timeframe::M3 => "3m",
-                Timeframe::M5 => "5m",
-                Timeframe::M15 => "15m",
-                Timeframe::M30 => "30m",
-            };
- 
+
+            let streams = format!("{}@aggTrade/{}@depth20@100ms", symbol_str, symbol_str);
+            
             loop {
                 match &mut state {
-                    State::Disconnected => {
-                        let websocket_server = format!("wss://fstream.binance.com/stream?streams={}@aggTrade/{}@depth20@100ms/{}@kline_{}", symbol_str, symbol_str, symbol_str, timeframe_str);
-                        
+                    State::Disconnected => {        
+                        let websocket_server = format!("wss://fstream.binance.com/stream?streams={}", streams);
+
                         match async_tungstenite::tokio::connect_async(
                             websocket_server,
                         )
@@ -155,6 +150,92 @@ pub fn connect_market_stream(selected_ticker: Ticker, timeframe: Timeframe) -> S
                                                                     }
                                                                 }
                                                             },
+                                                            _ => {}
+                                                        }
+                                                    }
+                                                }
+                                            },
+                                            Err(e) => {
+                                                dbg!(e, message);
+                                            }
+                                        }
+                                    }
+                                    Err(_) => {
+                                        let _ = output.send(Event::Disconnected).await;
+                                        state = State::Disconnected;
+                                    }
+                                    Ok(_) => continue,
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        },
+    )
+}
+
+pub fn connect_kline_stream(stream: (Ticker, Timeframe)) -> Subscription<Event> {
+    struct Connect;
+
+    subscription::channel(
+        std::any::TypeId::of::<Connect>(),
+        100,
+        move |mut output| async move {
+            let mut state = State::Disconnected;     
+
+            let symbol_str = match stream.0 {
+                Ticker::BTCUSDT => "btcusdt",
+                Ticker::ETHUSDT => "ethusdt",
+                Ticker::SOLUSDT => "solusdt",
+                Ticker::LTCUSDT => "ltcusdt",
+            };
+            let timeframe_str = match stream.1 {
+                Timeframe::M1 => "1m",
+                Timeframe::M3 => "3m",
+                Timeframe::M5 => "5m",
+                Timeframe::M15 => "15m",
+                Timeframe::M30 => "30m",
+            };
+
+            let stream_str = format!("{}@kline_{}", symbol_str, timeframe_str);
+ 
+            loop {
+                match &mut state {
+                    State::Disconnected => {
+                        let websocket_server = format!("wss://fstream.binance.com/stream?streams={}", stream_str);
+                        
+                        match async_tungstenite::tokio::connect_async(
+                            websocket_server,
+                        )
+                        .await
+                        {
+                            Ok((websocket, _)) => {
+                                state = State::Connected(websocket);
+                            }
+                            Err(_) => {
+                                tokio::time::sleep(
+                                    tokio::time::Duration::from_secs(1),
+                                )
+                                .await;
+
+                                let _ = output.send(Event::Disconnected).await;
+                            }
+                        }
+                    }
+                    State::Connected(websocket) => {
+                        let mut fused_websocket = websocket.by_ref().fuse();
+
+                        futures::select! {
+                            received = fused_websocket.select_next_some() => {
+                                match received {
+                                    Ok(tungstenite::Message::Text(message)) => {
+                                        let parsed_message: Result<serde_json::Value, _> = serde_json::from_str(&message);
+                                        match parsed_message {
+                                            Ok(data) => {
+                                                if let Some(inner_data) = data.get("data") {
+                                                    if let Some(event_type) = inner_data["e"].as_str() {
+                                                        match event_type {
                                                             "kline" => {
                                                                 if let Some(kline) = data["data"]["k"].as_object() {
                                                                     let kline: Result<Kline, _> = serde_json::from_value(json!(kline));
@@ -243,8 +324,22 @@ impl From<FetchedKlines> for Kline {
         }
     }
 }
-pub async fn fetch_klines(ticker: String, timeframe: String) -> Result<Vec<Kline>, reqwest::Error> {
-    let url = format!("https://fapi.binance.com/fapi/v1/klines?symbol={}&interval={}&limit=720", ticker.to_lowercase(), timeframe);
+pub async fn fetch_klines(ticker: Ticker, timeframe: Timeframe) -> Result<Vec<Kline>, reqwest::Error> {
+    let symbol_str = match ticker {
+        Ticker::BTCUSDT => "btcusdt",
+        Ticker::ETHUSDT => "ethusdt",
+        Ticker::SOLUSDT => "solusdt",
+        Ticker::LTCUSDT => "ltcusdt",
+    };
+    let timeframe_str = match timeframe {
+        Timeframe::M1 => "1m",
+        Timeframe::M3 => "3m",
+        Timeframe::M5 => "5m",
+        Timeframe::M15 => "15m",
+        Timeframe::M30 => "30m",
+    };
+
+    let url = format!("https://fapi.binance.com/fapi/v1/klines?symbol={}&interval={}&limit=720", symbol_str, timeframe_str);
     let response = reqwest::get(&url).await?;
     let value: serde_json::Value = response.json().await?;
     let fetched_klines: Result<Vec<FetchedKlines>, _> = serde_json::from_value(value);