|
|
@@ -10,6 +10,7 @@ use serde_json::Value;
|
|
|
use tokio_tungstenite::tungstenite::Message;
|
|
|
use tracing::{error, info};
|
|
|
use crate::data_manager::DataManager;
|
|
|
+use crate::exchange::mexc_spot_client::MexcSpotClient;
|
|
|
use crate::exchange::mexc_spot_ws::{MexcSpotWs, MexcSpotWsSubscribeType, MexcSpotWsType};
|
|
|
use crate::exchange::response_base::Response;
|
|
|
|
|
|
@@ -18,6 +19,7 @@ pub struct WsManager {
|
|
|
pub managers: Arc<Mutex<Vec<Arc<Mutex<DataManager>>>>>,
|
|
|
pub filtered_map: HashMap<String, Value>,
|
|
|
pub running: Arc<AtomicBool>,
|
|
|
+ pub client_am: Arc<Mutex<MexcSpotClient>>,
|
|
|
}
|
|
|
|
|
|
impl WsManager {
|
|
|
@@ -26,6 +28,7 @@ impl WsManager {
|
|
|
symbols,
|
|
|
filtered_map,
|
|
|
running,
|
|
|
+ client_am: Arc::new(tokio::sync::Mutex::new(MexcSpotClient::new_with_tag("MexcSpot".to_string(), None))),
|
|
|
managers: Arc::new(Mutex::new(vec![])),
|
|
|
};
|
|
|
|
|
|
@@ -42,6 +45,7 @@ impl WsManager {
|
|
|
let num_batches = (self.symbols.len() + BATCH_SIZE - 1) / BATCH_SIZE;
|
|
|
|
|
|
for i in 0..num_batches {
|
|
|
+ // 这里是一组数据(至多15个),使用一个DataManager
|
|
|
let data_manager = DataManager::new(self.filtered_map.clone());
|
|
|
let data_manager_am = Arc::new(Mutex::new(data_manager));
|
|
|
|
|
|
@@ -66,6 +70,9 @@ impl WsManager {
|
|
|
|
|
|
let mut dm_guard = dm_clone.lock().await;
|
|
|
dm_guard.record_latency(received_at, origin_timestamp);
|
|
|
+
|
|
|
+ // 提前获取depth全量数据
|
|
|
+ // info!("{}", serde_json::to_string_pretty(&response.data).unwrap());
|
|
|
dm_guard.process_depth_data(response.data).await.unwrap();
|
|
|
}
|
|
|
};
|
|
|
@@ -76,7 +83,29 @@ impl WsManager {
|
|
|
|
|
|
// 获取当前批次的交易对
|
|
|
let current_batch_symbols = self.symbols[start_index..end_index].to_vec();
|
|
|
- info!("正在创建 {:?} 的连接...", current_batch_symbols);
|
|
|
+ info!("正在获取 {:?} 的原始深度数据", current_batch_symbols);
|
|
|
+
|
|
|
+ let mut rest_client = self.client_am.lock().await;
|
|
|
+ for current_batch_symbol in current_batch_symbols.iter() {
|
|
|
+ let symbol = current_batch_symbol.clone();
|
|
|
+ let params = serde_json::json!({
|
|
|
+ "symbol": symbol,
|
|
|
+ "limit": 100,
|
|
|
+ });
|
|
|
+
|
|
|
+ let depth_response = rest_client.depth(params).await;
|
|
|
+
|
|
|
+ if let Some(object_map) = depth_response.data.as_object() {
|
|
|
+ let keys: Vec<String> = object_map.keys().map(|s| s.to_string()).collect();
|
|
|
+ info!("Symbol: {}, Keys in the top-level object: {:?}", symbol, keys);
|
|
|
+ } else {
|
|
|
+ info!("The top-level value is not a JSON object:{:?}", serde_json::to_string_pretty(&depth_response.data)?);
|
|
|
+ }
|
|
|
+
|
|
|
+ tokio::time::sleep(Duration::from_millis(20)).await;
|
|
|
+ }
|
|
|
+
|
|
|
+ info!("正在创建 {:?} 的订阅...", current_batch_symbols);
|
|
|
|
|
|
// 这个通道主要是为了后面给这个ws发送消息
|
|
|
let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
|
|
|
@@ -96,7 +125,7 @@ impl WsManager {
|
|
|
// 链接
|
|
|
ws.ws_connect_async(running, fun, &write_tx_am, write_rx)
|
|
|
.await
|
|
|
- .expect("链接失败");
|
|
|
+ .expect("ws链接失败");
|
|
|
});
|
|
|
}
|
|
|
|