use std::cmp::{max, min}; use std::str::FromStr; use actix_web::{HttpResponse}; use chrono::Utc; use rust_decimal::{Decimal}; use rust_decimal::prelude::{FromPrimitive, ToPrimitive}; use rust_decimal_macros::dec; use serde_json::{json, Value}; use crate::db_connector::{get_simple_depths_json, get_trades_json}; use crate::params_utils::{get_str, parse_str_to_decimal}; use crate::server::{Response, SimpleDepth, Trade}; pub fn symbol_fix(symbol: &str) -> String { let mut fixed = symbol.to_uppercase(); if !fixed.contains("_USDT") { fixed = format!("{}_USDT", fixed); } fixed } // 将trades_json转换为指标 pub async fn generate_msv(query_value: Value) -> HttpResponse { // 参数处理 let exchange = match get_str(query_value.clone(), "exchange") { Ok(str) => { str } Err(response) => { return response } }; let symbol = match get_str(query_value.clone(), "symbol") { Ok(symbol) => { symbol_fix(symbol.as_str()) } Err(response) => { return response } }; let mills_back = match parse_str_to_decimal(query_value.clone(), "mills_back") { Ok(t) => { t } Err(response) => { return response } }; let minute_time_range = match parse_str_to_decimal(query_value.clone(), "minute_time_range") { Ok(t) => { t.to_i64().unwrap() } Err(response) => { return response } }; // 链接数据服务器查询数据 let end_time = Utc::now().timestamp_millis(); let start_time = end_time - minute_time_range * 60 * 1000; let trades_response = get_trades_json( exchange.as_str(), symbol.as_str(), start_time, end_time, ).await; let simple_start_time = if start_time < end_time - 240 * 60 * 1000 { end_time - 240 * 60 * 1000 } else { start_time }; let simple_depths_response = get_simple_depths_json( exchange.as_str(), symbol.as_str(), simple_start_time, end_time, ).await; // 对数据库返回的数据进行容错处理 if trades_response.code == 200 && simple_depths_response.code == 200 { // 数据本地化处理 let trades = parse_json_to_trades(trades_response.data); // let simple_depths = parse_json_to_simple_depths(simple_depths_response.data); let simple_depths = vec![]; // 指标生成 let indicator = generate_msv_by_trades(trades, mills_back, simple_depths, start_time, end_time); // 返回数据 let response = Response { query: query_value.clone(), msg: Some("指标生成完毕".to_string()), code: 200, data: indicator, }; let json_string = serde_json::to_string(&response).unwrap(); HttpResponse::Ok().content_type("application/json").body(json_string) } else { let json_string = serde_json::to_string(&trades_response).unwrap(); HttpResponse::Ok().content_type("application/json").body(json_string) } } // 将trades转换为具体指标 pub fn generate_msv_by_trades(mut trades: Vec, mills_back: Decimal, simple_depths: Vec, start_time: i64, end_time: i64) -> Value { // 具体波动 let mut msv_data: Vec> = vec![]; // 预期利润幅度(except_profit_rate) let mut epr_data: Vec> = vec![]; // 公平价格相关 let mut fair_data: Vec> = vec![]; let mut fair_price_simulation_ema = Decimal::ZERO; const GAMMA: Decimal = dec!(0.5); // 多少分钟 let a = (end_time - start_time) / 1000; // 多少笔 let b = trades.len().to_i64().unwrap(); // 一分钟多少笔 let c = if a == 0 { b } else { b / a }; let gamma_fair: Decimal = Decimal::ONE / Decimal::from_i64(c).unwrap(); // ================== 计算每个点的具体波动率以及回溯幅度 =================== trades.sort_by(|a, b| Decimal::from_str(a.id.as_str()).unwrap().cmp(&Decimal::from_str(b.id.as_str()).unwrap())); for (index, trade) in trades.iter().enumerate() { if index == 0 { continue } // 该元素向前遍历range毫秒 let mut range_index = index; // 该区间的公平价格 let mut fair_price = trade.price; let mut dissociation = Decimal::ZERO; loop { // 下标合法性判断 if range_index == 0 { break; } let flag_trade = trades.get(range_index).unwrap(); let range_time = trade.time - flag_trade.time; // 判断该ticker是否是range ms以外 if range_time > mills_back { break; } fair_price = fair_price * GAMMA + flag_trade.price * (Decimal::ONE - GAMMA); dissociation = dissociation + flag_trade.size.abs(); range_index -= 1; } // 获取到range毫秒以后的公平价格,计算回去的幅度 let mut future_ref_price_sum = Decimal::ZERO; let mut future_ref_count = Decimal::ZERO; let mut future_range_index = index + 1; loop { // 下标合法性判断 if future_range_index >= trades.len() { break; } let flag_trade = trades.get(future_range_index).unwrap(); let range_time = flag_trade.time - trade.time; // 判断该ticker是否是range ms以外 if range_time > mills_back && future_ref_count > Decimal::ZERO { break; } future_range_index += 1; future_ref_price_sum += flag_trade.price; future_ref_count += Decimal::ONE; } let future_ref_price = if future_ref_count < Decimal::ONE { trade.price } else { future_ref_price_sum / future_ref_count }; // 计算过去至多100条数据的sigma值 sigma^2 = (1 / (tn-t0))*sum((S(tk) - S(tk-1)) ^ 2) let mut sigma_index = index - 1; // let t_last = trade.time; let mut _t_first = trade.time; // 右值 let mut total_right = Decimal::ZERO; loop { let flag_trade = trades.get(sigma_index).unwrap(); let next_trade = trades.get(sigma_index + 1).unwrap(); // 下标合法性判断 if sigma_index == 0 || sigma_index + 100 <= index { _t_first = flag_trade.time; break; } // 计算差值 let diff = Decimal::ONE - flag_trade.price / next_trade.price; total_right += diff * diff; sigma_index = sigma_index - 1; } // 计算公平价格仿真值的ema fair_price_simulation_ema = if fair_price_simulation_ema.is_zero() { fair_price } else { fair_price_simulation_ema * (Decimal::ONE - gamma_fair) + fair_price * gamma_fair }; // ==================== 波动逻辑计算 ==================== let last_price = trade.price; let mut rate = Decimal::ONE_HUNDRED * (last_price - fair_price) / fair_price; rate.rescale(2); // 去除小数位之后,可以忽略一些太小的波动,减少图表生成压力 if rate.eq(&Decimal::ZERO) { continue } // ==================== 预期利润逻辑计算 ==================== // 首先计算未来一段时间的价格与现在的距离 let mut future_rate = Decimal::ONE_HUNDRED * (future_ref_price - last_price) / last_price; future_rate.rescale(2); // 根据具体向上波动还是向下波动来计算预期最大利润 let mut epr = if rate > Decimal::ZERO { -future_rate } else { future_rate }; epr = min(epr, rate.abs()); // 去重,以及保留最大的波动率 if msv_data.len() > 0 { let last = msv_data[msv_data.len() - 1].clone(); let last_time = last[0]; let last_rate = last[1]; // 如果时间相同,则可能会进行remove等操作 if last_time == trade.time { // 如果最新的波动率大于最后波动率 if rate.abs() > last_rate.abs() { msv_data.remove(msv_data.len() - 1); msv_data.push(vec![trade.time, rate, dissociation]); epr_data.remove(epr_data.len() - 1); epr_data.push(vec![trade.time, epr]); fair_data.remove(fair_data.len() - 1); fair_data.push(vec![trade.time, fair_price, fair_price_simulation_ema]); } } else { msv_data.push(vec![trade.time, rate, dissociation]); epr_data.push(vec![trade.time, epr]); fair_data.push(vec![trade.time, fair_price, fair_price_simulation_ema]); } } else { msv_data.push(vec![trade.time, rate, dissociation]); epr_data.push(vec![trade.time, epr]); fair_data.push(vec![trade.time, fair_price, fair_price_simulation_ema]); } } // 按时间序列填充数据 let mut msv_index = 0; let mut final_msv_data: Vec> = vec![]; let mut final_epr_data: Vec> = vec![]; let mut final_sigma_data: Vec> = vec![]; let mut final_sigma_ma_data: Vec> = vec![]; let mut depth_index = 0; let mut final_volume_data: Vec> = vec![]; let mut index_timestamp = Decimal::from_i64(start_time).unwrap(); let last_timestamp = Decimal::from_i64(end_time).unwrap(); let step_timestamp = dec!(1000); loop { let mut max_msv_data = Decimal::ZERO; let mut max_msv_qty_data = Decimal::ZERO; let mut max_epr_data = Decimal::ZERO; let mut max_fair_d = Decimal::ZERO; let mut max_fair_ema_d = Decimal::ZERO; // ====================================== 数据生产 =============================================== // 获取时间范围内的波动率数据 loop { // 下标合法性判断 if msv_index >= msv_data.len() { break; } // msv_data的指定下标数据不在时间范围内(时间范围:指的是[index_timestamp-mills_back, index_timestamp]这个范围) if index_timestamp < msv_data[msv_index][0] { break; } // -------------- 大小判断,取值 let msv_d = msv_data[msv_index][1]; let msv_qty_data = msv_data[msv_index][2]; let epr_d = epr_data[msv_index][1]; let fair_d = fair_data[msv_index][1]; let fair_ema_d = fair_data[msv_index][2]; // msv波动数据 if max_msv_data.abs() < msv_d.abs() { max_msv_data = msv_d; max_msv_qty_data = msv_qty_data; max_epr_data = epr_d; max_fair_d = fair_d; max_fair_ema_d = fair_ema_d; } // fair_price // 下标步近 msv_index = msv_index + 1; } if max_fair_d.is_zero() || max_fair_ema_d.is_zero() { max_fair_d = fair_data[msv_index - 1][1]; max_fair_ema_d = fair_data[msv_index - 1][2]; } // 获取时间范围内的深度数据、买一及卖一价数据 let mut max_size = Decimal::ZERO; let mut min_size = Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND; loop { // 下标合法性判断 if depth_index >= simple_depths.len() { break; } let depth = &simple_depths[depth_index]; // 时间范围合法性判断,只统计那一秒以内的深度总交易量 if index_timestamp < depth.time { break; } // 这一秒的深度最大值、最小值 max_size = max(max_size, depth.size); min_size = min(min_size, depth.size); // 下标步近 depth_index += 1; } // ====================================== 智能填充数据 =============================================== // 流动性数据叠加 // let rst_size = if (max_size == Decimal::ZERO || min_size == Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND) && final_depth_data.len() > 0 { // final_depth_data.last().unwrap()[1] // } else { // if (max_size == Decimal::ZERO || min_size == Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND) && simple_depths.len() > 0 { // simple_depths[0].size // } else { // if simple_depths.len() > 0 { // (max_size + min_size) / Decimal::TWO // } else { // Decimal::ZERO // } // } // }; // final_depth_data.push(vec![index_timestamp, rst_size]); // // // 建议开仓距离 // let mut rst_spread = if rst_size == Decimal::ZERO { // Decimal::ZERO // } else { // dec!(10000) / rst_size // }; // rst_spread.rescale(6); // final_spread_data.push(vec![index_timestamp, rst_spread]); // 波动率数据处理 // 如果这两个值为0,则代表这mills_back毫秒以内是没有数据的,填充0数据,使得x轴是完整的 if max_msv_data == Decimal::ZERO { final_msv_data.push(vec![index_timestamp, Decimal::ZERO, Decimal::ZERO]); final_epr_data.push(vec![index_timestamp, Decimal::ZERO]); final_volume_data.push(vec![index_timestamp, Decimal::ZERO]); // 说明在这个时间范围内是有数据存在的,将各类副图放置完全 } else { final_msv_data.push(vec![index_timestamp, max_msv_data, max_msv_qty_data]); final_epr_data.push(vec![index_timestamp, max_epr_data]); let mut final_qty = max_msv_qty_data / Decimal::ONE_THOUSAND; final_qty.rescale(2); final_volume_data.push(vec![index_timestamp, final_qty]); } final_sigma_data.push(vec![index_timestamp, max_fair_d]); final_sigma_ma_data.push(vec![index_timestamp, max_fair_ema_d]); // ====================================== 时间步进处理 ====================================== // 对时间进行步近 index_timestamp = index_timestamp + step_timestamp; // 时间越界 if index_timestamp > last_timestamp { break } } // 结果统计 let total_size = trades.len(); let result_size = final_msv_data.len(); json!({ "msv": final_msv_data, "liqs": final_volume_data, "eprs": final_epr_data, "sigmas": final_sigma_data, "sigma_mas": final_sigma_ma_data, "total_size": total_size, "result_size": result_size, }) } // 将json转换为trades pub fn parse_json_to_trades(trades_json: Value) -> Vec { let mut rst = vec![]; for trade_json in trades_json.as_array().unwrap() { let arr = trade_json.as_array().unwrap(); rst.push(Trade { id: arr[0].as_str().unwrap().to_string(), time: Decimal::from_str(arr[1].as_str().unwrap()).unwrap(), size: Decimal::from_str(arr[2].as_str().unwrap()).unwrap(), price: Decimal::from_str(arr[3].as_str().unwrap()).unwrap(), }); } rst } // // 将json转换为简易深度数据 // pub fn parse_json_to_simple_depths(depths_json: Value) -> Vec { // let mut rst = vec![]; // // for depth_json in depths_json.as_array().unwrap() { // let depth: SimpleDepth = SimpleDepth { // time: Decimal::from_str(depth_json["time"].as_str().unwrap()).unwrap(), // size: Decimal::from_str(depth_json["size"].as_str().unwrap()).unwrap(), // a1: Decimal::from_str(depth_json["a1"].as_str().unwrap()).unwrap(), // b1: Decimal::from_str(depth_json["b1"].as_str().unwrap()).unwrap(), // }; // // rst.insert(0, depth) // } // // rst // }