use std::collections::BTreeMap; use std::sync::Arc; use std::sync::atomic::AtomicBool; use rust_decimal::Decimal; use tokio::sync::Mutex; use tokio_tungstenite::tungstenite::Message; use tracing::{error}; use exchanges::response_base::ResponseData; use global::trace_stack::{TraceStack}; use crate::core::Core; use exchanges::binance_swap_ws::{BinanceSwapLogin, BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType}; use standard::exchange::ExchangeEnum; use standard::exchange_struct_handler::ExchangeStructHandler; use crate::exchange_disguise::{on_depth, on_trade}; use crate::model::OrderInfo; // 参考 币安 合约 启动 pub(crate) async fn reference_binance_swap_run(is_shutdown_arc: Arc, core_arc: Arc>, name: String, symbols: Vec, is_colo: bool, ref_index: usize ) { tokio::spawn(async move { //创建读写通道 let (write_tx, write_rx) = futures_channel::mpsc::unbounded::(); let mut ws = BinanceSwapWs::new_label(name, is_colo, None, BinanceSwapWsType::Public).await; ws.set_subscribe(vec![ BinanceSwapSubscribeType::PuBookTicker, BinanceSwapSubscribeType::PuAggTrade ]); // 读取数据 let core_arc_clone = Arc::clone(&core_arc); let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().multiplier; let run_symbol = symbols.clone()[0].clone(); let fun = move |data: ResponseData| { // 在 async 块之前克隆 Arc let core_arc_cc = core_arc_clone.clone(); let mul = multiplier.clone(); let rs = run_symbol.clone(); async move { // 使用克隆后的 Arc,避免 move 语义 on_data(core_arc_cc, &mul, &rs, &data, ref_index).await } }; // 链接 let write_tx_am = Arc::new(Mutex::new(write_tx)); ws.set_symbols(symbols); ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败"); }); } // 启动binance交易ws pub(crate) async fn binance_swap_run(is_shutdown_arc: Arc, core_arc: Arc>, name: String, symbols: Vec, is_colo: bool, exchange_params: BTreeMap) { // 参考 reference_binance_swap_run(is_shutdown_arc.clone(), core_arc.clone(), name.clone(), symbols.clone(), is_colo, 233).await; tokio::spawn(async move { //创建读写通道 let (write_tx, write_rx) = futures_channel::mpsc::unbounded::(); let binance_login = parse_btree_map_to_binance_swap_login(exchange_params); let mut ws = BinanceSwapWs::new_label(name, is_colo, Option::from(binance_login), BinanceSwapWsType::Private).await; ws.set_subscribe(vec![ BinanceSwapSubscribeType::PrPosition, BinanceSwapSubscribeType::PrAccount, BinanceSwapSubscribeType::PrBalance, ]); // 读取数据 let core_arc_clone = Arc::clone(&core_arc); let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().multiplier; let run_symbol = symbols.clone()[0].clone(); let fun = move |data: ResponseData| { // 在 async 块之前克隆 Arc let core_arc_cc = core_arc_clone.clone(); let mul = multiplier.clone(); let rs = run_symbol.clone(); async move { // 使用克隆后的 Arc,避免 move 语义 on_data(core_arc_cc, &mul, &rs, &data, 233).await } }; // 链接 let write_tx_am = Arc::new(Mutex::new(write_tx)); ws.set_symbols(symbols); ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败"); }); } async fn on_data(core_arc: Arc>, multiplier: &Decimal, run_symbol: &String, response: &ResponseData, ref_index: usize) { let mut trace_stack = TraceStack::new(response.time, response.ins); trace_stack.on_after_span_line(); match response.channel.as_str() { "info" => { // info!("connected: {}", response.data.to_string()) } "aggTrade" => { trace_stack.set_source("binance_usdt_swap.aggTrade".to_string()); let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BinanceSwap, response, multiplier); trace_stack.on_after_format(); for trade in trades.iter_mut() { let core_arc_clone = core_arc.clone(); on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade, ref_index).await; } } "bookTicker" => { trace_stack.set_source("binance_usdt_swap.bookTicker".to_string()); // 将ticker数据转换为模拟深度 let depth = ExchangeStructHandler::book_ticker_handle(ExchangeEnum::BinanceSwap, response, multiplier); trace_stack.on_after_format(); on_depth(core_arc, &response.label, &mut trace_stack, &depth, ref_index).await; } "ACCOUNT_UPDATE" => { let account = ExchangeStructHandler::account_info_handle(ExchangeEnum::BinanceSwap, response, run_symbol); let positions = ExchangeStructHandler::position_handle(ExchangeEnum::BinanceSwap, response, multiplier); let mut core = core_arc.lock().await; core.update_position(positions).await; core.update_equity(account).await; } "ORDER_TRADE_UPDATE" => { trace_stack.set_source("binance_swap.ORDER_TRADE_UPDATE".to_string()); let orders = ExchangeStructHandler::order_handle(ExchangeEnum::BinanceSwap, response, &Decimal::ONE); let mut order_infos:Vec = Vec::new(); for mut order in orders.order { if order.status == "NULL" { error!("binance_usdt_swap 未识别的订单状态:{:?}", response); continue; } let order_info = OrderInfo::parse_order_to_order_info(&mut order); order_infos.push(order_info); } let mut core = core_arc.lock().await; core.update_order(order_infos, trace_stack).await; } _ => { error!("未知推送类型"); error!(?response); } } } fn parse_btree_map_to_binance_swap_login(exchange_params: BTreeMap) -> BinanceSwapLogin { BinanceSwapLogin { api_key: exchange_params.get("access_key").unwrap().clone(), api_secret: exchange_params.get("secret_key").unwrap().clone(), } }