Bladeren bron

尝试解决中控问题。

skyfffire 2 jaren geleden
bovenliggende
commit
e8deb53de0
2 gewijzigde bestanden met toevoegingen van 23 en 15 verwijderingen
  1. 7 5
      src/main.rs
  2. 16 10
      strategy/src/quant.rs

+ 7 - 5
src/main.rs

@@ -21,16 +21,18 @@ async fn main() {
     // 获取本地配置
     let params = Params::new("config.toml").unwrap();
     info!("配置读取成功:{:?}。", params);
-    // 日志级别配置
-    log_level_init(params.log_level.clone());
-    // 退出检查程序
-    let running = control_c::exit_handler();
+    // 主进程控制
+    let running = Arc::new(AtomicBool::new(true));
     // ws退出程序
     let ws_running = Arc::new(AtomicBool::new(true));
+    // 日志级别配置
+    log_level_init(params.log_level.clone());
     // quant初始化动作
-    let quant_arc = quant_libs::init(params, ws_running.clone()).await;
+    let quant_arc = quant_libs::init(params, ws_running.clone(), running.clone()).await;
     // 初始化中控服务
     server::run_server(6000, running.clone(), quant_arc.clone());
+    // 退出检查程序
+    let running = control_c::exit_handler(running);
 
     // 每一秒检查一次程序是否结束
     while running.load(Ordering::Relaxed) {

+ 16 - 10
strategy/src/quant.rs

@@ -3,6 +3,7 @@ use std::collections::{BTreeMap, HashMap};
 use std::io::Error;
 use std::str::FromStr;
 use std::sync::{Arc};
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 use chrono::{Utc};
 use rust_decimal::Decimal;
@@ -99,11 +100,12 @@ pub struct Quant {
     pub max_buy_min_sell_cache: HashMap<String, Vec<Decimal>>,
     // 最近一次的depth信息
     pub local_depths: HashMap<String, Vec<Decimal>>,
-    pub is_update: HashMap<String, bool>
+    pub is_update: HashMap<String, bool>,
+    pub running: Arc<AtomicBool>,
 }
 
 impl Quant {
-    pub async fn new(exchange: String, params: Params, exchange_params: BTreeMap<String, String>,  order_sender: Sender<Order>, error_sender: Sender<Error>) -> Quant {
+    pub async fn new(exchange: String, params: Params, exchange_params: BTreeMap<String, String>,  order_sender: Sender<Order>, error_sender: Sender<Error>, running: Arc<AtomicBool>) -> Quant {
         let symbol = params.pair.clone();
         let pairs: Vec<&str> = params.pair.split('_').collect();
         let mut quant_obj = Quant {
@@ -203,7 +205,8 @@ impl Quant {
             },
             max_buy_min_sell_cache: Default::default(),
             local_depths: Default::default(),
-            is_update: Default::default()
+            is_update: Default::default(),
+            running,
         };
         for i in 0..=params.ref_exchange.len() - 1 {
             // 拼接不会消耗原字符串
@@ -340,10 +343,10 @@ impl Quant {
                 self.local_cancel_log.remove(&data.client_id);
             }
             if self.local_orders.contains_key(&data.client_id) {
-                info!("删除本地订单, client_id:{}", data.client_id);
+                debug!("删除本地订单, client_id:{:?}", data);
                 self.local_orders.remove(&data.client_id);
             } else {
-                debug!("该订单不在本地挂单表中, client_id:{}", data.client_id);
+                debug!("该订单不在本地挂单表中, order:{:?}", data);
             }
             // 在cid缓存队列中 说明是本策略的订单
             if self.local_orders_backup.contains_key(&data.client_id) {
@@ -365,6 +368,9 @@ impl Quant {
                     // 只有开仓成交才触发onPosition
                     // 如果漏推送 rest补充的订单查询信息过来 可能会导致 kd kk 推送出现计算分母为0的情况
                     if filled > Decimal::ZERO {
+                        let filled_order = data.clone();
+                        info!("移除本地订单:{:?}", filled_order);
+
                         if self.exchange.contains("spot") { // 如果是现货交易 还需要修改equity
                             // 现货必须考虑fee 买入fee单位为币 卖出fee单位为u
                             let fee = data.fee;
@@ -493,7 +499,7 @@ impl Quant {
                             let mut platform_rest_fb = self.platform_rest.clone_box();
                             // info!("订单指令:{:?}", order);
                             spawn(async move{
-                                info!("update_local_order订单指令:{:?}", order);
+                                // info!("update_local_order订单指令:{:?}", order);
                                 platform_rest_fb.command_order(order).await;
                             });
                         }
@@ -623,7 +629,7 @@ impl Quant {
                     let mut platform_rest_fb = self.platform_rest.clone_box();
                     // info!("订单指令:{:?}", orders);
                     spawn(async move{
-                        info!("_update_depth订单指令:{:?}", orders);
+                        // info!("_update_depth订单指令:{:?}", orders);
                         platform_rest_fb.command_order(orders).await;
                     });
                 }
@@ -646,8 +652,7 @@ impl Quant {
             }
         }
         // 更新仓位信息
-        info!("收到新的仓位推送");
-        info!(?data, ?position);
+        info!("收到新的仓位推送, position: {:?}, local_position: {:?}", data, position);
         if position != self.local_position {
             self.local_position = position;
             info!("更新本地仓位:{:?}", self.local_position);
@@ -1083,6 +1088,7 @@ impl Quant {
         info!("停机退出  停机原因: {}", self.exit_msg);
         // 发送交易状态 await self._post_params()
         // TODO: 向中控发送信号
+        self.running.store(false, Ordering::Relaxed);
         info!("退出进程!");
     }
 
@@ -1276,7 +1282,7 @@ pub fn on_timer(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()> {
                 // info!("Rest报单平均延迟{}ms", quant.rest.avg_delay);
                 // info!("Rest报单最高延迟{}ms", quant.rest.max_delay);
                 for (name, interval) in &quant.market_update_interval {
-                    info!("WS盘口{}行情平均更新间隔{}ms。", name, interval);
+                    debug!("WS盘口{}行情平均更新间隔{}ms。", name, interval);
                 }
             }
         }