| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487 |
- import asyncio
- from aiohttp import web
- import traceback
- import time
- import strategy
- import backtest
- import utils
- import model
- import logging, logging.handlers
- import signal
- import os, json, sys
- import csv
- import predictor
- import subprocess
- from decimal import Decimal
- import gc
- import broker
- # Version string re-exported from utils; printed in the startup banner and by the -v / --v flag.
- VERSION = utils.VERSION
def timeit(func):
    """Decorator that prints the wall-clock duration of every call to ``func``.

    The duration is reported in milliseconds, rounded to two decimals, using
    the message format ``"<function name> 耗时 <duration> ms"``.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        ``func`` and returns its result unchanged.
    """
    # Local import keeps this block self-contained (functools is stdlib).
    from functools import wraps

    @wraps(func)  # preserve __name__/__doc__ of the wrapped callable
    def wrapper(*args, **kwargs):
        started = time.perf_counter()  # monotonic clock: immune to system clock changes
        res = func(*args, **kwargs)
        # Seconds -> milliseconds is a factor of 1000; the previous factor of
        # 100 under-reported every measurement by 10x.
        spend_time = round((time.perf_counter() - started) * 1000, 2)
        print(f'{func.__name__} 耗时 {spend_time} ms')
        return res

    return wrapper
class Dummy:
    """Paper-trading ("dummy") runner used to tune strategy parameters live.

    An instance subscribes to one trading venue plus a list of reference
    venues through ``broker`` websocket clients, pushes every aggregated
    market snapshot into a small grid of ``backtest.Backtest`` engines and
    periodically selects the grid point with the best estimated daily return.
    The selection is posted to a remote endpoint by ``on_timer`` and a small
    HTTP endpoint (``GET /account``) exposes the current state.
    """

    def __init__(self, params: "model.Config", logname=None):
        """Build websocket clients, the predictor and the backtest grid.

        Args:
            params: Configuration object. The fields read here are
                ``account_name``, ``pair``, ``interval``, ``exchange``,
                ``refexchange``, ``refpair``, ``access_key``, ``secret_key``,
                ``pass_key``, ``broker_id``, ``debug``, ``proxy``, ``open``
                and ``close``.
            logname: Base name (without extension) of the log file. ``None``
                falls back to ``"log"``.
        """
        print('############### Dummy System ################')
        print(f'>>> 版本号v{VERSION} <<<')
        print('*** 当前配置')
        self.params = params
        for p in self.params.__dict__:
            print('***', p, ' => ', getattr(self.params, p))
        print('##################################################')
        pid = os.getpid()
        print(f'Dummpy System 正在启动 进程号{pid}...')
        self.pid_start_time = time.time()
        self.logger = self.get_logger(logname)
        self.acct_name = self.params.account_name
        self.symbol = self.params.pair
        self.loop = asyncio.get_event_loop()
        self.interval = float(self.params.interval)
        self.exchange = self.params.exchange
        self.tradeMsg = model.TraderMsg()
        self.exit_msg = "正常退出"
        # Spot-specific flag.
        self.is_first = 1

        # Names of the reference order books ("<exchange>@<pair>"), their
        # latest tickers and the time each one was last updated.
        self.ref_names = []
        self.tickers = dict()
        self.tickers_update_time = dict()
        for i, refex in enumerate(self.params.refexchange):
            # Index refpair explicitly so a length mismatch still fails loudly.
            name = refex + '@' + self.params.refpair[i]
            self.ref_names.append(name)
            self.tickers[name] = dict()
            self.tickers_update_time[name] = time.time()

        # Websocket client for the trading venue.
        self.wss = dict()
        name = self.exchange + '@' + self.params.pair
        self.trade_name = name
        cp = model.ClientParams()
        cp.name = name
        cp.pair = self.params.pair
        cp.access_key = self.params.access_key
        cp.secret_key = self.params.secret_key
        cp.pass_key = self.params.pass_key
        cp.interval = self.params.interval
        cp.broker_id = self.params.broker_id
        cp.debug = self.params.debug
        cp.proxy = self.params.proxy
        self.ws = broker.newWs(self.exchange)(cp)
        self.ws.logger = self.logger
        self.ready = 0

        # Websocket clients for the reference venues (market data only).
        for i, name in enumerate(self.ref_names):
            cp = model.ClientParams()
            cp.name = name
            cp.pair = self.params.refpair[i]
            cp.proxy = self.params.proxy
            cp.interval = self.params.interval
            self.wss[name] = broker.newWs(self.params.refexchange[i])(cp)
            self.wss[name].callback = {
                'onTicker': self.update_ticker,
                'onDepth': self.update_depth,
            }
            self.wss[name].logger = self.logger

        # Callbacks of the trading-venue client.
        self.ws.callback = {
            'onTicker': self.update_ticker,
            'onDepth': self.update_depth,
            'onPosition': self.update_position,
            'onAccount': self.update_account,
            'onEquity': self.update_equity,
            'onFreeEquity': self.update_free_equity,
            'onOrder': self.update_order,
        }

        # Pricing model.
        self.Predictor = predictor.Predictor(ref_name=self.ref_names)

        # Real-time backtest grid. The base_* values start from the configured
        # parameters and are overwritten by choose_params().
        self.base_open = float(self.params.open)
        self.base_close = float(self.params.close)
        self.base_index = 0
        self.base_profit = 0.0
        self.backtest_tasks = list()
        self.backtest_start_equity = 1000000.0
        for _open in [0.001, 0.002, 0.003]:
            for _G in [0.2]:
                for _index in range(len(self.ref_names)):
                    # One virtual-contract strategy per (open, close, reference) combination.
                    _close = round(_open * _G, 5)
                    st = strategy.Strategy(self.params, is_print=0)
                    st.leverrate = 1.0
                    st.trade_open_dist = _open
                    st.trade_close_dist = _close
                    st.ref_index = _index
                    st.exchange = 'dummy_usdt_swap'
                    st.local_start_time = 0.0
                    bt = backtest.Backtest(st, is_plot=0)
                    bt.start_cash = self.backtest_start_equity
                    self.backtest_tasks.append({
                        "backtest_engine": bt,
                        "open": _open,
                        "close": _close,
                        "index": _index,
                    })

    def get_logger(self, logname):
        """Create the module logger with a rotating file handler and a console handler.

        Args:
            logname: Base name of the log file; ``None`` means ``"log"``.

        Returns:
            The configured ``logging.Logger`` (DEBUG to file, INFO to console).
        """
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
        if logname is None:
            logname = "log"
        # RotatingFileHandler never rolls over while backupCount is 0, so the
        # 50 MB limit only takes effect with at least one backup file.
        handler = logging.handlers.RotatingFileHandler(
            f"{logname}.log",
            maxBytes=1024 * 1024 * 50,
            backupCount=5,
            encoding='utf-8',
        )
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(formatter)
        console = logging.StreamHandler()
        console.setLevel(logging.INFO)
        logger.addHandler(handler)
        logger.addHandler(console)
        logger.info('开启日志记录')
        return logger

    def update_order(self, data):
        """Order callback; intentionally a no-op in the dummy system."""
        pass

    def update_position(self, data):
        """Position callback; intentionally a no-op in the dummy system."""
        pass

    def update_ticker(self, data):
        """Store the latest ticker of a venue and refresh its update time.

        Args:
            data: Ticker payload; ``data['name']`` identifies the venue.
        """
        name = data['name']
        self.tickers_update_time[name] = time.time()
        self.tickers[name] = data

    def update_depth(self, data):
        """Refresh the last-update time of a venue when a depth message arrives.

        Args:
            data: Depth payload; only ``data['name']`` is read.
        """
        name = data['name']
        self.tickers_update_time[name] = time.time()

    def update_equity(self, data):
        """Equity callback; intentionally a no-op in the dummy system."""
        pass

    def update_free_equity(self, data):
        """Free-equity callback; intentionally a no-op in the dummy system."""
        pass

    def update_account(self, data):
        """Account callback; intentionally a no-op in the dummy system."""
        pass

    def update_trade_msg(self):
        """Hook for refreshing trade data before a backtest tick; currently a no-op."""
        pass

    def get_all_tickers(self):
        """Return ``[bid, ask]`` (``bp``/``ap``) for every reference venue, in ``ref_names`` order.

        Raises:
            KeyError: If a reference venue has not delivered a ticker yet.
        """
        return [
            [self.tickers[name]['bp'], self.tickers[name]['ap']]
            for name in self.ref_names
        ]

    def real_time_back_test(self, data):
        """Feed one trade message to every backtest engine of the grid.

        Args:
            data: The object handed to each engine's ``run_by_tick``.
        """
        now_time = time.time()
        for task in self.backtest_tasks:
            engine = task["backtest_engine"]
            engine.backtest_time = now_time
            engine.run_by_tick(data)

    def choose_params(self):
        """Select the grid point with the highest estimated daily return.

        Each engine's equity gain is scaled to a daily rate using the time
        elapsed since process start. The winning task's ``open``, ``close``
        and ``index`` are stored in ``base_open``/``base_close``/``base_index``
        and its rate in ``base_profit``. Nothing changes when there are no
        backtest tasks or no time has elapsed.
        """
        if not self.backtest_tasks:
            # max() on an empty sequence would raise; keep the current parameters.
            return
        elapsed = time.time() - self.pid_start_time
        if elapsed <= 0:
            # Guards the division below (e.g. after a backwards clock step).
            return
        start_equity = self.backtest_start_equity
        profits = [
            (task["backtest_engine"].equity - start_equity) / start_equity / elapsed * 86400.0
            for task in self.backtest_tasks
        ]
        max_profit = max(profits)
        best = self.backtest_tasks[profits.index(max_profit)]
        self.base_close = best["close"]
        self.base_open = best["open"]
        self.base_index = best["index"]
        self.base_profit = max_profit
        return

    def check_risk(self):
        """Stop the system when any reference feed has been silent for more than 60 s.

        Every stale feed is logged; ``stop()`` is invoked once, however many
        feeds are stale.
        """
        now = time.time()
        stale_found = False
        for name in self.ref_names:
            delay = round((now - self.tickers_update_time[name]) * 1000, 3)
            if delay > 60000:  # 60s
                msg = f"{self.acct_name} ref_name:{name} delay:{delay}ms 行情更新延迟过高 退出"
                self.logger.error(msg)
                # self.loop.create_task(utils.ding(msg, 1, self.params.webhook, self.params.proxy))
                stale_found = True
        if stale_found:
            self.stop()

    def print_backtest_results(self):
        """Log the currently selected parameters, or a notice when nothing is profitable."""
        if self.base_profit > 0.0:
            self.logger.info(f"exchange:{self.params.exchange} pair:{self.params.pair} open:{self.base_open} close:{self.base_close} index:{self.base_index} profit:{self.base_profit}")
        else:
            self.logger.info(f'无盈利结果 {self.base_profit}')

    async def exit(self, delay=0):
        """Log the shutdown, stop the event loop and terminate the process.

        Args:
            delay: Seconds to wait before shutting down.
        """
        self.logger.info(f"开始退出操作 delay{delay}")
        if delay > 0:
            await asyncio.sleep(delay)
        self.logger.info(f'停机退出 {self.exit_msg}')
        await asyncio.sleep(1)
        print("停机...")
        # self.loop.create_task(utils.ding(f"{self.acct_name} Dummy System 停止", 1, self.params.webhook, self.params.proxy))
        self.loop.stop()
        os._exit(0)

    async def on_timer(self):
        """Once a minute: run risk checks, log results, re-select and report parameters.

        Reporting to the remote endpoint starts only after half of
        ``utils.DUMMY_EARLY_STOP_SECOND`` has elapsed. Errors are logged and
        the loop resumes after a 10 s pause.
        """
        # self.loop.create_task(utils.ding(f"{self.acct_name} Dummy System 启动", 1, self.params.webhook, self.params.proxy))
        await asyncio.sleep(5)
        push_time = utils.DUMMY_EARLY_STOP_SECOND * 0.5
        start_time = time.time()
        while 1:
            try:
                await asyncio.sleep(60)
                self.check_risk()
                self.print_backtest_results()
                self.choose_params()
                if time.time() - start_time > push_time:
                    await utils._post_params(
                        "http://wwww.khods.com:8888/post_dummy_params",
                        self.params.proxy,
                        json.dumps({
                            "exchange": self.params.exchange,
                            "pair": self.params.pair,
                            "open": self.base_open,
                            "close": self.base_close,
                            "refexchange": self.params.refexchange,
                            "refpair": self.params.refpair,
                            "profit": self.base_profit
                        })
                    )
            except Exception as e:
                print("定时循环系统出错"+str(e))
                # format_exc() returns the traceback text; print_exc() returns
                # None, which previously put the literal "None" in the log.
                self.logger.error(traceback.format_exc())
                await asyncio.sleep(10)

    async def early_stop_loop(self):
        """Stop the system when no profitable parameters exist after each early-stop period."""
        while 1:
            try:
                await asyncio.sleep(utils.DUMMY_EARLY_STOP_SECOND)
                if self.base_profit <= 0.0:
                    self.exit_msg = "触发早停条件"
                    self.stop()
            except Exception:
                # A bare ``except`` here would also swallow task cancellation.
                self.logger.error(traceback.format_exc())

    def get_all_market_data(self):
        """Concatenate market data of the trading venue and every reference venue.

        Returns:
            A list with the trading venue's data first, followed by each
            reference venue in ``ref_names`` order, or ``None`` as soon as
            any venue reports an empty list.
        """
        market = []
        sources = [self.ws] + [self.wss[name] for name in self.ref_names]
        for source in sources:
            data = source._get_data()["data"]
            if data == []:
                return None
            market += data
        return market

    async def run_stratey(self):
        """Run one backtest tick every ``interval`` seconds once market data is ready.

        Until ``ready`` is set, each tick only calls ``check_ready``. An
        exception in a tick is logged and followed by a 10 s pause; the loop
        then continues instead of terminating the strategy for good.
        """
        print('定时触发器启动')
        await asyncio.sleep(10)
        while 1:
            try:
                await asyncio.sleep(self.interval)
                if not self.ready:
                    self.check_ready()
                    continue
                # Refresh market data and the predictor.
                all_market = self.get_all_market_data()
                self.Predictor.onTime(all_market)
                self.tradeMsg.market = all_market
                # Refresh trade data and the reference price, then backtest.
                self.update_trade_msg()
                self.tradeMsg.ref_price = self.Predictor.Get_ref(self.get_all_tickers())
                self.real_time_back_test(self.tradeMsg)
            except Exception as e:
                print(e)
                self.logger.error(e)
                traceback.print_exc()
                await asyncio.sleep(10)

    def check_ready(self):
        """Mark the system ready once the aggregated market data is complete.

        Complete means ``utils.LEN`` entries for the trading venue and for
        every reference venue. When ready, the trade message and the
        predictor are warmed up with the snapshot.
        """
        # NOTE: a per-ticker bid/ask readiness check used to live here but is
        # disabled; only the aggregated market data is verified.
        all_market = self.get_all_market_data()
        # get_all_market_data() returns None while any feed is still empty;
        # len(None) would raise TypeError.
        if all_market is None or len(all_market) != utils.LEN*(1+len(self.ref_names)):
            self.logger.error("聚合行情未准备好")
            return
        self.tradeMsg.market = all_market
        self.Predictor.onTime(all_market)
        self.ready = 1

    async def server_handle(self, request):
        """Serve the ``/account`` endpoint with a JSON snapshot of the dummy account.

        ``now_price`` is the bid/ask midpoint of the trading venue, or ``0``
        while no ticker for it has been received yet.
        """
        ticker = self.tickers.get(self.trade_name) or {}
        now_price = (ticker.get('bp', 0) + ticker.get('ap', 0)) * 0.5
        return web.Response(body=json.dumps({
            "wallet_balance": 1+self.base_profit,
            "cross_wallet_balance": 0,
            "unrealized_pn_l": 0,
            "position_amount": 0,
            "entry_price": 0,
            "accumulated_realized": 0,
            "now_price": now_price,
        }))

    async def _run_server(self):
        """Run the HTTP control endpoint; stop the system if the server fails."""
        print('server正在启动...')
        app = web.Application()
        app.router.add_route('GET', '/account', self.server_handle)
        try:
            # Await the server coroutine so failures (e.g. port already in
            # use) reach the handler below; scheduling it as a detached task
            # would lose the exception.
            await web._run_app(app, host='0.0.0.0', port=self.params.server_port, handle_signals=False)
        except Exception:
            self.logger.error("Server启动失败")
            self.logger.error(traceback.format_exc())
            self.exit_msg = "服务启动失败 停机退出"
            self.stop()

    def stop(self):
        """Schedule the asynchronous shutdown sequence on the event loop."""
        self.logger.info('进入停机流程...')
        self.loop.create_task(self.exit(delay=1))

    def run(self):
        """Start all websocket feeds and background loops and block on the event loop."""
        # Reference feeds first, then the trading feed and the service loops.
        tasks = [asyncio.ensure_future(client.run()) for client in self.wss.values()]
        tasks.extend([
            asyncio.ensure_future(self.ws.run(is_auth=0, sub_trade=1)),
            asyncio.ensure_future(self.run_stratey()),
            asyncio.ensure_future(self.on_timer()),
            asyncio.ensure_future(self.early_stop_loop()),
            asyncio.ensure_future(self._run_server()),
        ])

        def keyboard_interrupt(s, f):
            self.logger.info("收到退出信号 准备关机")
            # Hand over to the loop thread-safely so a sleeping loop wakes up.
            self.loop.call_soon_threadsafe(self.stop)

        # SIGKILL cannot be caught, so it is not registered. SIGQUIT does not
        # exist on Windows, hence the attribute check.
        shutdown_signals = [signal.SIGINT, signal.SIGTERM]
        if hasattr(signal, 'SIGQUIT'):
            shutdown_signals.append(signal.SIGQUIT)
        for sig in shutdown_signals:
            try:
                signal.signal(sig, keyboard_interrupt)
            except (OSError, ValueError, RuntimeError):
                # e.g. not running in the main thread; continue without this handler.
                self.logger.warning(f"无法注册信号处理 {sig}")

        self.loop.run_until_complete(asyncio.wait(tasks))
def parse_cli_args(argv):
    """Extract the config file, log-file prefix and process number from ``argv``.

    Recognised flags (single or double dash): ``-c <file>``,
    ``-log_file <prefix>``, ``-num <n>``, ``-v`` (print the version) and
    ``-h`` (print the help banner). Every argument is echoed as it is
    scanned. A value-taking flag that is the last argument is reported and
    ignored instead of raising ``IndexError``.

    Args:
        argv: The full argument vector, including the program name.

    Returns:
        A ``(fname, log_file, pidnum)`` tuple; each item is ``None`` when the
        corresponding flag was not supplied.
    """
    fname = None
    log_file = None
    pidnum = None
    last = len(argv) - 1
    for i, arg in enumerate(argv):
        print(f"第{i}个参数为:{arg}")
        takes_value = arg in ('-c', '--c', '-log_file', '--log_file', '-num', '--num')
        if takes_value and i >= last:
            # Previously argv[i+1] raised IndexError here.
            print(f"参数 {arg} 缺少取值 已忽略")
            continue
        if arg in ('-c', '--c'):
            fname = argv[i + 1]
        elif arg == '-h':
            print("帮助文档")
        elif arg in ('-log_file', '--log_file'):
            log_file = argv[i + 1]
        elif arg in ('-num', '--num'):
            pidnum = argv[i + 1]
        elif arg in ('-v', '--v'):
            print(f"当前版本为 V{VERSION}")
    return fname, log_file, pidnum


if __name__ == "__main__":
    # Authorization check is deliberately disabled.
    if 0:
        utils.check_auth()
    fname, log_file, pidnum = parse_cli_args(sys.argv)
    if fname and log_file and pidnum:
        print(f"指定的配置为 fname:{fname} log_file:{log_file} pidnum:{pidnum}")
        # The log file name carries the start date: "<prefix>-YYYYMMDD".
        date = time.strftime("%Y%m%d", time.localtime())
        logname = f"{log_file}-{date}"
        quant = Dummy(utils.get_params(fname), logname)
    elif fname:
        print(f"运行指定配置文件{fname}")
        quant = Dummy(utils.get_params(fname))
    else:
        print("缺少指定参数 运行默认配置文件")
        fname = 'config_dummy.toml'
        quant = Dummy(utils.get_params(fname))
    quant.run()
|