quant_libs.rs 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. use strategy::quant::Quant;
  2. use std::collections::BTreeMap;
  3. use std::io::Error;
  4. use strategy::{exchange_disguise, quant};
  5. use std::sync::Arc;
  6. use std::sync::atomic::{AtomicBool};
  7. use std::time::Duration;
  8. use chrono::Utc;
  9. use tokio::sync::{mpsc, Mutex};
  10. use tracing::{error, info};
  11. use global::cci::CentralControlInfo;
  12. use global::params::Params;
  13. use global::trace_stack::TraceStack;
  14. use standard::Order;
  15. use strategy::model::OrderInfo;
  16. pub async fn init(params: Params,
  17. ws_running: Arc<AtomicBool>,
  18. running: Arc<AtomicBool>,
  19. cci_arc: Arc<Mutex<CentralControlInfo>>) -> Arc<Mutex<Quant>> {
  20. // 封装
  21. let mut exchange_params:BTreeMap<String, String> = BTreeMap::new();
  22. exchange_params.insert("access_key".to_string(), params.access_key.clone());
  23. exchange_params.insert("secret_key".to_string(), params.secret_key.clone());
  24. exchange_params.insert("pass_key".to_string(), params.pass_key.clone());
  25. let (order_sender, mut order_receiver) = mpsc::channel::<Order>(100);
  26. let (error_sender, mut error_receiver) = mpsc::channel::<Error>(100);
  27. let mut quant_obj = Quant::new(params.exchange.clone(),
  28. params.clone(),
  29. exchange_params.clone(),
  30. order_sender.clone(),
  31. error_sender.clone(),
  32. running.clone(),
  33. cci_arc.clone()).await;
  34. let ref_name = quant_obj.ref_name[0].clone();
  35. let trade_name = quant_obj.trade_name.clone();
  36. info!("quant初始化……");
  37. quant_obj.before_trade().await;
  38. let quant_arc = Arc::new(Mutex::new(quant_obj));
  39. // 参考交易所
  40. exchange_disguise::run_reference_exchange(ws_running.clone(),
  41. params.ref_exchange.get(0).unwrap().clone(),
  42. quant_arc.clone(),
  43. ref_name,
  44. params.ref_pair.clone(),
  45. params.colo != 0i8,
  46. exchange_params.clone()).await;
  47. // 交易交易所
  48. exchange_disguise::run_transactional_exchange(ws_running.clone(),
  49. params.exchange,
  50. quant_arc.clone(),
  51. trade_name,
  52. vec![params.pair.clone()],
  53. params.colo != 0i8,
  54. exchange_params.clone()).await;
  55. // 启动定期触发的系统逻辑
  56. quant::on_timer(quant_arc.clone());
  57. // 启动策略逻辑
  58. quant::run_strategy(quant_arc.clone());
  59. info!("quant初始化完成。");
  60. let order_handler_quant_arc = quant_arc.clone();
  61. tokio::spawn(async move {
  62. loop {
  63. tokio::time::sleep(Duration::from_millis(1)).await;
  64. match order_receiver.recv().await {
  65. Some(order) => {
  66. // 刚下的订单有可能会成交,所以有几率触发后续订单逻辑,所以这里也是一个订单触发逻辑
  67. let mut trace_stack = TraceStack::default();
  68. trace_stack.on_after_network(Utc::now().timestamp_micros());
  69. trace_stack.on_before_quant();
  70. if order.status != "NULL" {
  71. trace_stack.on_before_format();
  72. let mut quant = order_handler_quant_arc.lock().await;
  73. // let mut delay_time_lock_instance = delay_time_lock.lock().await;
  74. let order_info = OrderInfo {
  75. symbol: "".to_string(),
  76. amount: order.amount.abs(),
  77. side: "".to_string(),
  78. price: order.price,
  79. client_id: order.custom_id,
  80. filled_price: order.avg_price,
  81. filled: order.deal_amount.abs(),
  82. order_id: order.id,
  83. local_time: 0,
  84. create_time: 0,
  85. status: order.status,
  86. fee: Default::default(),
  87. trace_stack: order.trace_stack.clone(),
  88. };
  89. trace_stack.on_after_format();
  90. quant.update_local_order(order_info.clone(), trace_stack);
  91. }
  92. },
  93. None => {
  94. error!("Order channel has been closed!");
  95. }
  96. }
  97. }
  98. });
  99. let _error_handler_quant_arc = quant_arc.clone();
  100. tokio::spawn(async move {
  101. loop {
  102. tokio::time::sleep(Duration::from_millis(1)).await;
  103. match error_receiver.recv().await {
  104. Some(_error) => {
  105. // let mut quant = _error_handler_quant_arc.lock().await;
  106. // error!("main: 订单出现错误{:?}", _error);
  107. // quant.strategy._print_summary();
  108. },
  109. None => {
  110. error!("Error channel has been closed!");
  111. }
  112. }
  113. }
  114. });
  115. return quant_arc;
  116. }