main.rs 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. mod utils;
  2. mod exchange;
  3. mod strategy;
  4. mod data_manager;
  5. use anyhow::{Result};
  6. use std::sync::Arc;
  7. use std::sync::atomic::{AtomicBool, Ordering};
  8. use backtrace::Backtrace;
  9. use tokio::spawn;
  10. use tokio::sync::Mutex;
  11. use tokio_tungstenite::tungstenite::Message;
  12. use tracing::{error, info, warn};
  13. use utils::log_setup;
  14. use crate::data_manager::DataManager;
  15. use crate::exchange::extended_account::ExtendedAccount;
  16. use crate::exchange::extended_rest_client::ExtendedRestClient;
  17. use crate::exchange::extended_stream_client::ExtendedStreamClient;
  18. use crate::strategy::Strategy;
  19. use crate::utils::config::Config;
  20. use crate::utils::response::Response;
  21. #[tokio::main]
  22. async fn main() {
  23. // 日志初始化
  24. let _guards = log_setup::setup_logging().unwrap();
  25. // 主进程控制
  26. let running = Arc::new(AtomicBool::new(true));
  27. // panic错误捕获,panic级别的错误直接退出
  28. let panic_running = running.clone();
  29. std::panic::set_hook(Box::new(move |panic_info| {
  30. let msg = format!(
  31. "type=panic, msg={:?}, location={:?}",
  32. panic_info.to_string(),
  33. panic_info.location()
  34. );
  35. // 生成并格式化完整的堆栈跟踪
  36. let backtrace = Backtrace::new();
  37. let stack_trace = format!("{:?}", backtrace);
  38. // 一并打印堆栈跟踪
  39. warn!("{}\nStack Trace:\n{}", msg, stack_trace);
  40. panic_running.store(false, Ordering::Relaxed);
  41. }));
  42. // 配置文件
  43. let config_result = Config::load();
  44. let config = match config_result {
  45. Ok(config) => config,
  46. Err(error) => {
  47. panic!("Configuration error: {}", error);
  48. }
  49. };
  50. // ---- 优雅停机处理 (示例: SIGINT/Ctrl+C) ----
  51. //注意:Windows上可能不支持所有信号,SIGINT通常可用
  52. let r = running.clone(); // 克隆 Arc 用于 SIGHUP/SIGTERM/SIGINT 处理
  53. spawn(async move {
  54. tokio::signal::ctrl_c().await.expect("设置 Ctrl+C 处理器失败");
  55. warn!("接收到退出信号 (Ctrl+C)... 开始关闭.");
  56. r.store(false, Ordering::Relaxed);
  57. });
  58. // ---- 运行核心订阅逻辑 ----
  59. info!("==================================== 应用程序启动 =======================================");
  60. let task_running = running.clone();
  61. let config_clone = config.clone();
  62. // 启动一个后台任务来执行订阅和数据处理
  63. let subscription_handle = spawn(async move {
  64. // 运行获取交易对和订阅 K 线的函数
  65. if let Err(e) = run_extended_subscriptions(task_running.clone(), &config_clone).await {
  66. error!("运行 Extended 订阅任务失败: {:?}", e);
  67. task_running.store(false, Ordering::Relaxed); // 如果启动失败,也设置停止标志
  68. }
  69. });
  70. info!("主循环开始,等待退出信号...");
  71. // ---- 主循环 ----
  72. // 保持主线程活动,等待 running 标志变为 false
  73. while running.load(Ordering::Relaxed) {
  74. // 可以添加一些周期性检查或任务,但主要是等待
  75. tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
  76. }
  77. info!("应用程序正在关闭...");
  78. // ---- 清理和关闭 ----
  79. // 等待订阅任务结束(如果它设计为可结束的话)
  80. info!("等待清退任务完成...");
  81. // 可以给 subscription_handle 设置一个超时等待
  82. match tokio::time::timeout(tokio::time::Duration::from_secs(30), subscription_handle).await {
  83. Ok(Ok(_)) => info!("清退任务正常结束。"),
  84. Ok(Err(e)) => error!("清退任务返回错误: {:?}", e),
  85. Err(_) => warn!("等待清退任务超时。"),
  86. }
  87. info!("应用程序已关闭。");
  88. }
  89. /// 运行 Extended 的主要订阅任务
  90. ///
  91. /// # Arguments
  92. /// * `running` - 用于控制程序是否继续运行的原子布尔值 (Arc 包裹)
  93. ///
  94. /// # Returns
  95. pub async fn run_extended_subscriptions(running: Arc<AtomicBool>, config: &Config) -> Result<()> {
  96. let market = config.strategy.market.as_str();
  97. let account = ExtendedAccount::new(
  98. config.account.api_key.as_str(),
  99. config.account.stark_public_key.as_str(),
  100. config.account.stark_private_key.as_str(),
  101. config.account.vault_number,
  102. );
  103. let is_testnet = config.network.is_testnet;
  104. // 订阅数据的客户端
  105. let stream_client_list = vec![
  106. ExtendedStreamClient::best_prices(format!("ExtendedBestPrices_{}", market).as_str(), None, market, is_testnet),
  107. ExtendedStreamClient::account("ExtendedAccount", Some(account.clone()), is_testnet),
  108. ];
  109. // rest客户端
  110. let rest_client = ExtendedRestClient::new("ExtendedRestClient", Some(account), market, is_testnet).await?;
  111. let rest_client_am = Arc::new(Mutex::new(rest_client));
  112. // 数据管理及消息分发
  113. let data_manager = DataManager::new();
  114. let data_manager_am = Arc::new(Mutex::new(data_manager));
  115. // 策略执行
  116. let strategy = Strategy::new(rest_client_am.clone(), &config.strategy);
  117. let strategy_am = Arc::new(Mutex::new(strategy));
  118. // 异步去订阅、并阻塞
  119. for mut stream_client in stream_client_list {
  120. let running_clone = Arc::clone(&running);
  121. // 定义需要处理数据的fun
  122. let dm = data_manager_am.clone();
  123. let sm = strategy_am.clone();
  124. let fun = move |response: Response| {
  125. if response.code != 200 {
  126. error!("出现错误代码:{}", serde_json::to_string_pretty(&response.data).unwrap());
  127. }
  128. let dm_clone = Arc::clone(&dm);
  129. let sm_clone = Arc::clone(&sm);
  130. async move {
  131. let mut dm_guard = dm_clone.lock().await;
  132. // 记录消息延迟
  133. dm_guard.record_latency(response.received_time, response.reach_time);
  134. // 交给消息分发函数,并在此处消费掉错误消息
  135. if let Err(e) = dm_guard.dispatch_message(&response).await {
  136. warn!("消息分发过程中出现错误: {}", e);
  137. }
  138. // 随后执行策略
  139. let mut sm_guard = sm_clone.lock().await;
  140. if let Err(e) = sm_guard.do_strategy(&dm_guard).await {
  141. warn!("策略执行过程中出现错误: {}", e);
  142. }
  143. }
  144. };
  145. // 这个通道主要是为了后面给这个ws发送消息
  146. let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
  147. let write_tx_am = Arc::new(Mutex::new(write_tx));
  148. spawn(async move {
  149. // 链接
  150. stream_client.ws_connect_async(running_clone, fun, &write_tx_am, write_rx)
  151. .await
  152. .expect("ws链接失败");
  153. });
  154. }
  155. // 等待系统退出,并执行后续程序
  156. while running.load(Ordering::Relaxed) {
  157. // 可以添加一些周期性检查或任务,但主要是等待
  158. tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
  159. }
  160. info!("执行后续清退策略……");
  161. let mut s = strategy_am.lock().await;
  162. let dm = data_manager_am.lock().await;
  163. s.shutdown(market, &dm).await?;
  164. Ok(())
  165. }