Эх сурвалжийг харах

第一个版本:gate_usdt_swap的深度数据拿到了。

skyffire 1 жил өмнө
parent
commit
2895722b6d

+ 1 - 0
Cargo.toml

@@ -24,4 +24,5 @@ rust_decimal = { version = "1.32.0", features = ["maths"] }
 rust_decimal_macros = "1.32.0"
 futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] }
 futures-channel = "0.3.28"
+lazy_static = "1.4.0"
 

+ 2 - 0
global/src/log_utils.rs

@@ -75,6 +75,7 @@ pub fn final_init(level: &str, port: u32, app_name: String) -> WorkerGuard {
 
     let fmt_layer = fmt::layer()
         .with_timer(local_time.clone())
+        .with_line_number(true)
         .with_target(true)
         .with_level(true)
         .with_writer(io::stdout)
@@ -82,6 +83,7 @@ pub fn final_init(level: &str, port: u32, app_name: String) -> WorkerGuard {
 
     let file_layer = fmt::layer()
         .with_timer(local_time.clone())
+        .with_line_number(true)
         .with_target(true)
         .with_ansi(false)
         .with_level(true)

+ 89 - 10
src/gate_usdt_swap_data_listener.rs

@@ -1,9 +1,24 @@
+use std::collections::HashMap;
+use std::mem::{size_of, size_of_val};
 use std::sync::{Arc};
 use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use lazy_static::lazy_static;
 use tokio::sync::Mutex;
+use tokio::time::Instant;
 use tracing::info;
 use exchanges::gate_swap_ws::{GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
 use exchanges::response_base::ResponseData;
+use standard::{Depth, OrderBook};
+use standard::exchange::ExchangeEnum;
+use standard::exchange_struct_handler::ExchangeStructHandler;
+
+type DepthMap = HashMap<String, Vec<Depth>>;
+
+lazy_static! {
+    static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
+    static ref LAST_PRINT_TIME: Mutex<Instant> = Mutex::new(Instant::now());
+}
 
 pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
@@ -15,19 +30,83 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
         let mut ws = GateSwapWs::new_with_tag(name.to_string(), false, None, GateSwapWsType::PublicAndPrivate("usdt".to_string()));
             ws.set_subscribe(vec![
                 GateSwapSubscribeType::PuFuturesTrades,
-                GateSwapSubscribeType::PuFuturesBookTicker,
+                GateSwapSubscribeType::PuFuturesCandlesticks,
                 GateSwapSubscribeType::PuFuturesOrderBook
             ]);
 
-        // 读取数据
-        let fun = move |data: ResponseData| {
-            async move {
-                info!(?data);
-            }
-        };
-
         // 建立链接
         ws.set_symbols(symbols);
-        ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        ws.ws_connect_async(is_shutdown_arc, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
     });
-}
+}
+
+// 读取数据
+pub async fn data_listener(response: ResponseData) {
+    let mut depth_map = DEPTH_MAP.lock().await;
+
+    if response.code != 200 {
+        return;
+    }
+
+    match response.channel.as_str() {
+        // 深度数据
+        "futures.order_book" => {
+            let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSwap, &response);
+
+            depth_map.entry(depth.symbol.clone())
+                .or_insert_with(Vec::new)
+                .push(depth);
+        },
+        // 订单流数据
+        "futures.trades" => {
+
+        },
+        // k线数据
+        "futures.candlesticks" => {
+
+        },
+        _ => {
+            info!("48 未知的数据类型: {:?}", response)
+        }
+    }
+
+    let mut last_print_time = LAST_PRINT_TIME.lock().await;
+    if last_print_time.elapsed() >= Duration::from_secs(60) {
+        estimate_depth_map_memory_usage(&depth_map);
+        *last_print_time = Instant::now();
+    }
+}
+
+pub fn estimate_depth_map_memory_usage(depth_map: &DepthMap) {
+    let mut memory_usage_bytes = 0;
+
+    // 估算HashMap固定的开销大小,这个是一个近似值
+    memory_usage_bytes += size_of::<HashMap<String, Vec<Depth>>>();
+
+    for (symbol, depths) in depth_map {
+        // 估算每个symbol字符串的内存使用
+        memory_usage_bytes += size_of_val(symbol.as_str());
+
+        // 估算每个深度向量的内存使用
+        memory_usage_bytes += size_of_val(&depths);
+
+        for depth in depths {
+            // 估算每个Depth结构体的内存使用
+            memory_usage_bytes += size_of::<Depth>();
+
+            // 估算symbol字符串的内存使用
+            memory_usage_bytes += size_of_val(depth.symbol.as_str());
+
+            // 估算asks和bids向量的内存使用
+            memory_usage_bytes += size_of_val(&depth.asks);
+            memory_usage_bytes += size_of_val(&depth.bids);
+
+            // 估算OrderBook结构体在asks和bids向量中的内存使用
+            memory_usage_bytes += depth.asks.len() * size_of::<OrderBook>();
+            memory_usage_bytes += depth.bids.len() * size_of::<OrderBook>();
+        }
+    }
+
+    let memory_usage_mb = memory_usage_bytes as f64 / (1024.0 * 1024.0);
+    info!("Estimated memory usage: {:.2} MB", memory_usage_mb);
+}

+ 4 - 4
standard/src/binance_swap_handle.rs

@@ -3,7 +3,7 @@ use rust_decimal::Decimal;
 use rust_decimal_macros::dec;
 use serde_json::Value;
 use exchanges::response_base::ResponseData;
-use crate::{MarketOrder, SpecialDepth, SpecialTicker};
+use crate::{OrderBook, SpecialDepth, SpecialTicker};
 
 
 // 处理特殊Ticker信息
@@ -28,10 +28,10 @@ pub fn handle_book_ticker(res_data: &ResponseData) -> SpecialDepth {
 }
 
 // 格式化深度信息
-pub fn format_depth_items(value: Value) -> Vec<MarketOrder> {
-    let mut depth_items: Vec<MarketOrder> = vec![];
+pub fn format_depth_items(value: Value) -> Vec<OrderBook> {
+    let mut depth_items: Vec<OrderBook> = vec![];
     for value in value.as_array().unwrap() {
-        depth_items.push(MarketOrder {
+        depth_items.push(OrderBook {
             price: Decimal::from_str(value[0].as_str().unwrap()).unwrap(),
             amount: Decimal::from_str(value[1].as_str().unwrap()).unwrap(),
         })

+ 4 - 4
standard/src/bitget_swap_handle.rs

@@ -3,7 +3,7 @@ use rust_decimal::Decimal;
 use rust_decimal::prelude::FromPrimitive;
 use serde_json::Value;
 use exchanges::response_base::ResponseData;
-use crate::{Account, MarketOrder, Order, Position, PositionModeEnum, SpecialOrder};
+use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialOrder};
 
 // 处理账号信息
 pub fn handle_account_info(response: &ResponseData, _symbol: &String) -> Account {
@@ -85,10 +85,10 @@ pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
 }
 
 // 格式化深度信息
-pub fn format_depth_items(value: Value) -> Vec<MarketOrder> {
-    let mut depth_items: Vec<MarketOrder> = vec![];
+pub fn format_depth_items(value: Value) -> Vec<OrderBook> {
+    let mut depth_items: Vec<OrderBook> = vec![];
     for value in value.as_array().unwrap() {
-        depth_items.push(MarketOrder {
+        depth_items.push(OrderBook {
             price: Decimal::from_str(value[0].as_str().unwrap()).unwrap(),
             amount: Decimal::from_str(value[1].as_str().unwrap()).unwrap(),
         })

+ 4 - 4
standard/src/bybit_swap_handle.rs

@@ -5,7 +5,7 @@ use rust_decimal_macros::dec;
 use serde_json::{from_value, Value};
 use tracing::{error};
 use exchanges::response_base::ResponseData;
-use crate::{Account, MarketOrder, Order, Position, PositionModeEnum, SpecialDepth, SpecialOrder, SpecialTicker};
+use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialDepth, SpecialOrder, SpecialTicker};
 
 // 处理账号信息
 pub fn handle_account_info(res_data: &ResponseData, symbol: &String) -> Account {
@@ -156,11 +156,11 @@ pub fn handle_ticker(res_data: &ResponseData) -> SpecialDepth {
     }
 }
 
-pub fn format_depth_items(value: serde_json::Value) -> Vec<MarketOrder> {
-    let mut depth_items: Vec<MarketOrder> = vec![];
+pub fn format_depth_items(value: serde_json::Value) -> Vec<OrderBook> {
+    let mut depth_items: Vec<OrderBook> = vec![];
     for val in value.as_array().unwrap() {
         let arr = val.as_array().unwrap();
-        depth_items.push(MarketOrder {
+        depth_items.push(OrderBook {
             price: Decimal::from_str(arr[0].as_str().unwrap()).unwrap(),
             amount: Decimal::from_str(arr[1].as_str().unwrap()).unwrap(),
         })

+ 7 - 4
standard/src/exchange_struct_handler.rs

@@ -5,7 +5,7 @@ use tracing::{error, info};
 use exchanges::response_base::ResponseData;
 use crate::exchange::ExchangeEnum;
 use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, kucoin_handle, Depth};
-use crate::{Account, MarketOrder, Position, SpecialDepth, SpecialOrder};
+use crate::{Account, OrderBook, Position, SpecialDepth, SpecialOrder};
 
 #[allow(dead_code)]
 pub struct ExchangeStructHandler;
@@ -13,10 +13,11 @@ pub struct ExchangeStructHandler;
 #[allow(dead_code)]
 impl ExchangeStructHandler {
     // 处理深度信息
-    pub fn depth_handle(exchange: ExchangeEnum, res_data: &ResponseData) -> Depth {
-        let depth_asks: Vec<MarketOrder>;
-        let depth_bids: Vec<MarketOrder>;
+    pub fn order_book_handle(exchange: ExchangeEnum, res_data: &ResponseData) -> Depth {
+        let depth_asks: Vec<OrderBook>;
+        let depth_bids: Vec<OrderBook>;
         let t: Decimal;
+        let mut symbol = String::new();
 
         match exchange {
             // ExchangeEnum::BinanceSpot => {
@@ -34,6 +35,7 @@ impl ExchangeStructHandler {
                 depth_asks = gate_swap_handle::format_depth_items(&res_data.data["asks"]);
                 depth_bids = gate_swap_handle::format_depth_items(&res_data.data["bids"]);
                 t = Decimal::from_str(&res_data.data["t"].to_string()).unwrap();
+                symbol = res_data.data["contract"].as_str().unwrap().to_string()
             }
             ExchangeEnum::KucoinSwap => {
                 depth_asks = kucoin_handle::format_depth_items(res_data.data["asks"].clone());
@@ -74,6 +76,7 @@ impl ExchangeStructHandler {
             asks: depth_asks,
             bids: depth_bids,
             time: t,
+            symbol: symbol,
         }
     }
     // 处理Ticker信息

+ 4 - 4
standard/src/gate_swap_handle.rs

@@ -5,7 +5,7 @@ use rust_decimal_macros::dec;
 use serde_json::Value;
 use tracing::{error};
 use exchanges::response_base::ResponseData;
-use crate::{Account, MarketOrder, Order, Position, PositionModeEnum, SpecialDepth, SpecialOrder, SpecialTicker};
+use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialDepth, SpecialOrder, SpecialTicker};
 
 // 处理账号信息
 pub fn handle_account_info(res_data: &ResponseData, symbol: &String) -> Account {
@@ -138,10 +138,10 @@ pub fn handle_book_ticker(res_data: &ResponseData) -> SpecialDepth {
     }
 }
 
-pub fn format_depth_items(value: &Value) -> Vec<MarketOrder> {
-    let mut depth_items: Vec<MarketOrder> = vec![];
+pub fn format_depth_items(value: &Value) -> Vec<OrderBook> {
+    let mut depth_items: Vec<OrderBook> = vec![];
     for value in value.as_array().unwrap() {
-        depth_items.push(MarketOrder {
+        depth_items.push(OrderBook {
             price: Decimal::from_str(value["p"].as_str().unwrap()).unwrap(),
             amount: Decimal::from_f64(value["s"].as_f64().unwrap()).unwrap(),
         })

+ 4 - 4
standard/src/kucoin_handle.rs

@@ -4,7 +4,7 @@ use rust_decimal::prelude::FromPrimitive;
 use rust_decimal_macros::dec;
 use serde_json::Value;
 use exchanges::response_base::ResponseData;
-use crate::{Account, MarketOrder, Order, Position, PositionModeEnum, SpecialDepth, SpecialOrder, SpecialTicker, utils};
+use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialDepth, SpecialOrder, SpecialTicker, utils};
 use crate::exchange::ExchangeEnum;
 
 // 处理账号信息
@@ -131,10 +131,10 @@ pub fn format_order_item(order: &Value, ct_val: Decimal) -> Order {
     }
 }
 
-pub fn format_depth_items(value: serde_json::Value) -> Vec<MarketOrder> {
-    let mut depth_items: Vec<MarketOrder> = vec![];
+pub fn format_depth_items(value: serde_json::Value) -> Vec<OrderBook> {
+    let mut depth_items: Vec<OrderBook> = vec![];
     for value in value.as_array().unwrap() {
-        depth_items.push(MarketOrder {
+        depth_items.push(OrderBook {
             price: Decimal::from_str(value[0].as_str().unwrap()).unwrap(),
             amount: Decimal::from_f64(value[1].as_f64().unwrap()).unwrap(),
         })

+ 13 - 9
standard/src/lib.rs

@@ -3,6 +3,7 @@ use std::io::{Error};
 use async_trait::async_trait;
 use rust_decimal::Decimal;
 use serde_json::Value;
+use serde::{Serialize, Deserialize};
 
 use crate::exchange::ExchangeEnum;
 
@@ -79,19 +80,22 @@ impl Account {
 
 /// Depth结构体(市场深度)
 /// - `time(Decimal)`: 深度更新时间戳(ms);
+/// - `symbol(String)`: 币对符号;
 /// - `asks(Vec<MarketOrder>)`: 卖方深度列表;
 /// - `bids(Vec<MarketOrder>)`: 买方深度列表;
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct Depth {
     pub time: Decimal,
-    pub asks: Vec<MarketOrder>,
-    pub bids: Vec<MarketOrder>,
+    pub symbol: String,
+    pub asks: Vec<OrderBook>,
+    pub bids: Vec<OrderBook>,
 }
 
 impl Depth {
     pub fn new() -> Depth {
         Depth {
             time: Decimal::ZERO,
+            symbol: String::new(),
             asks: vec![],
             bids: vec![],
         }
@@ -152,18 +156,18 @@ impl SpecialTicker {
     }
 }
 
-/// MarketOrder结构体(市场深度单)
+/// OrderBook结构体(市场深度单)
 /// - `price(Decimal)`: 价格
 /// - `amount(Decimal)`: 数量
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct MarketOrder {
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct OrderBook {
     pub price: Decimal,
     pub amount: Decimal,
 }
 
-impl MarketOrder {
-    pub fn new() -> MarketOrder {
-        MarketOrder {
+impl OrderBook {
+    pub fn new() -> OrderBook {
+        OrderBook {
             price: Default::default(),
             amount: Default::default(),
         }