瀏覽代碼

部分公共方法抽出来。

skyffire 1 年之前
父節點
當前提交
54e565fb18
共有 3 個文件被更改,包括 72 次插入74 次删除
  1. 10 74
      src/gate_usdt_swap_data_listener.rs
  2. 61 0
      src/listener_tools.rs
  3. 1 0
      src/main.rs

+ 10 - 74
src/gate_usdt_swap_data_listener.rs

@@ -1,28 +1,24 @@
 use std::collections::{BTreeMap, HashMap};
-// use std::collections::HashMap;
-// use std::mem::{size_of, size_of_val};
 use std::sync::{Arc};
 use std::sync::atomic::AtomicBool;
-// use std::time::Duration;
 use lazy_static::lazy_static;
-use rust_decimal::prelude::ToPrimitive;
-// use rust_decimal::prelude::ToPrimitive;
-use tokio::sync::Mutex;
+use tokio::sync::{Mutex};
 use tracing::info;
 use exchanges::gate_swap_rest::GateSwapRest;
 use exchanges::gate_swap_ws::{GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
 use exchanges::response_base::ResponseData;
-use standard::{SpecialTrade, Trade};
+use standard::{SpecialTrade};
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::json_db_utils::{generate_file_path, write_to_file};
+use crate::listener_tools::update_trade;
 
 // type DepthMap = HashMap<String, Vec<SpecialDepth>>;
-type TradeMap = HashMap<String, Vec<SpecialTrade>>;
+pub type TradeMap = HashMap<String, Vec<SpecialTrade>>;
+const EXCHANGE_NAME: &str = "gate_usdt_swap";
 
 lazy_static! {
     // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
-    static ref TRADE_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
 }
 
 pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
@@ -74,75 +70,15 @@ pub async fn data_listener(response: ResponseData) {
             let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSwap, &response);
 
             for trade in trades.iter() {
-                update_trade(&trade).await
+                let trades_map = TRADES_MAP.lock().await;
+
+                update_trade(&trade, trades_map, EXCHANGE_NAME).await
             }
         },
         // k线数据
-        "futures.candlesticks" => {
-
-        },
+        "futures.candlesticks" => {},
         _ => {
             info!("48 未知的数据类型: {:?}", response)
         }
     }
 }
-
-pub async fn update_trade(new_trade: &Trade) {
-    let mut trade_map = TRADE_MAP.lock().await;
-
-    if let Some(trades) = trade_map.get_mut(new_trade.symbol.as_str()) {
-        if let Some(last_trade) = trades.last() {
-            // 这里的last_trade.0是以元组形式进行访问。
-            let last_trade_minutes = last_trade.inner()[0].to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
-            let new_trade_minutes = new_trade.time.to_i64().unwrap() / 60000; // 同上
-
-            // 如果分钟数不同,则清空列表并添加新的depth
-            if last_trade_minutes != new_trade_minutes {
-                let depths_json = serde_json::to_string(trades).unwrap();
-                let path = generate_file_path("gate_usdt_swap",
-                                              new_trade.symbol.as_str(),
-                                              "trades",
-                                              last_trade_minutes);
-
-                info!(?path);
-
-                write_to_file(depths_json, path);
-
-                trades.clear();
-            }
-        }
-        trades.push(SpecialTrade::new(&new_trade));
-    } else {
-        // 如果该symbol不存在,则创建新的Vec并添加depth
-        trade_map.insert(new_trade.symbol.clone(), vec![SpecialTrade::new(&new_trade)]);
-    }
-}
-
-// // 更新深度数据
-// pub async fn update_depth(new_depth: &Depth) {
-//     let mut depth_map = DEPTH_MAP.lock().await;
-//
-//     if let Some(depths) = depth_map.get_mut(new_depth.symbol.as_str()) {
-//         if let Some(last_depth) = depths.last() {
-//             let last_depth_minutes = last_depth.t.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
-//             let new_depth_minutes = new_depth.time.to_i64().unwrap() / 60000; // 同上
-//
-//             // 如果分钟数不同,则清空列表并添加新的depth
-//             if last_depth_minutes != new_depth_minutes {
-//                 let depths_json = serde_json::to_string(depths).unwrap();
-//                 let path = generate_file_path("gate_usdt_swap",
-//                                               new_depth.symbol.as_str(), "order_book", last_depth_minutes);
-//
-//                 info!(?path);
-//
-//                 write_to_file(depths_json, path);
-//
-//                 depths.clear();
-//             }
-//         }
-//         depths.push(SpecialDepth::new(&new_depth));
-//     } else {
-//         // 如果该symbol不存在,则创建新的Vec并添加depth
-//         depth_map.insert(new_depth.symbol.clone(), vec![SpecialDepth::new(&new_depth)]);
-//     }
-// }

+ 61 - 0
src/listener_tools.rs

@@ -0,0 +1,61 @@
+use rust_decimal::prelude::ToPrimitive;
+use tokio::sync::MutexGuard;
+use tracing::info;
+use standard::{SpecialTrade, Trade};
+use crate::gate_usdt_swap_data_listener::TradeMap;
+use crate::json_db_utils::{generate_file_path, write_to_file};
+
+pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, TradeMap>, exchange: &str) {
+    if let Some(trades) = trades_map.get_mut(new_trade.symbol.as_str()) {
+        if let Some(last_trade) = trades.last() {
+            // 这里的last_trade.0是以元组形式进行访问。
+            let last_trade_minutes = last_trade.inner()[0].to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
+            let new_trade_minutes = new_trade.time.to_i64().unwrap() / 60000; // 同上
+
+            // 如果分钟数不同,则清空列表并添加新的depth
+            if last_trade_minutes != new_trade_minutes {
+                let depths_json = serde_json::to_string(trades).unwrap();
+                let path = generate_file_path(exchange, new_trade.symbol.as_str(), "trades", last_trade_minutes);
+
+                info!(?path);
+
+                write_to_file(depths_json, path);
+
+                trades.clear();
+            }
+        }
+        trades.push(SpecialTrade::new(&new_trade));
+    } else {
+        // 如果该symbol不存在,则创建新的Vec并添加depth
+        trades_map.insert(new_trade.symbol.clone(), vec![SpecialTrade::new(&new_trade)]);
+    }
+}
+
+// // 更新深度数据
+// pub async fn update_depth(new_depth: &Depth) {
+//     let mut depth_map = DEPTH_MAP.lock().await;
+//
+//     if let Some(depths) = depth_map.get_mut(new_depth.symbol.as_str()) {
+//         if let Some(last_depth) = depths.last() {
+//             let last_depth_minutes = last_depth.t.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
+//             let new_depth_minutes = new_depth.time.to_i64().unwrap() / 60000; // 同上
+//
+//             // 如果分钟数不同,则清空列表并添加新的depth
+//             if last_depth_minutes != new_depth_minutes {
+//                 let depths_json = serde_json::to_string(depths).unwrap();
+//                 let path = generate_file_path("gate_usdt_swap",
+//                                               new_depth.symbol.as_str(), "order_book", last_depth_minutes);
+//
+//                 info!(?path);
+//
+//                 write_to_file(depths_json, path);
+//
+//                 depths.clear();
+//             }
+//         }
+//         depths.push(SpecialDepth::new(&new_depth));
+//     } else {
+//         // 如果该symbol不存在,则创建新的Vec并添加depth
+//         depth_map.insert(new_depth.symbol.clone(), vec![SpecialDepth::new(&new_depth)]);
+//     }
+// }

+ 1 - 0
src/main.rs

@@ -2,6 +2,7 @@ mod gate_usdt_swap_data_listener;
 mod json_db_utils;
 mod control_c;
 mod server;
+mod listener_tools;
 
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};