|
|
@@ -13,7 +13,7 @@ use tokio::net::TcpStream;
|
|
|
use tokio::sync::Mutex;
|
|
|
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
|
|
|
use tokio_tungstenite::tungstenite::{Error, Message};
|
|
|
-use tracing::trace;
|
|
|
+use tracing::{trace};
|
|
|
|
|
|
use crate::proxy;
|
|
|
use crate::proxy::{ProxyEnum, ProxyResponseEnum};
|
|
|
@@ -85,6 +85,10 @@ impl AbstractWsMode {
|
|
|
let write_clone3 = Arc::clone(&write_arc);
|
|
|
let ws_to_stdout = async {
|
|
|
while let Some(message) = read.next().await {
|
|
|
+ if !bool_v1.load(Ordering::Relaxed) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
let mut write_lock3 = write_clone3.lock().await;
|
|
|
let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
|
|
|
// let response_data = func(message);
|
|
|
@@ -102,7 +106,25 @@ impl AbstractWsMode {
|
|
|
*/
|
|
|
match code.as_str() {
|
|
|
"200" => {
|
|
|
- if bool_v1.load(Ordering::Relaxed) {
|
|
|
+ if data.label.contains("gate_usdt_swap") {
|
|
|
+ if data.channel == "futures.order_book" && read_tx.is_empty() {
|
|
|
+ read_tx.unbounded_send(data).unwrap();
|
|
|
+ } else {
|
|
|
+ read_tx.unbounded_send(data).unwrap();
|
|
|
+ }
|
|
|
+ } else if data.label.contains("binance_usdt_swap") {
|
|
|
+ if data.channel == "bookTicker" && read_tx.is_empty() {
|
|
|
+ read_tx.unbounded_send(data).unwrap();
|
|
|
+ } else {
|
|
|
+ read_tx.unbounded_send(data).unwrap();
|
|
|
+ }
|
|
|
+ } else if data.label.contains("bybit_usdt_swap") {
|
|
|
+ if data.channel == "orderbook" && read_tx.is_empty() {
|
|
|
+ read_tx.unbounded_send(data).unwrap();
|
|
|
+ } else {
|
|
|
+ read_tx.unbounded_send(data).unwrap();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
read_tx.unbounded_send(data).unwrap();
|
|
|
}
|
|
|
}
|