skyffire 1 год назад
Родитель
Сommit
25f9466a73

+ 0 - 1
exchanges/Cargo.toml

@@ -40,7 +40,6 @@ rust_decimal_macros = "1.32.0"
 global = { path="../global" }
 tracing = "0.1"
 tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
-log = "0.4.20"
 
 ##生成 xlsx
 rust_xlsxwriter = "0.58.0"

+ 41 - 6
exchanges/src/socket_tool.rs

@@ -13,7 +13,7 @@ use tokio::net::TcpStream;
 use tokio::sync::Mutex;
 use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
 use tokio_tungstenite::tungstenite::{Error, Message};
-use tracing::trace;
+use tracing::{info, trace};
 
 use crate::proxy;
 use crate::proxy::{ProxyEnum, ProxyResponseEnum};
@@ -85,12 +85,50 @@ impl AbstractWsMode {
                     let write_clone3 = Arc::clone(&write_arc);
                     let ws_to_stdout = async {
                         while let Some(message) = read.next().await {
+                            if !bool_v1.load(Ordering::Relaxed) {
+                                continue;
+                            }
+
                             let mut write_lock3 = write_clone3.lock().await;
                             let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
                             // let response_data = func(message);
                             if response_data.is_some() {
-                                let mut data = response_data.unwrap();
-                                data.label = lable.clone();
+                                let data = response_data.unwrap();
+
+                                if data.code == "200" {
+                                    let mut data_c = data.clone();
+                                    data_c.time = chrono::Utc::now().timestamp_micros();
+                                    data_c.label = lable.clone();
+
+                                    if data_c.label.contains("gate_usdt_swap") {
+                                        if data_c.channel == "futures.order_book" {
+                                            if read_tx.len() == 0 {
+                                                read_tx.unbounded_send(data_c).unwrap();
+                                            }
+                                        } else {
+                                            read_tx.unbounded_send(data_c).unwrap();
+                                        }
+                                    } else if data_c.label.contains("binance_usdt_swap") {
+                                        if data_c.channel == "bookTicker" {
+                                            if read_tx.len() == 0 {
+                                                read_tx.unbounded_send(data_c).unwrap();
+                                            }
+                                        } else {
+                                            read_tx.unbounded_send(data_c).unwrap();
+                                        }
+                                    } else if data_c.label.contains("bybit_usdt_swap") {
+                                        if data_c.channel == "orderbook" {
+                                            if read_tx.len() == 0 {
+                                                read_tx.unbounded_send(data_c).unwrap();
+                                            }
+                                        } else {
+                                            read_tx.unbounded_send(data_c).unwrap();
+                                        }
+                                    } else {
+                                        read_tx.unbounded_send(data_c).unwrap();
+                                    }
+                                }
+
                                 let code = data.code.clone();
                                 /*
                                     200 -正确返回
@@ -102,9 +140,6 @@ impl AbstractWsMode {
                                 */
                                 match code.as_str() {
                                     "200" => {
-                                        if bool_v1.load(Ordering::Relaxed) {
-                                            read_tx.unbounded_send(data).unwrap();
-                                        }
                                     }
                                     "-200" => {
                                         //登录成功

+ 4 - 4
global/src/trace_stack.rs

@@ -97,6 +97,10 @@ impl fmt::Display for TraceStack {
             msg.push_str(format!("订单来源:{} ", self.source).as_str());
         }
 
+        if self.after_network != 0 && self.before_send != 0 {
+            msg.push_str(format!("本地总耗时{}微秒  ", self.before_send - self.after_network).as_str());
+        }
+
         if self.before_network != 0 && self.after_network != 0 {
             msg.push_str(format!("数据生成+到达rust耗时{}毫秒  ", (self.after_network - self.before_network).to_f64().unwrap() / 1000.0).as_str());
         }
@@ -121,10 +125,6 @@ impl fmt::Display for TraceStack {
             msg.push_str(format!("发送订单耗时(发送-服务器处理-响应到本地){}毫秒  ", (self.after_send - self.before_send).to_f64().unwrap() / 1000.0).as_str());
         }
 
-        if self.after_network != 0 && self.before_send != 0 {
-            msg.push_str(format!("本地总耗时{}微秒  ", self.before_send - self.after_network).as_str());
-        }
-
         if self.after_send != 0 && self.after_network != 0 {
             msg.push_str(format!("总共耗时{}毫秒", (self.after_send - self.after_network).to_f64().unwrap() / 1000.0).as_str());
         }

+ 4 - 1
standard/src/gate_swap.rs

@@ -8,7 +8,7 @@ use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
 use serde_json::json;
 use futures::stream::FuturesUnordered;
 use futures::{TryStreamExt};
-use tracing::{error, debug, trace};
+use tracing::{error, debug, trace, info};
 use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, PositionModeEnum};
 use exchanges::gate_swap_rest::GateSwapRest;
 use global::trace_stack::TraceStack;
@@ -586,6 +586,9 @@ impl Platform for GateSwap {
                     Ok(mut result) => {
                         // 记录此订单完成时间
                         ts.on_after_send();
+
+                        info!("{}", ts.to_string());
+
                         result.trace_stack = ts;
 
                         result_sd.send(result).await.unwrap();

+ 1 - 1
strategy/src/binance_spot.rs

@@ -95,7 +95,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
                  _min_sell: &mut Decimal,
                  data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
+    trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {

+ 1 - 1
strategy/src/binance_usdt_swap.rs

@@ -59,7 +59,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
                  _min_sell: &mut Decimal,
                  data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
+    trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {

+ 2 - 2
strategy/src/bitget_spot.rs

@@ -130,7 +130,7 @@ pub async fn bitget_spot_run(bool_v1 :Arc<AtomicBool>,
 async fn on_private_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, data: ResponseData) {
     let mut trace_stack = TraceStack::default();
 
-    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
+    trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {
@@ -179,7 +179,7 @@ async fn on_private_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, data
 async fn on_public_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
     let mut trace_stack = TraceStack::default();
 
-    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
+    trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {

+ 1 - 1
strategy/src/gate_swap.rs

@@ -101,7 +101,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
                  min_sell: &mut Decimal,
                  data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
+    trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {

+ 1 - 1
strategy/src/kucoin_spot.rs

@@ -77,7 +77,7 @@ async fn on_kucoin_spot_data(bot_arc_clone: Arc<Mutex<Quant>>,
                              min_sell: &mut Decimal,
                              data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
+    trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {

+ 1 - 1
strategy/src/kucoin_swap.rs

@@ -116,7 +116,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
                  min_sell: &mut Decimal,
                  data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
+    trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {

+ 2 - 2
strategy/src/okx_usdt_swap.rs

@@ -109,7 +109,7 @@ pub async fn okex_swap_run(bool_v1: Arc<AtomicBool>,
 async fn on_private_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, data: ResponseData, run_symbol: String) {
     let mut trace_stack = TraceStack::default();
 
-    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
+    trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {
@@ -162,7 +162,7 @@ async fn on_private_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, data
 
 async fn on_public_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
+    trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {