ソースを参照

bybit下单指令优化,以及位置类型识别。

skyffire 1 年間 前
コミット
258c9e8a05
2 ファイル変更56 行追加47 行削除
  1. 44 39
      standard/src/bybit_swap.rs
  2. 12 8
      strategy/src/bybit_usdt_swap.rs

+ 44 - 39
standard/src/bybit_swap.rs

@@ -10,14 +10,14 @@ use serde_json::{from_value, json, Value};
 use rust_decimal::prelude::FromPrimitive;
 use serde::{Deserialize, Serialize};
 use tokio::time::Instant;
-use tracing::{error, debug, trace, info};
+use tracing::{error, debug, trace};
 use exchanges::bybit_swap_rest::BybitSwapRest;
 use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, PositionModeEnum};
 use global::trace_stack::TraceStack;
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
 #[serde(rename_all = "camelCase")]
-struct SwapTicker{
+struct SwapTicker {
     symbol: String,
     high_price24h: Decimal,
     low_price24h: Decimal,
@@ -527,38 +527,8 @@ impl Platform for BybitSwap {
 
     // 指令下单
     async fn command_order(&mut self, order_command: &mut OrderCommand, trace_stack: &TraceStack) {
-        let mut handles = vec![];
-        // 撤销订单
-        for item in order_command.cancel.keys() {
-            let mut self_clone = self.clone();
-
-            let order_id = order_command.cancel[item].get(1).unwrap().clone();
-            let custom_id = order_command.cancel[item].get(0).unwrap().clone();
-
-            let handle = tokio::spawn(async move {
-                let result = self_clone.cancel_order(&order_id, &custom_id).await;
-                match result {
-                    Ok(_) => {
-                        // result_sd.send(result).await.unwrap();
-                    }
-                    Err(error) => {
-                        // 取消失败去查订单。
-                        let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
-                        match query_rst {
-                            Ok(order) => {
-                                self_clone.order_sender.send(order).await.unwrap();
-                            }
-                            Err(_err) => {
-                                error!("bybit:撤单失败,而且查单也失败了,oid={}, cid={}。", order_id.clone(), custom_id.clone());
-                            }
-                        }
-                        self_clone.error_sender.send(error).await.unwrap();
-                    }
-                }
-            });
-            handles.push(handle)
-        }
         // 下单指令
+        let mut handles = vec![];
         order_command.limits_open.extend(order_command.limits_close.clone());
         for item in order_command.limits_open.keys() {
             let mut self_clone = self.clone();
@@ -570,16 +540,14 @@ impl Platform for BybitSwap {
 
             let mut ts = trace_stack.clone();
 
-            TraceStack::show_delay(&ts.ins);
 
             let handle = tokio::spawn(async move {
                 ts.on_before_send();
+                TraceStack::show_delay(&ts.ins);
                 let result = self_clone.take_order(cid.as_str(), side.as_str(), price, amount).await;
                 ts.on_after_send();
                 match result {
                     Ok(mut result) => {
-                        info!("{}", ts.to_string());
-
                         // 记录此订单完成时间
                         result.trace_stack = ts;
 
@@ -599,7 +567,45 @@ impl Platform for BybitSwap {
             });
             handles.push(handle)
         }
+        let futures = FuturesUnordered::from_iter(handles);
+        let _: Result<Vec<_>, _> = futures.try_collect().await;
+
+        // 撤销订单
+        let mut cancel_handles = vec![];
+        for item in order_command.cancel.keys() {
+            let mut self_clone = self.clone();
+
+            let order_id = order_command.cancel[item].get(1).unwrap().clone();
+            let custom_id = order_command.cancel[item].get(0).unwrap().clone();
+
+            let handle = tokio::spawn(async move {
+                let result = self_clone.cancel_order(&order_id, &custom_id).await;
+                match result {
+                    Ok(_) => {
+                        // result_sd.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        // 取消失败去查订单。
+                        let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
+                        match query_rst {
+                            Ok(order) => {
+                                self_clone.order_sender.send(order).await.unwrap();
+                            }
+                            Err(_err) => {
+                                error!("bybit:撤单失败,而且查单也失败了,oid={}, cid={}。", order_id.clone(), custom_id.clone());
+                            }
+                        }
+                        self_clone.error_sender.send(error).await.unwrap();
+                    }
+                }
+            });
+            cancel_handles.push(handle)
+        }
+        let futures = FuturesUnordered::from_iter(cancel_handles);
+        let _: Result<Vec<_>, _> = futures.try_collect().await;
+
         // 检查订单指令
+        let mut check_handles = vec![];
         for item in order_command.check.keys() {
             let mut self_clone = self.clone();
 
@@ -617,10 +623,9 @@ impl Platform for BybitSwap {
                     }
                 }
             });
-            handles.push(handle)
+            check_handles.push(handle)
         }
-
-        let futures = FuturesUnordered::from_iter(handles);
+        let futures = FuturesUnordered::from_iter(check_handles);
         let _: Result<Vec<_>, _> = futures.try_collect().await;
     }
 }

+ 12 - 8
strategy/src/bybit_usdt_swap.rs

@@ -6,6 +6,7 @@ use rust_decimal::Decimal;
 use tokio::{spawn, time};
 use tokio::sync::Mutex;
 use tokio::time::Instant;
+use tracing::{error};
 use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
@@ -110,22 +111,22 @@ pub async fn bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
     }
 }
 
-async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, run_symbol: &String, data: ResponseData) {
-    let mut trace_stack = TraceStack::new(data.time, data.ins);
+async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, run_symbol: &String, response: ResponseData) {
+    let mut trace_stack = TraceStack::new(response.time, response.ins);
     trace_stack.on_after_span_line();
 
-    if data.code.as_str() != "200" {
+    if response.code.as_str() != "200" {
         return;
     }
 
-    match data.channel.as_str() {
+    match response.channel.as_str() {
         "wallet" => {
-            let account = standard::handle_info::HandleSwapInfo::handle_account_info(BybitSwap, &data, run_symbol);
+            let account = standard::handle_info::HandleSwapInfo::handle_account_info(BybitSwap, &response, run_symbol);
             let mut core = core_arc_clone.lock().await;
             core.update_equity(account).await;
         }
         "order" => {
-            let orders = standard::handle_info::HandleSwapInfo::handle_order(BybitSwap, data.clone(), ct_val.clone());
+            let orders = standard::handle_info::HandleSwapInfo::handle_order(BybitSwap, response.clone(), ct_val.clone());
             trace_stack.on_after_format();
 
             let mut order_infos:Vec<OrderInfo> = Vec::new();
@@ -155,11 +156,14 @@ async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, run
             core.update_order(order_infos, trace_stack).await;
         }
         "position" => {
-            let positions = standard::handle_info::HandleSwapInfo::handle_position(BybitSwap, &data, ct_val);
+            let positions = standard::handle_info::HandleSwapInfo::handle_position(BybitSwap, &response, ct_val);
             let mut core = core_arc_clone.lock().await;
             core.update_position(positions).await;
         }
-        _ => {}
+        _ => {
+            error!("未知推送类型");
+            error!(?response);
+        }
     }
 }