'''
Asynchronous, event-driven backtesting framework based on order-book depth data.

Author: 千千量化
'''

import asyncio
import base64
import configparser
import copy
import datetime
import functools
import hmac
import json
import os
import random
import signal
import sys
import time
import traceback
import zlib

import joblib
import requests
import ujson

import model
import strategy as strategy
import utils

# Sentinel used for "no price seen yet" when tracking minimum prices.
_NO_PRICE = 9999999999


def timeit(func):
    '''
    Decorator that prints how long each call to *func* takes, in milliseconds.

    The wrapped function's return value is passed through unchanged.
    '''
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        started = time.time()
        result = func(*args, **kwargs)
        # seconds -> milliseconds
        spend_time = round((time.time() - started) * 1000, 5)
        print(f'{func.__name__} 耗时 {spend_time} ms')
        return result
    return wrapper


class Backtest:
    '''
    Backtesting engine.

    Order sides used throughout (derived from how each side changes the
    position in ``matching``):

    * ``kd`` - open long   (increases ``pos.longPos``)
    * ``pd`` - close long  (decreases ``pos.longPos``)
    * ``kk`` - open short  (increases ``pos.shortPos``)
    * ``pk`` - close short (decreases ``pos.shortPos``)
    '''

    def __init__(self, strategy: strategy.Strategy, data=None, is_plot=0):
        '''
        Initialise the backtest state.

        Args:
            strategy: strategy object; its ``onTime`` is called once per tick.
            data: historical data (only stored here).
            is_plot: truthy to record the per-tick time series used for plotting.
        '''
        # Strategy object
        self.strategy = strategy
        # Historical data
        self.data = data
        # Traded symbol
        self.symbol = 'backtest_symbol'
        # Simulated resting orders, keyed by client id
        self.localOrders = dict()
        # Simulated position
        self.pos = model.Position()
        # Mid price
        self.mp = None
        # Market data
        self.reference_price = []
        self.last_marketdata = []
        # Fee schedule (maker / taker)
        self.fee = model.BacktestFee("v9")
        # Whether to record plotting series
        self._is_plot = is_plot
        self.long_hold = []
        self.short_hold = []
        self.index = []
        # Number of simulated fills
        self.trade_num = 0
        # Time of the most recent open-long / open-short fill
        self.kd_time = None
        self.kk_time = None
        self.hold_times = [0.0]
        # Equity
        self.start_cash = 1000000.0
        self.equity = self.start_cash
        self.equity_real = self.start_cash
        self.equity_high = self.start_cash
        self.balance = [self.start_cash]
        self.strategy.start_cash = self.start_cash
        self.strategy.local_start_time = 0.0
        # Fill time series
        self.priceMotion = []
        self.kd_trade = []
        self.kk_trade = []
        self.pd_trade = []
        self.pk_trade = []
        # Resting-order time series
        self.localOrders_timeseries = []
        # [max fill, min fill] per tick
        self.fill_price = []
        # Pending signals, keyed by the time they were generated
        self.signal_series = dict()
        # Simulated latency (default from utils)
        self.backtest_delay = utils.BACKTEST_DELAY
        # Current backtest clock
        self.backtest_time = time.time()
        # Average holding time (exponentially smoothed when not plotting)
        self.avg_hold_time = 0.0
        # Timestamp of the most recently generated signal
        self.last_signal_time = 0.0

    def get_available_close_amt(self):
        '''
        Return ``(closable_long, closable_short)``: the current positions
        minus the amounts already committed to resting close orders.
        '''
        closable_long = self.pos.longPos
        closable_short = self.pos.shortPos
        for order in self.localOrders.values():
            if order["side"] == 'pd':
                closable_long -= order["amount"]
            elif order["side"] == 'pk':
                closable_short -= order["amount"]
        return closable_long, closable_short

    def _new_order(self, spec, signal_time):
        '''
        Build a simulated order from ``spec`` (indexed as amount, side, price)
        and register it in ``self.localOrders``.
        '''
        # Re-draw on collision so a new order never overwrites a live one.
        client_id = str(random.randint(1, 99999))
        while client_id in self.localOrders:
            client_id = str(random.randint(1, 99999))
        order_event = {
            'symbol': self.symbol,
            'status': "NEW",
            'amount': float(spec[0]),
            'side': spec[1],
            'price': float(spec[2]),
            'filled_price': 0,
            'filled': 0,
            'client_id': client_id,
            'order_id': str(random.randint(1, 99999)),
            'localtime': signal_time,
            'createtime': signal_time,
        }
        self.localOrders[client_id] = order_event
        return order_event

    # @timeit
    def handle_signal(self, signals):
        '''
        Apply the pending strategy signals that are old enough to execute.

        A signal is processed only if it was generated before the most recent
        signal (``self.last_signal_time``) and at least ``self.backtest_delay``
        has elapsed on the backtest clock. Processed signals are removed from
        ``signals`` (which is mutated in place).

        Args:
            signals: mapping of signal timestamp -> signal. A signal maps an
                action name to its payload; action names are matched by the
                substrings ``Cancel``, ``Limits_open`` and ``Limits_close``.
                A ``None`` signal is discarded.
        '''
        for signal_time in list(signals):
            # Only signals generated before the previous tick's signal qualify.
            if signal_time >= self.last_signal_time:
                continue
            # Simulated latency has not elapsed yet.
            if self.backtest_time - signal_time < self.backtest_delay:
                continue

            actions = signals.pop(signal_time)
            if actions is None:
                # Nothing to do; dropping it keeps it from blocking later signals.
                continue

            for action in actions:
                payload = actions[action]
                if 'Cancel' in action:
                    # Cancel a simulated order (no-op if it is already gone).
                    self.localOrders.pop(payload[0], None)
                elif 'Limits_open' in action:
                    # Place simulated opening orders.
                    for spec in payload:
                        self._new_order(spec, signal_time)
                elif 'Limits_close' in action:
                    # Place simulated closing orders, never exceeding what is
                    # still closable; the first oversized order aborts the batch.
                    closable_long, closable_short = self.get_available_close_amt()
                    for spec in payload:
                        amount, side = spec[0], spec[1]
                        if side == "pd":
                            if amount > closable_long:
                                break
                            closable_long -= amount
                        elif side == "pk":
                            if amount > closable_short:
                                break
                            closable_short -= amount
                        self._new_order(spec, signal_time)

    def _trigger_fill(self, order, prev_bid, prev_ask, bid, ask, min_fill, max_fill):
        '''
        Decide whether ``order`` fills on this tick.

        Returns ``(fee_rate, fill_price)`` or ``None`` when it does not fill.
        '''
        price = order["price"]
        side = order["side"]
        if side in ("kd", "pk"):
            # Buy side.
            if price > prev_ask:
                # Crossed the previous tick's ask: taker fill at that ask.
                return self.fee.taker, prev_ask
            if price > ask or price > min_fill:
                # Above this tick's ask, or above the lowest traded price:
                # maker fill at the limit price.
                return self.fee.maker, price
            return None
        if side in ("kk", "pd"):
            # Sell side.
            if price < prev_bid:
                # Crossed the previous tick's bid: taker fill.
                # NOTE(review): fills at the limit price, whereas the buy side
                # fills at the previous ask - confirm the asymmetry is intended.
                return self.fee.taker, price
            if price < bid or price < max_fill:
                # Below this tick's bid, or below the highest traded price:
                # maker fill at the limit price.
                return self.fee.maker, price
        return None

    def _record_hold_time(self, opened_at):
        '''
        Record how long a just-closed position was held: appended to
        ``hold_times`` when plotting, otherwise folded into ``avg_hold_time``.
        '''
        if opened_at is None:
            # No opening fill was recorded, so there is nothing to measure.
            return
        held = self.strategy.local_time - opened_at
        if self._is_plot:
            self.hold_times.append(held)
        elif self.avg_hold_time == 0.0:
            self.avg_hold_time = held
        else:
            self.avg_hold_time = self.avg_hold_time * 0.9 + held * 0.1

    def _apply_fill(self, order, fee_rate, fill_price):
        '''
        Apply a fill to the position and return the realised PnL it produced
        (fees included).
        '''
        side = order["side"]
        amount = order['amount']
        value = amount * fill_price
        pos = self.pos

        if side == "kd":
            pos.longAvg = (pos.longPos * pos.longAvg + value) / (pos.longPos + amount)
            pos.longPos += amount
            self.kd_time = self.strategy.local_time
            if self._is_plot:
                self.kd_trade[-1] = fill_price
            return -(fee_rate * value)

        if side == "kk":
            pos.shortAvg = (pos.shortAvg * pos.shortPos + value) / (pos.shortPos + amount)
            pos.shortPos += amount
            self.kk_time = self.strategy.local_time
            if self._is_plot:
                self.kk_trade[-1] = fill_price
            return -(fee_rate * value)

        if side == "pk":
            pct = (pos.shortAvg - fill_price) / pos.shortAvg
            pos.shortPos -= amount
            if self._is_plot:
                self.pk_trade[-1] = fill_price
            self._record_hold_time(self.kk_time)
            return pct * value - fee_rate * value

        # side == "pd"
        pct = (fill_price - pos.longAvg) / pos.longAvg
        pos.longPos -= amount
        if self._is_plot:
            self.pd_trade[-1] = fill_price
        self._record_hold_time(self.kd_time)
        return pct * value - fee_rate * value

    # @timeit
    def matching(self, data):
        '''
        Match resting simulated orders against one tick of market data and
        update position, equity and (optionally) the plotting series.

        Args:
            data: one tick of market data, indexed with ``utils.BP_INDEX``,
                ``utils.AP_INDEX``, ``utils.MAX_FILL_INDEX`` and
                ``utils.MIN_FILL_INDEX``. A fill price of 0 means "no trade".
        '''
        # Highest / lowest traded price; 0 means no trades.
        max_fill = 0 if data[utils.MAX_FILL_INDEX] == 0 else data[utils.MAX_FILL_INDEX]
        min_fill = _NO_PRICE if data[utils.MIN_FILL_INDEX] == 0 else data[utils.MIN_FILL_INDEX]

        # Realised PnL produced on this tick.
        pnl = 0.0

        # Reserve this tick's slot in the fill time series.
        if self._is_plot:
            self.kd_trade.append(0)
            self.kk_trade.append(0)
            self.pd_trade.append(0)
            self.pk_trade.append(0)

        # Extremes of the resting order prices (plotting only).
        max_kd = 0
        max_pk = 0
        min_kk = _NO_PRICE
        min_pd = _NO_PRICE

        # Current tick.
        now_bp = data[utils.BP_INDEX]
        now_ap = data[utils.AP_INDEX]
        self.mp = (now_ap + now_bp) * 0.5

        # Matching needs the previous tick; len() works for lists and arrays alike.
        if len(self.last_marketdata) > 0:
            prev_bp = self.last_marketdata[utils.BP_INDEX]
            prev_ap = self.last_marketdata[utils.AP_INDEX]

            # Iterate over a snapshot: filled orders are removed as we go.
            for cid in list(self.localOrders):
                order = self.localOrders[cid]
                side = order["side"]

                if self._is_plot:
                    if side == "kd":
                        max_kd = max(max_kd, order['price'])
                    elif side == "pk":
                        max_pk = max(max_pk, order['price'])
                    elif side == "kk":
                        min_kk = min(min_kk, order['price'])
                    elif side == "pd":
                        min_pd = min(min_pd, order['price'])

                fill = self._trigger_fill(
                    order, prev_bp, prev_ap, now_bp, now_ap, min_fill, max_fill
                )
                if fill is None:
                    continue

                fee_rate, fill_price = fill
                del self.localOrders[cid]
                self.trade_num += 1
                pnl += self._apply_fill(order, fee_rate, fill_price)

        # Unrealised PnL at the current mid price.
        unrealized_pnl = 0
        if self.pos.longPos > 0:
            unrealized_pnl += (self.mp - self.pos.longAvg) / self.pos.longAvg * self.pos.longPos * self.mp
        if self.pos.shortPos > 0:
            unrealized_pnl += (self.pos.shortAvg - self.mp) / self.pos.shortAvg * self.pos.shortPos * self.mp

        if self._is_plot:
            # Resting-order time series; "no order" is stored as 0.
            min_kk = 0 if min_kk == _NO_PRICE else min_kk
            min_pd = 0 if min_pd == _NO_PRICE else min_pd
            self.localOrders_timeseries.append([max_kd, max_pk, min_kk, min_pd])
            self.priceMotion.append(self.strategy.mp)
            # Reference price
            self.reference_price.append(self.strategy.ref_price)
            # Traded price range
            self.fill_price.append([max_fill, min_fill])
            # Positions
            self.long_hold.append(self.pos.longPos)
            self.short_hold.append(self.pos.shortPos)
            # Strategy-reported equity
            self.index.append(self.strategy.equity)
            # Floating equity
            self.balance.append(self.equity + pnl + unrealized_pnl)

        # Equity including unrealised PnL.
        self.equity_real = self.equity + pnl + unrealized_pnl
        # Equity from realised PnL only.
        self.equity = self.equity + pnl
        # High-water mark of realised equity.
        self.equity_high = max(self.equity_high, self.equity)

        # Remember this tick for the next matching round.
        self.last_marketdata = data

    # @timeit
    def run_by_tick(self, tradeMsg: model.TraderMsg):
        '''
        Advance the backtest by one tick: execute due signals, match orders
        against ``tradeMsg.market``, refresh the account fields on ``tradeMsg``
        and queue the strategy's new signal under the current backtest time.
        '''
        now = self.backtest_time
        # Sync the strategy's clock with the backtest clock.
        self.strategy.local_time = now
        # Execute signals that are due.
        self.handle_signal(self.signal_series)
        # Match against the latest market data.
        self.matching(tradeMsg.market)
        # Refresh account information.
        tradeMsg.cash = self.equity
        tradeMsg.orders = self.localOrders
        tradeMsg.position = self.pos
        # Generate and queue this tick's signal.
        self.signal_series[now] = self.strategy.onTime(tradeMsg)
        self.last_signal_time = now