|
|
@@ -1,3 +1,4 @@
|
|
|
+use std::collections::HashMap;
|
|
|
use std::str::FromStr;
|
|
|
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
|
|
|
use anyhow::{anyhow, bail, Result};
|
|
|
@@ -11,6 +12,18 @@ pub struct DataManager {
|
|
|
pub best_ask: Decimal,
|
|
|
pub best_bid: Decimal,
|
|
|
|
|
|
+ // position的数据结构如下,用下划线连接就可以取得position
|
|
|
+ // {
|
|
|
+ // "symbol1_side1":{
|
|
|
+ // },
|
|
|
+ // "symbol1_side2":{
|
|
|
+ // },
|
|
|
+ // ..
|
|
|
+ // }
|
|
|
+ pub extended_position: HashMap<String, Value>,
|
|
|
+ pub extended_orders: HashMap<String, Value>,
|
|
|
+ pub extended_balance: Value,
|
|
|
+
|
|
|
pub delay_total: AtomicI64,
|
|
|
pub delay_count: AtomicU64,
|
|
|
}
|
|
|
@@ -21,7 +34,11 @@ impl DataManager {
|
|
|
DataManager {
|
|
|
best_ask: Default::default(),
|
|
|
best_bid: Default::default(),
|
|
|
-
|
|
|
+
|
|
|
+ extended_position: HashMap::new(),
|
|
|
+ extended_balance: Default::default(),
|
|
|
+ extended_orders: Default::default(),
|
|
|
+
|
|
|
delay_total: AtomicI64::new(0),
|
|
|
delay_count: AtomicU64::new(0),
|
|
|
}
|
|
|
@@ -59,9 +76,9 @@ impl DataManager {
|
|
|
// 3. 根据 topic_info 的内容进行分发 (match)
|
|
|
if topic_info.contains("ExtendedBestPrices") {
|
|
|
self.process_best_prices(&v).await?;
|
|
|
- } else if topic_info.contains("spot@public.aggre.depth.v3.api.pb") {
|
|
|
-
|
|
|
- } else {
|
|
|
+ } else if topic_info.contains("ExtendedAccount") {
|
|
|
+ self.process_account(&v).await?;
|
|
|
+ } else {
|
|
|
// 如果是未知的 topic,返回一个错误
|
|
|
bail!("Received a message with an unknown topic_info: {}, value: \n {}", topic_info, serde_json::to_string_pretty(&v)?);
|
|
|
}
|
|
|
@@ -111,4 +128,82 @@ impl DataManager {
|
|
|
|
|
|
Ok(())
|
|
|
}
|
|
|
+
|
|
|
+ pub async fn process_account(&mut self, value: &Value) -> Result<()> {
|
|
|
+ // 预先捕获整个 Value 的字符串表示,用于错误报告
|
|
|
+ let value_str = serde_json::to_string(&value).unwrap_or_else(|_| "无法序列化 JSON Value".to_string());
|
|
|
+
|
|
|
+ // 获取 data 字段
|
|
|
+ let data = value.get("data")
|
|
|
+ .ok_or_else(|| anyhow!("获取 'data' 字段失败,原始 JSON: {}", value_str))?;
|
|
|
+
|
|
|
+ // 获取type字段,用于解析归类
|
|
|
+ let d_type = value.get("type")
|
|
|
+ .and_then(|v| v.as_str())
|
|
|
+ .ok_or_else(|| anyhow!("获取 'type' 字符串失败,原始 JSON: {}", value_str))?;
|
|
|
+
|
|
|
+ match d_type {
|
|
|
+ "BALANCE" => {
|
|
|
+ let balance = data.get("balance")
|
|
|
+ .ok_or_else(|| anyhow!("获取 'data.balance' 字段失败,原始 JSON: {}", value_str))?;
|
|
|
+
|
|
|
+ self.extended_balance = balance.clone();
|
|
|
+ }
|
|
|
+ "ORDER" => {
|
|
|
+ let orders = data.get("orders")
|
|
|
+ .and_then(|v| v.as_array())
|
|
|
+ .ok_or_else(|| anyhow!("获取 'data.orders' 字段失败,原始 JSON: {}", value_str))?;
|
|
|
+
|
|
|
+ // 遍历order
|
|
|
+ for order in orders {
|
|
|
+ let id = order.get("id")
|
|
|
+ .and_then(|v| v.as_i64())
|
|
|
+ .ok_or_else(|| anyhow!("遍历获取 'order.id' 时失败,原始JSON:{}", value_str))?;
|
|
|
+ // 放入map中
|
|
|
+ self.extended_orders.insert(id.to_string(), order.clone());
|
|
|
+ }
|
|
|
+
|
|
|
+ // TODO 已完成订单,并且60分钟以上的要清库
|
|
|
+ }
|
|
|
+ "POSITION" => {
|
|
|
+ let positions = data.get("positions")
|
|
|
+ .and_then(|v| v.as_array())
|
|
|
+ .ok_or_else(|| anyhow!("获取 'data.positions' 字段失败,原始 JSON: {}", value_str))?;
|
|
|
+
|
|
|
+ // 将map清空,position只保留最新信息
|
|
|
+ self.extended_position.clear();
|
|
|
+
|
|
|
+ // 将position装入对应的map中
|
|
|
+ for position in positions {
|
|
|
+ let status = position.get("status")
|
|
|
+ .and_then(|v| v.as_str())
|
|
|
+ .ok_or_else(|| anyhow!("获取 'position.status' 字符串失败,原始 JSON: {}", value_str))?;
|
|
|
+
|
|
|
+ // 只保留当前的有效持仓
|
|
|
+ if status != "OPENED" {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ let market = position.get("market")
|
|
|
+ .and_then(|v| v.as_str())
|
|
|
+ .ok_or_else(|| anyhow!("获取 'position.market' 字符串失败,原始 JSON: {}", value_str))?;
|
|
|
+
|
|
|
+ let side = position.get("side")
|
|
|
+ .and_then(|v| v.as_str())
|
|
|
+ .ok_or_else(|| anyhow!("获取 'position.side' 字符串失败,原始 JSON: {}", value_str))?;
|
|
|
+
|
|
|
+ let key = format!("{}_{}", market, side);
|
|
|
+
|
|
|
+ // 将有效持仓放入position中
|
|
|
+ self.extended_position.insert(key, position.clone());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ "TRADE" => {}
|
|
|
+ &_ => {
|
|
|
+ bail!("未知类型的 'type' 原始 JSON: {}", value_str);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
}
|