core.rs 82 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766
  1. use tokio::time::Instant;
  2. use std::collections::{BTreeMap, HashMap};
  3. use std::io::Error;
  4. use std::str::FromStr;
  5. use std::sync::{Arc};
  6. use std::sync::atomic::{AtomicBool, Ordering};
  7. use std::time::Duration;
  8. use chrono::{Utc};
  9. use rust_decimal::{Decimal};
  10. use rust_decimal::prelude::{ToPrimitive};
  11. use rust_decimal_macros::dec;
  12. use tokio::spawn;
  13. use tokio::sync::mpsc::{Sender};
  14. use tokio::sync::{Mutex};
  15. use tokio::task::JoinHandle;
  16. use tokio::time::sleep;
  17. use tracing::{error, info, warn};
  18. use global::cci::CentralControlInfo;
  19. use global::params::Params;
  20. use global::trace_stack::TraceStack;
  21. use standard::{Account, Depth, ForceOrder, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, Record, SpecialTicker, Ticker, Trade};
  22. use standard::exchange::{Exchange};
  23. use standard::exchange::ExchangeEnum::{BinanceSwap, BybitSwap, BitgetSwap, GateSwap};
  24. use crate::model::{LocalPosition, OrderInfo, TokenParam};
  25. use crate::predictor::Predictor;
  26. use crate::strategy::Strategy;
  27. use crate::utils;
  28. use crate::utils::clip;
  29. pub struct Core {
  30. pub params: Params,
  31. // 启动时间
  32. pub start_time: i64,
  33. // 币对
  34. pub symbol: String,
  35. // 基础货币
  36. pub base: String,
  37. // 报价货币
  38. pub quote: String,
  39. //
  40. pub strategy: Strategy,
  41. // 本地挂单表
  42. pub local_orders: HashMap<String, OrderInfo>,
  43. // 本地订单缓存队列
  44. pub local_orders_backup: HashMap<String, OrderInfo>,
  45. // 本地订单缓存cid队列
  46. pub local_orders_backup_cid: Vec<String>,
  47. // 本地已处理cid缓存队列
  48. pub handled_orders_cid: Vec<String>,
  49. // 本地利润值
  50. pub local_profit: Decimal,
  51. // 本地U保证金
  52. pub local_cash: Decimal,
  53. // 本地币保证金
  54. pub local_coin: Decimal,
  55. // 仓位信息
  56. pub local_position: LocalPosition,
  57. // 仓位信息-来自订单
  58. pub local_position_by_orders: LocalPosition,
  59. //
  60. pub local_buy_amount: Decimal,
  61. pub local_sell_amount: Decimal,
  62. pub local_buy_value: Decimal,
  63. pub local_sell_value: Decimal,
  64. pub local_cancel_log: HashMap<String, i64>,
  65. pub interval: u64,
  66. pub exchange: String,
  67. pub exit_msg: String,
  68. // 仓位检查结果序列
  69. pub position_check_series: Vec<i8>,
  70. // 止损大小
  71. pub stop_loss: Decimal,
  72. // 资金使用率
  73. pub used_pct: Decimal,
  74. // 启停信号 0 表示运行 大于1开始倒计时 1时停机
  75. pub mode_signal: i8,
  76. // 交易盘口订单流更新时间
  77. pub trade_order_update_time: i64,
  78. // onTick触发时间记录
  79. pub on_tick_event_time: i64,
  80. // 盘口ticker信息
  81. pub tickers: HashMap<String, SpecialTicker>,
  82. // 盘口 depth信息
  83. pub depths: HashMap<String, Vec<Decimal>>,
  84. // 行情更新延迟监控(风控)
  85. pub market_update_time: HashMap<String, i64>,
  86. pub market_update_interval: HashMap<String, Decimal>,
  87. pub ref_num: i8,
  88. pub ref_name: Vec<String>,
  89. pub trade_name: String,
  90. pub ready: i8,
  91. pub predictor: Predictor,
  92. pub market: Market,
  93. pub platform_rest: Box<dyn Platform + Send + Sync>,
  94. // 市场最优买卖价
  95. pub max_buy_min_sell_cache: HashMap<String, Vec<Decimal>>,
  96. // 最近一次的depth信息
  97. pub local_depths: HashMap<String, Vec<Decimal>>,
  98. pub is_update: HashMap<String, bool>,
  99. pub running: Arc<AtomicBool>,
  100. pub hold_coin: Decimal,
  101. // 打印限频
  102. pub prev_log_ready_timestamp: i64,
  103. pub log_ready_log_interval: i64,
  104. // 中控
  105. pub cci_arc: Arc<Mutex<CentralControlInfo>>, // 中控信息汇集
  106. // 老版的trader_msg留下来的
  107. pub agg_market: Vec<Decimal>,
  108. pub ref_price: Vec<Vec<Decimal>>,
  109. pub predict: Decimal,
  110. }
  111. impl Core {
  112. pub async fn new(exchange: String,
  113. params: Params,
  114. exchange_params: BTreeMap<String, String>,
  115. order_sender: Sender<Order>,
  116. error_sender: Sender<Error>,
  117. running: Arc<AtomicBool>,
  118. cci_arc: Arc<Mutex<CentralControlInfo>>) -> Core {
  119. let symbol = params.pair.clone();
  120. let pairs: Vec<&str> = params.pair.split('_').collect();
  121. let mut core_obj = Core {
  122. params: params.clone(),
  123. start_time: 0,
  124. symbol: symbol.clone(),
  125. base: pairs[0].to_string(),
  126. quote: pairs[1].to_string(),
  127. // 现货底仓
  128. hold_coin: clip(params.hold_coin, Decimal::ZERO, Decimal::ONE_HUNDRED * Decimal::ONE_HUNDRED),
  129. strategy: Strategy::new(&params, true),
  130. local_orders: Default::default(),
  131. local_orders_backup: Default::default(),
  132. local_orders_backup_cid: Default::default(),
  133. handled_orders_cid: Default::default(),
  134. local_profit: Default::default(),
  135. local_cash: Default::default(),
  136. local_coin: Default::default(),
  137. local_position: LocalPosition {
  138. long_pos: Default::default(),
  139. short_pos: Default::default(),
  140. long_avg: Default::default(),
  141. short_avg: Default::default(),
  142. },
  143. local_position_by_orders: LocalPosition {
  144. long_pos: Default::default(),
  145. short_pos: Default::default(),
  146. long_avg: Default::default(),
  147. short_avg: Default::default(),
  148. },
  149. local_buy_amount: Default::default(),
  150. local_sell_amount: Default::default(),
  151. local_buy_value: Default::default(),
  152. local_sell_value: Default::default(),
  153. local_cancel_log: Default::default(),
  154. interval: params.interval,
  155. exchange: params.exchange.clone(),
  156. exit_msg: "正常退出".to_string(),
  157. position_check_series: Default::default(),
  158. stop_loss: params.stop_loss,
  159. used_pct: dec!(1),
  160. mode_signal: 0,
  161. trade_order_update_time: Utc::now().timestamp_millis(),
  162. on_tick_event_time: Utc::now().timestamp_millis(),
  163. tickers: Default::default(),
  164. depths: Default::default(),
  165. market_update_time: Default::default(),
  166. market_update_interval: Default::default(),
  167. ref_num: params.ref_exchange.len() as i8,
  168. ref_name: Default::default(),
  169. trade_name: "".to_string(),
  170. ready: 0,
  171. predictor: Predictor::new(cci_arc.clone(), params.clone()),
  172. market: Market {
  173. symbol: symbol.clone(),
  174. base_asset: "".to_string(),
  175. quote_asset: "".to_string(),
  176. tick_size: Default::default(),
  177. price_precision: Default::default(),
  178. amount_precision: Default::default(),
  179. min_qty: Default::default(),
  180. max_qty: Default::default(),
  181. min_notional: Default::default(),
  182. max_notional: Default::default(),
  183. multiplier: Default::default(),
  184. amount_size: Default::default(),
  185. },
  186. platform_rest: match exchange.as_str() {
  187. // "kucoin_usdt_swap" => {
  188. // Exchange::new(KucoinSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
  189. // }
  190. "gate_usdt_swap" => {
  191. Exchange::new(GateSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
  192. }
  193. // "gate_usdt_spot" => {
  194. // Exchange::new(GateSpot, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
  195. // }
  196. "binance_usdt_swap" => {
  197. Exchange::new(BinanceSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
  198. }
  199. // "binance_spot" => {
  200. // Exchange::new(BinanceSpot, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
  201. // }
  202. // "bitget_spot" => {
  203. // Exchange::new(BitgetSpot, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
  204. // }
  205. "bitget_usdt_swap" => {
  206. Exchange::new(BitgetSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
  207. }
  208. // "okex_usdt_swap" => {
  209. // Exchange::new(OkxSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
  210. // }
  211. "bybit_usdt_swap" => {
  212. Exchange::new(BybitSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
  213. }
  214. // "coinex_usdt_swap" => {
  215. // Exchange::new(CoinexSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
  216. // }
  217. // "htx_usdt_swap" => {
  218. // Exchange::new(HtxSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
  219. // }
  220. _ => {
  221. error!("203未找到对应的交易所rest枚举!");
  222. panic!("203未找到对应的交易所rest枚举!");
  223. }
  224. },
  225. max_buy_min_sell_cache: Default::default(),
  226. local_depths: Default::default(),
  227. is_update: Default::default(),
  228. running,
  229. prev_log_ready_timestamp: 0,
  230. log_ready_log_interval: 10 * 1000,
  231. cci_arc,
  232. agg_market: vec![],
  233. ref_price: vec![],
  234. predict: Default::default(),
  235. };
  236. for i in 0..=params.ref_exchange.len() - 1 {
  237. // 拼接不会消耗原字符串
  238. let tickers_key: String = format!("{}{}{}{}", params.ref_exchange[i], "@", params.ref_pair[i], "@ref");
  239. let ref_name_element = tickers_key.clone();
  240. let depths_key: String = tickers_key.clone();
  241. let market_update_time_key = tickers_key.clone();
  242. let market_update_interval_key = tickers_key.clone();
  243. let max_buy_min_sell_cache_key = tickers_key.clone();
  244. core_obj.tickers.insert(tickers_key, SpecialTicker::new());
  245. core_obj.ref_name.push(ref_name_element);
  246. core_obj.depths.insert(depths_key, Default::default());
  247. core_obj.market_update_time.insert(market_update_time_key, Default::default());
  248. core_obj.market_update_interval.insert(market_update_interval_key, Default::default());
  249. core_obj.max_buy_min_sell_cache.insert(max_buy_min_sell_cache_key, vec![Decimal::ZERO, Decimal::ZERO]);
  250. }
  251. let name = format!("{}{}{}", core_obj.exchange.clone(), "@", core_obj.symbol);
  252. let market_update_time_key = name.clone();
  253. let market_update_interval_key = name.clone();
  254. let tickers_key = name.clone();
  255. let depths_key = name.clone();
  256. let max_buy_min_sell_cache_key = name.clone();
  257. core_obj.trade_name = name;
  258. core_obj.market_update_time.insert(market_update_time_key, Default::default());
  259. core_obj.market_update_interval.insert(market_update_interval_key, Default::default());
  260. core_obj.tickers.insert(tickers_key, SpecialTicker::new());
  261. core_obj.depths.insert(depths_key, Default::default());
  262. core_obj.max_buy_min_sell_cache.insert(max_buy_min_sell_cache_key, vec![Decimal::ZERO, Decimal::ZERO]);
  263. // broker.newWs
  264. let mut price_alpha: Vec<Decimal> = Vec::new();
  265. for ref_pair_str in params.ref_pair {
  266. if params.pair.contains("1000") && !ref_pair_str.contains("1000") {
  267. price_alpha.push(dec!(1000.0));
  268. } else if !params.pair.contains("1000") && ref_pair_str.contains("1000") {
  269. price_alpha.push(dec!(0.001))
  270. } else {
  271. price_alpha.push(dec!(1.0));
  272. }
  273. }
  274. info!("价格系数:{:?}", price_alpha);
  275. return core_obj;
  276. }
  277. pub async fn update_force_order(&mut self, force_order: ForceOrder) {
  278. self.predictor.on_force_order(force_order).await;
  279. }
  280. // #[instrument(skip(self, data, trace_stack), level="TRACE")]
  281. pub async fn update_order(&mut self, data: Vec<OrderInfo>, trace_stack: TraceStack) {
  282. for order in data {
  283. self.update_local_order(order, trace_stack.clone()).await;
  284. }
  285. }
  286. // #[instrument(skip(self, data, trace_stack), level="TRACE")]
  287. pub async fn update_local_order(&mut self, data: OrderInfo, trace_stack: TraceStack) {
  288. // if data.filled != Decimal::ZERO {
  289. // // info!("\n\n");
  290. // // info!("接收到订单信息①:{:?}", data);
  291. // }
  292. /*
  293. 更新订单
  294. 首先直接复写本地订单
  295. 1、如果是开仓单
  296. 如果新增: 增加本地订单
  297. 如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按已成交量发送平仓订单 修改本地仓位
  298. 如果成交: 删除本地订单 发送平仓订单 修改本地仓位
  299. 2、如果是平仓单
  300. 如果新增: 增加本地订单
  301. 如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按未成交量发送平仓订单 修改本地仓位
  302. 如果成交: 删除本地订单 修改本地仓位
  303. NEW 可以从 ws / rest 来
  304. REMOVE 主要从 ws 来 必须包含 filled 和 filled_price 用于本地仓位推算 定期rest查过旧订单
  305. 为了防止下单失败依然有订单成交 本地需要做一个缓存
  306. */
  307. // 触发订单更新
  308. self.trade_order_update_time = Utc::now().timestamp_millis();
  309. // 更新跟踪
  310. if self.local_orders.contains_key(&data.client_id) {
  311. let mut order_info = self.local_orders.get(&data.client_id).unwrap().clone();
  312. if data.trace_stack.after_network != 0 { order_info.trace_stack = data.trace_stack.clone() }
  313. self.local_orders.insert(data.client_id.clone(), order_info);
  314. }
  315. // 新增订单推送 仅需要cid oid信息
  316. if data.status == "NEW" {
  317. // 更新oid信息 更新订单 loceltime信息(尤其是查单返回new的情况 必须更新 否则会误触发风控)
  318. if self.local_orders.contains_key(&data.client_id) {
  319. let mut order_info = self.local_orders.get(&data.client_id).unwrap().clone();
  320. order_info.order_id = data.order_id;
  321. order_info.local_time = Utc::now().timestamp_millis();
  322. self.local_orders.insert(data.client_id.clone(), order_info);
  323. }
  324. } else if data.status == "REMOVE" {
  325. // 如果在撤单记录中 说明此订单结束生命周期 可以移除记录
  326. if self.local_cancel_log.contains_key(&data.client_id) {
  327. self.local_cancel_log.remove(&data.client_id);
  328. }
  329. // 在cid缓存队列中 说明是本策略的订单
  330. if self.local_orders_backup.contains_key(&data.client_id) {
  331. // 不在已处理cid缓存队列中 说明还没参与过仓位计算 则执行订单计算
  332. if self.handled_orders_cid.contains(&data.client_id) {
  333. // info!("订单已经参与过仓位计算 拒绝重复进行计算, 订单号:{}", data.client_id);
  334. } else {
  335. // 添加进已处理队列
  336. self.handled_orders_cid.push(data.client_id.clone());
  337. // 提取成交信息 方向 价格 量
  338. let filled = data.filled;
  339. let side = self.local_orders_backup.get(&data.client_id).unwrap().side.clone();
  340. let mut filled_price = Decimal::ZERO;
  341. if data.filled_price > filled_price {
  342. filled_price = data.filled_price;
  343. } else {
  344. filled_price = self.local_orders_backup.get(&data.client_id).unwrap().price.clone();
  345. }
  346. // 只有开仓成交才触发onPosition
  347. // 如果漏推送 rest补充的订单查询信息过来 可能会导致 kd kk 推送出现计算分母为0的情况
  348. if filled > Decimal::ZERO {
  349. let mut filled_order = data.clone();
  350. filled_order.side = side.clone();
  351. // info!("移除本地订单:{:?}, local_by_orders: {:?}", filled_order, self.local_position_by_orders);
  352. // 如果是现货交易 还需要修改equity
  353. if self.exchange.contains("spot") {
  354. // 现货必须考虑fee 买入fee单位为币 卖出fee单位为u
  355. let fee = data.fee;
  356. // buy 开多
  357. if side == "kd" {
  358. self.local_buy_amount += filled - fee;
  359. self.local_buy_value += (filled - fee) * filled_price;
  360. let new_long_pos = self.local_position_by_orders.long_pos + filled - fee;
  361. if new_long_pos == Decimal::ZERO {
  362. self.local_position_by_orders.long_avg = Decimal::ZERO;
  363. self.local_position_by_orders.long_pos = Decimal::ZERO;
  364. } else {
  365. self.local_position_by_orders.long_avg = (self.local_position_by_orders.long_pos * self.local_position_by_orders.long_avg +
  366. filled * filled_price) / new_long_pos;
  367. self.local_position_by_orders.long_pos = new_long_pos;
  368. }
  369. self.local_cash -= filled * filled_price;
  370. self.local_coin = filled - fee;
  371. // sell 平多
  372. } else if side == "pd" {
  373. self.local_sell_amount += filled;
  374. self.local_sell_value += filled * filled_price;
  375. self.local_profit += filled * (filled_price - self.local_position_by_orders.long_avg);
  376. let new_long_pos = self.local_position_by_orders.long_pos - filled;
  377. if new_long_pos == Decimal::ZERO {
  378. self.local_position_by_orders.long_avg = Decimal::ZERO;
  379. self.local_position_by_orders.long_pos = Decimal::ZERO;
  380. } else {
  381. self.local_position_by_orders.long_pos = new_long_pos;
  382. }
  383. self.local_cash += filled * filled_price - fee;
  384. self.local_coin -= filled;
  385. // buy 平空
  386. } else if side == "pk" {
  387. self.local_buy_amount += filled - fee;
  388. self.local_buy_value += (filled - fee) * filled_price;
  389. self.local_profit += filled * (self.local_position_by_orders.short_avg - filled_price);
  390. let new_short_pos = self.local_position_by_orders.short_pos - filled - fee;
  391. if new_short_pos == Decimal::ZERO {
  392. self.local_position_by_orders.short_avg = Decimal::ZERO;
  393. self.local_position_by_orders.short_pos = Decimal::ZERO;
  394. } else {
  395. self.local_position_by_orders.short_pos = new_short_pos;
  396. }
  397. self.local_cash -= filled * filled_price;
  398. self.local_coin += filled - fee;
  399. // sell 开空
  400. } else if side == "kk" {
  401. self.local_sell_amount += filled;
  402. self.local_sell_value += filled * filled_price;
  403. let new_short_pos = self.local_position_by_orders.short_pos + filled;
  404. if new_short_pos == Decimal::ZERO {
  405. self.local_position_by_orders.short_avg = Decimal::ZERO;
  406. self.local_position_by_orders.short_pos = Decimal::ZERO;
  407. } else {
  408. self.local_position_by_orders.short_avg = (self.local_position_by_orders.short_pos * self.local_position_by_orders.short_avg
  409. + filled * filled_price) / new_short_pos;
  410. self.local_position_by_orders.short_pos = new_short_pos;
  411. }
  412. self.local_cash += filled * filled_price - fee;
  413. self.local_coin -= filled;
  414. } else {
  415. info!("错误的仓位方向{}", side);
  416. }
  417. // 合约订单流仓位计算
  418. } else {
  419. // buy 开多
  420. if side == "kd" {
  421. self.local_buy_amount += filled;
  422. self.local_buy_value += filled * filled_price;
  423. let new_long_pos = self.local_position_by_orders.long_pos + filled;
  424. if new_long_pos == Decimal::ZERO {
  425. self.local_position_by_orders.long_avg = Decimal::ZERO;
  426. self.local_position_by_orders.long_pos = Decimal::ZERO;
  427. } else {
  428. self.local_position_by_orders.long_avg = (self.local_position_by_orders.long_pos *
  429. self.local_position_by_orders.long_avg + filled * filled_price) / new_long_pos;
  430. self.local_position_by_orders.long_pos = self.local_position_by_orders.long_pos + filled;
  431. }
  432. // sell 开空
  433. } else if side == "kk" {
  434. self.local_sell_amount += filled;
  435. self.local_sell_value += filled * filled_price;
  436. let new_short_pos = self.local_position_by_orders.short_pos + filled;
  437. if new_short_pos == Decimal::ZERO {
  438. self.local_position_by_orders.short_avg = Decimal::ZERO;
  439. self.local_position_by_orders.short_pos = Decimal::ZERO;
  440. } else {
  441. self.local_position_by_orders.short_avg = (self.local_position_by_orders.short_pos * self.local_position_by_orders.short_avg + filled * filled_price) / new_short_pos;
  442. self.local_position_by_orders.short_pos = self.local_position_by_orders.short_pos + filled;
  443. }
  444. // sell 平多
  445. } else if side == "pd" {
  446. self.local_sell_amount += filled;
  447. self.local_sell_value += filled * filled_price;
  448. self.local_position_by_orders.long_pos = self.local_position_by_orders.long_pos - filled;
  449. if self.local_position_by_orders.long_pos == Decimal::ZERO {
  450. self.local_position_by_orders.long_avg = Decimal::ZERO;
  451. }
  452. // buy 平空
  453. } else if side == "pk" {
  454. self.local_buy_amount += filled;
  455. self.local_buy_value += filled * filled_price;
  456. self.local_position_by_orders.short_pos = self.local_position_by_orders.short_pos - filled;
  457. if self.local_position_by_orders.short_pos == Decimal::ZERO {
  458. self.local_position_by_orders.short_avg = Decimal::ZERO;
  459. }
  460. } else {
  461. error!("错误的仓位方向{}", side);
  462. }
  463. // 统计合约交易手续费 正fee为扣手续费 负fee为返佣
  464. if data.fee > Decimal::ZERO {
  465. self.local_profit -= data.fee;
  466. }
  467. }
  468. // info!("成交单耗时数据:{}", time_record.to_string());
  469. info!("更新推算仓位 {:?}", self.local_position_by_orders);
  470. // 本地计算利润
  471. self._print_local_trades_summary();
  472. // 打印各类信息
  473. self.strategy.local_orders = self.local_orders.clone();
  474. self.strategy._print_summary();
  475. info!("------------------------------------------------------------------------------------");
  476. }
  477. // 每次有订单变动就触发一次策略
  478. if self.mode_signal == 0 && self.ready == 1 {
  479. // 更新交易数据
  480. self.update_trade_msg();
  481. // 触发策略挂单逻辑
  482. // 更新策略时间
  483. self.strategy.local_time = Utc::now().timestamp_millis();
  484. // trace_stack.on_before_strategy();
  485. let mut order = self.strategy.do_strategy(&mut self.predictor, &self.local_orders, &self.local_position, &self.local_coin, &self.local_cash);
  486. // trace_stack.on_after_strategy();
  487. // 记录指令触发信息
  488. if order.is_not_empty() {
  489. // info!("触发onOrder");
  490. self._update_local_orders(&mut order);
  491. //交易所处理订单信号
  492. let mut platform_rest_fb = self.platform_rest.clone_box();
  493. let mut ts = trace_stack.clone();
  494. ts.set_order_command(order.to_string());
  495. // ts.on_before_send_thread();
  496. // ts.on_before_send();
  497. // info!("update_order 订单指令:{:?}", order);
  498. spawn(async move {
  499. platform_rest_fb.command_order(&mut order, &mut ts).await;
  500. });
  501. }
  502. }
  503. }
  504. } else {
  505. // info!("订单不属于本策略 拒绝进行仓位计算: {}", data.client_id);
  506. }
  507. if self.local_orders.contains_key(&data.client_id) {
  508. // 成交数量不为空,则打印耗时追踪
  509. if data.filled > Decimal::ZERO {
  510. let local_order = self.local_orders.get(&data.client_id).unwrap();
  511. info!("订单耗时追踪:{:?}", local_order.trace_stack.to_string());
  512. }
  513. // debug!("删除本地订单, client_id:{:?}", data);
  514. self.local_orders.remove(&data.client_id);
  515. } else {
  516. // debug!("该订单不在本地挂单表中, order:{:?}", data);
  517. }
  518. } else {
  519. error!("未知的订单事件类型:{:?}", data);
  520. }
  521. }
  522. // #[instrument(skip(self), level="TRACE")]
  523. pub fn _print_local_trades_summary(&mut self) {
  524. // 计算本地累计利润
  525. let local_buy_amount = self.local_buy_amount.round_dp(5);
  526. let local_buy_value = self.local_buy_value.round_dp(5);
  527. let local_sell_amount = self.local_sell_amount.round_dp(5);
  528. let local_sell_value = self.local_sell_value.round_dp(5);
  529. if self.strategy.mp > Decimal::ZERO {
  530. let unrealized = (local_buy_amount - local_sell_amount) * self.strategy.mp;
  531. let realized = local_sell_value - local_buy_value;
  532. let local_profit = (unrealized + realized).round_dp(5);
  533. self.strategy.local_profit = local_profit;
  534. info!("买量 {},卖量 {},买额{},卖额{}", local_buy_amount, local_sell_amount, local_buy_value, local_sell_value);
  535. }
  536. }
  537. // 检测初始数据是否齐全
  538. // #[instrument(skip(self), level="TRACE")]
  539. pub fn check_ready(&mut self) {
  540. // 检查 ticker 行情
  541. // for i in &self.ref_name {
  542. // if self.tickers.is_empty() || !self.tickers.contains_key(i) {
  543. // self.log_ready_status(format!("529参考盘口ticker未准备好: {:?}", self.tickers));
  544. // return;
  545. // } else {
  546. // if self.tickers.get(i).unwrap().buy == dec!(0) || self.tickers.get(i).unwrap().sell == dec!(0) {
  547. // self.log_ready_status(format!("533参考盘口ticker未准备好: {:?}", self.tickers));
  548. // return;
  549. // }
  550. // }
  551. // }
  552. // if self.tickers.contains_key(&self.trade_name) {
  553. // if self.tickers.get(&self.trade_name).unwrap().buy == dec!(0) || self.tickers.get(&self.trade_name).unwrap().sell == dec!(0) {
  554. // self.log_ready_status(format!("540交易盘口ticker未准备好: {:?}", self.tickers));
  555. // return;
  556. // }
  557. // } else {
  558. // self.log_ready_status(format!("544交易盘口ticker未准备好: {:?}", self.tickers));
  559. // return;
  560. // }
  561. // // 检查 market 行情
  562. // self.agg_market = self.get_all_market_data();
  563. // if self.agg_market.len() != LENGTH * (1usize + self.ref_num as usize) {
  564. // self.log_ready_status(format!("550聚合行情未准备好: market长度:{}, 检验数: {}", self.agg_market.len(), LENGTH * (1usize + self.ref_num as usize)));
  565. // return;
  566. // } else {
  567. // // 如果准备就绪,则可以开始交易
  568. // info!("----------------------------------聚合行情准备就绪,可以开始交易---------------------------------");
  569. // self.predictor.market_info_handler(&self.agg_market);
  570. // self.ready = 1;
  571. // }
  572. self.ready = 1;
  573. }
  574. // #[instrument(skip(self, msg), level="TRACE")]
  575. pub fn log_ready_status(&mut self, msg: String) {
  576. // 隔一会再打印未准备就绪的台词
  577. let now_timestamp = Utc::now().timestamp_millis();
  578. if now_timestamp - self.prev_log_ready_timestamp > self.log_ready_log_interval {
  579. self.prev_log_ready_timestamp = now_timestamp;
  580. info!("{}", msg);
  581. }
  582. }
  583. pub async fn on_trade(&mut self, trade: &Trade, _name_ref: &String, _trace_stack: &mut TraceStack, index: usize) {
  584. self.predictor.on_trade(trade, index).await;
  585. }
  586. pub async fn on_ticker(&mut self, ticker: &Ticker, _trace_stack: &mut TraceStack) {
  587. self.predictor.on_ticker(ticker).await;
  588. }
  589. pub async fn on_record(&mut self, record: &Record) {
  590. self.predictor.on_record(record).await;
  591. }
  592. // #[instrument(skip(self, depth, name_ref, trace_stack), level="TRACE")]
  593. pub async fn on_depth(&mut self, depth: &Depth, name_ref: &String, trace_stack: &mut TraceStack, index: usize) {
  594. // ================================ 刷新更新间隔 =========================================
  595. let now_time = depth.time.to_i64().unwrap();
  596. if self.market_update_time.contains_key(name_ref) && *self.market_update_time.get(name_ref).unwrap() != 0i64 {
  597. let interval = Decimal::from(now_time - self.market_update_time.get(name_ref).unwrap());
  598. if *self.market_update_interval.get(name_ref).unwrap() == dec!(0) {
  599. self.market_update_interval.insert(name_ref.clone(), interval);
  600. } else {
  601. let value = self.market_update_interval.get(name_ref).unwrap();
  602. self.market_update_interval.insert(name_ref.clone(), value * dec!(0.999) + interval * dec!(0.001));
  603. }
  604. }
  605. self.market_update_time.insert(self.trade_name.clone(), now_time);
  606. self.market_update_time.insert(self.ref_name[0].clone(), now_time);
  607. // ================================ 在系统已经准备就绪的情况下,更新相关参数 =========================================
  608. if self.mode_signal == 0 && self.ready == 1 {
  609. // 更新公平价格
  610. self.predictor.on_depth(depth, index).await;
  611. // 触发事件撤单逻辑
  612. // 更新策略时间
  613. self.strategy.local_time = Utc::now().timestamp_millis();
  614. // 产生交易信号
  615. let mut orders = self.strategy.do_strategy(&mut self.predictor, &self.local_orders, &self.local_position, &self.local_coin, &self.local_cash);
  616. trace_stack.on_after_strategy();
  617. if orders.is_not_empty() {
  618. let mut platform_rest_fb = self.platform_rest.clone_box();
  619. // 先更新本地记录再发单。
  620. self._update_local_orders(&mut orders);
  621. // info!("订单指令:{:?}", orders);
  622. let mut ts = trace_stack.clone();
  623. spawn(async move {
  624. platform_rest_fb.command_order(&mut orders, &mut ts).await;
  625. });
  626. }
  627. }
  628. // 同步更新核心数据到中控数据显示和策略层
  629. {
  630. let mut unrealized_pn_l = self.local_profit;
  631. unrealized_pn_l.rescale(4);
  632. let mut now_price = self.strategy.mp;
  633. now_price.rescale(8);
  634. let mut now_balance = (self.local_cash + self.local_coin * self.predictor.mid_price) / self.used_pct;
  635. now_balance.rescale(4);
  636. let mut cci = self.cci_arc.lock().await;
  637. cci.unrealized_pn_l = unrealized_pn_l;
  638. cci.now_price = now_price;
  639. cci.now_balance = now_balance;
  640. self.predictor.on_balance(now_balance).await;
  641. }
  642. }
  643. // #[instrument(skip(self, data), level="TRACE")]
  644. pub async fn update_position(&mut self, data: Vec<Position>) {
  645. if data.is_empty() {
  646. return;
  647. }
  648. let mut position = LocalPosition::new();
  649. let mut pos_avg_price = Decimal::ZERO;
  650. for pos in &data {
  651. pos_avg_price = pos.price;
  652. if pos.position_mode == PositionModeEnum::Long {
  653. position.long_pos = pos.amount;
  654. position.long_avg = pos.price;
  655. } else if pos.position_mode == PositionModeEnum::Short {
  656. position.short_pos = pos.amount.abs();
  657. position.short_avg = pos.price;
  658. }
  659. }
  660. // 更新仓位信息
  661. if position != self.local_position {
  662. info!("收到新的仓位推送: {:?}", data);
  663. info!("更新本地仓位: {:?}", position);
  664. info!("\n\n");
  665. self.local_position = position;
  666. }
  667. // 持仓相关信息
  668. let mut pos = self.local_position_by_orders.long_pos - self.local_position_by_orders.short_pos;
  669. if !self.exchange.contains("spot") {
  670. pos = self.local_position.long_pos - self.local_position.short_pos;
  671. }
  672. pos.rescale(8);
  673. self.predictor.on_inventory(&pos, &pos_avg_price, &self.strategy.min_amount_value).await;
  674. let mut entry_price;
  675. if pos.gt(&Decimal::ZERO) {
  676. entry_price = self.local_position_by_orders.long_avg;
  677. } else {
  678. entry_price = self.local_position_by_orders.short_avg;
  679. }
  680. entry_price.rescale(8);
  681. let mut cci = self.cci_arc.lock().await;
  682. cci.pos = pos;
  683. cci.entry_price = entry_price;
  684. }
  685. // #[instrument(skip(self), level="TRACE")]
  686. // pub fn on_agg_market(&mut self) {
  687. // /* 处理聚合行情
  688. // 1. 获取聚合行情
  689. // 2. 更新预测器
  690. // 3. 触发tick回测
  691. // */
  692. // // 更新聚合市场数据
  693. // // 更新聚合市场信息
  694. // self.agg_market = self.get_all_market_data();
  695. // // 更新预测器
  696. // self.predictor.market_info_handler(&self.agg_market);
  697. // }
  698. // #[instrument(skip(self), level="TRACE")]
  699. pub fn update_trade_msg(&mut self) {
  700. // 更新保证金
  701. self.local_cash = self.local_cash.round_dp(10);
  702. self.local_coin = self.local_coin.round_dp(10);
  703. // 使用本地推算仓位
  704. // self.trade_msg.position = self.local_position_by_orders.clone();
  705. // self.trade_msg.orders = self.local_orders.clone();
  706. // 更新 ref
  707. let mut ref_tickers: BTreeMap<String, Ticker> = BTreeMap::new();
  708. for i in &self.ref_name {
  709. let bp = self.tickers.get(i).unwrap().buy;
  710. let ap = self.tickers.get(i).unwrap().sell;
  711. ref_tickers.insert(i.clone(), Ticker {
  712. time: Decimal::ZERO,
  713. high: Default::default(),
  714. low: Default::default(),
  715. sell: ap,
  716. buy: bp,
  717. last: Default::default(),
  718. volume: Default::default(),
  719. open_interest: Default::default(),
  720. });
  721. }
  722. self.ref_price = self.predictor.get_ref_price(&ref_tickers);
  723. }
  724. // 本地记录所有报单信息
  725. // #[instrument(skip(self, orders), level="TRACE")]
  726. pub fn _update_local_orders(&mut self, orders: &mut OrderCommand) {
  727. orders.limits_open.extend(orders.limits_close.clone());
  728. if !orders.limits_open.is_empty() {
  729. for j in orders.limits_open.keys() {
  730. let order_info = OrderInfo {
  731. symbol: self.symbol.clone(),
  732. amount: Decimal::from_str(orders.limits_open.get(j).unwrap()[0].as_str()).unwrap(),
  733. side: orders.limits_open.get(j).unwrap()[1].clone(),
  734. price: Decimal::from_str(orders.limits_open.get(j).unwrap()[2].as_str()).unwrap(),
  735. client_id: orders.limits_open.get(j).unwrap()[3].clone(),
  736. filled_price: Default::default(),
  737. filled: Decimal::ZERO,
  738. order_id: "".to_string(),
  739. local_time: self.strategy.local_time,
  740. create_time: self.strategy.local_time,
  741. status: "".to_string(),
  742. fee: Default::default(),
  743. trace_stack: TraceStack::new(0, Instant::now()),
  744. };
  745. // 本地挂单表
  746. self.local_orders.insert(orders.limits_open.get(j).unwrap()[3].clone(), order_info.clone());
  747. // 本地缓存表
  748. self.local_orders_backup.insert(orders.limits_open.get(j).unwrap()[3].clone(), order_info);
  749. // 本地缓存cid表
  750. self.local_orders_backup_cid.push(orders.limits_open.get(j).unwrap()[3].clone());
  751. }
  752. }
  753. if !orders.cancel.is_empty() {
  754. for cancel_key in orders.cancel.keys() {
  755. let cid = orders.cancel.get(cancel_key).unwrap()[0].clone();
  756. if self.local_cancel_log.contains_key(&cid) {
  757. let num = self.local_cancel_log.get(&cid).unwrap() + 1;
  758. self.local_cancel_log.insert(cid, num);
  759. } else {
  760. self.local_cancel_log.insert(cid, 0);
  761. }
  762. }
  763. }
  764. let max_len = 9999usize;
  765. // 清除过于久远的历史记录
  766. if self.local_orders_backup_cid.len() > max_len {
  767. let cid = self.local_orders_backup_cid[0].clone();
  768. // 判断是否超过1个小时 如果超过则移除历史记录
  769. if self.local_orders_backup.contains_key(&cid) {
  770. let local_time = self.local_orders_backup.get(&cid).unwrap().local_time;
  771. if Utc::now().timestamp_millis() - local_time > 3600000 {
  772. self.local_orders_backup.remove(&cid);
  773. self.local_orders_backup_cid.retain(|x| *x != cid)
  774. }
  775. }
  776. }
  777. if self.handled_orders_cid.len() > max_len {
  778. self.handled_orders_cid.remove(0usize);
  779. }
  780. }
  781. // 获取深度信息
  782. // #[instrument(skip(self), level="TRACE")]
  783. pub fn get_all_market_data(&mut self) -> Vec<Decimal> {
  784. // 只能定时触发 组合市场信息=交易盘口+参考盘口
  785. let mut market: Vec<Decimal> = Vec::new();
  786. // 获取交易盘口市场信息
  787. let mut data: Vec<Decimal> = self.local_depths.get(&self.trade_name).unwrap().clone();
  788. self.is_update.insert(self.symbol.clone(), true);
  789. let mut max_min_price = self.max_buy_min_sell_cache.get(&self.trade_name).unwrap().clone();
  790. market.append(&mut data);
  791. market.append(&mut max_min_price);
  792. for i in &self.ref_name {
  793. // 获取参考盘口市场信息
  794. data = self.local_depths.get(i).unwrap().clone();
  795. self.is_update.insert(i.clone(), true);
  796. max_min_price = self.max_buy_min_sell_cache.get(i).unwrap().clone();
  797. data.append(&mut max_min_price);
  798. market.append(&mut data);
  799. }
  800. return market;
  801. }
  802. // #[instrument(skip(self), level="TRACE")]
  803. pub async fn get_exchange_info(&mut self) {
  804. self.market = self.platform_rest.get_self_market();
  805. info!(?self.market);
  806. }
  807. // #[instrument(skip(self, data), level="TRACE")]
  808. pub async fn update_equity(&mut self, data: Account) {
  809. /*
  810. 更新保证金信息
  811. 合约一直更新
  812. 现货只有当出现异常时更新
  813. */
  814. if self.exchange.contains("spot") {
  815. return;
  816. }
  817. self.local_cash = data.balance * self.used_pct;
  818. self.predictor.on_balance(self.local_cash).await;
  819. }
  820. // #[instrument(skip(self), level="TRACE")]
  821. pub async fn update_equity_rest_swap(&mut self) {
  822. match self.platform_rest.get_account().await {
  823. Ok(account) => {
  824. /*
  825. 更新保证金信息
  826. 合约一直更新
  827. 现货只有当出现异常时更新
  828. */
  829. self.local_cash = account.balance * self.used_pct
  830. }
  831. Err(e) => {
  832. info!("获取账户信息错误: {:?}", e);
  833. }
  834. }
  835. }
  836. // #[instrument(skip(self), level="TRACE")]
  837. pub async fn update_equity_rest_spot(&mut self) {
  838. match self.platform_rest.get_spot_account().await {
  839. Ok(mut val) => {
  840. // 如果返回的数组里没有交易货币,则补充交易货币
  841. if !val.iter().any(|a| a.coin.to_uppercase().eq(&self.base.to_uppercase())) {
  842. let mut base_coin_account = Account::new();
  843. base_coin_account.coin = self.base.to_uppercase();
  844. val.push(base_coin_account);
  845. }
  846. for account in val {
  847. // 交易货币
  848. if self.base.to_uppercase() == account.coin {
  849. self.local_coin = account.balance;
  850. }
  851. // 本位货币
  852. if self.quote.to_uppercase() == account.coin {
  853. self.local_cash = account.balance;
  854. }
  855. }
  856. }
  857. Err(err) => {
  858. error!("获取仓位信息异常: {}", err);
  859. }
  860. }
  861. }
  862. // #[instrument(skip(self), level="TRACE")]
    /// Runs the risk-control checks; each triggered check stores a reason in
    /// `exit_msg` and calls `stop()`.
    ///
    /// Returns early (warning only, no stop) while `start_cash` or `mp` is zero.
    /// A triggered check does not return, so later checks still run and may
    /// overwrite `exit_msg`.
    ///
    /// NOTE(review): the divisions by `max_equity`, `start_equity` and
    /// `mid_price` are unguarded — confirm they cannot be zero here.
    pub async fn check_risk(&mut self) {
        // Parameter sanity checks.
        if self.strategy.start_cash == Decimal::ZERO {
            warn!("请检查交易账户余额");
            warn!(?self.strategy.start_cash);
            return;
        }
        if self.strategy.mp == Decimal::ZERO {
            warn!("请检查最新价格");
            warn!(?self.strategy.mp);
            return;
        }
        // Drawdown check 1 (non-spot only): drawdown of equity from its peak.
        if !self.exchange.contains("spot") {
            let draw_back = Decimal::ONE - self.strategy.equity / self.strategy.max_equity;
            if draw_back > self.stop_loss {
                let exit_msg = format!("{} 总资金吊灯回撤 {}。当前净值:{}, 最高净值{},触发止损,准备停机。",
                                       self.params.account_name, draw_back, self.strategy.equity, self.strategy.max_equity);
                warn!(exit_msg);
                self.exit_msg = exit_msg;
                self.stop().await;
            }
        }
        // Drawdown check 2: local profit relative to the starting equity.
        let draw_back = self.local_profit / self.strategy.start_equity;
        if draw_back < -self.stop_loss {
            let exit_msg = format!("{} 交易亏损,触发止损,准备停机。", self.params.account_name);
            warn!(exit_msg);
            self.exit_msg = exit_msg;
            self.stop().await;
        }
        // Drawdown check 3 (disabled): drawdown including floating loss.
        // let floating_profit = if !self.local_position.long_pos.is_zero() && !self.local_position.long_avg.is_zero() && !self.strategy.mp.is_zero() {
        //     let profit_price = self.strategy.mp - self.local_position.long_avg;
        //
        //     profit_price * self.local_position.long_pos
        // } else if !self.local_position.short_pos.is_zero() && !self.local_position.short_pos.is_zero() && !self.strategy.mp.is_zero() {
        //     let profit_price = self.local_position.short_avg - self.strategy.mp;
        //
        //     profit_price * self.local_position.short_pos
        // } else {
        //     Decimal::ZERO
        // };
        // let floating_draw_back = floating_profit / self.strategy.start_equity;
        // if floating_draw_back < -self.stop_loss {
        //     let exit_msg = format!("{} 浮动亏损太大{},当前价格:{}, 准备停机。", self.params.account_name, floating_profit, self.strategy.mp);
        //     warn!(exit_msg);
        //     self.exit_msg = exit_msg;
        //     self.stop().await;
        // }
        // Order-latency check: the average request delay may not exceed 5000 ms.
        if self.ready == 1 && self.platform_rest.get_request_avg_delay() > dec!(5000) {
            let exit_msg = format!("{} 延迟爆表 触发风控 准备停机。", self.params.account_name);
            warn!(exit_msg);
            self.exit_msg = exit_msg;
            self.stop().await;
        }
        // Position-mismatch check (disabled), contract mode only.
        // if !self.exchange.contains("spot") {
        //     let local_position_by_orders_pos = (self.local_position_by_orders.long_pos - self.local_position_by_orders.short_pos).abs();
        //     let local_position_pos = (self.local_position.long_pos - self.local_position.short_pos).abs();
        //
        //     if local_position_by_orders_pos != local_position_pos {
        //         warn!("{}发现仓位异常", self.params.account_name);
        //         warn!(?self.local_position_by_orders, ?self.local_position);
        //         self.position_check_series.push(1);
        //     } else {
        //         self.position_check_series.push(0);
        //     }
        //
        //     // Bound the length of self.position_check_series.
        //     if self.position_check_series.len() > 6 {
        //         self.position_check_series.remove(0);
        //     }
        //
        //     // Consecutive mismatches.
        //     if self.position_check_series.iter().sum::<i8>() >= 6 {
        //         let exit_msg = format!("{} 合约连续检查本地仓位和推算仓位不符合,退出。", self.params.account_name);
        //         warn!(exit_msg);
        //         self.exit_msg = exit_msg;
        //         self.stop().await;
        //     }
        // }
        // Order-size check: a zero total open amount is treated as abnormal.
        if self.strategy.total_amount == Decimal::ZERO {
            let exit_msg = format!("{} 开仓量为0,退出。", self.params.account_name);
            warn!(exit_msg);
            self.exit_msg = exit_msg;
            self.stop().await;
        }
        // Market-data staleness check over every reference market plus the trade market.
        let mut exchange_names = self.ref_name.clone();
        exchange_names.push(self.trade_name.clone());
        for exchange_name in exchange_names {
            let now_time_millis = Utc::now().timestamp_millis();
            // Panics if the market has no entry in `market_update_time`.
            let last_update_millis = self.market_update_time.get(&exchange_name).unwrap();
            let delay = now_time_millis - last_update_millis;
            let limit = global::public_params::MARKET_DELAY_LIMIT;
            if self.ready == 1 && delay > limit {
                let exit_msg = format!("{} ticker_name:{}, delay:{}ms,行情更新延迟过高,退出。",
                                       self.params.account_name, exchange_name, delay);
                warn!(?now_time_millis, ?last_update_millis, ?limit);
                warn!(exit_msg);
                self.exit_msg = exit_msg;
                self.stop().await;
            }
        }
        // Iterate over a copy because `stop()` needs `&mut self` inside the loop.
        let local_orders = self.local_orders.clone();
        // Stuck-order check.
        for (client_id, order) in local_orders {
            // An order tracked locally for more than 5 minutes is suspected to be
            // lost (not necessarily so).
            if Utc::now().timestamp_millis() - order.local_time > 5 * 60 * 1000 {
                let exit_msg = format!("{}订单停留过长,怀疑异常,退出,cid:{}。", self.params.account_name, client_id);
                warn!(exit_msg);
                self.exit_msg = exit_msg;
                self.stop().await;
            }
        }
        // // Average-entry-price check (floating-profit check), disabled.
        // if self.strategy.long_pos_bias != Decimal::ZERO {
        //     if self.strategy.long_hold_value > Decimal::TWO * self.strategy.min_amount_value {
        //         if self.strategy.long_pos_bias > dec!(4) || self.strategy.long_pos_bias < -Decimal::TWO {
        //             let exit_msg = format!("{} long_pos_bias: {},持仓均价异常(mp: {}, avg: {}),退出。", self.params.account_name, self.strategy.long_pos_bias, self.strategy.mp, self.strategy.pos.long_avg);
        //             warn!(exit_msg);
        //             self.exit_msg = exit_msg;
        //             self.stop().await;
        //         }
        //     }
        // }
        // if self.strategy.short_pos_bias != Decimal::ZERO {
        //     if self.strategy.short_hold_value > Decimal::TWO * self.strategy.min_amount_value {
        //         if self.strategy.short_pos_bias > dec!(4) || self.strategy.short_pos_bias < -Decimal::TWO {
        //             let exit_msg = format!("{} short_pos_bias: {},持仓均价异常(mp: {}, avg: {}),退出。", self.params.account_name, self.strategy.short_pos_bias, self.strategy.mp, self.strategy.pos.short_avg);
        //             warn!(exit_msg);
        //             self.exit_msg = exit_msg;
        //             self.stop().await;
        //         }
        //     }
        // }
        // Cancel-failure check: a cid whose cancel counter exceeds 300 could not
        // be cancelled for a long time.
        for (client_id, cancel_delay) in self.local_cancel_log.clone() {
            if cancel_delay > 300 {
                let exit_msg = format!("{} 长时间无法撤销,client_id: {},退出。", self.params.account_name, client_id);
                warn!(exit_msg);
                warn!(?self.strategy.ref_price, ?self.strategy.mp);
                self.exit_msg = exit_msg;
                self.stop().await;
            }
        }
        // Pricing check: fair price deviating more than 3% from the mid price.
        if self.ready == 1 && (self.predictor.fair_price - self.predictor.mid_price).abs() / self.predictor.mid_price > dec!(0.03) {
            let exit_msg = format!("{} 定价偏离过大,怀疑定价异常,退出。", self.params.account_name);
            warn!(exit_msg);
            warn!(?self.predictor.fair_price, ?self.predictor.mid_price);
            self.exit_msg = exit_msg;
            self.stop().await;
        }
    }
  1021. // #[instrument(skip(self), level="TRACE")]
  1022. pub async fn buy_token(&mut self) {
  1023. // 买入平台币
  1024. // 获取U数量,平台币数量
  1025. // 更新账户
  1026. let mut cash = Decimal::ZERO;
  1027. let mut token = Decimal::ZERO;
  1028. let token_param = get_exchange_token(&self.exchange);
  1029. if token_param.token == "***" {
  1030. error!("购买平台币失败,未找到交易所的平台币!");
  1031. return;
  1032. }
  1033. match self.platform_rest.get_spot_account().await {
  1034. Ok(val) => {
  1035. for account in val {
  1036. if account.coin == "USDT".to_string() {
  1037. cash += account.balance;
  1038. }
  1039. if token_param.token == account.coin {
  1040. token += account.balance;
  1041. }
  1042. }
  1043. }
  1044. Err(err) => {
  1045. error!("购买{}-获取账户失败 {}", token_param.token, err);
  1046. }
  1047. }
  1048. info!("持u {} , 持有平台币 {}", cash, token);
  1049. match self.platform_rest.get_ticker_symbol(format!("{}_USDT", token_param.token)).await {
  1050. Ok(val) => {
  1051. let mp = (val.buy + val.sell) / Decimal::TWO;
  1052. let token_value = token * mp;
  1053. if token_value < token_param.limit_value {
  1054. info!("平台币 {} 数量过少,需要补充。", token_param.token);
  1055. let need_cash: Decimal = Decimal::TWO * Decimal::ONE_HUNDRED;
  1056. if cash > need_cash {
  1057. info!("准备买入{}", token_param.token);
  1058. match self.platform_rest.take_order_symbol(token_param.token, Decimal::ONE, "t-888", "kd", mp * Decimal::from_str("1.001").unwrap(), Decimal::from_str("50").unwrap() / mp).await {
  1059. Ok(value) => {
  1060. info!("买入平台币下单成功: {:?}", value);
  1061. }
  1062. Err(error) => {
  1063. error!("买入平台币下单失败: {}", error)
  1064. }
  1065. }
  1066. } else {
  1067. info!("账户余额:{}{},至少需要:{}{}, 不执行买入{}操作!", cash, self.quote, need_cash, self.quote, token_param.token);
  1068. }
  1069. } else {
  1070. info!("平台币{}数量充足!", token_param.token);
  1071. }
  1072. }
  1073. Err(err) => {
  1074. error!("购买平台币-获取平台币行情失败 {}", err);
  1075. }
  1076. }
  1077. }
  1078. // #[instrument(skip(self, target_hold_coin), level="TRACE")]
  1079. pub async fn check_position(&mut self, target_hold_coin: Decimal, is_first_clear: bool) -> bool {
  1080. info!("------------------------------------------------------------------------------------------------------------");
  1081. info!("步骤一:检查挂单:");
  1082. let mut is_order_clear = false;
  1083. match self.platform_rest.cancel_orders_all().await {
  1084. Ok(val) => {
  1085. let length = val.len();
  1086. is_order_clear = length == 0;
  1087. info!("已清空所有挂单({}条)", length);
  1088. for o in val {
  1089. info!(" {:?}", o);
  1090. }
  1091. }
  1092. Err(err) => {
  1093. warn!("取消所有订单异常({}),启动备用方法。", err);
  1094. match self.platform_rest.cancel_orders().await {
  1095. Ok(val) => {
  1096. let length = val.len();
  1097. is_order_clear = length == 0;
  1098. info!("清空所有挂单({}条):{:?}", length, val);
  1099. }
  1100. Err(exc) => {
  1101. error!("清空当前币对订单异常: {}", exc);
  1102. }
  1103. }
  1104. }
  1105. }
  1106. info!("挂单检查完毕。");
  1107. info!("");
  1108. // 如果不是第一次检查,留500ms给交易所撮合
  1109. if !is_first_clear {
  1110. sleep(Duration::from_millis(500)).await;
  1111. }
  1112. info!("步骤二:检查仓位:");
  1113. let is_position_clear;
  1114. if self.exchange.contains("spot") { // 现货
  1115. is_position_clear = self.check_position_spot(target_hold_coin.clone()).await == 0;
  1116. info!("检查遗漏仓位(现货),目标持仓:{}USDT", target_hold_coin);
  1117. } else { // 合约
  1118. is_position_clear = self.check_position_swap().await == 0;
  1119. info!("遗漏仓位检查完毕(合约)!");
  1120. }
  1121. info!("------------------------------------------------------------------------------------------------------------");
  1122. info!("");
  1123. return is_order_clear && is_position_clear;
  1124. }
  1125. // #[instrument(skip(self, target_hold_coin), level="TRACE")]
  1126. pub async fn check_position_spot(&mut self, target_hold_coin: Decimal) -> usize {
  1127. let mut length = 0;
  1128. match self.platform_rest.get_spot_account().await {
  1129. Ok(mut val) => {
  1130. // 如果返回的数组里没有交易货币,则补充交易货币
  1131. if !val.iter().any(|a| a.coin.to_uppercase().eq(&self.base.to_uppercase())) {
  1132. let mut base_coin_account = Account::new();
  1133. base_coin_account.coin = self.base.to_uppercase();
  1134. val.push(base_coin_account);
  1135. }
  1136. // 仓位补货、卖货
  1137. for account in val {
  1138. let coin_name = account.coin.to_uppercase();
  1139. if check_coin(&self.exchange, &coin_name) {
  1140. continue;
  1141. }
  1142. let symbol = format!("{}_USDT", coin_name);
  1143. let mut _hold_coin = Decimal::ZERO;
  1144. // 如果是交易币,则设定仓位目标
  1145. if coin_name.eq(self.base.to_uppercase().as_str()) {
  1146. _hold_coin = target_hold_coin;
  1147. }
  1148. let ap;
  1149. let bp;
  1150. let mp;
  1151. match self.platform_rest.get_ticker().await {
  1152. Ok(ticker) => {
  1153. ap = ticker.sell;
  1154. bp = ticker.buy;
  1155. mp = (ap + bp) / Decimal::TWO;
  1156. let coin_value = account.balance * mp;
  1157. let diff = _hold_coin - coin_value;
  1158. let side;
  1159. let price = Decimal::ZERO;
  1160. let amount;
  1161. if diff > Decimal::from(10) {
  1162. side = "kd";
  1163. // price = mp*1.001;
  1164. amount = diff;
  1165. } else if diff < Decimal::from(-10) {
  1166. side = "kk";
  1167. // price = mp*0.999;
  1168. amount = -diff / mp;
  1169. } else {
  1170. // 不需要调整说明没有仓位了。
  1171. continue;
  1172. }
  1173. // 需要调整说明有仓位。
  1174. length = 1;
  1175. info!(?ticker);
  1176. info!("需要调整现货仓位 {}USDT(目标:{}USDT) 共计{}{}。", diff, _hold_coin, amount, coin_name);
  1177. let ts = TraceStack::new(0, Instant::now());
  1178. // ts.on_before_send();
  1179. // 价格0,市价单
  1180. match self.platform_rest.take_order_symbol(symbol.clone(), Decimal::ONE, utils::generate_client_id(None).as_str(), side, price, amount).await {
  1181. Ok(v) => {
  1182. // ts.on_after_send();
  1183. info!("side: {}, {} 下单,{:?}, {}", side, symbol, v, ts.to_string());
  1184. // 执行完当前币对 结束循环
  1185. continue;
  1186. }
  1187. Err(ex) => {
  1188. // ts.on_after_send();
  1189. error!("side: {}, {} {}, {}", side, symbol, ex, ts.to_string());
  1190. // 执行完当前币对 结束循环
  1191. continue;
  1192. }
  1193. }
  1194. }
  1195. Err(_e) => {
  1196. error!("清仓中- 获取 {} 币对行情错误,该币对无法调整仓位。", symbol);
  1197. continue;
  1198. }
  1199. }
  1200. }
  1201. // 补仓之后获取最新余额
  1202. self.update_equity_rest_spot().await;
  1203. }
  1204. Err(err) => {
  1205. error!("获取仓位信息异常: {}", err);
  1206. }
  1207. }
  1208. info!("---------------------------遗漏仓位检查完毕(现货)!-----------------------------------");
  1209. return length;
  1210. }
  1211. // #[instrument(skip(self), level="TRACE")]
  1212. pub async fn check_position_swap(&mut self) -> usize {
  1213. let mut length = 0;
  1214. match self.platform_rest.get_positions().await {
  1215. Ok(val) => {
  1216. info!("检查仓位信息");
  1217. for position in val {
  1218. if position.amount.eq(&Decimal::ZERO) {
  1219. continue;
  1220. }
  1221. length = length + 1;
  1222. info!(" 仓位:{:?}", position);
  1223. let price = Decimal::ZERO;
  1224. let side;
  1225. info!(?position);
  1226. match position.position_mode {
  1227. PositionModeEnum::Long => {
  1228. // pd
  1229. side = "pd";
  1230. }
  1231. PositionModeEnum::Short => {
  1232. // pk
  1233. side = "pk";
  1234. }
  1235. _ => {
  1236. error!(" 仓位position_mode匹配失败,不做操作!");
  1237. // 执行完当前币对 结束循环
  1238. continue;
  1239. }
  1240. }
  1241. // 发起清仓订单
  1242. let mut ts = TraceStack::new(0, Instant::now());
  1243. ts.on_before_send();
  1244. // 市价单
  1245. match self.platform_rest.take_order_symbol(position.symbol.clone(),
  1246. Decimal::ONE,
  1247. utils::generate_client_id(None).as_str(),
  1248. side,
  1249. price,
  1250. position.amount.abs()).await {
  1251. Ok(order) => {
  1252. ts.on_after_send();
  1253. info!(" {}仓位清除市价下单成功 {:?}, {}", position.symbol.clone(), order, ts.to_string());
  1254. // 执行完当前币对 结束循环
  1255. continue;
  1256. }
  1257. Err(error) => {
  1258. // ts.on_after_send();
  1259. error!(" {}仓位清除市价下单异常 {}, {}", position.symbol.clone(), error, ts.to_string());
  1260. // 执行完当前币对 结束循环
  1261. continue;
  1262. }
  1263. };
  1264. }
  1265. // match self.platform_rest.get_ticker_symbol(position.symbol.clone()).await {
  1266. // Ok(ticker) => {
  1267. // let ap = ticker.sell;
  1268. // let bp = ticker.buy;
  1269. // let mp = (ap + bp) / Decimal::TWO;
  1270. // let price;
  1271. // let side;
  1272. // let market_info;
  1273. // // 获取market
  1274. // match self.platform_rest.get_market_symbol(position.symbol.clone()).await {
  1275. // Ok(market) => {
  1276. // market_info = market;
  1277. // }
  1278. // Err(err) => {
  1279. // error!(" {} 获取当前market异常: {}", position.symbol.clone(), err);
  1280. // continue;
  1281. // }
  1282. // }
  1283. // info!(?position);
  1284. // match position.position_mode {
  1285. // PositionModeEnum::Long => {
  1286. // // pd
  1287. // price = (mp * dec!(0.9985) / market_info.tick_size).floor() * market_info.tick_size;
  1288. // side = "pd";
  1289. // }
  1290. // PositionModeEnum::Short => {
  1291. // // pk
  1292. // price = (mp * dec!(1.0015) / market_info.tick_size).floor() * market_info.tick_size;
  1293. // side = "pk";
  1294. // }
  1295. // _ => {
  1296. // error!(" 仓位position_mode匹配失败,不做操作!");
  1297. // // 执行完当前币对 结束循环
  1298. // continue;
  1299. // }
  1300. // }
  1301. // // 发起清仓订单
  1302. // info!(?ticker);
  1303. // let mut ts = TraceStack::new(0, Instant::now());
  1304. // ts.on_before_send();
  1305. // match self.platform_rest.take_order_symbol(position.symbol.clone(), Decimal::ONE, utils::generate_client_id(None).as_str(), side, price, position.amount.abs()).await {
  1306. // Ok(order) => {
  1307. // ts.on_after_send();
  1308. // info!(" {}仓位清除下单成功 {:?}, {}", position.symbol.clone(), order, ts.to_string());
  1309. // // 执行完当前币对 结束循环
  1310. // continue;
  1311. // }
  1312. // Err(error) => {
  1313. // ts.on_after_send();
  1314. // error!(" {}仓位清除下单异常 {}, {}", position.symbol.clone(), error, ts.to_string());
  1315. // // 执行完当前币对 结束循环
  1316. // continue;
  1317. // }
  1318. // };
  1319. // }
  1320. // Err(err) => {
  1321. // error!(" {} 获取当前ticker异常: {}", position.symbol.clone(), err)
  1322. // }
  1323. // }
  1324. // }
  1325. }
  1326. Err(error) => {
  1327. length = 0;
  1328. error!("获取仓位信息异常: {}", error);
  1329. }
  1330. }
  1331. return length;
  1332. }
  1333. // #[instrument(skip(self), level="TRACE")]
  1334. pub async fn stop(&mut self) {
  1335. /*
  1336. * 停机函数
  1337. * mode_signal 不能小于80
  1338. * 前3秒用于maker平仓
  1339. * 后1秒用于撤maker平仓单
  1340. * 休眠1秒再执行check_position 避免卡单导致漏仓位
  1341. */
  1342. info!("进入停机流程...");
  1343. self.running.store(false, Ordering::Relaxed);
  1344. self.mode_signal = 80;
  1345. // info!("开始退出操作");
  1346. // info!("为避免api失效导致遗漏仓位 建议人工复查");
  1347. // self.check_position().await;
  1348. // // 开启停机信号
  1349. // // sleep(Duration::from_secs(1)).await;
  1350. // info!("双重检查遗漏仓位");
  1351. // self.check_position().await;
  1352. // info!("停机退出 停机原因: {}", self.exit_msg);
  1353. // // 发送交易状态 await self._post_params()
  1354. // // TODO: 向中控发送信号
  1355. // info!("退出进程!");
  1356. }
  1357. // #[instrument(skip(self), level="TRACE")]
  1358. pub async fn exit(&mut self) {
  1359. info!("-------------------------启动退出流程({})----------------------------", self.exit_msg);
  1360. info!("");
  1361. self.clear_position_and_orders(Decimal::ZERO).await;
  1362. info!("订单、仓位清除完毕,为避免api失效导致遗漏仓位,建议人工复查。");
  1363. info!("停机原因:{}。", self.exit_msg);
  1364. }
  1365. // #[instrument(skip(self), level="TRACE")]
  1366. pub async fn before_trade(&mut self) -> bool {
  1367. sleep(Duration::from_secs(1)).await;
  1368. // 获取市场信息
  1369. self.get_exchange_info().await;
  1370. // 获取价格信息
  1371. let ticker = self.platform_rest.get_ticker().await.expect("获取价格信息异常!");
  1372. info!(?ticker);
  1373. let mp = (ticker.buy + ticker.sell) / Decimal::TWO;
  1374. // 获取账户信息
  1375. if self.exchange.contains("spot") {
  1376. self.update_equity_rest_spot().await;
  1377. } else {
  1378. self.update_equity_rest_swap().await;
  1379. }
  1380. // 更新中控账户相关信息
  1381. {
  1382. let mut now_balance = self.local_cash / self.used_pct;
  1383. now_balance.rescale(4);
  1384. let mut cci = self.cci_arc.lock().await;
  1385. cci.now_balance = now_balance;
  1386. }
  1387. // 初始资金
  1388. let start_cash = self.local_cash.clone();
  1389. let start_coin = self.local_coin.clone();
  1390. if start_cash.is_zero() && start_coin.is_zero() {
  1391. self.exit_msg = format!("{}{}{}{}", "初始余额为零 cash: ", start_cash, " coin: ", start_coin);
  1392. // 停止程序
  1393. self.stop().await;
  1394. return false;
  1395. }
  1396. info!("初始cash: {start_cash} 初始coin: {start_coin}");
  1397. // 初始化策略基础信息
  1398. if mp <= Decimal::ZERO {
  1399. self.exit_msg = format!("{}{}", "初始价格获取错误: ", mp);
  1400. // 停止程序
  1401. self.stop().await;
  1402. return false;
  1403. } else {
  1404. info!("初始价格为 {}", mp);
  1405. }
  1406. self.strategy.mp = mp.clone();
  1407. self.strategy.start_cash = start_cash.clone();
  1408. self.strategy.start_coin = start_coin.clone();
  1409. self.strategy.start_equity = start_cash + start_coin * mp;
  1410. self.strategy.max_equity = self.strategy.start_equity.clone();
  1411. self.strategy.equity = self.strategy.start_equity.clone();
  1412. self.strategy.total_amount = self.strategy.equity * self.strategy.lever_rate / self.strategy.mp;
  1413. // 获取数量精度
  1414. self.strategy.step_size = self.market.amount_size.clone();
  1415. if self.strategy.step_size > Decimal::ONE {
  1416. self.strategy.step_size = self.strategy.step_size.trunc();
  1417. }
  1418. // 获取价格精度
  1419. self.strategy.tick_size = self.market.tick_size.clone();
  1420. if self.strategy.tick_size > Decimal::ONE {
  1421. self.strategy.tick_size = self.strategy.tick_size.trunc();
  1422. }
  1423. // 获取最小下单价值
  1424. self.strategy.min_amount_value = self.market.min_notional;
  1425. if self.strategy.step_size.is_zero() || self.strategy.tick_size.is_zero() || self.strategy.min_amount_value.is_zero() {
  1426. self.exit_msg = format!("交易精度未正常获取 step_size:{}, tick_size:{}, min_amount_value:{}", self.strategy.step_size, self.strategy.tick_size, self.strategy.min_amount_value);
  1427. // 停止程序
  1428. self.stop().await;
  1429. return false;
  1430. } else {
  1431. info!("数量精度 {}", self.strategy.step_size);
  1432. info!("价格精度 {}", self.strategy.tick_size);
  1433. info!("最小下单价值 {}", self.strategy.min_amount_value);
  1434. }
  1435. let grid = Decimal::from(self.params.grid.clone());
  1436. // 计算下单数量
  1437. let long_one_hand_value: Decimal = start_cash * self.params.lever_rate / grid;
  1438. let short_one_hand_value: Decimal;
  1439. let long_one_hand_amount: Decimal = (long_one_hand_value / mp / &self.strategy.step_size).floor() * self.strategy.step_size;
  1440. let short_one_hand_amount: Decimal;
  1441. if self.exchange.contains("spot") {
  1442. short_one_hand_value = start_coin * mp * self.params.lever_rate / grid;
  1443. short_one_hand_amount = (short_one_hand_value / mp / self.strategy.step_size).floor() * self.strategy.step_size;
  1444. } else {
  1445. short_one_hand_value = start_cash * self.params.lever_rate / grid;
  1446. short_one_hand_amount = (short_one_hand_value / mp / self.strategy.step_size).floor() * self.strategy.step_size;
  1447. }
  1448. info!("最低单手交易下单量为 buy: {}, sell: {}", long_one_hand_amount, short_one_hand_amount);
  1449. info!(?long_one_hand_value, ?short_one_hand_value, ?long_one_hand_amount, ?short_one_hand_amount, ?self.local_cash, ?self.local_coin);
  1450. let hand_min_limit = Decimal::new(5, 0);
  1451. if (long_one_hand_amount.is_zero() && short_one_hand_amount.is_zero()) ||
  1452. (long_one_hand_value < hand_min_limit && short_one_hand_value < hand_min_limit) {
  1453. self.exit_msg = format!("请检查账户余额!初始下单量太少 buy: {} sell: {}", long_one_hand_amount, short_one_hand_amount);
  1454. // 停止程序
  1455. self.stop().await;
  1456. return false;
  1457. }
  1458. // 初始化调度器
  1459. self.local_cash = start_cash;
  1460. self.local_coin = start_coin;
  1461. self.predictor.on_balance(self.local_cash).await;
  1462. // 买入平台币
  1463. if self.exchange.contains("spot") { // 现货
  1464. self.buy_token().await;
  1465. }
  1466. // 清空挂单和仓位
  1467. self.clear_position_and_orders(self.hold_coin).await;
  1468. /*
  1469. ###### 交易前准备就绪 可以开始交易 ######
  1470. self.loop.create_task(self.rest.go())
  1471. self.loop.create_task(self.on_timer())
  1472. self.loop.create_task(self._run_server())
  1473. self.loop.create_task(self.run_stratey())
  1474. self.loop.create_task(self.early_stop_loop())
  1475. */
  1476. return true;
  1477. }
  1478. pub async fn clear_position_and_orders(&mut self, target_hold_coin: Decimal) {
  1479. let mut clear_count = 1;
  1480. let mut total_clear_count = 1;
  1481. loop {
  1482. let is_clear = self.check_position(target_hold_coin, clear_count <= 1).await;
  1483. // 如果N次中间有一次没清理干净,则重新清理
  1484. if !is_clear {
  1485. clear_count = 1
  1486. }
  1487. // 如果连续N次都检查到清理干净,则表明大概率是清理干净了的
  1488. if clear_count >= 1 {
  1489. info!("连续{}次清理完成。", clear_count);
  1490. info!("");
  1491. break
  1492. }
  1493. // 最大次数判定
  1494. if total_clear_count >= 500 {
  1495. info!("清理次数达到上限:{}次,不再执行清理。", total_clear_count);
  1496. info!("");
  1497. break
  1498. }
  1499. clear_count += 1;
  1500. total_clear_count += 1;
  1501. info!("清理指令发送完毕,启动第{}次检查。", clear_count);
  1502. info!("");
  1503. // 每次清理间隔1s
  1504. sleep(Duration::from_secs(1)).await;
  1505. }
  1506. }
  1507. }
  1508. // #[instrument(skip(core_arc), level="TRACE")]
  1509. pub fn run_strategy(core_arc: Arc<Mutex<Core>>) -> JoinHandle<()> {
  1510. spawn(async move {
  1511. //定期触发策略
  1512. info!("定时触发器启动");
  1513. info!("前期准备完成");
  1514. sleep(Duration::from_secs(1)).await;
  1515. loop {
  1516. let start_time = Utc::now().timestamp_millis();
  1517. let mut delay = 1u64;
  1518. {
  1519. let mut core = core_arc.lock().await;
  1520. if core.ready == 1 {
  1521. // 更新交易信息集合
  1522. core.update_trade_msg();
  1523. if core.mode_signal != 0 {
  1524. if core.mode_signal > 1 {
  1525. core.mode_signal -= 1;
  1526. }
  1527. if core.mode_signal == 1 {
  1528. return;
  1529. }
  1530. // 触发策略 更新策略时间
  1531. core.strategy.local_time = Utc::now().timestamp_millis();
  1532. let mut platform_rest_fb = core.platform_rest.clone_box();
  1533. // 获取信号
  1534. if core.mode_signal > 20 {
  1535. // 先执行onExit
  1536. let mut predictor = core.predictor.clone();
  1537. let local_orders = core.local_orders.clone();
  1538. let local_position = core.local_position.clone();
  1539. let local_cash = core.local_cash.clone();
  1540. let local_coin = core.local_coin.clone();
  1541. let mut orders = core.strategy.on_exit(&mut predictor, &local_orders, &local_position, &local_coin, &local_cash);
  1542. if orders.is_not_empty() {
  1543. info!("触发onExit");
  1544. info!(?orders);
  1545. core._update_local_orders(&mut orders);
  1546. spawn(async move {
  1547. let mut ts = TraceStack::new(0, Instant::now());
  1548. platform_rest_fb.command_order(&mut orders, &mut ts).await;
  1549. });
  1550. }
  1551. } else {
  1552. // 再执行onSleep
  1553. let mut predictor = core.predictor.clone();
  1554. let local_orders = core.local_orders.clone();
  1555. let local_position = core.local_position.clone();
  1556. let local_cash = core.local_cash.clone();
  1557. let local_coin = core.local_coin.clone();
  1558. let mut orders = core.strategy.on_sleep(&mut predictor, &local_orders, &local_position, &local_coin, &local_cash);
  1559. // 记录指令触发信息
  1560. if orders.is_not_empty() {
  1561. info!("触发onSleep");
  1562. info!(?orders);
  1563. core._update_local_orders(&mut orders);
  1564. spawn(async move {
  1565. let mut ts = TraceStack::new(0, Instant::now());
  1566. platform_rest_fb.command_order(&mut orders, &mut ts).await;
  1567. });
  1568. }
  1569. }
  1570. }
  1571. } else {
  1572. core.check_ready();
  1573. }
  1574. // 计算耗时并进行休眠
  1575. let pass_time = (Utc::now().timestamp_millis() - start_time).to_u64().unwrap();
  1576. if pass_time < core.interval {
  1577. delay = core.interval - pass_time;
  1578. }
  1579. }
  1580. sleep(Duration::from_millis(delay)).await;
  1581. }
  1582. })
  1583. }
  1584. // 定期触发的系统逻辑
  1585. // #[instrument(skip(core_arc), level="TRACE")]
  1586. pub fn on_timer(core_arc: Arc<Mutex<Core>>) -> JoinHandle<()> {
  1587. let core_arc_clone = core_arc.clone();
  1588. return spawn(async move {
  1589. tokio::time::sleep(Duration::from_secs(20)).await;
  1590. loop {
  1591. tokio::time::sleep(Duration::from_secs(10)).await;
  1592. let mut core = core_arc_clone.lock().await;
  1593. {
  1594. // 检查风控
  1595. core.check_risk().await;
  1596. // 线程停止信号
  1597. if core.mode_signal == 1 {
  1598. return;
  1599. }
  1600. // 计算预估成交额
  1601. let total_trade_value = core.local_buy_value + core.local_sell_value;
  1602. let time_diff = Decimal::from(Utc::now().timestamp_millis() - core.start_time);
  1603. let _86400 = dec!(86400);
  1604. let _10000 = dec!(10000);
  1605. let trade_vol_24h = (total_trade_value / time_diff) * _86400;
  1606. core.strategy.trade_vol_24h_w = trade_vol_24h / _10000;
  1607. core.strategy.trade_vol_24h_w.rescale(2);
  1608. // TODO core没有rest
  1609. // info!("Rest报单平均延迟{}ms", core.rest.avg_delay);
  1610. // info!("Rest报单最高延迟{}ms", core.rest.max_delay);
  1611. for (_name, _interval) in &core.market_update_interval {
  1612. // debug!("WS盘口{}行情平均更新间隔{}ms。", name, interval);
  1613. }
  1614. }
  1615. }
  1616. });
  1617. }
  1618. // 是不是不用调整仓位的币
  1619. pub fn check_coin(exchanges: &String, coin_name: &String) -> bool {
  1620. let mut result = false;
  1621. match exchanges.as_str() {
  1622. "bitget_spot" => {
  1623. result = ["BGB", "USDT"].contains(&coin_name.as_str());
  1624. }
  1625. _ => {}
  1626. }
  1627. result
  1628. }
  1629. //获取平台币
  1630. pub fn get_exchange_token(exchanges: &String) -> TokenParam {
  1631. return match exchanges.as_str() {
  1632. "bitget_spot" => {
  1633. TokenParam {
  1634. token: "BGB".to_string(),
  1635. // 30u
  1636. limit_value: Decimal::TEN * (Decimal::ONE + Decimal::TWO),
  1637. }
  1638. }
  1639. _ => {
  1640. TokenParam {
  1641. token: "***".to_string(),
  1642. limit_value: Decimal::ZERO,
  1643. }
  1644. }
  1645. };
  1646. }