| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- use std::collections::BTreeMap;
- use std::sync::Arc;
- use std::sync::atomic::AtomicBool;
- use futures_util::StreamExt;
- use rust_decimal::Decimal;
- use serde_json::Value;
- use tokio::spawn;
- use tokio::sync::Mutex;
- use tracing::info;
- use exchanges::gate_swap_rest::GateSwapRest;
- use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
- use exchanges::response_base::ResponseData;
- use global::trace_stack::TraceStack;
- use standard::exchange::ExchangeEnum::GateSwap;
- use crate::model::{OrderInfo, OriginalTradeGa};
- use crate::quant::Quant;
- use crate::exchange_disguise::on_special_depth;
- // 1交易、0参考 gate 合约 启动
- pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>,
- is_trade: bool,
- quant_arc: Arc<Mutex<Quant>>,
- name: String,
- symbols: Vec<String>,
- is_colo: bool,
- exchange_params: BTreeMap<String, String>) {
- let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
- let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
- let mut gate_exc = GateSwapRest::new(is_colo, exchange_params.clone());
- let mut user_id= "".to_string();
- // 交易
- if is_trade {
- // 获取user_id
- let res_data = gate_exc.wallet_fee().await;
- assert_eq!(res_data.code, "200", "获取gate交易所参数 user_id 失败, 启动失败!");
- let wallet_obj :Value = serde_json::from_str(&res_data.data).unwrap();
- info!(?wallet_obj);
- user_id = wallet_obj["user_id"].to_string();
- }
- let write_tx_am = Arc::new(Mutex::new(write_tx));
- let symbols_clone = symbols.clone();
- spawn(async move {
- let mut ws;
- // 交易
- if is_trade {
- let login_param = parse_btree_map_to_gate_swap_login(exchange_params);
- ws = GateSwapWs::new_label(name.clone(), is_colo, Some(login_param),
- GateSwapWsType::PublicAndPrivate("usdt".to_string()));
- ws.set_subscribe(vec![
- // GateSwapSubscribeType::PuFuturesTrades,
- GateSwapSubscribeType::PuFuturesOrderBook,
- GateSwapSubscribeType::PrFuturesOrders(user_id.clone()),
- GateSwapSubscribeType::PrFuturesPositions(user_id.clone()),
- GateSwapSubscribeType::PrFuturesBalances(user_id.clone()),
- ]);
- } else { // 参考
- ws = GateSwapWs::new_label(name.clone(), is_colo, None,
- GateSwapWsType::PublicAndPrivate("usdt".to_string()));
- ws.set_subscribe(vec![
- GateSwapSubscribeType::PuFuturesTrades,
- GateSwapSubscribeType::PuFuturesOrderBook
- ]);
- }
- ws.set_symbols(symbols_clone);
- ws.ws_connect_async(bool_v1, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
- });
- spawn(async move {
- let bot_arc_clone = Arc::clone(&quant_arc);
- let run_symbol = symbols.clone()[0].clone();
- // trade
- let mut update_flag_u = Decimal::ZERO;
- let mut max_buy = Decimal::ZERO;
- let mut min_sell = Decimal::ZERO;
- let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
- loop {
- if let Some(data) = read_rx.next().await {
- let mut trace_stack = TraceStack::default();
- trace_stack.on_after_network(data.time);
- trace_stack.on_before_unlock_quant();
- if data.time != 0 {
- info!("gate>{}", trace_stack.to_string());
- }
- // on_data(bot_arc_clone.clone(),
- // &mut update_flag_u,
- // multiplier,
- // run_symbol.clone(),
- // &mut max_buy,
- // &mut min_sell,
- // data).await;
- }
- }
- });
- }
- async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
- update_flag_u: &mut Decimal,
- multiplier: Decimal,
- run_symbol: String,
- max_buy: &mut Decimal,
- min_sell: &mut Decimal,
- data: ResponseData) {
- let mut trace_stack = TraceStack::default();
- trace_stack.on_after_network(data.time);
- trace_stack.on_before_unlock_quant();
- // if data.code != "200".to_string() {
- // return;
- // }
- //
- // if data.channel == "futures.order_book" {
- // trace_stack.on_before_format();
- // let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, data.clone());
- // trace_stack.on_before_network(depth.create_at.clone());
- // trace_stack.on_after_format();
- //
- // on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, depth).await;
- // } else if data.channel == "futures.balances" {
- // let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, data, run_symbol.clone());
- // {
- // let mut quant = bot_arc_clone.lock().await;
- // quant.update_equity(account).await;
- // }
- // } else if data.channel == "futures.orders" {
- // trace_stack.on_before_format();
- // let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, data.clone(), multiplier.clone());
- // trace_stack.on_after_format();
- //
- // let mut order_infos:Vec<OrderInfo> = Vec::new();
- // for order in orders.order {
- // let order_info = OrderInfo {
- // symbol: "".to_string(),
- // amount: order.amount.abs(),
- // side: "".to_string(),
- // price: order.price,
- // client_id: order.custom_id,
- // filled_price: order.avg_price,
- // filled: order.deal_amount.abs(),
- // order_id: order.id,
- // local_time: 0,
- // create_time: 0,
- // status: order.status,
- // fee: Default::default(),
- // trace_stack: Default::default(),
- // };
- // order_infos.push(order_info);
- // }
- //
- // {
- // let mut quant = bot_arc_clone.lock().await;
- // quant.update_order(order_infos, trace_stack);
- // }
- // } else if data.channel == "futures.positions" {
- // let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap,data, multiplier.clone());
- // {
- // let mut quant = bot_arc_clone.lock().await;
- // quant.update_position(positions).await;
- // }
- // } else if data.channel == "futures.trades" {
- // let mut quant = bot_arc_clone.lock().await;
- // let str = data.label.clone();
- // if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
- // *max_buy = Decimal::ZERO;
- // *min_sell = Decimal::ZERO;
- // quant.is_update.remove(str.as_str());
- // }
- // let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
- // for trade in trades {
- // if trade.price > *max_buy || *max_buy == Decimal::ZERO{
- // *max_buy = trade.price
- // }
- // if trade.price < *min_sell || *min_sell == Decimal::ZERO{
- // *min_sell = trade.price
- // }
- // }
- // quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
- // }
- }
- fn parse_btree_map_to_gate_swap_login(exchange_params: BTreeMap<String, String>) -> GateSwapLogin {
- GateSwapLogin {
- api_key: exchange_params.get("access_key").unwrap().clone(),
- secret: exchange_params.get("secret_key").unwrap().clone()
- }
- }
|