Selaa lähdekoodia

添加gate币本位合约

DESKTOP-NE65RNK\Citrus_limon 1 vuosi sitten
vanhempi
commit
1f2b70eb9e

+ 17 - 17
exchanges/src/gate_spot_ws.rs

@@ -17,17 +17,17 @@ use crate::socket_tool::{AbstractWsMode, HeartbeatType};
 
 //类型
 pub enum GateSpotWsType {
-    PublicAndPrivate(String),
+    PublicAndPrivate,
 }
 
 
 //订阅频道
 #[derive(Clone)]
 pub enum GateSpotSubscribeType {
-    PuFuturesOrderBook,
-    PuFuturesCandlesticks,
-    PuFuturesTrades,
-    PuFuturesBookTicker,
+    PuSpotOrderBook,
+    PuSpotCandlesticks,
+    PuSpotTrades,
+    PuSpotBookTicker,
 }
 
 //账号信息
@@ -66,13 +66,13 @@ impl GateSpotWs {
     {
         /*******公共频道-私有频道数据组装*/
         let address_url = match ws_type {
-            GateSpotWsType::PublicAndPrivate(name) => {
+            GateSpotWsType::PublicAndPrivate => {
                 if is_colo {
-                    let url = format!("wss://fxws-private.gateapi.io/v4/ws/{}", name.to_string());
+                    let url = format!("wss://fxws-private.gateapi.io/v4/ws/");
                     info!("开启高速通道:{:?}",url);
                     url
                 } else {
-                    let url = format!("wss://api.gateio.ws/ws/v4/{}", name.to_string());
+                    let url = format!("wss://api.gateio.ws/ws/v4/");
                     info!("走普通通道:{}",url);
                     url
                 }
@@ -111,10 +111,10 @@ impl GateSpotWs {
     fn contains_pr(&self) -> bool {
         for t in self.subscribe_types.clone() {
             if match t {
-                GateSpotSubscribeType::PuFuturesOrderBook => false,
-                GateSpotSubscribeType::PuFuturesCandlesticks => false,
-                GateSpotSubscribeType::PuFuturesTrades => false,
-                GateSpotSubscribeType::PuFuturesBookTicker => false,
+                GateSpotSubscribeType::PuSpotOrderBook => false,
+                GateSpotSubscribeType::PuSpotCandlesticks => false,
+                GateSpotSubscribeType::PuSpotTrades => false,
+                GateSpotSubscribeType::PuSpotBookTicker => false,
             } {
                 return true;
             }
@@ -139,7 +139,7 @@ impl GateSpotWs {
         }
 
         match subscribe_type {
-            GateSpotSubscribeType::PuFuturesOrderBook => {
+            GateSpotSubscribeType::PuSpotOrderBook => {
                 json!({
                     "time": time,
                     "channel": "spot.order_book",
@@ -147,7 +147,7 @@ impl GateSpotWs {
                     "payload": [symbol, "20", "100ms"]
                 })
             }
-            GateSpotSubscribeType::PuFuturesBookTicker => {
+            GateSpotSubscribeType::PuSpotBookTicker => {
                 json!({
                     "time": time,
                     "channel": "spot.book_ticker",
@@ -155,7 +155,7 @@ impl GateSpotWs {
                     "payload": [symbol]
                 })
             }
-            GateSpotSubscribeType::PuFuturesCandlesticks => {
+            GateSpotSubscribeType::PuSpotCandlesticks => {
                 json!({
                     "time": time,
                     "channel": "spot.candlesticks",
@@ -163,7 +163,7 @@ impl GateSpotWs {
                     "payload":  ["1m", symbol]
                 })
             }
-            GateSpotSubscribeType::PuFuturesTrades => {
+            GateSpotSubscribeType::PuSpotTrades => {
                 json!({
                     "time": time,
                     "channel": "spot.trades",
@@ -282,7 +282,7 @@ impl GateSpotWs {
     //数据解析
     pub fn ok_text(text: String) -> ResponseData
     {
-        // trace!("原始数据:{}", text);
+        trace!("原始数据:{}", text);
         let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
         let json_value: Value = serde_json::from_str(&text).unwrap();
 

+ 148 - 0
src/gate_coin_spot_data_listener.rs

@@ -0,0 +1,148 @@
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc};
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use lazy_static::lazy_static;
+use rust_decimal::Decimal;
+use rust_decimal_macros::dec;
+use tokio::sync::{Mutex};
+use tracing::info;
+use exchanges::gate_spot_rest::GateSpotRest;
+use exchanges::gate_spot_ws::{GateSpotSubscribeType, GateSpotWs, GateSpotWsType};
+use exchanges::response_base::ResponseData;
+use serde_json::json;
+use standard::{Depth, OrderBook, SimpleDepth};
+use standard::exchange::ExchangeEnum;
+use standard::exchange_struct_handler::ExchangeStructHandler;
+use crate::json_db_utils::delete_db_by_exchange;
+use crate::listener_tools::{RecordMap, TradeMap, DepthMap, SimpleDepthMap, update_record, update_trade, update_simple_depth};
+
+const EXCHANGE_NAME: &str = "gate_coin_spot";
+
+lazy_static! {
+    static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
+    static ref SIMPLE_DEPTH_MAP: Mutex<SimpleDepthMap> = Mutex::new(HashMap::new());
+    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+    static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
+    static ref MUL_MAP: Mutex<HashMap<String, Decimal>> = Mutex::new(HashMap::new());
+}
+
+pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+    let name = "gate_coin_spot_listener";
+    // 订阅所有币种
+    let login = BTreeMap::new();
+    let mut gate_rest = GateSpotRest::new(false, login);
+    let params = json!({});
+    let response = gate_rest.get_market_details(params).await;
+    let mut symbols = vec![];
+    if response.code == 200 {
+        let symbol_infos = response.data.as_array().unwrap();
+        let mut mul_map = MUL_MAP.lock().await;
+        for symbol_info in symbol_infos {
+            let trade_status = symbol_info["trade_status"].as_str().unwrap();
+            let quote = symbol_info["quote"].as_str().unwrap();
+            if trade_status != "tradable" || quote == "USDT" { continue; };
+            // if trade_status != "tradable" { continue; };
+            // quanto_multiplier是ct_val
+            let symbol = symbol_info["id"].as_str().unwrap().to_string();
+            mul_map.insert(symbol.clone(), Decimal::ONE);
+            symbols.push(symbol)
+        }
+    }
+    for chunk in symbols.chunks(20) {
+        let ws_name = name.to_string();
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+
+        tokio::spawn(async move {
+            let mut ws = GateSpotWs::new_with_tag(ws_name, false, None, GateSpotWsType::PublicAndPrivate);
+            ws.set_subscribe(vec![
+                GateSpotSubscribeType::PuSpotTrades,
+                GateSpotSubscribeType::PuSpotCandlesticks,
+                // GateSpotSubscribeType::PuFuturesOrderBook,
+            ]);
+
+            // 建立链接
+            ws.set_symbols(symbols_chunk);
+            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        });
+    }
+    // 定时删除数据
+    tokio::spawn(async move {
+        loop {
+            delete_db_by_exchange(EXCHANGE_NAME, vec!["trades", "record"], 2880).await;
+            tokio::time::sleep(Duration::from_secs(60 * 60 * 4)).await;
+        }
+    });
+}
+
+// 读取数据
+pub async fn data_listener(response: ResponseData) {
+    if response.code != 200 {
+        return;
+    }
+
+    match response.channel.as_str() {
+        // 深度数据
+        "spot.order_book" => {
+            let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSpot, &response);
+            let mul_map = MUL_MAP.lock().await;
+            let new_depth = Depth {
+                time: depth.time,
+                symbol: depth.symbol.clone(),
+                asks: depth.asks.iter().map(|ask| OrderBook { price: ask.price, amount: ask.amount * mul_map[depth.symbol.clone().as_str()] }).collect(),
+                bids: depth.bids.iter().map(|bid| OrderBook { price: bid.price, amount: bid.amount * mul_map[depth.symbol.clone().as_str()] }).collect(),
+            };
+
+            // 更新到本地数据库
+            // ------ 简易深度数据
+            let simple_depth = SimpleDepth::new(&new_depth);
+            let simple_depth_map = SIMPLE_DEPTH_MAP.lock().await;
+            update_simple_depth(&simple_depth, simple_depth_map, EXCHANGE_NAME).await;
+
+            // ------ 标准深度数据
+            // let depth_map = DEPTH_MAP.lock().await;
+            // update_depth(&new_depth, depth_map, EXCHANGE_NAME).await;
+        }
+        // 订单流数据
+        "spot.trades" => {
+            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSpot, &response);
+            let mul_map = MUL_MAP.lock().await;
+
+            for trade in trades.iter_mut() {
+                // 真实交易量处理,因为gate的量都是张数
+                let mul = mul_map[trade.symbol.as_str()];
+                let mut real_size = trade.size * mul * trade.price;
+                real_size.rescale(2);
+                trade.size = real_size;
+
+                // 更新到本地数据库
+                let trades_map = TRADES_MAP.lock().await;
+                update_trade(trade, trades_map, EXCHANGE_NAME).await;
+            }
+        }
+        // k线数据
+        "spot.candlesticks" => {
+            let mut records = ExchangeStructHandler::records_handle(ExchangeEnum::GateSpot, &response);
+
+            let mul_map = MUL_MAP.lock().await;
+            for record in records.iter_mut() {
+                // 真实交易量处理,因为gate的量都是张数
+                let mul = mul_map[record.symbol.as_str()];
+                let mid_price = (record.high + record.low) * dec!(0.5);
+                let mut real_volume = record.volume * mul * mid_price;
+                real_volume.rescale(2);
+                record.volume = real_volume;
+
+                // 更新到本地数据库
+                let record_map = RECORD_MAP.lock().await;
+                update_record(record, record_map, EXCHANGE_NAME).await;
+            }
+        }
+        _ => {
+            info!("48 未知的数据类型: {:?}", response)
+        }
+    }
+}

+ 6 - 6
src/gate_usdt_spot_data_listener.rs

@@ -40,9 +40,9 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
         let mut mul_map = MUL_MAP.lock().await;
         for symbol_info in symbol_infos {
             let trade_status = symbol_info["trade_status"].as_str().unwrap();
-            // let quote = symbol_info["quote"].as_str().unwrap();
-            // if trade_status != "tradable" || quote != "USDT" { continue; };
-            if trade_status != "tradable" { continue; };
+            let quote = symbol_info["quote"].as_str().unwrap();
+            if trade_status != "tradable" || quote != "USDT" { continue; };
+            // if trade_status != "tradable" { continue; };
             // quanto_multiplier是ct_val
             let symbol = symbol_info["id"].as_str().unwrap().to_string();
             mul_map.insert(symbol.clone(), Decimal::ONE);
@@ -57,10 +57,10 @@ pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
         let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
 
         tokio::spawn(async move {
-            let mut ws = GateSpotWs::new_with_tag(ws_name, false, None, GateSpotWsType::PublicAndPrivate("usdt".to_string()));
+            let mut ws = GateSpotWs::new_with_tag(ws_name, false, None, GateSpotWsType::PublicAndPrivate);
             ws.set_subscribe(vec![
-                GateSpotSubscribeType::PuFuturesTrades,
-                GateSpotSubscribeType::PuFuturesCandlesticks,
+                GateSpotSubscribeType::PuSpotTrades,
+                GateSpotSubscribeType::PuSpotCandlesticks,
                 // GateSpotSubscribeType::PuFuturesOrderBook,
             ]);
 

+ 3 - 1
src/main.rs

@@ -17,6 +17,7 @@ mod phemex_usdt_swap_data_listener;
 mod woo_usdt_swap_data_listener;
 mod cointr_usdt_swap_data_listener;
 mod gate_usdt_spot_data_listener;
+mod gate_coin_spot_data_listener;
 mod bybit_usdt_swap_data_listener;
 
 use std::sync::Arc;
@@ -50,12 +51,13 @@ async fn main() {
     // woo_usdt_swap_data_listener::run_listener(running.clone()).await;
     // cointr_usdt_swap_data_listener::run_listener(running.clone()).await;
     // htx_usdt_swap_data_listener::run_listener(running.clone()).await;
+    gate_usdt_spot_data_listener::run_listener(running.clone()).await;
 
     binance_usdt_swap_data_listener::run_listener(running.clone()).await;
     gate_usdt_swap_data_listener::run_listener(running.clone()).await;
     coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
     phemex_usdt_swap_data_listener::run_listener(running.clone()).await;
-    gate_usdt_spot_data_listener::run_listener(running.clone()).await;
+    gate_coin_spot_data_listener::run_listener(running.clone()).await;
     mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
     bybit_usdt_swap_data_listener::run_listener(running.clone()).await;
     bitget_usdt_swap_data_listener::run_listener(running.clone()).await;

+ 2 - 1
src/server.rs

@@ -176,13 +176,14 @@ async fn get_exchanges() -> impl Responder {
         // "coinsph_usdt_swap"
         // "woo_usdt_swap",
         // "cointr_usdt_swap",
+        // "gate_usdt_spot",
 
         "gate_usdt_swap",
         "binance_usdt_swap",
         "coinex_usdt_swap",
         "phemex_usdt_swap",
         "mexc_usdt_swap",
-        "gate_usdt_spot",
+        "gate_coin_spot",
         "bybit_usdt_swap",
         "bitget_usdt_swap",
     ];