# dummy.py
import asyncio
import csv
import functools
import gc
import json
import logging
import logging.handlers
import os
import signal
import subprocess
import sys
import time
import traceback
from decimal import Decimal

from aiohttp import web

import strategy
import backtest
import utils
import model
import predictor
import broker
# Version string re-exported from utils; printed in the startup banner and by the -v flag.
VERSION = utils.VERSION
  19. def timeit(func):
  20. def wrapper(*args, **kwargs):
  21. nowTime = time.time()
  22. res = func(*args, **kwargs)
  23. spend_time = time.time() - nowTime
  24. spend_time = round(spend_time * 100, 2)
  25. print(f'{func.__name__} 耗时 {spend_time} ms')
  26. return res
  27. return wrapper
  28. class Dummy:
  29. def __init__(self, params:model.Config, logname=None):
  30. print('############### Dummy System ################')
  31. print(f'>>> 版本号v{VERSION} <<<')
  32. print('*** 当前配置')
  33. self.params = params
  34. for p in self.params.__dict__:
  35. print('***', p, ' => ', getattr(self.params, p))
  36. print('##################################################')
  37. pid = os.getpid()
  38. print(f'Dummpy System 正在启动 进程号{pid}...')
  39. self.pid_start_time = time.time()
  40. self.logger = self.get_logger(logname)
  41. self.acct_name = self.params.account_name
  42. self.symbol = self.params.pair
  43. self.loop = asyncio.get_event_loop()
  44. self.interval = float(self.params.interval)
  45. self.exchange = self.params.exchange
  46. self.tradeMsg = model.TraderMsg()
  47. self.exit_msg = "正常退出"
  48. # 现货特殊变量
  49. self.is_first = 1
  50. # 参考盘口名称列表
  51. self.ref_names = []
  52. self.tickers = dict()
  53. self.tickers_update_time = dict()
  54. for i in range(len(self.params.refexchange)):
  55. refex = self.params.refexchange[i]
  56. pair = self.params.refpair[i]
  57. name = refex + '@' + pair
  58. self.ref_names.append(name)
  59. self.tickers[name] = dict()
  60. self.tickers_update_time[name] = time.time()
  61. # 参考盘口tick更新时间
  62. # 创建ws实例
  63. self.wss = dict()
  64. name = self.exchange+'@'+self.params.pair
  65. self.trade_name = name
  66. cp = model.ClientParams()
  67. cp.name = name
  68. cp.pair = self.params.pair
  69. cp.access_key = self.params.access_key
  70. cp.secret_key = self.params.secret_key
  71. cp.pass_key = self.params.pass_key
  72. cp.interval = self.params.interval
  73. cp.broker_id = self.params.broker_id
  74. cp.debug = self.params.debug
  75. cp.proxy = self.params.proxy
  76. cp.interval = self.params.interval
  77. self.ws = broker.newWs(self.exchange)(cp)
  78. self.ws.logger = self.logger
  79. self.ready = 0
  80. # 参考盘口
  81. for i,name in enumerate(self.ref_names):
  82. cp = model.ClientParams()
  83. cp.name = name
  84. cp.pair = self.params.refpair[i]
  85. cp.proxy = self.params.proxy
  86. cp.interval = self.params.interval
  87. self.wss[name] = broker.newWs(self.params.refexchange[i])(cp)
  88. self.wss[name].callback = {
  89. 'onTicker':self.update_ticker,
  90. 'onDepth':self.update_depth,
  91. }
  92. self.wss[name].logger = self.logger
  93. # 添加回调
  94. self.ws.callback = {
  95. 'onTicker':self.update_ticker,
  96. 'onDepth':self.update_depth,
  97. 'onPosition':self.update_position,
  98. 'onAccount':self.update_account,
  99. 'onEquity':self.update_equity,
  100. 'onFreeEquity':self.update_free_equity,
  101. 'onOrder':self.update_order,
  102. }
  103. # 配置定价模型
  104. self.Predictor = predictor.Predictor(ref_name=self.ref_names)
  105. # 配置实时回测
  106. # 基础参数 当找不到盈利参数时使用
  107. self.base_open = float(self.params.open)
  108. self.base_close = float(self.params.close)
  109. self.base_index = 0
  110. self.base_profit = 0.0
  111. self.backtest_tasks = list()
  112. self.backtest_start_equity = 1000000.0
  113. for _open in [0.001,0.002,0.003]:
  114. for _G in [0.2]:
  115. for _index in range(len(self.ref_names)):
  116. # 采用虚拟合约交易策略进行实时回测
  117. _close = round(_open * _G, 5)
  118. task = dict()
  119. st = strategy.Strategy(self.params, is_print=0)
  120. st.leverrate = 1.0
  121. st.trade_open_dist = _open
  122. st.trade_close_dist = _close
  123. st.ref_index = _index
  124. st.exchange = 'dummy_usdt_swap'
  125. st.local_start_time = 0.0
  126. bt = backtest.Backtest(st, is_plot=0)
  127. bt.start_cash = self.backtest_start_equity
  128. task["backtest_engine"] = bt
  129. task["open"] = _open
  130. task["close"] = _close
  131. task["index"] = _index
  132. self.backtest_tasks.append(task)
  133. def get_logger(self, logname):
  134. logger = logging.getLogger(__name__)
  135. logger.setLevel(logging.DEBUG)
  136. # log to txt
  137. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  138. if logname == None: logname = "log"
  139. handler = logging.handlers.RotatingFileHandler(f"{logname}.log",maxBytes=1024*1024*50,encoding='utf-8')
  140. handler.setLevel(logging.DEBUG)
  141. handler.setFormatter(formatter)
  142. # log to console
  143. console = logging.StreamHandler()
  144. console.setLevel(logging.INFO)
  145. logger.addHandler(handler)
  146. logger.addHandler(console)
  147. logger.info('开启日志记录')
  148. return logger
  149. def update_order(self, data):
  150. pass
  151. def update_position(self, data):
  152. pass
  153. def update_ticker(self, data):
  154. '''更新ticker信息'''
  155. name = data['name']
  156. # 记录深度更新时间
  157. self.tickers_update_time[name] = time.time()
  158. self.tickers[name] = data
  159. def update_depth(self, data):
  160. '''更新depth信息'''
  161. name = data['name']
  162. # 记录深度更新时间
  163. self.tickers_update_time[name] = time.time()
  164. def update_equity(self, data):
  165. pass
  166. def update_free_equity(self, data):
  167. pass
  168. def update_account(self, data):
  169. '''更新账户信息'''
  170. pass
  171. def update_trade_msg(self):
  172. pass
  173. def get_all_tickers(self):
  174. '''
  175. 组合最新价格信息
  176. 有depth用mp
  177. 两depth之间用lp
  178. '''
  179. ref_tickers = []
  180. for i in self.ref_names:
  181. ref_tickers.append([self.tickers[i]['bp'], self.tickers[i]['ap']])
  182. return ref_tickers
  183. def real_time_back_test(self, data):
  184. '''
  185. 按照长短期回测利润选择参数
  186. 优先按长期回测利润选参数 如果找不到就
  187. 再按短期回测利润选参数 如果还找不到就
  188. 使用默认参数 如果默认参数亏损就触发冷静期
  189. '''
  190. now_time = time.time()
  191. for i in self.backtest_tasks:
  192. i["backtest_engine"].backtest_time = now_time
  193. i["backtest_engine"].run_by_tick(data)
  194. def choose_params(self):
  195. '''
  196. 按照长短期回测利润选择参数
  197. 优先按长期回测利润选参数 如果找不到就
  198. 再按短期回测利润选参数 如果还找不到就
  199. 使用默认参数 如果默认参数亏损就触发冷静期
  200. '''
  201. profits = []
  202. for i in self.backtest_tasks:
  203. equity = i["backtest_engine"].equity
  204. # 直接按收益排序 简单粗暴有效
  205. # 转换为预估日化收益
  206. profit = (equity-self.backtest_start_equity)/self.backtest_start_equity/(time.time()-self.pid_start_time)*86400.0
  207. profits.append(profit) # 利润
  208. # 按回测结果调整参数
  209. # 查找利润
  210. max_index = profits.index(max(profits))
  211. max_profit = max(profits)
  212. self.base_close = self.backtest_tasks[max_index]["close"]
  213. self.base_open = self.backtest_tasks[max_index]["open"]
  214. self.base_index = self.backtest_tasks[max_index]["index"]
  215. self.base_profit = max_profit
  216. return
  217. def check_risk(self):
  218. '''检查风控'''
  219. ###### 行情更新异常风控 ######
  220. for name in self.ref_names:
  221. delay = round((time.time() - self.tickers_update_time[name]) * 1000, 3)
  222. if delay > 60000: # 60s
  223. msg = f"{self.acct_name} ref_name:{name} delay:{delay}ms 行情更新延迟过高 退出"
  224. self.logger.error(msg)
  225. # self.loop.create_task(utils.ding(msg, 1, self.params.webhook, self.params.proxy))
  226. self.stop()
  227. def print_backtest_results(self):
  228. if self.base_profit > 0.0:
  229. self.logger.info(f"exchange:{self.params.exchange} pair:{self.params.pair} open:{self.base_open} close:{self.base_close} index:{self.base_index} profit:{self.base_profit}")
  230. else:
  231. self.logger.info(f'无盈利结果 {self.base_profit}')
  232. async def exit(self, delay=0):
  233. '''退出操作'''
  234. self.logger.info(f"开始退出操作 delay{delay}")
  235. if delay > 0:
  236. await asyncio.sleep(delay)
  237. self.logger.info(f'停机退出 {self.exit_msg}')
  238. await asyncio.sleep(1)
  239. print("停机...")
  240. # self.loop.create_task(utils.ding(f"{self.acct_name} Dummy System 停止", 1, self.params.webhook, self.params.proxy))
  241. self.loop.stop()
  242. os._exit(0)
  243. async def on_timer(self):
  244. '''定期触发系统逻辑'''
  245. # self.loop.create_task(utils.ding(f"{self.acct_name} Dummy System 启动", 1, self.params.webhook, self.params.proxy))
  246. await asyncio.sleep(5)
  247. push_time = utils.DUMMY_EARLY_STOP_SECOND * 0.5
  248. start_time = time.time()
  249. while 1:
  250. try:
  251. ####
  252. await asyncio.sleep(60)
  253. # 检查风控
  254. self.check_risk()
  255. # 打印回测结果
  256. self.print_backtest_results()
  257. # 参数调优
  258. self.choose_params()
  259. # 发送钉钉
  260. if time.time() - start_time > push_time:
  261. await utils._post_params(
  262. "http://wwww.khods.com:8888/post_dummy_params",
  263. self.params.proxy,
  264. json.dumps({
  265. "exchange":self.params.exchange,
  266. "pair":self.params.pair,
  267. "open":self.base_open,
  268. "close":self.base_close,
  269. "refexchange":self.params.refexchange,
  270. "refpair":self.params.refpair,
  271. "profit":self.base_profit
  272. })
  273. )
  274. except Exception as e:
  275. print("定时循环系统出错"+str(e))
  276. self.logger.error(traceback.print_exc())
  277. await asyncio.sleep(10)
  278. async def early_stop_loop(self):
  279. '''判断是否需要早停'''
  280. while 1:
  281. try:
  282. # 1
  283. await asyncio.sleep(utils.DUMMY_EARLY_STOP_SECOND)
  284. # 2
  285. if self.base_profit <= 0.0:
  286. self.exit_msg = "触发早停条件"
  287. self.stop()
  288. except:
  289. self.logger.error(traceback.format_exc())
  290. def get_all_market_data(self):
  291. '''
  292. 只能定时触发
  293. 组合市场信息=交易盘口+参考盘口1+参考盘口2...
  294. '''
  295. market = []
  296. data = self.ws._get_data()["data"]
  297. if data == []:return None
  298. market += data
  299. for name in self.ref_names:
  300. data = self.wss[name]._get_data()["data"]
  301. if data == []:return None
  302. market += data
  303. return market
  304. async def run_stratey(self):
  305. '''定期触发策略'''
  306. print('定时触发器启动')
  307. # 准备交易
  308. try:
  309. await asyncio.sleep(10)
  310. while 1:
  311. await asyncio.sleep(self.interval)
  312. ### 是否准备充分
  313. if self.ready:
  314. ### 更新市场数据
  315. all_market = self.get_all_market_data()
  316. ### 更新预测值
  317. self.Predictor.onTime(all_market)
  318. self.tradeMsg.market = all_market
  319. ### 更新交易数据
  320. self.update_trade_msg()
  321. ### 更新参考价格
  322. self.tradeMsg.ref_price = self.Predictor.Get_ref(self.get_all_tickers())
  323. self.real_time_back_test(self.tradeMsg)
  324. else:
  325. self.check_ready()
  326. except Exception as e:
  327. print(e)
  328. self.logger.error(e)
  329. traceback.print_exc()
  330. await asyncio.sleep(10)
  331. def check_ready(self):
  332. '''
  333. 判断初始数据是否齐全
  334. '''
  335. ### 检查 ticker 行情
  336. # for m in self.ref_names:
  337. # if m not in self.tickers:
  338. # return
  339. # else:
  340. # if self.tickers[m]['bp'] == 0 or self.tickers[m]['ap'] == 0:
  341. # return
  342. # else:
  343. # print('ref ticker 未准备好')
  344. # if self.trade_name not in self.tickers:
  345. # return
  346. # else:
  347. # if self.tickers[self.trade_name]['bp'] == 0 or self.tickers[self.trade_name]['ap'] == 0:
  348. # return
  349. # else:
  350. # print('trade ticker 未准备好')
  351. ### 检查 market 行情
  352. all_market = self.get_all_market_data()
  353. if len(all_market) != utils.LEN*(1+len(self.ref_names)):
  354. self.logger.error("聚合行情未准备好")
  355. return
  356. else:
  357. # 如果行情已经就绪 预热trademsg和predictor
  358. self.tradeMsg.market = all_market
  359. self.Predictor.onTime(all_market)
  360. self.ready = 1
  361. async def server_handle(self, request):
  362. '''中控数据接口'''
  363. return web.Response(body=json.dumps({
  364. "wallet_balance":1+self.base_profit,
  365. "cross_wallet_balance":0,
  366. "unrealized_pn_l":0,
  367. "position_amount":0,
  368. "entry_price":0,
  369. "accumulated_realized":0,
  370. "now_price":(self.tickers[self.trade_name]['bp']+self.tickers[self.trade_name]['ap'])*0.5,
  371. }))
  372. async def _run_server(self):
  373. print('server正在启动...')
  374. app = web.Application()
  375. app.router.add_route('GET', '/account', self.server_handle)
  376. try:
  377. self.loop.create_task(web._run_app(app, host='0.0.0.0', port=self.params.server_port, handle_signals=False))
  378. except:
  379. self.logger.error(f"Server启动失败")
  380. self.logger.error(traceback.format_exc())
  381. self.exit_msg = "服务启动失败 停机退出"
  382. self.stop()
  383. def stop(self):
  384. self.logger.info(f'进入停机流程...')
  385. self.loop.create_task(self.exit(delay=1))
  386. def run(self):
  387. '''启动ws行情获取'''
  388. tasks = []
  389. # 使用全市场行情
  390. for i in self.wss:
  391. tasks.append(asyncio.ensure_future(self.wss[i].run()))
  392. # 策略
  393. for i in [
  394. asyncio.ensure_future(self.ws.run(is_auth=0, sub_trade=1)),
  395. asyncio.ensure_future(self.run_stratey()),
  396. asyncio.ensure_future(self.on_timer()),
  397. asyncio.ensure_future(self.early_stop_loop()),
  398. asyncio.ensure_future(self._run_server()),
  399. ]:
  400. tasks.append(i)
  401. def keyboard_interrupt(s, f):
  402. self.logger.info("收到退出信号 准备关机")
  403. self.stop()
  404. try:
  405. signal.signal(signal.SIGINT, keyboard_interrupt)
  406. signal.signal(signal.SIGTERM, keyboard_interrupt)
  407. if 'win' not in sys.platform:
  408. signal.signal(signal.SIGKILL, keyboard_interrupt)
  409. signal.signal(signal.SIGQUIT, keyboard_interrupt)
  410. except:
  411. pass
  412. self.loop.run_until_complete(asyncio.wait(tasks))
  413. if __name__ == "__main__":
  414. if 0:
  415. utils.check_auth()
  416. pnum = len(sys.argv)
  417. if pnum > 0:
  418. fname = None
  419. log_file = None
  420. pidnum = None
  421. for i in range(pnum):
  422. print(f"第{i}个参数为:{sys.argv[i]}")
  423. if sys.argv[i] == '-c' or sys.argv[i] == '--c':
  424. fname = sys.argv[i+1]
  425. elif sys.argv[i] == '-h':
  426. print("帮助文档")
  427. elif sys.argv[i] == '-log_file' or sys.argv[i] == '--log_file':
  428. log_file = sys.argv[i+1]
  429. elif sys.argv[i] == '-num' or sys.argv[i] == '--num':
  430. pidnum = sys.argv[i+1]
  431. elif sys.argv[i] == '-v' or sys.argv[i] == '--v':
  432. print(f"当前版本为 V{VERSION}")
  433. if fname and log_file and pidnum:
  434. print(f"指定的配置为 fname:{fname} log_file:{log_file} pidnum:{pidnum}")
  435. date = time.strftime("%Y%m%d", time.localtime())
  436. logname = f"{log_file}-{date}"
  437. quant = Dummy(utils.get_params(fname), logname)
  438. quant.run()
  439. elif fname:
  440. print(f"运行指定配置文件{fname}")
  441. quant = Dummy(utils.get_params(fname))
  442. quant.run()
  443. else:
  444. print("缺少指定参数 运行默认配置文件")
  445. fname = 'config_dummy.toml'
  446. quant = Dummy(utils.get_params(fname))
  447. quant.run()
  448. else:
  449. fname = 'config_dummy.toml'
  450. quant = Dummy(utils.get_params(fname))
  451. quant.run()