listener_tools.rs 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. use std::collections::HashMap;
  2. use std::str::FromStr;
  3. use rust_decimal::Decimal;
  4. use rust_decimal::prelude::ToPrimitive;
  5. use tokio::sync::MutexGuard;
  6. use standard::{Depth, ForceOrder, Record, SimpleDepth, SpecialDepth, SpecialTrade, Trade};
  7. use crate::json_db_utils::{generate_file_path, minute_to_date, write_to_file};
  8. pub type DepthMap = HashMap<String, Vec<SpecialDepth>>;
  9. pub type SimpleDepthMap = HashMap<String, Vec<SimpleDepth>>;
  10. pub type TradeMap = HashMap<String, Vec<SpecialTrade>>;
  11. pub type RecordMap = HashMap<String, Record>;
  12. pub type ForceOrderMap = HashMap<String, Vec<ForceOrder>>;
  13. // 更新订单流数据
  14. pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, TradeMap>, exchange: &str) {
  15. if let Some(trades) = trades_map.get_mut(new_trade.symbol.as_str()) {
  16. if let Some(last_trade) = trades.last() {
  17. // 这里的last_trade.0是以元组形式进行访问。
  18. let last_trade_minutes = i64::from_str(last_trade.inner()[1].as_str()).unwrap() / 60000; // 将毫秒转换成分钟数
  19. let new_trade_minutes = new_trade.time.to_i64().unwrap() / 60000; // 同上
  20. // 如果分钟数不同,则清空列表并添加新的trade
  21. if last_trade_minutes != new_trade_minutes {
  22. let trades_json = serde_json::to_string(trades).unwrap();
  23. let date_str = minute_to_date(last_trade_minutes);
  24. let path = generate_file_path(exchange, date_str.as_str(), new_trade.symbol.as_str(), "trades", last_trade_minutes);
  25. // info!(?path);
  26. write_to_file(trades_json, path).await;
  27. trades.clear();
  28. }
  29. }
  30. trades.push(SpecialTrade::new(&new_trade));
  31. } else {
  32. // 如果该symbol不存在,则创建新的Vec并添加trade
  33. trades_map.insert(new_trade.symbol.clone(), vec![SpecialTrade::new(&new_trade)]);
  34. }
  35. }
  36. // 更新k线
  37. pub async fn update_record(new_record: &Record, mut records_map: MutexGuard<'_, RecordMap>, exchange: &str) {
  38. if new_record.time.eq(&Decimal::ZERO) {
  39. return;
  40. }
  41. // 如果k线记录存在于map,则进行一系列操作,用于保存map
  42. if let Some(record) = records_map.get_mut(new_record.symbol.as_str()) {
  43. let last_trade_minutes = record.time.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
  44. let new_trade_minutes = new_record.time.to_i64().unwrap() / 60000; // 同上
  45. // 如果分钟数不同,则清空列表并添加新的depth
  46. if last_trade_minutes != new_trade_minutes {
  47. let record_json = serde_json::to_string(record).unwrap();
  48. let date_str = minute_to_date(last_trade_minutes);
  49. let path = generate_file_path(exchange, date_str.as_str(), new_record.symbol.as_str(), "record", last_trade_minutes);
  50. write_to_file(record_json, path).await;
  51. }
  52. }
  53. records_map.insert(new_record.symbol.clone(), new_record.clone());
  54. }
  55. // 更新爆仓信息
  56. pub async fn update_force_order(new_force_order: ForceOrder, mut force_order_map: MutexGuard<'_, ForceOrderMap>, exchange: &str) {
  57. if let Some(force_order_list) = force_order_map.get_mut(new_force_order.symbol.as_str()) {
  58. if let Some(force_order) = force_order_list.last() {
  59. let last_force_minutes = force_order.time.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
  60. let new_force_minutes = new_force_order.time.to_i64().unwrap() / 60000; // 同上
  61. // 如果分钟数不同,则清空列表并添加新的trade
  62. if last_force_minutes != new_force_minutes {
  63. let force_order_list_json = serde_json::to_string(force_order_list).unwrap();
  64. let date_str = minute_to_date(last_force_minutes);
  65. let path = generate_file_path(exchange, date_str.as_str(), new_force_order.symbol.as_str(), "force_order", last_force_minutes);
  66. write_to_file(force_order_list_json, path).await;
  67. force_order_list.clear();
  68. }
  69. }
  70. force_order_list.push(new_force_order);
  71. } else {
  72. // 如果该symbol不存在,则创建新的Vec并添加trade
  73. force_order_map.insert(new_force_order.symbol.clone(), vec![new_force_order]);
  74. }
  75. }
  76. // 更新深度数据
  77. pub async fn update_depth(new_depth: &Depth, mut depth_map: MutexGuard<'_, DepthMap>, exchange: &str) {
  78. if let Some(depths) = depth_map.get_mut(new_depth.symbol.as_str()) {
  79. if let Some(last_depth) = depths.last() {
  80. let last_depth_minutes = last_depth.t.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
  81. let new_depth_minutes = new_depth.time.to_i64().unwrap() / 60000; // 同上
  82. // 如果分钟数不同,则清空列表并添加新的depth
  83. if last_depth_minutes != new_depth_minutes {
  84. let depths_json = serde_json::to_string(depths).unwrap();
  85. let date_str = minute_to_date(last_depth_minutes);
  86. let path = generate_file_path(exchange, date_str.as_str(), new_depth.symbol.as_str(), "order_book", last_depth_minutes);
  87. write_to_file(depths_json, path).await;
  88. depths.clear();
  89. }
  90. }
  91. depths.push(SpecialDepth::new(&new_depth));
  92. } else {
  93. // 如果该symbol不存在,则创建新的Vec并添加depth
  94. depth_map.insert(new_depth.symbol.clone(), vec![SpecialDepth::new(&new_depth)]);
  95. }
  96. }
  97. // 更新简易深度数据
  98. pub async fn update_simple_depth(new_depth: &SimpleDepth, mut simple_depth_map: MutexGuard<'_, SimpleDepthMap>, exchange: &str) {
  99. if let Some(depths) = simple_depth_map.get_mut(new_depth.symbol.as_str()) {
  100. if let Some(last_depth) = depths.last() {
  101. let last_depth_minutes = last_depth.time.to_i64().unwrap() / 60000; // 将毫秒转换成分钟数
  102. let new_depth_minutes = new_depth.time.to_i64().unwrap() / 60000; // 同上
  103. // 如果分钟数不同,则清空列表并添加新的depth
  104. if last_depth_minutes != new_depth_minutes {
  105. let depths_json = serde_json::to_string(depths).unwrap();
  106. let date_str = minute_to_date(last_depth_minutes);
  107. let path = generate_file_path(exchange, date_str.as_str(), new_depth.symbol.as_str(), "order_book_simple", last_depth_minutes);
  108. write_to_file(depths_json, path).await;
  109. depths.clear();
  110. }
  111. }
  112. // 去重
  113. if let Some(last_depth) = depths.last() {
  114. if last_depth.size != new_depth.size || last_depth.a1 != new_depth.a1 || last_depth.b1 != new_depth.b1 {
  115. depths.push(new_depth.clone());
  116. }
  117. } else {
  118. depths.push(new_depth.clone());
  119. }
  120. } else {
  121. // 如果该symbol不存在,则创建新的Vec并添加depth
  122. simple_depth_map.insert(new_depth.symbol.clone(), vec![new_depth.clone()]);
  123. }
  124. }