Browse Source

修复quant被锁住100ms的问题。

skyfffire 2 years ago
parent
commit
e891590dfa
4 changed files with 97 additions and 92 deletions
  1. 1 0
      Cargo.toml
  2. 25 25
      src/main.rs
  3. 68 64
      strategy/src/quant.rs
  4. 3 3
      strategy/src/strategy.rs

+ 1 - 0
Cargo.toml

@@ -11,6 +11,7 @@ standard = { path="./standard" }
 exchanges = { path="./exchanges" }
 global = { path="./global" }
 tokio = { version = "1.31.0", features = ["full"] }
+chrono = "0.4.26"
 tracing = "0.1"
 tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
 serde = { version = "1.0.188", features = ["derive"] }

+ 25 - 25
src/main.rs

@@ -55,26 +55,26 @@ async fn main() {
         loop {
             match order_receiver.recv().await {
                 Some(order) => {
-                    let mut quant = order_handler_quant_arc.lock().await;
-
-                    let mut order_info = OrderInfo {
-                        symbol: "".to_string(),
-                        amount: Default::default(),
-                        side: "".to_string(),
-                        price: Default::default(),
-                        client_id: order.custom_id,
-                        filled_price: Default::default(),
-                        filled: Default::default(),
-                        order_id: order.id,
-                        local_time: 0,
-                        create_time: 0,
-                        status: order.status,
-                        fee: Default::default(),
-                    };
-
-                    quant.update_local_order(order_info.clone());
-
-                    info!("收到新的order{:?}", order_info.clone());
+                    {
+                        let mut quant = order_handler_quant_arc.lock().await;
+
+                        let mut order_info = OrderInfo {
+                            symbol: "".to_string(),
+                            amount: Default::default(),
+                            side: "".to_string(),
+                            price: Default::default(),
+                            client_id: order.custom_id,
+                            filled_price: Default::default(),
+                            filled: Default::default(),
+                            order_id: order.id,
+                            local_time: 0,
+                            create_time: 0,
+                            status: order.status,
+                            fee: Default::default(),
+                        };
+
+                        quant.update_local_order(order_info.clone());
+                    }
                 },
                 None => {
                     error!("Order channel has been closed!");
@@ -83,12 +83,12 @@ async fn main() {
         }
     });
 
-    let error_handler_quant_arc = quant_arc.clone();
+    // let error_handler_quant_arc = quant_arc.clone();
     let error_handler_thread = tokio::spawn(async move {
         loop {
             match error_receiver.recv().await {
                 Some(error) => {
-                    let quant = error_handler_quant_arc.lock().await;
+                    // let quant = error_handler_quant_arc.lock().await;
 
                     error!("main: 订单出现错误{:?}", error);
                 },
@@ -100,9 +100,9 @@ async fn main() {
     });
 
     let server_thread = tokio::spawn(async move {
-        let server = server::run_server(5566, quant_arc);
-        info!("中控服务已运行。");
-        server.await
+        // let server = server::run_server(5566, quant_arc);
+        // info!("中控服务已运行。");
+        // server.await
     });
 
     try_join!(order_handler_thread, error_handler_thread, server_thread).unwrap();

+ 68 - 64
strategy/src/quant.rs

@@ -310,6 +310,7 @@ impl Quant {
         REMOVE 主要从 ws 来 必须包含 filled 和 filled_price 用于本地仓位推算 定期rest查过旧订单
         为了防止下单失败依然有订单成交 本地需要做一个缓存
     */
+        info!(?data);
         // 触发订单更新
         self.trade_order_update_time = Utc::now().timestamp_millis();
         // 新增订单推送 仅需要cid oid信息
@@ -468,29 +469,29 @@ impl Quant {
                         let order = self.strategy.on_time(&self.trade_msg);
                         // 记录指令触发信息
                         if order.is_not_empty() {
-                            print!("触发onOrder");
+                            info!("触发onOrder");
                             self._update_local_orders(&order);
                             //交易所处理订单信号
                             let mut platform_rest_fb = self.platform_rest.clone_box();
-                            info!("订单指令:{:?}", order);
+                            // info!("订单指令:{:?}", order);
                             tokio::spawn(async move{
-                                info!("订单指令:{:?}", order);
+                                info!("update_local_order订单指令:{:?}", order);
                                 platform_rest_fb.command_order(order).await;
                             });
                         }
                     }
                 }
             } else {
-                info!("订单不属于本策略 拒绝进行仓位计算: {}", data.client_id);
+                debug!("订单不属于本策略 拒绝进行仓位计算: {}", data.client_id);
             }
             if self.local_orders.contains_key(&data.client_id) {
-                info!("删除本地订单, client_id:{}", data.client_id);
+                debug!("删除本地订单, client_id:{}", data.client_id);
                 self.local_orders.remove(&data.client_id);
             } else {
-                info!("该订单不在本地挂单表中, client_id:{}", data.client_id);
+                debug!("该订单不在本地挂单表中, client_id:{}", data.client_id);
             }
         } else {
-            info!("未知的订单事件类型:{:?}", data);
+            error!("未知的订单事件类型:{:?}", data);
         }
     }
 
@@ -604,14 +605,14 @@ impl Quant {
                 // 产生交易信号
                 let orders = self.strategy.on_time(&self.trade_msg);
                 if orders.is_not_empty() {
-                    info!("触发onTick");
+                    debug!("触发onTick");
                     self._update_local_orders(&orders);
 
                     //异步交易所处理订单信号
                     let mut platform_rest_fb = self.platform_rest.clone_box();
-                    info!("订单指令:{:?}", orders);
+                    // info!("订单指令:{:?}", orders);
                     tokio::spawn(async move{
-                        info!("订单指令:{:?}", orders);
+                        info!("_update_depth订单指令:{:?}", orders);
                         platform_rest_fb.command_order(orders).await;
                     });
                 }
@@ -1070,55 +1071,57 @@ pub fn run_strategy(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()>{
         sleep(Duration::from_secs(10)).await;
         loop {
             let start_time = Utc::now().timestamp_millis();
-            let mut quant = quant_arc.lock().await;
-            if quant.ready == 1 {
-                // 更新交易信息集合
-                quant.update_trade_msg();
-                if quant.mode_signal != 0 {
-                    if quant.mode_signal > 1 {
-                        quant.mode_signal -= 1;
-                    }
-                    if quant.mode_signal == 1 {
-                        return;
-                    }
-                    // 触发策略  更新策略时间
-                    quant.strategy.local_time = Utc::now().timestamp_millis();
-                    let trade_msg = quant.trade_msg.clone();
-                    let mut platform_rest_fb = quant.platform_rest.clone_box();
-                    // 获取信号
-                    if quant.mode_signal > 20 {
-                        // 先执行onExit
-                        let orders = quant.strategy.on_exit(&trade_msg);
-                        if orders.is_not_empty() {
-                            info!("触发onExit");
-                            quant._update_local_orders(&orders);
-                            tokio::spawn(async move {
-                                platform_rest_fb.command_order(orders).await;
-                            });
+            let mut delay = 1u64;
+            {
+                let mut quant = quant_arc.lock().await;
+                if quant.ready == 1 {
+                    // 更新交易信息集合
+                    quant.update_trade_msg();
+                    if quant.mode_signal != 0 {
+                        if quant.mode_signal > 1 {
+                            quant.mode_signal -= 1;
                         }
-                    } else {
-                        // 再执行onSleep
-                        let orders = quant.strategy.on_sleep(&trade_msg);
-                        // 记录指令触发信息
-                        if orders.is_not_empty() {
-                            quant._update_local_orders(&orders);
-                            tokio::spawn(async move{
-                                platform_rest_fb.command_order(orders).await;
-                            });
+                        if quant.mode_signal == 1 {
+                            return;
+                        }
+                        // 触发策略  更新策略时间
+                        quant.strategy.local_time = Utc::now().timestamp_millis();
+                        let trade_msg = quant.trade_msg.clone();
+                        let mut platform_rest_fb = quant.platform_rest.clone_box();
+                        // 获取信号
+                        if quant.mode_signal > 20 {
+                            // 先执行onExit
+                            let orders = quant.strategy.on_exit(&trade_msg);
+                            if orders.is_not_empty() {
+                                info!("触发onExit");
+                                quant._update_local_orders(&orders);
+                                tokio::spawn(async move {
+                                    platform_rest_fb.command_order(orders).await;
+                                });
+                            }
+                        } else {
+                            // 再执行onSleep
+                            let orders = quant.strategy.on_sleep(&trade_msg);
+                            // 记录指令触发信息
+                            if orders.is_not_empty() {
+                                quant._update_local_orders(&orders);
+                                tokio::spawn(async move {
+                                    platform_rest_fb.command_order(orders).await;
+                                });
+                            }
                         }
                     }
+                } else {
+                    quant.check_ready();
+                }
+                // 计算耗时并进行休眠
+                let pass_time = Utc::now().timestamp_millis() - start_time;
+                if pass_time < quant.interval.to_i64().unwrap() {
+                    delay = quant.interval.to_u64().unwrap() - pass_time.to_u64().unwrap();
                 }
-            } else {
-                quant.check_ready();
-            }
-            // 计算耗时并进行休眠
-            let pass_time = Utc::now().timestamp_millis() - start_time;
-            let mut delay = 1u64;
-            if pass_time < quant.interval.to_i64().unwrap() {
-                delay = quant.interval.to_u64().unwrap() - pass_time.to_u64().unwrap();
             }
             sleep(Duration::from_millis(delay)).await;
-           }
+        }
     });
 }
 pub async fn run_transaction(quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>) {
@@ -1159,7 +1162,7 @@ pub async fn run_transaction(quant_arc: Arc<Mutex<Quant>>, name: String, symbols
         gate_exc.set_subscribe(vec![
             GateSubscribeType::PuFuturesTrades,
             GateSubscribeType::PuFuturesOrderBook,
-            GateSubscribeType::PrFuturesOrders(user_id.clone()),
+            // GateSubscribeType::PrFuturesOrders(user_id.clone()),
             GateSubscribeType::PrFuturesPositions(user_id.clone()),
             GateSubscribeType::PrFuturesBalances(user_id.clone()),
         ]);
@@ -1231,8 +1234,8 @@ pub async fn run_transaction(quant_arc: Arc<Mutex<Quant>>, name: String, symbols
                         }
                     } else if data.channel == "futures.trades" {
                         let mut quant = bot_arc_clone.lock().await;
-                        let str = data.lable.clone();
-                        if quant.is_update.contains_key(&data.lable) && *quant.is_update.get(str.as_str()).unwrap(){
+                        let str = data.label.clone();
+                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
                             max_buy = Decimal::ZERO;
                             min_sell = Decimal::ZERO;
                             quant.is_update.remove(str.as_str());
@@ -1246,7 +1249,7 @@ pub async fn run_transaction(quant_arc: Arc<Mutex<Quant>>, name: String, symbols
                                 min_sell = trade.price
                             }
                         }
-                        quant.max_buy_min_sell_cache.insert(data.lable, vec![max_buy, min_sell]);
+                        quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
                     }
                 },
                 None => {
@@ -1263,7 +1266,7 @@ pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name: String, symbol: Vec<S
     let (tx, mut rx) = channel(100);
     tokio::spawn( async move {
         let mut ba_exc = BinanceSwapWs::new_label(name, false, exchange_param, BinanceWsType::PublicAndPrivate, tx);
-        ba_exc.set_subscribe(vec![BinanceSubscribeType::PuBookTicker, BinanceSubscribeType::PuAggTrade]);
+        ba_exc.set_subscribe(vec![BinanceSubscribeType::PuDepth20levels100ms, BinanceSubscribeType::PuAggTrade]);
         ba_exc.custom_subscribe(symbol.clone()).await;
     });
     tokio::spawn(async move {
@@ -1281,9 +1284,9 @@ pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name: String, symbol: Vec<S
                     }
                     if data.channel == "aggTrade" {
                         let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
-                        let str = data.lable.clone();
+                        let str = data.label.clone();
                         let mut quant = bot_arc_clone.lock().await;
-                        if quant.is_update.contains_key(&data.lable) && *quant.is_update.get(str.as_str()).unwrap(){
+                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
                             max_buy = Decimal::ZERO;
                             min_sell = Decimal::ZERO;
                             quant.is_update.remove(str.as_str());
@@ -1295,7 +1298,7 @@ pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name: String, symbol: Vec<S
                             min_sell = trade.p
                         }
                         {
-                            quant.max_buy_min_sell_cache.insert(data.lable, vec![max_buy, min_sell]);
+                            quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
                         }
                     } else if data.channel == "bookTicker" {
                         let ticker: OriginalTicker = serde_json::from_str(data.data.as_str()).unwrap();
@@ -1306,10 +1309,10 @@ pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name: String, symbol: Vec<S
                                     sell: ticker.a.clone(),
                                     buy: ticker.b.clone(),
                                     mid_price: Default::default(),
-                                }, data.lable.clone());
+                                }, data.label.clone());
                                 let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
-                                quant._update_depth(depth.clone(), data.lable.clone());
-                                quant.local_depths.insert(data.lable.clone(), depth);
+                                quant._update_depth(depth.clone(), data.label.clone());
+                                quant.local_depths.insert(data.label.clone(), depth);
                             }
                         } else {
                             update_flag_u = ticker.u;
@@ -1319,6 +1322,7 @@ pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name: String, symbol: Vec<S
                         {
                             let mut quant = bot_arc_clone.lock().await;
                             quant._update_depth(depth.depth.clone(), depth.name.clone());
+                            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
                             quant.local_depths.insert(depth.name, depth.depth);
                         }
                     }

+ 3 - 3
strategy/src/strategy.rs

@@ -569,7 +569,7 @@ impl Strategy {
             }
 
             // 等待超时,就移除正在撤单队列
-            info!("移除取消队列:{}", client_id.clone());
+            debug!("等待超过后移除正在撤单队列:{}", client_id.clone());
             to_remove.push(client_id.clone());
         }
 
@@ -912,7 +912,7 @@ impl Strategy {
             // 开多订单处理
             if order.side == "kd".to_string() {
                 // 在价格范围内时不处理
-                if order.price < long_upper && order.price > long_lower {
+                if order.price <= long_upper && order.price >= long_lower {
                     continue
                 }
                 debug!(?key, ?order.price, ?long_upper, ?long_lower);
@@ -922,7 +922,7 @@ impl Strategy {
             // 开空订单处理
             if order.side == "kk".to_string() {
                 // 在价格范围内时不处理
-                if order.price > short_lower && order.price < short_upper {
+                if order.price >= short_lower && order.price <= short_upper {
                     continue
                 }
                 debug!(?key, ?order.price, ?short_lower, ?short_upper);