Bläddra i källkod

bybit ws 新增

hl 1 år sedan
förälder
incheckning
69c00a39d1
4 ändrade filer med 485 tillägg och 77 borttagningar
  1. 325 0
      exchanges/src/bybit_swap_ws.rs
  2. 1 0
      exchanges/src/lib.rs
  3. 2 2
      exchanges/src/response_base.rs
  4. 157 75
      exchanges/tests/bybit_swap_test.rs

+ 325 - 0
exchanges/src/bybit_swap_ws.rs

@@ -0,0 +1,325 @@
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+
+use chrono::Utc;
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use serde_json::json;
+use tokio::sync::Mutex;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::{info, trace};
+
+use ring::hmac;
+
+use crate::response_base::ResponseData;
+use crate::socket_tool::{AbstractWsMode, HeartbeatType};
+
+//类型
+pub enum BybitSwapWsType {
+    Public,
+    Private,
+}
+
+//订阅频道
+#[derive(Clone)]
+pub enum BybitSwapSubscribeType {
+    PuOrderBook1,
+    PuOrderBook50,
+    PuBlicTrade,
+    PuTickers,
+    PuKline(String),
+
+    PrPosition,
+    PrExecution,
+    PrOrder,
+    PrWallet,
+}
+
+//账号信息
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct BybitSwapLogin {
+    pub api_key: String,
+    pub secret_key: String,
+}
+
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct BybitSwapWs {
+    //类型
+    label: String,
+    //地址
+    address_url: String,
+    //账号
+    login_param: Option<BybitSwapLogin>,
+    //币对
+    symbol_s: Vec<String>,
+    //订阅
+    subscribe_types: Vec<BybitSwapSubscribeType>,
+    //心跳间隔
+    heartbeat_time: u64,
+}
+
+impl BybitSwapWs {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool, login_param: Option<BybitSwapLogin>, ws_type: BybitSwapWsType) -> BybitSwapWs {
+        return BybitSwapWs::new_label("default-BybitSwapWs".to_string(), is_colo, login_param, ws_type);
+    }
+    pub fn new_label(label: String, is_colo: bool, login_param: Option<BybitSwapLogin>, ws_type: BybitSwapWsType) -> BybitSwapWs {
+        /*******公共频道-私有频道数据组装*/
+        let address_url = match ws_type {
+            BybitSwapWsType::Public => {
+                "wss://stream.bybit.com/v5/public/linear?max_alive_time=1m".to_string()
+            }
+            BybitSwapWsType::Private => {
+                "wss://stream.bybit.com/v5/private?max_alive_time=1m".to_string()
+            }
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道",address_url);
+        } else {
+            info!("走普通通道:{}",address_url);
+        }
+
+        BybitSwapWs {
+            label,
+            address_url,
+            login_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            heartbeat_time: 1000 * 10,
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<BybitSwapSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //手动添加币对
+    pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
+        for symbol in b_array.iter_mut() {
+            // 大写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("_", "");
+            *symbol = symbol.replace("-", "");
+        }
+        self.symbol_s = b_array;
+    }
+    //频道是否需要登录
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if match t {
+                BybitSwapSubscribeType::PuOrderBook1 => false,
+                BybitSwapSubscribeType::PuOrderBook50 => false,
+                BybitSwapSubscribeType::PuBlicTrade => false,
+                BybitSwapSubscribeType::PuTickers => false,
+                BybitSwapSubscribeType::PuKline(_) => false,
+
+                BybitSwapSubscribeType::PrPosition => true,
+                BybitSwapSubscribeType::PrExecution => true,
+                BybitSwapSubscribeType::PrOrder => true,
+                BybitSwapSubscribeType::PrWallet => true,
+            } {
+                return true;
+            }
+        }
+        false
+    }
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: BybitSwapSubscribeType) -> String {
+        match subscribe_type {
+            BybitSwapSubscribeType::PuOrderBook1 => {
+                format!("orderbook.1.{}", symbol)
+            }
+            BybitSwapSubscribeType::PuOrderBook50 => {
+                format!("orderbook.50.{}", symbol)
+            }
+            BybitSwapSubscribeType::PuBlicTrade => {
+                format!("publicTrade.{}", symbol)
+            }
+            BybitSwapSubscribeType::PuTickers => {
+                format!("tickers.{}", symbol)
+            }
+            BybitSwapSubscribeType::PuKline(t) => {
+                format!("kline.{}.{}", t, symbol)
+            }
+
+            BybitSwapSubscribeType::PrPosition => {
+                format!("position")
+            }
+            BybitSwapSubscribeType::PrExecution => {
+                format!("execution")
+            }
+            BybitSwapSubscribeType::PrOrder => {
+                format!("order")
+            }
+            BybitSwapSubscribeType::PrWallet => {
+                format!("wallet")
+            }
+        }
+    }
+    //订阅信息生成
+    pub fn get_subscription(&self) -> String {
+        let mut params = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
+                params.push(ty_str);
+            }
+        }
+
+        let str = json!({
+                "op": "subscribe",
+                "args": params
+            });
+        str.to_string()
+    }
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    //链接
+    pub async fn ws_connect_async(&mut self,
+                                  bool_v1: Arc<AtomicBool>,
+                                  write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                  write_rx: UnboundedReceiver<Message>,
+                                  read_tx: UnboundedSender<ResponseData>) -> Result<(), Error>
+    {
+        let login_is = self.contains_pr();
+        let subscription = self.get_subscription();
+        let address_url = self.address_url.clone();
+        let label = self.label.clone();
+        let timestamp = Utc::now().timestamp_millis();
+        let login_param = self.login_param.clone();
+        let (api_key, secret_key) = match self.login_param.clone() {
+            None => { ("".to_string(), "".to_string()) }
+            Some(p) => {
+                (p.api_key.clone().to_string(), p.secret_key.clone().to_string())
+            }
+        };
+        let heartbeat_time = self.heartbeat_time.clone();
+        trace!("{:?}",format!("{}",subscription));
+
+
+        //心跳-- 方法内部线程启动
+        let write_tx_clone1 = Arc::clone(write_tx_am);
+        tokio::spawn(async move {
+            trace!("线程-异步心跳-开始");
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
+            trace!("线程-异步心跳-结束");
+        });
+
+        //设置订阅
+        let mut subscribe_array = vec![];
+        if login_is {
+            let expires = timestamp + 1000;
+            let message = format!("GET/realtime{}", expires);
+
+            let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+            let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
+            let signature = hex::encode(result.as_ref());
+
+            //登录相关
+            let str = json!({
+                "op": "auth",
+                "args": [api_key, expires, signature]
+            });
+            subscribe_array.push(str.to_string());
+        }
+        subscribe_array.push(subscription.to_string());
+
+        //链接
+        let t2 = tokio::spawn(async move {
+            trace!("线程-异步链接-开始");
+            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                             label.clone(), subscribe_array,
+                                             write_rx, read_tx,
+                                             Self::message_text,
+                                             Self::message_ping,
+                                             Self::message_pong,
+            ).await.expect("币安-期货");
+            trace!("线程-异步链接-结束");
+        });
+        tokio::try_join!(t2).unwrap();
+        trace!("线程-心跳与链接-结束");
+
+        Ok(())
+    }
+    /*******************************************************************************************************/
+    /*****************************************数据解析*****************************************************/
+    /*******************************************************************************************************/
+    //数据解析-Text
+    pub fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text);
+        Option::from(response_data)
+    }
+    //数据解析-ping
+    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "".to_string()));
+    }
+    //数据解析-pong
+    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
+    }
+    //数据解析
+    pub fn ok_text(text: String) -> ResponseData {
+        // trace!("原始数据");
+        // trace!(?text);
+        let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), "".to_string());
+        let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
+        if json_value.get("success").is_some() {
+            //订阅内容
+            let success = json_value["success"].as_bool().unwrap();
+            let ret_msg = json_value["ret_msg"].as_str().unwrap();
+            let op = json_value["op"].as_str().unwrap();
+            let success_error = if success {
+                "成功"
+            } else {
+                "失败"
+            };
+
+            if op == "auth" {
+                res_data.code = "-200".to_string();
+                res_data.message =  format!("登录{}",success_error);
+            } else if op == "subscribe" {
+                res_data.message =  format!("订阅{}",success_error);
+                res_data.code = "-201".to_string();
+            } else {
+                res_data.code = "".to_string();
+                res_data.channel = "未知订阅".to_string();
+            }
+        } else if json_value.get("topic").is_some() && json_value.get("data").is_some() {
+            let channel = json_value["topic"].to_string();
+            res_data.data = format!("{}", json_value.get("data").as_ref().unwrap());
+            res_data.code = "200".to_string();
+
+
+            if channel.contains("orderbook") {
+                res_data.channel = "orderbook".to_string();
+            } else if channel.contains("publicTrade") {
+                res_data.channel = "trade".to_string();
+            } else if channel.contains("tickers") {
+                res_data.channel = "tickers".to_string();
+            } else if channel.contains("kline") {
+                res_data.channel = "kline".to_string();
+            } else {
+                res_data.code = "".to_string();
+                res_data.channel = "未知的频道".to_string();
+            }
+        } else {
+            //推送数据
+            res_data.code = "".to_string();
+            res_data.channel = "未知的频道".to_string();
+        }
+
+        res_data
+    }
+}

+ 1 - 0
exchanges/src/lib.rs

@@ -22,4 +22,5 @@ pub mod kucoin_spot_rest;
 pub mod crypto_spot_ws;
 pub mod bybit_swap_rest;
 pub mod xlsx_utils;
+pub mod bybit_swap_ws;
 

+ 2 - 2
exchanges/src/response_base.rs

@@ -6,8 +6,8 @@ pub struct ResponseData {
     pub message: String,
     pub channel: String,
     pub data: String,
-    pub time: i64,
-    pub reach_time: i64,
+    pub time: i64,       //数据接受的时间
+    pub reach_time: i64, //远程数据时间 弃用
 }
 
 impl ResponseData {

+ 157 - 75
exchanges/tests/bybit_swap_test.rs

@@ -1,81 +1,163 @@
 use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use futures_util::StreamExt;
+use tokio::sync::Mutex;
+use tracing::trace;
 
 use exchanges::bybit_swap_rest::BybitSwapRest;
+use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
 
-const ACCESS_KEY: &str = "";
-const SECRET_KEY: &str = "";
+const ACCESS_KEY: &str = "JKHMEL6kD7I7WjbHKP";
+const SECRET_KEY: &str = "jmofU9X9PjzGZ8BlO0xZLcUzImHE2CaTSQ3Y";
 
 
 //ws-订阅公共频道信息
-// #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
-// async fn ws_custom_subscribe_pu() {
-//     global::log_utils::init_log_with_trace();
-//
-//
-//     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-//     let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
-//
-//
-//     let mut ws = get_ws(None, BybitSwapWsType::Public).await;
-//     ws.set_symbols(vec!["BTC_USDT".to_string()]);
-//     ws.set_subscribe(vec![
-//         // BybitSwapSubscribeType::PuBooks5,
-//         // BybitSwapSubscribeType::Putrades,
-//         BybitSwapSubscribeType::PuBooks50L2tbt,
-//         // BybitSwapSubscribeType::PuIndexTickers,
-//     ]);
-//
-//
-//     let write_tx_am = Arc::new(Mutex::new(write_tx));
-//     let bool_v1 = Arc::new(AtomicBool::new(true));
-//
-//     //读取
-//     let _bool_v1_clone = Arc::clone(&bool_v1);
-//     let _tr = tokio::spawn(async move {
-//         trace!("线程-数据读取-开启");
-//         loop {
-//             if let Some(data) = read_rx.next().await {
-//                 trace!("读取数据data:{:?}",data)
-//             }
-//         }
-//         // trace!("线程-数据读取-结束");
-//     });
-//
-//     //写数据
-//     // let bool_v2_clone = Arc::clone(&bool_v1);
-//     // let write_tx_clone = Arc::clone(&write_tx_am);
-//     // let su = ws.get_subscription();
-//     // let tw = tokio::spawn(async move {
-//     //     trace!("线程-数据写入-开始");
-//     //     loop {
-//     //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
-//     //         // let close_frame = CloseFrame {
-//     //         //     code: CloseCode::Normal,
-//     //         //     reason: Cow::Borrowed("Bye bye"),
-//     //         // };
-//     //         // let message = Message::Close(Some(close_frame));
-//     //
-//     //
-//     //         let message = Message::Text(su.clone());
-//     //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
-//     //         trace!("发送指令成功");
-//     //     }
-//     //     trace!("线程-数据写入-结束");
-//     // });
-//
-//     let t1 = tokio::spawn(async move {
-//         //链接
-//         let bool_v3_clone = Arc::clone(&bool_v1);
-//         ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-//         trace!("test 唯一线程结束--");
-//     });
-//     tokio::try_join!(t1).unwrap();
-//     trace!("当此结束");
-//     trace!("重启!");
-//     trace!("参考交易所关闭");
-//     return;
-// }
-//
+#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
+async fn ws_custom_subscribe_pu() {
+    global::log_utils::init_log_with_trace();
+
+
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+
+
+    let mut ws = get_ws(None, BybitSwapWsType::Public).await;
+    ws.set_symbols(vec!["BTC_USDT".to_string()]);
+    ws.set_subscribe(vec![
+        // BybitSwapSubscribeType::PuOrderBook1,
+        // BybitSwapSubscribeType::PuOrderBook50,
+        // BybitSwapSubscribeType::PuBlicTrade,
+        BybitSwapSubscribeType::PuTickers,
+
+    ]);
+
+
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    let bool_v1 = Arc::new(AtomicBool::new(true));
+
+    //读取
+    let _bool_v1_clone = Arc::clone(&bool_v1);
+    let _tr = tokio::spawn(async move {
+        trace!("线程-数据读取-开启");
+        loop {
+            if let Some(data) = read_rx.next().await {
+                trace!("读取数据data:{:?}",data)
+            }
+        }
+        // trace!("线程-数据读取-结束");
+    });
+
+    //写数据
+    // let bool_v2_clone = Arc::clone(&bool_v1);
+    // let write_tx_clone = Arc::clone(&write_tx_am);
+    // let su = ws.get_subscription();
+    // let tw = tokio::spawn(async move {
+    //     trace!("线程-数据写入-开始");
+    //     loop {
+    //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
+    //         // let close_frame = CloseFrame {
+    //         //     code: CloseCode::Normal,
+    //         //     reason: Cow::Borrowed("Bye bye"),
+    //         // };
+    //         // let message = Message::Close(Some(close_frame));
+    //
+    //
+    //         let message = Message::Text(su.clone());
+    //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
+    //         trace!("发送指令成功");
+    //     }
+    //     trace!("线程-数据写入-结束");
+    // });
+
+    let t1 = tokio::spawn(async move {
+        //链接
+        let bool_v3_clone = Arc::clone(&bool_v1);
+        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        trace!("test 唯一线程结束--");
+    });
+    tokio::try_join!(t1).unwrap();
+    trace!("当此结束");
+    trace!("重启!");
+    trace!("参考交易所关闭");
+    return;
+}
+
+
+//ws-订阅私有频道信息
+#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
+async fn ws_custom_subscribe_pr() {
+    global::log_utils::init_log_with_trace();
+
+
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+
+    let logparam = BybitSwapLogin{
+        api_key:ACCESS_KEY.to_string(),
+        secret_key :SECRET_KEY.to_string(),
+    };
+
+    let mut ws = get_ws(Option::from(logparam), BybitSwapWsType::Private).await;
+    ws.set_symbols(vec!["BTC_USDT".to_string()]);
+    ws.set_subscribe(vec![
+        BybitSwapSubscribeType::PrPosition,
+        // BybitSwapSubscribeType::PrExecution,
+        // BybitSwapSubscribeType::PrOrder,
+        // BybitSwapSubscribeType::PrWallet,
+    ]);
+
+
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    let bool_v1 = Arc::new(AtomicBool::new(true));
+
+    //读取
+    let _bool_v1_clone = Arc::clone(&bool_v1);
+    let _tr = tokio::spawn(async move {
+        trace!("线程-数据读取-开启");
+        loop {
+            if let Some(data) = read_rx.next().await {
+                trace!("读取数据data:{:?}",data)
+            }
+        }
+        // trace!("线程-数据读取-结束");
+    });
+
+    //写数据
+    // let bool_v2_clone = Arc::clone(&bool_v1);
+    // let write_tx_clone = Arc::clone(&write_tx_am);
+    // let su = ws.get_subscription();
+    // let tw = tokio::spawn(async move {
+    //     trace!("线程-数据写入-开始");
+    //     loop {
+    //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
+    //         // let close_frame = CloseFrame {
+    //         //     code: CloseCode::Normal,
+    //         //     reason: Cow::Borrowed("Bye bye"),
+    //         // };
+    //         // let message = Message::Close(Some(close_frame));
+    //
+    //
+    //         let message = Message::Text(su.clone());
+    //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
+    //         trace!("发送指令成功");
+    //     }
+    //     trace!("线程-数据写入-结束");
+    // });
+
+    let t1 = tokio::spawn(async move {
+        //链接
+        let bool_v3_clone = Arc::clone(&bool_v1);
+        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        trace!("test 唯一线程结束--");
+    });
+    tokio::try_join!(t1).unwrap();
+    trace!("当此结束");
+    trace!("重启!");
+    trace!("参考交易所关闭");
+    return;
+}
+
 
 //rest-服務器時間
 #[tokio::test]
@@ -205,10 +287,10 @@ async fn rest_cancel_order_test() {
 // }
 
 
-// async fn get_ws(btree_map: Option<BybitSwapLogin>, type_v: BybitSwapWsType) -> BybitSwapWs {
-//     let ku_ws = BybitSwapWs::new(false, btree_map, type_v);
-//     ku_ws
-// }
+async fn get_ws(btree_map: Option<BybitSwapLogin>, type_v: BybitSwapWsType) -> BybitSwapWs {
+    let ku_ws = BybitSwapWs::new(false, btree_map, type_v);
+    ku_ws
+}
 
 fn get_rest() -> BybitSwapRest {
     let mut btree_map: BTreeMap<String, String> = BTreeMap::new();