| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- use tracing::{error, info};
- use std::collections::BTreeMap;
- use std::sync::Arc;
- use std::sync::atomic::AtomicBool;
- use rust_decimal::Decimal;
- use tokio::spawn;
- use tokio::sync::Mutex;
- use exchanges::gate_swap_rest::GateSwapRest;
- use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
- use exchanges::response_base::ResponseData;
- use global::trace_stack::{TraceStack};
- use standard::exchange::ExchangeEnum::{GateSwap};
- use crate::model::{OrderInfo};
- use crate::core::Core;
- use crate::exchange_disguise::on_special_depth;
- // 1交易、0参考 gate 合约 启动
- pub async fn gate_swap_run(is_shutdown_arc: Arc<AtomicBool>,
- is_trade: bool,
- core_arc: Arc<Mutex<Core>>,
- name: String,
- symbols: Vec<String>,
- is_colo: bool,
- symbol_multiplier: Decimal,
- exchange_params: BTreeMap<String, String>) {
- let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
- let mut gate_exc = GateSwapRest::new(is_colo, exchange_params.clone());
- let mut user_id = "".to_string();
- // 交易
- if is_trade {
- // 获取user_id
- let res_data = gate_exc.wallet_fee().await;
- assert_eq!(res_data.code, 200, "获取gate交易所参数 user_id 失败, 启动失败!");
- let wallet_obj = res_data.data;
- info!(?wallet_obj);
- user_id = wallet_obj["user_id"].to_string();
- }
- let write_tx_am = Arc::new(Mutex::new(write_tx));
- let symbols_clone = symbols.clone();
- spawn(async move {
- let mut ws;
- // 交易
- if is_trade {
- let login_param = parse_btree_map_to_gate_swap_login(exchange_params);
- ws = GateSwapWs::new_label(name.clone(), is_colo, Some(login_param),
- GateSwapWsType::PublicAndPrivate("usdt".to_string()));
- ws.set_subscribe(vec![
- // GateSwapSubscribeType::PuFuturesTrades,
- GateSwapSubscribeType::PuFuturesBookTicker,
- GateSwapSubscribeType::PrFuturesOrders(user_id.clone()),
- GateSwapSubscribeType::PrFuturesPositions(user_id.clone()),
- GateSwapSubscribeType::PrFuturesBalances(user_id.clone()),
- ]);
- } else { // 参考
- ws = GateSwapWs::new_label(name.clone(), is_colo, None,
- GateSwapWsType::PublicAndPrivate("usdt".to_string()));
- ws.set_subscribe(vec![
- GateSwapSubscribeType::PuFuturesTrades,
- GateSwapSubscribeType::PuFuturesBookTicker,
- ]);
- }
- // 读取数据
- let mut update_flag_u = Decimal::ZERO;
- let core_arc_clone = Arc::clone(&core_arc);
- let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
- let run_symbol = symbols.clone()[0].clone();
- let fun = move |data: ResponseData| {
- let core_arc_cc = core_arc_clone.clone();
- // 在 async 块之前克隆 Arc
- let mul = multiplier.clone();
- let rs = run_symbol.clone();
- async move {
- on_data(core_arc_cc,
- &mut update_flag_u,
- &mul,
- &rs,
- symbol_multiplier,
- data,
- ).await
- }
- };
- // 建立链接
- ws.set_symbols(symbols_clone);
- ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
- });
- }
- async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
- update_flag_u: &mut Decimal,
- multiplier: &Decimal,
- run_symbol: &String,
- symbol_multiplier: Decimal,
- response: ResponseData) {
- let mut trace_stack = TraceStack::new(response.time, response.ins);
- trace_stack.on_after_span_line();
- match response.channel.as_str() {
- "futures.order_book" => {
- trace_stack.set_source("gate_usdt_swap.order_book".to_string());
- let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, &response, symbol_multiplier);
- trace_stack.on_after_format();
- info!("gate depth推送: {:?}", special_depth);
- on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
- }
- "futures.book_ticker" => {
- trace_stack.set_source("gate_usdt_swap.book_ticker".to_string());
- // 将ticker数据转换为模拟深度
- let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(GateSwap, &response, symbol_multiplier);
- trace_stack.on_after_format();
- info!("gate ticker推送: {:?}", special_depth);
- on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
- }
- "futures.balances" => {
- let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, &response, run_symbol);
- let mut core = core_arc_clone.lock().await;
- core.update_equity(account).await;
- }
- "futures.orders" => {
- trace_stack.set_source("gate_swap.orders".to_string());
- let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, response.clone(), multiplier.clone());
- let mut order_infos: Vec<OrderInfo> = Vec::new();
- for mut order in orders.order {
- if order.status == "NULL" {
- error!("gate_usdt_swap 未识别的订单状态:{:?}", response);
- continue;
- }
- let order_info = OrderInfo::parse_order_to_order_info(&mut order);
- order_infos.push(order_info);
- }
- {
- let mut core = core_arc_clone.lock().await;
- core.update_order(order_infos, trace_stack).await;
- }
- }
- "futures.positions" => {
- let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap, &response, multiplier);
- let mut core = core_arc_clone.lock().await;
- core.update_position(positions).await;
- }
- "futures.trades" => {
- // let mut core = core_arc_clone.lock().await;
- // let str = data.label.clone();
- // if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap(){
- // *max_buy = Decimal::ZERO;
- // *min_sell = Decimal::ZERO;
- // core.is_update.remove(str.as_str());
- // }
- // let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
- // for trade in trades {
- // if trade.price > *max_buy || *max_buy == Decimal::ZERO{
- // *max_buy = trade.price
- // }
- // if trade.price < *min_sell || *min_sell == Decimal::ZERO{
- // *min_sell = trade.price
- // }
- // }
- // core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
- }
- _ => {
- error!("未知推送类型");
- error!(?response);
- }
- }
- }
- fn parse_btree_map_to_gate_swap_login(exchange_params: BTreeMap<String, String>) -> GateSwapLogin {
- GateSwapLogin {
- api_key: exchange_params.get("access_key").unwrap().clone(),
- secret: exchange_params.get("secret_key").unwrap().clone(),
- }
- }
|