Browse Source

能够打印一些特殊通道(未解析的)

skyffire 1 year ago
parent
commit
98865bbd6b

+ 8 - 8
standard/src/bybit_swap_handle.rs

@@ -4,7 +4,7 @@ use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
 use rust_decimal_macros::dec;
 use serde_json::{from_value, Value};
 use tokio::time::Instant;
-use tracing::{debug, error};
+use tracing::{error};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use crate::{Account, MarketOrder, Order, Position, PositionModeEnum, SpecialDepth, SpecialOrder, SpecialTicker};
@@ -94,7 +94,7 @@ pub fn format_position_item(position: &Value, ct_val: &Decimal) -> Position {
 
 // 处理order信息
 pub fn handle_order(res_data: ResponseData, ct_val: Decimal) -> SpecialOrder {
-    let res_data_json: Vec<serde_json::Value> = res_data.data.as_array().unwrap().clone();
+    let res_data_json: Vec<Value> = res_data.data.as_array().unwrap().clone();
     let mut order_info = Vec::new();
     for item in res_data_json.iter() {
         order_info.push(format_order_item(item.clone(), ct_val));
@@ -106,9 +106,7 @@ pub fn handle_order(res_data: ResponseData, ct_val: Decimal) -> SpecialOrder {
     }
 }
 
-pub fn format_order_item(order: serde_json::Value, ct_val: Decimal) -> Order {
-    debug!("format-order-start, bybit_handle");
-    debug!(?order);
+pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
     let status = order["orderStatus"].as_str().unwrap_or("");
     let text = order["orderLinkId"].as_str().unwrap_or("");
     let size = Decimal::from_str(order["qty"].as_str().unwrap()).unwrap();
@@ -121,7 +119,11 @@ pub fn format_order_item(order: serde_json::Value, ct_val: Decimal) -> Order {
         avg_price = right_val / right;
     }
     let deal_amount = right * ct_val;
-    let custom_status = if status == "Filled" || status == "Cancelled" { "REMOVE".to_string() } else if status == "New" { "NEW".to_string() } else {
+    let custom_status = if status == "Filled" || status == "Cancelled" {
+        "REMOVE".to_string()
+    } else if status == "New" {
+        "NEW".to_string()
+    } else {
        "NULL".to_string()
     };
     let rst_order = Order {
@@ -136,8 +138,6 @@ pub fn format_order_item(order: serde_json::Value, ct_val: Decimal) -> Order {
         trace_stack: TraceStack::new(0, Instant::now()).on_special("120 bybit_handle".to_string()),
     };
 
-    debug!(?rst_order);
-    debug!("format-order-end, bybit_handle");
     return rst_order;
 }
 

+ 1 - 1
standard/src/gate_swap.rs

@@ -10,7 +10,7 @@ use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
 use serde_json::json;
 use tokio::spawn;
 use tokio::time::Instant;
-use tracing::{error, debug, trace, info};
+use tracing::{error, debug, trace};
 use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, PositionModeEnum};
 use exchanges::gate_swap_rest::GateSwapRest;
 use global::trace_stack::TraceStack;

+ 12 - 12
strategy/src/binance_usdt_swap.rs

@@ -4,6 +4,7 @@ use std::sync::atomic::AtomicBool;
 use rust_decimal::Decimal;
 use tokio::sync::Mutex;
 use tokio_tungstenite::tungstenite::Message;
+use tracing::error;
 use exchanges::response_base::ResponseData;
 use global::trace_stack::{TraceStack};
 use standard::exchange::ExchangeEnum::BinanceSwap;
@@ -51,15 +52,11 @@ pub(crate) async fn reference_binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
 
 async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
                  update_flag_u: &mut Decimal,
-                 data: ResponseData) {
-    if data.code.as_str() != "200" {
-        return;
-    }
-
-    let mut trace_stack = TraceStack::new(data.time, data.ins);
+                 response: ResponseData) {
+    let mut trace_stack = TraceStack::new(response.time, response.ins);
     trace_stack.on_after_span_line();
 
-    match data.channel.as_str() {
+    match response.channel.as_str() {
         "aggTrade" => {
             // let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
             // let name = data.label.clone();
@@ -86,19 +83,22 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
         "bookTicker" => {
             trace_stack.set_source("binance_usdt_swap.bookTicker".to_string());
             // 将ticker数据转换为模拟深度
-            let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(BinanceSwap, &data);
+            let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(BinanceSwap, &response);
             trace_stack.on_after_format();
 
-            on_special_depth(core_arc_clone, update_flag_u, &data.label, &mut trace_stack, &special_depth).await;
+            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
         }
         "depth" => {
             trace_stack.set_source("binance_usdt_swap.depth".to_string());
             // 将depth数据转换为模拟深度
-            let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSwap, &data);
+            let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSwap, &response);
             trace_stack.on_after_format();
 
-            on_special_depth(core_arc_clone, update_flag_u, &data.label, &mut trace_stack, &special_depth).await;
+            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
+        }
+        _ => {
+            error!("未知推送类型");
+            error!(?response);
         }
-        _ => {}
     }
 }

+ 0 - 5
strategy/src/bybit_usdt_swap.rs

@@ -114,11 +114,6 @@ pub async fn bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
 async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, run_symbol: &String, response: ResponseData) {
     let mut trace_stack = TraceStack::new(response.time, response.ins);
     trace_stack.on_after_span_line();
-
-    if response.code.as_str() != "200" {
-        return;
-    }
-
     match response.channel.as_str() {
         "wallet" => {
             let account = standard::handle_info::HandleSwapInfo::handle_account_info(BybitSwap, &response, run_symbol);

+ 1 - 1
strategy/src/core.rs

@@ -59,7 +59,7 @@ pub struct Core {
     pub local_coin: Decimal,
     // 仓位信息
     pub local_position: LocalPosition,
-    // 仓位信息-自订单
+    // 仓位信息-自订单
     pub local_position_by_orders: LocalPosition,
     //
     pub local_buy_amount: Decimal,

+ 15 - 16
strategy/src/gate_swap.rs

@@ -1,4 +1,4 @@
-use tracing::info;
+use tracing::{error, info};
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
@@ -97,38 +97,34 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
                  update_flag_u: &mut Decimal,
                  multiplier: &Decimal,
                  run_symbol: &String,
-                 data: ResponseData) {
-    if data.code.as_str() != "200" {
-        return;
-    }
-
-    let mut trace_stack = TraceStack::new(data.time, data.ins);
+                 response: ResponseData) {
+    let mut trace_stack = TraceStack::new(response.time, response.ins);
     trace_stack.on_after_span_line();
 
-    match data.channel.as_str() {
+    match response.channel.as_str() {
         "futures.order_book" => {
             trace_stack.set_source("gate_usdt_swap.order_book".to_string());
-            let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, &data);
+            let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, &response);
             trace_stack.on_after_format();
 
-            on_special_depth(core_arc_clone, update_flag_u, &data.label, &mut trace_stack, &special_depth).await;
+            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
         }
         "futures.book_ticker" => {
             trace_stack.set_source("gate_usdt_swap.book_ticker".to_string());
             // 将ticker数据转换为模拟深度
-            let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(GateSwap, &data);
+            let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(GateSwap, &response);
             trace_stack.on_after_format();
 
-            on_special_depth(core_arc_clone, update_flag_u, &data.label, &mut trace_stack, &special_depth).await;
+            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
         }
         "futures.balances" => {
-            let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, &data, run_symbol);
+            let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, &response, run_symbol);
             let mut core = core_arc_clone.lock().await;
             core.update_equity(account).await;
         }
         "futures.orders" => {
             trace_stack.set_source("gate_swap.orders".to_string());
-            let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, data.clone(), multiplier.clone());
+            let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, response.clone(), multiplier.clone());
 
             let mut order_infos:Vec<OrderInfo> = Vec::new();
             for order in orders.order {
@@ -156,7 +152,7 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
             }
         }
         "futures.positions" => {
-            let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap, &data, multiplier);
+            let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap, &response, multiplier);
             let mut core = core_arc_clone.lock().await;
             core.update_position(positions).await;
         }
@@ -179,7 +175,10 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
             // }
             // core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
         }
-        _ => {}
+        _ => {
+            error!("未知推送类型");
+            error!(?response);
+        }
     }
 }
 

+ 2 - 2
strategy/src/strategy.rs

@@ -1271,8 +1271,8 @@ impl Strategy {
 
         // 下单指令处理逻辑
         self._cancel_open(&mut command, local_orders);              // 撤单命令处理
-        self._post_close(&mut command, local_orders);           // 平仓单命令处理
-        self._post_open(&mut command, local_orders);        // 限价单命令处理
+        self._post_close(&mut command, local_orders);               // 平仓单命令处理
+        self._post_open(&mut command, local_orders);                // 限价单命令处理
 
         self._check_local_orders(&mut command, local_orders);       // 固定时间检查超时订单
         self._update_in_cancel(&mut command, local_orders);         // 更新撤单队列,是一个filter