use tokio::time::Instant; use std::collections::{BTreeMap, HashMap}; use std::io::Error; use std::sync::Arc; use std::sync::atomic::{AtomicBool}; use std::time::Duration; use chrono::{Utc}; use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue}; use rust_decimal::Decimal; use rust_decimal_macros::dec; use tokio::sync::mpsc::{Sender}; use tokio::sync::{Mutex}; use tokio::time::sleep; use tracing::{error, info, warn}; use global::cci::CentralControlInfo; use global::clear_position_result::ClearPositionResult; use global::params::Params; use global::trace_stack::TraceStack; use standard::{Account, Market, Order, Platform, Position, PositionModeEnum, SpecialTicker}; use standard::exchange::{Exchange}; use standard::exchange::ExchangeEnum::{BinanceSwap, BitgetSwap, BybitSwap, CoinexSwap, GateSwap, HtxSwap, KucoinSwap}; use crate::model::{LocalPosition, OrderInfo}; use crate::predictor::Predictor; use crate::strategy::Strategy; use crate::utils; use crate::utils::clip; pub struct ClearCore { pub params: Params, // 启动时间 pub start_time: i64, // 币对 pub symbol: String, // 基础货币 pub base: String, // 报价货币 pub quote: String, // pub strategy: Strategy, // 本地挂单表 pub local_orders: HashMap, // 本地订单缓存队列 pub local_orders_backup: HashMap, // 本地订单缓存cid队列 pub local_orders_backup_cid: Vec, // 本地已处理cid缓存队列 pub handled_orders_cid: Vec, // 本地利润值 pub local_profit: Decimal, // 本地U保证金 pub local_cash: Decimal, // 本地币保证金 pub local_coin: Decimal, // 仓位信息 pub local_position: LocalPosition, // 仓位信息-来自订单 pub local_position_by_orders: LocalPosition, // pub local_buy_amount: Decimal, pub local_sell_amount: Decimal, pub local_buy_value: Decimal, pub local_sell_value: Decimal, pub local_cancel_log: HashMap, pub interval: u64, pub exchange: String, pub exit_msg: String, // 仓位检查结果序列 pub position_check_series: Vec, // 止损大小 pub stop_loss: Decimal, // 资金使用率 pub used_pct: Decimal, // 启停信号 0 表示运行 大于1开始倒计时 1时停机 pub mode_signal: i8, // 交易盘口订单流更新时间 pub trade_order_update_time: i64, // onTick触发时间记录 pub on_tick_event_time: i64, // 盘口ticker信息 pub tickers: HashMap, // 盘口 depth信息 pub depths: HashMap>, // 行情更新延迟监控(风控) pub market_update_time: HashMap, pub market_update_interval: HashMap, pub ref_num: i8, pub ref_name: Vec, pub trade_name: String, pub ready: i8, pub predictor: Predictor, pub market: Market, pub platform_rest: Box, // 市场最优买卖价 pub max_buy_min_sell_cache: HashMap>, // 最近一次的depth信息 pub local_depths: HashMap>, pub is_update: HashMap, pub running: Arc, pub hold_coin: Decimal, // 打印限频 pub prev_log_ready_timestamp: i64, pub log_ready_log_interval: i64, // 中控 pub cci_arc: Arc>, // 中控信息汇集 // 老版的trader_msg留下来的 pub agg_market: Vec, pub ref_price: Vec>, pub predict: Decimal, } impl ClearCore { pub async fn new(exchange: String, params: Params, exchange_params: BTreeMap, order_sender: Sender, error_sender: Sender, running: Arc, cci_arc: Arc>) -> ClearCore { let symbol = params.pair.clone(); let pairs: Vec<&str> = params.pair.split('_').collect(); let mut core_obj = ClearCore { params: params.clone(), start_time: 0, symbol: symbol.clone(), base: pairs[0].to_string(), quote: pairs[1].to_string(), // 现货底仓 hold_coin: clip(params.hold_coin, Decimal::ZERO, Decimal::ONE_HUNDRED * Decimal::ONE_HUNDRED), strategy: Strategy::new(¶ms, true), local_orders: Default::default(), local_orders_backup: Default::default(), local_orders_backup_cid: Default::default(), handled_orders_cid: Default::default(), local_profit: Default::default(), local_cash: Default::default(), local_coin: Default::default(), local_position: LocalPosition { long_pos: Default::default(), short_pos: Default::default(), long_avg: Default::default(), short_avg: Default::default(), }, local_position_by_orders: LocalPosition { long_pos: Default::default(), short_pos: Default::default(), long_avg: Default::default(), short_avg: Default::default(), }, local_buy_amount: Default::default(), local_sell_amount: Default::default(), local_buy_value: Default::default(), local_sell_value: Default::default(), local_cancel_log: Default::default(), interval: params.interval, exchange: params.exchange, exit_msg: "正常退出".to_string(), position_check_series: Default::default(), stop_loss: params.stop_loss, used_pct: dec!(0.95), mode_signal: 0, trade_order_update_time: Utc::now().timestamp_millis(), on_tick_event_time: Utc::now().timestamp_millis(), tickers: Default::default(), depths: Default::default(), market_update_time: Default::default(), market_update_interval: Default::default(), ref_num: params.ref_exchange.len() as i8, ref_name: Default::default(), trade_name: "".to_string(), ready: 0, predictor: Predictor { loop_count: 0, market_info_list: vec![], mid_price_list: vec![], ref_mid_price_per_exchange_per_frame: vec![], ref_exchange_length: 0, data_length_max: 0, alpha: vec![], gamma: Default::default(), avg_spread_list: vec![], }, market: Market { symbol: symbol.clone(), base_asset: "".to_string(), quote_asset: "".to_string(), tick_size: Default::default(), price_precision: Default::default(), amount_precision: Default::default(), min_qty: Default::default(), max_qty: Default::default(), min_notional: Default::default(), max_notional: Default::default(), ct_val: Default::default(), amount_size: Default::default(), }, platform_rest: match exchange.as_str() { "kucoin_usdt_swap" => { Exchange::new(KucoinSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await } "gate_usdt_swap" => { Exchange::new(GateSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await } // "gate_usdt_spot" => { // Exchange::new(GateSpot, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await // } "binance_usdt_swap" => { Exchange::new(BinanceSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await } // "binance_spot" => { // Exchange::new(BinanceSpot, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await // } // "bitget_spot" => { // Exchange::new(BitgetSpot, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await // } "bitget_usdt_swap" => { Exchange::new(BitgetSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await } // "okex_usdt_swap" => { // Exchange::new(OkxSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await // } "bybit_usdt_swap" => { Exchange::new(BybitSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await } "coinex_usdt_swap" => { Exchange::new(CoinexSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await } "htx_usdt_swap" => { Exchange::new(HtxSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await } _ => { error!("203未找到对应的交易所rest枚举!"); panic!("203未找到对应的交易所rest枚举!"); } }, max_buy_min_sell_cache: Default::default(), local_depths: Default::default(), is_update: Default::default(), running, prev_log_ready_timestamp: 0, log_ready_log_interval: 10 * 1000, cci_arc, agg_market: vec![], ref_price: vec![], predict: Default::default(), }; for i in 0..=params.ref_exchange.len() - 1 { // 拼接不会消耗原字符串 let tickers_key: String = format!("{}{}{}{}", params.ref_exchange[i], "@", params.ref_pair[i], "@ref"); let ref_name_element = tickers_key.clone(); let depths_key: String = tickers_key.clone(); let market_update_time_key = tickers_key.clone(); let market_update_interval_key = tickers_key.clone(); let max_buy_min_sell_cache_key = tickers_key.clone(); core_obj.tickers.insert(tickers_key, SpecialTicker::new()); core_obj.ref_name.push(ref_name_element); core_obj.depths.insert(depths_key, Default::default()); core_obj.market_update_time.insert(market_update_time_key, Default::default()); core_obj.market_update_interval.insert(market_update_interval_key, Default::default()); core_obj.max_buy_min_sell_cache.insert(max_buy_min_sell_cache_key, vec![Decimal::ZERO, Decimal::ZERO]); } let name = format!("{}{}{}", core_obj.exchange.clone(), "@", core_obj.symbol); let market_update_time_key = name.clone(); let market_update_interval_key = name.clone(); let tickers_key = name.clone(); let depths_key = name.clone(); let max_buy_min_sell_cache_key = name.clone(); core_obj.trade_name = name; core_obj.market_update_time.insert(market_update_time_key, Default::default()); core_obj.market_update_interval.insert(market_update_interval_key, Default::default()); core_obj.tickers.insert(tickers_key, SpecialTicker::new()); core_obj.depths.insert(depths_key, Default::default()); core_obj.max_buy_min_sell_cache.insert(max_buy_min_sell_cache_key, vec![Decimal::ZERO, Decimal::ZERO]); // broker.newWs let mut price_alpha: Vec = Vec::new(); for ref_pair_str in params.ref_pair { if params.pair.contains("1000") && !ref_pair_str.contains("1000") { price_alpha.push(dec!(1000.0)); } else if !params.pair.contains("1000") && ref_pair_str.contains("1000") { price_alpha.push(dec!(0.001)) } else { price_alpha.push(dec!(1.0)); } } info!("价格系数:{:?}", price_alpha); core_obj.predictor = Predictor::new(core_obj.ref_name.len()) .alpha(price_alpha) .gamma(params.gamma); return core_obj; } pub fn log_ready_status(&mut self, msg: String) { // 隔一会再打印未准备就绪的台词 let now_timestamp = Utc::now().timestamp_millis(); if now_timestamp - self.prev_log_ready_timestamp > self.log_ready_log_interval { self.prev_log_ready_timestamp = now_timestamp; info!("{}", msg); } } // #[instrument(skip(self, data), level="TRACE")] pub async fn update_position(&mut self, data: Vec) { if data.is_empty() { return; } let mut position = LocalPosition::new(); for pos in &data { if pos.position_mode == PositionModeEnum::Long { position.long_pos = pos.amount; position.long_avg = pos.price; } else if pos.position_mode == PositionModeEnum::Short { position.short_pos = pos.amount.abs(); position.short_avg = pos.price; } } // 更新仓位信息 if position != self.local_position { info!("收到新的仓位推送, position: {:?}", data); info!("更新本地仓位:{:?}", position); self.local_position = position; } // 更新中控持仓相关的信息 { let mut pos = self.local_position_by_orders.long_pos - self.local_position_by_orders.short_pos; if !self.exchange.contains("spot") { pos = self.local_position.long_pos - self.local_position.short_pos; } pos.rescale(8); let mut entry_price; if pos.gt(&Decimal::ZERO) { entry_price = self.local_position_by_orders.long_avg; } else { entry_price = self.local_position_by_orders.short_avg; } entry_price.rescale(8); let mut cci = self.cci_arc.lock().await; cci.pos = pos; cci.entry_price = entry_price; } } // #[instrument(skip(self), level="TRACE")] pub async fn get_exchange_info(&mut self) { self.market = self.platform_rest.get_self_market(); info!(?self.market); } // #[instrument(skip(self, data), level="TRACE")] pub async fn update_equity(&mut self, data: Account) { /* 更新保证金信息 合约一直更新 现货只有当出现异常时更新 */ if self.exchange.contains("spot") { return; } self.local_cash = data.balance * self.used_pct; } // #[instrument(skip(self), level="TRACE")] pub async fn update_equity_rest_swap(&mut self) { match self.platform_rest.get_account().await { Ok(account) => { /* 更新保证金信息 合约一直更新 现货只有当出现异常时更新 */ self.local_cash = account.balance * self.used_pct } Err(e) => { info!("获取账户信息错误: {:?}", e); } } } pub async fn update_position_rest_swap(&mut self) { let position = self.platform_rest.get_position().await; match position { Ok(val) => { // info!("bybit_swap:定时获取的仓位信息"); self.update_position(val).await; } Err(err) => { error!("bybit_swap:定时获取仓位信息错误!\nget_position:res_data={:?}", err); } } } // #[instrument(skip(self), level="TRACE")] pub async fn update_equity_rest_spot(&mut self) { match self.platform_rest.get_spot_account().await { Ok(mut val) => { // 如果返回的数组里没有交易货币,则补充交易货币 if !val.iter().any(|a| a.coin.to_uppercase().eq(&self.base.to_uppercase())) { let mut base_coin_account = Account::new(); base_coin_account.coin = self.base.to_uppercase(); val.push(base_coin_account); } for account in val { // 交易货币 if self.base.to_uppercase() == account.coin { self.local_coin = account.balance; } // 本位货币 if self.quote.to_uppercase() == account.coin { self.local_cash = account.balance; } } } Err(err) => { error!("获取仓位信息异常: {}", err); } } } // #[instrument(skip(self, target_hold_coin), level="TRACE")] pub async fn check_position(&mut self) -> ClearPositionResult { let mut result = ClearPositionResult::new(); info!("------------------------------------------------------------------------------------------------------------"); info!("步骤一:检查挂单:"); match self.platform_rest.cancel_orders_all().await { Ok(val) => { let length = val.len(); result.clear_order_num = length.to_string(); info!("已清空所有挂单({}条)", length); result.clear_order_str = format!("清空所有挂单:{:?}", val); for o in val { info!(" {:?}", o); } } Err(err) => { warn!("取消所有订单异常({}),启动备用方法。", err); match self.platform_rest.cancel_orders().await { Ok(val) => { let length = val.len(); result.clear_order_num = length.to_string(); result.clear_order_str = format!("清空所有挂单(备用):{:?}", val); info!("清空所有挂单({}条):{:?}", length, val); } Err(exc) => { result.clear_order_str = exc.to_string(); result.clear_other_err = true; error!("清空当前币对订单异常: {}", exc); } } } } info!("挂单检查完毕。"); info!(""); info!("步骤二:检查仓位:"); match self.platform_rest.get_positions().await { Ok(val) => { info!("检查仓位信息"); result.clear_position_num = val.len().to_string(); for position in val { if position.amount.eq(&Decimal::ZERO) { continue; } info!(" 仓位:{:?}", position); let price = Decimal::ZERO; let side; info!(?position); match position.position_mode { PositionModeEnum::Long => { // pd side = "pd"; } PositionModeEnum::Short => { // pk side = "pk"; } _ => { error!(" 仓位position_mode匹配失败,不做操作!"); // 执行完当前币对 结束循环 continue; } } // 发起清仓订单 let mut ts = TraceStack::new(0, Instant::now()); ts.on_before_send(); // 市价单 match self.platform_rest.take_order_symbol(position.symbol.clone(), Decimal::ONE, utils::generate_client_id(None).as_str(), side, price, position.amount.abs()).await { Ok(order) => { ts.on_after_send(); info!(" {}仓位清除市价下单成功 {:?}, {}", position.symbol.clone(), order, ts.to_string()); result.clear_position_str = format!("{} >仓位信息:{:?} 下单信息: {:?}",result.clear_position_str, position, order); // 执行完当前币对 结束循环 continue; } Err(error) => { // ts.on_after_send(); error!(" {}仓位清除市价下单异常 {}, {}", position.symbol.clone(), error, ts.to_string()); result.clear_other_str = format!("{} >仓位信息:{:?} 下单异常信息: {:?}",result.clear_other_str, position, error); // 执行完当前币对 结束循环 continue; } }; } } Err(error) => { result.clear_other_err = true; result.clear_position_str = format!("获取仓位异常 {}", error); error!("获取仓位信息异常: {}", error); } } info!("------------------------------------------------------------------------------------------------------------"); info!(""); return result; } // #[instrument(skip(self), level="TRACE")] pub async fn exit(&mut self, r_id: String) -> bool { info!("-------------------------启动退出流程({})----------------------------", self.exit_msg); info!(""); let mut result = self.check_position().await; // 设置机器人id result.r_id = r_id; info!("清仓程序结果 {:?}", result); // 判断是否有清仓,是否有异常 if result.clear_position_num != "0" || result.clear_order_num != "0" || result.clear_other_err{ info!("上报了清仓信息!!!"); send_clear_msg_request(&result).await; // 上报清仓日志 } info!("订单、仓位清除完毕,为避免api失效导致遗漏仓位,建议人工复查。"); info!("停机原因:{}。", self.exit_msg); return true; } // #[instrument(skip(self), level="TRACE")] pub async fn before_trade(&mut self) -> bool { sleep(Duration::from_secs(1)).await; // 获取市场信息 self.get_exchange_info().await; // 获取价格信息 let ticker = self.platform_rest.get_ticker().await.expect("获取价格信息异常!"); info!(?ticker); let mp = (ticker.buy + ticker.sell) / Decimal::TWO; // 获取账户信息 if self.exchange.contains("spot") { self.update_equity_rest_spot().await; } else { self.update_equity_rest_swap().await; } // 更新中控账户相关信息 { let mut now_balance = self.local_cash / self.used_pct; now_balance.rescale(4); let mut cci = self.cci_arc.lock().await; cci.now_balance = now_balance; } // 初始资金 let start_cash = self.local_cash.clone(); let start_coin = self.local_coin.clone(); if start_cash.is_zero() && start_coin.is_zero() { self.exit_msg = format!("{}{}{}{}", "初始余额为零 cash: ", start_cash, " coin: ", start_coin); } info!("初始cash: {start_cash} 初始coin: {start_coin}"); // 初始化策略基础信息 if mp <= Decimal::ZERO { self.exit_msg = format!("{}{}", "初始价格获取错误: ", mp); return false; } else { info!("初始价格为 {}", mp); } self.strategy.mp = mp.clone(); self.strategy.start_cash = start_cash.clone(); self.strategy.start_coin = start_coin.clone(); self.strategy.start_equity = start_cash + start_coin * mp; self.strategy.max_equity = self.strategy.start_equity.clone(); self.strategy.equity = self.strategy.start_equity.clone(); self.strategy.total_amount = self.strategy.equity * self.strategy.lever_rate / self.strategy.mp; // 获取数量精度 self.strategy.step_size = self.market.amount_size.clone(); if self.strategy.step_size > Decimal::ONE { self.strategy.step_size = self.strategy.step_size.trunc(); } // 获取价格精度 self.strategy.tick_size = self.market.tick_size.clone(); if self.strategy.tick_size > Decimal::ONE { self.strategy.tick_size = self.strategy.tick_size.trunc(); } if self.strategy.step_size.is_zero() || self.strategy.tick_size.is_zero() { self.exit_msg = format!("{}{}{}{}", "交易精度未正常获取 step_size: ", self.strategy.step_size, " tick_size:", self.strategy.tick_size); return false; } else { info!("数量精度 {}", self.strategy.step_size); info!("价格精度 {}", self.strategy.tick_size); } // 初始化调度器 self.local_cash = start_cash; self.local_coin = start_coin; // 买入平台币 if self.exchange.contains("spot") { // 现货 } // 清空挂单和仓位 return true; } } // 清仓消息上报中控 pub async fn send_clear_msg_request(body_params: &ClearPositionResult) { // 创建客户端 let client = reqwest::Client::new(); // 创建请求头 let mut headers = HeaderMap::new(); headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); headers.insert("report-token", HeaderValue::from_static("r7T$8gBV!f&L@E2+")); headers.insert("auth", HeaderValue::from_static("4L")); let body = serde_json::to_string(&body_params).unwrap(); // 发送 POST 请求 let res = client .post("https://t4lapi.skyfffire.com/api/report/searchPositions") .body(body) .headers(headers) .send() .await; match res { Ok(response) => { let status = response.status(); let response_text = response.text().await.unwrap_or("获取请求的响应文本异常".to_string()); // 检查响应状态并读取响应体 if status.is_success() { info!("清仓结果上报中控,请求成功,响应文本: {}", response_text); } else { println!("清仓结果上报中控,请求失败: 响应异常码 {},响应文本 {}", status, response_text); } }, Err(e) => { error!("清仓结果上报中控,请求发送失败,异常:{}", e) } } }