| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- use std::collections::BTreeMap;
- use std::sync::Arc;
- use std::sync::atomic::AtomicBool;
- use rust_decimal::Decimal;
- use tokio::sync::Mutex;
- use tokio_tungstenite::tungstenite::Message;
- use tracing::{error};
- use exchanges::response_base::ResponseData;
- use global::trace_stack::{TraceStack};
- use crate::core::Core;
- use exchanges::binance_swap_ws::{BinanceSwapLogin, BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
- use standard::exchange::ExchangeEnum;
- use standard::exchange_struct_handler::ExchangeStructHandler;
- use crate::exchange_disguise::{on_depth, on_trade};
- use crate::model::OrderInfo;
- // 参考 币安 合约 启动
- pub(crate) async fn reference_binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
- core_arc: Arc<Mutex<Core>>,
- name: String,
- symbols: Vec<String>,
- is_colo: bool) {
- tokio::spawn(async move {
- //创建读写通道
- let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
- let mut ws = BinanceSwapWs::new_label(name, is_colo, None, BinanceSwapWsType::Public).await;
- ws.set_subscribe(vec![
- BinanceSwapSubscribeType::PuBookTicker,
- BinanceSwapSubscribeType::PuAggTrade
- ]);
- // 读取数据
- let core_arc_clone = Arc::clone(&core_arc);
- let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().multiplier;
- let run_symbol = symbols.clone()[0].clone();
- let fun = move |data: ResponseData| {
- // 在 async 块之前克隆 Arc
- let core_arc_cc = core_arc_clone.clone();
- let mul = multiplier.clone();
- let rs = run_symbol.clone();
- async move {
- // 使用克隆后的 Arc,避免 move 语义
- on_data(core_arc_cc, &mul, &rs, &data).await
- }
- };
- // 链接
- let write_tx_am = Arc::new(Mutex::new(write_tx));
- ws.set_symbols(symbols);
- ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
- });
- }
- // 启动binance交易ws
- pub(crate) async fn binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
- core_arc: Arc<Mutex<Core>>,
- name: String,
- symbols: Vec<String>,
- is_colo: bool,
- exchange_params: BTreeMap<String, String>) {
- tokio::spawn(async move {
- //创建读写通道
- let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
- let binance_login = parse_btree_map_to_binance_swap_login(exchange_params);
- let mut ws = BinanceSwapWs::new_label(name, is_colo, Option::from(binance_login), BinanceSwapWsType::Private).await;
- ws.set_subscribe(vec![
- BinanceSwapSubscribeType::PrPosition,
- BinanceSwapSubscribeType::PrAccount,
- BinanceSwapSubscribeType::PrBalance
- ]);
- // 读取数据
- let core_arc_clone = Arc::clone(&core_arc);
- let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().multiplier;
- let run_symbol = symbols.clone()[0].clone();
- let fun = move |data: ResponseData| {
- // 在 async 块之前克隆 Arc
- let core_arc_cc = core_arc_clone.clone();
- let mul = multiplier.clone();
- let rs = run_symbol.clone();
- async move {
- // 使用克隆后的 Arc,避免 move 语义
- on_data(core_arc_cc, &mul, &rs, &data).await
- }
- };
- // 链接
- let write_tx_am = Arc::new(Mutex::new(write_tx));
- ws.set_symbols(symbols);
- ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
- });
- }
- async fn on_data(core_arc: Arc<Mutex<Core>>, multiplier: &Decimal, run_symbol: &String, response: &ResponseData) {
- let mut trace_stack = TraceStack::new(response.time, response.ins);
- trace_stack.on_after_span_line();
- match response.channel.as_str() {
- "info" => {
- // info!("connected: {}", response.data.to_string())
- }
- "aggTrade" => {
- trace_stack.set_source("binance_usdt_swap.aggTrade".to_string());
- let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BinanceSwap, response, multiplier);
- trace_stack.on_after_format();
- // for trade in trades.iter_mut() {
- let core_arc_clone = core_arc.clone();
- on_trade(core_arc_clone, &response.label, &mut trace_stack, trades).await;
- // }
- }
- "bookTicker" => {
- trace_stack.set_source("binance_usdt_swap.bookTicker".to_string());
- // 将ticker数据转换为模拟深度
- let depth = ExchangeStructHandler::book_ticker_handle(ExchangeEnum::BinanceSwap, response, multiplier);
- trace_stack.on_after_format();
- on_depth(core_arc, &response.label, &mut trace_stack, &depth).await;
- }
- "ACCOUNT_UPDATE" => {
- let account = ExchangeStructHandler::account_info_handle(ExchangeEnum::BinanceSwap, response, run_symbol);
- let positions = ExchangeStructHandler::position_handle(ExchangeEnum::BinanceSwap, response, multiplier);
- let mut core = core_arc.lock().await;
- core.update_position(positions).await;
- core.update_equity(account).await;
- }
- "ORDER_TRADE_UPDATE" => {
- trace_stack.set_source("binance_swap.ORDER_TRADE_UPDATE".to_string());
- let orders = ExchangeStructHandler::order_handle(ExchangeEnum::BinanceSwap, response, &Decimal::ONE);
- let mut order_infos:Vec<OrderInfo> = Vec::new();
- for mut order in orders.order {
- if order.status == "NULL" {
- error!("binance_usdt_swap 未识别的订单状态:{:?}", response);
- continue;
- }
- let order_info = OrderInfo::parse_order_to_order_info(&mut order);
- order_infos.push(order_info);
- }
- let mut core = core_arc.lock().await;
- core.update_order(order_infos, trace_stack).await;
- }
- _ => {
- error!("未知推送类型");
- error!(?response);
- }
- }
- }
- fn parse_btree_map_to_binance_swap_login(exchange_params: BTreeMap<String, String>) -> BinanceSwapLogin {
- BinanceSwapLogin {
- api_key: exchange_params.get("access_key").unwrap().clone(),
- api_secret: exchange_params.get("secret_key").unwrap().clone(),
- }
- }
|