Forráskód Böngészése

1. find_latest_directory方法更新,不再按照操作系统的文件夹创建时间来寻找文件夹,而是改用ASCII码对比标准日期格式的形式。
2. 暂时移除phemex、gate现货、mexc交易所。

skyffire 11 hónapja
szülő
commit
75bd2876fa

+ 148 - 148
src/gate_coin_spot_data_listener.rs

@@ -1,148 +1,148 @@
-use std::collections::{BTreeMap, HashMap};
-use std::sync::{Arc};
-use std::sync::atomic::AtomicBool;
-use std::time::Duration;
-use lazy_static::lazy_static;
-use rust_decimal::Decimal;
-use rust_decimal_macros::dec;
-use tokio::sync::{Mutex};
-use tracing::info;
-use exchanges::gate_spot_rest::GateSpotRest;
-use exchanges::gate_spot_ws::{GateSpotSubscribeType, GateSpotWs, GateSpotWsType};
-use exchanges::response_base::ResponseData;
-use serde_json::json;
-use standard::{Depth, OrderBook, SimpleDepth};
-use standard::exchange::ExchangeEnum;
-use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::json_db_utils::delete_db_by_exchange;
-use crate::listener_tools::{RecordMap, TradeMap, DepthMap, SimpleDepthMap, update_record, update_trade, update_simple_depth};
-
-const EXCHANGE_NAME: &str = "gate_coin_spot";
-
-lazy_static! {
-    static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
-    static ref SIMPLE_DEPTH_MAP: Mutex<SimpleDepthMap> = Mutex::new(HashMap::new());
-    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
-    static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
-    static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
-}
-
-pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-    let name = "gate_coin_spot_listener";
-    // 订阅所有币种
-    let login = BTreeMap::new();
-    let mut gate_rest = GateSpotRest::new(false, login);
-    let params = json!({});
-    let response = gate_rest.get_market_details(params).await;
-    let mut symbols = vec![];
-    if response.code == 200 {
-        let symbol_infos = response.data.as_array().unwrap();
-        let mut mul_map = MUL_MAP.lock().await;
-        for symbol_info in symbol_infos {
-            let trade_status = symbol_info["trade_status"].as_str().unwrap();
-            // let quote = symbol_info["quote"].as_str().unwrap();
-            // if trade_status != "tradable" || quote == "USDT" { continue; };
-            if trade_status != "tradable" { continue; };
-            // quanto_multiplier是ct_val
-            let symbol = symbol_info["id"].as_str().unwrap().to_string();
-            mul_map.insert(symbol.clone(), Decimal::ONE);
-            symbols.push(symbol)
-        }
-    }
-    for chunk in symbols.chunks(20) {
-        let ws_name = name.to_string();
-        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-        let write_tx_am = Arc::new(Mutex::new(write_tx));
-        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-
-        tokio::spawn(async move {
-            let mut ws = GateSpotWs::new_with_tag(ws_name, false, None, GateSpotWsType::PublicAndPrivate);
-            ws.set_subscribe(vec![
-                GateSpotSubscribeType::PuSpotTrades,
-                GateSpotSubscribeType::PuSpotCandlesticks,
-                // GateSpotSubscribeType::PuFuturesOrderBook,
-            ]);
-
-            // 建立链接
-            ws.set_symbols(symbols_chunk);
-            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-        });
-    }
-    // 定时删除数据
-    tokio::spawn(async move {
-        loop {
-            delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
-            tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
-        }
-    });
-}
-
-// 读取数据
-pub async fn data_listener(response: ResponseData) {
-    if response.code != 200 {
-        return;
-    }
-
-    match response.channel.as_str() {
-        // 深度数据
-        "spot.order_book" => {
-            let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSpot, &response);
-            let mul_map = MUL_MAP.lock().await;
-            let new_depth = Depth {
-                time: depth.time,
-                symbol: depth.symbol.clone(),
-                asks: depth.asks.iter().map(|ask| OrderBook { price: ask.price, amount: ask.amount * mul_map[depth.symbol.clone().as_str()] }).collect(),
-                bids: depth.bids.iter().map(|bid| OrderBook { price: bid.price, amount: bid.amount * mul_map[depth.symbol.clone().as_str()] }).collect(),
-            };
-
-            // 更新到本地数据库
-            // ------ 简易深度数据
-            let simple_depth = SimpleDepth::new(&new_depth);
-            let simple_depth_map = SIMPLE_DEPTH_MAP.lock().await;
-            update_simple_depth(&simple_depth, simple_depth_map, EXCHANGE_NAME).await;
-
-            // ------ 标准深度数据
-            // let depth_map = DEPTH_MAP.lock().await;
-            // update_depth(&new_depth, depth_map, EXCHANGE_NAME).await;
-        }
-        // 订单流数据
-        "spot.trades" => {
-            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSpot, &response);
-            let mul_map = MUL_MAP.lock().await;
-
-            for trade in trades.iter_mut() {
-                // 真实交易量处理,因为gate的量都是张数
-                let mul = mul_map[trade.symbol.as_str()];
-                let mut real_size = trade.size * mul * trade.price;
-                real_size.rescale(2);
-                trade.size = real_size;
-
-                // 更新到本地数据库
-                let trades_map = TRADES_MAP.lock().await;
-                update_trade(trade, trades_map, EXCHANGE_NAME).await;
-            }
-        }
-        // k线数据
-        "spot.candlesticks" => {
-            let mut records = ExchangeStructHandler::records_handle(ExchangeEnum::GateSpot, &response);
-
-            let mul_map = MUL_MAP.lock().await;
-            for record in records.iter_mut() {
-                // 真实交易量处理,因为gate的量都是张数
-                let mul = mul_map[record.symbol.as_str()];
-                let mid_price = (record.high + record.low) * dec!(0.5);
-                let mut real_volume = record.volume * mul * mid_price;
-                real_volume.rescale(2);
-                record.volume = real_volume;
-
-                // 更新到本地数据库
-                let record_map = RECORD_MAP.lock().await;
-                update_record(record, record_map, EXCHANGE_NAME).await;
-            }
-        }
-        _ => {
-            info!("48 未知的数据类型: {:?}", response)
-        }
-    }
-}
+// use std::collections::{BTreeMap, HashMap};
+// use std::sync::{Arc};
+// use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
+// use lazy_static::lazy_static;
+// use rust_decimal::Decimal;
+// use rust_decimal_macros::dec;
+// use tokio::sync::{Mutex};
+// use tracing::info;
+// use exchanges::gate_spot_rest::GateSpotRest;
+// use exchanges::gate_spot_ws::{GateSpotSubscribeType, GateSpotWs, GateSpotWsType};
+// use exchanges::response_base::ResponseData;
+// use serde_json::json;
+// use standard::{Depth, OrderBook, SimpleDepth};
+// use standard::exchange::ExchangeEnum;
+// use standard::exchange_struct_handler::ExchangeStructHandler;
+// use crate::json_db_utils::delete_db_by_exchange;
+// use crate::listener_tools::{RecordMap, TradeMap, DepthMap, SimpleDepthMap, update_record, update_trade, update_simple_depth};
+//
+// const EXCHANGE_NAME: &str = "gate_coin_spot";
+//
+// lazy_static! {
+//     static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
+//     static ref SIMPLE_DEPTH_MAP: Mutex<SimpleDepthMap> = Mutex::new(HashMap::new());
+//     static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+//     static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
+//     static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
+// }
+//
+// pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+//     let name = "gate_coin_spot_listener";
+//     // 订阅所有币种
+//     let login = BTreeMap::new();
+//     let mut gate_rest = GateSpotRest::new(false, login);
+//     let params = json!({});
+//     let response = gate_rest.get_market_details(params).await;
+//     let mut symbols = vec![];
+//     if response.code == 200 {
+//         let symbol_infos = response.data.as_array().unwrap();
+//         let mut mul_map = MUL_MAP.lock().await;
+//         for symbol_info in symbol_infos {
+//             let trade_status = symbol_info["trade_status"].as_str().unwrap();
+//             // let quote = symbol_info["quote"].as_str().unwrap();
+//             // if trade_status != "tradable" || quote == "USDT" { continue; };
+//             if trade_status != "tradable" { continue; };
+//             // quanto_multiplier是ct_val
+//             let symbol = symbol_info["id"].as_str().unwrap().to_string();
+//             mul_map.insert(symbol.clone(), Decimal::ONE);
+//             symbols.push(symbol)
+//         }
+//     }
+//     for chunk in symbols.chunks(20) {
+//         let ws_name = name.to_string();
+//         let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//         let write_tx_am = Arc::new(Mutex::new(write_tx));
+//         let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+//         let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+//
+//         tokio::spawn(async move {
+//             let mut ws = GateSpotWs::new_with_tag(ws_name, false, None, GateSpotWsType::PublicAndPrivate);
+//             ws.set_subscribe(vec![
+//                 GateSpotSubscribeType::PuSpotTrades,
+//                 GateSpotSubscribeType::PuSpotCandlesticks,
+//                 // GateSpotSubscribeType::PuFuturesOrderBook,
+//             ]);
+//
+//             // 建立链接
+//             ws.set_symbols(symbols_chunk);
+//             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//         });
+//     }
+//     // 定时删除数据
+//     tokio::spawn(async move {
+//         loop {
+//             delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+//             tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+//         }
+//     });
+// }
+//
+// // 读取数据
+// pub async fn data_listener(response: ResponseData) {
+//     if response.code != 200 {
+//         return;
+//     }
+//
+//     match response.channel.as_str() {
+//         // 深度数据
+//         "spot.order_book" => {
+//             let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSpot, &response);
+//             let mul_map = MUL_MAP.lock().await;
+//             let new_depth = Depth {
+//                 time: depth.time,
+//                 symbol: depth.symbol.clone(),
+//                 asks: depth.asks.iter().map(|ask| OrderBook { price: ask.price, amount: ask.amount * mul_map[depth.symbol.clone().as_str()] }).collect(),
+//                 bids: depth.bids.iter().map(|bid| OrderBook { price: bid.price, amount: bid.amount * mul_map[depth.symbol.clone().as_str()] }).collect(),
+//             };
+//
+//             // 更新到本地数据库
+//             // ------ 简易深度数据
+//             let simple_depth = SimpleDepth::new(&new_depth);
+//             let simple_depth_map = SIMPLE_DEPTH_MAP.lock().await;
+//             update_simple_depth(&simple_depth, simple_depth_map, EXCHANGE_NAME).await;
+//
+//             // ------ 标准深度数据
+//             // let depth_map = DEPTH_MAP.lock().await;
+//             // update_depth(&new_depth, depth_map, EXCHANGE_NAME).await;
+//         }
+//         // 订单流数据
+//         "spot.trades" => {
+//             let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSpot, &response);
+//             let mul_map = MUL_MAP.lock().await;
+//
+//             for trade in trades.iter_mut() {
+//                 // 真实交易量处理,因为gate的量都是张数
+//                 let mul = mul_map[trade.symbol.as_str()];
+//                 let mut real_size = trade.size * mul * trade.price;
+//                 real_size.rescale(2);
+//                 trade.size = real_size;
+//
+//                 // 更新到本地数据库
+//                 let trades_map = TRADES_MAP.lock().await;
+//                 update_trade(trade, trades_map, EXCHANGE_NAME).await;
+//             }
+//         }
+//         // k线数据
+//         "spot.candlesticks" => {
+//             let mut records = ExchangeStructHandler::records_handle(ExchangeEnum::GateSpot, &response);
+//
+//             let mul_map = MUL_MAP.lock().await;
+//             for record in records.iter_mut() {
+//                 // 真实交易量处理,因为gate的量都是张数
+//                 let mul = mul_map[record.symbol.as_str()];
+//                 let mid_price = (record.high + record.low) * dec!(0.5);
+//                 let mut real_volume = record.volume * mul * mid_price;
+//                 real_volume.rescale(2);
+//                 record.volume = real_volume;
+//
+//                 // 更新到本地数据库
+//                 let record_map = RECORD_MAP.lock().await;
+//                 update_record(record, record_map, EXCHANGE_NAME).await;
+//             }
+//         }
+//         _ => {
+//             info!("48 未知的数据类型: {:?}", response)
+//         }
+//     }
+// }

+ 11 - 7
src/json_db_utils.rs

@@ -258,23 +258,27 @@ async fn delete_directory(target_directory: &str) {
 }
 
 fn find_latest_directory(path: &Path) -> std::io::Result<Option<PathBuf>> {
-    let mut latest: Option<(PathBuf, std::time::SystemTime)> = None;
+    let mut latest: Option<PathBuf> = None;
 
     for entry in std::fs::read_dir(path)? {
         let entry = entry?;
         let path = entry.path();
+
+        // 仅处理目录
         if path.is_dir() {
-            if let Ok(metadata) = entry.metadata() {
-                if let Ok(modified) = metadata.modified() {
-                    if latest.is_none() || modified > latest.as_ref().unwrap().1 {
-                        latest = Some((path, modified));
-                    }
+            // 更新最新目录的逻辑
+            if let Some(ref latest_path) = latest {
+                // 比较当前目录名与最新目录名
+                if path.file_name().unwrap() > latest_path.file_name().unwrap() {
+                    latest = Some(path);
                 }
+            } else {
+                latest = Some(path);
             }
         }
     }
 
-    Ok(latest.map(|l| l.0))
+    Ok(latest)
 }
 
 fn list_directories(path: &Path) -> std::io::Result<Vec<PathBuf>> {

+ 3 - 3
src/main.rs

@@ -56,9 +56,9 @@ async fn main() {
     binance_usdt_swap_data_listener::run_listener(running.clone()).await;
     gate_usdt_swap_data_listener::run_listener(running.clone()).await;
     coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
-    phemex_usdt_swap_data_listener::run_listener(running.clone()).await;
-    gate_coin_spot_data_listener::run_listener(running.clone()).await;
-    mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // phemex_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // gate_coin_spot_data_listener::run_listener(running.clone()).await;
+    // mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
     bybit_usdt_swap_data_listener::run_listener(running.clone()).await;
     bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出

+ 131 - 131
src/mexc_usdt_swap_data_listener.rs

@@ -1,131 +1,131 @@
-use std::collections::{BTreeMap, HashMap};
-use std::sync::{Arc};
-use std::sync::atomic::AtomicBool;
-use std::time::Duration;
-use lazy_static::lazy_static;
-use rust_decimal::Decimal;
-use rust_decimal_macros::dec;
-use tokio::sync::{Mutex};
-use tracing::info;
-use exchanges::mexc_swap_rest::MexcSwapRest;
-use exchanges::mexc_swap_ws::{MexcSwapSubscribeType, MexcSwapWs, MexcSwapWsType};
-use exchanges::response_base::ResponseData;
-use rust_decimal::prelude::FromPrimitive;
-use serde_json::json;
-use standard::exchange::ExchangeEnum;
-use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::json_db_utils::delete_db_by_exchange;
-use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
-
-const EXCHANGE_NAME: &str = "mexc_usdt_swap";
-
-lazy_static! {
-    // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
-    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
-    static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
-    static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
-}
-
-pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-    let name = "mexc_usdt_swap_listener";
-    // 订阅所有币种
-    let login = BTreeMap::new();
-    let mut mexc_rest = MexcSwapRest::new(false, login);
-    let params = json!({});
-    let response = mexc_rest.get_market(params).await;
-    let mut symbols = vec![];
-    if response.code == 200 {
-        let symbol_infos = response.data.as_array().unwrap();
-        let mut mul_map = MUL_MAP.lock().await;
-        for symbol_info in symbol_infos {
-            // quanto_multiplier是ct_val
-            let symbol = symbol_info["symbol"].as_str().unwrap().to_string();
-            let mul = Decimal::from_f64(symbol_info["contractSize"].as_f64().unwrap()).unwrap();
-            mul_map.insert(symbol.clone(), mul);
-
-            symbols.push(symbol)
-        }
-    }
-
-    for chunk in symbols.chunks(20) {
-        let ws_name = name.to_string();
-        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-        let write_tx_am = Arc::new(Mutex::new(write_tx));
-        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-
-        tokio::spawn(async move {
-            let mut ws = MexcSwapWs::new_with_tag(ws_name, false, None, MexcSwapWsType::PublicAndPrivate);
-            ws.set_subscribe(vec![
-                MexcSwapSubscribeType::PuFuturesTrades,
-                MexcSwapSubscribeType::PuFuturesRecords,
-                // MexcSwapSubscribeType::PuFuturesOrderBook
-            ]);
-
-            // 建立链接
-            ws.set_symbols(symbols_chunk);
-            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-        });
-    }
-    // 定时删除数据
-    tokio::spawn(async move {
-        loop {
-            delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
-            tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
-        }
-    });
-}
-
-// 读取数据
-pub async fn data_listener(response: ResponseData) {
-    if response.code != 200 {
-        return;
-    }
-
-    match response.channel.as_str() {
-        // 深度数据
-        "futures.order_book" => {
-            // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::MexcSwap, &response);
-            //
-            // update_depth(&depth).await;
-        }
-        // 订单流数据
-        "futures.trades" => {
-            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::MexcSwap, &response);
-            let mul_map = MUL_MAP.lock().await;
-
-            for trade in trades.iter_mut() {
-                // 真实交易量处理,因为mexc的量都是张数
-                let mul = mul_map[trade.symbol.as_str()];
-                let mut real_size = trade.size * mul * trade.price;
-                real_size.rescale(2);
-                trade.size = real_size;
-
-                // 更新到本地数据库
-                let trades_map = TRADES_MAP.lock().await;
-                update_trade(trade, trades_map, EXCHANGE_NAME).await;
-            }
-        }
-        // k线数据
-        "futures.candlesticks" => {
-            let mut records = ExchangeStructHandler::records_handle(ExchangeEnum::MexcSwap, &response);
-
-            let mul_map = MUL_MAP.lock().await;
-            for record in records.iter_mut() {
-                // 真实交易量处理,因为mexc的量都是张数
-                let mul = mul_map[record.symbol.as_str()];
-                let mid_price = (record.high + record.low) * dec!(0.5);
-                let mut real_volume = record.volume * mul * mid_price;
-                real_volume.rescale(2);
-                record.volume = real_volume;
-
-                // 更新到本地数据库
-                let record_map = RECORD_MAP.lock().await;
-                update_record(record, record_map, EXCHANGE_NAME).await;
-            }
-        }
-        _ => {
-            info!("48 未知的数据类型: {:?}", response)
-        }
-    }
-}
+// use std::collections::{BTreeMap, HashMap};
+// use std::sync::{Arc};
+// use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
+// use lazy_static::lazy_static;
+// use rust_decimal::Decimal;
+// use rust_decimal_macros::dec;
+// use tokio::sync::{Mutex};
+// use tracing::info;
+// use exchanges::mexc_swap_rest::MexcSwapRest;
+// use exchanges::mexc_swap_ws::{MexcSwapSubscribeType, MexcSwapWs, MexcSwapWsType};
+// use exchanges::response_base::ResponseData;
+// use rust_decimal::prelude::FromPrimitive;
+// use serde_json::json;
+// use standard::exchange::ExchangeEnum;
+// use standard::exchange_struct_handler::ExchangeStructHandler;
+// use crate::json_db_utils::delete_db_by_exchange;
+// use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
+//
+// const EXCHANGE_NAME: &str = "mexc_usdt_swap";
+//
+// lazy_static! {
+//     // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
+//     static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+//     static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
+//     static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
+// }
+//
+// pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+//     let name = "mexc_usdt_swap_listener";
+//     // 订阅所有币种
+//     let login = BTreeMap::new();
+//     let mut mexc_rest = MexcSwapRest::new(false, login);
+//     let params = json!({});
+//     let response = mexc_rest.get_market(params).await;
+//     let mut symbols = vec![];
+//     if response.code == 200 {
+//         let symbol_infos = response.data.as_array().unwrap();
+//         let mut mul_map = MUL_MAP.lock().await;
+//         for symbol_info in symbol_infos {
+//             // quanto_multiplier是ct_val
+//             let symbol = symbol_info["symbol"].as_str().unwrap().to_string();
+//             let mul = Decimal::from_f64(symbol_info["contractSize"].as_f64().unwrap()).unwrap();
+//             mul_map.insert(symbol.clone(), mul);
+//
+//             symbols.push(symbol)
+//         }
+//     }
+//
+//     for chunk in symbols.chunks(20) {
+//         let ws_name = name.to_string();
+//         let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//         let write_tx_am = Arc::new(Mutex::new(write_tx));
+//         let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+//         let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+//
+//         tokio::spawn(async move {
+//             let mut ws = MexcSwapWs::new_with_tag(ws_name, false, None, MexcSwapWsType::PublicAndPrivate);
+//             ws.set_subscribe(vec![
+//                 MexcSwapSubscribeType::PuFuturesTrades,
+//                 MexcSwapSubscribeType::PuFuturesRecords,
+//                 // MexcSwapSubscribeType::PuFuturesOrderBook
+//             ]);
+//
+//             // 建立链接
+//             ws.set_symbols(symbols_chunk);
+//             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//         });
+//     }
+//     // 定时删除数据
+//     tokio::spawn(async move {
+//         loop {
+//             delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+//             tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+//         }
+//     });
+// }
+//
+// // 读取数据
+// pub async fn data_listener(response: ResponseData) {
+//     if response.code != 200 {
+//         return;
+//     }
+//
+//     match response.channel.as_str() {
+//         // 深度数据
+//         "futures.order_book" => {
+//             // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::MexcSwap, &response);
+//             //
+//             // update_depth(&depth).await;
+//         }
+//         // 订单流数据
+//         "futures.trades" => {
+//             let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::MexcSwap, &response);
+//             let mul_map = MUL_MAP.lock().await;
+//
+//             for trade in trades.iter_mut() {
+//                 // 真实交易量处理,因为mexc的量都是张数
+//                 let mul = mul_map[trade.symbol.as_str()];
+//                 let mut real_size = trade.size * mul * trade.price;
+//                 real_size.rescale(2);
+//                 trade.size = real_size;
+//
+//                 // 更新到本地数据库
+//                 let trades_map = TRADES_MAP.lock().await;
+//                 update_trade(trade, trades_map, EXCHANGE_NAME).await;
+//             }
+//         }
+//         // k线数据
+//         "futures.candlesticks" => {
+//             let mut records = ExchangeStructHandler::records_handle(ExchangeEnum::MexcSwap, &response);
+//
+//             let mul_map = MUL_MAP.lock().await;
+//             for record in records.iter_mut() {
+//                 // 真实交易量处理,因为mexc的量都是张数
+//                 let mul = mul_map[record.symbol.as_str()];
+//                 let mid_price = (record.high + record.low) * dec!(0.5);
+//                 let mut real_volume = record.volume * mul * mid_price;
+//                 real_volume.rescale(2);
+//                 record.volume = real_volume;
+//
+//                 // 更新到本地数据库
+//                 let record_map = RECORD_MAP.lock().await;
+//                 update_record(record, record_map, EXCHANGE_NAME).await;
+//             }
+//         }
+//         _ => {
+//             info!("48 未知的数据类型: {:?}", response)
+//         }
+//     }
+// }

+ 131 - 131
src/phemex_usdt_swap_data_listener.rs

@@ -1,131 +1,131 @@
-use std::collections::{BTreeMap, HashMap};
-use std::sync::{Arc};
-use std::sync::atomic::AtomicBool;
-use std::time::Duration;
-use lazy_static::lazy_static;
-use rust_decimal::Decimal;
-use rust_decimal_macros::dec;
-use tokio::sync::{Mutex};
-use exchanges::phemex_swap_rest::PhemexSwapRest;
-use exchanges::phemex_swap_ws::{PhemexSwapSubscribeType, PhemexSwapWs, PhemexSwapWsType};
-use exchanges::response_base::ResponseData;
-use serde_json::{json};
-use standard::exchange::ExchangeEnum;
-use standard::exchange_struct_handler::ExchangeStructHandler;
-use standard::utils::symbol_out_mapper;
-use crate::json_db_utils::delete_db_by_exchange;
-use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
-
-const EXCHANGE_NAME: &str = "phemex_usdt_swap";
-
-lazy_static! {
-    // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
-    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
-    static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
-    static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
-}
-
-pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-    let name = "phemex_usdt_swap_listener";
-    // 订阅所有币种
-    let login = BTreeMap::new();
-    let mut phemex_rest = PhemexSwapRest::new(false, login);
-    let params = json!({});
-    let response = phemex_rest.get_market(params).await;
-    let mut symbols = vec![];
-    if response.code == 200 {
-        let symbol_infos = response.data["perpProductsV2"].as_array().unwrap();
-        let mut mul_map = MUL_MAP.lock().await;
-        for symbol_info in symbol_infos {
-            if symbol_info["quoteCurrency"] != "USDT" || symbol_info["status"] == "Delisted" { continue; }
-            // quanto_multiplier是ct_val
-            let symbol = symbol_info["symbol"].as_str().unwrap().to_string();
-            let base_currency: String = symbol_info["baseCurrency"].as_str().unwrap().to_string().split_whitespace().collect();
-            let loc_symbol = format!("{}_{}", base_currency, symbol_info["quoteCurrency"].as_str().unwrap());
-            let mul = Decimal::ONE;
-            mul_map.insert(symbol_out_mapper(ExchangeEnum::PhemexSwap, &loc_symbol), mul);
-
-            symbols.push(symbol)
-        }
-    }
-    for chunk in symbols.chunks(20) {
-        let ws_name = name.to_string();
-        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-        let write_tx_am = Arc::new(Mutex::new(write_tx));
-        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-
-        tokio::spawn(async move {
-            let mut ws = PhemexSwapWs::new_with_tag(ws_name, false, None, PhemexSwapWsType::PublicAndPrivate).clone();
-            ws.set_subscribe(vec![
-                PhemexSwapSubscribeType::PuFuturesTrades,
-                PhemexSwapSubscribeType::PuFuturesRecords,
-                // PhemexSwapSubscribeType::PuFuturesOrderBook
-            ]);
-
-            // 建立链接
-            ws.set_symbols(symbols_chunk);
-            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-        });
-    }
-    // 定时删除数据
-    tokio::spawn(async move {
-        loop {
-            delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
-            tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
-        }
-    });
-}
-
-// 读取数据
-pub async fn data_listener(response: ResponseData) {
-    if response.code != 200 {
-        return;
-    }
-
-    match response.channel.as_str() {
-        // 深度数据
-        "futures.order_book" => {
-            // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::PhemexSwap, &response);
-            //
-            // update_depth(&depth).await;
-        }
-        // 订单流数据
-        "futures.trades" => {
-            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::PhemexSwap, &response);
-            let mul_map = MUL_MAP.lock().await;
-
-            for trade in trades.iter_mut() {
-                // 真实交易量处理,因为phemex的量都是张数
-                let mul = mul_map[trade.symbol.as_str()];
-                let mut real_size = trade.size * mul * trade.price;
-                real_size.rescale(2);
-                trade.size = real_size;
-
-                // 更新到本地数据库
-                let trades_map = TRADES_MAP.lock().await;
-                update_trade(trade, trades_map, EXCHANGE_NAME).await;
-            }
-        }
-        // k线数据
-        "futures.candlesticks" => {
-            let mut records = ExchangeStructHandler::records_handle(ExchangeEnum::PhemexSwap, &response);
-            let mul_map = MUL_MAP.lock().await;
-            for record in records.iter_mut() {
-                // 真实交易量处理,因为phemex的量都是张数
-                let mul = mul_map[record.symbol.as_str()];
-                let mid_price = (record.high + record.low) * dec!(0.5);
-                let mut real_volume = record.volume * mul * mid_price;
-                real_volume.rescale(2);
-                record.volume = real_volume;
-
-                // 更新到本地数据库
-                let record_map = RECORD_MAP.lock().await;
-                update_record(record, record_map, EXCHANGE_NAME).await;
-            }
-        }
-        _ => {
-            // info!("48 未知的数据类型: {:?}", response)
-        }
-    }
-}
+// use std::collections::{BTreeMap, HashMap};
+// use std::sync::{Arc};
+// use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
+// use lazy_static::lazy_static;
+// use rust_decimal::Decimal;
+// use rust_decimal_macros::dec;
+// use tokio::sync::{Mutex};
+// use exchanges::phemex_swap_rest::PhemexSwapRest;
+// use exchanges::phemex_swap_ws::{PhemexSwapSubscribeType, PhemexSwapWs, PhemexSwapWsType};
+// use exchanges::response_base::ResponseData;
+// use serde_json::{json};
+// use standard::exchange::ExchangeEnum;
+// use standard::exchange_struct_handler::ExchangeStructHandler;
+// use standard::utils::symbol_out_mapper;
+// use crate::json_db_utils::delete_db_by_exchange;
+// use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
+//
+// const EXCHANGE_NAME: &str = "phemex_usdt_swap";
+//
+// lazy_static! {
+//     // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
+//     static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+//     static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
+//     static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
+// }
+//
+// pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+//     let name = "phemex_usdt_swap_listener";
+//     // 订阅所有币种
+//     let login = BTreeMap::new();
+//     let mut phemex_rest = PhemexSwapRest::new(false, login);
+//     let params = json!({});
+//     let response = phemex_rest.get_market(params).await;
+//     let mut symbols = vec![];
+//     if response.code == 200 {
+//         let symbol_infos = response.data["perpProductsV2"].as_array().unwrap();
+//         let mut mul_map = MUL_MAP.lock().await;
+//         for symbol_info in symbol_infos {
+//             if symbol_info["quoteCurrency"] != "USDT" || symbol_info["status"] == "Delisted" { continue; }
+//             // quanto_multiplier是ct_val
+//             let symbol = symbol_info["symbol"].as_str().unwrap().to_string();
+//             let base_currency: String = symbol_info["baseCurrency"].as_str().unwrap().to_string().split_whitespace().collect();
+//             let loc_symbol = format!("{}_{}", base_currency, symbol_info["quoteCurrency"].as_str().unwrap());
+//             let mul = Decimal::ONE;
+//             mul_map.insert(symbol_out_mapper(ExchangeEnum::PhemexSwap, &loc_symbol), mul);
+//
+//             symbols.push(symbol)
+//         }
+//     }
+//     for chunk in symbols.chunks(20) {
+//         let ws_name = name.to_string();
+//         let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//         let write_tx_am = Arc::new(Mutex::new(write_tx));
+//         let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+//         let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+//
+//         tokio::spawn(async move {
+//             let mut ws = PhemexSwapWs::new_with_tag(ws_name, false, None, PhemexSwapWsType::PublicAndPrivate).clone();
+//             ws.set_subscribe(vec![
+//                 PhemexSwapSubscribeType::PuFuturesTrades,
+//                 PhemexSwapSubscribeType::PuFuturesRecords,
+//                 // PhemexSwapSubscribeType::PuFuturesOrderBook
+//             ]);
+//
+//             // 建立链接
+//             ws.set_symbols(symbols_chunk);
+//             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//         });
+//     }
+//     // 定时删除数据
+//     tokio::spawn(async move {
+//         loop {
+//             delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+//             tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+//         }
+//     });
+// }
+//
+// // 读取数据
+// pub async fn data_listener(response: ResponseData) {
+//     if response.code != 200 {
+//         return;
+//     }
+//
+//     match response.channel.as_str() {
+//         // 深度数据
+//         "futures.order_book" => {
+//             // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::PhemexSwap, &response);
+//             //
+//             // update_depth(&depth).await;
+//         }
+//         // 订单流数据
+//         "futures.trades" => {
+//             let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::PhemexSwap, &response);
+//             let mul_map = MUL_MAP.lock().await;
+//
+//             for trade in trades.iter_mut() {
+//                 // 真实交易量处理,因为phemex的量都是张数
+//                 let mul = mul_map[trade.symbol.as_str()];
+//                 let mut real_size = trade.size * mul * trade.price;
+//                 real_size.rescale(2);
+//                 trade.size = real_size;
+//
+//                 // 更新到本地数据库
+//                 let trades_map = TRADES_MAP.lock().await;
+//                 update_trade(trade, trades_map, EXCHANGE_NAME).await;
+//             }
+//         }
+//         // k线数据
+//         "futures.candlesticks" => {
+//             let mut records = ExchangeStructHandler::records_handle(ExchangeEnum::PhemexSwap, &response);
+//             let mul_map = MUL_MAP.lock().await;
+//             for record in records.iter_mut() {
+//                 // 真实交易量处理,因为phemex的量都是张数
+//                 let mul = mul_map[record.symbol.as_str()];
+//                 let mid_price = (record.high + record.low) * dec!(0.5);
+//                 let mut real_volume = record.volume * mul * mid_price;
+//                 real_volume.rescale(2);
+//                 record.volume = real_volume;
+//
+//                 // 更新到本地数据库
+//                 let record_map = RECORD_MAP.lock().await;
+//                 update_record(record, record_map, EXCHANGE_NAME).await;
+//             }
+//         }
+//         _ => {
+//             // info!("48 未知的数据类型: {:?}", response)
+//         }
+//     }
+// }

+ 3 - 3
src/server.rs

@@ -177,13 +177,13 @@ async fn get_exchanges() -> impl Responder {
         // "woo_usdt_swap",
         // "cointr_usdt_swap",
         // "gate_usdt_spot",
+        // "phemex_usdt_swap",
+        // "mexc_usdt_swap",
+        // "gate_coin_spot",
 
         "gate_usdt_swap",
         "binance_usdt_swap",
         "coinex_usdt_swap",
-        "phemex_usdt_swap",
-        "mexc_usdt_swap",
-        "gate_coin_spot",
         "bybit_usdt_swap",
         "bitget_usdt_swap",
     ];