skyffire před 1 rokem
rodič
revize
1b7e66eff0

+ 6 - 5
exchanges/src/binance_swap_ws.rs

@@ -2,7 +2,7 @@ use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
-use serde_json::json;
+use serde_json::{json, Value};
 use tokio::sync::Mutex;
 use tokio_tungstenite::tungstenite::{Error, Message};
 use tracing::{info, trace};
@@ -212,17 +212,17 @@ impl BinanceSwapWs {
     }
     //数据解析-ping
     pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
-        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "pong".to_string()));
+        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), Value::String("pong".to_string())));
     }
     //数据解析-pong
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
-        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
+        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), Value::Null));
     }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData {
         // trace!("原始数据");
         // trace!(?text);
-        let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), "".to_string());
+        let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), Value::Null);
         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
 
         if json_value.get("result").is_some() && json_value.get("id").is_some() &&
@@ -234,7 +234,8 @@ impl BinanceSwapWs {
             res_data.code = json_value["error"]["code"].to_string();
             res_data.message = json_value["error"]["msg"].to_string();
         } else if json_value.get("stream").is_some() {//订阅返回
-            res_data.data = format!("{}", json_value.get("data").as_ref().unwrap());
+            info!("{}", json_value.get("data").as_ref().unwrap());
+            // res_data.data = format!("{}", json_value.get("data").as_ref().unwrap());
             res_data.code = "200".to_string();
 
             let channel = format!("{}", json_value.get("stream").as_ref().unwrap());

+ 7 - 1
exchanges/src/gate_swap_rest.rs

@@ -487,9 +487,11 @@ impl GateSwapRest {
             params.clone(),
             headers,
         ).await;
+
         let time_array = chrono::Utc::now().timestamp_millis() - start_time;
         self.delays.push(time_array);
         self.get_delay_info();
+
         let res_data = Self::res_data_analysis(get_response, base_url, params);
         res_data
     }
@@ -569,6 +571,10 @@ impl GateSwapRest {
         Ok(res_data)
     }
 
+    pub fn on_error_data() -> ResponseData {
+
+    }
+
     //res_data 解析
     pub fn res_data_analysis(result: Result<ResponseData, reqwest::Error>, base_url: String, params: String) -> ResponseData {
         match result {
@@ -581,7 +587,7 @@ impl GateSwapRest {
                         Ok(data) => {
                             let label = data["label"].as_str().unwrap();
                             let mut error = ResponseData::error(res_data.label, label.parse().unwrap());
-                            error.data = format!("请求地址:{},请求参数:{}", base_url, params);
+                            error.message = format!("请求地址:{},请求参数:{}", base_url, params);
                             error
                         }
                         Err(e) => {

+ 5 - 5
exchanges/src/gate_swap_ws.rs

@@ -314,17 +314,17 @@ impl GateSwapWs {
     }
     //数据解析-ping
     pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
-        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "".to_string()));
+        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), Value::Null));
     }
     //数据解析-pong
     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
-        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
+        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), Value::Null));
     }
     //数据解析
     pub fn ok_text(text: String) -> ResponseData
     {
         // trace!("原始数据:{}", text);
-        let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), "".to_string());
+        let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), Value::Null);
         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
 
         if json_value["channel"].as_str() == Option::from("futures.pong") {
@@ -338,11 +338,11 @@ impl GateSwapWs {
             res_data.message = mes.to_string();
         } else if json_value["result"]["status"].as_str() == Option::from("success") {//订阅返回
             res_data.code = "-201".to_string();
-            res_data.data = text;
+            res_data.data = json_value;
         } else {
             res_data.channel = format!("{}", json_value["channel"].as_str().unwrap());
             res_data.code = "200".to_string();
-            res_data.data = json_value["result"].to_string();
+            res_data.data = json_value["result"].clone();
         }
         res_data
     }

+ 35 - 35
exchanges/src/http_tool.rs

@@ -1,7 +1,7 @@
 use reqwest::{Client};
-use reqwest::header::HeaderMap;
 use tracing::trace;
 use crate::response_base::ResponseData;
+
 #[derive(Clone)]
 pub struct RestTool {
     pub base_url: String,
@@ -12,39 +12,40 @@ impl RestTool {
     pub fn new(base_url: String) -> RestTool {
         RestTool { base_url, client: Client::new() }
     }
-    pub async fn http_toll(&self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
-        let res_data: ResponseData;
-        /****请求接口与 地址*/
-        let url = format!("{}{}", self.base_url.to_string(), request_path);
-        let request_type = request_type.clone().to_uppercase();
-        let addrs_url = format!("{}?{}", url.clone(), RestTool::parse_params_to_str(params.clone()));
-        // let params_json: serde_json::Value = serde_json::from_str(&params).unwrap();
-        // trace!("url:{}",url);
-        // trace!("addrs_url:{}",url);
-        // trace!("params_json:{}",params_json);
-        // trace!("headers:{:?}",headers);
-
-        let req = match request_type.as_str() {
-            "GET" => self.client.get(addrs_url.clone()).headers(headers),
-            "POST" => self.client.post(addrs_url.clone()).body(params).headers(headers),
-            "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
-            // "PUT" => self.client.put(url.clone()).json(&params),
-            _ => return Ok(ResponseData::error("".to_string(),format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
-        };
 
-        let response = req.send().await?;
-        if response.status().is_success() {
-            // 读取响应的内容
-            let body = response.text().await?;
-            // trace!("ok-----{}", body);
-            res_data = ResponseData::new("".to_string(),"200".to_string(), "success".to_string(), body);
-        } else {
-            let body = response.text().await?;
-            // trace!("error-----{}", body);
-            res_data = ResponseData::error("".to_string(),body.to_string())
-        }
-        Ok(res_data)
-    }
+    // pub async fn http_toll(&self, request_path: String, request_type: String, params: String, headers: HeaderMap) -> Result<ResponseData, reqwest::Error> {
+    //     let res_data: ResponseData;
+    //     /****请求接口与 地址*/
+    //     let url = format!("{}{}", self.base_url.to_string(), request_path);
+    //     let request_type = request_type.clone().to_uppercase();
+    //     let addrs_url = format!("{}?{}", url.clone(), RestTool::parse_params_to_str(params.clone()));
+    //     // let params_json: serde_json::Value = serde_json::from_str(&params).unwrap();
+    //     // trace!("url:{}",url);
+    //     // trace!("addrs_url:{}",url);
+    //     // trace!("params_json:{}",params_json);
+    //     // trace!("headers:{:?}",headers);
+    //
+    //     let req = match request_type.as_str() {
+    //         "GET" => self.client.get(addrs_url.clone()).headers(headers),
+    //         "POST" => self.client.post(addrs_url.clone()).body(params).headers(headers),
+    //         "DELETE" => self.client.delete(addrs_url.clone()).headers(headers),
+    //         // "PUT" => self.client.put(url.clone()).json(&params),
+    //         _ => return Ok(ResponseData::error("".to_string(),format!("错误的请求类型:{}", request_type.clone()))), // 处理未知请求类型
+    //     };
+    //
+    //     let response = req.send().await?;
+    //     if response.status().is_success() {
+    //         // 读取响应的内容
+    //         let body = response.text().await?;
+    //         // trace!("ok-----{}", body);
+    //         res_data = ResponseData::new("".to_string(),"200".to_string(), "success".to_string(), body);
+    //     } else {
+    //         let body = response.text().await?;
+    //         // trace!("error-----{}", body);
+    //         res_data = ResponseData::error("".to_string(),body.to_string())
+    //     }
+    //     Ok(res_data)
+    // }
 
     //map数据转 get请求参数
     pub fn parse_params_to_str(parameters: String) -> String {
@@ -78,8 +79,7 @@ impl RestTool {
                 if res_data.code != "0" {
                     res_data
                 } else {
-                    let body: String = res_data.data;
-                    let json_value: serde_json::Value = serde_json::from_str(&body).unwrap();
+                    let json_value= res_data.data;
 
                     let code = json_value["code"].as_str().unwrap();
                     let data = serde_json::to_string(&json_value["data"]).unwrap();

+ 4 - 3
exchanges/src/response_base.rs

@@ -1,3 +1,4 @@
+use serde_json::Value;
 use tokio::time::Instant;
 
 /**交易所返回数据处理之后,同意保存格式,为了内部其他接口调用*/
@@ -7,7 +8,7 @@ pub struct ResponseData {
     pub code: String,
     pub message: String,
     pub channel: String,
-    pub data: String,
+    pub data: Value,
     pub ins: Instant,           // 数据接收的ins
     pub time: i64,              // 数据接受的时间
     pub reach_time: i64,        // 远程数据时间 弃用
@@ -15,7 +16,7 @@ pub struct ResponseData {
 }
 
 impl ResponseData {
-    pub fn new(label: String, code: String, message: String, data: String) -> ResponseData {
+    pub fn new(label: String, code: String, message: String, data: Value) -> ResponseData {
         ResponseData {
             label,
             code,
@@ -33,7 +34,7 @@ impl ResponseData {
             label,
             code: "-1".to_string(),
             message: format!("{}", &message),
-            data: "".to_string(),
+            data: Value::Null,
             channel: "".to_string(),
             time: 0,
             reach_time: 0,

+ 10 - 10
exchanges/src/socket_tool.rs

@@ -8,7 +8,7 @@ use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
 use futures_util::{future, pin_mut, SinkExt, StreamExt};
 use futures_util::stream::{SplitSink, SplitStream};
 use ring::hmac;
-use serde_json::json;
+use serde_json::{json, Value};
 use tokio::net::TcpStream;
 use tokio::sync::Mutex;
 use tokio::time::Instant;
@@ -110,18 +110,18 @@ impl AbstractWsMode {
                         "-300" => {
                             //服务器发送心跳 ping 给客户端,客户端需要pong回应
                             trace!("服务器响应-ping");
-                            if data.data.len() > 0 {
+                            if data.data != Value::Null {
                                 let mut ws_write = ws_write_inner.lock().await;
-                                ws_write.send(Message::Pong(Vec::from(data.data))).await?;
+                                ws_write.send(Message::Pong(Vec::from(data.data.to_string()))).await?;
                                 trace!("客户端回应服务器-pong");
                             }
                         }
                         "-301" => {
                             //服务器发送心跳 pong 给客户端,客户端需要ping回应
                             trace!("服务器响应-pong");
-                            if data.data.len() > 0 {
+                            if data.data != Value::Null {
                                 let mut ws_write = ws_write_inner.lock().await;
-                                ws_write.send(Message::Ping(Vec::from(data.data))).await?;
+                                ws_write.send(Message::Ping(Vec::from(data.data.to_string()))).await?;
                                 trace!("客户端回应服务器-ping");
                             }
                         }
@@ -129,7 +129,7 @@ impl AbstractWsMode {
                             //客户端收到服务器心跳自定义,需要响应自定义
                             trace!("特定字符心跳,特殊响应:{:?}", data);
                             let mut ws_write = ws_write_inner.lock().await;
-                            ws_write.send(Message::Text(data.data)).await?;
+                            ws_write.send(Message::Text(data.data.to_string())).await?;
                             trace!("特殊字符心跳-回应完成");
                         }
                         _ => {
@@ -242,14 +242,14 @@ impl AbstractWsMode {
                 trace!("{:?}",message_str);
                 Option::from(ResponseData::new("".to_string(),
                                                "2".to_string(),
-                                               message_str, "".to_string()))
+                                               message_str, Value::Null))
             }
             Ok(Message::Close(c)) => {
                 let message_str = format!("关闭指令:{:?}", c);
                 trace!("{:?}",message_str);
                 Option::from(ResponseData::new("".to_string(),
                                                "0".to_string(),
-                                               message_str, "".to_string()))
+                                               message_str, Value::Null))
             }
             Ok(Message::Frame(f)) => {
                 //原始帧 正常读取数据不会读取到该 信息类型
@@ -257,14 +257,14 @@ impl AbstractWsMode {
                 trace!("{:?}",message_str);
                 Option::from(ResponseData::new("".to_string(),
                                                "-2".to_string(),
-                                               message_str, "".to_string()))
+                                               message_str, Value::Null))
             }
             Err(e) => {
                 let message_str = format!("服务器响应:{:?}", e);
                 trace!("{:?}",message_str);
                 Option::from(ResponseData::new("".to_string(),
                                                "-1".to_string(),
-                                               message_str, "".to_string()))
+                                               message_str, Value::Null))
             }
         }
     }

+ 0 - 0
standard/src/binance_handle.rs → standard/src/binance_swap_handle.rs


+ 6 - 6
standard/src/handle_info.rs

@@ -7,7 +7,7 @@ use tracing::{error};
 use exchanges::response_base::ResponseData;
 use global::public_params;
 use crate::exchange::ExchangeEnum;
-use crate::{Account, binance_handle, binance_spot_handle, bitget_spot_handle, gate_handle, kucoin_handle, kucoin_spot_handle, bybit_swap_handle, MarketOrder, okx_handle, Position, SpecialDepth, SpecialOrder, SpecialTicker};
+use crate::{Account, binance_swap_handle, binance_spot_handle, bitget_spot_handle, gate_handle, kucoin_handle, kucoin_spot_handle, bybit_swap_handle, MarketOrder, okx_handle, Position, SpecialDepth, SpecialOrder, SpecialTicker};
 
 #[allow(dead_code)]
 pub struct HandleSwapInfo;
@@ -60,7 +60,7 @@ impl HandleSwapInfo {
                 binance_spot_handle::handle_special_ticker(res_data)
             }
             ExchangeEnum::BinanceSwap => {
-                binance_handle::handle_special_ticker(res_data)
+                binance_swap_handle::handle_special_ticker(res_data)
             }
             ExchangeEnum::GateSwap => {
                 gate_handle::handle_special_ticker(res_data)
@@ -236,14 +236,14 @@ pub fn format_depth(exchange: ExchangeEnum, res_data: ResponseData) -> DepthPara
     let create_at: i64;
     match exchange {
         ExchangeEnum::BinanceSpot => {
-            depth_asks = binance_handle::format_depth_items(res_data_json["asks"].clone());
-            depth_bids = binance_handle::format_depth_items(res_data_json["bids"].clone());
+            depth_asks = binance_swap_handle::format_depth_items(res_data_json["asks"].clone());
+            depth_bids = binance_swap_handle::format_depth_items(res_data_json["bids"].clone());
             t = Decimal::from_str(&res_data_json["lastUpdateId"].to_string()).unwrap();
             create_at = 0;
         }
         ExchangeEnum::BinanceSwap => {
-            depth_asks = binance_handle::format_depth_items(res_data_json["a"].clone());
-            depth_bids = binance_handle::format_depth_items(res_data_json["b"].clone());
+            depth_asks = binance_swap_handle::format_depth_items(res_data_json["a"].clone());
+            depth_bids = binance_swap_handle::format_depth_items(res_data_json["b"].clone());
             t = Decimal::from_str(&res_data_json["u"].to_string()).unwrap();
             create_at = res_data_json["E"].as_i64().unwrap() * 1000;
         }

+ 1 - 1
standard/src/lib.rs

@@ -17,7 +17,7 @@ pub mod handle_info;
 // 引入binance模块
 mod binance_swap;
 mod binance_spot;
-pub mod binance_handle;
+pub mod binance_swap_handle;
 pub mod binance_spot_handle;
 // 引入gate模块
 mod gate_swap;