Jelajahi Sumber

可以正常存放数据了。

skyffire 1 tahun lalu
induk
melakukan
3fad98f9ea
4 mengubah file dengan 95 tambahan dan 9 penghapusan
  1. 2 2
      .gitignore
  2. 35 6
      src/gate_usdt_swap_data_listener.rs
  3. 49 0
      src/json_db_utils.rs
  4. 9 1
      src/main.rs

+ 2 - 2
.gitignore

@@ -8,5 +8,5 @@ config.toml*
 /logs*
 /test_account.toml
 
-config.json
-config.json*
+*.json
+/db

+ 35 - 6
src/gate_usdt_swap_data_listener.rs

@@ -4,6 +4,7 @@ use std::sync::{Arc};
 use std::sync::atomic::AtomicBool;
 use std::time::Duration;
 use lazy_static::lazy_static;
+use rust_decimal::prelude::ToPrimitive;
 use tokio::sync::Mutex;
 use tokio::time::Instant;
 use tracing::info;
@@ -12,6 +13,7 @@ use exchanges::response_base::ResponseData;
 use standard::{Depth, OrderBook};
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
+use crate::json_db_utils::{generate_file_path, write_to_file};
 
 type DepthMap = HashMap<String, Vec<Depth>>;
 
@@ -42,8 +44,6 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
 
 // 读取数据
 pub async fn data_listener(response: ResponseData) {
-    let mut depth_map = DEPTH_MAP.lock().await;
-
     if response.code != 200 {
         return;
     }
@@ -53,9 +53,7 @@ pub async fn data_listener(response: ResponseData) {
         "futures.order_book" => {
             let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSwap, &response);
 
-            depth_map.entry(depth.symbol.clone())
-                .or_insert_with(Vec::new)
-                .push(depth);
+            update_depth(&depth).await;
         },
         // 订单流数据
         "futures.trades" => {
@@ -69,12 +67,43 @@ pub async fn data_listener(response: ResponseData) {
             info!("48 未知的数据类型: {:?}", response)
         }
     }
+}
 
+// 更新深度数据
+pub async fn update_depth(new_depth: &Depth) {
+    let mut depth_map = DEPTH_MAP.lock().await;
+
+    // 估算当前depth的内存占用率
     let mut last_print_time = LAST_PRINT_TIME.lock().await;
-    if last_print_time.elapsed() >= Duration::from_secs(60) {
+    if last_print_time.elapsed() >= Duration::from_secs(10) {
         estimate_depth_map_memory_usage(&depth_map);
         *last_print_time = Instant::now();
     }
+
+    if let Some(depths) = depth_map.get_mut(new_depth.symbol.as_str()) {
+        if let Some(last_depth) = depths.last() {
+            let last_depth_minutes = last_depth.time.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
+            let new_depth_minutes = new_depth.time.to_i64().unwrap() / 60000; // 同上
+
+            // 如果分钟数不同,则清空列表并添加新的depth
+            if last_depth_minutes != new_depth_minutes {
+                let depths_json = serde_json::to_string(depths).unwrap();
+                let path = generate_file_path("gate_usdt_swap",
+                                              new_depth.symbol.as_str(), "order_book", last_depth_minutes);
+
+                info!(?depths_json);
+                info!(?path);
+
+                write_to_file(depths_json, path);
+
+                depths.clear();
+            }
+        }
+        depths.push(new_depth.clone());
+    } else {
+        // 如果该symbol不存在,则创建新的Vec并添加depth
+        depth_map.insert(new_depth.symbol.clone(), vec![new_depth.clone()]);
+    }
 }
 
 pub fn estimate_depth_map_memory_usage(depth_map: &DepthMap) {

+ 49 - 0
src/json_db_utils.rs

@@ -0,0 +1,49 @@
+use tokio::fs::File;
+use tokio::io::AsyncWriteExt;
+use tokio::{fs, spawn};
+use tracing::{error};
+
+pub fn write_to_file(json_data: String, file_path: String) {
+    spawn(async move {
+        // 尝试创建文件路径
+        if let Err(e) = fs::create_dir_all(
+            // 获取文件目录路径
+            std::path::Path::new(&file_path)
+                .parent() // 获取父目录(即文件路径除去文件名后的部分)
+                .unwrap_or_else(|| std::path::Path::new("")), // 如果没有父目录,使用当前目录
+        )
+            .await
+        {
+            // 如果创建路径失败,打印错误日志
+            error!("创建目录错误: {:?}", e);
+            return; // 结束任务
+        }
+
+        // 异步地执行文件写入操作
+        if let Err(e) = async {
+            let mut file = File::create(&file_path).await?;
+            file.write_all(json_data.as_bytes()).await?;
+            Result::<(), std::io::Error>::Ok(())
+        }
+            .await
+        {
+            // 如果发生错误,只打印错误日志
+            error!("json db写入错误: {:?}", e);
+        }
+    });
+}
+
+pub fn generate_file_path(exchange: &str, symbol: &str, subscriber_type: &str, serial: i64) -> String {
+    return format!("db/{}/{}/{}/{}.json", exchange, symbol, subscriber_type, serial)
+}
+
+#[tokio::test]
+async fn write_test() {
+    use std::time::Duration;
+    use tokio::time::sleep;
+
+    // 调用函数,不需要等待它完成
+    write_to_file("{\"key\": \"value\"}".to_string(), "db/test.json".to_string());
+
+    sleep(Duration::from_secs(2)).await;
+}

+ 9 - 1
src/main.rs

@@ -1,9 +1,10 @@
 mod gate_usdt_swap_data_listener;
+mod json_db_utils;
 
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
-use tracing::info;
+use tracing::{info, warn};
 use tracing_appender_timezone::non_blocking::WorkerGuard;
 
 // 日志级别配置
@@ -20,6 +21,13 @@ async fn main() {
     let running = Arc::new(AtomicBool::new(true));
     // 启动gate监听器
     gate_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // panic错误捕获,panic级别的错误直接退出
+    let panic_running = running.clone();
+    std::panic::set_hook(Box::new(move |panic_info| {
+        let msg = format!("type=panic, msg={:?}, location={:?}", panic_info.to_string(), panic_info.location());
+        warn!("{}", msg);
+        panic_running.store(false, Ordering::Relaxed);
+    }));
     // 每一秒检查一次程序是否结束
     while running.load(Ordering::Relaxed) {
         tokio::time::sleep(Duration::from_secs(1)).await;