浏览代码

异步 token-tu的库,尝试阶段,尚不完善,不可替换项目当前使用 tu库

hl 2 年之前
父节点
当前提交
64cf9940bf
共有 5 个文件被更改,包括 447 次插入3 次删除
  1. 10 2
      exchanges/Cargo.toml
  2. 51 0
      exchanges/src/proxy.rs
  3. 284 0
      exchanges/src/socket_tool.rs
  4. 1 1
      exchanges/src/utils.rs
  5. 101 0
      exchanges/tests/socket_tool_test.rs

+ 10 - 2
exchanges/Cargo.toml

@@ -9,6 +9,12 @@ edition = "2021"
 # json
 serde_json = "1.0.104"
 tungstenite = { git = "https://github.com/PrivateRookie/tungstenite-rs.git", rev = "1d9289276518e5ab7e5194126d40b441d8938375" }
+#tungstenite = { git = "https://github.com/PrivateRookie/tungstenite-rs.git", rev = "f368f3087d50d97658fda5337550e587bb1ba1b6" }
+
+tokio-tungstenite= { git = "https://github.com/HonestHouLiang/tokio-tungstenite.git",rev = "208fc9b09bcc2e2c8cb52e1cde5087446464fc91"  }
+futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] }
+futures-channel = "0.3.28"
+
 url = "2.4.0"
 base64 = "0.13"
 tokio = { version = "1.31.0", features = ["full"] }
@@ -23,7 +29,7 @@ data-encoding = "2.4.0"
 
 hmac = "0.8.1"
 sha2 = "0.9.8"
-tokio-tungstenite = "0.14"
+#tokio-tungstenite = "0.14"
 
 ##代替f64避免精度丢失
 rust_decimal = "1.32.0"
@@ -33,4 +39,6 @@ rust_decimal_macros = "1.32.0"
 ##日志
 global = { path="../global" }
 tracing = "0.1"
-tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
+tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
+
+##

+ 51 - 0
exchanges/src/proxy.rs

@@ -1,6 +1,19 @@
 use std::env;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use tracing::trace;
 
+
+pub enum ProxyEnum {
+    REST,
+    WS,
+}
+
+pub enum ProxyResponseEnum {
+    NO,
+    YES(SocketAddr),
+}
+
+
 /**代理工具*/
 #[derive(Debug)]
 #[derive(Clone)]
@@ -10,6 +23,44 @@ pub struct ParsingDetail {
 }
 
 impl ParsingDetail {
+    pub fn env_proxy(proxy_enum: ProxyEnum) -> ProxyResponseEnum {
+        let proxy_address = env::var("proxy_address");
+        // 使用std::env::var函数获取环境变量的值,如果返回Err,则说明环境变量不存在
+        let ip_port = match proxy_address {
+            Ok(value) => {
+                trace!("环境变量读取成功:key:proxy_address , val:{}", value);
+                env::set_var("http_proxy", value.to_string());
+                env::set_var("https_proxy", value.to_string());
+                value
+            }
+            Err(_) => {
+                trace!("环境变量读取失败:'proxy_address'");
+                "".to_string()
+            }
+        };
+        if ip_port.len() > 0 {
+            match proxy_enum {
+                ProxyEnum::REST => {
+                    env::set_var("http_proxy", ip_port.to_string());
+                    env::set_var("https_proxy", ip_port.to_string());
+                return     ProxyResponseEnum::NO;
+                }
+                ProxyEnum::WS => {
+                    let ip_port: Vec<&str> = ip_port.split(":").collect();
+                    let ip_array: Vec<&str> = ip_port[0].split(".").collect();
+                    let proxy = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
+                        ip_array[0].parse().unwrap(),
+                        ip_array[1].parse().unwrap(),
+                        ip_array[2].parse().unwrap(),
+                        ip_array[3].parse().unwrap())
+                    ), ip_port[1].parse().unwrap());
+              return       ProxyResponseEnum::YES(proxy);
+                }
+            }
+        }
+        return  ProxyResponseEnum::NO;
+    }
+
     fn new(ip_address: String,
            port: String, ) -> ParsingDetail {
         ParsingDetail { ip_address, port }

+ 284 - 0
exchanges/src/socket_tool.rs

@@ -0,0 +1,284 @@
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use url::Url;
+
+
+use std::time::Duration;
+use chrono::Utc;
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+
+use futures_util::{future, pin_mut, StreamExt};
+use futures_util::stream::{SplitSink, SplitStream};
+use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
+use tokio_tungstenite::tungstenite::{Error, Message};
+use ring::hmac;
+use serde_json::json;
+use tokio::net::TcpStream;
+use tokio_tungstenite::tungstenite::error::UrlError;
+use tracing::trace;
+use tungstenite::client::IntoClientRequest;
+use crate::proxy;
+use crate::proxy::{ProxyEnum, ProxyResponseEnum};
+
+pub struct BinanceSwapModel {
+    address_url: String,
+}
+
+impl BinanceSwapModel {
+    pub fn new() -> OkxSwapModel {
+        OkxSwapModel {
+            // ws: AbstractWsMode::new(),
+            address_url: "wss://fstream.binance.com/stream?streams=btcusdt@depth20@100ms".to_string(),
+        }
+    }
+    //链接
+    pub async fn ws_connect_async(&self, write_rx: UnboundedReceiver<Message>, read_tx: UnboundedSender<Message>) ->  Result<(), Error> {
+        AbstractWsMode::ws_connect_async(self.address_url.clone(), write_rx, read_tx).await
+    }
+}
+
+
+pub struct OkxSwapModel {
+    // ws: AbstractWsMode,
+    address_url: String,
+}
+
+impl OkxSwapModel {
+    pub fn new() -> OkxSwapModel {
+        OkxSwapModel {
+            // ws: AbstractWsMode::new(),
+            address_url: "wss://ws.okx.com:8443/ws/v5/public".to_string(),
+        }
+    }
+    //链接
+    pub async fn ws_connect_async(&self, write_rx: UnboundedReceiver<Message>, read_tx: UnboundedSender<Message>) ->  Result<(), Error> {
+        AbstractWsMode::ws_connect_async(self.address_url.clone(), write_rx, read_tx).await
+    }
+
+    // //心跳包发送
+    // pub fn send_ping_or_pong(){
+    // }
+}
+
+// // 抽象ws 接口
+// pub trait AbstractWs {
+//     fn new() -> Self;
+//     //创建链接
+//     fn ws_connect_async(&mut self) -> Pin<Box<dyn Future<Output=Result<bool, Error>> + Send>>;
+//     //返回通道(发送指令通道)
+//     fn send_message(&self, text: String);
+//     //返回通道(数据接受通道)
+//     // fn get_read_rx(&self) -> UnboundedReceiver<Message>;
+// }
+
+pub struct AbstractWsMode {}
+
+impl AbstractWsMode
+    where SplitSink<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, tokio_tungstenite::tungstenite::Message>: futures_util::Sink<Message>,
+{
+    //创建链接
+    pub async fn ws_connect_async(address_url: String, write_rx: UnboundedReceiver<Message>, read_tx: UnboundedSender<Message>) -> Result<(), Error> {
+        //1.是否走代理
+        /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
+        let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
+            ProxyResponseEnum::NO => {
+                trace!("非 代理");
+                None
+            }
+            ProxyResponseEnum::YES(proxy) => {
+                trace!("代理");
+                Option::from(proxy)
+            }
+        };
+        match connect_async(address_url, proxy).await {
+            Ok((ws_stream, _)) => {
+                trace!("WebSocket 握手完成。");
+                tokio::spawn(async move {
+                    let (write, read) = ws_stream.split();
+
+                    //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
+                    let stdin_to_ws = write_rx.map(Ok).forward(write);
+                    let ws_to_stdout = {
+                        trace!("---1");
+                        //读,循环读取,然后拿到 message,,然后开启异步处理 message,
+                        let result = read.for_each(|message| async {
+                            read_tx.unbounded_send(message.unwrap()).unwrap();
+                        });
+                        trace!("---3");
+                        result
+                    };
+
+                    //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
+                    pin_mut!(stdin_to_ws, ws_to_stdout,);
+                    future::select(stdin_to_ws, ws_to_stdout).await;
+                    trace!("---5");
+                });
+                trace!("---4");
+                Ok(())
+            }
+            Err(e) => {
+                trace!("链接失败");
+                Err(Error::Url(UrlError::UnableToConnect("连接失败".to_string())))
+            }
+        }
+    }
+
+}
+
+
+//创建链接
+pub async fn ws_connect_async(address_url: String) -> (SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
+                                                       SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) {
+    //1.是否走代理
+    /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
+    let parsing_detail = proxy::ParsingDetail::parsing_environment_variables();
+    let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
+        ProxyResponseEnum::NO => {
+            trace!("非 代理");
+            None
+        }
+        ProxyResponseEnum::YES(proxy) => {
+            trace!("代理");
+            Option::from(proxy)
+        }
+    };
+
+    let (ws_stream, _) = connect_async(address_url, proxy).await.expect("链接失败!");
+    trace!("WebSocket 握手完成。");
+    ws_stream.split()
+}
+
+
+pub async fn client(add_url: String) {
+    let request = Url::parse(add_url.as_str()).expect("Can't connect to case count URL");
+    let client_request = request.into_client_request().unwrap();
+
+    let proxy = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
+        127,
+        0,
+        0,
+        1)
+    ), 7890);
+
+
+    //创建通道 开启线程,向通道写入数据
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, read_rx) = futures_channel::mpsc::unbounded();
+    tokio::spawn(write_sell(write_tx));
+
+
+    //创建socket,,并且读写分离
+    let (ws_stream, _) = connect_async(add_url, Option::from(proxy)).await.expect("Failed to connect");
+    trace!("WebSocket handshake has been successfully completed");
+    let (write, read) = ws_stream.split();
+
+    //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
+    let stdin_to_ws = write_rx.map(Ok).forward(write);
+    let ws_to_stdout = {
+        trace!("---1");
+        //读,循环读取,然后拿到 message,,然后开启异步处理 message,
+        let result = read.for_each(|message| async {
+            read_tx.unbounded_send(message.unwrap()).unwrap();
+        });
+        trace!("---3");
+        result
+    };
+
+    tokio::spawn(read_sell(read_rx));
+
+    //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
+    pin_mut!(stdin_to_ws, ws_to_stdout);
+    future::select(stdin_to_ws, ws_to_stdout).await;
+}
+
+
+//模拟 业务场景中 发送指令给目标交易所
+async fn write_sell(tx: futures_channel::mpsc::UnboundedSender<Message>) {
+    let str = serde_json::json!({
+                "op": "subscribe",
+                "args": [
+                        {
+                        // "channel":"orders",
+                        // "instType":"SWAP",
+                        // "instFamily":"BTC-USDT"
+                        "channel":"books5",
+                        "instId":"BTC-USDT"
+                        }
+                    ]
+            });
+    let str_array: Vec<String> = vec![
+        // log_in_to_str(),
+        // str.to_string(),
+    ];
+
+    let i = 0;
+    loop {
+        if str_array.len() > i {
+            let send_str = str_array.get(i).unwrap();
+            tx.unbounded_send(Message::Text(send_str.to_string())).unwrap();
+        }
+        tokio::time::sleep(Duration::from_secs(5)).await;
+        tx.unbounded_send(Message::Ping(Vec::from("Ping"))).unwrap();
+        tx.unbounded_send(Message::Ping(Vec::from("Pong"))).unwrap();
+    }
+}
+
+
+async fn read_sell(mut rx: futures_channel::mpsc::UnboundedReceiver<Message>) {
+    loop {
+        if let Some(message) = rx.next().await {
+            match message {
+                Message::Text(s) => {
+                    trace!("Text: {}", s);
+                }
+                Message::Binary(s) => {
+                    trace!("Binary: {:?}", s);
+                }
+                Message::Ping(s) => {
+                    trace!("Ping: {:?}", s);
+                }
+                Message::Pong(s) => {
+                    trace!("Pong: {:?}", s);
+                }
+                Message::Close(s) => {
+                    trace!("Close: {:?}", s);
+                }
+                Message::Frame(s) => {
+                    trace!("Frame: {:?}", s);
+                }
+            }
+        }
+        tokio::time::sleep(Duration::from_millis(1)).await
+    }
+}
+
+
+pub fn log_in_to_str() -> String {
+    let mut login_json_str = "".to_string();
+
+    let mut access_key: String = "b812814f-b792-432c-9c65-44ebe3b8976b".to_string();
+    let mut secret_key: String = "D6AF4764DF5FDC74BF2CC88A4A8FD035".to_string();
+    let mut passphrase: String = "Astest!@#1".to_string();
+
+    if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 {
+        let timestamp = Utc::now().timestamp().to_string();
+        // 时间戳 + 请求类型+ 请求参数字符串
+        let message = format!("{}GET{}", timestamp, "/users/self/verify");
+        let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
+        let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
+        let sign = base64::encode(result);
+
+        let login_json = json!({
+                              "op": "login",
+                                "args": [{
+                                "apiKey": access_key,
+                                "passphrase": passphrase,
+                                "timestamp": timestamp,
+                                "sign": sign  }]
+                        });
+
+        // trace!("---login_json:{0}", login_json.to_string());
+        // trace!("--登陆:{}", login_json.to_string());
+        login_json_str = login_json.to_string();
+    }
+    login_json_str
+}

+ 1 - 1
exchanges/src/utils.rs

@@ -2,7 +2,7 @@ use chrono::Utc;
 
 pub fn get_time_microsecond() -> i64 {
     let now = Utc::now();
-    let total_micros = now.timestamp_micros();
+    let total_micros = now.timestamp_micros(); //微妙
     total_micros
 }
 

+ 101 - 0
exchanges/tests/socket_tool_test.rs

@@ -0,0 +1,101 @@
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::time::Duration;
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use futures_util::StreamExt;
+use tokio::sync::mpsc::{channel, Sender};
+use tokio::try_join;
+use tokio_tungstenite::client_async;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::trace;
+use exchanges::binance_swap_rest::BinanceSwapRest;
+use exchanges::binance_swap_ws::{BinanceSubscribeType, BinanceSwapWs, BinanceWsType};
+use exchanges::response_base::ResponseData;
+use exchanges::socket_tool::{BinanceSwapModel, OkxSwapModel};
+
+//ws-订阅公共频道信息
+#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
+async fn ws_custom_subscribe() {
+    global::log_utils::init_log_with_trace();
+
+    // 币安订阅
+    // let base_url = "wss://fstream.binance.com/stream?streams=btcusdt@depth20@100ms".to_string();
+    // okx 公共订阅
+    // let str = serde_json::json!({
+    //             "op": "subscribe",
+    //             "args": [
+    //                     {
+    //                     "channel":"books5",
+    //                     "instId":"BTC-USDT"
+    //                     }
+    //                 ]
+    //         });
+
+    //创建通道,发送指令通道, 与读取推送数据通道
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+
+
+    //对象
+    let model = BinanceSwapModel::new();
+    let is_ok = model.ws_connect_async(write_rx, read_tx).await;
+    //写数据,订阅频道,有些是Url跟参数,有些是单独发送订阅字符串
+    // write_tx.unbounded_send(Message::Text(str.to_string())).expect("发送失败");
+    match is_ok {
+        Ok(_) => {
+            let t1 = tokio::spawn(async move {
+                loop {
+                    if let Some(message) = read_rx.next().await {
+                        match message {
+                            Message::Text(s) => {
+                                trace!("Text: {}",s);
+                                //okx 数据解析
+                                // let json_value: serde_json::Value = serde_json::from_str(&s).unwrap();
+                                // if json_value.get("event").is_some() {//订阅返回
+                                //     if json_value["event"].as_str() == Option::from("login") &&
+                                //         json_value["code"].as_str() == Option::from("0") {
+                                //     } else if json_value["event"].as_str() == Option::from("error") {
+                                //     } else if json_value["event"].as_str() == Option::from("subscribe") {
+                                //     } else {
+                                //     }
+                                // } else {
+                                //     if json_value.get("arg").is_some() && json_value.get("data").is_some() {
+                                //         trace!("Text: {}",json_value["data"][0]["ts"].as_str().unwrap());
+                                //     } else {
+                                //     }
+                                // }
+                            }
+                            Message::Binary(s) => {
+                                trace!("Binary: {:?}", s);
+                            }
+                            Message::Ping(s) => {
+                                trace!("Ping: {:?}", s);
+                            }
+                            Message::Pong(s) => {
+                                trace!("Pong: {:?}", s);
+                            }
+                            Message::Close(s) => {
+                                trace!("Close: {:?}", s);
+                            }
+                            Message::Frame(s) => {
+                                trace!("Frame: {:?}", s);
+                            }
+                        }
+                    }
+                }
+            });
+            try_join!(t1).unwrap();
+        }
+        _ => {}
+    }
+}
+
+
+fn get_ws(btree_map: BTreeMap<String, String>, tx: Sender<ResponseData>) -> BinanceSwapWs {
+    let binance_ws = BinanceSwapWs::new(false,
+                                        btree_map,
+                                        BinanceWsType::PublicAndPrivate,
+                                        tx);
+    binance_ws
+}