Jelajahi Sumber

deserialization optimization, code readability

Berke 1 tahun lalu
induk
melakukan
6fc8f434d1
1 mengubah file dengan 131 tambahan dan 127 penghapusan
  1. 131 127
      src/data_providers/binance/market_data.rs

+ 131 - 127
src/data_providers/binance/market_data.rs

@@ -1,67 +1,36 @@
 use iced::futures;  
 use iced::subscription::{self, Subscription};
-use serde::Deserialize;
-use serde_json::json;
-
-mod string_to_f32 {
-    use serde::{self, Deserialize, Deserializer};
-
-    pub fn deserialize<'de, D>(deserializer: D) -> Result<f32, D::Error>
-    where
-        D: Deserializer<'de>,
-    {
-        let s = String::deserialize(deserializer)?;
-        s.parse::<f32>().map_err(serde::de::Error::custom)
-    }
-}
-
+use serde::{Deserialize, Deserializer};
 use futures::channel::mpsc;
 use futures::sink::SinkExt;
 use futures::stream::StreamExt;
 
 use async_tungstenite::tungstenite;
-
 use crate::{Ticker, Timeframe};
 
-#[derive(Deserialize, Debug, Clone)]
-pub struct StreamWrapper {
-    pub stream: String,
-    pub data: serde_json::Value,
-}
-#[derive(Deserialize, Debug, Clone)]
-pub struct Trade {
-    #[serde(rename = "T")]
-    pub time: u64,
-    #[serde(rename = "m")]
-    pub is_sell: bool,
-    #[serde(with = "string_to_f32", rename = "p")]
-    pub price: f32,
-    #[serde(with = "string_to_f32", rename = "q")]
-    pub qty: f32,
+#[derive(Debug)]
+#[allow(clippy::large_enum_variant)]
+enum State {
+    Disconnected,
+    Connected(
+        async_tungstenite::WebSocketStream<
+            async_tungstenite::tokio::ConnectStream,
+        >,
+    ),
 }
+
 #[derive(Debug, Clone)]
-pub struct Depth {
-    pub bids: Vec<(f32, f32)>,
-    pub asks: Vec<(f32, f32)>,
-}
-#[derive(Deserialize, Debug, Clone)]
-pub struct Kline {
-    #[serde(rename = "t")]
-    pub time: u64,
-    #[serde(with = "string_to_f32", rename = "o")]
-    pub open: f32,
-    #[serde(with = "string_to_f32", rename = "h")]
-    pub high: f32,
-    #[serde(with = "string_to_f32", rename = "l")]
-    pub low: f32,
-    #[serde(with = "string_to_f32", rename = "c")]
-    pub close: f32,
-    #[serde(with = "string_to_f32", rename = "v")]
-    pub volume: f32,
-    #[serde(with = "string_to_f32", rename = "V")]
-    pub taker_buy_base_asset_volume: f32,
+pub enum Event {
+    Connected(Connection),
+    Disconnected,
+    DepthReceived(i64, Depth, Vec<Trade>),
+    KlineReceived(Kline, Timeframe),
 }
 
+#[derive(Debug, Clone)]
+pub struct Connection(mpsc::Sender<String>);
+
+
 pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
     struct Connect;
 
@@ -79,12 +48,13 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                 Ticker::LTCUSDT => "ltcusdt",
             };
 
-            let streams = format!("{}@aggTrade/{}@depth20@100ms", symbol_str, symbol_str);
+            let stream_1 = format!("{}@aggTrade", symbol_str);
+            let stream_2 = format!("{}@depth20@100ms", symbol_str);
             
             loop {
                 match &mut state {
                     State::Disconnected => {        
-                        let websocket_server = format!("wss://fstream.binance.com/stream?streams={}", streams);
+                        let websocket_server = format!("wss://fstream.binance.com/stream?streams={}/{}", stream_1, stream_2);
 
                         match async_tungstenite::tokio::connect_async(
                             websocket_server,
@@ -111,52 +81,25 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                             received = fused_websocket.select_next_some() => {
                                 match received {
                                     Ok(tungstenite::Message::Text(message)) => {
-                                        let parsed_message: Result<serde_json::Value, _> = serde_json::from_str(&message);
-                                        match parsed_message {
-                                            Ok(data) => {
-                                                if let Some(inner_data) = data.get("data") {
-                                                    if let Some(event_type) = inner_data["e"].as_str() {
-                                                        match event_type {
-                                                            "aggTrade" => {
-                                                                let trade: Result<Trade, _> = serde_json::from_value(data["data"].clone());
-                                                                match trade {
-                                                                    Ok(trade) => {
-                                                                        trades_buffer.push(trade);
-                                                                    },
-                                                                    Err(e) => {
-                                                                        dbg!(e);
-                                                                    }
-                                                                }
-                                                            },
-                                                            "depthUpdate" => {
-                                                                let update_time = data["data"]["T"].as_u64().unwrap();
-
-                                                                if let Some(bids_data) = data["data"]["b"].as_array() {
-                                                                    let bids: Vec<(f32, f32)> = bids_data.iter().map(|bid| {
-                                                                        let price = bid[0].as_str().unwrap().parse().unwrap();
-                                                                        let qty = bid[1].as_str().unwrap().parse().unwrap();
-                                                                        (price, qty)
-                                                                    }).collect();
-
-                                                                    if let Some(asks_data) = data["data"]["a"].as_array() {
-                                                                        let asks: Vec<(f32, f32)> = asks_data.iter().map(|ask| {
-                                                                            let price = ask[0].as_str().unwrap().parse().unwrap();
-                                                                            let qty = ask[1].as_str().unwrap().parse().unwrap();
-                                                                            (price, qty)
-                                                                        }).collect();
-
-                                                                        let _ = output.send(Event::DepthReceived(update_time, bids, asks, std::mem::take(&mut trades_buffer))).await;
-                                                                    }
-                                                                }
-                                                            },
-                                                            _ => {}
-                                                        }
-                                                    }
-                                                }
-                                            },
-                                            Err(e) => {
-                                                dbg!(e, message);
-                                            }
+                                        let stream: Stream = serde_json::from_str(&message).unwrap_or(Stream { stream: "".to_string() });
+                                        if stream.stream == stream_1 {
+                                            let agg_trade: AggTrade = serde_json::from_str(&message).unwrap();
+                                            trades_buffer.push(agg_trade.data);
+                                        } else if stream.stream == stream_2 {
+                                            let depth_update: DepthUpdate = serde_json::from_str(&message).unwrap();
+                                            let _ = output.send(
+                                                Event::DepthReceived(
+                                                    depth_update.data.time, 
+                                                    Depth {
+                                                        time: depth_update.data.time,
+                                                        bids: depth_update.data.bids,
+                                                        asks: depth_update.data.asks,
+                                                    }, 
+                                                    std::mem::take(&mut trades_buffer)
+                                                )
+                                            ).await;
+                                        } else {
+                                            dbg!(stream.stream);
                                         }
                                     }
                                     Err(_) => {
@@ -234,17 +177,22 @@ pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event
                                             Ok(data) => {
                                                 match (data.get("data"), data["data"]["k"]["i"].as_str(), data["data"]["k"].as_object()) {
                                                     (Some(inner_data), Some(interval), Some(kline_obj)) if inner_data["e"].as_str() == Some("kline") => {
-                                                        match serde_json::from_value::<Kline>(json!(kline_obj)) {
-                                                            Ok(kline) => {
-                                                                if let Some(timeframe) = vec.iter().find(|(_, tf)| tf.to_string() == interval) {
-                                                                    let _ = output.send(Event::KlineReceived(kline, timeframe.1)).await;
-                                                                }
-                                                            },
-                                                            Err(_) => continue,
+                                                        let kline = Kline {
+                                                            time: kline_obj["t"].as_u64().unwrap_or_default(),
+                                                            open: kline_obj["o"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
+                                                            high: kline_obj["h"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
+                                                            low: kline_obj["l"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
+                                                            close: kline_obj["c"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
+                                                            volume: kline_obj["v"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
+                                                            taker_buy_base_asset_volume: kline_obj["V"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
+                                                        };
+                                                
+                                                        if let Some(timeframe) = vec.iter().find(|(_, tf)| tf.to_string() == interval) {
+                                                            let _ = output.send(Event::KlineReceived(kline, timeframe.1)).await;
                                                         }
                                                     },
                                                     _ => continue,
-                                                }
+                                                }                                                
                                             },
                                             Err(_) => continue,
                                         }
@@ -264,28 +212,85 @@ pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event
     )
 }
 
-#[derive(Debug)]
-#[allow(clippy::large_enum_variant)]
-enum State {
-    Disconnected,
-    Connected(
-        async_tungstenite::WebSocketStream<
-            async_tungstenite::tokio::ConnectStream,
-        >,
-    ),
+mod string_to_f32 {
+    use serde::{self, Deserialize, Deserializer};
+
+    pub fn deserialize<'de, D>(deserializer: D) -> Result<f32, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let s: &str = <&str>::deserialize(deserializer)?;
+        s.parse::<f32>().map_err(serde::de::Error::custom)
+    }
 }
 
-#[derive(Debug, Clone)]
-pub enum Event {
-    Connected(Connection),
-    Disconnected,
-    DepthReceived(u64, Vec<(f32, f32)>, Vec<(f32, f32)>, Vec<Trade>),
-    KlineReceived(Kline, Timeframe),
+#[derive(Deserialize)]
+struct Stream {
+    stream: String,
+}
+#[derive(Deserialize, Debug)]
+struct AggTrade {
+    data: Trade,
+}
+#[derive(Deserialize, Debug)]
+struct DepthUpdate {
+    data: Depth,
 }
 
+#[derive(Deserialize, Debug, Clone)]
+pub struct Trade {
+    #[serde(rename = "T")]
+    pub time: i64,
+    #[serde(rename = "m")]
+    pub is_sell: bool,
+    #[serde(with = "string_to_f32", rename = "p")]
+    pub price: f32,
+    #[serde(with = "string_to_f32", rename = "q")]
+    pub qty: f32,
+}
+#[derive(Debug, Deserialize, Clone)]
+pub struct Depth {
+    #[serde(rename = "T")]
+    pub time: i64,
+    #[serde(rename = "b")]
+    pub bids: Vec<Order>,
+    #[serde(rename = "a")]
+    pub asks: Vec<Order>,
+}
 #[derive(Debug, Clone)]
-pub struct Connection(mpsc::Sender<String>);
+pub struct Order {
+    pub price: f32,
+    pub qty: f32,
+}
+impl<'de> Deserialize<'de> for Order {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let arr: Vec<&str> = Vec::<&str>::deserialize(deserializer)?;
+        let price: f32 = arr[0].parse::<f32>().map_err(serde::de::Error::custom)?;
+        let qty: f32 = arr[1].parse::<f32>().map_err(serde::de::Error::custom)?;
+        Ok(Order { price, qty })
+    }
+}
 
+#[derive(Deserialize, Debug, Clone)]
+pub struct Kline {
+    #[serde(rename = "t")]
+    pub time: u64,
+    #[serde(with = "string_to_f32", rename = "o")]
+    pub open: f32,
+    #[serde(with = "string_to_f32", rename = "h")]
+    pub high: f32,
+    #[serde(with = "string_to_f32", rename = "l")]
+    pub low: f32,
+    #[serde(with = "string_to_f32", rename = "c")]
+    pub close: f32,
+    #[serde(with = "string_to_f32", rename = "v")]
+    pub volume: f32,
+    #[serde(with = "string_to_f32", rename = "V")]
+    pub taker_buy_base_asset_volume: f32,
+}
 #[derive(Deserialize, Debug, Clone)]
 struct FetchedKlines (
     u64,
@@ -331,11 +336,10 @@ pub async fn fetch_klines(ticker: Ticker, timeframe: Timeframe) -> Result<Vec<Kl
 
     let url = format!("https://fapi.binance.com/fapi/v1/klines?symbol={}&interval={}&limit=720", symbol_str, timeframe_str);
 
-    dbg!("fetching klines", &url);
-
     let response = reqwest::get(&url).await?;
-    let value: serde_json::Value = response.json().await?;
-    let fetched_klines: Result<Vec<FetchedKlines>, _> = serde_json::from_value(value);
+    let text = response.text().await?;
+    let fetched_klines: Result<Vec<FetchedKlines>, _> = serde_json::from_str(&text);
     let klines: Vec<Kline> = fetched_klines.unwrap().into_iter().map(Kline::from).collect();
+
     Ok(klines)
 }