Bladeren bron

查看进入协程之后是什么延迟。

skyffire 1 jaar geleden
bovenliggende
commit
75cee4870e
4 gewijzigde bestanden met toevoegingen van 109 en 111 verwijderingen
  1. 1 1
      standard/src/binance_swap.rs
  2. 7 9
      standard/src/gate_swap.rs
  3. 1 1
      standard/src/lib.rs
  4. 100 100
      strategy/src/core.rs

+ 1 - 1
standard/src/binance_swap.rs

@@ -345,7 +345,7 @@ impl Platform for BinanceSwap {
 
     async fn wallet_transfers(&mut self, _coin: &str, _from: &str, _to: &str, _amount: Decimal) -> Result<String, Error> { Err(Error::new(ErrorKind::NotFound, "binance_swap:该交易所方法未实现".to_string())) }
 
-    async fn command_order(&mut self, _order_command: OrderCommand, _trace_stack: TraceStack) { warn!("binance_swap:该交易所方法未实现"); }
+    async fn command_order(&mut self, _order_command: &mut OrderCommand, _trace_stack: &TraceStack) { warn!("binance_swap:该交易所方法未实现"); }
 }
 
 pub fn format_position_item(position: &serde_json::Value, ct_val: Decimal) -> Position {

+ 7 - 9
standard/src/gate_swap.rs

@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap};
 use std::io::{Error, ErrorKind};
 use std::str::FromStr;
 use tokio::sync::mpsc::Sender;
@@ -510,10 +510,10 @@ impl Platform for GateSwap {
     }
 
     // 指令下单
-    async fn command_order(&mut self, order_command: OrderCommand, trace_stack: TraceStack) {
+    async fn command_order(&mut self, order_command: &mut OrderCommand, trace_stack: &TraceStack) {
         let mut handles = vec![];
         // 撤销订单
-        let cancel = order_command.cancel;
+        let cancel = &order_command.cancel;
         for item in cancel.keys() {
             let mut self_clone = self.clone();
             let cancel_clone = cancel.clone();
@@ -547,12 +547,10 @@ impl Platform for GateSwap {
             handles.push(handle)
         }
         // 下单指令
-        let mut limits = HashMap::new();
-        limits.extend(order_command.limits_open);
-        limits.extend(order_command.limits_close);
-        for item in limits.keys() {
+        order_command.limits_open.extend(order_command.limits_close.clone());
+        for item in order_command.limits_open.keys() {
             let mut self_clone = self.clone();
-            let limits_clone = limits.clone();
+            let limits_clone = order_command.limits_open.clone();
             let item_clone = item.clone();
             let result_sd = self.order_sender.clone();
             let err_sd = self.error_sender.clone();
@@ -586,7 +584,7 @@ impl Platform for GateSwap {
             handles.push(handle)
         }
         // 检查订单指令
-        let check = order_command.check;
+        let check = &order_command.check;
         for item in check.keys() {
             let mut self_clone = self.clone();
             let check_clone = check.clone();

+ 1 - 1
standard/src/lib.rs

@@ -560,5 +560,5 @@ pub trait Platform {
     // 交易账户互转
     async fn wallet_transfers(&mut self, coin: &str, from: &str, to: &str, amount: Decimal) -> Result<String, Error>;
     // 指令下单
-    async fn command_order(&mut self, order_command: OrderCommand, trace_stack: TraceStack);
+    async fn command_order(&mut self, order_command: &mut OrderCommand, trace_stack: &TraceStack);
 }

+ 100 - 100
strategy/src/core.rs

@@ -375,7 +375,7 @@ impl Core {
                             if side == "kd" { // buy  开多
                                 self.local_buy_amount += filled - fee;
                                 self.local_buy_value += (filled - fee) * filled_price;
-                                let new_long_pos = self.local_position_by_orders.long_pos + filled  - fee;
+                                let new_long_pos = self.local_position_by_orders.long_pos + filled - fee;
                                 if new_long_pos == Decimal::ZERO {
                                     self.local_position_by_orders.long_avg = Decimal::ZERO;
                                     self.local_position_by_orders.long_pos = Decimal::ZERO;
@@ -494,14 +494,14 @@ impl Core {
                         // 更新策略时间
                         self.strategy.local_time = Utc::now().timestamp_millis();
                         // trace_stack.on_before_strategy();
-                        let order = self.strategy.on_tick(&self.local_orders,
-                                                          &self.local_position_by_orders,
-                                                          &self.agg_market,
-                                                          &self.local_cash,
-                                                          &self.local_coin,
-                                                          &self.ref_price,
-                                                          &self.predict,
-                                                          &trace_stack.ins);
+                        let mut order = self.strategy.on_tick(&self.local_orders,
+                                                              &self.local_position_by_orders,
+                                                              &self.agg_market,
+                                                              &self.local_cash,
+                                                              &self.local_coin,
+                                                              &self.ref_price,
+                                                              &self.predict,
+                                                              &trace_stack.ins);
                         // trace_stack.on_after_strategy();
                         // 记录指令触发信息
                         if order.is_not_empty() {
@@ -515,7 +515,7 @@ impl Core {
                             // ts.on_before_send();
                             // info!("update_order 订单指令:{:?}", order);
                             spawn(async move {
-                                platform_rest_fb.command_order(order, ts.clone()).await;
+                                platform_rest_fb.command_order(&mut order, &mut ts).await;
                             });
                         }
                     }
@@ -648,14 +648,14 @@ impl Core {
                 self.strategy.local_time = Utc::now().timestamp_millis();
 
                 // 产生交易信号
-                let orders = self.strategy.on_tick(&self.local_orders,
-                                                   &self.local_position_by_orders,
-                                                   &self.agg_market,
-                                                   &self.local_cash,
-                                                   &self.local_coin,
-                                                   &self.ref_price,
-                                                   &self.predict,
-                                                   &trace_stack.ins);
+                let mut orders = self.strategy.on_tick(&self.local_orders,
+                                                       &self.local_position_by_orders,
+                                                       &self.agg_market,
+                                                       &self.local_cash,
+                                                       &self.local_coin,
+                                                       &self.ref_price,
+                                                       &self.predict,
+                                                       &trace_stack.ins);
 
                 if orders.is_not_empty() {
                     // debug!("触发onTick");
@@ -664,11 +664,10 @@ impl Core {
                     let mut platform_rest_fb = self.platform_rest.clone_box();
                     // info!("订单指令:{:?}", orders);
                     self.update_trade_msg();
-                    TraceStack::show_delay(&trace_stack.ins);
-                    let mut ts = trace_stack.clone();
-                    ts.set_order_command(orders.to_string());
+                    let ts = trace_stack.clone();
                     spawn(async move {
-                        platform_rest_fb.command_order(orders, ts.clone()).await;
+                        TraceStack::show_delay(&ts.ins);
+                        platform_rest_fb.command_order(&mut orders, &ts).await;
                     });
 
                     // 更新中控账户相关信息
@@ -684,7 +683,6 @@ impl Core {
         }
 
         {
-
             let mut unrealized_pn_l = self.local_profit;
             unrealized_pn_l.rescale(4);
 
@@ -906,7 +904,7 @@ impl Core {
             Ok(val) => {
                 // info!("bybit_swap:定时获取的仓位信息");
                 self.update_position(val).await;
-            },
+            }
             Err(err) => {
                 error!("bybit_swap:定时获取仓位信息错误!\nget_position:res_data={:?}", err);
             }
@@ -1350,65 +1348,65 @@ impl Core {
                         }
                     };
                 }
-                    // match self.platform_rest.get_ticker_symbol(position.symbol.clone()).await {
-                    //     Ok(ticker) => {
-                    //         let ap = ticker.sell;
-                    //         let bp = ticker.buy;
-                    //         let mp = (ap + bp) / Decimal::TWO;
-                    //         let price;
-                    //         let side;
-                    //         let market_info;
-                    //         // 获取market
-                    //         match self.platform_rest.get_market_symbol(position.symbol.clone()).await {
-                    //             Ok(market) => {
-                    //                 market_info = market;
-                    //             }
-                    //             Err(err) => {
-                    //                 error!("    {} 获取当前market异常: {}", position.symbol.clone(), err);
-                    //                 continue;
-                    //             }
-                    //         }
-                    //         info!(?position);
-                    //         match position.position_mode {
-                    //             PositionModeEnum::Long => {
-                    //                 // pd
-                    //                 price = (mp * dec!(0.9985) / market_info.tick_size).floor() * market_info.tick_size;
-                    //                 side = "pd";
-                    //             }
-                    //             PositionModeEnum::Short => {
-                    //                 // pk
-                    //                 price = (mp * dec!(1.0015) / market_info.tick_size).floor() * market_info.tick_size;
-                    //                 side = "pk";
-                    //             }
-                    //             _ => {
-                    //                 error!("    仓位position_mode匹配失败,不做操作!");
-                    //                 // 执行完当前币对  结束循环
-                    //                 continue;
-                    //             }
-                    //         }
-                    //         // 发起清仓订单
-                    //         info!(?ticker);
-                    //         let mut ts = TraceStack::new(0, Instant::now());
-                    //         ts.on_before_send();
-                    //         match self.platform_rest.take_order_symbol(position.symbol.clone(), Decimal::ONE, utils::generate_client_id(None).as_str(), side, price, position.amount.abs()).await {
-                    //             Ok(order) => {
-                    //                 ts.on_after_send();
-                    //                 info!("    {}仓位清除下单成功 {:?}, {}", position.symbol.clone(), order, ts.to_string());
-                    //                 // 执行完当前币对  结束循环
-                    //                 continue;
-                    //             }
-                    //             Err(error) => {
-                    //                 ts.on_after_send();
-                    //                 error!("    {}仓位清除下单异常 {}, {}", position.symbol.clone(), error, ts.to_string());
-                    //                 // 执行完当前币对  结束循环
-                    //                 continue;
-                    //             }
-                    //         };
-                    //     }
-                    //     Err(err) => {
-                    //         error!("    {} 获取当前ticker异常: {}", position.symbol.clone(), err)
-                    //     }
-                    // }
+                // match self.platform_rest.get_ticker_symbol(position.symbol.clone()).await {
+                //     Ok(ticker) => {
+                //         let ap = ticker.sell;
+                //         let bp = ticker.buy;
+                //         let mp = (ap + bp) / Decimal::TWO;
+                //         let price;
+                //         let side;
+                //         let market_info;
+                //         // 获取market
+                //         match self.platform_rest.get_market_symbol(position.symbol.clone()).await {
+                //             Ok(market) => {
+                //                 market_info = market;
+                //             }
+                //             Err(err) => {
+                //                 error!("    {} 获取当前market异常: {}", position.symbol.clone(), err);
+                //                 continue;
+                //             }
+                //         }
+                //         info!(?position);
+                //         match position.position_mode {
+                //             PositionModeEnum::Long => {
+                //                 // pd
+                //                 price = (mp * dec!(0.9985) / market_info.tick_size).floor() * market_info.tick_size;
+                //                 side = "pd";
+                //             }
+                //             PositionModeEnum::Short => {
+                //                 // pk
+                //                 price = (mp * dec!(1.0015) / market_info.tick_size).floor() * market_info.tick_size;
+                //                 side = "pk";
+                //             }
+                //             _ => {
+                //                 error!("    仓位position_mode匹配失败,不做操作!");
+                //                 // 执行完当前币对  结束循环
+                //                 continue;
+                //             }
+                //         }
+                //         // 发起清仓订单
+                //         info!(?ticker);
+                //         let mut ts = TraceStack::new(0, Instant::now());
+                //         ts.on_before_send();
+                //         match self.platform_rest.take_order_symbol(position.symbol.clone(), Decimal::ONE, utils::generate_client_id(None).as_str(), side, price, position.amount.abs()).await {
+                //             Ok(order) => {
+                //                 ts.on_after_send();
+                //                 info!("    {}仓位清除下单成功 {:?}, {}", position.symbol.clone(), order, ts.to_string());
+                //                 // 执行完当前币对  结束循环
+                //                 continue;
+                //             }
+                //             Err(error) => {
+                //                 ts.on_after_send();
+                //                 error!("    {}仓位清除下单异常 {}, {}", position.symbol.clone(), error, ts.to_string());
+                //                 // 执行完当前币对  结束循环
+                //                 continue;
+                //             }
+                //         };
+                //     }
+                //     Err(err) => {
+                //         error!("    {} 获取当前ticker异常: {}", position.symbol.clone(), err)
+                //     }
+                // }
                 // }
             }
             Err(error) => {
@@ -1418,7 +1416,7 @@ impl Core {
             }
         }
 
-        return length
+        return length;
     }
 
 
@@ -1616,19 +1614,20 @@ pub fn run_strategy(core_arc: Arc<Mutex<Core>>) -> JoinHandle<()> {
                             let ref_price = core.ref_price.clone();
                             let predict = core.predict.clone();
 
-                            let orders = core.strategy.on_exit(&local_orders,
-                                                               &position,
-                                                               &agg_market,
-                                                               &local_cash,
-                                                               &local_coin,
-                                                               &ref_price,
-                                                               &predict);
+                            let mut orders = core.strategy.on_exit(&local_orders,
+                                                                   &position,
+                                                                   &agg_market,
+                                                                   &local_cash,
+                                                                   &local_coin,
+                                                                   &ref_price,
+                                                                   &predict);
                             if orders.is_not_empty() {
                                 info!("触发onExit");
                                 info!(?orders);
                                 core._update_local_orders(&orders);
                                 spawn(async move {
-                                    platform_rest_fb.command_order(orders, TraceStack::new(0, Instant::now())).await;
+                                    let mut ts = TraceStack::new(0, Instant::now());
+                                    platform_rest_fb.command_order(&mut orders, &mut ts).await;
                                 });
                             }
                         } else {
@@ -1641,20 +1640,21 @@ pub fn run_strategy(core_arc: Arc<Mutex<Core>>) -> JoinHandle<()> {
                             let ref_price = core.ref_price.clone();
                             let predict = core.predict.clone();
 
-                            let orders = core.strategy.on_sleep(&local_orders,
-                                                                &position,
-                                                                &agg_market,
-                                                                &local_cash,
-                                                                &local_coin,
-                                                                &ref_price,
-                                                                &predict);
+                            let mut orders = core.strategy.on_sleep(&local_orders,
+                                                                    &position,
+                                                                    &agg_market,
+                                                                    &local_cash,
+                                                                    &local_coin,
+                                                                    &ref_price,
+                                                                    &predict);
                             // 记录指令触发信息
                             if orders.is_not_empty() {
                                 info!("触发onSleep");
                                 info!(?orders);
                                 core._update_local_orders(&orders);
                                 spawn(async move {
-                                    platform_rest_fb.command_order(orders, TraceStack::new(0, Instant::now())).await;
+                                    let mut ts = TraceStack::new(0, Instant::now());
+                                    platform_rest_fb.command_order(&mut orders, &mut ts).await;
                                 });
                             }
                         }