Browse Source

新版架构,降低延迟,自己参考自己。

skyfffire 11 months ago
parent
commit
77fe26647b

+ 1 - 1
global/src/cci.rs

@@ -16,7 +16,7 @@ pub struct CentralControlInfo {
 
 impl CentralControlInfo {
     // 时间窗口大小(微秒)
-    const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000;
+    const MAX_TIME_RANGE_MICROS: i64 = 2 * 60_000_000;
 
     pub fn new() -> Self {
         Self {

+ 168 - 71
strategy/src/avellaneda_stoikov.rs

@@ -39,6 +39,12 @@ pub struct AvellanedaStoikov {
     pub gamma: Decimal,                                                         // γ,库存风险厌恶参数
     pub kappa: Decimal,                                                         // κ 订单簿 流动性 参数
 
+    pub flow_ratio: Decimal,                                                    // 资金流比例
+    pub money_flow_index: Decimal,                                              // MFI
+    pub long_trade_len_dec: Decimal,
+    pub short_trade_len_dec: Decimal,
+    pub error_rate: Decimal,                                                    // 犯错概率(预估)
+
     pub ask_delta: Decimal,                                                     // δa
     pub bid_delta: Decimal,                                                     // δb
     pub base_delta: Decimal,                                                    // 基础挂单距离
@@ -52,6 +58,9 @@ pub struct AvellanedaStoikov {
     pub prev_trade_time: i64,                                                   // 上次交易时间,也就是t
     pub close_price: Decimal,                                                   // 平仓价格
     pub t_diff: Decimal,                                                        // (T-t)
+    pub last_update_time: Decimal,
+    pub last_index: Decimal,
+    pub prev_insert_time: Decimal,
 }
 
 impl AvellanedaStoikov {
@@ -60,7 +69,7 @@ impl AvellanedaStoikov {
     const TIME_DIFF_RANGE_MICROS: i64 = 3 * 60_000_000;
     const TRADE_LONG_RANGE_MICROS: i64 = 3 * 60_000_000;
     // const SPREAD_RANGE_MICROS: i64 = 15 * 60_000_000;
-    const TRADE_SHORT_RANGE_MICROS: i64 = 60_000_000;
+    const TRADE_SHORT_RANGE_MICROS: i64 = 30_000_000;
     // const ONE_MILLION: Decimal = dec!(1_000_000);
     // const TWENTY_THOUSAND: Decimal = dec!(20_000);
     const IRA: Decimal = dec!(1);
@@ -108,7 +117,15 @@ impl AvellanedaStoikov {
             close_price: Default::default(),
             t_diff: Default::default(),
             level: Default::default(),
+            flow_ratio: Default::default(),
+            money_flow_index: Default::default(),
             pos_amount: Default::default(),
+            error_rate: Default::default(),
+            long_trade_len_dec: Default::default(),
+            short_trade_len_dec: Default::default(),
+            last_update_time: Default::default(),
+            last_index: Default::default(),
+            prev_insert_time: Default::default(),
         };
 
         avellaneda_stoikov
@@ -124,57 +141,57 @@ impl AvellanedaStoikov {
     }
 
     // 更新最佳市场冲击
-    pub fn update_spread_best(&mut self) {
-        self.spread_best = self.spread_max;
-        let mut max_count = 0usize;
-
-        for (spread, count) in self.spread_count_map.iter() {
-            // info!(?spread, ?count);
-
-            if *count < max_count {
-                continue
-            }
+    // pub fn update_spread_best(&mut self) {
+    //     self.spread_best = self.spread_max;
+    //     let mut max_count = 0usize;
+    //
+    //     for (spread, count) in self.spread_count_map.iter() {
+    //         // info!(?spread, ?count);
+    //
+    //         if *count < max_count {
+    //             continue
+    //         }
+    //
+    //         self.spread_best = *spread;
+    //         max_count = *count;
+    //     }
+    //     // info!("======================")
+    // }
 
-            self.spread_best = *spread;
-            max_count = *count;
+    pub fn update_spread(&mut self) {
+        // if self.trade_long_vec.len() > 0 {
+        let prev_depth_0 = &self.depth_vec[0];
+        if prev_depth_0.time.is_zero() {
+            return;
         }
-        // info!("======================")
-    }
 
-    pub fn update_spread(&mut self) {
-        if self.trade_long_vec.len() > 0 {
-            //
-            let last_trade = self.trade_long_vec.get(self.trade_long_vec.len() - 1).unwrap();
-            let last_trade_price = last_trade.price;
-            let last_trade_time = last_trade.time;
-
-            let mut first_trade_price = last_trade.price;
-            for trade in self.trade_long_vec.deque.iter().rev() {
-                if last_trade_time - trade.time > Decimal::TEN {
-                    break;
-                }
-
-                first_trade_price = trade.price;
+        let prev_mid_price = (prev_depth_0.asks[0].price + prev_depth_0.bids[0].price) / Decimal::TWO;
+        let now_spread = (prev_mid_price - self.mid_price).abs();
+
+        if !now_spread.is_zero() {
+            self.spread = now_spread;
+            self.spread_vec.push(self.spread);
+            // if last_trade_price > first_trade_price {
+            //     self.spread_long_vec.push(self.spread);
+            // } else {
+            //     self.spread_short_vec.push(self.spread);
+            // }
+            self.spread_count_map.insert(self.spread, self.spread_count_map.get(&self.spread).unwrap_or(&0) + 1);
+
+            if self.spread_vec.len() > 2_000 {
+                let pop_value = self.spread_vec.remove(0);
+                self.spread_count_map.insert(pop_value, self.spread_count_map.get(&pop_value).unwrap() - 1);
             }
 
-            let now_spread = (last_trade_price - first_trade_price).abs();
-            if !now_spread.is_zero() {
-                self.spread = now_spread;
-                self.spread_vec.push(self.spread);
-                self.spread_count_map.insert(self.spread, self.spread_count_map.get(&self.spread).unwrap_or(&0) + 1);
-
-                if self.spread_vec.len() > 1_000 {
-                    let pop_value = self.spread_vec.remove(0);
-                    self.spread_count_map.insert(pop_value, self.spread_count_map.get(&pop_value).unwrap() - 1);
-                }
-
-                self.update_spread_max();
-                self.update_spread_best();
-            }
+            self.update_spread_max();
+            // self.update_spread_best();
         }
+        // }
     }
 
     pub async fn on_depth(&mut self, depth: &Depth, index: usize) {
+        self.last_update_time = depth.time;
+        self.last_index = Decimal::from(index);
         self.update_fair_price(depth, index).await;
 
         if index == 0 {
@@ -189,10 +206,17 @@ impl AvellanedaStoikov {
         self.processor().await;
     }
 
-    pub async fn on_trade(&mut self, trade: &Trade) {
+    pub async fn on_trade(&mut self, trade: &Trade, _index: usize) {
+        // self.last_update_time = trade.time;
+
         self.trade_long_vec.push_back(trade.clone());
         self.trade_short_vec.push_back(trade.clone());
 
+        self.long_trade_len_dec = Decimal::from_usize(self.trade_long_vec.len()).unwrap();
+        self.short_trade_len_dec = Decimal::from_usize(self.trade_short_vec.len()).unwrap();
+        self.error_rate = self.short_trade_len_dec / self.long_trade_len_dec;
+        self.error_rate.rescale(4);
+
         self.last_price = trade.price;
         self.processor().await;
     }
@@ -242,14 +266,44 @@ impl AvellanedaStoikov {
         if self.record_vec.len() > 4 {
             self.record_vec.pop_front();
         }
+
+        // 如果蜡烛数量足够,则更新mfi
+        if self.record_vec.len() >= 4 {
+            self.update_mfi();
+        }
+    }
+
+    pub fn update_mfi(&mut self) {
+        let mut money_flow_in = Decimal::ZERO;
+        let mut money_flow_out = Decimal::ZERO;
+        let _3 = dec!(3);
+
+        for record in self.record_vec.iter() {
+            let typical_price = (record.high + record.low + record.close) / _3;
+            let money_flow = typical_price * record.volume;
+            if record.close > record.open {
+                money_flow_in += money_flow;
+            } else if record.close < record.open {
+                money_flow_out += money_flow;
+            }
+        }
+
+        self.money_flow_index = if money_flow_out.is_zero() {
+            Decimal::ONE_HUNDRED
+        } else {
+            let money_flow_ratio = money_flow_in / money_flow_out;
+
+            Decimal::ONE_HUNDRED - Decimal::ONE_HUNDRED / (Decimal::ONE + money_flow_ratio)
+        };
+
+        self.update_flow_ratio();
     }
 
     pub async fn on_inventory(&mut self, pos_amount: &Decimal, min_amount_value: &Decimal) {
         let prev_inventory = self.inventory;
         self.pos_amount = pos_amount.clone();
-        self.inventory = (pos_amount / (min_amount_value / self.mid_price)).round();
-        // 发现开仓行为
-        if prev_inventory.is_zero() && prev_inventory != self.inventory {
+        self.inventory = (pos_amount / (min_amount_value / self.mid_price)).floor();
+        if prev_inventory != self.inventory {
             self.prev_trade_time = Utc::now().timestamp_micros();
             self.close_price = self.order_ref_price;
         }
@@ -280,7 +334,6 @@ impl AvellanedaStoikov {
             // let v0_rate = self.volume_vec[0] / (self.volume_vec[0] + self.volume_vec[1]);
             // let v1_rate = self.volume_vec[1] / (self.volume_vec[0] + self.volume_vec[1]);
 
-
             let sma = self.depth_vec[1].asks[0].price;
             let smb = self.depth_vec[1].bids[0].price;
 
@@ -289,8 +342,8 @@ impl AvellanedaStoikov {
 
             let price_diff = mp0 - mp1;
 
-            // self.ref_price = self.fair_price_vec[0] * v0_rate + self.fair_price_vec[1] * v1_rate + price_diff / Decimal::TWO;
             self.ref_price = (self.fair_price_vec[0] + self.fair_price_vec[1] + price_diff) / Decimal::TWO;
+            // self.ref_price = (self.fair_price_vec[0] + self.fair_price_vec[1]) / Decimal::TWO;
         }
     }
 
@@ -360,6 +413,16 @@ impl AvellanedaStoikov {
     //     }
     // }
 
+    pub fn update_flow_ratio(&mut self) {
+        self.flow_ratio = if self.money_flow_index > dec!(80) {
+            (self.money_flow_index - dec!(80)) / dec!(20)
+        } else if self.money_flow_index < dec!(20) {
+            Decimal::NEGATIVE_ONE * (dec!(20) - self.money_flow_index) / dec!(20)
+        } else {
+            Decimal::ZERO
+        };
+    }
+
     pub fn check_ready(&mut self) {
         if self.is_ready {
             return;
@@ -412,38 +475,72 @@ impl AvellanedaStoikov {
             return;
         }
 
-        let mut cci = self.cci_arc.lock().await;
         let mut smm = Decimal::ZERO;
         if !self.depth_vec[1].time.is_zero() {
             let sma = self.depth_vec[1].asks[0].price;
             let smb = self.depth_vec[1].bids[0].price;
             smm = (sma + smb) / Decimal::TWO;
         }
-        cci.predictor_state_vec.push_back(PredictorState {
-            update_time: Decimal::from_i64(Utc::now().timestamp_millis()).unwrap(),
-
-            mid_price: self.last_price,
-            ask_price: self.ask_price,
-            bid_price: self.bid_price,
-            last_price: self.last_price,
-            spread: smm,
-            spread_max: self.mid_price,
-            spread_min: self.ref_price,
-            optimal_ask_price: self.optimal_ask_price,
-            optimal_bid_price: self.optimal_bid_price,
-
-            inventory: self.inventory,
-            sigma_square: self.sigma_square,
-            gamma: self.spread,
-            kappa: self.kappa,
-
-            flow_ratio: Decimal::ZERO,
-            ref_price: self.fair_price_vec[0],
+
+
+        let cci_arc = self.cci_arc.clone();
+        let now = Decimal::from_i64(Utc::now().timestamp_millis()).unwrap();
+        let mid_price = self.last_price;
+        let ask_price = self.ask_price;
+        let bid_price = self.bid_price;
+        let last_price = self.last_price;
+
+        let spread = smm;
+        let spread_max = self.mid_price;
+        let spread_min = self.ref_price;
+        let optimal_ask_price = self.optimal_ask_price;
+        let optimal_bid_price = self.optimal_bid_price;
+
+        let inventory = self.inventory;
+        let sigma_square = self.error_rate;
+        let gamma = now - self.last_update_time;
+        let kappa = self.short_trade_len_dec;
+
+        let flow_ratio = Decimal::ZERO;
+        let ref_price = self.ref_price;
+
+        let need_append = now - self.prev_insert_time > Decimal::ONE_HUNDRED;
+        if !need_append {
+            return;
+        }
+
+        self.prev_insert_time = now;
+        // 数据量太多导致的,减少一些吧
+        tokio::spawn(async move {
+            let mut cci = cci_arc.lock().await;
+
+            cci.predictor_state_vec.push_back(PredictorState {
+                update_time: now,
+
+                mid_price,
+                ask_price,
+                bid_price,
+                last_price,
+
+                spread,
+                spread_max,
+                spread_min,
+                optimal_ask_price,
+                optimal_bid_price,
+
+                inventory,
+                sigma_square,
+                gamma,
+                kappa,
+
+                flow_ratio,
+                ref_price,
+            });
         });
     }
 
     // #[instrument(skip(self, ref_ticker_map), level="TRACE")]
     pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
-        return vec![];
+        vec![]
     }
 }

+ 1 - 1
strategy/src/binance_usdt_swap.rs

@@ -111,7 +111,7 @@ async fn on_data(core_arc: Arc<Mutex<Core>>, multiplier: &Decimal, run_symbol: &
             for trade in trades.iter_mut() {
                 let core_arc_clone = core_arc.clone();
 
-                on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade).await;
+                on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade, 1).await;
             }
         }
         "bookTicker" => {

+ 3 - 3
strategy/src/bitget_usdt_swap.rs

@@ -27,8 +27,8 @@ pub async fn reference_bitget_swap_run(is_shutdown_arc: Arc<AtomicBool>,
         let mut ws = BitgetSwapWs::new_label(name, is_colo, None, BitgetSwapWsType::Public);
         ws.set_subscribe(vec![
             BitgetSwapSubscribeType::PuBooks1,
-            BitgetSwapSubscribeType::PuKline("1".to_string()),
-            BitgetSwapSubscribeType::PuTrade,
+            // BitgetSwapSubscribeType::PuKline("1".to_string()),
+            // BitgetSwapSubscribeType::PuTrade,
         ]);
 
 
@@ -209,7 +209,7 @@ async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &Re
             for trade in trades.iter_mut() {
                 let core_arc_clone = core_arc.clone();
 
-                on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade).await;
+                on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade, 1).await;
             }
         }
         "tickers" => {

+ 12 - 11
strategy/src/bybit_usdt_swap.rs

@@ -29,7 +29,7 @@ pub(crate) async fn reference_bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
         ws.set_subscribe(vec![
             BybitSwapSubscribeType::PuTrade,
             BybitSwapSubscribeType::PuOrderBook1,
-            // BybitSwapSubscribeType::PuKline("1".to_string()),
+            BybitSwapSubscribeType::PuKline("1".to_string()),
             // BybitSwapSubscribeType::PuTickers
         ]);
 
@@ -78,11 +78,11 @@ pub(crate) async fn bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
                                    is_colo: bool,
                                    exchange_params: BTreeMap<String, String>) {
     // 参考
-    let name_c = name.clone();
-    let symbols_c = symbols.clone();
-    let is_shutdown_arc_c = is_shutdown_arc.clone();
-    let core_arc_c = core_arc.clone();
-    reference_bybit_swap_run(is_shutdown_arc_c, core_arc_c, name_c, symbols_c, is_colo).await;
+    // let name_c = name.clone();
+    // let symbols_c = symbols.clone();
+    // let is_shutdown_arc_c = is_shutdown_arc.clone();
+    // let core_arc_c = core_arc.clone();
+    // reference_bybit_swap_run(is_shutdown_arc_c, core_arc_c, name_c, symbols_c, is_colo).await;
 
     // 交易
     spawn(async move {
@@ -153,12 +153,14 @@ async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &Re
                 };
 
                 trace_stack.on_after_format();
-                on_depth(core_arc, &response.label, &mut trace_stack, &result_depth, 0).await;
+                on_depth(core_arc.clone(), &response.label, &mut trace_stack, &result_depth, 0).await;
+                on_depth(core_arc, &response.label, &mut trace_stack, &result_depth, 1).await;
             }
             // 全量
             else {
                 trace_stack.on_after_format();
-                on_depth(core_arc, &response.label, &mut trace_stack, &depth, 0).await;
+                on_depth(core_arc.clone(), &response.label, &mut trace_stack, &depth, 0).await;
+                on_depth(core_arc, &response.label, &mut trace_stack, &depth, 1).await;
 
                 depth_asks.clear();
                 depth_asks.append(&mut depth.asks);
@@ -173,9 +175,8 @@ async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &Re
             trace_stack.on_after_format();
 
             for trade in trades.iter_mut() {
-                let core_arc_clone = core_arc.clone();
-
-                on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade).await;
+                on_trade(core_arc.clone(), &response.label, &mut trace_stack, &trade, 0).await;
+                on_trade(core_arc.clone(), &response.label, &mut trace_stack, &trade, 1).await;
             }
         }
         "tickers" => {

+ 2 - 2
strategy/src/core.rs

@@ -608,8 +608,8 @@ impl Core {
         }
     }
 
-    pub async fn on_trade(&mut self, trade: &Trade, _name_ref: &String, _trace_stack: &mut TraceStack) {
-        self.avellaneda_stoikov.on_trade(trade).await;
+    pub async fn on_trade(&mut self, trade: &Trade, _name_ref: &String, _trace_stack: &mut TraceStack, index: usize) {
+        self.avellaneda_stoikov.on_trade(trade, index).await;
     }
 
     pub async fn on_ticker(&mut self, ticker: &Ticker, _trace_stack: &mut TraceStack) {

+ 2 - 2
strategy/src/exchange_disguise.rs

@@ -131,11 +131,11 @@ pub async fn on_depth(core_arc: Arc<Mutex<Core>>, label: &String, trace_stack: &
     core.on_depth(depth, &label, trace_stack, index).await;
 }
 
-pub async fn on_trade(core_arc: Arc<Mutex<Core>>, label: &String, trace_stack: &mut TraceStack, trade: &Trade) {
+pub async fn on_trade(core_arc: Arc<Mutex<Core>>, label: &String, trace_stack: &mut TraceStack, trade: &Trade, index: usize) {
     let mut core = core_arc.lock().await;
     trace_stack.on_after_unlock_core();
 
-    core.on_trade(trade, &label, trace_stack).await;
+    core.on_trade(trade, &label, trace_stack, index).await;
 }
 
 pub async fn on_ticker(core_arc: Arc<Mutex<Core>>, trace_stack: &mut TraceStack, ticker: &Ticker) {

+ 1 - 1
strategy/src/gate_usdt_swap.rs

@@ -121,7 +121,7 @@ async fn on_data(core_arc: Arc<Mutex<Core>>,
             for trade in trades.iter_mut() {
                 let core_arc_clone = core_arc.clone();
 
-                on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade).await;
+                on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade, 1).await;
             }
         }
         "futures.balances" => {

+ 2 - 2
strategy/src/strategy.rs

@@ -989,7 +989,7 @@ impl Strategy {
                 // debug!(?order);
                 command.limits_open.insert(client_id, order);
             } else {
-                info!("下单价值太小,要求:{},但是价格:{}, 数量:{}", self.min_amount_value, target_buy_price, amount);
+                info!("下单价值太小,要求:{},但是价格:{}, 数量:{}", self.min_amount_value, target_buy_price, amount);
             }
         }
         // 挂空单
@@ -1031,7 +1031,7 @@ impl Strategy {
                 // debug!(?order);
                 command.limits_open.insert(client_id, order);
             } else {
-                info!("下单价值太小,要求:{},但是价格:{}, 数量:{}", self.min_amount_value, target_sell_price, amount);
+                info!("下单价值太小,要求:{},但是价格:{}, 数量:{}, inv={}", self.min_amount_value, target_sell_price, amount, predictor.inventory.abs());
             }
         }
     }