Browse Source

一些更新

skyffire 1 year ago
parent
commit
fb1a90a40a
2 changed files with 54 additions and 12 deletions
  1. 46 7
      exchanges/tests/bybit_swap_test.rs
  2. 8 5
      strategy/src/avellaneda_stoikov.rs

+ 46 - 7
exchanges/tests/bybit_swap_test.rs

@@ -7,6 +7,7 @@ use tracing::trace;
 
 use exchanges::bybit_swap_rest::BybitSwapRest;
 use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
+use exchanges::response_base::ResponseData;
 
 const ACCESS_KEY: &str = "";
 const SECRET_KEY: &str = "";
@@ -19,10 +20,10 @@ async fn ws_custom_subscribe_pu() {
 
 
     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
 
 
-    let mut ws = get_ws(None, BybitSwapWsType::Public).await;
+    let mut ws = get_ws(None, BybitSwapWsType::Public);
     ws.set_symbols(vec!["BTC_USDT".to_string()]);
     ws.set_subscribe(vec![
         // BybitSwapSubscribeType::PuOrderBook1,
@@ -69,10 +70,28 @@ async fn ws_custom_subscribe_pu() {
     //     trace!("线程-数据写入-结束");
     // });
 
+
+    let fun = move |data: ResponseData| {
+        // 在 async 块之前克隆 Arc
+        // let core_arc_cc = core_arc_clone.clone();
+        // let mul = multiplier.clone();
+        //
+        // let depth_asks = Arc::clone(&depth_asks);
+        // let depth_bids = Arc::clone(&depth_bids);
+
+        async move {
+            trace!("333333333333333:ResponseData:{:?}",data);
+            // let mut depth_asks = depth_asks.lock().await;
+            // let mut depth_bids = depth_bids.lock().await;
+            // 使用克隆后的 Arc,避免 move 语义
+            // on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids).await
+        }
+    };
+
     let t1 = tokio::spawn(async move {
         //链接
         let bool_v3_clone = Arc::clone(&is_shutdown_arc);
-        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        ws.ws_connect_async(bool_v3_clone,fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
         trace!("test 唯一线程结束--");
     });
     tokio::try_join!(t1).unwrap();
@@ -86,18 +105,19 @@ async fn ws_custom_subscribe_pu() {
 //ws-订阅私有频道信息
 #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
 async fn ws_custom_subscribe_pr() {
+
     global::log_utils::init_log_with_trace();
 
 
     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
 
     let logparam = BybitSwapLogin {
         api_key: ACCESS_KEY.to_string(),
         secret_key: SECRET_KEY.to_string(),
     };
 
-    let mut ws = get_ws(Option::from(logparam), BybitSwapWsType::Private).await;
+    let mut ws = get_ws(Option::from(logparam), BybitSwapWsType::Private);
     ws.set_symbols(vec!["BTC_USDT".to_string()]);
     ws.set_subscribe(vec![
         BybitSwapSubscribeType::PrPosition,
@@ -144,10 +164,29 @@ async fn ws_custom_subscribe_pr() {
     //     trace!("线程-数据写入-结束");
     // });
 
+    let fun = move |data: ResponseData| {
+        // 在 async 块之前克隆 Arc
+        // let core_arc_cc = core_arc_clone.clone();
+        // let mul = multiplier.clone();
+        //
+        // let depth_asks = Arc::clone(&depth_asks);
+        // let depth_bids = Arc::clone(&depth_bids);
+
+        async move {
+            trace!("333333333333333:ResponseData:{:?}",data);
+            // let mut depth_asks = depth_asks.lock().await;
+            // let mut depth_bids = depth_bids.lock().await;
+            // 使用克隆后的 Arc,避免 move 语义
+            // on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids).await
+        }
+    };
+
     let t1 = tokio::spawn(async move {
         //链接
         let bool_v3_clone = Arc::clone(&is_shutdown_arc);
-        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        // ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        ws.ws_connect_async(bool_v3_clone,fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+
         trace!("test 唯一线程结束--");
     });
     tokio::try_join!(t1).unwrap();
@@ -305,7 +344,7 @@ async fn rest_cancel_orders_test() {
 }
 
 
-async fn get_ws(btree_map: Option<BybitSwapLogin>, type_v: BybitSwapWsType) -> BybitSwapWs {
+ fn get_ws(btree_map: Option<BybitSwapLogin>, type_v: BybitSwapWsType) -> BybitSwapWs {
     let ku_ws = BybitSwapWs::new(false, btree_map, type_v);
     ku_ws
 }

+ 8 - 5
strategy/src/avellaneda_stoikov.rs

@@ -394,8 +394,11 @@ impl AvellanedaStoikov {
     }
 
     pub async fn update_inventory(&mut self, inventory: &Decimal, min_amount_value: &Decimal) {
-        self.prev_trade_time = Utc::now().timestamp_micros();
+        let prev_inventory = self.inventory;
         self.inventory = (inventory / (min_amount_value / self.mid_price)).round();
+        if prev_inventory != self.inventory {
+            self.prev_trade_time = Utc::now().timestamp_micros();
+        }
 
         self.update_level().await;
         self.processor().await;
@@ -456,11 +459,11 @@ impl AvellanedaStoikov {
             }
 
             if self.money_flow_diff > Decimal::ZERO {
-                self.ask_delta += self.base_delta;
-                self.bid_delta -= self.base_delta;
-            } else if self.money_flow_diff < Decimal::ZERO {
-                self.ask_delta -= self.base_delta;
+                self.ask_delta -= self.base_delta * (Decimal::TWO - self.t_diff);
                 self.bid_delta += self.base_delta;
+            } else if self.money_flow_diff < Decimal::ZERO {
+                self.ask_delta += self.base_delta;
+                self.bid_delta -= self.base_delta * (Decimal::TWO - self.t_diff);
             }
         }
     }