core_libs.rs 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. use strategy::core::Core;
  2. use std::collections::BTreeMap;
  3. use std::io::Error;
  4. use strategy::{exchange_disguise, core};
  5. use std::sync::Arc;
  6. use std::sync::atomic::{AtomicBool};
  7. use std::time::Duration;
  8. use chrono::{Datelike, FixedOffset, Local, TimeZone, Utc};
  9. use tokio::sync::{mpsc, Mutex};
  10. use tokio::time::{sleep_until, Instant};
  11. use tracing::{error, info};
  12. use global::cci::CentralControlInfo;
  13. use global::params::Params;
  14. use global::trace_stack::TraceStack;
  15. use standard::Order;
  16. use strategy::model::OrderInfo;
  17. pub async fn init(params: Params,
  18. ws_running: Arc<AtomicBool>,
  19. running: Arc<AtomicBool>,
  20. cci_arc: Arc<Mutex<CentralControlInfo>>) -> Arc<Mutex<Core>> {
  21. // 封装
  22. let mut exchange_params:BTreeMap<String, String> = BTreeMap::new();
  23. exchange_params.insert("access_key".to_string(), params.access_key.clone());
  24. exchange_params.insert("secret_key".to_string(), params.secret_key.clone());
  25. exchange_params.insert("pass_key".to_string(), params.pass_key.clone());
  26. let (order_sender, mut order_receiver) = mpsc::channel::<Order>(100);
  27. let (error_sender, mut error_receiver) = mpsc::channel::<Error>(100);
  28. let mut core_obj = Core::new(params.exchange.clone(),
  29. params.clone(),
  30. exchange_params.clone(),
  31. order_sender.clone(),
  32. error_sender.clone(),
  33. running.clone(),
  34. cci_arc.clone()).await;
  35. let ref_name = core_obj.ref_name[0].clone();
  36. let trade_name = core_obj.trade_name.clone();
  37. info!("core初始化……");
  38. core_obj.before_trade().await;
  39. let core_arc = Arc::new(Mutex::new(core_obj));
  40. // 参考交易所
  41. exchange_disguise::run_reference_exchange(ws_running.clone(),
  42. params.ref_exchange.get(0).unwrap().clone(),
  43. core_arc.clone(),
  44. ref_name,
  45. params.ref_pair.clone(),
  46. params.colo != 0i8,
  47. exchange_params.clone()).await;
  48. // 交易交易所
  49. exchange_disguise::run_transactional_exchange(ws_running.clone(),
  50. params.exchange,
  51. core_arc.clone(),
  52. trade_name,
  53. vec![params.pair.clone()],
  54. params.colo != 0i8,
  55. exchange_params.clone()).await;
  56. // 启动定期触发的系统逻辑
  57. core::on_timer(core_arc.clone());
  58. // 启动策略逻辑
  59. core::run_strategy(core_arc.clone());
  60. info!("core初始化完成。");
  61. let order_handler_core_arc = core_arc.clone();
  62. tokio::spawn(async move {
  63. loop {
  64. tokio::time::sleep(Duration::from_millis(1)).await;
  65. match order_receiver.recv().await {
  66. Some(order) => {
  67. // 刚下的订单有可能会成交,所以有几率触发后续订单逻辑,所以这里也是一个订单触发逻辑
  68. let trace_stack = TraceStack::new(Utc::now().timestamp_micros(), Instant::now());
  69. if order.status != "NULL" {
  70. // trace_stack.on_before_format();
  71. let mut core = order_handler_core_arc.lock().await;
  72. // let mut delay_time_lock_instance = delay_time_lock.lock().await;
  73. let order_info = OrderInfo {
  74. symbol: "".to_string(),
  75. amount: order.amount.abs(),
  76. side: "".to_string(),
  77. price: order.price,
  78. client_id: order.custom_id,
  79. filled_price: order.avg_price,
  80. filled: order.deal_amount.abs(),
  81. order_id: order.id,
  82. local_time: 0,
  83. create_time: 0,
  84. status: order.status,
  85. fee: Default::default(),
  86. trace_stack: order.trace_stack.clone(),
  87. };
  88. // trace_stack.on_after_format();
  89. core.update_local_order(order_info.clone(), trace_stack).await;
  90. }
  91. },
  92. None => {
  93. error!("Order channel has been closed!");
  94. }
  95. }
  96. }
  97. });
  98. let _error_handler_core_arc = core_arc.clone();
  99. tokio::spawn(async move {
  100. loop {
  101. tokio::time::sleep(Duration::from_millis(1)).await;
  102. match error_receiver.recv().await {
  103. Some(_error) => {
  104. // let mut core = _error_handler_core_arc.lock().await;
  105. // error!("main: 订单出现错误{:?}", _error);
  106. // core.strategy._print_summary();
  107. },
  108. None => {
  109. error!("Error channel has been closed!");
  110. }
  111. }
  112. }
  113. });
  114. // 定时交易量更新
  115. let trade_volume_core_arc = core_arc.clone();
  116. tokio::spawn(async move {
  117. info!("交易量统计定时任务启动(每天早上8:30)...");
  118. loop {
  119. // 定义北京时间偏移(UTC+8)
  120. let beijing_offset = FixedOffset::east_opt(8 * 3600).unwrap();
  121. // 获取当前时间并转换为北京时间
  122. let now = Local::now().with_timezone(&beijing_offset);
  123. // 计算今天的 8:30(北京时间)
  124. let mut target_time = beijing_offset.with_ymd_and_hms(now.year(), now.month(), now.day(), 8, 30, 0).unwrap();
  125. // 如果当前时间已经过了今天的 8:30,则目标时间调整为明天 8:30
  126. if now >= target_time {
  127. target_time = target_time + chrono::Duration::days(1);
  128. }
  129. info!("下一次交易量统计将在 {} 执行", target_time.timestamp());
  130. // 将目标时间换算为时间戳,计算剩余等待时间
  131. let duration_to_wait = (target_time.timestamp() - now.timestamp()) as u64;
  132. let target_instant = Instant::now() + Duration::from_secs(duration_to_wait);
  133. sleep_until(target_instant).await;
  134. // 防止过快循环,等待一分钟后再继续下一次循环
  135. // 更新近1天(昨8点~今8点)的交易量统计
  136. let mut core = trade_volume_core_arc.lock().await;
  137. core.update_trade_volume().await;
  138. // 等待一分钟以避免过快地循环
  139. sleep_until(Instant::now() + Duration::from_secs(5*60)).await;
  140. }
  141. });
  142. // 定时仓位检测
  143. // let markt_price_core_arc = core_arc.clone();
  144. // tokio::spawn(async move {
  145. // info!("rest仓位检测定时任务启动(5s)...");
  146. // loop {
  147. // tokio::time::sleep(Duration::from_secs(5)).await;
  148. //
  149. // let mut core = markt_price_core_arc.lock().await;
  150. // match core.platform_rest.get_positions().await {
  151. // Ok(pos) => {
  152. // if pos.len() > 0 {
  153. // core.update_position(pos).await;
  154. // }
  155. // },
  156. // Err(err) => {
  157. // error!("rest持仓数据获取异常 {}", err);
  158. // }
  159. // };
  160. // }
  161. // });
  162. return core_arc;
  163. }