import aiohttp import time import asyncio import json, ujson import zlib import hashlib import hmac import base64 import traceback import random, csv, sys import logging, logging.handlers from datetime import datetime from urllib.parse import urlparse import urllib from decimal import Decimal import utils import model def empty_call(msg): pass def sort_num(n): if n.isdigit(): return int(n) else: return float(n) class OkexUsdtSwapRest: """""" def __init__(self, params:model.ClientParams, colo=0): if colo: print('不支持colo高速线路') self.REST = 'https://www.okx.com' # hk # REST = 'https://aws.okx.com' # aws else: self.REST = 'https://www.okx.com' # hk # REST = 'https://aws.okx.com' # aws self.params = params self.name = self.params.name self.base = params.pair.split('_')[0].upper() self.quote = params.pair.split('_')[1].upper() self.symbol = f"{self.base}-{self.quote}-SWAP" if len(self.params.pair.split('_')) > 2: self.delivery = self.params.pair.split('_')[2] # 210924 self.symbol += f"-{self.delivery}" self.data = {} self._SESSIONS = dict() self.callback = { "onMarket":empty_call, "onPosition":empty_call, "onOrder":empty_call, "onEquity":empty_call, "onExit":empty_call, } self.exchange_info = dict() self.stepSize = None self.tickSize = None self.ctVal = None self.ctMult = None self.delays = [] self.avg_delay = 0 self.max_delay = 0 self.proxy = None self.broker_id = self.params.broker_id if 'win' in sys.platform: self.proxy = self.params.proxy self.logger = self.get_logger() self.stop_flag = 0 self.coin_value = 0.0 self.cash_value = 0.0 self.getheader = self.make_header() self.postheader = self.make_header() #### 指定发包ip iplist = utils.get_local_ip_list() self.ip = iplist[int(self.params.ip)] async def take_order(self, symbol, amount, origin_side, price, cid="", order_type='LIMIT'): ''' 下单接口 ''' if symbol not in self.exchange_info: await self.before_trade() # amount = float(Decimal(str(amount//self.exchange_info[symbol].stepSize))*Decimal(str(self.exchange_info[symbol].stepSize))) # price = float(Decimal(str(price//self.exchange_info[symbol].tickSize))*Decimal(str(self.exchange_info[symbol].tickSize))) amount = utils.fix_amount(amount, self.exchange_info[symbol].stepSize) price = utils.fix_price(price, self.exchange_info[symbol].tickSize) amount = int(amount/self.ctVal) # 这里把币转成张 后续用张来下单 # 似乎有了num_to_str就不再需要下面两行 if origin_side =='kd': side = 'buy' positionSide = 'long' elif origin_side =='pd': side = 'sell' positionSide = 'long' elif origin_side =='kk': side = 'sell' positionSide = 'short' elif origin_side =='pk': side = 'buy' positionSide = 'short' else: raise Exception(f'下单参数错误 side:{origin_side}') if amount <= 0.0: # okex 因为 数量单位为张 很容易出现这个问题 避免频繁写入日志 # self.logger.error(f'下单参数错误 amount:{amount} side:{origin_side}') order_event = dict() order_event['status'] = "REMOVE" order_event['filled_price'] = 0.0 order_event['filled'] = 0.0 order_event['client_id'] = cid self.callback["onOrder"](order_event) return None, None if price <= 0.0: self.logger.error(f'下单参数错误 price:{price} side:{origin_side}') order_event = dict() order_event['status'] = "REMOVE" order_event['filled_price'] = 0.0 order_event['filled'] = 0.0 order_event['client_id'] = cid self.callback["onOrder"](order_event) return None, None params = { 'instId': symbol, 'tdMode': "cross", 'sz': amount, 'side': side, 'posSide': positionSide, 'ordType': "limit" if order_type!="MARKET" else 'market', 'clOrdId': cid, } if params['ordType'] == 'limit': params['px'] = utils.num_to_str(price, self.exchange_info[symbol].tickSize) if self.params.debug == 'True': await asyncio.sleep(0.1) return None, None else: # 再报单 response, error = await self.http_post_request('/api/v5/trade/order', params) # 再更新 if response is not None: if response: data = response['data'][0] order_event = dict() order_event['status'] = "NEW" order_event['client_id'] = params['clOrdId'] order_event['order_id'] = data['ordId'] self.callback["onOrder"](order_event) if error: order_event = dict() order_event['status'] = "REMOVE" order_event['filled_price'] = 0 order_event['filled'] = 0 order_event['client_id'] = params['clOrdId'] self.callback["onOrder"](order_event) return response, error async def cancel_order(self, order_id=None, client_id=None, symbol=None): """注意,ok撤单不能更新订单状态,撤单成功也仅仅代表交易所收到了撤单请求""" params = { "instId": self.symbol if symbol==None else symbol, } if order_id: params["ordId"] = order_id if client_id: params["clOrdId"] = client_id if self.params.debug == 'True': await asyncio.sleep(0.1) return None else: response, error = await self.http_post_request('/api/v5/trade/cancel-order', params) if error: # print("撤单失败",error) # 撤单失败 可能已经撤单 是否发生成交需要rest查 # if client_id:await self.check_order(client_id=client_id) # if order_id:await self.check_order(order_id=order_id) return error return response async def check_order(self, order_id=None, client_id=None, symbol=None): params = { "instId": self.symbol if symbol==None else symbol, } if order_id: params["ordId"] = order_id if client_id: params["clOrdId"] = client_id if self.params.debug == 'True': await asyncio.sleep(0.1) return None else: response, error = await self.http_get_request('/api/v5/trade/order', params) if error: print(f"{self.name} 查单失败 {error}") return error if response['code']: for order in response['data']: if order['state'] in ['canceled', 'filled']: order_event = dict() order_event['status'] = 'REMOVE' order_event['client_id'] = order['clOrdId'] order_event['order_id'] = order['ordId'] order_event['filled'] = float(order['accFillSz'])*self.ctVal if order['accFillSz'] != '' else 0.0 # usdt永续需要考虑每张的单位 order_event['filled_price'] = float(order['avgPx']) if order['avgPx'] != '' else 0.0 self.callback['onOrder'](order_event) else: order_event = dict() order_event['status'] = "NEW" order_event['client_id'] = order['clOrdId'] order_event['order_id'] = order['ordId'] self.callback['onOrder'](order_event) return response async def get_order_list(self): ''' 获取挂单表 ''' response, error = await self.http_get_request('/api/v5/trade/orders-pending', {'instId':self.symbol}) # print(response) orders = [] # 查询当前挂单 只可能出现 new 和 partfill 默认成交为0 只有 done状态的订单才考虑是否有成交 if response and response['code']: for i in response['data']: order_event = dict() order_event['status'] = "NEW" order_event['filled'] = 0 order_event['filled_price'] = 0 order_event['client_id'] = i["clOrdId"] order_event['order_id'] = i['ordId'] self.callback["onOrder"](order_event) orders.append(order_event) if error: print('查询列表出错',error) return orders async def get_server_time(self): response = await self.http_get_request('/api/v5/public/time') return response async def get_equity(self): ########## res, err = await self.get_account() if res: for data in res['data']: for i in data['details']: if self.quote == i['ccy']: self.data['equity'] = float(i['eq']) self.callback['onEquity']({self.quote:self.data['equity']}) if err: print('获取账户信息错误', err) ########## async def universalTransfer(self, _type='UMFUTURE_MAIN', asset='USDT', amount=0): """okex现在统一账户,没有钱包这个概念了,不实现""" pass async def futuresTransfer(self, _type='2', asset='USDT', amount=0): """okex现在统一账户,没有钱包这个概念了,不实现""" pass async def buy_token(self): '''买入平台币''' pass async def check_position(self, hold_coin=0.0): ''' 检查是否存在非运行币种的仓位并take平仓 已支持全品种 ''' try: ### self.logger.info(f'{self.name} 检查遗漏订单') response, error = await self.http_get_request('/api/v5/trade/orders-pending', {}) if response: for order in response['data']: params = { "instId": order['instId'], "ordId": order['ordId'] } res, err = await self.http_post_request('/api/v5/trade/cancel-order', params) await asyncio.sleep(0.1) self.logger.info(f"{self.name} 清理遗漏订单 {res} {err}") ### if self.exchange_info == dict(): await self.before_trade() ### self.logger.info(f'{self.name} 检查遗漏仓位') # 清空全部仓位 response, error = await self.http_get_request('/api/v5/account/positions', {"instType":"SWAP"}) if response: for i in response['data']: symbol = i['instId'] pos = float(i['pos']) posSide = i['posSide'] ### ticker, err = await self.http_get_request('/api/v5/market/ticker', {'instId':symbol}) if err: print(err) if ticker: ap = float(ticker['data'][0]['askPx']) bp = float(ticker['data'][0]['bidPx']) ### 每个品种都要获取各自的精度 trade_side = 'sell' if posSide == 'long' else "buy" trade_pos = abs(pos) trade_pos_side = posSide params = { 'instId': symbol, 'tdMode': "cross", 'sz': int(trade_pos), 'px': ap*1.001 if trade_side == 'buy' else bp*0.999, 'side' : trade_side, 'posSide': trade_pos_side, 'ordType': "limit", } response, error = await self.http_post_request('/api/v5/trade/order', params) print("下单结果",response,error) self.logger.info('遗留仓位检测完毕') except: self.logger.error("清仓程序执行出错") self.logger.error(traceback.format_exc()) return async def before_trade(self): """""" response, error = await self.get_instruments() if error: print('获取市场信息错误',error) else: self.update_instruments(response) print(f'before_trade get_instruments successed. {self.symbol} {self.stepSize} {self.tickSize} {self.ctVal} {self.ctMult}') # 更新账户 res, err = await self.get_account() if err: print(err) else: for data in res['data']: for i in data['details']: if self.quote == i['ccy']: self.data['equity'] = float(i['eq']) self.callback['onEquity']({self.quote:self.data['equity']}) self.cash_value = float(i['eq']) print(f"{self.name} on_go {self.symbol} equity {self.quote} {self.data['equity']}") await self.change_pos_side() await asyncio.sleep(1) await self.set_leverage(10) await asyncio.sleep(1) await self.get_position() async def get_all_position(self): ''' 获取仓位信息 ''' response, error = await self.http_get_request('/api/v5/account/positions', {}) print(f'查看此账号全部仓位 {response} {error}') async def get_position(self): ''' 获取仓位信息 ''' response, error = await self.http_get_request('/api/v5/account/positions', {'instId':self.symbol}) if error: print(f"{self.name} get_position error {error}") return None longPos, shortPos = 0, 0 longAvg, shortAvg = 0, 0 for i in response['data']: if i['instId'] == self.symbol and i['pos'] and i['avgPx']: if i['posSide'] == 'long': longPos = float(i['pos'])*self.ctVal longAvg = float(i['avgPx']) elif i['posSide'] == 'short': shortPos = float(i['pos'])*self.ctVal shortAvg = float(i['avgPx']) position = model.Position() position.longPos = abs(longPos) position.longAvg = abs(longAvg) position.shortPos = abs(shortPos) position.shortAvg = abs(shortAvg) self.callback['onPosition'](position) return position async def get_ticker(self): res, err = await self.http_get_request('/api/v5/market/ticker', {'instId':self.symbol}) if res: ap = float(res['data'][0]['askPx']) bp = float(res['data'][0]['bidPx']) mp = (ap+bp)/2 ticker = {"name":self.name,'mp': mp, 'bp':bp, 'ap':ap} return ticker if err: self.logger.debug(err) return None async def get_account(self): response, error = await self.http_get_request('/api/v5/account/balance', {'ccy':self.quote}) return response, error async def go(self): ''' 盘前 获取市场信息 获取账户信息 更改仓位模式(期货) 清空仓位和挂单 盘中 更新账户信息 更新挂单列表 更新仓位信息 更新延迟信息 ''' print('Rest循环器启动') interval = 60 # 不能太快防止占用限频 ### beforeTrade await self.before_trade() await asyncio.sleep(1) ### onTrade loop = 0 while 1: loop += 1 try: # 停机信号 if self.stop_flag: return # 更新账户 res, err = await self.get_account() if err: print(err) else: for data in res['data']: for i in data['details']: if self.quote == i['ccy']: self.data['equity'] = float(i['eq']) self.callback['onEquity']({self.quote:self.data['equity']}) # print(f"{self.name} on_go {self.symbol} equity {self.quote} {self.data['equity']}") # 更新仓位 await self.get_position() # print(f"{self.name} on_go {self.symbol} position {position}") await asyncio.sleep(interval) # 打印延迟 self.get_delay_info() self.logger.debug(f'报单延迟 平均{self.avg_delay}ms 最大{self.max_delay}ms') except asyncio.CancelledError: return except: traceback.print_exc() await asyncio.sleep(30) async def handle_signals(self, orders): ''' 执行策略指令 撤销订单 检查订单 下达订单 ''' try: for order_name in orders: if 'Cancel' in order_name: cid = orders[order_name][0] oid = orders[order_name][1] if cid: asyncio.get_event_loop().create_task(self.cancel_order(client_id=cid)) elif oid: asyncio.get_event_loop().create_task(self.cancel_order(order_id=oid)) for order_name in orders: if 'Check' in order_name: cid = orders[order_name][0] # oid = orders[order_name][1] asyncio.get_event_loop().create_task(self.check_order(client_id=cid)) for order_name in orders: if 'Limits' in order_name: for i in orders[order_name]: asyncio.get_event_loop().create_task(self.take_order( self.symbol, i[0], i[1], i[2], i[3] )) except Exception as e: traceback.print_exc() self.logger.error("执行信号出错"+str(e)) await asyncio.sleep(0.1) async def set_leverage(self, lever=10): params = {'instId':self.symbol, 'lever':utils.num_to_str(lever, 1), 'mgnMode':'cross'} res, error = await self.http_post_request('/api/v5/account/set-leverage', params) if error: print(f"{self.name} 设置杠杆倍数 {params} failed. -->{error}") return None, error if res['code'] == '0': print(f"{self.name} 设置杠杆倍数 {params} success. -->{res}") else: print(f"{self.name} 设置杠杆倍数 {params} 暂时不能设置. -->{res}") return res, error async def change_pos_side(self, dual='true'): '''''' params = {'posMode': "long_short_mode"} res, error = await self.http_post_request('/api/v5/account/set-position-mode', params) if error: print(f"{self.name} 设置双向持仓 failed. -->{error}") return res, error if res['code'] == '0': print(f"{self.name} 设置双向持仓 success. -->{res}") else: print(f"{self.name} 设置双向持仓 暂时不能设置. -->{res}") return res, error async def get_instruments(self): """从rest获取合约信息""" params = {'instType': 'SWAP'} res, error = await self.http_get_request('/api/v5/public/instruments', params) return res, error def update_instruments(self, data): """根据信息调整合约信息""" for info in data['data']: if info['instId'] == self.symbol: self.ctVal = sort_num(info['ctVal']) self.ctMult = sort_num(info['ctMult']) self.tickSize = sort_num(info['tickSz']) self.stepSize = sort_num(info['minSz'])*self.ctVal #### 保存交易规则信息 exchange_info = model.ExchangeInfo() exchange_info.symbol = info['instId'] exchange_info.multiplier = sort_num(info['ctMult']) exchange_info.tickSize = sort_num(info['tickSz']) exchange_info.stepSize = sort_num(info['minSz'])*sort_num(info['ctMult']) self.exchange_info[exchange_info.symbol] = exchange_info def __sign(self, timestamp, method, path, data): if data: message = timestamp + method + path + data else: message = timestamp + method + path digest = hmac.new(bytes(self.params.secret_key.encode('utf8')), bytes(message.encode('utf8')), digestmod=hashlib.sha256).digest() return base64.b64encode(digest).decode('utf-8') def get_logger(self): logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # log to txt formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s') handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024) handler.setLevel(logging.DEBUG) handler.setFormatter(formatter) logger.addHandler(handler) return logger async def http_post_request(self, method, query=None, **args): """""" params = dict() params.update(**args) if query is not None: params.update(query) data = json.dumps(params) timestamp = self.timestamp() sign = self.__sign(timestamp, 'POST', method, data) self.postheader['OK-ACCESS-SIGN'] = sign self.postheader['OK-ACCESS-TIMESTAMP'] = timestamp url = f"{self.REST}{method}" res, error = await self._request('POST', url, data) return res, error async def http_get_request(self, method, query=None, **args): """""" params = dict() params.update(**args) if query is not None: params.update(query) timestamp = self.timestamp() if params: path = '{method}?{params}'.format(method=method, params=urllib.parse.urlencode(params)) else: path = method sign = self.__sign(timestamp, 'GET', path, None) self.getheader['OK-ACCESS-SIGN'] = sign self.getheader['OK-ACCESS-TIMESTAMP'] = timestamp url = f"{self.REST}{path}" res, error = await self._request('GET', url) return res, error async def _request(self, method, url, params=None): """""" try: ###### msg = f"rest请求记录 {method} {url} {params}" self.logger.debug(msg) ###### session = self._get_session(url) start_time = time.time() if method == 'GET': headers = self.getheader response = await session.get( url, headers=headers, timeout=10, proxy=self.proxy ) elif method == 'POST': headers = self.postheader response = await session.post( url, data=params, headers=headers, timeout=10, proxy=self.proxy ) code = response.status res = await response.json() msg = f"rest请求记录 {method} {url} {headers} {params}" res_msg = msg + f' {res}' self.logger.debug(res_msg) if code not in (200, 201, 202, 203, 204, 205, 206): self.logger.error(f'METHOD:{method} URL:{url} PARAMS:{params} ERROR:{res}') return None, str(res) if 'code' in res: if int(res['code']) not in (0,): if '51401' in str(res): pass else: self.logger.error(f'METHOD:{method} URL:{url} PARAMS:{params} ERROR:{res}') return None, str(res) delay = int(1000*(time.time() - start_time)) self.delays.append(delay) return res, None except Exception as e: print('网络请求错误') print(f'URL:{url} PARAMS:{params} ERROR:{e}') self.logger.error(e) self.logger.error(traceback.format_exc()) return None, str(e) async def get_history(self): params = { 'instType': "SWAP", 'limit':"100" } res, error = await self.http_get_request('/api/v5/trade/fills', params) b_id=res['data'][0]['billId'] ### data = [] while 1: params = { 'instType': "SWAP", 'limit':"100", 'after':b_id } await asyncio.sleep(0.3) res, _ = await self.http_get_request('/api/v5/trade/fills', params) if res: if len(res['data']) == 0: break for i in res['data']: data.append(i) b_id = res['data'][-1]['billId'] # b_id_s = [] # for i in data: # if i['billId'] in b_id_s: # print(i['billId']) # b_id_s.append(i['billId']) with open("data5.json", 'w+') as f: f.write(json.dumps(data)) def get_delay_info(self): if len(self.delays) > 100: self.delays = self.delays[-100:] if max(self.delays) > self.max_delay:self.max_delay = max(self.delays) self.avg_delay = round(sum(self.delays)/len(self.delays),1) def timestamp(self): return datetime.utcnow().isoformat("T")[:-3] + 'Z' def login_params(self): """生成login字符""" timestamp = str(time.time()) message = timestamp + 'GET' + '/users/self/verify' mac = hmac.new(bytes(self.params.secret_key.encode('utf8')), bytes(message.encode('utf8')), digestmod=hashlib.sha256).digest() sign = base64.b64encode(mac) login_dict = {} login_dict['apiKey'] = self.params.access_key login_dict['passphrase'] = self.params.pass_key login_dict['timestamp'] = timestamp login_dict['sign'] = sign.decode('utf-8') login_param = {'op': 'login', 'args': [login_dict]} login_str = ujson.dumps(login_param) return login_str def make_header(self): """""" headers = {} headers['Content-Type'] = 'application/json' headers['OK-ACCESS-KEY'] = self.params.access_key headers['OK-ACCESS-SIGN'] = None headers['OK-ACCESS-TIMESTAMP'] = None headers['OK-ACCESS-PASSPHRASE'] = self.params.pass_key return headers def _get_session(self, url): parsed_url = urlparse(url) key = parsed_url.netloc or parsed_url.hostname if key not in self._SESSIONS: tcp = aiohttp.TCPConnector(limit=50,keepalive_timeout=120,verify_ssl=False,local_addr=(self.ip,0)) session = aiohttp.ClientSession(connector=tcp) self._SESSIONS[key] = session return self._SESSIONS[key]