Przeglądaj źródła

Merge branch 'dev' into fresh

# Conflicts:
#	strategy/Cargo.toml
#	strategy/src/lib.rs
#	strategy/src/quant.rs
hl 1 rok temu
rodzic
commit
005715791d

+ 2 - 2
derive/tests/gate_swap_export_test.rs

@@ -5,7 +5,7 @@ use derive::export_excel::ExportEnum;
 use crate::export_excel_test::test_new_export;
 
 
-const SYMBOL: &str = "LOOM_USDT";
+const SYMBOL: &str = "TIA_USDT";
 
 // 测试获取Exchange实体
 #[tokio::test]
@@ -14,6 +14,6 @@ async fn test_get_self_exchange() {
     global::log_utils::init_log_with_trace();
 
     let mut export = test_new_export(ExportEnum::GateSwap).await;
-    let export_trades = export.export_trades("gate_swap", SYMBOL.to_string(), 0, 0, 100).await;
+    let export_trades = export.export_trades("gate_swap_42", SYMBOL.to_string(), 0, 0, 1000).await;
     trace!(?export_trades);
 }

+ 2 - 1
exchanges/Cargo.toml

@@ -42,4 +42,5 @@ tracing = "0.1"
 tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
 log = "0.4.20"
 
-##
+##生成 xlsx
+rust_xlsxwriter = "0.58.0"

+ 462 - 0
exchanges/src/bybit_swap_rest.rs

@@ -0,0 +1,462 @@
+use std::collections::BTreeMap;
+
+use reqwest::Client;
+use reqwest::header::HeaderMap;
+use ring::hmac;
+use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
+use rust_decimal_macros::dec;
+use tracing::{info, trace};
+
+use crate::http_tool::RestTool;
+use crate::response_base::ResponseData;
+
+#[derive(Clone, Debug)]
+pub struct BybitSwapRest {
+    pub label: String,
+    base_url: String,
+    client: reqwest::Client,
+    /*******参数*/
+    //登陆所需参数
+    login_param: BTreeMap<String, String>,
+    delays: Vec<i64>,
+    max_delay: i64,
+    avg_delay: Decimal,
+}
+
+impl BybitSwapRest {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+
+    pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> BybitSwapRest
+    {
+        return BybitSwapRest::new_label("default-BybitSwapRest".to_string(), is_colo, login_param);
+    }
+    pub fn new_label(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> BybitSwapRest {
+        let base_url = if is_colo {
+            "https://api.bytick.com".to_string()
+        } else {
+            "https://api.bytick.com".to_string()
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道",base_url);
+        } else {
+            info!("走普通通道:{}",base_url);
+        }
+        /*****返回结构体*******/
+        BybitSwapRest {
+            label,
+            base_url,
+            client: Client::new(),
+            login_param,
+            delays: vec![],
+            max_delay: 0,
+            avg_delay: dec!(0.0),
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************rest请求函数********************************************************/
+    /*******************************************************************************************************/
+    //服務器時間
+    pub async fn get_server_time(&mut self) -> ResponseData {
+        let  params = serde_json::json!({
+         });
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/market/time".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //查詢公告
+    pub async fn get_announcements(&mut self) -> ResponseData {
+        let  params = serde_json::json!({
+            "locale":"zh-TW"
+         });
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/announcements/index".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //查詢可交易產品的規格信息
+    pub async fn get_instruments_info(&mut self, symbol: String) -> ResponseData {
+        let  params = serde_json::json!({
+            "category":"linear",
+            "symbol":symbol
+         });
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/market/instruments-info".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+    //查看持仓信息
+    pub async fn get_positions(&mut self, symbol: String) -> ResponseData {
+        let mut params = serde_json::json!({
+            "category":"linear",
+         });
+        if symbol.len() > 0 {
+            params.as_object_mut().unwrap().insert("symbol".parse().unwrap(), serde_json::Value::from(symbol));
+        }
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/position/list".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //设置持仓模式
+    pub async fn set_position_mode(&mut self, symbol: String, mode: i64) -> ResponseData {
+        let params = serde_json::json!({
+             "category": "linear",
+             "symbol": symbol,
+             "mode": mode,
+         });
+        let data = self.request("POST".to_string(),
+                                "/v5".to_string(),
+                                "/position/switch-mode".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+    //設置槓桿
+    pub async fn set_leverage(&mut self, symbol: String,
+                              lever: String) -> ResponseData {
+        let params = serde_json::json!({
+             "category": "linear",
+             "symbol": symbol,
+             "buyLeverage": lever,
+             "sellLeverage": lever,
+         });
+        let data = self.request("POST".to_string(),
+                                "/v5".to_string(),
+                                "/position/set-leverage".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+
+    //查詢錢包餘額
+    pub async fn get_account_balance(&mut self) -> ResponseData {
+        let  params = serde_json::json!({
+            "accountType":"UNIFIED"
+         });
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/account/wallet-balance".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+    //創建委託單
+    pub async fn swap_order(&mut self, params: serde_json::Value) -> ResponseData {
+        let data = self.request("POST".to_string(),
+                                "/v5".to_string(),
+                                "/order/create".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //查詢實時委託單
+    pub async fn get_order(&mut self, symbol: String, order_id: String, order_link_id: String) -> ResponseData {
+        let mut params = serde_json::json!({
+            "category":"linear",
+            "symbol":symbol,
+         });
+        if order_id.len() > 0 {
+            params.as_object_mut().unwrap().insert("orderId".parse().unwrap(), serde_json::Value::from(order_id));
+        }
+        if order_link_id.len() > 0 {
+            params.as_object_mut().unwrap().insert("orderLinkId".parse().unwrap(), serde_json::Value::from(order_link_id));
+        }
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/order/realtime".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+    //撤单
+    pub async fn cancel_order(&mut self, symbol: String, order_id: String, order_link_id: String) -> ResponseData {
+        let mut params = serde_json::json!({
+             "category": "linear",
+             "symbol": symbol,
+         });
+        if order_id.len() > 0 {
+            params.as_object_mut().unwrap().insert("orderId".parse().unwrap(), serde_json::Value::from(order_id));
+        }
+        if order_link_id.len() > 0 {
+            params.as_object_mut().unwrap().insert("orderLinkId".parse().unwrap(), serde_json::Value::from(order_link_id));
+        }
+        let data = self.request("POST".to_string(),
+                                "/v5".to_string(),
+                                "/order/cancel".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    pub fn get_delays(&self) -> Vec<i64> {
+        self.delays.clone()
+    }
+    pub fn get_avg_delay(&self) -> Decimal {
+        self.avg_delay.clone()
+    }
+    pub fn get_max_delay(&self) -> i64 {
+        self.max_delay.clone()
+    }
+    fn get_delay_info(&mut self) {
+        let last_100 = if self.delays.len() > 100 {
+            self.delays[self.delays.len() - 100..].to_vec()
+        } else {
+            self.delays.clone()
+        };
+
+        let max_value = last_100.iter().max().unwrap();
+        if max_value.clone().to_owned() > self.max_delay {
+            self.max_delay = max_value.clone().to_owned();
+        }
+
+        let sum: i64 = last_100.iter().sum();
+        let sum_v = Decimal::from_i64(sum).unwrap();
+        let len_v = Decimal::from_u64(last_100.len() as u64).unwrap();
+        self.avg_delay = (sum_v / len_v).round_dp(1);
+        self.delays = last_100.clone().into_iter().collect();
+    }
+    //调用请求
+    pub async fn request(&mut self,
+                         method: String,
+                         prefix_url: String,
+                         request_url: String,
+                         is_login: bool,
+                         params: String) -> ResponseData
+    {
+        trace!("login_param:{:?}", self.login_param);
+        //解析账号信息
+        let mut access_key = "".to_string();
+        let mut secret_key = "".to_string();
+        // let mut passphrase = "".to_string();
+        if self.login_param.contains_key("access_key") {
+            access_key = self.login_param.get("access_key").unwrap().to_string();
+        }
+        if self.login_param.contains_key("secret_key") {
+            secret_key = self.login_param.get("secret_key").unwrap().to_string();
+        }
+        // if self.login_param.contains_key("pass_key") {
+        //     passphrase = self.login_param.get("pass_key").unwrap().to_string();
+        // }
+        let mut is_login_param = true;
+        if access_key == "" || secret_key == "" {
+            is_login_param = false
+        }
+
+
+        //请求头配置-如果需要登陆则存在额外配置
+        let mut body = "".to_string();
+        let timestamp = Self::get_timestamp();
+        let mut headers = HeaderMap::new();
+        headers.insert("Content-Type", "application/json; charset=utf-8".parse().unwrap());
+        headers.insert("X-BAPI-RECV-WINDOW", "5000".parse().unwrap());
+
+        if method == "POST" {
+            body = params.clone();
+        }
+
+
+        //是否需要登陆-- 组装sing
+        if is_login {
+            if !is_login_param {
+                let e = ResponseData::error(self.label.clone(), "登陆参数错误!".to_string());
+                return e;
+            } else {
+                //需要登陆-且登陆参数齐全
+                trace!("param:{}", params);
+                trace!("body:{}", body);
+                //组装sing
+                let sing = Self::sign(
+                    access_key.clone(),
+                    secret_key.clone(),
+                    method.clone(),
+                    params.clone(),
+                    timestamp.clone(),
+                );
+                //组装header
+                headers.extend(Self::headers(sing, timestamp, access_key));
+            }
+        }
+
+
+        // trace!("headers:{:?}", headers);
+        let base_url = format!("{}{}", prefix_url.clone(), request_url.clone());
+        let start_time = chrono::Utc::now().timestamp_millis();
+        let get_response = self.http_toll(
+            format!("{}{}", prefix_url.clone(), request_url.clone()),
+            method.to_string(),
+            params.clone(),
+            headers,
+        ).await;
+        let time_array = chrono::Utc::now().timestamp_millis() - start_time;
+        self.delays.push(time_array);
+        self.get_delay_info();
+        let res_data = Self::res_data_analysis(get_response, base_url, params);
+        res_data
+    }
+
+    pub fn headers(sign: String, timestamp: String, access_key: String) -> HeaderMap {
+        let mut headers = HeaderMap::new();
+        headers.insert("X-BAPI-SIGN-TYPE", "2".parse().unwrap());
+        headers.insert("X-BAPI-API-KEY", access_key.parse().unwrap());
+        headers.insert("X-BAPI-TIMESTAMP", timestamp.parse().unwrap());
+        headers.insert("X-BAPI-SIGN", sign.parse().unwrap());
+        // headers.insert("X-Referer", passphrase.parse().unwrap());
+        headers
+    }
+    pub fn sign(access_key: String,
+                secret_key: String,
+                method: String,
+                params: String,   timestamp: String) -> String
+    {
+        /*签名生成*/
+        let url_param_str = RestTool::parse_params_to_str(params.clone());
+        let parameters = if method == "GET" {
+            url_param_str
+        } else {
+            params
+        };
+
+        let message = format!("{}{}5000{}", timestamp, access_key, parameters);
+        trace!("message:{}",message);
+
+        // 做签名
+        let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+        let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
+        let sign = hex::encode(result.as_ref());
+        sign
+    }
+
+    async fn http_toll(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
+        let res_data: ResponseData;
+        /****请求接口与 地址*/
+        let url = format!("{}{}", self.base_url.to_string(), request_path);
+        let request_type = request_type.clone().to_uppercase();
+        let addrs_url: String = if RestTool::parse_params_to_str(params.clone()) == "" {
+            url.clone()
+        } else {
+            format!("{}?{}", url.clone(), RestTool::parse_params_to_str(params.clone()))
+        };
+        trace!("url:{}", url);
+        trace!("addrs_url:{}", addrs_url);
+        let params_json: serde_json::Value = serde_json::from_str(&params).unwrap();
+        trace!("params_json:{}",params_json);
+        trace!("headers:{:?}",headers);
+
+
+        let req = match request_type.as_str() {
+            "GET" => self.client.get(addrs_url.clone()).headers(headers),
+            "POST" => self.client.post(url.clone()).body(params).headers(headers),
+            "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
+            // "PUT" => self.client.put(url.clone()).json(&params),
+            _ => return Ok(ResponseData::error(self.label.clone(), format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
+        };
+
+        let response = req.send().await?;
+        if response.status().is_success() {
+            // 读取响应的内容
+            let body = response.text().await?;
+            // trace!("ok-----{}", body);
+            res_data = ResponseData::new(self.label.clone(), "200".to_string(), "success".to_string(), body);
+        } else {
+            let body = response.text().await?;
+            // trace!("error-----{}", body);
+            res_data = ResponseData::error(self.label.clone(), body.to_string())
+        }
+
+        Ok(res_data)
+    }
+
+
+    //res_data 解析
+    pub fn res_data_analysis(result: Result<ResponseData, reqwest::Error>, base_url: String, params: String) -> ResponseData {
+        // trace!("原始数据:{:?}",result);
+        match result {
+            Ok(res_data) => {
+                if res_data.code != "200" {
+                    // trace!("不等于200");
+                    let message: String = res_data.message;
+                    let json_value: serde_json::Value = serde_json::from_str(&message).unwrap();
+                    let code = json_value["code"].as_str().unwrap();
+                    let msg = json_value["msg"].as_str().unwrap();
+                    let error = ResponseData::new("".to_string(),
+                                                  format!("{}", code),
+                                                  format!("{}", msg),
+                                                  format!("请求地址:{},请求参数:{}", base_url, params));
+                    error
+                } else {
+                    let body: String = res_data.data.as_str().parse().unwrap();
+                    let json_value: serde_json::Value = serde_json::from_str(&body).unwrap();
+                    // trace!("json_value:{:?}",json_value.to_string().as_str());
+                    let code: i64 = if json_value["retCode"].as_i64().is_some() {
+                        json_value["retCode"].as_i64().unwrap()
+                    } else if json_value["ret_code"].as_i64().is_some() {
+                        json_value["ret_code"].as_i64().unwrap()
+                    } else {
+                        -1
+                    };
+
+                    let msg: &str = if json_value["retMsg"].as_str().is_some() {
+                        json_value["retMsg"].as_str().unwrap()
+                    } else if json_value["ret_msg"].as_str().is_some() {
+                        json_value["ret_msg"].as_str().unwrap()
+                    } else {
+                        ""
+                    };
+
+
+                    if code == 0 {
+                        let data = serde_json::to_string(&json_value["result"]).unwrap();
+                        let mut success = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), data.parse().unwrap());
+                        success.time = json_value["time"].as_i64().unwrap();
+                        success
+                    } else {
+                        let error = ResponseData::new("".to_string(),
+                                                      format!("{}", code),
+                                                      format!("{}", msg),
+                                                      format!("请求地址:{},请求参数:{}", base_url, params));
+                        error
+                    }
+                }
+            }
+            Err(err) => {
+                let error = ResponseData::error("".to_string(), format!("json 解析失败:{}", err));
+                error
+            }
+        }
+    }
+    fn get_timestamp() -> String {
+        chrono::Utc::now().timestamp_millis()
+            .to_string()
+    }
+}

+ 76 - 58
exchanges/src/crypto_spot_ws.rs

@@ -20,7 +20,10 @@ pub enum CryptoSpotWsType {
 //订阅频道
 #[derive(Clone)]
 pub enum CryptoSpotSubscribeType {
-    PuGetInstruments,
+    PuBook,
+    PuTicker,
+    PuTrade,
+    PuCandlestick,
 }
 
 //账号信息
@@ -60,7 +63,7 @@ impl CryptoSpotWs {
         let address_url = match ws_type {
             CryptoSpotWsType::Public => {
                 "wss://stream.crypto.com/exchange/v1/market".to_string()
-            },
+            }
             CryptoSpotWsType::Private => {
                 "wss://stream.crypto.com/exchange/v1/user".to_string()
             }
@@ -92,7 +95,7 @@ impl CryptoSpotWs {
     //手动添加币对
     pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
         for symbol in b_array.iter_mut() {
-            // 
+            // 
             *symbol = symbol.to_uppercase();
             // 字符串替换
             *symbol = symbol.replace("_", "");
@@ -104,7 +107,11 @@ impl CryptoSpotWs {
     fn contains_pr(&self) -> bool {
         for t in self.subscribe_types.clone() {
             if match t {
-                CryptoSpotSubscribeType::PuGetInstruments => false,
+                CryptoSpotSubscribeType::PuBook => false,
+                CryptoSpotSubscribeType::PuTicker => false,
+                CryptoSpotSubscribeType::PuTrade => false,
+                CryptoSpotSubscribeType::PuCandlestick => false,
+
             } {
                 return true;
             }
@@ -117,8 +124,17 @@ impl CryptoSpotWs {
     //订阅枚举解析
     pub fn enum_to_string(symbol: String, subscribe_type: CryptoSpotSubscribeType) -> String {
         match subscribe_type {
-            CryptoSpotSubscribeType::PuGetInstruments => {
-                format!("{}@aggTrade", symbol)
+            CryptoSpotSubscribeType::PuBook => {
+                format!("book.{}-PERP", symbol)
+            }
+            CryptoSpotSubscribeType::PuTicker => {
+                format!("ticker.{}-PERP", symbol)
+            }
+            CryptoSpotSubscribeType::PuTrade => {
+                format!("trade.{}-PERP", symbol)
+            }
+            CryptoSpotSubscribeType::PuCandlestick => {
+                format!("candlestick.M1.{}-PERP", symbol)
             }
         }
     }
@@ -132,17 +148,21 @@ impl CryptoSpotWs {
             }
         }
 
-       let nonce =  Utc::now().timestamp_millis();
+        let nonce = Utc::now().timestamp_millis();
         let str = json!({
-          "id": 1,
-          "method": "subscribe",
-          "params": {
-            "channels":params
-          },
-          "nonce": nonce
-        });
+                  "id": 1,
+                  "method": "subscribe",
+                  "params": {
+                    "channels":params
+                  },
+                  "nonce": nonce
+                });
 
-        str.to_string()
+        if params.len() > 0 {
+            str.to_string()
+        } else {
+            "".to_string()
+        }
     }
     /*******************************************************************************************************/
     /*****************************************socket基本*****************************************************/
@@ -155,7 +175,7 @@ impl CryptoSpotWs {
                                   read_tx: UnboundedSender<ResponseData>) -> Result<(), Error>
     {
         let login_is = self.contains_pr();
-        // let subscription = self.get_subscription();
+        let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
         let label = self.label.clone();
         // let heartbeat_time = self.heartbeat_time.clone();
@@ -170,11 +190,11 @@ impl CryptoSpotWs {
         // });
 
         //设置订阅
-        let  subscribe_array = vec![];
+        let mut subscribe_array = vec![];
         if login_is {
             //登录相关
         }
-        // subscribe_array.push(subscription.to_string());
+        subscribe_array.push(subscription.to_string());
 
         //链接
         let t2 = tokio::spawn(async move {
@@ -198,7 +218,7 @@ impl CryptoSpotWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let  response_data = Self::ok_text(text);
+        let response_data = Self::ok_text(text);
         Option::from(response_data)
     }
     //数据解析-ping
@@ -211,56 +231,54 @@ impl CryptoSpotWs {
     }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData {
-        trace!("原始数据");
-        trace!(?text);
+        // trace!("原始数据");
+        // trace!(?text);
+
         let mut res_data = ResponseData::new("".to_string(), "".to_string(), "success".to_string(), "".to_string());
         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
 
-        if json_value.get("id").is_some()  && json_value.get("method").is_some()&& json_value.get("code").is_some()  {
-            //服务器心跳,需要做响应
+        if json_value.get("id").is_some() && json_value.get("method").is_some() && json_value.get("code").is_some() {
             let id = json_value["id"].as_i64().unwrap();
             let method = json_value["method"].as_str().unwrap();
             let code = json_value["code"].as_i64().unwrap();
-            if code == 0 && method == "public/heartbeat"{
-                res_data.code = "-302".to_string();
-                let str = json!({
+
+            if method == "public/heartbeat" {
+                if code == 0 {
+                    res_data.code = "-302".to_string();
+                    let str = json!({
                   "id": id,
                   "method": "public/respond-heartbeat"
                 });
-                res_data.message = "服务器主动心跳检测,客户端回应~!".to_string();
-                res_data.data = str .to_string();
+                    res_data.message = "服务器主动心跳检测,客户端回应~!".to_string();
+                    res_data.data = str.to_string();
+                } else {
+                    res_data.message = "心跳异常~!".to_string();
+                }
+            } else if method == "subscribe" && json_value.get("channel").is_some() {
+                //订阅反馈
+                if code == 0 {
+                    res_data.code = "-201".to_string();
+                    res_data.channel = json_value["channel"].as_str().unwrap().to_string();
+                } else {
+                    res_data.message = "订阅失败~!".to_string();
+                    res_data.data = json_value["data"].to_string()
+                }
+            } else if method == "subscribe" && json_value.get("result").is_some() {
+                if code == 0 {
+                    let subscription = json_value["result"]["subscription"].as_str().unwrap();
+                    res_data.channel = subscription.to_string();
+                    res_data.code = "200".to_string();
+                    res_data.data = json_value["result"]["data"].to_string()
+                } else {
+                    res_data.message = "推送数据异常~!".to_string();
+                    res_data.data = json_value["result"]["data"].to_string()
+                }
+            } else {
+                res_data.message = "未知解析!!".to_string();
             }
-        } else{
-            res_data.message = "未知解析!!".to_string();
+        } else {
+            res_data.message = "错误解析!!".to_string();
         }
-
-        // if json_value.get("result").is_some() && json_value.get("id").is_some() &&
-        //     json_value.get("id").unwrap() == 1
-        // {
-        //     res_data.code = "-201".to_string();
-        //     res_data.message = "订阅成功".to_string();
-        // } else if json_value.get("error").is_some() {//订阅返回
-        //     res_data.code = json_value["error"]["code"].to_string();
-        //     res_data.message = json_value["error"]["msg"].to_string();
-        // } else if json_value.get("stream").is_some() {//订阅返回
-        //     res_data.data = format!("{}", json_value.get("data").as_ref().unwrap());
-        //     res_data.code = "200".to_string();
-        //
-        //     let channel = format!("{}", json_value.get("stream").as_ref().unwrap());
-        //     if channel.contains("@aggTrade") {
-        //         res_data.channel = "aggTrade".to_string();
-        //     } else if channel.contains("@depth20@100ms") {
-        //         res_data.channel = "depth".to_string();
-        //     } else if channel.contains("@bookTicker") {
-        //         res_data.channel = "bookTicker".to_string();
-        //     } else {
-        //         res_data.channel = "未知的频道".to_string();
-        //     }
-        // } else {
-        //     res_data.data = text
-        // }
-
-
         res_data
     }
 }

+ 1 - 0
exchanges/src/gate_swap_rest.rs

@@ -351,6 +351,7 @@ impl GateSwapRest {
         ).await;
         data
     }
+
     //查询个人成交记录
     pub async fn my_trades(&mut self, settle: String, contract: String, limit: i64) -> ResponseData {
         let mut params = serde_json::json!({

+ 1 - 1
exchanges/src/gate_swap_ws.rs

@@ -69,7 +69,7 @@ impl GateSwapWs {
         let address_url = match ws_type {
             GateSwapWsType::PublicAndPrivate(name) => {
                 if is_colo {
-                    let url = format!("wss://fxws-privategateapi.io/v4/ws/{}", name.to_string());
+                    let url = format!("wss://fxws-private.gateapi.io/v4/ws/{}", name.to_string());
                     info!("开启高速通道:{:?}",url);
                     url
                 } else {

+ 2 - 2
exchanges/src/http_tool.rs

@@ -57,8 +57,8 @@ impl RestTool {
                     serde_json::Value::String(s) => s.clone(),
                     _ => value.to_string()
                 };
-                trace!("Key: {}", key);
-                trace!("Value: {}", formatted_value);
+                // trace!("Key: {}", key);
+                // trace!("Value: {}", formatted_value);
                 // let formatted_value = match value {
                 //     Value::String(s) => s.clone(),
                 //     _ => value.to_string()

+ 2 - 0
exchanges/src/lib.rs

@@ -20,4 +20,6 @@ pub mod bitget_spot_rest;
 pub mod kucoin_spot_ws;
 pub mod kucoin_spot_rest;
 pub mod crypto_spot_ws;
+pub mod bybit_swap_rest;
+pub mod xlsx_utils;
 

+ 3 - 3
exchanges/src/okx_swap_rest.rs

@@ -99,7 +99,7 @@ impl OkxSwapRest {
         let data = self.request("GET".to_string(),
                                 "/api/v5".to_string(),
                                 "/public/time".to_string(),
-                                true,
+                                false,
                                 params.to_string(),
         ).await;
         data
@@ -127,7 +127,7 @@ impl OkxSwapRest {
         let data = self.request("GET".to_string(),
                                 "/api/v5".to_string(),
                                 "/market/ticker".to_string(),
-                                true,
+                                false,
                                 params.to_string(),
         ).await;
         data
@@ -153,7 +153,7 @@ impl OkxSwapRest {
         let data = self.request("GET".to_string(),
                                 "/api/v5".to_string(),
                                 "/public/instruments".to_string(),
-                                true,
+                                false,
                                 params.to_string(),
         ).await;
         data

+ 1 - 1
exchanges/src/socket_tool.rs

@@ -105,7 +105,7 @@ impl AbstractWsMode {
                                 }
                             }
                             "-200" => {
-                                //订阅成功
+                                //登录成功
                                 trace!("登录成功:{:?}", data);
                             }
                             "-201" => {

+ 1 - 1
exchanges/src/utils.rs

@@ -1,9 +1,9 @@
 // use chrono::Utc;
 
-
 // pub fn get_time_microsecond() -> i64 {
 //     let now = Utc::now();
 //     let total_micros = now.timestamp_micros(); //微妙
 //     total_micros
 // }
 //
+

+ 92 - 0
exchanges/src/xlsx_utils.rs

@@ -0,0 +1,92 @@
+use std::collections::BTreeMap;
+use rust_xlsxwriter::*;
+
+pub fn creation_xlsx(one_row_name: &Vec<&str>, data_rows: &BTreeMap<String, Vec<Vec<String>>>, file_name: String) -> Result<(), XlsxError> {
+
+    // 创建一个新的Excel文件对象。
+    let mut workbook = Workbook::new();
+    for (key, value) in data_rows {
+        // 向工作簿中添加工作表。
+        let  worksheet = workbook.add_worksheet();
+        worksheet.set_name(key).unwrap();
+
+        // 第一行 列,明
+        let mut row_index: usize = 0;
+        let mut i: usize = 0;
+        let bold_format = Format::new().set_bold();
+        while i <= one_row_name.len() - 1 {
+            worksheet.write_with_format(row_index as RowNum, i as ColNum, one_row_name[i], &bold_format)?;
+            i = i + 1;
+        }
+
+
+        //后续 数据写入
+        row_index = 1;
+        while row_index <= value.len()  {
+            let row = value.get(row_index - 1).unwrap();
+
+            i = 0;
+            for str in row {
+                worksheet.write(row_index as RowNum, i as ColNum, str)?;
+                i = i + 1;
+            }
+            row_index = row_index + 1;
+        }
+    }
+
+
+    //
+    // for row in one_row_name {}
+    // // 写一个不带格式的字符串。
+    // worksheet.write(0, 0, "Hello")?;
+    //
+    //
+    // // 创建一些要在工作表中使用的格式。
+    // let bold_format = Format::new().set_bold();
+    // let decimal_format = Format::new().set_num_format("0.000");
+    // let date_format = Format::new().set_num_format("yyyy-mm-dd");
+    // let merge_format = Format::new()
+    //     .set_border(FormatBorder::Thin)
+    //     .set_align(FormatAlign::Center);
+    //
+    //
+    // // 为清晰设置列宽度。
+    // worksheet.set_column_width(0, 22)?;
+    //
+    // // 写一个不带格式的字符串。
+    // worksheet.write(0, 0, "Hello")?;
+    //
+    // // 用上面定义的粗体格式编写一个字符串。
+    // worksheet.write_with_format(1, 0, "World", &bold_format)?;
+    //
+    // //
+    // worksheet.write(2, 0, 1)?;
+    // worksheet.write(3, 0, 2.34)?;
+    //
+    // // 用格式写一个数字。
+    // worksheet.write_with_format(4, 0, 3.00, &decimal_format)?;
+    //
+    // // 写一个公式。
+    // worksheet.write(5, 0, Formula::new("=SIN(PI()/4)"))?;
+    //
+    // // 写一个日期。
+    // let date = ExcelDateTime::from_ymd(2023, 1, 25)?;
+    // worksheet.write_with_format(6, 0, &date, &date_format)?;
+    //
+    // //写一些链接。
+    // worksheet.write(7, 0, Url::new("https://www.rust-lang.org"))?;
+    // worksheet.write(8, 0, Url::new("https://www.rust-lang.org").set_text("Rust"))?;
+
+    // 写入一些合并的单元格。
+    // worksheet.merge_range(9, 0, 9, 1, "Merged cells", &merge_format)?;
+
+    // Insert an image.
+    // let image = Image::new("examples/rust_logo.png")?;
+    // worksheet.insert_image(1, 2, &image)?;
+
+    // 生成文件
+    let path = format!("./{}.xlsx", file_name);
+    workbook.save(path)?;
+
+    Ok(())
+}

+ 220 - 0
exchanges/tests/bybit_swap_test.rs

@@ -0,0 +1,220 @@
+use std::collections::BTreeMap;
+
+use exchanges::bybit_swap_rest::BybitSwapRest;
+
+const ACCESS_KEY: &str = "JKHMEL6kD7I7WjbHKP";
+const SECRET_KEY: &str = "jmofU9X9PjzGZ8BlO0xZLcUzImHE2CaTSQ3Y";
+
+
+//ws-订阅公共频道信息
+// #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
+// async fn ws_custom_subscribe_pu() {
+//     global::log_utils::init_log_with_trace();
+//
+//
+//     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//     let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+//
+//
+//     let mut ws = get_ws(None, BybitSwapWsType::Public).await;
+//     ws.set_symbols(vec!["BTC_USDT".to_string()]);
+//     ws.set_subscribe(vec![
+//         // BybitSwapSubscribeType::PuBooks5,
+//         // BybitSwapSubscribeType::Putrades,
+//         BybitSwapSubscribeType::PuBooks50L2tbt,
+//         // BybitSwapSubscribeType::PuIndexTickers,
+//     ]);
+//
+//
+//     let write_tx_am = Arc::new(Mutex::new(write_tx));
+//     let bool_v1 = Arc::new(AtomicBool::new(true));
+//
+//     //读取
+//     let _bool_v1_clone = Arc::clone(&bool_v1);
+//     let _tr = tokio::spawn(async move {
+//         trace!("线程-数据读取-开启");
+//         loop {
+//             if let Some(data) = read_rx.next().await {
+//                 trace!("读取数据data:{:?}",data)
+//             }
+//         }
+//         // trace!("线程-数据读取-结束");
+//     });
+//
+//     //写数据
+//     // let bool_v2_clone = Arc::clone(&bool_v1);
+//     // let write_tx_clone = Arc::clone(&write_tx_am);
+//     // let su = ws.get_subscription();
+//     // let tw = tokio::spawn(async move {
+//     //     trace!("线程-数据写入-开始");
+//     //     loop {
+//     //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
+//     //         // let close_frame = CloseFrame {
+//     //         //     code: CloseCode::Normal,
+//     //         //     reason: Cow::Borrowed("Bye bye"),
+//     //         // };
+//     //         // let message = Message::Close(Some(close_frame));
+//     //
+//     //
+//     //         let message = Message::Text(su.clone());
+//     //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
+//     //         trace!("发送指令成功");
+//     //     }
+//     //     trace!("线程-数据写入-结束");
+//     // });
+//
+//     let t1 = tokio::spawn(async move {
+//         //链接
+//         let bool_v3_clone = Arc::clone(&bool_v1);
+//         ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//         trace!("test 唯一线程结束--");
+//     });
+//     tokio::try_join!(t1).unwrap();
+//     trace!("当此结束");
+//     trace!("重启!");
+//     trace!("参考交易所关闭");
+//     return;
+// }
+//
+
+//rest-服務器時間
+#[tokio::test]
+async fn rest_get_server_time_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_server_time().await;
+    println!("Bybit--服務器時間--{:?}", req_data);
+}
+
+//rest-查詢公告
+#[tokio::test]
+async fn rest_get_announcements_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_announcements().await;
+    println!("Bybit--查詢公告--{:?}", req_data);
+}
+
+//rest-查詢可交易產品的規格信息
+#[tokio::test]
+async fn rest_get_instruments_info_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_instruments_info("BTCUSDT".to_string()).await;
+    println!("Bybit--查詢可交易產品的規格信息--{:?}", req_data);
+}
+
+
+//rest-查詢錢包餘額
+#[tokio::test]
+async fn rest_get_account_balance_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_account_balance().await;
+    println!("Bybit--查詢錢包餘額--{:?}", req_data);
+}
+
+//rest-查看持仓信息
+#[tokio::test]
+async fn rest_get_positions_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_positions("DOGEUSDT".to_string()).await;
+    println!("Bybit--查看持仓信息--{:?}", req_data);
+}
+
+//rest-设置持仓模式
+#[tokio::test]
+async fn rest_set_position_mode_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.set_position_mode("DOGEUSDT".to_string(), 3).await;
+    println!("Bybit--设置持仓模式--{:?}", req_data);
+}
+
+//rest-設置槓桿
+#[tokio::test]
+async fn rest_set_leverage_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.set_leverage(
+        "DOGEUSDT".to_string(), "1".to_string()).await;
+    println!("Bybit--設置槓桿--{:?}", req_data);
+}
+
+
+//rest-創建委託單
+#[tokio::test]
+async fn rest_swap_order_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let params = serde_json::json!({
+            "category":"linear",
+            "symbol":"DOGEUSDT",
+            "orderType":"Limit",
+            "side":"Buy",
+            "qty":"1",
+            "price":"0.085",
+         });
+    let req_data = ret.swap_order(params).await;
+    println!("Bybit--創建委託單--{:?}", req_data);
+}
+
+
+//rest-查詢實時委託單
+#[tokio::test]
+async fn rest_get_order_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_order("DOGEUSDT".to_string(),
+                                 "".to_string(), "".to_string()).await;
+    println!("Bybit--查詢實時委託單--{:?}", req_data);
+}
+
+
+//rest-撤单
+#[tokio::test]
+async fn rest_cancel_order_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.cancel_order("DOGEUSDT".to_string(),
+                                    "1d3ea16f-cf1c-4dab-9a79-d441a2dea549".to_string(), "".to_string()).await;
+    println!("Bybit--撤单--{:?}", req_data);
+}
+
+
+// //rest-查詢實時委託單
+// #[tokio::test]
+// async fn rest_get_order_test() {
+//     global::log_utils::init_log_with_trace();
+//
+//     let mut ret = get_rest();
+//     let req_data = ret.get_order("BTCUSDT".to_string(),
+//                                  "".to_string(), "".to_string()).await;
+//     println!("Bybit--查詢可交易產品的規格信息--{:?}", req_data);
+// }
+
+
+// async fn get_ws(btree_map: Option<BybitSwapLogin>, type_v: BybitSwapWsType) -> BybitSwapWs {
+//     let ku_ws = BybitSwapWs::new(false, btree_map, type_v);
+//     ku_ws
+// }
+
+fn get_rest() -> BybitSwapRest {
+    let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
+    btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string());
+    btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string());
+
+    let bybit_exc = BybitSwapRest::new(false, btree_map.clone());
+    bybit_exc
+}

+ 9 - 9
exchanges/tests/crypto_spot_test.rs

@@ -5,7 +5,7 @@ use futures_util::StreamExt;
 use tokio::sync::Mutex;
 use tracing::trace;
 
-use exchanges::crypto_spot_ws::{CryptoSpotLogin, CryptoSpotWs, CryptoSpotWsType};
+use exchanges::crypto_spot_ws::{CryptoSpotLogin, CryptoSpotSubscribeType, CryptoSpotWs, CryptoSpotWsType};
 
 const ACCESS_KEY: &str = "";
 const SECRET_KEY: &str = "";
@@ -19,12 +19,13 @@ async fn ws_pu() {
     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
     let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
 
-    let mut ws = get_ws(None,CryptoSpotWsType::Public);
-    // ws.set_symbols(vec!["BTC_USDT".to_string()]);
+    let mut ws = get_ws(None, CryptoSpotWsType::Public);
+    ws.set_symbols(vec!["BTC_USD".to_string()]);
     ws.set_subscribe(vec![
-        // CryptoSpotSubscribeType::PuBookTicker,
-        // BinanceSwapSubscribeType::PuAggTrade,
-        // BinanceSwapSubscribeType::PuDepth20levels100ms,
+        // CryptoSpotSubscribeType::PuBook,
+        // CryptoSpotSubscribeType::PuTicker,
+        // CryptoSpotSubscribeType::PuTrade,
+        CryptoSpotSubscribeType::PuCandlestick,
     ]);
 
     let write_tx_am = Arc::new(Mutex::new(write_tx));
@@ -75,12 +76,11 @@ async fn ws_pu() {
     trace!("重启!");
     trace!("参考交易所关闭");
     return;
-
 }
 
-fn get_ws(btree_map: Option< CryptoSpotLogin>,ws_type:CryptoSpotWsType) -> CryptoSpotWs {
+fn get_ws(btree_map: Option<CryptoSpotLogin>, ws_type: CryptoSpotWsType) -> CryptoSpotWs {
     let binance_ws = CryptoSpotWs::new(false,
-                                        btree_map,
+                                       btree_map,
                                        ws_type);
     binance_ws
 }

+ 18 - 6
exchanges/tests/gate_swap_test.rs

@@ -7,7 +7,7 @@ use tokio::sync::Mutex;
 use tracing::trace;
 
 use exchanges::gate_swap_rest::GateSwapRest;
-use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs,  GateSwapWsType};
+use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
 
 const ACCESS_KEY: &str = "";
 const SECRET_KEY: &str = "";
@@ -20,7 +20,7 @@ async fn ws_custom_subscribe() {
     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
     let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
 
-    let param  = GateSwapLogin{
+    let param = GateSwapLogin {
         api_key: ACCESS_KEY.to_string(),
         secret: SECRET_KEY.to_string(),
     };
@@ -99,18 +99,30 @@ async fn rest_cancel_order_all_test() {
     println!("okx--设置持仓模式--{:?}", req_data);
 }
 
+
+//rest-查询合约账户变更历史
+#[tokio::test]
+async fn rest_account_book_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.account_book("usdt".to_string()).await;
+    println!("okx--查询合约账户变更历史--{:?}", req_data);
+}
+
 fn get_ws(btree_map: Option<GateSwapLogin>) -> GateSwapWs {
-    let  binance_ws = GateSwapWs::new(false,
-                                         btree_map,
-                                         GateSwapWsType::PublicAndPrivate("usdt".to_string()));
+    let binance_ws = GateSwapWs::new(false,
+                                     btree_map,
+                                     GateSwapWsType::PublicAndPrivate("usdt".to_string()));
     binance_ws
 }
 
+
 fn get_rest() -> GateSwapRest {
     let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
     btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string());
     btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string());
 
-    let  ba_exc = GateSwapRest::new(false, btree_map);
+    let ba_exc = GateSwapRest::new(false, btree_map);
     ba_exc
 }

+ 1 - 1
exchanges/tests/okx_swap_test.rs

@@ -298,7 +298,7 @@ async fn rest_get_ticker_test() {
     global::log_utils::init_log_with_trace();
 
     let mut ret = get_rest();
-    let req_data = ret.get_ticker("BTC-USD".to_string()).await;
+    let req_data = ret.get_ticker("BTC-USDT".to_string()).await;
     println!("okx--获取单个产品行情信息--{:?}", req_data);
 }
 

+ 52 - 56
exchanges/tests/test.rs

@@ -2,9 +2,9 @@ use exchanges::gate_swap_rest::GateSwapRest;
 use std::collections::BTreeMap;
 use tokio::io::{AsyncReadExt};
 use exchanges::kucoin_swap_rest::KucoinSwapRest;
-use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
+use exchanges::kucoin_swap_ws::{KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
 use exchanges::{proxy};
-use exchanges::okx_swap_ws::{OkxSubscribeType, OkxSwapWs, OkxWsType};
+use exchanges::okx_swap_ws::{OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
 
 use std::io::{Read, Write};
 use std::sync::Arc;
@@ -13,9 +13,8 @@ use tokio::sync::mpsc::channel;
 use tokio::try_join;
 use tracing::{trace};
 use tracing::instrument::WithSubscriber;
-use tungstenite::{connect, Message};
 use exchanges::binance_swap_rest::BinanceSwapRest;
-use exchanges::binance_swap_ws::{BinanceSubscribeType, BinanceSwapWs, BinanceWsType};
+use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
 use exchanges::okx_swap_rest::OkxSwapRest;
 
 #[tokio::test]
@@ -46,7 +45,7 @@ async fn test_import() {
 
 
     //kucoin_rest -账户信息
-    demo_rest_kucoin().await;
+    // demo_rest_kucoin().await;
     //Kucoin-ws--公共频道
     // demo_ws_kucoin_pu().await;
     //Kucoin-ws--私有频道
@@ -186,9 +185,7 @@ async fn demo_okx_rest() {
     // trace!("okx_rest-rest - get_account- {:?}", res_data);
 }
 
-async fn demo_ws_gate() {
-
-}
+async fn demo_ws_gate() {}
 
 
 fn demo_so() {
@@ -283,54 +280,54 @@ async fn demo_ws_okx_bu() {
 }
 
 async fn demo_ws_kucoin_pr() {
-    let mut bool_v1 = Arc::new(AtomicBool::new(true));
-    let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
-    btree_map.insert("access_key".to_string(), "".to_string());
-    btree_map.insert("secret_key".to_string(), "".to_string());
-    btree_map.insert("pass_key".to_string(), "".to_string());
-    trace!("----------------------btree_map{:?}", btree_map.clone());
-    let (tx, mut rx) = channel(1024);
-    let mut ku_ws = KucoinSwapWs::new(false, btree_map.clone(),
-                                      KucoinWsType::Private, tx).await;
-    ku_ws.set_subscribe(vec![KucoinSubscribeType::PrContractMarketTradeOrdersSys]);
-
-    let t1 = tokio::spawn(async move {
-        ku_ws.custom_subscribe(bool_v1,vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
-    });
-
-    let t2 = tokio::spawn(async move {
-        loop {
-            if let Ok(received) = rx.try_recv() {
-                trace!( "age: {:?}", received);
-            }
-        }
-    });
-
-    try_join!(t1,t2).unwrap();
+    // let mut bool_v1 = Arc::new(AtomicBool::new(true));
+    // let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
+    // btree_map.insert("access_key".to_string(), "".to_string());
+    // btree_map.insert("secret_key".to_string(), "".to_string());
+    // btree_map.insert("pass_key".to_string(), "".to_string());
+    // trace!("----------------------btree_map{:?}", btree_map.clone());
+    // let (tx, mut rx) = channel(1024);
+    // let mut ku_ws = KucoinSwapWs::new(false, btree_map.clone(),
+    //                                   KucoinWsType::Private, tx).await;
+    // ku_ws.set_subscribe(vec![KucoinSubscribeType::PrContractMarketTradeOrdersSys]);
+    //
+    // let t1 = tokio::spawn(async move {
+    //     ku_ws.custom_subscribe(bool_v1, vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
+    // });
+    //
+    // let t2 = tokio::spawn(async move {
+    //     loop {
+    //         if let Ok(received) = rx.try_recv() {
+    //             trace!( "age: {:?}", received);
+    //         }
+    //     }
+    // });
+    //
+    // try_join!(t1,t2).unwrap();
 }
 
 async fn demo_ws_kucoin_pu() {
-    let mut bool_v1 = Arc::new(AtomicBool::new(true));
-    let btree_map: BTreeMap<String, String> = BTreeMap::new();
-    let (tx, mut rx) = channel(1024);
-    let mut ku_ws = KucoinSwapWs::new(false, btree_map, KucoinWsType::Public, tx).await;
-    ku_ws.set_subscribe(vec![
-        KucoinSubscribeType::PuContractMarketLevel2Depth50,
-        KucoinSubscribeType::PuContractMarkettickerV2,
-    ]);
-
-    let t1 = tokio::spawn(async move {
-        ku_ws.custom_subscribe(bool_v1,vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
-    });
-    let t2 = tokio::spawn(async move {
-        loop {
-            if let Ok(received) = rx.try_recv() {
-                trace!("age: {:?}", received);
-            }
-        }
-    });
-
-    try_join!(t1,t2).unwrap();
+    // let mut bool_v1 = Arc::new(AtomicBool::new(true));
+    // let btree_map: BTreeMap<String, String> = BTreeMap::new();
+    // let (tx, mut rx) = channel(1024);
+    // let mut ku_ws = KucoinSwapWs::new(false, btree_map, KucoinWsType::Public, tx).await;
+    // ku_ws.set_subscribe(vec![
+    //     KucoinSubscribeType::PuContractMarketLevel2Depth50,
+    //     KucoinSubscribeType::PuContractMarkettickerV2,
+    // ]);
+    //
+    // let t1 = tokio::spawn(async move {
+    //     ku_ws.custom_subscribe(bool_v1, vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
+    // });
+    // let t2 = tokio::spawn(async move {
+    //     loop {
+    //         if let Ok(received) = rx.try_recv() {
+    //             trace!("age: {:?}", received);
+    //         }
+    //     }
+    // });
+    //
+    // try_join!(t1,t2).unwrap();
 }
 
 async fn demo_rest_kucoin() {
@@ -437,11 +434,10 @@ async fn demo_rest_gate() {
     // trace!("gate-rest -order{:?}", res_data);
 
 
-    let res_data = gate_exc.my_trades("usdt".to_string()).await;
-    trace!("gate-rest -my_trades{:?}", res_data);
+    // let res_data = gate_exc.my_trades("usdt".to_string()).await;
+    // trace!("gate-rest -my_trades{:?}", res_data);
     let res_data = gate_exc.account_book("usdt".to_string()).await;
     trace!("gate-rest -account_book{:?}", res_data);
-
 }
 
 async fn demo_rest_ba() {

+ 229 - 0
exchanges/tests/xlsx_test.rs

@@ -0,0 +1,229 @@
+use std::collections::{BTreeMap, HashSet};
+use chrono::{DateTime, FixedOffset, NaiveDateTime, TimeZone, Utc};
+
+use tracing::{trace};
+
+use exchanges::gate_swap_rest::GateSwapRest;
+use exchanges::xlsx_utils::creation_xlsx;
+
+//ws-订阅公共频道信息
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn test_gate_creation_xlsx() {
+    global::log_utils::init_log_with_trace();
+
+    //获取不同账号的数据
+    let mut acc_array: Vec<BTreeMap<String, String>> = vec![];
+    let mut acc_all_data: BTreeMap<String, Vec<Vec<String>>> = BTreeMap::new();
+    let mut acc_all_data_clone: BTreeMap<String, Vec<Vec<String>>> = BTreeMap::new();
+    let mut data_array_all: Vec<Vec<String>> = vec![];
+    let mut time_all: Vec<i64> = vec![];
+    loop {
+        let mut gate60: BTreeMap<String, String> = BTreeMap::new();
+        gate60.insert("acc_name".to_string(), String::from("gate60"));
+        gate60.insert("access_key".to_string(), String::from("61e3a1b12c44c8ccfce1f32782b9922f"));
+        gate60.insert("secret_key".to_string(), String::from("f3f533fa685cbae44b3017f73b71e90eaa9aa8d4922a39426744721e4824a527"));
+        acc_array.push(gate60);
+
+        let mut gate59: BTreeMap<String, String> = BTreeMap::new();
+        gate59.insert("acc_name".to_string(), String::from("gate59"));
+        gate59.insert("access_key".to_string(), String::from("c691d4fcc5a5a98af459f993a3a0c653"));
+        gate59.insert("secret_key".to_string(), String::from("05e7f8640bffeacc146b6f9f08512955d00d89bbdb051c9427f31adf96adeb2f"));
+        acc_array.push(gate59);
+
+        let mut gate58: BTreeMap<String, String> = BTreeMap::new();
+        gate58.insert("acc_name".to_string(), String::from("gate58"));
+        gate58.insert("access_key".to_string(), String::from("dfdc30687ac71daefa2fb39c706b8afa"));
+        gate58.insert("secret_key".to_string(), String::from("31e8999f85d38f5dd174f8919d9a1611c24d4e35b4d6319d6b160005871bf8b6"));
+        acc_array.push(gate58);
+
+        let mut gate57: BTreeMap<String, String> = BTreeMap::new();
+        gate57.insert("acc_name".to_string(), String::from("gate57"));
+        gate57.insert("access_key".to_string(), String::from("ba11ea511f343763db7c92c685f20c12"));
+        gate57.insert("secret_key".to_string(), String::from("272d1d38ac4f0af3e6ed96e6a9a2c59dd905c3d7ad730507008604bba14edb3d"));
+        acc_array.push(gate57);
+
+        let mut gate56: BTreeMap<String, String> = BTreeMap::new();
+        gate56.insert("acc_name".to_string(), String::from("gate56"));
+        gate56.insert("access_key".to_string(), String::from("72f0c351a83b04808baad625f6085599"));
+        gate56.insert("secret_key".to_string(), String::from("8a8d482bcc31f184cce350531cb40ca70564f9c466698d025e16d8943257dc0f"));
+        acc_array.push(gate56);
+
+        let mut gate55: BTreeMap<String, String> = BTreeMap::new();
+        gate55.insert("acc_name".to_string(), String::from("gate55"));
+        gate55.insert("access_key".to_string(), String::from("036408d5cbd8ddf1c6e0baab61ee641a"));
+        gate55.insert("secret_key".to_string(), String::from("940414a37711e59848011e99e223fa11b0e69f8badd35347301e3f8e41957a60"));
+        acc_array.push(gate55);
+
+        let mut gate54: BTreeMap<String, String> = BTreeMap::new();
+        gate54.insert("acc_name".to_string(), String::from("gate54"));
+        gate54.insert("access_key".to_string(), String::from("fbe564e8bd4efaa0c3c023ca8c057b36"));
+        gate54.insert("secret_key".to_string(), String::from("0be3c0223fd021fdacacc03f183f467e988ceee6eb1b0e09ca96bb1eebd45f39"));
+        acc_array.push(gate54);
+
+        let mut gate53: BTreeMap<String, String> = BTreeMap::new();
+        gate53.insert("acc_name".to_string(), String::from("gate53"));
+        gate53.insert("access_key".to_string(), String::from("16d91ba0a3d79a2925d16ea01a615fa1"));
+        gate53.insert("secret_key".to_string(), String::from("607b07cf240466656c00beb0c6fff252839467583fd3f8b14782eb007b3d99ce"));
+        acc_array.push(gate53);
+
+        let mut gate52: BTreeMap<String, String> = BTreeMap::new();
+        gate52.insert("acc_name".to_string(), String::from("gate52"));
+        gate52.insert("access_key".to_string(), String::from("31054006c457a80a027e961cf3e5e3a4"));
+        gate52.insert("secret_key".to_string(), String::from("a43f8f5672b49bfcc304d0731100610de1c55e7d2b6c00199b267993f0b189d1"));
+        acc_array.push(gate52);
+
+        let mut gate51: BTreeMap<String, String> = BTreeMap::new();
+        gate51.insert("acc_name".to_string(), String::from("gate51"));
+        gate51.insert("access_key".to_string(), String::from("edcedefe7830dd5722c2c37704ae700f"));
+        gate51.insert("secret_key".to_string(), String::from("41db86a8463ac7c66023a8505e5fddd3448e551a11a52141bf57ca8478e2149b"));
+        acc_array.push(gate51);
+
+
+        for acc in acc_array {
+            let mut gate_exc = GateSwapRest::new(false, acc.clone());
+            let data = gate_exc.account_book("usdt".to_string()).await;
+            // trace!("data???????:{:?}",data.clone());
+            if data.code.as_str() == "200" {
+                let acc_name = acc.get("acc_name").clone();
+
+                //账号
+                let json_value: serde_json::Value = serde_json::from_str(&data.data).unwrap();
+                if let serde_json::Value::Array(array) = json_value {
+                    let mut name_data_all: Vec<Vec<String>> = vec![];
+                    let mut name_data_all_clone: Vec<Vec<String>> = vec![];
+                    for item in array {
+                        let time = item["time"].as_i64().unwrap();//秒级
+                        // 使用秒构建一个DateTime<Utc>对象
+                        // let datetime: DateTime<Utc> = Utc.timestamp(time, 0);
+                        //2023-12-10 00:00:00
+                        if 1702051200 > time {
+                            continue;
+                        }
+                        time_all.push(time);
+                        let time_str = NaiveDateTime::from_timestamp_millis((time + 8 * 3600) * 1000).unwrap().format("%Y-%m-%d %H:%M:%S%.3f").to_string();
+
+                        trace!("数据时间解析:{:?}---{:?}",time,time_str);
+                        let change = item["change"].as_str().unwrap();
+
+                        let balance = item["balance"].as_str().unwrap();
+                        let type_str = match item["type"].as_str().unwrap() {
+                            "dnw" => { "转入转出" }
+                            "pnl" => { "减仓盈亏" }
+                            "fee" => { "交易手续费" }
+                            "refr" => { "推荐人返佣" }
+                            "fund" => { "资金费用" }
+                            "point_dnw" => { "点卡转入转出" }
+                            "point_fee" => { "点卡交易手续费" }
+                            "point_refr" => { "点卡推荐人返佣" }
+                            _ => {
+                                "未知-变更类型"
+                            }
+                        };
+                        let text = item["text"].as_str().unwrap();
+                        let contract = item["contract"].as_str().unwrap();
+                        let trade_id = item["trade_id"].as_str().unwrap();
+
+                        let mut name_data_array: Vec<String> = vec![];
+                        name_data_array.push(time_str.to_string());
+                        name_data_array.push(trade_id.to_string());
+                        name_data_array.push(change.to_string());
+                        name_data_array.push(balance.to_string());
+                        name_data_array.push(type_str.to_string());
+                        name_data_array.push(contract.to_string());
+                        name_data_array.push(text.to_string());
+
+                        let mut name_data_array_clone: Vec<String> = vec![];
+                        name_data_array_clone.push(time.to_string());
+                        name_data_array_clone.push(trade_id.to_string());
+                        name_data_array_clone.push(change.to_string());
+                        name_data_array_clone.push(balance.to_string());
+                        name_data_array_clone.push(type_str.to_string());
+                        name_data_array_clone.push(contract.to_string());
+                        name_data_array_clone.push(text.to_string());
+
+                        name_data_all.push(name_data_array.clone());
+                        name_data_all_clone.push(name_data_array_clone.clone());
+                        let mut cp =  name_data_array.clone();
+                        cp.push(acc_name.clone().unwrap().to_string());
+                        data_array_all.push(cp);
+                    }
+                    acc_all_data.insert(acc_name.clone().unwrap().to_string(), name_data_all.clone());
+                    acc_all_data_clone.insert(acc_name.clone().unwrap().to_string(), name_data_all_clone.clone());
+                } else {
+                    trace!("不是数组 检查数据");
+                }
+            }
+            // break;
+        }
+        break;//这里是为了 代码收纳,用了loop来放置代码
+    }
+    trace!("数据如下:{:?}",acc_all_data);
+
+    //汇总
+    //1. 生成时间片
+    let mut unique = HashSet::new();
+    time_all.retain(|e| unique.insert(*e));
+    trace!("时间片:{:?}",time_all);
+
+    //2. 根据时间片 去求每个时间片的  总余额,
+    let mut sum_data_array_all: Vec<Vec<String>> = vec![];
+    for time in time_all.clone() {
+        let mut sum_data_array: Vec<String> = vec![];
+        let mut sum_balance: f64 = 0 as f64;
+        for (key, value) in acc_all_data_clone.clone() {
+            let acc_key = key;
+            trace!("读取value:{:?}",value);
+            let filter_arrya: Vec<Vec<String>> = value.clone().into_iter().filter(|s| {
+                trace!("读取time:{:?}",s);
+                let time_v = s[0].clone();
+                time_v < (time - 1).to_string()
+            }).collect();
+
+            let balance: f64 = if filter_arrya.len() > 0 {
+                let row = filter_arrya[filter_arrya.len() - 1].clone();
+                trace!("读取balance:{:?}",row);
+                let balance_clone = row[3].clone();
+                balance_clone.parse::<f64>().unwrap()
+            } else {
+                0 as f64
+            };
+            sum_balance = balance + sum_balance;
+        }
+
+        // 使用秒构建一个DateTime<Utc>对象
+        let time_str = NaiveDateTime::from_timestamp_millis((time + 8 * 3600) * 1000).unwrap().format("%Y-%m-%d %H:%M:%S%.3f").to_string();
+
+        sum_data_array.push(time_str.to_string());
+        sum_data_array.push("".to_string());
+        sum_data_array.push("".to_string());
+        sum_data_array.push(sum_balance.to_string());
+        sum_data_array.push("".to_string());
+        sum_data_array.push("".to_string());
+        sum_data_array.push("".to_string());
+
+        sum_data_array_all.push(sum_data_array);
+    }
+    if sum_data_array_all.len() > 0 {
+        acc_all_data.insert("total".to_string(), sum_data_array_all);
+    }
+    if data_array_all.len() > 0{
+        acc_all_data.insert("gather".to_string(), data_array_all);
+    }
+
+    //数据组装.
+    let noe_row_name = vec!["时间", "成交Id", "变更金额", "变更后余额", "变更类型", "合约标识", "注释"];
+    //创建表格
+    //提示。涉及到 需要转换的数据,请提前自行转换,工具只负责写入生成
+    // after - 之后的,时间戳> after 才是有效数据
+    match creation_xlsx(&noe_row_name, &acc_all_data,
+                        "okx".to_string(),
+    ) {
+        Ok(d) => {
+            trace!("完成");
+        }
+        Err(z) => {
+            eprint!("{:?}", z);
+            trace!("失败");
+        }
+    }
+}

+ 4 - 0
global/src/trace_stack.rs

@@ -121,6 +121,10 @@ impl fmt::Display for TraceStack {
             msg.push_str(format!("发送订单耗时(发送-服务器处理-响应到本地){}毫秒  ", (self.after_send - self.before_send).to_f64().unwrap() / 1000.0).as_str());
         }
 
+        if self.after_network != 0 && self.before_send != 0 {
+            msg.push_str(format!("本地总耗时{}微秒  ", self.before_send - self.after_network).as_str());
+        }
+
         if self.after_send != 0 && self.after_network != 0 {
             msg.push_str(format!("总共耗时{}毫秒", (self.after_send - self.after_network).to_f64().unwrap() / 1000.0).as_str());
         }

+ 1 - 1
standard/src/gate_swap.rs

@@ -551,7 +551,7 @@ impl Platform for GateSwap {
                                 result_sd.send(order).await.unwrap();
                             }
                             Err(_err) => {
-                                error!("撤单失败,而且查单也失败了,gate_io_swap,oid={}, cid={}。", order_id.clone(), custom_id.clone());
+                                // error!("撤单失败,而且查单也失败了,gate_io_swap,oid={}, cid={}。", order_id.clone(), custom_id.clone());
                                 // panic!("撤单失败,而且查单也失败了,gate_io_swap,oid={}, cid={}。", order_id.clone(), custom_id.clone());
                             }
                         }

+ 36 - 7
standard/src/okx_handle.rs

@@ -10,6 +10,29 @@ use crate::exchange::ExchangeEnum;
 use crate::handle_info::HandleSwapInfo;
 use crate::okx_swap::SwapPosition;
 
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SwapBalanceAndPositionSubscribe {
+    pos_data: Vec<SwapBalanceAndPositionPosDataSubscribe>,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SwapBalanceAndPositionPosDataSubscribe {
+    pos_id: String,
+    trade_id: String,
+    inst_id: String,
+    inst_type: String,
+    mgn_mode: String,
+    pos_side: String,
+    pos: Decimal,
+    ccy: String,
+    pos_ccy: String,
+    avg_px: Decimal,
+    u_time: String,
+}
+
 #[derive(Debug, Deserialize, Serialize)]
 #[serde(rename_all = "camelCase")]
 struct SwapPositionSubscribe {
@@ -141,15 +164,21 @@ pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialD
     }
 }
 
+
 // 处理position信息
 pub fn handle_position(res_data: ResponseData, ct_val: Decimal) -> Vec<Position> {
     let res_data_str = res_data.data;
-    let data_list: SwapPositionSubscribe = serde_json::from_str(&res_data_str).unwrap();
-    let position_data = data_list.data;
-    position_data.iter().map(|item| format_position_item(item, ct_val)).collect()
+    let data_list: Vec<SwapBalanceAndPositionSubscribe> = serde_json::from_str(&res_data_str).unwrap();
+
+    let position_data = data_list[0].pos_data.clone();
+    if position_data.len() > 0 {
+        position_data.iter().map(|item| format_position_item(item, ct_val)).collect()
+    } else {
+        vec![]
+    }
 }
 
-pub fn format_position_item(value: &SwapPosition, ct_val: Decimal) -> Position {
+pub fn format_position_item(value: &SwapBalanceAndPositionPosDataSubscribe, ct_val: Decimal) -> Position {
     let position_mode = match value.pos_side.as_str() {
         "long" => { PositionModeEnum::Long }
         "short" => { PositionModeEnum::Short }
@@ -157,12 +186,12 @@ pub fn format_position_item(value: &SwapPosition, ct_val: Decimal) -> Position {
     };
     Position {
         symbol: value.inst_id.replace("-SWAP", ""),
-        margin_level: value.lever,
+        margin_level: Decimal::ZERO,
         amount: value.pos * ct_val,
         frozen_amount: Decimal::ZERO,
         price: value.avg_px,
-        profit: value.upl,
+        profit: Decimal::ZERO,
         position_mode,
-        margin: if value.margin != "" { Decimal::from_str(&value.margin).unwrap() } else { Decimal::ZERO },
+        margin: Decimal::ZERO,
     }
 }

+ 21 - 3
standard/src/okx_swap.rs

@@ -13,7 +13,7 @@ use tracing::{debug, error};
 use exchanges::okx_swap_rest::OkxSwapRest;
 use global::trace_stack::TraceStack;
 use crate::exchange::ExchangeEnum;
-use crate::{Account, Market, okx_handle, Order, OrderCommand, Platform, Position, Ticker, utils};
+use crate::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, Ticker, utils};
 
 /// Okx交易所账户信息请求数据结构
 /// - 接口`"/api/v5/account/balance"`
@@ -567,7 +567,7 @@ impl Platform for OkxSwap {
             let res_data_str = &res_data.data;
             let data_list: Vec<SwapPosition> = serde_json::from_str(&res_data_str).unwrap();
             let position_info: Vec<&SwapPosition> = data_list.iter().filter(|item| item.inst_id == symbol_format).collect();
-            let result = position_info.iter().map(|item| okx_handle::format_position_item(item, ct_val)).collect();
+            let result = position_info.iter().map(|item| format_position_item(item, ct_val)).collect();
             Ok(result)
         } else {
             Err(Error::new(ErrorKind::Other, res_data.to_string()))
@@ -579,7 +579,7 @@ impl Platform for OkxSwap {
         if res_data.code == "200" {
             let res_data_str = &res_data.data;
             let data_list: Vec<SwapPosition> = serde_json::from_str(&res_data_str).unwrap();
-            let result = data_list.iter().map(|item| okx_handle::format_position_item(item, Decimal::ONE)).collect();
+            let result = data_list.iter().map(|item| format_position_item(item, Decimal::ONE)).collect();
             Ok(result)
         } else {
             Err(Error::new(ErrorKind::Other, res_data.to_string()))
@@ -1051,3 +1051,21 @@ pub fn format_order_item(data: SwapOrder, ct_val: Decimal) -> Order {
     debug!("format-order-end, okx_swap");
     result
 }
+
+pub fn format_position_item(value: &SwapPosition, ct_val: Decimal) -> Position {
+    let position_mode = match value.pos_side.as_str() {
+        "long" => { PositionModeEnum::Long }
+        "short" => { PositionModeEnum::Short }
+        _ => { PositionModeEnum::Both }
+    };
+    Position {
+        symbol: value.inst_id.replace("-SWAP", ""),
+        margin_level: value.lever,
+        amount: value.pos * ct_val,
+        frozen_amount: Decimal::ZERO,
+        price: value.avg_px,
+        profit: value.upl,
+        position_mode,
+        margin: if value.margin != "" { Decimal::from_str(&value.margin).unwrap() } else { Decimal::ZERO },
+    }
+}

+ 1 - 0
strategy/Cargo.toml

@@ -19,6 +19,7 @@ tracing-subscriber = "0.3.17"
 standard = { path = "../standard" }
 global = { path = "../global" }
 exchanges = { path = "../exchanges" }
+reqwest = { version = "0.11.14", features = ["json"] }
 ndarray = "0.15.6"
 argmin = "0.7"
 

+ 1 - 1
strategy/src/binance_spot.rs

@@ -96,7 +96,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
                  min_sell: &mut Decimal,
                  data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(data.time);
+    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {

+ 24 - 21
strategy/src/binance_usdt_swap.rs

@@ -10,7 +10,7 @@ use crate::model::{OriginalTradeBa};
 use crate::quant::Quant;
 use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
 use futures_util::StreamExt;
-use crate::exchange_disguise::on_special_depth;
+use crate::exchange_disguise::{on_special_depth, on_trade};
 
 // 参考 币安 合约 启动
 pub(crate) async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>,
@@ -28,7 +28,7 @@ pub(crate) async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>,
         ws.set_subscribe(vec![
             // BinanceSwapSubscribeType::PuDepth20levels100ms,
             BinanceSwapSubscribeType::PuBookTicker,
-            // BinanceSwapSubscribeType::PuAggTrade
+            BinanceSwapSubscribeType::PuAggTrade
         ]);
 
         //读取数据
@@ -55,11 +55,11 @@ pub(crate) async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>,
 
 async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
                  update_flag_u: &mut Decimal,
-                 max_buy: &mut Decimal,
-                 min_sell: &mut Decimal,
+                 _max_buy: &mut Decimal,
+                 _min_sell: &mut Decimal,
                  data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(data.time);
+    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {
@@ -68,22 +68,25 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
 
     if data.channel == "aggTrade" {
         let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
-        let str = data.label.clone();
-        let mut quant = bot_arc_clone.lock().await;
-        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
-            *max_buy = Decimal::ZERO;
-            *min_sell = Decimal::ZERO;
-            quant.is_update.remove(str.as_str());
-        }
-        if trade.p > *max_buy || *max_buy == Decimal::ZERO {
-            *max_buy = trade.p
-        }
-        if trade.p < *min_sell || *min_sell == Decimal::ZERO {
-            *min_sell = trade.p
-        }
-        {
-            quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
-        }
+        // let name = data.label.clone();
+
+        // 订单流逻辑
+        on_trade(trade.clone(), bot_arc_clone.clone()).await;
+
+        // // 原本的逻辑
+        // let mut quant = bot_arc_clone.lock().await;
+        // if quant.is_update.contains_key(&data.label) && *quant.is_update.get(name.as_str()).unwrap() {
+        //     *max_buy = Decimal::ZERO;
+        //     *min_sell = Decimal::ZERO;
+        //     quant.is_update.remove(name.as_str());
+        // }
+        // if trade.p > *max_buy || *max_buy == Decimal::ZERO {
+        //     *max_buy = trade.p
+        // }
+        // if trade.p < *min_sell || *min_sell == Decimal::ZERO {
+        //     *min_sell = trade.p
+        // }
+        // quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
     } else if data.channel == "bookTicker" {
         trace_stack.on_before_format();
         // 将ticker数据转换为模拟深度

+ 2 - 2
strategy/src/bitget_spot.rs

@@ -130,7 +130,7 @@ pub async fn bitget_spot_run(bool_v1 :Arc<AtomicBool>,
 async fn on_private_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, data: ResponseData) {
     let mut trace_stack = TraceStack::default();
 
-    trace_stack.on_after_network(data.time);
+    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {
@@ -179,7 +179,7 @@ async fn on_private_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, data
 async fn on_public_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
     let mut trace_stack = TraceStack::default();
 
-    trace_stack.on_after_network(data.time);
+    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {

+ 57 - 2
strategy/src/exchange_disguise.rs

@@ -11,6 +11,8 @@ use crate::bitget_spot::bitget_spot_run;
 use crate::gate_swap::gate_swap_run;
 use crate::kucoin_spot::kucoin_spot_run;
 use crate::kucoin_swap::kucoin_swap_run;
+use crate::model::OriginalTradeBa;
+use crate::okx_usdt_swap::okex_swap_run;
 use crate::quant::Quant;
 
 // 交易交易所启动
@@ -28,8 +30,11 @@ pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>,
         "kucoin_usdt_swap" => {
             kucoin_swap_run(bool_v1, true, quant_arc, name, symbols, is_colo, exchange_params).await;
         },
+        "okex_usdt_swap" => {
+            okex_swap_run(bool_v1,true, quant_arc, name, symbols, is_colo, exchange_params).await;
+        },
         "bitget_spot" => {
-            bitget_spot_run(bool_v1,false, quant_arc, name, symbols, is_colo, exchange_params).await;
+            bitget_spot_run(bool_v1,true, quant_arc, name, symbols, is_colo, exchange_params).await;
         }
         _ => {
             let msg = format!("不支持的交易交易所:{}", exchange_name);
@@ -56,6 +61,9 @@ pub async fn run_reference_exchange(bool_v1: Arc<AtomicBool>,
         "gate_usdt_swap" => {
             gate_swap_run(bool_v1, false, quant_arc, name, symbols, is_colo, exchange_params).await;
         },
+        "okex_usdt_swap" => {
+            okex_swap_run(bool_v1, false, quant_arc, name, symbols, is_colo, exchange_params).await;
+        },
         "kucoin_usdt_swap" => {
             kucoin_swap_run(bool_v1, false, quant_arc, name, symbols, is_colo, exchange_params).await;
         },
@@ -86,7 +94,54 @@ pub async fn on_special_depth(bot_arc: Arc<Mutex<Quant>>,
     }
 }
 
-pub async fn on_trade() {}
+pub async fn on_trade(trade: OriginalTradeBa,
+                      bot_arc_clone: Arc<Mutex<Quant>>) {
+    let mut bot = bot_arc_clone.lock().await;
+    // 1. 塞入数据到bot
+    bot.trades.push(trade.clone());
+    // 2. 长度检查
+    if bot.trades.len() > bot.recall_max_count {
+        bot.trades.remove(0);
+    }
+    // 3. 如果少于100条,不进行判断
+    if bot.trades.len() < 100 {
+        return;
+    }
+    // 求最近的多空总和
+    let mut long_sum = Decimal::ZERO;
+    let mut short_sum = Decimal::ZERO;
+    let last_trade_t = trade.t.clone();
+    let mut rev = bot.trades.clone();
+    rev.reverse();
+    for trade_o in rev {
+        // 如果该元素已过期,我们是按时间顺序插入的,说明前面的应该都过期了,跳出循环,停止检测
+        if trade_o.t < last_trade_t - bot.recall_time {
+            continue;
+        }
+
+        // 卖出订单
+        if trade_o.m {
+            short_sum += trade_o.q;
+        } else {
+            long_sum += trade_o.q;
+        }
+    }
+
+    // 做多主动性
+    if (long_sum / (long_sum + short_sum)) > bot.long_volume_rate {
+        if bot.side != "long".to_string() {
+            bot.side = "long".to_string();
+        }
+    } else if (short_sum / (long_sum + short_sum)) > bot.short_volume_rate {
+        if bot.side != "short".to_string() {
+            bot.side = "short".to_string();
+        }
+    } else {
+        if bot.side != "normal".to_string() {
+            bot.side = "normal".to_string();
+        }
+    }
+}
 
 pub async fn on_order() {}
 

+ 1 - 1
strategy/src/gate_swap.rs

@@ -101,7 +101,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
                  min_sell: &mut Decimal,
                  data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(data.time);
+    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {

+ 1 - 1
strategy/src/kucoin_spot.rs

@@ -77,7 +77,7 @@ async fn on_kucoin_spot_data(bot_arc_clone: Arc<Mutex<Quant>>,
                              min_sell: &mut Decimal,
                              data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(data.time);
+    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {

+ 1 - 1
strategy/src/kucoin_swap.rs

@@ -116,7 +116,7 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
                  min_sell: &mut Decimal,
                  data: ResponseData) {
     let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(data.time);
+    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
     trace_stack.on_before_quant();
 
     if data.code != "200".to_string() {

+ 2 - 1
strategy/src/lib.rs

@@ -13,4 +13,5 @@ mod kucoin_spot;
 mod bitget_spot;
 mod predictor_new;
 mod instant_volatility_indicator;
-mod ring_buffer;
+mod ring_buffer;
+mod okx_usdt_swap;

+ 34 - 2
strategy/src/model.rs

@@ -42,7 +42,8 @@ pub struct TraderMsg {
     pub orders: HashMap<String, OrderInfo>,
     pub ref_price: Vec<Vec<Decimal>>,
     pub market: Vec<Decimal>,
-    pub predict: Decimal
+    pub predict: Decimal,
+    pub side: String,
 }
 
 impl TraderMsg {
@@ -60,6 +61,7 @@ impl TraderMsg {
             ref_price: Default::default(),
             market: vec![],
             predict: Default::default(),
+            side: "normal".to_string(),
         }
     }
 }
@@ -93,10 +95,15 @@ pub struct OrderInfo {
     pub trace_stack: TraceStack
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Clone, Debug)]
 pub struct OriginalTradeBa {
     // 成交价格
     pub p: Decimal,
+    // 成交数量
+    pub q: Decimal,
+    // 成交时间
+    #[serde(rename = "T")]
+    pub t: Decimal,
     // 买方是否是做市方。如true,则此次成交是一个主动卖出单,否则是一个主动买入单。
     pub m: bool
 }
@@ -107,6 +114,14 @@ pub struct OriginalTradeGa {
     pub price: Decimal
 }
 
+#[derive(Serialize, Deserialize)]
+pub struct OriginalTradeOK {
+    // 数量
+    pub sz: Decimal,
+    // 价格
+    pub px: Decimal
+}
+
 #[allow(non_snake_case)]
 #[derive(Serialize, Deserialize, Debug)]
 pub struct OriginalTicker {
@@ -122,3 +137,20 @@ pub struct OriginalTicker {
     pub A: Decimal
 }
 
+#[allow(non_snake_case)]
+#[derive(Serialize, Deserialize, Debug)]
+pub struct DealRecord {
+    // 参考价
+    pub refPrice: String,
+    // 挂单价
+    pub regPrice: String,
+    // 买单最优挂单数量
+    pub num: String,
+    // 触发时间
+    pub triggerTime: i64,
+    // 机器名称
+    pub robotName: String,
+    // 方向
+    pub side: String
+}
+

+ 208 - 0
strategy/src/okx_usdt_swap.rs

@@ -0,0 +1,208 @@
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use futures_util::StreamExt;
+use rust_decimal::Decimal;
+use tokio::sync::Mutex;
+use exchanges::okx_swap_ws::{OkxSwapLogin, OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
+use exchanges::response_base::ResponseData;
+use global::trace_stack::TraceStack;
+use standard::exchange::ExchangeEnum::OkxSwap;
+use crate::exchange_disguise::on_special_depth;
+use crate::model::{OrderInfo, OriginalTradeOK};
+use crate::quant::Quant;
+
+pub async fn okex_swap_run(bool_v1: Arc<AtomicBool>,
+                           is_trade: bool,
+                           _quant_arc: Arc<Mutex<Quant>>,
+                           name: String,
+                           symbols: Vec<String>,
+                           is_colo: bool,
+                           exchange_params: BTreeMap<String, String>) {
+    // 启动公共频道
+    let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
+    let (read_tx_public, mut read_rx_public) = futures_channel::mpsc::unbounded();
+
+    let mut ws_public = OkxSwapWs::new_label(name.clone(), is_colo, None, OkxSwapWsType::Public);
+    ws_public.set_symbols(symbols.clone());
+    if is_trade {
+        ws_public.set_subscribe(vec![
+            OkxSwapSubscribeType::PuBooks5
+        ])
+    } else {
+        ws_public.set_subscribe(vec![
+            OkxSwapSubscribeType::PuBooks50L2tbt
+        ])
+    }
+    // 挂起公共ws
+    let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
+    let bool_clone_public = Arc::clone(&bool_v1);
+    tokio::spawn(async move {
+        ws_public.ws_connect_async(bool_clone_public,
+                                   &write_tx_am_public,
+                                   write_rx_public,
+                                   read_tx_public).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+    });
+    // 消费数据
+    let bot_arc_clone = _quant_arc.clone();
+    // 接收public数据
+    tokio::spawn(async move {
+        // ticker
+        let mut update_flag_u = Decimal::ZERO;
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+
+        loop {
+            if let Some(public_data) = read_rx_public.next().await {
+                on_public_data(bot_arc_clone.clone(),
+                               &mut update_flag_u,
+                               &mut max_buy,
+                               &mut min_sell,
+                               public_data).await;
+            }
+        }
+    });
+
+    // 交易交易所需要启动私有ws
+    if is_trade {
+        let (write_tx_private, write_rx_private) = futures_channel::mpsc::unbounded();
+        let (read_tx_private, mut read_rx_private) = futures_channel::mpsc::unbounded();
+        let auth = Some(parse_btree_map_to_okx_swap_login(exchange_params));
+
+        let mut ws_private = OkxSwapWs::new_label(name.clone(), is_colo, auth, OkxSwapWsType::Private);
+        ws_private.set_symbols(symbols.clone());
+        ws_private.set_subscribe(vec![
+            OkxSwapSubscribeType::PrBalanceAndPosition,
+            OkxSwapSubscribeType::PrAccount("USDT".to_string()),
+            OkxSwapSubscribeType::PrOrders
+        ]);
+
+
+        // 挂起私有ws
+        let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
+        let bool_clone_private = Arc::clone(&bool_v1);
+        tokio::spawn(async move {
+            ws_private.ws_connect_async(bool_clone_private,
+                                &write_tx_am_private,
+                                write_rx_private,
+                                read_tx_private).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        });
+
+        // 消费数据
+        let bot_arc_clone = _quant_arc.clone();
+        // 接收private信息
+        tokio::spawn(async move {
+            let ct_val = _quant_arc.clone().lock().await.platform_rest.get_self_market().ct_val;
+            let run_symbol = symbols.clone()[0].clone();
+            loop {
+                if let Some(private_data) = read_rx_private.next().await {
+                    on_private_data(bot_arc_clone.clone(),
+                                    ct_val,
+                                    private_data,
+                                    run_symbol.clone()).await;
+                }
+            }
+        });
+    }
+}
+
+async fn on_private_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, data: ResponseData, run_symbol: String) {
+    let mut trace_stack = TraceStack::default();
+
+    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
+    trace_stack.on_before_quant();
+
+    if data.code != "200".to_string() {
+        return;
+    }
+    if data.channel == "orders" {
+        trace_stack.on_before_format();
+        let orders = standard::handle_info::HandleSwapInfo::handle_order(OkxSwap, data.clone(), ct_val);
+        trace_stack.on_after_format();
+        let mut order_infos:Vec<OrderInfo> = Vec::new();
+        for order in orders.order {
+            if order.status == "NULL" {
+                continue;
+            }
+            let order_info = OrderInfo {
+                symbol: "".to_string(),
+                amount: order.amount.abs(),
+                side: "".to_string(),
+                price: order.price,
+                client_id: order.custom_id,
+                filled_price: order.avg_price,
+                filled: order.deal_amount.abs(),
+                order_id: order.id,
+                local_time: 0,
+                create_time: 0,
+                status: order.status,
+                fee: Default::default(),
+                trace_stack: Default::default(),
+            };
+            order_infos.push(order_info);
+        }
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_order(order_infos, trace_stack);
+        }
+    } else if data.channel == "balance_and_position" {
+        let positions = standard::handle_info::HandleSwapInfo::handle_position(OkxSwap,data, ct_val);
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_position(positions);
+        }
+    } else if data.channel == "account" {
+        let account = standard::handle_info::HandleSwapInfo::handle_account_info(OkxSwap, data.clone(), run_symbol.clone());
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_equity(account);
+        }
+    }
+}
+
+async fn on_public_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
+    let mut trace_stack = TraceStack::default();
+    trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
+    trace_stack.on_before_quant();
+
+    if data.code != "200".to_string() {
+        return;
+    }
+    if data.channel == "tickers" {
+        trace_stack.on_before_format();
+        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(OkxSwap, data.clone());
+        trace_stack.on_after_format();
+        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
+    } else if data.channel == "trades" {
+        let mut quant = bot_arc_clone.lock().await;
+        let str = data.label.clone();
+        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
+            *max_buy = Decimal::ZERO;
+            *min_sell = Decimal::ZERO;
+            quant.is_update.remove(str.as_str());
+        }
+        let trades: Vec<OriginalTradeOK> = serde_json::from_str(data.data.as_str()).unwrap();
+        for trade in trades {
+            if trade.px > *max_buy || *max_buy == Decimal::ZERO{
+                *max_buy = trade.px
+            }
+            if trade.px < *min_sell || *min_sell == Decimal::ZERO{
+                *min_sell = trade.px
+            }
+        }
+        quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+    } else if data.channel == "books5" {
+        trace_stack.on_before_format();
+        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(OkxSwap, data.clone());
+        trace_stack.on_after_format();
+        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
+    }
+}
+
+fn parse_btree_map_to_okx_swap_login(exchange_params: BTreeMap<String, String>) -> OkxSwapLogin {
+    OkxSwapLogin {
+        api_key: exchange_params.get("access_key").unwrap().clone(),
+        secret_key: exchange_params.get("secret_key").unwrap().clone(),
+        passphrase: exchange_params.get("pass_key").unwrap().clone(),
+    }
+}

+ 6 - 4
strategy/src/quant.rs

@@ -20,11 +20,10 @@ use global::public_params::{ASK_PRICE_INDEX, BID_PRICE_INDEX, LENGTH};
 use global::trace_stack::TraceStack;
 use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker};
 use standard::exchange::{Exchange};
-use standard::exchange::ExchangeEnum::{BinanceSpot, BinanceSwap, BitgetSpot, GateSpot, GateSwap, KucoinSwap};
-use crate::instant_volatility_indicator::InstantVolatilityIndicator;
+use standard::exchange::ExchangeEnum::{BinanceSpot, BinanceSwap, BitgetSpot, GateSpot, GateSwap, KucoinSwap, OkxSwap};
 
-use crate::model::{LocalPosition, OrderInfo, TokenParam, TraderMsg};
-use crate::predictor_new::PredictorNew;
+use crate::model::{LocalPosition, OrderInfo, OriginalTradeBa, TokenParam, TraderMsg};
+use crate::predictor::Predictor;
 use crate::strategy::Strategy;
 use crate::utils;
 use crate::utils::clip;
@@ -217,6 +216,9 @@ impl Quant {
                 "bitget_spot" => {
                     Exchange::new(BitgetSpot, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
                 }
+                "okex_usdt_swap" => {
+                    Exchange::new(OkxSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
                 _ => {
                     error!("203未找到对应的交易所rest枚举!");
                     panic!("203未找到对应的交易所rest枚举!");

+ 86 - 8
strategy/src/strategy.rs

@@ -1,15 +1,18 @@
 use std::cmp::{max, min};
 use std::collections::HashMap;
 use std::ops::{Div, Mul};
+use std::str::FromStr;
 use chrono::Utc;
 use rust_decimal::Decimal;
 use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
 use rust_decimal_macros::dec;
-use crate::model::{LocalPosition, OrderInfo, TraderMsg};
+use crate::model::{DealRecord, LocalPosition, OrderInfo, TraderMsg};
 use crate::utils;
 use tracing::{info, error, warn, instrument};
+use reqwest::{Client};
+use tokio::spawn;
 use global::params::Params;
-use standard::OrderCommand;
+use standard::{OrderCommand};
 
 #[derive(Debug)]
 pub struct Strategy {
@@ -100,6 +103,9 @@ pub struct Strategy {
     pub post_side: i64,                                             // 交易方向
     pub trade_vol_24h_w: Decimal,                                   // 24小时成交额(单位:万)
     pub grid: Decimal,                                              // 网格数量
+
+    // 订单流相关
+    pub side: String,                                               // 当前主动性方向
 }
 
 impl Strategy {
@@ -191,6 +197,7 @@ impl Strategy {
             post_side: 0,
             trade_vol_24h_w: Default::default(),
             grid: Decimal::from(params.grid),
+            side: "normal".to_string(),
         };
 
         // 交易名字
@@ -336,6 +343,8 @@ impl Strategy {
             // debug!(?max_pos_rate, ?self.max_pos_rate);
         }
 
+        self.side = trader_msg.side.clone();
+
         return true;
     }
 
@@ -409,7 +418,7 @@ impl Strategy {
                              self.local_profit, self.profit, long_pos_leverage, self.long_pos_bias, short_pos_leverage, self.short_pos_bias).as_str());
         msg.push_str(format!("[请求 {:?}, 上限{:?}次/10秒], ", self._req_num_per_window, self.limit_order_requests_num).as_str());
         msg.push_str(format!("[当前参数, 开仓 {:?}, 平仓 {:?}, 方向 {:?}, 参考 {:?}, 模式 {:?}], ",
-                             self.trade_open_dist, self.trade_close_dist, self.post_side, self.ref_name[self.ref_index], self.maker_mode).as_str());
+                             self.trade_open_dist, self.trade_close_dist, self.side, self.ref_name[self.ref_index], self.maker_mode).as_str());
         msg.push_str(format!("[挂单列表,共{:?}单, ", o_num).as_str());
         for (_, order) in &self.local_orders {
             let mut order_value = order.amount * self.mp;
@@ -610,10 +619,17 @@ impl Strategy {
 
             // 判断是否在本地挂单表中
             if let Some(order) = order_some {
+                let is_side_error = (order.side == "kk" && self.side == "long") || (order.side == "kd" && self.side == "short");
+
                 // 如果订单创建时间大于100ms,才能有撤单操作
-                if self.local_time - order.create_time < 100 {
+                if  self.local_time - order.create_time < 100 {
                     need_limit_cancel = false;
                 }
+
+                // 如果方向有误,直接撤单
+                if is_side_error {
+                    need_limit_cancel = true;
+                }
             }
 
             if need_limit_cancel {
@@ -1050,7 +1066,7 @@ impl Strategy {
             // 开多订单处理
             if order.side == "kd".to_string() {
                 // 在价格范围内时不处理
-                if order.price <= long_upper && order.price >= long_lower {
+                if order.price <= long_upper && order.price >= long_lower && self.side != "short".to_string() {
                     continue
                 }
                 // debug!(?key, ?order.price, ?long_upper, ?long_lower);
@@ -1060,7 +1076,7 @@ impl Strategy {
             // 开空订单处理
             if order.side == "kk".to_string() {
                 // 在价格范围内时不处理
-                if order.price >= short_lower && order.price <= short_upper {
+                if order.price >= short_lower && order.price <= short_upper && self.side != "long".to_string() {
                     continue
                 }
                 // debug!(?key, ?order.price, ?short_lower, ?short_upper);
@@ -1170,7 +1186,7 @@ impl Strategy {
 
         // debug!(?self.post_side);
         // 挂多单
-        if self.post_side >= 0 {
+        if self.post_side >= 0 && self.side != "short".to_string() {
             // debug!(?buy_price_list);
             if buy_price_list.len() == 0 {
                 let mut target_buy_price = (long_upper + long_lower) * dec!(0.5);
@@ -1197,7 +1213,7 @@ impl Strategy {
             }
         }
         // 挂空单
-        if self.post_side <= 0 {
+        if self.post_side <= 0 && self.side != "long".to_string() {
             // debug!(?sell_price_list);
             if sell_price_list.len() == 0 {
                 let mut target_sell_price = (short_lower + short_upper) * dec!(0.5);
@@ -1273,10 +1289,72 @@ impl Strategy {
         self._refresh_request_limit();                      // 刷新频率限制
         self._update_request_num(&mut command);             // 统计刷新频率
 
+        if command.limits_open.len() > 0 || command.limits_close.len() > 0 {
+            let time = chrono::Utc::now().timestamp_millis();
+            let name = self.params.account_name.clone();
+            // 参考卖价
+            let ref_ap = self.ref_ap;
+            // 参考买价
+            let ref_bp = self.ref_bp;
+            let limits_open = command.limits_open.clone();
+            let limits_close = command.limits_close.clone();
+            spawn(async move {
+                let param_list = paras_limit_command(name.clone(), time.clone(), ref_ap.clone(), ref_bp.clone(), limits_open, limits_close);
+                let param_json_obj = serde_json::to_string(&param_list).unwrap();
+                market_warehouse_request(param_json_obj).await;
+            });
+        }
+
         return command;
     }
 }
 
+async fn market_warehouse_request(body_params: String) {
+    /****请求接口与 地址*/
+    let url = "http://as.skyfffire.com:8848/basic/saveDealRecords";
+
+    let client = Client::new();
+    let req = client.post(url).header("auth", "43626546liangjiang")
+        .header("Content-Type", "application/json").body(body_params.clone());
+
+    match req.send().await {
+        Ok(_) => {}
+        Err(_) => {}
+    };
+    // if !response.status().is_success()  {
+    //     error!("行情数据------仓库挂单数据存储失败--------!{}", response.status());
+    //     error!(body_params);
+    // }
+}
+
+fn paras_limit_command (robot_name: String, time: i64, ref_ap: Decimal, ref_bp: Decimal, limits_open: HashMap<String, Vec<String>>, limits_close: HashMap<String, Vec<String>>) -> Vec<DealRecord>{
+    let mut limits = HashMap::new();
+    limits.extend(limits_open);
+    limits.extend(limits_close);
+    let mut list: Vec<DealRecord> = Vec::with_capacity(limits.len());
+    for item in limits.keys() {
+        let item_clone = item.clone();
+        let value = limits[&item_clone].clone();
+        let amount = Decimal::from_str(value.get(0).unwrap_or(&"0".to_string())).unwrap();
+        let side = value.get(1).unwrap();
+        let price = Decimal::from_str(value.get(2).unwrap_or(&"0".to_string())).unwrap();
+        let mut ref_price = ref_ap;
+        if "kd" == side {
+            ref_price = ref_bp;
+        }
+        let deal_recode = DealRecord {
+            refPrice: ref_price.to_string(),
+            regPrice: price.to_string(),
+            num: amount.to_string(),
+            triggerTime: time,
+            robotName: robot_name.clone(),
+            side: side.to_string(),
+        };
+        list.push(deal_recode);
+    }
+    return list;
+}
+
 #[cfg(test)]
 mod tests {
     use rust_decimal::Decimal;

+ 5 - 1
strategy/src/utils.rs

@@ -61,6 +61,8 @@ pub fn get_limit_requests_num_per_second(exchange: String) -> i64 {
         return public_params::COINEX_USDT_SWAP_LIMIT * public_params::RATIO;
     } else if exchange.eq("coinex_spot") {
         return public_params::COINEX_SPOT_LIMIT * public_params::RATIO;
+    } else if exchange.eq("okex_usdt_swap") {
+        return public_params::OKEX_USDT_SWAP_LIMIT * public_params::RATIO;
     } else if exchange.eq("bitget_spot") {
         return public_params::BITGET_USDT_SPOT_LIMIT * public_params::RATIO;
     } else {
@@ -87,6 +89,8 @@ pub fn get_limit_order_requests_num_per_second(exchange: String) -> i64 {
         return public_params::COINEX_USDT_SWAP_LIMIT
     } else if exchange.eq("coinex_spot") {
         return public_params::COINEX_SPOT_LIMIT
+    } else if exchange.eq("okex_usdt_swap") {
+        return public_params::OKEX_USDT_SWAP_LIMIT
     } else if exchange.eq("bitget_spot") {
         return public_params::BITGET_USDT_SPOT_LIMIT
     } else {
@@ -144,7 +148,7 @@ mod tests {
         println!("timestamp: {}", now.timestamp());
         println!("timestamp_millis: {}", now.timestamp_millis());
         println!("timestamp_micros: {}", now.timestamp_micros());
-        println!("timestamp_nanos: {}", now.timestamp_nanos());
+        println!("timestamp_nanos: {}", now.timestamp_nanos_opt().unwrap());
     }
 
 }