msv.rs 11 KB


  1. use std::cmp::{max, min};
  2. use std::str::FromStr;
  3. use actix_web::{HttpResponse};
  4. use chrono::Utc;
  5. use rust_decimal::Decimal;
  6. use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
  7. use rust_decimal_macros::dec;
  8. use serde_json::{json, Value};
  9. use crate::db_connector::{get_simple_depths_json, get_trades_json};
  10. use crate::params_utils::{get_str, parse_str_to_decimal};
  11. use crate::server::{Response, SimpleDepth, Trade};
  12. pub fn symbol_fix(symbol: &str) -> String {
  13. let mut fixed = symbol.to_uppercase();
  14. if !fixed.contains("_USDT") {
  15. fixed = format!("{}_USDT", fixed);
  16. }
  17. fixed
  18. }
  19. // 将trades_json转换为指标
  20. pub async fn generate_msv(query_value: Value) -> HttpResponse {
  21. // 参数处理
  22. let exchange = match get_str(query_value.clone(), "exchange") {
  23. Ok(str) => {
  24. str
  25. }
  26. Err(response) => {
  27. return response
  28. }
  29. };
  30. let symbol = match get_str(query_value.clone(), "symbol") {
  31. Ok(symbol) => {
  32. symbol_fix(symbol.as_str())
  33. }
  34. Err(response) => {
  35. return response
  36. }
  37. };
  38. let mills_back = match parse_str_to_decimal(query_value.clone(), "mills_back") {
  39. Ok(t) => {
  40. t
  41. }
  42. Err(response) => {
  43. return response
  44. }
  45. };
  46. let minute_time_range = match parse_str_to_decimal(query_value.clone(), "minute_time_range") {
  47. Ok(t) => {
  48. t.to_i64().unwrap()
  49. }
  50. Err(response) => {
  51. return response
  52. }
  53. };
  54. // 链接数据服务器查询数据
  55. let end_time = Utc::now().timestamp_millis();
  56. let start_time = end_time - minute_time_range * 60 * 1000;
  57. let trades_response = get_trades_json(
  58. exchange.as_str(),
  59. symbol.as_str(),
  60. start_time,
  61. end_time,
  62. ).await;
  63. let simple_start_time = if start_time < end_time - 240 * 60 * 1000 {
  64. end_time - 240 * 60 * 1000
  65. } else {
  66. start_time
  67. };
  68. let simple_depths_response = get_simple_depths_json(
  69. exchange.as_str(),
  70. symbol.as_str(),
  71. simple_start_time,
  72. end_time,
  73. ).await;
  74. // 对数据库返回的数据进行容错处理
  75. if trades_response.code == 200 && simple_depths_response.code == 200 {
  76. // 数据本地化处理
  77. let trades = parse_json_to_trades(trades_response.data);
  78. let simple_depths = parse_json_to_simple_depths(simple_depths_response.data);
  79. // 指标生成
  80. let indicator = generate_msv_by_trades(trades, mills_back, simple_depths, start_time, end_time);
  81. // 返回数据
  82. let response = Response {
  83. query: query_value.clone(),
  84. msg: Some("指标生成完毕".to_string()),
  85. code: 200,
  86. data: indicator,
  87. };
  88. let json_string = serde_json::to_string(&response).unwrap();
  89. HttpResponse::Ok().content_type("application/json").body(json_string)
  90. } else {
  91. let json_string = serde_json::to_string(&trades_response).unwrap();
  92. HttpResponse::Ok().content_type("application/json").body(json_string)
  93. }
  94. }
  95. // 将trades转换为具体指标
  96. pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, simple_depths: Vec<SimpleDepth>, start_time: i64, end_time: i64) -> Value {
  97. let mut msv_data: Vec<Vec<Decimal>> = vec![];
  98. const GAMMA: Decimal = dec!(0.5);
  99. // 每一个元素都遍历一遍
  100. trades.sort_by(|a, b| Decimal::from_str(a.id.as_str()).unwrap().cmp(&Decimal::from_str(b.id.as_str()).unwrap()));
  101. for (index, trade) in trades.iter().enumerate() {
  102. // 该元素向前遍历range毫秒
  103. let mut range_index = if index == 0 {
  104. 0
  105. } else {
  106. index
  107. };
  108. // 计算区间的预定价格
  109. let mut ref_price = trade.price;
  110. let mut dissociation = Decimal::ZERO;
  111. loop {
  112. // 第0个就不搞
  113. if range_index == 0 {
  114. break;
  115. }
  116. let flag_trade = trades.get(range_index).unwrap();
  117. let range_time = trade.time - flag_trade.time;
  118. // 判断该ticker是否是range ms以外
  119. if range_time > mills_back {
  120. break;
  121. }
  122. ref_price = ref_price * GAMMA + flag_trade.price * (Decimal::ONE - GAMMA);
  123. dissociation = dissociation + flag_trade.size.abs();
  124. range_index -= 1;
  125. }
  126. // 逻辑计算层
  127. // 取离当前点最远的点进行测量
  128. let last_price = trade.price;
  129. // 不是初始值,以及不是0波动
  130. if index != 0 {
  131. let mut rate = Decimal::ONE_HUNDRED * (last_price - ref_price) / ref_price;
  132. rate.rescale(2);
  133. // 去除小数位之后,可以忽略一些太小的波动,减少图表生成压力
  134. if rate.eq(&Decimal::ZERO) {
  135. continue
  136. }
  137. // 去重,以及保留最大的波动率
  138. if msv_data.len() > 0 {
  139. let last = msv_data[msv_data.len() - 1].clone();
  140. let last_time = last[0];
  141. let last_rate = last[1];
  142. // 如果时间相同,则可能会进行remove等操作
  143. if last_time == trade.time {
  144. // 如果最新的波动率大于最后波动率
  145. if rate.abs() > last_rate.abs() {
  146. msv_data.remove(msv_data.len() - 1);
  147. msv_data.push(vec![trade.time, rate, dissociation]);
  148. }
  149. } else {
  150. msv_data.push(vec![trade.time, rate, dissociation]);
  151. }
  152. } else {
  153. msv_data.push(vec![trade.time, rate, dissociation]);
  154. }
  155. }
  156. }
  157. // 按时间序列填充数据
  158. let mut msv_index = 0;
  159. let mut final_msv_data: Vec<Vec<Decimal>> = vec![];
  160. let mut depth_index = 0;
  161. let mut final_depth_data: Vec<Vec<Decimal>> = vec![];
  162. let mut final_spread_data: Vec<Vec<Decimal>> = vec![];
  163. let mut index_timestamp = Decimal::from_i64(start_time).unwrap();
  164. let last_timestamp = Decimal::from_i64(end_time).unwrap();
  165. let step_timestamp = dec!(1000);
  166. loop {
  167. let mut max_msv_data = Decimal::ZERO;
  168. let mut max_msv_diss_data = Decimal::ZERO;
  169. // ====================================== 数据生产 ===============================================
  170. // 获取时间范围内的波动率数据
  171. loop {
  172. // 下标合法性判断
  173. if msv_index >= msv_data.len() {
  174. break;
  175. }
  176. // msv_data的指定下标数据不在时间范围内(时间范围:指的是[index_timestamp-mills_back, index_timestamp]这个范围)
  177. if index_timestamp < msv_data[msv_index][0] {
  178. break;
  179. }
  180. // -------------- 大小判断,取值
  181. let msv_d = msv_data[msv_index][1];
  182. let msv_diss_data = msv_data[msv_index][2];
  183. // msv波动数据
  184. if max_msv_data.abs() < msv_d.abs() {
  185. max_msv_data = msv_d;
  186. }
  187. // 堆叠的交易量数据
  188. if max_msv_diss_data < msv_diss_data {
  189. max_msv_diss_data = msv_diss_data;
  190. }
  191. // 下标步近
  192. msv_index = msv_index + 1;
  193. }
  194. // 获取时间范围内的深度数据、买一及卖一价数据
  195. let mut max_size = Decimal::ZERO;
  196. let mut min_size = Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND;
  197. let mut max_spread = Decimal::ZERO;
  198. loop {
  199. // 下标合法性判断
  200. if depth_index >= simple_depths.len() {
  201. break;
  202. }
  203. let depth = &simple_depths[depth_index];
  204. // 时间范围合法性判断,只统计那一秒以内的深度总交易量
  205. if index_timestamp < depth.time {
  206. break;
  207. }
  208. // 这一秒的深度最大值、最小值
  209. max_size = max(max_size, depth.size);
  210. min_size = min(min_size, depth.size);
  211. // 这一秒的差价最大值
  212. let spread = depth.a1 - depth.b1;
  213. max_spread = max(spread, max_spread);
  214. // 下标步近
  215. depth_index += 1;
  216. }
  217. // ====================================== 智能填充数据 ===============================================
  218. // 价差
  219. final_spread_data.push(vec![index_timestamp, max_spread]);
  220. // 流动性数据叠加
  221. let rst_size = if (max_size == Decimal::ZERO || min_size == Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND) && final_depth_data.len() > 0 {
  222. final_depth_data.last().unwrap()[1]
  223. } else {
  224. if (max_size == Decimal::ZERO || min_size == Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND) && simple_depths.len() > 0 {
  225. simple_depths[0].size
  226. } else {
  227. if simple_depths.len() > 0 {
  228. (max_size + min_size) / Decimal::TWO
  229. } else {
  230. Decimal::ZERO
  231. }
  232. }
  233. };
  234. final_depth_data.push(vec![index_timestamp, rst_size]);
  235. // 波动率数据处理
  236. // 如果这两个值为0,则代表这mills_back毫秒以内是没有数据的,填充0数据,使得x轴是完整的
  237. if max_msv_data == Decimal::ZERO {
  238. final_msv_data.push(vec![index_timestamp, Decimal::ZERO, Decimal::ZERO]);
  239. // 说明在这个时间范围内是有数据存在的
  240. } else {
  241. final_msv_data.push(vec![index_timestamp, max_msv_data, max_msv_diss_data]);
  242. }
  243. // ====================================== 时间步进处理 ======================================
  244. // 对时间进行步近
  245. index_timestamp = index_timestamp + step_timestamp;
  246. // 时间越界
  247. if index_timestamp > last_timestamp {
  248. break
  249. }
  250. }
  251. // 结果统计
  252. let total_size = trades.len();
  253. let result_size = final_msv_data.len();
  254. json!({
  255. "msv": final_msv_data,
  256. "liqs": final_depth_data,
  257. "spreads": final_spread_data,
  258. "total_size": total_size,
  259. "result_size": result_size,
  260. })
  261. }
  262. // 将json转换为trades
  263. pub fn parse_json_to_trades(trades_json: Value) -> Vec<Trade> {
  264. let mut rst = vec![];
  265. for trade_json in trades_json.as_array().unwrap() {
  266. let arr = trade_json.as_array().unwrap();
  267. rst.push(Trade {
  268. id: arr[0].as_str().unwrap().to_string(),
  269. time: Decimal::from_str(arr[1].as_str().unwrap()).unwrap(),
  270. size: Decimal::from_str(arr[2].as_str().unwrap()).unwrap(),
  271. price: Decimal::from_str(arr[3].as_str().unwrap()).unwrap(),
  272. });
  273. }
  274. rst
  275. }
  276. // 将json转换为简易深度数据
  277. pub fn parse_json_to_simple_depths(depths_json: Value) -> Vec<SimpleDepth> {
  278. let mut rst = vec![];
  279. for depth_json in depths_json.as_array().unwrap() {
  280. let depth: SimpleDepth = SimpleDepth {
  281. time: Decimal::from_str(depth_json["time"].as_str().unwrap()).unwrap(),
  282. size: Decimal::from_str(depth_json["size"].as_str().unwrap()).unwrap(),
  283. a1: Decimal::from_str(depth_json["a1"].as_str().unwrap()).unwrap(),
  284. b1: Decimal::from_str(depth_json["b1"].as_str().unwrap()).unwrap(),
  285. };
  286. rst.insert(0, depth)
  287. }
  288. rst
  289. }