|
|
@@ -1,343 +0,0 @@
|
|
|
-use std::collections::BTreeMap;
|
|
|
-use std::io;
|
|
|
-use std::io::{Write};
|
|
|
-use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
|
|
-use serde_json::{json, Value};
|
|
|
-use tokio::sync::mpsc::Sender;
|
|
|
-use tracing::trace;
|
|
|
-use crate::proxy;
|
|
|
-use crate::response_base::ResponseData;
|
|
|
-use tungstenite::client::{AutoStream, connect_with_proxy, ProxyAutoStream};
|
|
|
-use tungstenite::{connect, Message, WebSocket};
|
|
|
-use tungstenite::protocol::WebSocketConfig;
|
|
|
-use url::Url;
|
|
|
-
|
|
|
-#[derive(Clone)]
|
|
|
-pub struct BinanceUsdtSwapWs {
|
|
|
- pub label: String,
|
|
|
- //连接地址
|
|
|
- request_url: String,
|
|
|
- //ip
|
|
|
- ip: String,
|
|
|
- //端口
|
|
|
- port: u16,
|
|
|
- //登陆所需参数
|
|
|
- // login_param: BTreeMap<String, String>,
|
|
|
-
|
|
|
- sender: Sender<ResponseData>,
|
|
|
-}
|
|
|
-
|
|
|
-impl BinanceUsdtSwapWs {
|
|
|
- /*******************************************************************************************************/
|
|
|
- /*****************************************获取一个对象****************************************************/
|
|
|
- /*******************************************************************************************************/
|
|
|
- pub fn new(is_colo: bool,
|
|
|
- _login_param: BTreeMap<String, String>,
|
|
|
- sender: Sender<ResponseData>) -> BinanceUsdtSwapWs
|
|
|
- {
|
|
|
- return BinanceUsdtSwapWs::new_label("default-BinanceUsdtSwapWs".to_string(), is_colo, _login_param, sender);
|
|
|
- }
|
|
|
- pub fn new_label(label: String, is_colo: bool,
|
|
|
- _login_param: BTreeMap<String, String>,
|
|
|
- sender: Sender<ResponseData>) -> BinanceUsdtSwapWs
|
|
|
- {
|
|
|
- let base_url = if is_colo {
|
|
|
- trace!("不支持colo高速线路");
|
|
|
- "wss://stream.binance.com:443/ws".to_string()
|
|
|
- } else {
|
|
|
- "wss://stream.binance.com:443/ws".to_string()
|
|
|
- };
|
|
|
- let mut ip_v = "";
|
|
|
- let mut port_v = 8080;
|
|
|
-
|
|
|
- /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
|
|
|
- let parsing_detail = proxy::ParsingDetail::parsing_environment_variables();
|
|
|
- if parsing_detail.ip_address.len() > 0 && parsing_detail.port.len() > 0 {
|
|
|
- ip_v = parsing_detail.ip_address.as_str();
|
|
|
- port_v = parsing_detail.port.parse().unwrap();
|
|
|
- }
|
|
|
-
|
|
|
- /*****返回结构体*******/
|
|
|
- BinanceUsdtSwapWs {
|
|
|
- label,
|
|
|
- request_url: base_url,
|
|
|
- ip: ip_v.clone().to_string(),
|
|
|
- port: port_v,
|
|
|
- // login_param,
|
|
|
- sender,
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /*******************************************************************************************************/
|
|
|
- /*****************************************订阅函数********************************************************/
|
|
|
- /*******************************************************************************************************/
|
|
|
- //K线-不需要认证
|
|
|
- pub async fn kline(&self, b_array: Vec<&str>)
|
|
|
- {
|
|
|
- //订阅信息拼接
|
|
|
- let mut params = vec![];
|
|
|
- for item in &b_array {
|
|
|
- let mut b_name = item.to_lowercase();
|
|
|
- b_name = format!("{}@{}", b_name, "kline_1s");
|
|
|
- params.push(b_name);
|
|
|
- }
|
|
|
- self.run(params).await;
|
|
|
- }
|
|
|
-
|
|
|
- //自定义-不需要认证
|
|
|
- pub async fn custom_subscribe(&self, b_array: Vec<String>, sub_trade: u8, sub_fast: u8)
|
|
|
- {
|
|
|
- let mut params = vec![];
|
|
|
-
|
|
|
- for item in &b_array {
|
|
|
- let b_name = item.to_lowercase();
|
|
|
- if sub_trade > 0 {
|
|
|
- params.push(format!("{}@aggTrade", b_name));
|
|
|
- }
|
|
|
- if sub_fast > 0 {
|
|
|
- params.push(format!("{}@bookTicker", b_name));
|
|
|
- } else {
|
|
|
- params.push(format!("{}@depth20@100ms", b_name));
|
|
|
- }
|
|
|
- }
|
|
|
- self.run(params).await;
|
|
|
- }
|
|
|
-
|
|
|
- /*******************************************************************************************************/
|
|
|
- /*****************************************工具函数********************************************************/
|
|
|
- /*******************************************************************************************************/
|
|
|
-
|
|
|
- /*******************************************************************************************************/
|
|
|
- /*****************************************socket基本*****************************************************/
|
|
|
- /*******************************************************************************************************/
|
|
|
- async fn run(&self, params: Vec<String>)
|
|
|
- {
|
|
|
- //订阅信息组装
|
|
|
- let subscription = json!({
|
|
|
- "method": "SUBSCRIBE",
|
|
|
- "params":params,
|
|
|
- "id": 1
|
|
|
- });
|
|
|
- // let parse_fn_arc = Arc::new(Mutex::new(parse_fn)); // Wrap the closure in an Arc<Mutex<_>>
|
|
|
- loop {
|
|
|
- trace!("要连接咯~~!!");
|
|
|
- //币安-登陆流程-rest请求获取k然后拿到 key 拼接地址
|
|
|
- // if self.is_login { //暂时没看到有订阅的频道需要登陆 所以暂时不做
|
|
|
- // }
|
|
|
-
|
|
|
- let request_url = Url::parse(self.request_url.as_str()).unwrap();
|
|
|
- //1. 判断是否需要代理,根据代理地址是否存来选择
|
|
|
- if self.ip.len() > 0 {
|
|
|
- let ip_array: Vec<&str> = self.ip.split(".").collect();
|
|
|
- let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
|
|
|
- ip_array[0].parse().unwrap(),
|
|
|
- ip_array[1].parse().unwrap(),
|
|
|
- ip_array[2].parse().unwrap(),
|
|
|
- ip_array[3].parse().unwrap())
|
|
|
- ), self.port);
|
|
|
- let websocket_config = Some(WebSocketConfig {
|
|
|
- max_send_queue: Some(16),
|
|
|
- max_message_size: Some(16 * 1024 * 1024),
|
|
|
- max_frame_size: Some(16 * 1024 * 1024),
|
|
|
- accept_unmasked_frames: false,
|
|
|
- });
|
|
|
- let max_redirects = 5;
|
|
|
- match connect_with_proxy(request_url.clone(), proxy_address, websocket_config, max_redirects) {
|
|
|
- Ok(ws) => {
|
|
|
- self.proxy_subscription(ws.0, &subscription).await;
|
|
|
- }
|
|
|
- Err(err) => {
|
|
|
- trace!("Can't connect(无法连接): {}", err);
|
|
|
- }
|
|
|
- };
|
|
|
- } else {
|
|
|
- match connect(request_url.clone()) {
|
|
|
- Ok(ws) => {
|
|
|
- self.subscription(ws.0, &subscription).await;
|
|
|
- }
|
|
|
- Err(err) => {
|
|
|
- // 连接失败时执行的操作
|
|
|
- trace!("Can't connect(无法连接): {}", err);
|
|
|
- // 返回一个默认的 WebSocket 对象或其他适当的值
|
|
|
- // 或者根据需要触发 panic 或返回错误信息
|
|
|
- }
|
|
|
- };
|
|
|
- }
|
|
|
- trace!("退出来咯")
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //代理
|
|
|
- async fn proxy_subscription(&self, mut web_socket: WebSocket<ProxyAutoStream>,
|
|
|
- subscription: &serde_json::Value,
|
|
|
- )
|
|
|
- {
|
|
|
- let label = self.label.clone();
|
|
|
- /*****消息溜***/
|
|
|
- let mut stdout = io::stdout();
|
|
|
- // let mut stderr = io::stderr();
|
|
|
- // /*****是否需要登陆****/
|
|
|
- // if self.is_login {
|
|
|
- // trace!("----需要登陆");
|
|
|
- // } else {
|
|
|
- // trace!("----no longin(不需要登陆)");
|
|
|
- // }
|
|
|
- /******订阅信息********/
|
|
|
- let sub_json = subscription.clone();
|
|
|
- trace!("--订阅内容:{:?}", sub_json.to_string());
|
|
|
- let sub_json_str = sub_json.to_string();
|
|
|
- web_socket.write_message(Message::Text(sub_json_str))
|
|
|
- .unwrap();
|
|
|
- loop {
|
|
|
- let msg = web_socket.read_message();
|
|
|
- match msg {
|
|
|
- Ok(Message::Text(text)) => {
|
|
|
- let res_data = Self::ok_text(label.to_string(), text);
|
|
|
-
|
|
|
- let sender = self.sender.clone();
|
|
|
- tokio::spawn(async move {
|
|
|
- sender.send(res_data).await.unwrap();
|
|
|
- });
|
|
|
- tokio::spawn(async move {});
|
|
|
- }
|
|
|
- Ok(Message::Ping(s)) => {
|
|
|
- writeln!(stdout, "Ping-响应--{:?}", String::from_utf8(s.clone())).unwrap();
|
|
|
- let _ = web_socket.write_message(Message::Pong(Vec::from("pong")));
|
|
|
- writeln!(stdout, "回应-pong---{:?}", String::from_utf8(s.clone())).unwrap();
|
|
|
- }
|
|
|
- Ok(Message::Pong(s)) => {
|
|
|
- // trace!("Pong-响应--{:?}", String::from_utf8(s));
|
|
|
- writeln!(stdout, "Pong-响应--{:?}", String::from_utf8(s.clone())).unwrap();
|
|
|
- }
|
|
|
- Ok(Message::Close(_)) => {
|
|
|
- // trace!("socket 关闭: ");
|
|
|
- writeln!(stdout, "Close-响应").unwrap();
|
|
|
- }
|
|
|
- Err(error) => {
|
|
|
- // trace!("Error receiving message: {}", error);
|
|
|
- writeln!(stdout, "Err-响应{}", error).unwrap();
|
|
|
- break;
|
|
|
- }
|
|
|
- _ => {}
|
|
|
- }
|
|
|
- }
|
|
|
- web_socket.close(None).unwrap();
|
|
|
- }
|
|
|
- //非代理
|
|
|
- async fn subscription(&self, mut web_socket: WebSocket<AutoStream>,
|
|
|
- subscription: &serde_json::Value,
|
|
|
- )
|
|
|
- {
|
|
|
- let label = self.label.clone();
|
|
|
- /*****消息溜***/
|
|
|
- // let stdout = io::stdout();
|
|
|
- // let mut stderr = io::stderr();
|
|
|
- /*****是否需要登陆****/
|
|
|
- // if self.is_login {
|
|
|
- // trace!("----需要登陆");
|
|
|
- // // let login_json_str = self.log_in_to_str();
|
|
|
- // // web_socket.write_message(Message::Text(login_json_str)).unwrap();
|
|
|
- // // thread::sleep(Duration::from_secs(1));
|
|
|
- // } else {
|
|
|
- // trace!("----no longin(不需要登陆)");
|
|
|
- // }
|
|
|
- /******订阅信息********/
|
|
|
- let sub_json = subscription.clone();
|
|
|
- trace!("--订阅内容:{:?}", sub_json);
|
|
|
- let sub_json_str = sub_json.to_string();
|
|
|
- web_socket.write_message(Message::Text(sub_json_str))
|
|
|
- .unwrap();
|
|
|
- loop {
|
|
|
- let msg = web_socket.read_message();
|
|
|
- match msg {
|
|
|
- Ok(Message::Text(text)) => {
|
|
|
- let res_data = Self::ok_text(label.to_string(), text);
|
|
|
- self.sender.send(res_data).await.unwrap();
|
|
|
- // writeln!(stdout, "Pong-响应--{:?}", res_data).unwrap();
|
|
|
- // let parse_fn_clone = Arc::clone(parse_fn); // Clone the Arc for each iteration
|
|
|
- // tokio::spawn(async move {
|
|
|
- // let parse_fn_lock = parse_fn_clone.lock().await;
|
|
|
- // parse_fn_lock(res_data).await;
|
|
|
- // });
|
|
|
- }
|
|
|
- Ok(Message::Ping(s)) => {
|
|
|
- trace!("Ping-响应--{:?}", String::from_utf8(s));
|
|
|
- }
|
|
|
- Ok(Message::Pong(s)) => {
|
|
|
- trace!("Pong-响应--{:?}", String::from_utf8(s));
|
|
|
- }
|
|
|
- Ok(Message::Close(_)) => {
|
|
|
- trace!("socket 关闭: ");
|
|
|
- }
|
|
|
- Err(error) => {
|
|
|
- trace!("Error receiving message: {}", error);
|
|
|
- break;
|
|
|
- }
|
|
|
- _ => {}
|
|
|
- }
|
|
|
- }
|
|
|
- web_socket.close(None).unwrap();
|
|
|
- }
|
|
|
-
|
|
|
- //数据解析
|
|
|
- pub fn ok_text(label: String, text: String) -> ResponseData
|
|
|
- {
|
|
|
- let mut res_data = ResponseData::new(label.to_string(), "200".to_string(), "success".to_string(), "".to_string());
|
|
|
- let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
|
|
|
- if json_value.get("result").is_some() {
|
|
|
- //订阅反馈
|
|
|
- } else {
|
|
|
- // trace!("????-----:{}", json_value);
|
|
|
- //数据解析
|
|
|
- if let Some(json_obj) = json_value.as_object() {
|
|
|
- if Self::channel_agg_trade(json_obj.clone()) {
|
|
|
- res_data.channel = "aggTrade".to_string();
|
|
|
- } else if Self::channel_book_ticker(json_obj.clone()) {
|
|
|
- res_data.channel = "bookTicker".to_string();
|
|
|
- } else if Self::channel_depth(json_obj.clone()) {
|
|
|
- res_data.channel = "depth".to_string();
|
|
|
- } else {
|
|
|
- res_data.channel = "未解析频道".to_string();
|
|
|
- }
|
|
|
- res_data.data = text
|
|
|
- }
|
|
|
- }
|
|
|
- res_data
|
|
|
- }
|
|
|
-
|
|
|
- pub fn channel_agg_trade(json_obj: serde_json::Map<String, Value>) -> bool {
|
|
|
- let mut is_agg_trade = false;
|
|
|
- if let Some(name) = json_obj.get("e") {
|
|
|
- if let Some(name_str) = name.as_str() {
|
|
|
- if name_str == "aggTrade" {
|
|
|
- is_agg_trade = true;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- is_agg_trade
|
|
|
- }
|
|
|
- pub fn channel_book_ticker(json_obj: serde_json::Map<String, Value>) -> bool {
|
|
|
- let mut is_book_ticker = false;
|
|
|
- if json_obj.contains_key("A") &&
|
|
|
- json_obj.contains_key("B") &&
|
|
|
- json_obj.contains_key("a") &&
|
|
|
- json_obj.contains_key("b") &&
|
|
|
- json_obj.contains_key("s") &&
|
|
|
- json_obj.contains_key("u")
|
|
|
- {
|
|
|
- is_book_ticker = true;
|
|
|
- }
|
|
|
- is_book_ticker
|
|
|
- }
|
|
|
- pub fn channel_depth(json_obj: serde_json::Map<String, Value>) -> bool {
|
|
|
- let mut is_book_ticker = false;
|
|
|
- if json_obj.contains_key("asks") &&
|
|
|
- json_obj.contains_key("bids") &&
|
|
|
- json_obj.contains_key("lastUpdateId")
|
|
|
- {
|
|
|
- is_book_ticker = true;
|
|
|
- }
|
|
|
- is_book_ticker
|
|
|
- }
|
|
|
-}
|