Browse Source

okx交易所基础部分开发完毕,准备测试。

skyfffire 1 year ago
parent
commit
f0d4bfed66
4 changed files with 108 additions and 18 deletions
  1. 7 0
      strategy/src/exchange_disguise.rs
  2. 92 16
      strategy/src/okx_usdt_swap.rs
  3. 4 1
      strategy/src/quant.rs
  4. 5 1
      strategy/src/utils.rs

+ 7 - 0
strategy/src/exchange_disguise.rs

@@ -11,6 +11,7 @@ use crate::bitget_spot::bitget_spot_run;
 use crate::gate_swap::gate_swap_run;
 use crate::kucoin_spot::kucoin_spot_run;
 use crate::kucoin_swap::kucoin_swap_run;
+use crate::okx_usdt_swap::okex_swap_run;
 use crate::quant::Quant;
 
 // 交易交易所启动
@@ -28,6 +29,9 @@ pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>,
         "kucoin_usdt_swap" => {
             kucoin_swap_run(bool_v1, true, quant_arc, name, symbols, is_colo, exchange_params).await;
         },
+        "okex_usdt_swap" => {
+            okex_swap_run(bool_v1,false, quant_arc, name, symbols, is_colo, exchange_params).await;
+        },
         "bitget_spot" => {
             bitget_spot_run(bool_v1,false, quant_arc, name, symbols, is_colo, exchange_params).await;
         }
@@ -56,6 +60,9 @@ pub async fn run_reference_exchange(bool_v1: Arc<AtomicBool>,
         "gate_usdt_swap" => {
             gate_swap_run(bool_v1, false, quant_arc, name, symbols, is_colo, exchange_params).await;
         },
+        "okex_usdt_swap" => {
+            okex_swap_run(bool_v1, false, quant_arc, name, symbols, is_colo, exchange_params).await;
+        },
         "kucoin_usdt_swap" => {
             kucoin_swap_run(bool_v1, false, quant_arc, name, symbols, is_colo, exchange_params).await;
         },

+ 92 - 16
strategy/src/okx_usdt_swap.rs

@@ -1,21 +1,89 @@
-// use std::collections::BTreeMap;
-// use std::sync::Arc;
-// use std::sync::atomic::AtomicBool;
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use futures_util::StreamExt;
 // use rust_decimal::Decimal;
-// use tokio::sync::Mutex;
+use tokio::sync::Mutex;
+use tracing::info;
+use exchanges::okx_swap_ws::{OkxSwapLogin, OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
 // use exchanges::response_base::ResponseData;
-// use crate::quant::Quant;
-
-// pub async fn okex_swap_run(bool_v1: Arc<AtomicBool>,
-//                            is_trade: bool,
-//                            quant_arc: Arc<Mutex<Quant>>,
-//                            name: String,
-//                            symbols: Vec<String>,
-//                            is_colo: bool,
-//                            exchange_params: BTreeMap<String, String>) {
-//
-// }
-//
+use crate::quant::Quant;
+
+pub async fn okex_swap_run(bool_v1: Arc<AtomicBool>,
+                           is_trade: bool,
+                           _quant_arc: Arc<Mutex<Quant>>,
+                           name: String,
+                           symbols: Vec<String>,
+                           is_colo: bool,
+                           exchange_params: BTreeMap<String, String>) {
+    // 启动公共频道
+    let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
+    let (read_tx_public, mut read_rx_public) = futures_channel::mpsc::unbounded();
+
+    let mut ws_public = OkxSwapWs::new_label(name.clone(), is_colo, None, OkxSwapWsType::Public);
+    ws_public.set_symbols(symbols.clone());
+    if is_trade {
+        ws_public.set_subscribe(vec![
+            OkxSwapSubscribeType::PuBooks5
+        ])
+    } else {
+        ws_public.set_subscribe(vec![
+            OkxSwapSubscribeType::PuBooks50L2tbt
+        ])
+    }
+    // 挂起公共ws
+    let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
+    let bool_clone_public = Arc::clone(&bool_v1);
+    tokio::spawn(async move {
+        ws_public.ws_connect_async(bool_clone_public,
+                                   &write_tx_am_public,
+                                   write_rx_public,
+                                   read_tx_public).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+    });
+    // 接收public数据
+    tokio::spawn(async move {
+        loop {
+            if let Some(public_data) = read_rx_public.next().await {
+                info!(?public_data);
+            }
+        }
+    });
+
+    if is_trade {
+        let (write_tx_private, write_rx_private) = futures_channel::mpsc::unbounded();
+        let (read_tx_private, mut read_rx_private) = futures_channel::mpsc::unbounded();
+        let auth = Some(parse_btree_map_to_okx_swap_login(exchange_params));
+
+        let mut ws_private = OkxSwapWs::new_label(name.clone(), is_colo, auth, OkxSwapWsType::Private);
+        ws_private.set_symbols(symbols.clone());
+        ws_private.set_subscribe(vec![
+            OkxSwapSubscribeType::PrBalanceAndPosition,
+            OkxSwapSubscribeType::PrAccount("USDT".to_string()),
+            OkxSwapSubscribeType::PrOrders
+        ]);
+
+
+        // 挂起私有ws
+        let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
+        let bool_clone_private = Arc::clone(&bool_v1);
+        tokio::spawn(async move {
+            ws_private.ws_connect_async(bool_clone_private,
+                                &write_tx_am_private,
+                                write_rx_private,
+                                read_tx_private).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        });
+
+        // 接收private信息
+        tokio::spawn(async move {
+            loop {
+                if let Some(private_data) = read_rx_private.next().await {
+                    info!(?private_data);
+                }
+            }
+        });
+    }
+}
+
 // async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
 //                  update_flag_u: &mut Decimal,
 //                  multiplier: Decimal,
@@ -25,3 +93,11 @@
 //                  data: ResponseData) {
 //
 // }
+
+fn parse_btree_map_to_okx_swap_login(exchange_params: BTreeMap<String, String>) -> OkxSwapLogin {
+    OkxSwapLogin {
+        api_key: exchange_params.get("access_key").unwrap().clone(),
+        secret_key: exchange_params.get("secret_key").unwrap().clone(),
+        passphrase: exchange_params.get("pass_key").unwrap().clone(),
+    }
+}

+ 4 - 1
strategy/src/quant.rs

@@ -20,7 +20,7 @@ use global::public_params::{ASK_PRICE_INDEX, BID_PRICE_INDEX, LENGTH};
 use global::trace_stack::TraceStack;
 use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker};
 use standard::exchange::{Exchange};
-use standard::exchange::ExchangeEnum::{BinanceSpot, BinanceSwap, BitgetSpot, GateSpot, GateSwap, KucoinSwap};
+use standard::exchange::ExchangeEnum::{BinanceSpot, BinanceSwap, BitgetSpot, GateSpot, GateSwap, KucoinSwap, OkxSwap};
 
 use crate::model::{LocalPosition, OrderInfo, TokenParam, TraderMsg};
 use crate::predictor::Predictor;
@@ -207,6 +207,9 @@ impl Quant {
                 "bitget_spot" => {
                     Exchange::new(BitgetSpot, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
                 }
+                "okex_usdt_swap" => {
+                    Exchange::new(OkxSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
                 _ => {
                     error!("203未找到对应的交易所rest枚举!");
                     panic!("203未找到对应的交易所rest枚举!");

+ 5 - 1
strategy/src/utils.rs

@@ -61,6 +61,8 @@ pub fn get_limit_requests_num_per_second(exchange: String) -> i64 {
         return public_params::COINEX_USDT_SWAP_LIMIT * public_params::RATIO;
     } else if exchange.eq("coinex_spot") {
         return public_params::COINEX_SPOT_LIMIT * public_params::RATIO;
+    } else if exchange.eq("okex_usdt_swap") {
+        return public_params::OKEX_USDT_SWAP_LIMIT * public_params::RATIO;
     } else if exchange.eq("bitget_spot") {
         return public_params::BITGET_USDT_SPOT_LIMIT * public_params::RATIO;
     } else {
@@ -87,6 +89,8 @@ pub fn get_limit_order_requests_num_per_second(exchange: String) -> i64 {
         return public_params::COINEX_USDT_SWAP_LIMIT
     } else if exchange.eq("coinex_spot") {
         return public_params::COINEX_SPOT_LIMIT
+    } else if exchange.eq("okex_usdt_swap") {
+        return public_params::OKEX_USDT_SWAP_LIMIT
     } else if exchange.eq("bitget_spot") {
         return public_params::BITGET_USDT_SPOT_LIMIT
     } else {
@@ -144,6 +148,6 @@ mod tests {
         println!("timestamp: {}", now.timestamp());
         println!("timestamp_millis: {}", now.timestamp_millis());
         println!("timestamp_micros: {}", now.timestamp_micros());
-        println!("timestamp_nanos: {}", now.timestamp_nanos());
+        println!("timestamp_nanos: {}", now.timestamp_nanos_opt().unwrap());
     }
 }