use std::collections::HashMap;

use anyhow::Result;
use rust_decimal::Decimal;
use serde_json::Value;

/// Container for four maps: caller-supplied exchange information plus kline, ask and
/// bid storage that starts out empty.
///
/// NOTE(review): the generic arguments of every `HashMap` below appear to have been
/// stripped from this file (a bare `HashMap`, or `HashMap` followed by a stray `>`),
/// so the key/value types cannot be determined from here. The type tokens are kept
/// exactly as found; restore the real type parameters before building.
pub struct DataManager {
    /// Exchange information handed to [`DataManager::new`] by the caller.
    pub exchange_info_map: HashMap,
    /// Kline storage; created empty by [`DataManager::new`].
    pub klines_map: HashMap>,
    /// Ask-side storage; created empty by [`DataManager::new`].
    pub asks_map: HashMap>,
    /// Bid-side storage; created empty by [`DataManager::new`].
    pub bids_map: HashMap>,
}

impl DataManager {
    /// Creates a manager that takes ownership of `exchange_info_map` and starts with
    /// empty kline, ask and bid maps.
    pub fn new(exchange_info_map: HashMap) -> Self {
        // The element types of the empty maps are inferred from the field declarations.
        Self {
            exchange_info_map,
            klines_map: HashMap::new(),
            asks_map: HashMap::new(),
            bids_map: HashMap::new(),
        }
    }

    /// Stub for kline processing.
    ///
    /// Associated function (no `self` receiver). Both arguments are currently unused
    /// and the future always resolves to `Ok(())`.
    ///
    /// NOTE(review): the payload parameter is named `depth` here as well, which is
    /// possibly a copy/paste leftover; confirm the intended name.
    pub async fn process_klines_map(symbol: String, depth: Value) -> Result<()> {
        Ok(())
    }

    /// Stub for order-book depth processing.
    ///
    /// Associated function (no `self` receiver). Both arguments are currently unused
    /// and the future always resolves to `Ok(())`.
    pub async fn process_depth_data(symbol: String, depth: Value) -> Result<()> {
        Ok(())
    }
}

// Commented-out code preserved verbatim below; it continues past the end of this line.
// use std::collections::BTreeMap; // use std::sync::Arc; // use std::sync::atomic::{AtomicBool}; // use rust_decimal::Decimal; // use tokio::{spawn}; // use tokio::sync::Mutex; // use tracing::{error}; // use tokio_tungstenite::tungstenite::Message; // use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType}; // use exchanges::response_base::ResponseData; // use global::trace_stack::TraceStack; // use standard::exchange::ExchangeEnum::BybitSwap; // use standard::exchange_struct_handler::ExchangeStructHandler; // use standard::{Depth, OrderBook}; // use crate::core::Core; // use crate::exchange_disguise::{on_depth, on_record, on_ticker, on_trade}; // use crate::model::OrderInfo; // // // 参考 Bybit 合约 启动 // pub(crate) async fn reference_bybit_swap_run(is_shutdown_arc: Arc, // core_arc: Arc>, // name: String, // symbols: Vec, // is_colo: bool, // ref_index: usize // ) { // spawn(async move { // //创建读写通道 // let (write_tx, write_rx) = futures_channel::mpsc::unbounded::(); // let mut ws = BybitSwapWs::new_label(name, is_colo, None, BybitSwapWsType::Public); // ws.set_subscribe(vec![ // BybitSwapSubscribeType::PuTrade, // BybitSwapSubscribeType::PuOrderBook1, // // BybitSwapSubscribeType::PuKline("1".to_string()), // // BybitSwapSubscribeType::PuTickers // ]); // // //
// 读取数据 // let core_arc_clone = Arc::clone(&core_arc); // let mut rest = core_arc_clone.lock().await.platform_rest.clone_box(); // let multiplier = rest.get_self_market().multiplier; // let mut records = rest.get_record("1".to_string()).await.unwrap(); // for record in records.iter_mut() { // let core_arc_clone = core_arc.clone(); // // on_record(core_arc_clone, record).await // } // // let depth_asks = Arc::new(Mutex::new(Vec::new())); // let depth_bids = Arc::new(Mutex::new(Vec::new())); // // let fun = move |data: ResponseData| { // // 在 async 块之前克隆 Arc // let core_arc_cc = core_arc_clone.clone(); // let mul = multiplier.clone(); // // let depth_asks = Arc::clone(&depth_asks); // let depth_bids = Arc::clone(&depth_bids); // // async move { // let mut depth_asks = depth_asks.lock().await; // let mut depth_bids = depth_bids.lock().await; // // 使用克隆后的 Arc,避免 move 语义 // on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids, ref_index).await // } // }; // // // 链接 // let write_tx_am = Arc::new(Mutex::new(write_tx)); // ws.set_symbols(symbols); // ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败"); // }); // } // // // 交易 bybit 合约 启动 // pub(crate) async fn bybit_swap_run(is_shutdown_arc: Arc, // core_arc: Arc>, // name: String, // symbols: Vec, // is_colo: bool, // exchange_params: BTreeMap) { // // 参考 // reference_bybit_swap_run(is_shutdown_arc.clone(), core_arc.clone(), name.clone(), symbols.clone(), is_colo, 233).await; // // // 交易 // spawn(async move { // // 交易交易所需要启动私有ws // let (write_tx, write_rx) = futures_channel::mpsc::unbounded(); // let auth = Some(parse_btree_map_to_bybit_swap_login(exchange_params)); // let mut ws = BybitSwapWs::new_label(name.clone(), is_colo, auth, BybitSwapWsType::Private); // ws.set_subscribe(vec![ // BybitSwapSubscribeType::PrPosition, // BybitSwapSubscribeType::PrOrder, // BybitSwapSubscribeType::PrWallet // ]); // // let core_arc_clone_private = core_arc.clone(); // let
// multiplier = core_arc_clone_private.lock().await.platform_rest.get_self_market().multiplier; // let run_symbol = symbols.clone()[0].clone(); // // // 挂起私有ws // let fun = move |data: ResponseData| { // // 在 async 块之前克隆 Arc // let core_arc_cc = core_arc_clone_private.clone(); // let mul = multiplier.clone(); // let rs = run_symbol.clone(); // // async move { // // 使用克隆后的 Arc,避免 move 语义 // on_private_data(core_arc_cc, &mul, &rs, &data).await; // } // }; // // // 链接 // let write_tx_am = Arc::new(Mutex::new(write_tx)); // ws.set_symbols(symbols); // ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败"); // }); // } // // async fn on_public_data(core_arc: Arc>, mul: &Decimal, response: &ResponseData, depth_asks: &mut Vec, depth_bids: &mut Vec, ref_index: usize) { // let mut trace_stack = TraceStack::new(response.time, response.ins); // trace_stack.on_after_span_line(); // // match response.channel.as_str() { // "orderbook" => { // trace_stack.set_source("bybit_usdt_swap.bookTicker".to_string()); // // let mut is_update = false; // if response.data_type == "delta" { // is_update = true; // } // let mut depth = ExchangeStructHandler::book_ticker_handle(BybitSwap, &response, mul); // // 是增量更新 // if is_update { // if depth.asks.len() != 0 { // depth_asks.clear(); // depth_asks.append(&mut depth.asks); // } // // if depth.bids.len() != 0 { // depth_bids.clear(); // depth_bids.append(&mut depth.bids); // } // // let result_depth = Depth { // time: depth.time, // symbol: depth.symbol, // asks: depth_asks.clone(), // bids: depth_bids.clone(), // }; // // trace_stack.on_after_format(); // on_depth(core_arc.clone(), &response.label, &mut trace_stack, &result_depth, ref_index).await; // // on_depth(core_arc, &response.label, &mut trace_stack, &result_depth, 1).await; // } // // 全量 // else { // trace_stack.on_after_format(); // on_depth(core_arc.clone(), &response.label, &mut trace_stack, &depth, ref_index).await; // // on_depth(core_arc,
// &response.label, &mut trace_stack, &depth, 1).await; // // depth_asks.clear(); // depth_asks.append(&mut depth.asks); // depth_bids.clear(); // depth_bids.append(&mut depth.bids); // } // } // "trade" => { // trace_stack.set_source("bybit_usdt_swap.trade".to_string()); // // let mut trades = ExchangeStructHandler::trades_handle(BybitSwap, response, mul); // trace_stack.on_after_format(); // // for trade in trades.iter_mut() { // on_trade(core_arc.clone(), &response.label, &mut trace_stack, &trade, ref_index).await; // // on_trade(core_arc.clone(), &response.label, &mut trace_stack, &trade, 1).await; // } // } // "tickers" => { // trace_stack.set_source("bybit_usdt_swap.tickers".to_string()); // let ticker = ExchangeStructHandler::ticker_handle(BybitSwap, response).await; // trace_stack.on_after_format(); // // on_ticker(core_arc, &mut trace_stack, &ticker).await; // }, // // k线数据 // "kline" => { // let mut records = ExchangeStructHandler::records_handle(BybitSwap, &response); // // if records.is_empty() { // return; // } // // for record in records.iter_mut() { // let core_arc_clone = core_arc.clone(); // // on_record(core_arc_clone, record).await // } // }, // _ => { // error!("未知推送类型"); // error!(?response); // } // } // } // // async fn on_private_data(core_arc_clone: Arc>, ct_val: &Decimal, run_symbol: &String, response: &ResponseData) { // let mut trace_stack = TraceStack::new(response.time, response.ins); // trace_stack.on_after_span_line(); // // match response.channel.as_str() { // "wallet" => { // let account = ExchangeStructHandler::account_info_handle(BybitSwap, response, run_symbol); // let mut core = core_arc_clone.lock().await; // core.update_equity(account).await; // } // "order" => { // let orders = ExchangeStructHandler::order_handle(BybitSwap, response, ct_val); // trace_stack.on_after_format(); // // let mut order_infos:Vec = Vec::new(); // for mut order in orders.order { // if order.status == "NULL" { // error!("bybit_usdt_swap 未识别的订单状态:{:?}",
// response); // // continue; // } // // // if order.deal_amount != Decimal::ZERO { // // info!("bybit order 消息原文:{:?}", response); // // } // // let order_info = OrderInfo::parse_order_to_order_info(&mut order); // order_infos.push(order_info); // } // // let mut core = core_arc_clone.lock().await; // core.update_order(order_infos, trace_stack).await; // } // "position" => { // let positions = ExchangeStructHandler::position_handle(BybitSwap, response, ct_val); // let mut core = core_arc_clone.lock().await; // core.update_position(positions).await; // } // _ => { // error!("未知推送类型"); // error!(?response); // } // } // } // // fn parse_btree_map_to_bybit_swap_login(exchange_params: BTreeMap) -> BybitSwapLogin { // BybitSwapLogin { // api_key: exchange_params.get("access_key").unwrap().clone(), // secret_key: exchange_params.get("secret_key").unwrap().clone(), // } // }