main.rs 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. mod utils;
  2. mod exchange;
  3. mod strategy;
  4. mod data_manager;
  5. use anyhow::{Result};
  6. use std::sync::Arc;
  7. use std::sync::atomic::{AtomicBool, Ordering};
  8. use backtrace::Backtrace;
  9. use tokio::spawn;
  10. use tokio::sync::Mutex;
  11. use tokio_tungstenite::tungstenite::Message;
  12. use tracing::{error, info, warn};
  13. use utils::log_setup;
  14. use crate::data_manager::DataManager;
  15. use crate::exchange::extended_account::ExtendedAccount;
  16. use crate::exchange::extended_rest_client::ExtendedRestClient;
  17. use crate::exchange::extended_stream_client::ExtendedStreamClient;
  18. use crate::strategy::Strategy;
  19. use crate::utils::response::Response;
  20. #[tokio::main]
  21. async fn main() {
  22. let _guards = log_setup::setup_logging().unwrap();
  23. // 主进程控制
  24. let running = Arc::new(AtomicBool::new(true));
  25. // panic错误捕获,panic级别的错误直接退出
  26. let panic_running = running.clone();
  27. std::panic::set_hook(Box::new(move |panic_info| {
  28. let msg = format!(
  29. "type=panic, msg={:?}, location={:?}",
  30. panic_info.to_string(),
  31. panic_info.location()
  32. );
  33. // 生成并格式化完整的堆栈跟踪
  34. let backtrace = Backtrace::new();
  35. let stack_trace = format!("{:?}", backtrace);
  36. // 一并打印堆栈跟踪
  37. warn!("{}\nStack Trace:\n{}", msg, stack_trace);
  38. panic_running.store(false, Ordering::Relaxed);
  39. }));
  40. // ---- 优雅停机处理 (示例: SIGINT/Ctrl+C) ----
  41. //注意:Windows上可能不支持所有信号,SIGINT通常可用
  42. let r = running.clone(); // 克隆 Arc 用于 SIGHUP/SIGTERM/SIGINT 处理
  43. tokio::spawn(async move {
  44. tokio::signal::ctrl_c().await.expect("设置 Ctrl+C 处理器失败");
  45. warn!("接收到退出信号 (Ctrl+C)... 开始关闭.");
  46. r.store(false, Ordering::Relaxed);
  47. });
  48. // ---- 运行核心订阅逻辑 ----
  49. info!("==================================== 应用程序启动 =======================================");
  50. let task_running = running.clone();
  51. // 启动一个后台任务来执行订阅和数据处理
  52. let subscription_handle = tokio::spawn(async move {
  53. // 运行获取交易对和订阅 K 线的函数
  54. if let Err(e) = run_extended_subscriptions(task_running.clone()).await {
  55. error!("运行 Extended 订阅任务失败: {:?}", e);
  56. task_running.store(false, Ordering::Relaxed); // 如果启动失败,也设置停止标志
  57. }
  58. });
  59. info!("主循环开始,等待退出信号...");
  60. // ---- 主循环 ----
  61. // 保持主线程活动,等待 running 标志变为 false
  62. while running.load(Ordering::Relaxed) {
  63. // 可以添加一些周期性检查或任务,但主要是等待
  64. tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
  65. }
  66. info!("应用程序正在关闭...");
  67. // ---- 清理和关闭 ----
  68. // 等待订阅任务结束(如果它设计为可结束的话)
  69. info!("等待订阅任务完成...");
  70. // 可以给 subscription_handle 设置一个超时等待
  71. match tokio::time::timeout(tokio::time::Duration::from_secs(10), subscription_handle).await {
  72. Ok(Ok(_)) => info!("订阅任务正常结束。"),
  73. Ok(Err(e)) => error!("订阅任务返回错误: {:?}", e),
  74. Err(_) => warn!("等待订阅任务超时。"),
  75. }
  76. info!("应用程序已关闭。");
  77. }
  78. /// 运行 Extended 的主要订阅任务
  79. ///
  80. /// # Arguments
  81. /// * `running` - 用于控制程序是否继续运行的原子布尔值 (Arc 包裹)
  82. ///
  83. /// # Returns
  84. pub async fn run_extended_subscriptions(running: Arc<AtomicBool>) -> Result<()> {
  85. let market = "BTC-USD";
  86. let account = ExtendedAccount::new(
  87. "9ae4030902ab469a1bae8a90464e2e91",
  88. "0x71e16e49b717b851ced8347cf0dfa8f490bfb826323b9af624a66285dc99672",
  89. "0x47cdde8952945c13460f9129644eade096100810fba59de05452b34aacecff6",
  90. 220844,
  91. );
  92. let is_testnet = false;
  93. // 订阅数据的客户端
  94. let stream_client_list = vec![
  95. ExtendedStreamClient::best_prices(format!("ExtendedBestPrices_{}", market).as_str(), None, market, is_testnet),
  96. ExtendedStreamClient::account("ExtendedAccount", Some(account.clone()), is_testnet),
  97. ];
  98. // rest客户端
  99. let rest_client = ExtendedRestClient::new("ExtendedRestClient", Some(account), market, is_testnet).await?;
  100. let rest_client_am = Arc::new(Mutex::new(rest_client));
  101. // 数据管理及消息分发
  102. let data_manager = DataManager::new();
  103. let data_manager_am = Arc::new(Mutex::new(data_manager));
  104. // 策略执行
  105. let strategy = Strategy::new(rest_client_am.clone());
  106. let strategy_am = Arc::new(Mutex::new(strategy));
  107. // 异步去订阅、并阻塞
  108. for mut stream_client in stream_client_list {
  109. let running_clone = Arc::clone(&running);
  110. // 定义需要处理数据的fun
  111. let dm = data_manager_am.clone();
  112. let sm = strategy_am.clone();
  113. let fun = move |response: Response| {
  114. if response.code != 200 {
  115. error!("出现错误代码:{}", serde_json::to_string_pretty(&response.data).unwrap());
  116. }
  117. let dm_clone = Arc::clone(&dm);
  118. let sm_clone = Arc::clone(&sm);
  119. async move {
  120. // 数据不新鲜直接跳过
  121. if response.reach_time - response.received_time > 100 {
  122. return
  123. }
  124. let mut dm_guard = dm_clone.lock().await;
  125. // 记录消息延迟
  126. dm_guard.record_latency(response.received_time, response.reach_time);
  127. // 交给消息分发函数,并在此处消费掉错误消息
  128. if let Err(e) = dm_guard.dispatch_message(&response).await {
  129. warn!("消息分发过程中出现错误: {}", e);
  130. }
  131. // 随后执行策略
  132. let mut sm_guard = sm_clone.lock().await;
  133. if let Err(e) = sm_guard.do_strategy(&dm_guard).await {
  134. warn!("策略执行过程中出现错误: {}", e);
  135. }
  136. }
  137. };
  138. // 这个通道主要是为了后面给这个ws发送消息
  139. let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
  140. let write_tx_am = Arc::new(Mutex::new(write_tx));
  141. spawn(async move {
  142. // 链接
  143. stream_client.ws_connect_async(running_clone, fun, &write_tx_am, write_rx)
  144. .await
  145. .expect("ws链接失败");
  146. });
  147. }
  148. // // 网络延迟统计
  149. // let running_clone = Arc::clone(&running);
  150. // spawn(async move {
  151. // let mut interval = tokio::time::interval(Duration::from_secs(10));
  152. //
  153. // while running_clone.load(Ordering::SeqCst) {
  154. // interval.tick().await; // 等待下一个周期
  155. //
  156. // if !running_clone.load(Ordering::SeqCst) {
  157. // break;
  158. // }
  159. //
  160. // let mut total_delay_sum = 0i64;
  161. // let mut total_message_count = 0u64;
  162. //
  163. // // --- 第一步:收集 DataManager 的统计数据 ---
  164. // // 直接调用 DataManager 的方法
  165. // let manager_lock = data_manager_am.lock().await; // 锁定单个 DataManager
  166. // let (current_sum, current_count) = manager_lock.get_delay_stats(); // 使用原子读
  167. //
  168. // // 使用 saturating_add 防止聚合时溢出 (虽然 u64 很大,但好习惯)
  169. // total_delay_sum = total_delay_sum.saturating_add(current_sum);
  170. // total_message_count = total_message_count.saturating_add(current_count);
  171. //
  172. // manager_lock.reset_delay_stats(); // 调用重置方法 (使用原子写)
  173. //
  174. // // --- 第二步:计算并报告平均延迟 ---
  175. // if total_message_count > 0 {
  176. // let average_delay = total_delay_sum as f64 / total_message_count as f64;
  177. // info!(
  178. // "当前 WS 平均延迟: {:.2} 毫秒 (基于 {} 条消息测量)",
  179. // average_delay, total_message_count
  180. // );
  181. // } else {
  182. // info!("WS 延迟报告:本周期内未收到用于计算延迟的消息。");
  183. // }
  184. // }
  185. // info!("延迟报告任务已停止。");
  186. // });
  187. Ok(())
  188. }