| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449 |
- use std::cmp::{max, min};
- use std::str::FromStr;
- use actix_web::{HttpResponse};
- use chrono::Utc;
- use rust_decimal::{Decimal};
- use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
- use rust_decimal_macros::dec;
- use serde_json::{json, Value};
- use crate::db_connector::{get_simple_depths_json, get_trades_json};
- use crate::params_utils::{get_str, parse_str_to_decimal};
- use crate::server::{Response, SimpleDepth, Trade};
/// Normalizes a trading-pair symbol.
///
/// The symbol is upper-cased; when the upper-cased text does not already
/// contain `_USDT`, the suffix `_USDT` is appended.
pub fn symbol_fix(symbol: &str) -> String {
    let upper = symbol.to_uppercase();
    if upper.contains("_USDT") {
        upper
    } else {
        upper + "_USDT"
    }
}
- // 将trades_json转换为指标
- pub async fn generate_msv(query_value: Value) -> HttpResponse {
- // 参数处理
- let exchange = match get_str(query_value.clone(), "exchange") {
- Ok(str) => {
- str
- }
- Err(response) => {
- return response
- }
- };
- let symbol = match get_str(query_value.clone(), "symbol") {
- Ok(symbol) => {
- symbol_fix(symbol.as_str())
- }
- Err(response) => {
- return response
- }
- };
- let mills_back = match parse_str_to_decimal(query_value.clone(), "mills_back") {
- Ok(t) => {
- t
- }
- Err(response) => {
- return response
- }
- };
- let minute_time_range = match parse_str_to_decimal(query_value.clone(), "minute_time_range") {
- Ok(t) => {
- t.to_i64().unwrap()
- }
- Err(response) => {
- return response
- }
- };
- // 链接数据服务器查询数据
- let end_time = Utc::now().timestamp_millis();
- let start_time = end_time - minute_time_range * 60 * 1000;
- let trades_response = get_trades_json(
- exchange.as_str(),
- symbol.as_str(),
- start_time,
- end_time,
- ).await;
- let simple_start_time = if start_time < end_time - 240 * 60 * 1000 {
- end_time - 240 * 60 * 1000
- } else {
- start_time
- };
- let simple_depths_response = get_simple_depths_json(
- exchange.as_str(),
- symbol.as_str(),
- simple_start_time,
- end_time,
- ).await;
- // 对数据库返回的数据进行容错处理
- if trades_response.code == 200 && simple_depths_response.code == 200 {
- // 数据本地化处理
- let trades = parse_json_to_trades(trades_response.data);
- // let simple_depths = parse_json_to_simple_depths(simple_depths_response.data);
- let simple_depths = vec![];
- // 指标生成
- let indicator = generate_msv_by_trades(trades, mills_back, simple_depths, start_time, end_time);
- // 返回数据
- let response = Response {
- query: query_value.clone(),
- msg: Some("指标生成完毕".to_string()),
- code: 200,
- data: indicator,
- };
- let json_string = serde_json::to_string(&response).unwrap();
- HttpResponse::Ok().content_type("application/json").body(json_string)
- } else {
- let json_string = serde_json::to_string(&trades_response).unwrap();
- HttpResponse::Ok().content_type("application/json").body(json_string)
- }
- }
- // 将trades转换为具体指标
- pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, simple_depths: Vec<SimpleDepth>, start_time: i64, end_time: i64) -> Value {
- // 具体波动
- let mut msv_data: Vec<Vec<Decimal>> = vec![];
- // 预期利润幅度(except_profit_rate)
- let mut epr_data: Vec<Vec<Decimal>> = vec![];
- // 公平价格相关
- let mut fair_data: Vec<Vec<Decimal>> = vec![];
- let mut fair_price_simulation_ema = Decimal::ZERO;
- const GAMMA: Decimal = dec!(0.5);
- // 多少分钟
- let a = (end_time - start_time) / 1000;
- // 多少笔
- let b = trades.len().to_i64().unwrap();
- // 一分钟多少笔
- let c = if a == 0 {
- b
- } else {
- b / a
- };
- let gamma_fair: Decimal = Decimal::ONE / Decimal::from_i64(c).unwrap();
- // ================== 计算每个点的具体波动率以及回溯幅度 ===================
- trades.sort_by(|a, b| Decimal::from_str(a.id.as_str()).unwrap().cmp(&Decimal::from_str(b.id.as_str()).unwrap()));
- for (index, trade) in trades.iter().enumerate() {
- if index == 0 {
- continue
- }
- // 该元素向前遍历range毫秒
- let mut range_index = index;
- // 该区间的公平价格
- let mut fair_price = trade.price;
- let mut dissociation = Decimal::ZERO;
- loop {
- // 下标合法性判断
- if range_index == 0 {
- break;
- }
- let flag_trade = trades.get(range_index).unwrap();
- let range_time = trade.time - flag_trade.time;
- // 判断该ticker是否是range ms以外
- if range_time > mills_back {
- break;
- }
- fair_price = fair_price * GAMMA + flag_trade.price * (Decimal::ONE - GAMMA);
- dissociation = dissociation + flag_trade.size.abs();
- range_index -= 1;
- }
- // 获取到range毫秒以后的公平价格,计算回去的幅度
- let mut future_ref_price_sum = Decimal::ZERO;
- let mut future_ref_count = Decimal::ZERO;
- let mut future_range_index = index + 1;
- loop {
- // 下标合法性判断
- if future_range_index >= trades.len() {
- break;
- }
- let flag_trade = trades.get(future_range_index).unwrap();
- let range_time = flag_trade.time - trade.time;
- // 判断该ticker是否是range ms以外
- if range_time > mills_back && future_ref_count > Decimal::ZERO {
- break;
- }
- future_range_index += 1;
- future_ref_price_sum += flag_trade.price;
- future_ref_count += Decimal::ONE;
- }
- let future_ref_price = if future_ref_count < Decimal::ONE {
- trade.price
- } else {
- future_ref_price_sum / future_ref_count
- };
- // 计算过去至多100条数据的sigma值 sigma^2 = (1 / (tn-t0))*sum((S(tk) - S(tk-1)) ^ 2)
- let mut sigma_index = index - 1;
- // let t_last = trade.time;
- let mut _t_first = trade.time;
- // 右值
- let mut total_right = Decimal::ZERO;
- loop {
- let flag_trade = trades.get(sigma_index).unwrap();
- let next_trade = trades.get(sigma_index + 1).unwrap();
- // 下标合法性判断
- if sigma_index == 0 || sigma_index + 100 <= index {
- _t_first = flag_trade.time;
- break;
- }
- // 计算差值
- let diff = Decimal::ONE - flag_trade.price / next_trade.price;
- total_right += diff * diff;
- sigma_index = sigma_index - 1;
- }
- // 计算公平价格仿真值的ema
- fair_price_simulation_ema = if fair_price_simulation_ema.is_zero() {
- fair_price
- } else {
- fair_price_simulation_ema * (Decimal::ONE - gamma_fair) + fair_price * gamma_fair
- };
- // ==================== 波动逻辑计算 ====================
- let last_price = trade.price;
- let mut rate = Decimal::ONE_HUNDRED * (last_price - fair_price) / fair_price;
- rate.rescale(2);
- // 去除小数位之后,可以忽略一些太小的波动,减少图表生成压力
- if rate.eq(&Decimal::ZERO) {
- continue
- }
- // ==================== 预期利润逻辑计算 ====================
- // 首先计算未来一段时间的价格与现在的距离
- let mut future_rate = Decimal::ONE_HUNDRED * (future_ref_price - last_price) / last_price;
- future_rate.rescale(2);
- // 根据具体向上波动还是向下波动来计算预期最大利润
- let mut epr = if rate > Decimal::ZERO {
- -future_rate
- } else {
- future_rate
- };
- epr = min(epr, rate.abs());
- // 去重,以及保留最大的波动率
- if msv_data.len() > 0 {
- let last = msv_data[msv_data.len() - 1].clone();
- let last_time = last[0];
- let last_rate = last[1];
- // 如果时间相同,则可能会进行remove等操作
- if last_time == trade.time {
- // 如果最新的波动率大于最后波动率
- if rate.abs() > last_rate.abs() {
- msv_data.remove(msv_data.len() - 1);
- msv_data.push(vec![trade.time, rate, dissociation]);
- epr_data.remove(epr_data.len() - 1);
- epr_data.push(vec![trade.time, epr]);
- fair_data.remove(fair_data.len() - 1);
- fair_data.push(vec![trade.time, fair_price, fair_price_simulation_ema]);
- }
- } else {
- msv_data.push(vec![trade.time, rate, dissociation]);
- epr_data.push(vec![trade.time, epr]);
- fair_data.push(vec![trade.time, fair_price, fair_price_simulation_ema]);
- }
- } else {
- msv_data.push(vec![trade.time, rate, dissociation]);
- epr_data.push(vec![trade.time, epr]);
- fair_data.push(vec![trade.time, fair_price, fair_price_simulation_ema]);
- }
- }
- // 按时间序列填充数据
- let mut msv_index = 0;
- let mut final_msv_data: Vec<Vec<Decimal>> = vec![];
- let mut final_epr_data: Vec<Vec<Decimal>> = vec![];
- let mut final_sigma_data: Vec<Vec<Decimal>> = vec![];
- let mut final_sigma_ma_data: Vec<Vec<Decimal>> = vec![];
- let mut depth_index = 0;
- let mut final_volume_data: Vec<Vec<Decimal>> = vec![];
- let mut index_timestamp = Decimal::from_i64(start_time).unwrap();
- let last_timestamp = Decimal::from_i64(end_time).unwrap();
- let step_timestamp = dec!(1000);
- loop {
- let mut max_msv_data = Decimal::ZERO;
- let mut max_msv_qty_data = Decimal::ZERO;
- let mut max_epr_data = Decimal::ZERO;
- let mut max_fair_d = Decimal::ZERO;
- let mut max_fair_ema_d = Decimal::ZERO;
- // ====================================== 数据生产 ===============================================
- // 获取时间范围内的波动率数据
- loop {
- // 下标合法性判断
- if msv_index >= msv_data.len() {
- break;
- }
- // msv_data的指定下标数据不在时间范围内(时间范围:指的是[index_timestamp-mills_back, index_timestamp]这个范围)
- if index_timestamp < msv_data[msv_index][0] {
- break;
- }
- // -------------- 大小判断,取值
- let msv_d = msv_data[msv_index][1];
- let msv_qty_data = msv_data[msv_index][2];
- let epr_d = epr_data[msv_index][1];
- let fair_d = fair_data[msv_index][1];
- let fair_ema_d = fair_data[msv_index][2];
- // msv波动数据
- if max_msv_data.abs() < msv_d.abs() {
- max_msv_data = msv_d;
- max_msv_qty_data = msv_qty_data;
- max_epr_data = epr_d;
- max_fair_d = fair_d;
- max_fair_ema_d = fair_ema_d;
- }
- // fair_price
- // 下标步近
- msv_index = msv_index + 1;
- }
- if max_fair_d.is_zero() || max_fair_ema_d.is_zero() {
- max_fair_d = fair_data[msv_index - 1][1];
- max_fair_ema_d = fair_data[msv_index - 1][2];
- }
- // 获取时间范围内的深度数据、买一及卖一价数据
- let mut max_size = Decimal::ZERO;
- let mut min_size = Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND;
- loop {
- // 下标合法性判断
- if depth_index >= simple_depths.len() {
- break;
- }
- let depth = &simple_depths[depth_index];
- // 时间范围合法性判断,只统计那一秒以内的深度总交易量
- if index_timestamp < depth.time {
- break;
- }
- // 这一秒的深度最大值、最小值
- max_size = max(max_size, depth.size);
- min_size = min(min_size, depth.size);
- // 下标步近
- depth_index += 1;
- }
- // ====================================== 智能填充数据 ===============================================
- // 流动性数据叠加
- // let rst_size = if (max_size == Decimal::ZERO || min_size == Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND) && final_depth_data.len() > 0 {
- // final_depth_data.last().unwrap()[1]
- // } else {
- // if (max_size == Decimal::ZERO || min_size == Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND) && simple_depths.len() > 0 {
- // simple_depths[0].size
- // } else {
- // if simple_depths.len() > 0 {
- // (max_size + min_size) / Decimal::TWO
- // } else {
- // Decimal::ZERO
- // }
- // }
- // };
- // final_depth_data.push(vec![index_timestamp, rst_size]);
- //
- // // 建议开仓距离
- // let mut rst_spread = if rst_size == Decimal::ZERO {
- // Decimal::ZERO
- // } else {
- // dec!(10000) / rst_size
- // };
- // rst_spread.rescale(6);
- // final_spread_data.push(vec![index_timestamp, rst_spread]);
- // 波动率数据处理
- // 如果这两个值为0,则代表这mills_back毫秒以内是没有数据的,填充0数据,使得x轴是完整的
- if max_msv_data == Decimal::ZERO {
- final_msv_data.push(vec![index_timestamp, Decimal::ZERO, Decimal::ZERO]);
- final_epr_data.push(vec![index_timestamp, Decimal::ZERO]);
- final_volume_data.push(vec![index_timestamp, Decimal::ZERO]);
- // 说明在这个时间范围内是有数据存在的,将各类副图放置完全
- } else {
- final_msv_data.push(vec![index_timestamp, max_msv_data, max_msv_qty_data]);
- final_epr_data.push(vec![index_timestamp, max_epr_data]);
- let mut final_qty = max_msv_qty_data / Decimal::ONE_THOUSAND;
- final_qty.rescale(2);
- final_volume_data.push(vec![index_timestamp, final_qty]);
- }
- final_sigma_data.push(vec![index_timestamp, max_fair_d]);
- final_sigma_ma_data.push(vec![index_timestamp, max_fair_ema_d]);
- // ====================================== 时间步进处理 ======================================
- // 对时间进行步近
- index_timestamp = index_timestamp + step_timestamp;
- // 时间越界
- if index_timestamp > last_timestamp {
- break
- }
- }
- // 结果统计
- let total_size = trades.len();
- let result_size = final_msv_data.len();
- json!({
- "msv": final_msv_data,
- "liqs": final_volume_data,
- "eprs": final_epr_data,
- "sigmas": final_sigma_data,
- "sigma_mas": final_sigma_ma_data,
- "total_size": total_size,
- "result_size": result_size,
- })
- }
- // 将json转换为trades
- pub fn parse_json_to_trades(trades_json: Value) -> Vec<Trade> {
- let mut rst = vec![];
- for trade_json in trades_json.as_array().unwrap() {
- let arr = trade_json.as_array().unwrap();
- rst.push(Trade {
- id: arr[0].as_str().unwrap().to_string(),
- time: Decimal::from_str(arr[1].as_str().unwrap()).unwrap(),
- size: Decimal::from_str(arr[2].as_str().unwrap()).unwrap(),
- price: Decimal::from_str(arr[3].as_str().unwrap()).unwrap(),
- });
- }
- rst
- }
- // // 将json转换为简易深度数据
- // pub fn parse_json_to_simple_depths(depths_json: Value) -> Vec<SimpleDepth> {
- // let mut rst = vec![];
- //
- // for depth_json in depths_json.as_array().unwrap() {
- // let depth: SimpleDepth = SimpleDepth {
- // time: Decimal::from_str(depth_json["time"].as_str().unwrap()).unwrap(),
- // size: Decimal::from_str(depth_json["size"].as_str().unwrap()).unwrap(),
- // a1: Decimal::from_str(depth_json["a1"].as_str().unwrap()).unwrap(),
- // b1: Decimal::from_str(depth_json["b1"].as_str().unwrap()).unwrap(),
- // };
- //
- // rst.insert(0, depth)
- // }
- //
- // rst
- // }
|