use std::collections::BTreeMap; use std::sync::Arc; use std::sync::atomic::AtomicBool; use futures_util::StreamExt; use rust_decimal::Decimal; use serde_json::Value; use tokio::spawn; use tokio::sync::Mutex; use tracing::info; use exchanges::gate_swap_rest::GateSwapRest; use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType}; use exchanges::response_base::ResponseData; use global::trace_stack::TraceStack; use standard::exchange::ExchangeEnum::GateSwap; use crate::model::{OrderInfo, OriginalTradeGa}; use crate::quant::Quant; use crate::exchange_disguise::on_special_depth; // 1交易、0参考 gate 合约 启动 pub async fn gate_swap_run(bool_v1: Arc, is_trade: bool, quant_arc: Arc>, name: String, symbols: Vec, is_colo: bool, exchange_params: BTreeMap) { let (write_tx, write_rx) = futures_channel::mpsc::unbounded(); let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded(); let mut gate_exc = GateSwapRest::new(is_colo, exchange_params.clone()); let mut user_id= "".to_string(); // 交易 if is_trade { // 获取user_id let res_data = gate_exc.wallet_fee().await; assert_eq!(res_data.code, "200", "获取gate交易所参数 user_id 失败, 启动失败!"); let wallet_obj :Value = serde_json::from_str(&res_data.data).unwrap(); info!(?wallet_obj); user_id = wallet_obj["user_id"].to_string(); } let write_tx_am = Arc::new(Mutex::new(write_tx)); let symbols_clone = symbols.clone(); spawn(async move { let mut ws; // 交易 if is_trade { let login_param = parse_btree_map_to_gate_swap_login(exchange_params); ws = GateSwapWs::new_label(name.clone(), is_colo, Some(login_param), GateSwapWsType::PublicAndPrivate("usdt".to_string())); ws.set_subscribe(vec![ // GateSwapSubscribeType::PuFuturesTrades, GateSwapSubscribeType::PuFuturesOrderBook, GateSwapSubscribeType::PrFuturesOrders(user_id.clone()), GateSwapSubscribeType::PrFuturesPositions(user_id.clone()), GateSwapSubscribeType::PrFuturesBalances(user_id.clone()), ]); } else { // 参考 ws = GateSwapWs::new_label(name.clone(), is_colo, None, GateSwapWsType::PublicAndPrivate("usdt".to_string())); ws.set_subscribe(vec![ GateSwapSubscribeType::PuFuturesTrades, GateSwapSubscribeType::PuFuturesOrderBook ]); } ws.set_symbols(symbols_clone); ws.ws_connect_async(bool_v1, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)"); }); spawn(async move { let bot_arc_clone = Arc::clone(&quant_arc); let run_symbol = symbols.clone()[0].clone(); // trade let mut update_flag_u = Decimal::ZERO; let mut max_buy = Decimal::ZERO; let mut min_sell = Decimal::ZERO; let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().amount_size; loop { while let Some(data) = read_rx.next().await { on_data(bot_arc_clone.clone(), &mut update_flag_u, multiplier, run_symbol.clone(), &mut max_buy, &mut min_sell, data).await; } } }); } async fn on_data(bot_arc_clone: Arc>, update_flag_u: &mut Decimal, multiplier: Decimal, run_symbol: String, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) { let mut trace_stack = TraceStack::default(); trace_stack.on_after_network(data.time); trace_stack.on_before_unlock_quant(); if data.code != "200".to_string() { return; } if data.channel == "futures.order_book" { trace_stack.on_before_format(); let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, data.clone()); trace_stack.on_before_network(depth.create_at.clone()); trace_stack.on_after_format(); on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, depth).await; } else if data.channel == "futures.balances" { let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, data, run_symbol.clone()); { let mut quant = bot_arc_clone.lock().await; quant.update_equity(account).await; } } else if data.channel == "futures.orders" { trace_stack.on_before_format(); let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, data.clone(), multiplier.clone()); trace_stack.on_after_format(); let mut order_infos:Vec = Vec::new(); for order in orders.order { let order_info = OrderInfo { symbol: "".to_string(), amount: order.amount.abs(), side: "".to_string(), price: order.price, client_id: order.custom_id, filled_price: order.avg_price, filled: order.deal_amount.abs(), order_id: order.id, local_time: 0, create_time: 0, status: order.status, fee: Default::default(), trace_stack: Default::default(), }; order_infos.push(order_info); } { let mut quant = bot_arc_clone.lock().await; quant.update_order(order_infos, trace_stack); } } else if data.channel == "futures.positions" { let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap,data, multiplier.clone()); { let mut quant = bot_arc_clone.lock().await; quant.update_position(positions).await; } } else if data.channel == "futures.trades" { let mut quant = bot_arc_clone.lock().await; let str = data.label.clone(); if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){ *max_buy = Decimal::ZERO; *min_sell = Decimal::ZERO; quant.is_update.remove(str.as_str()); } let trades: Vec = serde_json::from_str(data.data.as_str()).unwrap(); for trade in trades { if trade.price > *max_buy || *max_buy == Decimal::ZERO{ *max_buy = trade.price } if trade.price < *min_sell || *min_sell == Decimal::ZERO{ *min_sell = trade.price } } quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]); } } fn parse_btree_map_to_gate_swap_login(exchange_params: BTreeMap) -> GateSwapLogin { GateSwapLogin { api_key: exchange_params.get("access_key").unwrap().clone(), secret: exchange_params.get("secret_key").unwrap().clone() } }