| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- mod utils;
- mod exchange;
- mod strategy;
- mod data_manager;
- use anyhow::{Result};
- use std::sync::Arc;
- use std::sync::atomic::{AtomicBool, Ordering};
- use backtrace::Backtrace;
- use tokio::spawn;
- use tokio::sync::Mutex;
- use tokio_tungstenite::tungstenite::Message;
- use tracing::{error, info, warn};
- use utils::log_setup;
- use crate::data_manager::DataManager;
- use crate::exchange::extended_account::ExtendedAccount;
- use crate::exchange::extended_rest_client::ExtendedRestClient;
- use crate::exchange::extended_stream_client::ExtendedStreamClient;
- use crate::strategy::Strategy;
- use crate::utils::config::Config;
- use crate::utils::response::Response;
- #[tokio::main]
- async fn main() {
- // 日志初始化
- let _guards = log_setup::setup_logging().unwrap();
- // 主进程控制
- let running = Arc::new(AtomicBool::new(true));
- // panic错误捕获,panic级别的错误直接退出
- let panic_running = running.clone();
- std::panic::set_hook(Box::new(move |panic_info| {
- let msg = format!(
- "type=panic, msg={:?}, location={:?}",
- panic_info.to_string(),
- panic_info.location()
- );
- // 生成并格式化完整的堆栈跟踪
- let backtrace = Backtrace::new();
- let stack_trace = format!("{:?}", backtrace);
- // 一并打印堆栈跟踪
- warn!("{}\nStack Trace:\n{}", msg, stack_trace);
- panic_running.store(false, Ordering::Relaxed);
- }));
- // 配置文件
- let config_result = Config::load();
- let config = match config_result {
- Ok(config) => config,
- Err(error) => {
- panic!("Configuration error: {}", error);
- }
- };
- // ---- 优雅停机处理 (示例: SIGINT/Ctrl+C) ----
- //注意:Windows上可能不支持所有信号,SIGINT通常可用
- let r = running.clone(); // 克隆 Arc 用于 SIGHUP/SIGTERM/SIGINT 处理
- spawn(async move {
- tokio::signal::ctrl_c().await.expect("设置 Ctrl+C 处理器失败");
- warn!("接收到退出信号 (Ctrl+C)... 开始关闭.");
- r.store(false, Ordering::Relaxed);
- });
- // ---- 运行核心订阅逻辑 ----
- info!("==================================== 应用程序启动 =======================================");
- let task_running = running.clone();
- let config_clone = config.clone();
- // 启动一个后台任务来执行订阅和数据处理
- let subscription_handle = spawn(async move {
- // 运行获取交易对和订阅 K 线的函数
- if let Err(e) = run_extended_subscriptions(task_running.clone(), &config_clone).await {
- error!("运行 Extended 订阅任务失败: {:?}", e);
- task_running.store(false, Ordering::Relaxed); // 如果启动失败,也设置停止标志
- }
- });
- info!("主循环开始,等待退出信号...");
- // ---- 主循环 ----
- // 保持主线程活动,等待 running 标志变为 false
- while running.load(Ordering::Relaxed) {
- // 可以添加一些周期性检查或任务,但主要是等待
- tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
- }
- info!("应用程序正在关闭...");
- // ---- 清理和关闭 ----
- // 等待订阅任务结束(如果它设计为可结束的话)
- info!("等待清退任务完成...");
- // 可以给 subscription_handle 设置一个超时等待
- match tokio::time::timeout(tokio::time::Duration::from_secs(30), subscription_handle).await {
- Ok(Ok(_)) => info!("清退任务正常结束。"),
- Ok(Err(e)) => error!("清退任务返回错误: {:?}", e),
- Err(_) => warn!("等待清退任务超时。"),
- }
- info!("应用程序已关闭。");
- }
- /// 运行 Extended 的主要订阅任务
- ///
- /// # Arguments
- /// * `running` - 用于控制程序是否继续运行的原子布尔值 (Arc 包裹)
- ///
- /// # Returns
- pub async fn run_extended_subscriptions(running: Arc<AtomicBool>, config: &Config) -> Result<()> {
- let market = config.strategy.market.as_str();
- let account = ExtendedAccount::new(
- config.account.api_key.as_str(),
- config.account.stark_public_key.as_str(),
- config.account.stark_private_key.as_str(),
- config.account.vault_number,
- );
- let is_testnet = config.network.is_testnet;
- // 订阅数据的客户端
- let stream_client_list = vec![
- ExtendedStreamClient::best_prices(format!("ExtendedBestPrices_{}", market).as_str(), None, market, is_testnet),
- ExtendedStreamClient::account("ExtendedAccount", Some(account.clone()), is_testnet),
- ];
-
- // rest客户端
- let rest_client = ExtendedRestClient::new("ExtendedRestClient", Some(account), market, is_testnet).await?;
- let rest_client_am = Arc::new(Mutex::new(rest_client));
- // 数据管理及消息分发
- let data_manager = DataManager::new();
- let data_manager_am = Arc::new(Mutex::new(data_manager));
- // 策略执行
- let strategy = Strategy::new(rest_client_am.clone(), &config.strategy);
- let strategy_am = Arc::new(Mutex::new(strategy));
- // 异步去订阅、并阻塞
- for mut stream_client in stream_client_list {
- let running_clone = Arc::clone(&running);
- // 定义需要处理数据的fun
- let dm = data_manager_am.clone();
- let sm = strategy_am.clone();
- let fun = move |response: Response| {
- if response.code != 200 {
- error!("出现错误代码:{}", serde_json::to_string_pretty(&response.data).unwrap());
- }
- let dm_clone = Arc::clone(&dm);
- let sm_clone = Arc::clone(&sm);
- async move {
- let mut dm_guard = dm_clone.lock().await;
- // 记录消息延迟
- dm_guard.record_latency(response.received_time, response.reach_time);
- // 交给消息分发函数,并在此处消费掉错误消息
- if let Err(e) = dm_guard.dispatch_message(&response).await {
- warn!("消息分发过程中出现错误: {}", e);
- }
- // 随后执行策略
- let mut sm_guard = sm_clone.lock().await;
- if let Err(e) = sm_guard.do_strategy(&dm_guard).await {
- warn!("策略执行过程中出现错误: {}", e);
- }
- }
- };
- // 这个通道主要是为了后面给这个ws发送消息
- let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
- let write_tx_am = Arc::new(Mutex::new(write_tx));
- spawn(async move {
- // 链接
- stream_client.ws_connect_async(running_clone, fun, &write_tx_am, write_rx)
- .await
- .expect("ws链接失败");
- });
- }
- // 等待系统退出,并执行后续程序
- while running.load(Ordering::Relaxed) {
- // 可以添加一些周期性检查或任务,但主要是等待
- tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
- }
- info!("执行后续清退策略……");
- let mut s = strategy_am.lock().await;
- let dm = data_manager_am.lock().await;
- s.shutdown(market, &dm).await?;
- Ok(())
- }
|