"""Dummy (paper-trading) system: runs a grid of virtual backtests on live ticks.

The module wires one "trade" websocket feed plus N reference feeds into a
predictor and a set of ``backtest.Backtest`` engines, periodically picks the
parameter set with the best estimated daily return, and exposes the result
over a small HTTP endpoint.
"""
import asyncio
import csv
import functools
import gc
import json
import logging
import logging.handlers
import os
import signal
import subprocess
import sys
import time
import traceback
from decimal import Decimal

from aiohttp import web

import strategy
import backtest
import utils
import model
import predictor
import broker

VERSION = utils.VERSION

# Config file used when no ``-c`` option is supplied on the command line.
DEFAULT_CONFIG = 'config_dummy.toml'


def timeit(func):
    """Decorator that prints the wall-clock duration of each call in ms.

    The wrapped function's return value is passed through unchanged.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        started = time.perf_counter()
        res = func(*args, **kwargs)
        # Seconds -> milliseconds (the original multiplied by 100 while
        # labelling the value "ms").
        spend_time = round((time.perf_counter() - started) * 1000, 2)
        print(f'{func.__name__} 耗时 {spend_time} ms')
        return res
    return wrapper


class Dummy:
    """Live-data backtest runner.

    Args:
        params: Parsed configuration (``model.Config``). The attributes read
            here are ``account_name``, ``pair``, ``interval``, ``exchange``,
            ``refexchange``, ``refpair``, ``access_key``, ``secret_key``,
            ``pass_key``, ``broker_id``, ``debug``, ``proxy``, ``open``,
            ``close`` and ``server_port``.
        logname: Base name (without ``.log``) of the log file; ``None``
            falls back to ``"log"``.
    """

    def __init__(self, params:model.Config, logname=None):
        print('############### Dummy System ################')
        print(f'>>> 版本号v{VERSION} <<<')
        print('*** 当前配置')
        self.params = params
        for p in self.params.__dict__:
            print('***', p, ' => ', getattr(self.params, p))
        print('##################################################')
        pid = os.getpid()
        print(f'Dummpy System 正在启动 进程号{pid}...')
        self.pid_start_time = time.time()
        self.logger = self.get_logger(logname)
        self.acct_name = self.params.account_name
        self.symbol = self.params.pair
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # Newer Python versions no longer create a loop implicitly.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        self.interval = float(self.params.interval)
        self.exchange = self.params.exchange
        self.tradeMsg = model.TraderMsg()
        self.exit_msg = "正常退出"
        # Handles of fire-and-forget tasks; kept so they are not garbage
        # collected while still pending.
        self._exit_task = None
        self._server_task = None
        # Spot-specific flag.
        self.is_first = 1

        # Reference feeds: names are "<exchange>@<pair>".
        self.ref_names = []
        self.tickers = dict()
        self.tickers_update_time = dict()
        for i, refex in enumerate(self.params.refexchange):
            pair = self.params.refpair[i]
            name = refex + '@' + pair
            self.ref_names.append(name)
            self.tickers[name] = dict()
            # Last time a tick was received for this reference feed.
            self.tickers_update_time[name] = time.time()

        # Websocket client for the traded pair.
        self.wss = dict()
        name = self.exchange + '@' + self.params.pair
        self.trade_name = name
        cp = model.ClientParams()
        cp.name = name
        cp.pair = self.params.pair
        cp.access_key = self.params.access_key
        cp.secret_key = self.params.secret_key
        cp.pass_key = self.params.pass_key
        cp.interval = self.params.interval
        cp.broker_id = self.params.broker_id
        cp.debug = self.params.debug
        cp.proxy = self.params.proxy
        self.ws = broker.newWs(self.exchange)(cp)
        self.ws.logger = self.logger
        self.ready = 0

        # Websocket clients for the reference feeds (market data only).
        for i, name in enumerate(self.ref_names):
            cp = model.ClientParams()
            cp.name = name
            cp.pair = self.params.refpair[i]
            cp.proxy = self.params.proxy
            cp.interval = self.params.interval
            self.wss[name] = broker.newWs(self.params.refexchange[i])(cp)
            self.wss[name].callback = {
                'onTicker': self.update_ticker,
                'onDepth': self.update_depth,
            }
            self.wss[name].logger = self.logger

        # Callbacks for the trade feed.
        self.ws.callback = {
            'onTicker': self.update_ticker,
            'onDepth': self.update_depth,
            'onPosition': self.update_position,
            'onAccount': self.update_account,
            'onEquity': self.update_equity,
            'onFreeEquity': self.update_free_equity,
            'onOrder': self.update_order,
        }

        # Pricing model.
        self.Predictor = predictor.Predictor(ref_name=self.ref_names)

        # Real-time backtest grid. The base_* values start from the config
        # and are overwritten by choose_params().
        self.base_open = float(self.params.open)
        self.base_close = float(self.params.close)
        self.base_index = 0
        self.base_profit = 0.0
        self.backtest_tasks = list()
        self.backtest_start_equity = 1000000.0
        for _open in [0.001, 0.002, 0.003]:
            for _G in [0.2]:
                for _index in range(len(self.ref_names)):
                    # Each combination is backtested with a virtual swap
                    # strategy; close distance is a fraction of open distance.
                    _close = round(_open * _G, 5)
                    st = strategy.Strategy(self.params, is_print=0)
                    st.leverrate = 1.0
                    st.trade_open_dist = _open
                    st.trade_close_dist = _close
                    st.ref_index = _index
                    st.exchange = 'dummy_usdt_swap'
                    st.local_start_time = 0.0
                    bt = backtest.Backtest(st, is_plot=0)
                    bt.start_cash = self.backtest_start_equity
                    self.backtest_tasks.append({
                        "backtest_engine": bt,
                        "open": _open,
                        "close": _close,
                        "index": _index,
                    })

    def get_logger(self, logname):
        """Build the module logger: DEBUG to ``<logname>.log``, INFO to console.

        Args:
            logname: Log file base name; ``None`` means ``"log"``.

        Returns:
            The configured ``logging.Logger``.
        """
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
        if logname is None:
            logname = "log"
        # NOTE(review): with the default backupCount=0 the handler never
        # rolls over, so maxBytes has no effect — confirm whether rotation
        # was intended before adding a backupCount.
        handler = logging.handlers.RotatingFileHandler(
            f"{logname}.log", maxBytes=1024*1024*50, encoding='utf-8')
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(formatter)
        # Console output uses the default (message-only) format.
        console = logging.StreamHandler()
        console.setLevel(logging.INFO)
        logger.addHandler(handler)
        logger.addHandler(console)
        logger.info('开启日志记录')
        return logger

    def update_order(self, data):
        """Order callback; intentionally a no-op."""
        pass

    def update_position(self, data):
        """Position callback; intentionally a no-op."""
        pass

    def update_ticker(self, data):
        """Store the latest ticker for ``data['name']`` and stamp its arrival time."""
        name = data['name']
        self.tickers_update_time[name] = time.time()
        self.tickers[name] = data

    def update_depth(self, data):
        """Stamp the arrival time of a depth update for ``data['name']``."""
        name = data['name']
        self.tickers_update_time[name] = time.time()

    def update_equity(self, data):
        """Equity callback; intentionally a no-op."""
        pass

    def update_free_equity(self, data):
        """Free-equity callback; intentionally a no-op."""
        pass

    def update_account(self, data):
        """Account callback; intentionally a no-op."""
        pass

    def update_trade_msg(self):
        """Hook for refreshing trade data; intentionally a no-op."""
        pass

    def get_all_tickers(self):
        """Return ``[[bp, ap], ...]`` for every reference feed, in ``ref_names`` order.

        Raises:
            KeyError: If a reference feed has not delivered a ticker yet.
        """
        return [
            [self.tickers[name]['bp'], self.tickers[name]['ap']]
            for name in self.ref_names
        ]

    def real_time_back_test(self, data):
        """Feed one tick (``data``) to every backtest engine in the grid."""
        now_time = time.time()
        for task in self.backtest_tasks:
            engine = task["backtest_engine"]
            engine.backtest_time = now_time
            engine.run_by_tick(data)

    def choose_params(self):
        """Adopt the grid entry with the highest estimated daily return.

        The return of each engine is ``(equity - start) / start`` scaled from
        the process uptime to 86400 seconds. The winner's open/close/index
        and profit are stored in ``base_open``, ``base_close``,
        ``base_index`` and ``base_profit``. Does nothing when the grid is
        empty.
        """
        if not self.backtest_tasks:
            return
        elapsed = time.time() - self.pid_start_time
        if elapsed <= 0:
            return
        start = self.backtest_start_equity
        profits = [
            (task["backtest_engine"].equity - start) / start / elapsed * 86400.0
            for task in self.backtest_tasks
        ]
        max_profit = max(profits)
        best = self.backtest_tasks[profits.index(max_profit)]
        self.base_close = best["close"]
        self.base_open = best["open"]
        self.base_index = best["index"]
        self.base_profit = max_profit

    def check_risk(self):
        """Stop the system when any reference feed has been silent for over 60 s.

        Only the reference feeds are checked here; every stale feed is
        logged, and shutdown is requested once.
        """
        now = time.time()
        stale = False
        for name in self.ref_names:
            delay = round((now - self.tickers_update_time[name]) * 1000, 3)
            if delay > 60000:  # 60 s
                msg = f"{self.acct_name} ref_name:{name} delay:{delay}ms 行情更新延迟过高 退出"
                self.logger.error(msg)
                stale = True
        if stale:
            self.stop()

    def print_backtest_results(self):
        """Log the currently selected parameters, or that none is profitable."""
        if self.base_profit > 0.0:
            self.logger.info(f"exchange:{self.params.exchange} pair:{self.params.pair} open:{self.base_open} close:{self.base_close} index:{self.base_index} profit:{self.base_profit}")
        else:
            self.logger.info(f'无盈利结果 {self.base_profit}')

    async def exit(self, delay=0):
        """Shut the process down.

        Args:
            delay: Seconds to wait before starting the shutdown.

        This coroutine does not return: it ends with ``os._exit(0)``.
        """
        self.logger.info(f"开始退出操作 delay{delay}")
        if delay > 0:
            await asyncio.sleep(delay)
        self.logger.info(f'停机退出 {self.exit_msg}')
        await asyncio.sleep(1)
        print("停机...")
        self.loop.stop()
        # os._exit skips normal interpreter cleanup, so flush explicitly.
        sys.stdout.flush()
        logging.shutdown()
        os._exit(0)

    async def on_timer(self):
        """Once a minute: risk check, log results, re-pick params, publish.

        Publishing to the HTTP endpoint only starts after half of
        ``utils.DUMMY_EARLY_STOP_SECOND`` has elapsed.
        """
        await asyncio.sleep(5)
        push_time = utils.DUMMY_EARLY_STOP_SECOND * 0.5
        start_time = time.time()
        while 1:
            try:
                await asyncio.sleep(60)
                self.check_risk()
                self.print_backtest_results()
                self.choose_params()
                if time.time() - start_time > push_time:
                    # NOTE(review): host is spelled "wwww" (four w) —
                    # confirm this is the intended address.
                    await utils._post_params(
                        "http://wwww.khods.com:8888/post_dummy_params",
                        self.params.proxy,
                        json.dumps({
                            "exchange": self.params.exchange,
                            "pair": self.params.pair,
                            "open": self.base_open,
                            "close": self.base_close,
                            "refexchange": self.params.refexchange,
                            "refpair": self.params.refpair,
                            "profit": self.base_profit
                        })
                    )
            except Exception as e:
                print("定时循环系统出错"+str(e))
                # format_exc() returns the traceback text; print_exc()
                # returns None, which is what used to be logged.
                self.logger.error(traceback.format_exc())
                await asyncio.sleep(10)

    async def early_stop_loop(self):
        """Every ``utils.DUMMY_EARLY_STOP_SECOND`` s, stop if no profit was found."""
        while 1:
            try:
                await asyncio.sleep(utils.DUMMY_EARLY_STOP_SECOND)
                if self.base_profit <= 0.0:
                    self.exit_msg = "触发早停条件"
                    self.stop()
            except Exception:
                # Not a bare except: task cancellation must propagate.
                self.logger.error(traceback.format_exc())

    def get_all_market_data(self):
        """Concatenate market data of the trade feed and all reference feeds.

        Intended to be called from the timer loop only.

        Returns:
            A single list (trade feed first, then reference feeds in
            ``ref_names`` order), or ``None`` if any feed returned ``[]``.
        """
        market = []
        data = self.ws._get_data()["data"]
        if data == []:
            return None
        market += data
        for name in self.ref_names:
            data = self.wss[name]._get_data()["data"]
            if data == []:
                return None
            market += data
        return market

    async def run_stratey(self):
        """Strategy loop: every ``interval`` s, update the predictor and backtests.

        Until ``ready`` is set the loop only calls ``check_ready``. An
        exception in one iteration is logged and followed by a 10 s pause;
        the loop then continues instead of terminating.
        """
        print('定时触发器启动')
        await asyncio.sleep(10)
        while 1:
            try:
                await asyncio.sleep(self.interval)
                if not self.ready:
                    self.check_ready()
                    continue
                all_market = self.get_all_market_data()
                if all_market is None:
                    # A feed had no data this round; skip the tick.
                    continue
                self.Predictor.onTime(all_market)
                self.tradeMsg.market = all_market
                self.update_trade_msg()
                self.tradeMsg.ref_price = self.Predictor.Get_ref(self.get_all_tickers())
                self.real_time_back_test(self.tradeMsg)
            except Exception as e:
                print(e)
                self.logger.error(traceback.format_exc())
                await asyncio.sleep(10)

    def check_ready(self):
        """Set ``ready`` once aggregated market data has the expected length.

        The expected length is ``utils.LEN * (1 + number of reference
        feeds)``. When it is met, ``tradeMsg`` and the predictor are primed
        with the data. (An older ticker-level check is disabled.)
        """
        all_market = self.get_all_market_data()
        expected = utils.LEN * (1 + len(self.ref_names))
        if all_market is None or len(all_market) != expected:
            self.logger.error("聚合行情未准备好")
            return
        self.tradeMsg.market = all_market
        self.Predictor.onTime(all_market)
        self.ready = 1

    async def server_handle(self, request):
        """``GET /account`` handler for the central controller.

        Returns a JSON body where ``wallet_balance`` is ``1 + base_profit``
        and ``now_price`` is the mid price of the trade feed. Responds 503
        while no trade ticker has been received.
        """
        ticker = self.tickers.get(self.trade_name)
        if not ticker or 'bp' not in ticker or 'ap' not in ticker:
            return web.Response(
                status=503, body=json.dumps({"error": "ticker not ready"}))
        return web.Response(body=json.dumps({
            "wallet_balance": 1+self.base_profit,
            "cross_wallet_balance": 0,
            "unrealized_pn_l": 0,
            "position_amount": 0,
            "entry_price": 0,
            "accumulated_realized": 0,
            "now_price": (ticker['bp']+ticker['ap'])*0.5,
        }))

    async def _run_server(self):
        """Start the HTTP server as a background task on ``self.loop``.

        A failure while scheduling, or one raised later inside the server
        task (for example the port being in use), is logged and triggers
        ``stop``.
        """
        print('server正在启动...')
        app = web.Application()
        app.router.add_route('GET', '/account', self.server_handle)

        def _fail(detail):
            self.logger.error(f"Server启动失败")
            self.logger.error(detail)
            self.exit_msg = "服务启动失败 停机退出"
            self.stop()

        def _on_done(task):
            if task.cancelled():
                return
            exc = task.exception()
            if exc is not None:
                _fail("".join(traceback.format_exception(
                    type(exc), exc, exc.__traceback__)))

        try:
            # Keep a reference so the task is not garbage collected.
            self._server_task = self.loop.create_task(
                web._run_app(app, host='0.0.0.0', port=self.params.server_port, handle_signals=False))
            self._server_task.add_done_callback(_on_done)
        except Exception:
            _fail(traceback.format_exc())

    def stop(self):
        """Request shutdown; repeated calls after the first are ignored."""
        if getattr(self, '_exit_task', None) is not None:
            return
        self.logger.info(f'进入停机流程...')
        self._exit_task = self.loop.create_task(self.exit(delay=1))

    def run(self):
        """Start all feeds and loops and block until they finish."""
        tasks = [
            asyncio.ensure_future(self.wss[name].run(), loop=self.loop)
            for name in self.wss
        ]
        tasks.extend(
            asyncio.ensure_future(coro, loop=self.loop)
            for coro in (
                self.ws.run(is_auth=0, sub_trade=1),
                self.run_stratey(),
                self.on_timer(),
                self.early_stop_loop(),
                self._run_server(),
            )
        )

        def keyboard_interrupt(s, f):
            self.logger.info("收到退出信号 准备关机")
            # Schedule through the loop so it wakes up even when idle.
            self.loop.call_soon_threadsafe(self.stop)

        # SIGKILL cannot be caught, so it is not registered. SIGQUIT is
        # detected by attribute because 'win' also matches "darwin".
        handled = [signal.SIGINT, signal.SIGTERM]
        if hasattr(signal, 'SIGQUIT'):
            handled.append(signal.SIGQUIT)
        for sig in handled:
            try:
                signal.signal(sig, keyboard_interrupt)
            except (ValueError, OSError) as e:
                self.logger.warning(f"无法注册信号处理 {sig}: {e}")

        self.loop.run_until_complete(asyncio.wait(tasks))


def _flag_value(argv, i):
    """Return the value following the flag at ``argv[i]`` or exit with a message."""
    if i + 1 >= len(argv):
        raise SystemExit(f"参数 {argv[i]} 缺少取值")
    return argv[i + 1]


def _parse_cli_args(argv):
    """Scan ``argv`` and return ``(fname, log_file, pidnum)``; missing ones are ``None``."""
    fname = None
    log_file = None
    pidnum = None
    for i, arg in enumerate(argv):
        print(f"第{i}个参数为:{arg}")
        if arg in ('-c', '--c'):
            fname = _flag_value(argv, i)
        elif arg == '-h':
            print("帮助文档")
        elif arg in ('-log_file', '--log_file'):
            log_file = _flag_value(argv, i)
        elif arg in ('-num', '--num'):
            pidnum = _flag_value(argv, i)
        elif arg in ('-v', '--v'):
            print(f"当前版本为 V{VERSION}")
    return fname, log_file, pidnum


def main(argv=None):
    """Command-line entry point.

    Options: ``-c <config>``, ``-log_file <base>``, ``-num <n>``, ``-h``,
    ``-v``. A dated log name is used only when all three of ``-c``,
    ``-log_file`` and ``-num`` are given; without ``-c`` the default
    config file is run.
    """
    if argv is None:
        argv = sys.argv
    # Authorization check is deliberately switched off.
    if 0:
        utils.check_auth()
    fname, log_file, pidnum = _parse_cli_args(argv)
    logname = None
    if fname and log_file and pidnum:
        print(f"指定的配置为 fname:{fname} log_file:{log_file} pidnum:{pidnum}")
        date = time.strftime("%Y%m%d", time.localtime())
        logname = f"{log_file}-{date}"
    elif fname:
        print(f"运行指定配置文件{fname}")
    else:
        print("缺少指定参数 运行默认配置文件")
        fname = DEFAULT_CONFIG
    quant = Dummy(utils.get_params(fname), logname)
    quant.run()


if __name__ == "__main__":
    main()