framework_3_0_test.rs 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. use std::future::Future;
  2. use std::time::{Duration, Instant};
  3. use exchanges::response_base::ResponseData;
  4. use std::sync::Arc;
  5. use tokio::sync::Mutex;
  6. use tracing::info;
  7. use global::log_utils::init_log_with_info;
  8. // {"clientOid": "1453351218bitget", "force": "gtc", "marginCoin": "USDT", "marginMode": "crossed", "orderType": "limit","price": "0.61810", "productType": "USDT-FUTURES", "reduceOnly": "NO", "side": "sell", "size": "17","symbol": "MOODENGUSDT"}
  9. // {"clientOid": "1439414436bitget", "force": "gtc", "marginCoin": "USDT", "marginMode": "crossed", "orderType": "limit","price": "0.61253", "productType": "USDT-FUTURES", "reduceOnly": "NO", "side": "buy", "size": "17","symbol": "MOODENGUSDT"}
  10. struct Core {
  11. pub max_delay: u128,
  12. }
  13. #[tokio::test]
  14. async fn framework_3_0() {
  15. init_log_with_info();
  16. let core = Arc::new(Mutex::new(Core { max_delay: 0 }));
  17. let handler_am = Arc::new(Mutex::new(TestHandler { core_am: core }));
  18. // 一个闭包解决了引用问题。
  19. generator(|data| {
  20. let handler_c = handler_am.clone();
  21. async move {
  22. let mut handler = handler_c.lock().await;
  23. handler.on_data(data).await
  24. }
  25. }).await;
  26. }
  27. // 消息创造者
  28. async fn generator<F, Fut>(handle_function: F)
  29. where
  30. F: Fn(ResponseData) -> Fut,
  31. Fut: Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
  32. {
  33. let data = ResponseData::new("aaa".to_string(),
  34. "code".to_string(),
  35. "msg".to_string(),
  36. "data".to_string());
  37. // let mut c = 0;
  38. loop {
  39. let mut s_data = data.clone();
  40. s_data.time = chrono::Utc::now().timestamp_micros();
  41. s_data.res_time = Instant::now();
  42. handle_function(s_data).await;
  43. tokio::time::sleep(Duration::from_micros(10)).await;
  44. }
  45. }
  46. struct TestHandler {
  47. pub core_am: Arc<Mutex<Core>>,
  48. }
  49. impl TestHandler {
  50. // 消息处理者
  51. pub async fn on_data(&mut self, data: ResponseData) {
  52. let mut core = self.core_am.lock().await;
  53. let delay_nanos = data.res_time.elapsed().as_nanos();
  54. if delay_nanos > core.max_delay {
  55. core.max_delay = delay_nanos;
  56. }
  57. // 处理响应数据
  58. info!("delay nanos: {}, max delay is: {}", delay_nanos, core.max_delay);
  59. }
  60. }