소스 검색

现货ws加入,还没有测试。

skyffire 6 달 전
부모
커밋
1b4d718512
7개의 변경된 파일967개의 추가작업 그리고 4개의 파일을 삭제
  1. 10 2
      Cargo.toml
  2. 0 0
      src/exchange/mexc_spot_client.rs
  3. 310 0
      src/exchange/mexc_spot_ws.rs
  4. 6 2
      src/exchange/mod.rs
  5. 134 0
      src/exchange/proxy.rs
  6. 50 0
      src/exchange/response_base.rs
  7. 457 0
      src/exchange/socket_tool.rs

+ 10 - 2
Cargo.toml

@@ -15,9 +15,17 @@ tokio = { version = "1", features = ["full"] }
 # - "tokio-native-tls": 与 tokio 集成,支持 HTTPS (SSL/TLS),使用系统的 TLS 实现
 reqwest = { version = "0.11", features = ["json", "tokio-native-tls"] }
 
+
+ring = "0.16.20"
+base64 = "0.13"
+
+futures-channel = "0.3.28"
+
+# 解压缩
+flate2 = "1.0"
+
 # WebSocket 客户端,基于 tokio 构建,用于订阅 K 线和深度
-# - "tokio-native-tls": 与 tokio 集成,支持 WSS (SSL/TLS)
-tokio-tungstenite = { version = "0.21", features = ["tokio-native-tls"] }
+tokio-tungstenite= { git = "https://github.com/HonestHouLiang/tokio-tungstenite.git",rev = "208fc9b09bcc2e2c8cb52e1cde5087446464fc91"  }
 
 # futures 工具库,提供一些异步编程中常用的 trait 和工具
 # 包含了 Stream 的一些方法,例如 split 用于分离 WebSocket stream 的读写端

+ 0 - 0
src/exchange/mexc_client.rs → src/exchange/mexc_spot_client.rs


+ 310 - 0
src/exchange/mexc_spot_ws.rs

@@ -0,0 +1,310 @@
+use std::io::Read;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+
+use flate2::read::GzDecoder;
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use serde_json::json;
+use serde_json::Value;
+use tokio::sync::Mutex;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::{error, info, trace};
+
+use crate::exchange::response_base::ResponseData;
+use crate::exchange::socket_tool::AbstractWsMode;
+
+//类型
+pub enum MexcSpotWsType {
+    PublicAndPrivate,
+}
+
+
+#[derive(Debug)]
+#[derive(Clone)]
+pub struct MexcSpotWsParam {
+    pub token: String,
+    pub ws_url: String,
+    pub ws_ping_interval: i64,
+    pub ws_ping_timeout: i64,
+    pub is_ok_subscribe: bool,
+}
+
+//订阅频道
+#[derive(Clone)]
+pub enum MexcSpotWsSubscribeType {
+    // 深度
+    PuFuturesDepth,
+    // K线数据
+    PuFuturesRecords(String),
+}
+
+//账号信息
+#[derive(Clone, Debug)]
+pub struct MexcSpotWsLogin {
+    pub access_key: String,
+    pub secret_key: String,
+    pub pass_key: String,
+}
+
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct MexcSpotWs {
+    //类型
+    tag: String,
+    //地址
+    address_url: String,
+    //账号
+    login_param: Option<MexcSpotWsLogin>,
+    //登录数据
+    ws_param: MexcSpotWsParam,
+    //币对
+    symbol_s: Vec<String>,
+    //订阅
+    subscribe_types: Vec<MexcSpotWsSubscribeType>,
+    //心跳间隔
+    heartbeat_time: u64,
+}
+
+impl MexcSpotWs {
+    // ============================================= 构造函数 ================================================
+    pub fn new_with_tag(tag: String, login_param: Option<MexcSpotWsLogin>, ws_type: MexcSpotWsType) -> MexcSpotWs {
+        /*******公共频道-私有频道数据组装*/
+        let address_url = match ws_type {
+            MexcSpotWsType::PublicAndPrivate => {
+                let url = "ws://wbs-api.mexc.com/ws".to_string();
+                url
+            }
+        };
+
+        /*******公共频道-私有频道数据组装*/
+        let ws_param = MexcSpotWsParam {
+            token: "".to_string(),
+            ws_url: "".to_string(),
+            ws_ping_interval: 0,
+            ws_ping_timeout: 0,
+            is_ok_subscribe: false,
+        };
+
+        MexcSpotWs {
+            tag,
+            address_url,
+            login_param,
+            ws_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            heartbeat_time: 1000 * 18,
+        }
+    }
+
+    // ============================================= 订阅函数 ================================================
+    // 手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<MexcSpotWsSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    // 手动添加币对
+    pub fn set_symbols(&mut self, mut symbol_array: Vec<String>) {
+        for symbol in symbol_array.iter_mut() {
+            // 大写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("_", "_");
+        }
+        self.symbol_s = symbol_array;
+    }
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if match t {
+                MexcSpotWsSubscribeType::PuFuturesRecords(_) => false,
+                MexcSpotWsSubscribeType::PuFuturesDepth => false,
+            } {
+                return true;
+            }
+        }
+        false
+    }
+
+    // 订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: MexcSpotWsSubscribeType) -> Value {
+        match subscribe_type {
+            // 深度
+            MexcSpotWsSubscribeType::PuFuturesDepth => {
+                json!({
+                    "method":"sub.depth",
+                    "param":{
+                        "symbol": symbol
+                    }
+                })
+            }
+            // k线
+            MexcSpotWsSubscribeType::PuFuturesRecords(interval) => {
+                json!({
+                    "method":"sub.kline",
+                    "param":{
+                        "symbol": symbol,
+                        "interval": interval
+                    }
+                })
+            }
+        }
+    }
+    // 订阅信息生成
+    pub fn get_subscription(&self) -> Vec<String> {
+        let mut array = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
+                array.push(ty_str.to_string());
+            }
+        }
+        array
+    }
+
+    // 链接
+    pub async fn ws_connect_async<F, Future>(&mut self,
+                                             is_shutdown_arc: Arc<AtomicBool>,
+                                             handle_function: F,
+                                             _write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                             write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+        where
+            F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+            Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
+    {
+        let login_is = self.contains_pr();
+        let subscription = self.get_subscription();
+        let address_url = self.address_url.clone();
+        let tag = self.tag.clone();
+        // let heartbeat_time = self.ws_param.ws_ping_interval.clone();
+
+        //心跳-- 方法内部线程启动
+        // let write_tx_clone1 = write_tx_am.clone();
+        // tokio::spawn(async move {
+        //     trace!("线程-异步心跳-开始");
+        //     AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time as u64).await;
+        //     trace!("线程-异步心跳-结束");
+        // });
+
+
+        // 设置订阅
+        let subscribe_array = subscription.clone();
+        if login_is {
+            //登录相关
+        }
+
+        // 链接
+        let t2 = tokio::spawn(async move {
+            let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
+
+            loop {
+                info!("Mexc_usdt_swap socket 连接中……");
+                AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
+                                                 false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
+
+                error!("Mexc_usdt_swap socket 断连,1s以后重连……");
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        });
+
+        tokio::try_join!(t2).unwrap();
+        trace!("线程-心跳与链接-结束");
+
+        Ok(())
+    }
+
+    //数据解析-Text
+    pub fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text);
+        Option::from(response_data)
+    }
+    //数据解析-ping
+    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
+        Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null))
+    }
+    //数据解析-pong
+    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
+        Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null))
+    }
+    //数据解析-二进制
+    pub fn message_binary(po: Vec<u8>) -> Option<ResponseData> {
+        //二进制WebSocket消息
+        // let message_str = format!("Binary:{:?}", _po);
+        // Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
+        // let result = String::from_utf8(bytes);
+        // let result = String::from_utf8(po);
+
+        let mut gz_decoder = GzDecoder::new(&po[..]);
+        let mut decompressed_data = Vec::new();
+
+        // 尝试解压数据
+        if let Ok(_) = gz_decoder.read_to_end(&mut decompressed_data) {
+            // 将解压后的字节向量转换为 UTF-8 字符串
+            match String::from_utf8(decompressed_data) {
+                Ok(text) => {
+                    let response_data = Self::ok_text(text);
+                    return Option::from(response_data);
+                }
+                Err(_) => {
+                    return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
+                }
+            }
+        } else {
+            return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
+        }
+    }
+    //数据解析
+    pub fn ok_text(text: String) -> ResponseData
+    {
+        let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
+        let json_value: Value = serde_json::from_str(&text).unwrap();
+
+        match json_value["channel"].as_str() {
+            Some(method) => {
+                if method.contains("pong") {
+                    return ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null);
+                } else if method.contains("rs.sub.") {
+                    //订阅响应
+                    let data = json_value["data"].as_str().unwrap();
+                    if method.contains(".depth") {
+                        res_data.channel = "futures.order_book".to_string();
+                    } else if method.contains(".kline") {
+                        res_data.channel = "futures.candlesticks".to_string();
+                    } else if method.contains(".deal") {
+                        res_data.channel = "futures.trades".to_string();
+                    } else {
+                        res_data.channel = "未知频道订阅".to_string();
+                    }
+
+                    if data == "success" {
+                        res_data.code = -201;
+                        res_data.message = "订阅成功".to_string();
+                    } else {
+                        res_data.code = 400;
+                        res_data.message = "订阅失败".to_string();
+                    }
+                } else if method.contains("push.") {
+                    if method.contains(".depth") {
+                        res_data.channel = "futures.order_book".to_string();
+                    } else if method.contains(".kline") {
+                        res_data.channel = "futures.candlesticks".to_string();
+                    } else if method.contains(".deal") {
+                        res_data.channel = "futures.trades".to_string();
+                    } else {
+                        res_data.channel = "未知频道推送".to_string();
+                    }
+                    res_data.code = 200;
+                    res_data.data = json_value.clone();
+                } else {
+                    res_data.code = -1;
+                    res_data.message = "未知解析".to_string();
+                }
+            }
+            None => {
+                res_data.code = -1;
+                res_data.message = "未知解析".to_string();
+            }
+        }
+
+        res_data
+    }
+}

+ 6 - 2
src/exchange/mod.rs

@@ -1,3 +1,7 @@
 mod types;
-mod mexc_client;
-mod ws_manager;
+mod ws_manager;
+mod mexc_spot_client;
+mod mexc_spot_ws;
+mod response_base;
+mod socket_tool;
+mod proxy;

+ 134 - 0
src/exchange/proxy.rs

@@ -0,0 +1,134 @@
+use std::env;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use tracing::trace;
+
+
+pub enum ProxyEnum {
+    REST,
+    WS,
+}
+
+pub enum ProxyResponseEnum {
+    NO,
+    YES(SocketAddr),
+}
+
+
+/**代理工具*/
+#[derive(Debug)]
+#[derive(Clone)]
+pub struct ParsingDetail {
+    pub ip_address: String,
+    pub port: String,
+}
+
+impl ParsingDetail {
+    pub fn env_proxy(proxy_enum: ProxyEnum) -> ProxyResponseEnum {
+        let proxy_address = env::var("proxy_address");
+        // 使用std::env::var函数获取环境变量的值,如果返回Err,则说明环境变量不存在
+        let ip_port = match proxy_address {
+            Ok(value) => {
+                trace!("环境变量读取成功:key:proxy_address , val:{}", value);
+                env::set_var("http_proxy", value.to_string());
+                env::set_var("https_proxy", value.to_string());
+                value
+            }
+            Err(_) => {
+                trace!("环境变量读取失败:'proxy_address'");
+                "".to_string()
+            }
+        };
+        if ip_port.len() > 0 {
+            match proxy_enum {
+                ProxyEnum::REST => {
+                    env::set_var("http_proxy", ip_port.to_string());
+                    env::set_var("https_proxy", ip_port.to_string());
+                    return ProxyResponseEnum::NO;
+                }
+                ProxyEnum::WS => {
+                    let ip_port: Vec<&str> = ip_port.split(":").collect();
+                    let ip_array: Vec<&str> = ip_port[0].split(".").collect();
+                    let proxy = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
+                        ip_array[0].parse().unwrap(),
+                        ip_array[1].parse().unwrap(),
+                        ip_array[2].parse().unwrap(),
+                        ip_array[3].parse().unwrap())
+                    ), ip_port[1].parse().unwrap());
+                    return ProxyResponseEnum::YES(proxy);
+                }
+            }
+        }
+        return ProxyResponseEnum::NO;
+    }
+
+    fn new(ip_address: String,
+           port: String, ) -> ParsingDetail {
+        ParsingDetail { ip_address, port }
+    }
+    //获取环境变量配置'proxy_address'拿到代理地址
+    pub fn parsing_environment_variables(is_unusual: Option<&str>) -> ParsingDetail {
+        let proxy_address_name = match is_unusual {
+            None => {
+                "proxy_address"
+            }
+            Some(v) => {
+                match v {
+                    "binance" => {
+                        "binance_proxy_address"
+                    }
+                    _ => {
+                        "proxy_address"
+                    }
+                }
+            }
+        };
+        let proxy_address = env::var(proxy_address_name);
+        // 使用std::env::var函数获取环境变量的值,如果返回Err,则说明环境变量不存在
+        match proxy_address {
+            Ok(value) => {
+                trace!("环境变量读取成功:key:proxy_address , val:{}", value);
+                let ip_port: Vec<&str> = value.split(":").collect();
+                let parsing_detail = ParsingDetail::new(ip_port[0].to_string(), ip_port[1].to_string());
+                parsing_detail
+            }
+            Err(_) => {
+                trace!("环境变量读取失败:'proxy_address'");
+                let parsing_detail = ParsingDetail::new("".to_string(), "".to_string());
+                parsing_detail
+            }
+        }
+    }
+
+    //http请求是否开启代理:HTTP 只需要调用该方法即可
+    //原理是 设置全局代理,所以程序如果要走代理只需要执行一次,后续的get,post..都会走代理
+    pub fn http_enable_proxy(is_unusual: Option<&str>) -> bool {
+        //拿到环境变量解析的数据
+        let parsing_detail = Self::parsing_environment_variables(is_unusual);
+        if parsing_detail.ip_address.len() > 0 && parsing_detail.port.len() > 0 {
+            let http_proxy = format!("http://{}:{}", parsing_detail.ip_address, parsing_detail.port);
+            env::set_var("http_proxy", http_proxy.clone());
+            env::set_var("https_proxy", http_proxy.clone());
+            trace!("代理设置成功{0}", http_proxy.to_string());
+            true
+        } else {
+            trace!("无法开启代理:环境变量获取失败:{:?}", parsing_detail);
+            false
+        }
+    }
+
+    pub fn removes_proxy(is_unusual: Option<&str>) -> bool {
+        //拿到环境变量解析的数据
+        let parsing_detail = Self::parsing_environment_variables(is_unusual);
+        if parsing_detail.ip_address.len() > 0 && parsing_detail.port.len() > 0 {
+            env::remove_var("http_proxy");
+            env::remove_var("https_proxy");
+            true
+        } else {
+            false
+        }
+    }
+}
+
+
+
+

+ 50 - 0
src/exchange/response_base.rs

@@ -0,0 +1,50 @@
+use serde_json::Value;
+use tokio::time::Instant;
+
+/**交易所返回数据处理之后,统一保存格式,为了内部其他接口调用*/
+#[derive(Debug, Clone)]
+pub struct ResponseData {
+    pub label: String,
+    pub code: i16,
+    pub message: String,
+    pub channel: String,
+    pub data: Value,
+    pub ins: Instant,           // 数据接收的ins
+    pub time: i64,              // 数据接受的时间
+    pub reach_time: i64,        // 远程数据时间 弃用
+    pub data_type: String       // 數據類型, 例如 bybit 深度信息:snapshot(全量),delta(增量)
+}
+
+impl ResponseData {
+    pub fn new(label: String, code: i16, message: String, data: Value) -> ResponseData {
+        ResponseData {
+            label,
+            code,
+            message,
+            data,
+            channel: "".to_string(),
+            time: 0,
+            reach_time: 0,
+            data_type: String::new(),
+            ins: Instant::now(),
+        }
+    }
+    pub fn error(label: String, message: String) -> ResponseData {
+        ResponseData {
+            label,
+            code: -1,
+            message: format!("{}", &message),
+            data: Value::Null,
+            channel: "".to_string(),
+            time: 0,
+            reach_time: 0,
+            data_type: String::new(),
+            ins: Instant::now(),
+        }
+    }
+
+    pub fn to_string(&self) -> String {
+        format!("{:?}", self)
+    }
+}
+

+ 457 - 0
src/exchange/socket_tool.rs

@@ -0,0 +1,457 @@
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+
+use chrono::Utc;
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use futures_util::{future, pin_mut, SinkExt, StreamExt};
+use futures_util::stream::{SplitSink, SplitStream};
+use ring::hmac;
+use serde_json::{json, Value};
+use tokio::net::TcpStream;
+use tokio::sync::Mutex;
+use tokio::time::Instant;
+use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::{error, info, trace};
+
+use crate::exchange::proxy;
+use crate::exchange::proxy::{ProxyEnum, ProxyResponseEnum};
+use crate::exchange::response_base::ResponseData;
+
+#[derive(Debug)]
+pub enum HeartbeatType {
+    Ping,
+    Pong,
+    Custom(String),
+}
+
+pub struct AbstractWsMode {}
+
+impl AbstractWsMode {
+    pub async fn ws_connected<T, PI, PO, F, B, Future>(write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
+                                                       is_first_login: bool,
+                                                       label: String,
+                                                       is_shutdown_arc: Arc<AtomicBool>,
+                                                       handle_function: &F,
+                                                       subscribe_array: Vec<String>,
+                                                       ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
+                                                       message_text: T,
+                                                       message_ping: PI,
+                                                       message_pong: PO,
+                                                       message_binary: B)
+        where T: Fn(String) -> Option<ResponseData> + Copy,
+              PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
+              PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
+              F: Fn(ResponseData) -> Future + Clone,
+              B: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
+              Future: future::Future<Output=()> + Send + 'static,
+    {
+        let (ws_write, mut ws_read) = ws_stream.split();
+        let ws_write_arc = Arc::new(Mutex::new(ws_write));
+
+        // 将socket 的写操作与【写通道(外部向socket写)】链接起来,将数据以ok的结构体封装进行传递
+        // 这里是形成链式操作,如果要将外界的信息传进来(使用socket查单、下单之类的,部分交易所可以支持),就要这要弄
+        let mut write_to_socket_rx = write_to_socket_rx_arc.lock().await;
+        let ws_write_channel_clone = Arc::clone(&ws_write_arc);
+        let stdin_to_ws = async {
+            while let Some(message) = write_to_socket_rx.next().await {
+                let mut write_lock2 = ws_write_channel_clone.lock().await;
+                write_lock2.send(message).await?;
+            }
+            Ok::<(), Error>(())
+        };
+        // 如果不需要事先登录,则直接订阅消息
+        if !is_first_login {
+            info!("不需要先登录,订阅内容:{:?}", subscribe_array.clone());
+            for s in &subscribe_array {
+                let mut write_lock = ws_write_arc.lock().await;
+                write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
+            }
+        }
+
+        let ws_write_inner = Arc::clone(&ws_write_arc);
+        let ws_to_stdout = async {
+            while let Some(message) = ws_read.next().await {
+                if !is_shutdown_arc.load(Ordering::Relaxed) {
+                    continue;
+                }
+
+                let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong, message_binary);
+                // let response_data = func(message);
+                if response_data.is_some() {
+                    let mut data = response_data.unwrap();
+                    data.label = label.clone();
+
+                    let code = data.code.clone();
+
+                    if code == 200 {
+                        let mut data_c = data.clone();
+                        data_c.ins = Instant::now();
+                        data_c.time = Utc::now().timestamp_millis();
+
+                        handle_function(data_c).await;
+                    }
+
+                    /*
+                        200 -正确返回
+                       -200 -登录成功
+                       -201 -订阅成功
+                       -300 -客户端收到服务器心跳ping,需要响应
+                       -301 -客户端收到服务器心跳pong,需要响应
+                       -302 -客户端收到服务器心跳自定义,需要响应自定义
+                    */
+                    match code {
+                        200 => {
+                            let mut data_c = data.clone();
+                            data_c.ins = Instant::now();
+                            data_c.time = Utc::now().timestamp_millis();
+
+                            handle_function(data_c).await;
+                        }
+                        -200 => {
+                            //登录成功
+                            info!("ws登录成功:{:?}", data);
+                            info!("订阅内容:{:?}", subscribe_array.clone());
+                            if is_first_login {
+                                for s in &subscribe_array {
+                                    let mut write_lock = ws_write_arc.lock().await;
+                                    write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
+                                }
+                                info!("订阅完成!");
+                            }
+                        }
+                        -201 => {
+                            //订阅成功
+                            trace!("订阅成功:{:?}", data);
+                        }
+                        -300 => {
+                            //服务器发送心跳 ping 给客户端,客户端需要pong回应
+                            trace!("服务器响应-ping");
+                            if data.data != Value::Null {
+                                let mut ws_write = ws_write_inner.lock().await;
+                                ws_write.send(Message::Pong(Vec::from(data.data.to_string()))).await?;
+                                trace!("客户端回应服务器-pong");
+                            }
+                        }
+                        -301 => {
+                            //服务器发送心跳 pong 给客户端,客户端需要ping回应
+                            trace!("服务器响应-pong");
+                            if data.data != Value::Null {
+                                let mut ws_write = ws_write_inner.lock().await;
+                                ws_write.send(Message::Ping(Vec::from(data.data.to_string()))).await?;
+                                trace!("客户端回应服务器-ping");
+                            }
+                        }
+                        -302 => {
+                            //客户端收到服务器心跳自定义,需要响应自定义
+                            trace!("特定字符心跳,特殊响应:{:?}", data);
+                            let mut ws_write = ws_write_inner.lock().await;
+                            ws_write.send(Message::Text(data.data.to_string())).await?;
+                            trace!("特殊字符心跳-回应完成");
+                        }
+                        _ => {
+                            trace!("未知:{:?}",data);
+                        }
+                    }
+                }
+            }
+            Ok::<(), Error>(())
+        };
+
+        //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
+        pin_mut!(stdin_to_ws, ws_to_stdout,);
+        future::select(stdin_to_ws, ws_to_stdout).await;
+    }
+
+    //创建链接
+    pub async fn ws_connect_async<T, PI, PO, F, B, Future>(is_shutdown_arc: Arc<AtomicBool>,
+                                                           handle_function: F,
+                                                           address_url: String,
+                                                           is_first_login: bool,
+                                                           label: String,
+                                                           subscribe_array: Vec<String>,
+                                                           write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
+                                                           message_text: T,
+                                                           message_ping: PI,
+                                                           message_pong: PO,
+                                                           message_binary: B)
+        where T: Fn(String) -> Option<ResponseData> + Copy,
+              PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
+              PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
+              B: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
+              F: Fn(ResponseData) -> Future + Clone,
+              Future: future::Future<Output=()> + Send + 'static,
+    {
+        //1.是否走代理
+        /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
+        let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
+            ProxyResponseEnum::NO => {
+                // trace!("非 代理");
+                None
+            }
+            ProxyResponseEnum::YES(proxy) => {
+                // trace!("代理");
+                Option::from(proxy)
+            }
+        };
+
+        match connect_async(address_url.clone(), proxy).await {
+            Ok((ws_stream, _)) => {
+                info!("socket 链接成功,{}。", address_url);
+
+                Self::ws_connected(write_to_socket_rx_arc,
+                                   is_first_login,
+                                   label,
+                                   is_shutdown_arc,
+                                   &handle_function,
+                                   subscribe_array.clone(),
+                                   ws_stream,
+                                   message_text,
+                                   message_ping,
+                                   message_pong,
+                                   message_binary).await;
+            }
+            Err(e) => {
+                error!("WebSocket 握手失败:{:?}", e);
+            }
+        }
+    }
+
+    //心跳包
+    pub async fn ping_or_pong(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
+        loop {
+            tokio::time::sleep(Duration::from_millis(millis)).await;
+            let write_tx_clone = write_tx_clone.lock().await;
+            match write_tx_clone.unbounded_send(
+                match h_type {
+                    HeartbeatType::Ping => {
+                        Message::Ping(Vec::from("Ping"))
+                    }
+                    HeartbeatType::Pong => {
+                        Message::Pong(Vec::from("Pong"))
+                    }
+                    HeartbeatType::Custom(ref str) => {
+                        Message::Text(str.parse().unwrap())
+                    }
+                }
+            ) {
+                Ok(_o) => {
+                    trace!("发送指令-心跳:{:?}",h_type);
+                }
+                Err(k) => {
+                    error!("发送失败:原因{:?}",k)
+                }
+            }
+            // write_tx_clone.unbounded_send(
+            //     match h_type {
+            //         HeartbeatType::Ping => {
+            //             Message::Ping(Vec::from("Ping"))
+            //         }
+            //         HeartbeatType::Pong => {
+            //             Message::Pong(Vec::from("Pong"))
+            //         }
+            //         HeartbeatType::Custom(ref str) => {
+            //             Message::Text(str.parse().unwrap())
+            //         }
+            //     }
+            // ).expect("发送失败");
+        }
+    }
+    //数据解析
+    pub fn analysis_message<T, PI, PO, B>(message: Result<Message, Error>,
+                                          message_text: T,
+                                          message_ping: PI,
+                                          message_pong: PO,
+                                          message_binary: B) -> Option<ResponseData>
+        where T: Fn(String) -> Option<ResponseData>,
+              PI: Fn(Vec<u8>) -> Option<ResponseData>,
+              PO: Fn(Vec<u8>) -> Option<ResponseData>,
+              B: Fn(Vec<u8>) -> Option<ResponseData>
+    {
+        match message {
+            Ok(Message::Text(text)) => message_text(text),
+            Ok(Message::Ping(pi)) => message_ping(pi),
+            Ok(Message::Pong(po)) => message_pong(po),
+            Ok(Message::Binary(s)) => message_binary(s), //二进制WebSocket消息
+            Ok(Message::Close(c)) => {
+                let message_str = format!("关闭指令:{:?}", c);
+                trace!("{:?}",message_str);
+                Option::from(ResponseData::new("".to_string(), 0, message_str, Value::Null))
+            }
+            Ok(Message::Frame(f)) => {
+                //原始帧 正常读取数据不会读取到该 信息类型
+                let message_str = format!("意外读取到原始帧:{:?}", f);
+                trace!("{:?}",message_str);
+                Option::from(ResponseData::new("".to_string(), -2, message_str, Value::Null))
+            }
+            Err(e) => {
+                let message_str = format!("服务器响应:{:?}", e);
+                trace!("{:?}",message_str);
+                Option::from(ResponseData::new("".to_string(), -1, message_str, Value::Null))
+            }
+        }
+    }
+    //发送数据
+    pub async fn send_subscribe(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, message: Message) -> bool {
+        let write_tx_clone = write_tx_clone.lock().await;
+        write_tx_clone.unbounded_send(message.clone()).unwrap();
+        trace!("发送指令:{:?}",message);
+        true
+    }
+}
+
+//创建链接
+pub async fn ws_connect_async(address_url: String) -> (SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
+                                                       SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) {
+    //1.是否走代理
+    /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
+    let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
+        ProxyResponseEnum::NO => {
+            trace!("非代理");
+            None
+        }
+        ProxyResponseEnum::YES(proxy) => {
+            trace!("代理");
+            Option::from(proxy)
+        }
+    };
+
+    let (ws_stream, _) = connect_async(address_url, proxy).await.expect("链接失败!");
+    trace!("WebSocket 握手完成。");
+    ws_stream.split()
+}
+
+
+pub async fn client(add_url: String) {
+    let proxy = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
+        127,
+        0,
+        0,
+        1)
+    ), 7890);
+
+
+    //创建通道 开启线程,向通道写入数据
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, read_rx) = futures_channel::mpsc::unbounded();
+    tokio::spawn(write_sell(write_tx));
+
+
+    //创建socket,并且读写分离
+    let (ws_stream, _) = connect_async(add_url, Option::from(proxy)).await.expect("Failed to connect");
+    trace!("WebSocket handshake has been successfully completed");
+    let (write, read) = ws_stream.split();
+
+    //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
+    let stdin_to_ws = write_rx.map(Ok).forward(write);
+    let ws_to_stdout = {
+        trace!("---1");
+        //读,循环读取,然后拿到 message,,然后开启异步处理 message,
+        let result = read.for_each(|message| async {
+            read_tx.unbounded_send(message.unwrap()).unwrap();
+        });
+        trace!("---3");
+        result
+    };
+
+    tokio::spawn(read_sell(read_rx));
+
+    //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
+    pin_mut!(stdin_to_ws, ws_to_stdout);
+    future::select(stdin_to_ws, ws_to_stdout).await;
+}
+
+
+//模拟 业务场景中 发送指令给目标交易所
+async fn write_sell(tx: UnboundedSender<Message>) {
+    let _str = json!({
+                "op": "subscribe",
+                "args": [
+                    {
+                        // "channel":"orders",
+                        // "instType":"SWAP",
+                        // "instFamily":"BTC-USDT"
+                        "channel":"books5",
+                        "instId":"BTC-USDT"
+                    }
+                ]
+            });
+    let str_array: Vec<String> = vec![
+        // log_in_to_str(),
+        // str.to_string(),
+    ];
+
+    let i = 0;
+    loop {
+        if str_array.len() > i {
+            let send_str = str_array.get(i).unwrap();
+            tx.unbounded_send(Message::Text(send_str.to_string())).unwrap();
+        }
+        tokio::time::sleep(Duration::from_secs(5)).await;
+        tx.unbounded_send(Message::Ping(Vec::from("Ping"))).unwrap();
+        tx.unbounded_send(Message::Ping(Vec::from("Pong"))).unwrap();
+    }
+}
+
+async fn read_sell(mut rx: UnboundedReceiver<Message>) {
+    loop {
+        if let Some(message) = rx.next().await {
+            match message {
+                Message::Text(s) => {
+                    trace!("Text: {}", s);
+                }
+                Message::Binary(s) => {
+                    trace!("Binary: {:?}", s);
+                }
+                Message::Ping(s) => {
+                    trace!("Ping: {:?}", s);
+                }
+                Message::Pong(s) => {
+                    trace!("Pong: {:?}", s);
+                }
+                Message::Close(s) => {
+                    trace!("Close: {:?}", s);
+                }
+                Message::Frame(s) => {
+                    trace!("Frame: {:?}", s);
+                }
+            }
+        }
+        tokio::time::sleep(Duration::from_millis(1)).await
+    }
+}
+
+pub fn log_in_to_str() -> String {
+    let mut login_json_str = "".to_string();
+
+    let access_key: String = "".to_string();
+    let secret_key: String = "".to_string();
+    let passphrase: String = "".to_string();
+
+    if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 {
+        let timestamp = Utc::now().timestamp().to_string();
+        // 时间戳 + 请求类型+ 请求参数字符串
+        let message = format!("{}GET{}", timestamp, "/users/self/verify");
+        let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+        let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
+        let sign = base64::encode(result);
+
+        let login_json = json!({
+            "op": "login",
+            "args": [{
+                "apiKey": access_key,
+                "passphrase": passphrase,
+                "timestamp": timestamp,
+                "sign": sign
+            }]
+        });
+
+        // trace!("---login_json:{0}", login_json.to_string());
+        // trace!("--登录:{}", login_json.to_string());
+        login_json_str = login_json.to_string();
+    }
+
+    login_json_str
+}