use strategy::core::Core; use std::collections::BTreeMap; use std::io::Error; use strategy::{exchange_disguise, core}; use std::sync::Arc; use std::sync::atomic::{AtomicBool}; use std::time::Duration; use chrono::{Datelike, FixedOffset, Local, TimeZone, Utc}; use tokio::sync::{mpsc, Mutex}; use tokio::time::{sleep_until, Instant}; use tracing::{error, info}; use global::cci::CentralControlInfo; use global::params::Params; use global::trace_stack::TraceStack; use standard::Order; use strategy::model::OrderInfo; pub async fn init(params: Params, ws_running: Arc, running: Arc, cci_arc: Arc>) -> Arc> { // 封装 let mut exchange_params:BTreeMap = BTreeMap::new(); exchange_params.insert("access_key".to_string(), params.access_key.clone()); exchange_params.insert("secret_key".to_string(), params.secret_key.clone()); exchange_params.insert("pass_key".to_string(), params.pass_key.clone()); let (order_sender, mut order_receiver) = mpsc::channel::(100); let (error_sender, mut error_receiver) = mpsc::channel::(100); let mut core_obj = Core::new(params.exchange.clone(), params.clone(), exchange_params.clone(), order_sender.clone(), error_sender.clone(), running.clone(), cci_arc.clone()).await; let ref_name = core_obj.ref_name[0].clone(); let trade_name = core_obj.trade_name.clone(); info!("core初始化……"); core_obj.before_trade().await; let core_arc = Arc::new(Mutex::new(core_obj)); // 参考交易所 exchange_disguise::run_reference_exchange(ws_running.clone(), params.ref_exchange.get(0).unwrap().clone(), core_arc.clone(), ref_name, params.ref_pair.clone(), params.colo != 0i8, exchange_params.clone()).await; // 交易交易所 exchange_disguise::run_transactional_exchange(ws_running.clone(), params.exchange, core_arc.clone(), trade_name, vec![params.pair.clone()], params.colo != 0i8, exchange_params.clone()).await; // 启动定期触发的系统逻辑 core::on_timer(core_arc.clone()); // 启动策略逻辑 core::run_strategy(core_arc.clone()); info!("core初始化完成。"); let order_handler_core_arc = core_arc.clone(); tokio::spawn(async move { loop { tokio::time::sleep(Duration::from_millis(1)).await; match order_receiver.recv().await { Some(order) => { // 刚下的订单有可能会成交,所以有几率触发后续订单逻辑,所以这里也是一个订单触发逻辑 let trace_stack = TraceStack::new(Utc::now().timestamp_micros(), Instant::now()); if order.status != "NULL" { // trace_stack.on_before_format(); let mut core = order_handler_core_arc.lock().await; // let mut delay_time_lock_instance = delay_time_lock.lock().await; let order_info = OrderInfo { symbol: "".to_string(), amount: order.amount.abs(), side: "".to_string(), price: order.price, client_id: order.custom_id, filled_price: order.avg_price, filled: order.deal_amount.abs(), order_id: order.id, local_time: 0, create_time: 0, status: order.status, fee: Default::default(), trace_stack: order.trace_stack.clone(), }; // trace_stack.on_after_format(); core.update_local_order(order_info.clone(), trace_stack).await; } }, None => { error!("Order channel has been closed!"); } } } }); let _error_handler_core_arc = core_arc.clone(); tokio::spawn(async move { loop { tokio::time::sleep(Duration::from_millis(1)).await; match error_receiver.recv().await { Some(_error) => { // let mut core = _error_handler_core_arc.lock().await; // error!("main: 订单出现错误{:?}", _error); // core.strategy._print_summary(); }, None => { error!("Error channel has been closed!"); } } } }); // 定时交易量更新 let trade_volume_core_arc = core_arc.clone(); tokio::spawn(async move { info!("交易量统计定时任务启动(每天早上8:30)..."); loop { // 定义北京时间偏移(UTC+8) let beijing_offset = FixedOffset::east_opt(8 * 3600).unwrap(); // 获取当前时间并转换为北京时间 let now = Local::now().with_timezone(&beijing_offset); // 计算今天的 8:30(北京时间) let mut target_time = beijing_offset.with_ymd_and_hms(now.year(), now.month(), now.day(), 8, 30, 0).unwrap(); // 如果当前时间已经过了今天的 8:30,则目标时间调整为明天 8:30 if now >= target_time { target_time = target_time + chrono::Duration::days(1); } info!("下一次交易量统计将在 {} 执行", target_time.timestamp()); // 将目标时间换算为时间戳,计算剩余等待时间 let duration_to_wait = (target_time.timestamp() - now.timestamp()) as u64; let target_instant = Instant::now() + Duration::from_secs(duration_to_wait); sleep_until(target_instant).await; // 防止过快循环,等待一分钟后再继续下一次循环 // 更新近1天(昨8点~今8点)的交易量统计 let mut core = trade_volume_core_arc.lock().await; core.update_trade_volume().await; // 等待一分钟以避免过快地循环 sleep_until(Instant::now() + Duration::from_secs(5*60)).await; } }); // 定时仓位检测 // let markt_price_core_arc = core_arc.clone(); // tokio::spawn(async move { // info!("rest仓位检测定时任务启动(5s)..."); // loop { // tokio::time::sleep(Duration::from_secs(5)).await; // // let mut core = markt_price_core_arc.lock().await; // match core.platform_rest.get_positions().await { // Ok(pos) => { // if pos.len() > 0 { // core.update_position(pos).await; // } // }, // Err(err) => { // error!("rest持仓数据获取异常 {}", err); // } // }; // } // }); return core_arc; }