浏览代码

准备实现另一个做市思路。

skyffire 1 年之前
父节点
当前提交
debc4380f9

+ 1 - 1
derive/src/binance_swap_export.rs

@@ -90,7 +90,7 @@ impl ExportConnector for BinanceSwapExport {
                     ]);
                 }
 
-                if trades_info.len().to_i64().unwrap() < 96 {
+                if trades_info.len().to_i64().unwrap() < limit_params {
                     break;
                 }
             }

+ 2 - 2
derive/tests/binance_swap_export_test.rs

@@ -5,7 +5,7 @@ use derive::export_excel::ExportEnum;
 use crate::export_excel_test::test_new_export;
 
 
-const SYMBOL: &str = "RDNT_USDT";
+const SYMBOL: &str = "SUPER_USDT";
 
 // 测试获取Exchange实体
 #[tokio::test]
@@ -14,6 +14,6 @@ async fn test_get_self_exchange() {
     global::log_utils::init_log_with_trace();
 
     let mut export = test_new_export(ExportEnum::BinanceSwap).await;
-    let export_trades = export.export_trades("binance_swap", SYMBOL.to_string(), 1725003480000, 1725011269000, 1000).await;
+    let export_trades = export.export_trades("binance_swap", SYMBOL.to_string(), 1726027200000, 1726103276000, 1000).await;
     trace!(?export_trades);
 }

+ 5 - 5
exchanges/src/bybit_swap_rest.rs

@@ -88,12 +88,12 @@ impl BybitSwapRest {
         data
     }
     //查詢市場價格K線數據
-    pub async fn get_kline(&mut self, symbol: String) -> ResponseData {
+    pub async fn get_record(&mut self, symbol: String, interval: String, limit: String) -> ResponseData {
         let params = serde_json::json!({
-               "category":"linear",
-                "symbol":symbol,
-                "interval":"1",
-                "limit":"200"
+               "category": "linear",
+                "symbol": symbol,
+                "interval": interval,
+                "limit": limit
          });
         let data = self.request("GET".to_string(),
                                 "/v5".to_string(),

+ 47 - 8
exchanges/tests/bybit_swap_test.rs

@@ -7,6 +7,7 @@ use tracing::trace;
 
 use exchanges::bybit_swap_rest::BybitSwapRest;
 use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
+use exchanges::response_base::ResponseData;
 
 const ACCESS_KEY: &str = "";
 const SECRET_KEY: &str = "";
@@ -19,10 +20,10 @@ async fn ws_custom_subscribe_pu() {
 
 
     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
 
 
-    let mut ws = get_ws(None, BybitSwapWsType::Public).await;
+    let mut ws = get_ws(None, BybitSwapWsType::Public);
     ws.set_symbols(vec!["BTC_USDT".to_string()]);
     ws.set_subscribe(vec![
         // BybitSwapSubscribeType::PuOrderBook1,
@@ -69,10 +70,28 @@ async fn ws_custom_subscribe_pu() {
     //     trace!("线程-数据写入-结束");
     // });
 
+
+    let fun = move |data: ResponseData| {
+        // 在 async 块之前克隆 Arc
+        // let core_arc_cc = core_arc_clone.clone();
+        // let mul = multiplier.clone();
+        //
+        // let depth_asks = Arc::clone(&depth_asks);
+        // let depth_bids = Arc::clone(&depth_bids);
+
+        async move {
+            trace!("333333333333333:ResponseData:{:?}",data);
+            // let mut depth_asks = depth_asks.lock().await;
+            // let mut depth_bids = depth_bids.lock().await;
+            // 使用克隆后的 Arc,避免 move 语义
+            // on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids).await
+        }
+    };
+
     let t1 = tokio::spawn(async move {
         //链接
         let bool_v3_clone = Arc::clone(&is_shutdown_arc);
-        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        ws.ws_connect_async(bool_v3_clone,fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
         trace!("test 唯一线程结束--");
     });
     tokio::try_join!(t1).unwrap();
@@ -86,18 +105,19 @@ async fn ws_custom_subscribe_pu() {
 //ws-订阅私有频道信息
 #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
 async fn ws_custom_subscribe_pr() {
+
     global::log_utils::init_log_with_trace();
 
 
     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
 
     let logparam = BybitSwapLogin {
         api_key: ACCESS_KEY.to_string(),
         secret_key: SECRET_KEY.to_string(),
     };
 
-    let mut ws = get_ws(Option::from(logparam), BybitSwapWsType::Private).await;
+    let mut ws = get_ws(Option::from(logparam), BybitSwapWsType::Private);
     ws.set_symbols(vec!["BTC_USDT".to_string()]);
     ws.set_subscribe(vec![
         BybitSwapSubscribeType::PrPosition,
@@ -144,10 +164,29 @@ async fn ws_custom_subscribe_pr() {
     //     trace!("线程-数据写入-结束");
     // });
 
+    let fun = move |data: ResponseData| {
+        // 在 async 块之前克隆 Arc
+        // let core_arc_cc = core_arc_clone.clone();
+        // let mul = multiplier.clone();
+        //
+        // let depth_asks = Arc::clone(&depth_asks);
+        // let depth_bids = Arc::clone(&depth_bids);
+
+        async move {
+            trace!("333333333333333:ResponseData:{:?}",data);
+            // let mut depth_asks = depth_asks.lock().await;
+            // let mut depth_bids = depth_bids.lock().await;
+            // 使用克隆后的 Arc,避免 move 语义
+            // on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids).await
+        }
+    };
+
     let t1 = tokio::spawn(async move {
         //链接
         let bool_v3_clone = Arc::clone(&is_shutdown_arc);
-        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        // ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        ws.ws_connect_async(bool_v3_clone,fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+
         trace!("test 唯一线程结束--");
     });
     tokio::try_join!(t1).unwrap();
@@ -184,7 +223,7 @@ async fn rest_get_kline_test() {
     global::log_utils::init_log_with_trace();
 
     let mut ret = get_rest();
-    let req_data = ret.get_kline("DOGEUSDT".to_string()).await;
+    let req_data = ret.get_record("DOGEUSDT".to_string(), "1".to_string(), "5".to_string()).await;
     println!("Bybit--查詢市場價格K線數據--{:?}", req_data);
 }
 
@@ -305,7 +344,7 @@ async fn rest_cancel_orders_test() {
 }
 
 
-async fn get_ws(btree_map: Option<BybitSwapLogin>, type_v: BybitSwapWsType) -> BybitSwapWs {
+ fn get_ws(btree_map: Option<BybitSwapLogin>, type_v: BybitSwapWsType) -> BybitSwapWs {
     let ku_ws = BybitSwapWs::new(false, btree_map, type_v);
     ku_ws
 }

+ 2 - 1
standard/Cargo.toml

@@ -19,4 +19,5 @@ futures = "0.3"
 tracing = "0.1"
 tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
 toml = "0.5.11"
-futures-channel = "0.3.29"
+futures-channel = "0.3.29"
+lazy_static = "1.4.0"

+ 9 - 3
standard/src/binance_swap.rs

@@ -13,7 +13,7 @@ use tokio::spawn;
 use tokio::sync::mpsc::Sender;
 use tokio::time::Instant;
 use tracing::{error, info};
-use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, utils, PositionModeEnum};
+use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, utils, PositionModeEnum, Record};
 use exchanges::binance_swap_rest::BinanceSwapRest;
 use global::trace_stack::TraceStack;
 
@@ -177,13 +177,14 @@ impl Platform for BinanceSwap {
         if res_data.code == 200 {
             let res_data_json: serde_json::Value = res_data.data;
             let result = Ticker {
-                time: res_data_json["time"].as_i64().unwrap(),
+                time: Decimal::from_i64(res_data_json["time"].as_i64().unwrap()).unwrap(),
                 high: Decimal::from_str(res_data_json["askPrice"].as_str().unwrap()).unwrap(),
                 low: Decimal::from_str(res_data_json["bidPrice"].as_str().unwrap()).unwrap(),
                 sell: Decimal::from_str(res_data_json["askPrice"].as_str().unwrap()).unwrap(),
                 buy: Decimal::from_str(res_data_json["bidPrice"].as_str().unwrap()).unwrap(),
                 last: dec!(-1),
                 volume: dec!(-1),
+                open_interest: Default::default(),
             };
             Ok(result)
         } else {
@@ -191,19 +192,24 @@ impl Platform for BinanceSwap {
         }
     }
 
+    async fn get_record(&mut self, _interval: String) -> Result<Vec<Record>, Error> {
+        Err(Error::new(ErrorKind::NotFound, "binance_usdt_swap:该交易所方法未实现".to_string()))
+    }
+
     async fn get_ticker_symbol(&mut self, symbol: String) -> Result<Ticker, Error> {
         let symbol_format = utils::format_symbol(symbol.clone(), "");
         let res_data = self.request.get_book_ticker(symbol_format).await;
         if res_data.code == 200 {
             let res_data_json: serde_json::Value = res_data.data;
             let result = Ticker {
-                time: res_data_json["time"].as_i64().unwrap(),
+                time: Decimal::from_i64(res_data_json["time"].as_i64().unwrap()).unwrap(),
                 high: Decimal::from_str(res_data_json["askPrice"].as_str().unwrap()).unwrap(),
                 low: Decimal::from_str(res_data_json["bidPrice"].as_str().unwrap()).unwrap(),
                 sell: Decimal::from_str(res_data_json["askPrice"].as_str().unwrap()).unwrap(),
                 buy: Decimal::from_str(res_data_json["bidPrice"].as_str().unwrap()).unwrap(),
                 last: dec!(-1),
                 volume: dec!(-1),
+                open_interest: Default::default(),
             };
             Ok(result)
         } else {

+ 36 - 5
standard/src/bybit_swap.rs

@@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
 use tokio::time::Instant;
 use tracing::{error, info, trace};
 use exchanges::bybit_swap_rest::BybitSwapRest;
-use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, PositionModeEnum};
+use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, PositionModeEnum, Record};
 use global::trace_stack::TraceStack;
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
@@ -193,19 +193,49 @@ impl Platform for BybitSwap {
             }
             let value = list[0].clone();
             Ok(Ticker{
-                time: chrono::Utc::now().timestamp_millis(),
+                time: Decimal::from_i64(chrono::Utc::now().timestamp_millis()).unwrap(),
                 high: value.high_price24h,
                 low: value.low_price24h,
                 sell: value.ask1_price,
                 buy: value.bid1_price,
                 last: value.last_price,
-                volume: value.volume24h
+                volume: value.volume24h,
+                open_interest: Default::default(),
             })
         } else {
             Err(Error::new(ErrorKind::Other, res_data.to_string()))
         }
     }
 
+    async fn get_record(&mut self, interval: String) -> Result<Vec<Record>, Error> {
+        let symbol = self.symbol_uppercase.clone();
+
+        let res_data = self.request.get_record(symbol, interval, "3".to_string()).await;
+
+        if res_data.code == 200 {
+            let mut records: Vec<Record> = vec![];
+
+            for record_value in res_data.data["list"].as_array().unwrap() {
+                let value_array = record_value.as_array().unwrap();
+
+                records.push(Record {
+                    time: Decimal::from_str(value_array[0].as_str().unwrap()).unwrap(),
+                    open: Decimal::from_str(value_array[1].as_str().unwrap()).unwrap(),
+                    high: Decimal::from_str(value_array[2].as_str().unwrap()).unwrap(),
+                    low: Decimal::from_str(value_array[3].as_str().unwrap()).unwrap(),
+                    close: Decimal::from_str(value_array[4].as_str().unwrap()).unwrap(),
+                    volume: Decimal::from_str(value_array[5].as_str().unwrap()).unwrap(),
+                    symbol: "".to_string(),
+                });
+            }
+
+            records.reverse();
+            Ok(records)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
     async fn get_ticker_symbol(&mut self, symbol: String) -> Result<Ticker, Error> {
         let symbol_upper = symbol.replace("_", "").to_uppercase();
         let res_data = self.request.get_tickers(symbol_upper.clone()).await;
@@ -220,13 +250,14 @@ impl Platform for BybitSwap {
                 }
                 Some(value) => {
                     let result = Ticker {
-                        time: chrono::Utc::now().timestamp_millis(),
+                        time: Decimal::from_i64(chrono::Utc::now().timestamp_millis()).unwrap(),
                         high: value.high_price24h,
                         low: value.low_price24h,
                         sell: value.ask1_price,
                         buy: value.bid1_price,
                         last: value.last_price,
-                        volume: value.volume24h
+                        volume: value.volume24h,
+                        open_interest: Default::default(),
                     };
                     Ok(result)
                 }

+ 65 - 3
standard/src/bybit_swap_handle.rs

@@ -1,12 +1,16 @@
 use std::str::FromStr;
-use rust_decimal::Decimal;
+use std::sync::Arc;
+use chrono::Utc;
+use lazy_static::lazy_static;
+use rust_decimal::{Decimal};
 use rust_decimal::prelude::FromPrimitive;
 use serde_json::{from_value, Value};
+use tokio::sync::Mutex;
 use tokio::time::Instant;
 use tracing::{error};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
-use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialOrder, Depth, Trade};
+use crate::{Account, OrderBook, Order, Position, PositionModeEnum, SpecialOrder, Depth, Trade, Ticker, Record};
 
 // 处理账号信息
 pub fn handle_account_info(res_data: &ResponseData, symbol: &String) -> Account {
@@ -193,6 +197,47 @@ pub fn handle_book_ticker(res_data: &ResponseData, mul: &Decimal) -> Depth {
     }
 }
 
+lazy_static! {
+    static ref TICKER: Arc<Mutex<Ticker>> = Arc::new(Mutex::new(Ticker::new()));
+}
+
+pub async fn handle_ticker(res_data: &ResponseData) -> Ticker {
+    let mut ticker = TICKER.lock().await;
+
+    ticker.time = Decimal::from_i64(Utc::now().timestamp_millis()).unwrap();
+    ticker.high = match res_data.data["highPrice24h"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.high }
+    };
+    ticker.low = match res_data.data["lowPrice24h"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.low }
+    };
+    ticker.sell = match res_data.data["ask1Price"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.sell }
+    };
+    ticker.buy = match res_data.data["bid1Price"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.buy }
+    };
+    ticker.last = match res_data.data["lastPrice"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.last }
+    };
+    ticker.volume = match res_data.data["volume24h"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.volume }
+    };
+    ticker.open_interest = match res_data.data["openInterest"].as_str() {
+        Some(str) => { Decimal::from_str(str).unwrap() }
+        None => { ticker.open_interest }
+    };
+    // let s = res_data.data["symbol"].as_str().unwrap().replace("USDT", "_USDT");
+
+    ticker.clone()
+}
+
 pub fn format_depth_items(value: Value, mul: &Decimal) -> Vec<OrderBook> {
     let mut depth_items: Vec<OrderBook> = vec![];
     for val in value.as_array().unwrap() {
@@ -206,4 +251,21 @@ pub fn format_depth_items(value: Value, mul: &Decimal) -> Vec<OrderBook> {
         })
     }
     depth_items
-}
+}
+
+pub fn handle_records(value: &Value) -> Vec<Record> {
+    let mut records = vec![];
+    for record_value in value.as_array().unwrap() {
+        records.push(Record {
+            time: Decimal::from_i64(record_value["start"].as_i64().unwrap()).unwrap(),
+            open: Decimal::from_str(record_value["open"].as_str().unwrap()).unwrap(),
+            high: Decimal::from_str(record_value["high"].as_str().unwrap()).unwrap(),
+            low: Decimal::from_str(record_value["low"].as_str().unwrap()).unwrap(),
+            close: Decimal::from_str(record_value["close"].as_str().unwrap()).unwrap(),
+            volume: Decimal::from_str(record_value["volume"].as_str().unwrap()).unwrap(),
+            symbol: "".to_string(),
+        });
+    }
+
+    return records;
+}

+ 9 - 3
standard/src/coinex_swap.rs

@@ -12,7 +12,7 @@ use tokio::spawn;
 use tokio::time::Instant;
 use tracing::{error, info, trace};
 use exchanges::coinex_swap_rest::CoinexSwapRest;
-use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, PositionModeEnum, utils};
+use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, PositionModeEnum, utils, Record};
 use global::trace_stack::TraceStack;
 use crate::utils::get_tick_size;
 
@@ -182,13 +182,14 @@ impl Platform for CoinexSwap {
                 }
                 Some(value) => {
                     let result = Ticker {
-                        time: chrono::Utc::now().timestamp_millis(),
+                        time: Decimal::from_i64(chrono::Utc::now().timestamp_millis()).unwrap(),
                         high: Decimal::from_str(value["high"].as_str().unwrap()).unwrap(),
                         low: Decimal::from_str(value["low"].as_str().unwrap()).unwrap(),
                         sell: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         buy: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         last: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         volume: Decimal::from_str(value["volume"].as_str().unwrap()).unwrap(),
+                        open_interest: Default::default(),
                     };
                     Ok(result)
                 }
@@ -198,6 +199,10 @@ impl Platform for CoinexSwap {
         }
     }
 
+    async fn get_record(&mut self, _interval: String) -> Result<Vec<Record>, Error> {
+        Err(Error::new(ErrorKind::NotFound, "coinex_usdt_swap:该交易所方法未实现".to_string()))
+    }
+
     async fn get_ticker_symbol(&mut self, symbol_param: String) -> Result<Ticker, Error> {
         let symbol: String = symbol_param.replace("_", "").to_uppercase();
         let res_data = self.request.get_ticker(symbol.clone()).await;
@@ -211,13 +216,14 @@ impl Platform for CoinexSwap {
                 }
                 Some(value) => {
                     let result = Ticker {
-                        time: chrono::Utc::now().timestamp_millis(),
+                        time: Decimal::from_i64(chrono::Utc::now().timestamp_millis()).unwrap(),
                         high: Decimal::from_str(value["high"].as_str().unwrap()).unwrap(),
                         low: Decimal::from_str(value["low"].as_str().unwrap()).unwrap(),
                         sell: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         buy: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         last: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         volume: Decimal::from_str(value["volume"].as_str().unwrap()).unwrap(),
+                        open_interest: Default::default(),
                     };
                     Ok(result)
                 }

+ 14 - 4
standard/src/exchange_struct_handler.rs

@@ -4,7 +4,7 @@ use rust_decimal::prelude::FromPrimitive;
 use tracing::error;
 use exchanges::response_base::ResponseData;
 use crate::exchange::ExchangeEnum;
-use crate::{binance_swap_handle, bybit_swap_handle, coinex_swap_handle, gate_swap_handle};
+use crate::{binance_swap_handle, bybit_swap_handle, coinex_swap_handle, gate_swap_handle, Ticker};
 use crate::{Record, Trade, Depth};
 use crate::{Account, OrderBook, Position, SpecialOrder};
 
@@ -98,7 +98,6 @@ impl ExchangeStructHandler {
             symbol,
         }
     }
-
     // 处理成交信息,关于mul的问题:
     //     因为部分交易所比较特殊,返回的深度的数量是张数,这里是标准化成U量的形式;
     //     如果是不需要处理的交易所,传个Decimal::ONE就行了
@@ -158,7 +157,6 @@ impl ExchangeStructHandler {
             // }
         }
     }
-
     // 处理BookTicker信息
     pub fn book_ticker_handle(exchange: ExchangeEnum, res_data: &ResponseData, mul: &Decimal) -> Depth {
         match exchange {
@@ -193,7 +191,7 @@ impl ExchangeStructHandler {
                 binance_swap_handle::handle_records(&res_data.data)
             }
             ExchangeEnum::BybitSwap => { // 未使用暂不实现
-                vec![]
+                bybit_swap_handle::handle_records(&res_data.data)
             }
             // ExchangeEnum::HtxSwap => {
             //     htx_swap_handle::handle_records(&res_data.data)
@@ -231,6 +229,18 @@ impl ExchangeStructHandler {
             }
         }
     }
+    // 处理ticker信息
+    pub async fn ticker_handle(exchange: ExchangeEnum, res_data: &ResponseData) -> Ticker {
+        match exchange {
+            ExchangeEnum::BybitSwap => {
+                bybit_swap_handle::handle_ticker(res_data).await
+            },
+            _ => {
+                error!("未找到该交易所!trades_handle: {:?}", exchange);
+                panic!("未找到该交易所!trades_handle: {:?}", exchange);
+            }
+        }
+    }
     // 处理账号信息
     pub fn account_info_handle(exchange: ExchangeEnum, res_data: &ResponseData, symbol: &String) -> Account {
         match exchange {

+ 9 - 3
standard/src/gate_swap.rs

@@ -11,7 +11,7 @@ use serde_json::{json, Value};
 use tokio::spawn;
 use tokio::time::Instant;
 use tracing::{error, info};
-use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, PositionModeEnum};
+use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, PositionModeEnum, Record};
 use exchanges::gate_swap_rest::GateSwapRest;
 use global::trace_stack::TraceStack;
 
@@ -165,13 +165,14 @@ impl Platform for GateSwap {
                 }
                 Some(value) => {
                     let result = Ticker {
-                        time: chrono::Utc::now().timestamp_millis(),
+                        time: Decimal::from_i64(chrono::Utc::now().timestamp_millis()).unwrap(),
                         high: Decimal::from_str(value["high_24h"].as_str().unwrap()).unwrap(),
                         low: Decimal::from_str(value["low_24h"].as_str().unwrap()).unwrap(),
                         sell: Decimal::from_str(value["lowest_ask"].as_str().unwrap()).unwrap(),
                         buy: Decimal::from_str(value["highest_bid"].as_str().unwrap()).unwrap(),
                         last: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         volume: Decimal::from_str(value["volume_24h"].as_str().unwrap()).unwrap(),
+                        open_interest: Default::default(),
                     };
                     Ok(result)
                 }
@@ -181,6 +182,10 @@ impl Platform for GateSwap {
         }
     }
 
+    async fn get_record(&mut self, _interval: String) -> Result<Vec<Record>, Error> {
+        Err(Error::new(ErrorKind::NotFound, "gate_swap:该交易所方法未实现".to_string()))
+    }
+
     async fn get_ticker_symbol(&mut self, symbol: String) -> Result<Ticker, Error> {
         let symbol_upper = symbol.to_uppercase();
         let symbol_array: Vec<&str> = symbol_upper.split("_").collect();
@@ -195,13 +200,14 @@ impl Platform for GateSwap {
                 }
                 Some(value) => {
                     let result = Ticker {
-                        time: chrono::Utc::now().timestamp_millis(),
+                        time: Decimal::from_i64(chrono::Utc::now().timestamp_millis()).unwrap(),
                         high: Decimal::from_str(value["high_24h"].as_str().unwrap()).unwrap(),
                         low: Decimal::from_str(value["low_24h"].as_str().unwrap()).unwrap(),
                         sell: Decimal::from_str(value["lowest_ask"].as_str().unwrap()).unwrap(),
                         buy: Decimal::from_str(value["highest_bid"].as_str().unwrap()).unwrap(),
                         last: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
                         volume: Decimal::from_str(value["volume_24h"].as_str().unwrap()).unwrap(),
+                        open_interest: Default::default(),
                     };
                     Ok(result)
                 }

+ 6 - 2
standard/src/lib.rs

@@ -416,25 +416,27 @@ impl Order {
 /// - `volume(Decimal)`: 最近成交量
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Ticker {
-    pub time: i64,
+    pub time: Decimal,
     pub high: Decimal,
     pub low: Decimal,
     pub sell: Decimal,
     pub buy: Decimal,
     pub last: Decimal,
     pub volume: Decimal,
+    pub open_interest: Decimal,
 }
 
 impl Ticker {
     pub fn new() -> Ticker {
         Ticker {
-            time: 0,
+            time: Default::default(),
             high: Default::default(),
             low: Default::default(),
             sell: Default::default(),
             buy: Default::default(),
             last: Default::default(),
             volume: Default::default(),
+            open_interest: Default::default(),
         }
     }
 }
@@ -637,6 +639,8 @@ pub trait Platform {
     async fn get_positions(&mut self) -> Result<Vec<Position>, Error>;
     // 获取市场行情
     async fn get_ticker(&mut self) -> Result<Ticker, Error>;
+    // 获取市场行情
+    async fn get_record(&mut self, interval: String) -> Result<Vec<Record>, Error>;
     // 获取市场行情自定义交易对
     async fn get_ticker_symbol(&mut self, symbol: String) -> Result<Ticker, Error>;
     // 查询所有的市场信息

+ 240 - 132
strategy/src/avellaneda_stoikov.rs

@@ -9,17 +9,20 @@ use tracing::info;
 use global::cci::CentralControlInfo;
 use global::fixed_time_range_deque::FixedTimeRangeDeque;
 use global::predictor_state::PredictorState;
-use standard::{Depth, Ticker, Trade};
+use standard::{Depth, Record, Ticker, Trade};
 
 #[derive(Debug)]
 pub struct AvellanedaStoikov {
     pub depth_vec: FixedTimeRangeDeque<Depth>,                                  // 深度队列
-    pub trade_vec: FixedTimeRangeDeque<Trade>,                                  // 交易队列
+    pub trade_long_vec: FixedTimeRangeDeque<Trade>,                             // 交易队列
+    pub trade_short_vec: FixedTimeRangeDeque<Trade>,                            // 交易队列
     pub spread_vec: FixedTimeRangeDeque<Decimal>,
+    pub record_vec: FixedTimeRangeDeque<Record>,                                // 蜡烛队列
 
     pub mid_price: Decimal,                                                     // 中间价
     pub ask_price: Decimal,                                                     // 卖一价
     pub bid_price: Decimal,                                                     // 买一价
+    pub last_price: Decimal,                                                    // 最后成交价
     pub spread: Decimal,                                                        // 市场冲击
     pub spread_max: Decimal,                                                    // 最大市场冲击
     pub spread_min: Decimal,                                                    // 最小市场冲击
@@ -27,18 +30,20 @@ pub struct AvellanedaStoikov {
     pub optimal_bid_price: Decimal,                                             // 买入挂单价
 
     pub inventory: Decimal,                                                     // 库存,也就是q
+    pub level: Decimal,                                                         // martin
     pub sigma_square: Decimal,                                                  // σ^2,波动性的平方
     pub gamma: Decimal,                                                         // γ,库存风险厌恶参数
     pub kappa: Decimal,                                                         // κ 订单簿 流动性 参数
 
-    pub flow_in_value: Decimal,
-    pub flow_out_value: Decimal,
-    pub flow_ratio: Decimal,                                                    // 资金流比例
-    pub flow_ratio_diff_log: Decimal,                                           // 一阶导数
+    pub flow_ratio_long: Decimal,                                               // 资金流比例
+    pub flow_ratio_short: Decimal,                                              // 资金流比例
 
-    pub delta_ask: Decimal,                                                     // δa
-    pub delta_bid: Decimal,                                                     // δb
+    pub ask_delta: Decimal,                                                     // δa
+    pub bid_delta: Decimal,                                                     // δb
+    pub base_delta: Decimal,                                                    // 基础挂单距离
+    pub ratio_edge: Decimal,                                                    // 资金流修正的挂单距离
     pub ref_price: Decimal,                                                     // 预定价格
+    pub init_delta_plus: Decimal,                                               // 最初的delta之和
 
     pub cci_arc: Arc<Mutex<CentralControlInfo>>,                                // 中控信息
 
@@ -50,20 +55,26 @@ pub struct AvellanedaStoikov {
 impl AvellanedaStoikov {
     // 时间窗口大小(微秒)
     const MAX_TIME_RANGE_MICROS: i64 = 3 * 60_000_000;
-    const ONE_MILLION: Decimal = dec!(1_000_000);
-    const TWENTY_THOUSAND: Decimal = dec!(20_000);
+    const RECORD_RANGE_MICROS: i64 = 4 * 60_000_000;
+    const TRADE_LONG_RANGE_MICROS: i64 = 3 * 60_000_000;
+    const TRADE_SHORT_RANGE_MICROS: i64 = 20_000_000;
+    // const ONE_MILLION: Decimal = dec!(1_000_000);
+    // const TWENTY_THOUSAND: Decimal = dec!(20_000);
     const IRA: Decimal = dec!(1);
-    
+
     pub fn new(cci_arc: Arc<Mutex<CentralControlInfo>>) -> Self {
         let avellaneda_stoikov = Self {
             // 分别给与的长度
             depth_vec: FixedTimeRangeDeque::new(Self::MAX_TIME_RANGE_MICROS),
-            trade_vec: FixedTimeRangeDeque::new(Self::MAX_TIME_RANGE_MICROS),
             spread_vec: FixedTimeRangeDeque::new(Self::MAX_TIME_RANGE_MICROS),
+            trade_long_vec: FixedTimeRangeDeque::new(Self::TRADE_LONG_RANGE_MICROS),
+            trade_short_vec: FixedTimeRangeDeque::new(Self::TRADE_SHORT_RANGE_MICROS),
+            record_vec: FixedTimeRangeDeque::new(Self::RECORD_RANGE_MICROS),
 
             mid_price: Default::default(),
             ask_price: Default::default(),
             bid_price: Default::default(),
+            last_price: Default::default(),
             spread: Default::default(),
             spread_max: Default::default(),
             spread_min: Default::default(),
@@ -73,20 +84,23 @@ impl AvellanedaStoikov {
             inventory: Default::default(),
             gamma: Default::default(),
             sigma_square: Default::default(),
-            delta_ask: Default::default(),
-            delta_bid: Default::default(),
+            ask_delta: Default::default(),
+            bid_delta: Default::default(),
+            base_delta: Default::default(),
+            init_delta_plus: Default::default(),
+
+            ratio_edge: Default::default(),
             kappa: Default::default(),
-            flow_in_value: Default::default(),
             ref_price: Default::default(),
 
             cci_arc,
 
             is_ready: false,
-            prev_trade_time: 0,
+            prev_trade_time: Utc::now().timestamp_micros(),
             t_diff: Default::default(),
-            flow_ratio: Decimal::ONE,
-            flow_ratio_diff_log: Default::default(),
-            flow_out_value: Default::default(),
+            flow_ratio_long: Decimal::ONE,
+            level: Default::default(),
+            flow_ratio_short: Default::default(),
         };
 
         avellaneda_stoikov
@@ -111,14 +125,14 @@ impl AvellanedaStoikov {
     }
 
     pub fn update_spread(&mut self) {
-        if self.trade_vec.len() > 0 {
+        if self.trade_long_vec.len() > 0 {
             //
-            let last_trade = self.trade_vec.get(self.trade_vec.len() - 1).unwrap();
+            let last_trade = self.trade_long_vec.get(self.trade_long_vec.len() - 1).unwrap();
             let last_trade_price = last_trade.price;
             let last_trade_time = last_trade.time;
 
             let mut first_trade_price = last_trade.price;
-            for trade in self.trade_vec.deque.iter().rev() {
+            for trade in self.trade_long_vec.deque.iter().rev() {
                 if last_trade_time - trade.time > Decimal::TEN {
                     break;
                 }
@@ -147,84 +161,74 @@ impl AvellanedaStoikov {
     }
 
     pub async fn on_trade(&mut self, trade: &Trade) {
-        if trade.size > Decimal::ZERO {
-            self.flow_in_value += trade.value;
-        } else {
-            self.flow_out_value += trade.value;
-        }
-        if self.flow_in_value + self.flow_out_value > Self::ONE_MILLION {
-            self.flow_in_value = self.flow_in_value * dec!(0.618);
-            self.flow_out_value = self.flow_out_value * dec!(0.618);
-        }
+        self.trade_long_vec.push_back(trade.clone());
+        self.trade_short_vec.push_back(trade.clone());
 
-        self.trade_vec.push_back(trade.clone());
+        self.last_price = trade.price;
         self.update_spread();
         self.processor().await;
     }
 
-    pub async fn update_inventory(&mut self, inventory: &Decimal, step_size: &Decimal) {
-        let new_inventory = inventory / step_size;
+    pub async fn update_level(&mut self) {
+        self.level = (Decimal::NEGATIVE_ONE + (Decimal::ONE + dec!(8) * self.inventory.abs()).sqrt().unwrap()) / Decimal::TWO;
+        self.level = min(self.level, dec!(6));
+    }
 
-        if self.inventory.abs() < new_inventory.abs() {
-            self.prev_trade_time = Utc::now().timestamp_micros();
+    pub async fn on_ticker(&mut self, _ticker: &Ticker) {}
+
+    pub async fn on_record(&mut self, record: &Record) {
+        // 添加新蜡烛
+        if self.record_vec.len() == 0 {
+            self.record_vec.push_back(record.clone());
+        } else {
+            let last_record = self.record_vec.deque.back_mut().unwrap();
+
+            if last_record.time == record.time {
+                *last_record = record.clone();
+            } else if last_record.time < record.time {
+                self.record_vec.push_back(record.clone());
+            }
         }
+    }
 
-        self.inventory = new_inventory;
+    pub async fn update_inventory(&mut self, inventory: &Decimal, min_amount_value: &Decimal) {
+        let prev_inventory = self.inventory;
+        self.inventory = (inventory / (min_amount_value / self.mid_price)).round();
+        if prev_inventory != self.inventory {
+            self.prev_trade_time = Utc::now().timestamp_micros();
+        }
 
+        self.update_level().await;
         self.processor().await;
     }
 
     pub fn update_sigma_square(&mut self) {
-        self.sigma_square = if self.trade_vec.len() < 2 {
-            Decimal::ZERO
-        } else {
-            let last_trade = self.trade_vec.get(self.trade_vec.len() - 1).unwrap();
-
-            let mut volatility_total = Decimal::ZERO;
-            for index in (0..self.trade_vec.len()).rev() {
-                if index == self.trade_vec.len() - 1 {
-                    continue
-                }
-                let trade = self.trade_vec.get(index).unwrap();
-
-                if last_trade.time - trade.time > Decimal::ONE_THOUSAND {
-                    break;
-                }
-
-                let prev_trade = self.trade_vec.get(index + 1).unwrap();
-                let volatility = (trade.price - prev_trade.price).abs();
-                volatility_total += volatility;
-            }
-
-            volatility_total / Decimal::TEN
-        };
-
+        self.sigma_square = self.spread_max * dec!(0.5);
         self.sigma_square.rescale(10);
     }
 
     pub fn update_gamma(&mut self) {
-        self.gamma = Decimal::ONE + Self::IRA * (self.spread_max / self.spread_min).sqrt().unwrap() / Decimal::TEN;
-        self.gamma.rescale(8);
+        // self.gamma = if self.sigma_square == Decimal::ZERO || self.inventory == Decimal::ZERO {
+        //     Decimal::ONE
+        // } else {
+        //     Self::IRA * (self.spread_max - self.spread_min) / (Decimal::TWO * self.inventory.abs() * self.sigma_square)
+        // };
+        // self.gamma.rescale(8);
+
+        self.gamma = dec!(0.236) * Self::IRA;
     }
 
     pub fn update_kappa(&mut self) {
-        // if self.spread_max == Decimal::ZERO {
-        //     self.kappa = Decimal::ONE;
+        // self.kappa = if self.spread_max.is_zero() || self.init_delta_plus.is_zero() {
+        //     Decimal::ONE
         // } else {
-        //     let delta_plus_max = (Decimal::TWO - Self::IRA) * self.spread_max + Self::IRA * self.spread_min;
-        //     let mut temp = (delta_plus_max) * self.gamma - self.sigma_square * self.gamma.powd(Decimal::TWO);
-        //
+        //     let mut temp = self.init_delta_plus * self.gamma - self.sigma_square * self.gamma.powd(Decimal::TWO);
         //     temp.rescale(6);
         //
-        //     self.kappa = if temp <= Decimal::ZERO {
-        //         Decimal::TEN
-        //     } else if temp >= Decimal::TEN {
-        //         Decimal::TEN
-        //     } else {
-        //         self.gamma / (temp.exp() - Decimal::ONE)
-        //     };
-        //     self.kappa.rescale(8);
-        // }
+        //     self.gamma / (temp.exp() - Decimal::ONE)
+        // };
+        //
+        // self.kappa.rescale(8);
 
         if self.mid_price > Decimal::ZERO {
             self.kappa = dec!(888) / self.mid_price;
@@ -234,73 +238,177 @@ impl AvellanedaStoikov {
     }
 
     pub fn update_ref_price(&mut self) {
-        self.ref_price = self.mid_price - self.inventory * self.gamma * self.sigma_square * self.t_diff;
+        self.ref_price = self.mid_price;
     }
 
     pub fn update_delta(&mut self) {
         if self.gamma != Decimal::ZERO {
-            let a = (self.gamma * self.sigma_square * self.t_diff) / Decimal::TWO;
-            // let edge = (self.flow_ratio_diff_log.abs() / dec!(0.0003)) * self.gamma * self.sigma_square * self.t_diff;
+            let pos_edge = self.gamma * self.sigma_square * self.inventory.abs().powd(dec!(2)) * self.t_diff;
 
-            self.delta_bid = {
-                let b = (Decimal::ONE / self.gamma) * (Decimal::ONE + self.gamma / self.kappa).ln();
+            self.base_delta = self.gamma * self.sigma_square * self.t_diff / Decimal::TWO + (Decimal::ONE / self.gamma) * (Decimal::ONE + self.gamma / self.kappa).ln();
+            self.ratio_edge = self.flow_ratio_long * self.sigma_square;
 
-                a + b
-            };
+            self.bid_delta = self.base_delta;
+            self.ask_delta = self.base_delta;
 
-            self.delta_ask = {
-                let b = (Decimal::ONE / self.gamma) * (Decimal::ONE + self.gamma / self.kappa).ln();
+            if self.inventory > Decimal::ZERO {
+                self.bid_delta += pos_edge;
+            } else if self.inventory < Decimal::ZERO {
+                self.ask_delta += pos_edge;
+            }
 
-                a + b
-            };
+            if self.ratio_edge > Decimal::ZERO {
+                self.ask_delta = self.ask_delta - self.ratio_edge.abs() * (Decimal::TWO - self.t_diff);
+                self.bid_delta = self.bid_delta + self.ratio_edge.abs() * dec!(16);
+            } else if self.ratio_edge < Decimal::ZERO {
+                self.ask_delta = self.ask_delta + self.ratio_edge.abs() * dec!(16);
+                self.bid_delta = self.bid_delta - self.ratio_edge.abs() * (Decimal::TWO - self.t_diff);
+            }
+
+            if self.init_delta_plus.is_zero() {
+                self.init_delta_plus = (self.bid_delta + self.ask_delta) / Decimal::TWO
+            }
         }
     }
 
     pub fn update_optimal_ask_and_bid(&mut self) {
-        self.optimal_ask_price = max(self.ref_price + self.delta_ask / Decimal::TWO, self.ask_price);
-        self.optimal_bid_price = min(self.ref_price - self.delta_bid / Decimal::TWO, self.bid_price);
+        self.optimal_ask_price = max(self.ref_price + self.ask_delta / Decimal::TWO, self.ask_price);
+        self.optimal_bid_price = min(self.ref_price - self.bid_delta / Decimal::TWO, self.bid_price);
     }
 
     pub fn update_t_diff(&mut self) {
-        // let time_diff_decimal = Decimal::from_i64(Utc::now().timestamp_micros() - self.prev_trade_time).unwrap();
-        // self.t_diff = time_diff_decimal / Decimal::from_i64(Self::MAX_TIME_RANGE_MICROS).unwrap();
-        // self.t_diff = min(self.t_diff, Decimal::ONE);
-        self.t_diff = Decimal::ONE;
+        if self.prev_trade_time > 0 {
+            let time_diff_decimal = Decimal::from_i64(Utc::now().timestamp_micros() - self.prev_trade_time).unwrap();
+            self.t_diff = max(Decimal::ONE - time_diff_decimal / Decimal::from_i64(Self::MAX_TIME_RANGE_MICROS).unwrap(), Decimal::ZERO);
+        } else {
+            self.t_diff = Decimal::ONE;
+        }
     }
 
-    pub fn update_flow_ratio(&mut self) {
-        let prev_flow_ratio = self.flow_ratio;
-        self.flow_ratio = if self.trade_vec.len() < 2 {
-            Decimal::ZERO
-        } else {
-            if self.flow_out_value + self.flow_in_value <= Self::TWENTY_THOUSAND {
-                Decimal::ZERO
+    fn calc_flow_ratio(_prev_flow_ratio: &Decimal, min_volume: &Decimal, trades: &mut FixedTimeRangeDeque<Trade>) -> Decimal {
+        // let mut flow_in_value = Decimal::ZERO;
+        // let mut flow_out_value = Decimal::ZERO;
+        // for (index, trade_iter) in trades.deque.iter().enumerate() {
+        //     if index == 0 {
+        //         continue
+        //     }
+        //
+        //     let prev_trade_iter = trades.deque.get(index - 1).unwrap();
+        //     let trade = trade_iter;
+        //     if trade.price > prev_trade_iter.price {
+        //         flow_in_value += trade.value * (prev_trade_iter.price - trade.price).abs();
+        //         // flow_in_value += Decimal::ONE;
+        //     } else if trade.price < prev_trade_iter.price {
+        //         flow_out_value += trade.value * (prev_trade_iter.price - trade.price).abs();
+        //         // flow_out_value += Decimal::ONE;
+        //     } else {
+        //         // if trade.size > Decimal::ZERO {
+        //         //     flow_in_value += trade.value;
+        //         // } else {
+        //         //     flow_out_value += trade.value;
+        //         // }
+        //     }
+        //
+        //     // if trade_iter.size > Decimal::ZERO {
+        //     //     flow_in_value += trade_iter.value;
+        //     // } else {
+        //     //     flow_out_value += trade_iter.value;
+        //     // }
+        // }
+
+        // if self.trade_vec.deque.len() > 1 {
+        //     let prev_trade_iter = self.trade_vec.deque.get(self.trade_vec.deque.len() - 2).unwrap();
+        //     if trade.price > prev_trade_iter.price {
+        //         self.flow_in_value += trade.value;
+        //     } else if trade.price < prev_trade_iter.price {
+        //         self.flow_out_value += trade.value;
+        //     } else {
+        //         // if trade.size > Decimal::ZERO {
+        //         //     self.flow_in_value += trade.value;
+        //         // } else {
+        //         //     self.flow_out_value += trade.value;
+        //         // }
+        //     }
+        //
+        //     // if trade.size > Decimal::ZERO {
+        //     //     self.flow_in_value += trade.value;
+        //     // } else {
+        //     //     self.flow_out_value += trade.value;
+        //     // }
+        //
+        //     if self.flow_out_value + self.flow_in_value > dec!(2_000_000) {
+        //         self.flow_out_value = self.flow_out_value * dec!(0.618);
+        //         self.flow_in_value = self.flow_in_value * dec!(0.618);
+        //     }
+        // }
+        // else {
+        //     if trade.size > Decimal::ZERO {
+        //         self.flow_in_value += trade.value;
+        //     } else {
+        //         self.flow_out_value += trade.value;
+        //     }
+        // }
+
+        let mut flow_in_value = Decimal::ZERO;
+        let mut flow_out_value = Decimal::ZERO;
+        for trade_iter in trades.deque.iter() {
+            if trade_iter.size > Decimal::ZERO {
+                flow_in_value += trade_iter.value;
             } else {
-                self.flow_in_value / self.flow_out_value
+                flow_out_value += trade_iter.value;
             }
-        };
+        }
 
-        self.flow_ratio.rescale(6);
-        self.flow_ratio = if self.flow_ratio > Decimal::TEN {
-            Decimal::ZERO
-        } else if self.flow_ratio < dec!(-10) {
-            Decimal::ZERO
+        // 使用EMA來更新資金流,確保平滑性
+        // let a = Decimal::TWO / dec!(50);
+        if flow_out_value + flow_in_value > *min_volume {
+            // let now = (flow_in_value - flow_out_value) / (flow_out_value + flow_in_value);
+            // a * now + (Decimal::ONE - a) * prev_flow_ratio
+            (flow_in_value - flow_out_value) / (flow_out_value + flow_in_value)
         } else {
-            self.flow_ratio
-        };
+            Decimal::ZERO
+        }
+    }
 
-        if self.flow_ratio != Decimal::ZERO {
-            if prev_flow_ratio != Decimal::ZERO && self.flow_ratio != prev_flow_ratio {
-                let flow_ratio_diff = self.flow_ratio - prev_flow_ratio;
-                let temp = if flow_ratio_diff > Decimal::ZERO {
-                    (flow_ratio_diff + Decimal::ONE).ln()
-                } else {
-                    (flow_ratio_diff.abs() + Decimal::ONE).ln() * Decimal::NEGATIVE_ONE
-                };
+    fn calc_flow_ratio_2(_prev_flow_ratio: &Decimal, min_volume: &Decimal, trades: &mut FixedTimeRangeDeque<Trade>) -> Decimal {
+        let mut flow_in_value = Decimal::ZERO;
+        let mut flow_out_value = Decimal::ZERO;
+        for (index, trade_iter) in trades.deque.iter().enumerate() {
+            if index == 0 {
+                continue
+            }
 
-                self.flow_ratio_diff_log = temp * dec!(0.03) + self.flow_ratio_diff_log * dec!(0.97)
+            let prev_trade_iter = trades.deque.get(index - 1).unwrap();
+            let trade = trade_iter;
+            if trade.price > prev_trade_iter.price {
+                flow_in_value += trade.value;
+                // flow_in_value += Decimal::ONE;
+            } else if trade.price < prev_trade_iter.price {
+                flow_out_value += trade.value;
+                // flow_out_value += Decimal::ONE;
+            } else {
+                if trade.size > Decimal::ZERO {
+                    flow_in_value += trade.value;
+                } else {
+                    flow_out_value += trade.value;
+                }
             }
         }
+
+        // 使用EMA來更新資金流,確保平滑性
+        // let a = Decimal::TWO / dec!(50);
+        if flow_out_value + flow_in_value > *min_volume {
+            // let now = (flow_in_value - flow_out_value) / (flow_out_value + flow_in_value);
+            // a * now + (Decimal::ONE - a) * prev_flow_ratio
+            (flow_in_value - flow_out_value) / (flow_out_value + flow_in_value)
+        } else {
+            Decimal::ZERO
+        }
+    }
+
+    pub fn update_flow_ratio(&mut self) {
+        self.flow_ratio_long = Self::calc_flow_ratio_2(&self.flow_ratio_long, &dec!(0), &mut self.trade_long_vec);
+        self.flow_ratio_short = Self::calc_flow_ratio(&self.flow_ratio_short, &dec!(0), &mut self.trade_long_vec);
     }
 
     pub fn check_ready(&mut self) {
@@ -320,11 +428,11 @@ impl AvellanedaStoikov {
             return;
         }
 
-        if self.optimal_ask_price <= self.ask_price {
+        if self.optimal_ask_price < self.ask_price {
             return;
         }
 
-        if self.optimal_bid_price >= self.bid_price {
+        if self.optimal_bid_price > self.bid_price {
             return;
         }
 
@@ -332,7 +440,7 @@ impl AvellanedaStoikov {
             return;
         }
 
-        if self.trade_vec.len() < 100 {
+        if self.trade_long_vec.len() < 100 {
             return;
         }
 
@@ -345,7 +453,7 @@ impl AvellanedaStoikov {
         self.update_t_diff();
         // info!(?self.t_diff);
         self.update_flow_ratio();
-        // info!(?self.flow_ratio);
+        // info!(?self.flow_ratio_long);
         self.update_sigma_square();
         // info!(?self.sigma_square);
         self.update_gamma();
@@ -355,7 +463,7 @@ impl AvellanedaStoikov {
         self.update_ref_price();
         // info!(?self.ref_price);
         self.update_delta();
-        // info!(?self.delta_ask, ?self.delta_bid);
+        // info!(?self.ask_delta, ?self.bid_delta);
         self.update_optimal_ask_and_bid();
         // info!("=============================================");
 
@@ -368,23 +476,23 @@ impl AvellanedaStoikov {
         cci.predictor_state_vec.push_back(PredictorState {
             update_time: Decimal::from_i64(Utc::now().timestamp_millis()).unwrap(),
 
-            mid_price: self.mid_price,
+            mid_price: self.last_price,
             ask_price: self.ask_price,
             bid_price: self.bid_price,
+            last_price: self.last_price,
             spread: self.spread,
-            spread_max: self.spread_max,
-            spread_min: self.spread_min,
+            spread_max: self.ask_delta,
+            spread_min: self.bid_delta,
             optimal_ask_price: self.optimal_ask_price,
             optimal_bid_price: self.optimal_bid_price,
 
             inventory: self.inventory,
-            sigma_square: self.sigma_square,
-            gamma: self.gamma,
-            kappa: self.kappa,
+            sigma_square: self.flow_ratio_long,
+            gamma: self.flow_ratio_short,
+            kappa: self.t_diff,
 
-            flow_ratio: self.flow_ratio_diff_log,
+            flow_ratio: self.flow_ratio_long,
             ref_price: self.ref_price,
-            last_price: self.mid_price,
         });
     }
 

+ 34 - 3
strategy/src/bybit_usdt_swap.rs

@@ -13,7 +13,7 @@ use standard::exchange::ExchangeEnum::BybitSwap;
 use standard::exchange_struct_handler::ExchangeStructHandler;
 use standard::{Depth, OrderBook};
 use crate::core::Core;
-use crate::exchange_disguise::{on_depth, on_trade};
+use crate::exchange_disguise::{on_depth, on_record, on_ticker, on_trade};
 use crate::model::OrderInfo;
 
 // 参考 Bybit 合约 启动
@@ -28,12 +28,22 @@ pub(crate) async fn reference_bybit_swap_run(is_shutdown_arc: Arc<AtomicBool>,
         let mut ws = BybitSwapWs::new_label(name, is_colo, None, BybitSwapWsType::Public);
         ws.set_subscribe(vec![
             BybitSwapSubscribeType::PuOrderBook1,
-            BybitSwapSubscribeType::PuTrade
+            BybitSwapSubscribeType::PuTrade,
+            BybitSwapSubscribeType::PuKline("1".to_string()),
+            // BybitSwapSubscribeType::PuTickers
         ]);
 
         // 读取数据
         let core_arc_clone = Arc::clone(&core_arc);
-        let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().multiplier;
+        let mut rest = core_arc_clone.lock().await.platform_rest.clone_box();
+        let multiplier = rest.get_self_market().multiplier;
+        let mut records = rest.get_record("1".to_string()).await.unwrap();
+        for record in records.iter_mut() {
+            let core_arc_clone = core_arc.clone();
+
+            on_record(core_arc_clone, record).await
+        }
+
         let depth_asks = Arc::new(Mutex::new(Vec::new()));
         let depth_bids = Arc::new(Mutex::new(Vec::new()));
 
@@ -158,6 +168,27 @@ async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &Re
                 on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade).await;
             }
         }
+        "tickers" => {
+            trace_stack.set_source("bybit_usdt_swap.tickers".to_string());
+            let ticker = ExchangeStructHandler::ticker_handle(BybitSwap, response).await;
+            trace_stack.on_after_format();
+
+            on_ticker(core_arc, &mut trace_stack, &ticker).await;
+        },
+        // k线数据
+        "kline" => {
+            let mut records = ExchangeStructHandler::records_handle(BybitSwap, &response);
+
+            if records.is_empty() {
+                return;
+            }
+
+            for record in records.iter_mut() {
+                let core_arc_clone = core_arc.clone();
+
+                on_record(core_arc_clone, record).await
+            }
+        },
         _ => {
             error!("未知推送类型");
             error!(?response);

+ 13 - 4
strategy/src/core.rs

@@ -18,7 +18,7 @@ use tracing::{error, info, warn};
 use global::cci::CentralControlInfo;
 use global::params::Params;
 use global::trace_stack::TraceStack;
-use standard::{Account, Depth, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker, Trade};
+use standard::{Account, Depth, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, Record, SpecialTicker, Ticker, Trade};
 use standard::exchange::{Exchange};
 use standard::exchange::ExchangeEnum::{BinanceSwap, BybitSwap, GateSwap};
 
@@ -503,7 +503,7 @@ impl Core {
                         // 更新策略时间
                         self.strategy.local_time = Utc::now().timestamp_millis();
                         // trace_stack.on_before_strategy();
-                        let mut order = self.strategy.do_strategy(&mut self.avellaneda_stoikov, &self.local_orders);
+                        let mut order = self.strategy.do_strategy(&mut self.avellaneda_stoikov, &self.local_orders, &self.local_coin, &self.local_cash);
                         // trace_stack.on_after_strategy();
                         // 记录指令触发信息
                         if order.is_not_empty() {
@@ -612,6 +612,14 @@ impl Core {
         self.avellaneda_stoikov.on_trade(trade).await;
     }
 
+    pub async fn on_ticker(&mut self, ticker: &Ticker, _trace_stack: &mut TraceStack) {
+        self.avellaneda_stoikov.on_ticker(ticker).await;
+    }
+
+    pub async fn on_record(&mut self, record: &Record) {
+        self.avellaneda_stoikov.on_record(record).await;
+    }
+
     // #[instrument(skip(self, depth, name_ref, trace_stack), level="TRACE")]
     pub async fn on_depth(&mut self, depth: &Depth, name_ref: &String, trace_stack: &mut TraceStack) {
         // ================================ 刷新更新间隔 =========================================
@@ -637,7 +645,7 @@ impl Core {
             self.strategy.local_time = Utc::now().timestamp_millis();
 
             // 产生交易信号
-            let mut orders = self.strategy.do_strategy(&mut self.avellaneda_stoikov, &self.local_orders);
+            let mut orders = self.strategy.do_strategy(&mut self.avellaneda_stoikov, &self.local_orders, &self.local_coin, &self.local_cash);
             trace_stack.on_after_strategy();
 
             if orders.is_not_empty() {
@@ -745,13 +753,14 @@ impl Core {
             let bp = self.tickers.get(i).unwrap().buy;
             let ap = self.tickers.get(i).unwrap().sell;
             ref_tickers.insert(i.clone(), Ticker {
-                time: 0,
+                time: Decimal::ZERO,
                 high: Default::default(),
                 low: Default::default(),
                 sell: ap,
                 buy: bp,
                 last: Default::default(),
                 volume: Default::default(),
+                open_interest: Default::default(),
             });
         }
         self.ref_price = self.avellaneda_stoikov.get_ref_price(&ref_tickers);

+ 14 - 1
strategy/src/exchange_disguise.rs

@@ -3,7 +3,7 @@ use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
 use tokio::sync::Mutex;
 use global::trace_stack::TraceStack;
-use standard::{Depth, Trade};
+use standard::{Depth, Record, Ticker, Trade};
 use crate::binance_usdt_swap::{binance_swap_run, reference_binance_swap_run};
 use crate::bybit_usdt_swap::{bybit_swap_run, reference_bybit_swap_run};
 use crate::coinex_usdt_swap::coinex_swap_run;
@@ -137,6 +137,19 @@ pub async fn on_trade(core_arc: Arc<Mutex<Core>>, label: &String, trace_stack: &
     core.on_trade(trade, &label, trace_stack).await;
 }
 
+pub async fn on_ticker(core_arc: Arc<Mutex<Core>>, trace_stack: &mut TraceStack, ticker: &Ticker) {
+    let mut core = core_arc.lock().await;
+    trace_stack.on_after_unlock_core();
+
+    core.on_ticker(ticker, trace_stack).await;
+}
+
+pub async fn on_record(core_arc: Arc<Mutex<Core>>, record: &Record) {
+    let mut core = core_arc.lock().await;
+
+    core.on_record(record).await;
+}
+
 pub async fn on_order() {}
 
 pub async fn on_position() {}

+ 20 - 10
strategy/src/strategy.rs

@@ -467,8 +467,8 @@ impl Strategy {
         predictor.optimal_ask_price = utils::fix_price(predictor.optimal_ask_price, self.tick_size);
         predictor.optimal_bid_price = utils::fix_price(predictor.optimal_bid_price, self.tick_size);
 
-        let delta_ask = predictor.delta_ask.clone();
-        let delta_bid = predictor.delta_bid.clone();
+        let delta_ask = predictor.ask_delta.clone();
+        let delta_bid = predictor.ask_delta.clone();
 
         // 开仓相关
         let avoid_ask = min(dec!(0.001), (delta_ask / predictor.mid_price) * dec!(0.1));
@@ -826,10 +826,10 @@ impl Strategy {
     pub fn _cancel_open(&self, command: &mut OrderCommand, local_orders: &HashMap<String, OrderInfo>) {
         // debug!(?command);
         // 挂单范围
-        let long_upper = self.open_dist[0];
-        let long_lower = self.open_dist[1];
-        let short_lower = self.open_dist[2];
-        let short_upper = self.open_dist[3];
+        // let long_upper = self.open_dist[0];
+        // let long_lower = self.open_dist[1];
+        // let short_lower = self.open_dist[2];
+        // let short_upper = self.open_dist[3];
 
         for order_client_id in local_orders.keys() {
             let order = local_orders.get(order_client_id).unwrap();
@@ -839,14 +839,16 @@ impl Strategy {
             // 开多订单处理
             if order.side == "kd".to_string() {
                 // 在价格范围内时不处理
-                if (order.price <= long_upper && order.price >= long_lower) || self.local_time - order.local_time <= 100 {
+                // if (order.price <= long_upper && order.price >= long_lower) || self.local_time - order.local_time <= 200 {
+                if self.local_time - order.local_time <= 200 {
                     continue
                 }
                 // debug!(?key, ?order.price, ?long_upper, ?long_lower);
                 command.cancel.insert(key, value);
             } else if order.side == "kk".to_string() { // 开空订单处理
                 // 在价格范围内时不处理
-                if (order.price >= short_lower && order.price <= short_upper) || self.local_time - order.local_time <= 100 {
+                // if (order.price >= short_lower && order.price <= short_upper) || self.local_time - order.local_time <= 200 {
+                if self.local_time - order.local_time <= 200 {
                     continue
                 }
                 // debug!(?key, ?order.price, ?short_lower, ?short_upper);
@@ -1083,7 +1085,15 @@ impl Strategy {
         return command;
     }
 
-    pub fn do_strategy(&mut self, predictor: &mut AvellanedaStoikov, local_orders: &HashMap<String, OrderInfo>) -> OrderCommand {
+    pub fn do_strategy(&mut self, predictor: &mut AvellanedaStoikov, local_orders: &HashMap<String, OrderInfo>, local_coin: &Decimal, local_cash: &Decimal) -> OrderCommand {
+        // 更新当前账户余额
+        self.coin = local_coin.clone();
+        self.cash = local_cash.clone();
+        self.equity = local_cash + local_coin * self.mp;
+        if self.equity > self.max_equity {
+            self.max_equity = self.equity;
+        }
+
         self.ref_ap = predictor.optimal_ask_price;
         self.ref_bp = predictor.optimal_bid_price;
         self.ref_price = predictor.ref_price;
@@ -1100,7 +1110,7 @@ impl Strategy {
         }
 
         self._cancel_open(&mut command, local_orders);              // 撤单命令处理
-        self._post_open(&mut command, local_orders, predictor);     // 限价单命令处理
+        // self._post_open(&mut command, local_orders, predictor);     // 限价单命令处理
         self._check_local_orders(&mut command, local_orders);       // 固定时间检查超时订单
         self._update_in_cancel(&mut command, local_orders);         // 更新撤单队列,是一个filter
         self._check_request_limit(&mut command);                    // 限制频率,移除不合规则之订单,是一个filter