core_libs.rs 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. use strategy::core::Core;
  2. use std::collections::BTreeMap;
  3. use std::io::Error;
  4. use strategy::{exchange_disguise, core};
  5. use std::sync::Arc;
  6. use std::sync::atomic::{AtomicBool};
  7. use std::time::Duration;
  8. use chrono::Utc;
  9. use tokio::sync::{mpsc, Mutex};
  10. use tokio::time::Instant;
  11. use tracing::{error, info};
  12. use global::cci::CentralControlInfo;
  13. use global::params::Params;
  14. use global::trace_stack::TraceStack;
  15. use standard::{Order, PriceOrder};
  16. use strategy::model::OrderInfo;
  17. pub async fn init(params: Params,
  18. ws_running: Arc<AtomicBool>,
  19. running: Arc<AtomicBool>,
  20. cci_arc: Arc<Mutex<CentralControlInfo>>) -> Arc<Mutex<Core>> {
  21. // 封装
  22. let mut exchange_params:BTreeMap<String, String> = BTreeMap::new();
  23. exchange_params.insert("access_key".to_string(), params.access_key.clone());
  24. exchange_params.insert("secret_key".to_string(), params.secret_key.clone());
  25. exchange_params.insert("pass_key".to_string(), params.pass_key.clone());
  26. let (order_sender, mut order_receiver) = mpsc::channel::<Order>(100);
  27. let (price_order_sender, mut price_order_receiver) = mpsc::channel::<PriceOrder>(100);
  28. let (error_sender, mut error_receiver) = mpsc::channel::<Error>(100);
  29. let mut core_obj = Core::new(params.exchange.clone(),
  30. params.clone(),
  31. exchange_params.clone(),
  32. order_sender.clone(),
  33. price_order_sender.clone(),
  34. error_sender.clone(),
  35. running.clone(),
  36. cci_arc.clone()).await;
  37. let ref_name = core_obj.ref_name[0].clone();
  38. let trade_name = core_obj.trade_name.clone();
  39. info!("core初始化……");
  40. core_obj.before_trade().await;
  41. let core_arc = Arc::new(Mutex::new(core_obj));
  42. // 参考交易所
  43. exchange_disguise::run_reference_exchange(ws_running.clone(),
  44. params.ref_exchange.get(0).unwrap().clone(),
  45. core_arc.clone(),
  46. ref_name,
  47. params.ref_pair.clone(),
  48. params.colo != 0i8,
  49. exchange_params.clone()).await;
  50. // 交易交易所
  51. exchange_disguise::run_transactional_exchange(ws_running.clone(),
  52. params.exchange,
  53. core_arc.clone(),
  54. trade_name,
  55. vec![params.pair.clone()],
  56. params.colo != 0i8,
  57. exchange_params.clone()).await;
  58. // 启动定期触发的系统逻辑
  59. core::on_timer(core_arc.clone());
  60. // 启动策略逻辑
  61. core::run_strategy(core_arc.clone());
  62. info!("core初始化完成。");
  63. let order_handler_core_arc = core_arc.clone();
  64. tokio::spawn(async move {
  65. loop {
  66. tokio::time::sleep(Duration::from_millis(1)).await;
  67. match order_receiver.recv().await {
  68. Some(order) => {
  69. // 刚下的订单有可能会成交,所以有几率触发后续订单逻辑,所以这里也是一个订单触发逻辑
  70. let trace_stack = TraceStack::new(Utc::now().timestamp_micros(), Instant::now());
  71. if order.status != "NULL" {
  72. // trace_stack.on_before_format();
  73. let mut core = order_handler_core_arc.lock().await;
  74. // let mut delay_time_lock_instance = delay_time_lock.lock().await;
  75. let order_info = OrderInfo {
  76. symbol: "".to_string(),
  77. amount: order.amount.abs(),
  78. side: "".to_string(),
  79. price: order.price,
  80. client_id: order.custom_id,
  81. filled_price: order.avg_price,
  82. filled: order.deal_amount.abs(),
  83. order_id: order.id,
  84. local_time: 0,
  85. create_time: 0,
  86. status: order.status,
  87. fee: Default::default(),
  88. trace_stack: order.trace_stack.clone(),
  89. };
  90. // trace_stack.on_after_format();
  91. core.update_local_order(order_info.clone(), trace_stack).await;
  92. }
  93. },
  94. None => {
  95. error!("Order channel has been closed!");
  96. break
  97. }
  98. }
  99. }
  100. });
  101. let price_order_handler_core_arc = core_arc.clone();
  102. tokio::spawn(async move {
  103. loop {
  104. tokio::time::sleep(Duration::from_millis(1)).await;
  105. match price_order_receiver.recv().await {
  106. Some(price_order) => {
  107. let mut core = price_order_handler_core_arc.lock().await;
  108. core.update_local_price_order(price_order).await;
  109. },
  110. None => {
  111. error!("Price order channel has been closed!");
  112. break
  113. }
  114. }
  115. }
  116. });
  117. let _error_handler_core_arc = core_arc.clone();
  118. tokio::spawn(async move {
  119. loop {
  120. tokio::time::sleep(Duration::from_millis(1)).await;
  121. match error_receiver.recv().await {
  122. Some(_error) => {
  123. // let mut core = _error_handler_core_arc.lock().await;
  124. // error!("main: 订单出现错误{:?}", _error);
  125. // core.strategy._print_summary();
  126. },
  127. None => {
  128. error!("Error channel has been closed!");
  129. break
  130. }
  131. }
  132. }
  133. });
  134. return core_arc;
  135. }