msv.rs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. use std::str::FromStr;
  2. use actix_web::{HttpResponse};
  3. use chrono::Utc;
  4. use rust_decimal::Decimal;
  5. use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
  6. use rust_decimal_macros::dec;
  7. use serde_json::{json, Value};
  8. use crate::db_connector::{get_depths_json, get_trades_json};
  9. use crate::params_utils::{get_str, parse_str_to_decimal};
  10. use crate::server::{Response, SimpleDepth, Trade};
  11. pub fn symbol_fix(symbol: &str) -> String {
  12. let mut fixed = symbol.to_uppercase();
  13. if !fixed.contains("_USDT") {
  14. fixed = format!("{}_USDT", fixed);
  15. }
  16. fixed
  17. }
  18. // 将trades_json转换为指标
  19. pub async fn generate_msv(query_value: Value) -> HttpResponse {
  20. // 参数处理
  21. let exchange = match get_str(query_value.clone(), "exchange") {
  22. Ok(str) => {
  23. str
  24. }
  25. Err(response) => {
  26. return response
  27. }
  28. };
  29. let symbol = match get_str(query_value.clone(), "symbol") {
  30. Ok(symbol) => {
  31. symbol_fix(symbol.as_str())
  32. }
  33. Err(response) => {
  34. return response
  35. }
  36. };
  37. let mills_back = match parse_str_to_decimal(query_value.clone(), "mills_back") {
  38. Ok(t) => {
  39. t
  40. }
  41. Err(response) => {
  42. return response
  43. }
  44. };
  45. let minute_time_range = match parse_str_to_decimal(query_value.clone(), "minute_time_range") {
  46. Ok(t) => {
  47. t.to_i64().unwrap()
  48. }
  49. Err(response) => {
  50. return response
  51. }
  52. };
  53. // 链接数据服务器查询数据
  54. let end_time = Utc::now().timestamp_millis();
  55. let start_time = end_time - minute_time_range * 60 * 1000;
  56. let trades_response = get_trades_json(
  57. exchange.as_str(),
  58. symbol.as_str(),
  59. start_time,
  60. end_time,
  61. ).await;
  62. let depth_response = get_depths_json(
  63. exchange.as_str(),
  64. symbol.as_str(),
  65. start_time,
  66. end_time,
  67. ).await;
  68. // 对数据库返回的数据进行容错处理
  69. if trades_response.code == 200 && depth_response.code == 200{
  70. // 数据本地化处理
  71. let trades = parse_json_to_trades(trades_response.data);
  72. // depth本地化处理
  73. let simple_depths = parse_json_to_simple_depths(depth_response.data);
  74. // 指标生成
  75. let indicator = generate_msv_by_trades(trades, simple_depths, mills_back, start_time, end_time);
  76. // 返回数据
  77. let response = Response {
  78. query: query_value.clone(),
  79. msg: Some("指标生成完毕".to_string()),
  80. code: 200,
  81. data: indicator,
  82. };
  83. let json_string = serde_json::to_string(&response).unwrap();
  84. HttpResponse::Ok().content_type("application/json").body(json_string)
  85. } else {
  86. let json_string = serde_json::to_string(&trades_response).unwrap();
  87. HttpResponse::Ok().content_type("application/json").body(json_string)
  88. }
  89. }
  90. // 将trades转换为具体指标
  91. pub fn generate_msv_by_trades(mut trades: Vec<Trade>, simple_depths: Vec<SimpleDepth>, mills_back: Decimal, start_time: i64, end_time: i64) -> Value {
  92. let mut msv_data: Vec<Vec<Decimal>> = vec![];
  93. const GAMMA: Decimal = dec!(0.5);
  94. // 每一个元素都遍历一遍
  95. trades.sort_by(|a, b| Decimal::from_str(a.id.as_str()).unwrap().cmp(&Decimal::from_str(b.id.as_str()).unwrap()));
  96. for (index, trade) in trades.iter().enumerate() {
  97. // 该元素向前遍历range毫秒
  98. let mut range_index = if index == 0 {
  99. 0
  100. } else {
  101. index
  102. };
  103. // 计算区间的预定价格
  104. let mut ref_price = trade.price;
  105. let mut dissociation = Decimal::ZERO;
  106. loop {
  107. // 第0个就不搞
  108. if range_index == 0 {
  109. break;
  110. }
  111. let flag_trade = trades.get(range_index).unwrap();
  112. let range_time = trade.time - flag_trade.time;
  113. // 判断该ticker是否是range ms以外
  114. if range_time > mills_back {
  115. break;
  116. }
  117. ref_price = ref_price * GAMMA + flag_trade.price * (Decimal::ONE - GAMMA);
  118. dissociation = dissociation + flag_trade.size.abs();
  119. range_index -= 1;
  120. }
  121. // 逻辑计算层
  122. // 取离当前点最远的点进行测量
  123. let last_price = trade.price;
  124. // 不是初始值,以及不是0波动
  125. if index != 0 {
  126. let mut rate = Decimal::ONE_HUNDRED * (last_price - ref_price) / ref_price;
  127. rate.rescale(2);
  128. // 去除小数位之后,可以忽略一些太小的波动,减少图表生成压力
  129. if rate.eq(&Decimal::ZERO) {
  130. continue
  131. }
  132. // 去重,以及保留最大的波动率
  133. if msv_data.len() > 0 {
  134. let last = msv_data[msv_data.len() - 1].clone();
  135. let last_time = last[0];
  136. let last_rate = last[1];
  137. // 如果时间相同,则可能会进行remove等操作
  138. if last_time == trade.time {
  139. // 如果最新的波动率大于最后波动率
  140. if rate.abs() > last_rate.abs() {
  141. msv_data.remove(msv_data.len() - 1);
  142. msv_data.push(vec![trade.time, rate, dissociation]);
  143. }
  144. } else {
  145. msv_data.push(vec![trade.time, rate, dissociation]);
  146. }
  147. } else {
  148. msv_data.push(vec![trade.time, rate, dissociation]);
  149. }
  150. }
  151. }
  152. // 按时间序列填充数据
  153. let mut final_msv_data: Vec<Vec<Decimal>> = vec![];
  154. let mut final_depth_data: Vec<Vec<Decimal>> = vec![];
  155. let mut data_index = 0;
  156. let mut depth_index = 0;
  157. let mut index_timestamp = Decimal::from_i64(start_time).unwrap();
  158. let last_timestamp = Decimal::from_i64(end_time).unwrap();
  159. let step_timestamp = dec!(1000);
  160. loop {
  161. let mut max_msv_data = Decimal::ZERO;
  162. let mut max_msv_diss_data = Decimal::ZERO;
  163. // 获取时间范围内的波动率数据
  164. loop {
  165. // 下标合法性判断
  166. if data_index >= msv_data.len() {
  167. break;
  168. }
  169. // msv_data的指定下标数据不在时间范围内(时间范围:指的是[index_timestamp-mills_back, index_timestamp]这个范围)
  170. if index_timestamp < msv_data[data_index][0] {
  171. break;
  172. }
  173. // -------------- 大小判断,取值
  174. let msv_d = msv_data[data_index][1];
  175. let msv_diss_data = msv_data[data_index][2];
  176. // msv波动数据
  177. if max_msv_data.abs() < msv_d.abs() {
  178. max_msv_data = msv_d;
  179. }
  180. // 堆叠的交易量数据
  181. if max_msv_diss_data < msv_diss_data {
  182. max_msv_diss_data = msv_diss_data;
  183. }
  184. // 下标步近
  185. data_index = data_index + 1;
  186. }
  187. // 获取时间范围内的深度数据
  188. let mut total_size = Decimal::ZERO;
  189. loop {
  190. // 下标合法性判断
  191. if depth_index >= simple_depths.len() {
  192. break;
  193. }
  194. // 时间范围合法性判断,只统计那一秒以内的深度总交易量
  195. if index_timestamp < simple_depths[depth_index].time {
  196. break;
  197. }
  198. // 这一秒的所有深度数据求和
  199. total_size += simple_depths[depth_index].size;
  200. // 下标步近
  201. depth_index += 1;
  202. }
  203. // 如果这两个值为0,则代表这mills_back毫秒以内是没有数据的,填充0数据,使得x轴是完整的
  204. if max_msv_data == Decimal::ZERO {
  205. final_msv_data.push(vec![index_timestamp, Decimal::ZERO, Decimal::ZERO]);
  206. // 说明在这个时间范围内是有数据存在的
  207. } else {
  208. final_msv_data.push(vec![index_timestamp, max_msv_data, max_msv_diss_data]);
  209. }
  210. // 简易的深度数据处理
  211. final_depth_data.push(vec![index_timestamp, total_size]);
  212. // ---------------- 最后的时间处理 -----------------
  213. // 对时间进行步近
  214. index_timestamp = index_timestamp + step_timestamp;
  215. // 时间越界
  216. if index_timestamp > last_timestamp {
  217. break
  218. }
  219. }
  220. // 结果统计
  221. let total_size = trades.len();
  222. let result_size = final_msv_data.len();
  223. json!({
  224. "msv": final_msv_data,
  225. "tr": final_depth_data,
  226. "total_size": total_size,
  227. "result_size": result_size,
  228. })
  229. }
  230. // 将json转换为trades
  231. pub fn parse_json_to_trades(trades_json: Value) -> Vec<Trade> {
  232. let mut rst = vec![];
  233. for trade_json in trades_json.as_array().unwrap() {
  234. let arr = trade_json.as_array().unwrap();
  235. rst.push(Trade {
  236. id: arr[0].as_str().unwrap().to_string(),
  237. time: Decimal::from_str(arr[1].as_str().unwrap()).unwrap(),
  238. size: Decimal::from_str(arr[2].as_str().unwrap()).unwrap(),
  239. price: Decimal::from_str(arr[3].as_str().unwrap()).unwrap(),
  240. });
  241. }
  242. rst
  243. }
  244. // 将json转换为简易深度数据
  245. pub fn parse_json_to_simple_depths(depths_json: Value) -> Vec<SimpleDepth> {
  246. let mut rst = vec![];
  247. for depth_json in depths_json.as_array().unwrap() {
  248. let time = Decimal::from_str(depth_json["t"].as_str().unwrap()).unwrap();
  249. let mut size = Decimal::ZERO;
  250. // 卖盘交易量统计
  251. for ask_order_book in depth_json["a"].as_array().unwrap() {
  252. let price = Decimal::from_str(ask_order_book.as_array().unwrap()[0].as_str().unwrap()).unwrap();
  253. let amount = Decimal::from_str(ask_order_book.as_array().unwrap()[1].as_str().unwrap()).unwrap();
  254. size = size + price * amount;
  255. }
  256. // 买盘交易量统计
  257. for bid_order_book in depth_json["b"].as_array().unwrap() {
  258. let price = Decimal::from_str(bid_order_book.as_array().unwrap()[0].as_str().unwrap()).unwrap();
  259. let amount = Decimal::from_str(bid_order_book.as_array().unwrap()[1].as_str().unwrap()).unwrap();
  260. size = size + price * amount;
  261. }
  262. size.rescale(2);
  263. let simple_depth = SimpleDepth {
  264. time,
  265. size,
  266. };
  267. rst.insert(0, simple_depth)
  268. }
  269. rst
  270. }