import asyncio from aiohttp import web import traceback import time, csv import strategy as strategy import utils import model import logging, logging.handlers import signal import os, json, sys import predictor import backtest import multiprocessing import random import psutil import ujson import broker from decimal import Decimal from loguru import logger VERSION = utils.VERSION def timeit(func): def wrapper(*args, **kwargs): nowTime = time.time() res = func(*args, **kwargs) spend_time = time.time() - nowTime spend_time = round(spend_time * 1000, 5) print(f'{func.__name__} 耗时 {spend_time} ms') return res return wrapper class Quant: def __init__(self, params:model.Config, logname="test_logname", father=1): print('############### 超级无敌韭菜收割机 ################') print(f'>>> 版本 {VERSION} <<<') print('*** 当前配置') self.params = params for p in self.params.__dict__: print('***', p, ' => ', getattr(self.params, p)) print('##################################################') self.logger = self.get_logger(logname) self.csvname = logname + ' ' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) pid = os.getpid() self.pid_start_time = time.time() self.logger.info(f"进程号{pid} 启动时间{self.pid_start_time}") ##### 绑定cpu cpu_count = psutil.cpu_count() print("检测cpu核心负载") cpu_used_pct = [0.0 for _ in range(cpu_count)] for _ in range(random.randint(5,15)): r = psutil.cpu_times_percent(percpu=True) for i in range(cpu_count): cpu_used_pct[i] += int(r[i].user) time.sleep(0.1) print(cpu_used_pct) cpu_id = cpu_used_pct.index(min(cpu_used_pct)) print(f"当前负载最低的cpu为:{cpu_id}") self.process = psutil.Process(pid) print(f"核心数{cpu_count} 目标绑定cpu:{cpu_id}") os.system(f"taskset -cp {cpu_id} {pid}") print("调整系统调度优先级为最高等级") if 'win' not in sys.platform: print(os.nice(-20)) #### cpu 内存 平均占用 self.cpu_ema = 0.0 self.mm_ema = 0.0 ##### self.acct_name = self.params.account_name self.symbol = self.params.pair self.base = self.params.pair.split('_')[0].upper() self.quote = self.params.pair.split('_')[1].upper() if 1: ### 使用uvloop if 'win' not in sys.platform: print('采用高速事件循环库') import uvloop self.loop = uvloop.new_event_loop() else: print('采用普通事件循环库') self.loop = asyncio.get_event_loop() else: ### 使用原生loop self.loop = asyncio.get_event_loop() self.strategy = strategy.Strategy(self.params, is_print=1) ###### 判断启动方式 self.father = father print(f"父进程标识 {self.father}") ##### 现货底仓 hold_coin = float(self.params.hold_coin) self.hold_coin = utils.clip(hold_coin, 0.0, 10000.0) ##### 本地状态量 self.data = dict() self.total_equity = 0.0 self.local_orders = dict() # 本地挂单表 self.local_orders_backup = dict() # 本地订单缓存队列 self.local_orders_backup_cid = [] # 本地订单缓存cid队列 self.handled_orders_cid = [] # 本地已处理cid缓存队列 self.local_profit = 0.0 self.local_cash = 0.0 # 本地U保证金 self.local_coin = 0.0 # 本地币保证金 self.local_position = model.Position() self.local_position_by_orders = model.Position() self.local_buy_amount = 0.0 self.local_sell_amount = 0.0 self.local_buy_value = 0.0 self.local_sell_value = 0.0 self.local_cancel_log = dict() self.interval = float(self.params.interval) self.exchange = self.params.exchange self.tradeMsg = model.TraderMsg() self.exit_msg = "正常退出" self.save = int(self.params.save) # 保存行情数据 self.logger.info(f"实时行情数据记录开关:{self.save}") # 仓位检查结果序列 self.position_check_series = [] # 止损大小 self.stoploss = float(self.params.stoploss) # 资金使用率 self.used_pct = float(self.params.used_pct) #使用资金比例 # 启停信号 0 表示运行 大于1开始倒计时 1时停机 self.mode_signal = 0 # 交易盘口订单流更新时间 self.trade_order_update_time = time.time() # onTick触发时间记录 self.on_tick_event_time = time.time() # 盘口ticker depth信息 self.tickers = dict() self.depths = dict() # 行情更新延迟监控 self.market_update_time = dict() self.market_update_interval = dict() # 参考盘口名称 refex = self.params.refexchange refpair = self.params.refpair if len(refex) != len(refpair): self.logger.error("参考盘口数不等于参考品种数 退出") raise Exception("参考盘口数不等于参考品种数 退出") self.ref_num = len(refex) self.ref_name = [] for i in range(self.ref_num): if refex[i] not in broker.exchange_lists: self.logger.error("出现不支持的参考盘口") raise Exception("出现不支持的参考盘口") name = refex[i] + '@' + refpair[i] + '@ref' self.ref_name.append(name) self.tickers[name] = dict() self.depths[name] = list() self.market_update_time[name] = 0.0 self.market_update_interval[name] = 0.0 # 参考盘口tick更新时间 # 服务器私有ip地址检查 ipList = utils.get_local_ip_list() ipListNum = len(ipList) # if int(self.params.ip) >= ipListNum: # raise Exception("指定私有ip地址序号不存在") # 用于创建ws实例 name = self.exchange+'@'+self.params.pair self.trade_name = name self.market_update_time[name] = 0.0 self.market_update_interval[name] = 0.0 self.tickers[name] = dict() self.depths[name] = list() cp = model.ClientParams() cp.name = self.trade_name cp.pair = self.params.pair cp.access_key = self.params.access_key cp.secret_key = self.params.secret_key cp.pass_key = self.params.pass_key cp.interval = self.params.interval cp.broker_id = self.params.broker_id cp.debug = self.params.debug cp.proxy = self.params.proxy cp.ip = int(self.params.ip) self.ws = broker.newWs(self.exchange)( params=cp, colo=int(self.params.colo), is_print=0, ) self.ws.logger = self.logger self.ready = 0 # rest实例 self.rest = broker.newRest(self.exchange)(cp, colo=int(self.params.colo)) self.ws_ref = dict() # 参考盘口 ws 实例 for i in range(self.ref_num): cp = model.ClientParams() cp.name = self.ref_name[i] cp.pair = self.params.refpair[i] cp.proxy = self.params.proxy cp.interval = self.params.interval cp.ip = int(self.params.ip) exchange = self.params.refexchange[i] if exchange not in broker.exchange_lists: self.logger.error("参考盘口名称错误 退出") return _colo = 0 if self.params.refexchange[i] == self.params.exchange and \ self.params.refpair[i] == self.params.pair and int(self.params.colo): _colo = 1 self.ws_ref[self.ref_name[i]] = broker.newWs(exchange)(cp, colo=_colo) self.ws_ref[self.ref_name[i]].callback['onTicker']=self.update_ticker self.ws_ref[self.ref_name[i]].callback['onDepth']=self.update_depth self.ws_ref[self.ref_name[i]].logger = self.logger # 添加回调 self.ws.callback = { 'onTicker':self.update_ticker, 'onDepth':self.update_depth, 'onPosition':self.update_position, 'onEquity':self.update_equity, 'onOrder':self.update_order, 'onExit':self.update_exit, } self.rest.callback = { 'onTicker':self.update_ticker, 'onDepth':self.update_depth, 'onPosition':self.update_position, 'onEquity':self.update_equity, 'onOrder':self.update_order, 'onExit':self.update_exit, } self.rest.logger = self.logger # 配置策略 self.strategy.logger = self.logger # 配置定价模型 price_alpha = [] for i in self.params.refpair: # 交易1000shib 参考 shib if '1000' in self.params.pair and '1000' not in i: price_alpha.append(1000.0) # 交易shib 参考 1000shib elif '1000' not in self.params.pair and '1000' in i: price_alpha.append(0.001) else: # 交易shib 参考 shib price_alpha.append(1.0) self.logger.info(f'价格系数{price_alpha}') self.Predictor = predictor.Predictor(ref_name=self.ref_name, alpha=price_alpha, gamma=float(self.params.gamma)) # 初始化参数 self.strategy.trade_open_dist = float(self.params.open) self.strategy.trade_close_dist = float(self.params.close) # 在线训练 self.backtest = int(self.params.backtest) self.logger.info(f'在线训练开关 {self.backtest}') #### time.sleep(3) def get_logger(self, logname): '''日志模块''' logger = logging.getLogger(__name__) # # log flag # if int(self.params.log): # log_level = logging.DEBUG # logger.setLevel(log_level) # # log to txt # formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s') # if logname == None: logname = "log" # handler = logging.handlers.RotatingFileHandler(f"{logname}.log",maxBytes=1024*1024*10,encoding='utf-8') # handler.setLevel(log_level) # handler.setFormatter(formatter) # # log to console # console = logging.StreamHandler() # console.setLevel(logging.INFO) # # add # logger.addHandler(handler) # logger.addHandler(console) # else: log_level = logging.INFO logger.setLevel(log_level) # log to txt formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s') if logname == None: logname = "log" handler = logging.handlers.RotatingFileHandler(f"{logname}.log",maxBytes=1024*1024*10,encoding='utf-8') handler.setLevel(log_level) handler.setFormatter(formatter) # add logger.addHandler(handler) logger.info('开启日志记录') return logger def update_order(self, data): self.loop.create_task(self._update_order(data)) async def _update_order(self, data): ''' 更新订单 首先直接复写本地订单 1、如果是开仓单 如果新增: 增加本地订单 如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按已成交量发送平仓订单 修改本地仓位 如果成交: 删除本地订单 发送平仓订单 修改本地仓位 2、如果是平仓单 如果新增: 增加本地订单 如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按未成交量发送平仓订单 修改本地仓位 如果成交: 删除本地订单 修改本地仓位 NEW 可以从 ws / rest 来 REMOVE 主要从 ws 来 必须包含 filled 和 filled_price 用于本地仓位推算 定期rest查过旧订单 为了防止下单失败依然有订单成交 本地需要做一个缓存 ''' try: # 触发订单更新 self.trade_order_update_time = time.time() # 新增订单推送 仅需要cid oid信息 if data['status'] == 'NEW': # 更新oid信息 更新订单 loceltime信息(尤其是查单返回new的情况 必须更新 否则会误触发风控) if data['client_id'] in self.local_orders: self.local_orders[data['client_id']]["order_id"] = data['order_id'] self.local_orders[data['client_id']]["localtime"] = time.time() # 完成订单推送 仅需要cid filled filled_size信息 elif data['status'] == 'REMOVE': # 如果在撤单记录中 说明此订单结束生命周期 可以移除记录 if data["client_id"] in self.local_cancel_log: del(self.local_cancel_log[data["client_id"]]) # 在cid缓存队列中 说明是本策略的订单 if data["client_id"] in self.local_orders_backup: # 不在已处理cid缓存队列中 说明还没参与过仓位计算 则执行订单计算 if data['client_id'] not in self.handled_orders_cid: # 添加进已处理队列 self.handled_orders_cid.append(data["client_id"]) # 提取成交信息 方向 价格 量 filled = data["filled"] side = self.local_orders_backup[data['client_id']]["side"] if "filled_price" in data: if data["filled_price"] > 0.0: filled_price = data["filled_price"] else: filled_price = self.local_orders_backup[data['client_id']]["price"] else: filled_price = self.local_orders_backup[data['client_id']]["price"] # 只有开仓成交才触发onPosition # 如果漏推送 rest补充的订单查询信息过来 可能会导致 kd kk 推送出现计算分母为0的情况 if filled > 0: if "spot" in self.exchange:# 如果是现货交易 还需要修改equity ### 现货必须考虑fee 买入fee单位为币 卖出fee单位为u fee = data["fee"] ### 现货订单流仓位计算 if side == "kd": # buy self.local_buy_amount += filled - fee self.local_buy_value += (filled - fee) * filled_price new_long_pos = float(Decimal(str(self.local_position_by_orders.longPos)) + Decimal(str(filled)) - Decimal(str(fee))) if new_long_pos == 0.0: self.local_position_by_orders.longAvg = 0.0 self.local_position_by_orders.longPos = 0.0 else: self.local_position_by_orders.longAvg = \ (self.local_position_by_orders.longPos * self.local_position_by_orders.longAvg + filled * filled_price) / new_long_pos self.local_position_by_orders.longPos = new_long_pos self.local_cash -= filled * filled_price self.local_coin += filled - fee elif side == "pd": # sell self.local_sell_amount += filled self.local_sell_value += filled * filled_price self.local_profit += filled * (filled_price - self.local_position_by_orders.longAvg) new_long_pos = float(Decimal(str(self.local_position_by_orders.longPos)) - Decimal(str(filled))) if new_long_pos == 0.0: self.local_position_by_orders.longAvg = 0.0 self.local_position_by_orders.longPos = 0.0 else: self.local_position_by_orders.longPos = new_long_pos self.local_cash += filled * filled_price - fee self.local_coin -= filled elif side == "pk": # buy self.local_buy_amount += filled - fee self.local_buy_value += (filled - fee) * filled_price self.local_profit += filled * (self.local_position_by_orders.shortAvg - filled_price) new_short_pos = float(Decimal(str(self.local_position_by_orders.shortPos)) - Decimal(str(filled)) - Decimal(str(fee))) if new_short_pos == 0.0: self.local_position_by_orders.shortAvg = 0.0 self.local_position_by_orders.shortPos = 0.0 else: self.local_position_by_orders.shortPos = new_short_pos self.local_cash -= filled * filled_price self.local_coin += filled - fee elif side == "kk": # sell self.local_sell_amount += filled self.local_sell_value += filled * filled_price new_short_pos = float(Decimal(str(self.local_position_by_orders.shortPos)) + Decimal(str(filled))) if new_short_pos == 0.0: self.local_position_by_orders.shortAvg = 0.0 self.local_position_by_orders.shortPos = 0.0 else: self.local_position_by_orders.shortAvg = \ (self.local_position_by_orders.shortPos * self.local_position_by_orders.shortAvg + filled * filled_price) / new_short_pos self.local_position_by_orders.shortPos = new_short_pos self.local_cash += filled * filled_price - fee self.local_coin -= filled else: self.logger.error(f"错误的仓位方向{side}") else: ### 合约订单流仓位计算 if side == "kd": self.local_buy_amount += filled self.local_buy_value += filled * filled_price new_long_pos = (self.local_position_by_orders.longPos + filled) if new_long_pos == 0.0: self.local_position_by_orders.longAvg = 0 self.local_position_by_orders.longPos = 0 else: self.local_position_by_orders.longAvg = \ (self.local_position_by_orders.longPos * self.local_position_by_orders.longAvg + filled * filled_price) / new_long_pos self.local_position_by_orders.longPos = float(Decimal(str(self.local_position_by_orders.longPos)) + Decimal(str(filled))) elif side == "kk": self.local_sell_amount += filled self.local_sell_value += filled * filled_price new_short_pos = (self.local_position_by_orders.shortPos + filled) if new_short_pos == 0.0: self.local_position_by_orders.shortAvg = 0 self.local_position_by_orders.shortPos = 0 else: self.local_position_by_orders.shortAvg = \ (self.local_position_by_orders.shortPos * self.local_position_by_orders.shortAvg + filled * filled_price) / new_short_pos self.local_position_by_orders.shortPos = float(Decimal(str(self.local_position_by_orders.shortPos)) + Decimal(str(filled))) elif side == "pd": self.local_sell_amount += filled self.local_sell_value += filled * filled_price self.local_profit += filled * (filled_price - self.local_position_by_orders.longAvg) self.local_position_by_orders.longPos = float(Decimal(str(self.local_position_by_orders.longPos)) - Decimal(str(filled))) if self.local_position_by_orders.longPos == 0:self.local_position_by_orders.longAvg = 0 elif side == "pk": self.local_buy_amount += filled self.local_buy_value += filled * filled_price self.local_profit += filled * (self.local_position_by_orders.shortAvg-filled_price) self.local_position_by_orders.shortPos = float(Decimal(str(self.local_position_by_orders.shortPos)) - Decimal(str(filled))) if self.local_position_by_orders.shortPos == 0:self.local_position_by_orders.shortAvg = 0 else: self.logger.error(f"错误的仓位方向{side}") # 统计合约交易手续费 正fee为扣手续费 负fee为返佣 if 'fee' in data: self.local_profit -= data['fee'] self.logger.debug('更新推算仓位'+str(self.local_position_by_orders.__dict__)) ### self._print_local_trades_summary() # 每次有订单变动就触发一次策略 if self.mode_signal == 0 and self.ready: ### 更新交易数据 self.update_trade_msg() ### 触发策略挂单逻辑 # 更新策略时间 self.strategy.local_time = time.time() orders = self.strategy.onTime(self.tradeMsg) # if (('Limits_open' in orders and len(orders['Limits_open']) != 0) or # ('Limits_close' in orders and len(orders['Limits_close']) != 0)): # self.logger.info("--------------------------------update_local_order订单指令----------------------------") # self.logger.info(orders) # self.logger.info("-------------------------------------------end--------------------------------------") ### 记录指令触发信息 if self._not_empty(orders): self.logger.debug("触发onOrder") self._update_local_orders(orders) self.loop.create_task(self.rest.handle_signals(orders)) self.logger.debug(orders) else: self.logger.debug(f"订单已经参与过仓位计算 拒绝重复进行计算{data['client_id']}") else: self.logger.debug(f"订单不属于本策略 拒绝进行仓位计算{data['client_id']}") # 移除本地订单 if data["client_id"] in self.local_orders: self.logger.debug(['删除本地订单', data["client_id"]]) del(self.local_orders[data["client_id"]]) else: self.logger.debug(['该订单不在本地挂单表中', data["client_id"]]) else: print(data) self.logger.debug(f"未知的订单事件类型 {data}") except Exception as e: print("处理订单推送出错:"+str(e)) self.logger.error("处理订单推送出错:"+str(e)) self.logger.error(traceback.format_exc()) self.exit_msg="处理订单推送出错" self.stop() def _update_local_orders(self, orders): """ 本地记录所有报单信息 """ try: for i in orders: if "Limits" in i: for j in orders[i]: order_info = dict() order_info['symbol'] = self.symbol order_info['amount'] = float(j[0]) order_info['side'] = j[1] order_info['price'] = float(j[2]) order_info['client_id'] = j[3] order_info['filled_price'] = 0 order_info['filled'] = 0 order_info['order_id'] = "" order_info['localtime'] = self.strategy.local_time order_info['createtime'] = self.strategy.local_time self.local_orders[j[3]] = order_info # 本地挂单表 self.logger.debug(['新增本地订单', order_info]) self.local_orders_backup[j[3]] = order_info # 本地缓存表 self.local_orders_backup_cid.append(j[3]) # 本地缓存cid表 if 'Cancel' in i: # 记录撤单次数 cid = orders[i][0] if cid in self.local_cancel_log: self.local_cancel_log[cid] += 1 else: self.local_cancel_log[cid] = 0 # 清除过于久远的历史记录 if len(self.local_orders_backup_cid) > 9999: cid = self.local_orders_backup_cid[0] # 判断是否超过1个小时 如果超过则移除历史记录 if cid in self.local_orders_backup: if time.time() - self.local_orders_backup[cid]["localtime"] > 3600: del(self.local_orders_backup[cid]) del(self.local_orders_backup_cid[0]) if len(self.handled_orders_cid) > 9999: del(self.handled_orders_cid[0]) except: self.logger.error("本地记录订单信息出错") self.logger.error(traceback.format_exc()) self.exit_msg="本地记录订单信息出错" self.stop() def _not_empty(self, orders): '''检查指令是否不为空''' if isinstance(orders, dict): for order_name in orders: if "Cancel" in order_name or "Check" in order_name: return 1 elif "Limits_open" in order_name: if len(orders["Limits_open"]) > 0: return 1 elif "Limits_close" in order_name: if len(orders["Limits_close"]) > 0: return 1 return 0 def _print_local_trades_summary(self): '''计算本地累计利润''' ### local_buy_amount = round(self.local_buy_amount,5) local_buy_value = round(self.local_buy_value,5) local_sell_amount = round(self.local_sell_amount,5) local_sell_value = round(self.local_sell_value,5) local_profit = 0.0 if isinstance(self.strategy.mp, float): unrealized = (local_buy_amount - local_sell_amount) * self.strategy.mp realized = local_sell_value - local_buy_value local_profit = round(unrealized+realized,5) self.strategy.local_profit = local_profit ### msg = f"买量{local_buy_amount} 卖量{local_sell_amount} 买额{local_buy_value} 卖额{local_sell_value} 利润 {local_profit}" self.logger.info(msg) def update_position(self, data): ''' 更新仓位信息 ''' if data != self.local_position: self.local_position = data self.logger.debug('更新本地仓位'+str(self.local_position.__dict__)) """ 2023-2-22 用create_task去执行,会延迟,占用越大,延迟越大,可能会延迟100ms计算 """ def update_ticker(self, data): ''' 增加onticker撤单 可能会导致平仓难度加大 ''' self.loop.create_task(self._update_ticker(data)) def update_depth(self, data): self.loop.create_task(self._update_depth(data)) async def _update_ticker(self, data): ''' update ticker infomation ''' name = data['name'] # 记录tick更新时间 # self.market_update_time[name] = time.time() self.tickers[name] = data ### 判断是否需要触发ontick if name == self.ref_name[self.strategy.ref_index]: pass elif name == self.trade_name: pass else: pass # @utils.timeit async def _update_depth(self, data): ''' update orderbook infomation ''' name = data['name'] now_time = time.time() if self.market_update_time[name] == 0.0: pass else: interval = now_time - self.market_update_time[name] if self.market_update_interval[name] == 0.0: self.market_update_interval[name] = interval else: self.market_update_interval[name] = self.market_update_interval[name]*0.999 + interval*0.001 self.market_update_time[name] = now_time ### 初始化depths if self.depths[name] == list(): self.depths[name] = data['data'] ### 判断是否需要触发ondepth # 如果是交易盘口 if name == self.trade_name: ### 更新depths self.depths[name] = data['data'] # 允许交易 if self.mode_signal == 0 and self.ready: ### 聚合行情处理 self.on_agg_market() ### 判断是否为当前跟踪的盘口 elif name == self.ref_name[self.strategy.ref_index]: ### 判断是否需要触发ontick 对行情进行过滤 ### 过滤条件 价格变化很大 时间间隔很长 flag = 0 if abs(data['data'][utils.BP_INDEX] - self.depths[name][utils.BP_INDEX])/data['data'][utils.BP_INDEX] > 0.0002 or \ abs(data['data'][utils.AP_INDEX] - self.depths[name][utils.AP_INDEX])/data['data'][utils.AP_INDEX] > 0.0002 or \ time.time() - self.on_tick_event_time > 0.05: ### 允许交易 flag = 1 ### 更新ontick触发时间记录 self.on_tick_event_time = time.time() ### 更新depths self.depths[name] = data['data'] # 允许交易 if self.mode_signal == 0 and self.ready and flag: ### 更新交易数据 self.update_trade_msg() ### 触发事件撤单逻辑 # 更新策略时间 self.strategy.local_time = time.time() # 产生交易信号 orders = self.strategy.onTime(self.tradeMsg) # if (('Limits_open' in orders and len(orders['Limits_open']) != 0) or # ('Limits_close' in orders and len(orders['Limits_close']) != 0)): # self.logger.info("--------------------------------_update_depth订单指令--------------------------------") # self.logger.info(orders) # self.logger.info("-------------------------------------------end--------------------------------------") ### 记录指令触发信息 if self._not_empty(orders): self.logger.debug("触发onTick") self._update_local_orders(orders) self.loop.create_task(self.rest.handle_signals(orders)) self.logger.debug(orders) else: pass # @timeit async def real_time_back_test(self, data): ''' 按照长短期回测利润选择参数 优先按长期回测利润选参数 如果找不到就 再按短期回测利润选参数 如果还找不到就 使用默认参数 如果默认参数亏损就触发冷静期 ''' now_time = time.time() await asyncio.sleep(0.005) for i in self.backtest_tasks: i["backtest_engine"].backtest_time = now_time i["backtest_engine"].run_by_tick(data) def choose_params(self): ''' 按照长短期回测利润选择参数 优先按长期回测利润选参数 如果找不到就 再按短期回测利润选参数 如果还找不到就 使用默认参数 如果默认参数亏损就触发冷静期 ''' profits = [] for i in self.backtest_tasks: # 获取绩效信息 e = i["backtest_engine"].equity # 最终净值 # 计算标准化利润 p = (e-self.backtest_start_cash) / self.backtest_start_cash \ / self.backtest_look_length * self.tick_profit_to_daily # 有一定成交次数的回测结果才有代表性 持仓太久的参数禁止使用 _trade_num = i['backtest_engine'].trade_num _avg_hold_time = i['backtest_engine'].avg_hold_time _equity_high = i['backtest_engine'].equity_high # 排除交易次数太少的参数 if i['open'] <= 0.002: if _trade_num < 10: p = 0.0 # 排除长期持仓的参数 if _avg_hold_time > 600: p = 0.0 # 排除近期回撤较大的参数 if _equity_high > e*1.01: p = 0.0 profits.append(p) #利润 ############## 重置回测 # if _trade_num > 200: # i["backtest_engine"].trade_num = 0 # i["backtest_engine"].equity = self.backtest_start_cash # 盈利参数个数不能太少 防止孤岛参数 win_num = 0 for i in profits: if i > 0.0: win_num += 1 cond1 = win_num > self.backtest_num*0.1 cond2 = win_num > 2 cond_win = cond1 and cond2 if cond_win: # 按最优回测结果调整参数 max_profit = max(profits) max_index = profits.index(max_profit) self.strategy.trade_open_dist = self.backtest_tasks[max_index]["open"] self.strategy.trade_close_dist = self.backtest_tasks[max_index]["close"] self.strategy.ref_index = self.backtest_tasks[max_index]["index"] self.strategy.post_side = self.backtest_tasks[max_index]["side"] self.strategy.predict_alpha = self.backtest_tasks[max_index]["alpha"] # 检查是否需要关闭回测 # if self.strategy.ready == 1: # self.backtest = 0 else: # 如果没有符合条件的盈利参数 self.strategy.trade_open_dist = 0.01 self.strategy.trade_close_dist = 0.00001 self.strategy.ref_index = 0 self.strategy.post_side = 0 self.strategy.predict_alpha = 0 # 检查是否需要关闭回测 # if self.strategy.ready == 1: # self.backtest = 0 # self.exit_msg = "未找到合适参数 停机" # self.stop() return def update_equity(self, data): ''' 更新保证金信息 合约一直更新 现货只有当出现异常时更新 ''' if "spot" in self.exchange: pass else: self.local_cash = data[self.quote] * self.used_pct def update_exit(self, data): ''' 底层触发停机 ''' self.exit_msg = data self.stop() def get_all_market_data(self): ''' 只能定时触发 组合市场信息=交易盘口+参考盘口 ''' market = [] data = self.ws._get_data()["data"] market += data for i in self.ref_name: data = self.ws_ref[i]._get_data()["data"] market += data # handle save real market data if self.save: with open(f'./{self.csvname}.csv', 'a', newline='', encoding='utf-8') as f: writer = csv.writer(f, delimiter=',') writer.writerow(market) return market async def before_trade(self): ####### 启动ws ####### # 启动交易ws # 当开启回测时才订阅交易盘口的成交流 _sub_trade = int(self.params.backtest) _sub_fast = int(self.params.fast) # TODO 这是task1,要在这个交易所交易,pycharm定位有问题,这个ws不一定指向binance self.loop.create_task(self.ws.run(is_auth=1, sub_trade=_sub_trade, sub_fast=0)) # TODO 这是task n,用来做参考 for i in self.ref_name: # 启动参考ws 参考盘口使用fast行情性能消耗更大 使用普通行情可以节省性能 self.loop.create_task(self.ws_ref[i].run(is_auth=0, sub_trade=0, sub_fast=_sub_fast)) await asyncio.sleep(1) ###### 做交易前准备工作 ###### # 买入平台币 # TODO v9情况下买入平台币会怎么样? await self.rest.buy_token() await asyncio.sleep(1) # 清空挂单和仓位 await self.rest.check_position(hold_coin=self.hold_coin) await asyncio.sleep(1) # 获取市场信息 await self.rest.before_trade() await asyncio.sleep(1) # 获取价格信息 ticker = await self.rest.get_ticker() mp = ticker['mp'] # 获取账户信息 await asyncio.sleep(1) await self.rest.get_equity() # 初始资金 start_cash = self.rest.cash_value * self.used_pct start_coin = self.rest.coin_value * self.used_pct if start_cash == 0.0 and start_coin == 0.0: self.exit_msg = f"初始为零 cash: {start_cash} coin: {start_coin}" self.stop() self.logger.info(f"初始cash: {start_cash} 初始coin: {start_coin}") # 初始化策略基础信息 if isinstance(mp, float): if mp <= 0.0: self.exit_msg = f"初始价格获取错误 {mp}" self.stop() else: print(f"初始价格为 {mp}") else: self.exit_msg = f"初始价格获取错误 {mp}" self.stop() self.strategy.mp = mp self.strategy.start_cash = start_cash self.strategy.start_coin = start_coin self.strategy.start_equity = start_cash + start_coin * mp self.strategy.max_equity = self.strategy.start_equity self.strategy.equity = self.strategy.start_equity self.strategy.total_amount = self.strategy.equity * self.strategy.leverrate / self.strategy.mp self.strategy.stepSize = self.rest.stepSize if self.rest.stepSize < 1.0 else int(self.rest.stepSize) self.strategy.tickSize = self.rest.tickSize if self.rest.tickSize < 1.0 else int(self.rest.tickSize) if self.strategy.stepSize == None or self.strategy.tickSize == None: self.exit_msg = f"交易精度未正常获取 stepsize: {self.strategy.stepSize} ticksize: {self.strategy.tickSize}" self.stop() else: self.logger.info(f"数量精度{self.strategy.stepSize}") self.logger.info(f"价格精度{self.strategy.tickSize}") grid = float(self.params.grid) # 计算下单数量 if "spot" in self.exchange: long_one_hand_value = start_cash * float(self.params.leverrate) / grid short_one_hand_value = start_coin * mp * float(self.params.leverrate) / grid long_one_hand_amount = float(Decimal(str(long_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize))) short_one_hand_amount = float(Decimal(str(short_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize))) else: long_one_hand_value = start_cash * float(self.params.leverrate) / grid short_one_hand_value = start_cash * float(self.params.leverrate) / grid long_one_hand_amount = float(Decimal(str(long_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize))) short_one_hand_amount = float(Decimal(str(short_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize))) # 检查是否满足最低交易要求 print(f"最低单手交易下单量为 buy: {long_one_hand_amount} sell: {short_one_hand_amount}") if (long_one_hand_amount == 0 and short_one_hand_amount == 0) or (long_one_hand_value < 20 and short_one_hand_value < 20): self.exit_msg = f"初始下单量太少 buy: {long_one_hand_amount} sell: {short_one_hand_amount}" self.stop() # 初始化调度器 self.local_cash = start_cash self.local_coin = start_coin # 配置在线训练 if self.backtest: # 设置策略默认参数 self.strategy.trade_close_dist = 0.00001 self.strategy.trade_open_dist = 0.01 self.backtest_look_length = 86400 / self.interval # 回测区间足够长 self.backtest_tasks = list() self.tick_profit_to_daily = (86400/self.interval) self.backtest_start_cash = 1000000.0 # 备选参数 open_list, close_list, alpha_list = utils.get_backtest_set(self.base) if 'spot' in self.exchange: side_list = [] if long_one_hand_amount > 0: side_list.append(1) if short_one_hand_amount > 0: side_list.append(-1) if 1 in side_list and -1 in side_list: side_list.append(0) else: side_list = [-1,0,1] side_list_allow = [] for s in side_list: if s in utils.POST_SIDE_LIMIT: side_list_allow.append(s) side_list = side_list_allow for _open in open_list: for _side in side_list: for _close in close_list[_open]: for _index in range(self.ref_num): for _alpha in alpha_list: task = dict() st = strategy.Strategy(self.params, is_print=0) st.leverrate = 1.0 st.trade_open_dist = _open st.trade_close_dist = _close st.predict_alpha = _alpha st.ref_index = _index st.post_side = _side st.exchange = "dummy_usdt_swap" st.local_start_time = 0.0 bt = backtest.Backtest(st, is_plot=0) bt.start_cash = self.backtest_start_cash task["backtest_engine"] = bt task["open"] = _open task["close"] = _close task["index"] = _index task["side"] = _side task["alpha"] = _alpha self.backtest_tasks.append(task) backtest_num = len(self.backtest_tasks) self.backtest_num = backtest_num self.logger.info(f'在线模拟撮合数量{backtest_num}') self.logger.info(f'当前为在线训练模式 需预热{utils.BACKTEST_PREHOT_SECOND}秒 请耐心等候...') else: self.logger.info('当前为指定参数模式...') ###### 交易前准备就绪 可以开始交易 ###### self.loop.create_task(self.rest.go()) self.loop.create_task(self.on_timer()) self.loop.create_task(self._run_server()) self.loop.create_task(self.run_stratey()) #self.loop.create_task(self.post_loop()) #改 self.loop.create_task(self.early_stop_loop()) def update_trade_msg(self): # 更新保证金 self.tradeMsg.cash = round(self.local_cash,10) self.tradeMsg.coin = round(self.local_coin,10) # 使用本地推算仓位 self.tradeMsg.position = self.local_position_by_orders # 更新订单 self.tradeMsg.orders = self.local_orders ### 更新 ref ref_tickers = [] for i in self.ref_name: ref_tickers.append([self.tickers[i]['bp'], self.tickers[i]['ap']]) self.tradeMsg.ref_price = self.Predictor.Get_ref(ref_tickers) # logger.info('ref_price={}, market={}, predict={}'.format( # self.tradeMsg.ref_price, self.tradeMsg.market, self.tradeMsg.predict)) async def server_handle(self, request): '''中控数据接口''' if 'spot' in self.exchange: pos = self.local_position_by_orders.longPos - self.local_position_by_orders.shortPos else: pos = self.local_position.longPos - self.local_position.shortPos if pos > 0.0: entryPrice = self.local_position_by_orders.longAvg elif pos < 0.0: entryPrice = self.local_position_by_orders.shortAvg else: entryPrice = 0 return web.Response(body=json.dumps({ "now_balance": round(self.strategy.equity/self.used_pct, 4), #钱包余额 "unrealized_pn_l": round(self.local_profit, 4), #未实现盈利 "pos": round(pos, 8), #持仓数量 "entry_price": round(entryPrice, 8), #开仓价格 "now_price": round(self.strategy.mp, 8), #当前价格 })) async def change(self, request): '''中控台修改参数''' try: data = await request.json() if "stop" in data: self.logger.warning('中控停机') self.exit_msg = '中控停机' self.stop() return web.Response(text=f"停机成功") ip = request.remote print(f'从{ip}收到更新参数请求',data) if isinstance(data, str): data = json.loads(data) if self.backtest == 1: return web.Response(text="自动调参模式不允许手动修改参数") else: open = float(data['open']) close = float(data['close']) self.strategy.trade_open_dist = open self.strategy.trade_close_dist = close return web.Response(text=f"参数修改成功 {open} {close}") except Exception as e: return web.Response(text=f"参数修改失败 {e}") # @utils.timeit def check_risk(self): '''检查风控''' if self.strategy.start_cash == 0.0: print("请检查交易账户余额") return 0 if isinstance(self.strategy.mp, float): pass else: print("请检查最新价格") return 0 ############ # print("当前线程数",self.process.num_threads()) ###### 资源风控0 ###### cpu_pct = psutil.cpu_times_percent().user self.cpu_ema = self.cpu_ema * 0.8 + cpu_pct * 0.2 # print(f"cpu占用 {cpu_pct}") if self.cpu_ema > 95: msg = f"cpu占用过高 {self.cpu_ema} 准备停机" print(msg) self.logger.warning(msg) self.exit_msg = msg self.stop() mm_pct = psutil.virtual_memory().percent self.mm_ema = self.mm_ema * 0.8 + mm_pct * 0.2 # print(f"内存占用 {mm_pct}") if self.mm_ema > 95: msg = f"内存占用过高 {self.mm_ema} 准备停机" print(msg) self.logger.warning(msg) self.exit_msg = msg self.stop() ###### 回撤风控1 ###### if "spot" not in self.exchange: draw_back = 1-self.strategy.equity/self.strategy.max_equity if draw_back > self.stoploss: msg = f"{self.acct_name} 总资金吊灯回撤{draw_back} 当前{self.strategy.equity} 最高{self.strategy.max_equity} 触发止损 准备停机" print(msg) self.logger.warning(msg) self.exit_msg = msg self.stop() ###### 回撤风控2 ###### draw_back = self.local_profit/self.strategy.start_equity if draw_back < -self.stoploss: msg = f"{self.acct_name} 交易亏损 触发止损 准备停机" print(msg) self.logger.warning(msg) self.exit_msg = msg self.stop() ###### 报单延迟风控 ###### if self.rest.avg_delay > 5000: # 平均延迟允许上限 5000ms msg = f"{self.acct_name} 延迟爆表 触发风控 准备停机" print(msg) self.logger.warning(msg) self.exit_msg = msg self.stop() ###### 仓位异常风控 ###### ### 合约60秒更新一次绝对仓位 ### # 连续5分钟仓位不正确就停机 # 5 * 60 = 300 300/10 = 30 diff_pos = max(abs(self.local_position.longPos - self.local_position_by_orders.longPos),abs(self.local_position.shortPos - self.local_position_by_orders.shortPos)) if "spot" not in self.exchange: diff_pos_value = diff_pos * self.strategy.mp if diff_pos_value > self.strategy._min_amount_value: msg = f"{self.acct_name} ***发现仓位异常*** 推算{self.local_position_by_orders.__dict__} 本地{self.local_position.__dict__}" print(msg) self.logger.warning(msg) self.position_check_series.append(1) else: self.position_check_series.append(0) if len(self.position_check_series) > 30: del(self.position_check_series[0]) if sum(self.position_check_series) >= 30: msg = f"{self.acct_name} 合约连续检查本地仓位和推算仓位不相符 退出" print(msg) self.logger.warning(msg) self.exit_msg = msg self.stop() ###### 下单异常风控 ###### if self.strategy.total_amount == 0.0: msg = f"{self.acct_name} 开仓量为零 退出" print(msg) self.logger.warning(msg) self.exit_msg = msg self.stop() ###### 行情更新异常风控 ###### for name in self.ref_name: delay = round((time.time() - self.market_update_time[name]) * 1000, 3) if delay > utils.MARKET_DELAY_LIMIT: # thre msg = f"{self.acct_name} ticker_name:{name} delay:{delay}ms 行情更新延迟过高 退出" self.logger.error(msg) self.exit_msg = msg self.stop() for name in [self.trade_name]: delay = round((time.time() - self.market_update_time[name]) * 1000, 3) if delay > utils.MARKET_DELAY_LIMIT: # thre msg = f"{self.acct_name} ticker_name:{name} delay:{delay}ms 行情更新延迟过高 退出" self.logger.error(msg) self.exit_msg = msg self.stop() ###### 订单异常风控 ###### for cid in self.local_orders: if time.time() - self.local_orders[cid]["localtime"] > 300: # 订单长时间停留 怀疑漏单 但未必一定漏 5min msg = f"{self.acct_name} cid:{cid} 订单停留过久 怀疑异常 退出" self.logger.error(msg) self.exit_msg = msg self.stop() ###### 持仓均价异常风控 ###### if isinstance(self.strategy.long_pos_bias, float): # 偏离mp较大 且持仓较大 说明出现异常 if self.strategy.long_hold_value > 2*self.strategy._min_amount_value: if self.strategy.long_pos_bias > 4.0 or self.strategy.long_pos_bias < -2.0: msg = f"{self.acct_name} long_pos_bias:{self.strategy.long_pos_bias} 持仓均价异常 退出" self.logger.error(msg) self.exit_msg = msg self.stop() if isinstance(self.strategy.short_pos_bias, float): # 偏离mp较大 且持仓较大 说明出现出现异常 if self.strategy.short_hold_value > 2*self.strategy._min_amount_value: if self.strategy.short_pos_bias > 4.0 or self.strategy.short_pos_bias < -2.0: msg = f"{self.acct_name} short_pos_bias:{self.strategy.short_pos_bias} 持仓均价异常 退出" self.logger.error(msg) self.exit_msg = msg self.stop() ###### 订单撤单异常风控 ###### for cid in self.local_cancel_log: if self.local_cancel_log[cid] > 300: msg = f"{self.acct_name} 订单长时间无法撤销 退出" self.logger.error(msg) self.exit_msg = msg self.stop() ###### 定价异常风控 ###### if abs(self.strategy.ref_price-self.strategy.mp)/self.strategy.mp > 0.03: msg = f"{self.acct_name} 定价偏离过大 怀疑异常 退出" self.logger.error(msg) self.exit_msg = msg self.stop() async def exit(self, delay=0): '''退出操作''' try: self.logger.info(f"预约退出操作 delay:{delay}") if delay > 0: await asyncio.sleep(delay) self.logger.info(f"开始退出操作") self.logger.info("为避免api失效导致遗漏仓位 建议人工复查") await self.rest.check_position(hold_coin=self.hold_coin) # stop flag self.rest.stop_flag = 1 self.ws.stop_flag = 1 for i in self.ref_name: self.ws_ref[i].stop_flag = 1 # double check 需要延迟几秒以便等待更新数据 await asyncio.sleep(3) self.logger.info("双重检查遗漏仓位") await self.rest.check_position(hold_coin=self.hold_coin) self.logger.info(f'停机退出 停机原因 {self.exit_msg}') await asyncio.sleep(1) # 发送交易状态 await self._post_params() # 压缩行情文件 utils.csv_to_gz_and_remove() # close pid self.logger.info("退出进程") except: self.logger.error(traceback.format_exc()) finally: os._exit(0) async def on_timer(self): '''定期触发系统逻辑''' await asyncio.sleep(20) while 1: try: # 10秒检查一次风控 await asyncio.sleep(10) # 检查风控 self.check_risk() # stop if self.mode_signal == 1:return # 计算预估成交额 total_trade_value = self.local_buy_value + self.local_sell_value self.strategy.trade_vol_24h = round(total_trade_value / (time.time()-self.pid_start_time) * 86400 / 10000, 2) # 打印 if int(self.params.log): self.strategy._print_summary() # 打印行情延迟监控 self.logger.info('Rest 报单平均延迟 ' + str(self.rest.avg_delay) + 'ms ') self.logger.info('Rest 报单最高延迟 ' + str(self.rest.max_delay) + 'ms ') for name in self.market_update_interval: avg_interval = round(self.market_update_interval[name]*1e3, 2) self.logger.info(f'WS 盘口{name}行情 平均更新间隔 {avg_interval}ms') # 选择参数 if self.backtest: self.choose_params() except asyncio.CancelledError: print('定期循环任务取消') except: print("定时循环系统出错") self.logger.error(traceback.print_exc()) await asyncio.sleep(10) async def _post_params(self): '''推送交易信息''' profit = round(self.strategy.daily_return/self.strategy.leverrate,4) if time.time() - self.pid_start_time > utils.EARLY_STOP_SECOND * 0.5 or profit < 0.0: await utils._post_params( "http://wwww.khods.com:8888/post_params", self.params.proxy, ujson.dumps({ "pwd":"123456", "exchange":self.params.exchange, "pair":self.params.pair, "open":self.params.open, "close":self.params.close, "refexchange":self.params.refexchange[self.strategy.ref_index], "profit":profit, }) ) else: self.logger.info("不满足推送过滤条件 放弃推送参数") async def post_loop(self): '''定期触发交易信息推送''' await asyncio.sleep(30) _interval = 60 # 定期推送一次盈利情况 while 1: try: # 定期推送一次 await asyncio.sleep(_interval) # 发送交易状态 await self._post_params() except asyncio.CancelledError: print('post loop 循环任务取消') except: print("post loop 循环系统出错") self.logger.error(traceback.print_exc()) await asyncio.sleep(10) async def early_stop_loop(self): '''定期触发交易信息推送''' if self.father: self.logger.info(f'以父进程方式启动 关闭早停检测') return else: self.logger.info(f'以子进程方式启动 开启早停检测') await asyncio.sleep(30) _interval = utils.EARLY_STOP_SECOND _last_equity = self.strategy.start_equity _last_local_profit = 0.0 while 1: try: # 休眠 await asyncio.sleep(_interval) ###### 子进程早停风控 ###### self.logger.info(f'当前净值{self.strategy.equity} 上次检测时净值{_last_equity} 当前累积利润{self.local_profit} 上次检测时利润{_last_local_profit}') # 检查是否需要早停 没有成交 或者 亏损 if self.strategy.equity <= _last_equity or self.local_profit <= _last_local_profit: self.logger.info('触发早停条件 当零持仓时退出') # 没有持仓 for _ in range(30): await asyncio.sleep(5) if self.strategy.long_hold_value < self.strategy._min_amount_value and \ self.strategy.short_hold_value < self.strategy._min_amount_value: msg = f"{self.acct_name} 子进程盈利状况不理想 提前停机 退出" self.logger.error(msg) self.exit_msg = msg self.stop() # 更新上一次检测的净值 _last_equity = self.strategy.equity _last_local_profit = self.local_profit except asyncio.CancelledError: print('early stop 循环任务取消') except: print("early stop 循环系统出错") self.logger.error(traceback.print_exc()) await asyncio.sleep(10) def on_agg_market(self): ''' 处理聚合行情 1. 获取聚合行情 2. 更新预测器 3. 触发tick回测 ''' ### 更新聚合市场数据 agg_market = self.get_all_market_data() ### 更新聚合市场信息 self.tradeMsg.market = agg_market ### 更新预测器 self.Predictor.onTime(agg_market) ### 触发回测 if self.backtest: self.loop.create_task(self.real_time_back_test(self.tradeMsg)) async def run_stratey(self): ''' 定期触发策略 ''' print('定时触发器启动') # 准备交易 try: print('前期准备完成') await asyncio.sleep(10) while 1: try: # 时间预设 start_time = time.time() ### 是否准备充分 if self.ready: ### 更新交易信息集合 self.update_trade_msg() ### 触发策略 if self.mode_signal == 0: pass # # 更新策略时间 # self.strategy.local_time = time.time() # # 产生信号 # orders = self.strategy.onTime(self.tradeMsg) # ### 记录指令触发信息 # if self._not_empty(orders): # self.logger.debug("触发onTime") # self._update_local_orders(orders) # self.loop.create_task(self.rest.handle_signals(orders)) # self.logger.debug(orders) else: if self.mode_signal > 1:self.mode_signal -= 1 if self.mode_signal == 1:return # 触发策略 # 更新策略时间 self.strategy.local_time = time.time() # 获取信号 # TODO mode_signal∈[21, +无穷) 表示什么? if self.mode_signal > 20: # 先执行onExit orders = self.strategy.onExit(self.tradeMsg) ### 记录指令触发信息 if self._not_empty(orders): self.logger.debug("触发onExit") self._update_local_orders(orders) self.loop.create_task(self.rest.handle_signals(orders)) self.logger.debug(orders) # TODO mode_signal∈[2, 20] 表示什么? else: # 再执行onSleep orders = self.strategy.onSleep(self.tradeMsg) ### 记录指令触发信息 if self._not_empty(orders): self.logger.debug("触发onSleep") self._update_local_orders(orders) self.loop.create_task(self.rest.handle_signals(orders)) self.logger.debug(orders) ############################################################ else: self.check_ready() ### 计算耗时并进行休眠 pass_time = time.time()-start_time await asyncio.sleep(utils.clip(self.interval-pass_time, 0.0, 1.0)) except asyncio.CancelledError: print('策略触发任务取消') except: self.logger.error(traceback.format_exc()) traceback.print_exc() await asyncio.sleep(10) except asyncio.CancelledError: print('策略触发任务取消') except: self.logger.error(traceback.format_exc()) traceback.print_exc() await asyncio.sleep(10) def check_ready(self): ''' 判断初始数据是否齐全 ''' ### 检查 ticker 行情 for i in self.ref_name: if i not in self.tickers or self.tickers[i] == {}: print("参考盘口ticker未准备好") return else: if self.tickers[i]['bp'] == 0 or self.tickers[i]['ap'] == 0: print("参考盘口ticker未准备好") return if self.trade_name not in self.tickers or self.tickers[self.trade_name] == {}: print("交易盘口ticker未准备好") return else: if self.tickers[self.trade_name]['bp'] == 0 or self.tickers[self.trade_name]['ap'] == 0: print("交易盘口ticker未准备好") return ### 检查 market 行情 all_market = self.get_all_market_data() if len(all_market) != utils.LEN*(1+self.ref_num): print("聚合行情未准备好") return else: # 如果行情已经就绪 预热trademsg和predictor print("聚合行情准备就绪") self.tradeMsg.market = all_market self.Predictor.onTime(all_market) self.ready = 1 def stop(self): ''' 停机函数 mode_signal 不能小于80 前6秒用于maker平仓 后2秒用于撤maker平仓单 休眠2秒再执行check_position 避免卡单导致漏仓位 ''' self.logger.info(f'进入停机流程...') self.mode_signal = 80 # 等strategy onExit 彻底执行完毕 进入沉默状态之后 再进入exit 否则可能导致多处同时操作订单 # 尽量减少大仓位直接take平 self.loop.create_task(self.exit(delay=10)) async def _run_server(self): print('server正在启动...') for _ in range(30): await asyncio.sleep(5) if self.strategy.equity > 0.0:break app = web.Application() app.router.add_route('GET', '/account', self.server_handle) app.router.add_route('POST', '/change', self.change) try: self.loop.create_task(web._run_app(app, host='0.0.0.0', port=self.params.server_port, handle_signals=False)) except: self.logger.error(f"Server启动失败") self.logger.error(traceback.format_exc()) self.exit_msg = "服务启动失败 停机退出" self.stop() def run(self): '''启动ws行情获取''' def keyboard_interrupt(s, f): self.logger.info("收到退出信号 准备关机") self.stop() try: signal.signal(signal.SIGINT, keyboard_interrupt) signal.signal(signal.SIGTERM, keyboard_interrupt) if 'win' not in sys.platform: signal.signal(signal.SIGKILL, keyboard_interrupt) signal.signal(signal.SIGQUIT, keyboard_interrupt) except: pass self.loop.create_task(self.before_trade()) # TODO 启动方式干嘛用的?为什么要判断? print(f'判断启动方式...') if self.father: print('以父进程方式启动 最大允许运行时间为30天') self.loop.create_task(self.exit(delay=60*60*24*30)) else: print('以子进程方式启动 最大允许运行时间为60分钟') self.loop.create_task(self.exit(delay=utils.CHILD_RUN_SECOND)) self.loop.run_forever() if __name__ == "__main__": if 0: utils.check_auth() if 0: utils.check_time() pnum = len(sys.argv) if pnum > 0: fname = None log_file = None pidnum = None father = 1 for i in range(pnum): print(f"第{i}个参数为:{sys.argv[i]}") if sys.argv[i] == '-c' or sys.argv[i] == '--c': fname = sys.argv[i+1] elif sys.argv[i] == '-h': print("帮助文档") elif sys.argv[i] == '-log_file' or sys.argv[i] == '--log_file': log_file = sys.argv[i+1] elif sys.argv[i] == '-num' or sys.argv[i] == '--num': pidnum = sys.argv[i+1] elif sys.argv[i] == '-v' or sys.argv[i] == '--v': print(f"当前版本为 V{VERSION}") elif sys.argv[i] == '-child' or sys.argv[i] == '--child': father = 0 print(f"当前以子进程方式启动") if fname and log_file and pidnum: print(f"指定的配置为 fname:{fname} log_file:{log_file} pidnum:{pidnum} father:{father}") date = time.strftime("%Y%m%d", time.localtime()) logname = f"{log_file}-{date}" quant = Quant(utils.get_params(fname), logname, father) quant.run() elif fname: print(f"运行指定配置文件{fname}") quant = Quant(utils.get_params(fname),father=father) quant.run() else: print("缺少指定参数 运行默认配置文件") fname = 'config.toml' quant = Quant(utils.get_params(fname),father=father) quant.run() else: fname = 'config.toml' quant = Quant(utils.get_params(fname)) quant.run()