| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636 |
- import asyncio
- from http.client import NON_AUTHORITATIVE_INFORMATION
- from aiohttp import web
- import traceback
- import time
- import utils
- import logging, logging.handlers
- import signal
- import broker
- import os, json, sys, random
- def timeit(func):
- def wrapper(*args, **kwargs):
- nowTime = time.time()
- res = func(*args, **kwargs)
- spend_time = time.time() - nowTime
- spend_time = round(spend_time * 100, 2)
- print(f'{func.__name__} 耗时 {spend_time} ms')
- return res
- return wrapper
- def takeSecond(elem):
- return elem[1]
- import ccxt.async_support as ccxt
- ex_list = [
- ccxt.binanceusdm(),
- ccxt.binance(),
- # ccxt.okex5(),
- # ccxt.kucoin(),
- # ccxt.gateio(),
- # ccxt.coinex(),
- ]
- class Center:
- def __init__(self, fname, logname=None):
- print('############### 参数中心 ################')
- print(f'>> {utils.VERSION} <<<')
- print('*** 当前配置 ***')
- self.fname = fname
- self.params = utils.get_params(fname)
- for p in self.params.__dict__:
- print('***', p, ' => ', getattr(self.params, p))
- print('##################################################')
- pid = os.getpid()
- print(f'交易程序正在启动 进程号{pid}...')
- self.logger = self.get_logger(logname)
- self.params_base = dict()
- for i in broker.exchange_lists:self.params_base[i] = dict()
- self.params_dummy = dict()
- for i in broker.exchange_lists:self.params_dummy[i] = dict()
- self.params_real = dict()
- for i in broker.exchange_lists:self.params_real[i] = dict()
- self.win_dict = dict()
- for i in broker.exchange_lists:self.win_dict[i] = dict()
- self.loss_dict = dict()
- for i in broker.exchange_lists:self.loss_dict[i] = dict()
- self.choose_dict = dict()
- for i in broker.exchange_lists:self.choose_dict[i] = dict()
- self.dummy_choose_dict = dict()
- for i in broker.exchange_lists:self.dummy_choose_dict[i] = dict()
- try:
- with open('params_real.json','r') as f:
- self.params_real = json.load(f)
- except:
- pass
- try:
- with open('choose_dict.json','r') as f:
- self.choose_dict = json.load(f)
- except:
- pass
- try:
- with open('dummy_choose_dict.json','r') as f:
- self.dummy_choose_dict = json.load(f)
- except:
- pass
- try:
- with open('params_dummy.json','r') as f:
- self.params_dummy = json.load(f)
- except:
- pass
- self.loop = asyncio.get_event_loop()
- ###
- self.market = dict()
- self.score = dict()
- self.info_msg = "加载中..."
- def get_logger(self, logname):
- logger = logging.getLogger(__name__)
- logger.setLevel(logging.DEBUG)
- # log to txt
- formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
- if logname == None: logname = "log"
- handler = logging.handlers.RotatingFileHandler(f"{logname}.log",maxBytes=1024*1024*50,backupCount=10,encoding='utf-8')
- handler.setLevel(logging.DEBUG)
- handler.setFormatter(formatter)
- # log to console
- console = logging.StreamHandler()
- console.setLevel(logging.INFO)
- logger.addHandler(handler)
- logger.addHandler(console)
- logger.info('开启日志记录')
- return logger
- async def exit(self, delay=0):
- '''退出操作'''
- print(f"开始退出操作 delay{delay}")
- if delay > 0:
- await asyncio.sleep(delay)
- self.logger.info(f'停机退出')
- await asyncio.sleep(1)
- print("停机...")
- # self.loop.create_task(utils.ding(f"参数中心程序停止", 1, self.params.webhook, self.params.proxy))
- self.loop.stop()
- os._exit(0)
- async def read_market(self):
- market = dict()
-
- print("开始查询行情")
- for exchange in ex_list:
-
- if (exchange.has['fetchTickers']):
- tickers = await exchange.fetch_tickers()
- for i in tickers:
- symbol = tickers[i]['symbol']
- if '/USDT' not in symbol:continue
- pair = symbol.split('/')[0].lower() + '_' + symbol.split('/')[1].lower()
- last = tickers[i]['last']
- if last > 0:
- if pair not in market:
- market[pair] = last
- return market
- async def update_score(self):
- '''
- 按最近涨跌幅打分
- '''
- await asyncio.sleep(5)
- while 1:
- try:
- market = await self.read_market()
- self.score = dict()
- for i in self.market:
- if i in market:
- score = abs(market[i] - self.market[i])/market[i]
- if score > 0:
- self.score[i] = score
- score_list = []
- for i in self.score:
- score_list.append(self.score[i])
- if len(score_list) > 0:
- max_score = max(score_list)
- for i in self.score:
- self.score[i] = round(self.score[i]/max_score,3)
- self.market = market
- print("当前打分", self.score)
- await asyncio.sleep(60)
- except Exception as e:
- print("定时循环系统出错"+str(e))
- self.logger.error(traceback.print_exc())
- await asyncio.sleep(10)
- async def _on_timer(self):
- '''定期触发系统逻辑'''
- await asyncio.sleep(1)
- while 1:
- try:
- print('整理参数池...')
- # 重置info_msg
- self.info_msg = ""
- # 加载配置文件群
- p_list = []
- for root, dirs, files in os.walk(os.getcwd()):
- for file_name in files:
- if 'config_dummy' in file_name:
- p_list.append(utils.get_params(os.path.join(root, file_name)))
- # 打印本地参数数据
- self.params_base = dict()
- for i in broker.exchange_lists:self.params_base[i] = dict()
- for p in p_list:
- name = p.exchange + '@' + p.pair
- self.params_base[p.exchange][name] = dict()
- self.params_base[p.exchange][name]['pair'] = p.pair
- self.params_base[p.exchange][name]['exchange'] = p.exchange
- self.params_base[p.exchange][name]['open'] = p.open
- self.params_base[p.exchange][name]['close'] = p.close
- self.params_base[p.exchange][name]['refexchange'] = p.refexchange
- self.params_base[p.exchange][name]['refpair'] = p.refpair
- self.params_base[p.exchange][name]['profit'] = 0.0
- #### 整理params_real
- # 移除太旧的信息
- params_real = dict()
- for ex in broker.exchange_lists:
- params_real[ex] = dict()
- for name in self.params_real[ex].keys():
- if time.time() - self.params_real[ex][name]['time'] > utils.EARLY_STOP_SECOND:
- print(f"params real 记录时间太久 移除{name}")
- else:
- params_real[ex][name] = self.params_real[ex][name]
- self.params_real = params_real
- # 移除异常信息
- params_real = dict()
- for ex in broker.exchange_lists:
- params_real[ex] = dict()
- for name in self.params_real[ex].keys():
- if abs(float(self.params_real[ex][name]['profit'])) > 10000.0:
- print(f"params real 收益率异常 移除{name}")
- else:
- params_real[ex][name] = self.params_real[ex][name]
- self.params_real = params_real
- #### 整理params_dummy
- # 移除太旧的信息
- params_dummy = dict()
- for ex in broker.exchange_lists:
- params_dummy[ex] = dict()
- for name in self.params_dummy[ex].keys():
- if time.time() - self.params_dummy[ex][name]['time'] > utils.DUMMY_EARLY_STOP_SECOND:
- print(f"params dummy 记录时间太久 移除{name}")
- else:
- params_dummy[ex][name] = self.params_dummy[ex][name]
- self.params_dummy = params_dummy
- # 移除异常信息
- params_dummy = dict()
- for ex in broker.exchange_lists:
- params_dummy[ex] = dict()
- for name in self.params_dummy[ex].keys():
- if abs(float(self.params_dummy[ex][name]['profit'])) > 10000.0:
- print(f"params dummy 收益率异常 移除{name}")
- else:
- params_dummy[ex][name] = self.params_dummy[ex][name]
- self.params_dummy = params_dummy
- # 从params_real 提取 win_dict loss_dict
- self.logger.info('='*10)
- profit_thre = 0.001
- for ex in broker.exchange_lists:
- self.logger.info('='*10)
- self.info_msg += '='*10 + '\r'
- # 更新windict lossdict
- self.win_dict[ex] = dict()
- self.loss_dict[ex] = dict()
- profits = []
- pairs = []
- for name in self.params_real[ex]:
- profit = self.params_real[ex][name]['profit']
- profits.append(profit)
- pairs.append(self.params_real[ex][name]['pair'])
- if profit > profit_thre:
- self.win_dict[ex][name] = self.params_real[ex][name]
- self.win_dict[ex][name]['leverrate'] = 2.0
- else:
- self.loss_dict[ex][name] = self.params_real[ex][name]
- self.loss_dict[ex][name]['leverrate'] = 0.0
- if len(profits) > 0:
- max_profit = max(profits)
- max_profit_name = pairs[profits.index(max_profit)]
- max_profit - round(max_profit, 5)
- min_profit = min(profits)
- min_profit_name = pairs[profits.index(min_profit)]
- min_profit - round(min_profit, 5)
- avg_profit = round(sum(profits)/len(profits),5)
- win_num = 0
- for i in profits:
- if i > profit_thre:win_num+=1
- total_num = len(profits)
- base_num = len(self.params_base[ex])
- msg = f"Real情况 盘口{ex} 最大{max_profit_name} {max_profit} 最小{min_profit_name} {min_profit} 平均{avg_profit} 盈利{win_num} 记录{total_num} 总数{base_num}"
- self.logger.info(msg)
- self.info_msg += msg + "\r"
- msg = ""
- for i in range(total_num):
- msg += pairs[i] + " "
- if (i+1)%10 == 0:
- self.logger.info(msg)
- self.info_msg += msg + "\r"
- msg = ""
- self.logger.info(msg)
- self.info_msg += msg + "\r"
- self.logger.info('-'*10)
- self.info_msg += '-'*10 + '\r'
- # 打印dummy_params情况
- profits = []
- pairs = []
- for name in self.params_dummy[ex]:
- profit = self.params_dummy[ex][name]['profit']
- profits.append(profit)
- pairs.append(self.params_dummy[ex][name]['pair'])
- if len(profits) > 0:
- max_profit = max(profits)
- max_profit_name = pairs[profits.index(max_profit)]
- max_profit - round(max_profit, 5)
- min_profit = min(profits)
- min_profit_name = pairs[profits.index(min_profit)]
- min_profit - round(min_profit, 5)
- avg_profit = round(sum(profits)/len(profits),5)
- win_num = 0
- for i in profits:
- if i > profit_thre:win_num+=1
- total_num = len(profits)
- base_num = len(self.params_base[ex])
- msg = f"Dummy情况 盘口{ex} 最大{max_profit_name} {max_profit} 最小{min_profit_name} {min_profit} 平均{avg_profit} 盈利{win_num} 记录{total_num} 总数{base_num}"
- self.logger.info(msg)
- self.info_msg += msg + "\r"
- msg = ""
- for i in range(total_num):
- msg += pairs[i] + " "
- if (i+1)%10 == 0:
- self.logger.info(msg)
- self.info_msg += msg + "\r"
- msg = ""
- self.logger.info(msg)
- self.info_msg += msg + "\r"
- # 保存信息
- with open('params_real.json', 'w') as f:
- json.dump(self.params_real,f)
- with open('choose_dict.json', 'w') as f:
- json.dump(self.choose_dict,f)
- with open('dummy_choose_dict.json', 'w') as f:
- json.dump(self.dummy_choose_dict,f)
- with open('params_dummy.json', 'w') as f:
- json.dump(self.params_dummy,f)
- # 休眠
- await asyncio.sleep(60)
- except Exception as e:
- print("定时循环系统出错"+str(e))
- self.logger.error(traceback.print_exc())
- await asyncio.sleep(10)
- def stop(self):
- print(f'进入停机流程...')
- self.loop.create_task(self.exit(delay=1))
- async def get_info(self, request):
- print(request.remote)
- return web.Response(text=self.info_msg)
- async def get_dummy_params(self, request):
- '''
- 中控数据接口
- 从base参数池随机选参数
- '''
- data = await request.json()
- if isinstance(data, str):
- data = json.loads(data)
- exchange = data['exchange']
- res = dict()
- # 全部dummy实例从base参数池随机选参数 尝试新品种
- for _ in range(len(self.params_base[exchange])):
- name = random.choice(list(self.params_base[exchange].keys()))
- if name not in self.dummy_choose_dict[exchange]:
- self.dummy_choose_dict[exchange][name] = 0
- if name not in self.loss_dict[exchange] and time.time() - self.dummy_choose_dict[exchange][name] > utils.DUMMY_EARLY_STOP_SECOND:
- res[name] = self.params_base[exchange][name]
- res[name]['leverrate'] = 0.5
- self.dummy_choose_dict[exchange][name] = int(time.time())
- break
- ###### 如果没有找到满足条件的参数 随机一组参数
- if res == dict():
- name = random.choice(list(self.params_base[exchange].keys()))
- res[name] = self.params_base[exchange][name]
- res[name]['leverrate'] = 0.5
- self.dummy_choose_dict[exchange][name] = int(time.time())
- return web.Response(body=json.dumps(res))
- async def post_dummy_params(self, request):
- '''
- 中控数据接口
- 更新params_dummy参数池
- '''
- data = await request.json()
- # ip = request.remote
- # print(f'从{ip}更新dummy参数',data)
- if isinstance(data, str):
- data = json.loads(data)
- exchange = data['exchange']
- pair = data['pair']
- name = exchange+"@"+pair
- profit = round(float(data['profit']),4)
- #######
- if exchange in self.params_dummy:
- # 如果已有记录
- if name in self.params_dummy[exchange]:
- self.params_dummy[exchange][name]['exchange'] = data['exchange']
- self.params_dummy[exchange][name]['pair'] = data['pair']
- self.params_dummy[exchange][name]['open'] = data['open']
- self.params_dummy[exchange][name]['close'] = data['close']
- self.params_dummy[exchange][name]['refexchange'] = data['refexchange']
- self.params_dummy[exchange][name]['refpair'] = data['refpair']
- self.params_dummy[exchange][name]['profit'] = \
- round( profit * 0.3 + self.params_dummy[exchange][name]['profit'] * 0.7, 4)
- self.params_dummy[exchange][name]['time'] = int(time.time())
- else:
- # 如果没有记录
- self.params_dummy[exchange][name] = dict()
- self.params_dummy[exchange][name]['exchange'] = data['exchange']
- self.params_dummy[exchange][name]['pair'] = data['pair']
- self.params_dummy[exchange][name]['open'] = data['open']
- self.params_dummy[exchange][name]['close'] = data['close']
- self.params_dummy[exchange][name]['refexchange'] = data['refexchange']
- self.params_dummy[exchange][name]['refpair'] = data['refpair']
- self.params_dummy[exchange][name]['profit'] = profit
- self.params_dummy[exchange][name]['time'] = int(time.time())
- return web.Response(body=json.dumps({}))
- async def get_params(self, request):
- '''
- 中控数据接口
- 从base参数池随机选参数
- 从dummy参数池随机选参数
- '''
- data = await request.json()
- if isinstance(data, str):
- data = json.loads(data)
- exchange = data['exchange']
- res = dict()
- # 本盘口盈利品种数量
- win_num = len(self.win_dict[exchange])
- # 计算盈利参数占比
- if win_num == 0:
- # 没有发现盈利品种的时候 全部实例尝试新品种
- if random.randint(0,100) < 10:
- # 小概率 从base参数池选参数
- for _ in range(len(self.params_base[exchange])):
- name = random.choice(list(self.params_base[exchange].keys()))
- if name not in self.choose_dict[exchange]:
- self.choose_dict[exchange][name] = 0
- if name not in self.loss_dict[exchange] and time.time() - self.choose_dict[exchange][name] > utils.EARLY_STOP_SECOND:
- res[name] = self.params_base[exchange][name]
- res[name]['leverrate'] = 0.5
- self.choose_dict[exchange][name] = int(time.time())
- break
- else:
- # 大概率 从dummy参数池选参数
- # 计算概率数组
- p_list = []
- name_list = []
- for name in self.params_dummy[exchange]:
- p = self.params_dummy[exchange][name]['profit']
- if p > 0.0:
- p_list.append(p)
- name_list.append(name)
- if len(p_list) > 1:
- for _ in range(len(p_list)):
- name = random.choices(
- name_list,
- p_list,
- k=1,
- )[0]
- if name not in self.choose_dict[exchange]:
- self.choose_dict[exchange][name] = 0
- if name not in self.loss_dict[exchange] and time.time() - self.choose_dict[exchange][name] > utils.EARLY_STOP_SECOND:
- res[name] = self.params_dummy[exchange][name]
- res[name]['leverrate'] = 0.5
- self.choose_dict[exchange][name] = int(time.time())
- break
- else:
- # 允许10%的实例去搜索新品种
- if random.randint(0,100) < 90:
- # 跑实盘盈利品种
- res = self.win_dict[exchange]
- else:
- # 没有发现盈利品种的时候 全部实例尝试新品种
- if random.randint(0,100) < 10:
- # 小概率 从base参数池选参数
- for _ in range(len(self.params_base[exchange])):
- name = random.choice(list(self.params_base[exchange].keys()))
- if name not in self.choose_dict[exchange]:
- self.choose_dict[exchange][name] = 0
- if name not in self.loss_dict[exchange] and time.time() - self.choose_dict[exchange][name] > utils.EARLY_STOP_SECOND:
- res[name] = self.params_base[exchange][name]
- res[name]['leverrate'] = 0.5
- self.choose_dict[exchange][name] = int(time.time())
- break
- else:
- # 大概率 从dummy参数池选参数
- # 计算概率数组
- p_list = []
- name_list = []
- for name in self.params_dummy[exchange]:
- p = self.params_dummy[exchange][name]['profit']
- if p > 0.0:
- p_list.append(p)
- name_list.append(name)
- if len(p_list) > 1:
- for _ in range(len(p_list)):
- name = random.choices(
- name_list,
- p_list,
- k=1,
- )[0]
- if name not in self.choose_dict[exchange]:
- self.choose_dict[exchange][name] = 0
- if name not in self.loss_dict[exchange] and time.time() - self.choose_dict[exchange][name] > utils.EARLY_STOP_SECOND:
- res[name] = self.params_dummy[exchange][name]
- res[name]['leverrate'] = 0.5
- self.choose_dict[exchange][name] = int(time.time())
- break
- ###### 如果没有找到满足条件的参数 随机一组参数
- if res == dict():
- name = random.choice(list(self.params_base[exchange].keys()))
- res[name] = self.params_base[exchange][name]
- res[name]['leverrate'] = 0.5
- self.choose_dict[exchange][name] = int(time.time())
- return web.Response(body=json.dumps(res))
- async def post_params(self, request):
- '''
- 中控数据接口
- 更新real参数池
- '''
- data = await request.json()
- ip = request.remote
- # print(f'从{ip}更新参数',data)
- if isinstance(data, str):
- data = json.loads(data)
- exchange = data['exchange']
- pair = data['pair']
- name = exchange+"@"+pair
- profit = round(float(data['profit']),4)
- if exchange in self.params_real:
- # 如果已有记录
- if name in self.params_real[exchange]:
- self.params_real[exchange][name]['exchange'] = data['exchange']
- self.params_real[exchange][name]['pair'] = data['pair']
- self.params_real[exchange][name]['open'] = data['open']
- self.params_real[exchange][name]['close'] = data['close']
- self.params_real[exchange][name]['refexchange'] = data['refexchange']
- self.params_real[exchange][name]['refpair'] = data['refpair']
- self.params_real[exchange][name]['profit'] = \
- round( profit * 0.3 + self.params_real[exchange][name]['profit'] * 0.7, 4)
- self.params_real[exchange][name]['time'] = int(time.time())
- else:
- # 如果没有记录
- self.params_real[exchange][name] = dict()
- self.params_real[exchange][name]['exchange'] = data['exchange']
- self.params_real[exchange][name]['pair'] = data['pair']
- self.params_real[exchange][name]['open'] = data['open']
- self.params_real[exchange][name]['close'] = data['close']
- self.params_real[exchange][name]['refexchange'] = data['refexchange']
- self.params_real[exchange][name]['refpair'] = data['refpair']
- self.params_real[exchange][name]['profit'] = profit
- self.params_real[exchange][name]['time'] = int(time.time())
- return web.Response(body=json.dumps({}))
- async def _run_server(self):
- print('server正在启动...')
- await asyncio.sleep(10)
- app = web.Application()
- app.router.add_route('*', f'/get_info', self.get_info)
- app.router.add_route('POST', f'/get_params', self.get_params)
- app.router.add_route('POST', f'/get_dummy_params', self.get_dummy_params)
- app.router.add_route('POST', f'/post_params', self.post_params)
- app.router.add_route('POST', f'/post_dummy_params', self.post_dummy_params)
- try:
- self.loop.create_task(web._run_app(app, host='0.0.0.0', port=self.params.server_port, handle_signals=False))
- except:
- self.logger.error(traceback.format_exc())
- def run(self):
- '''启动ws行情获取'''
-
- tasks = []
- # 策略
- for i in [
- asyncio.ensure_future(self._run_server()),
- asyncio.ensure_future(self._on_timer()),
- # asyncio.ensure_future(self.update_score()),
- ]:
- tasks.append(i)
- def keyboard_interrupt(s, f):
- print("收到退出信号 准备关机")
- self.logger.info("收到退出信号 准备关机")
- self.stop()
- try:
- signal.signal(signal.SIGINT, keyboard_interrupt)
- signal.signal(signal.SIGTERM, keyboard_interrupt)
- if 'win' not in sys.platform:
- signal.signal(signal.SIGKILL, keyboard_interrupt)
- signal.signal(signal.SIGQUIT, keyboard_interrupt)
- except:
- pass
- self.loop.run_until_complete(asyncio.wait(tasks))
- if __name__ == "__main__":
- if 0:
- utils.check_auth()
- pnum = len(sys.argv)
- if pnum > 0:
- fname = None
- log_file = None
- pidnum = None
- for i in range(pnum):
- print(f"第{i}个参数为:{sys.argv[i]}")
- if sys.argv[i] == '-c' or sys.argv[i] == '--c':
- fname = sys.argv[i+1]
- elif sys.argv[i] == '-h':
- print("帮助文档")
- elif sys.argv[i] == '-log_file' or sys.argv[i] == '--log_file':
- log_file = sys.argv[i+1]
- elif sys.argv[i] == '-num' or sys.argv[i] == '--num':
- pidnum = sys.argv[i+1]
- elif sys.argv[i] == '-v' or sys.argv[i] == '--v':
- print("当前版本为 V4.1")
- if fname and log_file and pidnum:
- print(f"指定的配置为 fname:{fname} log_file:{log_file} pidnum:{pidnum}")
- date = time.strftime("%Y%m%d", time.localtime())
- logname = f"{log_file}-{date}"
- quant = Center(fname, logname)
- quant.run()
- elif fname:
- print(f"运行指定配置文件{fname}")
- quant = Center(fname)
- quant.run()
- else:
- print("缺少指定参数 运行默认配置文件")
- fname = 'config.toml'
- quant = Center(fname)
- quant.run()
- else:
- fname = 'config.toml'
- quant = Center(fname)
- quant.run()
|