Procházet zdrojové kódy

chore: reduce nested code on handling streams

Berke před 9 měsíci
rodič
revize
8f827d14a9

+ 8 - 1
src/data_providers.rs

@@ -26,7 +26,7 @@ pub enum State {
 pub enum Event {
     Connected(Connection),
     Disconnected(String),
-    DepthReceived(Ticker, i64, Depth, Vec<Trade>),
+    DepthReceived(Ticker, i64, Depth, Box<[Trade]>),
     KlineReceived(Ticker, Kline, Timeframe),
 }
 
@@ -609,6 +609,13 @@ async fn setup_websocket_connection(
     Ok(FragmentCollector::new(ws))
 }
 
+fn str_f32_parse(s: &str) -> f32 {
+    s.parse::<f32>().unwrap_or_else(|e| {
+        log::error!("Failed to parse float: {}, error: {}", s, e);
+        0.0
+    })
+}
+
 #[allow(unused_imports)]
 mod tests {
     use super::*;

+ 66 - 68
src/data_providers/binance.rs

@@ -10,21 +10,11 @@ use serde::{Deserialize, Serialize};
 use sonic_rs::{to_object_iter_unchecked, FastStr};
 
 use super::{
-    deserialize_string_to_f32, setup_tcp_connection, setup_tls_connection, setup_websocket_connection, 
-    Connection, Event, Kline, LocalDepthCache, MarketType, OpenInterest, Order, State, StreamError,
-    Ticker, TickerInfo, TickerStats, Timeframe, Trade, VecLocalDepthCache,
+    deserialize_string_to_f32, setup_tcp_connection, setup_tls_connection, setup_websocket_connection, str_f32_parse, 
+    Connection, Event, Kline, LocalDepthCache, MarketType, OpenInterest, Order, State, StreamError, 
+    Ticker, TickerInfo, TickerStats, Timeframe, Trade, VecLocalDepthCache
 };
 
-async fn connect(
-    domain: &str,
-    streams: &str,
-) -> Result<FragmentCollector<TokioIo<Upgraded>>, StreamError> {
-    let tcp_stream = setup_tcp_connection(domain).await?;
-    let tls_stream = setup_tls_connection(domain, tcp_stream).await?;
-    let url = format!("wss://{domain}/stream?streams={streams}");
-    setup_websocket_connection(domain, tls_stream, &url).await
-}
-
 mod string_to_f32 {
     use serde::{self, Deserialize, Deserializer};
 
@@ -37,13 +27,6 @@ mod string_to_f32 {
     }
 }
 
-fn str_f32_parse(s: &str) -> f32 {
-    s.parse::<f32>().unwrap_or_else(|e| {
-        log::error!("Failed to parse float: {}, error: {}", s, e);
-        0.0
-    })
-}
-
 #[derive(Debug, Deserialize, Clone)]
 pub struct FetchedPerpDepth {
     #[serde(rename = "lastUpdateId")]
@@ -235,6 +218,53 @@ fn feed_de(slice: &[u8], market: MarketType) -> Result<StreamData, StreamError>
     ))
 }
 
+async fn connect(
+    domain: &str,
+    streams: &str,
+) -> Result<FragmentCollector<TokioIo<Upgraded>>, StreamError> {
+    let tcp_stream = setup_tcp_connection(domain).await?;
+    let tls_stream = setup_tls_connection(domain, tcp_stream).await?;
+    let url = format!("wss://{domain}/stream?streams={streams}");
+    setup_websocket_connection(domain, tls_stream, &url).await
+}
+
+async fn try_resync(
+    ticker: Ticker,
+    orderbook: &mut LocalDepthCache,
+    state: &mut State,
+    output: &mut futures::channel::mpsc::Sender<Event>,
+    already_fetching: &mut bool,
+) {
+    let (tx, rx) = tokio::sync::oneshot::channel();
+    *already_fetching = true;
+
+    tokio::spawn(async move {
+        let result = fetch_depth(&ticker).await;
+        let _ = tx.send(result);
+    });
+    
+    match rx.await {
+        Ok(Ok(depth)) => {
+            orderbook.fetched(&depth);
+        }
+        Ok(Err(e)) => {
+            let _ = output
+                .send(Event::Disconnected(format!("Depth fetch failed: {}", e))).await;
+        }
+        Err(e) => {
+            *state = State::Disconnected;
+            
+            output
+                .send(Event::Disconnected(
+                    format!("Failed to send fetched depth for {ticker}, error: {e}")
+                ))
+                .await
+                .expect("Trying to send disconnect event...");
+        }
+    }
+    *already_fetching = false;
+}
+
 #[allow(unused_assignments)]
 pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
     stream::channel(100, move |mut output| async move {
@@ -334,29 +364,13 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                                     {
                                                         log::warn!("Out of sync at first event. Trying to resync...\n");
 
-                                                        let (tx, rx) = tokio::sync::oneshot::channel();
-                                                        already_fetching = true;
-
-                                                        tokio::spawn(async move {
-                                                            let result = fetch_depth(&ticker).await;
-                                                            let _ = tx.send(result);
-                                                        });
-                                                        match rx.await {
-                                                            Ok(Ok(depth)) => {
-                                                                orderbook.fetched(&depth);
-                                                            }
-                                                            Ok(Err(e)) => {
-                                                                let _ = output
-                                                                    .send(Event::Disconnected(format!("Depth fetch failed: {}", e))).await;
-                                                            }
-                                                            Err(e) => {
-                                                                state = State::Disconnected;
-                                                                output.send(Event::Disconnected(
-                                                                        format!("Failed to send fetched depth for {symbol_str}, error: {e}")
-                                                                    )).await.expect("Trying to send disconnect event...");
-                                                            }
-                                                        }
-                                                        already_fetching = false;
+                                                        try_resync(
+                                                            ticker, 
+                                                            &mut orderbook, 
+                                                            &mut state, 
+                                                            &mut output, 
+                                                            &mut already_fetching
+                                                        ).await;
                                                     }
 
                                                     if (prev_id == 0) || (prev_id == de_depth.prev_final_id)
@@ -372,7 +386,7 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                                                 ticker,
                                                                 time,
                                                                 orderbook.get_depth(),
-                                                                std::mem::take(&mut trades_buffer),
+                                                                std::mem::take(&mut trades_buffer).into_boxed_slice(),
                                                             ))
                                                             .await;
 
@@ -399,29 +413,13 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                                     {
                                                         log::warn!("Out of sync at first event. Trying to resync...\n");
 
-                                                        let (tx, rx) = tokio::sync::oneshot::channel();
-                                                        already_fetching = true;
-
-                                                        tokio::spawn(async move {
-                                                            let result = fetch_depth(&ticker).await;
-                                                            let _ = tx.send(result);
-                                                        });
-                                                        match rx.await {
-                                                            Ok(Ok(depth)) => {
-                                                                orderbook.fetched(&depth);
-                                                            }
-                                                            Ok(Err(e)) => {
-                                                                let _ = output
-                                                                    .send(Event::Disconnected(format!("Depth fetch failed: {}", e))).await;
-                                                            }
-                                                            Err(e) => {
-                                                                state = State::Disconnected;
-                                                                output.send(Event::Disconnected(
-                                                                        format!("Failed to send fetched depth for {symbol_str}, error: {e}")
-                                                                    )).await.expect("Trying to send disconnect event...");
-                                                            }
-                                                        }
-                                                        already_fetching = false;
+                                                        try_resync(
+                                                            ticker, 
+                                                            &mut orderbook, 
+                                                            &mut state, 
+                                                            &mut output, 
+                                                            &mut already_fetching
+                                                        ).await;
                                                     }
 
                                                     if (prev_id == 0) || (prev_id == de_depth.first_id - 1)
@@ -437,7 +435,7 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                                                 ticker,
                                                                 time,
                                                                 orderbook.get_depth(),
-                                                                std::mem::take(&mut trades_buffer),
+                                                                std::mem::take(&mut trades_buffer).into_boxed_slice(),
                                                             ))
                                                             .await;
 

+ 65 - 82
src/data_providers/bybit.rs

@@ -25,6 +25,7 @@ use crate::data_providers::{
 };
 use crate::{Ticker, Timeframe};
 
+use super::str_f32_parse;
 use super::OpenInterest;
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -208,7 +209,10 @@ fn feed_de(
     Err(StreamError::UnknownError("Unknown data".to_string()))
 }
 
-async fn connect(domain: &str, market_type: MarketType) -> Result<FragmentCollector<TokioIo<Upgraded>>, StreamError> {
+async fn connect(
+    domain: &str, 
+    market_type: MarketType
+) -> Result<FragmentCollector<TokioIo<Upgraded>>, StreamError> {
     let tcp_stream = setup_tcp_connection(domain).await?;
     let tls_stream = setup_tls_connection(domain, tcp_stream).await?;
     let url = format!(
@@ -221,26 +225,43 @@ async fn connect(domain: &str, market_type: MarketType) -> Result<FragmentCollec
     setup_websocket_connection(domain, tls_stream, &url).await
 }
 
-fn str_f32_parse(s: &str) -> f32 {
-    s.parse::<f32>().unwrap_or_else(|e| {
-        log::error!("Failed to parse float: {}, error: {}", s, e);
-        0.0
-    })
-}
+async fn try_connect(
+    streams: &Value,
+    market_type: MarketType,
+    output: &mut futures::channel::mpsc::Sender<Event>,
+) -> State {
+    match connect("stream.bybit.com", market_type).await {
+        Ok(mut websocket) => {
+            if let Err(e) = websocket
+                .write_frame(Frame::text(fastwebsockets::Payload::Borrowed(
+                    streams.to_string().as_bytes(),
+                )))
+                .await
+            {
+                let _ = output
+                    .send(Event::Disconnected(format!("Failed subscribing: {e}")))
+                    .await;
+                return State::Disconnected;
+            }
 
-fn string_to_timeframe(interval: &str) -> Option<Timeframe> {
-    Timeframe::ALL
-        .iter()
-        .find(|&tf| tf.to_minutes().to_string() == interval)
-        .copied()
+            let _ = output.send(Event::Connected(Connection)).await;
+            State::Connected(websocket)
+        }
+        Err(err) => {
+            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+
+            let _ = output
+                .send(Event::Disconnected(format!("Failed to connect: {err}")))
+                .await;
+            State::Disconnected
+        }
+    }
 }
 
 pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
     stream::channel(100, move |mut output| async move {
         let mut state: State = State::Disconnected;
 
-        let mut trades_buffer: Vec<Trade> = Vec::new();
-
         let (symbol_str, market_type) = ticker.get_string();
 
         let stream_1 = format!("publicTrade.{symbol_str}");
@@ -253,44 +274,22 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
             symbol_str,
         );
 
-        let mut orderbook: LocalDepthCache = LocalDepthCache::new();
+        let subscribe_message = serde_json::json!({
+            "op": "subscribe",
+            "args": [stream_1, stream_2]
+        });
+
+        let mut trades_buffer: Vec<Trade> = Vec::new();
+        let mut orderbook = LocalDepthCache::new();
 
         loop {
             match &mut state {
                 State::Disconnected => {
-                    let domain: &str = "stream.bybit.com";
-
-                    if let Ok(mut websocket) = connect(domain, market_type).await {
-                        let subscribe_message: String = serde_json::json!({
-                            "op": "subscribe",
-                            "args": [stream_1, stream_2]
-                        })
-                        .to_string();
-
-                        if let Err(e) = websocket
-                            .write_frame(Frame::text(fastwebsockets::Payload::Borrowed(
-                                subscribe_message.as_bytes(),
-                            )))
-                            .await
-                        {
-                            let _ = output
-                                .send(Event::Disconnected(format!("Failed subscribing: {e}")))
-                                .await;
-
-                            continue;
-                        }
-
-                        state = State::Connected(websocket);
-                        let _ = output.send(Event::Connected(Connection)).await;
-                    } else {
-                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
-
-                        let _ = output
-                            .send(Event::Disconnected(
-                                "Failed to connect to websocket".to_string(),
-                            ))
-                            .await;
-                    }
+                    state = try_connect(
+                        &subscribe_message, 
+                        market_type,
+                        &mut output
+                    ).await;
                 }
                 State::Connected(websocket) => match websocket.read_frame().await {
                     Ok(msg) => match msg.opcode {
@@ -343,7 +342,7 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                                     ticker,
                                                     time,
                                                     orderbook.get_depth(),
-                                                    std::mem::take(&mut trades_buffer),
+                                                    std::mem::take(&mut trades_buffer).into_boxed_slice(),
                                                 ))
                                                 .await;
                                         }
@@ -390,43 +389,20 @@ pub fn connect_kline_stream(
                 format!("kline.{timeframe_str}.{}", ticker.get_string().0)
             })
             .collect::<Vec<String>>();
+    
+        let subscribe_message = serde_json::json!({
+            "op": "subscribe",
+            "args": stream_str
+        });
 
         loop {
             match &mut state {
                 State::Disconnected => {
-                    let domain = "stream.bybit.com";
-
-                    if let Ok(mut websocket) = connect(domain, market_type).await {
-                        let subscribe_message = serde_json::json!({
-                            "op": "subscribe",
-                            "args": stream_str
-                        })
-                        .to_string();
-
-                        if let Err(e) = websocket
-                            .write_frame(Frame::text(fastwebsockets::Payload::Borrowed(
-                                subscribe_message.as_bytes(),
-                            )))
-                            .await
-                        {
-                            let _ = output
-                                .send(Event::Disconnected(format!("Failed subscribing: {e}")))
-                                .await;
-
-                            continue;
-                        }
-
-                        state = State::Connected(websocket);
-                        let _ = output.send(Event::Connected(Connection)).await;
-                    } else {
-                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
-
-                        let _ = output
-                            .send(Event::Disconnected(
-                                "Failed to connect to websocket".to_string(),
-                            ))
-                            .await;
-                    }
+                    state = try_connect(
+                        &subscribe_message, 
+                        market_type,
+                        &mut output
+                    ).await;
                 }
                 State::Connected(websocket) => match websocket.read_frame().await {
                     Ok(msg) => match msg.opcode {
@@ -481,6 +457,13 @@ pub fn connect_kline_stream(
     })
 }
 
+fn string_to_timeframe(interval: &str) -> Option<Timeframe> {
+    Timeframe::ALL
+        .iter()
+        .find(|&tf| tf.to_minutes().to_string() == interval)
+        .copied()
+}
+
 #[derive(Debug, Clone, Copy, PartialEq, Deserialize)]
 #[serde(rename_all = "camelCase")]
 struct DeOpenInterest {

+ 1 - 3
src/screen/dashboard.rs

@@ -1260,13 +1260,11 @@ impl Dashboard {
         stream_type: &StreamType,
         depth_update_t: i64,
         depth: Depth,
-        trades_buffer: Vec<Trade>,
+        trades_buffer: Box<[Trade]>,
         main_window: window::Id,
     ) -> Task<Message> {
         let mut found_match = false;
 
-        let trades_buffer = trades_buffer.into_boxed_slice();
-
         self.iter_all_panes_mut(main_window)
             .for_each(|(_, _, pane_state)| {
                 if pane_state.matches_stream(stream_type) {