Bladeren bron

接入vwpin

JiahengHe 1 jaar geleden
bovenliggende
commit
9c256b532a

+ 2 - 0
global/src/predictor_state.rs

@@ -22,4 +22,6 @@ pub struct PredictorState {
     pub sigma_square: Decimal,                                                  // σ^2,波动性的平方
     pub gamma: Decimal,                                                         // γ,库存风险厌恶参数
     pub kappa: Decimal,                                                         // κ 订单簿 流动性 参数
+
+    pub vwpin: Decimal,                                                         // vwpin值 知情交易概率
 }

+ 14 - 0
standard/src/lib.rs

@@ -160,6 +160,20 @@ pub struct Trade {
     pub symbol: String,
 }
 
+/// 单位时间的交易数据
+/// - `time(Decimal)`: 交易时间戳(s)
+/// - `size(Decimal)`: 交易量,负数是卖方
+/// - `price(Decimal)`: 成交价格
+/// - `value(Decimal)`: 成交价值(计价币对)
+/// - `symbol(String)`: 成交符号
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct UnitTradeInfo {
+    pub time: Decimal,
+    pub sell_size: Decimal,
+    pub buy_size: Decimal,
+    pub total_size: Decimal
+}
+
 /// 特殊压缩结构体(订单流)
 /// - `0(Decimal)`: id
 /// - `1(Decimal)`: 交易更新时间戳(ms)

+ 126 - 12
strategy/src/avellaneda_stoikov.rs

@@ -1,15 +1,23 @@
 use std::cmp::{max, min};
-use std::collections::{BTreeMap, VecDeque};
+use std::collections::{BTreeMap, HashMap, VecDeque};
 use std::sync::Arc;
 use chrono::Utc;
 use rust_decimal::prelude::*;
 use rust_decimal_macros::dec;
 use tokio::sync::Mutex;
-use tracing::info;
+use tracing::{info};
 use global::cci::CentralControlInfo;
 use global::fixed_time_range_deque::FixedTimeRangeDeque;
 use global::predictor_state::PredictorState;
-use standard::{Depth, Record, Ticker, Trade};
+use standard::{Depth, Record, Ticker, Trade, UnitTradeInfo};
+
+/**
+1. 微价格(加权中间价)作为基价
+    S = (ap*bv+bp*av)/(av+bv)
+2. ROC 值 下单风控
+3. VWPIN 值 下单风控
+
+**/
 
 #[derive(Debug)]
 pub struct AvellanedaStoikov {
@@ -18,10 +26,15 @@ pub struct AvellanedaStoikov {
     pub trade_short_vec: FixedTimeRangeDeque<Trade>,                            // 交易队列
     pub spread_vec: FixedTimeRangeDeque<Decimal>,
     pub record_vec: VecDeque<Record>,                                           // 蜡烛队列
+    pub vwpin_source_map: HashMap<i64, UnitTradeInfo>,
 
+    pub volume24h: Decimal,                                                     // 最新的24小时成交数量
     pub mid_price: Decimal,                                                     // 中间价
     pub ask_price: Decimal,                                                     // 卖一价
+    pub ask_size: Decimal,                                                      // 卖一量
     pub bid_price: Decimal,                                                     // 买一价
+    pub bid_size: Decimal,                                                      // 买一量
+
     pub last_price: Decimal,                                                    // 最后成交价
     pub spread: Decimal,                                                        // 市场冲击
     pub spread_max: Decimal,                                                    // 最大市场冲击
@@ -35,6 +48,10 @@ pub struct AvellanedaStoikov {
     pub gamma: Decimal,                                                         // γ,库存风险厌恶参数
     pub kappa: Decimal,                                                         // κ 订单簿 流动性 参数
 
+    pub vwpin: Decimal,                                                         // vwpin值
+    pub vwpin_avg: Decimal,                                                     // vwpin平均值
+    pub vwpin_vec: VecDeque<Decimal>,
+
     pub flow_ratio: Decimal,                                                    // 资金流比例
     pub flow_ratio_short: Decimal,                                              // 资金流比例
     pub money_flow_index: Decimal,                                              // MFI
@@ -70,9 +87,14 @@ impl AvellanedaStoikov {
             trade_short_vec: FixedTimeRangeDeque::new(Self::TRADE_SHORT_RANGE_MICROS),
             record_vec: VecDeque::new(),
 
+            vwpin_source_map: Default::default(),
+            volume24h: Default::default(),
+
             mid_price: Default::default(),
             ask_price: Default::default(),
+            ask_size: Default::default(),
             bid_price: Default::default(),
+            bid_size: Default::default(),
             last_price: Default::default(),
             spread: Default::default(),
             spread_max: Default::default(),
@@ -100,6 +122,10 @@ impl AvellanedaStoikov {
             flow_ratio: Default::default(),
             flow_ratio_short: Default::default(),
             money_flow_index: Default::default(),
+
+            vwpin: Default::default(),
+            vwpin_avg: Default::default(),
+            vwpin_vec: VecDeque::new(),
         };
 
         avellaneda_stoikov
@@ -159,22 +185,69 @@ impl AvellanedaStoikov {
         self.processor().await;
     }
 
-    pub async fn on_trade(&mut self, trade: &Trade) {
-        self.trade_long_vec.push_back(trade.clone());
-        self.trade_short_vec.push_back(trade.clone());
+    pub async fn on_trade(&mut self, trades: Vec<Trade>) {
+        for trade in trades {
+            self.trade_long_vec.push_back(trade.clone());
+            self.trade_short_vec.push_back(trade.clone());
+
+            self.last_price = trade.price;
+            self.update_spread();
+            self.wvpin_source_update(&trade);
+        }
 
-        self.last_price = trade.price;
-        self.update_spread();
         self.processor().await;
     }
 
+    pub fn wvpin_source_update(&mut self, trade: &Trade) {
+        // 秒级
+        let time_key = trade.time.to_i64().unwrap() / 1000;
+
+        if self.vwpin_source_map.contains_key(&time_key) {
+            let value = self.vwpin_source_map.get_mut(&time_key).unwrap();
+            if trade.size > Decimal::ZERO {
+                value.buy_size = value.buy_size + trade.size;
+            } else {
+                value.sell_size = value.sell_size + trade.size.abs();
+            }
+            value.total_size = self.volume24h;
+        } else {
+            let mut unit_trade_info = UnitTradeInfo{
+                time: trade.time,
+                sell_size: Decimal::ZERO,
+                buy_size: Decimal::ZERO,
+                total_size: self.volume24h,
+            };
+            if trade.size > Decimal::ZERO {
+                unit_trade_info.buy_size = trade.size.abs();
+            } else {
+                unit_trade_info.sell_size = trade.size.abs();
+            }
+
+            self.vwpin_source_map.insert(time_key, unit_trade_info);
+        }
+        // 保留60单位秒以内的数据
+        if self.vwpin_source_map.len() > 60 {
+            // 找到具有最小时间戳的键并移除
+            if let Some(&oldest_key) = self.vwpin_source_map.keys().min() {
+                self.vwpin_source_map.remove(&oldest_key);
+            }
+        }
+
+        // 更新vwpin
+        self.update_vwpin();
+    }
+
+    pub async fn on_ticker(&mut self, ticker: &Ticker) {
+        if ticker.volume > Decimal::ZERO{
+            self.volume24h = ticker.volume;
+        }
+    }
+
     pub async fn update_level(&mut self) {
         self.level = (Decimal::NEGATIVE_ONE + (Decimal::ONE + dec!(8) * self.inventory.abs()).sqrt().unwrap()) / Decimal::TWO;
         self.level = min(self.level, dec!(6));
     }
 
-    pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
-
     pub async fn on_record(&mut self, record: &Record) {
         // 添加新蜡烛
         if self.record_vec.len() == 0 {
@@ -449,6 +522,38 @@ impl AvellanedaStoikov {
         self.flow_ratio_short = Self::calc_flow_ratio(&self.flow_ratio_short, &dec!(0), &mut self.trade_long_vec);
     }
 
+    pub fn update_vwpin(&mut self) {
+        if self.vwpin_source_map.len() == 60 {
+            let mut basic_vec: Vec<Decimal> = Vec::new();
+
+            for (_key, value) in &self.vwpin_source_map {
+                if value.total_size > Decimal::ZERO {
+                    let total_size = value.sell_size + value.buy_size;
+                    let vwpin_item = total_size / self.volume24h * (value.sell_size - value.buy_size).abs() / total_size;
+                    basic_vec.push(vwpin_item);
+                }
+            }
+
+            if basic_vec.len() > 0{
+                let vwpin = basic_vec.iter().copied().sum();
+                self.vwpin = vwpin;
+               // 10s后开始更新平均值
+               if basic_vec.len() > 10 {
+                   // 限制记录历史vwpin值数量,数量阈值100·
+                   if self.vwpin_vec.len() == 100 {
+                       self.vwpin_vec.pop_front();
+                   }
+                   self.vwpin_vec.push_back(vwpin);
+                   // 满数据后计算平均值
+                   if self.vwpin_vec.len() == 100 {
+                       let count_vwpin: Decimal = self.vwpin_vec.iter().copied().sum();
+                       self.vwpin_avg = count_vwpin / Decimal::from(self.vwpin_vec.len());
+                   }
+               }
+            }
+        }
+    }
+
     pub fn check_ready(&mut self) {
         if self.is_ready {
             return;
@@ -478,6 +583,14 @@ impl AvellanedaStoikov {
             return;
         }
 
+        if self.vwpin_source_map.len() < 60 {
+            return;
+        }
+
+        if self.vwpin_vec.len() < 60 {
+            return;
+        }
+
         if self.trade_long_vec.len() < 100 {
             return;
         }
@@ -525,9 +638,10 @@ impl AvellanedaStoikov {
             optimal_bid_price: self.optimal_bid_price,
 
             inventory: self.inventory,
-            sigma_square: self.money_flow_index,
+            sigma_square: self.vwpin_avg,
             gamma: self.flow_ratio_short,
-            kappa: self.t_diff,
+            kappa: self.vwpin,
+            vwpin: self.vwpin,
 
             flow_ratio: self.flow_ratio,
             ref_price: self.ref_price,

+ 5 - 5
strategy/src/binance_usdt_swap.rs

@@ -105,14 +105,14 @@ async fn on_data(core_arc: Arc<Mutex<Core>>, multiplier: &Decimal, run_symbol: &
         }
         "aggTrade" => {
             trace_stack.set_source("binance_usdt_swap.aggTrade".to_string());
-            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BinanceSwap, response, multiplier);
+            let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BinanceSwap, response, multiplier);
             trace_stack.on_after_format();
 
-            for trade in trades.iter_mut() {
-                let core_arc_clone = core_arc.clone();
+            // for trade in trades.iter_mut() {
+            let core_arc_clone = core_arc.clone();
 
-                on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade).await;
-            }
+            on_trade(core_arc_clone, &response.label, &mut trace_stack, trades).await;
+            // }
         }
         "bookTicker" => {
             trace_stack.set_source("binance_usdt_swap.bookTicker".to_string());

+ 4 - 5
strategy/src/bybit_usdt_swap.rs

@@ -29,6 +29,7 @@ pub(crate) async fn reference_bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
         ws.set_subscribe(vec![
             BybitSwapSubscribeType::PuOrderBook1,
             BybitSwapSubscribeType::PuTrade,
+            BybitSwapSubscribeType::PuTickers,
             BybitSwapSubscribeType::PuKline("1".to_string()),
             // BybitSwapSubscribeType::PuTickers
         ]);
@@ -159,14 +160,12 @@ async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &Re
         "trade" => {
             trace_stack.set_source("bybit_usdt_swap.trade".to_string());
 
-            let mut trades = ExchangeStructHandler::trades_handle(BybitSwap, response, mul);
+            let trades = ExchangeStructHandler::trades_handle(BybitSwap, response, mul);
             trace_stack.on_after_format();
 
-            for trade in trades.iter_mut() {
-                let core_arc_clone = core_arc.clone();
+            let core_arc_clone = core_arc.clone();
 
-                on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade).await;
-            }
+            on_trade(core_arc_clone, &response.label, &mut trace_stack, trades).await;
         }
         "tickers" => {
             trace_stack.set_source("bybit_usdt_swap.tickers".to_string());

+ 1 - 1
strategy/src/core.rs

@@ -608,7 +608,7 @@ impl Core {
         }
     }
 
-    pub async fn on_trade(&mut self, trade: &Trade, _name_ref: &String, _trace_stack: &mut TraceStack) {
+    pub async fn on_trade(&mut self, trade: Vec<Trade>, _name_ref: &String, _trace_stack: &mut TraceStack) {
         self.avellaneda_stoikov.on_trade(trade).await;
     }
 

+ 1 - 1
strategy/src/exchange_disguise.rs

@@ -130,7 +130,7 @@ pub async fn on_depth(core_arc: Arc<Mutex<Core>>, label: &String, trace_stack: &
     core.on_depth(depth, &label, trace_stack).await;
 }
 
-pub async fn on_trade(core_arc: Arc<Mutex<Core>>, label: &String, trace_stack: &mut TraceStack, trade: &Trade) {
+pub async fn on_trade(core_arc: Arc<Mutex<Core>>, label: &String, trace_stack: &mut TraceStack, trade: Vec<Trade>) {
     let mut core = core_arc.lock().await;
     trace_stack.on_after_unlock_core();
 

+ 4 - 4
strategy/src/gate_usdt_swap.rs

@@ -115,14 +115,14 @@ async fn on_data(core_arc: Arc<Mutex<Core>>,
         }
         "futures.trades" => {
             trace_stack.set_source("gate_usdt_swap.trades".to_string());
-            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSwap, response, multiplier);
+            let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSwap, response, multiplier);
             trace_stack.on_after_format();
 
-            for trade in trades.iter_mut() {
+            // for trade in trades.iter_mut() {
                 let core_arc_clone = core_arc.clone();
 
-                on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade).await;
-            }
+                on_trade(core_arc_clone, &response.label, &mut trace_stack, trades).await;
+            // }
         }
         "futures.balances" => {
             let account = ExchangeStructHandler::account_info_handle(ExchangeEnum::GateSwap, response, &run_symbol);

+ 14 - 1
strategy/src/strategy.rs

@@ -101,6 +101,9 @@ pub struct Strategy {
     pub post_side: i64,                                             // 交易方向
     pub trade_vol_24h_w: Decimal,                                   // 24小时成交额(单位:万)
     pub grid: Decimal,                                              // 网格数量
+
+    // pub open_num: i64,
+    // pub no_open_num: i64
 }
 
 impl Strategy {
@@ -191,6 +194,8 @@ impl Strategy {
             post_side: 0,
             trade_vol_24h_w: Default::default(),
             grid: Decimal::from(params.grid),
+            // open_num: 0,
+            // no_open_num: 0,
         };
 
         // 交易名字
@@ -1110,7 +1115,15 @@ impl Strategy {
         }
 
         self._cancel_open(&mut command, local_orders);              // 撤单命令处理
-        self._post_open(&mut command, local_orders, predictor);     // 限价单命令处理
+        if predictor.vwpin_avg!=Decimal::ZERO && predictor.vwpin > predictor.vwpin_avg {
+           // self.no_open_num += 1;
+        } else {
+           // self.open_num += 1;
+            self._post_open(&mut command, local_orders, predictor);     // 限价单命令处理
+        }
+        // info!("允许开仓信号数: {}, 不允许开仓信号数: :{}", self.open_num, self.no_open_num);
+        // self._post_open(&mut command, local_orders, predictor);     // 限价单命令处理
+
         self._check_local_orders(&mut command, local_orders);       // 固定时间检查超时订单
         self._update_in_cancel(&mut command, local_orders);         // 更新撤单队列,是一个filter
         self._check_request_limit(&mut command);                    // 限制频率,移除不合规则之订单,是一个filter