msv.rs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. use std::collections::BTreeMap;
  2. use std::str::FromStr;
  3. use actix_web::{HttpResponse};
  4. use chrono::Utc;
  5. use rust_decimal::Decimal;
  6. use rust_decimal::prelude::ToPrimitive;
  7. use rust_decimal_macros::dec;
  8. use serde_json::{json, Value};
  9. use crate::db_connector::get_trades_json;
  10. use crate::params_utils::{get_str, parse_str_to_decimal};
  11. use crate::server::{Response, Trade};
  /// Normalizes a trading symbol.
  ///
  /// The symbol is upper-cased; if the upper-cased text does not already contain
  /// `_USDT`, that suffix is appended.
  pub fn symbol_fix(symbol: &str) -> String {
      let upper = symbol.to_uppercase();
      if upper.contains("_USDT") {
          upper
      } else {
          upper + "_USDT"
      }
  }
  19. // 将trades_json转换为指标
  20. pub async fn generate_msv(query_value: Value) -> HttpResponse {
  21. // 参数处理
  22. let exchange = match get_str(query_value.clone(), "exchange") {
  23. Ok(str) => {
  24. str
  25. }
  26. Err(response) => {
  27. return response
  28. }
  29. };
  30. let symbol = match get_str(query_value.clone(), "symbol") {
  31. Ok(symbol) => {
  32. symbol_fix(symbol.as_str())
  33. }
  34. Err(response) => {
  35. return response
  36. }
  37. };
  38. let mills_back = match parse_str_to_decimal(query_value.clone(), "mills_back") {
  39. Ok(t) => {
  40. t
  41. }
  42. Err(response) => {
  43. return response
  44. }
  45. };
  46. let minute_time_range = match parse_str_to_decimal(query_value.clone(), "minute_time_range") {
  47. Ok(t) => {
  48. t.to_i64().unwrap()
  49. }
  50. Err(response) => {
  51. return response
  52. }
  53. };
  54. // 链接数据服务器查询数据
  55. let end_time = Utc::now().timestamp_millis();
  56. let start_time = end_time - minute_time_range * 60 * 1000;
  57. let db_response = get_trades_json(
  58. exchange.as_str(),
  59. symbol.as_str(),
  60. start_time,
  61. end_time,
  62. ).await;
  63. // 对数据库返回的数据进行容错处理
  64. if db_response.code == 200 {
  65. // 数据本地化处理
  66. let trades = parse_json_to_trades(db_response.data);
  67. // 指标生成
  68. let indicator = generate_msv_by_trades(trades, mills_back);
  69. // 返回数据
  70. let response = Response {
  71. query: query_value.clone(),
  72. msg: Some("指标生成完毕".to_string()),
  73. code: 200,
  74. data: indicator,
  75. };
  76. let json_string = serde_json::to_string(&response).unwrap();
  77. HttpResponse::Ok().content_type("application/json").body(json_string)
  78. } else {
  79. let json_string = serde_json::to_string(&db_response).unwrap();
  80. HttpResponse::Ok().content_type("application/json").body(json_string)
  81. }
  82. }
  83. // 将trades转换为具体指标
  84. pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal) -> Value {
  85. let mut amplitude_map: BTreeMap<Decimal, Decimal> = BTreeMap::new();
  86. const GAMMA: Decimal = dec!(0.5);
  87. // 每一个元素都遍历一遍
  88. trades.sort_by(|a, b| Decimal::from_str(a.id.as_str()).unwrap().cmp(&Decimal::from_str(b.id.as_str()).unwrap()));
  89. for (index, trade) in trades.iter().enumerate() {
  90. // 该元素向前遍历range毫秒
  91. let mut range_index = if index == 0 {
  92. 0
  93. } else {
  94. index
  95. };
  96. // 计算区间的预定价格
  97. let mut ref_price = trade.price;
  98. loop {
  99. // 第0个就不搞
  100. if range_index == 0 {
  101. break;
  102. }
  103. let flag_trade = trades.get(range_index).unwrap();
  104. let range_time = trade.time - flag_trade.time;
  105. // 判断该ticker是否是range ms以外
  106. if range_time > mills_back {
  107. break;
  108. }
  109. ref_price = ref_price * GAMMA + flag_trade.price * (Decimal::ONE - GAMMA);
  110. range_index -= 1;
  111. }
  112. // 逻辑计算层
  113. // 取离当前点最远的点进行测量
  114. let last_price = trade.price;
  115. // 不是初始值,以及不是0波动
  116. if index != 0 {
  117. let mut rate = Decimal::ONE_HUNDRED * (last_price - ref_price) / ref_price;
  118. rate.rescale(2);
  119. // 去除小数位之后,可以忽略一些太小的波动,减少图表生成压力
  120. if rate.eq(&Decimal::ZERO) {
  121. continue
  122. }
  123. amplitude_map.insert(trade.time, rate);
  124. }
  125. }
  126. let x: Vec<Decimal> = amplitude_map.keys().cloned().collect();
  127. let y: Vec<Decimal> = amplitude_map.values().cloned().collect();
  128. let total_size = trades.len();
  129. let result_size = x.len();
  130. json!({
  131. "x": x,
  132. "y": y,
  133. "total_size": total_size,
  134. "result_size": result_size,
  135. })
  136. }
  137. // 将json转换为trades
  138. pub fn parse_json_to_trades(trades_json: Value) -> Vec<Trade> {
  139. let mut rst = vec![];
  140. for trade_json in trades_json.as_array().unwrap() {
  141. let arr = trade_json.as_array().unwrap();
  142. rst.push(Trade {
  143. id: arr[0].as_str().unwrap().to_string(),
  144. time: Decimal::from_str(arr[1].as_str().unwrap()).unwrap(),
  145. size: Decimal::from_str(arr[2].as_str().unwrap()).unwrap(),
  146. price: Decimal::from_str(arr[3].as_str().unwrap()).unwrap(),
  147. });
  148. }
  149. rst
  150. }