Selaa lähdekoodia

add depth stream to rely on it to update the plots, instead of manual timer

Berke 1 vuosi sitten
vanhempi
commit
836223519e
1 muutettua tiedostoa jossa 21 lisäystä ja 10 poistoa
  1. 21 10
      src/ws_binance.rs

+ 21 - 10
src/ws_binance.rs

@@ -17,7 +17,6 @@ mod string_to_f32 {
 use futures::channel::mpsc;
 use futures::sink::SinkExt;
 use futures::stream::StreamExt;
-use futures::FutureExt;
 
 use async_tungstenite::tungstenite;
 
@@ -37,6 +36,11 @@ pub struct Trade {
     #[serde(with = "string_to_f32", rename = "q")]
     pub qty: f32,
 }
+#[derive(Debug, Clone)]
+pub struct Depth {
+    pub bids: Vec<(f32, f32)>,
+    pub asks: Vec<(f32, f32)>,
+}
 #[derive(Deserialize, Debug, Clone)]
 pub struct Kline {
     #[serde(rename = "t")]
@@ -64,13 +68,13 @@ pub fn connect(selected_ticker: String) -> Subscription<Event> {
         |mut output| async move {
             let mut state = State::Disconnected;     
             let mut trades_buffer = Vec::new(); 
-            let buffer_flush_interval = tokio::time::Duration::from_millis(1000 / 30);
  
             loop {
                 match &mut state {
                     State::Disconnected => {
-                        let websocket_server = format!("wss://fstream.binance.com/stream?streams={}@aggTrade/{}@kline_1m", selected_ticker.to_lowercase(), selected_ticker.to_lowercase());
-
+                        let symbol = selected_ticker.to_lowercase();
+                        let websocket_server = format!("wss://fstream.binance.com/stream?streams={}@aggTrade/{}@depth5@100ms/{}@kline_1m", symbol, symbol, symbol);
+                        
                         match async_tungstenite::tokio::connect_async(
                             websocket_server,
                         )
@@ -109,6 +113,18 @@ pub fn connect(selected_ticker: String) -> Subscription<Event> {
                                                             dbg!(e);
                                                         }
                                                     }
+                                                } else if wrapper.stream.contains("depth") {
+                                                    if let Some(bids_data) = wrapper.data.get_mut("b") {
+                                                        let bids: Vec<(String, String)> = serde_json::from_value(bids_data.take()).unwrap();
+                                                        let bids: Vec<(f32, f32)> = bids.into_iter().map(|(price, qty)| (price.parse().unwrap(), qty.parse().unwrap())).collect();
+                                                
+                                                        if let Some(asks_data) = wrapper.data.get_mut("a") {
+                                                            let asks: Vec<(String, String)> = serde_json::from_value(asks_data.take()).unwrap();
+                                                            let asks: Vec<(f32, f32)> = asks.into_iter().map(|(price, qty)| (price.parse().unwrap(), qty.parse().unwrap())).collect();
+                                                
+                                                            let _ = output.send(Event::DepthReceived(bids, asks, std::mem::take(&mut trades_buffer))).await;
+                                                        }
+                                                    }
                                                 } else if wrapper.stream.contains("kline") {
                                                     if let Some(kline) = wrapper.data.get_mut("k") {
                                                         let kline: Result<Kline, _> = serde_json::from_value(kline.take());
@@ -136,11 +152,6 @@ pub fn connect(selected_ticker: String) -> Subscription<Event> {
                                     Ok(_) => continue,
                                 }
                             }
-                            _ = tokio::time::sleep(buffer_flush_interval).fuse() => {
-                                if !trades_buffer.is_empty() {
-                                    let _ = output.send(Event::TradeReceived(std::mem::take(&mut trades_buffer))).await;
-                                }
-                            }
                         }
                     }
                 }
@@ -164,7 +175,7 @@ enum State {
 pub enum Event {
     Connected(Connection),
     Disconnected,
-    TradeReceived(Vec<Trade>),
+    DepthReceived(Vec<(f32, f32)>, Vec<(f32, f32)>, Vec<Trade>),
     KlineReceived(Kline),
 }