mod utils; mod exchange; mod strategy; mod data_manager; use anyhow::{Result}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use backtrace::Backtrace; use tokio::spawn; use tokio::sync::Mutex; use tokio_tungstenite::tungstenite::Message; use tracing::{error, info, warn}; use utils::log_setup; use crate::data_manager::DataManager; use crate::exchange::extended_account::ExtendedAccount; use crate::exchange::extended_rest_client::ExtendedRestClient; use crate::exchange::extended_stream_client::ExtendedStreamClient; use crate::strategy::Strategy; use crate::utils::config::Config; use crate::utils::response::Response; #[tokio::main] async fn main() { // 日志初始化 let _guards = log_setup::setup_logging().unwrap(); // 主进程控制 let running = Arc::new(AtomicBool::new(true)); // panic错误捕获,panic级别的错误直接退出 let panic_running = running.clone(); std::panic::set_hook(Box::new(move |panic_info| { let msg = format!( "type=panic, msg={:?}, location={:?}", panic_info.to_string(), panic_info.location() ); // 生成并格式化完整的堆栈跟踪 let backtrace = Backtrace::new(); let stack_trace = format!("{:?}", backtrace); // 一并打印堆栈跟踪 warn!("{}\nStack Trace:\n{}", msg, stack_trace); panic_running.store(false, Ordering::Relaxed); })); // 配置文件 let config_result = Config::load(); let config = match config_result { Ok(config) => config, Err(error) => { panic!("Configuration error: {}", error); } }; // ---- 优雅停机处理 (示例: SIGINT/Ctrl+C) ---- //注意:Windows上可能不支持所有信号,SIGINT通常可用 let r = running.clone(); // 克隆 Arc 用于 SIGHUP/SIGTERM/SIGINT 处理 spawn(async move { tokio::signal::ctrl_c().await.expect("设置 Ctrl+C 处理器失败"); warn!("接收到退出信号 (Ctrl+C)... 开始关闭."); r.store(false, Ordering::Relaxed); }); // ---- 运行核心订阅逻辑 ---- info!("==================================== 应用程序启动 ======================================="); let task_running = running.clone(); let config_clone = config.clone(); // 启动一个后台任务来执行订阅和数据处理 let subscription_handle = spawn(async move { // 运行获取交易对和订阅 K 线的函数 if let Err(e) = run_extended_subscriptions(task_running.clone(), &config_clone).await { error!("运行 Extended 订阅任务失败: {:?}", e); task_running.store(false, Ordering::Relaxed); // 如果启动失败,也设置停止标志 } }); info!("主循环开始,等待退出信号..."); // ---- 主循环 ---- // 保持主线程活动,等待 running 标志变为 false while running.load(Ordering::Relaxed) { // 可以添加一些周期性检查或任务,但主要是等待 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } info!("应用程序正在关闭..."); // ---- 清理和关闭 ---- // 等待订阅任务结束(如果它设计为可结束的话) info!("等待订阅任务完成..."); // 可以给 subscription_handle 设置一个超时等待 match tokio::time::timeout(tokio::time::Duration::from_secs(10), subscription_handle).await { Ok(Ok(_)) => info!("订阅任务正常结束。"), Ok(Err(e)) => error!("订阅任务返回错误: {:?}", e), Err(_) => warn!("等待订阅任务超时。"), } info!("应用程序已关闭。"); } /// 运行 Extended 的主要订阅任务 /// /// # Arguments /// * `running` - 用于控制程序是否继续运行的原子布尔值 (Arc 包裹) /// /// # Returns pub async fn run_extended_subscriptions(running: Arc, config: &Config) -> Result<()> { let market = config.strategy.market.as_str(); let account = ExtendedAccount::new( config.account.api_key.as_str(), config.account.stark_public_key.as_str(), config.account.stark_private_key.as_str(), config.account.vault_number, ); let is_testnet = config.network.is_testnet; // 订阅数据的客户端 let stream_client_list = vec![ ExtendedStreamClient::best_prices(format!("ExtendedBestPrices_{}", market).as_str(), None, market, is_testnet), ExtendedStreamClient::account("ExtendedAccount", Some(account.clone()), is_testnet), ]; // rest客户端 let rest_client = ExtendedRestClient::new("ExtendedRestClient", Some(account), market, is_testnet).await?; let rest_client_am = Arc::new(Mutex::new(rest_client)); // 数据管理及消息分发 let data_manager = DataManager::new(); let data_manager_am = Arc::new(Mutex::new(data_manager)); // 策略执行 let strategy = Strategy::new(rest_client_am.clone(), &config.strategy); let strategy_am = Arc::new(Mutex::new(strategy)); // 异步去订阅、并阻塞 for mut stream_client in stream_client_list { let running_clone = Arc::clone(&running); // 定义需要处理数据的fun let dm = data_manager_am.clone(); let sm = strategy_am.clone(); let fun = move |response: Response| { if response.code != 200 { error!("出现错误代码:{}", serde_json::to_string_pretty(&response.data).unwrap()); } let dm_clone = Arc::clone(&dm); let sm_clone = Arc::clone(&sm); async move { let mut dm_guard = dm_clone.lock().await; // 记录消息延迟 dm_guard.record_latency(response.received_time, response.reach_time); // 交给消息分发函数,并在此处消费掉错误消息 if let Err(e) = dm_guard.dispatch_message(&response).await { warn!("消息分发过程中出现错误: {}", e); } // 随后执行策略 let mut sm_guard = sm_clone.lock().await; if let Err(e) = sm_guard.do_strategy(&dm_guard).await { warn!("策略执行过程中出现错误: {}", e); } } }; // 这个通道主要是为了后面给这个ws发送消息 let (write_tx, write_rx) = futures_channel::mpsc::unbounded::(); let write_tx_am = Arc::new(Mutex::new(write_tx)); spawn(async move { // 链接 stream_client.ws_connect_async(running_clone, fun, &write_tx_am, write_rx) .await .expect("ws链接失败"); }); } // // 网络延迟统计 // let running_clone = Arc::clone(&running); // spawn(async move { // let mut interval = tokio::time::interval(Duration::from_secs(10)); // // while running_clone.load(Ordering::SeqCst) { // interval.tick().await; // 等待下一个周期 // // if !running_clone.load(Ordering::SeqCst) { // break; // } // // let mut total_delay_sum = 0i64; // let mut total_message_count = 0u64; // // // --- 第一步:收集 DataManager 的统计数据 --- // // 直接调用 DataManager 的方法 // let manager_lock = data_manager_am.lock().await; // 锁定单个 DataManager // let (current_sum, current_count) = manager_lock.get_delay_stats(); // 使用原子读 // // // 使用 saturating_add 防止聚合时溢出 (虽然 u64 很大,但好习惯) // total_delay_sum = total_delay_sum.saturating_add(current_sum); // total_message_count = total_message_count.saturating_add(current_count); // // manager_lock.reset_delay_stats(); // 调用重置方法 (使用原子写) // // // --- 第二步:计算并报告平均延迟 --- // if total_message_count > 0 { // let average_delay = total_delay_sum as f64 / total_message_count as f64; // info!( // "当前 WS 平均延迟: {:.2} 毫秒 (基于 {} 条消息测量)", // average_delay, total_message_count // ); // } else { // info!("WS 延迟报告:本周期内未收到用于计算延迟的消息。"); // } // } // info!("延迟报告任务已停止。"); // }); Ok(()) }