|
|
@@ -8,9 +8,13 @@ mod strategy;
|
|
|
mod exchange;
|
|
|
mod api;
|
|
|
|
|
|
+use std::collections::{HashMap, HashSet};
|
|
|
use backtrace::Backtrace;
|
|
|
use std::sync::Arc;
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
|
+use anyhow::anyhow;
|
|
|
+use anyhow::Result;
|
|
|
+use serde_json::Value;
|
|
|
use tracing::{error, info, warn};
|
|
|
use utils::log_setup;
|
|
|
use crate::exchange::mexc_spot_client::MexcSpotClient;
|
|
|
@@ -57,8 +61,8 @@ async fn main() {
|
|
|
// 启动一个后台任务来执行订阅和数据处理
|
|
|
let subscription_handle = tokio::spawn(async move {
|
|
|
// 运行获取交易对和订阅 K 线的函数
|
|
|
- if !run_mexc_subscriptions(task_running.clone(), subscribe_client).await {
|
|
|
- error!("运行 MEXC 订阅任务失败");
|
|
|
+ if let Err(e) = run_mexc_subscriptions(task_running.clone(), subscribe_client).await {
|
|
|
+ error!("运行 MEXC 订阅任务失败: {:?}", e);
|
|
|
task_running.store(false, Ordering::Relaxed); // 如果启动失败,也设置停止标志
|
|
|
}
|
|
|
});
|
|
|
@@ -105,15 +109,45 @@ async fn main() {
|
|
|
pub async fn run_mexc_subscriptions(
|
|
|
running: Arc<AtomicBool>, // 接收 running 标志,以便在出错时可以停止
|
|
|
client: Arc<tokio::sync::Mutex<MexcSpotClient>>,
|
|
|
-) -> bool {
|
|
|
+) -> Result<()> {
|
|
|
info!("开始获取 MEXC 交易对...");
|
|
|
|
|
|
// 1. 获取所有交易对
|
|
|
// 注意:这里的 .await? 会在出错时直接返回 Err,中断此函数
|
|
|
// 你可能需要根据实际的API响应调整这里的类型和字段访问
|
|
|
let mut rest_client = client.lock().await;
|
|
|
- let exchange_info_response = rest_client.exchange_info(serde_json::Value::Null).await;
|
|
|
+
|
|
|
let default_symbols_response = rest_client.default_symbols().await;
|
|
|
+ // info!("获取到的 default_symbols_value: {}", serde_json::to_string_pretty(&default_symbols_response.data).unwrap());
|
|
|
+
|
|
|
+ let exchange_info_response = rest_client.exchange_info(serde_json::Value::Null).await;
|
|
|
+ // info!("获取到的 exchange_info_data: {}", serde_json::to_string_pretty(&exchange_info_response.data).unwrap());
|
|
|
+ // 调用处理函数,注意现在传入的是 &Value
|
|
|
+
|
|
|
+ match process_exchange_info(&default_symbols_response.data, &exchange_info_response.data) {
|
|
|
+ Ok(filtered_map) => {
|
|
|
+ info!("成功过滤并转换了交易对信息,最终数量: {}", filtered_map.len());
|
|
|
+
|
|
|
+ // 使用 filtered_map...
|
|
|
+ if let Some(symbol_info) = filtered_map.get("BTCUSDT") { // 示例
|
|
|
+ let pretty_info = serde_json::to_string_pretty(symbol_info).unwrap_or_else(|e| format!("序列化错误: {}", e));
|
|
|
+ info!("BTCUSDT 的信息: {}", pretty_info);
|
|
|
+ } else if !filtered_map.is_empty() {
|
|
|
+ if let Some((first_key, first_value)) = filtered_map.iter().next() {
|
|
|
+ let pretty_info = serde_json::to_string_pretty(first_value).unwrap_or_else(|e| format!("序列化错误: {}", e));
|
|
|
+ info!("过滤后 map 中的第一个交易对 '{}' 的信息: {}", first_key, pretty_info);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ info!("过滤后的 Map 为空。");
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ Err(e) => {
|
|
|
+ error!("处理交易所信息时发生错误: {}", e);
|
|
|
+ return Err(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
// // 提取交易对名称 (例如 "BTC_USDT")
|
|
|
// // 假设 SymbolInfo 结构体有一个名为 `symbol` 的 String 字段
|
|
|
@@ -167,5 +201,78 @@ pub async fn run_mexc_subscriptions(
|
|
|
// }
|
|
|
|
|
|
// 如果订阅本身是后台任务,并且这个函数只是触发订阅,那么到这里就可以返回 Ok 了
|
|
|
- true
|
|
|
+ Ok(())
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+/// 处理交易所信息,根据一个包含默认交易对列表的 Value 过滤并转换为 Map
|
|
|
+///
|
|
|
+/// # Arguments
|
|
|
+///
|
|
|
+/// * `default_symbols_value` - 一个 `serde_json::Value`,预期结构为 `{"code": number, "data": ["SYMBOL1", "SYMBOL2", ...]}`。
|
|
|
+/// * `exchange_info_data` - 从 `/api/v3/exchangeInfo` 获取到的 `serde_json::Value`,其中应包含 "symbols" 字段。
|
|
|
+///
|
|
|
+/// # Returns
|
|
|
+///
|
|
|
+/// * `Result<HashMap<String, Value>>` - 如果成功,返回一个 HashMap,Key 是交易对符号 (String),Value 是该交易对的完整 JSON 对象 (Value)。
|
|
|
+/// * `Err(anyhow::Error)` - 如果发生错误(例如,输入 Value 结构不正确、缺少字段或数据格式不正确)。
|
|
|
+fn process_exchange_info(
|
|
|
+ default_symbols_value: &Value,
|
|
|
+ exchange_info_data: &Value,
|
|
|
+) -> Result<HashMap<String, Value>> {
|
|
|
+ // 1. 从 default_symbols_value 中提取 "data" 数组,并转换为 HashSet<String>
|
|
|
+ let default_symbols_set: HashSet<String> = default_symbols_value
|
|
|
+ .get("data") // 获取 "data" 字段 (Option<&Value>)
|
|
|
+ .and_then(Value::as_array) // 尝试转为数组引用 (Option<&Vec<Value>>)
|
|
|
+ .ok_or_else(|| anyhow!("在 default_symbols 数据中未找到 'data' 字段或其不是一个数组"))? // 失败则返回错误
|
|
|
+ .iter() // 迭代数组中的 &Value
|
|
|
+ .filter_map(|v| { // 过滤并转换
|
|
|
+ if let Some(s) = v.as_str() { // 尝试将 &Value 转为 &str
|
|
|
+ Some(s.to_string()) // 如果是字符串,转为 String
|
|
|
+ } else {
|
|
|
+ warn!("在 default_symbols 的 data 数组中发现非字符串元素,已跳过: {:?}", v);
|
|
|
+ None // 否则忽略
|
|
|
+ }
|
|
|
+ })
|
|
|
+ .collect(); // 收集成 HashSet<String>
|
|
|
+
|
|
|
+ info!("从 default_symbols 数据中加载了 {} 个默认交易对用于过滤。", default_symbols_set.len());
|
|
|
+
|
|
|
+ // 2. 初始化结果 HashMap
|
|
|
+ let mut filtered_symbols_map: HashMap<String, Value> = HashMap::new();
|
|
|
+
|
|
|
+ // 3. 从 exchange_info_data 中提取 "symbols" 数组
|
|
|
+ let symbols_array = exchange_info_data
|
|
|
+ .get("symbols")
|
|
|
+ .and_then(Value::as_array)
|
|
|
+ .ok_or_else(|| anyhow!("在 exchange_info 数据中未找到 'symbols' 字段或其不是一个数组"))?;
|
|
|
+
|
|
|
+ info!("从 exchangeInfo 获取到 {} 个交易对信息,开始过滤...", symbols_array.len());
|
|
|
+
|
|
|
+ // 4. 遍历 symbols 数组,进行过滤和转换 (这部分逻辑不变)
|
|
|
+ for symbol_info_value in symbols_array {
|
|
|
+ if !symbol_info_value.is_object() {
|
|
|
+ warn!("发现一个非对象的条目,已跳过: {:?}", symbol_info_value);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ let symbol_str_opt = symbol_info_value.get("symbol").and_then(Value::as_str);
|
|
|
+ let status_str_opt = symbol_info_value.get("status").and_then(Value::as_str);
|
|
|
+
|
|
|
+ match (symbol_str_opt, status_str_opt) {
|
|
|
+ (Some(symbol), Some(status)) => {
|
|
|
+ if status == "1" && default_symbols_set.contains(symbol) {
|
|
|
+ filtered_symbols_map.insert(symbol.to_string(), symbol_info_value.clone());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ _ => {
|
|
|
+ warn!("交易对信息缺少 'symbol' 或 'status' 字段,或类型不是字符串,已跳过: {:?}", symbol_info_value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ info!("过滤完成,得到 {} 个符合条件的交易对。", filtered_symbols_map.len());
|
|
|
+
|
|
|
+ // 5. 返回结果 Map
|
|
|
+ Ok(filtered_symbols_map)
|
|
|
}
|