use std::sync::Arc; use std::sync::atomic::AtomicBool; use chrono::{Utc}; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; use serde_json::{json, Value}; use tracing::{error, info, trace}; use ring::hmac; use tokio::sync::Mutex; use tokio_tungstenite::tungstenite::{Error, Message}; use crate::response_base::ResponseData; use crate::socket_tool::{AbstractWsMode, HeartbeatType}; pub enum BitgetSwapWsType { Public, Private, } #[derive(Clone)] pub enum BitgetSwapSubscribeType { PuTrade, PuBooks1, PrAccount, PrPosition, PrOrders, } #[derive(Clone)] #[allow(dead_code)] pub struct BitgetSwapLogin { pub api_key: String, pub secret_key: String, pub passphrase_key: String, } #[derive(Clone)] pub struct BitgetSwapWs { label: String, // 类型 address_url: String, // 地址 login_param: Option, // 账号 symbol_s: Vec, // 币对 subscribe_types: Vec, // 订阅 heartbeat_time: u64, // 心跳间隔 } impl BitgetSwapWs { pub fn new(is_colo: bool, login_param: Option, ws_type: BitgetSwapWsType) -> BitgetSwapWs { return BitgetSwapWs::new_label("default-BitgetSwapWs".to_string(), is_colo, login_param, ws_type); } pub fn new_label(label: String, is_colo: bool, login_param: Option, ws_type: BitgetSwapWsType) -> BitgetSwapWs { let address_url = match ws_type { BitgetSwapWsType::Public => { "wss://ws.bitget.com/v2/ws/public".to_string() } BitgetSwapWsType::Private => { "wss://ws.bitget.com/v2/ws/private".to_string() } }; if is_colo { info!("开启高速(未配置,走普通:{})通道",address_url); } else { info!("走普通通道:{}",address_url); } BitgetSwapWs { label, address_url, login_param, symbol_s: vec![], subscribe_types: vec![], heartbeat_time: 1000 * 10 } } // 添加订阅信息 pub fn set_subscribe(&mut self, subscribe_types: Vec) { self.subscribe_types.extend(subscribe_types); } // 手动添加币对 pub fn set_symbols(&mut self, mut b_array: Vec) { for symbol in b_array.iter_mut() { // 小写 *symbol = symbol.to_uppercase(); // 字符串替换 *symbol = symbol.replace("-", ""); *symbol = symbol.replace("_", ""); } self.symbol_s = b_array; } //频道是否需要登录 fn contains_pr(&self) -> bool { for t in self.subscribe_types.clone() { if match t { BitgetSwapSubscribeType::PuTrade => false, BitgetSwapSubscribeType::PuBooks1 => false, BitgetSwapSubscribeType::PrAccount => true, BitgetSwapSubscribeType::PrOrders => true, BitgetSwapSubscribeType::PrPosition => true } { return true; } } false } /*******************************************************************************************************/ /*****************************************工具函数*******************************************************/ /*******************************************************************************************************/ // 枚举解析成json pub fn enum_to_json(symbol: String, subscribe_type: BitgetSwapSubscribeType) -> Value { match subscribe_type { // 公共订阅 BitgetSwapSubscribeType::PuTrade => { json!({ "instType": "USDT-FUTURES", "channel": "trade", "instId": symbol, }) }, BitgetSwapSubscribeType::PuBooks1 => { json!({ "instType": "USDT-FUTURES", "channel": "books1", "instId": symbol, }) }, // 私有订阅 BitgetSwapSubscribeType::PrAccount => { json!({ "instType": "USDT-FUTURES", "channel": "account", "coin": "default", }) }, BitgetSwapSubscribeType::PrPosition => { json!({ "instType": "USDT-FUTURES", "channel": "positions", "instId": "default" }) }, BitgetSwapSubscribeType::PrOrders => { json!({ "instType": "USDT-FUTURES", "channel": "orders", "instId": "default" }) }, } } // 订阅信息生成 pub fn get_subscription(&self) -> String { let mut params = vec![]; for symbol in &self.symbol_s { for subscribe_type in &self.subscribe_types { let ty_str = Self::enum_to_json(symbol.clone(), subscribe_type.clone()); params.push(ty_str); } } let str = json!({ "op": "subscribe", "args": params }); str.to_string() } /*******************************************************************************************************/ /*****************************************socket基本*****************************************************/ /*******************************************************************************************************/ pub async fn ws_connect_async(&mut self, is_shutdown_arc: Arc, handle_function: F, write_tx_am: &Arc>>, write_to_socket_rx: UnboundedReceiver) -> Result<(), Error> where F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync, Future: std::future::Future + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 () { let login_is = self.contains_pr(); let address_url = self.address_url.clone(); let label = self.label.clone(); let heartbeat_time = self.heartbeat_time.clone(); // 设置订阅 let subscription = self.get_subscription(); let subscribe_array = vec![subscription.to_string()]; info!(?subscribe_array); //心跳-- 方法内部线程启动 let write_tx_clone1 = Arc::clone(write_tx_am); tokio::spawn(async move { let ping_str = json!("ping"); AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.as_str().unwrap().to_string()), heartbeat_time).await; }); //链接 let login_param = self.login_param.clone(); let write_tx_clone2 = Arc::clone(write_tx_am); let t2 = tokio::spawn(async move { let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx)); loop { info!("bitget_usdt_swap socket 连接中……"); // 登录相关 if login_is { let login_param_c = login_param.clone().unwrap(); let timestamp = Utc::now().timestamp().to_string(); // 时间戳 + 请求类型+ 请求参数字符串 let message = format!("{}GET{}", timestamp, "/user/verify"); let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, login_param_c.secret_key.as_bytes()); let result = hmac::sign(&hmac_key, &message.as_bytes()); let sign = base64::encode(result); let login_json = json!({ "op": "login", "args": [{ "apiKey": login_param_c.api_key, "passphrase": login_param_c.passphrase_key, "timestamp": timestamp, "sign": sign }] }); let login_str = login_json.to_string(); info!("发起ws登录: {}", login_str); let write_tx_c = Arc::clone(&write_tx_clone2); AbstractWsMode::send_subscribe(write_tx_c, Message::Text(login_str)).await; } // ws层重连 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(), login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(), Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await; error!("bitget_usdt_swap socket 断连,重连中……"); } }); tokio::try_join!(t2).unwrap(); trace!("线程-心跳与链接-结束"); Ok(()) } /*******************************************************************************************************/ /*****************************************数据解析*******************************************************/ /******************************************************************************************************/ // 数据解析-Text pub fn message_text(text: String) -> Option { let response_data = Self::ok_text(text); Option::from(response_data) } // 数据解析-ping pub fn message_ping(_pi: Vec) -> Option { return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null)); } // 数据解析-pong pub fn message_pong(_po: Vec) -> Option { return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null)); } //数据解析-二进制 pub fn message_binary(_po: Vec) -> Option { //二进制WebSocket消息 let message_str = format!("Binary:{:?}", _po); Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null)) } //数据解析 pub fn ok_text(text: String) -> ResponseData { let mut res_data = ResponseData::new("".to_string(), 200, text.clone(), Value::Null); match text.as_str() { "pong" => { res_data.code = -301; res_data.channel = "pong".to_string(); res_data.message = "success".to_string(); }, _ => { let json_value: Value = serde_json::from_str(&text).unwrap(); if json_value.get("event").is_some() && json_value["event"].as_str() == Some("login") { if json_value.get("code").is_some() && json_value["code"] == 0 { res_data.message = "登陆成功".to_string(); } else { res_data.message = format!("登陆失败:({},{})", json_value.get("code").as_ref().unwrap(), json_value.get("msg").as_ref().unwrap()); } res_data.channel = "login".to_string(); res_data.code = -200; res_data.data = json_value; } else if json_value.get("event").is_some() && json_value["event"].as_str() == Some("subscribe") { res_data.code = -201; res_data.data = json_value.clone(); res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap()); res_data.message = "success".to_string(); } else if json_value.get("action").is_some() { res_data.data = json_value["data"].clone(); if res_data.data == "[]" { res_data.code = -1; } else { res_data.code = 200; } res_data.message = "success".to_string(); res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap()); res_data.reach_time = json_value["ts"].as_i64().unwrap() * 1000; } } } res_data } }