Bläddra i källkod

使用了极为先进的通道技术

skyffire 11 månader sedan
förälder
incheckning
6973aa282b
2 ändrade filer med 127 tillägg och 62 borttagningar
  1. 1 1
      src/main.rs
  2. 126 61
      strategy/src/avellaneda_stoikov.rs

+ 1 - 1
src/main.rs

@@ -115,7 +115,7 @@ fn read_params_json() -> Params {
     return params;
 }
 
-#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
+#[tokio::main]
 async fn main() {
     // 日志级别配置
     // let params = read_params();

+ 126 - 61
strategy/src/avellaneda_stoikov.rs

@@ -1,10 +1,14 @@
 use std::cmp::{max, min};
 use std::collections::{BTreeMap, VecDeque};
 use std::sync::Arc;
-use chrono::Utc;
+use std::time::Duration;
+use chrono::{Utc};
+use futures_channel::mpsc::UnboundedSender;
+use futures_util::StreamExt;
 use rust_decimal::prelude::*;
 use rust_decimal_macros::dec;
-use tokio::sync::Mutex;
+use tokio::sync::{Mutex};
+use tokio::time::sleep;
 use tracing::{info};
 use global::cci::CentralControlInfo;
 use global::fixed_time_range_deque::FixedTimeRangeDeque;
@@ -68,6 +72,8 @@ pub struct AvellanedaStoikov {
     pub prev_save_time: Decimal,
 
     pub params: Params,
+
+    pub debug_sender: UnboundedSender<Vec<Decimal>>,
 }
 
 impl AvellanedaStoikov {
@@ -82,6 +88,45 @@ impl AvellanedaStoikov {
     const IRA: Decimal = dec!(1);
 
     pub fn new(cci_arc: Arc<Mutex<CentralControlInfo>>, params: Params) -> Self {
+        // 创建数据通道
+        // 创建一个无界通道
+        let (tx, mut rx) = futures_channel::mpsc::unbounded::<Vec<Decimal>>();
+
+        tokio::spawn(async move {
+            let len = 16usize;
+            let mut prev_save_time = dec!(0);
+            let mut debugs: Vec<Vec<Decimal>> = vec![Vec::new(); len];
+
+            loop {
+                while let Some(value) = rx.next().await {
+                    // 数据填充到对应位置
+                    for i in 0..len {
+                        debugs[i].push(value[i]);
+                    }
+
+                    // 长度限制
+                    if debugs[0].len() > 700_000 {
+                        for i in 0..len {
+                            debugs[i].remove(0);
+                        }
+                    }
+                }
+
+                let now = Decimal::from(Utc::now().timestamp_millis());
+                if now - prev_save_time < dec!(60000) {
+                    sleep(Duration::from_millis(1000)).await;
+
+                    continue;
+                }
+
+                let temp_html_str = utils::build_html_file(&debugs).await;
+                utils::write_to_file(&temp_html_str, "./db/db.html".to_string()).await;
+
+                prev_save_time = Decimal::from(Utc::now().timestamp_millis());
+                info!("存放完毕, {}", prev_save_time - now);
+            }
+        });
+
         let avellaneda_stoikov = Self {
             // 接针版本
             depth_vec: vec![Depth::new(); 10],
@@ -138,6 +183,8 @@ impl AvellanedaStoikov {
             prev_save_time: Decimal::from(Utc::now().timestamp_millis()),
             dir: Default::default(),
             params,
+
+            debug_sender: tx,
         };
 
         avellaneda_stoikov
@@ -550,65 +597,83 @@ impl AvellanedaStoikov {
             return;
         }
 
-        self.prev_insert_time = now;
-
-        // 将数据放入debugs里面
-        if self.debugs.is_empty() {
-            self.debugs = vec![Vec::new(); 16];
-        }
-        self.debugs[0].push(now);
-        self.debugs[1].push(mid_price);
-        self.debugs[2].push(ask_price);
-        self.debugs[3].push(bid_price);
-        self.debugs[4].push(last_price);
-        self.debugs[5].push(spread);
-        self.debugs[6].push(spread_max);
-        self.debugs[7].push(spread_min);
-        self.debugs[8].push(optimal_ask_price);
-        self.debugs[9].push(optimal_bid_price);
-        self.debugs[10].push(inventory);
-        self.debugs[11].push(sigma_square);
-        self.debugs[12].push(gamma);
-        self.debugs[13].push(kappa);
-        self.debugs[14].push(flow_ratio);
-        self.debugs[15].push(ref_price);
-
-        // 长度限定
-        if self.debugs[0].len() > 700_000 {
-            self.debugs[0].remove(0);
-            self.debugs[1].remove(0);
-            self.debugs[2].remove(0);
-            self.debugs[3].remove(0);
-            self.debugs[4].remove(0);
-            self.debugs[5].remove(0);
-            self.debugs[6].remove(0);
-            self.debugs[7].remove(0);
-            self.debugs[8].remove(0);
-            self.debugs[9].remove(0);
-            self.debugs[10].remove(0);
-            self.debugs[11].remove(0);
-            self.debugs[12].remove(0);
-            self.debugs[13].remove(0);
-            self.debugs[14].remove(0);
-            self.debugs[15].remove(0);
-        }
-
-        let error_rate = self.error_rate;
-        // 将数据存入本地json文件,要求不能在行情烈度比较大的时候执行这个逻辑,防止卡交易逻辑
-        if now - self.prev_save_time < dec!(60000) || error_rate > dec!(0.15) {
-            return;
-        }
-        // 存放逻辑
-        info!("存放, {}, {}, {}", now, self.prev_save_time, now - self.prev_save_time);
-
-        let data_c = self.debugs.clone();
-        tokio::spawn(async move {
-            let temp_html_str = utils::build_html_file(&data_c).await;
-            utils::write_to_file(&temp_html_str, "./db/db.html".to_string()).await;
-        });
-
-        self.prev_save_time = Decimal::from(Utc::now().timestamp_millis());
-        info!("存放完毕, {}", self.prev_save_time - now);
+        self.debug_sender.unbounded_send(vec![
+            now,
+            mid_price,
+            ask_price,
+            bid_price,
+            last_price,
+            spread,
+            spread_max,
+            spread_min,
+            optimal_ask_price,
+            optimal_bid_price,
+            inventory,
+            sigma_square,
+            gamma,
+            kappa,
+            flow_ratio,
+            ref_price
+        ]).unwrap()
+
+        // self.prev_insert_time = now;
+        //
+        // // 将数据放入debugs里面
+        // if self.debugs.is_empty() {
+        //     self.debugs = vec![Vec::new(); 16];
+        // }
+        // self.debugs[0].push(now);
+        // self.debugs[1].push(mid_price);
+        // self.debugs[2].push(ask_price);
+        // self.debugs[3].push(bid_price);
+        // self.debugs[4].push(last_price);
+        // self.debugs[5].push(spread);
+        // self.debugs[6].push(spread_max);
+        // self.debugs[7].push(spread_min);
+        // self.debugs[8].push(optimal_ask_price);
+        // self.debugs[9].push(optimal_bid_price);
+        // self.debugs[10].push(inventory);
+        // self.debugs[11].push(sigma_square);
+        // self.debugs[12].push(gamma);
+        // self.debugs[13].push(kappa);
+        // self.debugs[14].push(flow_ratio);
+        // self.debugs[15].push(ref_price);
+        //
+        // // 长度限定
+        // if self.debugs[0].len() > 700_000 {
+        //     self.debugs[0].remove(0);
+        //     self.debugs[1].remove(0);
+        //     self.debugs[2].remove(0);
+        //     self.debugs[3].remove(0);
+        //     self.debugs[4].remove(0);
+        //     self.debugs[5].remove(0);
+        //     self.debugs[6].remove(0);
+        //     self.debugs[7].remove(0);
+        //     self.debugs[8].remove(0);
+        //     self.debugs[9].remove(0);
+        //     self.debugs[10].remove(0);
+        //     self.debugs[11].remove(0);
+        //     self.debugs[12].remove(0);
+        //     self.debugs[13].remove(0);
+        //     self.debugs[14].remove(0);
+        //     self.debugs[15].remove(0);
+        // }
+        //
+        // // 将数据存入本地json文件,要求不能在行情烈度比较大的时候执行这个逻辑,防止卡交易逻辑
+        // if now - self.prev_save_time < dec!(60000) {
+        //     return;
+        // }
+        // // 存放逻辑
+        // info!("存放, {}, {}, {}", now, self.prev_save_time, now - self.prev_save_time);
+        //
+        // let data_c = self.debugs.clone();
+        // tokio::spawn(async move {
+        //     let temp_html_str = utils::build_html_file(&data_c).await;
+        //     utils::write_to_file(&temp_html_str, "./db/db.html".to_string()).await;
+        // });
+        //
+        // self.prev_save_time = Decimal::from(Utc::now().timestamp_millis());
+        // info!("存放完毕, {}", self.prev_save_time - now);
     }
 
     // #[instrument(skip(self, ref_ticker_map), level="TRACE")]