// mexc_spot_ws.rs
  1. use std::fmt::format;
  2. use std::io::Read;
  3. use std::sync::Arc;
  4. use std::sync::atomic::AtomicBool;
  5. use std::time::Duration;
  6. use flate2::read::GzDecoder;
  7. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  8. use serde_json::json;
  9. use serde_json::Value;
  10. use tokio::sync::Mutex;
  11. use tokio_tungstenite::tungstenite::{Error, Message};
  12. use tracing::{error, info, trace};
  13. use crate::exchange::response_base::Response;
  14. use crate::exchange::socket_tool::AbstractWsMode;
  /// Selects which endpoint a [`MexcSpotWs`] client connects to.
  ///
  /// There is currently a single variant; public and private channels share
  /// one address.
  #[derive(Debug, Clone, Copy, PartialEq, Eq)]
  pub enum MexcSpotWsType {
      /// Combined public and private endpoint.
      PublicAndPrivate,
  }
  /// Connection parameters carried by the WebSocket client.
  ///
  /// NOTE(review): the meaning and units of the individual fields are not
  /// established by this file — confirm against the code that fills them in.
  #[derive(Debug, Clone)]
  pub struct MexcSpotWsParam {
      /// Token string (presumably a session/listen token — TODO confirm).
      pub token: String,
      /// WebSocket URL.
      pub ws_url: String,
      /// Ping interval (units not shown here).
      pub ws_ping_interval: i64,
      /// Ping timeout (units not shown here).
      pub ws_ping_timeout: i64,
      /// Subscription status flag (presumably "subscription succeeded" — TODO confirm).
      pub is_ok_subscribe: bool,
  }
  /// Channels that can be subscribed to.
  #[derive(Debug, Clone, PartialEq, Eq)]
  pub enum MexcSpotWsSubscribeType {
      /// Order-book depth.
      PuFuturesDepth,
      /// Candlestick (K-line) data for the given interval.
      ///
      /// Interval naming: `Min` = minutes, `Hour` = hours, `Day` = days,
      /// `Week` = weeks, `Month` = months. Listed values:
      /// `Min1`, `Min5`, `Min15`, `Min30`, `Min60`, `Hour4`, `Hour8`,
      /// `Day1`, `Week1`, `Month1`.
      PuFuturesRecords(String),
  }
  /// Account credentials.
  ///
  /// NOTE(review): the derived `Debug` output includes `secret_key` and
  /// `pass_key` verbatim — be careful when logging values of this type.
  #[derive(Debug, Clone)]
  pub struct MexcSpotWsLogin {
      /// API access key.
      pub access_key: String,
      /// API secret key.
      pub secret_key: String,
      /// Pass key (presumably an API passphrase — TODO confirm).
      pub pass_key: String,
  }
  53. #[derive(Clone)]
  54. #[allow(dead_code)]
  55. pub struct MexcSpotWs {
  56. //类型
  57. tag: String,
  58. //地址
  59. address_url: String,
  60. //账号
  61. login_param: Option<MexcSpotWsLogin>,
  62. //登录数据
  63. ws_param: MexcSpotWsParam,
  64. //币对
  65. symbol_s: Vec<String>,
  66. //订阅
  67. subscribe_types: Vec<MexcSpotWsSubscribeType>,
  68. //心跳间隔
  69. heartbeat_time: u64,
  70. }
  71. impl MexcSpotWs {
  72. // ============================================= 构造函数 ================================================
  73. pub fn new_with_tag(tag: String, login_param: Option<MexcSpotWsLogin>, ws_type: MexcSpotWsType) -> MexcSpotWs {
  74. /*******公共频道-私有频道数据组装*/
  75. let address_url = match ws_type {
  76. MexcSpotWsType::PublicAndPrivate => {
  77. let url = "ws://wbs-api.mexc.com/ws".to_string();
  78. url
  79. }
  80. };
  81. /*******公共频道-私有频道数据组装*/
  82. let ws_param = MexcSpotWsParam {
  83. token: "".to_string(),
  84. ws_url: "".to_string(),
  85. ws_ping_interval: 0,
  86. ws_ping_timeout: 0,
  87. is_ok_subscribe: false,
  88. };
  89. MexcSpotWs {
  90. tag,
  91. address_url,
  92. login_param,
  93. ws_param,
  94. symbol_s: vec![],
  95. subscribe_types: vec![],
  96. heartbeat_time: 1000 * 18,
  97. }
  98. }
  99. // ============================================= 订阅函数 ================================================
  100. // 手动添加订阅信息
  101. pub fn set_subscribe(&mut self, subscribe_types: Vec<MexcSpotWsSubscribeType>) {
  102. self.subscribe_types.extend(subscribe_types);
  103. }
  104. // 手动添加币对
  105. pub fn set_symbols(&mut self, mut symbol_array: Vec<String>) {
  106. for symbol in symbol_array.iter_mut() {
  107. // 大写
  108. *symbol = symbol.to_uppercase();
  109. // 字符串替换
  110. *symbol = symbol.replace("_", "_");
  111. }
  112. self.symbol_s = symbol_array;
  113. }
  114. fn contains_pr(&self) -> bool {
  115. for t in self.subscribe_types.clone() {
  116. if match t {
  117. MexcSpotWsSubscribeType::PuFuturesRecords(_) => false,
  118. MexcSpotWsSubscribeType::PuFuturesDepth => false,
  119. } {
  120. return true;
  121. }
  122. }
  123. false
  124. }
  125. // 订阅枚举解析
  126. pub fn enum_to_string(symbol: String, subscribe_type: MexcSpotWsSubscribeType) -> Value {
  127. match subscribe_type {
  128. // 深度
  129. MexcSpotWsSubscribeType::PuFuturesDepth => {
  130. json!({
  131. "method": "SUBSCRIPTION",
  132. "param": [
  133. format!("spot@public.aggre.depth.v3.api.pb@10ms@{symbol}")
  134. ]
  135. })
  136. }
  137. // k线
  138. MexcSpotWsSubscribeType::PuFuturesRecords(interval) => {
  139. json!({
  140. "method": "SUBSCRIPTION",
  141. "param": [
  142. format!("spot@public.kline.v3.api.pb@{symbol}@{interval}")
  143. ]
  144. })
  145. }
  146. }
  147. }
  148. // 订阅信息生成
  149. pub fn get_subscription(&self) -> Vec<String> {
  150. let mut array = vec![];
  151. for symbol in &self.symbol_s {
  152. for subscribe_type in &self.subscribe_types {
  153. let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
  154. array.push(ty_str.to_string());
  155. }
  156. }
  157. array
  158. }
  159. // 链接
  160. pub async fn ws_connect_async<F, Future>(&mut self,
  161. is_shutdown_arc: Arc<AtomicBool>,
  162. handle_function: F,
  163. _write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
  164. write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
  165. where
  166. F: Fn(Response) -> Future + Clone + Send + 'static + Sync,
  167. Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
  168. {
  169. let login_is = self.contains_pr();
  170. let subscription = self.get_subscription();
  171. let address_url = self.address_url.clone();
  172. let tag = self.tag.clone();
  173. // let heartbeat_time = self.ws_param.ws_ping_interval.clone();
  174. //心跳-- 方法内部线程启动
  175. // let write_tx_clone1 = write_tx_am.clone();
  176. // tokio::spawn(async move {
  177. // trace!("线程-异步心跳-开始");
  178. // AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time as u64).await;
  179. // trace!("线程-异步心跳-结束");
  180. // });
  181. // 设置订阅
  182. let subscribe_array = subscription.clone();
  183. if login_is {
  184. //登录相关
  185. }
  186. // 链接
  187. let t2 = tokio::spawn(async move {
  188. let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
  189. loop {
  190. info!("Mexc_usdt_swap socket 连接中……");
  191. AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
  192. false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
  193. Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
  194. error!("Mexc_usdt_swap socket 断连,1s以后重连……");
  195. tokio::time::sleep(Duration::from_secs(1)).await;
  196. }
  197. });
  198. tokio::try_join!(t2).unwrap();
  199. trace!("线程-心跳与链接-结束");
  200. Ok(())
  201. }
  202. //数据解析-Text
  203. pub fn message_text(text: String) -> Option<Response> {
  204. let response_data = Self::ok_text(text);
  205. Option::from(response_data)
  206. }
  207. //数据解析-ping
  208. pub fn message_ping(_pi: Vec<u8>) -> Option<Response> {
  209. Option::from(Response::new("".to_string(), -300, "success".to_string(), Value::Null))
  210. }
  211. //数据解析-pong
  212. pub fn message_pong(_po: Vec<u8>) -> Option<Response> {
  213. Option::from(Response::new("".to_string(), -301, "success".to_string(), Value::Null))
  214. }
  215. //数据解析-二进制
  216. pub fn message_binary(po: Vec<u8>) -> Option<Response> {
  217. //二进制WebSocket消息
  218. // let message_str = format!("Binary:{:?}", _po);
  219. // Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
  220. // let result = String::from_utf8(bytes);
  221. // let result = String::from_utf8(po);
  222. let mut gz_decoder = GzDecoder::new(&po[..]);
  223. let mut decompressed_data = Vec::new();
  224. // 尝试解压数据
  225. if let Ok(_) = gz_decoder.read_to_end(&mut decompressed_data) {
  226. // 将解压后的字节向量转换为 UTF-8 字符串
  227. match String::from_utf8(decompressed_data) {
  228. Ok(text) => {
  229. let response_data = Self::ok_text(text);
  230. return Option::from(response_data);
  231. }
  232. Err(_) => {
  233. return Option::from(Response::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
  234. }
  235. }
  236. } else {
  237. return Option::from(Response::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
  238. }
  239. }
  240. //数据解析
  241. pub fn ok_text(text: String) -> Response
  242. {
  243. let mut res_data = Response::new("".to_string(), 200, "success".to_string(), Value::Null);
  244. let json_value: Value = serde_json::from_str(&text).unwrap();
  245. match json_value["channel"].as_str() {
  246. Some(method) => {
  247. if method.contains("pong") {
  248. return Response::new("".to_string(), -301, "success".to_string(), Value::Null);
  249. } else if method.contains("rs.sub.") {
  250. //订阅响应
  251. let data = json_value["data"].as_str().unwrap();
  252. if method.contains(".depth") {
  253. res_data.channel = "futures.order_book".to_string();
  254. } else if method.contains(".kline") {
  255. res_data.channel = "futures.candlesticks".to_string();
  256. } else if method.contains(".deal") {
  257. res_data.channel = "futures.trades".to_string();
  258. } else {
  259. res_data.channel = "未知频道订阅".to_string();
  260. }
  261. if data == "success" {
  262. res_data.code = -201;
  263. res_data.message = "订阅成功".to_string();
  264. } else {
  265. res_data.code = 400;
  266. res_data.message = "订阅失败".to_string();
  267. }
  268. } else if method.contains("push.") {
  269. if method.contains(".depth") {
  270. res_data.channel = "futures.order_book".to_string();
  271. } else if method.contains(".kline") {
  272. res_data.channel = "futures.candlesticks".to_string();
  273. } else {
  274. res_data.channel = "未知频道推送".to_string();
  275. }
  276. res_data.code = 200;
  277. res_data.data = json_value.clone();
  278. } else {
  279. res_data.code = -1;
  280. res_data.message = "未知解析".to_string();
  281. }
  282. }
  283. None => {
  284. res_data.code = -1;
  285. res_data.message = "未知解析".to_string();
  286. }
  287. }
  288. res_data
  289. }
  290. }
  #[cfg(test)]
  mod tests {
      use std::sync::Arc;
      use std::sync::atomic::AtomicBool;
      use tokio::sync::Mutex;
      use tokio_tungstenite::tungstenite::Message;
      use tracing::info;
      use crate::exchange::mexc_spot_ws::{MexcSpotWs, MexcSpotWsSubscribeType, MexcSpotWsType};
      use crate::exchange::response_base::Response;
      use crate::utils::log_setup::setup_logging;

      /// Manual smoke test: connects to the live endpoint, subscribes to
      /// one-minute candlesticks for BTCUSDT, and logs every response payload.
      ///
      /// `ws_connect_async` reconnects in an endless loop, so this test does not
      /// finish on its own and has to be stopped by hand.
      #[tokio::test]
      async fn test_mexc_spot_ws() {
          let _guard = setup_logging().unwrap();

          // Flag and outbound channel handed to the client.
          let ws_running = Arc::new(AtomicBool::new(true));
          let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
          let write_tx_am = Arc::new(Mutex::new(write_tx));

          let mut ws = MexcSpotWs::new_with_tag("Mexc".to_string(), None, MexcSpotWsType::PublicAndPrivate);
          let channels = vec![MexcSpotWsSubscribeType::PuFuturesRecords("Min1".to_string())];
          ws.set_subscribe(channels);
          let symbols = vec!["BTCUSDT".to_string()];
          ws.set_symbols(symbols);

          // Handler: pretty-print each response payload.
          let fun = move |response: Response| {
              info!("{}", serde_json::to_string_pretty(&response.data).unwrap());
              async move {}
          };

          // Connect.
          info!("开始链接");
          ws.ws_connect_async(ws_running, fun, &write_tx_am, write_rx)
              .await
              .expect("链接失败");
      }
  }