data_manager.rs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. use std::str::FromStr;
  2. use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
  3. use anyhow::{anyhow, bail, Result};
  4. use rust_decimal::Decimal;
  5. use serde_json::Value;
  6. use tracing::{info, warn};
  7. use crate::utils::response::Response;
  8. pub struct DataManager {
  9. pub best_ask: Decimal,
  10. pub best_bid: Decimal,
  11. pub delay_total: AtomicI64,
  12. pub delay_count: AtomicU64,
  13. }
  14. impl DataManager {
  15. pub fn new() -> Self {
  16. DataManager {
  17. best_ask: Default::default(),
  18. best_bid: Default::default(),
  19. delay_total: AtomicI64::new(0),
  20. delay_count: AtomicU64::new(0),
  21. }
  22. }
  23. pub fn record_latency(&self, received_at: i64, origin_timestamp: i64) {
  24. if let Some(delay) = received_at.checked_sub(origin_timestamp) {
  25. self.delay_total.fetch_add(delay, Ordering::Relaxed); // 原子加
  26. self.delay_count.fetch_add(1, Ordering::Relaxed); // 原子加
  27. } else {
  28. warn!("时间戳计算出现问题: received_at={}, origin_timestamp={}", received_at, origin_timestamp);
  29. }
  30. }
  31. // 获取当前的统计数据
  32. pub fn get_delay_stats(&self) -> (i64, u64) {
  33. let total = self.delay_total.load(Ordering::Relaxed);
  34. let count = self.delay_count.load(Ordering::Relaxed);
  35. (total, count)
  36. }
  37. // 重置统计数据 -> 这个是关键!
  38. pub fn reset_delay_stats(&self) {
  39. self.delay_total.store(0, Ordering::Relaxed); // 原子写
  40. self.delay_count.store(0, Ordering::Relaxed); // 原子写
  41. }
  42. pub async fn dispatch_message(&mut self, response: &Response) -> Result<()> {
  43. // 1. 预解析为通用的 Value
  44. let v = response.data.clone();
  45. // info!("准备分发的消息:{}, {}", serde_json::to_string_pretty(&v)?, response.label);
  46. // 2. 获取 topic_info 字段用于路由消息,在该策略中extended可以用label
  47. let topic_info = &response.label;
  48. // 3. 根据 topic_info 的内容进行分发 (match)
  49. if topic_info.contains("ExtendedBestPrices") {
  50. self.process_best_prices(&v).await?;
  51. } else if topic_info.contains("spot@public.aggre.depth.v3.api.pb") {
  52. } else {
  53. // 如果是未知的 topic,返回一个错误
  54. bail!("Received a message with an unknown topic_info: {}", topic_info);
  55. }
  56. Ok(())
  57. }
  58. pub async fn process_best_prices(&mut self, value: &Value) -> Result<()> {
  59. // 预先捕获整个 Value 的字符串表示,用于错误报告
  60. let value_str = serde_json::to_string(&value).unwrap_or_else(|_| "无法序列化 JSON Value".to_string());
  61. // 尝试获取 data 字段
  62. let data = value.get("data")
  63. .ok_or_else(|| anyhow!("获取 'data' 字段失败,原始 JSON: {}", value_str))?;
  64. // 尝试从 data 中获取 "a" (asks) 数组
  65. let asks_array = data.get("a")
  66. .and_then(|v| v.as_array()) // and_then 链式调用,确保只有当 v 存在且是数组时才继续
  67. .ok_or_else(|| anyhow!("获取 'data.a' 数组失败,原始 JSON: {}", value_str))?;
  68. // 尝试从 data 中获取 "b" (bids) 数组
  69. let bids_array = data.get("b")
  70. .and_then(|v| v.as_array())
  71. .ok_or_else(|| anyhow!("获取 'data.b' 数组失败,原始 JSON: {}", value_str))?;
  72. // 如若有发送asks信息
  73. if asks_array.len() > 0 {
  74. let ask_item = &asks_array[0];
  75. let p = ask_item.get("p")
  76. .and_then(|v| v.as_str())
  77. .ok_or_else(|| anyhow!("获取 'data.a.p' 字符串失败,原始 JSON: {}", value_str))?;
  78. self.best_ask = Decimal::from_str(p)
  79. .map_err(|e| anyhow!("将价格字符串 '{}' 解析为 Decimal 失败: {},原始 JSON: {}", p, e, value_str))?;
  80. }
  81. // 如若有发送bids信息
  82. if bids_array.len() > 0 {
  83. let bid_item = &bids_array[0];
  84. let p = bid_item.get("p")
  85. .and_then(|v| v.as_str())
  86. .ok_or_else(|| anyhow!("获取 'data.b.p' 字符串失败,原始 JSON: {}", value_str))?;
  87. self.best_bid = Decimal::from_str(p)
  88. .map_err(|e| anyhow!("将价格字符串 '{}' 解析为 Decimal 失败: {},原始 JSON: {}", p, e, value_str))?;
  89. }
  90. info!("{}, {}", self.best_ask, self.best_bid);
  91. Ok(())
  92. }
  93. }