Pārlūkot izejas kodu

测试bybit的延迟,看看有join和没有join的差别。

skyffire 1 gadu atpakaļ
vecāks
revīzija
e1b9e20630
2 mainītis faili ar 99 papildinājumiem un 109 dzēšanām
  1. 99 106
      standard/src/bybit_swap.rs
  2. 0 3
      strategy/src/bybit_usdt_swap.rs

+ 99 - 106
standard/src/bybit_swap.rs

@@ -3,12 +3,14 @@ use std::io::{Error, ErrorKind};
 use std::str::FromStr;
 use tokio::sync::mpsc::Sender;
 use async_trait::async_trait;
+use futures::stream::FuturesUnordered;
+use futures::TryStreamExt;
 use rust_decimal::Decimal;
 use serde_json::{from_value, json, Value};
 use rust_decimal::prelude::FromPrimitive;
 use serde::{Deserialize, Serialize};
 use tokio::time::Instant;
-use tracing::{error, debug, trace};
+use tracing::{error, debug, trace, info};
 use exchanges::bybit_swap_rest::BybitSwapRest;
 use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, PositionModeEnum};
 use global::trace_stack::TraceStack;
@@ -524,111 +526,102 @@ impl Platform for BybitSwap {
     }
 
     // 指令下单
-    async fn command_order(&mut self, _order_command: &mut OrderCommand, _trace_stack: &TraceStack) {
-        // let mut handles = vec![];
-        // // 撤销订单
-        // let cancel = order_command.cancel;
-        // for item in cancel.keys() {
-        //     let mut self_clone = self.clone();
-        //     let cancel_clone = cancel.clone();
-        //     let item_clone = item.clone();
-        //     let order_id = cancel_clone.get(&item_clone).unwrap().get(1).unwrap_or(&"".to_string()).clone();
-        //     let custom_id = cancel_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
-        //     let result_sd = self.order_sender.clone();
-        //     let err_sd = self.error_sender.clone();
-        //     let handle = tokio::spawn(async move {
-        //         let result = self_clone.cancel_order(&order_id, &custom_id).await;
-        //         match result {
-        //             Ok(_) => {
-        //                 // result_sd.send(result).await.unwrap();
-        //             }
-        //             Err(error) => {
-        //                 // 取消失败去查订单。
-        //                 let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
-        //                 match query_rst {
-        //                     Ok(order) => {
-        //                         result_sd.send(order).await.unwrap();
-        //                     }
-        //                     Err(_err) => {
-        //                         error!("bybit:撤单失败,而且查单也失败了,oid={}, cid={}。", order_id.clone(), custom_id.clone());
-        //                         // panic!("撤单失败,而且查单也失败了,gate_io_swap,oid={}, cid={}。", order_id.clone(), custom_id.clone());
-        //                     }
-        //                 }
-        //                 err_sd.send(error).await.unwrap();
-        //             }
-        //         }
-        //     });
-        //     handles.push(handle)
-        // }
-        // // 下单指令
-        // let mut limits = HashMap::new();
-        // limits.extend(order_command.limits_open);
-        // limits.extend(order_command.limits_close);
-        // for item in limits.keys() {
-        //     let mut self_clone = self.clone();
-        //     let limits_clone = limits.clone();
-        //     let item_clone = item.clone();
-        //     let result_sd = self.order_sender.clone();
-        //     let err_sd = self.error_sender.clone();
-        //     let ts = trace_stack.clone();
-        //
-        //     let handle = tokio::spawn(async move {
-        //         let value = limits_clone[&item_clone].clone();
-        //         let amount = Decimal::from_str(value.get(0).unwrap_or(&"0".to_string())).unwrap();
-        //         let side = value.get(1).unwrap();
-        //         let price = Decimal::from_str(value.get(2).unwrap_or(&"0".to_string())).unwrap();
-        //         let cid = value.get(3).unwrap();
-        //
-        //         //  order_name: [数量,方向,价格,c_id]
-        //         let result = self_clone.take_order(cid, side, price, amount).await;
-        //         match result {
-        //             Ok(mut result) => {
-        //                 // 记录此订单完成时间
-        //                 // ts.on_after_send();
-        //                 result.trace_stack = ts;
-        //
-        //                 result_sd.send(result).await.unwrap();
-        //             }
-        //             Err(error) => {
-        //                 error!("bybit:下单失败:{:?}", error);
-        //
-        //                 let mut err_order = Order::new();
-        //                 err_order.custom_id = cid.clone();
-        //                 err_order.status = "REMOVE".to_string();
-        //
-        //                 result_sd.send(err_order).await.unwrap();
-        //                 err_sd.send(error).await.unwrap();
-        //             }
-        //         }
-        //     });
-        //     handles.push(handle)
-        // }
-        // // 检查订单指令
-        // let check = order_command.check;
-        // for item in check.keys() {
-        //     let mut self_clone = self.clone();
-        //     let check_clone = check.clone();
-        //     let item_clone = item.clone();
-        //     let order_id = check_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
-        //     let custom_id = check_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
-        //     let result_sd = self.order_sender.clone();
-        //     let err_sd = self.error_sender.clone();
-        //     let handle = tokio::spawn(async move {
-        //         let result = self_clone.get_order_detail(&order_id, &custom_id).await;
-        //         match result {
-        //             Ok(result) => {
-        //                 result_sd.send(result).await.unwrap();
-        //             }
-        //             Err(error) => {
-        //                 err_sd.send(error).await.unwrap();
-        //             }
-        //         }
-        //     });
-        //     handles.push(handle)
-        // }
-        //
-        // let futures = FuturesUnordered::from_iter(handles);
-        // let _: Result<Vec<_>, _> = futures.try_collect().await;
+    async fn command_order(&mut self, order_command: &mut OrderCommand, trace_stack: &TraceStack) {
+        let mut handles = vec![];
+        // 撤销订单
+        for item in order_command.cancel.keys() {
+            let mut self_clone = self.clone();
+
+            let order_id = order_command.cancel[item].get(1).unwrap().clone();
+            let custom_id = order_command.cancel[item].get(0).unwrap().clone();
+
+            let handle = tokio::spawn(async move {
+                let result = self_clone.cancel_order(&order_id, &custom_id).await;
+                match result {
+                    Ok(_) => {
+                        // result_sd.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        // 取消失败去查订单。
+                        let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
+                        match query_rst {
+                            Ok(order) => {
+                                self_clone.order_sender.send(order).await.unwrap();
+                            }
+                            Err(_err) => {
+                                error!("bybit:撤单失败,而且查单也失败了,oid={}, cid={}。", order_id.clone(), custom_id.clone());
+                            }
+                        }
+                        self_clone.error_sender.send(error).await.unwrap();
+                    }
+                }
+            });
+            handles.push(handle)
+        }
+        // 下单指令
+        order_command.limits_open.extend(order_command.limits_close.clone());
+        for item in order_command.limits_open.keys() {
+            let mut self_clone = self.clone();
+
+            let amount = Decimal::from_str(order_command.limits_open[item].get(0).unwrap_or(&"0".to_string())).unwrap();
+            let side = order_command.limits_open[item].get(1).unwrap().clone();
+            let price = Decimal::from_str(order_command.limits_open[item].get(2).unwrap_or(&"0".to_string())).unwrap();
+            let cid = order_command.limits_open[item].get(3).unwrap().clone();
+
+            let mut ts = trace_stack.clone();
+
+            TraceStack::show_delay(&ts.ins);
+
+            let handle = tokio::spawn(async move {
+                ts.on_before_send();
+                let result = self_clone.take_order(cid.as_str(), side.as_str(), price, amount).await;
+                ts.on_after_send();
+                match result {
+                    Ok(mut result) => {
+                        info!("{}", ts.to_string());
+
+                        // 记录此订单完成时间
+                        result.trace_stack = ts;
+
+                        self_clone.order_sender.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        error!("bybit:下单失败:{:?}", error);
+
+                        let mut err_order = Order::new();
+                        err_order.custom_id = cid.clone();
+                        err_order.status = "REMOVE".to_string();
+
+                        self_clone.order_sender.send(err_order).await.unwrap();
+                        self_clone.error_sender.send(error).await.unwrap();
+                    }
+                }
+            });
+            handles.push(handle)
+        }
+        // 检查订单指令
+        for item in order_command.check.keys() {
+            let mut self_clone = self.clone();
+
+            let order_id = order_command.check[item].get(1).unwrap().clone();
+            let custom_id = order_command.check[item].get(0).unwrap().clone();
+
+            let handle = tokio::spawn(async move {
+                let result = self_clone.get_order_detail(&order_id, &custom_id).await;
+                match result {
+                    Ok(result) => {
+                        self_clone.order_sender.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        self_clone.error_sender.send(error).await.unwrap();
+                    }
+                }
+            });
+            handles.push(handle)
+        }
+
+        let futures = FuturesUnordered::from_iter(handles);
+        let _: Result<Vec<_>, _> = futures.try_collect().await;
     }
 }
 

+ 0 - 3
strategy/src/bybit_usdt_swap.rs

@@ -6,7 +6,6 @@ use rust_decimal::Decimal;
 use tokio::{spawn, time};
 use tokio::sync::Mutex;
 use tokio::time::Instant;
-use tracing::info;
 use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
@@ -180,8 +179,6 @@ async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>, update_flag_u: &mut De
             let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(BybitSwap, &response);
             trace_stack.on_after_format();
 
-            info!("{}", trace_stack.to_string());
-
             on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
         }
         "orderbook" => {