| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- use std::collections::HashMap;
- use std::str::FromStr;
- use rust_decimal::Decimal;
- use rust_decimal::prelude::ToPrimitive;
- use tokio::sync::MutexGuard;
- use standard::{Depth, ForceOrder, Record, SimpleDepth, SpecialDepth, SpecialTrade, Trade};
- use crate::json_db_utils::{generate_file_path, minute_to_date, write_to_file};
- pub type DepthMap = HashMap<String, Vec<SpecialDepth>>;
- pub type SimpleDepthMap = HashMap<String, Vec<SimpleDepth>>;
- pub type TradeMap = HashMap<String, Vec<SpecialTrade>>;
- pub type RecordMap = HashMap<String, Record>;
- pub type ForceOrderMap = HashMap<String, Vec<ForceOrder>>;
- // 更新订单流数据
- pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, TradeMap>, exchange: &str) {
- if let Some(trades) = trades_map.get_mut(new_trade.symbol.as_str()) {
- if let Some(last_trade) = trades.last() {
- // 这里的last_trade.0是以元组形式进行访问。
- let last_trade_minutes = i64::from_str(last_trade.inner()[1].as_str()).unwrap() / 60000; // 将毫秒转换成分钟数
- let new_trade_minutes = new_trade.time.to_i64().unwrap() / 60000; // 同上
- // 如果分钟数不同,则清空列表并添加新的trade
- if last_trade_minutes != new_trade_minutes {
- let trades_json = serde_json::to_string(trades).unwrap();
- let date_str = minute_to_date(last_trade_minutes);
- let path = generate_file_path(exchange, date_str.as_str(), new_trade.symbol.as_str(), "trades", last_trade_minutes);
- // info!(?path);
- write_to_file(trades_json, path).await;
- trades.clear();
- }
- }
- trades.push(SpecialTrade::new(&new_trade));
- } else {
- // 如果该symbol不存在,则创建新的Vec并添加trade
- trades_map.insert(new_trade.symbol.clone(), vec![SpecialTrade::new(&new_trade)]);
- }
- }
- // 更新k线
- pub async fn update_record(new_record: &Record, mut records_map: MutexGuard<'_, RecordMap>, exchange: &str) {
- if new_record.time.eq(&Decimal::ZERO) {
- return;
- }
- // 如果k线记录存在于map,则进行一系列操作,用于保存map
- if let Some(record) = records_map.get_mut(new_record.symbol.as_str()) {
- let last_trade_minutes = record.time.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
- let new_trade_minutes = new_record.time.to_i64().unwrap() / 60000; // 同上
- // 如果分钟数不同,则清空列表并添加新的depth
- if last_trade_minutes != new_trade_minutes {
- let record_json = serde_json::to_string(record).unwrap();
- let date_str = minute_to_date(last_trade_minutes);
- let path = generate_file_path(exchange, date_str.as_str(), new_record.symbol.as_str(), "record", last_trade_minutes);
- write_to_file(record_json, path).await;
- }
- }
- records_map.insert(new_record.symbol.clone(), new_record.clone());
- }
- // 更新爆仓信息
- pub async fn update_force_order(new_force_order: ForceOrder, mut force_order_map: MutexGuard<'_, ForceOrderMap>, exchange: &str) {
- if let Some(force_order_list) = force_order_map.get_mut(new_force_order.symbol.as_str()) {
- if let Some(force_order) = force_order_list.last() {
- let last_force_minutes = force_order.time.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
- let new_force_minutes = new_force_order.time.to_i64().unwrap() / 60000; // 同上
- // 如果分钟数不同,则清空列表并添加新的trade
- if last_force_minutes != new_force_minutes {
- let force_order_list_json = serde_json::to_string(force_order_list).unwrap();
- let date_str = minute_to_date(last_force_minutes);
- let path = generate_file_path(exchange, date_str.as_str(), new_force_order.symbol.as_str(), "force_order", last_force_minutes);
- write_to_file(force_order_list_json, path).await;
- force_order_list.clear();
- }
- }
- force_order_list.push(new_force_order);
- } else {
- // 如果该symbol不存在,则创建新的Vec并添加trade
- force_order_map.insert(new_force_order.symbol.clone(), vec![new_force_order]);
- }
- }
- // 更新深度数据
- pub async fn update_depth(new_depth: &Depth, mut depth_map: MutexGuard<'_, DepthMap>, exchange: &str) {
- if let Some(depths) = depth_map.get_mut(new_depth.symbol.as_str()) {
- if let Some(last_depth) = depths.last() {
- let last_depth_minutes = last_depth.t.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
- let new_depth_minutes = new_depth.time.to_i64().unwrap() / 60000; // 同上
- // 如果分钟数不同,则清空列表并添加新的depth
- if last_depth_minutes != new_depth_minutes {
- let depths_json = serde_json::to_string(depths).unwrap();
- let date_str = minute_to_date(last_depth_minutes);
- let path = generate_file_path(exchange, date_str.as_str(), new_depth.symbol.as_str(), "order_book", last_depth_minutes);
- write_to_file(depths_json, path).await;
- depths.clear();
- }
- }
- depths.push(SpecialDepth::new(&new_depth));
- } else {
- // 如果该symbol不存在,则创建新的Vec并添加depth
- depth_map.insert(new_depth.symbol.clone(), vec![SpecialDepth::new(&new_depth)]);
- }
- }
- // 更新简易深度数据
- pub async fn update_simple_depth(new_depth: &SimpleDepth, mut simple_depth_map: MutexGuard<'_, SimpleDepthMap>, exchange: &str) {
- if let Some(depths) = simple_depth_map.get_mut(new_depth.symbol.as_str()) {
- if let Some(last_depth) = depths.last() {
- let last_depth_minutes = last_depth.time.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
- let new_depth_minutes = new_depth.time.to_i64().unwrap() / 60000; // 同上
- // 如果分钟数不同,则清空列表并添加新的depth
- if last_depth_minutes != new_depth_minutes {
- let depths_json = serde_json::to_string(depths).unwrap();
- let date_str = minute_to_date(last_depth_minutes);
- let path = generate_file_path(exchange, date_str.as_str(), new_depth.symbol.as_str(), "order_book_simple", last_depth_minutes);
- write_to_file(depths_json, path).await;
- depths.clear();
- }
- }
- // 去重
- if let Some(last_depth) = depths.last() {
- if last_depth.size != new_depth.size || last_depth.a1 != new_depth.a1 || last_depth.b1 != new_depth.b1 {
- depths.push(new_depth.clone());
- }
- } else {
- depths.push(new_depth.clone());
- }
- } else {
- // 如果该symbol不存在,则创建新的Vec并添加depth
- simple_depth_map.insert(new_depth.symbol.clone(), vec![new_depth.clone()]);
- }
- }
|