Browse Source

解决了一个先登录后订阅的架构问题。

skyffire 1 năm trước cách đây
mục cha
commit
d49d4719ed

+ 1 - 1
exchanges/src/binance_swap_ws.rs

@@ -189,7 +189,7 @@ impl BinanceSwapWs {
                 info!("binance_usdt_swap socket 连接中……");
                 // ws层重连
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 false, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
                                                  Self::message_text, Self::message_ping, Self::message_pong).await;
 
                 error!("binance_usdt_swap socket 断连,1s以后重连……");

+ 45 - 31
exchanges/src/bitget_swap_ws.rs

@@ -19,6 +19,7 @@ pub enum BitgetSwapWsType {
 #[derive(Clone)]
 pub enum BitgetSwapSubscribeType {
     PuTrade,
+    PuBooks1,
 
     PrAccount,
     PrPosition,
@@ -35,12 +36,12 @@ pub struct BitgetSwapLogin {
 
 #[derive(Clone)]
 pub struct BitgetSwapWs {
-    label: String,                                      // 类型
-    address_url: String,                                // 地址
-    login_param: Option<BitgetSwapLogin>,               // 账号
-    symbol_s: Vec<String>,                              // 币对
-    subscribe_types: Vec<BitgetSwapSubscribeType>,      // 订阅
-    heartbeat_time: u64,                                // 心跳间隔
+    label: String,                                              // 类型
+    address_url: String,                                        // 地址
+    login_param: Option<BitgetSwapLogin>,                       // 账号
+    symbol_s: Vec<String>,                                      // 币对
+    subscribe_types: Vec<BitgetSwapSubscribeType>,              // 订阅
+    heartbeat_time: u64,                                        // 心跳间隔
 }
 
 impl BitgetSwapWs {
@@ -70,7 +71,7 @@ impl BitgetSwapWs {
             login_param,
             symbol_s: vec![],
             subscribe_types: vec![],
-            heartbeat_time: 1000 * 20,
+            heartbeat_time: 1000 * 20
         }
     }
 
@@ -96,6 +97,7 @@ impl BitgetSwapWs {
         for t in self.subscribe_types.clone() {
             if match t {
                 BitgetSwapSubscribeType::PuTrade => false,
+                BitgetSwapSubscribeType::PuBooks1 => false,
 
                 BitgetSwapSubscribeType::PrAccount => true,
                 BitgetSwapSubscribeType::PrOrders => true,
@@ -121,6 +123,13 @@ impl BitgetSwapWs {
                     "instId": symbol,
                 })
             },
+            BitgetSwapSubscribeType::PuBooks1 => {
+                json!({
+                    "instType": "USDT-FUTURES",
+                    "channel": "books1",
+                    "instId": symbol,
+                })
+            },
 
             // 私有订阅
             BitgetSwapSubscribeType::PrAccount => {
@@ -188,14 +197,14 @@ impl BitgetSwapWs {
             let sign = base64::encode(result);
 
             let login_json = json!({
-                              "op": "login",
-                              "args": [{
-                                "apiKey": access_key,
-                                "passphrase": passphrase,
-                                "timestamp": timestamp,
-                                "sign": sign
-                              }]
-                        });
+                "op": "login",
+                "args": [{
+                    "apiKey": access_key,
+                    "passphrase": passphrase,
+                    "timestamp": timestamp,
+                    "sign": sign
+                }]
+            });
 
             info!("---login_json: {0}", login_json.to_string());
             info!("---登陆: {}", login_json.to_string());
@@ -217,11 +226,22 @@ impl BitgetSwapWs {
             Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
     {
         let login_is = self.contains_pr();
-        let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
         let label = self.label.clone();
         let heartbeat_time = self.heartbeat_time.clone();
 
+        // 登录相关
+        if login_is {
+            let login_str = self.log_in_to_str();
+            info!("发起ws登录: {}", login_str);
+            // TODO 这样写不能断线重连,后面再想办法修吧
+            let write_tx_clone2 = Arc::clone(write_tx_am);
+            AbstractWsMode::send_subscribe(write_tx_clone2, Message::Text(login_str)).await;
+        }
+        // 设置订阅
+        let subscription = self.get_subscription();
+        let mut subscribe_array = vec![subscription.to_string()];
+        info!(?subscribe_array);
 
         //心跳-- 方法内部线程启动
         let write_tx_clone1 = Arc::clone(write_tx_am);
@@ -229,26 +249,16 @@ impl BitgetSwapWs {
             AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
         });
 
-        //设置订阅
-        let mut subscribe_array = vec![];
-        if login_is {
-            //登录相关
-            let login_str = self.log_in_to_str();
-            let write_tx_clone2 = Arc::clone(write_tx_am);
-            AbstractWsMode::send_subscribe(write_tx_clone2, Message::Text(login_str)).await;
-            tokio::time::sleep(Duration::from_millis(1000 * 3)).await;
-        }
-        subscribe_array.push(subscription.to_string());
-
         //链接
         let t2 = tokio::spawn(async move {
             let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
 
             loop {
                 info!("bitget_usdt_swap socket 连接中……");
+
                 // ws层重连
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
                                                  Self::message_text, Self::message_ping, Self::message_pong).await;
 
                 error!("bitget_usdt_swap socket 断连,1s以后重连……");
@@ -279,10 +289,10 @@ impl BitgetSwapWs {
     }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData {
-        let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), Value::Null);
+        let mut res_data = ResponseData::new("".to_string(), "200".to_string(), text.clone(), Value::Null);
         let json_value: Value = serde_json::from_str(&text).unwrap();
 
-        if json_value.get("event").is_some() && json_value["event"].as_str() == Option::from("login") {
+        if json_value.get("event").is_some() && json_value["event"].as_str() == Some("login") {
             if json_value.get("code").is_some() && json_value["code"] == 0 {
                 res_data.message = "登陆成功".to_string();
             } else {
@@ -290,11 +300,14 @@ impl BitgetSwapWs {
             }
             res_data.channel = "login".to_string();
             res_data.code = "-200".to_string();
+            res_data.data = json_value;
+
             res_data
-        } else if json_value.get("event").is_some() && json_value["event"].as_str() == Option::from("subscribe") {
+        } else if json_value.get("event").is_some() && json_value["event"].as_str() == Some("subscribe") {
             res_data.code = "-201".to_string();
             res_data.data = json_value.clone();
             res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
+            res_data.message = "success".to_string();
             res_data
         } else if json_value.get("action").is_some() {
             res_data.data = json_value["data"].clone();
@@ -303,6 +316,7 @@ impl BitgetSwapWs {
             } else {
                 res_data.code = "200".to_string();
             }
+            res_data.message = "success".to_string();
             res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
             res_data.reach_time = json_value["ts"].as_i64().unwrap() * 1000;
             res_data

+ 1 - 1
exchanges/src/bybit_swap_ws.rs

@@ -249,7 +249,7 @@ impl BybitSwapWs {
 
                 // ws网络层重连
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 label.clone(), subscribe_array, write_to_socket_rx_arc.clone(),
+                                                 false, label.clone(), subscribe_array, write_to_socket_rx_arc.clone(),
                                                  Self::message_text, Self::message_ping, Self::message_pong).await;
 
                 error!("bybit_usdt_swap socket 断连,1s以后重连……");

+ 1 - 1
exchanges/src/gate_swap_ws.rs

@@ -301,7 +301,7 @@ impl GateSwapWs {
                 info!("gate_usdt_swap socket 连接中……");
 
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 false, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
                                                  Self::message_text, Self::message_ping, Self::message_pong).await;
 
                 error!("gate_usdt_swap socket 断连,1s以后重连……");

+ 16 - 6
exchanges/src/socket_tool.rs

@@ -31,6 +31,7 @@ pub struct AbstractWsMode {}
 
 impl AbstractWsMode {
     pub async fn ws_connected<T, PI, PO, F, Future>(write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
+                                                    is_first_login: bool,
                                                     label: String,
                                                     is_shutdown_arc: Arc<AtomicBool>,
                                                     handle_function: &F,
@@ -60,11 +61,13 @@ impl AbstractWsMode {
             Ok::<(), Error>(())
         };
 
-        // 订阅消息
-        info!("订阅内容:{:?}", subscribe_array.clone());
-        for s in &subscribe_array {
-            let mut write_lock = ws_write_arc.lock().await;
-            write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
+        // 如果不需要事先登录,则直接订阅消息
+        if !is_first_login {
+            info!("订阅内容:{:?}", subscribe_array.clone());
+            for s in &subscribe_array {
+                let mut write_lock = ws_write_arc.lock().await;
+                write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
+            }
         }
 
         let ws_write_inner = Arc::clone(&ws_write_arc);
@@ -100,7 +103,12 @@ impl AbstractWsMode {
                     match code.as_str() {
                         "-200" => {
                             //登录成功
-                            trace!("登录成功:{:?}", data);
+                            info!("ws登录成功:{:?}", data);
+                            info!("订阅内容:{:?}", subscribe_array.clone());
+                            for s in &subscribe_array {
+                                let mut write_lock = ws_write_arc.lock().await;
+                                write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
+                            }
                         }
                         "-201" => {
                             //订阅成功
@@ -149,6 +157,7 @@ impl AbstractWsMode {
     pub async fn ws_connect_async<T, PI, PO, F, Future>(is_shutdown_arc: Arc<AtomicBool>,
                                                         handle_function: F,
                                                         address_url: String,
+                                                        is_first_login: bool,
                                                         label: String,
                                                         subscribe_array: Vec<String>,
                                                         write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
@@ -179,6 +188,7 @@ impl AbstractWsMode {
                 info!("socket 链接成功,{}。", address_url);
 
                 Self::ws_connected(write_to_socket_rx_arc,
+                                   is_first_login,
                                    label,
                                    is_shutdown_arc,
                                    &handle_function,

+ 93 - 108
standard/src/bitget_swap_handle.rs

@@ -1,113 +1,98 @@
-// use std::str::FromStr;
-// use rust_decimal::Decimal;
-// use rust_decimal_macros::dec;
-// use serde_json::json;
-// use tokio::time::Instant;
-// use tracing::trace;
-// use exchanges::response_base::ResponseData;
-// use global::trace_stack::TraceStack;
-// use crate::{Account, MarketOrder, Order, SpecialDepth, SpecialOrder, SpecialTicker};
-// use crate::exchange::ExchangeEnum;
-// use crate::handle_info::HandleSwapInfo;
-//
-// // 处理账号信息
-// pub fn handle_account_info(res_data: ResponseData, symbol: String) -> Account {
-//     let symbol_upper = symbol.to_uppercase();
-//     let symbol_array: Vec<&str> = symbol_upper.split("_").collect();
-//     let res_data_str = res_data.data;
-//     let res_data_json: Vec<serde_json::Value> = serde_json::from_str(&res_data_str).unwrap();
-//     let balance_info_default = json!({"available":"0","coin": symbol_array[1],"frozen":"0","limitAvailable":"0","locked":"0","uTime":"0"});
-//     let balance_info = res_data_json.iter().find(|&item| item["coin"].as_str().unwrap() == symbol_array[1]).unwrap_or(&balance_info_default);
-//     let stocks_info_default = json!({"available":"0","coin": symbol_array[0],"frozen":"0","limitAvailable":"0","locked":"0","uTime":"0"});
-//     let stocks_info = res_data_json.iter().find(|&item| item["coin"].as_str().unwrap() == symbol_array[0]).unwrap_or(&stocks_info_default);
-//     format_account_info(balance_info.clone(), stocks_info.clone())
-// }
-//
-// pub fn format_account_info(balance_data: serde_json::Value, stocks_data: serde_json::Value) -> Account {
-//     let balance_coin = balance_data["coin"].as_str().unwrap().to_string().to_uppercase();
-//     let available_balance = Decimal::from_str(balance_data["available"].as_str().unwrap()).unwrap();
-//     let frozen_balance = Decimal::from_str(balance_data["frozen"].as_str().unwrap()).unwrap();
-//     let balance = available_balance + frozen_balance;
-//
-//     let stocks_coin = stocks_data["coin"].as_str().unwrap().to_string().to_uppercase();
-//     let available_stocks = Decimal::from_str(stocks_data["available"].as_str().unwrap()).unwrap();
-//     let frozen_stocks = Decimal::from_str(stocks_data["frozen"].as_str().unwrap()).unwrap();
-//     let stocks = available_stocks + frozen_stocks;
-//
-//     Account {
-//         coin: format!("{}_{}", stocks_coin, balance_coin),
-//         balance,
-//         available_balance,
-//         frozen_balance,
-//         stocks,
-//         available_stocks,
-//         frozen_stocks,
-//     }
-// }
-//
-// // 处理order信息
-// pub fn handle_order(res_data: ResponseData, ct_val: Decimal) -> SpecialOrder {
-//     let res_data_str = res_data.data;
-//     let res_data_json: Vec<serde_json::Value> = serde_json::from_str(&*res_data_str).unwrap();
-//     let mut order_info = Vec::new();
-//     for item in res_data_json.iter() {
-//         order_info.push(format_order_item(item.clone(), ct_val));
-//     }
-//     trace!(?order_info);
-//     SpecialOrder {
-//         name: res_data.label,
-//         order: order_info,
-//     }
-// }
-//
-// // 处理订单信息
-// pub fn format_order_item(order: serde_json::Value, ct_val: Decimal) -> Order {
-//     let price = Decimal::from_str(order["price"].as_str().unwrap_or(order["priceAvg"].as_str().unwrap())).unwrap();
-//     let size = Decimal::from_str(order["size"].as_str().unwrap()).unwrap();
-//     let status = order["status"].as_str().unwrap_or("");
-//     let acc_base_volume = Decimal::from_str(order["accBaseVolume"].as_str().unwrap()).unwrap();
-//
-//     let avg_price = Decimal::from_str(order["priceAvg"].as_str().unwrap()).unwrap();
-//
-//     let amount = size * ct_val;
-//     let deal_amount = acc_base_volume * ct_val;
-//     let custom_status = if ["filled", "cancelled"].contains(&status) {
-//         "REMOVE".to_string()
-//     } else if ["init", "live", "new", "partially_filled"].contains(&status) {
-//         "NEW".to_string()
-//     } else {
-//         "NULL".to_string()
-//     };
-//     Order {
-//         id: order["orderId"].as_str().unwrap().to_string(),
-//         custom_id: order["clientOid"].as_str().unwrap().to_string(),
-//         price,
-//         amount,
-//         deal_amount,
-//         avg_price,
-//         status: custom_status,
-//         order_type: order["orderType"].as_str().unwrap().to_string(),
-//         trace_stack: TraceStack::new(0, Instant::now()).on_special("89 bitget_spot_handle".to_string()),
-//     }
-// }
-//
-// // 处理特殊深度数据
+use std::str::FromStr;
+use rust_decimal::Decimal;
+use tokio::time::Instant;
+use exchanges::response_base::ResponseData;
+use global::trace_stack::TraceStack;
+use crate::{Account, MarketOrder, Order, SpecialOrder};
+
+// 处理账号信息
+pub fn handle_account_info(response: &ResponseData, _symbol: &String) -> Account {
+    let mut rst = Account::new();
+
+    for data in response.data.as_array().unwrap() {
+        if data["marginCoin"].as_str().unwrap() != "USDT" {
+            continue
+        }
+
+        // 格式化account信息
+        let mut account = Account {
+            coin: data["marginCoin"].to_string(),
+            balance: Decimal::from_str(data["accountEquity"].as_str().unwrap()).unwrap(),
+            available_balance: Decimal::from_str(data["available"].as_str().unwrap()).unwrap(),
+            frozen_balance: Default::default(),
+            stocks: Default::default(),
+            available_stocks: Default::default(),
+            frozen_stocks: Default::default(),
+        };
+        account.frozen_balance = account.balance - account.available_balance;
+
+        rst = account
+    }
+
+    return rst;
+}
+
+// 处理order信息
+pub fn handle_order(res_data: ResponseData, ct_val: Decimal) -> SpecialOrder {
+    let res_data_json = res_data.data.as_array().unwrap();
+    let mut order_info = Vec::new();
+    for item in res_data_json.iter() {
+        order_info.push(format_order_item(item.clone(), ct_val));
+    }
+    SpecialOrder {
+        name: res_data.label,
+        order: order_info,
+    }
+}
+
+// 处理订单信息
+pub fn format_order_item(order: serde_json::Value, ct_val: Decimal) -> Order {
+    let price = Decimal::from_str(order["price"].as_str().unwrap_or(order["priceAvg"].as_str().unwrap())).unwrap();
+    let size = Decimal::from_str(order["size"].as_str().unwrap()).unwrap();
+    let status = order["status"].as_str().unwrap_or("");
+    let acc_base_volume = Decimal::from_str(order["accBaseVolume"].as_str().unwrap()).unwrap();
+
+    let avg_price = Decimal::from_str(order["priceAvg"].as_str().unwrap()).unwrap();
+
+    let amount = size * ct_val;
+    let deal_amount = acc_base_volume * ct_val;
+    let custom_status = if ["filled", "cancelled"].contains(&status) {
+        "REMOVE".to_string()
+    } else if ["init", "live", "new", "partially_filled"].contains(&status) {
+        "NEW".to_string()
+    } else {
+        "NULL".to_string()
+    };
+    Order {
+        id: order["orderId"].as_str().unwrap().to_string(),
+        custom_id: order["clientOid"].as_str().unwrap().to_string(),
+        price,
+        amount,
+        deal_amount,
+        avg_price,
+        status: custom_status,
+        order_type: order["orderType"].as_str().unwrap().to_string(),
+        trace_stack: TraceStack::new(0, Instant::now()).on_special("84 bitget_swap_handle".to_string()),
+    }
+}
+
+// 格式化深度信息
+pub fn format_depth_items(value: serde_json::Value) -> Vec<MarketOrder> {
+    let mut depth_items: Vec<MarketOrder> = vec![];
+    for value in value.as_array().unwrap() {
+        depth_items.push(MarketOrder {
+            price: Decimal::from_str(value[0].as_str().unwrap()).unwrap(),
+            amount: Decimal::from_str(value[1].as_str().unwrap()).unwrap(),
+        })
+    }
+    return depth_items;
+}
+
+// 处理特殊深度数据
 // pub fn handle_special_depth(res_data: ResponseData) -> SpecialDepth {
-//     HandleSwapInfo::handle_special_depth(ExchangeEnum::BitgetSpot, res_data)
-// }
-//
-// // 格式化深度信息
-// pub fn format_depth_items(value: serde_json::Value) -> Vec<MarketOrder> {
-//     let mut depth_items: Vec<MarketOrder> = vec![];
-//     for value in value.as_array().unwrap() {
-//         depth_items.push(MarketOrder {
-//             price: Decimal::from_str(value[0].as_str().unwrap()).unwrap(),
-//             amount: Decimal::from_str(value[1].as_str().unwrap()).unwrap(),
-//         })
-//     }
-//     return depth_items;
+//     HandleSwapInfo::handle_special_depth(ExchangeEnum::BitgetSwap, res_data)
 // }
-//
+
 // // 处理特殊Ticker信息
 // pub fn handle_special_ticker(res_data: ResponseData) -> SpecialDepth {
 //     let res_data_str = res_data.data;

+ 10 - 13
standard/src/handle_info.rs

@@ -44,6 +44,9 @@ impl HandleSwapInfo {
             // ExchangeEnum::BitgetSpot => {
             //     bitget_spot_handle::handle_account_info(res_data, symbol)
             // },
+            ExchangeEnum::BitgetSwap => {
+                bitget_swap_handle::handle_account_info(res_data, symbol)
+            },
             ExchangeEnum::BybitSwap => {
                 bybit_swap_handle::handle_account_info(res_data, symbol)
             }
@@ -53,7 +56,7 @@ impl HandleSwapInfo {
             }
         }
     }
-    // 处理特殊Ticket信息
+    // 处理Ticker信息
     pub fn handle_book_ticker(exchange: ExchangeEnum, res_data: &ResponseData) -> SpecialDepth {
         match exchange {
             // ExchangeEnum::BinanceSpot => {
@@ -79,7 +82,7 @@ impl HandleSwapInfo {
             // },
             ExchangeEnum::BitgetSwap => {
                 info!(?res_data);
-                panic!("未实现格式化");
+                panic!("BitgetSwap 85 未实现格式化");
                 // bitget_swap_handle::handle_special_ticker(res_data)
             },
             ExchangeEnum::BybitSwap => {
@@ -164,7 +167,7 @@ impl HandleSwapInfo {
 }
 
 
-pub fn make_special_depth(label: String, depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>, t: Decimal, create_at: i64) -> SpecialDepth{
+pub fn make_special_depth(label: String, depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>, t: Decimal, create_at: i64) -> SpecialDepth {
     depth_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap_or(Ordering::Equal));
     depth_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap_or(Ordering::Equal));
     // TODO 不排序的话,有4us可以省下来。
@@ -279,14 +282,10 @@ pub fn format_depth(exchange: ExchangeEnum, res_data: &ResponseData) -> DepthPar
         //     create_at = res_data_json[0]["ts"].as_str().unwrap().parse::<i64>().unwrap() * 1000;
         // }
         ExchangeEnum::BitgetSwap => {
-            depth_asks = vec![];
-            depth_bids = vec![];
-            t = Decimal::ZERO;
-            create_at = 0;
-            // depth_asks = bitget_spot_handle::format_depth_items(res_data_json[0]["asks"].clone());
-            // depth_bids = bitget_spot_handle::format_depth_items(res_data_json[0]["bids"].clone());
-            // t = Decimal::from_str(res_data_json[0]["ts"].as_str().unwrap()).unwrap();
-            // create_at = res_data_json[0]["ts"].as_str().unwrap().parse::<i64>().unwrap() * 1000;
+            depth_asks = bitget_swap_handle::format_depth_items(res_data.data[0]["asks"].clone());
+            depth_bids = bitget_swap_handle::format_depth_items(res_data.data[0]["bids"].clone());
+            t = Decimal::from_str(res_data.data[0]["ts"].as_str().unwrap()).unwrap();
+            create_at = res_data.data[0]["ts"].as_str().unwrap().parse::<i64>().unwrap() * 1000;
         }
         ExchangeEnum::BybitSwap => {
             depth_asks = bybit_swap_handle::format_depth_items(res_data.data["a"].clone());
@@ -303,5 +302,3 @@ pub fn format_depth(exchange: ExchangeEnum, res_data: &ResponseData) -> DepthPar
         create_at
     }
 }
-
-

+ 78 - 39
strategy/src/bitget_usdt_swap.rs

@@ -7,7 +7,10 @@ use tokio::sync::Mutex;
 use tracing::info;
 use exchanges::bitget_swap_ws::{BitgetSwapLogin, BitgetSwapSubscribeType, BitgetSwapWs, BitgetSwapWsType};
 use exchanges::response_base::ResponseData;
+use global::trace_stack::TraceStack;
+use standard::exchange::ExchangeEnum::{BitgetSwap, BybitSwap, GateSwap};
 use crate::core::Core;
+use crate::exchange_disguise::on_special_depth;
 
 pub async fn bitget_usdt_swap_run(is_shutdown_arc :Arc<AtomicBool>,
                                   is_trade: bool,
@@ -16,37 +19,37 @@ pub async fn bitget_usdt_swap_run(is_shutdown_arc :Arc<AtomicBool>,
                                   symbols: Vec<String>,
                                   is_colo: bool,
                                   exchange_params: BTreeMap<String, String>) {
-    // 开启公共频道
-    let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
-
-    // 开启公共连接
-    let is_shutdown_arc_c1 = is_shutdown_arc.clone();
-    let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
-    let name_clone = name.clone();
-    let core_arc_clone = core_arc.clone();
-    let symbols_clone = symbols.clone();
-    spawn(async move {
-        // 构建链接ws
-        let mut bg_public = BitgetSwapWs::new_label(name_clone.clone(),
-                                                    is_colo,
-                                                    None,
-                                                    BitgetSwapWsType::Public);
-
-        // 消费数据的函数
-        let mut update_flag_u = Decimal::ZERO;
-        let fun = move |data: ResponseData| {
-            let core_arc_cc = core_arc_clone.clone();
-
-            async move {
-                on_public_data(core_arc_cc, &mut update_flag_u, data).await
-            }
-        };
-
-        // 准备链接
-        bg_public.set_subscribe(vec![BitgetSwapSubscribeType::PuTrade]); // 只用订阅订单流数据
-        bg_public.set_symbols(symbols_clone);
-        bg_public.ws_connect_async(is_shutdown_arc_c1, fun, &write_tx_am_public, write_rx_public).await.expect("bitget_usdt_swap 链接有异常")
-    });
+    // // 开启公共频道
+    // let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
+    //
+    // // 开启公共连接
+    // let is_shutdown_arc_c1 = is_shutdown_arc.clone();
+    // let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
+    // let name_clone = name.clone();
+    // let core_arc_clone = core_arc.clone();
+    // let symbols_clone = symbols.clone();
+    // spawn(async move {
+    //     // 构建链接ws
+    //     let mut bg_public = BitgetSwapWs::new_label(name_clone.clone(),
+    //                                                 is_colo,
+    //                                                 None,
+    //                                                 BitgetSwapWsType::Public);
+    //
+    //     // 消费数据的函数
+    //     let mut update_flag_u = Decimal::ZERO;
+    //     let fun = move |data: ResponseData| {
+    //         let core_arc_cc = core_arc_clone.clone();
+    //
+    //         async move {
+    //             on_public_data(core_arc_cc, &mut update_flag_u, data).await
+    //         }
+    //     };
+    //
+    //     // 准备链接
+    //     bg_public.set_subscribe(vec![BitgetSwapSubscribeType::PuBooks1]); // 只用订阅深度数据
+    //     bg_public.set_symbols(symbols_clone);
+    //     bg_public.ws_connect_async(is_shutdown_arc_c1, fun, &write_tx_am_public, write_rx_public).await.expect("bitget_usdt_swap 链接有异常")
+    // });
 
     // 不需要交易就不用开启私有频道了
     if !is_trade {
@@ -68,12 +71,14 @@ pub async fn bitget_usdt_swap_run(is_shutdown_arc :Arc<AtomicBool>,
 
         // 消费数据的函数
         let core_arc_clone = core_arc.clone();
+        let run_symbol = symbols[0].clone();
         let ct_val = core_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
         let fun = move |data: ResponseData| {
             let core_arc_cc = core_arc_clone.clone();
+            let run_symbol_c = run_symbol.clone();
 
             async move {
-                on_private_data(core_arc_cc, ct_val, data).await
+                on_private_data(core_arc_cc, ct_val, data, &run_symbol_c).await
             }
         };
 
@@ -88,16 +93,50 @@ pub async fn bitget_usdt_swap_run(is_shutdown_arc :Arc<AtomicBool>,
     });
 }
 
-async fn on_private_data(_core_arc_clone: Arc<Mutex<Core>>,
-                         _ct_val: Decimal,
-                         response: ResponseData) {
-    info!(?response)
+async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>,
+                         ct_val: Decimal,
+                         response: ResponseData,
+                         run_symbol: &String) {
+    let mut trace_stack = TraceStack::new(response.time, response.ins);
+    trace_stack.on_after_span_line();
+    info!(?response);
+
+    // public类型,目前只考虑订单流数据
+    // match response.channel.as_str() {
+    //     "account" => {
+    //         info!(?response);
+    //         let account = standard::handle_info::HandleSwapInfo::handle_account_info(BybitSwap, &response, run_symbol);
+    //         info!(?account);
+    //         let mut core = core_arc_clone.lock().await;
+    //         core.update_equity(account).await;
+    //     }
+    //     _ => {
+    //         info!("bitget_usdt_swap 113 未知的订阅数据");
+    //         info!(?response)
+    //     }
+    // }
 }
 
-async fn on_public_data(_core_arc_clone: Arc<Mutex<Core>>,
-                        _update_flag_u: &mut Decimal,
+async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>,
+                        update_flag_u: &mut Decimal,
                         response: ResponseData) {
-    info!(?response)
+    let mut trace_stack = TraceStack::new(response.time, response.ins);
+    trace_stack.on_after_span_line();
+
+    // public类型,目前只考虑订单流数据
+    match response.channel.as_str() {
+        "books1" => {
+            trace_stack.set_source("bitget_usdt_swap.books1".to_string());
+            let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BitgetSwap, &response);
+            trace_stack.on_after_format();
+
+            // on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
+        }
+        _ => {
+            info!("bitget_usdt_swap 125 未知的订阅数据");
+            info!(?response)
+        }
+    }
 }
 
 fn parse_btree_map_to_bitget_swap_login(exchange_params: BTreeMap<String, String>) -> BitgetSwapLogin {