htx_usdt_swap.rs 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. use tracing::{error};
  2. use std::collections::BTreeMap;
  3. use std::sync::Arc;
  4. use std::sync::atomic::AtomicBool;
  5. use rust_decimal::Decimal;
  6. use tokio::spawn;
  7. use tokio::sync::Mutex;
  8. use exchanges::htx_swap_ws::{HtxSwapLogin, HtxSwapSubscribeType, HtxSwapWs, HtxSwapWsType};
  9. use exchanges::response_base::ResponseData;
  10. use global::trace_stack::{TraceStack};
  11. use standard::exchange::ExchangeEnum::{HtxSwap};
  12. use crate::model::{OrderInfo};
  13. use crate::core::Core;
  14. use crate::exchange_disguise::on_special_depth;
  15. // 1交易、0参考 htx 合约 启动
  16. pub async fn htx_swap_run(is_shutdown_arc: Arc<AtomicBool>,
  17. is_trade: bool,
  18. core_arc: Arc<Mutex<Core>>,
  19. name: String,
  20. symbols: Vec<String>,
  21. _is_colo: bool,
  22. exchange_params: BTreeMap<String, String>) {
  23. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  24. let write_tx_am = Arc::new(Mutex::new(write_tx));
  25. let symbols_clone = symbols.clone();
  26. spawn(async move {
  27. let mut ws;
  28. // 交易
  29. if is_trade {
  30. let login_param = parse_btree_map_to_htx_swap_login(exchange_params);
  31. ws = HtxSwapWs::new_label(name.clone(), Some(login_param), HtxSwapWsType::Private);
  32. ws.set_subscribe(vec![
  33. HtxSwapSubscribeType::PuFuturesDepth,
  34. HtxSwapSubscribeType::PrFuturesOrders,
  35. HtxSwapSubscribeType::PrFuturesPositions,
  36. HtxSwapSubscribeType::PrFuturesBalances
  37. ]);
  38. } else { // 参考
  39. ws = HtxSwapWs::new_label(name.clone(), None,
  40. HtxSwapWsType::Public);
  41. ws.set_subscribe(vec![
  42. HtxSwapSubscribeType::PuFuturesDepth
  43. ]);
  44. }
  45. // 读取数据
  46. let mut update_flag_u = Decimal::ZERO;
  47. let core_arc_clone = Arc::clone(&core_arc);
  48. let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
  49. let run_symbol = symbols.clone()[0].clone();
  50. let fun = move |data: ResponseData| {
  51. let core_arc_cc = core_arc_clone.clone();
  52. // 在 async 块之前克隆 Arc
  53. let mul = multiplier.clone();
  54. let rs = run_symbol.clone();
  55. async move {
  56. on_data(core_arc_cc,
  57. &mut update_flag_u,
  58. &mul,
  59. &rs,
  60. data,
  61. ).await
  62. }
  63. };
  64. // 建立链接
  65. ws.set_symbols(symbols_clone);
  66. ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  67. });
  68. }
  69. async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
  70. update_flag_u: &mut Decimal,
  71. multiplier: &Decimal,
  72. run_symbol: &String,
  73. response: ResponseData) {
  74. let mut trace_stack = TraceStack::new(response.time, response.ins);
  75. trace_stack.on_after_span_line();
  76. let channel_symbol = run_symbol.replace("_", "-");
  77. let depth_channel = format!("market.{}.depth.step0", channel_symbol.to_uppercase());
  78. let order_channel = format!("orders_cross.{}", channel_symbol.to_lowercase());
  79. let position_channel = format!("positions_cross.{}", channel_symbol.to_uppercase());
  80. let balance_channel = "accounts_cross.USDT";
  81. if response.channel == depth_channel { // 深度频道
  82. trace_stack.set_source("htx_usdt_swap.depth".to_string());
  83. let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(HtxSwap, &response);
  84. trace_stack.on_after_format();
  85. on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
  86. } else if response.channel == order_channel { // 订单频道
  87. trace_stack.set_source("htx_swap.orders".to_string());
  88. let orders = standard::handle_info::HandleSwapInfo::handle_order(HtxSwap, response.clone(), multiplier.clone());
  89. let mut order_infos:Vec<OrderInfo> = Vec::new();
  90. for mut order in orders.order {
  91. if order.status == "NULL" {
  92. error!("htx_usdt_swap 未识别的订单状态:{:?}", response);
  93. continue;
  94. }
  95. let order_info = OrderInfo::parse_order_to_order_info(&mut order);
  96. order_infos.push(order_info);
  97. }
  98. {
  99. let mut core = core_arc_clone.lock().await;
  100. core.update_order(order_infos, trace_stack).await;
  101. }
  102. } else if response.channel == position_channel { // 仓位频道
  103. let positions = standard::handle_info::HandleSwapInfo::handle_position(HtxSwap, &response, multiplier);
  104. let mut core = core_arc_clone.lock().await;
  105. core.update_position(positions).await;
  106. } else if response.channel == balance_channel { // 余额频道
  107. let account = standard::handle_info::HandleSwapInfo::handle_account_info(HtxSwap, &response, run_symbol);
  108. let mut core = core_arc_clone.lock().await;
  109. core.update_equity(account).await;
  110. } else {
  111. error!("未知推送类型");
  112. error!(?response);
  113. }
  114. }
  115. fn parse_btree_map_to_htx_swap_login(exchange_params: BTreeMap<String, String>) -> HtxSwapLogin {
  116. HtxSwapLogin {
  117. api_key: exchange_params.get("access_key").unwrap().clone(),
  118. secret: exchange_params.get("secret_key").unwrap().clone()
  119. }
  120. }