||
- use std::collections::BTreeMap;
- use std::sync::Arc;
- use std::sync::atomic::AtomicBool;
- use rust_decimal::Decimal;
- use tokio::spawn;
- use tokio::sync::Mutex;
- use tracing::{error};
- use tokio_tungstenite::tungstenite::Message;
- use exchanges::mexc_swap_ws::{MexcSwapLogin, MexcSwapSubscribeType, MexcSwapWs, MexcSwapWsType};
- use exchanges::response_base::ResponseData;
- use global::trace_stack::TraceStack;
- use standard::exchange::ExchangeEnum::MexcSwap;
- use standard::exchange_struct_handler::ExchangeStructHandler;
- use standard::{Depth, OrderBook, Position, PositionModeEnum};
- use crate::core::Core;
- use crate::exchange_disguise::{on_depth, on_record, on_ticker, on_trade};
- use crate::model::OrderInfo;
- pub async fn reference_mexc_swap_run(is_shutdown_arc: Arc<AtomicBool>,
- core_arc: Arc<Mutex<Core>>,
- name: String,
- symbols: Vec<String>,
- is_colo: bool) {
- spawn(async move {
- // 开启公共频道
- let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
- let mut ws = MexcSwapWs::new_with_tag(name, is_colo, None, MexcSwapWsType::PublicAndPrivate);
- ws.set_subscribe(vec![
- MexcSwapSubscribeType::PuFuturesDepthFull(5),
- ]);
- // 读取数据
- let core_arc_clone = Arc::clone(&core_arc);
- let rest = core_arc_clone.lock().await.platform_rest.clone_box();
- let multiplier = rest.get_self_market().multiplier;
- // let mut records = rest.get_record("1".to_string()).await.unwrap();
- // for record in records.iter_mut() {
- // let core_arc_clone = core_arc.clone();
- //
- // on_record(core_arc_clone, record).await
- // }
- let depth_asks = Arc::new(Mutex::new(Vec::new()));
- let depth_bids = Arc::new(Mutex::new(Vec::new()));
- let fun = move |data: ResponseData| {
- // 在 async 块之前克隆 Arc
- let core_arc_cc = core_arc_clone.clone();
- let mul = multiplier.clone();
- let depth_asks = Arc::clone(&depth_asks);
- let depth_bids = Arc::clone(&depth_bids);
- async move {
- let mut depth_asks = depth_asks.lock().await;
- let mut depth_bids = depth_bids.lock().await;
- // 使用克隆后的 Arc,避免 move 语义
- on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids).await
- }
- };
- // 链接
- let write_tx_am = Arc::new(Mutex::new(write_tx));
- ws.set_symbols(symbols);
- ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("mexc_usdt_swap 链接有异常");
- });
- }
- // 交易 mexc 合约 启动
- pub(crate) async fn mexc_swap_run(is_shutdown_arc: Arc<AtomicBool>,
- core_arc: Arc<Mutex<Core>>,
- name: String,
- symbols: Vec<String>,
- is_colo: bool,
- exchange_params: BTreeMap<String, String>) {
- reference_mexc_swap_run(is_shutdown_arc.clone(), core_arc.clone(), name.clone(), symbols.clone(), is_colo).await;
- spawn(async move {
- let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
- let auth = Some(parse_btree_map_to_mexc_swap_login(exchange_params));
- let mut ws = MexcSwapWs::new_with_tag(name.clone(), is_colo, auth, MexcSwapWsType::PublicAndPrivate);
- ws.set_subscribe(vec![
- // mexcSwapSubscribeType::PrOrders,
- // mexcSwapSubscribeType::PrAccount,
- // mexcSwapSubscribeType::PrPosition
- ]);
- let core_arc_clone_private = core_arc.clone();
- let multiplier = core_arc_clone_private.lock().await.platform_rest.get_self_market().multiplier;
- let run_symbol = symbols.clone()[0].clone();
- // 挂起私有ws
- let fun = move |data: ResponseData| {
- // 在 async 块之前克隆 Arc
- let core_arc_cc = core_arc_clone_private.clone();
- let mul = multiplier.clone();
- let rs = run_symbol.clone();
- async move {
- // 使用克隆后的 Arc,避免 move 语义
- on_private_data(core_arc_cc, &mul, &rs, &data).await;
- }
- };
- // 链接
- let write_tx_am = Arc::new(Mutex::new(write_tx));
- ws.set_symbols(symbols);
- ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("mexc_usdt_swap 链接有异常");
- });
- }
- async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, run_symbol: &String, response: &ResponseData) {
- let mut trace_stack = TraceStack::new(response.time, response.ins);
- trace_stack.on_after_span_line();
- match response.channel.as_str() {
- "wallet" => {
- let account = ExchangeStructHandler::account_info_handle(MexcSwap, response, run_symbol);
- let mut core = core_arc_clone.lock().await;
- core.update_equity(account).await;
- }
- "order" => {
- let orders = ExchangeStructHandler::order_handle(MexcSwap, response, ct_val);
- trace_stack.on_after_format();
- let mut order_infos: Vec<OrderInfo> = Vec::new();
- for mut order in orders.order {
- if order.status == "NULL" {
- error!("mexc_usdt_swap 未识别的订单状态:{:?}", response);
- continue;
- }
- // if order.deal_amount != Decimal::ZERO {
- // info!("mexc order 消息原文:{:?}", response);
- // }
- let order_info = OrderInfo::parse_order_to_order_info(&mut order);
- order_infos.push(order_info);
- }
- let mut core = core_arc_clone.lock().await;
- core.update_order(order_infos, trace_stack).await;
- }
- "position" => {
- let mut positions = ExchangeStructHandler::position_handle(MexcSwap, response, ct_val);
- let mut core = core_arc_clone.lock().await;
- if positions.is_empty() {
- positions.push(Position {
- symbol: run_symbol.to_string(),
- margin_level: Default::default(),
- amount: Default::default(),
- frozen_amount: Default::default(),
- price: Default::default(),
- profit: Default::default(),
- position_mode: PositionModeEnum::Both,
- margin: Default::default(),
- });
- }
- core.update_position(positions).await;
- }
- _ => {
- error!("未知推送类型");
- error!(?response);
- }
- }
- }
- async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &ResponseData, depth_asks: &mut Vec<OrderBook>, depth_bids: &mut Vec<OrderBook>) {
- let mut trace_stack = TraceStack::new(response.time, response.ins);
- trace_stack.on_after_span_line();
- match response.channel.as_str() {
- "orderbook" => {
- trace_stack.set_source("mexc_usdt_swap.bookTicker".to_string());
- let mut is_update = false;
- if response.data_type == "delta" {
- is_update = true;
- }
- let mut depth = ExchangeStructHandler::book_ticker_handle(MexcSwap, &response, mul);
- // 是增量更新
- if is_update {
- if depth.asks.len() != 0 {
- depth_asks.clear();
- depth_asks.append(&mut depth.asks);
- } else if depth.bids.len() != 0 {
- depth_bids.clear();
- depth_bids.append(&mut depth.bids);
- }
- let result_depth = Depth {
- time: depth.time,
- symbol: depth.symbol,
- asks: depth_asks.clone(),
- bids: depth_bids.clone(),
- };
- trace_stack.on_after_format();
- on_depth(core_arc, &response.label, &mut trace_stack, &result_depth, 0).await;
- }
- // 全量
- else {
- trace_stack.on_after_format();
- on_depth(core_arc, &response.label, &mut trace_stack, &depth, 0).await;
- depth_asks.clear();
- depth_asks.append(&mut depth.asks);
- depth_bids.clear();
- depth_bids.append(&mut depth.bids);
- }
- }
- "trade" => {
- trace_stack.set_source("mexc_usdt_swap.trade".to_string());
- let mut trades = ExchangeStructHandler::trades_handle(MexcSwap, response, mul);
- trace_stack.on_after_format();
- for trade in trades.iter_mut() {
- let core_arc_clone = core_arc.clone();
- on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade, 0).await;
- }
- }
- "tickers" => {
- trace_stack.set_source("mexc_usdt_swap.tickers".to_string());
- let ticker = ExchangeStructHandler::ticker_handle(MexcSwap, response).await;
- trace_stack.on_after_format();
- on_ticker(core_arc, &mut trace_stack, &ticker).await;
- }
- // k线数据
- "kline" => {
- let mut records = ExchangeStructHandler::records_handle(MexcSwap, &response);
- if records.is_empty() {
- return;
- }
- for record in records.iter_mut() {
- let core_arc_clone = core_arc.clone();
- on_record(core_arc_clone, record).await
- }
- }
- _ => {
- error!("未知推送类型");
- error!(?response);
- }
- }
- }
- fn parse_btree_map_to_mexc_swap_login(exchange_params: BTreeMap<String, String>) -> MexcSwapLogin {
- MexcSwapLogin {
- access_key: exchange_params.get("access_key").unwrap().clone(),
- secret_key: exchange_params.get("secret_key").unwrap().clone(),
- pass_key: "".to_string(),
- }
- }
|