Просмотр исходного кода

Bybit linear perps integration (#8)

* initial commit

* add "exchange" matches to event handlers

* impl. bybit to fetch/ws methods

* fix price level keys to suit with tick size adjustments

* add exchange match for timeframe selection

* fix tick size fetch and it's error handling with anyhow

* add ticksize fetch method for bybit

* anyhow for easier error handling

* workaround to handle a data discrepancy between exchanges
Berke 1 год назад
Родитель
Сommit
4560b33b4d

+ 7 - 0
Cargo.lock

@@ -132,6 +132,12 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "anyhow"
+version = "1.0.86"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
+
 [[package]]
 name = "approx"
 version = "0.5.1"
@@ -1869,6 +1875,7 @@ dependencies = [
 name = "iced-trade"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "async-tungstenite",
  "base64",
  "chrono",

+ 2 - 1
Cargo.toml

@@ -30,6 +30,7 @@ hex = "0.4.3"
 iced_table = "0.12.0"
 iced_futures = "0.12.0"
 iced_aw = { version = "0.8.0", features = ["quad", "menu"] }
+anyhow = "1.0.86"
 [dependencies.async-tungstenite]
 version = "0.25"
-features = ["tokio-rustls-webpki-roots"]
+features = ["tokio-rustls-webpki-roots"]

+ 43 - 19
src/charts/custom_line.rs

@@ -85,7 +85,13 @@ impl CustomLine {
 
     pub fn insert_datapoint(&mut self, kline: &Kline) {
         let buy_volume = kline.taker_buy_base_asset_volume;
-        let sell_volume = kline.volume - buy_volume;
+        let sell_volume: f32;
+        
+        if buy_volume != -1.0 {
+            sell_volume = kline.volume - buy_volume;
+        } else {
+            sell_volume = kline.volume;
+        }
         self.klines_raw.insert(kline.time as i64, (kline.open, kline.high, kline.low, kline.close, buy_volume, sell_volume));
 
         self.render_start();
@@ -527,20 +533,30 @@ impl canvas::Program<Message> for CustomLine {
                 );
                 frame.stroke(&wick, Stroke::default().with_color(color).with_width(1.0));
 
-                let buy_bar_height = (buy_volume / max_volume) * volume_area_height;
-                let sell_bar_height = (sell_volume / max_volume) * volume_area_height;
-                
-                let buy_bar = Path::rectangle(
-                    Point::new(x_position as f32, bounds.height - buy_bar_height), 
-                    Size::new(2.0 * self.scaling, buy_bar_height)
-                );
-                frame.fill(&buy_bar, Color::from_rgb8(81, 205, 160)); 
-                
-                let sell_bar = Path::rectangle(
-                    Point::new(x_position as f32 - (2.0 * self.scaling), bounds.height - sell_bar_height), 
-                    Size::new(2.0 * self.scaling, sell_bar_height)
-                );
-                frame.fill(&sell_bar, Color::from_rgb8(192, 80, 77)); 
+                if buy_volume != -1.0 {
+                    let buy_bar_height = (buy_volume / max_volume) * volume_area_height;
+                    let sell_bar_height = (sell_volume / max_volume) * volume_area_height;
+                    
+                    let buy_bar = Path::rectangle(
+                        Point::new(x_position as f32, bounds.height - buy_bar_height), 
+                        Size::new(2.0 * self.scaling, buy_bar_height)
+                    );
+                    frame.fill(&buy_bar, Color::from_rgb8(81, 205, 160)); 
+                    
+                    let sell_bar = Path::rectangle(
+                        Point::new(x_position as f32 - (2.0 * self.scaling), bounds.height - sell_bar_height), 
+                        Size::new(2.0 * self.scaling, sell_bar_height)
+                    );
+                    frame.fill(&sell_bar, Color::from_rgb8(192, 80, 77)); 
+                } else {
+                    let bar_height = ((sell_volume) / max_volume) * volume_area_height;
+                    
+                    let bar = Path::rectangle(
+                        Point::new(x_position as f32 - (2.0 * self.scaling), bounds.height - bar_height), 
+                        Size::new(4.0 * self.scaling, bar_height)
+                    );
+                    frame.fill(&bar, Color::from_rgba8(200, 200, 200, 0.4)); 
+                }
             }
         });
 
@@ -569,10 +585,18 @@ impl canvas::Program<Message> for CustomLine {
                     if let Some((_, kline)) = self.klines_raw.iter()
                         .find(|(time, _)| **time == rounded_timestamp as i64) {
 
-                        let tooltip_text = format!(
-                            "O: {} H: {} L: {} C: {}\nBuyV: {:.0} SellV: {:.0}",
-                            kline.0, kline.1, kline.2, kline.3, kline.4, kline.5
-                        );
+                        let tooltip_text: String;
+                        if kline.4 != -1.0 {
+                            tooltip_text = format!(
+                                "O: {} H: {} L: {} C: {}\nBuyV: {:.0} SellV: {:.0}",
+                                kline.0, kline.1, kline.2, kline.3, kline.4, kline.5
+                            );
+                        } else {
+                            tooltip_text = format!(
+                                "O: {} H: {} L: {} C: {}\nVolume: {:.0}",
+                                kline.0, kline.1, kline.2, kline.3, kline.5
+                            );
+                        }
 
                         let text = canvas::Text {
                             content: tooltip_text,

+ 44 - 21
src/charts/footprint.rs

@@ -55,7 +55,7 @@ impl Footprint {
         }
         for trade in trades_raw {
             let rounded_time = (trade.time / aggregate_time) * aggregate_time;
-            let price_level = (trade.price / tick_size).round() as i64 * (tick_size * 100.0) as i64;
+            let price_level: i64 = (trade.price * (1.0 / tick_size)).round() as i64;
 
             let entry: &mut (HashMap<i64, (f32, f32)>, (f32, f32, f32, f32, f32, f32)) = data_points
                 .entry(rounded_time)
@@ -168,7 +168,12 @@ impl Footprint {
             kline_value.2 = kline.low;
             kline_value.3 = kline.close;
             kline_value.4 = kline.taker_buy_base_asset_volume;
-            kline_value.5 = kline.volume - kline.taker_buy_base_asset_volume;
+
+            if kline_value.4 != -1.0 {
+                kline_value.5 = kline.volume - kline.taker_buy_base_asset_volume;
+            } else {
+                kline_value.5 = kline.volume;
+            }
         }
     }
     
@@ -572,22 +577,31 @@ impl canvas::Program<Message> for Footprint {
                 }
 
                 if max_volume > 0.0 {
-                    let buy_bar_height = (kline.4 / max_volume) * volume_area_height;
-                    let sell_bar_height = (kline.5 / max_volume) * volume_area_height;
-
-                    let sell_bar_width = 8.0 * self.scaling;
-                    let sell_bar_x_position = x_position - (5.0*self.scaling) - sell_bar_width;
-                    let sell_bar = Path::rectangle(
-                        Point::new(sell_bar_x_position, bounds.height - sell_bar_height), 
-                        Size::new(sell_bar_width, sell_bar_height)
-                    );
-                    frame.fill(&sell_bar, Color::from_rgb8(192, 80, 77)); 
+                    if kline.4 != -1.0 {
+                        let buy_bar_height = (kline.4 / max_volume) * volume_area_height;
+                        let sell_bar_height = (kline.5 / max_volume) * volume_area_height;
+
+                        let sell_bar_width = 8.0 * self.scaling;
+                        let sell_bar_x_position = x_position - (5.0*self.scaling) - sell_bar_width;
+                        let sell_bar = Path::rectangle(
+                            Point::new(sell_bar_x_position, bounds.height - sell_bar_height), 
+                            Size::new(sell_bar_width, sell_bar_height)
+                        );
+                        frame.fill(&sell_bar, Color::from_rgb8(192, 80, 77)); 
 
-                    let buy_bar = Path::rectangle(
-                        Point::new(x_position + (5.0*self.scaling), bounds.height - buy_bar_height), 
-                        Size::new(8.0 * self.scaling, buy_bar_height)
-                    );
-                    frame.fill(&buy_bar, Color::from_rgb8(81, 205, 160));
+                        let buy_bar = Path::rectangle(
+                            Point::new(x_position + (5.0*self.scaling), bounds.height - buy_bar_height), 
+                            Size::new(8.0 * self.scaling, buy_bar_height)
+                        );
+                        frame.fill(&buy_bar, Color::from_rgb8(81, 205, 160));
+                    } else {
+                        let bar_height = (kline.5 / max_volume) * volume_area_height;
+                        let bar = Path::rectangle(
+                            Point::new(x_position - (3.0*self.scaling), bounds.height - bar_height), 
+                            Size::new(6.0 * self.scaling, bar_height)
+                        );
+                        frame.fill(&bar, Color::from_rgba8(200, 200, 200, 0.4));
+                    }
                 }
             } 
             
@@ -630,10 +644,19 @@ impl canvas::Program<Message> for Footprint {
 
                     if let Some((_, kline)) = self.data_points.iter()
                         .find(|(time, _)| **time == rounded_timestamp) {
-                            let tooltip_text = format!(
-                                "O: {} H: {} L: {} C: {}\nBuyV: {:.0} SellV: {:.0}",
-                                kline.1.0, kline.1.1, kline.1.2, kline.1.3, kline.1.4, kline.1.5
-                            );
+
+                            let tooltip_text: String;
+                            if kline.1.4 != -1.0 {
+                                tooltip_text = format!(
+                                    "O: {} H: {} L: {} C: {}\nBuyV: {:.0} SellV: {:.0}",
+                                    kline.1.0, kline.1.1, kline.1.2, kline.1.3, kline.1.4, kline.1.5
+                                );
+                            } else {
+                                tooltip_text = format!(
+                                    "O: {} H: {} L: {} C: {}\nVolume: {:.0}",
+                                    kline.1.0, kline.1.1, kline.1.2, kline.1.3, kline.1.5
+                                );
+                            }
 
                             let text = canvas::Text {
                                 content: tooltip_text,

+ 2 - 1
src/data_providers.rs

@@ -1 +1,2 @@
-pub mod binance;
+pub mod binance;
+pub mod bybit;

+ 1 - 8
src/data_providers/binance/market_data.rs

@@ -1,8 +1,6 @@
-use hmac::digest::typenum::Or;
 use iced::futures;  
 use iced::subscription::{self, Subscription};
 use serde::{de, Deserialize, Deserializer};
-use futures::channel::mpsc;
 use futures::sink::SinkExt;
 use futures::stream::StreamExt;
 
@@ -10,11 +8,6 @@ use async_tungstenite::tungstenite;
 use serde_json::Value;
 use crate::{Ticker, Timeframe};
 
-use tokio::time::{interval, Duration};
-use futures::FutureExt;
-use std::sync::{Arc, RwLock, Mutex};
-use std::collections::{BTreeMap, HashMap};
-
 #[derive(Debug)]
 #[allow(clippy::large_enum_variant)]
 enum State {
@@ -35,7 +28,7 @@ pub enum Event {
 }
 
 #[derive(Debug, Clone)]
-pub struct Connection(mpsc::Sender<String>);
+pub struct Connection;
 
 impl<'de> Deserialize<'de> for Order {
     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>

+ 1 - 0
src/data_providers/bybit.rs

@@ -0,0 +1 @@
+pub mod market_data;

+ 553 - 0
src/data_providers/bybit/market_data.rs

@@ -0,0 +1,553 @@
+use std::os::macos::raw::stat;
+
+use iced::futures;  
+use iced::subscription::{self, Subscription};
+use serde::{de, Deserialize, Deserializer};
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+
+use async_tungstenite::tungstenite;
+use serde_json::Value;
+use crate::{Ticker, Timeframe};
+
+#[derive(Debug)]
+#[allow(clippy::large_enum_variant)]
+enum State {
+    Disconnected,
+    Connected(
+        async_tungstenite::WebSocketStream<
+            async_tungstenite::tokio::ConnectStream,
+        >,
+    ),
+}
+
+#[derive(Debug, Clone)]
+pub enum Event {
+    Connected(Connection),
+    Disconnected,
+    DepthReceived(i64, LocalDepthCache, Vec<Trade>),
+    KlineReceived(Kline, Timeframe),
+}
+
+#[derive(Debug, Clone)]
+pub struct Connection;
+
+#[derive(Debug, Deserialize, Clone)]
+pub struct FetchedDepth {
+    #[serde(rename = "b")]
+    pub bids: Vec<Order>,
+    #[serde(rename = "a")]
+    pub asks: Vec<Order>,
+}
+#[derive(Debug, Clone, Copy, Default)]
+pub struct Order {
+    pub price: f32,
+    pub qty: f32,
+}
+#[derive(Debug, Clone, Default)]
+pub struct LocalDepthCache {
+    pub time: i64,
+    pub bids: Box<[Order]>,
+    pub asks: Box<[Order]>,
+}
+#[derive(Debug, Deserialize, Clone, Default)]
+pub struct Depth {
+    #[serde(default)]
+    pub last_update_id: i64,
+    #[serde(rename = "T")]
+    pub time: i64,
+    #[serde(rename = "b")]
+    pub bids: Vec<Order>,
+    #[serde(rename = "a")]
+    pub asks: Vec<Order>,
+}
+
+use std::str::FromStr;
+impl<'de> Deserialize<'de> for Order {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let value: Vec<String> = Deserialize::deserialize(deserializer)?;
+        if value.len() != 2 {
+            return Err(serde::de::Error::custom("Expected an array of two strings"));
+        }
+
+        let price = f32::from_str(&value[0]).map_err(serde::de::Error::custom)?;
+        let qty = f32::from_str(&value[1]).map_err(serde::de::Error::custom)?;
+
+        Ok(Order { price, qty })
+    }
+}
+
+impl Depth {
+    pub fn new() -> Self {
+        Self {
+            last_update_id: 0,
+            time: 0,
+            bids: Vec::new(),
+            asks: Vec::new(),
+        }
+    }
+
+    pub fn fetched(&mut self, new_depth: Depth) {
+        self.last_update_id = new_depth.last_update_id;        
+        self.time = new_depth.time;
+
+        self.bids = new_depth.bids;
+        self.asks = new_depth.asks;
+    }
+
+    pub fn update_depth_cache(&mut self, new_bids: &[Order], new_asks: &[Order]) {
+        for order in new_bids {
+            if order.qty == 0.0 {
+                self.bids.retain(|x| x.price != order.price);
+            } else {
+                if let Some(existing_order) = self.bids.iter_mut().find(|x| x.price == order.price) {
+                    existing_order.qty = order.qty;
+                } else {
+                    self.bids.push(*order);
+                }
+            }
+        }
+        for order in new_asks {
+            if order.qty == 0.0 {
+                self.asks.retain(|x| x.price != order.price);
+            } else {
+                if let Some(existing_order) = self.asks.iter_mut().find(|x| x.price == order.price) {
+                    existing_order.qty = order.qty;
+                } else {
+                    self.asks.push(*order);
+                }
+            }
+        }
+    }
+
+    pub fn update_levels(&mut self, new_depth: Depth) -> (Box<[Order]>, Box<[Order]>) {
+        self.last_update_id = new_depth.last_update_id;
+        self.time = new_depth.time;
+
+        let mut best_ask_price = f32::MAX;
+        let mut best_bid_price = 0.0f32;
+
+        self.bids.iter().for_each(|order| {
+            if order.price > best_bid_price {
+                best_bid_price = order.price;
+            }
+        });
+        self.asks.iter().for_each(|order| {
+            if order.price < best_ask_price {
+                best_ask_price = order.price;
+            }
+        });
+
+        let highest: f32 = best_ask_price * 1.001;
+        let lowest: f32 = best_bid_price * 0.999;
+
+        self.update_depth_cache(&new_depth.bids, &new_depth.asks);
+
+        let mut local_bids: Vec<Order> = Vec::new();
+        let mut local_asks: Vec<Order> = Vec::new();
+
+        for order in &self.bids {
+            if order.price >= lowest {
+                local_bids.push(*order);
+            }
+        }
+        for order in &self.asks {
+            if order.price <= highest {
+                local_asks.push(*order);
+            }
+        }
+
+        // first sort by price
+        local_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap());
+        local_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap());
+
+        (local_bids.into_boxed_slice(), local_asks.into_boxed_slice())
+    }
+
+    pub fn get_fetch_id(&self) -> i64 {
+        self.last_update_id
+    }
+}
+
+pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
+    struct Connect;
+
+    subscription::channel(
+        std::any::TypeId::of::<Connect>(),
+        100,
+        move |mut output| async move {
+            let mut state: State = State::Disconnected;  
+
+            let mut trades_buffer: Vec<Trade> = Vec::new();    
+
+            let symbol_str = match selected_ticker {
+                Ticker::BTCUSDT => "BTCUSDT",
+                Ticker::ETHUSDT => "ETHUSDT",
+                Ticker::SOLUSDT => "SOLUSDT",
+                Ticker::LTCUSDT => "LTCUSDT",
+            };
+
+            let stream_1 = format!("publicTrade.{symbol_str}");
+            let stream_2 = format!("orderbook.200.{symbol_str}");
+
+            let mut orderbook: Depth = Depth::new();
+
+            let mut already_fetching: bool = false;
+
+            let mut prev_id: i64 = 0;
+
+            loop {
+                match &mut state {
+                    State::Disconnected => {        
+                        let websocket_server = format!("wss://stream.bybit.com/v5/public/linear");
+
+                        println!("Connecting to websocket server...\n");
+
+                        if let Ok((mut websocket, _)) = async_tungstenite::tokio::connect_async(
+                            websocket_server,
+                        )
+                        .await {
+                            let subscribe_message = serde_json::json!({
+                                "op": "subscribe",
+                                "args": [format!("publicTrade.{symbol_str}"), format!("orderbook.200.{symbol_str}")]
+                            }).to_string();
+    
+                            if let Err(e) = websocket.send(tungstenite::Message::Text(subscribe_message)).await {
+                                eprintln!("Failed subscribing: {}", e);
+
+                                let _ = output.send(Event::Disconnected).await;
+
+                                continue;
+                            } 
+
+                            state = State::Connected(websocket);
+                            
+                        } else {
+                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+
+                            let _ = output.send(Event::Disconnected).await;
+                        }
+                    }
+                    State::Connected(websocket) => {
+                        let mut fused_websocket = websocket.by_ref().fuse();
+
+                        futures::select! {
+                            received = fused_websocket.select_next_some() => {
+                                match received {
+                                    Ok(tungstenite::Message::Text(message)) => {
+                                        match serde_json::from_str::<Stream>(&message) {
+                                            Ok(stream) => {
+                                                if stream.topic == stream_1 {
+                                                    stream.data.as_array().unwrap().iter().for_each(|trade| {
+                                                        if let Ok(trade) = serde_json::from_value::<Trade>(trade.clone()) {
+                                                            trades_buffer.push(trade);
+                                                        } else {
+                                                            println!("Failed to deserialize trade: {:?}", trade);
+                                                        }
+                                                    });
+
+                                                } else if stream.topic == stream_2 {
+
+                                                    if stream.stream_type == "snapshot" {
+                                                        let bids = stream.data["b"].as_array().unwrap();
+                                                        let asks = stream.data["a"].as_array().unwrap();
+
+                                                        let fetched_depth = Depth {
+                                                            last_update_id: stream.time,
+                                                            time: stream.time,
+                                                            bids: bids.iter().map(|x| serde_json::from_value::<Order>(x.clone()).unwrap()).collect(),
+                                                            asks: asks.iter().map(|x| serde_json::from_value::<Order>(x.clone()).unwrap()).collect(),
+                                                        };
+
+                                                        orderbook.fetched(fetched_depth);
+
+                                                    } else if stream.stream_type == "delta" {
+                                                        let bids = stream.data["b"].as_array().unwrap();
+                                                        let asks = stream.data["a"].as_array().unwrap();
+
+                                                        let new_depth = Depth {
+                                                            last_update_id: stream.time,
+                                                            time: stream.time,
+                                                            bids: bids.iter().map(|x| serde_json::from_value::<Order>(x.clone()).unwrap()).collect(),
+                                                            asks: asks.iter().map(|x| serde_json::from_value::<Order>(x.clone()).unwrap()).collect(),
+                                                        };
+
+                                                        let (local_bids, local_asks) = orderbook.update_levels(new_depth);
+
+                                                        let _ = output.send(Event::DepthReceived(stream.time, LocalDepthCache {
+                                                            time: stream.time,
+                                                            bids: local_bids,
+                                                            asks: local_asks,
+                                                        }, std::mem::take(&mut trades_buffer))).await;
+                                                    }
+                                                }
+                                            },
+                                            Err(e) => println!("Failed to deserialize message: {}. Error: {}", message, e),
+                                        }
+                                    }
+                                    Err(_) => {
+                                        let _ = output.send(Event::Disconnected).await;
+                                        state = State::Disconnected;
+                                    }
+                                    Ok(_) => continue,
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        },
+    )
+}
+
+#[derive(Deserialize)]
+struct Stream {
+    topic: String,
+    #[serde(rename = "type")]
+    stream_type: String,
+    #[serde(rename = "ts")]
+    time: i64,
+    data: Value,
+}
+ 
+#[derive(Deserialize, Debug, Clone)]
+pub struct Trade {
+    #[serde(rename = "T")]
+    pub time: i64,
+    #[serde(rename = "S", deserialize_with = "deserialize_is_sell")]
+    pub is_sell: bool,
+    #[serde(with = "string_to_f32", rename = "p")]
+    pub price: f32,
+    #[serde(with = "string_to_f32", rename = "v")]
+    pub qty: f32,
+}
+fn deserialize_is_sell<'de, D>(deserializer: D) -> Result<bool, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let s: String = Deserialize::deserialize(deserializer)?;
+    match s.as_str() {
+        "Sell" => Ok(true),
+        "Buy" => Ok(false),
+        _ => Err(serde::de::Error::custom("Unexpected value for is_sell")),
+    }
+}
+mod string_to_f32 {
+    use serde::{self, Deserialize, Deserializer};
+
+    pub fn deserialize<'de, D>(deserializer: D) -> Result<f32, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let s: String = Deserialize::deserialize(deserializer)?;
+        s.parse::<f32>().map_err(serde::de::Error::custom)
+    }
+}
+
+pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event> {
+    struct Connect;
+
+    subscription::channel(
+        std::any::TypeId::of::<Connect>(),
+        100,
+        move |mut output| async move {
+            let mut state = State::Disconnected;    
+
+            let stream_str = vec.iter().map(|(ticker, timeframe)| {
+                let symbol_str = match ticker {
+                    Ticker::BTCUSDT => "BTCUSDT",
+                    Ticker::ETHUSDT => "ETHUSDT",
+                    Ticker::SOLUSDT => "SOLUSDT",
+                    Ticker::LTCUSDT => "LTCUSDT",
+                };
+                let timeframe_str = match timeframe {
+                    Timeframe::M1 => "1",
+                    Timeframe::M3 => "3",
+                    Timeframe::M5 => "5",
+                    Timeframe::M15 => "15",
+                    Timeframe::M30 => "30",
+                };
+                format!("kline.{timeframe_str}.{symbol_str}")
+            }).collect::<Vec<String>>();
+ 
+            loop {
+                match &mut state {
+                    State::Disconnected => {
+                        let websocket_server = format!("wss://stream.bybit.com/v5/public/linear");
+                        
+                        if let Ok((mut websocket, _)) = async_tungstenite::tokio::connect_async(
+                            websocket_server,
+                        )
+                        .await {
+                            let subscribe_message = serde_json::json!({
+                                "op": "subscribe",
+                                "args": stream_str 
+                            }).to_string();
+    
+                            if let Err(e) = websocket.send(tungstenite::Message::Text(subscribe_message)).await {
+                                eprintln!("Failed subscribing: {}", e);
+
+                                let _ = output.send(Event::Disconnected).await;
+
+                                continue;
+                            } 
+
+                            state = State::Connected(websocket);
+                            
+                        } else {
+                            tokio::time::sleep(tokio::time::Duration::from_secs(1))
+                           .await;
+                           let _ = output.send(Event::Disconnected).await;
+                        }
+                    }
+                    State::Connected(websocket) => {
+                        let mut fused_websocket = websocket.by_ref().fuse();
+                    
+                        futures::select! {
+                            received = fused_websocket.select_next_some() => {
+                                match received {
+                                    Ok(tungstenite::Message::Text(message)) => {
+                                        match serde_json::from_str::<serde_json::Value>(&message) {
+                                            Ok(data) => {
+                                                if let Some(data_array) = data["data"].as_array() {
+                                                    for kline_obj in data_array {
+                                                        let kline = Kline {
+                                                            time: kline_obj["start"].as_u64().unwrap_or_default(),
+                                                            open: kline_obj["open"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
+                                                            high: kline_obj["high"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
+                                                            low: kline_obj["low"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
+                                                            close: kline_obj["close"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
+                                                            volume: kline_obj["volume"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
+                                                        };
+
+                                                        let interval = kline_obj["interval"].as_str().unwrap_or_default();
+                     
+                                                        if let Some(timeframe) = string_to_timeframe(interval) {
+                                                            let _ = output.send(Event::KlineReceived(kline, timeframe)).await;
+                                                        } else {
+                                                            println!("Failed to find timeframe: {}, {:?}", interval, vec);
+                                                        }
+                                                    }
+                                                }
+                                            },
+                                            Err(_) => continue,
+                                        }
+                                    },
+                                    Err(_) => {
+                                        let _ = output.send(Event::Disconnected).await;
+                                        state = State::Disconnected;
+                                    },
+                                    Ok(_) => continue,
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        },
+    )
+}
+
+fn string_to_timeframe(interval: &str) -> Option<Timeframe> {
+    Timeframe::ALL.iter().find(|&tf| tf.to_string() == format!("{}m", interval)).copied()
+}
+
+#[derive(Deserialize, Debug, Clone, Copy)]
+pub struct Kline {
+    pub time: u64,
+    pub open: f32,
+    pub high: f32,
+    pub low: f32,
+    pub close: f32,
+    pub volume: f32,
+}
+
+#[derive(Deserialize, Debug)]
+struct ApiResponse {
+    #[serde(rename = "retCode")]
+    ret_code: u32,
+    #[serde(rename = "retMsg")]
+    ret_msg: String,
+    result: ApiResult,
+}
+
+#[derive(Deserialize, Debug)]
+struct ApiResult {
+    symbol: String,
+    category: String,
+    list: Vec<Vec<Value>>,
+}
+
+pub async fn fetch_klines(ticker: Ticker, timeframe: Timeframe) -> Result<Vec<Kline>, reqwest::Error> {
+    let symbol_str = match ticker {
+        Ticker::BTCUSDT => "BTCUSDT",
+        Ticker::ETHUSDT => "ETHUSDT",
+        Ticker::SOLUSDT => "SOLUSDT",
+        Ticker::LTCUSDT => "LTCUSDT",
+    };
+    let timeframe_str = match timeframe {
+        Timeframe::M1 => "1",
+        Timeframe::M3 => "3",
+        Timeframe::M5 => "5",
+        Timeframe::M15 => "15",
+        Timeframe::M30 => "30",
+    };
+
+    let url = format!("https://api.bybit.com/v5/market/kline?category=linear&symbol={symbol_str}&interval={timeframe_str}&limit=250");
+
+    let response: reqwest::Response = reqwest::get(&url).await?;
+    let text: String = response.text().await?;
+
+    let api_response: ApiResponse = serde_json::from_str(&text).unwrap();
+    
+    let klines: Vec<Kline> = api_response.result.list.iter().map(|kline| {
+        Kline {
+            time: kline[0].as_str().unwrap().parse::<u64>().unwrap(),
+            open: kline[1].as_str().unwrap().parse::<f32>().unwrap(),
+            high: kline[2].as_str().unwrap().parse::<f32>().unwrap(),
+            low: kline[3].as_str().unwrap().parse::<f32>().unwrap(),
+            close: kline[4].as_str().unwrap().parse::<f32>().unwrap(),
+            volume: kline[5].as_str().unwrap().parse::<f32>().unwrap(),
+        }
+    }).collect();
+
+    Ok(klines)
+}
+
+use anyhow::{Result, Context};
+
+pub async fn fetch_ticksize(ticker: Ticker) -> Result<f32> {
+    let symbol_str = match ticker {
+        Ticker::BTCUSDT => "BTCUSDT",
+        Ticker::ETHUSDT => "ETHUSDT",
+        Ticker::SOLUSDT => "SOLUSDT",
+        Ticker::LTCUSDT => "LTCUSDT",
+    };
+
+    let url = format!("https://api.bybit.com/v5/market/instruments-info?category=linear&symbol={}", symbol_str);
+
+    let response: reqwest::Response = reqwest::get(&url).await.context("Failed to send request")?;
+    let text: String = response.text().await.context("Failed to read response text")?;
+    let exchange_info: Value = serde_json::from_str(&text).context("Failed to parse JSON")?;
+
+    let result_list: &Vec<Value> = exchange_info["result"]["list"].as_array().context("Result list is not an array")?;
+
+    for item in result_list {
+        if item["symbol"] == symbol_str {
+            if let Some(price_filter) = item["priceFilter"].as_object() {
+                if let Some(tick_size_str) = price_filter.get("tickSize") {
+                    if let Ok(tick_size) = tick_size_str.as_str().unwrap().parse::<f32>() {
+
+                        println!("Tick size for {} is {}", symbol_str, tick_size);
+                        return Ok(tick_size);
+                    }
+                }
+            }
+        }
+    }
+
+    anyhow::bail!("Tick size not found for symbol {}", symbol_str)
+}

+ 344 - 104
src/main.rs

@@ -1,11 +1,13 @@
 #![windows_subsystem = "windows"]
 
 mod data_providers;
-use data_providers::binance::{user_data, market_data};
+use data_providers::binance::market_data;
+use data_providers::{binance, bybit};
 mod charts;
 use charts::custom_line::{self, CustomLine};
 use charts::heatmap::{self, Heatmap};
 use charts::footprint::{self, Footprint};
+use iced::event;
 
 use std::vec;
 use chrono::{NaiveDateTime, DateTime, Utc};
@@ -21,6 +23,28 @@ use iced::widget::{
 };
 use futures::TryFutureExt;
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum Exchange {
+    BinanceFutures,
+    BybitLinear,
+}
+
+impl std::fmt::Display for Exchange {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{}",
+            match self {
+                Exchange::BinanceFutures => "Binance Futures",
+                Exchange::BybitLinear => "Bybit Linear",
+            }
+        )
+    }
+}
+impl Exchange {
+    const ALL: [Exchange; 2] = [Exchange::BinanceFutures, Exchange::BybitLinear];
+}
+
 impl std::fmt::Display for Ticker {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
@@ -105,18 +129,29 @@ impl From<Icon> for char {
 }
 
 #[derive(Debug)]
-enum WsState {
-    Connected(market_data::Connection),
+enum BinanceWsState {
+    Connected(binance::market_data::Connection),
+    Disconnected,
+}
+impl Default for BinanceWsState {
+    fn default() -> Self {
+        Self::Disconnected
+    }
+}
+
+#[derive(Debug)]
+enum BybitWsState {
+    Connected(bybit::market_data::Connection),
     Disconnected,
 }
-impl Default for WsState {
+impl Default for BybitWsState {
     fn default() -> Self {
         Self::Disconnected
     }
 }
 
 enum UserWsState {
-    Connected(user_data::Connection),
+    Connected(binance::user_data::Connection),
     Disconnected,
 }
 impl Default for UserWsState {
@@ -173,6 +208,12 @@ impl Default for State {
     }
 }
 
+#[derive(Debug, Clone)]
+pub enum MarketEvents {
+    Binance(binance::market_data::Event),
+    Bybit(bybit::market_data::Event),
+}
+
 #[derive(Debug, Clone)]
 pub enum Message {
     Debug(String),
@@ -187,10 +228,10 @@ pub enum Message {
     UserKeyError,
     TickerSelected(Ticker),
     TimeframeSelected(Timeframe, pane_grid::Pane),
-    ExchangeSelected(&'static str),
-    MarketWsEvent(market_data::Event),
+    ExchangeSelected(Exchange),
+    MarketWsEvent(MarketEvents),
     WsToggle,
-    FetchEvent(Result<Vec<market_data::Kline>, std::string::String>, PaneId, Timeframe),
+    FetchEvent(Result<Vec<binance::market_data::Kline>, std::string::String>, PaneId, Timeframe),
     
     // Pane grid
     Split(pane_grid::Axis, pane_grid::Pane, PaneId),
@@ -217,6 +258,8 @@ pub enum Message {
 
     TicksizeSelected(u16),
     SetMinTickSize(f32),
+    
+    ErrorOccurred(String),
 }
 
 struct State {
@@ -231,8 +274,11 @@ struct State {
     // data streams
     listen_key: Option<String>,
     selected_ticker: Option<Ticker>,
-    selected_exchange: Option<&'static str>,
-    ws_state: WsState,
+    selected_exchange: Option<Exchange>,
+
+    binance_ws_state: BinanceWsState,
+    bybit_ws_state: BybitWsState,
+
     user_ws_state: UserWsState,
     ws_running: bool,
 
@@ -324,8 +370,9 @@ impl State {
 
             listen_key: None,
             selected_ticker: None,
-            selected_exchange: Some("Binance Futures"),
-            ws_state: WsState::Disconnected,
+            selected_exchange: Some(Exchange::BinanceFutures),
+            binance_ws_state: BinanceWsState::Disconnected,
+            bybit_ws_state: BybitWsState::Disconnected,
             user_ws_state: UserWsState::Disconnected,
             ws_running: false,
             panes,
@@ -412,24 +459,60 @@ impl State {
 
                 self.kline_stream = false;
                 
-                let mut Tasks = vec![];
-                let mut dropped_streams = vec![];
+                let mut tasks = vec![];
 
                 if let Some(pane) = self.panes.panes.get_mut(&pane) {
                     let pane_id = pane.id;
 
                     pane.stream.1 = Some(timeframe);
-                    
-                    let fetch_klines = Task::perform(
-                    market_data::fetch_klines(*selected_ticker, timeframe)
-                        .map_err(|err| format!("{err}")), 
-                    move |klines| {
-                        Message::FetchEvent(klines, pane_id, timeframe)
-                    });
-
-                    dropped_streams.push(pane.id);
-                    
-                    Tasks.push(fetch_klines);                                  
+
+                    match self.selected_exchange {
+                        Some(Exchange::BinanceFutures) => {
+                            let fetch_klines = Task::perform(
+                                market_data::fetch_klines(*selected_ticker, timeframe)
+                                    .map_err(|err| format!("{err}")), 
+                                move |klines| {
+                                    Message::FetchEvent(klines, pane_id, timeframe)
+                                }
+                            );
+                            
+                            tasks.push(fetch_klines);
+                        },
+                        Some(Exchange::BybitLinear) => {
+                            let fetch_klines: Task<Message> = Task::perform(
+                                bybit::market_data::fetch_klines(self.selected_ticker.unwrap_or(Ticker::BTCUSDT), timeframe)
+                                    .map_err(|err| format!("{err}")), 
+                                move |klines: Result<Vec<bybit::market_data::Kline>, String>| {
+
+                                    match klines {
+                                        Ok(klines) => {
+                                            let binance_klines: Vec<market_data::Kline> = klines.iter().map(|kline| {
+                                                market_data::Kline {
+                                                    time: kline.time,
+                                                    open: kline.open,
+                                                    high: kline.high,
+                                                    low: kline.low,
+                                                    close: kline.close,
+                                                    volume: kline.volume,
+                                                    taker_buy_base_asset_volume: -1.0,
+                                                }
+                                            }).collect();
+
+                                            Message::FetchEvent(Ok(binance_klines), pane_id, timeframe)
+                                        },
+                                        Err(err) => {
+                                            Message::Debug(err)
+                                        }
+                                    }
+                                }
+                            );
+                            
+                            tasks.push(fetch_klines);
+                        },
+                        None => {
+                            eprintln!("No exchange selected");
+                        }
+                    }                               
                 };
         
                 // sleep to drop existent stream and create new one
@@ -439,9 +522,9 @@ impl State {
                     },
                     move |()| Message::CutTheKlineStream
                 );
-                Tasks.push(remove_active_stream);
+                tasks.push(remove_active_stream);
 
-                Task::batch(Tasks)
+                Task::batch(tasks)
             },
             Message::ExchangeSelected(exchange) => {
                 self.selected_exchange = Some(exchange);
@@ -462,36 +545,74 @@ impl State {
                         }
                         if pane_state.id == PaneId::FootprintChart {
                             let fetch_ticksize: Task<Message> = Task::perform(
-                                market_data::fetch_ticksize(self.selected_ticker.unwrap_or(Ticker::BTCUSDT))
-                                    .map_err(|err| format!("{err}")), 
-                                move |ticksize: Result<f32, String>| {
-                                    Message::SetMinTickSize(ticksize.unwrap_or(1.0))
+                                bybit::market_data::fetch_ticksize(self.selected_ticker.unwrap_or(Ticker::BTCUSDT)),
+                                move |result| match result {
+                                    Ok(ticksize) => Message::SetMinTickSize(ticksize),
+                                    Err(err) => {
+                                        Message::ErrorOccurred(err.to_string())
+                                    }
                                 }
                             );
                             tasks.push(fetch_ticksize);
                         }
 
-                        let selected_timeframe: Timeframe = match pane_state.stream.1 {
-                            Some(timeframe) => timeframe,
-                            None => Timeframe::M1,
-                        };
-
-                        let pane_id: PaneId = pane_state.id;
-
-                        let fetch_klines: Task<Message> = Task::perform(
-                            market_data::fetch_klines(self.selected_ticker.unwrap_or(Ticker::BTCUSDT), selected_timeframe)
-                                .map_err(|err| format!("{err}")), 
-                            move |klines: Result<Vec<market_data::Kline>, String>| {
-                                Message::FetchEvent(klines, pane_id, selected_timeframe)
+                        if let Some(selected_timeframe) = pane_state.stream.1 {
+
+                            let pane_id: PaneId = pane_state.id;
+
+                            match self.selected_exchange {
+                                Some(Exchange::BinanceFutures) => {
+                                    let fetch_klines: Task<Message> = Task::perform(
+                                        market_data::fetch_klines(self.selected_ticker.unwrap_or(Ticker::BTCUSDT), selected_timeframe)
+                                            .map_err(|err| format!("{err}")), 
+                                        move |klines: Result<Vec<market_data::Kline>, String>| {
+                                            Message::FetchEvent(klines, pane_id, selected_timeframe)
+                                        }
+                                    );
+                                    tasks.push(fetch_klines);
+                                },
+                                Some(Exchange::BybitLinear) => {
+                                    let fetch_klines: Task<Message> = Task::perform(
+                                        bybit::market_data::fetch_klines(self.selected_ticker.unwrap_or(Ticker::BTCUSDT), selected_timeframe)
+                                            .map_err(|err| format!("{err}")), 
+                                        move |klines: Result<Vec<bybit::market_data::Kline>, String>| {
+
+                                            match klines {
+                                                Ok(klines) => {
+                                                    let binance_klines: Vec<market_data::Kline> = klines.iter().map(|kline| {
+                                                        market_data::Kline {
+                                                            time: kline.time,
+                                                            open: kline.open,
+                                                            high: kline.high,
+                                                            low: kline.low,
+                                                            close: kline.close,
+                                                            volume: kline.volume,
+                                                            taker_buy_base_asset_volume: -1.0,
+                                                        }
+                                                    }).collect();
+
+                                                    Message::FetchEvent(Ok(binance_klines), pane_id, selected_timeframe)
+                                                },
+                                                Err(err) => {
+                                                    Message::Debug(err)
+                                                }
+                                            }
+                                        }
+                                    );
+                                    tasks.push(fetch_klines);
+                                },
+                                None => {
+                                    eprintln!("No exchange selected");
+                                }
                             }
-                        );
-                        tasks.push(fetch_klines);
+                        }
                     };
                     
                     Task::batch(tasks)
 
                 } else {
-                    self.ws_state = WsState::Disconnected;
+                    self.binance_ws_state = BinanceWsState::Disconnected;
+                    self.bybit_ws_state = BybitWsState::Disconnected;
 
                     self.heatmap_chart = None;
                     self.candlestick_chart = None;
@@ -514,7 +635,7 @@ impl State {
                             },
                             PaneId::FootprintChart => {
                                 if let Some(heatmap_chart) = &mut self.heatmap_chart {
-                                    let copied_trades = heatmap_chart.get_raw_trades();
+                                    let copied_trades: Vec<Trade> = heatmap_chart.get_raw_trades();
 
                                     let mut klines_raw: Vec<(i64, f32, f32, f32, f32, f32, f32)> = vec![];
                                     for kline in &klines {
@@ -524,10 +645,6 @@ impl State {
                                         klines_raw.push((kline.time as i64, kline.open, kline.high, kline.low, kline.close, buy_volume, sell_volume));
                                     }
 
-                                    // get the latest 20 klines
-                                    let copied_klines: Vec<(i64, f32, f32, f32, f32, f32, f32)> = 
-                                        klines_raw.iter().rev().take(20).rev().copied().collect::<Vec<(i64, f32, f32, f32, f32, f32, f32)>>();
-
                                     let timeframe_u16: u16 = match timeframe {
                                         Timeframe::M1 => 1,
                                         Timeframe::M3 => 3,
@@ -538,7 +655,7 @@ impl State {
 
                                     let tick_size = self.tick_multiply as f32 * self.min_tick_size.unwrap_or_default();
 
-                                    self.footprint_chart = Some(Footprint::new(timeframe_u16, tick_size, copied_klines, copied_trades));
+                                    self.footprint_chart = Some(Footprint::new(timeframe_u16, tick_size, klines_raw, copied_trades));
                                 }
                             },
                             _ => {}
@@ -553,47 +670,132 @@ impl State {
             },
             Message::MarketWsEvent(event) => {
                 match event {
-                    market_data::Event::Connected(connection) => {
-                        self.ws_state = WsState::Connected(connection);
-                    }
-                    market_data::Event::Disconnected => {
-                        self.ws_state = WsState::Disconnected;
-                    }
-                    market_data::Event::DepthReceived(depth_update, depth, trades_buffer) => {
-                        if let Some(time_and_sales) = &mut self.time_and_sales {
-                            time_and_sales.update(&trades_buffer);
-                        } 
-
-                        let trades_buffer_clone = trades_buffer.clone();
-
-                        if let Some(chart) = &mut self.heatmap_chart {
-                            chart.insert_datapoint(trades_buffer, depth_update, depth);
-                        } 
-                        if let Some(chart) = &mut self.footprint_chart {
-                            chart.insert_datapoint(trades_buffer_clone, depth_update);
+                    MarketEvents::Binance(event) => match event {
+                        binance::market_data::Event::Connected(connection) => {
+                            self.binance_ws_state = BinanceWsState::Connected(connection);
                         }
-                    }
-                    market_data::Event::KlineReceived(kline, timeframe) => {
-                        for (pane, pane_state) in self.panes.iter() {
-                            if let Some(selected_timeframe) = pane_state.stream.1 {
-                                if selected_timeframe == timeframe {
-                                    match pane_state.id {
-                                        PaneId::CandlestickChart => {
-                                            if let Some(candlestick_chart) = &mut self.candlestick_chart {
-                                                candlestick_chart.insert_datapoint(&kline);
-                                            }
-                                        },
-                                        PaneId::CustomChart => {
-                                            if let Some(custom_line) = &mut self.custom_line {
-                                                custom_line.insert_datapoint(&kline);
-                                            }
-                                        },
-                                        PaneId::FootprintChart => {
-                                            if let Some(footprint_chart) = &mut self.footprint_chart {
-                                                footprint_chart.update_latest_kline(&kline);
-                                            }
-                                        },
-                                        _ => {}
+                        binance::market_data::Event::Disconnected => {
+                            self.binance_ws_state = BinanceWsState::Disconnected;
+                        }
+                        binance::market_data::Event::DepthReceived(depth_update, depth, trades_buffer) => {
+                            if let Some(time_and_sales) = &mut self.time_and_sales {
+                                time_and_sales.update(&trades_buffer);
+                            } 
+
+                            let trades_buffer_clone = trades_buffer.clone();
+
+                            if let Some(chart) = &mut self.heatmap_chart {
+                                chart.insert_datapoint(trades_buffer, depth_update, depth);
+                            } 
+                            if let Some(chart) = &mut self.footprint_chart {
+                                chart.insert_datapoint(trades_buffer_clone, depth_update);
+                            }
+                        }
+                        binance::market_data::Event::KlineReceived(kline, timeframe) => {
+                            for (_, pane_state) in self.panes.iter() {
+                                if let Some(selected_timeframe) = pane_state.stream.1 {
+                                    if selected_timeframe == timeframe {
+                                        match pane_state.id {
+                                            PaneId::CandlestickChart => {
+                                                if let Some(candlestick_chart) = &mut self.candlestick_chart {
+                                                    candlestick_chart.insert_datapoint(&kline);
+                                                }
+                                            },
+                                            PaneId::CustomChart => {
+                                                if let Some(custom_line) = &mut self.custom_line {
+                                                    custom_line.insert_datapoint(&kline);
+                                                }
+                                            },
+                                            PaneId::FootprintChart => {
+                                                if let Some(footprint_chart) = &mut self.footprint_chart {
+                                                    footprint_chart.update_latest_kline(&kline);
+                                                }
+                                            },
+                                            _ => {}
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    },
+
+                    MarketEvents::Bybit(event) => match event {
+                        bybit::market_data::Event::Connected(connection) => {
+                            self.bybit_ws_state = BybitWsState::Connected(connection);
+
+                            println!("Bybit connected");
+                        }
+                        bybit::market_data::Event::Disconnected => {
+                            self.bybit_ws_state = BybitWsState::Disconnected;
+
+                            println!("Bybit disconnected");
+                        }
+                        bybit::market_data::Event::DepthReceived(depth_update, depth, trades_buffer) => {
+
+                            // convert bybit trade to binance trade
+                            let mut binance_trades: Vec<binance::market_data::Trade> = vec![];
+                            
+                            for trade in trades_buffer.iter() {
+                                let binance_trade = binance::market_data::Trade {
+                                    price: trade.price,
+                                    qty: trade.qty,
+                                    time: trade.time,
+                                    is_sell: trade.is_sell,
+                                };
+                                binance_trades.push(binance_trade);
+                            }
+
+                            let local_depth = binance::market_data::LocalDepthCache {
+                                time: depth.time,
+                                bids: depth.bids.iter().map(|order| binance::market_data::Order { price: order.price, qty: order.qty }).collect(),
+                                asks: depth.asks.iter().map(|order| binance::market_data::Order { price: order.price, qty: order.qty }).collect(),
+                            };
+
+                            let trades_clone = binance_trades.clone();
+
+                            if let Some(time_and_sales) = &mut self.time_and_sales {
+                                time_and_sales.update(&binance_trades);
+                            } 
+
+                            if let Some(chart) = &mut self.heatmap_chart {
+                                chart.insert_datapoint(binance_trades, depth_update, local_depth);
+                            } 
+                            if let Some(chart) = &mut self.footprint_chart {
+                                chart.insert_datapoint(trades_clone, depth_update);
+                            }
+                        }
+                        bybit::market_data::Event::KlineReceived(kline, timeframe) => {
+                            for (_, pane_state) in self.panes.iter() {
+                                if let Some(selected_timeframe) = pane_state.stream.1 {
+                                    if selected_timeframe == timeframe {
+                                        let binance_kline = binance::market_data::Kline {
+                                            time: kline.time,
+                                            open: kline.open,
+                                            high: kline.high,
+                                            low: kline.low,
+                                            close: kline.close,
+                                            volume: kline.volume,
+                                            taker_buy_base_asset_volume: -1.0,
+                                        };
+
+                                        match pane_state.id {
+                                            PaneId::CandlestickChart => {
+                                                if let Some(candlestick_chart) = &mut self.candlestick_chart {
+                                                    candlestick_chart.insert_datapoint(&binance_kline);
+                                                }
+                                            },
+                                            PaneId::CustomChart => {
+                                                if let Some(custom_line) = &mut self.custom_line {
+                                                    custom_line.insert_datapoint(&binance_kline);
+                                                }
+                                            },
+                                            PaneId::FootprintChart => {
+                                                if let Some(footprint_chart) = &mut self.footprint_chart {
+                                                    footprint_chart.update_latest_kline(&binance_kline);
+                                                }
+                                            },
+                                            _ => {}
+                                        }
                                     }
                                 }
                             }
@@ -731,6 +933,11 @@ impl State {
                 self.show_layout_modal = false;
                 Task::none()
             },
+
+            Message::ErrorOccurred(err) => {
+                eprintln!("{err}");
+                Task::none()
+            },
         }
     }
 
@@ -852,7 +1059,7 @@ impl State {
             ).placeholder("Choose a ticker...");
             
             let exchange_selector = pick_list(
-                &["Binance Futures"][..],
+                &Exchange::ALL[..],
                 self.selected_exchange,
                 Message::ExchangeSelected,
             ).placeholder("Choose an exchange...");
@@ -932,21 +1139,54 @@ impl State {
 
         if self.ws_running {
             if let Some(ticker) = &self.selected_ticker {
-                let binance_market_stream = market_data::connect_market_stream(*ticker).map(Message::MarketWsEvent);
-                subscriptions.push(binance_market_stream);
+                match self.selected_exchange {
+                    Some(Exchange::BinanceFutures) => {
+                        let binance_market_stream: Subscription<Message> = binance::market_data::connect_market_stream(*ticker)
+                            .map(|arg0: binance::market_data::Event| Message::MarketWsEvent(MarketEvents::Binance(arg0)));
 
-                let mut streams: Vec<(Ticker, Timeframe)> = vec![];
+                        subscriptions.push(binance_market_stream);
 
-                for (_, pane_state) in self.panes.iter() {
-                    let ticker = pane_state.stream.0.unwrap_or(Ticker::BTCUSDT);
-                    let timeframe = pane_state.stream.1.unwrap_or(Timeframe::M1);
+                        let mut streams: Vec<(Ticker, Timeframe)> = vec![];
 
-                    streams.push((ticker, timeframe));
-                }
+                        for (_, pane_state) in self.panes.iter() {
+                            if let (Some(ticker), Some(timeframe)) = (pane_state.stream.0, pane_state.stream.1) {
+                                streams.push((ticker, timeframe));
+                            }
+                        }
+
+                        if !streams.is_empty() && self.kline_stream {
+                            let binance_kline_streams: Subscription<Message> = binance::market_data::connect_kline_stream(streams)
+                                .map(|arg0: binance::market_data::Event| Message::MarketWsEvent(MarketEvents::Binance(arg0)));
+
+                            subscriptions.push(binance_kline_streams);
+                        }
+                    },
+
+                    Some(Exchange::BybitLinear) => {
+                        let bybit_market_stream: Subscription<Message> = bybit::market_data::connect_market_stream(*ticker)
+                            .map(|arg0: bybit::market_data::Event| Message::MarketWsEvent(MarketEvents::Bybit(arg0)));
+
+                        subscriptions.push(bybit_market_stream);
+
+                        let mut streams: Vec<(Ticker, Timeframe)> = vec![];
+
+                        for (_, pane_state) in self.panes.iter() {
+                            if let (Some(ticker), Some(timeframe)) = (pane_state.stream.0, pane_state.stream.1) {
+                                streams.push((ticker, timeframe));
+                            }
+                        }
+
+                        if !streams.is_empty() && self.kline_stream {
+                            let bybit_kline_streams: Subscription<Message> = bybit::market_data::connect_kline_stream(streams)
+                                .map(|arg0: bybit::market_data::Event| Message::MarketWsEvent(MarketEvents::Bybit(arg0)));
+
+                            subscriptions.push(bybit_kline_streams);
+                        }
+                    },
 
-                if !streams.is_empty() && self.kline_stream {
-                    let binance_kline_streams = market_data::connect_kline_stream(streams).map(Message::MarketWsEvent);
-                    subscriptions.push(binance_kline_streams);
+                    None => {
+                        println!("No exchange selected");
+                    },
                 }
             }
         }