Przeglądaj źródła

trend14400第一版,代码整理的差不多了,下午开始写逻辑。

skyffire 7 miesięcy temu
rodzic
commit
e13dbc57f9

+ 0 - 110
src/binance_usdt_swap_data_listener.rs

@@ -1,110 +0,0 @@
-use std::collections::{BTreeMap, HashMap};
-use std::sync::{Arc};
-use std::sync::atomic::AtomicBool;
-use std::time::Duration;
-use chrono::Utc;
-use lazy_static::lazy_static;
-use tokio::sync::{Mutex};
-use tracing::info;
-use exchanges::binance_swap_rest::BinanceSwapRest;
-use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
-use exchanges::response_base::ResponseData;
-use rust_decimal_macros::dec;
-use standard::exchange::ExchangeEnum;
-use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
-use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
-
-const EXCHANGE_NAME: &str = "binance_usdt_swap";
-
-lazy_static! {
-    // 给其他模块使用
-    pub static ref INDICATOR_MAP: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
-    // static ref LOCAL_DEPTH: Mutex<HashMap<String, Depth>> = Mutex::new(HashMap::new());             // 本地缓存的订单簿
-
-    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
-}
-
-pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-    let name = "binance_usdt_swap_listener";
-    // 订阅所有币种
-    let login = BTreeMap::new();
-    let mut binance_rest = BinanceSwapRest::new(false, login);
-    let response = binance_rest.get_exchange_info().await;
-    let mut symbols = vec![];
-    if response.code == 200 {
-        let data = response.data["symbols"].as_array().unwrap();
-        for info in data {
-            let s = info["symbol"].as_str().unwrap().to_string();
-            if !s.ends_with("USDT") {
-                continue;
-            }
-            let symbol = s.replace("USDT", "_USDT");
-            symbols.push(symbol)
-        }
-    }
-
-    for chunk in symbols.chunks(20) {
-        let ws_name = name.to_string();
-        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-        let write_tx_am = Arc::new(Mutex::new(write_tx));
-        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-
-        tokio::spawn(async move {
-            let mut ws = BinanceSwapWs::new_with_tag(ws_name, false, None, BinanceSwapWsType::PublicAndPrivate);
-            ws.set_subscribe(vec![
-                BinanceSwapSubscribeType::PuAggTrade,
-            ]);
-
-            // 建立链接
-            ws.set_symbols(symbols_chunk);
-            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-        });
-    }
-    // 每分钟计算msv
-    tokio::spawn(async move {
-        loop {
-            let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
-            for symbol in symbols.clone() {
-                let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
-                let trades = parse_json_to_trades(trades_value);
-                let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
-                let mut indicator_map = INDICATOR_MAP.lock().await;
-                indicator_map.insert(symbol, msv);
-            }
-            tokio::time::sleep(Duration::from_secs(55)).await;
-        }
-    });
-    // 定时删除数据
-    tokio::spawn(async move {
-        loop {
-            tokio::time::sleep(Duration::from_secs(1800)).await;
-            delete_db_by_exchange(EXCHANGE_NAME, "trades", 5 * 60).await;
-        }
-    });
-}
-
-// 读取数据
-pub async fn data_listener(response: ResponseData) {
-    if response.code != 200 {
-        return;
-    }
-    match response.channel.as_str() {
-        // 订单流数据
-        "aggTrade" => {
-            let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BinanceSwap, &response);
-
-            for trade in trades.iter() {
-                let trades_map = TRADES_MAP.lock().await;
-
-                update_trade(trade, trades_map, EXCHANGE_NAME).await;
-            }
-        }
-        _ => {
-            info!("48 未知的数据类型: {:?}", response)
-        }
-    }
-}

+ 0 - 117
src/bitget_usdt_swap_data_listener.rs

@@ -1,117 +0,0 @@
-use std::collections::{BTreeMap, HashMap};
-use std::sync::{Arc};
-use std::sync::atomic::{AtomicBool};
-use std::time::Duration;
-use chrono::Utc;
-use lazy_static::lazy_static;
-use rust_decimal::Decimal;
-use tokio::sync::{Mutex};
-use tracing::info;
-use exchanges::bitget_swap_rest::BitgetSwapRest;
-use exchanges::bitget_swap_ws::{BitgetSwapSubscribeType, BitgetSwapWs, BitgetSwapWsType};
-use exchanges::response_base::ResponseData;
-use rust_decimal_macros::dec;
-use standard::exchange::ExchangeEnum;
-use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
-use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
-
-const EXCHANGE_NAME: &str = "bitget_usdt_swap";
-
-lazy_static! {
-    // 给其他模块使用
-    pub static ref INDICATOR_MAP: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
-
-    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
-    static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
-}
-
-pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-    let name = "bitget_usdt_swap_listener";
-    // 订阅所有币种
-    let login = BTreeMap::new();
-    let mut bitget_rest = BitgetSwapRest::new(false, login);
-    let response = bitget_rest.get_all_contracts().await;
-    let mut symbols = vec![];
-    if response.code == 200 {
-        let symbol_infos = response.data.as_array().unwrap();
-        let mut mul_map = MUL_MAP.lock().await;
-        for symbol_info in symbol_infos {
-            let symbol = symbol_info["symbol"].as_str().unwrap().replace("USDT", "_USDT");
-
-            mul_map.insert(symbol.clone(), Decimal::ONE);
-            symbols.push(symbol);
-        }
-    }
-
-    for chunk in symbols.chunks(20) {
-        let ws_name = name.to_string();
-        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-        let write_tx_am = Arc::new(Mutex::new(write_tx));
-        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-
-        tokio::spawn(async move {
-            let mut ws = BitgetSwapWs::new_with_tag(ws_name, false, None, BitgetSwapWsType::Public);
-            ws.set_subscribe(vec![
-                BitgetSwapSubscribeType::PuTrade,
-            ]);
-
-            // 建立链接
-            ws.set_symbols(symbols_chunk);
-            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-        });
-    }
-    // 每分钟计算msv
-    tokio::spawn(async move {
-        loop {
-            let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
-            for symbol in symbols.clone() {
-                let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
-                let trades = parse_json_to_trades(trades_value);
-                let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
-                let mut indicator_map = INDICATOR_MAP.lock().await;
-                indicator_map.insert(symbol, msv);
-            }
-            tokio::time::sleep(Duration::from_secs(70)).await;
-        }
-    });
-    // 定时删除数据
-    tokio::spawn(async move {
-        loop {
-            tokio::time::sleep(Duration::from_secs(1800)).await;
-            delete_db_by_exchange(EXCHANGE_NAME, "trades", 5 * 60).await;
-        }
-    });
-}
-
-// 读取数据
-pub async fn data_listener(response: ResponseData) {
-    if response.code != 200 {
-        return;
-    }
-    match response.channel.as_str() {
-        // 订单流数据
-        "trade" => {
-            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BitgetSwap, &response);
-            let mul_map = MUL_MAP.lock().await;
-
-            for trade in trades.iter_mut() {
-                // 真实交易量处理,因为gate的量都是张数
-                let mul = mul_map[trade.symbol.as_str()];
-                let mut real_size = trade.size * mul * trade.price;
-                real_size.rescale(2);
-                trade.size = real_size;
-
-                // 更新到本地数据库
-                let trades_map = TRADES_MAP.lock().await;
-                update_trade(trade, trades_map, EXCHANGE_NAME).await;
-            }
-        }
-        _ => {
-            info!("48 未知的数据类型: {:?}", response)
-        }
-    }
-}

+ 37 - 45
src/bybit_usdt_swap_data_listener.rs

@@ -1,8 +1,6 @@
 use std::collections::{BTreeMap, HashMap};
 use std::sync::{Arc};
 use std::sync::atomic::{AtomicBool};
-use std::time::Duration;
-use chrono::Utc;
 use lazy_static::lazy_static;
 use rust_decimal::Decimal;
 use tokio::sync::{Mutex};
@@ -10,13 +8,11 @@ use tracing::info;
 use exchanges::bybit_swap_rest::BybitSwapRest;
 use exchanges::bybit_swap_ws::{BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
 use exchanges::response_base::ResponseData;
-use rust_decimal_macros::dec;
 use serde_json::Value;
 use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
-use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
+use crate::listener_tools::{update_record, RecordMap};
+use crate::trend14400::Indicators;
 
 const EXCHANGE_NAME: &str = "bybit_usdt_swap";
 
@@ -24,7 +20,7 @@ lazy_static! {
     // 给其他模块使用
     pub static ref INDICATOR_MAP: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
 
-    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+    static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
     static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
 }
 
@@ -56,7 +52,7 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
         tokio::spawn(async move {
             let mut ws = BybitSwapWs::new_with_tag(ws_name, false, None, BybitSwapWsType::Public);
             ws.set_subscribe(vec![
-                BybitSwapSubscribeType::PuBlicTrade,
+                BybitSwapSubscribeType::PuKline("1".to_string()),
             ]);
 
             // 建立链接
@@ -64,28 +60,28 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
             ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
         });
     }
-    // 每分钟计算msv
-    tokio::spawn(async move {
-        loop {
-            let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
-            for symbol in symbols.clone() {
-                let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
-                let trades = parse_json_to_trades(trades_value);
-                let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
-                let mut indicator_map = INDICATOR_MAP.lock().await;
-                indicator_map.insert(symbol, msv);
-            }
-            tokio::time::sleep(Duration::from_secs(65)).await;
-        }
-    });
-    // 定时删除数据
-    tokio::spawn(async move {
-        loop {
-            tokio::time::sleep(Duration::from_secs(1800)).await;
-            delete_db_by_exchange(EXCHANGE_NAME, "trades", 5 * 60).await;
-        }
-    });
+    // // 每分钟计算msv
+    // tokio::spawn(async move {
+    //     loop {
+    //         let end_timestamp = Utc::now().timestamp_millis();
+    //         let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
+    //         for symbol in symbols.clone() {
+    //             let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
+    //             let trades = parse_json_to_trades(trades_value);
+    //             let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
+    //             let mut indicator_map = INDICATOR_MAP.lock().await;
+    //             indicator_map.insert(symbol, msv);
+    //         }
+    //         tokio::time::sleep(Duration::from_secs(65)).await;
+    //     }
+    // });
+    // // 定时删除数据
+    // tokio::spawn(async move {
+    //     loop {
+    //         tokio::time::sleep(Duration::from_secs(1800)).await;
+    //         delete_db_by_exchange(EXCHANGE_NAME, "trades", 5 * 60).await;
+    //     }
+    // });
 }
 
 // 读取数据
@@ -94,23 +90,19 @@ pub async fn data_listener(response: ResponseData) {
         return;
     }
     match response.channel.as_str() {
-        // 订单流数据
-        "trades" => {
-            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BybitSwap, &response);
-            let mul_map = MUL_MAP.lock().await;
-
-            for trade in trades.iter_mut() {
-                // 真实交易量处理,因为gate的量都是张数
-                let mul = mul_map[trade.symbol.as_str()];
-                let mut real_size = trade.size * mul * trade.price;
-                real_size.rescale(2);
-                trade.size = real_size;
+        // k线数据
+        "kline.1m" => {
+            let records = ExchangeStructHandler::records_handle(ExchangeEnum::BybitSwap, &response);
 
-                // 更新到本地数据库
-                let trades_map = TRADES_MAP.lock().await;
-                update_trade(trade, trades_map, EXCHANGE_NAME).await;
+            if records.is_empty() {
+                return;
             }
-        }
+
+            info!(?records);
+
+            let record_map= RECORD_MAP.lock().await;
+            update_record(&records[records.len() - 1], record_map, EXCHANGE_NAME).await;
+        },
         _ => {
             info!("48 未知的数据类型: {:?}", response)
         }

+ 0 - 117
src/coinex_usdt_swap_data_listener.rs

@@ -1,117 +0,0 @@
-use std::collections::{BTreeMap, HashMap};
-use std::sync::{Arc};
-use std::sync::atomic::AtomicBool;
-use std::time::Duration;
-use chrono::Utc;
-use lazy_static::lazy_static;
-use tokio::sync::{Mutex};
-use tracing::info;
-use exchanges::coinex_swap_rest::CoinexSwapRest;
-use exchanges::coinex_swap_ws::{CoinexSwapSubscribeType, CoinexSwapWs, CoinexSwapWsType};
-// use exchanges::coinex_swap_ws::{CoinexSwapSubscribeType, CoinexSwapWs};
-use exchanges::response_base::ResponseData;
-use rust_decimal_macros::dec;
-use standard::exchange::ExchangeEnum;
-use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
-use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
-
-const EXCHANGE_NAME: &str = "coinex_usdt_swap";
-
-lazy_static! {
-    // 给其他模块使用
-    pub static ref INDICATOR_MAP: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
-
-    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
-}
-
-pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-    let name = "coinex_usdt_swap_listener";
-    // 订阅所有币种
-    let login = BTreeMap::new();
-    let mut coinex_rest = CoinexSwapRest::new(login);
-    let mut symbols = vec![];
-    loop {
-        let response = coinex_rest.get_market_details("usdt".to_string()).await;
-        if response.code == 200 {
-            let data = response.data.as_array().unwrap();
-            for info in data {
-                let s = info["market"].as_str().unwrap();
-                if !s.ends_with("USDT") { continue; }
-                let symbol = s.to_string().replace("USDT", "_USDT");
-                symbols.push(symbol)
-            }
-        }
-        if symbols.len() > 0 { break; } else { tokio::time::sleep(Duration::from_secs(2)).await; }
-    }
-
-    for chunk in symbols.chunks(20) {
-        let ws_name = name.to_string();
-        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-        let write_tx_am = Arc::new(Mutex::new(write_tx));
-        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-
-        tokio::spawn(async move {
-            let mut ws = CoinexSwapWs::new_with_tag(ws_name, false, None, CoinexSwapWsType::PublicAndPrivate).clone();
-            ws.set_subscribe(vec![
-                CoinexSwapSubscribeType::PuFuturesDeals,
-                // GateSwapSubscribeType::PuFuturesOrderBook
-            ]);
-
-            // 建立链接
-            ws.set_symbols(symbols_chunk);
-            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-        });
-    }
-    // 每分钟计算msv
-    tokio::spawn(async move {
-        loop {
-            let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
-            for symbol in symbols.clone() {
-                let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
-                let trades = parse_json_to_trades(trades_value);
-                let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
-                let mut indicator_map = INDICATOR_MAP.lock().await;
-                indicator_map.insert(symbol, msv);
-            }
-            tokio::time::sleep(Duration::from_secs(60)).await;
-        }
-    });
-    // 定时删除数据
-    tokio::spawn(async move {
-        loop {
-            tokio::time::sleep(Duration::from_secs(1800)).await;
-            delete_db_by_exchange(EXCHANGE_NAME, "trades", 5 * 60).await;
-        }
-    });
-}
-
-// 读取数据
-pub async fn data_listener(response: ResponseData) {
-    if response.code != 200 {
-        return;
-    }
-
-    match response.channel.as_str() {
-        // 订单流数据
-        "futures.trades" => {
-            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::CoinexSwap, &response);
-
-            for trade in trades.iter_mut() {
-                // 更新为计价币的量
-                trade.size = trade.price * trade.size;
-                trade.size.rescale(2);
-
-                // 订单流数据更新
-                let trades_map_1 = TRADES_MAP.lock().await;
-                update_trade(trade, trades_map_1, EXCHANGE_NAME).await;
-            }
-        }
-        _ => {
-            info!("85 未知的数据类型: {:?}", response)
-        }
-    }
-}

+ 0 - 117
src/gate_coin_spot_data_listener.rs

@@ -1,117 +0,0 @@
-use std::collections::{BTreeMap, HashMap};
-use std::sync::{Arc};
-use std::sync::atomic::AtomicBool;
-use std::time::Duration;
-use chrono::Utc;
-use lazy_static::lazy_static;
-use tokio::sync::{Mutex};
-use tracing::info;
-use exchanges::gate_spot_rest::GateSpotRest;
-use exchanges::gate_spot_ws::{GateSpotSubscribeType, GateSpotWs, GateSpotWsType};
-use exchanges::response_base::ResponseData;
-use rust_decimal_macros::dec;
-use serde_json::json;
-use standard::exchange::ExchangeEnum;
-use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
-use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
-
-const EXCHANGE_NAME: &str = "gate_coin_spot";
-
-lazy_static! {
-    // 给其他模块使用
-    pub static ref INDICATOR_MAP: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
-
-    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
-}
-
-pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-    let name = "gate_coin_spot_listener";
-    // 订阅所有币种
-    let login = BTreeMap::new();
-    let mut gate_rest = GateSpotRest::new(false, login);
-    let params = json!({});
-    let response = gate_rest.get_market_details(params).await;
-    let mut symbols = vec![];
-    if response.code == 200 {
-        let symbol_infos = response.data.as_array().unwrap();
-        for symbol_info in symbol_infos {
-            let trade_status = symbol_info["trade_status"].as_str().unwrap();
-            // let quote = symbol_info["quote"].as_str().unwrap();
-            // if trade_status != "tradable" || quote == "USDT" { continue; };
-            if trade_status != "tradable" { continue; };
-            let symbol = symbol_info["id"].as_str().unwrap().to_string();
-            symbols.push(symbol)
-        }
-    }
-    for chunk in symbols.chunks(20) {
-        let ws_name = name.to_string();
-        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-        let write_tx_am = Arc::new(Mutex::new(write_tx));
-        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-
-        tokio::spawn(async move {
-            let mut ws = GateSpotWs::new_with_tag(ws_name, false, None, GateSpotWsType::PublicAndPrivate);
-            ws.set_subscribe(vec![
-                GateSpotSubscribeType::PuSpotTrades,
-                // GateSpotSubscribeType::PuSpotCandlesticks,
-                // GateSpotSubscribeType::PuFuturesOrderBook,
-            ]);
-
-            // 建立链接
-            ws.set_symbols(symbols_chunk);
-            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-        });
-    }
-    // 每分钟计算msv
-    tokio::spawn(async move {
-        loop {
-            let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
-            for symbol in symbols.clone() {
-                let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
-                let trades = parse_json_to_trades(trades_value);
-                let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
-                let mut indicator_map = INDICATOR_MAP.lock().await;
-                indicator_map.insert(symbol, msv);
-            }
-            tokio::time::sleep(Duration::from_secs(70)).await;
-        }
-    });
-    // 定时删除数据
-    tokio::spawn(async move {
-        loop {
-            tokio::time::sleep(Duration::from_secs(1800)).await;
-            delete_db_by_exchange(EXCHANGE_NAME, "trades", 5 * 60).await;
-        }
-    });
-}
-
-// 读取数据
-pub async fn data_listener(response: ResponseData) {
-    if response.code != 200 {
-        return;
-    }
-
-    match response.channel.as_str() {
-        // 订单流数据
-        "spot.trades" => {
-            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSpot, &response);
-            for trade in trades.iter_mut() {
-                // 真实交易量处理,因为gate的量都是张数
-                let mut real_size = trade.size * trade.price;
-                real_size.rescale(2);
-                trade.size = real_size;
-
-                // 更新到本地数据库
-                let trades_map = TRADES_MAP.lock().await;
-                update_trade(trade, trades_map, EXCHANGE_NAME).await;
-            }
-        }
-        _ => {
-            info!("48 未知的数据类型: {:?}", response)
-        }
-    }
-}

+ 0 - 120
src/gate_usdt_swap_data_listener.rs

@@ -1,120 +0,0 @@
-use std::collections::{BTreeMap, HashMap};
-use std::str::FromStr;
-use std::sync::{Arc};
-use std::sync::atomic::{AtomicBool};
-use std::time::Duration;
-use chrono::Utc;
-use lazy_static::lazy_static;
-use rust_decimal::Decimal;
-use tokio::sync::{Mutex};
-use tracing::info;
-use exchanges::gate_swap_rest::GateSwapRest;
-use exchanges::gate_swap_ws::{GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
-use exchanges::response_base::ResponseData;
-use rust_decimal_macros::dec;
-use standard::exchange::ExchangeEnum;
-use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
-use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
-
-const EXCHANGE_NAME: &str = "gate_usdt_swap";
-
-lazy_static! {
-    // 给其他模块使用
-    pub static ref INDICATOR_MAP: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
-
-    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
-    static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
-}
-
-pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-    let name = "gate_usdt_swap_listener";
-    // 订阅所有币种
-    let login = BTreeMap::new();
-    let mut gate_rest = GateSwapRest::new(false, login);
-    let response = gate_rest.get_market_details("usdt".to_string()).await;
-    let mut symbols = vec![];
-    if response.code == 200 {
-        let symbol_infos = response.data.as_array().unwrap();
-        let mut mul_map = MUL_MAP.lock().await;
-        for symbol_info in symbol_infos {
-            // quanto_multiplier是ct_val
-            let symbol = symbol_info["name"].as_str().unwrap().to_string();
-            let mul = Decimal::from_str(symbol_info["quanto_multiplier"].as_str().unwrap().to_string().as_str()).unwrap();
-            mul_map.insert(symbol.clone(), mul);
-
-            symbols.push(symbol)
-        }
-    }
-
-    for chunk in symbols.chunks(20) {
-        let ws_name = name.to_string();
-        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-        let write_tx_am = Arc::new(Mutex::new(write_tx));
-        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-
-        tokio::spawn(async move {
-            let mut ws = GateSwapWs::new_with_tag(ws_name, false, None, GateSwapWsType::PublicAndPrivate("usdt".to_string()));
-            ws.set_subscribe(vec![
-                GateSwapSubscribeType::PuFuturesTrades,
-            ]);
-
-            // 建立链接
-            ws.set_symbols(symbols_chunk);
-            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-        });
-    }
-    // 每分钟计算msv
-    tokio::spawn(async move {
-        loop {
-            let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
-            for symbol in symbols.clone() {
-                let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
-                let trades = parse_json_to_trades(trades_value);
-                let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
-                let mut indicator_map = INDICATOR_MAP.lock().await;
-                indicator_map.insert(symbol, msv);
-            }
-            tokio::time::sleep(Duration::from_secs(55)).await;
-        }
-    });
-    // 定时删除数据
-    tokio::spawn(async move {
-        loop {
-            tokio::time::sleep(Duration::from_secs(1800)).await;
-            delete_db_by_exchange(EXCHANGE_NAME, "trades", 5 * 60).await;
-        }
-    });
-}
-
-// 读取数据
-pub async fn data_listener(response: ResponseData) {
-    if response.code != 200 {
-        return;
-    }
-    match response.channel.as_str() {
-        // 订单流数据
-        "futures.trades" => {
-            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSwap, &response);
-            let mul_map = MUL_MAP.lock().await;
-
-            for trade in trades.iter_mut() {
-                // 真实交易量处理,因为gate的量都是张数
-                let mul = mul_map[trade.symbol.as_str()];
-                let mut real_size = trade.size * mul * trade.price;
-                real_size.rescale(2);
-                trade.size = real_size;
-
-                // 更新到本地数据库
-                let trades_map = TRADES_MAP.lock().await;
-                update_trade(trade, trades_map, EXCHANGE_NAME).await;
-            }
-        }
-        _ => {
-            info!("48 未知的数据类型: {:?}", response)
-        }
-    }
-}

+ 20 - 24
src/listener_tools.rs

@@ -1,36 +1,32 @@
 use std::collections::HashMap;
-use std::str::FromStr;
+use rust_decimal::Decimal;
 use rust_decimal::prelude::ToPrimitive;
 use tokio::sync::MutexGuard;
-use standard::{SpecialTrade, Trade};
+use standard::{Record};
 use crate::json_db_utils::{generate_file_path, minute_to_date, write_to_file};
 
-pub type TradeMap = HashMap<String, Vec<SpecialTrade>>;
+pub type RecordMap = HashMap<String, Record>;
 
-// 更新订单流数据
-pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, TradeMap>, exchange: &str) {
-    if let Some(trades) = trades_map.get_mut(new_trade.symbol.as_str()) {
-        if let Some(last_trade) = trades.last() {
-            // 这里的last_trade.0是以元组形式进行访问。
-            let last_trade_minutes = i64::from_str(last_trade.inner()[1].as_str()).unwrap() / 60000;    // 将毫秒转换成分钟数
-            let new_trade_minutes = new_trade.time.to_i64().unwrap() / 60000;                           // 同上
-
-            // 如果分钟数不同,则清空列表并添加新的trade
-            if last_trade_minutes != new_trade_minutes {
-                let depths_json = serde_json::to_string(trades).unwrap();
-                let date_str = minute_to_date(last_trade_minutes);
-                let path = generate_file_path(exchange, date_str.as_str(), new_trade.symbol.as_str(), "trades", last_trade_minutes);
+// 更新k线
+pub async fn update_record(new_record: &Record, mut records_map: MutexGuard<'_, RecordMap>, exchange: &str) {
+    if new_record.time.eq(&Decimal::ZERO) {
+        return;
+    }
 
-                // info!(?path);
+    // 如果k线记录存在于map,则进行一系列操作,用于保存map
+    if let Some(record) = records_map.get_mut(new_record.symbol.as_str()) {
+        let last_trade_minutes = record.time.to_i64().unwrap() / 60000;             // 将毫秒转换成分钟数
+        let new_trade_minutes = new_record.time.to_i64().unwrap() / 60000;          // 同上
 
-                write_to_file(depths_json, path).await;
+        // 如果分钟数不同,则清空列表并添加新的depth
+        if last_trade_minutes != new_trade_minutes {
+            let record_json = serde_json::to_string(record).unwrap();
+            let date_str = minute_to_date(last_trade_minutes);
+            let path = generate_file_path(exchange, date_str.as_str(), new_record.symbol.as_str(), "record", last_trade_minutes);
 
-                trades.clear();
-            }
+            write_to_file(record_json, path).await;
         }
-        trades.push(SpecialTrade::new(&new_trade));
-    } else {
-        // 如果该symbol不存在,则创建新的Vec并添加trade
-        trades_map.insert(new_trade.symbol.clone(), vec![SpecialTrade::new(&new_trade)]);
     }
+
+    records_map.insert(new_record.symbol.clone(), new_record.clone());
 }

+ 5 - 17
src/main.rs

@@ -2,16 +2,9 @@ mod json_db_utils;
 mod control_c;
 mod server;
 mod listener_tools;
-mod binance_usdt_swap_data_listener;
-mod gate_usdt_swap_data_listener;
-mod coinex_usdt_swap_data_listener;
-mod phemex_usdt_swap_data_listener;
-mod mexc_usdt_swap_data_listener;
 mod bybit_usdt_swap_data_listener;
-mod bitget_usdt_swap_data_listener;
-mod gate_coin_spot_data_listener;
-mod msv;
 mod rank;
+mod trend14400;
 
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -28,22 +21,17 @@ fn log_level_init(log_str: String, port: u32, app_name: String) -> WorkerGuard {
 #[tokio::main(flavor = "multi_thread")]
 async fn main() {
     // 日志级别配置
-    let _ = log_level_init("info".to_string(), 28888, "micro_rank".to_string());
+    let _guard = log_level_init("info".to_string(), 14400, "14400trend".to_string());
     // 掌控全局的关闭
     let running = Arc::new(AtomicBool::new(true));
     // 初始化数据服务器
-    server::run_server(28888, running.clone());
+    server::run_server(14400, running.clone());
     // ctrl c退出检查程序
     control_c::exit_handler(running.clone());
+
     // 启动各交易所的数据监听器
-    binance_usdt_swap_data_listener::run_listener(running.clone()).await;
-    gate_usdt_swap_data_listener::run_listener(running.clone()).await;
-    coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
-    phemex_usdt_swap_data_listener::run_listener(running.clone()).await;
-    mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
     bybit_usdt_swap_data_listener::run_listener(running.clone()).await;
-    bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
-    gate_coin_spot_data_listener::run_listener(running.clone()).await;
+
     // panic错误捕获,panic级别的错误直接退出
     // let panic_running = running.clone();
     std::panic::set_hook(Box::new(move |panic_info| {

+ 0 - 123
src/mexc_usdt_swap_data_listener.rs

@@ -1,123 +0,0 @@
-use std::collections::{BTreeMap, HashMap};
-use std::sync::{Arc};
-use std::sync::atomic::AtomicBool;
-use std::time::Duration;
-use chrono::Utc;
-use lazy_static::lazy_static;
-use rust_decimal::Decimal;
-use rust_decimal_macros::dec;
-use tokio::sync::{Mutex};
-use tracing::info;
-use exchanges::mexc_swap_rest::MexcSwapRest;
-use exchanges::mexc_swap_ws::{MexcSwapSubscribeType, MexcSwapWs, MexcSwapWsType};
-use exchanges::response_base::ResponseData;
-use rust_decimal::prelude::FromPrimitive;
-use serde_json::json;
-use standard::exchange::ExchangeEnum;
-use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
-use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
-
-const EXCHANGE_NAME: &str = "mexc_usdt_swap";
-
-lazy_static! {
-    // 给其他模块使用
-    pub static ref INDICATOR_MAP: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
-
-    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
-    static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
-}
-
-pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-    let name = "mexc_usdt_swap_listener";
-    // 订阅所有币种
-    let login = BTreeMap::new();
-    let mut mexc_rest = MexcSwapRest::new(false, login);
-    let params = json!({});
-    let response = mexc_rest.get_market(params).await;
-    let mut symbols = vec![];
-    if response.code == 200 {
-        let symbol_infos = response.data.as_array().unwrap();
-        let mut mul_map = MUL_MAP.lock().await;
-        for symbol_info in symbol_infos {
-            // quanto_multiplier是ct_val
-            let symbol = symbol_info["symbol"].as_str().unwrap().to_string();
-            let mul = Decimal::from_f64(symbol_info["contractSize"].as_f64().unwrap()).unwrap();
-            mul_map.insert(symbol.clone(), mul);
-
-            symbols.push(symbol)
-        }
-    }
-
-    for chunk in symbols.chunks(20) {
-        let ws_name = name.to_string();
-        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-        let write_tx_am = Arc::new(Mutex::new(write_tx));
-        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-
-        tokio::spawn(async move {
-            let mut ws = MexcSwapWs::new_with_tag(ws_name, false, None, MexcSwapWsType::PublicAndPrivate);
-            ws.set_subscribe(vec![
-                MexcSwapSubscribeType::PuFuturesTrades,
-            ]);
-
-            // 建立链接
-            ws.set_symbols(symbols_chunk);
-            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-        });
-    }
-    // 每分钟计算msv
-    tokio::spawn(async move {
-        loop {
-            let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
-            for symbol in symbols.clone() {
-                let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
-                let trades = parse_json_to_trades(trades_value);
-                let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
-                let mut indicator_map = INDICATOR_MAP.lock().await;
-                indicator_map.insert(symbol, msv);
-            }
-            tokio::time::sleep(Duration::from_secs(65)).await;
-        }
-    });
-    // 定时删除数据
-    tokio::spawn(async move {
-        loop {
-            tokio::time::sleep(Duration::from_secs(1800)).await;
-            delete_db_by_exchange(EXCHANGE_NAME, "trades", 5 * 60).await;
-        }
-    });
-}
-
-// 读取数据
-pub async fn data_listener(response: ResponseData) {
-    if response.code != 200 {
-        return;
-    }
-
-    match response.channel.as_str() {
-        // 订单流数据
-        "futures.trades" => {
-            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::MexcSwap, &response);
-            let mul_map = MUL_MAP.lock().await;
-
-            for trade in trades.iter_mut() {
-                // 真实交易量处理,因为mexc的量都是张数
-                let mul = mul_map[trade.symbol.as_str()];
-                let mut real_size = trade.size * mul * trade.price;
-                real_size.rescale(2);
-                trade.size = real_size;
-
-                // 更新到本地数据库
-                let trades_map = TRADES_MAP.lock().await;
-                update_trade(trade, trades_map, EXCHANGE_NAME).await;
-            }
-        }
-        _ => {
-            info!("48 未知的数据类型: {:?}", response)
-        }
-    }
-}

+ 0 - 376
src/msv.rs

@@ -1,376 +0,0 @@
-use std::cmp::{max, min};
-use std::str::FromStr;
-use rust_decimal::{Decimal, MathematicalOps};
-use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
-use rust_decimal_macros::dec;
-use serde::{Deserialize, Serialize};
-use serde_json::{Value};
-use standard::{SimpleDepth, Trade};
-
-
-/// 技术指标结构体
-/// - `msv(Vec<Vec<Decimal>>)`: msv
-/// - `liqs(Vec<Vec<Decimal>>)`: liqs
-/// - `eprs(Vec<Vec<Decimal>>)`: eprs
-/// - `sigmas(Vec<Vec<Decimal>>)`: sigmas
-/// - `sigma_mas(Vec<Vec<Decimal>>)`: sigma_mas
-/// - `total_size(i64)`: total_size
-/// - `result_size(i64)`: result_size
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-pub struct Indicators {
-    pub msv: Vec<Vec<Decimal>>,
-    pub liqs: Vec<Vec<Decimal>>,
-    pub eprs: Vec<Vec<Decimal>>,
-    pub sigmas: Vec<Vec<Decimal>>,
-    pub sigma_mas: Vec<Vec<Decimal>>,
-    pub total_size: i64,
-    pub result_size: i64,
-}
-
-// 将trades转换为具体指标 trades 50 [] stime etime
-pub fn generate_msv_by_trades(mut trades: Vec<Trade>, mills_back: Decimal, simple_depths: Vec<SimpleDepth>, start_time: i64, end_time: i64) -> Indicators {
-    // 具体波动
-    let mut msv_data: Vec<Vec<Decimal>> = vec![];
-    // 预期利润幅度(except_profit_rate)
-    let mut epr_data: Vec<Vec<Decimal>> = vec![];
-    // 波动率sigma
-    let mut sigma_data: Vec<Vec<Decimal>> = vec![];
-
-    const GAMMA: Decimal = dec!(0.5);
-
-    // ================== 计算每个点的具体波动率以及回溯幅度 ===================
-    trades.sort_by(|a, b| Decimal::from_str(a.id.as_str()).unwrap().cmp(&Decimal::from_str(b.id.as_str()).unwrap()));
-    for (index, trade) in trades.iter().enumerate() {
-        if index == 0 {
-            continue;
-        }
-
-        // 该元素向前遍历range毫秒
-        let mut range_index = index;
-        // 该区间的预定价格
-        let mut ref_price = trade.price;
-        let mut dissociation = Decimal::ZERO;
-        loop {
-            // 下标合法性判断
-            if range_index == 0 {
-                break;
-            }
-
-            let flag_trade = trades.get(range_index).unwrap();
-            let range_time = trade.time - flag_trade.time;
-            // 判断该ticker是否是range ms以外
-            if range_time > mills_back {
-                break;
-            }
-
-            ref_price = ref_price * GAMMA + flag_trade.price * (Decimal::ONE - GAMMA);
-            dissociation = dissociation + flag_trade.size.abs();
-
-            range_index -= 1;
-        }
-
-        // 获取到range毫秒以后的预定价格,计算回去的幅度
-        let mut future_ref_price_sum = Decimal::ZERO;
-        let mut future_ref_count = Decimal::ZERO;
-        let mut future_range_index = index + 1;
-        loop {
-            // 下标合法性判断
-            if future_range_index >= trades.len() {
-                break;
-            }
-
-            let flag_trade = trades.get(future_range_index).unwrap();
-            let range_time = flag_trade.time - trade.time;
-
-            // 判断该ticker是否是range ms以外
-            if range_time > mills_back && future_ref_count > Decimal::ZERO {
-                break;
-            }
-
-            future_range_index += 1;
-            future_ref_price_sum += flag_trade.price;
-            future_ref_count += Decimal::ONE;
-        }
-        let future_ref_price = if future_ref_count < Decimal::ONE {
-            trade.price
-        } else {
-            future_ref_price_sum / future_ref_count
-        };
-
-        // 计算过去至多100条数据的sigma值 sigma^2 = (1 / (tn-t0))*sum((S(tk) - S(tk-1)) ^ 2)
-        let mut sigma_index = index - 1;
-        let t_last = trade.time;
-
-        let mut _t_first = trade.time;
-        // 右值
-        let mut total_right = Decimal::ZERO;
-        loop {
-            let flag_trade = trades.get(sigma_index).unwrap();
-            let next_trade = trades.get(sigma_index + 1).unwrap();
-
-            // 下标合法性判断
-            if sigma_index == 0 || sigma_index + 100 <= index {
-                _t_first = flag_trade.time;
-                break;
-            }
-
-            // 计算差值
-            let diff = Decimal::ONE - flag_trade.price / next_trade.price;
-            total_right += diff * diff;
-
-            sigma_index = sigma_index - 1;
-        }
-        let sigma_square = if _t_first == t_last {
-            let time_diff = Decimal::ONE;
-            (Decimal::ONE / time_diff) * total_right
-        } else {
-            let time_diff = (t_last - _t_first) / Decimal::ONE_THOUSAND;
-            (Decimal::ONE / time_diff) * total_right
-        };
-        let mut sigma = sigma_square.sqrt().unwrap();
-        sigma.rescale(6);
-        // 计算过去至多100个sigma值的平均值
-        let sigma_ma = if sigma_data.len() > 0 {
-            let mut sigma_ma_index = sigma_data.len();
-            let mut sigma_total = Decimal::ZERO;
-            let mut sigma_count = Decimal::ZERO;
-            loop {
-                if sigma_ma_index == 0 || sigma_ma_index + 99 < sigma_data.len() {
-                    break;
-                }
-                // 步进
-                sigma_ma_index -= 1;
-                // 计算
-                sigma_total += sigma_data[sigma_ma_index][1];
-                sigma_count += Decimal::ONE;
-            }
-            let mut sigma_ma = sigma_total / sigma_count;
-            sigma_ma.rescale(6);
-
-            sigma_ma
-        } else {
-            sigma
-        };
-
-        // ==================== 波动逻辑计算 ====================
-        let last_price = trade.price;
-        let mut rate = Decimal::ONE_HUNDRED * (last_price - ref_price) / ref_price;
-        rate.rescale(2);
-        // 去除小数位之后,可以忽略一些太小的波动,减少图表生成压力
-        if rate.eq(&Decimal::ZERO) {
-            continue;
-        }
-
-        // ==================== 预期利润逻辑计算 ====================
-        // 首先计算未来一段时间的价格与现在的距离
-        let mut future_rate = Decimal::ONE_HUNDRED * (future_ref_price - last_price) / last_price;
-        future_rate.rescale(2);
-        // 根据具体向上波动还是向下波动来计算预期最大利润
-        let mut epr = if rate > Decimal::ZERO {
-            -future_rate
-        } else {
-            future_rate
-        };
-        epr = min(epr, rate.abs());
-
-        // 去重,以及保留最大的波动率
-        if msv_data.len() > 0 {
-            let last = msv_data[msv_data.len() - 1].clone();
-            let last_time = last[0];
-            let last_rate = last[1];
-
-            // 如果时间相同,则可能会进行remove等操作
-            if last_time == trade.time {
-                // 如果最新的波动率大于最后波动率
-                if rate.abs() > last_rate.abs() {
-                    msv_data.remove(msv_data.len() - 1);
-                    msv_data.push(vec![trade.time, rate, dissociation]);
-
-                    epr_data.remove(epr_data.len() - 1);
-                    epr_data.push(vec![trade.time, epr]);
-
-                    sigma_data.remove(sigma_data.len() - 1);
-                    sigma_data.push(vec![trade.time, sigma, sigma_ma]);
-                }
-            } else {
-                msv_data.push(vec![trade.time, rate, dissociation]);
-                epr_data.push(vec![trade.time, epr]);
-                sigma_data.push(vec![trade.time, sigma, sigma_ma]);
-            }
-        } else {
-            msv_data.push(vec![trade.time, rate, dissociation]);
-            epr_data.push(vec![trade.time, epr]);
-            sigma_data.push(vec![trade.time, sigma, sigma_ma]);
-        }
-    }
-
-    // 按时间序列填充数据
-    let mut msv_index = 0;
-    let mut final_msv_data: Vec<Vec<Decimal>> = vec![];
-    let mut final_epr_data: Vec<Vec<Decimal>> = vec![];
-    let mut final_sigma_data: Vec<Vec<Decimal>> = vec![];
-    let mut final_sigma_ma_data: Vec<Vec<Decimal>> = vec![];
-
-    let mut depth_index = 0;
-    let mut final_volume_data: Vec<Vec<Decimal>> = vec![];
-
-    let mut index_timestamp = Decimal::from_i64(start_time).unwrap();
-    let last_timestamp = Decimal::from_i64(end_time).unwrap();
-    let step_timestamp = dec!(1000);
-    loop {
-        let mut max_msv_data = Decimal::ZERO;
-        let mut max_msv_qty_data = Decimal::ZERO;
-        let mut max_epr_data = Decimal::ZERO;
-        let mut max_sigma_data = Decimal::ZERO;
-        let mut max_sigma_ma_data = Decimal::ZERO;
-
-        // ====================================== 数据生产 ===============================================
-        // 获取时间范围内的波动率数据
-        loop {
-            // 下标合法性判断
-            if msv_index >= msv_data.len() {
-                break;
-            }
-
-            // msv_data的指定下标数据不在时间范围内(时间范围:指的是[index_timestamp-mills_back, index_timestamp]这个范围)
-            if index_timestamp < msv_data[msv_index][0] {
-                break;
-            }
-
-            // -------------- 大小判断,取值
-            let msv_d = msv_data[msv_index][1];
-            let msv_qty_data = msv_data[msv_index][2];
-            let epr_d = epr_data[msv_index][1];
-            let sigma_d = sigma_data[msv_index][1];
-            let sigma_ma_d = sigma_data[msv_index][2];
-            // msv波动数据
-            if max_msv_data.abs() < msv_d.abs() {
-                max_msv_data = msv_d;
-                max_msv_qty_data = msv_qty_data;
-                max_epr_data = epr_d;
-                max_sigma_data = sigma_d;
-                max_sigma_ma_data = sigma_ma_d;
-            }
-            // // 波动率sigma
-            // if max_sigma_data.abs() < sigma_d {
-            //     max_sigma_data = sigma_d;
-            // }
-
-            // 下标步近
-            msv_index = msv_index + 1;
-        }
-
-        // 获取时间范围内的深度数据、买一及卖一价数据
-        let mut max_size = Decimal::ZERO;
-        let mut min_size = Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND;
-        loop {
-            // 下标合法性判断
-            if depth_index >= simple_depths.len() {
-                break;
-            }
-            let depth = &simple_depths[depth_index];
-            // 时间范围合法性判断,只统计那一秒以内的深度总交易量
-            if index_timestamp < depth.time {
-                break;
-            }
-            // 这一秒的深度最大值、最小值
-            max_size = max(max_size, depth.size);
-            min_size = min(min_size, depth.size);
-            // 下标步近
-            depth_index += 1;
-        }
-
-        // ====================================== 智能填充数据 ===============================================
-        // 流动性数据叠加
-        // let rst_size = if (max_size == Decimal::ZERO || min_size == Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND) && final_depth_data.len() > 0 {
-        //     final_depth_data.last().unwrap()[1]
-        // } else {
-        //     if (max_size == Decimal::ZERO || min_size == Decimal::ONE_THOUSAND * Decimal::ONE_THOUSAND) && simple_depths.len() > 0 {
-        //         simple_depths[0].size
-        //     } else {
-        //         if simple_depths.len() > 0 {
-        //             (max_size + min_size) / Decimal::TWO
-        //         } else {
-        //             Decimal::ZERO
-        //         }
-        //     }
-        // };
-        // final_depth_data.push(vec![index_timestamp, rst_size]);
-        //
-        // // 建议开仓距离
-        // let mut rst_spread = if rst_size == Decimal::ZERO {
-        //     Decimal::ZERO
-        // } else {
-        //     dec!(10000) / rst_size
-        // };
-        // rst_spread.rescale(6);
-        // final_spread_data.push(vec![index_timestamp, rst_spread]);
-
-        // 波动率数据处理
-        // 如果这两个值为0,则代表这mills_back毫秒以内是没有数据的,填充0数据,使得x轴是完整的
-        if max_msv_data == Decimal::ZERO {
-            final_msv_data.push(vec![index_timestamp, Decimal::ZERO, Decimal::ZERO]);
-            final_epr_data.push(vec![index_timestamp, Decimal::ZERO]);
-            final_volume_data.push(vec![index_timestamp, Decimal::ZERO]);
-
-            if final_sigma_data.len() > 0 {
-                final_sigma_data.push(vec![index_timestamp, final_sigma_data.last().unwrap()[1]]);
-                final_sigma_ma_data.push(vec![index_timestamp, final_sigma_ma_data.last().unwrap()[1]]);
-            } else {
-                final_sigma_data.push(vec![index_timestamp, Decimal::ZERO]);
-                final_sigma_ma_data.push(vec![index_timestamp, Decimal::ZERO]);
-            }
-
-            // 说明在这个时间范围内是有数据存在的,将各类副图放置完全
-        } else {
-            final_msv_data.push(vec![index_timestamp, max_msv_data, max_msv_qty_data]);
-            final_epr_data.push(vec![index_timestamp, max_epr_data]);
-
-            let mut final_qty = max_msv_qty_data / Decimal::ONE_THOUSAND;
-            final_qty.rescale(2);
-            final_volume_data.push(vec![index_timestamp, final_qty]);
-
-            final_sigma_data.push(vec![index_timestamp, max_sigma_data]);
-            final_sigma_ma_data.push(vec![index_timestamp, max_sigma_ma_data]);
-        }
-
-        // ====================================== 时间步进处理 ======================================
-        // 对时间进行步近
-        index_timestamp = index_timestamp + step_timestamp;
-        // 时间越界
-        if index_timestamp > last_timestamp {
-            break;
-        }
-    }
-
-    // 结果统计
-    let total_size = trades.len().to_i64().unwrap();
-    let result_size = final_msv_data.len().to_i64().unwrap();
-    Indicators {
-        msv: final_msv_data,
-        liqs: final_volume_data,
-        eprs: final_epr_data,
-        sigmas: final_sigma_data,
-        sigma_mas: final_sigma_ma_data,
-        total_size,
-        result_size,
-    }
-}
-
-// 将json转换为trades
-pub fn parse_json_to_trades(trades_json: Value) -> Vec<Trade> {
-    let mut rst = vec![];
-
-    for trade_json in trades_json.as_array().unwrap() {
-        let arr = trade_json.as_array().unwrap();
-        rst.push(Trade {
-            id: arr[0].as_str().unwrap().to_string(),
-            time: Decimal::from_str(arr[1].as_str().unwrap()).unwrap(),
-            size: Decimal::from_str(arr[2].as_str().unwrap()).unwrap(),
-            price: Decimal::from_str(arr[3].as_str().unwrap()).unwrap(),
-            symbol: "".to_string(),
-        });
-    }
-
-    rst
-}

+ 0 - 127
src/phemex_usdt_swap_data_listener.rs

@@ -1,127 +0,0 @@
-use std::collections::{BTreeMap, HashMap};
-use std::sync::{Arc};
-use std::sync::atomic::AtomicBool;
-use std::time::Duration;
-use chrono::Utc;
-use lazy_static::lazy_static;
-use rust_decimal::Decimal;
-use rust_decimal_macros::dec;
-use tokio::sync::{Mutex};
-use exchanges::phemex_swap_rest::PhemexSwapRest;
-use exchanges::phemex_swap_ws::{PhemexSwapSubscribeType, PhemexSwapWs, PhemexSwapWsType};
-use exchanges::response_base::ResponseData;
-use serde_json::{json};
-use standard::exchange::ExchangeEnum;
-use standard::exchange_struct_handler::ExchangeStructHandler;
-use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
-use crate::listener_tools::{TradeMap, update_trade};
-use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
-
-const EXCHANGE_NAME: &str = "phemex_usdt_swap";
-
-lazy_static! {
-    // 给其他模块使用
-    pub static ref INDICATOR_MAP: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
-
-    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
-    static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
-}
-
-pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-    let name = "phemex_usdt_swap_listener";
-    // 订阅所有币种
-    let login = BTreeMap::new();
-    let mut phemex_rest = PhemexSwapRest::new(false, login);
-    let params = json!({});
-    let response = phemex_rest.get_market(params).await;
-    let mut loc_symbols = vec![];
-    let mut symbols = vec![];
-    if response.code == 200 {
-        let symbol_infos = response.data["perpProductsV2"].as_array().unwrap();
-        let mut mul_map = MUL_MAP.lock().await;
-        for symbol_info in symbol_infos {
-            if symbol_info["quoteCurrency"] != "USDT" || symbol_info["status"] == "Delisted" { continue; }
-            // quanto_multiplier是ct_val
-            let symbol = symbol_info["symbol"].as_str().unwrap().to_string();
-            let base_currency: String = symbol_info["baseCurrency"].as_str().unwrap().to_string().split_whitespace().collect();
-            let loc_symbol = format!("{}_{}", base_currency, symbol_info["quoteCurrency"].as_str().unwrap());
-            let mul = Decimal::ONE;
-            mul_map.insert(loc_symbol.clone(), mul);
-
-            loc_symbols.push(loc_symbol);
-            symbols.push(symbol)
-        }
-    }
-    for chunk in symbols.chunks(20) {
-        let ws_name = name.to_string();
-        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-        let write_tx_am = Arc::new(Mutex::new(write_tx));
-        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-
-        tokio::spawn(async move {
-            let mut ws = PhemexSwapWs::new_with_tag(ws_name, false, None, PhemexSwapWsType::PublicAndPrivate).clone();
-            ws.set_subscribe(vec![
-                PhemexSwapSubscribeType::PuFuturesTrades,
-                // PhemexSwapSubscribeType::PuFuturesRecords,
-                // PhemexSwapSubscribeType::PuFuturesOrderBook
-            ]);
-
-            // 建立链接
-            ws.set_symbols(symbols_chunk);
-            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-        });
-    }
-    // 每分钟计算msv
-    tokio::spawn(async move {
-        loop {
-            let end_timestamp = Utc::now().timestamp_millis();
-            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
-            for symbol in loc_symbols.clone() {
-                let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
-                let trades = parse_json_to_trades(trades_value);
-                let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
-                let mut indicator_map = INDICATOR_MAP.lock().await;
-                indicator_map.insert(symbol, msv);
-            }
-            tokio::time::sleep(Duration::from_secs(60)).await;
-        }
-    });
-    // 定时删除数据
-    tokio::spawn(async move {
-        loop {
-            delete_db_by_exchange(EXCHANGE_NAME, "trades", 5 * 60).await;
-            tokio::time::sleep(Duration::from_secs(3600)).await;
-        }
-    });
-}
-
-// 读取数据
-pub async fn data_listener(response: ResponseData) {
-    if response.code != 200 {
-        return;
-    }
-
-    match response.channel.as_str() {
-        // 订单流数据
-        "futures.trades" => {
-            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::PhemexSwap, &response);
-            let mul_map = MUL_MAP.lock().await;
-
-            for trade in trades.iter_mut() {
-                // 真实交易量处理,因为phemex的量都是张数
-                let mul = mul_map[trade.symbol.as_str()];
-                let mut real_size = trade.size * mul * trade.price;
-                real_size.rescale(2);
-                trade.size = real_size;
-
-                // 更新到本地数据库
-                let trades_map = TRADES_MAP.lock().await;
-                update_trade(trade, trades_map, EXCHANGE_NAME).await;
-            }
-        }
-        _ => {
-            // info!("48 未知的数据类型: {:?}", response)
-        }
-    }
-}

+ 4 - 131
src/rank.rs

@@ -1,12 +1,9 @@
-use std::cmp::{min};
 use std::collections::HashMap;
-use rust_decimal::{Decimal, MathematicalOps};
-use rust_decimal_macros::dec;
+use rust_decimal::{Decimal};
 use serde::{Deserialize, Serialize};
 use serde_json::{Value};
 use tokio::sync::MutexGuard;
-use crate::json_db_utils::get_symbols_by_exchange;
-use crate::msv::Indicators;
+use crate::trend14400::Indicators;
 
 #[derive(Serialize, Deserialize, Clone)]
 pub struct Rank {
@@ -24,132 +21,8 @@ pub struct Rank {
     liquidity_avg: Decimal,
     is_binance: bool,
 }
-pub
-
-const ONE_QUARTER: Decimal = dec!(0.25);
-const ONE_PERCENT: Decimal = dec!(0.01);
-const TWENTY: Decimal = dec!(20);
-// const SIXTY: Decimal = dec!(60);
-const TWO_HUNDRED: Decimal = dec!(200);
-// const TEN_THOUSAND: Decimal = dec!(10000);
 
 // 根据最终的msv计算排行榜
-pub fn generate_rank_by_indicator_map(indicator_map: &MutexGuard<HashMap<String, Indicators>>) -> Value {
-    let mut rank_list: Vec<Rank> = vec![];
-    let binance_symbols = get_symbols_by_exchange("binance_usdt_swap").as_array().unwrap_or(&vec![]).clone();
-    for (key, indicators) in indicator_map.iter() {
-        let symbol = key.clone();
-        let is_binance = binance_symbols.iter().find(|item| item.as_str().unwrap().to_string() == symbol).is_some();
-        // ============== msv相关数据的计算 =================
-        let mut msv_abs_total = Decimal::ZERO;
-        let mut msv_abs_max = Decimal::ZERO;
-        let mut msv_count = Decimal::ZERO;
-        let mut effective_epr_count = Decimal::ZERO;
-        let mut epr_total = Decimal::ZERO;
-        let mut epr_max = Decimal::ZERO;
-
-        // let start_time = indicators.msv[0][0];
-        // let end_time = indicators.msv[indicators.msv.len() - 1][0];
-        // let timing_difference = end_time - start_time;
-        // 0.175 0.225 0.275 0.325
-
-        for (index, value) in indicators.msv.iter().enumerate() {
-            let msv_abs_value = value[1].abs();
-
-            if msv_abs_value <= Decimal::ZERO {
-                continue;
-            }
-            // let a_scale = ((value[0] - start_time) / timing_difference).round_dp(2) + dec!(0.25);
-
-            // let scale = if a_scale > Decimal::ONE { Decimal::ONE } else { a_scale };
-            msv_count += Decimal::ONE;
-            // msv_abs_total += msv_abs_value * scale;
-            msv_abs_total += msv_abs_value;
-
-            if msv_abs_value > msv_abs_max {
-                msv_abs_max = msv_abs_value
-            }
-
-            let epr = &indicators.eprs[index];
-            if epr[1] > msv_abs_value * ONE_QUARTER || epr[1].abs() > ONE_PERCENT {
-                effective_epr_count += Decimal::ONE;
-                // epr_total += epr[1] * scale;
-                epr_total += epr[1];
-
-                if value[1] > epr_max {
-                    epr_max = value[1]
-                }
-            }
-        }
-        // 计算波动率平均值
-        let mut msv_abs_avg = if msv_count == Decimal::ZERO {
-            Decimal::ZERO
-        } else {
-            msv_abs_total / msv_count
-        };
-        msv_abs_avg.rescale(6);
-        // 开仓基准值, 除以200是先除以2,再除以100(因为系统开仓参数是0.01代表1%)
-        let mut coverted_open_base = (msv_abs_max + msv_abs_avg) / TWO_HUNDRED;
-        coverted_open_base.rescale(8);
-
-        // ============== liq相关数据的计算 =================
-        let mut liquidity_total = Decimal::ZERO;
-        for value in indicators.liqs.iter() {
-            liquidity_total += value[1] * Decimal::ONE_THOUSAND;
-        }
-        let mut liquidity_avg = if msv_count == Decimal::ZERO {
-            Decimal::ZERO
-        } else {
-            liquidity_total / msv_count
-        };
-        liquidity_avg.rescale(0);
-
-        // ======================== 最终计算 =========================
-        // msv_score计算规则
-        let mut msv_score = if msv_abs_total > Decimal::ZERO && epr_total > Decimal::ZERO {
-            epr_total * (epr_total / msv_abs_total)
-        } else {
-            Decimal::ZERO
-        };
-        msv_score.rescale(2);
-
-        // liquidity_score
-        let mut liquidity_score = min((liquidity_avg / Decimal::ONE_THOUSAND).sqrt().unwrap(), TWENTY);
-        liquidity_score.rescale(2);
-
-        // frequency_score计算规则
-        let mut frequency_score = ((effective_epr_count / Decimal::ONE_THOUSAND) * TWENTY).sqrt().unwrap();
-        frequency_score.rescale(2);
-
-        let temp = msv_score * liquidity_score * frequency_score;
-        let mut score = if temp > Decimal::ONE {
-            Decimal::TEN * temp.log10()
-        } else {
-            Decimal::ZERO
-        };
-        score.rescale(2);
-
-        let rank = Rank {
-            symbol,
-            score,
-            msv_score,
-            liquidity_score,
-            frequency_score,
-            msv_abs_total,
-            msv_abs_max,
-            msv_abs_avg,
-            coverted_open_base,
-            epr_total,
-            effective_count: msv_count,
-            liquidity_avg,
-            is_binance,
-        };
-
-        rank_list.push(rank);
-    }
-
-    // 按 score 倒序排序
-    rank_list.sort_by(|a, b| b.score.cmp(&a.score));
-
-    return serde_json::to_value(&rank_list).unwrap();
+pub fn generate_rank_by_indicator_map(_indicator_map: &MutexGuard<HashMap<String, Indicators>>) -> Value {
+    Value::Null
 }

+ 32 - 32
src/server.rs

@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
 use tracing::{info};
 use actix_cors::Cors;
-use crate::{binance_usdt_swap_data_listener, bitget_usdt_swap_data_listener, bybit_usdt_swap_data_listener, coinex_usdt_swap_data_listener, gate_coin_spot_data_listener, gate_usdt_swap_data_listener, mexc_usdt_swap_data_listener, phemex_usdt_swap_data_listener, rank};
+use crate::{bybit_usdt_swap_data_listener, rank};
 
 // 定义用于反序列化查询参数的结构体
 #[derive(Serialize, Deserialize, Clone)]
@@ -20,7 +20,7 @@ impl RankQuery {
             return false;
         }
 
-        return true;
+        true
     }
 }
 
@@ -105,34 +105,34 @@ async fn get_rank_list(query: web::Query<RankQuery>) -> impl Responder {
     }
     let exchange = query.exchange.clone().unwrap().clone();
     let indicators = match exchange.as_str() {
-        "gate_usdt_swap" => {
-            gate_usdt_swap_data_listener::INDICATOR_MAP.lock().await
-        }
-        "coinex_usdt_swap" => {
-            coinex_usdt_swap_data_listener::INDICATOR_MAP.lock().await
-        }
-        "binance_usdt_swap" => {
-            binance_usdt_swap_data_listener::INDICATOR_MAP.lock().await
-        }
-        "phemex_usdt_swap" => {
-            phemex_usdt_swap_data_listener::INDICATOR_MAP.lock().await
-        }
-        "mexc_usdt_swap" => {
-            mexc_usdt_swap_data_listener::INDICATOR_MAP.lock().await
-        }
+        // "gate_usdt_swap" => {
+        //     gate_usdt_swap_data_listener::INDICATOR_MAP.lock().await
+        // }
+        // "coinex_usdt_swap" => {
+        //     coinex_usdt_swap_data_listener::INDICATOR_MAP.lock().await
+        // }
+        // "binance_usdt_swap" => {
+        //     binance_usdt_swap_data_listener::INDICATOR_MAP.lock().await
+        // }
+        // "phemex_usdt_swap" => {
+        //     phemex_usdt_swap_data_listener::INDICATOR_MAP.lock().await
+        // }
+        // "mexc_usdt_swap" => {
+        //     mexc_usdt_swap_data_listener::INDICATOR_MAP.lock().await
+        // }
         "bybit_usdt_swap" => {
             bybit_usdt_swap_data_listener::INDICATOR_MAP.lock().await
         }
-        "bitget_usdt_swap" => {
-            bitget_usdt_swap_data_listener::INDICATOR_MAP.lock().await
-        }
-        "gate_coin_spot" => {
-            gate_coin_spot_data_listener::INDICATOR_MAP.lock().await
-        }
+        // "bitget_usdt_swap" => {
+        //     bitget_usdt_swap_data_listener::INDICATOR_MAP.lock().await
+        // }
+        // "gate_coin_spot" => {
+        //     gate_coin_spot_data_listener::INDICATOR_MAP.lock().await
+        // }
         _ => {
             let response = Response {
                 query: serde_json::to_value(&query.into_inner()).unwrap(),
-                msg: Some("查询内容有误,exchange当前仅支持:[gate_usdt_swap, coinex_usdt_swap, binance_usdt_swap, phemex_usdt_swap, mexc_usdt_swap]".to_string()),
+                msg: Some("查询内容有误,exchange当前仅支持:[bybit_usdt_swap]".to_string()),
                 code: 500,
                 data: Value::Null,
             };
@@ -170,14 +170,14 @@ async fn get_exchanges() -> impl Responder {
         // "cointr_usdt_swap",
         // "binance_usdt_swap",
         // "htx_usdt_swap",
-        "binance_usdt_swap",
-        "gate_usdt_swap",
-        "coinex_usdt_swap",
-        "phemex_usdt_swap",
-        "mexc_usdt_swap",
+        // "binance_usdt_swap",
+        // "gate_usdt_swap",
+        // "coinex_usdt_swap",
+        // "phemex_usdt_swap",
+        // "mexc_usdt_swap",
         "bybit_usdt_swap",
-        "bitget_usdt_swap",
-        "gate_coin_spot",
+        // "bitget_usdt_swap",
+        // "gate_coin_spot",
     ];
     let response_data = json!(exchanges);
 
@@ -215,7 +215,7 @@ pub fn run_server(port: u32, running: Arc<AtomicBool>) {
         .expect("Bind port error")
         .run();
 
-    info!("数据仓库服务已运行。");
+    info!("14400trend服务已运行。");
 
     let r = running.clone();
     tokio::spawn(async move {

+ 18 - 0
src/trend14400.rs

@@ -0,0 +1,18 @@
+use rust_decimal::{Decimal};
+use serde::{Deserialize, Serialize};
+use standard::{Record};
+
+
+/// 技术指标结构体
+/// - `trend_strength(Decimal)`: 趋势强度
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct Indicators {
+    pub trend_strength: Decimal,
+}
+
+// 将records转为1440trend
+pub fn generate_14400trend_by_records(_records: &Vec<Record>) -> Indicators {
+    Indicators {
+        trend_strength: Decimal::ZERO,
+    }
+}

+ 24 - 2
standard/src/bybit_swap_handle.rs

@@ -3,7 +3,8 @@ use std::time::SystemTime;
 use rust_decimal::Decimal;
 use exchanges::response_base::ResponseData;
 use rust_decimal::prelude::FromPrimitive;
-use crate::{Trade};
+use serde_json::Value;
+use crate::{Record, Trade};
 
 pub fn format_trade_items(res_data: &ResponseData) -> Vec<Trade> {
     let result = res_data.data["data"].as_array().unwrap();
@@ -23,4 +24,25 @@ pub fn format_trade_items(res_data: &ResponseData) -> Vec<Trade> {
     }
 
     return trades;
-}
+}
+
+pub fn handle_records(value: &Value) -> Vec<Record> {
+    let mut records = vec![];
+    let s = value["topic"].as_str().unwrap().to_string();
+    let s_split: Vec<String> = s.split(".").map(|s| s.to_string()).collect();
+    let symbol = s_split[2].replace("USDT", "_USDT");
+    let data = value["data"].clone();
+    for record_value in data.as_array().unwrap() {
+        records.push(Record {
+            time: Decimal::from_i64(record_value["timestamp"].as_i64().unwrap()).unwrap(),
+            open: Decimal::from_str(record_value["open"].as_str().unwrap()).unwrap(),
+            high: Decimal::from_str(record_value["high"].as_str().unwrap()).unwrap(),
+            low: Decimal::from_str(record_value["low"].as_str().unwrap()).unwrap(),
+            close: Decimal::from_str(record_value["close"].as_str().unwrap()).unwrap(),
+            volume: Decimal::from_str(record_value["volume"].as_str().unwrap()).unwrap(),
+            symbol: symbol.clone(),
+        });
+    }
+
+    return records;
+}

+ 14 - 1
standard/src/exchange_struct_handler.rs

@@ -1,6 +1,7 @@
 use exchanges::response_base::ResponseData;
+use tracing::error;
 use crate::exchange::ExchangeEnum;
-use crate::{binance_swap_handle, bitget_swap_handle, bybit_swap_handle, coinex_swap_handle, gate_spot_handle, gate_swap_handle, mexc_swap_handle, phemex_swap_handle};
+use crate::{binance_swap_handle, bitget_swap_handle, bybit_swap_handle, coinex_swap_handle, gate_spot_handle, gate_swap_handle, mexc_swap_handle, phemex_swap_handle, Record};
 use crate::{Trade};
 
 #[allow(dead_code)]
@@ -37,4 +38,16 @@ impl ExchangeStructHandler {
             }
         }
     }
+    // 处理蜡烛信息
+    pub fn records_handle(exchange: ExchangeEnum, res_data: &ResponseData) -> Vec<Record> {
+        match exchange {
+            ExchangeEnum::BybitSwap => {
+                bybit_swap_handle::handle_records(&res_data.data)
+            }
+            _ => {
+                error!("未找到该交易所!records_handle: {:?}", exchange);
+                panic!("未找到该交易所!records_handle: {:?}", exchange);
+            }
+        }
+    }
 }