incremental_json_writer.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. use serde::Serialize;
  2. use serde_json;
  3. use std::fs::{self, File, OpenOptions};
  4. use std::io::{self, BufWriter, Write};
  5. use std::path::PathBuf;
  6. use rust_decimal::Decimal;
  7. // --------------------- 错误处理 ---------------------
  8. #[derive(Debug)]
  9. pub enum WriterError {
  10. Serde(serde_json::Error),
  11. Io(io::Error),
  12. InvalidPath(String), // 用于处理路径错误
  13. }
  14. impl std::fmt::Display for WriterError {
  15. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  16. match self {
  17. WriterError::Serde(e) => write!(f, "JSON serialization error: {}", e),
  18. WriterError::Io(e) => write!(f, "File I/O error: {}", e),
  19. WriterError::InvalidPath(s) => write!(f, "Invalid path: {}", s),
  20. }
  21. }
  22. }
  23. impl std::error::Error for WriterError {}
  24. impl From<serde_json::Error> for WriterError {
  25. fn from(err: serde_json::Error) -> Self {
  26. WriterError::Serde(err)
  27. }
  28. }
  29. impl From<io::Error> for WriterError {
  30. fn from(err: io::Error) -> Self {
  31. WriterError::Io(err)
  32. }
  33. }
  34. // --------------------- 核心写入器 ---------------------
  35. pub struct IncrementalJsonWriter<T> where T: Serialize {
  36. base_file_path: PathBuf, // 文件的基础路径和名称前缀 (e.g., "data/my_log")
  37. current_file_idx: usize, // 当前文件的索引 (e.g., 0, 1, 2...)
  38. max_file_size_bytes: u64, // 最大文件大小 (字节)
  39. current_file_size_bytes: u64, // 当前文件已写入的字节数
  40. batch_size: usize,
  41. buffer: Vec<T>,
  42. // 使用 BufWriter 来提高写入效率,每次写入不需要立即刷新到磁盘
  43. current_writer: Option<BufWriter<File>>,
  44. }
  45. impl<T> IncrementalJsonWriter<T> where T: Serialize {
  46. /// 创建一个新的 IncrementalJsonWriter 实例。
  47. /// `base_path_prefix` 是文件路径和前缀,例如 "logs/events" 会生成 "logs/events_0.json", "logs/events_1.json"
  48. /// `max_file_size_mb` 是单个文件的最大大小,以 MB 为单位。
  49. pub fn new(base_path_prefix: &str, max_file_size_mb: u64, batch_size: usize) -> Result<Self, WriterError> {
  50. let base_file_path = PathBuf::from(base_path_prefix);
  51. let max_file_size_bytes = max_file_size_mb * 1024 * 1024; // MB 转换为字节
  52. // 确保父目录存在
  53. if let Some(parent) = base_file_path.parent() {
  54. if parent.as_os_str() != "" { // 避免在当前目录创建时创建不存在的父目录
  55. fs::create_dir_all(parent)?;
  56. }
  57. }
  58. Ok(IncrementalJsonWriter {
  59. base_file_path,
  60. current_file_idx: 0, // 从文件索引 0 开始
  61. max_file_size_bytes,
  62. current_file_size_bytes: 0,
  63. current_writer: None,
  64. batch_size,
  65. buffer: Vec::with_capacity(batch_size),
  66. })
  67. }
  68. /// 获取完整的文件路径 (e.g., "data/my_log_0.json")
  69. fn get_full_file_path(&self, index: usize) -> PathBuf {
  70. let mut path = self.base_file_path.clone(); // 复制基础路径
  71. let filename = format!("{}_{}.json",
  72. path.file_name().unwrap_or_default().to_string_lossy(), // 获取文件名部分
  73. index
  74. );
  75. path.set_file_name(filename); // 设置新的带索引的文件名
  76. path
  77. }
  78. /// 打开或创建一个新的文件,并设置 self.current_writer。
  79. /// 会查找最新的文件索引并从该文件继续写入,或者创建新的文件。
  80. fn open_current_file(&mut self) -> Result<(), WriterError> {
  81. // 先检查是否有打开的文件,如果有,先刷新并关闭
  82. if let Some(writer) = self.current_writer.take() {
  83. writer.into_inner().unwrap().sync_all()?; // 确保所有数据写入磁盘
  84. }
  85. // 查找最新的文件索引,以便在程序重启后可以继续写入
  86. // 这需要遍历所有可能的文件,找到最大的那个索引
  87. let mut latest_idx = 0;
  88. loop {
  89. let candidate_path = self.get_full_file_path(latest_idx);
  90. if candidate_path.exists() {
  91. latest_idx += 1;
  92. } else {
  93. break;
  94. }
  95. }
  96. // 如果最新索引文件存在,则从该文件继续。否则,使用 latest_idx-1 + 1 作为新文件。
  97. // 但为了简单和确定性,我们总是从索引0开始,然后简单地找到最后一个文件索引加上1
  98. // 注意:这种查找方式适用于程序每次启动都重新开始写文件的情况。
  99. // 如果需要接续之前未写满的文件,逻辑会更复杂:需要read_dir并解析文件名。
  100. // 为了本例的增量写入和文件切换,我们简单地从 current_file_idx + 1 开始。
  101. // 这里为了演示“增量写入”,我们默认从0开始写,然后自动递增。
  102. // 如果要实现“断点续写”,则需要在启动时扫描目录找到最大的 index 文件,并读取其大小。
  103. // For simplicity, let's just use current_file_idx as the next one to create
  104. // and always start from 0 if no file exists.
  105. let mut target_idx = self.current_file_idx;
  106. let mut target_path = self.get_full_file_path(target_idx);
  107. // 如果文件已存在,我们应该尝试从它上次的大小继续,或者直接创建新文件。
  108. // 这里策略是:如果当前文件已满或不存在,就开下一个文件。
  109. // 简化的策略:总是从 current_file_idx 开始,如果已存在,且未满,就追加;如果已满,就开下一个。
  110. // 更简单的策略:每次调用 open_current_file 都开一个新文件,或者检查当前文件是否真的需要切换
  111. // 寻找一个适合写入的新文件
  112. loop {
  113. let next_file_path = self.get_full_file_path(self.current_file_idx);
  114. if next_file_path.exists() {
  115. let metadata = fs::metadata(&next_file_path)?;
  116. if metadata.len() >= self.max_file_size_bytes {
  117. self.current_file_idx += 1; // 切换到下一个文件
  118. self.current_file_size_bytes = 0; // 重置大小
  119. continue; // 继续循环,尝试下一个文件
  120. } else {
  121. // 文件存在且未满,尝试追加写入
  122. self.current_file_size_bytes = metadata.len();
  123. target_path = next_file_path; // 找到目标文件
  124. break;
  125. }
  126. } else {
  127. // 文件不存在,这是个新文件
  128. self.current_file_size_bytes = 0;
  129. target_path = next_file_path;
  130. break;
  131. }
  132. }
  133. let file = OpenOptions::new()
  134. .create(true) // 如果文件不存在则创建
  135. .append(true) // 以追加模式打开
  136. .open(&target_path)?;
  137. self.current_writer = Some(BufWriter::new(file));
  138. Ok(())
  139. }
  140. /// 添加一条日志到缓冲区。如果缓冲区满,会触发一次写入。
  141. pub fn add_entry(&mut self, entry: T) -> Result<(), Box<dyn std::error::Error>> {
  142. self.buffer.push(entry);
  143. // 如果缓冲区达到批处理大小,执行写入
  144. if self.buffer.len() >= self.batch_size {
  145. self.flush()?;
  146. }
  147. Ok(())
  148. }
  149. /// 写入单个结构体数据到文件中。
  150. pub fn flush(&mut self) -> Result<(), WriterError> {
  151. // 序列化缓冲区中的每条日志为 JSON 字符串,并用换行符连接
  152. // 注意:这里每个 entry 都是一个独立的 JSON 对象
  153. let mut batch_json_lines = String::new();
  154. for entry in &self.buffer {
  155. // 将每条日志序列化为 JSON 字符串
  156. let json_line = serde_json::to_string(entry)?;
  157. batch_json_lines.push_str(&json_line);
  158. batch_json_lines.push('\n'); // JSON Lines 格式需要换行符
  159. }
  160. let data_len = batch_json_lines.as_bytes().len() as u64;
  161. // 如果没有打开的文件,或者当前文件已满,则打开新文件
  162. if self.current_writer.is_none() || self.current_file_size_bytes + data_len > self.max_file_size_bytes {
  163. // 如果 current_writer.is_some(),说明前一个文件已满,需要刷新并关闭
  164. if let Some(writer) = self.current_writer.take() {
  165. writer.into_inner().unwrap().sync_all()?; // 确保所有数据写入磁盘
  166. self.current_file_idx += 1; // 切换到下一个文件索引
  167. self.current_file_size_bytes = 0; // 重置计数
  168. }
  169. // 打开新的文件
  170. self.open_current_file()?;
  171. }
  172. // 写入数据
  173. if let Some(writer) = self.current_writer.as_mut() {
  174. writer.write_all(batch_json_lines.as_bytes())?;
  175. self.current_file_size_bytes += data_len;
  176. writer.flush()?; // 确保 BufWriter 的内容写入到文件
  177. writer.get_ref().sync_all()?; // 确保数据写入磁盘
  178. // 清空缓冲区,准备下一批
  179. self.buffer.clear();
  180. self.buffer.reserve(self.batch_size); // 重新预分配容量
  181. } else {
  182. // 这应该不会发生,因为前面已经确保了 writer 是 Some
  183. return Err(WriterError::Io(io::Error::new(io::ErrorKind::Other, "209 Writer not initialized unexpectedly")));
  184. }
  185. Ok(())
  186. }
  187. }
  188. // 当 IncrementalJsonWriter 实例被 Drop 时,确保文件被关闭并数据写入磁盘
  189. impl<T> Drop for IncrementalJsonWriter<T> where T: Serialize {
  190. fn drop(&mut self) {
  191. if let Some(mut writer) = self.current_writer.take() {
  192. // Try to flush and sync, but ignore errors during shutdown
  193. let _ = writer.flush();
  194. let _ = writer.into_inner().and_then(|f| Ok(f.sync_all()));
  195. }
  196. }
  197. }
  198. #[derive(Serialize, Debug)]
  199. pub struct OrderLog{
  200. // 挂单信息
  201. pub order_info: Vec<Vec<String>>,
  202. // 记录时间
  203. pub record_timestamp: i64
  204. }
  205. #[derive(Serialize, Debug)]
  206. pub struct FixPriceLog{
  207. // 定价
  208. pub fix_price: Decimal,
  209. // 记录时间
  210. pub record_timestamp: i64
  211. }
  212. // --------------------- 示例数据结构 ---------------------
  213. #[derive(Serialize, Debug)]
  214. struct LogEntry {
  215. timestamp: String,
  216. level: String,
  217. message: String,
  218. session_id: String,
  219. #[serde(skip_serializing_if = "Option::is_none")] // 如果为None则不序列化
  220. user_id: Option<u32>,
  221. }
  222. #[cfg(test)]
  223. mod tests {
  224. use crate::incremental_json_writer::{IncrementalJsonWriter, LogEntry};
  225. #[test]
  226. fn test_incremental_json_writer() {
  227. // 初始化写入器,文件前缀为 "logs/app_log",最大文件大小为 1MB
  228. let base_path = "../logs/app_log";
  229. let max_size_mb = 10; // 1 MB
  230. let mut writer = IncrementalJsonWriter::<LogEntry>::new(base_path, max_size_mb, 100usize).unwrap();
  231. println!("Starting to write log entries...");
  232. let mut user_id_counter = 1000;
  233. // 写入大量数据,直到触发文件切换
  234. for i in 0..200000 { // 写入20万条记录,看看文件切换效果
  235. let timestamp = chrono::Local::now().to_rfc3339();
  236. let level = if i % 10 == 0 { "ERROR" } else { "INFO" }.to_string();
  237. let message = format!("Processing record {} for user {}...", i, user_id_counter);
  238. let session_id = format!("sess_{}", i / 100);
  239. let user_id = if i % 5 == 0 {
  240. user_id_counter += 1;
  241. Some(user_id_counter)
  242. } else {
  243. None
  244. };
  245. let entry = LogEntry {
  246. timestamp,
  247. level,
  248. message,
  249. session_id,
  250. user_id,
  251. };
  252. writer.add_entry(entry).unwrap();
  253. }
  254. if writer.buffer.len() > 0 {
  255. writer.flush().unwrap(); // 确保所有数据都已写入
  256. }
  257. println!("\nFinished writing records. Final flush...");
  258. println!("Logs written to files under the '{}' directory.", base_path);
  259. }
  260. }