Sfoglia il codice sorgente

ws接口封装完成,等待测试。

skyffire 1 anno fa
parent
commit
d5ba88a291
1 ha cambiato i file con 313 aggiunte e 0 eliminazioni
  1. 313 0
      exchanges/src/bitget_swap_ws.rs

+ 313 - 0
exchanges/src/bitget_swap_ws.rs

@@ -0,0 +1,313 @@
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use chrono::{Utc};
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use serde_json::{json, Value};
+use tracing::{error, info, trace};
+use ring::hmac;
+use tokio::sync::Mutex;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use crate::response_base::ResponseData;
+use crate::socket_tool::{AbstractWsMode, HeartbeatType};
+
+pub enum BitgetSwapWsType {
+    Public,
+    Private,
+}
+
+#[derive(Clone)]
+pub enum BitgetSwapSubscribeType {
+    PuTrade,
+
+    PrAccount,
+    PrPosition,
+    PrOrders,
+}
+
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct BitgetSwapLogin {
+    pub api_key: String,
+    pub secret_key: String,
+    pub passphrase_key: String,
+}
+
+#[derive(Clone)]
+pub struct BitgetSwapWs {
+    label: String,                                      // 类型
+    address_url: String,                                // 地址
+    login_param: Option<BitgetSwapLogin>,               // 账号
+    symbol_s: Vec<String>,                              // 币对
+    subscribe_types: Vec<BitgetSwapSubscribeType>,      // 订阅
+    heartbeat_time: u64,                                // 心跳间隔
+}
+
+impl BitgetSwapWs {
+    pub fn new(is_colo: bool, login_param: Option<BitgetSwapLogin>, ws_type: BitgetSwapWsType) -> BitgetSwapWs {
+        return BitgetSwapWs::new_label("default-BitgetSwapWs".to_string(), is_colo, login_param, ws_type);
+    }
+
+    pub fn new_label(label: String, is_colo: bool, login_param: Option<BitgetSwapLogin>, ws_type: BitgetSwapWsType) -> BitgetSwapWs {
+        let address_url = match ws_type {
+            BitgetSwapWsType::Public => {
+                "wss://ws.bitget.com/v2/ws/public".to_string()
+            }
+            BitgetSwapWsType::Private => {
+                "wss://ws.bitget.com/v2/ws/private".to_string()
+            }
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道",address_url);
+        } else {
+            info!("走普通通道:{}",address_url);
+        }
+
+        BitgetSwapWs {
+            label,
+            address_url,
+            login_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            heartbeat_time: 1000 * 20,
+        }
+    }
+
+    // 添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<BitgetSwapSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+
+    // 手动添加币对
+    pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
+        for symbol in b_array.iter_mut() {
+            // 小写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("-", "");
+            *symbol = symbol.replace("_", "");
+        }
+        self.symbol_s = b_array;
+    }
+
+    //频道是否需要登录
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if match t {
+                BitgetSwapSubscribeType::PuTrade => false,
+
+                BitgetSwapSubscribeType::PrAccount => true,
+                BitgetSwapSubscribeType::PrOrders => true,
+                BitgetSwapSubscribeType::PrPosition => true
+            } {
+                return true;
+            }
+        }
+        false
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数*******************************************************/
+    /*******************************************************************************************************/
+    // 枚举解析成json
+    pub fn enum_to_json(symbol: String, subscribe_type: BitgetSwapSubscribeType) -> Value {
+        match subscribe_type {
+            // 公共订阅
+            BitgetSwapSubscribeType::PuTrade => {
+                json!({
+                    "instType": "USDT-FUTURES",
+                    "channel": "trade",
+                    "instId": symbol,
+                })
+            },
+
+            // 私有订阅
+            BitgetSwapSubscribeType::PrAccount => {
+                json!({
+                    "instType": "USDT-FUTURES",
+                    "channel": "account",
+                    "coin": "default",
+                })
+            },
+            BitgetSwapSubscribeType::PrPosition => {
+                json!({
+                    "instType": "USDT-FUTURES",
+                    "channel": "positions",
+                    "instId": "default"
+                })
+            },
+            BitgetSwapSubscribeType::PrOrders => {
+                json!({
+                    "instType": "USDT-FUTURES",
+                    "channel": "orders",
+                    "instId": "default"
+                })
+            },
+        }
+    }
+
+    // 订阅信息生成
+    pub fn get_subscription(&self) -> String {
+        let mut params = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_json(symbol.clone(), subscribe_type.clone());
+                params.push(ty_str);
+            }
+        }
+        let str = json!({
+            "op": "subscribe",
+            "args": params
+        });
+
+        str.to_string()
+    }
+
+    // 登录数据组装
+    fn log_in_to_str(&self) -> String {
+        let mut login_json_str = "".to_string();
+
+        let mut access_key: String = "".to_string();
+        let mut secret_key: String = "".to_string();
+        let mut passphrase: String = "".to_string();
+        match self.login_param.clone() {
+            None => {}
+            Some(param) => {
+                access_key = param.api_key;
+                secret_key = param.secret_key;
+                passphrase = param.passphrase_key;
+            }
+        }
+        if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 {
+            let timestamp = Utc::now().timestamp().to_string();
+            // 时间戳 + 请求类型+ 请求参数字符串
+            let message = format!("{}GET{}", timestamp, "/user/verify");
+            let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+            let result = hmac::sign(&hmac_key, &message.as_bytes());
+            let sign = base64::encode(result);
+
+            let login_json = json!({
+                              "op": "login",
+                              "args": [{
+                                "apiKey": access_key,
+                                "passphrase": passphrase,
+                                "timestamp": timestamp,
+                                "sign": sign
+                              }]
+                        });
+
+            info!("---login_json: {0}", login_json.to_string());
+            info!("---登陆: {}", login_json.to_string());
+            login_json_str = login_json.to_string();
+        }
+        login_json_str
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    pub async fn ws_connect_async<F, Future>(&mut self,
+                                            is_shutdown_arc: Arc<AtomicBool>,
+                                            handle_function: F,
+                                            write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                            write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+        where
+            F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+            Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
+    {
+        let login_is = self.contains_pr();
+        let subscription = self.get_subscription();
+        let address_url = self.address_url.clone();
+        let label = self.label.clone();
+        let heartbeat_time = self.heartbeat_time.clone();
+
+
+        //心跳-- 方法内部线程启动
+        let write_tx_clone1 = Arc::clone(write_tx_am);
+        tokio::spawn(async move {
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
+        });
+
+        //设置订阅
+        let mut subscribe_array = vec![];
+        if login_is {
+            //登录相关
+            let login_str = self.log_in_to_str();
+            let write_tx_clone2 = Arc::clone(write_tx_am);
+            AbstractWsMode::send_subscribe(write_tx_clone2, Message::Text(login_str)).await;
+            tokio::time::sleep(Duration::from_millis(1000 * 3)).await;
+        }
+        subscribe_array.push(subscription.to_string());
+
+        //链接
+        let t2 = tokio::spawn(async move {
+            let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
+
+            loop {
+                info!("bitget_usdt_swap socket 连接中……");
+                // ws层重连
+                AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
+                                                 label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 Self::message_text, Self::message_ping, Self::message_pong).await;
+
+                error!("bitget_usdt_swap socket 断连,1s以后重连……");
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        });
+        tokio::try_join!(t2).unwrap();
+        trace!("线程-心跳与链接-结束");
+
+        Ok(())
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************数据解析*******************************************************/
+    /******************************************************************************************************/
+    // 数据解析-Text
+    pub fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text);
+        Option::from(response_data)
+    }
+    // 数据解析-ping
+    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), Value::Null));
+    }
+    // 数据解析-pong
+    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), Value::Null));
+    }
+    //数据解析
+    pub fn ok_text(text: String) -> ResponseData {
+        let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), Value::Null);
+        let json_value: Value = serde_json::from_str(&text).unwrap();
+
+        if json_value.get("event").is_some() && json_value["event"].as_str() == Option::from("login") {
+            if json_value.get("code").is_some() && json_value["code"] == 0 {
+                res_data.message = "登陆成功".to_string();
+            } else {
+                res_data.message = format!("登陆失败:({},{})", json_value.get("code").as_ref().unwrap(), json_value.get("msg").as_ref().unwrap());
+            }
+            res_data.channel = "login".to_string();
+            res_data.code = "-200".to_string();
+            res_data
+        } else if json_value.get("event").is_some() && json_value["event"].as_str() == Option::from("subscribe") {
+            res_data.code = "-201".to_string();
+            res_data.data = json_value.clone();
+            res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
+            res_data
+        } else if json_value.get("action").is_some() {
+            res_data.data = json_value["data"].clone();
+            if res_data.data == "[]" {
+                res_data.code = "".to_string();
+            } else {
+                res_data.code = "200".to_string();
+            }
+            res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
+            res_data.reach_time = json_value["ts"].as_i64().unwrap() * 1000;
+            res_data
+        } else {
+            res_data
+        }
+    }
+}