소스 검색

add feed latency setters

Berke 1 년 전
부모
커밋
33062ae677
2개의 변경된 파일90개의 추가작업 그리고 13개의 파일을 삭제
  1. 58 4
      src/data_providers/binance/market_data.rs
  2. 32 9
      src/data_providers/bybit/market_data.rs

+ 58 - 4
src/data_providers/binance/market_data.rs

@@ -19,11 +19,18 @@ enum State {
     ),
 }
 
+#[derive(Debug, Clone, Copy)]
+pub struct FeedLatency {
+    pub time: i64,
+    pub depth_latency: i64,
+    pub trade_latency: Option<i64>,
+}
+
 #[derive(Debug, Clone)]
 pub enum Event {
     Connected(Connection),
     Disconnected,
-    DepthReceived(i64, LocalDepthCache, Vec<Trade>),
+    DepthReceived(FeedLatency, i64, LocalDepthCache, Vec<Trade>),
     KlineReceived(Kline, Timeframe),
 }
 
@@ -184,7 +191,7 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                 Ticker::LTCUSDT => "ltcusdt",
             };
 
-            let stream_1 = format!("{symbol_str}@aggTrade");
+            let stream_1 = format!("{symbol_str}@trade");
             let stream_2 = format!("{symbol_str}@depth@100ms");
 
             let mut orderbook: Depth = Depth::new();
@@ -193,6 +200,8 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
 
             let mut prev_id: i64 = 0;
 
+            let mut trade_latencies: Vec<i64> = Vec::new();
+
             loop {
                 match &mut state {
                     State::Disconnected => {        
@@ -238,6 +247,8 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                     State::Connected(websocket) => {
                         let mut fused_websocket = websocket.by_ref().fuse();
 
+                        let feed_latency: FeedLatency;
+
                         futures::select! {
                             received = fused_websocket.select_next_some() => {
                                 match received {
@@ -247,6 +258,10 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                                         if stream.stream == stream_1 {
                                             let agg_trade: AggTrade = serde_json::from_str(&message).unwrap();
                                             trades_buffer.push(agg_trade.data);
+
+                                            let latency = chrono::Utc::now().timestamp_millis() - agg_trade.data.time;
+
+                                            trade_latencies.push(latency);
                                             
                                         } else if stream.stream == stream_2 {
                                             if already_fetching {
@@ -305,6 +320,9 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                                                 let depth_update: DepthUpdate = serde_json::from_str(&message).unwrap();
 
                                                 let time = depth_update.data.time;
+
+                                                let depth_latency = chrono::Utc::now().timestamp_millis() - time;
+
                                                 let bids = depth_update.data.bids;
                                                 let asks = depth_update.data.asks;
 
@@ -317,9 +335,28 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                                                     bids: local_bids,
                                                     asks: local_asks,
                                                 };
+                                                
+                                                if !trade_latencies.is_empty() {
+                                                    let avg_trade_latency = trade_latencies.iter().sum::<i64>() / trade_latencies.len() as i64;
+
+                                                    feed_latency = FeedLatency {
+                                                        time,
+                                                        depth_latency,
+                                                        trade_latency: Some(avg_trade_latency),
+                                                    };
+
+                                                    trade_latencies.clear();
+                                                } else {
+                                                    feed_latency = FeedLatency {
+                                                        time,
+                                                        depth_latency,
+                                                        trade_latency: None,
+                                                    };
+                                                }
 
                                                 let _ = output.send(
                                                     Event::DepthReceived(
+                                                        feed_latency,
                                                         time, 
                                                         local_depth_cache,
                                                         std::mem::take(&mut trades_buffer)
@@ -375,7 +412,7 @@ pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event
                 };
                 format!("{symbol_str}@kline_{timeframe_str}")
             }).collect::<Vec<String>>().join("/");
- 
+
             loop {
                 match &mut state {
                     State::Disconnected => {
@@ -463,7 +500,7 @@ struct DepthUpdate {
     data: Depth,
 }
 
-#[derive(Deserialize, Debug, Clone)]
+#[derive(Deserialize, Debug, Clone, Copy)]
 pub struct Trade {
     #[serde(rename = "T")]
     pub time: i64,
@@ -584,4 +621,21 @@ pub async fn fetch_ticksize(ticker: Ticker) -> Result<f32, reqwest::Error> {
     let tick_size = symbol["filters"].as_array().unwrap().iter().find(|x| x["filterType"].as_str().unwrap() == "PRICE_FILTER").unwrap()["tickSize"].as_str().unwrap().parse::<f32>().unwrap();
 
     Ok(tick_size)
+}
+
+use anyhow::{Result, Context};
+
+pub async fn fetch_server_time() -> Result<i64> {
+    let url = "https://fapi.binance.com/fapi/v1/time";
+
+    let response = reqwest::get(url).await.context("Failed to send request")?;
+    let text = response.text().await.context("Failed to read response")?;
+    
+    let server_time: Value = serde_json::from_str(&text).context("Failed to parse JSON")?;
+
+    if let Some(time) = server_time["serverTime"].as_i64() {
+        Ok(time)
+    } else {
+        anyhow::bail!("Invalid server time")
+    }
 }

+ 32 - 9
src/data_providers/bybit/market_data.rs

@@ -1,5 +1,3 @@
-use std::os::macos::raw::stat;
-
 use iced::futures;  
 use iced::subscription::{self, Subscription};
 use serde::{de, Deserialize, Deserializer};
@@ -8,6 +6,7 @@ use futures::stream::StreamExt;
 
 use async_tungstenite::tungstenite;
 use serde_json::Value;
+use crate::data_providers::binance::market_data::FeedLatency;
 use crate::{Ticker, Timeframe};
 
 #[derive(Debug)]
@@ -25,7 +24,7 @@ enum State {
 pub enum Event {
     Connected(Connection),
     Disconnected,
-    DepthReceived(i64, LocalDepthCache, Vec<Trade>),
+    DepthReceived(FeedLatency, i64, LocalDepthCache, Vec<Trade>),
     KlineReceived(Kline, Timeframe),
 }
 
@@ -195,9 +194,7 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
 
             let mut orderbook: Depth = Depth::new();
 
-            let mut already_fetching: bool = false;
-
-            let mut prev_id: i64 = 0;
+            let mut trade_latencies: Vec<i64> = Vec::new();
 
             loop {
                 match &mut state {
@@ -234,6 +231,8 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                     State::Connected(websocket) => {
                         let mut fused_websocket = websocket.by_ref().fuse();
 
+                        let feed_latency: FeedLatency;
+
                         futures::select! {
                             received = fused_websocket.select_next_some() => {
                                 match received {
@@ -244,8 +243,12 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                                                     stream.data.as_array().unwrap().iter().for_each(|trade| {
                                                         if let Ok(trade) = serde_json::from_value::<Trade>(trade.clone()) {
                                                             trades_buffer.push(trade);
+
+                                                            let latency = chrono::Utc::now().timestamp_millis() - trade.time;
+
+                                                            trade_latencies.push(latency);
                                                         } else {
-                                                            println!("Failed to deserialize trade: {:?}", trade);
+                                                            eprintln!("Failed to deserialize trade: {:?}", trade);
                                                         }
                                                     });
 
@@ -277,7 +280,27 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
 
                                                         let (local_bids, local_asks) = orderbook.update_levels(new_depth);
 
-                                                        let _ = output.send(Event::DepthReceived(stream.time, LocalDepthCache {
+                                                        let depth_latency = chrono::Utc::now().timestamp_millis() - stream.time;
+
+                                                        if !trade_latencies.is_empty() {
+                                                            let avg_trade_latency = trade_latencies.iter().sum::<i64>() / trade_latencies.len() as i64;
+        
+                                                            feed_latency = FeedLatency {
+                                                                time: stream.time,
+                                                                depth_latency,
+                                                                trade_latency: Some(avg_trade_latency),
+                                                            };
+        
+                                                            trade_latencies.clear();
+                                                        } else {
+                                                            feed_latency = FeedLatency {
+                                                                time: stream.time,
+                                                                depth_latency,
+                                                                trade_latency: None,
+                                                            };
+                                                        }
+
+                                                        let _ = output.send(Event::DepthReceived(feed_latency, stream.time, LocalDepthCache {
                                                             time: stream.time,
                                                             bids: local_bids,
                                                             asks: local_asks,
@@ -313,7 +336,7 @@ struct Stream {
     data: Value,
 }
  
-#[derive(Deserialize, Debug, Clone)]
+#[derive(Deserialize, Debug, Clone, Copy)]
 pub struct Trade {
     #[serde(rename = "T")]
     pub time: i64,