Selaa lähdekoodia

add kline stream; refactored websocket logic to allow this

Berke 1 vuosi sitten
vanhempi
commit
c269307786
1 muutettua tiedostoa jossa 50 lisäystä ja 8 poistoa
  1. 50 8
      src/ws_binance.rs

+ 50 - 8
src/ws_binance.rs

@@ -22,9 +22,9 @@ use futures::FutureExt;
 use async_tungstenite::tungstenite;
 
 #[derive(Deserialize, Debug, Clone)]
-pub struct TradeWrapper {
+pub struct StreamWrapper {
     pub stream: String,
-    pub data: Trade,
+    pub data: serde_json::Value,
 }
 #[derive(Deserialize, Debug, Clone)]
 pub struct Trade {
@@ -37,6 +37,24 @@ pub struct Trade {
     #[serde(with = "string_to_f32", rename = "q")]
     pub qty: f32,
 }
+#[derive(Deserialize, Debug, Clone)]
+pub struct Kline {
+    #[serde(rename = "t")]
+    pub time: u64,
+    #[serde(with = "string_to_f32", rename = "o")]
+    pub open: f32,
+    #[serde(with = "string_to_f32", rename = "h")]
+    pub high: f32,
+    #[serde(with = "string_to_f32", rename = "l")]
+    pub low: f32,
+    #[serde(with = "string_to_f32", rename = "c")]
+    pub close: f32,
+    #[serde(with = "string_to_f32", rename = "v")]
+    pub volume: f32,
+    #[serde(with = "string_to_f32", rename = "V")]
+    pub taker_buy_base_asset_volume: f32,
+}
+
 pub fn connect(selected_ticker: String) -> Subscription<Event> {
     struct Connect;
 
@@ -51,7 +69,7 @@ pub fn connect(selected_ticker: String) -> Subscription<Event> {
             loop {
                 match &mut state {
                     State::Disconnected => {
-                        let websocket_server = format!("wss://fstream.binance.com/stream?streams={}@aggTrade", selected_ticker.to_lowercase());
+                        let websocket_server = format!("wss://fstream.binance.com/stream?streams={}@aggTrade/{}@kline_1m", selected_ticker.to_lowercase(), selected_ticker.to_lowercase());
 
                         match async_tungstenite::tokio::connect_async(
                             websocket_server,
@@ -78,13 +96,36 @@ pub fn connect(selected_ticker: String) -> Subscription<Event> {
                             received = fused_websocket.select_next_some() => {
                                 match received {
                                     Ok(tungstenite::Message::Text(message)) => {
-                                        let parsed_message: Result<TradeWrapper, _> = serde_json::from_str(&message);
+                                        let parsed_message: Result<StreamWrapper, _> = serde_json::from_str(&message);
                                         match parsed_message {
-                                            Ok(message) => {
-                                                trades_buffer.push(message);
+                                            Ok(mut wrapper) => {
+                                                if wrapper.stream.contains("aggTrade") {
+                                                    let trade: Result<Trade, _> = serde_json::from_str(&wrapper.data.to_string());
+                                                    match trade {
+                                                        Ok(trade) => {
+                                                            trades_buffer.push(trade);
+                                                        },
+                                                        Err(e) => {
+                                                            dbg!(e);
+                                                        }
+                                                    }
+                                                } else if wrapper.stream.contains("kline") {
+                                                    if let Some(kline) = wrapper.data.get_mut("k") {
+                                                        let kline: Result<Kline, _> = serde_json::from_value(kline.take());
+                                                        match kline {
+                                                            Ok(kline) => {
+                                                                let _ = output.send(Event::KlineReceived(kline)).await;
+                                                            },
+                                                            Err(e) => {
+                                                                dbg!(e);
+                                                            }
+                                                        }
+                                                    }
+                                                }
                                             },
                                             Err(e) => {
                                                 dbg!(e);
+                                                dbg!(message); 
                                             }
                                         }
                                     }
@@ -97,7 +138,7 @@ pub fn connect(selected_ticker: String) -> Subscription<Event> {
                             }
                             _ = tokio::time::sleep(buffer_flush_interval).fuse() => {
                                 if !trades_buffer.is_empty() {
-                                    let _ = output.send(Event::MessageReceived(std::mem::take(&mut trades_buffer))).await;
+                                    let _ = output.send(Event::TradeReceived(std::mem::take(&mut trades_buffer))).await;
                                 }
                             }
                         }
@@ -123,7 +164,8 @@ enum State {
 pub enum Event {
     Connected(Connection),
     Disconnected,
-    MessageReceived(Vec<TradeWrapper>),
+    TradeReceived(Vec<Trade>),
+    KlineReceived(Kline),
 }
 
 #[derive(Debug, Clone)]