|
|
@@ -0,0 +1,1473 @@
|
|
|
+import asyncio
|
|
|
+from aiohttp import web
|
|
|
+import traceback
|
|
|
+import time, csv
|
|
|
+import strategy as strategy
|
|
|
+import utils
|
|
|
+import model
|
|
|
+import logging, logging.handlers
|
|
|
+import signal
|
|
|
+import os, json, sys
|
|
|
+import predictor
|
|
|
+import backtest
|
|
|
+import multiprocessing
|
|
|
+import random
|
|
|
+import psutil
|
|
|
+import ujson
|
|
|
+import broker
|
|
|
+from decimal import Decimal
|
|
|
+
|
|
|
+VERSION = utils.VERSION
|
|
|
+
|
|
|
+def timeit(func):
|
|
|
+ def wrapper(*args, **kwargs):
|
|
|
+ nowTime = time.time()
|
|
|
+ res = func(*args, **kwargs)
|
|
|
+ spend_time = time.time() - nowTime
|
|
|
+ spend_time = round(spend_time * 1000, 5)
|
|
|
+ print(f'{func.__name__} 耗时 {spend_time} ms')
|
|
|
+ return res
|
|
|
+ return wrapper
|
|
|
+
|
|
|
+class Quant:
|
|
|
+
|
|
|
+ def __init__(self, params:model.Config, logname="test_logname", father=1):
|
|
|
+ print('############### 超级无敌韭菜收割机 ################')
|
|
|
+ print(f'>>> 版本 {VERSION} <<<')
|
|
|
+ print('*** 当前配置')
|
|
|
+ self.params = params
|
|
|
+ for p in self.params.__dict__:
|
|
|
+ print('***', p, ' => ', getattr(self.params, p))
|
|
|
+ print('##################################################')
|
|
|
+ self.logger = self.get_logger(logname)
|
|
|
+ self.csvname = logname + ' ' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
|
|
+ pid = os.getpid()
|
|
|
+ self.pid_start_time = time.time()
|
|
|
+ self.logger.info(f"进程号{pid} 启动时间{self.pid_start_time}")
|
|
|
+ ##### 绑定cpu
|
|
|
+ cpu_count = psutil.cpu_count()
|
|
|
+ print("检测cpu核心负载")
|
|
|
+ cpu_used_pct = [0.0 for _ in range(cpu_count)]
|
|
|
+ for _ in range(random.randint(5,15)):
|
|
|
+ r = psutil.cpu_times_percent(percpu=True)
|
|
|
+ for i in range(cpu_count):
|
|
|
+ cpu_used_pct[i] += int(r[i].user)
|
|
|
+ time.sleep(1)
|
|
|
+ print(cpu_used_pct)
|
|
|
+ cpu_id = cpu_used_pct.index(min(cpu_used_pct))
|
|
|
+ print(f"当前负载最低的cpu为:{cpu_id}")
|
|
|
+ self.process = psutil.Process(pid)
|
|
|
+ print(f"核心数{cpu_count} 目标绑定cpu:{cpu_id}")
|
|
|
+ os.system(f"taskset -cp {cpu_id} {pid}")
|
|
|
+ print("调整系统调度优先级为最高等级")
|
|
|
+ if 'win' not in sys.platform:
|
|
|
+ print(os.nice(-20))
|
|
|
+ #### cpu 内存 平均占用
|
|
|
+ self.cpu_ema = 0.0
|
|
|
+ self.mm_ema = 0.0
|
|
|
+ #####
|
|
|
+ self.acct_name = self.params.account_name
|
|
|
+ self.symbol = self.params.pair
|
|
|
+ self.base = self.params.pair.split('_')[0].upper()
|
|
|
+ self.quote = self.params.pair.split('_')[1].upper()
|
|
|
+ if 1:
|
|
|
+ ### 使用uvloop
|
|
|
+ if 'win' not in sys.platform:
|
|
|
+ print('采用高速事件循环库')
|
|
|
+ import uvloop
|
|
|
+ self.loop = uvloop.new_event_loop()
|
|
|
+ else:
|
|
|
+ print('采用普通事件循环库')
|
|
|
+ self.loop = asyncio.get_event_loop()
|
|
|
+ else:
|
|
|
+ ### 使用原生loop
|
|
|
+ self.loop = asyncio.get_event_loop()
|
|
|
+ self.strategy = strategy.Strategy(self.params, is_print=1)
|
|
|
+ ###### 判断启动方式
|
|
|
+ self.father = father
|
|
|
+ print(f"父进程标识 {self.father}")
|
|
|
+ ##### 现货底仓
|
|
|
+ hold_coin = float(self.params.hold_coin)
|
|
|
+ self.hold_coin = utils.clip(hold_coin, 0.0, 10000.0)
|
|
|
+ ##### 本地状态量
|
|
|
+ self.data = dict()
|
|
|
+ self.total_equity = 0.0
|
|
|
+ self.local_orders = dict() # 本地挂单表
|
|
|
+ self.local_orders_backup = dict() # 本地订单缓存队列
|
|
|
+ self.local_orders_backup_cid = [] # 本地订单缓存cid队列
|
|
|
+ self.handled_orders_cid = [] # 本地已处理cid缓存队列
|
|
|
+ self.local_profit = 0.0
|
|
|
+ self.local_cash = 0.0 # 本地U保证金
|
|
|
+ self.local_coin = 0.0 # 本地币保证金
|
|
|
+ self.local_position = model.Position()
|
|
|
+ self.local_position_by_orders = model.Position()
|
|
|
+ self.local_buy_amount = 0.0
|
|
|
+ self.local_sell_amount = 0.0
|
|
|
+ self.local_buy_value = 0.0
|
|
|
+ self.local_sell_value = 0.0
|
|
|
+ self.local_cancel_log = dict()
|
|
|
+ self.interval = float(self.params.interval)
|
|
|
+ self.exchange = self.params.exchange
|
|
|
+ self.tradeMsg = model.TraderMsg()
|
|
|
+ self.exit_msg = "正常退出"
|
|
|
+ self.save = int(self.params.save) # 保存行情数据
|
|
|
+ self.logger.info(f"实时行情数据记录开关:{self.save}")
|
|
|
+ # 仓位检查结果序列
|
|
|
+ self.position_check_series = []
|
|
|
+ # 止损大小
|
|
|
+ self.stoploss = float(self.params.stoploss)
|
|
|
+ # 资金使用率
|
|
|
+ self.used_pct = float(self.params.used_pct) #使用资金比例
|
|
|
+ # 启停信号 0 表示运行 大于1开始倒计时 1时停机
|
|
|
+ self.mode_signal = 0
|
|
|
+ # 交易盘口订单流更新时间
|
|
|
+ self.trade_order_update_time = time.time()
|
|
|
+ # onTick触发时间记录
|
|
|
+ self.on_tick_event_time = time.time()
|
|
|
+ # 盘口ticker depth信息
|
|
|
+ self.tickers = dict()
|
|
|
+ self.depths = dict()
|
|
|
+ # 行情更新延迟监控
|
|
|
+ self.market_update_time = dict()
|
|
|
+ self.market_update_interval = dict()
|
|
|
+ # 参考盘口名称
|
|
|
+ refex = self.params.refexchange
|
|
|
+ refpair = self.params.refpair
|
|
|
+ if len(refex) != len(refpair):
|
|
|
+ self.logger.error("参考盘口数不等于参考品种数 退出")
|
|
|
+ raise Exception("参考盘口数不等于参考品种数 退出")
|
|
|
+ self.ref_num = len(refex)
|
|
|
+ self.ref_name = []
|
|
|
+ for i in range(self.ref_num):
|
|
|
+ if refex[i] not in broker.exchange_lists:
|
|
|
+ self.logger.error("出现不支持的参考盘口")
|
|
|
+ raise Exception("出现不支持的参考盘口")
|
|
|
+ name = refex[i] + '@' + refpair[i] + '@ref'
|
|
|
+ self.ref_name.append(name)
|
|
|
+ self.tickers[name] = dict()
|
|
|
+ self.depths[name] = list()
|
|
|
+ self.market_update_time[name] = 0.0
|
|
|
+ self.market_update_interval[name] = 0.0
|
|
|
+ # 参考盘口tick更新时间
|
|
|
+ # 服务器私有ip地址检查
|
|
|
+ ipList = utils.get_local_ip_list()
|
|
|
+ ipListNum = len(ipList)
|
|
|
+ if int(self.params.ip) >= ipListNum:
|
|
|
+ raise Exception("指定私有ip地址序号不存在")
|
|
|
+ # 创建ws实例
|
|
|
+ name = self.exchange+'@'+self.params.pair
|
|
|
+ self.trade_name = name
|
|
|
+ self.market_update_time[name] = 0.0
|
|
|
+ self.market_update_interval[name] = 0.0
|
|
|
+ self.tickers[name] = dict()
|
|
|
+ self.depths[name] = list()
|
|
|
+ cp = model.ClientParams()
|
|
|
+ cp.name = self.trade_name
|
|
|
+ cp.pair = self.params.pair
|
|
|
+ cp.access_key = self.params.access_key
|
|
|
+ cp.secret_key = self.params.secret_key
|
|
|
+ cp.pass_key = self.params.pass_key
|
|
|
+ cp.interval = self.params.interval
|
|
|
+ cp.broker_id = self.params.broker_id
|
|
|
+ cp.debug = self.params.debug
|
|
|
+ cp.proxy = self.params.proxy
|
|
|
+ cp.interval = self.params.interval
|
|
|
+ cp.ip = int(self.params.ip)
|
|
|
+ self.ws = broker.newWs(self.exchange)(
|
|
|
+ params=cp,
|
|
|
+ colo=int(self.params.colo),
|
|
|
+ is_print=0,
|
|
|
+ )
|
|
|
+ self.ws.logger = self.logger
|
|
|
+ self.ready = 0
|
|
|
+ # rest实例
|
|
|
+ self.rest = broker.newRest(self.exchange)(cp, colo=int(self.params.colo))
|
|
|
+ self.ws_ref = dict()
|
|
|
+ # 参考盘口 ws 实例
|
|
|
+ for i in range(self.ref_num):
|
|
|
+ cp = model.ClientParams()
|
|
|
+ cp.name = self.ref_name[i]
|
|
|
+ cp.pair = self.params.refpair[i]
|
|
|
+ cp.proxy = self.params.proxy
|
|
|
+ cp.interval = self.params.interval
|
|
|
+ cp.ip = int(self.params.ip)
|
|
|
+ exchange = self.params.refexchange[i]
|
|
|
+ if exchange not in broker.exchange_lists:
|
|
|
+ self.logger.error("参考盘口名称错误 退出")
|
|
|
+ return
|
|
|
+ _colo = 0
|
|
|
+ if self.params.refexchange[i] == self.params.exchange and \
|
|
|
+ self.params.refpair[i] == self.params.pair and int(self.params.colo):
|
|
|
+ _colo = 1
|
|
|
+ self.ws_ref[self.ref_name[i]] = broker.newWs(exchange)(cp, colo=_colo)
|
|
|
+ self.ws_ref[self.ref_name[i]].callback['onTicker']=self.update_ticker
|
|
|
+ self.ws_ref[self.ref_name[i]].callback['onDepth']=self.update_depth
|
|
|
+ self.ws_ref[self.ref_name[i]].logger = self.logger
|
|
|
+ # 添加回调
|
|
|
+ self.ws.callback = {
|
|
|
+ 'onTicker':self.update_ticker,
|
|
|
+ 'onDepth':self.update_depth,
|
|
|
+ 'onPosition':self.update_position,
|
|
|
+ 'onEquity':self.update_equity,
|
|
|
+ 'onOrder':self.update_order,
|
|
|
+ 'onExit':self.update_exit,
|
|
|
+ }
|
|
|
+ self.rest.callback = {
|
|
|
+ 'onTicker':self.update_ticker,
|
|
|
+ 'onDepth':self.update_depth,
|
|
|
+ 'onPosition':self.update_position,
|
|
|
+ 'onEquity':self.update_equity,
|
|
|
+ 'onOrder':self.update_order,
|
|
|
+ 'onExit':self.update_exit,
|
|
|
+ }
|
|
|
+ self.rest.logger = self.logger
|
|
|
+ # 配置策略
|
|
|
+ self.strategy.logger = self.logger
|
|
|
+ # 配置定价模型
|
|
|
+ price_alpha = []
|
|
|
+ for i in self.params.refpair:
|
|
|
+ # 交易1000shib 参考 shib
|
|
|
+ if '1000' in self.params.pair and '1000' not in i:
|
|
|
+ price_alpha.append(1000.0)
|
|
|
+ # 交易shib 参考 1000shib
|
|
|
+ elif '1000' not in self.params.pair and '1000' in i:
|
|
|
+ price_alpha.append(0.001)
|
|
|
+ else:
|
|
|
+ # 交易shib 参考 shib
|
|
|
+ price_alpha.append(1.0)
|
|
|
+ self.logger.info(f'价格系数{price_alpha}')
|
|
|
+ self.Predictor = predictor.Predictor(ref_name=self.ref_name, alpha=price_alpha, gamma=float(self.params.gamma))
|
|
|
+ # 初始化参数
|
|
|
+ self.strategy.trade_open_dist = float(self.params.open)
|
|
|
+ self.strategy.trade_close_dist = float(self.params.close)
|
|
|
+ # 在线训练
|
|
|
+ self.backtest = int(self.params.backtest)
|
|
|
+ self.logger.info(f'在线训练开关 {self.backtest}')
|
|
|
+ ####
|
|
|
+ time.sleep(3)
|
|
|
+
|
|
|
+ def get_logger(self, logname):
|
|
|
+ '''日志模块'''
|
|
|
+ logger = logging.getLogger(__name__)
|
|
|
+ # log flag
|
|
|
+ if int(self.params.log):
|
|
|
+ log_level = logging.DEBUG
|
|
|
+ logger.setLevel(log_level)
|
|
|
+ # log to txt
|
|
|
+ formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
|
|
|
+ if logname == None: logname = "log"
|
|
|
+ handler = logging.handlers.RotatingFileHandler(f"{logname}.log",maxBytes=1024*1024*10,encoding='utf-8')
|
|
|
+ handler.setLevel(log_level)
|
|
|
+ handler.setFormatter(formatter)
|
|
|
+ # log to console
|
|
|
+ console = logging.StreamHandler()
|
|
|
+ console.setLevel(logging.INFO)
|
|
|
+ # add
|
|
|
+ logger.addHandler(handler)
|
|
|
+ logger.addHandler(console)
|
|
|
+ else:
|
|
|
+ log_level = logging.INFO
|
|
|
+ logger.setLevel(log_level)
|
|
|
+ # log to txt
|
|
|
+ formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
|
|
|
+ if logname == None: logname = "log"
|
|
|
+ handler = logging.handlers.RotatingFileHandler(f"{logname}.log",maxBytes=1024*1024*10,encoding='utf-8')
|
|
|
+ handler.setLevel(log_level)
|
|
|
+ handler.setFormatter(formatter)
|
|
|
+ # add
|
|
|
+ logger.addHandler(handler)
|
|
|
+ logger.info('开启日志记录')
|
|
|
+ return logger
|
|
|
+
|
|
|
+ def update_order(self, data):
|
|
|
+ self.loop.create_task(self._update_order(data))
|
|
|
+
|
|
|
+ async def _update_order(self, data):
|
|
|
+ '''
|
|
|
+ 更新订单
|
|
|
+ 首先直接复写本地订单
|
|
|
+ 1、如果是开仓单
|
|
|
+ 如果新增: 增加本地订单
|
|
|
+ 如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按已成交量发送平仓订单 修改本地仓位
|
|
|
+ 如果成交: 删除本地订单 发送平仓订单 修改本地仓位
|
|
|
+ 2、如果是平仓单
|
|
|
+ 如果新增: 增加本地订单
|
|
|
+ 如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按未成交量发送平仓订单 修改本地仓位
|
|
|
+ 如果成交: 删除本地订单 修改本地仓位
|
|
|
+ NEW 可以从 ws / rest 来
|
|
|
+ REMOVE 主要从 ws 来 必须包含 filled 和 filled_price 用于本地仓位推算 定期rest查过旧订单
|
|
|
+ 为了防止下单失败依然有订单成交 本地需要做一个缓存
|
|
|
+ '''
|
|
|
+ try:
|
|
|
+ # 触发订单更新
|
|
|
+ self.trade_order_update_time = time.time()
|
|
|
+ # 新增订单推送 仅需要cid oid信息
|
|
|
+ if data['status'] == 'NEW':
|
|
|
+ # 更新oid信息 更新订单 loceltime信息(尤其是查单返回new的情况 必须更新 否则会误触发风控)
|
|
|
+ if data['client_id'] in self.local_orders:
|
|
|
+ self.local_orders[data['client_id']]["order_id"] = data['order_id']
|
|
|
+ self.local_orders[data['client_id']]["localtime"] = time.time()
|
|
|
+ # 完成订单推送 仅需要cid filled filled_size信息
|
|
|
+ elif data['status'] == 'REMOVE':
|
|
|
+ # 如果在撤单记录中 说明此订单结束生命周期 可以移除记录
|
|
|
+ if data["client_id"] in self.local_cancel_log:
|
|
|
+ del(self.local_cancel_log[data["client_id"]])
|
|
|
+ # 在cid缓存队列中 说明是本策略的订单
|
|
|
+ if data["client_id"] in self.local_orders_backup:
|
|
|
+ # 不在已处理cid缓存队列中 说明还没参与过仓位计算 则执行订单计算
|
|
|
+ if data['client_id'] not in self.handled_orders_cid:
|
|
|
+ # 添加进已处理队列
|
|
|
+ self.handled_orders_cid.append(data["client_id"])
|
|
|
+ # 提取成交信息 方向 价格 量
|
|
|
+ filled = data["filled"]
|
|
|
+ side = self.local_orders_backup[data['client_id']]["side"]
|
|
|
+ if "filled_price" in data:
|
|
|
+ if data["filled_price"] > 0.0:
|
|
|
+ filled_price = data["filled_price"]
|
|
|
+ else:
|
|
|
+ filled_price = self.local_orders_backup[data['client_id']]["price"]
|
|
|
+ else:
|
|
|
+ filled_price = self.local_orders_backup[data['client_id']]["price"]
|
|
|
+ # 只有开仓成交才触发onPosition
|
|
|
+ # 如果漏推送 rest补充的订单查询信息过来 可能会导致 kd kk 推送出现计算分母为0的情况
|
|
|
+ if filled > 0:
|
|
|
+ if "spot" in self.exchange:# 如果是现货交易 还需要修改equity
|
|
|
+ ### 现货必须考虑fee 买入fee单位为币 卖出fee单位为u
|
|
|
+ fee = data["fee"]
|
|
|
+ ### 现货订单流仓位计算
|
|
|
+ if side == "kd": # buy
|
|
|
+ self.local_buy_amount += filled - fee
|
|
|
+ self.local_buy_value += (filled - fee) * filled_price
|
|
|
+ new_long_pos = float(Decimal(str(self.local_position_by_orders.longPos)) + Decimal(str(filled)) - Decimal(str(fee)))
|
|
|
+ if new_long_pos == 0.0:
|
|
|
+ self.local_position_by_orders.longAvg = 0.0
|
|
|
+ self.local_position_by_orders.longPos = 0.0
|
|
|
+ else:
|
|
|
+ self.local_position_by_orders.longAvg = \
|
|
|
+ (self.local_position_by_orders.longPos * self.local_position_by_orders.longAvg + filled * filled_price) / new_long_pos
|
|
|
+ self.local_position_by_orders.longPos = new_long_pos
|
|
|
+ self.local_cash -= filled * filled_price
|
|
|
+ self.local_coin += filled - fee
|
|
|
+ elif side == "pd": # sell
|
|
|
+ self.local_sell_amount += filled
|
|
|
+ self.local_sell_value += filled * filled_price
|
|
|
+ self.local_profit += filled * (filled_price - self.local_position_by_orders.longAvg)
|
|
|
+ new_long_pos = float(Decimal(str(self.local_position_by_orders.longPos)) - Decimal(str(filled)))
|
|
|
+ if new_long_pos == 0.0:
|
|
|
+ self.local_position_by_orders.longAvg = 0.0
|
|
|
+ self.local_position_by_orders.longPos = 0.0
|
|
|
+ else:
|
|
|
+ self.local_position_by_orders.longPos = new_long_pos
|
|
|
+ self.local_cash += filled * filled_price - fee
|
|
|
+ self.local_coin -= filled
|
|
|
+ elif side == "pk": # buy
|
|
|
+ self.local_buy_amount += filled - fee
|
|
|
+ self.local_buy_value += (filled - fee) * filled_price
|
|
|
+ self.local_profit += filled * (self.local_position_by_orders.shortAvg - filled_price)
|
|
|
+ new_short_pos = float(Decimal(str(self.local_position_by_orders.shortPos)) - Decimal(str(filled)) - Decimal(str(fee)))
|
|
|
+ if new_short_pos == 0.0:
|
|
|
+ self.local_position_by_orders.shortAvg = 0.0
|
|
|
+ self.local_position_by_orders.shortPos = 0.0
|
|
|
+ else:
|
|
|
+ self.local_position_by_orders.shortPos = new_short_pos
|
|
|
+ self.local_cash -= filled * filled_price
|
|
|
+ self.local_coin += filled - fee
|
|
|
+ elif side == "kk": # sell
|
|
|
+ self.local_sell_amount += filled
|
|
|
+ self.local_sell_value += filled * filled_price
|
|
|
+ new_short_pos = float(Decimal(str(self.local_position_by_orders.shortPos)) + Decimal(str(filled)))
|
|
|
+ if new_short_pos == 0.0:
|
|
|
+ self.local_position_by_orders.shortAvg = 0.0
|
|
|
+ self.local_position_by_orders.shortPos = 0.0
|
|
|
+ else:
|
|
|
+ self.local_position_by_orders.shortAvg = \
|
|
|
+ (self.local_position_by_orders.shortPos * self.local_position_by_orders.shortAvg + filled * filled_price) / new_short_pos
|
|
|
+ self.local_position_by_orders.shortPos = new_short_pos
|
|
|
+ self.local_cash += filled * filled_price - fee
|
|
|
+ self.local_coin -= filled
|
|
|
+ else:
|
|
|
+ self.logger.error(f"错误的仓位方向{side}")
|
|
|
+ else:
|
|
|
+ ### 合约订单流仓位计算
|
|
|
+ if side == "kd":
|
|
|
+ self.local_buy_amount += filled
|
|
|
+ self.local_buy_value += filled * filled_price
|
|
|
+ new_long_pos = (self.local_position_by_orders.longPos + filled)
|
|
|
+ if new_long_pos == 0.0:
|
|
|
+ self.local_position_by_orders.longAvg = 0
|
|
|
+ self.local_position_by_orders.longPos = 0
|
|
|
+ else:
|
|
|
+ self.local_position_by_orders.longAvg = \
|
|
|
+ (self.local_position_by_orders.longPos * self.local_position_by_orders.longAvg + filled * filled_price) / new_long_pos
|
|
|
+ self.local_position_by_orders.longPos = float(Decimal(str(self.local_position_by_orders.longPos)) + Decimal(str(filled)))
|
|
|
+ elif side == "kk":
|
|
|
+ self.local_sell_amount += filled
|
|
|
+ self.local_sell_value += filled * filled_price
|
|
|
+ new_short_pos = (self.local_position_by_orders.shortPos + filled)
|
|
|
+ if new_short_pos == 0.0:
|
|
|
+ self.local_position_by_orders.shortAvg = 0
|
|
|
+ self.local_position_by_orders.shortPos = 0
|
|
|
+ else:
|
|
|
+ self.local_position_by_orders.shortAvg = \
|
|
|
+ (self.local_position_by_orders.shortPos * self.local_position_by_orders.shortAvg + filled * filled_price) / new_short_pos
|
|
|
+ self.local_position_by_orders.shortPos = float(Decimal(str(self.local_position_by_orders.shortPos)) + Decimal(str(filled)))
|
|
|
+ elif side == "pd":
|
|
|
+ self.local_sell_amount += filled
|
|
|
+ self.local_sell_value += filled * filled_price
|
|
|
+ self.local_profit += filled * (filled_price - self.local_position_by_orders.longAvg)
|
|
|
+ self.local_position_by_orders.longPos = float(Decimal(str(self.local_position_by_orders.longPos)) - Decimal(str(filled)))
|
|
|
+ if self.local_position_by_orders.longPos == 0:self.local_position_by_orders.longAvg = 0
|
|
|
+ elif side == "pk":
|
|
|
+ self.local_buy_amount += filled
|
|
|
+ self.local_buy_value += filled * filled_price
|
|
|
+ self.local_profit += filled * (self.local_position_by_orders.shortAvg-filled_price)
|
|
|
+ self.local_position_by_orders.shortPos = float(Decimal(str(self.local_position_by_orders.shortPos)) - Decimal(str(filled)))
|
|
|
+ if self.local_position_by_orders.shortPos == 0:self.local_position_by_orders.shortAvg = 0
|
|
|
+ else:
|
|
|
+ self.logger.error(f"错误的仓位方向{side}")
|
|
|
+ # 统计合约交易手续费 正fee为扣手续费 负fee为返佣
|
|
|
+ if 'fee' in data:
|
|
|
+ self.local_profit -= data['fee']
|
|
|
+ self.logger.debug('更新推算仓位'+str(self.local_position_by_orders.__dict__))
|
|
|
+ ###
|
|
|
+ self._print_local_trades_summary()
|
|
|
+ # 每次有订单变动就触发一次策略
|
|
|
+ if self.mode_signal == 0 and self.ready:
|
|
|
+ ### 更新交易数据
|
|
|
+ self.update_trade_msg()
|
|
|
+ ### 触发策略挂单逻辑
|
|
|
+ # 更新策略时间
|
|
|
+ self.strategy.local_time = time.time()
|
|
|
+ orders = self.strategy.onTime(self.tradeMsg)
|
|
|
+ ### 记录指令触发信息
|
|
|
+ if self._not_empty(orders):
|
|
|
+ self.logger.debug("触发onOrder")
|
|
|
+ self._update_local_orders(orders)
|
|
|
+ self.loop.create_task(self.rest.handle_signals(orders))
|
|
|
+ self.logger.debug(orders)
|
|
|
+ else:
|
|
|
+ self.logger.debug(f"订单已经参与过仓位计算 拒绝重复进行计算{data['client_id']}")
|
|
|
+ else:
|
|
|
+ self.logger.debug(f"订单不属于本策略 拒绝进行仓位计算{data['client_id']}")
|
|
|
+ # 移除本地订单
|
|
|
+ if data["client_id"] in self.local_orders:
|
|
|
+ self.logger.debug(['删除本地订单', data["client_id"]])
|
|
|
+ del(self.local_orders[data["client_id"]])
|
|
|
+ else:
|
|
|
+ self.logger.debug(['该订单不在本地挂单表中', data["client_id"]])
|
|
|
+ else:
|
|
|
+ print(data)
|
|
|
+ self.logger.debug(f"未知的订单事件类型 {data}")
|
|
|
+ except Exception as e:
|
|
|
+ print("处理订单推送出错:"+str(e))
|
|
|
+ self.logger.error("处理订单推送出错:"+str(e))
|
|
|
+ self.logger.error(traceback.format_exc())
|
|
|
+ self.exit_msg="处理订单推送出错"
|
|
|
+ self.stop()
|
|
|
+
|
|
|
+ def _update_local_orders(self, orders):
|
|
|
+ """
|
|
|
+ 本地记录所有报单信息
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ for i in orders:
|
|
|
+ if "Limits" in i:
|
|
|
+ for j in orders[i]:
|
|
|
+ order_info = dict()
|
|
|
+ order_info['symbol'] = self.symbol
|
|
|
+ order_info['amount'] = float(j[0])
|
|
|
+ order_info['side'] = j[1]
|
|
|
+ order_info['price'] = float(j[2])
|
|
|
+ order_info['client_id'] = j[3]
|
|
|
+ order_info['filled_price'] = 0
|
|
|
+ order_info['filled'] = 0
|
|
|
+ order_info['order_id'] = ""
|
|
|
+ order_info['localtime'] = self.strategy.local_time
|
|
|
+ order_info['createtime'] = self.strategy.local_time
|
|
|
+ self.local_orders[j[3]] = order_info # 本地挂单表
|
|
|
+ self.logger.debug(['新增本地订单', order_info])
|
|
|
+ self.local_orders_backup[j[3]] = order_info # 本地缓存表
|
|
|
+ self.local_orders_backup_cid.append(j[3]) # 本地缓存cid表
|
|
|
+ if 'Cancel' in i:
|
|
|
+ # 记录撤单次数
|
|
|
+ cid = orders[i][0]
|
|
|
+ if cid in self.local_cancel_log:
|
|
|
+ self.local_cancel_log[cid] += 1
|
|
|
+ else:
|
|
|
+ self.local_cancel_log[cid] = 0
|
|
|
+ # 清除过于久远的历史记录
|
|
|
+ if len(self.local_orders_backup_cid) > 9999:
|
|
|
+ cid = self.local_orders_backup_cid[0]
|
|
|
+ # 判断是否超过1个小时 如果超过则移除历史记录
|
|
|
+ if cid in self.local_orders_backup:
|
|
|
+ if time.time() - self.local_orders_backup[cid]["localtime"] > 3600:
|
|
|
+ del(self.local_orders_backup[cid])
|
|
|
+ del(self.local_orders_backup_cid[0])
|
|
|
+ if len(self.handled_orders_cid) > 9999:
|
|
|
+ del(self.handled_orders_cid[0])
|
|
|
+ except:
|
|
|
+ self.logger.error("本地记录订单信息出错")
|
|
|
+ self.logger.error(traceback.format_exc())
|
|
|
+ self.exit_msg="本地记录订单信息出错"
|
|
|
+ self.stop()
|
|
|
+
|
|
|
+ def _not_empty(self, orders):
|
|
|
+ '''检查指令是否不为空'''
|
|
|
+ if isinstance(orders, dict):
|
|
|
+ for order_name in orders:
|
|
|
+ if "Cancel" in order_name or "Check" in order_name:
|
|
|
+ return 1
|
|
|
+ elif "Limits_open" in order_name:
|
|
|
+ if len(orders["Limits_open"]) > 0:
|
|
|
+ return 1
|
|
|
+ elif "Limits_close" in order_name:
|
|
|
+ if len(orders["Limits_close"]) > 0:
|
|
|
+ return 1
|
|
|
+ return 0
|
|
|
+
|
|
|
+ def _print_local_trades_summary(self):
|
|
|
+ '''计算本地累计利润'''
|
|
|
+ ###
|
|
|
+ local_buy_amount = round(self.local_buy_amount,5)
|
|
|
+ local_buy_value = round(self.local_buy_value,5)
|
|
|
+ local_sell_amount = round(self.local_sell_amount,5)
|
|
|
+ local_sell_value = round(self.local_sell_value,5)
|
|
|
+ local_profit = 0.0
|
|
|
+ if isinstance(self.strategy.mp, float):
|
|
|
+ unrealized = (local_buy_amount - local_sell_amount) * self.strategy.mp
|
|
|
+ realized = local_sell_value - local_buy_value
|
|
|
+ local_profit = round(unrealized+realized,5)
|
|
|
+ self.strategy.local_profit = local_profit
|
|
|
+ ###
|
|
|
+ msg = f"买量{local_buy_amount} 卖量{local_sell_amount} 买额{local_buy_value} 卖额{local_sell_value} 利润 {local_profit}"
|
|
|
+ self.logger.info(msg)
|
|
|
+
|
|
|
+ def update_position(self, data):
|
|
|
+ '''
|
|
|
+ 更新仓位信息
|
|
|
+ '''
|
|
|
+ if data != self.local_position:
|
|
|
+ self.local_position = data
|
|
|
+ self.logger.debug('更新本地仓位'+str(self.local_position.__dict__))
|
|
|
+
|
|
|
+ """
|
|
|
+ 2023-2-22
|
|
|
+ 用create_task去执行,会延迟,占用越大,延迟越大,可能会延迟100ms计算
|
|
|
+ """
|
|
|
+ def update_ticker(self, data):
|
|
|
+ '''
|
|
|
+ 增加onticker撤单 可能会导致平仓难度加大
|
|
|
+ '''
|
|
|
+ self.loop.create_task(self._update_ticker(data))
|
|
|
+
|
|
|
+ def update_depth(self, data):
|
|
|
+ self.loop.create_task(self._update_depth(data))
|
|
|
+
|
|
|
+ async def _update_ticker(self, data):
|
|
|
+ '''
|
|
|
+ update ticker infomation
|
|
|
+ '''
|
|
|
+ name = data['name']
|
|
|
+ # 记录tick更新时间
|
|
|
+ # self.market_update_time[name] = time.time()
|
|
|
+ self.tickers[name] = data
|
|
|
+ ### 判断是否需要触发ontick
|
|
|
+ if name == self.ref_name[self.strategy.ref_index]:
|
|
|
+ pass
|
|
|
+ elif name == self.trade_name:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+
|
|
|
+ # @utils.timeit
|
|
|
+ async def _update_depth(self, data):
|
|
|
+ '''
|
|
|
+ update orderbook infomation
|
|
|
+ '''
|
|
|
+ name = data['name']
|
|
|
+ now_time = time.time()
|
|
|
+
|
|
|
+ if self.market_update_time[name] == 0.0:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ interval = now_time - self.market_update_time[name]
|
|
|
+ if self.market_update_interval[name] == 0.0:
|
|
|
+ self.market_update_interval[name] = interval
|
|
|
+ else:
|
|
|
+ self.market_update_interval[name] = self.market_update_interval[name]*0.999 + interval*0.001
|
|
|
+ self.market_update_time[name] = now_time
|
|
|
+ ### 初始化depths
|
|
|
+ if self.depths[name] == list():
|
|
|
+ self.depths[name] = data['data']
|
|
|
+ ### 判断是否需要触发ondepth
|
|
|
+ # 如果是交易盘口
|
|
|
+ if name == self.trade_name:
|
|
|
+ ### 更新depths
|
|
|
+ self.depths[name] = data['data']
|
|
|
+ # 允许交易
|
|
|
+ if self.mode_signal == 0 and self.ready:
|
|
|
+ ### 聚合行情处理
|
|
|
+ self.on_agg_market()
|
|
|
+ ### 判断是否为当前跟踪的盘口
|
|
|
+ elif name == self.ref_name[self.strategy.ref_index]:
|
|
|
+ ### 判断是否需要触发ontick 对行情进行过滤
|
|
|
+ ### 过滤条件 价格变化很大 时间间隔很长
|
|
|
+ flag = 0
|
|
|
+ if abs(data['data'][utils.BP_INDEX] - self.depths[name][utils.BP_INDEX])/data['data'][utils.BP_INDEX] > 0.0002 or \
|
|
|
+ abs(data['data'][utils.AP_INDEX] - self.depths[name][utils.AP_INDEX])/data['data'][utils.AP_INDEX] > 0.0002 or \
|
|
|
+ time.time() - self.on_tick_event_time > 0.05:
|
|
|
+ ### 允许交易
|
|
|
+ flag = 1
|
|
|
+ ### 更新ontick触发时间记录
|
|
|
+ self.on_tick_event_time = time.time()
|
|
|
+ ### 更新depths
|
|
|
+ self.depths[name] = data['data']
|
|
|
+ # 允许交易
|
|
|
+ if self.mode_signal == 0 and self.ready and flag:
|
|
|
+ ### 更新交易数据
|
|
|
+ self.update_trade_msg()
|
|
|
+ ### 触发事件撤单逻辑
|
|
|
+ # 更新策略时间
|
|
|
+ self.strategy.local_time = time.time()
|
|
|
+ # 产生交易信号
|
|
|
+ orders = self.strategy.onTime(self.tradeMsg)
|
|
|
+ ### 记录指令触发信息
|
|
|
+ if self._not_empty(orders):
|
|
|
+ self.logger.debug("触发onTick")
|
|
|
+ self._update_local_orders(orders)
|
|
|
+ self.loop.create_task(self.rest.handle_signals(orders))
|
|
|
+ self.logger.debug(orders)
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+
|
|
|
+ # @timeit
|
|
|
+ async def real_time_back_test(self, data):
|
|
|
+ '''
|
|
|
+ 按照长短期回测利润选择参数
|
|
|
+ 优先按长期回测利润选参数 如果找不到就
|
|
|
+ 再按短期回测利润选参数 如果还找不到就
|
|
|
+ 使用默认参数 如果默认参数亏损就触发冷静期
|
|
|
+ '''
|
|
|
+ now_time = time.time()
|
|
|
+ await asyncio.sleep(0.005)
|
|
|
+ for i in self.backtest_tasks:
|
|
|
+ i["backtest_engine"].backtest_time = now_time
|
|
|
+ i["backtest_engine"].run_by_tick(data)
|
|
|
+
|
|
|
+ def choose_params(self):
|
|
|
+ '''
|
|
|
+ 按照长短期回测利润选择参数
|
|
|
+ 优先按长期回测利润选参数 如果找不到就
|
|
|
+ 再按短期回测利润选参数 如果还找不到就
|
|
|
+ 使用默认参数 如果默认参数亏损就触发冷静期
|
|
|
+ '''
|
|
|
+ profits = []
|
|
|
+ for i in self.backtest_tasks:
|
|
|
+ # 获取绩效信息
|
|
|
+ e = i["backtest_engine"].equity # 最终净值
|
|
|
+ # 计算标准化利润
|
|
|
+ p = (e-self.backtest_start_cash) / self.backtest_start_cash \
|
|
|
+ / self.backtest_look_length * self.tick_profit_to_daily
|
|
|
+ # 有一定成交次数的回测结果才有代表性 持仓太久的参数禁止使用
|
|
|
+ _trade_num = i['backtest_engine'].trade_num
|
|
|
+ _avg_hold_time = i['backtest_engine'].avg_hold_time
|
|
|
+ _equity_high = i['backtest_engine'].equity_high
|
|
|
+ # 排除交易次数太少的参数
|
|
|
+ if i['open'] <= 0.002:
|
|
|
+ if _trade_num < 10:
|
|
|
+ p = 0.0
|
|
|
+ # 排除长期持仓的参数
|
|
|
+ if _avg_hold_time > 600:
|
|
|
+ p = 0.0
|
|
|
+ # 排除近期回撤较大的参数
|
|
|
+ if _equity_high > e*1.01:
|
|
|
+ p = 0.0
|
|
|
+ profits.append(p) #利润
|
|
|
+ ############## 重置回测
|
|
|
+ # if _trade_num > 200:
|
|
|
+ # i["backtest_engine"].trade_num = 0
|
|
|
+ # i["backtest_engine"].equity = self.backtest_start_cash
|
|
|
+ # 盈利参数个数不能太少 防止孤岛参数
|
|
|
+ win_num = 0
|
|
|
+ for i in profits:
|
|
|
+ if i > 0.0:
|
|
|
+ win_num += 1
|
|
|
+ cond1 = win_num > self.backtest_num*0.1
|
|
|
+ cond2 = win_num > 2
|
|
|
+ cond_win = cond1 and cond2
|
|
|
+ if cond_win:
|
|
|
+ # 按最优回测结果调整参数
|
|
|
+ max_profit = max(profits)
|
|
|
+ max_index = profits.index(max_profit)
|
|
|
+ self.strategy.trade_open_dist = self.backtest_tasks[max_index]["open"]
|
|
|
+ self.strategy.trade_close_dist = self.backtest_tasks[max_index]["close"]
|
|
|
+ self.strategy.ref_index = self.backtest_tasks[max_index]["index"]
|
|
|
+ self.strategy.post_side = self.backtest_tasks[max_index]["side"]
|
|
|
+ self.strategy.predict_alpha = self.backtest_tasks[max_index]["alpha"]
|
|
|
+ # 检查是否需要关闭回测
|
|
|
+ # if self.strategy.ready == 1:
|
|
|
+ # self.backtest = 0
|
|
|
+ else:
|
|
|
+ # 如果没有符合条件的盈利参数
|
|
|
+ self.strategy.trade_open_dist = 0.01
|
|
|
+ self.strategy.trade_close_dist = 0.00001
|
|
|
+ self.strategy.ref_index = 0
|
|
|
+ self.strategy.post_side = 0
|
|
|
+ self.strategy.predict_alpha = 0
|
|
|
+ # 检查是否需要关闭回测
|
|
|
+ # if self.strategy.ready == 1:
|
|
|
+ # self.backtest = 0
|
|
|
+ # self.exit_msg = "未找到合适参数 停机"
|
|
|
+ # self.stop()
|
|
|
+ return
|
|
|
+
|
|
|
+
|
|
|
+ def update_equity(self, data):
|
|
|
+ '''
|
|
|
+ 更新保证金信息
|
|
|
+ 合约一直更新
|
|
|
+ 现货只有当出现异常时更新
|
|
|
+ '''
|
|
|
+ if "spot" in self.exchange:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ self.local_cash = data[self.quote] * self.used_pct
|
|
|
+
|
|
|
+ def update_exit(self, data):
|
|
|
+ '''
|
|
|
+ 底层触发停机
|
|
|
+ '''
|
|
|
+ self.exit_msg = data
|
|
|
+ self.stop()
|
|
|
+
|
|
|
+ def get_all_market_data(self):
|
|
|
+ '''
|
|
|
+ 只能定时触发
|
|
|
+ 组合市场信息=交易盘口+参考盘口
|
|
|
+ '''
|
|
|
+ market = []
|
|
|
+ data = self.ws._get_data()["data"]
|
|
|
+ market += data
|
|
|
+ for i in self.ref_name:
|
|
|
+ data = self.ws_ref[i]._get_data()["data"]
|
|
|
+ market += data
|
|
|
+ # handle save real market data
|
|
|
+ if self.save:
|
|
|
+ with open(f'./{self.csvname}.csv',
|
|
|
+ 'a',
|
|
|
+ newline='',
|
|
|
+ encoding='utf-8') as f:
|
|
|
+ writer = csv.writer(f, delimiter=',')
|
|
|
+ writer.writerow(market)
|
|
|
+ return market
|
|
|
+
|
|
|
+ async def before_trade(self):
|
|
|
+ ####### 启动ws #######
|
|
|
+ # 启动交易ws
|
|
|
+ # 当开启回测时才订阅交易盘口的成交流
|
|
|
+ _sub_trade = int(self.params.backtest)
|
|
|
+ _sub_fast = int(self.params.fast)
|
|
|
+ self.loop.create_task(self.ws.run(is_auth=1, sub_trade=_sub_trade, sub_fast=0))
|
|
|
+ for i in self.ref_name:
|
|
|
+ # 启动参考ws 参考盘口使用fast行情性能消耗更大 使用普通行情可以节省性能
|
|
|
+ self.loop.create_task(self.ws_ref[i].run(is_auth=0, sub_trade=0, sub_fast=_sub_fast))
|
|
|
+ await asyncio.sleep(1)
|
|
|
+ ###### 做交易前准备工作 ######
|
|
|
+ # 买入平台币
|
|
|
+ await self.rest.buy_token()
|
|
|
+ await asyncio.sleep(1)
|
|
|
+ # 清空挂单和仓位
|
|
|
+ await self.rest.check_position(hold_coin=self.hold_coin)
|
|
|
+ await asyncio.sleep(1)
|
|
|
+ # 获取市场信息
|
|
|
+ await self.rest.before_trade()
|
|
|
+ await asyncio.sleep(1)
|
|
|
+ # 获取价格信息
|
|
|
+ ticker = await self.rest.get_ticker()
|
|
|
+ mp = ticker['mp']
|
|
|
+ # 获取账户信息
|
|
|
+ await asyncio.sleep(1)
|
|
|
+ await self.rest.get_equity()
|
|
|
+ # 初始资金
|
|
|
+ start_cash = self.rest.cash_value * self.used_pct
|
|
|
+ start_coin = self.rest.coin_value * self.used_pct
|
|
|
+ if start_cash == 0.0 and start_coin == 0.0:
|
|
|
+ self.exit_msg = f"初始为零 cash: {start_cash} coin: {start_coin}"
|
|
|
+ self.stop()
|
|
|
+ self.logger.info(f"初始cash: {start_cash} 初始coin: {start_coin}")
|
|
|
+ # 初始化策略基础信息
|
|
|
+ if isinstance(mp, float):
|
|
|
+ if mp <= 0.0:
|
|
|
+ self.exit_msg = f"初始价格获取错误 {mp}"
|
|
|
+ self.stop()
|
|
|
+ else:
|
|
|
+ print(f"初始价格为 {mp}")
|
|
|
+ else:
|
|
|
+ self.exit_msg = f"初始价格获取错误 {mp}"
|
|
|
+ self.stop()
|
|
|
+ self.strategy.mp = mp
|
|
|
+ self.strategy.start_cash = start_cash
|
|
|
+ self.strategy.start_coin = start_coin
|
|
|
+ self.strategy.start_equity = start_cash + start_coin * mp
|
|
|
+ self.strategy.max_equity = self.strategy.start_equity
|
|
|
+ self.strategy.equity = self.strategy.start_equity
|
|
|
+ self.strategy.total_amount = self.strategy.equity * self.strategy.leverrate / self.strategy.mp
|
|
|
+ self.strategy.stepSize = self.rest.stepSize if self.rest.stepSize < 1.0 else int(self.rest.stepSize)
|
|
|
+ self.strategy.tickSize = self.rest.tickSize if self.rest.tickSize < 1.0 else int(self.rest.tickSize)
|
|
|
+ if self.strategy.stepSize == None or self.strategy.tickSize == None:
|
|
|
+ self.exit_msg = f"交易精度未正常获取 stepsize: {self.strategy.stepSize} ticksize: {self.strategy.tickSize}"
|
|
|
+ self.stop()
|
|
|
+ else:
|
|
|
+ self.logger.info(f"数量精度{self.strategy.stepSize}")
|
|
|
+ self.logger.info(f"价格精度{self.strategy.tickSize}")
|
|
|
+ grid = float(self.params.grid)
|
|
|
+ if "spot" in self.exchange:
|
|
|
+ long_one_hand_value = start_cash * float(self.params.leverrate) / grid
|
|
|
+ short_one_hand_value = start_coin * mp * float(self.params.leverrate) / grid
|
|
|
+ long_one_hand_amount = float(Decimal(str(long_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize)))
|
|
|
+ short_one_hand_amount = float(Decimal(str(short_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize)))
|
|
|
+ else:
|
|
|
+ long_one_hand_value = start_cash * float(self.params.leverrate) / grid
|
|
|
+ short_one_hand_value = start_cash * float(self.params.leverrate) / grid
|
|
|
+ long_one_hand_amount = float(Decimal(str(long_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize)))
|
|
|
+ short_one_hand_amount = float(Decimal(str(short_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize)))
|
|
|
+ # 检查是否满足最低交易要求
|
|
|
+ print(f"最低单手交易下单量为 buy: {long_one_hand_amount} sell: {short_one_hand_amount}")
|
|
|
+ if (long_one_hand_amount == 0 and short_one_hand_amount == 0) or (long_one_hand_value < 20 and short_one_hand_value < 20):
|
|
|
+ self.exit_msg = f"初始下单量太少 buy: {long_one_hand_amount} sell: {short_one_hand_amount}"
|
|
|
+ self.stop()
|
|
|
+ # 初始化调度器
|
|
|
+ self.local_cash = start_cash
|
|
|
+ self.local_coin = start_coin
|
|
|
+ # 配置在线训练
|
|
|
+ if self.backtest:
|
|
|
+ # 设置策略默认参数
|
|
|
+ self.strategy.trade_close_dist = 0.00001
|
|
|
+ self.strategy.trade_open_dist = 0.01
|
|
|
+ self.backtest_look_length = 86400 / self.interval # 回测区间足够长
|
|
|
+ self.backtest_tasks = list()
|
|
|
+ self.tick_profit_to_daily = (86400/self.interval)
|
|
|
+ self.backtest_start_cash = 1000000.0
|
|
|
+ # 备选参数
|
|
|
+ open_list, close_list, alpha_list = utils.get_backtest_set(self.base)
|
|
|
+ if 'spot' in self.exchange:
|
|
|
+ side_list = []
|
|
|
+ if long_one_hand_amount > 0:
|
|
|
+ side_list.append(1)
|
|
|
+ if short_one_hand_amount > 0:
|
|
|
+ side_list.append(-1)
|
|
|
+ if 1 in side_list and -1 in side_list:
|
|
|
+ side_list.append(0)
|
|
|
+ else:
|
|
|
+ side_list = [-1,0,1]
|
|
|
+ side_list_allow = []
|
|
|
+ for s in side_list:
|
|
|
+ if s in utils.POST_SIDE_LIMIT:
|
|
|
+ side_list_allow.append(s)
|
|
|
+ side_list = side_list_allow
|
|
|
+ for _open in open_list:
|
|
|
+ for _side in side_list:
|
|
|
+ for _close in close_list[_open]:
|
|
|
+ for _index in range(self.ref_num):
|
|
|
+ for _alpha in alpha_list:
|
|
|
+ task = dict()
|
|
|
+ st = strategy.Strategy(self.params, is_print=0)
|
|
|
+ st.leverrate = 1.0
|
|
|
+ st.trade_open_dist = _open
|
|
|
+ st.trade_close_dist = _close
|
|
|
+ st.predict_alpha = _alpha
|
|
|
+ st.ref_index = _index
|
|
|
+ st.post_side = _side
|
|
|
+ st.exchange = "dummy_usdt_swap"
|
|
|
+ st.local_start_time = 0.0
|
|
|
+ bt = backtest.Backtest(st, is_plot=0)
|
|
|
+ bt.start_cash = self.backtest_start_cash
|
|
|
+ task["backtest_engine"] = bt
|
|
|
+ task["open"] = _open
|
|
|
+ task["close"] = _close
|
|
|
+ task["index"] = _index
|
|
|
+ task["side"] = _side
|
|
|
+ task["alpha"] = _alpha
|
|
|
+ self.backtest_tasks.append(task)
|
|
|
+ backtest_num = len(self.backtest_tasks)
|
|
|
+ self.backtest_num = backtest_num
|
|
|
+ self.logger.info(f'在线模拟撮合数量{backtest_num}')
|
|
|
+ self.logger.info(f'当前为在线训练模式 需预热{utils.BACKTEST_PREHOT_SECOND}秒 请耐心等候...')
|
|
|
+ else:
|
|
|
+ self.logger.info('当前为指定参数模式...')
|
|
|
+ ###### 交易前准备就绪 可以开始交易 ######
|
|
|
+ self.loop.create_task(self.rest.go())
|
|
|
+ self.loop.create_task(self.on_timer())
|
|
|
+ self.loop.create_task(self._run_server())
|
|
|
+ self.loop.create_task(self.run_stratey())
|
|
|
+ #self.loop.create_task(self.post_loop()) #改
|
|
|
+ self.loop.create_task(self.early_stop_loop())
|
|
|
+
|
|
|
+ def update_trade_msg(self):
|
|
|
+ # 更新保证金
|
|
|
+ self.tradeMsg.cash = round(self.local_cash,10)
|
|
|
+ self.tradeMsg.coin = round(self.local_coin,10)
|
|
|
+ # 使用本地推算仓位
|
|
|
+ self.tradeMsg.position = self.local_position_by_orders
|
|
|
+ # 更新订单
|
|
|
+ self.tradeMsg.orders = self.local_orders
|
|
|
+ ### 更新 ref
|
|
|
+ ref_tickers = []
|
|
|
+ for i in self.ref_name:
|
|
|
+ ref_tickers.append([self.tickers[i]['bp'], self.tickers[i]['ap']])
|
|
|
+ self.tradeMsg.ref_price = self.Predictor.Get_ref(ref_tickers)
|
|
|
+
|
|
|
+ async def server_handle(self, request):
|
|
|
+ '''中控数据接口'''
|
|
|
+ if 'spot' in self.exchange:
|
|
|
+ pos = self.local_position_by_orders.longPos - self.local_position_by_orders.shortPos
|
|
|
+ else:
|
|
|
+ pos = self.local_position.longPos - self.local_position.shortPos
|
|
|
+ if pos > 0.0:
|
|
|
+ entryPrice = self.local_position_by_orders.longAvg
|
|
|
+ elif pos < 0.0:
|
|
|
+ entryPrice = self.local_position_by_orders.shortAvg
|
|
|
+ else:
|
|
|
+ entryPrice = 0
|
|
|
+ return web.Response(body=json.dumps({
|
|
|
+ "now_balance": round(self.strategy.equity/self.used_pct, 4), #钱包余额
|
|
|
+ "unrealized_pn_l": round(self.local_profit, 4), #未实现盈利
|
|
|
+ "pos": round(pos, 8), #持仓数量
|
|
|
+ "entry_price": round(entryPrice, 8), #开仓价格
|
|
|
+ "now_price": round(self.strategy.mp, 8), #当前价格
|
|
|
+ }))
|
|
|
+
|
|
|
+ async def change(self, request):
|
|
|
+ '''中控台修改参数'''
|
|
|
+ try:
|
|
|
+ data = await request.json()
|
|
|
+ if "stop" in data:
|
|
|
+ self.logger.warning('中控停机')
|
|
|
+ self.exit_msg = '中控停机'
|
|
|
+ self.stop()
|
|
|
+ return web.Response(text=f"停机成功")
|
|
|
+
|
|
|
+ ip = request.remote
|
|
|
+ print(f'从{ip}收到更新参数请求',data)
|
|
|
+ if isinstance(data, str):
|
|
|
+ data = json.loads(data)
|
|
|
+
|
|
|
+ if self.backtest == 1:
|
|
|
+ return web.Response(text="自动调参模式不允许手动修改参数")
|
|
|
+ else:
|
|
|
+ open = float(data['open'])
|
|
|
+ close = float(data['close'])
|
|
|
+ self.strategy.trade_open_dist = open
|
|
|
+ self.strategy.trade_close_dist = close
|
|
|
+ return web.Response(text=f"参数修改成功 {open} {close}")
|
|
|
+ except Exception as e:
|
|
|
+ return web.Response(text=f"参数修改失败 {e}")
|
|
|
+
|
|
|
+ # @utils.timeit
|
|
|
+ def check_risk(self):
|
|
|
+ '''检查风控'''
|
|
|
+ if self.strategy.start_cash == 0.0:
|
|
|
+ print("请检查交易账户余额")
|
|
|
+ return 0
|
|
|
+ if isinstance(self.strategy.mp, float):
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ print("请检查最新价格")
|
|
|
+ return 0
|
|
|
+ ############
|
|
|
+ # print("当前线程数",self.process.num_threads())
|
|
|
+ ###### 资源风控0 ######
|
|
|
+ cpu_pct = psutil.cpu_times_percent().user
|
|
|
+ self.cpu_ema = self.cpu_ema * 0.8 + cpu_pct * 0.2
|
|
|
+ # print(f"cpu占用 {cpu_pct}")
|
|
|
+ if self.cpu_ema > 95:
|
|
|
+ msg = f"cpu占用过高 {self.cpu_ema} 准备停机"
|
|
|
+ print(msg)
|
|
|
+ self.logger.warning(msg)
|
|
|
+ self.exit_msg = msg
|
|
|
+ self.stop()
|
|
|
+ mm_pct = psutil.virtual_memory().percent
|
|
|
+ self.mm_ema = self.mm_ema * 0.8 + mm_pct * 0.2
|
|
|
+ # print(f"内存占用 {mm_pct}")
|
|
|
+ if self.mm_ema > 95:
|
|
|
+ msg = f"内存占用过高 {self.mm_ema} 准备停机"
|
|
|
+ print(msg)
|
|
|
+ self.logger.warning(msg)
|
|
|
+ self.exit_msg = msg
|
|
|
+ self.stop()
|
|
|
+ ###### 回撤风控1 ######
|
|
|
+ if "spot" not in self.exchange:
|
|
|
+ draw_back = 1-self.strategy.equity/self.strategy.max_equity
|
|
|
+ if draw_back > self.stoploss:
|
|
|
+ msg = f"{self.acct_name} 总资金吊灯回撤{draw_back} 当前{self.strategy.equity} 最高{self.strategy.max_equity} 触发止损 准备停机"
|
|
|
+ print(msg)
|
|
|
+ self.logger.warning(msg)
|
|
|
+ self.exit_msg = msg
|
|
|
+ self.stop()
|
|
|
+ ###### 回撤风控2 ######
|
|
|
+ draw_back = self.local_profit/self.strategy.start_equity
|
|
|
+ if draw_back < -self.stoploss:
|
|
|
+ msg = f"{self.acct_name} 交易亏损 触发止损 准备停机"
|
|
|
+ print(msg)
|
|
|
+ self.logger.warning(msg)
|
|
|
+ self.exit_msg = msg
|
|
|
+ self.stop()
|
|
|
+ ###### 报单延迟风控 ######
|
|
|
+ if self.rest.avg_delay > 5000: # 平均延迟允许上限 5000ms
|
|
|
+ msg = f"{self.acct_name} 延迟爆表 触发风控 准备停机"
|
|
|
+ print(msg)
|
|
|
+ self.logger.warning(msg)
|
|
|
+ self.exit_msg = msg
|
|
|
+ self.stop()
|
|
|
+ ###### 仓位异常风控 ######
|
|
|
+ ### 合约60秒更新一次绝对仓位 ###
|
|
|
+ # 连续5分钟仓位不正确就停机
|
|
|
+ # 5 * 60 = 300 300/10 = 30
|
|
|
+ diff_pos = max(abs(self.local_position.longPos - self.local_position_by_orders.longPos),abs(self.local_position.shortPos - self.local_position_by_orders.shortPos))
|
|
|
+ if "spot" not in self.exchange:
|
|
|
+ diff_pos_value = diff_pos * self.strategy.mp
|
|
|
+ if diff_pos_value > self.strategy._min_amount_value:
|
|
|
+ msg = f"{self.acct_name} ***发现仓位异常*** 推算{self.local_position_by_orders.__dict__} 本地{self.local_position.__dict__}"
|
|
|
+ print(msg)
|
|
|
+ self.logger.warning(msg)
|
|
|
+ self.position_check_series.append(1)
|
|
|
+ else:
|
|
|
+ self.position_check_series.append(0)
|
|
|
+ if len(self.position_check_series) > 30:
|
|
|
+ del(self.position_check_series[0])
|
|
|
+ if sum(self.position_check_series) >= 30:
|
|
|
+ msg = f"{self.acct_name} 合约连续检查本地仓位和推算仓位不相符 退出"
|
|
|
+ print(msg)
|
|
|
+ self.logger.warning(msg)
|
|
|
+ self.exit_msg = msg
|
|
|
+ self.stop()
|
|
|
+ ###### 下单异常风控 ######
|
|
|
+ if self.strategy.total_amount == 0.0:
|
|
|
+ msg = f"{self.acct_name} 开仓量为零 退出"
|
|
|
+ print(msg)
|
|
|
+ self.logger.warning(msg)
|
|
|
+ self.exit_msg = msg
|
|
|
+ self.stop()
|
|
|
+ ###### 行情更新异常风控 ######
|
|
|
+ for name in self.ref_name:
|
|
|
+ delay = round((time.time() - self.market_update_time[name]) * 1000, 3)
|
|
|
+ if delay > utils.MARKET_DELAY_LIMIT: # thre
|
|
|
+ msg = f"{self.acct_name} ticker_name:{name} delay:{delay}ms 行情更新延迟过高 退出"
|
|
|
+ self.logger.error(msg)
|
|
|
+ self.exit_msg = msg
|
|
|
+ self.stop()
|
|
|
+ for name in [self.trade_name]:
|
|
|
+ delay = round((time.time() - self.market_update_time[name]) * 1000, 3)
|
|
|
+ if delay > utils.MARKET_DELAY_LIMIT: # thre
|
|
|
+ msg = f"{self.acct_name} ticker_name:{name} delay:{delay}ms 行情更新延迟过高 退出"
|
|
|
+ self.logger.error(msg)
|
|
|
+ self.exit_msg = msg
|
|
|
+ self.stop()
|
|
|
+ ###### 订单异常风控 ######
|
|
|
+ for cid in self.local_orders:
|
|
|
+ if time.time() - self.local_orders[cid]["localtime"] > 300: # 订单长时间停留 怀疑漏单 但未必一定漏 5min
|
|
|
+ msg = f"{self.acct_name} cid:{cid} 订单停留过久 怀疑异常 退出"
|
|
|
+ self.logger.error(msg)
|
|
|
+ self.exit_msg = msg
|
|
|
+ self.stop()
|
|
|
+ ###### 持仓均价异常风控 ######
|
|
|
+ if isinstance(self.strategy.long_pos_bias, float):
|
|
|
+ # 偏离mp较大 且持仓较大 说明出现异常
|
|
|
+ if self.strategy.long_hold_value > 2*self.strategy._min_amount_value:
|
|
|
+ if self.strategy.long_pos_bias > 4.0 or self.strategy.long_pos_bias < -2.0:
|
|
|
+ msg = f"{self.acct_name} long_pos_bias:{self.strategy.long_pos_bias} 持仓均价异常 退出"
|
|
|
+ self.logger.error(msg)
|
|
|
+ self.exit_msg = msg
|
|
|
+ self.stop()
|
|
|
+ if isinstance(self.strategy.short_pos_bias, float):
|
|
|
+ # 偏离mp较大 且持仓较大 说明出现出现异常
|
|
|
+ if self.strategy.short_hold_value > 2*self.strategy._min_amount_value:
|
|
|
+ if self.strategy.short_pos_bias > 4.0 or self.strategy.short_pos_bias < -2.0:
|
|
|
+ msg = f"{self.acct_name} short_pos_bias:{self.strategy.short_pos_bias} 持仓均价异常 退出"
|
|
|
+ self.logger.error(msg)
|
|
|
+ self.exit_msg = msg
|
|
|
+ self.stop()
|
|
|
+ ###### 订单撤单异常风控 ######
|
|
|
+ for cid in self.local_cancel_log:
|
|
|
+ if self.local_cancel_log[cid] > 300:
|
|
|
+ msg = f"{self.acct_name} 订单长时间无法撤销 退出"
|
|
|
+ self.logger.error(msg)
|
|
|
+ self.exit_msg = msg
|
|
|
+ self.stop()
|
|
|
+ ###### 定价异常风控 ######
|
|
|
+ if abs(self.strategy.ref_price-self.strategy.mp)/self.strategy.mp > 0.03:
|
|
|
+ msg = f"{self.acct_name} 定价偏离过大 怀疑异常 退出"
|
|
|
+ self.logger.error(msg)
|
|
|
+ self.exit_msg = msg
|
|
|
+ self.stop()
|
|
|
+
|
|
|
+ async def exit(self, delay=0):
|
|
|
+ '''退出操作'''
|
|
|
+ try:
|
|
|
+ self.logger.info(f"预约退出操作 delay:{delay}")
|
|
|
+ if delay > 0:
|
|
|
+ await asyncio.sleep(delay)
|
|
|
+ self.logger.info(f"开始退出操作")
|
|
|
+ self.logger.info("为避免api失效导致遗漏仓位 建议人工复查")
|
|
|
+ await self.rest.check_position(hold_coin=self.hold_coin)
|
|
|
+ # stop flag
|
|
|
+ self.rest.stop_flag = 1
|
|
|
+ self.ws.stop_flag = 1
|
|
|
+ for i in self.ref_name:
|
|
|
+ self.ws_ref[i].stop_flag = 1
|
|
|
+ # double check 需要延迟几秒以便等待更新数据
|
|
|
+ await asyncio.sleep(3)
|
|
|
+ self.logger.info("双重检查遗漏仓位")
|
|
|
+ await self.rest.check_position(hold_coin=self.hold_coin)
|
|
|
+ self.logger.info(f'停机退出 停机原因 {self.exit_msg}')
|
|
|
+ await asyncio.sleep(1)
|
|
|
+ # 发送交易状态
|
|
|
+ await self._post_params()
|
|
|
+ # 压缩行情文件
|
|
|
+ utils.csv_to_gz_and_remove()
|
|
|
+ # close pid
|
|
|
+ self.logger.info("退出进程")
|
|
|
+ except:
|
|
|
+ self.logger.error(traceback.format_exc())
|
|
|
+ finally:
|
|
|
+ os._exit(0)
|
|
|
+
|
|
|
+ async def on_timer(self):
|
|
|
+ '''定期触发系统逻辑'''
|
|
|
+ await asyncio.sleep(20)
|
|
|
+ while 1:
|
|
|
+ try:
|
|
|
+ # 10秒检查一次风控
|
|
|
+ await asyncio.sleep(10)
|
|
|
+ # 检查风控
|
|
|
+ self.check_risk()
|
|
|
+ # stop
|
|
|
+ if self.mode_signal == 1:return
|
|
|
+ # 计算预估成交额
|
|
|
+ total_trade_value = self.local_buy_value + self.local_sell_value
|
|
|
+ self.strategy.trade_vol_24h = round(total_trade_value / (time.time()-self.pid_start_time) * 86400 / 10000, 2)
|
|
|
+ # 打印
|
|
|
+ if int(self.params.log):
|
|
|
+ self.strategy._print_summary()
|
|
|
+ # 打印行情延迟监控
|
|
|
+ self.logger.info('Rest 报单平均延迟 ' + str(self.rest.avg_delay) + 'ms ')
|
|
|
+ self.logger.info('Rest 报单最高延迟 ' + str(self.rest.max_delay) + 'ms ')
|
|
|
+ for name in self.market_update_interval:
|
|
|
+ avg_interval = round(self.market_update_interval[name]*1e3, 2)
|
|
|
+ self.logger.info(f'WS 盘口{name}行情 平均更新间隔 {avg_interval}ms')
|
|
|
+ # 选择参数
|
|
|
+ if self.backtest:
|
|
|
+ self.choose_params()
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ print('定期循环任务取消')
|
|
|
+ except:
|
|
|
+ print("定时循环系统出错")
|
|
|
+ self.logger.error(traceback.print_exc())
|
|
|
+ await asyncio.sleep(10)
|
|
|
+
|
|
|
+ async def _post_params(self):
|
|
|
+ '''推送交易信息'''
|
|
|
+ profit = round(self.strategy.daily_return/self.strategy.leverrate,4)
|
|
|
+ if time.time() - self.pid_start_time > utils.EARLY_STOP_SECOND * 0.5 or profit < 0.0:
|
|
|
+ await utils._post_params(
|
|
|
+ "http://wwww.khods.com:8888/post_params",
|
|
|
+ self.params.proxy,
|
|
|
+ ujson.dumps({
|
|
|
+ "pwd":"123456",
|
|
|
+ "exchange":self.params.exchange,
|
|
|
+ "pair":self.params.pair,
|
|
|
+ "open":self.params.open,
|
|
|
+ "close":self.params.close,
|
|
|
+ "refexchange":self.params.refexchange[self.strategy.ref_index],
|
|
|
+ "profit":profit,
|
|
|
+ })
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ self.logger.info("不满足推送过滤条件 放弃推送参数")
|
|
|
+
|
|
|
+ async def post_loop(self):
|
|
|
+ '''定期触发交易信息推送'''
|
|
|
+ await asyncio.sleep(30)
|
|
|
+ _interval = 60 # 定期推送一次盈利情况
|
|
|
+ while 1:
|
|
|
+ try:
|
|
|
+ # 定期推送一次
|
|
|
+ await asyncio.sleep(_interval)
|
|
|
+ # 发送交易状态
|
|
|
+ await self._post_params()
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ print('post loop 循环任务取消')
|
|
|
+ except:
|
|
|
+ print("post loop 循环系统出错")
|
|
|
+ self.logger.error(traceback.print_exc())
|
|
|
+ await asyncio.sleep(10)
|
|
|
+
|
|
|
+ async def early_stop_loop(self):
|
|
|
+ '''定期触发交易信息推送'''
|
|
|
+ if self.father:
|
|
|
+ self.logger.info(f'以父进程方式启动 关闭早停检测')
|
|
|
+ return
|
|
|
+ else:
|
|
|
+ self.logger.info(f'以子进程方式启动 开启早停检测')
|
|
|
+ await asyncio.sleep(30)
|
|
|
+ _interval = utils.EARLY_STOP_SECOND
|
|
|
+ _last_equity = self.strategy.start_equity
|
|
|
+ _last_local_profit = 0.0
|
|
|
+ while 1:
|
|
|
+ try:
|
|
|
+ # 休眠
|
|
|
+ await asyncio.sleep(_interval)
|
|
|
+ ###### 子进场早停风控 ######
|
|
|
+ self.logger.info(f'当前净值{self.strategy.equity} 上次检测时净值{_last_equity} 当前累积利润{self.local_profit} 上次检测时利润{_last_local_profit}')
|
|
|
+ # 检查是否需要早停 没有成交 或者 亏损
|
|
|
+ if self.strategy.equity <= _last_equity or self.local_profit <= _last_local_profit:
|
|
|
+ self.logger.info('触发早停条件 当零持仓时退出')
|
|
|
+ # 没有持仓
|
|
|
+ for _ in range(30):
|
|
|
+ await asyncio.sleep(5)
|
|
|
+ if self.strategy.long_hold_value < self.strategy._min_amount_value and \
|
|
|
+ self.strategy.short_hold_value < self.strategy._min_amount_value:
|
|
|
+ msg = f"{self.acct_name} 子进程盈利状况不理想 提前停机 退出"
|
|
|
+ self.logger.error(msg)
|
|
|
+ self.exit_msg = msg
|
|
|
+ self.stop()
|
|
|
+ # 更新上一次检测的净值
|
|
|
+ _last_equity = self.strategy.equity
|
|
|
+ _last_local_profit = self.local_profit
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ print('early stop 循环任务取消')
|
|
|
+ except:
|
|
|
+ print("early stop 循环系统出错")
|
|
|
+ self.logger.error(traceback.print_exc())
|
|
|
+ await asyncio.sleep(10)
|
|
|
+
|
|
|
+ def on_agg_market(self):
|
|
|
+ '''
|
|
|
+ 处理聚合行情
|
|
|
+ 1. 获取聚合行情
|
|
|
+ 2. 更新预测器
|
|
|
+ 3. 触发tick回测
|
|
|
+ '''
|
|
|
+ ### 更新聚合市场数据
|
|
|
+ agg_market = self.get_all_market_data()
|
|
|
+ ### 更新聚合市场信息
|
|
|
+ self.tradeMsg.market = agg_market
|
|
|
+ ### 更新预测器
|
|
|
+ self.Predictor.onTime(agg_market)
|
|
|
+ ### 触发回测
|
|
|
+ if self.backtest:
|
|
|
+ self.loop.create_task(self.real_time_back_test(self.tradeMsg))
|
|
|
+
|
|
|
+ async def run_stratey(self):
|
|
|
+ '''
|
|
|
+ 定期触发策略
|
|
|
+ '''
|
|
|
+ print('定时触发器启动')
|
|
|
+ # 准备交易
|
|
|
+ try:
|
|
|
+ print('前期准备完成')
|
|
|
+ await asyncio.sleep(10)
|
|
|
+ while 1:
|
|
|
+ try:
|
|
|
+ # 时间预设
|
|
|
+ start_time = time.time()
|
|
|
+ ### 是否准备充分
|
|
|
+ if self.ready:
|
|
|
+ ### 更新交易信息集合
|
|
|
+ self.update_trade_msg()
|
|
|
+ ### 触发策略
|
|
|
+ if self.mode_signal == 0:
|
|
|
+ pass
|
|
|
+ # # 更新策略时间
|
|
|
+ # self.strategy.local_time = time.time()
|
|
|
+ # # 产生信号
|
|
|
+ # orders = self.strategy.onTime(self.tradeMsg)
|
|
|
+ # ### 记录指令触发信息
|
|
|
+ # if self._not_empty(orders):
|
|
|
+ # self.logger.debug("触发onTime")
|
|
|
+ # self._update_local_orders(orders)
|
|
|
+ # self.loop.create_task(self.rest.handle_signals(orders))
|
|
|
+ # self.logger.debug(orders)
|
|
|
+ else:
|
|
|
+ if self.mode_signal > 1:self.mode_signal -= 1
|
|
|
+ if self.mode_signal == 1:return
|
|
|
+ # 触发策略
|
|
|
+ # 更新策略时间
|
|
|
+ self.strategy.local_time = time.time()
|
|
|
+ # 获取信号
|
|
|
+ if self.mode_signal > 20:
|
|
|
+ # 先执行onExit
|
|
|
+ orders = self.strategy.onExit(self.tradeMsg)
|
|
|
+ ### 记录指令触发信息
|
|
|
+ if self._not_empty(orders):
|
|
|
+ self.logger.debug("触发onExit")
|
|
|
+ self._update_local_orders(orders)
|
|
|
+ self.loop.create_task(self.rest.handle_signals(orders))
|
|
|
+ self.logger.debug(orders)
|
|
|
+ else:
|
|
|
+ # 再执行onSleep
|
|
|
+ orders = self.strategy.onSleep(self.tradeMsg)
|
|
|
+ ### 记录指令触发信息
|
|
|
+ if self._not_empty(orders):
|
|
|
+ self.logger.debug("触发onSleep")
|
|
|
+ self._update_local_orders(orders)
|
|
|
+ self.loop.create_task(self.rest.handle_signals(orders))
|
|
|
+ self.logger.debug(orders)
|
|
|
+ ############################################################
|
|
|
+ else:
|
|
|
+ self.check_ready()
|
|
|
+ ### 计算耗时并进行休眠
|
|
|
+ pass_time = time.time()-start_time
|
|
|
+ await asyncio.sleep(utils.clip(self.interval-pass_time, 0.0, 1.0))
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ print('策略触发任务取消')
|
|
|
+ except:
|
|
|
+ self.logger.error(traceback.format_exc())
|
|
|
+ traceback.print_exc()
|
|
|
+ await asyncio.sleep(10)
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ print('策略触发任务取消')
|
|
|
+ except:
|
|
|
+ self.logger.error(traceback.format_exc())
|
|
|
+ traceback.print_exc()
|
|
|
+ await asyncio.sleep(10)
|
|
|
+
|
|
|
+ def check_ready(self):
|
|
|
+ '''
|
|
|
+ 判断初始数据是否齐全
|
|
|
+ '''
|
|
|
+ ### 检查 ticker 行情
|
|
|
+ for i in self.ref_name:
|
|
|
+ if i not in self.tickers or self.tickers[i] == {}:
|
|
|
+ print("参考盘口ticker未准备好")
|
|
|
+ return
|
|
|
+ else:
|
|
|
+ if self.tickers[i]['bp'] == 0 or self.tickers[i]['ap'] == 0:
|
|
|
+ print("参考盘口ticker未准备好")
|
|
|
+ return
|
|
|
+ if self.trade_name not in self.tickers or self.tickers[self.trade_name] == {}:
|
|
|
+ print("交易盘口ticker未准备好")
|
|
|
+ return
|
|
|
+ else:
|
|
|
+ if self.tickers[self.trade_name]['bp'] == 0 or self.tickers[self.trade_name]['ap'] == 0:
|
|
|
+ print("交易盘口ticker未准备好")
|
|
|
+ return
|
|
|
+ ### 检查 market 行情
|
|
|
+ all_market = self.get_all_market_data()
|
|
|
+ if len(all_market) != utils.LEN*(1+self.ref_num):
|
|
|
+ print("聚合行情未准备好")
|
|
|
+ return
|
|
|
+ else:
|
|
|
+ # 如果行情已经就绪 预热trademsg和predictor
|
|
|
+ print("聚合行情准备就绪")
|
|
|
+ self.tradeMsg.market = all_market
|
|
|
+ self.Predictor.onTime(all_market)
|
|
|
+ self.ready = 1
|
|
|
+
|
|
|
+ def stop(self):
|
|
|
+ '''
|
|
|
+ 停机函数
|
|
|
+ mode_signal 不能小于80
|
|
|
+ 前6秒用于maker平仓
|
|
|
+ 后2秒用于撤maker平仓单
|
|
|
+ 休眠2秒再执行check_position 避免卡单导致漏仓位
|
|
|
+ '''
|
|
|
+ self.logger.info(f'进入停机流程...')
|
|
|
+ self.mode_signal = 80
|
|
|
+ # 等strategy onExit 彻底执行完毕 进入沉默状态之后 再进入exit 否则可能导致多处同时操作订单
|
|
|
+ # 尽量减少大仓位直接take平
|
|
|
+ self.loop.create_task(self.exit(delay=10))
|
|
|
+
|
|
|
+ async def _run_server(self):
|
|
|
+ print('server正在启动...')
|
|
|
+ for _ in range(30):
|
|
|
+ await asyncio.sleep(5)
|
|
|
+ if self.strategy.equity > 0.0:break
|
|
|
+ app = web.Application()
|
|
|
+ app.router.add_route('GET', '/account', self.server_handle)
|
|
|
+ app.router.add_route('POST', '/change', self.change)
|
|
|
+ try:
|
|
|
+ self.loop.create_task(web._run_app(app, host='0.0.0.0', port=self.params.server_port, handle_signals=False))
|
|
|
+ except:
|
|
|
+ self.logger.error(f"Server启动失败")
|
|
|
+ self.logger.error(traceback.format_exc())
|
|
|
+ self.exit_msg = "服务启动失败 停机退出"
|
|
|
+ self.stop()
|
|
|
+
|
|
|
+ def run(self):
|
|
|
+ '''启动ws行情获取'''
|
|
|
+
|
|
|
+ def keyboard_interrupt(s, f):
|
|
|
+ self.logger.info("收到退出信号 准备关机")
|
|
|
+ self.stop()
|
|
|
+
|
|
|
+ try:
|
|
|
+ signal.signal(signal.SIGINT, keyboard_interrupt)
|
|
|
+ signal.signal(signal.SIGTERM, keyboard_interrupt)
|
|
|
+ if 'win' not in sys.platform:
|
|
|
+ signal.signal(signal.SIGKILL, keyboard_interrupt)
|
|
|
+ signal.signal(signal.SIGQUIT, keyboard_interrupt)
|
|
|
+ except:
|
|
|
+ pass
|
|
|
+
|
|
|
+ self.loop.create_task(self.before_trade())
|
|
|
+
|
|
|
+ print(f'判断启动方式...')
|
|
|
+ if self.father:
|
|
|
+ print('以父进程方式启动 最大允许运行时间为30天')
|
|
|
+ self.loop.create_task(self.exit(delay=60*60*24*30))
|
|
|
+ else:
|
|
|
+ print('以子进程方式启动 最大允许运行时间为60分钟')
|
|
|
+ self.loop.create_task(self.exit(delay=utils.CHILD_RUN_SECOND))
|
|
|
+
|
|
|
+ self.loop.run_forever()
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+
|
|
|
+ if 0:
|
|
|
+ utils.check_auth()
|
|
|
+
|
|
|
+ if 0:
|
|
|
+ utils.check_time()
|
|
|
+
|
|
|
+ pnum = len(sys.argv)
|
|
|
+
|
|
|
+ if pnum > 0:
|
|
|
+ fname = None
|
|
|
+ log_file = None
|
|
|
+ pidnum = None
|
|
|
+ father = 1
|
|
|
+ for i in range(pnum):
|
|
|
+ print(f"第{i}个参数为:{sys.argv[i]}")
|
|
|
+ if sys.argv[i] == '-c' or sys.argv[i] == '--c':
|
|
|
+ fname = sys.argv[i+1]
|
|
|
+ elif sys.argv[i] == '-h':
|
|
|
+ print("帮助文档")
|
|
|
+ elif sys.argv[i] == '-log_file' or sys.argv[i] == '--log_file':
|
|
|
+ log_file = sys.argv[i+1]
|
|
|
+ elif sys.argv[i] == '-num' or sys.argv[i] == '--num':
|
|
|
+ pidnum = sys.argv[i+1]
|
|
|
+ elif sys.argv[i] == '-v' or sys.argv[i] == '--v':
|
|
|
+ print(f"当前版本为 V{VERSION}")
|
|
|
+ elif sys.argv[i] == '-child' or sys.argv[i] == '--child':
|
|
|
+ father = 0
|
|
|
+ print(f"当前以子进程方式启动")
|
|
|
+ if fname and log_file and pidnum:
|
|
|
+ print(f"指定的配置为 fname:{fname} log_file:{log_file} pidnum:{pidnum} father:{father}")
|
|
|
+ date = time.strftime("%Y%m%d", time.localtime())
|
|
|
+ logname = f"{log_file}-{date}"
|
|
|
+ quant = Quant(utils.get_params(fname), logname, father)
|
|
|
+ quant.run()
|
|
|
+ elif fname:
|
|
|
+ print(f"运行指定配置文件{fname}")
|
|
|
+ quant = Quant(utils.get_params(fname),father=father)
|
|
|
+ quant.run()
|
|
|
+ else:
|
|
|
+ print("缺少指定参数 运行默认配置文件")
|
|
|
+ fname = 'config.toml'
|
|
|
+ quant = Quant(utils.get_params(fname),father=father)
|
|
|
+ quant.run()
|
|
|
+ else:
|
|
|
+ fname = 'config.toml'
|
|
|
+ quant = Quant(utils.get_params(fname))
|
|
|
+ quant.run()
|