mexc_spot_ws.rs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. use std::fmt::format;
  2. use std::fs::File;
  3. use std::io::{Read, Write};
  4. use std::path::Path;
  5. use std::sync::Arc;
  6. use std::sync::atomic::AtomicBool;
  7. use std::time::Duration;
  8. use flate2::read::GzDecoder;
  9. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  10. use prost::Message as ProstMessage;
  11. use serde_json::json;
  12. use serde_json::Value;
  13. use tokio::sync::Mutex;
  14. use tokio_tungstenite::tungstenite::{Error, Message};
  15. use tracing::{error, info, trace};
  16. use crate::exchange::response_base::Response;
  17. use crate::exchange::socket_tool::AbstractWsMode;
/// Connection type selector passed to `MexcSpotWs::new_with_tag`.
pub enum MexcSpotWsType {
    /// The only variant; the constructor maps it to the `wss://wbs-api.mexc.com/ws` endpoint.
    PublicAndPrivate,
}
/// Types included verbatim from the build output file `_.rs` under `OUT_DIR`.
// NOTE(review): presumably protobuf bindings generated by the build script — confirm in build.rs.
pub mod mexc_spot {
    include!(concat!(env!("OUT_DIR"), "/_.rs"));
}
  25. use mexc_spot::PublicSpotKlineV3ApiMessage;
  26. use mexc_spot::KlineDataV3;
  27. // 引入新的结构体
  28. use mexc_spot::PublicIncreaseDepthsV3ApiMessage;
  29. use mexc_spot::DepthDataContentV3;
  30. use mexc_spot::DepthItemV3;
  31. #[derive(Debug)]
  32. #[derive(Clone)]
  33. pub struct MexcSpotWsParam {
  34. pub token: String,
  35. pub ws_url: String,
  36. pub ws_ping_interval: i64,
  37. pub ws_ping_timeout: i64,
  38. pub is_ok_subscribe: bool,
  39. }
/// Subscription channels.
// NOTE(review): the variant names say "Futures" although `enum_to_string` maps them to
// `spot@public...` topics — the names look copied from a futures client.
#[derive(Clone)]
pub enum MexcSpotWsSubscribeType {
    /// Order-book depth.
    PuFuturesDepth,
    /// Kline data; the payload is the interval name.
    // Units: Min -> minute; Hour -> hour; Day -> day; Week -> week; M -> month
    // Min1
    // Min5
    // Min15
    // Min30
    // Min60
    // Hour4
    // Hour8
    // Day1
    // Week1
    // Month1
    PuFuturesRecords(String),
}
/// Account credentials.
///
/// Stored on the client as `login_param`; nothing in this file reads the individual fields.
#[derive(Clone, Debug)]
pub struct MexcSpotWsLogin {
    /// API access key.
    pub access_key: String,
    /// API secret key.
    pub secret_key: String,
    /// Pass key (presumably an API passphrase — confirm against the caller).
    pub pass_key: String,
}
/// MEXC spot WebSocket client: holds the endpoint, the symbols and the channels to subscribe to.
#[derive(Clone)]
#[allow(dead_code)]
pub struct MexcSpotWs {
    // Tag passed through to the underlying socket layer on connect
    tag: String,
    // Endpoint address
    address_url: String,
    // Account credentials (optional)
    login_param: Option<MexcSpotWsLogin>,
    // Login/connection data
    ws_param: MexcSpotWsParam,
    // Trading pairs, stored upper-cased with underscores removed (see `set_symbols`)
    symbol_s: Vec<String>,
    // Subscribed channels
    subscribe_types: Vec<MexcSpotWsSubscribeType>,
    // Heartbeat interval; initialised to 1000 * 18 (presumably milliseconds — TODO confirm)
    heartbeat_time: u64,
}
  83. impl MexcSpotWs {
  84. // ============================================= 构造函数 ================================================
  85. pub fn new_with_tag(tag: String, login_param: Option<MexcSpotWsLogin>, ws_type: MexcSpotWsType) -> MexcSpotWs {
  86. /*******公共频道-私有频道数据组装*/
  87. let address_url = match ws_type {
  88. MexcSpotWsType::PublicAndPrivate => {
  89. let url = "wss://wbs-api.mexc.com/ws".to_string();
  90. url
  91. }
  92. };
  93. /*******公共频道-私有频道数据组装*/
  94. let ws_param = MexcSpotWsParam {
  95. token: "".to_string(),
  96. ws_url: "".to_string(),
  97. ws_ping_interval: 0,
  98. ws_ping_timeout: 0,
  99. is_ok_subscribe: false,
  100. };
  101. MexcSpotWs {
  102. tag,
  103. address_url,
  104. login_param,
  105. ws_param,
  106. symbol_s: vec![],
  107. subscribe_types: vec![],
  108. heartbeat_time: 1000 * 18,
  109. }
  110. }
  111. // ============================================= 订阅函数 ================================================
  112. // 手动添加订阅信息
  113. pub fn set_subscribe(&mut self, subscribe_types: Vec<MexcSpotWsSubscribeType>) {
  114. self.subscribe_types.extend(subscribe_types);
  115. }
  116. // 手动添加币对
  117. pub fn set_symbols(&mut self, mut symbol_array: Vec<String>) {
  118. for symbol in symbol_array.iter_mut() {
  119. // 大写
  120. *symbol = symbol.to_uppercase();
  121. // 字符串替换
  122. *symbol = symbol.replace("_", "");
  123. }
  124. self.symbol_s = symbol_array;
  125. }
  126. fn contains_pr(&self) -> bool {
  127. for t in self.subscribe_types.clone() {
  128. if match t {
  129. MexcSpotWsSubscribeType::PuFuturesRecords(_) => false,
  130. MexcSpotWsSubscribeType::PuFuturesDepth => false,
  131. } {
  132. return true;
  133. }
  134. }
  135. false
  136. }
  137. // 订阅枚举解析
  138. pub fn enum_to_string(symbol: String, subscribe_type: MexcSpotWsSubscribeType) -> Value {
  139. match subscribe_type {
  140. // 深度
  141. MexcSpotWsSubscribeType::PuFuturesDepth => {
  142. json!({
  143. "method": "SUBSCRIPTION",
  144. "params": [
  145. format!("spot@public.aggre.depth.v3.api.pb@10ms@{symbol}")
  146. ]
  147. })
  148. }
  149. // k线
  150. MexcSpotWsSubscribeType::PuFuturesRecords(interval) => {
  151. json!({
  152. "method": "SUBSCRIPTION",
  153. "params": [
  154. format!("spot@public.kline.v3.api.pb@{symbol}@{interval}")
  155. ]
  156. })
  157. }
  158. }
  159. }
  160. // 订阅信息生成
  161. pub fn get_subscription(&self) -> Vec<String> {
  162. let mut array = vec![];
  163. for symbol in &self.symbol_s {
  164. for subscribe_type in &self.subscribe_types {
  165. let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
  166. array.push(ty_str.to_string());
  167. }
  168. }
  169. array
  170. }
  171. // 链接
  172. pub async fn ws_connect_async<F, Future>(&mut self,
  173. is_shutdown_arc: Arc<AtomicBool>,
  174. handle_function: F,
  175. _write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
  176. write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
  177. where
  178. F: Fn(Response) -> Future + Clone + Send + 'static + Sync,
  179. Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
  180. {
  181. let login_is = self.contains_pr();
  182. let subscription = self.get_subscription();
  183. let address_url = self.address_url.clone();
  184. let tag = self.tag.clone();
  185. // let heartbeat_time = self.ws_param.ws_ping_interval.clone();
  186. //心跳-- 方法内部线程启动
  187. // let write_tx_clone1 = write_tx_am.clone();
  188. // tokio::spawn(async move {
  189. // trace!("线程-异步心跳-开始");
  190. // AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time as u64).await;
  191. // trace!("线程-异步心跳-结束");
  192. // });
  193. // 设置订阅
  194. let subscribe_array = subscription.clone();
  195. if login_is {
  196. //登录相关
  197. }
  198. // 链接
  199. let t2 = tokio::spawn(async move {
  200. let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
  201. loop {
  202. info!("Mexc_usdt_swap socket 连接中……");
  203. AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
  204. false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
  205. Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
  206. error!("Mexc_usdt_swap socket 断连,1s以后重连……");
  207. tokio::time::sleep(Duration::from_secs(1)).await;
  208. }
  209. });
  210. tokio::try_join!(t2).unwrap();
  211. trace!("线程-心跳与链接-结束");
  212. Ok(())
  213. }
  214. //数据解析-Text
  215. pub fn message_text(text: String) -> Option<Response> {
  216. let response_data = Self::ok_text(text);
  217. Option::from(response_data)
  218. }
  219. //数据解析-ping
  220. pub fn message_ping(_pi: Vec<u8>) -> Option<Response> {
  221. Option::from(Response::new("".to_string(), -300, "success".to_string(), Value::Null))
  222. }
  223. //数据解析-pong
  224. pub fn message_pong(_po: Vec<u8>) -> Option<Response> {
  225. Option::from(Response::new("".to_string(), -301, "success".to_string(), Value::Null))
  226. }
  227. //数据解析-二进制
  228. pub fn message_binary(po: Vec<u8>) -> Option<Response> {
  229. // info!("Received binary message ({} bytes)", po.len());
  230. // 1. 尝试用新的顶层消息结构 PublicSpotKlineV3ApiMessage 来解析 K 线数据
  231. // 根据 Topic 前缀判断依然有效,但现在是判断是否**可能**是 K 线相关消息
  232. let prefix_len = po.len().min(100);
  233. let prefix_string = String::from_utf8_lossy(&po[..prefix_len]);
  234. if prefix_string.contains("spot@public.kline.v3.api.pb") {
  235. // info!("通过 Topic 前缀判断为 K 线数据相关消息");
  236. // 尝试解析为 PublicSpotKlineV3ApiMessage
  237. match PublicSpotKlineV3ApiMessage::decode(&po[..]) {
  238. Ok(kline_message) => {
  239. // info!("成功解析为顶层 K 线消息结构");
  240. // 检查是否包含嵌套的 KlineDataV3 字段 (Tag 308)
  241. if let Some(kline_data) = kline_message.kline_data { // 注意这里 PublicSpotKlineV3ApiMessage 的 kline_data 字段是 Option<KlineDataV3>
  242. // info!("找到并成功访问嵌套的 KlineDataV3");
  243. // 现在 kline_data 是 KlineDataV3 结构体,你可以使用它了!
  244. // 填充 Response 并返回 (省略详细实现)
  245. let response_data = Response::new(
  246. kline_message.topic_info.clone(), // 使用解析到的 Topic 信息
  247. 200,
  248. "OK".to_string(),
  249. json!({
  250. "interval": kline_data.interval,
  251. "windowStart": kline_data.window_start, //注意 snake_case
  252. "openingPrice": kline_data.opening_price,
  253. "closingPrice": kline_data.closing_price,
  254. "highestPrice": kline_data.highest_price,
  255. "lowestPrice": kline_data.lowest_price,
  256. "volume": kline_data.volume,
  257. "amount": kline_data.amount,
  258. "windowEnd": kline_data.window_end,
  259. // 可以添加顶层字段的信息,如果需要
  260. "topic_info": kline_message.topic_info,
  261. "symbol": kline_message.symbol,
  262. "id_info": kline_message.id_info,
  263. "timestamp_or_version": kline_message.timestamp_or_version,
  264. })
  265. );
  266. return Some(response_data);
  267. } else {
  268. info!("顶层 K 线消息结构解析成功,但未找到嵌套的 kline_data 字段 (Tag 308)");
  269. // 这可能是一个只有顶层字段的控制消息
  270. return Some(Response::new(
  271. kline_message.topic_info.clone(), // 使用解析到的 Topic 信息
  272. 200,
  273. "OK (Control Message)".to_string(),
  274. json!({
  275. "topic_info": kline_message.topic_info,
  276. "symbol": kline_message.symbol,
  277. "id_info": kline_message.id_info,
  278. "timestamp_or_version": kline_message.timestamp_or_version,
  279. })
  280. ));
  281. }
  282. }
  283. Err(e) => {
  284. error!("尝试解析为 PublicSpotKlineV3ApiMessage 失败: {:?}", e);
  285. return Some(Response::new("".to_string(), 500, format!("Protobuf K 线顶层消息解析出错: {:?}", e), Value::Null));
  286. }
  287. }
  288. }
  289. // 2. 尝试解析深度数据 (使用新的结构体)
  290. if prefix_string.contains("spot@public.aggre.depth.v3.api.pb") {
  291. // info!("通过 Topic 前缀判断为深度数据");
  292. // 尝试解析为 PublicIncreaseDepthsV3ApiMessage (新的顶层深度消息)
  293. match PublicIncreaseDepthsV3ApiMessage::decode(&po[..]) {
  294. Ok(depth_message) => {
  295. // info!("成功解析为顶层深度消息结构");
  296. // 检查是否包含嵌套的 depth_data 字段 (Tag 313)
  297. if let Some(depth_data_content) = depth_message.depth_data {
  298. // info!("找到并成功访问嵌套的 DepthDataContentV3");
  299. // 填充 Response 并返回
  300. let response_data = Response::new(
  301. depth_message.topic_info.clone(), // 使用解析到的 Topic
  302. 200, "OK".to_string(),
  303. serde_json::json!({
  304. // 嵌套消息内部的字段
  305. "asks": depth_data_content.asks.into_iter().map(|item| serde_json::json!({"price": item.price, "quantity": item.quantity})).collect::<Vec<_>>(),
  306. "bids": depth_data_content.bids.into_iter().map(|item| serde_json::json!({"price": item.price, "quantity": item.quantity})).collect::<Vec<_>>(),
  307. "eventType": depth_data_content.event_type,
  308. "version": depth_data_content.version,
  309. "lastUpdateId": depth_data_content.last_update_id, // 新增字段
  310. // 顶层字段
  311. "topic_info": depth_message.topic_info,
  312. "symbol": depth_message.symbol,
  313. "timestamp": depth_message.timestamp, // 新增字段
  314. })
  315. );
  316. return Some(response_data);
  317. } else {
  318. info!("顶层深度消息结构解析成功,但未找到嵌套的 depth_data 字段 (Tag 313)");
  319. // 处理只有顶层字段的深度相关消息
  320. return Some(Response::new(
  321. depth_message.topic_info.clone(),
  322. 200, "OK (Control Message)".to_string(),
  323. serde_json::json!({
  324. "topic_info": depth_message.topic_info,
  325. "symbol": depth_message.symbol,
  326. "timestamp": depth_message.timestamp,
  327. })
  328. ));
  329. }
  330. }
  331. Err(e) => {
  332. error!("解析深度消息 PublicIncreaseDepthsV3ApiMessage 失败: {:?}", e);
  333. // 保存数据以供分析
  334. let file_path = Path::new("depth_error_data.bin");
  335. // ... 保存 po 到文件 ...
  336. return Some(Response::new("".to_string(), 500, format!("Protobuf 深度消息解析出错: {:?}", e), Value::Null));
  337. }
  338. }
  339. }
  340. // 如果都不是已知的 Protobuf 类型,处理未知消息
  341. error!("无法将二进制消息解析为任何已知 Protobuf 类型");
  342. // *** 在这里将原始二进制数据保存到文件 ***
  343. let file_path = Path::new("un_decode.bin");
  344. match File::create(&file_path) {
  345. Ok(mut file) => {
  346. match file.write_all(&po) {
  347. Ok(_) => info!("原始 K 线二进制数据保存到 {:?}", file_path),
  348. Err(write_e) => error!("保存 K 线二进制数据失败: {:?}", write_e),
  349. }
  350. }
  351. Err(create_e) => error!("创建文件 {:?} 失败: {:?}", file_path, create_e),
  352. }
  353. Some(Response::new("".to_string(), 400, "无法解析未知二进制消息".to_string(), Value::Null))
  354. }
  355. //数据解析
  356. pub fn ok_text(text: String) -> Response
  357. {
  358. info!("{}", text);
  359. let mut res_data = Response::new("".to_string(), 200, "success".to_string(), Value::Null);
  360. let json_value: Value = serde_json::from_str(&text).unwrap();
  361. match json_value["channel"].as_str() {
  362. Some(method) => {
  363. if method.contains("pong") {
  364. return Response::new("".to_string(), -301, "success".to_string(), Value::Null);
  365. } else if method.contains("rs.sub.") {
  366. //订阅响应
  367. let data = json_value["data"].as_str().unwrap();
  368. if method.contains(".depth") {
  369. res_data.channel = "futures.order_book".to_string();
  370. } else if method.contains(".kline") {
  371. res_data.channel = "futures.candlesticks".to_string();
  372. } else if method.contains(".deal") {
  373. res_data.channel = "futures.trades".to_string();
  374. } else {
  375. res_data.channel = "未知频道订阅".to_string();
  376. }
  377. if data == "success" {
  378. res_data.code = -201;
  379. res_data.message = "订阅成功".to_string();
  380. } else {
  381. res_data.code = 400;
  382. res_data.message = "订阅失败".to_string();
  383. }
  384. } else if method.contains("push.") {
  385. if method.contains(".depth") {
  386. res_data.channel = "futures.order_book".to_string();
  387. } else if method.contains(".kline") {
  388. res_data.channel = "futures.candlesticks".to_string();
  389. } else {
  390. res_data.channel = "未知频道推送".to_string();
  391. }
  392. res_data.code = 200;
  393. res_data.data = json_value.clone();
  394. } else {
  395. res_data.code = -1;
  396. res_data.message = "未知解析".to_string();
  397. }
  398. }
  399. None => {
  400. res_data.data = json_value.clone();
  401. res_data.code = -1;
  402. res_data.message = "未知解析".to_string();
  403. }
  404. }
  405. res_data
  406. }
  407. }
  408. #[cfg(test)]
  409. mod tests {
  410. use std::sync::Arc;
  411. use std::sync::atomic::AtomicBool;
  412. use tokio::sync::Mutex;
  413. use tokio_tungstenite::tungstenite::Message;
  414. use tracing::info;
  415. use crate::exchange::mexc_spot_ws::{MexcSpotWs, MexcSpotWsSubscribeType, MexcSpotWsType};
  416. use crate::exchange::response_base::Response;
  417. use crate::utils::log_setup::setup_logging;
  418. #[tokio::test]
  419. async fn test_mexc_spot_ws() {
  420. let ws_running = Arc::new(AtomicBool::new(true));
  421. let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
  422. let _guard = setup_logging().unwrap();
  423. let mut ws = MexcSpotWs::new_with_tag("Mexc".to_string(), None, MexcSpotWsType::PublicAndPrivate);
  424. ws.set_subscribe(vec![
  425. MexcSpotWsSubscribeType::PuFuturesRecords("Min1".to_string()),
  426. MexcSpotWsSubscribeType::PuFuturesRecords("Min3".to_string()),
  427. MexcSpotWsSubscribeType::PuFuturesDepth
  428. ]);
  429. ws.set_symbols(vec!["BTC_USDT".to_string()]);
  430. let fun = move |response: Response| {
  431. info!("{}", serde_json::to_string_pretty(&response.data).unwrap());
  432. async move {}
  433. };
  434. // 链接
  435. info!("开始链接");
  436. let write_tx_am = Arc::new(Mutex::new(write_tx));
  437. ws.ws_connect_async(ws_running, fun, &write_tx_am, write_rx)
  438. .await
  439. .expect("链接失败");
  440. }
  441. }