Przeglądaj źródła

add market feed latency info

Berke 1 rok temu
rodzic
commit
54727dc18d
1 zmienionych plików z 150 dodań i 13 usunięć
  1. 150 13
      src/main.rs

+ 150 - 13
src/main.rs

@@ -1,7 +1,7 @@
 #![windows_subsystem = "windows"]
 
 mod data_providers;
-use data_providers::binance::market_data;
+use data_providers::binance::market_data::{self, FeedLatency};
 use data_providers::{binance, bybit};
 mod charts;
 use charts::custom_line::{self, CustomLine};
@@ -9,6 +9,7 @@ use charts::heatmap::{self, Heatmap};
 use charts::footprint::{self, Footprint};
 use iced::event;
 
+use std::collections::VecDeque;
 use std::vec;
 use chrono::{NaiveDateTime, DateTime, Utc};
 use iced::{
@@ -296,6 +297,10 @@ struct State {
 
     tick_multiply: u16,
     min_tick_size: Option<f32>,
+
+    exchange_latency: Option<u32>,
+
+    feed_latency_cache: VecDeque<FeedLatency>,
 }
 
 impl State {
@@ -381,6 +386,10 @@ impl State {
             pane_lock: false,
             tick_multiply: 1,
             min_tick_size: None, 
+
+            exchange_latency: None,
+
+            feed_latency_cache: VecDeque::new(),
         }
     }
 
@@ -544,16 +553,35 @@ impl State {
                             self.time_and_sales = Some(TimeAndSales::new());
                         }
                         if pane_state.id == PaneId::FootprintChart {
-                            let fetch_ticksize: Task<Message> = Task::perform(
-                                bybit::market_data::fetch_ticksize(self.selected_ticker.unwrap_or(Ticker::BTCUSDT)),
-                                move |result| match result {
-                                    Ok(ticksize) => Message::SetMinTickSize(ticksize),
-                                    Err(err) => {
-                                        Message::ErrorOccurred(err.to_string())
-                                    }
+                            match self.selected_exchange {
+                                Some(Exchange::BinanceFutures) => {
+                                    let fetch_ticksize: Task<Message> = Task::perform(
+                                        binance::market_data::fetch_ticksize(self.selected_ticker.unwrap_or(Ticker::BTCUSDT)),
+                                        move |result| match result {
+                                            Ok(ticksize) => Message::SetMinTickSize(ticksize),
+                                            Err(err) => {
+                                                Message::ErrorOccurred(err.to_string())
+                                            }
+                                        }
+                                    );
+                                    tasks.push(fetch_ticksize);
+                                },
+                                Some(Exchange::BybitLinear) => {
+                                    let fetch_ticksize: Task<Message> = Task::perform(
+                                        bybit::market_data::fetch_ticksize(self.selected_ticker.unwrap_or(Ticker::BTCUSDT)),
+                                        move |result| match result {
+                                            Ok(ticksize) => Message::SetMinTickSize(ticksize),
+                                            Err(err) => {
+                                                Message::ErrorOccurred(err.to_string())
+                                            }
+                                        }
+                                    );
+                                    tasks.push(fetch_ticksize);
+                                },
+                                None => {
+                                    eprintln!("No exchange selected");
                                 }
-                            );
-                            tasks.push(fetch_ticksize);
+                            }
                         }
 
                         if let Some(selected_timeframe) = pane_state.stream.1 {
@@ -620,6 +648,9 @@ impl State {
                     self.custom_line = None;
                     self.footprint_chart = None;
 
+                    self.exchange_latency = None;
+                    self.feed_latency_cache.clear();
+
                     Task::none()
                 }
             },       
@@ -677,7 +708,7 @@ impl State {
                         binance::market_data::Event::Disconnected => {
                             self.binance_ws_state = BinanceWsState::Disconnected;
                         }
-                        binance::market_data::Event::DepthReceived(depth_update, depth, trades_buffer) => {
+                        binance::market_data::Event::DepthReceived(feed_latency, depth_update, depth, trades_buffer) => {
                             if let Some(time_and_sales) = &mut self.time_and_sales {
                                 time_and_sales.update(&trades_buffer);
                             } 
@@ -690,6 +721,8 @@ impl State {
                             if let Some(chart) = &mut self.footprint_chart {
                                 chart.insert_datapoint(trades_buffer_clone, depth_update);
                             }
+
+                            self.feed_latency_cache.push_back(feed_latency);
                         }
                         binance::market_data::Event::KlineReceived(kline, timeframe) => {
                             for (_, pane_state) in self.panes.iter() {
@@ -716,6 +749,48 @@ impl State {
                                     }
                                 }
                             }
+
+                            let mut depth_latency_sum: i64 = 0;
+                            let mut depth_latency_count: i64 = 0;
+                            let mut trade_latency_sum: i64 = 0;
+                            let mut trade_latency_count: i64 = 0;
+
+                            for feed_latency in self.feed_latency_cache.iter() {
+                                depth_latency_sum += feed_latency.depth_latency;
+                                depth_latency_count += 1;
+
+                                if let Some(trade_latency) = feed_latency.trade_latency {
+                                    trade_latency_sum += trade_latency;
+                                    trade_latency_count += 1;
+                                }
+                            }
+
+                            let average_depth_latency = if depth_latency_count > 0 {
+                                Some(depth_latency_sum / depth_latency_count)
+                            } else {
+                                None
+                            };
+
+                            let average_trade_latency = if trade_latency_count > 0 {
+                                Some(trade_latency_sum / trade_latency_count)
+                            } else {
+                                None
+                            };
+
+                            let highest_average_latency = match (average_depth_latency, average_trade_latency) {
+                                (Some(depth), Some(trade)) => Some(std::cmp::max(depth, trade) as u32),
+                                (Some(depth), None) => Some(depth as u32),
+                                (None, Some(trade)) => Some(trade as u32),
+                                (None, None) => None,
+                            };
+
+                            if let Some(latency) = highest_average_latency {
+                                self.exchange_latency = Some(latency);
+                            }
+
+                            while self.feed_latency_cache.len() > 100 {
+                                self.feed_latency_cache.pop_front();
+                            }
                         }
                     },
 
@@ -730,7 +805,7 @@ impl State {
 
                             println!("Bybit disconnected");
                         }
-                        bybit::market_data::Event::DepthReceived(depth_update, depth, trades_buffer) => {
+                        bybit::market_data::Event::DepthReceived(feed_latency, depth_update, depth, trades_buffer) => {
 
                             // convert bybit trade to binance trade
                             let mut binance_trades: Vec<binance::market_data::Trade> = vec![];
@@ -763,6 +838,8 @@ impl State {
                             if let Some(chart) = &mut self.footprint_chart {
                                 chart.insert_datapoint(trades_clone, depth_update);
                             }
+
+                            self.feed_latency_cache.push_back(feed_latency);
                         }
                         bybit::market_data::Event::KlineReceived(kline, timeframe) => {
                             for (_, pane_state) in self.panes.iter() {
@@ -799,6 +876,50 @@ impl State {
                                     }
                                 }
                             }
+
+                            let mut depth_latency_sum: i64 = 0;
+                            let mut depth_latency_count: i64 = 0;
+                            let mut trade_latency_sum: i64 = 0;
+                            let mut trade_latency_count: i64 = 0;
+
+                            for feed_latency in self.feed_latency_cache.iter() {
+                                // Always add depth_latency
+                                depth_latency_sum += feed_latency.depth_latency;
+                                depth_latency_count += 1;
+
+                                // Add trade_latency if it exists
+                                if let Some(trade_latency) = feed_latency.trade_latency {
+                                    trade_latency_sum += trade_latency;
+                                    trade_latency_count += 1;
+                                }
+                            }
+
+                            let average_depth_latency = if depth_latency_count > 0 {
+                                Some(depth_latency_sum / depth_latency_count)
+                            } else {
+                                None
+                            };
+
+                            let average_trade_latency = if trade_latency_count > 0 {
+                                Some(trade_latency_sum / trade_latency_count)
+                            } else {
+                                None
+                            };
+
+                            let highest_average_latency = match (average_depth_latency, average_trade_latency) {
+                                (Some(depth), Some(trade)) => Some(std::cmp::max(depth, trade) as u32),
+                                (Some(depth), None) => Some(depth as u32),
+                                (None, Some(trade)) => Some(trade as u32),
+                                (None, None) => None,
+                            };
+
+                            if let Some(latency) = highest_average_latency {
+                                self.exchange_latency = Some(latency);
+                            }
+
+                            while self.feed_latency_cache.len() > 100 {
+                                self.feed_latency_cache.pop_front();
+                            }
                         }
                     }
                 };
@@ -1050,7 +1171,23 @@ impl State {
 
         if self.ws_running {
             ws_controls = ws_controls.push(
-                Text::new(self.selected_ticker.unwrap_or_else(|| { dbg!("No ticker found"); Ticker::BTCUSDT } ).to_string()).size(20));
+                Row::new()
+                    .spacing(10)
+                    .align_items(Alignment::Center)
+                    .push(
+                        Column::new()
+                            .align_items(Alignment::Start)
+                            .push(
+                        Text::new(self.selected_exchange.unwrap_or_else(|| { dbg!("No exchange found"); Exchange::BinanceFutures }).to_string()).size(10)
+                            )
+                            .push(
+                                tooltip(Text::new(format!("{} ms", self.exchange_latency.unwrap_or_default())).size(10), "Exchange feed latency", tooltip::Position::Bottom).style(style::tooltip)
+                            )
+                    )
+                    .push(
+                Text::new(self.selected_ticker.unwrap_or_else(|| { dbg!("No ticker found"); Ticker::BTCUSDT } ).to_string()).size(20)
+                    )   
+            );
         } else {
             let symbol_pick_list = pick_list(
                 &Ticker::ALL[..],