|
@@ -1,148 +1,148 @@
|
|
|
-use std::collections::{BTreeMap, HashMap};
|
|
|
|
|
-use std::sync::{Arc};
|
|
|
|
|
-use std::sync::atomic::AtomicBool;
|
|
|
|
|
-use std::time::Duration;
|
|
|
|
|
-use lazy_static::lazy_static;
|
|
|
|
|
-use rust_decimal::Decimal;
|
|
|
|
|
-use rust_decimal_macros::dec;
|
|
|
|
|
-use tokio::sync::{Mutex};
|
|
|
|
|
-use tracing::info;
|
|
|
|
|
-use exchanges::gate_spot_rest::GateSpotRest;
|
|
|
|
|
-use exchanges::gate_spot_ws::{GateSpotSubscribeType, GateSpotWs, GateSpotWsType};
|
|
|
|
|
-use exchanges::response_base::ResponseData;
|
|
|
|
|
-use serde_json::json;
|
|
|
|
|
-use standard::{Depth, OrderBook, SimpleDepth};
|
|
|
|
|
-use standard::exchange::ExchangeEnum;
|
|
|
|
|
-use standard::exchange_struct_handler::ExchangeStructHandler;
|
|
|
|
|
-use crate::json_db_utils::delete_db_by_exchange;
|
|
|
|
|
-use crate::listener_tools::{RecordMap, TradeMap, DepthMap, SimpleDepthMap, update_record, update_trade, update_simple_depth};
|
|
|
|
|
-
|
|
|
|
|
-const EXCHANGE_NAME: &str = "gate_coin_spot";
|
|
|
|
|
-
|
|
|
|
|
-lazy_static! {
|
|
|
|
|
- static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
|
|
|
|
|
- static ref SIMPLE_DEPTH_MAP: Mutex<SimpleDepthMap> = Mutex::new(HashMap::new());
|
|
|
|
|
- static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
|
|
|
|
|
- static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
|
|
|
|
|
- static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
|
|
|
|
|
- let name = "gate_coin_spot_listener";
|
|
|
|
|
- // 订阅所有币种
|
|
|
|
|
- let login = BTreeMap::new();
|
|
|
|
|
- let mut gate_rest = GateSpotRest::new(false, login);
|
|
|
|
|
- let params = json!({});
|
|
|
|
|
- let response = gate_rest.get_market_details(params).await;
|
|
|
|
|
- let mut symbols = vec![];
|
|
|
|
|
- if response.code == 200 {
|
|
|
|
|
- let symbol_infos = response.data.as_array().unwrap();
|
|
|
|
|
- let mut mul_map = MUL_MAP.lock().await;
|
|
|
|
|
- for symbol_info in symbol_infos {
|
|
|
|
|
- let trade_status = symbol_info["trade_status"].as_str().unwrap();
|
|
|
|
|
- let quote = symbol_info["quote"].as_str().unwrap();
|
|
|
|
|
- if trade_status != "tradable" || quote == "USDT" { continue; };
|
|
|
|
|
- // if trade_status != "tradable" { continue; };
|
|
|
|
|
- // quanto_multiplier是ct_val
|
|
|
|
|
- let symbol = symbol_info["id"].as_str().unwrap().to_string();
|
|
|
|
|
- mul_map.insert(symbol.clone(), Decimal::ONE);
|
|
|
|
|
- symbols.push(symbol)
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- for chunk in symbols.chunks(20) {
|
|
|
|
|
- let ws_name = name.to_string();
|
|
|
|
|
- let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
|
|
|
|
|
- let write_tx_am = Arc::new(Mutex::new(write_tx));
|
|
|
|
|
- let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
|
|
|
|
|
- let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
|
|
|
|
|
-
|
|
|
|
|
- tokio::spawn(async move {
|
|
|
|
|
- let mut ws = GateSpotWs::new_with_tag(ws_name, false, None, GateSpotWsType::PublicAndPrivate);
|
|
|
|
|
- ws.set_subscribe(vec![
|
|
|
|
|
- GateSpotSubscribeType::PuSpotTrades,
|
|
|
|
|
- GateSpotSubscribeType::PuSpotCandlesticks,
|
|
|
|
|
- // GateSpotSubscribeType::PuFuturesOrderBook,
|
|
|
|
|
- ]);
|
|
|
|
|
-
|
|
|
|
|
- // 建立链接
|
|
|
|
|
- ws.set_symbols(symbols_chunk);
|
|
|
|
|
- ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
|
|
- });
|
|
|
|
|
- }
|
|
|
|
|
- // 定时删除数据
|
|
|
|
|
- tokio::spawn(async move {
|
|
|
|
|
- loop {
|
|
|
|
|
- delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
|
|
|
|
|
- tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
|
|
|
|
|
- }
|
|
|
|
|
- });
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-// 读取数据
|
|
|
|
|
-pub async fn data_listener(response: ResponseData) {
|
|
|
|
|
- if response.code != 200 {
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- match response.channel.as_str() {
|
|
|
|
|
- // 深度数据
|
|
|
|
|
- "spot.order_book" => {
|
|
|
|
|
- let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSpot, &response);
|
|
|
|
|
- let mul_map = MUL_MAP.lock().await;
|
|
|
|
|
- let new_depth = Depth {
|
|
|
|
|
- time: depth.time,
|
|
|
|
|
- symbol: depth.symbol.clone(),
|
|
|
|
|
- asks: depth.asks.iter().map(|ask| OrderBook { price: ask.price, amount: ask.amount * mul_map[depth.symbol.clone().as_str()] }).collect(),
|
|
|
|
|
- bids: depth.bids.iter().map(|bid| OrderBook { price: bid.price, amount: bid.amount * mul_map[depth.symbol.clone().as_str()] }).collect(),
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
- // 更新到本地数据库
|
|
|
|
|
- // ------ 简易深度数据
|
|
|
|
|
- let simple_depth = SimpleDepth::new(&new_depth);
|
|
|
|
|
- let simple_depth_map = SIMPLE_DEPTH_MAP.lock().await;
|
|
|
|
|
- update_simple_depth(&simple_depth, simple_depth_map, EXCHANGE_NAME).await;
|
|
|
|
|
-
|
|
|
|
|
- // ------ 标准深度数据
|
|
|
|
|
- // let depth_map = DEPTH_MAP.lock().await;
|
|
|
|
|
- // update_depth(&new_depth, depth_map, EXCHANGE_NAME).await;
|
|
|
|
|
- }
|
|
|
|
|
- // 订单流数据
|
|
|
|
|
- "spot.trades" => {
|
|
|
|
|
- let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSpot, &response);
|
|
|
|
|
- let mul_map = MUL_MAP.lock().await;
|
|
|
|
|
-
|
|
|
|
|
- for trade in trades.iter_mut() {
|
|
|
|
|
- // 真实交易量处理,因为gate的量都是张数
|
|
|
|
|
- let mul = mul_map[trade.symbol.as_str()];
|
|
|
|
|
- let mut real_size = trade.size * mul * trade.price;
|
|
|
|
|
- real_size.rescale(2);
|
|
|
|
|
- trade.size = real_size;
|
|
|
|
|
-
|
|
|
|
|
- // 更新到本地数据库
|
|
|
|
|
- let trades_map = TRADES_MAP.lock().await;
|
|
|
|
|
- update_trade(trade, trades_map, EXCHANGE_NAME).await;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- // k线数据
|
|
|
|
|
- "spot.candlesticks" => {
|
|
|
|
|
- let mut records = ExchangeStructHandler::records_handle(ExchangeEnum::GateSpot, &response);
|
|
|
|
|
-
|
|
|
|
|
- let mul_map = MUL_MAP.lock().await;
|
|
|
|
|
- for record in records.iter_mut() {
|
|
|
|
|
- // 真实交易量处理,因为gate的量都是张数
|
|
|
|
|
- let mul = mul_map[record.symbol.as_str()];
|
|
|
|
|
- let mid_price = (record.high + record.low) * dec!(0.5);
|
|
|
|
|
- let mut real_volume = record.volume * mul * mid_price;
|
|
|
|
|
- real_volume.rescale(2);
|
|
|
|
|
- record.volume = real_volume;
|
|
|
|
|
-
|
|
|
|
|
- // 更新到本地数据库
|
|
|
|
|
- let record_map = RECORD_MAP.lock().await;
|
|
|
|
|
- update_record(record, record_map, EXCHANGE_NAME).await;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- _ => {
|
|
|
|
|
- info!("48 未知的数据类型: {:?}", response)
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|
|
|
|
|
+// use std::collections::{BTreeMap, HashMap};
|
|
|
|
|
+// use std::sync::{Arc};
|
|
|
|
|
+// use std::sync::atomic::AtomicBool;
|
|
|
|
|
+// use std::time::Duration;
|
|
|
|
|
+// use lazy_static::lazy_static;
|
|
|
|
|
+// use rust_decimal::Decimal;
|
|
|
|
|
+// use rust_decimal_macros::dec;
|
|
|
|
|
+// use tokio::sync::{Mutex};
|
|
|
|
|
+// use tracing::info;
|
|
|
|
|
+// use exchanges::gate_spot_rest::GateSpotRest;
|
|
|
|
|
+// use exchanges::gate_spot_ws::{GateSpotSubscribeType, GateSpotWs, GateSpotWsType};
|
|
|
|
|
+// use exchanges::response_base::ResponseData;
|
|
|
|
|
+// use serde_json::json;
|
|
|
|
|
+// use standard::{Depth, OrderBook, SimpleDepth};
|
|
|
|
|
+// use standard::exchange::ExchangeEnum;
|
|
|
|
|
+// use standard::exchange_struct_handler::ExchangeStructHandler;
|
|
|
|
|
+// use crate::json_db_utils::delete_db_by_exchange;
|
|
|
|
|
+// use crate::listener_tools::{RecordMap, TradeMap, DepthMap, SimpleDepthMap, update_record, update_trade, update_simple_depth};
|
|
|
|
|
+//
|
|
|
|
|
+// const EXCHANGE_NAME: &str = "gate_coin_spot";
|
|
|
|
|
+//
|
|
|
|
|
+// lazy_static! {
|
|
|
|
|
+// static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
|
|
|
|
|
+// static ref SIMPLE_DEPTH_MAP: Mutex<SimpleDepthMap> = Mutex::new(HashMap::new());
|
|
|
|
|
+// static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
|
|
|
|
|
+// static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
|
|
|
|
|
+// static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
|
|
|
|
|
+// let name = "gate_coin_spot_listener";
|
|
|
|
|
+// // 订阅所有币种
|
|
|
|
|
+// let login = BTreeMap::new();
|
|
|
|
|
+// let mut gate_rest = GateSpotRest::new(false, login);
|
|
|
|
|
+// let params = json!({});
|
|
|
|
|
+// let response = gate_rest.get_market_details(params).await;
|
|
|
|
|
+// let mut symbols = vec![];
|
|
|
|
|
+// if response.code == 200 {
|
|
|
|
|
+// let symbol_infos = response.data.as_array().unwrap();
|
|
|
|
|
+// let mut mul_map = MUL_MAP.lock().await;
|
|
|
|
|
+// for symbol_info in symbol_infos {
|
|
|
|
|
+// let trade_status = symbol_info["trade_status"].as_str().unwrap();
|
|
|
|
|
+// let quote = symbol_info["quote"].as_str().unwrap();
|
|
|
|
|
+// if trade_status != "tradable" || quote == "USDT" { continue; };
|
|
|
|
|
+// // if trade_status != "tradable" { continue; };
|
|
|
|
|
+// // quanto_multiplier是ct_val
|
|
|
|
|
+// let symbol = symbol_info["id"].as_str().unwrap().to_string();
|
|
|
|
|
+// mul_map.insert(symbol.clone(), Decimal::ONE);
|
|
|
|
|
+// symbols.push(symbol)
|
|
|
|
|
+// }
|
|
|
|
|
+// }
|
|
|
|
|
+// for chunk in symbols.chunks(20) {
|
|
|
|
|
+// let ws_name = name.to_string();
|
|
|
|
|
+// let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
|
|
|
|
|
+// let write_tx_am = Arc::new(Mutex::new(write_tx));
|
|
|
|
|
+// let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
|
|
|
|
|
+// let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
|
|
|
|
|
+//
|
|
|
|
|
+// tokio::spawn(async move {
|
|
|
|
|
+// let mut ws = GateSpotWs::new_with_tag(ws_name, false, None, GateSpotWsType::PublicAndPrivate);
|
|
|
|
|
+// ws.set_subscribe(vec![
|
|
|
|
|
+// GateSpotSubscribeType::PuSpotTrades,
|
|
|
|
|
+// GateSpotSubscribeType::PuSpotCandlesticks,
|
|
|
|
|
+// // GateSpotSubscribeType::PuFuturesOrderBook,
|
|
|
|
|
+// ]);
|
|
|
|
|
+//
|
|
|
|
|
+// // 建立链接
|
|
|
|
|
+// ws.set_symbols(symbols_chunk);
|
|
|
|
|
+// ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
|
|
+// });
|
|
|
|
|
+// }
|
|
|
|
|
+// // 定时删除数据
|
|
|
|
|
+// tokio::spawn(async move {
|
|
|
|
|
+// loop {
|
|
|
|
|
+// delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
|
|
|
|
|
+// tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
|
|
|
|
|
+// }
|
|
|
|
|
+// });
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// // 读取数据
|
|
|
|
|
+// pub async fn data_listener(response: ResponseData) {
|
|
|
|
|
+// if response.code != 200 {
|
|
|
|
|
+// return;
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// match response.channel.as_str() {
|
|
|
|
|
+// // 深度数据
|
|
|
|
|
+// "spot.order_book" => {
|
|
|
|
|
+// let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSpot, &response);
|
|
|
|
|
+// let mul_map = MUL_MAP.lock().await;
|
|
|
|
|
+// let new_depth = Depth {
|
|
|
|
|
+// time: depth.time,
|
|
|
|
|
+// symbol: depth.symbol.clone(),
|
|
|
|
|
+// asks: depth.asks.iter().map(|ask| OrderBook { price: ask.price, amount: ask.amount * mul_map[depth.symbol.clone().as_str()] }).collect(),
|
|
|
|
|
+// bids: depth.bids.iter().map(|bid| OrderBook { price: bid.price, amount: bid.amount * mul_map[depth.symbol.clone().as_str()] }).collect(),
|
|
|
|
|
+// };
|
|
|
|
|
+//
|
|
|
|
|
+// // 更新到本地数据库
|
|
|
|
|
+// // ------ 简易深度数据
|
|
|
|
|
+// let simple_depth = SimpleDepth::new(&new_depth);
|
|
|
|
|
+// let simple_depth_map = SIMPLE_DEPTH_MAP.lock().await;
|
|
|
|
|
+// update_simple_depth(&simple_depth, simple_depth_map, EXCHANGE_NAME).await;
|
|
|
|
|
+//
|
|
|
|
|
+// // ------ 标准深度数据
|
|
|
|
|
+// // let depth_map = DEPTH_MAP.lock().await;
|
|
|
|
|
+// // update_depth(&new_depth, depth_map, EXCHANGE_NAME).await;
|
|
|
|
|
+// }
|
|
|
|
|
+// // 订单流数据
|
|
|
|
|
+// "spot.trades" => {
|
|
|
|
|
+// let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSpot, &response);
|
|
|
|
|
+// let mul_map = MUL_MAP.lock().await;
|
|
|
|
|
+//
|
|
|
|
|
+// for trade in trades.iter_mut() {
|
|
|
|
|
+// // 真实交易量处理,因为gate的量都是张数
|
|
|
|
|
+// let mul = mul_map[trade.symbol.as_str()];
|
|
|
|
|
+// let mut real_size = trade.size * mul * trade.price;
|
|
|
|
|
+// real_size.rescale(2);
|
|
|
|
|
+// trade.size = real_size;
|
|
|
|
|
+//
|
|
|
|
|
+// // 更新到本地数据库
|
|
|
|
|
+// let trades_map = TRADES_MAP.lock().await;
|
|
|
|
|
+// update_trade(trade, trades_map, EXCHANGE_NAME).await;
|
|
|
|
|
+// }
|
|
|
|
|
+// }
|
|
|
|
|
+// // k线数据
|
|
|
|
|
+// "spot.candlesticks" => {
|
|
|
|
|
+// let mut records = ExchangeStructHandler::records_handle(ExchangeEnum::GateSpot, &response);
|
|
|
|
|
+//
|
|
|
|
|
+// let mul_map = MUL_MAP.lock().await;
|
|
|
|
|
+// for record in records.iter_mut() {
|
|
|
|
|
+// // 真实交易量处理,因为gate的量都是张数
|
|
|
|
|
+// let mul = mul_map[record.symbol.as_str()];
|
|
|
|
|
+// let mid_price = (record.high + record.low) * dec!(0.5);
|
|
|
|
|
+// let mut real_volume = record.volume * mul * mid_price;
|
|
|
|
|
+// real_volume.rescale(2);
|
|
|
|
|
+// record.volume = real_volume;
|
|
|
|
|
+//
|
|
|
|
|
+// // 更新到本地数据库
|
|
|
|
|
+// let record_map = RECORD_MAP.lock().await;
|
|
|
|
|
+// update_record(record, record_map, EXCHANGE_NAME).await;
|
|
|
|
|
+// }
|
|
|
|
|
+// }
|
|
|
|
|
+// _ => {
|
|
|
|
|
+// info!("48 未知的数据类型: {:?}", response)
|
|
|
|
|
+// }
|
|
|
|
|
+// }
|
|
|
|
|
+// }
|