|
|
@@ -1,7 +1,10 @@
|
|
|
use std::collections::{BTreeMap, HashMap};
|
|
|
+use std::str::FromStr;
|
|
|
use std::sync::{Arc};
|
|
|
use std::sync::atomic::AtomicBool;
|
|
|
use lazy_static::lazy_static;
|
|
|
+use rust_decimal::Decimal;
|
|
|
+use rust_decimal_macros::dec;
|
|
|
use tokio::sync::{Mutex};
|
|
|
use tracing::info;
|
|
|
use exchanges::gate_swap_rest::GateSwapRest;
|
|
|
@@ -17,6 +20,7 @@ lazy_static! {
|
|
|
// static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
|
|
|
static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
|
|
|
static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
|
|
|
+ static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
|
|
|
}
|
|
|
|
|
|
pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
|
|
|
@@ -27,9 +31,15 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
|
|
|
let response = gate_rest.get_market_details("usdt".to_string()).await;
|
|
|
let mut symbols = vec![];
|
|
|
if response.code == 200 {
|
|
|
- let data = response.data.as_array().unwrap();
|
|
|
- for info in data {
|
|
|
- symbols.push(info["name"].as_str().unwrap().to_string())
|
|
|
+ let symbol_infos = response.data.as_array().unwrap();
|
|
|
+ let mut mul_map = MUL_MAP.lock().await;
|
|
|
+ for symbol_info in symbol_infos {
|
|
|
+ // quanto_multiplier是ct_val
|
|
|
+ let symbol = symbol_info["name"].as_str().unwrap().to_string();
|
|
|
+ let mul = Decimal::from_str(symbol_info["quanto_multiplier"].as_str().unwrap().to_string().as_str()).unwrap();
|
|
|
+ mul_map.insert(symbol.clone(), mul);
|
|
|
+
|
|
|
+ symbols.push(symbol)
|
|
|
}
|
|
|
}
|
|
|
info!(?symbols);
|
|
|
@@ -71,21 +81,36 @@ pub async fn data_listener(response: ResponseData) {
|
|
|
},
|
|
|
// 订单流数据
|
|
|
"futures.trades" => {
|
|
|
- let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSwap, &response);
|
|
|
+ let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSwap, &response);
|
|
|
+ let mul_map = MUL_MAP.lock().await;
|
|
|
|
|
|
- for trade in trades.iter() {
|
|
|
- let trades_map = TRADES_MAP.lock().await;
|
|
|
+ for trade in trades.iter_mut() {
|
|
|
+ // 真实交易量处理,因为gate的量都是张数
|
|
|
+ let mul = mul_map[trade.symbol.as_str()];
|
|
|
+ let mut real_size = trade.size * mul * trade.price;
|
|
|
+ real_size.rescale(2);
|
|
|
+ trade.size = real_size;
|
|
|
|
|
|
+ // 更新到本地数据库
|
|
|
+ let trades_map = TRADES_MAP.lock().await;
|
|
|
update_trade(trade, trades_map, EXCHANGE_NAME).await;
|
|
|
}
|
|
|
},
|
|
|
// k线数据
|
|
|
"futures.candlesticks" => {
|
|
|
- let records = ExchangeStructHandler::records_handle(ExchangeEnum::GateSwap, &response);
|
|
|
+ let mut records = ExchangeStructHandler::records_handle(ExchangeEnum::GateSwap, &response);
|
|
|
|
|
|
- for record in records.iter() {
|
|
|
- let record_map= RECORD_MAP.lock().await;
|
|
|
+ let mul_map = MUL_MAP.lock().await;
|
|
|
+ for record in records.iter_mut() {
|
|
|
+ // 真实交易量处理,因为gate的量都是张数
|
|
|
+ let mul = mul_map[record.symbol.as_str()];
|
|
|
+ let mid_price = (record.high + record.low) * dec!(0.5);
|
|
|
+ let mut real_volume = record.volume * mul * mid_price;
|
|
|
+ real_volume.rescale(2);
|
|
|
+ record.volume = real_volume;
|
|
|
|
|
|
+ // 更新到本地数据库
|
|
|
+ let record_map= RECORD_MAP.lock().await;
|
|
|
update_record(record, record_map, EXCHANGE_NAME).await;
|
|
|
}
|
|
|
},
|