json_db_utils.rs 13 KB


  1. use std::path::{Path, PathBuf};
  2. use std::str::FromStr;
  3. use chrono::{FixedOffset, TimeZone, Utc};
  4. use rust_decimal::Decimal;
  5. use rust_decimal::prelude::ToPrimitive;
  6. use serde_json::Value;
  7. use tokio::fs::File;
  8. use tokio::io::AsyncWriteExt;
  9. use tokio::{fs};
  10. use tracing::{error, info};
  11. use standard::{Record, SimpleDepth, SpecialDepth, SpecialTrade};
  12. pub async fn write_to_file(json_data: String, file_path: String) {
  13. // 尝试创建文件路径
  14. if let Err(e) = fs::create_dir_all(
  15. // 获取文件目录路径
  16. Path::new(&file_path)
  17. .parent() // 获取父目录(即文件路径除去文件名后的部分)
  18. .unwrap_or_else(|| Path::new("")), // 如果没有父目录,使用当前目录
  19. )
  20. .await
  21. {
  22. // 如果创建路径失败,打印错误日志
  23. error!("创建目录错误: {:?}", e);
  24. return; // 结束任务
  25. }
  26. // 异步地执行文件写入操作
  27. if let Err(e) = async {
  28. let mut file = File::create(&file_path).await?;
  29. file.write_all(json_data.as_bytes()).await?;
  30. Result::<(), std::io::Error>::Ok(())
  31. }
  32. .await
  33. {
  34. // 如果发生错误,只打印错误日志
  35. error!("json db写入错误: {:?}", e);
  36. }
  37. }
  38. // 根据时间戳生成文件名列表
  39. fn generate_filenames(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str, subscriber_type: &str) -> Vec<String> {
  40. let mut filenames = Vec::new();
  41. let start_minute = start_timestamp / 60000; // 转换为分钟
  42. let end_minute = end_timestamp / 60000; // 转换为分钟
  43. let mut minute = end_minute;
  44. while minute >= start_minute {
  45. let date_str = minute_to_date(minute);
  46. filenames.push(generate_file_path(exchange, date_str.as_str(), symbol, subscriber_type, minute));
  47. minute -= 1;
  48. }
  49. filenames
  50. }
  51. pub fn generate_file_path(exchange: &str, formatted_date: &str, symbol: &str, subscriber_type: &str, serial: i64) -> String {
  52. return format!("db/{}/{}/{}/{}/{}.json", exchange, formatted_date, symbol, subscriber_type, serial);
  53. }
  54. // 函数:将分钟数转换为日期字符串,格式为 YYYYMMDD
  55. pub fn minute_to_date(minute: i64) -> String {
  56. // 将分钟转换为秒
  57. let seconds = minute * 60;
  58. // 创建一个代表东八区(GMT+8)的时间偏移
  59. let east_eight_zone = FixedOffset::east_opt(8 * 3600).unwrap();
  60. // 使用 UTC 时间创建 DateTime 对象,然后将其转换为东八区时间
  61. let datetime_utc = Utc.timestamp_opt(seconds, 0).unwrap();
  62. let datetime_east_eight = datetime_utc.with_timezone(&east_eight_zone);
  63. // 返回日期字符串(格式 YYYYMMDD)
  64. datetime_east_eight.format("%Y%m%d").to_string()
  65. }
  66. // 将一个时间段范围内的所有SpecialTrade返回(以json形式)
  67. pub async fn collect_special_trades_json(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str) -> Value {
  68. let mut all_trades = Vec::new();
  69. let filenames = generate_filenames(start_timestamp, end_timestamp, exchange, symbol, "trades");
  70. for filename in filenames {
  71. let file_path = PathBuf::from(filename.as_str());
  72. let file_content = fs::read_to_string(file_path).await;
  73. // 检查文件内容是否成功读取
  74. let mut trades = if let Ok(content) = file_content {
  75. // 尝试反序列化文件内容
  76. if let Ok(trades) = serde_json::from_str::<Vec<SpecialTrade>>(&content) {
  77. trades // 成功反序列化,返回结果
  78. } else {
  79. vec![] // 反序列化失败,返回空 Vec
  80. }
  81. } else {
  82. vec![] // 读取文件失败,返回空 Vec
  83. };
  84. trades.reverse();
  85. // info!("{} 找到 {} 条", filename, trades.len());
  86. all_trades.append(&mut trades);
  87. }
  88. serde_json::to_value(&all_trades).unwrap()
  89. }
  90. // 将一个时间段范围内的所有Record返回(以json形式)
  91. pub async fn collect_records_json(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str) -> Value {
  92. let mut records = Vec::new();
  93. let filenames = generate_filenames(start_timestamp, end_timestamp, exchange, symbol, "record");
  94. for filename in filenames {
  95. let file_path = PathBuf::from(filename.as_str());
  96. let file_content = fs::read_to_string(file_path).await;
  97. // 检查文件内容是否成功读取
  98. if let Ok(content) = file_content {
  99. // 尝试反序列化文件内容
  100. if let Ok(record) = serde_json::from_str::<Record>(&content) {
  101. // info!("{} 找到 1 条", filename);
  102. records.push(record.clone());
  103. }
  104. }
  105. }
  106. serde_json::to_value(&records).unwrap()
  107. }
  108. // 将一个时间段范围内的所有Depth返回(以json形式)
  109. pub async fn collect_depth_json(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str) -> Value {
  110. let mut depths = Vec::new();
  111. let filenames = generate_filenames(start_timestamp, end_timestamp, exchange, symbol, "order_book");
  112. for filename in filenames {
  113. let file_path = PathBuf::from(filename.as_str());
  114. let file_content = fs::read_to_string(file_path).await;
  115. // 检查文件内容是否成功读取
  116. if let Ok(content) = file_content {
  117. // 尝试反序列化文件内容
  118. if let Ok(depth_list) = serde_json::from_str::<Vec<SpecialDepth>>(&content) {
  119. // info!("{} 找到 1 条", filename);
  120. for depth in depth_list.iter().rev() {
  121. // 不在时间范围内的就不要返回了
  122. let t = depth.t.to_i64().unwrap();
  123. if t < start_timestamp || t > end_timestamp {
  124. continue;
  125. }
  126. depths.push(depth.clone())
  127. }
  128. }
  129. }
  130. }
  131. serde_json::to_value(&depths).unwrap()
  132. }
  133. // 将一个时间段范围内的所有SimpleDepth返回(以json形式)
  134. pub async fn collect_simple_depth_json(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str) -> Value {
  135. let mut simple_depths = Vec::new();
  136. let filenames = generate_filenames(start_timestamp, end_timestamp, exchange, symbol, "order_book_simple");
  137. for filename in filenames {
  138. let file_path = PathBuf::from(filename.as_str());
  139. let file_content = fs::read_to_string(file_path).await;
  140. // 检查文件内容是否成功读取
  141. if let Ok(content) = file_content {
  142. // 尝试反序列化文件内容
  143. if let Ok(depth_list) = serde_json::from_str::<Vec<SimpleDepth>>(&content) {
  144. // info!("{} 找到 1 条", filename);
  145. for depth in depth_list.iter().rev() {
  146. // 不在时间范围内的就不要返回了
  147. let t = depth.time.to_i64().unwrap();
  148. if t < start_timestamp || t > end_timestamp {
  149. continue;
  150. }
  151. simple_depths.push(depth.clone())
  152. }
  153. }
  154. }
  155. }
  156. serde_json::to_value(&simple_depths).unwrap()
  157. }
  158. // 清除指定时间前db文件
  159. pub async fn delete_db_by_exchange(exchange: &str, categories: Vec<&str>, retention_minute: i64) {
  160. let exchange_path = format!("./db/{}", exchange);
  161. let directories_name = get_directory(&exchange_path).await;
  162. let symbols = get_symbols_by_exchange(exchange);
  163. // 获取5小时前分钟时间戳
  164. let minute_timestamp = (Utc::now().timestamp_millis() - retention_minute * 60 * 1000) / 60 / 1000;
  165. let day = minute_to_date(minute_timestamp);
  166. for directory in directories_name.iter() {
  167. if Decimal::from_str(&directory).unwrap() < Decimal::from_str(&day).unwrap() {
  168. let directory_path = format!("{}/{}", exchange_path, directory);
  169. delete_directory(&directory_path).await;
  170. } else {
  171. for symbol in symbols.as_array().unwrap() {
  172. for category in categories.clone() {
  173. let trades_files_path = format!("{}/{}/{}/{}", exchange_path, directory, symbol.as_str().unwrap(), category);
  174. let trades_files_name = get_directory(&trades_files_path).await;
  175. for file in trades_files_name {
  176. let file_name: Vec<&str> = file.split(".").collect();
  177. if Decimal::from_str(&file_name[0]).unwrap() < Decimal::from_str(&minute_timestamp.to_string()).unwrap() {
  178. let path = format!("{}/{}", trades_files_path, file);
  179. delete_directory(&path).await;
  180. }
  181. }
  182. }
  183. }
  184. }
  185. }
  186. }
  187. // 获取目录下文件、文件夹
  188. async fn get_directory(target_directory: &str) -> Vec<String> {
  189. let mut files_name: Vec<String> = Vec::new();
  190. if let Ok(mut entries) = fs::read_dir(target_directory).await {
  191. // 遍历条目并处理文件
  192. while let Ok(Some(entry)) = entries.next_entry().await {
  193. let path = entry.path();
  194. files_name.push(path.file_name().unwrap().to_str().unwrap().to_string());
  195. };
  196. } else {
  197. error!("获取目录下文件、文件夹失败!目录:{}",target_directory);
  198. }
  199. files_name
  200. }
  201. // 删除目录下文件、文件夹
  202. async fn delete_directory(target_directory: &str) {
  203. let path = Path::new(target_directory);
  204. if let Ok(metadata) = fs::metadata(path).await {
  205. if metadata.is_file() {
  206. // 删除文件
  207. match fs::remove_file(path).await {
  208. Ok(_) => info!("删除文件成功: {}", path.display()),
  209. Err(e) => error!("删除文件失败 {}: {}", path.display(), e),
  210. }
  211. } else if metadata.is_dir() {
  212. // 删除文件夹及其内容
  213. match fs::remove_dir_all(path).await {
  214. Ok(_) => info!("删除文件夹成功: {}", path.display()),
  215. Err(e) => error!("删除文件夹失败 {}: {}", path.display(), e),
  216. }
  217. } else {
  218. error!("未知类型文件: {}", path.display());
  219. }
  220. } else {
  221. error!("没有找到路径: {}", path.display());
  222. }
  223. }
  224. fn find_latest_directory(path: &Path) -> std::io::Result<Option<PathBuf>> {
  225. let mut latest: Option<PathBuf> = None;
  226. for entry in std::fs::read_dir(path)? {
  227. let entry = entry?;
  228. let path = entry.path();
  229. // 仅处理目录
  230. if path.is_dir() {
  231. // 更新最新目录的逻辑
  232. if let Some(ref latest_path) = latest {
  233. // 比较当前目录名与最新目录名
  234. if path.file_name().unwrap() > latest_path.file_name().unwrap() {
  235. latest = Some(path);
  236. }
  237. } else {
  238. latest = Some(path);
  239. }
  240. }
  241. }
  242. Ok(latest)
  243. }
  244. fn list_directories(path: &Path) -> std::io::Result<Vec<PathBuf>> {
  245. let mut directories = Vec::new();
  246. for entry in std::fs::read_dir(path)? {
  247. let entry = entry?;
  248. if entry.path().is_dir() {
  249. directories.push(entry.path());
  250. }
  251. }
  252. Ok(directories)
  253. }
  254. // 获取某个交易所的所有币对(获取最新能获取到的)
  255. pub fn get_symbols_by_exchange(exchange: &str) -> Value {
  256. let mut symbols = vec![];
  257. let path_str = format!("./db/{}", exchange);
  258. let path = Path::new(&path_str);
  259. let latest_directory = find_latest_directory(path);
  260. match latest_directory {
  261. Ok(dir) => {
  262. let latest_dir = dir.unwrap();
  263. info!("找到最后一日生成的目录: {}", latest_dir.to_str().unwrap());
  264. let subdirectories = list_directories(&latest_dir).unwrap();
  265. for sub_dir in subdirectories {
  266. symbols.push(sub_dir.file_name().unwrap().to_str().unwrap().to_string())
  267. }
  268. }
  269. Err(_) => {
  270. return serde_json::to_value(&symbols).unwrap();
  271. }
  272. }
  273. return serde_json::to_value(&symbols).unwrap();
  274. }
  275. #[tokio::test]
  276. async fn read_symbols_test() {
  277. use global::log_utils::init_log_with_info;
  278. init_log_with_info();
  279. get_symbols_by_exchange("bitget_usdt_swap");
  280. }
  281. #[tokio::test]
  282. async fn read_test() {
  283. use global::log_utils::init_log_with_info;
  284. init_log_with_info();
  285. let rst = collect_special_trades_json(1712894400000, 1712912400000, "gate_usdt_swap", "CFX_USDT").await;
  286. info!("{}", rst)
  287. }
  288. #[tokio::test]
  289. async fn write_test() {
  290. use std::time::Duration;
  291. use tokio::time::sleep;
  292. // 调用函数,不需要等待它完成
  293. write_to_file("{\"key\": \"value\"}".to_string(), "db/test.json".to_string()).await;
  294. sleep(Duration::from_secs(2)).await;
  295. }