Jelajahi Sumber

去掉gate现货usdt限制 注释bitget

DESKTOP-NE65RNK\Citrus_limon 1 tahun lalu
induk
melakukan
4709c2082f
4 mengubah file dengan 103 tambahan dan 102 penghapusan
  1. 98 98
      src/bitget_usdt_swap_data_listener.rs
  2. 3 2
      src/gate_usdt_spot_data_listener.rs
  3. 1 1
      src/main.rs
  4. 1 1
      src/server.rs

+ 98 - 98
src/bitget_usdt_swap_data_listener.rs

@@ -1,98 +1,98 @@
-use std::collections::{BTreeMap, HashMap};
-use std::sync::{Arc};
-use std::sync::atomic::AtomicBool;
-use lazy_static::lazy_static;
-use tokio::sync::{Mutex};
-use tracing::info;
-use exchanges::bitget_swap_rest::BitgetSwapRest;
-use exchanges::bitget_swap_ws::{BitgetSwapSubscribeType, BitgetSwapWs, BitgetSwapWsType};
-use exchanges::response_base::ResponseData;
-use standard::exchange::ExchangeEnum;
-use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
-
-const EXCHANGE_NAME: &str = "bitget_usdt_swap";
-
-lazy_static! {
-    // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
-    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
-    static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
-}
-
-pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-    let name = "bitget_usdt_swap_listener";
-    // 订阅所有币种
-    let login = BTreeMap::new();
-    let mut bitget_rest = BitgetSwapRest::new(false, login);
-    let response = bitget_rest.get_all_contracts().await;
-    let mut symbols = vec![];
-    if response.code == 200 {
-        let data = response.data.as_array().unwrap();
-        for info in data {
-            symbols.push(info["symbol"].as_str().unwrap().to_string())
-        }
-    }
-
-    // 将 symbols 分成每份20个元素的小块
-    for chunk in symbols.chunks(20) {
-        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-        let write_tx_am = Arc::new(Mutex::new(write_tx));
-
-        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-        let ws_name = name.to_string();
-        let write_tx_clone = Arc::clone(&write_tx_am);
-        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-
-        tokio::spawn(async move {
-            let mut ws = BitgetSwapWs::new_with_tag(ws_name, false, None, BitgetSwapWsType::Public);
-            ws.set_subscribe(vec![
-                BitgetSwapSubscribeType::PuTrade,
-                BitgetSwapSubscribeType::PuCandle1m,
-            ]);
-
-            // 建立链接
-            ws.set_symbols(symbols_chunk);
-            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_clone, write_rx).await.unwrap();
-        });
-    }
-}
-
-// 读取数据
-pub async fn data_listener(response: ResponseData) {
-    if response.code != 200 {
-        return;
-    }
-
-    match response.channel.as_str() {
-        // 深度数据
-        "深度" => {
-            // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSwap, &response);
-            //
-            // update_depth(&depth).await;
-        },
-        // 订单流数据
-        "trade" => {
-            let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BitgetSwap, &response);
-
-            for trade in trades.iter() {
-                let trades_map = TRADES_MAP.lock().await;
-
-                update_trade(&trade, trades_map, EXCHANGE_NAME).await
-            }
-        },
-        // k线数据
-        "candle1m" => {
-            let records = ExchangeStructHandler::records_handle(ExchangeEnum::BitgetSwap, &response);
-
-            if records.is_empty() {
-                return;
-            }
-
-            let record_map= RECORD_MAP.lock().await;
-            update_record(&records[records.len() - 1], record_map, EXCHANGE_NAME).await;
-        },
-        _ => {
-            info!("48 未知的数据类型: {:?}", response)
-        }
-    }
-}
+// use std::collections::{BTreeMap, HashMap};
+// use std::sync::{Arc};
+// use std::sync::atomic::AtomicBool;
+// use lazy_static::lazy_static;
+// use tokio::sync::{Mutex};
+// use tracing::info;
+// use exchanges::bitget_swap_rest::BitgetSwapRest;
+// use exchanges::bitget_swap_ws::{BitgetSwapSubscribeType, BitgetSwapWs, BitgetSwapWsType};
+// use exchanges::response_base::ResponseData;
+// use standard::exchange::ExchangeEnum;
+// use standard::exchange_struct_handler::ExchangeStructHandler;
+// use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
+//
+// const EXCHANGE_NAME: &str = "bitget_usdt_swap";
+//
+// lazy_static! {
+//     // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
+//     static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+//     static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
+// }
+//
+// pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+//     let name = "bitget_usdt_swap_listener";
+//     // 订阅所有币种
+//     let login = BTreeMap::new();
+//     let mut bitget_rest = BitgetSwapRest::new(false, login);
+//     let response = bitget_rest.get_all_contracts().await;
+//     let mut symbols = vec![];
+//     if response.code == 200 {
+//         let data = response.data.as_array().unwrap();
+//         for info in data {
+//             symbols.push(info["symbol"].as_str().unwrap().to_string())
+//         }
+//     }
+//
+//     // 将 symbols 分成每份20个元素的小块
+//     for chunk in symbols.chunks(20) {
+//         let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//         let write_tx_am = Arc::new(Mutex::new(write_tx));
+//
+//         let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+//         let ws_name = name.to_string();
+//         let write_tx_clone = Arc::clone(&write_tx_am);
+//         let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+//
+//         tokio::spawn(async move {
+//             let mut ws = BitgetSwapWs::new_with_tag(ws_name, false, None, BitgetSwapWsType::Public);
+//             ws.set_subscribe(vec![
+//                 BitgetSwapSubscribeType::PuTrade,
+//                 BitgetSwapSubscribeType::PuCandle1m,
+//             ]);
+//
+//             // 建立链接
+//             ws.set_symbols(symbols_chunk);
+//             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_clone, write_rx).await.unwrap();
+//         });
+//     }
+// }
+//
+// // 读取数据
+// pub async fn data_listener(response: ResponseData) {
+//     if response.code != 200 {
+//         return;
+//     }
+//
+//     match response.channel.as_str() {
+//         // 深度数据
+//         "深度" => {
+//             // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSwap, &response);
+//             //
+//             // update_depth(&depth).await;
+//         },
+//         // 订单流数据
+//         "trade" => {
+//             let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BitgetSwap, &response);
+//
+//             for trade in trades.iter() {
+//                 let trades_map = TRADES_MAP.lock().await;
+//
+//                 update_trade(&trade, trades_map, EXCHANGE_NAME).await
+//             }
+//         },
+//         // k线数据
+//         "candle1m" => {
+//             let records = ExchangeStructHandler::records_handle(ExchangeEnum::BitgetSwap, &response);
+//
+//             if records.is_empty() {
+//                 return;
+//             }
+//
+//             let record_map= RECORD_MAP.lock().await;
+//             update_record(&records[records.len() - 1], record_map, EXCHANGE_NAME).await;
+//         },
+//         _ => {
+//             info!("48 未知的数据类型: {:?}", response)
+//         }
+//     }
+// }

+ 3 - 2
src/gate_usdt_spot_data_listener.rs

@@ -38,8 +38,9 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
         let mut mul_map = MUL_MAP.lock().await;
         for symbol_info in symbol_infos {
             let trade_status = symbol_info["trade_status"].as_str().unwrap();
-            let quote = symbol_info["quote"].as_str().unwrap();
-            if trade_status != "tradable" || quote != "USDT" { continue; };
+            // let quote = symbol_info["quote"].as_str().unwrap();
+            // if trade_status != "tradable" || quote != "USDT" { continue; };
+            if trade_status != "tradable" { continue; };
             // quanto_multiplier是ct_val
             let symbol = symbol_info["id"].as_str().unwrap().to_string();
             mul_map.insert(symbol.clone(), Decimal::ONE);

+ 1 - 1
src/main.rs

@@ -49,13 +49,13 @@ async fn main() {
     // coinsph_usdt_swap_data_listener::run_listener(running.clone()).await;
     // woo_usdt_swap_data_listener::run_listener(running.clone()).await;
     // cointr_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
 
     binance_usdt_swap_data_listener::run_listener(running.clone()).await;
     gate_usdt_swap_data_listener::run_listener(running.clone()).await;
     coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
     htx_usdt_swap_data_listener::run_listener(running.clone()).await;
     phemex_usdt_swap_data_listener::run_listener(running.clone()).await;
-    bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
     gate_usdt_spot_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
     // let panic_running = running.clone();

+ 1 - 1
src/server.rs

@@ -176,12 +176,12 @@ async fn get_exchanges() -> impl Responder {
         // "coinsph_usdt_swap"
         // "woo_usdt_swap",
         // "cointr_usdt_swap",
+        // "bitget_usdt_swap",
         "gate_usdt_swap",
         "binance_usdt_swap",
         "coinex_usdt_swap",
         "htx_usdt_swap",
         "phemex_usdt_swap",
-        "bitget_usdt_swap",
         "gate_usdt_spot",
     ];
     let response_data = json!(exchanges);