浏览代码

部分公共属性抽出来。

skyffire 1 年之前
父节点
当前提交
9e2952fdeb

+ 22 - 10
src/binance_usdt_swap_data_listener.rs

@@ -7,18 +7,13 @@ use tracing::info;
 use exchanges::binance_swap_rest::BinanceSwapRest;
 use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
 use exchanges::response_base::ResponseData;
-use standard::{Record, SpecialTrade};
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::listener_tools::{update_record, update_trade};
-
-// type DepthMap = HashMap<String, Vec<SpecialDepth>>;
-pub type TradeMap = HashMap<String, Vec<SpecialTrade>>;
-pub type RecordMap = HashMap<String, Record>;
+use crate::listener_tools::{DepthMap, RecordMap, TradeMap, update_depth, update_record, update_trade};
 const EXCHANGE_NAME: &str = "binance_usdt_swap";
 
 lazy_static! {
-    // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
+    static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
     static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
     static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
 }
@@ -62,6 +57,22 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
         });
     }
+
+    // // 2024-5-7任务 单独订阅ETH的全档位深度数据
+    // let ws_name = name.to_string();
+    // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    // let write_tx_am = Arc::new(Mutex::new(write_tx));
+    // let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+    // tokio::spawn(async move {
+    //     let mut ws = BinanceSwapWs::new_with_tag(ws_name, false, None, BinanceSwapWsType::PublicAndPrivate);
+    //     ws.set_subscribe(vec![
+    //         BinanceSwapSubscribeType::PuDepth20levels100ms
+    //     ]);
+    //
+    //     // 建立链接
+    //     ws.set_symbols(vec!["ETH_USDT".to_string()]);
+    //     ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+    // });
 }
 
 // 读取数据
@@ -73,9 +84,10 @@ pub async fn data_listener(response: ResponseData) {
     match response.channel.as_str() {
         // 深度数据
         "深度" => {
-            // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSwap, &response);
-            //
-            // update_depth(&depth).await;
+            let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSwap, &response);
+            let depth_map = DEPTH_MAP.lock().await;
+
+            update_depth(&depth, depth_map, EXCHANGE_NAME).await;
         },
         // 订单流数据
         "aggTrade" => {

+ 1 - 5
src/bitget_usdt_swap_data_listener.rs

@@ -7,14 +7,10 @@ use tracing::info;
 use exchanges::bitget_swap_rest::BitgetSwapRest;
 use exchanges::bitget_swap_ws::{BitgetSwapSubscribeType, BitgetSwapWs, BitgetSwapWsType};
 use exchanges::response_base::ResponseData;
-use standard::{Record, SpecialTrade};
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::listener_tools::{update_record, update_trade};
+use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
 
-// type DepthMap = HashMap<String, Vec<SpecialDepth>>;
-pub type TradeMap = HashMap<String, Vec<SpecialTrade>>;
-pub type RecordMap = HashMap<String, Record>;
 const EXCHANGE_NAME: &str = "bitget_usdt_swap";
 
 lazy_static! {

+ 2 - 5
src/coinex_usdt_swap_data_listener.rs

@@ -10,14 +10,11 @@ use tracing::info;
 use exchanges::coinex_swap_rest::CoinexSwapRest;
 use exchanges::coinex_swap_ws::{CoinexSwapSubscribeType, CoinexSwapWs};
 use exchanges::response_base::ResponseData;
-use standard::{Record, SpecialTrade};
+use standard::{Record};
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::listener_tools::{update_record, update_trade};
+use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
 
-// type DepthMap = HashMap<String, Vec<SpecialDepth>>;
-pub type TradeMap = HashMap<String, Vec<SpecialTrade>>;
-pub type RecordMap = HashMap<String, Record>;
 const EXCHANGE_NAME: &str = "coinex_usdt_swap";
 
 lazy_static! {

+ 1 - 5
src/gate_usdt_swap_data_listener.rs

@@ -7,14 +7,10 @@ use tracing::info;
 use exchanges::gate_swap_rest::GateSwapRest;
 use exchanges::gate_swap_ws::{GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
 use exchanges::response_base::ResponseData;
-use standard::{Record, SpecialTrade};
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::listener_tools::{update_record, update_trade};
+use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
 
-// type DepthMap = HashMap<String, Vec<SpecialDepth>>;
-pub type TradeMap = HashMap<String, Vec<SpecialTrade>>;
-pub type RecordMap = HashMap<String, Record>;
 const EXCHANGE_NAME: &str = "gate_usdt_swap";
 
 lazy_static! {

+ 30 - 30
src/listener_tools.rs

@@ -1,11 +1,15 @@
+use std::collections::HashMap;
 use std::str::FromStr;
 use rust_decimal::Decimal;
 use rust_decimal::prelude::ToPrimitive;
 use tokio::sync::MutexGuard;
-use standard::{Record, SpecialTrade, Trade};
-use crate::gate_usdt_swap_data_listener::{RecordMap, TradeMap};
+use standard::{Depth, Record, SpecialDepth, SpecialTrade, Trade};
 use crate::json_db_utils::{generate_file_path, minute_to_date, write_to_file};
 
+pub type DepthMap = HashMap<String, Vec<SpecialDepth>>;
+pub type TradeMap = HashMap<String, Vec<SpecialTrade>>;
+pub type RecordMap = HashMap<String, Record>;
+
 // 更新订单流数据
 pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, TradeMap>, exchange: &str) {
     if let Some(trades) = trades_map.get_mut(new_trade.symbol.as_str()) {
@@ -58,31 +62,27 @@ pub async fn update_record(new_record: &Record, mut records_map: MutexGuard<'_,
     records_map.insert(new_record.symbol.clone(), new_record.clone());
 }
 
-// // 更新深度数据
-// pub async fn update_depth(new_depth: &Depth) {
-//     let mut depth_map = DEPTH_MAP.lock().await;
-//
-//     if let Some(depths) = depth_map.get_mut(new_depth.symbol.as_str()) {
-//         if let Some(last_depth) = depths.last() {
-//             let last_depth_minutes = last_depth.t.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
-//             let new_depth_minutes = new_depth.time.to_i64().unwrap() / 60000; // 同上
-//
-//             // 如果分钟数不同,则清空列表并添加新的depth
-//             if last_depth_minutes != new_depth_minutes {
-//                 let depths_json = serde_json::to_string(depths).unwrap();
-//                 let path = generate_file_path("gate_usdt_swap",
-//                                               new_depth.symbol.as_str(), "order_book", last_depth_minutes);
-//
-//                 info!(?path);
-//
-//                 write_to_file(depths_json, path);
-//
-//                 depths.clear();
-//             }
-//         }
-//         depths.push(SpecialDepth::new(&new_depth));
-//     } else {
-//         // 如果该symbol不存在,则创建新的Vec并添加depth
-//         depth_map.insert(new_depth.symbol.clone(), vec![SpecialDepth::new(&new_depth)]);
-//     }
-// }
+// 更新深度数据
+pub async fn update_depth(new_depth: &Depth, mut depth_map: MutexGuard<'_, DepthMap>, exchange: &str) {
+    if let Some(depths) = depth_map.get_mut(new_depth.symbol.as_str()) {
+        if let Some(last_depth) = depths.last() {
+            let last_depth_minutes = last_depth.t.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
+            let new_depth_minutes = new_depth.time.to_i64().unwrap() / 60000; // 同上
+
+            // 如果分钟数不同,则清空列表并添加新的depth
+            if last_depth_minutes != new_depth_minutes {
+                let depths_json = serde_json::to_string(depths).unwrap();
+                let date_str = minute_to_date(last_depth_minutes);
+                let path = generate_file_path(exchange, date_str.as_str(), new_depth.symbol.as_str(), "order_book", last_depth_minutes);
+
+                write_to_file(depths_json, path).await;
+
+                depths.clear();
+            }
+        }
+        depths.push(SpecialDepth::new(&new_depth));
+    } else {
+        // 如果该symbol不存在,则创建新的Vec并添加depth
+        depth_map.insert(new_depth.symbol.clone(), vec![SpecialDepth::new(&new_depth)]);
+    }
+}

+ 23 - 0
standard/src/lib.rs

@@ -147,6 +147,29 @@ impl Depth {
     }
 }
 
+/// 特殊Depth结构体(市场深度),用于存放到本地数据库中
+/// - `a(Vec<Vec<Decimal>>)`: asks;
+/// - `b(Vec<Vec<Decimal>>)`: bids;
+/// - `t(Decimal)`: 数据生成时间
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct SpecialDepth {
+    pub b: Vec<Vec<Decimal>>,
+    pub a: Vec<Vec<Decimal>>,
+    pub t: Decimal,
+}
+
+impl SpecialDepth {
+    pub fn new(depth: &Depth) -> SpecialDepth {
+        let bids = depth.bids.iter().map(|ob| vec![ob.price, ob.amount]).collect::<Vec<_>>();
+        let asks = depth.asks.iter().map(|ob| vec![ob.price, ob.amount]).collect::<Vec<_>>();
+        SpecialDepth {
+            b: bids,
+            a: asks,
+            t: depth.time,
+        }
+    }
+}
+
 /// 特殊Ticker结构体(市场行情)
 /// - `sell(Decimal)`: 卖一价
 /// - `buy(Decimal)`: 买一价