| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379 |
- '''
- 基于深度信息的异步事件触发回测框架
- 作者 千千量化
- '''
- import os
- import sys
- import asyncio
- import json, ujson
- import requests
- import hmac
- import base64
- import zlib
- import datetime
- import time
- import strategy as strategy
- import traceback
- import joblib
- import traceback
- import configparser
- import signal
- import random
- import utils
- import model
- import time
- import copy
- import random
def timeit(func):
    """Decorator that prints the wall-clock duration of each call to *func*.

    The elapsed time is reported in milliseconds on stdout. The wrapped
    function's return value is passed through unchanged and its metadata
    (``__name__``, ``__doc__``) is preserved.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same call signature as *func*.
    """
    # Local import: the file's import block is left untouched.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measurement cannot go negative
        # if the system clock is adjusted while func runs.
        started = time.perf_counter()
        result = func(*args, **kwargs)
        # seconds -> milliseconds
        spend_time = round((time.perf_counter() - started) * 1000, 5)
        print(f'{func.__name__} 耗时 {spend_time} ms')
        return result

    return wrapper
class Backtest:
    """Tick-driven backtesting engine with simulated order placement and matching.

    On every tick (``run_by_tick``) the engine:

    1. turns sufficiently old strategy signals into simulated orders,
    2. matches resting orders against the previous and current quotes,
    3. hands the updated account state to the strategy and stores the signal
       it returns for processing on a later tick.

    Order sides used throughout:

    * ``kd`` - buy, increases the long position
    * ``pd`` - sell, decreases the long position
    * ``kk`` - sell, increases the short position
    * ``pk`` - buy, decreases the short position
    """

    # Sentinel meaning "no price observed" for minimum-style trackers.
    _NO_PRICE = 9999999999

    def __init__(self, strategy: "strategy.Strategy", data=None, is_plot=0):
        """Initialise the backtest state.

        Args:
            strategy: Strategy object; its ``start_cash`` and
                ``local_start_time`` attributes are set here.
            data: Historical data, stored as-is on ``self.data``.
            is_plot: Truthy to record the per-tick series used for plotting.
        """
        # Strategy object
        self.strategy = strategy
        # Historical data
        self.data = data
        # Traded symbol
        self.symbol = 'backtest_symbol'
        # Simulated resting orders, keyed by client id
        self.localOrders = dict()
        # Simulated position
        self.pos = model.Position()
        # Mid price
        self.mp = None
        # Market data
        self.reference_price = []
        self.last_marketdata = []
        # Fee rates (maker / taker)
        self.fee = model.BacktestFee("v9")
        # Whether to record series for plotting
        self._is_plot = is_plot
        self.long_hold = []
        self.short_hold = []
        self.index = []
        # Number of simulated fills
        self.trade_num = 0
        # Timestamps of the most recent kd / kk fill, used for holding time
        self.kd_time = None
        self.kk_time = None
        self.hold_times = [0.0]
        # Equity
        self.start_cash = 1000000.0
        self.equity = self.start_cash
        self.equity_real = self.start_cash
        self.equity_high = self.start_cash
        self.balance = [self.start_cash]
        self.strategy.start_cash = self.start_cash
        self.strategy.local_start_time = 0.0
        # Per-tick fill series (plotting)
        self.priceMotion = []
        self.kd_trade = []
        self.kk_trade = []
        self.pd_trade = []
        self.pk_trade = []
        # Per-tick resting-order series (plotting)
        self.localOrders_timeseries = []
        # Per-tick [max_fill, min_fill] (plotting)
        self.fill_price = []
        # Pending strategy signals, keyed by the time they were produced
        self.signal_series = dict()
        # Backtest delay, default value
        self.backtest_delay = utils.BACKTEST_DELAY
        # Backtest clock
        self.backtest_time = time.time()
        # Average holding time (maintained only when not plotting)
        self.avg_hold_time = 0.0
        # Timestamp at which the most recent signal was produced
        self.last_signal_time = 0.0

    def get_available_close_amt(self):
        """Return ``(long, short)`` amounts not yet committed to close orders.

        Starts from the current long/short position and subtracts the amount
        of every resting ``pd`` (from long) and ``pk`` (from short) order.
        """
        a_pd, a_pk = self.pos.longPos, self.pos.shortPos
        for order in self.localOrders.values():
            side = order["side"]
            if side == 'pd':
                a_pd -= order["amount"]
            elif side == 'pk':
                a_pk -= order["amount"]
        return a_pd, a_pk

    def _place_order(self, spec, signal_time):
        """Create a NEW simulated order from ``spec`` and add it to the book.

        ``spec`` is indexed as ``[amount, side, price]``.
        """
        # Redraw on collision so a new order can never silently replace a
        # live order that happens to share its random client id.
        client_id = str(random.randint(1, 99999))
        while client_id in self.localOrders:
            client_id = str(random.randint(1, 99999))
        self.localOrders[client_id] = {
            'symbol': self.symbol,
            'status': "NEW",
            'amount': float(spec[0]),
            'side': spec[1],
            'price': float(spec[2]),
            'filled_price': 0,
            'filled': 0,
            'client_id': client_id,
            'order_id': str(random.randint(1, 99999)),
            'localtime': signal_time,
            'createtime': signal_time,
        }

    # Handle strategy signals
    def handle_signal(self, signals):
        """Turn eligible pending signals into simulated order-book changes.

        A signal is eligible when it was produced strictly before
        ``self.last_signal_time`` (i.e. more than one tick ago) and at least
        ``self.backtest_delay`` has elapsed on the backtest clock. Eligible
        entries are removed from ``signals`` once handled.

        Args:
            signals: Mutable mapping ``signal_time -> signal``. A signal is
                either ``None`` or a mapping whose keys contain ``'Cancel'``,
                ``'Limits_open'`` or ``'Limits_close'``.
        """
        for signal_time in list(signals):
            # Only signals older than the previous tick may be processed.
            if signal_time >= self.last_signal_time:
                continue
            if self.backtest_time - signal_time < self.backtest_delay:
                continue
            signal = signals.pop(signal_time)
            # An empty signal is simply dropped. Returning here without
            # removing it would leave it as the oldest key and block every
            # later signal on all subsequent ticks.
            if signal is None:
                continue
            for key in signal:
                payload = signal[key]
                if 'Cancel' in key:
                    # Cancel the simulated order named by the first entry.
                    self.localOrders.pop(payload[0], None)
                elif 'Limits_open' in key:
                    for spec in payload:
                        self._place_order(spec, signal_time)
                elif 'Limits_close' in key:
                    a_pd, a_pk = self.get_available_close_amt()
                    for spec in payload:
                        amount, side = spec[0], spec[1]
                        # A close order larger than what is still available
                        # stops processing of the rest of this batch.
                        if side == "pd":
                            if amount > a_pd:
                                break
                            # Account for orders accepted earlier in the same
                            # batch so the batch cannot over-close.
                            a_pd -= float(amount)
                        elif side == "pk":
                            if amount > a_pk:
                                break
                            a_pk -= float(amount)
                        self._place_order(spec, signal_time)

    def _match_buy(self, price, prev_ap, now_ap, min_fill):
        """Return ``(fee_rate, fill_price)`` for a buy order, or ``None``."""
        if price > prev_ap:
            # Crosses the previous ask: taker fill at that ask.
            return self.fee.taker, prev_ap
        if price > now_ap or price > min_fill:
            # Above the current ask, or above this tick's lowest traded
            # price: maker fill at the order's own price.
            return self.fee.maker, price
        return None

    def _match_sell(self, price, prev_bp, now_bp, max_fill):
        """Return ``(fee_rate, fill_price)`` for a sell order, or ``None``."""
        if price < prev_bp:
            # Crosses the previous bid: taker fill at that bid, mirroring
            # the buy side which fills at the previous ask.
            return self.fee.taker, prev_bp
        if price < now_bp or price < max_fill:
            # Below the current bid, or below this tick's highest traded
            # price: maker fill at the order's own price.
            return self.fee.maker, price
        return None

    def _record_hold_time(self, opened_at):
        """Record how long the position opened at ``opened_at`` was held."""
        held = self.strategy.local_time - opened_at
        if self._is_plot:
            self.hold_times.append(held)
        elif self.avg_hold_time == 0.0:
            self.avg_hold_time = held
        else:
            # Exponential moving average of holding time.
            self.avg_hold_time = self.avg_hold_time * 0.9 + held * 0.1

    def matching(self, data):
        """Match resting orders against the market and update equity.

        Args:
            data: One tick of market data, indexable by ``utils.BP_INDEX``,
                ``utils.AP_INDEX``, ``utils.MAX_FILL_INDEX`` and
                ``utils.MIN_FILL_INDEX``. A fill value of 0 is treated as
                "no trade this tick".
        """
        max_fill = data[utils.MAX_FILL_INDEX]  # highest traded price
        min_fill = data[utils.MIN_FILL_INDEX]  # lowest traded price
        if min_fill == 0:
            min_fill = self._NO_PRICE
        # Realised PnL (including fees) produced by this tick
        pnl = 0.0
        plotting = self._is_plot
        if plotting:
            self.kd_trade.append(0)
            self.kk_trade.append(0)
            self.pd_trade.append(0)
            self.pk_trade.append(0)
        # Extremes of the resting order prices per side (plotting only)
        max_kd = 0
        max_pk = 0
        min_kk = self._NO_PRICE
        min_pd = self._NO_PRICE
        # Current tick quotes
        now_bp = data[utils.BP_INDEX]
        now_ap = data[utils.AP_INDEX]
        self.mp = (now_ap + now_bp) * 0.5
        # len() rather than `!= []` so list and array ticks both work.
        if len(self.last_marketdata) > 0:
            # Previous tick quotes
            bp = self.last_marketdata[utils.BP_INDEX]
            ap = self.last_marketdata[utils.AP_INDEX]
            pos = self.pos
            # Iterate over a snapshot because filled orders are removed.
            for cid in list(self.localOrders):
                order = self.localOrders[cid]
                side = order["side"]
                price = order["price"]
                amount = order["amount"]
                if side == "kd":
                    if plotting:
                        max_kd = max(max_kd, price)
                    fill = self._match_buy(price, ap, now_ap, min_fill)
                elif side == "pk":
                    if plotting:
                        max_pk = max(max_pk, price)
                    fill = self._match_buy(price, ap, now_ap, min_fill)
                elif side == "kk":
                    if plotting:
                        min_kk = min(min_kk, price)
                    fill = self._match_sell(price, bp, now_bp, max_fill)
                elif side == "pd":
                    if plotting:
                        min_pd = min(min_pd, price)
                    fill = self._match_sell(price, bp, now_bp, max_fill)
                else:
                    continue
                if fill is None:
                    continue
                filled_fee, filled_price = fill
                del self.localOrders[cid]
                self.trade_num += 1
                value = amount * filled_price
                if side == "kd":
                    pos.longAvg = (pos.longPos * pos.longAvg + value) / (pos.longPos + amount)
                    pos.longPos += amount
                    pnl -= filled_fee * value
                    self.kd_time = self.strategy.local_time
                    if plotting:
                        self.kd_trade[-1] = filled_price
                elif side == "kk":
                    pos.shortAvg = (pos.shortAvg * pos.shortPos + value) / (pos.shortPos + amount)
                    pos.shortPos += amount
                    pnl -= filled_fee * value
                    self.kk_time = self.strategy.local_time
                    if plotting:
                        self.kk_trade[-1] = filled_price
                elif side == "pk":
                    pct = (pos.shortAvg - filled_price) / pos.shortAvg
                    pnl += pct * value - filled_fee * value
                    pos.shortPos -= amount
                    if plotting:
                        self.pk_trade[-1] = filled_price
                    self._record_hold_time(self.kk_time)
                else:  # "pd"
                    pct = (filled_price - pos.longAvg) / pos.longAvg
                    pnl += pct * value - filled_fee * value
                    pos.longPos -= amount
                    if plotting:
                        self.pd_trade[-1] = filled_price
                    self._record_hold_time(self.kd_time)
        # Current floating (unrealised) PnL, marked at the mid price
        unrealized_pnl = 0
        if self.pos.longPos > 0:
            unrealized_pnl += (self.mp - self.pos.longAvg) / self.pos.longAvg * self.pos.longPos * self.mp
        if self.pos.shortPos > 0:
            unrealized_pnl += (self.pos.shortAvg - self.mp) / self.pos.shortAvg * self.pos.shortPos * self.mp
        if plotting:
            # Resting-order series; "no order" is stored as 0
            if min_kk == self._NO_PRICE:
                min_kk = 0
            if min_pd == self._NO_PRICE:
                min_pd = 0
            self.localOrders_timeseries.append([max_kd, max_pk, min_kk, min_pd])
            self.priceMotion.append(self.strategy.mp)
            # Reference price
            self.reference_price.append(self.strategy.ref_price)
            # Traded price range
            self.fill_price.append([max_fill, min_fill])
            # Position sizes
            self.long_hold.append(self.pos.longPos)
            self.short_hold.append(self.pos.shortPos)
            # Strategy-side equity
            self.index.append(self.strategy.equity)
        # Equity including unrealised PnL
        self.equity_real = self.equity + pnl + unrealized_pnl
        self.balance.append(self.equity_real)
        # Equity from realised PnL only
        self.equity = self.equity + pnl
        # Highest realised equity seen so far
        self.equity_high = max(self.equity_high, self.equity)
        # Remember this tick for the next matching round
        self.last_marketdata = data

    def run_by_tick(self, tradeMsg: "model.TraderMsg"):
        """Advance the backtest by one tick.

        Args:
            tradeMsg: Message whose ``market`` field holds the tick data; its
                ``cash``, ``orders`` and ``position`` fields are overwritten
                with the simulated account state before it is passed to
                ``strategy.onTime``.
        """
        # Sync the strategy's local clock with the backtest clock
        self.strategy.local_time = self.backtest_time
        # Execute pending signals
        self.handle_signal(self.signal_series)
        # Match against the latest market data
        self.matching(tradeMsg.market)
        # Update account information
        tradeMsg.cash = self.equity
        tradeMsg.orders = self.localOrders
        tradeMsg.position = self.pos
        # Produce this tick's signal
        signal = self.strategy.onTime(tradeMsg)
        self.signal_series[self.backtest_time] = signal
        self.last_signal_time = self.backtest_time
-
|