Kaynağa Gözat

支持gate。

skyfffire 1 yıl önce
ebeveyn
işleme
468c7f8cad
2 değiştirilmiş dosya ile 94 ekleme ve 62 silme
  1. 7 6
      strategy/src/exchange_disguise.rs
  2. 87 56
      strategy/src/gate_swap.rs

+ 7 - 6
strategy/src/exchange_disguise.rs

@@ -6,15 +6,16 @@ use tokio::sync::Mutex;
 use global::trace_stack::TraceStack;
 use standard::SpecialDepth;
 use crate::binance_usdt_swap::reference_binance_swap_run;
+use crate::gate_swap::gate_swap_run;
 use crate::kucoin_swap::kucoin_swap_run;
 use crate::quant::Quant;
 
 // 交易交易所启动
 pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
     match exchange_name.as_str() {
-        // "gate_usdt_swap" => {
-        //     gate_swap_run(bool_v1, 1i8, quant_arc, name, symbols, exchange_params).await;
-        // }
+        "gate_usdt_swap" => {
+            gate_swap_run(bool_v1, 1i8, quant_arc, name, symbols, exchange_params).await;
+        }
         "kucoin_usdt_swap" => {
             kucoin_swap_run(bool_v1, 1i8, quant_arc, name, symbols, exchange_params).await;
         },
@@ -37,9 +38,9 @@ pub async fn run_reference_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: Str
         // "binance_spot" => {
         //     reference_binance_spot_run(bool_v1, quant_arc, name, symbols, exchange_params).await;
         // },
-        // "gate_usdt_swap" => {
-        //     gate_swap_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
-        // },
+        "gate_usdt_swap" => {
+            gate_swap_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
+        },
         "kucoin_usdt_swap" => {
             kucoin_swap_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
         },

+ 87 - 56
strategy/src/gate_swap.rs

@@ -1,8 +1,14 @@
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
+use futures_util::StreamExt;
 use rust_decimal::Decimal;
+use serde_json::Value;
+use tokio::spawn;
 use tokio::sync::Mutex;
+use tracing::info;
+use exchanges::gate_swap_rest::GateSwapRest;
+use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use standard::exchange::ExchangeEnum::GateSwap;
@@ -11,65 +17,83 @@ use crate::quant::Quant;
 
 // 1交易、0参考 gate 合约 启动
 #[allow(dead_code)]
-pub async fn gate_swap_run(_bool_v1 :Arc<AtomicBool>, _type_num: i8, _quant_arc: Arc<Mutex<Quant>>, _name: String, _symbols: Vec<String>, _exchange_params: BTreeMap<String, String>){
-    // let (tx, mut rx) = channel(100);
-    // let mut gate_exc = GateSwapRest::new(false, exchange_params.clone());
-    // let mut user_id= "".to_string();
-    // let symbols_one = symbols.clone();
-    //
-    // // 交易
-    // if type_num == 1{
-    //     // 获取user_id
-    //     let res_data = gate_exc.wallet_fee().await;
-    //     assert_eq!(res_data.code, "200", "获取gate交易所参数 user_id 失败, 启动失败!");
-    //
-    //     let wallet_obj :Value = serde_json::from_str(&res_data.data).unwrap();
-    //     info!(?wallet_obj);
-    //     user_id = wallet_obj["user_id"].to_string();
-    // }
-    //
-    // spawn( async move {
-    //     let mut gate_exc = GateSwapWs::new_label(name, false, exchange_params,
-    //                                              GateWsType::PublicAndPrivate("usdt".to_string()), tx);
-    //     // 交易
-    //     if type_num == 1 {
-    //         gate_exc.set_subscribe(vec![
-    //             GateSubscribeType::PuFuturesTrades,
-    //             GateSubscribeType::PuFuturesOrderBook,
-    //             GateSubscribeType::PrFuturesOrders(user_id.clone()),
-    //             GateSubscribeType::PrFuturesPositions(user_id.clone()),
-    //             GateSubscribeType::PrFuturesBalances(user_id.clone()),
-    //         ]);
-    //     } else { // 参考
-    //         gate_exc.set_subscribe(vec![
-    //             GateSubscribeType::PuFuturesTrades,
-    //             GateSubscribeType::PuFuturesOrderBook
-    //         ]);
-    //     }
-    //     gate_exc.custom_subscribe(bool_v1,symbols_one).await;
-    // });
-    // spawn(async move {
-    //     let bot_arc_clone = Arc::clone(&quant_arc);
-    //     let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
-    //     let run_symbol = symbols.clone()[0].clone();
-    //     // trade
-    //     let mut max_buy = Decimal::ZERO;
-    //     let mut min_sell = Decimal::ZERO;
-    //     loop {
-    //         sleep(Duration::from_millis(1)).await;
-    //
-    //         match rx.try_recv() {
-    //             Ok(data) => {
-    //                 on_data(bot_arc_clone.clone(), multiplier, run_symbol.clone(), &mut max_buy, &mut min_sell, data).await;
-    //             },
-    //             Err(_e) => {}
-    //         }
-    //     }
-    // });
+pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>, type_num: i8,
+                           quant_arc: Arc<Mutex<Quant>>, name: String,
+                           symbols: Vec<String>, exchange_params: BTreeMap<String, String>) {
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+
+    let mut gate_exc = GateSwapRest::new(false, exchange_params.clone());
+    let mut user_id= "".to_string();
+
+    // 交易
+    if type_num == 1 {
+        // 获取user_id
+        let res_data = gate_exc.wallet_fee().await;
+        assert_eq!(res_data.code, "200", "获取gate交易所参数 user_id 失败, 启动失败!");
+
+        let wallet_obj :Value = serde_json::from_str(&res_data.data).unwrap();
+        info!(?wallet_obj);
+        user_id = wallet_obj["user_id"].to_string();
+    }
+
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    let symbols_clone = symbols.clone();
+    spawn(async move {
+        let mut ws;
+        // 交易
+        if type_num == 1 {
+            let login_param = parse_btree_map_to_gate_swap_login(exchange_params);
+            ws = GateSwapWs::new_label(name.clone(), false, Some(login_param),
+                                       GateSwapWsType::PublicAndPrivate("usdt".to_string()));
+            ws.set_subscribe(vec![
+                GateSwapSubscribeType::PuFuturesTrades,
+                GateSwapSubscribeType::PuFuturesOrderBook,
+                GateSwapSubscribeType::PrFuturesOrders(user_id.clone()),
+                GateSwapSubscribeType::PrFuturesPositions(user_id.clone()),
+                GateSwapSubscribeType::PrFuturesBalances(user_id.clone()),
+            ]);
+        } else { // 参考
+            ws = GateSwapWs::new_label(name.clone(), false, None,
+                                       GateSwapWsType::PublicAndPrivate("usdt".to_string()));
+            ws.set_subscribe(vec![
+                GateSwapSubscribeType::PuFuturesTrades,
+                GateSwapSubscribeType::PuFuturesOrderBook
+            ]);
+        }
+
+        ws.set_symbols(symbols_clone);
+        ws.ws_connect_async(bool_v1, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+    });
+
+    spawn(async move {
+        let bot_arc_clone = Arc::clone(&quant_arc);
+        let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
+        let run_symbol = symbols.clone()[0].clone();
+        // trade
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+
+        loop {
+            if let Some(data) = read_rx.next().await {
+                on_data(bot_arc_clone.clone(),
+                        multiplier,
+                        run_symbol.clone(),
+                        &mut max_buy,
+                        &mut min_sell,
+                        data).await;
+            }
+        }
+    });
 }
 
 #[allow(dead_code)]
-async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, multiplier: Decimal, run_symbol: String, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
+async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
+                 multiplier: Decimal,
+                 run_symbol: String,
+                 max_buy: &mut Decimal,
+                 min_sell: &mut Decimal,
+                 data: ResponseData) {
     let mut trace_stack = TraceStack::default();
     trace_stack.on_network(data.time);
     trace_stack.on_before_quant();
@@ -150,3 +174,10 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, multiplier: Decimal, run_symb
         quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
     }
 }
+
+fn parse_btree_map_to_gate_swap_login(exchange_params: BTreeMap<String, String>) -> GateSwapLogin {
+    GateSwapLogin {
+        api_key: exchange_params.get("access_key").unwrap().clone(),
+        secret: exchange_params.get("secret_key").unwrap().clone()
+    }
+}