Pārlūkot izejas kodu

更新自动删除文件

DESKTOP-NE65RNK\Citrus_limon 1 gadu atpakaļ
vecāks
revīzija
eff9581e81

+ 9 - 0
src/binance_usdt_swap_data_listener.rs

@@ -1,6 +1,7 @@
 use std::collections::{BTreeMap, HashMap};
 use std::sync::{Arc};
 use std::sync::atomic::AtomicBool;
+use std::time::Duration;
 use lazy_static::lazy_static;
 use rust_decimal::Decimal;
 use tokio::sync::{Mutex, MutexGuard};
@@ -11,6 +12,7 @@ use exchanges::response_base::ResponseData;
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
 use standard::{Depth, OrderBook};
+use crate::json_db_utils::delete_db_by_exchange;
 use crate::listener_tools::{DepthMap, RecordMap, TradeMap, update_depth, update_record, update_trade};
 const EXCHANGE_NAME: &str = "binance_usdt_swap";
 
@@ -61,6 +63,13 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
         });
     }
+    // 定时删除数据
+    tokio::spawn(async move {
+        loop {
+            delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+            tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+        }
+    });
 }
 
 // 读取数据

+ 9 - 0
src/bingx_usdt_swap_data_listener.rs

@@ -1,6 +1,7 @@
 // use std::collections::{BTreeMap, HashMap};
 // use std::sync::{Arc};
 // use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
 // use lazy_static::lazy_static;
 // use rust_decimal::Decimal;
 // use rust_decimal_macros::dec;
@@ -12,6 +13,7 @@
 // use serde_json::json;
 // use standard::exchange::ExchangeEnum;
 // use standard::exchange_struct_handler::ExchangeStructHandler;
+// use crate::json_db_utils::delete_db_by_exchange;
 // use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
 //
 // const EXCHANGE_NAME: &str = "bingx_usdt_swap";
@@ -64,6 +66,13 @@
 //             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
 //         });
 //     }
+//     // 定时删除数据
+//     tokio::spawn(async move {
+//         loop {
+//             delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+//             tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+//         }
+//     });
 // }
 //
 // // 读取数据

+ 9 - 0
src/bitget_usdt_swap_data_listener.rs

@@ -1,6 +1,7 @@
 // use std::collections::{BTreeMap, HashMap};
 // use std::sync::{Arc};
 // use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
 // use lazy_static::lazy_static;
 // use tokio::sync::{Mutex};
 // use tracing::info;
@@ -9,6 +10,7 @@
 // use exchanges::response_base::ResponseData;
 // use standard::exchange::ExchangeEnum;
 // use standard::exchange_struct_handler::ExchangeStructHandler;
+// use crate::json_db_utils::delete_db_by_exchange;
 // use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
 //
 // const EXCHANGE_NAME: &str = "bitget_usdt_swap";
@@ -55,6 +57,13 @@
 //             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_clone, write_rx).await.unwrap();
 //         });
 //     }
+//     // 定时删除数据
+//     tokio::spawn(async move {
+//         loop {
+//             delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+//             tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+//         }
+//     });
 // }
 //
 // // 读取数据

+ 10 - 2
src/bitmart_usdt_swap_data_listener.rs

@@ -2,6 +2,7 @@
 // use std::str::FromStr;
 // use std::sync::{Arc};
 // use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
 // use lazy_static::lazy_static;
 // use rust_decimal::Decimal;
 // use rust_decimal_macros::dec;
@@ -12,6 +13,7 @@
 // use serde_json::json;
 // use standard::exchange::ExchangeEnum;
 // use standard::exchange_struct_handler::ExchangeStructHandler;
+// use crate::json_db_utils::delete_db_by_exchange;
 // use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
 //
 // const EXCHANGE_NAME: &str = "bitmart_usdt_swap";
@@ -56,15 +58,21 @@
 //             let mut ws = BitMartSwapWs::new_with_tag(ws_name, false, None, BitMartSwapWsType::Public);
 //             ws.set_subscribe(vec![
 //                 BitMartSwapSubscribeType::PuFuturesTrades,
-//                 BitMartSwapSubscribeType::PuFuturesRecords
+//                 BitMartSwapSubscribeType::PuFuturesRecords,
 //                 // BitMartSwapSubscribeType::PuFuturesDepth,
-//
 //             ]);
 //             // 建立链接
 //             ws.set_symbols(symbols_chunk);
 //             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
 //         });
 //     }
+//     // 定时删除数据
+//     tokio::spawn(async move {
+//         loop {
+//             delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+//             tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+//         }
+//     });
 // }
 //
 // // 读取数据

+ 14 - 5
src/coinex_usdt_swap_data_listener.rs

@@ -3,6 +3,7 @@ use std::collections::{BTreeMap, HashMap};
 use std::str::FromStr;
 use std::sync::{Arc};
 use std::sync::atomic::AtomicBool;
+use std::time::Duration;
 use lazy_static::lazy_static;
 use rust_decimal::Decimal;
 use tokio::sync::{Mutex, MutexGuard};
@@ -13,6 +14,7 @@ use exchanges::response_base::ResponseData;
 use standard::{Record};
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
+use crate::json_db_utils::delete_db_by_exchange;
 use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
 
 const EXCHANGE_NAME: &str = "coinex_usdt_swap";
@@ -58,6 +60,13 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
         });
     }
+    // 定时删除数据
+    tokio::spawn(async move {
+        loop {
+            delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+            tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+        }
+    });
 }
 
 // 读取数据
@@ -72,7 +81,7 @@ pub async fn data_listener(response: ResponseData) {
             // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSwap, &response);
             //
             // update_depth(&depth).await;
-        },
+        }
         // 订单流数据
         "deals.update" => {
             let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::CoinexSwap, &response);
@@ -86,16 +95,16 @@ pub async fn data_listener(response: ResponseData) {
 
                 // k线数据更新
                 let record = parse_trades_to_record(trade.symbol.clone(), trades_map);
-                let record_map= RECORD_MAP.lock().await;
+                let record_map = RECORD_MAP.lock().await;
                 update_record(&record, record_map, EXCHANGE_NAME).await;
 
                 // 订单流数据更新
                 let trades_map_1 = TRADES_MAP.lock().await;
                 update_trade(trade, trades_map_1, EXCHANGE_NAME).await;
             }
-        },
+        }
         // pong消息不处理
-        "" => {},
+        "" => {}
         _ => {
             info!("85 未知的数据类型: {:?}", response)
         }
@@ -139,7 +148,7 @@ pub fn parse_trades_to_record(symbol: String, mut trades_map: MutexGuard<'_, Tra
             close,
             volume,
             symbol,
-        }
+        };
     }
 
     Record {

+ 9 - 0
src/coinsph_usdt_swap_data_listener.rs

@@ -1,6 +1,7 @@
 // use std::collections::{BTreeMap, HashMap};
 // use std::sync::{Arc};
 // use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
 // use lazy_static::lazy_static;
 // use rust_decimal::Decimal;
 // use rust_decimal_macros::dec;
@@ -13,6 +14,7 @@
 // use serde_json::json;
 // use standard::exchange::ExchangeEnum;
 // use standard::exchange_struct_handler::ExchangeStructHandler;
+// use crate::json_db_utils::delete_db_by_exchange;
 // use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
 //
 // const EXCHANGE_NAME: &str = "coinsph_usdt_swap";
@@ -65,6 +67,13 @@
 //             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
 //         });
 //     }
+//     // 定时删除数据
+//     tokio::spawn(async move {
+//         loop {
+//             delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+//             tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+//         }
+//     });
 // }
 //
 // // 读取数据

+ 9 - 0
src/cointr_usdt_swap_data_listener.rs

@@ -2,6 +2,7 @@
 // use std::str::FromStr;
 // use std::sync::{Arc};
 // use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
 // use lazy_static::lazy_static;
 // use rust_decimal::Decimal;
 // use rust_decimal_macros::dec;
@@ -13,6 +14,7 @@
 // use standard::exchange::ExchangeEnum;
 // use standard::exchange_struct_handler::ExchangeStructHandler;
 // use standard::utils::symbol_out_mapper;
+// use crate::json_db_utils::delete_db_by_exchange;
 // use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
 //
 // const EXCHANGE_NAME: &str = "cointr_usdt_swap";
@@ -66,6 +68,13 @@
 //             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
 //         });
 //     }
+//     // 定时删除数据
+//     tokio::spawn(async move {
+//         loop {
+//             delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+//             tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+//         }
+//     });
 // }
 //
 // // 读取数据

+ 9 - 0
src/gate_usdt_spot_data_listener.rs

@@ -1,6 +1,7 @@
 // use std::collections::{BTreeMap, HashMap};
 // use std::sync::{Arc};
 // use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
 // use lazy_static::lazy_static;
 // use rust_decimal::Decimal;
 // use rust_decimal_macros::dec;
@@ -13,6 +14,7 @@
 // use standard::{Depth, OrderBook, SimpleDepth};
 // use standard::exchange::ExchangeEnum;
 // use standard::exchange_struct_handler::ExchangeStructHandler;
+// use crate::json_db_utils::delete_db_by_exchange;
 // use crate::listener_tools::{RecordMap, TradeMap, DepthMap, SimpleDepthMap, update_record, update_trade, update_simple_depth};
 //
 // const EXCHANGE_NAME: &str = "gate_usdt_spot";
@@ -67,6 +69,13 @@
 //             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
 //         });
 //     }
+//     // 定时删除数据
+//     tokio::spawn(async move {
+//         loop {
+//             delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+//             tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+//         }
+//     });
 // }
 //
 // // 读取数据

+ 10 - 0
src/gate_usdt_swap_data_listener.rs

@@ -2,6 +2,7 @@ use std::collections::{BTreeMap, HashMap};
 use std::str::FromStr;
 use std::sync::{Arc};
 use std::sync::atomic::AtomicBool;
+use std::time::Duration;
 use lazy_static::lazy_static;
 use rust_decimal::Decimal;
 use rust_decimal_macros::dec;
@@ -13,6 +14,7 @@ use exchanges::response_base::ResponseData;
 use standard::{Depth, OrderBook, SimpleDepth};
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
+use crate::json_db_utils::delete_db_by_exchange;
 use crate::listener_tools::{RecordMap, TradeMap, DepthMap, SimpleDepthMap, update_record, update_trade, update_simple_depth};
 
 const EXCHANGE_NAME: &str = "gate_usdt_swap";
@@ -65,6 +67,14 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
         });
     }
+
+    // 定时删除数据
+    tokio::spawn(async move {
+        loop {
+            delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+            tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+        }
+    });
 }
 
 // 读取数据

+ 133 - 124
src/htx_usdt_swap_data_listener.rs

@@ -1,124 +1,133 @@
-use std::collections::{BTreeMap, HashMap};
-use std::sync::{Arc};
-use std::sync::atomic::AtomicBool;
-use lazy_static::lazy_static;
-use serde_json::json;
-use tokio::sync::{Mutex};
-use tracing::info;
-use exchanges::htx_swap_rest::HtxSwapRest;
-use exchanges::htx_swap_ws::{HtxSwapSubscribeType, HtxSwapWs, HtxSwapWsType};
-use exchanges::response_base::ResponseData;
-use standard::exchange::ExchangeEnum;
-use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
-
-const EXCHANGE_NAME: &str = "htx_usdt_swap";
-
-lazy_static! {
-    // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
-    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
-    static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
-}
-
-pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-    let name = "htx_usdt_swap_listener";
-    // 订阅所有币种
-    let login = BTreeMap::new();
-    let mut htx_rest = HtxSwapRest::new(false, login);
-    let params = json!({
-        "contract_type": "swap"
-    });
-    let response = htx_rest.get_market(params).await;
-    let mut symbols = vec![];
-    if response.code == 200 {
-        let data = response.data.as_array().unwrap();
-        for info in data {
-            symbols.push(info["contract_code"].as_str().unwrap().to_string().replace("-", "_"))
-        }
-    }
-
-    info!(?symbols);
-
-    // 将 symbols 分成每份20个元素的小块
-    for chunk in symbols.chunks(20) {
-        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-        let write_tx_am = Arc::new(Mutex::new(write_tx));
-
-        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-        let ws_name = name.to_string();
-        let write_tx_clone = Arc::clone(&write_tx_am);
-        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-
-        tokio::spawn(async move {
-            let mut ws = HtxSwapWs::new_with_tag(ws_name, None, HtxSwapWsType::Public);
-            ws.set_subscribe(vec![
-                HtxSwapSubscribeType::PuFuturesRecords,
-                HtxSwapSubscribeType::PuFuturesTrades
-            ]);
-
-            // 建立链接
-            ws.set_symbols(symbols_chunk);
-            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_clone, write_rx).await.unwrap();
-        });
-    }
-}
-
-// 读取数据
-pub async fn data_listener(response: ResponseData) {
-    if response.code != 200 {
-        return;
-    }
-
-    let (origin_symbol, simple_channel) = extract_parts(response.channel.as_str());
-    let mut r = response.clone();
-    r.data["symbol"] = json!(origin_symbol);
-
-    match simple_channel.as_str() {
-        // 深度数据
-        "深度" => {
-            // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::HtxSwap, &r);
-            //
-            // update_depth(&depth).await;
-        },
-        // 订单流数据
-        "trade.detail" => {
-            let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::HtxSwap, &r);
-
-            for trade in trades.iter() {
-                let trades_map = TRADES_MAP.lock().await;
-
-                update_trade(&trade, trades_map, EXCHANGE_NAME).await
-            }
-        },
-        // k线数据
-        "kline.1min" => {
-            let records = ExchangeStructHandler::records_handle(ExchangeEnum::HtxSwap, &r);
-
-            if records.is_empty() {
-                return;
-            }
-
-            let record_map= RECORD_MAP.lock().await;
-            update_record(&records[records.len() - 1], record_map, EXCHANGE_NAME).await;
-        },
-        _ => {
-            info!("98 未知的数据类型: {:?}", response)
-        }
-    }
-}
-
-fn extract_parts(input: &str) -> (String, String) {
-    // 使用 `split` 方法以点号分割字符串,并收集到 Vec 中
-    let parts: Vec<&str> = input.split('.').collect();
-
-    // 从分割后的数组中获取交易对和详情部分
-    // `parts[1]` 应该是 "BTC-USDT",`parts[2]` 是 "trade" 或 "kline",`parts[3]` 是 "detail" 或 "1min"
-    let pair = parts.get(1).unwrap_or(&"").to_string().replace("-", "_");
-    let detail = if parts.len() > 3 {
-        format!("{}.{}", parts[2], parts[3])
-    } else {
-        input.to_string()
-    };
-
-    (pair, detail)
-}
+// use std::collections::{BTreeMap, HashMap};
+// use std::sync::{Arc};
+// use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
+// use lazy_static::lazy_static;
+// use serde_json::json;
+// use tokio::sync::{Mutex};
+// use tracing::info;
+// use exchanges::htx_swap_rest::HtxSwapRest;
+// use exchanges::htx_swap_ws::{HtxSwapSubscribeType, HtxSwapWs, HtxSwapWsType};
+// use exchanges::response_base::ResponseData;
+// use standard::exchange::ExchangeEnum;
+// use standard::exchange_struct_handler::ExchangeStructHandler;
+// use crate::json_db_utils::delete_db_by_exchange;
+// use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
+//
+// const EXCHANGE_NAME: &str = "htx_usdt_swap";
+//
+// lazy_static! {
+//     // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
+//     static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+//     static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
+// }
+//
+// pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+//     let name = "htx_usdt_swap_listener";
+//     // 订阅所有币种
+//     let login = BTreeMap::new();
+//     let mut htx_rest = HtxSwapRest::new(false, login);
+//     let params = json!({
+//         "contract_type": "swap"
+//     });
+//     let response = htx_rest.get_market(params).await;
+//     let mut symbols = vec![];
+//     if response.code == 200 {
+//         let data = response.data.as_array().unwrap();
+//         for info in data {
+//             symbols.push(info["contract_code"].as_str().unwrap().to_string().replace("-", "_"))
+//         }
+//     }
+//
+//     info!(?symbols);
+//
+//     // 将 symbols 分成每份20个元素的小块
+//     for chunk in symbols.chunks(20) {
+//         let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//         let write_tx_am = Arc::new(Mutex::new(write_tx));
+//
+//         let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+//         let ws_name = name.to_string();
+//         let write_tx_clone = Arc::clone(&write_tx_am);
+//         let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+//
+//         tokio::spawn(async move {
+//             let mut ws = HtxSwapWs::new_with_tag(ws_name, None, HtxSwapWsType::Public);
+//             ws.set_subscribe(vec![
+//                 HtxSwapSubscribeType::PuFuturesRecords,
+//                 HtxSwapSubscribeType::PuFuturesTrades,
+//             ]);
+//
+//             // 建立链接
+//             ws.set_symbols(symbols_chunk);
+//             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_clone, write_rx).await.unwrap();
+//         });
+//     }
+//     // 定时删除数据
+//     tokio::spawn(async move {
+//         loop {
+//             delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+//             tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+//         }
+//     });
+// }
+//
+// // 读取数据
+// pub async fn data_listener(response: ResponseData) {
+//     if response.code != 200 {
+//         return;
+//     }
+//
+//     let (origin_symbol, simple_channel) = extract_parts(response.channel.as_str());
+//     let mut r = response.clone();
+//     r.data["symbol"] = json!(origin_symbol);
+//
+//     match simple_channel.as_str() {
+//         // 深度数据
+//         "深度" => {
+//             // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::HtxSwap, &r);
+//             //
+//             // update_depth(&depth).await;
+//         }
+//         // 订单流数据
+//         "trade.detail" => {
+//             let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::HtxSwap, &r);
+//
+//             for trade in trades.iter() {
+//                 let trades_map = TRADES_MAP.lock().await;
+//
+//                 update_trade(&trade, trades_map, EXCHANGE_NAME).await
+//             }
+//         }
+//         // k线数据
+//         "kline.1min" => {
+//             let records = ExchangeStructHandler::records_handle(ExchangeEnum::HtxSwap, &r);
+//
+//             if records.is_empty() {
+//                 return;
+//             }
+//
+//             let record_map = RECORD_MAP.lock().await;
+//             update_record(&records[records.len() - 1], record_map, EXCHANGE_NAME).await;
+//         }
+//         _ => {
+//             info!("98 未知的数据类型: {:?}", response)
+//         }
+//     }
+// }
+//
+// fn extract_parts(input: &str) -> (String, String) {
+//     // 使用 `split` 方法以点号分割字符串,并收集到 Vec 中
+//     let parts: Vec<&str> = input.split('.').collect();
+//
+//     // 从分割后的数组中获取交易对和详情部分
+//     // `parts[1]` 应该是 "BTC-USDT",`parts[2]` 是 "trade" 或 "kline",`parts[3]` 是 "detail" 或 "1min"
+//     let pair = parts.get(1).unwrap_or(&"").to_string().replace("-", "_");
+//     let detail = if parts.len() > 3 {
+//         format!("{}.{}", parts[2], parts[3])
+//     } else {
+//         input.to_string()
+//     };
+//
+//     (pair, detail)
+// }

+ 75 - 4
src/json_db_utils.rs

@@ -1,5 +1,7 @@
 use std::path::{Path, PathBuf};
+use std::str::FromStr;
 use chrono::{FixedOffset, TimeZone, Utc};
+use rust_decimal::Decimal;
 use rust_decimal::prelude::ToPrimitive;
 use serde_json::Value;
 use tokio::fs::File;
@@ -54,7 +56,7 @@ fn generate_filenames(start_timestamp: i64, end_timestamp: i64, exchange: &str,
 }
 
 pub fn generate_file_path(exchange: &str, formatted_date: &str, symbol: &str, subscriber_type: &str, serial: i64) -> String {
-    return format!("db/{}/{}/{}/{}/{}.json", exchange, formatted_date, symbol, subscriber_type, serial)
+    return format!("db/{}/{}/{}/{}/{}.json", exchange, formatted_date, symbol, subscriber_type, serial);
 }
 
 // 函数:将分钟数转换为日期字符串,格式为 YYYYMMDD
@@ -143,7 +145,7 @@ pub async fn collect_depth_json(start_timestamp: i64, end_timestamp: i64, exchan
                     // 不在时间范围内的就不要返回了
                     let t = depth.t.to_i64().unwrap();
                     if t < start_timestamp || t > end_timestamp {
-                        continue
+                        continue;
                     }
 
                     depths.push(depth.clone())
@@ -174,7 +176,7 @@ pub async fn collect_simple_depth_json(start_timestamp: i64, end_timestamp: i64,
                     // 不在时间范围内的就不要返回了
                     let t = depth.time.to_i64().unwrap();
                     if t < start_timestamp || t > end_timestamp {
-                        continue
+                        continue;
                     }
 
                     simple_depths.push(depth.clone())
@@ -186,6 +188,75 @@ pub async fn collect_simple_depth_json(start_timestamp: i64, end_timestamp: i64,
     serde_json::to_value(&simple_depths).unwrap()
 }
 
+// 清除指定时间前db文件
+pub async fn delete_db_by_exchange(exchange: &str, categories: Vec<&str>, retention_minute: i64) {
+    let exchange_path = format!("./db/{}", exchange);
+    let directories_name = get_directory(&exchange_path).await;
+    let symbols = get_symbols_by_exchange(exchange);
+    // 获取5小时前分钟时间戳
+    let minute_timestamp = (Utc::now().timestamp_millis() - retention_minute * 60 * 1000) / 60 / 1000;
+    let day = minute_to_date(minute_timestamp);
+    for directory in directories_name.iter() {
+        if Decimal::from_str(&directory).unwrap() < Decimal::from_str(&day).unwrap() {
+            let directory_path = format!("{}/{}", exchange_path, directory);
+            delete_directory(&directory_path).await;
+        } else {
+            for symbol in symbols.as_array().unwrap() {
+                for category in categories.clone() {
+                    let trades_files_path = format!("{}/{}/{}/{}", exchange_path, directory, symbol.as_str().unwrap(), category);
+                    let trades_files_name = get_directory(&trades_files_path).await;
+                    for file in trades_files_name {
+                        let file_name: Vec<&str> = file.split(".").collect();
+                        if Decimal::from_str(&file_name[0]).unwrap() < Decimal::from_str(&minute_timestamp.to_string()).unwrap() {
+                            let path = format!("{}/{}", trades_files_path, file);
+                            delete_directory(&path).await;
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+// 获取目录下文件、文件夹
+async fn get_directory(target_directory: &str) -> Vec<String> {
+    let mut files_name: Vec<String> = Vec::new();
+    if let Ok(mut entries) = fs::read_dir(target_directory).await {
+        // 遍历条目并处理文件
+        while let Ok(Some(entry)) = entries.next_entry().await {
+            let path = entry.path();
+            files_name.push(path.file_name().unwrap().to_str().unwrap().to_string());
+        };
+    } else {
+        error!("获取目录下文件、文件夹失败!目录:{}",target_directory);
+    }
+    files_name
+}
+
+// 删除目录下文件、文件夹
+async fn delete_directory(target_directory: &str) {
+    let path = Path::new(target_directory);
+    if let Ok(metadata) = fs::metadata(path).await {
+        if metadata.is_file() {
+            // 删除文件
+            match fs::remove_file(path).await {
+                Ok(_) => info!("删除文件成功: {}", path.display()),
+                Err(e) => error!("删除文件失败 {}: {}", path.display(), e),
+            }
+        } else if metadata.is_dir() {
+            // 删除文件夹及其内容
+            match fs::remove_dir_all(path).await {
+                Ok(_) => info!("删除文件夹成功: {}", path.display()),
+                Err(e) => error!("删除文件夹失败 {}: {}", path.display(), e),
+            }
+        } else {
+            error!("未知类型文件: {}", path.display());
+        }
+    } else {
+        error!("没有找到路径: {}", path.display());
+    }
+}
+
 fn find_latest_directory(path: &Path) -> std::io::Result<Option<PathBuf>> {
     let mut latest: Option<(PathBuf, std::time::SystemTime)> = None;
 
@@ -235,7 +306,7 @@ pub fn get_symbols_by_exchange(exchange: &str) -> Value {
     }
 
     info!(?symbols);
-    return serde_json::to_value(&symbols).unwrap()
+    return serde_json::to_value(&symbols).unwrap();
 }
 
 #[tokio::test]

+ 9 - 0
src/kucoin_usdt_swap_data_listener.rs

@@ -1,6 +1,7 @@
 // use std::collections::{BTreeMap, HashMap};
 // use std::sync::{Arc};
 // use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
 // use lazy_static::lazy_static;
 // use rust_decimal::Decimal;
 // use rust_decimal_macros::dec;
@@ -12,6 +13,7 @@
 // use standard::exchange::ExchangeEnum;
 // use standard::exchange_struct_handler::ExchangeStructHandler;
 // use standard::utils::symbol_out_mapper;
+// use crate::json_db_utils::delete_db_by_exchange;
 // use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
 //
 // const EXCHANGE_NAME: &str = "kucoin_usdt_swap";
@@ -65,6 +67,13 @@
 //             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
 //         });
 //     }
+//     // 定时删除数据
+//     tokio::spawn(async move {
+//         loop {
+//             delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+//             tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+//         }
+//     });
 // }
 //
 // // 读取数据

+ 1 - 1
src/main.rs

@@ -51,11 +51,11 @@ async fn main() {
     // cointr_usdt_swap_data_listener::run_listener(running.clone()).await;
     // bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
     // gate_usdt_spot_data_listener::run_listener(running.clone()).await;
+    // htx_usdt_swap_data_listener::run_listener(running.clone()).await;
 
     binance_usdt_swap_data_listener::run_listener(running.clone()).await;
     gate_usdt_swap_data_listener::run_listener(running.clone()).await;
     coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
-    htx_usdt_swap_data_listener::run_listener(running.clone()).await;
     phemex_usdt_swap_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
     // let panic_running = running.clone();

+ 9 - 0
src/mexc_usdt_swap_data_listener.rs

@@ -1,6 +1,7 @@
 // use std::collections::{BTreeMap, HashMap};
 // use std::sync::{Arc};
 // use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
 // use lazy_static::lazy_static;
 // use rust_decimal::Decimal;
 // use rust_decimal_macros::dec;
@@ -13,6 +14,7 @@
 // use serde_json::json;
 // use standard::exchange::ExchangeEnum;
 // use standard::exchange_struct_handler::ExchangeStructHandler;
+// use crate::json_db_utils::delete_db_by_exchange;
 // use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
 //
 // const EXCHANGE_NAME: &str = "mexc_usdt_swap";
@@ -65,6 +67,13 @@
 //             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
 //         });
 //     }
+//     // 定时删除数据
+//     tokio::spawn(async move {
+//         loop {
+//             delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+//             tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+//         }
+//     });
 // }
 //
 // // 读取数据

+ 9 - 0
src/okx_usdt_swap_data_listener.rs

@@ -2,6 +2,7 @@
 // use std::str::FromStr;
 // use std::sync::{Arc};
 // use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
 // use lazy_static::lazy_static;
 // use rust_decimal::Decimal;
 // use rust_decimal_macros::dec;
@@ -13,6 +14,7 @@
 // use serde_json::json;
 // use standard::exchange::ExchangeEnum;
 // use standard::exchange_struct_handler::ExchangeStructHandler;
+// use crate::json_db_utils::delete_db_by_exchange;
 // use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
 //
 // const EXCHANGE_NAME: &str = "okx_usdt_swap";
@@ -81,6 +83,13 @@
 //             ws.ws_connect_async(pub_is_shutdown_clone, data_listener, &pub_write_tx_am, pub_write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
 //         });
 //     }
+//     // 定时删除数据
+//     tokio::spawn(async move {
+//         loop {
+//             delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+//             tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+//         }
+//     });
 // }
 //
 // // 读取数据

+ 9 - 0
src/phemex_usdt_swap_data_listener.rs

@@ -1,6 +1,7 @@
 use std::collections::{BTreeMap, HashMap};
 use std::sync::{Arc};
 use std::sync::atomic::AtomicBool;
+use std::time::Duration;
 use lazy_static::lazy_static;
 use rust_decimal::Decimal;
 use rust_decimal_macros::dec;
@@ -12,6 +13,7 @@ use serde_json::{json};
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
 use standard::utils::symbol_out_mapper;
+use crate::json_db_utils::delete_db_by_exchange;
 use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
 
 const EXCHANGE_NAME: &str = "phemex_usdt_swap";
@@ -66,6 +68,13 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
         });
     }
+    // 定时删除数据
+    tokio::spawn(async move {
+        loop {
+            delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+            tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+        }
+    });
 }
 
 // 读取数据

+ 9 - 0
src/woo_usdt_swap_data_listener.rs

@@ -1,6 +1,7 @@
 // use std::collections::{BTreeMap, HashMap};
 // use std::sync::{Arc};
 // use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
 // use lazy_static::lazy_static;
 // use rust_decimal::Decimal;
 // use rust_decimal_macros::dec;
@@ -12,6 +13,7 @@
 // use standard::exchange::ExchangeEnum;
 // use standard::exchange_struct_handler::ExchangeStructHandler;
 // use standard::utils::symbol_out_mapper;
+// use crate::json_db_utils::delete_db_by_exchange;
 // use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
 //
 // const EXCHANGE_NAME: &str = "woo_usdt_swap";
@@ -64,6 +66,13 @@
 //             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
 //         });
 //     }
+//     // 定时删除数据
+//     tokio::spawn(async move {
+//         loop {
+//             delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+//             tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+//         }
+//     });
 // }
 //
 // // 读取数据