Bläddra i källkod

Merge branch 'dev' into test

skyffire 1 år sedan
förälder
incheckning
bc6524370e

+ 2 - 2
Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "as-rust"
-version = "1.4.4"
+version = "1.5.1"
 edition = "2021"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -20,7 +20,7 @@ actix-rt = "2.5.0"
 actix-web = "4.0.0-beta.12"
 ctrlc = "3.2.5"
 serde_json = "1.0.105"
-rust_decimal = "1.32.0"
+rust_decimal = { version = "1.32.0", features = ["maths"] }
 rust_decimal_macros = "1.32.0"
 
 [workspace]

+ 2 - 2
config.toml.sample

@@ -8,7 +8,7 @@ access_key = ""
 secret_key = ""
 # 交易账户pass_key(部分交易所不需要填写)
 pass_key = ""
-# 交易交易所,可选[gate_usdt_swap, kucoin_usdt_swap, bitget_spot]
+# 交易交易所,可选[gate_usdt_swap, kucoin_usdt_swap, bitget_spot, bybit_usdt_swap]
 exchange = "gate_usdt_swap"
 # 交易交易所的交易对
 pair = "blz_usdt"
@@ -22,7 +22,7 @@ lever_rate = 0.1
 hold_coin = 0
 # quant的run_strategy函数,用于定期检查使用,单位是毫秒
 interval = 100
-# 参考交易所,可选[gate_usdt_swap, kucoin_usdt_swap, kucoin_spot, binance_usdt_swap, binance_spot]
+# 参考交易所,可选[gate_usdt_swap, kucoin_usdt_swap, kucoin_spot, binance_usdt_swap, binance_spot, bybit_usdt_swap]
 ref_exchange = ["binance_usdt_swap"]
 # 参考交易对
 ref_pair = ["blz_usdt"]

+ 11 - 8
exchanges/src/binance_spot_ws.rs

@@ -180,13 +180,16 @@ impl BinanceSpotWs {
         //链接
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
-                                             label.clone(), subscribe_array,
-                                             write_rx, read_tx,
-                                             Self::message_text,
-                                             Self::message_ping,
-                                             Self::message_pong,
-            ).await.expect("币安-现货");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array,
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:币安-现货链接关闭-{:?}",e); }
+            }
             trace!("线程-异步链接-结束");
         });
         tokio::try_join!(t2).unwrap();
@@ -199,7 +202,7 @@ impl BinanceSpotWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let  response_data = Self::ok_text(text);
+        let response_data = Self::ok_text(text);
         Option::from(response_data)
     }
     //数据解析-ping

+ 11 - 9
exchanges/src/binance_swap_ws.rs

@@ -180,14 +180,16 @@ impl BinanceSwapWs {
         //链接
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
-                                             label.clone(), subscribe_array,
-                                             write_rx, read_tx,
-                                             Self::message_text,
-                                             Self::message_ping,
-                                             Self::message_pong,
-            ).await.expect("币安-期货");
-            trace!("线程-异步链接-结束");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array,
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:币安-期货链接关闭-{:?}",e); }
+            }
         });
         tokio::try_join!(t2).unwrap();
         trace!("线程-心跳与链接-结束");
@@ -199,7 +201,7 @@ impl BinanceSwapWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let  response_data = Self::ok_text(text);
+        let response_data = Self::ok_text(text);
         Option::from(response_data)
     }
     //数据解析-ping

+ 11 - 8
exchanges/src/bitget_spot_ws.rs

@@ -271,13 +271,16 @@ impl BitgetSpotWs {
         //链接
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
-                                             label.clone(), subscribe_array,
-                                             write_rx, read_tx,
-                                             Self::message_text,
-                                             Self::message_ping,
-                                             Self::message_pong,
-            ).await.expect("币安-期货");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array,
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:bitget-现货链接关闭-{:?}",e); }
+            }
             trace!("线程-异步链接-结束");
         });
         tokio::try_join!(t2).unwrap();
@@ -290,7 +293,7 @@ impl BitgetSpotWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let  response_data = Self::ok_text(text);
+        let response_data = Self::ok_text(text);
         Option::from(response_data)
     }
     //数据解析-ping

+ 59 - 8
exchanges/src/bybit_swap_rest.rs

@@ -62,7 +62,7 @@ impl BybitSwapRest {
     /*******************************************************************************************************/
     //服務器時間
     pub async fn get_server_time(&mut self) -> ResponseData {
-        let  params = serde_json::json!({
+        let params = serde_json::json!({
          });
         let data = self.request("GET".to_string(),
                                 "/v5".to_string(),
@@ -72,9 +72,39 @@ impl BybitSwapRest {
         ).await;
         data
     }
+    //查詢最新行情信息
+    pub async fn get_tickers(&mut self, symbol: String) -> ResponseData {
+        let params = serde_json::json!({
+               "category":"linear",
+                "symbol":symbol
+         });
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/market/tickers".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //查詢市場價格K線數據
+    pub async fn get_kline(&mut self, symbol: String) -> ResponseData {
+        let params = serde_json::json!({
+               "category":"linear",
+                "symbol":symbol,
+                "interval":"1",
+                "limit":"200"
+         });
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/market/kline".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
     //查詢公告
     pub async fn get_announcements(&mut self) -> ResponseData {
-        let  params = serde_json::json!({
+        let params = serde_json::json!({
             "locale":"zh-TW"
          });
         let data = self.request("GET".to_string(),
@@ -87,7 +117,7 @@ impl BybitSwapRest {
     }
     //查詢可交易產品的規格信息
     pub async fn get_instruments_info(&mut self, symbol: String) -> ResponseData {
-        let  params = serde_json::json!({
+        let params = serde_json::json!({
             "category":"linear",
             "symbol":symbol
          });
@@ -101,13 +131,16 @@ impl BybitSwapRest {
     }
 
     //查看持仓信息
-    pub async fn get_positions(&mut self, symbol: String) -> ResponseData {
+    pub async fn get_positions(&mut self, symbol: String, settle_coin: String) -> ResponseData {
         let mut params = serde_json::json!({
             "category":"linear",
          });
         if symbol.len() > 0 {
             params.as_object_mut().unwrap().insert("symbol".parse().unwrap(), serde_json::Value::from(symbol));
         }
+        if settle_coin.len() > 0 {
+            params.as_object_mut().unwrap().insert("settleCoin".parse().unwrap(), serde_json::Value::from(settle_coin));
+        }
         let data = self.request("GET".to_string(),
                                 "/v5".to_string(),
                                 "/position/list".to_string(),
@@ -152,9 +185,10 @@ impl BybitSwapRest {
 
 
     //查詢錢包餘額
-    pub async fn get_account_balance(&mut self) -> ResponseData {
-        let  params = serde_json::json!({
-            "accountType":"UNIFIED"
+    pub async fn get_account_balance(&mut self, symbol: String) -> ResponseData {
+        let params = serde_json::json!({
+            "accountType":"UNIFIED",
+            "coin":symbol
          });
         let data = self.request("GET".to_string(),
                                 "/v5".to_string(),
@@ -216,6 +250,23 @@ impl BybitSwapRest {
         ).await;
         data
     }
+
+    //撤銷所有訂單
+    pub async fn cancel_orders(&mut self, symbol: String) -> ResponseData {
+        let params = serde_json::json!({
+             "category": "linear",
+             "symbol": symbol,
+         });
+        let data = self.request("POST".to_string(),
+                                "/v5".to_string(),
+                                "/order/cancel-all".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+
     /*******************************************************************************************************/
     /*****************************************工具函数********************************************************/
     /*******************************************************************************************************/
@@ -337,7 +388,7 @@ impl BybitSwapRest {
     pub fn sign(access_key: String,
                 secret_key: String,
                 method: String,
-                params: String,   timestamp: String) -> String
+                params: String, timestamp: String) -> String
     {
         /*签名生成*/
         let url_param_str = RestTool::parse_params_to_str(params.clone());

+ 339 - 0
exchanges/src/bybit_swap_ws.rs

@@ -0,0 +1,339 @@
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+
+use chrono::Utc;
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use serde_json::json;
+use tokio::sync::Mutex;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::{info, trace};
+
+use ring::hmac;
+
+use crate::response_base::ResponseData;
+use crate::socket_tool::{AbstractWsMode, HeartbeatType};
+
+//类型
+pub enum BybitSwapWsType {
+    Public,
+    Private,
+}
+
+//订阅频道
+#[derive(Clone)]
+pub enum BybitSwapSubscribeType {
+    PuOrderBook1,
+    PuOrderBook50,
+    PuBlicTrade,
+    PuTickers,
+    PuKline(String),
+
+    PrPosition,
+    PrExecution,
+    PrOrder,
+    PrWallet,
+}
+
+//账号信息
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct BybitSwapLogin {
+    pub api_key: String,
+    pub secret_key: String,
+}
+
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct BybitSwapWs {
+    //类型
+    label: String,
+    //地址
+    address_url: String,
+    //账号
+    login_param: Option<BybitSwapLogin>,
+    //币对
+    symbol_s: Vec<String>,
+    //订阅
+    subscribe_types: Vec<BybitSwapSubscribeType>,
+    //心跳间隔
+    heartbeat_time: u64,
+}
+
+impl BybitSwapWs {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool, login_param: Option<BybitSwapLogin>, ws_type: BybitSwapWsType) -> BybitSwapWs {
+        return BybitSwapWs::new_label("default-BybitSwapWs".to_string(), is_colo, login_param, ws_type);
+    }
+    pub fn new_label(label: String, is_colo: bool, login_param: Option<BybitSwapLogin>, ws_type: BybitSwapWsType) -> BybitSwapWs {
+        /*******公共频道-私有频道数据组装*/
+        let address_url = match ws_type {
+            BybitSwapWsType::Public => {
+                "wss://stream.bybit.com/v5/public/linear?max_alive_time=1m".to_string()
+            }
+            BybitSwapWsType::Private => {
+                "wss://stream.bybit.com/v5/private?max_alive_time=1m".to_string()
+            }
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道",address_url);
+        } else {
+            info!("走普通通道:{}",address_url);
+        }
+
+        BybitSwapWs {
+            label,
+            address_url,
+            login_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            heartbeat_time: 1000 * 10,
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<BybitSwapSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //手动添加币对
+    pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
+        for symbol in b_array.iter_mut() {
+            // 大写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("_", "");
+            *symbol = symbol.replace("-", "");
+        }
+        self.symbol_s = b_array;
+    }
+    //频道是否需要登录
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if match t {
+                BybitSwapSubscribeType::PuOrderBook1 => false,
+                BybitSwapSubscribeType::PuOrderBook50 => false,
+                BybitSwapSubscribeType::PuBlicTrade => false,
+                BybitSwapSubscribeType::PuTickers => false,
+                BybitSwapSubscribeType::PuKline(_) => false,
+
+                BybitSwapSubscribeType::PrPosition => true,
+                BybitSwapSubscribeType::PrExecution => true,
+                BybitSwapSubscribeType::PrOrder => true,
+                BybitSwapSubscribeType::PrWallet => true,
+            } {
+                return true;
+            }
+        }
+        false
+    }
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: BybitSwapSubscribeType) -> String {
+        match subscribe_type {
+            BybitSwapSubscribeType::PuOrderBook1 => {
+                format!("orderbook.1.{}", symbol)
+            }
+            BybitSwapSubscribeType::PuOrderBook50 => {
+                format!("orderbook.50.{}", symbol)
+            }
+            BybitSwapSubscribeType::PuBlicTrade => {
+                format!("publicTrade.{}", symbol)
+            }
+            BybitSwapSubscribeType::PuTickers => {
+                format!("tickers.{}", symbol)
+            }
+            BybitSwapSubscribeType::PuKline(t) => {
+                format!("kline.{}.{}", t, symbol)
+            }
+
+            BybitSwapSubscribeType::PrPosition => {
+                format!("position")
+            }
+            BybitSwapSubscribeType::PrExecution => {
+                format!("execution")
+            }
+            BybitSwapSubscribeType::PrOrder => {
+                format!("order")
+            }
+            BybitSwapSubscribeType::PrWallet => {
+                format!("wallet")
+            }
+        }
+    }
+    //订阅信息生成
+    pub fn get_subscription(&self) -> String {
+        let mut params = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
+                params.push(ty_str);
+            }
+        }
+
+        let str = json!({
+                "op": "subscribe",
+                "args": params
+            });
+        str.to_string()
+    }
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    //链接
+    pub async fn ws_connect_async(&mut self,
+                                  bool_v1: Arc<AtomicBool>,
+                                  write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                  write_rx: UnboundedReceiver<Message>,
+                                  read_tx: UnboundedSender<ResponseData>) -> Result<(), Error>
+    {
+        let login_is = self.contains_pr();
+        let subscription = self.get_subscription();
+        let address_url = self.address_url.clone();
+        let label = self.label.clone();
+        let timestamp = Utc::now().timestamp_millis();
+        let login_param = self.login_param.clone();
+        let (api_key, secret_key) = match login_param {
+            None => { ("".to_string(), "".to_string()) }
+            Some(p) => {
+                (p.api_key.clone().to_string(), p.secret_key.clone().to_string())
+            }
+        };
+        let heartbeat_time = self.heartbeat_time.clone();
+        trace!("{:?}",format!("{}",subscription));
+
+
+        //心跳-- 方法内部线程启动
+        let write_tx_clone1 = Arc::clone(write_tx_am);
+        tokio::spawn(async move {
+            trace!("线程-异步心跳-开始");
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
+            trace!("线程-异步心跳-结束");
+        });
+
+        //设置订阅
+        let mut subscribe_array = vec![];
+        if login_is {
+            let expires = timestamp + 1000;
+            let message = format!("GET/realtime{}", expires);
+
+            let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+            let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
+            let signature = hex::encode(result.as_ref());
+
+            //登录相关
+            let str = json!({
+                "op": "auth",
+                "args": [api_key, expires, signature]
+            });
+            subscribe_array.push(str.to_string());
+        }
+        subscribe_array.push(subscription.to_string());
+
+        //链接
+        let t2 = tokio::spawn(async move {
+            trace!("线程-异步链接-开始");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array,
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:bitget-期货链接关闭-{:?}",e); }
+            }
+        });
+        tokio::try_join!(t2).unwrap();
+        trace!("线程-心跳与链接-结束");
+
+        Ok(())
+    }
+    /*******************************************************************************************************/
+    /*****************************************数据解析*****************************************************/
+    /*******************************************************************************************************/
+    //数据解析-Text
+    pub fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text);
+        Option::from(response_data)
+    }
+    //数据解析-ping
+    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "".to_string()));
+    }
+    //数据解析-pong
+    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
+    }
+    //数据解析
+    pub fn ok_text(text: String) -> ResponseData {
+        // trace!("原始数据");
+        // trace!(?text);
+        let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), "".to_string());
+        let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
+        if json_value.get("success").is_some() {
+            //订阅内容
+            let success = json_value["success"].as_bool().unwrap();
+            // let ret_msg = json_value["ret_msg"].as_str().unwrap();
+            let op = json_value["op"].as_str().unwrap();
+            let success_error = if success {
+                "成功"
+            } else {
+                "失败"
+            };
+
+            if op == "auth" {
+                res_data.code = "-200".to_string();
+                res_data.message = format!("登录{}", success_error);
+            } else if op == "subscribe" {
+                res_data.message = format!("订阅{}", success_error);
+                res_data.code = "-201".to_string();
+            } else {
+                res_data.code = "".to_string();
+                res_data.channel = "未知订阅".to_string();
+            }
+        } else if json_value.get("topic").is_some() && json_value.get("data").is_some() {
+            let channel = json_value["topic"].to_string();
+            res_data.data = format!("{}", json_value.get("data").as_ref().unwrap());
+
+            res_data.code = "200".to_string();
+
+            if channel.contains("orderbook") {
+                res_data.channel = "orderbook".to_string();
+                res_data.data_type = json_value["type"].as_str().unwrap().to_string();
+                // bybit 时间在data块外
+                res_data.reach_time = json_value.get("ts").unwrap().as_i64().unwrap_or(0i64);
+            } else if channel.contains("publicTrade") {
+                res_data.channel = "trade".to_string();
+                res_data.data_type = json_value["type"].as_str().unwrap().to_string();
+            } else if channel.contains("tickers") {
+                res_data.channel = "tickers".to_string();
+            } else if channel.contains("kline") {
+                res_data.channel = "kline".to_string();
+            } else if channel.contains("position") {
+                res_data.channel = "position".to_string();
+            } else if channel.contains("execution") {
+                res_data.channel = "execution".to_string();
+            } else if channel.contains("order") {
+                res_data.channel = "order".to_string();
+            } else if channel.contains("wallet") {
+                res_data.channel = "wallet".to_string();
+            } else {
+                res_data.code = "".to_string();
+                res_data.channel = "未知的频道".to_string();
+            }
+        } else {
+            //推送数据
+            res_data.code = "".to_string();
+            res_data.channel = "未知的频道".to_string();
+        }
+
+        res_data
+    }
+}

+ 10 - 9
exchanges/src/crypto_spot_ws.rs

@@ -111,7 +111,6 @@ impl CryptoSpotWs {
                 CryptoSpotSubscribeType::PuTicker => false,
                 CryptoSpotSubscribeType::PuTrade => false,
                 CryptoSpotSubscribeType::PuCandlestick => false,
-
             } {
                 return true;
             }
@@ -199,14 +198,16 @@ impl CryptoSpotWs {
         //链接
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
-                                             label.clone(), subscribe_array,
-                                             write_rx, read_tx,
-                                             Self::message_text,
-                                             Self::message_ping,
-                                             Self::message_pong,
-            ).await.expect("币安-期货");
-            trace!("线程-异步链接-结束");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array,
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:crypt-期货链接关闭-{:?}",e); }
+            }
         });
         tokio::try_join!(t2).unwrap();
         trace!("线程-心跳与链接-结束");

+ 22 - 11
exchanges/src/gate_swap_ws.rs

@@ -1,5 +1,6 @@
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
+use chrono::Utc;
 
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
 use hex;
@@ -87,7 +88,7 @@ impl GateSwapWs {
             login_param,
             symbol_s: vec![],
             subscribe_types: vec![],
-            heartbeat_time: 1000 * 20,
+            heartbeat_time: 1000 * 10,
         }
     }
 
@@ -255,13 +256,18 @@ impl GateSwapWs {
         let address_url = self.address_url.clone();
         let label = self.label.clone();
         let heartbeat_time = self.heartbeat_time.clone();
+        let timestamp = Utc::now().timestamp();
 
 
         //心跳-- 方法内部线程启动
         let write_tx_clone1 = Arc::clone(write_tx_am);
         tokio::spawn(async move {
             trace!("线程-异步心跳-开始");
-            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
+            let ping_str = json!({
+                "time" : timestamp,
+                "channel" : "futures.ping",
+            });
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.to_string()), heartbeat_time).await;
             trace!("线程-异步心跳-结束");
         });
 
@@ -277,14 +283,16 @@ impl GateSwapWs {
         //链接
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
-                                             label.clone(), subscribe_array,
-                                             write_rx, read_tx,
-                                             Self::message_text,
-                                             Self::message_ping,
-                                             Self::message_pong,
-            ).await.expect("币安-期货");
-            trace!("线程-异步链接-结束");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array,
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:gate-期货链接关闭-{:?}",e); }
+            }
         });
         tokio::try_join!(t2).unwrap();
         trace!("线程-心跳与链接-结束");
@@ -314,7 +322,10 @@ impl GateSwapWs {
         let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), "".to_string());
         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
 
-        if json_value.get("error").is_some() {
+        if json_value["channel"].as_str() == Option::from("futures.pong") {
+            res_data.code = "-301".to_string();
+            res_data.message = "success".to_string();
+        } else if json_value.get("error").is_some() {
             let message = json_value["error"]["message"].as_str().unwrap().to_string();
             let mes = message.trim_end_matches('\n');
 

+ 11 - 9
exchanges/src/kucoin_spot_ws.rs

@@ -306,14 +306,16 @@ impl KucoinSpotWs {
 
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
-                                             label.clone(), subscribe_array.clone(),
-                                             write_rx, read_tx,
-                                             Self::message_text,
-                                             Self::message_ping,
-                                             Self::message_pong,
-            ).await.expect("kucoin-现货");
-            trace!("线程-异步链接-结束");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array.clone(),
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:kucoin-现货链接关闭-{:?}",e); }
+            }
         });
         tokio::try_join!(t2).unwrap();
         trace!("线程-心跳与链接-结束");
@@ -326,7 +328,7 @@ impl KucoinSpotWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let  response_data = Self::ok_text(text);
+        let response_data = Self::ok_text(text);
         Option::from(response_data)
     }
     //数据解析-ping

+ 13 - 10
exchanges/src/kucoin_swap_ws.rs

@@ -31,7 +31,8 @@ pub struct KucoinSwapWsParam {
 //订阅频道
 #[derive(Clone)]
 pub enum KucoinSwapSubscribeType {
-    PuContractMarketLevel2Depth50,//买卖盘 快照,asks:卖,bids:买入
+    PuContractMarketLevel2Depth50,
+    //买卖盘 快照,asks:卖,bids:买入
     PuContractMarketExecution,
     PuContractMarkettickerV2,
 
@@ -320,14 +321,16 @@ impl KucoinSwapWs {
 
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
-                                             label.clone(), subscribe_array,
-                                             write_rx, read_tx,
-                                             Self::message_text,
-                                             Self::message_ping,
-                                             Self::message_pong,
-            ).await.expect("kucoin-期货");
-            trace!("线程-异步链接-结束");
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+                                                   label.clone(), subscribe_array,
+                                                   write_rx, read_tx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:kucoin-期货链接关闭-{:?}",e); }
+            }
         });
         tokio::try_join!(t2).unwrap();
         trace!("线程-心跳与链接-结束");
@@ -340,7 +343,7 @@ impl KucoinSwapWs {
     /*******************************************************************************************************/
     //数据解析-Text
     pub fn message_text(text: String) -> Option<ResponseData> {
-        let  response_data = Self::ok_text(text);
+        let response_data = Self::ok_text(text);
         Option::from(response_data)
     }
     //数据解析-ping

+ 1 - 0
exchanges/src/lib.rs

@@ -22,4 +22,5 @@ pub mod kucoin_spot_rest;
 pub mod crypto_spot_ws;
 pub mod bybit_swap_rest;
 pub mod xlsx_utils;
+pub mod bybit_swap_ws;
 

+ 5 - 3
exchanges/src/okx_swap_ws.rs

@@ -304,14 +304,16 @@ impl OkxSwapWs {
         //链接
         let t2 = tokio::spawn(async move {
             trace!("线程-异步链接-开始");
-            AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
+            match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
                                              label.clone(), subscribe_array,
                                              write_rx, read_tx,
                                              Self::message_text,
                                              Self::message_ping,
                                              Self::message_pong,
-            ).await.expect("OKX-期货");
-            trace!("线程-异步链接-结束");
+            ).await{
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:okx-期货链接关闭-{:?}",e); }
+            }
         });
         tokio::try_join!(t2).unwrap();
         trace!("线程-心跳与链接-结束");

+ 5 - 3
exchanges/src/response_base.rs

@@ -6,13 +6,14 @@ pub struct ResponseData {
     pub message: String,
     pub channel: String,
     pub data: String,
-    pub time: i64,
-    pub reach_time: i64,
+    pub time: i64,       //数据接受的时间
+    pub reach_time: i64, //远程数据时间 弃用
+    pub data_type: String // 數據類型, 例如 bybit 深度信息:snapshot(全量),delta(增量)
 }
 
 impl ResponseData {
     pub fn new(label: String, code: String, message: String, data: String) -> ResponseData {
-        ResponseData { label, code, message, data, channel: "".to_string(), time: 0, reach_time: 0 }
+        ResponseData { label, code, message, data, channel: "".to_string(), time: 0, reach_time: 0 , data_type: String::new()}
     }
     pub fn error(label: String, message: String) -> ResponseData {
         ResponseData {
@@ -23,6 +24,7 @@ impl ResponseData {
             channel: "".to_string(),
             time: 0,
             reach_time: 0,
+            data_type: String::new()
         }
     }
 

+ 185 - 105
exchanges/src/socket_tool.rs

@@ -23,6 +23,7 @@ use crate::response_base::ResponseData;
 pub enum HeartbeatType {
     Ping,
     Pong,
+    Custom(String),
 }
 
 pub struct AbstractWsMode {}
@@ -56,123 +57,199 @@ impl AbstractWsMode {
         };
 
         loop {
-            let (ws_stream, _) = connect_async(address_url.clone(), proxy).await?;
-            trace!("WebSocket 握手完成。");
-            let (write, mut read) = ws_stream.split();
-
-            let write_arc = Arc::new(Mutex::new(write));
-            let write_clone = Arc::clone(&write_arc);
-            //订阅写入(包括订阅信息 )
-            trace!("订阅内容:{:?}",subscribe_array.clone());
-            for s in &subscribe_array {
-                let mut write_lock = write_clone.lock().await;
-                write_lock.send(Message::Text(s.parse().unwrap())).await?;
-            }
+            match connect_async(address_url.clone(), proxy).await {
+                Ok((ws_stream, _)) => {
+                    trace!("WebSocket 握手完成。");
+                    let (write, mut read) = ws_stream.split();
+
+                    let write_arc = Arc::new(Mutex::new(write));
+                    let write_clone = Arc::clone(&write_arc);
+                    //订阅写入(包括订阅信息 )
+                    trace!("订阅内容:{:?}",subscribe_array.clone());
+                    for s in &subscribe_array {
+                        let mut write_lock = write_clone.lock().await;
+                        write_lock.send(Message::Text(s.parse().unwrap())).await?;
+                    }
 
-            //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
-            // let stdin_to_ws = write_rx.map(Ok).forward(write);
-            // Writing task
-            let write_clone2 = Arc::clone(&write_arc);
-            let stdin_to_ws = async {
-                while let Some(message) = write_rx.next().await {
-                    let mut write_lock2 = write_clone2.lock().await;
-                    write_lock2.send(message).await?;
-                }
-                Ok::<(), tokio_tungstenite::tungstenite::Error>(())
-            };
-            let write_clone3 = Arc::clone(&write_arc);
-            let ws_to_stdout = async {
-                while let Some(message) = read.next().await {
-                    let mut write_lock3 = write_clone3.lock().await;
-                    let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
-                    // let response_data = func(message);
-                    if response_data.is_some() {
-                        let mut data = response_data.unwrap();
-                        data.label = lable.clone();
-                        let code = data.code.clone();
-                        /*
-                            200 -正确返回
-                           -200 -登录成功
-                           -201 -订阅成功
-                           -300 -客户端收到服务器心跳ping,需要响应
-                           -301 -客户端收到服务器心跳pong,需要响应
-                           -302 -客户端收到服务器心跳自定义,需要响应自定义
-                        */
-                        match code.as_str() {
-                            "200" => {
-                                if bool_v1.load(Ordering::Relaxed) {
-                                    read_tx.unbounded_send(data).unwrap();
-                                }
-                            }
-                            "-200" => {
-                                //登录成功
-                                trace!("登录成功:{:?}", data);
-                            }
-                            "-201" => {
-                                //订阅成功
-                                trace!("订阅成功:{:?}", data);
-                            }
-                            "-300" => {
-                                //服务器发送心跳 ping 给客户端,客户端需要pong回应
-                                trace!("服务器响应-ping");
-                                if data.data.len() > 0 {
-                                    write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
-                                    trace!("客户端回应服务器-pong");
-                                }
-                            }
-                            "-301" => {
-                                //服务器发送心跳 pong 给客户端,客户端需要ping回应
-                                trace!("服务器响应-pong");
-                                if data.data.len() > 0 {
-                                    write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
-                                    trace!("客户端回应服务器-ping");
+                    //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
+                    // let stdin_to_ws = write_rx.map(Ok).forward(write);
+                    // Writing task
+                    let write_clone2 = Arc::clone(&write_arc);
+                    let stdin_to_ws = async {
+                        while let Some(message) = write_rx.next().await {
+                            let mut write_lock2 = write_clone2.lock().await;
+                            write_lock2.send(message).await?;
+                        }
+                        Ok::<(), tokio_tungstenite::tungstenite::Error>(())
+                    };
+                    let write_clone3 = Arc::clone(&write_arc);
+                    let ws_to_stdout = async {
+                        while let Some(message) = read.next().await {
+                            let mut write_lock3 = write_clone3.lock().await;
+                            let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
+                            // let response_data = func(message);
+                            if response_data.is_some() {
+                                let mut data = response_data.unwrap();
+                                data.label = lable.clone();
+                                let code = data.code.clone();
+                                /*
+                                    200 -正确返回
+                                   -200 -登录成功
+                                   -201 -订阅成功
+                                   -300 -客户端收到服务器心跳ping,需要响应
+                                   -301 -客户端收到服务器心跳pong,需要响应
+                                   -302 -客户端收到服务器心跳自定义,需要响应自定义
+                                */
+                                match code.as_str() {
+                                    "200" => {
+                                        if bool_v1.load(Ordering::Relaxed) {
+                                            read_tx.unbounded_send(data).unwrap();
+                                        }
+                                    }
+                                    "-200" => {
+                                        //登录成功
+                                        trace!("登录成功:{:?}", data);
+                                    }
+                                    "-201" => {
+                                        //订阅成功
+                                        trace!("订阅成功:{:?}", data);
+                                    }
+                                    "-300" => {
+                                        //服务器发送心跳 ping 给客户端,客户端需要pong回应
+                                        trace!("服务器响应-ping");
+                                        if data.data.len() > 0 {
+                                            write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
+                                            trace!("客户端回应服务器-pong");
+                                        }
+                                    }
+                                    "-301" => {
+                                        //服务器发送心跳 pong 给客户端,客户端需要ping回应
+                                        trace!("服务器响应-pong");
+                                        if data.data.len() > 0 {
+                                            write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
+                                            trace!("客户端回应服务器-ping");
+                                        }
+                                    }
+                                    "-302" => {
+                                        //客户端收到服务器心跳自定义,需要响应自定义
+                                        trace!("特定字符心跳,特殊响应:{:?}", data);
+                                        write_lock3.send(Message::Text(data.data)).await?;
+                                        trace!("特殊字符心跳-回应完成");
+                                    }
+                                    _ => {
+                                        trace!("未知:{:?}",data);
+                                    }
                                 }
                             }
-                            "-302" => {
-                                //客户端收到服务器心跳自定义,需要响应自定义
-                                trace!("特定字符心跳,特殊响应:{:?}", data);
-                                write_lock3.send(Message::Text(data.data)).await?;
-                                trace!("特殊字符心跳-回应完成");
-                            }
-                            _ => {
-                                trace!("未知:{:?}",data);
-                            }
                         }
-                    }
+                        Ok::<(), tokio_tungstenite::tungstenite::Error>(())
+                    };
+
+                    //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
+                    pin_mut!(stdin_to_ws, ws_to_stdout,);
+                    future::select(stdin_to_ws, ws_to_stdout).await;
+                }
+                Err(e) => {
+                    trace!("WebSocket 握手失败:{:?}",e);
                 }
-                Ok::<(), tokio_tungstenite::tungstenite::Error>(())
-            };
-            // let ws_to_stdout = {
-            //     trace!("---1");
-            //     //读,循环读取,然后拿到 message,,然后开启异步处理 message,
-            //     let result = read.for_each(|message| async {
-            //         let response_data = func(message);
+            }
+            trace!("---5");
+            trace!("---4");
+            trace!("重启...");
+
+            // let (ws_stream, _) = connect_async(address_url.clone(), proxy).await.unwrap();
+            // trace!("WebSocket 握手完成。");
+            // let (write, mut read) = ws_stream.split();
+            //
+            // let write_arc = Arc::new(Mutex::new(write));
+            // let write_clone = Arc::clone(&write_arc);
+            // //订阅写入(包括订阅信息 )
+            // trace!("订阅内容:{:?}",subscribe_array.clone());
+            // for s in &subscribe_array {
+            //     let mut write_lock = write_clone.lock().await;
+            //     write_lock.send(Message::Text(s.parse().unwrap())).await?;
+            // }
+            //
+            // //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
+            // // let stdin_to_ws = write_rx.map(Ok).forward(write);
+            // // Writing task
+            // let write_clone2 = Arc::clone(&write_arc);
+            // let stdin_to_ws = async {
+            //     while let Some(message) = write_rx.next().await {
+            //         let mut write_lock2 = write_clone2.lock().await;
+            //         write_lock2.send(message).await?;
+            //     }
+            //     Ok::<(), tokio_tungstenite::tungstenite::Error>(())
+            // };
+            // let write_clone3 = Arc::clone(&write_arc);
+            // let ws_to_stdout = async {
+            //     while let Some(message) = read.next().await {
+            //         let mut write_lock3 = write_clone3.lock().await;
+            //         let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
+            //         // let response_data = func(message);
             //         if response_data.is_some() {
             //             let mut data = response_data.unwrap();
             //             data.label = lable.clone();
             //             let code = data.code.clone();
-            //             if code.as_str() == "-1" {
-            //                 // let close_frame = CloseFrame {
-            //                 //     code: CloseCode::Normal,
-            //                 //     reason: Cow::Borrowed("Bye bye"),
-            //                 // };
-            //                 // let close_message = Message::Close(Some(close_frame));
-            //                 // write.send(close_message);
-            //             } else if code.as_str() == "200" {
-            //                 read_tx.unbounded_send(data).unwrap();
+            //             /*
+            //                 200 -正确返回
+            //                -200 -登录成功
+            //                -201 -订阅成功
+            //                -300 -客户端收到服务器心跳ping,需要响应
+            //                -301 -客户端收到服务器心跳pong,需要响应
+            //                -302 -客户端收到服务器心跳自定义,需要响应自定义
+            //             */
+            //             match code.as_str() {
+            //                 "200" => {
+            //                     if bool_v1.load(Ordering::Relaxed) {
+            //                         read_tx.unbounded_send(data).unwrap();
+            //                     }
+            //                 }
+            //                 "-200" => {
+            //                     //登录成功
+            //                     trace!("登录成功:{:?}", data);
+            //                 }
+            //                 "-201" => {
+            //                     //订阅成功
+            //                     trace!("订阅成功:{:?}", data);
+            //                 }
+            //                 "-300" => {
+            //                     //服务器发送心跳 ping 给客户端,客户端需要pong回应
+            //                     trace!("服务器响应-ping");
+            //                     if data.data.len() > 0 {
+            //                         write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
+            //                         trace!("客户端回应服务器-pong");
+            //                     }
+            //                 }
+            //                 "-301" => {
+            //                     //服务器发送心跳 pong 给客户端,客户端需要ping回应
+            //                     trace!("服务器响应-pong");
+            //                     if data.data.len() > 0 {
+            //                         write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
+            //                         trace!("客户端回应服务器-ping");
+            //                     }
+            //                 }
+            //                 "-302" => {
+            //                     //客户端收到服务器心跳自定义,需要响应自定义
+            //                     trace!("特定字符心跳,特殊响应:{:?}", data);
+            //                     write_lock3.send(Message::Text(data.data)).await?;
+            //                     trace!("特殊字符心跳-回应完成");
+            //                 }
+            //                 _ => {
+            //                     trace!("未知:{:?}",data);
+            //                 }
             //             }
             //         }
-            //     });
-            //     trace!("---3");
-            //     result
+            //     }
+            //     Ok::<(), tokio_tungstenite::tungstenite::Error>(())
             // };
-
-            //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
-            pin_mut!(stdin_to_ws, ws_to_stdout,);
-            future::select(stdin_to_ws, ws_to_stdout).await;
-            trace!("---5");
-            trace!("---4");
-            trace!("重启...");
+            //
+            // //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
+            // pin_mut!(stdin_to_ws, ws_to_stdout,);
+            // future::select(stdin_to_ws, ws_to_stdout).await;
+            // trace!("---5");
+            // trace!("---4");
+            // trace!("重启...");
         }
         // return Ok(());
     }
@@ -190,6 +267,9 @@ impl AbstractWsMode {
                     HeartbeatType::Pong => {
                         Message::Pong(Vec::from("Pong"))
                     }
+                    HeartbeatType::Custom(ref str) => {
+                        Message::Text(str.parse().unwrap())
+                    }
                 }
             ).expect("发送失败");
             trace!("发送指令-心跳:{:?}",h_type);

+ 186 - 86
exchanges/tests/bybit_swap_test.rs

@@ -1,81 +1,162 @@
 use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use futures_util::StreamExt;
+use tokio::sync::Mutex;
+use tracing::trace;
 
 use exchanges::bybit_swap_rest::BybitSwapRest;
+use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
 
 const ACCESS_KEY: &str = "";
 const SECRET_KEY: &str = "";
 
 
 //ws-订阅公共频道信息
-// #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
-// async fn ws_custom_subscribe_pu() {
-//     global::log_utils::init_log_with_trace();
-//
-//
-//     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-//     let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
-//
-//
-//     let mut ws = get_ws(None, BybitSwapWsType::Public).await;
-//     ws.set_symbols(vec!["BTC_USDT".to_string()]);
-//     ws.set_subscribe(vec![
-//         // BybitSwapSubscribeType::PuBooks5,
-//         // BybitSwapSubscribeType::Putrades,
-//         BybitSwapSubscribeType::PuBooks50L2tbt,
-//         // BybitSwapSubscribeType::PuIndexTickers,
-//     ]);
-//
-//
-//     let write_tx_am = Arc::new(Mutex::new(write_tx));
-//     let bool_v1 = Arc::new(AtomicBool::new(true));
-//
-//     //读取
-//     let _bool_v1_clone = Arc::clone(&bool_v1);
-//     let _tr = tokio::spawn(async move {
-//         trace!("线程-数据读取-开启");
-//         loop {
-//             if let Some(data) = read_rx.next().await {
-//                 trace!("读取数据data:{:?}",data)
-//             }
-//         }
-//         // trace!("线程-数据读取-结束");
-//     });
-//
-//     //写数据
-//     // let bool_v2_clone = Arc::clone(&bool_v1);
-//     // let write_tx_clone = Arc::clone(&write_tx_am);
-//     // let su = ws.get_subscription();
-//     // let tw = tokio::spawn(async move {
-//     //     trace!("线程-数据写入-开始");
-//     //     loop {
-//     //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
-//     //         // let close_frame = CloseFrame {
-//     //         //     code: CloseCode::Normal,
-//     //         //     reason: Cow::Borrowed("Bye bye"),
-//     //         // };
-//     //         // let message = Message::Close(Some(close_frame));
-//     //
-//     //
-//     //         let message = Message::Text(su.clone());
-//     //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
-//     //         trace!("发送指令成功");
-//     //     }
-//     //     trace!("线程-数据写入-结束");
-//     // });
-//
-//     let t1 = tokio::spawn(async move {
-//         //链接
-//         let bool_v3_clone = Arc::clone(&bool_v1);
-//         ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-//         trace!("test 唯一线程结束--");
-//     });
-//     tokio::try_join!(t1).unwrap();
-//     trace!("当此结束");
-//     trace!("重启!");
-//     trace!("参考交易所关闭");
-//     return;
-// }
-//
+#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
+async fn ws_custom_subscribe_pu() {
+    global::log_utils::init_log_with_trace();
+
+
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+
+
+    let mut ws = get_ws(None, BybitSwapWsType::Public).await;
+    ws.set_symbols(vec!["BTC_USDT".to_string()]);
+    ws.set_subscribe(vec![
+        // BybitSwapSubscribeType::PuOrderBook1,
+        // BybitSwapSubscribeType::PuOrderBook50,
+        // BybitSwapSubscribeType::PuBlicTrade,
+        BybitSwapSubscribeType::PuTickers,
+    ]);
+
+
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    let bool_v1 = Arc::new(AtomicBool::new(true));
+
+    //读取
+    let _bool_v1_clone = Arc::clone(&bool_v1);
+    let _tr = tokio::spawn(async move {
+        trace!("线程-数据读取-开启");
+        loop {
+            if let Some(data) = read_rx.next().await {
+                trace!("读取数据data:{:?}",data)
+            }
+        }
+        // trace!("线程-数据读取-结束");
+    });
+
+    //写数据
+    // let bool_v2_clone = Arc::clone(&bool_v1);
+    // let write_tx_clone = Arc::clone(&write_tx_am);
+    // let su = ws.get_subscription();
+    // let tw = tokio::spawn(async move {
+    //     trace!("线程-数据写入-开始");
+    //     loop {
+    //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
+    //         // let close_frame = CloseFrame {
+    //         //     code: CloseCode::Normal,
+    //         //     reason: Cow::Borrowed("Bye bye"),
+    //         // };
+    //         // let message = Message::Close(Some(close_frame));
+    //
+    //
+    //         let message = Message::Text(su.clone());
+    //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
+    //         trace!("发送指令成功");
+    //     }
+    //     trace!("线程-数据写入-结束");
+    // });
+
+    let t1 = tokio::spawn(async move {
+        //链接
+        let bool_v3_clone = Arc::clone(&bool_v1);
+        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        trace!("test 唯一线程结束--");
+    });
+    tokio::try_join!(t1).unwrap();
+    trace!("当此结束");
+    trace!("重启!");
+    trace!("参考交易所关闭");
+    return;
+}
+
+
+//ws-订阅私有频道信息
+#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
+async fn ws_custom_subscribe_pr() {
+    global::log_utils::init_log_with_trace();
+
+
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+
+    let logparam = BybitSwapLogin {
+        api_key: ACCESS_KEY.to_string(),
+        secret_key: SECRET_KEY.to_string(),
+    };
+
+    let mut ws = get_ws(Option::from(logparam), BybitSwapWsType::Private).await;
+    ws.set_symbols(vec!["BTC_USDT".to_string()]);
+    ws.set_subscribe(vec![
+        BybitSwapSubscribeType::PrPosition,
+        // BybitSwapSubscribeType::PrExecution,
+        // BybitSwapSubscribeType::PrOrder,
+        // BybitSwapSubscribeType::PrWallet,
+    ]);
+
+
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    let bool_v1 = Arc::new(AtomicBool::new(true));
+
+    //读取
+    let _bool_v1_clone = Arc::clone(&bool_v1);
+    let _tr = tokio::spawn(async move {
+        trace!("线程-数据读取-开启");
+        loop {
+            if let Some(data) = read_rx.next().await {
+                trace!("读取数据data:{:?}",data)
+            }
+        }
+        // trace!("线程-数据读取-结束");
+    });
+
+    //写数据
+    // let bool_v2_clone = Arc::clone(&bool_v1);
+    // let write_tx_clone = Arc::clone(&write_tx_am);
+    // let su = ws.get_subscription();
+    // let tw = tokio::spawn(async move {
+    //     trace!("线程-数据写入-开始");
+    //     loop {
+    //         tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
+    //         // let close_frame = CloseFrame {
+    //         //     code: CloseCode::Normal,
+    //         //     reason: Cow::Borrowed("Bye bye"),
+    //         // };
+    //         // let message = Message::Close(Some(close_frame));
+    //
+    //
+    //         let message = Message::Text(su.clone());
+    //         AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
+    //         trace!("发送指令成功");
+    //     }
+    //     trace!("线程-数据写入-结束");
+    // });
+
+    let t1 = tokio::spawn(async move {
+        //链接
+        let bool_v3_clone = Arc::clone(&bool_v1);
+        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        trace!("test 唯一线程结束--");
+    });
+    tokio::try_join!(t1).unwrap();
+    trace!("当此结束");
+    trace!("重启!");
+    trace!("参考交易所关闭");
+    return;
+}
+
 
 //rest-服務器時間
 #[tokio::test]
@@ -87,6 +168,27 @@ async fn rest_get_server_time_test() {
     println!("Bybit--服務器時間--{:?}", req_data);
 }
 
+//rest-查詢最新行情信息
+#[tokio::test]
+async fn rest_get_tickers_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_tickers("DOGEUSDT".to_string()).await;
+    println!("Bybit--查詢最新行情信息--{:?}", req_data);
+}
+
+//rest-查詢市場價格K線數據
+#[tokio::test]
+async fn rest_get_kline_test() {
+    global::log_utils::init_log_with_trace();
+
+    let mut ret = get_rest();
+    let req_data = ret.get_kline("DOGEUSDT".to_string()).await;
+    println!("Bybit--查詢市場價格K線數據--{:?}", req_data);
+}
+
+
 //rest-查詢公告
 #[tokio::test]
 async fn rest_get_announcements_test() {
@@ -114,7 +216,7 @@ async fn rest_get_account_balance_test() {
     global::log_utils::init_log_with_trace();
 
     let mut ret = get_rest();
-    let req_data = ret.get_account_balance().await;
+    let req_data = ret.get_account_balance("USDT".to_string()).await;
     println!("Bybit--查詢錢包餘額--{:?}", req_data);
 }
 
@@ -124,7 +226,7 @@ async fn rest_get_positions_test() {
     global::log_utils::init_log_with_trace();
 
     let mut ret = get_rest();
-    let req_data = ret.get_positions("DOGEUSDT".to_string()).await;
+    let req_data = ret.get_positions("DOGEUSDT".to_string(), "".to_string()).await;
     println!("Bybit--查看持仓信息--{:?}", req_data);
 }
 
@@ -175,7 +277,7 @@ async fn rest_get_order_test() {
     global::log_utils::init_log_with_trace();
 
     let mut ret = get_rest();
-    let req_data = ret.get_order("DOGEUSDT".to_string(),
+    let req_data = ret.get_order("LINKUSDT".to_string(),
                                  "".to_string(), "".to_string()).await;
     println!("Bybit--查詢實時委託單--{:?}", req_data);
 }
@@ -192,23 +294,21 @@ async fn rest_cancel_order_test() {
     println!("Bybit--撤单--{:?}", req_data);
 }
 
+//rest-撤銷所有訂單
+#[tokio::test]
+async fn rest_cancel_orders_test() {
+    global::log_utils::init_log_with_trace();
 
-// //rest-查詢實時委託單
-// #[tokio::test]
-// async fn rest_get_order_test() {
-//     global::log_utils::init_log_with_trace();
-//
-//     let mut ret = get_rest();
-//     let req_data = ret.get_order("BTCUSDT".to_string(),
-//                                  "".to_string(), "".to_string()).await;
-//     println!("Bybit--查詢可交易產品的規格信息--{:?}", req_data);
-// }
+    let mut ret = get_rest();
+    let req_data = ret.cancel_orders("DOGEUSDT".to_string()).await;
+    println!("Bybit--撤銷所有訂單--{:?}", req_data);
+}
 
 
-// async fn get_ws(btree_map: Option<BybitSwapLogin>, type_v: BybitSwapWsType) -> BybitSwapWs {
-//     let ku_ws = BybitSwapWs::new(false, btree_map, type_v);
-//     ku_ws
-// }
+async fn get_ws(btree_map: Option<BybitSwapLogin>, type_v: BybitSwapWsType) -> BybitSwapWs {
+    let ku_ws = BybitSwapWs::new(false, btree_map, type_v);
+    ku_ws
+}
 
 fn get_rest() -> BybitSwapRest {
     let mut btree_map: BTreeMap<String, String> = BTreeMap::new();

+ 1 - 1
global/src/params.rs

@@ -48,7 +48,7 @@ pub struct Params {
     // 日志级别,从低到高依次是:[trace, debug, info, warn, error]
     pub log_level: String,
     // 中控端口
-    pub port: u32,
+    pub port: u32
 }
 
 impl Params {

+ 1 - 1
global/src/public_params.rs

@@ -29,6 +29,6 @@ pub const COINEX_USDT_SWAP_LIMIT:i64 = 20;
 pub const OKEX_USDT_SWAP_LIMIT:i64 = 30;
 pub const BITGET_USDT_SWAP_LIMIT:i64 = 10;
 pub const BITGET_USDT_SPOT_LIMIT:i64 = 100;
-pub const BYBIT_USDT_SWAP_LIMIT:i64 = 1;
+pub const BYBIT_USDT_SWAP_LIMIT:i64 = 10;
 pub const MEXC_SPOT_LIMIT:i64 = 333;
 pub const RATIO:i64 = 4;

+ 2 - 0
src/main.rs

@@ -79,4 +79,6 @@ async fn main() {
     let mut quant = quant_arc.lock().await;
     quant.exit().await;
     info!("程序已退出!为以防万一,请再次检查仓位和订单!");
+    // 强制退出
+    std::process::exit(0);
 }

+ 770 - 0
standard/src/bybit_swap.rs

@@ -0,0 +1,770 @@
+use std::collections::{BTreeMap, HashMap};
+use std::io::{Error, ErrorKind};
+use std::str::FromStr;
+use tokio::sync::mpsc::Sender;
+use async_trait::async_trait;
+use rust_decimal::Decimal;
+use serde_json::{from_str, from_value, json, Value};
+use futures::stream::FuturesUnordered;
+use futures::{TryStreamExt};
+use rust_decimal::prelude::FromPrimitive;
+use serde::{Deserialize, Serialize};
+use tracing::{error, debug, trace};
+use exchanges::bybit_swap_rest::BybitSwapRest;
+use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, PositionModeEnum};
+use global::trace_stack::TraceStack;
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct SwapTicker{
+    symbol: String,
+    high_price24h: Decimal,
+    low_price24h: Decimal,
+    bid1_price: Decimal,
+    ask1_price: Decimal,
+    last_price: Decimal,
+    volume24h: Decimal
+}
+
+#[allow(dead_code)]
+#[derive(Clone)]
+pub struct BybitSwap {
+    exchange: ExchangeEnum,
+    symbol: String,
+    symbol_uppercase: String,
+    is_colo: bool,
+    params: BTreeMap<String, String>,
+    request: BybitSwapRest,
+    market: Market,
+    order_sender: Sender<Order>,
+    error_sender: Sender<Error>,
+}
+
+impl BybitSwap {
+    pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> BybitSwap {
+        let market = Market::new();
+        let mut bybit_swap = BybitSwap {
+            exchange: ExchangeEnum::BybitSwap,
+            symbol: symbol.to_uppercase(),
+            symbol_uppercase: symbol.replace("_", "").to_uppercase(),
+            is_colo,
+            params: params.clone(),
+            request: BybitSwapRest::new(is_colo, params.clone()),
+            market,
+            order_sender,
+            error_sender,
+        };
+
+        // 修改持仓模式
+        let symbol_array: Vec<&str> = symbol.split("_").collect();
+        let mode_result = bybit_swap.set_dual_mode(symbol_array[1], true).await;
+        match mode_result {
+            Ok(_) => {
+                trace!("Bybit:设置持仓模式成功!")
+            }
+            Err(error) => {
+                error!("Bybit:设置持仓模式失败!mode_result={}", error)
+            }
+        }
+        // 获取市场信息
+        bybit_swap.market = BybitSwap::get_market(&mut bybit_swap).await.unwrap_or(bybit_swap.market);
+        return bybit_swap;
+    }
+}
+
+#[async_trait]
+impl Platform for BybitSwap {
+    // 克隆方法
+    fn clone_box(&self) -> Box<dyn Platform + Send + Sync> { Box::new(self.clone()) }
+    // 获取交易所模式
+    fn get_self_exchange(&self) -> ExchangeEnum {
+        ExchangeEnum::GateSwap
+    }
+    // 获取交易对
+    fn get_self_symbol(&self) -> String { self.symbol.clone() }
+    // 获取是否使用高速通道
+    fn get_self_is_colo(&self) -> bool {
+        self.is_colo
+    }
+    // 获取params信息
+    fn get_self_params(&self) -> BTreeMap<String, String> {
+        self.params.clone()
+    }
+    // 获取market信息
+    fn get_self_market(&self) -> Market { self.market.clone() }
+    // 获取请求时间
+    fn get_request_delays(&self) -> Vec<i64> { self.request.get_delays() }
+    // 获取请求平均时间
+    fn get_request_avg_delay(&self) -> Decimal { self.request.get_avg_delay() }
+    // 获取请求最大时间
+    fn get_request_max_delay(&self) -> i64 { self.request.get_max_delay() }
+
+    // 获取服务器时间
+    async fn get_server_time(&mut self) -> Result<String, Error> {
+        let res_data = self.request.get_server_time().await;
+        if res_data.code == "200" {
+            let res_data_str = &res_data.data;
+            let res_data_json: serde_json::Value = serde_json::from_str(res_data_str).unwrap();
+            let result = res_data_json["server_time"].to_string();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+    // 获取账号信息
+    async fn get_account(&mut self) -> Result<Account, Error> {
+        let symbol_array: Vec<&str> = self.symbol.split("_").collect();
+        let res_data = self.request.get_account_balance(symbol_array[1].parse().unwrap()).await;
+        if res_data.code == "200" {
+            let res_data_str = &res_data.data;
+            let res_data_json: Value = from_str(res_data_str).unwrap();
+            let arr_infos: Vec<Value> = from_value(res_data_json["list"].clone()).unwrap();
+            if arr_infos.len() < 1usize{
+                return Err(Error::new(ErrorKind::NotFound, format!("{} 无账户信息", symbol_array[1])));
+            }
+            let coin_infos: Vec<Value> = from_value(arr_infos[0]["coin"].clone()).unwrap();
+            if coin_infos.len() < 1usize{
+               return Err(Error::new(ErrorKind::NotFound, format!("{} 无账户信息", symbol_array[1])));
+            }
+            let balance = Decimal::from_str(coin_infos[0]["equity"].as_str().unwrap()).unwrap();
+            let available_balance = Decimal::from_str(coin_infos[0]["walletBalance"].as_str().unwrap()).unwrap();
+            let frozen_balance = balance - available_balance;
+            let result = Account {
+                coin: symbol_array[1].to_string(),
+                balance,
+                available_balance,
+                frozen_balance,
+                stocks: Decimal::ZERO,
+                available_stocks: Decimal::ZERO,
+                frozen_stocks: Decimal::ZERO,
+            };
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn get_spot_account(&mut self) -> Result<Vec<Account>, Error> {
+        Err(Error::new(ErrorKind::NotFound, "bybit_swap:该交易所方法未实现".to_string()))
+    }
+
+    // 获取持仓信息
+    async fn get_position(&mut self) -> Result<Vec<Position>, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let ct_val = self.market.ct_val;
+        let res_data = self.request.get_positions(symbol, "".to_string()).await;
+        if res_data.code == "200" {
+            let res_data_str: Value = from_str(&res_data.data).unwrap();
+            let res_data_json: Vec<Value> = from_value(res_data_str["list"].clone()).unwrap();
+            let result = res_data_json.iter().map(|item| { format_position_item(item, ct_val) }).collect();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+    // 获取所有持仓
+    async fn get_positions(&mut self) -> Result<Vec<Position>, Error> {
+        let symbol_array: Vec<&str> = self.symbol.split("_").collect();
+        let ct_val = self.market.ct_val;
+        let res_data = self.request.get_positions("".to_string(), symbol_array[1].to_string().to_uppercase()).await;
+        if res_data.code == "200" {
+            let res_data_str: Value = from_str(&res_data.data).unwrap();
+            let res_data_json: Vec<Value> = from_value(res_data_str["list"].clone()).unwrap();
+            let result = res_data_json.iter().map(|item| { format_position_item(item, ct_val) }).collect();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+    // 获取市场行情
+    async fn get_ticker(&mut self) -> Result<Ticker, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let res_data = self.request.get_tickers(symbol).await;
+        if res_data.code == "200" {
+            let res_data_str = &res_data.data;
+            let res_data_json: Value = from_str(res_data_str).unwrap();
+            let list :Vec<SwapTicker>= from_value(res_data_json["list"].clone()).unwrap_or(Vec::new());
+
+            if list.len() < 1usize {
+                error!("bybit_swap:获取Ticker信息错误!\nget_ticker:res_data={:?}", res_data);
+                return Err(Error::new(ErrorKind::Other, res_data.to_string()));
+            }
+            let value = list[0].clone();
+            Ok(Ticker{
+                time: chrono::Utc::now().timestamp_millis(),
+                high: value.high_price24h,
+                low: value.low_price24h,
+                sell: value.ask1_price,
+                buy: value.bid1_price,
+                last: value.last_price,
+                volume: value.volume24h
+            })
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn get_ticker_symbol(&mut self, symbol: String) -> Result<Ticker, Error> {
+        let symbol_upper = symbol.replace("_", "").to_uppercase();
+        let res_data = self.request.get_tickers(symbol_upper.clone()).await;
+        if res_data.code == "200" {
+            let res_data_str = &res_data.data;
+            let arr: Value = from_str(res_data_str).unwrap();
+            let list :Vec<SwapTicker> = from_value(arr["list"].clone()).unwrap();
+            let ticker_info = list.iter().find(|&item| item.symbol == symbol_upper);
+
+            match ticker_info {
+                None => {
+                    error!("bybit_swap:获取Ticker信息错误!\nget_ticker:res_data={:?}", res_data);
+                    Err(Error::new(ErrorKind::Other, res_data.to_string()))
+                }
+                Some(value) => {
+                    let result = Ticker {
+                        time: chrono::Utc::now().timestamp_millis(),
+                        high: value.high_price24h,
+                        low: value.low_price24h,
+                        sell: value.ask1_price,
+                        buy: value.bid1_price,
+                        last: value.last_price,
+                        volume: value.volume24h
+                    };
+                    Ok(result)
+                }
+            }
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn get_market(&mut self) -> Result<Market, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let res_data = self.request.get_instruments_info(symbol.clone()).await;
+        if res_data.code == "200" {
+            let res_data_str = res_data.data.clone();
+            let res_data_json: Value = from_str(res_data_str.as_str()).unwrap();
+            let arr_data: Vec<Value> = from_value(res_data_json["list"].clone()).unwrap();
+            let market_info = arr_data.iter().find(|&item| item["symbol"].as_str().unwrap() == symbol);
+            match market_info {
+                None => {
+                    error!("bybit_swap:获取Market信息错误!\nget_market:res_data={:?}", res_data);
+                    Err(Error::new(ErrorKind::Other, res_data.to_string()))
+                }
+                Some(value) => {
+                    let base_coin = value["baseCoin"].as_str().unwrap();
+                    let quote_coin = value["quoteCoin"].as_str().unwrap();
+                    let name = format!("{}_{}",base_coin, quote_coin);
+                    let tick_size = Decimal::from_str(value["priceFilter"]["minPrice"].as_str().unwrap().trim()).unwrap();
+                    let min_qty = Decimal::from_str(value["lotSizeFilter"]["minOrderQty"].as_str().unwrap().trim()).unwrap();
+                    let max_qty = Decimal::from_str(value["lotSizeFilter"]["maxOrderQty"].as_str().unwrap().trim()).unwrap();
+                    let ct_val = Decimal::ONE;
+
+                    let amount_size = min_qty * ct_val;
+                    let price_precision = Decimal::from_u32(tick_size.scale()).unwrap();
+                    let amount_precision = Decimal::from_u32(amount_size.scale()).unwrap();
+                    let min_notional = min_qty * ct_val;
+                    let max_notional = max_qty * ct_val;
+
+                    let result = Market {
+                        symbol: name,
+                        base_asset: base_coin.to_string(),
+                        quote_asset: quote_coin.to_string(),
+                        tick_size,
+                        amount_size,
+                        price_precision,
+                        amount_precision,
+                        min_qty,
+                        max_qty,
+                        min_notional,
+                        max_notional,
+                        ct_val,
+                    };
+                    Ok(result)
+                }
+            }
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn get_market_symbol(&mut self, symbol: String) -> Result<Market, Error> {
+        let symbol = symbol.replace("_", "").to_uppercase();
+        let res_data = self.request.get_instruments_info(symbol.clone()).await;
+        if res_data.code == "200" {
+            let res_data_str = &res_data.data;
+            let res_data_json: Value = from_str(res_data_str.as_str()).unwrap();
+            let arr_data: Vec<Value> = from_value(res_data_json["list"].clone()).unwrap();
+            let market_info = arr_data.iter().find(|item| item["symbol"].as_str().unwrap() == symbol);
+            match market_info {
+                None => {
+                    error!("bybit_swap:获取Market信息错误!\nget_market:res_data={:?}", res_data);
+                    Err(Error::new(ErrorKind::Other, res_data.to_string()))
+                }
+                Some(value) => {
+                    let base_coin = value["baseCoin"].as_str().unwrap();
+                    let quote_coin = value["quoteCoin"].as_str().unwrap();
+                    let name = format!("{}_{}",base_coin, quote_coin);
+                    let tick_size = Decimal::from_str(value["priceFilter"]["minPrice"].as_str().unwrap().trim()).unwrap();
+                    let min_qty = Decimal::from_str(value["lotSizeFilter"]["minOrderQty"].as_str().unwrap().trim()).unwrap();
+                    let max_qty = Decimal::from_str(value["lotSizeFilter"]["maxOrderQty"].as_str().unwrap().trim()).unwrap();
+                    let ct_val = Decimal::ONE;
+
+                    let amount_size = min_qty * ct_val;
+                    let price_precision = Decimal::from_u32(tick_size.scale()).unwrap();
+                    let amount_precision = Decimal::from_u32(amount_size.scale()).unwrap();
+                    let min_notional = min_qty * ct_val;
+                    let max_notional = max_qty * ct_val;
+
+                    let result = Market {
+                        symbol: name,
+                        base_asset: base_coin.to_string(),
+                        quote_asset: quote_coin.to_string(),
+                        tick_size,
+                        amount_size,
+                        price_precision,
+                        amount_precision,
+                        min_qty,
+                        max_qty,
+                        min_notional,
+                        max_notional,
+                        ct_val,
+                    };
+                    Ok(result)
+                }
+            }
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    // 获取订单详情
+    async fn get_order_detail(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let ct_val = self.market.ct_val;
+        let id = if !custom_id.trim().eq("") { format!("t-{}", custom_id) } else { String::new() };
+        let res_data = self.request.get_order(symbol, order_id.parse().unwrap(), id).await;
+        if res_data.code == "200" {
+            let res_data_str = &res_data.data;
+            let res_data_json: Value = from_str::<Value>(res_data_str).unwrap()["list"].clone();
+            if res_data_json.is_array() && res_data_json.as_array().unwrap().len() == 0 {
+                return Err(Error::new(ErrorKind::Other, "没有该订单!"));
+            }
+            let result = format_order_item(res_data_json.as_array().unwrap()[0].clone(), ct_val);
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+    // 获取订单列表
+    async fn get_orders_list(&mut self, _status: &str) -> Result<Vec<Order>, Error> {
+       Err(Error::new(ErrorKind::Other, "bybit获取订单列表暂未实现".to_string()))
+    }
+    // 下单接口
+    async fn take_order(&mut self, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let ct_val = self.market.ct_val;
+        let size = (amount / ct_val).floor();
+        let mut params = json!({
+            "orderLinkId": format!("t-{}", custom_id),
+            "symbol": symbol.to_string(),
+            "price": price.to_string(),
+            "category": "linear",
+            "orderType":"Limit",
+            "qty": json!(size),
+            // 0.單向持倉 1.買側雙向持倉 2.賣側雙向持倉
+            "positionIdx": json!(1),
+            "reduceOnly": json!(false)
+        });
+
+        if price.eq(&Decimal::ZERO) {
+            params["timeInForce"] = json!("IOC".to_string());
+        }
+        match origin_side {
+            "kd" => {
+                params["side"] = json!("Buy");
+            }
+            "pd" => {
+                params["side"] = json!("Sell");
+                // 减仓
+                params["reduceOnly"] = json!(true);
+            }
+            "kk" => {
+                params["side"] = json!("Sell");
+                params["positionIdx"] = json!(2);
+            }
+            "pk" => {
+                params["side"] = json!("Buy");
+                // 减仓
+                params["reduceOnly"] = json!(true);
+                params["positionIdx"] = json!(2);
+            }
+            _ => { error!("下单参数错误"); }
+        };
+        let res_data = self.request.swap_order(params).await;
+        if res_data.code == "200" {
+            let res_data_str = &res_data.data;
+            let res_data_json: Value = from_str(res_data_str).unwrap();
+            let result = format_new_order_item(res_data_json, price, size);
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn take_order_symbol(&mut self, symbol: String, ct_val: Decimal, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
+        let symbol_upper = symbol.replace("_", "").trim().to_uppercase();
+        let size = (amount / ct_val).floor();
+        let mut params = json!({
+            "orderLinkId": format!("t-{}", custom_id),
+            "symbol": symbol_upper,
+            "price": price.to_string(),
+            "category": "linear",
+            "orderType":"Limit",
+            "qty": json!(size),
+            // 0.單向持倉 1.買側雙向持倉 2.賣側雙向持倉
+            "positionIdx": json!(1),
+            "reduceOnly": json!(false)
+        });
+
+        if price.eq(&Decimal::ZERO) {
+            params["timeInForce"] = json!("IOC".to_string());
+        }
+        match origin_side {
+            "kd" => {
+                params["side"] = json!("Buy");
+            }
+            "pd" => {
+                params["side"] = json!("Sell");
+                params["positionIdx"] = json!(1);
+                // 减仓
+                params["reduceOnly"] = json!(true);
+            }
+            "kk" => {
+                params["side"] = json!("Sell");
+                params["positionIdx"] = json!(2);
+            }
+            "pk" => {
+                params["side"] = json!("Buy");
+                params["positionIdx"] = json!(2);
+                // 减仓
+                params["reduceOnly"] = json!(true);
+            }
+            _ => { error!("下单参数错误"); }
+        };
+        let res_data = self.request.swap_order(params.clone()).await;
+        if res_data.code == "200" {
+            let res_data_str = &res_data.data;
+            let res_data_json: Value = from_str(res_data_str).unwrap();
+            let result = format_new_order_item(res_data_json, price, size);
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    // 撤销订单
+    async fn cancel_order(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let id = format!("t-{}", custom_id);
+        let res_data = self.request.cancel_order(symbol, String::from(order_id), id.clone()).await;
+        if res_data.code == "200" {
+            let res_data_str = &res_data.data;
+            let res_data_json: Value = from_str(res_data_str).unwrap();
+            let result = format_cancel_order_item(res_data_json);
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+    // 批量撤销订单
+    async fn cancel_orders(&mut self) -> Result<Vec<Order>, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let res_data = self.request.cancel_orders(symbol).await;
+        if res_data.code == "200" {
+            let res_data_str = &res_data.data;
+            let res_data_json: Value = from_str(res_data_str).unwrap();
+            let res_arr: Vec<Value> = from_value(res_data_json).unwrap();
+            let result = res_arr.iter().map(|item| format_cancel_order_item(item.clone())).collect();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn cancel_orders_all(&mut self) -> Result<Vec<Order>, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let res_data = self.request.cancel_orders(symbol).await;
+        if res_data.code == "200" {
+            let res_data_str = &res_data.data;
+            let res_data_json: Value = from_str(res_data_str).unwrap();
+            let res_arr: Vec<Value> = from_value(res_data_json["list"].clone()).unwrap();
+            let result = res_arr.iter().map(|item| format_cancel_order_item(item.clone())).collect();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    // 设置持仓模式
+    async fn set_dual_mode(&mut self, _coin: &str, is_dual_mode: bool) -> Result<String, Error> {
+        let coin_format = self.symbol_uppercase.clone();
+        let mut mod_num = 0;
+        if is_dual_mode {
+            mod_num = 3;
+        }
+        let res_data = self.request.set_position_mode(coin_format, mod_num).await;
+        if res_data.code == "200" {
+            let res_data_str = &res_data.data;
+            let result = res_data_str.clone();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    // 更新双持仓模式下杠杆
+    async fn set_dual_leverage(&mut self, leverage: &str) -> Result<String, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let res_data = self.request.set_leverage(symbol, leverage.to_string()).await;
+        if res_data.code == "200" {
+            let res_data_str = &res_data.data;
+            let result = res_data_str.clone();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn set_auto_deposit_status(&mut self, _status: bool) -> Result<String, Error> { Err(Error::new(ErrorKind::NotFound, "gate:该交易所方法未实现".to_string())) }
+
+    // 交易账户互转
+    async fn wallet_transfers(&mut self, _coin: &str, _from: &str, _to: &str, _amount: Decimal) -> Result<String, Error> {
+        // let coin_format = coin.to_string().to_lowercase();
+        // let res_data = self.request.wallet_transfers(coin_format.clone(), from.to_string(), to.to_string(), amount.to_string(), coin_format.clone()).await;
+        // if res_data.code == "200" {
+        //     let res_data_str = &res_data.data;
+        //     let result = res_data_str.clone();
+        //     Ok(result)
+        // } else {
+        //     Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        // }
+        Err(Error::new(ErrorKind::Other, "暂未实现!"))
+    }
+
+    // 指令下单
+    async fn command_order(&mut self, order_command: OrderCommand, trace_stack: TraceStack) {
+        let mut handles = vec![];
+        // 撤销订单
+        let cancel = order_command.cancel;
+        for item in cancel.keys() {
+            let mut self_clone = self.clone();
+            let cancel_clone = cancel.clone();
+            let item_clone = item.clone();
+            let order_id = cancel_clone.get(&item_clone).unwrap().get(1).unwrap_or(&"".to_string()).clone();
+            let custom_id = cancel_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
+            let result_sd = self.order_sender.clone();
+            let err_sd = self.error_sender.clone();
+            let handle = tokio::spawn(async move {
+                let result = self_clone.cancel_order(&order_id, &custom_id).await;
+                match result {
+                    Ok(_) => {
+                        // result_sd.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        // 取消失败去查订单。
+                        let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
+                        match query_rst {
+                            Ok(order) => {
+                                result_sd.send(order).await.unwrap();
+                            }
+                            Err(_err) => {
+                                // error!("撤单失败,而且查单也失败了,gate_io_swap,oid={}, cid={}。", order_id.clone(), custom_id.clone());
+                                // panic!("撤单失败,而且查单也失败了,gate_io_swap,oid={}, cid={}。", order_id.clone(), custom_id.clone());
+                            }
+                        }
+                        err_sd.send(error).await.unwrap();
+                    }
+                }
+            });
+            handles.push(handle)
+        }
+        // 下单指令
+        let mut limits = HashMap::new();
+        limits.extend(order_command.limits_open);
+        limits.extend(order_command.limits_close);
+        for item in limits.keys() {
+            let mut self_clone = self.clone();
+            let limits_clone = limits.clone();
+            let item_clone = item.clone();
+            let result_sd = self.order_sender.clone();
+            let err_sd = self.error_sender.clone();
+            let mut ts = trace_stack.clone();
+
+            let handle = tokio::spawn(async move {
+                let value = limits_clone[&item_clone].clone();
+                let amount = Decimal::from_str(value.get(0).unwrap_or(&"0".to_string())).unwrap();
+                let side = value.get(1).unwrap();
+                let price = Decimal::from_str(value.get(2).unwrap_or(&"0".to_string())).unwrap();
+                let cid = value.get(3).unwrap();
+
+                //  order_name: [数量,方向,价格,c_id]
+                let result = self_clone.take_order(cid, side, price, amount).await;
+                match result {
+                    Ok(mut result) => {
+                        // 记录此订单完成时间
+                        ts.on_after_send();
+                        result.trace_stack = ts;
+
+                        result_sd.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        let mut err_order = Order::new();
+                        err_order.custom_id = cid.clone();
+                        err_order.status = "REMOVE".to_string();
+
+                        result_sd.send(err_order).await.unwrap();
+                        err_sd.send(error).await.unwrap();
+                    }
+                }
+            });
+            handles.push(handle)
+        }
+        // 检查订单指令
+        let check = order_command.check;
+        for item in check.keys() {
+            let mut self_clone = self.clone();
+            let check_clone = check.clone();
+            let item_clone = item.clone();
+            let order_id = check_clone[&item_clone].get(1).unwrap_or(&"".to_string()).clone();
+            let custom_id = check_clone[&item_clone].get(0).unwrap_or(&"".to_string()).clone();
+            let result_sd = self.order_sender.clone();
+            let err_sd = self.error_sender.clone();
+            let handle = tokio::spawn(async move {
+                let result = self_clone.get_order_detail(&order_id, &custom_id).await;
+                match result {
+                    Ok(result) => {
+                        result_sd.send(result).await.unwrap();
+                    }
+                    Err(error) => {
+                        err_sd.send(error).await.unwrap();
+                    }
+                }
+            });
+            handles.push(handle)
+        }
+
+        let futures = FuturesUnordered::from_iter(handles);
+        let _: Result<Vec<_>, _> = futures.try_collect().await;
+    }
+}
+
+pub fn format_position_item(position: &Value, ct_val: Decimal) -> Position {
+    let position_idx = position["positionIdx"].to_string();
+    let mut position_mode = match position_idx.as_str() {
+        "0" => PositionModeEnum::Both,
+        "1" => PositionModeEnum::Long,
+        "2" => PositionModeEnum::Short,
+        _ => {
+            error!("bybit_swap:格式化持仓模式错误!\nformat_position_item:position={:?}", position);
+            panic!("bybit_swap:格式化持仓模式错误!\nformat_position_item:position={:?}", position)
+        }
+    };
+    let size_str: String = from_value(position["size"].clone()).unwrap();
+    let size = Decimal::from_str(size_str.as_str()).unwrap();
+    let amount = size * ct_val;
+    let mut profit = Decimal::ZERO;
+    let profit_str = position["unrealisedPnl"].as_str().unwrap_or("0");
+    if profit_str != "" {
+        profit = Decimal::from_str(profit_str).unwrap();
+    }
+
+    match position_mode {
+        PositionModeEnum::Both => {
+            position_mode = match amount {
+                amount if amount > Decimal::ZERO => PositionModeEnum::Long,
+                amount if amount < Decimal::ZERO => PositionModeEnum::Short,
+                _ => { PositionModeEnum::Both }
+            }
+        }
+        _ => {}
+    }
+    Position {
+        symbol: position["symbol"].as_str().unwrap_or("").parse().unwrap(),
+        margin_level: Decimal::from_str(position["leverage"].as_str().unwrap()).unwrap(),
+        amount,
+        frozen_amount: Decimal::ZERO,
+        price: Decimal::from_str(position["avgPrice"].as_str().unwrap()).unwrap(),
+        profit,
+        position_mode,
+        margin: Decimal::from_str(position["positionBalance"].as_str().unwrap()).unwrap(),
+    }
+}
+
+fn format_cancel_order_item(order: Value) -> Order {
+     Order {
+        id: format!("{}", order["orderId"].as_str().unwrap()),
+        custom_id: order["orderLinkId"].as_str().unwrap().replace("t-my-custom-id_", "").replace("t-", ""),
+        price: Decimal::ZERO,
+        amount: Decimal::ZERO,
+        deal_amount: Decimal::ZERO,
+        avg_price: Decimal::ZERO,
+        status: "REMOVE".to_string(),
+        order_type: "limit".to_string(),
+        trace_stack: TraceStack::default().on_special("688 trace_stack".to_string())
+    }
+}
+
+fn format_new_order_item(order: Value, price: Decimal, amount: Decimal) -> Order {
+    Order {
+        id: format!("{}", order["orderId"].as_str().unwrap()),
+        custom_id: order["orderLinkId"].as_str().unwrap().replace("t-my-custom-id_", "").replace("t-", ""),
+        price,
+        amount,
+        deal_amount: Decimal::ZERO,
+        avg_price: price,
+        status: "NEW".to_string(),
+        order_type: "limit".to_string(),
+        trace_stack: TraceStack::default().on_special("688 trace_stack".to_string())
+    }
+}
+
+pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
+    debug!("format-order-start, bybit_swap");
+    debug!(?order);
+    let status = order["orderStatus"].as_str().unwrap_or("");
+    let text = order["orderLinkId"].as_str().unwrap_or("");
+    let mut size = Decimal::ZERO;
+    let mut deal_amount = Decimal::ZERO;
+    let mut avg_price = Decimal::ZERO;
+
+    let right_str = order["cumExecQty"].to_string();
+    let size_str = order["qty"].to_string();
+
+    if !order.get("qty").is_some() {
+        size = Decimal::from_str(size_str.as_str()).unwrap();
+        let right_val = Decimal::from_str(order["cumExecValue"].as_str().unwrap()).unwrap();
+        let right = Decimal::from_str(right_str.as_str()).unwrap();
+        if right != Decimal::ZERO {
+            avg_price = right_val / right;
+        }
+        deal_amount = right * ct_val;
+    }
+
+    let amount = size * ct_val;
+    let custom_status = if status == "Filled" || status == "Cancelled" { "REMOVE".to_string() } else if status == "New" { "NEW".to_string() } else {
+        "NULL".to_string()
+    };
+    let rst_order = Order {
+        id: format!("{}", order["orderId"].as_str().unwrap()),
+        custom_id: text.replace("t-my-custom-id_", "").replace("t-", ""),
+        price: Decimal::from_str(order["price"].as_str().unwrap()).unwrap(),
+        amount,
+        deal_amount,
+        avg_price,
+        status: custom_status,
+        order_type: "limit".to_string(),
+        trace_stack: TraceStack::default().on_special("688 trace_stack".to_string()),
+    };
+    debug!(?rst_order);
+    debug!("format-order-end, bybit_swap");
+    return rst_order;
+}

+ 188 - 0
standard/src/bybit_swap_handle.rs

@@ -0,0 +1,188 @@
+use std::str::FromStr;
+use rust_decimal::Decimal;
+use rust_decimal_macros::dec;
+use serde_json::{from_str, from_value};
+use toml::Value;
+use tracing::{debug, error};
+use exchanges::response_base::ResponseData;
+use global::trace_stack::TraceStack;
+use crate::{Account, MarketOrder, Order, Position, PositionModeEnum, SpecialDepth, SpecialOrder, SpecialTicker};
+
+// 处理账号信息
+pub fn handle_account_info(res_data: ResponseData, symbol: String) -> Account {
+    let res_data_str = res_data.data;
+    let res_data_json: Vec<serde_json::Value> = from_str(&res_data_str).unwrap();
+    format_account_info(res_data_json, symbol)
+}
+
+pub fn format_account_info(data: Vec<serde_json::Value>, symbol: String) -> Account {
+    let account = data.iter().find(| &item | item["accountType"] == "UNIFIED");
+    match account {
+        None => {
+            error!("Bybit:格式化统一账户信息错误!\nformat_account_info: data={:?}", data);
+            panic!("Bybit:格式化统一账户信息错误!\nformat_account_info: data={:?}", data)
+        }
+        Some(val) =>{
+            let arr: Vec<Value> = serde_json::from_value(val["coin"].clone()).unwrap();
+            let upper_str = symbol.to_uppercase();
+            let symbol_array: Vec<&str> = upper_str.split("_").collect();
+            let balance_info = arr.iter().find(|&item| item["coin"].as_str().unwrap() == symbol_array[1]);
+            match balance_info {
+                None => {
+                    error!("Bybit:格式化usdt余额信息错误!\nformat_account_info: data={:?}", balance_info);
+                    panic!("Bybit:格式化usdt余额信息错误!\nformat_account_info: data={:?}", balance_info)
+                }
+                Some(value) => {
+                    let balance = Decimal::from_str(&value["walletBalance"].as_str().unwrap().to_string()).unwrap();
+                    Account {
+                        coin: symbol_array[1].to_string(),
+                        balance,
+                        available_balance: Decimal::ZERO,
+                        frozen_balance: Decimal::ZERO,
+                        stocks: Decimal::ZERO,
+                        available_stocks: Decimal::ZERO,
+                        frozen_stocks: Decimal::ZERO,
+                    }
+                }
+            }
+        }
+    }
+
+
+
+}
+
+// 处理position信息
+pub fn handle_position(res_data: ResponseData, ct_val: Decimal) -> Vec<Position> {
+    let res_data_json: Vec<serde_json::Value> = from_str(&res_data.data).unwrap();
+    res_data_json.iter().map(|item| { format_position_item(item, ct_val) }).collect()
+}
+
+pub fn format_position_item(position: &serde_json::Value, ct_val: Decimal) -> Position {
+    let position_idx: String = position["positionIdx"].to_string();
+    let mut position_mode = match position_idx.as_str() {
+        "0" => PositionModeEnum::Both,
+        "1" => PositionModeEnum::Long,
+        "2" => PositionModeEnum::Short,
+        _ => {
+            error!("bybit_swap:格式化持仓模式错误!\nformat_position_item:position={:?}", position);
+            panic!("bybit_swap:格式化持仓模式错误!\nformat_position_item:position={:?}", position)
+        }
+    };
+    let symbol_mapper =  position["symbol"].as_str().unwrap().to_string();
+    let currency = "USDT";
+    let coin = &symbol_mapper[..symbol_mapper.find(currency).unwrap_or(0)];
+    let size_str: String = from_value(position["size"].clone()).unwrap();
+    let size = Decimal::from_str(size_str.as_str()).unwrap();
+    let amount = size * ct_val;
+    match position_mode {
+        PositionModeEnum::Both => {
+            position_mode = match amount {
+                amount if amount > Decimal::ZERO => PositionModeEnum::Long,
+                amount if amount < Decimal::ZERO => PositionModeEnum::Short,
+                _ => { PositionModeEnum::Both }
+            }
+        }
+        _ => {}
+    }
+    Position {
+        symbol: format!{"{}_{}", coin, currency},
+        margin_level: Decimal::from_str(position["leverage"].as_str().unwrap()).unwrap(),
+        amount,
+        frozen_amount: Decimal::ZERO,
+        price: Decimal::from_str(position["entryPrice"].as_str().unwrap()).unwrap(),
+        profit: Decimal::from_str(position["unrealisedPnl"].as_str().unwrap()).unwrap(),
+        position_mode,
+        margin: Decimal::from_str(position["positionBalance"].as_str().unwrap()).unwrap(),
+    }
+}
+
+// 处理order信息
+pub fn handle_order(res_data: ResponseData, ct_val: Decimal) -> SpecialOrder {
+    let res_data_str = res_data.data;
+    let res_data_json: Vec<serde_json::Value> = from_str(&*res_data_str).unwrap();
+    let mut order_info = Vec::new();
+    for item in res_data_json.iter() {
+        order_info.push(format_order_item(item.clone(), ct_val));
+    };
+
+    SpecialOrder {
+        name: res_data.label,
+        order: order_info,
+    }
+}
+
+pub fn format_order_item(order: serde_json::Value, ct_val: Decimal) -> Order {
+    debug!("format-order-start, bybit_handle");
+    debug!(?order);
+    let status = order["orderStatus"].as_str().unwrap_or("");
+    let text = order["orderLinkId"].as_str().unwrap_or("");
+    let size = Decimal::from_str(order["qty"].as_str().unwrap()).unwrap();
+    let right = Decimal::from_str(order["cumExecQty"].as_str().unwrap()).unwrap();
+    let right_val = Decimal::from_str(order["cumExecValue"].as_str().unwrap()).unwrap();
+    let price = Decimal::from_str(order["price"].as_str().unwrap()).unwrap();
+    let amount = size * ct_val;
+    let mut avg_price = Decimal::ZERO;
+    if right != Decimal::ZERO {
+        avg_price = right_val / right;
+    }
+    let deal_amount = right * ct_val;
+    let custom_status = if status == "Filled" || status == "Cancelled" { "REMOVE".to_string() } else if status == "New" { "NEW".to_string() } else {
+       "NULL".to_string()
+    };
+    let rst_order = Order {
+        id: format!("{}", order["orderId"].as_str().unwrap()),
+        custom_id: text.replace("t-my-custom-id_", "").replace("t-", ""),
+        price,
+        amount,
+        deal_amount,
+        avg_price,
+        status: custom_status,
+        order_type: "limit".to_string(),
+        trace_stack: TraceStack::default().on_special("120 bybit_handle".to_string()),
+    };
+
+    debug!(?rst_order);
+    debug!("format-order-end, bybit_handle");
+    return rst_order;
+}
+// 处理特殊Ticket信息
+pub fn handle_special_ticker(res_data: ResponseData) -> SpecialDepth {
+    let res_data_str = res_data.data;
+    let res_data_json: serde_json::Value = serde_json::from_str(&*res_data_str).unwrap();
+    format_special_ticker(res_data_json, res_data.label)
+}
+
+pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialDepth {
+    let depth_asks = format_depth_items(data["a"].clone());
+    let depth_bids = format_depth_items(data["b"].clone());
+    let t = Decimal::from_str(&data["ts"].to_string()).unwrap();
+    let create_at = data["t"].as_i64().unwrap() * 1000;
+
+    let ap = depth_asks[0].price;
+    let bp = depth_bids[0].price;
+    let aq = depth_asks[0].amount;
+    let bq = depth_bids[0].amount;
+    let mp = (bp + ap) * dec!(0.5);
+    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at };
+    let depth_info = vec![bp, bq, ap, aq];
+    SpecialDepth {
+        name: label,
+        depth: depth_info,
+        ticker: ticker_info,
+        t,
+        create_at,
+    }
+}
+
+pub fn format_depth_items(value: serde_json::Value) -> Vec<MarketOrder> {
+    let mut depth_items: Vec<MarketOrder> = vec![];
+    for val in value.as_array().unwrap() {
+        let arr = val.as_array().unwrap();
+        depth_items.push(MarketOrder {
+            price: Decimal::from_str(arr[0].as_str().unwrap()).unwrap(),
+            amount: Decimal::from_str(arr[1].as_str().unwrap()).unwrap(),
+        })
+    }
+    return depth_items;
+}

+ 5 - 0
standard/src/exchange.rs

@@ -8,6 +8,7 @@ use crate::gate_spot::GateSpot;
 use crate::gate_swap::GateSwap;
 use crate::kucoin_swap::KucoinSwap;
 use crate::bitget_spot::BitgetSpot;
+use crate::bybit_swap::BybitSwap;
 use crate::kucoin_spot::KucoinSpot;
 use crate::okx_swap::OkxSwap;
 
@@ -27,6 +28,7 @@ pub enum ExchangeEnum {
     KucoinSpot,
     OkxSwap,
     BitgetSpot,
+    BybitSwap
 }
 
 /// Exchange结构体
@@ -92,6 +94,9 @@ impl Exchange {
             ExchangeEnum::BitgetSpot => {
                 Box::new(BitgetSpot::new(symbol, is_colo, params, order_sender, error_sender).await)
             }
+            ExchangeEnum::BybitSwap => {
+                Box::new(BybitSwap::new(symbol, is_colo, params, order_sender, error_sender).await)
+            }
             _ => {
                 // BinanceSpot
                 Box::new(BinanceSpot::new(symbol, is_colo, params))

+ 162 - 113
standard/src/handle_info.rs

@@ -3,17 +3,25 @@ use std::str::FromStr;
 use rust_decimal::{Decimal};
 use rust_decimal::prelude::FromPrimitive;
 use rust_decimal_macros::dec;
-use tracing::error;
+use tracing::{error};
 use exchanges::response_base::ResponseData;
 use global::public_params;
 use crate::exchange::ExchangeEnum;
-use crate::{Account, binance_handle, binance_spot_handle, bitget_spot_handle, gate_handle, kucoin_handle, kucoin_spot_handle, MarketOrder, okx_handle, Position, SpecialDepth, SpecialOrder, SpecialTicker};
+use crate::{Account, binance_handle, binance_spot_handle, bitget_spot_handle, gate_handle, kucoin_handle, kucoin_spot_handle, bybit_swap_handle, MarketOrder, okx_handle, Position, SpecialDepth, SpecialOrder, SpecialTicker};
 
 #[allow(dead_code)]
 pub struct HandleSwapInfo;
 
+pub struct DepthParam {
+    pub depth_asks: Vec<MarketOrder>,
+    pub depth_bids: Vec<MarketOrder>,
+    pub t: Decimal,
+    pub create_at: i64
+}
+
 #[allow(dead_code)]
 impl HandleSwapInfo {
+
     // 处理账号信息
     pub fn handle_account_info(exchange: ExchangeEnum, res_data: ResponseData, symbol: String) -> Account {
         match exchange {
@@ -35,6 +43,9 @@ impl HandleSwapInfo {
             }
             ExchangeEnum::BitgetSpot => {
                 bitget_spot_handle::handle_account_info(res_data, symbol)
+            },
+            ExchangeEnum::BybitSwap => {
+                bybit_swap_handle::handle_account_info(res_data, symbol)
             }
             _ => {
                 error!("未找到该交易所!handle_account_info: {:?}",exchange);
@@ -65,6 +76,9 @@ impl HandleSwapInfo {
             }
             ExchangeEnum::BitgetSpot => {
                 bitget_spot_handle::handle_special_ticker(res_data)
+            },
+            ExchangeEnum::BybitSwap => {
+                bybit_swap_handle::handle_special_ticker(res_data)
             }
             _ => {
                 error!("未找到该交易所!handle_special_ticker: {:?}",exchange);
@@ -95,6 +109,9 @@ impl HandleSwapInfo {
             ExchangeEnum::BitgetSpot => {
                 error!("暂未提供此交易所方法!handle_position:{:?}", exchange);
                 panic!("暂未提供此交易所方法!handle_position:{:?}", exchange);
+            },
+            ExchangeEnum::BybitSwap => {
+                bybit_swap_handle::handle_position(res_data, ct_val)
             }
             _ => {
                 error!("未找到该交易所!handle_position: {:?}",exchange);
@@ -123,6 +140,9 @@ impl HandleSwapInfo {
             }
             ExchangeEnum::BitgetSpot => {
                 bitget_spot_handle::handle_order(res_data, ct_val)
+            },
+            ExchangeEnum::BybitSwap => {
+                bybit_swap_handle::handle_order(res_data, ct_val)
             }
             _ => {
                 error!("未找到该交易所!handle_order: {:?}",exchange);
@@ -130,123 +150,152 @@ impl HandleSwapInfo {
             }
         }
     }
+
     // 处理深度信息
     pub fn handle_special_depth(exchange: ExchangeEnum, res_data: ResponseData) -> SpecialDepth {
-        let res_data_str = res_data.data;
-        let res_data_json: serde_json::Value = serde_json::from_str(&*res_data_str).unwrap();
-        let mut depth_asks: Vec<MarketOrder>;
-        let mut depth_bids: Vec<MarketOrder>;
-        let t: Decimal;
-        let create_at: i64;
-        match exchange {
-            ExchangeEnum::BinanceSpot => {
-                depth_asks = binance_handle::format_depth_items(res_data_json["asks"].clone());
-                depth_bids = binance_handle::format_depth_items(res_data_json["bids"].clone());
-                t = Decimal::from_str(&res_data_json["lastUpdateId"].to_string()).unwrap();
-                create_at = 0;
-            }
-            ExchangeEnum::BinanceSwap => {
-                depth_asks = binance_handle::format_depth_items(res_data_json["a"].clone());
-                depth_bids = binance_handle::format_depth_items(res_data_json["b"].clone());
-                t = Decimal::from_str(&res_data_json["u"].to_string()).unwrap();
-                create_at = res_data_json["E"].as_i64().unwrap() * 1000;
-            }
-            ExchangeEnum::GateSwap => {
-                depth_asks = gate_handle::format_depth_items(res_data_json["asks"].clone());
-                depth_bids = gate_handle::format_depth_items(res_data_json["bids"].clone());
-                // todo! 有id可以取 保证与py一致
-                t = Decimal::from_str(&res_data_json["t"].to_string()).unwrap();
-                create_at = res_data_json["t"].as_i64().unwrap() * 1000;
-            }
-            ExchangeEnum::KucoinSwap => {
-                depth_asks = kucoin_handle::format_depth_items(res_data_json["asks"].clone());
-                depth_bids = kucoin_handle::format_depth_items(res_data_json["bids"].clone());
-                t = Decimal::from_str(&res_data_json["sequence"].to_string()).unwrap();
-                create_at = res_data_json["ts"].as_i64().unwrap() * 1000;
-            }
-            ExchangeEnum::KucoinSpot => {
-                depth_asks = kucoin_spot_handle::format_depth_items(res_data_json["asks"].clone());
-                depth_bids = kucoin_spot_handle::format_depth_items(res_data_json["bids"].clone());
-                t = Decimal::from_str(&res_data_json["timestamp"].to_string()).unwrap();
-                create_at = res_data_json["timestamp"].as_i64().unwrap() * 1000;
-            }
-            ExchangeEnum::OkxSwap => {
-                depth_asks = okx_handle::format_depth_items(res_data_json[0]["asks"].clone());
-                depth_bids = okx_handle::format_depth_items(res_data_json[0]["bids"].clone());
-                t = Decimal::from_str(&res_data_json[0]["seqId"].to_string()).unwrap();
-                create_at = res_data_json[0]["ts"].as_str().unwrap().parse::<i64>().unwrap() * 1000;
-            }
-            ExchangeEnum::BitgetSpot => {
-                depth_asks = bitget_spot_handle::format_depth_items(res_data_json[0]["asks"].clone());
-                depth_bids = bitget_spot_handle::format_depth_items(res_data_json[0]["bids"].clone());
-                t = Decimal::from_str(res_data_json[0]["ts"].as_str().unwrap()).unwrap();
-                create_at = res_data_json[0]["ts"].as_str().unwrap().parse::<i64>().unwrap() * 1000;
-            }
-            _ => {
-                error!("未找到该交易所!handle_special_depth: {:?}",exchange);
-                panic!("未找到该交易所!handle_special_depth: {:?}", exchange);
-            }
+        let lable = res_data.label.clone();
+        // 格式化
+        let mut format_depth = format_depth(exchange, res_data);
+        // 运算、组装
+        make_special_depth(lable, &mut format_depth.depth_asks, &mut format_depth.depth_bids, format_depth.t, format_depth.create_at)
+    }
+
+}
+
+
+pub fn make_special_depth(label: String, depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>, t: Decimal, create_at: i64) -> SpecialDepth{
+    depth_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap_or(Ordering::Equal));
+    depth_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap_or(Ordering::Equal));
+    let mp = (depth_asks[0].price + depth_bids[0].price) * dec!(0.5);
+    let step = (public_params::EFF_RANGE * mp / Decimal::from_usize(public_params::LEVEL).unwrap()).round_dp(mp.scale());
+    let mut ap = Vec::new();
+    let mut bp = Vec::new();
+    let mut av: Vec<Decimal> = Vec::new();
+    let mut bv: Vec<Decimal> = Vec::new();
+    for i in 0..public_params::LEVEL {
+        let price = (depth_asks[0].price + step * Decimal::from_f64(i as f64).unwrap()).round_dp(depth_asks[0].price.scale());
+        ap.push(price);
+    }
+    for i in 0..public_params::LEVEL {
+        let price = (depth_bids[0].price - step * Decimal::from_f64(i as f64).unwrap()).round_dp(depth_bids[0].price.scale());
+        bp.push(price);
+    }
+    let mut ap_price_tag = depth_asks[0].price + step;
+    let mut ap_index = 0;
+    for item in depth_asks.iter() {
+        let price = item.price;
+        let amount = item.amount;
+        if av.get(ap_index).is_none() { av.push(Decimal::ZERO) };
+        if price < ap_price_tag {
+            av[ap_index] += amount;
+        } else {
+            ap_price_tag += step;
+            ap_index += 1;
+            if ap_index == public_params::LEVEL {
+                break;
+            }
+            av[ap_index] += amount
         }
-        depth_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap_or(Ordering::Equal));
-        depth_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap_or(Ordering::Equal));
-        let mp = (depth_asks[0].price + depth_bids[0].price) * dec!(0.5);
-        let step = (public_params::EFF_RANGE * mp / Decimal::from_usize(public_params::LEVEL).unwrap()).round_dp(mp.scale());
-        let mut ap = Vec::new();
-        let mut bp = Vec::new();
-        let mut av: Vec<Decimal> = Vec::new();
-        let mut bv: Vec<Decimal> = Vec::new();
-        for i in 0..public_params::LEVEL {
-            let price = (depth_asks[0].price + step * Decimal::from_f64(i as f64).unwrap()).round_dp(depth_asks[0].price.scale());
-            ap.push(price);
+    }
+
+    let mut bp_price_tag = depth_bids[0].price - step;
+    let mut bp_index = 0;
+    for item in depth_bids.iter() {
+        let price = item.price;
+        let amount = item.amount;
+        if bv.get(bp_index).is_none() { bv.push(Decimal::ZERO) };
+        if price > bp_price_tag {
+            bv[bp_index] += amount;
+        } else {
+            bp_price_tag -= step;
+            bp_index += 1;
+            if bp_index == public_params::LEVEL {
+                break;
+            }
+            bv[bp_index] += amount
         }
-        for i in 0..public_params::LEVEL {
-            let price = (depth_bids[0].price - step * Decimal::from_f64(i as f64).unwrap()).round_dp(depth_bids[0].price.scale());
-            bp.push(price);
+    }
+
+    let ticker_info = SpecialTicker { sell: depth_asks[0].price, buy: depth_bids[0].price, mid_price: mp, t, create_at };
+    let depth_info = bp.iter().cloned().chain(bv.iter().cloned()).chain(ap.iter().cloned()).chain(av.iter().cloned()).collect();
+    SpecialDepth {
+        name: label,
+        depth: depth_info,
+        ticker: ticker_info,
+        t,
+        create_at,
+    }
+}
+
+pub fn format_depth(exchange: ExchangeEnum, res_data: ResponseData) -> DepthParam{
+    let res_data_str = res_data.data;
+    let res_data_json: serde_json::Value = serde_json::from_str(&*res_data_str).unwrap();
+    let depth_asks: Vec<MarketOrder>;
+    let depth_bids: Vec<MarketOrder>;
+    let t: Decimal;
+    let create_at: i64;
+    match exchange {
+        ExchangeEnum::BinanceSpot => {
+            depth_asks = binance_handle::format_depth_items(res_data_json["asks"].clone());
+            depth_bids = binance_handle::format_depth_items(res_data_json["bids"].clone());
+            t = Decimal::from_str(&res_data_json["lastUpdateId"].to_string()).unwrap();
+            create_at = 0;
         }
-        let mut ap_price_tag = depth_asks[0].price + step;
-        let mut ap_index = 0;
-        for item in depth_asks.iter() {
-            let price = item.price;
-            let amount = item.amount;
-            if av.get(ap_index).is_none() { av.push(Decimal::ZERO) };
-            if price < ap_price_tag {
-                av[ap_index] += amount;
-            } else {
-                ap_price_tag += step;
-                ap_index += 1;
-                if ap_index == public_params::LEVEL {
-                    break;
-                }
-                av[ap_index] += amount
-            }
+        ExchangeEnum::BinanceSwap => {
+            depth_asks = binance_handle::format_depth_items(res_data_json["a"].clone());
+            depth_bids = binance_handle::format_depth_items(res_data_json["b"].clone());
+            t = Decimal::from_str(&res_data_json["u"].to_string()).unwrap();
+            create_at = res_data_json["E"].as_i64().unwrap() * 1000;
         }
-
-        let mut bp_price_tag = depth_bids[0].price - step;
-        let mut bp_index = 0;
-        for item in depth_bids.iter() {
-            let price = item.price;
-            let amount = item.amount;
-            if bv.get(bp_index).is_none() { bv.push(Decimal::ZERO) };
-            if price > bp_price_tag {
-                bv[bp_index] += amount;
-            } else {
-                bp_price_tag -= step;
-                bp_index += 1;
-                if bp_index == public_params::LEVEL {
-                    break;
-                }
-                bv[bp_index] += amount
-            }
+        ExchangeEnum::GateSwap => {
+            depth_asks = gate_handle::format_depth_items(res_data_json["asks"].clone());
+            depth_bids = gate_handle::format_depth_items(res_data_json["bids"].clone());
+            // todo! 有id可以取 保证与py一致
+            t = Decimal::from_str(&res_data_json["t"].to_string()).unwrap();
+            create_at = res_data_json["t"].as_i64().unwrap() * 1000;
         }
-
-        let ticker_info = SpecialTicker { sell: depth_asks[0].price, buy: depth_bids[0].price, mid_price: mp, t, create_at };
-        let depth_info = bp.iter().cloned().chain(bv.iter().cloned()).chain(ap.iter().cloned()).chain(av.iter().cloned()).collect();
-        SpecialDepth {
-            name: res_data.label,
-            depth: depth_info,
-            ticker: ticker_info,
-            t,
-            create_at,
+        ExchangeEnum::KucoinSwap => {
+            depth_asks = kucoin_handle::format_depth_items(res_data_json["asks"].clone());
+            depth_bids = kucoin_handle::format_depth_items(res_data_json["bids"].clone());
+            t = Decimal::from_str(&res_data_json["sequence"].to_string()).unwrap();
+            create_at = res_data_json["ts"].as_i64().unwrap() * 1000;
+        }
+        ExchangeEnum::KucoinSpot => {
+            depth_asks = kucoin_spot_handle::format_depth_items(res_data_json["asks"].clone());
+            depth_bids = kucoin_spot_handle::format_depth_items(res_data_json["bids"].clone());
+            t = Decimal::from_str(&res_data_json["timestamp"].to_string()).unwrap();
+            create_at = res_data_json["timestamp"].as_i64().unwrap() * 1000;
+        }
+        ExchangeEnum::OkxSwap => {
+            depth_asks = okx_handle::format_depth_items(res_data_json[0]["asks"].clone());
+            depth_bids = okx_handle::format_depth_items(res_data_json[0]["bids"].clone());
+            t = Decimal::from_str(&res_data_json[0]["seqId"].to_string()).unwrap();
+            create_at = res_data_json[0]["ts"].as_str().unwrap().parse::<i64>().unwrap() * 1000;
+        }
+        ExchangeEnum::BitgetSpot => {
+            depth_asks = bitget_spot_handle::format_depth_items(res_data_json[0]["asks"].clone());
+            depth_bids = bitget_spot_handle::format_depth_items(res_data_json[0]["bids"].clone());
+            t = Decimal::from_str(res_data_json[0]["ts"].as_str().unwrap()).unwrap();
+            create_at = res_data_json[0]["ts"].as_str().unwrap().parse::<i64>().unwrap() * 1000;
+        }
+        ExchangeEnum::BybitSwap => {
+            depth_asks = bybit_swap_handle::format_depth_items(res_data_json["a"].clone());
+            depth_bids = bybit_swap_handle::format_depth_items(res_data_json["b"].clone());
+            t = Decimal::from_i64(res_data.reach_time).unwrap();
+            create_at = res_data.reach_time * 1000;
         }
+        _ => {
+            error!("未找到该交易所!handle_special_depth: {:?}",exchange);
+            panic!("未找到该交易所!handle_special_depth: {:?}", exchange);
+        }
+    }
+
+    DepthParam{
+        depth_asks,
+        depth_bids,
+        t,
+        create_at
     }
-}
+}
+
+

+ 2 - 0
standard/src/lib.rs

@@ -30,6 +30,8 @@ mod bitget_spot;
 pub mod bitget_spot_handle;
 mod kucoin_spot;
 pub mod kucoin_spot_handle;
+mod bybit_swap;
+mod bybit_swap_handle;
 
 /// 持仓模式枚举
 /// - `Both`:单持仓方向

+ 1 - 1
strategy/Cargo.toml

@@ -10,7 +10,7 @@ serde = "1.0.183"
 serde_derive = "1.0"
 serde_json = "1.0.104"
 tokio = { version = "1.31.0", features = ["full"] }
-rust_decimal = "1.32.0"
+rust_decimal = { version = "1.32.0", features = ["maths"]}
 rust_decimal_macros = "1.32.0"
 rand = "0.8.4"
 chrono = "0.4.26"

+ 18 - 17
strategy/src/binance_usdt_swap.rs

@@ -10,7 +10,7 @@ use crate::model::{OriginalTradeBa};
 use crate::quant::Quant;
 use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
 use futures_util::StreamExt;
-use crate::exchange_disguise::{on_special_depth, on_trade};
+use crate::exchange_disguise::{on_special_depth};
 
 // 参考 币安 合约 启动
 pub(crate) async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>,
@@ -71,22 +71,23 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
         // let name = data.label.clone();
 
         // 订单流逻辑
-        on_trade(trade.clone(), bot_arc_clone.clone()).await;
+        // on_trade(trade.clone(), bot_arc_clone.clone()).await;
 
-        // // 原本的逻辑
-        // let mut quant = bot_arc_clone.lock().await;
-        // if quant.is_update.contains_key(&data.label) && *quant.is_update.get(name.as_str()).unwrap() {
-        //     *max_buy = Decimal::ZERO;
-        //     *min_sell = Decimal::ZERO;
-        //     quant.is_update.remove(name.as_str());
-        // }
-        // if trade.p > *max_buy || *max_buy == Decimal::ZERO {
-        //     *max_buy = trade.p
-        // }
-        // if trade.p < *min_sell || *min_sell == Decimal::ZERO {
-        //     *min_sell = trade.p
-        // }
-        // quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+        // 原本的逻辑
+        let mut quant = bot_arc_clone.lock().await;
+        let str = data.label.clone();
+        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
+            *_max_buy = Decimal::ZERO;
+            *_min_sell = Decimal::ZERO;
+            quant.is_update.remove(str.as_str());
+        }
+        if trade.p > *_max_buy || *_max_buy == Decimal::ZERO {
+            *_max_buy = trade.p
+        }
+        if trade.p < *_min_sell || *_min_sell == Decimal::ZERO {
+            *_min_sell = trade.p
+        }
+        quant.max_buy_min_sell_cache.insert(data.label, vec![*_max_buy, *_min_sell]);
     } else if data.channel == "bookTicker" {
         trace_stack.on_before_format();
         // 将ticker数据转换为模拟深度
@@ -104,4 +105,4 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
 
         on_special_depth(bot_arc_clone, update_flag_u, data.label.clone(), trace_stack, special_depth).await;
     }
-}
+}

+ 271 - 0
strategy/src/bybit_usdt_swap.rs

@@ -0,0 +1,271 @@
+use std::cmp::Ordering;
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use futures_util::StreamExt;
+use rust_decimal::Decimal;
+use tokio::spawn;
+use tokio::sync::Mutex;
+use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
+use exchanges::response_base::ResponseData;
+use global::trace_stack::TraceStack;
+use standard::exchange::ExchangeEnum::{BybitSwap};
+use standard::handle_info::{DepthParam, format_depth, make_special_depth};
+use standard::MarketOrder;
+use crate::model::{OrderInfo, OriginalTradeBy};
+use crate::quant::Quant;
+use crate::exchange_disguise::on_special_depth;
+
+// 1交易、0参考 bybit 合约 启动
+pub async fn bybit_swap_run(bool_v1: Arc<AtomicBool>,
+                           is_trade: bool,
+                           _quant_arc: Arc<Mutex<Quant>>,
+                           name: String,
+                           symbols: Vec<String>,
+                           is_colo: bool,
+                           exchange_params: BTreeMap<String, String>) {
+    // 启动公共频道
+    let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
+    let (read_tx_public, mut read_rx_public) = futures_channel::mpsc::unbounded();
+
+    let mut ws_public = BybitSwapWs::new_label(name.clone(), is_colo, None, BybitSwapWsType::Public);
+    ws_public.set_symbols(symbols.clone());
+    ws_public.set_subscribe(vec![
+        BybitSwapSubscribeType::PuOrderBook50
+    ]);
+    if is_trade {
+        ws_public.set_subscribe(vec![
+            BybitSwapSubscribeType::PuBlicTrade
+        ]);
+    }
+    // 挂起公共ws
+    let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
+    let bool_clone_public = Arc::clone(&bool_v1);
+     spawn(async move {
+        ws_public.ws_connect_async(bool_clone_public,
+                                   &write_tx_am_public,
+                                   write_rx_public,
+                                   read_tx_public).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+    });
+    // 消费数据
+    let bot_arc_clone = _quant_arc.clone();
+    // 接收public数据
+    spawn(async move {
+        // ticker
+        let mut update_flag_u = Decimal::ZERO;
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+        let mut depth_asks: Vec<MarketOrder> = Vec::new();
+        let mut depth_bids: Vec<MarketOrder> = Vec::new();
+
+        loop {
+            if let Some(public_data) = read_rx_public.next().await {
+                on_public_data(bot_arc_clone.clone(),
+                               &mut update_flag_u,
+                               &mut max_buy,
+                               &mut min_sell,
+                               public_data,
+                               &mut depth_asks,
+                               &mut depth_bids).await;
+            }
+        }
+    });
+    let trade_symbols = symbols.clone();
+    // 交易交易所需要启动私有ws
+    if is_trade {
+        let (write_tx_private, write_rx_private) = futures_channel::mpsc::unbounded();
+        let (read_tx_private, mut read_rx_private) = futures_channel::mpsc::unbounded();
+        let auth = Some(parse_btree_map_to_bybit_swap_login(exchange_params));
+
+        let mut ws_private = BybitSwapWs::new_label(name.clone(), is_colo, auth, BybitSwapWsType::Private);
+        ws_private.set_symbols(trade_symbols);
+        ws_private.set_subscribe(vec![
+            BybitSwapSubscribeType::PrPosition,
+            BybitSwapSubscribeType::PrOrder,
+            BybitSwapSubscribeType::PrWallet
+        ]);
+
+
+        // 挂起私有ws
+        let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
+        let bool_clone_private = Arc::clone(&bool_v1);
+        spawn(async move {
+            ws_private.ws_connect_async(bool_clone_private,
+                                        &write_tx_am_private,
+                                        write_rx_private,
+                                        read_tx_private).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        });
+
+        // 消费数据
+        let bot_arc_clone = _quant_arc.clone();
+        // 接收private信息
+        spawn(async move {
+            let ct_val = _quant_arc.clone().lock().await.platform_rest.get_self_market().ct_val;
+            let run_symbol = symbols.clone()[0].clone();
+            loop {
+                if let Some(private_data) = read_rx_private.next().await {
+                    on_private_data(bot_arc_clone.clone(),
+                                    ct_val,
+                                    private_data,
+                                    run_symbol.clone()).await;
+                }
+            }
+        });
+    }
+}
+
+
+
+async fn on_private_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, data: ResponseData, run_symbol: String) {
+    let mut trace_stack = TraceStack::default();
+
+    trace_stack.on_after_network(data.time);
+    trace_stack.on_before_quant();
+
+    if data.code != "200".to_string() {
+        return;
+    }
+    if data.channel == "wallet" {
+        let account = standard::handle_info::HandleSwapInfo::handle_account_info(BybitSwap, data, run_symbol.clone());
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_equity(account);
+        }
+    } else if data.channel == "order" {
+        trace_stack.on_before_format();
+        let orders = standard::handle_info::HandleSwapInfo::handle_order(BybitSwap, data.clone(), ct_val.clone());
+        trace_stack.on_after_format();
+
+        let mut order_infos:Vec<OrderInfo> = Vec::new();
+        for order in orders.order {
+            if order.status == "NULL" {
+                continue;
+            }
+            let order_info = OrderInfo {
+                symbol: "".to_string(),
+                amount: order.amount.abs(),
+                side: "".to_string(),
+                price: order.price,
+                client_id: order.custom_id,
+                filled_price: order.avg_price,
+                filled: order.deal_amount.abs(),
+                order_id: order.id,
+                local_time: 0,
+                create_time: 0,
+                status: order.status,
+                fee: Default::default(),
+                trace_stack: Default::default(),
+            };
+            order_infos.push(order_info);
+        }
+
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_order(order_infos, trace_stack);
+        }
+    } else if data.channel == "position" {
+        let positions = standard::handle_info::HandleSwapInfo::handle_position(BybitSwap,data, ct_val.clone());
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_position(positions);
+        }
+    }
+}
+
+async fn on_public_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData, depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>) {
+    let mut trace_stack = TraceStack::default();
+    trace_stack.on_after_network(data.time);
+    trace_stack.on_before_quant();
+
+    if data.code != "200".to_string() {
+        return;
+    }
+    if data.channel == "orderbook" {
+        let mut is_update = false;
+        let data_type = data.data_type.clone();
+        let label = data.label.clone();
+        if data_type == "delta"  {
+            is_update = true;
+        }
+        trace_stack.on_before_format();
+        let mut depth_format: DepthParam = format_depth(BybitSwap, data);
+        // 是增量更新
+        if is_update {
+            update_order_book(depth_asks, depth_bids, depth_format.depth_asks, depth_format.depth_bids);
+        } else { // 全量
+            depth_asks.clear();
+            depth_asks.append(&mut depth_format.depth_asks);
+            depth_bids.clear();
+            depth_bids.append(&mut depth_format.depth_bids);
+
+        }
+        let depth = make_special_depth(label.clone(), depth_asks, depth_bids, depth_format.t, depth_format.create_at);
+        trace_stack.on_before_network(depth_format.create_at.clone());
+        trace_stack.on_after_format();
+
+        on_special_depth(bot_arc_clone, update_flag_u, label, trace_stack, depth).await;
+    } else if data.channel == "trade" {
+        let mut quant = bot_arc_clone.lock().await;
+        let str = data.label.clone();
+        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
+            *max_buy = Decimal::ZERO;
+            *min_sell = Decimal::ZERO;
+            quant.is_update.remove(str.as_str());
+        }
+        let trades: Vec<OriginalTradeBy> = serde_json::from_str(data.data.as_str()).unwrap();
+        for trade in trades {
+            if trade.p > *max_buy || *max_buy == Decimal::ZERO{
+                *max_buy = trade.p
+            }
+            if trade.p < *min_sell || *min_sell == Decimal::ZERO{
+                *min_sell = trade.p
+            }
+        }
+        quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+    }
+}
+
+fn parse_btree_map_to_bybit_swap_login(exchange_params: BTreeMap<String, String>) -> BybitSwapLogin {
+    BybitSwapLogin {
+        api_key: exchange_params.get("access_key").unwrap().clone(),
+        secret_key: exchange_params.get("secret_key").unwrap().clone(),
+    }
+}
+fn update_order_book(depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>, asks : Vec<MarketOrder>, bids: Vec<MarketOrder>) {
+    for i in asks {
+        let index_of_value = depth_asks.iter().position(|x| x.price == i.price);
+        match index_of_value {
+            Some(index) => {
+                if i.amount == Decimal::ZERO {
+                    depth_asks.remove(index);
+                } else {
+                    depth_asks[index].amount = i.amount.clone();
+                }
+            },
+            None => {
+                depth_asks.push(i.clone());
+            },
+        }
+    }
+    for i in bids {
+        let index_of_value = depth_bids.iter().position(|x| x.price == i.price);
+        match index_of_value {
+            Some(index) => {
+                if i.amount == Decimal::ZERO {
+                    depth_bids.remove(index);
+                } else {
+                    depth_bids[index].amount = i.amount.clone();
+                }
+            },
+            None => {
+                depth_bids.push(i.clone());
+            },
+        }
+    }
+    depth_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap_or(Ordering::Equal));
+    depth_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap_or(Ordering::Equal));
+
+    // 限制总长度100
+    depth_asks.truncate(100);
+    depth_bids.truncate(100);
+}

+ 55 - 49
strategy/src/exchange_disguise.rs

@@ -8,10 +8,10 @@ use standard::SpecialDepth;
 use crate::binance_spot::reference_binance_spot_run;
 use crate::binance_usdt_swap::reference_binance_swap_run;
 use crate::bitget_spot::bitget_spot_run;
+use crate::bybit_usdt_swap::bybit_swap_run;
 use crate::gate_swap::gate_swap_run;
 use crate::kucoin_spot::kucoin_spot_run;
 use crate::kucoin_swap::kucoin_swap_run;
-use crate::model::OriginalTradeBa;
 use crate::okx_usdt_swap::okex_swap_run;
 use crate::quant::Quant;
 
@@ -35,6 +35,9 @@ pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>,
         },
         "bitget_spot" => {
             bitget_spot_run(bool_v1,true, quant_arc, name, symbols, is_colo, exchange_params).await;
+        },
+        "bybit_usdt_swap" => {
+            bybit_swap_run(bool_v1,true, quant_arc, name, symbols, is_colo, exchange_params).await;
         }
         _ => {
             let msg = format!("不支持的交易交易所:{}", exchange_name);
@@ -73,6 +76,9 @@ pub async fn run_reference_exchange(bool_v1: Arc<AtomicBool>,
         "bitget_spot" => {
             bitget_spot_run(bool_v1, false, quant_arc, name, symbols, is_colo, exchange_params).await;
         },
+        "bybit_usdt_swap" => {
+            bybit_swap_run(bool_v1, false, quant_arc, name, symbols, is_colo, exchange_params).await;
+        },
         _ => {
             let msg = format!("不支持的参考交易所:{}", exchange_name);
             panic!("{}", msg);
@@ -94,54 +100,54 @@ pub async fn on_special_depth(bot_arc: Arc<Mutex<Quant>>,
     }
 }
 
-pub async fn on_trade(trade: OriginalTradeBa,
-                      bot_arc_clone: Arc<Mutex<Quant>>) {
-    let mut bot = bot_arc_clone.lock().await;
-    // 1. 塞入数据到bot
-    bot.trades.push(trade.clone());
-    // 2. 长度检查
-    if bot.trades.len() > bot.recall_max_count {
-        bot.trades.remove(0);
-    }
-    // 3. 如果少于100条,不进行判断
-    if bot.trades.len() < 100 {
-        return;
-    }
-    // 求最近的多空总和
-    let mut long_sum = Decimal::ZERO;
-    let mut short_sum = Decimal::ZERO;
-    let last_trade_t = trade.t.clone();
-    let mut rev = bot.trades.clone();
-    rev.reverse();
-    for trade_o in rev {
-        // 如果该元素已过期,我们是按时间顺序插入的,说明前面的应该都过期了,跳出循环,停止检测
-        if trade_o.t < last_trade_t - bot.recall_time {
-            continue;
-        }
-
-        // 卖出订单
-        if trade_o.m {
-            short_sum += trade_o.q;
-        } else {
-            long_sum += trade_o.q;
-        }
-    }
-
-    // 做多主动性
-    if (long_sum / (long_sum + short_sum)) > bot.long_volume_rate {
-        if bot.side != "long".to_string() {
-            bot.side = "long".to_string();
-        }
-    } else if (short_sum / (long_sum + short_sum)) > bot.short_volume_rate {
-        if bot.side != "short".to_string() {
-            bot.side = "short".to_string();
-        }
-    } else {
-        if bot.side != "normal".to_string() {
-            bot.side = "normal".to_string();
-        }
-    }
-}
+// pub async fn on_trade(trade: OriginalTradeBa,
+//                       bot_arc_clone: Arc<Mutex<Quant>>) {
+    // let mut bot = bot_arc_clone.lock().await;
+    // // 1. 塞入数据到bot
+    // bot.trades.push(trade.clone());
+    // // 2. 长度检查
+    // if bot.trades.len() > bot.recall_max_count {
+    //     bot.trades.remove(0);
+    // }
+    // // 3. 如果少于100条,不进行判断
+    // if bot.trades.len() < 100 {
+    //     return;
+    // }
+    // // 求最近的多空总和
+    // let mut long_sum = Decimal::ZERO;
+    // let mut short_sum = Decimal::ZERO;
+    // let last_trade_t = trade.t.clone();
+    // let mut rev = bot.trades.clone();
+    // rev.reverse();
+    // for trade_o in rev {
+    //     // 如果该元素已过期,我们是按时间顺序插入的,说明前面的应该都过期了,跳出循环,停止检测
+    //     if trade_o.t < last_trade_t - bot.recall_time {
+    //         continue;
+    //     }
+    //
+    //     // 卖出订单
+    //     if trade_o.m {
+    //         short_sum += trade_o.q;
+    //     } else {
+    //         long_sum += trade_o.q;
+    //     }
+    // }
+    //
+    // // 做多主动性
+    // if (long_sum / (long_sum + short_sum)) > bot.long_volume_rate {
+    //     if bot.side != "long".to_string() {
+    //         bot.side = "long".to_string();
+    //     }
+    // } else if (short_sum / (long_sum + short_sum)) > bot.short_volume_rate {
+    //     if bot.side != "short".to_string() {
+    //         bot.side = "short".to_string();
+    //     }
+    // } else {
+    //     if bot.side != "normal".to_string() {
+    //         bot.side = "normal".to_string();
+    //     }
+    // }
+// }
 
 pub async fn on_order() {}
 

+ 2 - 1
strategy/src/lib.rs

@@ -11,4 +11,5 @@ mod gate_swap;
 mod kucoin_swap;
 mod kucoin_spot;
 mod bitget_spot;
-mod okx_usdt_swap;
+mod okx_usdt_swap;
+mod bybit_usdt_swap;

+ 6 - 0
strategy/src/model.rs

@@ -114,6 +114,12 @@ pub struct OriginalTradeGa {
     pub price: Decimal
 }
 
+#[derive(Serialize, Deserialize)]
+pub struct OriginalTradeBy {
+    pub v: Decimal,
+    pub p: Decimal
+}
+
 #[derive(Serialize, Deserialize)]
 pub struct OriginalTradeOK {
     // 数量

+ 4 - 1
strategy/src/quant.rs

@@ -20,7 +20,7 @@ use global::public_params::{ASK_PRICE_INDEX, BID_PRICE_INDEX, LENGTH};
 use global::trace_stack::TraceStack;
 use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker};
 use standard::exchange::{Exchange};
-use standard::exchange::ExchangeEnum::{BinanceSpot, BinanceSwap, BitgetSpot, GateSpot, GateSwap, KucoinSwap, OkxSwap};
+use standard::exchange::ExchangeEnum::{BinanceSpot, BinanceSwap, BitgetSpot, BybitSwap, GateSpot, GateSwap, KucoinSwap, OkxSwap};
 
 use crate::model::{LocalPosition, OrderInfo, OriginalTradeBa, TokenParam, TraderMsg};
 use crate::predictor::Predictor;
@@ -218,6 +218,9 @@ impl Quant {
                 "okex_usdt_swap" => {
                     Exchange::new(OkxSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
                 }
+                "bybit_usdt_swap" => {
+                    Exchange::new(BybitSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
                 _ => {
                     error!("203未找到对应的交易所rest枚举!");
                     panic!("203未找到对应的交易所rest枚举!");

+ 5 - 0
strategy/src/utils.rs

@@ -65,6 +65,8 @@ pub fn get_limit_requests_num_per_second(exchange: String) -> i64 {
         return public_params::OKEX_USDT_SWAP_LIMIT * public_params::RATIO;
     } else if exchange.eq("bitget_spot") {
         return public_params::BITGET_USDT_SPOT_LIMIT * public_params::RATIO;
+    } else if exchange.eq("bybit_usdt_swap"){
+        return public_params::BYBIT_USDT_SWAP_LIMIT * public_params::RATIO;
     } else {
         error!("限频规则(ratio)未找到,请检查配置!");
         panic!("限频规则(ratio)未找到,请检查配置!");
@@ -93,6 +95,8 @@ pub fn get_limit_order_requests_num_per_second(exchange: String) -> i64 {
         return public_params::OKEX_USDT_SWAP_LIMIT
     } else if exchange.eq("bitget_spot") {
         return public_params::BITGET_USDT_SPOT_LIMIT
+    } else if exchange.eq("bybit_usdt_swap") {
+        return public_params::BYBIT_USDT_SWAP_LIMIT
     } else {
         error!("限频规则(limit)未找到,请检查配置!");
         panic!("限频规则(limit)未找到,请检查配置!");
@@ -150,4 +154,5 @@ mod tests {
         println!("timestamp_micros: {}", now.timestamp_micros());
         println!("timestamp_nanos: {}", now.timestamp_nanos_opt().unwrap());
     }
+
 }