Bladeren bron

优化代码

875428575@qq.com 2 jaren geleden
bovenliggende
commit
50a458b767
4 gewijzigde bestanden met toevoegingen van 420 en 3 verwijderingen
  1. 1 1
      exchanges/src/kucoin_swap_rest.rs
  2. 410 0
      exchanges/src/kuconin_swap_ws.rs
  3. 8 2
      exchanges/src/lib.rs
  4. 1 0
      exchanges/src/proxy.rs

+ 1 - 1
exchanges/src/kucoin_swap_rest.rs

@@ -5,7 +5,7 @@ use reqwest::{Client};
 use sha2::Sha256;
 use crate::http_tool::RestTool;
 use crate::response_base::ResponseData;
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct KucoinSwapRest {
     base_url: String,
     client: reqwest::Client,

+ 410 - 0
exchanges/src/kuconin_swap_ws.rs

@@ -0,0 +1,410 @@
+use std::collections::{BTreeMap, HashSet};
+use std::error::Error;
+use std::future::Future;
+use std::io;
+use std::io::{Write};
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::sync::{mpsc};
+use std::thread::sleep;
+use hmac::{NewMac};
+use serde_json::{json, Value};
+use serde_json::map::Values;
+use tokio::runtime::Runtime;
+use crate::{proxy, WsType};
+use tungstenite::client::{AutoStream, connect_with_proxy, ProxyAutoStream};
+use tungstenite::{connect, Message, WebSocket};
+use tungstenite::protocol::WebSocketConfig;
+use url::Url;
+use crate::kucoin_swap_rest::KucoinSwapRest;
+use crate::proxy::ParsingDetail;
+use crate::response_base::ResponseData;
+
+#[derive(Debug)]
+#[derive(Clone)]
+pub struct WsParam {
+    pub token: String,
+    pub ws_url: String,
+    pub ws_ping_interval: i64,
+    pub ws_ping_timeout: i64,
+}
+
+#[derive(Clone)]
+pub enum SubscribeType {
+    PuContractMarketLevel2Depth50,
+    // level2的50檔全量數據推送頻道
+    PuContractMarketExecution, //  成交记录
+
+    PrContractAccountWallet,
+    PrContractPosition,
+    PrContractMarketTradeOrdersSys,
+    PrContractMarketTradeOrders,
+}
+
+#[derive(Clone)]
+pub struct KuconinSwapWs {
+    request_url: String,
+
+    proxy: ParsingDetail,
+    // ip: String,
+    // port: u16,
+    login_param: BTreeMap<String, String>,
+    // token: String,
+    // ws_url: String,
+    // ws_ping_interval: i64,
+    // ws_ping_timeout: i64,
+    ws_param: WsParam,
+    symbol_s: Vec<String>,
+    subscribe_types: Vec<SubscribeType>,
+    sender: mpsc::Sender<ResponseData>,
+}
+
+impl KuconinSwapWs {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub async fn new(is_colo: bool,
+                     login_param: BTreeMap<String, String>,
+                     ws_type: WsType,
+                     sender: mpsc::Sender<ResponseData>,
+    ) -> KuconinSwapWs
+    {
+        if is_colo {
+            println!("不支持高速通道")
+        }
+
+        /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
+        let parsing_detail = proxy::ParsingDetail::parsing_environment_variables();
+
+        /*******公共频道-私有频道数据组装*/
+        let mut ws_param = WsParam {
+            token: "".to_string(),
+            ws_url: "".to_string(),
+            ws_ping_interval: 0,
+            ws_ping_timeout: 0,
+        };
+        let res_data = KuconinSwapWs::get_rul_token(ws_type, login_param.clone()).await;
+        match res_data {
+            Ok(param) => {
+                ws_param = param
+            }
+            Err(error) => {
+                print!("-链接地址等参数错误:{:?}", error)
+            }
+        }
+        /*****返回结构体*******/
+        KuconinSwapWs {
+            request_url: "".to_string(),
+            proxy: parsing_detail,
+            login_param,
+            ws_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            sender,
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<SubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //根据当前类型获取对应的频道 地址 与 token
+    async fn get_rul_token(ws_type: WsType, login_param: BTreeMap<String, String>) -> Result<WsParam, reqwest::Error> {
+        let kucoin_exc = KucoinSwapRest::new(false, login_param.clone());
+        let mut res_data = ResponseData::error("等待数据~~~".to_string());
+        match ws_type {
+            WsType::Public => {
+                res_data = kucoin_exc.get_public_token().await;
+            }
+            WsType::Private => {
+                res_data = kucoin_exc.get_private_token().await;
+            }
+        }
+
+        if res_data.code == "200" {
+            let mut ws_url = "".to_string();
+            let mut ws_token = "".to_string();
+            let mut ws_ping_interval: i64 = 0;
+            let mut ws_ping_timeout: i64 = 0;
+
+
+            //数据解析
+            let parsed_json: serde_json::Value = serde_json::from_str(res_data.data.as_str()).unwrap();
+            if let Some(value) = parsed_json.get("token") {
+                let formatted_value = match value {
+                    serde_json::Value::String(s) => s.clone(),
+                    _ => value.to_string()
+                };
+                ws_token = format!("{}", formatted_value);
+            }
+            if let Some(endpoint) = parsed_json["instanceServers"][0]["endpoint"].as_str() {
+                ws_url = format!("{}", endpoint);
+            }
+            if let Some(pingInterval) = parsed_json["instanceServers"][0]["pingInterval"].as_i64() {
+                ws_ping_interval = pingInterval;
+            }
+            if let Some(pingTimeout) = parsed_json["instanceServers"][0]["pingTimeout"].as_i64() {
+                ws_ping_timeout = pingTimeout;
+            }
+
+
+            Ok(WsParam { ws_url, token: ws_token, ws_ping_interval, ws_ping_timeout })
+        } else {
+            panic!("公共/私有-频道获取失败:{:?}", res_data)
+        }
+    }
+    //自定义
+    pub fn custom_subscribe(&mut self, b_array: Vec<String>)
+    {
+        self.symbol_s = b_array.clone();
+        self.request_url = format!("{}?token={}", self.ws_param.ws_url, self.ws_param.token);
+        self.run();
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: SubscribeType) -> String {
+        match subscribe_type {
+            SubscribeType::PuContractMarketLevel2Depth50 => {
+                format!("/contractMarket/level2Depth50:{}", symbol)
+            }
+            SubscribeType::PuContractMarketExecution => {
+                format!("/contractMarket/execution:{}", symbol)
+            }
+            SubscribeType::PrContractAccountWallet => {
+                format!("/contractAccount/wallet")
+            }
+            SubscribeType::PrContractPosition => {
+                format!("/contract/position:{}", symbol)
+            }
+            SubscribeType::PrContractMarketTradeOrdersSys => {
+                format!("/contractMarket/tradeOrders")
+            }
+            SubscribeType::PrContractMarketTradeOrders => {
+                format!("/contractMarket/tradeOrders:{}", symbol)
+            }
+        }
+    }
+    //组装订阅数据
+    pub fn get_subscription(&self) -> Vec<String> {
+        let mut array = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
+                array.push(json!({
+                    "topic": ty_str,
+                     "type": "subscribe",
+                }).to_string());
+            }
+        }
+        array
+    }
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    fn run(&self)
+    {
+        //订阅信息组装
+        let subscription = self.get_subscription();
+        let subscription: Vec<String> = subscription.into_iter().collect::<HashSet<String>>().into_iter().collect();
+        loop {
+            println!("要连接咯~~!!{}", self.request_url);
+            //币安-登陆流程-rest请求获取k然后拿到 key 拼接地址
+            // if self.is_login { //暂时没看到有订阅的频道需要登陆 所以暂时不做
+            // }
+
+            let request_url = Url::parse(self.request_url.as_str()).unwrap();
+            //1. 判断是否需要代理,根据代理地址是否存来选择
+            if self.proxy.ip_address.len() > 0 {
+                let ip_array: Vec<&str> = self.proxy.ip_address.split(".").collect();
+                let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
+                    ip_array[0].parse().unwrap(),
+                    ip_array[1].parse().unwrap(),
+                    ip_array[2].parse().unwrap(),
+                    ip_array[3].parse().unwrap())
+                ), self.proxy.port.parse().unwrap());
+                let websocket_config = Some(WebSocketConfig {
+                    max_send_queue: Some(16),
+                    max_message_size: Some(16 * 1024 * 1024),
+                    max_frame_size: Some(16 * 1024 * 1024),
+                    accept_unmasked_frames: false,
+                });
+                let max_redirects = 5;
+                match connect_with_proxy(request_url.clone(),
+                                         proxy_address, websocket_config, max_redirects) {
+                    Ok(ws) => {
+                        self.proxy_subscription(ws.0, subscription.clone());
+                    }
+                    Err(err) => {
+                        println!("Can't connect(无法连接): {}", err);
+                    }
+                };
+            } else {
+                let no_proxy_ws = match connect(request_url.clone()) {
+                    Ok(ws) => {
+                        self.subscription(ws.0, subscription.clone());
+                    }
+                    Err(err) => {
+                        // 连接失败时执行的操作
+                        println!("Can't connect(无法连接): {}", err);
+                        // 返回一个默认的 WebSocket 对象或其他适当的值
+                        // 或者根据需要触发 panic 或返回错误信息
+                    }
+                };
+            }
+            println!("退出来咯")
+        }
+    }
+
+    //代理
+    fn proxy_subscription(&self, mut web_socket: WebSocket<ProxyAutoStream>,
+                          subscription: Vec<String>,
+    )
+    {
+        /*****消息溜***/
+        let mut stdout = io::stdout();
+        let mut ping_interval = chrono::Utc::now().timestamp_millis();
+        let mut ping_timeout = chrono::Utc::now().timestamp_millis();
+        loop {
+            let msg = web_socket.read_message();
+            match msg {
+                Ok(Message::Text(text)) => {
+                    // writeln!(stdout, "Text-响应--{:?}", text.clone()).unwrap();
+                    let res_data = Self::ok_text(text);
+
+                    if res_data.code == "-200" {//表示链接成功
+                        for sub in &subscription {
+                            println!("--订阅-成功-内容:{:?}", sub);
+                            web_socket.write_message(Message::Text(sub.parse().unwrap()))
+                                .unwrap();
+                        }
+                    } else {
+                        //心跳-一定时间间隔发送心跳
+                        let mut get_time = chrono::Utc::now().timestamp_millis();
+                        // println!("--心跳-{}-{}-{}-{}-{}", get_time, ping_interval
+                        //          , (get_time - ping_interval), self.ws_param.ws_ping_interval, (get_time - ping_interval) >= self.ws_param.ws_ping_interval);
+                        if (get_time - ping_interval) >= self.ws_param.ws_ping_interval {
+                            web_socket.write_message(Message::Ping(Vec::from("ping")))
+                                .unwrap();
+                            println!("--发送心跳-ping");
+                            ping_interval = get_time;
+                            ping_timeout = get_time;
+                        } else if (get_time - ping_timeout) > (self.ws_param.ws_ping_timeout + self.ws_param.ws_ping_interval) {
+                            //心跳超时-发送心跳之后 一定时间没有响应
+                            println!("--心跳相应超时-重连");
+                            break;
+                        }
+
+                        self.sender.send(res_data).unwrap();
+                    }
+                }
+                Ok(Message::Ping(s)) => {
+                    writeln!(stdout, "Ping-响应--{:?}", String::from_utf8(s.clone())).unwrap();
+                    let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
+                    writeln!(stdout, "回应-pong---{:?}", String::from_utf8(s.clone())).unwrap();
+                }
+                Ok(Message::Pong(s)) => {
+                    // println!("Pong-响应--{:?}", String::from_utf8(s));
+                    writeln!(stdout, "Pong-响应--{:?}", String::from_utf8(s.clone())).unwrap();
+                    ping_timeout = chrono::Utc::now().timestamp_millis();
+                }
+                Ok(Message::Close(_)) => {
+                    // println!("socket 关闭: ");
+                    writeln!(stdout, "Close-响应").unwrap();
+                }
+                Err(error) => {
+                    // println!("Error receiving message: {}", error);
+                    writeln!(stdout, "Err-响应{}", error).unwrap();
+                    break;
+                }
+                _ => {}
+            }
+        }
+        web_socket.close(None).unwrap();
+    }
+    //非代理
+    fn subscription(&self, mut web_socket: WebSocket<AutoStream>,
+                    subscription: Vec<String>,
+    )
+
+    {
+        /*****消息溜***/
+        // let  stdout = io::stdout();
+        // let mut stderr = io::stderr();
+        /*****是否需要登陆****/
+        // if self.is_login {
+        //     println!("----需要登陆");
+        //     // let login_json_str = self.log_in_to_str();
+        //     // web_socket.write_message(Message::Text(login_json_str)).unwrap();
+        //     // thread::sleep(Duration::from_secs(1));
+        // } else {
+        //     println!("----no longin(不需要登陆)");
+        // }
+        /******订阅信息********/
+        // let sub_json = subscription.clone();
+        // println!("--订阅内容:{:?}", sub_json);
+        // let sub_json_str = sub_json.to_string();
+        // web_socket.write_message(Message::Text(sub_json_str))
+        //     .unwrap();
+        loop {
+            let msg = web_socket.read_message();
+            match msg {
+                Ok(Message::Text(text)) => {
+                    let res_data = Self::ok_text(text);
+                    self.sender.send(res_data).unwrap();
+                    // writeln!(stdout, "Pong-响应--{:?}", res_data).unwrap();
+                    // let parse_fn_clone = Arc::clone(parse_fn); // Clone the Arc for each iteration
+                    // tokio::spawn(async move {
+                    //     let parse_fn_lock = parse_fn_clone.lock().await;
+                    //     parse_fn_lock(res_data).await;
+                    // });
+                }
+                Ok(Message::Ping(s)) => {
+                    println!("Ping-响应--{:?}", String::from_utf8(s));
+                }
+                Ok(Message::Pong(s)) => {
+                    println!("Pong-响应--{:?}", String::from_utf8(s));
+                }
+                Ok(Message::Close(_)) => {
+                    println!("socket 关闭: ");
+                }
+                Err(error) => {
+                    println!("Error receiving message: {}", error);
+                    break;
+                }
+                _ => {}
+            }
+        }
+        web_socket.close(None).unwrap();
+    }
+
+    //数据解析
+    pub fn ok_text(text: String) -> ResponseData
+    {
+        let mut res_data = ResponseData::new("200".to_string(), "success".to_string(), "".to_string());
+        let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
+        if json_value.get("id").is_some() && json_value.get("type").is_some() {
+            //订阅 相应
+            if json_value["type"].as_str() == Option::from("welcome") {
+                //链接成功
+                res_data.code = "-200".to_string();
+                res_data.message = "链接成功,主动发起订阅".to_string();
+            } else if json_value["type"].as_str() == Option::from("ack") {
+                res_data.message = "订阅成功".to_string();
+            }
+        } else if json_value.get("topic").is_some() {
+            let topic = json_value["topic"].to_string();
+            let parts: Vec<&str> = topic.split(':').collect();
+            res_data.channel = topic.to_string();
+            res_data.data = json_value["data"].to_string();
+        } else {
+            res_data.code = "-1".to_string();
+            res_data.message = "未知解析".to_string();
+        }
+        res_data
+    }
+}

+ 8 - 2
exchanges/src/lib.rs

@@ -11,12 +11,18 @@ pub mod gate_swap_ws;
 pub mod gate_swap_rest;
 pub mod socket_tool;
 pub mod kucoin_swap_rest;
-// pub mod kuconin_swap_ws;
+pub mod kuconin_swap_ws;
 
 
-pub enum  WsType{
+pub enum WsType {
     Public,
     Private,
 }
 
+pub enum OrderType {
+    LongBuy,
+    LongSell,
+    ShortSell,
+    ShortBuy,
+}
 

+ 1 - 0
exchanges/src/proxy.rs

@@ -2,6 +2,7 @@ use std::env;
 
 /**代理工具*/
 #[derive(Debug)]
+#[derive(Clone)]
 pub struct ParsingDetail {
     pub ip_address: String,
     pub port: String,