Explorar o código

k线基础数据加入,明天再计算MFI吧……

skyffire hai 1 ano
pai
achega
5f4fcf4d18

+ 19 - 2
standard/src/bybit_swap_handle.rs

@@ -10,7 +10,7 @@ use tokio::time::Instant;
 use tracing::{error};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
-use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialOrder, Depth, Trade, Ticker};
+use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialOrder, Depth, Trade, Ticker, Record};
 
 // 处理账号信息
 pub fn handle_account_info(res_data: &ResponseData, symbol: &String) -> Account {
@@ -251,4 +251,21 @@ pub fn format_depth_items(value: Value, mul: &Decimal) -> Vec<OrderBook> {
         })
     }
     depth_items
-}
+}
+
+pub fn handle_records(value: &Value) -> Vec<Record> {
+    let mut records = vec![];
+    for record_value in value.as_array().unwrap() {
+        records.push(Record {
+            time: Decimal::from_i64(record_value["start"].as_i64().unwrap()).unwrap(),
+            open: Decimal::from_str(record_value["open"].as_str().unwrap()).unwrap(),
+            high: Decimal::from_str(record_value["high"].as_str().unwrap()).unwrap(),
+            low: Decimal::from_str(record_value["low"].as_str().unwrap()).unwrap(),
+            close: Decimal::from_str(record_value["close"].as_str().unwrap()).unwrap(),
+            volume: Decimal::from_str(record_value["volume"].as_str().unwrap()).unwrap(),
+            symbol: "".to_string(),
+        });
+    }
+
+    return records;
+}

+ 1 - 1
standard/src/exchange_struct_handler.rs

@@ -191,7 +191,7 @@ impl ExchangeStructHandler {
                 binance_swap_handle::handle_records(&res_data.data)
             }
             ExchangeEnum::BybitSwap => { // 未使用暂不实现
-                vec![]
+                bybit_swap_handle::handle_records(&res_data.data)
             }
             // ExchangeEnum::HtxSwap => {
             //     htx_swap_handle::handle_records(&res_data.data)

+ 7 - 3
strategy/src/avellaneda_stoikov.rs

@@ -9,7 +9,7 @@ use tracing::info;
 use global::cci::CentralControlInfo;
 use global::fixed_time_range_deque::FixedTimeRangeDeque;
 use global::predictor_state::PredictorState;
-use standard::{Depth, Ticker, Trade};
+use standard::{Depth, Record, Ticker, Trade};
 
 #[derive(Debug)]
 pub struct AvellanedaStoikov {
@@ -130,7 +130,7 @@ impl AvellanedaStoikov {
 
             let mut first_trade_price = last_trade.price;
             for trade in self.trade_long_vec.deque.iter().rev() {
-                if last_trade_time - trade.time > dec!(6) {
+                if last_trade_time - trade.time > Decimal::TEN {
                     break;
                 }
 
@@ -173,6 +173,10 @@ impl AvellanedaStoikov {
 
     pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
 
+    pub async fn on_record(&mut self, record: &Record) {
+        info!(?record);
+    }
+
     pub async fn update_inventory(&mut self, inventory: &Decimal, min_amount_value: &Decimal) {
         let prev_inventory = self.inventory;
         self.inventory = (inventory / (min_amount_value / self.mid_price)).round();
@@ -185,7 +189,7 @@ impl AvellanedaStoikov {
     }
 
     pub fn update_sigma_square(&mut self) {
-        self.sigma_square = self.spread_max;
+        self.sigma_square = self.spread_max * dec!(0.5);
         self.sigma_square.rescale(10);
     }
 

+ 17 - 2
strategy/src/bybit_usdt_swap.rs

@@ -13,7 +13,7 @@ use standard::exchange::ExchangeEnum::BybitSwap;
 use standard::exchange_struct_handler::ExchangeStructHandler;
 use standard::{Depth, OrderBook};
 use crate::core::Core;
-use crate::exchange_disguise::{on_depth, on_ticker, on_trade};
+use crate::exchange_disguise::{on_depth, on_record, on_ticker, on_trade};
 use crate::model::OrderInfo;
 
 // 参考 Bybit 合约 启动
@@ -29,6 +29,7 @@ pub(crate) async fn reference_bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
         ws.set_subscribe(vec![
             BybitSwapSubscribeType::PuOrderBook1,
             BybitSwapSubscribeType::PuTrade,
+            BybitSwapSubscribeType::PuKline("1".to_string()),
             // BybitSwapSubscribeType::PuTickers
         ]);
 
@@ -166,7 +167,21 @@ async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &Re
             trace_stack.on_after_format();
 
             on_ticker(core_arc, &mut trace_stack, &ticker).await;
-        }
+        },
+        // k线数据
+        "kline" => {
+            let mut records = ExchangeStructHandler::records_handle(BybitSwap, &response);
+
+            if records.is_empty() {
+                return;
+            }
+
+            for record in records.iter_mut() {
+                let core_arc_clone = core_arc.clone();
+
+                on_record(core_arc_clone, &mut trace_stack, record).await
+            }
+        },
         _ => {
             error!("未知推送类型");
             error!(?response);

+ 5 - 1
strategy/src/core.rs

@@ -18,7 +18,7 @@ use tracing::{error, info, warn};
 use global::cci::CentralControlInfo;
 use global::params::Params;
 use global::trace_stack::TraceStack;
-use standard::{Account, Depth, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker, Trade};
+use standard::{Account, Depth, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, Record, SpecialTicker, Ticker, Trade};
 use standard::exchange::{Exchange};
 use standard::exchange::ExchangeEnum::{BinanceSwap, BybitSwap, GateSwap};
 
@@ -616,6 +616,10 @@ impl Core {
         self.avellaneda_stoikov.on_ticker(ticker).await;
     }
 
+    pub async fn on_record(&mut self, record: &Record, _trace_stack: &mut TraceStack) {
+        self.avellaneda_stoikov.on_record(record).await;
+    }
+
     // #[instrument(skip(self, depth, name_ref, trace_stack), level="TRACE")]
     pub async fn on_depth(&mut self, depth: &Depth, name_ref: &String, trace_stack: &mut TraceStack) {
         // ================================ 刷新更新间隔 =========================================

+ 8 - 1
strategy/src/exchange_disguise.rs

@@ -3,7 +3,7 @@ use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 use tokio::sync::Mutex;
 use global::trace_stack::TraceStack;
-use standard::{Depth, Ticker, Trade};
+use standard::{Depth, Record, Ticker, Trade};
 use crate::binance_usdt_swap::{binance_swap_run, reference_binance_swap_run};
 use crate::bybit_usdt_swap::{bybit_swap_run, reference_bybit_swap_run};
 use crate::coinex_usdt_swap::coinex_swap_run;
@@ -144,6 +144,13 @@ pub async fn on_ticker(core_arc: Arc<Mutex<Core>>, trace_stack: &mut TraceStack,
     core.on_ticker(ticker, trace_stack).await;
 }
 
+pub async fn on_record(core_arc: Arc<Mutex<Core>>, trace_stack: &mut TraceStack, record: &Record) {
+    let mut core = core_arc.lock().await;
+    trace_stack.on_after_unlock_core();
+
+    core.on_record(record, trace_stack).await;
+}
+
 pub async fn on_order() {}
 
 pub async fn on_position() {}

+ 1 - 1
strategy/src/strategy.rs

@@ -1110,7 +1110,7 @@ impl Strategy {
         }
 
         self._cancel_open(&mut command, local_orders);              // 撤单命令处理
-        self._post_open(&mut command, local_orders, predictor);     // 限价单命令处理
+        // self._post_open(&mut command, local_orders, predictor);     // 限价单命令处理
         self._check_local_orders(&mut command, local_orders);       // 固定时间检查超时订单
         self._update_in_cancel(&mut command, local_orders);         // 更新撤单队列,是一个filter
         self._check_request_limit(&mut command);                    // 限制频率,移除不合规则之订单,是一个filter