소스 검색

添加bybit数据

DESKTOP-NE65RNK\Citrus_limon 1 년 전
부모
커밋
8c8341423f

+ 494 - 0
exchanges/src/bybit_swap_rest.rs

@@ -0,0 +1,494 @@
+use std::collections::BTreeMap;
+
+use reqwest::Client;
+use reqwest::header::HeaderMap;
+use ring::hmac;
+use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
+use rust_decimal_macros::dec;
+use serde_json::Value;
+use tracing::{info, trace};
+
+use crate::http_tool::RestTool;
+use crate::response_base::ResponseData;
+
+#[derive(Clone, Debug)]
+pub struct BybitSwapRest {
+    pub tag: String,
+    base_url: String,
+    client: reqwest::Client,
+    /*******参数*/
+    //登录所需参数
+    login_param: BTreeMap<String, String>,
+    delays: Vec<i64>,
+    max_delay: i64,
+    avg_delay: Decimal,
+}
+
+impl BybitSwapRest {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+
+    pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> BybitSwapRest
+    {
+        return BybitSwapRest::new_with_tag("default-BybitSwapRest".to_string(), is_colo, login_param);
+    }
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: BTreeMap<String, String>) -> BybitSwapRest {
+        let base_url = if is_colo {
+            "https://api.bytick.com".to_string()
+        } else {
+            "https://api.bytick.com".to_string()
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道",base_url);
+        } else {
+            info!("走普通通道:{}",base_url);
+        }
+        /*****返回结构体*******/
+        BybitSwapRest {
+            tag,
+            base_url,
+            client: Client::new(),
+            login_param,
+            delays: vec![],
+            max_delay: 0,
+            avg_delay: dec!(0.0),
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************rest请求函数********************************************************/
+    /*******************************************************************************************************/
+    //服務器時間
+    pub async fn get_server_time(&mut self) -> ResponseData {
+        let params = serde_json::json!({
+         });
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/market/time".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //查詢最新行情信息
+    pub async fn get_tickers(&mut self, symbol: String) -> ResponseData {
+        let params = serde_json::json!({
+               "category":"linear",
+                "symbol":symbol
+         });
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/market/tickers".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //查詢市場價格K線數據
+    pub async fn get_kline(&mut self, symbol: String) -> ResponseData {
+        let params = serde_json::json!({
+               "category":"linear",
+                "symbol":symbol,
+                "interval":"1",
+                "limit":"200"
+         });
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/market/kline".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //查詢公告
+    pub async fn get_announcements(&mut self) -> ResponseData {
+        let params = serde_json::json!({
+            "locale":"zh-TW"
+         });
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/announcements/index".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //查詢可交易產品的規格信息
+    pub async fn get_instruments_list(&mut self) -> ResponseData {
+        let params = serde_json::json!({
+            "category":"linear",
+         });
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/market/instruments-info".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //查詢可交易產品的規格信息
+    pub async fn get_instruments_info(&mut self, symbol: String) -> ResponseData {
+        let params = serde_json::json!({
+            "category":"linear",
+            "symbol":symbol
+         });
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/market/instruments-info".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+    //查看持仓信息
+    pub async fn get_positions(&mut self, symbol: String, settle_coin: String) -> ResponseData {
+        let mut params = serde_json::json!({
+            "category":"linear",
+         });
+        if symbol.len() > 0 {
+            params.as_object_mut().unwrap().insert("symbol".parse().unwrap(), serde_json::Value::from(symbol));
+        }
+        if settle_coin.len() > 0 {
+            params.as_object_mut().unwrap().insert("settleCoin".parse().unwrap(), serde_json::Value::from(settle_coin));
+        }
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/position/list".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //设置持仓模式
+    pub async fn set_position_mode(&mut self, symbol: String, mode: i64) -> ResponseData {
+        let params = serde_json::json!({
+             "category": "linear",
+             "symbol": symbol,
+             "mode": mode,
+         });
+        let data = self.request("POST".to_string(),
+                                "/v5".to_string(),
+                                "/position/switch-mode".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+    //設置槓桿
+    pub async fn set_leverage(&mut self, symbol: String,
+                              lever: String) -> ResponseData {
+        let params = serde_json::json!({
+             "category": "linear",
+             "symbol": symbol,
+             "buyLeverage": lever,
+             "sellLeverage": lever,
+         });
+        let data = self.request("POST".to_string(),
+                                "/v5".to_string(),
+                                "/position/set-leverage".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+
+    //查詢錢包餘額
+    pub async fn get_account_balance(&mut self, symbol: String) -> ResponseData {
+        let params = serde_json::json!({
+            "accountType":"UNIFIED",
+            "coin":symbol
+         });
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/account/wallet-balance".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+    //創建委託單
+    pub async fn swap_order(&mut self, params: serde_json::Value) -> ResponseData {
+        let data = self.request("POST".to_string(),
+                                "/v5".to_string(),
+                                "/order/create".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //查詢實時委託單
+    pub async fn get_order(&mut self, symbol: String, order_id: String, order_link_id: String) -> ResponseData {
+        let mut params = serde_json::json!({
+            "category":"linear",
+            "symbol":symbol,
+         });
+        if order_id.len() > 0 {
+            params.as_object_mut().unwrap().insert("orderId".parse().unwrap(), serde_json::Value::from(order_id));
+        }
+        if order_link_id.len() > 0 {
+            params.as_object_mut().unwrap().insert("orderLinkId".parse().unwrap(), serde_json::Value::from(order_link_id));
+        }
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/order/realtime".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+    //撤单
+    pub async fn cancel_order(&mut self, symbol: String, order_id: String, order_link_id: String) -> ResponseData {
+        let mut params = serde_json::json!({
+             "category": "linear",
+             "symbol": symbol,
+         });
+        if order_id.len() > 0 {
+            params.as_object_mut().unwrap().insert("orderId".parse().unwrap(), serde_json::Value::from(order_id));
+        }
+        if order_link_id.len() > 0 {
+            params.as_object_mut().unwrap().insert("orderLinkId".parse().unwrap(), serde_json::Value::from(order_link_id));
+        }
+        let data = self.request("POST".to_string(),
+                                "/v5".to_string(),
+                                "/order/cancel".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+    //撤銷所有訂單
+    pub async fn cancel_orders(&mut self, symbol: String) -> ResponseData {
+        let params = serde_json::json!({
+             "category": "linear",
+             "symbol": symbol,
+         });
+        let data = self.request("POST".to_string(),
+                                "/v5".to_string(),
+                                "/order/cancel-all".to_string(),
+                                true,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    pub fn get_delays(&self) -> Vec<i64> {
+        self.delays.clone()
+    }
+    pub fn get_avg_delay(&self) -> Decimal {
+        self.avg_delay.clone()
+    }
+    pub fn get_max_delay(&self) -> i64 {
+        self.max_delay.clone()
+    }
+    fn get_delay_info(&mut self) {
+        let last_100 = if self.delays.len() > 100 {
+            self.delays[self.delays.len() - 100..].to_vec()
+        } else {
+            self.delays.clone()
+        };
+
+        let max_value = last_100.iter().max().unwrap();
+        if max_value.clone().to_owned() > self.max_delay {
+            self.max_delay = max_value.clone().to_owned();
+        }
+
+        let sum: i64 = last_100.iter().sum();
+        let sum_v = Decimal::from_i64(sum).unwrap();
+        let len_v = Decimal::from_u64(last_100.len() as u64).unwrap();
+        self.avg_delay = (sum_v / len_v).round_dp(1);
+        self.delays = last_100.clone().into_iter().collect();
+    }
+    //调用请求
+    pub async fn request(&mut self,
+                         method: String,
+                         prefix_url: String,
+                         request_url: String,
+                         is_login: bool,
+                         params: String) -> ResponseData
+    {
+        trace!("login_param:{:?}", self.login_param);
+        //解析账号信息
+        let mut access_key = "".to_string();
+        let mut secret_key = "".to_string();
+        // let mut passphrase = "".to_string();
+        if self.login_param.contains_key("access_key") {
+            access_key = self.login_param.get("access_key").unwrap().to_string();
+        }
+        if self.login_param.contains_key("secret_key") {
+            secret_key = self.login_param.get("secret_key").unwrap().to_string();
+        }
+        // if self.login_param.contains_key("pass_key") {
+        //     passphrase = self.login_param.get("pass_key").unwrap().to_string();
+        // }
+        let mut is_login_param = true;
+        if access_key == "" || secret_key == "" {
+            is_login_param = false
+        }
+
+
+        //请求头配置-如果需要登录则存在额外配置
+        let mut body = "".to_string();
+        let timestamp = Self::get_timestamp();
+        let mut headers = HeaderMap::new();
+        headers.insert("Content-Type", "application/json; charset=utf-8".parse().unwrap());
+        headers.insert("X-BAPI-RECV-WINDOW", "5000".parse().unwrap());
+
+        if method == "POST" {
+            body = params.clone();
+        }
+
+
+        //是否需要登录-- 组装sing
+        if is_login {
+            if !is_login_param {
+                let e = ResponseData::error(self.tag.clone(), "登录参数错误!".to_string());
+                return e;
+            } else {
+                //需要登录-且登录参数齐全
+                trace!("param:{}", params);
+                trace!("body:{}", body);
+                //组装sing
+                let sing = Self::sign(
+                    access_key.clone(),
+                    secret_key.clone(),
+                    method.clone(),
+                    params.clone(),
+                    timestamp.clone(),
+                );
+                //组装header
+                headers.extend(Self::headers(sing, timestamp, access_key));
+            }
+        }
+
+
+        // trace!("headers:{:?}", headers);
+        let start_time = chrono::Utc::now().timestamp_millis();
+        let response_data = self.http_tool(
+            format!("{}{}", prefix_url.clone(), request_url.clone()),
+            method.to_string(),
+            params.clone(),
+            headers,
+        ).await;
+        let time_array = chrono::Utc::now().timestamp_millis() - start_time;
+        self.delays.push(time_array);
+        self.get_delay_info();
+
+        response_data
+    }
+
+    pub fn headers(sign: String, timestamp: String, access_key: String) -> HeaderMap {
+        let mut headers = HeaderMap::new();
+        headers.insert("X-BAPI-SIGN-TYPE", "2".parse().unwrap());
+        headers.insert("X-BAPI-API-KEY", access_key.parse().unwrap());
+        headers.insert("X-BAPI-TIMESTAMP", timestamp.parse().unwrap());
+        headers.insert("X-BAPI-SIGN", sign.parse().unwrap());
+        // headers.insert("X-Referer", passphrase.parse().unwrap());
+        headers
+    }
+    pub fn sign(access_key: String,
+                secret_key: String,
+                method: String,
+                params: String, timestamp: String) -> String
+    {
+        /*签名生成*/
+        let url_param_str = RestTool::parse_params_to_str(params.clone());
+        let parameters = if method == "GET" {
+            url_param_str
+        } else {
+            params
+        };
+
+        let message = format!("{}{}5000{}", timestamp, access_key, parameters);
+        trace!("message:{}",message);
+
+        // 做签名
+        let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+        let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
+        let sign = hex::encode(result.as_ref());
+        sign
+    }
+
+    async fn http_tool(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> ResponseData {
+        /****请求接口与 地址*/
+        let url = format!("{}{}", self.base_url.to_string(), request_path);
+        let request_type = request_type.clone().to_uppercase();
+        let addrs_url: String = if RestTool::parse_params_to_str(params.clone()) == "" {
+            url.clone()
+        } else {
+            format!("{}?{}", url.clone(), RestTool::parse_params_to_str(params.clone()))
+        };
+
+        let request_builder = match request_type.as_str() {
+            "GET" => self.client.get(addrs_url.clone()).headers(headers),
+            "POST" => self.client.post(url.clone()).body(params.clone()).headers(headers),
+            "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
+            // "PUT" => self.client.put(url.clone()).json(&params),
+            _ => {
+                panic!("{}", format!("错误的请求类型:{}", request_type.clone()))
+            }
+        };
+
+        let response = request_builder.send().await.unwrap();
+        return if response.status().is_success() {
+            // 读取响应的内容
+            let body = response.text().await.unwrap();
+
+            self.on_success_data(&body, &addrs_url, &params)
+        } else {
+            let body = response.text().await.unwrap();
+
+            ResponseData::error(self.tag.clone(), body)
+        };
+    }
+
+    pub fn on_success_data(&mut self, text: &String, base_url: &String, params: &String) -> ResponseData {
+        let json_value: Value = serde_json::from_str(text.as_str()).unwrap();
+
+        let code: i64 = if json_value["retCode"].as_i64().is_some() {
+            json_value["retCode"].as_i64().unwrap()
+        } else if json_value["ret_code"].as_i64().is_some() {
+            json_value["ret_code"].as_i64().unwrap()
+        } else {
+            -1
+        };
+
+        if code == 0 {
+            let data = serde_json::to_string(&json_value["result"]).unwrap();
+            let mut success = ResponseData::new("".to_string(), 200, "success".to_string(), data.parse().unwrap());
+            success.time = json_value["time"].as_i64().unwrap();
+            success
+        } else {
+            let msg: &str = if json_value["retMsg"].as_str().is_some() {
+                json_value["retMsg"].as_str().unwrap()
+            } else if json_value["ret_msg"].as_str().is_some() {
+                json_value["ret_msg"].as_str().unwrap()
+            } else {
+                ""
+            };
+
+            let error = ResponseData::new("".to_string(),
+                                          code as i16,
+                                          format!("请求地址:{}, 请求参数:{}, 消息原文:{}。", base_url, params, msg),
+                                          Value::Null);
+            error
+        }
+    }
+
+    fn get_timestamp() -> String {
+        chrono::Utc::now().timestamp_millis()
+            .to_string()
+    }
+}

+ 353 - 0
exchanges/src/bybit_swap_ws.rs

@@ -0,0 +1,353 @@
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+
+use chrono::Utc;
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use serde_json::{json, Value};
+use tokio::sync::Mutex;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::{error, info, trace};
+
+use ring::hmac;
+
+use crate::response_base::ResponseData;
+use crate::socket_tool::{AbstractWsMode, HeartbeatType};
+
+//类型
+pub enum BybitSwapWsType {
+    Public,
+    Private,
+}
+
+//订阅频道
+#[derive(Clone)]
+pub enum BybitSwapSubscribeType {
+    PuOrderBook1,
+    PuOrderBook50,
+    PuBlicTrade,
+    PuTickers,
+    PuKline(String),
+
+    PrPosition,
+    PrExecution,
+    PrOrder,
+    PrWallet,
+}
+
+//账号信息
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct BybitSwapLogin {
+    pub api_key: String,
+    pub secret_key: String,
+}
+
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct BybitSwapWs {
+    //类型
+    tag: String,
+    //地址
+    address_url: String,
+    //账号
+    login_param: Option<BybitSwapLogin>,
+    //币对
+    symbol_s: Vec<String>,
+    //订阅
+    subscribe_types: Vec<BybitSwapSubscribeType>,
+    //心跳间隔
+    heartbeat_time: u64,
+}
+
+impl BybitSwapWs {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool, login_param: Option<BybitSwapLogin>, ws_type: BybitSwapWsType) -> BybitSwapWs {
+        return BybitSwapWs::new_with_tag("default-BybitSwapWs".to_string(), is_colo, login_param, ws_type);
+    }
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<BybitSwapLogin>, ws_type: BybitSwapWsType) -> BybitSwapWs {
+        /*******公共频道-私有频道数据组装*/
+        let address_url = match ws_type {
+            BybitSwapWsType::Public => {
+                "wss://stream.bybit.com/v5/public/linear?max_alive_time=1m".to_string()
+            }
+            BybitSwapWsType::Private => {
+                "wss://stream.bybit.com/v5/private?max_alive_time=1m".to_string()
+            }
+        };
+
+        if is_colo {
+            info!("开启高速(未配置,走普通:{})通道",address_url);
+        } else {
+            info!("走普通通道:{}",address_url);
+        }
+
+        BybitSwapWs {
+            tag,
+            address_url,
+            login_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            heartbeat_time: 1000 * 10,
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<BybitSwapSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //手动添加币对
+    pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
+        for symbol in b_array.iter_mut() {
+            // 大写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("_", "");
+            *symbol = symbol.replace("-", "");
+        }
+        self.symbol_s = b_array;
+    }
+    //频道是否需要登录
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if match t {
+                BybitSwapSubscribeType::PuOrderBook1 => false,
+                BybitSwapSubscribeType::PuOrderBook50 => false,
+                BybitSwapSubscribeType::PuBlicTrade => false,
+                BybitSwapSubscribeType::PuTickers => false,
+                BybitSwapSubscribeType::PuKline(_) => false,
+
+                BybitSwapSubscribeType::PrPosition => true,
+                BybitSwapSubscribeType::PrExecution => true,
+                BybitSwapSubscribeType::PrOrder => true,
+                BybitSwapSubscribeType::PrWallet => true,
+            } {
+                return true;
+            }
+        }
+        false
+    }
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: BybitSwapSubscribeType) -> String {
+        match subscribe_type {
+            BybitSwapSubscribeType::PuOrderBook1 => {
+                format!("orderbook.1.{}", symbol)
+            }
+            BybitSwapSubscribeType::PuOrderBook50 => {
+                format!("orderbook.50.{}", symbol)
+            }
+            BybitSwapSubscribeType::PuBlicTrade => {
+                format!("publicTrade.{}", symbol)
+            }
+            BybitSwapSubscribeType::PuTickers => {
+                format!("tickers.{}", symbol)
+            }
+            BybitSwapSubscribeType::PuKline(t) => {
+                format!("kline.{}.{}", t, symbol)
+            }
+
+            BybitSwapSubscribeType::PrPosition => {
+                format!("position")
+            }
+            BybitSwapSubscribeType::PrExecution => {
+                format!("execution")
+            }
+            BybitSwapSubscribeType::PrOrder => {
+                format!("order")
+            }
+            BybitSwapSubscribeType::PrWallet => {
+                format!("wallet")
+            }
+        }
+    }
+    //订阅信息生成
+    pub fn get_subscription(&self) -> String {
+        let mut params = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
+                params.push(ty_str);
+            }
+        }
+
+        let str = json!({
+                "op": "subscribe",
+                "args": params
+            });
+        str.to_string()
+    }
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    //链接
+    pub async fn ws_connect_async<F, Future>(&mut self,
+                                             is_shutdown_arc: Arc<AtomicBool>,
+                                             handle_function: F,
+                                             write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                             write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+        where
+            F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+            Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
+    {
+        let login_is = self.contains_pr();
+        let subscription = self.get_subscription();
+        let address_url = self.address_url.clone();
+        let tag = self.tag.clone();
+        let login_param = self.login_param.clone();
+        let (api_key, secret_key) = match login_param {
+            None => { ("".to_string(), "".to_string()) }
+            Some(p) => {
+                (p.api_key.clone().to_string(), p.secret_key.clone().to_string())
+            }
+        };
+        let heartbeat_time = self.heartbeat_time.clone();
+        trace!("{:?}",format!("{}",subscription));
+
+
+        //心跳-- 方法内部线程启动
+        let write_tx_clone1 = Arc::clone(write_tx_am);
+        tokio::spawn(async move {
+            trace!("线程-异步心跳-开始");
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
+            trace!("线程-异步心跳-结束");
+        });
+
+        //链接
+        let t2 = tokio::spawn(async move {
+            let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
+
+            loop {
+                info!("bybit_usdt_swap socket 连接中……");
+
+                //设置订阅
+                let mut subscribe_array = vec![];
+                if login_is {
+                    let timestamp = Utc::now().timestamp_millis();
+                    let expires = timestamp + 1000;
+                    let message = format!("GET/realtime{}", expires);
+
+                    let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+                    let result = hmac::sign(&hmac_key, &message.as_bytes());
+                    let signature = hex::encode(result.as_ref());
+
+                    //登录相关
+                    let str = json!({
+                        "op": "auth",
+                        "args": [api_key, expires, signature]
+                    });
+                    subscribe_array.push(str.to_string());
+                }
+                subscribe_array.push(subscription.to_string());
+
+                // ws网络层重连
+                AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
+                                                 false, tag.clone(), subscribe_array, write_to_socket_rx_arc.clone(),
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
+
+                error!("bybit_usdt_swap socket 断连,1s以后重连……");
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        });
+        tokio::try_join!(t2).unwrap();
+        trace!("线程-心跳与链接-结束");
+
+        Ok(())
+    }
+    /*******************************************************************************************************/
+    /*****************************************数据解析*****************************************************/
+    /*******************************************************************************************************/
+    //数据解析-Text
+    pub fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text);
+        Option::from(response_data)
+    }
+    //数据解析-ping
+    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
+    }
+    //数据解析-pong
+    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
+    }
+    //数据解析-二进制
+    pub fn message_binary(_po: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = format!("Binary:{:?}", _po);
+        Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+    }
+    //数据解析
+    pub fn ok_text(text: String) -> ResponseData {
+        // trace!("原始数据");
+        // trace!(?text);
+        let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
+        let json_value: Value = serde_json::from_str(&text).unwrap();
+        if json_value.get("success").is_some() {
+            info!("bybit_swap_ws订阅结果:{:?}", json_value);
+            //订阅内容
+            let success = json_value["success"].as_bool().unwrap();
+            // let ret_msg = json_value["ret_msg"].as_str().unwrap();
+            let op = json_value["op"].as_str().unwrap();
+            let success_error = if success {
+                "成功"
+            } else {
+                "失败"
+            };
+
+            if op == "auth" {
+                res_data.code = -200;
+                res_data.message = format!("登录{}", success_error);
+            } else if op == "subscribe" {
+                res_data.message = format!("订阅{}", success_error);
+                res_data.code = -201;
+            } else {
+                res_data.code = -1;
+                res_data.channel = "未知订阅".to_string();
+            }
+        } else if json_value.get("topic").is_some() && json_value.get("data").is_some() {
+            let channel = json_value["topic"].to_string();
+            res_data.data = json_value.clone();
+
+            res_data.code = 200;
+
+            if channel.contains("orderbook") {
+                res_data.channel = "orderbook".to_string();
+                res_data.data_type = json_value["type"].as_str().unwrap().to_string();
+                // bybit 时间在data块外
+                res_data.reach_time = json_value.get("ts").unwrap().as_i64().unwrap_or(0i64);
+            } else if channel.contains("publicTrade") {
+                res_data.channel = "trades".to_string();
+                res_data.data_type = json_value["type"].as_str().unwrap().to_string();
+            } else if channel.contains("tickers") {
+                res_data.channel = "tickers".to_string();
+                res_data.data["ts"] = json_value["ts"].clone();
+            } else if channel.contains("position") {
+                res_data.channel = "position".to_string();
+            } else if channel.contains("execution") {
+                res_data.channel = "execution".to_string();
+            } else if channel.contains("order") {
+                res_data.channel = "order".to_string();
+            } else if channel.contains("wallet") {
+                res_data.channel = "wallet".to_string();
+            } else if channel.contains("kline") {
+                res_data.channel = "kline.1m".to_string();
+            } else {
+                res_data.code = -1;
+                res_data.channel = "未知的频道".to_string();
+            }
+        } else {
+            //推送数据
+            res_data.code = -1;
+            res_data.channel = "未知的频道".to_string();
+        }
+
+        res_data
+    }
+}

+ 2 - 0
exchanges/src/lib.rs

@@ -13,3 +13,5 @@ pub mod phemex_swap_rest;
 pub mod phemex_swap_ws;
 pub mod mexc_swap_rest;
 pub mod mexc_swap_ws;
+pub mod bybit_swap_rest;
+pub mod bybit_swap_ws;

+ 118 - 0
src/bybit_usdt_swap_data_listener.rs

@@ -0,0 +1,118 @@
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc};
+use std::sync::atomic::{AtomicBool};
+use std::time::Duration;
+use chrono::Utc;
+use lazy_static::lazy_static;
+use rust_decimal::Decimal;
+use tokio::sync::{Mutex};
+use tracing::info;
+use exchanges::bybit_swap_rest::BybitSwapRest;
+use exchanges::bybit_swap_ws::{BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
+use exchanges::response_base::ResponseData;
+use rust_decimal_macros::dec;
+use serde_json::Value;
+use standard::exchange::ExchangeEnum;
+use standard::exchange_struct_handler::ExchangeStructHandler;
+use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
+use crate::listener_tools::{TradeMap, update_trade};
+use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
+
+const EXCHANGE_NAME: &str = "bybit_usdt_swap";
+
+lazy_static! {
+    // 给其他模块使用
+    pub static ref INDICATOR_MAP: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
+
+    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+    static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
+}
+
+pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+    let name = "bybit_usdt_swap_listener";
+    // 订阅所有币种
+    let login = BTreeMap::new();
+    let mut bybit_rest = BybitSwapRest::new(false, login);
+    let response = bybit_rest.get_instruments_list().await;
+    let mut symbols = vec![];
+    if response.code == 200 {
+        let symbol_infos: Vec<Value> = response.data["list"].as_array().unwrap().iter().filter(|item| { item["contractType"].as_str().unwrap() == "LinearPerpetual" && item["status"].as_str().unwrap() == "Trading" && item["quoteCoin"].as_str().unwrap() == "USDT" }).cloned().collect();
+        let mut mul_map = MUL_MAP.lock().await;
+        for symbol_info in symbol_infos {
+            let symbol = symbol_info["symbol"].as_str().unwrap().to_string().replace("USDT", "_USDT");
+            mul_map.insert(symbol.clone(), Decimal::ONE);
+
+            symbols.push(symbol)
+        }
+    }
+
+    for chunk in symbols.chunks(20) {
+        let ws_name = name.to_string();
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+
+        tokio::spawn(async move {
+            let mut ws = BybitSwapWs::new_with_tag(ws_name, false, None, BybitSwapWsType::Public);
+            ws.set_subscribe(vec![
+                BybitSwapSubscribeType::PuBlicTrade,
+            ]);
+
+            // 建立链接
+            ws.set_symbols(symbols_chunk);
+            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        });
+    }
+    // 每分钟计算msv
+    tokio::spawn(async move {
+        loop {
+            let end_timestamp = Utc::now().timestamp_millis();
+            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 4;
+            for symbol in symbols.clone() {
+                let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
+                let trades = parse_json_to_trades(trades_value);
+                let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
+                let mut indicator_map = INDICATOR_MAP.lock().await;
+                indicator_map.insert(symbol, msv);
+            }
+            tokio::time::sleep(Duration::from_secs(60)).await;
+        }
+    });
+    // 定时删除数据
+    tokio::spawn(async move {
+        loop {
+            tokio::time::sleep(Duration::from_secs(1800)).await;
+            delete_db_by_exchange(EXCHANGE_NAME, "trades", 5 * 60).await;
+        }
+    });
+}
+
+// 读取数据
+pub async fn data_listener(response: ResponseData) {
+    if response.code != 200 {
+        return;
+    }
+    match response.channel.as_str() {
+        // 订单流数据
+        "trades" => {
+            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BybitSwap, &response);
+            let mul_map = MUL_MAP.lock().await;
+
+            for trade in trades.iter_mut() {
+                // 真实交易量处理,因为gate的量都是张数
+                let mul = mul_map[trade.symbol.as_str()];
+                let mut real_size = trade.size * mul * trade.price;
+                real_size.rescale(2);
+                trade.size = real_size;
+
+                // 更新到本地数据库
+                let trades_map = TRADES_MAP.lock().await;
+                update_trade(trade, trades_map, EXCHANGE_NAME).await;
+            }
+        }
+        _ => {
+            info!("48 未知的数据类型: {:?}", response)
+        }
+    }
+}

+ 2 - 0
src/main.rs

@@ -7,6 +7,7 @@ mod gate_usdt_swap_data_listener;
 mod coinex_usdt_swap_data_listener;
 mod phemex_usdt_swap_data_listener;
 mod mexc_usdt_swap_data_listener;
+mod bybit_usdt_swap_data_listener;
 mod msv;
 mod rank;
 
@@ -38,6 +39,7 @@ async fn main() {
     coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
     phemex_usdt_swap_data_listener::run_listener(running.clone()).await;
     mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
+    bybit_usdt_swap_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
     // let panic_running = running.clone();
     std::panic::set_hook(Box::new(move |panic_info| {

+ 5 - 1
src/server.rs

@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
 use tracing::{info};
 use actix_cors::Cors;
-use crate::{binance_usdt_swap_data_listener, coinex_usdt_swap_data_listener, gate_usdt_swap_data_listener, mexc_usdt_swap_data_listener, phemex_usdt_swap_data_listener, rank};
+use crate::{binance_usdt_swap_data_listener, bybit_usdt_swap_data_listener, coinex_usdt_swap_data_listener, gate_usdt_swap_data_listener, mexc_usdt_swap_data_listener, phemex_usdt_swap_data_listener, rank};
 
 // 定义用于反序列化查询参数的结构体
 #[derive(Serialize, Deserialize, Clone)]
@@ -62,6 +62,9 @@ async fn get_rank_list(query: web::Query<RankQuery>) -> impl Responder {
         "mexc_usdt_swap" => {
             mexc_usdt_swap_data_listener::INDICATOR_MAP.lock().await
         },
+        "bybit_usdt_swap" => {
+            bybit_usdt_swap_data_listener::INDICATOR_MAP.lock().await
+        },
         _ => {
             let response = Response {
                 query: serde_json::to_value(&query.into_inner()).unwrap(),
@@ -108,6 +111,7 @@ async fn get_exchanges() -> impl Responder {
         "coinex_usdt_swap",
         "phemex_usdt_swap",
         "mexc_usdt_swap",
+        "bybit_usdt_swap",
     ];
     let response_data = json!(exchanges);
 

+ 636 - 0
standard/src/bybit_swap.rs

@@ -0,0 +1,636 @@
+use std::collections::{BTreeMap};
+use std::io::{Error, ErrorKind};
+use std::str::FromStr;
+use tokio::sync::mpsc::Sender;
+use async_trait::async_trait;
+use rust_decimal::Decimal;
+use serde_json::{from_value, json, Value};
+use rust_decimal::prelude::FromPrimitive;
+use serde::{Deserialize, Serialize};
+use tracing::{error, trace};
+use exchanges::bybit_swap_rest::BybitSwapRest;
+use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, PositionModeEnum};
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct SwapTicker {
+    symbol: String,
+    high_price24h: Decimal,
+    low_price24h: Decimal,
+    bid1_price: Decimal,
+    ask1_price: Decimal,
+    last_price: Decimal,
+    volume24h: Decimal
+}
+
+#[allow(dead_code)]
+#[derive(Clone)]
+pub struct BybitSwap {
+    exchange: ExchangeEnum,
+    symbol: String,
+    symbol_uppercase: String,
+    is_colo: bool,
+    params: BTreeMap<String, String>,
+    request: BybitSwapRest,
+    market: Market,
+    order_sender: Sender<Order>,
+    error_sender: Sender<Error>,
+}
+
+impl BybitSwap {
+    pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> BybitSwap {
+        let market = Market::new();
+        let mut bybit_swap = BybitSwap {
+            exchange: ExchangeEnum::BybitSwap,
+            symbol: symbol.to_uppercase(),
+            symbol_uppercase: symbol.replace("_", "").to_uppercase(),
+            is_colo,
+            params: params.clone(),
+            request: BybitSwapRest::new(is_colo, params.clone()),
+            market,
+            order_sender,
+            error_sender,
+        };
+
+        // 修改持仓模式
+        let symbol_array: Vec<&str> = symbol.split("_").collect();
+        let mode_result = bybit_swap.set_dual_mode(symbol_array[1], true).await;
+        match mode_result {
+            Ok(_) => {
+                trace!("Bybit:设置持仓模式成功!")
+            }
+            Err(error) => {
+                error!("Bybit:设置持仓模式失败!mode_result={}", error)
+            }
+        }
+        // 获取市场信息
+        bybit_swap.market = BybitSwap::get_market(&mut bybit_swap).await.unwrap_or(bybit_swap.market);
+        return bybit_swap;
+    }
+}
+
+#[async_trait]
+impl Platform for BybitSwap {
+    // 克隆方法
+    fn clone_box(&self) -> Box<dyn Platform + Send + Sync> { Box::new(self.clone()) }
+    // 获取交易所模式
+    fn get_self_exchange(&self) -> ExchangeEnum {
+        ExchangeEnum::GateSwap
+    }
+    // 获取交易对
+    fn get_self_symbol(&self) -> String { self.symbol.clone() }
+    // 获取是否使用高速通道
+    fn get_self_is_colo(&self) -> bool {
+        self.is_colo
+    }
+    // 获取params信息
+    fn get_self_params(&self) -> BTreeMap<String, String> {
+        self.params.clone()
+    }
+    // 获取market信息
+    fn get_self_market(&self) -> Market { self.market.clone() }
+    // 获取请求时间
+    fn get_request_delays(&self) -> Vec<i64> { self.request.get_delays() }
+    // 获取请求平均时间
+    fn get_request_avg_delay(&self) -> Decimal { self.request.get_avg_delay() }
+    // 获取请求最大时间
+    fn get_request_max_delay(&self) -> i64 { self.request.get_max_delay() }
+
+    // 获取服务器时间
+    async fn get_server_time(&mut self) -> Result<String, Error> {
+        let res_data = self.request.get_server_time().await;
+        if res_data.code == 200 {
+            let result = res_data.data["server_time"].to_string();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+    // 获取账号信息
+    async fn get_account(&mut self) -> Result<Account, Error> {
+        let symbol_array: Vec<&str> = self.symbol.split("_").collect();
+        let res_data = self.request.get_account_balance(symbol_array[1].parse().unwrap()).await;
+        if res_data.code == 200 {
+            let arr_infos: Vec<Value> = from_value(res_data.data["list"].clone()).unwrap();
+            if arr_infos.len() < 1usize{
+                return Err(Error::new(ErrorKind::NotFound, format!("{} 无账户信息", symbol_array[1])));
+            }
+            let coin_infos: Vec<Value> = from_value(arr_infos[0]["coin"].clone()).unwrap();
+            if coin_infos.len() < 1usize{
+               return Err(Error::new(ErrorKind::NotFound, format!("{} 无账户信息", symbol_array[1])));
+            }
+            let balance = Decimal::from_str(coin_infos[0]["equity"].as_str().unwrap()).unwrap();
+            let available_balance = Decimal::from_str(coin_infos[0]["walletBalance"].as_str().unwrap()).unwrap();
+            let frozen_balance = balance - available_balance;
+            let result = Account {
+                coin: symbol_array[1].to_string(),
+                balance,
+                available_balance,
+                frozen_balance,
+                stocks: Decimal::ZERO,
+                available_stocks: Decimal::ZERO,
+                frozen_stocks: Decimal::ZERO,
+            };
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn get_spot_account(&mut self) -> Result<Vec<Account>, Error> {
+        Err(Error::new(ErrorKind::NotFound, "bybit_swap:该交易所方法未实现".to_string()))
+    }
+
+    // 获取持仓信息
+    async fn get_position(&mut self) -> Result<Vec<Position>, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let ct_val = self.market.ct_val;
+        let res_data = self.request.get_positions(symbol, "".to_string()).await;
+        if res_data.code == 200 {
+            let result = res_data.data["list"].as_array().unwrap().iter().map(|item| { format_position_item(item, ct_val) }).collect();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+    // 获取所有持仓
+    async fn get_positions(&mut self) -> Result<Vec<Position>, Error> {
+        let symbol_array: Vec<&str> = self.symbol.split("_").collect();
+        let ct_val = self.market.ct_val;
+        let res_data = self.request.get_positions("".to_string(), symbol_array[1].to_string().to_uppercase()).await;
+        if res_data.code == 200 {
+            let result = res_data.data["list"].as_array().unwrap().iter().map(|item| { format_position_item(item, ct_val) }).collect();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+    // 获取市场行情
+    async fn get_ticker(&mut self) -> Result<Ticker, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let res_data = self.request.get_tickers(symbol).await;
+        if res_data.code == 200 {
+            let list :Vec<SwapTicker> = from_value(res_data.data["list"].clone()).unwrap_or(Vec::new());
+
+            if list.len() < 1usize {
+                error!("bybit_swap:获取Ticker信息错误!\nget_ticker:res_data={:?}", res_data);
+                return Err(Error::new(ErrorKind::Other, res_data.to_string()));
+            }
+            let value = list[0].clone();
+            Ok(Ticker{
+                time: chrono::Utc::now().timestamp_millis(),
+                high: value.high_price24h,
+                low: value.low_price24h,
+                sell: value.ask1_price,
+                buy: value.bid1_price,
+                last: value.last_price,
+                volume: value.volume24h
+            })
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn get_ticker_symbol(&mut self, symbol: String) -> Result<Ticker, Error> {
+        let symbol_upper = symbol.replace("_", "").to_uppercase();
+        let res_data = self.request.get_tickers(symbol_upper.clone()).await;
+        if res_data.code == 200 {
+            let list: Vec<SwapTicker> = from_value(res_data.data["list"].clone()).unwrap();
+            let ticker_info = list.iter().find(|&item| item.symbol == symbol_upper);
+
+            match ticker_info {
+                None => {
+                    error!("bybit_swap:获取Ticker信息错误!\nget_ticker:res_data={:?}", res_data);
+                    Err(Error::new(ErrorKind::Other, res_data.to_string()))
+                }
+                Some(value) => {
+                    let result = Ticker {
+                        time: chrono::Utc::now().timestamp_millis(),
+                        high: value.high_price24h,
+                        low: value.low_price24h,
+                        sell: value.ask1_price,
+                        buy: value.bid1_price,
+                        last: value.last_price,
+                        volume: value.volume24h
+                    };
+                    Ok(result)
+                }
+            }
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn get_market(&mut self) -> Result<Market, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let res_data = self.request.get_instruments_info(symbol.clone()).await;
+        if res_data.code == 200 {
+            let arr_data: Vec<Value> = from_value(res_data.data["list"].clone()).unwrap();
+            let market_info = arr_data.iter().find(|&item| item["symbol"].as_str().unwrap() == symbol);
+            match market_info {
+                None => {
+                    error!("bybit_swap:获取Market信息错误!\nget_market:res_data={:?}", res_data);
+                    Err(Error::new(ErrorKind::Other, res_data.to_string()))
+                }
+                Some(value) => {
+                    let base_coin = value["baseCoin"].as_str().unwrap();
+                    let quote_coin = value["quoteCoin"].as_str().unwrap();
+                    let name = format!("{}_{}",base_coin, quote_coin);
+                    let tick_size = Decimal::from_str(value["priceFilter"]["minPrice"].as_str().unwrap().trim()).unwrap();
+                    let min_qty = Decimal::from_str(value["lotSizeFilter"]["minOrderQty"].as_str().unwrap().trim()).unwrap();
+                    let max_qty = Decimal::from_str(value["lotSizeFilter"]["maxOrderQty"].as_str().unwrap().trim()).unwrap();
+                    let ct_val = Decimal::ONE;
+
+                    let amount_size = min_qty * ct_val;
+                    let price_precision = Decimal::from_u32(tick_size.scale()).unwrap();
+                    let amount_precision = Decimal::from_u32(amount_size.scale()).unwrap();
+                    let min_notional = min_qty * ct_val;
+                    let max_notional = max_qty * ct_val;
+
+                    let result = Market {
+                        symbol: name,
+                        base_asset: base_coin.to_string(),
+                        quote_asset: quote_coin.to_string(),
+                        tick_size,
+                        amount_size,
+                        price_precision,
+                        amount_precision,
+                        min_qty,
+                        max_qty,
+                        min_notional,
+                        max_notional,
+                        ct_val,
+                    };
+                    Ok(result)
+                }
+            }
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn get_market_symbol(&mut self, symbol: String) -> Result<Market, Error> {
+        let symbol = symbol.replace("_", "").to_uppercase();
+        let res_data = self.request.get_instruments_info(symbol.clone()).await;
+        if res_data.code == 200 {
+            let arr_data: Vec<Value> = from_value(res_data.data["list"].clone()).unwrap();
+            let market_info = arr_data.iter().find(|item| item["symbol"].as_str().unwrap() == symbol);
+            match market_info {
+                None => {
+                    error!("bybit_swap:获取Market信息错误!\nget_market:res_data={:?}", res_data);
+                    Err(Error::new(ErrorKind::Other, res_data.to_string()))
+                }
+                Some(value) => {
+                    let base_coin = value["baseCoin"].as_str().unwrap();
+                    let quote_coin = value["quoteCoin"].as_str().unwrap();
+                    let name = format!("{}_{}",base_coin, quote_coin);
+                    let tick_size = Decimal::from_str(value["priceFilter"]["minPrice"].as_str().unwrap().trim()).unwrap();
+                    let min_qty = Decimal::from_str(value["lotSizeFilter"]["minOrderQty"].as_str().unwrap().trim()).unwrap();
+                    let max_qty = Decimal::from_str(value["lotSizeFilter"]["maxOrderQty"].as_str().unwrap().trim()).unwrap();
+                    let ct_val = Decimal::ONE;
+
+                    let amount_size = min_qty * ct_val;
+                    let price_precision = Decimal::from_u32(tick_size.scale()).unwrap();
+                    let amount_precision = Decimal::from_u32(amount_size.scale()).unwrap();
+                    let min_notional = min_qty * ct_val;
+                    let max_notional = max_qty * ct_val;
+
+                    let result = Market {
+                        symbol: name,
+                        base_asset: base_coin.to_string(),
+                        quote_asset: quote_coin.to_string(),
+                        tick_size,
+                        amount_size,
+                        price_precision,
+                        amount_precision,
+                        min_qty,
+                        max_qty,
+                        min_notional,
+                        max_notional,
+                        ct_val,
+                    };
+                    Ok(result)
+                }
+            }
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    // 获取订单详情
+    async fn get_order_detail(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let ct_val = self.market.ct_val;
+        let id = if !custom_id.trim().eq("") { format!("t-{}", custom_id) } else { String::new() };
+        let res_data = self.request.get_order(symbol, order_id.parse().unwrap(), id).await;
+        if res_data.code == 200 {
+            let res_data_json: Value = res_data.data["list"].clone();
+            if res_data_json.is_array() && res_data_json.as_array().unwrap().len() == 0 {
+                return Err(Error::new(ErrorKind::Other, "没有该订单!"));
+            }
+            let result = format_order_item(res_data_json.as_array().unwrap()[0].clone(), ct_val);
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+    // 获取订单列表
+    async fn get_orders_list(&mut self, _status: &str) -> Result<Vec<Order>, Error> {
+       Err(Error::new(ErrorKind::Other, "bybit获取订单列表暂未实现".to_string()))
+    }
+    // 下单接口
+    async fn take_order(&mut self, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let ct_val = self.market.ct_val;
+        let size = amount / ct_val;
+        let mut params = json!({
+            "orderLinkId": format!("t-{}", custom_id),
+            "symbol": symbol.to_string(),
+            "price": price.to_string(),
+            "category": "linear",
+            "orderType":"Limit",
+            "qty": json!(size),
+            // 0.單向持倉 1.買側雙向持倉 2.賣側雙向持倉
+            "positionIdx": json!(1),
+            "reduceOnly": json!(false)
+        });
+
+        if price.eq(&Decimal::ZERO) {
+            params["timeInForce"] = json!("IOC".to_string());
+        }
+        match origin_side {
+            "kd" => {
+                params["side"] = json!("Buy");
+            }
+            "pd" => {
+                params["side"] = json!("Sell");
+                // 减仓
+                params["reduceOnly"] = json!(true);
+            }
+            "kk" => {
+                params["side"] = json!("Sell");
+                params["positionIdx"] = json!(2);
+            }
+            "pk" => {
+                params["side"] = json!("Buy");
+                // 减仓
+                params["reduceOnly"] = json!(true);
+                params["positionIdx"] = json!(2);
+            }
+            _ => { error!("下单参数错误"); }
+        };
+        let res_data = self.request.swap_order(params).await;
+        if res_data.code == 200 {
+            let result = format_new_order_item(res_data.data, price, size);
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn take_order_symbol(&mut self, symbol: String, ct_val: Decimal, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
+        let symbol_upper = symbol.replace("_", "").trim().to_uppercase();
+        let size = (amount / ct_val).floor();
+        let order_type = if price == Decimal::ZERO {
+            "Market"
+        } else {
+            "Limit"
+        };
+        let mut params = json!({
+            "orderLinkId": format!("t-{}", custom_id),
+            "symbol": symbol_upper,
+            "price": price.to_string(),
+            "category": "linear",
+            "orderType": order_type,
+            "qty": json!(size),
+            // 0.單向持倉 1.買側雙向持倉 2.賣側雙向持倉
+            "positionIdx": json!(1),
+            "reduceOnly": json!(false)
+        });
+
+        if price.eq(&Decimal::ZERO) {
+            params["timeInForce"] = json!("IOC".to_string());
+        }
+        match origin_side {
+            "kd" => {
+                params["side"] = json!("Buy");
+            }
+            "pd" => {
+                params["side"] = json!("Sell");
+                params["positionIdx"] = json!(1);
+                // 减仓
+                params["reduceOnly"] = json!(true);
+            }
+            "kk" => {
+                params["side"] = json!("Sell");
+                params["positionIdx"] = json!(2);
+            }
+            "pk" => {
+                params["side"] = json!("Buy");
+                params["positionIdx"] = json!(2);
+                // 减仓
+                params["reduceOnly"] = json!(true);
+            }
+            _ => { error!("下单参数错误"); }
+        };
+        let res_data = self.request.swap_order(params.clone()).await;
+        if res_data.code == 200 {
+            let result = format_new_order_item(res_data.data, price, size);
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    // 撤销订单
+    async fn cancel_order(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let id = format!("t-{}", custom_id);
+        let res_data = self.request.cancel_order(symbol, String::from(order_id), id.clone()).await;
+        if res_data.code == 200 {
+            let result = format_cancel_order_item(res_data.data);
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+    // 批量撤销订单
+    async fn cancel_orders(&mut self) -> Result<Vec<Order>, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let res_data = self.request.cancel_orders(symbol).await;
+        if res_data.code == 200 {
+            let res_arr: Vec<Value> = from_value(res_data.data).unwrap();
+            let result = res_arr.iter().map(|item| format_cancel_order_item(item.clone())).collect();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn cancel_orders_all(&mut self) -> Result<Vec<Order>, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let res_data = self.request.cancel_orders(symbol).await;
+        if res_data.code == 200 {
+            let res_arr: Vec<Value> = from_value(res_data.data["list"].clone()).unwrap();
+            let result = res_arr.iter().map(|item| format_cancel_order_item(item.clone())).collect();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn take_stop_loss_order(&mut self, _stop_price: Decimal, _price: Decimal, _side: &str) -> Result<Value, Error> {
+        Err(Error::new(ErrorKind::NotFound, "bybit_swap:该交易所方法未实现".to_string()))
+    }
+
+    async fn cancel_stop_loss_order(&mut self, _order_id: &str) -> Result<Value, Error> {
+        Err(Error::new(ErrorKind::NotFound, "bybit_swap:该交易所方法未实现".to_string()))
+    }
+
+    // 设置持仓模式
+    async fn set_dual_mode(&mut self, _coin: &str, is_dual_mode: bool) -> Result<String, Error> {
+        let coin_format = self.symbol_uppercase.clone();
+        let mut mod_num = 0;
+        if is_dual_mode {
+            mod_num = 3;
+        }
+        let res_data = self.request.set_position_mode(coin_format, mod_num).await;
+        if res_data.code == 200 {
+            Ok(res_data.data.to_string())
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    // 更新双持仓模式下杠杆
+    async fn set_dual_leverage(&mut self, leverage: &str) -> Result<String, Error> {
+        let symbol = self.symbol_uppercase.clone();
+        let res_data = self.request.set_leverage(symbol, leverage.to_string()).await;
+        if res_data.code == 200 {
+            Ok(res_data.data.to_string())
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn set_auto_deposit_status(&mut self, _status: bool) -> Result<String, Error> { Err(Error::new(ErrorKind::NotFound, "gate:该交易所方法未实现".to_string())) }
+
+    // 交易账户互转
+    async fn wallet_transfers(&mut self, _coin: &str, _from: &str, _to: &str, _amount: Decimal) -> Result<String, Error> {
+        // let coin_format = coin.to_string().to_lowercase();
+        // let res_data = self.request.wallet_transfers(coin_format.clone(), from.to_string(), to.to_string(), amount.to_string(), coin_format.clone()).await;
+        // if res_data.code == 200 {
+        //     let res_data_str = &res_data.data;
+        //     let result = res_data_str.clone();
+        //     Ok(result)
+        // } else {
+        //     Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        // }
+        Err(Error::new(ErrorKind::Other, "暂未实现!"))
+    }
+}
+
+pub fn format_position_item(position: &Value, ct_val: Decimal) -> Position {
+    let position_idx = position["positionIdx"].to_string();
+    let mut position_mode = match position_idx.as_str() {
+        "0" => PositionModeEnum::Both,
+        "1" => PositionModeEnum::Long,
+        "2" => PositionModeEnum::Short,
+        _ => {
+            error!("bybit_swap:格式化持仓模式错误!\nformat_position_item:position={:?}", position);
+            panic!("bybit_swap:格式化持仓模式错误!\nformat_position_item:position={:?}", position)
+        }
+    };
+    let size_str: String = from_value(position["size"].clone()).unwrap();
+    let size = Decimal::from_str(size_str.as_str()).unwrap();
+    let amount = size * ct_val;
+    let mut profit = Decimal::ZERO;
+    let profit_str = position["unrealisedPnl"].as_str().unwrap_or("0");
+    if profit_str != "" {
+        profit = Decimal::from_str(profit_str).unwrap();
+    }
+
+    match position_mode {
+        PositionModeEnum::Both => {
+            position_mode = match amount {
+                amount if amount > Decimal::ZERO => PositionModeEnum::Long,
+                amount if amount < Decimal::ZERO => PositionModeEnum::Short,
+                _ => { PositionModeEnum::Both }
+            }
+        }
+        _ => {}
+    }
+    Position {
+        symbol: position["symbol"].as_str().unwrap_or("").parse().unwrap(),
+        margin_level: Decimal::from_str(position["leverage"].as_str().unwrap()).unwrap(),
+        amount,
+        frozen_amount: Decimal::ZERO,
+        price: Decimal::from_str(position["avgPrice"].as_str().unwrap()).unwrap(),
+        profit,
+        position_mode,
+        margin: Decimal::from_str(position["positionBalance"].as_str().unwrap()).unwrap(),
+    }
+}
+
+fn format_cancel_order_item(order: Value) -> Order {
+     Order {
+        id: format!("{}", order["orderId"].as_str().unwrap()),
+        custom_id: order["orderLinkId"].as_str().unwrap().replace("t-my-custom-id_", "").replace("t-", ""),
+        price: Decimal::ZERO,
+        amount: Decimal::ZERO,
+        deal_amount: Decimal::ZERO,
+        avg_price: Decimal::ZERO,
+        status: "REMOVE".to_string(),
+        order_type: "limit".to_string()
+    }
+}
+
+fn format_new_order_item(order: Value, price: Decimal, amount: Decimal) -> Order {
+    Order {
+        id: format!("{}", order["orderId"].as_str().unwrap()),
+        custom_id: order["orderLinkId"].as_str().unwrap().replace("t-my-custom-id_", "").replace("t-", ""),
+        price,
+        amount,
+        deal_amount: Decimal::ZERO,
+        avg_price: price,
+        status: "NEW".to_string(),
+        order_type: "limit".to_string()
+    }
+}
+
+pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
+    let status = order["orderStatus"].as_str().unwrap_or("");
+    let text = order["orderLinkId"].as_str().unwrap_or("");
+    let mut size = Decimal::ZERO;
+    let mut deal_amount = Decimal::ZERO;
+    let mut avg_price = Decimal::ZERO;
+
+    let right_str = order["cumExecQty"].to_string();
+    let size_str = order["qty"].to_string();
+
+    if !order.get("qty").is_some() {
+        size = Decimal::from_str(size_str.as_str()).unwrap();
+        let right_val = Decimal::from_str(order["cumExecValue"].as_str().unwrap()).unwrap();
+        let right = Decimal::from_str(right_str.as_str()).unwrap();
+        if right != Decimal::ZERO {
+            avg_price = right_val / right;
+        }
+        deal_amount = right * ct_val;
+    }
+
+    let amount = size * ct_val;
+    let custom_status = if status == "Filled" || status == "Cancelled" { "REMOVE".to_string() } else if status == "New" { "NEW".to_string() } else {
+        "NULL".to_string()
+    };
+    let rst_order = Order {
+        id: format!("{}", order["orderId"].as_str().unwrap()),
+        custom_id: text.replace("t-my-custom-id_", "").replace("t-", ""),
+        price: Decimal::from_str(order["price"].as_str().unwrap()).unwrap(),
+        amount,
+        deal_amount,
+        avg_price,
+        status: custom_status,
+        order_type: "limit".to_string()
+    };
+    return rst_order;
+}

+ 26 - 0
standard/src/bybit_swap_handle.rs

@@ -0,0 +1,26 @@
+use std::str::FromStr;
+use std::time::SystemTime;
+use rust_decimal::Decimal;
+use exchanges::response_base::ResponseData;
+use rust_decimal::prelude::FromPrimitive;
+use crate::{Trade};
+
+pub fn format_trade_items(res_data: &ResponseData) -> Vec<Trade> {
+    let result = res_data.data["data"].as_array().unwrap();
+    let mut trades = vec![];
+
+    for item in result {
+        let side = item["S"].as_str().unwrap();
+        let size = Decimal::from_str(item["v"].as_str().unwrap()).unwrap();
+        let id = SystemTime::now().duration_since(std::time::UNIX_EPOCH).expect("Time went backwards").as_nanos().to_string();
+        trades.push(Trade {
+            id: id.clone(),
+            time: Decimal::from_i64(item["T"].as_i64().unwrap()).unwrap(),
+            size: if side == "Buy" { size } else { -size },
+            price: Decimal::from_str(item["p"].as_str().unwrap()).unwrap(),
+            symbol: item["s"].as_str().unwrap().replace("USDT", "_USDT"),
+        })
+    }
+
+    return trades;
+}

+ 6 - 1
standard/src/exchange.rs

@@ -3,6 +3,7 @@ use std::io::Error;
 use tokio::sync::mpsc::Sender;
 use crate::{Order, Platform};
 use crate::binance_swap::BinanceSwap;
+use crate::bybit_swap::BybitSwap;
 use crate::coinex_swap::CoinexSwap;
 use crate::gate_swap::GateSwap;
 use crate::mexc_swap::MexcSwap;
@@ -20,7 +21,8 @@ pub enum ExchangeEnum {
     GateSwap,
     CoinexSwap,
     PhemexSwap,
-    MexcSwap
+    MexcSwap,
+    BybitSwap,
 }
 
 /// Exchange结构体
@@ -80,6 +82,9 @@ impl Exchange {
             ExchangeEnum::MexcSwap => {
                 Box::new(MexcSwap::new(symbol, is_colo, params, order_sender, error_sender).await)
             }
+            ExchangeEnum::BybitSwap => {
+                Box::new(BybitSwap::new(symbol, is_colo, params, order_sender, error_sender).await)
+            }
         }
     }
 }

+ 4 - 1
standard/src/exchange_struct_handler.rs

@@ -1,6 +1,6 @@
 use exchanges::response_base::ResponseData;
 use crate::exchange::ExchangeEnum;
-use crate::{binance_swap_handle, coinex_swap_handle, gate_swap_handle, mexc_swap_handle, phemex_swap_handle};
+use crate::{binance_swap_handle, bybit_swap_handle, coinex_swap_handle, gate_swap_handle, mexc_swap_handle, phemex_swap_handle};
 use crate::{ Trade };
 
 #[allow(dead_code)]
@@ -26,6 +26,9 @@ impl ExchangeStructHandler {
             ExchangeEnum::MexcSwap => {
                 mexc_swap_handle::format_trade_items(&res_data)
             }
+            ExchangeEnum::BybitSwap => {
+                bybit_swap_handle::format_trade_items(&res_data)
+            }
         }
     }
 }

+ 2 - 0
standard/src/lib.rs

@@ -23,6 +23,8 @@ pub mod phemex_swap;
 pub mod phemex_swap_handle;
 pub mod mexc_swap;
 pub mod mexc_swap_handle;
+pub mod bybit_swap;
+pub mod bybit_swap_handle;
 
 /// 持仓模式枚举
 /// - `Both`:单持仓方向