소스 검색

币安 交易交易所

hl 1 년 전
부모
커밋
eb56b56ad5
4개의 변경된 파일215개의 추가작업 그리고 98개의 파일을 삭제
  1. 14 2
      exchanges/src/binance_swap_rest.rs
  2. 120 17
      exchanges/src/binance_swap_ws.rs
  3. 53 73
      exchanges/tests/binance_swap_test.rs
  4. 28 6
      exchanges/tests/gate_swap_test.rs

+ 14 - 2
exchanges/src/binance_swap_rest.rs

@@ -271,6 +271,18 @@ impl BinanceSwapRest {
         data
     }
 
+    //生成 listenKey
+    pub async fn get_listenKey(&mut self) -> ResponseData {
+        let  params = serde_json::json!({
+         });
+        let data = self.request("POST".to_string(),
+                                "".to_string(),
+                                format!("/fapi/v1/listenKey"),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
 
     /*******************************************************************************************************/
     /*****************************************工具函数********************************************************/
@@ -421,11 +433,11 @@ impl BinanceSwapRest {
         if response.status().is_success() {
             // 读取响应的内容
             let body = response.text().await?;
-            // trace!("ok-----{}", body);
+            trace!("ok-----{}", body);
             res_data = ResponseData::new(self.label.clone(), "200".to_string(), "success".to_string(), body);
         } else {
             let body = response.text().await?;
-            // trace!("error-----{}", body);
+            trace!("error-----{}", body);
             res_data = ResponseData::error(self.label.clone(), body.to_string())
         }
 

+ 120 - 17
exchanges/src/binance_swap_ws.rs

@@ -1,3 +1,4 @@
+use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 
@@ -6,9 +7,10 @@ use serde_json::json;
 use tokio::sync::Mutex;
 use tokio_tungstenite::tungstenite::{Error, Message};
 use tracing::{info, trace};
+use crate::binance_swap_rest::BinanceSwapRest;
 
 use crate::response_base::ResponseData;
-use crate::socket_tool::AbstractWsMode;
+use crate::socket_tool::{AbstractWsMode, HeartbeatType};
 
 //类型
 pub enum BinanceSwapWsType {
@@ -21,6 +23,11 @@ pub enum BinanceSwapSubscribeType {
     PuBookTicker,
     PuAggTrade,
     PuDepth20levels100ms,
+    PuMarkPrice,
+
+    PrBalance,
+    PrAccount,
+
 }
 
 //账号信息
@@ -46,6 +53,8 @@ pub struct BinanceSwapWs {
     subscribe_types: Vec<BinanceSwapSubscribeType>,
     //心跳间隔
     heartbeat_time: u64,
+    //rest
+    rest_swap: Option<BinanceSwapRest>,
 }
 
 impl BinanceSwapWs {
@@ -70,13 +79,15 @@ impl BinanceSwapWs {
             info!("走普通通道:{}",address_url);
         }
 
+
         BinanceSwapWs {
             label,
             address_url,
             login_param,
             symbol_s: vec![],
             subscribe_types: vec![],
-            heartbeat_time: 1000 * 20,
+            heartbeat_time: 1000 * 10,
+            rest_swap: None,
         }
     }
 
@@ -105,6 +116,10 @@ impl BinanceSwapWs {
                 BinanceSwapSubscribeType::PuBookTicker => false,
                 BinanceSwapSubscribeType::PuAggTrade => false,
                 BinanceSwapSubscribeType::PuDepth20levels100ms => false,
+                BinanceSwapSubscribeType::PuMarkPrice => false,
+
+                BinanceSwapSubscribeType::PrBalance => true,
+                BinanceSwapSubscribeType::PrAccount => true,
             } {
                 return true;
             }
@@ -115,7 +130,18 @@ impl BinanceSwapWs {
     /*****************************************工具函数********************************************************/
     /*******************************************************************************************************/
     //订阅枚举解析
-    pub fn enum_to_string(symbol: String, subscribe_type: BinanceSwapSubscribeType) -> String {
+    pub fn enum_to_string_pr(symbol: String, subscribe_type: BinanceSwapSubscribeType, listenKey: String) -> String {
+        match subscribe_type {
+            BinanceSwapSubscribeType::PrBalance => {
+                format!("{}@balance", listenKey)
+            }
+            BinanceSwapSubscribeType::PrAccount => {
+                format!("{}@account", listenKey)
+            }
+            _ => { "".to_string() }
+        }
+    }
+    pub fn enum_to_string_pu(symbol: String, subscribe_type: BinanceSwapSubscribeType) -> String {
         match subscribe_type {
             BinanceSwapSubscribeType::PuAggTrade => {
                 format!("{}@aggTrade", symbol)
@@ -126,6 +152,11 @@ impl BinanceSwapWs {
             BinanceSwapSubscribeType::PuBookTicker => {
                 format!("{}@bookTicker", symbol)
             }
+            BinanceSwapSubscribeType::PuMarkPrice => {
+                format!("{}@markPrice", symbol)
+            }
+
+            _ => { "".to_string() }
         }
     }
     //订阅信息生成
@@ -133,16 +164,57 @@ impl BinanceSwapWs {
         let mut params = vec![];
         for symbol in &self.symbol_s {
             for subscribe_type in &self.subscribe_types {
-                let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
-                params.push(ty_str);
+                let ty_str = Self::enum_to_string_pu(symbol.clone(), subscribe_type.clone());
+                if ty_str.len() > 0 {
+                    params.push(ty_str);
+                }
             }
         }
 
-        let str = json!({
+        if params.len() > 0 {
+            let str = json!({
             "method": "SUBSCRIBE",
             "params":  params,
             "id": 1
             });
+            str.to_string()
+        } else {
+            "".to_string()
+        }
+    }
+    //获取 listenKey
+    pub async fn get_listenKey(&self) -> String {
+        let req = self.rest_swap.clone().unwrap().get_listenKey().await;
+        if req.code.as_str() != "200" {
+            trace!("get_listenKey获取失败(私有订阅必要参数!)~!");
+            "".to_string()
+        } else {
+            let body = req.data.clone();
+            let json_value: serde_json::Value = serde_json::from_str(&body).unwrap();
+            let listenKey = json_value["listenKey"].as_str().unwrap().to_string();
+            listenKey
+        }
+    }
+    //私人订阅信息生成
+    pub async fn get_subscription_pr(&self) -> String {
+        let listenKey = self.get_listenKey().await;
+
+        if listenKey.len() == 0 {
+            return "".to_string();
+        }
+        let mut params = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string_pr(symbol.clone(), subscribe_type.clone(), listenKey.clone());
+                params.push(ty_str);
+            }
+        }
+
+        let str = json!({
+            "method": "REQUEST",
+            "params":params,
+            "id": 12
+            });
         str.to_string()
     }
     /*******************************************************************************************************/
@@ -151,7 +223,7 @@ impl BinanceSwapWs {
     //链接
     pub async fn ws_connect_async(&mut self,
                                   bool_v1: Arc<AtomicBool>,
-                                  _write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                  write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
                                   write_rx: UnboundedReceiver<Message>,
                                   read_tx: UnboundedSender<ResponseData>) -> Result<(), Error>
     {
@@ -159,23 +231,46 @@ impl BinanceSwapWs {
         let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
         let label = self.label.clone();
-        // let heartbeat_time = self.heartbeat_time.clone();
+        let login_param = self.login_param.clone();
+        let heartbeat_time = self.heartbeat_time.clone();
 
 
         //心跳-- 方法内部线程启动
-        // let write_tx_clone1 = Arc::clone(write_tx_am);
-        // tokio::spawn(async move {
-        //     trace!("线程-异步心跳-开始");
-        //     AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Pong, heartbeat_time).await;
-        //     trace!("线程-异步心跳-结束");
-        // });
+        let write_tx_clone1 = Arc::clone(write_tx_am);
+        tokio::spawn(async move {
+            trace!("线程-异步心跳-开始");
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
+            trace!("线程-异步心跳-结束");
+        });
 
         //设置订阅
         let mut subscribe_array = vec![];
         if login_is {
+            match login_param {
+                None => {
+                    trace!("登录参数缺失~!");
+                    return Ok(());
+                }
+                Some(p) => {
+                    let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
+                    btree_map.insert("access_key".to_string(), p.api_key.to_string());
+                    btree_map.insert("secret_key".to_string(), p.api_secret.to_string());
+                    self.rest_swap = Option::from(BinanceSwapRest::new(false, btree_map));
+                }
+            }
             //登录相关
+            let subscribe_pr = self.get_subscription_pr().await;
+            if subscribe_pr.len() == 0 {
+                trace!("私有订阅 组装失败~!");
+                return Ok(());
+            } else {
+                subscribe_array.push(subscribe_pr);
+            }
+        }
+        if subscription.len() > 0 {
+            subscribe_array.push(subscription.to_string());
         }
-        subscribe_array.push(subscription.to_string());
+
 
         //链接
         let t2 = tokio::spawn(async move {
@@ -219,11 +314,17 @@ impl BinanceSwapWs {
         let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), "".to_string());
         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
 
+
         if json_value.get("result").is_some() && json_value.get("id").is_some() &&
-            json_value.get("id").unwrap() == 1
+            (json_value.get("id").unwrap() == 1 || json_value.get("id").unwrap() == 12)
         {
             res_data.code = "-201".to_string();
-            res_data.message = "订阅成功".to_string();
+            if json_value.get("id").unwrap() == 1 {
+                res_data.message = "公共订阅成功".to_string();
+            }
+            if json_value.get("id").unwrap() == 12 {
+                res_data.message = "私人订阅成功".to_string();
+            }
         } else if json_value.get("error").is_some() {//订阅返回
             res_data.code = json_value["error"]["code"].to_string();
             res_data.message = json_value["error"]["msg"].to_string();
@@ -238,6 +339,8 @@ impl BinanceSwapWs {
                 res_data.channel = "depth".to_string();
             } else if channel.contains("@bookTicker") {
                 res_data.channel = "bookTicker".to_string();
+            } else if channel.contains("@markPrice") {
+                res_data.channel = "markPrice".to_string();
             } else {
                 res_data.code = "".to_string();
                 res_data.channel = "未知的频道".to_string();

+ 53 - 73
exchanges/tests/binance_swap_test.rs

@@ -2,8 +2,10 @@ use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 use std::time::Duration;
+use chrono::Utc;
 
 use futures_util::StreamExt;
+use serde_json::json;
 use tokio::sync::Mutex;
 use tokio_tungstenite::tungstenite::Message;
 use tracing::trace;
@@ -27,14 +29,20 @@ async fn ws_custom_subscribe() {
 
     // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
     // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
+    let login_param = BinanceSwapLogin {
+        api_key: ACCESS_KEY.to_string(),
+        api_secret: SECRET_KEY.to_string(),
+    };
 
-
-    let mut ws = get_ws(None);
+    let mut ws = get_ws(Option::from(login_param));
     ws.set_symbols(vec!["BTC_USDT".to_string()]);
     ws.set_subscribe(vec![
-        BinanceSwapSubscribeType::PuBookTicker,
+        // BinanceSwapSubscribeType::PuBookTicker,
         // BinanceSwapSubscribeType::PuAggTrade,
+        BinanceSwapSubscribeType::PuMarkPrice,
         // BinanceSwapSubscribeType::PuDepth20levels100ms,
+
+        BinanceSwapSubscribeType::PrAccount,
     ]);
 
 
@@ -86,76 +94,6 @@ async fn ws_custom_subscribe() {
     trace!("重启!");
     trace!("参考交易所关闭");
     return;
-
-    //************************************
-    //************************************
-    //************************************
-    //************************************
-    //************************************
-    //************************************
-    //************************************
-    //11 点31 分
-
-    // let mut bool_v1 = Arc::new(AtomicBool::new(true));
-    // //创建读写通道
-    // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
-    // // 封装 write_tx 到 Arc 和 Mutex
-    // let write_tx_am = Arc::new(Mutex::new(write_tx));
-    //
-    // //对象
-    // let mut ws = get_ws(None);
-    // // 币对
-    // ws.set_symbols(vec!["BTC_USDT".to_string()]);
-    // //订阅
-    // ws.set_subscribe(vec![
-    //     BinanceSwapSubscribeType::PuBookTicker,
-    //     BinanceSwapSubscribeType::PuAggTrade,
-    //     BinanceSwapSubscribeType::PuDepth20levels100ms,
-    // ]);
-    //
-    //
-    // //模拟业务场景 开启链接
-    // let bool_v1_clone = Arc::clone(&bool_v1);
-    // let write_tx_clone1 = Arc::clone(&write_tx_am);
-    // let t1 = tokio::spawn(async move {
-    //     ws.ws_connect_async(bool_v1_clone, write_tx_clone1, write_rx, &read_tx).await.unwrap();
-    //     trace!("ws_connect_async 完成");
-    // });
-    //
-    // //模拟业务场景 一直监听数据
-    // let t2 = tokio::spawn(async move {
-    //     loop {
-    //         if let Some(data) = read_rx.next().await {
-    //             trace!("读取数据data:{:?}",data)
-    //         }
-    //     }
-    //     trace!("数据读取退出 完成");
-    // });
-    //
-    //
-    // //模拟用户主动写入数据
-    // // let write_tx_clone2 = Arc::clone(&write_tx_am);
-    // // let t3 = tokio::spawn(async move {
-    // //     //模拟心跳
-    // //     loop {
-    // //         tokio::time::sleep(Duration::from_millis(5000)).await;
-    // //         let mut write_tx_clone = write_tx_clone2.lock().unwrap();
-    // //         match write_tx_clone.unbounded_send(Message::Pong(Vec::from("pong"))) {
-    // //             Ok(_) => {
-    // //                 trace!("发送心跳");
-    // //                 continue;
-    // //             }
-    // //             Err(_) => {
-    // //                 break;
-    // //             }
-    // //         }
-    // //     }
-    // //     trace!("主动推出 完成");
-    // // });
-    // // tokio::try_join!(y,y1,y2).unwrap();
-    // tokio::try_join!(t1,t2).unwrap();
-    // trace!("323123213");
 }
 
 //rest-获取服务器时间
@@ -215,6 +153,48 @@ async fn rest_get_user_trades_test() {
     trace!(?rep_data)
 }
 
+//rest-生成 listenKey
+#[tokio::test]
+async fn rest_get_listenKey_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut rest = get_rest();
+    let rep_data = rest.get_listenKey().await;
+    trace!(?rep_data)
+}
+
+//杠杆设置
+#[tokio::test]
+async fn rest_change_pos_side_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut rest = get_rest();
+    let rep_data = rest.change_pos_side(true).await;
+    trace!(?rep_data)
+}
+
+//下单
+#[tokio::test]
+async fn rest_swap_order_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut rest = get_rest();
+    let timestamp = Utc::now().timestamp_millis() /  1000;
+    let timestamp_600 = (timestamp + 700);
+    trace!("{:?}-==={:?}---{:?}--",timestamp,timestamp_600,(timestamp_600 > (timestamp + 600)));
+    let json_str = json!({
+        "symbol":"XRPUSDT",
+        "side":"BUY",
+        "type":"LIMIT",
+        "quantity":"10",
+        "price":"0.59",
+        "timeInForce":"GTC",
+        "goodTillDate":timestamp_600,
+    });
+    let rep_data = rest.swap_order(json_str).await;
+    trace!(?rep_data)
+}
+
 
 fn get_ws(btree_map: Option<BinanceSwapLogin>) -> BinanceSwapWs {
     let binance_ws = BinanceSwapWs::new(false,

+ 28 - 6
exchanges/tests/gate_swap_test.rs

@@ -24,17 +24,18 @@ async fn ws_custom_subscribe() {
         api_key: ACCESS_KEY.to_string(),
         secret: SECRET_KEY.to_string(),
     };
+    let user_id = "13125616";
 
     let mut ws = get_ws(Option::from(param));
     ws.set_symbols(vec!["BTC_USDT".to_string()]);
     ws.set_subscribe(vec![
         // GateSwapSubscribeType::PuFuturesOrderBook,
-        GateSwapSubscribeType::PuFuturesCandlesticks,
-        GateSwapSubscribeType::PuFuturesTrades,
-
-        // GateSwapSubscribeType::PrFuturesBalances("".to_string()),
-        // GateSwapSubscribeType::PrFuturesOrders("".to_string()),
-        // GateSwapSubscribeType::PrFuturesPositions("".to_string()),
+        // GateSwapSubscribeType::PuFuturesCandlesticks,
+        // GateSwapSubscribeType::PuFuturesTrades,
+        //
+        GateSwapSubscribeType::PrFuturesBalances(user_id.to_string()),
+        // GateSwapSubscribeType::PrFuturesOrders(user_id.to_string()),
+        // GateSwapSubscribeType::PrFuturesPositions(user_id.to_string()),
     ]);
 
 
@@ -89,6 +90,27 @@ async fn ws_custom_subscribe() {
 }
 
 
+//rest-查询个人交易费率
+#[tokio::test]
+async fn rest_wallet_fee_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.wallet_fee().await;
+    println!("okx--查询个人交易费率--{:?}", req_data);
+}
+
+
+//rest-查询合约账户
+#[tokio::test]
+async fn rest_get_account_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_account("usdt".to_string()).await;
+    println!("okx--查询合约账户--{:?}", req_data);
+}
+
 //rest-设置持仓模式
 #[tokio::test]
 async fn rest_cancel_order_all_test() {