| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- use std::collections::HashMap;
- use rust_decimal::Decimal;
- use serde_json::Value;
- use anyhow::Result;
- pub struct DataManager {
- pub exchange_info_map: HashMap<String, Value>,
- pub klines_map: HashMap<String, Vec<Value>>,
- pub asks_map: HashMap<String, HashMap<Decimal, Decimal>>,
- pub bids_map: HashMap<String, HashMap<Decimal, Decimal>>,
- }
- impl DataManager {
- pub fn new(exchange_info_map: HashMap<String, Value>) -> Self {
- let klines_map: HashMap<String, Vec<Value>> = HashMap::new();
- let asks_map: HashMap<String, HashMap<Decimal, Decimal>> = HashMap::new();
- let bids_map: HashMap<String, HashMap<Decimal, Decimal>> = HashMap::new();
-
- DataManager {
- exchange_info_map,
- klines_map,
- asks_map,
- bids_map,
- }
- }
-
- pub async fn process_klines_map(symbol: String, depth: Value) -> Result<()> {
- Ok(())
- }
-
- pub async fn process_depth_data(symbol: String, depth: Value) -> Result<()> {
- Ok(())
- }
- }
- // use std::collections::BTreeMap;
- // use std::sync::Arc;
- // use std::sync::atomic::{AtomicBool};
- // use rust_decimal::Decimal;
- // use tokio::{spawn};
- // use tokio::sync::Mutex;
- // use tracing::{error};
- // use tokio_tungstenite::tungstenite::Message;
- // use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
- // use exchanges::response_base::ResponseData;
- // use global::trace_stack::TraceStack;
- // use standard::exchange::ExchangeEnum::BybitSwap;
- // use standard::exchange_struct_handler::ExchangeStructHandler;
- // use standard::{Depth, OrderBook};
- // use crate::core::Core;
- // use crate::exchange_disguise::{on_depth, on_record, on_ticker, on_trade};
- // use crate::model::OrderInfo;
- //
- // // 参考 Bybit 合约 启动
- // pub(crate) async fn reference_bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
- // core_arc: Arc<Mutex<Core>>,
- // name: String,
- // symbols: Vec<String>,
- // is_colo: bool,
- // ref_index: usize
- // ) {
- // spawn(async move {
- // //创建读写通道
- // let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
- // let mut ws = BybitSwapWs::new_label(name, is_colo, None, BybitSwapWsType::Public);
- // ws.set_subscribe(vec![
- // BybitSwapSubscribeType::PuTrade,
- // BybitSwapSubscribeType::PuOrderBook1,
- // // BybitSwapSubscribeType::PuKline("1".to_string()),
- // // BybitSwapSubscribeType::PuTickers
- // ]);
- //
- // // 读取数据
- // let core_arc_clone = Arc::clone(&core_arc);
- // let mut rest = core_arc_clone.lock().await.platform_rest.clone_box();
- // let multiplier = rest.get_self_market().multiplier;
- // let mut records = rest.get_record("1".to_string()).await.unwrap();
- // for record in records.iter_mut() {
- // let core_arc_clone = core_arc.clone();
- //
- // on_record(core_arc_clone, record).await
- // }
- //
- // let depth_asks = Arc::new(Mutex::new(Vec::new()));
- // let depth_bids = Arc::new(Mutex::new(Vec::new()));
- //
- // let fun = move |data: ResponseData| {
- // // 在 async 块之前克隆 Arc
- // let core_arc_cc = core_arc_clone.clone();
- // let mul = multiplier.clone();
- //
- // let depth_asks = Arc::clone(&depth_asks);
- // let depth_bids = Arc::clone(&depth_bids);
- //
- // async move {
- // let mut depth_asks = depth_asks.lock().await;
- // let mut depth_bids = depth_bids.lock().await;
- // // 使用克隆后的 Arc,避免 move 语义
- // on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids, ref_index).await
- // }
- // };
- //
- // // 链接
- // let write_tx_am = Arc::new(Mutex::new(write_tx));
- // ws.set_symbols(symbols);
- // ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
- // });
- // }
- //
- // // 交易 bybit 合约 启动
- // pub(crate) async fn bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
- // core_arc: Arc<Mutex<Core>>,
- // name: String,
- // symbols: Vec<String>,
- // is_colo: bool,
- // exchange_params: BTreeMap<String, String>) {
- // // 参考
- // reference_bybit_swap_run(is_shutdown_arc.clone(), core_arc.clone(), name.clone(), symbols.clone(), is_colo, 233).await;
- //
- // // 交易
- // spawn(async move {
- // // 交易交易所需要启动私有ws
- // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
- // let auth = Some(parse_btree_map_to_bybit_swap_login(exchange_params));
- // let mut ws = BybitSwapWs::new_label(name.clone(), is_colo, auth, BybitSwapWsType::Private);
- // ws.set_subscribe(vec![
- // BybitSwapSubscribeType::PrPosition,
- // BybitSwapSubscribeType::PrOrder,
- // BybitSwapSubscribeType::PrWallet
- // ]);
- //
- // let core_arc_clone_private = core_arc.clone();
- // let multiplier = core_arc_clone_private.lock().await.platform_rest.get_self_market().multiplier;
- // let run_symbol = symbols.clone()[0].clone();
- //
- // // 挂起私有ws
- // let fun = move |data: ResponseData| {
- // // 在 async 块之前克隆 Arc
- // let core_arc_cc = core_arc_clone_private.clone();
- // let mul = multiplier.clone();
- // let rs = run_symbol.clone();
- //
- // async move {
- // // 使用克隆后的 Arc,避免 move 语义
- // on_private_data(core_arc_cc, &mul, &rs, &data).await;
- // }
- // };
- //
- // // 链接
- // let write_tx_am = Arc::new(Mutex::new(write_tx));
- // ws.set_symbols(symbols);
- // ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
- // });
- // }
- //
- // async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &ResponseData, depth_asks: &mut Vec<OrderBook>, depth_bids: &mut Vec<OrderBook>, ref_index: usize) {
- // let mut trace_stack = TraceStack::new(response.time, response.ins);
- // trace_stack.on_after_span_line();
- //
- // match response.channel.as_str() {
- // "orderbook" => {
- // trace_stack.set_source("bybit_usdt_swap.bookTicker".to_string());
- //
- // let mut is_update = false;
- // if response.data_type == "delta" {
- // is_update = true;
- // }
- // let mut depth = ExchangeStructHandler::book_ticker_handle(BybitSwap, &response, mul);
- // // 是增量更新
- // if is_update {
- // if depth.asks.len() != 0 {
- // depth_asks.clear();
- // depth_asks.append(&mut depth.asks);
- // }
- //
- // if depth.bids.len() != 0 {
- // depth_bids.clear();
- // depth_bids.append(&mut depth.bids);
- // }
- //
- // let result_depth = Depth {
- // time: depth.time,
- // symbol: depth.symbol,
- // asks: depth_asks.clone(),
- // bids: depth_bids.clone(),
- // };
- //
- // trace_stack.on_after_format();
- // on_depth(core_arc.clone(), &response.label, &mut trace_stack, &result_depth, ref_index).await;
- // // on_depth(core_arc, &response.label, &mut trace_stack, &result_depth, 1).await;
- // }
- // // 全量
- // else {
- // trace_stack.on_after_format();
- // on_depth(core_arc.clone(), &response.label, &mut trace_stack, &depth, ref_index).await;
- // // on_depth(core_arc, &response.label, &mut trace_stack, &depth, 1).await;
- //
- // depth_asks.clear();
- // depth_asks.append(&mut depth.asks);
- // depth_bids.clear();
- // depth_bids.append(&mut depth.bids);
- // }
- // }
- // "trade" => {
- // trace_stack.set_source("bybit_usdt_swap.trade".to_string());
- //
- // let mut trades = ExchangeStructHandler::trades_handle(BybitSwap, response, mul);
- // trace_stack.on_after_format();
- //
- // for trade in trades.iter_mut() {
- // on_trade(core_arc.clone(), &response.label, &mut trace_stack, &trade, ref_index).await;
- // // on_trade(core_arc.clone(), &response.label, &mut trace_stack, &trade, 1).await;
- // }
- // }
- // "tickers" => {
- // trace_stack.set_source("bybit_usdt_swap.tickers".to_string());
- // let ticker = ExchangeStructHandler::ticker_handle(BybitSwap, response).await;
- // trace_stack.on_after_format();
- //
- // on_ticker(core_arc, &mut trace_stack, &ticker).await;
- // },
- // // k线数据
- // "kline" => {
- // let mut records = ExchangeStructHandler::records_handle(BybitSwap, &response);
- //
- // if records.is_empty() {
- // return;
- // }
- //
- // for record in records.iter_mut() {
- // let core_arc_clone = core_arc.clone();
- //
- // on_record(core_arc_clone, record).await
- // }
- // },
- // _ => {
- // error!("未知推送类型");
- // error!(?response);
- // }
- // }
- // }
- //
- // async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, run_symbol: &String, response: &ResponseData) {
- // let mut trace_stack = TraceStack::new(response.time, response.ins);
- // trace_stack.on_after_span_line();
- //
- // match response.channel.as_str() {
- // "wallet" => {
- // let account = ExchangeStructHandler::account_info_handle(BybitSwap, response, run_symbol);
- // let mut core = core_arc_clone.lock().await;
- // core.update_equity(account).await;
- // }
- // "order" => {
- // let orders = ExchangeStructHandler::order_handle(BybitSwap, response, ct_val);
- // trace_stack.on_after_format();
- //
- // let mut order_infos:Vec<OrderInfo> = Vec::new();
- // for mut order in orders.order {
- // if order.status == "NULL" {
- // error!("bybit_usdt_swap 未识别的订单状态:{:?}", response);
- //
- // continue;
- // }
- //
- // // if order.deal_amount != Decimal::ZERO {
- // // info!("bybit order 消息原文:{:?}", response);
- // // }
- //
- // let order_info = OrderInfo::parse_order_to_order_info(&mut order);
- // order_infos.push(order_info);
- // }
- //
- // let mut core = core_arc_clone.lock().await;
- // core.update_order(order_infos, trace_stack).await;
- // }
- // "position" => {
- // let positions = ExchangeStructHandler::position_handle(BybitSwap, response, ct_val);
- // let mut core = core_arc_clone.lock().await;
- // core.update_position(positions).await;
- // }
- // _ => {
- // error!("未知推送类型");
- // error!(?response);
- // }
- // }
- // }
- //
- // fn parse_btree_map_to_bybit_swap_login(exchange_params: BTreeMap<String, String>) -> BybitSwapLogin {
- // BybitSwapLogin {
- // api_key: exchange_params.get("access_key").unwrap().clone(),
- // secret_key: exchange_params.get("secret_key").unwrap().clone(),
- // }
- // }
|