quant.rs 61 KB


  1. use std::cmp::max;
  2. use std::collections::{BTreeMap, HashMap};
  3. use std::io::Error;
  4. use std::str::FromStr;
  5. use std::sync::{Arc};
  6. use std::sync::atomic::{AtomicBool, Ordering};
  7. use std::time::Duration;
  8. use chrono::{Utc};
  9. use rust_decimal::Decimal;
  10. use rust_decimal::prelude::{ToPrimitive};
  11. use rust_decimal_macros::dec;
  12. use tokio::spawn;
  13. use tokio::sync::mpsc::{Receiver, Sender};
  14. use tokio::sync::{Mutex};
  15. use tokio::task::JoinHandle;
  16. use tokio::time::sleep;
  17. use tracing::{debug, error, info, warn};
  18. use global::params::Params;
  19. use global::public_params::{ASK_PRICE_INDEX, BID_PRICE_INDEX, LENGTH};
  20. use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker};
  21. use standard::exchange::{Exchange};
  22. use standard::exchange::ExchangeEnum::{BinanceSpot, BinanceSwap, GateSpot, GateSwap, KucoinSwap};
  23. use crate::model::{LocalPosition, OrderInfo, TraderMsg};
  24. use crate::predictor::Predictor;
  25. use crate::strategy::Strategy;
  26. pub struct Quant {
  27. pub params: Params,
  28. // 启动时间
  29. pub start_time: i64,
  30. // 币对
  31. pub symbol: String,
  32. // 基础货币
  33. pub base: String,
  34. // 报价货币
  35. pub quote: String,
  36. //
  37. pub strategy: Strategy,
  38. // 本地挂单表
  39. pub local_orders: HashMap<String, OrderInfo>,
  40. // 本地订单缓存队列
  41. pub local_orders_backup: HashMap<String, OrderInfo>,
  42. // 本地订单缓存cid队列
  43. pub local_orders_backup_cid: Vec<String>,
  44. // 本地已处理cid缓存队列
  45. pub handled_orders_cid: Vec<String>,
  46. // 本地利润值
  47. pub local_profit: Decimal,
  48. // 本地U保证金
  49. pub local_cash: Decimal,
  50. // 本地币保证金
  51. pub local_coin: Decimal,
  52. // 仓位信息
  53. pub local_position: LocalPosition,
  54. // 仓位信息-自订单
  55. pub local_position_by_orders: LocalPosition,
  56. //
  57. pub local_buy_amount: Decimal,
  58. pub local_sell_amount: Decimal,
  59. pub local_buy_value: Decimal,
  60. pub local_sell_value: Decimal,
  61. pub local_cancel_log: HashMap<String, i64>,
  62. pub interval: u64,
  63. pub exchange: String,
  64. pub trade_msg: TraderMsg,
  65. pub exit_msg: String,
  66. // 仓位检查结果序列
  67. pub position_check_series: Vec<i8>,
  68. // 止损大小
  69. pub stop_loss: Decimal,
  70. // 资金使用率
  71. pub used_pct: Decimal,
  72. // 启停信号 0 表示运行 大于1开始倒计时 1时停机
  73. pub mode_signal: i8,
  74. // 交易盘口订单流更新时间
  75. pub trade_order_update_time: i64,
  76. // onTick触发时间记录
  77. pub on_tick_event_time: i64,
  78. // 盘口ticker信息
  79. pub tickers: HashMap<String, SpecialTicker>,
  80. // 盘口 depth信息
  81. pub depths: HashMap<String, Vec<Decimal>>,
  82. // 行情更新延迟监控(风控)
  83. pub market_update_time: HashMap<String, i64>,
  84. pub market_update_interval: HashMap<String, Decimal>,
  85. pub ref_num: i8,
  86. pub ref_name: Vec<String>,
  87. pub trade_name: String,
  88. pub ready: i8,
  89. pub predictor: Predictor,
  90. pub market: Market,
  91. pub platform_rest:Box<dyn Platform+Send+Sync>,
  92. // 市场最优买卖价
  93. pub max_buy_min_sell_cache: HashMap<String, Vec<Decimal>>,
  94. // 最近一次的depth信息
  95. pub local_depths: HashMap<String, Vec<Decimal>>,
  96. pub is_update: HashMap<String, bool>,
  97. pub running: Arc<AtomicBool>,
  98. }
  99. impl Quant {
  100. pub async fn new(exchange: String, params: Params, exchange_params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>, running: Arc<AtomicBool>) -> Quant {
  101. let symbol = params.pair.clone();
  102. let pairs: Vec<&str> = params.pair.split('_').collect();
  103. let mut quant_obj = Quant {
  104. params: params.clone(),
  105. start_time: 0,
  106. symbol: symbol.clone(),
  107. base: pairs[0].to_string(),
  108. quote: pairs[1].to_string(),
  109. strategy: Strategy::new(&params, true),
  110. local_orders: Default::default(),
  111. local_orders_backup: Default::default(),
  112. local_orders_backup_cid: Default::default(),
  113. handled_orders_cid: Default::default(),
  114. local_profit: Default::default(),
  115. local_cash: Default::default(),
  116. local_coin: Default::default(),
  117. local_position: LocalPosition {
  118. long_pos: Default::default(),
  119. short_pos: Default::default(),
  120. long_avg: Default::default(),
  121. short_avg: Default::default(),
  122. },
  123. local_position_by_orders: LocalPosition {
  124. long_pos: Default::default(),
  125. short_pos: Default::default(),
  126. long_avg: Default::default(),
  127. short_avg: Default::default(),
  128. },
  129. local_buy_amount: Default::default(),
  130. local_sell_amount: Default::default(),
  131. local_buy_value: Default::default(),
  132. local_sell_value: Default::default(),
  133. local_cancel_log: Default::default(),
  134. interval: params.interval,
  135. exchange: params.exchange,
  136. trade_msg: TraderMsg::new(),
  137. exit_msg: "正常退出".to_string(),
  138. position_check_series: Default::default(),
  139. stop_loss: params.stop_loss,
  140. used_pct: params.used_pct,
  141. mode_signal: 0,
  142. trade_order_update_time: Utc::now().timestamp_millis(),
  143. on_tick_event_time: Utc::now().timestamp_millis(),
  144. tickers: Default::default(),
  145. depths: Default::default(),
  146. market_update_time: Default::default(),
  147. market_update_interval: Default::default(),
  148. ref_num: params.ref_exchange.len() as i8,
  149. ref_name: Default::default(),
  150. trade_name: "".to_string(),
  151. ready: 0,
  152. predictor: Predictor {
  153. loop_count: 0,
  154. market_info_list: vec![],
  155. mid_price_list: vec![],
  156. ref_mid_price_per_exchange_per_frame: vec![],
  157. ref_exchange_length: 0,
  158. data_length_max: 0,
  159. alpha: vec![],
  160. gamma: Default::default(),
  161. avg_spread_list: vec![],
  162. },
  163. market: Market {
  164. symbol: symbol.clone(),
  165. base_asset: "".to_string(),
  166. quote_asset: "".to_string(),
  167. tick_size: Default::default(),
  168. amount_size: Default::default(),
  169. price_precision: Default::default(),
  170. amount_precision: Default::default(),
  171. min_qty: Default::default(),
  172. max_qty: Default::default(),
  173. min_notional: Default::default(),
  174. max_notional: Default::default(),
  175. ct_val: Default::default(),
  176. },
  177. platform_rest: match exchange.as_str() {
  178. "kucoin_usdt_swap" => {
  179. Exchange::new(KucoinSwap, symbol, false, exchange_params, order_sender, error_sender).await
  180. },
  181. "gate_usdt_swap" => {
  182. Exchange::new(GateSwap, symbol, false, exchange_params, order_sender, error_sender).await
  183. },
  184. "gate_usdt_spot" => {
  185. Exchange::new(GateSpot, symbol, false, exchange_params, order_sender, error_sender).await
  186. },
  187. "binance_usdt_swap" => {
  188. Exchange::new(BinanceSwap, symbol, false, exchange_params, order_sender, error_sender).await
  189. },
  190. "binance_usdt_spot" => {
  191. Exchange::new(BinanceSpot, symbol, false, exchange_params, order_sender, error_sender).await
  192. }
  193. _ => {
  194. error!("203未找到对应的交易所rest枚举!");
  195. panic!("203未找到对应的交易所rest枚举!");
  196. }
  197. },
  198. max_buy_min_sell_cache: Default::default(),
  199. local_depths: Default::default(),
  200. is_update: Default::default(),
  201. running,
  202. };
  203. for i in 0..=params.ref_exchange.len() - 1 {
  204. // 拼接不会消耗原字符串
  205. let tickers_key: String = format!("{}{}{}{}", params.ref_exchange[i], "@", params.ref_pair[i], "@ref");
  206. let ref_name_element = tickers_key.clone();
  207. let depths_key: String = tickers_key.clone();
  208. let market_update_time_key = tickers_key.clone();
  209. let market_update_interval_key = tickers_key.clone();
  210. let max_buy_min_sell_cache_key = tickers_key.clone();
  211. quant_obj.tickers.insert(tickers_key, SpecialTicker {
  212. sell: Default::default(),
  213. buy: Default::default(),
  214. mid_price: Default::default(),
  215. });
  216. quant_obj.ref_name.push(ref_name_element);
  217. quant_obj.depths.insert(depths_key, Default::default());
  218. quant_obj.market_update_time.insert(market_update_time_key, Default::default());
  219. quant_obj.market_update_interval.insert(market_update_interval_key, Default::default());
  220. quant_obj.max_buy_min_sell_cache.insert(max_buy_min_sell_cache_key, vec![Decimal::ZERO, Decimal::ZERO]);
  221. }
  222. let name = format!("{}{}{}", quant_obj.exchange.clone(), "@", quant_obj.symbol);
  223. let market_update_time_key = name.clone();
  224. let market_update_interval_key = name.clone();
  225. let tickers_key = name.clone();
  226. let depths_key = name.clone();
  227. let max_buy_min_sell_cache_key = name.clone();
  228. quant_obj.trade_name = name;
  229. quant_obj.market_update_time.insert(market_update_time_key, Default::default());
  230. quant_obj.market_update_interval.insert(market_update_interval_key, Default::default());
  231. quant_obj.tickers.insert(tickers_key, SpecialTicker {
  232. sell: Default::default(),
  233. buy: Default::default(),
  234. mid_price: Default::default(),
  235. });
  236. quant_obj.depths.insert(depths_key, Default::default());
  237. quant_obj.max_buy_min_sell_cache.insert(max_buy_min_sell_cache_key, vec![Decimal::ZERO, Decimal::ZERO]);
  238. // broker.newWs
  239. let mut price_alpha: Vec<Decimal> = Vec::new();
  240. for ref_pair_str in params.ref_pair {
  241. if params.pair.contains("1000") && !ref_pair_str.contains("1000") {
  242. price_alpha.push(dec!(1000.0));
  243. } else if !params.pair.contains("1000") && ref_pair_str.contains("1000") {
  244. price_alpha.push(dec!(0.001))
  245. } else {
  246. price_alpha.push(dec!(1.0));
  247. }
  248. }
  249. info!("价格系数:{:?}", price_alpha);
  250. quant_obj.predictor = Predictor::new(quant_obj.ref_name.len())
  251. .alpha(price_alpha)
  252. .gamma(params.gamma);
  253. return quant_obj;
  254. }
  255. pub async fn handle_signals(quant_arc: Arc<Mutex<Quant>>, mut rx: Receiver<Order>) {
  256. spawn(async move{
  257. loop {
  258. sleep(Duration::from_millis(1)).await;
  259. match rx.try_recv() {
  260. Ok(val)=>{
  261. // 只处理这两种订单回执
  262. if ["NEW", "REMOVE"].contains(&val.status.as_str()){
  263. let mut local_order_info = OrderInfo{
  264. symbol: "".to_string(),
  265. amount: Default::default(),
  266. side: "".to_string(),
  267. price: Default::default(),
  268. client_id: "".to_string(),
  269. filled_price: Default::default(),
  270. filled: Default::default(),
  271. order_id: "".to_string(),
  272. local_time: 0,
  273. create_time: 0,
  274. status: "".to_string(),
  275. fee: Decimal::ZERO,
  276. trace_stack: Default::default(),
  277. };
  278. if val.status== "NEW" {
  279. local_order_info.client_id = val.custom_id;
  280. local_order_info.order_id = val.id;
  281. } else if val.status == "REMOVE" {
  282. local_order_info.client_id = val.custom_id;
  283. }
  284. let mut bot = quant_arc.lock().await;
  285. // let mut time_delay = time_record.lock().await;
  286. // 写入本地订单缓存
  287. bot.update_local_order(local_order_info);
  288. }
  289. },
  290. Err(e) => {
  291. info!("订单回执消费失败!{}", e);
  292. return;
  293. }
  294. }
  295. }
  296. });
  297. }
  298. pub fn update_order(&mut self, data: Vec<OrderInfo>){
  299. for order in data {
  300. self.update_local_order(order);
  301. }
  302. }
  303. pub fn update_local_order(&mut self, data: OrderInfo) {
  304. if data.filled != Decimal::ZERO {
  305. info!("\n\n");
  306. info!("接收到订单信息①:{:?}", data);
  307. }
  308. /*
  309. 更新订单
  310. 首先直接复写本地订单
  311. 1、如果是开仓单
  312. 如果新增: 增加本地订单
  313. 如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按已成交量发送平仓订单 修改本地仓位
  314. 如果成交: 删除本地订单 发送平仓订单 修改本地仓位
  315. 2、如果是平仓单
  316. 如果新增: 增加本地订单
  317. 如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按未成交量发送平仓订单 修改本地仓位
  318. 如果成交: 删除本地订单 修改本地仓位
  319. NEW 可以从 ws / rest 来
  320. REMOVE 主要从 ws 来 必须包含 filled 和 filled_price 用于本地仓位推算 定期rest查过旧订单
  321. 为了防止下单失败依然有订单成交 本地需要做一个缓存
  322. */
  323. // 触发订单更新
  324. self.trade_order_update_time = Utc::now().timestamp_millis();
  325. // 新增订单推送 仅需要cid oid信息
  326. if data.status == "NEW" {
  327. // 更新oid信息 更新订单 loceltime信息(尤其是查单返回new的情况 必须更新 否则会误触发风控)
  328. if self.local_orders.contains_key(&data.client_id) {
  329. let mut order_info = self.local_orders.get(&data.client_id).unwrap().clone();
  330. order_info.order_id = data.order_id;
  331. order_info.local_time = Utc::now().timestamp_millis();
  332. self.local_orders.insert(data.client_id.clone(), order_info);
  333. }
  334. } else if data.status == "REMOVE" {
  335. // 如果在撤单记录中 说明此订单结束生命周期 可以移除记录
  336. if self.local_cancel_log.contains_key(&data.client_id) {
  337. self.local_cancel_log.remove(&data.client_id);
  338. }
  339. if self.local_orders.contains_key(&data.client_id) {
  340. debug!("删除本地订单, client_id:{:?}", data);
  341. self.local_orders.remove(&data.client_id);
  342. } else {
  343. debug!("该订单不在本地挂单表中, order:{:?}", data);
  344. }
  345. // 在cid缓存队列中 说明是本策略的订单
  346. if self.local_orders_backup.contains_key(&data.client_id) {
  347. // 不在已处理cid缓存队列中 说明还没参与过仓位计算 则执行订单计算
  348. if self.handled_orders_cid.contains(&data.client_id) {
  349. debug!("订单已经参与过仓位计算 拒绝重复进行计算, 订单号:{}", data.client_id);
  350. } else {
  351. // 添加进已处理队列
  352. self.handled_orders_cid.push(data.client_id.clone());
  353. // 提取成交信息 方向 价格 量
  354. let filled = data.filled;
  355. let side = self.local_orders_backup.get(&data.client_id).unwrap().side.clone();
  356. let mut filled_price = Decimal::ZERO;
  357. if data.filled_price > filled_price {
  358. filled_price = data.filled_price;
  359. } else {
  360. filled_price = self.local_orders_backup.get(&data.client_id).unwrap().price.clone();
  361. }
  362. // 只有开仓成交才触发onPosition
  363. // 如果漏推送 rest补充的订单查询信息过来 可能会导致 kd kk 推送出现计算分母为0的情况
  364. if filled > Decimal::ZERO {
  365. let mut filled_order = data.clone();
  366. filled_order.side = side.clone();
  367. info!("移除本地订单:{:?}, local_by_orders: {:?}", filled_order, self.local_position_by_orders);
  368. if self.exchange.contains("spot") { // 如果是现货交易 还需要修改equity
  369. // 现货必须考虑fee 买入fee单位为币 卖出fee单位为u
  370. let fee = data.fee;
  371. if side == "kd" { // buy 开多
  372. self.local_buy_amount += filled - fee;
  373. self.local_buy_value += (filled - fee) * filled_price;
  374. let new_long_pos = self.local_position_by_orders.long_pos.clone();
  375. if new_long_pos == Decimal::ZERO {
  376. self.local_position_by_orders.long_avg = Decimal::ZERO;
  377. self.local_position_by_orders.long_pos = Decimal::ZERO;
  378. } else {
  379. self.local_position_by_orders.long_avg = (self.local_position_by_orders.long_pos * self.local_position_by_orders.long_avg +
  380. filled * filled_price) / new_long_pos;
  381. self.local_position_by_orders.long_pos = new_long_pos;
  382. }
  383. self.local_cash -= filled * filled_price;
  384. self.local_coin = filled - fee;
  385. } else if side == "pd" { // sell 平多
  386. self.local_sell_amount += filled;
  387. self.local_sell_value += filled * filled_price;
  388. self.local_profit += filled * (filled_price - self.local_position_by_orders.long_avg);
  389. let new_long_pos = self.local_position_by_orders.long_pos - filled;
  390. if new_long_pos == Decimal::ZERO {
  391. self.local_position_by_orders.long_avg = Decimal::ZERO;
  392. self.local_position_by_orders.long_pos = Decimal::ZERO;
  393. } else {
  394. self.local_position_by_orders.long_pos = new_long_pos;
  395. }
  396. self.local_cash += filled * filled_price - fee;
  397. self.local_coin -= filled;
  398. } else if side == "pk" { // buy 平空
  399. self.local_buy_amount += filled - fee;
  400. self.local_buy_value += (filled - fee) * filled_price;
  401. self.local_profit += filled * (self.local_position_by_orders.short_avg - filled_price);
  402. let new_short_pos = self.local_position_by_orders.short_pos - filled;
  403. if new_short_pos == Decimal::ZERO {
  404. self.local_position_by_orders.short_avg = Decimal::ZERO;
  405. self.local_position_by_orders.short_pos = Decimal::ZERO;
  406. } else {
  407. self.local_position_by_orders.short_pos = new_short_pos;
  408. }
  409. self.local_cash -= filled * filled_price;
  410. self.local_coin += filled - fee;
  411. } else if side == "kk" { // sell 开空
  412. self.local_sell_amount += filled;
  413. self.local_sell_value += filled * filled_price;
  414. let new_short_pos = self.local_position_by_orders.short_pos - filled;
  415. if new_short_pos == Decimal::ZERO {
  416. self.local_position_by_orders.short_avg = Decimal::ZERO;
  417. self.local_position_by_orders.short_pos = Decimal::ZERO;
  418. } else {
  419. self.local_position_by_orders.short_avg = (self.local_position_by_orders.short_pos * self.local_position_by_orders.short_avg
  420. + filled * filled_price) / new_short_pos;
  421. self.local_position_by_orders.short_pos = new_short_pos;
  422. }
  423. self.local_cash += filled * filled_price - fee;
  424. self.local_coin -= filled;
  425. } else {
  426. info!("错误的仓位方向{}", side);
  427. }
  428. } else { // 合约订单流仓位计算
  429. if side == "kd" { // buy 开多
  430. self.local_buy_amount += filled;
  431. self.local_buy_value += filled * filled_price;
  432. let new_long_pos = self.local_position_by_orders.long_pos + filled;
  433. if new_long_pos == Decimal::ZERO {
  434. self.local_position_by_orders.long_avg = Decimal::ZERO;
  435. self.local_position_by_orders.long_pos = Decimal::ZERO;
  436. } else {
  437. self.local_position_by_orders.long_avg = (self.local_position_by_orders.long_pos *
  438. self.local_position_by_orders.long_avg + filled * filled_price) / new_long_pos;
  439. self.local_position_by_orders.long_pos = self.local_position_by_orders.long_pos + filled;
  440. }
  441. } else if side == "kk" { // sell 开空
  442. self.local_sell_amount += filled;
  443. self.local_sell_value += filled * filled_price;
  444. let new_short_pos = self.local_position_by_orders.short_pos + filled;
  445. if new_short_pos == Decimal::ZERO {
  446. self.local_position_by_orders.short_avg = Decimal::ZERO;
  447. self.local_position_by_orders.short_pos = Decimal::ZERO;
  448. } else {
  449. self.local_position_by_orders.short_avg = (self.local_position_by_orders.short_pos * self.local_position_by_orders.short_avg + filled * filled_price) / new_short_pos;
  450. self.local_position_by_orders.short_pos = self.local_position_by_orders.short_pos + filled;
  451. }
  452. } else if side == "pd" { // sell 平多
  453. self.local_sell_amount += filled;
  454. self.local_sell_value += filled * filled_price;
  455. self.local_profit += filled * (filled_price - self.local_position_by_orders.long_avg);
  456. self.local_position_by_orders.long_pos = self.local_position_by_orders.long_pos - filled;
  457. if self.local_position_by_orders.long_pos == Decimal::ZERO {
  458. self.local_position_by_orders.long_avg = Decimal::ZERO;
  459. }
  460. } else if side == "pk" { // buy 平空
  461. self.local_buy_amount += filled;
  462. self.local_buy_value += filled * filled_price;
  463. self.local_profit += filled * (self.local_position_by_orders.short_avg - filled_price);
  464. self.local_position_by_orders.short_pos = self.local_position_by_orders.short_pos - filled;
  465. if self.local_position_by_orders.short_pos == Decimal::ZERO {
  466. self.local_position_by_orders.short_avg = Decimal::ZERO;
  467. }
  468. } else {
  469. error!("错误的仓位方向{}", side);
  470. }
  471. // 统计合约交易手续费 正fee为扣手续费 负fee为返佣
  472. if data.fee > Decimal::ZERO {
  473. self.local_profit -= data.fee;
  474. }
  475. }
  476. // info!("成交单耗时数据:{}", time_record.to_string());
  477. info!("更新推算仓位 {:?}", self.local_position_by_orders);
  478. // 本地计算利润
  479. self._print_local_trades_summary();
  480. // 打印各类信息
  481. self.strategy.local_orders = self.local_orders.clone();
  482. self.strategy._print_summary();
  483. }
  484. // 每次有订单变动就触发一次策略
  485. if self.mode_signal == 0 && self.ready == 1 {
  486. // 更新交易数据
  487. self.update_trade_msg();
  488. // 触发策略挂单逻辑
  489. // 更新策略时间
  490. self.strategy.local_time = Utc::now().timestamp_millis();
  491. // time_record.strategy_start = Utc::now().timestamp_micros();
  492. let order = self.strategy.on_time(&self.trade_msg);
  493. // time_record.strategy_end = Utc::now().timestamp_micros();
  494. // 记录指令触发信息
  495. if order.is_not_empty() {
  496. // info!("触发onOrder");
  497. self._update_local_orders(&order);
  498. // time_record.quant_end = Utc::now().timestamp_micros();
  499. // time_record.source = "508 orderUpdate".to_string();
  500. // time_record.order_command = order.to_string();
  501. //交易所处理订单信号
  502. let mut platform_rest_fb = self.platform_rest.clone_box();
  503. // info!("订单指令:{:?}", order);
  504. spawn(async move{
  505. // info!("update_local_order订单指令:{:?}", order);
  506. platform_rest_fb.command_order(order).await;
  507. });
  508. }
  509. }
  510. }
  511. } else {
  512. debug!("订单不属于本策略 拒绝进行仓位计算: {}", data.client_id);
  513. }
  514. } else {
  515. error!("未知的订单事件类型:{:?}", data);
  516. }
  517. }
  518. pub fn _print_local_trades_summary(&mut self) {
  519. // 计算本地累计利润
  520. let local_buy_amount = self.local_buy_amount.round_dp(5);
  521. let local_buy_value = self.local_buy_value.round_dp(5);
  522. let local_sell_amount = self.local_sell_amount.round_dp(5);
  523. let local_sell_value = self.local_sell_value.round_dp(5);
  524. if self.strategy.mp > Decimal::ZERO {
  525. let unrealized = (local_buy_amount - local_sell_amount) * self.strategy.mp;
  526. let realized = local_sell_value - local_buy_value;
  527. let local_profit = (unrealized + realized).round_dp(5);
  528. self.strategy.local_profit = local_profit;
  529. info!("买量 {},卖量 {},买额{},卖额{}", local_buy_amount, local_sell_amount, local_buy_value, local_sell_value);
  530. }
  531. }
  532. // 检测初始数据是否齐全
  533. pub fn check_ready(&mut self) {
  534. // 检查 ticker 行情
  535. for i in &self.ref_name {
  536. if self.tickers.is_empty() || !self.tickers.contains_key(i) {
  537. info!("529参考盘口ticker未准备好: {:?}", self.tickers);
  538. return;
  539. } else {
  540. if self.tickers.get(i).unwrap().buy == dec!(0) || self.tickers.get(i).unwrap().sell == dec!(0) {
  541. info!("533参考盘口ticker未准备好: {:?}", self.tickers);
  542. return;
  543. }
  544. }
  545. }
  546. if self.tickers.contains_key(&self.trade_name) {
  547. if self.tickers.get(&self.trade_name).unwrap().buy == dec!(0) || self.tickers.get(&self.trade_name).unwrap().sell == dec!(0) {
  548. info!("540交易盘口ticker未准备好: {:?}", self.tickers);
  549. return;
  550. }
  551. } else {
  552. info!("544交易盘口ticker未准备好: {:?}", self.tickers);
  553. return;
  554. }
  555. // 检查 market 行情
  556. let all_market: Vec<Decimal> = self.get_all_market_data();
  557. if all_market.len() != LENGTH * (1usize + self.ref_num as usize) {
  558. info!("550聚合行情未准备好: market长度:{}, 检验数: {}", all_market.len(), LENGTH * (1usize + self.ref_num as usize));
  559. return;
  560. } else {
  561. info!("553聚合行情准备就绪");
  562. self.trade_msg.market = all_market;
  563. self.predictor.market_info_handler(&self.trade_msg.market);
  564. }
  565. self.ready = 1
  566. }
  567. pub fn _update_depth(&mut self, depth: Vec<Decimal>, name :String) {
  568. // 要从回调传入的深度信息中获取data.name
  569. let market_update_interval_key = name.clone();
  570. let market_update_time_key = name.clone();
  571. let depths1_key = name.clone();
  572. let depths2_key = name.clone();
  573. let depths3_key = name.clone();
  574. let now_time = Utc::now().timestamp_millis();
  575. if self.market_update_time.contains_key(&name) && *self.market_update_time.get(&name).unwrap() != 0i64 {
  576. let interval = Decimal::from(now_time - self.market_update_time.get(&name).unwrap());
  577. if *self.market_update_interval.get(&name).unwrap() == dec!(0) {
  578. self.market_update_interval.insert(market_update_interval_key, interval);
  579. } else {
  580. let value = self.market_update_interval.get(&name).unwrap();
  581. self.market_update_interval.insert(market_update_interval_key, value * dec!(0.999) + interval * dec!(0.001));
  582. }
  583. }
  584. self.market_update_time.insert(market_update_time_key, now_time);
  585. // 初始化depths
  586. if self.depths.get(&name).unwrap().is_empty() {
  587. self.depths.insert(depths1_key, depth.clone());
  588. }
  589. // 判断是否需要触发ondepth
  590. // 是否是交易盘口
  591. if name == self.trade_name {
  592. // 更新depths
  593. self.depths.insert(depths2_key, depth.clone());
  594. // 允许交易
  595. if self.mode_signal == 0 && self.ready == 1 {
  596. self.on_agg_market();
  597. }
  598. } else if name == self.ref_name[0] { // 判断是否为当前跟踪的盘口
  599. // 判断是否需要触发ontick 对行情进行过滤
  600. // 过滤条件 价格变化很大 时间间隔很长
  601. let mut flag = 0;
  602. let bid_price_rate = (depth[BID_PRICE_INDEX] - self.depths.get(&name).unwrap()[BID_PRICE_INDEX]).abs() / depth[BID_PRICE_INDEX];
  603. let ask_price_rate = (depth[ASK_PRICE_INDEX] - self.depths.get(&name).unwrap()[ASK_PRICE_INDEX]).abs() / depth[ASK_PRICE_INDEX];
  604. let rate = dec!(0.0002);
  605. if bid_price_rate > rate || ask_price_rate > rate || Utc::now().timestamp_millis() - self.on_tick_event_time > 50 {
  606. // 允许交易
  607. flag = 1;
  608. // 更新ontick触发时间记录
  609. self.on_tick_event_time = Utc::now().timestamp_millis();
  610. }
  611. // 更新depths
  612. self.depths.insert(depths3_key, depth);
  613. // 允许交易
  614. if self.mode_signal == 0 && self.ready == 1 && flag == 1 {
  615. // 更新交易数据
  616. self.update_trade_msg();
  617. // 触发事件撤单逻辑
  618. // 更新策略时间
  619. self.strategy.local_time = Utc::now().timestamp_millis();
  620. // time_record.strategy_start = Utc::now().timestamp_micros();
  621. // 产生交易信号
  622. let orders = self.strategy.on_time(&self.trade_msg);
  623. // time_record.strategy_end = Utc::now().timestamp_micros();
  624. if orders.is_not_empty() {
  625. debug!("触发onTick");
  626. self._update_local_orders(&orders);
  627. // time_record.quant_end = Utc::now().timestamp_micros();
  628. //异步交易所处理订单信号
  629. let mut platform_rest_fb = self.platform_rest.clone_box();
  630. // time_record.source = "646 depth".to_string();
  631. // time_record.order_command = orders.to_string();
  632. // info!("订单指令:{:?}", orders);
  633. spawn(async move{
  634. // info!("_update_depth订单指令:{:?}", orders);
  635. platform_rest_fb.command_order(orders).await;
  636. });
  637. }
  638. }
  639. }
  640. }
  641. pub fn update_position(&mut self, data: Vec<Position>) {
  642. if data.is_empty() {
  643. return;
  644. }
  645. let mut position = LocalPosition::new();
  646. for pos in &data {
  647. if pos.position_mode == PositionModeEnum::Long {
  648. position.long_pos = pos.amount;
  649. position.long_avg = pos.price;
  650. } else if pos.position_mode == PositionModeEnum::Short {
  651. position.short_pos = pos.amount.abs();
  652. position.short_avg = pos.price;
  653. }
  654. }
  655. // 更新仓位信息
  656. if position != self.local_position {
  657. info!("收到新的仓位推送, position: {:?}, local_position: {:?}", data, position);
  658. info!("更新本地仓位:{:?}", self.local_position);
  659. self.local_position = position;
  660. }
  661. }
  662. pub fn _update_ticker(&mut self, data: SpecialTicker, name: String) {
  663. self.tickers.insert(name, data);
  664. }
  665. pub fn on_agg_market(&mut self) {
  666. /* 处理聚合行情
  667. 1. 获取聚合行情
  668. 2. 更新预测器
  669. 3. 触发tick回测
  670. */
  671. // 更新聚合市场数据
  672. let agg_market = self.get_all_market_data();
  673. // 更新聚合市场信息
  674. self.trade_msg.market = agg_market;
  675. // 更新预测器
  676. self.predictor.market_info_handler(&self.trade_msg.market);
  677. }
  678. pub fn update_trade_msg(&mut self) {
  679. // 更新保证金
  680. self.trade_msg.cash = self.local_cash.round_dp(10);
  681. self.trade_msg.coin = self.local_coin.round_dp(10);
  682. let position = self.local_position_by_orders.clone();
  683. // 使用本地推算仓位
  684. self.trade_msg.position = position;
  685. let orders = self.local_orders.clone();
  686. self.trade_msg.orders = orders;
  687. // 更新 ref
  688. let mut ref_tickers: BTreeMap<String, Ticker> = BTreeMap::new();
  689. for i in &self.ref_name {
  690. let bp = self.tickers.get(i).unwrap().buy.clone();
  691. let ap = self.tickers.get(i).unwrap().sell.clone();
  692. ref_tickers.insert(i.clone(), Ticker {
  693. time: 0,
  694. high: Default::default(),
  695. low: Default::default(),
  696. sell: ap,
  697. buy: bp,
  698. last: Default::default(),
  699. volume: Default::default(),
  700. });
  701. }
  702. let ref_price: Vec<Vec<Decimal>> = self.predictor.get_ref_price(&ref_tickers);
  703. self.trade_msg.ref_price = ref_price;
  704. }
  705. // 本地记录所有报单信息
  706. pub fn _update_local_orders(&mut self, orders: &OrderCommand) {
  707. let mut limits = HashMap::new();
  708. limits.extend(orders.clone().limits_open);
  709. limits.extend(orders.clone().limits_close);
  710. if !limits.is_empty() {
  711. for j in limits.keys() {
  712. let order_info = OrderInfo {
  713. symbol: self.symbol.clone(),
  714. amount: Decimal::from_str(limits.get(j).unwrap()[0].as_str()).unwrap(),
  715. side: limits.get(j).unwrap()[1].clone(),
  716. price: Decimal::from_str(limits.get(j).unwrap()[2].as_str()).unwrap(),
  717. client_id: limits.get(j).unwrap()[3].clone(),
  718. filled_price: Default::default(),
  719. filled: Decimal::ZERO,
  720. order_id: "".to_string(),
  721. local_time: self.strategy.local_time,
  722. create_time: self.strategy.local_time,
  723. status: "".to_string(),
  724. fee: Default::default(),
  725. trace_stack: Default::default(),
  726. };
  727. // 本地挂单表
  728. self.local_orders.insert(limits.get(j).unwrap()[3].clone(), order_info.clone());
  729. // 本地缓存表
  730. self.local_orders_backup.insert(limits.get(j).unwrap()[3].clone(), order_info);
  731. // 本地缓存cid表
  732. self.local_orders_backup_cid.push(limits.get(j).unwrap()[3].clone());
  733. }
  734. }
  735. if !orders.cancel.is_empty() {
  736. for cancel_key in orders.cancel.keys() {
  737. let cid = orders.cancel.get(cancel_key).unwrap()[0].clone();
  738. if self.local_cancel_log.contains_key(&cid) {
  739. let num = self.local_cancel_log.get(&cid).unwrap() + 1;
  740. self.local_cancel_log.insert(cid, num);
  741. } else {
  742. self.local_cancel_log.insert(cid, 0);
  743. }
  744. }
  745. }
  746. let max_len = 9999usize;
  747. // 清除过于久远的历史记录
  748. if self.local_orders_backup_cid.len() > max_len {
  749. let cid = self.local_orders_backup_cid[0].clone();
  750. // 判断是否超过1个小时 如果超过则移除历史记录
  751. if self.local_orders_backup.contains_key(&cid) {
  752. let local_time = self.local_orders_backup.get(&cid).unwrap().local_time;
  753. if Utc::now().timestamp_millis() - local_time > 3600000 {
  754. self.local_orders_backup.remove(&cid);
  755. self.local_orders_backup_cid.retain(|x| *x != cid)
  756. }
  757. }
  758. }
  759. if self.handled_orders_cid.len() > max_len {
  760. self.handled_orders_cid.remove(0usize);
  761. }
  762. }
  763. // 获取深度信息
  764. pub fn get_all_market_data(&mut self) -> Vec<Decimal> {
  765. // 只能定时触发 组合市场信息=交易盘口+参考盘口
  766. let mut market: Vec<Decimal> = Vec::new();
  767. // 获取交易盘口市场信息
  768. let mut data: Vec<Decimal> = self.local_depths.get(&self.trade_name).unwrap().clone();
  769. self.is_update.insert(self.symbol.clone(), true);
  770. let mut max_min_price = self.max_buy_min_sell_cache.get(&self.trade_name).unwrap().clone();
  771. market.append(&mut data);
  772. market.append(&mut max_min_price);
  773. for i in &self.ref_name {
  774. // 获取参考盘口市场信息
  775. data = self.local_depths.get(i).unwrap().clone();
  776. self.is_update.insert(i.clone(), true);
  777. max_min_price = self.max_buy_min_sell_cache.get(i).unwrap().clone();
  778. data.append(&mut max_min_price);
  779. market.append(&mut data);
  780. }
  781. return market;
  782. }
  783. pub async fn get_exchange_info(&mut self,) {
  784. self.market = self.platform_rest.get_self_market();
  785. }
  786. pub fn update_equity(&mut self, data: Account) {
  787. /*
  788. 更新保证金信息
  789. 合约一直更新
  790. 现货只有当出现异常时更新
  791. */
  792. if self.exchange.contains("spot") {
  793. return;
  794. }
  795. self.local_cash = data.balance * self.used_pct
  796. }
  797. pub async fn update_equity_rest(&mut self) {
  798. match self.platform_rest.get_account().await {
  799. Ok(val) => {
  800. /*
  801. 更新保证金信息
  802. 合约一直更新
  803. 现货只有当出现异常时更新
  804. */
  805. if self.exchange.contains("spot") {
  806. return;
  807. }
  808. self.local_cash = val.balance * self.used_pct
  809. },
  810. Err(e) => {
  811. info!("获取账户信息错误: {:?}", e);
  812. }
  813. }
  814. }
  815. pub async fn check_risk(&mut self) {
  816. // 参数检查的风控
  817. if self.strategy.start_cash == Decimal::ZERO {
  818. warn!("请检查交易账户余额");
  819. warn!(?self.strategy.start_cash);
  820. return;
  821. }
  822. if self.strategy.mp == Decimal::ZERO {
  823. warn!("请检查最新价格");
  824. warn!(?self.strategy.mp);
  825. return;
  826. }
  827. // 不是现货执行的回撤风控1
  828. if !self.exchange.contains("spot") {
  829. let draw_back = Decimal::ONE - self.strategy.equity / self.strategy.max_equity;
  830. if draw_back > self.stop_loss {
  831. let exit_msg = format!("{} 总资金吊灯回撤 {}。当前净值:{}, 最高净值{},触发止损,准备停机。",
  832. self.params.account_name, draw_back, self.strategy.equity, self.strategy.max_equity);
  833. warn!(exit_msg);
  834. self.exit_msg = exit_msg;
  835. self.stop().await;
  836. }
  837. }
  838. // 回撤风控2
  839. let draw_back = self.local_profit / self.strategy.start_equity;
  840. if draw_back < -self.stop_loss {
  841. let exit_msg = format!("{} 交易亏损,触发止损,准备停机。", self.params.account_name);
  842. warn!(exit_msg);
  843. self.exit_msg = exit_msg;
  844. self.stop().await;
  845. }
  846. // 报单延迟风控,平均延迟允许上限5000ms
  847. if self.platform_rest.get_request_avg_delay() > dec!(5000) {
  848. let exit_msg = format!("{} 延迟爆表 触发风控 准备停机。", self.params.account_name);
  849. warn!(exit_msg);
  850. self.exit_msg = exit_msg;
  851. self.stop().await;
  852. }
  853. // 仓位异常风控,只在合约模式下执行
  854. if !self.exchange.contains("spot") {
  855. let long_diff = (self.local_position.long_pos - self.local_position_by_orders.long_pos).abs();
  856. let short_diff = (self.local_position.short_pos - self.local_position_by_orders.short_pos).abs();
  857. let diff_pos = max(long_diff, short_diff);
  858. let diff_pos_value = diff_pos * self.strategy.mp;
  859. if diff_pos_value > self.strategy._min_amount_value {
  860. warn!("{}发现仓位异常", self.params.account_name);
  861. warn!(?self.local_position_by_orders, ?self.local_position);
  862. self.position_check_series.push(1);
  863. } else {
  864. self.position_check_series.push(0);
  865. }
  866. // self.position_check_series长度限制
  867. if self.position_check_series.len() > 30 {
  868. self.position_check_series.remove(0);
  869. }
  870. // 连续不符合判定
  871. if self.position_check_series.iter().sum::<i8>() >= 30 {
  872. let exit_msg = format!("{} 合约连续检查本地仓位和推算仓位不符合,退出。", self.params.account_name);
  873. warn!(exit_msg);
  874. self.exit_msg = exit_msg;
  875. self.stop().await;
  876. }
  877. }
  878. // 下单异常风控
  879. if self.strategy.total_amount == Decimal::ZERO {
  880. let exit_msg = format!("{} 开仓量为0,退出。", self.params.account_name);
  881. warn!(exit_msg);
  882. self.exit_msg = exit_msg;
  883. self.stop().await;
  884. }
  885. // 行情更新异常风控
  886. let mut exchange_names = self.ref_name.clone();
  887. exchange_names.push(self.trade_name.clone());
  888. for exchange_name in exchange_names {
  889. let now_time_millis = Utc::now().timestamp_millis();
  890. let last_update_millis = self.market_update_time.get(&exchange_name).unwrap();
  891. let delay = now_time_millis - last_update_millis;
  892. let limit = global::public_params::MARKET_DELAY_LIMIT;
  893. if delay > limit {
  894. let exit_msg = format!("{} ticker_name:{}, delay:{}ms,行情更新延迟过高,退出。",
  895. self.params.account_name, exchange_name, delay);
  896. warn!(?now_time_millis, ?last_update_millis, ?limit);
  897. warn!(exit_msg);
  898. self.exit_msg = exit_msg;
  899. self.stop().await;
  900. }
  901. }
  902. let local_orders = self.local_orders.clone();
  903. // 订单异常风控
  904. for (client_id, order) in local_orders{
  905. // 订单长时间停留 怀疑漏单 但未必一定漏 5min
  906. if Utc::now().timestamp_millis() - order.local_time > 5 * 60 * 1000 {
  907. let exit_msg = format!("{}订单停留过长,怀疑异常,退出,cid:{}。", self.params.account_name, client_id);
  908. warn!(exit_msg);
  909. self.exit_msg = exit_msg;
  910. self.stop().await;
  911. }
  912. }
  913. // 持仓均价异常风控
  914. if self.strategy.long_pos_bias != Decimal::ZERO {
  915. if self.strategy.long_hold_value > Decimal::TWO * self.strategy._min_amount_value {
  916. if self.strategy.long_pos_bias > dec!(4) || self.strategy.long_pos_bias < -Decimal::TWO {
  917. let exit_msg = format!("{} long_pos_bias: {},持仓均价异常,退出。", self.params.account_name, self.strategy.long_pos_bias);
  918. warn!(exit_msg);
  919. self.exit_msg = exit_msg;
  920. self.stop().await;
  921. }
  922. }
  923. }
  924. if self.strategy.short_pos_bias != Decimal::ZERO {
  925. if self.strategy.short_hold_value > Decimal::TWO * self.strategy._min_amount_value {
  926. if self.strategy.short_pos_bias > dec!(4) || self.strategy.short_pos_bias < -Decimal::TWO {
  927. let exit_msg = format!("{} short_pos_bias: {},持仓均价异常,退出。", self.params.account_name, self.strategy.long_pos_bias);
  928. warn!(exit_msg);
  929. self.exit_msg = exit_msg;
  930. self.stop().await;
  931. }
  932. }
  933. }
  934. // 订单撤单异常风控
  935. for (client_id, cancel_delay) in self.local_cancel_log.clone() {
  936. if cancel_delay > 300 {
  937. let exit_msg = format!("{} 长时间无法撤销,client_id: {},退出。", self.params.account_name, client_id);
  938. warn!(exit_msg);
  939. warn!(?self.strategy.ref_price, ?self.strategy.mp);
  940. self.exit_msg = exit_msg;
  941. self.stop().await;
  942. }
  943. }
  944. // 定价异常风控
  945. if (self.strategy.ref_price - self.strategy.mp).abs() / self.strategy.mp > dec!(0.03) {
  946. let exit_msg = format!("{} 定价偏离过大,怀疑定价异常,退出。", self.params.account_name);
  947. warn!(exit_msg);
  948. warn!(?self.strategy.ref_price, ?self.strategy.mp);
  949. self.exit_msg = exit_msg;
  950. self.stop().await;
  951. }
  952. }
  953. pub async fn check_position(&mut self){
  954. info!("清空挂单!");
  955. match self.platform_rest.cancel_orders().await{
  956. Ok(val)=>{
  957. info!(?val);
  958. },
  959. Err(e)=>{
  960. error!("清空挂单异常: {}", e);
  961. }
  962. };
  963. info!("检查遗漏仓位!");
  964. match self.platform_rest.get_positions().await {
  965. Ok(val)=>{
  966. for position in val {
  967. if !position.symbol.eq_ignore_ascii_case(self.symbol.as_str()){
  968. continue;
  969. }
  970. if position.amount.eq(&Decimal::ZERO) {
  971. continue;
  972. }
  973. match self.platform_rest.get_ticker().await {
  974. Ok(ticker)=>{
  975. let ap = ticker.sell;
  976. let bp = ticker.buy;
  977. let mp = ( ap + bp ) / Decimal::TWO;
  978. let price;
  979. let side;
  980. info!(?position);
  981. match position.position_mode {
  982. PositionModeEnum::Long => {
  983. // pd
  984. price = (mp*dec!(0.999)/self.market.tick_size).floor()*self.market.tick_size;
  985. side = "pd";
  986. },
  987. PositionModeEnum::Short => {
  988. // pk
  989. price = (mp*dec!(1.001)/self.market.tick_size).floor()*self.market.tick_size;
  990. side = "pk";
  991. }
  992. _ => {
  993. info!("仓位匹配失败,不做操作!");
  994. // 执行完当前币对 结束循环
  995. break;
  996. }
  997. }
  998. match self.platform_rest.take_order("t-123", side, price, position.amount.abs()).await {
  999. Ok(order)=>{
  1000. info!("清仓下单,{:?}", order);
  1001. // 执行完当前币对 结束循环
  1002. break;
  1003. },
  1004. Err(error)=>{
  1005. error!("清仓下单异常:{}", error);
  1006. // 执行完当前币对 结束循环
  1007. break;
  1008. }
  1009. };
  1010. },
  1011. Err(err)=>{
  1012. error!("获取当前ticker异常: {}",err)
  1013. }
  1014. }
  1015. }
  1016. },
  1017. Err(error)=>{
  1018. error!("获取仓位信息异常: {}", error);
  1019. }
  1020. }
  1021. }
  1022. pub async fn stop(&mut self){
  1023. /*
  1024. * 停机函数
  1025. * mode_signal 不能小于80
  1026. * 前6秒用于maker平仓
  1027. * 后2秒用于撤maker平仓单
  1028. * 休眠2秒再执行check_position 避免卡单导致漏仓位
  1029. */
  1030. info!("进入停机流程...");
  1031. self.mode_signal = 80;
  1032. sleep(Duration::from_secs(10)).await;
  1033. info!("开始退出操作");
  1034. info!("为避免api失效导致遗漏仓位 建议人工复查");
  1035. self.check_position().await;
  1036. // 开启停机信号
  1037. sleep(Duration::from_secs(3)).await;
  1038. info!("双重检查遗漏仓位");
  1039. self.check_position().await;
  1040. info!("停机退出 停机原因: {}", self.exit_msg);
  1041. // 发送交易状态 await self._post_params()
  1042. // TODO: 向中控发送信号
  1043. self.running.store(false, Ordering::Relaxed);
  1044. info!("退出进程!");
  1045. }
  1046. pub async fn exit(&mut self, delay: i8){
  1047. info!("预约退出操作 delay:{}", delay);
  1048. if delay > 0i8 {
  1049. sleep(Duration::from_secs(delay as u64)).await;
  1050. }
  1051. info!("开始退出操作");
  1052. info!("为避免api失效导致遗漏仓位 建议人工复查");
  1053. self.check_position().await;
  1054. // 开启停机信号
  1055. sleep(Duration::from_secs(3)).await;
  1056. info!("双重检查遗漏仓位");
  1057. self.check_position().await;
  1058. info!("停机退出 停机原因: {}", self.exit_msg);
  1059. // 发送交易状态 await self._post_params()
  1060. // TODO: 向中控发送信号
  1061. self.running.store(false, Ordering::Relaxed);
  1062. info!("退出进程!");
  1063. }
  1064. pub async fn before_trade(&mut self) -> bool {
  1065. sleep(Duration::from_secs(1)).await;
  1066. // 获取市场信息
  1067. self.get_exchange_info().await;
  1068. // 获取价格信息
  1069. let ticker = self.platform_rest.get_ticker().await.expect("获取价格信息异常!");
  1070. let mp = (ticker.buy + ticker.sell) / Decimal::TWO;
  1071. // 获取账户信息
  1072. self.update_equity_rest().await;
  1073. // 初始资金
  1074. let start_cash = self.local_cash.clone();
  1075. let start_coin = self.local_coin.clone();
  1076. if start_cash.is_zero() && start_coin.is_zero() {
  1077. self.exit_msg = format!("{}{}{}{}", "初始为零 cash: ", start_cash, " coin: ", start_coin);
  1078. // 停止程序
  1079. self.stop().await;
  1080. return false;
  1081. }
  1082. info!("初始cash: {start_cash} 初始coin: {start_coin}");
  1083. // 初始化策略基础信息
  1084. if mp <= Decimal::ZERO {
  1085. self.exit_msg = format!("{}{}", "初始价格获取错误: ", mp);
  1086. // 停止程序
  1087. self.stop().await;
  1088. return false;
  1089. } else {
  1090. info!("初始价格为 {}", mp);
  1091. }
  1092. self.strategy.mp = mp.clone();
  1093. self.strategy.start_cash = start_cash.clone();
  1094. self.strategy.start_coin = start_coin.clone();
  1095. self.strategy.start_equity = start_cash + start_coin * mp;
  1096. self.strategy.max_equity = self.strategy.start_equity.clone();
  1097. self.strategy.equity = self.strategy.start_equity.clone();
  1098. self.strategy.total_amount = self.strategy.equity * self.strategy.lever_rate / self.strategy.mp;
  1099. // 获取数量精度
  1100. self.strategy.step_size = self.market.amount_size.clone();
  1101. if self.strategy.step_size > Decimal::ONE {
  1102. self.strategy.step_size = self.strategy.step_size.trunc();
  1103. }
  1104. // 获取价格精度
  1105. self.strategy.tick_size = self.market.tick_size.clone();
  1106. if self.strategy.tick_size > Decimal::ONE {
  1107. self.strategy.tick_size = self.strategy.tick_size.trunc();
  1108. }
  1109. if self.strategy.step_size.is_zero() || self.strategy.tick_size.is_zero() {
  1110. self.exit_msg = format!("{}{}{}{}", "交易精度未正常获取 step_size: ", self.strategy.step_size, " tick_size:", self.strategy.tick_size);
  1111. // 停止程序
  1112. self.stop().await;
  1113. return false;
  1114. } else {
  1115. info!("数量精度 {}", self.strategy.step_size);
  1116. info!("价格精度 {}", self.strategy.tick_size);
  1117. }
  1118. let grid = Decimal::from(self.params.grid.clone());
  1119. // 计算下单数量
  1120. let long_one_hand_value: Decimal = start_cash * self.params.lever_rate / grid;
  1121. let short_one_hand_value: Decimal;
  1122. let long_one_hand_amount: Decimal = (long_one_hand_value / mp / &self.strategy.step_size).floor() * self.strategy.step_size;
  1123. let short_one_hand_amount: Decimal;
  1124. if self.exchange.contains("spot") {
  1125. short_one_hand_value = start_coin * mp * self.params.lever_rate / grid;
  1126. short_one_hand_amount = (short_one_hand_value / mp / self.strategy.step_size).floor() * self.strategy.step_size;
  1127. } else {
  1128. short_one_hand_value = start_cash * self.params.lever_rate / grid;
  1129. short_one_hand_amount = (short_one_hand_value / mp / self.strategy.step_size).floor() * self.strategy.step_size;
  1130. }
  1131. info!("最低单手交易下单量为 buy: {}, sell: {}", long_one_hand_amount, short_one_hand_amount);
  1132. let hand_min_limit = Decimal::new(5, 0);
  1133. if (long_one_hand_amount.is_zero() && short_one_hand_amount.is_zero()) ||
  1134. (long_one_hand_value < hand_min_limit && short_one_hand_value < hand_min_limit) {
  1135. self.exit_msg = format!("{}{}{}{}", "初始下单量太少 buy: ", long_one_hand_amount, " sell: ", short_one_hand_amount);
  1136. // 停止程序
  1137. self.stop().await;
  1138. return false;
  1139. }
  1140. // 初始化调度器
  1141. self.local_cash = start_cash;
  1142. self.local_coin = start_coin;
  1143. // 清空挂单和仓位
  1144. self.check_position().await;
  1145. /*
  1146. ###### 交易前准备就绪 可以开始交易 ######
  1147. self.loop.create_task(self.rest.go())
  1148. self.loop.create_task(self.on_timer())
  1149. self.loop.create_task(self._run_server())
  1150. self.loop.create_task(self.run_stratey())
  1151. self.loop.create_task(self.early_stop_loop())
  1152. */
  1153. return true;
  1154. }
  1155. }
  1156. pub fn run_strategy(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()>{
  1157. return spawn(async move {
  1158. //定期触发策略
  1159. info!("定时触发器启动");
  1160. info!("前期准备完成");
  1161. sleep(Duration::from_secs(10)).await;
  1162. loop {
  1163. let start_time = Utc::now().timestamp_millis();
  1164. let mut delay = 1u64;
  1165. {
  1166. let mut quant = quant_arc.lock().await;
  1167. if quant.ready == 1 {
  1168. // 更新交易信息集合
  1169. quant.update_trade_msg();
  1170. if quant.mode_signal != 0 {
  1171. if quant.mode_signal > 1 {
  1172. quant.mode_signal -= 1;
  1173. }
  1174. if quant.mode_signal == 1 {
  1175. return;
  1176. }
  1177. // 触发策略 更新策略时间
  1178. quant.strategy.local_time = Utc::now().timestamp_millis();
  1179. let trade_msg = quant.trade_msg.clone();
  1180. let mut platform_rest_fb = quant.platform_rest.clone_box();
  1181. // 获取信号
  1182. if quant.mode_signal > 20 {
  1183. // 先执行onExit
  1184. let orders = quant.strategy.on_exit(&trade_msg);
  1185. if orders.is_not_empty() {
  1186. info!("触发onExit");
  1187. info!(?orders);
  1188. quant._update_local_orders(&orders);
  1189. spawn(async move {
  1190. platform_rest_fb.command_order(orders).await;
  1191. });
  1192. }
  1193. } else {
  1194. // 再执行onSleep
  1195. let orders = quant.strategy.on_sleep(&trade_msg);
  1196. // 记录指令触发信息
  1197. if orders.is_not_empty() {
  1198. info!("触发onSleep");
  1199. info!(?orders);
  1200. quant._update_local_orders(&orders);
  1201. spawn(async move {
  1202. platform_rest_fb.command_order(orders).await;
  1203. });
  1204. }
  1205. }
  1206. }
  1207. } else {
  1208. quant.check_ready();
  1209. }
  1210. // 计算耗时并进行休眠
  1211. let pass_time = (Utc::now().timestamp_millis() - start_time).to_u64().unwrap();
  1212. if pass_time < quant.interval {
  1213. delay = quant.interval - pass_time;
  1214. }
  1215. }
  1216. sleep(Duration::from_millis(delay)).await;
  1217. }
  1218. });
  1219. }
  1220. // 定期触发的系统逻辑
  1221. pub fn on_timer(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()> {
  1222. let quant_arc_clone = quant_arc.clone();
  1223. return spawn(async move {
  1224. tokio::time::sleep(Duration::from_secs(20)).await;
  1225. loop {
  1226. tokio::time::sleep(Duration::from_secs(10)).await;
  1227. let mut quant = quant_arc_clone.lock().await;
  1228. {
  1229. // 检查风控
  1230. quant.check_risk().await;
  1231. // 线程停止信号
  1232. if quant.mode_signal == 1 {
  1233. return
  1234. }
  1235. // 计算预估成交额
  1236. let total_trade_value = quant.local_buy_value + quant.local_sell_value;
  1237. let time_diff = Decimal::from(Utc::now().timestamp_millis() - quant.start_time);
  1238. let trade_vol_24h = (total_trade_value / time_diff) * dec!(86400);
  1239. quant.strategy.trade_vol_24h_w = trade_vol_24h / dec!(10000);
  1240. quant.strategy.trade_vol_24h_w.rescale(2);
  1241. // TODO quant没有rest
  1242. // info!("Rest报单平均延迟{}ms", quant.rest.avg_delay);
  1243. // info!("Rest报单最高延迟{}ms", quant.rest.max_delay);
  1244. for (name, interval) in &quant.market_update_interval {
  1245. debug!("WS盘口{}行情平均更新间隔{}ms。", name, interval);
  1246. }
  1247. }
  1248. }
  1249. });
  1250. }