Forráskód Böngészése

订阅更新,新增:INIT状态,用于等待数据初始化完成

skyfffire 2 hete
szülő
commit
9ede546f44

+ 2 - 4
src/data_manager.rs

@@ -52,8 +52,6 @@ impl DataManager {
     pub async fn dispatch_message(&mut self, response: &Response) -> Result<()> {
         // 1. 预解析为通用的 Value
         let v = response.data.clone();
-        
-        // info!("准备分发的消息:{}, {}", serde_json::to_string_pretty(&v)?, response.label);
 
         // 2. 获取 topic_info 字段用于路由消息,在该策略中extended可以用label
         let topic_info = &response.label;
@@ -63,9 +61,9 @@ impl DataManager {
             self.process_best_prices(&v).await?;            
         } else if topic_info.contains("spot@public.aggre.depth.v3.api.pb") {
             
-        } else {
+        } else {            
             // 如果是未知的 topic,返回一个错误
-            bail!("Received a message with an unknown topic_info: {}", topic_info);
+            bail!("Received a message with an unknown topic_info: {}, value: \n {}", topic_info, serde_json::to_string_pretty(&v)?);
         }
 
         Ok(())

+ 1 - 1
src/exchange/extended_rest_client.rs

@@ -431,7 +431,7 @@ impl ExtendedRestClient {
             headers,
         ).await;
 
-        let time_array = chrono::Utc::now().timestamp_millis() - start_time;
+        let time_array = Utc::now().timestamp_millis() - start_time;
         self.delays.push(time_array);
         self.get_delay_info();
 

+ 15 - 8
src/exchange/extended_stream_client.rs

@@ -60,6 +60,10 @@ impl ExtendedStreamClient {
         Self::new(label, account_option, format!("orderbooks/{}?depth=1", symbol), is_testnet)
     }
 
+    pub fn account(label: &str, account_option: Option<ExtendedAccount>, is_testnet: bool) -> ExtendedStreamClient {
+        Self::new(label, account_option, "account".to_string(), is_testnet)
+    }
+
     // 链接
     pub async fn ws_connect_async<F, Future>(&mut self,
                                              is_shutdown_arc: Arc<AtomicBool>,
@@ -81,10 +85,6 @@ impl ExtendedStreamClient {
             StreamUtils::ping_pong(write_tx_clone1, HeartbeatType::Custom(ping_obj.to_string()), heartbeat_time).await;
         });
 
-        if self.account_option.is_some() {
-            // 登录相关
-        }
-
         // 提取host
         let parsed_uri: http::Uri = address_url.parse()?;
         let host_domain = parsed_uri.host().ok_or("URI 缺少主机名").unwrap().to_string();
@@ -100,13 +100,16 @@ impl ExtendedStreamClient {
             host_domain.to_string() // 没有端口或使用默认端口
         };
 
+        // 提前克隆需要的数据
+        let api_key_option = self.account_option.as_ref().map(|account| account.api_key.clone());
+
         // 链接
         let t2 = tokio::spawn(async move {
             let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
 
             loop {
                 // 通过构建request的方式进行ws链接,可以携带header
-                let request = Request::builder()
+                let mut request_builder = Request::builder()
                     .method("GET")
                     .uri(&address_url)
                     .header("Sec-WebSocket-Key", generate_key())
@@ -114,9 +117,13 @@ impl ExtendedStreamClient {
                     .header("Host", host_header_value.clone())
                     .header("User-Agent", "RustClient/1.0")
                     .header("Upgrade", "websocket")
-                    .header("Connection", "Upgrade")
-                    .body(())
-                    .unwrap();
+                    .header("Connection", "Upgrade");
+
+                if let Some(ref api_key) = api_key_option {
+                    request_builder = request_builder.header("X-Api-Key", api_key.clone());
+                }
+
+                let request = request_builder.body(()).unwrap();
 
                 trace!("Extended_usdt_swap socket 连接中……");
                 StreamUtils::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), request,

+ 3 - 2
src/main.rs

@@ -105,7 +105,8 @@ pub async fn run_extended_subscriptions(running: Arc<AtomicBool>) -> Result<()>
 
     // 订阅数据的客户端
     let stream_client_list = vec![
-        ExtendedStreamClient::best_prices(format!("ExtendedBestPrices_{}", market).as_str(), None, market, is_testnet)
+        ExtendedStreamClient::best_prices(format!("ExtendedBestPrices_{}", market).as_str(), None, market, is_testnet),
+        ExtendedStreamClient::account("ExtendedAccount", Some(account.clone()), is_testnet),
     ];
     
     // rest客户端
@@ -136,7 +137,7 @@ pub async fn run_extended_subscriptions(running: Arc<AtomicBool>) -> Result<()>
             let sm_clone = Arc::clone(&sm);
             async move {
                 // 数据不新鲜直接跳过
-                if response.reach_time - response.received_time > 500 {
+                if response.reach_time - response.received_time > 100 {
                     return
                 }
                 

+ 19 - 1
src/strategy.rs

@@ -14,6 +14,7 @@ use crate::utils::response::Response;
 
 #[derive(Debug, Clone, PartialEq)]
 pub enum StrategyState {
+    Init,                           // 初始化状态,等待收集行情数据
     Idle,                           // 空闲状态,准备下单
     WaitingLimitOrder {             // 等待限价单成交
         order_id: String,
@@ -47,7 +48,7 @@ pub struct Strategy {
 impl Strategy {
     pub fn new(client_am: Arc<Mutex<ExtendedRestClient>>) -> Strategy {
         Strategy {
-            state: StrategyState::Idle,
+            state: StrategyState::Init,
             order_quantity: dec!(0.0001),
             filled_quantity: Decimal::ZERO,
 
@@ -63,6 +64,11 @@ impl Strategy {
 
     pub async fn do_strategy(&mut self, dm: &DataManager) -> Result<()> {
         match self.state.clone() {
+            StrategyState::Init => {
+                if let Err(e) = self.handle_init_state(dm).await {
+                    warn!("行情数据初始化失败: {}", e);
+                }
+            }
             StrategyState::Idle => {
                 if let Err(e) = self.handle_idle_state(dm).await {
                     warn!("空闲状态处理失败: {}", e);
@@ -93,6 +99,18 @@ impl Strategy {
         Ok(())
     }
 
+    // 状态0: 等待行情数据准备完毕
+    async fn handle_init_state(&mut self, dm: &DataManager) -> Result<()> {
+        // 最佳价格检测
+        if dm.best_bid.is_zero() || dm.best_ask.is_zero() {
+            return Ok(());
+        }
+        
+        // 到这里了就准备完毕了
+        self.state = StrategyState::Idle;
+        Ok(())
+    }
+
     // 状态1: 空闲状态,下限价买单
     async fn handle_idle_state(&mut self, dm: &DataManager) -> Result<()> {
         info!("============================ 进入空闲状态,准备下买单 ===========================");