Bläddra i källkod

网络层架构整理完毕(直到消息分发)

skyfffire 3 veckor sedan
förälder
incheckning
d0456e0006
7 ändrade filer med 293 tillägg och 23 borttagningar
  1. 82 0
      src/data_manager.rs
  2. 11 10
      src/exchange/extended_stream_client.rs
  3. 2 2
      src/exchange/mod.rs
  4. 172 1
      src/main.rs
  5. 16 0
      src/strategy.rs
  6. 6 6
      src/utils/response.rs
  7. 4 4
      src/utils/stream_utils.rs

+ 82 - 0
src/data_manager.rs

@@ -0,0 +1,82 @@
+use std::cmp::Reverse;
+use std::collections::{BTreeMap, HashMap};
+use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
+use rust_decimal::Decimal;
+use serde_json::{from_value, Value};
+use anyhow::{bail, Context, Result};
+use serde::{Deserialize, Serialize};
+use tracing::{info, warn};
+use crate::utils::response::Response;
+
+pub struct DataManager {
+    pub asks_map: HashMap<String, BTreeMap<Decimal, Decimal>>,
+    pub bids_map: HashMap<String, BTreeMap<Reverse<Decimal>, Decimal>>,
+
+    pub delay_total: AtomicI64,
+    pub delay_count: AtomicU64,
+}
+
+impl DataManager {
+    pub fn new() -> Self {
+        let asks_map: HashMap<String, BTreeMap<Decimal, Decimal>> = HashMap::new();
+        let bids_map: HashMap<String, BTreeMap<Reverse<Decimal>, Decimal>> = HashMap::new();
+
+        DataManager {
+            asks_map,
+            bids_map,
+            delay_total: AtomicI64::new(0),
+            delay_count: AtomicU64::new(0),
+        }
+    }
+
+    pub fn record_latency(&self, received_at: i64, origin_timestamp: i64) {
+        if let Some(delay) = received_at.checked_sub(origin_timestamp) {
+            self.delay_total.fetch_add(delay, Ordering::Relaxed);       // 原子加
+            self.delay_count.fetch_add(1, Ordering::Relaxed);       // 原子加
+        } else {
+            warn!("时间戳计算出现问题: received_at={}, origin_timestamp={}", received_at, origin_timestamp);
+        }
+    }
+
+    // 获取当前的统计数据
+    pub fn get_delay_stats(&self) -> (i64, u64) {
+        let total = self.delay_total.load(Ordering::Relaxed);
+        let count = self.delay_count.load(Ordering::Relaxed);
+        (total, count)
+    }
+
+    // 重置统计数据 -> 这个是关键!
+    pub fn reset_delay_stats(&self) {
+        self.delay_total.store(0, Ordering::Relaxed); // 原子写
+        self.delay_count.store(0, Ordering::Relaxed); // 原子写
+    }
+    
+    pub async fn dispatch_message(&mut self, response: &Response) -> Result<()> {
+        // // 1. 预解析为通用的 Value
+        // let v = response.data.clone();
+        // 
+        // info!("准备分发的消息:{}", serde_json::to_string_pretty(&v)?);
+
+        // 2. 获取 topic_info 字段用于路由
+        // and_then 确保了 get 返回 Some 时才调用 as_str
+        // context 在任何一步失败时提供错误信息 (字段不存在,或不是字符串)
+        // let topic_info = v
+        //     .get("topic_info")
+        //     .and_then(Value::as_str)
+        //     .context("Message is missing 'topic_info' field or it's not a string")?;
+        //
+        // // 3. 根据 topic_info 的内容进行分发 (match)
+        // if topic_info.contains("spot@public.kline.v3.api.pb") {
+        //     // 如果是K线数据,调用 process_kline
+        //     self.process_klines(&v).await?;
+        // } else if topic_info.contains("spot@public.aggre.depth.v3.api.pb") {
+        //     // 如果是增量深度数据,调用 process_depth_update
+        //     self.process_depth_update(&v).await?;
+        // } else {
+        //     // 如果是未知的 topic,返回一个错误
+        //     bail!("Received a message with an unknown topic_info: {}", topic_info);
+        // }
+
+        Ok(())
+    }
+}

+ 11 - 10
src/exchange/extended_stream_client.rs

@@ -7,9 +7,11 @@ use serde_json::json;
 use serde_json::Value;
 use tokio::sync::Mutex;
 use tokio_tungstenite::tungstenite::{http, Message};
-use tracing::{error, trace, warn};
+use tracing::{error, info, trace, warn};
 use anyhow::Result;
+use chrono::Utc;
 use tokio_tungstenite::tungstenite::handshake::client::{generate_key, Request};
+use tracing_subscriber::fmt::format::json;
 use crate::exchange::extended_account::ExtendedAccount;
 use crate::utils::response::Response;
 use crate::utils::stream_utils::{StreamUtils, HeartbeatType};
@@ -126,16 +128,15 @@ impl ExtendedStreamClient {
     pub fn message_text(text: String) -> Option<Response> {
         let mut res_data = Response::new("".to_string(), -201, "success".to_string(), Value::Null);
         let json_value: Value = serde_json::from_str(&text).unwrap();
+        
+        // info!("等待解析:{}", serde_json::to_string_pretty(&json_value).unwrap());
 
-        match json_value["msg"].as_str() {
-            Some(msg) => {
-                res_data.message = json_value["msg"].to_string();
-
-                if msg.contains("Not Subscribed successfully!") {
-                    res_data.code = 500
-                } else {
-                    res_data.channel = json_value["msg"].to_string();
-                }
+        match json_value["ts"].as_i64() {
+            Some(ts) => {
+                res_data.reach_time = ts;
+                res_data.received_time = Utc::now().timestamp_millis();
+                res_data.code = 200;
+                res_data.data = json_value.clone();
             }
             None => {
                 res_data.data = json_value.clone();

+ 2 - 2
src/exchange/mod.rs

@@ -1,2 +1,2 @@
-mod extended_stream_client;
-mod extended_account;
+pub mod extended_stream_client;
+pub mod extended_account;

+ 172 - 1
src/main.rs

@@ -1,11 +1,182 @@
 mod utils;
 mod exchange;
+mod strategy;
+mod data_manager;
 
+use anyhow::Result;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use backtrace::Backtrace;
+use tokio::spawn;
+use tokio::sync::Mutex;
+use tokio_tungstenite::tungstenite::Message;
+use tracing::{error, info, warn};
 use utils::log_setup;
+use crate::data_manager::DataManager;
+use crate::exchange::extended_stream_client::ExtendedStreamClient;
+use crate::utils::response::Response;
 
 #[tokio::main]
 async fn main() {
     let _guards = log_setup::setup_logging().unwrap();
 
-    tracing::info!("Hello, world!");
+    // 主进程控制
+    let running = Arc::new(AtomicBool::new(true));
+
+    // panic错误捕获,panic级别的错误直接退出
+    let panic_running = running.clone();
+    std::panic::set_hook(Box::new(move |panic_info| {
+        let msg = format!(
+            "type=panic, msg={:?}, location={:?}",
+            panic_info.to_string(),
+            panic_info.location()
+        );
+
+        // 生成并格式化完整的堆栈跟踪
+        let backtrace = Backtrace::new();
+        let stack_trace = format!("{:?}", backtrace);
+
+        // 一并打印堆栈跟踪
+        warn!("{}\nStack Trace:\n{}", msg, stack_trace);
+        panic_running.store(false, Ordering::Relaxed);
+    }));
+
+    // ---- 优雅停机处理 (示例: SIGINT/Ctrl+C) ----
+    //注意:Windows上可能不支持所有信号,SIGINT通常可用
+    let r = running.clone(); // 克隆 Arc 用于 SIGHUP/SIGTERM/SIGINT 处理
+    tokio::spawn(async move {
+        tokio::signal::ctrl_c().await.expect("设置 Ctrl+C 处理器失败");
+        warn!("接收到退出信号 (Ctrl+C)... 开始关闭.");
+        r.store(false, Ordering::Relaxed);
+    });
+
+    // ---- 运行核心订阅逻辑 ----
+    info!("==================================== 应用程序启动 =======================================");
+    let task_running = running.clone();
+    // 启动一个后台任务来执行订阅和数据处理
+    let subscription_handle = tokio::spawn(async move {
+        // 运行获取交易对和订阅 K 线的函数
+        if let Err(e) = run_extended_subscriptions(task_running.clone()).await {
+            error!("运行 Extended 订阅任务失败: {:?}", e);
+            task_running.store(false, Ordering::Relaxed); // 如果启动失败,也设置停止标志
+        }
+    });
+    info!("主循环开始,等待退出信号...");
+    // ---- 主循环 ----
+    // 保持主线程活动,等待 running 标志变为 false
+    while running.load(Ordering::Relaxed) {
+        // 可以添加一些周期性检查或任务,但主要是等待
+        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+    }
+
+    info!("应用程序正在关闭...");
+
+    // ---- 清理和关闭 ----
+    // 等待订阅任务结束(如果它设计为可结束的话)
+    info!("等待订阅任务完成...");
+    // 可以给 subscription_handle 设置一个超时等待
+    match tokio::time::timeout(tokio::time::Duration::from_secs(10), subscription_handle).await {
+        Ok(Ok(_)) => info!("订阅任务正常结束。"),
+        Ok(Err(e)) => error!("订阅任务返回错误: {:?}", e),
+        Err(_) => warn!("等待订阅任务超时。"),
+    }
+
+    info!("应用程序已关闭。");
+}
+
+/// 运行 Extended 的主要订阅任务
+///
+/// # Arguments
+/// * `running` - 用于控制程序是否继续运行的原子布尔值 (Arc 包裹)
+///
+/// # Returns
+pub async fn run_extended_subscriptions(running: Arc<AtomicBool>) -> Result<()> {
+    let stream_client_list = vec![
+        ExtendedStreamClient::order_books("ExtendedOrderBooks".to_string(), None, "BTC-USD".to_string())
+    ];
+
+    // 数据管理及消息分发
+    let data_manager = DataManager::new();
+    let data_manager_am = Arc::new(Mutex::new(data_manager));
+
+    // 异步去订阅、并阻塞
+    for mut stream_client in stream_client_list {
+        let running_clone = Arc::clone(&running);
+
+        // 定义需要处理数据的fun
+        let dm = data_manager_am.clone();
+        let fun = move |response: Response| {
+            if response.code != 200 {
+                error!("出现错误代码:{}", serde_json::to_string_pretty(&response.data).unwrap());
+
+                panic!("出现错误代码:{}", serde_json::to_string_pretty(&response.data).unwrap());
+            }
+
+            // info!("{}", serde_json::to_string_pretty(&response.data).unwrap());
+            let dm_clone = Arc::clone(&dm);
+            async move {
+                let mut dm_guard = dm_clone.lock().await;
+                
+                // 记录消息延迟
+                dm_guard.record_latency(response.received_time, response.reach_time);
+
+                // 交给消息分发函数
+                dm_guard.dispatch_message(&response).await.unwrap();
+            }
+        };
+
+        // 这个通道主要是为了后面给这个ws发送消息
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+
+        spawn(async move {
+            // 链接
+            stream_client.ws_connect_async(running_clone, fun, &write_tx_am, write_rx)
+                .await
+                .expect("ws链接失败");
+        });
+    }
+
+    // // 网络延迟统计
+    // let running_clone = Arc::clone(&running);
+    // spawn(async move {
+    //     let mut interval = tokio::time::interval(Duration::from_secs(10));
+    //
+    //     while running_clone.load(Ordering::SeqCst) {
+    //         interval.tick().await; // 等待下一个周期
+    //
+    //         if !running_clone.load(Ordering::SeqCst) {
+    //             break;
+    //         }
+    //
+    //         let mut total_delay_sum = 0i64;
+    //         let mut total_message_count = 0u64;
+    //
+    //         // --- 第一步:收集 DataManager 的统计数据 ---
+    //         // 直接调用 DataManager 的方法
+    //         let manager_lock = data_manager_am.lock().await; // 锁定单个 DataManager
+    //         let (current_sum, current_count) = manager_lock.get_delay_stats(); // 使用原子读
+    //
+    //         // 使用 saturating_add 防止聚合时溢出 (虽然 u64 很大,但好习惯)
+    //         total_delay_sum = total_delay_sum.saturating_add(current_sum);
+    //         total_message_count = total_message_count.saturating_add(current_count);
+    //
+    //         manager_lock.reset_delay_stats(); // 调用重置方法 (使用原子写)
+    //
+    //         // --- 第二步:计算并报告平均延迟 ---
+    //         if total_message_count > 0 {
+    //             let average_delay = total_delay_sum as f64 / total_message_count as f64;
+    //             info!(
+    //                     "当前 WS 平均延迟: {:.2} 毫秒 (基于 {} 条消息测量)",
+    //                     average_delay, total_message_count
+    //                 );
+    //         } else {
+    //             info!("WS 延迟报告:本周期内未收到用于计算延迟的消息。");
+    //         }
+    //     }
+    //     info!("延迟报告任务已停止。");
+    // });
+
+    Ok(())
 }

+ 16 - 0
src/strategy.rs

@@ -0,0 +1,16 @@
+use serde_json::Value;
+use tracing::info;
+
+struct Strategy {
+    
+}
+
+impl Strategy {
+    pub fn new() -> Strategy {
+        Strategy {}
+    }
+    
+    pub fn on_message(message: &Value) {
+        info!("{}", message);
+    }
+}

+ 6 - 6
src/utils/response.rs

@@ -9,8 +9,8 @@ pub struct Response {
     pub message: String,
     pub channel: String,
     pub data: Value,
-    pub ins: Instant,           // 数据接收的ins
-    pub time: i64,              // 数据接受的时间
+    pub instant: Instant,       // 数据接收的instant,记录到各个模块的时间消耗
+    pub received_time: i64,     // 数据接受的时间
     pub reach_time: i64,        // 远程数据时间
     pub data_type: String       // 數據類型, 例如 bybit 深度信息:snapshot(全量),delta(增量)
 }
@@ -23,10 +23,10 @@ impl Response {
             message,
             data,
             channel: "".to_string(),
-            time: 0,
+            received_time: 0,
             reach_time: 0,
             data_type: String::new(),
-            ins: Instant::now(),
+            instant: Instant::now(),
         }
     }
     pub fn error(label: String, message: String) -> Response {
@@ -36,10 +36,10 @@ impl Response {
             message: format!("{}", &message),
             data: Value::Null,
             channel: "".to_string(),
-            time: 0,
+            received_time: 0,
             reach_time: 0,
             data_type: String::new(),
-            ins: Instant::now(),
+            instant: Instant::now(),
         }
     }
 

+ 4 - 4
src/utils/stream_utils.rs

@@ -87,8 +87,8 @@ impl StreamUtils {
 
                     if code == 200 {
                         let mut data_c = data.clone();
-                        data_c.ins = Instant::now();
-                        data_c.time = Utc::now().timestamp_millis();
+                        data_c.instant = Instant::now();
+                        data_c.received_time = Utc::now().timestamp_millis();
 
                         handle_function(data_c).await;
                     }
@@ -104,8 +104,8 @@ impl StreamUtils {
                     match code {
                         200 => {
                             let mut data_c = data.clone();
-                            data_c.ins = Instant::now();
-                            data_c.time = Utc::now().timestamp_millis();
+                            data_c.instant = Instant::now();
+                            data_c.received_time = Utc::now().timestamp_millis();
 
                             handle_function(data_c).await;
                         }