瀏覽代碼

今天完成了数据传递的问题,明天可以开始动预定价格生成算法之类的了。

skyffire 1 年之前
父節點
當前提交
19cba4fb34

+ 9 - 2
standard/src/binance_swap_handle.rs

@@ -30,9 +30,14 @@ use crate::{OrderBook, Record, Trade};
 pub fn format_depth_items(value: Value) -> Vec<OrderBook> {
     let mut depth_items: Vec<OrderBook> = vec![];
     for value in value.as_array().unwrap() {
+        let price = Decimal::from_str(value[0].as_str().unwrap()).unwrap();
+        let size = Decimal::from_str(value[1].as_str().unwrap()).unwrap();
+        let value = price * size;
+
         depth_items.push(OrderBook {
-            price: Decimal::from_str(value[0].as_str().unwrap()).unwrap(),
-            amount: Decimal::from_str(value[1].as_str().unwrap()).unwrap(),
+            price,
+            size,
+            value,
         })
     }
     return depth_items;
@@ -50,12 +55,14 @@ pub fn format_trade_items(response: &ResponseData) -> Vec<Trade> {
     }
     let price = Decimal::from_str(data["p"].as_str().unwrap().to_string().as_str()).unwrap();
     let symbol = data["s"].as_str().unwrap().to_string().replace("USDT", "_USDT");
+    let value = (price * size).abs();
 
     vec![Trade {
         id,
         time,
         size,
         price,
+        value,
         symbol,
     }]
 }

+ 12 - 7
standard/src/exchange_struct_handler.rs

@@ -12,8 +12,10 @@ pub struct ExchangeStructHandler;
 
 #[allow(dead_code)]
 impl ExchangeStructHandler {
-    // 处理深度信息
-    pub fn order_book_handle(exchange: ExchangeEnum, res_data: &ResponseData) -> Depth {
+    // 处理深度信息,关于mul的问题:
+    //     因为部分交易所比较特殊,返回的深度的数量是张数,这里是标准化成U量的形式;
+    //     如果是不需要处理的交易所,传个Decimal::ONE就行了
+    pub fn order_book_handle(exchange: ExchangeEnum, res_data: &ResponseData, mul: &Decimal) -> Depth {
         let depth_asks: Vec<OrderBook>;
         let depth_bids: Vec<OrderBook>;
         let t: Decimal;
@@ -26,8 +28,8 @@ impl ExchangeStructHandler {
                 res_data.data["s"].as_str().unwrap().to_string().replace("USDT", "_USDT")
             }
             ExchangeEnum::GateSwap => {
-                depth_asks = gate_swap_handle::format_depth_items(&res_data.data["asks"]);
-                depth_bids = gate_swap_handle::format_depth_items(&res_data.data["bids"]);
+                depth_asks = gate_swap_handle::format_depth_items(&res_data.data["asks"], mul);
+                depth_bids = gate_swap_handle::format_depth_items(&res_data.data["bids"], mul);
                 t = Decimal::from_str(&res_data.data["t"].to_string()).unwrap();
 
                 res_data.data["contract"].as_str().unwrap().to_string()
@@ -93,11 +95,14 @@ impl ExchangeStructHandler {
             symbol,
         }
     }
-    // 处理成交信息
-    pub fn trades_handle(exchange: ExchangeEnum, res_data: &ResponseData) -> Vec<Trade> {
+
+    // 处理成交信息,关于mul的问题:
+    //     因为部分交易所比较特殊,返回的深度的数量是张数,这里是标准化成U量的形式;
+    //     如果是不需要处理的交易所,传个Decimal::ONE就行了
+    pub fn trades_handle(exchange: ExchangeEnum, res_data: &ResponseData, mul: &Decimal) -> Vec<Trade> {
         match exchange {
             ExchangeEnum::GateSwap => {
-                gate_swap_handle::format_trade_items(&res_data)
+                gate_swap_handle::format_trade_items(&res_data, mul)
             }
             // ExchangeEnum::GateSpot => {
             //     gate_spot_handle::format_trade_items(&res_data)

+ 15 - 4
standard/src/gate_spot_handle.rs

@@ -26,19 +26,30 @@ pub fn format_depth_items(value: &Value) -> Vec<OrderBook> {
     let mut depth_items: Vec<OrderBook> = vec![];
     for value in value.as_array().unwrap() {
         let info = value.as_array().unwrap();
+
+        let price = Decimal::from_str(info[0].as_str().unwrap()).unwrap();
+        let size = Decimal::from_str(info[1].as_str().unwrap()).unwrap();
+        let value = price * size;
+
         depth_items.push(OrderBook {
-            price: Decimal::from_str(info[0].as_str().unwrap()).unwrap(),
-            amount: Decimal::from_str(info[1].as_str().unwrap()).unwrap(),
+            price,
+            size,
+            value,
         })
     }
     return depth_items;
 }
 pub fn format_trade_items(res_data: &ResponseData) -> Vec<Trade> {
+    let size = Decimal::from_str(res_data.data["amount"].as_str().unwrap()).unwrap();
+    let price = Decimal::from_str(res_data.data["price"].as_str().unwrap().to_string().as_str()).unwrap();
+    let value = (size * price).abs();
+
     return vec![Trade {
         id: res_data.data["id"].to_string(),
         time: Decimal::from_str(res_data.data["create_time_ms"].as_str().unwrap()).unwrap().floor(),
-        size: Decimal::from_str(res_data.data["amount"].as_str().unwrap()).unwrap(),
-        price: Decimal::from_str(res_data.data["price"].as_str().unwrap().to_string().as_str()).unwrap(),
+        size,
+        price,
+        value,
         symbol: res_data.data["currency_pair"].as_str().unwrap().to_string(),
     }];
 }

+ 17 - 6
standard/src/gate_swap_handle.rs

@@ -162,27 +162,38 @@ pub fn handle_records(value: &Value) -> Vec<Record> {
     return records
 }
 
-pub fn format_depth_items(value: &Value) -> Vec<OrderBook> {
+pub fn format_depth_items(value: &Value, mul: &Decimal) -> Vec<OrderBook> {
     let mut depth_items: Vec<OrderBook> = vec![];
     for value in value.as_array().unwrap() {
+        let price = Decimal::from_str(value["p"].as_str().unwrap()).unwrap();
+        let size = Decimal::from_f64(value["s"].as_f64().unwrap()).unwrap();
+        let value = size * price * mul;
+
         depth_items.push(OrderBook {
-            price: Decimal::from_str(value["p"].as_str().unwrap()).unwrap(),
-            amount: Decimal::from_f64(value["s"].as_f64().unwrap()).unwrap(),
+            price,
+            size,
+            value
         })
     }
     return depth_items;
 }
 
-pub fn format_trade_items(res_data: &ResponseData) -> Vec<Trade> {
+pub fn format_trade_items(res_data: &ResponseData, mul: &Decimal) -> Vec<Trade> {
     let result = res_data.data.as_array().unwrap();
     let mut trades = vec![];
 
     for item in result {
+        // 因为gate的量都是张数,所以要进行真实交易量处理
+        let size = Decimal::from_i64(item["size"].as_i64().unwrap()).unwrap();
+        let price = Decimal::from_str(item["price"].as_str().unwrap().to_string().as_str()).unwrap();
+        let value = (size * mul * price).abs();
+
         trades.push(Trade {
             id: item["id"].to_string(),
             time: Decimal::from_i64(item["create_time_ms"].as_i64().unwrap()).unwrap(),
-            size: Decimal::from_i64(item["size"].as_i64().unwrap()).unwrap(),
-            price: Decimal::from_str(item["price"].as_str().unwrap().to_string().as_str()).unwrap(),
+            size,
+            price,
+            value,
             symbol: item["contract"].as_str().unwrap().to_string(),
         })
     }

+ 13 - 8
standard/src/lib.rs

@@ -148,6 +148,7 @@ impl Account {
 /// - `time(Decimal)`: 交易更新时间戳(ms)
 /// - `size(Decimal)`: 交易量,负数是卖方
 /// - `price(Decimal)`: 成交价格
+/// - `value(Decimal)`: 成交价值(计价币对)
 /// - `symbol(String)`: 成交符号
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct Trade {
@@ -155,7 +156,8 @@ pub struct Trade {
     pub time: Decimal,
     pub size: Decimal,
     pub price: Decimal,
-    pub symbol: String
+    pub value: Decimal,
+    pub symbol: String,
 }
 
 /// 特殊压缩结构体(订单流)
@@ -223,8 +225,8 @@ pub struct SpecialDepth {
 
 impl SpecialDepth {
     pub fn new(depth: &Depth) -> SpecialDepth {
-        let bids = depth.bids.iter().map(|ob| vec![ob.price, ob.amount]).collect::<Vec<_>>();
-        let asks = depth.asks.iter().map(|ob| vec![ob.price, ob.amount]).collect::<Vec<_>>();
+        let bids = depth.bids.iter().map(|ob| vec![ob.price, ob.size]).collect::<Vec<_>>();
+        let asks = depth.asks.iter().map(|ob| vec![ob.price, ob.size]).collect::<Vec<_>>();
         SpecialDepth {
             b: bids,
             a: asks,
@@ -248,11 +250,11 @@ impl SimpleDepth {
         let mut total_size = Decimal::ZERO;
 
         for ask in &depth.asks {
-            total_size += ask.price * ask.amount;
+            total_size += ask.price * ask.size;
         }
 
         for bid in &depth.bids {
-            total_size += bid.price * bid.amount;
+            total_size += bid.price * bid.size;
         }
 
         total_size.rescale(2);
@@ -296,18 +298,21 @@ impl SpecialTicker {
 
 /// OrderBook结构体(市场深度单)
 /// - `price(Decimal)`: 价格
-/// - `amount(Decimal)`: 数量
+/// - `size(Decimal)`: 数量
+/// - `value(Decimal)`: 价值(计价币)
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct OrderBook {
     pub price: Decimal,
-    pub amount: Decimal,
+    pub size: Decimal,
+    pub value: Decimal,
 }
 
 impl OrderBook {
     pub fn new() -> OrderBook {
         OrderBook {
             price: Default::default(),
-            amount: Default::default(),
+            size: Default::default(),
+            value: Default::default(),
         }
     }
 }

+ 18 - 5
standard/src/okx_swap_handle.rs

@@ -27,9 +27,14 @@ pub fn handle_records(value: &Value) -> Vec<Record> {
 pub fn format_depth_items(value: &Value) -> Vec<OrderBook> {
     let mut depth_items: Vec<OrderBook> = vec![];
     for value in value.as_array().unwrap() {
+        let price = Decimal::from_f64(value[0].as_f64().unwrap()).unwrap();
+        let size = Decimal::from_f64(value[1].as_f64().unwrap()).unwrap();
+        let value = price * size;
+
         depth_items.push(OrderBook {
-            price: Decimal::from_f64(value[0].as_f64().unwrap()).unwrap(),
-            amount: Decimal::from_f64(value[1].as_f64().unwrap()).unwrap(),
+            price,
+            size,
+            value,
         })
     }
     return depth_items;
@@ -41,12 +46,20 @@ pub fn format_trade_items(res_data: &ResponseData) -> Vec<Trade> {
 
     for item in result {
         let side = item["side"].as_str().unwrap() == "buy";
-        let size = Decimal::from_str(item["sz"].as_str().unwrap()).unwrap();
+        let size = if side {
+            Decimal::from_str(item["sz"].as_str().unwrap()).unwrap()
+        } else {
+            -Decimal::from_str(item["sz"].as_str().unwrap()).unwrap()
+        };
+        let price = Decimal::from_str(item["px"].as_str().unwrap().to_string().as_str()).unwrap();
+        let value = (size * price).abs();
+
         trades.push(Trade {
             id: item["tradeId"].as_str().unwrap().to_string(),
             time: Decimal::from_str(item["ts"].as_str().unwrap()).unwrap(),
-            size: if side { size } else { -size },
-            price: Decimal::from_str(item["px"].as_str().unwrap().to_string().as_str()).unwrap(),
+            size,
+            price,
+            value,
             symbol: item["instId"].as_str().unwrap().to_string().replace("-", "_"),
         })
     }

+ 0 - 10
strategy/src/clear_core.rs

@@ -295,16 +295,6 @@ impl ClearCore {
         return core_obj;
     }
 
-    pub fn log_ready_status(&mut self, msg: String) {
-        // 隔一会再打印未准备就绪的台词
-        let now_timestamp = Utc::now().timestamp_millis();
-        if now_timestamp - self.prev_log_ready_timestamp > self.log_ready_log_interval {
-            self.prev_log_ready_timestamp = now_timestamp;
-            info!("{}", msg);
-        }
-    }
-
-
     // #[instrument(skip(self, data), level="TRACE")]
     pub async fn update_position(&mut self, data: Vec<Position>) {
         if data.is_empty() {

+ 117 - 237
strategy/src/core.rs

@@ -1,13 +1,13 @@
 use tokio::time::Instant;
 use std::cmp::max;
-use std::collections::{BTreeMap, HashMap, VecDeque};
+use std::collections::{BTreeMap, HashMap};
 use std::io::Error;
 use std::str::FromStr;
 use std::sync::{Arc};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 use chrono::{Utc};
-use rust_decimal::{Decimal, MathematicalOps};
+use rust_decimal::{Decimal};
 use rust_decimal::prelude::{ToPrimitive};
 use rust_decimal_macros::dec;
 use tokio::spawn;
@@ -18,10 +18,8 @@ use tokio::time::sleep;
 use tracing::{error, info, warn};
 use global::cci::CentralControlInfo;
 use global::params::Params;
-use global::public_params::{ASK_PRICE_INDEX, BID_PRICE_INDEX, LENGTH};
 use global::trace_stack::TraceStack;
-use global::trade::Trade;
-use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker};
+use standard::{Account, Depth, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker, Trade};
 use standard::exchange::{Exchange};
 use standard::exchange::ExchangeEnum::{BinanceSwap, GateSwap};
 
@@ -116,12 +114,6 @@ pub struct Core {
     pub agg_market: Vec<Decimal>,
     pub ref_price: Vec<Vec<Decimal>>,
     pub predict: Decimal,
-
-    // 波动率指标相关数据(最近100条)
-    pub trade_vec: VecDeque<Trade>,  // 行情数据 此处价格取买一卖一中间价,时间为交易所的数据创建时间
-    pub sigma_vec: VecDeque<Decimal>,  // 波动率记录
-    pub is_sigma_abnormal: bool,  // 是否sigma反常
-    pub is_sigma_allow_open: bool, // 是否允许开单
 }
 
 impl Core {
@@ -258,10 +250,6 @@ impl Core {
             agg_market: vec![],
             ref_price: vec![],
             predict: Default::default(),
-            trade_vec: VecDeque::with_capacity(100),
-            sigma_vec: VecDeque::with_capacity(100),
-            is_sigma_abnormal: false,
-            is_sigma_allow_open: true,
         };
         for i in 0..=params.ref_exchange.len() - 1 {
             // 拼接不会消耗原字符串
@@ -389,10 +377,12 @@ impl Core {
                         filled_order.side = side.clone();
 
                         info!("移除本地订单:{:?}, local_by_orders: {:?}", filled_order, self.local_position_by_orders);
-                        if self.exchange.contains("spot") { // 如果是现货交易 还需要修改equity
+                        // 如果是现货交易 还需要修改equity
+                        if self.exchange.contains("spot") {
                             // 现货必须考虑fee 买入fee单位为币 卖出fee单位为u
                             let fee = data.fee;
-                            if side == "kd" { // buy  开多
+                            // buy  开多
+                            if side == "kd" {
                                 self.local_buy_amount += filled - fee;
                                 self.local_buy_value += (filled - fee) * filled_price;
                                 let new_long_pos = self.local_position_by_orders.long_pos + filled - fee;
@@ -406,7 +396,9 @@ impl Core {
                                 }
                                 self.local_cash -= filled * filled_price;
                                 self.local_coin = filled - fee;
-                            } else if side == "pd" { // sell 平多
+
+                            // sell 平多
+                            } else if side == "pd" {
                                 self.local_sell_amount += filled;
                                 self.local_sell_value += filled * filled_price;
                                 self.local_profit += filled * (filled_price - self.local_position_by_orders.long_avg);
@@ -419,7 +411,9 @@ impl Core {
                                 }
                                 self.local_cash += filled * filled_price - fee;
                                 self.local_coin -= filled;
-                            } else if side == "pk" { // buy 平空
+
+                            // buy 平空
+                            } else if side == "pk" {
                                 self.local_buy_amount += filled - fee;
                                 self.local_buy_value += (filled - fee) * filled_price;
                                 self.local_profit += filled * (self.local_position_by_orders.short_avg - filled_price);
@@ -432,7 +426,9 @@ impl Core {
                                 }
                                 self.local_cash -= filled * filled_price;
                                 self.local_coin += filled - fee;
-                            } else if side == "kk" { // sell 开空
+
+                            // sell 开空
+                            } else if side == "kk" {
                                 self.local_sell_amount += filled;
                                 self.local_sell_value += filled * filled_price;
                                 let new_short_pos = self.local_position_by_orders.short_pos + filled;
@@ -449,9 +445,11 @@ impl Core {
                             } else {
                                 info!("错误的仓位方向{}", side);
                             }
-                        } else { // 合约订单流仓位计算
-                            let mut this_order_profit = Decimal::ZERO;
-                            if side == "kd" { // buy 开多
+
+                        // 合约订单流仓位计算
+                        } else {
+                            // buy 开多
+                            if side == "kd" {
                                 self.local_buy_amount += filled;
                                 self.local_buy_value += filled * filled_price;
                                 let new_long_pos = self.local_position_by_orders.long_pos + filled;
@@ -463,7 +461,9 @@ impl Core {
                                         self.local_position_by_orders.long_avg + filled * filled_price) / new_long_pos;
                                     self.local_position_by_orders.long_pos = self.local_position_by_orders.long_pos + filled;
                                 }
-                            } else if side == "kk" { // sell 开空
+
+                            // sell 开空
+                            } else if side == "kk" {
                                 self.local_sell_amount += filled;
                                 self.local_sell_value += filled * filled_price;
                                 let new_short_pos = self.local_position_by_orders.short_pos + filled;
@@ -474,20 +474,20 @@ impl Core {
                                     self.local_position_by_orders.short_avg = (self.local_position_by_orders.short_pos * self.local_position_by_orders.short_avg + filled * filled_price) / new_short_pos;
                                     self.local_position_by_orders.short_pos = self.local_position_by_orders.short_pos + filled;
                                 }
-                            } else if side == "pd" { // sell 平多
+
+                            // sell 平多
+                            } else if side == "pd" {
                                 self.local_sell_amount += filled;
                                 self.local_sell_value += filled * filled_price;
-                                this_order_profit = filled * (filled_price - self.local_position_by_orders.long_avg);
-                                self.local_profit += this_order_profit;
                                 self.local_position_by_orders.long_pos = self.local_position_by_orders.long_pos - filled;
                                 if self.local_position_by_orders.long_pos == Decimal::ZERO {
                                     self.local_position_by_orders.long_avg = Decimal::ZERO;
                                 }
-                            } else if side == "pk" { // buy 平空
+
+                            // buy 平空
+                            } else if side == "pk" {
                                 self.local_buy_amount += filled;
                                 self.local_buy_value += filled * filled_price;
-                                this_order_profit = filled * (self.local_position_by_orders.short_avg - filled_price);
-                                self.local_profit += this_order_profit;
                                 self.local_position_by_orders.short_pos = self.local_position_by_orders.short_pos - filled;
                                 if self.local_position_by_orders.short_pos == Decimal::ZERO {
                                     self.local_position_by_orders.short_avg = Decimal::ZERO;
@@ -499,14 +499,6 @@ impl Core {
                             if data.fee > Decimal::ZERO {
                                 self.local_profit -= data.fee;
                             }
-                            // 订单亏损 并且 波动率异常 不允许开单
-                            if this_order_profit < Decimal::ZERO && self.is_sigma_abnormal {
-                                self.is_sigma_allow_open = false;
-                                info!("交易触发亏损,不允许开单!");
-                                info!("sigma_vec:{:?}" , self.sigma_vec);
-                            } else {
-                                self.is_sigma_allow_open = true;
-                            }
                         }
                         // info!("成交单耗时数据:{}", time_record.to_string());
                         info!("更新推算仓位 {:?}", self.local_position_by_orders);
@@ -532,7 +524,6 @@ impl Core {
                                                               &self.local_coin,
                                                               &self.ref_price,
                                                               &self.predict,
-                                                              &self.is_sigma_allow_open,
                                                               &trace_stack.ins);
                         // trace_stack.on_after_strategy();
                         // 记录指令触发信息
@@ -594,37 +585,38 @@ impl Core {
     // #[instrument(skip(self), level="TRACE")]
     pub fn check_ready(&mut self) {
         // 检查 ticker 行情
-        for i in &self.ref_name {
-            if self.tickers.is_empty() || !self.tickers.contains_key(i) {
-                self.log_ready_status(format!("529参考盘口ticker未准备好: {:?}", self.tickers));
-                return;
-            } else {
-                if self.tickers.get(i).unwrap().buy == dec!(0) || self.tickers.get(i).unwrap().sell == dec!(0) {
-                    self.log_ready_status(format!("533参考盘口ticker未准备好: {:?}", self.tickers));
-                    return;
-                }
-            }
-        }
-        if self.tickers.contains_key(&self.trade_name) {
-            if self.tickers.get(&self.trade_name).unwrap().buy == dec!(0) || self.tickers.get(&self.trade_name).unwrap().sell == dec!(0) {
-                self.log_ready_status(format!("540交易盘口ticker未准备好: {:?}", self.tickers));
-                return;
-            }
-        } else {
-            self.log_ready_status(format!("544交易盘口ticker未准备好: {:?}", self.tickers));
-            return;
-        }
-        // 检查 market 行情
-        self.agg_market = self.get_all_market_data();
-        if self.agg_market.len() != LENGTH * (1usize + self.ref_num as usize) {
-            self.log_ready_status(format!("550聚合行情未准备好: market长度:{}, 检验数: {}", self.agg_market.len(), LENGTH * (1usize + self.ref_num as usize)));
-            return;
-        } else {
-            // 如果准备就绪,则可以开始交易
-            info!("----------------------------------聚合行情准备就绪,可以开始交易---------------------------------");
-            self.predictor.market_info_handler(&self.agg_market);
-            self.ready = 1;
-        }
+        // for i in &self.ref_name {
+        //     if self.tickers.is_empty() || !self.tickers.contains_key(i) {
+        //         self.log_ready_status(format!("529参考盘口ticker未准备好: {:?}", self.tickers));
+        //         return;
+        //     } else {
+        //         if self.tickers.get(i).unwrap().buy == dec!(0) || self.tickers.get(i).unwrap().sell == dec!(0) {
+        //             self.log_ready_status(format!("533参考盘口ticker未准备好: {:?}", self.tickers));
+        //             return;
+        //         }
+        //     }
+        // }
+        // if self.tickers.contains_key(&self.trade_name) {
+        //     if self.tickers.get(&self.trade_name).unwrap().buy == dec!(0) || self.tickers.get(&self.trade_name).unwrap().sell == dec!(0) {
+        //         self.log_ready_status(format!("540交易盘口ticker未准备好: {:?}", self.tickers));
+        //         return;
+        //     }
+        // } else {
+        //     self.log_ready_status(format!("544交易盘口ticker未准备好: {:?}", self.tickers));
+        //     return;
+        // }
+        // // 检查 market 行情
+        // self.agg_market = self.get_all_market_data();
+        // if self.agg_market.len() != LENGTH * (1usize + self.ref_num as usize) {
+        //     self.log_ready_status(format!("550聚合行情未准备好: market长度:{}, 检验数: {}", self.agg_market.len(), LENGTH * (1usize + self.ref_num as usize)));
+        //     return;
+        // } else {
+        //     // 如果准备就绪,则可以开始交易
+        //     info!("----------------------------------聚合行情准备就绪,可以开始交易---------------------------------");
+        //     self.predictor.market_info_handler(&self.agg_market);
+        //     self.ready = 1;
+        // }
+        self.ready = 1;
     }
 
     // #[instrument(skip(self, msg), level="TRACE")]
@@ -637,9 +629,13 @@ impl Core {
         }
     }
 
+    pub async fn on_trade(&mut self, trade: &Trade, _name_ref: &String, _trace_stack: &mut TraceStack) {
+        self.predictor.on_trade(trade);
+    }
+
     // #[instrument(skip(self, depth, name_ref, trace_stack), level="TRACE")]
-    pub async fn on_depth_update(&mut self, depth: &Vec<Decimal>, name_ref: &String, trace_stack: &mut TraceStack) {
-        // 要从回调传入的深度信息中获取data.name
+    pub async fn on_depth(&mut self, depth: &Depth, name_ref: &String, trace_stack: &mut TraceStack) {
+        // ================================ 刷新更新间隔 =========================================
         let now_time = Utc::now().timestamp_millis();
         if self.market_update_time.contains_key(name_ref) && *self.market_update_time.get(name_ref).unwrap() != 0i64 {
             let interval = Decimal::from(now_time - self.market_update_time.get(name_ref).unwrap());
@@ -652,82 +648,47 @@ impl Core {
         }
         self.market_update_time.insert(name_ref.clone(), now_time);
 
-        // 判断是否需要触发on_depth
-        // 是否是交易盘口
-        if *name_ref == self.trade_name {
-            // 允许交易
-            if self.mode_signal == 0 && self.ready == 1 {
-                self.on_agg_market();
-            }
-        } else if *name_ref == self.ref_name[0] { // 判断是否为当前跟踪的盘口
-            // 写入行情数据
-            let ticker = self.tickers.get(name_ref).unwrap();
-            if self.trade_vec.len() == 100 {
-                self.trade_vec.pop_front();
-            }
-            self.trade_vec.push_back(Trade::new_by_ticker(ticker.mid_price.clone(), ticker.create_at/1000));
-            // 更新波动率
-            if self.trade_vec.len() > 99 {
-                self.calc_sigma();
-                // 波动率正常,但还是不开单信号,允许开单
-                if !self.is_sigma_abnormal && !self.is_sigma_allow_open {
-                    self.is_sigma_allow_open = true;
-                }
-            }
-            // 判断是否需要触发ontick 对行情进行过滤
-            // 过滤条件 价格变化很大 时间间隔很长
-            let mut flag = 0;
-            let bid_price_rate = (depth[BID_PRICE_INDEX] - self.depths.get(name_ref).unwrap()[BID_PRICE_INDEX]).abs() / depth[BID_PRICE_INDEX];
-            let ask_price_rate = (depth[ASK_PRICE_INDEX] - self.depths.get(name_ref).unwrap()[ASK_PRICE_INDEX]).abs() / depth[ASK_PRICE_INDEX];
-            let rate = dec!(0.0002);
-            if bid_price_rate > rate || ask_price_rate > rate || Utc::now().timestamp_millis() - self.on_tick_event_time > 50 {
-                // 允许交易
-                flag = 1;
-                // 更新ontick触发时间记录
-                self.on_tick_event_time = Utc::now().timestamp_millis();
-            }
-            // 允许交易
-            if self.mode_signal == 0 && self.ready == 1 && flag == 1 {
-                // 更新交易数据
-                self.update_trade_msg();
-                // 触发事件撤单逻辑
-                // 更新策略时间
-                self.strategy.local_time = Utc::now().timestamp_millis();
-
-                // 产生交易信号
-                let mut orders = self.strategy.on_tick(&self.local_orders,
-                                                       &self.local_position_by_orders,
-                                                       &self.agg_market,
-                                                       &self.local_cash,
-                                                       &self.local_coin,
-                                                       &self.ref_price,
-                                                       &self.predict,
-                                                       &self.is_sigma_allow_open,
-                                                       &trace_stack.ins);
-                trace_stack.on_after_strategy();
-
-                if orders.is_not_empty() {
-                    let mut platform_rest_fb = self.platform_rest.clone_box();
-                    // 先更新本地记录再发单。
-                    self._update_local_orders(&mut orders);
-                    // info!("订单指令:{:?}", orders);
-                    let mut ts = trace_stack.clone();
-                    spawn(async move {
-                        platform_rest_fb.command_order(&mut orders, &mut ts).await;
-                    });
-
-                    // 更新中控账户相关信息
-                    {
-                        let mut now_balance = self.strategy.equity / self.used_pct;
-                        now_balance.rescale(4);
-
-                        let mut cci = self.cci_arc.lock().await;
-                        cci.now_balance = now_balance;
-                    }
+        // ================================ 在系统已经准备就绪的情况下,更新相关参数 =========================================
+        if self.mode_signal == 0 && self.ready == 1 {
+            // 更新预定价格
+            self.predictor.on_depth(depth);
+            // 触发事件撤单逻辑
+            // 更新策略时间
+            self.strategy.local_time = Utc::now().timestamp_millis();
+
+            // 产生交易信号
+            let mut orders = self.strategy.on_tick(&self.local_orders,
+                                                   &self.local_position_by_orders,
+                                                   &self.agg_market,
+                                                   &self.local_cash,
+                                                   &self.local_coin,
+                                                   &self.ref_price,
+                                                   &self.predict,
+                                                   &trace_stack.ins);
+            trace_stack.on_after_strategy();
+
+            if orders.is_not_empty() {
+                let mut platform_rest_fb = self.platform_rest.clone_box();
+                // 先更新本地记录再发单。
+                self._update_local_orders(&mut orders);
+                // info!("订单指令:{:?}", orders);
+                let mut ts = trace_stack.clone();
+                spawn(async move {
+                    platform_rest_fb.command_order(&mut orders, &mut ts).await;
+                });
+
+                // 更新账户余额
+                {
+                    let mut now_balance = self.strategy.equity / self.used_pct;
+                    now_balance.rescale(4);
+
+                    let mut cci = self.cci_arc.lock().await;
+                    cci.now_balance = now_balance;
                 }
             }
         }
 
+        // 同步更新余额数据到中控的信息
         {
             let mut unrealized_pn_l = self.local_profit;
             unrealized_pn_l.rescale(4);
@@ -741,87 +702,6 @@ impl Core {
         }
     }
 
-    pub fn calc_sigma(&mut self) {
-        for (index, trade) in self.trade_vec.iter().enumerate() {
-            if index == 0 {
-                continue
-            }
-
-            // 计算过去至多100条数据的sigma值 sigma^2 = (1 / (tn-t0))*sum((S(tk) - S(tk-1)) ^ 2)
-            let mut sigma_index = index - 1;
-            let t_last = Decimal::from_str(&format!("{}", trade.time)).unwrap();
-
-            let mut _t_first = Decimal::from_str(&format!("{}", trade.time)).unwrap();
-            // 右值
-            let mut total_right = Decimal::ZERO;
-            loop {
-                let flag_trade = self.trade_vec.get(sigma_index).unwrap();
-                let next_trade = self.trade_vec.get(sigma_index + 1).unwrap();
-
-                // 下标合法性判断
-                if sigma_index == 0 || sigma_index + 100 <= index {
-                    _t_first = Decimal::from_str(&format!("{}", flag_trade.time)).unwrap();
-                    break;
-                }
-
-                // 计算差值
-                let diff = Decimal::ONE - flag_trade.price / next_trade.price;
-                total_right += diff * diff;
-
-                sigma_index = sigma_index - 1;
-            }
-            let sigma_square = if _t_first == t_last {
-                let time_diff = Decimal::ONE;
-                (Decimal::ONE / time_diff) * total_right
-            } else {
-                let time_diff = (t_last - _t_first) / Decimal::ONE_THOUSAND;
-                (Decimal::ONE / time_diff) * total_right
-            };
-            let mut sigma = sigma_square.sqrt().unwrap();
-            sigma.rescale(6);
-            if self.sigma_vec.len() == 100 {
-                self.sigma_vec.pop_front();
-            }
-            self.sigma_vec.push_back(sigma);
-        }
-        if self.sigma_vec.len() > 99 {
-            let sigma = match self.sigma_vec.back() {
-                Some(&value) => value,
-                None => Decimal::TEN,
-            };
-
-            // 计算过去至多100个sigma值的平均值
-            let sigma_ma = if self.sigma_vec.len() > 0 {
-                let mut sigma_ma_index = self.sigma_vec.len();
-                let mut sigma_total = Decimal::ZERO;
-                let mut sigma_count = Decimal::ZERO;
-                loop {
-                    if sigma_ma_index == 0 || sigma_ma_index + 99 < self.sigma_vec.len() {
-                        break
-                    }
-                    // 步进
-                    sigma_ma_index -= 1;
-                    // 计算
-                    sigma_total += self.sigma_vec[sigma_ma_index];
-                    sigma_count += Decimal::ONE;
-                }
-                let mut sigma_ma = sigma_total / sigma_count;
-                sigma_ma.rescale(6);
-
-                sigma_ma
-            } else {
-                sigma
-            };
-            // sigma值大于平均值定义为波动率异常
-            if sigma > sigma_ma {
-                self.is_sigma_abnormal = true;
-                // info!("sigma: {}, sigma_ma: {}, sigma_vec: {:?}", sigma, sigma_ma, self.sigma_vec);
-            } else {
-                self.is_sigma_abnormal = false;
-            }
-        }
-    }
-
     // #[instrument(skip(self, data), level="TRACE")]
     pub async fn update_position(&mut self, data: Vec<Position>) {
         if data.is_empty() {
@@ -867,18 +747,18 @@ impl Core {
     }
 
     // #[instrument(skip(self), level="TRACE")]
-    pub fn on_agg_market(&mut self) {
-        /* 处理聚合行情
-           1. 获取聚合行情
-           2. 更新预测器
-           3. 触发tick回测
-         */
-        // 更新聚合市场数据
-        // 更新聚合市场信息
-        self.agg_market = self.get_all_market_data();
-        // 更新预测器
-        self.predictor.market_info_handler(&self.agg_market);
-    }
+    // pub fn on_agg_market(&mut self) {
+    //     /* 处理聚合行情
+    //        1. 获取聚合行情
+    //        2. 更新预测器
+    //        3. 触发tick回测
+    //      */
+    //     // 更新聚合市场数据
+    //     // 更新聚合市场信息
+    //     self.agg_market = self.get_all_market_data();
+    //     // 更新预测器
+    //     self.predictor.market_info_handler(&self.agg_market);
+    // }
 
     // #[instrument(skip(self), level="TRACE")]
     pub fn update_trade_msg(&mut self) {

+ 32 - 20
strategy/src/exchange_disguise.rs

@@ -1,11 +1,9 @@
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
-use rust_decimal::Decimal;
 use tokio::sync::Mutex;
 use global::trace_stack::TraceStack;
-use standard::SpecialDepth;
-
+use standard::{Depth, Trade};
 use crate::binance_usdt_swap::reference_binance_swap_run;
 use crate::gate_usdt_swap::gate_swap_run;
 use crate::core::Core;
@@ -101,25 +99,39 @@ pub async fn run_reference_exchange(is_shutdown_arc: Arc<AtomicBool>,
     }
 }
 
-pub async fn on_special_depth(_core_arc: Arc<Mutex<Core>>,
-                              update_flag_u: &mut Decimal,
-                              _label: &String,
-                              trace_stack: &mut TraceStack,
-                              special_depth: &SpecialDepth) {
-    if special_depth.t > *update_flag_u {
-        // let mut core = core_arc.lock().await;
-        trace_stack.on_after_unlock_core();
+// pub async fn on_special_depth(_core_arc: Arc<Mutex<Core>>,
+//                               update_flag_u: &mut Decimal,
+//                               _label: &String,
+//                               trace_stack: &mut TraceStack,
+//                               special_depth: &SpecialDepth) {
+//     if special_depth.t > *update_flag_u {
+//         // let mut core = core_arc.lock().await;
+//         trace_stack.on_after_unlock_core();
+//
+//         // core.tickers.insert(label.clone(), special_depth.ticker.clone());
+//         // core.depths.insert(label.clone(), special_depth.depth.clone());
+//         //
+//         // // 触发depth更新
+//         // core.on_depth_update(&(special_depth.depth), &label, trace_stack).await;
+//         //
+//         // core.local_depths.insert(special_depth.name.clone(), special_depth.depth.clone());
+//
+//         *update_flag_u = special_depth.t;
+//     }
+// }
 
-        // core.tickers.insert(label.clone(), special_depth.ticker.clone());
-        // core.depths.insert(label.clone(), special_depth.depth.clone());
-        //
-        // // 触发depth更新
-        // core.on_depth_update(&(special_depth.depth), &label, trace_stack).await;
-        //
-        // core.local_depths.insert(special_depth.name.clone(), special_depth.depth.clone());
+pub async fn on_depth(core_arc: Arc<Mutex<Core>>, label: &String, trace_stack: &mut TraceStack, depth: &Depth) {
+    let mut core = core_arc.lock().await;
+    trace_stack.on_after_unlock_core();
 
-        *update_flag_u = special_depth.t;
-    }
+    core.on_depth(depth, &label, trace_stack).await;
+}
+
+pub async fn on_trade(core_arc: Arc<Mutex<Core>>, label: &String, trace_stack: &mut TraceStack, trade: &Trade) {
+    let mut core = core_arc.lock().await;
+    trace_stack.on_after_unlock_core();
+
+    core.on_trade(trade, &label, trace_stack).await;
 }
 
 pub async fn on_order() {}

+ 21 - 31
strategy/src/gate_usdt_swap.rs

@@ -9,7 +9,10 @@ use exchanges::gate_swap_rest::GateSwapRest;
 use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::{TraceStack};
+use standard::exchange::ExchangeEnum;
+use standard::exchange_struct_handler::ExchangeStructHandler;
 use crate::core::Core;
+use crate::exchange_disguise::{on_depth, on_trade};
 
 // 1交易、0参考 gate 合约 启动
 pub async fn gate_swap_run(is_shutdown_arc: Arc<AtomicBool>,
@@ -62,7 +65,6 @@ pub async fn gate_swap_run(is_shutdown_arc: Arc<AtomicBool>,
         }
 
         // 读取数据
-        let mut update_flag_u = Decimal::ZERO;
         let core_arc_clone = Arc::clone(&core_arc);
         let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
         let run_symbol = symbols.clone()[0].clone();
@@ -75,7 +77,6 @@ pub async fn gate_swap_run(is_shutdown_arc: Arc<AtomicBool>,
 
             async move {
                 on_data(core_arc_cc,
-                        &mut update_flag_u,
                         &mul,
                         &rs,
                         data,
@@ -89,22 +90,30 @@ pub async fn gate_swap_run(is_shutdown_arc: Arc<AtomicBool>,
     });
 }
 
-async fn on_data(_core_arc_clone: Arc<Mutex<Core>>,
-                 _update_flag_u: &mut Decimal,
-                 _multiplier: &Decimal,
+async fn on_data(core_arc: Arc<Mutex<Core>>,
+                 multiplier: &Decimal,
                  _run_symbol: &String,
                  response: ResponseData) {
     let mut trace_stack = TraceStack::new(response.time, response.ins);
     trace_stack.on_after_span_line();
 
     match response.channel.as_str() {
-        // "futures.order_book" => {
-        //     trace_stack.set_source("gate_usdt_swap.order_book".to_string());
-        //     let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, &response);
-        //     trace_stack.on_after_format();
-        //
-        //     on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
-        // }
+        "futures.order_book" => {
+            trace_stack.set_source("gate_usdt_swap.order_book".to_string());
+            let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSwap, &response, multiplier);
+            trace_stack.on_after_format();
+
+            on_depth(core_arc, &response.label, &mut trace_stack, &depth).await;
+        }
+        "futures.trades" => {
+            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSwap, &response, multiplier);
+
+            for trade in trades.iter_mut() {
+                let core_arc_clone = core_arc.clone();
+
+                on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade).await;
+            }
+        }
         // "futures.balances" => {
         //     let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, &response, run_symbol);
         //     let mut core = core_arc_clone.lock().await;
@@ -136,25 +145,6 @@ async fn on_data(_core_arc_clone: Arc<Mutex<Core>>,
         //     let mut core = core_arc_clone.lock().await;
         //     core.update_position(positions).await;
         // }
-        // "futures.trades" => {
-        //     // let mut core = core_arc_clone.lock().await;
-        //     // let str = data.label.clone();
-        //     // if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap(){
-        //     //     *max_buy = Decimal::ZERO;
-        //     //     *min_sell = Decimal::ZERO;
-        //     //     core.is_update.remove(str.as_str());
-        //     // }
-        //     // let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
-        //     // for trade in trades {
-        //     //     if trade.price > *max_buy || *max_buy == Decimal::ZERO{
-        //     //         *max_buy = trade.price
-        //     //     }
-        //     //     if trade.price < *min_sell || *min_sell == Decimal::ZERO{
-        //     //         *min_sell = trade.price
-        //     //     }
-        //     // }
-        //     // core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
-        // }
         _ => {
             error!("未知推送类型");
             error!(?response);

+ 10 - 153
strategy/src/predictor.rs

@@ -1,8 +1,8 @@
 use std::collections::BTreeMap;
 use rust_decimal::prelude::*;
 use rust_decimal_macros::dec;
-use standard::Ticker;
-use global::public_params;
+use tracing::info;
+use standard::{Depth, Ticker, Trade};
 
 #[derive(Debug)]
 pub struct Predictor {
@@ -46,87 +46,20 @@ impl Predictor {
         self
     }
 
-    // 计算任务,python里写作processer,是个错误的单词
-    // #[instrument(skip(self), level="TRACE")]
-    fn processor(&mut self) {
-        let last_market_info = self.market_info_list.last().unwrap();
-
-        // 更新mid_price
-        let bid_price = last_market_info[public_params::BID_PRICE_INDEX];
-        let ask_price = last_market_info[public_params::ASK_PRICE_INDEX];
-        let mid_price = (bid_price + ask_price) * dec!(0.5);
-        self.mid_price_list.push(mid_price);
-
-        // 更新参考ref_mid_price
-        let mut ref_mid_price_per_exchange = vec![];
-        for ref_index in 0..self.ref_exchange_length {
-            let ref_bid_price = last_market_info[public_params::LENGTH*(1+ref_index)+public_params::BID_PRICE_INDEX];
-            let ref_ask_price = last_market_info[public_params::LENGTH*(1+ref_index)+public_params::ASK_PRICE_INDEX];
-            let ref_mid_price = (ref_bid_price + ref_ask_price) * dec!(0.5);
-            // 依照交易所次序添加到ref_mid_price_per_exchange中
-            ref_mid_price_per_exchange.push(ref_mid_price);
-        }
-        self.ref_mid_price_per_exchange_per_frame.push(ref_mid_price_per_exchange);
-
-        // 价差更新
-        (*self).update_avg_spread()
+    pub fn on_depth(&mut self, depth: &Depth) {
+        info!(?depth);
+        self.processor();
     }
 
-    // 更新平均价差,_update_avg_spread
-    // #[instrument(skip(self), level="TRACE")]
-    fn update_avg_spread(&mut self) {
-        let last_ref_mid_price_per_exchange = self.ref_mid_price_per_exchange_per_frame.last().unwrap();
-        let mid_price_last = self.mid_price_list.last().unwrap();
-
-        for ref_index in 0..self.ref_exchange_length {
-            let bias = last_ref_mid_price_per_exchange[ref_index] * self.alpha[ref_index] - mid_price_last;
-
-            let mut gamma = self.gamma;
-            // 如果程序刚刚启动,gamma值不能太大
-            if self.loop_count < 100 {
-                gamma = dec!(0.9);
-            }
-
-            // 检测是否初始化
-            if dec!(0).eq(&self.avg_spread_list[ref_index]) {
-                self.avg_spread_list[ref_index] = bias;
-            } else {
-                self.avg_spread_list[ref_index] = self.avg_spread_list[ref_index] * gamma + bias*(dec!(1)-gamma);
-            }
-        }
+    pub fn on_trade(&mut self, trade: &Trade) {
+        info!(?trade);
+        self.processor();
     }
 
-    // 长度限定
+    // 计算任务,python里写作processer,是个错误的单词
     // #[instrument(skip(self), level="TRACE")]
-    fn check_length(&mut self) {
-        // 市场汇总信息长度限定
-        if self.market_info_list.len() > self.data_length_max {
-            self.market_info_list.remove(0);
-        }
-        // 交易交易所的mid_price长度限定
-        if self.mid_price_list.len() > self.data_length_max {
-            self.mid_price_list.remove(0);
-        }
-        // 参考交易所的长度限定
-        if self.ref_mid_price_per_exchange_per_frame.len() > self.data_length_max {
-            self.ref_mid_price_per_exchange_per_frame.remove(0);
-        }
-    }
-
-    // 市场信息处理器,也是python里的onTime方法
-    // #[instrument(skip(self, new_market_info), level="TRACE")]
-    pub fn market_info_handler(&mut self, new_market_info: &Vec<Decimal>) {
-        // 空数据不处理
-        if new_market_info.len() == 0 {
-            return;
-        }
+    fn processor(&mut self) {
 
-        if self.loop_count < i64::MAX {
-            self.loop_count += 1;
-        }
-        self.market_info_list.push(new_market_info.clone());
-        (*self).processor();
-        (*self).check_length();
     }
 
     // 获取预定价格, 也就是python的Get_ref函数
@@ -149,79 +82,3 @@ impl Predictor {
         return ref_price_list;
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use std::collections::BTreeMap;
-    use std::io;
-    use std::io::Write;
-    use rust_decimal_macros::dec;
-    use standard::Ticker;
-    use crate::predictor::Predictor;
-
-    #[test]
-    fn predictor_build_test() {
-        let mut stdout = io::stdout();
-
-        let predictor1 = Predictor::new(2)
-            .alpha(vec![dec!(0.99); 100])
-            .gamma(dec!(0.8));
-        writeln!(stdout, "predictor1:").unwrap();
-        writeln!(stdout, "{:?}", predictor1).unwrap();
-        writeln!(stdout, "").unwrap();
-
-        let predictor2 = Predictor::new(2);
-        writeln!(stdout, "predictor2:").unwrap();
-        writeln!(stdout, "{:?}", predictor2).unwrap();
-        writeln!(stdout, "").unwrap();
-    }
-
-    #[test]
-    fn market_info_handler_test() {
-        let mut predictor = Predictor::new(1);
-        let market_info_0 = vec![dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.89), dec!(0.79), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79)];
-        predictor.market_info_handler(&market_info_0);
-        let market_info_1 = vec![dec!(0.98), dec!(0.99), dec!(0.56), dec!(0.49), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.89), dec!(0.79), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79)];
-        predictor.market_info_handler(&market_info_1);
-    }
-
-    #[test]
-    fn get_ref_price_test() {
-        let mut predictor = Predictor::new(1)
-            .alpha(vec![dec!(0.99); 100])
-            .gamma(dec!(0.8));
-
-        //
-        let mut ref_ticker_map: BTreeMap<String, Ticker> = BTreeMap::new();
-        ref_ticker_map.insert("binance".to_string(), Ticker{
-            time: 0,
-            high: Default::default(),
-            low: Default::default(),
-            sell: dec!(0.93),
-            buy: dec!(0.92),
-            last: Default::default(),
-            volume: Default::default(),
-        });
-        println!("before market info: {:?}", predictor.get_ref_price(&ref_ticker_map));
-
-        let mut market_info = vec![];
-        market_info = vec![dec!(0.99), dec!(1.0), dec!(0.991), dec!(0.79), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.89), dec!(0.79), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79)];
-        predictor.market_info_handler(&market_info);
-        println!("market info 0: {:?}", predictor.get_ref_price(&ref_ticker_map));
-        market_info = vec![dec!(0.98), dec!(0.99), dec!(0.981), dec!(0.49), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.89)];
-        predictor.market_info_handler(&market_info);
-        println!("market info 1: {:?}", predictor.get_ref_price(&ref_ticker_map));
-        market_info = vec![dec!(0.93), dec!(1.0), dec!(0.931), dec!(0.79), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.89)];
-        predictor.market_info_handler(&market_info);
-        println!("market info 2: {:?}", predictor.get_ref_price(&ref_ticker_map));
-        market_info = vec![dec!(0.98), dec!(0.49), dec!(0.981), dec!(0.49), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.89)];
-        predictor.market_info_handler(&market_info);
-        println!("market info 3: {:?}", predictor.get_ref_price(&ref_ticker_map));
-        market_info = vec![dec!(0.99), dec!(1.0), dec!(0.991), dec!(0.69), dec!(0.99), dec!(1.0), dec!(0.89), dec!(0.79), dec!(0.89)];
-        predictor.market_info_handler(&market_info);
-        println!("market info 4: {:?}", predictor.get_ref_price(&ref_ticker_map));
-        market_info = vec![dec!(0.98), dec!(0.969), dec!(0.981), dec!(0.49), dec!(0.99), dec!(1.0), dec!(1.0), dec!(1.0), dec!(0.89)];
-        predictor.market_info_handler(&market_info);
-        println!("market info 5: {:?}", predictor.get_ref_price(&ref_ticker_map));
-    }
-}

+ 106 - 115
strategy/src/strategy.rs

@@ -1,7 +1,6 @@
-use std::cmp::{max, min};
+use std::cmp::{min};
 use std::collections::HashMap;
 use std::ops::{Div, Mul};
-// use std::str::FromStr;
 use chrono::Utc;
 use rust_decimal::Decimal;
 use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
@@ -9,8 +8,6 @@ use rust_decimal_macros::dec;
 use crate::model::{LocalPosition, OrderInfo};
 use crate::utils;
 use tracing::{info, error, warn};
-// use reqwest::{Client};
-// use tokio::spawn;
 use tokio::time::Instant;
 use global::params::Params;
 use standard::{OrderCommand};
@@ -242,110 +239,110 @@ impl Strategy {
     // 更新当前strategy的各类信息
     // #[instrument(skip(self, trader_msg), level="TRACE")]
     pub fn _update_data(&mut self,
-                        local_position: &LocalPosition,
-                        agg_market: &Vec<Decimal>,
-                        local_cash: &Decimal,
-                        local_coin: &Decimal,
-                        ref_price: &Vec<Vec<Decimal>>,
-                        predict: &Decimal) -> bool {
-        // position信息更新
-        if self.pos.long_pos != local_position.long_pos {
-            self.pos.long_pos = local_position.long_pos;
-            self.pos.long_avg = local_position.long_avg;
-        }
-        if self.pos.short_pos != local_position.short_pos {
-            self.pos.short_pos = local_position.short_pos;
-            self.pos.short_avg = local_position.short_avg;
-        }
-        // debug!(?self.pos);
-
-        // 价格值处理
-        self.bp = agg_market[global::public_params::BID_PRICE_INDEX];
-        self.ap = agg_market[global::public_params::ASK_PRICE_INDEX];
-        self.mp = (self.bp + self.ap) * dec!(0.5);
-        // 中间价的ema值处理
-        if self.mp_ema.eq(&Decimal::ZERO) {
-            self.mp_ema = self.mp;
-        } else {
-            self.mp_ema = self.mp_ema * dec!(0.999) + self.mp * dec!(0.001);
-        }
-        // debug!(?self.bp, ?self.ap, ?self.mp, ?self.mp_ema);
-
-        // 动态杠杆调节
-        if self.mp > self.mp_ema {
-            self.adjust_lever_rate = Decimal::ONE;
-        } else {
-            self.adjust_lever_rate = dec!(0.8);
-        }
-        // debug!(?self.adjust_lever_rate);
-
-        // 当前持仓价值处理
-        self.long_hold_value = self.pos.long_pos * self.mp;
-        self.short_hold_value = self.pos.short_pos * self.mp;
-        // debug!(?self.long_hold_value, ?self.short_hold_value);
-
-        // 分现货或合约计算最大开仓价值
-        if self.exchange.contains("spot") {
-            self.max_long_value = *local_cash * self.lever_rate * self.adjust_lever_rate;
-            self.max_short_value = *local_coin * self.lever_rate * self.adjust_lever_rate * self.mp;
-        } else {
-            self.max_long_value = self.equity * self.lever_rate * self.adjust_lever_rate;
-            self.max_short_value = self.max_long_value;
-        }
-        // debug!(?self.max_long_value, ?self.max_short_value, ?self.equity, ?self.lever_rate, ?self.adjust_lever_rate);
-
-        // 做市模式识别
-        if self.ref_name[self.ref_index].eq(&self.trade_name) {
-            self.maker_mode = "free".to_string();
-        } else {
-            self.maker_mode = "follow".to_string();
-        }
-        // debug!(?self.maker_mode);
-
-        // 参考价格
-        if ref_price.len() == 0 {
-            self.ref_bp = self.bp;
-            self.ref_ap = self.ap;
-            self.ref_price = self.mp;
-        } else {
-            self.ref_bp = ref_price[self.ref_index][0];
-            self.ref_ap = ref_price[self.ref_index][1];
-            self.ref_price = (self.ref_bp + self.ref_ap) * dec!(0.5);
-        }
-        // debug!(?self.ref_bp, ?self.ref_ap, %self.ref_price);
-
-        // spread
-        let temp_predict = predict * self.predict_alpha;
-        self.predict = utils::clip(temp_predict, -self.trade_open_dist, self.trade_open_dist);
-        // debug!(?self.predict);
-
-        // 计算当前账户cash和coin
-        self.coin = local_coin.clone();
-        self.cash = local_cash.clone();
-        self.equity = local_cash + local_coin * self.mp;
-        if self.equity > self.max_equity {
-            self.max_equity = self.equity;
-        }
-        // debug!(?self.coin, ?self.cash, ?self.equity, ?self.max_equity);
-
-        // 总可开数量
-        self.total_amount = self.equity * self.lever_rate * self.adjust_lever_rate / self.mp;
-        self.total_amount = utils::fix_amount(self.total_amount, self.step_size);
-        // debug!(?self.total_amount);
-        if self.total_amount.eq(&Decimal::ZERO) {
-            error!("总可开数量低于一张,请尝试加大杠杆倍数或资金!equity={}, lever_rate={}, adjust_lever_rate={}, mp={}, step_size={}",
-                self.equity, self.lever_rate, self.adjust_lever_rate, self.mp, self.step_size);
-            return false;
-        }
-
-        // 求最大pos
-        if self.equity > Decimal::ZERO {
-            let max_pos_rate = max(self.pos.long_pos, self.pos.short_pos) * self.mp / self.equity;
-            if max_pos_rate > self.max_pos_rate {
-                self.max_pos_rate = max_pos_rate;
-            }
-            // debug!(?max_pos_rate, ?self.max_pos_rate);
-        }
+                        _local_position: &LocalPosition,
+                        _agg_market: &Vec<Decimal>,
+                        _local_cash: &Decimal,
+                        _local_coin: &Decimal,
+                        _ref_price: &Vec<Vec<Decimal>>,
+                        _predict: &Decimal) -> bool {
+        // // position信息更新
+        // if self.pos.long_pos != local_position.long_pos {
+        //     self.pos.long_pos = local_position.long_pos;
+        //     self.pos.long_avg = local_position.long_avg;
+        // }
+        // if self.pos.short_pos != local_position.short_pos {
+        //     self.pos.short_pos = local_position.short_pos;
+        //     self.pos.short_avg = local_position.short_avg;
+        // }
+        // // debug!(?self.pos);
+        //
+        // // 价格值处理
+        // self.bp = agg_market[global::public_params::BID_PRICE_INDEX];
+        // self.ap = agg_market[global::public_params::ASK_PRICE_INDEX];
+        // self.mp = (self.bp + self.ap) * dec!(0.5);
+        // // 中间价的ema值处理
+        // if self.mp_ema.eq(&Decimal::ZERO) {
+        //     self.mp_ema = self.mp;
+        // } else {
+        //     self.mp_ema = self.mp_ema * dec!(0.999) + self.mp * dec!(0.001);
+        // }
+        // // debug!(?self.bp, ?self.ap, ?self.mp, ?self.mp_ema);
+        //
+        // // 动态杠杆调节
+        // if self.mp > self.mp_ema {
+        //     self.adjust_lever_rate = Decimal::ONE;
+        // } else {
+        //     self.adjust_lever_rate = dec!(0.8);
+        // }
+        // // debug!(?self.adjust_lever_rate);
+        //
+        // // 当前持仓价值处理
+        // self.long_hold_value = self.pos.long_pos * self.mp;
+        // self.short_hold_value = self.pos.short_pos * self.mp;
+        // // debug!(?self.long_hold_value, ?self.short_hold_value);
+        //
+        // // 分现货或合约计算最大开仓价值
+        // if self.exchange.contains("spot") {
+        //     self.max_long_value = *local_cash * self.lever_rate * self.adjust_lever_rate;
+        //     self.max_short_value = *local_coin * self.lever_rate * self.adjust_lever_rate * self.mp;
+        // } else {
+        //     self.max_long_value = self.equity * self.lever_rate * self.adjust_lever_rate;
+        //     self.max_short_value = self.max_long_value;
+        // }
+        // // debug!(?self.max_long_value, ?self.max_short_value, ?self.equity, ?self.lever_rate, ?self.adjust_lever_rate);
+        //
+        // // 做市模式识别
+        // if self.ref_name[self.ref_index].eq(&self.trade_name) {
+        //     self.maker_mode = "free".to_string();
+        // } else {
+        //     self.maker_mode = "follow".to_string();
+        // }
+        // // debug!(?self.maker_mode);
+        //
+        // // 参考价格
+        // if ref_price.len() == 0 {
+        //     self.ref_bp = self.bp;
+        //     self.ref_ap = self.ap;
+        //     self.ref_price = self.mp;
+        // } else {
+        //     self.ref_bp = ref_price[self.ref_index][0];
+        //     self.ref_ap = ref_price[self.ref_index][1];
+        //     self.ref_price = (self.ref_bp + self.ref_ap) * dec!(0.5);
+        // }
+        // // debug!(?self.ref_bp, ?self.ref_ap, %self.ref_price);
+        //
+        // // spread
+        // let temp_predict = predict * self.predict_alpha;
+        // self.predict = utils::clip(temp_predict, -self.trade_open_dist, self.trade_open_dist);
+        // // debug!(?self.predict);
+        //
+        // // 计算当前账户cash和coin
+        // self.coin = local_coin.clone();
+        // self.cash = local_cash.clone();
+        // self.equity = local_cash + local_coin * self.mp;
+        // if self.equity > self.max_equity {
+        //     self.max_equity = self.equity;
+        // }
+        // // debug!(?self.coin, ?self.cash, ?self.equity, ?self.max_equity);
+        //
+        // // 总可开数量
+        // self.total_amount = self.equity * self.lever_rate * self.adjust_lever_rate / self.mp;
+        // self.total_amount = utils::fix_amount(self.total_amount, self.step_size);
+        // // debug!(?self.total_amount);
+        // if self.total_amount.eq(&Decimal::ZERO) {
+        //     error!("总可开数量低于一张,请尝试加大杠杆倍数或资金!equity={}, lever_rate={}, adjust_lever_rate={}, mp={}, step_size={}",
+        //         self.equity, self.lever_rate, self.adjust_lever_rate, self.mp, self.step_size);
+        //     return false;
+        // }
+        //
+        // // 求最大pos
+        // if self.equity > Decimal::ZERO {
+        //     let max_pos_rate = max(self.pos.long_pos, self.pos.short_pos) * self.mp / self.equity;
+        //     if max_pos_rate > self.max_pos_rate {
+        //         self.max_pos_rate = max_pos_rate;
+        //     }
+        //     // debug!(?max_pos_rate, ?self.max_pos_rate);
+        // }
 
         return true;
     }
@@ -1251,7 +1248,6 @@ impl Strategy {
                    local_coin: &Decimal,
                    ref_price: &Vec<Vec<Decimal>>,
                    predict: &Decimal,
-                   is_sigma_allow_open: &bool,
                    _ins: &Instant) -> OrderCommand {
         self.on_time_print();
 
@@ -1280,12 +1276,7 @@ impl Strategy {
         // 下单指令处理逻辑
         self._cancel_open(&mut command, local_orders);              // 撤单命令处理
         self._post_close(&mut command, local_orders);               // 平仓单命令处理
-        // 波动率是否允许下单
-        if *is_sigma_allow_open {
-            self._post_open(&mut command, local_orders);                // 限价单命令处理
-        } else {
-            info!("波动率异常订单亏损, 不开单!");
-        }
+        self._post_open(&mut command, local_orders);                // 限价单命令处理
         self._check_local_orders(&mut command, local_orders);       // 固定时间检查超时订单
         self._update_in_cancel(&mut command, local_orders);         // 更新撤单队列,是一个filter
         self._check_request_limit(&mut command);                    // 限制频率,移除不合规则之订单,是一个filter