Jelajahi Sumber

币安爆仓订单识别

skyffire 9 bulan lalu
induk
melakukan
0e41998198

+ 7 - 0
exchanges/src/binance_swap_ws.rs

@@ -23,6 +23,7 @@ pub enum BinanceSwapSubscribeType {
     PuBookTicker,
     PuAggTrade,
     PuDepth20levels100ms,
+    ForceOrder,
     PuDepthUpdate,
     PuKline,
 }
@@ -111,6 +112,7 @@ impl BinanceSwapWs {
                 BinanceSwapSubscribeType::PuDepth20levels100ms => false,
                 BinanceSwapSubscribeType::PuDepthUpdate => false,
                 BinanceSwapSubscribeType::PuKline => false,
+                BinanceSwapSubscribeType::ForceOrder => false,
             } {
                 return true;
             }
@@ -138,6 +140,9 @@ impl BinanceSwapWs {
             BinanceSwapSubscribeType::PuKline => {
                 format!("{}@kline_1m", symbol)
             }
+            BinanceSwapSubscribeType::ForceOrder => {
+                format!("{}@forceOrder", symbol)
+            }
         }
     }
     //订阅信息生成
@@ -261,6 +266,8 @@ impl BinanceSwapWs {
                 res_data.channel = "depth".to_string();
             } else if channel.contains("@bookTicker") {
                 res_data.channel = "bookTicker".to_string();
+            } else if channel.contains("@forceOrder") {
+                res_data.channel = "forceOrder".to_string();
             } else if channel.contains("@kline") {
                 res_data.channel = "kline".to_string();
             } else {

+ 9 - 1
src/binance_usdt_swap_data_listener.rs

@@ -13,7 +13,7 @@ use standard::exchange::ExchangeEnum;
 use standard::exchange_struct_handler::ExchangeStructHandler;
 use standard::{Depth, OrderBook};
 use crate::json_db_utils::delete_db_by_exchange;
-use crate::listener_tools::{DepthMap, RecordMap, TradeMap, update_depth, update_record, update_trade};
+use crate::listener_tools::{DepthMap, RecordMap, TradeMap, update_depth, update_record, update_trade, ForceOrderMap, update_force_order};
 const EXCHANGE_NAME: &str = "binance_usdt_swap";
 
 lazy_static! {
@@ -22,6 +22,7 @@ lazy_static! {
     static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
     static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
     static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
+    static ref FORCE_ORDER_MAP: Mutex<ForceOrderMap> = Mutex::new(HashMap::new());
 }
 
 pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
@@ -56,6 +57,7 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
             ws.set_subscribe(vec![
                 BinanceSwapSubscribeType::PuAggTrade,
                 BinanceSwapSubscribeType::PuKline,
+                BinanceSwapSubscribeType::ForceOrder
             ]);
 
             // 建立链接
@@ -131,6 +133,12 @@ pub async fn data_listener(response: ResponseData) {
                 update_record(record, record_map, EXCHANGE_NAME).await;
             }
         },
+        "forceOrder" => {
+            let force_order = ExchangeStructHandler::force_order_handle(ExchangeEnum::BinanceSwap, &response);
+
+            let force_order_map= FORCE_ORDER_MAP.lock().await;
+            update_force_order(force_order, force_order_map, EXCHANGE_NAME).await;
+        }
         _ => {
             info!("48 未知的数据类型: {:?}", response)
         }

+ 30 - 1
src/json_db_utils.rs

@@ -8,7 +8,7 @@ use tokio::fs::File;
 use tokio::io::AsyncWriteExt;
 use tokio::{fs};
 use tracing::{error, info};
-use standard::{Record, SimpleDepth, SpecialDepth, SpecialTrade};
+use standard::{ForceOrder, Record, SimpleDepth, SpecialDepth, SpecialTrade};
 
 pub async fn write_to_file(json_data: String, file_path: String) {
     // 尝试创建文件路径
@@ -104,6 +104,35 @@ pub async fn collect_special_trades_json(start_timestamp: i64, end_timestamp: i6
     serde_json::to_value(&all_trades).unwrap()
 }
 
+// 将一个时间段范围内的所有ForceOrder返回(以json形式)
+pub async fn collect_force_orders_json(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str) -> Value {
+    let mut force_orders: Vec<ForceOrder> = Vec::new();
+    let filenames = generate_filenames(start_timestamp, end_timestamp, exchange, symbol, "force_order");
+
+    for filename in filenames {
+        let file_path = PathBuf::from(filename.as_str());
+        let file_content = fs::read_to_string(file_path).await;
+
+        // 检查文件内容是否成功读取
+        let mut rsts = if let Ok(content) = file_content {
+            // 尝试反序列化文件内容
+            if let Ok(rsts) = serde_json::from_str::<Vec<ForceOrder>>(&content) {
+                rsts // 成功反序列化,返回结果
+            } else {
+                vec![] // 反序列化失败,返回空 Vec
+            }
+        } else {
+            vec![] // 读取文件失败,返回空 Vec
+        };
+
+        rsts.reverse();
+        // info!("{} 找到 {} 条", filename, trades.len());
+        force_orders.append(&mut rsts);
+    }
+
+    serde_json::to_value(&force_orders).unwrap()
+}
+
 // 将一个时间段范围内的所有Record返回(以json形式)
 pub async fn collect_records_json(start_timestamp: i64, end_timestamp: i64, exchange: &str, symbol: &str) -> Value {
     let mut records = Vec::new();

+ 29 - 3
src/listener_tools.rs

@@ -3,13 +3,14 @@ use std::str::FromStr;
 use rust_decimal::Decimal;
 use rust_decimal::prelude::ToPrimitive;
 use tokio::sync::MutexGuard;
-use standard::{Depth, Record, SimpleDepth, SpecialDepth, SpecialTrade, Trade};
+use standard::{Depth, ForceOrder, Record, SimpleDepth, SpecialDepth, SpecialTrade, Trade};
 use crate::json_db_utils::{generate_file_path, minute_to_date, write_to_file};
 
 pub type DepthMap = HashMap<String, Vec<SpecialDepth>>;
 pub type SimpleDepthMap = HashMap<String, Vec<SimpleDepth>>;
 pub type TradeMap = HashMap<String, Vec<SpecialTrade>>;
 pub type RecordMap = HashMap<String, Record>;
+pub type ForceOrderMap = HashMap<String, Vec<ForceOrder>>;
 
 // 更新订单流数据
 pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, TradeMap>, exchange: &str) {
@@ -21,13 +22,13 @@ pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, Trad
 
             // 如果分钟数不同,则清空列表并添加新的trade
             if last_trade_minutes != new_trade_minutes {
-                let depths_json = serde_json::to_string(trades).unwrap();
+                let trades_json = serde_json::to_string(trades).unwrap();
                 let date_str = minute_to_date(last_trade_minutes);
                 let path = generate_file_path(exchange, date_str.as_str(), new_trade.symbol.as_str(), "trades", last_trade_minutes);
 
                 // info!(?path);
 
-                write_to_file(depths_json, path).await;
+                write_to_file(trades_json, path).await;
 
                 trades.clear();
             }
@@ -63,6 +64,31 @@ pub async fn update_record(new_record: &Record, mut records_map: MutexGuard<'_,
     records_map.insert(new_record.symbol.clone(), new_record.clone());
 }
 
+// 更新爆仓信息
+pub async fn update_force_order(new_force_order: ForceOrder, mut force_order_map: MutexGuard<'_, ForceOrderMap>, exchange: &str) {
+    if let Some(force_order_list) = force_order_map.get_mut(new_force_order.symbol.as_str()) {
+        if let Some(force_order) = force_order_list.last() {
+            let last_force_minutes = force_order.time.to_i64().unwrap() / 60000;           // 将毫秒转换成分钟数
+            let new_force_minutes = new_force_order.time.to_i64().unwrap() / 60000;        // 同上
+
+            // 如果分钟数不同,则清空列表并添加新的trade
+            if last_force_minutes != new_force_minutes {
+                let force_order_list_json = serde_json::to_string(force_order_list).unwrap();
+                let date_str = minute_to_date(last_force_minutes);
+                let path = generate_file_path(exchange, date_str.as_str(), new_force_order.symbol.as_str(), "force_order", last_force_minutes);
+
+                write_to_file(force_order_list_json, path).await;
+
+                force_order_list.clear();
+            }
+        }
+        force_order_list.push(new_force_order);
+    } else {
+        // 如果该symbol不存在,则创建新的Vec并添加trade
+        force_order_map.insert(new_force_order.symbol.clone(), vec![new_force_order]);
+    }
+}
+
 // 更新深度数据
 pub async fn update_depth(new_depth: &Depth, mut depth_map: MutexGuard<'_, DepthMap>, exchange: &str) {
     if let Some(depths) = depth_map.get_mut(new_depth.symbol.as_str()) {

+ 34 - 1
src/server.rs

@@ -4,7 +4,7 @@ use actix_web::{web, App, HttpResponse, HttpServer, Responder, get};
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
 use tracing::{info};
-use crate::json_db_utils::{collect_depth_json, collect_records_json, collect_simple_depth_json, collect_special_trades_json, get_symbols_by_exchange};
+use crate::json_db_utils::{collect_depth_json, collect_force_orders_json, collect_records_json, collect_simple_depth_json, collect_special_trades_json, get_symbols_by_exchange};
 
 // 定义用于反序列化查询参数的结构体
 #[derive(Serialize, Deserialize, Clone)]
@@ -117,6 +117,38 @@ async fn get_trades(query: web::Query<SimpleQuery>) -> impl Responder {
     }
 }
 
+#[get("/force_orders")]
+async fn get_force_orders(query: web::Query<SimpleQuery>) -> impl Responder {
+    if query.validate() {
+        let response_data = collect_force_orders_json(
+            query.start_time.clone().unwrap(),
+            query.end_time.clone().unwrap(),
+            query.exchange.clone().unwrap().as_str(),
+            query.symbol.clone().unwrap().as_str()
+        ).await;
+
+        let response = Response {
+            query: serde_json::to_value(&query.into_inner()).unwrap(),
+            msg: Some("查询成功".to_string()),
+            code: 200,
+            data: response_data,
+        };
+
+        let json_string = serde_json::to_string(&response).unwrap();
+        HttpResponse::Ok().content_type("application/json").body(json_string)
+    } else {
+        let response = Response {
+            query: serde_json::to_value(&query.into_inner()).unwrap(),
+            msg: Some("查询内容有误,必须包含四个参数:[symbol, exchange, start_time, end_time]".to_string()),
+            code: 500,
+            data: Value::Null,
+        };
+
+        let json_string = serde_json::to_string(&response).unwrap();
+        HttpResponse::Ok().content_type("application/json").body(json_string)
+    }
+}
+
 #[get("/trades_count")]
 async fn get_trades_count(query: web::Query<ExchangeSpecialQuery>) -> impl Responder {
     if query.validate() {
@@ -388,6 +420,7 @@ pub fn run_server(port: u32, running: Arc<AtomicBool>) {
             .service(get_exchanges)
             .service(get_order_book)
             .service(get_order_book_simple)
+            .service(get_force_orders)
     })
     .bind(addr)
     .expect("Bind port error")

+ 29 - 1
standard/src/binance_swap_handle.rs

@@ -1,9 +1,10 @@
 use std::str::FromStr;
 use rust_decimal::Decimal;
 use rust_decimal::prelude::FromPrimitive;
+use rust_decimal_macros::dec;
 use serde_json::Value;
 use exchanges::response_base::ResponseData;
-use crate::{OrderBook, Record, Trade};
+use crate::{ForceOrder, OrderBook, Record, Trade};
 
 // 处理特殊Ticker信息
 // pub fn handle_book_ticker(res_data: &ResponseData) -> SpecialDepth {
@@ -80,4 +81,31 @@ pub fn handle_records(value: &Value) -> Vec<Record> {
         volume,
         symbol,
     }]
+}
+
+// 格式化强平订单
+pub fn handle_force_order(res_data: &ResponseData) -> ForceOrder {
+    let json_value = &res_data.data;
+    let o = &json_value["o"];
+
+    let time = Decimal::from_i64(o["T"].as_i64().unwrap()).unwrap();
+    let symbol = o["s"].as_str().unwrap().replace("USDT", "_USDT");
+    let side = o["S"].as_str().unwrap().to_string();
+    let price = Decimal::from_str(o["p"].as_str().unwrap()).unwrap();
+    let amount = Decimal::from_str(o["z"].as_str().unwrap()).unwrap();
+    let mut value = price * amount;
+    value.rescale(0);
+    value += dec!(0.1);
+
+    if side == "SELL" {
+        value = -value
+    }
+
+    ForceOrder {
+        time,
+        symbol,
+        price,
+        amount,
+        value
+    }
 }

+ 13 - 1
standard/src/exchange_struct_handler.rs

@@ -4,7 +4,7 @@ use rust_decimal::prelude::FromPrimitive;
 use tracing::{error};
 use exchanges::response_base::ResponseData;
 use crate::exchange::ExchangeEnum;
-use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, coinex_swap_handle, htx_swap_handle, bingx_swap_handle, mexc_swap_handle, okx_swap_handle, bitmart_swap_handle, kucoin_swap_handle, coinsph_swap_handle, phemex_swap_handle, woo_swap_handle, cointr_swap_handle, gate_spot_handle};
+use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, coinex_swap_handle, htx_swap_handle, bingx_swap_handle, mexc_swap_handle, okx_swap_handle, bitmart_swap_handle, kucoin_swap_handle, coinsph_swap_handle, phemex_swap_handle, woo_swap_handle, cointr_swap_handle, gate_spot_handle, ForceOrder};
 use crate::{Record, Ticker, Trade, Depth};
 use crate::{Account, OrderBook, Position, SpecialOrder};
 
@@ -270,4 +270,16 @@ impl ExchangeStructHandler {
             }
         }
     }
+    // 处理爆仓信息
+    pub fn force_order_handle(exchange: ExchangeEnum, res_data: &ResponseData) -> ForceOrder {
+        match exchange {
+            ExchangeEnum::BinanceSwap => {
+                binance_swap_handle::handle_force_order(res_data)
+            }
+            _ => {
+                error!("暂未提供此交易所方法!force_order_handle:{:?}", exchange);
+                panic!("暂未提供此交易所方法!force_order_handle:{:?}", exchange);
+            }
+        }
+    }
 }

+ 9 - 0
standard/src/lib.rs

@@ -317,6 +317,15 @@ impl SpecialOrder {
     }
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct ForceOrder {
+    pub time: Decimal,
+    pub symbol: String,
+    pub price: Decimal,
+    pub amount: Decimal,
+    pub value: Decimal,
+}
+
 /// Order结构体(订单)
 /// - `id(String)`: 交易单唯一标识
 /// - `custom_id(String)`: 自定义Id