Эх сурвалжийг харах

修改Okx订阅
开放Okx收集

DESKTOP-NE65RNK\Citrus_limon 4 сар өмнө
parent
commit
539d1d6f8c

+ 7 - 7
exchanges/src/okx_swap_ws.rs

@@ -136,7 +136,7 @@ impl OkxSwapWs {
             // 大写
             *symbol = symbol.to_uppercase();
             // 字符串替换
-            *symbol = symbol.replace("_", "-");
+            *symbol = format!("{}-SWAP", symbol.replace("_", "-"));
         }
         self.symbol_s = b_array;
     }
@@ -158,19 +158,19 @@ impl OkxSwapWs {
     //订阅枚举解析
     pub fn enum_to_string(symbol: String, subscribe_type: OkxSwapSubscribeType) -> Value {
         match subscribe_type {
-            OkxSwapSubscribeType::PuFuturesDepth => {//深度
+            OkxSwapSubscribeType::PuFuturesDepth => { //深度
                 json!([{
                     "channel":"books5",
                     "instId": symbol
                 }])
             }
-            OkxSwapSubscribeType::BuFuturesRecords => {//k线
+            OkxSwapSubscribeType::BuFuturesRecords => { //k线
                 json!([{
                     "channel": "candle1s",
                     "instId": symbol
                 }])
             }
-            OkxSwapSubscribeType::PuFuturesTrades => {//公开成交
+            OkxSwapSubscribeType::PuFuturesTrades => { //公开成交
                 json!([{
                     "channel":"trades",
                     "instId": symbol
@@ -198,9 +198,9 @@ impl OkxSwapWs {
                                              handle_function: F,
                                              write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
                                              write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
-        where
-            F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
-            Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
+    where
+        F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+        Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
     {
         let login_is = self.contains_pr();
         let subscription = self.get_subscription();

+ 145 - 147
src/okx_usdt_swap_data_listener.rs

@@ -1,147 +1,145 @@
-// use std::collections::{BTreeMap, HashMap};
-// use std::str::FromStr;
-// use std::sync::{Arc};
-// use std::sync::atomic::AtomicBool;
-// use std::time::Duration;
-// use lazy_static::lazy_static;
-// use rust_decimal::Decimal;
-// use rust_decimal_macros::dec;
-// use tokio::sync::{Mutex};
-// use tracing::info;
-// use exchanges::okx_swap_rest::OkxSwapRest;
-// use exchanges::okx_swap_ws::{OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
-// use exchanges::response_base::ResponseData;
-// use serde_json::json;
-// use standard::exchange::ExchangeEnum;
-// use standard::exchange_struct_handler::ExchangeStructHandler;
-// use crate::json_db_utils::delete_db_by_exchange;
-// use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
-//
-// const EXCHANGE_NAME: &str = "okx_usdt_swap";
-//
-// lazy_static! {
-//     // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
-//     static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
-//     static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
-//     static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
-// }
-//
-// pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
-//     let name = "okx_usdt_swap_listener";
-//     // 订阅所有币种
-//     let login = BTreeMap::new();
-//     let mut okx_rest = OkxSwapRest::new(false, login);
-//     let params = json!({
-//         "instType":"SWAP"
-//     });
-//     let response = okx_rest.get_market(params).await;
-//     let mut symbols = vec![];
-//     if response.code == 200 {
-//         let symbol_infos = response.data.as_array().unwrap();
-//         let mut mul_map = MUL_MAP.lock().await;
-//         for symbol_info in symbol_infos {
-//             let ct_val_ccy = symbol_info["ctValCcy"].as_str().unwrap();
-//             let settle_ccy = symbol_info["settleCcy"].as_str().unwrap();
-//             if settle_ccy != "USDT" { continue; };
-//             let symbol = format!("{}_{}", ct_val_ccy, settle_ccy);
-//             let mul = Decimal::from_str(symbol_info["ctMult"].as_str().unwrap()).unwrap();
-//             mul_map.insert(symbol.clone(), mul);
-//
-//             symbols.push(symbol)
-//         }
-//     }
-//     for chunk in symbols.chunks(20) {
-//         let bu_ws_name = name.to_string();
-//         let (bu_write_tx, bu_write_rx) = futures_channel::mpsc::unbounded();
-//         let bu_write_tx_am = Arc::new(Mutex::new(bu_write_tx));
-//         let bu_symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-//         let bu_is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-//         tokio::spawn(async move {
-//             let mut ws = OkxSwapWs::new_with_tag(bu_ws_name.clone(), false, None, OkxSwapWsType::Business);
-//             ws.set_subscribe(vec![
-//                 OkxSwapSubscribeType::BuFuturesRecords,
-//             ]);
-//
-//             // 建立链接
-//             ws.set_symbols(bu_symbols_chunk);
-//             ws.ws_connect_async(bu_is_shutdown_clone, data_listener, &bu_write_tx_am, bu_write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-//         });
-//
-//         let pub_ws_name = name.to_string();
-//         let (pub_write_tx, pub_write_rx) = futures_channel::mpsc::unbounded();
-//         let pub_write_tx_am = Arc::new(Mutex::new(pub_write_tx));
-//         let pub_symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
-//         let pub_is_shutdown_clone = Arc::clone(&is_shutdown_arc);
-//         tokio::spawn(async move {
-//             let mut ws = OkxSwapWs::new_with_tag(pub_ws_name.clone(), false, None, OkxSwapWsType::Public);
-//             ws.set_subscribe(vec![
-//                 OkxSwapSubscribeType::PuFuturesTrades,
-//             ]);
-//
-//             // 建立链接
-//             ws.set_symbols(pub_symbols_chunk);
-//             ws.ws_connect_async(pub_is_shutdown_clone, data_listener, &pub_write_tx_am, pub_write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-//         });
-//     }
-//     // 定时删除数据
-//     tokio::spawn(async move {
-//         loop {
-//             delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
-//             tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
-//         }
-//     });
-// }
-//
-// // 读取数据
-// pub async fn data_listener(response: ResponseData) {
-//     if response.code != 200 {
-//         return;
-//     }
-//
-//     match response.channel.as_str() {
-//         // 深度数据
-//         "futures.order_book" => {
-//             // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::OkxSwap, &response);
-//             //
-//             // update_depth(&depth).await;
-//         }
-//         // 订单流数据
-//         "futures.trades" => {
-//             let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::OkxSwap, &response);
-//             let mul_map = MUL_MAP.lock().await;
-//
-//             for trade in trades.iter_mut() {
-//                 // 真实交易量处理,因为okx的量都是张数
-//                 let mul = mul_map[trade.symbol.as_str()];
-//                 let mut real_size = trade.size * mul * trade.price;
-//                 real_size.rescale(2);
-//                 trade.size = real_size;
-//
-//                 // 更新到本地数据库
-//                 let trades_map = TRADES_MAP.lock().await;
-//                 update_trade(trade, trades_map, EXCHANGE_NAME).await;
-//             }
-//         }
-//         // k线数据
-//         "futures.candlesticks" => {
-//             let mut records = ExchangeStructHandler::records_handle(ExchangeEnum::OkxSwap, &response);
-//
-//             let mul_map = MUL_MAP.lock().await;
-//             for record in records.iter_mut() {
-//                 // 真实交易量处理,因为okx的量都是张数
-//                 let mul = mul_map[record.symbol.as_str()];
-//                 let mid_price = (record.high + record.low) * dec!(0.5);
-//                 let mut real_volume = record.volume * mul * mid_price;
-//                 real_volume.rescale(2);
-//                 record.volume = real_volume;
-//
-//                 // 更新到本地数据库
-//                 let record_map = RECORD_MAP.lock().await;
-//                 update_record(record, record_map, EXCHANGE_NAME).await;
-//             }
-//         }
-//         _ => {
-//             info!("48 未知的数据类型: {:?}", response)
-//         }
-//     }
-// }
+use std::collections::{BTreeMap, HashMap};
+use std::str::FromStr;
+use std::sync::{Arc};
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use lazy_static::lazy_static;
+use rust_decimal::Decimal;
+use rust_decimal_macros::dec;
+use tokio::sync::{Mutex};
+use tracing::info;
+use exchanges::okx_swap_rest::OkxSwapRest;
+use exchanges::okx_swap_ws::{OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
+use exchanges::response_base::ResponseData;
+use serde_json::json;
+use standard::exchange::ExchangeEnum;
+use standard::exchange_struct_handler::ExchangeStructHandler;
+use crate::json_db_utils::delete_db_by_exchange;
+use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
+
+const EXCHANGE_NAME: &str = "okx_usdt_swap";
+
+lazy_static! {
+    // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
+    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+    static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
+    static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
+}
+
+pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+    let name = "okx_usdt_swap_listener";
+    // 订阅所有币种
+    let login = BTreeMap::new();
+    let mut okx_rest = OkxSwapRest::new(false, login);
+    let params = json!({
+        "instType":"SWAP"
+    });
+    let response = okx_rest.get_market(params).await;
+    let mut symbols = vec![];
+    if response.code == 200 {
+        let symbol_infos = response.data.as_array().unwrap();
+        let mut mul_map = MUL_MAP.lock().await;
+        for symbol_info in symbol_infos {
+            let ct_val_ccy = symbol_info["ctValCcy"].as_str().unwrap();
+            let settle_ccy = symbol_info["settleCcy"].as_str().unwrap();
+            if settle_ccy != "USDT" { continue; };
+            let symbol = format!("{}_{}", ct_val_ccy, settle_ccy);
+            let mul = Decimal::from_str(symbol_info["ctMult"].as_str().unwrap()).unwrap();
+            mul_map.insert(symbol.clone(), mul);
+
+            symbols.push(symbol)
+        }
+    }
+    for chunk in symbols.chunks(20) {
+        let bu_ws_name = name.to_string();
+        let (bu_write_tx, bu_write_rx) = futures_channel::mpsc::unbounded();
+        let bu_write_tx_am = Arc::new(Mutex::new(bu_write_tx));
+        let bu_symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let bu_is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+        tokio::spawn(async move {
+            let mut ws = OkxSwapWs::new_with_tag(bu_ws_name.clone(), false, None, OkxSwapWsType::Business);
+            ws.set_subscribe(vec![
+                OkxSwapSubscribeType::BuFuturesRecords,
+            ]);
+            // 建立链接
+            ws.set_symbols(bu_symbols_chunk);
+            ws.ws_connect_async(bu_is_shutdown_clone, data_listener, &bu_write_tx_am, bu_write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        });
+
+        let pub_ws_name = name.to_string();
+        let (pub_write_tx, pub_write_rx) = futures_channel::mpsc::unbounded();
+        let pub_write_tx_am = Arc::new(Mutex::new(pub_write_tx));
+        let pub_symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let pub_is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+        tokio::spawn(async move {
+            let mut ws = OkxSwapWs::new_with_tag(pub_ws_name.clone(), false, None, OkxSwapWsType::Public);
+            ws.set_subscribe(vec![
+                OkxSwapSubscribeType::PuFuturesTrades,
+            ]);
+
+            // 建立链接
+            ws.set_symbols(pub_symbols_chunk);
+            ws.ws_connect_async(pub_is_shutdown_clone, data_listener, &pub_write_tx_am, pub_write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        });
+    }
+    // 定时删除数据
+    tokio::spawn(async move {
+        loop {
+            delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+            tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+        }
+    });
+}
+
+// 读取数据
+pub async fn data_listener(response: ResponseData) {
+    if response.code != 200 {
+        return;
+    }
+    match response.channel.as_str() {
+        // 深度数据
+        "futures.order_book" => {
+            // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::OkxSwap, &response);
+            //
+            // update_depth(&depth).await;
+        }
+        // 订单流数据
+        "futures.trades" => {
+            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::OkxSwap, &response);
+            let mul_map = MUL_MAP.lock().await;
+
+            for trade in trades.iter_mut() {
+                // 真实交易量处理,因为okx的量都是张数
+                let mul = mul_map[trade.symbol.as_str()];
+                let mut real_size = trade.size * mul * trade.price;
+                real_size.rescale(2);
+                trade.size = real_size;
+
+                // 更新到本地数据库
+                let trades_map = TRADES_MAP.lock().await;
+                update_trade(trade, trades_map, EXCHANGE_NAME).await;
+            }
+        }
+        // k线数据
+        "futures.candlesticks" => {
+            let mut records = ExchangeStructHandler::records_handle(ExchangeEnum::OkxSwap, &response);
+
+            let mul_map = MUL_MAP.lock().await;
+            for record in records.iter_mut() {
+                // 真实交易量处理,因为okx的量都是张数
+                let mul = mul_map[record.symbol.as_str()];
+                let mid_price = (record.high + record.low) * dec!(0.5);
+                let mut real_volume = record.volume * mul * mid_price;
+                real_volume.rescale(2);
+                record.volume = real_volume;
+
+                // 更新到本地数据库
+                let record_map = RECORD_MAP.lock().await;
+                update_record(record, record_map, EXCHANGE_NAME).await;
+            }
+        }
+        _ => {
+            info!("48 未知的数据类型: {:?}", response)
+        }
+    }
+}

+ 1 - 1
src/server.rs

@@ -200,7 +200,6 @@ async fn get_trades_count(query: web::Query<ExchangeSpecialQuery>) -> impl Respo
 #[get("/exchanges")]
 async fn get_exchanges() -> impl Responder {
     let exchanges = vec![
-        // "okx_usdt_swap",
         // "bingx_usdt_swap",
         // "mexc_usdt_swap",
         // "bitmart_usdt_swap",
@@ -213,6 +212,7 @@ async fn get_exchanges() -> impl Responder {
         // "mexc_usdt_swap",
         // "gate_coin_spot",
 
+        "okx_usdt_swap",
         "gate_usdt_swap",
         "binance_usdt_swap",
         "coinex_usdt_swap",

+ 2 - 2
standard/src/okx_swap_handle.rs

@@ -6,7 +6,7 @@ use exchanges::response_base::ResponseData;
 use crate::{OrderBook, Trade, Record};
 
 pub fn handle_records(value: &Value) -> Vec<Record> {
-    let symbol = value["arg"]["instId"].as_str().unwrap().to_string().replace("-", "_");
+    let symbol = value["arg"]["instId"].as_str().unwrap().to_string().replace("-SWAP", "").replace("-", "_");
     let records_list = value["data"].as_array().unwrap();
     let mut records = vec![];
 
@@ -47,7 +47,7 @@ pub fn format_trade_items(res_data: &ResponseData) -> Vec<Trade> {
             time: Decimal::from_str(item["ts"].as_str().unwrap()).unwrap(),
             size: if side { size } else { -size },
             price: Decimal::from_str(item["px"].as_str().unwrap().to_string().as_str()).unwrap(),
-            symbol: item["instId"].as_str().unwrap().to_string().replace("-", "_"),
+            symbol: item["instId"].as_str().unwrap().to_string().replace("-SWAP", "").replace("-", "_"),
         })
     }
     trades