Bläddra i källkod

新增 bingx 的公共信息订阅

hl 1 år sedan
förälder
incheckning
f82fed9b75

+ 3 - 1
exchanges/Cargo.toml

@@ -47,4 +47,6 @@ once_cell = "1.19.0"
 flate2 = "1.0.28"
 
 # 火币需要的一个加密包
-percent-encoding = "2.3.1"
+percent-encoding = "2.3.1"
+
+

+ 303 - 0
exchanges/src/bingx_swap_ws.rs

@@ -0,0 +1,303 @@
+use std::collections::BTreeMap;
+use std::io::Read;
+use std::str::FromStr;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use flate2::read::GzDecoder;
+use serde_json::json;
+
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use serde_json::Value;
+use tokio::sync::Mutex;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::{error, info, trace};
+
+use crate::response_base::ResponseData;
+use crate::socket_tool::{AbstractWsMode, HeartbeatType};
+
+//类型
+pub enum BingxSwapWsType {
+    PublicAndPrivate,
+}
+
+
+#[derive(Debug)]
+#[derive(Clone)]
+pub struct BingxSwapWsParam {
+    pub token: String,
+    pub ws_url: String,
+    pub ws_ping_interval: i64,
+    pub ws_ping_timeout: i64,
+    pub is_ok_subscribe: bool,
+}
+
+//订阅频道
+#[derive(Clone)]
+pub enum BingxSwapSubscribeType {
+    // 深度
+    PuFuturesDepth,
+    // 公开成交
+    PuFuturesTrades,
+    // K线数据
+    PuFuturesRecords,
+}
+
+//账号信息
+#[derive(Clone, Debug)]
+pub struct BingxSwapLogin {
+    pub access_key: String,
+    pub secret_key: String,
+    pub pass_key: String,
+}
+
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct BingxSwapWs {
+    //类型
+    tag: String,
+    //地址
+    address_url: String,
+    //账号
+    login_param: Option<BingxSwapLogin>,
+    //登录数据
+    ws_param: BingxSwapWsParam,
+    //币对
+    symbol_s: Vec<String>,
+    //订阅
+    subscribe_types: Vec<BingxSwapSubscribeType>,
+    //心跳间隔
+    heartbeat_time: u64,
+}
+
+impl BingxSwapWs {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool, login_param: Option<BingxSwapLogin>, ws_type: BingxSwapWsType) -> BingxSwapWs {
+        return Self::new_with_tag("default-BingxSwapWs".to_string(), is_colo, login_param, ws_type);
+    }
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<BingxSwapLogin>, ws_type: BingxSwapWsType) -> BingxSwapWs {
+        /*******公共频道-私有频道数据组装*/
+        let address_url = match ws_type {
+            BingxSwapWsType::PublicAndPrivate => {
+                let url = "wss://open-api-swap.bingx.com/swap-market".to_string();
+                info!("走普通通道(不支持colo通道):{}", url);
+                url
+            }
+        };
+
+        /*******公共频道-私有频道数据组装*/
+        let mut ws_param = BingxSwapWsParam {
+            token: "".to_string(),
+            ws_url: "".to_string(),
+            ws_ping_interval: 0,
+            ws_ping_timeout: 0,
+            is_ok_subscribe: false,
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道",address_url);
+        } else {
+            info!("走普通通道:{}",address_url);
+        }
+
+        BingxSwapWs {
+            tag,
+            address_url,
+            login_param,
+            ws_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            heartbeat_time: 1000 * 18,
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<BingxSwapSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //手动添加币对
+    pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
+        for symbol in b_array.iter_mut() {
+            // 大写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("_", "-");
+        }
+        self.symbol_s = b_array;
+    }
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if match t {
+                BingxSwapSubscribeType::PuFuturesTrades => false,
+                BingxSwapSubscribeType::PuFuturesRecords => false,
+                BingxSwapSubscribeType::PuFuturesDepth => false,
+            } {
+                return true;
+            }
+        }
+        false
+    }
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: BingxSwapSubscribeType) -> String {
+        match subscribe_type {
+            BingxSwapSubscribeType::PuFuturesDepth => {
+                format!("{}@depth5@100ms", symbol)
+            }
+            BingxSwapSubscribeType::PuFuturesRecords => {
+                format!("{}@trade", symbol)
+            }
+            BingxSwapSubscribeType::PuFuturesTrades => {
+                format!("{}@kline_1m", symbol)
+            }
+        }
+    }
+    //订阅信息生成
+    pub fn get_subscription(&self) -> Vec<String> {
+        let mut array = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
+                let str = json!({
+                    "id": "id1",
+                    "reqType": "sub",
+                    "dataType": ty_str.to_string()
+                });
+                ;
+                array.push(str.to_string());
+            }
+        }
+        array
+    }
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    //链接
+    pub async fn ws_connect_async<F, Future>(&mut self,
+                                             is_shutdown_arc: Arc<AtomicBool>,
+                                             handle_function: F,
+                                             write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                             write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+        where
+            F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+            Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
+    {
+        let login_is = self.contains_pr();
+        let subscription = self.get_subscription();
+        let address_url = self.address_url.clone();
+        let tag = self.tag.clone();
+        let heartbeat_time = self.ws_param.ws_ping_interval.clone();
+
+        //心跳-- 方法内部线程启动
+        let write_tx_clone1 = write_tx_am.clone();
+        // tokio::spawn(async move {
+        //     trace!("线程-异步心跳-开始");
+        //     AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time as u64).await;
+        //     trace!("线程-异步心跳-结束");
+        // });
+
+
+        //设置订阅
+        let subscribe_array = subscription.clone();
+        if login_is {
+            //登录相关
+        }
+
+        //1 链接
+        let t2 = tokio::spawn(async move {
+            let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
+
+            loop {
+                info!("Bingx_usdt_swap socket 连接中……");
+                AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
+                                                 false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
+
+                error!("Bingx_usdt_swap socket 断连,1s以后重连……");
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        });
+
+        tokio::try_join!(t2).unwrap();
+        trace!("线程-心跳与链接-结束");
+
+        Ok(())
+    }
+    /*******************************************************************************************************/
+    /*****************************************数据解析*****************************************************/
+    /*******************************************************************************************************/
+    //数据解析-Text
+    pub fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text);
+        Option::from(response_data)
+    }
+    //数据解析-ping
+    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
+    }
+    //数据解析-pong
+    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
+    }
+    //数据解析-二进制
+    pub fn message_binary(po: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        // let message_str = format!("Binary:{:?}", _po);
+        // Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+        // let result = String::from_utf8(bytes);
+        // let result = String::from_utf8(po);
+
+        let mut gz_decoder = GzDecoder::new(&po[..]);
+        let mut decompressed_data = Vec::new();
+
+        // 尝试解压数据
+        if let Ok(_) = gz_decoder.read_to_end(&mut decompressed_data) {
+            // 将解压后的字节向量转换为 UTF-8 字符串
+            match String::from_utf8(decompressed_data) {
+                Ok(text) => {
+                    let response_data = Self::ok_text(text);
+                    return Option::from(response_data);
+                }
+                Err(e) => {
+                    return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
+                }
+            }
+        } else {
+            return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
+        }
+    }
+    //数据解析
+    pub fn ok_text(text: String) -> ResponseData
+    {
+        // trace!("原始数据:{:?}",text);
+        let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
+        let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
+
+        // { "id": "id1", "code": 0, "msg": "" }
+        if json_value["id"].as_str() == Option::from("id1") {
+            //订阅
+            if json_value["code"].as_i64() == Option::from(0) {
+                res_data.code = -201;
+                res_data.message = "订阅成功".to_string();
+            }else{
+                res_data.code = 400;
+                res_data.message = "订阅失败".to_string();
+            }
+        }else if json_value["code"].as_i64() == Option::from(0){
+            res_data.code = 200;
+            res_data.data = json_value.clone();
+        }else{
+            res_data.code = -1;
+            res_data.message = "未知解析".to_string();
+        }
+
+        res_data
+    }
+}

+ 1 - 1
exchanges/src/lib.rs

@@ -28,4 +28,4 @@ pub mod coinex_swap_ws;
 pub mod coinex_swap_rest;
 pub mod htx_swap_ws;
 pub mod htx_swap_rest;
-
+pub mod bingx_swap_ws;

+ 14 - 7
exchanges/src/socket_tool.rs

@@ -89,13 +89,13 @@ impl AbstractWsMode {
 
                     let code = data.code.clone();
 
-                    if code == 200 {
-                        let mut data_c = data.clone();
-                        data_c.ins = Instant::now();
-                        data_c.time = Utc::now().timestamp_millis();
-
-                        handle_function(data_c).await;
-                    }
+                    // if code == 200 {
+                    //     let mut data_c = data.clone();
+                    //     data_c.ins = Instant::now();
+                    //     data_c.time = Utc::now().timestamp_millis();
+                    //
+                    //     handle_function(data_c).await;
+                    // }
 
                     /*
                         200 -正确返回
@@ -106,6 +106,13 @@ impl AbstractWsMode {
                        -302 -客户端收到服务器心跳自定义,需要响应自定义
                     */
                     match code {
+                        200 =>{
+                            let mut data_c = data.clone();
+                            data_c.ins = Instant::now();
+                            data_c.time = Utc::now().timestamp_millis();
+
+                            handle_function(data_c).await;
+                        }
                         -200 => {
                             //登录成功
                             info!("ws登录成功:{:?}", data);

+ 103 - 0
exchanges/tests/bingx_swap_test.rs

@@ -0,0 +1,103 @@
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+
+use futures_util::StreamExt;
+use tokio::sync::Mutex;
+use tracing::trace;
+
+use exchanges::bingx_swap_ws::{BingxSwapLogin, BingxSwapSubscribeType, BingxSwapWs, BingxSwapWsType};
+use exchanges::response_base::ResponseData;
+
+const ACCESS_KEY: &str = "";
+const SECRET_KEY: &str = "";
+const PASS_KEY: &str = "";
+
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn ws_custom_subscribe() {
+    global::log_utils::init_log_with_trace();
+
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (_, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
+
+    // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
+    // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
+
+
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    let is_shutdown_arc = Arc::new(AtomicBool::new(true));
+
+    //读取
+    let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
+    let _tr = tokio::spawn(async move {
+        trace!("线程-数据读取-开启");
+        loop {
+            // 从通道中接收并丢弃所有的消息,直到通道为空
+            while let Ok(Some(_)) = read_rx.try_next() {
+
+                // 从通道中接收并丢弃所有的消息,直到通道为空
+                while let Ok(Some(_)) = read_rx.try_next() {
+                    // 消息被忽略
+                }
+            }
+        }
+        // trace!("线程-数据读取-结束");
+    });
+
+    //写数据
+    // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
+    // let write_tx_clone = Arc::clone(&write_tx_am);
+    // let su = ws.get_subscription();
+    // let tw = tokio::spawn(async move {
+    //     trace!("线程-数据写入-开始");
+    //     loop {
+    //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
+    //         // let close_frame = CloseFrame {
+    //         //     code: CloseCode::Normal,
+    //         //     reason: Cow::Borrowed("Bye bye"),
+    //         // };
+    //         // let message = Message::Close(Some(close_frame));
+    //
+    //
+    //         let message = Message::Text(su.clone());
+    //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
+    //         trace!("发送指令成功");
+    //     }
+    //     trace!("线程-数据写入-结束");
+    // });
+
+    let fun = move |data: ResponseData| {
+        async move {
+            trace!("---传入的方法~~~~{:?}", data);
+        }
+    };
+    let param = BingxSwapLogin {
+        access_key: ACCESS_KEY.to_string(),
+        secret_key: SECRET_KEY.to_string(),
+        pass_key: PASS_KEY.to_string(),
+    };
+    let t1 = tokio::spawn(async move {
+        let mut ws = get_ws(Option::from(param), BingxSwapWsType::PublicAndPrivate);
+        ws.set_symbols(vec!["BTC_USDT".to_string(), "ETC_USDT".to_string()]);
+        ws.set_subscribe(vec![
+            BingxSwapSubscribeType::PuFuturesTrades,
+            BingxSwapSubscribeType::PuFuturesDepth,
+            BingxSwapSubscribeType::PuFuturesRecords,
+
+        ]);
+        //链接
+        let bool_v3_clone = Arc::clone(&is_shutdown_arc);
+        ws.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        trace!("test 唯一线程结束--");
+    });
+    tokio::try_join!(t1).unwrap();
+    trace!("当此结束");
+    trace!("重启!");
+    trace!("参考交易所关闭");
+    return;
+}
+fn get_ws(btree_map: Option<BingxSwapLogin>, ws_type: BingxSwapWsType) -> BingxSwapWs {
+    let Bingx_ws = BingxSwapWs::new(false,btree_map, ws_type);
+    Bingx_ws
+}
+