Jelajahi Sumber

所有pair的订阅部分已完成。等待测试。

skyffire 6 bulan lalu
induk
melakukan
184b8be672

+ 70 - 35
src/data_manager.rs

@@ -1,3 +1,38 @@
+use std::collections::HashMap;
+use rust_decimal::Decimal;
+use serde_json::Value;
+use anyhow::Result;
+
+pub struct DataManager {
+    pub exchange_info_map: HashMap<String, Value>,
+    pub klines_map: HashMap<String, Vec<Value>>,
+    pub asks_map: HashMap<String, HashMap<Decimal, Decimal>>,
+    pub bids_map: HashMap<String, HashMap<Decimal, Decimal>>,
+}
+
+impl DataManager {
+    pub fn new(exchange_info_map: HashMap<String, Value>) -> Self {
+        let klines_map: HashMap<String, Vec<Value>> = HashMap::new();
+        let asks_map: HashMap<String, HashMap<Decimal, Decimal>> = HashMap::new();
+        let bids_map: HashMap<String, HashMap<Decimal, Decimal>> = HashMap::new();
+        
+        DataManager { 
+            exchange_info_map,
+            klines_map,
+            asks_map,
+            bids_map,
+        }
+    }
+    
+    pub async fn process_klines_map(symbol: String, depth: Value) -> Result<()> {
+        Ok(())
+    }
+    
+    pub async fn process_depth_data(symbol: String, depth: Value) -> Result<()> {
+        Ok(())
+    }
+}
+
 // use std::collections::BTreeMap;
 // use std::sync::Arc;
 // use std::sync::atomic::{AtomicBool};
@@ -15,7 +50,7 @@
 // use crate::core::Core;
 // use crate::exchange_disguise::{on_depth, on_record, on_ticker, on_trade};
 // use crate::model::OrderInfo;
-// 
+//
 // // 参考 Bybit 合约 启动
 // pub(crate) async fn reference_bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
 //                                              core_arc: Arc<Mutex<Core>>,
@@ -34,7 +69,7 @@
 //             // BybitSwapSubscribeType::PuKline("1".to_string()),
 //             // BybitSwapSubscribeType::PuTickers
 //         ]);
-// 
+//
 //         // 读取数据
 //         let core_arc_clone = Arc::clone(&core_arc);
 //         let mut rest = core_arc_clone.lock().await.platform_rest.clone_box();
@@ -42,21 +77,21 @@
 //         let mut records = rest.get_record("1".to_string()).await.unwrap();
 //         for record in records.iter_mut() {
 //             let core_arc_clone = core_arc.clone();
-// 
+//
 //             on_record(core_arc_clone, record).await
 //         }
-// 
+//
 //         let depth_asks = Arc::new(Mutex::new(Vec::new()));
 //         let depth_bids = Arc::new(Mutex::new(Vec::new()));
-// 
+//
 //         let fun = move |data: ResponseData| {
 //             // 在 async 块之前克隆 Arc
 //             let core_arc_cc = core_arc_clone.clone();
 //             let mul = multiplier.clone();
-// 
+//
 //             let depth_asks = Arc::clone(&depth_asks);
 //             let depth_bids = Arc::clone(&depth_bids);
-// 
+//
 //             async move {
 //                 let mut depth_asks = depth_asks.lock().await;
 //                 let mut depth_bids = depth_bids.lock().await;
@@ -64,14 +99,14 @@
 //                 on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids, ref_index).await
 //             }
 //         };
-// 
+//
 //         // 链接
 //         let write_tx_am = Arc::new(Mutex::new(write_tx));
 //         ws.set_symbols(symbols);
 //         ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
 //     });
 // }
-// 
+//
 // // 交易 bybit 合约 启动
 // pub(crate) async fn bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
 //                                    core_arc: Arc<Mutex<Core>>,
@@ -81,7 +116,7 @@
 //                                    exchange_params: BTreeMap<String, String>) {
 //     // 参考
 //     reference_bybit_swap_run(is_shutdown_arc.clone(), core_arc.clone(), name.clone(), symbols.clone(), is_colo, 233).await;
-// 
+//
 //     // 交易
 //     spawn(async move {
 //         // 交易交易所需要启动私有ws
@@ -93,39 +128,39 @@
 //             BybitSwapSubscribeType::PrOrder,
 //             BybitSwapSubscribeType::PrWallet
 //         ]);
-// 
+//
 //         let core_arc_clone_private = core_arc.clone();
 //         let multiplier = core_arc_clone_private.lock().await.platform_rest.get_self_market().multiplier;
 //         let run_symbol = symbols.clone()[0].clone();
-// 
+//
 //         // 挂起私有ws
 //         let fun = move |data: ResponseData| {
 //             // 在 async 块之前克隆 Arc
 //             let core_arc_cc = core_arc_clone_private.clone();
 //             let mul = multiplier.clone();
 //             let rs = run_symbol.clone();
-// 
+//
 //             async move {
 //                 // 使用克隆后的 Arc,避免 move 语义
 //                 on_private_data(core_arc_cc, &mul, &rs, &data).await;
 //             }
 //         };
-// 
+//
 //         // 链接
 //         let write_tx_am = Arc::new(Mutex::new(write_tx));
 //         ws.set_symbols(symbols);
 //         ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
 //     });
 // }
-// 
+//
 // async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &ResponseData, depth_asks: &mut Vec<OrderBook>, depth_bids: &mut Vec<OrderBook>, ref_index: usize) {
 //     let mut trace_stack = TraceStack::new(response.time, response.ins);
 //     trace_stack.on_after_span_line();
-// 
+//
 //     match response.channel.as_str() {
 //         "orderbook" => {
 //             trace_stack.set_source("bybit_usdt_swap.bookTicker".to_string());
-// 
+//
 //             let mut is_update = false;
 //             if response.data_type == "delta"  {
 //                 is_update = true;
@@ -137,19 +172,19 @@
 //                     depth_asks.clear();
 //                     depth_asks.append(&mut depth.asks);
 //                 }
-// 
+//
 //                 if depth.bids.len() != 0 {
 //                     depth_bids.clear();
 //                     depth_bids.append(&mut depth.bids);
 //                 }
-// 
+//
 //                 let result_depth = Depth {
 //                     time: depth.time,
 //                     symbol: depth.symbol,
 //                     asks: depth_asks.clone(),
 //                     bids: depth_bids.clone(),
 //                 };
-// 
+//
 //                 trace_stack.on_after_format();
 //                 on_depth(core_arc.clone(), &response.label, &mut trace_stack, &result_depth, ref_index).await;
 //                 // on_depth(core_arc, &response.label, &mut trace_stack, &result_depth, 1).await;
@@ -159,7 +194,7 @@
 //                 trace_stack.on_after_format();
 //                 on_depth(core_arc.clone(), &response.label, &mut trace_stack, &depth, ref_index).await;
 //                 // on_depth(core_arc, &response.label, &mut trace_stack, &depth, 1).await;
-// 
+//
 //                 depth_asks.clear();
 //                 depth_asks.append(&mut depth.asks);
 //                 depth_bids.clear();
@@ -168,10 +203,10 @@
 //         }
 //         "trade" => {
 //             trace_stack.set_source("bybit_usdt_swap.trade".to_string());
-// 
+//
 //             let mut trades = ExchangeStructHandler::trades_handle(BybitSwap, response, mul);
 //             trace_stack.on_after_format();
-// 
+//
 //             for trade in trades.iter_mut() {
 //                 on_trade(core_arc.clone(), &response.label, &mut trace_stack, &trade, ref_index).await;
 //                 // on_trade(core_arc.clone(), &response.label, &mut trace_stack, &trade, 1).await;
@@ -181,20 +216,20 @@
 //             trace_stack.set_source("bybit_usdt_swap.tickers".to_string());
 //             let ticker = ExchangeStructHandler::ticker_handle(BybitSwap, response).await;
 //             trace_stack.on_after_format();
-// 
+//
 //             on_ticker(core_arc, &mut trace_stack, &ticker).await;
 //         },
 //         // k线数据
 //         "kline" => {
 //             let mut records = ExchangeStructHandler::records_handle(BybitSwap, &response);
-// 
+//
 //             if records.is_empty() {
 //                 return;
 //             }
-// 
+//
 //             for record in records.iter_mut() {
 //                 let core_arc_clone = core_arc.clone();
-// 
+//
 //                 on_record(core_arc_clone, record).await
 //             }
 //         },
@@ -204,11 +239,11 @@
 //         }
 //     }
 // }
-// 
+//
 // async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, run_symbol: &String, response: &ResponseData) {
 //     let mut trace_stack = TraceStack::new(response.time, response.ins);
 //     trace_stack.on_after_span_line();
-// 
+//
 //     match response.channel.as_str() {
 //         "wallet" => {
 //             let account = ExchangeStructHandler::account_info_handle(BybitSwap, response, run_symbol);
@@ -218,23 +253,23 @@
 //         "order" => {
 //             let orders = ExchangeStructHandler::order_handle(BybitSwap, response, ct_val);
 //             trace_stack.on_after_format();
-// 
+//
 //             let mut order_infos:Vec<OrderInfo> = Vec::new();
 //             for mut order in orders.order {
 //                 if order.status == "NULL" {
 //                     error!("bybit_usdt_swap 未识别的订单状态:{:?}", response);
-// 
+//
 //                     continue;
 //                 }
-// 
+//
 //                 // if order.deal_amount != Decimal::ZERO {
 //                 //     info!("bybit order 消息原文:{:?}", response);
 //                 // }
-// 
+//
 //                 let order_info = OrderInfo::parse_order_to_order_info(&mut order);
 //                 order_infos.push(order_info);
 //             }
-// 
+//
 //             let mut core = core_arc_clone.lock().await;
 //             core.update_order(order_infos, trace_stack).await;
 //         }
@@ -249,7 +284,7 @@
 //         }
 //     }
 // }
-// 
+//
 // fn parse_btree_map_to_bybit_swap_login(exchange_params: BTreeMap<String, String>) -> BybitSwapLogin {
 //     BybitSwapLogin {
 //         api_key: exchange_params.get("access_key").unwrap().clone(),

+ 1 - 1
src/exchange/mexc_spot_client.rs

@@ -5,7 +5,7 @@ use reqwest::{Client};
 use rust_decimal::Decimal;
 use rust_decimal::prelude::FromPrimitive;
 use rust_decimal_macros::dec;
-use tracing::{error, info, trace};
+use tracing::{error, trace};
 use ring::hmac;
 use serde_json::Value;
 use crate::exchange::response_base::Response;

+ 5 - 15
src/exchange/mexc_spot_ws.rs

@@ -1,19 +1,18 @@
-use std::fmt::format;
 use std::fs::File;
-use std::io::{Read, Write};
+use std::io::{ Write};
 use std::path::Path;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 use std::time::Duration;
 
-use flate2::read::GzDecoder;
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
 use prost::Message as ProstMessage;
 use serde_json::json;
 use serde_json::Value;
 use tokio::sync::Mutex;
-use tokio_tungstenite::tungstenite::{Error, Message};
+use tokio_tungstenite::tungstenite::{Message};
 use tracing::{error, info, trace};
+use anyhow::Result;
 
 use crate::exchange::response_base::Response;
 use crate::exchange::socket_tool::AbstractWsMode;
@@ -27,12 +26,9 @@ pub mod mexc_spot {
     include!(concat!(env!("OUT_DIR"), "/_.rs"));
 }
 
-use mexc_spot::PublicSpotKlineV3ApiMessage;
-use mexc_spot::KlineDataV3;
 // 引入新的结构体
+use mexc_spot::PublicSpotKlineV3ApiMessage;
 use mexc_spot::PublicIncreaseDepthsV3ApiMessage;
-use mexc_spot::DepthDataContentV3;
-use mexc_spot::DepthItemV3;
 
 #[derive(Debug)]
 #[derive(Clone)]
@@ -188,7 +184,7 @@ impl MexcSpotWs {
                                              is_shutdown_arc: Arc<AtomicBool>,
                                              handle_function: F,
                                              _write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
-                                             write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+                                             write_to_socket_rx: UnboundedReceiver<Message>) -> Result<()>
         where
             F: Fn(Response) -> Future + Clone + Send + 'static + Sync,
             Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
@@ -309,8 +305,6 @@ impl MexcSpotWs {
                 }
                 Err(e) => {
                     error!("尝试解析为 PublicSpotKlineV3ApiMessage 失败: {:?}", e);
-
-                    return Some(Response::new("".to_string(), 500, format!("Protobuf K 线顶层消息解析出错: {:?}", e), Value::Null));
                 }
             }
         }
@@ -365,10 +359,6 @@ impl MexcSpotWs {
                 }
                 Err(e) => {
                     error!("解析深度消息 PublicIncreaseDepthsV3ApiMessage 失败: {:?}", e);
-                    // 保存数据以供分析
-                    let file_path = Path::new("depth_error_data.bin");
-                    // ... 保存 po 到文件 ...
-                    return Some(Response::new("".to_string(), 500, format!("Protobuf 深度消息解析出错: {:?}", e), Value::Null));
                 }
             }
         }

+ 0 - 1
src/exchange/mod.rs

@@ -1,7 +1,6 @@
 pub mod response_base;
 
 mod types;
-pub mod ws_manager;
 pub mod mexc_spot_client;
 pub mod mexc_spot_ws;
 mod socket_tool;

+ 0 - 0
src/exchange/ws_manager.rs


+ 17 - 53
src/main.rs

@@ -7,17 +7,21 @@ mod utils;
 mod strategy;
 mod exchange;
 mod api;
+pub mod ws_manager;
 
 use std::collections::{HashMap, HashSet};
 use backtrace::Backtrace;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
-use anyhow::{anyhow, Context};
+use anyhow::{anyhow};
 use anyhow::Result;
 use serde_json::Value;
+use tokio::sync::Mutex;
 use tracing::{error, info, warn};
 use utils::log_setup;
+use crate::data_manager::DataManager;
 use crate::exchange::mexc_spot_client::MexcSpotClient;
+use crate::ws_manager::WsManager;
 
 #[tokio::main]
 async fn main() {
@@ -25,7 +29,7 @@ async fn main() {
 
     // 主进程控制
     let running = Arc::new(AtomicBool::new(true));
-    let client = Arc::new(tokio::sync::Mutex::new(MexcSpotClient::new_with_tag("MexcSpot".to_string(), None)));
+    let client_am = Arc::new(tokio::sync::Mutex::new(MexcSpotClient::new_with_tag("MexcSpot".to_string(), None)));
 
     // panic错误捕获,panic级别的错误直接退出
     let panic_running = running.clone();
@@ -57,11 +61,11 @@ async fn main() {
 
     // ---- 运行核心订阅逻辑 ----
     let task_running = running.clone();
-    let subscribe_client = client.clone();
+    let subscribe_client_am = client_am.clone();
     // 启动一个后台任务来执行订阅和数据处理
     let subscription_handle = tokio::spawn(async move {
         // 运行获取交易对和订阅 K 线的函数
-        if let Err(e) = run_mexc_subscriptions(task_running.clone(), subscribe_client).await {
+        if let Err(e) = run_mexc_subscriptions(task_running.clone(), subscribe_client_am).await {
             error!("运行 MEXC 订阅任务失败: {:?}", e);
             task_running.store(false, Ordering::Relaxed); // 如果启动失败,也设置停止标志
         }
@@ -79,8 +83,6 @@ async fn main() {
 
     // ---- 清理和关闭 ----
     // 等待订阅任务结束(如果它设计为可结束的话)
-    // 可能需要给 ws_manager 发送关闭信号
-    // ws_manager.shutdown().await; // 假设有这样一个方法
     info!("等待订阅任务完成...");
     // 可以给 subscription_handle 设置一个超时等待
     match tokio::time::timeout(tokio::time::Duration::from_secs(10), subscription_handle).await {
@@ -89,14 +91,6 @@ async fn main() {
         Err(_) => warn!("等待订阅任务超时。"),
     }
 
-    // 等待 WebSocket 管理器关闭 (如果它有自己的运行循环)
-    // info!("等待 WebSocket 管理器关闭...");
-    // match tokio::time::timeout(tokio::time::Duration::from_secs(10), ws_run_handle).await {
-    //     Ok(Ok(_)) => info!("WebSocket 管理器正常关闭。"),
-    //     Ok(Err(e)) => error!("WebSocket 管理器关闭时出错: {:?}", e),
-    //     Err(_) => warn!("等待 WebSocket 管理器关闭超时。"),
-    // }
-
     info!("应用程序已关闭。");
 }
 
@@ -108,24 +102,23 @@ async fn main() {
 /// # Returns
 pub async fn run_mexc_subscriptions(
     running: Arc<AtomicBool>, // 接收 running 标志,以便在出错时可以停止
-    client: Arc<tokio::sync::Mutex<MexcSpotClient>>,
+    client_am: Arc<tokio::sync::Mutex<MexcSpotClient>>,
 ) -> Result<()> {
     info!("开始获取 MEXC 交易对...");
 
     // 1. 获取所有交易对
     // 注意:这里的 .await? 会在出错时直接返回 Err,中断此函数
     // 你可能需要根据实际的API响应调整这里的类型和字段访问
-    let mut rest_client = client.lock().await;
+    let mut rest_client = client_am.lock().await;
 
     let default_symbols_response = rest_client.default_symbols().await;
     // info!("获取到的 default_symbols_value: {}", serde_json::to_string_pretty(&default_symbols_response.data).unwrap());
 
     let exchange_info_response = rest_client.exchange_info(Value::Null).await;
     // info!("获取到的 exchange_info_data: {}", serde_json::to_string_pretty(&exchange_info_response.data).unwrap());
-    
+
     // 调用过滤函数,注意现在传入的是 &Value
-    let filtered_map = process_exchange_info(&default_symbols_response.data, &exchange_info_response.data)
-        .context("处理交易所信息时发生错误")?;
+    let filtered_map = process_exchange_info(&default_symbols_response.data, &exchange_info_response.data)?;
     info!("成功过滤并转换了交易对信息,最终数量: {}", filtered_map.len());
     // 使用 filtered_map...
     if let Some(symbol_info) = filtered_map.get("BTCUSDT") { // 示例
@@ -141,40 +134,11 @@ pub async fn run_mexc_subscriptions(
     }
 
     let symbols: Vec<String> = filtered_map.keys().cloned().collect();
-    info!("成功获取 {} 个交易对,准备订阅 1 分钟 K 线...", symbols.len());
-
-    // // 2. 按规则订阅所有交易对的 1Min K 线
-    // // WsManager 应该负责处理如何将这些订阅分散到多个 WebSocket 连接上
-    // // WsManager 的 subscribe_kline_1m 方法需要被实现
-    // match ws_manager.subscribe_kline_1m(symbols).await {
-    //     Ok(_) => {
-    //         info!("已成功向 WsManager 发送所有 1 分钟 K 线订阅请求。");
-    //         // WsManager 内部应该已经启动了监听任务来接收数据
-    //         // 这个函数在这里的任务就完成了,数据处理将在 WsManager 内部或通过其暴露的回调/通道进行
-    //     }
-    //     Err(e) => {
-    //         error!("向 WsManager 发送 K 线订阅请求时出错: {:?}", e);
-    //         running.store(false, Ordering::Relaxed); // 订阅失败,设置停止标志
-    //         return Err(e); // 返回错误
-    //     }
-    // }
-
-    // (可选) 这里可以启动一个循环来监听 WsManager 的状态或接收处理后的数据
-    // 例如,如果 WsManager 通过 channel 发送数据:
-    // loop {
-    //    select! {
-    //       // 监听来自 ws_manager 的数据
-    //       Some(data) = ws_manager.data_receiver.recv() => {
-    //          // 处理数据...
-    //          // info!("收到数据: {:?}", data);
-    //       },
-    //       // 检查是否需要停止
-    //       _ = tokio::time::sleep(Duration::from_millis(100)), if !running.load(Ordering::Relaxed) => {
-    //          info!("订阅任务接收到停止信号,退出。");
-    //          break;
-    //       }
-    //    }
-    // }
+    info!("成功获取 {} 个交易对,准备订阅 1 分钟 K 线、深度数据(需要http初始化)...", symbols.len());
+    let data_manager_am = Arc::new(Mutex::new(DataManager::new(filtered_map)));
+    let mut ws_manager = WsManager::new(symbols, data_manager_am.clone(), running.clone());
+    ws_manager.subscribe_all().await?;
+    info!("所有订阅已提交,程序将继续运行并接收实时数据...");
 
     // 如果订阅本身是后台任务,并且这个函数只是触发订阅,那么到这里就可以返回 Ok 了
     Ok(())

+ 6 - 14
src/utils/rest_utils.rs

@@ -1,17 +1,14 @@
-use tracing::{info, trace};
 use crate::exchange::response_base::Response;
 
 #[derive(Clone)]
 pub struct RestUtils {
-    pub base_url: String
 }
 
 impl RestUtils {
-    pub fn new(base_url: String) -> RestUtils {
-        RestUtils { base_url }
+    pub fn new() -> RestUtils {
+        RestUtils { }
     }
-
-    //map数据转 get请求参数
+    
     pub fn parse_params_to_str(parameters: String) -> String {
         let mut params_str = String::from("");
         let parsed_json: serde_json::Value = serde_json::from_str(&parameters).unwrap();
@@ -22,21 +19,16 @@ impl RestUtils {
                     serde_json::Value::String(s) => s.clone(),
                     _ => value.to_string()
                 };
-                // trace!("Key: {}", key);
-                // trace!("Value: {}", formatted_value);
-                // let formatted_value = match value {
-                //     Value::String(s) => s.clone(),
-                //     _ => value.to_string()
-                // };
+                
                 let str = format!("{}={}", key, formatted_value);
                 let format_str = format!("{}{}{}", params_str, (if params_str.len() > 0 { "&" } else { "" }), str);
                 params_str = format_str;
             }
         }
-        // info!("---json-转字符串拼接:{}", params_str);
+        
         params_str.to_string()
     }
-    //res_data 解析
+    
     pub fn res_data_analysis(result: Result<Response, reqwest::Error>) -> Response {
         match result {
             Ok(res_data) => {

+ 73 - 0
src/ws_manager.rs

@@ -0,0 +1,73 @@
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use tokio::sync::Mutex;
+use anyhow::Result;
+use tokio_tungstenite::tungstenite::Message;
+use tracing::{error, info};
+use crate::data_manager::DataManager;
+use crate::exchange::mexc_spot_ws::{MexcSpotWs, MexcSpotWsSubscribeType, MexcSpotWsType};
+use crate::exchange::response_base::Response;
+use crate::utils::log_setup::setup_logging;
+
+pub struct WsManager {
+    pub symbols: Vec<String>,
+    pub data_manager_am: Arc<Mutex<DataManager>>,
+    pub running: Arc<AtomicBool>,
+}
+
+impl WsManager {
+    pub fn new(symbols: Vec<String>, data_manager_am: Arc<Mutex<DataManager>>, running: Arc<AtomicBool>) -> WsManager {
+        WsManager {
+            symbols,
+            data_manager_am,
+            running
+        }
+    }
+
+    pub async fn subscribe_all(&mut self) -> Result<()> {
+        // 每批最大交易对数量
+        const BATCH_SIZE: usize = 15;
+
+        // 计算总共需要多少批次
+        let num_batches = (self.symbols.len() + BATCH_SIZE - 1) / BATCH_SIZE;
+
+        for i in 0..num_batches {
+            // 计算当前批次的起始和结束索引
+            let start_index = i * BATCH_SIZE;
+            let end_index = (start_index + BATCH_SIZE).min(self.symbols.len());
+
+            // 获取当前批次的交易对
+            let current_batch_symbols = self.symbols[start_index..end_index].to_vec();
+            info!("正在创建 [{}, {}) 的连接...", start_index, end_index);
+
+            // 这个通道主要是为了后面给这个ws发送消息
+            let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
+            let _guard = setup_logging().unwrap();
+
+            let mut ws = MexcSpotWs::new_with_tag("Mexc".to_string(), None, MexcSpotWsType::PublicAndPrivate);
+
+            ws.set_subscribe(vec![
+                MexcSpotWsSubscribeType::PuFuturesRecords("Min1".to_string()),
+                MexcSpotWsSubscribeType::PuFuturesDepth
+            ]);
+
+            ws.set_symbols(current_batch_symbols);
+
+            let fun = move |response: Response| {
+                if response.code != 200 {
+                    error!("{}", serde_json::to_string_pretty(&response.data).unwrap());
+                }
+
+                async move {}
+            };
+
+            // 链接
+            let write_tx_am = Arc::new(Mutex::new(write_tx));
+            ws.ws_connect_async(self.running.clone(), fun, &write_tx_am, write_rx)
+                .await
+                .expect("链接失败");
+        }
+
+        Ok(())
+    }
+}