| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- use serde::Serialize;
- use serde_json;
- use std::fs::{self, File, OpenOptions};
- use std::io::{self, BufWriter, Write};
- use std::path::PathBuf;
- use rust_decimal::Decimal;
- // --------------------- 错误处理 ---------------------
- #[derive(Debug)]
- pub enum WriterError {
- Serde(serde_json::Error),
- Io(io::Error),
- InvalidPath(String), // 用于处理路径错误
- }
- impl std::fmt::Display for WriterError {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- WriterError::Serde(e) => write!(f, "JSON serialization error: {}", e),
- WriterError::Io(e) => write!(f, "File I/O error: {}", e),
- WriterError::InvalidPath(s) => write!(f, "Invalid path: {}", s),
- }
- }
- }
- impl std::error::Error for WriterError {}
- impl From<serde_json::Error> for WriterError {
- fn from(err: serde_json::Error) -> Self {
- WriterError::Serde(err)
- }
- }
- impl From<io::Error> for WriterError {
- fn from(err: io::Error) -> Self {
- WriterError::Io(err)
- }
- }
- // --------------------- 核心写入器 ---------------------
- pub struct IncrementalJsonWriter<T> where T: Serialize {
- base_file_path: PathBuf, // 文件的基础路径和名称前缀 (e.g., "data/my_log")
- current_file_idx: usize, // 当前文件的索引 (e.g., 0, 1, 2...)
- max_file_size_bytes: u64, // 最大文件大小 (字节)
- current_file_size_bytes: u64, // 当前文件已写入的字节数
- batch_size: usize,
- buffer: Vec<T>,
- // 使用 BufWriter 来提高写入效率,每次写入不需要立即刷新到磁盘
- current_writer: Option<BufWriter<File>>,
- }
- impl<T> IncrementalJsonWriter<T> where T: Serialize {
- /// 创建一个新的 IncrementalJsonWriter 实例。
- /// `base_path_prefix` 是文件路径和前缀,例如 "logs/events" 会生成 "logs/events_0.json", "logs/events_1.json"
- /// `max_file_size_mb` 是单个文件的最大大小,以 MB 为单位。
- pub fn new(base_path_prefix: &str, max_file_size_mb: u64, batch_size: usize) -> Result<Self, WriterError> {
- let base_file_path = PathBuf::from(base_path_prefix);
- let max_file_size_bytes = max_file_size_mb * 1024 * 1024; // MB 转换为字节
- // 确保父目录存在
- if let Some(parent) = base_file_path.parent() {
- if parent.as_os_str() != "" { // 避免在当前目录创建时创建不存在的父目录
- fs::create_dir_all(parent)?;
- }
- }
- Ok(IncrementalJsonWriter {
- base_file_path,
- current_file_idx: 0, // 从文件索引 0 开始
- max_file_size_bytes,
- current_file_size_bytes: 0,
- current_writer: None,
- batch_size,
- buffer: Vec::with_capacity(batch_size),
- })
- }
- /// 获取完整的文件路径 (e.g., "data/my_log_0.json")
- fn get_full_file_path(&self, index: usize) -> PathBuf {
- let mut path = self.base_file_path.clone(); // 复制基础路径
- let filename = format!("{}_{}.json",
- path.file_name().unwrap_or_default().to_string_lossy(), // 获取文件名部分
- index
- );
- path.set_file_name(filename); // 设置新的带索引的文件名
- path
- }
- /// 打开或创建一个新的文件,并设置 self.current_writer。
- /// 会查找最新的文件索引并从该文件继续写入,或者创建新的文件。
- fn open_current_file(&mut self) -> Result<(), WriterError> {
- // 先检查是否有打开的文件,如果有,先刷新并关闭
- if let Some(writer) = self.current_writer.take() {
- writer.into_inner().unwrap().sync_all()?; // 确保所有数据写入磁盘
- }
- // 查找最新的文件索引,以便在程序重启后可以继续写入
- // 这需要遍历所有可能的文件,找到最大的那个索引
- let mut latest_idx = 0;
- loop {
- let candidate_path = self.get_full_file_path(latest_idx);
- if candidate_path.exists() {
- latest_idx += 1;
- } else {
- break;
- }
- }
- // 如果最新索引文件存在,则从该文件继续。否则,使用 latest_idx-1 + 1 作为新文件。
- // 但为了简单和确定性,我们总是从索引0开始,然后简单地找到最后一个文件索引加上1
- // 注意:这种查找方式适用于程序每次启动都重新开始写文件的情况。
- // 如果需要接续之前未写满的文件,逻辑会更复杂:需要read_dir并解析文件名。
- // 为了本例的增量写入和文件切换,我们简单地从 current_file_idx + 1 开始。
- // 这里为了演示“增量写入”,我们默认从0开始写,然后自动递增。
- // 如果要实现“断点续写”,则需要在启动时扫描目录找到最大的 index 文件,并读取其大小。
- // For simplicity, let's just use current_file_idx as the next one to create
- // and always start from 0 if no file exists.
- let mut target_idx = self.current_file_idx;
- let mut target_path = self.get_full_file_path(target_idx);
- // 如果文件已存在,我们应该尝试从它上次的大小继续,或者直接创建新文件。
- // 这里策略是:如果当前文件已满或不存在,就开下一个文件。
- // 简化的策略:总是从 current_file_idx 开始,如果已存在,且未满,就追加;如果已满,就开下一个。
- // 更简单的策略:每次调用 open_current_file 都开一个新文件,或者检查当前文件是否真的需要切换
- // 寻找一个适合写入的新文件
- loop {
- let next_file_path = self.get_full_file_path(self.current_file_idx);
- if next_file_path.exists() {
- let metadata = fs::metadata(&next_file_path)?;
- if metadata.len() >= self.max_file_size_bytes {
- self.current_file_idx += 1; // 切换到下一个文件
- self.current_file_size_bytes = 0; // 重置大小
- continue; // 继续循环,尝试下一个文件
- } else {
- // 文件存在且未满,尝试追加写入
- self.current_file_size_bytes = metadata.len();
- target_path = next_file_path; // 找到目标文件
- break;
- }
- } else {
- // 文件不存在,这是个新文件
- self.current_file_size_bytes = 0;
- target_path = next_file_path;
- break;
- }
- }
- let file = OpenOptions::new()
- .create(true) // 如果文件不存在则创建
- .append(true) // 以追加模式打开
- .open(&target_path)?;
- self.current_writer = Some(BufWriter::new(file));
- Ok(())
- }
- /// 添加一条日志到缓冲区。如果缓冲区满,会触发一次写入。
- pub fn add_entry(&mut self, entry: T) -> Result<(), Box<dyn std::error::Error>> {
- self.buffer.push(entry);
- // 如果缓冲区达到批处理大小,执行写入
- if self.buffer.len() >= self.batch_size {
- self.flush()?;
- }
- Ok(())
- }
- /// 写入单个结构体数据到文件中。
- pub fn flush(&mut self) -> Result<(), WriterError> {
- // 序列化缓冲区中的每条日志为 JSON 字符串,并用换行符连接
- // 注意:这里每个 entry 都是一个独立的 JSON 对象
- let mut batch_json_lines = String::new();
- for entry in &self.buffer {
- // 将每条日志序列化为 JSON 字符串
- let json_line = serde_json::to_string(entry)?;
- batch_json_lines.push_str(&json_line);
- batch_json_lines.push('\n'); // JSON Lines 格式需要换行符
- }
- let data_len = batch_json_lines.as_bytes().len() as u64;
- // 如果没有打开的文件,或者当前文件已满,则打开新文件
- if self.current_writer.is_none() || self.current_file_size_bytes + data_len > self.max_file_size_bytes {
- // 如果 current_writer.is_some(),说明前一个文件已满,需要刷新并关闭
- if let Some(writer) = self.current_writer.take() {
- writer.into_inner().unwrap().sync_all()?; // 确保所有数据写入磁盘
- self.current_file_idx += 1; // 切换到下一个文件索引
- self.current_file_size_bytes = 0; // 重置计数
- }
- // 打开新的文件
- self.open_current_file()?;
- }
- // 写入数据
- if let Some(writer) = self.current_writer.as_mut() {
- writer.write_all(batch_json_lines.as_bytes())?;
- self.current_file_size_bytes += data_len;
- writer.flush()?; // 确保 BufWriter 的内容写入到文件
- writer.get_ref().sync_all()?; // 确保数据写入磁盘
-
- // 清空缓冲区,准备下一批
- self.buffer.clear();
- self.buffer.reserve(self.batch_size); // 重新预分配容量
- } else {
- // 这应该不会发生,因为前面已经确保了 writer 是 Some
- return Err(WriterError::Io(io::Error::new(io::ErrorKind::Other, "209 Writer not initialized unexpectedly")));
- }
- Ok(())
- }
- }
- // 当 IncrementalJsonWriter 实例被 Drop 时,确保文件被关闭并数据写入磁盘
- impl<T> Drop for IncrementalJsonWriter<T> where T: Serialize {
- fn drop(&mut self) {
- if let Some(mut writer) = self.current_writer.take() {
- // Try to flush and sync, but ignore errors during shutdown
- let _ = writer.flush();
- let _ = writer.into_inner().and_then(|f| Ok(f.sync_all()));
- }
- }
- }
- #[derive(Serialize, Debug)]
- pub struct OrderLog{
- // 挂单信息
- pub order_info: Vec<Vec<String>>,
- // 记录时间
- pub record_timestamp: i64
- }
- #[derive(Serialize, Debug)]
- pub struct FixPriceLog{
- // 定价
- pub fix_price: Decimal,
- // 记录时间
- pub record_timestamp: i64
- }
- // --------------------- 示例数据结构 ---------------------
- #[derive(Serialize, Debug)]
- struct LogEntry {
- timestamp: String,
- level: String,
- message: String,
- session_id: String,
- #[serde(skip_serializing_if = "Option::is_none")] // 如果为None则不序列化
- user_id: Option<u32>,
- }
- #[cfg(test)]
- mod tests {
- use crate::incremental_json_writer::{IncrementalJsonWriter, LogEntry};
- #[test]
- fn test_incremental_json_writer() {
- // 初始化写入器,文件前缀为 "logs/app_log",最大文件大小为 1MB
- let base_path = "../logs/app_log";
- let max_size_mb = 10; // 1 MB
- let mut writer = IncrementalJsonWriter::<LogEntry>::new(base_path, max_size_mb, 100usize).unwrap();
- println!("Starting to write log entries...");
- let mut user_id_counter = 1000;
- // 写入大量数据,直到触发文件切换
- for i in 0..200000 { // 写入20万条记录,看看文件切换效果
- let timestamp = chrono::Local::now().to_rfc3339();
- let level = if i % 10 == 0 { "ERROR" } else { "INFO" }.to_string();
- let message = format!("Processing record {} for user {}...", i, user_id_counter);
- let session_id = format!("sess_{}", i / 100);
- let user_id = if i % 5 == 0 {
- user_id_counter += 1;
- Some(user_id_counter)
- } else {
- None
- };
- let entry = LogEntry {
- timestamp,
- level,
- message,
- session_id,
- user_id,
- };
- writer.add_entry(entry).unwrap();
- }
- if writer.buffer.len() > 0 {
- writer.flush().unwrap(); // 确保所有数据都已写入
- }
- println!("\nFinished writing records. Final flush...");
- println!("Logs written to files under the '{}' directory.", base_path);
- }
- }
|