浏览代码

bitget_swap逻辑基本做完和测通,可以进行分支合并。

skyffire 1 年之前
父节点
当前提交
8b8952a935

+ 0 - 1
exchanges/src/bitget_swap_rest.rs

@@ -270,7 +270,6 @@ impl BitgetSwapRest {
         match result {
             Ok(res_data) => {
                 if res_data.code != "200" {
-                    info!(?res_data);
                     let json_value = res_data.data;
                     let code = res_data.code;
                     let error = ResponseData::new("".to_string(),

+ 110 - 106
standard/src/bitget_usdt_swap.rs → standard/src/bitget_swap.rs

@@ -4,10 +4,13 @@ use std::io::{Error, ErrorKind};
 use tokio::sync::mpsc::Sender;
 use std::str::FromStr;
 use async_trait::async_trait;
+use futures::stream::FuturesUnordered;
+use futures::TryStreamExt;
 use rust_decimal::{Decimal, MathematicalOps};
 use rust_decimal::prelude::ToPrimitive;
 use rust_decimal_macros::dec;
 use serde_json::{json, Value};
+use tokio::spawn;
 use tokio::time::Instant;
 use tracing::{error, info};
 use global::trace_stack::TraceStack;
@@ -255,7 +258,8 @@ impl Platform for BitgetSwap {
             let amount_size = Decimal::TEN.powd(Decimal::NEGATIVE_ONE * amount_precision);
             let min_qty = Decimal::NEGATIVE_ONE;
             let max_qty = Decimal::NEGATIVE_ONE;
-            let ct_val = Decimal::from_str(&market_info["sizeMultiplier"].as_str().unwrap()).unwrap();
+            // let ct_val = Decimal::from_str(&market_info["sizeMultiplier"].as_str().unwrap()).unwrap();
+            let ct_val = Decimal::ONE;
 
             let result = Market {
                 symbol: format!("{}_{}", base_asset, quote_asset),
@@ -512,111 +516,107 @@ impl Platform for BitgetSwap {
         // }
     }
 
-    async fn command_order(&mut self, _order_command: &mut OrderCommand, _trace_stack: &TraceStack) {
-        // let mut handles = vec![];
-        // // 撤销订单
-        // let cancel = order_command.cancel;
-        // for item in cancel.keys() {
-        //     let mut self_clone = self.clone();
-        //     let cancel_clone = cancel.clone();
-        //     let item_clone = item.clone();
-        //     let order_id = cancel_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
-        //     let custom_id = cancel_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
-        //     let result_sd = self.order_sender.clone();
-        //     let err_sd = self.error_sender.clone();
-        //     let handle = tokio::spawn(async move {
-        //         let result = self_clone.cancel_order(&order_id, &custom_id).await;
-        //         match result {
-        //             Ok(_) => {
-        //                 // result_sd.send(result).await.unwrap();
-        //             }
-        //             Err(error) => {
-        //                 // 取消失败去查订单。
-        //                 let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
-        //                 match query_rst {
-        //                     Ok(order) => {
-        //                         result_sd.send(order).await.unwrap();
-        //                     }
-        //                     Err(_query_err) => {
-        //                         // error!(?_query_err);
-        //                         // error!("撤单失败,而且查单也失败了,bitget_swap,oid={}, cid={}。", order_id.clone(), custom_id.clone());
-        //                     }
-        //                 }
-        //                 err_sd.send(error).await.unwrap();
-        //             }
-        //         }
-        //     });
-        //     handles.push(handle)
-        // }
-        // // 下单指令
-        // let mut limits = HashMap::new();
-        // limits.extend(order_command.limits_open);
-        // limits.extend(order_command.limits_close);
-        // for item in limits.keys() {
-        //     let mut self_clone = self.clone();
-        //     let limits_clone = limits.clone();
-        //     let item_clone = item.clone();
-        //     let result_sd = self.order_sender.clone();
-        //     let err_sd = self.error_sender.clone();
-        //     let ts = trace_stack.clone();
-        //
-        //     let handle = tokio::spawn(async move {
-        //         let value = limits_clone[&item_clone].clone();
-        //         let amount = Decimal::from_str(value.get(0).unwrap_or(&"0".to_string())).unwrap();
-        //         let side = value.get(1).unwrap();
-        //         let price = Decimal::from_str(value.get(2).unwrap_or(&"0".to_string())).unwrap();
-        //         let cid = value.get(3).unwrap();
-        //
-        //         //  order_name: [数量,方向,价格,c_id]
-        //         let result = self_clone.take_order(cid, side, price, amount).await;
-        //         match result {
-        //             Ok(mut result) => {
-        //                 // 记录此订单完成时间
-        //                 // ts.on_after_send();
-        //                 result.trace_stack = ts;
-        //
-        //                 result_sd.send(result).await.unwrap();
-        //             }
-        //             Err(error) => {
-        //                 let mut err_order = Order::new();
-        //                 err_order.custom_id = cid.clone();
-        //                 err_order.status = "REMOVE".to_string();
-        //
-        //                 result_sd.send(err_order).await.unwrap();
-        //                 err_sd.send(error).await.unwrap();
-        //             }
-        //         }
-        //     });
-        //     handles.push(handle)
-        // }
-        // // 检查订单指令
-        // let check = order_command.check;
-        // for item in check.keys() {
-        //     let mut self_clone = self.clone();
-        //     let check_clone = check.clone();
-        //     let item_clone = item.clone();
-        //     let order_id = check_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
-        //     let custom_id = check_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
-        //     let result_sd = self.order_sender.clone();
-        //     let err_sd = self.error_sender.clone();
-        //     let handle = tokio::spawn(async move {
-        //         let result = self_clone.get_order_detail(&order_id, &custom_id).await;
-        //         match result {
-        //             Ok(result) => {
-        //                 result_sd.send(result).await.unwrap();
-        //             }
-        //             Err(error) => {
-        //                 err_sd.send(error).await.unwrap();
-        //             }
-        //         }
-        //     });
-        //     handles.push(handle)
-        // }
-        //
-        // let futures = FuturesUnordered::from_iter(handles);
-        // let _: Result<Vec<_>, _> = futures.try_collect().await;
+    async fn command_order(&mut self, order_command: &mut OrderCommand, trace_stack: &TraceStack) {
+        let mut handles = vec![];
+
+        // 下单指令
+        for item in order_command.limits_open.keys() {
+            let mut ts = trace_stack.clone();
+
+            let amount = Decimal::from_str(&*order_command.limits_open[item].get(0).unwrap().clone()).unwrap();
+            let side = order_command.limits_open[item].get(1).unwrap().clone();
+            let price = Decimal::from_str(&*order_command.limits_open[item].get(2).unwrap().clone()).unwrap();
+            let cid = order_command.limits_open[item].get(3).unwrap().clone();
+
+            //  order_name: [数量,方向,价格,c_id]
+            let mut self_clone = self.clone();
+            let handle = spawn(async move {
+                // TraceStack::show_delay(&ts.ins);
+                ts.on_before_send();
+                let result = self_clone.take_order(cid.as_str(), side.as_str(), price, amount).await;
+                ts.on_after_send();
+
+                match result {
+                    Ok(mut result) => {
+                        result.trace_stack = ts;
 
-        panic!("bitget_swap command_order:该交易所方法未实现")
+                        self_clone.order_sender.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        info!(?error);
+                        let mut err_order = Order::new();
+                        err_order.custom_id = cid.clone();
+                        err_order.status = "REMOVE".to_string();
+
+                        self_clone.order_sender.send(err_order).await.unwrap();
+                        self_clone.error_sender.send(error).await.unwrap();
+                    }
+                }
+            });
+            handles.push(handle)
+        }
+        let futures = FuturesUnordered::from_iter(handles);
+        // 等待所有任务完成
+        let _: Result<Vec<_>, _> = futures.try_collect().await;
+
+        // 撤销订单
+        let mut cancel_handlers = vec![];
+        for item in order_command.cancel.keys() {
+            let order_id = order_command.cancel[item].get(1).unwrap().clone();
+            let custom_id = order_command.cancel[item].get(0).unwrap().clone();
+
+            let mut self_clone = self.clone();
+            let handle = spawn(async move {
+                let result = self_clone.cancel_order(&order_id, &custom_id).await;
+                match result {
+                    Ok(_) => {
+                        // result_sd.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        // 取消失败去查订单。
+                        let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
+                        match query_rst {
+                            Ok(order) => {
+                                self_clone.order_sender.send(order).await.unwrap();
+                            }
+                            Err(err) => {
+                                error!("撤单失败,而且查单也失败了,bitget_swap,oid={}, cid={}, err={:?}。", order_id.clone(), custom_id.clone(), err);
+                            }
+                        }
+                        self_clone.error_sender.send(error).await.unwrap();
+                    }
+                }
+            });
+            cancel_handlers.push(handle)
+        }
+        let futures = FuturesUnordered::from_iter(cancel_handlers);
+        // 等待所有任务完成
+        let _: Result<Vec<_>, _> = futures.try_collect().await;
+
+        // 检查订单指令
+        let mut check_handlers = vec![];
+        for item in order_command.check.keys() {
+            let order_id = order_command.check[item].get(1).unwrap().clone();
+            let custom_id = order_command.check[item].get(0).unwrap().clone();
+
+            let mut self_clone = self.clone();
+            let handle = spawn(async move {
+                let result = self_clone.get_order_detail(order_id.as_str(), custom_id.as_str()).await;
+                match result {
+                    Ok(result) => {
+                        self_clone.order_sender.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        self_clone.error_sender.send(error).await.unwrap();
+                    }
+                }
+            });
+            check_handlers.push(handle)
+        }
+
+        let futures = FuturesUnordered::from_iter(check_handlers);
+        // 等待所有任务完成
+        let _: Result<Vec<_>, _> = futures.try_collect().await;
     }
 }
 
@@ -642,7 +642,11 @@ pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
     let size = Decimal::from_str(order["size"].as_str().unwrap()).unwrap();
     let status = order["state"].as_str().unwrap();
     let base_volume = Decimal::from_str(order["quoteVolume"].as_str().unwrap()).unwrap();
-    let avg_price = Decimal::from_str(order["priceAvg"].as_str().unwrap()).unwrap();
+    let avg_price = if order["priceAvg"].is_null() || order["priceAvg"].as_str().unwrap().is_empty() {
+        Decimal::ZERO
+    } else {
+        Decimal::from_str(order["priceAvg"].as_str().unwrap().to_string().as_str()).unwrap()
+    };
 
     let amount = size * ct_val;
     let deal_amount = base_volume * ct_val;

+ 1 - 3
standard/src/bitget_swap_handle.rs

@@ -3,7 +3,6 @@ use rust_decimal::Decimal;
 use rust_decimal::prelude::FromPrimitive;
 use serde_json::Value;
 use tokio::time::Instant;
-use tracing::info;
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use crate::{Account, MarketOrder, Order, Position, PositionModeEnum, SpecialOrder};
@@ -50,13 +49,12 @@ pub fn handle_order(res_data: ResponseData, ct_val: Decimal) -> SpecialOrder {
 
 // 处理订单信息
 pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
-    info!(?order);
     let price = Decimal::from_str(order["price"].as_str().unwrap().to_string().as_str()).unwrap();
     let size = Decimal::from_str(order["size"].as_str().unwrap().to_string().as_str()).unwrap();
     let binding = order["status"].clone().as_str().unwrap().to_string();
     let status = binding.as_str();
     let acc_base_volume = Decimal::from_str(order["accBaseVolume"].as_str().unwrap().to_string().as_str()).unwrap();
-    let avg_price = if order["priceAvg"].is_null() {
+    let avg_price = if order["priceAvg"].is_null() || order["priceAvg"].as_str().unwrap().is_empty() {
         Decimal::ZERO
     } else {
         Decimal::from_str(order["priceAvg"].as_str().unwrap().to_string().as_str()).unwrap()

+ 1 - 1
standard/src/exchange.rs

@@ -9,7 +9,7 @@ use crate::gate_swap::GateSwap;
 // use crate::kucoin_swap::KucoinSwap;
 // use crate::bitget_spot::BitgetSpot;
 use crate::bybit_swap::BybitSwap;
-use crate::bitget_usdt_swap::BitgetSwap;
+use crate::bitget_swap::BitgetSwap;
 // use crate::kucoin_spot::KucoinSpot;
 // use crate::okx_swap::OkxSwap;
 

+ 1 - 1
standard/src/lib.rs

@@ -34,7 +34,7 @@ mod kucoin_spot;
 pub mod kucoin_spot_handle;
 mod bybit_swap;
 mod bybit_swap_handle;
-mod bitget_usdt_swap;
+mod bitget_swap;
 mod bitget_swap_handle;
 
 /// 持仓模式枚举

+ 17 - 1
strategy/src/bitget_usdt_swap.rs

@@ -9,6 +9,7 @@ use exchanges::bitget_swap_ws::{BitgetSwapLogin, BitgetSwapSubscribeType, Bitget
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use standard::exchange::ExchangeEnum::{BitgetSwap};
+use standard::{Position, PositionModeEnum};
 use crate::core::Core;
 use crate::exchange_disguise::on_special_depth;
 use crate::model::OrderInfo;
@@ -109,7 +110,22 @@ async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>,
             core.update_equity(account).await;
         },
         "positions" => {
-            let positions = standard::handle_info::HandleSwapInfo::handle_position(BitgetSwap, &response, &ct_val);
+            let mut positions = standard::handle_info::HandleSwapInfo::handle_position(BitgetSwap, &response, &ct_val);
+
+            // bitget如果没有仓位不会给0,会给个空数组
+            if positions.is_empty() {
+                positions.push(Position {
+                    symbol: run_symbol.replace("_", "").to_uppercase(),
+                    margin_level: Default::default(),
+                    amount: Default::default(),
+                    frozen_amount: Default::default(),
+                    price: Default::default(),
+                    profit: Default::default(),
+                    position_mode: PositionModeEnum::Both,
+                    margin: Default::default(),
+                });
+            }
+
             let mut core = core_arc_clone.lock().await;
             core.update_position(positions).await;
         },