浏览代码

binance加入,但是没有做测试。

skyfffire 1 年之前
父节点
当前提交
915dfa2421
共有 2 个文件被更改,包括 59 次插入16 次删除
  1. 54 14
      strategy/src/binance_usdt_swap.rs
  2. 5 2
      strategy/src/exchange_disguise.rs

+ 54 - 14
strategy/src/binance_usdt_swap.rs

@@ -8,7 +8,7 @@ use tracing::error;
 use exchanges::response_base::ResponseData;
 use global::trace_stack::{TraceStack};
 use crate::core::Core;
-use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
+use exchanges::binance_swap_ws::{BinanceSwapLogin, BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
 use crate::exchange_disguise::{on_depth, on_trade};
@@ -18,8 +18,7 @@ pub(crate) async fn reference_binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
                                                core_arc: Arc<Mutex<Core>>,
                                                name: String,
                                                symbols: Vec<String>,
-                                               is_colo: bool,
-                                               _exchange_params: BTreeMap<String, String>) {
+                                               is_colo: bool) {
     tokio::spawn(async move {
         //创建读写通道
         let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
@@ -42,7 +41,7 @@ pub(crate) async fn reference_binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
 
             async move {
                 // 使用克隆后的 Arc,避免 move 语义
-                on_public_data(core_arc_cc, &mul, &rs, &data).await
+                on_data(core_arc_cc, &mul, &rs, &data).await
             }
         };
 
@@ -53,7 +52,49 @@ pub(crate) async fn reference_binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
     });
 }
 
-async fn on_public_data(core_arc: Arc<Mutex<Core>>, multiplier: &Decimal, _run_symbol: &String, response: &ResponseData) {
+// 启动binance交易ws
+pub(crate) async fn binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
+                                     core_arc: Arc<Mutex<Core>>,
+                                     name: String,
+                                     symbols: Vec<String>,
+                                     is_colo: bool,
+                                     exchange_params: BTreeMap<String, String>) {
+    tokio::spawn(async move {
+        //创建读写通道
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
+        let binance_login = parse_btree_map_to_binance_swap_login(exchange_params);
+        let mut ws = BinanceSwapWs::new_label(name, is_colo, Option::from(binance_login), BinanceSwapWsType::Private).await;
+        ws.set_subscribe(vec![
+            BinanceSwapSubscribeType::PrPosition,
+            BinanceSwapSubscribeType::PrAccount,
+            BinanceSwapSubscribeType::PrBalance
+        ]);
+
+        // 读取数据
+        let core_arc_clone = Arc::clone(&core_arc);
+        let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().multiplier;
+        let run_symbol = symbols.clone()[0].clone();
+
+        let fun = move |data: ResponseData| {
+            // 在 async 块之前克隆 Arc
+            let core_arc_cc = core_arc_clone.clone();
+            let mul = multiplier.clone();
+            let rs = run_symbol.clone();
+
+            async move {
+                // 使用克隆后的 Arc,避免 move 语义
+                on_data(core_arc_cc, &mul, &rs, &data).await
+            }
+        };
+
+        // 链接
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+        ws.set_symbols(symbols);
+        ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
+    });
+}
+
+async fn on_data(core_arc: Arc<Mutex<Core>>, multiplier: &Decimal, _run_symbol: &String, response: &ResponseData) {
     let mut trace_stack = TraceStack::new(response.time, response.ins);
     trace_stack.on_after_span_line();
 
@@ -77,17 +118,16 @@ async fn on_public_data(core_arc: Arc<Mutex<Core>>, multiplier: &Decimal, _run_s
 
             on_depth(core_arc, &response.label, &mut trace_stack, &depth).await;
         }
-        // "depth" => {
-        //     trace_stack.set_source("binance_usdt_swap.depth".to_string());
-        //     // 将depth数据转换为模拟深度
-        //     let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSwap, &response);
-        //     trace_stack.on_after_format();
-        //
-        //     on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
-        // }
         _ => {
             error!("未知推送类型");
             error!(?response);
         }
     }
-}
+}
+
+fn parse_btree_map_to_binance_swap_login(exchange_params: BTreeMap<String, String>) -> BinanceSwapLogin {
+    BinanceSwapLogin {
+        api_key: exchange_params.get("access_key").unwrap().clone(),
+        api_secret: exchange_params.get("secret_key").unwrap().clone(),
+    }
+}

+ 5 - 2
strategy/src/exchange_disguise.rs

@@ -4,7 +4,7 @@ use std::sync::atomic::AtomicBool;
 use tokio::sync::Mutex;
 use global::trace_stack::TraceStack;
 use standard::{Depth, Trade};
-use crate::binance_usdt_swap::reference_binance_swap_run;
+use crate::binance_usdt_swap::{binance_swap_run, reference_binance_swap_run};
 use crate::gate_usdt_swap::gate_swap_run;
 use crate::core::Core;
 
@@ -17,6 +17,9 @@ pub async fn run_transactional_exchange(is_shutdown_arc :Arc<AtomicBool>,
                                         is_colo: bool,
                                         exchange_params: BTreeMap<String, String>) {
     match exchange_name.as_str() {
+        "binance_usdt_swap" => {
+            binance_swap_run(is_shutdown_arc, core_arc, name, symbols, is_colo, exchange_params).await;
+        },
         "gate_usdt_swap" => {
             gate_swap_run(is_shutdown_arc, true, core_arc, name, symbols, is_colo, exchange_params).await;
         }
@@ -59,7 +62,7 @@ pub async fn run_reference_exchange(is_shutdown_arc: Arc<AtomicBool>,
                                     exchange_params: BTreeMap<String, String>) {
     match exchange_name.as_str() {
         "binance_usdt_swap" => {
-            reference_binance_swap_run(is_shutdown_arc, core_arc, name, symbols, is_colo, exchange_params).await;
+            reference_binance_swap_run(is_shutdown_arc, core_arc, name, symbols, is_colo).await;
         },
         // "binance_spot" => {
         //     reference_binance_spot_run(is_shutdown_arc, core_arc, name, symbols, is_colo, exchange_params).await;