kucoin_swap.rs 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. use std::collections::{BTreeMap};
  2. use std::sync::Arc;
  3. use std::sync::atomic::{AtomicBool};
  4. use std::time::Duration;
  5. use rust_decimal::Decimal;
  6. use tokio::spawn;
  7. use tokio::sync::Mutex;
  8. use tokio::time::sleep;
  9. use tracing::error;
  10. use exchanges::kucoin_swap_ws::{KucoinSwapLogin, KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
  11. use exchanges::response_base::ResponseData;
  12. use global::trace_stack::TraceStack;
  13. use standard::exchange::ExchangeEnum::KucoinSwap;
  14. use crate::exchange_disguise::on_special_depth;
  15. use crate::model::{OrderInfo};
  16. use crate::core::Core;
  17. // 1交易、0参考 kucoin 合约 启动
  18. pub async fn kucoin_swap_run(is_shutdown_arc :Arc<AtomicBool>,
  19. is_trade: bool,
  20. core_arc: Arc<Mutex<Core>>,
  21. name: String,
  22. symbols: Vec<String>,
  23. is_colo: bool,
  24. exchange_params: BTreeMap<String, String>) {
  25. let mut symbol_arr = Vec::new();
  26. for symbol in &symbols {
  27. let symbol_mapper = standard::utils::symbol_enter_mapper(KucoinSwap,symbol.as_str());
  28. let new_symbol = symbol_mapper.replace("_", "").to_uppercase() + "M";
  29. symbol_arr.push(new_symbol);
  30. }
  31. // 新增定期获取余额的协程
  32. let account_core_arc = core_arc.clone();
  33. spawn(async move {
  34. loop {
  35. // 每30秒重新获取一次
  36. sleep(Duration::from_secs(30)).await;
  37. {
  38. let mut core = account_core_arc.lock().await;
  39. core.update_equity_rest_swap().await;
  40. }
  41. }
  42. });
  43. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  44. let write_tx_am = Arc::new(Mutex::new(write_tx));
  45. spawn(async move {
  46. //创建读写通道
  47. spawn( async move {
  48. let mut ws;
  49. // 交易
  50. if is_trade {
  51. let login_params = parse_btree_map_to_kucoin_swap_login(exchange_params);
  52. ws = KucoinSwapWs::new_label(name.clone(), is_colo, Option::from(login_params), KucoinSwapWsType::Private).await;
  53. ws.set_subscribe(vec![
  54. KucoinSwapSubscribeType::PuContractMarketLevel2Depth50,
  55. KucoinSwapSubscribeType::PrContractPosition,
  56. KucoinSwapSubscribeType::PrContractMarketTradeOrders
  57. ]);
  58. } else { // 参考
  59. ws = KucoinSwapWs::new_label(name.clone(), is_colo, None, KucoinSwapWsType::Public).await;
  60. ws.set_subscribe(vec![
  61. KucoinSwapSubscribeType::PuContractMarketLevel2Depth50,
  62. // KucoinSwapSubscribeType::PuContractMarketExecution
  63. ]);
  64. }
  65. // 数据处理
  66. let core_arc_clone = Arc::clone(&core_arc);
  67. let mut update_flag_u = Decimal::ZERO;
  68. // let mut max_buy = Decimal::ZERO;
  69. // let mut min_sell = Decimal::ZERO;
  70. let multiplier = core_arc.lock().await.platform_rest.get_self_market().ct_val;
  71. let fun = move |data: ResponseData| {
  72. // 在 async 块之前克隆 Arc
  73. let core_arc_cc = core_arc_clone.clone();
  74. let mul = multiplier.clone();
  75. async move {
  76. on_data(core_arc_cc,
  77. &mut update_flag_u,
  78. &mul,
  79. // &mut max_buy,
  80. // &mut min_sell,
  81. data).await
  82. }
  83. };
  84. // 建立链接
  85. ws.set_symbols(symbol_arr);
  86. ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  87. });
  88. });
  89. }
  90. async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
  91. update_flag_u: &mut Decimal,
  92. multiplier: &Decimal,
  93. // max_buy: &mut Decimal,
  94. // min_sell: &mut Decimal,
  95. response: ResponseData) {
  96. let mut trace_stack = TraceStack::new(response.time, response.ins);
  97. trace_stack.on_after_span_line();
  98. match response.channel.as_str() {
  99. "level2" => {
  100. trace_stack.set_source("kucoin_usdt_swap.level2".to_string());
  101. let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap, &response);
  102. trace_stack.on_after_format();
  103. on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await
  104. }
  105. "tickerV2" => {
  106. trace_stack.set_source("kucoin_swap.tickerV2".to_string());
  107. let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(KucoinSwap, &response);
  108. trace_stack.on_before_network(special_depth.create_at.clone());
  109. on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await
  110. }
  111. "symbolOrderChange" => {
  112. trace_stack.set_source("kucoin_swap.symbolOrderChange".to_string());
  113. let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, response, multiplier.clone());
  114. let mut order_infos:Vec<OrderInfo> = Vec::new();
  115. for mut order in orders.order {
  116. if order.status == "NULL" {
  117. continue;
  118. }
  119. let order_info = OrderInfo::parse_order_to_order_info(&mut order);
  120. order_infos.push(order_info);
  121. }
  122. let mut core = core_arc_clone.lock().await;
  123. core.update_order(order_infos, trace_stack).await;
  124. }
  125. "position.change" => {
  126. let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap, &response, multiplier);
  127. let mut core = core_arc_clone.lock().await;
  128. core.update_position(positions).await;
  129. }
  130. _ => {
  131. error!("kucoin_swap 未知推送类型");
  132. error!(?response);
  133. }
  134. }
  135. }
  136. fn parse_btree_map_to_kucoin_swap_login(exchange_params: BTreeMap<String, String>) -> KucoinSwapLogin {
  137. KucoinSwapLogin {
  138. access_key: exchange_params.get("access_key").unwrap().clone(),
  139. secret_key: exchange_params.get("secret_key").unwrap().clone(),
  140. pass_key: exchange_params.get("pass_key").unwrap().clone(),
  141. }
  142. }