gate_swap.rs 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. use std::collections::BTreeMap;
  2. use std::sync::Arc;
  3. use std::sync::atomic::AtomicBool;
  4. use futures_util::StreamExt;
  5. use rust_decimal::Decimal;
  6. use serde_json::Value;
  7. use tokio::spawn;
  8. use tokio::sync::Mutex;
  9. use tracing::info;
  10. use exchanges::gate_swap_rest::GateSwapRest;
  11. use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
  12. use exchanges::response_base::ResponseData;
  13. use global::trace_stack::TraceStack;
  14. use standard::exchange::ExchangeEnum::GateSwap;
  15. use crate::model::{OrderInfo, OriginalTradeGa};
  16. use crate::quant::Quant;
  17. use crate::exchange_disguise::on_special_depth;
  18. // 1交易、0参考 gate 合约 启动
  19. pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>,
  20. is_trade: bool,
  21. quant_arc: Arc<Mutex<Quant>>,
  22. name: String,
  23. symbols: Vec<String>,
  24. is_colo: bool,
  25. exchange_params: BTreeMap<String, String>) {
  26. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  27. let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
  28. let mut gate_exc = GateSwapRest::new(is_colo, exchange_params.clone());
  29. let mut user_id= "".to_string();
  30. // 交易
  31. if is_trade {
  32. // 获取user_id
  33. let res_data = gate_exc.wallet_fee().await;
  34. assert_eq!(res_data.code, "200", "获取gate交易所参数 user_id 失败, 启动失败!");
  35. let wallet_obj :Value = serde_json::from_str(&res_data.data).unwrap();
  36. info!(?wallet_obj);
  37. user_id = wallet_obj["user_id"].to_string();
  38. }
  39. let write_tx_am = Arc::new(Mutex::new(write_tx));
  40. let symbols_clone = symbols.clone();
  41. spawn(async move {
  42. let mut ws;
  43. // 交易
  44. if is_trade {
  45. let login_param = parse_btree_map_to_gate_swap_login(exchange_params);
  46. ws = GateSwapWs::new_label(name.clone(), is_colo, Some(login_param),
  47. GateSwapWsType::PublicAndPrivate("usdt".to_string()));
  48. ws.set_subscribe(vec![
  49. // GateSwapSubscribeType::PuFuturesTrades,
  50. GateSwapSubscribeType::PuFuturesOrderBook,
  51. GateSwapSubscribeType::PrFuturesOrders(user_id.clone()),
  52. GateSwapSubscribeType::PrFuturesPositions(user_id.clone()),
  53. GateSwapSubscribeType::PrFuturesBalances(user_id.clone()),
  54. ]);
  55. } else { // 参考
  56. ws = GateSwapWs::new_label(name.clone(), is_colo, None,
  57. GateSwapWsType::PublicAndPrivate("usdt".to_string()));
  58. ws.set_subscribe(vec![
  59. GateSwapSubscribeType::PuFuturesTrades,
  60. GateSwapSubscribeType::PuFuturesOrderBook
  61. ]);
  62. }
  63. ws.set_symbols(symbols_clone);
  64. ws.ws_connect_async(bool_v1, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  65. });
  66. spawn(async move {
  67. let bot_arc_clone = Arc::clone(&quant_arc);
  68. let run_symbol = symbols.clone()[0].clone();
  69. // trade
  70. let mut update_flag_u = Decimal::ZERO;
  71. let mut max_buy = Decimal::ZERO;
  72. let mut min_sell = Decimal::ZERO;
  73. let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
  74. loop {
  75. if let Some(data) = read_rx.next().await {
  76. let mut trace_stack = TraceStack::default();
  77. trace_stack.on_after_network(data.time);
  78. trace_stack.on_before_unlock_quant();
  79. if data.time != 0 {
  80. info!("gate>{}", trace_stack.to_string());
  81. }
  82. // on_data(bot_arc_clone.clone(),
  83. // &mut update_flag_u,
  84. // multiplier,
  85. // run_symbol.clone(),
  86. // &mut max_buy,
  87. // &mut min_sell,
  88. // data).await;
  89. }
  90. }
  91. });
  92. }
  93. async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
  94. update_flag_u: &mut Decimal,
  95. multiplier: Decimal,
  96. run_symbol: String,
  97. max_buy: &mut Decimal,
  98. min_sell: &mut Decimal,
  99. data: ResponseData) {
  100. let mut trace_stack = TraceStack::default();
  101. trace_stack.on_after_network(data.time);
  102. trace_stack.on_before_unlock_quant();
  103. // if data.code != "200".to_string() {
  104. // return;
  105. // }
  106. //
  107. // if data.channel == "futures.order_book" {
  108. // trace_stack.on_before_format();
  109. // let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, data.clone());
  110. // trace_stack.on_before_network(depth.create_at.clone());
  111. // trace_stack.on_after_format();
  112. //
  113. // on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, depth).await;
  114. // } else if data.channel == "futures.balances" {
  115. // let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, data, run_symbol.clone());
  116. // {
  117. // let mut quant = bot_arc_clone.lock().await;
  118. // quant.update_equity(account).await;
  119. // }
  120. // } else if data.channel == "futures.orders" {
  121. // trace_stack.on_before_format();
  122. // let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, data.clone(), multiplier.clone());
  123. // trace_stack.on_after_format();
  124. //
  125. // let mut order_infos:Vec<OrderInfo> = Vec::new();
  126. // for order in orders.order {
  127. // let order_info = OrderInfo {
  128. // symbol: "".to_string(),
  129. // amount: order.amount.abs(),
  130. // side: "".to_string(),
  131. // price: order.price,
  132. // client_id: order.custom_id,
  133. // filled_price: order.avg_price,
  134. // filled: order.deal_amount.abs(),
  135. // order_id: order.id,
  136. // local_time: 0,
  137. // create_time: 0,
  138. // status: order.status,
  139. // fee: Default::default(),
  140. // trace_stack: Default::default(),
  141. // };
  142. // order_infos.push(order_info);
  143. // }
  144. //
  145. // {
  146. // let mut quant = bot_arc_clone.lock().await;
  147. // quant.update_order(order_infos, trace_stack);
  148. // }
  149. // } else if data.channel == "futures.positions" {
  150. // let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap,data, multiplier.clone());
  151. // {
  152. // let mut quant = bot_arc_clone.lock().await;
  153. // quant.update_position(positions).await;
  154. // }
  155. // } else if data.channel == "futures.trades" {
  156. // let mut quant = bot_arc_clone.lock().await;
  157. // let str = data.label.clone();
  158. // if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
  159. // *max_buy = Decimal::ZERO;
  160. // *min_sell = Decimal::ZERO;
  161. // quant.is_update.remove(str.as_str());
  162. // }
  163. // let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
  164. // for trade in trades {
  165. // if trade.price > *max_buy || *max_buy == Decimal::ZERO{
  166. // *max_buy = trade.price
  167. // }
  168. // if trade.price < *min_sell || *min_sell == Decimal::ZERO{
  169. // *min_sell = trade.price
  170. // }
  171. // }
  172. // quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
  173. // }
  174. }
  175. fn parse_btree_map_to_gate_swap_login(exchange_params: BTreeMap<String, String>) -> GateSwapLogin {
  176. GateSwapLogin {
  177. api_key: exchange_params.get("access_key").unwrap().clone(),
  178. secret: exchange_params.get("secret_key").unwrap().clone()
  179. }
  180. }