use serde::Serialize; use serde_json; use std::fs::{self, File, OpenOptions}; use std::io::{self, BufWriter, Write}; use std::path::PathBuf; use rust_decimal::Decimal; // --------------------- 错误处理 --------------------- #[derive(Debug)] pub enum WriterError { Serde(serde_json::Error), Io(io::Error), InvalidPath(String), // 用于处理路径错误 } impl std::fmt::Display for WriterError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { WriterError::Serde(e) => write!(f, "JSON serialization error: {}", e), WriterError::Io(e) => write!(f, "File I/O error: {}", e), WriterError::InvalidPath(s) => write!(f, "Invalid path: {}", s), } } } impl std::error::Error for WriterError {} impl From for WriterError { fn from(err: serde_json::Error) -> Self { WriterError::Serde(err) } } impl From for WriterError { fn from(err: io::Error) -> Self { WriterError::Io(err) } } // --------------------- 核心写入器 --------------------- pub struct IncrementalJsonWriter where T: Serialize { base_file_path: PathBuf, // 文件的基础路径和名称前缀 (e.g., "data/my_log") current_file_idx: usize, // 当前文件的索引 (e.g., 0, 1, 2...) max_file_size_bytes: u64, // 最大文件大小 (字节) current_file_size_bytes: u64, // 当前文件已写入的字节数 batch_size: usize, buffer: Vec, // 使用 BufWriter 来提高写入效率,每次写入不需要立即刷新到磁盘 current_writer: Option>, } impl IncrementalJsonWriter where T: Serialize { /// 创建一个新的 IncrementalJsonWriter 实例。 /// `base_path_prefix` 是文件路径和前缀,例如 "logs/events" 会生成 "logs/events_0.json", "logs/events_1.json" /// `max_file_size_mb` 是单个文件的最大大小,以 MB 为单位。 pub fn new(base_path_prefix: &str, max_file_size_mb: u64, batch_size: usize) -> Result { let base_file_path = PathBuf::from(base_path_prefix); let max_file_size_bytes = max_file_size_mb * 1024 * 1024; // MB 转换为字节 // 确保父目录存在 if let Some(parent) = base_file_path.parent() { if parent.as_os_str() != "" { // 避免在当前目录创建时创建不存在的父目录 fs::create_dir_all(parent)?; } } Ok(IncrementalJsonWriter { base_file_path, current_file_idx: 0, // 从文件索引 0 开始 max_file_size_bytes, current_file_size_bytes: 0, current_writer: None, batch_size, buffer: Vec::with_capacity(batch_size), }) } /// 获取完整的文件路径 (e.g., "data/my_log_0.json") fn get_full_file_path(&self, index: usize) -> PathBuf { let mut path = self.base_file_path.clone(); // 复制基础路径 let filename = format!("{}_{}.json", path.file_name().unwrap_or_default().to_string_lossy(), // 获取文件名部分 index ); path.set_file_name(filename); // 设置新的带索引的文件名 path } /// 打开或创建一个新的文件,并设置 self.current_writer。 /// 会查找最新的文件索引并从该文件继续写入,或者创建新的文件。 fn open_current_file(&mut self) -> Result<(), WriterError> { // 先检查是否有打开的文件,如果有,先刷新并关闭 if let Some(writer) = self.current_writer.take() { writer.into_inner().unwrap().sync_all()?; // 确保所有数据写入磁盘 } // 查找最新的文件索引,以便在程序重启后可以继续写入 // 这需要遍历所有可能的文件,找到最大的那个索引 let mut latest_idx = 0; loop { let candidate_path = self.get_full_file_path(latest_idx); if candidate_path.exists() { latest_idx += 1; } else { break; } } // 如果最新索引文件存在,则从该文件继续。否则,使用 latest_idx-1 + 1 作为新文件。 // 但为了简单和确定性,我们总是从索引0开始,然后简单地找到最后一个文件索引加上1 // 注意:这种查找方式适用于程序每次启动都重新开始写文件的情况。 // 如果需要接续之前未写满的文件,逻辑会更复杂:需要read_dir并解析文件名。 // 为了本例的增量写入和文件切换,我们简单地从 current_file_idx + 1 开始。 // 这里为了演示“增量写入”,我们默认从0开始写,然后自动递增。 // 如果要实现“断点续写”,则需要在启动时扫描目录找到最大的 index 文件,并读取其大小。 // For simplicity, let's just use current_file_idx as the next one to create // and always start from 0 if no file exists. let mut target_idx = self.current_file_idx; let mut target_path = self.get_full_file_path(target_idx); // 如果文件已存在,我们应该尝试从它上次的大小继续,或者直接创建新文件。 // 这里策略是:如果当前文件已满或不存在,就开下一个文件。 // 简化的策略:总是从 current_file_idx 开始,如果已存在,且未满,就追加;如果已满,就开下一个。 // 更简单的策略:每次调用 open_current_file 都开一个新文件,或者检查当前文件是否真的需要切换 // 寻找一个适合写入的新文件 loop { let next_file_path = self.get_full_file_path(self.current_file_idx); if next_file_path.exists() { let metadata = fs::metadata(&next_file_path)?; if metadata.len() >= self.max_file_size_bytes { self.current_file_idx += 1; // 切换到下一个文件 self.current_file_size_bytes = 0; // 重置大小 continue; // 继续循环,尝试下一个文件 } else { // 文件存在且未满,尝试追加写入 self.current_file_size_bytes = metadata.len(); target_path = next_file_path; // 找到目标文件 break; } } else { // 文件不存在,这是个新文件 self.current_file_size_bytes = 0; target_path = next_file_path; break; } } let file = OpenOptions::new() .create(true) // 如果文件不存在则创建 .append(true) // 以追加模式打开 .open(&target_path)?; self.current_writer = Some(BufWriter::new(file)); Ok(()) } /// 添加一条日志到缓冲区。如果缓冲区满,会触发一次写入。 pub fn add_entry(&mut self, entry: T) -> Result<(), Box> { self.buffer.push(entry); // 如果缓冲区达到批处理大小,执行写入 if self.buffer.len() >= self.batch_size { self.flush()?; } Ok(()) } /// 写入单个结构体数据到文件中。 pub fn flush(&mut self) -> Result<(), WriterError> { // 序列化缓冲区中的每条日志为 JSON 字符串,并用换行符连接 // 注意:这里每个 entry 都是一个独立的 JSON 对象 let mut batch_json_lines = String::new(); for entry in &self.buffer { // 将每条日志序列化为 JSON 字符串 let json_line = serde_json::to_string(entry)?; batch_json_lines.push_str(&json_line); batch_json_lines.push('\n'); // JSON Lines 格式需要换行符 } let data_len = batch_json_lines.as_bytes().len() as u64; // 如果没有打开的文件,或者当前文件已满,则打开新文件 if self.current_writer.is_none() || self.current_file_size_bytes + data_len > self.max_file_size_bytes { // 如果 current_writer.is_some(),说明前一个文件已满,需要刷新并关闭 if let Some(writer) = self.current_writer.take() { writer.into_inner().unwrap().sync_all()?; // 确保所有数据写入磁盘 self.current_file_idx += 1; // 切换到下一个文件索引 self.current_file_size_bytes = 0; // 重置计数 } // 打开新的文件 self.open_current_file()?; } // 写入数据 if let Some(writer) = self.current_writer.as_mut() { writer.write_all(batch_json_lines.as_bytes())?; self.current_file_size_bytes += data_len; writer.flush()?; // 确保 BufWriter 的内容写入到文件 writer.get_ref().sync_all()?; // 确保数据写入磁盘 // 清空缓冲区,准备下一批 self.buffer.clear(); self.buffer.reserve(self.batch_size); // 重新预分配容量 } else { // 这应该不会发生,因为前面已经确保了 writer 是 Some return Err(WriterError::Io(io::Error::new(io::ErrorKind::Other, "209 Writer not initialized unexpectedly"))); } Ok(()) } } // 当 IncrementalJsonWriter 实例被 Drop 时,确保文件被关闭并数据写入磁盘 impl Drop for IncrementalJsonWriter where T: Serialize { fn drop(&mut self) { if let Some(mut writer) = self.current_writer.take() { // Try to flush and sync, but ignore errors during shutdown let _ = writer.flush(); let _ = writer.into_inner().and_then(|f| Ok(f.sync_all())); } } } #[derive(Serialize, Debug)] pub struct OrderLog{ // 挂单信息 pub order_info: Vec>, // 记录时间 pub record_timestamp: i64 } #[derive(Serialize, Debug)] pub struct FixPriceLog{ // 定价 pub fix_price: Decimal, // 记录时间 pub record_timestamp: i64 } // --------------------- 示例数据结构 --------------------- #[derive(Serialize, Debug)] struct LogEntry { timestamp: String, level: String, message: String, session_id: String, #[serde(skip_serializing_if = "Option::is_none")] // 如果为None则不序列化 user_id: Option, } #[cfg(test)] mod tests { use crate::incremental_json_writer::{IncrementalJsonWriter, LogEntry}; #[test] fn test_incremental_json_writer() { // 初始化写入器,文件前缀为 "logs/app_log",最大文件大小为 1MB let base_path = "../logs/app_log"; let max_size_mb = 10; // 1 MB let mut writer = IncrementalJsonWriter::::new(base_path, max_size_mb, 100usize).unwrap(); println!("Starting to write log entries..."); let mut user_id_counter = 1000; // 写入大量数据,直到触发文件切换 for i in 0..200000 { // 写入20万条记录,看看文件切换效果 let timestamp = chrono::Local::now().to_rfc3339(); let level = if i % 10 == 0 { "ERROR" } else { "INFO" }.to_string(); let message = format!("Processing record {} for user {}...", i, user_id_counter); let session_id = format!("sess_{}", i / 100); let user_id = if i % 5 == 0 { user_id_counter += 1; Some(user_id_counter) } else { None }; let entry = LogEntry { timestamp, level, message, session_id, user_id, }; writer.add_entry(entry).unwrap(); } if writer.buffer.len() > 0 { writer.flush().unwrap(); // 确保所有数据都已写入 } println!("\nFinished writing records. Final flush..."); println!("Logs written to files under the '{}' directory.", base_path); } }