| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- use strategy::core::Core;
- use std::collections::BTreeMap;
- use std::io::Error;
- use strategy::{exchange_disguise, core};
- use std::sync::Arc;
- use std::sync::atomic::{AtomicBool};
- use std::time::Duration;
- use chrono::Utc;
- use tokio::sync::{mpsc, Mutex};
- use tokio::time::Instant;
- use tracing::{error, info};
- use global::cci::CentralControlInfo;
- use global::params::Params;
- use global::trace_stack::TraceStack;
- use standard::Order;
- use strategy::model::OrderInfo;
- pub async fn init(params: Params,
- ws_running: Arc<AtomicBool>,
- running: Arc<AtomicBool>,
- cci_arc: Arc<Mutex<CentralControlInfo>>) -> Arc<Mutex<Core>> {
- // 封装
- let mut exchange_params:BTreeMap<String, String> = BTreeMap::new();
- exchange_params.insert("access_key".to_string(), params.access_key.clone());
- exchange_params.insert("secret_key".to_string(), params.secret_key.clone());
- exchange_params.insert("pass_key".to_string(), params.pass_key.clone());
- let (order_sender, mut order_receiver) = mpsc::channel::<Order>(100);
- let (error_sender, mut error_receiver) = mpsc::channel::<Error>(100);
- let mut core_obj = Core::new(params.exchange.clone(),
- params.clone(),
- exchange_params.clone(),
- order_sender.clone(),
- error_sender.clone(),
- running.clone(),
- cci_arc.clone()).await;
- info!("core初始化……");
- core_obj.before_trade().await;
- let core_arc = Arc::new(Mutex::new(core_obj));
- // 参考交易所
- let ref_names = core_arc.lock().await.ref_name.clone();
- for (ref_index, ref_name) in ref_names.iter().enumerate() {
- let ref_exchange = params.ref_exchange.get(ref_index).unwrap();
- let ref_symbol = params.ref_pair.get(ref_index).unwrap();
- exchange_disguise::run_reference_exchange(ws_running.clone(),
- core_arc.clone(),
- ref_index,
- ref_name.clone(),
- ref_exchange.clone(),
- vec![ref_symbol.clone()],
- params.colo != 0i8,
- exchange_params.clone()).await;
- }
- // 交易交易所
- let trade_name = core_arc.lock().await.trade_name.clone();
- exchange_disguise::run_transactional_exchange(ws_running.clone(),
- params.exchange,
- core_arc.clone(),
- trade_name,
- vec![params.pair.clone()],
- params.colo != 0i8,
- exchange_params.clone()).await;
- // 启动定期触发的系统逻辑
- core::on_timer(core_arc.clone());
- // 启动策略逻辑
- core::run_strategy(core_arc.clone());
- info!("core初始化完成。");
- let order_handler_core_arc = core_arc.clone();
- tokio::spawn(async move {
- loop {
- tokio::time::sleep(Duration::from_millis(1)).await;
- match order_receiver.recv().await {
- Some(order) => {
- // 刚下的订单有可能会成交,所以有几率触发后续订单逻辑,所以这里也是一个订单触发逻辑
- let trace_stack = TraceStack::new(Utc::now().timestamp_micros(), Instant::now());
- if order.status != "NULL" {
- // trace_stack.on_before_format();
- let mut core = order_handler_core_arc.lock().await;
- // let mut delay_time_lock_instance = delay_time_lock.lock().await;
- let order_info = OrderInfo {
- symbol: order.order_type,
- amount: order.amount.abs(),
- side: "".to_string(),
- price: order.price,
- client_id: order.custom_id,
- filled_price: order.avg_price,
- filled: order.deal_amount.abs(),
- order_id: order.id,
- local_time: Utc::now().timestamp_millis(),
- create_time: 0,
- status: order.status,
- fee: Default::default(),
- trace_stack: order.trace_stack.clone(),
- };
- // trace_stack.on_after_format();
- core.update_local_order(order_info.clone(), trace_stack).await;
- }
- },
- None => {
- error!("Order channel has been closed!");
- }
- }
- }
- });
- let _error_handler_core_arc = core_arc.clone();
- tokio::spawn(async move {
- loop {
- tokio::time::sleep(Duration::from_millis(1)).await;
- match error_receiver.recv().await {
- Some(_error) => {
- // let mut core = _error_handler_core_arc.lock().await;
- // error!("main: 订单出现错误{:?}", _error);
- // core.strategy._print_summary();
- },
- None => {
- error!("Error channel has been closed!");
- }
- }
- }
- });
- // 定时仓位检测
- // let markt_price_core_arc = core_arc.clone();
- // tokio::spawn(async move {
- // info!("rest仓位检测定时任务启动(5s)...");
- // loop {
- // tokio::time::sleep(Duration::from_secs(5)).await;
- //
- // let mut core = markt_price_core_arc.lock().await;
- // match core.platform_rest.get_positions().await {
- // Ok(pos) => {
- // if pos.len() > 0 {
- // core.update_position(pos).await;
- // }
- // },
- // Err(err) => {
- // error!("rest持仓数据获取异常 {}", err);
- // }
- // };
- // }
- // });
- return core_arc;
- }
|