binance_usdt_swap.rs 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. use std::collections::BTreeMap;
  2. use std::sync::Arc;
  3. use std::sync::atomic::AtomicBool;
  4. use rust_decimal::Decimal;
  5. use tokio::sync::Mutex;
  6. use tokio_tungstenite::tungstenite::Message;
  7. use tracing::{error};
  8. use exchanges::response_base::ResponseData;
  9. use global::trace_stack::{TraceStack};
  10. use crate::core::Core;
  11. use exchanges::binance_swap_ws::{BinanceSwapLogin, BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
  12. use standard::exchange::ExchangeEnum;
  13. use standard::exchange_struct_handler::ExchangeStructHandler;
  14. use crate::exchange_disguise::{on_depth, on_trade};
  15. use crate::model::OrderInfo;
  16. // 参考 币安 合约 启动
  17. pub(crate) async fn reference_binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
  18. core_arc: Arc<Mutex<Core>>,
  19. name: String,
  20. symbols: Vec<String>,
  21. is_colo: bool,
  22. ref_index: usize
  23. ) {
  24. tokio::spawn(async move {
  25. //创建读写通道
  26. let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
  27. let mut ws = BinanceSwapWs::new_label(name, is_colo, None, BinanceSwapWsType::Public).await;
  28. ws.set_subscribe(vec![
  29. BinanceSwapSubscribeType::PuBookTicker,
  30. BinanceSwapSubscribeType::PuAggTrade
  31. ]);
  32. // 读取数据
  33. let core_arc_clone = Arc::clone(&core_arc);
  34. let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().multiplier;
  35. let run_symbol = symbols.clone()[0].clone();
  36. let fun = move |data: ResponseData| {
  37. // 在 async 块之前克隆 Arc
  38. let core_arc_cc = core_arc_clone.clone();
  39. let mul = multiplier.clone();
  40. let rs = run_symbol.clone();
  41. async move {
  42. // 使用克隆后的 Arc,避免 move 语义
  43. on_data(core_arc_cc, &mul, &rs, &data, ref_index).await
  44. }
  45. };
  46. // 链接
  47. let write_tx_am = Arc::new(Mutex::new(write_tx));
  48. ws.set_symbols(symbols);
  49. ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
  50. });
  51. }
  52. // 启动binance交易ws
  53. pub(crate) async fn binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
  54. core_arc: Arc<Mutex<Core>>,
  55. name: String,
  56. symbols: Vec<String>,
  57. is_colo: bool,
  58. exchange_params: BTreeMap<String, String>) {
  59. // 参考
  60. reference_binance_swap_run(is_shutdown_arc.clone(), core_arc.clone(), name.clone(), symbols.clone(), is_colo, 233).await;
  61. tokio::spawn(async move {
  62. //创建读写通道
  63. let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
  64. let binance_login = parse_btree_map_to_binance_swap_login(exchange_params);
  65. let mut ws = BinanceSwapWs::new_label(name, is_colo, Option::from(binance_login), BinanceSwapWsType::Private).await;
  66. ws.set_subscribe(vec![
  67. BinanceSwapSubscribeType::PrPosition,
  68. BinanceSwapSubscribeType::PrAccount,
  69. BinanceSwapSubscribeType::PrBalance,
  70. ]);
  71. // 读取数据
  72. let core_arc_clone = Arc::clone(&core_arc);
  73. let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().multiplier;
  74. let run_symbol = symbols.clone()[0].clone();
  75. let fun = move |data: ResponseData| {
  76. // 在 async 块之前克隆 Arc
  77. let core_arc_cc = core_arc_clone.clone();
  78. let mul = multiplier.clone();
  79. let rs = run_symbol.clone();
  80. async move {
  81. // 使用克隆后的 Arc,避免 move 语义
  82. on_data(core_arc_cc, &mul, &rs, &data, 233).await
  83. }
  84. };
  85. // 链接
  86. let write_tx_am = Arc::new(Mutex::new(write_tx));
  87. ws.set_symbols(symbols);
  88. ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
  89. });
  90. }
  91. async fn on_data(core_arc: Arc<Mutex<Core>>, multiplier: &Decimal, run_symbol: &String, response: &ResponseData, ref_index: usize) {
  92. let mut trace_stack = TraceStack::new(response.time, response.ins);
  93. trace_stack.on_after_span_line();
  94. match response.channel.as_str() {
  95. "info" => {
  96. // info!("connected: {}", response.data.to_string())
  97. }
  98. "aggTrade" => {
  99. trace_stack.set_source("binance_usdt_swap.aggTrade".to_string());
  100. let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BinanceSwap, response, multiplier);
  101. trace_stack.on_after_format();
  102. for trade in trades.iter_mut() {
  103. let core_arc_clone = core_arc.clone();
  104. on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade, ref_index).await;
  105. }
  106. }
  107. "bookTicker" => {
  108. trace_stack.set_source("binance_usdt_swap.bookTicker".to_string());
  109. // 将ticker数据转换为模拟深度
  110. let depth = ExchangeStructHandler::book_ticker_handle(ExchangeEnum::BinanceSwap, response, multiplier);
  111. trace_stack.on_after_format();
  112. on_depth(core_arc, &response.label, &mut trace_stack, &depth, ref_index).await;
  113. }
  114. "ACCOUNT_UPDATE" => {
  115. let account = ExchangeStructHandler::account_info_handle(ExchangeEnum::BinanceSwap, response, run_symbol);
  116. let positions = ExchangeStructHandler::position_handle(ExchangeEnum::BinanceSwap, response, multiplier);
  117. let mut core = core_arc.lock().await;
  118. core.update_position(positions).await;
  119. core.update_equity(account).await;
  120. }
  121. "ORDER_TRADE_UPDATE" => {
  122. trace_stack.set_source("binance_swap.ORDER_TRADE_UPDATE".to_string());
  123. let orders = ExchangeStructHandler::order_handle(ExchangeEnum::BinanceSwap, response, &Decimal::ONE);
  124. let mut order_infos:Vec<OrderInfo> = Vec::new();
  125. for mut order in orders.order {
  126. if order.status == "NULL" {
  127. error!("binance_usdt_swap 未识别的订单状态:{:?}", response);
  128. continue;
  129. }
  130. let order_info = OrderInfo::parse_order_to_order_info(&mut order);
  131. order_infos.push(order_info);
  132. }
  133. let mut core = core_arc.lock().await;
  134. core.update_order(order_infos, trace_stack).await;
  135. }
  136. _ => {
  137. error!("未知推送类型");
  138. error!(?response);
  139. }
  140. }
  141. }
  142. fn parse_btree_map_to_binance_swap_login(exchange_params: BTreeMap<String, String>) -> BinanceSwapLogin {
  143. BinanceSwapLogin {
  144. api_key: exchange_params.get("access_key").unwrap().clone(),
  145. api_secret: exchange_params.get("secret_key").unwrap().clone(),
  146. }
  147. }