Selaa lähdekoodia

kucoin 现货,参考交易所,交易交易所的对接,仍需调整

875428575@qq.com 2 vuotta sitten
vanhempi
commit
f95708c6af

+ 0 - 1
exchanges/src/binance_spot_ws.rs

@@ -25,7 +25,6 @@ pub enum BinanceSpotSubscribeType {
     PuBookTicker,
     PuAggTrade,
     PuDepth20levels100ms,
-
 }
 
 #[derive(Clone)]

+ 452 - 0
exchanges/src/kucoin_spot_rest.rs

@@ -0,0 +1,452 @@
+use std::collections::BTreeMap;
+use reqwest::header::HeaderMap;
+use hmac::{Hmac, Mac, NewMac};
+use reqwest::{Client};
+use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
+use rust_decimal_macros::dec;
+use sha2::Sha256;
+use tracing::{info, trace};
+use crate::http_tool::RestTool;
+use crate::response_base::ResponseData;
+
+#[derive(Clone, Debug)]
+pub struct KucoinSpotRest {
+    pub label: String,
+    base_url: String,
+    client: reqwest::Client,
+    /*******参数*/
+    //是否需要登陆
+    //登陆所需参数
+    login_param: BTreeMap<String, String>,
+    delays: Vec<i64>,
+    max_delay: i64,
+    avg_delay: Decimal,
+
+}
+
+impl KucoinSpotRest {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+
+    pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> KucoinSpotRest
+    {
+        return KucoinSpotRest::new_lable("default-KucoinSpotRest".to_string(), is_colo, login_param);
+    }
+    pub fn new_lable(label: String, is_colo: bool, login_param: BTreeMap<String, String>) -> KucoinSpotRest {
+        let base_url = if is_colo {
+            "https://api.kucoin.com".to_string()
+        } else {
+            "https://api.kucoin.com".to_string()
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道",base_url);
+        } else {
+            info!("走普通通道:{}",base_url);
+        }
+        /*****返回结构体*******/
+        KucoinSpotRest {
+            label,
+            base_url,
+            client: Client::new(),
+            login_param,
+            delays: vec![],
+            max_delay: 0,
+            avg_delay: dec!(0.0),
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************rest请求函数********************************************************/
+    /*******************************************************************************************************/
+    pub async fn get_server_time(&mut self) -> ResponseData {
+        let params = serde_json::json!({
+         });
+        let data = self.request("GET".to_string(),
+                                "/api/v1".to_string(),
+                                "/timestamp".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //获取订单
+    pub async fn get_order(&mut self) -> ResponseData {
+        let params = serde_json::json!({
+             "status":"active",
+             "tradeType":"TRADE",
+             "type":"limit"
+         });
+        let data = self.request("GET".to_string(),
+                                "/api/v1".to_string(),
+                                "/orders".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //獲取行情
+    pub async fn get_level1(&mut self, symbol: String) -> ResponseData {
+        let params = serde_json::json!({
+             "symbol":symbol
+         });
+        let data = self.request("GET".to_string(),
+                                "/api/v1".to_string(),
+                                "/market/orderbook/level1".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //通過orderId获取訂單詳情
+    pub async fn get_order_by_order_id(&mut self, order_id: String) -> ResponseData {
+        let params = serde_json::json!({
+         });
+        let data = self.request("GET".to_string(),
+                                "/api/v1".to_string(),
+                                format!("/orders/{}", order_id),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //通過clientOid獲取訂單詳情
+    pub async fn get_order_by_client_id(&mut self, client_id: String) -> ResponseData {
+        let params = serde_json::json!({
+         });
+        let data = self.request("GET".to_string(),
+                                "/api/v1".to_string(),
+                                format!("/order/client-order/{}", client_id),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //獲取賬戶列表 - 現貨/槓桿/現貨高頻
+    pub async fn get_accounts(&mut self, currency: String) -> ResponseData {
+        let params = serde_json::json!({
+            "currency":currency,
+            "type":"trade",
+         });
+        let data = self.request("GET".to_string(),
+                                "/api/v1".to_string(),
+                                format!("/accounts"),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //獲取交易對列表
+    pub async fn get_symbols(&mut self) -> ResponseData {
+        let params = serde_json::json!({
+         });
+        let data = self.request("GET".to_string(),
+                                "/api/v2".to_string(),
+                                format!("/symbols"),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //通過orderId撤單
+    pub async fn cancel_order_by_order_id(&mut self, order_id: String) -> ResponseData {
+        let params = serde_json::json!({
+         });
+        let data = self.request("DELETE".to_string(),
+                                "/api/v1".to_string(),
+                                format!("/orders/{}", order_id),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //通過clientOid撤單
+    pub async fn cancel_order_by_client_id(&mut self, client_id: String) -> ResponseData {
+        let params = serde_json::json!({
+         });
+        let data = self.request("DELETE".to_string(),
+                                "/api/v1".to_string(),
+                                format!("/order/client-order/{}", client_id),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+    //获取合约令牌-公共
+    pub async fn get_public_token(&mut self) -> ResponseData {
+        let params = serde_json::json!({});
+        let data = self.request("POST".to_string(),
+                                "/api/v1".to_string(),
+                                "/bullet-public".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //获取合约令牌-私有
+    pub async fn get_private_token(&mut self) -> ResponseData {
+        let params = serde_json::json!({});
+        let data = self.request("POST".to_string(),
+                                "/api/v1".to_string(),
+                                "/bullet-private".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    pub fn get_delays(&self) -> Vec<i64> {
+        self.delays.clone()
+    }
+    pub fn get_avg_delay(&self) -> Decimal {
+        self.avg_delay.clone()
+    }
+    pub fn get_max_delay(&self) -> i64 {
+        self.max_delay.clone()
+    }
+    fn get_delay_info(&mut self) {
+        let last_100 = if self.delays.len() > 100 {
+            self.delays[self.delays.len() - 100..].to_vec()
+        } else {
+            self.delays.clone()
+        };
+
+        let max_value = last_100.iter().max().unwrap();
+        if max_value.clone().to_owned() > self.max_delay {
+            self.max_delay = max_value.clone().to_owned();
+        }
+
+        let sum: i64 = last_100.iter().sum();
+        let sum_v = Decimal::from_i64(sum).unwrap();
+        let len_v = Decimal::from_u64(last_100.len() as u64).unwrap();
+        self.avg_delay = (sum_v / len_v).round_dp(1);
+        self.delays = last_100.clone().into_iter().collect();
+    }
+    //调用请求
+    pub async fn request(&mut self,
+                         method: String,
+                         prefix_url: String,
+                         request_url: String,
+                         is_login: bool,
+                         params: String) -> ResponseData
+    {
+        trace!("login_param:{:?}", self.login_param);
+        //解析账号信息
+        let mut access_key = "".to_string();
+        let mut secret_key = "".to_string();
+        let mut pass_key = "".to_string();
+        if self.login_param.contains_key("access_key") {
+            access_key = self.login_param.get("access_key").unwrap().to_string();
+        }
+        if self.login_param.contains_key("secret_key") {
+            secret_key = self.login_param.get("secret_key").unwrap().to_string();
+        }
+        if self.login_param.contains_key("pass_key") {
+            pass_key = self.login_param.get("pass_key").unwrap().to_string();
+        }
+        let mut is_login_param = true;
+        if access_key == "" || secret_key == "" || pass_key == "" {
+            is_login_param = false
+        }
+
+
+        //请求头配置-如果需要登陆则存在额外配置
+        let mut body = "".to_string();
+
+        let timestamp = chrono::Utc::now().timestamp_millis().to_string();
+
+        let mut headers = HeaderMap::new();
+        headers.insert("Content-Type", "application/json".parse().unwrap());
+        if method == "POST" {
+            body = params.clone();
+        }
+
+        //是否需要登陆-- 组装sing
+        if is_login {
+            if !is_login_param {
+                let e = ResponseData::error(self.label.clone(), "登陆参数错误!".to_string());
+                return e;
+            } else {
+                //需要登陆-且登陆参数齐全
+                trace!("param:{}", params);
+                trace!("body:{}", body);
+                //组装sing
+                let sing = Self::sign(secret_key.clone(),
+                                      method.clone(),
+                                      prefix_url.clone(),
+                                      request_url.clone(),
+                                      params.clone(),
+                                      body.clone(),
+                                      timestamp.clone(),
+                );
+                trace!("sing:{}", sing);
+                let passphrase = Self::passphrase(secret_key, pass_key);
+                trace!("passphrase:{}", passphrase);
+                //组装header
+                headers.extend(Self::headers(sing, timestamp, passphrase, access_key));
+            }
+        }
+
+
+        trace!("headers:{:?}", headers);
+        let base_url = format!("{}{}", prefix_url.clone(), request_url.clone());
+        let start_time = chrono::Utc::now().timestamp_millis();
+        let get_response = self.http_toll(
+            base_url.clone(),
+            method.to_string(),
+            params.clone(),
+            headers,
+        ).await;
+        let time_array = chrono::Utc::now().timestamp_millis() - start_time;
+        self.delays.push(time_array);
+        self.get_delay_info();
+        let res_data = Self::res_data_analysis(get_response, base_url, params);
+        res_data
+    }
+
+    pub fn headers(sign: String, timestamp: String, passphrase: String, access_key: String) -> HeaderMap {
+        let mut headers = HeaderMap::new();
+        headers.insert("KC-API-KEY", access_key.parse().unwrap());
+        headers.insert("KC-API-SIGN", sign.parse().unwrap());
+        headers.insert("KC-API-TIMESTAMP", timestamp.parse().unwrap());
+        headers.insert("KC-API-PASSPHRASE", passphrase.parse().unwrap());
+        headers.insert("KC-API-KEY-VERSION", "2".parse().unwrap());
+        headers
+    }
+    pub fn sign(secret_key: String,
+                method: String, prefix_url: String, request_url: String,
+                params: String, body_data: String, timestamp: String) -> String
+    {
+        let url = format!("{}{}", prefix_url, request_url);
+        let params_str = RestTool::parse_params_to_str(params.clone());
+        trace!("body_data:{}", body_data);
+        // let body = Some(body_data);
+        // let hashed_payload = if let Some(body) = body {
+        //     let mut m = digest::Context::new(&digest::SHA256);
+        //     m.update(body.as_bytes());
+        //     hex::encode(m.finish().as_ref())
+        // } else {
+        //     String::new()
+        // };
+        // trace!("hashed_payload:{}", hashed_payload);
+
+        let mut message = format!("{}{}{}",
+                                  timestamp,
+                                  method,
+                                  url
+        );
+        if method == "GET" || method == "DELETE" {
+            message = if params_str.len() > 0 {
+                format!("{}?{}", message, params_str)
+            } else {
+                format!("{}", message)
+            };
+        } else if method == "POST" || method == "PUT" {
+            message = format!("{}{}", message, body_data);
+        }
+
+        trace!("**********", );
+        trace!("组装数据:{}", message);
+        trace!("**********", );
+
+        let mut mac = Hmac::<Sha256>::new_varkey(secret_key.as_bytes()).expect("Failed to create HMAC");
+        mac.update(message.as_bytes());
+        let result = mac.finalize().into_bytes();
+        let base64_encoded = base64::encode(result);
+        base64_encoded
+    }
+
+    pub fn passphrase(secret_key: String, pass_key: String) -> String
+    {
+        let mut mac = Hmac::<Sha256>::new_varkey(secret_key.as_bytes()).expect("Failed to create HMAC");
+        mac.update(pass_key.as_bytes());
+        let result = mac.finalize().into_bytes();
+        let base64_encoded = base64::encode(result);
+        base64_encoded
+    }
+
+
+    async fn http_toll(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
+        let res_data: ResponseData;
+        /****请求接口与 地址*/
+        let url = format!("{}{}", self.base_url.to_string(), request_path);
+        let request_type = request_type.clone().to_uppercase();
+        let addrs_url: String = if RestTool::parse_params_to_str(params.clone()) == "" {
+            url.clone()
+        } else {
+            format!("{}?{}", url.clone(), RestTool::parse_params_to_str(params.clone()))
+        };
+        trace!("url:{}", url);
+        trace!("addrs_url:{}", addrs_url);
+
+
+        let req = match request_type.as_str() {
+            "GET" => self.client.get(addrs_url.clone()).headers(headers),
+            "POST" => self.client.post(url.clone()).body(params).headers(headers),
+            "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
+            // "PUT" => self.client.put(url.clone()).json(&params),
+            _ => return Ok(ResponseData::error(self.label.clone(), format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
+        };
+
+        let response = req.send().await?;
+        if response.status().is_success() {
+            // 读取响应的内容
+            let body = response.text().await?;
+            // trace!("ok-----{}", body);
+            res_data = ResponseData::new(self.label.clone(), "200".to_string(), "success".to_string(), body);
+        } else {
+            let body = response.text().await?;
+            // trace!("error-----{}", body);
+            res_data = ResponseData::error(self.label.clone(), body.to_string())
+        }
+
+        Ok(res_data)
+    }
+
+
+    //res_data 解析
+    pub fn res_data_analysis(result: Result<ResponseData, reqwest::Error>, base_url: String, params: String) -> ResponseData {
+        match result {
+            Ok(res_data) => {
+                if res_data.code != "200" {
+                    // res_data
+                    let mut error = res_data;
+                    error.message = format!("错误:{},url:{},相关参数:{}", error.message, base_url, params);
+                    error
+                } else {
+                    let body: String = res_data.data;
+                    let json_value: serde_json::Value = serde_json::from_str(&body).unwrap();
+
+                    let code = json_value["code"].as_str().unwrap();
+
+                    if code != "200000" {
+                        let msg = json_value["msg"].as_str().unwrap();
+                        // let error = ResponseData::new("".to_string(), code.to_string(),
+                        //                               format!("错误:{},url:{},相关参数:{}", msg, base_url, params),
+                        //                               "".parse().unwrap());
+                        // error
+
+                        let mut error = ResponseData::error(res_data.label, msg.parse().unwrap());
+                        error.code = code.parse().unwrap();
+                        error.data = format!("请求地址:{},请求参数:{}", base_url, params);
+                        error
+                    } else {
+                        let data = serde_json::to_string(&json_value["data"]).unwrap();
+                        let success = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), data.parse().unwrap());
+                        success
+                    }
+                }
+            }
+            Err(err) => {
+                let error = ResponseData::error("".to_string(), format!("json 解析失败:{},相关参数:{}", err, params));
+                error
+            }
+        }
+    }
+}

+ 507 - 0
exchanges/src/kucoin_spot_ws.rs

@@ -0,0 +1,507 @@
+use std::collections::{BTreeMap, HashSet};
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tokio::sync::mpsc::Sender;
+use tracing::{error, info, trace};
+use crate::{proxy};
+use tungstenite::client::{AutoStream, connect_with_proxy, ProxyAutoStream};
+use tungstenite::{connect, Message, WebSocket};
+use tungstenite::protocol::WebSocketConfig;
+use url::Url;
+use crate::kucoin_spot_rest::KucoinSpotRest;
+use crate::proxy::ParsingDetail;
+use crate::response_base::ResponseData;
+use crate::utils::get_time_microsecond;
+
+
+pub enum KucoinWsType {
+    Public,
+    Private,
+}
+
+#[derive(Debug)]
+#[derive(Clone)]
+pub struct KucoinWsParam {
+    pub token: String,
+    pub ws_url: String,
+    pub ws_ping_interval: i64,
+    pub ws_ping_timeout: i64,
+}
+
+#[derive(Clone)]                        //订阅枚举
+pub enum KucoinSubscribeType {
+    PuSpotMarketLevel2Depth50,
+    PuMarketMatch,
+    PuMarketTicker,
+
+    PrSpotMarketTradeOrders,
+    PrAccountBalance,
+}
+
+#[derive(Clone)]
+pub struct KucoinSpotWs {
+    pub label: String,
+    request_url: String,
+    //实际ws 链接地址
+    proxy: ParsingDetail,
+    //代理信息
+    // login_param: BTreeMap<String, String>,
+    //登陆数据
+    ws_param: KucoinWsParam,
+    //kuconis特殊参数
+    symbol_s: Vec<String>,
+    //订阅币对
+    subscribe_types: Vec<KucoinSubscribeType>,
+    //订阅信息
+    sender: Sender<ResponseData>,     //数据通道
+}
+
+impl KucoinSpotWs {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub async fn new(is_colo: bool,
+                     login_param: BTreeMap<String, String>,
+                     ws_type: KucoinWsType,
+                     sender: Sender<ResponseData>,
+    ) -> KucoinSpotWs {
+        return KucoinSpotWs::new_label("default-KucoinSpotWs".to_string(), is_colo, login_param, ws_type, sender).await;
+    }
+    pub async fn new_label(label: String, _is_colo: bool,
+                           login_param: BTreeMap<String, String>,
+                           ws_type: KucoinWsType,
+                           sender: Sender<ResponseData>,
+    ) -> KucoinSpotWs
+    {
+        /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
+        let parsing_detail = proxy::ParsingDetail::parsing_environment_variables();
+
+        /*******公共频道-私有频道数据组装*/
+        let mut ws_param = KucoinWsParam {
+            token: "".to_string(),
+            ws_url: "".to_string(),
+            ws_ping_interval: 0,
+            ws_ping_timeout: 0,
+        };
+        let res_data = KucoinSpotWs::get_rul_token(ws_type, login_param.clone()).await;
+        match res_data {
+            Ok(param) => {
+                ws_param = param
+            }
+            Err(error) => {
+                error!("-链接地址等参数错误:{:?}", error)
+            }
+        }
+
+
+        /*****返回结构体*******/
+        KucoinSpotWs {
+            label,
+            request_url: "".to_string(),
+            proxy: parsing_detail,
+            // login_param,
+            ws_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            sender,
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<KucoinSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //根据当前类型获取对应的频道 地址 与 token
+    async fn get_rul_token(ws_type: KucoinWsType, login_param: BTreeMap<String, String>) -> Result<KucoinWsParam, reqwest::Error> {
+        let mut kucoin_exc = KucoinSpotRest::new(false, login_param.clone());
+        let res_data = match ws_type {
+            KucoinWsType::Public => {
+                kucoin_exc.get_public_token().await
+            }
+            KucoinWsType::Private => {
+                kucoin_exc.get_private_token().await
+            }
+        };
+
+        trace!("kucoin-spot-rest 获取ws连接地址:{:?}",res_data);
+
+        if res_data.code == "200" {
+            let mut ws_url = "".to_string();
+            let mut ws_token = "".to_string();
+            let mut ws_ping_interval: i64 = 0;
+            let mut ws_ping_timeout: i64 = 0;
+
+
+            //数据解析
+            let parsed_json: serde_json::Value = serde_json::from_str(res_data.data.as_str()).unwrap();
+            if let Some(value) = parsed_json.get("token") {
+                let formatted_value = match value {
+                    serde_json::Value::String(s) => s.clone(),
+                    _ => value.to_string()
+                };
+                ws_token = format!("{}", formatted_value);
+            }
+            if let Some(endpoint) = parsed_json["instanceServers"][0]["endpoint"].as_str() {
+                ws_url = format!("{}", endpoint);
+            }
+            if let Some(ping_interval) = parsed_json["instanceServers"][0]["pingInterval"].as_i64() {
+                ws_ping_interval = ping_interval;
+            }
+            if let Some(ping_timeout) = parsed_json["instanceServers"][0]["pingTimeout"].as_i64() {
+                ws_ping_timeout = ping_timeout;
+            }
+
+
+            Ok(KucoinWsParam { ws_url, token: ws_token, ws_ping_interval, ws_ping_timeout })
+        } else {
+            error!("公共/私有-频道获取失败:{:?}", res_data);
+            panic!("公共/私有-频道获取失败:{:?}", res_data);
+        }
+    }
+    //自定义
+    pub async fn custom_subscribe(&mut self, bool_v1: Arc<AtomicBool>, b_array: Vec<String>)
+    {
+        let mut symbol_s = b_array.clone();
+        for symbol in symbol_s.iter_mut() {
+            // 大写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("_", "-");
+        }
+        self.symbol_s = symbol_s.clone();
+        self.request_url = format!("{}?token={}", self.ws_param.ws_url, self.ws_param.token);
+        info!("走普通通道:{}",  self.request_url);
+        self.run(bool_v1).await;
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: KucoinSubscribeType) -> serde_json::Value {
+        match subscribe_type {
+            KucoinSubscribeType::PuSpotMarketLevel2Depth50 => {//level2
+                serde_json::json!({
+                     "topic": format!("/spotMarket/level2Depth50:{}", symbol),
+                     "type": "subscribe",
+                     "response": true
+                })
+            }
+            KucoinSubscribeType::PuMarketMatch => {//match
+                serde_json::json!({
+                     "topic": format!("/market/match:{}", symbol),
+                     "type": "subscribe",
+                     "response": true
+                })
+            }
+            KucoinSubscribeType::PuMarketTicker => {//ticker
+                serde_json::json!({
+                     "topic": format!("/market/ticker:{}", symbol),
+                     "type": "subscribe",
+                     "response": true
+                })
+            }
+
+            KucoinSubscribeType::PrSpotMarketTradeOrders => {//market.tradeOrders
+                serde_json::json!({
+                    "type": "subscribe",
+                    "topic": "/spotMarket/tradeOrders",
+                    "privateChannel":true,
+                    "response":true,
+                })
+            }
+            KucoinSubscribeType::PrAccountBalance => {//account.balance
+                serde_json::json!({
+                    "type": "subscribe",
+                    "topic": "/account/balance",
+                    "privateChannel":true,
+                    "response":true,
+                })
+            }
+        }
+    }
+    //组装订阅数据
+    pub fn get_subscription(&self) -> Vec<String> {
+        let mut array = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
+                array.push(ty_str.to_string());
+            }
+        }
+        array
+    }
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    async fn run(&self, bool_v1: Arc<AtomicBool>)
+    {
+        //订阅信息组装
+        let subscription = self.get_subscription();
+        let subscription: Vec<String> = subscription.into_iter().collect::<HashSet<String>>().into_iter().collect();
+        loop {
+            tokio::time::sleep(Duration::from_millis(5000)).await;
+            trace!("要连接咯~~!!{}", self.request_url);
+            //币安-登陆流程-rest请求获取k然后拿到 key 拼接地址
+            // if self.is_login { //暂时没看到有订阅的频道需要登陆 所以暂时不做
+            // }
+
+            let request_url = Url::parse(self.request_url.as_str()).unwrap();
+            //1. 判断是否需要代理,根据代理地址是否存来选择
+            if self.proxy.ip_address.len() > 0 {
+                let ip_array: Vec<&str> = self.proxy.ip_address.split(".").collect();
+                let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
+                    ip_array[0].parse().unwrap(),
+                    ip_array[1].parse().unwrap(),
+                    ip_array[2].parse().unwrap(),
+                    ip_array[3].parse().unwrap())
+                ), self.proxy.port.parse().unwrap());
+                let websocket_config = Some(WebSocketConfig {
+                    max_send_queue: Some(16),
+                    max_message_size: Some(16 * 1024 * 1024),
+                    max_frame_size: Some(16 * 1024 * 1024),
+                    accept_unmasked_frames: false,
+                });
+                let max_redirects = 5;
+                let _ = match connect_with_proxy(request_url.clone(),
+                                                 proxy_address, websocket_config, max_redirects) {
+                    Ok(ws) => {
+                        let bool_v1_clone = Arc::clone(&bool_v1);
+                        self.proxy_subscription(bool_v1_clone, ws.0, subscription.clone()).await;
+                    }
+                    Err(err) => {
+                        error!("Can't connect(无法连接): {}", err);
+                    }
+                };
+            } else {
+                let _ = match connect(request_url.clone()) {
+                    Ok(ws) => {
+                        let bool_v1_clone = Arc::clone(&bool_v1);
+                        self.subscription(bool_v1_clone, ws.0, subscription.clone()).await;
+                    }
+                    Err(err) => {
+                        // 连接失败时执行的操作
+                        error!("Can't connect(无法连接): {}", err);
+                        // 返回一个默认的 WebSocket 对象或其他适当的值
+                        // 或者根据需要触发 panic 或返回错误信息
+                    }
+                };
+            }
+            trace!("退出来咯");
+
+            let bool_v1_clone = Arc::clone(&bool_v1);
+            let bool_v1_v = bool_v1_clone.load(Ordering::SeqCst);
+            if !bool_v1_v {
+                break;
+            }
+        }
+    }
+
+    //代理
+    async fn proxy_subscription(&self, bool_v1: Arc<AtomicBool>, mut web_socket: WebSocket<ProxyAutoStream>,
+                                subscription: Vec<String>)
+    {
+        info!("走代理-链接成功!开始数据读取");
+        let label = self.label.clone();
+        /*****消息溜***/
+        let mut ping_interval = chrono::Utc::now().timestamp_millis();
+        let mut ping_timeout = chrono::Utc::now().timestamp_millis();
+        loop {
+            tokio::time::sleep(Duration::from_millis(1)).await;
+            let msg = web_socket.read_message();
+            match msg {
+                Ok(Message::Text(text)) => {
+                    //心跳-一定时间间隔发送心跳
+                    let get_time = chrono::Utc::now().timestamp_millis();
+                    // trace!("--心跳-{}-{}-{}-{}-{}", get_time, ping_interval
+                    //          , (get_time - ping_interval), self.ws_param.ws_ping_interval, (get_time - ping_interval) >= self.ws_param.ws_ping_interval);
+                    if (get_time - ping_interval) >= self.ws_param.ws_ping_interval {
+                        web_socket.write_message(Message::Ping(Vec::from("ping")))
+                            .unwrap();
+                        trace!("--发送心跳-ping");
+                        ping_interval = get_time;
+                        ping_timeout = get_time;
+                    } else if (get_time - ping_timeout) > (self.ws_param.ws_ping_timeout + self.ws_param.ws_ping_interval) {
+                        //心跳超时-发送心跳之后 一定时间没有响应
+                        trace!("--心跳相应超时-重连");
+                        break;
+                    }
+
+                    // trace!("获取推送:{}",text.clone());
+                    // trace!(stdout, "Text-响应--{:?}", text.clone()).unwrap();
+                    let mut res_data = Self::ok_text(label.to_string(), text);
+                    res_data.time = get_time_microsecond();
+                    // trace!("获取推送:{:?}", res_data);
+                    if res_data.code == "-200" {//表示链接成功
+                        for sub in &subscription {
+                            trace!("--发起订阅:{:?}", sub);
+                            web_socket.write_message(Message::Text(sub.parse().unwrap()))
+                                .unwrap();
+                        }
+                    } else if res_data.code == "-201" {
+                        trace!("订阅成功:{:?}", res_data);
+                    } else if res_data.code == "-202" {
+                        trace!("无用数据:{:?}", res_data);
+                    } else {
+                        let sender = self.sender.clone();
+                        tokio::spawn(async move {
+                            sender.send(res_data).await.unwrap();
+                        });
+                        tokio::spawn(async move {});
+                    }
+                }
+                Ok(Message::Ping(s)) => {
+                    trace!( "Ping-响应--{:?}", String::from_utf8(s.clone()));
+                    let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
+                    trace!( "回应-pong---{:?}", String::from_utf8(s.clone()));
+                }
+                Ok(Message::Pong(s)) => {
+                    // trace!("Pong-响应--{:?}", String::from_utf8(s));
+                    trace!( "Pong-响应--{:?}", String::from_utf8(s.clone()));
+                    ping_timeout = chrono::Utc::now().timestamp_millis();
+                }
+                Ok(Message::Close(_)) => {
+                    // trace!("socket 关闭: ");
+                    trace!( "Close-响应");
+                }
+                Err(error) => {
+                    // trace!("Error receiving message: {}", error);
+                    error!( "Err-响应{}", error);
+                    break;
+                }
+                _ => {}
+            }
+
+
+            let bool_v1_v = bool_v1.load(Ordering::SeqCst);
+            if !bool_v1_v {
+                break;
+            }
+        }
+        web_socket.close(None).unwrap();
+    }
+
+    //非代理
+    async fn subscription(&self, bool_v1: Arc<AtomicBool>, mut web_socket: WebSocket<AutoStream>,
+                          subscription: Vec<String>)
+    {
+        info!("链接成功!开始数据读取");
+        let label = self.label.clone();
+        /*****消息溜***/
+        let mut ping_interval = chrono::Utc::now().timestamp_millis();
+        let mut ping_timeout = chrono::Utc::now().timestamp_millis();
+        loop {
+            tokio::time::sleep(Duration::from_millis(1)).await;
+            let msg = web_socket.read_message();
+            match msg {
+                Ok(Message::Text(text)) => {
+                    //心跳-一定时间间隔发送心跳
+                    let get_time = chrono::Utc::now().timestamp_millis();
+                    // trace!("--心跳-{}-{}-{}-{}-{}", get_time, ping_interval
+                    //          , (get_time - ping_interval), self.ws_param.ws_ping_interval, (get_time - ping_interval) >= self.ws_param.ws_ping_interval);
+                    if (get_time - ping_interval) >= self.ws_param.ws_ping_interval {
+                        web_socket.write_message(Message::Ping(Vec::from("ping")))
+                            .unwrap();
+                        trace!("--发送心跳-ping");
+                        ping_interval = get_time;
+                        ping_timeout = get_time;
+                    } else if (get_time - ping_timeout) > (self.ws_param.ws_ping_timeout + self.ws_param.ws_ping_interval) {
+                        //心跳超时-发送心跳之后 一定时间没有响应
+                        trace!("--心跳相应超时-重连");
+                        break;
+                    }
+
+                    trace!("获取推送:{}",text.clone());
+                    // trace!(stdout, "Text-响应--{:?}", text.clone()).unwrap();
+                    let mut res_data = Self::ok_text(label.to_string(), text);
+                    res_data.time = get_time_microsecond();
+                    // trace!("获取推送:{:?}", res_data);
+                    if res_data.code == "-200" {//表示链接成功
+                        for sub in &subscription {
+                            trace!("--发起订阅:{:?}", sub);
+                            web_socket.write_message(Message::Text(sub.parse().unwrap()))
+                                .unwrap();
+                        }
+                    } else if res_data.code == "-201" {
+                        trace!("订阅成功:{:?}", res_data);
+                    } else if res_data.code == "-202" {
+                        trace!("无用数据:{:?}", res_data);
+                    } else {
+                        let sender = self.sender.clone();
+                        tokio::spawn(async move {
+                            sender.send(res_data).await.unwrap();
+                        });
+                        tokio::spawn(async move {});
+                    }
+                }
+                Ok(Message::Ping(s)) => {
+                    trace!("Ping-响应--{:?}", String::from_utf8(s.clone()));
+                    let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
+                    trace!( "回应-pong---{:?}", String::from_utf8(s.clone()));
+                }
+                Ok(Message::Pong(s)) => {
+                    // trace!("Pong-响应--{:?}", String::from_utf8(s));
+                    trace!("Pong-响应--{:?}", String::from_utf8(s.clone()));
+                    ping_timeout = chrono::Utc::now().timestamp_millis();
+                }
+                Ok(Message::Close(_)) => {
+                    // trace!("socket 关闭: ");
+                    trace!( "Close-响应");
+                }
+                Err(error) => {
+                    // trace!("Error receiving message: {}", error);
+                    error!( "Err-响应{}", error);
+                    break;
+                }
+                _ => {}
+            }
+
+            let bool_v1_v = bool_v1.load(Ordering::SeqCst);
+            if !bool_v1_v {
+                break;
+            }
+        }
+        web_socket.close(None).unwrap();
+    }
+
+    //数据解析
+    pub fn ok_text(lable: String, text: String) -> ResponseData
+    {
+        let mut res_data = ResponseData::new(lable, "200".to_string(), "success".to_string(), "".to_string());
+        let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
+        //订阅 相应
+        if json_value["type"].as_str() == Option::from("welcome") {
+            //链接成功
+            res_data.code = "-200".to_string();
+            res_data.message = "链接成功,主动发起订阅".to_string();
+            trace!("链接成功,主动发起订阅:");
+        } else if json_value["type"].as_str() == Option::from("ack") {
+            res_data.code = "-201".to_string();
+            res_data.message = "订阅成功".to_string();
+        } else if json_value["type"].as_str() == Option::from("error") {
+            res_data.code = format!("{}", json_value["code"]);
+            res_data.message = format!("{}", json_value["data"].as_str().unwrap());
+        } else if json_value.get("topic").is_some() {
+            res_data.channel = format!("{}", json_value["subject"].as_str().unwrap());
+
+            if json_value["topic"].as_str() == Option::from("/contractAccount/wallet") {
+                res_data.code = "-202".to_string();
+                if json_value["subject"].as_str() == Option::from("availableBalance.change") {
+                    res_data.code = "200".to_string();
+                    res_data.data = json_value["data"].to_string();
+                } else {}
+            } else {
+                res_data.data = json_value["data"].to_string();
+            }
+        } else {
+            res_data.code = "-1".to_string();
+            res_data.message = "未知解析".to_string();
+        }
+        res_data
+    }
+}

+ 1 - 1
exchanges/src/kucoin_swap_ws.rs

@@ -130,7 +130,7 @@ impl KucoinSwapWs {
             }
         };
 
-        trace!("kucoin-rest 获取ws连接地址:{:?}",res_data);
+        trace!("kucoin-swap-rest 获取ws连接地址:{:?}",res_data);
 
         if res_data.code == "200" {
             let mut ws_url = "".to_string();

+ 2 - 0
exchanges/src/lib.rs

@@ -17,4 +17,6 @@ pub mod binance_swap_rest;
 mod utils;
 pub mod bitget_spot_ws;
 pub mod bitget_spot_rest;
+pub mod kucoin_spot_ws;
+pub mod kucoin_spot_rest;
 

+ 1 - 0
exchanges/src/okx_swap_ws.rs

@@ -331,6 +331,7 @@ impl OkxSwapWs {
         /*****消息溜***/
         let mut ping_timeout = chrono::Utc::now().timestamp_millis();
         loop {
+            tokio::time::sleep(Duration::from_millis(1)).await;
             let msg = web_socket.read_message();
             match msg {
                 Ok(Message::Text(text)) => {

+ 18 - 4
exchanges/tests/bitget_spot_test.rs

@@ -13,9 +13,9 @@ use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType}
 use exchanges::proxy;
 use exchanges::response_base::ResponseData;
 
-const ACCESS_KEY: &str = "bg_f23d43b1e9d9cdf3ff34ce9efdd94d12";
-const SECRET_KEY: &str = "29a4be3fb17ef9df99ff1bfcf81cbb20544c7f99fa80160fbb09bfce54c0e9d3";
-const PASS_KEY: &str = "Bitget123123";
+const ACCESS_KEY: &str = "";
+const SECRET_KEY: &str = "";
+const PASS_KEY: &str = "";
 
 //ws-订阅公共频道信息
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -23,7 +23,7 @@ async fn ws_custom_subscribe_pu() {
     global::log_utils::init_log_with_trace();
 
     let mut bool_v1 = Arc::new(AtomicBool::new(true));
-    let btree_map: BTreeMap<String, String> = BTreeMap::new();
+    let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
     let (tx, mut rx) = channel(1024);
     let mut ws = get_ws(btree_map, BitgetWsType::Public, tx).await;
     ws.set_subscribe(vec![
@@ -390,6 +390,20 @@ async fn rest_cancel_plan_orders_test() {
     trace!(?rep_data)
 }
 
+//rest-划转
+#[tokio::test]
+async fn rest_wallet_transfer_test() {
+    global::log_utils::init_log_with_trace();
+    let mut rest = get_rest();
+    let rep_data = rest.wallet_transfer("".to_string(),
+                                        "".to_string(),
+                                        "".to_string(),
+                                        "".to_string(),
+                                        "".to_string(),
+                                        "".to_string()).await;
+    trace!(?rep_data)
+}
+
 
 async fn get_ws(btree_map: BTreeMap<String, String>, type_v: BitgetWsType, tx: Sender<ResponseData>) -> BitgetSpotWs {
     let mut ku_ws = BitgetSpotWs::new(false, btree_map.clone(),

+ 171 - 0
exchanges/tests/kucoin_spot_test.rs

@@ -0,0 +1,171 @@
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use tokio::sync::mpsc::{channel, Sender};
+use tokio::try_join;
+use tracing::trace;
+use exchanges::kucoin_spot_rest::KucoinSpotRest;
+use exchanges::kucoin_spot_ws::{KucoinSpotWs, KucoinSubscribeType, KucoinWsType};
+use exchanges::response_base::ResponseData;
+
+const ACCESS_KEY: &str = "";
+const SECRET_KEY: &str = "";
+const PASS_KEY: &str = "";
+
+//ws-订阅公共频道信息
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn ws_custom_subscribe_pu() {
+    global::log_utils::init_log_with_trace();
+
+    let mut bool_v1 = Arc::new(AtomicBool::new(true));
+    let btree_map: BTreeMap<String, String> = BTreeMap::new();
+    let (tx, mut rx) = channel(1024);
+    let mut ws = get_ws(btree_map, KucoinWsType::Public, tx).await;
+    ws.set_subscribe(vec![
+        // KucoinSubscribeType::PuMarketMatch,
+        // KucoinSubscribeType::PuSpotMarketLevel2Depth50,
+        KucoinSubscribeType::PuMarketTicker,
+    ]);
+    let t1 = tokio::spawn(async move {
+        ws.custom_subscribe(bool_v1, vec!["ACH_USDT".to_string(), "ROSE_USDT".to_string()]).await;
+    });
+    let t2 = tokio::spawn(async move {
+        loop {
+            if let Ok(received) = rx.try_recv() {
+                trace!( "age: {:?}", received);
+            }
+        }
+    });
+    try_join!(t1,t2).unwrap();
+}
+
+//ws-订阅私有频道信息
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn ws_custom_subscribe_pr() {
+    global::log_utils::init_log_with_trace();
+
+    let mut bool_v1 = Arc::new(AtomicBool::new(true));
+    let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
+    btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string());
+    btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string());
+    btree_map.insert("pass_key".to_string(), PASS_KEY.to_string());
+    let (tx, mut rx) = channel(1024);
+    let mut ws = get_ws(btree_map, KucoinWsType::Private, tx).await;
+    ws.set_subscribe(vec![
+        KucoinSubscribeType::PrAccountBalance,
+        KucoinSubscribeType::PrSpotMarketTradeOrders,
+        KucoinSubscribeType::PuMarketTicker,
+    ]);
+    let t1 = tokio::spawn(async move {
+        ws.custom_subscribe(bool_v1, vec!["ACH_USDT".to_string(), "ROSE_USDT".to_string()]).await;
+    });
+
+    let t2 = tokio::spawn(async move {
+        loop {
+            if let Ok(received) = rx.try_recv() {
+                trace!( "age: {:?}", received);
+            }
+        }
+    });
+    try_join!(t1,t2).unwrap();
+}
+
+
+//rest-获取订单
+#[tokio::test]
+async fn rest_get_order_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut rest = get_rest();
+    let rep_data = rest.get_order().await;
+    trace!(?rep_data)
+}
+
+//rest-獲取行情
+#[tokio::test]
+async fn rest_get_level1_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut rest = get_rest();
+    let rep_data = rest.get_level1("BTC-USDT".to_string()).await;
+    trace!(?rep_data)
+}
+
+//rest-通過orderId获取訂單詳情
+#[tokio::test]
+async fn rest_get_order_by_order_id_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut rest = get_rest();
+    let rep_data = rest.get_order_by_order_id("3123123".to_string()).await;
+    trace!(?rep_data)
+}
+
+//rest-通過clientOid獲取訂單詳情
+#[tokio::test]
+async fn rest_get_order_by_client_id_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut rest = get_rest();
+    let rep_data = rest.get_order_by_client_id("3123123".to_string()).await;
+    trace!(?rep_data)
+}
+
+//rest-獲取賬戶列表 - 現貨/槓桿/現貨高頻
+#[tokio::test]
+async fn rest_get_accounts_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut rest = get_rest();
+    let rep_data = rest.get_accounts("USDT".to_string()).await;
+    trace!(?rep_data)
+}
+
+//rest-獲取交易對列表
+#[tokio::test]
+async fn rest_get_symbols_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut rest = get_rest();
+    let rep_data = rest.get_symbols().await;
+    trace!(?rep_data)
+}
+
+//rest-通過orderId撤單- 没权限需要查看
+#[tokio::test]
+async fn rest_cancel_order_by_order_id_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut rest = get_rest();
+    let rep_data = rest.cancel_order_by_order_id("dddd123131".to_string()).await;
+    trace!(?rep_data)
+}
+
+//rest-通過clientOid撤單- 没权限需要查看
+#[tokio::test]
+async fn rest_cancel_order_by_client_id_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut rest = get_rest();
+    let rep_data = rest.cancel_order_by_client_id("dddd123131".to_string()).await;
+    trace!(?rep_data)
+}
+
+
+async fn get_ws(btree_map: BTreeMap<String, String>, type_v: KucoinWsType, tx: Sender<ResponseData>) -> KucoinSpotWs {
+    let mut ku_ws = KucoinSpotWs::new(false, btree_map.clone(),
+                                      type_v, tx).await;
+    ku_ws
+}
+
+fn get_rest() -> KucoinSpotRest {
+    let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
+    // btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string());
+    // btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string());
+    btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string());
+    btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string());
+    btree_map.insert("pass_key".to_string(), PASS_KEY.to_string());
+
+    let mut ku_exc = KucoinSpotRest::new(false, btree_map);
+    ku_exc
+}

+ 2 - 2
exchanges/tests/okx_swap_test.rs

@@ -34,8 +34,8 @@ async fn ws_custom_subscribe_pu() {
     ws.set_subscribe(vec![
         OkxSubscribeType::PuIndexTickers,
         OkxSubscribeType::PuBooks5,
-        OkxSubscribeType::Putrades,
-        OkxSubscribeType::PuBooks50L2tbt,
+        // OkxSubscribeType::Putrades,
+        // OkxSubscribeType::PuBooks50L2tbt,
     ]);
 
     let t1 = tokio::spawn(async move {