Explorar el Código

simpler websocket & parser implementation (#9)

* initial impl. for binance market feed

* new crates for websocket and parsing/de

* code cleanup + better control flow

* conciser deserialization process

* fix reopened panes losing their "states" by keeping the state cache

* improved axis direction pick logic while splitting panes

* fix: pane losing its stream on re-open event

* new ws and parser implementations for bybit connection

* code cleanup + better error handling

* build flag needed by sonic-rs for SIMD

* add .cargo for build flags
Berke hace 1 año
padre
commit
5aa31fbefe
Se han modificado 7 ficheros con 1124 adiciones y 504 borrados
  1. 2 0
      .cargo/config.toml
  2. 1 2
      .gitignore
  3. 189 12
      Cargo.lock
  4. 9 1
      Cargo.toml
  5. 419 216
      src/data_providers/binance/market_data.rs
  6. 438 260
      src/data_providers/bybit/market_data.rs
  7. 66 13
      src/main.rs

+ 2 - 0
.cargo/config.toml

@@ -0,0 +1,2 @@
+[build]
+rustflags = ["-C", "target-cpu=native"]

+ 1 - 2
.gitignore

@@ -1,4 +1,3 @@
 .DS_Store
 /target
-/.vscode
-/.cargo
+/.vscode

+ 189 - 12
Cargo.lock

@@ -332,7 +332,7 @@ dependencies = [
  "tokio",
  "tokio-rustls 0.25.0",
  "tungstenite",
- "webpki-roots",
+ "webpki-roots 0.26.3",
 ]
 
 [[package]]
@@ -362,6 +362,12 @@ dependencies = [
  "rustc-demangle",
 ]
 
+[[package]]
+name = "base64"
+version = "0.21.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
+
 [[package]]
 name = "base64"
 version = "0.22.1"
@@ -1156,6 +1162,37 @@ version = "2.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
 
+[[package]]
+name = "faststr"
+version = "0.2.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f375fcf41ec4dac873a8028fba4210dbda5c86bba13d2d741e651b474f7c05a4"
+dependencies = [
+ "bytes",
+ "serde",
+ "simdutf8",
+]
+
+[[package]]
+name = "fastwebsockets"
+version = "0.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "93da8b19e29f202ef35ddd20ddea8c86166850fce5ba2a3c3f3e3174cdbb0620"
+dependencies = [
+ "base64 0.21.7",
+ "bytes",
+ "http-body-util",
+ "hyper",
+ "hyper-util",
+ "pin-project",
+ "rand",
+ "sha1",
+ "simdutf8",
+ "thiserror",
+ "tokio",
+ "utf-8",
+]
+
 [[package]]
 name = "fdeflate"
 version = "0.3.4"
@@ -1193,7 +1230,7 @@ version = "0.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181"
 dependencies = [
- "spin",
+ "spin 0.9.8",
 ]
 
 [[package]]
@@ -1747,6 +1784,12 @@ version = "1.9.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0fcc0b4a115bf80b728eb8ea024ad5bd707b615bfed49e0665b6e0f86fd082d9"
 
+[[package]]
+name = "httpdate"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
+
 [[package]]
 name = "hyper"
 version = "1.3.1"
@@ -1760,6 +1803,7 @@ dependencies = [
  "http",
  "http-body",
  "httparse",
+ "httpdate",
  "itoa",
  "pin-project-lite",
  "smallvec",
@@ -1877,12 +1921,17 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "async-tungstenite",
- "base64",
+ "base64 0.22.1",
+ "bytes",
  "chrono",
+ "fastwebsockets",
  "futures",
  "futures-util",
  "hex",
  "hmac",
+ "http-body-util",
+ "hyper",
+ "hyper-util",
  "iced 0.13.0-dev",
  "iced_aw",
  "iced_futures 0.12.0",
@@ -1896,11 +1945,14 @@ dependencies = [
  "serde",
  "serde_json",
  "sha2",
+ "sonic-rs",
  "tokio",
  "tokio-native-tls",
+ "tokio-rustls 0.24.1",
  "tokio-tungstenite",
  "tungstenite",
  "url",
+ "webpki-roots 0.23.1",
 ]
 
 [[package]]
@@ -3183,6 +3235,16 @@ dependencies = [
  "ttf-parser 0.21.1",
 ]
 
+[[package]]
+name = "page_size"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "30d5b2194ed13191c1999ae0704b7839fb18384fa22e49b57eeaa97d79ce40da"
+dependencies = [
+ "libc",
+ "winapi",
+]
+
 [[package]]
 name = "palette"
 version = "0.7.6"
@@ -3689,7 +3751,7 @@ version = "0.12.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c7d6d2a27d57148378eb5e111173f4276ad26340ecc5c49a4a2152167a2d6a37"
 dependencies = [
- "base64",
+ "base64 0.22.1",
  "bytes",
  "encoding_rs",
  "futures-core",
@@ -3726,6 +3788,21 @@ dependencies = [
  "winreg 0.52.0",
 ]
 
+[[package]]
+name = "ring"
+version = "0.16.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
+dependencies = [
+ "cc",
+ "libc",
+ "once_cell",
+ "spin 0.5.2",
+ "untrusted 0.7.1",
+ "web-sys",
+ "winapi",
+]
+
 [[package]]
 name = "ring"
 version = "0.17.8"
@@ -3736,8 +3813,8 @@ dependencies = [
  "cfg-if",
  "getrandom",
  "libc",
- "spin",
- "untrusted",
+ "spin 0.9.8",
+ "untrusted 0.9.0",
  "windows-sys 0.52.0",
 ]
 
@@ -3791,6 +3868,18 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "rustls"
+version = "0.21.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e"
+dependencies = [
+ "log",
+ "ring 0.17.8",
+ "rustls-webpki 0.101.7",
+ "sct",
+]
+
 [[package]]
 name = "rustls"
 version = "0.22.4"
@@ -3798,9 +3887,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432"
 dependencies = [
  "log",
- "ring",
+ "ring 0.17.8",
  "rustls-pki-types",
- "rustls-webpki",
+ "rustls-webpki 0.102.4",
  "subtle",
  "zeroize",
 ]
@@ -3813,7 +3902,7 @@ checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402"
 dependencies = [
  "once_cell",
  "rustls-pki-types",
- "rustls-webpki",
+ "rustls-webpki 0.102.4",
  "subtle",
  "zeroize",
 ]
@@ -3824,7 +3913,7 @@ version = "2.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d"
 dependencies = [
- "base64",
+ "base64 0.22.1",
  "rustls-pki-types",
 ]
 
@@ -3834,15 +3923,35 @@ version = "1.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d"
 
+[[package]]
+name = "rustls-webpki"
+version = "0.100.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5f6a5fc258f1c1276dfe3016516945546e2d5383911efc0fc4f1cdc5df3a4ae3"
+dependencies = [
+ "ring 0.16.20",
+ "untrusted 0.7.1",
+]
+
+[[package]]
+name = "rustls-webpki"
+version = "0.101.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
+dependencies = [
+ "ring 0.17.8",
+ "untrusted 0.9.0",
+]
+
 [[package]]
 name = "rustls-webpki"
 version = "0.102.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e"
 dependencies = [
- "ring",
+ "ring 0.17.8",
  "rustls-pki-types",
- "untrusted",
+ "untrusted 0.9.0",
 ]
 
 [[package]]
@@ -3898,6 +4007,16 @@ version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
 
+[[package]]
+name = "sct"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
+dependencies = [
+ "ring 0.17.8",
+ "untrusted 0.9.0",
+]
+
 [[package]]
 name = "sctk-adwaita"
 version = "0.8.1"
@@ -4050,6 +4169,12 @@ version = "0.3.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe"
 
+[[package]]
+name = "simdutf8"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a"
+
 [[package]]
 name = "siphasher"
 version = "0.3.11"
@@ -4178,6 +4303,33 @@ dependencies = [
  "x11rb",
 ]
 
+[[package]]
+name = "sonic-rs"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7d16b7974db39ac84e6fb1b2aab10d9ef883e5eca1ed2a1bc71d6869e2c4675f"
+dependencies = [
+ "arrayref",
+ "bumpalo",
+ "bytes",
+ "cfg-if",
+ "faststr",
+ "itoa",
+ "page_size",
+ "parking_lot 0.12.3",
+ "ryu",
+ "serde",
+ "simdutf8",
+ "smallvec",
+ "thiserror",
+]
+
+[[package]]
+name = "spin"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
+
 [[package]]
 name = "spin"
 version = "0.9.8"
@@ -4456,6 +4608,16 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "tokio-rustls"
+version = "0.24.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
+dependencies = [
+ "rustls 0.21.12",
+ "tokio",
+]
+
 [[package]]
 name = "tokio-rustls"
 version = "0.25.0"
@@ -4709,6 +4871,12 @@ version = "0.2.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
 
+[[package]]
+name = "untrusted"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
+
 [[package]]
 name = "untrusted"
 version = "0.9.0"
@@ -4989,6 +5157,15 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "webpki-roots"
+version = "0.23.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338"
+dependencies = [
+ "rustls-webpki 0.100.3",
+]
+
 [[package]]
 name = "webpki-roots"
 version = "0.26.3"

+ 9 - 1
Cargo.toml

@@ -12,7 +12,7 @@ plotters="0.3"
 chrono = "0.4.37"
 plotters-backend = "0.3.5"
 rand = "0.8.5"
-tokio = { version = "1.37.0", features = ["full"] }
+tokio = { version = "1.37.0", features = ["full", "macros"] }
 tokio-tungstenite = "0.21.0"
 url = "2.5.0"
 tokio-native-tls = "0.3.1"
@@ -31,6 +31,14 @@ iced_table = "0.12.0"
 iced_futures = "0.12.0"
 iced_aw = { version = "0.8.0", features = ["quad", "menu"] }
 anyhow = "1.0.86"
+bytes = "1.6.0"
+sonic-rs = "0.3.7"
+fastwebsockets = { version = "0.7.2", features = ["upgrade"] }
+http-body-util = "0.1.2"
+hyper = { version = "1", features = ["http1", "server", "client"] }
+hyper-util = { version = "0.1.0", features = ["tokio"] }
+tokio-rustls = "0.24.0"
+webpki-roots = "0.23.0"
 [dependencies.async-tungstenite]
 version = "0.25"
 features = ["tokio-rustls-webpki-roots"]

+ 419 - 216
src/data_providers/binance/market_data.rs

@@ -1,21 +1,35 @@
+use hyper::client::conn;
 use iced::futures;  
 use iced::subscription::{self, Subscription};
-use serde::{de, Deserialize, Deserializer};
+use serde::{de, Deserializer};
 use futures::sink::SinkExt;
-use futures::stream::StreamExt;
 
-use async_tungstenite::tungstenite;
 use serde_json::Value;
 use crate::{Ticker, Timeframe};
 
-#[derive(Debug)]
+use bytes::Bytes;
+
+use sonic_rs::{LazyValue, JsonValueTrait};
+use sonic_rs::{Deserialize, Serialize}; 
+use sonic_rs::{to_array_iter, to_object_iter_unchecked};
+
+use anyhow::{Context, Result};
+
+use fastwebsockets::{Frame, FragmentCollector, OpCode};
+use http_body_util::Empty;
+use hyper::header::{CONNECTION, UPGRADE};
+use hyper::upgrade::Upgraded;
+use hyper::Request;
+use hyper_util::rt::TokioIo;
+use tokio::net::TcpStream;
+use tokio_rustls::rustls::{ClientConfig, OwnedTrustAnchor};
+use tokio_rustls::TlsConnector;
+
 #[allow(clippy::large_enum_variant)]
 enum State {
     Disconnected,
     Connected(
-        async_tungstenite::WebSocketStream<
-            async_tungstenite::tokio::ConnectStream,
-        >,
+        FragmentCollector<TokioIo<Upgraded>>
     ),
 }
 
@@ -51,13 +65,13 @@ impl<'de> Deserialize<'de> for Order {
 #[derive(Debug, Deserialize, Clone)]
 pub struct FetchedDepth {
     #[serde(rename = "lastUpdateId")]
-    pub update_id: i64,
+    update_id: i64,
     #[serde(rename = "T")]
-    pub time: i64,
+    time: i64,
     #[serde(rename = "bids")]
-    pub bids: Vec<Order>,
+    bids: Vec<Order>,
     #[serde(rename = "asks")]
-    pub asks: Vec<Order>,
+    asks: Vec<Order>,
 }
 #[derive(Debug, Clone, Copy, Default)]
 pub struct Order {
@@ -70,15 +84,11 @@ pub struct LocalDepthCache {
     pub bids: Box<[Order]>,
     pub asks: Box<[Order]>,
 }
-#[derive(Debug, Deserialize, Clone, Default)]
+
 pub struct Depth {
-    #[serde(default)]
     pub last_update_id: i64,
-    #[serde(rename = "T")]
     pub time: i64,
-    #[serde(rename = "b")]
     pub bids: Vec<Order>,
-    #[serde(rename = "a")]
     pub asks: Vec<Order>,
 }
 
@@ -174,6 +184,231 @@ impl Depth {
     }
 }
 
+#[derive(Debug, Clone, Copy)]
+pub struct Trade {
+    pub time: i64,
+    pub is_sell: bool,
+    pub price: f32,
+    pub qty: f32,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct SonicDepth {
+	#[serde(rename = "T")]
+	time: u64,
+	#[serde(rename = "U")]
+	first_id: u64,
+	#[serde(rename = "u")]
+	final_id: u64,
+	#[serde(rename = "pu")]
+	prev_final_id: u64,
+	#[serde(rename = "b")]
+	bids: Vec<BidAsk>,
+	#[serde(rename = "a")]
+	asks: Vec<BidAsk>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct BidAsk {
+	#[serde(rename = "0")]
+	price: String,
+	#[serde(rename = "1")]
+	qty: String,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct SonicTrade {
+	#[serde(rename = "T")]
+	time: u64,
+	#[serde(rename = "p")]
+	price: String,
+	#[serde(rename = "q")]
+	qty: String,
+	#[serde(rename = "m")]
+	is_sell: bool,
+}
+
+#[derive(Deserialize, Debug, Clone)]
+struct SonicKline {
+    #[serde(rename = "t")]
+    time: u64,
+    #[serde(rename = "o")]
+    open: String,
+    #[serde(rename = "h")]
+    high: String,
+    #[serde(rename = "l")]
+    low: String,
+    #[serde(rename = "c")]
+    close: String,
+    #[serde(rename = "v")]
+    volume: String,
+    #[serde(rename = "V")]
+    taker_buy_base_asset_volume: String,
+    #[serde(rename = "i")]
+    interval: String,
+}
+
+#[derive(Deserialize, Debug, Clone)]
+struct SonicKlineWrap {
+    #[serde(rename = "k")]
+    kline: SonicKline,
+}
+
+#[derive(Debug)]
+enum StreamData {
+	Trade(SonicTrade),
+	Depth(SonicDepth),
+    Kline(SonicKline),
+}
+
+#[derive(Debug)]
+enum StreamName {
+    Depth,
+    Trade,
+    Kline,
+    Unknown,
+}
+impl StreamName {
+    fn from_symbol_and_type(symbol: &str, stream_type: &str) -> Self {
+        match stream_type {
+            _ if stream_type == format!("{symbol}@depth@100ms") => StreamName::Depth,
+            _ if stream_type == format!("{symbol}@trade") => StreamName::Trade,
+            _ if stream_type.starts_with(&format!("{symbol}@kline_")) => StreamName::Kline,
+            _ => StreamName::Unknown,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum StreamWrapper {
+	Trade,
+	Depth,
+    Kline,
+}
+
+fn feed_de(bytes: &Bytes, symbol: &str) -> Result<StreamData> {
+	let mut stream_type: Option<StreamWrapper> = None;
+
+	let iter: sonic_rs::ObjectJsonIter = unsafe { to_object_iter_unchecked(bytes) };
+
+	for elem in iter {
+		let (k, v) = elem
+            .context("Error parsing stream")?;
+
+		if k == "stream" {
+			if let Some(val) = v.as_str() {
+                match StreamName::from_symbol_and_type(symbol, val) {
+					StreamName::Depth => {
+						stream_type = Some(StreamWrapper::Depth);
+					},
+					StreamName::Trade => {
+						stream_type = Some(StreamWrapper::Trade);
+					},
+                    StreamName::Kline => {
+                        stream_type = Some(StreamWrapper::Kline);
+                    },
+					_ => {
+                        eprintln!("Unknown stream name");
+                    }
+				}
+			}
+		} else if k == "data" {
+			match stream_type {
+				Some(StreamWrapper::Trade) => {
+					let trade: SonicTrade = sonic_rs::from_str(&v.as_raw_faststr())
+						.context("Error parsing trade")?;
+
+					return Ok(StreamData::Trade(trade));
+				},
+				Some(StreamWrapper::Depth) => {
+					let depth: SonicDepth = sonic_rs::from_str(&v.as_raw_faststr())
+						.context("Error parsing depth")?;
+
+					return Ok(StreamData::Depth(depth));
+				},
+                Some(StreamWrapper::Kline) => {
+                    let kline_wrap: SonicKlineWrap = sonic_rs::from_str(&v.as_raw_faststr())
+                        .context("Error parsing kline")?;
+
+                    return Ok(StreamData::Kline(kline_wrap.kline));
+                },
+				_ => {
+					eprintln!("Unknown stream type");
+				}
+			}
+		} else {
+			eprintln!("Unknown data: {:?}", k);
+		}
+	}
+
+	Err(anyhow::anyhow!("Unknown data"))
+}
+
+fn tls_connector() -> Result<TlsConnector> {
+	let mut root_store = tokio_rustls::rustls::RootCertStore::empty();
+
+	root_store.add_trust_anchors(
+		webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| {
+			OwnedTrustAnchor::from_subject_spki_name_constraints(
+			ta.subject,
+			ta.spki,
+			ta.name_constraints,
+			)
+		}),
+	);
+
+	let config = ClientConfig::builder()
+		.with_safe_defaults()
+		.with_root_certificates(root_store)
+		.with_no_client_auth();
+
+	Ok(TlsConnector::from(std::sync::Arc::new(config)))
+}
+
+async fn connect(domain: &str, streams: &str) -> Result<FragmentCollector<TokioIo<Upgraded>>> {
+	let mut addr = String::from(domain);
+	addr.push_str(":443");
+
+	let tcp_stream: TcpStream = TcpStream::connect(&addr).await?;
+	let tls_connector: TlsConnector = tls_connector().unwrap();
+	let domain: tokio_rustls::rustls::ServerName =
+	tokio_rustls::rustls::ServerName::try_from(domain).map_err(|_| {
+		std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid dnsname")
+	})?;
+
+	let tls_stream: tokio_rustls::client::TlsStream<TcpStream> = tls_connector.connect(domain, tcp_stream).await?;
+
+    let url = format!("wss://{}/stream?streams={}", &addr, streams);
+    println!("Connecting to {}", url);
+
+	let req: Request<Empty<Bytes>> = Request::builder()
+	.method("GET")
+	.uri(url)
+	.header("Host", &addr)
+	.header(UPGRADE, "websocket")
+	.header(CONNECTION, "upgrade")
+	.header(
+		"Sec-WebSocket-Key",
+		fastwebsockets::handshake::generate_key(),
+	)
+	.header("Sec-WebSocket-Version", "13")
+	.body(Empty::<Bytes>::new())?;
+
+	let (ws, _) = fastwebsockets::handshake::client(&SpawnExecutor, req, tls_stream).await?;
+	Ok(FragmentCollector::new(ws))
+}
+struct SpawnExecutor;
+
+impl<Fut> hyper::rt::Executor<Fut> for SpawnExecutor
+where
+  Fut: std::future::Future + Send + 'static,
+  Fut::Output: Send + 'static,
+{
+  fn execute(&self, fut: Fut) {
+	tokio::task::spawn(fut);
+  }
+}
+
 pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
     struct Connect;
 
@@ -182,7 +417,7 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
         100,
         move |mut output| async move {
             let mut state = State::Disconnected;     
-            let mut trades_buffer = Vec::new(); 
+            let mut trades_buffer: Vec<Trade> = Vec::new(); 
 
             let symbol_str = match selected_ticker {
                 Ticker::BTCUSDT => "btcusdt",
@@ -198,17 +433,18 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
 
             let mut already_fetching: bool = false;
 
-            let mut prev_id: i64 = 0;
+            let mut prev_id: u64 = 0;
 
             let mut trade_latencies: Vec<i64> = Vec::new();
 
             loop {
                 match &mut state {
                     State::Disconnected => {        
-                        let websocket_server = format!("wss://fstream.binance.com/stream?streams={stream_1}/{stream_2}");
+                        let streams = format!("{stream_1}/{stream_2}");
 
-                        if let Ok((websocket, _)) = async_tungstenite::tokio::connect_async(
-                            websocket_server,
+                        let domain: &str = "fstream.binance.com";
+
+                        if let Ok(websocket) = connect(domain, streams.as_str()
                         )
                         .await {
                             let (tx, rx) = tokio::sync::oneshot::channel();
@@ -235,7 +471,7 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                                     orderbook.fetched(depth);
                                     state = State::Connected(websocket);
                                 },
-                                Err(_) => orderbook.fetched(Depth::default()),
+                                Err(_) => output.send(Event::Disconnected).await.expect("Trying to send disconnect event..."),
                             }
                             
                         } else {
@@ -243,143 +479,142 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                            .await;
                            let _ = output.send(Event::Disconnected).await;
                         }
-                    }
-                    State::Connected(websocket) => {
-                        let mut fused_websocket = websocket.by_ref().fuse();
-
+                    },
+                    State::Connected(ws) => {
                         let feed_latency: FeedLatency;
 
-                        futures::select! {
-                            received = fused_websocket.select_next_some() => {
-                                match received {
-                                    Ok(tungstenite::Message::Text(message)) => {
-                                        let stream: Stream = serde_json::from_str(&message).unwrap_or(Stream { stream: String::new() });
-                                        
-                                        if stream.stream == stream_1 {
-                                            let agg_trade: AggTrade = serde_json::from_str(&message).unwrap();
-                                            trades_buffer.push(agg_trade.data);
-
-                                            let latency = chrono::Utc::now().timestamp_millis() - agg_trade.data.time;
-
-                                            trade_latencies.push(latency);
-                                            
-                                        } else if stream.stream == stream_2 {
-                                            if already_fetching {
-                                                println!("Already fetching...\n");
-
-                                                continue;
-                                            }
-
-                                            let depth_update: Value = serde_json::from_str(&message).unwrap();
-                                            let depth_data = depth_update.get("data").unwrap();
-
-                                            let first_update_id = depth_data.get("U").unwrap().as_i64().unwrap();
-                                            let final_update_id = depth_data.get("u").unwrap().as_i64().unwrap();
-
-                                            let last_final_update_id = depth_data.get("pu").unwrap().as_i64().unwrap();
-
-                                            let last_update_id = orderbook.get_fetch_id();
-
-                                            if (final_update_id <= last_update_id) || last_update_id == 0 {
-                                                continue;
-                                            }
-
-                                            if prev_id == 0 && (first_update_id > last_update_id + 1) || (last_update_id + 1 > final_update_id) {
-                                                println!("Out of sync on first event...\nU: {first_update_id}, last_id: {last_update_id}, u: {final_update_id}, pu: {last_final_update_id}\n");
-
-                                                let (tx, rx) = tokio::sync::oneshot::channel();
-                                                already_fetching = true;
+                        match ws.read_frame().await {
+                            Ok(msg) => match msg.opcode {
+                                OpCode::Text => {                    
+                                    let json_bytes: Bytes = Bytes::from(msg.payload.to_vec());
+                    
+                                    if let Ok(data) = feed_de(&json_bytes, symbol_str) {
+                                        match data {
+                                            StreamData::Trade(de_trade) => {
+                                                let trade = Trade {
+                                                    time: de_trade.time as i64,
+                                                    is_sell: de_trade.is_sell,
+                                                    price: str_f32_parse(&de_trade.price),
+                                                    qty: str_f32_parse(&de_trade.qty),
+                                                };
 
-                                                tokio::spawn(async move {
-                                                    let fetched_depth = fetch_depth(selected_ticker).await;
+                                                trade_latencies.push(
+                                                    chrono::Utc::now().timestamp_millis() - trade.time
+                                                );
 
-                                                    let depth: Depth = match fetched_depth {
+                                                trades_buffer.push(trade);
+                                            },
+                                            StreamData::Depth(de_depth) => {
+                                                if already_fetching {
+                                                    println!("Already fetching...\n");
+    
+                                                    continue;
+                                                }
+    
+                                                let last_update_id = orderbook.get_fetch_id() as u64;
+                                                
+                                                if (de_depth.final_id <= last_update_id) || last_update_id == 0 {
+                                                    continue;
+                                                }
+    
+                                                if prev_id == 0 && (de_depth.first_id > last_update_id + 1) || (last_update_id + 1 > de_depth.final_id) {
+                                                    println!("Out of sync at first event. Trying to resync...\n");
+    
+                                                    let (tx, rx) = tokio::sync::oneshot::channel();
+                                                    already_fetching = true;
+    
+                                                    tokio::spawn(async move {
+                                                        let fetched_depth = fetch_depth(selected_ticker).await;
+    
+                                                        let depth: Depth = match fetched_depth {
+                                                            Ok(depth) => {
+                                                                Depth {
+                                                                    last_update_id: depth.update_id,
+                                                                    time: depth.time,
+                                                                    bids: depth.bids,
+                                                                    asks: depth.asks,
+                                                                }
+                                                            },
+                                                            Err(_) => return,
+                                                        };
+    
+                                                        let _ = tx.send(depth);
+                                                    });
+                                                    match rx.await {
                                                         Ok(depth) => {
-                                                            Depth {
-                                                                last_update_id: depth.update_id,
-                                                                time: depth.time,
-                                                                bids: depth.bids,
-                                                                asks: depth.asks,
-                                                            }
+                                                            orderbook.fetched(depth)
                                                         },
-                                                        Err(_) => return,
-                                                    };
-
-                                                    let _ = tx.send(depth);
-                                                });
-                                                match rx.await {
-                                                    Ok(depth) => {
-                                                        orderbook.fetched(depth)
-                                                    },
-                                                    Err(_) => orderbook.fetched(Depth::default()),
+                                                        Err(_) => {
+                                                            state = State::Disconnected;
+                                                            output.send(Event::Disconnected).await.expect("Trying to send disconnect event...");
+                                                        },
+                                                    }
+                                                    already_fetching = false;
                                                 }
-                                                already_fetching = false;
-                                            }
-                                    
-                                            if (prev_id == 0) || (prev_id == last_final_update_id) {
-                                                let depth_update: DepthUpdate = serde_json::from_str(&message).unwrap();
-
-                                                let time = depth_update.data.time;
-
-                                                let depth_latency = chrono::Utc::now().timestamp_millis() - time;
-
-                                                let bids = depth_update.data.bids;
-                                                let asks = depth_update.data.asks;
-
-                                                let depth = Depth { last_update_id: final_update_id, time, bids, asks };
-
-                                                let (local_bids, local_asks) = orderbook.update_levels(depth);
-
-                                                let local_depth_cache = LocalDepthCache {
-                                                    time: time,
-                                                    bids: local_bids,
-                                                    asks: local_asks,
-                                                };
-                                                
-                                                if !trade_latencies.is_empty() {
-                                                    let avg_trade_latency = trade_latencies.iter().sum::<i64>() / trade_latencies.len() as i64;
-
-                                                    feed_latency = FeedLatency {
+                                        
+                                                if (prev_id == 0) || (prev_id == de_depth.prev_final_id) {
+                                                    let time = de_depth.time as i64;
+    
+                                                    let depth_latency = chrono::Utc::now().timestamp_millis() - time;
+    
+                                                    let depth_update = Depth {
+                                                        last_update_id: de_depth.final_id as i64,
                                                         time,
-                                                        depth_latency,
-                                                        trade_latency: Some(avg_trade_latency),
+                                                        bids: de_depth.bids.iter().map(|x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }).collect(),
+                                                        asks: de_depth.asks.iter().map(|x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }).collect(),
+                                                    };
+    
+                                                    let (local_bids, local_asks) = orderbook.update_levels(depth_update);
+    
+                                                    let local_depth_cache = LocalDepthCache {
+                                                        time,
+                                                        bids: local_bids,
+                                                        asks: local_asks,
+                                                    };
+                                                    
+                                                    let avg_trade_latency = if !trade_latencies.is_empty() {
+                                                        let avg = trade_latencies.iter().sum::<i64>() / trade_latencies.len() as i64;
+                                                        trade_latencies.clear();
+                                                        Some(avg)
+                                                    } else {
+                                                        None
                                                     };
-
-                                                    trade_latencies.clear();
-                                                } else {
                                                     feed_latency = FeedLatency {
                                                         time,
                                                         depth_latency,
-                                                        trade_latency: None,
+                                                        trade_latency: avg_trade_latency,
                                                     };
+    
+                                                    let _ = output.send(
+                                                        Event::DepthReceived(
+                                                            feed_latency,
+                                                            time, 
+                                                            local_depth_cache,
+                                                            std::mem::take(&mut trades_buffer)
+                                                        )
+                                                    ).await;
+    
+                                                    prev_id = de_depth.final_id;
+                                                } else {
+                                                    eprintln!("Out of sync...\n");
                                                 }
-
-                                                let _ = output.send(
-                                                    Event::DepthReceived(
-                                                        feed_latency,
-                                                        time, 
-                                                        local_depth_cache,
-                                                        std::mem::take(&mut trades_buffer)
-                                                    )
-                                                ).await;
-
-                                                prev_id = final_update_id;
-                                            } else {
-                                                println!("Out of sync...\n");
-                                            }
-
-                                        } else {
-                                            dbg!(stream.stream);
+                                            },
+                                            _ => {}
                                         }
+                                    } else {
+                                        eprintln!("\nUnknown data: {:?}", &json_bytes);
                                     }
-                                    Err(_) => {
-                                        let _ = output.send(Event::Disconnected).await;
-                                        state = State::Disconnected;
-                                    }
-                                    Ok(_) => continue,
                                 }
+                                OpCode::Close => {
+                                    eprintln!("Connection closed");
+                                    let _ = output.send(Event::Disconnected).await;
+                                }
+                                _ => {}
+                            },
+                            Err(e) => {
+                                println!("Error reading frame: {}", e);
                             }
-                        }
+                        };
                     }
                 }
             }
@@ -396,8 +631,10 @@ pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event
         move |mut output| async move {
             let mut state = State::Disconnected;    
 
+            let mut symbol_str: &str = "";
+
             let stream_str = vec.iter().map(|(ticker, timeframe)| {
-                let symbol_str = match ticker {
+                symbol_str = match ticker {
                     Ticker::BTCUSDT => "btcusdt",
                     Ticker::ETHUSDT => "ethusdt",
                     Ticker::SOLUSDT => "solusdt",
@@ -416,10 +653,12 @@ pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event
             loop {
                 match &mut state {
                     State::Disconnected => {
-                        let websocket_server = format!("wss://fstream.binance.com/stream?streams={stream_str}");
+                        let domain: &str = "fstream.binance.com";
+
+                        let streams = stream_str.as_str();
                         
-                        if let Ok((websocket, _)) = async_tungstenite::tokio::connect_async(
-                            websocket_server,
+                        if let Ok(websocket) = connect(
+                            domain, streams
                         )
                         .await {
                            state = State::Connected(websocket);
@@ -428,44 +667,35 @@ pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event
                            .await;
                            let _ = output.send(Event::Disconnected).await;
                         }
-                    }
-                    State::Connected(websocket) => {
-                        let mut fused_websocket = websocket.by_ref().fuse();
-
-                        futures::select! {
-                            received = fused_websocket.select_next_some() => {
-                                match received {
-                                    Ok(tungstenite::Message::Text(message)) => {
-                                        match serde_json::from_str::<serde_json::Value>(&message) {
-                                            Ok(data) => {
-                                                match (data.get("data"), data["data"]["k"]["i"].as_str(), data["data"]["k"].as_object()) {
-                                                    (Some(inner_data), Some(interval), Some(kline_obj)) if inner_data["e"].as_str() == Some("kline") => {
-                                                        let kline = Kline {
-                                                            time: kline_obj["t"].as_u64().unwrap_or_default(),
-                                                            open: kline_obj["o"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
-                                                            high: kline_obj["h"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
-                                                            low: kline_obj["l"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
-                                                            close: kline_obj["c"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
-                                                            volume: kline_obj["v"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
-                                                            taker_buy_base_asset_volume: kline_obj["V"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
-                                                        };
-                                                
-                                                        if let Some(timeframe) = vec.iter().find(|(_, tf)| tf.to_string() == interval) {
-                                                            let _ = output.send(Event::KlineReceived(kline, timeframe.1)).await;
-                                                        }
-                                                    },
-                                                    _ => continue,
-                                                }                                                
-                                            },
-                                            Err(_) => continue,
+                    },
+                    State::Connected(ws) => {
+                        match ws.read_frame().await {
+                            Ok(msg) => match msg.opcode {
+                                OpCode::Text => {                    
+                                    let json_bytes: Bytes = Bytes::from(msg.payload.to_vec());
+                    
+                                    if let Ok(StreamData::Kline(de_kline)) = feed_de(&json_bytes, symbol_str) {
+                                        let kline = Kline {
+                                            time: de_kline.time,
+                                            open: str_f32_parse(&de_kline.open),
+                                            high: str_f32_parse(&de_kline.high),
+                                            low: str_f32_parse(&de_kline.low),
+                                            close: str_f32_parse(&de_kline.close),
+                                            volume: str_f32_parse(&de_kline.volume),
+                                            taker_buy_base_asset_volume: str_f32_parse(&de_kline.taker_buy_base_asset_volume),
+                                        };
+
+                                        if let Some(timeframe) = vec.iter().find(|(_, tf)| tf.to_string() == de_kline.interval) {
+                                            let _ = output.send(Event::KlineReceived(kline, timeframe.1)).await;
                                         }
-                                    },
-                                    Err(_) => {
-                                        let _ = output.send(Event::Disconnected).await;
-                                        state = State::Disconnected;
-                                    },
-                                    Ok(_) => continue,
+                                    } else {
+                                        eprintln!("\nUnknown data: {:?}", &json_bytes);
+                                    }
                                 }
+                                _ => {}
+                            }, 
+                            Err(e) => {
+                                eprintln!("Error reading frame: {}", e);
                             }
                         }
                     }
@@ -475,6 +705,13 @@ pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event
     )
 }
 
+fn str_f32_parse(s: &str) -> f32 {
+    s.parse::<f32>().unwrap_or_else(|e| {
+        eprintln!("Failed to parse float: {}, error: {}", s, e);
+        0.0
+    })
+}
+
 mod string_to_f32 {
     use serde::{self, Deserialize, Deserializer};
 
@@ -487,46 +724,14 @@ mod string_to_f32 {
     }
 }
 
-#[derive(Deserialize)]
-struct Stream {
-    stream: String,
-}
-#[derive(Deserialize, Debug)]
-struct AggTrade {
-    data: Trade,
-}
-#[derive(Deserialize, Debug)]
-struct DepthUpdate {
-    data: Depth,
-}
-
-#[derive(Deserialize, Debug, Clone, Copy)]
-pub struct Trade {
-    #[serde(rename = "T")]
-    pub time: i64,
-    #[serde(rename = "m")]
-    pub is_sell: bool,
-    #[serde(with = "string_to_f32", rename = "p")]
-    pub price: f32,
-    #[serde(with = "string_to_f32", rename = "q")]
-    pub qty: f32,
-}
-
-#[derive(Deserialize, Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy)]
 pub struct Kline {
-    #[serde(rename = "t")]
     pub time: u64,
-    #[serde(with = "string_to_f32", rename = "o")]
     pub open: f32,
-    #[serde(with = "string_to_f32", rename = "h")]
     pub high: f32,
-    #[serde(with = "string_to_f32", rename = "l")]
     pub low: f32,
-    #[serde(with = "string_to_f32", rename = "c")]
     pub close: f32,
-    #[serde(with = "string_to_f32", rename = "v")]
     pub volume: f32,
-    #[serde(with = "string_to_f32", rename = "V")]
     pub taker_buy_base_asset_volume: f32,
 }
 #[derive(Deserialize, Debug, Clone)]
@@ -623,8 +828,6 @@ pub async fn fetch_ticksize(ticker: Ticker) -> Result<f32, reqwest::Error> {
     Ok(tick_size)
 }
 
-use anyhow::{Result, Context};
-
 pub async fn fetch_server_time() -> Result<i64> {
     let url = "https://fapi.binance.com/fapi/v1/time";
 

+ 438 - 260
src/data_providers/bybit/market_data.rs

@@ -1,22 +1,36 @@
+use hyper::client::conn;
 use iced::futures;  
 use iced::subscription::{self, Subscription};
-use serde::{de, Deserialize, Deserializer};
 use futures::sink::SinkExt;
-use futures::stream::StreamExt;
 
-use async_tungstenite::tungstenite;
 use serde_json::Value;
 use crate::data_providers::binance::market_data::FeedLatency;
 use crate::{Ticker, Timeframe};
 
-#[derive(Debug)]
+use bytes::Bytes;
+
+use sonic_rs::{LazyValue, JsonValueTrait};
+use sonic_rs::{Deserialize, Serialize}; 
+use sonic_rs::{to_array_iter, to_object_iter_unchecked};
+
+use anyhow::anyhow;
+use anyhow::{Context, Result};
+
+use fastwebsockets::{Frame, FragmentCollector, OpCode};
+use http_body_util::Empty;
+use hyper::header::{CONNECTION, UPGRADE};
+use hyper::upgrade::Upgraded;
+use hyper::Request;
+use hyper_util::rt::TokioIo;
+use tokio::net::TcpStream;
+use tokio_rustls::rustls::{ClientConfig, OwnedTrustAnchor};
+use tokio_rustls::TlsConnector;
+
 #[allow(clippy::large_enum_variant)]
 enum State {
     Disconnected,
     Connected(
-        async_tungstenite::WebSocketStream<
-            async_tungstenite::tokio::ConnectStream,
-        >,
+        FragmentCollector<TokioIo<Upgraded>>
     ),
 }
 
@@ -31,13 +45,6 @@ pub enum Event {
 #[derive(Debug, Clone)]
 pub struct Connection;
 
-#[derive(Debug, Deserialize, Clone)]
-pub struct FetchedDepth {
-    #[serde(rename = "b")]
-    pub bids: Vec<Order>,
-    #[serde(rename = "a")]
-    pub asks: Vec<Order>,
-}
 #[derive(Debug, Clone, Copy, Default)]
 pub struct Order {
     pub price: f32,
@@ -49,36 +56,14 @@ pub struct LocalDepthCache {
     pub bids: Box<[Order]>,
     pub asks: Box<[Order]>,
 }
-#[derive(Debug, Deserialize, Clone, Default)]
+#[derive(Debug, Clone, Default)]
 pub struct Depth {
-    #[serde(default)]
     pub last_update_id: i64,
-    #[serde(rename = "T")]
     pub time: i64,
-    #[serde(rename = "b")]
     pub bids: Vec<Order>,
-    #[serde(rename = "a")]
     pub asks: Vec<Order>,
 }
 
-use std::str::FromStr;
-impl<'de> Deserialize<'de> for Order {
-    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-    where
-        D: Deserializer<'de>,
-    {
-        let value: Vec<String> = Deserialize::deserialize(deserializer)?;
-        if value.len() != 2 {
-            return Err(serde::de::Error::custom("Expected an array of two strings"));
-        }
-
-        let price = f32::from_str(&value[0]).map_err(serde::de::Error::custom)?;
-        let qty = f32::from_str(&value[1]).map_err(serde::de::Error::custom)?;
-
-        Ok(Order { price, qty })
-    }
-}
-
 impl Depth {
     pub fn new() -> Self {
         Self {
@@ -165,10 +150,250 @@ impl Depth {
 
         (local_bids.into_boxed_slice(), local_asks.into_boxed_slice())
     }
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct SonicDepth {
+	#[serde(rename = "u")]
+	pub update_id: u64,
+	#[serde(rename = "b")]
+	pub bids: Vec<BidAsk>,
+	#[serde(rename = "a")]
+	pub asks: Vec<BidAsk>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct BidAsk {
+	#[serde(rename = "0")]
+	pub price: String,
+	#[serde(rename = "1")]
+	pub qty: String,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct SonicTrade {
+	#[serde(rename = "T")]
+	pub time: u64,
+	#[serde(rename = "p")]
+	pub price: String,
+	#[serde(rename = "v")]
+	pub qty: String,
+	#[serde(rename = "S")]
+	pub is_sell: String,
+}
+
+#[derive(Deserialize, Debug, Clone)]
+pub struct SonicKline {
+    #[serde(rename = "start")]
+    pub time: u64,
+    #[serde(rename = "open")]
+    pub open: String,
+    #[serde(rename = "high")]
+    pub high: String,
+    #[serde(rename = "low")]
+    pub low: String,
+    #[serde(rename = "close")]
+    pub close: String,
+    #[serde(rename = "volume")]
+    pub volume: String,
+    #[serde(rename = "interval")]
+    pub interval: String,
+}
+
+#[derive(Debug)]
+enum StreamData {
+	Trade(Vec<SonicTrade>),
+	Depth(SonicDepth, String, i64),
+    Kline(Vec<SonicKline>),
+}
+
+#[derive(Debug)]
+enum StreamName {
+    Depth,
+    Trade,
+    Kline,
+    Unknown,
+}
+impl StreamName {
+    fn from_symbol_and_type(symbol: &str, stream_type: &str) -> Self {
+        match stream_type {
+            _ if stream_type == format!("orderbook.200.{symbol}") => StreamName::Depth,
+            _ if stream_type == format!("publicTrade.{symbol}") => StreamName::Trade,
+            _ if stream_type.starts_with(&format!("kline")) => StreamName::Kline,
+            _ => StreamName::Unknown,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum StreamWrapper {
+	Trade,
+	Depth,
+    Kline,
+}
+
+fn feed_de(bytes: &Bytes, symbol: &str) -> Result<StreamData> {
+    let mut stream_type: Option<StreamWrapper> = None;
+
+    let mut depth_wrap: Option<SonicDepth> = None;
+
+    let mut data_type: String = String::new();
+
+    let iter: sonic_rs::ObjectJsonIter = unsafe { to_object_iter_unchecked(bytes) };
+
+    for elem in iter {
+        let (k, v) = elem.context("Error parsing stream")?;
 
-    pub fn get_fetch_id(&self) -> i64 {
-        self.last_update_id
+        if k == "topic" {
+            if let Some(val) = v.as_str() {
+                match StreamName::from_symbol_and_type(symbol, val) {
+                    StreamName::Depth => {
+                        stream_type = Some(StreamWrapper::Depth);
+                    },
+                    StreamName::Trade => {
+                        stream_type = Some(StreamWrapper::Trade);
+                    },
+                    StreamName::Kline => {
+                        stream_type = Some(StreamWrapper::Kline);
+                    },
+                    _ => {
+                        eprintln!("Unknown stream name");
+                    }
+                }
+            }
+        } else if k == "type" {
+            data_type = v.as_str().unwrap().to_owned();
+        } else if k == "data" {
+            match stream_type {
+                Some(StreamWrapper::Trade) => {
+                    let trade_wrap: Vec<SonicTrade> = sonic_rs::from_str(&v.as_raw_faststr())
+                        .context("Error parsing trade")?;
+
+                    return Ok(StreamData::Trade(trade_wrap));
+                },
+                Some(StreamWrapper::Depth) => {
+                    if depth_wrap.is_none() {
+                        depth_wrap = Some(SonicDepth {
+                            update_id: 0,
+                            bids: Vec::new(),
+                            asks: Vec::new(),
+                        });
+                    }
+                    depth_wrap = Some(sonic_rs::from_str(&v.as_raw_faststr())
+                        .context("Error parsing depth")?);
+                },
+                Some(StreamWrapper::Kline) => {
+                    let kline_wrap: Vec<SonicKline> = sonic_rs::from_str(&v.as_raw_faststr())
+                        .context("Error parsing kline")?;
+
+                    return Ok(StreamData::Kline(kline_wrap));
+                },
+                _ => {
+                    eprintln!("Unknown stream type");
+                }
+            }
+        } else if k == "cts" {
+            if let Some(dw) = depth_wrap {
+                let time: u64 = v.as_u64().context("Error parsing time")?;
+                
+                return Ok(StreamData::Depth(dw, data_type.to_string(), time as i64));
+            }
+        }
     }
+
+    Err(anyhow::anyhow!("Unknown data"))
+}
+
+fn tls_connector() -> Result<TlsConnector> {
+	let mut root_store = tokio_rustls::rustls::RootCertStore::empty();
+
+	root_store.add_trust_anchors(
+		webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| {
+			OwnedTrustAnchor::from_subject_spki_name_constraints(
+			ta.subject,
+			ta.spki,
+			ta.name_constraints,
+			)
+		}),
+	);
+
+	let config = ClientConfig::builder()
+		.with_safe_defaults()
+		.with_root_certificates(root_store)
+		.with_no_client_auth();
+
+	Ok(TlsConnector::from(std::sync::Arc::new(config)))
+}
+
+async fn connect(domain: &str) -> Result<FragmentCollector<TokioIo<Upgraded>>> {
+	let mut addr = String::from(domain);
+    addr.push_str(":443");
+
+	let tcp_stream: TcpStream = TcpStream::connect(&addr).await?;
+	let tls_connector: TlsConnector = tls_connector().unwrap();
+	let domain: tokio_rustls::rustls::ServerName =
+	tokio_rustls::rustls::ServerName::try_from(domain).map_err(|_| {
+		std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid dnsname")
+	})?;
+
+	let tls_stream: tokio_rustls::client::TlsStream<TcpStream> = tls_connector.connect(domain, tcp_stream).await?;
+
+    let url = format!("wss://stream.bybit.com/v5/public/linear");
+
+	let req: Request<Empty<Bytes>> = Request::builder()
+	.method("GET")
+	.uri(url)
+	.header("Host", &addr)
+	.header(UPGRADE, "websocket")
+	.header(CONNECTION, "upgrade")
+	.header(
+		"Sec-WebSocket-Key",
+		fastwebsockets::handshake::generate_key(),
+	)
+	.header("Sec-WebSocket-Version", "13")
+	.body(Empty::<Bytes>::new())?;
+
+	let (ws, _) = fastwebsockets::handshake::client(&SpawnExecutor, req, tls_stream).await?;
+	Ok(FragmentCollector::new(ws))
+}
+struct SpawnExecutor;
+
+impl<Fut> hyper::rt::Executor<Fut> for SpawnExecutor
+where
+  Fut: std::future::Future + Send + 'static,
+  Fut::Output: Send + 'static,
+{
+  fn execute(&self, fut: Fut) {
+	tokio::task::spawn(fut);
+  }
+}
+
+fn str_f32_parse(s: &str) -> f32 {
+    s.parse::<f32>().unwrap_or_else(|e| {
+        eprintln!("Failed to parse float: {}, error: {}", s, e);
+        0.0
+    })
+}
+
+fn string_to_timeframe(interval: &str) -> Option<Timeframe> {
+    Timeframe::ALL.iter().find(|&tf| tf.to_string() == format!("{}m", interval)).copied()
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct Trade {
+    pub time: i64,
+    pub is_sell: bool,
+    pub price: f32,
+    pub qty: f32,
+}
+#[derive(Deserialize, Debug, Clone, Copy)]
+pub struct Kline {
+    pub time: u64,
+    pub open: f32,
+    pub high: f32,
+    pub low: f32,
+    pub close: f32,
+    pub volume: f32,
 }
 
 pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
@@ -199,26 +424,23 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
             loop {
                 match &mut state {
                     State::Disconnected => {        
-                        let websocket_server = format!("wss://stream.bybit.com/v5/public/linear");
+                        let domain: &str = "stream.bybit.com";
 
-                        println!("Connecting to websocket server...\n");
-
-                        if let Ok((mut websocket, _)) = async_tungstenite::tokio::connect_async(
-                            websocket_server,
+                        if let Ok(mut websocket) = connect(domain
                         )
                         .await {
-                            let subscribe_message = serde_json::json!({
+                            let subscribe_message: String = serde_json::json!({
                                 "op": "subscribe",
-                                "args": [format!("publicTrade.{symbol_str}"), format!("orderbook.200.{symbol_str}")]
+                                "args": [stream_1, stream_2]
                             }).to_string();
     
-                            if let Err(e) = websocket.send(tungstenite::Message::Text(subscribe_message)).await {
+                            if let Err(e) = websocket.write_frame(Frame::text(fastwebsockets::Payload::Borrowed(subscribe_message.as_bytes()))).await {
                                 eprintln!("Failed subscribing: {}", e);
 
                                 let _ = output.send(Event::Disconnected).await;
 
                                 continue;
-                            } 
+                            }
 
                             state = State::Connected(websocket);
                             
@@ -229,95 +451,90 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
                         }
                     }
                     State::Connected(websocket) => {
-                        let mut fused_websocket = websocket.by_ref().fuse();
-
                         let feed_latency: FeedLatency;
 
-                        futures::select! {
-                            received = fused_websocket.select_next_some() => {
-                                match received {
-                                    Ok(tungstenite::Message::Text(message)) => {
-                                        match serde_json::from_str::<Stream>(&message) {
-                                            Ok(stream) => {
-                                                if stream.topic == stream_1 {
-                                                    stream.data.as_array().unwrap().iter().for_each(|trade| {
-                                                        if let Ok(trade) = serde_json::from_value::<Trade>(trade.clone()) {
-                                                            trades_buffer.push(trade);
-
-                                                            let latency = chrono::Utc::now().timestamp_millis() - trade.time;
-
-                                                            trade_latencies.push(latency);
-                                                        } else {
-                                                            eprintln!("Failed to deserialize trade: {:?}", trade);
-                                                        }
-                                                    });
-
-                                                } else if stream.topic == stream_2 {
-                                                    let update_id = stream.data["u"].as_i64().unwrap();
-
-                                                    if (stream.stream_type == "snapshot") || (update_id == 1) {
-                                                        let bids = stream.data["b"].as_array().unwrap();
-                                                        let asks = stream.data["a"].as_array().unwrap();
-
-                                                        let fetched_depth = Depth {
-                                                            last_update_id: update_id,
-                                                            time: stream.time,
-                                                            bids: bids.iter().map(|x| serde_json::from_value::<Order>(x.clone()).unwrap()).collect(),
-                                                            asks: asks.iter().map(|x| serde_json::from_value::<Order>(x.clone()).unwrap()).collect(),
-                                                        };
-
-                                                        orderbook.fetched(fetched_depth);
-
-                                                    } else if stream.stream_type == "delta" {
-                                                        let bids = stream.data["b"].as_array().unwrap();
-                                                        let asks = stream.data["a"].as_array().unwrap();
-
-                                                        let new_depth = Depth {
-                                                            last_update_id: update_id,
-                                                            time: stream.time,
-                                                            bids: bids.iter().map(|x| serde_json::from_value::<Order>(x.clone()).unwrap()).collect(),
-                                                            asks: asks.iter().map(|x| serde_json::from_value::<Order>(x.clone()).unwrap()).collect(),
-                                                        };
-
-                                                        let (local_bids, local_asks) = orderbook.update_levels(new_depth);
-
-                                                        let depth_latency = chrono::Utc::now().timestamp_millis() - stream.time;
-
-                                                        if !trade_latencies.is_empty() {
-                                                            let avg_trade_latency = trade_latencies.iter().sum::<i64>() / trade_latencies.len() as i64;
-        
-                                                            feed_latency = FeedLatency {
-                                                                time: stream.time,
-                                                                depth_latency,
-                                                                trade_latency: Some(avg_trade_latency),
-                                                            };
-        
-                                                            trade_latencies.clear();
-                                                        } else {
-                                                            feed_latency = FeedLatency {
-                                                                time: stream.time,
-                                                                depth_latency,
-                                                                trade_latency: None,
-                                                            };
-                                                        }
-
-                                                        let _ = output.send(Event::DepthReceived(feed_latency, stream.time, LocalDepthCache {
-                                                            time: stream.time,
-                                                            bids: local_bids,
-                                                            asks: local_asks,
-                                                        }, std::mem::take(&mut trades_buffer))).await;
-                                                    }
+                        match websocket.read_frame().await {
+                            Ok(msg) => match msg.opcode {
+                                OpCode::Text => {       
+                                    let json_bytes: Bytes = Bytes::from(msg.payload.to_vec());
+
+                                    if let Ok(data) = feed_de(&json_bytes, symbol_str) {
+                                        match data {
+                                            StreamData::Trade(de_trade_vec) => {
+                                                for de_trade in de_trade_vec.iter() {
+                                                    let trade = Trade {
+                                                        time: de_trade.time as i64,
+                                                        is_sell: if de_trade.is_sell == "Sell" { true } else { false },
+                                                        price: str_f32_parse(&de_trade.price),
+                                                        qty: str_f32_parse(&de_trade.qty),
+                                                    };
+
+                                                    trade_latencies.push(
+                                                        chrono::Utc::now().timestamp_millis() - trade.time
+                                                    );
+
+                                                    trades_buffer.push(trade);
+                                                }                                             
+                                            },
+                                            StreamData::Depth(de_depth, data_type, time) => {                                            
+                                                let depth_latency = chrono::Utc::now().timestamp_millis() - time;
+
+                                                let depth_update = Depth {
+                                                    last_update_id: de_depth.update_id as i64,
+                                                    time,
+                                                    bids: de_depth.bids.iter().map(|x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }).collect(),
+                                                    asks: de_depth.asks.iter().map(|x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }).collect(),
+                                                };
+
+                                                if (data_type == "snapshot") || (depth_update.last_update_id == 1) {
+                                                    orderbook.fetched(depth_update);
+
+                                                } else if data_type == "delta" {
+                                                    let (local_bids, local_asks) = orderbook.update_levels(depth_update);
+
+                                                    let local_depth_cache = LocalDepthCache {
+                                                        time,
+                                                        bids: local_bids,
+                                                        asks: local_asks,
+                                                    };
+
+                                                    let avg_trade_latency = if !trade_latencies.is_empty() {
+                                                        let avg = trade_latencies.iter().sum::<i64>() / trade_latencies.len() as i64;
+                                                        trade_latencies.clear();
+                                                        Some(avg)
+                                                    } else {
+                                                        None
+                                                    };
+                                                    feed_latency = FeedLatency {
+                                                        time,
+                                                        depth_latency,
+                                                        trade_latency: avg_trade_latency,
+                                                    };
+
+                                                    let _ = output.send(
+                                                        Event::DepthReceived(
+                                                            feed_latency,
+                                                            time, 
+                                                            local_depth_cache,
+                                                            std::mem::take(&mut trades_buffer)
+                                                        )
+                                                    ).await;
                                                 }
                                             },
-                                            Err(e) => println!("Failed to deserialize message: {}. Error: {}", message, e),
+                                            _ => {
+                                                println!("Unknown data: {:?}", &data);
+                                            }
                                         }
                                     }
-                                    Err(_) => {
-                                        let _ = output.send(Event::Disconnected).await;
-                                        state = State::Disconnected;
-                                    }
-                                    Ok(_) => continue,
-                                }
+                                },
+                                OpCode::Close => {
+                                    eprintln!("Connection closed");
+                                    let _ = output.send(Event::Disconnected).await;
+                                },
+                                _ => {}
+                            },
+                            Err(e) => {
+                                println!("Error reading frame: {}", e);
                             }
                         }
                     }
@@ -326,51 +543,7 @@ pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
         },
     )
 }
-
-#[derive(Deserialize)]
-struct Stream {
-    topic: String,
-    #[serde(rename = "type")]
-    stream_type: String,
-    #[serde(rename = "cts", default)]
-    time: i64,
-    data: Value,
-}
  
-#[derive(Deserialize, Debug, Clone, Copy)]
-pub struct Trade {
-    #[serde(rename = "T")]
-    pub time: i64,
-    #[serde(rename = "S", deserialize_with = "deserialize_is_sell")]
-    pub is_sell: bool,
-    #[serde(with = "string_to_f32", rename = "p")]
-    pub price: f32,
-    #[serde(with = "string_to_f32", rename = "v")]
-    pub qty: f32,
-}
-fn deserialize_is_sell<'de, D>(deserializer: D) -> Result<bool, D::Error>
-where
-    D: Deserializer<'de>,
-{
-    let s: String = Deserialize::deserialize(deserializer)?;
-    match s.as_str() {
-        "Sell" => Ok(true),
-        "Buy" => Ok(false),
-        _ => Err(serde::de::Error::custom("Unexpected value for is_sell")),
-    }
-}
-mod string_to_f32 {
-    use serde::{self, Deserialize, Deserializer};
-
-    pub fn deserialize<'de, D>(deserializer: D) -> Result<f32, D::Error>
-    where
-        D: Deserializer<'de>,
-    {
-        let s: String = Deserialize::deserialize(deserializer)?;
-        s.parse::<f32>().map_err(serde::de::Error::custom)
-    }
-}
-
 pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event> {
     struct Connect;
 
@@ -380,8 +553,10 @@ pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event
         move |mut output| async move {
             let mut state = State::Disconnected;    
 
+            let mut symbol_str: &str = "";
+
             let stream_str = vec.iter().map(|(ticker, timeframe)| {
-                let symbol_str = match ticker {
+                symbol_str = match ticker {
                     Ticker::BTCUSDT => "BTCUSDT",
                     Ticker::ETHUSDT => "ETHUSDT",
                     Ticker::SOLUSDT => "SOLUSDT",
@@ -400,10 +575,10 @@ pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event
             loop {
                 match &mut state {
                     State::Disconnected => {
-                        let websocket_server = format!("wss://stream.bybit.com/v5/public/linear");
+                        let domain = "stream.bybit.com";
                         
-                        if let Ok((mut websocket, _)) = async_tungstenite::tokio::connect_async(
-                            websocket_server,
+                        if let Ok(mut websocket) = connect(
+                            domain,
                         )
                         .await {
                             let subscribe_message = serde_json::json!({
@@ -411,7 +586,7 @@ pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event
                                 "args": stream_str 
                             }).to_string();
     
-                            if let Err(e) = websocket.send(tungstenite::Message::Text(subscribe_message)).await {
+                            if let Err(e) = websocket.write_frame(Frame::text(fastwebsockets::Payload::Borrowed(subscribe_message.as_bytes()))).await {
                                 eprintln!("Failed subscribing: {}", e);
 
                                 let _ = output.send(Event::Disconnected).await;
@@ -428,44 +603,37 @@ pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event
                         }
                     }
                     State::Connected(websocket) => {
-                        let mut fused_websocket = websocket.by_ref().fuse();
+                        match websocket.read_frame().await {
+                            Ok(msg) => match msg.opcode {
+                                OpCode::Text => {                    
+                                    let json_bytes: Bytes = Bytes::from(msg.payload.to_vec());
                     
-                        futures::select! {
-                            received = fused_websocket.select_next_some() => {
-                                match received {
-                                    Ok(tungstenite::Message::Text(message)) => {
-                                        match serde_json::from_str::<serde_json::Value>(&message) {
-                                            Ok(data) => {
-                                                if let Some(data_array) = data["data"].as_array() {
-                                                    for kline_obj in data_array {
-                                                        let kline = Kline {
-                                                            time: kline_obj["start"].as_u64().unwrap_or_default(),
-                                                            open: kline_obj["open"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
-                                                            high: kline_obj["high"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
-                                                            low: kline_obj["low"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
-                                                            close: kline_obj["close"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
-                                                            volume: kline_obj["volume"].as_str().unwrap_or_default().parse::<f32>().unwrap_or_default(),
-                                                        };
-
-                                                        let interval = kline_obj["interval"].as_str().unwrap_or_default();
-                     
-                                                        if let Some(timeframe) = string_to_timeframe(interval) {
-                                                            let _ = output.send(Event::KlineReceived(kline, timeframe)).await;
-                                                        } else {
-                                                            println!("Failed to find timeframe: {}, {:?}", interval, vec);
-                                                        }
-                                                    }
-                                                }
-                                            },
-                                            Err(_) => continue,
+                                    if let Ok(StreamData::Kline(de_kline_vec)) = feed_de(&json_bytes, symbol_str) {
+                                        for de_kline in de_kline_vec.iter() {
+                                            let kline = Kline {
+                                                time: de_kline.time,
+                                                open: str_f32_parse(&de_kline.open),
+                                                high: str_f32_parse(&de_kline.high),
+                                                low: str_f32_parse(&de_kline.low),
+                                                close: str_f32_parse(&de_kline.close),
+                                                volume: str_f32_parse(&de_kline.volume),
+                                            };
+
+                                            if let Some(timeframe) = string_to_timeframe(&de_kline.interval) {
+                                                let _ = output.send(Event::KlineReceived(kline, timeframe)).await;
+                                            } else {
+                                                eprintln!("Failed to find timeframe: {}, {:?}", &de_kline.interval, vec);
+                                            }
                                         }
-                                    },
-                                    Err(_) => {
-                                        let _ = output.send(Event::Disconnected).await;
-                                        state = State::Disconnected;
-                                    },
-                                    Ok(_) => continue,
+                                         
+                                    } else {
+                                        eprintln!("\nUnknown data: {:?}", &json_bytes);
+                                    }
                                 }
+                                _ => {}
+                            },
+                            Err(e) => {
+                                eprintln!("Error reading frame: {}", e);
                             }
                         }
                     }
@@ -475,20 +643,6 @@ pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event
     )
 }
 
-fn string_to_timeframe(interval: &str) -> Option<Timeframe> {
-    Timeframe::ALL.iter().find(|&tf| tf.to_string() == format!("{}m", interval)).copied()
-}
-
-#[derive(Deserialize, Debug, Clone, Copy)]
-pub struct Kline {
-    pub time: u64,
-    pub open: f32,
-    pub high: f32,
-    pub low: f32,
-    pub close: f32,
-    pub volume: f32,
-}
-
 #[derive(Deserialize, Debug)]
 struct ApiResponse {
     #[serde(rename = "retCode")]
@@ -505,14 +659,14 @@ struct ApiResult {
     list: Vec<Vec<Value>>,
 }
 
-pub async fn fetch_klines(ticker: Ticker, timeframe: Timeframe) -> Result<Vec<Kline>, reqwest::Error> {
-    let symbol_str = match ticker {
+pub async fn fetch_klines(ticker: Ticker, timeframe: Timeframe) -> Result<Vec<Kline>> {
+    let symbol_str: &str = match ticker {
         Ticker::BTCUSDT => "BTCUSDT",
         Ticker::ETHUSDT => "ETHUSDT",
         Ticker::SOLUSDT => "SOLUSDT",
         Ticker::LTCUSDT => "LTCUSDT",
     };
-    let timeframe_str = match timeframe {
+    let timeframe_str: &str = match timeframe {
         Timeframe::M1 => "1",
         Timeframe::M3 => "3",
         Timeframe::M5 => "5",
@@ -520,29 +674,49 @@ pub async fn fetch_klines(ticker: Ticker, timeframe: Timeframe) -> Result<Vec<Kl
         Timeframe::M30 => "30",
     };
 
-    let url = format!("https://api.bybit.com/v5/market/kline?category=linear&symbol={symbol_str}&interval={timeframe_str}&limit=250");
+    let url: String = format!("https://api.bybit.com/v5/market/kline?category=linear&symbol={symbol_str}&interval={timeframe_str}&limit=250");
 
-    let response: reqwest::Response = reqwest::get(&url).await?;
-    let text: String = response.text().await?;
+    let response: reqwest::Response = reqwest::get(&url).await
+        .context("Failed to send request")?;
+    let text: String = response.text().await
+        .context("Failed to read response text")?;
 
-    let api_response: ApiResponse = serde_json::from_str(&text).unwrap();
+    let api_response: ApiResponse = sonic_rs::from_str(&text)
+        .context("Failed to parse JSON")?;
     
-    let klines: Vec<Kline> = api_response.result.list.iter().map(|kline| {
-        Kline {
-            time: kline[0].as_str().unwrap().parse::<u64>().unwrap(),
-            open: kline[1].as_str().unwrap().parse::<f32>().unwrap(),
-            high: kline[2].as_str().unwrap().parse::<f32>().unwrap(),
-            low: kline[3].as_str().unwrap().parse::<f32>().unwrap(),
-            close: kline[4].as_str().unwrap().parse::<f32>().unwrap(),
-            volume: kline[5].as_str().unwrap().parse::<f32>().unwrap(),
-        }
+    let klines: Result<Vec<Kline>, anyhow::Error> = api_response.result.list.iter().map(|kline| {
+        let time = kline[0].as_str().ok_or_else(|| anyhow!("Missing time value"))
+            .and_then(|s| s.parse::<u64>()
+            .context("Failed to parse time as u64"));
+        let open = kline[1].as_str().ok_or_else(|| anyhow!("Missing open value"))
+            .and_then(|s| s.parse::<f32>()
+            .context("Failed to parse open as f32"));
+        let high = kline[2].as_str().ok_or_else(|| anyhow!("Missing high value"))
+            .and_then(|s| s.parse::<f32>()
+            .context("Failed to parse high as f32"));
+        let low = kline[3].as_str().ok_or_else(|| anyhow!("Missing low value"))
+            .and_then(|s| s.parse::<f32>()
+            .context("Failed to parse low as f32"));
+        let close = kline[4].as_str().ok_or_else(|| anyhow!("Missing close value"))
+            .and_then(|s| s.parse::<f32>()
+            .context("Failed to parse close as f32"));
+        let volume = kline[5].as_str().ok_or_else(|| anyhow!("Missing volume value"))
+            .and_then(|s| s.parse::<f32>()
+            .context("Failed to parse volume as f32"));
+    
+        Ok(Kline {
+            time: time?,
+            open: open?,
+            high: high?,
+            low: low?,
+            close: close?,
+            volume: volume?,
+        })
     }).collect();
 
-    Ok(klines)
+    Ok(klines?)
 }
 
-use anyhow::{Result, Context};
-
 pub async fn fetch_ticksize(ticker: Ticker) -> Result<f32> {
     let symbol_str = match ticker {
         Ticker::BTCUSDT => "BTCUSDT",
@@ -553,23 +727,27 @@ pub async fn fetch_ticksize(ticker: Ticker) -> Result<f32> {
 
     let url = format!("https://api.bybit.com/v5/market/instruments-info?category=linear&symbol={}", symbol_str);
 
-    let response: reqwest::Response = reqwest::get(&url).await.context("Failed to send request")?;
-    let text: String = response.text().await.context("Failed to read response text")?;
-    let exchange_info: Value = serde_json::from_str(&text).context("Failed to parse JSON")?;
+    let response: reqwest::Response = reqwest::get(&url).await
+        .context("Failed to send request")?;
+    let text: String = response.text().await
+        .context("Failed to read response text")?;
+
+    let exchange_info: Value = sonic_rs::from_str(&text)
+        .context("Failed to parse JSON")?;
 
-    let result_list: &Vec<Value> = exchange_info["result"]["list"].as_array().context("Result list is not an array")?;
+    let result_list: &Vec<Value> = exchange_info["result"]["list"].as_array()
+        .context("Result list is not an array")?;
 
     for item in result_list {
         if item["symbol"] == symbol_str {
-            if let Some(price_filter) = item["priceFilter"].as_object() {
-                if let Some(tick_size_str) = price_filter.get("tickSize") {
-                    if let Ok(tick_size) = tick_size_str.as_str().unwrap().parse::<f32>() {
+            let price_filter: &serde_json::Map<String, Value> = item["priceFilter"].as_object()
+                .context("Price filter not found")?;
 
-                        println!("Tick size for {} is {}", symbol_str, tick_size);
-                        return Ok(tick_size);
-                    }
-                }
-            }
+            let tick_size_str: &str = price_filter.get("tickSize").context("Tick size not found")?.as_str()
+                .context("Tick size is not a string")?;
+
+            return Ok(tick_size_str.parse::<f32>()
+                .context("Failed to parse tick size")?);
         }
     }
 

+ 66 - 13
src/main.rs

@@ -7,9 +7,8 @@ mod charts;
 use charts::custom_line::{self, CustomLine};
 use charts::heatmap::{self, Heatmap};
 use charts::footprint::{self, Footprint};
-use iced::event;
 
-use std::collections::VecDeque;
+use std::collections::{VecDeque, HashMap};
 use std::vec;
 use chrono::{NaiveDateTime, DateTime, Utc};
 use iced::{
@@ -180,11 +179,11 @@ struct PaneSpec {
 }
 
 impl PaneSpec {
-    fn new(id: PaneId) -> Self {
+    fn new(id: PaneId, from_cache: (Option<Ticker>, Option<Timeframe>, Option<f32>)) -> Self {
         Self { 
             id,
             show_modal: false,
-            stream: (None, None, None),
+            stream: from_cache,
         }
     }
 }
@@ -219,6 +218,8 @@ pub enum MarketEvents {
 pub enum Message {
     Debug(String),
 
+    RestartStream(Option<pane_grid::Pane>, (Option<Ticker>, Option<Timeframe>, Option<f32>)),
+
     CustomLine(custom_line::Message),
     Candlestick(custom_line::Message),
     Heatmap(heatmap::Message),
@@ -301,6 +302,10 @@ struct State {
     exchange_latency: Option<(u32, u32)>,
 
     feed_latency_cache: VecDeque<FeedLatency>,
+    
+    pane_state_cache: HashMap<PaneId, (Option<Ticker>, Option<Timeframe>, Option<f32>)>,
+
+    last_axis_split: Option<pane_grid::Axis>,
 }
 
 impl State {
@@ -390,6 +395,10 @@ impl State {
             exchange_latency: None,
 
             feed_latency_cache: VecDeque::new(),
+
+            pane_state_cache: HashMap::new(),
+
+            last_axis_split: None,
         }
     }
 
@@ -541,6 +550,25 @@ impl State {
                 self.selected_exchange = Some(exchange);
                 Task::none()
             },
+            Message::RestartStream(pane, cached_state) => {
+                if let Some(pane) = pane {
+                    if let Some(timeframe) = cached_state.1 {
+                        Task::perform(
+                            async {
+                            },
+                            move |()| Message::TimeframeSelected(timeframe, pane)
+                        )
+                    } else {
+                        Task::perform(
+                            async {
+                            },
+                            move |()| Message::ErrorOccurred(format!("No timeframe found in pane state to stream"))
+                        )
+                    }
+                } else {
+                    Task::none()
+                }
+            }
             Message::WsToggle => {
                 self.ws_running = !self.ws_running;
 
@@ -922,19 +950,29 @@ impl State {
 
             // Pane grid
             Message::Split(axis, pane, pane_id) => {
-                let focus_pane = if let Some((pane, _)) = self.panes.split(axis, pane, PaneSpec::new(pane_id)) {
-                    Some(pane)
+                let cached_pane_state: (Option<Ticker>, Option<Timeframe>, Option<f32>) = *self.pane_state_cache.get(&pane_id).unwrap_or(&(None, None, None));
+
+                let new_pane = None;
+
+                let focus_pane = if let Some((new_pane, _)) = self.panes.split(axis, pane, PaneSpec::new(pane_id, cached_pane_state)) {
+                    Some(new_pane)
                 } else if let Some((&first_pane, _)) = self.panes.panes.iter().next() {
-                    self.panes.split(axis, first_pane, PaneSpec::new(pane_id)).map(|(pane, _)| pane)
+                    self.panes.split(axis, first_pane, PaneSpec::new(pane_id, cached_pane_state)).map(|(new_pane, _)| new_pane)
                 } else {
                     None
                 };
 
                 if Some(focus_pane).is_some() {
                     self.focus = focus_pane;
-                } 
+                }
 
-                Task::none()
+                self.last_axis_split = Some(axis);
+
+                Task::perform(
+                    async {
+                    },
+                    move |()| Message::RestartStream(new_pane, cached_pane_state)
+                )
             },
             Message::Clicked(pane) => {
                 self.focus = Some(pane);
@@ -962,14 +1000,17 @@ impl State {
                 self.panes.restore();
                 Task::none()
             },
-            Message::Close(pane) => {                
+            Message::Close(pane) => {       
+                let pane_state = self.panes.get(pane).unwrap();
+                
+                self.pane_state_cache.insert(pane_state.id, (pane_state.stream.0, pane_state.stream.1, pane_state.stream.2));
+
                 if let Some((_, sibling)) = self.panes.close(pane) {
                     self.focus = Some(sibling);
                 }
                 Task::none()
             },
             Message::ToggleLayoutLock => {
-                self.focus = None;
                 self.pane_lock = !self.pane_lock;
                 Task::none()
             },
@@ -1252,13 +1293,25 @@ impl State {
                 (PaneId::TimeAndSales, "Time & Sales"),
             ];
 
+            let pane_to_split = self.focus.unwrap_or_else(|| { dbg!("No focused pane found"); self.first_pane });
+
+            let mut axis_to_split = if rand::random() { pane_grid::Axis::Horizontal } else { pane_grid::Axis::Vertical };
+
+            if let Some(axis) = self.last_axis_split {
+                if axis == pane_grid::Axis::Horizontal {
+                    axis_to_split = pane_grid::Axis::Vertical;
+                } else {
+                    axis_to_split = pane_grid::Axis::Horizontal;
+                }
+            } 
+
             for (pane_id, label) in pane_info {
                 let button = button(label).width(iced::Pixels(200.0));
 
-                if self.panes.iter().any(|(_p, ps)| ps.id == pane_id) {
+                if self.panes.iter().any(|(_, ps)| ps.id == pane_id) {
                     buttons = buttons.push(button);
                 } else {
-                    let message = Message::Split(pane_grid::Axis::Vertical, self.first_pane, pane_id);
+                    let message = Message::Split(axis_to_split, pane_to_split, pane_id);
                     buttons = buttons.push(button.on_press(message));
                 }
             }