Quellcode durchsuchen

币安ws深度示例

875428575@qq.com vor 2 Jahren
Ursprung
Commit
27caf622e7

+ 3 - 1
exchanges/Cargo.toml

@@ -12,4 +12,6 @@ tungstenite = { git = "https://github.com/PrivateRookie/tungstenite-rs.git", rev
 url = "2.4.0"
 base64 = "0.13"
 tokio = { version = "1.31.0", features = ["full"] }
-chrono = "0.4.26"
+chrono = "0.4.26"
+hex = "0.4"
+ring = "0.17.0-alpha.11"

+ 322 - 0
exchanges/src/binance_usdt_swap_ws.rs

@@ -0,0 +1,322 @@
+use std::collections::BTreeMap;
+use std::future::Future;
+use std::{io, thread};
+use std::io::{Read, Write};
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::sync::Arc;
+use std::time::Duration;
+use chrono::Utc;
+use ring::hmac;
+use serde_json::json;
+use tokio::sync::Mutex;
+use crate::proxy;
+use crate::response_base::ResponseData;
+use tungstenite::client::{AutoStream, connect_with_proxy, ProxyAutoStream};
+use tungstenite::{connect, Message, WebSocket};
+use tungstenite::http::Response;
+use tungstenite::protocol::WebSocketConfig;
+use tungstenite::stream::Stream;
+use url::Url;
+use crate::binance_ws::BinanceExc;
+
+
+pub struct BinanceUsdtSwapWs {
+    /*******参数*/
+    //连接地址
+    request_url: String,
+    //ip
+    ip: String,
+    //端口
+    port: u16,
+    //是否需要登陆
+    is_login: bool,
+    //登陆所需参数
+    login_param: BTreeMap<String, String>,
+}
+
+impl BinanceUsdtSwapWs {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool,
+               is_login: bool,
+               login_param: BTreeMap<String, String>) -> BinanceUsdtSwapWs
+    {
+        let mut base_url = String::from("");
+        if is_colo {
+            println!("不支持colo高速线路");
+            base_url = "wss://stream.binance.com:443/ws".to_string()
+        } else {
+            base_url = "wss://stream.binance.com:443/ws".to_string()
+        }
+        let mut ip_v = "";
+        let mut port_v = 8080;
+        let mut v_str = String::from("");
+
+        /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
+        let parsing_detail = proxy::ParsingDetail::parsing_environment_variables();
+        if parsing_detail.ip_address.len() > 0 && parsing_detail.port.len() > 0 {
+            ip_v = parsing_detail.ip_address.as_str();
+            port_v = parsing_detail.port.parse().unwrap();
+        }
+
+        /*****返回结构体*******/
+        BinanceUsdtSwapWs {
+            request_url: base_url,
+            ip: ip_v.clone().to_string(),
+            port: port_v,
+            is_login,
+            login_param,
+        }
+    }
+
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //深度-不需要认证
+    pub(crate) fn kline<F, Fut>(&self, b_array: Vec<&str>, parse_fn: F)
+        where
+            F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
+            Fut: Future<Output=()> + Send + 'static,
+    {
+        //订阅信息拼接
+        let mut params = vec![];
+        for item in &b_array {
+            let mut b_name = item.to_lowercase();
+            b_name = format!("{}@{}", b_name, "kline_1s");
+            params.push(b_name);
+        }
+        self.run(b_array, params, parse_fn);
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    fn log_in_to_str(&self) -> String {
+        let login_param = self.login_param.clone();
+        let mut login_json_str = String::from("");
+
+
+        //解析并且组装 认证信息
+        let lable = login_param.get("lable");
+        if let Some(ref_string) = lable {
+            if *ref_string == "binance" {
+                //println!("----币安 暂不做登陆");
+            } else if *ref_string == "okx" {
+                let mut access_key: String = "".to_string();
+                let mut secret_key: String = "".to_string();
+                let mut passphrase: String = "".to_string();
+
+                for (key, value) in &login_param {
+                    // //println!("Key: {}, Value: {}", key, value);
+                    if key == "access_key" {
+                        access_key = value.parse().unwrap();
+                    } else if key == "secret_key" {
+                        secret_key = value.parse().unwrap();
+                    } else if key == "passphrase" {
+                        passphrase = value.parse().unwrap();
+                    }
+                }
+                let timestamp = Utc::now().timestamp().to_string();
+
+                // 时间戳 + 请求类型+ 请求参数字符串
+                let message = format!("{}GET{}", timestamp, "/users/self/verify");
+                // //println!("---message:{:?}", message);
+                let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+                let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
+                let mut sign = base64::encode(result);
+
+                let login_json = json!({
+                              "op": "login",
+                                "args": [{
+                                "apiKey": access_key,
+                                "passphrase": passphrase,
+                                "timestamp": timestamp,
+                                "sign": sign  }]
+                        });
+
+                // //println!("---login_json:{0}", login_json.to_string());
+                // //println!("--登陆:{:?}", login_json);
+                login_json_str = login_json.to_string();
+            }
+        } else {
+            //println!("Option is None(lable 为None)");
+        }
+        login_json_str
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    fn run<F, Fut>(&self, b_array: Vec<&str>, params: Vec<String>, parse_fn: F)
+        where
+            F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
+            Fut: Future<Output=()> + Send + 'static,
+    {
+        //订阅信息组装
+        let subscription = json!({
+        "method": "SUBSCRIBE",
+        "params":params,
+                "id": 1
+        });
+
+
+        let parse_fn_arc = Arc::new(Mutex::new(parse_fn)); // Wrap the closure in an Arc<Mutex<_>>
+        loop {
+            let request_url = Url::parse(self.request_url.as_str()).unwrap();
+            //1. 判断是否需要代理,根据代理地址是否存来选择
+            if self.ip.len() > 0 {
+                let ip_array: Vec<&str> = self.ip.split(".").collect();
+                let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
+                    ip_array[0].parse().unwrap(),
+                    ip_array[1].parse().unwrap(),
+                    ip_array[2].parse().unwrap(),
+                    ip_array[3].parse().unwrap())
+                ), self.port);
+                let websocket_config = Some(WebSocketConfig {
+                    max_send_queue: Some(16),
+                    max_message_size: Some(16 * 1024 * 1024),
+                    max_frame_size: Some(16 * 1024 * 1024),
+                    accept_unmasked_frames: false,
+                });
+                let max_redirects = 5;
+                // let (mut socket, response) =
+                let mut proxy_ws =
+                    connect_with_proxy(request_url.clone(), proxy_address, websocket_config, max_redirects)
+                        .expect("Can't connect(无法连接)");
+                self.proxy_subscription(proxy_ws.0, &subscription, &parse_fn_arc);
+            } else {
+                // let (mut socket, response) =
+                let mut no_proxy_ws =
+                    connect(request_url.clone())
+                        .expect("Can't connect(无法连接)");
+                self.subscription(no_proxy_ws.0, &subscription, &parse_fn_arc);
+            }
+        }
+    }
+
+    //代理
+    fn proxy_subscription<F, Fut>(&self, mut web_socket: WebSocket<ProxyAutoStream>,
+                                  subscription: &serde_json::Value,
+                                  parse_fn: &Arc<Mutex<F>>,
+    )
+        where
+            F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
+            Fut: Future<Output=()> + Send + 'static,
+
+    {
+        /*****消息溜***/
+        let mut stdout = io::stdout();
+        let mut stderr = io::stderr();
+        /*****是否需要登陆****/
+        if self.is_login {
+            println!("----需要登陆");
+            // let login_json_str = self.log_in_to_str();
+            // web_socket.write_message(Message::Text(login_json_str)).unwrap();
+            // thread::sleep(Duration::from_secs(1));
+        } else {
+            println!("----no longin(不需要登陆)");
+        }
+        /******订阅信息********/
+        let sub_json = subscription.clone();
+        println!("--订阅内容:{:?}", sub_json);
+        let sub_json_str = sub_json.to_string();
+        web_socket.write_message(Message::Text(sub_json_str))
+            .unwrap();
+        loop {
+            let msg = web_socket.read_message();
+            match msg {
+                Ok(Message::Text(text)) => {
+                    let req_data = Self::ok_text(text);
+                    writeln!(stdout, "Pong-响应--{:?}", req_data).unwrap();
+                    let parse_fn_clone = Arc::clone(parse_fn); // Clone the Arc for each iteration
+                    tokio::spawn(async move {
+                        let parse_fn_lock = parse_fn_clone.lock().await;
+                        parse_fn_lock(req_data).await;
+                    });
+                }
+                Ok(Message::Ping(s)) => {
+                    println!("Ping-响应--{:?}", String::from_utf8(s));
+                }
+                Ok(Message::Pong(s)) => {
+                    println!("Pong-响应--{:?}", String::from_utf8(s));
+                }
+                Ok(Message::Close(_)) => {
+                    println!("socket 关闭: ");
+                }
+                Err(error) => {
+                    println!("Error receiving message: {}", error);
+                    break;
+                }
+                _ => {}
+            }
+        }
+        web_socket.close(None).unwrap();
+    }
+    //非代理
+    fn subscription<F, Fut>(&self, mut web_socket: WebSocket<AutoStream>,
+                            subscription: &serde_json::Value,
+                            parse_fn: &Arc<Mutex<F>>,
+    )
+        where
+            F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
+            Fut: Future<Output=()> + Send + 'static,
+    {
+        /*****消息溜***/
+        let mut stdout = io::stdout();
+        let mut stderr = io::stderr();
+        /*****是否需要登陆****/
+        if self.is_login {
+            println!("----需要登陆");
+            // let login_json_str = self.log_in_to_str();
+            // web_socket.write_message(Message::Text(login_json_str)).unwrap();
+            // thread::sleep(Duration::from_secs(1));
+        } else {
+            println!("----no longin(不需要登陆)");
+        }
+        /******订阅信息********/
+        let sub_json = subscription.clone();
+        println!("--订阅内容:{:?}", sub_json);
+        let sub_json_str = sub_json.to_string();
+        web_socket.write_message(Message::Text(sub_json_str))
+            .unwrap();
+        loop {
+            let msg = web_socket.read_message();
+            match msg {
+                Ok(Message::Text(text)) => {
+                    let req_data = Self::ok_text(text);
+                    writeln!(stdout, "Pong-响应--{:?}", req_data).unwrap();
+                    let parse_fn_clone = Arc::clone(parse_fn); // Clone the Arc for each iteration
+                    tokio::spawn(async move {
+                        let parse_fn_lock = parse_fn_clone.lock().await;
+                        parse_fn_lock(req_data).await;
+                    });
+                }
+                Ok(Message::Ping(s)) => {
+                    println!("Ping-响应--{:?}", String::from_utf8(s));
+                }
+                Ok(Message::Pong(s)) => {
+                    println!("Pong-响应--{:?}", String::from_utf8(s));
+                }
+                Ok(Message::Close(_)) => {
+                    println!("socket 关闭: ");
+                }
+                Err(error) => {
+                    println!("Error receiving message: {}", error);
+                    break;
+                }
+                _ => {}
+            }
+        }
+        web_socket.close(None).unwrap();
+    }
+
+    //数据解析
+    pub fn ok_text(text: String) -> ResponseData
+    {
+        println!("数据组装{:?}", text);
+        let res_data = ResponseData::new("0".to_string(), "success".to_string(), text);
+        res_data
+    }
+}

+ 67 - 12
exchanges/src/binance_ws.rs

@@ -1,22 +1,77 @@
-use crate::hook_base::ExcHook;
+use std::collections::BTreeMap;
+use std::future::Future;
+use std::io;
+use std::io::Stdout;
+use serde_json::json;
+use crate::response_base::ResponseData;
+use crate::ws_tool::SocketTool;
 
-pub struct BinanceExc {}
+pub struct BinanceExc {
+    url: String,
+}
 
-impl ExcHook for BinanceExc {
-    fn ok_text() {
-        // todo!()
-        println!("ok_text")
+impl BinanceExc {
+    pub fn new(is_colo: bool) -> BinanceExc {
+        let mut base_url = String::from("");
+        if is_colo {
+            base_url = "wss://stream.binance.com:443/ws".to_string()
+        } else {
+            base_url = "wss://stream.binance.com:443/ws".to_string()
+        }
+        BinanceExc { url: base_url }
     }
 
-    fn ok_ping() {
-        // todo!()
-        println!("ok_ping")
+    //深度-不需要认证
+    pub(crate) fn binance_kline<F, Fut>(&self, b_array: Vec<&str>, parse_fn: F)
+        where
+            F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
+            Fut: Future<Output=()> + Send + 'static,
+    {
+        //订阅信息拼接
+        let mut params = vec![];
+        for item in &b_array {
+            let mut b_name = item.to_lowercase();
+            b_name = format!("{}@{}", b_name, "kline_1s");
+            params.push(b_name);
+        }
+        self.binance_run(b_array, params, parse_fn);
+    }
+
+    fn binance_run<F, Fut>(&self, b_array: Vec<&str>, params: Vec<String>, parse_fn: F)
+        where
+            F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
+            Fut: Future<Output=()> + Send + 'static,
+    {
 
+        // 币安----socket
+        let subscription = json!({
+        "method": "SUBSCRIBE",
+        "params":params,
+                "id": 1
+        });
+        let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
+        btree_map.insert("lable".parse().unwrap(), "binance".parse().unwrap());//产品Id
+        let biance_socke = SocketTool::new("wss://stream.binance.com:443/ws",
+                                           false,
+                                           btree_map,
+                                           subscription,
+        );
+        biance_socke.run(Self::ok_text,
+                         parse_fn);
     }
 
-    fn ok_pong() {
-        // todo!()
-        println!("ok_pong")
 
+    fn ok_text(text: String) -> ResponseData
+    {
+        println!("数据组装{:?}",text);
+        let res_data = ResponseData::new("0".to_string(), "success".to_string(), text);
+        res_data
+    }
+
+    fn ok_ping(s: Vec<u8>) {
+        println!("ok_ping{:?}", s)
+    }
+    fn ok_pong(s: Vec<u8>) {
+        println!("ok_pong{:?}", s)
     }
 }

+ 0 - 7
exchanges/src/hook_base.rs

@@ -1,7 +0,0 @@
-/*****每个交易行,返回数据格式,ping方式都不同*/
-
-pub trait ExcHook {
-    fn ok_text();
-    fn ok_ping();
-    fn ok_pong();
-}

+ 87 - 38
exchanges/src/ws_tool.rs

@@ -3,9 +3,10 @@ use std::future::Future;
 use std::{io, thread};
 use std::io::{Read, Write};
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::time::Duration;
 use chrono::Utc;
+use tokio::sync::Mutex;
 use crate::proxy;
 use crate::response_base::ResponseData;
 use tungstenite::client::{AutoStream, connect_with_proxy, ProxyAutoStream};
@@ -16,7 +17,6 @@ use tungstenite::stream::Stream;
 use url::Url;
 
 /**socket请求工具*/
-
 pub struct SocketTool {
     /*******参数*/
     //连接地址
@@ -35,6 +35,7 @@ pub struct SocketTool {
     /*******响应钩子*/
 }
 
+
 impl SocketTool {
     //装入配置获取
     pub fn new(request_url: &str,
@@ -64,52 +65,81 @@ impl SocketTool {
         }
     }
 
-    pub fn run(&self) {
-        let request_url = Url::parse(self.request_url.as_str()).unwrap();
-        //1. 判断是否需要代理,根据代理地址是否存来选择
-        if self.ip.len() > 0 {
-            let ip_array: Vec<&str> = self.ip.split(".").collect();
-            let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
-                ip_array[0].parse().unwrap(),
-                ip_array[1].parse().unwrap(),
-                ip_array[2].parse().unwrap(),
-                ip_array[3].parse().unwrap())
-            ), self.port);
-            let websocket_config = Some(WebSocketConfig {
-                max_send_queue: Some(16),
-                max_message_size: Some(16 * 1024 * 1024),
-                max_frame_size: Some(16 * 1024 * 1024),
-                accept_unmasked_frames: false,
-            });
-            let max_redirects = 5;
-            // let (mut socket, response) =
-            let mut proxy_ws =
-                connect_with_proxy(request_url.clone(), proxy_address, websocket_config, max_redirects)
-                    .expect("Can't connect(无法连接)");
-            SocketTool::proxy_subscription(self, proxy_ws.0);
-        } else {
-            // let (mut socket, response) =
-            let mut no_proxy_ws =
-                connect(request_url.clone())
-                    .expect("Can't connect(无法连接)");
-            SocketTool::subscription(self, no_proxy_ws.0)
+    pub fn run<OkText, F, Fut>(&self,
+                               ok_text: OkText,
+                               parse_fn: F)
+        where
+            OkText: Fn(String) -> ResponseData + 'static + Send + Sync,
+            F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
+            Fut: Future<Output=()> + Send + 'static,
+    {
+        let parse_fn_arc = Arc::new(Mutex::new(parse_fn)); // Wrap the closure in an Arc<Mutex<_>>
+        loop {
+            let request_url = Url::parse(self.request_url.as_str()).unwrap();
+            //1. 判断是否需要代理,根据代理地址是否存来选择
+            if self.ip.len() > 0 {
+                let ip_array: Vec<&str> = self.ip.split(".").collect();
+                let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
+                    ip_array[0].parse().unwrap(),
+                    ip_array[1].parse().unwrap(),
+                    ip_array[2].parse().unwrap(),
+                    ip_array[3].parse().unwrap())
+                ), self.port);
+                let websocket_config = Some(WebSocketConfig {
+                    max_send_queue: Some(16),
+                    max_message_size: Some(16 * 1024 * 1024),
+                    max_frame_size: Some(16 * 1024 * 1024),
+                    accept_unmasked_frames: false,
+                });
+                let max_redirects = 5;
+                // let (mut socket, response) =
+                let mut proxy_ws =
+                    connect_with_proxy(request_url.clone(), proxy_address, websocket_config, max_redirects)
+                        .expect("Can't connect(无法连接)");
+                Self::proxy_subscription(self, proxy_ws.0, &ok_text, &parse_fn_arc);
+            } else {
+                // let (mut socket, response) =
+                let mut no_proxy_ws =
+                    connect(request_url.clone())
+                        .expect("Can't connect(无法连接)");
+                Self::subscription(self, no_proxy_ws.0, &ok_text, &parse_fn_arc);
+            }
         }
     }
 
     //代理-推送
-    pub fn proxy_subscription(socket_tool: &SocketTool, mut web_socket: WebSocket<ProxyAutoStream>) {
+    fn proxy_subscription<OkText, F, Fut>(socket_tool: &SocketTool,
+                                          mut web_socket: WebSocket<ProxyAutoStream>,
+                                          ok_text: &OkText,
+                                          parse_fn: &Arc<Mutex<F>>,
+    )
+        where
+            OkText: Fn(String) -> ResponseData + 'static + Send + Sync,
+            F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
+            Fut: Future<Output=()> + Send + 'static,
+
+    {
+        /*****消息溜***/
+        let mut stdout = io::stdout();
+        let mut stderr = io::stderr();
         /******订阅信息********/
         let sub_json = socket_tool.subscription.clone();
         println!("--订阅内容:{:?}", sub_json);
         let sub_json_str = sub_json.to_string();
         web_socket.write_message(Message::Text(sub_json_str))
             .unwrap();
-        //读取信息-数据解析
         loop {
             let msg = web_socket.read_message();
             match msg {
                 Ok(Message::Text(text)) => {
-                    println!("???{:?}", text)
+                    let req_data = ok_text(text);
+                    writeln!(stdout, "Pong-响应--{:?}", req_data).unwrap();
+                    let parse_fn_clone = Arc::clone(parse_fn); // Clone the Arc for each iteration
+                    tokio::spawn(async move {
+                        let parse_fn_lock = parse_fn_clone.lock().await;
+                        parse_fn_lock(req_data).await;
+                    });
+
                 }
                 Ok(Message::Ping(s)) => {
                     println!("Ping-响应--{:?}", String::from_utf8(s));
@@ -127,21 +157,39 @@ impl SocketTool {
                 _ => {}
             }
         }
+        web_socket.close(None).unwrap();
     }
     //非代理-推送
-    pub fn subscription(socket_tool: &SocketTool, mut web_socket: WebSocket<AutoStream>) {
+    fn subscription<OkText, F, Fut>(socket_tool: &SocketTool, mut web_socket: WebSocket<AutoStream>,
+                                    ok_text: &OkText,
+                                    parse_fn: &Arc<Mutex<F>>,
+    )
+        where
+            OkText: Fn(String) -> ResponseData + 'static + Send + Sync,
+            F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
+            Fut: Future<Output=()> + Send + 'static,
+    {
+        /*****消息溜***/
+        let mut stdout = io::stdout();
+        let mut stderr = io::stderr();
         /******订阅信息********/
         let sub_json = socket_tool.subscription.clone();
         println!("--订阅内容:{:?}", sub_json);
         let sub_json_str = sub_json.to_string();
         web_socket.write_message(Message::Text(sub_json_str))
             .unwrap();
-        //读取信息-数据解析
         loop {
             let msg = web_socket.read_message();
             match msg {
                 Ok(Message::Text(text)) => {
-                    println!("???{:?}", text)
+                    let req_data = ok_text(text);
+                    writeln!(stdout, "Pong-响应--{:?}", req_data).unwrap();
+                    let parse_fn_clone = Arc::clone(parse_fn); // Clone the Arc for each iteration
+                    tokio::spawn(async move {
+                        let parse_fn_lock = parse_fn_clone.lock().await;
+                        parse_fn_lock(req_data).await;
+                    });
+
                 }
                 Ok(Message::Ping(s)) => {
                     println!("Ping-响应--{:?}", String::from_utf8(s));
@@ -159,10 +207,11 @@ impl SocketTool {
                 _ => {}
             }
         }
+        web_socket.close(None).unwrap();
     }
 
 
-    //访问
+    //访问---测试示例
     pub fn run_lz(&self)
     {
         /*****消息溜***/