Pārlūkot izejas kodu

添加bybit交易所

DESKTOP-NE65RNK\Citrus_limon 1 gadu atpakaļ
vecāks
revīzija
b29837e031

+ 14 - 1
exchanges/src/bybit_swap_rest.rs

@@ -117,6 +117,19 @@ impl BybitSwapRest {
         data
     }
     //查詢可交易產品的規格信息
+    pub async fn get_instruments_list(&mut self) -> ResponseData {
+        let params = serde_json::json!({
+            "category":"linear",
+         });
+        let data = self.request("GET".to_string(),
+                                "/v5".to_string(),
+                                "/market/instruments-info".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //查詢可交易產品的規格信息
     pub async fn get_instruments_info(&mut self, symbol: String) -> ResponseData {
         let params = serde_json::json!({
             "category":"linear",
@@ -438,7 +451,7 @@ impl BybitSwapRest {
             let body = response.text().await.unwrap();
 
             ResponseData::error(self.tag.clone(), body)
-        }
+        };
     }
 
     pub fn on_success_data(&mut self, text: &String, base_url: &String, params: &String) -> ResponseData {

+ 4 - 4
exchanges/src/bybit_swap_ws.rs

@@ -313,7 +313,7 @@ impl BybitSwapWs {
             }
         } else if json_value.get("topic").is_some() && json_value.get("data").is_some() {
             let channel = json_value["topic"].to_string();
-            res_data.data = json_value["data"].clone();
+            res_data.data = json_value.clone();
 
             res_data.code = 200;
 
@@ -323,13 +323,11 @@ impl BybitSwapWs {
                 // bybit 时间在data块外
                 res_data.reach_time = json_value.get("ts").unwrap().as_i64().unwrap_or(0i64);
             } else if channel.contains("publicTrade") {
-                res_data.channel = "trade".to_string();
+                res_data.channel = "trades".to_string();
                 res_data.data_type = json_value["type"].as_str().unwrap().to_string();
             } else if channel.contains("tickers") {
                 res_data.channel = "tickers".to_string();
                 res_data.data["ts"] = json_value["ts"].clone();
-            } else if channel.contains("kline") {
-                res_data.channel = "kline".to_string();
             } else if channel.contains("position") {
                 res_data.channel = "position".to_string();
             } else if channel.contains("execution") {
@@ -338,6 +336,8 @@ impl BybitSwapWs {
                 res_data.channel = "order".to_string();
             } else if channel.contains("wallet") {
                 res_data.channel = "wallet".to_string();
+            } else if channel.contains("kline") {
+                res_data.channel = "kline.1m".to_string();
             } else {
                 res_data.code = -1;
                 res_data.channel = "未知的频道".to_string();

+ 114 - 0
src/bybit_usdt_swap_data_listener.rs

@@ -0,0 +1,114 @@
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc};
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use lazy_static::lazy_static;
+use tokio::sync::{Mutex};
+use tracing::info;
+use exchanges::bybit_swap_rest::BybitSwapRest;
+use exchanges::bybit_swap_ws::{BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
+use exchanges::response_base::ResponseData;
+use rust_decimal::Decimal;
+use standard::exchange::ExchangeEnum;
+use standard::exchange_struct_handler::ExchangeStructHandler;
+use crate::json_db_utils::delete_db_by_exchange;
+use crate::listener_tools::{RecordMap, TradeMap, update_record, update_trade};
+
+const EXCHANGE_NAME: &str = "bybit_usdt_swap";
+
+lazy_static! {
+    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+    static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
+    static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
+}
+
+pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+    let name = "bybit_usdt_swap_listener";
+    // 订阅所有币种
+    let login = BTreeMap::new();
+    let mut bybit_rest = BybitSwapRest::new(false, login);
+    let response = bybit_rest.get_instruments_list().await;
+    let mut symbols = vec![];
+    if response.code == 200 {
+
+        let data = response.data["list"].as_array().unwrap();
+        let mut mul_map = MUL_MAP.lock().await;
+        for info in data {
+            let symbol = info["symbol"].as_str().unwrap().to_string().replace("USDT","_USDT");
+            let mul = Decimal::ONE;
+            mul_map.insert(symbol.clone(), mul);
+
+            symbols.push(symbol)
+        }
+    }
+
+    // 将 symbols 分成每份20个元素的小块
+    for chunk in symbols.chunks(20) {
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+
+        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let ws_name = name.to_string();
+        let write_tx_clone = Arc::clone(&write_tx_am);
+        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+
+        tokio::spawn(async move {
+            let mut ws = BybitSwapWs::new_with_tag(ws_name, false, None, BybitSwapWsType::Public);
+            ws.set_subscribe(vec![
+                BybitSwapSubscribeType::PuBlicTrade,
+                BybitSwapSubscribeType::PuKline("1".to_string()),
+            ]);
+
+            // 建立链接
+            ws.set_symbols(symbols_chunk);
+            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_clone, write_rx).await.unwrap();
+        });
+    }
+    // 定时删除数据
+    tokio::spawn(async move {
+        loop {
+            delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+            tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+        }
+    });
+}
+
+// 读取数据
+pub async fn data_listener(response: ResponseData) {
+    if response.code != 200 {
+        return;
+    }
+
+    match response.channel.as_str() {
+        // 深度数据
+        "深度" => {
+            // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSwap, &response);
+            //
+            // update_depth(&depth).await;
+        },
+        // 订单流数据
+        "trades" => {
+            let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BybitSwap, &response);
+
+            for trade in trades.iter() {
+                let trades_map = TRADES_MAP.lock().await;
+
+                update_trade(&trade, trades_map, EXCHANGE_NAME).await
+            }
+        },
+        // k线数据
+        "kline.1m" => {
+            let records = ExchangeStructHandler::records_handle(ExchangeEnum::BybitSwap, &response);
+
+            if records.is_empty() {
+                return;
+            }
+
+            let record_map= RECORD_MAP.lock().await;
+            update_record(&records[records.len() - 1], record_map, EXCHANGE_NAME).await;
+        },
+        _ => {
+            info!("48 未知的数据类型: {:?}", response)
+        }
+    }
+}

+ 8 - 6
src/main.rs

@@ -17,6 +17,7 @@ mod phemex_usdt_swap_data_listener;
 mod woo_usdt_swap_data_listener;
 mod cointr_usdt_swap_data_listener;
 mod gate_usdt_spot_data_listener;
+mod bybit_usdt_swap_data_listener;
 
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -51,12 +52,13 @@ async fn main() {
     // bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
     // htx_usdt_swap_data_listener::run_listener(running.clone()).await;
 
-    binance_usdt_swap_data_listener::run_listener(running.clone()).await;
-    gate_usdt_swap_data_listener::run_listener(running.clone()).await;
-    coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
-    phemex_usdt_swap_data_listener::run_listener(running.clone()).await;
-    gate_usdt_spot_data_listener::run_listener(running.clone()).await;
-    mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // binance_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // gate_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // phemex_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // gate_usdt_spot_data_listener::run_listener(running.clone()).await;
+    // mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
+    bybit_usdt_swap_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
     // let panic_running = running.clone();
     std::panic::set_hook(Box::new(move |panic_info| {

+ 1 - 0
src/server.rs

@@ -184,6 +184,7 @@ async fn get_exchanges() -> impl Responder {
         "phemex_usdt_swap",
         "mexc_usdt_swap",
         "gate_usdt_spot",
+        "bybit_usdt_swap",
     ];
     let response_data = json!(exchanges);
 

+ 47 - 6
standard/src/bybit_swap_handle.rs

@@ -3,7 +3,8 @@ use rust_decimal::Decimal;
 use serde_json::{from_value, Value};
 use tracing::{error};
 use exchanges::response_base::ResponseData;
-use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialOrder};
+use rust_decimal::prelude::FromPrimitive;
+use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialOrder, Record, Trade};
 
 // 处理账号信息
 pub fn handle_account_info(res_data: &ResponseData, symbol: &String) -> Account {
@@ -11,13 +12,13 @@ pub fn handle_account_info(res_data: &ResponseData, symbol: &String) -> Account
 }
 
 pub fn format_account_info(data: Vec<Value>, symbol: &String) -> Account {
-    let account = data.iter().find(| &item | item["accountType"] == "UNIFIED");
+    let account = data.iter().find(|&item| item["accountType"] == "UNIFIED");
     match account {
         None => {
             error!("Bybit:格式化统一账户信息错误!\nformat_account_info: data={:?}", data);
             panic!("Bybit:格式化统一账户信息错误!\nformat_account_info: data={:?}", data)
         }
-        Some(val) =>{
+        Some(val) => {
             let arr: Vec<Value> = from_value(val["coin"].clone()).unwrap();
             let upper_str = symbol.to_uppercase();
             let symbol_array: Vec<&str> = upper_str.split("_").collect();
@@ -60,7 +61,7 @@ pub fn format_position_item(position: &Value, ct_val: &Decimal) -> Position {
             panic!("bybit_swap:格式化持仓模式错误!\nformat_position_item:position={:?}", position)
         }
     };
-    let symbol_mapper =  position["symbol"].as_str().unwrap().to_string();
+    let symbol_mapper = position["symbol"].as_str().unwrap().to_string();
     let currency = "USDT";
     let coin = &symbol_mapper[..symbol_mapper.find(currency).unwrap_or(0)];
     let size_str: String = from_value(position["size"].clone()).unwrap();
@@ -77,7 +78,7 @@ pub fn format_position_item(position: &Value, ct_val: &Decimal) -> Position {
         _ => {}
     }
     Position {
-        symbol: format!{"{}_{}", coin, currency},
+        symbol: format! {"{}_{}", coin, currency},
         margin_level: Decimal::from_str(position["leverage"].as_str().unwrap()).unwrap(),
         amount,
         frozen_amount: Decimal::ZERO,
@@ -126,7 +127,7 @@ pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
         deal_amount,
         avg_price,
         status: custom_status,
-        order_type: "limit".to_string()
+        order_type: "limit".to_string(),
     };
 
     return rst_order;
@@ -164,4 +165,44 @@ pub fn format_depth_items(value: serde_json::Value) -> Vec<OrderBook> {
         })
     }
     return depth_items;
+}
+
+pub fn handle_records(value: &Value) -> Vec<Record> {
+    let mut records = vec![];
+    let s = value["topic"].as_str().unwrap().to_string();
+    let s_split: Vec<String> = s.split(".").map(|s| s.to_string()).collect();
+    let symbol = s_split[2].replace("USDT", "_USDT");
+    let data = value["data"].clone();
+    for record_value in data.as_array().unwrap() {
+        records.push(Record {
+            time: Decimal::from_i64(record_value["timestamp"].as_i64().unwrap()).unwrap(),
+            open: Decimal::from_str(record_value["open"].as_str().unwrap()).unwrap(),
+            high: Decimal::from_str(record_value["high"].as_str().unwrap()).unwrap(),
+            low: Decimal::from_str(record_value["low"].as_str().unwrap()).unwrap(),
+            close: Decimal::from_str(record_value["close"].as_str().unwrap()).unwrap(),
+            volume: Decimal::from_str(record_value["volume"].as_str().unwrap()).unwrap(),
+            symbol: symbol.clone(),
+        });
+    }
+
+    return records;
+}
+
+pub fn format_trade_items(res_data: &ResponseData) -> Vec<Trade> {
+    let result = res_data.data["data"].as_array().unwrap();
+    let mut trades = vec![];
+
+    for item in result {
+        let side = item["S"].as_str().unwrap();
+        let size = Decimal::from_str(item["v"].as_str().unwrap()).unwrap();
+        trades.push(Trade {
+            id: item["i"].to_string(),
+            time: Decimal::from_i64(item["T"].as_i64().unwrap()).unwrap(),
+            size: if side == "Buy" { size } else { -size },
+            price: Decimal::from_str(item["p"].as_str().unwrap()).unwrap(),
+            symbol: item["s"].as_str().unwrap().replace("USDT", "_USDT"),
+        })
+    }
+
+    return trades;
 }

+ 6 - 0
standard/src/exchange_struct_handler.rs

@@ -142,6 +142,9 @@ impl ExchangeStructHandler {
             ExchangeEnum::CointrSwap => {
                 cointr_swap_handle::format_trade_items(&res_data)
             }
+            ExchangeEnum::BybitSwap => {
+                bybit_swap_handle::format_trade_items(&res_data)
+            }
             _ => {
                 error!("未找到该交易所!trades_handle: {:?}", exchange);
                 panic!("未找到该交易所!trades_handle: {:?}", exchange);
@@ -202,6 +205,9 @@ impl ExchangeStructHandler {
             ExchangeEnum::CointrSwap => {
                 cointr_swap_handle::handle_records(&res_data.data)
             }
+            ExchangeEnum::BybitSwap => {
+                bybit_swap_handle::handle_records(&res_data.data)
+            }
             _ => {
                 error!("未找到该交易所!records_handle: {:?}", exchange);
                 panic!("未找到该交易所!records_handle: {:?}", exchange);