JiahengHe 2 年 前
コミット
663d93ad1c
3 ファイル変更45 行追加42 行削除
  1. 3 2
      src/quant_libs.rs
  2. 10 10
      strategy/src/exchange_disguise.rs
  3. 32 30
      strategy/src/quant.rs

+ 3 - 2
src/quant_libs.rs

@@ -13,6 +13,7 @@ use standard::Order;
 use strategy::model::OrderInfo;
 
 pub async fn init(params: Params, running: Arc<AtomicBool>) -> Arc<Mutex<Quant>> {
+    let mut stop_flag = Arc::new(AtomicBool::new(true));
     // 封装
     let mut exchange_params:BTreeMap<String, String> = BTreeMap::new();
     exchange_params.insert("access_key".to_string(), params.access_key.clone());
@@ -29,9 +30,9 @@ pub async fn init(params: Params, running: Arc<AtomicBool>) -> Arc<Mutex<Quant>>
     quant_obj.before_trade().await;
     let mut quant_arc = Arc::new(Mutex::new(quant_obj));
     // 参考交易所
-    exchange_disguise::run_reference_exchange(params.ref_exchange.get(0).unwrap().clone(), quant_arc.clone(), ref_name, params.ref_pair.clone(), exchange_params.clone()).await;
+    exchange_disguise::run_reference_exchange(stop_flag.clone(),params.ref_exchange.get(0).unwrap().clone(), quant_arc.clone(), ref_name, params.ref_pair.clone(), exchange_params.clone()).await;
     // 交易交易所
-    exchange_disguise::run_transactional_exchange(params.exchange, quant_arc.clone(),  trade_name, vec![params.pair.clone()], exchange_params.clone()).await;
+    exchange_disguise::run_transactional_exchange(stop_flag.clone(), params.exchange, quant_arc.clone(),  trade_name, vec![params.pair.clone()], exchange_params.clone()).await;
     // 启动定期触发的系统逻辑
     quant::on_timer(quant_arc.clone());
     // 启动策略逻辑

+ 10 - 10
strategy/src/exchange_disguise.rs

@@ -1,6 +1,7 @@
 use std::collections::BTreeMap;
 use std::str::FromStr;
 use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
 use std::time::Duration;
 use rust_decimal::Decimal;
 use serde_json::Value;
@@ -17,10 +18,10 @@ use crate::model::{OrderInfo, OriginalTicker, OriginalTradeBa, OriginalTradeGa};
 use crate::quant::Quant;
 
 // 交易交易所启动
-pub async fn run_transactional_exchange(exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
     match exchange_name.as_str() {
         "gate_usdt_swap" => {
-            transactional_gate_swap_run(1i8, quant_arc, name, symbols, exchange_params).await;
+            transactional_gate_swap_run(bool_v1, 1i8, quant_arc, name, symbols, exchange_params).await;
         }
         _ => {
             panic!("参数错误!")
@@ -29,13 +30,13 @@ pub async fn run_transactional_exchange(exchange_name: String, quant_arc: Arc<Mu
 }
 
 // 参考交易所启动
-pub async fn run_reference_exchange(exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+pub async fn run_reference_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
     match exchange_name.as_str() {
         "binance_usdt_swap" => {
-            reference_binance_swap_run(quant_arc, name, symbols, exchange_params).await;
+            reference_binance_swap_run(bool_v1, quant_arc, name, symbols, exchange_params).await;
         }
         "gate_usdt_swap" => {
-            transactional_gate_swap_run(0i8, quant_arc, name, symbols, exchange_params).await;
+            transactional_gate_swap_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
         }
         _ => {
             panic!("参数错误!")
@@ -44,7 +45,7 @@ pub async fn run_reference_exchange(exchange_name: String, quant_arc: Arc<Mutex<
 }
 
 // 交易 gate 合约 启动
-async fn transactional_gate_swap_run(type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+async fn transactional_gate_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
     let (tx, mut rx) = channel(100);
     let mut gate_exc = GateSwapRest::new(false, exchange_params.clone());
     // 获取user_id
@@ -87,8 +88,7 @@ async fn transactional_gate_swap_run(type_num: i8, quant_arc: Arc<Mutex<Quant>>,
                 GateSubscribeType::PuFuturesOrderBook
             ]);
         }
-
-        gate_exc.custom_subscribe(symbols_one).await;
+        gate_exc.custom_subscribe(bool_v1,symbols_one).await;
     });
     tokio::spawn(async move {
         let bot_arc_clone = Arc::clone(&quant_arc);
@@ -177,7 +177,7 @@ async fn transactional_gate_swap_run(type_num: i8, quant_arc: Arc<Mutex<Quant>>,
 }
 
 // 参考 币安 合约 启动
-async fn reference_binance_swap_run(quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
     let (tx, mut rx) = channel(100);
     tokio::spawn( async move {
         let mut ba_exc = BinanceSwapWs::new_label(name, false, exchange_params, BinanceWsType::PublicAndPrivate, tx);
@@ -185,7 +185,7 @@ async fn reference_binance_swap_run(quant_arc: Arc<Mutex<Quant>>, name: String,
             BinanceSubscribeType::PuBookTicker,
             BinanceSubscribeType::PuAggTrade
         ]);
-        ba_exc.custom_subscribe(symbols.clone()).await;
+        ba_exc.custom_subscribe(bool_v1, symbols.clone()).await;
     });
     tokio::spawn(async move {
         // trade

+ 32 - 30
strategy/src/quant.rs

@@ -9,6 +9,7 @@ use rust_decimal::Decimal;
 use rust_decimal::prelude::{ToPrimitive, Zero};
 use rust_decimal_macros::dec;
 use serde_json::Value;
+use tokio::spawn;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
 use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
@@ -240,7 +241,7 @@ impl Quant {
     }
 
     pub async fn handle_signals(quant_arc: Arc<Mutex<Quant>>, mut rx: Receiver<Order>) {
-        tokio::spawn(async move{
+        spawn(async move{
             loop {
                 sleep(Duration::from_millis(1)).await;
                 match rx.try_recv() {
@@ -318,7 +319,6 @@ impl Quant {
             if self.local_cancel_log.contains_key(&data.client_id) {
                 self.local_cancel_log.remove(&data.client_id);
             }
-            // TODO 本地收到移除动作之后,会去继续撤销指定订单,有逻辑问题.
             if self.local_orders.contains_key(&data.client_id) {
                 info!("删除本地订单, client_id:{}", data.client_id);
                 self.local_orders.remove(&data.client_id);
@@ -472,7 +472,7 @@ impl Quant {
                             //交易所处理订单信号
                             let mut platform_rest_fb = self.platform_rest.clone_box();
                             // info!("订单指令:{:?}", order);
-                            tokio::spawn(async move{
+                            spawn(async move{
                                 info!("update_local_order订单指令:{:?}", order);
                                 platform_rest_fb.command_order(order).await;
                             });
@@ -603,7 +603,7 @@ impl Quant {
                     //异步交易所处理订单信号
                     let mut platform_rest_fb = self.platform_rest.clone_box();
                     // info!("订单指令:{:?}", orders);
-                    tokio::spawn(async move{
+                    spawn(async move{
                         info!("_update_depth订单指令:{:?}", orders);
                         platform_rest_fb.command_order(orders).await;
                     });
@@ -803,7 +803,7 @@ impl Quant {
         }
     }
 
-    pub fn check_risk(&mut self) {
+    pub async fn check_risk(&mut self) {
         // 参数检查的风控
         if self.strategy.start_cash == Decimal::ZERO {
             warn!("请检查交易账户余额");
@@ -826,8 +826,7 @@ impl Quant {
                                        self.params.account_name, draw_back, self.strategy.equity, self.strategy.max_equity);
                 warn!(exit_msg);
                 self.exit_msg = exit_msg;
-                // TODO quant.stop()不存在
-                // _self.stop();
+                self.exit(10).await;
             }
         }
         // 回撤风控2
@@ -836,8 +835,7 @@ impl Quant {
             let exit_msg = format!("{} 交易亏损,触发止损,准备停机。", self.params.account_name);
             warn!(exit_msg);
             self.exit_msg = exit_msg;
-            // TODO quant.stop()不存在
-            // _self.stop()
+            self.exit(10).await;
         }
         // 报单延迟风控,平均延迟允许上限5000ms
         // TODO quant.platform_rest.avg_delay不存在
@@ -873,8 +871,7 @@ impl Quant {
                 let exit_msg = format!("{} 合约连续检查本地仓位和推算仓位不符合,退出。", self.params.account_name);
                 warn!(exit_msg);
                 self.exit_msg = exit_msg;
-                // TODO quant.stop()不存在
-                // self.stop()
+                self.exit(10).await;
             }
         }
 
@@ -883,8 +880,7 @@ impl Quant {
             let exit_msg = format!("{} 开仓量为0,退出。", self.params.account_name);
             warn!(exit_msg);
             self.exit_msg = exit_msg;
-            // TODO quant.stop()不存在
-            // self.stop()
+            self.exit(10).await;
         }
 
         // 行情更新异常风控
@@ -902,8 +898,7 @@ impl Quant {
                 warn!(?now_time_millis, ?last_update_millis, ?limit);
                 warn!(exit_msg);
                 self.exit_msg = exit_msg;
-                // TODO quant.stop()不存在
-                // self.stop()
+                self.exit(10).await;
             }
         }
         let local_orders = self.local_orders.clone();
@@ -914,8 +909,7 @@ impl Quant {
                 let exit_msg = format!("{}订单停留过长,怀疑异常,退出,cid:{}。", self.params.account_name, client_id);
                 warn!(exit_msg);
                 self.exit_msg = exit_msg;
-                // TODO quant.stop()不存在
-                // self.stop()
+                self.exit(10).await;
             }
         }
 
@@ -926,8 +920,7 @@ impl Quant {
                     let exit_msg = format!("{} long_pos_bias: {},持仓均价异常,退出。", self.params.account_name, self.strategy.long_pos_bias);
                     warn!(exit_msg);
                     self.exit_msg = exit_msg;
-                    // TODO quant.stop()不存在
-                    // self.stop()
+                    self.exit(10).await;
                 }
             }
         }
@@ -937,8 +930,7 @@ impl Quant {
                     let exit_msg = format!("{} short_pos_bias: {},持仓均价异常,退出。", self.params.account_name, self.strategy.long_pos_bias);
                     warn!(exit_msg);
                     self.exit_msg = exit_msg;
-                    // TODO quant.stop()不存在
-                    // self.stop()
+                    self.exit(10).await;
                 }
             }
         }
@@ -950,8 +942,7 @@ impl Quant {
                 warn!(exit_msg);
                 warn!(?self.strategy.ref_price, ?self.strategy.mp);
                 self.exit_msg = exit_msg;
-                // TODO quant.stop()不存在
-                // self.stop()
+                self.exit(10).await;
             }
         }
 
@@ -961,8 +952,7 @@ impl Quant {
             warn!(exit_msg);
             warn!(?self.strategy.ref_price, ?self.strategy.mp);
             self.exit_msg = exit_msg;
-            // TODO quant.stop()不存在
-            // self.stop()
+            self.exit(10).await;
         }
     }
 
@@ -1032,8 +1022,16 @@ impl Quant {
         }
     }
 
-
     pub async fn exit(&mut self, delay: i8){
+        /**
+         *  停机函数
+         *  mode_signal 不能小于80
+         *  前6秒用于maker平仓
+         *  后2秒用于撤maker平仓单
+         *  休眠2秒再执行check_position 避免卡单导致漏仓位
+        **/
+        info!("进入停机流程...");
+        self.mode_signal = 80;
         info!("预约退出操作 delay:{}", delay);
         if delay > 0i8 {
             sleep(Duration::from_secs(delay as u64)).await;
@@ -1068,6 +1066,7 @@ impl Quant {
         if start_cash.is_zero() && start_coin.is_zero() {
             self.exit_msg = format!("{}{}{}{}", "初始为零 cash: ", start_cash, " coin: ", start_coin);
             // 停止程序
+            self.exit(10).await;
             return false;
         }
         info!("初始cash: {start_cash} 初始coin: {start_coin}");
@@ -1075,6 +1074,7 @@ impl Quant {
         if mp <= Decimal::ZERO {
             self.exit_msg = format!("{}{}", "初始价格获取错误: ", mp);
             // 停止程序
+            self.exit(10).await;
             return false;
         } else {
             info!("初始价格为 {}", mp);
@@ -1099,6 +1099,7 @@ impl Quant {
         if self.strategy.step_size.is_zero() || self.strategy.tick_size.is_zero() {
             self.exit_msg = format!("{}{}{}{}", "交易精度未正常获取 step_size: ", self.strategy.step_size, " tick_size:", self.strategy.tick_size);
             // 停止程序
+            self.exit(10).await;
             return false;
         } else {
             info!("数量精度 {}", self.strategy.step_size);
@@ -1124,6 +1125,7 @@ impl Quant {
             (long_one_hand_value < hand_min_limit && short_one_hand_value < hand_min_limit) {
             self.exit_msg = format!("{}{}{}{}", "初始下单量太少 buy: ", long_one_hand_amount, " sell: ", short_one_hand_amount);
             // 停止程序
+            self.exit(10).await;
             return false;
         }
         // 初始化调度器
@@ -1148,7 +1150,7 @@ fn parse_json_array(json: &str) -> serde_json::Result<Vec<Value>> {
 }
 
 pub fn run_strategy(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()>{
-    return tokio::spawn(async move {
+    return spawn(async move {
         //定期触发策略
         info!("定时触发器启动");
         info!("前期准备完成");
@@ -1179,7 +1181,7 @@ pub fn run_strategy(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()>{
                             if orders.is_not_empty() {
                                 info!("触发onExit");
                                 quant._update_local_orders(&orders);
-                                tokio::spawn(async move {
+                                spawn(async move {
                                     platform_rest_fb.command_order(orders).await;
                                 });
                             }
@@ -1189,7 +1191,7 @@ pub fn run_strategy(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()>{
                             // 记录指令触发信息
                             if orders.is_not_empty() {
                                 quant._update_local_orders(&orders);
-                                tokio::spawn(async move {
+                                spawn(async move {
                                     platform_rest_fb.command_order(orders).await;
                                 });
                             }
@@ -1213,7 +1215,7 @@ pub fn run_strategy(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()>{
 pub fn on_timer(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()> {
     let quant_arc_clone = quant_arc.clone();
 
-    return tokio::spawn(async move {
+    return spawn(async move {
         tokio::time::sleep(Duration::from_secs(20)).await;
 
         loop {