ws_manager.rs 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. use std::sync::Arc;
  2. use std::sync::atomic::AtomicBool;
  3. use tokio::spawn;
  4. use tokio::sync::Mutex;
  5. use anyhow::Result;
  6. use tokio_tungstenite::tungstenite::Message;
  7. use tracing::{error, info};
  8. use crate::data_manager::DataManager;
  9. use crate::exchange::mexc_spot_ws::{MexcSpotWs, MexcSpotWsSubscribeType, MexcSpotWsType};
  10. use crate::exchange::response_base::Response;
  11. use crate::utils::log_setup::setup_logging;
  12. pub struct WsManager {
  13. pub symbols: Vec<String>,
  14. pub data_manager_am: Arc<Mutex<DataManager>>,
  15. pub running: Arc<AtomicBool>,
  16. }
  17. impl WsManager {
  18. pub fn new(symbols: Vec<String>, data_manager_am: Arc<Mutex<DataManager>>, running: Arc<AtomicBool>) -> WsManager {
  19. WsManager {
  20. symbols: symbols[0..100].to_owned(),
  21. data_manager_am,
  22. running
  23. }
  24. }
  25. pub async fn subscribe_all(&mut self) -> Result<()> {
  26. // 每批最大交易对数量
  27. const BATCH_SIZE: usize = 15;
  28. // 计算总共需要多少批次
  29. let num_batches = (self.symbols.len() + BATCH_SIZE - 1) / BATCH_SIZE;
  30. let fun = move |response: Response| {
  31. if response.code != 200 {
  32. error!("{}", serde_json::to_string_pretty(&response.data).unwrap());
  33. }
  34. async move {}
  35. };
  36. for i in 0..num_batches {
  37. // 计算当前批次的起始和结束索引
  38. let start_index = i * BATCH_SIZE;
  39. let end_index = (start_index + BATCH_SIZE).min(self.symbols.len());
  40. // 获取当前批次的交易对
  41. let current_batch_symbols = self.symbols[start_index..end_index].to_vec();
  42. info!("正在创建 [{}, {}) 的连接...", start_index, end_index);
  43. // 这个通道主要是为了后面给这个ws发送消息
  44. let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
  45. let write_tx_am = Arc::new(Mutex::new(write_tx));
  46. // 异步去订阅阻塞
  47. let running = self.running.clone();
  48. spawn(async move {
  49. let mut ws = MexcSpotWs::new_with_tag("Mexc".to_string(), None, MexcSpotWsType::PublicAndPrivate);
  50. ws.set_subscribe(vec![
  51. MexcSpotWsSubscribeType::PuFuturesRecords("Min1".to_string()),
  52. MexcSpotWsSubscribeType::PuFuturesDepth
  53. ]);
  54. ws.set_symbols(current_batch_symbols);
  55. // 链接
  56. ws.ws_connect_async(running, fun, &write_tx_am, write_rx)
  57. .await
  58. .expect("链接失败");
  59. });
  60. }
  61. Ok(())
  62. }
  63. }