use std::collections::HashMap; use std::str::FromStr; use rust_decimal::Decimal; use rust_decimal::prelude::ToPrimitive; use tokio::sync::MutexGuard; use standard::{Depth, ForceOrder, Record, SimpleDepth, SpecialDepth, SpecialTrade, Trade}; use crate::json_db_utils::{generate_file_path, minute_to_date, write_to_file}; pub type DepthMap = HashMap>; pub type SimpleDepthMap = HashMap>; pub type TradeMap = HashMap>; pub type RecordMap = HashMap; pub type ForceOrderMap = HashMap>; // 更新订单流数据 pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, TradeMap>, exchange: &str) { if let Some(trades) = trades_map.get_mut(new_trade.symbol.as_str()) { if let Some(last_trade) = trades.last() { // 这里的last_trade.0是以元组形式进行访问。 let last_trade_minutes = i64::from_str(last_trade.inner()[1].as_str()).unwrap() / 60000; // 将毫秒转换成分钟数 let new_trade_minutes = new_trade.time.to_i64().unwrap() / 60000; // 同上 // 如果分钟数不同,则清空列表并添加新的trade if last_trade_minutes != new_trade_minutes { let trades_json = serde_json::to_string(trades).unwrap(); let date_str = minute_to_date(last_trade_minutes); let path = generate_file_path(exchange, date_str.as_str(), new_trade.symbol.as_str(), "trades", last_trade_minutes); // info!(?path); write_to_file(trades_json, path).await; trades.clear(); } } trades.push(SpecialTrade::new(&new_trade)); } else { // 如果该symbol不存在,则创建新的Vec并添加trade trades_map.insert(new_trade.symbol.clone(), vec![SpecialTrade::new(&new_trade)]); } } // 更新k线 pub async fn update_record(new_record: &Record, mut records_map: MutexGuard<'_, RecordMap>, exchange: &str) { if new_record.time.eq(&Decimal::ZERO) { return; } // 如果k线记录存在于map,则进行一系列操作,用于保存map if let Some(record) = records_map.get_mut(new_record.symbol.as_str()) { let last_trade_minutes = record.time.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数 let new_trade_minutes = new_record.time.to_i64().unwrap() / 60000; // 同上 // 如果分钟数不同,则清空列表并添加新的depth if last_trade_minutes != new_trade_minutes { let record_json = serde_json::to_string(record).unwrap(); let date_str = minute_to_date(last_trade_minutes); let path = generate_file_path(exchange, date_str.as_str(), new_record.symbol.as_str(), "record", last_trade_minutes); write_to_file(record_json, path).await; } } records_map.insert(new_record.symbol.clone(), new_record.clone()); } // 更新爆仓信息 pub async fn update_force_order(new_force_order: ForceOrder, mut force_order_map: MutexGuard<'_, ForceOrderMap>, exchange: &str) { if let Some(force_order_list) = force_order_map.get_mut(new_force_order.symbol.as_str()) { if let Some(force_order) = force_order_list.last() { let last_force_minutes = force_order.time.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数 let new_force_minutes = new_force_order.time.to_i64().unwrap() / 60000; // 同上 // 如果分钟数不同,则清空列表并添加新的trade if last_force_minutes != new_force_minutes { let force_order_list_json = serde_json::to_string(force_order_list).unwrap(); let date_str = minute_to_date(last_force_minutes); let path = generate_file_path(exchange, date_str.as_str(), new_force_order.symbol.as_str(), "force_order", last_force_minutes); write_to_file(force_order_list_json, path).await; force_order_list.clear(); } } force_order_list.push(new_force_order); } else { // 如果该symbol不存在,则创建新的Vec并添加trade force_order_map.insert(new_force_order.symbol.clone(), vec![new_force_order]); } } // 更新深度数据 pub async fn update_depth(new_depth: &Depth, mut depth_map: MutexGuard<'_, DepthMap>, exchange: &str) { if let Some(depths) = depth_map.get_mut(new_depth.symbol.as_str()) { if let Some(last_depth) = depths.last() { let last_depth_minutes = last_depth.t.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数 let new_depth_minutes = new_depth.time.to_i64().unwrap() / 60000; // 同上 // 如果分钟数不同,则清空列表并添加新的depth if last_depth_minutes != new_depth_minutes { let depths_json = serde_json::to_string(depths).unwrap(); let date_str = minute_to_date(last_depth_minutes); let path = generate_file_path(exchange, date_str.as_str(), new_depth.symbol.as_str(), "order_book", last_depth_minutes); write_to_file(depths_json, path).await; depths.clear(); } } depths.push(SpecialDepth::new(&new_depth)); } else { // 如果该symbol不存在,则创建新的Vec并添加depth depth_map.insert(new_depth.symbol.clone(), vec![SpecialDepth::new(&new_depth)]); } } // 更新简易深度数据 pub async fn update_simple_depth(new_depth: &SimpleDepth, mut simple_depth_map: MutexGuard<'_, SimpleDepthMap>, exchange: &str) { if let Some(depths) = simple_depth_map.get_mut(new_depth.symbol.as_str()) { if let Some(last_depth) = depths.last() { let last_depth_minutes = last_depth.time.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数 let new_depth_minutes = new_depth.time.to_i64().unwrap() / 60000; // 同上 // 如果分钟数不同,则清空列表并添加新的depth if last_depth_minutes != new_depth_minutes { let depths_json = serde_json::to_string(depths).unwrap(); let date_str = minute_to_date(last_depth_minutes); let path = generate_file_path(exchange, date_str.as_str(), new_depth.symbol.as_str(), "order_book_simple", last_depth_minutes); write_to_file(depths_json, path).await; depths.clear(); } } // 去重 if let Some(last_depth) = depths.last() { if last_depth.size != new_depth.size || last_depth.a1 != new_depth.a1 || last_depth.b1 != new_depth.b1 { depths.push(new_depth.clone()); } } else { depths.push(new_depth.clone()); } } else { // 如果该symbol不存在,则创建新的Vec并添加depth simple_depth_map.insert(new_depth.symbol.clone(), vec![new_depth.clone()]); } }