Jelajahi Sumber

Merge branch 'master' of http://git.skyfffire.com/skyfffire/as-rust

875428575@qq.com 2 tahun lalu
induk
melakukan
6db4301ac6
3 mengubah file dengan 233 tambahan dan 172 penghapusan
  1. 2 166
      strategy/src/attach_.rs
  2. 1 1
      strategy/src/model.rs
  3. 230 5
      strategy/src/quant.rs

+ 2 - 166
strategy/src/attach_.rs

@@ -1,4 +1,3 @@
-use std::cmp::max;
 use std::sync::Arc;
 use std::time::Duration;
 use chrono::Utc;
@@ -6,172 +5,9 @@ use rust_decimal::Decimal;
 use rust_decimal_macros::dec;
 use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
-use tracing::{info, warn};
+use tracing::{info};
 use crate::quant::Quant;
 
-pub fn check_risk(mut _self: Quant) {
-    // 参数检查的风控
-    if _self.strategy.start_cash == Decimal::ZERO {
-        warn!("请检查交易账户余额");
-        warn!(?_self.strategy.start_cash);
-        return;
-    }
-
-    if _self.strategy.mp == Decimal::ZERO {
-        warn!("请检查最新价格");
-        warn!(?_self.strategy.mp);
-        return;
-    }
-
-    // 不是现货执行的回撤风控1
-    if !_self.exchange.contains("spot") {
-        let draw_back = Decimal::ONE - _self.strategy.equity / _self.strategy.max_equity;
-
-        if draw_back > _self.stop_loss {
-            let exit_msg = format!("{} 总资金吊灯回撤 {}。当前净值:{}, 最高净值{},触发止损,准备停机。",
-                                   _self.params.account_name, draw_back, _self.strategy.equity, _self.strategy.max_equity);
-            warn!(exit_msg);
-            _self.exit_msg = exit_msg;
-            // TODO quant.stop()不存在
-            // _self.stop();
-        }
-    }
-    // 回撤风控2
-    let draw_back = _self.local_profit / _self.strategy.start_equity;
-    if draw_back < -_self.stop_loss {
-        let exit_msg = format!("{} 交易亏损,触发止损,准备停机。", _self.params.account_name);
-        warn!(exit_msg);
-        _self.exit_msg = exit_msg;
-        // TODO quant.stop()不存在
-        // _self.stop()
-    }
-    // 报单延迟风控,平均延迟允许上限5000ms
-    // TODO quant.platform_rest.avg_delay不存在
-    // if _self.platform_rest.avg_delay > 5000 {
-    //     let exit_msg = format!("{} 延迟爆表 触发风控 准备停机。", _self.params.account_name);
-    //     warn!(exit_msg);
-    //     _self.exit_msg = exit_msg;
-    //     // TODO quant.stop()不存在
-    //     // _self.stop()
-    // }
-
-    // 仓位异常风控,只在合约模式下执行
-    if !_self.exchange.contains("spot") {
-        let long_diff = (_self.local_position.long_pos - _self.local_position_by_orders.long_pos).abs();
-        let short_diff = (_self.local_position.short_pos - _self.local_position_by_orders.short_pos).abs();
-        let diff_pos = max(long_diff, short_diff);
-        let diff_pos_value = diff_pos * _self.strategy.mp;
-        if diff_pos_value > _self.strategy._min_amount_value {
-            warn!("{}发现仓位异常", _self.params.account_name);
-            warn!(?_self.local_position_by_orders, ?_self.local_position);
-            _self.position_check_series.push(1);
-        } else {
-            _self.position_check_series.push(0);
-        }
-
-        // _self.position_check_series长度限制
-        if _self.position_check_series.len() > 30 {
-            _self.position_check_series.remove(0);
-        }
-
-        // 连续不符合判定
-        if _self.position_check_series.iter().sum::<i8>() >= 30 {
-            let exit_msg = format!("{} 合约连续检查本地仓位和推算仓位不符合,退出。", _self.params.account_name);
-            warn!(exit_msg);
-            _self.exit_msg = exit_msg;
-            // TODO quant.stop()不存在
-            // _self.stop()
-        }
-    }
-
-    // 下单异常风控
-    if _self.strategy.total_amount == Decimal::ZERO {
-        let exit_msg = format!("{} 开仓量为0,退出。", _self.params.account_name);
-        warn!(exit_msg);
-        _self.exit_msg = exit_msg;
-        // TODO quant.stop()不存在
-        // _self.stop()
-    }
-
-    // 行情更新异常风控
-    let mut exchange_names = _self.ref_name.clone();
-    exchange_names.push(_self.trade_name);
-    for exchange_name in exchange_names {
-        let now_time_millis = Utc::now().timestamp_millis();
-        let last_update_millis = _self.market_update_time.get(&exchange_name).unwrap();
-        let delay = now_time_millis - last_update_millis;
-        let limit = global::public_params::MARKET_DELAY_LIMIT;
-
-        if delay > limit {
-            let exit_msg = format!("{} ticker_name:{}, delay:{}ms,行情更新延迟过高,退出。",
-                                   _self.params.account_name, exchange_name, delay);
-            warn!(?now_time_millis, ?last_update_millis, ?limit);
-            warn!(exit_msg);
-            _self.exit_msg = exit_msg;
-            // TODO quant.stop()不存在
-            // _self.stop()
-        }
-    }
-
-    // 订单异常风控
-    for (client_id, order) in _self.local_orders {
-        // 订单长时间停留 怀疑漏单 但未必一定漏 5min
-        if Utc::now().timestamp_millis() - order.local_time > 5 * 60 * 1000 {
-            let exit_msg = format!("{}订单停留过长,怀疑异常,退出,cid:{}。", _self.params.account_name, client_id);
-            warn!(exit_msg);
-            _self.exit_msg = exit_msg;
-            // TODO quant.stop()不存在
-            // _self.stop()
-        }
-    }
-
-    // 持仓均价异常风控
-    if _self.strategy.long_pos_bias != Decimal::ZERO {
-        if _self.strategy.long_hold_value > Decimal::TWO * _self.strategy._min_amount_value {
-            if _self.strategy.long_pos_bias > dec!(4) || _self.strategy.long_pos_bias < -Decimal::TWO {
-                let exit_msg = format!("{} long_pos_bias: {},持仓均价异常,退出。", _self.params.account_name, _self.strategy.long_pos_bias);
-                warn!(exit_msg);
-                _self.exit_msg = exit_msg;
-                // TODO quant.stop()不存在
-                // _self.stop()
-            }
-        }
-    }
-    if _self.strategy.short_pos_bias != Decimal::ZERO {
-        if _self.strategy.short_hold_value > Decimal::TWO * _self.strategy._min_amount_value {
-            if _self.strategy.short_pos_bias > dec!(4) || _self.strategy.short_pos_bias < -Decimal::TWO {
-                let exit_msg = format!("{} short_pos_bias: {},持仓均价异常,退出。", _self.params.account_name, _self.strategy.long_pos_bias);
-                warn!(exit_msg);
-                _self.exit_msg = exit_msg;
-                // TODO quant.stop()不存在
-                // _self.stop()
-            }
-        }
-    }
-
-    // 订单撤单异常风控
-    for (client_id, cancel_delay) in _self.local_cancel_log {
-        if cancel_delay > 300 {
-            let exit_msg = format!("{} 长时间无法撤销,client_id: {},退出。", _self.params.account_name, client_id);
-            warn!(exit_msg);
-            warn!(?_self.strategy.ref_price, ?_self.strategy.mp);
-            _self.exit_msg = exit_msg;
-            // TODO quant.stop()不存在
-            // _self.stop()
-        }
-    }
-
-    // 定价异常风控
-    if (_self.strategy.ref_price - _self.strategy.mp).abs() / _self.strategy.mp > dec!(0.03) {
-        let exit_msg = format!("{} 定价偏离过大,怀疑定价异常,退出。", _self.params.account_name);
-        warn!(exit_msg);
-        warn!(?_self.strategy.ref_price, ?_self.strategy.mp);
-        _self.exit_msg = exit_msg;
-        // TODO quant.stop()不存在
-        // _self.stop()
-    }
-}
-
 // 定期触发的系统逻辑
 pub fn on_timer(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()> {
     let quant_arc_clone = quant_arc.clone();
@@ -185,7 +21,7 @@ pub fn on_timer(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()> {
             let mut quant = quant_arc_clone.lock().await;
             {
                 // 检查风控
-                // quant.check_risk();
+                quant.check_risk();
 
                 // 线程停止信号
                 if quant.mode_signal == 1 {

+ 1 - 1
strategy/src/model.rs

@@ -26,7 +26,7 @@ impl LocalPosition {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct TraderMsg{
     pub position: LocalPosition,
     pub cash: Decimal,

+ 230 - 5
strategy/src/quant.rs

@@ -1,16 +1,18 @@
+use std::cmp::max;
 use std::collections::{BTreeMap, HashMap};
 use std::io::Error;
 use std::str::FromStr;
 use std::sync::{Arc};
-use std::thread::{sleep};
 use std::time::Duration;
-use chrono::{Utc};
+use chrono::{Timelike, Utc};
 use rust_decimal::Decimal;
 use rust_decimal_macros::dec;
 use serde_json::Value;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
 use tokio::sync::Mutex;
-use tracing::{error, info};
+use tokio::task::JoinHandle;
+use tokio::time::sleep;
+use tracing::{error, info, warn};
 use exchanges::binance_swap_ws::{BinanceSubscribeType, BinanceSwapWs, BinanceWsType};
 use exchanges::gate_swap_rest::GateSwapRest;
 use exchanges::gate_swap_ws::{GateSubscribeType, GateSwapWs, GateWsType};
@@ -796,8 +798,171 @@ impl Quant {
         }
     }
 
+    pub fn check_risk(&mut self) {
+        // 参数检查的风控
+        if self.strategy.start_cash == Decimal::ZERO {
+            warn!("请检查交易账户余额");
+            warn!(?self.strategy.start_cash);
+            return;
+        }
+
+        if self.strategy.mp == Decimal::ZERO {
+            warn!("请检查最新价格");
+            warn!(?self.strategy.mp);
+            return;
+        }
+
+        // 不是现货执行的回撤风控1
+        if !self.exchange.contains("spot") {
+            let draw_back = Decimal::ONE - self.strategy.equity / self.strategy.max_equity;
+
+            if draw_back > self.stop_loss {
+                let exit_msg = format!("{} 总资金吊灯回撤 {}。当前净值:{}, 最高净值{},触发止损,准备停机。",
+                                       self.params.account_name, draw_back, self.strategy.equity, self.strategy.max_equity);
+                warn!(exit_msg);
+                self.exit_msg = exit_msg;
+                // TODO quant.stop()不存在
+                // _self.stop();
+            }
+        }
+        // 回撤风控2
+        let draw_back = self.local_profit / self.strategy.start_equity;
+        if draw_back < -self.stop_loss {
+            let exit_msg = format!("{} 交易亏损,触发止损,准备停机。", self.params.account_name);
+            warn!(exit_msg);
+            self.exit_msg = exit_msg;
+            // TODO quant.stop()不存在
+            // _self.stop()
+        }
+        // 报单延迟风控,平均延迟允许上限5000ms
+        // TODO quant.platform_rest.avg_delay不存在
+        // if _self.platform_rest.avg_delay > 5000 {
+        //     let exit_msg = format!("{} 延迟爆表 触发风控 准备停机。", _self.params.account_name);
+        //     warn!(exit_msg);
+        //     _self.exit_msg = exit_msg;
+        //     // TODO quant.stop()不存在
+        //     // _self.stop()
+        // }
+
+        // 仓位异常风控,只在合约模式下执行
+        if !self.exchange.contains("spot") {
+            let long_diff = (self.local_position.long_pos - self.local_position_by_orders.long_pos).abs();
+            let short_diff = (self.local_position.short_pos - self.local_position_by_orders.short_pos).abs();
+            let diff_pos = max(long_diff, short_diff);
+            let diff_pos_value = diff_pos * self.strategy.mp;
+            if diff_pos_value > self.strategy._min_amount_value {
+                warn!("{}发现仓位异常", self.params.account_name);
+                warn!(?self.local_position_by_orders, ?self.local_position);
+                self.position_check_series.push(1);
+            } else {
+                self.position_check_series.push(0);
+            }
+
+            // self.position_check_series长度限制
+            if self.position_check_series.len() > 30 {
+                self.position_check_series.remove(0);
+            }
+
+            // 连续不符合判定
+            if self.position_check_series.iter().sum::<i8>() >= 30 {
+                let exit_msg = format!("{} 合约连续检查本地仓位和推算仓位不符合,退出。", self.params.account_name);
+                warn!(exit_msg);
+                self.exit_msg = exit_msg;
+                // TODO quant.stop()不存在
+                // self.stop()
+            }
+        }
+
+        // 下单异常风控
+        if self.strategy.total_amount == Decimal::ZERO {
+            let exit_msg = format!("{} 开仓量为0,退出。", self.params.account_name);
+            warn!(exit_msg);
+            self.exit_msg = exit_msg;
+            // TODO quant.stop()不存在
+            // self.stop()
+        }
+
+        // 行情更新异常风控
+        let mut exchange_names = self.ref_name.clone();
+        exchange_names.push(self.trade_name.clone());
+        for exchange_name in exchange_names {
+            let now_time_millis = Utc::now().timestamp_millis();
+            let last_update_millis = self.market_update_time.get(&exchange_name).unwrap();
+            let delay = now_time_millis - last_update_millis;
+            let limit = global::public_params::MARKET_DELAY_LIMIT;
+
+            if delay > limit {
+                let exit_msg = format!("{} ticker_name:{}, delay:{}ms,行情更新延迟过高,退出。",
+                                       self.params.account_name, exchange_name, delay);
+                warn!(?now_time_millis, ?last_update_millis, ?limit);
+                warn!(exit_msg);
+                self.exit_msg = exit_msg;
+                // TODO quant.stop()不存在
+                // self.stop()
+            }
+        }
+        let local_orders = self.local_orders.clone();
+        // 订单异常风控
+        for (client_id, order) in local_orders{
+            // 订单长时间停留 怀疑漏单 但未必一定漏 5min
+            if Utc::now().timestamp_millis() - order.local_time > 5 * 60 * 1000 {
+                let exit_msg = format!("{}订单停留过长,怀疑异常,退出,cid:{}。", self.params.account_name, client_id);
+                warn!(exit_msg);
+                self.exit_msg = exit_msg;
+                // TODO quant.stop()不存在
+                // self.stop()
+            }
+        }
+
+        // 持仓均价异常风控
+        if self.strategy.long_pos_bias != Decimal::ZERO {
+            if self.strategy.long_hold_value > Decimal::TWO * self.strategy._min_amount_value {
+                if self.strategy.long_pos_bias > dec!(4) || self.strategy.long_pos_bias < -Decimal::TWO {
+                    let exit_msg = format!("{} long_pos_bias: {},持仓均价异常,退出。", self.params.account_name, self.strategy.long_pos_bias);
+                    warn!(exit_msg);
+                    self.exit_msg = exit_msg;
+                    // TODO quant.stop()不存在
+                    // self.stop()
+                }
+            }
+        }
+        if self.strategy.short_pos_bias != Decimal::ZERO {
+            if self.strategy.short_hold_value > Decimal::TWO * self.strategy._min_amount_value {
+                if self.strategy.short_pos_bias > dec!(4) || self.strategy.short_pos_bias < -Decimal::TWO {
+                    let exit_msg = format!("{} short_pos_bias: {},持仓均价异常,退出。", self.params.account_name, self.strategy.long_pos_bias);
+                    warn!(exit_msg);
+                    self.exit_msg = exit_msg;
+                    // TODO quant.stop()不存在
+                    // self.stop()
+                }
+            }
+        }
+
+        // 订单撤单异常风控
+        for (client_id, cancel_delay) in self.local_cancel_log.clone() {
+            if cancel_delay > 300 {
+                let exit_msg = format!("{} 长时间无法撤销,client_id: {},退出。", self.params.account_name, client_id);
+                warn!(exit_msg);
+                warn!(?self.strategy.ref_price, ?self.strategy.mp);
+                self.exit_msg = exit_msg;
+                // TODO quant.stop()不存在
+                // self.stop()
+            }
+        }
+
+        // 定价异常风控
+        if (self.strategy.ref_price - self.strategy.mp).abs() / self.strategy.mp > dec!(0.03) {
+            let exit_msg = format!("{} 定价偏离过大,怀疑定价异常,退出。", self.params.account_name);
+            warn!(exit_msg);
+            warn!(?self.strategy.ref_price, ?self.strategy.mp);
+            self.exit_msg = exit_msg;
+            // TODO quant.stop()不存在
+            // self.stop()
+        }
+    }
+
     pub async fn before_trade(&mut self) -> bool {
-        sleep(Duration::from_secs(1));
+        sleep(Duration::from_secs(1)).await;
 
         // 获取市场信息
         self.get_exchange_info().await;
@@ -873,7 +1038,8 @@ impl Quant {
         // 初始化调度器
         self.local_cash = start_cash;
         self.local_coin = start_coin;
-        return true;
+
+
         /*
         ###### 交易前准备就绪 可以开始交易 ######
         self.loop.create_task(self.rest.go())
@@ -882,12 +1048,70 @@ impl Quant {
         self.loop.create_task(self.run_stratey())
         self.loop.create_task(self.early_stop_loop())
         */
+        return true;
     }
 }
 
 fn parse_json_array(json: &str) -> serde_json::Result<Vec<Value>> {
     serde_json::from_str(json)
 }
+
+pub async fn run_stratey(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()>{
+    let quant_arc_clone = quant_arc.clone();
+    return tokio::spawn(async move {
+        //定期触发策略
+        info!("定时触发器启动");
+        info!("前期准备完成");
+        sleep(Duration::from_secs(10)).await;
+        loop {
+            let start_time = Utc::now().second();
+            let mut quant = quant_arc_clone.lock().await;
+            if quant.ready == 1 {
+                // 更新交易信息集合
+                quant.update_trade_msg();
+                if quant.mode_signal != 0 {
+                    if quant.mode_signal > 1 {
+                        quant.mode_signal -= 1;
+                    }
+                    if quant.mode_signal == 1 {
+                        return;
+                    }
+                    // 触发策略  更新策略时间
+                    quant.strategy.local_time = Utc::now().timestamp_millis();
+                    let trade_msg = quant.trade_msg.clone();
+                    let platform_rest_fb = quant.platform_rest.clone_box();
+                    // 获取信号
+                    if quant.mode_signal > 20 {
+                        // 先执行onExit
+                        let orders = quant.strategy.on_exit(&trade_msg);
+                        if orders.is_not_empty() {
+                            info!("触发onExit");
+                            quant._update_local_orders(&orders);
+                            tokio::spawn(async move {
+                                platform_rest_fb.command_order(orders).await;
+                            });
+                        }
+                    } else {
+                        // 再执行onSleep
+                        let orders = quant.strategy.on_sleep(&trade_msg);
+                        // 记录指令触发信息
+                        if orders.is_not_empty() {
+                            quant._update_local_orders(&orders);
+                            tokio::spawn(async move{
+                                platform_rest_fb.command_order(orders).await;
+                            });
+                        }
+                    }
+                }
+            } else {
+                quant.check_ready();
+            }
+            // 计算耗时并进行休眠
+            let pass_time = Utc::now().second() - start_time;
+            sleep(Duration::from_secs(pass_time as u64)).await;
+        }
+    });
+}
 pub async fn run_transaction(quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>) {
     let (tx, mut rx) = channel(100);
     let gate_exc = GateSwapRest::new(false, exchange_params.clone());
@@ -925,6 +1149,7 @@ pub async fn run_transaction(quant_arc: Arc<Mutex<Quant>>, name: String, symbols
         gate_exc.set_subscribe(vec![GateSubscribeType::PuFuturesTrades]);
         gate_exc.custom_subscribe(symbols_one.clone()).await;
     });
+
     tokio::spawn(async move {
         let bot_arc_clone = Arc::clone(&quant_arc);
         let run_symbol = symbols.clone()[0].clone();