mexc_usdt_swap.rs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. use std::collections::BTreeMap;
  2. use std::sync::Arc;
  3. use std::sync::atomic::AtomicBool;
  4. use rust_decimal::Decimal;
  5. use tokio::spawn;
  6. use tokio::sync::Mutex;
  7. use tracing::{error};
  8. use tokio_tungstenite::tungstenite::Message;
  9. use exchanges::mexc_swap_ws::{MexcSwapLogin, MexcSwapSubscribeType, MexcSwapWs, MexcSwapWsType};
  10. use exchanges::response_base::ResponseData;
  11. use global::trace_stack::TraceStack;
  12. use standard::exchange::ExchangeEnum::MexcSwap;
  13. use standard::exchange_struct_handler::ExchangeStructHandler;
  14. use standard::{Depth, OrderBook, Position, PositionModeEnum};
  15. use crate::core::Core;
  16. use crate::exchange_disguise::{on_depth, on_record, on_ticker, on_trade};
  17. use crate::model::OrderInfo;
  18. pub async fn reference_mexc_swap_run(is_shutdown_arc: Arc<AtomicBool>,
  19. core_arc: Arc<Mutex<Core>>,
  20. name: String,
  21. symbols: Vec<String>,
  22. is_colo: bool) {
  23. spawn(async move {
  24. // 开启公共频道
  25. let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
  26. let mut ws = MexcSwapWs::new_with_tag(name, is_colo, None, MexcSwapWsType::PublicAndPrivate);
  27. ws.set_subscribe(vec![
  28. MexcSwapSubscribeType::PuFuturesDepthFull(5),
  29. ]);
  30. // 读取数据
  31. let core_arc_clone = Arc::clone(&core_arc);
  32. let rest = core_arc_clone.lock().await.platform_rest.clone_box();
  33. let multiplier = rest.get_self_market().multiplier;
  34. // let mut records = rest.get_record("1".to_string()).await.unwrap();
  35. // for record in records.iter_mut() {
  36. // let core_arc_clone = core_arc.clone();
  37. //
  38. // on_record(core_arc_clone, record).await
  39. // }
  40. let depth_asks = Arc::new(Mutex::new(Vec::new()));
  41. let depth_bids = Arc::new(Mutex::new(Vec::new()));
  42. let fun = move |data: ResponseData| {
  43. // 在 async 块之前克隆 Arc
  44. let core_arc_cc = core_arc_clone.clone();
  45. let mul = multiplier.clone();
  46. let depth_asks = Arc::clone(&depth_asks);
  47. let depth_bids = Arc::clone(&depth_bids);
  48. async move {
  49. let mut depth_asks = depth_asks.lock().await;
  50. let mut depth_bids = depth_bids.lock().await;
  51. // 使用克隆后的 Arc,避免 move 语义
  52. on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids).await
  53. }
  54. };
  55. // 链接
  56. let write_tx_am = Arc::new(Mutex::new(write_tx));
  57. ws.set_symbols(symbols);
  58. ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("mexc_usdt_swap 链接有异常");
  59. });
  60. }
  61. // 交易 mexc 合约 启动
  62. pub(crate) async fn mexc_swap_run(is_shutdown_arc: Arc<AtomicBool>,
  63. core_arc: Arc<Mutex<Core>>,
  64. name: String,
  65. symbols: Vec<String>,
  66. is_colo: bool,
  67. exchange_params: BTreeMap<String, String>) {
  68. reference_mexc_swap_run(is_shutdown_arc.clone(), core_arc.clone(), name.clone(), symbols.clone(), is_colo).await;
  69. spawn(async move {
  70. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  71. let auth = Some(parse_btree_map_to_mexc_swap_login(exchange_params));
  72. let mut ws = MexcSwapWs::new_with_tag(name.clone(), is_colo, auth, MexcSwapWsType::PublicAndPrivate);
  73. ws.set_subscribe(vec![
  74. // mexcSwapSubscribeType::PrOrders,
  75. // mexcSwapSubscribeType::PrAccount,
  76. // mexcSwapSubscribeType::PrPosition
  77. ]);
  78. let core_arc_clone_private = core_arc.clone();
  79. let multiplier = core_arc_clone_private.lock().await.platform_rest.get_self_market().multiplier;
  80. let run_symbol = symbols.clone()[0].clone();
  81. // 挂起私有ws
  82. let fun = move |data: ResponseData| {
  83. // 在 async 块之前克隆 Arc
  84. let core_arc_cc = core_arc_clone_private.clone();
  85. let mul = multiplier.clone();
  86. let rs = run_symbol.clone();
  87. async move {
  88. // 使用克隆后的 Arc,避免 move 语义
  89. on_private_data(core_arc_cc, &mul, &rs, &data).await;
  90. }
  91. };
  92. // 链接
  93. let write_tx_am = Arc::new(Mutex::new(write_tx));
  94. ws.set_symbols(symbols);
  95. ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("mexc_usdt_swap 链接有异常");
  96. });
  97. }
  98. async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, run_symbol: &String, response: &ResponseData) {
  99. let mut trace_stack = TraceStack::new(response.time, response.ins);
  100. trace_stack.on_after_span_line();
  101. match response.channel.as_str() {
  102. "wallet" => {
  103. let account = ExchangeStructHandler::account_info_handle(MexcSwap, response, run_symbol);
  104. let mut core = core_arc_clone.lock().await;
  105. core.update_equity(account).await;
  106. }
  107. "order" => {
  108. let orders = ExchangeStructHandler::order_handle(MexcSwap, response, ct_val);
  109. trace_stack.on_after_format();
  110. let mut order_infos: Vec<OrderInfo> = Vec::new();
  111. for mut order in orders.order {
  112. if order.status == "NULL" {
  113. error!("mexc_usdt_swap 未识别的订单状态:{:?}", response);
  114. continue;
  115. }
  116. // if order.deal_amount != Decimal::ZERO {
  117. // info!("mexc order 消息原文:{:?}", response);
  118. // }
  119. let order_info = OrderInfo::parse_order_to_order_info(&mut order);
  120. order_infos.push(order_info);
  121. }
  122. let mut core = core_arc_clone.lock().await;
  123. core.update_order(order_infos, trace_stack).await;
  124. }
  125. "position" => {
  126. let mut positions = ExchangeStructHandler::position_handle(MexcSwap, response, ct_val);
  127. let mut core = core_arc_clone.lock().await;
  128. if positions.is_empty() {
  129. positions.push(Position {
  130. symbol: run_symbol.to_string(),
  131. margin_level: Default::default(),
  132. amount: Default::default(),
  133. frozen_amount: Default::default(),
  134. price: Default::default(),
  135. profit: Default::default(),
  136. position_mode: PositionModeEnum::Both,
  137. margin: Default::default(),
  138. });
  139. }
  140. core.update_position(positions).await;
  141. }
  142. _ => {
  143. error!("未知推送类型");
  144. error!(?response);
  145. }
  146. }
  147. }
  148. async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &ResponseData, depth_asks: &mut Vec<OrderBook>, depth_bids: &mut Vec<OrderBook>) {
  149. let mut trace_stack = TraceStack::new(response.time, response.ins);
  150. trace_stack.on_after_span_line();
  151. match response.channel.as_str() {
  152. "orderbook" => {
  153. trace_stack.set_source("mexc_usdt_swap.bookTicker".to_string());
  154. let mut is_update = false;
  155. if response.data_type == "delta" {
  156. is_update = true;
  157. }
  158. let mut depth = ExchangeStructHandler::book_ticker_handle(MexcSwap, &response, mul);
  159. // 是增量更新
  160. if is_update {
  161. if depth.asks.len() != 0 {
  162. depth_asks.clear();
  163. depth_asks.append(&mut depth.asks);
  164. } else if depth.bids.len() != 0 {
  165. depth_bids.clear();
  166. depth_bids.append(&mut depth.bids);
  167. }
  168. let result_depth = Depth {
  169. time: depth.time,
  170. symbol: depth.symbol,
  171. asks: depth_asks.clone(),
  172. bids: depth_bids.clone(),
  173. };
  174. trace_stack.on_after_format();
  175. on_depth(core_arc, &response.label, &mut trace_stack, &result_depth, 0).await;
  176. }
  177. // 全量
  178. else {
  179. trace_stack.on_after_format();
  180. on_depth(core_arc, &response.label, &mut trace_stack, &depth, 0).await;
  181. depth_asks.clear();
  182. depth_asks.append(&mut depth.asks);
  183. depth_bids.clear();
  184. depth_bids.append(&mut depth.bids);
  185. }
  186. }
  187. "trade" => {
  188. trace_stack.set_source("mexc_usdt_swap.trade".to_string());
  189. let mut trades = ExchangeStructHandler::trades_handle(MexcSwap, response, mul);
  190. trace_stack.on_after_format();
  191. for trade in trades.iter_mut() {
  192. let core_arc_clone = core_arc.clone();
  193. on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade, 0).await;
  194. }
  195. }
  196. "tickers" => {
  197. trace_stack.set_source("mexc_usdt_swap.tickers".to_string());
  198. let ticker = ExchangeStructHandler::ticker_handle(MexcSwap, response).await;
  199. trace_stack.on_after_format();
  200. on_ticker(core_arc, &mut trace_stack, &ticker).await;
  201. }
  202. // k线数据
  203. "kline" => {
  204. let mut records = ExchangeStructHandler::records_handle(MexcSwap, &response);
  205. if records.is_empty() {
  206. return;
  207. }
  208. for record in records.iter_mut() {
  209. let core_arc_clone = core_arc.clone();
  210. on_record(core_arc_clone, record).await
  211. }
  212. }
  213. _ => {
  214. error!("未知推送类型");
  215. error!(?response);
  216. }
  217. }
  218. }
  219. fn parse_btree_map_to_mexc_swap_login(exchange_params: BTreeMap<String, String>) -> MexcSwapLogin {
  220. MexcSwapLogin {
  221. access_key: exchange_params.get("access_key").unwrap().clone(),
  222. secret_key: exchange_params.get("secret_key").unwrap().clone(),
  223. pass_key: "".to_string(),
  224. }
  225. }