use std::collections::HashMap; use std::str::FromStr; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use anyhow::{anyhow, bail, Result}; use rust_decimal::Decimal; use serde_json::Value; use tracing::{warn}; use crate::utils::response::Response; #[allow(dead_code)] pub struct DataManager { pub best_ask: Decimal, pub best_bid: Decimal, // position的数据结构如下,用下划线连接就可以取得position // { // "symbol1_side1":{ // }, // "symbol1_side2":{ // }, // .. // } pub extended_position: HashMap, pub extended_orders: HashMap, pub extended_balance: Value, pub delay_total: AtomicI64, pub delay_count: AtomicU64, } #[allow(dead_code)] impl DataManager { pub fn new() -> Self { DataManager { best_ask: Default::default(), best_bid: Default::default(), extended_position: HashMap::new(), extended_balance: Default::default(), extended_orders: Default::default(), delay_total: AtomicI64::new(0), delay_count: AtomicU64::new(0), } } pub fn record_latency(&self, received_at: i64, origin_timestamp: i64) { if let Some(delay) = received_at.checked_sub(origin_timestamp) { self.delay_total.fetch_add(delay, Ordering::Relaxed); // 原子加 self.delay_count.fetch_add(1, Ordering::Relaxed); // 原子加 } else { warn!("时间戳计算出现问题: received_at={}, origin_timestamp={}", received_at, origin_timestamp); } } // 获取当前的统计数据 pub fn get_delay_stats(&self) -> (i64, u64) { let total = self.delay_total.load(Ordering::Relaxed); let count = self.delay_count.load(Ordering::Relaxed); (total, count) } // 重置统计数据 -> 这个是关键! pub fn reset_delay_stats(&self) { self.delay_total.store(0, Ordering::Relaxed); // 原子写 self.delay_count.store(0, Ordering::Relaxed); // 原子写 } pub async fn dispatch_message(&mut self, response: &Response) -> Result<()> { // 1. 预解析为通用的 Value let v = response.data.clone(); // 2. 获取 topic_info 字段用于路由消息,在该策略中extended可以用label let topic_info = &response.label; // 3. 根据 topic_info 的内容进行分发 (match) if topic_info.contains("ExtendedBestPrices") { // 数据不新鲜直接跳过 if response.reach_time - response.received_time > 100 { return Ok(()); } self.process_best_prices(&v).await?; } else if topic_info.contains("ExtendedAccount") { self.process_account(&v).await?; } else { // 如果是未知的 topic,返回一个错误 bail!("Received a message with an unknown topic_info: {}, value: \n {}", topic_info, serde_json::to_string_pretty(&v)?); } Ok(()) } pub async fn process_best_prices(&mut self, value: &Value) -> Result<()> { // 预先捕获整个 Value 的字符串表示,用于错误报告 let value_str = serde_json::to_string(&value).unwrap_or_else(|_| "无法序列化 JSON Value".to_string()); // 尝试获取 data 字段 let data = value.get("data") .ok_or_else(|| anyhow!("获取 'data' 字段失败,原始 JSON: {}", value_str))?; // 尝试从 data 中获取 "a" (asks) 数组 let asks_array = data.get("a") .and_then(|v| v.as_array()) // and_then 链式调用,确保只有当 v 存在且是数组时才继续 .ok_or_else(|| anyhow!("获取 'data.a' 数组失败,原始 JSON: {}", value_str))?; // 尝试从 data 中获取 "b" (bids) 数组 let bids_array = data.get("b") .and_then(|v| v.as_array()) .ok_or_else(|| anyhow!("获取 'data.b' 数组失败,原始 JSON: {}", value_str))?; // 如若有发送asks信息 if asks_array.len() > 0 { let ask_item = &asks_array[0]; let p = ask_item.get("p") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow!("获取 'data.a.p' 字符串失败,原始 JSON: {}", value_str))?; self.best_ask = Decimal::from_str(p) .map_err(|e| anyhow!("将价格字符串 '{}' 解析为 Decimal 失败: {},原始 JSON: {}", p, e, value_str))?; } // 如若有发送bids信息 if bids_array.len() > 0 { let bid_item = &bids_array[0]; let p = bid_item.get("p") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow!("获取 'data.b.p' 字符串失败,原始 JSON: {}", value_str))?; self.best_bid = Decimal::from_str(p) .map_err(|e| anyhow!("将价格字符串 '{}' 解析为 Decimal 失败: {},原始 JSON: {}", p, e, value_str))?; } Ok(()) } pub async fn process_account(&mut self, value: &Value) -> Result<()> { // 预先捕获整个 Value 的字符串表示,用于错误报告 let value_str = serde_json::to_string(&value).unwrap_or_else(|_| "无法序列化 JSON Value".to_string()); // 获取 data 字段 let data = value.get("data") .ok_or_else(|| anyhow!("获取 'data' 字段失败,原始 JSON: {}", value_str))?; // 获取type字段,用于解析归类 let d_type = value.get("type") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow!("获取 'type' 字符串失败,原始 JSON: {}", value_str))?; match d_type { "BALANCE" => { let balance = data.get("balance") .ok_or_else(|| anyhow!("获取 'data.balance' 字段失败,原始 JSON: {}", value_str))?; self.extended_balance = balance.clone(); } "ORDER" => { let orders = data.get("orders") .and_then(|v| v.as_array()) .ok_or_else(|| anyhow!("获取 'data.orders' 字段失败,原始 JSON: {}", value_str))?; // 遍历order for order in orders { let id = order.get("id") .and_then(|v| v.as_i64()) .ok_or_else(|| anyhow!("遍历获取 'order.id' 时失败,原始JSON:{}", value_str))?; // 放入map中 self.extended_orders.insert(id.to_string(), order.clone()); } // TODO 已完成订单,并且60分钟以上的要清库 } "POSITION" => { let positions = data.get("positions") .and_then(|v| v.as_array()) .ok_or_else(|| anyhow!("获取 'data.positions' 字段失败,原始 JSON: {}", value_str))?; // 将map清空,position只保留最新信息 self.extended_position.clear(); // 将position装入对应的map中 for position in positions { let status = position.get("status") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow!("获取 'position.status' 字符串失败,原始 JSON: {}", value_str))?; // 只保留当前的有效持仓 if status != "OPENED" { continue; } let market = position.get("market") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow!("获取 'position.market' 字符串失败,原始 JSON: {}", value_str))?; let side = position.get("side") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow!("获取 'position.side' 字符串失败,原始 JSON: {}", value_str))?; let key = format!("{}_{}", market, side); // 将有效持仓放入position中 self.extended_position.insert(key, position.clone()); } } "TRADE" => {} &_ => { bail!("未知类型的 'type' 原始 JSON: {}", value_str); } } Ok(()) } }