Browse Source

1. 新增币安数据存放
2. 改进gate的订阅架构,方便以后升级

skyffire 1 year ago
parent
commit
b8a01492a6

+ 2 - 3
exchanges/src/binance_swap_rest.rs

@@ -76,13 +76,12 @@ impl BinanceSwapRest {
     }
     //获取交易规则和交易对
     pub async fn get_exchange_info(&mut self) -> ResponseData {
-        let params = serde_json::json!({
-         });
+        let params = serde_json::json!({});
 
         let data = self.request("GET".to_string(),
                                 "".to_string(),
                                 format!("/fapi/v1/exchangeInfo"),
-                                true,
+                                false,
                                 params.to_string(),
         ).await;
         data

+ 9 - 3
exchanges/src/binance_swap_ws.rs

@@ -23,6 +23,7 @@ pub enum BinanceSwapSubscribeType {
     PuBookTicker,
     PuAggTrade,
     PuDepth20levels100ms,
+    PuKline,
 }
 
 //账号信息
@@ -107,6 +108,7 @@ impl BinanceSwapWs {
                 BinanceSwapSubscribeType::PuBookTicker => false,
                 BinanceSwapSubscribeType::PuAggTrade => false,
                 BinanceSwapSubscribeType::PuDepth20levels100ms => false,
+                BinanceSwapSubscribeType::PuKline => false,
             } {
                 return true;
             }
@@ -128,6 +130,9 @@ impl BinanceSwapWs {
             BinanceSwapSubscribeType::PuBookTicker => {
                 format!("{}@bookTicker", symbol)
             }
+            BinanceSwapSubscribeType::PuKline => {
+                format!("{}@kline_1m", symbol)
+            }
         }
     }
     //订阅信息生成
@@ -144,7 +149,7 @@ impl BinanceSwapWs {
             "method": "SUBSCRIBE",
             "params":  params,
             "id": 1
-            });
+        });
         str.to_string()
     }
     /*******************************************************************************************************/
@@ -245,9 +250,10 @@ impl BinanceSwapWs {
                 res_data.channel = "depth".to_string();
             } else if channel.contains("@bookTicker") {
                 res_data.channel = "bookTicker".to_string();
+            } else if channel.contains("@kline") {
+                res_data.channel = "kline".to_string();
             } else {
-                res_data.code = -1;
-                res_data.channel = "未知的频道".to_string();
+                res_data.channel = channel.to_string();
             }
         } else {
             res_data.code = -1;

+ 104 - 0
src/binance_usdt_swap_data_listener.rs

@@ -0,0 +1,104 @@
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc};
+use std::sync::atomic::AtomicBool;
+use lazy_static::lazy_static;
+use tokio::sync::{Mutex};
+use tracing::info;
+use exchanges::binance_swap_rest::BinanceSwapRest;
+use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
+use exchanges::response_base::ResponseData;
+use standard::{Record, SpecialTrade};
+use standard::exchange::ExchangeEnum;
+use standard::exchange_struct_handler::ExchangeStructHandler;
+use crate::listener_tools::{update_record, update_trade};
+
+// type DepthMap = HashMap<String, Vec<SpecialDepth>>;
+pub type TradeMap = HashMap<String, Vec<SpecialTrade>>;
+pub type RecordMap = HashMap<String, Record>;
+const EXCHANGE_NAME: &str = "binance_usdt_swap";
+
+lazy_static! {
+    // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
+    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+    static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
+}
+
+pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+    let name = "binance_usdt_swap_listener";
+    // 订阅所有币种
+    let login = BTreeMap::new();
+    let mut binance_rest = BinanceSwapRest::new(false, login);
+    let response = binance_rest.get_exchange_info().await;
+    let mut symbols = vec![];
+    if response.code == 200 {
+        let data = response.data["symbols"].as_array().unwrap();
+        for info in data {
+            let s = info["symbol"].as_str().unwrap().to_string();
+            if !s.ends_with("USDT") {
+                continue
+            }
+            let symbol = s.replace("USDT", "_USDT");
+            symbols.push(symbol)
+        }
+    }
+    info!(?symbols);
+
+    for chunk in symbols.chunks(20) {
+        let ws_name = name.to_string();
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+
+        tokio::spawn(async move {
+            let mut ws = BinanceSwapWs::new_with_tag(ws_name, false, None, BinanceSwapWsType::PublicAndPrivate);
+            ws.set_subscribe(vec![
+                BinanceSwapSubscribeType::PuAggTrade,
+                BinanceSwapSubscribeType::PuKline,
+            ]);
+
+            // 建立链接
+            ws.set_symbols(symbols_chunk);
+            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        });
+    }
+}
+
+// 读取数据
+pub async fn data_listener(response: ResponseData) {
+    if response.code != 200 {
+        return;
+    }
+
+    match response.channel.as_str() {
+        // 深度数据
+        "深度" => {
+            // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSwap, &response);
+            //
+            // update_depth(&depth).await;
+        },
+        // 订单流数据
+        "aggTrade" => {
+            let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BinanceSwap, &response);
+
+            for trade in trades.iter() {
+                let trades_map = TRADES_MAP.lock().await;
+
+                update_trade(trade, trades_map, EXCHANGE_NAME).await;
+            }
+        },
+        // k线数据
+        "kline" => {
+            let records = ExchangeStructHandler::records_handle(ExchangeEnum::BinanceSwap, &response);
+
+            for record in records.iter() {
+                let record_map= RECORD_MAP.lock().await;
+
+                update_record(record, record_map, EXCHANGE_NAME).await;
+            }
+        },
+        _ => {
+            info!("48 未知的数据类型: {:?}", response)
+        }
+    }
+}

+ 14 - 8
src/gate_usdt_swap_data_listener.rs

@@ -24,8 +24,6 @@ lazy_static! {
 }
 
 pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    let write_tx_am = Arc::new(Mutex::new(write_tx));
     let name = "gate_usdt_swap_listener";
     // 订阅所有币种
     let login = BTreeMap::new();
@@ -40,18 +38,26 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     }
     info!(?symbols);
 
-    tokio::spawn(async move {
-        let mut ws = GateSwapWs::new_with_tag(name.to_string(), false, None, GateSwapWsType::PublicAndPrivate("usdt".to_string()));
+    for chunk in symbols.chunks(20) {
+        let ws_name = name.to_string();
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+
+        tokio::spawn(async move {
+            let mut ws = GateSwapWs::new_with_tag(ws_name, false, None, GateSwapWsType::PublicAndPrivate("usdt".to_string()));
             ws.set_subscribe(vec![
                 GateSwapSubscribeType::PuFuturesTrades,
                 GateSwapSubscribeType::PuFuturesCandlesticks,
                 // GateSwapSubscribeType::PuFuturesOrderBook
             ]);
 
-        // 建立链接
-        ws.set_symbols(symbols);
-        ws.ws_connect_async(is_shutdown_arc, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-    });
+            // 建立链接
+            ws.set_symbols(symbols_chunk);
+            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        });
+    }
 }
 
 // 读取数据

+ 2 - 0
src/main.rs

@@ -4,6 +4,7 @@ mod server;
 mod listener_tools;
 mod bitget_usdt_swap_data_listener;
 mod gate_usdt_swap_data_listener;
+mod binance_usdt_swap_data_listener;
 
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -30,6 +31,7 @@ async fn main() {
     // 启动各交易所的数据监听器
     gate_usdt_swap_data_listener::run_listener(running.clone()).await;
     bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
+    binance_usdt_swap_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
     // let panic_running = running.clone();
     std::panic::set_hook(Box::new(move |panic_info| {

+ 47 - 2
standard/src/binance_swap_handle.rs

@@ -1,8 +1,9 @@
 use std::str::FromStr;
 use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
 use serde_json::Value;
-use crate::{OrderBook};
-
+use exchanges::response_base::ResponseData;
+use crate::{OrderBook, Record, Trade};
 
 // 处理特殊Ticker信息
 // pub fn handle_book_ticker(res_data: &ResponseData) -> SpecialDepth {
@@ -36,3 +37,47 @@ pub fn format_depth_items(value: Value) -> Vec<OrderBook> {
     }
     return depth_items;
 }
+
+pub fn format_trade_items(response: &ResponseData) -> Vec<Trade> {
+    let data = response.data.clone();
+
+    let id = data["a"].as_i64().unwrap().to_string();
+    let time = Decimal::from_i64(data["T"].as_i64().unwrap()).unwrap();
+    let is_sell = data["m"].as_bool().unwrap(); // 买方是否是做市方。如true,则此次成交是一个主动卖出单,否则是一个主动买入单。
+    let mut size = Decimal::from_str(data["q"].as_str().unwrap().to_string().as_str()).unwrap();
+    if is_sell {
+        size = -size
+    }
+    let price = Decimal::from_str(data["p"].as_str().unwrap().to_string().as_str()).unwrap();
+    let symbol = data["s"].as_str().unwrap().to_string().replace("USDT", "_USDT");
+
+    vec![Trade {
+        id,
+        time,
+        size,
+        price,
+        symbol,
+    }]
+}
+
+pub fn handle_book_ticker(value: &Value) -> Vec<Record> {
+    let data = value["k"].clone();
+    let time = Decimal::from_i64(data["t"].as_i64().unwrap()).unwrap();
+
+    let open = Decimal::from_str(data["o"].as_str().unwrap().to_string().as_str()).unwrap();
+    let high = Decimal::from_str(data["h"].as_str().unwrap().to_string().as_str()).unwrap();
+    let low = Decimal::from_str(data["l"].as_str().unwrap().to_string().as_str()).unwrap();
+    let close = Decimal::from_str(data["c"].as_str().unwrap().to_string().as_str()).unwrap();
+    let volume = Decimal::from_str(data["q"].as_str().unwrap().to_string().as_str()).unwrap();
+    let symbol = data["s"].as_str().unwrap().to_string().replace("USDT", "_USDT");
+
+    vec![Record {
+        time,
+        open,
+        high,
+        low,
+        close,
+        volume,
+        symbol,
+    }]
+}

+ 6 - 4
standard/src/exchange_struct_handler.rs

@@ -89,6 +89,9 @@ impl ExchangeStructHandler {
             ExchangeEnum::BitgetSwap => {
                 bitget_swap_handle::format_trade_items(&res_data)
             },
+            ExchangeEnum::BinanceSwap => {
+                binance_swap_handle::format_trade_items(&res_data)
+            },
             _ => {
                 error!("未找到该交易所!trades_handle: {:?}", exchange);
                 panic!("未找到该交易所!trades_handle: {:?}", exchange);
@@ -140,10 +143,6 @@ impl ExchangeStructHandler {
             // ExchangeEnum::BinanceSpot => {
             //     binance_spot_handle::handle_special_ticker(res_data)
             // }
-            ExchangeEnum::BinanceSwap => {
-                // binance_swap_handle::handle_book_ticker(res_data)
-                panic!("BinanceSwap records_handle 未实现格式化");
-            }
             ExchangeEnum::KucoinSwap => {
                 // kucoin_handle::handle_book_ticker(res_data)
                 panic!("KucoinSwap records_handle 未实现格式化");
@@ -169,6 +168,9 @@ impl ExchangeStructHandler {
             ExchangeEnum::BitgetSwap => {
                 bitget_swap_handle::handle_records(&res_data.data)
             }
+            ExchangeEnum::BinanceSwap => {
+                binance_swap_handle::handle_book_ticker(&res_data.data)
+            }
         }
     }
     // 处理账号信息