use std::future::Future; use std::time::{Duration, Instant}; use exchanges::response_base::ResponseData; use std::sync::Arc; use tokio::sync::Mutex; use tracing::info; use global::log_utils::init_log_with_info; // {"clientOid": "1453351218bitget", "force": "gtc", "marginCoin": "USDT", "marginMode": "crossed", "orderType": "limit","price": "0.61810", "productType": "USDT-FUTURES", "reduceOnly": "NO", "side": "sell", "size": "17","symbol": "MOODENGUSDT"} // {"clientOid": "1439414436bitget", "force": "gtc", "marginCoin": "USDT", "marginMode": "crossed", "orderType": "limit","price": "0.61253", "productType": "USDT-FUTURES", "reduceOnly": "NO", "side": "buy", "size": "17","symbol": "MOODENGUSDT"} struct Core { pub max_delay: u128, } #[tokio::test] async fn framework_3_0() { init_log_with_info(); let core = Arc::new(Mutex::new(Core { max_delay: 0 })); let handler_am = Arc::new(Mutex::new(TestHandler { core_am: core })); // 一个闭包解决了引用问题。 generator(|data| { let handler_c = handler_am.clone(); async move { let mut handler = handler_c.lock().await; handler.on_data(data).await } }).await; } // 消息创造者 async fn generator(handle_function: F) where F: Fn(ResponseData) -> Fut, Fut: Future + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 () { let data = ResponseData::new("aaa".to_string(), "code".to_string(), "msg".to_string(), "data".to_string()); // let mut c = 0; loop { let mut s_data = data.clone(); s_data.time = chrono::Utc::now().timestamp_micros(); s_data.res_time = Instant::now(); handle_function(s_data).await; tokio::time::sleep(Duration::from_micros(10)).await; } } struct TestHandler { pub core_am: Arc>, } impl TestHandler { // 消息处理者 pub async fn on_data(&mut self, data: ResponseData) { let mut core = self.core_am.lock().await; let delay_nanos = data.res_time.elapsed().as_nanos(); if delay_nanos > core.max_delay { core.max_delay = delay_nanos; } // 处理响应数据 info!("delay nanos: {}, max delay is: {}", delay_nanos, core.max_delay); } }