quant.py 69 KB


  1. import asyncio
  2. from aiohttp import web
  3. import traceback
  4. import time, csv
  5. import strategy as strategy
  6. import utils
  7. import model
  8. import logging, logging.handlers
  9. import signal
  10. import os, json, sys
  11. import predictor
  12. import backtest
  13. import multiprocessing
  14. import random
  15. import psutil
  16. import ujson
  17. import broker
  18. from decimal import Decimal
  19. from loguru import logger
  20. logger.add("kucoin_limits.log", format="{time} {level} {message}", level="INFO")
  21. VERSION = utils.VERSION
  22. def timeit(func):
  23. def wrapper(*args, **kwargs):
  24. nowTime = time.time()
  25. res = func(*args, **kwargs)
  26. spend_time = time.time() - nowTime
  27. spend_time = round(spend_time * 1000, 5)
  28. print(f'{func.__name__} 耗时 {spend_time} ms')
  29. return res
  30. return wrapper
  31. class Quant:
  32. def __init__(self, params:model.Config, logname="test_logname", father=1):
  33. print('############### 超级无敌韭菜收割机 ################')
  34. print(f'>>> 版本 {VERSION} <<<')
  35. print('*** 当前配置')
  36. self.params = params
  37. for p in self.params.__dict__:
  38. print('***', p, ' => ', getattr(self.params, p))
  39. print('##################################################')
  40. self.logger = self.get_logger(logname)
  41. self.csvname = logname + ' ' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  42. pid = os.getpid()
  43. self.pid_start_time = time.time()
  44. self.logger.info(f"进程号{pid} 启动时间{self.pid_start_time}")
  45. ##### 绑定cpu
  46. cpu_count = psutil.cpu_count()
  47. print("检测cpu核心负载")
  48. cpu_used_pct = [0.0 for _ in range(cpu_count)]
  49. for _ in range(random.randint(5,15)):
  50. r = psutil.cpu_times_percent(percpu=True)
  51. for i in range(cpu_count):
  52. cpu_used_pct[i] += int(r[i].user)
  53. time.sleep(1)
  54. print(cpu_used_pct)
  55. cpu_id = cpu_used_pct.index(min(cpu_used_pct))
  56. print(f"当前负载最低的cpu为:{cpu_id}")
  57. self.process = psutil.Process(pid)
  58. print(f"核心数{cpu_count} 目标绑定cpu:{cpu_id}")
  59. os.system(f"taskset -cp {cpu_id} {pid}")
  60. print("调整系统调度优先级为最高等级")
  61. if 'win' not in sys.platform:
  62. print(os.nice(-20))
  63. #### cpu 内存 平均占用
  64. self.cpu_ema = 0.0
  65. self.mm_ema = 0.0
  66. #####
  67. self.acct_name = self.params.account_name
  68. self.symbol = self.params.pair
  69. self.base = self.params.pair.split('_')[0].upper()
  70. self.quote = self.params.pair.split('_')[1].upper()
  71. if 1:
  72. ### 使用uvloop
  73. if 'win' not in sys.platform:
  74. print('采用高速事件循环库')
  75. import uvloop
  76. self.loop = uvloop.new_event_loop()
  77. else:
  78. print('采用普通事件循环库')
  79. self.loop = asyncio.get_event_loop()
  80. else:
  81. ### 使用原生loop
  82. self.loop = asyncio.get_event_loop()
  83. self.strategy = strategy.Strategy(self.params, is_print=1)
  84. ###### 判断启动方式
  85. self.father = father
  86. print(f"父进程标识 {self.father}")
  87. ##### 现货底仓
  88. hold_coin = float(self.params.hold_coin)
  89. self.hold_coin = utils.clip(hold_coin, 0.0, 10000.0)
  90. ##### 本地状态量
  91. self.data = dict()
  92. self.total_equity = 0.0
  93. self.local_orders = dict() # 本地挂单表
  94. self.local_orders_backup = dict() # 本地订单缓存队列
  95. self.local_orders_backup_cid = [] # 本地订单缓存cid队列
  96. self.handled_orders_cid = [] # 本地已处理cid缓存队列
  97. self.local_profit = 0.0
  98. self.local_cash = 0.0 # 本地U保证金
  99. self.local_coin = 0.0 # 本地币保证金
  100. self.local_position = model.Position()
  101. self.local_position_by_orders = model.Position()
  102. self.local_buy_amount = 0.0
  103. self.local_sell_amount = 0.0
  104. self.local_buy_value = 0.0
  105. self.local_sell_value = 0.0
  106. self.local_cancel_log = dict()
  107. self.interval = float(self.params.interval)
  108. self.exchange = self.params.exchange
  109. self.tradeMsg = model.TraderMsg()
  110. self.exit_msg = "正常退出"
  111. self.save = int(self.params.save) # 保存行情数据
  112. self.logger.info(f"实时行情数据记录开关:{self.save}")
  113. # 仓位检查结果序列
  114. self.position_check_series = []
  115. # 止损大小
  116. self.stoploss = float(self.params.stoploss)
  117. # 资金使用率
  118. self.used_pct = float(self.params.used_pct) #使用资金比例
  119. # 启停信号 0 表示运行 大于1开始倒计时 1时停机
  120. self.mode_signal = 0
  121. # 交易盘口订单流更新时间
  122. self.trade_order_update_time = time.time()
  123. # onTick触发时间记录
  124. self.on_tick_event_time = time.time()
  125. # 盘口ticker depth信息
  126. self.tickers = dict()
  127. self.depths = dict()
  128. # 行情更新延迟监控
  129. self.market_update_time = dict()
  130. self.market_update_interval = dict()
  131. # 参考盘口名称
  132. refex = self.params.refexchange
  133. refpair = self.params.refpair
  134. if len(refex) != len(refpair):
  135. self.logger.error("参考盘口数不等于参考品种数 退出")
  136. raise Exception("参考盘口数不等于参考品种数 退出")
  137. self.ref_num = len(refex)
  138. self.ref_name = []
  139. for i in range(self.ref_num):
  140. if refex[i] not in broker.exchange_lists:
  141. self.logger.error("出现不支持的参考盘口")
  142. raise Exception("出现不支持的参考盘口")
  143. name = refex[i] + '@' + refpair[i] + '@ref'
  144. self.ref_name.append(name)
  145. self.tickers[name] = dict()
  146. self.depths[name] = list()
  147. self.market_update_time[name] = 0.0
  148. self.market_update_interval[name] = 0.0
  149. # 参考盘口tick更新时间
  150. # 服务器私有ip地址检查
  151. ipList = utils.get_local_ip_list()
  152. ipListNum = len(ipList)
  153. if int(self.params.ip) >= ipListNum:
  154. raise Exception("指定私有ip地址序号不存在")
  155. # 创建ws实例
  156. name = self.exchange+'@'+self.params.pair
  157. self.trade_name = name
  158. self.market_update_time[name] = 0.0
  159. self.market_update_interval[name] = 0.0
  160. self.tickers[name] = dict()
  161. self.depths[name] = list()
  162. cp = model.ClientParams()
  163. cp.name = self.trade_name
  164. cp.pair = self.params.pair
  165. cp.access_key = self.params.access_key
  166. cp.secret_key = self.params.secret_key
  167. cp.pass_key = self.params.pass_key
  168. cp.interval = self.params.interval
  169. cp.broker_id = self.params.broker_id
  170. cp.debug = self.params.debug
  171. cp.proxy = self.params.proxy
  172. cp.interval = self.params.interval
  173. cp.ip = int(self.params.ip)
  174. self.ws = broker.newWs(self.exchange)(
  175. params=cp,
  176. colo=int(self.params.colo),
  177. is_print=0,
  178. )
  179. self.ws.logger = self.logger
  180. self.ready = 0
  181. # rest实例
  182. self.rest = broker.newRest(self.exchange)(cp, colo=int(self.params.colo))
  183. self.ws_ref = dict()
  184. # 参考盘口 ws 实例
  185. for i in range(self.ref_num):
  186. cp = model.ClientParams()
  187. cp.name = self.ref_name[i]
  188. cp.pair = self.params.refpair[i]
  189. cp.proxy = self.params.proxy
  190. cp.interval = self.params.interval
  191. cp.ip = int(self.params.ip)
  192. exchange = self.params.refexchange[i]
  193. if exchange not in broker.exchange_lists:
  194. self.logger.error("参考盘口名称错误 退出")
  195. return
  196. _colo = 0
  197. if self.params.refexchange[i] == self.params.exchange and \
  198. self.params.refpair[i] == self.params.pair and int(self.params.colo):
  199. _colo = 1
  200. self.ws_ref[self.ref_name[i]] = broker.newWs(exchange)(cp, colo=_colo)
  201. self.ws_ref[self.ref_name[i]].callback['onTicker']=self.update_ticker
  202. self.ws_ref[self.ref_name[i]].callback['onDepth']=self.update_depth
  203. self.ws_ref[self.ref_name[i]].logger = self.logger
  204. # 添加回调
  205. self.ws.callback = {
  206. 'onTicker':self.update_ticker,
  207. 'onDepth':self.update_depth,
  208. 'onPosition':self.update_position,
  209. 'onEquity':self.update_equity,
  210. 'onOrder':self.update_order,
  211. 'onExit':self.update_exit,
  212. }
  213. self.rest.callback = {
  214. 'onTicker':self.update_ticker,
  215. 'onDepth':self.update_depth,
  216. 'onPosition':self.update_position,
  217. 'onEquity':self.update_equity,
  218. 'onOrder':self.update_order,
  219. 'onExit':self.update_exit,
  220. }
  221. self.rest.logger = self.logger
  222. # 配置策略
  223. self.strategy.logger = self.logger
  224. # 配置定价模型
  225. price_alpha = []
  226. for i in self.params.refpair:
  227. # 交易1000shib 参考 shib
  228. if '1000' in self.params.pair and '1000' not in i:
  229. price_alpha.append(1000.0)
  230. # 交易shib 参考 1000shib
  231. elif '1000' not in self.params.pair and '1000' in i:
  232. price_alpha.append(0.001)
  233. else:
  234. # 交易shib 参考 shib
  235. price_alpha.append(1.0)
  236. self.logger.info(f'价格系数{price_alpha}')
  237. self.Predictor = predictor.Predictor(ref_name=self.ref_name, alpha=price_alpha, gamma=float(self.params.gamma))
  238. # 初始化参数
  239. self.strategy.trade_open_dist = float(self.params.open)
  240. self.strategy.trade_close_dist = float(self.params.close)
  241. # 在线训练
  242. self.backtest = int(self.params.backtest)
  243. self.logger.info(f'在线训练开关 {self.backtest}')
  244. ####
  245. time.sleep(3)
  246. def get_logger(self, logname):
  247. '''日志模块'''
  248. logger = logging.getLogger(__name__)
  249. # log flag
  250. if int(self.params.log):
  251. log_level = logging.DEBUG
  252. logger.setLevel(log_level)
  253. # log to txt
  254. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  255. if logname == None: logname = "log"
  256. handler = logging.handlers.RotatingFileHandler(f"{logname}.log",maxBytes=1024*1024*10,encoding='utf-8')
  257. handler.setLevel(log_level)
  258. handler.setFormatter(formatter)
  259. # log to console
  260. console = logging.StreamHandler()
  261. console.setLevel(logging.INFO)
  262. # add
  263. logger.addHandler(handler)
  264. logger.addHandler(console)
  265. else:
  266. log_level = logging.INFO
  267. logger.setLevel(log_level)
  268. # log to txt
  269. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  270. if logname == None: logname = "log"
  271. handler = logging.handlers.RotatingFileHandler(f"{logname}.log",maxBytes=1024*1024*10,encoding='utf-8')
  272. handler.setLevel(log_level)
  273. handler.setFormatter(formatter)
  274. # add
  275. logger.addHandler(handler)
  276. logger.info('开启日志记录')
  277. return logger
  278. def update_order(self, data):
  279. self.loop.create_task(self._update_order(data))
  280. async def _update_order(self, data):
  281. '''
  282. 更新订单
  283. 首先直接复写本地订单
  284. 1、如果是开仓单
  285. 如果新增: 增加本地订单
  286. 如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按已成交量发送平仓订单 修改本地仓位
  287. 如果成交: 删除本地订单 发送平仓订单 修改本地仓位
  288. 2、如果是平仓单
  289. 如果新增: 增加本地订单
  290. 如果取消: 删除本地订单 查看是否完全成交 如果是部分成交 则按未成交量发送平仓订单 修改本地仓位
  291. 如果成交: 删除本地订单 修改本地仓位
  292. NEW 可以从 ws / rest 来
  293. REMOVE 主要从 ws 来 必须包含 filled 和 filled_price 用于本地仓位推算 定期rest查过旧订单
  294. 为了防止下单失败依然有订单成交 本地需要做一个缓存
  295. '''
  296. try:
  297. # 触发订单更新
  298. self.trade_order_update_time = time.time()
  299. # 新增订单推送 仅需要cid oid信息
  300. if data['status'] == 'NEW':
  301. # 更新oid信息 更新订单 loceltime信息(尤其是查单返回new的情况 必须更新 否则会误触发风控)
  302. if data['client_id'] in self.local_orders:
  303. self.local_orders[data['client_id']]["order_id"] = data['order_id']
  304. self.local_orders[data['client_id']]["localtime"] = time.time()
  305. # 完成订单推送 仅需要cid filled filled_size信息
  306. elif data['status'] == 'REMOVE':
  307. # 如果在撤单记录中 说明此订单结束生命周期 可以移除记录
  308. if data["client_id"] in self.local_cancel_log:
  309. del(self.local_cancel_log[data["client_id"]])
  310. # 在cid缓存队列中 说明是本策略的订单
  311. if data["client_id"] in self.local_orders_backup:
  312. # 不在已处理cid缓存队列中 说明还没参与过仓位计算 则执行订单计算
  313. if data['client_id'] not in self.handled_orders_cid:
  314. # 添加进已处理队列
  315. self.handled_orders_cid.append(data["client_id"])
  316. # 提取成交信息 方向 价格 量
  317. filled = data["filled"]
  318. side = self.local_orders_backup[data['client_id']]["side"]
  319. if "filled_price" in data:
  320. if data["filled_price"] > 0.0:
  321. filled_price = data["filled_price"]
  322. else:
  323. filled_price = self.local_orders_backup[data['client_id']]["price"]
  324. else:
  325. filled_price = self.local_orders_backup[data['client_id']]["price"]
  326. # 只有开仓成交才触发onPosition
  327. # 如果漏推送 rest补充的订单查询信息过来 可能会导致 kd kk 推送出现计算分母为0的情况
  328. if filled > 0:
  329. if "spot" in self.exchange:# 如果是现货交易 还需要修改equity
  330. ### 现货必须考虑fee 买入fee单位为币 卖出fee单位为u
  331. fee = data["fee"]
  332. ### 现货订单流仓位计算
  333. if side == "kd": # buy
  334. self.local_buy_amount += filled - fee
  335. self.local_buy_value += (filled - fee) * filled_price
  336. new_long_pos = float(Decimal(str(self.local_position_by_orders.longPos)) + Decimal(str(filled)) - Decimal(str(fee)))
  337. if new_long_pos == 0.0:
  338. self.local_position_by_orders.longAvg = 0.0
  339. self.local_position_by_orders.longPos = 0.0
  340. else:
  341. self.local_position_by_orders.longAvg = \
  342. (self.local_position_by_orders.longPos * self.local_position_by_orders.longAvg + filled * filled_price) / new_long_pos
  343. self.local_position_by_orders.longPos = new_long_pos
  344. self.local_cash -= filled * filled_price
  345. self.local_coin += filled - fee
  346. elif side == "pd": # sell
  347. self.local_sell_amount += filled
  348. self.local_sell_value += filled * filled_price
  349. self.local_profit += filled * (filled_price - self.local_position_by_orders.longAvg)
  350. new_long_pos = float(Decimal(str(self.local_position_by_orders.longPos)) - Decimal(str(filled)))
  351. if new_long_pos == 0.0:
  352. self.local_position_by_orders.longAvg = 0.0
  353. self.local_position_by_orders.longPos = 0.0
  354. else:
  355. self.local_position_by_orders.longPos = new_long_pos
  356. self.local_cash += filled * filled_price - fee
  357. self.local_coin -= filled
  358. elif side == "pk": # buy
  359. self.local_buy_amount += filled - fee
  360. self.local_buy_value += (filled - fee) * filled_price
  361. self.local_profit += filled * (self.local_position_by_orders.shortAvg - filled_price)
  362. new_short_pos = float(Decimal(str(self.local_position_by_orders.shortPos)) - Decimal(str(filled)) - Decimal(str(fee)))
  363. if new_short_pos == 0.0:
  364. self.local_position_by_orders.shortAvg = 0.0
  365. self.local_position_by_orders.shortPos = 0.0
  366. else:
  367. self.local_position_by_orders.shortPos = new_short_pos
  368. self.local_cash -= filled * filled_price
  369. self.local_coin += filled - fee
  370. elif side == "kk": # sell
  371. self.local_sell_amount += filled
  372. self.local_sell_value += filled * filled_price
  373. new_short_pos = float(Decimal(str(self.local_position_by_orders.shortPos)) + Decimal(str(filled)))
  374. if new_short_pos == 0.0:
  375. self.local_position_by_orders.shortAvg = 0.0
  376. self.local_position_by_orders.shortPos = 0.0
  377. else:
  378. self.local_position_by_orders.shortAvg = \
  379. (self.local_position_by_orders.shortPos * self.local_position_by_orders.shortAvg + filled * filled_price) / new_short_pos
  380. self.local_position_by_orders.shortPos = new_short_pos
  381. self.local_cash += filled * filled_price - fee
  382. self.local_coin -= filled
  383. else:
  384. self.logger.error(f"错误的仓位方向{side}")
  385. else:
  386. ### 合约订单流仓位计算
  387. if side == "kd":
  388. self.local_buy_amount += filled
  389. self.local_buy_value += filled * filled_price
  390. new_long_pos = (self.local_position_by_orders.longPos + filled)
  391. if new_long_pos == 0.0:
  392. self.local_position_by_orders.longAvg = 0
  393. self.local_position_by_orders.longPos = 0
  394. else:
  395. self.local_position_by_orders.longAvg = \
  396. (self.local_position_by_orders.longPos * self.local_position_by_orders.longAvg + filled * filled_price) / new_long_pos
  397. self.local_position_by_orders.longPos = float(Decimal(str(self.local_position_by_orders.longPos)) + Decimal(str(filled)))
  398. elif side == "kk":
  399. self.local_sell_amount += filled
  400. self.local_sell_value += filled * filled_price
  401. new_short_pos = (self.local_position_by_orders.shortPos + filled)
  402. if new_short_pos == 0.0:
  403. self.local_position_by_orders.shortAvg = 0
  404. self.local_position_by_orders.shortPos = 0
  405. else:
  406. self.local_position_by_orders.shortAvg = \
  407. (self.local_position_by_orders.shortPos * self.local_position_by_orders.shortAvg + filled * filled_price) / new_short_pos
  408. self.local_position_by_orders.shortPos = float(Decimal(str(self.local_position_by_orders.shortPos)) + Decimal(str(filled)))
  409. elif side == "pd":
  410. self.local_sell_amount += filled
  411. self.local_sell_value += filled * filled_price
  412. self.local_profit += filled * (filled_price - self.local_position_by_orders.longAvg)
  413. self.local_position_by_orders.longPos = float(Decimal(str(self.local_position_by_orders.longPos)) - Decimal(str(filled)))
  414. if self.local_position_by_orders.longPos == 0:self.local_position_by_orders.longAvg = 0
  415. elif side == "pk":
  416. self.local_buy_amount += filled
  417. self.local_buy_value += filled * filled_price
  418. self.local_profit += filled * (self.local_position_by_orders.shortAvg-filled_price)
  419. self.local_position_by_orders.shortPos = float(Decimal(str(self.local_position_by_orders.shortPos)) - Decimal(str(filled)))
  420. if self.local_position_by_orders.shortPos == 0:self.local_position_by_orders.shortAvg = 0
  421. else:
  422. self.logger.error(f"错误的仓位方向{side}")
  423. # 统计合约交易手续费 正fee为扣手续费 负fee为返佣
  424. if 'fee' in data:
  425. self.local_profit -= data['fee']
  426. self.logger.debug('更新推算仓位'+str(self.local_position_by_orders.__dict__))
  427. ###
  428. self._print_local_trades_summary()
  429. # 每次有订单变动就触发一次策略
  430. if self.mode_signal == 0 and self.ready:
  431. ### 更新交易数据
  432. self.update_trade_msg()
  433. ### 触发策略挂单逻辑
  434. # 更新策略时间
  435. self.strategy.local_time = time.time()
  436. orders = self.strategy.onTime(self.tradeMsg)
  437. ### 记录指令触发信息
  438. if self._not_empty(orders):
  439. self.logger.debug("触发onOrder")
  440. self._update_local_orders(orders)
  441. self.loop.create_task(self.rest.handle_signals(orders))
  442. self.logger.debug(orders)
  443. else:
  444. self.logger.debug(f"订单已经参与过仓位计算 拒绝重复进行计算{data['client_id']}")
  445. else:
  446. self.logger.debug(f"订单不属于本策略 拒绝进行仓位计算{data['client_id']}")
  447. # 移除本地订单
  448. if data["client_id"] in self.local_orders:
  449. self.logger.debug(['删除本地订单', data["client_id"]])
  450. del(self.local_orders[data["client_id"]])
  451. else:
  452. self.logger.debug(['该订单不在本地挂单表中', data["client_id"]])
  453. else:
  454. print(data)
  455. self.logger.debug(f"未知的订单事件类型 {data}")
  456. except Exception as e:
  457. print("处理订单推送出错:"+str(e))
  458. self.logger.error("处理订单推送出错:"+str(e))
  459. self.logger.error(traceback.format_exc())
  460. self.exit_msg="处理订单推送出错"
  461. self.stop()
  462. def _update_local_orders(self, orders):
  463. """
  464. 本地记录所有报单信息
  465. """
  466. try:
  467. for i in orders:
  468. if "Limits" in i:
  469. for j in orders[i]:
  470. order_info = dict()
  471. order_info['symbol'] = self.symbol
  472. order_info['amount'] = float(j[0])
  473. order_info['side'] = j[1]
  474. order_info['price'] = float(j[2])
  475. order_info['client_id'] = j[3]
  476. order_info['filled_price'] = 0
  477. order_info['filled'] = 0
  478. order_info['order_id'] = ""
  479. order_info['localtime'] = self.strategy.local_time
  480. order_info['createtime'] = self.strategy.local_time
  481. self.local_orders[j[3]] = order_info # 本地挂单表
  482. self.logger.debug(['新增本地订单', order_info])
  483. self.local_orders_backup[j[3]] = order_info # 本地缓存表
  484. self.local_orders_backup_cid.append(j[3]) # 本地缓存cid表
  485. if 'Cancel' in i:
  486. # 记录撤单次数
  487. cid = orders[i][0]
  488. if cid in self.local_cancel_log:
  489. self.local_cancel_log[cid] += 1
  490. else:
  491. self.local_cancel_log[cid] = 0
  492. # 清除过于久远的历史记录
  493. if len(self.local_orders_backup_cid) > 9999:
  494. cid = self.local_orders_backup_cid[0]
  495. # 判断是否超过1个小时 如果超过则移除历史记录
  496. if cid in self.local_orders_backup:
  497. if time.time() - self.local_orders_backup[cid]["localtime"] > 3600:
  498. del(self.local_orders_backup[cid])
  499. del(self.local_orders_backup_cid[0])
  500. if len(self.handled_orders_cid) > 9999:
  501. del(self.handled_orders_cid[0])
  502. except:
  503. self.logger.error("本地记录订单信息出错")
  504. self.logger.error(traceback.format_exc())
  505. self.exit_msg="本地记录订单信息出错"
  506. self.stop()
  507. def _not_empty(self, orders):
  508. '''检查指令是否不为空'''
  509. if isinstance(orders, dict):
  510. for order_name in orders:
  511. if "Cancel" in order_name or "Check" in order_name:
  512. return 1
  513. elif "Limits_open" in order_name:
  514. if len(orders["Limits_open"]) > 0:
  515. return 1
  516. elif "Limits_close" in order_name:
  517. if len(orders["Limits_close"]) > 0:
  518. return 1
  519. return 0
  520. def _print_local_trades_summary(self):
  521. '''计算本地累计利润'''
  522. ###
  523. local_buy_amount = round(self.local_buy_amount,5)
  524. local_buy_value = round(self.local_buy_value,5)
  525. local_sell_amount = round(self.local_sell_amount,5)
  526. local_sell_value = round(self.local_sell_value,5)
  527. local_profit = 0.0
  528. if isinstance(self.strategy.mp, float):
  529. unrealized = (local_buy_amount - local_sell_amount) * self.strategy.mp
  530. realized = local_sell_value - local_buy_value
  531. local_profit = round(unrealized+realized,5)
  532. self.strategy.local_profit = local_profit
  533. ###
  534. msg = f"买量{local_buy_amount} 卖量{local_sell_amount} 买额{local_buy_value} 卖额{local_sell_value} 利润 {local_profit}"
  535. self.logger.info(msg)
  536. def update_position(self, data):
  537. '''
  538. 更新仓位信息
  539. '''
  540. if data != self.local_position:
  541. self.local_position = data
  542. self.logger.debug('更新本地仓位'+str(self.local_position.__dict__))
  543. """
  544. 2023-2-22
  545. 用create_task去执行,会延迟,占用越大,延迟越大,可能会延迟100ms计算
  546. """
  547. def update_ticker(self, data):
  548. '''
  549. 增加onticker撤单 可能会导致平仓难度加大
  550. '''
  551. self.loop.create_task(self._update_ticker(data))
  552. def update_depth(self, data):
  553. self.loop.create_task(self._update_depth(data))
  554. async def _update_ticker(self, data):
  555. '''
  556. update ticker infomation
  557. '''
  558. name = data['name']
  559. # 记录tick更新时间
  560. # self.market_update_time[name] = time.time()
  561. self.tickers[name] = data
  562. ### 判断是否需要触发ontick
  563. if name == self.ref_name[self.strategy.ref_index]:
  564. pass
  565. elif name == self.trade_name:
  566. pass
  567. else:
  568. pass
  569. # @utils.timeit
  570. async def _update_depth(self, data):
  571. # logger.info(data)
  572. '''
  573. update orderbook infomation
  574. '''
  575. name = data['name']
  576. now_time = time.time()
  577. if self.market_update_time[name] == 0.0:
  578. pass
  579. else:
  580. interval = now_time - self.market_update_time[name]
  581. if self.market_update_interval[name] == 0.0:
  582. self.market_update_interval[name] = interval
  583. else:
  584. self.market_update_interval[name] = self.market_update_interval[name]*0.999 + interval*0.001
  585. self.market_update_time[name] = now_time
  586. ### 初始化depths
  587. if self.depths[name] == list():
  588. self.depths[name] = data['data']
  589. ### 判断是否需要触发ondepth
  590. # 如果是交易盘口
  591. if name == self.trade_name:
  592. ### 更新depths
  593. self.depths[name] = data['data']
  594. # 允许交易
  595. if self.mode_signal == 0 and self.ready:
  596. ### 聚合行情处理
  597. self.on_agg_market()
  598. ### 判断是否为当前跟踪的盘口
  599. elif name == self.ref_name[self.strategy.ref_index]:
  600. ### 判断是否需要触发ontick 对行情进行过滤
  601. ### 过滤条件 价格变化很大 时间间隔很长
  602. flag = 0
  603. if abs(data['data'][utils.BP_INDEX] - self.depths[name][utils.BP_INDEX])/data['data'][utils.BP_INDEX] > 0.0002 or \
  604. abs(data['data'][utils.AP_INDEX] - self.depths[name][utils.AP_INDEX])/data['data'][utils.AP_INDEX] > 0.0002 or \
  605. time.time() - self.on_tick_event_time > 0.05:
  606. ### 允许交易
  607. flag = 1
  608. ### 更新ontick触发时间记录
  609. self.on_tick_event_time = time.time()
  610. ### 更新depths
  611. self.depths[name] = data['data']
  612. # 允许交易
  613. if self.mode_signal == 0 and self.ready and flag:
  614. ### 更新交易数据
  615. self.update_trade_msg()
  616. ### 触发事件撤单逻辑
  617. # 更新策略时间
  618. self.strategy.local_time = time.time()
  619. # 产生交易信号
  620. orders = self.strategy.onTime(self.tradeMsg)
  621. ### 记录指令触发信息
  622. if self._not_empty(orders):
  623. self.logger.debug("触发onTick")
  624. self._update_local_orders(orders)
  625. self.loop.create_task(self.rest.handle_signals(orders))
  626. self.logger.debug(orders)
  627. else:
  628. pass
  629. # @timeit
  630. async def real_time_back_test(self, data):
  631. '''
  632. 按照长短期回测利润选择参数
  633. 优先按长期回测利润选参数 如果找不到就
  634. 再按短期回测利润选参数 如果还找不到就
  635. 使用默认参数 如果默认参数亏损就触发冷静期
  636. '''
  637. now_time = time.time()
  638. await asyncio.sleep(0.005)
  639. for i in self.backtest_tasks:
  640. i["backtest_engine"].backtest_time = now_time
  641. i["backtest_engine"].run_by_tick(data)
  642. def choose_params(self):
  643. '''
  644. 按照长短期回测利润选择参数
  645. 优先按长期回测利润选参数 如果找不到就
  646. 再按短期回测利润选参数 如果还找不到就
  647. 使用默认参数 如果默认参数亏损就触发冷静期
  648. '''
  649. profits = []
  650. for i in self.backtest_tasks:
  651. # 获取绩效信息
  652. e = i["backtest_engine"].equity # 最终净值
  653. # 计算标准化利润
  654. p = (e-self.backtest_start_cash) / self.backtest_start_cash \
  655. / self.backtest_look_length * self.tick_profit_to_daily
  656. # 有一定成交次数的回测结果才有代表性 持仓太久的参数禁止使用
  657. _trade_num = i['backtest_engine'].trade_num
  658. _avg_hold_time = i['backtest_engine'].avg_hold_time
  659. _equity_high = i['backtest_engine'].equity_high
  660. # 排除交易次数太少的参数
  661. if i['open'] <= 0.002:
  662. if _trade_num < 10:
  663. p = 0.0
  664. # 排除长期持仓的参数
  665. if _avg_hold_time > 600:
  666. p = 0.0
  667. # 排除近期回撤较大的参数
  668. if _equity_high > e*1.01:
  669. p = 0.0
  670. profits.append(p) #利润
  671. ############## 重置回测
  672. # if _trade_num > 200:
  673. # i["backtest_engine"].trade_num = 0
  674. # i["backtest_engine"].equity = self.backtest_start_cash
  675. # 盈利参数个数不能太少 防止孤岛参数
  676. win_num = 0
  677. for i in profits:
  678. if i > 0.0:
  679. win_num += 1
  680. cond1 = win_num > self.backtest_num*0.1
  681. cond2 = win_num > 2
  682. cond_win = cond1 and cond2
  683. if cond_win:
  684. # 按最优回测结果调整参数
  685. max_profit = max(profits)
  686. max_index = profits.index(max_profit)
  687. self.strategy.trade_open_dist = self.backtest_tasks[max_index]["open"]
  688. self.strategy.trade_close_dist = self.backtest_tasks[max_index]["close"]
  689. self.strategy.ref_index = self.backtest_tasks[max_index]["index"]
  690. self.strategy.post_side = self.backtest_tasks[max_index]["side"]
  691. self.strategy.predict_alpha = self.backtest_tasks[max_index]["alpha"]
  692. # 检查是否需要关闭回测
  693. # if self.strategy.ready == 1:
  694. # self.backtest = 0
  695. else:
  696. # 如果没有符合条件的盈利参数
  697. self.strategy.trade_open_dist = 0.01
  698. self.strategy.trade_close_dist = 0.00001
  699. self.strategy.ref_index = 0
  700. self.strategy.post_side = 0
  701. self.strategy.predict_alpha = 0
  702. # 检查是否需要关闭回测
  703. # if self.strategy.ready == 1:
  704. # self.backtest = 0
  705. # self.exit_msg = "未找到合适参数 停机"
  706. # self.stop()
  707. return
  708. def update_equity(self, data):
  709. '''
  710. 更新保证金信息
  711. 合约一直更新
  712. 现货只有当出现异常时更新
  713. '''
  714. if "spot" in self.exchange:
  715. pass
  716. else:
  717. self.local_cash = data[self.quote] * self.used_pct
  718. def update_exit(self, data):
  719. '''
  720. 底层触发停机
  721. '''
  722. self.exit_msg = data
  723. self.stop()
  724. def get_all_market_data(self):
  725. '''
  726. 只能定时触发
  727. 组合市场信息=交易盘口+参考盘口
  728. '''
  729. market = []
  730. data = self.ws._get_data()["data"]
  731. market += data
  732. for i in self.ref_name:
  733. data = self.ws_ref[i]._get_data()["data"]
  734. market += data
  735. # handle save real market data
  736. if self.save:
  737. with open(f'./{self.csvname}.csv',
  738. 'a',
  739. newline='',
  740. encoding='utf-8') as f:
  741. writer = csv.writer(f, delimiter=',')
  742. writer.writerow(market)
  743. return market
  744. async def before_trade(self):
  745. ####### 启动ws #######
  746. # 启动交易ws
  747. # 当开启回测时才订阅交易盘口的成交流
  748. _sub_trade = int(self.params.backtest)
  749. _sub_fast = int(self.params.fast)
  750. self.loop.create_task(self.ws.run(is_auth=1, sub_trade=_sub_trade, sub_fast=0))
  751. for i in self.ref_name:
  752. # 启动参考ws 参考盘口使用fast行情性能消耗更大 使用普通行情可以节省性能
  753. self.loop.create_task(self.ws_ref[i].run(is_auth=0, sub_trade=0, sub_fast=_sub_fast))
  754. await asyncio.sleep(1)
  755. ###### 做交易前准备工作 ######
  756. # 买入平台币
  757. await self.rest.buy_token()
  758. await asyncio.sleep(1)
  759. # 清空挂单和仓位
  760. await self.rest.check_position(hold_coin=self.hold_coin)
  761. await asyncio.sleep(1)
  762. # 获取市场信息
  763. await self.rest.before_trade()
  764. await asyncio.sleep(1)
  765. # 获取价格信息
  766. ticker = await self.rest.get_ticker()
  767. mp = ticker['mp']
  768. # 获取账户信息
  769. await asyncio.sleep(1)
  770. await self.rest.get_equity()
  771. # 初始资金
  772. start_cash = self.rest.cash_value * self.used_pct
  773. start_coin = self.rest.coin_value * self.used_pct
  774. if start_cash == 0.0 and start_coin == 0.0:
  775. self.exit_msg = f"初始为零 cash: {start_cash} coin: {start_coin}"
  776. self.stop()
  777. self.logger.info(f"初始cash: {start_cash} 初始coin: {start_coin}")
  778. # 初始化策略基础信息
  779. if isinstance(mp, float):
  780. if mp <= 0.0:
  781. self.exit_msg = f"初始价格获取错误 {mp}"
  782. self.stop()
  783. else:
  784. print(f"初始价格为 {mp}")
  785. else:
  786. self.exit_msg = f"初始价格获取错误 {mp}"
  787. self.stop()
  788. self.strategy.mp = mp
  789. self.strategy.start_cash = start_cash
  790. self.strategy.start_coin = start_coin
  791. self.strategy.start_equity = start_cash + start_coin * mp
  792. self.strategy.max_equity = self.strategy.start_equity
  793. self.strategy.equity = self.strategy.start_equity
  794. self.strategy.total_amount = self.strategy.equity * self.strategy.leverrate / self.strategy.mp
  795. self.strategy.stepSize = self.rest.stepSize if self.rest.stepSize < 1.0 else int(self.rest.stepSize)
  796. self.strategy.tickSize = self.rest.tickSize if self.rest.tickSize < 1.0 else int(self.rest.tickSize)
  797. if self.strategy.stepSize == None or self.strategy.tickSize == None:
  798. self.exit_msg = f"交易精度未正常获取 stepsize: {self.strategy.stepSize} ticksize: {self.strategy.tickSize}"
  799. self.stop()
  800. else:
  801. self.logger.info(f"数量精度{self.strategy.stepSize}")
  802. self.logger.info(f"价格精度{self.strategy.tickSize}")
  803. grid = float(self.params.grid)
  804. if "spot" in self.exchange:
  805. long_one_hand_value = start_cash * float(self.params.leverrate) / grid
  806. short_one_hand_value = start_coin * mp * float(self.params.leverrate) / grid
  807. long_one_hand_amount = float(Decimal(str(long_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize)))
  808. short_one_hand_amount = float(Decimal(str(short_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize)))
  809. else:
  810. long_one_hand_value = start_cash * float(self.params.leverrate) / grid
  811. short_one_hand_value = start_cash * float(self.params.leverrate) / grid
  812. long_one_hand_amount = float(Decimal(str(long_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize)))
  813. short_one_hand_amount = float(Decimal(str(short_one_hand_value / mp//self.strategy.stepSize))*Decimal(str(self.strategy.stepSize)))
  814. # 检查是否满足最低交易要求
  815. print(f"最低单手交易下单量为 buy: {long_one_hand_amount} sell: {short_one_hand_amount}")
  816. if (long_one_hand_amount == 0 and short_one_hand_amount == 0) or (long_one_hand_value < 20 and short_one_hand_value < 20):
  817. self.exit_msg = f"初始下单量太少 buy: {long_one_hand_amount} sell: {short_one_hand_amount}"
  818. self.stop()
  819. # 初始化调度器
  820. self.local_cash = start_cash
  821. self.local_coin = start_coin
  822. # 配置在线训练
  823. if self.backtest:
  824. # 设置策略默认参数
  825. self.strategy.trade_close_dist = 0.00001
  826. self.strategy.trade_open_dist = 0.01
  827. self.backtest_look_length = 86400 / self.interval # 回测区间足够长
  828. self.backtest_tasks = list()
  829. self.tick_profit_to_daily = (86400/self.interval)
  830. self.backtest_start_cash = 1000000.0
  831. # 备选参数
  832. open_list, close_list, alpha_list = utils.get_backtest_set(self.base)
  833. if 'spot' in self.exchange:
  834. side_list = []
  835. if long_one_hand_amount > 0:
  836. side_list.append(1)
  837. if short_one_hand_amount > 0:
  838. side_list.append(-1)
  839. if 1 in side_list and -1 in side_list:
  840. side_list.append(0)
  841. else:
  842. side_list = [-1,0,1]
  843. side_list_allow = []
  844. for s in side_list:
  845. if s in utils.POST_SIDE_LIMIT:
  846. side_list_allow.append(s)
  847. side_list = side_list_allow
  848. for _open in open_list:
  849. for _side in side_list:
  850. for _close in close_list[_open]:
  851. for _index in range(self.ref_num):
  852. for _alpha in alpha_list:
  853. task = dict()
  854. st = strategy.Strategy(self.params, is_print=0)
  855. st.leverrate = 1.0
  856. st.trade_open_dist = _open
  857. st.trade_close_dist = _close
  858. st.predict_alpha = _alpha
  859. st.ref_index = _index
  860. st.post_side = _side
  861. st.exchange = "dummy_usdt_swap"
  862. st.local_start_time = 0.0
  863. bt = backtest.Backtest(st, is_plot=0)
  864. bt.start_cash = self.backtest_start_cash
  865. task["backtest_engine"] = bt
  866. task["open"] = _open
  867. task["close"] = _close
  868. task["index"] = _index
  869. task["side"] = _side
  870. task["alpha"] = _alpha
  871. self.backtest_tasks.append(task)
  872. backtest_num = len(self.backtest_tasks)
  873. self.backtest_num = backtest_num
  874. self.logger.info(f'在线模拟撮合数量{backtest_num}')
  875. self.logger.info(f'当前为在线训练模式 需预热{utils.BACKTEST_PREHOT_SECOND}秒 请耐心等候...')
  876. else:
  877. self.logger.info('当前为指定参数模式...')
  878. ###### 交易前准备就绪 可以开始交易 ######
  879. self.loop.create_task(self.rest.go())
  880. self.loop.create_task(self.on_timer())
  881. self.loop.create_task(self._run_server())
  882. self.loop.create_task(self.run_stratey())
  883. #self.loop.create_task(self.post_loop()) #改
  884. self.loop.create_task(self.early_stop_loop())
  885. def update_trade_msg(self):
  886. # 更新保证金
  887. self.tradeMsg.cash = round(self.local_cash,10)
  888. self.tradeMsg.coin = round(self.local_coin,10)
  889. # 使用本地推算仓位
  890. self.tradeMsg.position = self.local_position_by_orders
  891. # 更新订单
  892. self.tradeMsg.orders = self.local_orders
  893. ### 更新 ref
  894. ref_tickers = []
  895. for i in self.ref_name:
  896. ref_tickers.append([self.tickers[i]['bp'], self.tickers[i]['ap']])
  897. self.tradeMsg.ref_price = self.Predictor.Get_ref(ref_tickers)
  898. async def server_handle(self, request):
  899. '''中控数据接口'''
  900. if 'spot' in self.exchange:
  901. pos = self.local_position_by_orders.longPos - self.local_position_by_orders.shortPos
  902. else:
  903. pos = self.local_position.longPos - self.local_position.shortPos
  904. if pos > 0.0:
  905. entryPrice = self.local_position_by_orders.longAvg
  906. elif pos < 0.0:
  907. entryPrice = self.local_position_by_orders.shortAvg
  908. else:
  909. entryPrice = 0
  910. return web.Response(body=json.dumps({
  911. "now_balance": round(self.strategy.equity/self.used_pct, 4), #钱包余额
  912. "unrealized_pn_l": round(self.local_profit, 4), #未实现盈利
  913. "pos": round(pos, 8), #持仓数量
  914. "entry_price": round(entryPrice, 8), #开仓价格
  915. "now_price": round(self.strategy.mp, 8), #当前价格
  916. }))
  917. async def change(self, request):
  918. '''中控台修改参数'''
  919. try:
  920. data = await request.json()
  921. if "stop" in data:
  922. self.logger.warning('中控停机')
  923. self.exit_msg = '中控停机'
  924. self.stop()
  925. return web.Response(text=f"停机成功")
  926. ip = request.remote
  927. print(f'从{ip}收到更新参数请求',data)
  928. if isinstance(data, str):
  929. data = json.loads(data)
  930. if self.backtest == 1:
  931. return web.Response(text="自动调参模式不允许手动修改参数")
  932. else:
  933. open = float(data['open'])
  934. close = float(data['close'])
  935. self.strategy.trade_open_dist = open
  936. self.strategy.trade_close_dist = close
  937. return web.Response(text=f"参数修改成功 {open} {close}")
  938. except Exception as e:
  939. return web.Response(text=f"参数修改失败 {e}")
  940. # @utils.timeit
  941. def check_risk(self):
  942. '''检查风控'''
  943. if self.strategy.start_cash == 0.0:
  944. print("请检查交易账户余额")
  945. return 0
  946. if isinstance(self.strategy.mp, float):
  947. pass
  948. else:
  949. print("请检查最新价格")
  950. return 0
  951. ############
  952. # print("当前线程数",self.process.num_threads())
  953. ###### 资源风控0 ######
  954. cpu_pct = psutil.cpu_times_percent().user
  955. self.cpu_ema = self.cpu_ema * 0.8 + cpu_pct * 0.2
  956. # print(f"cpu占用 {cpu_pct}")
  957. if self.cpu_ema > 95:
  958. msg = f"cpu占用过高 {self.cpu_ema} 准备停机"
  959. print(msg)
  960. self.logger.warning(msg)
  961. self.exit_msg = msg
  962. self.stop()
  963. mm_pct = psutil.virtual_memory().percent
  964. self.mm_ema = self.mm_ema * 0.8 + mm_pct * 0.2
  965. # print(f"内存占用 {mm_pct}")
  966. if self.mm_ema > 95:
  967. msg = f"内存占用过高 {self.mm_ema} 准备停机"
  968. print(msg)
  969. self.logger.warning(msg)
  970. self.exit_msg = msg
  971. self.stop()
  972. ###### 回撤风控1 ######
  973. if "spot" not in self.exchange:
  974. draw_back = 1-self.strategy.equity/self.strategy.max_equity
  975. if draw_back > self.stoploss:
  976. msg = f"{self.acct_name} 总资金吊灯回撤{draw_back} 当前{self.strategy.equity} 最高{self.strategy.max_equity} 触发止损 准备停机"
  977. print(msg)
  978. self.logger.warning(msg)
  979. self.exit_msg = msg
  980. self.stop()
  981. ###### 回撤风控2 ######
  982. draw_back = self.local_profit/self.strategy.start_equity
  983. if draw_back < -self.stoploss:
  984. msg = f"{self.acct_name} 交易亏损 触发止损 准备停机"
  985. print(msg)
  986. self.logger.warning(msg)
  987. self.exit_msg = msg
  988. self.stop()
  989. ###### 报单延迟风控 ######
  990. if self.rest.avg_delay > 5000: # 平均延迟允许上限 5000ms
  991. msg = f"{self.acct_name} 延迟爆表 触发风控 准备停机"
  992. print(msg)
  993. self.logger.warning(msg)
  994. self.exit_msg = msg
  995. self.stop()
  996. ###### 仓位异常风控 ######
  997. ### 合约60秒更新一次绝对仓位 ###
  998. # 连续5分钟仓位不正确就停机
  999. # 5 * 60 = 300 300/10 = 30
  1000. diff_pos = max(abs(self.local_position.longPos - self.local_position_by_orders.longPos),abs(self.local_position.shortPos - self.local_position_by_orders.shortPos))
  1001. if "spot" not in self.exchange:
  1002. diff_pos_value = diff_pos * self.strategy.mp
  1003. if diff_pos_value > self.strategy._min_amount_value:
  1004. msg = f"{self.acct_name} ***发现仓位异常*** 推算{self.local_position_by_orders.__dict__} 本地{self.local_position.__dict__}"
  1005. print(msg)
  1006. self.logger.warning(msg)
  1007. self.position_check_series.append(1)
  1008. else:
  1009. self.position_check_series.append(0)
  1010. if len(self.position_check_series) > 30:
  1011. del(self.position_check_series[0])
  1012. if sum(self.position_check_series) >= 30:
  1013. msg = f"{self.acct_name} 合约连续检查本地仓位和推算仓位不相符 退出"
  1014. print(msg)
  1015. self.logger.warning(msg)
  1016. self.exit_msg = msg
  1017. self.stop()
  1018. ###### 下单异常风控 ######
  1019. if self.strategy.total_amount == 0.0:
  1020. msg = f"{self.acct_name} 开仓量为零 退出"
  1021. print(msg)
  1022. self.logger.warning(msg)
  1023. self.exit_msg = msg
  1024. self.stop()
  1025. ###### 行情更新异常风控 ######
  1026. for name in self.ref_name:
  1027. delay = round((time.time() - self.market_update_time[name]) * 1000, 3)
  1028. if delay > utils.MARKET_DELAY_LIMIT: # thre
  1029. msg = f"{self.acct_name} ticker_name:{name} delay:{delay}ms 行情更新延迟过高 退出"
  1030. self.logger.error(msg)
  1031. self.exit_msg = msg
  1032. self.stop()
  1033. for name in [self.trade_name]:
  1034. delay = round((time.time() - self.market_update_time[name]) * 1000, 3)
  1035. if delay > utils.MARKET_DELAY_LIMIT: # thre
  1036. msg = f"{self.acct_name} ticker_name:{name} delay:{delay}ms 行情更新延迟过高 退出"
  1037. self.logger.error(msg)
  1038. self.exit_msg = msg
  1039. self.stop()
  1040. ###### 订单异常风控 ######
  1041. for cid in self.local_orders:
  1042. if time.time() - self.local_orders[cid]["localtime"] > 300: # 订单长时间停留 怀疑漏单 但未必一定漏 5min
  1043. msg = f"{self.acct_name} cid:{cid} 订单停留过久 怀疑异常 退出"
  1044. self.logger.error(msg)
  1045. self.exit_msg = msg
  1046. self.stop()
  1047. ###### 持仓均价异常风控 ######
  1048. if isinstance(self.strategy.long_pos_bias, float):
  1049. # 偏离mp较大 且持仓较大 说明出现异常
  1050. if self.strategy.long_hold_value > 2*self.strategy._min_amount_value:
  1051. if self.strategy.long_pos_bias > 4.0 or self.strategy.long_pos_bias < -2.0:
  1052. msg = f"{self.acct_name} long_pos_bias:{self.strategy.long_pos_bias} 持仓均价异常 退出"
  1053. self.logger.error(msg)
  1054. self.exit_msg = msg
  1055. self.stop()
  1056. if isinstance(self.strategy.short_pos_bias, float):
  1057. # 偏离mp较大 且持仓较大 说明出现出现异常
  1058. if self.strategy.short_hold_value > 2*self.strategy._min_amount_value:
  1059. if self.strategy.short_pos_bias > 4.0 or self.strategy.short_pos_bias < -2.0:
  1060. msg = f"{self.acct_name} short_pos_bias:{self.strategy.short_pos_bias} 持仓均价异常 退出"
  1061. self.logger.error(msg)
  1062. self.exit_msg = msg
  1063. self.stop()
  1064. ###### 订单撤单异常风控 ######
  1065. for cid in self.local_cancel_log:
  1066. if self.local_cancel_log[cid] > 300:
  1067. msg = f"{self.acct_name} 订单长时间无法撤销 退出"
  1068. self.logger.error(msg)
  1069. self.exit_msg = msg
  1070. self.stop()
  1071. ###### 定价异常风控 ######
  1072. if abs(self.strategy.ref_price-self.strategy.mp)/self.strategy.mp > 0.03:
  1073. msg = f"{self.acct_name} 定价偏离过大 怀疑异常 退出"
  1074. self.logger.error(msg)
  1075. self.exit_msg = msg
  1076. self.stop()
  1077. async def exit(self, delay=0):
  1078. '''退出操作'''
  1079. try:
  1080. self.logger.info(f"预约退出操作 delay:{delay}")
  1081. if delay > 0:
  1082. await asyncio.sleep(delay)
  1083. self.logger.info(f"开始退出操作")
  1084. self.logger.info("为避免api失效导致遗漏仓位 建议人工复查")
  1085. await self.rest.check_position(hold_coin=self.hold_coin)
  1086. # stop flag
  1087. self.rest.stop_flag = 1
  1088. self.ws.stop_flag = 1
  1089. for i in self.ref_name:
  1090. self.ws_ref[i].stop_flag = 1
  1091. # double check 需要延迟几秒以便等待更新数据
  1092. await asyncio.sleep(3)
  1093. self.logger.info("双重检查遗漏仓位")
  1094. await self.rest.check_position(hold_coin=self.hold_coin)
  1095. self.logger.info(f'停机退出 停机原因 {self.exit_msg}')
  1096. await asyncio.sleep(1)
  1097. # 发送交易状态
  1098. await self._post_params()
  1099. # 压缩行情文件
  1100. utils.csv_to_gz_and_remove()
  1101. # close pid
  1102. self.logger.info("退出进程")
  1103. except:
  1104. self.logger.error(traceback.format_exc())
  1105. finally:
  1106. os._exit(0)
  1107. async def on_timer(self):
  1108. '''定期触发系统逻辑'''
  1109. await asyncio.sleep(20)
  1110. while 1:
  1111. try:
  1112. # 10秒检查一次风控
  1113. await asyncio.sleep(10)
  1114. # 检查风控
  1115. self.check_risk()
  1116. # stop
  1117. if self.mode_signal == 1:return
  1118. # 计算预估成交额
  1119. total_trade_value = self.local_buy_value + self.local_sell_value
  1120. self.strategy.trade_vol_24h = round(total_trade_value / (time.time()-self.pid_start_time) * 86400 / 10000, 2)
  1121. # 打印
  1122. if int(self.params.log):
  1123. self.strategy._print_summary()
  1124. # 打印行情延迟监控
  1125. self.logger.info('Rest 报单平均延迟 ' + str(self.rest.avg_delay) + 'ms ')
  1126. self.logger.info('Rest 报单最高延迟 ' + str(self.rest.max_delay) + 'ms ')
  1127. for name in self.market_update_interval:
  1128. avg_interval = round(self.market_update_interval[name]*1e3, 2)
  1129. self.logger.info(f'WS 盘口{name}行情 平均更新间隔 {avg_interval}ms')
  1130. # 选择参数
  1131. if self.backtest:
  1132. self.choose_params()
  1133. except asyncio.CancelledError:
  1134. print('定期循环任务取消')
  1135. except:
  1136. print("定时循环系统出错")
  1137. self.logger.error(traceback.print_exc())
  1138. await asyncio.sleep(10)
  1139. async def _post_params(self):
  1140. '''推送交易信息'''
  1141. profit = round(self.strategy.daily_return/self.strategy.leverrate,4)
  1142. if time.time() - self.pid_start_time > utils.EARLY_STOP_SECOND * 0.5 or profit < 0.0:
  1143. await utils._post_params(
  1144. "http://wwww.khods.com:8888/post_params",
  1145. self.params.proxy,
  1146. ujson.dumps({
  1147. "pwd":"123456",
  1148. "exchange":self.params.exchange,
  1149. "pair":self.params.pair,
  1150. "open":self.params.open,
  1151. "close":self.params.close,
  1152. "refexchange":self.params.refexchange[self.strategy.ref_index],
  1153. "profit":profit,
  1154. })
  1155. )
  1156. else:
  1157. self.logger.info("不满足推送过滤条件 放弃推送参数")
  1158. async def post_loop(self):
  1159. '''定期触发交易信息推送'''
  1160. await asyncio.sleep(30)
  1161. _interval = 60 # 定期推送一次盈利情况
  1162. while 1:
  1163. try:
  1164. # 定期推送一次
  1165. await asyncio.sleep(_interval)
  1166. # 发送交易状态
  1167. await self._post_params()
  1168. except asyncio.CancelledError:
  1169. print('post loop 循环任务取消')
  1170. except:
  1171. print("post loop 循环系统出错")
  1172. self.logger.error(traceback.print_exc())
  1173. await asyncio.sleep(10)
  1174. async def early_stop_loop(self):
  1175. '''定期触发交易信息推送'''
  1176. if self.father:
  1177. self.logger.info(f'以父进程方式启动 关闭早停检测')
  1178. return
  1179. else:
  1180. self.logger.info(f'以子进程方式启动 开启早停检测')
  1181. await asyncio.sleep(30)
  1182. _interval = utils.EARLY_STOP_SECOND
  1183. _last_equity = self.strategy.start_equity
  1184. _last_local_profit = 0.0
  1185. while 1:
  1186. try:
  1187. # 休眠
  1188. await asyncio.sleep(_interval)
  1189. ###### 子进场早停风控 ######
  1190. self.logger.info(f'当前净值{self.strategy.equity} 上次检测时净值{_last_equity} 当前累积利润{self.local_profit} 上次检测时利润{_last_local_profit}')
  1191. # 检查是否需要早停 没有成交 或者 亏损
  1192. if self.strategy.equity <= _last_equity or self.local_profit <= _last_local_profit:
  1193. self.logger.info('触发早停条件 当零持仓时退出')
  1194. # 没有持仓
  1195. for _ in range(30):
  1196. await asyncio.sleep(5)
  1197. if self.strategy.long_hold_value < self.strategy._min_amount_value and \
  1198. self.strategy.short_hold_value < self.strategy._min_amount_value:
  1199. msg = f"{self.acct_name} 子进程盈利状况不理想 提前停机 退出"
  1200. self.logger.error(msg)
  1201. self.exit_msg = msg
  1202. self.stop()
  1203. # 更新上一次检测的净值
  1204. _last_equity = self.strategy.equity
  1205. _last_local_profit = self.local_profit
  1206. except asyncio.CancelledError:
  1207. print('early stop 循环任务取消')
  1208. except:
  1209. print("early stop 循环系统出错")
  1210. self.logger.error(traceback.print_exc())
  1211. await asyncio.sleep(10)
  1212. def on_agg_market(self):
  1213. '''
  1214. 处理聚合行情
  1215. 1. 获取聚合行情
  1216. 2. 更新预测器
  1217. 3. 触发tick回测
  1218. '''
  1219. ### 更新聚合市场数据
  1220. agg_market = self.get_all_market_data()
  1221. ### 更新聚合市场信息
  1222. self.tradeMsg.market = agg_market
  1223. ### 更新预测器
  1224. self.Predictor.onTime(agg_market)
  1225. ### 触发回测
  1226. if self.backtest:
  1227. self.loop.create_task(self.real_time_back_test(self.tradeMsg))
  1228. async def run_stratey(self):
  1229. '''
  1230. 定期触发策略
  1231. '''
  1232. print('定时触发器启动')
  1233. # 准备交易
  1234. try:
  1235. print('前期准备完成')
  1236. await asyncio.sleep(10)
  1237. while 1:
  1238. try:
  1239. # 时间预设
  1240. start_time = time.time()
  1241. ### 是否准备充分
  1242. if self.ready:
  1243. ### 更新交易信息集合
  1244. self.update_trade_msg()
  1245. ### 触发策略
  1246. if self.mode_signal == 0:
  1247. pass
  1248. # # 更新策略时间
  1249. # self.strategy.local_time = time.time()
  1250. # # 产生信号
  1251. # orders = self.strategy.onTime(self.tradeMsg)
  1252. # ### 记录指令触发信息
  1253. # if self._not_empty(orders):
  1254. # self.logger.debug("触发onTime")
  1255. # self._update_local_orders(orders)
  1256. # self.loop.create_task(self.rest.handle_signals(orders))
  1257. # self.logger.debug(orders)
  1258. else:
  1259. if self.mode_signal > 1:self.mode_signal -= 1
  1260. if self.mode_signal == 1:return
  1261. # 触发策略
  1262. # 更新策略时间
  1263. self.strategy.local_time = time.time()
  1264. # 获取信号
  1265. if self.mode_signal > 20:
  1266. # 先执行onExit
  1267. orders = self.strategy.onExit(self.tradeMsg)
  1268. ### 记录指令触发信息
  1269. if self._not_empty(orders):
  1270. self.logger.debug("触发onExit")
  1271. self._update_local_orders(orders)
  1272. self.loop.create_task(self.rest.handle_signals(orders))
  1273. self.logger.debug(orders)
  1274. else:
  1275. # 再执行onSleep
  1276. orders = self.strategy.onSleep(self.tradeMsg)
  1277. ### 记录指令触发信息
  1278. if self._not_empty(orders):
  1279. self.logger.debug("触发onSleep")
  1280. self._update_local_orders(orders)
  1281. self.loop.create_task(self.rest.handle_signals(orders))
  1282. self.logger.debug(orders)
  1283. ############################################################
  1284. else:
  1285. self.check_ready()
  1286. ### 计算耗时并进行休眠
  1287. pass_time = time.time()-start_time
  1288. await asyncio.sleep(utils.clip(self.interval-pass_time, 0.0, 1.0))
  1289. except asyncio.CancelledError:
  1290. print('策略触发任务取消')
  1291. except:
  1292. self.logger.error(traceback.format_exc())
  1293. traceback.print_exc()
  1294. await asyncio.sleep(10)
  1295. except asyncio.CancelledError:
  1296. print('策略触发任务取消')
  1297. except:
  1298. self.logger.error(traceback.format_exc())
  1299. traceback.print_exc()
  1300. await asyncio.sleep(10)
  1301. def check_ready(self):
  1302. '''
  1303. 判断初始数据是否齐全
  1304. '''
  1305. ### 检查 ticker 行情
  1306. for i in self.ref_name:
  1307. if i not in self.tickers or self.tickers[i] == {}:
  1308. print("参考盘口ticker未准备好")
  1309. return
  1310. else:
  1311. if self.tickers[i]['bp'] == 0 or self.tickers[i]['ap'] == 0:
  1312. print("参考盘口ticker未准备好")
  1313. return
  1314. if self.trade_name not in self.tickers or self.tickers[self.trade_name] == {}:
  1315. print("交易盘口ticker未准备好")
  1316. return
  1317. else:
  1318. if self.tickers[self.trade_name]['bp'] == 0 or self.tickers[self.trade_name]['ap'] == 0:
  1319. print("交易盘口ticker未准备好")
  1320. return
  1321. ### 检查 market 行情
  1322. all_market = self.get_all_market_data()
  1323. if len(all_market) != utils.LEN*(1+self.ref_num):
  1324. print("聚合行情未准备好")
  1325. return
  1326. else:
  1327. # 如果行情已经就绪 预热trademsg和predictor
  1328. print("聚合行情准备就绪")
  1329. self.tradeMsg.market = all_market
  1330. self.Predictor.onTime(all_market)
  1331. self.ready = 1
  1332. def stop(self):
  1333. '''
  1334. 停机函数
  1335. mode_signal 不能小于80
  1336. 前6秒用于maker平仓
  1337. 后2秒用于撤maker平仓单
  1338. 休眠2秒再执行check_position 避免卡单导致漏仓位
  1339. '''
  1340. self.logger.info(f'进入停机流程...')
  1341. self.mode_signal = 80
  1342. # 等strategy onExit 彻底执行完毕 进入沉默状态之后 再进入exit 否则可能导致多处同时操作订单
  1343. # 尽量减少大仓位直接take平
  1344. self.loop.create_task(self.exit(delay=10))
  1345. async def _run_server(self):
  1346. print('server正在启动...')
  1347. for _ in range(30):
  1348. await asyncio.sleep(5)
  1349. if self.strategy.equity > 0.0:break
  1350. app = web.Application()
  1351. app.router.add_route('GET', '/account', self.server_handle)
  1352. app.router.add_route('POST', '/change', self.change)
  1353. try:
  1354. self.loop.create_task(web._run_app(app, host='0.0.0.0', port=self.params.server_port, handle_signals=False))
  1355. except:
  1356. self.logger.error(f"Server启动失败")
  1357. self.logger.error(traceback.format_exc())
  1358. self.exit_msg = "服务启动失败 停机退出"
  1359. self.stop()
  1360. def run(self):
  1361. '''启动ws行情获取'''
  1362. def keyboard_interrupt(s, f):
  1363. self.logger.info("收到退出信号 准备关机")
  1364. self.stop()
  1365. try:
  1366. signal.signal(signal.SIGINT, keyboard_interrupt)
  1367. signal.signal(signal.SIGTERM, keyboard_interrupt)
  1368. if 'win' not in sys.platform:
  1369. signal.signal(signal.SIGKILL, keyboard_interrupt)
  1370. signal.signal(signal.SIGQUIT, keyboard_interrupt)
  1371. except:
  1372. pass
  1373. self.loop.create_task(self.before_trade())
  1374. print(f'判断启动方式...')
  1375. if self.father:
  1376. print('以父进程方式启动 最大允许运行时间为30天')
  1377. self.loop.create_task(self.exit(delay=60*60*24*30))
  1378. else:
  1379. print('以子进程方式启动 最大允许运行时间为60分钟')
  1380. self.loop.create_task(self.exit(delay=utils.CHILD_RUN_SECOND))
  1381. self.loop.run_forever()
  1382. if __name__ == "__main__":
  1383. if 0:
  1384. utils.check_auth()
  1385. if 0:
  1386. utils.check_time()
  1387. pnum = len(sys.argv)
  1388. if pnum > 0:
  1389. fname = None
  1390. log_file = None
  1391. pidnum = None
  1392. father = 1
  1393. for i in range(pnum):
  1394. print(f"第{i}个参数为:{sys.argv[i]}")
  1395. if sys.argv[i] == '-c' or sys.argv[i] == '--c':
  1396. fname = sys.argv[i+1]
  1397. elif sys.argv[i] == '-h':
  1398. print("帮助文档")
  1399. elif sys.argv[i] == '-log_file' or sys.argv[i] == '--log_file':
  1400. log_file = sys.argv[i+1]
  1401. elif sys.argv[i] == '-num' or sys.argv[i] == '--num':
  1402. pidnum = sys.argv[i+1]
  1403. elif sys.argv[i] == '-v' or sys.argv[i] == '--v':
  1404. print(f"当前版本为 V{VERSION}")
  1405. elif sys.argv[i] == '-child' or sys.argv[i] == '--child':
  1406. father = 0
  1407. print(f"当前以子进程方式启动")
  1408. if fname and log_file and pidnum:
  1409. print(f"指定的配置为 fname:{fname} log_file:{log_file} pidnum:{pidnum} father:{father}")
  1410. date = time.strftime("%Y%m%d", time.localtime())
  1411. logname = f"{log_file}-{date}"
  1412. quant = Quant(utils.get_params(fname), logname, father)
  1413. quant.run()
  1414. elif fname:
  1415. print(f"运行指定配置文件{fname}")
  1416. quant = Quant(utils.get_params(fname),father=father)
  1417. quant.run()
  1418. else:
  1419. print("缺少指定参数 运行默认配置文件")
  1420. fname = 'config.toml'
  1421. quant = Quant(utils.get_params(fname),father=father)
  1422. quant.run()
  1423. else:
  1424. fname = 'config.toml'
  1425. quant = Quant(utils.get_params(fname))
  1426. quant.run()