875428575@qq.com преди 2 години
родител
ревизия
0dfb8e71f7
променени са 1 файла, в които са добавени 127 реда и са изтрити 124 реда
  1. 127 124
      src/exchange_libs.rs

+ 127 - 124
src/exchange_libs.rs

@@ -539,143 +539,145 @@ impl SocketTool {
         }
     }
     pub(crate) fn run(&self, exc_name: String, parse_fn: ParseFn) {
-        /*****消息溜***/
-        let mut stdout = io::stdout();
-        let mut stderr = io::stderr();
-
-        /*****socket配置信息***/
-        let request_url = Url::parse(self.request_url.as_str()).unwrap();
-        let ip_array: Vec<&str> = self.ip.split(".").collect();
-        let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
-            ip_array[0].parse().unwrap(),
-            ip_array[1].parse().unwrap(),
-            ip_array[2].parse().unwrap(),
-            ip_array[3].parse().unwrap())
-        ), self.port);
-        let websocket_config = Some(WebSocketConfig {
-            max_send_queue: Some(16),
-            max_message_size: Some(16 * 1024 * 1024),
-            max_frame_size: Some(16 * 1024 * 1024),
-            accept_unmasked_frames: false,
-        });
-        let max_redirects = 5;
-        /*****判断代理IP是否为空,空则不走代理*****/
-        if self.ip.len() > 0 {
-            println!("----socket-走代理");
-            let (mut socket, response) =
-                connect_with_proxy(request_url, proxy_address, websocket_config, max_redirects)
-                    .expect("Can't connect(无法连接)");
-
-            /******登陆认证********/
-            if self.is_login {
-                println!("----需要登陆");
-
-                let login_json_str = self.log_in_to_str();
-                println!("---组装 登陆信息:{0}", login_json_str);
-                socket.write_message(Message::Text(login_json_str)).unwrap();
-                thread::sleep(Duration::from_secs(1));
-            } else {
-                println!("----no longin(不需要登陆)");
-            }
-
-            /******订阅信息********/
-            let sub_json = self.subscription.clone();
-            println!("--订阅内容:{:?}", sub_json);
-            let sub_json_str = sub_json.to_string();
-            writeln!(stdout, "subscribe info: {:?}", sub_json_str).unwrap();
-            socket.write_message(Message::Text(sub_json_str))
-                .unwrap();
-
-            /******数据读取********/
-            loop {
-                if !socket.can_read() {
-                    continue;
+        while true {//一个粗糙的 断开重连操作
+            /*****消息溜***/
+            let mut stdout = io::stdout();
+            let mut stderr = io::stderr();
+
+            /*****socket配置信息***/
+            let request_url = Url::parse(self.request_url.as_str()).unwrap();
+            let ip_array: Vec<&str> = self.ip.split(".").collect();
+            let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
+                ip_array[0].parse().unwrap(),
+                ip_array[1].parse().unwrap(),
+                ip_array[2].parse().unwrap(),
+                ip_array[3].parse().unwrap())
+            ), self.port);
+            let websocket_config = Some(WebSocketConfig {
+                max_send_queue: Some(16),
+                max_message_size: Some(16 * 1024 * 1024),
+                max_frame_size: Some(16 * 1024 * 1024),
+                accept_unmasked_frames: false,
+            });
+            let max_redirects = 5;
+            /*****判断代理IP是否为空,空则不走代理*****/
+            if self.ip.len() > 0 {
+                println!("----socket-走代理");
+                let (mut socket, response) =
+                    connect_with_proxy(request_url, proxy_address, websocket_config, max_redirects)
+                        .expect("Can't connect(无法连接)");
+
+                /******登陆认证********/
+                if self.is_login {
+                    println!("----需要登陆");
+
+                    let login_json_str = self.log_in_to_str();
+                    println!("---组装 登陆信息:{0}", login_json_str);
+                    socket.write_message(Message::Text(login_json_str)).unwrap();
+                    thread::sleep(Duration::from_secs(1));
+                } else {
+                    println!("----no longin(不需要登陆)");
                 }
 
-                let msg = socket.read_message();
+                /******订阅信息********/
+                let sub_json = self.subscription.clone();
+                println!("--订阅内容:{:?}", sub_json);
+                let sub_json_str = sub_json.to_string();
+                writeln!(stdout, "subscribe info: {:?}", sub_json_str).unwrap();
+                socket.write_message(Message::Text(sub_json_str))
+                    .unwrap();
+
+                /******数据读取********/
+                loop {
+                    if !socket.can_read() {
+                        continue;
+                    }
+
+                    let msg = socket.read_message();
 
-                match msg {
-                    Ok(Message::Text(text)) => {
-                        writeln!(stdout, "{:?}", text).unwrap();
-                        if exc_name == "binance" {
-                            SocketTool::hook_fn_binance(text, parse_fn)
-                        } else if exc_name == "okx" {
-                            SocketTool::hook_fn_okx(text, parse_fn)
+                    match msg {
+                        Ok(Message::Text(text)) => {
+                            writeln!(stdout, "{:?}", text).unwrap();
+                            if exc_name == "binance" {
+                                SocketTool::hook_fn_binance(text, parse_fn)
+                            } else if exc_name == "okx" {
+                                SocketTool::hook_fn_okx(text, parse_fn)
+                            }
                         }
+                        Ok(Message::Ping(_)) | Ok(Message::Pong(_)) | Ok(Message::Close(_)) => {
+                            socket.write_message(Message::text("pong"));
+                            // writeln!(stdout, "ping----------pong").unwrap();
+                            writeln!(stdout, "ping----------pong").unwrap();
+                        }
+                        Err(error) => {
+                            writeln!(stderr, "Error receiving message: {}", error).unwrap();
+                            break;
+                        }
+                        _ => {}
                     }
-                    Ok(Message::Ping(_)) | Ok(Message::Pong(_)) | Ok(Message::Close(_)) => {
-                        socket.write_message(Message::text("pong"));
-                        // writeln!(stdout, "ping----------pong").unwrap();
-                        writeln!(stdout, "ping----------pong").unwrap();
-                    }
-                    Err(error) => {
-                        writeln!(stderr, "Error receiving message: {}", error).unwrap();
-                        break;
-                    }
-                    _ => {}
                 }
-            }
 
-            socket.close(None).unwrap();
-        } else {
-            // 提示,并未找到好的优化方式,
-            println!("----socket-没代理");
-            let (mut socket, response) =
-                connect(request_url)
-                    .expect("Can't connect(无法连接)");
-
-            /******登陆认证********/
-            if self.is_login {
-                println!("----需要登陆");
-
-                let login_json_str = self.log_in_to_str();
-                println!("---组装 登陆信息:{0}", login_json_str);
-                socket.write_message(Message::Text(login_json_str)).unwrap();
-                thread::sleep(Duration::from_secs(1));
+                socket.close(None).unwrap();
             } else {
-                println!("----no longin(不需要登陆)");
-            }
-
-            /******订阅信息********/
-            let sub_json = self.subscription.clone();
-            println!("--订阅内容:{:?}", sub_json);
-            let sub_json_str = sub_json.to_string();
-            writeln!(stdout, "subscribe info: {:?}", sub_json_str).unwrap();
-            socket.write_message(Message::Text(sub_json_str))
-                .unwrap();
-
-            /******数据读取********/
-            loop {
-                if !socket.can_read() {
-                    continue;
+                // 提示,并未找到好的优化方式,
+                println!("----socket-没代理");
+                let (mut socket, response) =
+                    connect(request_url)
+                        .expect("Can't connect(无法连接)");
+
+                /******登陆认证********/
+                if self.is_login {
+                    println!("----需要登陆");
+
+                    let login_json_str = self.log_in_to_str();
+                    println!("---组装 登陆信息:{0}", login_json_str);
+                    socket.write_message(Message::Text(login_json_str)).unwrap();
+                    thread::sleep(Duration::from_secs(1));
+                } else {
+                    println!("----no longin(不需要登陆)");
                 }
 
-                let msg = socket.read_message();
+                /******订阅信息********/
+                let sub_json = self.subscription.clone();
+                println!("--订阅内容:{:?}", sub_json);
+                let sub_json_str = sub_json.to_string();
+                writeln!(stdout, "subscribe info: {:?}", sub_json_str).unwrap();
+                socket.write_message(Message::Text(sub_json_str))
+                    .unwrap();
+
+                /******数据读取********/
+                loop {
+                    if !socket.can_read() {
+                        continue;
+                    }
 
-                match msg {
-                    Ok(Message::Text(text)) => {
-                        writeln!(stdout, "{:?}", text).unwrap();
-                        println!("??????????---{0}--", exc_name);
-                        if exc_name == "binance" {
-                            SocketTool::hook_fn_binance(text, parse_fn)
-                        } else if exc_name == "okx" {
-                            SocketTool::hook_fn_okx(text, parse_fn)
+                    let msg = socket.read_message();
+
+                    match msg {
+                        Ok(Message::Text(text)) => {
+                            writeln!(stdout, "{:?}", text).unwrap();
+                            println!("??????????---{0}--", exc_name);
+                            if exc_name == "binance" {
+                                SocketTool::hook_fn_binance(text, parse_fn)
+                            } else if exc_name == "okx" {
+                                SocketTool::hook_fn_okx(text, parse_fn)
+                            }
                         }
+                        Ok(Message::Ping(_)) | Ok(Message::Pong(_)) | Ok(Message::Close(_)) => {
+                            socket.write_message(Message::Pong(vec![]))
+                                .unwrap();
+                            writeln!(stdout, "ping----------pong").unwrap();
+                        }
+                        Err(error) => {
+                            writeln!(stderr, "Error receiving message: {}", error).unwrap();
+                            break;
+                        }
+                        _ => {}
                     }
-                    Ok(Message::Ping(_)) | Ok(Message::Pong(_)) | Ok(Message::Close(_)) => {
-                        socket.write_message(Message::Pong(vec![]))
-                            .unwrap();
-                        writeln!(stdout, "ping----------pong").unwrap();
-                    }
-                    Err(error) => {
-                        writeln!(stderr, "Error receiving message: {}", error).unwrap();
-                        break;
-                    }
-                    _ => {}
                 }
-            }
 
-            socket.close(None).unwrap();
+                socket.close(None).unwrap();
+            }
         }
     }
     fn log_in_to_str(&self) -> String {
@@ -736,8 +738,9 @@ impl SocketTool {
         SocketTool::binance_run(b_array, "kline_1s".to_string(), parse_fn);
     }
     //币安--深度信息
-    pub fn binance_run_depth(b_array: Vec<&str>, parse_fn: ParseFn) {
-        SocketTool::binance_run(b_array, "depth@100ms".to_string(), parse_fn);
+    pub fn binance_run_depth(b_array: Vec<&str>, levels: String, parse_fn: ParseFn) {
+        let str = format!("depth{}@100ms", levels);
+        SocketTool::binance_run(b_array, str.to_string(), parse_fn);
     }
     //币安--订阅
     pub fn binance_run(b_array: Vec<&str>, subscription_name: String, parse_fn: ParseFn) {