import json import traceback import utils import model import toml, time, random import os, sys, asyncio, aiohttp import socket import asyncio import requests import ujson from decimal import Decimal from decimal import ROUND_HALF_UP, ROUND_FLOOR import gzip import csv import os import base64 from Crypto.Cipher import AES from Crypto import Random import os import base64 import json parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0,parentdir) ############### 全局配置 VERSION = "2022-04-18" CHILD_RUN_SECOND = 60 * 60 * 24 # child process max run time per loop EARLY_STOP_SECOND = 60 * 60 * 2 # child early stop min check time BACKTEST_PREHOT_SECOND = 60 * 30 # backtest pre hot time DUMMY_RUN_SECOND = 60 * 60 * 12 # dummy process max run time per loop DUMMY_EARLY_STOP_SECOND = 60 * 60 # dummy process max run time per loop POST_SIDE_LIMIT = [0] # post side limit MARKET_DELAY_LIMIT = 30000 # market update delay limit threhold unit:ms GRID = 1 STOPLOSS = 0.02 GAMMA = 0.999 ###### market行情数据长度 标准化n档深度+6档成交信息 ###### LEVEL = 1 TRADE_LEN = 2 # 最高 最低 成交价 LEN = LEVEL * 4 + TRADE_LEN # 总长度 BP_INDEX = LEVEL * 0 BQ_INDEX = LEVEL * 0 + 1 AP_INDEX = LEVEL * 2 AQ_INDEX = LEVEL * 2 + 1 MAX_FILL_INDEX = LEVEL * 4 + 0 MIN_FILL_INDEX = LEVEL * 4 + 1 # BUY_Q_INDEX = LEVEL * 4 + 2 # BUY_V_INDEX = LEVEL * 4 + 3 # SELL_Q_INDEX = LEVEL * 4 + 4 # SELL_V_INDEX = LEVEL * 4 + 5 #### depth/trade effient range ##### EFF_RANGE = 0.001 ### init backtest delay ### BACKTEST_DELAY = 0.15 global base_cid base_cid = 0 def get_cid(broker=None): global base_cid base_cid += 1 if base_cid > 999: base_cid=0 cid = str(time.time())[4:10]+str(random.randint(1,999))+str(base_cid) if broker: cid = broker + cid return cid def csv_to_gz_and_remove(): def List_files(filepath, substr): X = [] Y = [] for path, subdirs, files in sorted(os.walk(filepath), reverse=True): for name in files: X.append(os.path.join(path, name)) Y = [line for line in X if substr in line] return Y for file in List_files('./', '.csv'): if '.gz' not in file: data = open(file, 'rb' ).read() with gzip.open(file + '.gz', 'a') as zip: zip.write(data) zip.close() os.remove(file) def get_params(fname): # 读取配置 try: params = toml.load(fname) except: f = open(fname) data = f.read() text = base64.b64decode(data) cryptor = AES.new(key =bytes("qFHFPv6MugrSTkEsWFs8wCDg3iC6!er%".encode()), mode=AES.MODE_ECB) plain_text = cryptor.decrypt(text) paddingLen = plain_text[len(plain_text)-1] msg = plain_text[0:-paddingLen] msg = msg.decode() params = toml.loads(msg) p = model.Config() # 账号昵称 p.account_name = params['account_name'] if 'account_name' in params else 'Unknown Account' # api p.access_key = params['access_key'].replace(" ", "") if 'access_key' in params else '***' p.secret_key = params['secret_key'].replace(" ", "") if 'secret_key' in params else '***' p.pass_key = params['pass_key'].replace(" ", "") if 'pass_key' in params else 'qwer1234' # 经纪商id broker_id_from_config = params['broker_id'] if 'broker_id' in params else "" p.broker_id = get_broker_id( broker_id_from_config, params['exchange']) # 交易盘口 p.exchange = params['exchange'] if 'exchange' in params else "" # 交易品种 p.pair = params['pair'] if 'pair' in params else "" # 调试模式开关 p.debug = params['debug'] if 'debug' in params else "False" # 开仓 p.open = params['open'] if 'open' in params else "0.002" # 平仓 p.close = params['close'] if 'close' in params else "0.0002" # 监听端口 p.server_port = params['server_port'] if 'server_port' in params else 6000 # 杠杆大小 p.leverrate = float(params['leverrate']) if 'leverrate' in params else 1.0 # 参考盘口 p.refexchange = params['refexchange'].replace('[','').replace(']','').replace("'",'').replace(" ", "").split(',') if "refexchange" in params else "" # 参考品种 p.refpair = params['refpair'].replace('[','').replace(']','').replace("'",'').replace(" ", "").split(',') if "refpair" in params else "" # 网络代理 p.proxy = params['proxy'] if 'proxy' in params else None # 仅在win下有效 # 账户资金使用比例 p.used_pct = params['used_pct'] if 'used_pct' in params else "0.9" # discord播报地址 p.webhook = params['webhook'] if 'webhook' in params else "https://discord.com/api/webhooks/907870708481265675/IfN4GqH4fj8HWS_FecH3Lrc2qtRyqsCHsSJVLFHlxY8ioHprfdxIMUNAfqkZZ6opzVEP" # 默认第n参考盘口 p.index = params['index'] if 'index' in params else 0 # 止损比例 0.02 = 2% p.stoploss = params['stoploss'] if 'stoploss' in params else STOPLOSS # 平滑系数 默认0.999 p.gamma = params['gamma'] if 'gamma' in params else GAMMA # 分批建仓功能 小资金建议1 大资金建议3 p.grid = params['grid'] if 'grid' in params else GRID # 实时调参开关 会有巨大性能损耗 p.backtest = params['backtest'] if 'backtest' in params else 1 # 保存实时行情 会有巨大性能损耗 p.save = params['save'] if 'save' in params else 0 p.place_order_limit = params['place_order_limit'] if 'place_order_limit' in params else 0 # 允许的每秒下单次数 # 是否启用colocation技术 p.colo = params['colo'] if 'colo' in params else 0 # 是否启用fast行情 会增加性能开销 p.fast = params['fast'] if 'fast' in params else 1 # 选择指定的私有ip进行网络通信 默认0 用于多网卡多ip的实例 p.ip = params['ip'] if 'ip' in params else 0 # 合约不允许holdcoin持有底仓币 if "spot" in p.exchange: p.hold_coin = params['hold_coin'] if 'hold_coin' in params else 0.0 else: p.hold_coin = 0.0 # 是否开启日志记录 会有一定性能损耗 p.log = params['log'] if 'log' in params else 1 #### 特殊情况处理 if p.exchange == 'binance_usdt_swap': if p.pair in ['shib_usdt', 'xec_usdt', 'bttc_usdt']: p.pair = "1000" + p.pair ref_num = len(p.refexchange) for i in range(ref_num): if p.refexchange[i] == 'binance_usdt_swap': if p.refpair[i] in ['shib_usdt', 'xec_usdt', 'bttc_usdt']: p.refpair[i] = "1000" + p.refpair[i] #### return p def get_broker_id(broker_id , exchange_name): '''处理brokerid特殊情况''' if 'binance' in exchange_name: return broker_id elif 'gate' in exchange_name: return "t-" else: return "" # 报单频率限制等级 BASIC_LIMIT = 100 GATE_SPOT_LIMIT = 10.0 GATE_USDT_SWAP_LIMIT = 100.0 KUCOIN_SPOT_LIMIT = 15.0 KUCOIN_USDT_SWAP_LIMIT = 10.0 BINANCE_USDT_SWAP_LIMIT = 5.0 BINANCE_SPOT_LIMIT = 2.0 COINEX_SPOT_LIMIT = 40.0 COINEX_USDT_SWAP_LIMIT = 100.0 OKEX_USDT_SWAP_LIMIT= 30.0 BITGET_USDT_SWAP_LIMIT = 10.0 BYBIT_USDT_SWAP_LIMIT = 1.0 RATIO = 4.0 def get_limit_requests_num_per_second(exchange, limit=0): '''每秒请求频率''' if limit != 0: return limit*RATIO elif exchange == "gate_spot": return GATE_SPOT_LIMIT*RATIO elif exchange == "gate_usdt_swap": # 100/s return GATE_USDT_SWAP_LIMIT*RATIO elif exchange == "kucoin_spot": # 15/s return KUCOIN_SPOT_LIMIT*RATIO elif exchange == "kucoin_usdt_swap": return KUCOIN_USDT_SWAP_LIMIT*RATIO elif exchange == "binance_usdt_swap": return BINANCE_USDT_SWAP_LIMIT*RATIO elif exchange == "binance_spot": return BINANCE_SPOT_LIMIT*RATIO elif exchange == "coinex_spot": return COINEX_SPOT_LIMIT*RATIO elif exchange == "coinex_usdt_swap": return COINEX_USDT_SWAP_LIMIT*RATIO elif exchange == "okex_usdt_swap": return OKEX_USDT_SWAP_LIMIT*RATIO elif exchange == "bitget_usdt_swap": return BITGET_USDT_SWAP_LIMIT*RATIO elif exchange == "bybit_usdt_swap": return BYBIT_USDT_SWAP_LIMIT*RATIO else: print("限频规则未找到") return BASIC_LIMIT*RATIO def get_limit_order_requests_num_per_second(exchange, limit=0): '''每秒下单请求频率''' if limit != 0: return limit elif exchange == "gate_spot": # 10/s return GATE_SPOT_LIMIT elif exchange == "gate_usdt_swap": # 100/s return GATE_USDT_SWAP_LIMIT elif exchange == "kucoin_spot": # 15/s return KUCOIN_SPOT_LIMIT elif exchange == "kucoin_usdt_swap": # 10/s return KUCOIN_USDT_SWAP_LIMIT elif exchange == "binance_usdt_swap": # 5/s return BINANCE_USDT_SWAP_LIMIT elif exchange == "binance_spot": # 2/s return BINANCE_SPOT_LIMIT elif exchange == "coinex_spot": # 40/s return COINEX_SPOT_LIMIT elif exchange == "coinex_usdt_swap": # 100/s return COINEX_USDT_SWAP_LIMIT elif exchange == "okex_usdt_swap": # 30/s return OKEX_USDT_SWAP_LIMIT elif exchange == "bitget_usdt_swap": # 10/s return BITGET_USDT_SWAP_LIMIT elif exchange == "bybit_usdt_swap": # 2/s return BYBIT_USDT_SWAP_LIMIT else: print("限频规则未找到") return BASIC_LIMIT def dist_to_weight(price, mp, eff_range=EFF_RANGE): ''' 距离转换为权重 ''' dist = abs(price-mp)/mp weight = 1 - clip(dist/eff_range, 0.0, 0.95) weight = weight if weight > 0 else 0 return weight def change_params(fname, params, changes): # 更改配置 for i in changes: params[i[0]] = i[1] with open(f"{fname}","w") as f: toml.dump(params,f) def show_memory(unit='B', threshold=1024): '''查看变量占用内存情况 :param unit: 显示的单位,可为`B`,`KB`,`MB`,`GB` :param threshold: 仅显示内存数值大于等于threshold的变量 ''' from sys import getsizeof scale = {'B': 1, 'KB': 1024, 'MB': 1048576, 'GB': 1073741824}[unit] msg = '内存占用情况: \n' for i in list(globals().keys()): memory = eval("getsizeof({})".format(i)) // scale if memory >= threshold: msg += f'{i} {memory} {unit}\n' print(msg) return msg def clip(num, _min, _max): if num > _max: num = _max if num < _min: num = _min return num async def ding(msg, at_all, webhook, proxy=None): ''' 发送钉钉消息 ''' header = { "Content-Type": "application/json", "Charset": "UTF-8" } embed = { "title": "策略通知", "description": msg } message = { "content": "大吉大利 今晚吃鸡", "username": "千千喵", "embeds": [ embed ], } message_json = json.dumps(message) if 'win' in sys.platform: proxy = proxy else: proxy = None async with aiohttp.ClientSession() as session: await session.post(url=webhook, data=message_json, headers=header, proxy=proxy, timeout = 10) def _get_params(url, proxy, params): '''更新参数''' import requests try: res = requests.post(url=url, json=params, timeout = 10) return json.loads(res.text) except: traceback.print_exc() return [] async def _post_params(url, proxy, params): '''更新参数''' try: if 'win' in sys.platform: proxy = proxy else: proxy = None async with aiohttp.ClientSession() as session: res = await session.post(url=url, proxy=proxy, data=params, timeout = 10) data = await res.text() print(data) return data except: print(traceback.format_exc()) return "post_params error" return None def get_ip(): try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] finally: s.close() return ip def check_auth(): print("*** 检查使用权限1 ***") ip = get_ip() print(f"当前IP {ip}") white_list = requests.get(f"http://158.247.204.56:7777/ip_list") if ip in white_list: print("当前IP位于白名单中") else: print("@@@ 本版本仅限指定IP白名单运行 @@@") os._exit(0) print("*** 符合要求 ***") def check_time(): print("*** 检查使用权限2 ***") if time.time() > int(time.mktime(time.strptime('2021-11-17 00:00:00', "%Y-%m-%d %H:%M:%S"))): print("@@@ 此版本目前已过试用期 @@@") os._exit(0) print("*** 符合要求 ***") def num_to_str(num, d): if d >= 1.0:return "%d"%num elif d in [0.1, 0.5]:return "%.1f"%num elif d in [0.01, 0.05]:return "%.2f"%num elif d in [0.001, 0.005]:return "%.3f"%num elif d in [0.0001, 0.0005]:return "%.4f"%num elif d in [0.00001, 0.00005]:return "%.5f"%num elif d in [0.000001, 0.000005]:return "%.6f"%num elif d in [0.0000001, 0.0000005]:return "%.7f"%num elif d in [0.00000001, 0.00000005]:return "%.8f"%num elif d in [0.000000001, 0.000000005]:return "%.9f"%num elif d in [0.0000000001, 0.0000000005]:return "%.10f"%num else: return str(num) def num_to_decimal(num): '''根据小数点位数获取精度''' num = str(num) if '.' not in num:return 0 elif '.' == num[-2]:return 1 elif '.' == num[-3]:return 2 elif '.' == num[-4]:return 3 elif '.' == num[-5]:return 4 elif '.' == num[-6]:return 5 elif '.' == num[-7]:return 6 elif '.' == num[-8]:return 7 elif '.' == num[-9]:return 8 elif '.' == num[-10]:return 9 elif '.' == num[-11]:return 10 else:return 11 def fix_amount(amount, stepSize): '''修补数量向下取整''' return float( Decimal(str(amount))//Decimal(str(stepSize) ) \ * Decimal(str(stepSize))) # return float(Decimal(str(amount)).quantize(Decimal(str(stepSize)), ROUND_FLOOR)) def fix_price(price, tickSize): '''修补价格四舍五入''' return float( round(Decimal(str(price))/Decimal(str(tickSize)) ) \ * Decimal(str(tickSize))) # return float(Decimal(str(price)).quantize(Decimal(str(tickSize)), ROUND_HALF_UP)) def timeit(func): def wrapper(*args, **kwargs): nowTime = time.time() res = func(*args, **kwargs) spend_time = time.time() - nowTime spend_time = round(spend_time * 1e6, 3) print(f'{func.__name__} 耗时 {spend_time} us') return res return wrapper def get_backtest_set(base=""): '''生成预设参数''' # 开仓距离不能太近必须超过大部分价格tick运动的距离 open_list = [ 0.0055, 0.0045, 0.0035, 0.0030, 0.0025, 0.0020, 0.0015, ] close_dict = dict() for open in open_list: close_dict[open] = [ open*0.1, open*0.2, ] alpha_list = [0.0] return open_list, close_dict, alpha_list def get_local_ip_list(): '''获取本地ip''' import netifaces as ni ipList = [] # print('检测服务器网络配置') for dev in ni.interfaces(): print('dev:',dev) if 'ens' in dev or 'eth' in dev or 'enp' in dev: # print(ni.ifaddresses(dev)) for i in ni.ifaddresses(dev)[2]: ip=i['addr'] print(f"检测到私有ip:{ip}") if ip not in ipList: ipList.append(ip) print(f"当前服务器私有ip为{ipList}") return ipList if __name__ == "__main__": ######### if 0: print(fix_amount(1.0, 0.1)) print(fix_amount(0.9, 0.05)) print(fix_amount(1.1, 0.1)) print(fix_amount(1.2, 0.5)) print(fix_amount(0.01, 0.05)) if 1: print(fix_price(1.0, 0.1)) print(fix_price(0.9, 2.0)) print(fix_price(1.1, 0.1)) print(fix_price(1.2, 0.5)) print(fix_price(4999.99, 0.5)) ######### if 0: # print(num_to_str(123.123)) print(get_backtest_set()) #################### if 0: p = get_params("config.toml") loop = asyncio.get_event_loop() # loop.create_task(ding("123", 1, "https://discord.com/api/webhooks/907870708481265675/IfN4GqH4fj8HWS_FecH3Lrc2qtRyqsCHsSJVLFHlxY8ioHprfdxIMUNAfqkZZ6opzVEP")) loop.create_task( _post_params( "http://wwww.khods.com:8888/post_params", None, ujson.dumps({ "exchange":"binance_usdt_swap", "pair":"eth_usdt", "open":"0.001", "close":"0.0001", "refexchange":"binance_spot", "refpair":"eth_usdt", "profit":0.1, }) ) ) loop.run_forever() ####################