Prechádzať zdrojové kódy

终于可以开始通讯了……

skyffire 6 mesiacov pred
rodič
commit
7681b276ac

+ 64 - 14
src/exchange/mexc_spot_ws.rs

@@ -12,7 +12,7 @@ use tokio::sync::Mutex;
 use tokio_tungstenite::tungstenite::{Error, Message};
 use tracing::{error, info, trace};
 
-use crate::exchange::response_base::ResponseData;
+use crate::exchange::response_base::Response;
 use crate::exchange::socket_tool::AbstractWsMode;
 
 //类型
@@ -36,7 +36,17 @@ pub struct MexcSpotWsParam {
 pub enum MexcSpotWsSubscribeType {
     // 深度
     PuFuturesDepth,
-    // K线数据
+    // K线数据,Min -> 分钟; Hour -> 小时; Day -> 天; Week -> 周, M -> 月
+    // Min1
+    // Min5
+    // Min15
+    // Min30
+    // Min60
+    // Hour4
+    // Hour8
+    // Day1
+    // Week1
+    // Month1
     PuFuturesRecords(String),
 }
 
@@ -167,7 +177,7 @@ impl MexcSpotWs {
                                              _write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
                                              write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
         where
-            F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+            F: Fn(Response) -> Future + Clone + Send + 'static + Sync,
             Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
     {
         let login_is = self.contains_pr();
@@ -213,20 +223,20 @@ impl MexcSpotWs {
     }
 
     //数据解析-Text
-    pub fn message_text(text: String) -> Option<ResponseData> {
+    pub fn message_text(text: String) -> Option<Response> {
         let response_data = Self::ok_text(text);
         Option::from(response_data)
     }
     //数据解析-ping
-    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
-        Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null))
+    pub fn message_ping(_pi: Vec<u8>) -> Option<Response> {
+        Option::from(Response::new("".to_string(), -300, "success".to_string(), Value::Null))
     }
     //数据解析-pong
-    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
-        Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null))
+    pub fn message_pong(_po: Vec<u8>) -> Option<Response> {
+        Option::from(Response::new("".to_string(), -301, "success".to_string(), Value::Null))
     }
     //数据解析-二进制
-    pub fn message_binary(po: Vec<u8>) -> Option<ResponseData> {
+    pub fn message_binary(po: Vec<u8>) -> Option<Response> {
         //二进制WebSocket消息
         // let message_str = format!("Binary:{:?}", _po);
         // Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
@@ -245,23 +255,23 @@ impl MexcSpotWs {
                     return Option::from(response_data);
                 }
                 Err(_) => {
-                    return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
+                    return Option::from(Response::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
                 }
             }
         } else {
-            return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
+            return Option::from(Response::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
         }
     }
     //数据解析
-    pub fn ok_text(text: String) -> ResponseData
+    pub fn ok_text(text: String) -> Response
     {
-        let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
+        let mut res_data = Response::new("".to_string(), 200, "success".to_string(), Value::Null);
         let json_value: Value = serde_json::from_str(&text).unwrap();
 
         match json_value["channel"].as_str() {
             Some(method) => {
                 if method.contains("pong") {
-                    return ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null);
+                    return Response::new("".to_string(), -301, "success".to_string(), Value::Null);
                 } else if method.contains("rs.sub.") {
                     //订阅响应
                     let data = json_value["data"].as_str().unwrap();
@@ -306,3 +316,43 @@ impl MexcSpotWs {
         res_data
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+    use std::sync::atomic::AtomicBool;
+    use tokio::sync::Mutex;
+    use tokio_tungstenite::tungstenite::Message;
+    use tracing::info;
+    use crate::exchange::mexc_spot_ws::{MexcSpotWs, MexcSpotWsSubscribeType, MexcSpotWsType};
+    use crate::exchange::response_base::Response;
+    use crate::utils::log_setup::setup_logging;
+
+    #[tokio::test]
+    async fn test_mexc_spot_ws() {
+        let ws_running = Arc::new(AtomicBool::new(true));
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
+        let _guard = setup_logging().unwrap();
+
+        let mut ws = MexcSpotWs::new_with_tag("Mexc".to_string(), None, MexcSpotWsType::PublicAndPrivate);
+
+        ws.set_subscribe(vec![
+            MexcSpotWsSubscribeType::PuFuturesRecords("Min1".to_string())
+        ]);
+
+        ws.set_symbols(vec!["BTCUSDT".to_string()]);
+
+        let fun = move |response: Response| {
+            info!("{}", serde_json::to_string_pretty(&response.data).unwrap());
+
+            async move {}
+        };
+
+        // 链接
+        info!("开始链接");
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+        ws.ws_connect_async(ws_running, fun, &write_tx_am, write_rx)
+            .await
+            .expect("链接失败");
+    }
+}

+ 6 - 6
src/exchange/response_base.rs

@@ -3,7 +3,7 @@ use tokio::time::Instant;
 
 /**交易所返回数据处理之后,统一保存格式,为了内部其他接口调用*/
 #[derive(Debug, Clone)]
-pub struct ResponseData {
+pub struct Response {
     pub label: String,
     pub code: i16,
     pub message: String,
@@ -15,9 +15,9 @@ pub struct ResponseData {
     pub data_type: String       // 數據類型, 例如 bybit 深度信息:snapshot(全量),delta(增量)
 }
 
-impl ResponseData {
-    pub fn new(label: String, code: i16, message: String, data: Value) -> ResponseData {
-        ResponseData {
+impl Response {
+    pub fn new(label: String, code: i16, message: String, data: Value) -> Response {
+        Response {
             label,
             code,
             message,
@@ -29,8 +29,8 @@ impl ResponseData {
             ins: Instant::now(),
         }
     }
-    pub fn error(label: String, message: String) -> ResponseData {
-        ResponseData {
+    pub fn error(label: String, message: String) -> Response {
+        Response {
             label,
             code: -1,
             message: format!("{}", &message),

+ 19 - 19
src/exchange/socket_tool.rs

@@ -18,7 +18,7 @@ use tracing::{error, info, trace};
 
 use crate::exchange::proxy;
 use crate::exchange::proxy::{ProxyEnum, ProxyResponseEnum};
-use crate::exchange::response_base::ResponseData;
+use crate::exchange::response_base::Response;
 
 #[derive(Debug)]
 pub enum HeartbeatType {
@@ -41,11 +41,11 @@ impl AbstractWsMode {
                                                        message_ping: PI,
                                                        message_pong: PO,
                                                        message_binary: B)
-        where T: Fn(String) -> Option<ResponseData> + Copy,
-              PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
-              PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
-              F: Fn(ResponseData) -> Future + Clone,
-              B: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
+        where T: Fn(String) -> Option<Response> + Copy,
+              PI: Fn(Vec<u8>) -> Option<Response> + Copy,
+              PO: Fn(Vec<u8>) -> Option<Response> + Copy,
+              F: Fn(Response) -> Future + Clone,
+              B: Fn(Vec<u8>) -> Option<Response> + Copy,
               Future: future::Future<Output=()> + Send + 'static,
     {
         let (ws_write, mut ws_read) = ws_stream.split();
@@ -177,11 +177,11 @@ impl AbstractWsMode {
                                                            message_ping: PI,
                                                            message_pong: PO,
                                                            message_binary: B)
-        where T: Fn(String) -> Option<ResponseData> + Copy,
-              PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
-              PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
-              B: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
-              F: Fn(ResponseData) -> Future + Clone,
+        where T: Fn(String) -> Option<Response> + Copy,
+              PI: Fn(Vec<u8>) -> Option<Response> + Copy,
+              PO: Fn(Vec<u8>) -> Option<Response> + Copy,
+              B: Fn(Vec<u8>) -> Option<Response> + Copy,
+              F: Fn(Response) -> Future + Clone,
               Future: future::Future<Output=()> + Send + 'static,
     {
         //1.是否走代理
@@ -264,11 +264,11 @@ impl AbstractWsMode {
                                           message_text: T,
                                           message_ping: PI,
                                           message_pong: PO,
-                                          message_binary: B) -> Option<ResponseData>
-        where T: Fn(String) -> Option<ResponseData>,
-              PI: Fn(Vec<u8>) -> Option<ResponseData>,
-              PO: Fn(Vec<u8>) -> Option<ResponseData>,
-              B: Fn(Vec<u8>) -> Option<ResponseData>
+                                          message_binary: B) -> Option<Response>
+        where T: Fn(String) -> Option<Response>,
+              PI: Fn(Vec<u8>) -> Option<Response>,
+              PO: Fn(Vec<u8>) -> Option<Response>,
+              B: Fn(Vec<u8>) -> Option<Response>
     {
         match message {
             Ok(Message::Text(text)) => message_text(text),
@@ -278,18 +278,18 @@ impl AbstractWsMode {
             Ok(Message::Close(c)) => {
                 let message_str = format!("关闭指令:{:?}", c);
                 trace!("{:?}",message_str);
-                Option::from(ResponseData::new("".to_string(), 0, message_str, Value::Null))
+                Option::from(Response::new("".to_string(), 0, message_str, Value::Null))
             }
             Ok(Message::Frame(f)) => {
                 //原始帧 正常读取数据不会读取到该 信息类型
                 let message_str = format!("意外读取到原始帧:{:?}", f);
                 trace!("{:?}",message_str);
-                Option::from(ResponseData::new("".to_string(), -2, message_str, Value::Null))
+                Option::from(Response::new("".to_string(), -2, message_str, Value::Null))
             }
             Err(e) => {
                 let message_str = format!("服务器响应:{:?}", e);
                 trace!("{:?}",message_str);
-                Option::from(ResponseData::new("".to_string(), -1, message_str, Value::Null))
+                Option::from(Response::new("".to_string(), -1, message_str, Value::Null))
             }
         }
     }