Kaynağa Gözat

chore: data structs maintained for a better reusability

Berke 1 yıl önce
ebeveyn
işleme
d4b52a5a2f

+ 1 - 1
src/charts/candlestick.rs

@@ -3,7 +3,7 @@ use iced::{
     alignment, mouse, widget::{button, canvas::{self, event::{self, Event}, stroke::Stroke, Cache, Canvas, Geometry, Path}}, Color, Element, Length, Point, Rectangle, Renderer, Size, Theme
 };
 use iced::widget::{Column, Row, Container, Text};
-use crate::{market_data::Kline, Timeframe};
+use crate::{data_providers::Kline, Timeframe};
 
 use super::{Chart, CommonChartData, Message, Interaction, AxisLabelXCanvas, AxisLabelYCanvas};
 use super::{chart_button, calculate_price_step, calculate_time_step};

+ 1 - 1
src/charts/footprint.rs

@@ -3,7 +3,7 @@ use iced::{
     alignment, mouse, widget::{button, canvas::{self, event::{self, Event}, stroke::Stroke, Canvas, Geometry, Path}}, Color, Element, Length, Point, Rectangle, Renderer, Size, Theme
 };
 use iced::widget::{Column, Row, Container, Text};
-use crate::data_providers::binance::market_data::{Kline, Trade};
+use crate::data_providers::{Kline, Trade};
 
 use super::{Chart, CommonChartData, Message, Interaction, AxisLabelXCanvas, AxisLabelYCanvas};
 use super::chart_button;

+ 5 - 4
src/charts/heatmap.rs

@@ -4,20 +4,21 @@ use iced::{
     alignment, color, mouse, widget::{button, canvas::{self, event::{self, Event}, stroke::Stroke, Cache, Canvas, Geometry, Path}}, window, Border, Color, Element, Length, Point, Rectangle, Renderer, Size, Theme, Vector
 };
 use iced::widget::{Column, Row, Container, Text};
-use crate::data_providers::binance::market_data::{LocalDepthCache, Trade};
+
+use crate::data_providers::{Depth, Trade};
 
 use super::{Chart, CommonChartData, Message, chart_button, Interaction, AxisLabelYCanvas, AxisLabelXCanvas};
 
 pub struct HeatmapChart {
     chart: CommonChartData,
-    data_points: BTreeMap<i64, (LocalDepthCache, Box<[Trade]>)>,
+    data_points: BTreeMap<i64, (Depth, Box<[Trade]>)>,
     tick_size: f32,
     y_scaling: f32,
     size_filter: f32,
 }
 
 impl Chart for HeatmapChart {
-    type DataPoint = BTreeMap<i64, (LocalDepthCache, Box<[Trade]>)>;
+    type DataPoint = BTreeMap<i64, (Depth, Box<[Trade]>)>;
 
     fn get_common_data(&self) -> &CommonChartData {
         &self.chart
@@ -55,7 +56,7 @@ impl HeatmapChart {
         trades_source
     }
 
-    pub fn insert_datapoint(&mut self, trades_buffer: Vec<Trade>, depth_update: i64, depth: LocalDepthCache) {
+    pub fn insert_datapoint(&mut self, trades_buffer: Vec<Trade>, depth_update: i64, depth: Depth) {
         let aggregate_time = 100; // 100 ms
         let rounded_depth_update = (depth_update / aggregate_time) * aggregate_time;
         

+ 141 - 1
src/data_providers.rs

@@ -1,2 +1,142 @@
 pub mod binance;
-pub mod bybit;
+pub mod bybit;
+
+#[derive(Debug, Clone, Copy, Default)]
+pub struct Order {
+    pub price: f32,
+    pub qty: f32,
+}
+#[derive(Debug, Clone, Default)]
+pub struct Depth {
+    pub time: i64,
+    pub bids: Box<[Order]>,
+    pub asks: Box<[Order]>,
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct LocalDepthCache {
+    pub last_update_id: i64,
+    pub time: i64,
+    pub bids: Vec<Order>,
+    pub asks: Vec<Order>,
+}
+
+impl LocalDepthCache {
+    pub fn new() -> Self {
+        Self {
+            last_update_id: 0,
+            time: 0,
+            bids: Vec::new(),
+            asks: Vec::new(),
+        }
+    }
+
+    pub fn fetched(&mut self, new_depth: LocalDepthCache) {
+        self.last_update_id = new_depth.last_update_id;        
+        self.time = new_depth.time;
+
+        self.bids = new_depth.bids;
+        self.asks = new_depth.asks;
+    }
+
+    pub fn update_depth_cache(&mut self, new_bids: &[Order], new_asks: &[Order]) {
+        for order in new_bids {
+            if order.qty == 0.0 {
+                self.bids.retain(|x| x.price != order.price);
+            } else if let Some(existing_order) = self.bids.iter_mut().find(|x| x.price == order.price) {
+                    existing_order.qty = order.qty;
+            } else {
+                self.bids.push(*order);
+            }
+        }
+        for order in new_asks {
+            if order.qty == 0.0 {
+                self.asks.retain(|x| x.price != order.price);
+            } else if let Some(existing_order) = self.asks.iter_mut().find(|x| x.price == order.price) {
+                existing_order.qty = order.qty;
+            } else {
+                self.asks.push(*order);
+            }
+        }
+    }
+
+    pub fn update_levels(&mut self, new_depth: LocalDepthCache) -> (Box<[Order]>, Box<[Order]>) {
+        self.last_update_id = new_depth.last_update_id;
+        self.time = new_depth.time;
+
+        let mut best_ask_price = f32::MAX;
+        let mut best_bid_price = 0.0f32;
+
+        self.bids.iter().for_each(|order| {
+            if order.price > best_bid_price {
+                best_bid_price = order.price;
+            }
+        });
+        self.asks.iter().for_each(|order| {
+            if order.price < best_ask_price {
+                best_ask_price = order.price;
+            }
+        });
+
+        let highest: f32 = best_ask_price * 1.001;
+        let lowest: f32 = best_bid_price * 0.999;
+
+        self.update_depth_cache(&new_depth.bids, &new_depth.asks);
+
+        let mut local_bids: Vec<Order> = Vec::new();
+        let mut local_asks: Vec<Order> = Vec::new();
+
+        for order in &self.bids {
+            if order.price >= lowest {
+                local_bids.push(*order);
+            }
+        }
+        for order in &self.asks {
+            if order.price <= highest {
+                local_asks.push(*order);
+            }
+        }
+
+        // first sort by price
+        local_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap());
+        local_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap());
+
+        (local_bids.into_boxed_slice(), local_asks.into_boxed_slice())
+    }
+
+    pub fn get_fetch_id(&self) -> i64 {
+        self.last_update_id
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct Trade {
+    pub time: i64,
+    pub is_sell: bool,
+    pub price: f32,
+    pub qty: f32,
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct Kline {
+    pub time: u64,
+    pub open: f32,
+    pub high: f32,
+    pub low: f32,
+    pub close: f32,
+    pub volume: f32,
+    pub taker_buy_base_asset_volume: f32,
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct FeedLatency {
+    pub time: i64,
+    pub depth_latency: i64,
+    pub trade_latency: Option<i64>,
+}
+
+pub trait DataProvider {
+    fn get_orderbook(&self, symbol: &str) -> Result<Depth, Box<dyn std::error::Error>>;
+
+    fn get_trades(&self, symbol: &str) -> Result<Vec<Trade>, Box<dyn std::error::Error>>;
+}

+ 11 - 140
src/data_providers/binance/market_data.rs

@@ -25,6 +25,8 @@ use tokio::net::TcpStream;
 use tokio_rustls::rustls::{ClientConfig, OwnedTrustAnchor};
 use tokio_rustls::TlsConnector;
 
+use crate::data_providers::{LocalDepthCache, Trade, Depth, Order, FeedLatency, Kline};
+
 #[allow(clippy::large_enum_variant)]
 enum State {
     Disconnected,
@@ -33,18 +35,11 @@ enum State {
     ),
 }
 
-#[derive(Debug, Clone, Copy)]
-pub struct FeedLatency {
-    pub time: i64,
-    pub depth_latency: i64,
-    pub trade_latency: Option<i64>,
-}
-
 #[derive(Debug, Clone)]
 pub enum Event {
     Connected(Connection),
     Disconnected,
-    DepthReceived(FeedLatency, i64, LocalDepthCache, Vec<Trade>),
+    DepthReceived(FeedLatency, i64, Depth, Vec<Trade>),
     KlineReceived(Kline, Timeframe),
 }
 
@@ -73,120 +68,6 @@ pub struct FetchedDepth {
     #[serde(rename = "asks")]
     asks: Vec<Order>,
 }
-#[derive(Debug, Clone, Copy, Default)]
-pub struct Order {
-    pub price: f32,
-    pub qty: f32,
-}
-#[derive(Debug, Clone, Default)]
-pub struct LocalDepthCache {
-    pub time: i64,
-    pub bids: Box<[Order]>,
-    pub asks: Box<[Order]>,
-}
-
-pub struct Depth {
-    pub last_update_id: i64,
-    pub time: i64,
-    pub bids: Vec<Order>,
-    pub asks: Vec<Order>,
-}
-
-impl Depth {
-    pub fn new() -> Self {
-        Self {
-            last_update_id: 0,
-            time: 0,
-            bids: Vec::new(),
-            asks: Vec::new(),
-        }
-    }
-
-    pub fn fetched(&mut self, new_depth: Depth) {
-        self.last_update_id = new_depth.last_update_id;        
-        self.time = new_depth.time;
-
-        self.bids = new_depth.bids;
-        self.asks = new_depth.asks;
-    }
-
-    pub fn update_depth_cache(&mut self, new_bids: &[Order], new_asks: &[Order]) {
-        for order in new_bids {
-            if order.qty == 0.0 {
-                self.bids.retain(|x| x.price != order.price);
-            } else if let Some(existing_order) = self.bids.iter_mut().find(|x| x.price == order.price) {
-                    existing_order.qty = order.qty;
-            } else {
-                self.bids.push(*order);
-            }
-        }
-        for order in new_asks {
-            if order.qty == 0.0 {
-                self.asks.retain(|x| x.price != order.price);
-            } else if let Some(existing_order) = self.asks.iter_mut().find(|x| x.price == order.price) {
-                existing_order.qty = order.qty;
-            } else {
-                self.asks.push(*order);
-            }
-        }
-    }
-
-    pub fn update_levels(&mut self, new_depth: Depth) -> (Box<[Order]>, Box<[Order]>) {
-        self.last_update_id = new_depth.last_update_id;
-        self.time = new_depth.time;
-
-        let mut best_ask_price = f32::MAX;
-        let mut best_bid_price = 0.0f32;
-
-        self.bids.iter().for_each(|order| {
-            if order.price > best_bid_price {
-                best_bid_price = order.price;
-            }
-        });
-        self.asks.iter().for_each(|order| {
-            if order.price < best_ask_price {
-                best_ask_price = order.price;
-            }
-        });
-
-        let highest: f32 = best_ask_price * 1.001;
-        let lowest: f32 = best_bid_price * 0.999;
-
-        self.update_depth_cache(&new_depth.bids, &new_depth.asks);
-
-        let mut local_bids: Vec<Order> = Vec::new();
-        let mut local_asks: Vec<Order> = Vec::new();
-
-        for order in &self.bids {
-            if order.price >= lowest {
-                local_bids.push(*order);
-            }
-        }
-        for order in &self.asks {
-            if order.price <= highest {
-                local_asks.push(*order);
-            }
-        }
-
-        // first sort by price
-        local_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap());
-        local_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap());
-
-        (local_bids.into_boxed_slice(), local_asks.into_boxed_slice())
-    }
-
-    pub fn get_fetch_id(&self) -> i64 {
-        self.last_update_id
-    }
-}
-
-#[derive(Debug, Clone, Copy)]
-pub struct Trade {
-    pub time: i64,
-    pub is_sell: bool,
-    pub price: f32,
-    pub qty: f32,
-}
 
 #[derive(Serialize, Deserialize, Debug)]
 struct SonicDepth {
@@ -425,7 +306,7 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
             let stream_1 = format!("{symbol_str}@trade");
             let stream_2 = format!("{symbol_str}@depth@100ms");
 
-            let mut orderbook: Depth = Depth::new();
+            let mut orderbook: LocalDepthCache = LocalDepthCache::new();
 
             let mut already_fetching: bool = false;
 
@@ -448,9 +329,9 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                             tokio::spawn(async move {
                                 let fetched_depth = fetch_depth(selected_ticker).await;
 
-                                let depth: Depth = match fetched_depth {
+                                let depth: LocalDepthCache = match fetched_depth {
                                     Ok(depth) => {
-                                        Depth {
+                                        LocalDepthCache {
                                             last_update_id: depth.update_id,
                                             time: depth.time,
                                             bids: depth.bids,
@@ -522,9 +403,9 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                                                     tokio::spawn(async move {
                                                         let fetched_depth = fetch_depth(selected_ticker).await;
     
-                                                        let depth: Depth = match fetched_depth {
+                                                        let depth: LocalDepthCache = match fetched_depth {
                                                             Ok(depth) => {
-                                                                Depth {
+                                                                LocalDepthCache {
                                                                     last_update_id: depth.update_id,
                                                                     time: depth.time,
                                                                     bids: depth.bids,
@@ -553,7 +434,7 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
     
                                                     let depth_latency = chrono::Utc::now().timestamp_millis() - time;
     
-                                                    let depth_update = Depth {
+                                                    let depth_update = LocalDepthCache {
                                                         last_update_id: de_depth.final_id as i64,
                                                         time,
                                                         bids: de_depth.bids.iter().map(|x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }).collect(),
@@ -562,7 +443,7 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
     
                                                     let (local_bids, local_asks) = orderbook.update_levels(depth_update);
     
-                                                    let local_depth_cache = LocalDepthCache {
+                                                    let current_depth = Depth {
                                                         time,
                                                         bids: local_bids,
                                                         asks: local_asks,
@@ -585,7 +466,7 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                                                         Event::DepthReceived(
                                                             feed_latency,
                                                             time, 
-                                                            local_depth_cache,
+                                                            current_depth,
                                                             std::mem::take(&mut trades_buffer)
                                                         )
                                                     ).await;
@@ -720,16 +601,6 @@ mod string_to_f32 {
     }
 }
 
-#[derive(Debug, Clone, Copy)]
-pub struct Kline {
-    pub time: u64,
-    pub open: f32,
-    pub high: f32,
-    pub low: f32,
-    pub close: f32,
-    pub volume: f32,
-    pub taker_buy_base_asset_volume: f32,
-}
 #[derive(Deserialize, Debug, Clone)]
 struct FetchedKlines (
     u64,

+ 8 - 118
src/data_providers/bybit/market_data.rs

@@ -4,9 +4,6 @@ use iced::subscription::{self, Subscription};
 use futures::sink::SinkExt;
 
 use serde_json::Value;
-use crate::data_providers::binance::market_data::FeedLatency;
-use crate::{Ticker, Timeframe};
-
 use bytes::Bytes;
 
 use sonic_rs::{LazyValue, JsonValueTrait};
@@ -26,6 +23,9 @@ use tokio::net::TcpStream;
 use tokio_rustls::rustls::{ClientConfig, OwnedTrustAnchor};
 use tokio_rustls::TlsConnector;
 
+use crate::data_providers::{LocalDepthCache, Trade, Depth, Order, FeedLatency};
+use crate::{Ticker, Timeframe};
+
 #[allow(clippy::large_enum_variant)]
 enum State {
     Disconnected,
@@ -38,116 +38,13 @@ enum State {
 pub enum Event {
     Connected(Connection),
     Disconnected,
-    DepthReceived(FeedLatency, i64, LocalDepthCache, Vec<Trade>),
+    DepthReceived(FeedLatency, i64, Depth, Vec<Trade>),
     KlineReceived(Kline, Timeframe),
 }
 
 #[derive(Debug, Clone)]
 pub struct Connection;
 
-#[derive(Debug, Clone, Copy, Default)]
-pub struct Order {
-    pub price: f32,
-    pub qty: f32,
-}
-#[derive(Debug, Clone, Default)]
-pub struct LocalDepthCache {
-    pub time: i64,
-    pub bids: Box<[Order]>,
-    pub asks: Box<[Order]>,
-}
-#[derive(Debug, Clone, Default)]
-pub struct Depth {
-    pub last_update_id: i64,
-    pub time: i64,
-    pub bids: Vec<Order>,
-    pub asks: Vec<Order>,
-}
-
-impl Depth {
-    pub fn new() -> Self {
-        Self {
-            last_update_id: 0,
-            time: 0,
-            bids: Vec::new(),
-            asks: Vec::new(),
-        }
-    }
-
-    pub fn fetched(&mut self, new_depth: Depth) {
-        self.last_update_id = new_depth.last_update_id;        
-        self.time = new_depth.time;
-
-        self.bids = new_depth.bids;
-        self.asks = new_depth.asks;
-    }
-
-    pub fn update_depth_cache(&mut self, new_bids: &[Order], new_asks: &[Order]) {
-        for order in new_bids {
-            if order.qty == 0.0 {
-                self.bids.retain(|x| x.price != order.price);
-            } else if let Some(existing_order) = self.bids.iter_mut().find(|x| x.price == order.price) {
-                existing_order.qty = order.qty;
-            } else {
-                self.bids.push(*order);
-            }
-        }
-        for order in new_asks {
-            if order.qty == 0.0 {
-                self.asks.retain(|x| x.price != order.price);
-            } else if let Some(existing_order) = self.asks.iter_mut().find(|x| x.price == order.price) {
-                existing_order.qty = order.qty;
-            } else {
-                self.asks.push(*order);
-            }
-        }
-    }
-
-    pub fn update_levels(&mut self, new_depth: Depth) -> (Box<[Order]>, Box<[Order]>) {
-        self.last_update_id = new_depth.last_update_id;
-        self.time = new_depth.time;
-
-        let mut best_ask_price = f32::MAX;
-        let mut best_bid_price = 0.0f32;
-
-        self.bids.iter().for_each(|order| {
-            if order.price > best_bid_price {
-                best_bid_price = order.price;
-            }
-        });
-        self.asks.iter().for_each(|order| {
-            if order.price < best_ask_price {
-                best_ask_price = order.price;
-            }
-        });
-
-        let highest: f32 = best_ask_price * 1.001;
-        let lowest: f32 = best_bid_price * 0.999;
-
-        self.update_depth_cache(&new_depth.bids, &new_depth.asks);
-
-        let mut local_bids: Vec<Order> = Vec::new();
-        let mut local_asks: Vec<Order> = Vec::new();
-
-        for order in &self.bids {
-            if order.price >= lowest {
-                local_bids.push(*order);
-            }
-        }
-        for order in &self.asks {
-            if order.price <= highest {
-                local_asks.push(*order);
-            }
-        }
-
-        // first sort by price
-        local_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap());
-        local_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap());
-
-        (local_bids.into_boxed_slice(), local_asks.into_boxed_slice())
-    }
-}
-
 #[derive(Serialize, Deserialize, Debug)]
 struct SonicDepth {
 	#[serde(rename = "u")]
@@ -375,13 +272,6 @@ fn string_to_timeframe(interval: &str) -> Option<Timeframe> {
     Timeframe::ALL.iter().find(|&tf| tf.to_string() == format!("{}m", interval)).copied()
 }
 
-#[derive(Debug, Clone, Copy)]
-pub struct Trade {
-    pub time: i64,
-    pub is_sell: bool,
-    pub price: f32,
-    pub qty: f32,
-}
 #[derive(Deserialize, Debug, Clone, Copy)]
 pub struct Kline {
     pub time: u64,
@@ -413,7 +303,7 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
             let stream_1 = format!("publicTrade.{symbol_str}");
             let stream_2 = format!("orderbook.200.{symbol_str}");
 
-            let mut orderbook: Depth = Depth::new();
+            let mut orderbook: LocalDepthCache = LocalDepthCache::new();
 
             let mut trade_latencies: Vec<i64> = Vec::new();
 
@@ -475,7 +365,7 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                                             StreamData::Depth(de_depth, data_type, time) => {                                            
                                                 let depth_latency = chrono::Utc::now().timestamp_millis() - time;
 
-                                                let depth_update = Depth {
+                                                let depth_update = LocalDepthCache {
                                                     last_update_id: de_depth.update_id as i64,
                                                     time,
                                                     bids: de_depth.bids.iter().map(|x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }).collect(),
@@ -488,7 +378,7 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                                                 } else if data_type == "delta" {
                                                     let (local_bids, local_asks) = orderbook.update_levels(depth_update);
 
-                                                    let local_depth_cache = LocalDepthCache {
+                                                    let current_depth = Depth {
                                                         time,
                                                         bids: local_bids,
                                                         asks: local_asks,
@@ -511,7 +401,7 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                                                         Event::DepthReceived(
                                                             feed_latency,
                                                             time, 
-                                                            local_depth_cache,
+                                                            current_depth,
                                                             std::mem::take(&mut trades_buffer)
                                                         )
                                                     ).await;

+ 57 - 108
src/main.rs

@@ -1,7 +1,6 @@
 #![windows_subsystem = "windows"]
 
 mod data_providers;
-use data_providers::binance::market_data::{self, FeedLatency};
 use data_providers::{binance, bybit};
 mod charts;
 use charts::footprint::{self, FootprintChart};
@@ -233,7 +232,7 @@ pub enum Message {
     ExchangeSelected(Exchange),
     MarketWsEvent(MarketEvents),
     WsToggle,
-    FetchEvent(Result<Vec<binance::market_data::Kline>, std::string::String>, PaneId, Timeframe),
+    FetchEvent(Result<Vec<data_providers::Kline>, std::string::String>, PaneId, Timeframe),
     
     // Pane grid
     Split(pane_grid::Axis, pane_grid::Pane, PaneId),
@@ -301,7 +300,7 @@ struct State {
 
     exchange_latency: Option<(u32, u32)>,
 
-    feed_latency_cache: VecDeque<FeedLatency>,
+    feed_latency_cache: VecDeque<data_providers::FeedLatency>,
     
     pane_state_cache: HashMap<PaneId, (Option<Ticker>, Option<Timeframe>, Option<f32>)>,
 
@@ -481,7 +480,7 @@ impl State {
                     match self.selected_exchange {
                         Some(Exchange::BinanceFutures) => {
                             let fetch_klines = Task::perform(
-                                market_data::fetch_klines(*selected_ticker, timeframe)
+                                binance::market_data::fetch_klines(*selected_ticker, timeframe)
                                     .map_err(|err| format!("{err}")), 
                                 move |klines| {
                                     Message::FetchEvent(klines, pane_id, timeframe)
@@ -498,8 +497,8 @@ impl State {
 
                                     match klines {
                                         Ok(klines) => {
-                                            let binance_klines: Vec<market_data::Kline> = klines.iter().map(|kline| {
-                                                market_data::Kline {
+                                            let binance_klines: Vec<data_providers::Kline> = klines.iter().map(|kline| {
+                                                data_providers::Kline {
                                                     time: kline.time,
                                                     open: kline.open,
                                                     high: kline.high,
@@ -560,7 +559,7 @@ impl State {
                 } else {
                     Task::none()
                 }
-            }
+            },
             Message::WsToggle => {
                 self.ws_running = !self.ws_running;
 
@@ -613,9 +612,9 @@ impl State {
                             match self.selected_exchange {
                                 Some(Exchange::BinanceFutures) => {
                                     let fetch_klines: Task<Message> = Task::perform(
-                                        market_data::fetch_klines(self.selected_ticker.unwrap_or(Ticker::BTCUSDT), selected_timeframe)
+                                        binance::market_data::fetch_klines(self.selected_ticker.unwrap_or(Ticker::BTCUSDT), selected_timeframe)
                                             .map_err(|err| format!("{err}")), 
-                                        move |klines: Result<Vec<market_data::Kline>, String>| {
+                                        move |klines: Result<Vec<data_providers::Kline>, String>| {
                                             Message::FetchEvent(klines, pane_id, selected_timeframe)
                                         }
                                     );
@@ -629,8 +628,8 @@ impl State {
 
                                             match klines {
                                                 Ok(klines) => {
-                                                    let binance_klines: Vec<market_data::Kline> = klines.iter().map(|kline| {
-                                                        market_data::Kline {
+                                                    let binance_klines: Vec<data_providers::Kline> = klines.iter().map(|kline| {
+                                                        data_providers::Kline {
                                                             time: kline.time,
                                                             open: kline.open,
                                                             high: kline.high,
@@ -688,7 +687,7 @@ impl State {
                             },
                             PaneId::FootprintChart => {
                                 if let Some(heatmap_chart) = &mut self.heatmap_chart {
-                                    let copied_trades: Vec<Trade> = heatmap_chart.get_raw_trades();
+                                    let copied_trades: Vec<data_providers::Trade> = heatmap_chart.get_raw_trades();
 
                                     let mut klines_raw: Vec<(i64, f32, f32, f32, f32, f32, f32)> = vec![];
                                     for kline in &klines {
@@ -771,40 +770,7 @@ impl State {
                                 }
                             }
 
-                            let mut depth_latency_sum: i64 = 0;
-                            let mut depth_latency_count: i64 = 0;
-                            let mut trade_latency_sum: i64 = 0;
-                            let mut trade_latency_count: i64 = 0;
-
-                            for feed_latency in self.feed_latency_cache.iter() {
-                                depth_latency_sum += feed_latency.depth_latency;
-                                depth_latency_count += 1;
-
-                                if let Some(trade_latency) = feed_latency.trade_latency {
-                                    trade_latency_sum += trade_latency;
-                                    trade_latency_count += 1;
-                                }
-                            }
-
-                            let average_depth_latency: Option<i64> = if depth_latency_count > 0 {
-                                Some(depth_latency_sum / depth_latency_count)
-                            } else {
-                                None
-                            };
-
-                            let average_trade_latency: Option<i64> = if trade_latency_count > 0 {
-                                Some(trade_latency_sum / trade_latency_count)
-                            } else {
-                                None
-                            };
-
-                            if let (Some(average_depth_latency), Some(average_trade_latency)) = (average_depth_latency, average_trade_latency) {
-                                self.exchange_latency = Some((average_depth_latency as u32, average_trade_latency as u32));
-                            }
-
-                            while self.feed_latency_cache.len() > 100 {
-                                self.feed_latency_cache.pop_front();
-                            }
+                            self.update_exchange_latency();
                         }
                     },
 
@@ -820,37 +786,17 @@ impl State {
                             println!("Bybit disconnected");
                         }
                         bybit::market_data::Event::DepthReceived(feed_latency, depth_update, depth, trades_buffer) => {
-
-                            // convert bybit trade to binance trade
-                            let mut binance_trades: Vec<binance::market_data::Trade> = vec![];
-                            
-                            for trade in trades_buffer.iter() {
-                                let binance_trade = binance::market_data::Trade {
-                                    price: trade.price,
-                                    qty: trade.qty,
-                                    time: trade.time,
-                                    is_sell: trade.is_sell,
-                                };
-                                binance_trades.push(binance_trade);
-                            }
-
-                            let local_depth = binance::market_data::LocalDepthCache {
-                                time: depth.time,
-                                bids: depth.bids.iter().map(|order| binance::market_data::Order { price: order.price, qty: order.qty }).collect(),
-                                asks: depth.asks.iter().map(|order| binance::market_data::Order { price: order.price, qty: order.qty }).collect(),
-                            };
-
-                            let trades_clone = binance_trades.clone();
-
                             if let Some(time_and_sales) = &mut self.time_and_sales {
-                                time_and_sales.update(&binance_trades);
+                                time_and_sales.update(&trades_buffer);
                             } 
 
+                            let trades_buffer_clone = trades_buffer.clone();
+
                             if let Some(chart) = &mut self.heatmap_chart {
-                                chart.insert_datapoint(binance_trades, depth_update, local_depth);
+                                chart.insert_datapoint(trades_buffer, depth_update, depth);
                             } 
                             if let Some(chart) = &mut self.footprint_chart {
-                                chart.insert_datapoint(trades_clone, depth_update);
+                                chart.insert_datapoint(trades_buffer_clone, depth_update);
                             }
 
                             self.feed_latency_cache.push_back(feed_latency);
@@ -859,7 +805,7 @@ impl State {
                             for (_, pane_state) in self.panes.iter() {
                                 if let Some(selected_timeframe) = pane_state.stream.1 {
                                     if selected_timeframe == timeframe {
-                                        let binance_kline = binance::market_data::Kline {
+                                        let binance_kline = data_providers::Kline {
                                             time: kline.time,
                                             open: kline.open,
                                             high: kline.high,
@@ -891,40 +837,7 @@ impl State {
                                 }
                             }
 
-                            let mut depth_latency_sum: i64 = 0;
-                            let mut depth_latency_count: i64 = 0;
-                            let mut trade_latency_sum: i64 = 0;
-                            let mut trade_latency_count: i64 = 0;
-
-                            for feed_latency in self.feed_latency_cache.iter() {
-                                depth_latency_sum += feed_latency.depth_latency;
-                                depth_latency_count += 1;
-
-                                if let Some(trade_latency) = feed_latency.trade_latency {
-                                    trade_latency_sum += trade_latency;
-                                    trade_latency_count += 1;
-                                }
-                            }
-
-                            let average_depth_latency = if depth_latency_count > 0 {
-                                Some(depth_latency_sum / depth_latency_count)
-                            } else {
-                                None
-                            };
-
-                            let average_trade_latency = if trade_latency_count > 0 {
-                                Some(trade_latency_sum / trade_latency_count)
-                            } else {
-                                None
-                            };
-
-                            if let (Some(average_depth_latency), Some(average_trade_latency)) = (average_depth_latency, average_trade_latency) {
-                                self.exchange_latency = Some((average_depth_latency as u32, average_trade_latency as u32));
-                            }
-
-                            while self.feed_latency_cache.len() > 100 {
-                                self.feed_latency_cache.pop_front();
-                            }
+                            self.update_exchange_latency();
                         }
                     }
                 };
@@ -1393,6 +1306,43 @@ impl State {
         
         Subscription::batch(subscriptions)
     }    
+
+    fn update_exchange_latency(&mut self) {
+        let mut depth_latency_sum: i64 = 0;
+        let mut depth_latency_count: i64 = 0;
+        let mut trade_latency_sum: i64 = 0;
+        let mut trade_latency_count: i64 = 0;
+
+        for feed_latency in self.feed_latency_cache.iter() {
+            depth_latency_sum += feed_latency.depth_latency;
+            depth_latency_count += 1;
+
+            if let Some(trade_latency) = feed_latency.trade_latency {
+                trade_latency_sum += trade_latency;
+                trade_latency_count += 1;
+            }
+        }
+
+        let average_depth_latency: Option<i64> = if depth_latency_count > 0 {
+            Some(depth_latency_sum / depth_latency_count)
+        } else {
+            None
+        };
+
+        let average_trade_latency: Option<i64> = if trade_latency_count > 0 {
+            Some(trade_latency_sum / trade_latency_count)
+        } else {
+            None
+        };
+
+        if let (Some(average_depth_latency), Some(average_trade_latency)) = (average_depth_latency, average_trade_latency) {
+            self.exchange_latency = Some((average_depth_latency as u32, average_trade_latency as u32));
+        }
+
+        while self.feed_latency_cache.len() > 100 {
+            self.feed_latency_cache.pop_front();
+        }
+    }
 }
 
 fn modal<'a, Message>(
@@ -1677,7 +1627,6 @@ impl TickMultiplier {
     }
 }
 
-use crate::market_data::Trade;
 struct ConvertedTrade {
     time: NaiveDateTime,
     price: f32,
@@ -1699,7 +1648,7 @@ impl TimeAndSales {
         self.size_filter = value;
     }
 
-    fn update(&mut self, trades_buffer: &Vec<Trade>) {
+    fn update(&mut self, trades_buffer: &Vec<data_providers::Trade>) {
         for trade in trades_buffer {
             let trade_time = NaiveDateTime::from_timestamp(trade.time / 1000, (trade.time % 1000) as u32 * 1_000_000);
             let converted_trade = ConvertedTrade {