瀏覽代碼

bitget多个ws订阅,可以避免断线问题。

skyffire 1 年之前
父節點
當前提交
4bc77df71a
共有 5 個文件被更改,包括 27 次插入23 次删除
  1. 0 1
      global/src/log_utils.rs
  2. 19 11
      src/bitget_usdt_swap_data_listener.rs
  3. 7 9
      src/gate_usdt_swap_data_listener.rs
  4. 0 1
      src/listener_tools.rs
  5. 1 1
      src/main.rs

+ 0 - 1
global/src/log_utils.rs

@@ -56,7 +56,6 @@ pub fn init_log_with_info() {
 pub fn final_init(level: &str, port: u32, app_name: String) -> WorkerGuard {
     let mut path = String::new();
     path.push_str("./logs");
-    path.push_str(port.to_string().as_str());
 
     let file_appender = RollingFileAppender::builder()
         .time_zone(8)

+ 19 - 11
src/bitget_usdt_swap_data_listener.rs

@@ -22,8 +22,6 @@ lazy_static! {
 }
 
 pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    let write_tx_am = Arc::new(Mutex::new(write_tx));
     let name = "bitget_usdt_swap_listener";
     // 订阅所有币种
     let login = BTreeMap::new();
@@ -36,19 +34,29 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
             symbols.push(info["symbol"].as_str().unwrap().to_string())
         }
     }
-    info!(?symbols);
 
-    tokio::spawn(async move {
-        let mut ws = BitgetSwapWs::new_with_tag(name.to_string(), false, None, BitgetSwapWsType::Public);
+    // 将 symbols 分成每份20个元素的小块
+    for chunk in symbols.chunks(20) {
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+
+        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let ws_name = name.to_string();
+        let write_tx_clone = Arc::clone(&write_tx_am);
+        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+
+        tokio::spawn(async move {
+            let mut ws = BitgetSwapWs::new_with_tag(ws_name, false, None, BitgetSwapWsType::Public);
             ws.set_subscribe(vec![
                 BitgetSwapSubscribeType::PuTrade,
-                BitgetSwapSubscribeType::PuCandle1m
+                BitgetSwapSubscribeType::PuCandle1m,
             ]);
 
-        // 建立链接
-        ws.set_symbols(symbols);
-        ws.ws_connect_async(is_shutdown_arc, data_listener, &write_tx_am, write_rx).await.unwrap();
-    });
+            // 建立链接
+            ws.set_symbols(symbols_chunk);
+            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_clone, write_rx).await.unwrap();
+        });
+    }
 }
 
 // 读取数据
@@ -76,7 +84,7 @@ pub async fn data_listener(response: ResponseData) {
         },
         // k线数据
         "candle1m" => {
-            // let records = ExchangeStructHandler::records_handle(ExchangeEnum::BitgetSwap, &response);
+            let _records = ExchangeStructHandler::records_handle(ExchangeEnum::BitgetSwap, &response);
             //
             // for record in records.iter() {
             //     info!(?record);

+ 7 - 9
src/gate_usdt_swap_data_listener.rs

@@ -41,7 +41,7 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     tokio::spawn(async move {
         let mut ws = GateSwapWs::new_with_tag(name.to_string(), false, None, GateSwapWsType::PublicAndPrivate("usdt".to_string()));
             ws.set_subscribe(vec![
-                // GateSwapSubscribeType::PuFuturesTrades,
+                GateSwapSubscribeType::PuFuturesTrades,
                 GateSwapSubscribeType::PuFuturesCandlesticks,
                 // GateSwapSubscribeType::PuFuturesOrderBook
             ]);
@@ -69,19 +69,17 @@ pub async fn data_listener(response: ResponseData) {
         "futures.trades" => {
             let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSwap, &response);
 
-            for trade in trades.iter() {
-                let trades_map = TRADES_MAP.lock().await;
+            let trades_map = TRADES_MAP.lock().await;
 
-                update_trade(&trade, trades_map, EXCHANGE_NAME).await
-            }
+            update_trade(&trades[trades.len() - 1], trades_map, EXCHANGE_NAME).await
         },
         // k线数据
         "futures.candlesticks" => {
-            let records = ExchangeStructHandler::records_handle(ExchangeEnum::GateSwap, &response);
+            let _records = ExchangeStructHandler::records_handle(ExchangeEnum::GateSwap, &response);
 
-            for record in records.iter() {
-                info!(?record);
-            }
+            // for record in records.iter() {
+            //     info!(?record);
+            // }
         },
         _ => {
             info!("48 未知的数据类型: {:?}", response)

+ 0 - 1
src/listener_tools.rs

@@ -1,7 +1,6 @@
 use std::str::FromStr;
 use rust_decimal::prelude::ToPrimitive;
 use tokio::sync::MutexGuard;
-use tracing::info;
 use standard::{SpecialTrade, Trade};
 use crate::gate_usdt_swap_data_listener::TradeMap;
 use crate::json_db_utils::{generate_file_path, write_to_file};

+ 1 - 1
src/main.rs

@@ -24,7 +24,7 @@ async fn main() {
     // 掌控全局的关闭
     let running = Arc::new(AtomicBool::new(true));
     // 启动各交易所的数据监听器
-    // gate_usdt_swap_data_listener::run_listener(running.clone()).await;
+    gate_usdt_swap_data_listener::run_listener(running.clone()).await;
     bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
     // let panic_running = running.clone();