Răsfoiți Sursa

improve self-healing for websocket conn. condition

Berke 1 an în urmă
părinte
comite
0776c9f43d
1 a modificat fișierele cu 62 adăugiri și 60 ștergeri
  1. 62 60
      src/data_providers/binance/market_data.rs

+ 62 - 60
src/data_providers/binance/market_data.rs

@@ -3,7 +3,6 @@ use futures::stream::Stream;
 use serde::Deserializer;
 use futures::sink::SinkExt;
 
-use serde_json::Value;
 use crate::{Ticker, Timeframe};
 
 use bytes::Bytes;
@@ -23,7 +22,9 @@ use tokio::net::TcpStream;
 use tokio_rustls::rustls::{ClientConfig, OwnedTrustAnchor};
 use tokio_rustls::TlsConnector;
 
-use crate::data_providers::{LocalDepthCache, Trade, Depth, Order, FeedLatency, Kline};
+use crate::data_providers::{
+    LocalDepthCache, Trade, Depth, Order, FeedLatency, Kline, StreamError,
+};
 
 #[allow(clippy::large_enum_variant)]
 enum State {
@@ -339,7 +340,7 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                             tokio::spawn(async move {
                                 let fetched_depth = fetch_depth(selected_ticker).await;
 
-                                let depth: LocalDepthCache = match fetched_depth {
+                                let depth = match fetched_depth {
                                     Ok(depth) => {
                                         LocalDepthCache {
                                             last_update_id: depth.update_id,
@@ -360,6 +361,8 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                 Ok(depth) => {
                                     orderbook.fetched(depth);
 
+                                    prev_id = 0;
+
                                     state = State::Connected(websocket);
                                     let _ = output.send(Event::Connected(Connection)).await;                                 
                                 },
@@ -423,7 +426,7 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                                     tokio::spawn(async move {
                                                         let fetched_depth = fetch_depth(selected_ticker).await;
     
-                                                        let depth: LocalDepthCache = match fetched_depth {
+                                                        let depth = match fetched_depth {
                                                             Ok(depth) => {
                                                                 LocalDepthCache {
                                                                     last_update_id: depth.update_id,
@@ -497,7 +500,12 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
     
                                                     prev_id = de_depth.final_id;
                                                 } else {
-                                                    log::error!("Out of sync...\n");
+                                                    state = State::Disconnected;
+                                                    let _ = output.send(
+                                                        Event::Disconnected(
+                                                            format!("Out of sync. Expected update_id: {}, got: {}", de_depth.prev_final_id, prev_id)
+                                                        )
+                                                    ).await;
                                                 }
                                             },
                                             _ => {}
@@ -647,12 +655,12 @@ struct FetchedKlines (
     #[serde(with = "string_to_f32")] f32,
     #[serde(with = "string_to_f32")] f32,
     #[serde(with = "string_to_f32")] f32,
-    u64,
-    String,
-    u32,
+    (),
+    (),
+    (),
     #[serde(with = "string_to_f32")] f32,
-    String,
-    String,
+    (),
+    (),
 );
 impl From<FetchedKlines> for Kline {
     fn from(fetched: FetchedKlines) -> Self {
@@ -669,13 +677,8 @@ impl From<FetchedKlines> for Kline {
     }
 }
 
-pub async fn fetch_klines(ticker: Ticker, timeframe: Timeframe) -> Result<Vec<Kline>, reqwest::Error> {
-    let symbol_str = match ticker {
-        Ticker::BTCUSDT => "btcusdt",
-        Ticker::ETHUSDT => "ethusdt",
-        Ticker::SOLUSDT => "solusdt",
-        Ticker::LTCUSDT => "ltcusdt",
-    };
+pub async fn fetch_klines(ticker: Ticker, timeframe: Timeframe) -> Result<Vec<Kline>, StreamError> {
+    let symbol_str = ticker.get_string();
     let timeframe_str = match timeframe {
         Timeframe::M1 => "1m",
         Timeframe::M3 => "3m",
@@ -686,65 +689,64 @@ pub async fn fetch_klines(ticker: Ticker, timeframe: Timeframe) -> Result<Vec<Kl
 
     let url = format!("https://fapi.binance.com/fapi/v1/klines?symbol={symbol_str}&interval={timeframe_str}&limit=720");
 
-    let response = reqwest::get(&url).await?;
-    let text = response.text().await?;
-    let fetched_klines: Result<Vec<FetchedKlines>, _> = serde_json::from_str(&text);
-    let klines: Vec<Kline> = fetched_klines.unwrap().into_iter().map(Kline::from).collect();
+    let response = reqwest::get(&url)
+        .await.map_err(StreamError::FetchError)?;
+    let text = response.text()
+        .await.map_err(StreamError::FetchError)?;
+
+    let fetched_klines: Vec<FetchedKlines> = serde_json::from_str(&text)
+        .map_err(|e| StreamError::ParseError(format!("Failed to parse klines: {}", e)))?;
+
+    let klines= fetched_klines.into_iter().map(Kline::from).collect();
 
     Ok(klines)
 }
 
-pub async fn fetch_depth(ticker: Ticker) -> Result<FetchedDepth, reqwest::Error> {
-    let symbol_str = match ticker {
-        Ticker::BTCUSDT => "btcusdt",
-        Ticker::ETHUSDT => "ethusdt",
-        Ticker::SOLUSDT => "solusdt",
-        Ticker::LTCUSDT => "ltcusdt",
-    };
+pub async fn fetch_depth(ticker: Ticker) -> Result<FetchedDepth, StreamError> {
+    let symbol_str = ticker.get_string();
 
     let url = format!("https://fapi.binance.com/fapi/v1/depth?symbol={symbol_str}&limit=1000");
 
-    let response = reqwest::get(&url).await?;
-    let text = response.text().await?;
-    let depth: FetchedDepth = serde_json::from_str(&text).unwrap();
+    let response = reqwest::get(&url)
+        .await.map_err(StreamError::FetchError)?;
+    let text = response.text().await
+        .map_err(StreamError::FetchError)?;
+
+    let depth: FetchedDepth = serde_json::from_str(&text).map_err(|e| {
+        log::error!("Failed to parse depth: {}", text);
+        StreamError::ParseError(e.to_string())
+    })?;
 
     Ok(depth)
 }
 
-pub async fn fetch_ticksize(ticker: Ticker) -> Result<f32, reqwest::Error> {
-    let symbol_str = match ticker {
-        Ticker::BTCUSDT => "BTCUSDT",
-        Ticker::ETHUSDT => "ETHUSDT",
-        Ticker::SOLUSDT => "SOLUSDT",
-        Ticker::LTCUSDT => "LTCUSDT",
-    };
-
+pub async fn fetch_ticksize(ticker: Ticker) -> Result<f32, StreamError> {
+    let symbol_str = ticker.get_string().to_uppercase();
     let url = "https://fapi.binance.com/fapi/v1/exchangeInfo".to_string();
 
-    let response = reqwest::get(&url).await?;
-    let text = response.text().await?;
-    let exchange_info: Value = serde_json::from_str(&text).unwrap();
+    let response = reqwest::get(&url).await.map_err(StreamError::FetchError)?;
+    let text = response.text().await.map_err(StreamError::FetchError)?;
 
-    let symbols = exchange_info["symbols"].as_array().unwrap();
+    let exchange_info: serde_json::Value = serde_json::from_str(&text)
+        .map_err(|e| StreamError::ParseError(format!("Failed to parse exchange info: {}", e)))?;
 
-    let symbol = symbols.iter().find(|x| x["symbol"].as_str().unwrap() == symbol_str).unwrap();
+    let symbols = exchange_info["symbols"].as_array()
+        .ok_or_else(|| StreamError::ParseError("Missing symbols array".to_string()))?;
 
-    let tick_size = symbol["filters"].as_array().unwrap().iter().find(|x| x["filterType"].as_str().unwrap() == "PRICE_FILTER").unwrap()["tickSize"].as_str().unwrap().parse::<f32>().unwrap();
+    let symbol = symbols.iter()
+        .find(|x| x["symbol"].as_str().unwrap_or_default() == symbol_str)
+        .ok_or_else(|| StreamError::ParseError(format!("Symbol {} not found", symbol_str)))?;
 
-    Ok(tick_size)
-}
+    let tick_size_str = symbol["filters"].as_array()
+        .ok_or_else(|| StreamError::ParseError("Missing filters array".to_string()))?
+        .iter()
+        .find(|x| x["filterType"].as_str().unwrap_or_default() == "PRICE_FILTER")
+        .ok_or_else(|| StreamError::ParseError("PRICE_FILTER not found".to_string()))?
+        ["tickSize"].as_str()
+        .ok_or_else(|| StreamError::ParseError("tickSize not found".to_string()))?;
 
-pub async fn fetch_server_time() -> Result<i64> {
-    let url = "https://fapi.binance.com/fapi/v1/time";
+    let tick_size = tick_size_str.parse::<f32>()
+        .map_err(|e| StreamError::ParseError(format!("Failed to parse tickSize: {}", e)))?;
 
-    let response = reqwest::get(url).await.context("Failed to send request")?;
-    let text = response.text().await.context("Failed to read response")?;
-    
-    let server_time: Value = serde_json::from_str(&text).context("Failed to parse JSON")?;
-
-    if let Some(time) = server_time["serverTime"].as_i64() {
-        Ok(time)
-    } else {
-        anyhow::bail!("Invalid server time")
-    }
-}
+    Ok(tick_size)
+}