data_manager.rs 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. use std::collections::HashMap;
  2. use std::str::FromStr;
  3. use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
  4. use anyhow::{anyhow, bail, Result};
  5. use rust_decimal::Decimal;
  6. use serde_json::Value;
  7. use tracing::{warn};
  8. use crate::utils::response::Response;
  9. #[allow(dead_code)]
  10. pub struct DataManager {
  11. pub best_ask: Decimal,
  12. pub best_bid: Decimal,
  13. // position的数据结构如下,用下划线连接就可以取得position
  14. // {
  15. // "symbol1_side1":{
  16. // },
  17. // "symbol1_side2":{
  18. // },
  19. // ..
  20. // }
  21. pub extended_position: HashMap<String, Value>,
  22. pub extended_orders: HashMap<String, Value>,
  23. pub extended_balance: Value,
  24. pub delay_total: AtomicI64,
  25. pub delay_count: AtomicU64,
  26. }
  27. #[allow(dead_code)]
  28. impl DataManager {
  29. pub fn new() -> Self {
  30. DataManager {
  31. best_ask: Default::default(),
  32. best_bid: Default::default(),
  33. extended_position: HashMap::new(),
  34. extended_balance: Default::default(),
  35. extended_orders: Default::default(),
  36. delay_total: AtomicI64::new(0),
  37. delay_count: AtomicU64::new(0),
  38. }
  39. }
  40. pub fn record_latency(&self, received_at: i64, origin_timestamp: i64) {
  41. if let Some(delay) = received_at.checked_sub(origin_timestamp) {
  42. self.delay_total.fetch_add(delay, Ordering::Relaxed); // 原子加
  43. self.delay_count.fetch_add(1, Ordering::Relaxed); // 原子加
  44. } else {
  45. warn!("时间戳计算出现问题: received_at={}, origin_timestamp={}", received_at, origin_timestamp);
  46. }
  47. }
  48. // 获取当前的统计数据
  49. pub fn get_delay_stats(&self) -> (i64, u64) {
  50. let total = self.delay_total.load(Ordering::Relaxed);
  51. let count = self.delay_count.load(Ordering::Relaxed);
  52. (total, count)
  53. }
  54. // 重置统计数据 -> 这个是关键!
  55. pub fn reset_delay_stats(&self) {
  56. self.delay_total.store(0, Ordering::Relaxed); // 原子写
  57. self.delay_count.store(0, Ordering::Relaxed); // 原子写
  58. }
  59. pub async fn dispatch_message(&mut self, response: &Response) -> Result<()> {
  60. // 1. 预解析为通用的 Value
  61. let v = response.data.clone();
  62. // 2. 获取 topic_info 字段用于路由消息,在该策略中extended可以用label
  63. let topic_info = &response.label;
  64. // 3. 根据 topic_info 的内容进行分发 (match)
  65. if topic_info.contains("ExtendedBestPrices") {
  66. // 数据不新鲜直接跳过
  67. if response.reach_time - response.received_time > 100 {
  68. return Ok(());
  69. }
  70. self.process_best_prices(&v).await?;
  71. } else if topic_info.contains("ExtendedAccount") {
  72. self.process_account(&v).await?;
  73. } else {
  74. // 如果是未知的 topic,返回一个错误
  75. bail!("Received a message with an unknown topic_info: {}, value: \n {}", topic_info, serde_json::to_string_pretty(&v)?);
  76. }
  77. Ok(())
  78. }
  79. pub async fn process_best_prices(&mut self, value: &Value) -> Result<()> {
  80. // 预先捕获整个 Value 的字符串表示,用于错误报告
  81. let value_str = serde_json::to_string(&value).unwrap_or_else(|_| "无法序列化 JSON Value".to_string());
  82. // 尝试获取 data 字段
  83. let data = value.get("data")
  84. .ok_or_else(|| anyhow!("获取 'data' 字段失败,原始 JSON: {}", value_str))?;
  85. // 尝试从 data 中获取 "a" (asks) 数组
  86. let asks_array = data.get("a")
  87. .and_then(|v| v.as_array()) // and_then 链式调用,确保只有当 v 存在且是数组时才继续
  88. .ok_or_else(|| anyhow!("获取 'data.a' 数组失败,原始 JSON: {}", value_str))?;
  89. // 尝试从 data 中获取 "b" (bids) 数组
  90. let bids_array = data.get("b")
  91. .and_then(|v| v.as_array())
  92. .ok_or_else(|| anyhow!("获取 'data.b' 数组失败,原始 JSON: {}", value_str))?;
  93. // 如若有发送asks信息
  94. if asks_array.len() > 0 {
  95. let ask_item = &asks_array[0];
  96. let p = ask_item.get("p")
  97. .and_then(|v| v.as_str())
  98. .ok_or_else(|| anyhow!("获取 'data.a.p' 字符串失败,原始 JSON: {}", value_str))?;
  99. self.best_ask = Decimal::from_str(p)
  100. .map_err(|e| anyhow!("将价格字符串 '{}' 解析为 Decimal 失败: {},原始 JSON: {}", p, e, value_str))?;
  101. }
  102. // 如若有发送bids信息
  103. if bids_array.len() > 0 {
  104. let bid_item = &bids_array[0];
  105. let p = bid_item.get("p")
  106. .and_then(|v| v.as_str())
  107. .ok_or_else(|| anyhow!("获取 'data.b.p' 字符串失败,原始 JSON: {}", value_str))?;
  108. self.best_bid = Decimal::from_str(p)
  109. .map_err(|e| anyhow!("将价格字符串 '{}' 解析为 Decimal 失败: {},原始 JSON: {}", p, e, value_str))?;
  110. }
  111. Ok(())
  112. }
  113. pub async fn process_account(&mut self, value: &Value) -> Result<()> {
  114. // 预先捕获整个 Value 的字符串表示,用于错误报告
  115. let value_str = serde_json::to_string(&value).unwrap_or_else(|_| "无法序列化 JSON Value".to_string());
  116. // 获取 data 字段
  117. let data = value.get("data")
  118. .ok_or_else(|| anyhow!("获取 'data' 字段失败,原始 JSON: {}", value_str))?;
  119. // 获取type字段,用于解析归类
  120. let d_type = value.get("type")
  121. .and_then(|v| v.as_str())
  122. .ok_or_else(|| anyhow!("获取 'type' 字符串失败,原始 JSON: {}", value_str))?;
  123. match d_type {
  124. "BALANCE" => {
  125. let balance = data.get("balance")
  126. .ok_or_else(|| anyhow!("获取 'data.balance' 字段失败,原始 JSON: {}", value_str))?;
  127. self.extended_balance = balance.clone();
  128. }
  129. "ORDER" => {
  130. let orders = data.get("orders")
  131. .and_then(|v| v.as_array())
  132. .ok_or_else(|| anyhow!("获取 'data.orders' 字段失败,原始 JSON: {}", value_str))?;
  133. // 遍历order
  134. for order in orders {
  135. let id = order.get("id")
  136. .and_then(|v| v.as_i64())
  137. .ok_or_else(|| anyhow!("遍历获取 'order.id' 时失败,原始JSON:{}", value_str))?;
  138. // 放入map中
  139. self.extended_orders.insert(id.to_string(), order.clone());
  140. }
  141. // TODO 已完成订单,并且60分钟以上的要清库
  142. }
  143. "POSITION" => {
  144. let positions = data.get("positions")
  145. .and_then(|v| v.as_array())
  146. .ok_or_else(|| anyhow!("获取 'data.positions' 字段失败,原始 JSON: {}", value_str))?;
  147. // 将map清空,position只保留最新信息
  148. self.extended_position.clear();
  149. // 将position装入对应的map中
  150. for position in positions {
  151. let status = position.get("status")
  152. .and_then(|v| v.as_str())
  153. .ok_or_else(|| anyhow!("获取 'position.status' 字符串失败,原始 JSON: {}", value_str))?;
  154. // 只保留当前的有效持仓
  155. if status != "OPENED" {
  156. continue;
  157. }
  158. let market = position.get("market")
  159. .and_then(|v| v.as_str())
  160. .ok_or_else(|| anyhow!("获取 'position.market' 字符串失败,原始 JSON: {}", value_str))?;
  161. let side = position.get("side")
  162. .and_then(|v| v.as_str())
  163. .ok_or_else(|| anyhow!("获取 'position.side' 字符串失败,原始 JSON: {}", value_str))?;
  164. let key = format!("{}_{}", market, side);
  165. // 将有效持仓放入position中
  166. self.extended_position.insert(key, position.clone());
  167. }
  168. }
  169. "TRADE" => {}
  170. &_ => {
  171. bail!("未知类型的 'type' 原始 JSON: {}", value_str);
  172. }
  173. }
  174. Ok(())
  175. }
  176. }