Forráskód Böngészése

合并3.0 和 coinex下单报错立即停机

JiahengHe 1 éve
szülő
commit
9b8afedb23

+ 1 - 1
Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "as-rust"
-version = "3.2.4"
+version = "3.5.0"
 edition = "2021"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

+ 13 - 37
exchanges/src/coinex_swap_rest.rs

@@ -10,7 +10,7 @@ use serde_json::Value;
 use crate::http_tool::RestTool;
 use crate::response_base::ResponseData;
 use sha2::{Digest, Sha256};
-use tracing::{error, info};
+use tracing::{error};
 
 #[derive(Clone)]
 pub struct CoinexSwapRest {
@@ -346,7 +346,6 @@ impl CoinexSwapRest {
                                     None,
                                     Some(params.to_string()),
             ).await;
-            info!("通过交易所id取消:响应 {:?}", data);
             data
         } else if client_id != "" {  // 如果客户端id不为空,则用客户端id取消订单
             let params = serde_json::json!({
@@ -362,7 +361,6 @@ impl CoinexSwapRest {
                                         None,
                                         Some(params.to_string()),
             ).await;
-            info!("通过客户端id取消:响应 {:?}", data);
             // 非空的
             if data.code == 200 && !data.data.is_null() {
                 data.data = data.data.as_array().unwrap()[0]["data"].clone();
@@ -630,40 +628,18 @@ impl CoinexSwapRest {
         };
 
         // 读取响应的内容
-        let res = request_builder.send().await;
-        match res {
-            Ok(response) => {
-                let is_success = response.status().is_success(); // 先检查状态码
-                let text_result = response.text().await;
-                match text_result {
-                    Ok(text) => {
-                        let data_json_str: Result<Value, serde_json::Error> = serde_json::from_str(text.as_str());
-                        match data_json_str {
-                            Ok(data_json) => {
-                                return if is_success && data_json["code"].to_string() == "0" {
-                                    self.on_success_data(data_json["data"].clone())
-                                } else {
-                                    self.on_error_data(&text, &url, &body)
-                                };
-                            }
-                            Err(e) => {
-                                error!("{} 请求完成,解析响应内容JSON失败 {} {}", url, text.as_str(), e);
-                                self.on_error_data(&e.to_string(), &url, &body)
-                            }
-                        }
-                    }
-                    Err(e) => {
-                        error!("{} 请求完成,解析响应内容失败 {}", url, e);
-                        self.on_error_data(&e.to_string(), &url, &body)
-                    }
-                }
-            }
-            Err(e) => {
-                // 异常情况
-                error!("{} 请求失败,网络错误 {}", url, e);
-                self.on_error_data(&e.to_string(), &url, &body)
-            }
-        }
+        let response = request_builder.send().await.unwrap();
+
+        let is_success = response.status().is_success(); // 先检查状态码
+        let text = response.text().await.unwrap();
+
+        let data_json: Value = serde_json::from_str(text.as_str()).unwrap();
+        return if is_success && data_json["code"].to_string() == "0" {
+            self.on_success_data(data_json["data"].clone())
+        } else {
+            self.on_error_data(&text, &url, &body)
+        };
+
     }
 
     pub fn on_success_data(&mut self, text: Value) -> ResponseData {

+ 133 - 0
global/src/clear_log_utils.rs

@@ -0,0 +1,133 @@
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::io;
+use tracing::{Event, info, Subscriber, warn};
+use tracing_appender_timezone::non_blocking::WorkerGuard;
+use tracing_subscriber::{fmt, Layer};
+use tracing_subscriber::layer::{Context, SubscriberExt};
+use reqwest::{Client};
+use tracing::field::{Field, Visit};
+use tracing_appender_timezone::rolling::{RollingFileAppender, Rotation};
+
+
+struct ErrorMessageVisitor {
+    message: String
+}
+
+impl Visit for ErrorMessageVisitor {
+    fn record_debug(&mut self, field: &Field, value: &dyn Debug) {
+        if field.name() == "message" {
+            self.message = format!("{:?}", value);
+        }
+    }
+}
+
+
+// 错误报告发送到指定服务器
+struct ReportingLayer {
+    account_name: String,
+}
+impl<S> Layer<S> for ReportingLayer
+where
+    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
+{
+    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
+        if event.metadata().level() == &tracing::Level::ERROR {
+            let mut visitor = ErrorMessageVisitor {
+                message: String::new()
+            };
+            event.record(&mut visitor);
+
+            let msg = format!("account={}, type=error, msg={}", self.account_name.clone(), visitor.message);
+            info!(msg)
+            // send_remote_err_log(msg)
+        }
+    }
+}
+
+pub fn send_remote_err_log(msg: String) {
+    tokio::spawn(async move {
+        let encoded_str = base64::encode(msg.clone());
+        let mut request_json_data = HashMap::new();
+        request_json_data.insert("serverName", "As");
+        request_json_data.insert("data", encoded_str.as_str());
+
+        let res = Client::new().post("https://hhh.liangjiang.cc/api/log/addError?key=d64a8sc874sa8c4as5")
+            .json(&request_json_data)
+            .send()
+            .await;
+
+        match res {
+            Ok(_resp) => {
+                // let body = _resp.text().await.unwrap();
+            }
+            Err(err) => {
+                warn!("log的error监听器发送远端报错失败:{:?}", err);
+            }
+        }
+    });
+}
+
+pub fn init_log_with_debug() {
+    let _ = final_init(tracing::Level::DEBUG.as_str(), 0, "test".to_string());
+}
+
+pub fn init_log_with_trace() {
+    let _ = final_init(tracing::Level::TRACE.as_str(), 0, "test".to_string());
+}
+
+pub fn init_log_with_info() {
+    let _ = final_init(tracing::Level::INFO.as_str(), 0, "test".to_string());
+}
+
+pub fn final_init(level: &str, port: u32, account_name: String) -> WorkerGuard {
+    let mut path = String::new();
+    path.push_str("./logs");
+    path.push_str(port.to_string().as_str());
+    path.push_str("/clear_program");
+
+    let file_appender = RollingFileAppender::builder()
+        .time_zone(8)
+        .rotation(Rotation::DAILY)
+        .filename_suffix("log")
+        .build(path)
+        .expect("initializing rolling file appender failed");
+    let (non_blocking, guard) = tracing_appender_timezone::non_blocking(file_appender);
+
+    use time::{macros::format_description, UtcOffset};
+    use tracing_subscriber::{fmt::time::OffsetTime};
+    let local_time = OffsetTime::new(
+        UtcOffset::from_hms(8, 0, 0).unwrap(),
+        format_description!("[month]-[day] [hour]:[minute]:[second].[subsecond digits:3]"),
+    );
+
+    let fmt_layer = fmt::layer()
+        .with_timer(local_time.clone())
+        .with_target(true)
+        .with_line_number(true)
+        .with_level(true)
+        .with_writer(io::stdout)
+        .with_span_events(fmt::format::FmtSpan::FULL);
+
+    let file_layer = fmt::layer()
+        .with_timer(local_time.clone())
+        .with_target(true)
+        .with_ansi(false)
+        .with_level(true)
+        .with_writer(non_blocking.clone())
+        .with_span_events(fmt::format::FmtSpan::FULL);
+
+    let reporting_layer = ReportingLayer {
+        account_name
+    };
+
+    let layer = tracing_subscriber::Registry::default()
+        .with(fmt_layer)
+        .with(file_layer)
+        .with(reporting_layer)
+        .with(tracing_subscriber::EnvFilter::new(level));
+
+    tracing::subscriber::set_global_default(layer).unwrap();
+
+    return guard;
+}

+ 26 - 0
global/src/clear_position_result.rs

@@ -0,0 +1,26 @@
+use serde_derive::{Deserialize, Serialize};
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct ClearPositionResult {
+    pub r_id: String,
+    pub clear_order_num: String,
+    pub clear_order_str: String,
+    pub clear_position_num: String,
+    pub clear_position_str: String,
+    pub clear_other_err: bool,
+    pub clear_other_str: String
+}
+
+impl ClearPositionResult {
+    pub fn new() -> ClearPositionResult {
+        ClearPositionResult{
+            r_id: "".to_string(),
+            clear_order_num: "0".to_string(),
+            clear_order_str: "".to_string(),
+            clear_position_num: "0".to_string(),
+            clear_position_str: "".to_string(),
+            clear_other_err: false,
+            clear_other_str: "".to_string(),
+        }
+    }
+}

+ 3 - 0
global/src/lib.rs

@@ -5,3 +5,6 @@ pub mod trace_stack;
 pub mod export_utils;
 pub mod account_info;
 pub mod cci;
+pub mod clear_position_result;
+pub mod trade;
+pub mod clear_log_utils;

+ 3 - 2
global/src/log_utils.rs

@@ -1,7 +1,7 @@
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::io;
-use tracing::{Event, Subscriber, warn};
+use tracing::{Event, info, Subscriber, warn};
 use tracing_appender_timezone::non_blocking::WorkerGuard;
 use tracing_subscriber::{fmt, Layer};
 use tracing_subscriber::layer::{Context, SubscriberExt};
@@ -39,7 +39,8 @@ impl<S> Layer<S> for ReportingLayer
             event.record(&mut visitor);
 
             let msg = format!("account={}, type=error, msg={}", self.account_name.clone(), visitor.message);
-            send_remote_err_log(msg)
+            info!(msg)
+            // send_remote_err_log(msg)
         }
     }
 }

+ 6 - 0
global/src/params.rs

@@ -50,6 +50,10 @@ pub struct Params {
     pub log_level: String,
     // 中控端口
     pub port: u32,
+    // 运行模式 0.正常策略运行, 1.清理挂单及仓位
+    pub run_mode: i8,
+    // 机器人id
+    pub r_id: String,
 }
 
 impl Params {
@@ -96,6 +100,8 @@ impl Params {
             gamma: dec!(0.999),
             log_level: "info".to_string(),
             port: call_port,
+            run_mode: 0,
+            r_id: "-1".to_string()
         };
         Ok(params)
     }

+ 17 - 0
global/src/trade.rs

@@ -0,0 +1,17 @@
+use rust_decimal::Decimal;
+
+pub struct Trade {
+    // 价格
+    pub price: Decimal,
+    // 时间
+    pub time: i64
+}
+
+impl Trade {
+    pub fn new_by_ticker (price: Decimal, time: i64) -> Trade {
+        Trade{
+            price,
+            time
+        }
+    }
+}

+ 36 - 0
src/clear_core_libs.rs

@@ -0,0 +1,36 @@
+
+use std::collections::BTreeMap;
+use std::io::Error;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool};
+use tokio::sync::{mpsc, Mutex};
+use tracing::{info};
+use global::cci::CentralControlInfo;
+use global::params::Params;
+use standard::Order;
+use strategy::clear_core::ClearCore;
+
+pub async fn init(params: Params,
+                  _ws_running: Arc<AtomicBool>,
+                  running: Arc<AtomicBool>,
+                  cci_arc: Arc<Mutex<CentralControlInfo>>) -> ClearCore {
+    // 封装
+    let mut exchange_params:BTreeMap<String, String> = BTreeMap::new();
+    exchange_params.insert("access_key".to_string(), params.access_key.clone());
+    exchange_params.insert("secret_key".to_string(), params.secret_key.clone());
+    exchange_params.insert("pass_key".to_string(), params.pass_key.clone());
+
+    let (order_sender, _order_receiver) = mpsc::channel::<Order>(100);
+    let (error_sender, _error_receiver) = mpsc::channel::<Error>(100);
+
+    let mut core_obj = ClearCore::new(params.exchange.clone(),
+                                 params.clone(),
+                                 exchange_params.clone(),
+                                 order_sender.clone(),
+                                 error_sender.clone(),
+                                 running.clone(),
+                                 cci_arc.clone()).await;
+    info!("清仓检查程序ClearCore初始化……");
+    core_obj.before_trade().await;
+    return core_obj;
+}

+ 21 - 0
src/core_libs.rs

@@ -124,5 +124,26 @@ pub async fn init(params: Params,
         }
     });
 
+    // 定时仓位检测
+    // let markt_price_core_arc = core_arc.clone();
+    // tokio::spawn(async move {
+    //     info!("rest仓位检测定时任务启动(5s)...");
+    //     loop {
+    //         tokio::time::sleep(Duration::from_secs(5)).await;
+    //
+    //         let mut core = markt_price_core_arc.lock().await;
+    //         match core.platform_rest.get_positions().await {
+    //             Ok(pos) => {
+    //                 if pos.len() > 0 {
+    //                     core.update_position(pos).await;
+    //                 }
+    //             },
+    //             Err(err) => {
+    //                 error!("rest持仓数据获取异常 {}", err);
+    //             }
+    //         };
+    //     }
+    // });
+
     return core_arc;
 }

+ 85 - 27
src/main.rs

@@ -1,13 +1,14 @@
 mod server;
 mod control_c;
 mod core_libs;
+mod clear_core_libs;
 
 use std::str::FromStr;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 use tokio::sync::Mutex;
-use tracing::{info, warn};
+use tracing::{error, info, warn};
 use tracing_appender_timezone::non_blocking::WorkerGuard;
 use global::cci::CentralControlInfo;
 use global::log_utils::{send_remote_err_log};
@@ -15,8 +16,16 @@ use global::params::Params;
 
 // 日志级别配置
 fn log_level_init(log_str: String, port: u32, account_name: String) -> WorkerGuard {
+    let log = global::log_utils::final_init(log_str.as_str(), port, account_name);
     info!("日志级别读取成功:{}。", log_str);
-    global::log_utils::final_init(log_str.as_str(), port, account_name)
+    return log;
+}
+
+// 清仓程序日志级别配置
+fn clear_log_level_init(log_str: String, port: u32, account_name: String) -> WorkerGuard {
+    let log = global::clear_log_utils::final_init(log_str.as_str(), port, account_name);
+    info!("清仓程序日志级别读取成功:{}。", log_str);
+    return log;
 }
 
 // 获取本地配置
@@ -44,6 +53,9 @@ fn log_level_init(log_str: String, port: u32, account_name: String) -> WorkerGua
 fn read_params_json() -> Params {
     let mut path = "config.json";
     let mut call_port = 5555;
+    // 运行模式 0.正常策略运行, 1.清理挂单及仓位
+    let mut run_mode = 0;
+    let mut r_id = "-1".to_string();
 
     let args: Vec<String> = std::env::args().collect();
 
@@ -66,10 +78,40 @@ fn read_params_json() -> Params {
                 }
             }
         }
-    }
 
+        // Check for the --port argument and assign its value.
+        if arg.starts_with("--run_mode=") {
+            let parts: Vec<&str> = arg.split('=').collect();
+            if parts.len() == 2 {
+                match u32::from_str(parts[1]) {
+                    Ok(num) => run_mode = num,
+                    Err(_) => eprintln!("Invalid number for run_mode: {}", parts[1]),
+                }
+            }
+        }
+
+        //上报ID
+        if arg.starts_with("--r_id=") {
+            let parts: Vec<&str> = arg.split('=').collect();
+            if parts.len() == 2 {
+                r_id = parts[1].to_string();
+            } else {
+                error!("启动失败,回执单id参数格式设置错误 --check_id=xxx!");
+                panic!("启动失败,回执单id参数格式设置错误 --check_id=xxx!");
+            }
+        }
+    }
     println!("通讯端口:{}, 配置文件路径:{}", call_port, path);
-    let params = Params::new_json(path, call_port).unwrap();
+    println!("请求指令携带参数:{:?}",args.clone());
+    let mut params = Params::new_json(path, call_port).unwrap();
+    if run_mode == 1 {
+        params.run_mode = 1;
+        if r_id == "-1" {
+            error!("启动失败,缺少回执单id参数!");
+            panic!("启动失败,缺少回执单id参数!");
+        }
+    }
+    params.r_id = r_id;
     return params;
 }
 
@@ -80,7 +122,13 @@ async fn main() {
     let params = read_params_json();
 
     // 日志级别配置
-    let _guard = log_level_init(params.log_level.clone(), params.port.clone(), params.account_name.clone());
+    let _guard;
+    if params.run_mode == 1 {
+        _guard = clear_log_level_init(params.log_level.clone(), params.port.clone(), params.account_name.clone());
+    } else {
+        _guard = log_level_init(params.log_level.clone(), params.port.clone(), params.account_name.clone());
+    }
+
     info!("--------------------------------程序开始执行-----------------------------");
     info!("配置读取成功:{:?}。", params);
     // 主进程控制
@@ -109,28 +157,38 @@ async fn main() {
 
     // ws退出程序
     let ws_running = Arc::new(AtomicBool::new(true));
-    // core初始化动作
-    let core_arc = core_libs::init(params.clone(), ws_running.clone(), running.clone(), cci_arc.clone()).await;
-    // 初始化中控服务
-    server::run_server(params.port.clone(), running.clone(), cci_arc.clone());
-    // ctrl c退出检查程序
-    control_c::exit_handler(running.clone());
-
-    // 每一秒检查一次程序是否结束
-    while running.load(Ordering::Relaxed) {
+    if params.run_mode == 1 {
+        // core初始化动作
+        let mut core_arc = clear_core_libs::init(params.clone(), ws_running.clone(), running.clone(), cci_arc.clone()).await;
+        info!("开始执行清仓程序");
+        core_arc.exit(params.r_id).await;
+        info!("清仓程序执行完毕");
+        // 强制退出
+        std::process::exit(0);
+    } else {
+        // core初始化动作
+        let core_arc = core_libs::init(params.clone(), ws_running.clone(), running.clone(), cci_arc.clone()).await;
+        // 初始化中控服务
+        server::run_server(params.port.clone(), running.clone(), cci_arc.clone());
+        // ctrl c退出检查程序
+        control_c::exit_handler(running.clone());
+
+        // 每一秒检查一次程序是否结束
+        while running.load(Ordering::Relaxed) {
+            tokio::time::sleep(Duration::from_secs(1)).await;
+        }
+
+        info!("检测到退出信号!停止ws订阅……");
+        ws_running.store(false, Ordering::Relaxed);
         tokio::time::sleep(Duration::from_secs(1)).await;
-    }
 
-    info!("检测到退出信号!停止ws订阅……");
-    ws_running.store(false, Ordering::Relaxed);
-    tokio::time::sleep(Duration::from_secs(1)).await;
-
-    info!("等待清空仓位、订单(再次按control c可以立马结束)……");
-    let mut core = core_arc.lock().await;
-    core.exit().await;
-    info!("程序已退出!为以防万一,请再次检查仓位和订单!");
-    // 等两秒,等中控反应过来
-    tokio::time::sleep(Duration::from_secs(2)).await;
-    // 强制退出
-    std::process::exit(0);
+        info!("等待清空仓位、订单(再次按control c可以立马结束)……");
+        let mut core = core_arc.lock().await;
+        core.exit().await;
+        info!("程序已退出!为以防万一,请再次检查仓位和订单!");
+        // 等两秒,等中控反应过来
+        tokio::time::sleep(Duration::from_secs(2)).await;
+        // 强制退出
+        std::process::exit(0);
+    }
 }

+ 1 - 0
src/server.rs

@@ -6,6 +6,7 @@ use tokio::sync::Mutex;
 use tracing::{info};
 use global::cci::CentralControlInfo;
 
+// arcs
 #[derive(Clone)]
 struct Arcs {
     running: Arc<AtomicBool>,

+ 24 - 20
standard/src/coinex_swap.rs

@@ -604,28 +604,32 @@ impl Platform for CoinexSwap {
             let handle = spawn(async move {
                 // info!("数量 {},方向 {},价格 {},c_id {}", amount, side, price, cid);
                 ts.on_before_send();
-                let result = self_clone.take_order(&cid, &side, price, amount).await;
+                let mut result = self_clone.take_order(&cid, &side, price, amount).await.unwrap();
                 ts.on_after_send();
 
-                match result {
-                    Ok(mut result) => {
-                        result.trace_stack = ts;
-                        // info!("数量 {},方向 {},价格 {},c_id {}", amount, side, price, cid);
-                        self_clone.order_sender.send(result).await.unwrap();
-                    }
-                    Err(error) => {
-                        let mut err_order = Order::new();
-                        err_order.custom_id = cid.clone();
-                        err_order.status = "REMOVE".to_string();
-                        // info!("err 数量 {},方向 {},价格 {},c_id {}", amount, side, price, cid);
-                        self_clone.order_sender.send(err_order).await.unwrap();
-                        self_clone.error_sender.send(error).await.unwrap();
-                        // 触发限频
-                        // if error_info.to_string().contains("213:Please don't try too frequently") {
-                        //     Err(Error::new(ErrorKind::Other, "触发限频, 请调整下单频率"))
-                        // }
-                    }
-                }
+                result.trace_stack = ts;
+                // info!("数量 {},方向 {},价格 {},c_id {}", amount, side, price, cid);
+                self_clone.order_sender.send(result).await.unwrap();
+
+                // match result {
+                //     Ok(mut result) => {
+                //         result.trace_stack = ts;
+                //         // info!("数量 {},方向 {},价格 {},c_id {}", amount, side, price, cid);
+                //         self_clone.order_sender.send(result).await.unwrap();
+                //     }
+                //     Err(error) => {
+                //         let mut err_order = Order::new();
+                //         err_order.custom_id = cid.clone();
+                //         err_order.status = "REMOVE".to_string();
+                //         info!("coinex下单error 数量: {},方向: {},价格: {},c_id: {}, err: {}", amount, side, price, cid, error);
+                //         self_clone.order_sender.send(err_order).await.unwrap();
+                //         self_clone.error_sender.send(error).await.unwrap();
+                //         // 触发限频
+                //         // if error_info.to_string().contains("213:Please don't try too frequently") {
+                //         //     Err(Error::new(ErrorKind::Other, "触发限频, 请调整下单频率"))
+                //         // }
+                //     }
+                // }
             });
             handles.push(handle)
         }

+ 667 - 0
strategy/src/clear_core.rs

@@ -0,0 +1,667 @@
+use tokio::time::Instant;
+use std::collections::{BTreeMap, HashMap};
+use std::io::Error;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool};
+use std::time::Duration;
+use chrono::{Utc};
+use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue};
+use rust_decimal::Decimal;
+use rust_decimal_macros::dec;
+use tokio::sync::mpsc::{Sender};
+use tokio::sync::{Mutex};
+use tokio::time::sleep;
+use tracing::{error, info, warn};
+use global::cci::CentralControlInfo;
+use global::clear_position_result::ClearPositionResult;
+use global::params::Params;
+use global::trace_stack::TraceStack;
+use standard::{Account, Market, Order, Platform, Position, PositionModeEnum, SpecialTicker};
+use standard::exchange::{Exchange};
+use standard::exchange::ExchangeEnum::{BinanceSwap, BitgetSwap, BybitSwap, CoinexSwap, GateSwap, HtxSwap, KucoinSwap};
+
+use crate::model::{LocalPosition, OrderInfo};
+use crate::predictor::Predictor;
+use crate::strategy::Strategy;
+use crate::utils;
+use crate::utils::clip;
+
+
+pub struct ClearCore {
+    pub params: Params,
+    // 启动时间
+    pub start_time: i64,
+    // 币对
+    pub symbol: String,
+    // 基础货币
+    pub base: String,
+    // 报价货币
+    pub quote: String,
+    //
+    pub strategy: Strategy,
+    // 本地挂单表
+    pub local_orders: HashMap<String, OrderInfo>,
+    // 本地订单缓存队列
+    pub local_orders_backup: HashMap<String, OrderInfo>,
+    // 本地订单缓存cid队列
+    pub local_orders_backup_cid: Vec<String>,
+    // 本地已处理cid缓存队列
+    pub handled_orders_cid: Vec<String>,
+    // 本地利润值
+    pub local_profit: Decimal,
+    // 本地U保证金
+    pub local_cash: Decimal,
+    // 本地币保证金
+    pub local_coin: Decimal,
+    // 仓位信息
+    pub local_position: LocalPosition,
+    // 仓位信息-来自订单
+    pub local_position_by_orders: LocalPosition,
+    //
+    pub local_buy_amount: Decimal,
+    pub local_sell_amount: Decimal,
+    pub local_buy_value: Decimal,
+    pub local_sell_value: Decimal,
+    pub local_cancel_log: HashMap<String, i64>,
+    pub interval: u64,
+    pub exchange: String,
+    pub exit_msg: String,
+    // 仓位检查结果序列
+    pub position_check_series: Vec<i8>,
+    // 止损大小
+    pub stop_loss: Decimal,
+    // 资金使用率
+    pub used_pct: Decimal,
+    // 启停信号 0 表示运行 大于1开始倒计时 1时停机
+    pub mode_signal: i8,
+    // 交易盘口订单流更新时间
+    pub trade_order_update_time: i64,
+    // onTick触发时间记录
+    pub on_tick_event_time: i64,
+    // 盘口ticker信息
+    pub tickers: HashMap<String, SpecialTicker>,
+    // 盘口 depth信息
+    pub depths: HashMap<String, Vec<Decimal>>,
+    // 行情更新延迟监控(风控)
+    pub market_update_time: HashMap<String, i64>,
+    pub market_update_interval: HashMap<String, Decimal>,
+    pub ref_num: i8,
+    pub ref_name: Vec<String>,
+    pub trade_name: String,
+    pub ready: i8,
+    pub predictor: Predictor,
+    pub market: Market,
+    pub platform_rest: Box<dyn Platform + Send + Sync>,
+    // 市场最优买卖价
+    pub max_buy_min_sell_cache: HashMap<String, Vec<Decimal>>,
+    // 最近一次的depth信息
+    pub local_depths: HashMap<String, Vec<Decimal>>,
+    pub is_update: HashMap<String, bool>,
+    pub running: Arc<AtomicBool>,
+    pub hold_coin: Decimal,
+
+    // 打印限频
+    pub prev_log_ready_timestamp: i64,
+    pub log_ready_log_interval: i64,
+
+    // 中控
+    pub cci_arc: Arc<Mutex<CentralControlInfo>>,            // 中控信息汇集
+
+    // 老版的trader_msg留下来的
+    pub agg_market: Vec<Decimal>,
+    pub ref_price: Vec<Vec<Decimal>>,
+    pub predict: Decimal,
+}
+
+impl ClearCore {
+    pub async fn new(exchange: String,
+                     params: Params,
+                     exchange_params: BTreeMap<String, String>,
+                     order_sender: Sender<Order>,
+                     error_sender: Sender<Error>,
+                     running: Arc<AtomicBool>,
+                     cci_arc: Arc<Mutex<CentralControlInfo>>) -> ClearCore {
+        let symbol = params.pair.clone();
+        let pairs: Vec<&str> = params.pair.split('_').collect();
+        let mut core_obj = ClearCore {
+            params: params.clone(),
+            start_time: 0,
+            symbol: symbol.clone(),
+            base: pairs[0].to_string(),
+            quote: pairs[1].to_string(),
+            // 现货底仓
+            hold_coin: clip(params.hold_coin, Decimal::ZERO, Decimal::ONE_HUNDRED * Decimal::ONE_HUNDRED),
+            strategy: Strategy::new(&params, true),
+            local_orders: Default::default(),
+            local_orders_backup: Default::default(),
+            local_orders_backup_cid: Default::default(),
+            handled_orders_cid: Default::default(),
+            local_profit: Default::default(),
+            local_cash: Default::default(),
+            local_coin: Default::default(),
+            local_position: LocalPosition {
+                long_pos: Default::default(),
+                short_pos: Default::default(),
+                long_avg: Default::default(),
+                short_avg: Default::default(),
+            },
+            local_position_by_orders: LocalPosition {
+                long_pos: Default::default(),
+                short_pos: Default::default(),
+                long_avg: Default::default(),
+                short_avg: Default::default(),
+            },
+            local_buy_amount: Default::default(),
+            local_sell_amount: Default::default(),
+            local_buy_value: Default::default(),
+            local_sell_value: Default::default(),
+            local_cancel_log: Default::default(),
+            interval: params.interval,
+            exchange: params.exchange,
+            exit_msg: "正常退出".to_string(),
+            position_check_series: Default::default(),
+            stop_loss: params.stop_loss,
+            used_pct: dec!(0.95),
+            mode_signal: 0,
+            trade_order_update_time: Utc::now().timestamp_millis(),
+            on_tick_event_time: Utc::now().timestamp_millis(),
+            tickers: Default::default(),
+            depths: Default::default(),
+            market_update_time: Default::default(),
+            market_update_interval: Default::default(),
+            ref_num: params.ref_exchange.len() as i8,
+            ref_name: Default::default(),
+            trade_name: "".to_string(),
+            ready: 0,
+            predictor: Predictor {
+                loop_count: 0,
+                market_info_list: vec![],
+                mid_price_list: vec![],
+                ref_mid_price_per_exchange_per_frame: vec![],
+                ref_exchange_length: 0,
+                data_length_max: 0,
+                alpha: vec![],
+                gamma: Default::default(),
+                avg_spread_list: vec![],
+            },
+            market: Market {
+                symbol: symbol.clone(),
+                base_asset: "".to_string(),
+                quote_asset: "".to_string(),
+                tick_size: Default::default(),
+                price_precision: Default::default(),
+                amount_precision: Default::default(),
+                min_qty: Default::default(),
+                max_qty: Default::default(),
+                min_notional: Default::default(),
+                max_notional: Default::default(),
+                ct_val: Default::default(),
+                amount_size: Default::default(),
+            },
+            platform_rest: match exchange.as_str() {
+                "kucoin_usdt_swap" => {
+                    Exchange::new(KucoinSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
+                "gate_usdt_swap" => {
+                    Exchange::new(GateSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
+                // "gate_usdt_spot" => {
+                //     Exchange::new(GateSpot, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                // }
+                "binance_usdt_swap" => {
+                    Exchange::new(BinanceSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
+                // "binance_spot" => {
+                //     Exchange::new(BinanceSpot, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                // }
+                // "bitget_spot" => {
+                //     Exchange::new(BitgetSpot, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                // }
+                "bitget_usdt_swap" => {
+                    Exchange::new(BitgetSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
+                // "okex_usdt_swap" => {
+                //     Exchange::new(OkxSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                // }
+                "bybit_usdt_swap" => {
+                    Exchange::new(BybitSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
+                "coinex_usdt_swap" => {
+                    Exchange::new(CoinexSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
+                "htx_usdt_swap" => {
+                    Exchange::new(HtxSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
+                _ => {
+                    error!("203未找到对应的交易所rest枚举!");
+                    panic!("203未找到对应的交易所rest枚举!");
+                }
+            },
+            max_buy_min_sell_cache: Default::default(),
+            local_depths: Default::default(),
+            is_update: Default::default(),
+            running,
+            prev_log_ready_timestamp: 0,
+            log_ready_log_interval: 10 * 1000,
+            cci_arc,
+            agg_market: vec![],
+            ref_price: vec![],
+            predict: Default::default(),
+        };
+        for i in 0..=params.ref_exchange.len() - 1 {
+            // 拼接不会消耗原字符串
+            let tickers_key: String = format!("{}{}{}{}", params.ref_exchange[i], "@", params.ref_pair[i], "@ref");
+            let ref_name_element = tickers_key.clone();
+            let depths_key: String = tickers_key.clone();
+            let market_update_time_key = tickers_key.clone();
+            let market_update_interval_key = tickers_key.clone();
+            let max_buy_min_sell_cache_key = tickers_key.clone();
+
+            core_obj.tickers.insert(tickers_key, SpecialTicker::new());
+            core_obj.ref_name.push(ref_name_element);
+            core_obj.depths.insert(depths_key, Default::default());
+            core_obj.market_update_time.insert(market_update_time_key, Default::default());
+            core_obj.market_update_interval.insert(market_update_interval_key, Default::default());
+            core_obj.max_buy_min_sell_cache.insert(max_buy_min_sell_cache_key, vec![Decimal::ZERO, Decimal::ZERO]);
+        }
+        let name = format!("{}{}{}", core_obj.exchange.clone(), "@", core_obj.symbol);
+        let market_update_time_key = name.clone();
+        let market_update_interval_key = name.clone();
+        let tickers_key = name.clone();
+        let depths_key = name.clone();
+        let max_buy_min_sell_cache_key = name.clone();
+        core_obj.trade_name = name;
+        core_obj.market_update_time.insert(market_update_time_key, Default::default());
+        core_obj.market_update_interval.insert(market_update_interval_key, Default::default());
+        core_obj.tickers.insert(tickers_key, SpecialTicker::new());
+        core_obj.depths.insert(depths_key, Default::default());
+        core_obj.max_buy_min_sell_cache.insert(max_buy_min_sell_cache_key, vec![Decimal::ZERO, Decimal::ZERO]);
+        // broker.newWs
+        let mut price_alpha: Vec<Decimal> = Vec::new();
+        for ref_pair_str in params.ref_pair {
+            if params.pair.contains("1000") && !ref_pair_str.contains("1000") {
+                price_alpha.push(dec!(1000.0));
+            } else if !params.pair.contains("1000") && ref_pair_str.contains("1000") {
+                price_alpha.push(dec!(0.001))
+            } else {
+                price_alpha.push(dec!(1.0));
+            }
+        }
+        info!("价格系数:{:?}", price_alpha);
+        core_obj.predictor = Predictor::new(core_obj.ref_name.len())
+            .alpha(price_alpha)
+            .gamma(params.gamma);
+
+        return core_obj;
+    }
+
+    pub fn log_ready_status(&mut self, msg: String) {
+        // 隔一会再打印未准备就绪的台词
+        let now_timestamp = Utc::now().timestamp_millis();
+        if now_timestamp - self.prev_log_ready_timestamp > self.log_ready_log_interval {
+            self.prev_log_ready_timestamp = now_timestamp;
+            info!("{}", msg);
+        }
+    }
+
+
+    // #[instrument(skip(self, data), level="TRACE")]
+    pub async fn update_position(&mut self, data: Vec<Position>) {
+        if data.is_empty() {
+            return;
+        }
+        let mut position = LocalPosition::new();
+        for pos in &data {
+            if pos.position_mode == PositionModeEnum::Long {
+                position.long_pos = pos.amount;
+                position.long_avg = pos.price;
+            } else if pos.position_mode == PositionModeEnum::Short {
+                position.short_pos = pos.amount.abs();
+                position.short_avg = pos.price;
+            }
+        }
+        // 更新仓位信息
+        if position != self.local_position {
+            info!("收到新的仓位推送, position: {:?}", data);
+            info!("更新本地仓位:{:?}", position);
+            self.local_position = position;
+        }
+
+        // 更新中控持仓相关的信息
+        {
+            let mut pos = self.local_position_by_orders.long_pos - self.local_position_by_orders.short_pos;
+            if !self.exchange.contains("spot") {
+                pos = self.local_position.long_pos - self.local_position.short_pos;
+            }
+            pos.rescale(8);
+
+            let mut entry_price;
+            if pos.gt(&Decimal::ZERO) {
+                entry_price = self.local_position_by_orders.long_avg;
+            } else {
+                entry_price = self.local_position_by_orders.short_avg;
+            }
+            entry_price.rescale(8);
+
+            let mut cci = self.cci_arc.lock().await;
+            cci.pos = pos;
+            cci.entry_price = entry_price;
+        }
+    }
+
+    // #[instrument(skip(self), level="TRACE")]
+    pub async fn get_exchange_info(&mut self) {
+        self.market = self.platform_rest.get_self_market();
+        info!(?self.market);
+    }
+
+    // #[instrument(skip(self, data), level="TRACE")]
+    pub async fn update_equity(&mut self, data: Account) {
+        /*
+           更新保证金信息
+           合约一直更新
+           现货只有当出现异常时更新
+       */
+        if self.exchange.contains("spot") {
+            return;
+        }
+        self.local_cash = data.balance * self.used_pct;
+    }
+
+    // #[instrument(skip(self), level="TRACE")]
+    pub async fn update_equity_rest_swap(&mut self) {
+        match self.platform_rest.get_account().await {
+            Ok(account) => {
+                /*
+                   更新保证金信息
+                   合约一直更新
+                   现货只有当出现异常时更新
+               */
+                self.local_cash = account.balance * self.used_pct
+            }
+            Err(e) => {
+                info!("获取账户信息错误: {:?}", e);
+            }
+        }
+    }
+
+    pub async fn update_position_rest_swap(&mut self) {
+        let position = self.platform_rest.get_position().await;
+        match position {
+            Ok(val) => {
+                // info!("bybit_swap:定时获取的仓位信息");
+                self.update_position(val).await;
+            }
+            Err(err) => {
+                error!("bybit_swap:定时获取仓位信息错误!\nget_position:res_data={:?}", err);
+            }
+        }
+    }
+
+    // #[instrument(skip(self), level="TRACE")]
+    pub async fn update_equity_rest_spot(&mut self) {
+        match self.platform_rest.get_spot_account().await {
+            Ok(mut val) => {
+                // 如果返回的数组里没有交易货币,则补充交易货币
+                if !val.iter().any(|a| a.coin.to_uppercase().eq(&self.base.to_uppercase())) {
+                    let mut base_coin_account = Account::new();
+                    base_coin_account.coin = self.base.to_uppercase();
+                    val.push(base_coin_account);
+                }
+
+                for account in val {
+                    // 交易货币
+                    if self.base.to_uppercase() == account.coin {
+                        self.local_coin = account.balance;
+                    }
+                    // 本位货币
+                    if self.quote.to_uppercase() == account.coin {
+                        self.local_cash = account.balance;
+                    }
+                }
+            }
+            Err(err) => {
+                error!("获取仓位信息异常: {}", err);
+            }
+        }
+    }
+
+    // #[instrument(skip(self, target_hold_coin), level="TRACE")]
+    pub async fn check_position(&mut self) -> ClearPositionResult {
+        let mut result = ClearPositionResult::new();
+        info!("------------------------------------------------------------------------------------------------------------");
+        info!("步骤一:检查挂单:");
+        match self.platform_rest.cancel_orders_all().await {
+            Ok(val) => {
+                let length = val.len();
+                result.clear_order_num = length.to_string();
+                result.clear_order_str = format!("清空所有挂单:{:?}", val);
+                if length > 0{
+                    info!("已清空所有挂单({}条)", length);
+                    for o in val {
+                        info!("    {:?}", o);
+                    }
+                }
+            }
+            Err(err) => {
+                warn!("取消所有订单异常({}),启动备用方法。", err);
+                match self.platform_rest.cancel_orders().await {
+                    Ok(val) => {
+                        let length = val.len();
+                        result.clear_order_num = length.to_string();
+                        result.clear_order_str = format!("清空所有挂单(备用):{:?}", val);
+                        if length > 0{
+                            info!("清空所有挂单({}条):{:?}", length, val);
+                        }
+                    }
+                    Err(exc) => {
+                        result.clear_order_str = exc.to_string();
+                        result.clear_other_err = true;
+                        error!("清空当前币对订单异常: {}", exc);
+                    }
+                }
+            }
+        }
+        info!("挂单检查完毕。");
+        info!("");
+
+        info!("步骤二:检查仓位:");
+        match self.platform_rest.get_positions().await {
+            Ok(val) => {
+                info!("检查仓位信息");
+                let mut position_num = 0;
+                for position in val {
+                    if position.amount.eq(&Decimal::ZERO) {
+                        continue;
+                    }
+                    position_num = position_num + 1;
+                    info!("    仓位:{:?}", position);
+                    let price = Decimal::ZERO;
+                    let side;
+                    info!(?position);
+                    match position.position_mode {
+                        PositionModeEnum::Long => {
+                            // pd
+                            side = "pd";
+                        }
+                        PositionModeEnum::Short => {
+                            // pk
+                            side = "pk";
+                        }
+                        _ => {
+                            error!("    仓位position_mode匹配失败,不做操作!");
+                            // 执行完当前币对  结束循环
+                            continue;
+                        }
+                    }
+                    // 发起清仓订单
+                    let mut ts = TraceStack::new(0, Instant::now());
+                    ts.on_before_send();
+
+                    // 市价单
+                    match self.platform_rest.take_order_symbol(position.symbol.clone(),
+                                                               Decimal::ONE,
+                                                               utils::generate_client_id(None).as_str(),
+                                                               side,
+                                                               price,
+                                                               position.amount.abs()).await {
+                        Ok(order) => {
+                            ts.on_after_send();
+                            info!("    {}仓位清除市价下单成功 {:?}, {}", position.symbol.clone(), order, ts.to_string());
+                            result.clear_position_str = format!("{} >仓位信息:{:?} 下单信息: {:?}",result.clear_position_str, position, order);
+                            // 执行完当前币对  结束循环
+                            continue;
+                        }
+                        Err(error) => {
+                            // ts.on_after_send();
+                            error!("    {}仓位清除市价下单异常 {}, {}", position.symbol.clone(), error, ts.to_string());
+                            result.clear_other_str = format!("{} >仓位信息:{:?} 下单异常信息: {:?}",result.clear_other_str, position, error);
+                            // 执行完当前币对  结束循环
+                            continue;
+                        }
+                    };
+                }
+                result.clear_position_num = position_num.to_string();
+            }
+            Err(error) => {
+                result.clear_other_err = true;
+                result.clear_position_str = format!("获取仓位异常 {}", error);
+                error!("获取仓位信息异常: {}", error);
+            }
+        }
+        info!("------------------------------------------------------------------------------------------------------------");
+        info!("");
+
+        return result;
+    }
+
+
+    // #[instrument(skip(self), level="TRACE")]
+    pub async fn exit(&mut self, r_id: String) -> bool {
+        let mut result = self.check_position().await;
+        // 设置机器人id
+        result.r_id = r_id;
+        info!("清仓程序结果 {:?}", result);
+        // 判断是否有清仓,是否有异常
+        if result.clear_position_num != "0" || result.clear_order_num != "0" || result.clear_other_err{
+            info!("上报了清仓信息!!!");
+            send_clear_msg_request(&result).await;
+            // 上报清仓日志
+        }
+
+        info!("订单、仓位自动清除完毕。");
+        return true;
+    }
+
+    // #[instrument(skip(self), level="TRACE")]
+    pub async fn before_trade(&mut self) -> bool {
+        sleep(Duration::from_secs(1)).await;
+        // 获取市场信息
+        self.get_exchange_info().await;
+        // 获取价格信息
+        let ticker = self.platform_rest.get_ticker().await.expect("获取价格信息异常!");
+        info!(?ticker);
+        let mp = (ticker.buy + ticker.sell) / Decimal::TWO;
+        // 获取账户信息
+        if self.exchange.contains("spot") {
+            self.update_equity_rest_spot().await;
+        } else {
+            self.update_equity_rest_swap().await;
+        }
+        // 更新中控账户相关信息
+        {
+            let mut now_balance = self.local_cash / self.used_pct;
+            now_balance.rescale(4);
+
+            let mut cci = self.cci_arc.lock().await;
+            cci.now_balance = now_balance;
+        }
+        // 初始资金
+        let start_cash = self.local_cash.clone();
+        let start_coin = self.local_coin.clone();
+        if start_cash.is_zero() && start_coin.is_zero() {
+            self.exit_msg = format!("{}{}{}{}", "初始余额为零 cash: ", start_cash, " coin: ", start_coin);
+        }
+        info!("初始cash: {start_cash} 初始coin: {start_coin}");
+        // 初始化策略基础信息
+        if mp <= Decimal::ZERO {
+            self.exit_msg = format!("{}{}", "初始价格获取错误: ", mp);
+            return false;
+        } else {
+            info!("初始价格为 {}", mp);
+        }
+        self.strategy.mp = mp.clone();
+        self.strategy.start_cash = start_cash.clone();
+        self.strategy.start_coin = start_coin.clone();
+        self.strategy.start_equity = start_cash + start_coin * mp;
+        self.strategy.max_equity = self.strategy.start_equity.clone();
+        self.strategy.equity = self.strategy.start_equity.clone();
+        self.strategy.total_amount = self.strategy.equity * self.strategy.lever_rate / self.strategy.mp;
+        // 获取数量精度
+        self.strategy.step_size = self.market.amount_size.clone();
+        if self.strategy.step_size > Decimal::ONE {
+            self.strategy.step_size = self.strategy.step_size.trunc();
+        }
+        // 获取价格精度
+        self.strategy.tick_size = self.market.tick_size.clone();
+        if self.strategy.tick_size > Decimal::ONE {
+            self.strategy.tick_size = self.strategy.tick_size.trunc();
+        }
+        if self.strategy.step_size.is_zero() || self.strategy.tick_size.is_zero() {
+            self.exit_msg = format!("{}{}{}{}", "交易精度未正常获取 step_size: ", self.strategy.step_size, " tick_size:", self.strategy.tick_size);
+            return false;
+        } else {
+            info!("数量精度 {}", self.strategy.step_size);
+            info!("价格精度 {}", self.strategy.tick_size);
+        }
+
+        // 初始化调度器
+        self.local_cash = start_cash;
+        self.local_coin = start_coin;
+
+        // 买入平台币
+        if self.exchange.contains("spot") { // 现货
+
+        }
+
+        // 清空挂单和仓位
+        return true;
+    }
+}
+
+// 清仓消息上报中控
+pub async fn send_clear_msg_request(body_params: &ClearPositionResult) {
+    // 创建客户端
+    let client = reqwest::Client::new();
+
+    // 创建请求头
+    let mut headers = HeaderMap::new();
+    headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
+    headers.insert("report-token", HeaderValue::from_static("r7T$8gBV!f&L@E2+"));
+    headers.insert("auth", HeaderValue::from_static("4L"));
+
+    let body = serde_json::to_string(&body_params).unwrap();
+
+    // 发送 POST 请求
+    let res = client
+        .post("https://4lapi.skyfffire.com/api/report/searchPositions")
+        .body(body)
+        .headers(headers)
+        .send()
+        .await;
+    match res {
+        Ok(response) => {
+            let status = response.status();
+            let response_text = response.text().await.unwrap_or("获取请求的响应文本异常".to_string());
+            // 检查响应状态并读取响应体
+            if status.is_success() {
+                info!("清仓结果上报中控,请求成功,响应文本: {}", response_text);
+            } else {
+                println!("清仓结果上报中控,请求失败: 响应异常码 {},响应文本 {}", status, response_text);
+            }
+        },
+        Err(e) => {
+            error!("清仓结果上报中控,请求发送失败,异常:{}", e)
+        }
+    }
+}

+ 1 - 4
strategy/src/coinex_usdt_swap.rs

@@ -1,4 +1,4 @@
-use tracing::{error, info};
+use tracing::{error};
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
@@ -98,7 +98,6 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
         }
         "order.update" => {
             trace_stack.set_source("coinex_swap.orders".to_string());
-            info!("订单推送: {}", response.data);
             let orders = standard::handle_info::HandleSwapInfo::handle_order(CoinexSwap, response.clone(), multiplier.clone());
             let mut order_infos:Vec<OrderInfo> = Vec::new();
             for mut order in orders.order {
@@ -133,8 +132,6 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
             if new_order.status != "REMOVE" || new_order.filled == Decimal::ZERO {
                 core.update_order(order_infos, trace_stack).await;
                 return
-            } else {
-                error!("不做操作的订单: {:?}", new_order);
             }
 
             // 单向持仓的处理

+ 125 - 6
strategy/src/core.rs

@@ -1,13 +1,13 @@
 use tokio::time::Instant;
 use std::cmp::max;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, HashMap, VecDeque};
 use std::io::Error;
 use std::str::FromStr;
 use std::sync::{Arc};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 use chrono::{Utc};
-use rust_decimal::Decimal;
+use rust_decimal::{Decimal, MathematicalOps};
 use rust_decimal::prelude::{ToPrimitive};
 use rust_decimal_macros::dec;
 use tokio::spawn;
@@ -20,6 +20,7 @@ use global::cci::CentralControlInfo;
 use global::params::Params;
 use global::public_params::{ASK_PRICE_INDEX, BID_PRICE_INDEX, LENGTH};
 use global::trace_stack::TraceStack;
+use global::trade::Trade;
 use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker};
 use standard::exchange::{Exchange};
 use standard::exchange::ExchangeEnum::{BinanceSwap, BitgetSwap, BybitSwap, CoinexSwap, GateSwap, HtxSwap, KucoinSwap};
@@ -115,6 +116,12 @@ pub struct Core {
     pub agg_market: Vec<Decimal>,
     pub ref_price: Vec<Vec<Decimal>>,
     pub predict: Decimal,
+
+    // 波动率指标相关数据(最近100条)
+    pub trade_vec: VecDeque<Trade>,  // 行情数据 此处价格取买一卖一中间价,时间为交易所的数据创建时间
+    pub sigma_vec: VecDeque<Decimal>,  // 波动率记录
+    pub is_sigma_abnormal: bool,  // 是否sigma反常
+    pub is_sigma_allow_open: bool, // 是否允许开单
 }
 
 impl Core {
@@ -251,6 +258,10 @@ impl Core {
             agg_market: vec![],
             ref_price: vec![],
             predict: Default::default(),
+            trade_vec: VecDeque::with_capacity(100),
+            sigma_vec: VecDeque::with_capacity(100),
+            is_sigma_abnormal: false,
+            is_sigma_allow_open: true,
         };
         for i in 0..=params.ref_exchange.len() - 1 {
             // 拼接不会消耗原字符串
@@ -439,6 +450,7 @@ impl Core {
                                 info!("错误的仓位方向{}", side);
                             }
                         } else { // 合约订单流仓位计算
+                            let mut this_order_profit = Decimal::ZERO;
                             if side == "kd" { // buy 开多
                                 self.local_buy_amount += filled;
                                 self.local_buy_value += filled * filled_price;
@@ -465,7 +477,8 @@ impl Core {
                             } else if side == "pd" { // sell 平多
                                 self.local_sell_amount += filled;
                                 self.local_sell_value += filled * filled_price;
-                                self.local_profit += filled * (filled_price - self.local_position_by_orders.long_avg);
+                                this_order_profit = filled * (filled_price - self.local_position_by_orders.long_avg);
+                                self.local_profit += this_order_profit;
                                 self.local_position_by_orders.long_pos = self.local_position_by_orders.long_pos - filled;
                                 if self.local_position_by_orders.long_pos == Decimal::ZERO {
                                     self.local_position_by_orders.long_avg = Decimal::ZERO;
@@ -473,7 +486,8 @@ impl Core {
                             } else if side == "pk" { // buy 平空
                                 self.local_buy_amount += filled;
                                 self.local_buy_value += filled * filled_price;
-                                self.local_profit += filled * (self.local_position_by_orders.short_avg - filled_price);
+                                this_order_profit = filled * (self.local_position_by_orders.short_avg - filled_price);
+                                self.local_profit += this_order_profit;
                                 self.local_position_by_orders.short_pos = self.local_position_by_orders.short_pos - filled;
                                 if self.local_position_by_orders.short_pos == Decimal::ZERO {
                                     self.local_position_by_orders.short_avg = Decimal::ZERO;
@@ -485,6 +499,14 @@ impl Core {
                             if data.fee > Decimal::ZERO {
                                 self.local_profit -= data.fee;
                             }
+                            // 订单亏损 并且 波动率异常 不允许开单
+                            if this_order_profit < Decimal::ZERO && self.is_sigma_abnormal {
+                                self.is_sigma_allow_open = false;
+                                info!("交易触发亏损,不允许开单!");
+                                info!("sigma_vec:{:?}" , self.sigma_vec);
+                            } else {
+                                self.is_sigma_allow_open = true;
+                            }
                         }
                         // info!("成交单耗时数据:{}", time_record.to_string());
                         info!("更新推算仓位 {:?}", self.local_position_by_orders);
@@ -510,6 +532,7 @@ impl Core {
                                                               &self.local_coin,
                                                               &self.ref_price,
                                                               &self.predict,
+                                                              &self.is_sigma_allow_open,
                                                               &trace_stack.ins);
                         // trace_stack.on_after_strategy();
                         // 记录指令触发信息
@@ -637,6 +660,20 @@ impl Core {
                 self.on_agg_market();
             }
         } else if *name_ref == self.ref_name[0] { // 判断是否为当前跟踪的盘口
+            // 写入行情数据
+            let ticker = self.tickers.get(name_ref).unwrap();
+            if self.trade_vec.len() == 100 {
+                self.trade_vec.pop_front();
+            }
+            self.trade_vec.push_back(Trade::new_by_ticker(ticker.mid_price.clone(), ticker.create_at/1000));
+            // 更新波动率
+            if self.trade_vec.len() > 99 {
+                self.calc_sigma();
+                // 波动率正常,但还是不开单信号,允许开单
+                if !self.is_sigma_abnormal && !self.is_sigma_allow_open {
+                    self.is_sigma_allow_open = true;
+                }
+            }
             // 判断是否需要触发ontick 对行情进行过滤
             // 过滤条件 价格变化很大 时间间隔很长
             let mut flag = 0;
@@ -665,6 +702,7 @@ impl Core {
                                                        &self.local_coin,
                                                        &self.ref_price,
                                                        &self.predict,
+                                                       &self.is_sigma_allow_open,
                                                        &trace_stack.ins);
                 trace_stack.on_after_strategy();
 
@@ -703,6 +741,87 @@ impl Core {
         }
     }
 
+    pub fn calc_sigma(&mut self) {
+        for (index, trade) in self.trade_vec.iter().enumerate() {
+            if index == 0 {
+                continue
+            }
+
+            // 计算过去至多100条数据的sigma值 sigma^2 = (1 / (tn-t0))*sum((S(tk) - S(tk-1)) ^ 2)
+            let mut sigma_index = index - 1;
+            let t_last = Decimal::from_str(&format!("{}", trade.time)).unwrap();
+
+            let mut _t_first = Decimal::from_str(&format!("{}", trade.time)).unwrap();
+            // 右值
+            let mut total_right = Decimal::ZERO;
+            loop {
+                let flag_trade = self.trade_vec.get(sigma_index).unwrap();
+                let next_trade = self.trade_vec.get(sigma_index + 1).unwrap();
+
+                // 下标合法性判断
+                if sigma_index == 0 || sigma_index + 100 <= index {
+                    _t_first = Decimal::from_str(&format!("{}", flag_trade.time)).unwrap();
+                    break;
+                }
+
+                // 计算差值
+                let diff = Decimal::ONE - flag_trade.price / next_trade.price;
+                total_right += diff * diff;
+
+                sigma_index = sigma_index - 1;
+            }
+            let sigma_square = if _t_first == t_last {
+                let time_diff = Decimal::ONE;
+                (Decimal::ONE / time_diff) * total_right
+            } else {
+                let time_diff = (t_last - _t_first) / Decimal::ONE_THOUSAND;
+                (Decimal::ONE / time_diff) * total_right
+            };
+            let mut sigma = sigma_square.sqrt().unwrap();
+            sigma.rescale(6);
+            if self.sigma_vec.len() == 100 {
+                self.sigma_vec.pop_front();
+            }
+            self.sigma_vec.push_back(sigma);
+        }
+        if self.sigma_vec.len() > 99 {
+            let sigma = match self.sigma_vec.back() {
+                Some(&value) => value,
+                None => Decimal::TEN,
+            };
+
+            // 计算过去至多100个sigma值的平均值
+            let sigma_ma = if self.sigma_vec.len() > 0 {
+                let mut sigma_ma_index = self.sigma_vec.len();
+                let mut sigma_total = Decimal::ZERO;
+                let mut sigma_count = Decimal::ZERO;
+                loop {
+                    if sigma_ma_index == 0 || sigma_ma_index + 99 < self.sigma_vec.len() {
+                        break
+                    }
+                    // 步进
+                    sigma_ma_index -= 1;
+                    // 计算
+                    sigma_total += self.sigma_vec[sigma_ma_index];
+                    sigma_count += Decimal::ONE;
+                }
+                let mut sigma_ma = sigma_total / sigma_count;
+                sigma_ma.rescale(6);
+
+                sigma_ma
+            } else {
+                sigma
+            };
+            // sigma值大于平均值定义为波动率异常
+            if sigma > sigma_ma {
+                self.is_sigma_abnormal = true;
+                // info!("sigma: {}, sigma_ma: {}, sigma_vec: {:?}", sigma, sigma_ma, self.sigma_vec);
+            } else {
+                self.is_sigma_abnormal = false;
+            }
+        }
+    }
+
     // #[instrument(skip(self, data), level="TRACE")]
     pub async fn update_position(&mut self, data: Vec<Position>) {
         if data.is_empty() {
@@ -1004,7 +1123,7 @@ impl Core {
             }
 
             // self.position_check_series长度限制
-            if self.position_check_series.len() > 30 {
+            if self.position_check_series.len() > 6 {
                 self.position_check_series.remove(0);
             }
 
@@ -1594,7 +1713,7 @@ impl Core {
             }
 
             // 如果连续5次都检查到清理干净,则表明大概率是清理干净了的
-            if clear_count >= 5 {
+            if clear_count >= 1 {
                 info!("连续{}次清理完成。", clear_count);
                 info!("");
                 break

+ 2 - 1
strategy/src/lib.rs

@@ -14,4 +14,5 @@ mod okx_usdt_swap;
 mod bybit_usdt_swap;
 mod bitget_usdt_swap;
 mod coinex_usdt_swap;
-mod htx_usdt_swap;
+mod htx_usdt_swap;
+pub mod clear_core;

+ 27 - 10
strategy/src/strategy.rs

@@ -105,8 +105,9 @@ pub struct Strategy {
     pub trade_vol_24h_w: Decimal,                                   // 24小时成交额(单位:万)
     pub grid: Decimal,                                              // 网格数量
 
-    // 订单流相关
-    pub side: String,                                               // 当前主动性方向
+    // 速度限制,至少0.5秒才取消订单
+    pub prev_place_order_timestamp: i64,                            // 上次挂单的时间
+    pub min_cancel_interval_mills: i64,                             // 至少要挂这么久才允许撤销
 }
 
 impl Strategy {
@@ -198,7 +199,8 @@ impl Strategy {
             post_side: 0,
             trade_vol_24h_w: Default::default(),
             grid: Decimal::from(params.grid),
-            side: "normal".to_string(),
+            prev_place_order_timestamp: 0,
+            min_cancel_interval_mills: 500,
         };
 
         // 交易名字
@@ -352,8 +354,8 @@ impl Strategy {
     // 耗时700微秒
     // #[instrument(skip(self), level="TRACE")]
     pub fn _print_summary(&mut self) {
-        self.mp.rescale(6);
-        self.ref_price.rescale(6);
+        self.mp.rescale(10);
+        self.ref_price.rescale(10);
         self.equity.rescale(3);
         self.cash.rescale(3);
         let mut value = self.coin * self.mp;
@@ -417,8 +419,8 @@ impl Strategy {
         msg.push_str(format!("[推算利润 {:?}, 盈亏 {:?}%, 做多杠杆 {:?}%, 做多浮盈 {:?}%, 做空杠杆 {:?}%, 做空浮盈 {:?}%], ",
                              self.local_profit, self.profit, long_pos_leverage, self.long_pos_bias, short_pos_leverage, self.short_pos_bias).as_str());
         msg.push_str(format!("[请求 {:?}, 上限{:?}次/10秒], ", self._req_num_per_window, self.limit_order_requests_num).as_str());
-        msg.push_str(format!("[当前参数, 开仓 {:?}, 平仓 {:?}, 方向 {:?}, 参考 {:?}, 模式 {:?}], ",
-                             self.trade_open_dist, self.trade_close_dist, self.side, self.ref_name[self.ref_index], self.maker_mode).as_str());
+        msg.push_str(format!("[当前参数, 开仓 {:?}, 平仓 {:?}, 参考 {:?}, 模式 {:?}], ",
+                             self.trade_open_dist, self.trade_close_dist, self.ref_name[self.ref_index], self.maker_mode).as_str());
         msg.push_str(format!("[挂单列表,共{:?}单, ", o_num).as_str());
         for (_, order) in &self.local_orders {
             let mut order_value = order.amount * self.mp;
@@ -510,7 +512,7 @@ impl Strategy {
         // debug!(?mode, ?buy_start, ?sell_start, ?mp);
 
         // 开仓相关
-        avoid = min(dec!(0.001), open * dec!(0.05));
+        avoid = min(dec!(0.002), open * dec!(0.1));
         // 持仓偏移
         let buy_shift = Decimal::ONE + pos_rate[0] * grid;
         let sell_shift = Decimal::ONE + pos_rate[1] * grid;
@@ -1022,6 +1024,11 @@ impl Strategy {
     // 生成取消订单的指令
     // #[instrument(skip(self, command), level="TRACE")]
     pub fn _cancel_open(&self, command: &mut OrderCommand, local_orders: &HashMap<String, OrderInfo>) {
+        // 强制性时间间隔
+        if self.prev_place_order_timestamp + self.min_cancel_interval_mills > Utc::now().timestamp_millis() {
+            return;
+        }
+
         // debug!(?command);
         // 挂单范围
         let long_upper = self.open_dist[0];
@@ -1244,6 +1251,7 @@ impl Strategy {
                    local_coin: &Decimal,
                    ref_price: &Vec<Vec<Decimal>>,
                    predict: &Decimal,
+                   is_sigma_allow_open: &bool,
                    _ins: &Instant) -> OrderCommand {
         self.on_time_print();
 
@@ -1272,14 +1280,23 @@ impl Strategy {
         // 下单指令处理逻辑
         self._cancel_open(&mut command, local_orders);              // 撤单命令处理
         self._post_close(&mut command, local_orders);               // 平仓单命令处理
-        self._post_open(&mut command, local_orders);                // 限价单命令处理
-
+        // 波动率是否允许下单
+        if *is_sigma_allow_open {
+            self._post_open(&mut command, local_orders);                // 限价单命令处理
+        } else {
+            info!("波动率异常订单亏损, 不开单!");
+        }
         self._check_local_orders(&mut command, local_orders);       // 固定时间检查超时订单
         self._update_in_cancel(&mut command, local_orders);         // 更新撤单队列,是一个filter
         self._check_request_limit(&mut command);                    // 限制频率,移除不合规则之订单,是一个filter
         self._refresh_request_limit();                              // 刷新频率限制
         self._update_request_num(&mut command);                     // 统计刷新频率
 
+        // 如果提交了订单,则更新最后提交时间
+        if command.limits_open.len() != 0 {
+            self.prev_place_order_timestamp = Utc::now().timestamp_millis();
+        }
+
         // if command.limits_open.len() != 0 || command.limits_close.len() != 0 {
         //     let name = self.params.account_name.clone();
         //     // 参考卖价