use tracing::{error, info}; use std::collections::BTreeMap; use std::sync::Arc; use std::sync::atomic::AtomicBool; use rust_decimal::Decimal; use tokio::spawn; use tokio::sync::Mutex; use exchanges::gate_swap_rest::GateSwapRest; use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType}; use exchanges::response_base::ResponseData; use global::trace_stack::{TraceStack}; use standard::exchange::ExchangeEnum::{GateSwap}; use crate::model::{OrderInfo}; use crate::core::Core; use crate::exchange_disguise::on_special_depth; // 1交易、0参考 gate 合约 启动 pub async fn gate_swap_run(is_shutdown_arc: Arc, is_trade: bool, core_arc: Arc>, name: String, symbols: Vec, is_colo: bool, symbol_multiplier: Decimal, exchange_params: BTreeMap) { let (write_tx, write_rx) = futures_channel::mpsc::unbounded(); let mut gate_exc = GateSwapRest::new(is_colo, exchange_params.clone()); let mut user_id = "".to_string(); // 交易 if is_trade { // 获取user_id let res_data = gate_exc.wallet_fee().await; assert_eq!(res_data.code, 200, "获取gate交易所参数 user_id 失败, 启动失败!"); let wallet_obj = res_data.data; info!(?wallet_obj); user_id = wallet_obj["user_id"].to_string(); } let write_tx_am = Arc::new(Mutex::new(write_tx)); let symbols_clone = symbols.clone(); spawn(async move { let mut ws; // 交易 if is_trade { let login_param = parse_btree_map_to_gate_swap_login(exchange_params); ws = GateSwapWs::new_label(name.clone(), is_colo, Some(login_param), GateSwapWsType::PublicAndPrivate("usdt".to_string())); ws.set_subscribe(vec![ // GateSwapSubscribeType::PuFuturesTrades, GateSwapSubscribeType::PuFuturesBookTicker, GateSwapSubscribeType::PrFuturesOrders(user_id.clone()), GateSwapSubscribeType::PrFuturesPositions(user_id.clone()), GateSwapSubscribeType::PrFuturesBalances(user_id.clone()), ]); } else { // 参考 ws = GateSwapWs::new_label(name.clone(), is_colo, None, GateSwapWsType::PublicAndPrivate("usdt".to_string())); ws.set_subscribe(vec![ GateSwapSubscribeType::PuFuturesTrades, GateSwapSubscribeType::PuFuturesBookTicker, ]); } // 读取数据 let mut update_flag_u = Decimal::ZERO; let core_arc_clone = Arc::clone(&core_arc); let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().amount_size; let run_symbol = symbols.clone()[0].clone(); let fun = move |data: ResponseData| { let core_arc_cc = core_arc_clone.clone(); // 在 async 块之前克隆 Arc let mul = multiplier.clone(); let rs = run_symbol.clone(); async move { on_data(core_arc_cc, &mut update_flag_u, &mul, &rs, symbol_multiplier, data, ).await } }; // 建立链接 ws.set_symbols(symbols_clone); ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)"); }); } async fn on_data(core_arc_clone: Arc>, update_flag_u: &mut Decimal, multiplier: &Decimal, run_symbol: &String, symbol_multiplier: Decimal, response: ResponseData) { let mut trace_stack = TraceStack::new(response.time, response.ins); trace_stack.on_after_span_line(); match response.channel.as_str() { "futures.order_book" => { trace_stack.set_source("gate_usdt_swap.order_book".to_string()); let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, &response, symbol_multiplier); trace_stack.on_after_format(); info!("gate depth推送: {:?}", special_depth); on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await; } "futures.book_ticker" => { trace_stack.set_source("gate_usdt_swap.book_ticker".to_string()); // 将ticker数据转换为模拟深度 let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(GateSwap, &response, symbol_multiplier); trace_stack.on_after_format(); info!("gate ticker推送: {:?}", special_depth); on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await; } "futures.balances" => { let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, &response, run_symbol); let mut core = core_arc_clone.lock().await; core.update_equity(account).await; } "futures.orders" => { trace_stack.set_source("gate_swap.orders".to_string()); let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, response.clone(), multiplier.clone()); let mut order_infos: Vec = Vec::new(); for mut order in orders.order { if order.status == "NULL" { error!("gate_usdt_swap 未识别的订单状态:{:?}", response); continue; } let order_info = OrderInfo::parse_order_to_order_info(&mut order); order_infos.push(order_info); } { let mut core = core_arc_clone.lock().await; core.update_order(order_infos, trace_stack).await; } } "futures.positions" => { let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap, &response, multiplier); let mut core = core_arc_clone.lock().await; core.update_position(positions).await; } "futures.trades" => { // let mut core = core_arc_clone.lock().await; // let str = data.label.clone(); // if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap(){ // *max_buy = Decimal::ZERO; // *min_sell = Decimal::ZERO; // core.is_update.remove(str.as_str()); // } // let trades: Vec = serde_json::from_str(data.data.as_str()).unwrap(); // for trade in trades { // if trade.price > *max_buy || *max_buy == Decimal::ZERO{ // *max_buy = trade.price // } // if trade.price < *min_sell || *min_sell == Decimal::ZERO{ // *min_sell = trade.price // } // } // core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]); } _ => { error!("未知推送类型"); error!(?response); } } } fn parse_btree_map_to_gate_swap_login(exchange_params: BTreeMap) -> GateSwapLogin { GateSwapLogin { api_key: exchange_params.get("access_key").unwrap().clone(), secret: exchange_params.get("secret_key").unwrap().clone(), } }