Просмотр исходного кода

修改定价,只替换参考交易所中间价。增加定价数据记录

JiahengHe 4 месяцев назад
Родитель
Сommit
4c2dfdd923

+ 1 - 1
standard/tests/gate_swap_test.rs

@@ -266,7 +266,7 @@ async fn test_command_order() {
     command.limits_open.insert("888888".to_string(), vec!["100".to_string(), "kd".to_string(), "0.18".to_string(), "888888".to_string()]);
     command.limits_close.insert("999999".to_string(), vec!["100".to_string(), "kk".to_string(), "0.25".to_string(), "999999".to_string()]);
     command.check.insert("888888".to_string(), vec!["999999".to_string(), "".to_string()]);
-    gate_swap_exchange.command_order(command, Default::default()).await;
+    // gate_swap_exchange.command_order(command, Default::default()).await;
 
     loop {
         if let Ok(order) = order_receiver.try_recv() {

+ 1 - 0
strategy/src/binance_usdt_swap.rs

@@ -103,6 +103,7 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
                 let price = core.pricing_engine.on_depth_message(depth);
                 match price {
                     Some(price) => {
+                        core.fix_prices.insert(response.label.clone(), price);
                         core.on_ref_price_update(&response.label, price, &mut trace_stack).await;
                     },
                     _ => {}

+ 1 - 0
strategy/src/clear_core.rs

@@ -183,6 +183,7 @@ impl ClearCore {
                 alpha: vec![],
                 gamma: Default::default(),
                 avg_spread_list: vec![],
+                ref_fix_price_list: vec![],
             },
             market: Market {
                 symbol: symbol.clone(),

+ 89 - 34
strategy/src/core.rs

@@ -18,14 +18,14 @@ use tokio::time::sleep;
 use tracing::{error, info, warn};
 use global::cci::CentralControlInfo;
 use global::params::Params;
-use global::public_params::{ASK_PRICE_INDEX, BID_PRICE_INDEX, LENGTH};
+use global::public_params::{BID_PRICE_INDEX, LENGTH};
 use global::trace_stack::TraceStack;
 use global::trade::Trade;
 use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker};
 use standard::exchange::{Exchange};
 use standard::exchange::ExchangeEnum::{BinanceSwap, BitgetSwap, BybitSwap, CoinexSwap, GateSwap, HtxSwap, KucoinSwap};
-use standard::handle_info::DepthParam;
 use crate::fix_price::PricingEngine;
+use crate::incremental_json_writer::{FixPriceLog, IncrementalJsonWriter, OrderLog};
 use crate::model::{LocalPosition, OrderInfo, TokenParam};
 use crate::predictor::Predictor;
 use crate::strategy::Strategy;
@@ -127,7 +127,12 @@ pub struct Core {
     pub is_sigma_allow_open: bool, // 是否允许开单
 
     pub trading_volume: Decimal, // 交易量统计
-    pub last_ref_price: Decimal, // 最后一次进入挂单的参考价格
+    pub last_fix_price: Decimal, // 最后一次进入挂单的定价价格
+    pub fix_prices: HashMap<String, Decimal>, // 最新定价
+
+    pub order_json_writer: IncrementalJsonWriter<OrderLog>, // 下单信息记录
+    pub ref_fix_price_writer: IncrementalJsonWriter<FixPriceLog>, // 参考所定价记录
+    pub trans_fix_price_writer: IncrementalJsonWriter<FixPriceLog>, // 交易所定价记录
 }
 
 impl Core {
@@ -202,6 +207,7 @@ impl Core {
                 alpha: vec![],
                 gamma: Default::default(),
                 avg_spread_list: vec![],
+                ref_fix_price_list: vec![],
             },
             market: Market {
                 symbol: symbol.clone(),
@@ -271,7 +277,11 @@ impl Core {
             is_sigma_abnormal: false,
             is_sigma_allow_open: true,
             trading_volume: Default::default(),
-            last_ref_price: Default::default(),
+            last_fix_price: Default::default(),
+            fix_prices: Default::default(),
+            order_json_writer: IncrementalJsonWriter::new("test_data/order_log/order_json", 5, 100usize).unwrap(),
+            ref_fix_price_writer: IncrementalJsonWriter::new("test_data/ref_price_log/ref_fix_price", 5, 100usize).unwrap(),
+            trans_fix_price_writer: IncrementalJsonWriter::new("test_data/trans_price_log/trans_fix_price", 5, 100usize).unwrap(),
         };
         for i in 0..=params.ref_exchange.len() - 1 {
             // 拼接不会消耗原字符串
@@ -333,6 +343,7 @@ impl Core {
             info!("\n\n");
             info!("接收到订单信息①:{:?}", data);
         }
+        let now_time = Utc::now().timestamp_millis();
         /*
          更新订单
             首先直接复写本地订单
@@ -349,7 +360,7 @@ impl Core {
             为了防止下单失败依然有订单成交 本地需要做一个缓存
         */
         // 触发订单更新
-        self.trade_order_update_time = Utc::now().timestamp_millis();
+        self.trade_order_update_time = now_time;
 
         // 更新跟踪
         if self.local_orders.contains_key(&data.client_id) {
@@ -547,6 +558,16 @@ impl Core {
                         // trace_stack.on_after_strategy();
                         // 记录指令触发信息
                         if order.is_not_empty() {
+                            if !order.limits_open.is_empty() || !order.limits_close.is_empty(){
+                                let mut limit_open = order.limits_open.values().cloned().collect::<Vec<Vec<String>>>();
+                                let limit_close = order.limits_close.values().cloned().collect::<Vec<Vec<String>>>();
+                                limit_open.extend(limit_close);
+                                let order_log = OrderLog{
+                                    order_info: limit_open,
+                                    record_timestamp: now_time,
+                                };
+                                self.order_json_writer.add_entry(order_log).unwrap();
+                            }
                             // info!("触发onOrder");
                             self._update_local_orders(&mut order);
                             //交易所处理订单信号
@@ -662,14 +683,15 @@ impl Core {
             return;
         }
         // 检查 market 行情
-        self.agg_market = self.get_all_market_data();
-        if self.agg_market.len() != LENGTH * (1usize + self.ref_num as usize) {
-            self.log_ready_status(format!("550聚合行情未准备好: market长度:{}, 检验数: {}", self.agg_market.len(), LENGTH * (1usize + self.ref_num as usize)));
+        let (market_info, fix_price) = self.get_all_market_data();
+        self.agg_market = market_info;
+        if self.agg_market.len() != LENGTH * (1usize + self.ref_num as usize) || fix_price == Decimal::ZERO {
+            self.log_ready_status(format!("550聚合行情未准备好: market长度:{}, fix_price: {}, 检验数: {}", self.agg_market.len(), fix_price, LENGTH * (1usize + self.ref_num as usize)));
             return;
         } else {
             // 如果准备就绪,则可以开始交易
             info!("----------------------------------聚合行情准备就绪,可以开始交易---------------------------------");
-            self.predictor.market_info_handler(&self.agg_market);
+            self.predictor.market_info_handler(&self.agg_market, fix_price);
             self.ready = 1;
         }
     }
@@ -693,7 +715,7 @@ impl Core {
         // 过滤条件 价格变化很大 时间间隔很长
         let mut flag = 0;
 
-        let price_rate = (ref_price -  self.last_ref_price).abs() / ref_price;
+        let price_rate = (ref_price -  self.last_fix_price).abs() / ref_price;
         let rate = dec!(0.0002);
         // 验证这次获取的预定价格要比上次的预定价格相差0.0002以上并且距离上次进入开单的时间间隔 > 50
         if price_rate > rate || Utc::now().timestamp_millis() - self.on_tick_event_time > 50 {
@@ -701,16 +723,14 @@ impl Core {
             flag = 1;
             // 更新ontick触发时间记录
             self.on_tick_event_time = Utc::now().timestamp_millis();
-            // // 全局记录一下最后的预定价格
-            self.last_ref_price = ref_price;
+            // 全局记录一下最后的预定价格
+            self.last_fix_price = ref_price;
         }
 
         // 允许交易
         if self.mode_signal == 0 && self.ready == 1 && flag == 1 {
             // 更新交易数据
             self.update_trade_msg();
-            // 更新预定价格
-            self.ref_price = vec![vec![ref_price, ref_price]];
             // 触发事件撤单逻辑
             // 更新策略时间
             self.strategy.local_time = Utc::now().timestamp_millis();
@@ -728,6 +748,16 @@ impl Core {
             trace_stack.on_after_strategy();
 
             if orders.is_not_empty() {
+                if !orders.limits_open.is_empty() || !orders.limits_close.is_empty(){
+                    let mut limit_open = orders.limits_open.values().cloned().collect::<Vec<Vec<String>>>();
+                    let limit_close = orders.limits_close.values().cloned().collect::<Vec<Vec<String>>>();
+                    limit_open.extend(limit_close);
+                    let order_log = OrderLog{
+                        order_info: limit_open,
+                        record_timestamp: now_time
+                    };
+                    self.order_json_writer.add_entry(order_log).unwrap();
+                }
                 let mut platform_rest_fb = self.platform_rest.clone_box();
                 // 先更新本地记录再发单。
                 self._update_local_orders(&mut orders);
@@ -747,6 +777,12 @@ impl Core {
                 }
             }
         }
+        // 记录参考所定价
+        let ref_fix_price_log = FixPriceLog{
+            fix_price: ref_price,
+            record_timestamp: now_time
+        };
+        self.ref_fix_price_writer.add_entry(ref_fix_price_log).unwrap();
     }
 
     // #[instrument(skip(self, depth, name_ref, trace_stack), level="TRACE")]
@@ -987,13 +1023,15 @@ impl Core {
          */
         // 更新聚合市场数据
         // 更新聚合市场信息
-        self.agg_market = self.get_all_market_data();
+        let (market_info, fix_price) = self.get_all_market_data();
+        self.agg_market = market_info;
         // 更新预测器
-        self.predictor.market_info_handler(&self.agg_market);
+        self.predictor.market_info_handler(&self.agg_market, fix_price);
     }
 
     // #[instrument(skip(self), level="TRACE")]
     pub fn update_trade_msg(&mut self) {
+        let now_time = Utc::now().timestamp_millis();
         // 更新保证金
         self.local_cash = self.local_cash.round_dp(10);
         self.local_coin = self.local_coin.round_dp(10);
@@ -1001,22 +1039,30 @@ impl Core {
         // self.trade_msg.position = self.local_position_by_orders.clone();
         // self.trade_msg.orders = self.local_orders.clone();
         // 更新 ref
-        // let mut ref_tickers: BTreeMap<String, Ticker> = BTreeMap::new();
-        // for i in &self.ref_name {
-        //     let bp = self.tickers.get(i).unwrap().buy;
-        //     let ap = self.tickers.get(i).unwrap().sell;
-        //     ref_tickers.insert(i.clone(), Ticker {
-        //         time: 0,
-        //         high: Default::default(),
-        //         low: Default::default(),
-        //         sell: ap,
-        //         buy: bp,
-        //         last: Default::default(),
-        //         volume: Default::default(),
-        //     });
-        // }
-        // ref_price = [[bid_price, ask_price]]
-        // self.ref_price = self.predictor.get_ref_price(&ref_tickers);
+        let mut ref_tickers: BTreeMap<String, Ticker> = BTreeMap::new();
+        for i in &self.ref_name {
+            let bp = self.tickers.get(i).unwrap().buy;
+            let ap = self.tickers.get(i).unwrap().sell;
+            ref_tickers.insert(i.clone(), Ticker {
+                time: 0,
+                high: Default::default(),
+                low: Default::default(),
+                sell: ap,
+                buy: bp,
+                last: Default::default(),
+                volume: Default::default(),
+            });
+        }
+        // [bid, ask]
+        self.ref_price = self.predictor.get_ref_price(&ref_tickers);
+        let trans_fix_price = self.ref_price.last().unwrap().iter().sum::<Decimal>()/Decimal::TWO;
+        let trans_fix_price_log = FixPriceLog {
+            fix_price: trans_fix_price,
+            record_timestamp: now_time,
+        };
+        // let single_signal_start_time = std::time::Instant::now();
+        self.trans_fix_price_writer.add_entry(trans_fix_price_log).unwrap();
+        // info!("单次交易交易所定价写入完毕,耗时: {:?}", single_signal_start_time.elapsed())
     }
 
     // 本地记录所有报单信息
@@ -1080,9 +1126,10 @@ impl Core {
 
     // 获取深度信息
     // #[instrument(skip(self), level="TRACE")]
-    pub fn get_all_market_data(&mut self) -> Vec<Decimal> {
+    pub fn get_all_market_data(&mut self) -> (Vec<Decimal>, Decimal) {
         // 只能定时触发 组合市场信息=交易盘口+参考盘口
         let mut market: Vec<Decimal> = Vec::new();
+        let mut fix_price = Decimal::ZERO;
         // 获取交易盘口市场信息
         let mut data: Vec<Decimal> = self.local_depths.get(&self.trade_name).unwrap().clone();
         self.is_update.insert(self.symbol.clone(), true);
@@ -1097,8 +1144,12 @@ impl Core {
             max_min_price = self.max_buy_min_sell_cache.get(i).unwrap().clone();
             data.append(&mut max_min_price);
             market.append(&mut data);
+            // 获取定价,暂只考虑一个参考所的情况
+            fix_price = self.fix_prices.get(i).unwrap().clone();
         }
-        return market;
+
+
+        return (market, fix_price);
     }
 
     // #[instrument(skip(self), level="TRACE")]
@@ -1696,6 +1747,10 @@ impl Core {
 
         self.clear_position_and_orders(Decimal::ZERO).await;
 
+        info!("-------------------------写入未满的记录----------------------------");
+        self.trans_fix_price_writer.flush().unwrap();
+        self.order_json_writer.flush().unwrap();
+        self.ref_fix_price_writer.flush().unwrap();
         info!("订单、仓位清除完毕,为避免api失效导致遗漏仓位,建议人工复查。");
         info!("停机原因:{}。", self.exit_msg);
     }

+ 297 - 0
strategy/src/incremental_json_writer.rs

@@ -0,0 +1,297 @@
+use serde::Serialize;
+use serde_json;
+use std::fs::{self, File, OpenOptions};
+use std::io::{self, BufWriter, Write};
+use std::path::PathBuf;
+use rust_decimal::Decimal;
+
+// --------------------- 错误处理 ---------------------
+#[derive(Debug)]
+pub enum WriterError {
+    Serde(serde_json::Error),
+    Io(io::Error),
+    InvalidPath(String), // 用于处理路径错误
+}
+
+impl std::fmt::Display for WriterError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            WriterError::Serde(e) => write!(f, "JSON serialization error: {}", e),
+            WriterError::Io(e) => write!(f, "File I/O error: {}", e),
+            WriterError::InvalidPath(s) => write!(f, "Invalid path: {}", s),
+        }
+    }
+}
+
+impl std::error::Error for WriterError {}
+
+impl From<serde_json::Error> for WriterError {
+    fn from(err: serde_json::Error) -> Self {
+        WriterError::Serde(err)
+    }
+}
+
+impl From<io::Error> for WriterError {
+    fn from(err: io::Error) -> Self {
+        WriterError::Io(err)
+    }
+}
+
+// --------------------- 核心写入器 ---------------------
+pub struct IncrementalJsonWriter<T> where T: Serialize {
+    base_file_path: PathBuf, // 文件的基础路径和名称前缀 (e.g., "data/my_log")
+    current_file_idx: usize, // 当前文件的索引 (e.g., 0, 1, 2...)
+    max_file_size_bytes: u64, // 最大文件大小 (字节)
+    current_file_size_bytes: u64, // 当前文件已写入的字节数
+    batch_size: usize,
+    buffer: Vec<T>,
+    // 使用 BufWriter 来提高写入效率,每次写入不需要立即刷新到磁盘
+    current_writer: Option<BufWriter<File>>,
+}
+
+impl<T> IncrementalJsonWriter<T> where T: Serialize {
+    /// 创建一个新的 IncrementalJsonWriter 实例。
+    /// `base_path_prefix` 是文件路径和前缀,例如 "logs/events" 会生成 "logs/events_0.json", "logs/events_1.json"
+    /// `max_file_size_mb` 是单个文件的最大大小,以 MB 为单位。
+    pub fn new(base_path_prefix: &str, max_file_size_mb: u64, batch_size: usize) -> Result<Self, WriterError> {
+        let base_file_path = PathBuf::from(base_path_prefix);
+        let max_file_size_bytes = max_file_size_mb * 1024 * 1024; // MB 转换为字节
+
+        // 确保父目录存在
+        if let Some(parent) = base_file_path.parent() {
+            if parent.as_os_str() != "" { // 避免在当前目录创建时创建不存在的父目录
+                fs::create_dir_all(parent)?;
+            }
+        }
+
+        Ok(IncrementalJsonWriter {
+            base_file_path,
+            current_file_idx: 0, // 从文件索引 0 开始
+            max_file_size_bytes,
+            current_file_size_bytes: 0,
+            current_writer: None,
+            batch_size,
+            buffer: Vec::with_capacity(batch_size),
+        })
+    }
+
+    /// 获取完整的文件路径 (e.g., "data/my_log_0.json")
+    fn get_full_file_path(&self, index: usize) -> PathBuf {
+        let mut path = self.base_file_path.clone(); // 复制基础路径
+        let filename = format!("{}_{}.json",
+                               path.file_name().unwrap_or_default().to_string_lossy(), // 获取文件名部分
+                               index
+        );
+        path.set_file_name(filename); // 设置新的带索引的文件名
+        path
+    }
+
+    /// 打开或创建一个新的文件,并设置 self.current_writer。
+    /// 会查找最新的文件索引并从该文件继续写入,或者创建新的文件。
+    fn open_current_file(&mut self) -> Result<(), WriterError> {
+        // 先检查是否有打开的文件,如果有,先刷新并关闭
+        if let Some(writer) = self.current_writer.take() {
+            writer.into_inner().unwrap().sync_all()?; // 确保所有数据写入磁盘
+        }
+
+        // 查找最新的文件索引,以便在程序重启后可以继续写入
+        // 这需要遍历所有可能的文件,找到最大的那个索引
+        let mut latest_idx = 0;
+        loop {
+            let candidate_path = self.get_full_file_path(latest_idx);
+            if candidate_path.exists() {
+                latest_idx += 1;
+            } else {
+                break;
+            }
+        }
+        // 如果最新索引文件存在,则从该文件继续。否则,使用 latest_idx-1 + 1 作为新文件。
+        // 但为了简单和确定性,我们总是从索引0开始,然后简单地找到最后一个文件索引加上1
+        // 注意:这种查找方式适用于程序每次启动都重新开始写文件的情况。
+        // 如果需要接续之前未写满的文件,逻辑会更复杂:需要read_dir并解析文件名。
+        // 为了本例的增量写入和文件切换,我们简单地从 current_file_idx + 1 开始。
+        // 这里为了演示“增量写入”,我们默认从0开始写,然后自动递增。
+        // 如果要实现“断点续写”,则需要在启动时扫描目录找到最大的 index 文件,并读取其大小。
+        // For simplicity, let's just use current_file_idx as the next one to create
+        // and always start from 0 if no file exists.
+
+        let mut target_idx = self.current_file_idx;
+        let mut target_path = self.get_full_file_path(target_idx);
+
+        // 如果文件已存在,我们应该尝试从它上次的大小继续,或者直接创建新文件。
+        // 这里策略是:如果当前文件已满或不存在,就开下一个文件。
+        // 简化的策略:总是从 current_file_idx 开始,如果已存在,且未满,就追加;如果已满,就开下一个。
+        // 更简单的策略:每次调用 open_current_file 都开一个新文件,或者检查当前文件是否真的需要切换
+
+        // 寻找一个适合写入的新文件
+        loop {
+            let next_file_path = self.get_full_file_path(self.current_file_idx);
+            if next_file_path.exists() {
+                let metadata = fs::metadata(&next_file_path)?;
+                if metadata.len() >= self.max_file_size_bytes {
+                    self.current_file_idx += 1; // 切换到下一个文件
+                    self.current_file_size_bytes = 0; // 重置大小
+                    continue; // 继续循环,尝试下一个文件
+                } else {
+                    // 文件存在且未满,尝试追加写入
+                    self.current_file_size_bytes = metadata.len();
+                    target_path = next_file_path; // 找到目标文件
+                    break;
+                }
+            } else {
+                // 文件不存在,这是个新文件
+                self.current_file_size_bytes = 0;
+                target_path = next_file_path;
+                break;
+            }
+        }
+
+        let file = OpenOptions::new()
+            .create(true) // 如果文件不存在则创建
+            .append(true) // 以追加模式打开
+            .open(&target_path)?;
+
+        self.current_writer = Some(BufWriter::new(file));
+        Ok(())
+    }
+
+    /// 添加一条日志到缓冲区。如果缓冲区满,会触发一次写入。
+    pub fn add_entry(&mut self, entry: T) -> Result<(), Box<dyn std::error::Error>> {
+        self.buffer.push(entry);
+
+        // 如果缓冲区达到批处理大小,执行写入
+        if self.buffer.len() >= self.batch_size {
+            self.flush()?;
+        }
+
+        Ok(())
+    }
+
+    /// 写入单个结构体数据到文件中。
+    pub fn flush(&mut self) -> Result<(), WriterError> {
+        // 序列化缓冲区中的每条日志为 JSON 字符串,并用换行符连接
+        // 注意:这里每个 entry 都是一个独立的 JSON 对象
+        let mut batch_json_lines = String::new();
+        for entry in &self.buffer {
+            // 将每条日志序列化为 JSON 字符串
+            let json_line = serde_json::to_string(entry)?;
+            batch_json_lines.push_str(&json_line);
+            batch_json_lines.push('\n'); // JSON Lines 格式需要换行符
+        }
+
+        let data_len = batch_json_lines.as_bytes().len() as u64;
+
+        // 如果没有打开的文件,或者当前文件已满,则打开新文件
+        if self.current_writer.is_none() || self.current_file_size_bytes + data_len > self.max_file_size_bytes {
+            // 如果 current_writer.is_some(),说明前一个文件已满,需要刷新并关闭
+            if let Some(writer) = self.current_writer.take() {
+                writer.into_inner().unwrap().sync_all()?; // 确保所有数据写入磁盘
+                self.current_file_idx += 1; // 切换到下一个文件索引
+                self.current_file_size_bytes = 0; // 重置计数
+            }
+            // 打开新的文件
+            self.open_current_file()?;
+        }
+
+        // 写入数据
+        if let Some(writer) = self.current_writer.as_mut() {
+            writer.write_all(batch_json_lines.as_bytes())?;
+            self.current_file_size_bytes += data_len;
+
+            writer.flush()?; // 确保 BufWriter 的内容写入到文件
+            writer.get_ref().sync_all()?; // 确保数据写入磁盘
+            
+            // 清空缓冲区,准备下一批
+            self.buffer.clear();
+            self.buffer.reserve(self.batch_size); // 重新预分配容量
+        } else {
+            // 这应该不会发生,因为前面已经确保了 writer 是 Some
+            return Err(WriterError::Io(io::Error::new(io::ErrorKind::Other, "209 Writer not initialized unexpectedly")));
+        }
+
+        Ok(())
+    }
+}
+
+// 当 IncrementalJsonWriter 实例被 Drop 时,确保文件被关闭并数据写入磁盘
+impl<T>  Drop for IncrementalJsonWriter<T> where T: Serialize {
+    fn drop(&mut self) {
+        if let Some(mut writer) = self.current_writer.take() {
+            // Try to flush and sync, but ignore errors during shutdown
+            let _ = writer.flush();
+            let _ = writer.into_inner().and_then(|f| Ok(f.sync_all()));
+        }
+    }
+}
+#[derive(Serialize, Debug)]
+pub struct OrderLog{
+    // 挂单信息
+    pub order_info: Vec<Vec<String>>,
+    // 记录时间
+    pub record_timestamp: i64
+}
+
+#[derive(Serialize, Debug)]
+pub struct FixPriceLog{
+    // 定价
+    pub fix_price: Decimal,
+    // 记录时间
+    pub record_timestamp: i64
+}
+
+// --------------------- 示例数据结构 ---------------------
+#[derive(Serialize, Debug)]
+struct LogEntry {
+    timestamp: String,
+    level: String,
+    message: String,
+    session_id: String,
+    #[serde(skip_serializing_if = "Option::is_none")] // 如果为None则不序列化
+    user_id: Option<u32>,
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::incremental_json_writer::{IncrementalJsonWriter, LogEntry};
+    #[test]
+    fn test_incremental_json_writer() {
+        // 初始化写入器,文件前缀为 "logs/app_log",最大文件大小为 1MB
+        let base_path = "../logs/app_log";
+        let max_size_mb = 10; // 1 MB
+        let mut writer = IncrementalJsonWriter::<LogEntry>::new(base_path, max_size_mb, 100usize).unwrap();
+
+        println!("Starting to write log entries...");
+
+        let mut user_id_counter = 1000;
+
+        // 写入大量数据,直到触发文件切换
+        for i in 0..200000 { // 写入20万条记录,看看文件切换效果
+            let timestamp = chrono::Local::now().to_rfc3339();
+            let level = if i % 10 == 0 { "ERROR" } else { "INFO" }.to_string();
+            let message = format!("Processing record {} for user {}...", i, user_id_counter);
+            let session_id = format!("sess_{}", i / 100);
+
+            let user_id = if i % 5 == 0 {
+                user_id_counter += 1;
+                Some(user_id_counter)
+            } else {
+                None
+            };
+
+            let entry = LogEntry {
+                timestamp,
+                level,
+                message,
+                session_id,
+                user_id,
+            };
+
+            writer.add_entry(entry).unwrap();
+        }
+        if writer.buffer.len() > 0 {
+            writer.flush().unwrap(); // 确保所有数据都已写入
+        }
+        println!("\nFinished writing records. Final flush...");
+        println!("Logs written to files under the '{}' directory.", base_path);
+    }
+}

+ 87 - 0
strategy/src/json_writer.rs

@@ -0,0 +1,87 @@
+use serde::{Serialize, Deserialize};
+use serde_json;
+use std::io::{self, Write, BufWriter};
+use std::fs::{File, OpenOptions};
+use std::path::Path;
+use chrono::{Local, Timelike};
+
+
+/// 用于批量写入 LogEntry 到 JSON Lines 文件的结构体
+struct LogBatchWriter<T> where T: Serialize {
+    buffer: Vec<T>,
+    writer: BufWriter<File>,
+    batch_size: usize,
+}
+
+impl<T> LogBatchWriter<T> where T: Serialize {
+    /// 创建一个新的 LogBatchWriter
+    /// file_path: 目标文件路径
+    /// batch_size: 每批写入的日志数量阈值
+    ///
+    /// 文件将以追加模式打开,如果不存在则创建。
+    fn new(file_path: &str, batch_size: usize) -> io::Result<Self> {
+        let file = OpenOptions::new()
+            .append(true) // 追加模式
+            .create(true) // 如果文件不存在则创建
+            .open(file_path)?;
+
+        let writer = BufWriter::new(file);
+
+        Ok(LogBatchWriter {
+            buffer: Vec::with_capacity(batch_size), // 预分配容量
+            writer,
+            batch_size,
+        })
+    }
+
+    /// 添加一条日志到缓冲区。如果缓冲区满,会触发一次写入。
+    fn add_entry(&mut self, entry: T) -> Result<(), Box<dyn std::error::Error>> {
+        self.buffer.push(entry);
+
+        // 如果缓冲区达到批处理大小,执行写入
+        if self.buffer.len() >= self.batch_size {
+            self.flush()?;
+        }
+
+        Ok(())
+    }
+
+    /// 将缓冲区中的所有日志写入文件并清空缓冲区。
+    /// 公开方法,供用户在程序结束时强制写入剩余数据。
+    pub fn flush(&mut self) -> Result<(), Box<dyn std::error::Error>> {
+        if self.buffer.is_empty() {
+            return Ok(()); // 缓冲区为空,无需写入
+        }
+
+        // 序列化缓冲区中的每条日志为 JSON 字符串,并用换行符连接
+        // 注意:这里每个 entry 都是一个独立的 JSON 对象
+        let mut batch_json_lines = String::new();
+        for entry in &self.buffer {
+            // 将每条日志序列化为 JSON 字符串
+            let json_line = serde_json::to_string(entry)?;
+            batch_json_lines.push_str(&json_line);
+            batch_json_lines.push('\n'); // JSON Lines 格式需要换行符
+        }
+
+        // 将整个批次的 JSON Lines 字符串写入文件
+        self.writer.write_all(batch_json_lines.as_bytes())?;
+        self.writer.flush()?; // 确保 BufWriter 的内容写入到文件
+
+        // 清空缓冲区,准备下一批
+        self.buffer.clear();
+        self.buffer.reserve(self.batch_size); // 重新预分配容量
+
+        Ok(())
+    }
+}
+
+// 可选:实现 Drop trait,在 LogBatchWriter 生命周期结束时尝试刷新。
+// 注意:Drop trait 不能返回 Result,所以错误处理在此受限,通常只能打印错误或忽略。
+// 建议用户在显式的地方调用 flush() 并处理 Result。
+impl<T> Drop for LogBatchWriter<T> where T: Serialize{
+    fn drop(&mut self) {
+        if let Err(e) = self.flush() { // flush() 可能返回 Err
+            eprintln!("错误:LogBatchWriter 在 Drop 时刷新失败: {}", e);
+        }
+    }
+}

+ 3 - 1
strategy/src/lib.rs

@@ -16,4 +16,6 @@ mod bitget_usdt_swap;
 mod coinex_usdt_swap;
 mod htx_usdt_swap;
 pub mod clear_core;
-mod fix_price;
+mod fix_price;
+mod incremental_json_writer;
+mod json_writer;

+ 31 - 20
strategy/src/predictor.rs

@@ -15,6 +15,7 @@ pub struct Predictor {
     pub alpha: Vec<Decimal>,                                            // 价格系数
     pub gamma: Decimal,                                                 // 定价系数
     pub avg_spread_list: Vec<Decimal>,                                  // 平均价差
+    pub ref_fix_price_list: Vec<Decimal>,                               // 参考交易所定价数组
 }
 
 /*
@@ -33,6 +34,7 @@ impl Predictor {
             alpha: vec![Decimal::new(1, 0); 100],
             gamma: Decimal::from_f64(0.999).unwrap(),
             avg_spread_list: vec![dec!(0); ref_exchange_length],
+            ref_fix_price_list: vec![]
         }
     }
 
@@ -59,13 +61,16 @@ impl Predictor {
 
         // 更新参考ref_mid_price
         let mut ref_mid_price_per_exchange = vec![];
-        for ref_index in 0..self.ref_exchange_length {
-            let ref_bid_price = last_market_info[public_params::LENGTH*(1+ref_index)+public_params::BID_PRICE_INDEX];
-            let ref_ask_price = last_market_info[public_params::LENGTH*(1+ref_index)+public_params::ASK_PRICE_INDEX];
-            let ref_mid_price = (ref_bid_price + ref_ask_price) * dec!(0.5);
-            // 依照交易所次序添加到ref_mid_price_per_exchange中
-            ref_mid_price_per_exchange.push(ref_mid_price);
-        }
+        // for ref_index in 0..self.ref_exchange_length {
+        //     let ref_bid_price = last_market_info[public_params::LENGTH*(1+ref_index)+public_params::BID_PRICE_INDEX];
+        //     let ref_ask_price = last_market_info[public_params::LENGTH*(1+ref_index)+public_params::ASK_PRICE_INDEX];
+        //     let ref_mid_price = (ref_bid_price + ref_ask_price) * dec!(0.5);
+        //     // 依照交易所次序添加到ref_mid_price_per_exchange中
+        //     ref_mid_price_per_exchange.push(ref_mid_price);
+        // }
+
+        // 使用参考所的新定价替换原先的mid_price
+        ref_mid_price_per_exchange.push(self.ref_fix_price_list.last().unwrap().clone());
         self.ref_mid_price_per_exchange_per_frame.push(ref_mid_price_per_exchange);
 
         // 价差更新
@@ -80,13 +85,13 @@ impl Predictor {
 
         for ref_index in 0..self.ref_exchange_length {
             let bias = last_ref_mid_price_per_exchange[ref_index] * self.alpha[ref_index] - mid_price_last;
-
+        
             let mut gamma = self.gamma;
             // 如果程序刚刚启动,gamma值不能太大
             if self.loop_count < 100 {
                 gamma = dec!(0.9);
             }
-
+        
             // 检测是否初始化
             if dec!(0).eq(&self.avg_spread_list[ref_index]) {
                 self.avg_spread_list[ref_index] = bias;
@@ -115,7 +120,7 @@ impl Predictor {
 
     // 市场信息处理器,也是python里的onTime方法
     // #[instrument(skip(self, new_market_info), level="TRACE")]
-    pub fn  market_info_handler(&mut self, new_market_info: &Vec<Decimal>) {
+    pub fn market_info_handler(&mut self, new_market_info: &Vec<Decimal>, fix_price: Decimal) {
         // 空数据不处理
         if new_market_info.len() == 0 {
             return;
@@ -125,6 +130,7 @@ impl Predictor {
             self.loop_count += 1;
         }
         self.market_info_list.push(new_market_info.clone());
+        self.ref_fix_price_list.push(fix_price);
         (*self).processor();
         (*self).check_length();
     }
@@ -134,6 +140,8 @@ impl Predictor {
     pub fn get_ref_price(&mut self, ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
         let mut ref_price_list = vec![];
         let ref_exchange_names: Vec<_> = ref_ticker_map.keys().collect();
+        // 交易交易所参考价
+        let mut trans_price= Decimal::ZERO;
         for ref_index in 0..ref_exchange_names.len() {
             let ref_exchange = ref_exchange_names[ref_index];
 
@@ -142,10 +150,12 @@ impl Predictor {
             let ask_price = ticker.sell;
             let ref_bid_price = bid_price * self.alpha[ref_index] - self.avg_spread_list[ref_index];
             let ref_ask_price = ask_price * self.alpha[ref_index] - self.avg_spread_list[ref_index];
-
+            trans_price = (ref_ask_price + ref_bid_price)/Decimal::TWO;
             ref_price_list.push(vec![ref_bid_price, ref_ask_price]);
         }
-
+        // 参考交易所参考价
+        let ref_price = self.ref_mid_price_per_exchange_per_frame.last().unwrap().last().unwrap().clone();
+        
         return ref_price_list;
     }
 }
@@ -155,6 +165,7 @@ mod tests {
     use std::collections::BTreeMap;
     use std::io;
     use std::io::Write;
+    use rust_decimal::Decimal;
     use rust_decimal_macros::dec;
     use standard::Ticker;
     use crate::predictor::Predictor;
@@ -180,9 +191,9 @@ mod tests {
     fn market_info_handler_test() {
         let mut predictor = Predictor::new(1);
         let market_info_0 = vec![dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.89), dec!(0.79), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79)];
-        predictor.market_info_handler(&market_info_0);
+        predictor.market_info_handler(&market_info_0, Decimal::ZERO);
         let market_info_1 = vec![dec!(0.98), dec!(0.99), dec!(0.56), dec!(0.49), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.89), dec!(0.79), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79)];
-        predictor.market_info_handler(&market_info_1);
+        predictor.market_info_handler(&market_info_1, Decimal::ZERO);
     }
 
     #[test]
@@ -206,22 +217,22 @@ mod tests {
 
         let mut market_info = vec![];
         market_info = vec![dec!(0.99), dec!(1.0), dec!(0.991), dec!(0.79), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.89), dec!(0.79), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79)];
-        predictor.market_info_handler(&market_info);
+        predictor.market_info_handler(&market_info, Decimal::ZERO);
         println!("market info 0: {:?}", predictor.get_ref_price(&ref_ticker_map));
         market_info = vec![dec!(0.98), dec!(0.99), dec!(0.981), dec!(0.49), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.89)];
-        predictor.market_info_handler(&market_info);
+        predictor.market_info_handler(&market_info, Decimal::ZERO);
         println!("market info 1: {:?}", predictor.get_ref_price(&ref_ticker_map));
         market_info = vec![dec!(0.93), dec!(1.0), dec!(0.931), dec!(0.79), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.89)];
-        predictor.market_info_handler(&market_info);
+        predictor.market_info_handler(&market_info, Decimal::ZERO);
         println!("market info 2: {:?}", predictor.get_ref_price(&ref_ticker_map));
         market_info = vec![dec!(0.98), dec!(0.49), dec!(0.981), dec!(0.49), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.89)];
-        predictor.market_info_handler(&market_info);
+        predictor.market_info_handler(&market_info, Decimal::ZERO);
         println!("market info 3: {:?}", predictor.get_ref_price(&ref_ticker_map));
         market_info = vec![dec!(0.99), dec!(1.0), dec!(0.991), dec!(0.69), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.89)];
-        predictor.market_info_handler(&market_info);
+        predictor.market_info_handler(&market_info, Decimal::ZERO);
         println!("market info 4: {:?}", predictor.get_ref_price(&ref_ticker_map));
         market_info = vec![dec!(0.98), dec!(0.969), dec!(0.981), dec!(0.49), dec!(0.99), dec!(1.0), dec!(1.0), dec!(1.0), dec!(0.89)];
-        predictor.market_info_handler(&market_info);
+        predictor.market_info_handler(&market_info, Decimal::ZERO);
         println!("market info 5: {:?}", predictor.get_ref_price(&ref_ticker_map));
     }
 }