|
|
@@ -1,6 +1,8 @@
|
|
|
use std::collections::{BTreeMap, HashMap};
|
|
|
use std::sync::{Arc};
|
|
|
use std::sync::atomic::{AtomicBool};
|
|
|
+use std::time::Duration;
|
|
|
+use chrono::Utc;
|
|
|
use lazy_static::lazy_static;
|
|
|
use rust_decimal::Decimal;
|
|
|
use tokio::sync::{Mutex};
|
|
|
@@ -11,8 +13,9 @@ use exchanges::response_base::ResponseData;
|
|
|
use serde_json::Value;
|
|
|
use standard::exchange::ExchangeEnum;
|
|
|
use standard::exchange_struct_handler::ExchangeStructHandler;
|
|
|
+use crate::json_db_utils::{collect_records, delete_db_by_exchange};
|
|
|
use crate::listener_tools::{update_record, RecordMap};
|
|
|
-use crate::trend14400::Indicators;
|
|
|
+use crate::trend14400::{generate_14400trend_by_records, Indicators};
|
|
|
|
|
|
const EXCHANGE_NAME: &str = "bybit_usdt_swap";
|
|
|
|
|
|
@@ -60,28 +63,29 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
|
|
|
ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
});
|
|
|
}
|
|
|
- // // 每分钟计算msv
|
|
|
- // tokio::spawn(async move {
|
|
|
- // loop {
|
|
|
- // let end_timestamp = Utc::now().timestamp_millis();
|
|
|
- // let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
|
|
|
- // for symbol in symbols.clone() {
|
|
|
- // let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
|
|
|
- // let trades = parse_json_to_trades(trades_value);
|
|
|
- // let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
|
|
|
- // let mut indicator_map = INDICATOR_MAP.lock().await;
|
|
|
- // indicator_map.insert(symbol, msv);
|
|
|
- // }
|
|
|
- // tokio::time::sleep(Duration::from_secs(65)).await;
|
|
|
- // }
|
|
|
- // });
|
|
|
- // // 定时删除数据
|
|
|
- // tokio::spawn(async move {
|
|
|
- // loop {
|
|
|
- // tokio::time::sleep(Duration::from_secs(1800)).await;
|
|
|
- // delete_db_by_exchange(EXCHANGE_NAME, "trades", 5 * 60).await;
|
|
|
- // }
|
|
|
- // });
|
|
|
+ // 每分钟计算trend
|
|
|
+ tokio::spawn(async move {
|
|
|
+ loop {
|
|
|
+ let end_timestamp = Utc::now().timestamp_millis();
|
|
|
+ let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
|
|
|
+ for symbol in symbols.clone() {
|
|
|
+ let mut records = collect_records(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
|
|
|
+ records.sort_by(|a, b| a.time.cmp(&b.time));
|
|
|
+
|
|
|
+ let indicators = generate_14400trend_by_records(&records);
|
|
|
+ let mut indicator_map = INDICATOR_MAP.lock().await;
|
|
|
+ indicator_map.insert(symbol, indicators);
|
|
|
+ }
|
|
|
+ tokio::time::sleep(Duration::from_secs(65)).await;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ // 定时删除数据
|
|
|
+ tokio::spawn(async move {
|
|
|
+ loop {
|
|
|
+ tokio::time::sleep(Duration::from_secs(1800)).await;
|
|
|
+ delete_db_by_exchange(EXCHANGE_NAME, "record", 5 * 60).await;
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
// 读取数据
|
|
|
@@ -98,8 +102,6 @@ pub async fn data_listener(response: ResponseData) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- info!(?records);
|
|
|
-
|
|
|
let record_map= RECORD_MAP.lock().await;
|
|
|
update_record(&records[records.len() - 1], record_map, EXCHANGE_NAME).await;
|
|
|
},
|