Bladeren bron

最后一个方法,command order。

skyffire 1 jaar geleden
bovenliggende
commit
ce9ba45190
4 gewijzigde bestanden met toevoegingen van 150 en 68 verwijderingen
  1. 1 1
      exchanges/src/bitget_swap_ws.rs
  2. 76 14
      standard/src/bitget_swap_handle.rs
  3. 2 5
      standard/src/handle_info.rs
  4. 71 48
      strategy/src/bitget_usdt_swap.rs

+ 1 - 1
exchanges/src/bitget_swap_ws.rs

@@ -240,7 +240,7 @@ impl BitgetSwapWs {
         }
         // 设置订阅
         let subscription = self.get_subscription();
-        let mut subscribe_array = vec![subscription.to_string()];
+        let subscribe_array = vec![subscription.to_string()];
         info!(?subscribe_array);
 
         //心跳-- 方法内部线程启动

+ 76 - 14
standard/src/bitget_swap_handle.rs

@@ -1,9 +1,12 @@
 use std::str::FromStr;
 use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
+use serde_json::Value;
 use tokio::time::Instant;
+use tracing::info;
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
-use crate::{Account, MarketOrder, Order, SpecialOrder};
+use crate::{Account, MarketOrder, Order, Position, PositionModeEnum, SpecialOrder};
 
 // 处理账号信息
 pub fn handle_account_info(response: &ResponseData, _symbol: &String) -> Account {
@@ -17,9 +20,9 @@ pub fn handle_account_info(response: &ResponseData, _symbol: &String) -> Account
         // 格式化account信息
         let mut account = Account {
             coin: data["marginCoin"].to_string(),
-            balance: Decimal::from_str(data["accountEquity"].as_str().unwrap()).unwrap(),
+            balance: Decimal::from_str(data["usdtEquity"].as_str().unwrap()).unwrap(),
             available_balance: Decimal::from_str(data["available"].as_str().unwrap()).unwrap(),
-            frozen_balance: Default::default(),
+            frozen_balance: Decimal::from_str(data["frozen"].as_str().unwrap()).unwrap(),
             stocks: Default::default(),
             available_stocks: Default::default(),
             frozen_stocks: Default::default(),
@@ -46,17 +49,27 @@ pub fn handle_order(res_data: ResponseData, ct_val: Decimal) -> SpecialOrder {
 }
 
 // 处理订单信息
-pub fn format_order_item(order: serde_json::Value, ct_val: Decimal) -> Order {
-    let price = Decimal::from_str(order["price"].as_str().unwrap_or(order["priceAvg"].as_str().unwrap())).unwrap();
-    let size = Decimal::from_str(order["size"].as_str().unwrap()).unwrap();
-    let status = order["status"].as_str().unwrap_or("");
-    let acc_base_volume = Decimal::from_str(order["accBaseVolume"].as_str().unwrap()).unwrap();
-
-    let avg_price = Decimal::from_str(order["priceAvg"].as_str().unwrap()).unwrap();
+pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
+    info!(?order);
+    let price = Decimal::from_str(order["price"].as_str().unwrap().to_string().as_str()).unwrap();
+    let size = Decimal::from_str(order["size"].as_str().unwrap().to_string().as_str()).unwrap();
+    let binding = order["status"].clone().as_str().unwrap().to_string();
+    let status = binding.as_str();
+    let acc_base_volume = Decimal::from_str(order["accBaseVolume"].as_str().unwrap().to_string().as_str()).unwrap();
+    let avg_price = if order["priceAvg"].is_null() {
+        Decimal::ZERO
+    } else {
+        Decimal::from_str(order["priceAvg"].as_str().unwrap().to_string().as_str()).unwrap()
+    };
+    let c_id = if order["clientOid"].is_null() {
+        ""
+    } else {
+        order["clientOid"].as_str().unwrap()
+    };
 
     let amount = size * ct_val;
     let deal_amount = acc_base_volume * ct_val;
-    let custom_status = if ["filled", "cancelled"].contains(&status) {
+    let custom_status = if ["filled", "canceled"].contains(&status) {
         "REMOVE".to_string()
     } else if ["init", "live", "new", "partially_filled"].contains(&status) {
         "NEW".to_string()
@@ -65,19 +78,19 @@ pub fn format_order_item(order: serde_json::Value, ct_val: Decimal) -> Order {
     };
     Order {
         id: order["orderId"].as_str().unwrap().to_string(),
-        custom_id: order["clientOid"].as_str().unwrap().to_string(),
+        custom_id: c_id.to_string(),
         price,
         amount,
         deal_amount,
         avg_price,
         status: custom_status,
         order_type: order["orderType"].as_str().unwrap().to_string(),
-        trace_stack: TraceStack::new(0, Instant::now()).on_special("84 bitget_swap_handle".to_string()),
+        trace_stack: TraceStack::new(0, Instant::now()).on_special("86 bitget_swap_handle".to_string()),
     }
 }
 
 // 格式化深度信息
-pub fn format_depth_items(value: serde_json::Value) -> Vec<MarketOrder> {
+pub fn format_depth_items(value: Value) -> Vec<MarketOrder> {
     let mut depth_items: Vec<MarketOrder> = vec![];
     for value in value.as_array().unwrap() {
         depth_items.push(MarketOrder {
@@ -88,6 +101,55 @@ pub fn format_depth_items(value: serde_json::Value) -> Vec<MarketOrder> {
     return depth_items;
 }
 
+// 处理position信息
+pub fn handle_position(res_data: &ResponseData, ct_val: &Decimal) -> Vec<Position> {
+    let res_data_json = res_data.data.as_array().unwrap();
+    res_data_json.iter().map(|item| { format_position_item(item, ct_val) }).collect()
+}
+
+pub fn format_position_item(position_json: &Value, ct_val: &Decimal) -> Position {
+    let symbol = position_json["instId"].as_str().unwrap().to_string();
+    let margin_level = Decimal::from_i64(position_json["leverage"].as_i64().unwrap()).unwrap();
+    let amount = Decimal::from_str(position_json["total"].as_str().unwrap()).unwrap() * ct_val;
+    let frozen_amount = Decimal::from_str(position_json["frozen"].as_str().unwrap()).unwrap() * ct_val;
+    let price = Decimal::from_str(position_json["openPriceAvg"].as_str().unwrap()).unwrap();
+    let profit = Decimal::from_str(position_json["unrealizedPL"].as_str().unwrap()).unwrap();
+    let position_mode = match position_json["posMode"].as_str().unwrap() {
+        "hedge_mode" => {
+            match position_json["holdSide"].as_str().unwrap() {
+                "short" => {
+                    PositionModeEnum::Short
+                }
+                "long" => {
+                    PositionModeEnum::Long
+                },
+                _ => {
+                    panic!("bitget_usdt_swap: 未知的持仓模式与持仓方向: {}, {}",
+                           position_json["posMode"].as_str().unwrap(), position_json["holdSide"].as_str().unwrap())
+                }
+            }
+        },
+        "one_way_mode" => {
+            PositionModeEnum::Both
+        },
+        _ => {
+            panic!("bitget_usdt_swap: 未知的持仓模式: {}", position_json["posMode"].as_str().unwrap())
+        }
+    };
+    let margin = Decimal::from_str(position_json["marginSize"].as_str().unwrap()).unwrap();
+
+    Position {
+        symbol,
+        margin_level,
+        amount,
+        frozen_amount,
+        price,
+        profit,
+        position_mode,
+        margin,
+    }
+}
+
 // 处理特殊深度数据
 // pub fn handle_special_depth(res_data: ResponseData) -> SpecialDepth {
 //     HandleSwapInfo::handle_special_depth(ExchangeEnum::BitgetSwap, res_data)

+ 2 - 5
standard/src/handle_info.rs

@@ -115,8 +115,7 @@ impl HandleSwapInfo {
             //     panic!("暂未提供此交易所方法!handle_position:{:?}", exchange);
             // },
             ExchangeEnum::BitgetSwap => {
-                info!(?res_data);
-                panic!("未实现格式化");
+                bitget_swap_handle::handle_position(res_data, ct_val)
             },
             ExchangeEnum::BybitSwap => {
                 bybit_swap_handle::handle_position(res_data, ct_val)
@@ -146,9 +145,7 @@ impl HandleSwapInfo {
             //     bitget_spot_handle::handle_order(res_data, ct_val)
             // },
             ExchangeEnum::BitgetSwap => {
-                info!(?res_data);
-                panic!("未实现格式化");
-                // bitget_spot_handle::handle_order(res_data, ct_val)
+                bitget_swap_handle::handle_order(res_data, ct_val)
             },
             ExchangeEnum::BybitSwap => {
                 bybit_swap_handle::handle_order(res_data, ct_val)

+ 71 - 48
strategy/src/bitget_usdt_swap.rs

@@ -4,13 +4,14 @@ use std::sync::atomic::AtomicBool;
 use rust_decimal::Decimal;
 use tokio::spawn;
 use tokio::sync::Mutex;
-use tracing::info;
+use tracing::{error, info};
 use exchanges::bitget_swap_ws::{BitgetSwapLogin, BitgetSwapSubscribeType, BitgetSwapWs, BitgetSwapWsType};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
-use standard::exchange::ExchangeEnum::{BitgetSwap, BybitSwap, GateSwap};
+use standard::exchange::ExchangeEnum::{BitgetSwap};
 use crate::core::Core;
 use crate::exchange_disguise::on_special_depth;
+use crate::model::OrderInfo;
 
 pub async fn bitget_usdt_swap_run(is_shutdown_arc :Arc<AtomicBool>,
                                   is_trade: bool,
@@ -19,37 +20,37 @@ pub async fn bitget_usdt_swap_run(is_shutdown_arc :Arc<AtomicBool>,
                                   symbols: Vec<String>,
                                   is_colo: bool,
                                   exchange_params: BTreeMap<String, String>) {
-    // // 开启公共频道
-    // let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
-    //
-    // // 开启公共连接
-    // let is_shutdown_arc_c1 = is_shutdown_arc.clone();
-    // let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
-    // let name_clone = name.clone();
-    // let core_arc_clone = core_arc.clone();
-    // let symbols_clone = symbols.clone();
-    // spawn(async move {
-    //     // 构建链接ws
-    //     let mut bg_public = BitgetSwapWs::new_label(name_clone.clone(),
-    //                                                 is_colo,
-    //                                                 None,
-    //                                                 BitgetSwapWsType::Public);
-    //
-    //     // 消费数据的函数
-    //     let mut update_flag_u = Decimal::ZERO;
-    //     let fun = move |data: ResponseData| {
-    //         let core_arc_cc = core_arc_clone.clone();
-    //
-    //         async move {
-    //             on_public_data(core_arc_cc, &mut update_flag_u, data).await
-    //         }
-    //     };
-    //
-    //     // 准备链接
-    //     bg_public.set_subscribe(vec![BitgetSwapSubscribeType::PuBooks1]); // 只用订阅深度数据
-    //     bg_public.set_symbols(symbols_clone);
-    //     bg_public.ws_connect_async(is_shutdown_arc_c1, fun, &write_tx_am_public, write_rx_public).await.expect("bitget_usdt_swap 链接有异常")
-    // });
+    // 开启公共频道
+    let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
+
+    // 开启公共连接
+    let is_shutdown_arc_c1 = is_shutdown_arc.clone();
+    let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
+    let name_clone = name.clone();
+    let core_arc_clone = core_arc.clone();
+    let symbols_clone = symbols.clone();
+    spawn(async move {
+        // 构建链接ws
+        let mut bg_public = BitgetSwapWs::new_label(name_clone.clone(),
+                                                    is_colo,
+                                                    None,
+                                                    BitgetSwapWsType::Public);
+
+        // 消费数据的函数
+        let mut update_flag_u = Decimal::ZERO;
+        let fun = move |data: ResponseData| {
+            let core_arc_cc = core_arc_clone.clone();
+
+            async move {
+                on_public_data(core_arc_cc, &mut update_flag_u, data).await
+            }
+        };
+
+        // 准备链接
+        bg_public.set_subscribe(vec![BitgetSwapSubscribeType::PuBooks1]); // 只用订阅深度数据
+        bg_public.set_symbols(symbols_clone);
+        bg_public.ws_connect_async(is_shutdown_arc_c1, fun, &write_tx_am_public, write_rx_public).await.expect("bitget_usdt_swap 链接有异常")
+    });
 
     // 不需要交易就不用开启私有频道了
     if !is_trade {
@@ -99,22 +100,44 @@ async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>,
                          run_symbol: &String) {
     let mut trace_stack = TraceStack::new(response.time, response.ins);
     trace_stack.on_after_span_line();
-    info!(?response);
 
     // public类型,目前只考虑订单流数据
-    // match response.channel.as_str() {
-    //     "account" => {
-    //         info!(?response);
-    //         let account = standard::handle_info::HandleSwapInfo::handle_account_info(BybitSwap, &response, run_symbol);
-    //         info!(?account);
-    //         let mut core = core_arc_clone.lock().await;
-    //         core.update_equity(account).await;
-    //     }
-    //     _ => {
-    //         info!("bitget_usdt_swap 113 未知的订阅数据");
-    //         info!(?response)
-    //     }
-    // }
+    match response.channel.as_str() {
+        "account" => {
+            let account = standard::handle_info::HandleSwapInfo::handle_account_info(BitgetSwap, &response, run_symbol);
+            let mut core = core_arc_clone.lock().await;
+            core.update_equity(account).await;
+        },
+        "positions" => {
+            let positions = standard::handle_info::HandleSwapInfo::handle_position(BitgetSwap, &response, &ct_val);
+            let mut core = core_arc_clone.lock().await;
+            core.update_position(positions).await;
+        },
+        "orders" => {
+            trace_stack.set_source("gate_swap.orders".to_string());
+            let orders = standard::handle_info::HandleSwapInfo::handle_order(BitgetSwap, response.clone(), ct_val.clone());
+
+            let mut order_infos:Vec<OrderInfo> = Vec::new();
+            for mut order in orders.order {
+                if order.status == "NULL" {
+                    error!("bitget_usdt_swap 未识别的订单状态:{:?}", response);
+
+                    continue;
+                }
+
+                let order_info = OrderInfo::parse_order_to_order_info(&mut order);
+                order_infos.push(order_info);
+            }
+
+            {
+                let mut core = core_arc_clone.lock().await;
+                core.update_order(order_infos, trace_stack).await;
+            }
+        },
+        _ => {
+            info!("bitget_usdt_swap 113 未知的订阅数据: {:?}", response);
+        }
+    }
 }
 
 async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>,
@@ -130,7 +153,7 @@ async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>,
             let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BitgetSwap, &response);
             trace_stack.on_after_format();
 
-            // on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
+            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
         }
         _ => {
             info!("bitget_usdt_swap 125 未知的订阅数据");