Selaa lähdekoodia

添加okx数据中心

DESKTOP-NE65RNK\Citrus_limon 1 vuosi sitten
vanhempi
commit
7f23c9637a

+ 7 - 7
src/main.rs

@@ -34,13 +34,13 @@ async fn main() {
     // ctrl c退出检查程序
     control_c::exit_handler(running.clone());
     // 启动各交易所的数据监听器
-    // binance_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // gate_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // htx_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // bingx_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
+    binance_usdt_swap_data_listener::run_listener(running.clone()).await;
+    gate_usdt_swap_data_listener::run_listener(running.clone()).await;
+    bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
+    coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
+    htx_usdt_swap_data_listener::run_listener(running.clone()).await;
+    bingx_usdt_swap_data_listener::run_listener(running.clone()).await;
+    mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
     okx_usdt_swap_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
     // let panic_running = running.clone();

+ 24 - 13
src/okx_usdt_swap_data_listener.rs

@@ -41,7 +41,7 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
         for symbol_info in symbol_infos {
             let ct_val_ccy = symbol_info["ctValCcy"].as_str().unwrap();
             let settle_ccy = symbol_info["settleCcy"].as_str().unwrap();
-
+            if settle_ccy != "USDT" { continue; };
             let symbol = format!("{}_{}", ct_val_ccy, settle_ccy);
             let mul = Decimal::from_str(symbol_info["ctMult"].as_str().unwrap()).unwrap();
             mul_map.insert(symbol.clone(), mul);
@@ -49,25 +49,37 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
             symbols.push(symbol)
         }
     }
-
     for chunk in symbols.chunks(20) {
-        let ws_name = name.to_string();
-        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-        let write_tx_am = Arc::new(Mutex::new(write_tx));
-        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+        let bu_ws_name = name.to_string();
+        let (bu_write_tx, bu_write_rx) = futures_channel::mpsc::unbounded();
+        let bu_write_tx_am = Arc::new(Mutex::new(bu_write_tx));
+        let bu_symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let bu_is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+        tokio::spawn(async move {
+            let mut ws = OkxSwapWs::new_with_tag(bu_ws_name.clone(), false, None, OkxSwapWsType::Business);
+            ws.set_subscribe(vec![
+                OkxSwapSubscribeType::BuFuturesRecords,
+            ]);
+
+            // 建立链接
+            ws.set_symbols(bu_symbols_chunk);
+            ws.ws_connect_async(bu_is_shutdown_clone, data_listener, &bu_write_tx_am, bu_write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        });
 
+        let pub_ws_name = name.to_string();
+        let (pub_write_tx, pub_write_rx) = futures_channel::mpsc::unbounded();
+        let pub_write_tx_am = Arc::new(Mutex::new(pub_write_tx));
+        let pub_symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let pub_is_shutdown_clone = Arc::clone(&is_shutdown_arc);
         tokio::spawn(async move {
-            let mut ws = OkxSwapWs::new_with_tag(ws_name, false, None, OkxSwapWsType::Public);
+            let mut ws = OkxSwapWs::new_with_tag(pub_ws_name.clone(), false, None, OkxSwapWsType::Public);
             ws.set_subscribe(vec![
                 OkxSwapSubscribeType::PuFuturesTrades,
-                OkxSwapSubscribeType::PuFuturesRecords,
-                // OkxSwapSubscribeType::PuFuturesOrderBook
             ]);
 
             // 建立链接
-            ws.set_symbols(symbols_chunk);
-            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+            ws.set_symbols(pub_symbols_chunk);
+            ws.ws_connect_async(pub_is_shutdown_clone, data_listener, &pub_write_tx_am, pub_write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
         });
     }
 }
@@ -87,7 +99,6 @@ pub async fn data_listener(response: ResponseData) {
         }
         // 订单流数据
         "futures.trades" => {
-            println!("------------------------------- {:?}", response);
             let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::OkxSwap, &response);
             let mul_map = MUL_MAP.lock().await;
 

+ 7 - 1
standard/src/exchange_struct_handler.rs

@@ -4,7 +4,7 @@ use rust_decimal::prelude::FromPrimitive;
 use tracing::{error};
 use exchanges::response_base::ResponseData;
 use crate::exchange::ExchangeEnum;
-use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, coinex_swap_handle, kucoin_handle, htx_swap_handle, bingx_swap_handle, mexc_swap_handle};
+use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, coinex_swap_handle, kucoin_handle, htx_swap_handle, bingx_swap_handle, mexc_swap_handle, okx_swap_handle};
 use crate::{Record, Ticker, Trade, Depth};
 use crate::{Account, OrderBook, Position, SpecialOrder};
 
@@ -95,6 +95,9 @@ impl ExchangeStructHandler {
             ExchangeEnum::MexcSwap => {
                 mexc_swap_handle::format_trade_items(&res_data)
             }
+            ExchangeEnum::OkxSwap => {
+                okx_swap_handle::format_trade_items(&res_data)
+            }
             _ => {
                 error!("未找到该交易所!trades_handle: {:?}", exchange);
                 panic!("未找到该交易所!trades_handle: {:?}", exchange);
@@ -131,6 +134,9 @@ impl ExchangeStructHandler {
             ExchangeEnum::MexcSwap => {
                 mexc_swap_handle::handle_records(&res_data.data)
             }
+            ExchangeEnum::OkxSwap => {
+                okx_swap_handle::handle_records(&res_data.data)
+            }
             _ => {
                 error!("未找到该交易所!records_handle: {:?}", exchange);
                 panic!("未找到该交易所!records_handle: {:?}", exchange);

+ 31 - 20
standard/src/okx_swap_handle.rs

@@ -1,3 +1,4 @@
+use std::str::FromStr;
 use rust_decimal::Decimal;
 use rust_decimal::prelude::FromPrimitive;
 use serde_json::Value;
@@ -5,16 +6,22 @@ use exchanges::response_base::ResponseData;
 use crate::{OrderBook, Trade, Record};
 
 pub fn handle_records(value: &Value) -> Vec<Record> {
-    let data = value["data"].clone();
-    return vec![Record {
-        time: Decimal::from_i64(data["t"].as_i64().unwrap() * 1000).unwrap(),
-        open: Decimal::from_f64(data["o"].as_f64().unwrap()).unwrap(),
-        high: Decimal::from_f64(data["h"].as_f64().unwrap()).unwrap(),
-        low: Decimal::from_f64(data["l"].as_f64().unwrap()).unwrap(),
-        close: Decimal::from_f64(data["c"].as_f64().unwrap()).unwrap(),
-        volume: Decimal::from_f64(data["q"].as_f64().unwrap()).unwrap(),
-        symbol: data["symbol"].as_str().unwrap().to_string(),
-    }];
+    let symbol = value["arg"]["instId"].as_str().unwrap().to_string().replace("-", "_");
+    let records_list = value["data"].as_array().unwrap();
+    let mut records = vec![];
+
+    for record_value in records_list {
+        records.push(Record {
+            time: Decimal::from_str(record_value[0].as_str().unwrap()).unwrap(),
+            open: Decimal::from_str(record_value[1].as_str().unwrap()).unwrap(),
+            high: Decimal::from_str(record_value[2].as_str().unwrap()).unwrap(),
+            low: Decimal::from_str(record_value[3].as_str().unwrap()).unwrap(),
+            close: Decimal::from_str(record_value[4].as_str().unwrap()).unwrap(),
+            volume: Decimal::from_str(record_value[5].as_str().unwrap()).unwrap(),
+            symbol: symbol.clone(),
+        });
+    }
+    records
 }
 
 pub fn format_depth_items(value: &Value) -> Vec<OrderBook> {
@@ -29,15 +36,19 @@ pub fn format_depth_items(value: &Value) -> Vec<OrderBook> {
 }
 
 pub fn format_trade_items(res_data: &ResponseData) -> Vec<Trade> {
-    let result = res_data.data["data"].clone();
+    let result = res_data.data["data"].as_array().unwrap();
+    let mut trades = vec![];
 
-    let side = result["T"].as_i64().unwrap() == 1;
-    let size = Decimal::from_f64(result["v"].as_f64().unwrap()).unwrap();
-    return vec![Trade {
-        id: result["t"].to_string(),
-        time: Decimal::from_i64(result["t"].as_i64().unwrap()).unwrap(),
-        size: if side { size } else { -size },
-        price: Decimal::from_f64(result["p"].as_f64().unwrap()).unwrap(),
-        symbol: res_data.data["symbol"].as_str().unwrap().to_string(),
-    }];
+    for item in result {
+        let side = item["side"].as_str().unwrap() == "buy";
+        let size = Decimal::from_str(item["sz"].as_str().unwrap()).unwrap();
+        trades.push(Trade {
+            id: item["tradeId"].as_str().unwrap().to_string(),
+            time: Decimal::from_str(item["ts"].as_str().unwrap()).unwrap(),
+            size: if side { size } else { -size },
+            price: Decimal::from_str(item["px"].as_str().unwrap().to_string().as_str()).unwrap(),
+            symbol: item["instId"].as_str().unwrap().to_string().replace("-", "_"),
+        })
+    }
+    trades
 }