875428575@qq.com 2 سال پیش
والد
کامیت
28249b2f70
2فایلهای تغییر یافته به همراه382 افزوده شده و 8 حذف شده
  1. 380 6
      exchanges/src/binance_spot_ws.rs
  2. 2 2
      exchanges/src/kucoin_swap_rest.rs

+ 380 - 6
exchanges/src/binance_spot_ws.rs

@@ -1,16 +1,390 @@
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap};
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tokio::sync::mpsc::Sender;
+use tracing::{info, trace};
+use crate::{proxy};
+use tungstenite::client::{AutoStream, connect_with_proxy, ProxyAutoStream};
+use tungstenite::{connect, Message, WebSocket};
+use tungstenite::protocol::WebSocketConfig;
+use url::Url;
+use crate::proxy::ParsingDetail;
+use crate::response_base::ResponseData;
+
+pub enum BinanceSpotWsType {
+    //订阅频道类型
+    PublicAndPrivate,
+}
+
+
+#[derive(Clone)]                        //订阅枚举
+pub enum BinanceSpotSubscribeType {
+    PuBookTicker,
+    PuAggTrade,
+    PuDepth20levels100ms,
+}
 
 #[derive(Clone)]
-pub struct BinanceSpotWs {}
+pub struct BinanceSpotWs {
+    pub label: String,
+    request_url: String,
+    //实际ws 链接地址
+    proxy: ParsingDetail,
+    //账号信息
+    login_param: BTreeMap<String, String>,
+    //kuconis特殊参数
+    symbol_s: Vec<String>,
+    //订阅币对
+    subscribe_types: Vec<BinanceSpotSubscribeType>,
+    //订阅信息
+    sender: Sender<ResponseData>,
+    //数据通道
+}
 
 impl BinanceSpotWs {
-    pub fn new(_is_colo: bool, _login_param: BTreeMap<String, String>) -> BinanceSpotWs
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool,
+               login_param: BTreeMap<String, String>,
+               ws_type: BinanceSpotWsType,
+               sender: Sender<ResponseData>,
+    ) -> BinanceSpotWs
+    {
+        return BinanceSpotWs::new_label("default-BinanceSpotWs".to_string(), is_colo, login_param, ws_type, sender);
+    }
+    pub fn new_label(label: String, is_colo: bool,
+                     login_param: BTreeMap<String, String>,
+                     ws_type: BinanceSpotWsType,
+                     sender: Sender<ResponseData>,
+    ) -> BinanceSpotWs
+    {
+        if is_colo {
+            trace!("不支持高速通道")
+        }
+
+        /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
+        let parsing_detail = proxy::ParsingDetail::parsing_environment_variables();
+
+        /*******公共频道-私有频道数据组装*/
+        let request_url = match ws_type {
+            BinanceSpotWsType::PublicAndPrivate => {
+                "wss://stream.binance.com:9443/ws".to_string()
+            }
+        };
+
+        /*****返回结构体*******/
+        BinanceSpotWs {
+            label,
+            request_url,
+            proxy: parsing_detail,
+            login_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            sender,
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<BinanceSpotSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //自定义
+    pub async fn custom_subscribe(&mut self, bool_v1: Arc<AtomicBool>, b_array: Vec<String>)
+    {
+        let mut symbol_s = b_array.clone();
+        for symbol in symbol_s.iter_mut() {
+            // 大写
+            *symbol = symbol.to_lowercase();
+            // 字符串替换
+            *symbol = symbol.replace("_", "");
+            *symbol = symbol.replace("-", "");
+        }
+        self.symbol_s = symbol_s;
+        let log_in = self.login_param.clone();
+        trace!(?log_in);
+        self.run(bool_v1).await;
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+
+
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: BinanceSpotSubscribeType) -> String {
+        match subscribe_type {
+            BinanceSpotSubscribeType::PuAggTrade => {
+                format!("{}@aggTrade", symbol)
+            }
+            BinanceSpotSubscribeType::PuDepth20levels100ms => {
+                format!("{}@depth20@100ms", symbol)
+            }
+            BinanceSpotSubscribeType::PuBookTicker => {
+                format!("{}@bookTicker", symbol)
+            }
+        }
+    }
+    //组装订阅数据
+    pub fn get_subscription(&self) -> Vec<String> {
+        let mut args = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
+                args.push(ty_str);
+            }
+        }
+
+        trace!("订阅信息:{:?}", args);
+        args
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    async fn run(&mut self, bool_v1: Arc<AtomicBool>)
+    {
+        //订阅信息组装
+
+        let params = self.get_subscription();
+        let subscription_str = if params.len() > 0 {
+            serde_json::json!({
+                "method": "SUBSCRIBE",
+                "params":serde_json::Value::from(params),
+                "id": 1
+            }).to_string()
+        } else {
+            "".to_string()
+        };
+        trace!("订阅内容:{}",subscription_str);
+
+        let request_url = self.request_url.clone();
+        loop {
+            info!("要连接咯~~!!{}", request_url);
+            let request_url = Url::parse(request_url.as_str()).unwrap();
+            //1. 判断是否需要代理,根据代理地址是否存来选择
+            if self.proxy.ip_address.len() > 0 {
+                let ip_array: Vec<&str> = self.proxy.ip_address.split(".").collect();
+                let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
+                    ip_array[0].parse().unwrap(),
+                    ip_array[1].parse().unwrap(),
+                    ip_array[2].parse().unwrap(),
+                    ip_array[3].parse().unwrap())
+                ), self.proxy.port.parse().unwrap());
+                let websocket_config = Some(WebSocketConfig {
+                    max_send_queue: Some(16),
+                    max_message_size: Some(16 * 1024 * 1024),
+                    max_frame_size: Some(16 * 1024 * 1024),
+                    accept_unmasked_frames: false,
+                });
+                let max_redirects = 5;
+                match connect_with_proxy(request_url.clone(),
+                                         proxy_address, websocket_config, max_redirects) {
+                    Ok(ws) => {
+                        let bool_v1_clone = Arc::clone(&bool_v1);
+                        self.proxy_subscription(bool_v1_clone, ws.0, subscription_str.clone()).await;
+                    }
+                    Err(err) => {
+                        trace!("Can't connect(无法连接): {}", err);
+                    }
+                };
+            } else {
+                match connect(request_url.clone()) {
+                    Ok(ws) => {
+                        let bool_v1_clone = Arc::clone(&bool_v1);
+                        self.subscription(bool_v1_clone, ws.0, subscription_str.clone()).await;
+                    }
+                    Err(err) => {
+                        // 连接失败时执行的操作
+                        trace!("Can't connect(无法连接): {}", err);
+                        // 返回一个默认的 WebSocket 对象或其他适当的值
+                        // 或者根据需要触发 panic 或返回错误信息
+                    }
+                };
+            }
+            trace!("退出来咯");
+
+            let bool_v1_clone = Arc::clone(&bool_v1);
+            let bool_v1_v = bool_v1_clone.load(Ordering::SeqCst);
+            if !bool_v1_v {
+                break;
+            }
+        }
+    }
+
+    //代理
+    async fn proxy_subscription(&self, bool_v1: Arc<AtomicBool>, mut web_socket: WebSocket<ProxyAutoStream>, subscription: String)
     {
-        return BinanceSpotWs::new_label("default-BinanceSpotWs".to_string(), _is_colo, _login_param);
+        info!("走代理-链接成功!开始数据读取");
+        let lable = self.label.clone();
+        /*****订阅消息**/
+        if subscription.len() > 0 {
+            web_socket.write_message(Message::Text(subscription)).unwrap();
+        }
+        /*****消息溜***/
+        loop {
+            tokio::time::sleep(Duration::from_millis(1)).await;
+            let msg = web_socket.read_message();
+            match msg {
+                Ok(Message::Text(text)) => {
+                    // trace!("获取推送:{}",text.clone());
+                    let res_data = Self::ok_text(lable.to_string(), text);
+                    if res_data.code == "-200" {
+                        trace!("订阅成功:{:?}", res_data.data);
+                    } else {
+                        let sender = self.sender.clone();
+                        tokio::spawn(async move {
+                            sender.send(res_data).await.unwrap();
+                        });
+                        tokio::spawn(async move {});
+                    }
+                }
+                Ok(Message::Ping(s)) => {
+                    trace!( "Ping-响应--{:?}", String::from_utf8(s.clone()));
+                    // let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
+                    trace!( "回应-pong---{:?}", String::from_utf8(s.clone()));
+                }
+                Ok(Message::Pong(s)) => {
+                    // trace!("Pong-响应--{:?}", String::from_utf8(s));
+                    trace!( "Pong-响应--{:?}", String::from_utf8(s.clone()));
+                }
+                Ok(Message::Close(_)) => {
+                    // trace!("socket 关闭: ");
+                    trace!("Close-响应");
+                }
+                Err(error) => {
+                    // trace!("Error receiving message: {}", error);
+                    trace!("Err-响应{}", error);
+                    break;
+                }
+                _ => {}
+            }
+
+            let bool_v1_v = bool_v1.load(Ordering::SeqCst);
+            if !bool_v1_v {
+                break;
+            }
+        }
+        web_socket.close(None).unwrap();
     }
 
+    //非代理
+    async fn subscription(&self, bool_v1: Arc<AtomicBool>, mut web_socket: WebSocket<AutoStream>, subscription: String)
+    {
+        info!("链接成功!开始数据读取");
+        let lable = self.label.clone();
+        /*****订阅消息**/
+        if subscription.len() > 0 {
+            web_socket.write_message(Message::Text(subscription.clone())).unwrap();
+        }
+        /*****消息溜***/
+        loop {
+            tokio::time::sleep(Duration::from_millis(1)).await;
+            let msg = web_socket.read_message();
+            match msg {
+                Ok(Message::Text(text)) => {
+                    // trace!("获取推送:{}",text.clone());
+                    let res_data = Self::ok_text(lable.to_string(), text);
+                    if res_data.code == "-200" {
+                        trace!("订阅成功:{:?}", subscription.clone());
+                    } else {
+                        let sender = self.sender.clone();
+                        tokio::spawn(async move {
+                            sender.send(res_data).await.unwrap();
+                        });
+                        tokio::spawn(async move {});
+                    }
+                }
+                Ok(Message::Ping(s)) => {
+                    trace!( "Ping-响应--{:?}", String::from_utf8(s.clone()));
+                    // let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
+                    trace!( "回应-pong---{:?}", String::from_utf8(s.clone()));
+                }
+                Ok(Message::Pong(s)) => {
+                    // trace!("Pong-响应--{:?}", String::from_utf8(s));
+                    trace!("Pong-响应--{:?}", String::from_utf8(s.clone()));
+                }
+                Ok(Message::Close(_)) => {
+                    // trace!("socket 关闭: ");
+                    trace!( "Close-响应");
+                }
+                Err(error) => {
+                    // trace!("Error receiving message: {}", error);
+                    trace!( "Err-响应{}", error);
+                    break;
+                }
+                _ => {}
+            }
+
+            let bool_v1_v = bool_v1.load(Ordering::SeqCst);
+            if !bool_v1_v {
+                break;
+            }
+        }
+        web_socket.close(None).unwrap();
+    }
+
+    //数据解析
+    pub fn ok_text(lable: String, text: String) -> ResponseData
+    {
+        // trace!("原始数据");
+        // trace!(?text);
+        let mut res_data = ResponseData::new(lable, "200".to_string(), "success".to_string(), "".to_string());
+        let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
 
-    pub fn new_label(_label: String, _is_colo: bool, _login_param: BTreeMap<String, String>) -> BinanceSpotWs {
-        BinanceSpotWs {}
+        if json_value.get("result").is_some() &&
+            json_value.get("id").is_some()
+        {
+            //订阅反馈
+            res_data.code = "-200".to_string();
+            res_data.channel = "".to_string();
+            res_data.message = "订阅成功!".to_string();
+        } else if json_value.get("e").is_some() &&
+            json_value["e"].as_str() == Option::from("aggTrade")
+        {
+            res_data.channel = "aggTrade".to_string();
+            res_data.data = text;
+        } else if json_value.get("u").is_some() &&
+            json_value.get("s").is_some() &&
+            json_value.get("b").is_some() &&
+            json_value.get("B").is_some() &&
+            json_value.get("a").is_some() &&
+            json_value.get("A").is_some()
+        {
+            res_data.channel = "bookTicker".to_string();
+            res_data.data = text;
+        } else if json_value.get("bids").is_some() &&
+            json_value.get("asks").is_some()
+        {
+            res_data.channel = "depth".to_string();
+            res_data.data = text;
+        } else {
+            res_data.channel = "未知的频道".to_string();
+        }
+        //
+        // if json_value.get("error").is_some() {//订阅返回
+        //     res_data.code = json_value["error"]["code"].to_string();
+        //     res_data.message = json_value["error"]["msg"].to_string();
+        // } else if json_value.get("stream").is_some() {//订阅返回
+        //     res_data.data = format!("{}", json_value.get("data").as_ref().unwrap());
+        //     res_data.code = "200".to_string();
+        //
+        //     let channel = format!("{}", json_value.get("stream").as_ref().unwrap());
+        //     if channel.contains("@aggTrade") {
+        //         res_data.channel = "aggTrade".to_string();
+        //     } else if channel.contains("@depth20@100ms") {
+        //         res_data.channel = "depth".to_string();
+        //     } else if channel.contains("@bookTicker") {
+        //         res_data.channel = "bookTicker".to_string();
+        //     } else {}
+        // } else {
+        //     res_data.data = text
+        // }
+        res_data
     }
 }

+ 2 - 2
exchanges/src/kucoin_swap_rest.rs

@@ -244,8 +244,8 @@ impl KucoinSwapRest {
         data
     }
     //批量测但
-    pub async fn cancel_order_s(&mut self, symbol: String) -> ResponseData {
-        let mut params = serde_json::json!({   });
+    pub async fn cancel_orders(&mut self, symbol: String) -> ResponseData {
+        let  params = serde_json::json!({   });
         let data = self.request("DELETE".to_string(),
                                 "/api/v1".to_string(),
                                 format!("/orders?symbol={}", symbol),