JiahengHe %!s(int64=2) %!d(string=hai) anos
pai
achega
83133ca683
Modificáronse 2 ficheiros con 66 adicións e 5 borrados
  1. 1 1
      strategy/src/model.rs
  2. 65 4
      strategy/src/quant.rs

+ 1 - 1
strategy/src/model.rs

@@ -26,7 +26,7 @@ impl LocalPosition {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct TraderMsg{
     pub position: LocalPosition,
     pub cash: Decimal,

+ 65 - 4
strategy/src/quant.rs

@@ -2,14 +2,15 @@ use std::collections::{BTreeMap, HashMap};
 use std::io::Error;
 use std::str::FromStr;
 use std::sync::{Arc};
-use std::thread::{sleep};
 use std::time::Duration;
-use chrono::{Utc};
+use chrono::{Timelike, Utc};
 use rust_decimal::Decimal;
 use rust_decimal_macros::dec;
 use serde_json::Value;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
 use tokio::sync::Mutex;
+use tokio::task::JoinHandle;
+use tokio::time::sleep;
 use tracing::{error, info};
 use exchanges::binance_swap_ws::{BinanceSubscribeType, BinanceSwapWs, BinanceWsType};
 use exchanges::gate_swap_rest::GateSwapRest;
@@ -797,7 +798,7 @@ impl Quant {
     }
 
     pub async fn before_trade(&mut self) -> bool {
-        sleep(Duration::from_secs(1));
+        sleep(Duration::from_secs(1)).await;
 
         // 获取市场信息
         self.get_exchange_info().await;
@@ -873,7 +874,8 @@ impl Quant {
         // 初始化调度器
         self.local_cash = start_cash;
         self.local_coin = start_coin;
-        return true;
+
+
         /*
         ###### 交易前准备就绪 可以开始交易 ######
         self.loop.create_task(self.rest.go())
@@ -882,12 +884,70 @@ impl Quant {
         self.loop.create_task(self.run_stratey())
         self.loop.create_task(self.early_stop_loop())
         */
+        return true;
     }
 }
 
 fn parse_json_array(json: &str) -> serde_json::Result<Vec<Value>> {
     serde_json::from_str(json)
 }
+
+pub async fn run_stratey(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()>{
+    let quant_arc_clone = quant_arc.clone();
+    return tokio::spawn(async move {
+        //定期触发策略
+        info!("定时触发器启动");
+        info!("前期准备完成");
+        sleep(Duration::from_secs(10)).await;
+        loop {
+            let start_time = Utc::now().second();
+            let mut quant = quant_arc_clone.lock().await;
+            if quant.ready == 1 {
+                // 更新交易信息集合
+                quant.update_trade_msg();
+                if quant.mode_signal != 0 {
+                    if quant.mode_signal > 1 {
+                        quant.mode_signal -= 1;
+                    }
+                    if quant.mode_signal == 1 {
+                        return;
+                    }
+                    // 触发策略  更新策略时间
+                    quant.strategy.local_time = Utc::now().timestamp_millis();
+                    let trade_msg = quant.trade_msg.clone();
+                    // 获取信号
+                    if quant.mode_signal > 20 {
+
+                        // 先执行onExit
+                        let orders = quant.strategy.on_exit(&trade_msg);
+                        if orders.is_not_empty() {
+                            info!("触发onExit");
+                            quant._update_local_orders(&orders);
+                            // tokio::spawn(async {
+                            //     self.platform_rest.command_order(orders).await;
+                            // });
+                        }
+                    } else {
+                        // 再执行onSleep
+                        let orders = quant.strategy.on_sleep(&trade_msg);
+                        // 记录指令触发信息
+                        if orders.is_not_empty() {
+                            quant._update_local_orders(&orders);
+                            // tokio::spawn(async {
+                            //     self.platform_rest.command_order(orders).await;
+                            // });
+                        }
+                    }
+                }
+            } else {
+                quant.check_ready();
+            }
+            // 计算耗时并进行休眠
+            let pass_time = Utc::now().second() - start_time;
+            sleep(Duration::from_secs(pass_time as u64)).await;
+        }
+    });
+}
 pub async fn run_transaction(quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>) {
     let (tx, mut rx) = channel(100);
     let gate_exc = GateSwapRest::new(false, exchange_params.clone());
@@ -925,6 +985,7 @@ pub async fn run_transaction(quant_arc: Arc<Mutex<Quant>>, name: String, symbols
         gate_exc.set_subscribe(vec![GateSubscribeType::PuFuturesTrades]);
         gate_exc.custom_subscribe(symbols_one.clone()).await;
     });
+
     tokio::spawn(async move {
         let bot_arc_clone = Arc::clone(&quant_arc);
         let run_symbol = symbols.clone()[0].clone();