||
- import asyncio
- from aiohttp import web
- import traceback
- import time, csv
- import strategy as strategy
- import utils
- import model
- import logging, logging.handlers
- import signal
- import os, json, sys
- import predictor
- import backtest
- import multiprocessing
- import random
- import psutil
- import ujson
- import broker
- from decimal import Decimal
- from loguru import logger
- VERSION = utils.VERSION
- def timeit(func):
- def wrapper(*args, **kwargs):
- nowTime = time.time()
- res = func(*args, **kwargs)
- spend_time = time.time() - nowTime
- spend_time = round(spend_time * 1000, 5)
- print(f'{func.__name__} 耗时 {spend_time} ms')
- return res
- return wrapper
- class Quant:
- def __init__(self, params:model.Config, logname="test_logname", father=1):
- print('############### 超级无敌韭菜收割机 ################')
- print(f'>>> 版本 {VERSION} <<<')
- print('*** 当前配置')
- self.params = params
- for p in self.params.__dict__:
- print('***', p, ' => ', getattr(self.params, p))
- print('##################################################')
- self.logger = self.get_logger(logname)
- self.csvname = logname + ' ' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
- pid = os.getpid()
- self.pid_start_time = time.time()
- self.logger.info(f"进程号{pid} 启动时间{self.pid_start_time}")
- ##### 绑定cpu
- cpu_count = psutil.cpu_count()
- print("检测cpu核心负载")
- cpu_used_pct = [0.0 for _ in range(cpu_count)]
- for _ in range(random.randint(5,15)):
- r = psutil.cpu_times_percent(percpu=True)
- for i in range(cpu_count):
- cpu_used_pct[i] += int(r[i].user)
- time.sleep(0.1)
- print(cpu_used_pct)
- cpu_id = cpu_used_pct.index(min(cpu_used_pct))
- print(f"当前负载最低的cpu为:{cpu_id}")
- self.process = psutil.Process(pid)
- print(f"核心数{cpu_count} 目标绑定cpu:{cpu_id}")
- os.system(f"taskset -cp {cpu_id} {pid}")
- print("调整系统调度优先级为最高等级")
- if 'win' not in sys.platform:
- print(os.nice(-20))
- #### cpu 内存 平均占用
- self.cpu_ema = 0.0
- self.mm_ema = 0.0
- #####
- self.acct_name = self.params.account_name
- self.symbol = self.params.pair
- self.base = self.params.pair.split('_')[0].upper()
- self.quote = self.params.pair.split('_')[1].upper()
- if 1:
- ### 使用uvloop
- if 'win' not in sys.platform:
- print('采用高速事件循环库')
- import uvloop
- self.loop = uvloop.new_event_loop()
- else:
- print('采用普通事件循环库')
- self.loop = asyncio.get_event_loop()
- else:
- ### 使用原生loop
- self.loop = asyncio.get_event_loop()
- self.strategy = strategy.Strategy(self.params, is_print=1)
- ###### 判断启动方式
- self.father = father
- print(f"父进程标识 {self.father}")
- ##### 现货底仓
- hold_coin = float(self.params.hold_coin)
- self.hold_coin = utils.clip(hold_coin, 0.0, 10000.0)
- ##### 本地状态量
- self.data = dict()
- self.total_equity = 0.0
- self.local_orders = dict() # 本地挂单表
- self.local_orders_backup = dict() # 本地订单缓存队列
- self.local_orders_backup_cid = [] # 本地订单缓存cid队列
- self.handled_orders_cid = [] # 本地已处理cid缓存队列
- self.local_profit = 0.0
- self.local_cash = 0.0 # 本地U保证金
- self.local_coin = 0.0 # 本地币保证金
- self.local_position = model.Position()
- self.local_position_by_orders = model.Position()
- self.local_buy_amount = 0.0
- self.local_sell_amount = 0.0
- self.local_buy_value = 0.0
- self.local_sell_value = 0.0
- self.local_cancel_log = dict()
- self.interval = float(self.params.interval)
- self.exchange = self.params.exchange
- self.tradeMsg = model.TraderMsg()
- self.exit_msg = "正常退出"
- self.save = int(self.params.save) # 保存行情数据
- self.logger.info(f"实时行情数据记录开关:{self.save}")
- # 仓位检查结果序列
- self.position_check_series = []
- # 止损大小
- self.stoploss = float(self.params.stoploss)
- # 资金使用率
- self.used_pct = float(self.params.used_pct) #使用资金比例
- # 启停信号 0 表示运行 大于1开始倒计时 1时停机
- self.mode_signal = 0
- # 交易盘口订单流更新时间
- self.trade_order_update_time = time.time()
- # onTick触发时间记录
- self.on_tick_event_time = time.time()
- # 盘口ticker depth信息
- self.tickers = dict()
- self.depths = dict()
- # 行情更新延迟监控
- self.market_update_time = dict()
- self.market_update_interval = dict()
- # 参考盘口名称
- refex = self.params.refexchange
- refpair = self.params.refpair
- if len(refex) != len(refpair):
- self.logger.error("参考盘口数不等于参考品种数 退出")
- raise Exception("参考盘口数不等于参考品种数 退出")
- self.ref_num = len(refex)
- self.ref_name = []
- for i in range(self.ref_num):
- if refex[i] not in broker.exchange_lists:
- self.logger.error("出现不支持的参考盘口")
- raise Exception("出现不支持的参考盘口")
- name = refex[i] + '@' + refpair[i] + '@ref'
- self.ref_name.append(name)
- self.tickers[name] = dict()
- self.depths[name] = list()
- self.market_update_time[name] = 0.0
- self.market_update_interval[name] = 0.0
- # 参考盘口tick更新时间
- # 服务器私有ip地址检查
- ipList = utils.get_local_ip_list()
- ipListNum = len(ipList)
- # if int(self.params.ip) >= ipListNum:
- # raise Exception("指定私有ip地址序号不存在")
- # 用于创建ws实例
- name = self.exchange+'@'+self.params.pair
- self.trade_name = name
- self.market_update_time[name] = 0.0
- self.market_update_interval[name] = 0.0
- self.tickers[name] = dict()
- self.depths[name] = list()
- cp = model.ClientParams()
- cp.name = self.trade_name
- cp.pair = self.params.pair
- cp.access_key = self.params.access_key
- cp.secret_key = self.params.secret_key
- cp.pass_key = self.params.pass_key
- cp.interval = self.params.interval
- cp.broker_id = self.params.broker_id
- cp.debug = self.params.debug
- cp.proxy = self.params.proxy
- cp.ip = int(self.params.ip)
- self.ws = broker.newWs(self.exchange)(
- params=cp,
- colo=int(self.params.colo),
- is_print=0,
- )
- self.ws.logger = self.logger
- self.ready = 0
- # rest实例
- self.rest = broker.newRest(self.exchange)(cp, colo=int(self.params.colo))
- self.ws_ref = dict()
- # 参考盘口 ws 实例
- for i in range(self.ref_num):
- cp = model.ClientParams()
- cp.name = self.ref_name[i]
- cp.pair = self.params.refpair[i]
- cp.proxy = self.params.proxy
- cp.interval = self.params.interval
- cp.ip = int(self.params.ip)
- exchange = self.params.refexchange[i]
- if exchange not in broker.exchange_lists:
- self.logger.error("参考盘口名称错误 退出")
- return
- _colo = 0
- if self.params.refexchange[i] == self.params.exchange and \
- self.params.refpair[i] == self.params.pair and int(self.params.colo):
- _colo = 1
- self.ws_ref[self.ref_name[i]] = broker.newWs(exchange)(cp, colo=_colo)
- self.ws_ref[self.ref_name[i]].callback['onTicker']=self.update_ticker
- self.ws_ref[self.ref_name[i]].callback['onDepth']=self.update_depth
- self.ws_ref[self.ref_name[i]].logger = self.logger
- # 添加回调
- self.ws.callback = {
- 'onTicker':self.update_ticker,
- 'onDepth':self.update_depth,
- 'onPosition':self.update_position,
- 'onEquity':self.update_equity,
- 'onOrder':self.update_order,
- 'onExit':self.update_exit,
- }
- self.rest.callback = {
- 'onTicker':self.update_ticker,
- 'onDepth':self.update_depth,
- 'onPosition':self.update_position,
- 'onEquity':self.update_equity,
- 'onOrder':self.update_order,
- 'onExit':self.update_exit,
- }
- self.rest.logger = self.logger
- # 配置策略
- self.strategy.logger = self.logger
- # 配置定价模型
- price_alpha = []
- for i in self.params.refpair:
- # 交易1000shib 参考 shib
- if '1000' in self.params.pair and '1000' not in i:
- price_alpha.append(1000.0)
- # 交易shib 参考 1000shib
- elif '1000' not in self.params.pair and '1000' in i:
- price_alpha.append(0.001)
- else:
- # 交易shib 参考 shib
- price_alpha.append(1.0)
- self.logger.info(f'价格系数{price_alpha}')
- self.Predictor = predictor.Predictor(ref_name=self.ref_name, alpha=price_alpha, gamma=float(self.params.gamma))
- # 初始化参数
- self.strategy.trade_open_dist = float(self.params.open)
- self.strategy.trade_close_dist = float(self.params.close)
- # 在线训练
- self.backtest = int(self.params.backtest)
- self.logger.info(f'在线训练开关 {self.backtest}')
- ####
- time.sleep(3)
- def get_logger(self, logname):
- '''日志模块'''
- logger = logging.getLogger(__name__)
- # # log flag
- # if int(self.params.log):
- # log_level = logging.DEBUG
- # logger.setLevel(log_level)
- # # log to txt
- # formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
- # if logname == None: logname = "log"
- # handler = logging.handlers.RotatingFileHandler(f"{logname}.log",maxBytes=1024*1024*10,encoding='utf-8')
- # handler.setLevel(log_level)
- # handler.setFormatter(formatter)
- # # log to console
- # console = logging.StreamHandler()
- # console.setLevel(logging.INFO)
- # # add
- # logger.addHandler(handler)
- # logger.addHandler(console)
- # else:
- log_level = logging.INFO
- logger.setLevel(log_level)
- # log to txt
- formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
- if logname == None: logname = "log"
- handler = logging.handlers.RotatingFileHandler(f"{logname}.log",maxBytes=1024*1024*10,encoding='utf-8')
- handler.setLevel(log_level)
- handler.setFormatter(formatter)
- # add
- logger.addHandler(handler)
- logger.info('开启日志记录')
- return logger
- def update_order(self, data):
- self.loop.create_task(self._update_order(data))
- async def _update_order(self, data):
- '''
- 更新订单
- 首先直接复写本地订单
- 1、如果是开仓单
- 如果新增: 增加本地订单
- 如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按已成交量发送平仓订单 修改本地仓位
- 如果成交: 删除本地订单 发送平仓订单 修改本地仓位
- 2、如果是平仓单
- 如果新增: 增加本地订单
- 如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按未成交量发送平仓订单 修改本地仓位
- 如果成交: 删除本地订单 修改本地仓位
- NEW 可以从 ws / rest 来
- REMOVE 主要从 ws 来 必须包含 filled 和 filled_price 用于本地仓位推算 定期rest查过旧订单
- 为了防止下单失败依然有订单成交 本地需要做一个缓存
- '''
- try:
- # 触发订单更新
- self.trade_order_update_time = time.time()
- # 新增订单推送 仅需要cid oid信息
- if data['status'] == 'NEW':
- # 更新oid信息 更新订单 loceltime信息(尤其是查单返回new的情况 必须更新 否则会误触发风控)
- if data['client_id'] in self.local_orders:
- self.local_orders[data['client_id']]["order_id"] = data['order_id']
- self.local_orders[data['client_id']]["localtime"] = time.time()
- # 完成订单推送 仅需要cid filled filled_size信息
- elif data['status'] == 'REMOVE':
- # 如果在撤单记录中 说明此订单结束生命周期 可以移除记录
- if data["client_id"] in self.local_cancel_log:
- del(self.local_cancel_log[data["client_id"]])
- # 在cid缓存队列中 说明是本策略的订单
- if data["client_id"] in self.local_orders_backup:
- # 不在已处理cid缓存队列中 说明还没参与过仓位计算 则执行订单计算
- if data['client_id'] not in self.handled_orders_cid:
- # 添加进已处理队列
- self.handled_orders_cid.append(data["client_id"])
- # 提取成交信息 方向 价格 量
- filled = data["filled"]
- side = self.local_orders_backup[data['client_id']]["side"]
- if "filled_price" in data:
- if data["filled_price"] > 0.0:
- filled_price = data["filled_price"]
- else:
- filled_price = self.local_orders_backup[data['client_id']]["price"]
- else:
- filled_price = self.local_orders_backup[data['client_id']]["price"]
- # 只有开仓成交才触发onPosition
- # 如果漏推送 rest补充的订单查询信息过来 可能会导致 kd kk 推送出现计算分母为0的情况
- if filled > 0:
- if "spot" in self.exchange:# 如果是现货交易 还需要修改equity
- ### 现货必须考虑fee 买入fee单位为币 卖出fee单位为u
- fee = data["fee"]
- ### 现货订单流仓位计算
- if side == "kd": # buy
- self.local_buy_amount += filled - fee
- self.local_buy_value += (filled - fee) * filled_price
- new_long_pos = float(Decimal(str(self.local_position_by_orders.longPos)) + Decimal(str(filled)) - Decimal(str(fee)))
- if new_long_pos == 0.0:
- self.local_position_by_orders.longAvg = 0.0
- self.local_position_by_orders.longPos = 0.0
- else:
- self.local_position_by_orders.longAvg = \
- (self.local_position_by_orders.longPos * self.local_position_by_orders.longAvg + filled * filled_price) / new_long_pos
- self.local_position_by_orders.longPos = new_long_pos
- self.local_cash -= filled * filled_price
- self.local_coin += filled - fee
- elif side == "pd": # sell
- self.local_sell_amount += filled
- self.local_sell_value += filled * filled_price
- self.local_profit += filled * (filled_price - self.local_position_by_orders.longAvg)
- new_long_pos = float(Decimal(str(self.local_position_by_orders.longPos)) - Decimal(str(filled)))
- if new_long_pos == 0.0:
- self.local_position_by_orders.longAvg = 0.0
- self.local_position_by_orders.longPos = 0.0
- else:
- self.local_position_by_orders.longPos = new_long_pos
- self.local_cash += filled * filled_price - fee
- self.local_coin -= filled
- elif side == "pk": # buy
- self.local_buy_amount += filled - fee
- self.local_buy_value += (filled - fee) * filled_price
- self.local_profit += filled * (self.local_position_by_orders.shortAvg - filled_price)
- new_short_pos = float(Decimal(str(self.local_position_by_orders.shortPos)) - Decimal(str(filled)) - Decimal(str(fee)))
- if new_short_pos == 0.0:
- self.local_position_by_orders.shortAvg = 0.0
- self.local_position_by_orders.shortPos = 0.0
- else:
- self.local_position_by_orders.shortPos = new_short_pos
- self.local_cash -= filled * filled_price
- self.local_coin += filled - fee
- elif side == "kk": # sell
- self.local_sell_amount += filled
- self.local_sell_value += filled * filled_price
- new_short_pos = float(Decimal(str(self.local_position_by_orders.shortPos)) + Decimal(str(filled)))
- if new_short_pos == 0.0:
- self.local_position_by_orders.shortAvg = 0.0
- self.local_position_by_orders.shortPos = 0.0
- else:
- self.local_position_by_orders.shortAvg = \
- (self.local_position_by_orders.shortPos * self.local_position_by_orders.shortAvg + filled * filled_price) / new_short_pos
- self.local_position_by_orders.shortPos = new_short_pos
- self.local_cash += filled * filled_price - fee
- self.local_coin -= filled
- else:
- self.logger.error(f"错误的仓位方向{side}")
- else:
- ### 合约订单流仓位计算
- if side == "kd":
- self.local_buy_amount += filled
- self.local_buy_value += filled * filled_price
- new_long_pos = (self.local_position_by_orders.longPos + filled)
- if new_long_pos == 0.0:
- self.local_position_by_orders.longAvg = 0
- self.local_position_by_orders.longPos = 0
- else:
- self.local_position_by_orders.longAvg = \
- (self.local_position_by_orders.longPos * self.local_position_by_orders.longAvg + filled * filled_price) / new_long_pos
- self.local_position_by_orders.longPos = float(Decimal(str(self.local_position_by_orders.longPos)) + Decimal(str(filled)))
- elif side == "kk":
- self.local_sell_amount += filled
- self.local_sell_value += filled * filled_price
- new_short_pos = (self.local_position_by_orders.shortPos + filled)
- if new_short_pos == 0.0:
- self.local_position_by_orders.shortAvg = 0
- self.local_position_by_orders.shortPos = 0
- else:
- self.local_position_by_orders.shortAvg = \
- (self.local_position_by_orders.shortPos * self.local_position_by_orders.shortAvg + filled * filled_price) / new_short_pos
- self.local_position_by_orders.shortPos = float(Decimal(str(self.local_position_by_orders.shortPos)) + Decimal(str(filled)))
- elif side == "pd":
- self.local_sell_amount += filled
- self.local_sell_value += filled * filled_price
- self.local_profit += filled * (filled_price - self.local_position_by_orders.longAvg)
- self.local_position_by_orders.longPos = float(Decimal(str(self.local_position_by_orders.longPos)) - Decimal(str(filled)))
- if self.local_position_by_orders.longPos == 0:self.local_position_by_orders.longAvg = 0
- elif side == "pk":
- self.local_buy_amount += filled
- self.local_buy_value += filled * filled_price
- self.local_profit += filled * (self.local_position_by_orders.shortAvg-filled_price)
- self.local_position_by_orders.shortPos = float(Decimal(str(self.local_position_by_orders.shortPos)) - Decimal(str(filled)))
- if self.local_position_by_orders.shortPos == 0:self.local_position_by_orders.shortAvg = 0
- else:
- self.logger.error(f"错误的仓位方向{side}")
- # 统计合约交易手续费 正fee为扣手续费 负fee为返佣
- if 'fee' in data:
- self.local_profit -= data['fee']
- self.logger.debug('更新推算仓位'+str(self.local_position_by_orders.__dict__))
- ###
- self._print_local_trades_summary()
- # 每次有订单变动就触发一次策略
- if self.mode_signal == 0 and self.ready:
- ### 更新交易数据
- self.update_trade_msg()
- ### 触发策略挂单逻辑
- # 更新策略时间
- self.strategy.local_time = time.time()
- orders = self.strategy.onTime(self.tradeMsg)
- # if (('Limits_open' in orders and len(orders['Limits_open']) != 0) or
- # ('Limits_close' in orders and len(orders['Limits_close']) != 0)):
- # self.logger.info("--------------------------------update_local_order订单指令----------------------------")
- # self.logger.info(orders)
- # self.logger.info("-------------------------------------------end--------------------------------------")
- ### 记录指令触发信息
- if self._not_empty(orders):
- self.logger.debug("触发onOrder")
- self._update_local_orders(orders)
- self.loop.create_task(self.rest.handle_signals(orders))
- self.logger.debug(orders)
- else:
- self.logger.debug(f"订单已经参与过仓位计算 拒绝重复进行计算{data['client_id']}")
- else:
- self.logger.debug(f"订单不属于本策略 拒绝进行仓位计算{data['client_id']}")
- # 移除本地订单
- if data["client_id"] in self.local_orders:
- self.logger.debug(['删除本地订单', data["client_id"]])
- del(self.local_orders[data["client_id"]])
- else:
- self.logger.debug(['该订单不在本地挂单表中', data["client_id"]])
- else:
- print(data)
- self.logger.debug(f"未知的订单事件类型 {data}")
- except Exception as e:
- print("处理订单推送出错:"+str(e))
- self.logger.error("处理订单推送出错:"+str(e))
- self.logger.error(traceback.format_exc())
- self.exit_msg="处理订单推送出错"
- self.stop()
- def _update_local_orders(self, orders):
- """
- 本地记录所有报单信息
- """
- try:
- for i in orders:
- if "Limits" in i:
- for j in orders[i]:
- order_info = dict()
- order_info['symbol'] = self.symbol
- order_info['amount'] = float(j[0])
- order_info['side'] = j[1]
- order_info['price'] = float(j[2])
- order_info['client_id'] = j[3]
- order_info['filled_price'] = 0
- order_info['filled'] = 0
- order_info['order_id'] = ""
- order_info['localtime'] = self.strategy.local_time
- order_info['createtime'] = self.strategy.local_time
- self.local_orders[j[3]] = order_info # 本地挂单表
- self.logger.debug(['新增本地订单', order_info])
- self.local_orders_backup[j[3]] = order_info # 本地缓存表
- self.local_orders_backup_cid.append(j[3]) # 本地缓存cid表
- if 'Cancel' in i:
- # 记录撤单次数
- cid = orders[i][0]
- if cid in self.local_cancel_log:
- self.local_cancel_log[cid] += 1
- else:
- self.local_cancel_log[cid] = 0
- # 清除过于久远的历史记录
- if len(self.local_orders_backup_cid) > 9999:
- cid = self.local_orders_backup_cid[0]
- # 判断是否超过1个小时 如果超过则移除历史记录
- if cid in self.local_orders_backup:
- if time.time() - self.local_orders_backup[cid]["localtime"] > 3600:
- del(self.local_orders_backup[cid])
- del(self.local_orders_backup_cid[0])
- if len(self.handled_orders_cid) > 9999:
- del(self.handled_orders_cid[0])
- except:
- self.logger.error("本地记录订单信息出错")
- self.logger.error(traceback.format_exc())
- self.exit_msg="本地记录订单信息出错"
- self.stop()
- def _not_empty(self, orders):
- '''检查指令是否不为空'''
- if isinstance(orders, dict):
- for order_name in orders:
- if "Cancel" in order_name or "Check" in order_name:
- return 1
- elif "Limits_open" in order_name:
- if len(orders["Limits_open"]) > 0:
- return 1
- elif "Limits_close" in order_name:
- if len(orders["Limits_close"]) > 0:
- return 1
- return 0
- def _print_local_trades_summary(self):
- '''计算本地累计利润'''
- ###
- local_buy_amount = round(self.local_buy_amount,5)
- local_buy_value = round(self.local_buy_value,5)
- local_sell_amount = round(self.local_sell_amount,5)
- local_sell_value = round(self.local_sell_value,5)
- local_profit = 0.0
- if isinstance(self.strategy.mp, float):
- unrealized = (local_buy_amount - local_sell_amount) * self.strategy.mp
- realized = local_sell_value - local_buy_value
- local_profit = round(unrealized+realized,5)
- self.strategy.local_profit = local_profit
- ###
- msg = f"买量{local_buy_amount} 卖量{local_sell_amount} 买额{local_buy_value} 卖额{local_sell_value} 利润 {local_profit}"
- self.logger.info(msg)
- def update_position(self, data):
- '''
- 更新仓位信息
- '''
- if data != self.local_position:
- self.local_position = data
- self.logger.debug('更新本地仓位'+str(self.local_position.__dict__))
- """
- 2023-2-22
- 用create_task去执行,会延迟,占用越大,延迟越大,可能会延迟100ms计算
- """
- def update_ticker(self, data):
- '''
- 增加onticker撤单 可能会导致平仓难度加大
- '''
- self.loop.create_task(self._update_ticker(data))
- def update_depth(self, data):
- self.loop.create_task(self._update_depth(data))
- async def _update_ticker(self, data):
- '''
- update ticker infomation
- '''
- name = data['name']
- # 记录tick更新时间
- # self.market_update_time[name] = time.time()
- self.tickers[name] = data
- ### 判断是否需要触发ontick
- if name == self.ref_name[self.strategy.ref_index]:
- pass
- elif name == self.trade_name:
- pass
- else:
- pass
- # @utils.timeit
- async def _update_depth(self, data):
- '''
- update orderbook infomation
- '''
- name = data['name']
- now_time = time.time()
- if self.market_update_time[name] == 0.0:
- pass
- else:
- interval = now_time - self.market_update_time[name]
- if self.market_update_interval[name] == 0.0:
- self.market_update_interval[name] = interval
- else:
- self.market_update_interval[name] = self.market_update_interval[name]*0.999 + interval*0.001
- self.market_update_time[name] = now_time
- ### 初始化depths
- if self.depths[name] == list():
- self.depths[name] = data['data']
- ### 判断是否需要触发ondepth
- # 如果是交易盘口
- if name == self.trade_name:
- ### 更新depths
- self.depths[name] = data['data']
- # 允许交易
- if self.mode_signal == 0 and self.ready:
- ### 聚合行情处理
- self.on_agg_market()
- ### 判断是否为当前跟踪的盘口
- elif name == self.ref_name[self.strategy.ref_index]:
- ### 判断是否需要触发ontick 对行情进行过滤
- ### 过滤条件 价格变化很大 时间间隔很长
- flag = 0
- if abs(data['data'][utils.BP_INDEX] - self.depths[name][utils.BP_INDEX])/data['data'][utils.BP_INDEX] > 0.0002 or \
- abs(data['data'][utils.AP_INDEX] - self.depths[name][utils.AP_INDEX])/data['data'][utils.AP_INDEX] > 0.0002 or \
- time.time() - self.on_tick_event_time > 0.05:
- ### 允许交易
- flag = 1
- ### 更新ontick触发时间记录
- self.on_tick_event_time = time.time()
- ### 更新depths
- self.depths[name] = data['data']
- # 允许交易
- if self.mode_signal == 0 and self.ready and flag:
- ### 更新交易数据
- self.update_trade_msg()
- ### 触发事件撤单逻辑
- # 更新策略时间
- self.strategy.local_time = time.time()
- # 产生交易信号
- orders = self.strategy.onTime(self.tradeMsg)
- # if (('Limits_open' in orders and len(orders['Limits_open']) != 0) or
- # ('Limits_close' in orders and len(orders['Limits_close']) != 0)):
- # self.logger.info("--------------------------------_update_depth订单指令--------------------------------")
- # self.logger.info(orders)
- # self.logger.info("-------------------------------------------end--------------------------------------")
- ### 记录指令触发信息
- if self._not_empty(orders):
- self.logger.debug("触发onTick")
- self._update_local_orders(orders)
- self.loop.create_task(self.rest.handle_signals(orders))
- self.logger.debug(orders)
- else:
- pass
- # @timeit
- async def real_time_back_test(self, data):
- '''
- 按照长短期回测利润选择参数
- 优先按长期回测利润选参数 如果找不到就
- 再按短期回测利润选参数 如果还找不到就
- 使用默认参数 如果默认参数亏损就触发冷静期
- '''
- now_time = time.time()
- await asyncio.sleep(0.005)
- for i in self.backtest_tasks:
- i["backtest_engine"].backtest_time = now_time
- i["backtest_engine"].run_by_tick(data)
- def choose_params(self):
- '''
- 按照长短期回测利润选择参数
- 优先按长期回测利润选参数 如果找不到就
- 再按短期回测利润选参数 如果还找不到就
- 使用默认参数 如果默认参数亏损就触发冷静期
- '''
- profits = []
- for i in self.backtest_tasks:
- # 获取绩效信息
- e = i["backtest_engine"].equity # 最终净值
- # 计算标准化利润
- p = (e-self.backtest_start_cash) / self.backtest_start_cash \
- / self.backtest_look_length * self.tick_profit_to_daily
- # 有一定成交次数的回测结果才有代表性 持仓太久的参数禁止使用
- _trade_num = i['backtest_engine'].trade_num
- _avg_hold_time = i['backtest_engine'].avg_hold_time
- _equity_high = i['backtest_engine'].equity_high
- # 排除交易次数太少的参数
- if i['open'] <= 0.002:
- if _trade_num < 10:
- p = 0.0
- # 排除长期持仓的参数
- if _avg_hold_time > 600:
- p = 0.0
- # 排除近期回撤较大的参数
- if _equity_high > e*1.01:
- p = 0.0
- profits.append(p) #利润
- ############## 重置回测
- # if _trade_num > 200:
- # i["backtest_engine"].trade_num = 0
- # i["backtest_engine"].equity = self.backtest_start_cash
- # 盈利参数个数不能太少 防止孤岛参数
- win_num = 0
- for i in profits:
- if i > 0.0:
- win_num += 1
- cond1 = win_num > self.backtest_num*0.1
- cond2 = win_num > 2
- cond_win = cond1 and cond2
- if cond_win:
- # 按最优回测结果调整参数
- max_profit = max(profits)
- max_index = profits.index(max_profit)
- self.strategy.trade_open_dist = self.backtest_tasks[max_index]["open"]
- self.strategy.trade_close_dist = self.backtest_tasks[max_index]["close"]
- self.strategy.ref_index = self.backtest_tasks[max_index]["index"]
- self.strategy.post_side = self.backtest_tasks[max_index]["side"]
- self.strategy.predict_alpha = self.backtest_tasks[max_index]["alpha"]
- # 检查是否需要关闭回测
- # if self.strategy.ready == 1:
- # self.backtest = 0
- else:
- # 如果没有符合条件的盈利参数
- self.strategy.trade_open_dist = 0.01
- self.strategy.trade_close_dist = 0.00001
- self.strategy.ref_index = 0
- self.strategy.post_side = 0
- self.strategy.predict_alpha = 0
- # 检查是否需要关闭回测
- # if self.strategy.ready == 1:
- # self.backtest = 0
- # self.exit_msg = "未找到合适参数 停机"
- # self.stop()
- return
- def update_equity(self, data):
- '''
- 更新保证金信息
- 合约一直更新
- 现货只有当出现异常时更新
- '''
- if "spot" in self.exchange:
- pass
- else:
- self.local_cash = data[self.quote] * self.used_pct
- def update_exit(self, data):
- '''
- 底层触发停机
- '''
- self.exit_msg = data
- self.stop()
- def get_all_market_data(self):
- '''
- 只能定时触发
- 组合市场信息=交易盘口+参考盘口
- '''
- market = []
- data = self.ws._get_data()["data"]
- market += data
- for i in self.ref_name:
- data = self.ws_ref[i]._get_data()["data"]
- market += data
- # handle save real market data
- if self.save:
- with open(f'./{self.csvname}.csv',
- 'a',
- newline='',
- encoding='utf-8') as f:
- writer = csv.writer(f, delimiter=',')
- writer.writerow(market)
- return market
- async def before_trade(self):
- ####### 启动ws #######
- # 启动交易ws
- # 当开启回测时才订阅交易盘口的成交流
- _sub_trade = int(self.params.backtest)
- _sub_fast = int(self.params.fast)
- # TODO 这是task1,要在这个交易所交易,pycharm定位有问题,这个ws不一定指向binance
- self.loop.create_task(self.ws.run(is_auth=1, sub_trade=_sub_trade, sub_fast=0))
- # TODO 这是task n,用来做参考
- for i in self.ref_name:
- # 启动参考ws 参考盘口使用fast行情性能消耗更大 使用普通行情可以节省性能
- self.loop.create_task(self.ws_ref[i].run(is_auth=0, sub_trade=0, sub_fast=_sub_fast))
- await asyncio.sleep(1)
- ###### 做交易前准备工作 ######
- # 买入平台币
- # TODO v9情况下买入平台币会怎么样?
- await self.rest.buy_token()
- await asyncio.sleep(1)
- # 清空挂单和仓位
- await self.rest.check_position(hold_coin=self.hold_coin)
- await asyncio.sleep(1)
- # 获取市场信息
- await self.rest.before_trade()
- await asyncio.sleep(1)
- # 获取价格信息
- ticker = await self.rest.get_ticker()
- mp = ticker['mp']
- # 获取账户信息
- await asyncio.sleep(1)
- await self.rest.get_equity()
- # 初始资金
- start_cash = self.rest.cash_value * self.used_pct
- start_coin = self.rest.coin_value * self.used_pct
- if start_cash == 0.0 and start_coin == 0.0:
- self.exit_msg = f"初始为零 cash: {start_cash} coin: {start_coin}"
- self.stop()
- self.logger.info(f"初始cash: {start_cash} 初始coin: {start_coin}")
- # 初始化策略基础信息
- if isinstance(mp, float):
- if mp <= 0.0:
- self.exit_msg = f"初始价格获取错误 {mp}"
- self.stop()
- else:
- print(f"初始价格为 {mp}")
- else:
- self.exit_msg = f"初始价格获取错误 {mp}"
- self.stop()
- self.strategy.mp = mp
- self.strategy.start_cash = start_cash
- self.strategy.start_coin = start_coin
- self.strategy.start_equity = start_cash + start_coin * mp
- self.strategy.max_equity = self.strategy.start_equity
- self.strategy.equity = self.strategy.start_equity
- self.strategy.total_amount = self.strategy.equity * self.strategy.leverrate / self.strategy.mp
- self.strategy.stepSize = self.rest.stepSize if self.rest.stepSize < 1.0 else int(self.rest.stepSize)
- self.strategy.tickSize = self.rest.tickSize if self.rest.tickSize < 1.0 else int(self.rest.tickSize)
- if self.strategy.stepSize == None or self.strategy.tickSize == None:
- self.exit_msg = f"交易精度未正常获取 stepsize: {self.strategy.stepSize} ticksize: {self.strategy.tickSize}"
- self.stop()
- else:
- self.logger.info(f"数量精度{self.strategy.stepSize}")
- self.logger.info(f"价格精度{self.strategy.tickSize}")
- grid = float(self.params.grid)
- # 计算下单数量
- if "spot" in self.exchange:
- long_one_hand_value = start_cash * float(self.params.leverrate) / grid
- short_one_hand_value = start_coin * mp * float(self.params.leverrate) / grid
- long_one_hand_amount = float(Decimal(str(long_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize)))
- short_one_hand_amount = float(Decimal(str(short_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize)))
- else:
- long_one_hand_value = start_cash * float(self.params.leverrate) / grid
- short_one_hand_value = start_cash * float(self.params.leverrate) / grid
- long_one_hand_amount = float(Decimal(str(long_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize)))
- short_one_hand_amount = float(Decimal(str(short_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize)))
- # 检查是否满足最低交易要求
- print(f"最低单手交易下单量为 buy: {long_one_hand_amount} sell: {short_one_hand_amount}")
- if (long_one_hand_amount == 0 and short_one_hand_amount == 0) or (long_one_hand_value < 20 and short_one_hand_value < 20):
- self.exit_msg = f"初始下单量太少 buy: {long_one_hand_amount} sell: {short_one_hand_amount}"
- self.stop()
- # 初始化调度器
- self.local_cash = start_cash
- self.local_coin = start_coin
- # 配置在线训练
- if self.backtest:
- # 设置策略默认参数
- self.strategy.trade_close_dist = 0.00001
- self.strategy.trade_open_dist = 0.01
- self.backtest_look_length = 86400 / self.interval # 回测区间足够长
- self.backtest_tasks = list()
- self.tick_profit_to_daily = (86400/self.interval)
- self.backtest_start_cash = 1000000.0
- # 备选参数
- open_list, close_list, alpha_list = utils.get_backtest_set(self.base)
- if 'spot' in self.exchange:
- side_list = []
- if long_one_hand_amount > 0:
- side_list.append(1)
- if short_one_hand_amount > 0:
- side_list.append(-1)
- if 1 in side_list and -1 in side_list:
- side_list.append(0)
- else:
- side_list = [-1,0,1]
- side_list_allow = []
- for s in side_list:
- if s in utils.POST_SIDE_LIMIT:
- side_list_allow.append(s)
- side_list = side_list_allow
- for _open in open_list:
- for _side in side_list:
- for _close in close_list[_open]:
- for _index in range(self.ref_num):
- for _alpha in alpha_list:
- task = dict()
- st = strategy.Strategy(self.params, is_print=0)
- st.leverrate = 1.0
- st.trade_open_dist = _open
- st.trade_close_dist = _close
- st.predict_alpha = _alpha
- st.ref_index = _index
- st.post_side = _side
- st.exchange = "dummy_usdt_swap"
- st.local_start_time = 0.0
- bt = backtest.Backtest(st, is_plot=0)
- bt.start_cash = self.backtest_start_cash
- task["backtest_engine"] = bt
- task["open"] = _open
- task["close"] = _close
- task["index"] = _index
- task["side"] = _side
- task["alpha"] = _alpha
- self.backtest_tasks.append(task)
- backtest_num = len(self.backtest_tasks)
- self.backtest_num = backtest_num
- self.logger.info(f'在线模拟撮合数量{backtest_num}')
- self.logger.info(f'当前为在线训练模式 需预热{utils.BACKTEST_PREHOT_SECOND}秒 请耐心等候...')
- else:
- self.logger.info('当前为指定参数模式...')
- ###### 交易前准备就绪 可以开始交易 ######
- self.loop.create_task(self.rest.go())
- self.loop.create_task(self.on_timer())
- self.loop.create_task(self._run_server())
- self.loop.create_task(self.run_stratey())
- #self.loop.create_task(self.post_loop()) #改
- self.loop.create_task(self.early_stop_loop())
- def update_trade_msg(self):
- # 更新保证金
- self.tradeMsg.cash = round(self.local_cash,10)
- self.tradeMsg.coin = round(self.local_coin,10)
- # 使用本地推算仓位
- self.tradeMsg.position = self.local_position_by_orders
- # 更新订单
- self.tradeMsg.orders = self.local_orders
- ### 更新 ref
- ref_tickers = []
- for i in self.ref_name:
- ref_tickers.append([self.tickers[i]['bp'], self.tickers[i]['ap']])
- self.tradeMsg.ref_price = self.Predictor.Get_ref(ref_tickers)
- # logger.info('ref_price={}, market={}, predict={}'.format(
- # self.tradeMsg.ref_price, self.tradeMsg.market, self.tradeMsg.predict))
- async def server_handle(self, request):
- '''中控数据接口'''
- if 'spot' in self.exchange:
- pos = self.local_position_by_orders.longPos - self.local_position_by_orders.shortPos
- else:
- pos = self.local_position.longPos - self.local_position.shortPos
- if pos > 0.0:
- entryPrice = self.local_position_by_orders.longAvg
- elif pos < 0.0:
- entryPrice = self.local_position_by_orders.shortAvg
- else:
- entryPrice = 0
- return web.Response(body=json.dumps({
- "now_balance": round(self.strategy.equity/self.used_pct, 4), #钱包余额
- "unrealized_pn_l": round(self.local_profit, 4), #未实现盈利
- "pos": round(pos, 8), #持仓数量
- "entry_price": round(entryPrice, 8), #开仓价格
- "now_price": round(self.strategy.mp, 8), #当前价格
- }))
- async def change(self, request):
- '''中控台修改参数'''
- try:
- data = await request.json()
- if "stop" in data:
- self.logger.warning('中控停机')
- self.exit_msg = '中控停机'
- self.stop()
- return web.Response(text=f"停机成功")
-
- ip = request.remote
- print(f'从{ip}收到更新参数请求',data)
- if isinstance(data, str):
- data = json.loads(data)
- if self.backtest == 1:
- return web.Response(text="自动调参模式不允许手动修改参数")
- else:
- open = float(data['open'])
- close = float(data['close'])
- self.strategy.trade_open_dist = open
- self.strategy.trade_close_dist = close
- return web.Response(text=f"参数修改成功 {open} {close}")
- except Exception as e:
- return web.Response(text=f"参数修改失败 {e}")
- # @utils.timeit
- def check_risk(self):
- '''检查风控'''
- if self.strategy.start_cash == 0.0:
- print("请检查交易账户余额")
- return 0
- if isinstance(self.strategy.mp, float):
- pass
- else:
- print("请检查最新价格")
- return 0
- ############
- # print("当前线程数",self.process.num_threads())
- ###### 资源风控0 ######
- cpu_pct = psutil.cpu_times_percent().user
- self.cpu_ema = self.cpu_ema * 0.8 + cpu_pct * 0.2
- # print(f"cpu占用 {cpu_pct}")
- if self.cpu_ema > 95:
- msg = f"cpu占用过高 {self.cpu_ema} 准备停机"
- print(msg)
- self.logger.warning(msg)
- self.exit_msg = msg
- self.stop()
- mm_pct = psutil.virtual_memory().percent
- self.mm_ema = self.mm_ema * 0.8 + mm_pct * 0.2
- # print(f"内存占用 {mm_pct}")
- if self.mm_ema > 95:
- msg = f"内存占用过高 {self.mm_ema} 准备停机"
- print(msg)
- self.logger.warning(msg)
- self.exit_msg = msg
- self.stop()
- ###### 回撤风控1 ######
- if "spot" not in self.exchange:
- draw_back = 1-self.strategy.equity/self.strategy.max_equity
- if draw_back > self.stoploss:
- msg = f"{self.acct_name} 总资金吊灯回撤{draw_back} 当前{self.strategy.equity} 最高{self.strategy.max_equity} 触发止损 准备停机"
- print(msg)
- self.logger.warning(msg)
- self.exit_msg = msg
- self.stop()
- ###### 回撤风控2 ######
- draw_back = self.local_profit/self.strategy.start_equity
- if draw_back < -self.stoploss:
- msg = f"{self.acct_name} 交易亏损 触发止损 准备停机"
- print(msg)
- self.logger.warning(msg)
- self.exit_msg = msg
- self.stop()
- ###### 报单延迟风控 ######
- if self.rest.avg_delay > 5000: # 平均延迟允许上限 5000ms
- msg = f"{self.acct_name} 延迟爆表 触发风控 准备停机"
- print(msg)
- self.logger.warning(msg)
- self.exit_msg = msg
- self.stop()
- ###### 仓位异常风控 ######
- ### 合约60秒更新一次绝对仓位 ###
- # 连续5分钟仓位不正确就停机
- # 5 * 60 = 300 300/10 = 30
- diff_pos = max(abs(self.local_position.longPos - self.local_position_by_orders.longPos),abs(self.local_position.shortPos - self.local_position_by_orders.shortPos))
- if "spot" not in self.exchange:
- diff_pos_value = diff_pos * self.strategy.mp
- if diff_pos_value > self.strategy._min_amount_value:
- msg = f"{self.acct_name} ***发现仓位异常*** 推算{self.local_position_by_orders.__dict__} 本地{self.local_position.__dict__}"
- print(msg)
- self.logger.warning(msg)
- self.position_check_series.append(1)
- else:
- self.position_check_series.append(0)
- if len(self.position_check_series) > 30:
- del(self.position_check_series[0])
- if sum(self.position_check_series) >= 30:
- msg = f"{self.acct_name} 合约连续检查本地仓位和推算仓位不相符 退出"
- print(msg)
- self.logger.warning(msg)
- self.exit_msg = msg
- self.stop()
- ###### 下单异常风控 ######
- if self.strategy.total_amount == 0.0:
- msg = f"{self.acct_name} 开仓量为零 退出"
- print(msg)
- self.logger.warning(msg)
- self.exit_msg = msg
- self.stop()
- ###### 行情更新异常风控 ######
- for name in self.ref_name:
- delay = round((time.time() - self.market_update_time[name]) * 1000, 3)
- if delay > utils.MARKET_DELAY_LIMIT: # thre
- msg = f"{self.acct_name} ticker_name:{name} delay:{delay}ms 行情更新延迟过高 退出"
- self.logger.error(msg)
- self.exit_msg = msg
- self.stop()
- for name in [self.trade_name]:
- delay = round((time.time() - self.market_update_time[name]) * 1000, 3)
- if delay > utils.MARKET_DELAY_LIMIT: # thre
- msg = f"{self.acct_name} ticker_name:{name} delay:{delay}ms 行情更新延迟过高 退出"
- self.logger.error(msg)
- self.exit_msg = msg
- self.stop()
- ###### 订单异常风控 ######
- for cid in self.local_orders:
- if time.time() - self.local_orders[cid]["localtime"] > 300: # 订单长时间停留 怀疑漏单 但未必一定漏 5min
- msg = f"{self.acct_name} cid:{cid} 订单停留过久 怀疑异常 退出"
- self.logger.error(msg)
- self.exit_msg = msg
- self.stop()
- ###### 持仓均价异常风控 ######
- if isinstance(self.strategy.long_pos_bias, float):
- # 偏离mp较大 且持仓较大 说明出现异常
- if self.strategy.long_hold_value > 2*self.strategy._min_amount_value:
- if self.strategy.long_pos_bias > 4.0 or self.strategy.long_pos_bias < -2.0:
- msg = f"{self.acct_name} long_pos_bias:{self.strategy.long_pos_bias} 持仓均价异常 退出"
- self.logger.error(msg)
- self.exit_msg = msg
- self.stop()
- if isinstance(self.strategy.short_pos_bias, float):
- # 偏离mp较大 且持仓较大 说明出现出现异常
- if self.strategy.short_hold_value > 2*self.strategy._min_amount_value:
- if self.strategy.short_pos_bias > 4.0 or self.strategy.short_pos_bias < -2.0:
- msg = f"{self.acct_name} short_pos_bias:{self.strategy.short_pos_bias} 持仓均价异常 退出"
- self.logger.error(msg)
- self.exit_msg = msg
- self.stop()
- ###### 订单撤单异常风控 ######
- for cid in self.local_cancel_log:
- if self.local_cancel_log[cid] > 300:
- msg = f"{self.acct_name} 订单长时间无法撤销 退出"
- self.logger.error(msg)
- self.exit_msg = msg
- self.stop()
- ###### 定价异常风控 ######
- if abs(self.strategy.ref_price-self.strategy.mp)/self.strategy.mp > 0.03:
- msg = f"{self.acct_name} 定价偏离过大 怀疑异常 退出"
- self.logger.error(msg)
- self.exit_msg = msg
- self.stop()
- async def exit(self, delay=0):
- '''退出操作'''
- try:
- self.logger.info(f"预约退出操作 delay:{delay}")
- if delay > 0:
- await asyncio.sleep(delay)
- self.logger.info(f"开始退出操作")
- self.logger.info("为避免api失效导致遗漏仓位 建议人工复查")
- await self.rest.check_position(hold_coin=self.hold_coin)
- # stop flag
- self.rest.stop_flag = 1
- self.ws.stop_flag = 1
- for i in self.ref_name:
- self.ws_ref[i].stop_flag = 1
- # double check 需要延迟几秒以便等待更新数据
- await asyncio.sleep(3)
- self.logger.info("双重检查遗漏仓位")
- await self.rest.check_position(hold_coin=self.hold_coin)
- self.logger.info(f'停机退出 停机原因 {self.exit_msg}')
- await asyncio.sleep(1)
- # 发送交易状态
- await self._post_params()
- # 压缩行情文件
- utils.csv_to_gz_and_remove()
- # close pid
- self.logger.info("退出进程")
- except:
- self.logger.error(traceback.format_exc())
- finally:
- os._exit(0)
- async def on_timer(self):
- '''定期触发系统逻辑'''
- await asyncio.sleep(20)
- while 1:
- try:
- # 10秒检查一次风控
- await asyncio.sleep(10)
- # 检查风控
- self.check_risk()
- # stop
- if self.mode_signal == 1:return
- # 计算预估成交额
- total_trade_value = self.local_buy_value + self.local_sell_value
- self.strategy.trade_vol_24h = round(total_trade_value / (time.time()-self.pid_start_time) * 86400 / 10000, 2)
- # 打印
- if int(self.params.log):
- self.strategy._print_summary()
- # 打印行情延迟监控
- self.logger.info('Rest 报单平均延迟 ' + str(self.rest.avg_delay) + 'ms ')
- self.logger.info('Rest 报单最高延迟 ' + str(self.rest.max_delay) + 'ms ')
- for name in self.market_update_interval:
- avg_interval = round(self.market_update_interval[name]*1e3, 2)
- self.logger.info(f'WS 盘口{name}行情 平均更新间隔 {avg_interval}ms')
- # 选择参数
- if self.backtest:
- self.choose_params()
- except asyncio.CancelledError:
- print('定期循环任务取消')
- except:
- print("定时循环系统出错")
- self.logger.error(traceback.print_exc())
- await asyncio.sleep(10)
- async def _post_params(self):
- '''推送交易信息'''
- profit = round(self.strategy.daily_return/self.strategy.leverrate,4)
- if time.time() - self.pid_start_time > utils.EARLY_STOP_SECOND * 0.5 or profit < 0.0:
- await utils._post_params(
- "http://wwww.khods.com:8888/post_params",
- self.params.proxy,
- ujson.dumps({
- "pwd":"123456",
- "exchange":self.params.exchange,
- "pair":self.params.pair,
- "open":self.params.open,
- "close":self.params.close,
- "refexchange":self.params.refexchange[self.strategy.ref_index],
- "profit":profit,
- })
- )
- else:
- self.logger.info("不满足推送过滤条件 放弃推送参数")
- async def post_loop(self):
- '''定期触发交易信息推送'''
- await asyncio.sleep(30)
- _interval = 60 # 定期推送一次盈利情况
- while 1:
- try:
- # 定期推送一次
- await asyncio.sleep(_interval)
- # 发送交易状态
- await self._post_params()
- except asyncio.CancelledError:
- print('post loop 循环任务取消')
- except:
- print("post loop 循环系统出错")
- self.logger.error(traceback.print_exc())
- await asyncio.sleep(10)
-
- async def early_stop_loop(self):
- '''定期触发交易信息推送'''
- if self.father:
- self.logger.info(f'以父进程方式启动 关闭早停检测')
- return
- else:
- self.logger.info(f'以子进程方式启动 开启早停检测')
- await asyncio.sleep(30)
- _interval = utils.EARLY_STOP_SECOND
- _last_equity = self.strategy.start_equity
- _last_local_profit = 0.0
- while 1:
- try:
- # 休眠
- await asyncio.sleep(_interval)
- ###### 子进程早停风控 ######
- self.logger.info(f'当前净值{self.strategy.equity} 上次检测时净值{_last_equity} 当前累积利润{self.local_profit} 上次检测时利润{_last_local_profit}')
- # 检查是否需要早停 没有成交 或者 亏损
- if self.strategy.equity <= _last_equity or self.local_profit <= _last_local_profit:
- self.logger.info('触发早停条件 当零持仓时退出')
- # 没有持仓
- for _ in range(30):
- await asyncio.sleep(5)
- if self.strategy.long_hold_value < self.strategy._min_amount_value and \
- self.strategy.short_hold_value < self.strategy._min_amount_value:
- msg = f"{self.acct_name} 子进程盈利状况不理想 提前停机 退出"
- self.logger.error(msg)
- self.exit_msg = msg
- self.stop()
- # 更新上一次检测的净值
- _last_equity = self.strategy.equity
- _last_local_profit = self.local_profit
- except asyncio.CancelledError:
- print('early stop 循环任务取消')
- except:
- print("early stop 循环系统出错")
- self.logger.error(traceback.print_exc())
- await asyncio.sleep(10)
- def on_agg_market(self):
- '''
- 处理聚合行情
- 1. 获取聚合行情
- 2. 更新预测器
- 3. 触发tick回测
- '''
- ### 更新聚合市场数据
- agg_market = self.get_all_market_data()
- ### 更新聚合市场信息
- self.tradeMsg.market = agg_market
- ### 更新预测器
- self.Predictor.onTime(agg_market)
- ### 触发回测
- if self.backtest:
- self.loop.create_task(self.real_time_back_test(self.tradeMsg))
- async def run_stratey(self):
- '''
- 定期触发策略
- '''
- print('定时触发器启动')
- # 准备交易
- try:
- print('前期准备完成')
- await asyncio.sleep(10)
- while 1:
- try:
- # 时间预设
- start_time = time.time()
- ### 是否准备充分
- if self.ready:
- ### 更新交易信息集合
- self.update_trade_msg()
- ### 触发策略
- if self.mode_signal == 0:
- pass
- # # 更新策略时间
- # self.strategy.local_time = time.time()
- # # 产生信号
- # orders = self.strategy.onTime(self.tradeMsg)
- # ### 记录指令触发信息
- # if self._not_empty(orders):
- # self.logger.debug("触发onTime")
- # self._update_local_orders(orders)
- # self.loop.create_task(self.rest.handle_signals(orders))
- # self.logger.debug(orders)
- else:
- if self.mode_signal > 1:self.mode_signal -= 1
- if self.mode_signal == 1:return
- # 触发策略
- # 更新策略时间
- self.strategy.local_time = time.time()
- # 获取信号
- # TODO mode_signal∈[21, +无穷) 表示什么?
- if self.mode_signal > 20:
- # 先执行onExit
- orders = self.strategy.onExit(self.tradeMsg)
- ### 记录指令触发信息
- if self._not_empty(orders):
- self.logger.debug("触发onExit")
- self._update_local_orders(orders)
- self.loop.create_task(self.rest.handle_signals(orders))
- self.logger.debug(orders)
- # TODO mode_signal∈[2, 20] 表示什么?
- else:
- # 再执行onSleep
- orders = self.strategy.onSleep(self.tradeMsg)
- ### 记录指令触发信息
- if self._not_empty(orders):
- self.logger.debug("触发onSleep")
- self._update_local_orders(orders)
- self.loop.create_task(self.rest.handle_signals(orders))
- self.logger.debug(orders)
- ############################################################
- else:
- self.check_ready()
- ### 计算耗时并进行休眠
- pass_time = time.time()-start_time
- await asyncio.sleep(utils.clip(self.interval-pass_time, 0.0, 1.0))
- except asyncio.CancelledError:
- print('策略触发任务取消')
- except:
- self.logger.error(traceback.format_exc())
- traceback.print_exc()
- await asyncio.sleep(10)
- except asyncio.CancelledError:
- print('策略触发任务取消')
- except:
- self.logger.error(traceback.format_exc())
- traceback.print_exc()
- await asyncio.sleep(10)
-
- def check_ready(self):
- '''
- 判断初始数据是否齐全
- '''
- ### 检查 ticker 行情
- for i in self.ref_name:
- if i not in self.tickers or self.tickers[i] == {}:
- print("参考盘口ticker未准备好")
- return
- else:
- if self.tickers[i]['bp'] == 0 or self.tickers[i]['ap'] == 0:
- print("参考盘口ticker未准备好")
- return
- if self.trade_name not in self.tickers or self.tickers[self.trade_name] == {}:
- print("交易盘口ticker未准备好")
- return
- else:
- if self.tickers[self.trade_name]['bp'] == 0 or self.tickers[self.trade_name]['ap'] == 0:
- print("交易盘口ticker未准备好")
- return
- ### 检查 market 行情
- all_market = self.get_all_market_data()
- if len(all_market) != utils.LEN*(1+self.ref_num):
- print("聚合行情未准备好")
- return
- else:
- # 如果行情已经就绪 预热trademsg和predictor
- print("聚合行情准备就绪")
- self.tradeMsg.market = all_market
- self.Predictor.onTime(all_market)
- self.ready = 1
- def stop(self):
- '''
- 停机函数
- mode_signal 不能小于80
- 前6秒用于maker平仓
- 后2秒用于撤maker平仓单
- 休眠2秒再执行check_position 避免卡单导致漏仓位
- '''
- self.logger.info(f'进入停机流程...')
- self.mode_signal = 80
- # 等strategy onExit 彻底执行完毕 进入沉默状态之后 再进入exit 否则可能导致多处同时操作订单
- # 尽量减少大仓位直接take平
- self.loop.create_task(self.exit(delay=10))
- async def _run_server(self):
- print('server正在启动...')
- for _ in range(30):
- await asyncio.sleep(5)
- if self.strategy.equity > 0.0:break
- app = web.Application()
- app.router.add_route('GET', '/account', self.server_handle)
- app.router.add_route('POST', '/change', self.change)
- try:
- self.loop.create_task(web._run_app(app, host='0.0.0.0', port=self.params.server_port, handle_signals=False))
- except:
- self.logger.error(f"Server启动失败")
- self.logger.error(traceback.format_exc())
- self.exit_msg = "服务启动失败 停机退出"
- self.stop()
- def run(self):
- '''启动ws行情获取'''
- def keyboard_interrupt(s, f):
- self.logger.info("收到退出信号 准备关机")
- self.stop()
- try:
- signal.signal(signal.SIGINT, keyboard_interrupt)
- signal.signal(signal.SIGTERM, keyboard_interrupt)
- if 'win' not in sys.platform:
- signal.signal(signal.SIGKILL, keyboard_interrupt)
- signal.signal(signal.SIGQUIT, keyboard_interrupt)
- except:
- pass
- self.loop.create_task(self.before_trade())
- # TODO 启动方式干嘛用的?为什么要判断?
- print(f'判断启动方式...')
- if self.father:
- print('以父进程方式启动 最大允许运行时间为30天')
- self.loop.create_task(self.exit(delay=60*60*24*30))
- else:
- print('以子进程方式启动 最大允许运行时间为60分钟')
- self.loop.create_task(self.exit(delay=utils.CHILD_RUN_SECOND))
-
- self.loop.run_forever()
- if __name__ == "__main__":
- if 0:
- utils.check_auth()
- if 0:
- utils.check_time()
- pnum = len(sys.argv)
- if pnum > 0:
- fname = None
- log_file = None
- pidnum = None
- father = 1
- for i in range(pnum):
- print(f"第{i}个参数为:{sys.argv[i]}")
- if sys.argv[i] == '-c' or sys.argv[i] == '--c':
- fname = sys.argv[i+1]
- elif sys.argv[i] == '-h':
- print("帮助文档")
- elif sys.argv[i] == '-log_file' or sys.argv[i] == '--log_file':
- log_file = sys.argv[i+1]
- elif sys.argv[i] == '-num' or sys.argv[i] == '--num':
- pidnum = sys.argv[i+1]
- elif sys.argv[i] == '-v' or sys.argv[i] == '--v':
- print(f"当前版本为 V{VERSION}")
- elif sys.argv[i] == '-child' or sys.argv[i] == '--child':
- father = 0
- print(f"当前以子进程方式启动")
- if fname and log_file and pidnum:
- print(f"指定的配置为 fname:{fname} log_file:{log_file} pidnum:{pidnum} father:{father}")
- date = time.strftime("%Y%m%d", time.localtime())
- logname = f"{log_file}-{date}"
- quant = Quant(utils.get_params(fname), logname, father)
- quant.run()
- elif fname:
- print(f"运行指定配置文件{fname}")
- quant = Quant(utils.get_params(fname),father=father)
- quant.run()
- else:
- print("缺少指定参数 运行默认配置文件")
- fname = 'config.toml'
- quant = Quant(utils.get_params(fname),father=father)
- quant.run()
- else:
- fname = 'config.toml'
- quant = Quant(utils.get_params(fname))
- quant.run()
|