|
@@ -1,4 +1,3 @@
|
|
|
-use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
|
|
|
|
use std::sync::Arc;
|
|
use std::sync::Arc;
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
|
use std::time::Duration;
|
|
use std::time::Duration;
|
|
@@ -6,9 +5,7 @@ use std::time::Duration;
|
|
|
use chrono::Utc;
|
|
use chrono::Utc;
|
|
|
use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
|
|
use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
|
|
|
use futures_util::{future, pin_mut, SinkExt, StreamExt};
|
|
use futures_util::{future, pin_mut, SinkExt, StreamExt};
|
|
|
-use futures_util::stream::{SplitSink, SplitStream};
|
|
|
|
|
-use ring::hmac;
|
|
|
|
|
-use serde_json::{json, Value};
|
|
|
|
|
|
|
+use serde_json::{Value};
|
|
|
use tokio::net::TcpStream;
|
|
use tokio::net::TcpStream;
|
|
|
use tokio::sync::Mutex;
|
|
use tokio::sync::Mutex;
|
|
|
use tokio::time::Instant;
|
|
use tokio::time::Instant;
|
|
@@ -22,8 +19,8 @@ use crate::exchange::response_base::Response;
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
#[derive(Debug)]
|
|
|
pub enum HeartbeatType {
|
|
pub enum HeartbeatType {
|
|
|
- Ping,
|
|
|
|
|
- Pong,
|
|
|
|
|
|
|
+ // Ping,
|
|
|
|
|
+ // Pong,
|
|
|
Custom(String),
|
|
Custom(String),
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -112,7 +109,7 @@ impl AbstractWsMode {
|
|
|
handle_function(data_c).await;
|
|
handle_function(data_c).await;
|
|
|
}
|
|
}
|
|
|
-200 => {
|
|
-200 => {
|
|
|
- //登录成功
|
|
|
|
|
|
|
+ // 登录成功
|
|
|
info!("ws登录成功:{:?}", data);
|
|
info!("ws登录成功:{:?}", data);
|
|
|
if is_first_login {
|
|
if is_first_login {
|
|
|
for s in &subscribe_array {
|
|
for s in &subscribe_array {
|
|
@@ -128,7 +125,7 @@ impl AbstractWsMode {
|
|
|
// trace!("订阅成功:{:?}", data);
|
|
// trace!("订阅成功:{:?}", data);
|
|
|
}
|
|
}
|
|
|
-300 => {
|
|
-300 => {
|
|
|
- //服务器发送心跳 ping 给客户端,客户端需要pong回应
|
|
|
|
|
|
|
+ // 服务器发送心跳 ping 给客户端,客户端需要pong回应
|
|
|
trace!("服务器响应-ping");
|
|
trace!("服务器响应-ping");
|
|
|
if data.data != Value::Null {
|
|
if data.data != Value::Null {
|
|
|
let mut ws_write = ws_write_inner.lock().await;
|
|
let mut ws_write = ws_write_inner.lock().await;
|
|
@@ -137,7 +134,7 @@ impl AbstractWsMode {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
-301 => {
|
|
-301 => {
|
|
|
- //服务器发送心跳 pong 给客户端,客户端需要ping回应
|
|
|
|
|
|
|
+ // 服务器发送心跳 pong 给客户端,客户端需要ping回应
|
|
|
trace!("服务器响应-pong");
|
|
trace!("服务器响应-pong");
|
|
|
if data.data != Value::Null {
|
|
if data.data != Value::Null {
|
|
|
let mut ws_write = ws_write_inner.lock().await;
|
|
let mut ws_write = ws_write_inner.lock().await;
|
|
@@ -146,7 +143,7 @@ impl AbstractWsMode {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
-302 => {
|
|
-302 => {
|
|
|
- //客户端收到服务器心跳自定义,需要响应自定义
|
|
|
|
|
|
|
+ // 客户端收到服务器心跳自定义,需要响应自定义
|
|
|
trace!("特定字符心跳,特殊响应:{:?}", data);
|
|
trace!("特定字符心跳,特殊响应:{:?}", data);
|
|
|
let mut ws_write = ws_write_inner.lock().await;
|
|
let mut ws_write = ws_write_inner.lock().await;
|
|
|
ws_write.send(Message::Text(data.data.to_string())).await?;
|
|
ws_write.send(Message::Text(data.data.to_string())).await?;
|
|
@@ -185,8 +182,7 @@ impl AbstractWsMode {
|
|
|
F: Fn(Response) -> Future + Clone,
|
|
F: Fn(Response) -> Future + Clone,
|
|
|
Future: future::Future<Output=()> + Send + 'static,
|
|
Future: future::Future<Output=()> + Send + 'static,
|
|
|
{
|
|
{
|
|
|
- //1.是否走代理
|
|
|
|
|
- /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
|
|
|
|
|
|
|
+ // 1.是否走代理
|
|
|
let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
|
|
let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
|
|
|
ProxyResponseEnum::NO => {
|
|
ProxyResponseEnum::NO => {
|
|
|
// trace!("非 代理");
|
|
// trace!("非 代理");
|
|
@@ -220,19 +216,19 @@ impl AbstractWsMode {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- //心跳包
|
|
|
|
|
- pub async fn ping_or_pong(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
|
|
|
|
|
|
|
+ // 自动心跳包
|
|
|
|
|
+ pub async fn ping_pong(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
|
|
|
loop {
|
|
loop {
|
|
|
tokio::time::sleep(Duration::from_millis(millis)).await;
|
|
tokio::time::sleep(Duration::from_millis(millis)).await;
|
|
|
let write_tx_clone = write_tx_clone.lock().await;
|
|
let write_tx_clone = write_tx_clone.lock().await;
|
|
|
match write_tx_clone.unbounded_send(
|
|
match write_tx_clone.unbounded_send(
|
|
|
match h_type {
|
|
match h_type {
|
|
|
- HeartbeatType::Ping => {
|
|
|
|
|
- Message::Ping(Vec::from("Ping"))
|
|
|
|
|
- }
|
|
|
|
|
- HeartbeatType::Pong => {
|
|
|
|
|
- Message::Pong(Vec::from("Pong"))
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // HeartbeatType::Ping => {
|
|
|
|
|
+ // Message::Ping(Vec::from("Ping"))
|
|
|
|
|
+ // }
|
|
|
|
|
+ // HeartbeatType::Pong => {
|
|
|
|
|
+ // Message::Pong(Vec::from("Pong"))
|
|
|
|
|
+ // }
|
|
|
HeartbeatType::Custom(ref str) => {
|
|
HeartbeatType::Custom(ref str) => {
|
|
|
Message::Text(str.parse().unwrap())
|
|
Message::Text(str.parse().unwrap())
|
|
|
}
|
|
}
|
|
@@ -245,22 +241,10 @@ impl AbstractWsMode {
|
|
|
error!("发送失败:原因{:?}",k)
|
|
error!("发送失败:原因{:?}",k)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- // write_tx_clone.unbounded_send(
|
|
|
|
|
- // match h_type {
|
|
|
|
|
- // HeartbeatType::Ping => {
|
|
|
|
|
- // Message::Ping(Vec::from("Ping"))
|
|
|
|
|
- // }
|
|
|
|
|
- // HeartbeatType::Pong => {
|
|
|
|
|
- // Message::Pong(Vec::from("Pong"))
|
|
|
|
|
- // }
|
|
|
|
|
- // HeartbeatType::Custom(ref str) => {
|
|
|
|
|
- // Message::Text(str.parse().unwrap())
|
|
|
|
|
- // }
|
|
|
|
|
- // }
|
|
|
|
|
- // ).expect("发送失败");
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- //数据解析
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // 数据解析
|
|
|
pub fn analysis_message<T, PI, PO, B>(message: Result<Message, Error>,
|
|
pub fn analysis_message<T, PI, PO, B>(message: Result<Message, Error>,
|
|
|
message_text: T,
|
|
message_text: T,
|
|
|
message_ping: PI,
|
|
message_ping: PI,
|
|
@@ -294,165 +278,4 @@ impl AbstractWsMode {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- //发送数据
|
|
|
|
|
- pub async fn send_subscribe(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, message: Message) -> bool {
|
|
|
|
|
- let write_tx_clone = write_tx_clone.lock().await;
|
|
|
|
|
- write_tx_clone.unbounded_send(message.clone()).unwrap();
|
|
|
|
|
- trace!("发送指令:{:?}",message);
|
|
|
|
|
- true
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-//创建链接
|
|
|
|
|
-pub async fn ws_connect_async(address_url: String) -> (SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
|
|
|
|
|
- SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) {
|
|
|
|
|
- //1.是否走代理
|
|
|
|
|
- /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
|
|
|
|
|
- let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
|
|
|
|
|
- ProxyResponseEnum::NO => {
|
|
|
|
|
- trace!("非代理");
|
|
|
|
|
- None
|
|
|
|
|
- }
|
|
|
|
|
- ProxyResponseEnum::YES(proxy) => {
|
|
|
|
|
- trace!("代理");
|
|
|
|
|
- Option::from(proxy)
|
|
|
|
|
- }
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
- let (ws_stream, _) = connect_async(address_url, proxy).await.expect("链接失败!");
|
|
|
|
|
- trace!("WebSocket 握手完成。");
|
|
|
|
|
- ws_stream.split()
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-pub async fn client(add_url: String) {
|
|
|
|
|
- let proxy = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
|
|
|
|
|
- 127,
|
|
|
|
|
- 0,
|
|
|
|
|
- 0,
|
|
|
|
|
- 1)
|
|
|
|
|
- ), 7890);
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- //创建通道 开启线程,向通道写入数据
|
|
|
|
|
- let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
|
|
|
|
|
- let (read_tx, read_rx) = futures_channel::mpsc::unbounded();
|
|
|
|
|
- tokio::spawn(write_sell(write_tx));
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- //创建socket,并且读写分离
|
|
|
|
|
- let (ws_stream, _) = connect_async(add_url, Option::from(proxy)).await.expect("Failed to connect");
|
|
|
|
|
- trace!("WebSocket handshake has been successfully completed");
|
|
|
|
|
- let (write, read) = ws_stream.split();
|
|
|
|
|
-
|
|
|
|
|
- //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
|
|
|
|
|
- let stdin_to_ws = write_rx.map(Ok).forward(write);
|
|
|
|
|
- let ws_to_stdout = {
|
|
|
|
|
- trace!("---1");
|
|
|
|
|
- //读,循环读取,然后拿到 message,,然后开启异步处理 message,
|
|
|
|
|
- let result = read.for_each(|message| async {
|
|
|
|
|
- read_tx.unbounded_send(message.unwrap()).unwrap();
|
|
|
|
|
- });
|
|
|
|
|
- trace!("---3");
|
|
|
|
|
- result
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
- tokio::spawn(read_sell(read_rx));
|
|
|
|
|
-
|
|
|
|
|
- //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
|
|
|
|
|
- pin_mut!(stdin_to_ws, ws_to_stdout);
|
|
|
|
|
- future::select(stdin_to_ws, ws_to_stdout).await;
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-//模拟 业务场景中 发送指令给目标交易所
|
|
|
|
|
-async fn write_sell(tx: UnboundedSender<Message>) {
|
|
|
|
|
- let _str = json!({
|
|
|
|
|
- "op": "subscribe",
|
|
|
|
|
- "args": [
|
|
|
|
|
- {
|
|
|
|
|
- // "channel":"orders",
|
|
|
|
|
- // "instType":"SWAP",
|
|
|
|
|
- // "instFamily":"BTC-USDT"
|
|
|
|
|
- "channel":"books5",
|
|
|
|
|
- "instId":"BTC-USDT"
|
|
|
|
|
- }
|
|
|
|
|
- ]
|
|
|
|
|
- });
|
|
|
|
|
- let str_array: Vec<String> = vec![
|
|
|
|
|
- // log_in_to_str(),
|
|
|
|
|
- // str.to_string(),
|
|
|
|
|
- ];
|
|
|
|
|
-
|
|
|
|
|
- let i = 0;
|
|
|
|
|
- loop {
|
|
|
|
|
- if str_array.len() > i {
|
|
|
|
|
- let send_str = str_array.get(i).unwrap();
|
|
|
|
|
- tx.unbounded_send(Message::Text(send_str.to_string())).unwrap();
|
|
|
|
|
- }
|
|
|
|
|
- tokio::time::sleep(Duration::from_secs(5)).await;
|
|
|
|
|
- tx.unbounded_send(Message::Ping(Vec::from("Ping"))).unwrap();
|
|
|
|
|
- tx.unbounded_send(Message::Ping(Vec::from("Pong"))).unwrap();
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-async fn read_sell(mut rx: UnboundedReceiver<Message>) {
|
|
|
|
|
- loop {
|
|
|
|
|
- if let Some(message) = rx.next().await {
|
|
|
|
|
- match message {
|
|
|
|
|
- Message::Text(s) => {
|
|
|
|
|
- trace!("Text: {}", s);
|
|
|
|
|
- }
|
|
|
|
|
- Message::Binary(s) => {
|
|
|
|
|
- trace!("Binary: {:?}", s);
|
|
|
|
|
- }
|
|
|
|
|
- Message::Ping(s) => {
|
|
|
|
|
- trace!("Ping: {:?}", s);
|
|
|
|
|
- }
|
|
|
|
|
- Message::Pong(s) => {
|
|
|
|
|
- trace!("Pong: {:?}", s);
|
|
|
|
|
- }
|
|
|
|
|
- Message::Close(s) => {
|
|
|
|
|
- trace!("Close: {:?}", s);
|
|
|
|
|
- }
|
|
|
|
|
- Message::Frame(s) => {
|
|
|
|
|
- trace!("Frame: {:?}", s);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- tokio::time::sleep(Duration::from_millis(1)).await
|
|
|
|
|
- }
|
|
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
-pub fn log_in_to_str() -> String {
|
|
|
|
|
- let mut login_json_str = "".to_string();
|
|
|
|
|
-
|
|
|
|
|
- let access_key: String = "".to_string();
|
|
|
|
|
- let secret_key: String = "".to_string();
|
|
|
|
|
- let passphrase: String = "".to_string();
|
|
|
|
|
-
|
|
|
|
|
- if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 {
|
|
|
|
|
- let timestamp = Utc::now().timestamp().to_string();
|
|
|
|
|
- // 时间戳 + 请求类型+ 请求参数字符串
|
|
|
|
|
- let message = format!("{}GET{}", timestamp, "/users/self/verify");
|
|
|
|
|
- let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
|
|
|
|
|
- let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
|
|
|
|
|
- let sign = base64::encode(result);
|
|
|
|
|
-
|
|
|
|
|
- let login_json = json!({
|
|
|
|
|
- "op": "login",
|
|
|
|
|
- "args": [{
|
|
|
|
|
- "apiKey": access_key,
|
|
|
|
|
- "passphrase": passphrase,
|
|
|
|
|
- "timestamp": timestamp,
|
|
|
|
|
- "sign": sign
|
|
|
|
|
- }]
|
|
|
|
|
- });
|
|
|
|
|
-
|
|
|
|
|
- // trace!("---login_json:{0}", login_json.to_string());
|
|
|
|
|
- // trace!("--登录:{}", login_json.to_string());
|
|
|
|
|
- login_json_str = login_json.to_string();
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- login_json_str
|
|
|
|
|
-}
|
|
|