Explorar o código

数据到了策略层了。

skyfffire hai 2 semanas
pai
achega
487c0e0b51
Modificáronse 4 ficheiros con 23 adicións e 7 borrados
  1. 1 2
      src/data_manager.rs
  2. 14 1
      src/main.rs
  3. 7 3
      src/strategy.rs
  4. 1 1
      src/utils/log_setup.rs

+ 1 - 2
src/data_manager.rs

@@ -1,4 +1,3 @@
-use std::collections::{HashMap};
 use std::str::FromStr;
 use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
 use anyhow::{anyhow, bail, Result};
@@ -49,7 +48,7 @@ impl DataManager {
     }
     
     pub async fn dispatch_message(&mut self, response: &Response) -> Result<()> {
-        // 预解析为通用的 Value
+        // 1. 预解析为通用的 Value
         let v = response.data.clone();
         
         // info!("准备分发的消息:{}, {}", serde_json::to_string_pretty(&v)?, response.label);

+ 14 - 1
src/main.rs

@@ -3,7 +3,7 @@ mod exchange;
 mod strategy;
 mod data_manager;
 
-use anyhow::{anyhow, Result};
+use anyhow::{Result};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use backtrace::Backtrace;
@@ -14,6 +14,7 @@ use tracing::{error, info, warn};
 use utils::log_setup;
 use crate::data_manager::DataManager;
 use crate::exchange::extended_stream_client::ExtendedStreamClient;
+use crate::strategy::Strategy;
 use crate::utils::response::Response;
 
 #[tokio::main]
@@ -102,18 +103,24 @@ pub async fn run_extended_subscriptions(running: Arc<AtomicBool>) -> Result<()>
     let data_manager = DataManager::new();
     let data_manager_am = Arc::new(Mutex::new(data_manager));
 
+    // 策略执行
+    let strategy = Strategy::new();
+    let strategy_am = Arc::new(Mutex::new(strategy));
+
     // 异步去订阅、并阻塞
     for mut stream_client in stream_client_list {
         let running_clone = Arc::clone(&running);
 
         // 定义需要处理数据的fun
         let dm = data_manager_am.clone();
+        let sm = strategy_am.clone();
         let fun = move |response: Response| {
             if response.code != 200 {
                 error!("出现错误代码:{}", serde_json::to_string_pretty(&response.data).unwrap());
             }
 
             let dm_clone = Arc::clone(&dm);
+            let sm_clone = Arc::clone(&sm);
             async move {
                 let mut dm_guard = dm_clone.lock().await;
                 
@@ -124,6 +131,12 @@ pub async fn run_extended_subscriptions(running: Arc<AtomicBool>) -> Result<()>
                 if let Err(e) = dm_guard.dispatch_message(&response).await {
                     warn!("消息分发过程中出现错误: {}", e);
                 }
+
+                // 随后执行策略
+                let mut sm_guard = sm_clone.lock().await;
+                if let Err(e) = sm_guard.do_strategy(&dm_guard).await {
+                    warn!("策略执行过程中出现错误: {}", e);
+                }
             }
         };
 

+ 7 - 3
src/strategy.rs

@@ -1,7 +1,9 @@
 use serde_json::Value;
 use tracing::info;
+use anyhow::Result;
+use crate::data_manager::DataManager;
 
-struct Strategy {
+pub struct Strategy {
     
 }
 
@@ -10,7 +12,9 @@ impl Strategy {
         Strategy {}
     }
     
-    pub fn on_message(message: &Value) {
-        info!("{}", message);
+    pub async fn do_strategy(&mut self, data_manager: &DataManager) -> Result<()> {
+        
+
+        Ok(())
     }
 }

+ 1 - 1
src/utils/log_setup.rs

@@ -44,7 +44,7 @@ pub fn setup_logging() -> Result<Vec<WorkerGuard>, Box<dyn std::error::Error>> {
         std::fs::create_dir_all("logs")?;
         println!("Created 'logs' directory for log files.");
     }
-    let file_appender = rolling::daily("logs", "pin_trading_tool.log");
+    let file_appender = rolling::daily("logs", "extended.log");
     // 注意:现在我们将 guard 变量存储起来
     let (non_blocking_file_writer, guard_file) = tracing_appender::non_blocking(file_appender);
     guards.push(guard_file); // 将 guard 添加到列表中