|
|
@@ -1,294 +0,0 @@
|
|
|
-use std::collections::BTreeMap;
|
|
|
-use std::future::Future;
|
|
|
-use std::{io, thread};
|
|
|
-use std::io::{Read, Write};
|
|
|
-use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
|
|
-use std::sync::Arc;
|
|
|
-use std::time::Duration;
|
|
|
-use chrono::Utc;
|
|
|
-use tokio::sync::Mutex;
|
|
|
-use crate::proxy;
|
|
|
-use crate::response_base::ResponseData;
|
|
|
-use tungstenite::client::{AutoStream, connect_with_proxy, ProxyAutoStream};
|
|
|
-use tungstenite::{connect, Message, WebSocket};
|
|
|
-use tungstenite::http::Response;
|
|
|
-use tungstenite::protocol::WebSocketConfig;
|
|
|
-use tungstenite::stream::Stream;
|
|
|
-use url::Url;
|
|
|
-
|
|
|
-/**socket请求工具*/
|
|
|
-pub struct SocketTool {
|
|
|
- /*******参数*/
|
|
|
- //连接地址
|
|
|
- request_url: String,
|
|
|
- //ip
|
|
|
- ip: String,
|
|
|
- //端口
|
|
|
- port: u16,
|
|
|
- //是否需要登陆
|
|
|
- is_login: bool,
|
|
|
- //登陆所需参数
|
|
|
- login_param: BTreeMap<String, String>,
|
|
|
- //订阅参数
|
|
|
- subscription: serde_json::Value,
|
|
|
-
|
|
|
- /*******响应钩子*/
|
|
|
-}
|
|
|
-
|
|
|
-
|
|
|
-impl SocketTool {
|
|
|
- //装入配置获取
|
|
|
- pub fn new(request_url: &str,
|
|
|
- is_login: bool,
|
|
|
- login_param: BTreeMap<String, String>,
|
|
|
- subscription: serde_json::Value) -> SocketTool
|
|
|
- {
|
|
|
- let mut ip_v = "";
|
|
|
- let mut port_v = 8080;
|
|
|
- let mut v_str = String::from("");
|
|
|
-
|
|
|
- /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
|
|
|
- let parsing_detail = proxy::ParsingDetail::parsing_environment_variables();
|
|
|
- if parsing_detail.ip_address.len() > 0 && parsing_detail.port.len() > 0 {
|
|
|
- ip_v = parsing_detail.ip_address.as_str();
|
|
|
- port_v = parsing_detail.port.parse().unwrap();
|
|
|
- }
|
|
|
-
|
|
|
- /*****返回结构体*******/
|
|
|
- SocketTool {
|
|
|
- request_url: request_url.to_string(),
|
|
|
- ip: ip_v.clone().to_string(),
|
|
|
- port: port_v,
|
|
|
- is_login,
|
|
|
- login_param,
|
|
|
- subscription,
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- pub fn run<OkText, F, Fut>(&self,
|
|
|
- ok_text: OkText,
|
|
|
- parse_fn: F)
|
|
|
- where
|
|
|
- OkText: Fn(String) -> ResponseData + 'static + Send + Sync,
|
|
|
- F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
|
|
|
- Fut: Future<Output=()> + Send + 'static,
|
|
|
- {
|
|
|
- let parse_fn_arc = Arc::new(Mutex::new(parse_fn)); // Wrap the closure in an Arc<Mutex<_>>
|
|
|
- loop {
|
|
|
- let request_url = Url::parse(self.request_url.as_str()).unwrap();
|
|
|
- //1. 判断是否需要代理,根据代理地址是否存来选择
|
|
|
- if self.ip.len() > 0 {
|
|
|
- let ip_array: Vec<&str> = self.ip.split(".").collect();
|
|
|
- let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
|
|
|
- ip_array[0].parse().unwrap(),
|
|
|
- ip_array[1].parse().unwrap(),
|
|
|
- ip_array[2].parse().unwrap(),
|
|
|
- ip_array[3].parse().unwrap())
|
|
|
- ), self.port);
|
|
|
- let websocket_config = Some(WebSocketConfig {
|
|
|
- max_send_queue: Some(16),
|
|
|
- max_message_size: Some(16 * 1024 * 1024),
|
|
|
- max_frame_size: Some(16 * 1024 * 1024),
|
|
|
- accept_unmasked_frames: false,
|
|
|
- });
|
|
|
- let max_redirects = 5;
|
|
|
- // let (mut socket, response) =
|
|
|
- let mut proxy_ws =
|
|
|
- connect_with_proxy(request_url.clone(), proxy_address, websocket_config, max_redirects)
|
|
|
- .expect("Can't connect(无法连接)");
|
|
|
- Self::proxy_subscription(self, proxy_ws.0, &ok_text, &parse_fn_arc);
|
|
|
- } else {
|
|
|
- // let (mut socket, response) =
|
|
|
- let mut no_proxy_ws =
|
|
|
- connect(request_url.clone())
|
|
|
- .expect("Can't connect(无法连接)");
|
|
|
- Self::subscription(self, no_proxy_ws.0, &ok_text, &parse_fn_arc);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //代理-推送
|
|
|
- fn proxy_subscription<OkText, F, Fut>(socket_tool: &SocketTool,
|
|
|
- mut web_socket: WebSocket<ProxyAutoStream>,
|
|
|
- ok_text: &OkText,
|
|
|
- parse_fn: &Arc<Mutex<F>>,
|
|
|
- )
|
|
|
- where
|
|
|
- OkText: Fn(String) -> ResponseData + 'static + Send + Sync,
|
|
|
- F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
|
|
|
- Fut: Future<Output=()> + Send + 'static,
|
|
|
-
|
|
|
- {
|
|
|
- /*****消息溜***/
|
|
|
- let mut stdout = io::stdout();
|
|
|
- let mut stderr = io::stderr();
|
|
|
- /******订阅信息********/
|
|
|
- let sub_json = socket_tool.subscription.clone();
|
|
|
- println!("--订阅内容:{:?}", sub_json);
|
|
|
- let sub_json_str = sub_json.to_string();
|
|
|
- web_socket.write_message(Message::Text(sub_json_str))
|
|
|
- .unwrap();
|
|
|
- loop {
|
|
|
- let msg = web_socket.read_message();
|
|
|
- match msg {
|
|
|
- Ok(Message::Text(text)) => {
|
|
|
- let req_data = ok_text(text);
|
|
|
- writeln!(stdout, "Pong-响应--{:?}", req_data).unwrap();
|
|
|
- let parse_fn_clone = Arc::clone(parse_fn); // Clone the Arc for each iteration
|
|
|
- tokio::spawn(async move {
|
|
|
- let parse_fn_lock = parse_fn_clone.lock().await;
|
|
|
- parse_fn_lock(req_data).await;
|
|
|
- });
|
|
|
-
|
|
|
- }
|
|
|
- Ok(Message::Ping(s)) => {
|
|
|
- println!("Ping-响应--{:?}", String::from_utf8(s));
|
|
|
- }
|
|
|
- Ok(Message::Pong(s)) => {
|
|
|
- println!("Pong-响应--{:?}", String::from_utf8(s));
|
|
|
- }
|
|
|
- Ok(Message::Close(_)) => {
|
|
|
- println!("socket 关闭: ");
|
|
|
- }
|
|
|
- Err(error) => {
|
|
|
- println!("Error receiving message: {}", error);
|
|
|
- break;
|
|
|
- }
|
|
|
- _ => {}
|
|
|
- }
|
|
|
- }
|
|
|
- web_socket.close(None).unwrap();
|
|
|
- }
|
|
|
- //非代理-推送
|
|
|
- fn subscription<OkText, F, Fut>(socket_tool: &SocketTool, mut web_socket: WebSocket<AutoStream>,
|
|
|
- ok_text: &OkText,
|
|
|
- parse_fn: &Arc<Mutex<F>>,
|
|
|
- )
|
|
|
- where
|
|
|
- OkText: Fn(String) -> ResponseData + 'static + Send + Sync,
|
|
|
- F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
|
|
|
- Fut: Future<Output=()> + Send + 'static,
|
|
|
- {
|
|
|
- /*****消息溜***/
|
|
|
- let mut stdout = io::stdout();
|
|
|
- let mut stderr = io::stderr();
|
|
|
- /******订阅信息********/
|
|
|
- let sub_json = socket_tool.subscription.clone();
|
|
|
- println!("--订阅内容:{:?}", sub_json);
|
|
|
- let sub_json_str = sub_json.to_string();
|
|
|
- web_socket.write_message(Message::Text(sub_json_str))
|
|
|
- .unwrap();
|
|
|
- loop {
|
|
|
- let msg = web_socket.read_message();
|
|
|
- match msg {
|
|
|
- Ok(Message::Text(text)) => {
|
|
|
- let req_data = ok_text(text);
|
|
|
- writeln!(stdout, "Pong-响应--{:?}", req_data).unwrap();
|
|
|
- let parse_fn_clone = Arc::clone(parse_fn); // Clone the Arc for each iteration
|
|
|
- tokio::spawn(async move {
|
|
|
- let parse_fn_lock = parse_fn_clone.lock().await;
|
|
|
- parse_fn_lock(req_data).await;
|
|
|
- });
|
|
|
-
|
|
|
- }
|
|
|
- Ok(Message::Ping(s)) => {
|
|
|
- println!("Ping-响应--{:?}", String::from_utf8(s));
|
|
|
- }
|
|
|
- Ok(Message::Pong(s)) => {
|
|
|
- println!("Pong-响应--{:?}", String::from_utf8(s));
|
|
|
- }
|
|
|
- Ok(Message::Close(_)) => {
|
|
|
- println!("socket 关闭: ");
|
|
|
- }
|
|
|
- Err(error) => {
|
|
|
- println!("Error receiving message: {}", error);
|
|
|
- break;
|
|
|
- }
|
|
|
- _ => {}
|
|
|
- }
|
|
|
- }
|
|
|
- web_socket.close(None).unwrap();
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- //访问---测试示例
|
|
|
- pub fn run_lz(&self)
|
|
|
- {
|
|
|
- /*****消息溜***/
|
|
|
- let mut stdout = io::stdout();
|
|
|
- let mut stderr = io::stderr();
|
|
|
-
|
|
|
- /*****socket配置信息***/
|
|
|
- let request_url = Url::parse(self.request_url.as_str()).unwrap();
|
|
|
- // let parse_fn = Arc::new(Mutex::new(parse_fn)); // Wrap the closure in an Arc<Mutex<_>>
|
|
|
- let login_param = self.login_param.clone();
|
|
|
- let lable = login_param.get("lable").unwrap().clone();
|
|
|
-
|
|
|
- /*****判断代理IP是否为空,空则不走代理*****/
|
|
|
- if self.ip.len() > 0 {
|
|
|
- //println!("----socket-走代理");
|
|
|
- let ip_array: Vec<&str> = self.ip.split(".").collect();
|
|
|
- let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
|
|
|
- ip_array[0].parse().unwrap(),
|
|
|
- ip_array[1].parse().unwrap(),
|
|
|
- ip_array[2].parse().unwrap(),
|
|
|
- ip_array[3].parse().unwrap())
|
|
|
- ), self.port);
|
|
|
- let websocket_config = Some(WebSocketConfig {
|
|
|
- max_send_queue: Some(16),
|
|
|
- max_message_size: Some(16 * 1024 * 1024),
|
|
|
- max_frame_size: Some(16 * 1024 * 1024),
|
|
|
- accept_unmasked_frames: false,
|
|
|
- });
|
|
|
- let max_redirects = 5;
|
|
|
- let (mut socket, response) =
|
|
|
- connect_with_proxy(request_url.clone(), proxy_address, websocket_config, max_redirects)
|
|
|
- .expect("Can't connect(无法连接)");
|
|
|
-
|
|
|
- /******登陆认证********/
|
|
|
- // if self.is_login {
|
|
|
- // //println!("----需要登陆");
|
|
|
- // let login_json_str = SocketTool::log_in_to_str(login_param);
|
|
|
- // // //println!("---组装 登陆信息:{0}", login_json_str);
|
|
|
- // socket.write_message(Message::Text(login_json_str)).unwrap();
|
|
|
- // thread::sleep(Duration::from_secs(1));
|
|
|
- // } else {
|
|
|
- // //println!("----no longin(不需要登陆)");
|
|
|
- // }
|
|
|
-
|
|
|
- /******订阅信息********/
|
|
|
- let sub_json = self.subscription.clone();
|
|
|
- println!("--订阅内容:{:?}", sub_json);
|
|
|
- let sub_json_str = sub_json.to_string();
|
|
|
- // writeln!(stdout, "subscribe info: {:?}", sub_json_str).unwrap();
|
|
|
- socket.write_message(Message::Text(sub_json_str))
|
|
|
- .unwrap();
|
|
|
-
|
|
|
- /******数据读取********/
|
|
|
- loop {
|
|
|
- let msg = socket.read_message();
|
|
|
- //数据解析
|
|
|
- match msg {
|
|
|
- Ok(Message::Text(text)) => {
|
|
|
- println!("???{:?}", text)
|
|
|
- }
|
|
|
- Ok(Message::Ping(s)) => {
|
|
|
- println!("Ping-响应--{:?}", String::from_utf8(s));
|
|
|
- }
|
|
|
- Ok(Message::Pong(s)) => {
|
|
|
- println!("Pong-响应--{:?}-{:?}", String::from_utf8(s), lable.to_string());
|
|
|
- }
|
|
|
- Ok(Message::Close(_)) => {
|
|
|
- println!("socket 关闭: ");
|
|
|
- }
|
|
|
- Err(error) => {
|
|
|
- println!("Error receiving message: {}", error);
|
|
|
- break;
|
|
|
- }
|
|
|
- _ => {}
|
|
|
- }
|
|
|
- }
|
|
|
- socket.close(None).unwrap();
|
|
|
- }
|
|
|
- }
|
|
|
-}
|