import asyncio from aiohttp import web import traceback import time, csv import strategy as strategy import utils import model import logging, logging.handlers import signal import os, json, sys import predictor_new import backtest import random import psutil import ujson import broker from decimal import Decimal VERSION = utils.VERSION def timeit(func): def wrapper(*args, **kwargs): nowTime = time.time() res = func(*args, **kwargs) spend_time = time.time() - nowTime spend_time = round(spend_time * 1000, 5) print(f'{func.__name__} 耗时 {spend_time} ms') return res return wrapper class Quant: def __init__(self, params:model.Config, logname="test_logname", father=1): print('############### 超级无敌韭菜收割机 ################') print(f'>>> 版本 {VERSION} <<<') print('*** 当前配置') self.params = params for p in self.params.__dict__: print('***', p, ' => ', getattr(self.params, p)) print('##################################################') self.logger = self.get_logger(logname) self.csvname = logname + ' ' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) pid = os.getpid() self.pid_start_time = time.time() self.logger.info(f"进程号{pid} 启动时间{self.pid_start_time}") ##### 绑定cpu cpu_count = psutil.cpu_count() print("检测cpu核心负载") cpu_used_pct = [0.0 for _ in range(cpu_count)] for _ in range(random.randint(5,15)): r = psutil.cpu_times_percent(percpu=True) for i in range(cpu_count): cpu_used_pct[i] += int(r[i].user) time.sleep(0.1) print(cpu_used_pct) cpu_id = cpu_used_pct.index(min(cpu_used_pct)) print(f"当前负载最低的cpu为:{cpu_id}") self.process = psutil.Process(pid) print(f"核心数{cpu_count} 目标绑定cpu:{cpu_id}") os.system(f"taskset -cp {cpu_id} {pid}") print("调整系统调度优先级为最高等级") if 'win' not in sys.platform: print(os.nice(-20)) #### cpu 内存 平均占用 self.cpu_ema = 0.0 self.mm_ema = 0.0 ##### self.acct_name = self.params.account_name self.symbol = self.params.pair self.base = self.params.pair.split('_')[0].upper() self.quote = self.params.pair.split('_')[1].upper() if 1: ### 使用uvloop if 'win' not in sys.platform: print('采用高速事件循环库') import uvloop self.loop = uvloop.new_event_loop() else: print('采用普通事件循环库') self.loop = asyncio.get_event_loop() else: ### 使用原生loop self.loop = asyncio.get_event_loop() self.strategy = strategy.Strategy(self.params, is_print=1) ###### 判断启动方式 self.father = father print(f"父进程标识 {self.father}") ##### 现货底仓 hold_coin = float(self.params.hold_coin) self.hold_coin = utils.clip(hold_coin, 0.0, 10000.0) ##### 本地状态量 self.data = dict() self.total_equity = 0.0 self.local_orders = dict() # 本地挂单表 self.local_orders_backup = dict() # 本地订单缓存队列 self.local_orders_backup_cid = [] # 本地订单缓存cid队列 self.handled_orders_cid = [] # 本地已处理cid缓存队列 self.local_profit = 0.0 self.local_cash = 0.0 # 本地U保证金 self.local_coin = 0.0 # 本地币保证金 self.local_position = model.Position() self.local_position_by_orders = model.Position() self.local_buy_amount = 0.0 self.local_sell_amount = 0.0 self.local_buy_value = 0.0 self.local_sell_value = 0.0 self.local_cancel_log = dict() self.interval = float(self.params.interval) self.exchange = self.params.exchange self.tradeMsg = model.TraderMsg() self.exit_msg = "正常退出" self.save = int(self.params.save) # 保存行情数据 self.logger.info(f"实时行情数据记录开关:{self.save}") # 仓位检查结果序列 self.position_check_series = [] # 止损大小 self.stoploss = float(self.params.stoploss) # 资金使用率 self.used_pct = float(self.params.used_pct) #使用资金比例 # 启停信号 0 表示运行 大于1开始倒计时 1时停机 self.mode_signal = 0 # 交易盘口订单流更新时间 self.trade_order_update_time = time.time() # onTick触发时间记录 self.on_tick_event_time = time.time() # 盘口ticker depth信息 self.tickers = dict() self.depths = dict() # 行情更新延迟监控 self.market_update_time = dict() self.market_update_interval = dict() # 参考盘口名称 refex = self.params.refexchange refpair = self.params.refpair if len(refex) != len(refpair): self.logger.error("参考盘口数不等于参考品种数 退出") raise Exception("参考盘口数不等于参考品种数 退出") self.ref_num = len(refex) self.ref_name = [] for i in range(self.ref_num): if refex[i] not in broker.exchange_lists: self.logger.error("出现不支持的参考盘口") raise Exception("出现不支持的参考盘口") name = refex[i] + '@' + refpair[i] + '@ref' self.ref_name.append(name) self.tickers[name] = dict() self.depths[name] = list() self.market_update_time[name] = 0.0 self.market_update_interval[name] = 0.0 # 参考盘口tick更新时间 # 服务器私有ip地址检查 ipList = utils.get_local_ip_list() ipListNum = len(ipList) # if int(self.params.ip) >= ipListNum: # raise Exception("指定私有ip地址序号不存在") # 用于创建ws实例 name = self.exchange+'@'+self.params.pair self.trade_name = name self.market_update_time[name] = 0.0 self.market_update_interval[name] = 0.0 self.tickers[name] = dict() self.depths[name] = list() cp = model.ClientParams() cp.name = self.trade_name cp.pair = self.params.pair cp.access_key = self.params.access_key cp.secret_key = self.params.secret_key cp.pass_key = self.params.pass_key cp.interval = self.params.interval cp.broker_id = self.params.broker_id cp.debug = self.params.debug cp.proxy = self.params.proxy cp.ip = int(self.params.ip) self.ws = broker.newWs(self.exchange)( params=cp, colo=int(self.params.colo), is_print=0, ) self.ws.logger = self.logger self.ready = 0 # rest实例 self.rest = broker.newRest(self.exchange)(cp, colo=int(self.params.colo)) self.ws_ref = dict() # 参考盘口 ws 实例 for i in range(self.ref_num): cp = model.ClientParams() cp.name = self.ref_name[i] cp.pair = self.params.refpair[i] cp.proxy = self.params.proxy cp.interval = self.params.interval cp.ip = int(self.params.ip) exchange = self.params.refexchange[i] if exchange not in broker.exchange_lists: self.logger.error("参考盘口名称错误 退出") return _colo = 0 if self.params.refexchange[i] == self.params.exchange and \ self.params.refpair[i] == self.params.pair and int(self.params.colo): _colo = 1 self.ws_ref[self.ref_name[i]] = broker.newWs(exchange)(cp, colo=_colo) self.ws_ref[self.ref_name[i]].callback['onTicker']=self.update_ticker self.ws_ref[self.ref_name[i]].callback['onDepth']=self.update_depth self.ws_ref[self.ref_name[i]].callback['onTrade'] = self.update_trade self.ws_ref[self.ref_name[i]].logger = self.logger # 添加回调 self.ws.callback = { 'onTicker':self.update_ticker, 'onDepth':self.update_depth, 'onPosition':self.update_position, 'onEquity':self.update_equity, 'onOrder':self.update_order, 'onTrade':self.update_trade, 'onExit':self.update_exit, } self.rest.callback = { 'onTicker':self.update_ticker, 'onDepth':self.update_depth, 'onPosition':self.update_position, 'onEquity':self.update_equity, 'onOrder':self.update_order, 'onTrade': self.update_trade, 'onExit':self.update_exit, } self.rest.logger = self.logger # 配置策略 self.strategy.logger = self.logger # 配置定价模型 price_alpha = [] for i in self.params.refpair: # 交易1000shib 参考 shib if '1000' in self.params.pair and '1000' not in i: price_alpha.append(1000.0) # 交易shib 参考 1000shib elif '1000' not in self.params.pair and '1000' in i: price_alpha.append(0.001) else: # 交易shib 参考 shib price_alpha.append(1.0) self.logger.info(f'价格系数{price_alpha}') self.Predictor = predictor_new.PredictorNew(ref_name=self.ref_name, alpha=price_alpha, gamma=float(self.params.gamma)) # 初始化参数 self.strategy.trade_open_dist = float(self.params.open) self.strategy.trade_close_dist = float(self.params.close) # 在线训练 self.backtest = int(self.params.backtest) self.logger.info(f'在线训练开关 {self.backtest}') #### time.sleep(3) def get_logger(self, logname): '''日志模块''' logger = logging.getLogger(__name__) # # log flag # if int(self.params.log): # log_level = logging.DEBUG # logger.setLevel(log_level) # # log to txt # formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s') # if logname == None: logname = "log" # handler = logging.handlers.RotatingFileHandler(f"{logname}.log",maxBytes=1024*1024*10,encoding='utf-8') # handler.setLevel(log_level) # handler.setFormatter(formatter) # # log to console # console = logging.StreamHandler() # console.setLevel(logging.INFO) # # add # logger.addHandler(handler) # logger.addHandler(console) # else: log_level = logging.INFO logger.setLevel(log_level) # log to txt formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s') if logname == None: logname = "log" handler = logging.handlers.RotatingFileHandler(f"{logname}.log",maxBytes=1024*1024*10,encoding='utf-8') handler.setLevel(log_level) handler.setFormatter(formatter) # add logger.addHandler(handler) logger.info('开启日志记录') return logger def update_order(self, data): self.loop.create_task(self._update_order(data)) async def _update_order(self, data): ''' 更新订单 首先直接复写本地订单 1、如果是开仓单 如果新增: 增加本地订单 如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按已成交量发送平仓订单 修改本地仓位 如果成交: 删除本地订单 发送平仓订单 修改本地仓位 2、如果是平仓单 如果新增: 增加本地订单 如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按未成交量发送平仓订单 修改本地仓位 如果成交: 删除本地订单 修改本地仓位 NEW 可以从 ws / rest 来 REMOVE 主要从 ws 来 必须包含 filled 和 filled_price 用于本地仓位推算 定期rest查过旧订单 为了防止下单失败依然有订单成交 本地需要做一个缓存 ''' try: # 触发订单更新 self.trade_order_update_time = time.time() # 新增订单推送 仅需要cid oid信息 if data['status'] == 'NEW': # 更新oid信息 更新订单 loceltime信息(尤其是查单返回new的情况 必须更新 否则会误触发风控) if data['client_id'] in self.local_orders: self.local_orders[data['client_id']]["order_id"] = data['order_id'] self.local_orders[data['client_id']]["localtime"] = time.time() # 完成订单推送 仅需要cid filled filled_size信息 elif data['status'] == 'REMOVE': # 如果在撤单记录中 说明此订单结束生命周期 可以移除记录 if data["client_id"] in self.local_cancel_log: del(self.local_cancel_log[data["client_id"]]) # 在cid缓存队列中 说明是本策略的订单 if data["client_id"] in self.local_orders_backup: # 不在已处理cid缓存队列中 说明还没参与过仓位计算 则执行订单计算 if data['client_id'] not in self.handled_orders_cid: # 添加进已处理队列 self.handled_orders_cid.append(data["client_id"]) # 提取成交信息 方向 价格 量 filled = data["filled"] side = self.local_orders_backup[data['client_id']]["side"] if "filled_price" in data: if data["filled_price"] > 0.0: filled_price = data["filled_price"] else: filled_price = self.local_orders_backup[data['client_id']]["price"] else: filled_price = self.local_orders_backup[data['client_id']]["price"] # 只有开仓成交才触发onPosition # 如果漏推送 rest补充的订单查询信息过来 可能会导致 kd kk 推送出现计算分母为0的情况 if filled > 0: if "spot" in self.exchange:# 如果是现货交易 还需要修改equity ### 现货必须考虑fee 买入fee单位为币 卖出fee单位为u fee = data["fee"] ### 现货订单流仓位计算 if side == "kd": # buy self.local_buy_amount += filled - fee self.local_buy_value += (filled - fee) * filled_price new_long_pos = float(Decimal(str(self.local_position_by_orders.longPos)) + Decimal(str(filled)) - Decimal(str(fee))) if new_long_pos == 0.0: self.local_position_by_orders.longAvg = 0.0 self.local_position_by_orders.longPos = 0.0 else: self.local_position_by_orders.longAvg = \ (self.local_position_by_orders.longPos * self.local_position_by_orders.longAvg + filled * filled_price) / new_long_pos self.local_position_by_orders.longPos = new_long_pos self.local_cash -= filled * filled_price self.local_coin += filled - fee elif side == "pd": # sell self.local_sell_amount += filled self.local_sell_value += filled * filled_price self.local_profit += filled * (filled_price - self.local_position_by_orders.longAvg) new_long_pos = float(Decimal(str(self.local_position_by_orders.longPos)) - Decimal(str(filled))) if new_long_pos == 0.0: self.local_position_by_orders.longAvg = 0.0 self.local_position_by_orders.longPos = 0.0 else: self.local_position_by_orders.longPos = new_long_pos self.local_cash += filled * filled_price - fee self.local_coin -= filled elif side == "pk": # buy self.local_buy_amount += filled - fee self.local_buy_value += (filled - fee) * filled_price self.local_profit += filled * (self.local_position_by_orders.shortAvg - filled_price) new_short_pos = float(Decimal(str(self.local_position_by_orders.shortPos)) - Decimal(str(filled)) - Decimal(str(fee))) if new_short_pos == 0.0: self.local_position_by_orders.shortAvg = 0.0 self.local_position_by_orders.shortPos = 0.0 else: self.local_position_by_orders.shortPos = new_short_pos self.local_cash -= filled * filled_price self.local_coin += filled - fee elif side == "kk": # sell self.local_sell_amount += filled self.local_sell_value += filled * filled_price new_short_pos = float(Decimal(str(self.local_position_by_orders.shortPos)) + Decimal(str(filled))) if new_short_pos == 0.0: self.local_position_by_orders.shortAvg = 0.0 self.local_position_by_orders.shortPos = 0.0 else: self.local_position_by_orders.shortAvg = \ (self.local_position_by_orders.shortPos * self.local_position_by_orders.shortAvg + filled * filled_price) / new_short_pos self.local_position_by_orders.shortPos = new_short_pos self.local_cash += filled * filled_price - fee self.local_coin -= filled else: self.logger.error(f"错误的仓位方向{side}") else: ### 合约订单流仓位计算 if side == "kd": self.local_buy_amount += filled self.local_buy_value += filled * filled_price new_long_pos = (self.local_position_by_orders.longPos + filled) if new_long_pos == 0.0: self.local_position_by_orders.longAvg = 0 self.local_position_by_orders.longPos = 0 else: self.local_position_by_orders.longAvg = \ (self.local_position_by_orders.longPos * self.local_position_by_orders.longAvg + filled * filled_price) / new_long_pos self.local_position_by_orders.longPos = float(Decimal(str(self.local_position_by_orders.longPos)) + Decimal(str(filled))) elif side == "kk": self.local_sell_amount += filled self.local_sell_value += filled * filled_price new_short_pos = (self.local_position_by_orders.shortPos + filled) if new_short_pos == 0.0: self.local_position_by_orders.shortAvg = 0 self.local_position_by_orders.shortPos = 0 else: self.local_position_by_orders.shortAvg = \ (self.local_position_by_orders.shortPos * self.local_position_by_orders.shortAvg + filled * filled_price) / new_short_pos self.local_position_by_orders.shortPos = float(Decimal(str(self.local_position_by_orders.shortPos)) + Decimal(str(filled))) elif side == "pd": self.local_sell_amount += filled self.local_sell_value += filled * filled_price self.local_profit += filled * (filled_price - self.local_position_by_orders.longAvg) self.local_position_by_orders.longPos = float(Decimal(str(self.local_position_by_orders.longPos)) - Decimal(str(filled))) if self.local_position_by_orders.longPos == 0:self.local_position_by_orders.longAvg = 0 elif side == "pk": self.local_buy_amount += filled self.local_buy_value += filled * filled_price self.local_profit += filled * (self.local_position_by_orders.shortAvg-filled_price) self.local_position_by_orders.shortPos = float(Decimal(str(self.local_position_by_orders.shortPos)) - Decimal(str(filled))) if self.local_position_by_orders.shortPos == 0:self.local_position_by_orders.shortAvg = 0 else: self.logger.error(f"错误的仓位方向{side}") # 统计合约交易手续费 正fee为扣手续费 负fee为返佣 if 'fee' in data: self.local_profit -= data['fee'] self.logger.debug('更新推算仓位'+str(self.local_position_by_orders.__dict__)) ### self._print_local_trades_summary() # 每次有订单变动就触发一次策略 if self.mode_signal == 0 and self.ready: ### 更新交易数据 self.update_trade_msg() ### 触发策略挂单逻辑 # 更新策略时间 self.strategy.local_time = time.time() orders = self.strategy.onTime(self.tradeMsg) # if (('Limits_open' in orders and len(orders['Limits_open']) != 0) or # ('Limits_close' in orders and len(orders['Limits_close']) != 0)): # self.logger.info("--------------------------------update_local_order订单指令----------------------------") # self.logger.info(orders) # self.logger.info("-------------------------------------------end--------------------------------------") ### 记录指令触发信息 if self._not_empty(orders): self.logger.debug("触发onOrder") self._update_local_orders(orders) self.loop.create_task(self.rest.handle_signals(orders)) self.logger.debug(orders) else: self.logger.debug(f"订单已经参与过仓位计算 拒绝重复进行计算{data['client_id']}") else: self.logger.debug(f"订单不属于本策略 拒绝进行仓位计算{data['client_id']}") # 移除本地订单 if data["client_id"] in self.local_orders: self.logger.debug(['删除本地订单', data["client_id"]]) del(self.local_orders[data["client_id"]]) else: self.logger.debug(['该订单不在本地挂单表中', data["client_id"]]) else: print(data) self.logger.debug(f"未知的订单事件类型 {data}") except Exception as e: print("处理订单推送出错:"+str(e)) self.logger.error("处理订单推送出错:"+str(e)) self.logger.error(traceback.format_exc()) self.exit_msg="处理订单推送出错" self.stop() def _update_local_orders(self, orders): """ 本地记录所有报单信息 """ try: for i in orders: if "Limits" in i: for j in orders[i]: order_info = dict() order_info['symbol'] = self.symbol order_info['amount'] = float(j[0]) order_info['side'] = j[1] order_info['price'] = float(j[2]) order_info['client_id'] = j[3] order_info['filled_price'] = 0 order_info['filled'] = 0 order_info['order_id'] = "" order_info['localtime'] = self.strategy.local_time order_info['createtime'] = self.strategy.local_time self.local_orders[j[3]] = order_info # 本地挂单表 self.logger.debug(['新增本地订单', order_info]) self.local_orders_backup[j[3]] = order_info # 本地缓存表 self.local_orders_backup_cid.append(j[3]) # 本地缓存cid表 if 'Cancel' in i: # 记录撤单次数 cid = orders[i][0] if cid in self.local_cancel_log: self.local_cancel_log[cid] += 1 else: self.local_cancel_log[cid] = 0 # 清除过于久远的历史记录 if len(self.local_orders_backup_cid) > 9999: cid = self.local_orders_backup_cid[0] # 判断是否超过1个小时 如果超过则移除历史记录 if cid in self.local_orders_backup: if time.time() - self.local_orders_backup[cid]["localtime"] > 3600: del(self.local_orders_backup[cid]) del(self.local_orders_backup_cid[0]) if len(self.handled_orders_cid) > 9999: del(self.handled_orders_cid[0]) except: self.logger.error("本地记录订单信息出错") self.logger.error(traceback.format_exc()) self.exit_msg="本地记录订单信息出错" self.stop() def _not_empty(self, orders): '''检查指令是否不为空''' if isinstance(orders, dict): for order_name in orders: if "Cancel" in order_name or "Check" in order_name: return 1 elif "Limits_open" in order_name: if len(orders["Limits_open"]) > 0: return 1 elif "Limits_close" in order_name: if len(orders["Limits_close"]) > 0: return 1 return 0 def _print_local_trades_summary(self): '''计算本地累计利润''' ### local_buy_amount = round(self.local_buy_amount,5) local_buy_value = round(self.local_buy_value,5) local_sell_amount = round(self.local_sell_amount,5) local_sell_value = round(self.local_sell_value,5) local_profit = 0.0 if isinstance(self.strategy.mp, float): unrealized = (local_buy_amount - local_sell_amount) * self.strategy.mp realized = local_sell_value - local_buy_value local_profit = round(unrealized+realized,5) self.strategy.local_profit = local_profit ### msg = f"买量{local_buy_amount} 卖量{local_sell_amount} 买额{local_buy_value} 卖额{local_sell_value} 利润 {local_profit}" self.logger.info(msg) def update_position(self, data): ''' 更新仓位信息 ''' if data != self.local_position: self.local_position = data self.logger.debug('更新本地仓位'+str(self.local_position.__dict__)) """ 2023-2-22 用create_task去执行,会延迟,占用越大,延迟越大,可能会延迟100ms计算 """ def update_ticker(self, data): ''' 增加onticker撤单 可能会导致平仓难度加大 ''' self.loop.create_task(self._update_ticker(data)) def update_depth(self, data): self.loop.create_task(self._update_depth(data)) async def _update_ticker(self, data): ''' update ticker infomation ''' name = data['name'] if name == self.ref_name[0]: min_price = (data['bp'] + data['ap']) * 0.5 self.Predictor.market_update(min_price, data['time']) # 记录tick更新时间 # self.market_update_time[name] = time.time() self.tickers[name] = data ### 判断是否需要触发ontick if name == self.ref_name[self.strategy.ref_index]: pass elif name == self.trade_name: pass else: pass def update_trade(self, data): self.loop.create_task(self._update_trade(data)) async def _update_trade(self, data): print(f"trade数据:{data}") self.Predictor.trading_intensity.c_register_trade(data) # @utils.timeit async def _update_depth(self, data): ''' update orderbook infomation ''' name = data['name'] now_time = time.time() if self.market_update_time[name] == 0.0: pass else: interval = now_time - self.market_update_time[name] if self.market_update_interval[name] == 0.0: self.market_update_interval[name] = interval else: self.market_update_interval[name] = self.market_update_interval[name]*0.999 + interval*0.001 self.market_update_time[name] = now_time ### 初始化depths if self.depths[name] == list(): self.depths[name] = data['data'] ### 判断是否需要触发ondepth # 如果是交易盘口 if name == self.trade_name: ### 更新depths self.depths[name] = data['data'] # 允许交易 if self.mode_signal == 0 and self.ready: ### 聚合行情处理 self.on_agg_market() ### 判断是否为当前跟踪的盘口 elif name == self.ref_name[self.strategy.ref_index]: ### 判断是否需要触发ontick 对行情进行过滤 ### 过滤条件 价格变化很大 时间间隔很长 flag = 0 if abs(data['data'][utils.BP_INDEX] - self.depths[name][utils.BP_INDEX])/data['data'][utils.BP_INDEX] > 0.0002 or \ abs(data['data'][utils.AP_INDEX] - self.depths[name][utils.AP_INDEX])/data['data'][utils.AP_INDEX] > 0.0002 or \ time.time() - self.on_tick_event_time > 0.05: ### 允许交易 flag = 1 ### 更新ontick触发时间记录 self.on_tick_event_time = time.time() ### 更新depths self.depths[name] = data['data'] # 允许交易 if self.mode_signal == 0 and self.ready and flag: ### 更新交易数据 self.update_trade_msg() ### 触发事件撤单逻辑 # 更新策略时间 self.strategy.local_time = time.time() # 产生交易信号 orders = self.strategy.onTime(self.tradeMsg) # if (('Limits_open' in orders and len(orders['Limits_open']) != 0) or # ('Limits_close' in orders and len(orders['Limits_close']) != 0)): # self.logger.info("--------------------------------_update_depth订单指令--------------------------------") # self.logger.info(orders) # self.logger.info("-------------------------------------------end--------------------------------------") ### 记录指令触发信息 if self._not_empty(orders): self.logger.debug("触发onTick") self._update_local_orders(orders) self.loop.create_task(self.rest.handle_signals(orders)) self.logger.debug(orders) else: pass # @timeit async def real_time_back_test(self, data): ''' 按照长短期回测利润选择参数 优先按长期回测利润选参数 如果找不到就 再按短期回测利润选参数 如果还找不到就 使用默认参数 如果默认参数亏损就触发冷静期 ''' now_time = time.time() await asyncio.sleep(0.005) for i in self.backtest_tasks: i["backtest_engine"].backtest_time = now_time i["backtest_engine"].run_by_tick(data) def choose_params(self): ''' 按照长短期回测利润选择参数 优先按长期回测利润选参数 如果找不到就 再按短期回测利润选参数 如果还找不到就 使用默认参数 如果默认参数亏损就触发冷静期 ''' profits = [] for i in self.backtest_tasks: # 获取绩效信息 e = i["backtest_engine"].equity # 最终净值 # 计算标准化利润 p = (e-self.backtest_start_cash) / self.backtest_start_cash \ / self.backtest_look_length * self.tick_profit_to_daily # 有一定成交次数的回测结果才有代表性 持仓太久的参数禁止使用 _trade_num = i['backtest_engine'].trade_num _avg_hold_time = i['backtest_engine'].avg_hold_time _equity_high = i['backtest_engine'].equity_high # 排除交易次数太少的参数 if i['open'] <= 0.002: if _trade_num < 10: p = 0.0 # 排除长期持仓的参数 if _avg_hold_time > 600: p = 0.0 # 排除近期回撤较大的参数 if _equity_high > e*1.01: p = 0.0 profits.append(p) #利润 ############## 重置回测 # if _trade_num > 200: # i["backtest_engine"].trade_num = 0 # i["backtest_engine"].equity = self.backtest_start_cash # 盈利参数个数不能太少 防止孤岛参数 win_num = 0 for i in profits: if i > 0.0: win_num += 1 cond1 = win_num > self.backtest_num*0.1 cond2 = win_num > 2 cond_win = cond1 and cond2 if cond_win: # 按最优回测结果调整参数 max_profit = max(profits) max_index = profits.index(max_profit) self.strategy.trade_open_dist = self.backtest_tasks[max_index]["open"] self.strategy.trade_close_dist = self.backtest_tasks[max_index]["close"] self.strategy.ref_index = self.backtest_tasks[max_index]["index"] self.strategy.post_side = self.backtest_tasks[max_index]["side"] self.strategy.predict_alpha = self.backtest_tasks[max_index]["alpha"] # 检查是否需要关闭回测 # if self.strategy.ready == 1: # self.backtest = 0 else: # 如果没有符合条件的盈利参数 self.strategy.trade_open_dist = 0.01 self.strategy.trade_close_dist = 0.00001 self.strategy.ref_index = 0 self.strategy.post_side = 0 self.strategy.predict_alpha = 0 # 检查是否需要关闭回测 # if self.strategy.ready == 1: # self.backtest = 0 # self.exit_msg = "未找到合适参数 停机" # self.stop() return def update_equity(self, data): ''' 更新保证金信息 合约一直更新 现货只有当出现异常时更新 ''' if "spot" in self.exchange: pass else: self.local_cash = data[self.quote] * self.used_pct def update_exit(self, data): ''' 底层触发停机 ''' self.exit_msg = data self.stop() def get_all_market_data(self): ''' 只能定时触发 组合市场信息=交易盘口+参考盘口 ''' market = [] data = self.ws._get_data()["data"] market += data for i in self.ref_name: data = self.ws_ref[i]._get_data()["data"] market += data # handle save real market data if self.save: with open(f'./{self.csvname}.csv', 'a', newline='', encoding='utf-8') as f: writer = csv.writer(f, delimiter=',') writer.writerow(market) return market async def before_trade(self): ####### 启动ws ####### # 启动交易ws # 当开启回测时才订阅交易盘口的成交流 _sub_trade = int(self.params.backtest) _sub_fast = int(self.params.fast) # TODO 这是task1,要在这个交易所交易,pycharm定位有问题,这个ws不一定指向binance self.loop.create_task(self.ws.run(is_auth=1, sub_trade=_sub_trade, sub_fast=0)) # TODO 这是task n,用来做参考 for i in self.ref_name: # 启动参考ws 参考盘口使用fast行情性能消耗更大 使用普通行情可以节省性能 self.loop.create_task(self.ws_ref[i].run(is_auth=0, sub_trade=0, sub_fast=_sub_fast)) await asyncio.sleep(1) ###### 做交易前准备工作 ###### # 买入平台币 # TODO v9情况下买入平台币会怎么样? await self.rest.buy_token() await asyncio.sleep(1) # 清空挂单和仓位 await self.rest.check_position(hold_coin=self.hold_coin) await asyncio.sleep(1) # 获取市场信息 await self.rest.before_trade() await asyncio.sleep(1) # 获取价格信息 ticker = await self.rest.get_ticker() mp = ticker['mp'] # 获取账户信息 await asyncio.sleep(1) await self.rest.get_equity() # 初始资金 start_cash = self.rest.cash_value * self.used_pct start_coin = self.rest.coin_value * self.used_pct if start_cash == 0.0 and start_coin == 0.0: self.exit_msg = f"初始为零 cash: {start_cash} coin: {start_coin}" self.stop() self.logger.info(f"初始cash: {start_cash} 初始coin: {start_coin}") # 初始化策略基础信息 if isinstance(mp, float): if mp <= 0.0: self.exit_msg = f"初始价格获取错误 {mp}" self.stop() else: print(f"初始价格为 {mp}") else: self.exit_msg = f"初始价格获取错误 {mp}" self.stop() self.strategy.mp = mp self.strategy.start_cash = start_cash self.strategy.start_coin = start_coin self.strategy.start_equity = start_cash + start_coin * mp self.strategy.max_equity = self.strategy.start_equity self.strategy.equity = self.strategy.start_equity self.strategy.total_amount = self.strategy.equity * self.strategy.leverrate / self.strategy.mp self.strategy.stepSize = self.rest.stepSize if self.rest.stepSize < 1.0 else int(self.rest.stepSize) self.strategy.tickSize = self.rest.tickSize if self.rest.tickSize < 1.0 else int(self.rest.tickSize) if self.strategy.stepSize == None or self.strategy.tickSize == None: self.exit_msg = f"交易精度未正常获取 stepsize: {self.strategy.stepSize} ticksize: {self.strategy.tickSize}" self.stop() else: self.logger.info(f"数量精度{self.strategy.stepSize}") self.logger.info(f"价格精度{self.strategy.tickSize}") grid = float(self.params.grid) # 计算下单数量 if "spot" in self.exchange: long_one_hand_value = start_cash * float(self.params.leverrate) / grid short_one_hand_value = start_coin * mp * float(self.params.leverrate) / grid long_one_hand_amount = float(Decimal(str(long_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize))) short_one_hand_amount = float(Decimal(str(short_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize))) else: long_one_hand_value = start_cash * float(self.params.leverrate) / grid short_one_hand_value = start_cash * float(self.params.leverrate) / grid long_one_hand_amount = float(Decimal(str(long_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize))) short_one_hand_amount = float(Decimal(str(short_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize))) # 检查是否满足最低交易要求 print(f"最低单手交易下单量为 buy: {long_one_hand_amount} sell: {short_one_hand_amount}") if (long_one_hand_amount == 0 and short_one_hand_amount == 0) or (long_one_hand_value < 20 and short_one_hand_value < 20): self.exit_msg = f"初始下单量太少 buy: {long_one_hand_amount} sell: {short_one_hand_amount}" self.stop() # 初始化调度器 self.local_cash = start_cash self.local_coin = start_coin # 配置在线训练 if self.backtest: # 设置策略默认参数 self.strategy.trade_close_dist = 0.00001 self.strategy.trade_open_dist = 0.01 self.backtest_look_length = 86400 / self.interval # 回测区间足够长 self.backtest_tasks = list() self.tick_profit_to_daily = (86400/self.interval) self.backtest_start_cash = 1000000.0 # 备选参数 open_list, close_list, alpha_list = utils.get_backtest_set(self.base) if 'spot' in self.exchange: side_list = [] if long_one_hand_amount > 0: side_list.append(1) if short_one_hand_amount > 0: side_list.append(-1) if 1 in side_list and -1 in side_list: side_list.append(0) else: side_list = [-1,0,1] side_list_allow = [] for s in side_list: if s in utils.POST_SIDE_LIMIT: side_list_allow.append(s) side_list = side_list_allow for _open in open_list: for _side in side_list: for _close in close_list[_open]: for _index in range(self.ref_num): for _alpha in alpha_list: task = dict() st = strategy.Strategy(self.params, is_print=0) st.leverrate = 1.0 st.trade_open_dist = _open st.trade_close_dist = _close st.predict_alpha = _alpha st.ref_index = _index st.post_side = _side st.exchange = "dummy_usdt_swap" st.local_start_time = 0.0 bt = backtest.Backtest(st, is_plot=0) bt.start_cash = self.backtest_start_cash task["backtest_engine"] = bt task["open"] = _open task["close"] = _close task["index"] = _index task["side"] = _side task["alpha"] = _alpha self.backtest_tasks.append(task) backtest_num = len(self.backtest_tasks) self.backtest_num = backtest_num self.logger.info(f'在线模拟撮合数量{backtest_num}') self.logger.info(f'当前为在线训练模式 需预热{utils.BACKTEST_PREHOT_SECOND}秒 请耐心等候...') else: self.logger.info('当前为指定参数模式...') ###### 交易前准备就绪 可以开始交易 ###### self.loop.create_task(self.rest.go()) self.loop.create_task(self.on_timer()) self.loop.create_task(self._run_server()) self.loop.create_task(self.run_stratey()) #self.loop.create_task(self.post_loop()) #改 self.loop.create_task(self.early_stop_loop()) def update_trade_msg(self): # 更新保证金 self.tradeMsg.cash = round(self.local_cash,10) self.tradeMsg.coin = round(self.local_coin,10) # 使用本地推算仓位 self.tradeMsg.position = self.local_position_by_orders # 更新订单 self.tradeMsg.orders = self.local_orders ### 更新 ref ref_tickers = [] for i in self.ref_name: ref_tickers.append([self.tickers[i]['bp'], self.tickers[i]['ap']]) ref_price = self.Predictor.Get_ref(ref_tickers) if len(ref_price) == 0: return # logger.info('ref_price={}, market={}, predict={}'.format( # self.tradeMsg.ref_price, self.tradeMsg.market, self.tradeMsg.predict)) self.tradeMsg.ref_price = ref_price async def server_handle(self, request): '''中控数据接口''' if 'spot' in self.exchange: pos = self.local_position_by_orders.longPos - self.local_position_by_orders.shortPos else: pos = self.local_position.longPos - self.local_position.shortPos if pos > 0.0: entryPrice = self.local_position_by_orders.longAvg elif pos < 0.0: entryPrice = self.local_position_by_orders.shortAvg else: entryPrice = 0 return web.Response(body=json.dumps({ "now_balance": round(self.strategy.equity/self.used_pct, 4), #钱包余额 "unrealized_pn_l": round(self.local_profit, 4), #未实现盈利 "pos": round(pos, 8), #持仓数量 "entry_price": round(entryPrice, 8), #开仓价格 "now_price": round(self.strategy.mp, 8), #当前价格 })) async def change(self, request): '''中控台修改参数''' try: data = await request.json() if "stop" in data: self.logger.warning('中控停机') self.exit_msg = '中控停机' self.stop() return web.Response(text=f"停机成功") ip = request.remote print(f'从{ip}收到更新参数请求',data) if isinstance(data, str): data = json.loads(data) if self.backtest == 1: return web.Response(text="自动调参模式不允许手动修改参数") else: open = float(data['open']) close = float(data['close']) self.strategy.trade_open_dist = open self.strategy.trade_close_dist = close return web.Response(text=f"参数修改成功 {open} {close}") except Exception as e: return web.Response(text=f"参数修改失败 {e}") # @utils.timeit def check_risk(self): '''检查风控''' if self.strategy.start_cash == 0.0: print("请检查交易账户余额") return 0 if isinstance(self.strategy.mp, float): pass else: print("请检查最新价格") return 0 ############ # print("当前线程数",self.process.num_threads()) ###### 资源风控0 ###### cpu_pct = psutil.cpu_times_percent().user self.cpu_ema = self.cpu_ema * 0.8 + cpu_pct * 0.2 # print(f"cpu占用 {cpu_pct}") if self.cpu_ema > 95: msg = f"cpu占用过高 {self.cpu_ema} 准备停机" print(msg) self.logger.warning(msg) self.exit_msg = msg self.stop() mm_pct = psutil.virtual_memory().percent self.mm_ema = self.mm_ema * 0.8 + mm_pct * 0.2 # print(f"内存占用 {mm_pct}") if self.mm_ema > 95: msg = f"内存占用过高 {self.mm_ema} 准备停机" print(msg) self.logger.warning(msg) self.exit_msg = msg self.stop() ###### 回撤风控1 ###### if "spot" not in self.exchange: draw_back = 1-self.strategy.equity/self.strategy.max_equity if draw_back > self.stoploss: msg = f"{self.acct_name} 总资金吊灯回撤{draw_back} 当前{self.strategy.equity} 最高{self.strategy.max_equity} 触发止损 准备停机" print(msg) self.logger.warning(msg) self.exit_msg = msg self.stop() ###### 回撤风控2 ###### draw_back = self.local_profit/self.strategy.start_equity if draw_back < -self.stoploss: msg = f"{self.acct_name} 交易亏损 触发止损 准备停机" print(msg) self.logger.warning(msg) self.exit_msg = msg self.stop() ###### 报单延迟风控 ###### if self.rest.avg_delay > 5000: # 平均延迟允许上限 5000ms msg = f"{self.acct_name} 延迟爆表 触发风控 准备停机" print(msg) self.logger.warning(msg) self.exit_msg = msg self.stop() ###### 仓位异常风控 ###### ### 合约60秒更新一次绝对仓位 ### # 连续5分钟仓位不正确就停机 # 5 * 60 = 300 300/10 = 30 diff_pos = max(abs(self.local_position.longPos - self.local_position_by_orders.longPos),abs(self.local_position.shortPos - self.local_position_by_orders.shortPos)) if "spot" not in self.exchange: diff_pos_value = diff_pos * self.strategy.mp if diff_pos_value > self.strategy._min_amount_value: msg = f"{self.acct_name} ***发现仓位异常*** 推算{self.local_position_by_orders.__dict__} 本地{self.local_position.__dict__}" print(msg) self.logger.warning(msg) self.position_check_series.append(1) else: self.position_check_series.append(0) if len(self.position_check_series) > 30: del(self.position_check_series[0]) if sum(self.position_check_series) >= 30: msg = f"{self.acct_name} 合约连续检查本地仓位和推算仓位不相符 退出" print(msg) self.logger.warning(msg) self.exit_msg = msg self.stop() ###### 下单异常风控 ###### if self.strategy.total_amount == 0.0: msg = f"{self.acct_name} 开仓量为零 退出" print(msg) self.logger.warning(msg) self.exit_msg = msg self.stop() ###### 行情更新异常风控 ###### for name in self.ref_name: delay = round((time.time() - self.market_update_time[name]) * 1000, 3) if delay > utils.MARKET_DELAY_LIMIT: # thre msg = f"{self.acct_name} ticker_name:{name} delay:{delay}ms 行情更新延迟过高 退出" self.logger.error(msg) self.exit_msg = msg self.stop() for name in [self.trade_name]: delay = round((time.time() - self.market_update_time[name]) * 1000, 3) if delay > utils.MARKET_DELAY_LIMIT: # thre msg = f"{self.acct_name} ticker_name:{name} delay:{delay}ms 行情更新延迟过高 退出" self.logger.error(msg) self.exit_msg = msg self.stop() ###### 订单异常风控 ###### for cid in self.local_orders: if time.time() - self.local_orders[cid]["localtime"] > 300: # 订单长时间停留 怀疑漏单 但未必一定漏 5min msg = f"{self.acct_name} cid:{cid} 订单停留过久 怀疑异常 退出" self.logger.error(msg) self.exit_msg = msg self.stop() ###### 持仓均价异常风控 ###### if isinstance(self.strategy.long_pos_bias, float): # 偏离mp较大 且持仓较大 说明出现异常 if self.strategy.long_hold_value > 2*self.strategy._min_amount_value: if self.strategy.long_pos_bias > 4.0 or self.strategy.long_pos_bias < -2.0: msg = f"{self.acct_name} long_pos_bias:{self.strategy.long_pos_bias} 持仓均价异常 退出" self.logger.error(msg) self.exit_msg = msg self.stop() if isinstance(self.strategy.short_pos_bias, float): # 偏离mp较大 且持仓较大 说明出现出现异常 if self.strategy.short_hold_value > 2*self.strategy._min_amount_value: if self.strategy.short_pos_bias > 4.0 or self.strategy.short_pos_bias < -2.0: msg = f"{self.acct_name} short_pos_bias:{self.strategy.short_pos_bias} 持仓均价异常 退出" self.logger.error(msg) self.exit_msg = msg self.stop() ###### 订单撤单异常风控 ###### for cid in self.local_cancel_log: if self.local_cancel_log[cid] > 300: msg = f"{self.acct_name} 订单长时间无法撤销 退出" self.logger.error(msg) self.exit_msg = msg self.stop() ###### 定价异常风控 ###### if self.strategy.ref_price is not None and self.strategy.ref_price != 0: if abs(self.strategy.ref_price-self.strategy.mp)/self.strategy.mp > 0.03: msg = f"{self.acct_name} 定价偏离过大 怀疑异常 退出" self.logger.error(msg) self.exit_msg = msg self.stop() async def exit(self, delay=0): '''退出操作''' try: self.logger.info(f"预约退出操作 delay:{delay}") if delay > 0: await asyncio.sleep(delay) self.logger.info(f"开始退出操作") self.logger.info("为避免api失效导致遗漏仓位 建议人工复查") await self.rest.check_position(hold_coin=self.hold_coin) # stop flag self.rest.stop_flag = 1 self.ws.stop_flag = 1 for i in self.ref_name: self.ws_ref[i].stop_flag = 1 # double check 需要延迟几秒以便等待更新数据 await asyncio.sleep(3) self.logger.info("双重检查遗漏仓位") await self.rest.check_position(hold_coin=self.hold_coin) self.logger.info(f'停机退出 停机原因 {self.exit_msg}') await asyncio.sleep(1) # 发送交易状态 await self._post_params() # 压缩行情文件 utils.csv_to_gz_and_remove() # close pid self.logger.info("退出进程") except: self.logger.error(traceback.format_exc()) finally: os._exit(0) async def on_timer(self): '''定期触发系统逻辑''' await asyncio.sleep(20) while 1: try: # 10秒检查一次风控 await asyncio.sleep(10) # 检查风控 self.check_risk() # stop if self.mode_signal == 1:return # 计算预估成交额 total_trade_value = self.local_buy_value + self.local_sell_value self.strategy.trade_vol_24h = round(total_trade_value / (time.time()-self.pid_start_time) * 86400 / 10000, 2) # 打印 if int(self.params.log): self.strategy._print_summary() # 打印行情延迟监控 self.logger.info('Rest 报单平均延迟 ' + str(self.rest.avg_delay) + 'ms ') self.logger.info('Rest 报单最高延迟 ' + str(self.rest.max_delay) + 'ms ') for name in self.market_update_interval: avg_interval = round(self.market_update_interval[name]*1e3, 2) self.logger.info(f'WS 盘口{name}行情 平均更新间隔 {avg_interval}ms') # 选择参数 if self.backtest: self.choose_params() except asyncio.CancelledError: print('定期循环任务取消') except: print("定时循环系统出错") self.logger.error(traceback.print_exc()) await asyncio.sleep(10) async def _post_params(self): '''推送交易信息''' profit = round(self.strategy.daily_return/self.strategy.leverrate,4) if time.time() - self.pid_start_time > utils.EARLY_STOP_SECOND * 0.5 or profit < 0.0: await utils._post_params( "http://wwww.khods.com:8888/post_params", self.params.proxy, ujson.dumps({ "pwd":"123456", "exchange":self.params.exchange, "pair":self.params.pair, "open":self.params.open, "close":self.params.close, "refexchange":self.params.refexchange[self.strategy.ref_index], "profit":profit, }) ) else: self.logger.info("不满足推送过滤条件 放弃推送参数") async def post_loop(self): '''定期触发交易信息推送''' await asyncio.sleep(30) _interval = 60 # 定期推送一次盈利情况 while 1: try: # 定期推送一次 await asyncio.sleep(_interval) # 发送交易状态 await self._post_params() except asyncio.CancelledError: print('post loop 循环任务取消') except: print("post loop 循环系统出错") self.logger.error(traceback.print_exc()) await asyncio.sleep(10) async def early_stop_loop(self): '''定期触发交易信息推送''' if self.father: self.logger.info(f'以父进程方式启动 关闭早停检测') return else: self.logger.info(f'以子进程方式启动 开启早停检测') await asyncio.sleep(30) _interval = utils.EARLY_STOP_SECOND _last_equity = self.strategy.start_equity _last_local_profit = 0.0 while 1: try: # 休眠 await asyncio.sleep(_interval) ###### 子进程早停风控 ###### self.logger.info(f'当前净值{self.strategy.equity} 上次检测时净值{_last_equity} 当前累积利润{self.local_profit} 上次检测时利润{_last_local_profit}') # 检查是否需要早停 没有成交 或者 亏损 if self.strategy.equity <= _last_equity or self.local_profit <= _last_local_profit: self.logger.info('触发早停条件 当零持仓时退出') # 没有持仓 for _ in range(30): await asyncio.sleep(5) if self.strategy.long_hold_value < self.strategy._min_amount_value and \ self.strategy.short_hold_value < self.strategy._min_amount_value: msg = f"{self.acct_name} 子进程盈利状况不理想 提前停机 退出" self.logger.error(msg) self.exit_msg = msg self.stop() # 更新上一次检测的净值 _last_equity = self.strategy.equity _last_local_profit = self.local_profit except asyncio.CancelledError: print('early stop 循环任务取消') except: print("early stop 循环系统出错") self.logger.error(traceback.print_exc()) await asyncio.sleep(10) def on_agg_market(self): ''' 处理聚合行情 1. 获取聚合行情 2. 更新预测器 3. 触发tick回测 ''' ### 更新聚合市场数据 agg_market = self.get_all_market_data() ### 更新聚合市场信息 self.tradeMsg.market = agg_market ### 更新预测器 self.Predictor.onTime(agg_market) ### 触发回测 if self.backtest: self.loop.create_task(self.real_time_back_test(self.tradeMsg)) async def run_stratey(self): ''' 定期触发策略 ''' print('定时触发器启动') # 准备交易 try: print('前期准备完成') await asyncio.sleep(10) while 1: try: # 时间预设 start_time = time.time() ### 是否准备充分 if self.ready: ### 更新交易信息集合 self.update_trade_msg() ### 触发策略 if self.mode_signal == 0: pass # # 更新策略时间 # self.strategy.local_time = time.time() # # 产生信号 # orders = self.strategy.onTime(self.tradeMsg) # ### 记录指令触发信息 # if self._not_empty(orders): # self.logger.debug("触发onTime") # self._update_local_orders(orders) # self.loop.create_task(self.rest.handle_signals(orders)) # self.logger.debug(orders) else: if self.mode_signal > 1:self.mode_signal -= 1 if self.mode_signal == 1:return # 触发策略 # 更新策略时间 self.strategy.local_time = time.time() # 获取信号 # TODO mode_signal∈[21, +无穷) 表示什么? if self.mode_signal > 20: # 先执行onExit orders = self.strategy.onExit(self.tradeMsg) ### 记录指令触发信息 if self._not_empty(orders): self.logger.debug("触发onExit") self._update_local_orders(orders) self.loop.create_task(self.rest.handle_signals(orders)) self.logger.debug(orders) # TODO mode_signal∈[2, 20] 表示什么? else: # 再执行onSleep orders = self.strategy.onSleep(self.tradeMsg) ### 记录指令触发信息 if self._not_empty(orders): self.logger.debug("触发onSleep") self._update_local_orders(orders) self.loop.create_task(self.rest.handle_signals(orders)) self.logger.debug(orders) ############################################################ else: self.check_ready() ### 计算耗时并进行休眠 pass_time = time.time()-start_time await asyncio.sleep(utils.clip(self.interval-pass_time, 0.0, 1.0)) except asyncio.CancelledError: print('策略触发任务取消') except: self.logger.error(traceback.format_exc()) traceback.print_exc() await asyncio.sleep(10) except asyncio.CancelledError: print('策略触发任务取消') except: self.logger.error(traceback.format_exc()) traceback.print_exc() await asyncio.sleep(10) def check_ready(self): ''' 判断初始数据是否齐全 ''' ### 检查 ticker 行情 for i in self.ref_name: if i not in self.tickers or self.tickers[i] == {}: print("参考盘口ticker未准备好") return else: if self.tickers[i]['bp'] == 0 or self.tickers[i]['ap'] == 0: print("参考盘口ticker未准备好") return if self.trade_name not in self.tickers or self.tickers[self.trade_name] == {}: print("交易盘口ticker未准备好") return else: if self.tickers[self.trade_name]['bp'] == 0 or self.tickers[self.trade_name]['ap'] == 0: print("交易盘口ticker未准备好") return ### 检查 market 行情 all_market = self.get_all_market_data() if len(all_market) != utils.LEN*(1+self.ref_num): print("聚合行情未准备好") return else: # 如果行情已经就绪 预热trademsg和predictor print("聚合行情准备就绪") self.tradeMsg.market = all_market self.Predictor.onTime(all_market) self.ready = 1 def stop(self): ''' 停机函数 mode_signal 不能小于80 前6秒用于maker平仓 后2秒用于撤maker平仓单 休眠2秒再执行check_position 避免卡单导致漏仓位 ''' self.logger.info(f'进入停机流程...') self.mode_signal = 80 # 等strategy onExit 彻底执行完毕 进入沉默状态之后 再进入exit 否则可能导致多处同时操作订单 # 尽量减少大仓位直接take平 self.loop.create_task(self.exit(delay=10)) async def _run_server(self): print('server正在启动...') for _ in range(30): await asyncio.sleep(5) if self.strategy.equity > 0.0:break app = web.Application() app.router.add_route('GET', '/account', self.server_handle) app.router.add_route('POST', '/change', self.change) try: self.loop.create_task(web._run_app(app, host='0.0.0.0', port=self.params.server_port, handle_signals=False)) except: self.logger.error(f"Server启动失败") self.logger.error(traceback.format_exc()) self.exit_msg = "服务启动失败 停机退出" self.stop() def run(self): '''启动ws行情获取''' def keyboard_interrupt(s, f): self.logger.info("收到退出信号 准备关机") self.stop() try: signal.signal(signal.SIGINT, keyboard_interrupt) signal.signal(signal.SIGTERM, keyboard_interrupt) if 'win' not in sys.platform: signal.signal(signal.SIGKILL, keyboard_interrupt) signal.signal(signal.SIGQUIT, keyboard_interrupt) except: pass self.loop.create_task(self.before_trade()) # TODO 启动方式干嘛用的?为什么要判断? print(f'判断启动方式...') if self.father: print('以父进程方式启动 最大允许运行时间为30天') self.loop.create_task(self.exit(delay=60*60*24*30)) else: print('以子进程方式启动 最大允许运行时间为60分钟') self.loop.create_task(self.exit(delay=utils.CHILD_RUN_SECOND)) self.loop.run_forever() if __name__ == "__main__": if 0: utils.check_auth() if 0: utils.check_time() pnum = len(sys.argv) if pnum > 0: fname = None log_file = None pidnum = None father = 1 for i in range(pnum): print(f"第{i}个参数为:{sys.argv[i]}") if sys.argv[i] == '-c' or sys.argv[i] == '--c': fname = sys.argv[i+1] elif sys.argv[i] == '-h': print("帮助文档") elif sys.argv[i] == '-log_file' or sys.argv[i] == '--log_file': log_file = sys.argv[i+1] elif sys.argv[i] == '-num' or sys.argv[i] == '--num': pidnum = sys.argv[i+1] elif sys.argv[i] == '-v' or sys.argv[i] == '--v': print(f"当前版本为 V{VERSION}") elif sys.argv[i] == '-child' or sys.argv[i] == '--child': father = 0 print(f"当前以子进程方式启动") if fname and log_file and pidnum: print(f"指定的配置为 fname:{fname} log_file:{log_file} pidnum:{pidnum} father:{father}") date = time.strftime("%Y%m%d", time.localtime()) logname = f"{log_file}-{date}" quant = Quant(utils.get_params(fname), logname, father) quant.run() elif fname: print(f"运行指定配置文件{fname}") quant = Quant(utils.get_params(fname),father=father) quant.run() else: print("缺少指定参数 运行默认配置文件") fname = 'config.toml' quant = Quant(utils.get_params(fname),father=father) quant.run() else: fname = 'config.toml' quant = Quant(utils.get_params(fname)) quant.run()