Browse Source

使用日期来区分文件夹,并且使用.await方式来save文件。

skyffire 1 year ago
parent
commit
63227f774f
4 changed files with 57 additions and 38 deletions
  1. 1 1
      global/src/log_utils.rs
  2. 4 2
      src/gate_usdt_swap_data_listener.rs
  3. 48 32
      src/json_db_utils.rs
  4. 4 3
      src/listener_tools.rs

+ 1 - 1
global/src/log_utils.rs

@@ -53,7 +53,7 @@ pub fn init_log_with_info() {
     let _ = final_init(tracing::Level::INFO.as_str(), 0, "test".to_string());
 }
 
-pub fn final_init(level: &str, port: u32, app_name: String) -> WorkerGuard {
+pub fn final_init(level: &str, _port: u32, app_name: String) -> WorkerGuard {
     let mut path = String::new();
     path.push_str("./logs");
 

+ 4 - 2
src/gate_usdt_swap_data_listener.rs

@@ -69,9 +69,11 @@ pub async fn data_listener(response: ResponseData) {
         "futures.trades" => {
             let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSwap, &response);
 
-            let trades_map = TRADES_MAP.lock().await;
+            for trade in trades.iter() {
+                let trades_map = TRADES_MAP.lock().await;
 
-            update_trade(&trades[trades.len() - 1], trades_map, EXCHANGE_NAME).await
+                update_trade(&trade, trades_map, EXCHANGE_NAME).await
+            }
         },
         // k线数据
         "futures.candlesticks" => {

+ 48 - 32
src/json_db_utils.rs

@@ -1,39 +1,38 @@
 use std::path::{Path, PathBuf};
+use chrono::{FixedOffset, TimeZone, Utc};
 use serde_json::Value;
 use tokio::fs::File;
 use tokio::io::AsyncWriteExt;
-use tokio::{fs, spawn};
+use tokio::{fs};
 use tracing::{error, info};
 use standard::SpecialTrade;
 
-pub fn write_to_file(json_data: String, file_path: String) {
-    spawn(async move {
-        // 尝试创建文件路径
-        if let Err(e) = fs::create_dir_all(
-            // 获取文件目录路径
-            Path::new(&file_path)
-                .parent() // 获取父目录(即文件路径除去文件名后的部分)
-                .unwrap_or_else(|| Path::new("")), // 如果没有父目录,使用当前目录
-        )
-            .await
-        {
-            // 如果创建路径失败,打印错误日志
-            error!("创建目录错误: {:?}", e);
-            return; // 结束任务
-        }
+pub async fn write_to_file(json_data: String, file_path: String) {
+    // 尝试创建文件路径
+    if let Err(e) = fs::create_dir_all(
+        // 获取文件目录路径
+        Path::new(&file_path)
+            .parent() // 获取父目录(即文件路径除去文件名后的部分)
+            .unwrap_or_else(|| Path::new("")), // 如果没有父目录,使用当前目录
+    )
+        .await
+    {
+        // 如果创建路径失败,打印错误日志
+        error!("创建目录错误: {:?}", e);
+        return; // 结束任务
+    }
 
-        // 异步地执行文件写入操作
-        if let Err(e) = async {
-            let mut file = File::create(&file_path).await?;
-            file.write_all(json_data.as_bytes()).await?;
-            Result::<(), std::io::Error>::Ok(())
-        }
-            .await
-        {
-            // 如果发生错误,只打印错误日志
-            error!("json db写入错误: {:?}", e);
-        }
-    });
+    // 异步地执行文件写入操作
+    if let Err(e) = async {
+        let mut file = File::create(&file_path).await?;
+        file.write_all(json_data.as_bytes()).await?;
+        Result::<(), std::io::Error>::Ok(())
+    }
+        .await
+    {
+        // 如果发生错误,只打印错误日志
+        error!("json db写入错误: {:?}", e);
+    }
 }
 
 // 根据时间戳生成文件名列表
@@ -44,7 +43,8 @@ fn generate_filenames(start_timestamp: i64, end_timestamp: i64, exchange: &str,
 
     let mut minute = end_minute;
     while minute >= start_minute {
-        filenames.push(generate_file_path(exchange, symbol, subscriber_type, minute));
+        let date_str = minute_to_date(minute);
+        filenames.push(generate_file_path(exchange, date_str.as_str(), symbol, subscriber_type, minute));
 
         minute -= 1;
     }
@@ -52,8 +52,24 @@ fn generate_filenames(start_timestamp: i64, end_timestamp: i64, exchange: &str,
     filenames
 }
 
-pub fn generate_file_path(exchange: &str, symbol: &str, subscriber_type: &str, serial: i64) -> String {
-    return format!("db/{}/{}/{}/{}.json", exchange, symbol, subscriber_type, serial)
+pub fn generate_file_path(exchange: &str, formatted_date: &str, symbol: &str, subscriber_type: &str, serial: i64) -> String {
+    return format!("db/{}/{}/{}/{}/{}.json", exchange, formatted_date, symbol, subscriber_type, serial)
+}
+
+// 函数:将分钟数转换为日期字符串,格式为 YYYYMMDD
+pub fn minute_to_date(minute: i64) -> String {
+    // 将分钟转换为秒
+    let seconds = minute * 60;
+
+    // 创建一个代表东八区(GMT+8)的时间偏移
+    let east_eight_zone = FixedOffset::east_opt(8 * 3600).unwrap();
+
+    // 使用 UTC 时间创建 DateTime 对象,然后将其转换为东八区时间
+    let datetime_utc = Utc.timestamp_opt(seconds, 0).unwrap();
+    let datetime_east_eight = datetime_utc.with_timezone(&east_eight_zone);
+
+    // 返回日期字符串(格式 YYYYMMDD)
+    datetime_east_eight.format("%Y%m%d").to_string()
 }
 
 // 从本地json的db里面加载SpecialTrade
@@ -104,7 +120,7 @@ async fn write_test() {
     use tokio::time::sleep;
 
     // 调用函数,不需要等待它完成
-    write_to_file("{\"key\": \"value\"}".to_string(), "db/test.json".to_string());
+    write_to_file("{\"key\": \"value\"}".to_string(), "db/test.json".to_string()).await;
 
     sleep(Duration::from_secs(2)).await;
 }

+ 4 - 3
src/listener_tools.rs

@@ -3,7 +3,7 @@ use rust_decimal::prelude::ToPrimitive;
 use tokio::sync::MutexGuard;
 use standard::{SpecialTrade, Trade};
 use crate::gate_usdt_swap_data_listener::TradeMap;
-use crate::json_db_utils::{generate_file_path, write_to_file};
+use crate::json_db_utils::{generate_file_path, minute_to_date, write_to_file};
 
 pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, TradeMap>, exchange: &str) {
     if let Some(trades) = trades_map.get_mut(new_trade.symbol.as_str()) {
@@ -15,11 +15,12 @@ pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, Trad
             // 如果分钟数不同,则清空列表并添加新的depth
             if last_trade_minutes != new_trade_minutes {
                 let depths_json = serde_json::to_string(trades).unwrap();
-                let path = generate_file_path(exchange, new_trade.symbol.as_str(), "trades", last_trade_minutes);
+                let date_str = minute_to_date(last_trade_minutes);
+                let path = generate_file_path(exchange, date_str.as_str(), new_trade.symbol.as_str(), "trades", last_trade_minutes);
 
                 // info!(?path);
 
-                write_to_file(depths_json, path);
+                write_to_file(depths_json, path).await;
 
                 trades.clear();
             }