Ver código fonte

数据应该是对接好了,下一步是研究怎么显示到图表上面

skyffire 9 meses atrás
pai
commit
20f40bbaa2
4 arquivos alterados com 247 adições e 234 exclusões
  1. 77 36
      Cargo.lock
  2. 3 1
      Cargo.toml
  3. 150 196
      src/data_providers/china_futures.rs
  4. 17 1
      src/main.rs

+ 77 - 36
Cargo.lock

@@ -20,9 +20,9 @@ checksum = "c71b1793ee61086797f5c80b6efa2b8ffa6d5dd703f118545808a7f2e27f7046"
 
 [[package]]
 name = "addr2line"
-version = "0.22.0"
+version = "0.24.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678"
+checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1"
 dependencies = [
  "gimli",
 ]
@@ -357,17 +357,17 @@ checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
 
 [[package]]
 name = "backtrace"
-version = "0.3.73"
+version = "0.3.74"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a"
+checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a"
 dependencies = [
  "addr2line",
- "cc",
  "cfg-if",
  "libc",
- "miniz_oxide 0.7.4",
+ "miniz_oxide 0.8.0",
  "object",
  "rustc-demangle",
+ "windows-targets 0.52.6",
 ]
 
 [[package]]
@@ -547,6 +547,16 @@ version = "1.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
 
+[[package]]
+name = "bytes"
+version = "0.4.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c"
+dependencies = [
+ "byteorder",
+ "iovec",
+]
+
 [[package]]
 name = "bytes"
 version = "1.9.0"
@@ -745,7 +755,7 @@ version = "4.6.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd"
 dependencies = [
- "bytes",
+ "bytes 1.9.0",
  "memchr",
 ]
 
@@ -1262,7 +1272,7 @@ version = "0.2.19"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f375fcf41ec4dac873a8028fba4210dbda5c86bba13d2d741e651b474f7c05a4"
 dependencies = [
- "bytes",
+ "bytes 1.9.0",
  "serde",
  "simdutf8",
 ]
@@ -1274,7 +1284,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "26da0c7b5cef45c521a6f9cdfffdfeb6c9f5804fbac332deb5ae254634c7a6be"
 dependencies = [
  "base64 0.21.7",
- "bytes",
+ "bytes 1.9.0",
  "http-body-util",
  "hyper",
  "hyper-util",
@@ -1326,13 +1336,14 @@ name = "flowsurface"
 version = "0.5.0"
 dependencies = [
  "async-tungstenite",
+ "backtrace",
  "base64 0.22.1",
- "bytes",
+ "bytes 1.9.0",
  "chrono",
  "csv",
  "fastwebsockets",
  "fern",
- "futures",
+ "futures 0.3.31",
  "futures-util",
  "hex",
  "hmac",
@@ -1357,6 +1368,7 @@ dependencies = [
  "tokio-native-tls",
  "tokio-rustls 0.24.1",
  "tokio-tungstenite",
+ "tokio-util",
  "tracing",
  "tracing-appender-timezone",
  "tracing-subscriber",
@@ -1462,6 +1474,12 @@ version = "2.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c"
 
+[[package]]
+name = "futures"
+version = "0.1.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678"
+
 [[package]]
 name = "futures"
 version = "0.3.31"
@@ -1553,6 +1571,7 @@ version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
 dependencies = [
+ "futures 0.1.31",
  "futures-channel",
  "futures-core",
  "futures-io",
@@ -1563,6 +1582,7 @@ dependencies = [
  "pin-project-lite",
  "pin-utils",
  "slab",
+ "tokio-io",
 ]
 
 [[package]]
@@ -1608,9 +1628,9 @@ dependencies = [
 
 [[package]]
 name = "gimli"
-version = "0.29.0"
+version = "0.31.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd"
+checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
 
 [[package]]
 name = "gl_generator"
@@ -1730,7 +1750,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab"
 dependencies = [
  "atomic-waker",
- "bytes",
+ "bytes 1.9.0",
  "fnv",
  "futures-core",
  "futures-sink",
@@ -1816,7 +1836,7 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
 dependencies = [
- "bytes",
+ "bytes 1.9.0",
  "fnv",
  "itoa",
 ]
@@ -1827,7 +1847,7 @@ version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
 dependencies = [
- "bytes",
+ "bytes 1.9.0",
  "http",
 ]
 
@@ -1837,7 +1857,7 @@ version = "0.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
 dependencies = [
- "bytes",
+ "bytes 1.9.0",
  "futures-util",
  "http",
  "http-body",
@@ -1862,7 +1882,7 @@ version = "1.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05"
 dependencies = [
- "bytes",
+ "bytes 1.9.0",
  "futures-channel",
  "futures-util",
  "h2",
@@ -1900,7 +1920,7 @@ version = "0.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0"
 dependencies = [
- "bytes",
+ "bytes 1.9.0",
  "http-body-util",
  "hyper",
  "hyper-util",
@@ -1916,7 +1936,7 @@ version = "0.1.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4"
 dependencies = [
- "bytes",
+ "bytes 1.9.0",
  "futures-channel",
  "futures-util",
  "http",
@@ -1973,7 +1993,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0013a238275494641bf8f1732a23a808196540dc67b22ff97099c044ae4c8a1c"
 dependencies = [
  "bitflags 2.6.0",
- "bytes",
+ "bytes 1.9.0",
  "glam",
  "log",
  "num-traits",
@@ -1991,7 +2011,7 @@ version = "0.14.0-dev"
 source = "git+https://github.com/iced-rs/iced?rev=e722c4ee4f80833ba0b1013cadd546ebc3f490ce#e722c4ee4f80833ba0b1013cadd546ebc3f490ce"
 dependencies = [
  "bitflags 2.6.0",
- "bytes",
+ "bytes 1.9.0",
  "dark-light",
  "glam",
  "log",
@@ -2009,7 +2029,7 @@ version = "0.13.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0c04a6745ba2e80f32cf01e034fd00d853aa4f4cd8b91888099cb7aaee0d5d7c"
 dependencies = [
- "futures",
+ "futures 0.3.31",
  "iced_core 0.13.2",
  "log",
  "rustc-hash 2.0.0",
@@ -2022,7 +2042,7 @@ name = "iced_futures"
 version = "0.14.0-dev"
 source = "git+https://github.com/iced-rs/iced?rev=e722c4ee4f80833ba0b1013cadd546ebc3f490ce#e722c4ee4f80833ba0b1013cadd546ebc3f490ce"
 dependencies = [
- "futures",
+ "futures 0.3.31",
  "iced_core 0.14.0-dev",
  "log",
  "rustc-hash 2.0.0",
@@ -2069,7 +2089,7 @@ name = "iced_runtime"
 version = "0.14.0-dev"
 source = "git+https://github.com/iced-rs/iced?rev=e722c4ee4f80833ba0b1013cadd546ebc3f490ce#e722c4ee4f80833ba0b1013cadd546ebc3f490ce"
 dependencies = [
- "bytes",
+ "bytes 1.9.0",
  "iced_core 0.14.0-dev",
  "iced_futures 0.14.0-dev",
  "raw-window-handle",
@@ -2098,7 +2118,7 @@ source = "git+https://github.com/iced-rs/iced?rev=e722c4ee4f80833ba0b1013cadd546
 dependencies = [
  "bitflags 2.6.0",
  "bytemuck",
- "futures",
+ "futures 0.3.31",
  "glam",
  "glyphon",
  "guillotiere",
@@ -2328,6 +2348,15 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "iovec"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "ipnet"
 version = "2.9.0"
@@ -2449,9 +2478,9 @@ checksum = "03087c2bad5e1034e8cace5926dec053fb3790248370865f5117a7d0213354c8"
 
 [[package]]
 name = "libc"
-version = "0.2.155"
+version = "0.2.169"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
+checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a"
 
 [[package]]
 name = "libloading"
@@ -3761,7 +3790,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f"
 dependencies = [
  "base64 0.22.1",
- "bytes",
+ "bytes 1.9.0",
  "encoding_rs",
  "futures-core",
  "futures-util",
@@ -3835,7 +3864,7 @@ checksum = "9008cd6385b9e161d8229e1f6549dd23c3d022f132a2ea37ac3a10ac4935779b"
 dependencies = [
  "bitvec",
  "bytecheck",
- "bytes",
+ "bytes 1.9.0",
  "hashbrown 0.12.3",
  "ptr_meta",
  "rend",
@@ -3880,7 +3909,7 @@ checksum = "b082d80e3e3cc52b2ed634388d436fe1f4de6af5786cc2de9ba9737527bdf555"
 dependencies = [
  "arrayvec",
  "borsh",
- "bytes",
+ "bytes 1.9.0",
  "num-traits",
  "rand",
  "rkyv",
@@ -4392,7 +4421,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0275f9f2f07d47556fe60c2759da8bc4be6083b047b491b2d476aa0bfa558eb1"
 dependencies = [
  "bumpalo",
- "bytes",
+ "bytes 1.9.0",
  "cfg-if",
  "faststr",
  "itoa",
@@ -4740,7 +4769,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551"
 dependencies = [
  "backtrace",
- "bytes",
+ "bytes 1.9.0",
  "libc",
  "mio",
  "parking_lot 0.12.3",
@@ -4751,6 +4780,17 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "tokio-io"
+version = "0.1.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674"
+dependencies = [
+ "bytes 0.4.12",
+ "futures 0.1.31",
+ "log",
+]
+
 [[package]]
 name = "tokio-macros"
 version = "2.4.0"
@@ -4822,8 +4862,9 @@ version = "0.7.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1"
 dependencies = [
- "bytes",
+ "bytes 1.9.0",
  "futures-core",
+ "futures-io",
  "futures-sink",
  "pin-project-lite",
  "tokio",
@@ -4956,7 +4997,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1"
 dependencies = [
  "byteorder",
- "bytes",
+ "bytes 1.9.0",
  "data-encoding",
  "http",
  "httparse",
@@ -5213,7 +5254,7 @@ version = "0.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "be0ecb0db480561e9a7642b5d3e4187c128914e58aa84330b9493e3eb68c5e7f"
 dependencies = [
- "futures",
+ "futures 0.3.31",
  "js-sys",
  "parking_lot 0.11.2",
  "pin-utils",

+ 3 - 1
Cargo.toml

@@ -15,7 +15,7 @@ base64 = "0.22.1"
 native-tls = "0.2.12"
 tungstenite = "0.21.0"
 futures = "0.3.31"
-futures-util = "0.3.31"
+futures-util = { version = "0.3.31", features = ["io-compat"] }
 serde_json = "1.0.132"
 serde = { version = "1.0", features = ["derive"] }
 reqwest = { version = "0.12.9", features = ["json"] }
@@ -47,6 +47,8 @@ rust_decimal = "1.36.0"
 uuid = { version = "1.11.0", features = ["v4"] }
 zip = "2.2.1"
 csv = "1.3.1"
+tokio-util = { version = "0.7", features = ["compat"] }
+backtrace = "0.3.74"
 
 [dependencies.async-tungstenite]
 version = "0.25"

+ 150 - 196
src/data_providers/china_futures.rs

@@ -1,8 +1,9 @@
-use tracing::warn;
-use crate::data_providers::deserialize_string_to_f32;
+use tracing::{info, warn};
+use crate::data_providers::{deserialize_string_to_f32, SpawnExecutor};
 use crate::data_providers::deserialize_string_to_i64;
 use std::collections::HashMap;
-
+use std::vec;
+use bytes::Bytes;
 use iced::{
     stream, 
     futures::{sink::SinkExt, Stream},
@@ -12,15 +13,17 @@ use regex::Regex;
 use serde_json::json;
 use serde_json::Value;
 
-use sonic_rs::{JsonValueTrait, Deserialize, Serialize};
+use sonic_rs::{Deserialize, JsonValueTrait, Serialize};
 use sonic_rs::to_object_iter_unchecked;
 
 use fastwebsockets::{Frame, FragmentCollector, OpCode};
+use http_body_util::Empty;
+use hyper::header::{CONNECTION, UPGRADE};
+use hyper::Request;
 use hyper::upgrade::Upgraded;
 use hyper_util::rt::TokioIo;
-
+use tokio::net::TcpStream;
 use crate::data_providers::{
-    setup_tcp_connection, setup_tls_connection, setup_websocket_connection, 
     Connection, Event, Kline, LocalDepthCache, MarketType, Order, State, 
     StreamError, TickerInfo, TickerStats, Trade, VecLocalDepthCache,
 };
@@ -33,11 +36,11 @@ use tracing::{error};
 
 #[derive(Serialize, Deserialize, Debug)]
 struct SonicDepth {
-    #[serde(rename = "u")]
+    #[serde(rename = "time")]
     pub update_id: u64,
-    #[serde(rename = "b")]
+    #[serde(rename = "bids")]
     pub bids: Vec<BidAsk>,
-    #[serde(rename = "a")]
+    #[serde(rename = "asks")]
     pub asks: Vec<BidAsk>,
 }
 
@@ -51,181 +54,128 @@ struct BidAsk {
 
 #[derive(Serialize, Deserialize, Debug)]
 struct SonicTrade {
-    #[serde(rename = "T")]
+    #[serde(rename = "time")]
     pub time: u64,
-    #[serde(rename = "p")]
+    #[serde(rename = "last_price")]
     pub price: String,
-    #[serde(rename = "v")]
+    #[serde(rename = "last_qty")]
     pub qty: String,
-    #[serde(rename = "S")]
-    pub is_sell: String,
+    #[serde(rename = "side")]
+    pub side: String,
 }
 
-// #[derive(Deserialize, Debug, Clone)]
-// pub struct SonicKline {
-//     #[serde(rename = "start")]
-//     pub time: u64,
-//     #[serde(rename = "open")]
-//     pub open: String,
-//     #[serde(rename = "high")]
-//     pub high: String,
-//     #[serde(rename = "low")]
-//     pub low: String,
-//     #[serde(rename = "close")]
-//     pub close: String,
-//     #[serde(rename = "volume")]
-//     pub volume: String,
-//     #[serde(rename = "interval")]
-//     pub interval: String,
-// }
-
 #[derive(Debug)]
 enum StreamData {
     Trade(Vec<SonicTrade>),
-    Depth(SonicDepth, String, i64),
+    Depth(SonicDepth, i64),
     // Kline(Ticker, Vec<SonicKline>),
 }
 
-#[derive(Debug)]
-enum StreamName {
-    Depth(Ticker),
-    Trade(Ticker),
-    Kline(Ticker),
-    Unknown,
-}
-impl StreamName {
-    fn from_topic(topic: &str, is_ticker: Option<Ticker>, market_type: MarketType) -> Self {
-        let parts: Vec<&str> = topic.split('.').collect();
-
-        if let Some(ticker_str) = parts.last() {
-            let ticker = is_ticker.unwrap_or_else(|| Ticker::new(ticker_str, market_type));
-
-            match parts.first() {
-                Some(&"publicTrade") => StreamName::Trade(ticker),
-                Some(&"orderbook") => StreamName::Depth(ticker),
-                Some(&"kline") => StreamName::Kline(ticker),
-                _ => StreamName::Unknown,
-            }
-        } else {
-            StreamName::Unknown
-        }
-    }
-}
-
-#[derive(Debug)]
-enum StreamWrapper {
-    Trade,
-    Depth,
-    Kline,
-}
-
 #[allow(unused_assignments)]
 fn feed_de(
-    slice: &[u8], 
-    ticker: Option<Ticker>, 
-    market_type: MarketType
-) -> Result<StreamData, StreamError> {
-    let mut stream_type: Option<StreamWrapper> = None;
-    let mut depth_wrap: Option<SonicDepth> = None;
-
-    let mut data_type = String::new();
-    // let mut topic_ticker = Ticker::default();
+    slice: &[u8]
+) -> Result<Vec<StreamData>, StreamError> {
+    // // 这里应该是做缓存,之前的bybit数据推送是增量的
+    // let mut depth_wrap: Option<SonicDepth> = None;
 
     let iter: sonic_rs::ObjectJsonIter = unsafe { to_object_iter_unchecked(slice) };
+    let mut trade = SonicTrade {
+        time: 0,
+        price: "".to_string(),
+        qty: "".to_string(),
+        side: "".to_string(),
+    };
+    let mut depth = SonicDepth {
+        update_id: 0,
+        bids: vec![],
+        asks: vec![],
+    };
 
     for elem in iter {
         let (k, v) = elem.map_err(|e| StreamError::ParseError(e.to_string()))?;
-        
-        if k == "topic" {
-            if let Some(val) = v.as_str() {
-                let mut is_ticker = None;
-
-                if let Some(ticker) = ticker {
-                    is_ticker = Some(ticker);
-                }
+        // let v = &v.as_raw_faststr();
+        match k.as_str() {
+            "time" => {
+                let t: u64 = v.as_u64().unwrap();
 
-                match StreamName::from_topic(val, is_ticker, market_type) {
-                    StreamName::Depth(_ticker) => {
-                        stream_type = Some(StreamWrapper::Depth);
-
-                        // topic_ticker = ticker;
-                    }
-                    StreamName::Trade(_ticker) => {
-                        stream_type = Some(StreamWrapper::Trade);
+                trade.time = t;
+                depth.update_id = t;
+            }
+            "last_price" => {
+                trade.price = v.to_string();
+            }
+            "last_qty" => {
+                trade.qty = v.to_string();
+            }
+            "side" => {
+                trade.side = v.to_string();
+            }
+            "asks" => {
+                let asks = serde_json::from_slice::<Value>(v.as_raw_str().as_bytes())
+                    .unwrap();
 
-                        // topic_ticker = ticker;
-                    }
-                    StreamName::Kline(_ticker) => {
-                        stream_type = Some(StreamWrapper::Kline);
+                for ask in asks.as_array().unwrap() {
+                    let order_book = BidAsk {
+                        price: ask.as_array().unwrap()[0].to_string(),
+                        qty: ask.as_array().unwrap()[1].to_string(),
+                    };
 
-                        // topic_ticker = ticker;
-                    }
-                    _ => {
-                        error!("Unknown stream name");
-                    }
+                    depth.asks.push(order_book);
                 }
             }
-        } else if k == "type" {
-            v.as_str().unwrap().clone_into(&mut data_type);
-        } else if k == "data" {
-            match stream_type {
-                Some(StreamWrapper::Trade) => {
-                    let trade_wrap: Vec<SonicTrade> = sonic_rs::from_str(&v.as_raw_faststr())
-                        .map_err(|e| StreamError::ParseError(e.to_string()))?;
-
-                    return Ok(StreamData::Trade(trade_wrap));
-                }
-                Some(StreamWrapper::Depth) => {
-                    if depth_wrap.is_none() {
-                        depth_wrap = Some(SonicDepth {
-                            update_id: 0,
-                            bids: Vec::new(),
-                            asks: Vec::new(),
-                        });
-                    }
-                    depth_wrap = Some(
-                        sonic_rs::from_str(&v.as_raw_faststr())
-                            .map_err(|e| StreamError::ParseError(e.to_string()))?,
-                    );
-                }
-                // Some(StreamWrapper::Kline) => {
-                //     let kline_wrap: Vec<SonicKline> = sonic_rs::from_str(&v.as_raw_faststr())
-                //         .map_err(|e| StreamError::ParseError(e.to_string()))?;
-                //
-                //     return Ok(StreamData::Kline(topic_ticker, kline_wrap));
-                // }
-                _ => {
-                    error!("Unknown stream type");
-                }
-            }
-        } else if k == "cts" {
-            if let Some(dw) = depth_wrap {
-                let time: u64 = v
-                    .as_u64()
-                    .ok_or_else(|| StreamError::ParseError("Failed to parse u64".to_string()))?;
+            "bids" => {
+                let bids = serde_json::from_slice::<Value>(v.as_raw_str().as_bytes())
+                    .unwrap();
+
+                for bid in bids.as_array().unwrap() {
+                    let order_book = BidAsk {
+                        price: bid.as_array().unwrap()[0].to_string(),
+                        qty: bid.as_array().unwrap()[1].to_string(),
+                    };
 
-                return Ok(StreamData::Depth(dw, data_type.to_string(), time as i64));
+                    depth.bids.push(order_book);
+                }
             }
+            &_ => {}
         }
     }
 
-    Err(StreamError::UnknownError("Unknown data".to_string()))
+    let update_id = depth.update_id;
+    Ok(vec![StreamData::Trade(vec![trade]), StreamData::Depth(depth, update_id as i64)])
 }
 
-async fn connect(
-    domain: &str,
-) -> Result<FragmentCollector<TokioIo<Upgraded>>, StreamError> {
-    let tcp_stream = setup_tcp_connection(domain).await?;
-    let tls_stream = setup_tls_connection(domain, tcp_stream).await?;
-    let url = "ws://127.0.0.1:6789";
-    setup_websocket_connection(domain, tls_stream, &url).await
+async fn connect() -> Result<FragmentCollector<TokioIo<Upgraded>>, StreamError> {
+    let url = "ws://localhost:6789";
+    let addr = "localhost:6789";
+    let stream = TcpStream::connect(&addr).await
+        .map_err(|e| StreamError::WebsocketError(e.to_string()))?;
+
+    // 2. 构建 WebSocket 握手请求
+    let req = Request::builder()
+        .method("GET")
+        .uri(url)
+        .header("Host", "localhost")
+        .header(UPGRADE, "websocket")
+        .header(CONNECTION, "upgrade")
+        .header("Sec-WebSocket-Key", fastwebsockets::handshake::generate_key())
+        .header("Sec-WebSocket-Version", "13")
+        .header("Sec-WebSocket-Protocol", "rust-websocket") // 可选协议
+        .header("User-Agent", "rust-client/1.0") // 添加 UA 头
+        .body(Empty::<Bytes>::new())
+        .map_err(|e| StreamError::WebsocketError(e.to_string()))?;
+
+    let (ws, _) = fastwebsockets::handshake::client(&SpawnExecutor, req, stream)
+        .await
+        .map_err(|e| StreamError::WebsocketError(e.to_string()))?;
+
+    Ok(FragmentCollector::new(ws))
 }
 
 async fn try_connect(
     streams: &Value,
     output: &mut futures::channel::mpsc::Sender<Event>,
 ) -> State {
-    match connect("127.0.0.1:6789").await {
+    match connect().await {
         Ok(mut websocket) => {
             if let Err(e) = websocket
                 .write_frame(Frame::text(fastwebsockets::Payload::Borrowed(
@@ -288,61 +238,65 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                 State::Connected(websocket) => match websocket.read_frame().await {
                     Ok(msg) => match msg.opcode {
                         OpCode::Text => {
-                            if let Ok(data) = feed_de(&msg.payload[..], Some(ticker), market_type) {
-                                match data {
-                                    StreamData::Trade(de_trade_vec) => {
-                                        for de_trade in &de_trade_vec {
-                                            let trade = Trade {
-                                                time: de_trade.time as i64,
-                                                is_sell: de_trade.is_sell == "Sell",
-                                                price: str_f32_parse(&de_trade.price),
-                                                qty: str_f32_parse(&de_trade.qty),
-                                            };
-
-                                            trades_buffer.push(trade);
+                            let result = feed_de(&msg.payload[..]);
+                            match result {
+                                Ok(data_vec) => {
+                                    let trade_handle_rst = &data_vec[0];
+                                    let depth_handle_rst = &data_vec[1];
+
+                                    match trade_handle_rst {
+                                        StreamData::Trade(de_trade_vec) => {
+                                            for de_trade in de_trade_vec {
+                                                let trade = Trade {
+                                                    time: de_trade.time as i64,
+                                                    is_sell: de_trade.side == "sell",
+                                                    price: str_f32_parse(&de_trade.price),
+                                                    qty: str_f32_parse(&de_trade.qty),
+                                                };
+
+                                                trades_buffer.push(trade);
+                                            }
                                         }
+                                        StreamData::Depth(_, _) => {}
                                     }
-                                    StreamData::Depth(de_depth, data_type, time) => {
-                                        let depth_update = VecLocalDepthCache {
-                                            last_update_id: de_depth.update_id as i64,
-                                            time,
-                                            bids: de_depth
-                                                .bids
-                                                .iter()
-                                                .map(|x| Order {
-                                                    price: str_f32_parse(&x.price),
-                                                    qty: str_f32_parse(&x.qty),
-                                                })
-                                                .collect(),
-                                            asks: de_depth
-                                                .asks
-                                                .iter()
-                                                .map(|x| Order {
-                                                    price: str_f32_parse(&x.price),
-                                                    qty: str_f32_parse(&x.qty),
-                                                })
-                                                .collect(),
-                                        };
-
-                                        if (data_type == "snapshot")
-                                            || (depth_update.last_update_id == 1)
-                                        {
+
+                                    match depth_handle_rst {
+                                        StreamData::Depth(de_depth, time) => {
+                                            let t = time.clone();
+
+                                            let depth_update = VecLocalDepthCache {
+                                                last_update_id: de_depth.update_id as i64,
+                                                time: t,
+                                                bids: de_depth
+                                                    .bids
+                                                    .iter()
+                                                    .map(|x| Order {
+                                                        price: str_f32_parse(&x.price),
+                                                        qty: str_f32_parse(&x.qty),
+                                                    })
+                                                    .collect(),
+                                                asks: de_depth
+                                                    .asks
+                                                    .iter()
+                                                    .map(|x| Order {
+                                                        price: str_f32_parse(&x.price),
+                                                        qty: str_f32_parse(&x.qty),
+                                                    })
+                                                    .collect(),
+                                            };
+
                                             orderbook.fetched(&depth_update);
-                                        } else if data_type == "delta" {
-                                            orderbook.update_depth_cache(&depth_update);
-
-                                            let _ = output
-                                                .send(Event::DepthReceived(
-                                                    ticker,
-                                                    time,
-                                                    orderbook.get_depth(),
-                                                    std::mem::take(&mut trades_buffer).into_boxed_slice(),
-                                                ))
-                                                .await;
                                         }
+
+                                        StreamData::Trade(_) => {}
                                     }
                                 }
+                                Err(e) => {
+                                    // 处理错误
+                                    error!("处理数据失败: {}", e);
+                                }
                             }
+
                         }
                         OpCode::Close => {
                             state = State::Disconnected;

+ 17 - 1
src/main.rs

@@ -31,11 +31,27 @@ use iced::{
 use iced_futures::MaybeSend;
 use futures::{StreamExt, TryFutureExt};
 use std::{collections::HashMap, vec, future::Future};
-use tracing::{error, info};
+use backtrace::Backtrace;
+use tracing::{error, info, warn};
 
 fn main() {
     let _guard = logger::final_init("info");
 
+    std::panic::set_hook(Box::new(move |panic_info| {
+        let msg = format!(
+            "type=panic, msg={:?}, location={:?}",
+            panic_info.to_string(),
+            panic_info.location()
+        );
+
+        // 生成并格式化完整的堆栈跟踪
+        let backtrace = Backtrace::new();
+        let stack_trace = format!("{:?}", backtrace);
+
+        // 一并打印堆栈跟踪
+        warn!("{}\nStack Trace:\n{}", msg, stack_trace);
+    }));
+
     info!("----------------------------面板已启动----------------------");
 
     std::thread::spawn(|| {