skyffire пре 6 месеци
родитељ
комит
6c1bce9b64
4 измењених фајлова са 392 додато и 6 уклоњено
  1. 1 1
      rust-toolchain
  2. 258 0
      src/data_manager.rs
  3. 3 3
      src/exchange/mod.rs
  4. 130 2
      src/main.rs

+ 1 - 1
rust-toolchain

@@ -1 +1 @@
-1.81.0
+1.83.0

+ 258 - 0
src/data_manager.rs

@@ -0,0 +1,258 @@
+// use std::collections::BTreeMap;
+// use std::sync::Arc;
+// use std::sync::atomic::{AtomicBool};
+// use rust_decimal::Decimal;
+// use tokio::{spawn};
+// use tokio::sync::Mutex;
+// use tracing::{error};
+// use tokio_tungstenite::tungstenite::Message;
+// use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
+// use exchanges::response_base::ResponseData;
+// use global::trace_stack::TraceStack;
+// use standard::exchange::ExchangeEnum::BybitSwap;
+// use standard::exchange_struct_handler::ExchangeStructHandler;
+// use standard::{Depth, OrderBook};
+// use crate::core::Core;
+// use crate::exchange_disguise::{on_depth, on_record, on_ticker, on_trade};
+// use crate::model::OrderInfo;
+// 
+// // 参考 Bybit 合约 启动
+// pub(crate) async fn reference_bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
+//                                              core_arc: Arc<Mutex<Core>>,
+//                                              name: String,
+//                                              symbols: Vec<String>,
+//                                              is_colo: bool,
+//                                              ref_index: usize
+// ) {
+//     spawn(async move {
+//         //创建读写通道
+//         let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
+//         let mut ws = BybitSwapWs::new_label(name, is_colo, None, BybitSwapWsType::Public);
+//         ws.set_subscribe(vec![
+//             BybitSwapSubscribeType::PuTrade,
+//             BybitSwapSubscribeType::PuOrderBook1,
+//             // BybitSwapSubscribeType::PuKline("1".to_string()),
+//             // BybitSwapSubscribeType::PuTickers
+//         ]);
+// 
+//         // 读取数据
+//         let core_arc_clone = Arc::clone(&core_arc);
+//         let mut rest = core_arc_clone.lock().await.platform_rest.clone_box();
+//         let multiplier = rest.get_self_market().multiplier;
+//         let mut records = rest.get_record("1".to_string()).await.unwrap();
+//         for record in records.iter_mut() {
+//             let core_arc_clone = core_arc.clone();
+// 
+//             on_record(core_arc_clone, record).await
+//         }
+// 
+//         let depth_asks = Arc::new(Mutex::new(Vec::new()));
+//         let depth_bids = Arc::new(Mutex::new(Vec::new()));
+// 
+//         let fun = move |data: ResponseData| {
+//             // 在 async 块之前克隆 Arc
+//             let core_arc_cc = core_arc_clone.clone();
+//             let mul = multiplier.clone();
+// 
+//             let depth_asks = Arc::clone(&depth_asks);
+//             let depth_bids = Arc::clone(&depth_bids);
+// 
+//             async move {
+//                 let mut depth_asks = depth_asks.lock().await;
+//                 let mut depth_bids = depth_bids.lock().await;
+//                 // 使用克隆后的 Arc,避免 move 语义
+//                 on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids, ref_index).await
+//             }
+//         };
+// 
+//         // 链接
+//         let write_tx_am = Arc::new(Mutex::new(write_tx));
+//         ws.set_symbols(symbols);
+//         ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
+//     });
+// }
+// 
+// // 交易 bybit 合约 启动
+// pub(crate) async fn bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
+//                                    core_arc: Arc<Mutex<Core>>,
+//                                    name: String,
+//                                    symbols: Vec<String>,
+//                                    is_colo: bool,
+//                                    exchange_params: BTreeMap<String, String>) {
+//     // 参考
+//     reference_bybit_swap_run(is_shutdown_arc.clone(), core_arc.clone(), name.clone(), symbols.clone(), is_colo, 233).await;
+// 
+//     // 交易
+//     spawn(async move {
+//         // 交易交易所需要启动私有ws
+//         let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//         let auth = Some(parse_btree_map_to_bybit_swap_login(exchange_params));
+//         let mut ws = BybitSwapWs::new_label(name.clone(), is_colo, auth, BybitSwapWsType::Private);
+//         ws.set_subscribe(vec![
+//             BybitSwapSubscribeType::PrPosition,
+//             BybitSwapSubscribeType::PrOrder,
+//             BybitSwapSubscribeType::PrWallet
+//         ]);
+// 
+//         let core_arc_clone_private = core_arc.clone();
+//         let multiplier = core_arc_clone_private.lock().await.platform_rest.get_self_market().multiplier;
+//         let run_symbol = symbols.clone()[0].clone();
+// 
+//         // 挂起私有ws
+//         let fun = move |data: ResponseData| {
+//             // 在 async 块之前克隆 Arc
+//             let core_arc_cc = core_arc_clone_private.clone();
+//             let mul = multiplier.clone();
+//             let rs = run_symbol.clone();
+// 
+//             async move {
+//                 // 使用克隆后的 Arc,避免 move 语义
+//                 on_private_data(core_arc_cc, &mul, &rs, &data).await;
+//             }
+//         };
+// 
+//         // 链接
+//         let write_tx_am = Arc::new(Mutex::new(write_tx));
+//         ws.set_symbols(symbols);
+//         ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
+//     });
+// }
+// 
+// async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &ResponseData, depth_asks: &mut Vec<OrderBook>, depth_bids: &mut Vec<OrderBook>, ref_index: usize) {
+//     let mut trace_stack = TraceStack::new(response.time, response.ins);
+//     trace_stack.on_after_span_line();
+// 
+//     match response.channel.as_str() {
+//         "orderbook" => {
+//             trace_stack.set_source("bybit_usdt_swap.bookTicker".to_string());
+// 
+//             let mut is_update = false;
+//             if response.data_type == "delta"  {
+//                 is_update = true;
+//             }
+//             let mut depth = ExchangeStructHandler::book_ticker_handle(BybitSwap, &response, mul);
+//             // 是增量更新
+//             if is_update {
+//                 if depth.asks.len() != 0 {
+//                     depth_asks.clear();
+//                     depth_asks.append(&mut depth.asks);
+//                 }
+// 
+//                 if depth.bids.len() != 0 {
+//                     depth_bids.clear();
+//                     depth_bids.append(&mut depth.bids);
+//                 }
+// 
+//                 let result_depth = Depth {
+//                     time: depth.time,
+//                     symbol: depth.symbol,
+//                     asks: depth_asks.clone(),
+//                     bids: depth_bids.clone(),
+//                 };
+// 
+//                 trace_stack.on_after_format();
+//                 on_depth(core_arc.clone(), &response.label, &mut trace_stack, &result_depth, ref_index).await;
+//                 // on_depth(core_arc, &response.label, &mut trace_stack, &result_depth, 1).await;
+//             }
+//             // 全量
+//             else {
+//                 trace_stack.on_after_format();
+//                 on_depth(core_arc.clone(), &response.label, &mut trace_stack, &depth, ref_index).await;
+//                 // on_depth(core_arc, &response.label, &mut trace_stack, &depth, 1).await;
+// 
+//                 depth_asks.clear();
+//                 depth_asks.append(&mut depth.asks);
+//                 depth_bids.clear();
+//                 depth_bids.append(&mut depth.bids);
+//             }
+//         }
+//         "trade" => {
+//             trace_stack.set_source("bybit_usdt_swap.trade".to_string());
+// 
+//             let mut trades = ExchangeStructHandler::trades_handle(BybitSwap, response, mul);
+//             trace_stack.on_after_format();
+// 
+//             for trade in trades.iter_mut() {
+//                 on_trade(core_arc.clone(), &response.label, &mut trace_stack, &trade, ref_index).await;
+//                 // on_trade(core_arc.clone(), &response.label, &mut trace_stack, &trade, 1).await;
+//             }
+//         }
+//         "tickers" => {
+//             trace_stack.set_source("bybit_usdt_swap.tickers".to_string());
+//             let ticker = ExchangeStructHandler::ticker_handle(BybitSwap, response).await;
+//             trace_stack.on_after_format();
+// 
+//             on_ticker(core_arc, &mut trace_stack, &ticker).await;
+//         },
+//         // k线数据
+//         "kline" => {
+//             let mut records = ExchangeStructHandler::records_handle(BybitSwap, &response);
+// 
+//             if records.is_empty() {
+//                 return;
+//             }
+// 
+//             for record in records.iter_mut() {
+//                 let core_arc_clone = core_arc.clone();
+// 
+//                 on_record(core_arc_clone, record).await
+//             }
+//         },
+//         _ => {
+//             error!("未知推送类型");
+//             error!(?response);
+//         }
+//     }
+// }
+// 
+// async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, run_symbol: &String, response: &ResponseData) {
+//     let mut trace_stack = TraceStack::new(response.time, response.ins);
+//     trace_stack.on_after_span_line();
+// 
+//     match response.channel.as_str() {
+//         "wallet" => {
+//             let account = ExchangeStructHandler::account_info_handle(BybitSwap, response, run_symbol);
+//             let mut core = core_arc_clone.lock().await;
+//             core.update_equity(account).await;
+//         }
+//         "order" => {
+//             let orders = ExchangeStructHandler::order_handle(BybitSwap, response, ct_val);
+//             trace_stack.on_after_format();
+// 
+//             let mut order_infos:Vec<OrderInfo> = Vec::new();
+//             for mut order in orders.order {
+//                 if order.status == "NULL" {
+//                     error!("bybit_usdt_swap 未识别的订单状态:{:?}", response);
+// 
+//                     continue;
+//                 }
+// 
+//                 // if order.deal_amount != Decimal::ZERO {
+//                 //     info!("bybit order 消息原文:{:?}", response);
+//                 // }
+// 
+//                 let order_info = OrderInfo::parse_order_to_order_info(&mut order);
+//                 order_infos.push(order_info);
+//             }
+// 
+//             let mut core = core_arc_clone.lock().await;
+//             core.update_order(order_infos, trace_stack).await;
+//         }
+//         "position" => {
+//             let positions = ExchangeStructHandler::position_handle(BybitSwap, response, ct_val);
+//             let mut core = core_arc_clone.lock().await;
+//             core.update_position(positions).await;
+//         }
+//         _ => {
+//             error!("未知推送类型");
+//             error!(?response);
+//         }
+//     }
+// }
+// 
+// fn parse_btree_map_to_bybit_swap_login(exchange_params: BTreeMap<String, String>) -> BybitSwapLogin {
+//     BybitSwapLogin {
+//         api_key: exchange_params.get("access_key").unwrap().clone(),
+//         secret_key: exchange_params.get("secret_key").unwrap().clone(),
+//     }
+// }

+ 3 - 3
src/exchange/mod.rs

@@ -1,8 +1,8 @@
 pub mod response_base;
 pub mod response_base;
 
 
 mod types;
 mod types;
-mod ws_manager;
-mod mexc_spot_client;
-mod mexc_spot_ws;
+pub mod ws_manager;
+pub mod mexc_spot_client;
+pub mod mexc_spot_ws;
 mod socket_tool;
 mod socket_tool;
 mod proxy;
 mod proxy;

+ 130 - 2
src/main.rs

@@ -11,8 +11,9 @@ mod api;
 use backtrace::Backtrace;
 use backtrace::Backtrace;
 use std::sync::Arc;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::atomic::{AtomicBool, Ordering};
-use tracing::{info, warn};
+use tracing::{error, info, warn};
 use utils::log_setup;
 use utils::log_setup;
+use crate::exchange::mexc_spot_client::MexcSpotClient;
 
 
 #[tokio::main]
 #[tokio::main]
 async fn main() {
 async fn main() {
@@ -20,6 +21,7 @@ async fn main() {
 
 
     // 主进程控制
     // 主进程控制
     let running = Arc::new(AtomicBool::new(true));
     let running = Arc::new(AtomicBool::new(true));
+    let client = Arc::new(tokio::sync::Mutex::new(MexcSpotClient::new_with_tag("MexcSpot".to_string(), None)));
 
 
     // panic错误捕获,panic级别的错误直接退出
     // panic错误捕获,panic级别的错误直接退出
     let panic_running = running.clone();
     let panic_running = running.clone();
@@ -39,5 +41,131 @@ async fn main() {
         panic_running.store(false, Ordering::Relaxed);
         panic_running.store(false, Ordering::Relaxed);
     }));
     }));
 
 
-    info!("测试完成");
+    // ---- 优雅停机处理 (示例: SIGINT/Ctrl+C) ----
+    //注意:Windows上可能不支持所有信号,SIGINT通常可用
+    let r = running.clone(); // 克隆 Arc 用于 SIGHUP/SIGTERM/SIGINT 处理
+    tokio::spawn(async move {
+        tokio::signal::ctrl_c().await.expect("设置 Ctrl+C 处理器失败");
+        warn!("接收到退出信号 (Ctrl+C)... 开始关闭.");
+        r.store(false, Ordering::Relaxed);
+    });
+    info!("应用程序启动...");
+
+    // ---- 运行核心订阅逻辑 ----
+    let task_running = running.clone();
+    let subscribe_client = client.clone();
+    // 启动一个后台任务来执行订阅和数据处理
+    let subscription_handle = tokio::spawn(async move {
+        // 运行获取交易对和订阅 K 线的函数
+        if !run_mexc_subscriptions(task_running.clone(), subscribe_client).await {
+            error!("运行 MEXC 订阅任务失败");
+            task_running.store(false, Ordering::Relaxed); // 如果启动失败,也设置停止标志
+        }
+    });
+
+    info!("主循环开始,等待退出信号...");
+    // ---- 主循环 ----
+    // 保持主线程活动,等待 running 标志变为 false
+    while running.load(Ordering::Relaxed) {
+        // 可以添加一些周期性检查或任务,但主要是等待
+        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+    }
+
+    info!("应用程序正在关闭...");
+
+    // ---- 清理和关闭 ----
+    // 等待订阅任务结束(如果它设计为可结束的话)
+    // 可能需要给 ws_manager 发送关闭信号
+    // ws_manager.shutdown().await; // 假设有这样一个方法
+    info!("等待订阅任务完成...");
+    // 可以给 subscription_handle 设置一个超时等待
+    match tokio::time::timeout(tokio::time::Duration::from_secs(10), subscription_handle).await {
+        Ok(Ok(_)) => info!("订阅任务正常结束。"),
+        Ok(Err(e)) => error!("订阅任务返回错误: {:?}", e),
+        Err(_) => warn!("等待订阅任务超时。"),
+    }
+
+    // 等待 WebSocket 管理器关闭 (如果它有自己的运行循环)
+    // info!("等待 WebSocket 管理器关闭...");
+    // match tokio::time::timeout(tokio::time::Duration::from_secs(10), ws_run_handle).await {
+    //     Ok(Ok(_)) => info!("WebSocket 管理器正常关闭。"),
+    //     Ok(Err(e)) => error!("WebSocket 管理器关闭时出错: {:?}", e),
+    //     Err(_) => warn!("等待 WebSocket 管理器关闭超时。"),
+    // }
+
+    info!("应用程序已关闭。");
+}
+
+/// 运行 MEXC 交易对获取和 K 线订阅的核心逻辑
+///
+/// # Arguments
+/// * `running` - 用于控制程序是否继续运行的原子布尔值 (Arc 包裹)
+///
+/// # Returns
+pub async fn run_mexc_subscriptions(
+    running: Arc<AtomicBool>, // 接收 running 标志,以便在出错时可以停止
+    client: Arc<tokio::sync::Mutex<MexcSpotClient>>,
+) -> bool {
+    info!("开始获取 MEXC 交易对...");
+
+    // 1. 获取所有交易对
+    // 注意:这里的 .await? 会在出错时直接返回 Err,中断此函数
+    // 你可能需要根据实际的API响应调整这里的类型和字段访问
+    let mut rest_client = client.lock().await;
+    let exchange_info_response = rest_client.exchange_info(serde_json::Value::Null).await;
+    let default_symbols_response = rest_client.default_symbols().await;
+
+    // // 提取交易对名称 (例如 "BTC_USDT")
+    // // 假设 SymbolInfo 结构体有一个名为 `symbol` 的 String 字段
+    // // 同时,我们可能需要过滤掉一些不活跃或不交易的交易对(根据API返回的状态字段)
+    // let symbols: Vec<String> = symbols_info
+    //     .iter()
+    //     // .filter(|info| info.status == "ENABLED") // 假设有状态字段可以过滤
+    //     .map(|info| info.symbol.clone()) // 假设字段名为 symbol
+    //     .collect();
+    //
+    // if symbols.is_empty() {
+    //     warn!("未能获取到任何有效的交易对,请检查网络或 API 接口。");
+    //     // 根据情况决定是否需要停止程序
+    //     // running.store(false, Ordering::Relaxed);
+    //     return false; // 或者返回特定错误
+    // }
+    //
+    // info!("成功获取 {} 个交易对,准备订阅 1 分钟 K 线...", symbols.len());
+
+    // // 2. 按规则订阅所有交易对的 1Min K 线
+    // // WsManager 应该负责处理如何将这些订阅分散到多个 WebSocket 连接上
+    // // WsManager 的 subscribe_kline_1m 方法需要被实现
+    // match ws_manager.subscribe_kline_1m(symbols).await {
+    //     Ok(_) => {
+    //         info!("已成功向 WsManager 发送所有 1 分钟 K 线订阅请求。");
+    //         // WsManager 内部应该已经启动了监听任务来接收数据
+    //         // 这个函数在这里的任务就完成了,数据处理将在 WsManager 内部或通过其暴露的回调/通道进行
+    //     }
+    //     Err(e) => {
+    //         error!("向 WsManager 发送 K 线订阅请求时出错: {:?}", e);
+    //         running.store(false, Ordering::Relaxed); // 订阅失败,设置停止标志
+    //         return Err(e); // 返回错误
+    //     }
+    // }
+
+    // (可选) 这里可以启动一个循环来监听 WsManager 的状态或接收处理后的数据
+    // 例如,如果 WsManager 通过 channel 发送数据:
+    // loop {
+    //    select! {
+    //       // 监听来自 ws_manager 的数据
+    //       Some(data) = ws_manager.data_receiver.recv() => {
+    //          // 处理数据...
+    //          // info!("收到数据: {:?}", data);
+    //       },
+    //       // 检查是否需要停止
+    //       _ = tokio::time::sleep(Duration::from_millis(100)), if !running.load(Ordering::Relaxed) => {
+    //          info!("订阅任务接收到停止信号,退出。");
+    //          break;
+    //       }
+    //    }
+    // }
+
+    // 如果订阅本身是后台任务,并且这个函数只是触发订阅,那么到这里就可以返回 Ok 了
+    true
 }
 }