Browse Source

在windows上试试

skyfffire 1 year ago
parent
commit
c7c800f0fa
3 changed files with 67 additions and 45 deletions
  1. 31 26
      exchanges/src/socket_tool.rs
  2. 22 8
      strategy/src/binance_usdt_swap.rs
  3. 14 11
      strategy/src/gate_swap.rs

+ 31 - 26
exchanges/src/socket_tool.rs

@@ -104,38 +104,43 @@ impl AbstractWsMode {
                                     let read = r.lock().await;
 
                                     let mut data_c = data.clone();
-                                    data_c.label = lable.clone();
-                                    data_c.time = chrono::Utc::now().timestamp_micros();
 
+                                    let mut is_send = false;
                                     if data_c.label.contains("gate_usdt_swap") {
                                         if data_c.channel == "futures.order_book" {
                                             if read.len() == 0 {
-                                                read.unbounded_send(data_c).unwrap();
+                                                is_send = true;
                                             }
                                         } else {
-                                            read.unbounded_send(data_c).unwrap();
+                                            is_send = true;
                                         }
                                     } else if data_c.label.contains("binance_usdt_swap") {
                                         if data_c.channel == "bookTicker" {
                                             if read.len() == 0 {
-                                                read.unbounded_send(data_c).unwrap();
+                                                is_send = true;
                                             }
                                         } else {
-                                            read.unbounded_send(data_c).unwrap();
+                                            is_send = true;
                                         }
                                     } else if data_c.label.contains("bybit_usdt_swap") {
                                         if data_c.channel == "orderbook" {
                                             if read.len() == 0 {
-                                                read.unbounded_send(data_c).unwrap();
+                                                is_send = true;
                                             }
                                         } else {
-                                            read.unbounded_send(data_c).unwrap();
+                                            is_send = true;
                                         }
                                     } else {
                                         if read.len() == 0 {
-                                            read.unbounded_send(data_c).unwrap();
+                                            is_send = true;
                                         }
                                     }
+
+                                    if is_send {
+                                        data_c.label = lable.clone();
+                                        data_c.time = chrono::Utc::now().timestamp_micros();
+                                        read.unbounded_send(data_c).unwrap();
+                                    }
                                 }
 
                                 let code = data.code.clone();
@@ -189,23 +194,23 @@ impl AbstractWsMode {
                         Ok::<(), Error>(())
                     };
 
-                    // 防止 cpu 休眠。
-                    let read2 = read_arc.clone();
-                    spawn(async move {
-                        let t_str = "t".to_string();
-                        let response = ResponseData::new(t_str.clone(),
-                                                  t_str.clone(),
-                                                  t_str.clone(),
-                                                  t_str.clone());
-                        loop {
-                            tokio::time::sleep(Duration::from_micros(1)).await;
-
-                            let t = response.clone();
-                            let r = read2.clone();
-                            let read = r.lock().await;
-                            read.unbounded_send(t.clone()).unwrap();
-                        }
-                    });
+                    // // 防止 cpu 休眠。
+                    // let read2 = read_arc.clone();
+                    // spawn(async move {
+                    //     let t_str = "t".to_string();
+                    //     let response = ResponseData::new(t_str.clone(),
+                    //                               t_str.clone(),
+                    //                               t_str.clone(),
+                    //                               t_str.clone());
+                    //     loop {
+                    //         tokio::time::sleep(Duration::from_micros(1)).await;
+                    //
+                    //         let t = response.clone();
+                    //         let r = read2.clone();
+                    //         let read = r.lock().await;
+                    //         read.unbounded_send(t.clone()).unwrap();
+                    //     }
+                    // });
 
                     //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
                     pin_mut!(stdin_to_ws, ws_to_stdout,);

+ 22 - 8
strategy/src/binance_usdt_swap.rs

@@ -43,7 +43,21 @@ pub(crate) async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>,
 
             loop {
                 if let Some(data) = read_rx.next().await {
-                    on_data(bot_arc_clone.clone(), &mut update_flag_u, &mut max_buy, &mut min_sell, data).await;
+                    let mut trace_stack = TraceStack::default();
+                    trace_stack.on_after_network(data.time);
+                    trace_stack.on_before_unlock_quant();
+
+                    if data.time != 0 {
+                        println!("bian>{}", trace_stack.to_string());
+                    }
+
+                    // let mut trace_stack = TraceStack::default();
+                    // trace_stack.on_after_network(data.time);
+                    // trace_stack.on_before_unlock_quant();
+                    //
+                    // if data.time != 0 {
+                    //     info!("bian>{}", trace_stack.to_string());
+                    // }
                 }
             }
         });
@@ -59,13 +73,13 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
                  _max_buy: &mut Decimal,
                  _min_sell: &mut Decimal,
                  data: ResponseData) {
-    let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(data.time);
-    trace_stack.on_before_unlock_quant();
-
-    if data.time != 0 {
-        info!("bian>{}", trace_stack.to_string());
-    }
+    // let mut trace_stack = TraceStack::default();
+    // trace_stack.on_after_network(data.time);
+    // trace_stack.on_before_unlock_quant();
+    //
+    // if data.time != 0 {
+    //     info!("bian>{}", trace_stack.to_string());
+    // }
 
     // if data.code != "200".to_string() {
     //     return;

+ 14 - 11
strategy/src/gate_swap.rs

@@ -81,13 +81,20 @@ pub async fn gate_swap_run(bool_v1: Arc<AtomicBool>,
 
         loop {
             if let Some(data) = read_rx.next().await {
-                on_data(bot_arc_clone.clone(),
-                        &mut update_flag_u,
-                        multiplier,
-                        run_symbol.clone(),
-                        &mut max_buy,
-                        &mut min_sell,
-                        data).await;
+                let mut trace_stack = TraceStack::default();
+                trace_stack.on_after_network(data.time);
+                trace_stack.on_before_unlock_quant();
+
+                if data.time != 0 {
+                    println!("bian>{}", trace_stack.to_string());
+                }
+                // on_data(bot_arc_clone.clone(),
+                //         &mut update_flag_u,
+                //         multiplier,
+                //         run_symbol.clone(),
+                //         &mut max_buy,
+                //         &mut min_sell,
+                //         data).await;
             }
         }
     });
@@ -104,10 +111,6 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
     trace_stack.on_after_network(data.time);
     trace_stack.on_before_unlock_quant();
 
-    if data.time != 0 {
-        info!("bian>{}", trace_stack.to_string());
-    }
-
     // if data.code != "200".to_string() {
     //     return;
     // }