kucoin_spot.rs 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. // use std::collections::BTreeMap;
  2. // use std::sync::Arc;
  3. // use std::sync::atomic::AtomicBool;
  4. // use futures_util::StreamExt;
  5. // use rust_decimal::Decimal;
  6. // use tokio::sync::Mutex;
  7. // use exchanges::kucoin_spot_ws::{KucoinSpotSubscribeType, KucoinSpotWs, KucoinSpotWsType};
  8. // use exchanges::response_base::ResponseData;
  9. // use global::trace_stack::TraceStack;
  10. // use standard::exchange::ExchangeEnum::KucoinSpot;
  11. // use crate::exchange_disguise::on_special_depth;
  12. // use crate::model::OriginalTradeGa;
  13. // use crate::core::Core;
  14. //
  15. // // 1交易、0参考 kucoin 现货 启动
  16. // pub async fn kucoin_spot_run(is_shutdown_arc: Arc<AtomicBool>,
  17. // _is_trade: bool,
  18. // core_arc: Arc<Mutex<Core>>,
  19. // name: String,
  20. // symbols: Vec<String>,
  21. // is_colo: bool,
  22. // _exchange_params: BTreeMap<String, String>) {
  23. // let mut symbol_arr = Vec::new();
  24. // for symbol in symbols {
  25. // let symbol_mapper = standard::utils::symbol_enter_mapper(KucoinSpot, symbol.as_str());
  26. // let new_symbol = symbol_mapper.replace("_", "-").to_uppercase();
  27. // symbol_arr.push(new_symbol);
  28. // }
  29. //
  30. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  31. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
  32. //
  33. // let mut ws = KucoinSpotWs::new_label(name.clone(), is_colo, None, KucoinSpotWsType::Public).await;
  34. // ws.set_symbols(symbol_arr);
  35. // ws.set_subscribe(vec![
  36. // KucoinSpotSubscribeType::PuSpotMarketLevel2Depth50,
  37. // // KucoinSpotSubscribeType::PuMarketTicker, // python说:订阅 ticker来的很慢
  38. // KucoinSpotSubscribeType::PuMarketMatch,
  39. // ]);
  40. //
  41. // // 开启ws
  42. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  43. // tokio::spawn(async move {
  44. // //链接
  45. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  46. // ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  47. // });
  48. // //读取
  49. // // let is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
  50. // tokio::spawn(async move {
  51. // let core_arc_clone = Arc::clone(&core_arc);
  52. // // trade
  53. // let mut update_flag_u = Decimal::ZERO;
  54. // let mut max_buy = Decimal::ZERO;
  55. // let mut min_sell = Decimal::ZERO;
  56. // let multiplier = Decimal::ONE;
  57. // // let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
  58. // // let run_symbol = symbols.clone()[0].clone();
  59. //
  60. // loop {
  61. // if let Some(data) = read_rx.next().await {
  62. // on_kucoin_spot_data(core_arc_clone.clone(),
  63. // &mut update_flag_u,
  64. // multiplier,
  65. // &mut max_buy,
  66. // &mut min_sell,
  67. // data).await;
  68. // }
  69. // }
  70. // });
  71. // }
  72. //
  73. // async fn on_kucoin_spot_data(core_arc_clone: Arc<Mutex<Core>>,
  74. // update_flag_u: &mut Decimal,
  75. // _multiplier: Decimal,
  76. // max_buy: &mut Decimal,
  77. // min_sell: &mut Decimal,
  78. // data: ResponseData) {
  79. // let mut trace_stack = TraceStack::new(0, Instant::now());
  80. // trace_stack.on_after_network(data.time);
  81. // trace_stack.on_before_unlock_core();
  82. //
  83. // if data.code != "200".to_string() {
  84. // return;
  85. // }
  86. //
  87. // if data.channel == "level2" {
  88. // trace_stack.on_before_format();
  89. // let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSpot, data.clone());
  90. // trace_stack.on_before_network(special_depth.create_at.clone());
  91. // trace_stack.on_after_format();
  92. //
  93. // on_special_depth(core_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
  94. // } else if data.channel == "trade.ticker" {
  95. // trace_stack.on_before_format();
  96. // let special_depth = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSpot, data.clone());
  97. // trace_stack.on_before_network(special_depth.create_at.clone());
  98. // trace_stack.on_after_format();
  99. //
  100. // on_special_depth(core_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
  101. // } else if data.channel == "trade.l3match" {
  102. // let mut core = core_arc_clone.lock().await;
  103. // let str = data.label.clone();
  104. // if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap() {
  105. // *max_buy = Decimal::ZERO;
  106. // *min_sell = Decimal::ZERO;
  107. // core.is_update.remove(str.as_str());
  108. // }
  109. // let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
  110. // if trade.price > *max_buy || *max_buy == Decimal::ZERO {
  111. // *max_buy = trade.price
  112. // }
  113. // if trade.price < *min_sell || *min_sell == Decimal::ZERO {
  114. // *min_sell = trade.price
  115. // }
  116. // core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
  117. // } else if data.channel == "availableBalance.change" {
  118. // // 取消原有推送解析,因为推送的信息不准确
  119. // // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
  120. // // {
  121. // // let mut core = core_arc_clone.lock().await;
  122. // // core.update_equity(account);
  123. // // }
  124. // } else if data.channel == "symbolOrderChange" {
  125. // // trace_stack.on_before_format();
  126. // // let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
  127. // // trace_stack.on_after_format();
  128. // // let mut order_infos:Vec<OrderInfo> = Vec::new();
  129. // // for order in orders.order {
  130. // // if order.status == "NULL" {
  131. // // continue;
  132. // // }
  133. // // let order_info = OrderInfo {
  134. // // symbol: "".to_string(),
  135. // // amount: order.amount.abs(),
  136. // // side: "".to_string(),
  137. // // price: order.price,
  138. // // client_id: order.custom_id,
  139. // // filled_price: order.avg_price,
  140. // // filled: order.deal_amount.abs(),
  141. // // order_id: order.id,
  142. // // local_time: 0,
  143. // // create_time: 0,
  144. // // status: order.status,
  145. // // fee: Default::default(),
  146. // // trace_stack: Default::default(),
  147. // // };
  148. // // order_infos.push(order_info);
  149. // // }
  150. // //
  151. // // {
  152. // // let mut core = core_arc_clone.lock().await;
  153. // // core.update_order(order_infos, trace_stack);
  154. // // }
  155. // } else if data.channel == "position.change" {
  156. // // let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
  157. // // {
  158. // // let mut core = core_arc_clone.lock().await;
  159. // // core.update_position(positions);
  160. // // }
  161. // }
  162. // }