main.rs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. mod server;
  2. use std::collections::BTreeMap;
  3. use std::io::Error;
  4. use std::str::FromStr;
  5. use std::sync::Arc;
  6. use std::time::Duration;
  7. use tokio::sync::{mpsc, Mutex};
  8. use tokio::try_join;
  9. use tracing::{error, info};
  10. use exchanges::gate_swap_rest::GateSwapRest;
  11. use standard::exchange::ExchangeEnum::GateSwap;
  12. use standard::Order;
  13. use strategy::model::OrderInfo;
  14. use strategy::params::Params;
  15. use strategy::{exchange_disguise, quant};
  16. use strategy::quant::Quant;
  17. #[tokio::main]
  18. async fn main() {
  19. // 获取本地配置
  20. let params = Params::new("config.toml").unwrap();
  21. // 日志级别配置
  22. let tracing_log_level = tracing::Level::from_str(params.log_level.as_str()).unwrap();
  23. global::log_utils::final_init(tracing_log_level);
  24. info!(?params);
  25. info!(?tracing_log_level);
  26. let mut exchange_params:BTreeMap<String, String> = BTreeMap::new();
  27. exchange_params.insert("access_key".to_string(), params.access_key.clone());
  28. exchange_params.insert("secret_key".to_string(), params.secret_key.clone());
  29. let (order_sender, mut order_receiver) = mpsc::channel::<Order>(100);
  30. let (error_sender, mut error_receiver) = mpsc::channel::<Error>(100);
  31. let mut quant_obj = Quant::new(GateSwap, params.clone(), exchange_params.clone(), order_sender.clone(), error_sender.clone()).await;
  32. let trade_name = quant_obj.trade_name.clone();
  33. let mut quant_arc = Arc::new(Mutex::new(quant_obj));
  34. info!("quant初始化……");
  35. quant_arc.lock().await.before_trade().await;
  36. let ref_name = quant_arc.lock().await.ref_name[0].clone();
  37. // 参考交易所
  38. exchange_disguise::run_reference_exchange(params.ref_exchange.get(0).unwrap().clone(), quant_arc.clone(), ref_name, params.ref_pair.clone(), exchange_params.clone()).await;
  39. // 交易交易所
  40. exchange_disguise::run_transactional_exchange(params.exchange, quant_arc.clone(), trade_name, vec![params.pair.clone()], exchange_params.clone()).await;
  41. // 启动定期触发的系统逻辑
  42. quant::on_timer(quant_arc.clone());
  43. // 启动策略逻辑
  44. quant::run_strategy(quant_arc.clone());
  45. info!("quant初始化完成。");
  46. let order_handler_quant_arc = quant_arc.clone();
  47. let order_handler_thread = tokio::spawn(async move {
  48. loop {
  49. tokio::time::sleep(Duration::from_millis(1)).await;
  50. match order_receiver.recv().await {
  51. Some(order) => {
  52. {
  53. let mut quant = order_handler_quant_arc.lock().await;
  54. let mut order_info = OrderInfo {
  55. symbol: "".to_string(),
  56. amount: order.amount.abs(),
  57. side: "".to_string(),
  58. price: order.price,
  59. client_id: order.custom_id,
  60. filled_price: order.avg_price,
  61. filled: order.deal_amount.abs(),
  62. order_id: order.id,
  63. local_time: 0,
  64. create_time: 0,
  65. status: order.status,
  66. fee: Default::default(),
  67. };
  68. quant.update_local_order(order_info.clone());
  69. }
  70. },
  71. None => {
  72. error!("Order channel has been closed!");
  73. }
  74. }
  75. }
  76. });
  77. // let error_handler_quant_arc = quant_arc.clone();
  78. let error_handler_thread = tokio::spawn(async move {
  79. loop {
  80. tokio::time::sleep(Duration::from_millis(1)).await;
  81. match error_receiver.recv().await {
  82. Some(error) => {
  83. // let quant = error_handler_quant_arc.lock().await;
  84. error!("main: 订单出现错误{:?}", error);
  85. },
  86. None => {
  87. error!("Error channel has been closed!");
  88. }
  89. }
  90. }
  91. });
  92. let server_thread = tokio::spawn(async move {
  93. // let server = server::run_server(5566, quant_arc);
  94. // info!("中控服务已运行。");
  95. // server.await
  96. });
  97. try_join!(order_handler_thread, error_handler_thread, server_thread).unwrap();
  98. }