| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444 |
- use std::str::FromStr;
- use std::sync::Arc;
- use anyhow::{anyhow, bail, Result};
- use rust_decimal::Decimal;
- use std::time::{Duration, Instant};
- use rust_decimal_macros::dec;
- use serde_json::Value;
- use tokio::sync::Mutex;
- use tokio::time::sleep;
- use tracing::{info, warn};
- use crate::data_manager::DataManager;
- use crate::exchange::extended_rest_client::ExtendedRestClient;
- use crate::utils::response::Response;
- #[derive(Debug, Clone, PartialEq)]
- pub enum StrategyState {
- Idle, // 空闲状态,准备下单
- WaitingLimitOrder { // 等待限价单成交
- order_id: String,
- price: Decimal,
- },
- CancellingOrder { // 撤单中
- order_id: String,
- },
- ExecutingMarketOrder { // 执行市价单
- },
- WaitingMarketOrder { // 等待市价单执行完毕
- order_id: String,
- },
- }
- #[allow(dead_code)]
- pub struct Strategy {
- state: StrategyState,
- order_quantity: Decimal, // 写死的订单数量
- filled_quantity: Decimal, // 成交数量
- rest_client: Arc<Mutex<ExtendedRestClient>>, // rest客户端
- min_order_interval_ms: u128, // 最小下单间隔(毫秒)
- min_query_order_interval_ms: u128, // 最小查单间隔(毫秒)
- last_order_time: Instant,
- last_query_order_time: Instant,
- }
- impl Strategy {
- pub fn new(client_am: Arc<Mutex<ExtendedRestClient>>) -> Strategy {
- Strategy {
- state: StrategyState::Idle,
- order_quantity: dec!(0.0001),
- filled_quantity: Decimal::ZERO,
- rest_client: client_am,
- min_order_interval_ms: 200,
- min_query_order_interval_ms: 200,
- last_order_time: Instant::now(),
- last_query_order_time: Instant::now(),
- }
- }
- pub async fn do_strategy(&mut self, dm: &DataManager) -> Result<()> {
- match self.state.clone() {
- StrategyState::Idle => {
- if let Err(e) = self.handle_idle_state(dm).await {
- warn!("空闲状态处理失败: {}", e);
- }
- }
- StrategyState::WaitingLimitOrder { order_id, price } => {
- if let Err(e) = self.handle_waiting_limit_order(dm, order_id, price).await {
- warn!("等待限价单状态处理失败: {}", e);
- }
- }
- StrategyState::CancellingOrder { order_id } => {
- if let Err(e) = self.handle_cancelling_order(dm, order_id).await {
- warn!("撤单状态处理失败: {}", e);
- }
- }
- StrategyState::ExecutingMarketOrder { } => {
- if let Err(e) = self.handle_executing_market_order(dm).await {
- warn!("市价单执行状态处理失败: {}", e);
- }
- }
- StrategyState::WaitingMarketOrder { order_id } => {
- if let Err(e) = self.handle_waiting_market_order(dm, order_id).await {
- warn!("等待限价单状态处理失败: {}", e);
- }
- }
- }
- Ok(())
- }
- // 状态1: 空闲状态,下限价买单
- async fn handle_idle_state(&mut self, dm: &DataManager) -> Result<()> {
- info!("============================ 进入空闲状态,准备下买单 ===========================");
- // 检查是否满足下单时间间隔
- let elapsed = self.last_order_time.elapsed().as_millis();
- if elapsed < self.min_order_interval_ms {
- return Ok(());
- }
- // 尝试下单,只有成功才转换状态
- match self.place_limit_buy_order(dm.best_bid, self.order_quantity).await {
- Ok(order_id) => {
- info!("限价买单下单成功,订单ID: {}", order_id);
- self.state = StrategyState::WaitingLimitOrder {
- order_id,
- price: dm.best_bid
- };
- self.last_order_time = Instant::now();
- self.last_query_order_time = self.last_order_time;
- Ok(())
- }
- Err(e) => {
- warn!("下限价买单失败: {}", e);
- Err(e)
- }
- }
- }
- // 状态2&3: 等待限价单成交
- async fn handle_waiting_limit_order(
- &mut self,
- dm: &DataManager,
- order_id: String,
- price: Decimal,
- ) -> Result<()> {
- // info!("等待限价单成交,订单ID: {}, 价格: {}", order_id, price);
- // 下单之后过一会再检查订单
- let elapsed = self.last_query_order_time.elapsed().as_millis();
- if elapsed < self.min_query_order_interval_ms {
- return Ok(());
- }
- self.last_query_order_time = Instant::now();
- // 检查订单是否已成交
- match self.check_order_filled(&order_id).await {
- Ok(true) => {
- info!("限价单已成交,准备执行市价单");
- self.state = StrategyState::ExecutingMarketOrder { };
- return Ok(());
- }
- Ok(false) => {
- // 订单未成交,继续检查价格
- }
- Err(e) => {
- warn!("检查订单成交状态失败: {}", e);
- return Err(e);
- }
- }
- // 检查价格是否变化
- if dm.best_bid != price {
- info!("价格已变化,从 {} 到 {},准备撤单", price, dm.best_bid);
- self.state = StrategyState::CancellingOrder {
- order_id
- };
- }
- Ok(())
- }
- // 状态3: 撤单处理
- async fn handle_cancelling_order(
- &mut self,
- _dm: &DataManager,
- order_id: String,
- ) -> Result<()> {
- info!("撤单中,订单ID: {}", order_id);
- // 尝试撤单
- if let Err(e) = self.cancel_order(&order_id).await {
- warn!("撤单操作失败: {}", e);
- return Err(e);
- }
- // 撤单后暂停几秒再检查
- sleep(Duration::from_millis(3000)).await;
- // 撤单后检查是否有成交
- match self.check_order_partially_filled(&order_id).await {
- Ok(true) => {
- info!("撤单后发现有成交,准备执行市价单");
- self.state = StrategyState::ExecutingMarketOrder { };
- Ok(())
- }
- Ok(false) => {
- info!("撤单完成,无成交,返回空闲状态");
- self.state = StrategyState::Idle;
- Ok(())
- }
- Err(e) => {
- warn!("检查订单部分成交状态失败: {}", e);
- Err(e)
- }
- }
- }
- // 状态4: 执行市价单
- async fn handle_executing_market_order(
- &mut self,
- dm: &DataManager,
- ) -> Result<()> {
- info!("执行市价卖单");
- // 检查是否满足下单时间间隔
- let elapsed = self.last_order_time.elapsed().as_millis();
- if elapsed < self.min_order_interval_ms {
- return Ok(());
- }
- // 尝试下市价单
- match self.place_market_sell_order(dm.best_bid, self.filled_quantity).await {
- Ok(id) => {
- info!("市价卖单下单成功,订单ID: {}", id);
- self.last_order_time = Instant::now();
- self.state = StrategyState::WaitingMarketOrder {
- order_id: id.clone(),
- };
-
- Ok(())
- }
- Err(e) => {
- warn!("下市价卖单失败: {}", e);
- Err(e)
- }
- }
- }
- // 状态5:等待市价单成交
- async fn handle_waiting_market_order(
- &mut self,
- _dm: &DataManager,
- order_id: String,
- ) -> Result<()> {
- let elapsed = self.last_query_order_time.elapsed().as_millis();
- if elapsed < self.min_query_order_interval_ms {
- return Ok(());
- }
- self.last_query_order_time = Instant::now();
- // 等待市价单成交(市价单通常立即成交)
- match self.check_order_filled(&order_id).await {
- Ok(true) => {
- info!("市价单已成交,返回空闲状态");
- self.state = StrategyState::Idle;
- self.filled_quantity = Decimal::ZERO;
- Ok(())
- }
- Ok(false) => {
- self.state = StrategyState::ExecutingMarketOrder { };
-
- Ok(())
- }
- Err(e) => {
- warn!("检查市价单成交状态失败: {}", e);
- Err(e)
- }
- }
- }
- // ==================== 抽象方法(待实现)====================
- /// 下限价买单
- async fn place_limit_buy_order(&self, price: Decimal, quantity: Decimal) -> Result<String> {
- info!("下限价买单: 价格={}, 数量={}", price, quantity);
- let mut client = self.rest_client.lock().await;
- // 调用client执行下单
- let create_result = client.post_order(
- "LIMIT",
- "BUY",
- quantity.to_string().as_str(),
- price.to_string().as_str(),
- false,
- ).await;
- // 解析下单结果并返回
- self.match_create_order_result(&create_result)
- }
- /// 下市价卖单
- async fn place_market_sell_order(&self, price: Decimal, quantity: Decimal) -> Result<String> {
- info!("下市价卖单: 数量={}", quantity);
- let mut client = self.rest_client.lock().await;
- // 调用client执行下单
- let create_result = client.post_order(
- "MARKET",
- "SELL",
- quantity.to_string().as_str(),
- price.to_string().as_str(),
- true,
- ).await;
- // 解析下单结果并返回
- self.match_create_order_result(&create_result)
- }
- /// 撤单
- async fn cancel_order(&self, order_id: &str) -> Result<()> {
- info!("撤单: {}", order_id);
- let mut client = self.rest_client.lock().await;
- let response = client.cancel_order(order_id).await;
- let value = &response.data;
- // 预先捕获整个 Value 的字符串表示,用于错误报告
- let value_str = serde_json::to_string(&value).unwrap_or_else(|_| "无法序列化 JSON Value".to_string());
- // 获取status
- let status = value.get("status")
- .and_then(|v| v.as_str())
- .ok_or_else(|| anyhow!("撤单-获取 'status' 失败,原始JSON:{}", value_str))?;
- // 判定status
- if status != "OK" {
- bail!("撤单失败,状态不为OK,原始JSON:{}", value_str)
- }
- Ok(())
- }
- /// 检查订单是否完全成交
- async fn check_order_filled(&mut self, order_id: &str) -> Result<bool> {
- self.check_order_filled_status(order_id, true).await
- }
- /// 检查订单是否有部分成交
- async fn check_order_partially_filled(&mut self, order_id: &str) -> Result<bool> {
- self.check_order_filled_status(order_id, false).await
- }
- /// 检查订单成交状态
- /// - `require_full_fill`: true 表示只接受完全成交,false 表示部分成交也接受
- async fn check_order_filled_status(&mut self, order_id: &str, require_full_fill: bool) -> Result<bool> {
- let check_type = if require_full_fill { "完全成交" } else { "部分成交" };
- // info!("检查订单是否{}: {}", check_type, order_id);
- let data = self.get_order_result(order_id).await?;
- let data_str = serde_json::to_string(&data)
- .unwrap_or_else(|_| "无法序列化 JSON Value".to_string());
- // 获取订单状态 [NEW, PARTIALLY_FILLED, FILLED, UNTRIGGERED, CANCELLED, REJECTED, EXPIRED, TRIGGERED]
- let status = data.get("status")
- .and_then(|v| v.as_str())
- .ok_or_else(|| anyhow!("查单-获取 'data.status' 失败,原始JSON:{}", data_str))?;
- // 根据要求判断是否符合成交条件
- let is_filled = if require_full_fill {
- status == "FILLED"
- } else {
- status == "FILLED" || status == "PARTIALLY_FILLED"
- };
- if is_filled {
- // 获取真实成交数量
- let filled_qty = data.get("filledQty")
- .and_then(|v| v.as_str())
- .ok_or_else(|| anyhow!("查单-获取 'data.filledQty' 失败,原始JSON:{}", data_str))
- .and_then(|v| Decimal::from_str(v)
- .map_err(|e| anyhow!("查单-解析 'data.filledQty' 为 Decimal 失败: {}, 值: {}", e, v))
- )?;
- self.filled_quantity = filled_qty.normalize();
- info!("订单 {} 已{},成交数量: {}", order_id, check_type, filled_qty);
- Ok(true)
- } else {
- Ok(false)
- }
- }
- async fn get_order_result(&self, order_id: &str) -> Result<Value> {
- let mut client = self.rest_client.lock().await;
- let response = client.get_order(order_id).await;
- let value = &response.data;
- // 预先捕获整个 Value 的字符串表示,用于错误报告
- let value_str = serde_json::to_string(&value)
- .unwrap_or_else(|_| "无法序列化 JSON Value".to_string());
- // 获取并判定 status
- let status = value.get("status")
- .and_then(|v| v.as_str())
- .ok_or_else(|| anyhow!("查单-获取 'status' 失败,原始JSON:{}", value_str))?;
- if status != "OK" {
- bail!("查单失败,状态不为OK: {},原始JSON:{}", status, value_str)
- }
- // 获取 data 字段
- value.get("data")
- .cloned()
- .ok_or_else(|| anyhow!("查单-获取 'data' 字段失败,原始 JSON: {}", value_str))
- }
- fn match_create_order_result(&self, create_result: &Result<Response>) -> Result<String> {
- match create_result {
- Ok(response) => {
- let value = &response.data;
- // 预先捕获整个 Value 的字符串表示,用于错误报告
- let value_str = serde_json::to_string(&value).unwrap_or_else(|_| "无法序列化 JSON Value".to_string());
- // 获取status
- let status = value.get("status")
- .and_then(|v| v.as_str())
- .ok_or_else(|| anyhow!("下单-获取 'status' 失败,原始JSON:{}", value_str))?;
- // 判定status
- if status != "OK" {
- bail!("下单失败,状态不为OK,原始JSON:{}", value_str)
- }
- // 尝试获取 data 字段
- let data = value.get("data")
- .ok_or_else(|| anyhow!("下单-获取 'data' 字段失败,原始 JSON: {}", value_str))?;
- // 获取order的id
- let id = data.get("id")
- .and_then(|v| v.as_i64())
- .ok_or_else(|| anyhow!("下单-获取 'data.id' 失败,原始JSON:{}", value_str))?;
- Ok(id.to_string())
- }
- Err(error) => {
- bail!("下单失败:{}", error);
- }
- }
- }
- }
|