|
|
@@ -1,131 +1,131 @@
|
|
|
-// use std::collections::{BTreeMap, HashMap};
|
|
|
-// use std::sync::{Arc};
|
|
|
-// use std::sync::atomic::AtomicBool;
|
|
|
-// use std::time::Duration;
|
|
|
-// use lazy_static::lazy_static;
|
|
|
-// use rust_decimal::Decimal;
|
|
|
-// use rust_decimal_macros::dec;
|
|
|
-// use tokio::sync::{Mutex};
|
|
|
-// use tracing::info;
|
|
|
-// use exchanges::mexc_swap_rest::MexcSwapRest;
|
|
|
-// use exchanges::mexc_swap_ws::{MexcSwapSubscribeType, MexcSwapWs, MexcSwapWsType};
|
|
|
-// use exchanges::response_base::ResponseData;
|
|
|
-// use rust_decimal::prelude::FromPrimitive;
|
|
|
-// use serde_json::json;
|
|
|
-// use standard::exchange::ExchangeEnum;
|
|
|
-// use standard::exchange_struct_handler::ExchangeStructHandler;
|
|
|
-// use crate::json_db_utils::delete_db_by_exchange;
|
|
|
-// use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
|
|
|
-//
|
|
|
-// const EXCHANGE_NAME: &str = "mexc_usdt_swap";
|
|
|
-//
|
|
|
-// lazy_static! {
|
|
|
-// // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
|
|
|
-// static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
|
|
|
-// static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
|
|
|
-// static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
|
|
|
-// }
|
|
|
-//
|
|
|
-// pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
|
|
|
-// let name = "mexc_usdt_swap_listener";
|
|
|
-// // 订阅所有币种
|
|
|
-// let login = BTreeMap::new();
|
|
|
-// let mut mexc_rest = MexcSwapRest::new(false, login);
|
|
|
-// let params = json!({});
|
|
|
-// let response = mexc_rest.get_market(params).await;
|
|
|
-// let mut symbols = vec![];
|
|
|
-// if response.code == 200 {
|
|
|
-// let symbol_infos = response.data.as_array().unwrap();
|
|
|
-// let mut mul_map = MUL_MAP.lock().await;
|
|
|
-// for symbol_info in symbol_infos {
|
|
|
-// // quanto_multiplier是ct_val
|
|
|
-// let symbol = symbol_info["symbol"].as_str().unwrap().to_string();
|
|
|
-// let mul = Decimal::from_f64(symbol_info["contractSize"].as_f64().unwrap()).unwrap();
|
|
|
-// mul_map.insert(symbol.clone(), mul);
|
|
|
-//
|
|
|
-// symbols.push(symbol)
|
|
|
-// }
|
|
|
-// }
|
|
|
-//
|
|
|
-// for chunk in symbols.chunks(20) {
|
|
|
-// let ws_name = name.to_string();
|
|
|
-// let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
|
|
|
-// let write_tx_am = Arc::new(Mutex::new(write_tx));
|
|
|
-// let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
|
|
|
-// let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
|
|
|
-//
|
|
|
-// tokio::spawn(async move {
|
|
|
-// let mut ws = MexcSwapWs::new_with_tag(ws_name, false, None, MexcSwapWsType::PublicAndPrivate);
|
|
|
-// ws.set_subscribe(vec![
|
|
|
-// MexcSwapSubscribeType::PuFuturesTrades,
|
|
|
-// MexcSwapSubscribeType::PuFuturesRecords,
|
|
|
-// // MexcSwapSubscribeType::PuFuturesOrderBook
|
|
|
-// ]);
|
|
|
-//
|
|
|
-// // 建立链接
|
|
|
-// ws.set_symbols(symbols_chunk);
|
|
|
-// ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
-// });
|
|
|
-// }
|
|
|
-// // 定时删除数据
|
|
|
-// tokio::spawn(async move {
|
|
|
-// loop {
|
|
|
-// delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
|
|
|
-// tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
|
|
|
-// }
|
|
|
-// });
|
|
|
-// }
|
|
|
-//
|
|
|
-// // 读取数据
|
|
|
-// pub async fn data_listener(response: ResponseData) {
|
|
|
-// if response.code != 200 {
|
|
|
-// return;
|
|
|
-// }
|
|
|
-//
|
|
|
-// match response.channel.as_str() {
|
|
|
-// // 深度数据
|
|
|
-// "futures.order_book" => {
|
|
|
-// // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::MexcSwap, &response);
|
|
|
-// //
|
|
|
-// // update_depth(&depth).await;
|
|
|
-// }
|
|
|
-// // 订单流数据
|
|
|
-// "futures.trades" => {
|
|
|
-// let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::MexcSwap, &response);
|
|
|
-// let mul_map = MUL_MAP.lock().await;
|
|
|
-//
|
|
|
-// for trade in trades.iter_mut() {
|
|
|
-// // 真实交易量处理,因为mexc的量都是张数
|
|
|
-// let mul = mul_map[trade.symbol.as_str()];
|
|
|
-// let mut real_size = trade.size * mul * trade.price;
|
|
|
-// real_size.rescale(2);
|
|
|
-// trade.size = real_size;
|
|
|
-//
|
|
|
-// // 更新到本地数据库
|
|
|
-// let trades_map = TRADES_MAP.lock().await;
|
|
|
-// update_trade(trade, trades_map, EXCHANGE_NAME).await;
|
|
|
-// }
|
|
|
-// }
|
|
|
-// // k线数据
|
|
|
-// "futures.candlesticks" => {
|
|
|
-// let mut records = ExchangeStructHandler::records_handle(ExchangeEnum::MexcSwap, &response);
|
|
|
-//
|
|
|
-// let mul_map = MUL_MAP.lock().await;
|
|
|
-// for record in records.iter_mut() {
|
|
|
-// // 真实交易量处理,因为mexc的量都是张数
|
|
|
-// let mul = mul_map[record.symbol.as_str()];
|
|
|
-// let mid_price = (record.high + record.low) * dec!(0.5);
|
|
|
-// let mut real_volume = record.volume * mul * mid_price;
|
|
|
-// real_volume.rescale(2);
|
|
|
-// record.volume = real_volume;
|
|
|
-//
|
|
|
-// // 更新到本地数据库
|
|
|
-// let record_map = RECORD_MAP.lock().await;
|
|
|
-// update_record(record, record_map, EXCHANGE_NAME).await;
|
|
|
-// }
|
|
|
-// }
|
|
|
-// _ => {
|
|
|
-// info!("48 未知的数据类型: {:?}", response)
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
+use std::collections::{BTreeMap, HashMap};
|
|
|
+use std::sync::{Arc};
|
|
|
+use std::sync::atomic::AtomicBool;
|
|
|
+use std::time::Duration;
|
|
|
+use lazy_static::lazy_static;
|
|
|
+use rust_decimal::Decimal;
|
|
|
+use rust_decimal_macros::dec;
|
|
|
+use tokio::sync::{Mutex};
|
|
|
+use tracing::info;
|
|
|
+use exchanges::mexc_swap_rest::MexcSwapRest;
|
|
|
+use exchanges::mexc_swap_ws::{MexcSwapSubscribeType, MexcSwapWs, MexcSwapWsType};
|
|
|
+use exchanges::response_base::ResponseData;
|
|
|
+use rust_decimal::prelude::FromPrimitive;
|
|
|
+use serde_json::json;
|
|
|
+use standard::exchange::ExchangeEnum;
|
|
|
+use standard::exchange_struct_handler::ExchangeStructHandler;
|
|
|
+use crate::json_db_utils::delete_db_by_exchange;
|
|
|
+use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
|
|
|
+
|
|
|
+const EXCHANGE_NAME: &str = "mexc_usdt_swap";
|
|
|
+
|
|
|
+lazy_static! {
|
|
|
+ // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
|
|
|
+ static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
|
|
|
+ static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
|
|
|
+ static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
|
|
|
+}
|
|
|
+
|
|
|
+pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
|
|
|
+ let name = "mexc_usdt_swap_listener";
|
|
|
+ // 订阅所有币种
|
|
|
+ let login = BTreeMap::new();
|
|
|
+ let mut mexc_rest = MexcSwapRest::new(false, login);
|
|
|
+ let params = json!({});
|
|
|
+ let response = mexc_rest.get_market(params).await;
|
|
|
+ let mut symbols = vec![];
|
|
|
+ if response.code == 200 {
|
|
|
+ let symbol_infos = response.data.as_array().unwrap();
|
|
|
+ let mut mul_map = MUL_MAP.lock().await;
|
|
|
+ for symbol_info in symbol_infos {
|
|
|
+ // quanto_multiplier是ct_val
|
|
|
+ let symbol = symbol_info["symbol"].as_str().unwrap().to_string();
|
|
|
+ let mul = Decimal::from_f64(symbol_info["contractSize"].as_f64().unwrap()).unwrap();
|
|
|
+ mul_map.insert(symbol.clone(), mul);
|
|
|
+
|
|
|
+ symbols.push(symbol)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for chunk in symbols.chunks(20) {
|
|
|
+ let ws_name = name.to_string();
|
|
|
+ let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
|
|
|
+ let write_tx_am = Arc::new(Mutex::new(write_tx));
|
|
|
+ let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
|
|
|
+ let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
|
|
|
+
|
|
|
+ tokio::spawn(async move {
|
|
|
+ let mut ws = MexcSwapWs::new_with_tag(ws_name, false, None, MexcSwapWsType::PublicAndPrivate);
|
|
|
+ ws.set_subscribe(vec![
|
|
|
+ MexcSwapSubscribeType::PuFuturesTrades,
|
|
|
+ MexcSwapSubscribeType::PuFuturesRecords,
|
|
|
+ // MexcSwapSubscribeType::PuFuturesOrderBook
|
|
|
+ ]);
|
|
|
+
|
|
|
+ // 建立链接
|
|
|
+ ws.set_symbols(symbols_chunk);
|
|
|
+ ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
+ });
|
|
|
+ }
|
|
|
+ // 定时删除数据
|
|
|
+ tokio::spawn(async move {
|
|
|
+ loop {
|
|
|
+ delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
|
|
|
+ tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
|
|
|
+ }
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+// 读取数据
|
|
|
+pub async fn data_listener(response: ResponseData) {
|
|
|
+ if response.code != 200 {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ match response.channel.as_str() {
|
|
|
+ // 深度数据
|
|
|
+ "futures.order_book" => {
|
|
|
+ // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::MexcSwap, &response);
|
|
|
+ //
|
|
|
+ // update_depth(&depth).await;
|
|
|
+ }
|
|
|
+ // 订单流数据
|
|
|
+ "futures.trades" => {
|
|
|
+ let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::MexcSwap, &response);
|
|
|
+ let mul_map = MUL_MAP.lock().await;
|
|
|
+
|
|
|
+ for trade in trades.iter_mut() {
|
|
|
+ // 真实交易量处理,因为mexc的量都是张数
|
|
|
+ let mul = mul_map[trade.symbol.as_str()];
|
|
|
+ let mut real_size = trade.size * mul * trade.price;
|
|
|
+ real_size.rescale(2);
|
|
|
+ trade.size = real_size;
|
|
|
+
|
|
|
+ // 更新到本地数据库
|
|
|
+ let trades_map = TRADES_MAP.lock().await;
|
|
|
+ update_trade(trade, trades_map, EXCHANGE_NAME).await;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // k线数据
|
|
|
+ "futures.candlesticks" => {
|
|
|
+ let mut records = ExchangeStructHandler::records_handle(ExchangeEnum::MexcSwap, &response);
|
|
|
+
|
|
|
+ let mul_map = MUL_MAP.lock().await;
|
|
|
+ for record in records.iter_mut() {
|
|
|
+ // 真实交易量处理,因为mexc的量都是张数
|
|
|
+ let mul = mul_map[record.symbol.as_str()];
|
|
|
+ let mid_price = (record.high + record.low) * dec!(0.5);
|
|
|
+ let mut real_volume = record.volume * mul * mid_price;
|
|
|
+ real_volume.rescale(2);
|
|
|
+ record.volume = real_volume;
|
|
|
+
|
|
|
+ // 更新到本地数据库
|
|
|
+ let record_map = RECORD_MAP.lock().await;
|
|
|
+ update_record(record, record_map, EXCHANGE_NAME).await;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ _ => {
|
|
|
+ info!("48 未知的数据类型: {:?}", response)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|