msv.rs 6.6 KB


  1. use std::cmp::max;
  2. use std::collections::BTreeMap;
  3. use std::str::FromStr;
  4. use actix_web::{HttpResponse};
  5. use chrono::Utc;
  6. use rust_decimal::Decimal;
  7. use rust_decimal_macros::dec;
  8. use serde_json::{json, Value};
  9. use crate::db_connector::get_trades_json;
  10. use crate::server::{Response, Trade};
  /// Normalizes a user-supplied symbol into the `XXX_USDT` form.
  ///
  /// The input is upper-cased; if the upper-cased text does not already contain
  /// `_USDT` anywhere, `_USDT` is appended to it.
  pub fn symbol_fix(symbol: &str) -> String {
      let upper = symbol.to_uppercase();
      if upper.contains("_USDT") {
          upper
      } else {
          upper + "_USDT"
      }
  }
  18. // 将trades_json转换为指标
  19. pub async fn generate_msv(query_value: Value) -> HttpResponse {
  20. // 参数处理
  21. let exchange = match query_value["exchange"].as_str() {
  22. None => {
  23. "gate_usdt_swap"
  24. }
  25. Some(exchange) => {
  26. exchange
  27. }
  28. };
  29. let symbol = match query_value["symbol"].as_str() {
  30. None => {
  31. "BTC_USDT".to_string()
  32. }
  33. Some(symbol) => {
  34. symbol_fix(symbol)
  35. }
  36. };
  37. let mills_back = match query_value["mills_back"].as_str() {
  38. None => {
  39. dec!(37)
  40. }
  41. Some(mills_back_str) => {
  42. Decimal::from_str(mills_back_str).unwrap()
  43. }
  44. };
  45. let minute_time_range = match query_value["minute_time_range"].as_str() {
  46. None => {
  47. 240
  48. }
  49. Some(minute_time_range_str) => {
  50. match i64::from_str(minute_time_range_str) {
  51. Ok(minute_time_range) => {
  52. minute_time_range
  53. }
  54. Err(_) => {
  55. // 返回数据
  56. let response = Response {
  57. query: query_value.clone(),
  58. msg: Some("时间不要字符串,要数字,这个版本不支持字符串".to_string()),
  59. code: 500,
  60. data: Value::Null,
  61. };
  62. let json_string = serde_json::to_string(&response).unwrap();
  63. return HttpResponse::Ok().content_type("application/json").body(json_string)
  64. }
  65. }
  66. }
  67. };
  68. // 链接数据服务器查询数据
  69. let end_time = Utc::now().timestamp_millis();
  70. let start_time = end_time - minute_time_range * 60 * 1000;
  71. let db_response = get_trades_json(
  72. exchange,
  73. symbol.as_str(),
  74. start_time,
  75. end_time,
  76. ).await;
  77. // 对数据库返回的数据进行容错处理
  78. if db_response.code == 200 {
  79. // 数据本地化处理
  80. let trades = parse_json_to_trades(db_response.data);
  81. // 指标生成
  82. let indicator = generate_msv_by_trades(trades, mills_back);
  83. // 返回数据
  84. let response = Response {
  85. query: query_value.clone(),
  86. msg: Some("指标生成完毕".to_string()),
  87. code: 200,
  88. data: indicator,
  89. };
  90. let json_string = serde_json::to_string(&response).unwrap();
  91. HttpResponse::Ok().content_type("application/json").body(json_string)
  92. } else {
  93. let json_string = serde_json::to_string(&db_response).unwrap();
  94. HttpResponse::BadRequest().content_type("application/json").body(json_string)
  95. }
  96. }
  97. // 将trades转换为具体指标
  98. pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal) -> Value {
  99. let mut amplitude_map: BTreeMap<Decimal, Decimal> = BTreeMap::new();
  100. // 每一个元素都遍历一遍
  101. trades.sort_by(|a, b| Decimal::from_str(a.id.as_str()).unwrap().cmp(&Decimal::from_str(b.id.as_str()).unwrap()));
  102. for (index, trade) in trades.iter().enumerate() {
  103. // 该元素向前遍历range毫秒
  104. let mut range_index = if index == 0 {
  105. 0
  106. } else {
  107. index
  108. };
  109. // 寻找区间最大值、最小值
  110. let mut max_price = dec!(-1);
  111. let mut min_price = dec!(1e28);
  112. loop {
  113. // 第0个就不搞
  114. if range_index == 0 {
  115. break;
  116. }
  117. let flag_trade = trades.get(range_index).unwrap();
  118. let range_time = trade.time - flag_trade.time;
  119. // 判断该ticker是否是range ms以外
  120. if range_time > mills_back {
  121. break;
  122. }
  123. // 判断最大值、最小值
  124. if flag_trade.price > max_price {
  125. max_price = flag_trade.price;
  126. }
  127. if flag_trade.price < min_price {
  128. min_price = flag_trade.price;
  129. }
  130. range_index -= 1;
  131. }
  132. // 逻辑计算层
  133. // 取离当前点最远的点进行测量
  134. let last_price = trade.price;
  135. // 不是初始值,以及不是0波动
  136. if index != 0 {
  137. let mut up_rate = Decimal::ONE_HUNDRED * (last_price - min_price) / min_price;
  138. let mut dn_rate = Decimal::ONE_HUNDRED * (last_price - max_price) / max_price;
  139. up_rate.rescale(2);
  140. dn_rate.rescale(2);
  141. // 去除小数位之后,可以忽略一些太小的波动,减少图表生成压力
  142. if up_rate.eq(&Decimal::ZERO) && dn_rate.eq(&Decimal::ZERO) {
  143. continue
  144. }
  145. // 如果已经生成了一个波动,则也要和已生成的波动进行比较
  146. let insert_value = if amplitude_map.contains_key(&trade.time) {
  147. let origin_rate = amplitude_map.get(&trade.time).unwrap();
  148. if up_rate > dn_rate.abs() {
  149. max(*origin_rate, up_rate)
  150. } else {
  151. max(*origin_rate, dn_rate)
  152. }
  153. } else {
  154. if up_rate > dn_rate.abs() {
  155. up_rate
  156. } else {
  157. dn_rate
  158. }
  159. };
  160. amplitude_map.insert(trade.time, insert_value);
  161. }
  162. }
  163. let x: Vec<Decimal> = amplitude_map.keys().cloned().collect();
  164. let y: Vec<Decimal> = amplitude_map.values().cloned().collect();
  165. let total_size = trades.len();
  166. let result_size = x.len();
  167. json!({
  168. "x": x,
  169. "y": y,
  170. "total_size": total_size,
  171. "result_size": result_size,
  172. })
  173. }
  174. // 将json转换为trades
  175. pub fn parse_json_to_trades(trades_json: Value) -> Vec<Trade> {
  176. let mut rst = vec![];
  177. for trade_json in trades_json.as_array().unwrap() {
  178. let arr = trade_json.as_array().unwrap();
  179. rst.push(Trade {
  180. id: arr[0].as_str().unwrap().to_string(),
  181. time: Decimal::from_str(arr[1].as_str().unwrap()).unwrap(),
  182. size: Decimal::from_str(arr[2].as_str().unwrap()).unwrap(),
  183. price: Decimal::from_str(arr[3].as_str().unwrap()).unwrap(),
  184. });
  185. }
  186. rst
  187. }