import asyncio from http.client import NON_AUTHORITATIVE_INFORMATION from aiohttp import web import traceback import time import utils import logging, logging.handlers import signal import broker import os, json, sys, random def timeit(func): def wrapper(*args, **kwargs): nowTime = time.time() res = func(*args, **kwargs) spend_time = time.time() - nowTime spend_time = round(spend_time * 100, 2) print(f'{func.__name__} 耗时 {spend_time} ms') return res return wrapper def takeSecond(elem): return elem[1] import ccxt.async_support as ccxt ex_list = [ ccxt.binanceusdm(), ccxt.binance(), # ccxt.okex5(), # ccxt.kucoin(), # ccxt.gateio(), # ccxt.coinex(), ] class Center: def __init__(self, fname, logname=None): print('############### 参数中心 ################') print(f'>> {utils.VERSION} <<<') print('*** 当前配置 ***') self.fname = fname self.params = utils.get_params(fname) for p in self.params.__dict__: print('***', p, ' => ', getattr(self.params, p)) print('##################################################') pid = os.getpid() print(f'交易程序正在启动 进程号{pid}...') self.logger = self.get_logger(logname) self.params_base = dict() for i in broker.exchange_lists:self.params_base[i] = dict() self.params_dummy = dict() for i in broker.exchange_lists:self.params_dummy[i] = dict() self.params_real = dict() for i in broker.exchange_lists:self.params_real[i] = dict() self.win_dict = dict() for i in broker.exchange_lists:self.win_dict[i] = dict() self.loss_dict = dict() for i in broker.exchange_lists:self.loss_dict[i] = dict() self.choose_dict = dict() for i in broker.exchange_lists:self.choose_dict[i] = dict() self.dummy_choose_dict = dict() for i in broker.exchange_lists:self.dummy_choose_dict[i] = dict() try: with open('params_real.json','r') as f: self.params_real = json.load(f) except: pass try: with open('choose_dict.json','r') as f: self.choose_dict = json.load(f) except: pass try: with open('dummy_choose_dict.json','r') as f: self.dummy_choose_dict = json.load(f) except: pass try: with open('params_dummy.json','r') as f: self.params_dummy = json.load(f) except: pass self.loop = asyncio.get_event_loop() ### self.market = dict() self.score = dict() self.info_msg = "加载中..." def get_logger(self, logname): logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # log to txt formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s') if logname == None: logname = "log" handler = logging.handlers.RotatingFileHandler(f"{logname}.log",maxBytes=1024*1024*50,backupCount=10,encoding='utf-8') handler.setLevel(logging.DEBUG) handler.setFormatter(formatter) # log to console console = logging.StreamHandler() console.setLevel(logging.INFO) logger.addHandler(handler) logger.addHandler(console) logger.info('开启日志记录') return logger async def exit(self, delay=0): '''退出操作''' print(f"开始退出操作 delay{delay}") if delay > 0: await asyncio.sleep(delay) self.logger.info(f'停机退出') await asyncio.sleep(1) print("停机...") # self.loop.create_task(utils.ding(f"参数中心程序停止", 1, self.params.webhook, self.params.proxy)) self.loop.stop() os._exit(0) async def read_market(self): market = dict() print("开始查询行情") for exchange in ex_list: if (exchange.has['fetchTickers']): tickers = await exchange.fetch_tickers() for i in tickers: symbol = tickers[i]['symbol'] if '/USDT' not in symbol:continue pair = symbol.split('/')[0].lower() + '_' + symbol.split('/')[1].lower() last = tickers[i]['last'] if last > 0: if pair not in market: market[pair] = last return market async def update_score(self): ''' 按最近涨跌幅打分 ''' await asyncio.sleep(5) while 1: try: market = await self.read_market() self.score = dict() for i in self.market: if i in market: score = abs(market[i] - self.market[i])/market[i] if score > 0: self.score[i] = score score_list = [] for i in self.score: score_list.append(self.score[i]) if len(score_list) > 0: max_score = max(score_list) for i in self.score: self.score[i] = round(self.score[i]/max_score,3) self.market = market print("当前打分", self.score) await asyncio.sleep(60) except Exception as e: print("定时循环系统出错"+str(e)) self.logger.error(traceback.print_exc()) await asyncio.sleep(10) async def _on_timer(self): '''定期触发系统逻辑''' await asyncio.sleep(1) while 1: try: print('整理参数池...') # 重置info_msg self.info_msg = "" # 加载配置文件群 p_list = [] for root, dirs, files in os.walk(os.getcwd()): for file_name in files: if 'config_dummy' in file_name: p_list.append(utils.get_params(os.path.join(root, file_name))) # 打印本地参数数据 self.params_base = dict() for i in broker.exchange_lists:self.params_base[i] = dict() for p in p_list: name = p.exchange + '@' + p.pair self.params_base[p.exchange][name] = dict() self.params_base[p.exchange][name]['pair'] = p.pair self.params_base[p.exchange][name]['exchange'] = p.exchange self.params_base[p.exchange][name]['open'] = p.open self.params_base[p.exchange][name]['close'] = p.close self.params_base[p.exchange][name]['refexchange'] = p.refexchange self.params_base[p.exchange][name]['refpair'] = p.refpair self.params_base[p.exchange][name]['profit'] = 0.0 #### 整理params_real # 移除太旧的信息 params_real = dict() for ex in broker.exchange_lists: params_real[ex] = dict() for name in self.params_real[ex].keys(): if time.time() - self.params_real[ex][name]['time'] > utils.EARLY_STOP_SECOND: print(f"params real 记录时间太久 移除{name}") else: params_real[ex][name] = self.params_real[ex][name] self.params_real = params_real # 移除异常信息 params_real = dict() for ex in broker.exchange_lists: params_real[ex] = dict() for name in self.params_real[ex].keys(): if abs(float(self.params_real[ex][name]['profit'])) > 10000.0: print(f"params real 收益率异常 移除{name}") else: params_real[ex][name] = self.params_real[ex][name] self.params_real = params_real #### 整理params_dummy # 移除太旧的信息 params_dummy = dict() for ex in broker.exchange_lists: params_dummy[ex] = dict() for name in self.params_dummy[ex].keys(): if time.time() - self.params_dummy[ex][name]['time'] > utils.DUMMY_EARLY_STOP_SECOND: print(f"params dummy 记录时间太久 移除{name}") else: params_dummy[ex][name] = self.params_dummy[ex][name] self.params_dummy = params_dummy # 移除异常信息 params_dummy = dict() for ex in broker.exchange_lists: params_dummy[ex] = dict() for name in self.params_dummy[ex].keys(): if abs(float(self.params_dummy[ex][name]['profit'])) > 10000.0: print(f"params dummy 收益率异常 移除{name}") else: params_dummy[ex][name] = self.params_dummy[ex][name] self.params_dummy = params_dummy # 从params_real 提取 win_dict loss_dict self.logger.info('='*10) profit_thre = 0.001 for ex in broker.exchange_lists: self.logger.info('='*10) self.info_msg += '='*10 + '\r' # 更新windict lossdict self.win_dict[ex] = dict() self.loss_dict[ex] = dict() profits = [] pairs = [] for name in self.params_real[ex]: profit = self.params_real[ex][name]['profit'] profits.append(profit) pairs.append(self.params_real[ex][name]['pair']) if profit > profit_thre: self.win_dict[ex][name] = self.params_real[ex][name] self.win_dict[ex][name]['leverrate'] = 2.0 else: self.loss_dict[ex][name] = self.params_real[ex][name] self.loss_dict[ex][name]['leverrate'] = 0.0 if len(profits) > 0: max_profit = max(profits) max_profit_name = pairs[profits.index(max_profit)] max_profit - round(max_profit, 5) min_profit = min(profits) min_profit_name = pairs[profits.index(min_profit)] min_profit - round(min_profit, 5) avg_profit = round(sum(profits)/len(profits),5) win_num = 0 for i in profits: if i > profit_thre:win_num+=1 total_num = len(profits) base_num = len(self.params_base[ex]) msg = f"Real情况 盘口{ex} 最大{max_profit_name} {max_profit} 最小{min_profit_name} {min_profit} 平均{avg_profit} 盈利{win_num} 记录{total_num} 总数{base_num}" self.logger.info(msg) self.info_msg += msg + "\r" msg = "" for i in range(total_num): msg += pairs[i] + " " if (i+1)%10 == 0: self.logger.info(msg) self.info_msg += msg + "\r" msg = "" self.logger.info(msg) self.info_msg += msg + "\r" self.logger.info('-'*10) self.info_msg += '-'*10 + '\r' # 打印dummy_params情况 profits = [] pairs = [] for name in self.params_dummy[ex]: profit = self.params_dummy[ex][name]['profit'] profits.append(profit) pairs.append(self.params_dummy[ex][name]['pair']) if len(profits) > 0: max_profit = max(profits) max_profit_name = pairs[profits.index(max_profit)] max_profit - round(max_profit, 5) min_profit = min(profits) min_profit_name = pairs[profits.index(min_profit)] min_profit - round(min_profit, 5) avg_profit = round(sum(profits)/len(profits),5) win_num = 0 for i in profits: if i > profit_thre:win_num+=1 total_num = len(profits) base_num = len(self.params_base[ex]) msg = f"Dummy情况 盘口{ex} 最大{max_profit_name} {max_profit} 最小{min_profit_name} {min_profit} 平均{avg_profit} 盈利{win_num} 记录{total_num} 总数{base_num}" self.logger.info(msg) self.info_msg += msg + "\r" msg = "" for i in range(total_num): msg += pairs[i] + " " if (i+1)%10 == 0: self.logger.info(msg) self.info_msg += msg + "\r" msg = "" self.logger.info(msg) self.info_msg += msg + "\r" # 保存信息 with open('params_real.json', 'w') as f: json.dump(self.params_real,f) with open('choose_dict.json', 'w') as f: json.dump(self.choose_dict,f) with open('dummy_choose_dict.json', 'w') as f: json.dump(self.dummy_choose_dict,f) with open('params_dummy.json', 'w') as f: json.dump(self.params_dummy,f) # 休眠 await asyncio.sleep(60) except Exception as e: print("定时循环系统出错"+str(e)) self.logger.error(traceback.print_exc()) await asyncio.sleep(10) def stop(self): print(f'进入停机流程...') self.loop.create_task(self.exit(delay=1)) async def get_info(self, request): print(request.remote) return web.Response(text=self.info_msg) async def get_dummy_params(self, request): ''' 中控数据接口 从base参数池随机选参数 ''' data = await request.json() if isinstance(data, str): data = json.loads(data) exchange = data['exchange'] res = dict() # 全部dummy实例从base参数池随机选参数 尝试新品种 for _ in range(len(self.params_base[exchange])): name = random.choice(list(self.params_base[exchange].keys())) if name not in self.dummy_choose_dict[exchange]: self.dummy_choose_dict[exchange][name] = 0 if name not in self.loss_dict[exchange] and time.time() - self.dummy_choose_dict[exchange][name] > utils.DUMMY_EARLY_STOP_SECOND: res[name] = self.params_base[exchange][name] res[name]['leverrate'] = 0.5 self.dummy_choose_dict[exchange][name] = int(time.time()) break ###### 如果没有找到满足条件的参数 随机一组参数 if res == dict(): name = random.choice(list(self.params_base[exchange].keys())) res[name] = self.params_base[exchange][name] res[name]['leverrate'] = 0.5 self.dummy_choose_dict[exchange][name] = int(time.time()) return web.Response(body=json.dumps(res)) async def post_dummy_params(self, request): ''' 中控数据接口 更新params_dummy参数池 ''' data = await request.json() # ip = request.remote # print(f'从{ip}更新dummy参数',data) if isinstance(data, str): data = json.loads(data) exchange = data['exchange'] pair = data['pair'] name = exchange+"@"+pair profit = round(float(data['profit']),4) ####### if exchange in self.params_dummy: # 如果已有记录 if name in self.params_dummy[exchange]: self.params_dummy[exchange][name]['exchange'] = data['exchange'] self.params_dummy[exchange][name]['pair'] = data['pair'] self.params_dummy[exchange][name]['open'] = data['open'] self.params_dummy[exchange][name]['close'] = data['close'] self.params_dummy[exchange][name]['refexchange'] = data['refexchange'] self.params_dummy[exchange][name]['refpair'] = data['refpair'] self.params_dummy[exchange][name]['profit'] = \ round( profit * 0.3 + self.params_dummy[exchange][name]['profit'] * 0.7, 4) self.params_dummy[exchange][name]['time'] = int(time.time()) else: # 如果没有记录 self.params_dummy[exchange][name] = dict() self.params_dummy[exchange][name]['exchange'] = data['exchange'] self.params_dummy[exchange][name]['pair'] = data['pair'] self.params_dummy[exchange][name]['open'] = data['open'] self.params_dummy[exchange][name]['close'] = data['close'] self.params_dummy[exchange][name]['refexchange'] = data['refexchange'] self.params_dummy[exchange][name]['refpair'] = data['refpair'] self.params_dummy[exchange][name]['profit'] = profit self.params_dummy[exchange][name]['time'] = int(time.time()) return web.Response(body=json.dumps({})) async def get_params(self, request): ''' 中控数据接口 从base参数池随机选参数 从dummy参数池随机选参数 ''' data = await request.json() if isinstance(data, str): data = json.loads(data) exchange = data['exchange'] res = dict() # 本盘口盈利品种数量 win_num = len(self.win_dict[exchange]) # 计算盈利参数占比 if win_num == 0: # 没有发现盈利品种的时候 全部实例尝试新品种 if random.randint(0,100) < 10: # 小概率 从base参数池选参数 for _ in range(len(self.params_base[exchange])): name = random.choice(list(self.params_base[exchange].keys())) if name not in self.choose_dict[exchange]: self.choose_dict[exchange][name] = 0 if name not in self.loss_dict[exchange] and time.time() - self.choose_dict[exchange][name] > utils.EARLY_STOP_SECOND: res[name] = self.params_base[exchange][name] res[name]['leverrate'] = 0.5 self.choose_dict[exchange][name] = int(time.time()) break else: # 大概率 从dummy参数池选参数 # 计算概率数组 p_list = [] name_list = [] for name in self.params_dummy[exchange]: p = self.params_dummy[exchange][name]['profit'] if p > 0.0: p_list.append(p) name_list.append(name) if len(p_list) > 1: for _ in range(len(p_list)): name = random.choices( name_list, p_list, k=1, )[0] if name not in self.choose_dict[exchange]: self.choose_dict[exchange][name] = 0 if name not in self.loss_dict[exchange] and time.time() - self.choose_dict[exchange][name] > utils.EARLY_STOP_SECOND: res[name] = self.params_dummy[exchange][name] res[name]['leverrate'] = 0.5 self.choose_dict[exchange][name] = int(time.time()) break else: # 允许10%的实例去搜索新品种 if random.randint(0,100) < 90: # 跑实盘盈利品种 res = self.win_dict[exchange] else: # 没有发现盈利品种的时候 全部实例尝试新品种 if random.randint(0,100) < 10: # 小概率 从base参数池选参数 for _ in range(len(self.params_base[exchange])): name = random.choice(list(self.params_base[exchange].keys())) if name not in self.choose_dict[exchange]: self.choose_dict[exchange][name] = 0 if name not in self.loss_dict[exchange] and time.time() - self.choose_dict[exchange][name] > utils.EARLY_STOP_SECOND: res[name] = self.params_base[exchange][name] res[name]['leverrate'] = 0.5 self.choose_dict[exchange][name] = int(time.time()) break else: # 大概率 从dummy参数池选参数 # 计算概率数组 p_list = [] name_list = [] for name in self.params_dummy[exchange]: p = self.params_dummy[exchange][name]['profit'] if p > 0.0: p_list.append(p) name_list.append(name) if len(p_list) > 1: for _ in range(len(p_list)): name = random.choices( name_list, p_list, k=1, )[0] if name not in self.choose_dict[exchange]: self.choose_dict[exchange][name] = 0 if name not in self.loss_dict[exchange] and time.time() - self.choose_dict[exchange][name] > utils.EARLY_STOP_SECOND: res[name] = self.params_dummy[exchange][name] res[name]['leverrate'] = 0.5 self.choose_dict[exchange][name] = int(time.time()) break ###### 如果没有找到满足条件的参数 随机一组参数 if res == dict(): name = random.choice(list(self.params_base[exchange].keys())) res[name] = self.params_base[exchange][name] res[name]['leverrate'] = 0.5 self.choose_dict[exchange][name] = int(time.time()) return web.Response(body=json.dumps(res)) async def post_params(self, request): ''' 中控数据接口 更新real参数池 ''' data = await request.json() ip = request.remote # print(f'从{ip}更新参数',data) if isinstance(data, str): data = json.loads(data) exchange = data['exchange'] pair = data['pair'] name = exchange+"@"+pair profit = round(float(data['profit']),4) if exchange in self.params_real: # 如果已有记录 if name in self.params_real[exchange]: self.params_real[exchange][name]['exchange'] = data['exchange'] self.params_real[exchange][name]['pair'] = data['pair'] self.params_real[exchange][name]['open'] = data['open'] self.params_real[exchange][name]['close'] = data['close'] self.params_real[exchange][name]['refexchange'] = data['refexchange'] self.params_real[exchange][name]['refpair'] = data['refpair'] self.params_real[exchange][name]['profit'] = \ round( profit * 0.3 + self.params_real[exchange][name]['profit'] * 0.7, 4) self.params_real[exchange][name]['time'] = int(time.time()) else: # 如果没有记录 self.params_real[exchange][name] = dict() self.params_real[exchange][name]['exchange'] = data['exchange'] self.params_real[exchange][name]['pair'] = data['pair'] self.params_real[exchange][name]['open'] = data['open'] self.params_real[exchange][name]['close'] = data['close'] self.params_real[exchange][name]['refexchange'] = data['refexchange'] self.params_real[exchange][name]['refpair'] = data['refpair'] self.params_real[exchange][name]['profit'] = profit self.params_real[exchange][name]['time'] = int(time.time()) return web.Response(body=json.dumps({})) async def _run_server(self): print('server正在启动...') await asyncio.sleep(10) app = web.Application() app.router.add_route('*', f'/get_info', self.get_info) app.router.add_route('POST', f'/get_params', self.get_params) app.router.add_route('POST', f'/get_dummy_params', self.get_dummy_params) app.router.add_route('POST', f'/post_params', self.post_params) app.router.add_route('POST', f'/post_dummy_params', self.post_dummy_params) try: self.loop.create_task(web._run_app(app, host='0.0.0.0', port=self.params.server_port, handle_signals=False)) except: self.logger.error(traceback.format_exc()) def run(self): '''启动ws行情获取''' tasks = [] # 策略 for i in [ asyncio.ensure_future(self._run_server()), asyncio.ensure_future(self._on_timer()), # asyncio.ensure_future(self.update_score()), ]: tasks.append(i) def keyboard_interrupt(s, f): print("收到退出信号 准备关机") self.logger.info("收到退出信号 准备关机") self.stop() try: signal.signal(signal.SIGINT, keyboard_interrupt) signal.signal(signal.SIGTERM, keyboard_interrupt) if 'win' not in sys.platform: signal.signal(signal.SIGKILL, keyboard_interrupt) signal.signal(signal.SIGQUIT, keyboard_interrupt) except: pass self.loop.run_until_complete(asyncio.wait(tasks)) if __name__ == "__main__": if 0: utils.check_auth() pnum = len(sys.argv) if pnum > 0: fname = None log_file = None pidnum = None for i in range(pnum): print(f"第{i}个参数为:{sys.argv[i]}") if sys.argv[i] == '-c' or sys.argv[i] == '--c': fname = sys.argv[i+1] elif sys.argv[i] == '-h': print("帮助文档") elif sys.argv[i] == '-log_file' or sys.argv[i] == '--log_file': log_file = sys.argv[i+1] elif sys.argv[i] == '-num' or sys.argv[i] == '--num': pidnum = sys.argv[i+1] elif sys.argv[i] == '-v' or sys.argv[i] == '--v': print("当前版本为 V4.1") if fname and log_file and pidnum: print(f"指定的配置为 fname:{fname} log_file:{log_file} pidnum:{pidnum}") date = time.strftime("%Y%m%d", time.localtime()) logname = f"{log_file}-{date}" quant = Center(fname, logname) quant.run() elif fname: print(f"运行指定配置文件{fname}") quant = Center(fname) quant.run() else: print("缺少指定参数 运行默认配置文件") fname = 'config.toml' quant = Center(fname) quant.run() else: fname = 'config.toml' quant = Center(fname) quant.run()