ソースを参照

完成定价算法,待接入策略逻辑,移除了500ms撤单

JiahengHe 5 ヶ月 前
コミット
108847a005

+ 20 - 20
global/src/log_utils.rs

@@ -46,26 +46,26 @@ impl<S> Layer<S> for ReportingLayer
 }
 
 pub fn send_remote_err_log(msg: String) {
-    tokio::spawn(async move {
-        let encoded_str = base64::encode(msg.clone());
-        let mut request_json_data = HashMap::new();
-        request_json_data.insert("serverName", "As");
-        request_json_data.insert("data", encoded_str.as_str());
-
-        let res = Client::new().post("https://hhh.liangjiang.cc/api/log/addError?key=d64a8sc874sa8c4as5")
-            .json(&request_json_data)
-            .send()
-            .await;
-
-        match res {
-            Ok(_resp) => {
-                // let body = _resp.text().await.unwrap();
-            }
-            Err(err) => {
-                warn!("log的error监听器发送远端报错失败:{:?}", err);
-            }
-        }
-    });
+    // tokio::spawn(async move {
+    //     let encoded_str = base64::encode(msg.clone());
+    //     let mut request_json_data = HashMap::new();
+    //     request_json_data.insert("serverName", "As");
+    //     request_json_data.insert("data", encoded_str.as_str());
+    //
+    //     let res = Client::new().post("https://hhh.liangjiang.cc/api/log/addError?key=d64a8sc874sa8c4as5")
+    //         .json(&request_json_data)
+    //         .send()
+    //         .await;
+    //
+    //     match res {
+    //         Ok(_resp) => {
+    //             // let body = _resp.text().await.unwrap();
+    //         }
+    //         Err(err) => {
+    //             warn!("log的error监听器发送远端报错失败:{:?}", err);
+    //         }
+    //     }
+    // });
 }
 
 pub fn init_log_with_debug() {

+ 2 - 2
standard/src/binance_swap_handle.rs

@@ -3,7 +3,7 @@ use rust_decimal::Decimal;
 use rust_decimal_macros::dec;
 use serde_json::Value;
 use exchanges::response_base::ResponseData;
-use crate::{MarketOrder, SpecialDepth, SpecialTicker};
+use crate::{Depth, MarketOrder, SpecialDepth, SpecialTicker};
 
 
 // 处理特殊Ticker信息
@@ -37,4 +37,4 @@ pub fn format_depth_items(value: Value) -> Vec<MarketOrder> {
         })
     }
     return depth_items;
-}
+}

+ 9 - 3
standard/src/handle_info.rs

@@ -7,7 +7,7 @@ use tracing::{error, info};
 use exchanges::response_base::ResponseData;
 use global::public_params;
 use crate::exchange::ExchangeEnum;
-use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, kucoin_handle, coinex_swap_handle, htx_swap_handle};
+use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, kucoin_handle, coinex_swap_handle, htx_swap_handle, Depth};
 use crate::{Account, MarketOrder, Position, SpecialDepth, SpecialOrder, SpecialTicker};
 
 #[allow(dead_code)]
@@ -18,7 +18,8 @@ pub struct DepthParam {
     pub depth_asks: Vec<MarketOrder>,
     pub depth_bids: Vec<MarketOrder>,
     pub t: Decimal,
-    pub create_at: i64
+    pub create_at: i64,
+    pub timestamp_us: u64
 }
 
 #[allow(dead_code)]
@@ -190,6 +191,10 @@ impl HandleSwapInfo {
         // 运算、组装
         make_special_depth(label, &mut format_depth.depth_asks, &mut format_depth.depth_bids, format_depth.t, format_depth.create_at)
     }
+    
+    pub fn handle_depth(exchange: ExchangeEnum, res_data: &ResponseData) -> DepthParam{
+        format_depth(exchange, res_data)
+    }
 }
 
 
@@ -341,7 +346,8 @@ pub fn format_depth(exchange: ExchangeEnum, res_data: &ResponseData) -> DepthPar
         depth_asks,
         depth_bids,
         t,
-        create_at
+        create_at,
+        timestamp_us: chrono::Utc::now().timestamp_millis() as u64,
     }
 }
 

+ 30 - 0
standard/src/lib.rs

@@ -134,6 +134,7 @@ impl Account {
 /// - `bids(Vec<MarketOrder>)`: 买方深度列表;
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Depth {
+    pub timestamp_us: u64,
     pub time: i64,
     pub asks: Vec<MarketOrder>,
     pub bids: Vec<MarketOrder>,
@@ -142,6 +143,7 @@ pub struct Depth {
 impl Depth {
     pub fn new() -> Depth {
         Depth {
+            timestamp_us: 0,
             time: 0,
             asks: vec![],
             bids: vec![],
@@ -424,6 +426,34 @@ impl Position {
     }
 }
 
+
+/// 交易方向:买入或卖出
+#[derive(Debug, Clone)]
+pub enum TradeSide {
+    Buy = 1,
+    Sell = -1,
+}
+
+impl From<&str> for TradeSide {
+    fn from(s: &str) -> Self {
+        match s.to_lowercase().as_str() {
+            "buy" => TradeSide::Buy,
+            "sell" => TradeSide::Sell,
+            _ => panic!("Invalid trade side: {}", s), // 实际应用中应更优雅地处理错误
+        }
+    }
+}
+
+/// 单笔交易数据
+#[derive(Debug, Clone)]
+pub struct Trade {
+    pub timestamp_us: u64, // 时间戳,单位微秒 (microseconds)
+    pub price: Decimal,
+    pub amount: Decimal,
+    pub side: TradeSide,
+}
+
+
 /// 交易所统一方法接口
 ///
 /// 使用方法前需实例化

+ 8 - 25
strategy/src/binance_usdt_swap.rs

@@ -4,13 +4,14 @@ use std::sync::atomic::AtomicBool;
 use rust_decimal::Decimal;
 use tokio::sync::Mutex;
 use tokio_tungstenite::tungstenite::Message;
-use tracing::error;
+use tracing::{error, info};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::{TraceStack};
 use standard::exchange::ExchangeEnum::BinanceSwap;
 use crate::core::Core;
 use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
 use crate::exchange_disguise::{on_special_depth};
+use crate::model::OriginalTradeBa;
 
 // 参考 币安 合约 启动
 pub(crate) async fn reference_binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
@@ -27,7 +28,7 @@ pub(crate) async fn reference_binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
         ws.set_subscribe(vec![
             // BinanceSwapSubscribeType::PuDepth20levels100ms,
             BinanceSwapSubscribeType::PuBookTicker,
-            // BinanceSwapSubscribeType::PuAggTrade
+            BinanceSwapSubscribeType::PuAggTrade
         ]);
 
         // 读取数据
@@ -58,27 +59,8 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
 
     match response.channel.as_str() {
         "aggTrade" => {
-            // let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
-            // let name = data.label.clone();
-
-            // 订单流逻辑
-            // on_trade(trade.clone(), core_arc_clone.clone()).await;
-
-            // 原本的逻辑
-            // let mut core = core_arc_clone.lock().await;
-            // let str = data.label.clone();
-            // if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap() {
-            //     *_max_buy = Decimal::ZERO;
-            //     *_min_sell = Decimal::ZERO;
-            //     core.is_update.remove(str.as_str());
-            // }
-            // if trade.p > *_max_buy || *_max_buy == Decimal::ZERO {
-            //     *_max_buy = trade.p
-            // }
-            // if trade.p < *_min_sell || *_min_sell == Decimal::ZERO {
-            //     *_min_sell = trade.p
-            // }
-            // core.max_buy_min_sell_cache.insert(data.label, vec![*_max_buy, *_min_sell]);
+            let trade: Vec<OriginalTradeBa> = serde_json::from_str(response.data.as_str().unwrap()).unwrap();
+            info!("收到aggTrade信息,内容:{}", response.data.to_string());
         }
         "bookTicker" => {
             trace_stack.set_source("binance_usdt_swap.bookTicker".to_string());
@@ -91,10 +73,11 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
         "depth" => {
             trace_stack.set_source("binance_usdt_swap.depth".to_string());
             // 将depth数据转换为模拟深度
-            let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSwap, &response);
+            let depth = standard::handle_info::HandleSwapInfo::handle_depth(BinanceSwap, &response);
             trace_stack.on_after_format();
 
-            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
+            info!("depth asks: {:?}", depth.depth_asks);
+            info!("depth bids: {:?}", depth.depth_bids);
         }
         _ => {
             error!("未知推送类型");

+ 5 - 1
strategy/src/core.rs

@@ -24,7 +24,7 @@ use global::trade::Trade;
 use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker};
 use standard::exchange::{Exchange};
 use standard::exchange::ExchangeEnum::{BinanceSwap, BitgetSwap, BybitSwap, CoinexSwap, GateSwap, HtxSwap, KucoinSwap};
-
+use crate::fix_price::PricingEngine;
 use crate::model::{LocalPosition, OrderInfo, TokenParam};
 use crate::predictor::Predictor;
 use crate::strategy::Strategy;
@@ -44,6 +44,8 @@ pub struct Core {
     pub quote: String,
     //
     pub strategy: Strategy,
+
+    pub pricing_engine: PricingEngine,
     // 本地挂单表
     pub local_orders: HashMap<String, OrderInfo>,
     // 本地订单缓存队列
@@ -136,6 +138,7 @@ impl Core {
                      cci_arc: Arc<Mutex<CentralControlInfo>>) -> Core {
         let symbol = params.pair.clone();
         let pairs: Vec<&str> = params.pair.split('_').collect();
+        let pricing_engine = PricingEngine::new(100);
         let mut core_obj = Core {
             params: params.clone(),
             start_time: 0,
@@ -145,6 +148,7 @@ impl Core {
             // 现货底仓
             hold_coin: clip(params.hold_coin, Decimal::ZERO, Decimal::ONE_HUNDRED * Decimal::ONE_HUNDRED),
             strategy: Strategy::new(&params, true),
+            pricing_engine,
             local_orders: Default::default(),
             local_orders_backup: Default::default(),
             local_orders_backup_cid: Default::default(),

+ 425 - 0
strategy/src/fix_price.rs

@@ -0,0 +1,425 @@
+use std::str::FromStr;
+use rust_decimal::Decimal;
+use rust_decimal::prelude::{FromPrimitive, Signed, ToPrimitive};
+use standard::{MarketOrder, Trade, TradeSide};
+use standard::handle_info::DepthParam;
+// --- 1. 数据结构定义 ---
+
+/// 预处理后的累计交易组
+#[derive(Debug, Clone)]
+pub struct TradeCum {
+    pub v: Decimal, // 累计交易量 (带符号: 买入为正,卖出为负)
+    pub price: Decimal, // 组内平均交易价格
+    pub trades: Vec<Trade>, // 组内原始交易列表 (用于 midtrade 计算)
+}
+
+/// 定价模型输出结果
+#[derive(Debug)]
+pub struct PricingOutput {
+    pub depth_theo: Decimal, // 基于深度行情计算的理论价格
+    pub trade_theo: Decimal, // 基于累计交易组计算的理论价格
+}
+
+/// 定价引擎的状态,包含所有计算所需的历史数据和中间变量
+pub struct PricingState {
+    pub last_depth: Option<DepthParam>, // 上一次收到的深度行情
+    pub last_trade_flow_cum_sum_ema2: Decimal, // 上一次深度定价时计算的 tradeflow_cumsum_ema2
+    pub last_trade_flow_cum_sum_ema5: Decimal, // 上一次深度定价时计算的 tradeflow_cumsum_ema5
+    pub last_trade_flow_intra_tick_cum_sum_ema2_log: Decimal, // 上一次深度定价时计算的 tradeflow_cumsum_ema2_log,用于 trade_cum 定价
+    pub last_trade_flow_intra_tick_cum_sum_ema5_log: Decimal, // 上一次深度定价时计算的 tradeflow_cumsum_ema5_log,用于 trade_cum 定价
+    pub depth_minus_mid_at_last_depth: Decimal, // 上一次深度定价时计算的 depth_minus_mid,用于 trade_cum 定价
+    pub trade_cums_between_depths: Vec<TradeCum>, // 存储在两幅深度行情之间截取的所有完整 trade_cum 组
+    pub last_trade_timestamp_us: u64, // 上一笔原始交易的时间戳,用于交易截取
+    pub current_trade_group: Vec<Trade>, // 当前正在累积的原始交易组
+    pub trade_interception_tau_us: u64, // 交易截取的时间阈值,0.1ms = 100微秒
+    pub last_calculated_depth_theo: Decimal, // 最后一次计算的 depth_theo
+    pub last_calculated_trade_theo: Decimal, // 最后一次计算的 trade_theo
+}
+
+impl Default for PricingState {
+    fn default() -> Self {
+        PricingState {
+            last_depth: None,
+            last_trade_flow_cum_sum_ema2: Decimal::ZERO,
+            last_trade_flow_cum_sum_ema5: Decimal::ZERO,
+            last_trade_flow_intra_tick_cum_sum_ema2_log: Decimal::ZERO,
+            last_trade_flow_intra_tick_cum_sum_ema5_log: Decimal::ZERO,
+            depth_minus_mid_at_last_depth: Decimal::ZERO,
+            trade_cums_between_depths: Vec::new(),
+            last_trade_timestamp_us: 0,
+            current_trade_group: Vec::new(),
+            trade_interception_tau_us: 100, // 默认 0.1ms = 100微秒
+            last_calculated_depth_theo: Decimal::ZERO,
+            last_calculated_trade_theo: Decimal::ZERO,
+        }
+    }
+}
+
+/// 辅助函数:计算带符号的对数 log(1 + |x|) * sign(x)
+fn signed_log(val: Decimal) -> Decimal {
+    if val == Decimal::ZERO {
+        Decimal::ZERO
+    } else {
+        // 1. 将 Decimal 转换为 f64
+        //    to_f64() 返回 Option<f64> 因为 Decimal 可能超出 f64 的范围
+        let val_f64 = val.to_f64()
+            .expect("Decimal value is too large or too small or invalid for f64 conversion");
+
+        // 2. 在 f64 上执行计算:ln(|val| + 1) * sign(val)
+        //    f64 提供 abs() 和 ln_1p() (计算 ln(1+x))
+        let abs_val_f64 = val_f64.abs();
+        let ln_result_f64 = abs_val_f64.ln_1p(); // ln_1p(x) is ln(1+x)
+
+        //    获取原始 Decimal 的符号,并转换为 f64 (-1.0, 0.0, 1.0)
+        let sign_f64 = val.signum().to_f64()
+            .expect("Signum conversion failed unexpectedly"); // signum is -1, 0, 1, always convertible
+
+        let final_f64 = ln_result_f64 * sign_f64;
+
+        // 3. 将 f64 结果转回 Decimal
+        //    from_f64() 返回 Option<Decimal> 因为 f64 可能无法精确表示为 Decimal
+        Decimal::from_f64(final_f64)
+            .expect("Failed to convert f64 result back to Decimal")
+    }
+}
+
+// --- 2. 交易数据预处理 (Trade Data Pre-processing) ---
+
+/// 处理单笔原始交易数据。
+/// 根据时间阈值 `tau` 将交易分组。
+/// 如果一个交易组完成,则返回 `Some(TradeCum)`,否则返回 `None`。
+pub fn process_raw_trade(state: &mut PricingState, trade: Trade) -> Option<TradeCum> {
+    let current_timestamp = trade.timestamp_us;
+
+    // 如果是第一笔交易,或者当前交易与上一笔交易的时间差在阈值内,则加入当前组
+    if state.last_trade_timestamp_us == 0 || (current_timestamp - state.last_trade_timestamp_us) <= state.trade_interception_tau_us {
+        state.current_trade_group.push(trade);
+    } else {
+        // 否则,当前交易开始一个新的组,先处理并返回上一个完整的组
+        if!state.current_trade_group.is_empty() {
+            let trade_cum = create_trade_cum(&state.current_trade_group);
+            state.current_trade_group.clear();
+            state.current_trade_group.push(trade); // 将当前交易加入新组
+            state.last_trade_timestamp_us = current_timestamp;
+            return Some(trade_cum);
+        } else {
+            // 理论上不应发生,但如果当前组为空,则直接将交易加入
+            state.current_trade_group.push(trade);
+        }
+    }
+    state.last_trade_timestamp_us = current_timestamp;
+    None
+}
+
+/// 辅助函数:从一组原始交易中创建 `TradeCum`
+fn create_trade_cum(trades: &Vec<Trade>) -> TradeCum {
+    let n = trades.len();
+    if n == 0 {
+        return TradeCum { v: Decimal::ZERO, price: Decimal::ZERO, trades: trades.to_vec() };
+    }
+    let trade_cum_v = trades.iter().map(|t| t.amount * Decimal::from_i32(t.side.clone() as i32).unwrap()).sum();
+    let n_decimal = Decimal::from_usize(n).unwrap();
+    let trade_cum_price = trades.iter().map(|t| t.price).sum::<Decimal>() / n_decimal;
+    TradeCum {
+        v: trade_cum_v,
+        price: trade_cum_price,
+        trades: trades.to_vec(),
+    }
+}
+
+/// 刷新任何未完成的交易组。
+/// 在处理深度行情前调用,以确保所有在深度更新前发生的交易都被处理。
+pub fn flush_trade_group(state: &mut PricingState) -> Option<TradeCum> {
+    if!state.current_trade_group.is_empty() {
+        let trade_cum = create_trade_cum(&state.current_trade_group);
+        state.current_trade_group.clear();
+        return Some(trade_cum);
+    }
+    None
+}
+
+// --- 3. 深度定价 (DepthParam Pricing) ---
+
+/// 根据当前深度行情和期间的累计交易组计算 `depth_theo`。
+/// 同时更新 `PricingState` 中用于后续计算的 EMA 值和 `depth_minus_mid`。
+pub fn calculate_depth_pricing(state: &mut PricingState, current_depth: DepthParam) -> Decimal {
+    let mut depth_theo = Decimal::ZERO;
+    let three = Decimal::TWO + Decimal::ONE;
+
+    // 如果没有上一幅深度行情,则无法计算 depth_theo,只存储当前深度并返回默认值
+    if state.last_depth.is_none() {
+        state.last_depth = Some(current_depth);
+        return depth_theo;
+    }
+
+    // 1. 提取前3档行情
+    let top3_bids: Vec<MarketOrder> = current_depth.depth_bids.iter().take(3).cloned().collect();
+    let top3_asks: Vec<MarketOrder> = current_depth.depth_asks.iter().take(3).cloned().collect();
+
+    // 2. 计算买卖盘平均价格和总数量
+    let bid_qty_all: Decimal = top3_bids.iter().map(|d| d.amount).sum();
+    let ask_qty_all: Decimal = top3_asks.iter().map(|d| d.amount).sum();
+
+    let bid_avg_price = if bid_qty_all > Decimal::ZERO {
+        top3_bids.iter().map(|d| d.price * d.amount).sum::<Decimal>() / bid_qty_all
+    } else {
+        Decimal::ZERO // 如果没有买盘数量,平均价格为0
+    };
+    let ask_avg_price = if ask_qty_all > Decimal::ZERO {
+        top3_asks.iter().map(|d| d.price * d.amount).sum::<Decimal>() / ask_qty_all
+    } else {
+        Decimal::ZERO // 如果没有卖盘数量,平均价格为0
+    };
+
+    // 3. 计算 ibary_p (买卖盘平均价格的加权平均值)
+    let ibary_p = if (bid_qty_all + ask_qty_all) > Decimal::ZERO {
+        (bid_avg_price * ask_qty_all + ask_avg_price * bid_qty_all) / (bid_qty_all + ask_qty_all)
+    } else {
+        // 如果总数量为0,则使用买卖平均价格的简单平均值(如果价格可用)
+        (bid_avg_price + ask_avg_price) / Decimal::TWO
+    };
+
+    // 4. 对两幅深度行情之间所有 trade_cum 组的 v 求和,并求 log
+    let tradeflow_cumsum: Decimal = state.trade_cums_between_depths.iter().map(|tc| tc.v).sum();
+    let trade_flow_cum_sum_log = signed_log(tradeflow_cumsum);
+
+    // 5. 对 tradeflow_intraTick_cumsum 计算 EMA,并求 log
+    let tradeflow_cumsum_ema2 = (Decimal::TWO / three) * tradeflow_cumsum + (Decimal::ONE / three) * state.last_trade_flow_cum_sum_ema2;
+    let tradeflow_cumsum_ema5 = (Decimal::ONE / three) * tradeflow_cumsum + (Decimal::TWO / three) * state.last_trade_flow_cum_sum_ema5;
+
+    let tradeflow_cumsum_ema2_log = signed_log(tradeflow_cumsum_ema2);
+    let tradeflow_cumsum_ema5_log = signed_log(tradeflow_cumsum_ema5);
+
+    // 6. 计算 midprice (买一价和卖一价的中间价) 和 depth_minus_mid
+    let midprice = if!current_depth.depth_bids.is_empty() &&!current_depth.depth_asks.is_empty() {
+        (current_depth.depth_bids.get(0).unwrap().price + current_depth.depth_asks.get(0).unwrap().price) / Decimal::TWO
+    } else {
+        Decimal::ZERO // 如果订单簿为空,midprice 为 0
+    };
+    let depth_minus_mid = ibary_p - midprice;
+    let number_1 = Decimal::from_str("7.722e-7").unwrap();
+    let number_2 = Decimal::from_str("2.157e-7").unwrap();
+    let number_3 = Decimal::from_str("3.478e-7").unwrap();
+    let number_4 = Decimal::from_str("0.3429").unwrap();
+
+    // 7. 计算 depth_theo
+    let alpha = number_1 * trade_flow_cum_sum_log
+        + number_2 * tradeflow_cumsum_ema2_log
+        - number_3 * tradeflow_cumsum_ema5_log
+        + number_4 * depth_minus_mid;
+    depth_theo = midprice + alpha;
+
+    // 更新状态,供下一次深度定价和累计交易定价使用
+    state.last_depth = Some(current_depth);
+    state.last_trade_flow_cum_sum_ema2 = tradeflow_cumsum_ema2;
+    state.last_trade_flow_cum_sum_ema5 = tradeflow_cumsum_ema5;
+    state.last_trade_flow_intra_tick_cum_sum_ema2_log = tradeflow_cumsum_ema2_log;
+    state.last_trade_flow_intra_tick_cum_sum_ema5_log = tradeflow_cumsum_ema5_log;
+    state.depth_minus_mid_at_last_depth = depth_minus_mid;
+    state.trade_cums_between_depths.clear(); // 清空两幅深度行情之间的 trade_cum 缓冲区
+
+    state.last_calculated_depth_theo = depth_theo; // 存储计算结果
+    depth_theo
+}
+
+// --- 4. 累计交易定价 (Trade_cum Pricing) ---
+
+/// 根据当前完整的 `TradeCum` 组和之前的深度/交易流状态计算 `trade_theo`。
+pub fn calculate_trade_cum_pricing(state: &mut PricingState, current_trade_cum: &TradeCum) -> Decimal {
+    let mut trade_theo = Decimal::ZERO;
+
+    // 必须有上一幅深度行情才能进行计算
+    let last_depth = match state.last_depth.as_ref() {
+        Some(d) => d,
+        None => return trade_theo, // 如果没有,返回默认值
+    };
+
+    // 1. 计算 midtrade (迭代计算)
+    // 初始化 askp_0 和 bidp_0 为上一幅深度的最佳买卖价
+    let mut current_ask_price = last_depth.depth_asks.get(0).map_or(Decimal::ZERO, |d| d.price);
+    let mut current_bid_price = last_depth.depth_bids.get(0).map_or(Decimal::ZERO, |d| d.price);
+
+    let mut midtrade_n = Decimal::ZERO; // 将保存当前 trade_cum 组中最后一笔交易的 midtrade
+
+    for trade_i in &current_trade_cum.trades {
+        match trade_i.side {
+            TradeSide::Sell => {
+                // 如果是卖单,更新 bid price 为交易价格,ask price 保持不变
+                current_bid_price = trade_i.price;
+            },
+            TradeSide::Buy => {
+                // 如果是买单,更新 ask price 为交易价格,bid price 保持不变
+                current_ask_price = trade_i.price;
+            },
+        }
+        // 计算当前交易后的 midtrade
+        midtrade_n = (current_ask_price + current_bid_price) / Decimal::TWO;
+    }
+
+    // 2. 计算 tradeflow_log 和 trade_flow_cum_sum_log
+    let tradeflow_log = signed_log(current_trade_cum.v);
+
+    // trade_flow_cum_sum_n 是包括 `previous_trade_cums` (即 `state.trade_cums_between_depths` 缓冲区)
+    // 和 `current_trade_cum` 在内的所有 `trade_cum` 组的 `v` 总和。
+    let trade_flow_cum_sum_n: Decimal = state.trade_cums_between_depths.iter().map(|tc| tc.v).sum::<Decimal>() + current_trade_cum.v;
+    let trade_flow_cum_sum_log = signed_log(trade_flow_cum_sum_n);
+    let number_1 = Decimal::from_str("2.461e-6").unwrap();
+    let number_2 = Decimal::from_str("8.486e-7").unwrap();
+    let number_3 = Decimal::from_str("5.788e-7").unwrap();
+    let number_4 = Decimal::from_str("5.989e-7").unwrap();
+    let number_5 = Decimal::from_str("0.1854").unwrap();
+
+    // 3. 计算 trade_theo
+    let alpha = number_1 * tradeflow_log
+        + number_2 * trade_flow_cum_sum_log
+        + number_3 * state.last_trade_flow_intra_tick_cum_sum_ema2_log
+        - number_4 * state.last_trade_flow_intra_tick_cum_sum_ema5_log
+        + number_5 * state.depth_minus_mid_at_last_depth;
+    trade_theo = midtrade_n + alpha;
+
+    state.last_calculated_trade_theo = trade_theo; // 存储计算结果
+    trade_theo
+}
+
+// --- 主定价引擎 (Main Pricing Engine) ---
+
+/// 定价引擎,负责协调 WebSocket 消息处理和定价计算。
+pub struct PricingEngine {
+    state: PricingState,
+}
+
+impl PricingEngine {
+    /// 创建一个新的定价引擎实例。
+    /// `trade_interception_tau_us` 是交易截取的时间阈值,单位微秒。
+    pub fn new(trade_interception_tau_us: u64) -> Self {
+        let mut state = PricingState::default();
+        state.trade_interception_tau_us = trade_interception_tau_us;
+        PricingEngine {
+            state,
+        }
+    }
+
+    /// 当从 WebSocket 接收到新的原始交易消息时调用此方法。
+    /// 它会处理交易,如果形成一个完整的 `TradeCum` 组,则计算并返回 `trade_theo` 价格。
+    pub fn on_trade_message(&mut self, trade: Trade) -> Option<Decimal> {
+        if let Some(trade_cum) = process_raw_trade(&mut self.state, trade) {
+            // 一个完整的 trade_cum 组已形成。
+            // 将其添加到缓冲区,供下一次深度定价使用。
+            self.state.trade_cums_between_depths.push(trade_cum.clone());
+            // 基于这个新的 trade_cum 组计算 trade_theo
+            let trade_theo = calculate_trade_cum_pricing(&mut self.state, &trade_cum);
+            Some(trade_theo)
+        } else {
+            None
+        }
+    }
+
+    /// 当从 WebSocket 接收到新的深度行情消息时调用此方法。
+    /// 它会刷新任何待处理的交易组,计算 `depth_theo` 价格,并返回它。
+    pub fn on_depth_message(&mut self, depth: DepthParam) -> Decimal {
+        // 在计算 depth_theo 之前,确保所有在当前深度更新前发生的交易组都被刷新。
+        if let Some(trade_cum) = flush_trade_group(&mut self.state) {
+            self.state.trade_cums_between_depths.push(trade_cum);
+        }
+
+        let depth_theo = calculate_depth_pricing(&mut self.state, depth);
+        depth_theo
+    }
+
+    /// 返回最后一次计算的 `depth_theo` 和 `trade_theo` 价格。
+    pub fn get_current_pricing_output(&self) -> PricingOutput {
+        PricingOutput {
+            depth_theo: self.state.last_calculated_depth_theo,
+            trade_theo: self.state.last_calculated_trade_theo,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::str::FromStr;
+    use rust_decimal::Decimal;
+    use standard::handle_info::DepthParam;
+    use crate::fix_price::{MarketOrder, PricingEngine, Trade, TradeSide};
+
+    #[test]
+    fn clip_test() {
+        // 初始化定价引擎,设置交易截取阈值为 0.1ms (100微秒)
+        let mut pricing_engine = PricingEngine::new(100);
+
+        // --- 模拟接收交易消息 ---
+        println!("--- 模拟交易消息处理 ---");
+        let trade1 = Trade { timestamp_us: 1_000_000, price: Decimal::from_str("30000").unwrap(), amount: Decimal::from_str("0.1").unwrap(), side: TradeSide::Buy };
+        let trade2 = Trade { timestamp_us: 1_000_050, price: Decimal::from_str("30000.1").unwrap(), amount: Decimal::from_str("0.5").unwrap(), side: TradeSide::Buy };
+        let trade3 = Trade { timestamp_us: 1_000_150, price: Decimal::from_str("29999.8").unwrap(), amount: Decimal::from_str("0.2").unwrap(), side: TradeSide::Sell }; // 超过 100us 阈值,会触发 trade1, trade2 形成一个 TradeCum
+
+        if let Some(trade_theo) = pricing_engine.on_trade_message(trade1) {
+            println!("Trade 1 触发 trade_theo: {}", trade_theo);
+        } else {
+            println!("Trade 1 未触发 trade_theo (组未完成)");
+        }
+
+        if let Some(trade_theo) = pricing_engine.on_trade_message(trade2) {
+            println!("Trade 2 触发 trade_theo: {}", trade_theo);
+        } else {
+            println!("Trade 2 未触发 trade_theo (组未完成)");
+        }
+
+        if let Some(trade_theo) = pricing_engine.on_trade_message(trade3) {
+            println!("Trade 3 触发 trade_theo: {}", trade_theo);
+        } else {
+            println!("Trade 3 未触发 trade_theo (组未完成)");
+        }
+
+        // --- 模拟接收深度行情消息 ---
+        println!("\n--- 模拟深度行情处理 ---");
+        let depth1 = DepthParam {
+            timestamp_us: 1_000_200,
+            t: Decimal::ZERO,
+            create_at: 0,
+            depth_bids: vec![
+                MarketOrder{
+                    price: Decimal::from_str("29999").unwrap(),
+                    amount: Decimal::from_str("1").unwrap(),
+                },
+                MarketOrder{
+                    price: Decimal::from_str("29998.5").unwrap(),
+                    amount: Decimal::from_str("0.3").unwrap(),
+                },
+                MarketOrder{
+                    price: Decimal::from_str("29998").unwrap(),
+                    amount: Decimal::from_str("0.2").unwrap(),
+                }
+            ],
+            depth_asks: vec![
+                MarketOrder{
+                    price: Decimal::from_str("29999.5").unwrap(),
+                    amount: Decimal::from_str("0.3").unwrap(),
+                },
+                MarketOrder{
+                    price: Decimal::from_str("29999.3").unwrap(),
+                    amount: Decimal::from_str("0.2").unwrap(),
+                },
+                MarketOrder{
+                    price: Decimal::from_str("29999.2").unwrap(),
+                    amount: Decimal::from_str("0.3").unwrap(),
+                }
+            ],
+        };
+        let depth_theo = pricing_engine.on_depth_message(depth1);
+        println!("DepthParam 1 触发 depth_theo: {}", depth_theo);
+
+        // --- 模拟后续交易消息 ---
+        println!("\n--- 模拟后续交易消息处理 ---");
+        let trade4 = Trade { timestamp_us: 1_000_300, price: Decimal::from_str("30000.5").unwrap(), amount: Decimal::from_str("0.15").unwrap(), side: TradeSide::Buy };
+        if let Some(trade_theo) = pricing_engine.on_trade_message(trade4) {
+            println!("Trade 4 触发 trade_theo: {}", trade_theo);
+        } else {
+            println!("Trade 4 未触发 trade_theo (组未完成)");
+        }
+
+        // 获取当前最新的定价结果
+        let final_pricing = pricing_engine.get_current_pricing_output();
+        println!("\n最终定价结果: {:?}", final_pricing);
+    }
+}

+ 2 - 1
strategy/src/lib.rs

@@ -15,4 +15,5 @@ mod bybit_usdt_swap;
 mod bitget_usdt_swap;
 mod coinex_usdt_swap;
 mod htx_usdt_swap;
-pub mod clear_core;
+pub mod clear_core;
+mod fix_price;

+ 10 - 10
strategy/src/strategy.rs

@@ -106,8 +106,8 @@ pub struct Strategy {
     pub grid: Decimal,                                              // 网格数量
 
     // 速度限制,至少0.5秒才取消订单
-    pub prev_place_order_timestamp: i64,                            // 上次挂单的时间
-    pub min_cancel_interval_mills: i64,                             // 至少要挂这么久才允许撤销
+    // pub prev_place_order_timestamp: i64,                            // 上次挂单的时间
+    // pub min_cancel_interval_mills: i64,                             // 至少要挂这么久才允许撤销
 }
 
 impl Strategy {
@@ -199,8 +199,8 @@ impl Strategy {
             post_side: 0,
             trade_vol_24h_w: Default::default(),
             grid: Decimal::from(params.grid),
-            prev_place_order_timestamp: 0,
-            min_cancel_interval_mills: 500,
+            // prev_place_order_timestamp: 0,
+            // min_cancel_interval_mills: 500,
         };
 
         // 交易名字
@@ -1025,9 +1025,9 @@ impl Strategy {
     // #[instrument(skip(self, command), level="TRACE")]
     pub fn _cancel_open(&self, command: &mut OrderCommand, local_orders: &HashMap<String, OrderInfo>) {
         // 强制性时间间隔
-        if self.prev_place_order_timestamp + self.min_cancel_interval_mills > Utc::now().timestamp_millis() {
-            return;
-        }
+        // if self.prev_place_order_timestamp + self.min_cancel_interval_mills > Utc::now().timestamp_millis() {
+        //     return;
+        // }
 
         // debug!(?command);
         // 挂单范围
@@ -1293,9 +1293,9 @@ impl Strategy {
         self._update_request_num(&mut command);                     // 统计刷新频率
 
         // 如果提交了订单,则更新最后提交时间
-        if command.limits_open.len() != 0 {
-            self.prev_place_order_timestamp = Utc::now().timestamp_millis();
-        }
+        // if command.limits_open.len() != 0 {
+        //     self.prev_place_order_timestamp = Utc::now().timestamp_millis();
+        // }
 
         // if command.limits_open.len() != 0 || command.limits_close.len() != 0 {
         //     let name = self.params.account_name.clone();