msv.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. use std::cmp::{max, min};
  2. use std::str::FromStr;
  3. use actix_web::{HttpResponse};
  4. use chrono::Utc;
  5. use rust_decimal::Decimal;
  6. use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
  7. use rust_decimal_macros::dec;
  8. use serde_json::{json, Value};
  9. use tracing::info;
  10. use crate::db_connector::{get_simple_depths_json, get_trades_json};
  11. use crate::params_utils::{get_str, parse_str_to_decimal};
  12. use crate::server::{Response, SimpleDepth, Trade};
  /// Normalises a trading symbol: upper-cases it and appends `_USDT` unless the
  /// upper-cased text already contains `_USDT` somewhere.
  pub fn symbol_fix(symbol: &str) -> String {
      let upper = symbol.to_uppercase();
      if upper.contains("_USDT") {
          return upper;
      }
      format!("{}_USDT", upper)
  }
  20. // 将trades_json转换为指标
  21. pub async fn generate_msv(query_value: Value) -> HttpResponse {
  22. // 参数处理
  23. let exchange = match get_str(query_value.clone(), "exchange") {
  24. Ok(str) => {
  25. str
  26. }
  27. Err(response) => {
  28. return response
  29. }
  30. };
  31. let symbol = match get_str(query_value.clone(), "symbol") {
  32. Ok(symbol) => {
  33. symbol_fix(symbol.as_str())
  34. }
  35. Err(response) => {
  36. return response
  37. }
  38. };
  39. let mills_back = match parse_str_to_decimal(query_value.clone(), "mills_back") {
  40. Ok(t) => {
  41. t
  42. }
  43. Err(response) => {
  44. return response
  45. }
  46. };
  47. let minute_time_range = match parse_str_to_decimal(query_value.clone(), "minute_time_range") {
  48. Ok(t) => {
  49. t.to_i64().unwrap()
  50. }
  51. Err(response) => {
  52. return response
  53. }
  54. };
  55. // 链接数据服务器查询数据
  56. let end_time = Utc::now().timestamp_millis();
  57. let start_time = end_time - minute_time_range * 60 * 1000;
  58. let trades_response = get_trades_json(
  59. exchange.as_str(),
  60. symbol.as_str(),
  61. start_time,
  62. end_time,
  63. ).await;
  64. let simple_start_time = if start_time < end_time - 240 * 60 * 1000 {
  65. end_time - 240 * 60 * 1000
  66. } else {
  67. start_time
  68. };
  69. let simple_depths_response = get_simple_depths_json(
  70. exchange.as_str(),
  71. symbol.as_str(),
  72. simple_start_time,
  73. end_time,
  74. ).await;
  75. // 对数据库返回的数据进行容错处理
  76. if trades_response.code == 200 && simple_depths_response.code == 200 {
  77. // 数据本地化处理
  78. let trades = parse_json_to_trades(trades_response.data);
  79. // let simple_depths = parse_json_to_simple_depths(simple_depths_response.data);
  80. let simple_depths = vec![];
  81. // 指标生成
  82. let indicator = generate_msv_by_trades(trades, mills_back, simple_depths, start_time, end_time);
  83. // 返回数据
  84. let response = Response {
  85. query: query_value.clone(),
  86. msg: Some("指标生成完毕".to_string()),
  87. code: 200,
  88. data: indicator,
  89. };
  90. let json_string = serde_json::to_string(&response).unwrap();
  91. HttpResponse::Ok().content_type("application/json").body(json_string)
  92. } else {
  93. let json_string = serde_json::to_string(&trades_response).unwrap();
  94. HttpResponse::Ok().content_type("application/json").body(json_string)
  95. }
  96. }
  97. // 将trades转换为具体指标
  98. pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, simple_depths: Vec<SimpleDepth>, start_time: i64, end_time: i64) -> Value {
  99. // 具体波动
  100. let mut msv_data: Vec<Vec<Decimal>> = vec![];
  101. // 预期利润幅度(except_profit_rate)
  102. let mut epr_data: Vec<Vec<Decimal>> = vec![];
  103. // 波动率sigma
  104. let mut sigma_data: Vec<Vec<Decimal>> = vec![];
  105. const GAMMA: Decimal = dec!(0.5);
  106. // ================== 计算每个点的具体波动率以及回溯幅度 ===================
  107. trades.sort_by(|a, b| Decimal::from_str(a.id.as_str()).unwrap().cmp(&Decimal::from_str(b.id.as_str()).unwrap()));
  108. for (index, trade) in trades.iter().enumerate() {
  109. if index == 0 {
  110. continue
  111. }
  112. // 该元素向前遍历range毫秒
  113. let mut range_index = index;
  114. // 该区间的预定价格
  115. let mut ref_price = trade.price;
  116. let mut dissociation = Decimal::ZERO;
  117. loop {
  118. // 下标合法性判断
  119. if range_index == 0 {
  120. break;
  121. }
  122. let flag_trade = trades.get(range_index).unwrap();
  123. let range_time = trade.time - flag_trade.time;
  124. // 判断该ticker是否是range ms以外
  125. if range_time > mills_back {
  126. break;
  127. }
  128. ref_price = ref_price * GAMMA + flag_trade.price * (Decimal::ONE - GAMMA);
  129. dissociation = dissociation + flag_trade.size.abs();
  130. range_index -= 1;
  131. }
  132. // 获取到range毫秒以后的预定价格,计算回去的幅度
  133. let mut future_ref_price = ref_price;
  134. let mut future_range_index = index;
  135. loop {
  136. // 下标合法性判断
  137. if future_range_index >= trades.len() {
  138. break;
  139. }
  140. let flag_trade = trades.get(future_range_index).unwrap();
  141. let range_time = flag_trade.time - trade.time;
  142. // 判断该ticker是否是range ms以外
  143. if range_time > mills_back {
  144. break;
  145. }
  146. future_ref_price = future_ref_price * GAMMA + flag_trade.price * (Decimal::ONE - GAMMA);
  147. future_range_index += 1;
  148. }
  149. // 计算过去至多100条数据的sigma值 sigma^2 = (1 / (tn-t0))*sum((S(tk) - S(tk-1)) ^ 2)
  150. let mut sigma_index = index - 1;
  151. let t_last = trade.time;
  152. let mut t_first = trade.time;
  153. // 右值
  154. let mut total_right = Decimal::ZERO;
  155. loop {
  156. let flag_trade = trades.get(sigma_index).unwrap();
  157. let next_trade = trades.get(sigma_index + 1).unwrap();
  158. // 下标合法性判断
  159. if sigma_index == 0 || sigma_index + 100 <= index {
  160. t_first = flag_trade.time;
  161. break;
  162. }
  163. // 计算差值
  164. let diff = Decimal::ONE - flag_trade.price / next_trade.price;
  165. total_right += diff.abs();
  166. sigma_index = sigma_index - 1;
  167. }
  168. let mut sigma = if sigma_index + 100 > index {
  169. Decimal::ZERO
  170. } else {
  171. if t_first == t_last {
  172. let time_diff = dec!(0.001);
  173. (Decimal::ONE / time_diff) * total_right
  174. } else {
  175. let time_diff = (t_last - t_first) / Decimal::ONE_THOUSAND;
  176. (Decimal::ONE / time_diff) * total_right
  177. }
  178. };
  179. sigma.rescale(6);
  180. sigma_data.push(vec![trade.time, sigma]);
  181. // ==================== 波动逻辑计算 ====================
  182. let last_price = trade.price;
  183. let mut rate = Decimal::ONE_HUNDRED * (last_price - ref_price) / ref_price;
  184. rate.rescale(2);
  185. // 去除小数位之后,可以忽略一些太小的波动,减少图表生成压力
  186. if rate.eq(&Decimal::ZERO) {
  187. continue
  188. }
  189. // ==================== 预期利润逻辑计算 ====================
  190. // 首先计算未来一段时间的价格与现在的距离
  191. let mut future_rate = Decimal::ONE_HUNDRED * (future_ref_price - last_price) / last_price;
  192. future_rate.rescale(2);
  193. // 根据具体向上波动还是向下波动来计算预期最大利润
  194. let epr = if rate > Decimal::ZERO {
  195. -future_rate
  196. } else {
  197. future_rate
  198. };
  199. // 去重,以及保留最大的波动率
  200. if msv_data.len() > 0 {
  201. let last = msv_data[msv_data.len() - 1].clone();
  202. let last_time = last[0];
  203. let last_rate = last[1];
  204. // 如果时间相同,则可能会进行remove等操作
  205. if last_time == trade.time {
  206. // 如果最新的波动率大于最后波动率
  207. if rate.abs() > last_rate.abs() {
  208. msv_data.remove(msv_data.len() - 1);
  209. msv_data.push(vec![trade.time, rate, dissociation]);
  210. epr_data.remove(epr_data.len() - 1);
  211. epr_data.push(vec![trade.time, epr]);
  212. }
  213. } else {
  214. msv_data.push(vec![trade.time, rate, dissociation]);
  215. epr_data.push(vec![trade.time, epr]);
  216. }
  217. } else {
  218. msv_data.push(vec![trade.time, rate, dissociation]);
  219. epr_data.push(vec![trade.time, epr]);
  220. }
  221. }
  222. // 按时间序列填充数据
  223. let mut msv_index = 0;
  224. let mut final_msv_data: Vec<Vec<Decimal>> = vec![];
  225. let mut final_epr_data: Vec<Vec<Decimal>> = vec![];
  226. let mut final_sigma_data: Vec<Vec<Decimal>> = vec![];
  227. let mut depth_index = 0;
  228. let mut final_volume_data: Vec<Vec<Decimal>> = vec![];
  229. let mut index_timestamp = Decimal::from_i64(start_time).unwrap();
  230. let last_timestamp = Decimal::from_i64(end_time).unwrap();
  231. let step_timestamp = dec!(1000);
  232. loop {
  233. let mut max_msv_data = Decimal::ZERO;
  234. let mut max_msv_qty_data = Decimal::ZERO;
  235. let mut max_epr_data = Decimal::ZERO;
  236. let mut max_sigma_data = Decimal::ZERO;
  237. // ====================================== 数据生产 ===============================================
  238. // 获取时间范围内的波动率数据
  239. loop {
  240. // 下标合法性判断
  241. if msv_index >= msv_data.len() {
  242. break;
  243. }
  244. // msv_data的指定下标数据不在时间范围内(时间范围:指的是[index_timestamp-mills_back, index_timestamp]这个范围)
  245. if index_timestamp < msv_data[msv_index][0] {
  246. break;
  247. }
  248. // -------------- 大小判断,取值
  249. let msv_d = msv_data[msv_index][1];
  250. let msv_qty_data = msv_data[msv_index][2];
  251. let epr_d = epr_data[msv_index][1];
  252. let sigma_d = sigma_data[msv_index][1];
  253. // msv波动数据
  254. if max_msv_data.abs() < msv_d.abs() {
  255. max_msv_data = msv_d;
  256. max_msv_qty_data = msv_qty_data;
  257. max_epr_data = epr_d;
  258. max_sigma_data = sigma_d;
  259. }
  260. // // 波动率sigma
  261. // if max_sigma_data.abs() < sigma_d {
  262. // }
  263. // 下标步近
  264. msv_index = msv_index + 1;
  265. }
  266. // 获取时间范围内的深度数据、买一及卖一价数据
  267. let mut max_size = Decimal::ZERO;
  268. let mut min_size = Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND;
  269. loop {
  270. // 下标合法性判断
  271. if depth_index >= simple_depths.len() {
  272. break;
  273. }
  274. let depth = &simple_depths[depth_index];
  275. // 时间范围合法性判断,只统计那一秒以内的深度总交易量
  276. if index_timestamp < depth.time {
  277. break;
  278. }
  279. // 这一秒的深度最大值、最小值
  280. max_size = max(max_size, depth.size);
  281. min_size = min(min_size, depth.size);
  282. // 下标步近
  283. depth_index += 1;
  284. }
  285. // ====================================== 智能填充数据 ===============================================
  286. // 流动性数据叠加
  287. // let rst_size = if (max_size == Decimal::ZERO || min_size == Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND) && final_depth_data.len() > 0 {
  288. // final_depth_data.last().unwrap()[1]
  289. // } else {
  290. // if (max_size == Decimal::ZERO || min_size == Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND) && simple_depths.len() > 0 {
  291. // simple_depths[0].size
  292. // } else {
  293. // if simple_depths.len() > 0 {
  294. // (max_size + min_size) / Decimal::TWO
  295. // } else {
  296. // Decimal::ZERO
  297. // }
  298. // }
  299. // };
  300. // final_depth_data.push(vec![index_timestamp, rst_size]);
  301. //
  302. // // 建议开仓距离
  303. // let mut rst_spread = if rst_size == Decimal::ZERO {
  304. // Decimal::ZERO
  305. // } else {
  306. // dec!(10000) / rst_size
  307. // };
  308. // rst_spread.rescale(6);
  309. // final_spread_data.push(vec![index_timestamp, rst_spread]);
  310. // 波动率数据处理
  311. // 如果这两个值为0,则代表这mills_back毫秒以内是没有数据的,填充0数据,使得x轴是完整的
  312. if max_msv_data == Decimal::ZERO {
  313. final_msv_data.push(vec![index_timestamp, Decimal::ZERO, Decimal::ZERO]);
  314. final_epr_data.push(vec![index_timestamp, Decimal::ZERO]);
  315. final_volume_data.push(vec![index_timestamp, Decimal::ZERO]);
  316. if final_sigma_data.len() > 0 {
  317. final_sigma_data.push(final_sigma_data[final_sigma_data.len() - 1].clone());
  318. } else {
  319. final_sigma_data.push(vec![index_timestamp, Decimal::ZERO]);
  320. }
  321. // 说明在这个时间范围内是有数据存在的,将各类副图放置完全
  322. } else {
  323. final_msv_data.push(vec![index_timestamp, max_msv_data, max_msv_qty_data]);
  324. final_epr_data.push(vec![index_timestamp, max_epr_data]);
  325. let mut final_qty = max_msv_qty_data / Decimal::ONE_THOUSAND;
  326. final_qty.rescale(2);
  327. final_volume_data.push(vec![index_timestamp, final_qty]);
  328. final_sigma_data.push(vec![index_timestamp, max_sigma_data]);
  329. }
  330. // ====================================== 时间步进处理 ======================================
  331. // 对时间进行步近
  332. index_timestamp = index_timestamp + step_timestamp;
  333. // 时间越界
  334. if index_timestamp > last_timestamp {
  335. break
  336. }
  337. }
  338. // 结果统计
  339. let total_size = trades.len();
  340. let result_size = final_msv_data.len();
  341. json!({
  342. "msv": final_msv_data,
  343. "liqs": final_volume_data,
  344. "eprs": final_epr_data,
  345. "sigmas": final_sigma_data,
  346. "total_size": total_size,
  347. "result_size": result_size,
  348. })
  349. }
  350. // 将json转换为trades
  351. pub fn parse_json_to_trades(trades_json: Value) -> Vec<Trade> {
  352. let mut rst = vec![];
  353. for trade_json in trades_json.as_array().unwrap() {
  354. let arr = trade_json.as_array().unwrap();
  355. rst.push(Trade {
  356. id: arr[0].as_str().unwrap().to_string(),
  357. time: Decimal::from_str(arr[1].as_str().unwrap()).unwrap(),
  358. size: Decimal::from_str(arr[2].as_str().unwrap()).unwrap(),
  359. price: Decimal::from_str(arr[3].as_str().unwrap()).unwrap(),
  360. });
  361. }
  362. rst
  363. }
  364. // // 将json转换为简易深度数据
  365. // pub fn parse_json_to_simple_depths(depths_json: Value) -> Vec<SimpleDepth> {
  366. // let mut rst = vec![];
  367. //
  368. // for depth_json in depths_json.as_array().unwrap() {
  369. // let depth: SimpleDepth = SimpleDepth {
  370. // time: Decimal::from_str(depth_json["time"].as_str().unwrap()).unwrap(),
  371. // size: Decimal::from_str(depth_json["size"].as_str().unwrap()).unwrap(),
  372. // a1: Decimal::from_str(depth_json["a1"].as_str().unwrap()).unwrap(),
  373. // b1: Decimal::from_str(depth_json["b1"].as_str().unwrap()).unwrap(),
  374. // };
  375. //
  376. // rst.insert(0, depth)
  377. // }
  378. //
  379. // rst
  380. // }