data_manager.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. use std::collections::HashMap;
  2. use rust_decimal::Decimal;
  3. use serde_json::Value;
  4. use anyhow::Result;
  5. pub struct DataManager {
  6. pub exchange_info_map: HashMap<String, Value>,
  7. pub klines_map: HashMap<String, Vec<Value>>,
  8. pub asks_map: HashMap<String, HashMap<Decimal, Decimal>>,
  9. pub bids_map: HashMap<String, HashMap<Decimal, Decimal>>,
  10. }
  11. impl DataManager {
  12. pub fn new(exchange_info_map: HashMap<String, Value>) -> Self {
  13. let klines_map: HashMap<String, Vec<Value>> = HashMap::new();
  14. let asks_map: HashMap<String, HashMap<Decimal, Decimal>> = HashMap::new();
  15. let bids_map: HashMap<String, HashMap<Decimal, Decimal>> = HashMap::new();
  16. DataManager {
  17. exchange_info_map,
  18. klines_map,
  19. asks_map,
  20. bids_map,
  21. }
  22. }
  23. pub async fn process_klines_map(symbol: String, depth: Value) -> Result<()> {
  24. Ok(())
  25. }
  26. pub async fn process_depth_data(symbol: String, depth: Value) -> Result<()> {
  27. Ok(())
  28. }
  29. }
  30. // use std::collections::BTreeMap;
  31. // use std::sync::Arc;
  32. // use std::sync::atomic::{AtomicBool};
  33. // use rust_decimal::Decimal;
  34. // use tokio::{spawn};
  35. // use tokio::sync::Mutex;
  36. // use tracing::{error};
  37. // use tokio_tungstenite::tungstenite::Message;
  38. // use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
  39. // use exchanges::response_base::ResponseData;
  40. // use global::trace_stack::TraceStack;
  41. // use standard::exchange::ExchangeEnum::BybitSwap;
  42. // use standard::exchange_struct_handler::ExchangeStructHandler;
  43. // use standard::{Depth, OrderBook};
  44. // use crate::core::Core;
  45. // use crate::exchange_disguise::{on_depth, on_record, on_ticker, on_trade};
  46. // use crate::model::OrderInfo;
  47. //
  48. // // 参考 Bybit 合约 启动
  49. // pub(crate) async fn reference_bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
  50. // core_arc: Arc<Mutex<Core>>,
  51. // name: String,
  52. // symbols: Vec<String>,
  53. // is_colo: bool,
  54. // ref_index: usize
  55. // ) {
  56. // spawn(async move {
  57. // //创建读写通道
  58. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
  59. // let mut ws = BybitSwapWs::new_label(name, is_colo, None, BybitSwapWsType::Public);
  60. // ws.set_subscribe(vec![
  61. // BybitSwapSubscribeType::PuTrade,
  62. // BybitSwapSubscribeType::PuOrderBook1,
  63. // // BybitSwapSubscribeType::PuKline("1".to_string()),
  64. // // BybitSwapSubscribeType::PuTickers
  65. // ]);
  66. //
  67. // // 读取数据
  68. // let core_arc_clone = Arc::clone(&core_arc);
  69. // let mut rest = core_arc_clone.lock().await.platform_rest.clone_box();
  70. // let multiplier = rest.get_self_market().multiplier;
  71. // let mut records = rest.get_record("1".to_string()).await.unwrap();
  72. // for record in records.iter_mut() {
  73. // let core_arc_clone = core_arc.clone();
  74. //
  75. // on_record(core_arc_clone, record).await
  76. // }
  77. //
  78. // let depth_asks = Arc::new(Mutex::new(Vec::new()));
  79. // let depth_bids = Arc::new(Mutex::new(Vec::new()));
  80. //
  81. // let fun = move |data: ResponseData| {
  82. // // 在 async 块之前克隆 Arc
  83. // let core_arc_cc = core_arc_clone.clone();
  84. // let mul = multiplier.clone();
  85. //
  86. // let depth_asks = Arc::clone(&depth_asks);
  87. // let depth_bids = Arc::clone(&depth_bids);
  88. //
  89. // async move {
  90. // let mut depth_asks = depth_asks.lock().await;
  91. // let mut depth_bids = depth_bids.lock().await;
  92. // // 使用克隆后的 Arc,避免 move 语义
  93. // on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids, ref_index).await
  94. // }
  95. // };
  96. //
  97. // // 链接
  98. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  99. // ws.set_symbols(symbols);
  100. // ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
  101. // });
  102. // }
  103. //
  104. // // 交易 bybit 合约 启动
  105. // pub(crate) async fn bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
  106. // core_arc: Arc<Mutex<Core>>,
  107. // name: String,
  108. // symbols: Vec<String>,
  109. // is_colo: bool,
  110. // exchange_params: BTreeMap<String, String>) {
  111. // // 参考
  112. // reference_bybit_swap_run(is_shutdown_arc.clone(), core_arc.clone(), name.clone(), symbols.clone(), is_colo, 233).await;
  113. //
  114. // // 交易
  115. // spawn(async move {
  116. // // 交易交易所需要启动私有ws
  117. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  118. // let auth = Some(parse_btree_map_to_bybit_swap_login(exchange_params));
  119. // let mut ws = BybitSwapWs::new_label(name.clone(), is_colo, auth, BybitSwapWsType::Private);
  120. // ws.set_subscribe(vec![
  121. // BybitSwapSubscribeType::PrPosition,
  122. // BybitSwapSubscribeType::PrOrder,
  123. // BybitSwapSubscribeType::PrWallet
  124. // ]);
  125. //
  126. // let core_arc_clone_private = core_arc.clone();
  127. // let multiplier = core_arc_clone_private.lock().await.platform_rest.get_self_market().multiplier;
  128. // let run_symbol = symbols.clone()[0].clone();
  129. //
  130. // // 挂起私有ws
  131. // let fun = move |data: ResponseData| {
  132. // // 在 async 块之前克隆 Arc
  133. // let core_arc_cc = core_arc_clone_private.clone();
  134. // let mul = multiplier.clone();
  135. // let rs = run_symbol.clone();
  136. //
  137. // async move {
  138. // // 使用克隆后的 Arc,避免 move 语义
  139. // on_private_data(core_arc_cc, &mul, &rs, &data).await;
  140. // }
  141. // };
  142. //
  143. // // 链接
  144. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  145. // ws.set_symbols(symbols);
  146. // ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
  147. // });
  148. // }
  149. //
  150. // async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &ResponseData, depth_asks: &mut Vec<OrderBook>, depth_bids: &mut Vec<OrderBook>, ref_index: usize) {
  151. // let mut trace_stack = TraceStack::new(response.time, response.ins);
  152. // trace_stack.on_after_span_line();
  153. //
  154. // match response.channel.as_str() {
  155. // "orderbook" => {
  156. // trace_stack.set_source("bybit_usdt_swap.bookTicker".to_string());
  157. //
  158. // let mut is_update = false;
  159. // if response.data_type == "delta" {
  160. // is_update = true;
  161. // }
  162. // let mut depth = ExchangeStructHandler::book_ticker_handle(BybitSwap, &response, mul);
  163. // // 是增量更新
  164. // if is_update {
  165. // if depth.asks.len() != 0 {
  166. // depth_asks.clear();
  167. // depth_asks.append(&mut depth.asks);
  168. // }
  169. //
  170. // if depth.bids.len() != 0 {
  171. // depth_bids.clear();
  172. // depth_bids.append(&mut depth.bids);
  173. // }
  174. //
  175. // let result_depth = Depth {
  176. // time: depth.time,
  177. // symbol: depth.symbol,
  178. // asks: depth_asks.clone(),
  179. // bids: depth_bids.clone(),
  180. // };
  181. //
  182. // trace_stack.on_after_format();
  183. // on_depth(core_arc.clone(), &response.label, &mut trace_stack, &result_depth, ref_index).await;
  184. // // on_depth(core_arc, &response.label, &mut trace_stack, &result_depth, 1).await;
  185. // }
  186. // // 全量
  187. // else {
  188. // trace_stack.on_after_format();
  189. // on_depth(core_arc.clone(), &response.label, &mut trace_stack, &depth, ref_index).await;
  190. // // on_depth(core_arc, &response.label, &mut trace_stack, &depth, 1).await;
  191. //
  192. // depth_asks.clear();
  193. // depth_asks.append(&mut depth.asks);
  194. // depth_bids.clear();
  195. // depth_bids.append(&mut depth.bids);
  196. // }
  197. // }
  198. // "trade" => {
  199. // trace_stack.set_source("bybit_usdt_swap.trade".to_string());
  200. //
  201. // let mut trades = ExchangeStructHandler::trades_handle(BybitSwap, response, mul);
  202. // trace_stack.on_after_format();
  203. //
  204. // for trade in trades.iter_mut() {
  205. // on_trade(core_arc.clone(), &response.label, &mut trace_stack, &trade, ref_index).await;
  206. // // on_trade(core_arc.clone(), &response.label, &mut trace_stack, &trade, 1).await;
  207. // }
  208. // }
  209. // "tickers" => {
  210. // trace_stack.set_source("bybit_usdt_swap.tickers".to_string());
  211. // let ticker = ExchangeStructHandler::ticker_handle(BybitSwap, response).await;
  212. // trace_stack.on_after_format();
  213. //
  214. // on_ticker(core_arc, &mut trace_stack, &ticker).await;
  215. // },
  216. // // k线数据
  217. // "kline" => {
  218. // let mut records = ExchangeStructHandler::records_handle(BybitSwap, &response);
  219. //
  220. // if records.is_empty() {
  221. // return;
  222. // }
  223. //
  224. // for record in records.iter_mut() {
  225. // let core_arc_clone = core_arc.clone();
  226. //
  227. // on_record(core_arc_clone, record).await
  228. // }
  229. // },
  230. // _ => {
  231. // error!("未知推送类型");
  232. // error!(?response);
  233. // }
  234. // }
  235. // }
  236. //
  237. // async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, run_symbol: &String, response: &ResponseData) {
  238. // let mut trace_stack = TraceStack::new(response.time, response.ins);
  239. // trace_stack.on_after_span_line();
  240. //
  241. // match response.channel.as_str() {
  242. // "wallet" => {
  243. // let account = ExchangeStructHandler::account_info_handle(BybitSwap, response, run_symbol);
  244. // let mut core = core_arc_clone.lock().await;
  245. // core.update_equity(account).await;
  246. // }
  247. // "order" => {
  248. // let orders = ExchangeStructHandler::order_handle(BybitSwap, response, ct_val);
  249. // trace_stack.on_after_format();
  250. //
  251. // let mut order_infos:Vec<OrderInfo> = Vec::new();
  252. // for mut order in orders.order {
  253. // if order.status == "NULL" {
  254. // error!("bybit_usdt_swap 未识别的订单状态:{:?}", response);
  255. //
  256. // continue;
  257. // }
  258. //
  259. // // if order.deal_amount != Decimal::ZERO {
  260. // // info!("bybit order 消息原文:{:?}", response);
  261. // // }
  262. //
  263. // let order_info = OrderInfo::parse_order_to_order_info(&mut order);
  264. // order_infos.push(order_info);
  265. // }
  266. //
  267. // let mut core = core_arc_clone.lock().await;
  268. // core.update_order(order_infos, trace_stack).await;
  269. // }
  270. // "position" => {
  271. // let positions = ExchangeStructHandler::position_handle(BybitSwap, response, ct_val);
  272. // let mut core = core_arc_clone.lock().await;
  273. // core.update_position(positions).await;
  274. // }
  275. // _ => {
  276. // error!("未知推送类型");
  277. // error!(?response);
  278. // }
  279. // }
  280. // }
  281. //
  282. // fn parse_btree_map_to_bybit_swap_login(exchange_params: BTreeMap<String, String>) -> BybitSwapLogin {
  283. // BybitSwapLogin {
  284. // api_key: exchange_params.get("access_key").unwrap().clone(),
  285. // secret_key: exchange_params.get("secret_key").unwrap().clone(),
  286. // }
  287. // }