strategy.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. use std::str::FromStr;
  2. use std::sync::Arc;
  3. use anyhow::{anyhow, bail, Result};
  4. use rust_decimal::Decimal;
  5. use std::time::{Duration, Instant};
  6. use rust_decimal_macros::dec;
  7. use serde_json::Value;
  8. use tokio::sync::Mutex;
  9. use tokio::time::sleep;
  10. use tracing::{info, warn};
  11. use crate::data_manager::DataManager;
  12. use crate::exchange::extended_rest_client::ExtendedRestClient;
  13. use crate::utils::response::Response;
  14. #[derive(Debug, Clone, PartialEq)]
  15. pub enum StrategyState {
  16. Idle, // 空闲状态,准备下单
  17. WaitingLimitOrder { // 等待限价单成交
  18. order_id: String,
  19. price: Decimal,
  20. },
  21. CancellingOrder { // 撤单中
  22. order_id: String,
  23. },
  24. ExecutingMarketOrder { // 执行市价单
  25. },
  26. WaitingMarketOrder { // 等待市价单执行完毕
  27. order_id: String,
  28. },
  29. }
  30. #[allow(dead_code)]
  31. pub struct Strategy {
  32. state: StrategyState,
  33. order_quantity: Decimal, // 写死的订单数量
  34. filled_quantity: Decimal, // 成交数量
  35. rest_client: Arc<Mutex<ExtendedRestClient>>, // rest客户端
  36. min_order_interval_ms: u128, // 最小下单间隔(毫秒)
  37. min_query_order_interval_ms: u128, // 最小查单间隔(毫秒)
  38. last_order_time: Instant,
  39. last_query_order_time: Instant,
  40. }
  41. impl Strategy {
  42. pub fn new(client_am: Arc<Mutex<ExtendedRestClient>>) -> Strategy {
  43. Strategy {
  44. state: StrategyState::Idle,
  45. order_quantity: dec!(0.0001),
  46. filled_quantity: Decimal::ZERO,
  47. rest_client: client_am,
  48. min_order_interval_ms: 200,
  49. min_query_order_interval_ms: 200,
  50. last_order_time: Instant::now(),
  51. last_query_order_time: Instant::now(),
  52. }
  53. }
  54. pub async fn do_strategy(&mut self, dm: &DataManager) -> Result<()> {
  55. match self.state.clone() {
  56. StrategyState::Idle => {
  57. if let Err(e) = self.handle_idle_state(dm).await {
  58. warn!("空闲状态处理失败: {}", e);
  59. }
  60. }
  61. StrategyState::WaitingLimitOrder { order_id, price } => {
  62. if let Err(e) = self.handle_waiting_limit_order(dm, order_id, price).await {
  63. warn!("等待限价单状态处理失败: {}", e);
  64. }
  65. }
  66. StrategyState::CancellingOrder { order_id } => {
  67. if let Err(e) = self.handle_cancelling_order(dm, order_id).await {
  68. warn!("撤单状态处理失败: {}", e);
  69. }
  70. }
  71. StrategyState::ExecutingMarketOrder { } => {
  72. if let Err(e) = self.handle_executing_market_order(dm).await {
  73. warn!("市价单执行状态处理失败: {}", e);
  74. }
  75. }
  76. StrategyState::WaitingMarketOrder { order_id } => {
  77. if let Err(e) = self.handle_waiting_market_order(dm, order_id).await {
  78. warn!("等待限价单状态处理失败: {}", e);
  79. }
  80. }
  81. }
  82. Ok(())
  83. }
  84. // 状态1: 空闲状态,下限价买单
  85. async fn handle_idle_state(&mut self, dm: &DataManager) -> Result<()> {
  86. info!("============================ 进入空闲状态,准备下买单 ===========================");
  87. // 检查是否满足下单时间间隔
  88. let elapsed = self.last_order_time.elapsed().as_millis();
  89. if elapsed < self.min_order_interval_ms {
  90. return Ok(());
  91. }
  92. // 尝试下单,只有成功才转换状态
  93. match self.place_limit_buy_order(dm.best_bid, self.order_quantity).await {
  94. Ok(order_id) => {
  95. info!("限价买单下单成功,订单ID: {}", order_id);
  96. self.state = StrategyState::WaitingLimitOrder {
  97. order_id,
  98. price: dm.best_bid
  99. };
  100. self.last_order_time = Instant::now();
  101. self.last_query_order_time = self.last_order_time;
  102. Ok(())
  103. }
  104. Err(e) => {
  105. warn!("下限价买单失败: {}", e);
  106. Err(e)
  107. }
  108. }
  109. }
  110. // 状态2&3: 等待限价单成交
  111. async fn handle_waiting_limit_order(
  112. &mut self,
  113. dm: &DataManager,
  114. order_id: String,
  115. price: Decimal,
  116. ) -> Result<()> {
  117. // info!("等待限价单成交,订单ID: {}, 价格: {}", order_id, price);
  118. // 下单之后过一会再检查订单
  119. let elapsed = self.last_query_order_time.elapsed().as_millis();
  120. if elapsed < self.min_query_order_interval_ms {
  121. return Ok(());
  122. }
  123. self.last_query_order_time = Instant::now();
  124. // 检查订单是否已成交
  125. match self.check_order_filled(&order_id).await {
  126. Ok(true) => {
  127. info!("限价单已成交,准备执行市价单");
  128. self.state = StrategyState::ExecutingMarketOrder { };
  129. return Ok(());
  130. }
  131. Ok(false) => {
  132. // 订单未成交,继续检查价格
  133. }
  134. Err(e) => {
  135. warn!("检查订单成交状态失败: {}", e);
  136. return Err(e);
  137. }
  138. }
  139. // 检查价格是否变化
  140. if dm.best_bid != price {
  141. info!("价格已变化,从 {} 到 {},准备撤单", price, dm.best_bid);
  142. self.state = StrategyState::CancellingOrder {
  143. order_id
  144. };
  145. }
  146. Ok(())
  147. }
  148. // 状态3: 撤单处理
  149. async fn handle_cancelling_order(
  150. &mut self,
  151. _dm: &DataManager,
  152. order_id: String,
  153. ) -> Result<()> {
  154. info!("撤单中,订单ID: {}", order_id);
  155. // 尝试撤单
  156. if let Err(e) = self.cancel_order(&order_id).await {
  157. warn!("撤单操作失败: {}", e);
  158. return Err(e);
  159. }
  160. // 撤单后暂停几秒再检查
  161. sleep(Duration::from_millis(3000)).await;
  162. // 撤单后检查是否有成交
  163. match self.check_order_partially_filled(&order_id).await {
  164. Ok(true) => {
  165. info!("撤单后发现有成交,准备执行市价单");
  166. self.state = StrategyState::ExecutingMarketOrder { };
  167. Ok(())
  168. }
  169. Ok(false) => {
  170. info!("撤单完成,无成交,返回空闲状态");
  171. self.state = StrategyState::Idle;
  172. Ok(())
  173. }
  174. Err(e) => {
  175. warn!("检查订单部分成交状态失败: {}", e);
  176. Err(e)
  177. }
  178. }
  179. }
  180. // 状态4: 执行市价单
  181. async fn handle_executing_market_order(
  182. &mut self,
  183. dm: &DataManager,
  184. ) -> Result<()> {
  185. info!("执行市价卖单");
  186. // 检查是否满足下单时间间隔
  187. let elapsed = self.last_order_time.elapsed().as_millis();
  188. if elapsed < self.min_order_interval_ms {
  189. return Ok(());
  190. }
  191. // 尝试下市价单
  192. match self.place_market_sell_order(dm.best_bid, self.filled_quantity).await {
  193. Ok(id) => {
  194. info!("市价卖单下单成功,订单ID: {}", id);
  195. self.last_order_time = Instant::now();
  196. self.state = StrategyState::WaitingMarketOrder {
  197. order_id: id.clone(),
  198. };
  199. Ok(())
  200. }
  201. Err(e) => {
  202. warn!("下市价卖单失败: {}", e);
  203. Err(e)
  204. }
  205. }
  206. }
  207. // 状态5:等待市价单成交
  208. async fn handle_waiting_market_order(
  209. &mut self,
  210. _dm: &DataManager,
  211. order_id: String,
  212. ) -> Result<()> {
  213. let elapsed = self.last_query_order_time.elapsed().as_millis();
  214. if elapsed < self.min_query_order_interval_ms {
  215. return Ok(());
  216. }
  217. self.last_query_order_time = Instant::now();
  218. // 等待市价单成交(市价单通常立即成交)
  219. match self.check_order_filled(&order_id).await {
  220. Ok(true) => {
  221. info!("市价单已成交,返回空闲状态");
  222. self.state = StrategyState::Idle;
  223. self.filled_quantity = Decimal::ZERO;
  224. Ok(())
  225. }
  226. Ok(false) => {
  227. self.state = StrategyState::ExecutingMarketOrder { };
  228. Ok(())
  229. }
  230. Err(e) => {
  231. warn!("检查市价单成交状态失败: {}", e);
  232. Err(e)
  233. }
  234. }
  235. }
  236. // ==================== 抽象方法(待实现)====================
  237. /// 下限价买单
  238. async fn place_limit_buy_order(&self, price: Decimal, quantity: Decimal) -> Result<String> {
  239. info!("下限价买单: 价格={}, 数量={}", price, quantity);
  240. let mut client = self.rest_client.lock().await;
  241. // 调用client执行下单
  242. let create_result = client.post_order(
  243. "LIMIT",
  244. "BUY",
  245. quantity.to_string().as_str(),
  246. price.to_string().as_str(),
  247. false,
  248. ).await;
  249. // 解析下单结果并返回
  250. self.match_create_order_result(&create_result)
  251. }
  252. /// 下市价卖单
  253. async fn place_market_sell_order(&self, price: Decimal, quantity: Decimal) -> Result<String> {
  254. info!("下市价卖单: 数量={}", quantity);
  255. let mut client = self.rest_client.lock().await;
  256. // 调用client执行下单
  257. let create_result = client.post_order(
  258. "MARKET",
  259. "SELL",
  260. quantity.to_string().as_str(),
  261. price.to_string().as_str(),
  262. true,
  263. ).await;
  264. // 解析下单结果并返回
  265. self.match_create_order_result(&create_result)
  266. }
  267. /// 撤单
  268. async fn cancel_order(&self, order_id: &str) -> Result<()> {
  269. info!("撤单: {}", order_id);
  270. let mut client = self.rest_client.lock().await;
  271. let response = client.cancel_order(order_id).await;
  272. let value = &response.data;
  273. // 预先捕获整个 Value 的字符串表示,用于错误报告
  274. let value_str = serde_json::to_string(&value).unwrap_or_else(|_| "无法序列化 JSON Value".to_string());
  275. // 获取status
  276. let status = value.get("status")
  277. .and_then(|v| v.as_str())
  278. .ok_or_else(|| anyhow!("撤单-获取 'status' 失败,原始JSON:{}", value_str))?;
  279. // 判定status
  280. if status != "OK" {
  281. bail!("撤单失败,状态不为OK,原始JSON:{}", value_str)
  282. }
  283. Ok(())
  284. }
  285. /// 检查订单是否完全成交
  286. async fn check_order_filled(&mut self, order_id: &str) -> Result<bool> {
  287. self.check_order_filled_status(order_id, true).await
  288. }
  289. /// 检查订单是否有部分成交
  290. async fn check_order_partially_filled(&mut self, order_id: &str) -> Result<bool> {
  291. self.check_order_filled_status(order_id, false).await
  292. }
  293. /// 检查订单成交状态
  294. /// - `require_full_fill`: true 表示只接受完全成交,false 表示部分成交也接受
  295. async fn check_order_filled_status(&mut self, order_id: &str, require_full_fill: bool) -> Result<bool> {
  296. let check_type = if require_full_fill { "完全成交" } else { "部分成交" };
  297. // info!("检查订单是否{}: {}", check_type, order_id);
  298. let data = self.get_order_result(order_id).await?;
  299. let data_str = serde_json::to_string(&data)
  300. .unwrap_or_else(|_| "无法序列化 JSON Value".to_string());
  301. // 获取订单状态 [NEW, PARTIALLY_FILLED, FILLED, UNTRIGGERED, CANCELLED, REJECTED, EXPIRED, TRIGGERED]
  302. let status = data.get("status")
  303. .and_then(|v| v.as_str())
  304. .ok_or_else(|| anyhow!("查单-获取 'data.status' 失败,原始JSON:{}", data_str))?;
  305. // 根据要求判断是否符合成交条件
  306. let is_filled = if require_full_fill {
  307. status == "FILLED"
  308. } else {
  309. status == "FILLED" || status == "PARTIALLY_FILLED"
  310. };
  311. if is_filled {
  312. // 获取真实成交数量
  313. let filled_qty = data.get("filledQty")
  314. .and_then(|v| v.as_str())
  315. .ok_or_else(|| anyhow!("查单-获取 'data.filledQty' 失败,原始JSON:{}", data_str))
  316. .and_then(|v| Decimal::from_str(v)
  317. .map_err(|e| anyhow!("查单-解析 'data.filledQty' 为 Decimal 失败: {}, 值: {}", e, v))
  318. )?;
  319. self.filled_quantity = filled_qty.normalize();
  320. info!("订单 {} 已{},成交数量: {}", order_id, check_type, filled_qty);
  321. Ok(true)
  322. } else {
  323. Ok(false)
  324. }
  325. }
  326. async fn get_order_result(&self, order_id: &str) -> Result<Value> {
  327. let mut client = self.rest_client.lock().await;
  328. let response = client.get_order(order_id).await;
  329. let value = &response.data;
  330. // 预先捕获整个 Value 的字符串表示,用于错误报告
  331. let value_str = serde_json::to_string(&value)
  332. .unwrap_or_else(|_| "无法序列化 JSON Value".to_string());
  333. // 获取并判定 status
  334. let status = value.get("status")
  335. .and_then(|v| v.as_str())
  336. .ok_or_else(|| anyhow!("查单-获取 'status' 失败,原始JSON:{}", value_str))?;
  337. if status != "OK" {
  338. bail!("查单失败,状态不为OK: {},原始JSON:{}", status, value_str)
  339. }
  340. // 获取 data 字段
  341. value.get("data")
  342. .cloned()
  343. .ok_or_else(|| anyhow!("查单-获取 'data' 字段失败,原始 JSON: {}", value_str))
  344. }
  345. fn match_create_order_result(&self, create_result: &Result<Response>) -> Result<String> {
  346. match create_result {
  347. Ok(response) => {
  348. let value = &response.data;
  349. // 预先捕获整个 Value 的字符串表示,用于错误报告
  350. let value_str = serde_json::to_string(&value).unwrap_or_else(|_| "无法序列化 JSON Value".to_string());
  351. // 获取status
  352. let status = value.get("status")
  353. .and_then(|v| v.as_str())
  354. .ok_or_else(|| anyhow!("下单-获取 'status' 失败,原始JSON:{}", value_str))?;
  355. // 判定status
  356. if status != "OK" {
  357. bail!("下单失败,状态不为OK,原始JSON:{}", value_str)
  358. }
  359. // 尝试获取 data 字段
  360. let data = value.get("data")
  361. .ok_or_else(|| anyhow!("下单-获取 'data' 字段失败,原始 JSON: {}", value_str))?;
  362. // 获取order的id
  363. let id = data.get("id")
  364. .and_then(|v| v.as_i64())
  365. .ok_or_else(|| anyhow!("下单-获取 'data.id' 失败,原始JSON:{}", value_str))?;
  366. Ok(id.to_string())
  367. }
  368. Err(error) => {
  369. bail!("下单失败:{}", error);
  370. }
  371. }
  372. }
  373. }