| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520 |
- import json
- import traceback
- import utils
- import model
- import toml, time, random
- import os, sys, asyncio, aiohttp
- import socket
- import asyncio
- import requests
- import ujson
- from decimal import Decimal
- from decimal import ROUND_HALF_UP, ROUND_FLOOR
- import gzip
- import csv
- import os
- import base64
- from Crypto.Cipher import AES
- from Crypto import Random
- import os
- import base64
- import json
# Put the parent of this file's directory at the front of the import path.
parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, parentdir)

# ---------------------------------------------------------------------------
# Global configuration
# ---------------------------------------------------------------------------
VERSION = "2022-04-18"
CHILD_RUN_SECOND = 24 * 60 * 60        # child process max run time per loop
EARLY_STOP_SECOND = 2 * 60 * 60        # child early stop min check time
BACKTEST_PREHOT_SECOND = 30 * 60       # backtest warm-up time
DUMMY_RUN_SECOND = 12 * 60 * 60        # dummy process max run time per loop
DUMMY_EARLY_STOP_SECOND = 60 * 60      # presumably the dummy early-stop check time (TODO confirm)
POST_SIDE_LIMIT = [0]                  # post side limit
MARKET_DELAY_LIMIT = 30000             # market update delay limit threshold, unit: ms
GRID = 1
STOPLOSS = 0.02
GAMMA = 0.999

# ---------------------------------------------------------------------------
# Layout of one market-data row: LEVEL depth levels followed by trade fields
# ---------------------------------------------------------------------------
LEVEL = 1
TRADE_LEN = 2                          # highest / lowest fill price
LEN = LEVEL * 4 + TRADE_LEN            # total row length
BP_INDEX = LEVEL * 0
BQ_INDEX = LEVEL * 0 + 1
AP_INDEX = LEVEL * 2
AQ_INDEX = LEVEL * 2 + 1
MAX_FILL_INDEX = LEVEL * 4 + 0
MIN_FILL_INDEX = LEVEL * 4 + 1
# BUY_Q_INDEX = LEVEL * 4 + 2
# BUY_V_INDEX = LEVEL * 4 + 3
# SELL_Q_INDEX = LEVEL * 4 + 4
# SELL_V_INDEX = LEVEL * 4 + 5

# Effective range for depth/trade weighting (relative distance from mid price).
EFF_RANGE = 0.001

# Initial backtest delay.
BACKTEST_DELAY = 0.15
# Rolling counter appended to every generated client order id.
base_cid = 0


def get_cid(broker=None):
    """Build a client order id string.

    The id is the concatenation of characters 4-9 of ``str(time.time())``,
    a random integer in [1, 999] and a module-level counter that wraps back
    to 0 once it exceeds 999.

    :param broker: optional prefix; when truthy it is prepended to the id.
    :return: the generated id as a string.
    """
    global base_cid
    base_cid += 1
    if base_cid > 999:
        base_cid = 0

    time_part = str(time.time())[4:10]
    random_part = str(random.randint(1, 999))
    cid = time_part + random_part + str(base_cid)
    return broker + cid if broker else cid
def csv_to_gz_and_remove():
    """Gzip every CSV file below the current directory, then delete it.

    Walks ``./`` recursively.  Each file whose path contains ``.csv`` but not
    ``.gz`` is appended to ``<path>.gz`` (append mode, so an existing archive
    gains another gzip member) and the original file is removed once the
    archive has been written and closed.
    """
    def _list_files(filepath, substr):
        """Return all file paths under ``filepath`` whose path contains ``substr``."""
        matches = []
        for path, _subdirs, files in sorted(os.walk(filepath), reverse=True):
            for name in files:
                full_path = os.path.join(path, name)
                if substr in full_path:
                    matches.append(full_path)
        return matches

    for csv_path in _list_files('./', '.csv'):
        if '.gz' in csv_path:
            continue
        # Both handles are closed before the source is removed, even on error.
        with open(csv_path, 'rb') as src, gzip.open(csv_path + '.gz', 'a') as dst:
            dst.write(src.read())
        os.remove(csv_path)
def get_params(fname):
    """Load the strategy configuration in ``fname`` into a ``model.Config``.

    The file is first parsed as plain TOML.  If that fails, its content is
    treated as base64 text wrapping AES-ECB ciphertext whose last plaintext
    byte holds the padding length; the decrypted text is then parsed as TOML.
    Every option missing from the file falls back to the default used below.

    :param fname: path of the configuration file.
    :return: the populated ``model.Config`` instance.
    """
    def _split_list(raw):
        """Turn a string such as "['a', 'b']" into ['a', 'b']."""
        return raw.replace('[', '').replace(']', '').replace("'", '').replace(" ", "").split(',')

    # Read the configuration.
    try:
        params = toml.load(fname)
    except Exception:
        # Deliberate fallback: the file is not plain TOML, so try the
        # encrypted format instead.
        with open(fname) as f:
            data = f.read()
        text = base64.b64decode(data)
        # NOTE(review): hard-coded symmetric key and ECB mode; consider moving
        # the key out of the source tree.
        cryptor = AES.new(key=bytes("qFHFPv6MugrSTkEsWFs8wCDg3iC6!er%".encode()), mode=AES.MODE_ECB)
        plain_text = cryptor.decrypt(text)
        paddingLen = plain_text[len(plain_text) - 1]
        msg = plain_text[0:-paddingLen]
        params = toml.loads(msg.decode())

    p = model.Config()
    # Account nickname
    p.account_name = params.get('account_name', 'Unknown Account')
    # API credentials (spaces stripped)
    p.access_key = params.get('access_key', '***').replace(" ", "")
    p.secret_key = params.get('secret_key', '***').replace(" ", "")
    p.pass_key = params.get('pass_key', 'qwer1234').replace(" ", "")
    # Broker id.  The exchange is read with a default first so that a config
    # without 'exchange' no longer raises KeyError here.
    exchange = params.get('exchange', "")
    p.broker_id = get_broker_id(params.get('broker_id', ""), exchange)
    # Trading venue
    p.exchange = exchange
    # Trading pair
    p.pair = params.get('pair', "")
    # Debug switch
    p.debug = params.get('debug', "False")
    # Open threshold
    p.open = params.get('open', "0.002")
    # Close threshold
    p.close = params.get('close', "0.0002")
    # Listening port
    p.server_port = params.get('server_port', 6000)
    # Leverage
    p.leverrate = float(params.get('leverrate', 1.0))
    # Reference venues
    p.refexchange = _split_list(params['refexchange']) if 'refexchange' in params else ""
    # Reference pairs
    p.refpair = _split_list(params['refpair']) if 'refpair' in params else ""
    # Network proxy (only meant to be used on Windows)
    p.proxy = params.get('proxy', None)
    # Fraction of account funds to use
    p.used_pct = params.get('used_pct', "0.9")
    # Notification webhook address
    # NOTE(review): the default embeds a webhook token in source.
    p.webhook = params.get('webhook', "https://discord.com/api/webhooks/907870708481265675/IfN4GqH4fj8HWS_FecH3Lrc2qtRyqsCHsSJVLFHlxY8ioHprfdxIMUNAfqkZZ6opzVEP")
    # Index of the default reference venue
    p.index = params.get('index', 0)
    # Stop-loss ratio, 0.02 = 2%
    p.stoploss = params.get('stoploss', STOPLOSS)
    # Smoothing coefficient, default 0.999
    p.gamma = params.get('gamma', GAMMA)
    # Staged position building: 1 suggested for small funds, 3 for large
    p.grid = params.get('grid', GRID)
    # Live parameter tuning switch (heavy performance cost)
    p.backtest = params.get('backtest', 1)
    # Save live market data (heavy performance cost)
    p.save = params.get('save', 0)
    # Allowed order placements per second
    p.place_order_limit = params.get('place_order_limit', 0)
    # Whether to enable colocation
    p.colo = params.get('colo', 0)
    # Whether to enable the fast market feed (extra performance cost)
    p.fast = params.get('fast', 1)
    # Private ip to use for networking, default 0 (multi-NIC / multi-ip hosts)
    p.ip = params.get('ip', 0)
    # Only spot venues may hold a base-coin position.
    if "spot" in p.exchange:
        p.hold_coin = params.get('hold_coin', 0.0)
    else:
        p.hold_coin = 0.0
    # Whether to write logs (some performance cost)
    p.log = params.get('log', 1)

    # Special case: these pairs get a "1000" prefix on binance_usdt_swap.
    prefixed_pairs = ['shib_usdt', 'xec_usdt', 'bttc_usdt']
    if p.exchange == 'binance_usdt_swap' and p.pair in prefixed_pairs:
        p.pair = "1000" + p.pair
    for i, ref_exchange in enumerate(p.refexchange):
        if ref_exchange == 'binance_usdt_swap' and p.refpair[i] in prefixed_pairs:
            p.refpair[i] = "1000" + p.refpair[i]
    return p
def get_broker_id(broker_id, exchange_name):
    """Resolve the broker id to use for an exchange.

    :param broker_id: broker id taken from the configuration.
    :param exchange_name: exchange identifier, e.g. ``"binance_spot"``.
    :return: ``broker_id`` unchanged when the name contains ``binance``,
        ``"t-"`` when it contains ``gate``, otherwise an empty string.
    """
    if 'binance' in exchange_name:
        return broker_id
    return "t-" if 'gate' in exchange_name else ""
# Order rate limits per exchange (requests per second).
BASIC_LIMIT = 100
GATE_SPOT_LIMIT = 10.0
GATE_USDT_SWAP_LIMIT = 100.0
KUCOIN_SPOT_LIMIT = 15.0
KUCOIN_USDT_SWAP_LIMIT = 10.0
BINANCE_USDT_SWAP_LIMIT = 5.0
BINANCE_SPOT_LIMIT = 2.0
COINEX_SPOT_LIMIT = 40.0
COINEX_USDT_SWAP_LIMIT = 100.0
OKEX_USDT_SWAP_LIMIT = 30.0
BITGET_USDT_SWAP_LIMIT = 10.0
BYBIT_USDT_SWAP_LIMIT = 1.0
# Multiplier applied to the order limit to obtain the overall request limit.
RATIO = 4.0


def _exchange_order_limit(exchange):
    """Return the order limit configured for ``exchange``, or None if unknown."""
    # Built on every call so the current values of the constants are used.
    known_limits = (
        ("gate_spot", GATE_SPOT_LIMIT),
        ("gate_usdt_swap", GATE_USDT_SWAP_LIMIT),
        ("kucoin_spot", KUCOIN_SPOT_LIMIT),
        ("kucoin_usdt_swap", KUCOIN_USDT_SWAP_LIMIT),
        ("binance_usdt_swap", BINANCE_USDT_SWAP_LIMIT),
        ("binance_spot", BINANCE_SPOT_LIMIT),
        ("coinex_spot", COINEX_SPOT_LIMIT),
        ("coinex_usdt_swap", COINEX_USDT_SWAP_LIMIT),
        ("okex_usdt_swap", OKEX_USDT_SWAP_LIMIT),
        ("bitget_usdt_swap", BITGET_USDT_SWAP_LIMIT),
        ("bybit_usdt_swap", BYBIT_USDT_SWAP_LIMIT),
    )
    for name, per_second in known_limits:
        if exchange == name:
            return per_second
    return None


def get_limit_requests_num_per_second(exchange, limit=0):
    """Return the overall request limit per second for ``exchange``.

    :param exchange: exchange identifier such as ``"gate_spot"``.
    :param limit: explicit order limit; when non-zero it overrides the table.
    :return: the order limit multiplied by ``RATIO``.  For an unknown
        exchange a notice is printed and ``BASIC_LIMIT * RATIO`` is returned.
    """
    if limit != 0:
        return limit * RATIO
    per_second = _exchange_order_limit(exchange)
    if per_second is None:
        print("限频规则未找到")
        per_second = BASIC_LIMIT
    return per_second * RATIO


def get_limit_order_requests_num_per_second(exchange, limit=0):
    """Return the order-placement request limit per second for ``exchange``.

    :param exchange: exchange identifier such as ``"gate_spot"``.
    :param limit: explicit limit; when non-zero it is returned unchanged.
    :return: the configured limit.  For an unknown exchange a notice is
        printed and ``BASIC_LIMIT`` is returned.
    """
    if limit != 0:
        return limit
    per_second = _exchange_order_limit(exchange)
    if per_second is None:
        print("限频规则未找到")
        return BASIC_LIMIT
    return per_second
def dist_to_weight(price, mp, eff_range=EFF_RANGE):
    """Convert the relative distance between ``price`` and ``mp`` into a weight.

    The distance ``abs(price - mp) / mp`` is divided by ``eff_range``, clipped
    to [0.0, 0.95] and subtracted from 1.  A result that is not greater than
    zero is replaced by 0.

    :param price: price being weighted.
    :param mp: reference price; used as the divisor, so it must be non-zero.
    :param eff_range: relative distance that maps to a full unit of penalty.
    :return: the weight.
    """
    relative_gap = abs(price - mp) / mp
    penalty = clip(relative_gap / eff_range, 0.0, 0.95)
    weight = 1 - penalty
    if weight > 0:
        return weight
    return 0
def change_params(fname, params, changes):
    """Apply ``changes`` to ``params`` in place and write it to ``fname`` as TOML.

    :param fname: destination path; the file is overwritten.
    :param params: configuration mapping to update and dump.
    :param changes: iterable of items whose element 0 is the key and
        element 1 the new value.
    """
    for change in changes:
        params[change[0]] = change[1]
    with open(f"{fname}", "w") as config_file:
        toml.dump(params, config_file)
def show_memory(unit='B', threshold=1024):
    """Report the size of this module's global variables.

    Sizes come from ``sys.getsizeof`` and are therefore shallow: objects
    referenced by a container are not included.

    :param unit: display unit, one of ``B``, ``KB``, ``MB``, ``GB``.
    :param threshold: only variables whose size in ``unit`` is greater than
        or equal to this value are listed.
    :return: the report text, which is also printed.
    :raises KeyError: if ``unit`` is not one of the supported units.
    """
    from sys import getsizeof
    scale = {'B': 1, 'KB': 1024, 'MB': 1048576, 'GB': 1073741824}[unit]
    msg = '内存占用情况: \n'
    # Look each value up directly instead of eval()-ing its name: eval would
    # resolve this function's locals first and fail on non-identifier keys.
    # items() is snapshotted so the mapping may change while we iterate.
    for name, value in list(globals().items()):
        memory = getsizeof(value) // scale
        if memory >= threshold:
            msg += f'{name} {memory} {unit}\n'
    print(msg)
    return msg
def clip(num, _min, _max):
    """Limit ``num`` to the range [``_min``, ``_max``].

    The upper bound is applied first and the lower bound second, so when
    ``_min`` is greater than ``_max`` the lower bound wins.

    :return: the clipped value.
    """
    capped = _max if num > _max else num
    return _min if capped < _min else capped
async def ding(msg, at_all, webhook, proxy=None):
    """Post a strategy notification to a webhook.

    The JSON payload carries ``content``, ``username`` and one embed whose
    description is ``msg``.

    :param msg: notification text.
    :param at_all: accepted for interface compatibility; not used.
    :param webhook: URL the payload is posted to.
    :param proxy: proxy URL; only honoured on Windows, ignored elsewhere.
    """
    header = {
        "Content-Type": "application/json",
        "Charset": "UTF-8",
    }
    message = {
        "content": "大吉大利 今晚吃鸡",
        "username": "千千喵",
        "embeds": [
            {
                "title": "策略通知",
                "description": msg,
            },
        ],
    }
    message_json = json.dumps(message)
    # startswith() rather than a substring test: 'win' in sys.platform is
    # also true for 'darwin' and 'cygwin'.
    if not sys.platform.startswith('win'):
        proxy = None
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession() as session:
        # Entering the response context releases the connection afterwards.
        async with session.post(url=webhook, data=message_json, headers=header,
                                proxy=proxy, timeout=timeout):
            pass
def _get_params(url, proxy, params):
    """POST ``params`` as JSON to ``url`` and return the decoded JSON reply.

    :param url: endpoint to post to.
    :param proxy: accepted for interface compatibility; not used.
    :param params: JSON-serialisable request body.
    :return: the parsed response, or ``[]`` if the request or the decoding
        fails (the traceback is printed).
    """
    try:
        res = requests.post(url=url, json=params, timeout=10)
        return json.loads(res.text)
    except Exception:
        # Best effort: report and fall back to an empty result, but let
        # KeyboardInterrupt / SystemExit propagate.
        traceback.print_exc()
        return []
async def _post_params(url, proxy, params):
    """POST ``params`` to ``url`` and return the response body as text.

    :param url: endpoint to post to.
    :param proxy: proxy URL; only honoured on Windows, ignored elsewhere.
    :param params: request body passed to aiohttp as ``data``.
    :return: the response text (also printed), or the string
        ``"post_params error"`` if anything fails (the traceback is printed).
    """
    try:
        # startswith() rather than a substring test: 'win' in sys.platform is
        # also true for 'darwin' and 'cygwin'.
        if not sys.platform.startswith('win'):
            proxy = None
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession() as session:
            async with session.post(url=url, proxy=proxy, data=params, timeout=timeout) as res:
                data = await res.text()
        print(data)
        return data
    except Exception:
        # Exception (not a bare except) so task cancellation is not swallowed.
        print(traceback.format_exc())
        return "post_params error"
def get_ip():
    """Return the local IPv4 address used for outbound traffic.

    A UDP socket is connected to 8.8.8.8:80 and the socket's own address is
    read back.

    :return: the local address as a string.
    :raises OSError: if the socket cannot be created or connected.
    """
    # The context manager closes the socket on every path and, unlike the
    # old try/finally, cannot hit an unbound name if socket creation fails.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('8.8.8.8', 80))
        return s.getsockname()[0]
def check_auth():
    """Terminate the process unless this host's IP is on the remote whitelist.

    The local address from ``get_ip()`` is compared with the IPv4 addresses
    found in the body returned by the whitelist endpoint.  When it is not
    listed the process exits immediately via ``os._exit(0)``.

    NOTE(review): the body format is not visible here, so addresses are
    extracted by pattern instead of assuming JSON or line-based text.
    NOTE(review): the list is fetched over plain HTTP.
    """
    import re

    print("*** 检查使用权限1 ***")
    ip = get_ip()
    print(f"当前IP {ip}")
    # Previously the Response object itself was searched; it iterates byte
    # chunks, so a str never matched and every host was rejected.
    response = requests.get("http://158.247.204.56:7777/ip_list", timeout=10)
    white_list = set(re.findall(r"\d{1,3}(?:\.\d{1,3}){3}", response.text))
    if ip in white_list:
        print("当前IP位于白名单中")
    else:
        print("@@@ 本版本仅限指定IP白名单运行 @@@")
        os._exit(0)
    print("*** 符合要求 ***")
def check_time():
    """Terminate the process once the trial period is over.

    The current time is compared with 2021-11-17 00:00:00 local time; if it
    is later, a notice is printed and the process exits via ``os._exit(0)``.
    """
    print("*** 检查使用权限2 ***")
    expiry_struct = time.strptime('2021-11-17 00:00:00', "%Y-%m-%d %H:%M:%S")
    expiry_timestamp = int(time.mktime(expiry_struct))
    if time.time() > expiry_timestamp:
        print("@@@ 此版本目前已过试用期 @@@")
        os._exit(0)
    print("*** 符合要求 ***")
def num_to_str(num, d):
    """Format ``num`` with the number of decimals implied by step size ``d``.

    :param num: value to format.
    :param d: step size.  Values >= 1.0 give an integer string; a step of
        1 or 5 times 10**-k (k from 1 to 10) gives k decimals.
    :return: the formatted string, or ``str(num)`` for any other step.
    """
    if d >= 1.0:
        return "%d" % num

    # (recognised step sizes, decimals to print)
    decimals_by_step = (
        ((0.1, 0.5), 1),
        ((0.01, 0.05), 2),
        ((0.001, 0.005), 3),
        ((0.0001, 0.0005), 4),
        ((0.00001, 0.00005), 5),
        ((0.000001, 0.000005), 6),
        ((0.0000001, 0.0000005), 7),
        ((0.00000001, 0.00000005), 8),
        ((0.000000001, 0.000000005), 9),
        ((0.0000000001, 0.0000000005), 10),
    )
    for steps, decimals in decimals_by_step:
        if d in steps:
            # Builds e.g. "%.3f" and then applies it to num.
            return ("%%.%df" % decimals) % num
    return str(num)
def num_to_decimal(num):
    """Return the number of decimal places in ``num``.

    The value is converted with ``str`` and parsed as a ``Decimal``, so
    scientific notation is handled: ``1e-05`` gives 5 (previously 0).

    :param num: a number or numeric string, e.g. ``0.001`` or ``"0.0010"``.
    :return: the count of decimal places, capped at 11 as before.  Text that
        is not a finite number falls back to counting the characters after
        the last ``.`` (0 when there is none).
    """
    text = str(num)
    try:
        exponent = Decimal(text).as_tuple().exponent
    except ArithmeticError:
        # decimal.InvalidOperation (an ArithmeticError): not a number.
        exponent = None

    if isinstance(exponent, int):
        decimals = max(0, -exponent)
    elif '.' in text:
        decimals = len(text) - text.rfind('.') - 1
    else:
        # Also reached for NaN / Infinity, whose exponent is not an int.
        decimals = 0
    return min(decimals, 11)
def fix_amount(amount, stepSize):
    """Round ``amount`` down to a whole multiple of ``stepSize``.

    Both values go through ``Decimal(str(...))`` so the arithmetic is done in
    decimal.  ``Decimal`` integer division truncates toward zero.

    :param amount: quantity to adjust.
    :param stepSize: quantity increment; must be non-zero.
    :return: the adjusted quantity as a float.
    """
    step = Decimal(str(stepSize))
    whole_steps = Decimal(str(amount)) // step
    return float(whole_steps * step)
    # return float(Decimal(str(amount)).quantize(Decimal(str(stepSize)), ROUND_FLOOR))
def fix_price(price, tickSize):
    """Round ``price`` to the nearest multiple of ``tickSize``, ties rounding up.

    Both values go through ``Decimal(str(...))`` so the arithmetic is done in
    decimal.  Ties are resolved with ``ROUND_HALF_UP``; the built-in
    ``round`` used before rounds a ``Decimal`` half to even, so 2.5 ticks
    became 2 instead of 3.

    :param price: price to adjust.
    :param tickSize: price increment; must be non-zero.
    :return: the adjusted price as a float.
    """
    tick = Decimal(str(tickSize))
    tick_count = (Decimal(str(price)) / tick).to_integral_value(rounding=ROUND_HALF_UP)
    return float(tick_count * tick)
def timeit(func):
    """Decorator that prints how long each call to ``func`` takes.

    The elapsed time is printed in microseconds, rounded to three decimals,
    after the wrapped function returns.  The wrapper keeps the wrapped
    function's name and docstring.

    :param func: synchronous callable to time.
    :return: the wrapping function, which returns ``func``'s result.
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so clock adjustments cannot skew it.
        started = time.perf_counter()
        res = func(*args, **kwargs)
        spend_time = round((time.perf_counter() - started) * 1e6, 3)
        print(f'{func.__name__} 耗时 {spend_time} us')
        return res
    return wrapper
def get_backtest_set(base=""):
    """Build the preset parameter grid used for backtesting.

    :param base: accepted for interface compatibility; not used.
    :return: a tuple ``(open_list, close_dict, alpha_list)`` where
        ``close_dict`` maps every open distance to the two close distances
        ``open * 0.1`` and ``open * 0.2``.
    """
    # The open distance must not be too tight: it has to exceed the distance
    # most price ticks move.
    open_list = [0.0055, 0.0045, 0.0035, 0.0030, 0.0025, 0.0020, 0.0015]
    close_dict = {
        open_dist: [open_dist * 0.1, open_dist * 0.2]
        for open_dist in open_list
    }
    alpha_list = [0.0]
    return open_list, close_dict, alpha_list
def get_local_ip_list():
    """Collect the IPv4 addresses of this host's ``ens*``/``eth*``/``enp*`` interfaces.

    Every interface name is printed; addresses are gathered only from
    interfaces whose name contains ``ens``, ``eth`` or ``enp``.

    :return: the unique addresses in discovery order.
    """
    import netifaces as ni

    ipList = []
    for dev in ni.interfaces():
        print('dev:', dev)
        if not any(tag in dev for tag in ('ens', 'eth', 'enp')):
            continue
        # An interface without an IPv4 address has no AF_INET entry; indexing
        # it directly raised KeyError.
        for entry in ni.ifaddresses(dev).get(ni.AF_INET, []):
            ip = entry['addr']
            print(f"检测到私有ip:{ip}")
            if ip not in ipList:
                ipList.append(ip)
    print(f"当前服务器私有ip为{ipList}")
    return ipList
-
if __name__ == "__main__":
    # Manual smoke checks; flip a 0/1 switch to choose which section runs.

    # Quantity rounding samples.
    if 0:
        for amount, step in ((1.0, 0.1), (0.9, 0.05), (1.1, 0.1), (1.2, 0.5), (0.01, 0.05)):
            print(fix_amount(amount, step))

    # Price rounding samples.
    if 1:
        for price, tick in ((1.0, 0.1), (0.9, 2.0), (1.1, 0.1), (1.2, 0.5), (4999.99, 0.5)):
            print(fix_price(price, tick))

    # Preset backtest grid.
    if 0:
        print(get_backtest_set())

    # Post a sample parameter set to the parameter server and keep the loop
    # running.
    if 0:
        p = get_params("config.toml")
        loop = asyncio.get_event_loop()
        payload = ujson.dumps({
            "exchange": "binance_usdt_swap",
            "pair": "eth_usdt",
            "open": "0.001",
            "close": "0.0001",
            "refexchange": "binance_spot",
            "refpair": "eth_usdt",
            "profit": 0.1,
        })
        loop.create_task(
            _post_params("http://wwww.khods.com:8888/post_params", None, payload)
        )
        loop.run_forever()
|