|
@@ -0,0 +1,368 @@
|
|
|
|
|
+use std::io::Read;
|
|
|
|
|
+use std::sync::Arc;
|
|
|
|
|
+use std::sync::atomic::AtomicBool;
|
|
|
|
|
+use std::time::Duration;
|
|
|
|
|
+
|
|
|
|
|
+use flate2::read::GzDecoder;
|
|
|
|
|
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
|
|
|
|
|
+use serde_json::json;
|
|
|
|
|
+use serde_json::Value;
|
|
|
|
|
+use tokio::sync::Mutex;
|
|
|
|
|
+use tokio_tungstenite::tungstenite::{Error, Message};
|
|
|
|
|
+use tracing::{error, info, trace};
|
|
|
|
|
+
|
|
|
|
|
+use crate::response_base::ResponseData;
|
|
|
|
|
+use crate::socket_tool::AbstractWsMode;
|
|
|
|
|
+
|
|
|
|
|
+//类型
|
|
|
|
|
+pub enum MexcSwapWsType {
|
|
|
|
|
+ PublicAndPrivate,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug)]
|
|
|
|
|
+#[derive(Clone)]
|
|
|
|
|
+pub struct MexcSwapWsParam {
|
|
|
|
|
+ pub token: String,
|
|
|
|
|
+ pub ws_url: String,
|
|
|
|
|
+ pub ws_ping_interval: i64,
|
|
|
|
|
+ pub ws_ping_timeout: i64,
|
|
|
|
|
+ pub is_ok_subscribe: bool,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+//订阅频道
|
|
|
|
|
+#[derive(Clone)]
|
|
|
|
|
+pub enum MexcSwapSubscribeType {
|
|
|
|
|
+ // 深度
|
|
|
|
|
+ PuFuturesDepth,
|
|
|
|
|
+ // 公开成交
|
|
|
|
|
+ PuFuturesTrades,
|
|
|
|
|
+ // K线数据
|
|
|
|
|
+ PuFuturesRecords,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+//账号信息
|
|
|
|
|
+#[derive(Clone, Debug)]
|
|
|
|
|
+pub struct MexcSwapLogin {
|
|
|
|
|
+ pub access_key: String,
|
|
|
|
|
+ pub secret_key: String,
|
|
|
|
|
+ pub pass_key: String,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Clone)]
|
|
|
|
|
+#[allow(dead_code)]
|
|
|
|
|
+pub struct MexcSwapWs {
|
|
|
|
|
+ //类型
|
|
|
|
|
+ tag: String,
|
|
|
|
|
+ //地址
|
|
|
|
|
+ address_url: String,
|
|
|
|
|
+ //账号
|
|
|
|
|
+ login_param: Option<MexcSwapLogin>,
|
|
|
|
|
+ //登录数据
|
|
|
|
|
+ ws_param: MexcSwapWsParam,
|
|
|
|
|
+ //币对
|
|
|
|
|
+ symbol_s: Vec<String>,
|
|
|
|
|
+ //订阅
|
|
|
|
|
+ subscribe_types: Vec<MexcSwapSubscribeType>,
|
|
|
|
|
+ //心跳间隔
|
|
|
|
|
+ heartbeat_time: u64,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl MexcSwapWs {
|
|
|
|
|
+ /*******************************************************************************************************/
|
|
|
|
|
+ /*****************************************获取一个对象****************************************************/
|
|
|
|
|
+ /*******************************************************************************************************/
|
|
|
|
|
+ pub fn new(is_colo: bool, login_param: Option<MexcSwapLogin>, ws_type: MexcSwapWsType) -> MexcSwapWs {
|
|
|
|
|
+ return Self::new_with_tag("default-MexcSwapWs".to_string(), is_colo, login_param, ws_type);
|
|
|
|
|
+ }
|
|
|
|
|
+ pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<MexcSwapLogin>, ws_type: MexcSwapWsType) -> MexcSwapWs {
|
|
|
|
|
+ /*******公共频道-私有频道数据组装*/
|
|
|
|
|
+ let address_url = match ws_type {
|
|
|
|
|
+ MexcSwapWsType::PublicAndPrivate => {
|
|
|
|
|
+ let url = "wss://contract.mexc.com/edge".to_string();
|
|
|
|
|
+ info!("走普通通道(不支持colo通道):{}", url);
|
|
|
|
|
+ url
|
|
|
|
|
+ }
|
|
|
|
|
+ };
|
|
|
|
|
+
|
|
|
|
|
+ /*******公共频道-私有频道数据组装*/
|
|
|
|
|
+ let ws_param = MexcSwapWsParam {
|
|
|
|
|
+ token: "".to_string(),
|
|
|
|
|
+ ws_url: "".to_string(),
|
|
|
|
|
+ ws_ping_interval: 0,
|
|
|
|
|
+ ws_ping_timeout: 0,
|
|
|
|
|
+ is_ok_subscribe: false,
|
|
|
|
|
+ };
|
|
|
|
|
+
|
|
|
|
|
+ if is_colo {
|
|
|
|
|
+ info!("开启高速(未配置,走普通:{})通道",address_url);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ info!("走普通通道:{}",address_url);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ MexcSwapWs {
|
|
|
|
|
+ tag,
|
|
|
|
|
+ address_url,
|
|
|
|
|
+ login_param,
|
|
|
|
|
+ ws_param,
|
|
|
|
|
+ symbol_s: vec![],
|
|
|
|
|
+ subscribe_types: vec![],
|
|
|
|
|
+ heartbeat_time: 1000 * 18,
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /*******************************************************************************************************/
|
|
|
|
|
+ /*****************************************订阅函数********************************************************/
|
|
|
|
|
+ /*******************************************************************************************************/
|
|
|
|
|
+ //手动添加订阅信息
|
|
|
|
|
+ pub fn set_subscribe(&mut self, subscribe_types: Vec<MexcSwapSubscribeType>) {
|
|
|
|
|
+ self.subscribe_types.extend(subscribe_types);
|
|
|
|
|
+ }
|
|
|
|
|
+ //手动添加币对
|
|
|
|
|
+ pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
|
|
|
|
|
+ for symbol in b_array.iter_mut() {
|
|
|
|
|
+ // 大写
|
|
|
|
|
+ *symbol = symbol.to_uppercase();
|
|
|
|
|
+ // 字符串替换
|
|
|
|
|
+ *symbol = symbol.replace("_", "_");
|
|
|
|
|
+ }
|
|
|
|
|
+ self.symbol_s = b_array;
|
|
|
|
|
+ }
|
|
|
|
|
+ fn contains_pr(&self) -> bool {
|
|
|
|
|
+ for t in self.subscribe_types.clone() {
|
|
|
|
|
+ if match t {
|
|
|
|
|
+ MexcSwapSubscribeType::PuFuturesTrades => false,
|
|
|
|
|
+ MexcSwapSubscribeType::PuFuturesRecords => false,
|
|
|
|
|
+ MexcSwapSubscribeType::PuFuturesDepth => false,
|
|
|
|
|
+ } {
|
|
|
|
|
+ return true;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ false
|
|
|
|
|
+ }
|
|
|
|
|
+ /*******************************************************************************************************/
|
|
|
|
|
+ /*****************************************工具函数********************************************************/
|
|
|
|
|
+ /*******************************************************************************************************/
|
|
|
|
|
+ //订阅枚举解析
|
|
|
|
|
+ pub fn enum_to_string(symbol: String, subscribe_type: MexcSwapSubscribeType) -> Value {
|
|
|
|
|
+ match subscribe_type {
|
|
|
|
|
+ MexcSwapSubscribeType::PuFuturesDepth => {//深度
|
|
|
|
|
+ json!({
|
|
|
|
|
+ "method":"sub.depth",
|
|
|
|
|
+ "param":{ "symbol":symbol }
|
|
|
|
|
+ })
|
|
|
|
|
+ }
|
|
|
|
|
+ MexcSwapSubscribeType::PuFuturesRecords => {//k线
|
|
|
|
|
+ json!({
|
|
|
|
|
+ "method":"sub.kline",
|
|
|
|
|
+ "param":{
|
|
|
|
|
+ "symbol":symbol,
|
|
|
|
|
+ "interval":"Min1"
|
|
|
|
|
+ }
|
|
|
|
|
+ })
|
|
|
|
|
+ }
|
|
|
|
|
+ MexcSwapSubscribeType::PuFuturesTrades => {//公开成交
|
|
|
|
|
+ json!({
|
|
|
|
|
+ "method": "sub.deal",
|
|
|
|
|
+ "param": {"symbol":symbol}
|
|
|
|
|
+ })
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ //订阅信息生成
|
|
|
|
|
+ pub fn get_subscription(&self) -> Vec<String> {
|
|
|
|
|
+ let mut array = vec![];
|
|
|
|
|
+ for symbol in &self.symbol_s {
|
|
|
|
|
+ for subscribe_type in &self.subscribe_types {
|
|
|
|
|
+ let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
|
|
|
|
|
+ array.push(ty_str.to_string());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ array
|
|
|
|
|
+ }
|
|
|
|
|
+ /*******************************************************************************************************/
|
|
|
|
|
+ /*****************************************socket基本*****************************************************/
|
|
|
|
|
+ /*******************************************************************************************************/
|
|
|
|
|
+ //链接
|
|
|
|
|
+ pub async fn ws_connect_async<F, Future>(&mut self,
|
|
|
|
|
+ is_shutdown_arc: Arc<AtomicBool>,
|
|
|
|
|
+ handle_function: F,
|
|
|
|
|
+ _write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
|
|
|
|
|
+ write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
|
|
|
|
|
+ where
|
|
|
|
|
+ F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
|
|
|
|
|
+ Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
|
|
|
|
|
+ {
|
|
|
|
|
+ let login_is = self.contains_pr();
|
|
|
|
|
+ let subscription = self.get_subscription();
|
|
|
|
|
+ let address_url = self.address_url.clone();
|
|
|
|
|
+ let tag = self.tag.clone();
|
|
|
|
|
+ // let heartbeat_time = self.ws_param.ws_ping_interval.clone();
|
|
|
|
|
+
|
|
|
|
|
+ //心跳-- 方法内部线程启动
|
|
|
|
|
+ // let write_tx_clone1 = write_tx_am.clone();
|
|
|
|
|
+ // tokio::spawn(async move {
|
|
|
|
|
+ // trace!("线程-异步心跳-开始");
|
|
|
|
|
+ // AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time as u64).await;
|
|
|
|
|
+ // trace!("线程-异步心跳-结束");
|
|
|
|
|
+ // });
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ //设置订阅
|
|
|
|
|
+ let subscribe_array = subscription.clone();
|
|
|
|
|
+ if login_is {
|
|
|
|
|
+ //登录相关
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ //1 链接
|
|
|
|
|
+ let t2 = tokio::spawn(async move {
|
|
|
|
|
+ let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
|
|
|
|
|
+
|
|
|
|
|
+ loop {
|
|
|
|
|
+ info!("Mexc_usdt_swap socket 连接中……");
|
|
|
|
|
+ AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
|
|
|
|
|
+ false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
|
|
|
|
|
+ Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
|
|
|
|
|
+
|
|
|
|
|
+ error!("Mexc_usdt_swap socket 断连,1s以后重连……");
|
|
|
|
|
+ tokio::time::sleep(Duration::from_secs(1)).await;
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ tokio::try_join!(t2).unwrap();
|
|
|
|
|
+ trace!("线程-心跳与链接-结束");
|
|
|
|
|
+
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+ }
|
|
|
|
|
+ /*******************************************************************************************************/
|
|
|
|
|
+ /*****************************************数据解析*****************************************************/
|
|
|
|
|
+ /*******************************************************************************************************/
|
|
|
|
|
+ //数据解析-Text
|
|
|
|
|
+ pub fn message_text(text: String) -> Option<ResponseData> {
|
|
|
|
|
+ let response_data = Self::ok_text(text);
|
|
|
|
|
+ Option::from(response_data)
|
|
|
|
|
+ }
|
|
|
|
|
+ //数据解析-ping
|
|
|
|
|
+ pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
|
|
|
|
|
+ return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
|
|
|
|
|
+ }
|
|
|
|
|
+ //数据解析-pong
|
|
|
|
|
+ pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
|
|
|
|
|
+ return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
|
|
|
|
|
+ }
|
|
|
|
|
+ //数据解析-二进制
|
|
|
|
|
+ pub fn message_binary(po: Vec<u8>) -> Option<ResponseData> {
|
|
|
|
|
+ //二进制WebSocket消息
|
|
|
|
|
+ // let message_str = format!("Binary:{:?}", _po);
|
|
|
|
|
+ // Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
|
|
|
|
|
+ // let result = String::from_utf8(bytes);
|
|
|
|
|
+ // let result = String::from_utf8(po);
|
|
|
|
|
+
|
|
|
|
|
+ let mut gz_decoder = GzDecoder::new(&po[..]);
|
|
|
|
|
+ let mut decompressed_data = Vec::new();
|
|
|
|
|
+
|
|
|
|
|
+ // 尝试解压数据
|
|
|
|
|
+ if let Ok(_) = gz_decoder.read_to_end(&mut decompressed_data) {
|
|
|
|
|
+ // 将解压后的字节向量转换为 UTF-8 字符串
|
|
|
|
|
+ match String::from_utf8(decompressed_data) {
|
|
|
|
|
+ Ok(text) => {
|
|
|
|
|
+ let response_data = Self::ok_text(text);
|
|
|
|
|
+ return Option::from(response_data);
|
|
|
|
|
+ }
|
|
|
|
|
+ Err(_) => {
|
|
|
|
|
+ return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ //数据解析
|
|
|
|
|
+ pub fn ok_text(text: String) -> ResponseData
|
|
|
|
|
+ {
|
|
|
|
|
+ // trace!("原始数据:{:?}",text);
|
|
|
|
|
+ let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
|
|
|
|
|
+ let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
|
|
|
|
|
+
|
|
|
|
|
+ match json_value["channel"].as_str() {
|
|
|
|
|
+ Some(method) => {
|
|
|
|
|
+ if method.contains("pong") {
|
|
|
|
|
+ return ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null);
|
|
|
|
|
+ } else if method.contains("rs.sub.") {
|
|
|
|
|
+ //订阅响应
|
|
|
|
|
+ let data = json_value["data"].as_str().unwrap();
|
|
|
|
|
+ if method.contains(".depth") {
|
|
|
|
|
+ res_data.channel = "futures.order_book".to_string();
|
|
|
|
|
+ } else if method.contains(".kline") {
|
|
|
|
|
+ res_data.channel = "futures.candlesticks".to_string();
|
|
|
|
|
+ } else if method.contains(".deal") {
|
|
|
|
|
+ res_data.channel = "futures.trades".to_string();
|
|
|
|
|
+ } else {
|
|
|
|
|
+ res_data.channel = "未知频道订阅".to_string();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if data == "success" {
|
|
|
|
|
+ res_data.code = -201;
|
|
|
|
|
+ res_data.message = "订阅成功".to_string();
|
|
|
|
|
+ } else {
|
|
|
|
|
+ res_data.code = 400;
|
|
|
|
|
+ res_data.message = "订阅失败".to_string();
|
|
|
|
|
+ }
|
|
|
|
|
+ } else if method.contains("push.") {
|
|
|
|
|
+ if method.contains(".depth") {
|
|
|
|
|
+ res_data.channel = "futures.order_book".to_string();
|
|
|
|
|
+ } else if method.contains(".kline") {
|
|
|
|
|
+ res_data.channel = "futures.candlesticks".to_string();
|
|
|
|
|
+ } else if method.contains(".deal") {
|
|
|
|
|
+ res_data.channel = "futures.trades".to_string();
|
|
|
|
|
+ } else {
|
|
|
|
|
+ res_data.channel = "未知频道推送".to_string();
|
|
|
|
|
+ }
|
|
|
|
|
+ res_data.code = 200;
|
|
|
|
|
+ res_data.data = json_value.clone();
|
|
|
|
|
+ } else {
|
|
|
|
|
+ res_data.code = -1;
|
|
|
|
|
+ res_data.message = "未知解析".to_string();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ None => {
|
|
|
|
|
+ res_data.code = -1;
|
|
|
|
|
+ res_data.message = "未知解析".to_string();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ //
|
|
|
|
|
+ // if json_value["method"].as_str() == Option::from("id1") {}
|
|
|
|
|
+ //
|
|
|
|
|
+ // // { "id": "id1", "code": 0, "msg": "" }
|
|
|
|
|
+ // if json_value["id"].as_str() == Option::from("id1") {
|
|
|
|
|
+ // //订阅
|
|
|
|
|
+ // if json_value["code"].as_i64() == Option::from(0) {
|
|
|
|
|
+ // res_data.code = -201;
|
|
|
|
|
+ // res_data.message = "订阅成功".to_string();
|
|
|
|
|
+ // } else {
|
|
|
|
|
+ // res_data.code = 400;
|
|
|
|
|
+ // res_data.message = "订阅失败".to_string();
|
|
|
|
|
+ // }
|
|
|
|
|
+ // } else if json_value["code"].as_i64() == Option::from(0) {
|
|
|
|
|
+ // res_data.code = 200;
|
|
|
|
|
+ // res_data.data = json_value.clone();
|
|
|
|
|
+ //
|
|
|
|
|
+ // //订阅数据 甄别
|
|
|
|
|
+ // let dataType = json_value["dataType"].as_str().unwrap();
|
|
|
|
|
+ // if dataType.contains("@depth") {
|
|
|
|
|
+ // res_data.channel = "futures.order_book".to_string();
|
|
|
|
|
+ // } else if dataType.contains("@trade") {
|
|
|
|
|
+ // res_data.channel = "futures.trades".to_string();
|
|
|
|
|
+ // } else if dataType.contains("@kline_1m") {
|
|
|
|
|
+ // res_data.channel = "futures.candlesticks".to_string();
|
|
|
|
|
+ // } else {
|
|
|
|
|
+ // res_data.channel = "未知推送数据".to_string();
|
|
|
|
|
+ // }
|
|
|
|
|
+ // } else {
|
|
|
|
|
+ // res_data.code = -1;
|
|
|
|
|
+ // res_data.message = "未知解析".to_string();
|
|
|
|
|
+ // }
|
|
|
|
|
+
|
|
|
|
|
+ res_data
|
|
|
|
|
+ }
|
|
|
|
|
+}
|