use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::time::Duration; use chrono::Utc; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; use hex; use hmac::{Hmac, Mac, NewMac}; use serde_json::{json, Value}; use sha2::Sha512; use tokio::sync::Mutex; use tokio_tungstenite::tungstenite::{Error, Message}; use tracing::{error, info, trace}; use crate::response_base::ResponseData; use crate::socket_tool::{AbstractWsMode, HeartbeatType}; //类型 pub enum GateSwapWsType { PublicAndPrivate(String), } //订阅频道 #[derive(Clone)] pub enum GateSwapSubscribeType { PuFuturesOrderBook, PuFuturesCandlesticks, PuFuturesTrades, PuFuturesBookTicker, PrFuturesOrders(String), PrFuturesPositions(String), PrFuturesBalances(String), } //账号信息 #[derive(Clone)] #[allow(dead_code)] pub struct GateSwapLogin { pub api_key: String, pub secret: String, } #[derive(Clone)] pub struct GateSwapWs { //类型 label: String, //地址 address_url: String, //账号信息 login_param: Option, //币对 symbol_s: Vec, //订阅 subscribe_types: Vec, //心跳间隔 heartbeat_time: u64, } impl GateSwapWs { /*******************************************************************************************************/ /*****************************************获取一个对象****************************************************/ /*******************************************************************************************************/ pub fn new(is_colo: bool, login_param: Option, ws_type: GateSwapWsType) -> GateSwapWs { return GateSwapWs::new_label("default-GateSwapWs".to_string(), is_colo, login_param, ws_type); } pub fn new_label(label: String, is_colo: bool, login_param: Option, ws_type: GateSwapWsType) -> GateSwapWs { /*******公共频道-私有频道数据组装*/ let address_url = match ws_type { GateSwapWsType::PublicAndPrivate(name) => { if is_colo { let url = format!("wss://fxws-private.gateapi.io/v4/ws/{}", name.to_string()); info!("开启高速通道:{:?}",url); url } else { let url = format!("wss://fx-ws.gateio.ws/v4/ws/{}", name.to_string()); info!("走普通通道:{}",url); url } } }; GateSwapWs { label, address_url, login_param, symbol_s: vec![], subscribe_types: vec![], heartbeat_time: 1000 * 10, } } /*******************************************************************************************************/ /*****************************************订阅函数********************************************************/ /*******************************************************************************************************/ //手动添加订阅信息 pub fn set_subscribe(&mut self, subscribe_types: Vec) { self.subscribe_types.extend(subscribe_types); } //手动添加币对 pub fn set_symbols(&mut self, mut b_array: Vec) { for symbol in b_array.iter_mut() { // 大写 *symbol = symbol.to_uppercase(); // 字符串替换 *symbol = symbol.replace("-", "_"); } self.symbol_s = b_array; } //频道是否需要登录 fn contains_pr(&self) -> bool { for t in self.subscribe_types.clone() { if match t { GateSwapSubscribeType::PuFuturesOrderBook => false, GateSwapSubscribeType::PuFuturesCandlesticks => false, GateSwapSubscribeType::PuFuturesTrades => false, GateSwapSubscribeType::PuFuturesBookTicker => false, GateSwapSubscribeType::PrFuturesOrders(_) => true, GateSwapSubscribeType::PrFuturesPositions(_) => true, GateSwapSubscribeType::PrFuturesBalances(_) => true, } { return true; } } false } /*******************************************************************************************************/ /*****************************************工具函数********************************************************/ /*******************************************************************************************************/ //订阅枚举解析 pub fn enum_to_string(symbol: String, subscribe_type: GateSwapSubscribeType, login_param: Option) -> Value { let time = chrono::Utc::now().timestamp(); let mut access_key = "".to_string(); let mut secret_key = "".to_string(); match login_param { None => {} Some(param) => { access_key = param.api_key.clone(); secret_key = param.secret.clone(); } } match subscribe_type { GateSwapSubscribeType::PuFuturesOrderBook => { json!({ "time": time, "channel": "futures.order_book", "event": "subscribe", "payload": [symbol, "20", "0"] }) } GateSwapSubscribeType::PuFuturesBookTicker => { json!({ "time": time, "channel": "futures.book_ticker", "event": "subscribe", "payload": [symbol] }) } GateSwapSubscribeType::PuFuturesCandlesticks => { json!({ "time": time, "channel": "futures.candlesticks", "event": "subscribe", "payload": ["1m", symbol] }) } GateSwapSubscribeType::PrFuturesOrders(user_id) => { json!({ "time": time, "channel": "futures.orders", "event": "subscribe", "payload": [user_id, symbol], "auth": { "method": "api_key", "KEY": access_key, "SIGN":Self::sign(secret_key.to_string(), "futures.orders".to_string(), "subscribe".to_string(), time.to_string()) } }) } GateSwapSubscribeType::PrFuturesPositions(user_id) => { json!({ "time": time, "channel": "futures.positions", "event": "subscribe", "payload": [user_id, symbol], "auth": { "method": "api_key", "KEY": access_key, "SIGN":Self::sign(secret_key.to_string(), "futures.positions".to_string(), "subscribe".to_string(), time.to_string()) } }) } GateSwapSubscribeType::PrFuturesBalances(user_id) => { json!({ "time": time, "channel": "futures.balances", "event": "subscribe", "payload": [user_id], "auth": { "method": "api_key", "KEY": access_key, "SIGN":Self::sign(secret_key.to_string(), "futures.balances".to_string(), "subscribe".to_string(), time.to_string()) } }) } GateSwapSubscribeType::PuFuturesTrades => { json!({ "time": time, "channel": "futures.trades", "event": "subscribe", "payload": [symbol] }) } } } //订阅信息生成 pub fn get_subscription(&self) -> Vec { let mut args = vec![]; for symbol in &self.symbol_s { for subscribe_type in &self.subscribe_types { let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone(), self.login_param.clone(), ); args.push(ty_str); } } args } //生成签名 fn sign(secret_key: String, channel: String, event: String, time: String) -> String { let message = format!("channel={}&event={}&time={}", channel, event, time); let mut mac = Hmac::::new_varkey(secret_key.as_bytes()).expect("Failed to create HMAC"); mac.update(message.as_bytes()); let result = mac.finalize().into_bytes(); let sign = hex::encode(result); sign } /*******************************************************************************************************/ /*****************************************socket基本*****************************************************/ /*******************************************************************************************************/ //链接 pub async fn ws_connect_async(&mut self, is_shutdown_arc: Arc, handle_function: F, write_tx_am: &Arc>>, write_to_socket_rx: UnboundedReceiver) -> Result<(), Error> where F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync, Future: std::future::Future + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 () { let login_is = self.contains_pr(); let subscription = self.get_subscription(); let address_url = self.address_url.clone(); let label = self.label.clone(); let heartbeat_time = self.heartbeat_time.clone(); let timestamp = Utc::now().timestamp(); //心跳-- 方法内部线程启动 let write_tx_clone1 = Arc::clone(write_tx_am); tokio::spawn(async move { trace!("线程-异步心跳-开始"); let ping_str = json!({ "time" : timestamp, "channel" : "futures.ping", }); AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.to_string()), heartbeat_time).await; trace!("线程-异步心跳-结束"); }); //设置订阅 let mut subscribe_array = vec![]; if login_is { //登录相关 } for s in subscription { subscribe_array.push(s.to_string()); } //链接 let t2 = tokio::spawn(async move { let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx)); loop { info!("gate_usdt_swap socket 连接中……"); AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(), false, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(), Self::message_text, Self::message_ping, Self::message_pong).await; error!("gate_usdt_swap socket 断连,1s以后重连……"); tokio::time::sleep(Duration::from_secs(1)).await; } }); tokio::try_join!(t2).unwrap(); trace!("线程-心跳与链接-结束"); Ok(()) } /*******************************************************************************************************/ /*****************************************数据解析*****************************************************/ /*******************************************************************************************************/ //数据解析-Text pub fn message_text(text: String) -> Option { let response_data = Self::ok_text(text); Option::from(response_data) } //数据解析-ping pub fn message_ping(_pi: Vec) -> Option { return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), Value::Null)); } //数据解析-pong pub fn message_pong(_po: Vec) -> Option { return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), Value::Null)); } //数据解析 pub fn ok_text(text: String) -> ResponseData { // trace!("原始数据:{}", text); let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), Value::Null); let json_value: Value = serde_json::from_str(&text).unwrap(); if json_value["channel"].as_str() == Option::from("futures.pong") { res_data.code = "-301".to_string(); res_data.message = "success".to_string(); } else if json_value.get("error").is_some() { let message = json_value["error"]["message"].as_str().unwrap().to_string(); let mes = message.trim_end_matches('\n'); res_data.code = json_value["error"]["code"].to_string(); res_data.message = mes.to_string(); } else if json_value["result"]["status"].as_str() == Option::from("success") {//订阅返回 res_data.code = "-201".to_string(); res_data.data = json_value; } else { res_data.channel = format!("{}", json_value["channel"].as_str().unwrap()); res_data.code = "200".to_string(); res_data.data = json_value["result"].clone(); } res_data } }