DESKTOP-NE65RNK\Citrus_limon il y a 1 an
Parent
commit
f77e3b0f9d
3 fichiers modifiés avec 100 ajouts et 100 suppressions
  1. 98 98
      src/bitget_usdt_swap_data_listener.rs
  2. 1 1
      src/main.rs
  3. 1 1
      src/server.rs

+ 98 - 98
src/bitget_usdt_swap_data_listener.rs

@@ -1,98 +1,98 @@
-// use std::collections::{BTreeMap, HashMap};
-// use std::sync::{Arc};
-// use std::sync::atomic::AtomicBool;
-// use lazy_static::lazy_static;
-// use tokio::sync::{Mutex};
-// use tracing::info;
-// use exchanges::bitget_swap_rest::BitgetSwapRest;
-// use exchanges::bitget_swap_ws::{BitgetSwapSubscribeType, BitgetSwapWs, BitgetSwapWsType};
-// use exchanges::response_base::ResponseData;
-// use standard::exchange::ExchangeEnum;
-// use standard::exchange_struct_handler::ExchangeStructHandler;
-// use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
-//
-// const EXCHANGE_NAME: &str = "bitget_usdt_swap";
-//
-// lazy_static! {
-//     // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
-//     static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
-//     static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
-// }
-//
-// pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-//     let name = "bitget_usdt_swap_listener";
-//     // 订阅所有币种
-//     let login = BTreeMap::new();
-//     let mut bitget_rest = BitgetSwapRest::new(false, login);
-//     let response = bitget_rest.get_all_contracts().await;
-//     let mut symbols = vec![];
-//     if response.code == 200 {
-//         let data = response.data.as_array().unwrap();
-//         for info in data {
-//             symbols.push(info["symbol"].as_str().unwrap().to_string())
-//         }
-//     }
-//
-//     // 将 symbols 分成每份20个元素的小块
-//     for chunk in symbols.chunks(20) {
-//         let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-//         let write_tx_am = Arc::new(Mutex::new(write_tx));
-//
-//         let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-//         let ws_name = name.to_string();
-//         let write_tx_clone = Arc::clone(&write_tx_am);
-//         let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-//
-//         tokio::spawn(async move {
-//             let mut ws = BitgetSwapWs::new_with_tag(ws_name, false, None, BitgetSwapWsType::Public);
-//             ws.set_subscribe(vec![
-//                 BitgetSwapSubscribeType::PuTrade,
-//                 BitgetSwapSubscribeType::PuCandle1m,
-//             ]);
-//
-//             // 建立链接
-//             ws.set_symbols(symbols_chunk);
-//             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_clone, write_rx).await.unwrap();
-//         });
-//     }
-// }
-//
-// // 读取数据
-// pub async fn data_listener(response: ResponseData) {
-//     if response.code != 200 {
-//         return;
-//     }
-//
-//     match response.channel.as_str() {
-//         // 深度数据
-//         "深度" => {
-//             // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSwap, &response);
-//             //
-//             // update_depth(&depth).await;
-//         },
-//         // 订单流数据
-//         "trade" => {
-//             let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BitgetSwap, &response);
-//
-//             for trade in trades.iter() {
-//                 let trades_map = TRADES_MAP.lock().await;
-//
-//                 update_trade(&trade, trades_map, EXCHANGE_NAME).await
-//             }
-//         },
-//         // k线数据
-//         "candle1m" => {
-//             let records = ExchangeStructHandler::records_handle(ExchangeEnum::BitgetSwap, &response);
-//
-//             if records.is_empty() {
-//                 return;
-//             }
-//
-//             let record_map= RECORD_MAP.lock().await;
-//             update_record(&records[records.len() - 1], record_map, EXCHANGE_NAME).await;
-//         },
-//         _ => {
-//             info!("48 未知的数据类型: {:?}", response)
-//         }
-//     }
-// }
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc};
+use std::sync::atomic::AtomicBool;
+use lazy_static::lazy_static;
+use tokio::sync::{Mutex};
+use tracing::info;
+use exchanges::bitget_swap_rest::BitgetSwapRest;
+use exchanges::bitget_swap_ws::{BitgetSwapSubscribeType, BitgetSwapWs, BitgetSwapWsType};
+use exchanges::response_base::ResponseData;
+use standard::exchange::ExchangeEnum;
+use standard::exchange_struct_handler::ExchangeStructHandler;
+use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
+
+const EXCHANGE_NAME: &str = "bitget_usdt_swap";
+
+lazy_static! {
+    // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
+    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+    static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
+}
+
+pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+    let name = "bitget_usdt_swap_listener";
+    // 订阅所有币种
+    let login = BTreeMap::new();
+    let mut bitget_rest = BitgetSwapRest::new(false, login);
+    let response = bitget_rest.get_all_contracts().await;
+    let mut symbols = vec![];
+    if response.code == 200 {
+        let data = response.data.as_array().unwrap();
+        for info in data {
+            symbols.push(info["symbol"].as_str().unwrap().to_string())
+        }
+    }
+
+    // 将 symbols 分成每份20个元素的小块
+    for chunk in symbols.chunks(20) {
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+
+        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let ws_name = name.to_string();
+        let write_tx_clone = Arc::clone(&write_tx_am);
+        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+
+        tokio::spawn(async move {
+            let mut ws = BitgetSwapWs::new_with_tag(ws_name, false, None, BitgetSwapWsType::Public);
+            ws.set_subscribe(vec![
+                BitgetSwapSubscribeType::PuTrade,
+                BitgetSwapSubscribeType::PuCandle1m,
+            ]);
+
+            // 建立链接
+            ws.set_symbols(symbols_chunk);
+            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_clone, write_rx).await.unwrap();
+        });
+    }
+}
+
+// 读取数据
+pub async fn data_listener(response: ResponseData) {
+    if response.code != 200 {
+        return;
+    }
+
+    match response.channel.as_str() {
+        // 深度数据
+        "深度" => {
+            // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSwap, &response);
+            //
+            // update_depth(&depth).await;
+        },
+        // 订单流数据
+        "trade" => {
+            let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BitgetSwap, &response);
+
+            for trade in trades.iter() {
+                let trades_map = TRADES_MAP.lock().await;
+
+                update_trade(&trade, trades_map, EXCHANGE_NAME).await
+            }
+        },
+        // k线数据
+        "candle1m" => {
+            let records = ExchangeStructHandler::records_handle(ExchangeEnum::BitgetSwap, &response);
+
+            if records.is_empty() {
+                return;
+            }
+
+            let record_map= RECORD_MAP.lock().await;
+            update_record(&records[records.len() - 1], record_map, EXCHANGE_NAME).await;
+        },
+        _ => {
+            info!("48 未知的数据类型: {:?}", response)
+        }
+    }
+}

+ 1 - 1
src/main.rs

@@ -41,7 +41,6 @@ async fn main() {
     // ctrl c退出检查程序
     control_c::exit_handler(running.clone());
     // 启动各交易所的数据监听器
-    // bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
     // okx_usdt_swap_data_listener::run_listener(running.clone()).await;
     // mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
     // kucoin_usdt_swap_data_listener::run_listener(running.clone()).await;
@@ -56,6 +55,7 @@ async fn main() {
     coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
     htx_usdt_swap_data_listener::run_listener(running.clone()).await;
     phemex_usdt_swap_data_listener::run_listener(running.clone()).await;
+    bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
     // let panic_running = running.clone();
     std::panic::set_hook(Box::new(move |panic_info| {

+ 1 - 1
src/server.rs

@@ -168,7 +168,6 @@ async fn get_trades_count(query: web::Query<ExchangeSpecialQuery>) -> impl Respo
 #[get("/exchanges")]
 async fn get_exchanges() -> impl Responder {
     let exchanges = vec![
-        // "bitget_usdt_swap",
         // "okx_usdt_swap",
         // "bingx_usdt_swap",
         // "mexc_usdt_swap",
@@ -182,6 +181,7 @@ async fn get_exchanges() -> impl Responder {
         "coinex_usdt_swap",
         "htx_usdt_swap",
         "phemex_usdt_swap",
+        "bitget_usdt_swap",
     ];
     let response_data = json!(exchanges);