use std::io::Read; use std::str::from_utf8; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::time::Duration; use chrono::Utc; use flate2::bufread::GzDecoder; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; use once_cell::sync::Lazy; use ring::hmac; use serde_json::{json, Value}; use tokio::sync::Mutex; use tokio::task; use tokio_tungstenite::tungstenite::{Error, Message}; use tracing::{error, info, trace}; use crate::response_base::ResponseData; use crate::socket_tool::{AbstractWsMode, HeartbeatType}; pub(crate) static LOGIN_DATA: Lazy> = Lazy::new(|| { println!("初始化..."); // 0: 需要登录, 1:是否已经登录 Mutex::new((false, false)) }); pub enum BitMartSwapWsType { Public, Private, } //订阅频道 #[derive(Clone)] pub enum BitMartSwapSubscribeType { // 深度 PuFuturesDepth, // 公开成交 PuFuturesTrades, // K线数据 PuFuturesRecords, // // 深度 // PuFuturesDepth, // // 公开成交 // PuFuturesTrades, // // K线数据 // PuFuturesRecords, // // // 订单 // PrFuturesOrders, // // 仓位 // PrFuturesPositions, // // 余额 // PrFuturesBalances, } //账号信息 #[derive(Clone)] #[allow(dead_code)] pub struct BitMartSwapLogin { pub api_key: String, pub secret: String, pub api_memo: String, } #[derive(Clone)] pub struct BitMartSwapWs { tag: String, // 类型 address_url: String, // 地址 login_param: Option, // 账号 symbol_s: Vec, // 币对 subscribe_types: Vec, // 订阅 heartbeat_time: u64, // 心跳间隔 } impl BitMartSwapWs { /*******************************************************************************************************/ /*****************************************实例化一个对象****************************************************/ /*******************************************************************************************************/ pub fn new(is_colo: bool, login_param: Option, ws_type: BitMartSwapWsType) -> BitMartSwapWs { return Self::new_with_tag("default-BingxSwapWs".to_string(), is_colo, login_param, ws_type); } pub fn new_with_tag(tag: String, _is_colo: bool, login_param: Option, ws_type: BitMartSwapWsType) -> BitMartSwapWs { /*******公共频道-私有频道数据组装*/ let address_url = match ws_type { BitMartSwapWsType::Public => { let url = "wss://openapi-ws.bitmart.com/api?protocol=1.1".to_string(); info!("走普通通道(不支持colo通道):{}", url); url } BitMartSwapWsType::Private => { let url = "wss://openapi-ws.bitmart.com/user?protocol=1.1".to_string(); info!("走普通通道(不支持colo通道):{}", url); url } }; BitMartSwapWs { tag, address_url, login_param, symbol_s: vec![], subscribe_types: vec![], heartbeat_time: 1000 * 5, } } /*******************************************************************************************************/ /*****************************************订阅函数********************************************************/ /*******************************************************************************************************/ //手动添加订阅信息 pub fn set_subscribe(&mut self, subscribe_types: Vec) { self.subscribe_types.extend(subscribe_types); } //手动添加币对 pub fn set_symbols(&mut self, mut b_array: Vec) { for symbol in b_array.iter_mut() { // 大写 *symbol = symbol.to_uppercase(); // 字符串替换 *symbol = symbol.replace("_", ""); } self.symbol_s = b_array; } //频道是否需要登录 fn contains_pr(&self) -> bool { for t in self.subscribe_types.clone() { if match t { BitMartSwapSubscribeType::PuFuturesDepth => false, BitMartSwapSubscribeType::PuFuturesTrades => false, BitMartSwapSubscribeType::PuFuturesRecords => false, } { return true; } } false } /*******************************************************************************************************/ /*****************************************工具函数********************************************************/ /*******************************************************************************************************/ //订阅枚举解析 pub fn enum_to_string(symbol: String, subscribe_type: BitMartSwapSubscribeType, _login_param: Option) -> String { // let access_key; // let secret_key; // match login_param { // None => { // access_key = "".to_string(); // secret_key = "".to_string(); // } // Some(param) => { // access_key = param.api_key.clone(); // secret_key = param.secret.clone(); // } // } // let cid = ""; match subscribe_type { BitMartSwapSubscribeType::PuFuturesDepth => { format!("futures/depth5:{}", symbol.to_uppercase()) } BitMartSwapSubscribeType::PuFuturesTrades => { format!("futures/trade:{}", symbol.to_uppercase()) } BitMartSwapSubscribeType::PuFuturesRecords => { format!("futures/klineBin1m:{}", symbol.to_uppercase()) } } } //订阅信息生成 pub fn get_subscription(&self) -> Value { let mut args = vec![]; // 只获取第一个 for symbol in &self.symbol_s { for subscribe_type in &self.subscribe_types { let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone(), self.login_param.clone(), ); args.push(ty_str); } } json!({ "action":"subscribe", "args":args }) } /*******************************************************************************************************/ /*****************************************socket基本*****************************************************/ /*******************************************************************************************************/ //链接 pub async fn ws_connect_async(&mut self, is_shutdown_arc: Arc, handle_function: F, write_tx_am: &Arc>>, write_to_socket_rx: UnboundedReceiver) -> Result<(), Error> where F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync, Future: std::future::Future + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 () { let login_is = self.contains_pr(); let login_param = self.login_param.clone(); let subscription = self.get_subscription(); let address_url = self.address_url.clone(); let label = self.tag.clone(); let heartbeat_time = self.heartbeat_time.clone(); //心跳-- 方法内部线程启动 let write_tx_clone1 = Arc::clone(write_tx_am); let write_tx_clone2 = Arc::clone(write_tx_am); tokio::spawn(async move { trace!("线程-异步心跳-开始"); AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await; trace!("线程-异步心跳-结束"); }); //设置订阅 let mut subscribe_array = vec![]; subscribe_array.push(subscription.to_string()); //链接 let t2 = tokio::spawn(async move { let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx)); info!("启动连接"); loop { info!("BitMart_usdt_swap socket 连接中……"); // 需要登录 if login_is { let mut login_data = LOGIN_DATA.lock().await; let login_param_real = login_param.clone().unwrap(); login_data.0 = true; let timestamp = Utc::now().timestamp_millis().to_string(); let api_key = login_param_real.api_key.clone(); let secret_key = login_param_real.secret.clone(); let api_memo = login_param_real.api_memo.clone(); // let timestamp = "1589267764859".to_string(); // let api_key = "80618e45710812162b04892c7ee5ead4a3cc3e56".to_string(); // let secret_key = "6c6c98544461bbe71db2bca4c6d7fd0021e0ba9efc215f9c6ad41852df9d9df9".to_string(); // let api_memo = "test001".to_string(); let sign = { let message = format!("{}#{}#bitmart.WebSocket", timestamp.clone(), api_memo); trace!("组装数据:\n{}", message); let signed_key = hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_ref()); let sign = hex::encode(hmac::sign(&signed_key, message.as_bytes()).as_ref()); sign }; trace!("参考sign-3ceeb7e1b8cb165a975e28a2e2dfaca4d30b358873c0351c1a071d8c83314556",); trace!("自己的sign-{}",sign.clone()); let mut args = vec![]; args.push(api_key.clone()); args.push(timestamp.clone()); args.push(sign.clone()); args.push(String::from("web")); // {"action":"access","args":["","","",""]} let login_param = json!({ "action": "access", "args": [ api_key, timestamp.as_str(),sign.as_str(),"web" ] }); let login_str = login_param.to_string(); info!("发起ws登录: {}", login_str); let write_tx_c = Arc::clone(&write_tx_clone2); AbstractWsMode::send_subscribe(write_tx_c, Message::Text(login_str)).await; } AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(), login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(), Self::message_text_sync, Self::message_ping, Self::message_pong, Self::message_binary_sync, ).await; let mut login_data = LOGIN_DATA.lock().await; // 断联后 设置为没有登录 login_data.1 = false; info!("BitMart_usdt_swap socket 断连,1s以后重连……"); error!("BitMart_usdt_swap socket 断连,1s以后重连……"); tokio::time::sleep(Duration::from_secs(1)).await; } }); tokio::try_join!(t2).unwrap(); trace!("线程-心跳与链接-结束"); Ok(()) } /*******************************************************************************************************/ /*****************************************数据解析*****************************************************/ /*******************************************************************************************************/ //数据解析-Text pub async fn message_text(text: String) -> Option { let response_data = Self::ok_text(text).await; Option::from(response_data) } pub fn message_text_sync(text: String) -> Option { // 使用 tokio::task::block_in_place 来等待异步函数的结果 task::block_in_place(|| { tokio::runtime::Handle::current().block_on(Self::message_text(text)) }) } //数据解析-ping pub fn message_ping(_pi: Vec) -> Option { return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null)); } //数据解析-pong pub fn message_pong(_po: Vec) -> Option { return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null)); } //数据解析-二进制 pub async fn message_binary(binary: Vec) -> Option { //二进制WebSocket消息 let message_str = Self::parse_zip_data(binary); let response_data = Self::ok_text(message_str).await; Option::from(response_data) } pub fn message_binary_sync(binary: Vec) -> Option { // 使用 tokio::task::block_in_place 来等待异步函数的结果 task::block_in_place(|| { tokio::runtime::Handle::current().block_on(Self::message_binary(binary)) }) } //数据解析 pub async fn ok_text(text: String) -> ResponseData { // info!("原始数据:{}", text); let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null); let json_value: Value = serde_json::from_str(&text).unwrap(); // {"action":"access","success":true} let action = json_value["action"].as_str(); match action { None => {} Some(r) => { match r { "access" => { /*登录响应*/ let success = json_value["success"].as_bool(); match success { None => {} Some(s) => { if s { res_data.code = -200; res_data.message = "登录成功".to_string(); } else { res_data.code = 400; res_data.message = format!("登录失败:{}", json_value["error"].as_str().unwrap()); } return res_data; } } } "subscribe" => { /*订阅响应*/ let success = json_value["success"].as_bool(); match success { None => {} Some(s) => { if s { res_data.code = -201; res_data.message = format!("订阅成功:{}", json_value["request"].clone().to_string()); } else { res_data.code = 400; res_data.message = format!("订阅失败:{}", json_value["error"].as_str().unwrap()); } return res_data; } } } "unsubscribe" => { /*取消订阅响应*/ let success = json_value["success"].as_bool(); match success { None => {} Some(s) => { if s { res_data.code = -201; res_data.message = format!("取消-订阅成功:{}", json_value["request"].clone().to_string()); } else { res_data.code = 400; res_data.message = format!("取消-订阅失败:{}", json_value["error"].as_str().unwrap()); } return res_data; } } } _ => {} } } } let group = json_value["group"].as_str(); match group { Some(ch) => { res_data.code = 200; res_data.data = json_value["data"].clone(); //订阅数据 甄别 if ch.contains("futures/depth") { res_data.channel = "futures.order_book".to_string(); } else if ch.contains("futures/trade") { res_data.channel = "futures.trades".to_string(); } else if ch.contains("futures/klineBin") { res_data.channel = "futures.candlesticks".to_string(); } else { res_data.channel = "未知推送数据".to_string(); } return res_data; // match data { // Some(_) => { // // } // None => { // res_data.channel = format!("{}", ch); // res_data.code = 400; // res_data.data = data.clone(); // return res_data; // } // } } None => {} } res_data.code = 400; res_data.message = format!("未知响应内容"); res_data.data = text.parse().unwrap(); trace!("--------------------------------"); res_data } fn parse_zip_data(p0: Vec) -> String { // 创建一个GzDecoder的实例,将压缩数据作为输入 let mut decoder = GzDecoder::new(&p0[..]); // 创建一个缓冲区来存放解压缩后的数据 let mut decompressed_data = Vec::new(); // 读取解压缩的数据到缓冲区中 decoder.read_to_end(&mut decompressed_data).expect("解压缩失败"); let result = from_utf8(&decompressed_data) .expect("解压缩后的数据不是有效的UTF-8"); // info!("解压缩数据 {:?}", result); result.to_string() } }