Преглед на файлове

添加gate币本位现货

DESKTOP-NE65RNK\Citrus_limon преди 1 година
родител
ревизия
e8949c373c

+ 297 - 0
exchanges/src/gate_spot_rest.rs

@@ -0,0 +1,297 @@
+use std::collections::BTreeMap;
+use reqwest::header::HeaderMap;
+use ring::{digest};
+use hex;
+use hmac::{Hmac, Mac, NewMac};
+use reqwest::Client;
+use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
+use rust_decimal_macros::dec;
+use serde_json::Value;
+use crate::http_tool::RestTool;
+use crate::response_base::ResponseData;
+use sha2::Sha512;
+use tracing::{error, info};
+
+#[derive(Clone)]
+pub struct GateSpotRest {
+    tag: String,
+    base_url: String,
+    client: reqwest::Client,
+    /*******参数*/
+    //登录所需参数
+    login_param: BTreeMap<String, String>,
+    delays: Vec<i64>,
+    max_delay: i64,
+    avg_delay: Decimal,
+}
+
+impl GateSpotRest {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool, login_param: BTreeMap<String, String>) -> GateSpotRest
+    {
+        return GateSpotRest::new_with_tag("default-GateSwapRest".to_string(), is_colo, login_param);
+    }
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: BTreeMap<String, String>) -> GateSpotRest
+    {
+        let base_url = if is_colo {
+            let url = "https://apiv4-private.gateapi.io".to_string();
+            info!("开启高速通道:{:?}",url);
+            url
+        } else {
+            let url = "https://api.gateio.ws".to_string();
+            info!("走普通通道:{}",url);
+            url
+        };
+
+
+        if is_colo {} else {}
+        /*****返回结构体*******/
+        GateSpotRest {
+            tag,
+            base_url: base_url.to_string(),
+            client: Client::new(),
+            login_param,
+            delays: vec![],
+            max_delay: 0,
+            avg_delay: dec!(0.0),
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************rest请求函数********************************************************/
+    /*******************************************************************************************************/
+    //获取服务器当前时间
+    pub async fn get_server_time(&mut self) -> ResponseData {
+        let params = serde_json::json!({
+         });
+        let data = self.request("GET".to_string(),
+                                "/api/v4".to_string(),
+                                "/spot/time".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
+    //查询所有的合约信息
+    pub async fn get_market_details(&mut self, params: Value) -> ResponseData {
+        let data = self.request("GET".to_string(),
+                                "/api/v4".to_string(),
+                                "/spot/currency_pairs".to_string(),
+                                false,
+                                params.to_string(),
+        ).await;
+        data
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    pub fn get_delays(&self) -> Vec<i64> {
+        self.delays.clone()
+    }
+    pub fn get_avg_delay(&self) -> Decimal {
+        self.avg_delay.clone()
+    }
+    pub fn get_max_delay(&self) -> i64 {
+        self.max_delay.clone()
+    }
+    fn get_delay_info(&mut self) {
+        let last_100 = if self.delays.len() > 100 {
+            self.delays[self.delays.len() - 100..].to_vec()
+        } else {
+            self.delays.clone()
+        };
+
+        let max_value = last_100.iter().max().unwrap();
+        if max_value.clone().to_owned() > self.max_delay {
+            self.max_delay = max_value.clone().to_owned();
+        }
+
+        let sum: i64 = last_100.iter().sum();
+        let sum_v = Decimal::from_i64(sum).unwrap();
+        let len_v = Decimal::from_u64(last_100.len() as u64).unwrap();
+        self.avg_delay = (sum_v / len_v).round_dp(1);
+        self.delays = last_100.clone().into_iter().collect();
+    }
+
+    //调用请求
+    async fn request(&mut self,
+                     requesst_type: String,
+                     prefix_url: String,
+                     request_url: String,
+                     is_login: bool,
+                     params: String) -> ResponseData
+    {
+        // trace!("login_param:{:?}", self.login_param);
+        //解析账号信息
+        let mut access_key = "".to_string();
+        let mut secret_key = "".to_string();
+        if self.login_param.contains_key("access_key") {
+            access_key = self.login_param.get("access_key").unwrap().to_string();
+        }
+        if self.login_param.contains_key("secret_key") {
+            secret_key = self.login_param.get("secret_key").unwrap().to_string();
+        }
+        let mut is_login_param = true;
+        if access_key == "" || secret_key == "" {
+            is_login_param = false
+        }
+
+        //请求头配置-如果需要登录则存在额外配置
+        let mut body = "".to_string();
+        let timestamp = chrono::Utc::now().timestamp().to_string();
+
+        let mut headers = HeaderMap::new();
+        if requesst_type == "GET" {
+            headers.insert("Content-type", "application/x-www-form-urlencoded".parse().unwrap());
+            headers.insert("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) ".parse().unwrap());
+        } else {
+            headers.insert("Accept", "application/json".parse().unwrap());
+            headers.insert("Content-Type", "application/json".parse().unwrap());
+        }
+
+        if requesst_type == "POST" {
+            body = params.clone();
+        }
+
+        //是否需要登录-- 组装sing
+        if is_login {
+            if !is_login_param {
+                let e = ResponseData::error(self.tag.clone(), "登录参数错误!".to_string());
+                return e;
+            } else { //需要登录-且登录参数齐全
+                //组装sing
+                let sing = Self::sign(secret_key.clone(),
+                                      requesst_type.clone(),
+                                      prefix_url.clone(),
+                                      request_url.clone(),
+                                      params.clone(),
+                                      body.clone(),
+                                      timestamp.clone(),
+                );
+                // trace!("sing:{}", sing);
+                //组装header
+                headers.extend(Self::headers(access_key, timestamp, sing));
+            }
+        }
+
+        // trace!("headers:{:?}", headers);
+        let base_url = format!("{}{}", prefix_url.clone(), request_url.clone());
+        let start_time = chrono::Utc::now().timestamp_millis();
+        let response = self.http_tool(
+            base_url.clone(),
+            requesst_type.to_string(),
+            params.clone(),
+            headers,
+        ).await;
+
+        let time_array = chrono::Utc::now().timestamp_millis() - start_time;
+        self.delays.push(time_array);
+        self.get_delay_info();
+
+        response
+    }
+
+    pub fn headers(access_key: String, timestamp: String, sign: String) -> HeaderMap {
+        let mut headers = HeaderMap::new();
+        headers.insert("KEY", access_key.clone().parse().unwrap());
+        headers.insert("Timestamp", timestamp.clone().parse().unwrap());
+        headers.insert("SIGN", sign.clone().parse().unwrap());
+        headers
+    }
+    pub fn sign(secret_key: String,
+                requesst_type: String, prefix_url: String, request_url: String,
+                params: String, body_data: String, timestamp: String) -> String
+    {
+        let url = format!("{}{}", prefix_url, request_url);
+        let params_str = RestTool::parse_params_to_str(params);
+        let body = Some(body_data);
+        let hashed_payload = if let Some(body) = body {
+            let mut m = digest::Context::new(&digest::SHA512);
+            m.update(body.as_bytes());
+            hex::encode(m.finish().as_ref())
+        } else {
+            String::new()
+        };
+        // trace!("hashed_payload:{}", hashed_payload);
+
+        let message = format!("{}\n{}\n{}\n{}\n{}",
+                              requesst_type,
+                              url,
+                              params_str,
+                              hashed_payload,
+                              timestamp);
+        // trace!("**********", );
+        // trace!("组装数据:{}", message);
+        // trace!("**********", );
+
+        let mut mac = Hmac::<Sha512>::new_varkey(secret_key.as_bytes()).expect("Failed to create HMAC");
+        mac.update(message.as_bytes());
+        let result = mac.finalize().into_bytes();
+        let sign = hex::encode(result);
+        sign
+    }
+
+
+    async fn http_tool(&mut self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> ResponseData {
+        /****请求接口与 地址*/
+        let url = format!("{}{}", self.base_url.to_string(), request_path);
+        let request_type = request_type.clone().to_uppercase();
+        let addrs_url = format!("{}?{}", url.clone(), RestTool::parse_params_to_str(params.clone()));
+
+        let request_builder = match request_type.as_str() {
+            "GET" => self.client.get(addrs_url.clone()).headers(headers),
+            "POST" => self.client.post(addrs_url.clone()).body(params.clone()).headers(headers),
+            "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
+            // "PUT" => self.client.put(url.clone()).json(&params),
+            _ => {
+                panic!("{}", format!("错误的请求类型:{}", request_type.clone()))
+            }
+        };
+
+        // 读取响应的内容
+        let response = request_builder.send().await.unwrap();
+        let is_success = response.status().is_success(); // 先检查状态码
+        let text = response.text().await.unwrap();
+        return if is_success {
+            self.on_success_data(&text)
+        } else {
+            self.on_error_data(&text, &addrs_url, &params)
+        };
+    }
+
+    pub fn on_success_data(&mut self, text: &String) -> ResponseData {
+        let data = serde_json::from_str(text.as_str()).unwrap();
+
+        ResponseData::new(self.tag.clone(), 200, "success".to_string(), data)
+    }
+
+    pub fn on_error_data(&mut self, text: &String, base_url: &String, params: &String) -> ResponseData {
+        let json_value = serde_json::from_str::<Value>(&text);
+
+        match json_value {
+            Ok(data) => {
+                let message;
+
+                if !data["message"].is_null() {
+                    message = format!("{}:{}", data["tag"].as_str().unwrap(), data["message"].as_str().unwrap());
+                } else {
+                    message = data["tag"].to_string();
+                }
+
+                let mut error = ResponseData::error(self.tag.clone(), message);
+                error.message = format!("请求地址:{}, 请求参数:{}, 报错内容:{}。", base_url, params, error.message);
+                error
+            }
+            Err(e) => {
+                error!("解析错误:{:?}", e);
+                let error = ResponseData::error("".to_string(),
+                                                format!("json 解析失败:{},相关参数:{}", e, text));
+                error
+            }
+        }
+    }
+}

+ 308 - 0
exchanges/src/gate_spot_ws.rs

@@ -0,0 +1,308 @@
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use chrono::Utc;
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use hex;
+use hmac::{Hmac, Mac, NewMac};
+use rust_decimal::prelude::ToPrimitive;
+use serde_json::{json, Value};
+use sha2::Sha512;
+use tokio::sync::Mutex;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::{error, info, trace};
+
+use crate::response_base::ResponseData;
+use crate::socket_tool::{AbstractWsMode, HeartbeatType};
+
+//类型
+pub enum GateSpotWsType {
+    PublicAndPrivate,
+}
+
+
+//订阅频道
+#[derive(Clone)]
+pub enum GateSpotSubscribeType {
+    PuSpotOrderBook,
+    PuSpotCandlesticks,
+    PuSpotTrades,
+    PuSpotBookTicker,
+}
+
+//账号信息
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct GateSpotLogin {
+    pub api_key: String,
+    pub secret: String,
+}
+
+
+#[derive(Clone)]
+pub struct GateSpotWs {
+    //类型
+    tag: String,
+    //地址
+    address_url: String,
+    //账号信息
+    login_param: Option<GateSpotLogin>,
+    //币对
+    symbol_s: Vec<String>,
+    //订阅
+    subscribe_types: Vec<GateSpotSubscribeType>,
+    //心跳间隔
+    heartbeat_time: u64,
+}
+
+impl GateSpotWs {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool, login_param: Option<GateSpotLogin>, ws_type: GateSpotWsType) -> GateSpotWs {
+        return GateSpotWs::new_with_tag("default-GateSpotWs".to_string(), is_colo, login_param, ws_type);
+    }
+    pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<GateSpotLogin>, ws_type: GateSpotWsType) -> GateSpotWs
+    {
+        /*******公共频道-私有频道数据组装*/
+        let address_url = match ws_type {
+            GateSpotWsType::PublicAndPrivate => {
+                if is_colo {
+                    let url = format!("wss://fxws-private.gateapi.io/v4/ws/");
+                    info!("开启高速通道:{:?}",url);
+                    url
+                } else {
+                    let url = format!("wss://api.gateio.ws/ws/v4/");
+                    info!("走普通通道:{}",url);
+                    url
+                }
+            }
+        };
+
+
+        GateSpotWs {
+            tag,
+            address_url,
+            login_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            heartbeat_time: 1000 * 10,
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<GateSpotSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //手动添加币对
+    pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
+        for symbol in b_array.iter_mut() {
+            // 大写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("-", "_");
+        }
+        self.symbol_s = b_array;
+    }
+    //频道是否需要登录
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if match t {
+                GateSpotSubscribeType::PuSpotOrderBook => false,
+                GateSpotSubscribeType::PuSpotCandlesticks => false,
+                GateSpotSubscribeType::PuSpotTrades => false,
+                GateSpotSubscribeType::PuSpotBookTicker => false,
+            } {
+                return true;
+            }
+        }
+        false
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: GateSpotSubscribeType, login_param: Option<GateSpotLogin>) -> Value {
+        let time = chrono::Utc::now().timestamp();
+        let mut _access_key = "".to_string();
+        let mut _secret_key = "".to_string();
+        match login_param {
+            None => {}
+            Some(param) => {
+                _access_key = param.api_key.clone();
+                _secret_key = param.secret.clone();
+            }
+        }
+
+        match subscribe_type {
+            GateSpotSubscribeType::PuSpotOrderBook => {
+                json!({
+                    "time": time,
+                    "channel": "spot.order_book",
+                    "event": "subscribe",
+                    "payload": [symbol, "20", "100ms"]
+                })
+            }
+            GateSpotSubscribeType::PuSpotBookTicker => {
+                json!({
+                    "time": time,
+                    "channel": "spot.book_ticker",
+                    "event": "subscribe",
+                    "payload": [symbol]
+                })
+            }
+            GateSpotSubscribeType::PuSpotCandlesticks => {
+                json!({
+                    "time": time,
+                    "channel": "spot.candlesticks",
+                    "event": "subscribe",
+                    "payload":  ["1m", symbol]
+                })
+            }
+            GateSpotSubscribeType::PuSpotTrades => {
+                json!({
+                    "time": time,
+                    "channel": "spot.trades",
+                    "event": "subscribe",
+                    "payload": [symbol]
+                })
+            }
+        }
+    }
+    //订阅信息生成
+    pub fn get_subscription(&self) -> Vec<Value> {
+        let mut args = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(),
+                                                  subscribe_type.clone(),
+                                                  self.login_param.clone(),
+                );
+                args.push(ty_str);
+            }
+        }
+        args
+    }
+    //生成签名
+    fn _sign(secret_key: String, channel: String, event: String, time: String) -> String {
+        let message = format!("channel={}&event={}&time={}", channel, event, time);
+        let mut mac = Hmac::<Sha512>::new_varkey(secret_key.as_bytes()).expect("Failed to create HMAC");
+        mac.update(message.as_bytes());
+        let result = mac.finalize().into_bytes();
+        let sign = hex::encode(result);
+        sign
+    }
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    //链接
+    pub async fn ws_connect_async<F, Future>(&mut self,
+                                             is_shutdown_arc: Arc<AtomicBool>,
+                                             handle_function: F,
+                                             write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                             write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+    where
+        F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+        Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
+    {
+        let login_is = self.contains_pr();
+        let subscription = self.get_subscription();
+        let address_url = self.address_url.clone();
+        let tag = self.tag.clone();
+        let heartbeat_time = self.heartbeat_time.clone();
+        let timestamp = Utc::now().timestamp();
+
+        //心跳-- 方法内部线程启动
+        let write_tx_clone1 = Arc::clone(write_tx_am);
+        tokio::spawn(async move {
+            trace!("线程-异步心跳-开始");
+            let ping_str = json!({
+                "time" : timestamp,
+                "channel" : "spot.ping",
+            });
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.to_string()), heartbeat_time).await;
+            trace!("线程-异步心跳-结束");
+        });
+
+        //设置订阅
+        let mut subscribe_array = vec![];
+        if login_is {
+            //登录相关
+        }
+        for s in subscription {
+            subscribe_array.push(s.to_string());
+        }
+
+        //链接
+        let t2 = tokio::spawn(async move {
+            let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
+
+            loop {
+                info!("gate_usdt_swap socket 连接中……");
+
+                AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
+                                                 false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
+
+                error!("gate_usdt_swap socket 断连,1s以后重连……");
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        });
+        tokio::try_join!(t2).unwrap();
+        trace!("线程-心跳与链接-结束");
+
+        Ok(())
+    }
+    /*******************************************************************************************************/
+    /*****************************************数据解析*****************************************************/
+    /*******************************************************************************************************/
+    //数据解析-Text
+    pub fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text);
+        Option::from(response_data)
+    }
+    //数据解析-ping
+    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
+    }
+    //数据解析-pong
+    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
+    }
+    //数据解析-二进制
+    pub fn message_binary(_po: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        let message_str = format!("Binary:{:?}", _po);
+        Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+    }
+    //数据解析
+    pub fn ok_text(text: String) -> ResponseData
+    {
+        trace!("原始数据:{}", text);
+        let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
+        let json_value: Value = serde_json::from_str(&text).unwrap();
+
+        if json_value["channel"].as_str() == Option::from("spot.pong") {
+            res_data.code = -301;
+            res_data.message = "success".to_string();
+        } else if json_value.get("error").is_some() {
+            let message = json_value["error"]["message"].as_str().unwrap().to_string();
+            let mes = message.trim_end_matches('\n');
+
+            res_data.code = json_value["error"]["code"].as_i64().unwrap().to_i16().unwrap();
+            res_data.message = mes.to_string();
+        } else if json_value["result"]["status"].as_str() == Option::from("success") {//订阅返回
+            res_data.code = -201;
+            res_data.data = json_value;
+        } else {
+            res_data.channel = format!("{}", json_value["channel"].as_str().unwrap());
+            res_data.code = 200;
+            res_data.data = json_value["result"].clone();
+        }
+        res_data
+    }
+}

+ 2 - 0
exchanges/src/lib.rs

@@ -17,3 +17,5 @@ pub mod bybit_swap_rest;
 pub mod bybit_swap_ws;
 pub mod bitget_swap_rest;
 pub mod bitget_swap_ws;
+pub mod gate_spot_rest;
+pub mod gate_spot_ws;

+ 116 - 0
src/gate_coin_spot_data_listener.rs

@@ -0,0 +1,116 @@
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc};
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use chrono::Utc;
+use lazy_static::lazy_static;
+use tokio::sync::{Mutex};
+use tracing::info;
+use exchanges::gate_spot_rest::GateSpotRest;
+use exchanges::gate_spot_ws::{GateSpotSubscribeType, GateSpotWs, GateSpotWsType};
+use exchanges::response_base::ResponseData;
+use rust_decimal_macros::dec;
+use serde_json::json;
+use standard::exchange::ExchangeEnum;
+use standard::exchange_struct_handler::ExchangeStructHandler;
+use crate::json_db_utils::{collect_special_trades_json, delete_db_by_exchange};
+use crate::listener_tools::{TradeMap, update_trade};
+use crate::msv::{generate_msv_by_trades, Indicators, parse_json_to_trades};
+
+const EXCHANGE_NAME: &str = "gate_coin_spot";
+
+lazy_static! {
+    // 给其他模块使用
+    pub static ref INDICATOR_MAP: Mutex<HashMap<String, Indicators>> = Mutex::new(HashMap::new());
+
+    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+}
+
+pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+    let name = "gate_coin_spot_listener";
+    // 订阅所有币种
+    let login = BTreeMap::new();
+    let mut gate_rest = GateSpotRest::new(false, login);
+    let params = json!({});
+    let response = gate_rest.get_market_details(params).await;
+    let mut symbols = vec![];
+    if response.code == 200 {
+        let symbol_infos = response.data.as_array().unwrap();
+        for symbol_info in symbol_infos {
+            let trade_status = symbol_info["trade_status"].as_str().unwrap();
+            let quote = symbol_info["quote"].as_str().unwrap();
+            if trade_status != "tradable" || quote == "USDT" { continue; };
+            let symbol = symbol_info["id"].as_str().unwrap().to_string();
+            symbols.push(symbol)
+        }
+    }
+    for chunk in symbols.chunks(20) {
+        let ws_name = name.to_string();
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+
+        tokio::spawn(async move {
+            let mut ws = GateSpotWs::new_with_tag(ws_name, false, None, GateSpotWsType::PublicAndPrivate);
+            ws.set_subscribe(vec![
+                GateSpotSubscribeType::PuSpotTrades,
+                // GateSpotSubscribeType::PuSpotCandlesticks,
+                // GateSpotSubscribeType::PuFuturesOrderBook,
+            ]);
+
+            // 建立链接
+            ws.set_symbols(symbols_chunk);
+            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        });
+    }
+    // 每分钟计算msv
+    tokio::spawn(async move {
+        loop {
+            let end_timestamp = Utc::now().timestamp_millis();
+            let start_timestamp = end_timestamp - 60 * 1000 * 60 * 2;
+            for symbol in symbols.clone() {
+                let trades_value = collect_special_trades_json(start_timestamp, end_timestamp, EXCHANGE_NAME, &symbol).await;
+                let trades = parse_json_to_trades(trades_value);
+                let msv = generate_msv_by_trades(trades, dec!(50), vec![], start_timestamp, end_timestamp);
+                let mut indicator_map = INDICATOR_MAP.lock().await;
+                indicator_map.insert(symbol, msv);
+            }
+            tokio::time::sleep(Duration::from_secs(60)).await;
+        }
+    });
+    // 定时删除数据
+    tokio::spawn(async move {
+        loop {
+            tokio::time::sleep(Duration::from_secs(1800)).await;
+            delete_db_by_exchange(EXCHANGE_NAME, "trades", 5 * 60).await;
+        }
+    });
+}
+
+// 读取数据
+pub async fn data_listener(response: ResponseData) {
+    if response.code != 200 {
+        return;
+    }
+
+    match response.channel.as_str() {
+        // 订单流数据
+        "spot.trades" => {
+            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSpot, &response);
+            for trade in trades.iter_mut() {
+                // 真实交易量处理,因为gate的量都是张数
+                let mut real_size = trade.size * trade.price;
+                real_size.rescale(2);
+                trade.size = real_size;
+
+                // 更新到本地数据库
+                let trades_map = TRADES_MAP.lock().await;
+                update_trade(trade, trades_map, EXCHANGE_NAME).await;
+            }
+        }
+        _ => {
+            info!("48 未知的数据类型: {:?}", response)
+        }
+    }
+}

+ 2 - 0
src/main.rs

@@ -9,6 +9,7 @@ mod phemex_usdt_swap_data_listener;
 mod mexc_usdt_swap_data_listener;
 mod bybit_usdt_swap_data_listener;
 mod bitget_usdt_swap_data_listener;
+mod gate_coin_spot_data_listener;
 mod msv;
 mod rank;
 
@@ -42,6 +43,7 @@ async fn main() {
     mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
     bybit_usdt_swap_data_listener::run_listener(running.clone()).await;
     bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
+    gate_coin_spot_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
     // let panic_running = running.clone();
     std::panic::set_hook(Box::new(move |panic_info| {

+ 5 - 1
src/server.rs

@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
 use tracing::{info};
 use actix_cors::Cors;
-use crate::{binance_usdt_swap_data_listener, bitget_usdt_swap_data_listener, bybit_usdt_swap_data_listener, coinex_usdt_swap_data_listener, gate_usdt_swap_data_listener, mexc_usdt_swap_data_listener, phemex_usdt_swap_data_listener, rank};
+use crate::{binance_usdt_swap_data_listener, bitget_usdt_swap_data_listener, bybit_usdt_swap_data_listener, coinex_usdt_swap_data_listener, gate_coin_spot_data_listener, gate_usdt_swap_data_listener, mexc_usdt_swap_data_listener, phemex_usdt_swap_data_listener, rank};
 
 // 定义用于反序列化查询参数的结构体
 #[derive(Serialize, Deserialize, Clone)]
@@ -68,6 +68,9 @@ async fn get_rank_list(query: web::Query<RankQuery>) -> impl Responder {
         "bitget_usdt_swap" => {
             bitget_usdt_swap_data_listener::INDICATOR_MAP.lock().await
         },
+        "gate_coin_spot" => {
+            gate_coin_spot_data_listener::INDICATOR_MAP.lock().await
+        },
         _ => {
             let response = Response {
                 query: serde_json::to_value(&query.into_inner()).unwrap(),
@@ -116,6 +119,7 @@ async fn get_exchanges() -> impl Responder {
         "mexc_usdt_swap",
         "bybit_usdt_swap",
         "bitget_usdt_swap",
+        "gate_coin_spot"
     ];
     let response_data = json!(exchanges);
 

+ 5 - 0
standard/src/exchange.rs

@@ -6,6 +6,7 @@ use crate::binance_swap::BinanceSwap;
 use crate::bitget_swap::BitgetSwap;
 use crate::bybit_swap::BybitSwap;
 use crate::coinex_swap::CoinexSwap;
+use crate::gate_spot::GateSpot;
 use crate::gate_swap::GateSwap;
 use crate::mexc_swap::MexcSwap;
 use crate::phemex_swap::PhemexSwap;
@@ -25,6 +26,7 @@ pub enum ExchangeEnum {
     MexcSwap,
     BybitSwap,
     BitgetSwap,
+    GateSpot,
 }
 
 /// Exchange结构体
@@ -90,6 +92,9 @@ impl Exchange {
             ExchangeEnum::BitgetSwap => {
                 Box::new(BitgetSwap::new(symbol, is_colo, params, order_sender, error_sender).await)
             }
+            ExchangeEnum::GateSpot => {
+                Box::new(GateSpot::new(symbol, is_colo, params, order_sender, error_sender).await)
+            }
         }
     }
 }

+ 4 - 1
standard/src/exchange_struct_handler.rs

@@ -1,6 +1,6 @@
 use exchanges::response_base::ResponseData;
 use crate::exchange::ExchangeEnum;
-use crate::{binance_swap_handle, bitget_swap_handle, bybit_swap_handle, coinex_swap_handle, gate_swap_handle, mexc_swap_handle, phemex_swap_handle};
+use crate::{binance_swap_handle, bitget_swap_handle, bybit_swap_handle, coinex_swap_handle, gate_spot_handle, gate_swap_handle, mexc_swap_handle, phemex_swap_handle};
 use crate::{Trade};
 
 #[allow(dead_code)]
@@ -32,6 +32,9 @@ impl ExchangeStructHandler {
             ExchangeEnum::BitgetSwap => {
                 bitget_swap_handle::format_trade_items(&res_data)
             }
+            ExchangeEnum::GateSpot => {
+                gate_spot_handle::format_trade_items(&res_data)
+            }
         }
     }
 }

+ 172 - 0
standard/src/gate_spot.rs

@@ -0,0 +1,172 @@
+use std::collections::BTreeMap;
+use std::io::{Error, ErrorKind};
+use std::str::FromStr;
+use tokio::sync::mpsc::Sender;
+use async_trait::async_trait;
+use rust_decimal::Decimal;
+use rust_decimal_macros::dec;
+use tracing::{error, warn};
+use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order};
+use exchanges::gate_spot_rest::GateSpotRest;
+use rust_decimal::prelude::FromPrimitive;
+use serde_json::{json, Value};
+
+#[allow(dead_code)]
+#[derive(Clone)]
+pub struct GateSpot {
+    exchange: ExchangeEnum,
+    symbol: String,
+    is_colo: bool,
+    params: BTreeMap<String, String>,
+    request: GateSpotRest,
+    market: Market,
+    order_sender: Sender<Order>,
+    error_sender: Sender<Error>,
+}
+
+impl GateSpot {
+    pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> GateSpot {
+        let market = Market::new();
+        let mut gate_spot = GateSpot {
+            exchange: ExchangeEnum::GateSpot,
+            symbol: symbol.to_uppercase(),
+            is_colo,
+            params: params.clone(),
+            request: GateSpotRest::new(is_colo, params.clone()),
+            market,
+            order_sender,
+            error_sender,
+        };
+        gate_spot.market = GateSpot::get_market(&mut gate_spot).await.unwrap_or(gate_spot.market);
+        return gate_spot;
+    }
+}
+
+#[async_trait]
+impl Platform for GateSpot {
+    // 克隆方法
+    fn clone_box(&self) -> Box<dyn Platform + Send + Sync> { Box::new(self.clone()) }
+    // 获取交易所模式
+    fn get_self_exchange(&self) -> ExchangeEnum {
+        ExchangeEnum::GateSpot
+    }
+    // 获取交易对
+    fn get_self_symbol(&self) -> String { self.symbol.clone() }
+    // 获取是否使用高速通道
+    fn get_self_is_colo(&self) -> bool {
+        self.is_colo
+    }
+    // 获取params信息
+    fn get_self_params(&self) -> BTreeMap<String, String> {
+        self.params.clone()
+    }
+
+    fn get_self_market(&self) -> Market {
+        warn!("gate_spot:该交易所方法未实现");
+        return Market::new();
+    }
+
+    fn get_request_delays(&self) -> Vec<i64> {
+        warn!("gate_spot:该交易所方法未实现");
+        return vec![];
+    }
+
+    fn get_request_avg_delay(&self) -> Decimal {
+        warn!("gate_spot:该交易所方法未实现");
+        return dec!(0);
+    }
+
+    fn get_request_max_delay(&self) -> i64 {
+        warn!("gate_spot:该交易所方法未实现");
+        return 0;
+    }
+
+    async fn get_server_time(&mut self) -> Result<String, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn get_account(&mut self) -> Result<Account, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn get_spot_account(&mut self) -> Result<Vec<Account>, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn get_position(&mut self) -> Result<Vec<Position>, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn get_positions(&mut self) -> Result<Vec<Position>, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn get_ticker(&mut self) -> Result<Ticker, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn get_ticker_symbol(&mut self, _symbol: String) -> Result<Ticker, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn get_market(&mut self) -> Result<Market, Error> {
+        let params = json!({});
+        let res_data = self.request.get_market_details(params).await;
+        if res_data.code == 200 {
+            let res_data_json = res_data.data.as_array().unwrap();
+            let market_info = res_data_json.iter().find(|item| item["id"].as_str().unwrap() == self.symbol);
+            match market_info {
+                None => {
+                    error!("gate_swap:获取Market信息错误!\nget_market:res_data={:?}", res_data);
+                    Err(Error::new(ErrorKind::Other, res_data.to_string()))
+                }
+                Some(value) => {
+                    let symbol = value["id"].as_str().unwrap().to_string();
+                    let base_asset = value["base"].as_str().unwrap().to_string();
+                    let quote_asset = value["quote"].as_str().unwrap().to_string();
+                    let tick_size = dec!(-1);
+                    let amount_size = dec!(-1);
+                    let price_precision = Decimal::from_f64(value["precision"].as_f64().unwrap()).unwrap();
+                    let amount_precision = Decimal::from_f64(value["amount_precision"].as_f64().unwrap()).unwrap();
+                    let min_qty = Decimal::from_str(value["min_base_amount"].as_str().unwrap()).unwrap();
+                    let max_qty = Decimal::from_str(value["max_base_amount"].as_str().unwrap()).unwrap();
+                    let min_notional = dec!(-1);
+                    let max_notional = dec!(-1);
+                    let ct_val = Decimal::ONE;
+
+                    let result = Market {
+                        symbol,
+                        base_asset,
+                        quote_asset,
+                        tick_size,
+                        amount_size,
+                        price_precision,
+                        amount_precision,
+                        min_qty,
+                        max_qty,
+                        min_notional,
+                        max_notional,
+                        ct_val,
+                    };
+                    Ok(result)
+                }
+            }
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn get_market_symbol(&mut self, _symbol: String) -> Result<Market, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn get_order_detail(&mut self, _order_id: &str, _custom_id: &str) -> Result<Order, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn get_orders_list(&mut self, _status: &str) -> Result<Vec<Order>, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn take_order(&mut self, _custom_id: &str, _origin_side: &str, _price: Decimal, _amount: Decimal) -> Result<Order, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn take_order_symbol(&mut self, _symbol: String, _ct_val: Decimal, _custom_id: &str, _origin_side: &str, _price: Decimal, _amount: Decimal) -> Result<Order, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn cancel_order(&mut self, _order_id: &str, _custom_id: &str) -> Result<Order, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn cancel_orders(&mut self) -> Result<Vec<Order>, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn cancel_orders_all(&mut self) -> Result<Vec<Order>, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn set_dual_mode(&mut self, _coin: &str, _is_dual_mode: bool) -> Result<String, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn set_dual_leverage(&mut self, _leverage: &str) -> Result<String, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn set_auto_deposit_status(&mut self, _status: bool) -> Result<String, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn wallet_transfers(&mut self, _coin: &str, _from: &str, _to: &str, _amount: Decimal) -> Result<String, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn take_stop_loss_order(&mut self, _stop_price: Decimal, _price: Decimal, _side: &str) -> Result<Value, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+
+    async fn cancel_stop_loss_order(&mut self, _order_id: &str) -> Result<Value, Error> { Err(Error::new(ErrorKind::NotFound, "gate_spot:该交易所方法未实现".to_string())) }
+}

+ 47 - 0
standard/src/gate_spot_handle.rs

@@ -0,0 +1,47 @@
+use std::str::FromStr;
+use rust_decimal::Decimal;
+use serde_json::Value;
+use exchanges::response_base::ResponseData;
+use crate::{Trade, Record, OrderBook};
+
+pub fn handle_records(value: &Value) -> Vec<Record> {
+    let mut records = vec![];
+
+    let n = value["n"].as_str().unwrap().to_string();
+    let n_split: Vec<String> = n.split("_").map(|s| s.to_string()).collect();
+    let symbol = format!("{}_{}", n_split[n_split.len() - 2], n_split[n_split.len() - 1]);
+    records.push(Record {
+        time: Decimal::from_str(value["t"].as_str().unwrap()).unwrap() * Decimal::ONE_THOUSAND,
+        open: Decimal::from_str(value["o"].as_str().unwrap()).unwrap(),
+        high: Decimal::from_str(value["h"].as_str().unwrap()).unwrap(),
+        low: Decimal::from_str(value["l"].as_str().unwrap()).unwrap(),
+        close: Decimal::from_str(value["c"].as_str().unwrap()).unwrap(),
+        volume: Decimal::from_str(value["v"].as_str().unwrap()).unwrap(),
+        symbol,
+    });
+
+    return records;
+}
+pub fn format_depth_items(value: &Value) -> Vec<OrderBook> {
+    let mut depth_items: Vec<OrderBook> = vec![];
+    for value in value.as_array().unwrap() {
+        let info = value.as_array().unwrap();
+        depth_items.push(OrderBook {
+            price: Decimal::from_str(info[0].as_str().unwrap()).unwrap(),
+            amount: Decimal::from_str(info[1].as_str().unwrap()).unwrap(),
+        })
+    }
+    return depth_items;
+}
+pub fn format_trade_items(res_data: &ResponseData) -> Vec<Trade> {
+    let side = res_data.data["side"].as_str().unwrap();
+    let amount = Decimal::from_str(res_data.data["amount"].as_str().unwrap()).unwrap();
+    return vec![Trade {
+        id: res_data.data["id"].to_string(),
+        time: Decimal::from_str(res_data.data["create_time_ms"].as_str().unwrap()).unwrap().floor(),
+        size: if side == "buy" { amount } else { -amount },
+        price: Decimal::from_str(res_data.data["price"].as_str().unwrap().to_string().as_str()).unwrap(),
+        symbol: res_data.data["currency_pair"].as_str().unwrap().to_string(),
+    }];
+}
+

+ 2 - 0
standard/src/lib.rs

@@ -27,6 +27,8 @@ pub mod bybit_swap;
 pub mod bybit_swap_handle;
 pub mod bitget_swap;
 pub mod bitget_swap_handle;
+pub mod gate_spot;
+pub mod gate_spot_handle;
 
 /// 持仓模式枚举
 /// - `Both`:单持仓方向