|
|
@@ -1,18 +1,25 @@
|
|
|
use std::collections::{BTreeMap, HashMap};
|
|
|
use std::sync::{Arc};
|
|
|
use std::sync::atomic::AtomicBool;
|
|
|
+use std::time::Duration;
|
|
|
use lazy_static::lazy_static;
|
|
|
-use tokio::sync::{Mutex};
|
|
|
+use rust_decimal::Decimal;
|
|
|
+use rust_decimal_macros::dec;
|
|
|
+use serde_json::{json, Value};
|
|
|
+use tokio::sync::{Mutex, MutexGuard};
|
|
|
use tracing::info;
|
|
|
use exchanges::binance_swap_rest::BinanceSwapRest;
|
|
|
use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
|
|
|
use exchanges::response_base::ResponseData;
|
|
|
use standard::exchange::ExchangeEnum;
|
|
|
use standard::exchange_struct_handler::ExchangeStructHandler;
|
|
|
+use standard::{Depth, OrderBook, SpecialDepth};
|
|
|
use crate::listener_tools::{DepthMap, RecordMap, TradeMap, update_depth, update_record, update_trade};
|
|
|
const EXCHANGE_NAME: &str = "binance_usdt_swap";
|
|
|
|
|
|
lazy_static! {
|
|
|
+ static ref LOCAL_DEPTH: Mutex<HashMap<String, Depth>> = Mutex::new(HashMap::new()); // 本地缓存的订单簿
|
|
|
+
|
|
|
static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
|
|
|
static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
|
|
|
static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
|
|
|
@@ -23,56 +30,78 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
|
|
|
// 订阅所有币种
|
|
|
let login = BTreeMap::new();
|
|
|
let mut binance_rest = BinanceSwapRest::new(false, login);
|
|
|
- let response = binance_rest.get_exchange_info().await;
|
|
|
- let mut symbols = vec![];
|
|
|
- if response.code == 200 {
|
|
|
- let data = response.data["symbols"].as_array().unwrap();
|
|
|
- for info in data {
|
|
|
- let s = info["symbol"].as_str().unwrap().to_string();
|
|
|
- if !s.ends_with("USDT") {
|
|
|
- continue
|
|
|
+ // let response = binance_rest.get_exchange_info().await;
|
|
|
+ // let mut symbols = vec![];
|
|
|
+ // if response.code == 200 {
|
|
|
+ // let data = response.data["symbols"].as_array().unwrap();
|
|
|
+ // for info in data {
|
|
|
+ // let s = info["symbol"].as_str().unwrap().to_string();
|
|
|
+ // if !s.ends_with("USDT") {
|
|
|
+ // continue
|
|
|
+ // }
|
|
|
+ // let symbol = s.replace("USDT", "_USDT");
|
|
|
+ // symbols.push(symbol)
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ // info!(?symbols);
|
|
|
+ //
|
|
|
+ // for chunk in symbols.chunks(20) {
|
|
|
+ // let ws_name = name.to_string();
|
|
|
+ // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
|
|
|
+ // let write_tx_am = Arc::new(Mutex::new(write_tx));
|
|
|
+ // let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
|
|
|
+ // let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
|
|
|
+ //
|
|
|
+ // tokio::spawn(async move {
|
|
|
+ // let mut ws = BinanceSwapWs::new_with_tag(ws_name, false, None, BinanceSwapWsType::PublicAndPrivate);
|
|
|
+ // ws.set_subscribe(vec![
|
|
|
+ // BinanceSwapSubscribeType::PuAggTrade,
|
|
|
+ // BinanceSwapSubscribeType::PuKline,
|
|
|
+ // ]);
|
|
|
+ //
|
|
|
+ // // 建立链接
|
|
|
+ // ws.set_symbols(symbols_chunk);
|
|
|
+ // ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
+ // });
|
|
|
+ // }
|
|
|
+
|
|
|
+ // 2024-5-7任务 单独订阅ETH的全档位深度数据
|
|
|
+ let depth_symbols = vec!["ETH_USDT".to_string()];
|
|
|
+ // 每60s初始化一次深度信息
|
|
|
+ let symbols_clone = depth_symbols.clone();
|
|
|
+ tokio::spawn(async move {
|
|
|
+ loop {
|
|
|
+ for depth_symbol in &symbols_clone {
|
|
|
+ let formated_str = depth_symbol.replace("_", "");
|
|
|
+ let mut response = binance_rest.get_order_book(formated_str.as_str(), 1000).await;
|
|
|
+
|
|
|
+ response.channel = "depthInit".to_string();
|
|
|
+ response.data["a"] = response.data["asks"].clone();
|
|
|
+ response.data["b"] = response.data["bids"].clone();
|
|
|
+ response.data["s"] = json!(formated_str.as_str());
|
|
|
+ response.data["asks"] = Value::Null;
|
|
|
+ response.data["bids"] = Value::Null;
|
|
|
+
|
|
|
+ data_listener(response).await;
|
|
|
}
|
|
|
- let symbol = s.replace("USDT", "_USDT");
|
|
|
- symbols.push(symbol)
|
|
|
+
|
|
|
+ tokio::time::sleep(Duration::from_secs(10)).await;
|
|
|
}
|
|
|
- }
|
|
|
- info!(?symbols);
|
|
|
-
|
|
|
- for chunk in symbols.chunks(20) {
|
|
|
- let ws_name = name.to_string();
|
|
|
- let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
|
|
|
- let write_tx_am = Arc::new(Mutex::new(write_tx));
|
|
|
- let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
|
|
|
- let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
|
|
|
-
|
|
|
- tokio::spawn(async move {
|
|
|
- let mut ws = BinanceSwapWs::new_with_tag(ws_name, false, None, BinanceSwapWsType::PublicAndPrivate);
|
|
|
- ws.set_subscribe(vec![
|
|
|
- BinanceSwapSubscribeType::PuAggTrade,
|
|
|
- BinanceSwapSubscribeType::PuKline,
|
|
|
- ]);
|
|
|
-
|
|
|
- // 建立链接
|
|
|
- ws.set_symbols(symbols_chunk);
|
|
|
- ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
- });
|
|
|
- }
|
|
|
+ });
|
|
|
+ let ws_name = name.to_string();
|
|
|
+ let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
|
|
|
+ let write_tx_am = Arc::new(Mutex::new(write_tx));
|
|
|
+ let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
|
|
|
+ tokio::spawn(async move {
|
|
|
+ let mut ws = BinanceSwapWs::new_with_tag(ws_name, false, None, BinanceSwapWsType::PublicAndPrivate);
|
|
|
+ ws.set_subscribe(vec![
|
|
|
+ BinanceSwapSubscribeType::PuDepthUpdate
|
|
|
+ ]);
|
|
|
|
|
|
- // // 2024-5-7任务 单独订阅ETH的全档位深度数据
|
|
|
- // let ws_name = name.to_string();
|
|
|
- // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
|
|
|
- // let write_tx_am = Arc::new(Mutex::new(write_tx));
|
|
|
- // let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
|
|
|
- // tokio::spawn(async move {
|
|
|
- // let mut ws = BinanceSwapWs::new_with_tag(ws_name, false, None, BinanceSwapWsType::PublicAndPrivate);
|
|
|
- // ws.set_subscribe(vec![
|
|
|
- // BinanceSwapSubscribeType::PuDepth20levels100ms
|
|
|
- // ]);
|
|
|
- //
|
|
|
- // // 建立链接
|
|
|
- // ws.set_symbols(vec!["ETH_USDT".to_string()]);
|
|
|
- // ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
- // });
|
|
|
+ // 建立链接
|
|
|
+ ws.set_symbols(depth_symbols);
|
|
|
+ ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
// 读取数据
|
|
|
@@ -81,14 +110,39 @@ pub async fn data_listener(response: ResponseData) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- match response.channel.as_str() {
|
|
|
- // 深度数据
|
|
|
- "深度" => {
|
|
|
- let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSwap, &response);
|
|
|
- let depth_map = DEPTH_MAP.lock().await;
|
|
|
+ // 更新的增量深度数据特殊处理
|
|
|
+ let mut channel = response.channel.clone();
|
|
|
+ if channel.contains("@depth@100ms") {
|
|
|
+ channel = "depthUpdate".to_string();
|
|
|
+ }
|
|
|
+
|
|
|
+ match channel.as_str() {
|
|
|
+ // 初始化的深度数据
|
|
|
+ "depthInit" => {
|
|
|
+ let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::BinanceSwap, &response);
|
|
|
|
|
|
+ // 更新本地队列
|
|
|
+ let mut local_depth_map = LOCAL_DEPTH.lock().await;
|
|
|
+ local_depth_map.insert(depth.symbol.clone(), depth.clone());
|
|
|
+
|
|
|
+ // 标准化后进行深度更新逻辑
|
|
|
+ let depth_map = DEPTH_MAP.lock().await;
|
|
|
update_depth(&depth, depth_map, EXCHANGE_NAME).await;
|
|
|
},
|
|
|
+ // 增量深度数据
|
|
|
+ "depthUpdate" => {
|
|
|
+ let update = ExchangeStructHandler::order_book_handle(ExchangeEnum::BinanceSwap, &response);
|
|
|
+ // 更新本地队列
|
|
|
+ let local_depth_map = LOCAL_DEPTH.lock().await;
|
|
|
+ if local_depth_map.contains_key(update.symbol.as_str()) {
|
|
|
+ merge_depths(local_depth_map, &update);
|
|
|
+
|
|
|
+ // 标准化后进行深度更新逻辑
|
|
|
+ let updated_local_depth = LOCAL_DEPTH.lock().await.get(update.symbol.as_str()).unwrap().clone();
|
|
|
+ let depth_map = DEPTH_MAP.lock().await;
|
|
|
+ update_depth(&updated_local_depth, depth_map, EXCHANGE_NAME).await;
|
|
|
+ }
|
|
|
+ },
|
|
|
// 订单流数据
|
|
|
"aggTrade" => {
|
|
|
let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BinanceSwap, &response);
|
|
|
@@ -114,3 +168,44 @@ pub async fn data_listener(response: ResponseData) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+fn merge_depths(mut local_depth_map: MutexGuard<'_, HashMap<String, Depth>>, update: &Depth) {
|
|
|
+ if let Some(local) = local_depth_map.get_mut(update.symbol.as_str()) {
|
|
|
+ local.time = update.time;
|
|
|
+
|
|
|
+ // Merge asks
|
|
|
+ merge_order_books(&mut local.asks, &update.asks, true);
|
|
|
+
|
|
|
+ // Merge bids
|
|
|
+ merge_order_books(&mut local.bids, &update.bids, false);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+fn merge_order_books(local_books: &mut Vec<OrderBook>, update_books: &Vec<OrderBook>, is_asc: bool) {
|
|
|
+ for update_book in update_books {
|
|
|
+ if update_book.amount.eq(&Decimal::ZERO) {
|
|
|
+ // Remove the order book at this price point if amount is zero
|
|
|
+ local_books.retain(|book| book.price != update_book.price);
|
|
|
+ } else {
|
|
|
+ let mut found = false;
|
|
|
+ for local_book in local_books.iter_mut() {
|
|
|
+ if local_book.price == update_book.price {
|
|
|
+ // Update the amount for this price
|
|
|
+ local_book.amount = update_book.amount;
|
|
|
+ found = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if !found {
|
|
|
+ // Add new order book if not found
|
|
|
+ local_books.push(update_book.clone());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if is_asc {
|
|
|
+ local_books.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap());
|
|
|
+ } else {
|
|
|
+ local_books.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap());
|
|
|
+ }
|
|
|
+}
|