gate_swap.rs 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. use tracing::{error, info};
  2. use std::collections::BTreeMap;
  3. use std::sync::Arc;
  4. use std::sync::atomic::AtomicBool;
  5. use rust_decimal::Decimal;
  6. use tokio::spawn;
  7. use tokio::sync::Mutex;
  8. use exchanges::gate_swap_rest::GateSwapRest;
  9. use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
  10. use exchanges::response_base::ResponseData;
  11. use global::trace_stack::{TraceStack};
  12. use standard::exchange::ExchangeEnum::{GateSwap};
  13. use crate::model::{OrderInfo};
  14. use crate::core::Core;
  15. use crate::exchange_disguise::on_special_depth;
  16. // 1交易、0参考 gate 合约 启动
  17. pub async fn gate_swap_run(is_shutdown_arc: Arc<AtomicBool>,
  18. is_trade: bool,
  19. core_arc: Arc<Mutex<Core>>,
  20. name: String,
  21. symbols: Vec<String>,
  22. is_colo: bool,
  23. exchange_params: BTreeMap<String, String>) {
  24. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  25. let mut gate_exc = GateSwapRest::new(is_colo, exchange_params.clone());
  26. let mut user_id= "".to_string();
  27. // 交易
  28. if is_trade {
  29. // 获取user_id
  30. let res_data = gate_exc.wallet_fee().await;
  31. assert_eq!(res_data.code, 200, "获取gate交易所参数 user_id 失败, 启动失败!");
  32. let wallet_obj = res_data.data;
  33. info!(?wallet_obj);
  34. user_id = wallet_obj["user_id"].to_string();
  35. }
  36. let write_tx_am = Arc::new(Mutex::new(write_tx));
  37. let symbols_clone = symbols.clone();
  38. spawn(async move {
  39. let mut ws;
  40. // 交易
  41. if is_trade {
  42. let login_param = parse_btree_map_to_gate_swap_login(exchange_params);
  43. ws = GateSwapWs::new_label(name.clone(), is_colo, Some(login_param),
  44. GateSwapWsType::PublicAndPrivate("usdt".to_string()));
  45. ws.set_subscribe(vec![
  46. // GateSwapSubscribeType::PuFuturesTrades,
  47. GateSwapSubscribeType::PuFuturesBookTicker,
  48. GateSwapSubscribeType::PrFuturesOrders(user_id.clone()),
  49. GateSwapSubscribeType::PrFuturesPositions(user_id.clone()),
  50. GateSwapSubscribeType::PrFuturesBalances(user_id.clone()),
  51. ]);
  52. } else { // 参考
  53. ws = GateSwapWs::new_label(name.clone(), is_colo, None,
  54. GateSwapWsType::PublicAndPrivate("usdt".to_string()));
  55. ws.set_subscribe(vec![
  56. GateSwapSubscribeType::PuFuturesTrades,
  57. GateSwapSubscribeType::PuFuturesBookTicker
  58. ]);
  59. }
  60. // 读取数据
  61. let mut update_flag_u = Decimal::ZERO;
  62. let core_arc_clone = Arc::clone(&core_arc);
  63. let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
  64. let run_symbol = symbols.clone()[0].clone();
  65. let fun = move |data: ResponseData| {
  66. let core_arc_cc = core_arc_clone.clone();
  67. // 在 async 块之前克隆 Arc
  68. let mul = multiplier.clone();
  69. let rs = run_symbol.clone();
  70. async move {
  71. on_data(core_arc_cc,
  72. &mut update_flag_u,
  73. &mul,
  74. &rs,
  75. data,
  76. ).await
  77. }
  78. };
  79. // 建立链接
  80. ws.set_symbols(symbols_clone);
  81. ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  82. });
  83. }
  84. async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
  85. update_flag_u: &mut Decimal,
  86. multiplier: &Decimal,
  87. run_symbol: &String,
  88. response: ResponseData) {
  89. let mut trace_stack = TraceStack::new(response.time, response.ins);
  90. trace_stack.on_after_span_line();
  91. match response.channel.as_str() {
  92. "futures.order_book" => {
  93. trace_stack.set_source("gate_usdt_swap.order_book".to_string());
  94. let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, &response);
  95. trace_stack.on_after_format();
  96. // 网络时间差
  97. let mut distance = -1;
  98. if !response.label.contains("ref") {
  99. distance = response.time - special_depth.create_at/1000;
  100. }
  101. on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth, distance).await;
  102. }
  103. "futures.book_ticker" => {
  104. trace_stack.set_source("gate_usdt_swap.book_ticker".to_string());
  105. // 将ticker数据转换为模拟深度
  106. let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(GateSwap, &response);
  107. trace_stack.on_after_format();
  108. // 网络时间差
  109. let mut distance = -1;
  110. if !response.label.contains("ref") {
  111. distance = response.time - special_depth.create_at/1000;
  112. }
  113. on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth, distance).await;
  114. }
  115. "futures.balances" => {
  116. let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, &response, run_symbol);
  117. let mut core = core_arc_clone.lock().await;
  118. core.update_equity(account).await;
  119. }
  120. "futures.orders" => {
  121. trace_stack.set_source("gate_swap.orders".to_string());
  122. let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, response.clone(), multiplier.clone());
  123. let mut order_infos:Vec<OrderInfo> = Vec::new();
  124. for mut order in orders.order {
  125. if order.status == "NULL" {
  126. error!("gate_usdt_swap 未识别的订单状态:{:?}", response);
  127. continue;
  128. }
  129. let order_info = OrderInfo::parse_order_to_order_info(&mut order);
  130. order_infos.push(order_info);
  131. }
  132. {
  133. let mut core = core_arc_clone.lock().await;
  134. core.update_order(order_infos, trace_stack).await;
  135. }
  136. }
  137. "futures.positions" => {
  138. let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap, &response, multiplier);
  139. let mut core = core_arc_clone.lock().await;
  140. core.update_position(positions).await;
  141. }
  142. "futures.trades" => {
  143. // let mut core = core_arc_clone.lock().await;
  144. // let str = data.label.clone();
  145. // if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap(){
  146. // *max_buy = Decimal::ZERO;
  147. // *min_sell = Decimal::ZERO;
  148. // core.is_update.remove(str.as_str());
  149. // }
  150. // let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
  151. // for trade in trades {
  152. // if trade.price > *max_buy || *max_buy == Decimal::ZERO{
  153. // *max_buy = trade.price
  154. // }
  155. // if trade.price < *min_sell || *min_sell == Decimal::ZERO{
  156. // *min_sell = trade.price
  157. // }
  158. // }
  159. // core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
  160. }
  161. _ => {
  162. error!("未知推送类型");
  163. error!(?response);
  164. }
  165. }
  166. }
  167. fn parse_btree_map_to_gate_swap_login(exchange_params: BTreeMap<String, String>) -> GateSwapLogin {
  168. GateSwapLogin {
  169. api_key: exchange_params.get("access_key").unwrap().clone(),
  170. secret: exchange_params.get("secret_key").unwrap().clone()
  171. }
  172. }