Procházet zdrojové kódy

恢复了正常版本,以后特殊任务还是搞分支吧……

skyffire před 1 rokem
rodič
revize
7343708c32

+ 33 - 74
src/binance_usdt_swap_data_listener.rs

@@ -1,11 +1,8 @@
 use std::collections::{BTreeMap, HashMap};
 use std::sync::{Arc};
 use std::sync::atomic::AtomicBool;
-use std::time::Duration;
 use lazy_static::lazy_static;
 use rust_decimal::Decimal;
-use rust_decimal_macros::dec;
-use serde_json::{json, Value};
 use tokio::sync::{Mutex, MutexGuard};
 use tracing::info;
 use exchanges::binance_swap_rest::BinanceSwapRest;
@@ -13,7 +10,7 @@ use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, Binanc
 use exchanges::response_base::ResponseData;
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
-use standard::{Depth, OrderBook, SpecialDepth};
+use standard::{Depth, OrderBook};
 use crate::listener_tools::{DepthMap, RecordMap, TradeMap, update_depth, update_record, update_trade};
 const EXCHANGE_NAME: &str = "binance_usdt_swap";
 
@@ -30,78 +27,40 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     // 订阅所有币种
     let login = BTreeMap::new();
     let mut binance_rest = BinanceSwapRest::new(false, login);
-    // let response = binance_rest.get_exchange_info().await;
-    // let mut symbols = vec![];
-    // if response.code == 200 {
-    //     let data = response.data["symbols"].as_array().unwrap();
-    //     for info in data {
-    //         let s = info["symbol"].as_str().unwrap().to_string();
-    //         if !s.ends_with("USDT") {
-    //             continue
-    //         }
-    //         let symbol = s.replace("USDT", "_USDT");
-    //         symbols.push(symbol)
-    //     }
-    // }
-    // info!(?symbols);
-    //
-    // for chunk in symbols.chunks(20) {
-    //     let ws_name = name.to_string();
-    //     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    //     let write_tx_am = Arc::new(Mutex::new(write_tx));
-    //     let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-    //     let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-    //
-    //     tokio::spawn(async move {
-    //         let mut ws = BinanceSwapWs::new_with_tag(ws_name, false, None, BinanceSwapWsType::PublicAndPrivate);
-    //         ws.set_subscribe(vec![
-    //             BinanceSwapSubscribeType::PuAggTrade,
-    //             BinanceSwapSubscribeType::PuKline,
-    //         ]);
-    //
-    //         // 建立链接
-    //         ws.set_symbols(symbols_chunk);
-    //         ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-    //     });
-    // }
-
-    // 2024-5-7任务 单独订阅ETH的全档位深度数据
-    let depth_symbols = vec!["ETH_USDT".to_string()];
-    // 每60s初始化一次深度信息
-    let symbols_clone = depth_symbols.clone();
-    tokio::spawn(async move {
-        loop {
-            for depth_symbol in &symbols_clone {
-                let formated_str = depth_symbol.replace("_", "");
-                let mut response = binance_rest.get_order_book(formated_str.as_str(), 1000).await;
-
-                response.channel = "depthInit".to_string();
-                response.data["a"] = response.data["asks"].clone();
-                response.data["b"] = response.data["bids"].clone();
-                response.data["s"] = json!(formated_str.as_str());
-                response.data["asks"] = Value::Null;
-                response.data["bids"] = Value::Null;
-
-                data_listener(response).await;
+    let response = binance_rest.get_exchange_info().await;
+    let mut symbols = vec![];
+    if response.code == 200 {
+        let data = response.data["symbols"].as_array().unwrap();
+        for info in data {
+            let s = info["symbol"].as_str().unwrap().to_string();
+            if !s.ends_with("USDT") {
+                continue
             }
-
-            tokio::time::sleep(Duration::from_secs(10)).await;
+            let symbol = s.replace("USDT", "_USDT");
+            symbols.push(symbol)
         }
-    });
-    let ws_name = name.to_string();
-    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    let write_tx_am = Arc::new(Mutex::new(write_tx));
-    let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-    tokio::spawn(async move {
-        let mut ws = BinanceSwapWs::new_with_tag(ws_name, false, None, BinanceSwapWsType::PublicAndPrivate);
-        ws.set_subscribe(vec![
-            BinanceSwapSubscribeType::PuDepthUpdate
-        ]);
-
-        // 建立链接
-        ws.set_symbols(depth_symbols);
-        ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-    });
+    }
+    info!(?symbols);
+
+    for chunk in symbols.chunks(20) {
+        let ws_name = name.to_string();
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+
+        tokio::spawn(async move {
+            let mut ws = BinanceSwapWs::new_with_tag(ws_name, false, None, BinanceSwapWsType::PublicAndPrivate);
+            ws.set_subscribe(vec![
+                BinanceSwapSubscribeType::PuAggTrade,
+                BinanceSwapSubscribeType::PuKline,
+            ]);
+
+            // 建立链接
+            ws.set_symbols(symbols_chunk);
+            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        });
+    }
 }
 
 // 读取数据

+ 2 - 2
src/json_db_utils.rs

@@ -137,9 +137,9 @@ pub async fn collect_depth_json(start_timestamp: i64, end_timestamp: i64, exchan
         // 检查文件内容是否成功读取
         if let Ok(content) = file_content {
             // 尝试反序列化文件内容
-            if let Ok(mut depth_list) = serde_json::from_str::<Vec<SpecialDepth>>(&content) {
+            if let Ok(depth_list) = serde_json::from_str::<Vec<SpecialDepth>>(&content) {
                 // info!("{} 找到 1 条", filename);
-                for mut depth in depth_list {
+                for depth in depth_list {
                     // 不在时间范围内的就不要返回了
                     let t = depth.t.to_i64().unwrap();
                     if t < start_timestamp || t > end_timestamp {

+ 3 - 3
src/main.rs

@@ -31,9 +31,9 @@ async fn main() {
     control_c::exit_handler(running.clone());
     // 启动各交易所的数据监听器
     binance_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // gate_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
+    gate_usdt_swap_data_listener::run_listener(running.clone()).await;
+    bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
+    coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
     // let panic_running = running.clone();
     std::panic::set_hook(Box::new(move |panic_info| {

+ 1 - 1
standard/src/exchange_struct_handler.rs

@@ -1,7 +1,7 @@
 use std::str::FromStr;
 use rust_decimal::{Decimal};
 use rust_decimal::prelude::FromPrimitive;
-use tracing::{error, info};
+use tracing::{error};
 use exchanges::response_base::ResponseData;
 use crate::exchange::ExchangeEnum;
 use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, coinex_swap_handle, kucoin_handle};