Forráskód Böngészése

bitget合并完成,断线重连稍后补上。

skyffire 1 éve
szülő
commit
bfec007b78

+ 2 - 1
.gitignore

@@ -8,4 +8,5 @@ config.toml*
 /logs*
 /test_account.toml
 
-config.json
+config.json
+config.json*

+ 1 - 1
Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "as-rust"
-version = "3.1.0"
+version = "3.1.3"
 edition = "2021"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

+ 7 - 7
exchanges/src/binance_swap_rest.rs

@@ -17,7 +17,7 @@ pub struct BinanceSwapRest {
     base_url: String,
     client: reqwest::Client,
     /*******参数*/
-    //登所需参数
+    //登所需参数
     login_param: BTreeMap<String, String>,
     delays: Vec<i64>,
     max_delay: i64,
@@ -326,7 +326,7 @@ impl BinanceSwapRest {
             is_login_param = false
         }
 
-        //请求头配置-如果需要登则存在额外配置
+        //请求头配置-如果需要登则存在额外配置
         let mut params_json: serde_json::Value = serde_json::from_str(params.clone().as_str()).unwrap();
         // let mut body = "".to_string();
         let timestamp = chrono::Utc::now().timestamp_millis().to_string();
@@ -341,12 +341,12 @@ impl BinanceSwapRest {
         }
 
 
-        //是否需要登-- 组装sing
+        //是否需要登-- 组装sing
         if is_login {
             if !is_login_param {
-                let e = ResponseData::error(self.label.clone(), "登参数错误!".to_string());
+                let e = ResponseData::error(self.label.clone(), "登参数错误!".to_string());
                 return e;
-            } else {//需要登陆-且登陆参数齐全
+            } else {//需要登录-且登录参数齐全
                 //组装sing
                 let sing = Self::sign(secret_key.clone(),
                                       params_json.to_string(),
@@ -360,7 +360,7 @@ impl BinanceSwapRest {
 
         trace!("headers:{:?}", headers);
         let start_time = chrono::Utc::now().timestamp_millis();
-        let response = self.http_toll(
+        let response = self.http_tool(
             format!("{}{}", prefix_url.clone(), request_url.clone()),
             request_type.to_string(),
             params_json.to_string(),
@@ -394,7 +394,7 @@ impl BinanceSwapRest {
     }
 
 
-    async fn http_toll(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> ResponseData {
+    async fn http_tool(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> ResponseData {
         /****请求接口与 地址*/
         let url = format!("{}{}", self.base_url.to_string(), request_path);
         let request_type = request_type.clone().to_uppercase();

+ 1 - 1
exchanges/src/binance_swap_ws.rs

@@ -189,7 +189,7 @@ impl BinanceSwapWs {
                 info!("binance_usdt_swap socket 连接中……");
                 // ws层重连
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 false, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
                                                  Self::message_text, Self::message_ping, Self::message_pong).await;
 
                 error!("binance_usdt_swap socket 断连,1s以后重连……");

+ 8 - 8
exchanges/src/bitget_spot_rest.rs

@@ -16,8 +16,8 @@
 //     base_url: String,
 //     client: reqwest::Client,
 //     /*******参数*/
-//     //是否需要登
-//     //登所需参数
+//     //是否需要登
+//     //登所需参数
 //     login_param: BTreeMap<String, String>,
 //     delays: Vec<i64>,
 //     max_delay: i64,
@@ -653,7 +653,7 @@
 //         }
 //
 //
-//         //请求头配置-如果需要登则存在额外配置
+//         //请求头配置-如果需要登则存在额外配置
 //         let mut body = "".to_string();
 //         let timestamp = Self::get_timestamp();
 //         let mut headers = HeaderMap::new();
@@ -664,13 +664,13 @@
 //         }
 //
 //
-//         //是否需要登-- 组装sing
+//         //是否需要登-- 组装sing
 //         if is_login {
 //             if !is_login_param {
-//                 let e = ResponseData::error(self.label.clone(), "登参数错误!".to_string());
+//                 let e = ResponseData::error(self.label.clone(), "登参数错误!".to_string());
 //                 return e;
 //             } else {
-//                 //需要登陆-且登陆参数齐全
+//                 //需要登录-且登录参数齐全
 //                 trace!("param:{}", params);
 //                 trace!("body:{}", body);
 //                 //组装sing
@@ -691,7 +691,7 @@
 //         // trace!("headers:{:?}", headers);
 //         let base_url = format!("{}{}", prefix_url.clone(), request_url.clone());
 //         let start_time = chrono::Utc::now().timestamp_millis();
-//         let get_response = self.http_toll(
+//         let get_response = self.http_tool(
 //             format!("{}{}", prefix_url.clone(), request_url.clone()),
 //             method.to_string(),
 //             params.clone(),
@@ -735,7 +735,7 @@
 //         sign
 //     }
 //
-//     async fn http_toll(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
+//     async fn http_tool(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
 //         let res_data: ResponseData;
 //         /****请求接口与 地址*/
 //         let url = format!("{}{}", self.base_url.to_string(), request_path);

+ 3 - 3
exchanges/src/bitget_spot_ws.rs

@@ -227,7 +227,7 @@
 //                         });
 //
 //             trace!("---login_json:{0}", login_json.to_string());
-//             trace!("--登:{}", login_json.to_string());
+//             trace!("--登:{}", login_json.to_string());
 //             login_json_str = login_json.to_string();
 //         }
 //         login_json_str
@@ -315,9 +315,9 @@
 //         // {"event":"login","code":0}
 //         if json_value.get("event").is_some() && json_value["event"].as_str() == Option::from("login") {
 //             if json_value.get("code").is_some() && json_value["code"] == 0 {
-//                 res_data.message = format!("登成功");
+//                 res_data.message = format!("登成功");
 //             } else {
-//                 res_data.message = format!("登失败:({},{})", json_value.get("code").as_ref().unwrap(), json_value.get("msg").as_ref().unwrap());
+//                 res_data.message = format!("登失败:({},{})", json_value.get("code").as_ref().unwrap(), json_value.get("msg").as_ref().unwrap());
 //             }
 //             res_data.channel = format!("login");
 //             res_data.code = "-200".to_string();

+ 393 - 0
exchanges/src/bitget_swap_rest.rs

@@ -0,0 +1,393 @@
+use std::collections::BTreeMap;
+use reqwest::Client;
+use reqwest::header::HeaderMap;
+use rust_decimal::Decimal;
+use tracing::info;
+use crate::http_tool::RestTool;
+use crate::response_base::ResponseData;
+use ring::hmac;
+use rust_decimal::prelude::FromPrimitive;
+use serde_json::Value;
+
+#[derive(Clone, Debug)]
+pub struct BitgetSwapRest {
+    pub label: String,
+    base_url: String,
+    client: Client,
+    login_param: BTreeMap<String, String>,                  // 登录参数
+    delays: Vec<i64>,
+    max_delay: i64,
+    avg_delay: Decimal,
+}
+
+impl BitgetSwapRest {
+    pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> BitgetSwapRest {
+        return BitgetSwapRest::new_label("default-BitgetSwapRest".to_string(), is_colo, login_param)
+    }
+
+    // 构造Bitget,可以自定义label
+    pub fn new_label(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> BitgetSwapRest {
+        let base_url = if is_colo {
+            "https://api.bitget.com".to_string()
+        } else {
+            "https://api.bitget.com".to_string()
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道", base_url);
+        } else {
+            info!("走普通通道:{}", base_url);
+        }
+
+        BitgetSwapRest {
+            label,
+            base_url,
+            client: Client::new(),
+            login_param,
+            delays: vec![],
+            max_delay: 0,
+            avg_delay: Decimal::ZERO,
+        }
+    }
+
+    //获取行情信息
+    pub async fn get_contracts(&mut self, symbol: String) -> ResponseData {
+        let params = serde_json::json!({
+            "symbol": symbol,
+            "productType": "USDT-FUTURES"
+         });
+        let data = self.request("GET".to_string(),
+                                "/api/v2".to_string(),
+                                "/mix/market/contracts".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+    //获取行情信息
+    pub async fn get_tickers(&mut self, symbol: String) -> ResponseData {
+        let params = serde_json::json!({
+            "symbol": symbol,
+            "productType": "USDT-FUTURES"
+         });
+        let data = self.request("GET".to_string(),
+                                "/api/v2".to_string(),
+                                "/mix/market/ticker".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+    // 获取服务器时间
+    pub async fn get_server_time(&mut self) -> ResponseData {
+        let params = serde_json::json!({});
+        self.request("GET".to_string(),
+                     "/api/v2".to_string(),
+                     "/public/time".to_string(),
+                     false,
+                     params.to_string(),
+        ).await
+    }
+
+    // 获取账户信息
+    pub async fn get_account_info(&mut self) -> ResponseData {
+        let params = serde_json::json!({
+            "productType": "USDT-FUTURES"
+        });
+
+        self.request("GET".to_string(),
+                     "/api/v2".to_string(),
+                     "/mix/account/accounts".to_string(),
+                     true,
+                     params.to_string(),
+        ).await
+    }
+
+    // 获取仓位信息(单个)
+    pub async fn get_single_position(&mut self, params: Value) -> ResponseData {
+        self.request("GET".to_string(),
+                     "/api/v2".to_string(),
+                     "/mix/position/single-position".to_string(),
+                     true,
+                     params.to_string(),
+        ).await
+    }
+
+    // 获取所有仓位
+    pub async fn get_all_position(&mut self, params: Value) -> ResponseData {
+        self.request("GET".to_string(),
+                     "/api/v2".to_string(),
+                     "/mix/position/all-position".to_string(),
+                     true,
+                     params.to_string(),
+        ).await
+    }
+
+    // 下单
+    pub async fn swap_order(&mut self, params: Value) -> ResponseData {
+        self.request("POST".to_string(),
+                     "/api/v2".to_string(),
+                     "/mix/order/place-order".to_string(),
+                     true,
+                     params.to_string(),
+        ).await
+    }
+
+    // 撤单
+    pub async fn cancel_order(&mut self, params: Value) -> ResponseData {
+        self.request("POST".to_string(),
+                     "/api/v2".to_string(),
+                     "/mix/order/cancel-order".to_string(),
+                     true,
+                     params.to_string(),
+        ).await
+    }
+
+    // 获取订单详情
+    pub async fn get_order(&mut self, params: Value) -> ResponseData {
+        self.request("GET".to_string(),
+                     "/api/v2".to_string(),
+                     "/mix/order/detail".to_string(),
+                     true,
+                     params.to_string(),
+        ).await
+    }
+
+    // 获取当前未成交订单
+    pub async fn get_pending_orders(&mut self) -> ResponseData {
+        let params = serde_json::json!({
+            "productType": "USDT-FUTURES"
+        });
+
+        self.request("GET".to_string(),
+                     "/api/v2".to_string(),
+                     "/mix/order/orders-pending".to_string(),
+                     true,
+                     params.to_string(),
+        ).await
+    }
+
+    // 调整杠杆
+    pub async fn set_leverage(&mut self, params: Value) -> ResponseData {
+        self.request("POST".to_string(),
+                     "/api/v2".to_string(),
+                     "/mix/account/set-leverage".to_string(),
+                     true,
+                     params.to_string(),
+        ).await
+    }
+
+    // 调整持仓模式(单向、双向)
+    pub async fn set_position_mode(&mut self, params: Value) -> ResponseData {
+        self.request("POST".to_string(),
+                     "/api/v2".to_string(),
+                     "/mix/account/set-position-mode".to_string(),
+                     true,
+                     params.to_string(),
+        ).await
+    }
+
+    // 发送请求
+    pub async fn request(&mut self,
+                         method: String,
+                         prefix_url: String,
+                         request_url: String,
+                         is_login: bool,
+                         params: String) -> ResponseData
+    {
+        // trace!("login_param:{:?}", self.login_param);
+        //解析账号信息
+        let mut access_key = "".to_string();
+        let mut secret_key = "".to_string();
+        let mut passphrase = "".to_string();
+        if self.login_param.contains_key("access_key") {
+            access_key = self.login_param.get("access_key").unwrap().to_string();
+        }
+        if self.login_param.contains_key("secret_key") {
+            secret_key = self.login_param.get("secret_key").unwrap().to_string();
+        }
+        if self.login_param.contains_key("pass_key") {
+            passphrase = self.login_param.get("pass_key").unwrap().to_string();
+        }
+        let mut is_login_param = true;
+        if access_key == "" || secret_key == "" || passphrase == "" {
+            is_login_param = false
+        }
+
+        //请求头配置-如果需要登陆则存在额外配置
+        let mut body = "".to_string();
+        let timestamp = chrono::Utc::now().timestamp_millis().to_string();
+        let mut headers = HeaderMap::new();
+        headers.insert("Content-Type", "application/json".parse().unwrap());
+        headers.insert("locale", "en-US".parse().unwrap());
+        if method == "POST" {
+            body = params.clone();
+        }
+
+        //是否需要登陆-- 组装sing
+        if is_login {
+            if !is_login_param {
+                let e = ResponseData::error(self.label.clone(), "登录参数错误!".to_string());
+                return e;
+            } else {
+                //需要登陆-且登陆参数齐全
+                // trace!("param:{}", params);
+                // trace!("body:{}", body);
+                //组装sing
+                let sing = Self::sign(secret_key.clone(),
+                                      method.clone(),
+                                      prefix_url.clone(),
+                                      request_url.clone(),
+                                      params.clone(),
+                                      body.clone(),
+                                      timestamp.clone(),
+                );
+                //组装header
+                headers.extend(Self::headers(sing, timestamp, passphrase, access_key));
+            }
+        }
+
+
+        // trace!("headers:{:?}", headers);
+        let base_url = format!("{}{}", prefix_url.clone(), request_url.clone());
+        let start_time = chrono::Utc::now().timestamp_millis();
+        let get_response = self.http_tool(
+            format!("{}{}", prefix_url.clone(), request_url.clone()),
+            method.to_string(),
+            params.clone(),
+            headers,
+        ).await;
+        let time_array = chrono::Utc::now().timestamp_millis() - start_time;
+        self.delays.push(time_array);
+        self.get_delay_info();
+        let res_data = Self::res_data_analysis(get_response, base_url, params);
+        res_data
+    }
+
+    pub fn res_data_analysis(result: Result<ResponseData, reqwest::Error>, base_url: String, params: String) -> ResponseData {
+        match result {
+            Ok(res_data) => {
+                if res_data.code != "200" {
+                    let json_value = res_data.data;
+                    let code = res_data.code;
+                    let error = ResponseData::new("".to_string(),
+                                                  format!("{}", code),
+                                                  format!("请求地址:{}, 请求参数:{}, 响应结果:{}", base_url, params, res_data.message),
+                                                  json_value);
+                    error
+                } else {
+                    let json_value = res_data.data;
+
+                    let code = json_value["code"].as_str().unwrap();
+                    if code == "00000" {
+                        let data = serde_json::to_string(&json_value["data"]).unwrap();
+                        let success = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), data.parse().unwrap());
+                        success
+                    } else {
+                        let error = ResponseData::new("".to_string(),
+                                                      format!("{}", code),
+                                                      format!("请求地址:{}, 请求参数:{}, 响应结果:{}", base_url, params, json_value.as_str().unwrap()),
+                                                      json_value);
+                        error
+                    }
+                }
+            }
+            Err(err) => {
+                let error = ResponseData::error("".to_string(), format!("json 解析失败:{}", err));
+                error
+            }
+        }
+    }
+
+    fn get_delay_info(&mut self) {
+        let last_100 = if self.delays.len() > 100 {
+            self.delays[self.delays.len() - 100..].to_vec()
+        } else {
+            self.delays.clone()
+        };
+
+        let max_value = last_100.iter().max().unwrap();
+        if max_value.clone().to_owned() > self.max_delay {
+            self.max_delay = max_value.clone().to_owned();
+        }
+
+        let sum: i64 = last_100.iter().sum();
+        let sum_v = Decimal::from_i64(sum).unwrap();
+        let len_v = Decimal::from_u64(last_100.len() as u64).unwrap();
+        self.avg_delay = (sum_v / len_v).round_dp(1);
+        self.delays = last_100.clone().into_iter().collect();
+    }
+
+    // 封装请求头
+    pub fn headers(sign: String, timestamp: String, passphrase: String, access_key: String) -> HeaderMap {
+        let mut headers = HeaderMap::new();
+        headers.insert("ACCESS-KEY", access_key.parse().unwrap());
+        headers.insert("ACCESS-SIGN", sign.parse().unwrap());
+        headers.insert("ACCESS-TIMESTAMP", timestamp.parse().unwrap());
+        headers.insert("ACCESS-PASSPHRASE", passphrase.parse().unwrap());
+        headers
+    }
+
+
+    async fn http_tool(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
+        let res_data: ResponseData;
+        /****请求接口与 地址*/
+        let url = format!("{}{}", self.base_url.to_string(), request_path);
+        let request_type = request_type.clone().to_uppercase();
+        let addrs_url: String = if RestTool::parse_params_to_str(params.clone()) == "" {
+            url.clone()
+        } else {
+            format!("{}?{}", url.clone(), RestTool::parse_params_to_str(params.clone()))
+        };
+        // trace!("url:{}", url);
+        // trace!("addrs_url:{}", addrs_url);
+        // let params_json: serde_json::Value = serde_json::from_str(&params).unwrap();
+        // trace!("params_json:{}",params_json);
+        // trace!("headers:{:?}",headers);
+
+        let req = match request_type.as_str() {
+            "GET" => self.client.get(addrs_url.clone()).headers(headers),
+            "POST" => self.client.post(url.clone()).body(params).headers(headers),
+            "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
+            // "PUT" => self.client.put(url.clone()).json(&params),
+            _ => return Ok(ResponseData::error(self.label.clone(), format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
+        };
+
+        let response = req.send().await?;
+        if response.status().is_success() {
+            // 读取响应的内容
+            let body = response.text().await?;
+            // trace!("ok-----{}", body);
+            res_data = ResponseData::new(self.label.clone(), "200".to_string(), "success".to_string(), body.parse().unwrap());
+        } else {
+            let body = response.text().await?;
+            res_data = ResponseData::error(self.label.clone(), body.to_string())
+        }
+
+        Ok(res_data)
+    }
+
+    // 对请求进行签名处理
+    pub fn sign(secret_key: String,
+            method: String, prefix_url: String, request_url: String,
+            params: String, body: String, timestamp: String) -> String {
+        let url_param_str = RestTool::parse_params_to_str(params);
+        let base_url = if method == "GET" && url_param_str.len() > 0 {
+            format!("{}{}?{}", prefix_url, request_url, url_param_str)
+        } else {
+            format!("{}{}", prefix_url, request_url)
+        };
+
+        // 时间戳 + 请求类型+ 请求参数字符串
+        let message = format!("{}{}{}{}", timestamp, method, base_url, body);
+        // trace!("message:{}",message);
+
+        // 做签名
+        let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+        let result = hmac::sign(&hmac_key, &message.as_bytes());
+        let sign = base64::encode(result);
+        sign
+    }
+}

+ 327 - 0
exchanges/src/bitget_swap_ws.rs

@@ -0,0 +1,327 @@
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use chrono::{Utc};
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use serde_json::{json, Value};
+use tracing::{error, info, trace};
+use ring::hmac;
+use tokio::sync::Mutex;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use crate::response_base::ResponseData;
+use crate::socket_tool::{AbstractWsMode, HeartbeatType};
+
+pub enum BitgetSwapWsType {
+    Public,
+    Private,
+}
+
+#[derive(Clone)]
+pub enum BitgetSwapSubscribeType {
+    PuTrade,
+    PuBooks1,
+
+    PrAccount,
+    PrPosition,
+    PrOrders,
+}
+
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct BitgetSwapLogin {
+    pub api_key: String,
+    pub secret_key: String,
+    pub passphrase_key: String,
+}
+
+#[derive(Clone)]
+pub struct BitgetSwapWs {
+    label: String,                                              // 类型
+    address_url: String,                                        // 地址
+    login_param: Option<BitgetSwapLogin>,                       // 账号
+    symbol_s: Vec<String>,                                      // 币对
+    subscribe_types: Vec<BitgetSwapSubscribeType>,              // 订阅
+    heartbeat_time: u64,                                        // 心跳间隔
+}
+
+impl BitgetSwapWs {
+    pub fn new(is_colo: bool, login_param: Option<BitgetSwapLogin>, ws_type: BitgetSwapWsType) -> BitgetSwapWs {
+        return BitgetSwapWs::new_label("default-BitgetSwapWs".to_string(), is_colo, login_param, ws_type);
+    }
+
+    pub fn new_label(label: String, is_colo: bool, login_param: Option<BitgetSwapLogin>, ws_type: BitgetSwapWsType) -> BitgetSwapWs {
+        let address_url = match ws_type {
+            BitgetSwapWsType::Public => {
+                "wss://ws.bitget.com/v2/ws/public".to_string()
+            }
+            BitgetSwapWsType::Private => {
+                "wss://ws.bitget.com/v2/ws/private".to_string()
+            }
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道",address_url);
+        } else {
+            info!("走普通通道:{}",address_url);
+        }
+
+        BitgetSwapWs {
+            label,
+            address_url,
+            login_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            heartbeat_time: 1000 * 20
+        }
+    }
+
+    // 添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<BitgetSwapSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+
+    // 手动添加币对
+    pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
+        for symbol in b_array.iter_mut() {
+            // 小写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("-", "");
+            *symbol = symbol.replace("_", "");
+        }
+        self.symbol_s = b_array;
+    }
+
+    //频道是否需要登录
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if match t {
+                BitgetSwapSubscribeType::PuTrade => false,
+                BitgetSwapSubscribeType::PuBooks1 => false,
+
+                BitgetSwapSubscribeType::PrAccount => true,
+                BitgetSwapSubscribeType::PrOrders => true,
+                BitgetSwapSubscribeType::PrPosition => true
+            } {
+                return true;
+            }
+        }
+        false
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数*******************************************************/
+    /*******************************************************************************************************/
+    // 枚举解析成json
+    pub fn enum_to_json(symbol: String, subscribe_type: BitgetSwapSubscribeType) -> Value {
+        match subscribe_type {
+            // 公共订阅
+            BitgetSwapSubscribeType::PuTrade => {
+                json!({
+                    "instType": "USDT-FUTURES",
+                    "channel": "trade",
+                    "instId": symbol,
+                })
+            },
+            BitgetSwapSubscribeType::PuBooks1 => {
+                json!({
+                    "instType": "USDT-FUTURES",
+                    "channel": "books1",
+                    "instId": symbol,
+                })
+            },
+
+            // 私有订阅
+            BitgetSwapSubscribeType::PrAccount => {
+                json!({
+                    "instType": "USDT-FUTURES",
+                    "channel": "account",
+                    "coin": "default",
+                })
+            },
+            BitgetSwapSubscribeType::PrPosition => {
+                json!({
+                    "instType": "USDT-FUTURES",
+                    "channel": "positions",
+                    "instId": "default"
+                })
+            },
+            BitgetSwapSubscribeType::PrOrders => {
+                json!({
+                    "instType": "USDT-FUTURES",
+                    "channel": "orders",
+                    "instId": "default"
+                })
+            },
+        }
+    }
+
+    // 订阅信息生成
+    pub fn get_subscription(&self) -> String {
+        let mut params = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_json(symbol.clone(), subscribe_type.clone());
+                params.push(ty_str);
+            }
+        }
+        let str = json!({
+            "op": "subscribe",
+            "args": params
+        });
+
+        str.to_string()
+    }
+
+    // 登录数据组装
+    fn log_in_to_str(&self) -> String {
+        let mut login_json_str = "".to_string();
+
+        let mut access_key: String = "".to_string();
+        let mut secret_key: String = "".to_string();
+        let mut passphrase: String = "".to_string();
+        match self.login_param.clone() {
+            None => {}
+            Some(param) => {
+                access_key = param.api_key;
+                secret_key = param.secret_key;
+                passphrase = param.passphrase_key;
+            }
+        }
+        if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 {
+            let timestamp = Utc::now().timestamp().to_string();
+            // 时间戳 + 请求类型+ 请求参数字符串
+            let message = format!("{}GET{}", timestamp, "/user/verify");
+            let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+            let result = hmac::sign(&hmac_key, &message.as_bytes());
+            let sign = base64::encode(result);
+
+            let login_json = json!({
+                "op": "login",
+                "args": [{
+                    "apiKey": access_key,
+                    "passphrase": passphrase,
+                    "timestamp": timestamp,
+                    "sign": sign
+                }]
+            });
+
+            info!("---login_json: {0}", login_json.to_string());
+            info!("---登陆: {}", login_json.to_string());
+            login_json_str = login_json.to_string();
+        }
+        login_json_str
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    pub async fn ws_connect_async<F, Future>(&mut self,
+                                            is_shutdown_arc: Arc<AtomicBool>,
+                                            handle_function: F,
+                                            write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                            write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+        where
+            F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+            Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
+    {
+        let login_is = self.contains_pr();
+        let address_url = self.address_url.clone();
+        let label = self.label.clone();
+        let heartbeat_time = self.heartbeat_time.clone();
+
+        // 登录相关
+        if login_is {
+            let login_str = self.log_in_to_str();
+            info!("发起ws登录: {}", login_str);
+            // TODO 这样写不能断线重连,后面再想办法修吧
+            let write_tx_clone2 = Arc::clone(write_tx_am);
+            AbstractWsMode::send_subscribe(write_tx_clone2, Message::Text(login_str)).await;
+        }
+        // 设置订阅
+        let subscription = self.get_subscription();
+        let subscribe_array = vec![subscription.to_string()];
+        info!(?subscribe_array);
+
+        //心跳-- 方法内部线程启动
+        let write_tx_clone1 = Arc::clone(write_tx_am);
+        tokio::spawn(async move {
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
+        });
+
+        //链接
+        let t2 = tokio::spawn(async move {
+            let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
+
+            loop {
+                info!("bitget_usdt_swap socket 连接中……");
+
+                // ws层重连
+                AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
+                                                 login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+
+                error!("bitget_usdt_swap socket 断连,1s以后重连……");
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        });
+        tokio::try_join!(t2).unwrap();
+        trace!("线程-心跳与链接-结束");
+
+        Ok(())
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************数据解析*******************************************************/
+    /******************************************************************************************************/
+    // 数据解析-Text
+    pub fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text);
+        Option::from(response_data)
+    }
+    // 数据解析-ping
+    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), Value::Null));
+    }
+    // 数据解析-pong
+    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), Value::Null));
+    }
+    //数据解析
+    pub fn ok_text(text: String) -> ResponseData {
+        let mut res_data = ResponseData::new("".to_string(), "200".to_string(), text.clone(), Value::Null);
+        let json_value: Value = serde_json::from_str(&text).unwrap();
+
+        if json_value.get("event").is_some() && json_value["event"].as_str() == Some("login") {
+            if json_value.get("code").is_some() && json_value["code"] == 0 {
+                res_data.message = "登陆成功".to_string();
+            } else {
+                res_data.message = format!("登陆失败:({},{})", json_value.get("code").as_ref().unwrap(), json_value.get("msg").as_ref().unwrap());
+            }
+            res_data.channel = "login".to_string();
+            res_data.code = "-200".to_string();
+            res_data.data = json_value;
+
+            res_data
+        } else if json_value.get("event").is_some() && json_value["event"].as_str() == Some("subscribe") {
+            res_data.code = "-201".to_string();
+            res_data.data = json_value.clone();
+            res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
+            res_data.message = "success".to_string();
+            res_data
+        } else if json_value.get("action").is_some() {
+            res_data.data = json_value["data"].clone();
+            if res_data.data == "[]" {
+                res_data.code = "".to_string();
+            } else {
+                res_data.code = "200".to_string();
+            }
+            res_data.message = "success".to_string();
+            res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
+            res_data.reach_time = json_value["ts"].as_i64().unwrap() * 1000;
+            res_data
+        } else {
+            res_data
+        }
+    }
+}

+ 7 - 7
exchanges/src/bybit_swap_rest.rs

@@ -18,7 +18,7 @@ pub struct BybitSwapRest {
     base_url: String,
     client: reqwest::Client,
     /*******参数*/
-    //登所需参数
+    //登所需参数
     login_param: BTreeMap<String, String>,
     delays: Vec<i64>,
     max_delay: i64,
@@ -326,7 +326,7 @@ impl BybitSwapRest {
         }
 
 
-        //请求头配置-如果需要登则存在额外配置
+        //请求头配置-如果需要登则存在额外配置
         let mut body = "".to_string();
         let timestamp = Self::get_timestamp();
         let mut headers = HeaderMap::new();
@@ -338,13 +338,13 @@ impl BybitSwapRest {
         }
 
 
-        //是否需要登-- 组装sing
+        //是否需要登-- 组装sing
         if is_login {
             if !is_login_param {
-                let e = ResponseData::error(self.label.clone(), "登参数错误!".to_string());
+                let e = ResponseData::error(self.label.clone(), "登参数错误!".to_string());
                 return e;
             } else {
-                //需要登陆-且登陆参数齐全
+                //需要登录-且登录参数齐全
                 trace!("param:{}", params);
                 trace!("body:{}", body);
                 //组装sing
@@ -363,7 +363,7 @@ impl BybitSwapRest {
 
         // trace!("headers:{:?}", headers);
         let start_time = chrono::Utc::now().timestamp_millis();
-        let response_data = self.http_toll(
+        let response_data = self.http_tool(
             format!("{}{}", prefix_url.clone(), request_url.clone()),
             method.to_string(),
             params.clone(),
@@ -408,7 +408,7 @@ impl BybitSwapRest {
         sign
     }
 
-    async fn http_toll(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> ResponseData {
+    async fn http_tool(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> ResponseData {
         /****请求接口与 地址*/
         let url = format!("{}{}", self.base_url.to_string(), request_path);
         let request_type = request_type.clone().to_uppercase();

+ 1 - 1
exchanges/src/bybit_swap_ws.rs

@@ -249,7 +249,7 @@ impl BybitSwapWs {
 
                 // ws网络层重连
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 label.clone(), subscribe_array, write_to_socket_rx_arc.clone(),
+                                                 false, label.clone(), subscribe_array, write_to_socket_rx_arc.clone(),
                                                  Self::message_text, Self::message_ping, Self::message_pong).await;
 
                 error!("bybit_usdt_swap socket 断连,1s以后重连……");

+ 7 - 7
exchanges/src/gate_swap_rest.rs

@@ -19,7 +19,7 @@ pub struct GateSwapRest {
     base_url: String,
     client: reqwest::Client,
     /*******参数*/
-    //登所需参数
+    //登所需参数
     login_param: BTreeMap<String, String>,
     delays: Vec<i64>,
     max_delay: i64,
@@ -457,7 +457,7 @@ impl GateSwapRest {
             is_login_param = false
         }
 
-        //请求头配置-如果需要登则存在额外配置
+        //请求头配置-如果需要登则存在额外配置
         let mut body = "".to_string();
         let timestamp = chrono::Utc::now().timestamp().to_string();
 
@@ -474,12 +474,12 @@ impl GateSwapRest {
             body = params.clone();
         }
 
-        //是否需要登-- 组装sing
+        //是否需要登-- 组装sing
         if is_login {
             if !is_login_param {
-                let e = ResponseData::error(self.label.clone(), "登参数错误!".to_string());
+                let e = ResponseData::error(self.label.clone(), "登参数错误!".to_string());
                 return e;
-            } else {//需要登陆-且登陆参数齐全
+            } else {//需要登录-且登录参数齐全
                 //组装sing
                 let sing = Self::sign(secret_key.clone(),
                                       requesst_type.clone(),
@@ -498,7 +498,7 @@ impl GateSwapRest {
         // trace!("headers:{:?}", headers);
         let base_url = format!("{}{}", prefix_url.clone(), request_url.clone());
         let start_time = chrono::Utc::now().timestamp_millis();
-        let response = self.http_toll(
+        let response = self.http_tool(
             base_url.clone(),
             requesst_type.to_string(),
             params.clone(),
@@ -553,7 +553,7 @@ impl GateSwapRest {
     }
 
 
-    async fn http_toll(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> ResponseData {
+    async fn http_tool(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> ResponseData {
         /****请求接口与 地址*/
         let url = format!("{}{}", self.base_url.to_string(), request_path);
         let request_type = request_type.clone().to_uppercase();

+ 1 - 1
exchanges/src/gate_swap_ws.rs

@@ -301,7 +301,7 @@ impl GateSwapWs {
                 info!("gate_usdt_swap socket 连接中……");
 
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 false, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
                                                  Self::message_text, Self::message_ping, Self::message_pong).await;
 
                 error!("gate_usdt_swap socket 断连,1s以后重连……");

+ 1 - 1
exchanges/src/http_tool.rs

@@ -11,7 +11,7 @@ impl RestTool {
         RestTool { base_url }
     }
 
-    // pub async fn http_toll(&self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
+    // pub async fn http_tool(&self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
     //     let res_data: ResponseData;
     //     /****请求接口与 地址*/
     //     let url = format!("{}{}", self.base_url.to_string(), request_path);

+ 8 - 8
exchanges/src/kucoin_spot_rest.rs

@@ -19,8 +19,8 @@
 //     base_url: String,
 //     client: reqwest::Client,
 //     /*******参数*/
-//     //是否需要登
-//     //登所需参数
+//     //是否需要登
+//     //登所需参数
 //     login_param: BTreeMap<String, String>,
 //     delays: Vec<i64>,
 //     max_delay: i64,
@@ -325,7 +325,7 @@
 //         }
 //
 //
-//         //请求头配置-如果需要登则存在额外配置
+//         //请求头配置-如果需要登则存在额外配置
 //         let mut body = "".to_string();
 //
 //         let timestamp = chrono::Utc::now().timestamp_millis().to_string();
@@ -336,13 +336,13 @@
 //             body = params.clone();
 //         }
 //
-//         //是否需要登-- 组装sing
+//         //是否需要登-- 组装sing
 //         if is_login {
 //             if !is_login_param {
-//                 let e = ResponseData::error(self.label.clone(), "登参数错误!".to_string());
+//                 let e = ResponseData::error(self.label.clone(), "登参数错误!".to_string());
 //                 return e;
 //             } else {
-//                 //需要登陆-且登陆参数齐全
+//                 //需要登录-且登录参数齐全
 //                 trace!("param:{}", params);
 //                 trace!("body:{}", body);
 //                 //组装sing
@@ -366,7 +366,7 @@
 //         trace!("headers:{:?}", headers);
 //         let base_url = format!("{}{}", prefix_url.clone(), request_url.clone());
 //         let start_time = chrono::Utc::now().timestamp_millis();
-//         let get_response = self.http_toll(
+//         let get_response = self.http_tool(
 //             base_url.clone(),
 //             method.to_string(),
 //             params.clone(),
@@ -441,7 +441,7 @@
 //     }
 //
 //
-//     async fn http_toll(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
+//     async fn http_tool(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
 //         let res_data: ResponseData;
 //         /****请求接口与 地址*/
 //         let url = format!("{}{}", self.base_url.to_string(), request_path);

+ 1 - 1
exchanges/src/kucoin_spot_ws.rs

@@ -55,7 +55,7 @@
 //     address_url: String,
 //     //代理信息
 //     login_param: Option<KucoinSpotLogin>,
-//     //登数据
+//     //登数据
 //     ws_param: KucoinSpotWsParam,
 //     //币对
 //     symbol_s: Vec<String>,

+ 8 - 8
exchanges/src/kucoin_swap_rest.rs

@@ -16,8 +16,8 @@ pub struct KucoinSwapRest {
     base_url: String,
     client: reqwest::Client,
     /*******参数*/
-    //是否需要登
-    //登所需参数
+    //是否需要登
+    //登所需参数
     login_param: BTreeMap<String, String>,
     delays: Vec<i64>,
     max_delay: i64,
@@ -430,7 +430,7 @@ impl KucoinSwapRest {
         }
 
 
-        //请求头配置-如果需要登则存在额外配置
+        //请求头配置-如果需要登则存在额外配置
         let mut body = "".to_string();
 
         let timestamp = chrono::Utc::now().timestamp_millis().to_string();
@@ -441,13 +441,13 @@ impl KucoinSwapRest {
             body = params.clone();
         }
 
-        //是否需要登-- 组装sing
+        //是否需要登-- 组装sing
         if is_login {
             if !is_login_param {
-                let e = ResponseData::error(self.label.clone(), "登参数错误!".to_string());
+                let e = ResponseData::error(self.label.clone(), "登参数错误!".to_string());
                 return e;
             } else {
-                //需要登陆-且登陆参数齐全
+                //需要登录-且登录参数齐全
                 trace!("param:{}", params);
                 trace!("body:{}", body);
                 //组装sing
@@ -471,7 +471,7 @@ impl KucoinSwapRest {
         // trace!("headers:{:?}", headers);
         let base_url = format!("{}{}", prefix_url.clone(), request_url.clone());
         let start_time = chrono::Utc::now().timestamp_millis();
-        let get_response = self.http_toll(
+        let get_response = self.http_tool(
             base_url.clone(),
             method.to_string(),
             params.clone(),
@@ -546,7 +546,7 @@ impl KucoinSwapRest {
     }
 
 
-    async fn http_toll(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
+    async fn http_tool(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
         let res_data: ResponseData;
         /****请求接口与 地址*/
         let url = format!("{}{}", self.base_url.to_string(), request_path);

+ 2 - 2
exchanges/src/kucoin_swap_ws.rs

@@ -62,7 +62,7 @@ pub struct KucoinSwapWs {
     address_url: String,
     //账号
     login_param: Option<KucoinSwapLogin>,
-    //登数据
+    //登数据
     ws_param: KucoinSwapWsParam,
     //币对
     symbol_s: Vec<String>,
@@ -328,7 +328,7 @@ impl KucoinSwapWs {
             loop {
                 info!("kucoin_usdt_swap socket 连接中……");
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 false, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
                                                  Self::message_text, Self::message_ping, Self::message_pong).await;
 
                 error!("kucoin_usdt_swap socket 断连,1s以后重连……");

+ 2 - 0
exchanges/src/lib.rs

@@ -17,6 +17,8 @@ pub mod binance_swap_rest;
 mod utils;
 pub mod bitget_spot_ws;
 pub mod bitget_spot_rest;
+pub mod bitget_swap_ws;
+pub mod bitget_swap_rest;
 pub mod kucoin_spot_ws;
 pub mod kucoin_spot_rest;
 pub mod crypto_spot_ws;

+ 8 - 8
exchanges/src/okx_swap_rest.rs

@@ -15,8 +15,8 @@
 //     base_url: String,
 //     client: reqwest::Client,
 //     /*******参数*/
-//     //是否需要登
-//     //登所需参数
+//     //是否需要登
+//     //登所需参数
 //     login_param: BTreeMap<String, String>,
 //     delays: Vec<i64>,
 //     max_delay: i64,
@@ -394,7 +394,7 @@
 //         }
 //
 //
-//         //请求头配置-如果需要登则存在额外配置
+//         //请求头配置-如果需要登则存在额外配置
 //         let mut body = "".to_string();
 //         let timestamp = Self::get_timestamp();
 //         let mut headers = HeaderMap::new();
@@ -404,13 +404,13 @@
 //         }
 //
 //
-//         //是否需要登-- 组装sing
+//         //是否需要登-- 组装sing
 //         if is_login {
 //             if !is_login_param {
-//                 let e = ResponseData::error(self.label.clone(), "登参数错误!".to_string());
+//                 let e = ResponseData::error(self.label.clone(), "登参数错误!".to_string());
 //                 return e;
 //             } else {
-//                 //需要登陆-且登陆参数齐全
+//                 //需要登录-且登录参数齐全
 //                 trace!("param:{}", params);
 //                 trace!("body:{}", body);
 //                 //组装sing
@@ -431,7 +431,7 @@
 //         // trace!("headers:{:?}", headers);
 //         let base_url = format!("{}{}", prefix_url.clone(), request_url.clone());
 //         let start_time = chrono::Utc::now().timestamp_millis();
-//         let get_response = self.http_toll(
+//         let get_response = self.http_tool(
 //             format!("{}{}", prefix_url.clone(), request_url.clone()),
 //             method.to_string(),
 //             params.clone(),
@@ -475,7 +475,7 @@
 //         sign
 //     }
 //
-//     async fn http_toll(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
+//     async fn http_tool(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
 //         let res_data: ResponseData;
 //         /****请求接口与 地址*/
 //         let url = format!("{}{}", self.base_url.to_string(), request_path);

+ 2 - 2
exchanges/src/okx_swap_ws.rs

@@ -254,7 +254,7 @@
 //                         });
 //
 //             trace!("---login_json:{0}", login_json.to_string());
-//             trace!("--登:{}", login_json.to_string());
+//             trace!("--登:{}", login_json.to_string());
 //             login_json_str = login_json.to_string();
 //         }
 //         login_json_str
@@ -346,7 +346,7 @@
 //             if json_value["event"].as_str() == Option::from("login") &&
 //                 json_value["code"].as_str() == Option::from("0") {
 //                 res_data.code = "-200".to_string();
-//                 res_data.message = format!("登成功!");
+//                 res_data.message = format!("登成功!");
 //             } else if json_value["event"].as_str() == Option::from("error") {
 //                 res_data.code = json_value["code"].to_string();
 //                 res_data.message = format!("订阅失败:{}", json_value["msg"].to_string());

+ 17 - 7
exchanges/src/socket_tool.rs

@@ -31,6 +31,7 @@ pub struct AbstractWsMode {}
 
 impl AbstractWsMode {
     pub async fn ws_connected<T, PI, PO, F, Future>(write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
+                                                    is_first_login: bool,
                                                     label: String,
                                                     is_shutdown_arc: Arc<AtomicBool>,
                                                     handle_function: &F,
@@ -60,11 +61,13 @@ impl AbstractWsMode {
             Ok::<(), Error>(())
         };
 
-        // 订阅消息
-        info!("订阅内容:{:?}", subscribe_array.clone());
-        for s in &subscribe_array {
-            let mut write_lock = ws_write_arc.lock().await;
-            write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
+        // 如果不需要事先登录,则直接订阅消息
+        if !is_first_login {
+            info!("订阅内容:{:?}", subscribe_array.clone());
+            for s in &subscribe_array {
+                let mut write_lock = ws_write_arc.lock().await;
+                write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
+            }
         }
 
         let ws_write_inner = Arc::clone(&ws_write_arc);
@@ -100,7 +103,12 @@ impl AbstractWsMode {
                     match code.as_str() {
                         "-200" => {
                             //登录成功
-                            trace!("登录成功:{:?}", data);
+                            info!("ws登录成功:{:?}", data);
+                            info!("订阅内容:{:?}", subscribe_array.clone());
+                            for s in &subscribe_array {
+                                let mut write_lock = ws_write_arc.lock().await;
+                                write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
+                            }
                         }
                         "-201" => {
                             //订阅成功
@@ -149,6 +157,7 @@ impl AbstractWsMode {
     pub async fn ws_connect_async<T, PI, PO, F, Future>(is_shutdown_arc: Arc<AtomicBool>,
                                                         handle_function: F,
                                                         address_url: String,
+                                                        is_first_login: bool,
                                                         label: String,
                                                         subscribe_array: Vec<String>,
                                                         write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
@@ -179,6 +188,7 @@ impl AbstractWsMode {
                 info!("socket 链接成功,{}。", address_url);
 
                 Self::ws_connected(write_to_socket_rx_arc,
+                                   is_first_login,
                                    label,
                                    is_shutdown_arc,
                                    &handle_function,
@@ -417,7 +427,7 @@ pub fn log_in_to_str() -> String {
                         });
 
         // trace!("---login_json:{0}", login_json.to_string());
-        // trace!("--登:{}", login_json.to_string());
+        // trace!("--登:{}", login_json.to_string());
         login_json_str = login_json.to_string();
     }
     login_json_str

+ 145 - 0
exchanges/tests/bitget_swap_test.rs

@@ -0,0 +1,145 @@
+use std::collections::BTreeMap;
+use serde_json::json;
+use tracing::{info};
+use exchanges::bitget_swap_rest::BitgetSwapRest;
+
+const ACCESS_KEY: &str = "";
+const SECRET_KEY: &str = "";
+const PASS_KEY: &str = "";
+
+// 测试账户信息获取
+#[tokio::test]
+async fn rest_get_account_info_test() {
+    global::log_utils::init_log_with_info();
+
+    let mut rest = get_rest();
+    let rep_data = rest.get_account_info().await;
+    info!(?rep_data)
+}
+
+// 下单测试
+#[tokio::test]
+async fn post_order_test() {
+    global::log_utils::init_log_with_info();
+
+    let mut rest = get_rest();
+    let params = json!({
+        "symbol": "BTCUSDT",
+        "productType": "USDT-FUTURES",
+        "marginMode": "crossed",
+        "marginCoin": "USDT",
+        "size": "0.001",
+        "side": "buy",
+        "tradeSide": "close",
+        "orderType": "market",
+        "clientOid": "1130615113245"
+    });
+    let response = rest.swap_order(params).await;
+
+    info!(?response)
+}
+
+// 撤单测试
+#[tokio::test]
+async fn cancel_order_test() {
+    global::log_utils::init_log_with_info();
+
+    let mut rest = get_rest();
+    let params = json!({
+        "symbol": "ethusdt",
+        "productType": "USDT-FUTURES",
+        "clientOid": "1130615111",
+    });
+    let response = rest.cancel_order(params).await;
+
+    info!(?response)
+}
+
+// 获取订单详情测试
+#[tokio::test]
+async fn get_order_test() {
+    global::log_utils::init_log_with_info();
+
+    let mut rest = get_rest();
+    let params = json!({
+        "symbol": "ETHUSDT",
+        "productType": "USDT-FUTURES",
+        "clientOid": "1130615132",
+    });
+    let response = rest.get_order(params).await;
+
+    info!(?response)
+}
+
+// 获取当前的pending订单
+#[tokio::test]
+async fn get_pending_orders_test() {
+    global::log_utils::init_log_with_info();
+
+    let mut rest = get_rest();
+    let response = rest.get_pending_orders().await;
+
+    info!(?response)
+}
+
+// 设置杠杆测试
+#[tokio::test]
+async fn set_leverage_test() {
+    global::log_utils::init_log_with_info();
+
+    let mut rest = get_rest();
+
+
+    let params = json!({
+        "symbol": "ETHUSDT",
+        "productType": "USDT-FUTURES",
+        "marginCoin": "USDT",
+        "leverage": "5"
+    });
+    let response = rest.set_leverage(params).await;
+
+    info!(?response)
+}
+
+// 设置持仓模式
+#[tokio::test]
+async fn set_position_mode_test() {
+    global::log_utils::init_log_with_info();
+
+    let mut rest = get_rest();
+
+
+    let params = json!({
+        "productType": "USDT-FUTURES",
+        "posMode": "hedge_mode",
+    });
+    let response = rest.set_position_mode(params).await;
+
+    info!(?response)
+}
+
+// 获取仓位信息
+#[tokio::test]
+async fn get_single_position_test() {
+    global::log_utils::init_log_with_info();
+
+    let mut rest = get_rest();
+    let params = json!({
+        "productType": "USDT-FUTURES",
+        "symbol": "ETHUSDT",
+        "marginCoin": "USDT"
+    });
+    let response = rest.get_single_position(params).await;
+
+    info!(?response)
+}
+
+fn get_rest() -> BitgetSwapRest {
+    let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
+
+    btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string());
+    btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string());
+    btree_map.insert("pass_key".to_string(), PASS_KEY.to_string());
+
+    BitgetSwapRest::new(false, btree_map)
+}

+ 2 - 2
global/src/log_utils.rs

@@ -101,14 +101,14 @@ pub fn final_init(level: &str, port: u32, account_name: String) -> WorkerGuard {
 
     let fmt_layer = fmt::layer()
         .with_timer(local_time.clone())
-        .with_target(false)
+        .with_target(true)
         .with_level(true)
         .with_writer(io::stdout)
         .with_span_events(fmt::format::FmtSpan::FULL);
 
     let file_layer = fmt::layer()
         .with_timer(local_time.clone())
-        .with_target(false)
+        .with_target(true)
         .with_ansi(false)
         .with_level(true)
         .with_writer(non_blocking.clone())

+ 671 - 0
standard/src/bitget_swap.rs

@@ -0,0 +1,671 @@
+use std::collections::{BTreeMap};
+use exchanges::bitget_swap_rest::BitgetSwapRest;
+use std::io::{Error, ErrorKind};
+use tokio::sync::mpsc::Sender;
+use std::str::FromStr;
+use async_trait::async_trait;
+use futures::stream::FuturesUnordered;
+use futures::TryStreamExt;
+use rust_decimal::{Decimal, MathematicalOps};
+use rust_decimal::prelude::ToPrimitive;
+use rust_decimal_macros::dec;
+use serde_json::{json, Value};
+use tokio::spawn;
+use tokio::time::Instant;
+use tracing::{error, info};
+use global::trace_stack::TraceStack;
+use crate::exchange::ExchangeEnum;
+use crate::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, Ticker, utils};
+
+#[allow(dead_code)]
+#[derive(Clone)]
+pub struct BitgetSwap {
+    exchange: ExchangeEnum,
+    symbol: String,
+    is_colo: bool,
+    params: BTreeMap<String, String>,
+    request: BitgetSwapRest,
+    market: Market,
+    order_sender: Sender<Order>,
+    error_sender: Sender<Error>,
+}
+
+impl BitgetSwap {
+    pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> BitgetSwap {
+        let market = Market::new();
+        let mut bitget_swap = BitgetSwap {
+            exchange: ExchangeEnum::BitgetSwap,
+            symbol: symbol.to_uppercase(),
+            is_colo,
+            params: params.clone(),
+            request: BitgetSwapRest::new(is_colo, params.clone()),
+            market,
+            order_sender,
+            error_sender,
+        };
+        bitget_swap.market = BitgetSwap::get_market(&mut bitget_swap).await.unwrap();
+        // 修改持仓模式
+        let mode_result = bitget_swap.set_dual_mode("", true).await;
+        match mode_result {
+            Ok(ok) => {
+                info!("BitgetSwap:设置持仓模式成功!{:?}", ok);
+            }
+            Err(error) => {
+                error!("BitgetSwap:设置持仓模式失败!{:?}", error)
+            }
+        }
+        // 设置持仓杠杆
+        // let lever_rate_result = bitget_swap.set_dual_leverage("10").await;
+        // match lever_rate_result {
+        //     Ok(ok) => {
+        //         info!("BitgetSwap:设置持仓杠杆成功!{:?}", ok);
+        //     }
+        //     Err(error) => {
+        //         error!("BitgetSwap:设置持仓杠杆失败!{:?}", error)
+        //     }
+        // }
+
+        return bitget_swap;
+    }
+}
+
+#[async_trait]
+impl Platform for BitgetSwap {
+    fn clone_box(&self) -> Box<dyn Platform + Send + Sync> { Box::new(self.clone()) }
+
+    fn get_self_exchange(&self) -> ExchangeEnum { ExchangeEnum::BitgetSwap }
+
+    fn get_self_symbol(&self) -> String { self.symbol.clone() }
+
+    fn get_self_is_colo(&self) -> bool { self.is_colo }
+
+    fn get_self_params(&self) -> BTreeMap<String, String> { self.params.clone() }
+
+    fn get_self_market(&self) -> Market { self.market.clone() }
+
+    fn get_request_delays(&self) -> Vec<i64> {
+        // self.request.get_delays()
+        vec![]
+    }
+
+    fn get_request_avg_delay(&self) -> Decimal {
+        // self.request.get_avg_delay()
+        Decimal::ZERO
+    }
+
+    fn get_request_max_delay(&self) -> i64 { 0 }
+
+    async fn get_server_time(&mut self) -> Result<String, Error> {
+        let res_data = self.request.get_server_time().await;
+        if res_data.code == "200" {
+            let res_data_json = res_data.data;
+            let result = res_data_json["serverTime"].as_str().unwrap().to_string();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn get_account(&mut self) -> Result<Account, Error> {
+        let response = self.request.get_account_info().await;
+
+        if response.code == "200" {
+            for data in response.data.as_array().unwrap() {
+                if data["marginCoin"].as_str().unwrap() != "USDT" {
+                    continue
+                }
+
+                // 格式化account信息
+                let mut account = Account {
+                    coin: data["marginCoin"].to_string(),
+                    balance: Decimal::from_str(data["accountEquity"].as_str().unwrap()).unwrap(),
+                    available_balance: Decimal::from_str(data["available"].as_str().unwrap()).unwrap(),
+                    frozen_balance: Default::default(),
+                    stocks: Default::default(),
+                    available_stocks: Default::default(),
+                    frozen_stocks: Default::default(),
+                };
+                account.frozen_balance = account.balance - account.available_balance;
+
+                return Ok(account)
+            }
+
+            Err(Error::new(ErrorKind::NotFound, format!("bitget_usdt_swap 未能找到USDT账户:{}。", response.to_string())))
+        } else {
+            Err(Error::new(ErrorKind::Other, response.to_string()))
+        }
+    }
+
+    async fn get_spot_account(&mut self) -> Result<Vec<Account>, Error> {
+        Err(Error::new(ErrorKind::NotFound, "bitget_swap get_spot_account:该交易所方法未实现".to_string()))
+    }
+
+    async fn get_position(&mut self) -> Result<Vec<Position>, Error> { Err(Error::new(ErrorKind::NotFound, "bitget_swap get_position:该交易所方法未实现".to_string())) }
+
+    async fn get_positions(&mut self) -> Result<Vec<Position>, Error> {
+        let params = json!({
+            "productType": "USDT-FUTURES",
+            "marginCoin": "USDT"
+        });
+        let response = self.request.get_all_position(params).await;
+        info!(?response);
+
+        if response.code != "200" {
+            return Err(Error::new(ErrorKind::NotFound, format!("bitget_swap 获取仓位异常{:?}", response).to_string()))
+        }
+
+        // 正常处理持仓信息
+        let mut positions: Vec<Position> = vec![];
+        if response.data.is_null() {
+            return Ok(positions)
+        }
+
+        let positions_json = response.data.as_array().unwrap();
+        for position_json in positions_json {
+            let symbol = position_json["symbol"].as_str().unwrap().to_string();
+            let margin_level = Decimal::from_str(position_json["leverage"].as_str().unwrap()).unwrap();
+            let amount = Decimal::from_str(position_json["total"].as_str().unwrap()).unwrap();
+            let frozen_amount = Decimal::from_str(position_json["locked"].as_str().unwrap()).unwrap();
+            let price = Decimal::from_str(position_json["openPriceAvg"].as_str().unwrap()).unwrap();
+            let profit = Decimal::from_str(position_json["unrealizedPL"].as_str().unwrap()).unwrap();
+            let position_mode = match position_json["posMode"].as_str().unwrap() {
+                "hedge_mode" => {
+                    match position_json["holdSide"].as_str().unwrap() {
+                        "short" => {
+                            PositionModeEnum::Short
+                        }
+                        "long" => {
+                            PositionModeEnum::Long
+                        },
+                        _ => {
+                            panic!("bitget_usdt_swap: 未知的持仓模式与持仓方向: {}, {}",
+                                   position_json["posMode"].as_str().unwrap(), position_json["holdSide"].as_str().unwrap())
+                        }
+                    }
+                },
+                "one_way_mode" => {
+                    PositionModeEnum::Both
+                },
+                _ => {
+                    panic!("bitget_usdt_swap: 未知的持仓模式: {}", position_json["posMode"].as_str().unwrap())
+                }
+            };
+            let margin = Decimal::from_str(position_json["marginSize"].as_str().unwrap()).unwrap();
+
+            positions.push(Position {
+                symbol,
+                margin_level,
+                amount,
+                frozen_amount,
+                price,
+                profit,
+                position_mode,
+                margin,
+            });
+        }
+
+        Ok(positions)
+    }
+
+    async fn get_ticker(&mut self) -> Result<Ticker, Error> {
+        return self.get_ticker_symbol(self.symbol.clone()).await
+    }
+
+    async fn get_ticker_symbol(&mut self, symbol: String) -> Result<Ticker, Error> {
+        let symbol_format = utils::format_symbol(symbol.clone(), "");
+        let res_data = self.request.get_tickers(symbol_format).await;
+        if res_data.code == "200" {
+            let res_data_json = res_data.data;
+            let ticker_info = res_data_json[0].clone();
+            let time = (Decimal::from_str(&*ticker_info["ts"].as_str().unwrap()).unwrap() / dec!(1000)).floor().to_i64().unwrap();
+            let result = Ticker {
+                time,
+                high: Decimal::from_str(ticker_info["high24h"].as_str().unwrap()).unwrap(),
+                low: Decimal::from_str(ticker_info["low24h"].as_str().unwrap()).unwrap(),
+                sell: Decimal::from_str(ticker_info["askPr"].as_str().unwrap()).unwrap(),
+                buy: Decimal::from_str(ticker_info["bidPr"].as_str().unwrap()).unwrap(),
+                last: Decimal::from_str(ticker_info["lastPr"].as_str().unwrap()).unwrap(),
+                volume: Decimal::from_str(ticker_info["quoteVolume"].as_str().unwrap()).unwrap(),
+            };
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn get_market(&mut self) -> Result<Market, Error> {
+        self.get_market_symbol(self.symbol.clone()).await
+    }
+
+    async fn get_market_symbol(&mut self, symbol: String) -> Result<Market, Error> {
+        let symbol_format = utils::format_symbol(symbol.clone(), "");
+        let response = self.request.get_contracts(symbol_format.clone()).await;
+
+        if response.code == "200" {
+            let res_data_json = response.data.as_array().unwrap();
+            let market_info = res_data_json[0].clone();
+
+            info!(?market_info);
+            if !market_info["symbol"].as_str().unwrap().to_string().eq(&symbol_format) {
+                return Err(Error::new(ErrorKind::NotFound, format!("符号未找到:symbol={}, response={:?}", symbol_format, response))).unwrap();
+            }
+
+            let base_asset = market_info["baseCoin"].as_str().unwrap().to_string();
+            let quote_asset = market_info["quoteCoin"].as_str().unwrap().to_string();
+            let price_precision = Decimal::from_str(market_info["pricePlace"].as_str().unwrap()).unwrap();
+            let tick_size = Decimal::TEN.powd(Decimal::NEGATIVE_ONE * price_precision);
+            let amount_precision = Decimal::from_str(market_info["volumePlace"].as_str().unwrap()).unwrap();
+            let amount_size = Decimal::TEN.powd(Decimal::NEGATIVE_ONE * amount_precision);
+            let min_qty = Decimal::NEGATIVE_ONE;
+            let max_qty = Decimal::NEGATIVE_ONE;
+            // let ct_val = Decimal::from_str(&market_info["sizeMultiplier"].as_str().unwrap()).unwrap();
+            let ct_val = Decimal::ONE;
+
+            let result = Market {
+                symbol: format!("{}_{}", base_asset, quote_asset),
+                base_asset,
+                quote_asset,
+                tick_size,
+                amount_size,
+                price_precision,
+                amount_precision,
+                min_qty,
+                max_qty,
+                min_notional: min_qty,
+                max_notional: max_qty,
+                ct_val,
+            };
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, response.to_string()))
+        }
+    }
+
+    async fn get_order_detail(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
+        let symbol_format = utils::format_symbol(self.symbol.clone(), "");
+        let params = json!({
+            "symbol": symbol_format,
+            "productType": "USDT-FUTURES",
+            "clientOid": custom_id,
+            "orderId": order_id
+        });
+
+        let ct_val = self.market.ct_val;
+        let response = self.request.get_order(params).await;
+        if response.code == "200" {
+            let res_data_json = response.data;
+            let result = format_order_item(res_data_json, ct_val);
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, response.to_string()))
+        }
+    }
+
+    async fn get_orders_list(&mut self, _status: &str) -> Result<Vec<Order>, Error> {
+        Err(Error::new(ErrorKind::NotFound, "bitget_swap get_orders_list:该交易所方法未实现".to_string()))
+        // let symbol_format = utils::format_symbol(self.symbol.clone(), "");
+        // let ct_val = self.market.ct_val;
+        // let res_data = self.request.get_unfilled_orders(symbol_format.to_string(), "".to_string(), "".to_string(), "".to_string(), "100".to_string(), "".to_string()).await;
+        // if res_data.code == "200" {
+        //     let res_data_str = &res_data.data;
+        //     let res_data_json: Vec<serde_json::Value> = serde_json::from_str(res_data_str).unwrap();
+        //     let result = res_data_json.iter().map(|item| format_order_item(item.clone(), ct_val)).collect();
+        //     Ok(result)
+        // } else {
+        //     Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        // }
+    }
+
+    async fn take_order(&mut self, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
+        let ct_val = self.market.ct_val;
+
+        return self.take_order_symbol(self.symbol.clone(), ct_val, custom_id, origin_side, price, amount).await;
+    }
+
+    async fn take_order_symbol(&mut self, symbol: String, ct_val: Decimal, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
+        let symbol_format = utils::format_symbol(symbol, "");
+        let final_size = amount / ct_val;
+        let mut params = json!({
+            "symbol": symbol_format,
+            "clientOid": custom_id,
+            "productType": "USDT-FUTURES",
+            "marginMode": "crossed",
+            "marginCoin": "USDT",
+            "size": final_size.to_string()
+        });
+        if price.eq(&Decimal::ZERO) {
+            params["orderType"] = json!("market");
+            params["force"] = json!("gtc");
+        } else {
+            params["price"] = json!(price.to_string());
+            params["orderType"] = json!("limit");
+            params["force"] = json!("gtc");
+        };
+        match origin_side {
+            "kd" => {
+                params["side"] = json!("buy");
+                params["tradeSide"] = json!("open");
+            }
+            "pd" => {
+                params["side"] = json!("buy");
+                params["tradeSide"] = json!("close");
+            }
+            "kk" => {
+                params["side"] = json!("sell");
+                params["tradeSide"] = json!("open");
+            }
+            "pk" => {
+                params["side"] = json!("sell");
+                params["tradeSide"] = json!("close");
+            }
+            _ => { panic!("bitget_usdt_swap 下单参数错误"); }
+        };
+        let res_data = self.request.swap_order(params).await;
+        if res_data.code != "200" {
+            return Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+
+        let res_data_json = res_data.data;
+        let result = Order {
+            id: res_data_json["orderId"].as_str().unwrap().to_string(),
+            custom_id: res_data_json["clientOid"].as_str().unwrap().to_string(),
+            price: Decimal::ZERO,
+            amount: Decimal::ZERO,
+            deal_amount: Decimal::ZERO,
+            avg_price: Decimal::ZERO,
+            status: "NEW".to_string(),
+            order_type: "".to_string(),
+            trace_stack: TraceStack::new(0, Instant::now()).on_special("328 bitget_swap".to_string()),
+        };
+        return Ok(result)
+    }
+
+    async fn cancel_order(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
+        let symbol_format = utils::format_symbol(self.symbol.clone(), "");
+        let params = json!({
+            "symbol": symbol_format,
+            "productType": "USDT-FUTURES",
+            "clientOid": custom_id,
+            "orderId": order_id
+        });
+        let response = self.request.cancel_order(params).await;
+
+        // 取消失败,进行报错
+        if response.code != "200" {
+            return Err(Error::new(ErrorKind::NotFound, response.to_string()));
+        }
+
+        let res_data_json = response.data;
+        let result = Order {
+            id: res_data_json["orderId"].as_str().unwrap().to_string(),
+            custom_id: res_data_json["clientOid"].as_str().unwrap().to_string(),
+            price: Decimal::ZERO,
+            amount: Decimal::ZERO,
+            deal_amount: Decimal::ZERO,
+            avg_price: Decimal::ZERO,
+            status: "REMOVE".to_string(),
+            order_type: "".to_string(),
+            trace_stack: TraceStack::new(0, Instant::now()).on_special("443 bitget_swap".to_string()),
+        };
+        Ok(result)
+    }
+
+    async fn cancel_orders(&mut self) -> Result<Vec<Order>, Error> {
+        Err(Error::new(ErrorKind::NotFound, "bitget_swap cancel_orders:该交易所方法未实现".to_string()))
+    }
+
+    async fn cancel_orders_all(&mut self) -> Result<Vec<Order>, Error> {
+        let response = self.request.get_pending_orders().await;
+        info!(?response);
+        if response.code == "200" {
+            let mut result = vec![];
+            let symbol_format = utils::format_symbol(self.symbol.clone(), "");
+
+            if !response.data["entrustedList"].is_null() {
+                let orders_res_data_json = response.data["entrustedList"].as_array().unwrap();
+                for order in orders_res_data_json {
+                    let order_id = order["orderId"].as_str().unwrap().to_string();
+                    let symbol = symbol_format.clone();
+                    let params = json!({
+                        "symbol": symbol,
+                        "productType": "USDT-FUTURES",
+                        "orderId": order_id,
+                    });
+                    let cancel_res_data = self.request.cancel_order(params).await;
+                    if cancel_res_data.code == "200" {
+                        let cancel_res_data_json = cancel_res_data.data;
+                        result.push(Order {
+                            id: cancel_res_data_json["orderId"].as_str().unwrap().to_string(),
+                            custom_id: cancel_res_data_json["clientOid"].as_str().unwrap().to_string(),
+                            price: Decimal::ZERO,
+                            amount: Decimal::ZERO,
+                            deal_amount: Decimal::ZERO,
+                            avg_price: Decimal::ZERO,
+                            status: "REMOVE".to_string(),
+                            order_type: "".to_string(),
+                            trace_stack: TraceStack::new(0, Instant::now()).on_special("457 bitget_swap".to_string()),
+                        });
+                    } else {
+                        return Err(Error::new(ErrorKind::Other, cancel_res_data.to_string()));
+                    }
+                }
+            }
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, response.to_string()))
+        }
+    }
+
+    async fn take_stop_loss_order(&mut self, _stop_price: Decimal, _price: Decimal, _side: &str) -> Result<Value, Error> {
+        Err(Error::new(ErrorKind::NotFound, "bitget_swap take_stop_loss_order:该交易所方法未实现".to_string()))
+    }
+
+    async fn cancel_stop_loss_order(&mut self, _order_id: &str) -> Result<Value, Error> {
+        Err(Error::new(ErrorKind::NotFound, "bitget_swap cancel_stop_loss_order:该交易所方法未实现".to_string()))
+    }
+
+    async fn set_dual_mode(&mut self, _coin: &str, is_dual_mode: bool) -> Result<String, Error> {
+        let pos_mode = if is_dual_mode {
+            "hedge_mode"
+        } else {
+            "one_way_mode"
+        };
+        let params = json!({
+            "productType": "USDT-FUTURES",
+            "posMode": pos_mode,
+        });
+        let response = self.request.set_position_mode(params).await;
+
+        if response.code != "200" {
+            return Err(Error::new(ErrorKind::Other, format!("设置持仓模式失败:{:?}", response).to_string()))
+        }
+
+        return Ok(response.data.to_string());
+    }
+
+    async fn set_dual_leverage(&mut self, leverage: &str) -> Result<String, Error> {
+        let params = json!({
+            "symbol": "ETHUSDT",
+            "productType": "USDT-FUTURES",
+            "marginCoin": "USDT",
+            "leverage": leverage
+        });
+        let response = self.request.set_leverage(params).await;
+
+        if response.code != "200" {
+            return Err(Error::new(ErrorKind::Other, format!("设置杠杆失败:{:?}", response).to_string()))
+        }
+
+        return Ok(response.data.to_string());
+    }
+
+    async fn set_auto_deposit_status(&mut self, _status: bool) -> Result<String, Error> {
+        Err(Error::new(ErrorKind::NotFound, "bitget_swap set_auto_deposit_status:该交易所方法未实现".to_string()))
+    }
+
+    async fn wallet_transfers(&mut self, _coin: &str, _from: &str, _to: &str, _amount: Decimal) -> Result<String, Error> {
+        Err(Error::new(ErrorKind::NotFound, "bitget_swap wallet_transfers:该交易所方法未实现".to_string()))
+        // let coin_format = coin.to_string().to_uppercase();
+        // let res_data = self.request.wallet_transfer(from.to_string(), to.to_string(), amount.to_string(), coin_format.clone(), "".to_string(), "".to_string()).await;
+        // if res_data.code == "200" {
+        //     let res_data_str = &res_data.data;
+        //     let result = res_data_str.clone();
+        //     Ok(result)
+        // } else {
+        //     Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        // }
+    }
+
+    async fn command_order(&mut self, order_command: &mut OrderCommand, trace_stack: &TraceStack) {
+        let mut handles = vec![];
+
+        // 下单指令
+        for item in order_command.limits_open.keys() {
+            let mut ts = trace_stack.clone();
+
+            let amount = Decimal::from_str(&*order_command.limits_open[item].get(0).unwrap().clone()).unwrap();
+            let side = order_command.limits_open[item].get(1).unwrap().clone();
+            let price = Decimal::from_str(&*order_command.limits_open[item].get(2).unwrap().clone()).unwrap();
+            let cid = order_command.limits_open[item].get(3).unwrap().clone();
+
+            //  order_name: [数量,方向,价格,c_id]
+            let mut self_clone = self.clone();
+            let handle = spawn(async move {
+                // TraceStack::show_delay(&ts.ins);
+                ts.on_before_send();
+                let result = self_clone.take_order(cid.as_str(), side.as_str(), price, amount).await;
+                ts.on_after_send();
+
+                match result {
+                    Ok(mut result) => {
+                        result.trace_stack = ts;
+
+                        self_clone.order_sender.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        info!(?error);
+                        let mut err_order = Order::new();
+                        err_order.custom_id = cid.clone();
+                        err_order.status = "REMOVE".to_string();
+
+                        self_clone.order_sender.send(err_order).await.unwrap();
+                        self_clone.error_sender.send(error).await.unwrap();
+                    }
+                }
+            });
+            handles.push(handle)
+        }
+        let futures = FuturesUnordered::from_iter(handles);
+        // 等待所有任务完成
+        let _: Result<Vec<_>, _> = futures.try_collect().await;
+
+        // 撤销订单
+        let mut cancel_handlers = vec![];
+        for item in order_command.cancel.keys() {
+            let order_id = order_command.cancel[item].get(1).unwrap().clone();
+            let custom_id = order_command.cancel[item].get(0).unwrap().clone();
+
+            let mut self_clone = self.clone();
+            let handle = spawn(async move {
+                let result = self_clone.cancel_order(&order_id, &custom_id).await;
+                match result {
+                    Ok(_) => {
+                        // result_sd.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        // 取消失败去查订单。
+                        let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
+                        match query_rst {
+                            Ok(order) => {
+                                self_clone.order_sender.send(order).await.unwrap();
+                            }
+                            Err(err) => {
+                                error!("撤单失败,而且查单也失败了,bitget_swap,oid={}, cid={}, err={:?}。", order_id.clone(), custom_id.clone(), err);
+                            }
+                        }
+                        self_clone.error_sender.send(error).await.unwrap();
+                    }
+                }
+            });
+            cancel_handlers.push(handle)
+        }
+        let futures = FuturesUnordered::from_iter(cancel_handlers);
+        // 等待所有任务完成
+        let _: Result<Vec<_>, _> = futures.try_collect().await;
+
+        // 检查订单指令
+        let mut check_handlers = vec![];
+        for item in order_command.check.keys() {
+            let order_id = order_command.check[item].get(1).unwrap().clone();
+            let custom_id = order_command.check[item].get(0).unwrap().clone();
+
+            let mut self_clone = self.clone();
+            let handle = spawn(async move {
+                let result = self_clone.get_order_detail(order_id.as_str(), custom_id.as_str()).await;
+                match result {
+                    Ok(result) => {
+                        self_clone.order_sender.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        self_clone.error_sender.send(error).await.unwrap();
+                    }
+                }
+            });
+            check_handlers.push(handle)
+        }
+
+        let futures = FuturesUnordered::from_iter(check_handlers);
+        // 等待所有任务完成
+        let _: Result<Vec<_>, _> = futures.try_collect().await;
+    }
+}
+
+// pub fn format_account_info(balance_data: Value) -> Account {
+//     let balance_coin = balance_data["coin"].as_str().unwrap().to_string().to_uppercase();
+//     let available_balance = Decimal::from_str(balance_data["available"].as_str().unwrap()).unwrap();
+//     let frozen_balance = Decimal::from_str(balance_data["frozen"].as_str().unwrap()).unwrap();
+//     let balance = available_balance + frozen_balance;
+//
+//     Account {
+//         coin: balance_coin,
+//         balance,
+//         available_balance,
+//         frozen_balance,
+//         stocks: Decimal::ZERO,
+//         available_stocks: Decimal::ZERO,
+//         frozen_stocks: Decimal::ZERO,
+//     }
+// }
+
+pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
+    let price = Decimal::from_str(order["price"].as_str().unwrap_or(order["priceAvg"].as_str().unwrap())).unwrap();
+    let size = Decimal::from_str(order["size"].as_str().unwrap()).unwrap();
+    let status = order["state"].as_str().unwrap();
+    let base_volume = Decimal::from_str(order["quoteVolume"].as_str().unwrap()).unwrap();
+    let avg_price = if order["priceAvg"].is_null() || order["priceAvg"].as_str().unwrap().is_empty() {
+        Decimal::ZERO
+    } else {
+        Decimal::from_str(order["priceAvg"].as_str().unwrap().to_string().as_str()).unwrap()
+    };
+
+    let amount = size * ct_val;
+    let deal_amount = base_volume * ct_val;
+    let custom_status = if ["filled", "cancelled"].contains(&status) {
+        "REMOVE".to_string()
+    } else if ["init", "live", "new", "partially_filled"].contains(&status) {
+        "NEW".to_string()
+    } else {
+        "NULL".to_string()
+    };
+    Order {
+        id: order["orderId"].as_str().unwrap().to_string(),
+        custom_id: order["clientOid"].as_str().unwrap().to_string(),
+        price,
+        amount,
+        deal_amount,
+        avg_price,
+        status: custom_status,
+        order_type: order["orderType"].as_str().unwrap().to_string(),
+        trace_stack: TraceStack::new(0, Instant::now()).on_special("700 bitget_swap".to_string()),
+    }
+}

+ 181 - 0
standard/src/bitget_swap_handle.rs

@@ -0,0 +1,181 @@
+use std::str::FromStr;
+use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
+use serde_json::Value;
+use tokio::time::Instant;
+use exchanges::response_base::ResponseData;
+use global::trace_stack::TraceStack;
+use crate::{Account, MarketOrder, Order, Position, PositionModeEnum, SpecialOrder};
+
+// 处理账号信息
+pub fn handle_account_info(response: &ResponseData, _symbol: &String) -> Account {
+    let mut rst = Account::new();
+
+    for data in response.data.as_array().unwrap() {
+        if data["marginCoin"].as_str().unwrap() != "USDT" {
+            continue
+        }
+
+        // 格式化account信息
+        let mut account = Account {
+            coin: data["marginCoin"].to_string(),
+            balance: Decimal::from_str(data["usdtEquity"].as_str().unwrap()).unwrap(),
+            available_balance: Decimal::from_str(data["available"].as_str().unwrap()).unwrap(),
+            frozen_balance: Decimal::from_str(data["frozen"].as_str().unwrap()).unwrap(),
+            stocks: Default::default(),
+            available_stocks: Default::default(),
+            frozen_stocks: Default::default(),
+        };
+        account.frozen_balance = account.balance - account.available_balance;
+
+        rst = account
+    }
+
+    return rst;
+}
+
+// 处理order信息
+pub fn handle_order(res_data: ResponseData, ct_val: Decimal) -> SpecialOrder {
+    let res_data_json = res_data.data.as_array().unwrap();
+    let mut order_info = Vec::new();
+    for item in res_data_json.iter() {
+        order_info.push(format_order_item(item.clone(), ct_val));
+    }
+    SpecialOrder {
+        name: res_data.label,
+        order: order_info,
+    }
+}
+
+// 处理订单信息
+pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
+    let price = Decimal::from_str(order["price"].as_str().unwrap().to_string().as_str()).unwrap();
+    let size = Decimal::from_str(order["size"].as_str().unwrap().to_string().as_str()).unwrap();
+    let binding = order["status"].clone().as_str().unwrap().to_string();
+    let status = binding.as_str();
+    let acc_base_volume = Decimal::from_str(order["accBaseVolume"].as_str().unwrap().to_string().as_str()).unwrap();
+    let avg_price = if order["priceAvg"].is_null() || order["priceAvg"].as_str().unwrap().is_empty() {
+        Decimal::ZERO
+    } else {
+        Decimal::from_str(order["priceAvg"].as_str().unwrap().to_string().as_str()).unwrap()
+    };
+    let c_id = if order["clientOid"].is_null() {
+        ""
+    } else {
+        order["clientOid"].as_str().unwrap()
+    };
+
+    let amount = size * ct_val;
+    let deal_amount = acc_base_volume * ct_val;
+    let custom_status = if ["filled", "canceled"].contains(&status) {
+        "REMOVE".to_string()
+    } else if ["init", "live", "new", "partially_filled"].contains(&status) {
+        "NEW".to_string()
+    } else {
+        "NULL".to_string()
+    };
+    Order {
+        id: order["orderId"].as_str().unwrap().to_string(),
+        custom_id: c_id.to_string(),
+        price,
+        amount,
+        deal_amount,
+        avg_price,
+        status: custom_status,
+        order_type: order["orderType"].as_str().unwrap().to_string(),
+        trace_stack: TraceStack::new(0, Instant::now()).on_special("86 bitget_swap_handle".to_string()),
+    }
+}
+
+// 格式化深度信息
+pub fn format_depth_items(value: Value) -> Vec<MarketOrder> {
+    let mut depth_items: Vec<MarketOrder> = vec![];
+    for value in value.as_array().unwrap() {
+        depth_items.push(MarketOrder {
+            price: Decimal::from_str(value[0].as_str().unwrap()).unwrap(),
+            amount: Decimal::from_str(value[1].as_str().unwrap()).unwrap(),
+        })
+    }
+    return depth_items;
+}
+
+// 处理position信息
+pub fn handle_position(res_data: &ResponseData, ct_val: &Decimal) -> Vec<Position> {
+    let res_data_json = res_data.data.as_array().unwrap();
+    res_data_json.iter().map(|item| { format_position_item(item, ct_val) }).collect()
+}
+
+pub fn format_position_item(position_json: &Value, ct_val: &Decimal) -> Position {
+    let symbol = position_json["instId"].as_str().unwrap().to_string();
+    let margin_level = Decimal::from_i64(position_json["leverage"].as_i64().unwrap()).unwrap();
+    let amount = Decimal::from_str(position_json["total"].as_str().unwrap()).unwrap() * ct_val;
+    let frozen_amount = Decimal::from_str(position_json["frozen"].as_str().unwrap()).unwrap() * ct_val;
+    let price = Decimal::from_str(position_json["openPriceAvg"].as_str().unwrap()).unwrap();
+    let profit = Decimal::from_str(position_json["unrealizedPL"].as_str().unwrap()).unwrap();
+    let position_mode = match position_json["posMode"].as_str().unwrap() {
+        "hedge_mode" => {
+            match position_json["holdSide"].as_str().unwrap() {
+                "short" => {
+                    PositionModeEnum::Short
+                }
+                "long" => {
+                    PositionModeEnum::Long
+                },
+                _ => {
+                    panic!("bitget_usdt_swap: 未知的持仓模式与持仓方向: {}, {}",
+                           position_json["posMode"].as_str().unwrap(), position_json["holdSide"].as_str().unwrap())
+                }
+            }
+        },
+        "one_way_mode" => {
+            PositionModeEnum::Both
+        },
+        _ => {
+            panic!("bitget_usdt_swap: 未知的持仓模式: {}", position_json["posMode"].as_str().unwrap())
+        }
+    };
+    let margin = Decimal::from_str(position_json["marginSize"].as_str().unwrap()).unwrap();
+
+    Position {
+        symbol,
+        margin_level,
+        amount,
+        frozen_amount,
+        price,
+        profit,
+        position_mode,
+        margin,
+    }
+}
+
+// 处理特殊深度数据
+// pub fn handle_special_depth(res_data: ResponseData) -> SpecialDepth {
+//     HandleSwapInfo::handle_special_depth(ExchangeEnum::BitgetSwap, res_data)
+// }
+
+// // 处理特殊Ticker信息
+// pub fn handle_special_ticker(res_data: ResponseData) -> SpecialDepth {
+//     let res_data_str = res_data.data;
+//     let res_data_json: Vec<serde_json::Value> = serde_json::from_str(&*res_data_str).unwrap();
+//     format_special_ticker(res_data_json[0].clone(), res_data.label)
+// }
+//
+// pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialDepth {
+//     let bp = Decimal::from_str(data["bidPr"].as_str().unwrap()).unwrap();
+//     let bq = Decimal::from_str(data["bidSz"].as_str().unwrap()).unwrap();
+//     let ap = Decimal::from_str(data["askPr"].as_str().unwrap()).unwrap();
+//     let aq = Decimal::from_str(data["askSz"].as_str().unwrap()).unwrap();
+//     let mp = (bp + ap) * dec!(0.5);
+//     let t = Decimal::from_str(data["ts"].as_str().unwrap()).unwrap();
+//     let create_at = data["ts"].as_str().unwrap().parse::<i64>().unwrap() * 1000;
+//
+//     let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at };
+//     let depth_info = vec![bp, bq, ap, aq];
+//     SpecialDepth {
+//         name: label,
+//         depth: depth_info,
+//         ticker: ticker_info,
+//         t,
+//         create_at,
+//     }
+// }

+ 8 - 0
standard/src/exchange.rs

@@ -9,6 +9,7 @@ use crate::gate_swap::GateSwap;
 use crate::kucoin_swap::KucoinSwap;
 // use crate::bitget_spot::BitgetSpot;
 use crate::bybit_swap::BybitSwap;
+use crate::bitget_swap::BitgetSwap;
 // use crate::kucoin_spot::KucoinSpot;
 // use crate::okx_swap::OkxSwap;
 
@@ -28,6 +29,7 @@ pub enum ExchangeEnum {
     // KucoinSpot,
     // OkxSwap,
     // BitgetSpot,
+    BitgetSwap,
     BybitSwap
 }
 
@@ -94,9 +96,15 @@ impl Exchange {
             // ExchangeEnum::BitgetSpot => {
             //     Box::new(BitgetSpot::new(symbol, is_colo, params, order_sender, error_sender).await)
             // }
+            ExchangeEnum::BitgetSwap => {
+                Box::new(BitgetSwap::new(symbol, is_colo, params, order_sender, error_sender).await)
+            }
             ExchangeEnum::BybitSwap => {
                 Box::new(BybitSwap::new(symbol, is_colo, params, order_sender, error_sender).await)
             }
         }
     }
 }
+
+
+

+ 3 - 3
standard/src/gate_swap.rs

@@ -10,7 +10,7 @@ use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
 use serde_json::{json, Value};
 use tokio::spawn;
 use tokio::time::Instant;
-use tracing::{error, trace};
+use tracing::{error, info};
 use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, PositionModeEnum};
 use exchanges::gate_swap_rest::GateSwapRest;
 use global::trace_stack::TraceStack;
@@ -46,8 +46,8 @@ impl GateSwap {
         let symbol_array: Vec<&str> = symbol.split("_").collect();
         let mode_result = gate_swap.set_dual_mode(symbol_array[1], true).await;
         match mode_result {
-            Ok(_) => {
-                trace!("Gate:设置持仓模式成功!")
+            Ok(ok) => {
+                info!("Gate:设置持仓模式成功!{:?}", ok);
             }
             Err(error) => {
                 error!("Gate:设置持仓模式失败!mode_result={}", error)

+ 31 - 8
standard/src/handle_info.rs

@@ -3,11 +3,11 @@ use std::str::FromStr;
 use rust_decimal::{Decimal};
 use rust_decimal::prelude::FromPrimitive;
 use rust_decimal_macros::dec;
-use tracing::{error};
+use tracing::{error, info};
 use exchanges::response_base::ResponseData;
 use global::public_params;
 use crate::exchange::ExchangeEnum;
-use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, kucoin_handle};
+use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, kucoin_handle};
 use crate::{Account, MarketOrder, Position, SpecialDepth, SpecialOrder, SpecialTicker};
 
 #[allow(dead_code)]
@@ -32,9 +32,9 @@ impl HandleSwapInfo {
             ExchangeEnum::GateSwap => {
                 gate_swap_handle::handle_account_info(res_data, symbol)
             }
-            ExchangeEnum::KucoinSwap => {
-                kucoin_handle::handle_account_info(res_data, symbol)
-            }
+            // ExchangeEnum::KucoinSwap => {
+            //     kucoin_handle::handle_account_info(res_data, symbol)
+            // }
             // ExchangeEnum::KucoinSpot => {
             //     kucoin_spot_handle::handle_account_info(res_data, symbol)
             // }
@@ -44,6 +44,9 @@ impl HandleSwapInfo {
             // ExchangeEnum::BitgetSpot => {
             //     bitget_spot_handle::handle_account_info(res_data, symbol)
             // },
+            ExchangeEnum::BitgetSwap => {
+                bitget_swap_handle::handle_account_info(res_data, symbol)
+            },
             ExchangeEnum::BybitSwap => {
                 bybit_swap_handle::handle_account_info(res_data, symbol)
             }
@@ -53,7 +56,7 @@ impl HandleSwapInfo {
             }
         }
     }
-    // 处理特殊Ticket信息
+    // 处理Ticker信息
     pub fn handle_book_ticker(exchange: ExchangeEnum, res_data: &ResponseData) -> SpecialDepth {
         match exchange {
             // ExchangeEnum::BinanceSpot => {
@@ -71,12 +74,20 @@ impl HandleSwapInfo {
             // ExchangeEnum::KucoinSpot => {
             //     kucoin_spot_handle::handle_special_ticker(res_data)
             // }
+            // ExchangeEnum::KucoinSpot => {
+            //     kucoin_spot_handle::handle_special_ticker(res_data)
+            // }
             // ExchangeEnum::OkxSwap => {
             //     okx_handle::handle_special_ticker(res_data)
             // }
             // ExchangeEnum::BitgetSpot => {
             //     bitget_spot_handle::handle_special_ticker(res_data)
             // },
+            ExchangeEnum::BitgetSwap => {
+                info!(?res_data);
+                panic!("BitgetSwap 85 未实现格式化");
+                // bitget_swap_handle::handle_special_ticker(res_data)
+            },
             ExchangeEnum::BybitSwap => {
                 bybit_swap_handle::handle_ticker(res_data)
             }
@@ -106,6 +117,9 @@ impl HandleSwapInfo {
             //     error!("暂未提供此交易所方法!handle_position:{:?}", exchange);
             //     panic!("暂未提供此交易所方法!handle_position:{:?}", exchange);
             // },
+            ExchangeEnum::BitgetSwap => {
+                bitget_swap_handle::handle_position(res_data, ct_val)
+            },
             ExchangeEnum::BybitSwap => {
                 bybit_swap_handle::handle_position(res_data, ct_val)
             }
@@ -133,6 +147,9 @@ impl HandleSwapInfo {
             // ExchangeEnum::BitgetSpot => {
             //     bitget_spot_handle::handle_order(res_data, ct_val)
             // },
+            ExchangeEnum::BitgetSwap => {
+                bitget_swap_handle::handle_order(res_data, ct_val)
+            },
             ExchangeEnum::BybitSwap => {
                 bybit_swap_handle::handle_order(res_data, ct_val)
             }
@@ -150,7 +167,7 @@ impl HandleSwapInfo {
 }
 
 
-pub fn make_special_depth(label: String, depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>, t: Decimal, create_at: i64) -> SpecialDepth{
+pub fn make_special_depth(label: String, depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>, t: Decimal, create_at: i64) -> SpecialDepth {
     depth_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap_or(Ordering::Equal));
     depth_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap_or(Ordering::Equal));
     // TODO 不排序的话,有4us可以省下来。
@@ -264,6 +281,12 @@ pub fn format_depth(exchange: ExchangeEnum, res_data: &ResponseData) -> DepthPar
         //     t = Decimal::from_str(res_data_json[0]["ts"].as_str().unwrap()).unwrap();
         //     create_at = res_data_json[0]["ts"].as_str().unwrap().parse::<i64>().unwrap() * 1000;
         // }
+        ExchangeEnum::BitgetSwap => {
+            depth_asks = bitget_swap_handle::format_depth_items(res_data.data[0]["asks"].clone());
+            depth_bids = bitget_swap_handle::format_depth_items(res_data.data[0]["bids"].clone());
+            t = Decimal::from_str(res_data.data[0]["ts"].as_str().unwrap()).unwrap();
+            create_at = res_data.data[0]["ts"].as_str().unwrap().parse::<i64>().unwrap() * 1000;
+        }
         ExchangeEnum::BybitSwap => {
             depth_asks = bybit_swap_handle::format_depth_items(res_data.data["a"].clone());
             depth_bids = bybit_swap_handle::format_depth_items(res_data.data["b"].clone());
@@ -272,7 +295,7 @@ pub fn format_depth(exchange: ExchangeEnum, res_data: &ResponseData) -> DepthPar
         }
     }
 
-    DepthParam{
+    DepthParam {
         depth_asks,
         depth_bids,
         t,

+ 5 - 3
standard/src/lib.rs

@@ -34,6 +34,8 @@ mod kucoin_spot;
 pub mod kucoin_spot_handle;
 mod bybit_swap;
 mod bybit_swap_handle;
+mod bitget_swap;
+mod bitget_swap_handle;
 
 /// 持仓模式枚举
 /// - `Both`:单持仓方向
@@ -384,9 +386,9 @@ impl Market {
 
 /// Position结构体(仓位信息)
 /// - `symbol(String)`: 币对
-/// - `margin_level(String)`: 持仓杆杠大小
-/// - `amount(String)`: 持仓量
-/// - `frozen_amount(String)`: 仓位冻结量
+/// - `margin_level(Decimal)`: 持仓杆杠大小
+/// - `amount(Decimal)`: 持仓量
+/// - `frozen_amount(Decimal)`: 仓位冻结量
 /// - `price(Decimal)`: 持仓均价
 /// - `profit(Decimal)`: 持仓浮动盈亏
 /// - `position_mode(PositionModeEnum)`: 持仓模式

+ 187 - 0
strategy/src/bitget_usdt_swap.rs

@@ -0,0 +1,187 @@
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use rust_decimal::Decimal;
+use tokio::spawn;
+use tokio::sync::Mutex;
+use tracing::{error, info};
+use exchanges::bitget_swap_ws::{BitgetSwapLogin, BitgetSwapSubscribeType, BitgetSwapWs, BitgetSwapWsType};
+use exchanges::response_base::ResponseData;
+use global::trace_stack::TraceStack;
+use standard::exchange::ExchangeEnum::{BitgetSwap};
+use standard::{Position, PositionModeEnum};
+use crate::core::Core;
+use crate::exchange_disguise::on_special_depth;
+use crate::model::OrderInfo;
+
+pub async fn bitget_usdt_swap_run(is_shutdown_arc :Arc<AtomicBool>,
+                                  is_trade: bool,
+                                  core_arc: Arc<Mutex<Core>>,
+                                  name: String,
+                                  symbols: Vec<String>,
+                                  is_colo: bool,
+                                  exchange_params: BTreeMap<String, String>) {
+    // 开启公共频道
+    let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
+
+    // 开启公共连接
+    let is_shutdown_arc_c1 = is_shutdown_arc.clone();
+    let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
+    let name_clone = name.clone();
+    let core_arc_clone = core_arc.clone();
+    let symbols_clone = symbols.clone();
+    spawn(async move {
+        // 构建链接ws
+        let mut bg_public = BitgetSwapWs::new_label(name_clone.clone(),
+                                                    is_colo,
+                                                    None,
+                                                    BitgetSwapWsType::Public);
+
+        // 消费数据的函数
+        let mut update_flag_u = Decimal::ZERO;
+        let fun = move |data: ResponseData| {
+            let core_arc_cc = core_arc_clone.clone();
+
+            async move {
+                on_public_data(core_arc_cc, &mut update_flag_u, data).await
+            }
+        };
+
+        // 准备链接
+        bg_public.set_subscribe(vec![BitgetSwapSubscribeType::PuBooks1]); // 只用订阅深度数据
+        bg_public.set_symbols(symbols_clone);
+        bg_public.ws_connect_async(is_shutdown_arc_c1, fun, &write_tx_am_public, write_rx_public).await.expect("bitget_usdt_swap 链接有异常")
+    });
+
+    // 不需要交易就不用开启私有频道了
+    if !is_trade {
+        return;
+    }
+
+    // 开启私有频道
+    let (write_tx_private, write_rx_private) = futures_channel::mpsc::unbounded();
+
+    // 开启公共连接
+    let is_shutdown_arc_c1 = is_shutdown_arc.clone();
+    let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
+    spawn(async move {
+        // 构建链接ws
+        let mut bg_private = BitgetSwapWs::new_label(name.clone(),
+                                                     is_colo,
+                                                     Some(parse_btree_map_to_bitget_swap_login(exchange_params)),
+                                                     BitgetSwapWsType::Private);
+
+        // 消费数据的函数
+        let core_arc_clone = core_arc.clone();
+        let run_symbol = symbols[0].clone();
+        let ct_val = core_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
+        let fun = move |data: ResponseData| {
+            let core_arc_cc = core_arc_clone.clone();
+            let run_symbol_c = run_symbol.clone();
+
+            async move {
+                on_private_data(core_arc_cc, ct_val, data, &run_symbol_c).await
+            }
+        };
+
+        // 准备链接
+        bg_private.set_subscribe(vec![
+            BitgetSwapSubscribeType::PrOrders,
+            BitgetSwapSubscribeType::PrAccount,
+            BitgetSwapSubscribeType::PrPosition
+        ]);
+        bg_private.set_symbols(symbols.clone());
+        bg_private.ws_connect_async(is_shutdown_arc_c1, fun, &write_tx_am_private, write_rx_private).await.expect("bitget_usdt_swap 链接有异常")
+    });
+}
+
+async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>,
+                         ct_val: Decimal,
+                         response: ResponseData,
+                         run_symbol: &String) {
+    let mut trace_stack = TraceStack::new(response.time, response.ins);
+    trace_stack.on_after_span_line();
+
+    // public类型,目前只考虑订单流数据
+    match response.channel.as_str() {
+        "account" => {
+            let account = standard::handle_info::HandleSwapInfo::handle_account_info(BitgetSwap, &response, run_symbol);
+            let mut core = core_arc_clone.lock().await;
+            core.update_equity(account).await;
+        },
+        "positions" => {
+            let mut positions = standard::handle_info::HandleSwapInfo::handle_position(BitgetSwap, &response, &ct_val);
+
+            // bitget如果没有仓位不会给0,会给个空数组
+            if positions.is_empty() {
+                positions.push(Position {
+                    symbol: run_symbol.replace("_", "").to_uppercase(),
+                    margin_level: Default::default(),
+                    amount: Default::default(),
+                    frozen_amount: Default::default(),
+                    price: Default::default(),
+                    profit: Default::default(),
+                    position_mode: PositionModeEnum::Both,
+                    margin: Default::default(),
+                });
+            }
+
+            let mut core = core_arc_clone.lock().await;
+            core.update_position(positions).await;
+        },
+        "orders" => {
+            trace_stack.set_source("gate_swap.orders".to_string());
+            let orders = standard::handle_info::HandleSwapInfo::handle_order(BitgetSwap, response.clone(), ct_val.clone());
+
+            let mut order_infos:Vec<OrderInfo> = Vec::new();
+            for mut order in orders.order {
+                if order.status == "NULL" {
+                    error!("bitget_usdt_swap 未识别的订单状态:{:?}", response);
+
+                    continue;
+                }
+
+                let order_info = OrderInfo::parse_order_to_order_info(&mut order);
+                order_infos.push(order_info);
+            }
+
+            {
+                let mut core = core_arc_clone.lock().await;
+                core.update_order(order_infos, trace_stack).await;
+            }
+        },
+        _ => {
+            info!("bitget_usdt_swap 113 未知的订阅数据: {:?}", response);
+        }
+    }
+}
+
+async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>,
+                        update_flag_u: &mut Decimal,
+                        response: ResponseData) {
+    let mut trace_stack = TraceStack::new(response.time, response.ins);
+    trace_stack.on_after_span_line();
+
+    // public类型,目前只考虑订单流数据
+    match response.channel.as_str() {
+        "books1" => {
+            trace_stack.set_source("bitget_usdt_swap.books1".to_string());
+            let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BitgetSwap, &response);
+            trace_stack.on_after_format();
+
+            on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
+        }
+        _ => {
+            info!("bitget_usdt_swap 125 未知的订阅数据");
+            info!(?response)
+        }
+    }
+}
+
+fn parse_btree_map_to_bitget_swap_login(exchange_params: BTreeMap<String, String>) -> BitgetSwapLogin {
+    BitgetSwapLogin {
+        api_key: exchange_params.get("access_key").unwrap().clone(),
+        secret_key: exchange_params.get("secret_key").unwrap().clone(),
+        passphrase_key: exchange_params.get("pass_key").unwrap().clone(),
+    }
+}

+ 10 - 6
strategy/src/core.rs

@@ -22,7 +22,7 @@ use global::public_params::{ASK_PRICE_INDEX, BID_PRICE_INDEX, LENGTH};
 use global::trace_stack::TraceStack;
 use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker};
 use standard::exchange::{Exchange};
-use standard::exchange::ExchangeEnum::{BinanceSwap, BybitSwap, GateSwap, KucoinSwap};
+use standard::exchange::ExchangeEnum::{BinanceSwap, BitgetSwap, BybitSwap, GateSwap, KucoinSwap};
 
 use crate::model::{LocalPosition, OrderInfo, TokenParam};
 use crate::predictor::Predictor;
@@ -221,6 +221,9 @@ impl Core {
                 // "bitget_spot" => {
                 //     Exchange::new(BitgetSpot, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
                 // }
+                "bitget_usdt_swap" => {
+                    Exchange::new(BitgetSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
                 // "okex_usdt_swap" => {
                 //     Exchange::new(OkxSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
                 // }
@@ -863,6 +866,7 @@ impl Core {
     // #[instrument(skip(self), level="TRACE")]
     pub async fn get_exchange_info(&mut self) {
         self.market = self.platform_rest.get_self_market();
+        info!(?self.market);
     }
 
     // #[instrument(skip(self, data), level="TRACE")]
@@ -881,13 +885,13 @@ impl Core {
     // #[instrument(skip(self), level="TRACE")]
     pub async fn update_equity_rest_swap(&mut self) {
         match self.platform_rest.get_account().await {
-            Ok(val) => {
+            Ok(account) => {
                 /*
                    更新保证金信息
                    合约一直更新
                    现货只有当出现异常时更新
                */
-                self.local_cash = val.balance * self.used_pct
+                self.local_cash = account.balance * self.used_pct
             }
             Err(e) => {
                 info!("获取账户信息错误: {:?}", e);
@@ -1148,7 +1152,6 @@ impl Core {
 
     // #[instrument(skip(self, target_hold_coin), level="TRACE")]
     pub async fn check_position(&mut self, target_hold_coin: Decimal) -> bool {
-
         info!("------------------------------------------------------------------------------------------------------------");
         info!("步骤一:检查挂单:");
         let mut is_order_clear = false;
@@ -1296,7 +1299,7 @@ impl Core {
         let mut length = 0;
         match self.platform_rest.get_positions().await {
             Ok(val) => {
-                info!("检查仓位信息({}条仓位信息,部分交易所会返回0持仓的):", length);
+                info!("检查仓位信息");
 
                 for position in val {
                     if position.amount.eq(&Decimal::ZERO) {
@@ -1445,7 +1448,7 @@ impl Core {
 
     // #[instrument(skip(self), level="TRACE")]
     pub async fn exit(&mut self) {
-        info!("-------------------------启动退出流程----------------------------");
+        info!("-------------------------启动退出流程({})----------------------------", self.exit_msg);
         info!("");
 
         self.clear_position_and_orders(Decimal::ZERO).await;
@@ -1461,6 +1464,7 @@ impl Core {
         self.get_exchange_info().await;
         // 获取价格信息
         let ticker = self.platform_rest.get_ticker().await.expect("获取价格信息异常!");
+        info!(?ticker);
         let mp = (ticker.buy + ticker.sell) / Decimal::TWO;
         // 获取账户信息
         if self.exchange.contains("spot") {

+ 7 - 0
strategy/src/exchange_disguise.rs

@@ -7,6 +7,7 @@ use global::trace_stack::TraceStack;
 use standard::SpecialDepth;
 
 use crate::binance_usdt_swap::reference_binance_swap_run;
+use crate::bitget_usdt_swap::bitget_usdt_swap_run;
 use crate::gate_swap::gate_swap_run;
 // use crate::binance_spot::reference_binance_spot_run;
 // use crate::bitget_spot::bitget_spot_run;
@@ -37,6 +38,9 @@ pub async fn run_transactional_exchange(is_shutdown_arc :Arc<AtomicBool>,
         // "bitget_spot" => {
         //     bitget_spot_run(is_shutdown_arc,true, core_arc, name, symbols, is_colo, exchange_params).await;
         // },
+        "bitget_usdt_swap" => {
+            bitget_usdt_swap_run(is_shutdown_arc, true, core_arc, name, symbols, is_colo, exchange_params).await;
+        }
         "bybit_usdt_swap" => {
             bybit_swap_run(is_shutdown_arc,true, core_arc, name, symbols, is_colo, exchange_params).await;
         }
@@ -77,6 +81,9 @@ pub async fn run_reference_exchange(is_shutdown_arc: Arc<AtomicBool>,
         // "bitget_spot" => {
         //     bitget_spot_run(is_shutdown_arc, false, core_arc, name, symbols, is_colo, exchange_params).await;
         // },
+        "bitget_usdt_swap" => {
+            bitget_usdt_swap_run(is_shutdown_arc, false, core_arc, name, symbols, is_colo, exchange_params).await;
+        }
         "bybit_usdt_swap" => {
             bybit_swap_run(is_shutdown_arc, false, core_arc, name, symbols, is_colo, exchange_params).await;
         },

+ 2 - 1
strategy/src/lib.rs

@@ -11,4 +11,5 @@ mod kucoin_swap;
 mod kucoin_spot;
 mod bitget_spot;
 mod okx_usdt_swap;
-mod bybit_usdt_swap;
+mod bybit_usdt_swap;
+mod bitget_usdt_swap;

+ 4 - 0
strategy/src/utils.rs

@@ -65,6 +65,8 @@ pub fn get_limit_requests_num_per_second(exchange: String) -> i64 {
         return public_params::OKEX_USDT_SWAP_LIMIT * public_params::RATIO;
     } else if exchange.eq("bitget_spot") {
         return public_params::BITGET_USDT_SPOT_LIMIT * public_params::RATIO;
+    } else if exchange.eq("bitget_usdt_swap") {
+        return public_params::BITGET_USDT_SWAP_LIMIT * public_params::RATIO;
     } else if exchange.eq("bybit_usdt_swap"){
         return public_params::BYBIT_USDT_SWAP_LIMIT * public_params::RATIO;
     } else {
@@ -95,6 +97,8 @@ pub fn get_limit_order_requests_num_per_second(exchange: String) -> i64 {
         return public_params::OKEX_USDT_SWAP_LIMIT
     } else if exchange.eq("bitget_spot") {
         return public_params::BITGET_USDT_SPOT_LIMIT
+    }  else if exchange.eq("bitget_usdt_swap") {
+        return public_params::BITGET_USDT_SWAP_LIMIT
     } else if exchange.eq("bybit_usdt_swap") {
         return public_params::BYBIT_USDT_SWAP_LIMIT
     } else {