Browse Source

清仓程序完善(待测)

JiahengHe 1 year ago
parent
commit
1efdb78fc5

+ 1 - 1
exchanges/src/socket_tool.rs

@@ -235,7 +235,7 @@ impl AbstractWsMode {
                     }
                 }
             ) {
-                Ok(o) => {
+                Ok(_o) => {
                     trace!("发送指令-心跳:{:?}",h_type);
                 }
                 Err(k) => {

+ 26 - 0
global/src/clear_position_result.rs

@@ -0,0 +1,26 @@
+use serde_derive::{Deserialize, Serialize};
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct ClearPositionResult {
+    pub robot_id: String,
+    pub clear_order_num: String,
+    pub clear_order_str: String,
+    pub clear_position_num: String,
+    pub clear_position_str: String,
+    pub clear_other_err: bool,
+    pub clear_other_str: String
+}
+
+impl ClearPositionResult {
+    pub fn new() -> ClearPositionResult {
+        ClearPositionResult{
+            robot_id: "".to_string(),
+            clear_order_num: "0".to_string(),
+            clear_order_str: "".to_string(),
+            clear_position_num: "0".to_string(),
+            clear_position_str: "".to_string(),
+            clear_other_err: false,
+            clear_other_str: "".to_string(),
+        }
+    }
+}

+ 1 - 0
global/src/lib.rs

@@ -5,3 +5,4 @@ pub mod trace_stack;
 pub mod export_utils;
 pub mod account_info;
 pub mod cci;
+pub mod clear_position_result;

+ 36 - 0
src/clear_core_libs.rs

@@ -0,0 +1,36 @@
+
+use std::collections::BTreeMap;
+use std::io::Error;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool};
+use tokio::sync::{mpsc, Mutex};
+use tracing::{info};
+use global::cci::CentralControlInfo;
+use global::params::Params;
+use standard::Order;
+use strategy::clear_core::ClearCore;
+
+pub async fn init(params: Params,
+                  _ws_running: Arc<AtomicBool>,
+                  running: Arc<AtomicBool>,
+                  cci_arc: Arc<Mutex<CentralControlInfo>>) -> ClearCore {
+    // 封装
+    let mut exchange_params:BTreeMap<String, String> = BTreeMap::new();
+    exchange_params.insert("access_key".to_string(), params.access_key.clone());
+    exchange_params.insert("secret_key".to_string(), params.secret_key.clone());
+    exchange_params.insert("pass_key".to_string(), params.pass_key.clone());
+
+    let (order_sender, _order_receiver) = mpsc::channel::<Order>(100);
+    let (error_sender, _error_receiver) = mpsc::channel::<Error>(100);
+
+    let mut core_obj = ClearCore::new(params.exchange.clone(),
+                                 params.clone(),
+                                 exchange_params.clone(),
+                                 order_sender.clone(),
+                                 error_sender.clone(),
+                                 running.clone(),
+                                 cci_arc.clone()).await;
+    info!("清仓检查程序ClearCore初始化……");
+    core_obj.before_trade().await;
+    return core_obj;
+}

+ 49 - 22
src/main.rs

@@ -1,13 +1,14 @@
 mod server;
 mod control_c;
 mod core_libs;
+mod clear_core_libs;
 
 use std::str::FromStr;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 use tokio::sync::Mutex;
-use tracing::{info, warn};
+use tracing::{error, info, warn};
 use tracing_appender_timezone::non_blocking::WorkerGuard;
 use global::cci::CentralControlInfo;
 use global::log_utils::{send_remote_err_log};
@@ -46,6 +47,7 @@ fn read_params_json() -> Params {
     let mut call_port = 5555;
     // 运行模式 0.正常策略运行, 1.清理挂单及仓位
     let mut run_mode = 0;
+    let robot_id;
 
     let args: Vec<String> = std::env::args().collect();
 
@@ -79,6 +81,20 @@ fn read_params_json() -> Params {
                 }
             }
         }
+
+        // Check for the --port argument and assign its value.
+        if arg.starts_with("--robot_id=") {
+            let parts: Vec<&str> = arg.split('=').collect();
+            if parts.len() == 2 {
+                robot_id = parts[1]
+            } else {
+                error!("启动失败,机器人id参数格式设置错误 --robot_id=xxx!");
+                panic!("启动失败,机器人id参数格式设置错误 --robot_id=xxx!");
+            }
+        } else {
+            error!("启动失败,缺少机器人id参数!");
+            panic!("启动失败,缺少机器人id参数!");
+        }
     }
 
     println!("通讯端口:{}, 配置文件路径:{}", call_port, path);
@@ -125,28 +141,39 @@ async fn main() {
 
     // ws退出程序
     let ws_running = Arc::new(AtomicBool::new(true));
-    // core初始化动作
-    let core_arc = core_libs::init(params.clone(), ws_running.clone(), running.clone(), cci_arc.clone()).await;
-    // 初始化中控服务
-    server::run_server(params.port.clone(), running.clone(), cci_arc.clone());
-    // ctrl c退出检查程序
-    control_c::exit_handler(running.clone());
-
-    // 每一秒检查一次程序是否结束
-    while running.load(Ordering::Relaxed) {
+    if params.run_mode == 1{
+        // core初始化动作
+        let mut core_arc = clear_core_libs::init(params.clone(), ws_running.clone(), running.clone(), cci_arc.clone()).await;
+        info!("开始执行清仓程序");
+        core_arc.exit("123456".to_string()).await;
+        info!("清仓程序执行完毕");
+        // 强制退出
+        std::process::exit(0);
+    } else {
+        // core初始化动作
+        let core_arc = core_libs::init(params.clone(), ws_running.clone(), running.clone(), cci_arc.clone()).await;
+        // 初始化中控服务
+        server::run_server(params.port.clone(), running.clone(), cci_arc.clone());
+        // ctrl c退出检查程序
+        control_c::exit_handler(running.clone());
+
+        // 每一秒检查一次程序是否结束
+        while running.load(Ordering::Relaxed) {
+            tokio::time::sleep(Duration::from_secs(1)).await;
+        }
+
+        info!("检测到退出信号!停止ws订阅……");
+        ws_running.store(false, Ordering::Relaxed);
         tokio::time::sleep(Duration::from_secs(1)).await;
+
+        info!("等待清空仓位、订单(再次按control c可以立马结束)……");
+        let mut core = core_arc.lock().await;
+        core.exit().await;
+        info!("程序已退出!为以防万一,请再次检查仓位和订单!");
+        // 等两秒,等中控反应过来
+        tokio::time::sleep(Duration::from_secs(2)).await;
+        // 强制退出
+        std::process::exit(0);
     }
 
-    info!("检测到退出信号!停止ws订阅……");
-    ws_running.store(false, Ordering::Relaxed);
-    tokio::time::sleep(Duration::from_secs(1)).await;
-
-    info!("等待清空仓位、订单(再次按control c可以立马结束)……");
-    let mut core = core_arc.lock().await;
-    core.exit().await;
-    info!("程序已退出!为以防万一,请再次检查仓位和订单!");
-    // 等两秒,等中控反应过来
-    tokio::time::sleep(Duration::from_secs(2)).await;
-    // 强制退出
-    std::process::exit(0);
 }

+ 3 - 2
standard/src/lib.rs

@@ -4,6 +4,7 @@ use std::fmt::Formatter;
 use std::io::{Error};
 use async_trait::async_trait;
 use rust_decimal::Decimal;
+use serde::Serialize;
 use serde_json::Value;
 use tokio::time::Instant;
 use global::trace_stack::TraceStack;
@@ -278,7 +279,7 @@ impl SpecialOrder {
 /// - `avg_price(Decimal)`: 成交均价
 /// - `status(String)`: 订单状态
 /// - `order_type(String)`: 订单类型
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
 pub struct Order {
     pub id: String,
     pub custom_id: String,
@@ -397,7 +398,7 @@ impl Market {
 /// - `profit(Decimal)`: 持仓浮动盈亏
 /// - `position_mode(PositionModeEnum)`: 持仓模式
 /// - `margin(Decimal)`: 仓位占用的保证金
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
 pub struct Position {
     pub symbol: String,
     pub margin_level: Decimal,

+ 670 - 0
strategy/src/clear_core.rs

@@ -0,0 +1,670 @@
+use tokio::time::Instant;
+use std::collections::{BTreeMap, HashMap};
+use std::io::Error;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use chrono::{Utc};
+use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue};
+use rust_decimal::Decimal;
+use rust_decimal::prelude::ToPrimitive;
+use rust_decimal_macros::dec;
+use tokio::sync::mpsc::{Sender};
+use tokio::sync::{Mutex};
+use tokio::time::sleep;
+use tracing::{error, info, warn};
+use global::cci::CentralControlInfo;
+use global::clear_position_result::ClearPositionResult;
+use global::params::Params;
+use global::trace_stack::TraceStack;
+use standard::{Account, Market, Order, Platform, Position, PositionModeEnum, SpecialTicker};
+use standard::exchange::{Exchange};
+use standard::exchange::ExchangeEnum::{BinanceSwap, BitgetSwap, BybitSwap, CoinexSwap, GateSwap, HtxSwap, KucoinSwap};
+
+use crate::model::{LocalPosition, OrderInfo};
+use crate::predictor::Predictor;
+use crate::strategy::Strategy;
+use crate::utils;
+use crate::utils::clip;
+
+
+pub struct ClearCore {
+    pub params: Params,
+    // 启动时间
+    pub start_time: i64,
+    // 币对
+    pub symbol: String,
+    // 基础货币
+    pub base: String,
+    // 报价货币
+    pub quote: String,
+    //
+    pub strategy: Strategy,
+    // 本地挂单表
+    pub local_orders: HashMap<String, OrderInfo>,
+    // 本地订单缓存队列
+    pub local_orders_backup: HashMap<String, OrderInfo>,
+    // 本地订单缓存cid队列
+    pub local_orders_backup_cid: Vec<String>,
+    // 本地已处理cid缓存队列
+    pub handled_orders_cid: Vec<String>,
+    // 本地利润值
+    pub local_profit: Decimal,
+    // 本地U保证金
+    pub local_cash: Decimal,
+    // 本地币保证金
+    pub local_coin: Decimal,
+    // 仓位信息
+    pub local_position: LocalPosition,
+    // 仓位信息-来自订单
+    pub local_position_by_orders: LocalPosition,
+    //
+    pub local_buy_amount: Decimal,
+    pub local_sell_amount: Decimal,
+    pub local_buy_value: Decimal,
+    pub local_sell_value: Decimal,
+    pub local_cancel_log: HashMap<String, i64>,
+    pub interval: u64,
+    pub exchange: String,
+    pub exit_msg: String,
+    // 仓位检查结果序列
+    pub position_check_series: Vec<i8>,
+    // 止损大小
+    pub stop_loss: Decimal,
+    // 资金使用率
+    pub used_pct: Decimal,
+    // 启停信号 0 表示运行 大于1开始倒计时 1时停机
+    pub mode_signal: i8,
+    // 交易盘口订单流更新时间
+    pub trade_order_update_time: i64,
+    // onTick触发时间记录
+    pub on_tick_event_time: i64,
+    // 盘口ticker信息
+    pub tickers: HashMap<String, SpecialTicker>,
+    // 盘口 depth信息
+    pub depths: HashMap<String, Vec<Decimal>>,
+    // 行情更新延迟监控(风控)
+    pub market_update_time: HashMap<String, i64>,
+    pub market_update_interval: HashMap<String, Decimal>,
+    pub ref_num: i8,
+    pub ref_name: Vec<String>,
+    pub trade_name: String,
+    pub ready: i8,
+    pub predictor: Predictor,
+    pub market: Market,
+    pub platform_rest: Box<dyn Platform + Send + Sync>,
+    // 市场最优买卖价
+    pub max_buy_min_sell_cache: HashMap<String, Vec<Decimal>>,
+    // 最近一次的depth信息
+    pub local_depths: HashMap<String, Vec<Decimal>>,
+    pub is_update: HashMap<String, bool>,
+    pub running: Arc<AtomicBool>,
+    pub hold_coin: Decimal,
+
+    // 打印限频
+    pub prev_log_ready_timestamp: i64,
+    pub log_ready_log_interval: i64,
+
+    // 中控
+    pub cci_arc: Arc<Mutex<CentralControlInfo>>,            // 中控信息汇集
+
+    // 老版的trader_msg留下来的
+    pub agg_market: Vec<Decimal>,
+    pub ref_price: Vec<Vec<Decimal>>,
+    pub predict: Decimal,
+}
+
+impl ClearCore {
+    pub async fn new(exchange: String,
+                     params: Params,
+                     exchange_params: BTreeMap<String, String>,
+                     order_sender: Sender<Order>,
+                     error_sender: Sender<Error>,
+                     running: Arc<AtomicBool>,
+                     cci_arc: Arc<Mutex<CentralControlInfo>>) -> ClearCore {
+        let symbol = params.pair.clone();
+        let pairs: Vec<&str> = params.pair.split('_').collect();
+        let mut core_obj = ClearCore {
+            params: params.clone(),
+            start_time: 0,
+            symbol: symbol.clone(),
+            base: pairs[0].to_string(),
+            quote: pairs[1].to_string(),
+            // 现货底仓
+            hold_coin: clip(params.hold_coin, Decimal::ZERO, Decimal::ONE_HUNDRED * Decimal::ONE_HUNDRED),
+            strategy: Strategy::new(&params, true),
+            local_orders: Default::default(),
+            local_orders_backup: Default::default(),
+            local_orders_backup_cid: Default::default(),
+            handled_orders_cid: Default::default(),
+            local_profit: Default::default(),
+            local_cash: Default::default(),
+            local_coin: Default::default(),
+            local_position: LocalPosition {
+                long_pos: Default::default(),
+                short_pos: Default::default(),
+                long_avg: Default::default(),
+                short_avg: Default::default(),
+            },
+            local_position_by_orders: LocalPosition {
+                long_pos: Default::default(),
+                short_pos: Default::default(),
+                long_avg: Default::default(),
+                short_avg: Default::default(),
+            },
+            local_buy_amount: Default::default(),
+            local_sell_amount: Default::default(),
+            local_buy_value: Default::default(),
+            local_sell_value: Default::default(),
+            local_cancel_log: Default::default(),
+            interval: params.interval,
+            exchange: params.exchange,
+            exit_msg: "正常退出".to_string(),
+            position_check_series: Default::default(),
+            stop_loss: params.stop_loss,
+            used_pct: dec!(0.95),
+            mode_signal: 0,
+            trade_order_update_time: Utc::now().timestamp_millis(),
+            on_tick_event_time: Utc::now().timestamp_millis(),
+            tickers: Default::default(),
+            depths: Default::default(),
+            market_update_time: Default::default(),
+            market_update_interval: Default::default(),
+            ref_num: params.ref_exchange.len() as i8,
+            ref_name: Default::default(),
+            trade_name: "".to_string(),
+            ready: 0,
+            predictor: Predictor {
+                loop_count: 0,
+                market_info_list: vec![],
+                mid_price_list: vec![],
+                ref_mid_price_per_exchange_per_frame: vec![],
+                ref_exchange_length: 0,
+                data_length_max: 0,
+                alpha: vec![],
+                gamma: Default::default(),
+                avg_spread_list: vec![],
+            },
+            market: Market {
+                symbol: symbol.clone(),
+                base_asset: "".to_string(),
+                quote_asset: "".to_string(),
+                tick_size: Default::default(),
+                price_precision: Default::default(),
+                amount_precision: Default::default(),
+                min_qty: Default::default(),
+                max_qty: Default::default(),
+                min_notional: Default::default(),
+                max_notional: Default::default(),
+                ct_val: Default::default(),
+                amount_size: Default::default(),
+            },
+            platform_rest: match exchange.as_str() {
+                "kucoin_usdt_swap" => {
+                    Exchange::new(KucoinSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
+                "gate_usdt_swap" => {
+                    Exchange::new(GateSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
+                // "gate_usdt_spot" => {
+                //     Exchange::new(GateSpot, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                // }
+                "binance_usdt_swap" => {
+                    Exchange::new(BinanceSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
+                // "binance_spot" => {
+                //     Exchange::new(BinanceSpot, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                // }
+                // "bitget_spot" => {
+                //     Exchange::new(BitgetSpot, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                // }
+                "bitget_usdt_swap" => {
+                    Exchange::new(BitgetSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
+                // "okex_usdt_swap" => {
+                //     Exchange::new(OkxSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                // }
+                "bybit_usdt_swap" => {
+                    Exchange::new(BybitSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
+                "coinex_usdt_swap" => {
+                    Exchange::new(CoinexSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
+                "htx_usdt_swap" => {
+                    Exchange::new(HtxSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
+                _ => {
+                    error!("203未找到对应的交易所rest枚举!");
+                    panic!("203未找到对应的交易所rest枚举!");
+                }
+            },
+            max_buy_min_sell_cache: Default::default(),
+            local_depths: Default::default(),
+            is_update: Default::default(),
+            running,
+            prev_log_ready_timestamp: 0,
+            log_ready_log_interval: 10 * 1000,
+            cci_arc,
+            agg_market: vec![],
+            ref_price: vec![],
+            predict: Default::default(),
+        };
+        for i in 0..=params.ref_exchange.len() - 1 {
+            // 拼接不会消耗原字符串
+            let tickers_key: String = format!("{}{}{}{}", params.ref_exchange[i], "@", params.ref_pair[i], "@ref");
+            let ref_name_element = tickers_key.clone();
+            let depths_key: String = tickers_key.clone();
+            let market_update_time_key = tickers_key.clone();
+            let market_update_interval_key = tickers_key.clone();
+            let max_buy_min_sell_cache_key = tickers_key.clone();
+
+            core_obj.tickers.insert(tickers_key, SpecialTicker::new());
+            core_obj.ref_name.push(ref_name_element);
+            core_obj.depths.insert(depths_key, Default::default());
+            core_obj.market_update_time.insert(market_update_time_key, Default::default());
+            core_obj.market_update_interval.insert(market_update_interval_key, Default::default());
+            core_obj.max_buy_min_sell_cache.insert(max_buy_min_sell_cache_key, vec![Decimal::ZERO, Decimal::ZERO]);
+        }
+        let name = format!("{}{}{}", core_obj.exchange.clone(), "@", core_obj.symbol);
+        let market_update_time_key = name.clone();
+        let market_update_interval_key = name.clone();
+        let tickers_key = name.clone();
+        let depths_key = name.clone();
+        let max_buy_min_sell_cache_key = name.clone();
+        core_obj.trade_name = name;
+        core_obj.market_update_time.insert(market_update_time_key, Default::default());
+        core_obj.market_update_interval.insert(market_update_interval_key, Default::default());
+        core_obj.tickers.insert(tickers_key, SpecialTicker::new());
+        core_obj.depths.insert(depths_key, Default::default());
+        core_obj.max_buy_min_sell_cache.insert(max_buy_min_sell_cache_key, vec![Decimal::ZERO, Decimal::ZERO]);
+        // broker.newWs
+        let mut price_alpha: Vec<Decimal> = Vec::new();
+        for ref_pair_str in params.ref_pair {
+            if params.pair.contains("1000") && !ref_pair_str.contains("1000") {
+                price_alpha.push(dec!(1000.0));
+            } else if !params.pair.contains("1000") && ref_pair_str.contains("1000") {
+                price_alpha.push(dec!(0.001))
+            } else {
+                price_alpha.push(dec!(1.0));
+            }
+        }
+        info!("价格系数:{:?}", price_alpha);
+        core_obj.predictor = Predictor::new(core_obj.ref_name.len())
+            .alpha(price_alpha)
+            .gamma(params.gamma);
+
+        return core_obj;
+    }
+
+    pub fn log_ready_status(&mut self, msg: String) {
+        // 隔一会再打印未准备就绪的台词
+        let now_timestamp = Utc::now().timestamp_millis();
+        if now_timestamp - self.prev_log_ready_timestamp > self.log_ready_log_interval {
+            self.prev_log_ready_timestamp = now_timestamp;
+            info!("{}", msg);
+        }
+    }
+
+
+    // #[instrument(skip(self, data), level="TRACE")]
+    pub async fn update_position(&mut self, data: Vec<Position>) {
+        if data.is_empty() {
+            return;
+        }
+        let mut position = LocalPosition::new();
+        for pos in &data {
+            if pos.position_mode == PositionModeEnum::Long {
+                position.long_pos = pos.amount;
+                position.long_avg = pos.price;
+            } else if pos.position_mode == PositionModeEnum::Short {
+                position.short_pos = pos.amount.abs();
+                position.short_avg = pos.price;
+            }
+        }
+        // 更新仓位信息
+        if position != self.local_position {
+            info!("收到新的仓位推送, position: {:?}", data);
+            info!("更新本地仓位:{:?}", position);
+            self.local_position = position;
+        }
+
+        // 更新中控持仓相关的信息
+        {
+            let mut pos = self.local_position_by_orders.long_pos - self.local_position_by_orders.short_pos;
+            if !self.exchange.contains("spot") {
+                pos = self.local_position.long_pos - self.local_position.short_pos;
+            }
+            pos.rescale(8);
+
+            let mut entry_price;
+            if pos.gt(&Decimal::ZERO) {
+                entry_price = self.local_position_by_orders.long_avg;
+            } else {
+                entry_price = self.local_position_by_orders.short_avg;
+            }
+            entry_price.rescale(8);
+
+            let mut cci = self.cci_arc.lock().await;
+            cci.pos = pos;
+            cci.entry_price = entry_price;
+        }
+    }
+
+    // #[instrument(skip(self), level="TRACE")]
+    pub async fn get_exchange_info(&mut self) {
+        self.market = self.platform_rest.get_self_market();
+        info!(?self.market);
+    }
+
+    // #[instrument(skip(self, data), level="TRACE")]
+    pub async fn update_equity(&mut self, data: Account) {
+        /*
+           更新保证金信息
+           合约一直更新
+           现货只有当出现异常时更新
+       */
+        if self.exchange.contains("spot") {
+            return;
+        }
+        self.local_cash = data.balance * self.used_pct;
+    }
+
+    // #[instrument(skip(self), level="TRACE")]
+    pub async fn update_equity_rest_swap(&mut self) {
+        match self.platform_rest.get_account().await {
+            Ok(account) => {
+                /*
+                   更新保证金信息
+                   合约一直更新
+                   现货只有当出现异常时更新
+               */
+                self.local_cash = account.balance * self.used_pct
+            }
+            Err(e) => {
+                info!("获取账户信息错误: {:?}", e);
+            }
+        }
+    }
+
+    pub async fn update_position_rest_swap(&mut self) {
+        let position = self.platform_rest.get_position().await;
+        match position {
+            Ok(val) => {
+                // info!("bybit_swap:定时获取的仓位信息");
+                self.update_position(val).await;
+            }
+            Err(err) => {
+                error!("bybit_swap:定时获取仓位信息错误!\nget_position:res_data={:?}", err);
+            }
+        }
+    }
+
+    // #[instrument(skip(self), level="TRACE")]
+    pub async fn update_equity_rest_spot(&mut self) {
+        match self.platform_rest.get_spot_account().await {
+            Ok(mut val) => {
+                // 如果返回的数组里没有交易货币,则补充交易货币
+                if !val.iter().any(|a| a.coin.to_uppercase().eq(&self.base.to_uppercase())) {
+                    let mut base_coin_account = Account::new();
+                    base_coin_account.coin = self.base.to_uppercase();
+                    val.push(base_coin_account);
+                }
+
+                for account in val {
+                    // 交易货币
+                    if self.base.to_uppercase() == account.coin {
+                        self.local_coin = account.balance;
+                    }
+                    // 本位货币
+                    if self.quote.to_uppercase() == account.coin {
+                        self.local_cash = account.balance;
+                    }
+                }
+            }
+            Err(err) => {
+                error!("获取仓位信息异常: {}", err);
+            }
+        }
+    }
+
+    // #[instrument(skip(self, target_hold_coin), level="TRACE")]
+    pub async fn check_position(&mut self) -> ClearPositionResult {
+        let mut result = ClearPositionResult::new();
+        info!("------------------------------------------------------------------------------------------------------------");
+        info!("步骤一:检查挂单:");
+        let mut is_order_clear = false;
+        match self.platform_rest.cancel_orders_all().await {
+            Ok(val) => {
+                let length = val.len();
+                is_order_clear = length == 0;
+                result.clear_order_num = length.to_string();
+                info!("已清空所有挂单({}条)", length);
+                result.clear_order_str = serde_json::to_string(&val).expect("Failed to serialize to JSON---- order");
+                for o in val {
+                    info!("    {:?}", o);
+                }
+            }
+            Err(err) => {
+                warn!("取消所有订单异常({}),启动备用方法。", err);
+                match self.platform_rest.cancel_orders().await {
+                    Ok(val) => {
+                        let length = val.len();
+                        is_order_clear = length == 0;
+                        result.clear_order_num = length.to_string();
+                        result.clear_order_str = serde_json::to_string(&val).expect("Failed to serialize to JSON----  order");
+                        info!("清空所有挂单({}条):{:?}", length, val);
+                    }
+                    Err(exc) => {
+                        result.clear_order_str = exc.to_string();
+                        result.clear_other_err = true;
+                        error!("清空当前币对订单异常: {}", exc);
+                    }
+                }
+            }
+        }
+        info!("挂单检查完毕。");
+        info!("");
+
+        info!("步骤二:检查仓位:");
+        match self.platform_rest.get_positions().await {
+            Ok(val) => {
+                info!("检查仓位信息");
+                result.clear_position_num = val.len().to_string();
+                for position in val {
+                    if position.amount.eq(&Decimal::ZERO) {
+                        continue;
+                    }
+                    info!("    仓位:{:?}", position);
+                    let price = Decimal::ZERO;
+                    let side;
+                    info!(?position);
+                    match position.position_mode {
+                        PositionModeEnum::Long => {
+                            // pd
+                            side = "pd";
+                        }
+                        PositionModeEnum::Short => {
+                            // pk
+                            side = "pk";
+                        }
+                        _ => {
+                            error!("    仓位position_mode匹配失败,不做操作!");
+                            // 执行完当前币对  结束循环
+                            continue;
+                        }
+                    }
+                    // 发起清仓订单
+                    let mut ts = TraceStack::new(0, Instant::now());
+                    ts.on_before_send();
+
+                    // 市价单
+                    match self.platform_rest.take_order_symbol(position.symbol.clone(),
+                                                               Decimal::ONE,
+                                                               utils::generate_client_id(None).as_str(),
+                                                               side,
+                                                               price,
+                                                               position.amount.abs()).await {
+                        Ok(order) => {
+                            ts.on_after_send();
+                            info!("    {}仓位清除市价下单成功 {:?}, {}", position.symbol.clone(), order, ts.to_string());
+                            result.clear_position_str = format!("{} >仓位信息:{:?} 下单信息: {:?}",result.clear_position_str, position, order);
+                            // 执行完当前币对  结束循环
+                            continue;
+                        }
+                        Err(error) => {
+                            // ts.on_after_send();
+                            error!("    {}仓位清除市价下单异常 {}, {}", position.symbol.clone(), error, ts.to_string());
+                            result.clear_other_str = format!("{} >仓位信息:{:?} 下单异常信息: {:?}",result.clear_other_str, position, error);
+                            // 执行完当前币对  结束循环
+                            continue;
+                        }
+                    };
+                }
+            }
+            Err(error) => {
+                result.clear_other_err = true;
+                result.clear_position_str = format!("获取仓位异常 {}", error);
+                error!("获取仓位信息异常: {}", error);
+            }
+        }
+        info!("------------------------------------------------------------------------------------------------------------");
+        info!("");
+
+        return result;
+    }
+
+
+    // #[instrument(skip(self), level="TRACE")]
+    pub async fn exit(&mut self, robot_id: String) -> bool {
+        info!("-------------------------启动退出流程({})----------------------------", self.exit_msg);
+        info!("");
+
+        let mut result = self.check_position().await;
+        // 设置机器人id
+        result.robot_id = robot_id;
+        info!("清仓程序结果 {:?}", result);
+        /**
+         * todo: 判断是否有清仓,是否有异常
+        **/
+        if result.clear_position_num != "0" || result.clear_order_num != "0" || result.clear_other_err{
+            send_clear_msg_request(&result).await;
+            // 上报清仓日志
+        }
+
+        info!("订单、仓位清除完毕,为避免api失效导致遗漏仓位,建议人工复查。");
+        info!("停机原因:{}。", self.exit_msg);
+        return true;
+    }
+
+    // #[instrument(skip(self), level="TRACE")]
+    pub async fn before_trade(&mut self) -> bool {
+        sleep(Duration::from_secs(1)).await;
+        // 获取市场信息
+        self.get_exchange_info().await;
+        // 获取价格信息
+        let ticker = self.platform_rest.get_ticker().await.expect("获取价格信息异常!");
+        info!(?ticker);
+        let mp = (ticker.buy + ticker.sell) / Decimal::TWO;
+        // 获取账户信息
+        if self.exchange.contains("spot") {
+            self.update_equity_rest_spot().await;
+        } else {
+            self.update_equity_rest_swap().await;
+        }
+        // 更新中控账户相关信息
+        {
+            let mut now_balance = self.local_cash / self.used_pct;
+            now_balance.rescale(4);
+
+            let mut cci = self.cci_arc.lock().await;
+            cci.now_balance = now_balance;
+        }
+        // 初始资金
+        let start_cash = self.local_cash.clone();
+        let start_coin = self.local_coin.clone();
+        if start_cash.is_zero() && start_coin.is_zero() {
+            self.exit_msg = format!("{}{}{}{}", "初始余额为零 cash: ", start_cash, " coin: ", start_coin);
+        }
+        info!("初始cash: {start_cash} 初始coin: {start_coin}");
+        // 初始化策略基础信息
+        if mp <= Decimal::ZERO {
+            self.exit_msg = format!("{}{}", "初始价格获取错误: ", mp);
+            // 停止程序
+            self.stop().await;
+            return false;
+        } else {
+            info!("初始价格为 {}", mp);
+        }
+        self.strategy.mp = mp.clone();
+        self.strategy.start_cash = start_cash.clone();
+        self.strategy.start_coin = start_coin.clone();
+        self.strategy.start_equity = start_cash + start_coin * mp;
+        self.strategy.max_equity = self.strategy.start_equity.clone();
+        self.strategy.equity = self.strategy.start_equity.clone();
+        self.strategy.total_amount = self.strategy.equity * self.strategy.lever_rate / self.strategy.mp;
+        // 获取数量精度
+        self.strategy.step_size = self.market.amount_size.clone();
+        if self.strategy.step_size > Decimal::ONE {
+            self.strategy.step_size = self.strategy.step_size.trunc();
+        }
+        // 获取价格精度
+        self.strategy.tick_size = self.market.tick_size.clone();
+        if self.strategy.tick_size > Decimal::ONE {
+            self.strategy.tick_size = self.strategy.tick_size.trunc();
+        }
+        if self.strategy.step_size.is_zero() || self.strategy.tick_size.is_zero() {
+            self.exit_msg = format!("{}{}{}{}", "交易精度未正常获取 step_size: ", self.strategy.step_size, " tick_size:", self.strategy.tick_size);
+            // 停止程序
+            self.stop().await;
+            return false;
+        } else {
+            info!("数量精度 {}", self.strategy.step_size);
+            info!("价格精度 {}", self.strategy.tick_size);
+        }
+
+        // 初始化调度器
+        self.local_cash = start_cash;
+        self.local_coin = start_coin;
+
+        // 买入平台币
+        if self.exchange.contains("spot") { // 现货
+
+        }
+
+        // 清空挂单和仓位
+        return true;
+    }
+}
+
+// 清仓消息上报中控
+pub async fn send_clear_msg_request(body_params: &ClearPositionResult) {
+    // 创建客户端
+    let client = reqwest::Client::new();
+
+    // 创建请求头
+    let mut headers = HeaderMap::new();
+    headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
+    headers.insert("token", HeaderValue::from_static("token_value"));
+
+    // 发送 POST 请求
+    let res = client
+        .post("https://xxxx.com")
+        .headers(headers)
+        .json(body_params)
+        .send()
+        .await;
+    match res {
+        Ok(response) => {
+            let response_text = response.text().await.unwrap_or("获取请求的响应文本异常".to_string());
+            // 检查响应状态并读取响应体
+            if response.status().is_success() {
+                info!("清仓结果上报中控,请求成功,响应文本: {}", response_text);
+            } else {
+                println!("清仓结果上报中控,请求失败: 响应异常码 {},响应文本 {}", response.status(), response_text);
+            }
+        },
+        Err(e) => {
+            error!("清仓结果上报中控,请求发送失败,异常:{}", e)
+        }
+    }
+}

+ 2 - 1
strategy/src/lib.rs

@@ -14,4 +14,5 @@ mod okx_usdt_swap;
 mod bybit_usdt_swap;
 mod bitget_usdt_swap;
 mod coinex_usdt_swap;
-mod htx_usdt_swap;
+mod htx_usdt_swap;
+pub mod clear_core;