Explorar o código

增加交易量统计功能

JiahengHe hai 8 meses
pai
achega
740e43fc61

+ 20 - 0
exchanges/src/coinex_swap_rest.rs

@@ -408,6 +408,26 @@ impl CoinexSwapRest {
         data
     }
 
+    // 查询已完成委托单
+    pub async fn finished_order(&mut self, market: String, page:i64 ,limit: i64)-> ResponseData {
+        let mut params = serde_json::json!({
+            "market_type": "FUTURES",
+            "page": 1,
+            "limit": 1000
+        });
+
+        if market != "" { params["market"] = serde_json::json!(market); }
+        if page > 0 { params["page"] = serde_json::json!(page); }
+        if limit > 0 { params["limit"] = serde_json::json!(limit); }
+
+        let data = self.request("GET".to_string(),
+                                "/v2".to_string(),
+                                "/futures/finished-order".to_string(),
+                                true,
+                                Some(params.to_string()), None).await;
+        data
+    }
+
     //查询合约账户变更历史
     pub async fn account_book(&mut self) -> ResponseData {
         error!("查询合约账户变更历史失败,无实现");

+ 2 - 1
exchanges/src/gate_swap_rest.rs

@@ -370,9 +370,10 @@ impl GateSwapRest {
     }
 
     //查询个人成交记录
-    pub async fn my_trades(&mut self, settle: String, contract: String, limit: i64) -> ResponseData {
+    pub async fn my_trades(&mut self, settle: String, contract: String, limit: i64, last_id: String) -> ResponseData {
         let mut params = serde_json::json!({
             "contract":contract,
+            "last_id": last_id,
             "limit":1000
         });
         if limit > 0 {

+ 33 - 2
src/core_libs.rs

@@ -6,9 +6,9 @@ use strategy::{exchange_disguise, core};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool};
 use std::time::Duration;
-use chrono::Utc;
+use chrono::{Local, Timelike, Utc};
 use tokio::sync::{mpsc, Mutex};
-use tokio::time::Instant;
+use tokio::time::{sleep_until, Instant};
 use tracing::{error, info};
 use global::cci::CentralControlInfo;
 use global::params::Params;
@@ -124,6 +124,37 @@ pub async fn init(params: Params,
         }
     });
 
+    // 定时交易量更新
+    let trade_volume_core_arc = core_arc.clone();
+    tokio::spawn(async move {
+        info!("1周交易量统计定时任务启动(1hour)...");
+        loop {
+            // 获取当前时间并计算下一个整点
+            let now = Local::now();
+            let truncated_now = now
+                .with_minute(0)
+                .unwrap()
+                .with_second(0)
+                .unwrap()
+                .with_nanosecond(0)
+                .unwrap();
+
+            let next_hour = truncated_now + chrono::Duration::hours(1);
+
+            // 计算需要等待的时间
+            let wait_duration = next_hour - now;
+            let wait_secs = wait_duration.num_seconds() as u64;
+
+
+            // 等待到下一个整点
+            sleep_until(Instant::now() + std::time::Duration::from_secs(wait_secs)).await;
+
+            // 更新近7天的交易量统计
+            let mut core = trade_volume_core_arc.lock().await;
+            core.update_trade_volume().await;
+        }
+    });
+
     // 定时仓位检测
     // let markt_price_core_arc = core_arc.clone();
     // tokio::spawn(async move {

+ 4 - 0
standard/src/binance_swap.rs

@@ -145,6 +145,10 @@ impl Platform for BinanceSwap {
         }
     }
 
+    async fn get_trade_amount(&mut self, _start_time: i64, _end_time: i64) -> Result<Decimal, Error> {
+        Ok(Decimal::ZERO)
+    }
+
     // 获取市场行情
     async fn get_ticker(&mut self) -> Result<Ticker, Error> {
         let symbol_format = utils::format_symbol(self.symbol.clone(), "");

+ 4 - 0
standard/src/bitget_swap.rs

@@ -207,6 +207,10 @@ impl Platform for BitgetSwap {
         Ok(positions)
     }
 
+    async fn get_trade_amount(&mut self, _start_time: i64, _end_time: i64) -> Result<Decimal, Error> {
+        Ok(Decimal::ZERO)
+    }
+
     async fn get_ticker(&mut self) -> Result<Ticker, Error> {
         return self.get_ticker_symbol(self.symbol.clone()).await
     }

+ 5 - 0
standard/src/bybit_swap.rs

@@ -170,6 +170,11 @@ impl Platform for BybitSwap {
             Err(Error::new(ErrorKind::Other, res_data.to_string()))
         }
     }
+
+    async fn get_trade_amount(&mut self, _start_time: i64, _end_time: i64) -> Result<Decimal, Error> {
+        Ok(Decimal::ZERO)
+    }
+
     // 获取市场行情
     async fn get_ticker(&mut self) -> Result<Ticker, Error> {
         let symbol = self.symbol_uppercase.clone();

+ 68 - 0
standard/src/coinex_swap.rs

@@ -7,6 +7,7 @@ use futures::stream::FuturesUnordered;
 use futures::TryStreamExt;
 use rust_decimal::Decimal;
 use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
+use serde::{Deserialize, Serialize};
 use serde_json::{Value};
 use tokio::spawn;
 use tokio::time::Instant;
@@ -29,6 +30,39 @@ pub struct CoinexSwap {
     error_sender: Sender<Error>,
 }
 
+/// TradesSwap
+/// - `id`: i64, 成交记录 ID
+/// - `create_time`: i64, 成交时间
+/// - `contract`: String, 合约标识
+/// - `order_id`: String, 成交记录关联订单 ID
+/// - `size`: i64, 成交数量
+/// - `price`: String, 成交价格
+/// - `text`: String, 成交角色, taker - 吃单, maker - 做单
+/// - `fee`: String, 订单的自定义信息
+/// - `point_fee`: String, 成交手续费
+/// - `role`: String, 成交点卡手续费
+#[derive(Debug, Clone, Deserialize, Serialize)]
+struct TradesSwap {
+    order_id: i64,
+    market: String,
+    market_type: String,
+    side: String,
+    #[serde(rename = "type")]
+    trade_type: String,
+    amount: String,
+    price: String,
+    unfilled_amount: String,
+    filled_amount: String,
+    filled_value: String,
+    client_id: String,
+    fee: String,
+    fee_ccy: String,
+    maker_fee_rate: String,
+    taker_fee_rate: String,
+    created_at: i64,
+    updated_at: i64,
+}
+
 impl CoinexSwap {
     pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> CoinexSwap {
         let market = Market::new();
@@ -168,6 +202,40 @@ impl Platform for CoinexSwap {
             Err(Error::new(ErrorKind::Other, res_data.to_string()))
         }
     }
+    
+    // 获取指定时间内的交易量
+    async fn get_trade_amount(&mut self, start_time: i64, end_time: i64) -> Result<Decimal, Error> {
+        let mut data_array = vec![];
+        let mut page = 1;
+        loop {
+            let res_data = self.request.finished_order("".to_string(), page, 1000).await;
+            if res_data.code == 200 {
+                let trades_info: Vec<TradesSwap> = serde_json::from_str(&res_data.data.to_string()).unwrap();
+                if trades_info.is_empty(){
+                    break;
+                }
+                let last_time = (trades_info.last().unwrap().created_at).to_i64().unwrap();
+                data_array.extend(trades_info);
+
+                if last_time < start_time || last_time > end_time {
+                    data_array = data_array.iter().filter(|item| {
+                        let now_time = item.created_at;
+                        now_time > start_time && now_time < end_time
+                    }).cloned().collect();
+                    break;
+                } else {
+                    page = page + 1;
+                }
+            }
+        }
+        let mut amount = Decimal::ZERO;
+        for item in data_array.iter() {
+            let filled_value = Decimal::from_str(&item.filled_value).unwrap_or(Decimal::ZERO);
+            amount = filled_value + amount;
+        }
+        Ok(amount)
+    }
+
     // 获取市场行情
     async fn get_ticker(&mut self) -> Result<Ticker, Error> {
         let symbol: String = self.symbol.replace("_", "");

+ 59 - 0
standard/src/gate_swap.rs

@@ -7,6 +7,7 @@ use futures::stream::FuturesUnordered;
 use futures::TryStreamExt;
 use rust_decimal::Decimal;
 use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
+use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
 use tokio::spawn;
 use tokio::time::Instant;
@@ -28,6 +29,31 @@ pub struct GateSwap {
     error_sender: Sender<Error>,
 }
 
+/// TradesSwap
+/// - `id`: i64, 成交记录 ID
+/// - `create_time`: i64, 成交时间
+/// - `contract`: String, 合约标识
+/// - `order_id`: String, 成交记录关联订单 ID
+/// - `size`: i64, 成交数量
+/// - `price`: String, 成交价格
+/// - `text`: String, 成交角色, taker - 吃单, maker - 做单
+/// - `fee`: String, 订单的自定义信息
+/// - `point_fee`: String, 成交手续费
+/// - `role`: String, 成交点卡手续费
+#[derive(Debug, Clone, Deserialize, Serialize)]
+struct TradesSwap {
+    id: i64,
+    create_time: f64,
+    contract: String,
+    order_id: String,
+    size: i64,
+    price: String,
+    text: String,
+    fee: String,
+    point_fee: String,
+    role: String,
+}
+
 impl GateSwap {
     pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> GateSwap {
         let market = Market::new();
@@ -151,6 +177,39 @@ impl Platform for GateSwap {
             Err(Error::new(ErrorKind::Other, res_data.to_string()))
         }
     }
+
+    async fn get_trade_amount(&mut self, start_time: i64, end_time: i64) -> Result<Decimal, Error> {
+        let mut data_array = vec![];
+        let mut last_id = "".to_string();
+        loop {
+            let res_data = self.request.my_trades("usdt".to_string(), "".to_string(), 1000, last_id.clone()).await;
+            if res_data.code == 200 {
+                let trades_info: Vec<TradesSwap> = serde_json::from_str(&res_data.data.to_string()).unwrap();
+                if trades_info.len() == 0 {
+                    break;
+                }
+                let last_time = (trades_info.last().unwrap().create_time * 1000.0).to_i64().unwrap();
+                last_id = trades_info.last().unwrap().id.to_string();
+                // 写入存储数组中
+                data_array.extend(trades_info);
+                // 当最后一个时间戳小于开始时间或者大于结束时间,则跳出循环
+                if last_time < start_time || last_time > end_time {
+                    data_array = data_array.iter().filter(|item| {
+                        (item.create_time*1000.0).to_i64().unwrap_or(0) > start_time
+                    }).cloned().collect();
+                    break;
+                }
+            }
+        }
+        let mut amount = Decimal::ZERO;
+        for item in data_array.iter() {
+            let filled_price = Decimal::from_str(&item.price).unwrap_or(Decimal::ZERO);
+            let filled_size = Decimal::from_i64(item.size.abs()).unwrap_or(Decimal::ZERO);
+            amount = filled_size * filled_price + amount;
+        }
+        Ok(amount)
+    }
+
     // 获取市场行情
     async fn get_ticker(&mut self) -> Result<Ticker, Error> {
         let symbol_array: Vec<&str> = self.symbol.split("_").collect();

+ 4 - 0
standard/src/htx_swap.rs

@@ -175,6 +175,10 @@ impl Platform for HtxSwap {
         Ok(result)
     }
 
+    async fn get_trade_amount(&mut self, _start_time: i64, _end_time: i64) -> Result<Decimal, Error> {
+        Ok(Decimal::ZERO)
+    }
+
     async fn get_ticker(&mut self) -> Result<Ticker, Error> {
         return self.get_ticker_symbol(self.symbol.clone()).await;
     }

+ 4 - 0
standard/src/kucoin_swap.rs

@@ -147,6 +147,10 @@ impl Platform for KucoinSwap {
         }
     }
 
+    async fn get_trade_amount(&mut self, _start_time: i64, _end_time: i64) -> Result<Decimal, Error> {
+        Ok(Decimal::ZERO)
+    }
+
     async fn get_ticker(&mut self) -> Result<Ticker, Error> {
         let symbol_mapper = utils::symbol_enter_mapper(ExchangeEnum::KucoinSwap, self.symbol.as_str());
         let symbol_format = format!("{}M", utils::format_symbol(symbol_mapper.clone(), ""));

+ 2 - 0
standard/src/lib.rs

@@ -536,6 +536,8 @@ pub trait Platform {
     async fn get_position(&mut self) -> Result<Vec<Position>, Error>;
     // 获取所有持仓
     async fn get_positions(&mut self) -> Result<Vec<Position>, Error>;
+    // 获取交易量
+    async fn get_trade_amount(&mut self, start_time: i64, end_time: i64) -> Result<Decimal, Error>;
     // 获取市场行情
     async fn get_ticker(&mut self) -> Result<Ticker, Error>;
     // 获取市场行情自定义交易对

+ 34 - 1
strategy/src/core.rs

@@ -5,7 +5,7 @@ use std::io::Error;
 use std::str::FromStr;
 use std::sync::{Arc};
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::time::Duration;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use chrono::{Utc};
 use rust_decimal::{Decimal, MathematicalOps};
 use rust_decimal::prelude::{ToPrimitive};
@@ -122,6 +122,8 @@ pub struct Core {
     pub sigma_vec: VecDeque<Decimal>,  // 波动率记录
     pub is_sigma_abnormal: bool,  // 是否sigma反常
     pub is_sigma_allow_open: bool, // 是否允许开单
+
+    pub trading_volume: Decimal, // 交易量统计
 }
 
 impl Core {
@@ -262,6 +264,7 @@ impl Core {
             sigma_vec: VecDeque::with_capacity(100),
             is_sigma_abnormal: false,
             is_sigma_allow_open: true,
+            trading_volume: Default::default(),
         };
         for i in 0..=params.ref_exchange.len() - 1 {
             // 拼接不会消耗原字符串
@@ -573,6 +576,32 @@ impl Core {
         }
     }
 
+    pub async fn update_trade_volume(&mut self) {
+        // 获取当前时间戳(毫秒)
+        let end_time = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards")
+            .as_millis() as i64;
+
+        // 计算7天前的毫秒数
+        let seven_days_millis = 7 * 24 * 60 * 60 * 1000;
+
+        // 计算开始时间戳
+        let start_time = end_time - seven_days_millis;
+        let trade_volume = self.platform_rest.get_trade_amount(start_time, end_time).await;
+
+        match trade_volume {
+            Ok(volume) => {
+                info!("交易量定时更新成功: {}", volume);
+                self.trading_volume = volume;
+            },
+            Err(e) => {
+                error!("获取交易量失败: {}", e);
+                self.trading_volume = Decimal::ZERO;
+            }
+        }
+    }
+
     // #[instrument(skip(self), level="TRACE")]
     pub fn _print_local_trades_summary(&mut self) {
         // 计算本地累计利润
@@ -1690,6 +1719,10 @@ impl Core {
 
         // 清空挂单和仓位
         self.clear_position_and_orders(self.hold_coin).await;
+
+        // 过去一周交易量统计
+        self.update_trade_volume().await;
+        info!("获取近一周交易量成功!交易量: {}", self.trading_volume);
         /*
         ###### 交易前准备就绪 可以开始交易 ######
         self.loop.create_task(self.rest.go())