use std::sync::Arc; use std::sync::atomic::AtomicBool; use tokio::spawn; use tokio::sync::Mutex; use anyhow::Result; use tokio_tungstenite::tungstenite::Message; use tracing::{error, info}; use crate::data_manager::DataManager; use crate::exchange::mexc_spot_ws::{MexcSpotWs, MexcSpotWsSubscribeType, MexcSpotWsType}; use crate::exchange::response_base::Response; use crate::utils::log_setup::setup_logging; pub struct WsManager { pub symbols: Vec, pub data_manager_am: Arc>, pub running: Arc, } impl WsManager { pub fn new(symbols: Vec, data_manager_am: Arc>, running: Arc) -> WsManager { WsManager { symbols: symbols[0..100].to_owned(), data_manager_am, running } } pub async fn subscribe_all(&mut self) -> Result<()> { // 每批最大交易对数量 const BATCH_SIZE: usize = 15; // 计算总共需要多少批次 let num_batches = (self.symbols.len() + BATCH_SIZE - 1) / BATCH_SIZE; let fun = move |response: Response| { if response.code != 200 { error!("{}", serde_json::to_string_pretty(&response.data).unwrap()); } async move {} }; for i in 0..num_batches { // 计算当前批次的起始和结束索引 let start_index = i * BATCH_SIZE; let end_index = (start_index + BATCH_SIZE).min(self.symbols.len()); // 获取当前批次的交易对 let current_batch_symbols = self.symbols[start_index..end_index].to_vec(); info!("正在创建 [{}, {}) 的连接...", start_index, end_index); // 这个通道主要是为了后面给这个ws发送消息 let (write_tx, write_rx) = futures_channel::mpsc::unbounded::(); let write_tx_am = Arc::new(Mutex::new(write_tx)); // 异步去订阅阻塞 let running = self.running.clone(); spawn(async move { let mut ws = MexcSpotWs::new_with_tag("Mexc".to_string(), None, MexcSpotWsType::PublicAndPrivate); ws.set_subscribe(vec![ MexcSpotWsSubscribeType::PuFuturesRecords("Min1".to_string()), MexcSpotWsSubscribeType::PuFuturesDepth ]); ws.set_symbols(current_batch_symbols); // 链接 ws.ws_connect_async(running, fun, &write_tx_am, write_rx) .await .expect("链接失败"); }); } Ok(()) } }