|
|
@@ -0,0 +1,133 @@
|
|
|
+use tracing::{error};
|
|
|
+use std::collections::BTreeMap;
|
|
|
+use std::sync::Arc;
|
|
|
+use std::sync::atomic::AtomicBool;
|
|
|
+use rust_decimal::Decimal;
|
|
|
+use tokio::spawn;
|
|
|
+use tokio::sync::Mutex;
|
|
|
+use exchanges::htx_swap_ws::{HtxSwapLogin, HtxSwapSubscribeType, HtxSwapWs, HtxSwapWsType};
|
|
|
+use exchanges::response_base::ResponseData;
|
|
|
+use global::trace_stack::{TraceStack};
|
|
|
+use standard::exchange::ExchangeEnum::{HtxSwap};
|
|
|
+use crate::model::{OrderInfo};
|
|
|
+use crate::core::Core;
|
|
|
+use crate::exchange_disguise::on_special_depth;
|
|
|
+
|
|
|
+// 1交易、0参考 htx 合约 启动
|
|
|
+pub async fn htx_swap_run(is_shutdown_arc: Arc<AtomicBool>,
|
|
|
+ is_trade: bool,
|
|
|
+ core_arc: Arc<Mutex<Core>>,
|
|
|
+ name: String,
|
|
|
+ symbols: Vec<String>,
|
|
|
+ _is_colo: bool,
|
|
|
+ exchange_params: BTreeMap<String, String>) {
|
|
|
+ let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
|
|
|
+
|
|
|
+ let write_tx_am = Arc::new(Mutex::new(write_tx));
|
|
|
+ let symbols_clone = symbols.clone();
|
|
|
+ spawn(async move {
|
|
|
+ let mut ws;
|
|
|
+ // 交易
|
|
|
+ if is_trade {
|
|
|
+ let login_param = parse_btree_map_to_htx_swap_login(exchange_params);
|
|
|
+ ws = HtxSwapWs::new_label(name.clone(), Some(login_param), HtxSwapWsType::Private);
|
|
|
+ ws.set_subscribe(vec![
|
|
|
+ HtxSwapSubscribeType::PuFuturesDepth,
|
|
|
+
|
|
|
+ HtxSwapSubscribeType::PrFuturesOrders,
|
|
|
+ HtxSwapSubscribeType::PrFuturesPositions,
|
|
|
+ HtxSwapSubscribeType::PrFuturesBalances
|
|
|
+ ]);
|
|
|
+ } else { // 参考
|
|
|
+ ws = HtxSwapWs::new_label(name.clone(), None,
|
|
|
+ HtxSwapWsType::Public);
|
|
|
+ ws.set_subscribe(vec![
|
|
|
+ HtxSwapSubscribeType::PuFuturesDepth
|
|
|
+ ]);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 读取数据
|
|
|
+ let mut update_flag_u = Decimal::ZERO;
|
|
|
+ let core_arc_clone = Arc::clone(&core_arc);
|
|
|
+ let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
|
|
|
+ let run_symbol = symbols.clone()[0].clone();
|
|
|
+
|
|
|
+ let fun = move |data: ResponseData| {
|
|
|
+ let core_arc_cc = core_arc_clone.clone();
|
|
|
+ // 在 async 块之前克隆 Arc
|
|
|
+ let mul = multiplier.clone();
|
|
|
+ let rs = run_symbol.clone();
|
|
|
+
|
|
|
+ async move {
|
|
|
+ on_data(core_arc_cc,
|
|
|
+ &mut update_flag_u,
|
|
|
+ &mul,
|
|
|
+ &rs,
|
|
|
+ data,
|
|
|
+ ).await
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ // 建立链接
|
|
|
+ ws.set_symbols(symbols_clone);
|
|
|
+ ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
|
|
|
+ update_flag_u: &mut Decimal,
|
|
|
+ multiplier: &Decimal,
|
|
|
+ run_symbol: &String,
|
|
|
+ response: ResponseData) {
|
|
|
+ let mut trace_stack = TraceStack::new(response.time, response.ins);
|
|
|
+ trace_stack.on_after_span_line();
|
|
|
+ let channel_symbol = run_symbol.replace("_", "-");
|
|
|
+ let depth_channel = format!("market.{}.depth.step0", channel_symbol.to_uppercase());
|
|
|
+ let order_channel = format!("orders_cross.{}", channel_symbol.to_lowercase());
|
|
|
+ let position_channel = format!("positions_cross.{}", channel_symbol.to_uppercase());
|
|
|
+ let balance_channel = "accounts_cross.USDT";
|
|
|
+ if response.channel == depth_channel { // 深度频道
|
|
|
+ trace_stack.set_source("htx_usdt_swap.depth".to_string());
|
|
|
+ let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(HtxSwap, &response);
|
|
|
+ trace_stack.on_after_format();
|
|
|
+
|
|
|
+ on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
|
|
|
+ } else if response.channel == order_channel { // 订单频道
|
|
|
+ trace_stack.set_source("htx_swap.orders".to_string());
|
|
|
+ let orders = standard::handle_info::HandleSwapInfo::handle_order(HtxSwap, response.clone(), multiplier.clone());
|
|
|
+ let mut order_infos:Vec<OrderInfo> = Vec::new();
|
|
|
+ for mut order in orders.order {
|
|
|
+ if order.status == "NULL" {
|
|
|
+ error!("htx_usdt_swap 未识别的订单状态:{:?}", response);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ let order_info = OrderInfo::parse_order_to_order_info(&mut order);
|
|
|
+ order_infos.push(order_info);
|
|
|
+ }
|
|
|
+
|
|
|
+ {
|
|
|
+ let mut core = core_arc_clone.lock().await;
|
|
|
+ core.update_order(order_infos, trace_stack).await;
|
|
|
+ }
|
|
|
+ } else if response.channel == position_channel { // 仓位频道
|
|
|
+ let positions = standard::handle_info::HandleSwapInfo::handle_position(HtxSwap, &response, multiplier);
|
|
|
+ let mut core = core_arc_clone.lock().await;
|
|
|
+
|
|
|
+ core.update_position(positions).await;
|
|
|
+ } else if response.channel == balance_channel { // 余额频道
|
|
|
+ let account = standard::handle_info::HandleSwapInfo::handle_account_info(HtxSwap, &response, run_symbol);
|
|
|
+ let mut core = core_arc_clone.lock().await;
|
|
|
+
|
|
|
+ core.update_equity(account).await;
|
|
|
+ } else {
|
|
|
+ error!("未知推送类型");
|
|
|
+ error!(?response);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+fn parse_btree_map_to_htx_swap_login(exchange_params: BTreeMap<String, String>) -> HtxSwapLogin {
|
|
|
+ HtxSwapLogin {
|
|
|
+ api_key: exchange_params.get("access_key").unwrap().clone(),
|
|
|
+ secret: exchange_params.get("secret_key").unwrap().clone()
|
|
|
+ }
|
|
|
+}
|