Ver código fonte

与S1一样了,重点研究资金流。

skyffire 1 ano atrás
pai
commit
3457300854

+ 2 - 1
standard/Cargo.toml

@@ -19,4 +19,5 @@ futures = "0.3"
 tracing = "0.1"
 tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
 toml = "0.5.11"
-futures-channel = "0.3.29"
+futures-channel = "0.3.29"
+lazy_static = "1.4.0"

+ 4 - 2
standard/src/binance_swap.rs

@@ -177,13 +177,14 @@ impl Platform for BinanceSwap {
         if res_data.code == 200 {
             let res_data_json: serde_json::Value = res_data.data;
             let result = Ticker {
-                time: res_data_json["time"].as_i64().unwrap(),
+                time: Decimal::from_i64(res_data_json["time"].as_i64().unwrap()).unwrap(),
                 high: Decimal::from_str(res_data_json["askPrice"].as_str().unwrap()).unwrap(),
                 low: Decimal::from_str(res_data_json["bidPrice"].as_str().unwrap()).unwrap(),
                 sell: Decimal::from_str(res_data_json["askPrice"].as_str().unwrap()).unwrap(),
                 buy: Decimal::from_str(res_data_json["bidPrice"].as_str().unwrap()).unwrap(),
                 last: dec!(-1),
                 volume: dec!(-1),
+                open_interest: Default::default(),
             };
             Ok(result)
         } else {
@@ -197,13 +198,14 @@ impl Platform for BinanceSwap {
         if res_data.code == 200 {
             let res_data_json: serde_json::Value = res_data.data;
             let result = Ticker {
-                time: res_data_json["time"].as_i64().unwrap(),
+                time: Decimal::from_i64(res_data_json["time"].as_i64().unwrap()).unwrap(),
                 high: Decimal::from_str(res_data_json["askPrice"].as_str().unwrap()).unwrap(),
                 low: Decimal::from_str(res_data_json["bidPrice"].as_str().unwrap()).unwrap(),
                 sell: Decimal::from_str(res_data_json["askPrice"].as_str().unwrap()).unwrap(),
                 buy: Decimal::from_str(res_data_json["bidPrice"].as_str().unwrap()).unwrap(),
                 last: dec!(-1),
                 volume: dec!(-1),
+                open_interest: Default::default(),
             };
             Ok(result)
         } else {

+ 6 - 4
standard/src/bybit_swap.rs

@@ -193,13 +193,14 @@ impl Platform for BybitSwap {
             }
             let value = list[0].clone();
             Ok(Ticker{
-                time: chrono::Utc::now().timestamp_millis(),
+                time: Decimal::from_i64(chrono::Utc::now().timestamp_millis()).unwrap(),
                 high: value.high_price24h,
                 low: value.low_price24h,
                 sell: value.ask1_price,
                 buy: value.bid1_price,
                 last: value.last_price,
-                volume: value.volume24h
+                volume: value.volume24h,
+                open_interest: Default::default(),
             })
         } else {
             Err(Error::new(ErrorKind::Other, res_data.to_string()))
@@ -220,13 +221,14 @@ impl Platform for BybitSwap {
                 }
                 Some(value) => {
                     let result = Ticker {
-                        time: chrono::Utc::now().timestamp_millis(),
+                        time: Decimal::from_i64(chrono::Utc::now().timestamp_millis()).unwrap(),
                         high: value.high_price24h,
                         low: value.low_price24h,
                         sell: value.ask1_price,
                         buy: value.bid1_price,
                         last: value.last_price,
-                        volume: value.volume24h
+                        volume: value.volume24h,
+                        open_interest: Default::default(),
                     };
                     Ok(result)
                 }

+ 47 - 2
standard/src/bybit_swap_handle.rs

@@ -1,12 +1,16 @@
 use std::str::FromStr;
-use rust_decimal::Decimal;
+use std::sync::Arc;
+use chrono::Utc;
+use lazy_static::lazy_static;
+use rust_decimal::{Decimal};
 use rust_decimal::prelude::FromPrimitive;
 use serde_json::{from_value, Value};
+use tokio::sync::Mutex;
 use tokio::time::Instant;
 use tracing::{error};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
-use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialOrder, Depth, Trade};
+use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialOrder, Depth, Trade, Ticker};
 
 // 处理账号信息
 pub fn handle_account_info(res_data: &ResponseData, symbol: &String) -> Account {
@@ -193,6 +197,47 @@ pub fn handle_book_ticker(res_data: &ResponseData, mul: &Decimal) -> Depth {
     }
 }
 
+lazy_static! {
+    static ref TICKER: Arc<Mutex<Ticker>> = Arc::new(Mutex::new(Ticker::new()));
+}
+
+pub async fn handle_ticker(res_data: &ResponseData) -> Ticker {
+    let mut ticker = TICKER.lock().await;
+
+    ticker.time = Decimal::from_i64(Utc::now().timestamp_millis()).unwrap();
+    ticker.high = match res_data.data["highPrice24h"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.high }
+    };
+    ticker.low = match res_data.data["lowPrice24h"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.low }
+    };
+    ticker.sell = match res_data.data["ask1Price"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.sell }
+    };
+    ticker.buy = match res_data.data["bid1Price"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.buy }
+    };
+    ticker.last = match res_data.data["lastPrice"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.last }
+    };
+    ticker.volume = match res_data.data["volume24h"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.volume }
+    };
+    ticker.open_interest = match res_data.data["openInterest"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.open_interest }
+    };
+    // let s = res_data.data["symbol"].as_str().unwrap().replace("USDT", "_USDT");
+
+    ticker.clone()
+}
+
 pub fn format_depth_items(value: Value, mul: &Decimal) -> Vec<OrderBook> {
     let mut depth_items: Vec<OrderBook> = vec![];
     for val in value.as_array().unwrap() {

+ 4 - 2
standard/src/coinex_swap.rs

@@ -182,13 +182,14 @@ impl Platform for CoinexSwap {
                 }
                 Some(value) => {
                     let result = Ticker {
-                        time: chrono::Utc::now().timestamp_millis(),
+                        time: Decimal::from_i64(chrono::Utc::now().timestamp_millis()).unwrap(),
                         high: Decimal::from_str(value["high"].as_str().unwrap()).unwrap(),
                         low: Decimal::from_str(value["low"].as_str().unwrap()).unwrap(),
                         sell: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         buy: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         last: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         volume: Decimal::from_str(value["volume"].as_str().unwrap()).unwrap(),
+                        open_interest: Default::default(),
                     };
                     Ok(result)
                 }
@@ -211,13 +212,14 @@ impl Platform for CoinexSwap {
                 }
                 Some(value) => {
                     let result = Ticker {
-                        time: chrono::Utc::now().timestamp_millis(),
+                        time: Decimal::from_i64(chrono::Utc::now().timestamp_millis()).unwrap(),
                         high: Decimal::from_str(value["high"].as_str().unwrap()).unwrap(),
                         low: Decimal::from_str(value["low"].as_str().unwrap()).unwrap(),
                         sell: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         buy: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         last: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         volume: Decimal::from_str(value["volume"].as_str().unwrap()).unwrap(),
+                        open_interest: Default::default(),
                     };
                     Ok(result)
                 }

+ 13 - 3
standard/src/exchange_struct_handler.rs

@@ -4,7 +4,7 @@ use rust_decimal::prelude::FromPrimitive;
 use tracing::error;
 use exchanges::response_base::ResponseData;
 use crate::exchange::ExchangeEnum;
-use crate::{binance_swap_handle, bybit_swap_handle, coinex_swap_handle, gate_swap_handle};
+use crate::{binance_swap_handle, bybit_swap_handle, coinex_swap_handle, gate_swap_handle, Ticker};
 use crate::{Record, Trade, Depth};
 use crate::{Account, OrderBook, Position, SpecialOrder};
 
@@ -98,7 +98,6 @@ impl ExchangeStructHandler {
             symbol,
         }
     }
-
     // 处理成交信息,关于mul的问题:
     //     因为部分交易所比较特殊,返回的深度的数量是张数,这里是标准化成U量的形式;
     //     如果是不需要处理的交易所,传个Decimal::ONE就行了
@@ -158,7 +157,6 @@ impl ExchangeStructHandler {
             // }
         }
     }
-
     // 处理BookTicker信息
     pub fn book_ticker_handle(exchange: ExchangeEnum, res_data: &ResponseData, mul: &Decimal) -> Depth {
         match exchange {
@@ -231,6 +229,18 @@ impl ExchangeStructHandler {
             }
         }
     }
+    // 处理ticker信息
+    pub async fn ticker_handle(exchange: ExchangeEnum, res_data: &ResponseData) -> Ticker {
+        match exchange {
+            ExchangeEnum::BybitSwap => {
+                bybit_swap_handle::handle_ticker(res_data).await
+            },
+            _ => {
+                error!("未找到该交易所!trades_handle: {:?}", exchange);
+                panic!("未找到该交易所!trades_handle: {:?}", exchange);
+            }
+        }
+    }
     // 处理账号信息
     pub fn account_info_handle(exchange: ExchangeEnum, res_data: &ResponseData, symbol: &String) -> Account {
         match exchange {

+ 4 - 2
standard/src/gate_swap.rs

@@ -165,13 +165,14 @@ impl Platform for GateSwap {
                 }
                 Some(value) => {
                     let result = Ticker {
-                        time: chrono::Utc::now().timestamp_millis(),
+                        time: Decimal::from_i64(chrono::Utc::now().timestamp_millis()).unwrap(),
                         high: Decimal::from_str(value["high_24h"].as_str().unwrap()).unwrap(),
                         low: Decimal::from_str(value["low_24h"].as_str().unwrap()).unwrap(),
                         sell: Decimal::from_str(value["lowest_ask"].as_str().unwrap()).unwrap(),
                         buy: Decimal::from_str(value["highest_bid"].as_str().unwrap()).unwrap(),
                         last: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         volume: Decimal::from_str(value["volume_24h"].as_str().unwrap()).unwrap(),
+                        open_interest: Default::default(),
                     };
                     Ok(result)
                 }
@@ -195,13 +196,14 @@ impl Platform for GateSwap {
                 }
                 Some(value) => {
                     let result = Ticker {
-                        time: chrono::Utc::now().timestamp_millis(),
+                        time: Decimal::from_i64(chrono::Utc::now().timestamp_millis()).unwrap(),
                         high: Decimal::from_str(value["high_24h"].as_str().unwrap()).unwrap(),
                         low: Decimal::from_str(value["low_24h"].as_str().unwrap()).unwrap(),
                         sell: Decimal::from_str(value["lowest_ask"].as_str().unwrap()).unwrap(),
                         buy: Decimal::from_str(value["highest_bid"].as_str().unwrap()).unwrap(),
                         last: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         volume: Decimal::from_str(value["volume_24h"].as_str().unwrap()).unwrap(),
+                        open_interest: Default::default(),
                     };
                     Ok(result)
                 }

+ 4 - 2
standard/src/lib.rs

@@ -416,25 +416,27 @@ impl Order {
 /// - `volume(Decimal)`: 最近成交量
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Ticker {
-    pub time: i64,
+    pub time: Decimal,
     pub high: Decimal,
     pub low: Decimal,
     pub sell: Decimal,
     pub buy: Decimal,
     pub last: Decimal,
     pub volume: Decimal,
+    pub open_interest: Decimal,
 }
 
 impl Ticker {
     pub fn new() -> Ticker {
         Ticker {
-            time: 0,
+            time: Default::default(),
             high: Default::default(),
             low: Default::default(),
             sell: Default::default(),
             buy: Default::default(),
             last: Default::default(),
             volume: Default::default(),
+            open_interest: Default::default(),
         }
     }
 }

+ 116 - 130
strategy/src/avellaneda_stoikov.rs

@@ -48,17 +48,12 @@ pub struct AvellanedaStoikov {
 
     pub is_ready: bool,
     pub prev_trade_time: i64,                                                   // 上次交易时间,也就是t
-    pub cross_time: i64,                                                        // 上次穿過0軸的時間
-    pub cross_time_diff: i64,                                                   // 穿越0軸的diff
-    pub cross_time_diff_avg: i64,                                               // 穿越0軸的diff的平均值
     pub t_diff: Decimal,                                                        // (T-t)
-    pub prev_close_time: i64,                                                   // 上次平倉時間
-    pub prev_prev_close_time: i64,                                              // 上上次平倉時間
 }
 
 impl AvellanedaStoikov {
     // 时间窗口大小(微秒)
-    const MAX_TIME_RANGE_MICROS: i64 = 5 * 60_000_000;
+    const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000;
     const TRADE_LONG_RANGE_MICROS: i64 = 3 * 60_000_000;
     const TRADE_SHORT_RANGE_MICROS: i64 = 20_000_000;
     // const ONE_MILLION: Decimal = dec!(1_000_000);
@@ -99,14 +94,9 @@ impl AvellanedaStoikov {
 
             is_ready: false,
             prev_trade_time: Utc::now().timestamp_micros(),
-            cross_time: 0,
-            cross_time_diff: 0,
-            cross_time_diff_avg: 0,
             t_diff: Default::default(),
             flow_ratio_long: Decimal::ONE,
             level: Default::default(),
-            prev_close_time: Utc::now().timestamp_micros(),
-            prev_prev_close_time: Utc::now().timestamp_micros(),
             flow_ratio_short: Default::default(),
         };
 
@@ -185,16 +175,12 @@ impl AvellanedaStoikov {
         self.prev_trade_time = Utc::now().timestamp_micros();
         self.inventory = (inventory / (min_amount_value / self.mid_price)).round();
 
-        // if self.inventory.is_zero() {
-        //     self.prev_prev_close_time = self.prev_close_time;
-        //     self.prev_close_time = Utc::now().timestamp_millis();
-        // }
         self.update_level().await;
         self.processor().await;
     }
 
     pub fn update_sigma_square(&mut self) {
-        self.sigma_square = self.spread_max * dec!(1.618);
+        self.sigma_square = self.spread_max * dec!(0.5);
         self.sigma_square.rescale(10);
     }
 
@@ -206,7 +192,7 @@ impl AvellanedaStoikov {
         // };
         // self.gamma.rescale(8);
 
-        self.gamma = dec!(1) * Self::IRA;
+        self.gamma = dec!(0.236) * Self::IRA;
     }
 
     pub fn update_kappa(&mut self) {
@@ -233,56 +219,43 @@ impl AvellanedaStoikov {
     }
 
     pub fn update_delta(&mut self) {
-        let pos_edge = self.gamma * self.sigma_square * self.inventory.abs().powd(Decimal::TWO) * self.t_diff;
-
-        self.base_delta = self.sigma_square;
+        if self.gamma != Decimal::ZERO {
+            // let pos_edge = if self.inventory.abs() < dec!(5) {
+            //     self.gamma * self.sigma_square * self.inventory.abs().powd(Decimal::TWO) * self.t_diff
+            // } else {
+            //     Decimal::PI * self.gamma * self.sigma_square * self.inventory.abs().powd(Decimal::TWO) * self.t_diff
+            // };
+            let pos_edge = self.gamma * self.sigma_square * self.inventory.abs().powd(dec!(2)) * self.t_diff;
 
-        self.bid_delta = self.base_delta;
-        self.ask_delta = self.base_delta;
+            self.base_delta = self.gamma * self.sigma_square * self.t_diff / Decimal::TWO + (Decimal::ONE / self.gamma) * (Decimal::ONE + self.gamma / self.kappa).ln();
+            self.ratio_edge = self.flow_ratio_long * self.sigma_square;
 
-        if self.inventory > Decimal::ZERO {
-            self.bid_delta += pos_edge;
-        } else if self.inventory < Decimal::ZERO {
-            self.ask_delta += pos_edge;
-        }
+            self.bid_delta = self.base_delta;
+            self.ask_delta = self.base_delta;
 
-        if self.flow_ratio_long.is_zero() || self.cross_time_diff < 20 {
-            self.ask_delta += self.base_delta;
-            self.bid_delta += self.base_delta;
-
-            return;
-        }
+            if self.inventory > Decimal::ZERO {
+                self.bid_delta += pos_edge;
+            } else if self.inventory < Decimal::ZERO {
+                self.ask_delta += pos_edge;
+            }
 
-        if self.flow_ratio_long < Decimal::ZERO {
-            if self.flow_ratio_short > Decimal::ZERO {
-                self.ask_delta -= self.base_delta * (self.flow_ratio_short.abs() * Decimal::PI);
-                self.bid_delta += self.base_delta;
-            } else if self.flow_ratio_short < Decimal::ZERO && self.inventory < Decimal::ZERO {
-                self.ask_delta += self.base_delta;
-                // self.bid_delta -= self.base_delta * (self.flow_ratio_short.abs() + self.flow_ratio_long.abs());
-                self.bid_delta -= self.base_delta * (self.flow_ratio_short.abs() * dec!(1.5));
-            } else {
-                self.ask_delta += self.base_delta;
-                self.bid_delta += self.base_delta;
+            if self.ratio_edge > Decimal::ZERO {
+                self.ask_delta = self.ask_delta - self.ratio_edge.abs() * (Decimal::TWO - self.t_diff);
+                self.bid_delta = self.bid_delta + self.ratio_edge.abs() * dec!(16);
+            } else if self.ratio_edge < Decimal::ZERO {
+                self.ask_delta = self.ask_delta + self.ratio_edge.abs() * dec!(16);
+                self.bid_delta = self.bid_delta - self.ratio_edge.abs() * (Decimal::TWO - self.t_diff);
             }
-        } else if self.flow_ratio_long > Decimal::ZERO {
-            if self.flow_ratio_short > Decimal::ZERO && self.inventory > Decimal::ZERO {
-                // self.ask_delta -= self.base_delta * (self.flow_ratio_short.abs() + self.flow_ratio_long.abs());
-                self.ask_delta -= self.base_delta * (self.flow_ratio_short.abs() * dec!(1.5));
-                self.bid_delta += self.base_delta;
-            } else if self.flow_ratio_short < Decimal::ZERO {
-                self.ask_delta += self.base_delta;
-                self.bid_delta -= self.base_delta * (self.flow_ratio_short.abs() * Decimal::PI);
-            } else {
-                self.ask_delta += self.base_delta;
-                self.bid_delta += self.base_delta;
+
+            if self.init_delta_plus.is_zero() {
+                self.init_delta_plus = (self.bid_delta + self.ask_delta) / Decimal::TWO
             }
         }
     }
 
     pub fn update_optimal_ask_and_bid(&mut self) {
-        self.optimal_ask_price = max(self.ref_price + self.ask_delta / dec!(2), self.ask_price);
-        self.optimal_bid_price = min(self.ref_price - self.bid_delta / dec!(2), self.bid_price);
+        self.optimal_ask_price = max(self.ref_price + self.ask_delta / Decimal::TWO, self.ask_price);
+        self.optimal_bid_price = min(self.ref_price - self.bid_delta / Decimal::TWO, self.bid_price);
     }
 
     pub fn update_t_diff(&mut self) {
@@ -295,46 +268,35 @@ impl AvellanedaStoikov {
     }
 
     fn calc_flow_ratio(_prev_flow_ratio: &Decimal, min_volume: &Decimal, trades: &mut FixedTimeRangeDeque<Trade>) -> Decimal {
-        // 使用EMA來更新資金流,確保平滑性
-        // let a = Decimal::TWO / dec!(50);
-
-        let mut flow_in_value = Decimal::ZERO;
-        let mut flow_out_value = Decimal::ZERO;
-        for (index, trade_iter) in trades.deque.iter().enumerate() {
-            if index == 0 {
-                continue
-            }
-
-            let prev_trade_iter = trades.deque.get(index - 1).unwrap();
-            let trade = trade_iter;
-            if trade.price > prev_trade_iter.price {
-                flow_in_value += trade.value * (prev_trade_iter.price - trade.price).abs();
-                // flow_in_value += Decimal::ONE;
-            } else if trade.price < prev_trade_iter.price {
-                flow_out_value += trade.value * (prev_trade_iter.price - trade.price).abs();
-                // flow_out_value += Decimal::ONE;
-            } else {
-                // if trade.size > Decimal::ZERO {
-                //     flow_in_value += trade.value;
-                // } else {
-                //     flow_out_value += trade.value;
-                // }
-            }
-
-            // if trade_iter.size > Decimal::ZERO {
-            //     flow_in_value += trade_iter.value;
-            // } else {
-            //     flow_out_value += trade_iter.value;
-            // }
-        }
-
-        if flow_out_value + flow_in_value > *min_volume {
-            // let now = (flow_in_value - flow_out_value) / (flow_out_value + flow_in_value);
-            // a * now + (Decimal::ONE - a) * prev_flow_ratio
-            (flow_in_value - flow_out_value) / (flow_out_value + flow_in_value)
-        } else {
-            Decimal::ZERO
-        }
+        // let mut flow_in_value = Decimal::ZERO;
+        // let mut flow_out_value = Decimal::ZERO;
+        // for (index, trade_iter) in trades.deque.iter().enumerate() {
+        //     if index == 0 {
+        //         continue
+        //     }
+        //
+        //     let prev_trade_iter = trades.deque.get(index - 1).unwrap();
+        //     let trade = trade_iter;
+        //     if trade.price > prev_trade_iter.price {
+        //         flow_in_value += trade.value * (prev_trade_iter.price - trade.price).abs();
+        //         // flow_in_value += Decimal::ONE;
+        //     } else if trade.price < prev_trade_iter.price {
+        //         flow_out_value += trade.value * (prev_trade_iter.price - trade.price).abs();
+        //         // flow_out_value += Decimal::ONE;
+        //     } else {
+        //         // if trade.size > Decimal::ZERO {
+        //         //     flow_in_value += trade.value;
+        //         // } else {
+        //         //     flow_out_value += trade.value;
+        //         // }
+        //     }
+        //
+        //     // if trade_iter.size > Decimal::ZERO {
+        //     //     flow_in_value += trade_iter.value;
+        //     // } else {
+        //     //     flow_out_value += trade_iter.value;
+        //     // }
+        // }
 
         // if self.trade_vec.deque.len() > 1 {
         //     let prev_trade_iter = self.trade_vec.deque.get(self.trade_vec.deque.len() - 2).unwrap();
@@ -369,42 +331,66 @@ impl AvellanedaStoikov {
         //     }
         // }
 
-        // self.flow_in_value = Decimal::ZERO;
-        // self.flow_out_value = Decimal::ZERO;
-        // for trade_iter in self.trade_vec.deque.iter() {
-        //     if trade_iter.size > Decimal::ZERO {
-        //         self.flow_in_value += trade_iter.value;
-        //     } else {
-        //         self.flow_out_value += trade_iter.value;
-        //     }
-        // }
+        let mut flow_in_value = Decimal::ZERO;
+        let mut flow_out_value = Decimal::ZERO;
+        for trade_iter in trades.deque.iter() {
+            if trade_iter.size > Decimal::ZERO {
+                flow_in_value += trade_iter.value;
+            } else {
+                flow_out_value += trade_iter.value;
+            }
+        }
+
+        // 使用EMA來更新資金流,確保平滑性
+        // let a = Decimal::TWO / dec!(50);
+        if flow_out_value + flow_in_value > *min_volume {
+            // let now = (flow_in_value - flow_out_value) / (flow_out_value + flow_in_value);
+            // a * now + (Decimal::ONE - a) * prev_flow_ratio
+            (flow_in_value - flow_out_value) / (flow_out_value + flow_in_value)
+        } else {
+            Decimal::ZERO
+        }
     }
 
-    pub fn update_flow_ratio(&mut self) {
-        let prev_flow_ratio_long = self.flow_ratio_long;
-        self.flow_ratio_long = Self::calc_flow_ratio(&self.flow_ratio_long, &dec!(0), &mut self.trade_long_vec);
-        let time = Utc::now().timestamp_millis();
-        // let mut is_cross = false;
-        if (self.flow_ratio_long > Decimal::ZERO && prev_flow_ratio_long <= Decimal::ZERO)
-            || (self.flow_ratio_long < Decimal::ZERO && prev_flow_ratio_long >= Decimal::ZERO) {
-            self.cross_time = time;
-            // is_cross = true;
+    fn calc_flow_ratio_2(_prev_flow_ratio: &Decimal, min_volume: &Decimal, trades: &mut FixedTimeRangeDeque<Trade>) -> Decimal {
+        let mut flow_in_value = Decimal::ZERO;
+        let mut flow_out_value = Decimal::ZERO;
+        for (index, trade_iter) in trades.deque.iter().enumerate() {
+            if index == 0 {
+                continue
+            }
+
+            let prev_trade_iter = trades.deque.get(index - 1).unwrap();
+            let trade = trade_iter;
+            if trade.price > prev_trade_iter.price {
+                flow_in_value += trade.value;
+                // flow_in_value += Decimal::ONE;
+            } else if trade.price < prev_trade_iter.price {
+                flow_out_value += trade.value;
+                // flow_out_value += Decimal::ONE;
+            } else {
+                if trade.size > Decimal::ZERO {
+                    flow_in_value += trade.value;
+                } else {
+                    flow_out_value += trade.value;
+                }
+            }
         }
 
-        if self.cross_time != 0 {
-            // let prev_cross_time_diff = self.cross_time_diff;
-            self.cross_time_diff = (time - self.cross_time) / 1000;
-
-            // if is_cross {
-            //     self.cross_time_diff_avg = if self.cross_time_diff_avg == 0 {
-            //         prev_cross_time_diff
-            //     } else {
-            //         (self.cross_time_diff_avg * 5 + prev_cross_time_diff * 5) / 10
-            //     };
-            // }
+        // 使用EMA來更新資金流,確保平滑性
+        // let a = Decimal::TWO / dec!(50);
+        if flow_out_value + flow_in_value > *min_volume {
+            // let now = (flow_in_value - flow_out_value) / (flow_out_value + flow_in_value);
+            // a * now + (Decimal::ONE - a) * prev_flow_ratio
+            (flow_in_value - flow_out_value) / (flow_out_value + flow_in_value)
+        } else {
+            Decimal::ZERO
         }
+    }
 
-        self.flow_ratio_short = Self::calc_flow_ratio(&self.flow_ratio_short, &dec!(0), &mut self.trade_short_vec);
+    pub fn update_flow_ratio(&mut self) {
+        self.flow_ratio_long = Self::calc_flow_ratio_2(&self.flow_ratio_long, &dec!(0), &mut self.trade_long_vec);
+        self.flow_ratio_short = Self::calc_flow_ratio(&self.flow_ratio_short, &dec!(0), &mut self.trade_long_vec);
     }
 
     pub fn check_ready(&mut self) {
@@ -449,7 +435,7 @@ impl AvellanedaStoikov {
         self.update_t_diff();
         // info!(?self.t_diff);
         self.update_flow_ratio();
-        // info!(?self.flow_ratio);
+        // info!(?self.flow_ratio_long);
         self.update_sigma_square();
         // info!(?self.sigma_square);
         self.update_gamma();
@@ -459,7 +445,7 @@ impl AvellanedaStoikov {
         self.update_ref_price();
         // info!(?self.ref_price);
         self.update_delta();
-        // info!(?self.delta_ask, ?self.delta_bid);
+        // info!(?self.ask_delta, ?self.bid_delta);
         self.update_optimal_ask_and_bid();
         // info!("=============================================");
 
@@ -485,7 +471,7 @@ impl AvellanedaStoikov {
             inventory: self.inventory,
             sigma_square: self.flow_ratio_long,
             gamma: self.flow_ratio_short,
-            kappa: Decimal::from(self.cross_time_diff),
+            kappa: self.t_diff,
 
             flow_ratio: self.flow_ratio_long,
             ref_price: self.ref_price,

+ 6 - 1
strategy/src/bybit_usdt_swap.rs

@@ -28,7 +28,8 @@ pub(crate) async fn reference_bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
         let mut ws = BybitSwapWs::new_label(name, is_colo, None, BybitSwapWsType::Public);
         ws.set_subscribe(vec![
             BybitSwapSubscribeType::PuOrderBook1,
-            BybitSwapSubscribeType::PuTrade
+            BybitSwapSubscribeType::PuTrade,
+            BybitSwapSubscribeType::PuTickers
         ]);
 
         // 读取数据
@@ -158,6 +159,10 @@ async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &Re
                 on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade).await;
             }
         }
+        "tickers" => {
+            let ticker = ExchangeStructHandler::ticker_handle(BybitSwap, response).await;
+            info!(?ticker)
+        }
         _ => {
             error!("未知推送类型");
             error!(?response);

+ 4 - 3
strategy/src/core.rs

@@ -503,7 +503,7 @@ impl Core {
                         // 更新策略时间
                         self.strategy.local_time = Utc::now().timestamp_millis();
                         // trace_stack.on_before_strategy();
-                        let mut order = self.strategy.do_strategy(&mut self.avellaneda_stoikov, &self.local_orders);
+                        let mut order = self.strategy.do_strategy(&mut self.avellaneda_stoikov, &self.local_orders, &self.local_coin, &self.local_cash);
                         // trace_stack.on_after_strategy();
                         // 记录指令触发信息
                         if order.is_not_empty() {
@@ -637,7 +637,7 @@ impl Core {
             self.strategy.local_time = Utc::now().timestamp_millis();
 
             // 产生交易信号
-            let mut orders = self.strategy.do_strategy(&mut self.avellaneda_stoikov, &self.local_orders);
+            let mut orders = self.strategy.do_strategy(&mut self.avellaneda_stoikov, &self.local_orders, &self.local_coin, &self.local_cash);
             trace_stack.on_after_strategy();
 
             if orders.is_not_empty() {
@@ -745,13 +745,14 @@ impl Core {
             let bp = self.tickers.get(i).unwrap().buy;
             let ap = self.tickers.get(i).unwrap().sell;
             ref_tickers.insert(i.clone(), Ticker {
-                time: 0,
+                time: Decimal::ZERO,
                 high: Default::default(),
                 low: Default::default(),
                 sell: ap,
                 buy: bp,
                 last: Default::default(),
                 volume: Default::default(),
+                open_interest: Default::default(),
             });
         }
         self.ref_price = self.avellaneda_stoikov.get_ref_price(&ref_tickers);

+ 18 - 8
strategy/src/strategy.rs

@@ -826,10 +826,10 @@ impl Strategy {
     pub fn _cancel_open(&self, command: &mut OrderCommand, local_orders: &HashMap<String, OrderInfo>) {
         // debug!(?command);
         // 挂单范围
-        let long_upper = self.open_dist[0];
-        let long_lower = self.open_dist[1];
-        let short_lower = self.open_dist[2];
-        let short_upper = self.open_dist[3];
+        // let long_upper = self.open_dist[0];
+        // let long_lower = self.open_dist[1];
+        // let short_lower = self.open_dist[2];
+        // let short_upper = self.open_dist[3];
 
         for order_client_id in local_orders.keys() {
             let order = local_orders.get(order_client_id).unwrap();
@@ -839,14 +839,16 @@ impl Strategy {
             // 开多订单处理
             if order.side == "kd".to_string() {
                 // 在价格范围内时不处理
-                if (order.price <= long_upper && order.price >= long_lower) || self.local_time - order.local_time <= 100 {
+                // if (order.price <= long_upper && order.price >= long_lower) || self.local_time - order.local_time <= 200 {
+                if self.local_time - order.local_time <= 200 {
                     continue
                 }
                 // debug!(?key, ?order.price, ?long_upper, ?long_lower);
                 command.cancel.insert(key, value);
             } else if order.side == "kk".to_string() { // 开空订单处理
                 // 在价格范围内时不处理
-                if (order.price >= short_lower && order.price <= short_upper) || self.local_time - order.local_time <= 100 {
+                // if (order.price >= short_lower && order.price <= short_upper) || self.local_time - order.local_time <= 200 {
+                if self.local_time - order.local_time <= 200 {
                     continue
                 }
                 // debug!(?key, ?order.price, ?short_lower, ?short_upper);
@@ -1083,7 +1085,15 @@ impl Strategy {
         return command;
     }
 
-    pub fn do_strategy(&mut self, predictor: &mut AvellanedaStoikov, local_orders: &HashMap<String, OrderInfo>) -> OrderCommand {
+    pub fn do_strategy(&mut self, predictor: &mut AvellanedaStoikov, local_orders: &HashMap<String, OrderInfo>, local_coin: &Decimal, local_cash: &Decimal) -> OrderCommand {
+        // 更新当前账户余额
+        self.coin = local_coin.clone();
+        self.cash = local_cash.clone();
+        self.equity = local_cash + local_coin * self.mp;
+        if self.equity > self.max_equity {
+            self.max_equity = self.equity;
+        }
+
         self.ref_ap = predictor.optimal_ask_price;
         self.ref_bp = predictor.optimal_bid_price;
         self.ref_price = predictor.ref_price;
@@ -1100,7 +1110,7 @@ impl Strategy {
         }
 
         self._cancel_open(&mut command, local_orders);              // 撤单命令处理
-        self._post_open(&mut command, local_orders, predictor);     // 限价单命令处理
+        // self._post_open(&mut command, local_orders, predictor);     // 限价单命令处理
         self._check_local_orders(&mut command, local_orders);       // 固定时间检查超时订单
         self._update_in_cancel(&mut command, local_orders);         // 更新撤单队列,是一个filter
         self._check_request_limit(&mut command);                    // 限制频率,移除不合规则之订单,是一个filter