瀏覽代碼

Merge branch 'fix_bybit_01' into test

# Conflicts:
#	exchanges/src/socket_tool.rs
skyfffire 1 年之前
父節點
當前提交
9499e75234
共有 3 個文件被更改,包括 22 次插入40 次删除
  1. 7 37
      exchanges/src/socket_tool.rs
  2. 9 2
      standard/src/bybit_swap.rs
  3. 6 1
      strategy/src/quant.rs

+ 7 - 37
exchanges/src/socket_tool.rs

@@ -10,7 +10,6 @@ use futures_util::stream::{SplitSink, SplitStream};
 use ring::hmac;
 use serde_json::json;
 use tokio::net::TcpStream;
-use tokio::{spawn};
 use tokio::sync::Mutex;
 use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
 use tokio_tungstenite::tungstenite::{Error, Message};
@@ -57,9 +56,6 @@ impl AbstractWsMode {
             }
         };
 
-        let read_arc = Arc::new(Mutex::new(read_tx));
-
-        let read1 = read_arc.clone();
         loop {
             match connect_async(address_url.clone(), proxy).await {
                 Ok((ws_stream, _)) => {
@@ -84,7 +80,7 @@ impl AbstractWsMode {
                             let mut write_lock2 = write_clone2.lock().await;
                             write_lock2.send(message).await?;
                         }
-                        Ok::<(), Error>(())
+                        Ok::<(), tokio_tungstenite::tungstenite::Error>(())
                     };
                     let write_clone3 = Arc::clone(&write_arc);
                     let ws_to_stdout = async {
@@ -97,18 +93,9 @@ impl AbstractWsMode {
                             let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
                             // let response_data = func(message);
                             if response_data.is_some() {
-                                let data = response_data.unwrap();
-
-                                if data.code == "200" {
-                                    let r = read1.clone();
-                                    let read = r.lock().await;
-
-                                    let mut data_c = data.clone();
-                                    data_c.label = lable.clone();
-                                    data_c.time = chrono::Utc::now().timestamp_micros();
-                                    read.unbounded_send(data_c).unwrap();
-                                }
-
+                                let mut data = response_data.unwrap();
+                                data.label = lable.clone();
+                                data.time = chrono::Utc::now().timestamp_micros();
                                 let code = data.code.clone();
                                 /*
                                     200 -正确返回
@@ -120,6 +107,7 @@ impl AbstractWsMode {
                                 */
                                 match code.as_str() {
                                     "200" => {
+                                        read_tx.unbounded_send(data).unwrap();
                                     }
                                     "-200" => {
                                         //登录成功
@@ -157,27 +145,9 @@ impl AbstractWsMode {
                                 }
                             }
                         }
-                        Ok::<(), Error>(())
+                        Ok::<(), tokio_tungstenite::tungstenite::Error>(())
                     };
 
-                    // // 防止 cpu 休眠。
-                    // let read2 = read_arc.clone();
-                    // spawn(async move {
-                    //     let t_str = "t".to_string();
-                    //     let response = ResponseData::new(t_str.clone(),
-                    //                               t_str.clone(),
-                    //                               t_str.clone(),
-                    //                               t_str.clone());
-                    //     loop {
-                    //         tokio::time::sleep(Duration::from_micros(1)).await;
-                    //
-                    //         let t = response.clone();
-                    //         let r = read2.clone();
-                    //         let read = r.lock().await;
-                    //         read.unbounded_send(t.clone()).unwrap();
-                    //     }
-                    // });
-
                     //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
                     pin_mut!(stdin_to_ws, ws_to_stdout,);
                     future::select(stdin_to_ws, ws_to_stdout).await;
@@ -396,7 +366,7 @@ pub async fn client(add_url: String) {
     //创建通道 开启线程,向通道写入数据
     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
     let (read_tx, read_rx) = futures_channel::mpsc::unbounded();
-    spawn(write_sell(write_tx));
+    tokio::spawn(write_sell(write_tx));
 
 
     //创建socket,,并且读写分离

+ 9 - 2
standard/src/bybit_swap.rs

@@ -413,12 +413,17 @@ impl Platform for BybitSwap {
     async fn take_order_symbol(&mut self, symbol: String, ct_val: Decimal, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
         let symbol_upper = symbol.replace("_", "").trim().to_uppercase();
         let size = (amount / ct_val).floor();
+        let order_type = if price == Decimal::ZERO {
+            "Market"
+        } else {
+            "Limit"
+        };
         let mut params = json!({
             "orderLinkId": format!("t-{}", custom_id),
             "symbol": symbol_upper,
             "price": price.to_string(),
             "category": "linear",
-            "orderType":"Limit",
+            "orderType": order_type,
             "qty": json!(size),
             // 0.單向持倉 1.買側雙向持倉 2.賣側雙向持倉
             "positionIdx": json!(1),
@@ -577,7 +582,7 @@ impl Platform for BybitSwap {
                                 result_sd.send(order).await.unwrap();
                             }
                             Err(_err) => {
-                                // error!("撤单失败,而且查单也失败了,gate_io_swap,oid={}, cid={}。", order_id.clone(), custom_id.clone());
+                                error!("bybit:撤单失败,而且查单也失败了,oid={}, cid={}。", order_id.clone(), custom_id.clone());
                                 // panic!("撤单失败,而且查单也失败了,gate_io_swap,oid={}, cid={}。", order_id.clone(), custom_id.clone());
                             }
                         }
@@ -617,6 +622,8 @@ impl Platform for BybitSwap {
                         result_sd.send(result).await.unwrap();
                     }
                     Err(error) => {
+                        error!("bybit:下单失败:{:?}", error);
+
                         let mut err_order = Order::new();
                         err_order.custom_id = cid.clone();
                         err_order.status = "REMOVE".to_string();

+ 6 - 1
strategy/src/quant.rs

@@ -1358,7 +1358,12 @@ impl Quant {
                     let mut ts = TraceStack::default();
                     ts.on_before_send();
                     // 市价单
-                    match self.platform_rest.take_order_symbol(position.symbol.clone(), Decimal::ONE, utils::generate_client_id(None).as_str(), side, price, position.amount.abs()).await {
+                    match self.platform_rest.take_order_symbol(position.symbol.clone(),
+                                                               Decimal::ONE,
+                                                               utils::generate_client_id(None).as_str(),
+                                                               side,
+                                                               price,
+                                                               position.amount.abs()).await {
                         Ok(order) => {
                             ts.on_after_send();
                             info!("    {}仓位清除市价下单成功 {:?}, {}", position.symbol.clone(), order, ts.to_string());