소스 검색

架构整理。

skyfffire 3 주 전
부모
커밋
f99160ff3a
9개의 변경된 파일83개의 추가작업 그리고 191개의 파일을 삭제
  1. 0 3
      Cargo.toml
  2. 6 0
      src/exchange/extended_account.rs
  3. 60 143
      src/exchange/extended_stream_client.rs
  4. 2 4
      src/exchange/mod.rs
  5. 0 0
      src/utils/error.rs
  6. 4 2
      src/utils/mod.rs
  7. 0 0
      src/utils/proxy.rs
  8. 1 1
      src/utils/response.rs
  9. 10 38
      src/utils/stream_utils.rs

+ 0 - 3
Cargo.toml

@@ -19,9 +19,6 @@ base64 = "0.13"
 
 futures-channel = "0.3.28"
 
-# 解压缩
-flate2 = "1.0"
-
 # WebSocket 客户端,基于 tokio 构建,用于订阅 K 线和深度
 tokio-tungstenite= { git = "https://github.com/skyfffire/tokio-tungstenite-proxy.git" }
 

+ 6 - 0
src/exchange/extended_account.rs

@@ -0,0 +1,6 @@
+#[derive(Clone, Debug)]
+pub struct ExtendedAccount {
+    // pub access_key: String,
+    // pub secret_key: String,
+    // pub pass_key: String,
+}

+ 60 - 143
src/exchange/extended_ws.rs → src/exchange/extended_stream_client.rs

@@ -6,144 +6,46 @@ use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
 use serde_json::json;
 use serde_json::Value;
 use tokio::sync::Mutex;
-use tokio_tungstenite::tungstenite::{Message};
-use tracing::{error, info, trace, warn};
+use tokio_tungstenite::tungstenite::{http, Message};
+use tracing::{error, trace, warn};
 use anyhow::Result;
-
-use crate::exchange::response_base::Response;
-use crate::exchange::socket_tool::{AbstractWsMode, HeartbeatType};
-
-//类型
-pub enum ExtendedWsType {
-    PublicAndPrivate,
-}
-
-//订阅频道
-#[derive(Clone)]
-pub enum ExtendedWsSubscribeType {
-    // 深度
-    PuFuturesDepth,
-    // K线数据,Min -> 分钟; Hour -> 小时; Day -> 天; Week -> 周, M -> 月
-    // Min1
-    // Min5
-    // Min15
-    // Min30
-    // Min60
-    // Hour4
-    // Hour8
-    // Day1
-    // Week1
-    // Month1
-    PuFuturesRecords(String),
-}
-
-//账号信息
-#[derive(Clone, Debug)]
-pub struct ExtendedAccount {
-    // pub access_key: String,
-    // pub secret_key: String,
-    // pub pass_key: String,
-}
+use tokio_tungstenite::tungstenite::handshake::client::{generate_key, Request};
+use crate::exchange::extended_account::ExtendedAccount;
+use crate::utils::response::Response;
+use crate::utils::stream_utils::{StreamUtils, HeartbeatType};
 
 #[derive(Clone)]
 #[allow(dead_code)]
-pub struct ExtendedWs {
-    //类型
+pub struct ExtendedStreamClient {
+    // 标签
     tag: String,
-    //地址
+    // 地址
     address_url: String,
-    //账号
-    login_param: Option<ExtendedAccount>,
-    //币对
-    symbol_s: Vec<String>,
-    //订阅
-    subscribe_types: Vec<ExtendedWsSubscribeType>,
-    //心跳间隔
+    // 账号
+    account_option: Option<ExtendedAccount>,
+    // 心跳间隔
     heartbeat_time: u64,
 }
 
-impl ExtendedWs {
+impl ExtendedStreamClient {
     // ============================================= 构造函数 ================================================
-    pub fn new(tag: String, login_param: Option<ExtendedAccount>, ws_type: ExtendedWsType) -> ExtendedWs {
-        /*******公共频道-私有频道数据组装*/
-        let address_url = match ws_type {
-            ExtendedWsType::PublicAndPrivate => {
-                let url = "wss://api.starknet.extended.exchange/stream.extended.exchange/v1/orderbooks/BTC-USD".to_string();
-                // let url = "wss://api.starknet.sepolia.extended.exchange/stream.extended.exchange/v1/orderbooks/BTC-USD".to_string();
-                url
-            }
-        };
+    fn new(tag: String, account_option: Option<ExtendedAccount>, subscribe_pattern: String) -> ExtendedStreamClient {
+        let host = "wss://api.starknet.extended.exchange/stream.extended.exchange/v1/".to_string();  // mainnet
+        // let host = "wss://api.starknet.sepolia.extended.exchange/stream.extended.exchange/v1/".to_string();  // testnet
+        
+        let address_url = format!("{}{}", host, subscribe_pattern);
 
-        ExtendedWs {
+        ExtendedStreamClient {
             tag,
             address_url,
-            login_param,
-            symbol_s: vec![],
-            subscribe_types: vec![],
+            account_option,
             heartbeat_time: 1000 * 10,
         }
     }
 
     // ============================================= 订阅函数 ================================================
-    // 手动添加订阅信息
-    pub fn set_subscribe(&mut self, subscribe_types: Vec<ExtendedWsSubscribeType>) {
-        self.subscribe_types.extend(subscribe_types);
-    }
-    // 手动添加币对
-    pub fn set_symbols(&mut self, mut symbol_array: Vec<String>) {
-        for symbol in symbol_array.iter_mut() {
-            // 大写
-            *symbol = symbol.to_uppercase();
-            // 字符串替换
-            *symbol = symbol.replace("_", "");
-        }
-        self.symbol_s = symbol_array;
-    }
-    fn contains_pr(&self) -> bool {
-        for t in self.subscribe_types.clone() {
-            if match t {
-                ExtendedWsSubscribeType::PuFuturesRecords(_) => false,
-                ExtendedWsSubscribeType::PuFuturesDepth => false,
-            } {
-                return true;
-            }
-        }
-        false
-    }
-
-    // 订阅枚举解析
-    pub fn enum_to_string(symbol: String, subscribe_type: ExtendedWsSubscribeType) -> Value {
-        match subscribe_type {
-            // 深度
-            ExtendedWsSubscribeType::PuFuturesDepth => {
-                json!({
-                    "method": "SUBSCRIPTION",
-                    "params": [
-                        format!("spot@public.aggre.depth.v3.api.pb@10ms@{symbol}")
-                    ]
-                })
-            }
-            // k线
-            ExtendedWsSubscribeType::PuFuturesRecords(interval) => {
-                json!({
-                    "method": "SUBSCRIPTION",
-                    "params": [
-                        format!("spot@public.kline.v3.api.pb@{symbol}@{interval}")
-                    ]
-                })
-            }
-        }
-    }
-    // 订阅信息生成
-    pub fn get_subscription(&self) -> Vec<String> {
-        let mut array = vec![];
-        for symbol in &self.symbol_s {
-            for subscribe_type in &self.subscribe_types {
-                let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
-                array.push(ty_str.to_string());
-            }
-        }
-        array
+    pub fn order_books(tag: String, account_option: Option<ExtendedAccount>, symbol: String) -> ExtendedStreamClient {
+        Self::new(tag, account_option, format!("orderbooks/{}", symbol))
     }
 
     // 链接
@@ -156,8 +58,6 @@ impl ExtendedWs {
             F: Fn(Response) -> Future + Clone + Send + 'static + Sync,
             Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
     {
-        let login_is = self.contains_pr();
-        let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
         let tag = self.tag.clone();
 
@@ -166,32 +66,57 @@ impl ExtendedWs {
         let heartbeat_time = self.heartbeat_time.clone();
         tokio::spawn(async move {
             let ping_obj = json!({"method":"PING"});
-            AbstractWsMode::ping_pong(write_tx_clone1, HeartbeatType::Custom(ping_obj.to_string()), heartbeat_time).await;
+            StreamUtils::ping_pong(write_tx_clone1, HeartbeatType::Custom(ping_obj.to_string()), heartbeat_time).await;
         });
 
-
-        // 设置订阅
-        let subscribe_array = subscription.clone();
-        if login_is {
-            //登录相关
+        if self.account_option.is_some() {
+            // 登录相关
         }
 
+        // 提取host
+        let parsed_uri: http::Uri = address_url.parse()?;
+        let host_domain = parsed_uri.host().ok_or("URI 缺少主机名").unwrap().to_string();
+        let host_header_value = if let Some(port) = parsed_uri.port_u16() {
+            // 如果端口不是默认的 80 (for ws) 或 443 (for wss),则需要包含端口
+            // 这里只是简单地判断,更严谨的判断可以根据 scheme 来
+            match parsed_uri.scheme_str() {
+                Some("ws") if port == 80 => host_domain.to_string(),
+                Some("wss") if port == 443 => host_domain.to_string(),
+                _ => format!("{}:{}", host_domain, port), // 否则包含端口
+            }
+        } else {
+            host_domain.to_string() // 没有端口或使用默认端口
+        };
+
         // 链接
         let t2 = tokio::spawn(async move {
             let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
 
             loop {
+                // 通过构建request的方式进行ws链接,可以携带header
+                let request = Request::builder()
+                    .method("GET")
+                    .uri(&address_url)
+                    .header("Sec-WebSocket-Key", generate_key())
+                    .header("Sec-WebSocket-Version", "13")
+                    .header("Host", host_header_value.clone())
+                    .header("User-Agent", "RustClient/1.0")
+                    .header("Upgrade", "websocket")
+                    .header("Connection", "Upgrade")
+                    .body(())
+                    .unwrap();
+
                 trace!("Extended_usdt_swap socket 连接中……");
-                AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
-                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
+                StreamUtils::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), request,
+                                              false, tag.clone(), vec![], write_to_socket_rx_arc.clone(),
+                                              Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
 
                 warn!("Extended_usdt_swap socket 断连,1s以后重连……");
                 tokio::time::sleep(Duration::from_secs(1)).await;
             }
         });
 
-        tokio::try_join!(t2).unwrap();
+        tokio::try_join!(t2)?;
         trace!("线程-心跳与链接-结束");
 
         Ok(())
@@ -361,8 +286,8 @@ mod tests {
     use tokio::sync::Mutex;
     use tokio_tungstenite::tungstenite::Message;
     use tracing::info;
-    use crate::exchange::extended_ws::{ExtendedWs, ExtendedWsSubscribeType, ExtendedWsType};
-    use crate::exchange::response_base::Response;
+    use crate::exchange::extended_stream_client::{ExtendedStreamClient};
+    use crate::utils::response::Response;
     use crate::utils::log_setup::setup_logging;
 
     #[tokio::test]
@@ -371,15 +296,7 @@ mod tests {
         let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
         let _guard = setup_logging().unwrap();
 
-        let mut ws = ExtendedWs::new("Extended".to_string(), None, ExtendedWsType::PublicAndPrivate);
-
-        // ws.set_subscribe(vec![
-        //     ExtendedWsSubscribeType::PuFuturesRecords("Min1".to_string()),
-        //     ExtendedWsSubscribeType::PuFuturesRecords("Min3".to_string()),
-        //     ExtendedWsSubscribeType::PuFuturesDepth
-        // ]);
-
-        // ws.set_symbols(vec!["BTC_USDT".to_string()]);
+        let mut ws = ExtendedStreamClient::order_books("Extended".to_string(), None, "BTC-USD".to_string());
 
         let fun = move |response: Response| {
             info!("{}", serde_json::to_string_pretty(&response.data).unwrap());

+ 2 - 4
src/exchange/mod.rs

@@ -1,4 +1,2 @@
-mod proxy;
-mod socket_tool;
-mod response_base;
-mod extended_ws;
+mod extended_stream_client;
+mod extended_account;

+ 0 - 0
src/utils/error.rs


+ 4 - 2
src/utils/mod.rs

@@ -1,3 +1,5 @@
-mod error;
 pub mod log_setup;
-pub mod rest_utils;
+pub mod rest_utils;
+pub mod stream_utils;
+pub(crate) mod response;
+pub(crate) mod proxy;

+ 0 - 0
src/exchange/proxy.rs → src/utils/proxy.rs


+ 1 - 1
src/exchange/response_base.rs → src/utils/response.rs

@@ -11,7 +11,7 @@ pub struct Response {
     pub data: Value,
     pub ins: Instant,           // 数据接收的ins
     pub time: i64,              // 数据接受的时间
-    pub reach_time: i64,        // 远程数据时间 弃用
+    pub reach_time: i64,        // 远程数据时间
     pub data_type: String       // 數據類型, 例如 bybit 深度信息:snapshot(全量),delta(增量)
 }
 

+ 10 - 38
src/exchange/socket_tool.rs → src/utils/stream_utils.rs

@@ -10,14 +10,13 @@ use tokio::net::TcpStream;
 use tokio::sync::Mutex;
 use tokio::time::Instant;
 use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
-use tokio_tungstenite::tungstenite::{http, Error, Message};
+use tokio_tungstenite::tungstenite::{Error, Message};
 use tokio_tungstenite::tungstenite::handshake::client::Request;
-use tokio_tungstenite::tungstenite::handshake::client::generate_key;
 use tracing::{error, info, trace, warn};
 
-use crate::exchange::proxy;
-use crate::exchange::proxy::{ProxyEnum, ProxyResponseEnum};
-use crate::exchange::response_base::Response;
+use crate::utils::proxy;
+use crate::utils::proxy::{ProxyEnum, ProxyResponseEnum};
+use crate::utils::response::Response;
 
 #[derive(Debug)]
 pub enum HeartbeatType {
@@ -26,9 +25,9 @@ pub enum HeartbeatType {
     Custom(String),
 }
 
-pub struct AbstractWsMode {}
+pub struct StreamUtils {}
 
-impl AbstractWsMode {
+impl StreamUtils {
     pub async fn ws_connected<T, PI, PO, F, B, Future>(write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
                                                        is_first_login: bool,
                                                        label: String,
@@ -78,7 +77,7 @@ impl AbstractWsMode {
                     continue;
                 }
 
-                let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong, message_binary);
+                let response_data = StreamUtils::analysis_message(message, message_text, message_ping, message_pong, message_binary);
                 // let response_data = func(message);
                 if response_data.is_some() {
                     let mut data = response_data.unwrap();
@@ -168,7 +167,7 @@ impl AbstractWsMode {
     //创建链接
     pub async fn ws_connect_async<T, PI, PO, F, B, Future>(is_shutdown_arc: Arc<AtomicBool>,
                                                            handle_function: F,
-                                                           address_url: String,
+                                                           request: Request,
                                                            is_first_login: bool,
                                                            label: String,
                                                            subscribe_array: Vec<String>,
@@ -184,34 +183,6 @@ impl AbstractWsMode {
               F: Fn(Response) -> Future + Clone,
               Future: future::Future<Output=()> + Send + 'static,
     {
-        // 提取host
-        let parsed_uri: http::Uri = address_url.parse().unwrap();
-        let host_domain = parsed_uri.host().ok_or("URI 缺少主机名").unwrap().to_string();
-        let host_header_value = if let Some(port) = parsed_uri.port_u16() {
-            // 如果端口不是默认的 80 (for ws) 或 443 (for wss),则需要包含端口
-            // 这里只是简单地判断,更严谨的判断可以根据 scheme 来
-            match parsed_uri.scheme_str() {
-                Some("ws") if port == 80 => host_domain.to_string(),
-                Some("wss") if port == 443 => host_domain.to_string(),
-                _ => format!("{}:{}", host_domain, port), // 否则包含端口
-            }
-        } else {
-            host_domain.to_string() // 没有端口或使用默认端口
-        };
-
-        // 通过构建request的方式进行ws链接,可以携带header
-        let request = Request::builder()
-            .method("GET")
-            .uri(&address_url)
-            .header("Sec-WebSocket-Key", generate_key())
-            .header("Sec-WebSocket-Version", "13")
-            .header("Host", host_header_value)
-            .header("User-Agent", "RustClient/1.0")
-            .header("Upgrade", "websocket")
-            .header("Connection", "Upgrade")
-            .body(())
-            .unwrap();
-
         // 判断是否通过代理访问
         let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
             ProxyResponseEnum::NO => {
@@ -224,9 +195,10 @@ impl AbstractWsMode {
             }
         };
 
+        let uri = request.uri().to_string();
         match connect_async(request, proxy).await {
             Ok((ws_stream, _)) => {
-                trace!("socket 链接成功,{}。", address_url);
+                trace!("socket 链接成功,{}。", uri);
 
                 Self::ws_connected(write_to_socket_rx_arc,
                                    is_first_login,