فهرست منبع

增加参考、交易交易所定价和挂单记录

JiahengHe 4 ماه پیش
والد
کامیت
15891578e1
3فایلهای تغییر یافته به همراه355 افزوده شده و 5 حذف شده
  1. 56 4
      strategy/src/core.rs
  2. 297 0
      strategy/src/incremental_json_writer.rs
  3. 2 1
      strategy/src/lib.rs

+ 56 - 4
strategy/src/core.rs

@@ -24,7 +24,7 @@ use global::trade::Trade;
 use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker};
 use standard::exchange::{Exchange};
 use standard::exchange::ExchangeEnum::{BinanceSwap, BitgetSwap, BybitSwap, CoinexSwap, GateSwap, HtxSwap, KucoinSwap};
-
+use crate::incremental_json_writer::{FixPriceLog, IncrementalJsonWriter, OrderLog};
 use crate::model::{LocalPosition, OrderInfo, TokenParam};
 use crate::predictor::Predictor;
 use crate::strategy::Strategy;
@@ -124,6 +124,10 @@ pub struct Core {
     pub is_sigma_allow_open: bool, // 是否允许开单
 
     pub trading_volume: Decimal, // 交易量统计
+
+    pub order_json_writer: IncrementalJsonWriter<OrderLog>, // 下单信息记录
+    pub ref_fix_price_writer: IncrementalJsonWriter<FixPriceLog>, // 参考所定价记录
+    pub trans_fix_price_writer: IncrementalJsonWriter<FixPriceLog>, // 交易所定价记录
 }
 
 impl Core {
@@ -265,6 +269,10 @@ impl Core {
             is_sigma_abnormal: false,
             is_sigma_allow_open: true,
             trading_volume: Default::default(),
+
+            order_json_writer: IncrementalJsonWriter::new("test_data/order_log/order_json", 5, 100usize).unwrap(),
+            ref_fix_price_writer: IncrementalJsonWriter::new("test_data/ref_price_log/ref_fix_price", 5, 100usize).unwrap(),
+            trans_fix_price_writer: IncrementalJsonWriter::new("test_data/trans_price_log/trans_fix_price", 5, 100usize).unwrap(),
         };
         for i in 0..=params.ref_exchange.len() - 1 {
             // 拼接不会消耗原字符串
@@ -326,6 +334,7 @@ impl Core {
             info!("\n\n");
             info!("接收到订单信息①:{:?}", data);
         }
+        let now_time = Utc::now().timestamp_millis();
         /*
          更新订单
             首先直接复写本地订单
@@ -342,7 +351,7 @@ impl Core {
             为了防止下单失败依然有订单成交 本地需要做一个缓存
         */
         // 触发订单更新
-        self.trade_order_update_time = Utc::now().timestamp_millis();
+        self.trade_order_update_time = now_time;
 
         // 更新跟踪
         if self.local_orders.contains_key(&data.client_id) {
@@ -541,7 +550,18 @@ impl Core {
                         // 记录指令触发信息
                         if order.is_not_empty() {
                             // info!("触发onOrder");
+                            if !order.limits_open.is_empty() || !order.limits_close.is_empty(){
+                                let mut limit_open = order.limits_open.values().cloned().collect::<Vec<Vec<String>>>();
+                                let limit_close = order.limits_close.values().cloned().collect::<Vec<Vec<String>>>();
+                                limit_open.extend(limit_close);
+                                let order_log = OrderLog{
+                                    order_info: limit_open,
+                                    record_timestamp: now_time,
+                                };
+                                self.order_json_writer.add_entry(order_log).unwrap();
+                            }
                             self._update_local_orders(&mut order);
+
                             //交易所处理订单信号
                             let mut platform_rest_fb = self.platform_rest.clone_box();
                             let mut ts = trace_stack.clone();
@@ -717,8 +737,10 @@ impl Core {
             // 判断是否需要触发ontick 对行情进行过滤
             // 过滤条件 价格变化很大 时间间隔很长
             let mut flag = 0;
-            let bid_price_rate = (depth[BID_PRICE_INDEX] - self.depths.get(name_ref).unwrap()[BID_PRICE_INDEX]).abs() / depth[BID_PRICE_INDEX];
-            let ask_price_rate = (depth[ASK_PRICE_INDEX] - self.depths.get(name_ref).unwrap()[ASK_PRICE_INDEX]).abs() / depth[ASK_PRICE_INDEX];
+            let bid = depth[BID_PRICE_INDEX];
+            let ask = depth[ASK_PRICE_INDEX];
+            let bid_price_rate = (bid - self.depths.get(name_ref).unwrap()[BID_PRICE_INDEX]).abs() / bid;
+            let ask_price_rate = (ask - self.depths.get(name_ref).unwrap()[ASK_PRICE_INDEX]).abs() / ask;
             let rate = dec!(0.0002);
             if bid_price_rate > rate || ask_price_rate > rate || Utc::now().timestamp_millis() - self.on_tick_event_time > 50 {
                 // 允许交易
@@ -747,6 +769,16 @@ impl Core {
                 trace_stack.on_after_strategy();
 
                 if orders.is_not_empty() {
+                    if !orders.limits_open.is_empty() || !orders.limits_close.is_empty(){
+                        let mut limit_open = orders.limits_open.values().cloned().collect::<Vec<Vec<String>>>();
+                        let limit_close = orders.limits_close.values().cloned().collect::<Vec<Vec<String>>>();
+                        limit_open.extend(limit_close);
+                        let order_log = OrderLog{
+                            order_info: limit_open,
+                            record_timestamp: now_time
+                        };
+                        self.order_json_writer.add_entry(order_log).unwrap();
+                    }
                     let mut platform_rest_fb = self.platform_rest.clone_box();
                     // 先更新本地记录再发单。
                     self._update_local_orders(&mut orders);
@@ -766,6 +798,13 @@ impl Core {
                     }
                 }
             }
+            // 记录参考所定价
+            let ref_fix_price = (bid + ask)/Decimal::TWO;
+            let ref_fix_price_log = FixPriceLog{
+                fix_price: ref_fix_price,
+                record_timestamp: now_time
+            };
+            self.ref_fix_price_writer.add_entry(ref_fix_price_log).unwrap();
         }
 
         {
@@ -922,6 +961,7 @@ impl Core {
 
     // #[instrument(skip(self), level="TRACE")]
     pub fn update_trade_msg(&mut self) {
+        let now_time = Utc::now().timestamp_millis();
         // 更新保证金
         self.local_cash = self.local_cash.round_dp(10);
         self.local_coin = self.local_coin.round_dp(10);
@@ -944,6 +984,14 @@ impl Core {
             });
         }
         self.ref_price = self.predictor.get_ref_price(&ref_tickers);
+        // 交易交易所定价记录
+        let trans_fix_price = self.ref_price.last().unwrap().iter().sum::<Decimal>()/Decimal::TWO;
+        let trans_fix_price_log = FixPriceLog {
+            fix_price: trans_fix_price,
+            record_timestamp: now_time,
+        };
+        // let single_signal_start_time = std::time::Instant::now();
+        self.trans_fix_price_writer.add_entry(trans_fix_price_log).unwrap();
     }
 
     // 本地记录所有报单信息
@@ -1623,6 +1671,10 @@ impl Core {
 
         self.clear_position_and_orders(Decimal::ZERO).await;
 
+        info!("-------------------------写入未满的记录----------------------------");
+        self.trans_fix_price_writer.flush().unwrap();
+        self.order_json_writer.flush().unwrap();
+        self.ref_fix_price_writer.flush().unwrap();
         info!("订单、仓位清除完毕,为避免api失效导致遗漏仓位,建议人工复查。");
         info!("停机原因:{}。", self.exit_msg);
     }

+ 297 - 0
strategy/src/incremental_json_writer.rs

@@ -0,0 +1,297 @@
+use serde::Serialize;
+use serde_json;
+use std::fs::{self, File, OpenOptions};
+use std::io::{self, BufWriter, Write};
+use std::path::PathBuf;
+use rust_decimal::Decimal;
+
+// --------------------- 错误处理 ---------------------
+#[derive(Debug)]
+pub enum WriterError {
+    Serde(serde_json::Error),
+    Io(io::Error),
+    InvalidPath(String), // 用于处理路径错误
+}
+
+impl std::fmt::Display for WriterError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            WriterError::Serde(e) => write!(f, "JSON serialization error: {}", e),
+            WriterError::Io(e) => write!(f, "File I/O error: {}", e),
+            WriterError::InvalidPath(s) => write!(f, "Invalid path: {}", s),
+        }
+    }
+}
+
+impl std::error::Error for WriterError {}
+
+impl From<serde_json::Error> for WriterError {
+    fn from(err: serde_json::Error) -> Self {
+        WriterError::Serde(err)
+    }
+}
+
+impl From<io::Error> for WriterError {
+    fn from(err: io::Error) -> Self {
+        WriterError::Io(err)
+    }
+}
+
+// --------------------- 核心写入器 ---------------------
+pub struct IncrementalJsonWriter<T> where T: Serialize {
+    base_file_path: PathBuf, // 文件的基础路径和名称前缀 (e.g., "data/my_log")
+    current_file_idx: usize, // 当前文件的索引 (e.g., 0, 1, 2...)
+    max_file_size_bytes: u64, // 最大文件大小 (字节)
+    current_file_size_bytes: u64, // 当前文件已写入的字节数
+    batch_size: usize,
+    buffer: Vec<T>,
+    // 使用 BufWriter 来提高写入效率,每次写入不需要立即刷新到磁盘
+    current_writer: Option<BufWriter<File>>,
+}
+
+impl<T> IncrementalJsonWriter<T> where T: Serialize {
+    /// 创建一个新的 IncrementalJsonWriter 实例。
+    /// `base_path_prefix` 是文件路径和前缀,例如 "logs/events" 会生成 "logs/events_0.json", "logs/events_1.json"
+    /// `max_file_size_mb` 是单个文件的最大大小,以 MB 为单位。
+    pub fn new(base_path_prefix: &str, max_file_size_mb: u64, batch_size: usize) -> Result<Self, WriterError> {
+        let base_file_path = PathBuf::from(base_path_prefix);
+        let max_file_size_bytes = max_file_size_mb * 1024 * 1024; // MB 转换为字节
+
+        // 确保父目录存在
+        if let Some(parent) = base_file_path.parent() {
+            if parent.as_os_str() != "" { // 避免在当前目录创建时创建不存在的父目录
+                fs::create_dir_all(parent)?;
+            }
+        }
+
+        Ok(IncrementalJsonWriter {
+            base_file_path,
+            current_file_idx: 0, // 从文件索引 0 开始
+            max_file_size_bytes,
+            current_file_size_bytes: 0,
+            current_writer: None,
+            batch_size,
+            buffer: Vec::with_capacity(batch_size),
+        })
+    }
+
+    /// 获取完整的文件路径 (e.g., "data/my_log_0.json")
+    fn get_full_file_path(&self, index: usize) -> PathBuf {
+        let mut path = self.base_file_path.clone(); // 复制基础路径
+        let filename = format!("{}_{}.json",
+                               path.file_name().unwrap_or_default().to_string_lossy(), // 获取文件名部分
+                               index
+        );
+        path.set_file_name(filename); // 设置新的带索引的文件名
+        path
+    }
+
+    /// 打开或创建一个新的文件,并设置 self.current_writer。
+    /// 会查找最新的文件索引并从该文件继续写入,或者创建新的文件。
+    fn open_current_file(&mut self) -> Result<(), WriterError> {
+        // 先检查是否有打开的文件,如果有,先刷新并关闭
+        if let Some(writer) = self.current_writer.take() {
+            writer.into_inner().unwrap().sync_all()?; // 确保所有数据写入磁盘
+        }
+
+        // 查找最新的文件索引,以便在程序重启后可以继续写入
+        // 这需要遍历所有可能的文件,找到最大的那个索引
+        let mut latest_idx = 0;
+        loop {
+            let candidate_path = self.get_full_file_path(latest_idx);
+            if candidate_path.exists() {
+                latest_idx += 1;
+            } else {
+                break;
+            }
+        }
+        // 如果最新索引文件存在,则从该文件继续。否则,使用 latest_idx-1 + 1 作为新文件。
+        // 但为了简单和确定性,我们总是从索引0开始,然后简单地找到最后一个文件索引加上1
+        // 注意:这种查找方式适用于程序每次启动都重新开始写文件的情况。
+        // 如果需要接续之前未写满的文件,逻辑会更复杂:需要read_dir并解析文件名。
+        // 为了本例的增量写入和文件切换,我们简单地从 current_file_idx + 1 开始。
+        // 这里为了演示“增量写入”,我们默认从0开始写,然后自动递增。
+        // 如果要实现“断点续写”,则需要在启动时扫描目录找到最大的 index 文件,并读取其大小。
+        // For simplicity, let's just use current_file_idx as the next one to create
+        // and always start from 0 if no file exists.
+
+        let mut target_idx = self.current_file_idx;
+        let mut target_path = self.get_full_file_path(target_idx);
+
+        // 如果文件已存在,我们应该尝试从它上次的大小继续,或者直接创建新文件。
+        // 这里策略是:如果当前文件已满或不存在,就开下一个文件。
+        // 简化的策略:总是从 current_file_idx 开始,如果已存在,且未满,就追加;如果已满,就开下一个。
+        // 更简单的策略:每次调用 open_current_file 都开一个新文件,或者检查当前文件是否真的需要切换
+
+        // 寻找一个适合写入的新文件
+        loop {
+            let next_file_path = self.get_full_file_path(self.current_file_idx);
+            if next_file_path.exists() {
+                let metadata = fs::metadata(&next_file_path)?;
+                if metadata.len() >= self.max_file_size_bytes {
+                    self.current_file_idx += 1; // 切换到下一个文件
+                    self.current_file_size_bytes = 0; // 重置大小
+                    continue; // 继续循环,尝试下一个文件
+                } else {
+                    // 文件存在且未满,尝试追加写入
+                    self.current_file_size_bytes = metadata.len();
+                    target_path = next_file_path; // 找到目标文件
+                    break;
+                }
+            } else {
+                // 文件不存在,这是个新文件
+                self.current_file_size_bytes = 0;
+                target_path = next_file_path;
+                break;
+            }
+        }
+
+        let file = OpenOptions::new()
+            .create(true) // 如果文件不存在则创建
+            .append(true) // 以追加模式打开
+            .open(&target_path)?;
+
+        self.current_writer = Some(BufWriter::new(file));
+        Ok(())
+    }
+
+    /// 添加一条日志到缓冲区。如果缓冲区满,会触发一次写入。
+    pub fn add_entry(&mut self, entry: T) -> Result<(), Box<dyn std::error::Error>> {
+        self.buffer.push(entry);
+
+        // 如果缓冲区达到批处理大小,执行写入
+        if self.buffer.len() >= self.batch_size {
+            self.flush()?;
+        }
+
+        Ok(())
+    }
+
+    /// 写入单个结构体数据到文件中。
+    pub fn flush(&mut self) -> Result<(), WriterError> {
+        // 序列化缓冲区中的每条日志为 JSON 字符串,并用换行符连接
+        // 注意:这里每个 entry 都是一个独立的 JSON 对象
+        let mut batch_json_lines = String::new();
+        for entry in &self.buffer {
+            // 将每条日志序列化为 JSON 字符串
+            let json_line = serde_json::to_string(entry)?;
+            batch_json_lines.push_str(&json_line);
+            batch_json_lines.push('\n'); // JSON Lines 格式需要换行符
+        }
+
+        let data_len = batch_json_lines.as_bytes().len() as u64;
+
+        // 如果没有打开的文件,或者当前文件已满,则打开新文件
+        if self.current_writer.is_none() || self.current_file_size_bytes + data_len > self.max_file_size_bytes {
+            // 如果 current_writer.is_some(),说明前一个文件已满,需要刷新并关闭
+            if let Some(writer) = self.current_writer.take() {
+                writer.into_inner().unwrap().sync_all()?; // 确保所有数据写入磁盘
+                self.current_file_idx += 1; // 切换到下一个文件索引
+                self.current_file_size_bytes = 0; // 重置计数
+            }
+            // 打开新的文件
+            self.open_current_file()?;
+        }
+
+        // 写入数据
+        if let Some(writer) = self.current_writer.as_mut() {
+            writer.write_all(batch_json_lines.as_bytes())?;
+            self.current_file_size_bytes += data_len;
+
+            writer.flush()?; // 确保 BufWriter 的内容写入到文件
+            writer.get_ref().sync_all()?; // 确保数据写入磁盘
+            
+            // 清空缓冲区,准备下一批
+            self.buffer.clear();
+            self.buffer.reserve(self.batch_size); // 重新预分配容量
+        } else {
+            // 这应该不会发生,因为前面已经确保了 writer 是 Some
+            return Err(WriterError::Io(io::Error::new(io::ErrorKind::Other, "209 Writer not initialized unexpectedly")));
+        }
+
+        Ok(())
+    }
+}
+
+// 当 IncrementalJsonWriter 实例被 Drop 时,确保文件被关闭并数据写入磁盘
+impl<T>  Drop for IncrementalJsonWriter<T> where T: Serialize {
+    fn drop(&mut self) {
+        if let Some(mut writer) = self.current_writer.take() {
+            // Try to flush and sync, but ignore errors during shutdown
+            let _ = writer.flush();
+            let _ = writer.into_inner().and_then(|f| Ok(f.sync_all()));
+        }
+    }
+}
+#[derive(Serialize, Debug)]
+pub struct OrderLog{
+    // 挂单信息
+    pub order_info: Vec<Vec<String>>,
+    // 记录时间
+    pub record_timestamp: i64
+}
+
+#[derive(Serialize, Debug)]
+pub struct FixPriceLog{
+    // 定价
+    pub fix_price: Decimal,
+    // 记录时间
+    pub record_timestamp: i64
+}
+
+// --------------------- 示例数据结构 ---------------------
+#[derive(Serialize, Debug)]
+struct LogEntry {
+    timestamp: String,
+    level: String,
+    message: String,
+    session_id: String,
+    #[serde(skip_serializing_if = "Option::is_none")] // 如果为None则不序列化
+    user_id: Option<u32>,
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::incremental_json_writer::{IncrementalJsonWriter, LogEntry};
+    #[test]
+    fn test_incremental_json_writer() {
+        // 初始化写入器,文件前缀为 "logs/app_log",最大文件大小为 1MB
+        let base_path = "../logs/app_log";
+        let max_size_mb = 10; // 1 MB
+        let mut writer = IncrementalJsonWriter::<LogEntry>::new(base_path, max_size_mb, 100usize).unwrap();
+
+        println!("Starting to write log entries...");
+
+        let mut user_id_counter = 1000;
+
+        // 写入大量数据,直到触发文件切换
+        for i in 0..200000 { // 写入20万条记录,看看文件切换效果
+            let timestamp = chrono::Local::now().to_rfc3339();
+            let level = if i % 10 == 0 { "ERROR" } else { "INFO" }.to_string();
+            let message = format!("Processing record {} for user {}...", i, user_id_counter);
+            let session_id = format!("sess_{}", i / 100);
+
+            let user_id = if i % 5 == 0 {
+                user_id_counter += 1;
+                Some(user_id_counter)
+            } else {
+                None
+            };
+
+            let entry = LogEntry {
+                timestamp,
+                level,
+                message,
+                session_id,
+                user_id,
+            };
+
+            writer.add_entry(entry).unwrap();
+        }
+        if writer.buffer.len() > 0 {
+            writer.flush().unwrap(); // 确保所有数据都已写入
+        }
+        println!("\nFinished writing records. Final flush...");
+        println!("Logs written to files under the '{}' directory.", base_path);
+    }
+}

+ 2 - 1
strategy/src/lib.rs

@@ -15,4 +15,5 @@ mod bybit_usdt_swap;
 mod bitget_usdt_swap;
 mod coinex_usdt_swap;
 mod htx_usdt_swap;
-pub mod clear_core;
+pub mod clear_core;
+pub mod incremental_json_writer;