||
- use std::path::{Path, PathBuf};
- use std::str::FromStr;
- use chrono::{FixedOffset, TimeZone, Utc};
- use rust_decimal::Decimal;
- use rust_decimal::prelude::ToPrimitive;
- use serde_json::Value;
- use tokio::fs::File;
- use tokio::io::AsyncWriteExt;
- use tokio::{fs};
- use tracing::{error, info};
- use standard::{Record, SimpleDepth, SpecialDepth, SpecialTrade};
- pub async fn write_to_file(json_data: String, file_path: String) {
- // 尝试创建文件路径
- if let Err(e) = fs::create_dir_all(
- // 获取文件目录路径
- Path::new(&file_path)
- .parent() // 获取父目录(即文件路径除去文件名后的部分)
- .unwrap_or_else(|| Path::new("")), // 如果没有父目录,使用当前目录
- )
- .await
- {
- // 如果创建路径失败,打印错误日志
- error!("创建目录错误: {:?}", e);
- return; // 结束任务
- }
- // 异步地执行文件写入操作
- if let Err(e) = async {
- let mut file = File::create(&file_path).await?;
- file.write_all(json_data.as_bytes()).await?;
- Result::<(), std::io::Error>::Ok(())
- }
- .await
- {
- // 如果发生错误,只打印错误日志
- error!("json db写入错误: {:?}", e);
- }
- }
- // 根据时间戳生成文件名列表
- fn generate_filenames(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str, subscriber_type: &str) -> Vec<String> {
- let mut filenames = Vec::new();
- let start_minute = start_timestamp / 60000; // 转换为分钟
- let end_minute = end_timestamp / 60000; // 转换为分钟
- let mut minute = end_minute;
- while minute >= start_minute {
- let date_str = minute_to_date(minute);
- filenames.push(generate_file_path(exchange, date_str.as_str(), symbol, subscriber_type, minute));
- minute -= 1;
- }
- filenames
- }
- pub fn generate_file_path(exchange: &str, formatted_date: &str, symbol: &str, subscriber_type: &str, serial: i64) -> String {
- return format!("db/{}/{}/{}/{}/{}.json", exchange, formatted_date, symbol, subscriber_type, serial);
- }
- // 函数:将分钟数转换为日期字符串,格式为 YYYYMMDD
- pub fn minute_to_date(minute: i64) -> String {
- // 将分钟转换为秒
- let seconds = minute * 60;
- // 创建一个代表东八区(GMT+8)的时间偏移
- let east_eight_zone = FixedOffset::east_opt(8 * 3600).unwrap();
- // 使用 UTC 时间创建 DateTime 对象,然后将其转换为东八区时间
- let datetime_utc = Utc.timestamp_opt(seconds, 0).unwrap();
- let datetime_east_eight = datetime_utc.with_timezone(&east_eight_zone);
- // 返回日期字符串(格式 YYYYMMDD)
- datetime_east_eight.format("%Y%m%d").to_string()
- }
- // 将一个时间段范围内的所有SpecialTrade返回(以json形式)
- pub async fn collect_special_trades_json(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str) -> Value {
- let mut all_trades = Vec::new();
- let filenames = generate_filenames(start_timestamp, end_timestamp, exchange, symbol, "trades");
- for filename in filenames {
- let file_path = PathBuf::from(filename.as_str());
- let file_content = fs::read_to_string(file_path).await;
- // 检查文件内容是否成功读取
- let mut trades = if let Ok(content) = file_content {
- // 尝试反序列化文件内容
- if let Ok(trades) = serde_json::from_str::<Vec<SpecialTrade>>(&content) {
- trades // 成功反序列化,返回结果
- } else {
- vec![] // 反序列化失败,返回空 Vec
- }
- } else {
- vec![] // 读取文件失败,返回空 Vec
- };
- trades.reverse();
- // info!("{} 找到 {} 条", filename, trades.len());
- all_trades.append(&mut trades);
- }
- serde_json::to_value(&all_trades).unwrap()
- }
- // 将一个时间段范围内的所有Record返回(以json形式)
- pub async fn collect_records_json(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str) -> Value {
- let mut records = Vec::new();
- let filenames = generate_filenames(start_timestamp, end_timestamp, exchange, symbol, "record");
- for filename in filenames {
- let file_path = PathBuf::from(filename.as_str());
- let file_content = fs::read_to_string(file_path).await;
- // 检查文件内容是否成功读取
- if let Ok(content) = file_content {
- // 尝试反序列化文件内容
- if let Ok(record) = serde_json::from_str::<Record>(&content) {
- // info!("{} 找到 1 条", filename);
- records.push(record.clone());
- }
- }
- }
- serde_json::to_value(&records).unwrap()
- }
- // 将一个时间段范围内的所有Depth返回(以json形式)
- pub async fn collect_depth_json(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str) -> Value {
- let mut depths = Vec::new();
- let filenames = generate_filenames(start_timestamp, end_timestamp, exchange, symbol, "order_book");
- for filename in filenames {
- let file_path = PathBuf::from(filename.as_str());
- let file_content = fs::read_to_string(file_path).await;
- // 检查文件内容是否成功读取
- if let Ok(content) = file_content {
- // 尝试反序列化文件内容
- if let Ok(depth_list) = serde_json::from_str::<Vec<SpecialDepth>>(&content) {
- // info!("{} 找到 1 条", filename);
- for depth in depth_list.iter().rev() {
- // 不在时间范围内的就不要返回了
- let t = depth.t.to_i64().unwrap();
- if t < start_timestamp || t > end_timestamp {
- continue;
- }
- depths.push(depth.clone())
- }
- }
- }
- }
- serde_json::to_value(&depths).unwrap()
- }
- // 将一个时间段范围内的所有SimpleDepth返回(以json形式)
- pub async fn collect_simple_depth_json(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str) -> Value {
- let mut simple_depths = Vec::new();
- let filenames = generate_filenames(start_timestamp, end_timestamp, exchange, symbol, "order_book_simple");
- for filename in filenames {
- let file_path = PathBuf::from(filename.as_str());
- let file_content = fs::read_to_string(file_path).await;
- // 检查文件内容是否成功读取
- if let Ok(content) = file_content {
- // 尝试反序列化文件内容
- if let Ok(depth_list) = serde_json::from_str::<Vec<SimpleDepth>>(&content) {
- // info!("{} 找到 1 条", filename);
- for depth in depth_list.iter().rev() {
- // 不在时间范围内的就不要返回了
- let t = depth.time.to_i64().unwrap();
- if t < start_timestamp || t > end_timestamp {
- continue;
- }
- simple_depths.push(depth.clone())
- }
- }
- }
- }
- serde_json::to_value(&simple_depths).unwrap()
- }
- // 清除指定时间前db文件
- pub async fn delete_db_by_exchange(exchange: &str, categories: Vec<&str>, retention_minute: i64) {
- let exchange_path = format!("./db/{}", exchange);
- let directories_name = get_directory(&exchange_path).await;
- let symbols = get_symbols_by_exchange(exchange);
- // 获取5小时前分钟时间戳
- let minute_timestamp = (Utc::now().timestamp_millis() - retention_minute * 60 * 1000) / 60 / 1000;
- let day = minute_to_date(minute_timestamp);
- for directory in directories_name.iter() {
- if Decimal::from_str(&directory).unwrap() < Decimal::from_str(&day).unwrap() {
- let directory_path = format!("{}/{}", exchange_path, directory);
- delete_directory(&directory_path).await;
- } else {
- for symbol in symbols.as_array().unwrap() {
- for category in categories.clone() {
- let trades_files_path = format!("{}/{}/{}/{}", exchange_path, directory, symbol.as_str().unwrap(), category);
- let trades_files_name = get_directory(&trades_files_path).await;
- for file in trades_files_name {
- let file_name: Vec<&str> = file.split(".").collect();
- if Decimal::from_str(&file_name[0]).unwrap() < Decimal::from_str(&minute_timestamp.to_string()).unwrap() {
- let path = format!("{}/{}", trades_files_path, file);
- delete_directory(&path).await;
- }
- }
- }
- }
- }
- }
- }
- // 获取目录下文件、文件夹
- async fn get_directory(target_directory: &str) -> Vec<String> {
- let mut files_name: Vec<String> = Vec::new();
- if let Ok(mut entries) = fs::read_dir(target_directory).await {
- // 遍历条目并处理文件
- while let Ok(Some(entry)) = entries.next_entry().await {
- let path = entry.path();
- files_name.push(path.file_name().unwrap().to_str().unwrap().to_string());
- };
- } else {
- error!("获取目录下文件、文件夹失败!目录:{}",target_directory);
- }
- files_name
- }
- // 删除目录下文件、文件夹
- async fn delete_directory(target_directory: &str) {
- let path = Path::new(target_directory);
- if let Ok(metadata) = fs::metadata(path).await {
- if metadata.is_file() {
- // 删除文件
- match fs::remove_file(path).await {
- Ok(_) => info!("删除文件成功: {}", path.display()),
- Err(e) => error!("删除文件失败 {}: {}", path.display(), e),
- }
- } else if metadata.is_dir() {
- // 删除文件夹及其内容
- match fs::remove_dir_all(path).await {
- Ok(_) => info!("删除文件夹成功: {}", path.display()),
- Err(e) => error!("删除文件夹失败 {}: {}", path.display(), e),
- }
- } else {
- error!("未知类型文件: {}", path.display());
- }
- } else {
- error!("没有找到路径: {}", path.display());
- }
- }
- fn find_latest_directory(path: &Path) -> std::io::Result<Option<PathBuf>> {
- let mut latest: Option<PathBuf> = None;
- for entry in std::fs::read_dir(path)? {
- let entry = entry?;
- let path = entry.path();
- // 仅处理目录
- if path.is_dir() {
- // 更新最新目录的逻辑
- if let Some(ref latest_path) = latest {
- // 比较当前目录名与最新目录名
- if path.file_name().unwrap() > latest_path.file_name().unwrap() {
- latest = Some(path);
- }
- } else {
- latest = Some(path);
- }
- }
- }
- Ok(latest)
- }
- fn list_directories(path: &Path) -> std::io::Result<Vec<PathBuf>> {
- let mut directories = Vec::new();
- for entry in std::fs::read_dir(path)? {
- let entry = entry?;
- if entry.path().is_dir() {
- directories.push(entry.path());
- }
- }
- Ok(directories)
- }
- // 获取某个交易所的所有币对(获取最新能获取到的)
- pub fn get_symbols_by_exchange(exchange: &str) -> Value {
- let mut symbols = vec![];
- let path_str = format!("./db/{}", exchange);
- let path = Path::new(&path_str);
- let latest_directory = find_latest_directory(path);
- match latest_directory {
- Ok(dir) => {
- let latest_dir = dir.unwrap();
- info!("找到最后一日生成的目录: {}", latest_dir.to_str().unwrap());
- let subdirectories = list_directories(&latest_dir).unwrap();
- for sub_dir in subdirectories {
- symbols.push(sub_dir.file_name().unwrap().to_str().unwrap().to_string())
- }
- }
- Err(_) => {
- return serde_json::to_value(&symbols).unwrap();
- }
- }
- return serde_json::to_value(&symbols).unwrap();
- }
- #[tokio::test]
- async fn read_symbols_test() {
- use global::log_utils::init_log_with_info;
- init_log_with_info();
- get_symbols_by_exchange("bitget_usdt_swap");
- }
- #[tokio::test]
- async fn read_test() {
- use global::log_utils::init_log_with_info;
- init_log_with_info();
- let rst = collect_special_trades_json(1712894400000, 1712912400000, "gate_usdt_swap", "CFX_USDT").await;
- info!("{}", rst)
- }
- #[tokio::test]
- async fn write_test() {
- use std::time::Duration;
- use tokio::time::sleep;
- // 调用函数,不需要等待它完成
- write_to_file("{\"key\": \"value\"}".to_string(), "db/test.json".to_string()).await;
- sleep(Duration::from_secs(2)).await;
- }
|