use std::io::Read; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::time::Duration; use flate2::read::GzDecoder; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; use serde_json::json; use serde_json::Value; use tokio::sync::Mutex; use tokio_tungstenite::tungstenite::{Error, Message}; use tracing::{error, info, trace}; use crate::response_base::ResponseData; use crate::socket_tool::AbstractWsMode; //类型 pub enum BingxSwapWsType { PublicAndPrivate, } #[derive(Debug)] #[derive(Clone)] pub struct BingxSwapWsParam { pub token: String, pub ws_url: String, pub ws_ping_interval: i64, pub ws_ping_timeout: i64, pub is_ok_subscribe: bool, } //订阅频道 #[derive(Clone)] pub enum BingxSwapSubscribeType { // 深度 PuFuturesDepth, // 公开成交 PuFuturesTrades, // K线数据 PuFuturesRecords, } //账号信息 #[derive(Clone, Debug)] pub struct BingxSwapLogin { pub access_key: String, pub secret_key: String, pub pass_key: String, } #[derive(Clone)] #[allow(dead_code)] pub struct BingxSwapWs { //类型 tag: String, //地址 address_url: String, //账号 login_param: Option, //登录数据 ws_param: BingxSwapWsParam, //币对 symbol_s: Vec, //订阅 subscribe_types: Vec, //心跳间隔 heartbeat_time: u64, } impl BingxSwapWs { /*******************************************************************************************************/ /*****************************************获取一个对象****************************************************/ /*******************************************************************************************************/ pub fn new(is_colo: bool, login_param: Option, ws_type: BingxSwapWsType) -> BingxSwapWs { return Self::new_with_tag("default-BingxSwapWs".to_string(), is_colo, login_param, ws_type); } pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option, ws_type: BingxSwapWsType) -> BingxSwapWs { /*******公共频道-私有频道数据组装*/ let address_url = match ws_type { BingxSwapWsType::PublicAndPrivate => { let url = "wss://open-api-swap.bingx.com/swap-market".to_string(); info!("走普通通道(不支持colo通道):{}", url); url } }; /*******公共频道-私有频道数据组装*/ let ws_param = BingxSwapWsParam { token: "".to_string(), ws_url: "".to_string(), ws_ping_interval: 0, ws_ping_timeout: 0, is_ok_subscribe: false, }; if is_colo { info!("开启高速(未配置,走普通:{})通道",address_url); } else { info!("走普通通道:{}",address_url); } BingxSwapWs { tag, address_url, login_param, ws_param, symbol_s: vec![], subscribe_types: vec![], heartbeat_time: 1000 * 18, } } /*******************************************************************************************************/ /*****************************************订阅函数********************************************************/ /*******************************************************************************************************/ //手动添加订阅信息 pub fn set_subscribe(&mut self, subscribe_types: Vec) { self.subscribe_types.extend(subscribe_types); } //手动添加币对 pub fn set_symbols(&mut self, mut b_array: Vec) { for symbol in b_array.iter_mut() { // 大写 *symbol = symbol.to_uppercase(); // 字符串替换 *symbol = symbol.replace("_", "-"); } self.symbol_s = b_array; } fn contains_pr(&self) -> bool { for t in self.subscribe_types.clone() { if match t { BingxSwapSubscribeType::PuFuturesTrades => false, BingxSwapSubscribeType::PuFuturesRecords => false, BingxSwapSubscribeType::PuFuturesDepth => false, } { return true; } } false } /*******************************************************************************************************/ /*****************************************工具函数********************************************************/ /*******************************************************************************************************/ //订阅枚举解析 pub fn enum_to_string(symbol: String, subscribe_type: BingxSwapSubscribeType) -> String { match subscribe_type { BingxSwapSubscribeType::PuFuturesDepth => { format!("{}@depth5@100ms", symbol) } BingxSwapSubscribeType::PuFuturesRecords => { format!("{}@kline_1m", symbol) } BingxSwapSubscribeType::PuFuturesTrades => { format!("{}@trade", symbol) } } } //订阅信息生成 pub fn get_subscription(&self) -> Vec { let mut array = vec![]; for symbol in &self.symbol_s { for subscribe_type in &self.subscribe_types { let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone()); let str = json!({ "id": "id1", "reqType": "sub", "dataType": ty_str.to_string() }); array.push(str.to_string()); } } array } /*******************************************************************************************************/ /*****************************************socket基本*****************************************************/ /*******************************************************************************************************/ //链接 pub async fn ws_connect_async(&mut self, is_shutdown_arc: Arc, handle_function: F, _write_tx_am: &Arc>>, write_to_socket_rx: UnboundedReceiver) -> Result<(), Error> where F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync, Future: std::future::Future + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 () { let login_is = self.contains_pr(); let subscription = self.get_subscription(); let address_url = self.address_url.clone(); let tag = self.tag.clone(); // let heartbeat_time = self.ws_param.ws_ping_interval.clone(); //心跳-- 方法内部线程启动 // let write_tx_clone1 = write_tx_am.clone(); // tokio::spawn(async move { // trace!("线程-异步心跳-开始"); // AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time as u64).await; // trace!("线程-异步心跳-结束"); // }); //设置订阅 let subscribe_array = subscription.clone(); if login_is { //登录相关 } //1 链接 let t2 = tokio::spawn(async move { let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx)); loop { info!("Bingx_usdt_swap socket 连接中……"); AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(), false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(), Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await; error!("Bingx_usdt_swap socket 断连,1s以后重连……"); tokio::time::sleep(Duration::from_secs(1)).await; } }); tokio::try_join!(t2).unwrap(); trace!("线程-心跳与链接-结束"); Ok(()) } /*******************************************************************************************************/ /*****************************************数据解析*****************************************************/ /*******************************************************************************************************/ //数据解析-Text pub fn message_text(text: String) -> Option { let response_data = Self::ok_text(text); Option::from(response_data) } //数据解析-ping pub fn message_ping(_pi: Vec) -> Option { return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null)); } //数据解析-pong pub fn message_pong(_po: Vec) -> Option { return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null)); } //数据解析-二进制 pub fn message_binary(po: Vec) -> Option { //二进制WebSocket消息 // let message_str = format!("Binary:{:?}", _po); // Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null)) // let result = String::from_utf8(bytes); // let result = String::from_utf8(po); let mut gz_decoder = GzDecoder::new(&po[..]); let mut decompressed_data = Vec::new(); // 尝试解压数据 if let Ok(_) = gz_decoder.read_to_end(&mut decompressed_data) { // 将解压后的字节向量转换为 UTF-8 字符串 match String::from_utf8(decompressed_data) { Ok(text) => { let response_data = Self::ok_text(text); return Option::from(response_data); } Err(_) => { return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null)); } } } else { return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null)); } } //数据解析 pub fn ok_text(text: String) -> ResponseData { // trace!("原始数据:{:?}",text); match text.as_str() { "Ping" => { return ResponseData::new("".to_string(), -300, "success".to_string(), Value::String(String::from("Pong"))); } _ => {} } let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null); let json_value: serde_json::Value = serde_json::from_str(&text).unwrap(); // { "id": "id1", "code": 0, "msg": "" } if json_value["id"].as_str() == Option::from("id1") { //订阅 if json_value["code"].as_i64() == Option::from(0) { res_data.code = -201; res_data.message = "订阅成功".to_string(); } else { res_data.code = 400; res_data.message = "订阅失败".to_string(); } } else if json_value["code"].as_i64() == Option::from(0) { res_data.code = 200; res_data.data = json_value.clone(); //订阅数据 甄别 let data_type = json_value["dataType"].as_str().unwrap(); if data_type.contains("@depth") { res_data.channel = "futures.order_book".to_string(); } else if data_type.contains("@trade") { res_data.channel = "futures.trades".to_string(); } else if data_type.contains("@kline_1m") { res_data.channel = "futures.candlesticks".to_string(); } else { res_data.channel = "未知推送数据".to_string(); } } else { res_data.code = -1; res_data.message = "未知解析".to_string(); } res_data } }