skyfffire 11 ヶ月 前
コミット
6c7412eb04
2 ファイル変更49 行追加11 行削除
  1. 48 10
      strategy/src/avellaneda_stoikov.rs
  2. 1 1
      strategy/src/strategy.rs

+ 48 - 10
strategy/src/avellaneda_stoikov.rs

@@ -1,11 +1,15 @@
 use std::cmp::{max, min};
 use std::collections::{BTreeMap, VecDeque};
+use std::path::Path;
 use std::sync::Arc;
 use chrono::Utc;
 use rust_decimal::prelude::*;
 use rust_decimal_macros::dec;
+use tokio::fs;
+use tokio::fs::File;
+use tokio::io::AsyncWriteExt;
 use tokio::sync::Mutex;
-use tracing::info;
+use tracing::{error, info};
 use global::cci::CentralControlInfo;
 use global::fixed_time_range_deque::FixedTimeRangeDeque;
 use global::predictor_state::PredictorState;
@@ -525,9 +529,8 @@ impl AvellanedaStoikov {
 
         let inventory = self.inventory;
         let sigma_square = self.error_rate;
-        // let gamma = now - self.last_update_time;
-        let gamma = self.dir;
-        // let kappa = self.short_trade_len_dec;
+        let gamma = now - self.last_update_time;
+        let kappa = self.dir;
 
         let flow_ratio = Decimal::ZERO;
         let ref_price = self.ref_price;
@@ -540,10 +543,11 @@ impl AvellanedaStoikov {
         self.prev_insert_time = now;
 
         // 将数据放入cci里面,方便读取
+        let cci_arc_clone = cci_arc.clone();
         tokio::spawn(async move {
-            let mut cci = cci_arc.lock().await;
+            let mut cci = cci_arc_clone.lock().await;
 
-            let kappa = Decimal::from(cci.predictor_state_vec.len());
+            // let kappa = Decimal::from(cci.predictor_state_vec.len());
 
             cci.predictor_state_vec.push(PredictorState {
                 update_time: now,
@@ -569,24 +573,58 @@ impl AvellanedaStoikov {
             });
 
             // 长度限定,最大100w条
-            if cci.predictor_state_vec.len() > 1_000_000 {
+            if cci.predictor_state_vec.len() > 10_000 {
                 cci.predictor_state_vec.remove(0);
             }
         });
 
-        let prev_save_time = self.prev_save_time;
         let error_rate = self.error_rate;
         // 将数据存入本地json文件,要求不能在行情烈度比较大的时候执行这个逻辑,防止卡交易逻辑
-        if now - prev_save_time < dec!(60000) && error_rate > dec!(0.15) {
+        if now - self.prev_save_time < dec!(60000) || error_rate > dec!(0.15) {
             return;
         }
+        // 存放逻辑
+        info!("存放, {}, {}, {}", now, self.prev_save_time, now - self.prev_save_time);
+
+
 
         self.prev_save_time = Decimal::from(Utc::now().timestamp_millis());
-        // 存放逻辑
+        let cci = cci_arc.lock().await;
+        let temp_json_str = serde_json::to_string(&cci.predictor_state_vec).unwrap();
+        Self::write_to_file(&temp_json_str, "./db/db.json".to_string()).await;
+        info!("存放完毕, {}", self.prev_save_time - now);
     }
 
     // #[instrument(skip(self, ref_ticker_map), level="TRACE")]
     pub fn get_ref_price(&mut self, _ref_ticker_map: &BTreeMap<String, Ticker>) -> Vec<Vec<Decimal>> {
         vec![]
     }
+
+    pub async fn write_to_file(json_data: &String, file_path: String) {
+        // 尝试创建文件路径
+        if let Err(e) = fs::create_dir_all(
+            // 获取文件目录路径
+            Path::new(&file_path)
+                .parent() // 获取父目录(即文件路径除去文件名后的部分)
+                .unwrap_or_else(|| Path::new("")), // 如果没有父目录,使用当前目录
+        )
+            .await
+        {
+            // 如果创建路径失败,打印错误日志
+            error!("创建目录错误: {:?}", e);
+            return; // 结束任务
+        }
+
+        // 异步地执行文件写入操作
+        if let Err(e) = async {
+            let mut file = File::create(&file_path).await?;
+            file.write_all(json_data.as_bytes()).await?;
+            Result::<(), std::io::Error>::Ok(())
+        }
+            .await
+        {
+            // 如果发生错误,只打印错误日志
+            error!("json db写入错误: {:?}", e);
+        }
+    }
 }

+ 1 - 1
strategy/src/strategy.rs

@@ -1114,7 +1114,7 @@ impl Strategy {
         self.fix_price(predictor);
 
         self._cancel_open(&mut command, local_orders);              // 撤单命令处理
-        self._post_open(&mut command, local_orders, predictor);     // 限价单命令处理
+        // self._post_open(&mut command, local_orders, predictor);     // 限价单命令处理
         self._check_local_orders(&mut command, local_orders);       // 固定时间检查超时订单
         self._update_in_cancel(&mut command, local_orders);         // 更新撤单队列,是一个filter
         self._check_request_limit(&mut command);                    // 限制频率,移除不合规则之订单,是一个filter