// framework_3_0_test.rs (1.9 KB)
  1. use std::future::Future;
  2. use std::time::{Duration, Instant};
  3. use exchanges::response_base::ResponseData;
  4. use std::sync::Arc;
  5. use tokio::sync::Mutex;
  6. use tracing::info;
  7. use global::log_utils::init_log_with_info;
  /// Shared state that records the largest handling delay observed so far.
  struct Core {
      /// Largest delay seen, in nanoseconds.
      pub max_delay: u128,
  }
  11. #[tokio::test]
  12. async fn framework_3_0() {
  13. init_log_with_info();
  14. let core = Arc::new(Mutex::new(Core { max_delay: 0 }));
  15. let handler_am = Arc::new(Mutex::new(TestHandler { core_am: core }));
  16. // 一个闭包解决了引用问题。
  17. generator(|data| {
  18. let handler_c = handler_am.clone();
  19. async move {
  20. let mut handler = handler_c.lock().await;
  21. handler.on_data(data).await
  22. }
  23. }).await;
  24. }
  25. // 消息创造者
  26. async fn generator<F, Fut>(handle_function: F)
  27. where
  28. F: Fn(ResponseData) -> Fut,
  29. Fut: Future<Output = ()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
  30. {
  31. let data = ResponseData::new("aaa".to_string(),
  32. "code".to_string(),
  33. "msg".to_string(),
  34. "data".to_string());
  35. // let mut c = 0;
  36. loop {
  37. let mut s_data = data.clone();
  38. s_data.time = chrono::Utc::now().timestamp_micros();
  39. s_data.res_time = Instant::now();
  40. handle_function(s_data).await;
  41. tokio::time::sleep(Duration::from_micros(10)).await;
  42. }
  43. }
  44. struct TestHandler {
  45. pub core_am: Arc<Mutex<Core>>
  46. }
  47. impl TestHandler {
  48. // 消息处理者
  49. pub async fn on_data(&mut self, data: ResponseData) {
  50. let mut core = self.core_am.lock().await;
  51. let delay_nanos = data.res_time.elapsed().as_nanos();
  52. if delay_nanos > core.max_delay {
  53. core.max_delay = delay_nanos;
  54. }
  55. // 处理响应数据
  56. info!("delay nanos: {}, max delay is: {}", delay_nanos, core.max_delay);
  57. }
  58. }