Эх сурвалжийг харах

bitget与gate的k线数据格式化做完了。

skyffire 1 жил өмнө
parent
commit
2742736873

+ 9 - 0
exchanges/src/bitget_swap_ws.rs

@@ -19,6 +19,7 @@ pub enum BitgetSwapWsType {
 pub enum BitgetSwapSubscribeType {
     PuTrade,
     PuBooks1,
+    PuCandle1m,
 
     PrAccount,
     PrPosition,
@@ -97,6 +98,7 @@ impl BitgetSwapWs {
             if match t {
                 BitgetSwapSubscribeType::PuTrade => false,
                 BitgetSwapSubscribeType::PuBooks1 => false,
+                BitgetSwapSubscribeType::PuCandle1m => false,
 
                 BitgetSwapSubscribeType::PrAccount => true,
                 BitgetSwapSubscribeType::PrOrders => true,
@@ -129,6 +131,13 @@ impl BitgetSwapWs {
                     "instId": symbol,
                 })
             },
+            BitgetSwapSubscribeType::PuCandle1m => {
+                json!({
+                    "instType": "USDT-FUTURES",
+                    "channel": "candle1m",
+                    "instId": symbol,
+                })
+            },
 
             // 私有订阅
             BitgetSwapSubscribeType::PrAccount => {

+ 9 - 2
src/bitget_usdt_swap_data_listener.rs

@@ -42,7 +42,8 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     tokio::spawn(async move {
         let mut ws = BitgetSwapWs::new_with_tag(name.to_string(), false, None, BitgetSwapWsType::Public);
             ws.set_subscribe(vec![
-                BitgetSwapSubscribeType::PuTrade,
+                // BitgetSwapSubscribeType::PuTrade,
+                BitgetSwapSubscribeType::PuCandle1m
             ]);
 
         // 建立链接
@@ -75,7 +76,13 @@ pub async fn data_listener(response: ResponseData) {
             }
         },
         // k线数据
-        "k线" => {},
+        "candle1m" => {
+            let records = ExchangeStructHandler::records_handle(ExchangeEnum::BitgetSwap, &response);
+
+            for record in records.iter() {
+                info!(?record);
+            }
+        },
         _ => {
             info!("48 未知的数据类型: {:?}", response)
         }

+ 9 - 3
src/gate_usdt_swap_data_listener.rs

@@ -41,8 +41,8 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
     tokio::spawn(async move {
         let mut ws = GateSwapWs::new_with_tag(name.to_string(), false, None, GateSwapWsType::PublicAndPrivate("usdt".to_string()));
             ws.set_subscribe(vec![
-                GateSwapSubscribeType::PuFuturesTrades,
-                // GateSwapSubscribeType::PuFuturesCandlesticks,
+                // GateSwapSubscribeType::PuFuturesTrades,
+                GateSwapSubscribeType::PuFuturesCandlesticks,
                 // GateSwapSubscribeType::PuFuturesOrderBook
             ]);
 
@@ -76,7 +76,13 @@ pub async fn data_listener(response: ResponseData) {
             }
         },
         // k线数据
-        "futures.candlesticks" => {},
+        "futures.candlesticks" => {
+            let records = ExchangeStructHandler::records_handle(ExchangeEnum::GateSwap, &response);
+
+            for record in records.iter() {
+                info!(?record);
+            }
+        },
         _ => {
             info!("48 未知的数据类型: {:?}", response)
         }

+ 5 - 1
src/listener_tools.rs

@@ -18,7 +18,7 @@ pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, Trad
                 let depths_json = serde_json::to_string(trades).unwrap();
                 let path = generate_file_path(exchange, new_trade.symbol.as_str(), "trades", last_trade_minutes);
 
-                info!(?path);
+                // info!(?path);
 
                 write_to_file(depths_json, path);
 
@@ -32,6 +32,10 @@ pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, Trad
     }
 }
 
+// pub async fn update_record(new_record: &Record, &records_map: MutexGuard<`_, RecordMap>, exchange: &str) {
+//
+// }
+
 // // 更新深度数据
 // pub async fn update_depth(new_depth: &Depth) {
 //     let mut depth_map = DEPTH_MAP.lock().await;

+ 3 - 3
src/main.rs

@@ -24,14 +24,14 @@ async fn main() {
     // 掌控全局的关闭
     let running = Arc::new(AtomicBool::new(true));
     // 启动各交易所的数据监听器
-    gate_usdt_swap_data_listener::run_listener(running.clone()).await;
+    // gate_usdt_swap_data_listener::run_listener(running.clone()).await;
     bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
-    let panic_running = running.clone();
+    // let panic_running = running.clone();
     std::panic::set_hook(Box::new(move |panic_info| {
         let msg = format!("type=panic, msg={:?}, location={:?}", panic_info.to_string(), panic_info.location());
         warn!("{}", msg);
-        panic_running.store(false, Ordering::Relaxed);
+        // panic_running.store(false, Ordering::Relaxed);
     }));
     // 初始化数据服务器
     server::run_server(8888, running.clone());

+ 2 - 2
src/server.rs

@@ -69,12 +69,12 @@ async fn get_trades(query: web::Query<TradesQuery>) -> impl Responder {
         let response = Response {
             query: serde_json::to_value(&query.into_inner()).unwrap(),
             msg: Some("查询内容有误,必须包含四个参数:[symbol, exchange, start_time, end_time]".to_string()),
-            code: 400,
+            code: 500,
             data: Value::Null,
         };
 
         let json_string = serde_json::to_string(&response).unwrap();
-        HttpResponse::BadRequest().content_type("application/json").body(json_string)
+        HttpResponse::Ok().content_type("application/json").body(json_string)
     }
 }
 

+ 22 - 1
standard/src/bitget_swap_handle.rs

@@ -3,7 +3,7 @@ use rust_decimal::Decimal;
 use rust_decimal::prelude::FromPrimitive;
 use serde_json::Value;
 use exchanges::response_base::ResponseData;
-use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialOrder, Trade};
+use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialOrder, Trade, Record};
 
 // 处理账号信息
 pub fn handle_account_info(response: &ResponseData, _symbol: &String) -> Account {
@@ -170,6 +170,27 @@ pub fn format_trade_items(res_data: &ResponseData) -> Vec<Trade> {
     return trades
 }
 
+pub fn handle_records(value: &Value) -> Vec<Record> {
+    let mut records = vec![];
+
+    let record_value_vec = value["data"].as_array().unwrap();
+    let symbol = value["arg"]["instId"].as_str().unwrap().to_string().replace("USDT", "_USDT");
+    for record_value in record_value_vec {
+        let arr = record_value.as_array().unwrap();
+        records.push(Record {
+            time: Decimal::from_str(arr[0].as_str().unwrap()).unwrap(),
+            open: Decimal::from_str(arr[1].as_str().unwrap()).unwrap(),
+            high: Decimal::from_str(arr[2].as_str().unwrap()).unwrap(),
+            low: Decimal::from_str(arr[3].as_str().unwrap()).unwrap(),
+            close: Decimal::from_str(arr[4].as_str().unwrap()).unwrap(),
+            volume: Decimal::from_str(arr[6].as_str().unwrap()).unwrap(),
+            symbol: symbol.clone(),
+        });
+    }
+
+    return records
+}
+
 // 处理特殊深度数据
 // pub fn handle_special_depth(res_data: ResponseData) -> SpecialDepth {
 //     HandleSwapInfo::handle_special_depth(ExchangeEnum::BitgetSwap, res_data)

+ 40 - 3
standard/src/exchange_struct_handler.rs

@@ -1,4 +1,4 @@
-use crate::Trade;
+use crate::{Record, Ticker, Trade};
 use std::str::FromStr;
 use rust_decimal::{Decimal};
 use rust_decimal::prelude::FromPrimitive;
@@ -6,7 +6,7 @@ use tracing::{error};
 use exchanges::response_base::ResponseData;
 use crate::exchange::ExchangeEnum;
 use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, kucoin_handle, Depth};
-use crate::{Account, OrderBook, Position, SpecialDepth, SpecialOrder};
+use crate::{Account, OrderBook, Position, SpecialOrder};
 
 #[allow(dead_code)]
 pub struct ExchangeStructHandler;
@@ -96,7 +96,7 @@ impl ExchangeStructHandler {
         }
     }
     // 处理Ticker信息
-    pub fn book_ticker_handle(exchange: ExchangeEnum, _res_data: &ResponseData) -> SpecialDepth {
+    pub fn book_ticker_handle(exchange: ExchangeEnum, _res_data: &ResponseData) -> Ticker {
         match exchange {
             // ExchangeEnum::BinanceSpot => {
             //     binance_spot_handle::handle_special_ticker(res_data)
@@ -134,6 +134,43 @@ impl ExchangeStructHandler {
             }
         }
     }
+    // 处理蜡烛信息
+    pub fn records_handle(exchange: ExchangeEnum, res_data: &ResponseData) -> Vec<Record> {
+        match exchange {
+            // ExchangeEnum::BinanceSpot => {
+            //     binance_spot_handle::handle_special_ticker(res_data)
+            // }
+            ExchangeEnum::BinanceSwap => {
+                // binance_swap_handle::handle_book_ticker(res_data)
+                panic!("BinanceSwap records_handle 未实现格式化");
+            }
+            ExchangeEnum::KucoinSwap => {
+                // kucoin_handle::handle_book_ticker(res_data)
+                panic!("KucoinSwap records_handle 未实现格式化");
+            },
+            ExchangeEnum::BybitSwap => {
+                panic!("BybitSwap records_handle 未实现格式化");
+            }
+            // ExchangeEnum::KucoinSpot => {
+            //     kucoin_spot_handle::handle_special_ticker(res_data)
+            // }
+            // ExchangeEnum::KucoinSpot => {
+            //     kucoin_spot_handle::handle_special_ticker(res_data)
+            // }
+            // ExchangeEnum::OkxSwap => {
+            //     okx_handle::handle_special_ticker(res_data)
+            // }
+            // ExchangeEnum::BitgetSpot => {
+            //     bitget_spot_handle::handle_special_ticker(res_data)
+            // },
+            ExchangeEnum::GateSwap => {
+                gate_swap_handle::handle_records(&res_data.data)
+            }
+            ExchangeEnum::BitgetSwap => {
+                bitget_swap_handle::handle_records(&res_data.data)
+            }
+        }
+    }
     // 处理账号信息
     pub fn account_info_handle(exchange: ExchangeEnum, res_data: &ResponseData, symbol: &String) -> Account {
         match exchange {

+ 23 - 1
standard/src/gate_swap_handle.rs

@@ -4,7 +4,7 @@ use rust_decimal::prelude::FromPrimitive;
 use serde_json::Value;
 use tracing::{error};
 use exchanges::response_base::ResponseData;
-use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialOrder, Trade};
+use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialOrder, Trade, Record};
 
 // 处理账号信息
 pub fn handle_account_info(res_data: &ResponseData, symbol: &String) -> Account {
@@ -137,6 +137,28 @@ pub fn format_order_item(order: serde_json::Value, ct_val: Decimal) -> Order {
 //     }
 // }
 
+pub fn handle_records(value: &Value) -> Vec<Record> {
+    let mut records = vec![];
+
+    for record_value in value.as_array().unwrap() {
+        let n = record_value["n"].as_str().unwrap().to_string();
+        let n_split: Vec<String> = n.split("_").map(|s| s.to_string()).collect();
+        let symbol = format!("{}_{}", n_split[n_split.len() - 2], n_split[n_split.len() - 1]);
+
+        records.push(Record {
+            time: Decimal::from(record_value["t"].as_i64().unwrap()) * Decimal::ONE_THOUSAND,
+            open: Decimal::from_str(record_value["o"].as_str().unwrap()).unwrap(),
+            high: Decimal::from_str(record_value["h"].as_str().unwrap()).unwrap(),
+            low: Decimal::from_str(record_value["l"].as_str().unwrap()).unwrap(),
+            close: Decimal::from_str(record_value["c"].as_str().unwrap()).unwrap(),
+            volume: Decimal::from(record_value["v"].as_i64().unwrap()),
+            symbol,
+        });
+    }
+
+    return records
+}
+
 pub fn format_depth_items(value: &Value) -> Vec<OrderBook> {
     let mut depth_items: Vec<OrderBook> = vec![];
     for value in value.as_array().unwrap() {

+ 9 - 28
standard/src/lib.rs

@@ -94,9 +94,10 @@ pub struct Trade {
 }
 
 /// 特殊压缩结构体(订单流)
-/// - `0(Decimal)`: 交易更新时间戳(ms)
-/// - `1(Decimal)`: 交易量,负数是卖方
-/// - `2(Decimal)`: 成交价格
+/// - `0(Decimal)`: id
+/// - `1(Decimal)`: 交易更新时间戳(ms)
+/// - `2(Decimal)`: 交易量,负数是卖方
+/// - `3(Decimal)`: 成交价格
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct SpecialTrade(pub Vec<String>);
 impl SpecialTrade {
@@ -144,29 +145,6 @@ impl Depth {
     }
 }
 
-/// 特殊Depth结构体(市场深度),用于存放到本地数据库中
-/// - `a(Vec<Vec<Decimal>>)`: asks;
-/// - `b(Vec<Vec<Decimal>>)`: bids;
-/// - `t(Decimal)`: 数据生成时间
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-pub struct SpecialDepth {
-    pub b: Vec<Vec<Decimal>>,
-    pub a: Vec<Vec<Decimal>>,
-    pub t: Decimal,
-}
-
-impl SpecialDepth {
-    pub fn new(depth: &Depth) -> SpecialDepth {
-        let bids = depth.bids.iter().map(|ob| vec![ob.price, ob.amount]).collect::<Vec<_>>();
-        let asks = depth.asks.iter().map(|ob| vec![ob.price, ob.amount]).collect::<Vec<_>>();
-        SpecialDepth {
-            b: bids,
-            a: asks,
-            t: depth.time,
-        }
-    }
-}
-
 /// 特殊Ticker结构体(市场行情)
 /// - `sell(Decimal)`: 卖一价
 /// - `buy(Decimal)`: 买一价
@@ -219,25 +197,28 @@ impl OrderBook {
 /// - `low(Decimal):` 最低价;
 /// - `close(Decimal)`: 收盘价;
 /// - `volume(Decimal)`: 交易量;
+/// - `symbol(String)`: 交易对;
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Record {
-    pub time: i64,
+    pub time: Decimal,
     pub open: Decimal,
     pub high: Decimal,
     pub low: Decimal,
     pub close: Decimal,
     pub volume: Decimal,
+    pub symbol: String
 }
 
 impl Record {
     pub fn new() -> Record {
         Record {
-            time: 0,
+            time: Default::default(),
             open: Default::default(),
             high: Default::default(),
             low: Default::default(),
             close: Default::default(),
             volume: Default::default(),
+            symbol: "".to_string(),
         }
     }
 }