Преглед на файлове

新版TraceStack上线,希望能解决那个延时越来越大的bug。

skyffire преди 1 година
родител
ревизия
6476c5fd35
променени са 6 файла, в които са добавени 78 реда и са изтрити 48 реда
  1. 57 38
      global/src/trace_stack.rs
  2. 3 1
      standard/src/gate_swap.rs
  3. 6 2
      strategy/src/binance_usdt_swap.rs
  4. 3 3
      strategy/src/core.rs
  5. 3 2
      strategy/src/exchange_disguise.rs
  6. 6 2
      strategy/src/gate_swap.rs

+ 57 - 38
global/src/trace_stack.rs

@@ -9,20 +9,19 @@ use tracing::info;
 pub struct TraceStack {
     pub before_network: i64,                // 官方数据生成时间
     pub after_network: i64,                 // 到达网络层时间
-    pub ins: Instant,                       // 到达网络层的Instant
 
-    pub after_span_line: i64,               // 跨线程之后
+    pub ins: Instant,                       // 到达网络层的Instant,用于本地各层耗时计算。
 
-    pub before_unlock_core: i64,            // 解锁core之前的时间
+    pub after_span_line: u128,              // 跨线程之后
 
-    pub before_format: i64,                 // 开始格式化时间
-    pub after_format: i64,                  // 结束格式化时间
+    pub after_unlock_core: u128,            // 解锁core之后
 
-    pub before_strategy: i64,               // 计算层开始时间
-    pub after_strategy: i64,                // 计算层结束时间
+    pub after_format: u128,                 // 结束格式化之后
 
-    pub before_send: i64,                   // 发送指令时时间
-    pub after_send: i64,                    // 发送指令结束时间
+    pub after_strategy: u128,               // 逻辑层结束
+
+    pub before_send: u128,                  // 发送指令时时间
+    pub after_send: u128,                   // 发送指令结束时间
 
     pub source: String,                     // 订单来源[depth|order]
     pub order_command: String,              // 订单发送时的command
@@ -35,10 +34,8 @@ impl TraceStack {
             after_network,
             ins: after_network_instant,
             after_span_line: 0,
-            before_unlock_core: 0,
-            before_format: 0,
+            after_unlock_core: 0,
             after_format: 0,
-            before_strategy: 0,
             after_strategy: 0,
             before_send: 0,
             after_send: 0,
@@ -80,6 +77,30 @@ impl TraceStack {
         self.after_network = after_network;
     }
 
+    pub fn on_after_span_line(&mut self) {
+        self.after_span_line = self.ins.elapsed().as_nanos();
+    }
+
+    pub fn on_after_unlock_core(&mut self) {
+        self.after_unlock_core = self.ins.elapsed().as_nanos();
+    }
+
+    pub fn on_after_format(&mut self) {
+        self.after_format = self.ins.elapsed().as_nanos();
+    }
+
+    pub fn on_after_strategy(&mut self) {
+        self.after_strategy = self.ins.elapsed().as_nanos();
+    }
+
+    pub fn on_before_send(&mut self) {
+        self.before_send = self.ins.elapsed().as_nanos();
+    }
+
+    pub fn on_after_send(&mut self) {
+        self.after_send = self.ins.elapsed().as_nanos();
+    }
+
     pub fn set_order_command(&mut self, command_str: String) {
         self.order_command = command_str;
     }
@@ -104,36 +125,34 @@ impl fmt::Display for TraceStack {
             msg.push_str(format!("订单来源:{} ", self.source).as_str());
         }
 
-        msg.push_str(format!("本地总耗时{}us  ", self.ins.elapsed().as_micros()).as_str());
+        if self.before_send != 0 {
+            msg.push_str(format!("本地总耗时{}ns  ", self.before_send).as_str());
+        }
 
         if self.before_network != 0 && self.after_network != 0 {
-            msg.push_str(format!("数据生成+到达rust耗时{}毫秒  ", (self.after_network - self.before_network).to_f64().unwrap() / 1000.0).as_str());
+            msg.push_str(format!("数据到达rust耗时{}ms  ", (self.after_network - self.before_network).to_f64().unwrap() / 1000.0).as_str());
+        }
+
+        if self.after_format != 0 {
+            msg.push_str(format!("跨协程完毕{}ns  ", self.after_span_line).as_str());
+        }
+
+        if self.after_format != 0 {
+            msg.push_str(format!("数据格式化完毕{}ns  ", self.after_format).as_str());
+        }
+
+        if self.after_format != 0 {
+            msg.push_str(format!("解锁完毕{}ns  ", self.after_unlock_core).as_str());
+        }
+
+        if self.after_strategy != 0 {
+            msg.push_str(format!("计算层完毕{}ns  ", self.after_strategy).as_str());
+        }
+
+        if self.after_send != 0 && self.before_send != 0 {
+            msg.push_str(format!("发送订单耗时(发送-服务器处理-响应到本地){}ms  ", (self.after_send - self.before_send).to_f64().unwrap() / 1000.0).as_str());
         }
 
-        // if self.after_format != 0 && self.before_format != 0 {
-        //     msg.push_str(format!("数据格式化耗时{}us  ", self.after_format - self.before_format).as_str());
-        // }
-        //
-        // if self.after_core != 0 && self.before_core != 0 {
-        //     msg.push_str(format!("core执行耗时{}us  ", self.after_core - self.before_core).as_str());
-        // }
-        //
-        // if self.after_strategy != 0 && self.before_strategy != 0 {
-        //     msg.push_str(format!("计算层执行耗时{}us  ", self.after_strategy - self.before_strategy).as_str());
-        // }
-        //
-        // if self.after_network != 0 && self.after_strategy != 0 {
-        //     msg.push_str(format!("core核心全部走完{}us  ", self.after_strategy - self.after_network).as_str());
-        // }
-        //
-        // if self.before_send != 0 && self.before_send_thread != 0 {
-        //     msg.push_str(format!("进入发单协程耗时{}us  ", self.before_send - self.before_send_thread).as_str());
-        // }
-        //
-        // if self.after_send != 0 && self.before_send != 0 {
-        //     msg.push_str(format!("发送订单耗时(发送-服务器处理-响应到本地){}毫秒  ", (self.after_send - self.before_send).to_f64().unwrap() / 1000.0).as_str());
-        // }
-        //
         // if self.after_send != 0 && self.after_network != 0 {
         //     msg.push_str(format!("总共耗时{}毫秒", (self.after_send - self.after_network).to_f64().unwrap() / 1000.0).as_str());
         // }

+ 3 - 1
standard/src/gate_swap.rs

@@ -542,7 +542,7 @@ impl Platform for GateSwap {
         // 下单指令
         order_command.limits_open.extend(order_command.limits_close.clone());
         for item in order_command.limits_open.keys() {
-            let ts = trace_stack.clone();
+            let mut ts = trace_stack.clone();
 
             let amount = Decimal::from_str(&*order_command.limits_open[item].get(0).unwrap().clone()).unwrap();
             let side = order_command.limits_open[item].get(1).unwrap().clone();
@@ -553,9 +553,11 @@ impl Platform for GateSwap {
             let mut self_clone = self.clone();
             TraceStack::show_delay(&ts.ins);
             spawn(async move {
+                ts.on_before_send();
                 let result = self_clone.take_order(&cid, &side, price, amount).await;
                 match result {
                     Ok(mut result) => {
+                        ts.on_after_send();
                         info!("{}", ts.to_string());
 
                         result.trace_stack = ts;

+ 6 - 2
strategy/src/binance_usdt_swap.rs

@@ -57,6 +57,8 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
     }
 
     let mut trace_stack = TraceStack::new(data.time, data.ins);
+    trace_stack.on_after_span_line();
+
     if data.channel == "aggTrade" {
         // let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
         // let name = data.label.clone();
@@ -83,13 +85,15 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
         trace_stack.set_source("binance_usdt_swap.bookTicker".to_string());
         // 将ticker数据转换为模拟深度
         let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(BinanceSwap, &data);
+        trace_stack.on_after_format();
 
-        on_special_depth(core_arc_clone, update_flag_u, &data.label, &trace_stack, &special_depth).await;
+        on_special_depth(core_arc_clone, update_flag_u, &data.label, &mut trace_stack, &special_depth).await;
     } else if data.channel == "depth" {
         trace_stack.set_source("binance_usdt_swap.depth".to_string());
         // 将depth数据转换为模拟深度
         let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSwap, &data);
+        trace_stack.on_after_format();
 
-        on_special_depth(core_arc_clone, update_flag_u, &data.label, &trace_stack, &special_depth).await;
+        on_special_depth(core_arc_clone, update_flag_u, &data.label, &mut trace_stack, &special_depth).await;
     }
 }

+ 3 - 3
strategy/src/core.rs

@@ -606,7 +606,7 @@ impl Core {
     }
 
     // #[instrument(skip(self, depth, name_ref, trace_stack), level="TRACE")]
-    pub async fn on_depth_update(&mut self, depth: &Vec<Decimal>, name_ref: &String, trace_stack: &TraceStack) {
+    pub async fn on_depth_update(&mut self, depth: &Vec<Decimal>, name_ref: &String, trace_stack: &mut TraceStack) {
         // 要从回调传入的深度信息中获取data.name
         let now_time = Utc::now().timestamp_millis();
         if self.market_update_time.contains_key(name_ref) && *self.market_update_time.get(name_ref).unwrap() != 0i64 {
@@ -657,12 +657,12 @@ impl Core {
                                                        &self.ref_price,
                                                        &self.predict,
                                                        &trace_stack.ins);
+                trace_stack.on_after_strategy();
 
                 if orders.is_not_empty() {
                     let mut platform_rest_fb = self.platform_rest.clone_box();
                     // info!("订单指令:{:?}", orders);
-                    let ts = trace_stack.clone();
-                    platform_rest_fb.command_order(&mut orders, &ts).await;
+                    platform_rest_fb.command_order(&mut orders, trace_stack).await;
 
                     // 发了单再更新本地记录。
                     self._update_local_orders(&orders);

+ 3 - 2
strategy/src/exchange_disguise.rs

@@ -90,16 +90,17 @@ pub async fn run_reference_exchange(is_shutdown_arc: Arc<AtomicBool>,
 pub async fn on_special_depth(core_arc: Arc<Mutex<Core>>,
                               update_flag_u: &mut Decimal,
                               label: &String,
-                              trace_stack: &TraceStack,
+                              trace_stack: &mut TraceStack,
                               special_depth: &SpecialDepth) {
     if special_depth.t > *update_flag_u {
         let mut core = core_arc.lock().await;
+        trace_stack.on_after_unlock_core();
 
         core.tickers.insert(label.clone(), special_depth.ticker.clone());
         core.depths.insert(label.clone(), special_depth.depth.clone());
 
         // 触发depth更新
-        core.on_depth_update(&(special_depth.depth), &label, &trace_stack).await;
+        core.on_depth_update(&(special_depth.depth), &label, trace_stack).await;
 
         core.local_depths.insert(special_depth.name.clone(), special_depth.depth.clone());
 

+ 6 - 2
strategy/src/gate_swap.rs

@@ -103,17 +103,21 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
     }
 
     let mut trace_stack = TraceStack::new(data.time, data.ins);
+    trace_stack.on_after_span_line();
+
     if data.channel == "futures.order_book" {
         trace_stack.set_source("gate_usdt_swap.order_book".to_string());
         let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, &data);
+        trace_stack.on_after_format();
 
-        on_special_depth(core_arc_clone, update_flag_u, &data.label, &trace_stack, &special_depth).await;
+        on_special_depth(core_arc_clone, update_flag_u, &data.label, &mut trace_stack, &special_depth).await;
     } else if data.channel == "futures.book_ticker" {
         trace_stack.set_source("gate_usdt_swap.book_ticker".to_string());
         // 将ticker数据转换为模拟深度
         let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(GateSwap, &data);
+        trace_stack.on_after_format();
 
-        on_special_depth(core_arc_clone, update_flag_u, &data.label, &trace_stack, &special_depth).await;
+        on_special_depth(core_arc_clone, update_flag_u, &data.label, &mut trace_stack, &special_depth).await;
     } else if data.channel == "futures.balances" {
         let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, data, run_symbol.clone());
         {