Ver Fonte

Merge remote-tracking branch 'origin/dev' into fresh

JiahengHe há 1 ano atrás
pai
commit
076afabdbb

+ 2 - 2
Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "as-rust"
-version = "0.1.0"
+version = "1.4.4"
 edition = "2021"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -14,7 +14,7 @@ tokio = { version = "1.31.0", features = ["full"] }
 chrono = "0.4.26"
 tracing = "0.1"
 tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
-tracing-appender = "0.2.2"
+tracing-appender-timezone = { git = "https://github.com/skyfffire/tracing-appender-timezone.git" }
 serde = { version = "1.0.188", features = ["derive"] }
 actix-rt = "2.5.0"
 actix-web = "4.0.0-beta.12"

+ 2 - 2
exchanges/tests/bybit_swap_test.rs

@@ -8,8 +8,8 @@ use tracing::trace;
 use exchanges::bybit_swap_rest::BybitSwapRest;
 use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
 
-const ACCESS_KEY: &str = "JKHMEL6kD7I7WjbHKP";
-const SECRET_KEY: &str = "jmofU9X9PjzGZ8BlO0xZLcUzImHE2CaTSQ3Y";
+const ACCESS_KEY: &str = "";
+const SECRET_KEY: &str = "";
 
 
 //ws-订阅公共频道信息

+ 7 - 2
global/Cargo.toml

@@ -9,8 +9,13 @@ edition = "2021"
 rust_decimal = "1.32.0"
 rust_decimal_macros = "1.32.0"
 tracing = "0.1"
-tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
-tracing-appender = "0.2.2"
+tracing-subscriber = { version = "0.3.17", features = [
+    "env-filter",
+    "time",
+    "local-time"
+] }
+time = { version = "0.3.7", features = ["macros"] }
+tracing-appender-timezone = { git = "https://github.com/skyfffire/tracing-appender-timezone.git" }
 toml = "0.5.11"
 serde = "1.0.183"
 serde_derive = "1.0"

+ 20 - 29
global/src/log_utils.rs

@@ -1,35 +1,14 @@
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::io;
-use chrono::{Datelike, FixedOffset, Timelike, Utc};
 use tracing::{Event, Subscriber, warn};
-use tracing_appender::non_blocking::WorkerGuard;
+use tracing_appender_timezone::non_blocking::WorkerGuard;
 use tracing_subscriber::{fmt, Layer};
-use tracing_subscriber::fmt::format::Writer;
-use tracing_subscriber::fmt::time::FormatTime;
 use tracing_subscriber::layer::{Context, SubscriberExt};
 use reqwest::{Client};
-use rust_decimal::prelude::ToPrimitive;
 use tracing::field::{Field, Visit};
+use tracing_appender_timezone::rolling::{RollingFileAppender, Rotation};
 
-// 用來格式化日誌的輸出時間格式
-struct LocalTimer;
-
-impl FormatTime for LocalTimer {
-    fn format_time(&self, w: &mut Writer<'_>) -> std::fmt::Result {
-        let now = Utc::now().with_timezone(&FixedOffset::east_opt(8 * 3600).unwrap());
-        write!(
-            w,
-            "{:02}-{:02} {:02}:{:02}:{:02}.{:03}",
-            now.month(),
-            now.day(),
-            now.hour(),
-            now.minute(),
-            now.second(),
-            now.nanosecond() / 1e6.to_u32().unwrap()
-        )
-    }
-}
 
 struct ErrorMessageVisitor {
     message: String
@@ -105,19 +84,31 @@ pub fn final_init(level: &str, port: u32, account_name: String) -> WorkerGuard {
     path.push_str("./logs");
     path.push_str(port.to_string().as_str());
 
-    let file_appender = tracing_appender::rolling::daily(path, "as-debug.log");
-    let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
+    let file_appender = RollingFileAppender::builder()
+        .time_zone(8)
+        .rotation(Rotation::DAILY)
+        .filename_suffix("log")
+        .build(path)
+        .expect("initializing rolling file appender failed");
+    let (non_blocking, guard) = tracing_appender_timezone::non_blocking(file_appender);
+
+    use time::{macros::format_description, UtcOffset};
+    use tracing_subscriber::{fmt::time::OffsetTime};
+    let local_time = OffsetTime::new(
+        UtcOffset::from_hms(8, 0, 0).unwrap(),
+        format_description!("[month]-[day] [hour]:[minute]:[second].[subsecond digits:3]"),
+    );
 
     let fmt_layer = fmt::layer()
-        .with_timer(LocalTimer)
-        .with_target(true)
+        .with_timer(local_time.clone())
+        .with_target(false)
         .with_level(true)
         .with_writer(io::stdout)
         .with_span_events(fmt::format::FmtSpan::FULL);
 
     let file_layer = fmt::layer()
-        .with_timer(LocalTimer)
-        .with_target(true)
+        .with_timer(local_time.clone())
+        .with_target(false)
         .with_ansi(false)
         .with_level(true)
         .with_writer(non_blocking.clone())

+ 4 - 12
src/main.rs

@@ -6,7 +6,7 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 use tracing::{info, warn};
-use tracing_appender::non_blocking::WorkerGuard;
+use tracing_appender_timezone::non_blocking::WorkerGuard;
 use global::log_utils::send_remote_err_log;
 use global::params::Params;
 
@@ -75,16 +75,8 @@ async fn main() {
     ws_running.store(false, Ordering::Relaxed);
     tokio::time::sleep(Duration::from_secs(1)).await;
 
-    info!("等待其他线程后续处理完毕(再次按control c可以立马结束)……");
-    tokio::spawn(async move {
-        let mut quant = quant_arc.lock().await;
-        quant.exit(0).await;
-    });
-    let mut i = 5;
-    while i > 0 {
-        tokio::time::sleep(Duration::from_secs(1)).await;
-        info!("{}", i);
-        i = i - 1;
-    }
+    info!("等待清空仓位、订单(再次按control c可以立马结束)……");
+    let mut quant = quant_arc.lock().await;
+    quant.exit().await;
     info!("程序已退出!为以防万一,请再次检查仓位和订单!");
 }

+ 19 - 20
strategy/src/binance_spot.rs

@@ -10,7 +10,6 @@ use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use standard::exchange::ExchangeEnum::BinanceSpot;
 use crate::exchange_disguise::on_special_depth;
-use crate::model::{OriginalTradeBa};
 use crate::quant::Quant;
 
 // 参考 币安 现货 启动
@@ -92,8 +91,8 @@ pub async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>,
 
 async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
                  update_flag_u: &mut Decimal,
-                 max_buy: &mut Decimal,
-                 min_sell: &mut Decimal,
+                 _max_buy: &mut Decimal,
+                 _min_sell: &mut Decimal,
                  data: ResponseData) {
     let mut trace_stack = TraceStack::default();
     trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
@@ -103,23 +102,23 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
         return;
     }
     if data.channel == "aggTrade" {
-        let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
-        let str = data.label.clone();
-        let mut quant = bot_arc_clone.lock().await;
-        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
-            *max_buy = Decimal::ZERO;
-            *min_sell = Decimal::ZERO;
-            quant.is_update.remove(str.as_str());
-        }
-        if trade.p > *max_buy || *max_buy == Decimal::ZERO{
-            *max_buy = trade.p
-        }
-        if trade.p < *min_sell || *min_sell == Decimal::ZERO{
-            *min_sell = trade.p
-        }
-        {
-            quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
-        }
+        // let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
+        // let str = data.label.clone();
+        // let mut quant = bot_arc_clone.lock().await;
+        // if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap() {
+        //     *max_buy = Decimal::ZERO;
+        //     *min_sell = Decimal::ZERO;
+        //     quant.is_update.remove(str.as_str());
+        // }
+        // if trade.p > *max_buy || *max_buy == Decimal::ZERO{
+        //     *max_buy = trade.p
+        // }
+        // if trade.p < *min_sell || *min_sell == Decimal::ZERO{
+        //     *min_sell = trade.p
+        // }
+        // {
+        //     quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+        // }
     } else if data.channel == "bookTicker" {
         trace_stack.on_before_format();
         let special_depth = standard::handle_info::HandleSwapInfo::handle_special_ticker(BinanceSpot, data.clone());

+ 1 - 1
strategy/src/binance_usdt_swap.rs

@@ -28,7 +28,7 @@ pub(crate) async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>,
         ws.set_subscribe(vec![
             // BinanceSwapSubscribeType::PuDepth20levels100ms,
             BinanceSwapSubscribeType::PuBookTicker,
-            BinanceSwapSubscribeType::PuAggTrade
+            // BinanceSwapSubscribeType::PuAggTrade
         ]);
 
         //读取数据

+ 73 - 37
strategy/src/quant.rs

@@ -1131,35 +1131,59 @@ impl Quant {
     }
 
     #[instrument(skip(self, target_hold_coin), level="TRACE")]
-    pub async fn check_position(&mut self, target_hold_coin: Decimal) {
-        info!("清空挂单!");
+    pub async fn check_position(&mut self, target_hold_coin: Decimal) -> bool {
+        let mut is_clear = false;
+
+        info!("------------------------------------------------------------------------------------------------------------");
+        info!("步骤一:检查挂单:");
         match self.platform_rest.cancel_orders_all().await {
             Ok(val) => {
-                info!("清空所有挂单,{:?}", val);
+                let length = val.len();
+                is_clear = length == 0;
+
+                info!("已清空所有挂单({}条)", length);
+
+                for o in val {
+                    info!("    {:?}", o);
+                }
             }
             Err(err) => {
-                error!("取消所有订单异常: {}",err);
+                warn!("取消所有订单异常({}),启动备用方法。", err);
+
                 match self.platform_rest.cancel_orders().await {
                     Ok(val) => {
-                        info!("清空当前币对挂单,{:?}", val);
+                        let length = val.len();
+                        is_clear = length == 0;
+
+                        info!("清空所有挂单({}条):{:?}", length, val);
                     }
                     Err(exc) => {
-                        error!("清空当前币对订单异常: {}",exc);
+                        error!("清空当前币对订单异常: {}", exc);
                     }
                 }
             }
         }
+        info!("挂单检查完毕。");
+        info!("");
+
+        info!("步骤二:检查仓位:");
         if self.exchange.contains("spot") { // 现货
-            self.check_position_spot(target_hold_coin).await;
+            is_clear = is_clear && (self.check_position_spot(target_hold_coin.clone()).await == 0);
+            info!("检查遗漏仓位(现货),目标持仓:{}USDT", target_hold_coin);
         } else { // 合约
-            self.check_position_swap().await;
+            is_clear = is_clear && (self.check_position_swap().await == 0);
+            info!("遗漏仓位检查完毕(合约)!");
         }
-        info!("遗留仓位检测完毕");
+        info!("------------------------------------------------------------------------------------------------------------");
+        info!("");
+
+        return is_clear;
     }
 
     #[instrument(skip(self, target_hold_coin), level="TRACE")]
-    pub async fn check_position_spot(&mut self, target_hold_coin: Decimal) {
-        info!("---------------------------检查遗漏仓位(现货),目标持仓:{}USDT---------------------------", target_hold_coin);
+    pub async fn check_position_spot(&mut self, target_hold_coin: Decimal) -> usize {
+        let mut length = 0;
+
         match self.platform_rest.get_spot_account().await {
             Ok(mut val) => {
                 // 如果返回的数组里没有交易货币,则补充交易货币
@@ -1204,8 +1228,13 @@ impl Quant {
                                 // price = mp*0.999;
                                 amount = -diff / mp;
                             } else {
+                                // 不需要调整说明没有仓位了。
+
                                 continue;
                             }
+                            // 需要调整说明有仓位。
+                            length = 1;
+
                             info!(?ticker);
                             info!("需要调整现货仓位 {}USDT(目标:{}USDT) 共计{}{}。", diff, _hold_coin, amount, coin_name);
                             let mut ts = TraceStack::default();
@@ -1241,19 +1270,24 @@ impl Quant {
             }
         }
         info!("---------------------------遗漏仓位检查完毕(现货)!-----------------------------------");
+
+        return length;
     }
 
     #[instrument(skip(self), level="TRACE")]
-    pub async fn check_position_swap(&mut self) {
-        info!("---------------------------检查遗漏仓位(合约)!-----------------------------------");
+    pub async fn check_position_swap(&mut self) -> usize {
+        let mut length = 0;
         match self.platform_rest.get_positions().await {
             Ok(val) => {
+                info!("检查仓位信息({}条仓位信息,部分交易所会返回0持仓的):", length);
+
                 for position in val {
                     info!("{:?}", position);
                     if position.amount.eq(&Decimal::ZERO) {
                         continue;
                     }
-                    info!("仓位获取到:{:?}", position);
+                    length = length + 1;
+                    info!("    仓位:{:?}", position);
                     match self.platform_rest.get_ticker_symbol(position.symbol.clone()).await {
                         Ok(ticker) => {
                             let ap = ticker.sell;
@@ -1268,7 +1302,7 @@ impl Quant {
                                     market_info = market;
                                 }
                                 Err(err) => {
-                                    error!("{} 获取当前market异常: {}", position.symbol.clone(), err);
+                                    error!("    {} 获取当前market异常: {}", position.symbol.clone(), err);
                                     continue;
                                 }
                             }
@@ -1285,7 +1319,7 @@ impl Quant {
                                     side = "pk";
                                 }
                                 _ => {
-                                    info!("仓位position_mode匹配失败,不做操作!");
+                                    error!("    仓位position_mode匹配失败,不做操作!");
                                     // 执行完当前币对  结束循环
                                     continue;
                                 }
@@ -1297,29 +1331,32 @@ impl Quant {
                             match self.platform_rest.take_order_symbol(position.symbol.clone(), Decimal::ONE, utils::generate_client_id(None).as_str(), side, price, position.amount.abs()).await {
                                 Ok(order) => {
                                     ts.on_after_send();
-                                    info!("{}仓位清除下单成功 {:?}, {}", position.symbol.clone(), order, ts.to_string());
+                                    info!("    {}仓位清除下单成功 {:?}, {}", position.symbol.clone(), order, ts.to_string());
                                     // 执行完当前币对  结束循环
                                     continue;
                                 }
                                 Err(error) => {
                                     ts.on_after_send();
-                                    error!("{}仓位清除下单异常 {}, {}", position.symbol.clone(), error, ts.to_string());
+                                    error!("    {}仓位清除下单异常 {}, {}", position.symbol.clone(), error, ts.to_string());
                                     // 执行完当前币对  结束循环
                                     continue;
                                 }
                             };
                         }
                         Err(err) => {
-                            error!("{} 获取当前ticker异常: {}", position.symbol.clone(), err)
+                            error!("    {} 获取当前ticker异常: {}", position.symbol.clone(), err)
                         }
                     }
                 }
             }
             Err(error) => {
+                length = 0;
+
                 error!("获取仓位信息异常: {}", error);
             }
         }
-        info!("---------------------------遗漏仓位检查完毕(合约)!-----------------------------------");
+
+        return length
     }
 
 
@@ -1348,24 +1385,23 @@ impl Quant {
         // info!("退出进程!");
     }
 
-    #[instrument(skip(self, delay), level="TRACE")]
-    pub async fn exit(&mut self, delay: i8) {
-        info!("--------------------------------------------------");
-        info!("预约退出操作 delay:{}", delay);
-        if delay > 0i8 {
-            sleep(Duration::from_secs(delay as u64)).await;
+    #[instrument(skip(self), level="TRACE")]
+    pub async fn exit(&mut self) {
+        info!("-------------------------启动退出流程----------------------------");
+        info!("");
+
+        // 循环清空仓位,如若彻底清空,才进行退出。
+        let mut clear_count = 1;
+        while !self.check_position(Decimal::ZERO).await {
+            sleep(Duration::from_secs(1)).await;
+
+            clear_count += 1;
+            info!("清理指令发送完毕,启动第{}次检查。", clear_count);
+            info!("");
         }
-        info!("开始退出操作");
-        info!("为避免api失效导致遗漏仓位 建议人工复查");
-        self.check_position(Decimal::ZERO).await;
-        sleep(Duration::from_secs(2)).await;
-        info!("双重检查遗漏仓位");
-        self.check_position(Decimal::ZERO).await;
-        info!("停机退出  停机原因: {}", self.exit_msg);
-        // 发送交易状态 await self._post_params()
-        // TODO: 向中控发送信号
-        self.running.store(false, Ordering::Relaxed);
-        info!("退出进程!");
+
+        info!("订单、仓位清除完毕,为避免api失效导致遗漏仓位,建议人工复查。");
+        info!("停机原因:{}。", self.exit_msg);
     }
 
     #[instrument(skip(self), level="TRACE")]