import asyncio
import base64
import datetime
import hashlib
import hmac
import json
import logging
import logging.handlers
import random
import sys
import time
import traceback
import urllib
import zlib
from decimal import Decimal
from urllib import parse
from urllib.parse import urljoin
from urllib.parse import urlparse

import aiohttp
import ujson

import model
import utils


def empty_call(msg):
    """Placeholder callback: print the payload that no real handler consumed."""
    print(f'空的回调函数 {msg}')


class GateUsdtSwapRest:
    """Asynchronous REST client for Gate USDT-settled futures.

    Results are pushed to the functions registered in ``self.callback``
    (``onOrder``, ``onPosition``, ``onEquity``, ``onTicker`` ...).  Every
    request helper returns a ``(response, error)`` pair where exactly one
    side is ``None`` on a completed request.
    """

    # Strategy side code -> (exchange side, reduce_only flag).
    _SIDE_MAP = {
        'kd': ('buy', False),   # open long
        'pd': ('sell', True),   # close long
        'kk': ('sell', False),  # open short
        'pk': ('buy', True),    # close short
    }

    def __init__(self, params: model.ClientParams, colo=0):
        """Store credentials/config and prepare per-instance state.

        Args:
            params: Client configuration; this class reads ``name``, ``pair``
                (``BASE_QUOTE``), ``proxy``, ``ip``, ``debug``,
                ``access_key`` and ``secret_key`` from it.
            colo: Truthy to use the private co-location host.
        """
        if colo:
            print('使用colo高速线路')
            self.HOST = 'https://apiv4-private.gateapi.io'
        else:
            self.HOST = 'https://api.gateio.ws'
        self.params = params
        self.name = self.params.name
        base, quote = self.params.pair.split('_')[0], self.params.pair.split('_')[1]
        self.base = base.upper()
        self.quote = quote.upper()
        self.symbol = self.base + '_' + self.quote
        self._SESSIONS = dict()
        self.callback = {
            "onMarket": empty_call,
            "onPosition": empty_call,
            "onOrder": empty_call,
            "onEquity": empty_call,
            "onTicker": empty_call,
            "onDepth": empty_call,
            "onExit": empty_call,
        }
        self.exchange_info = dict()
        self.tickSize = None
        self.stepSize = None
        self.delays = []
        self.max_delay = 0.0
        self.avg_delay = 0.0
        self.proxy = None
        # NOTE(review): 'win' is also a substring of 'darwin', so the proxy is
        # enabled on macOS too - confirm whether that is intended.
        if 'win' in sys.platform:
            self.proxy = self.params.proxy
        self.logger = self.get_logger()
        self.stop_flag = 0
        self.coin_value = 0.0
        self.cash_value = 0.0
        self.multiplier = None
        # Pin outgoing requests to the configured local IP.
        iplist = utils.get_local_ip_list()
        self.ip = iplist[int(self.params.ip)]

    def get_logger(self):
        """Return the module logger, writing DEBUG+ records to ``log.log``.

        The rotating file handler is attached only once: the logger object
        is shared by every instance, so adding a handler per instance would
        duplicate each log line.
        """
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        already_attached = any(
            isinstance(h, logging.handlers.RotatingFileHandler)
            for h in logger.handlers
        )
        if not already_attached:
            formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
            handler = logging.handlers.RotatingFileHandler("log.log", maxBytes=1024 * 1024)
            handler.setLevel(logging.DEBUG)
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def _get_session(self, url):
        """Return the cached ``aiohttp.ClientSession`` for the host of *url*."""
        parsed_url = urlparse(url)
        key = parsed_url.netloc or parsed_url.hostname
        if key not in self._SESSIONS:
            # NOTE(review): verify_ssl is deprecated in recent aiohttp
            # releases in favour of ssl=False - confirm the pinned version.
            tcp = aiohttp.TCPConnector(
                limit=50,
                keepalive_timeout=120,
                verify_ssl=False,
                local_addr=(self.ip, 0),
            )
            self._SESSIONS[key] = aiohttp.ClientSession(connector=tcp)
        return self._SESSIONS[key]

    def generate_signature(self, method, uri, query_param=None, body=None):
        """Build the ``KEY`` / ``Timestamp`` / ``SIGN`` authentication headers.

        The signed string is ``method\\nuri\\nquery\\nsha512(body)\\ntimestamp``
        and is signed with HMAC-SHA512 using the secret key.
        """
        t = time.time()
        hashed_payload = hashlib.sha512((body or "").encode('utf-8')).hexdigest()
        s = '%s\n%s\n%s\n%s\n%s' % (method, uri, query_param or "", hashed_payload, t)
        sign = hmac.new(
            self.params.secret_key.encode('utf-8'),
            s.encode('utf-8'),
            hashlib.sha512,
        ).hexdigest()
        return {'KEY': self.params.access_key, 'Timestamp': str(t), 'SIGN': sign}

    @staticmethod
    def _build_query(params):
        """Join *params* as ``k=v&k=v`` (no URL-encoding); '' for empty/None."""
        if not params:
            return ''
        return '&'.join(f'{key}={value}' for key, value in params.items())

    async def _request(self, method, uri, body=None, params=None, auth=False):
        """Send one REST request and return ``(response_json, error)``.

        Args:
            method: ``"GET"``, ``"POST"`` or ``"DELETE"``.
            uri: Path joined onto ``self.HOST``.
            body: Payload; JSON-encoded for authenticated POST requests.
            params: Query parameters (may be ``None``).
            auth: Truthy to sign the request.

        Returns:
            ``(json, None)`` for a 2xx status, ``(None, json)`` for any other
            status, and ``(None, exception)`` when the request itself failed.
        """
        url = urljoin(self.HOST, uri)
        if method == "GET":
            headers = {
                "Content-type": "application/x-www-form-urlencoded",
                "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
                              "Chrome/39.0.2171.71 Safari/537.36"
            }
        else:
            headers = {
                "Accept": "application/json",
                "Content-type": "application/json"
            }
        if auth:
            # params may be None; _build_query treats that as an empty query.
            query_param = self._build_query(params)
            if method == "POST":
                if query_param:
                    # POST sends the query inside the URL (params=None below).
                    url += "?" + query_param
                if body:
                    body = ujson.dumps(body)
                headers.update(self.generate_signature(method, uri, query_param, body))
            elif method in ("GET", "DELETE"):
                headers.update(self.generate_signature(method, uri, query_param))
        # Fire the request.
        session = self._get_session(url)
        timeout = aiohttp.ClientTimeout(10)
        msg = "rest请求记录" + str(method) + str(url) + str(params) + str(body)
        self.logger.debug(msg)
        try:
            start_time = time.time()
            if method == "GET":
                response = await session.get(
                    url, params=params, headers=headers, timeout=timeout, proxy=self.proxy)
            elif method == "POST":
                response = await session.post(
                    url, params=None, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
            elif method == "DELETE":
                response = await session.delete(
                    url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
            else:
                # Reported through the normal (None, error) path below.
                raise ValueError(f'unsupported HTTP method: {method}')
            code = response.status
            res = await response.json()
            delay = int(1000 * (time.time() - start_time))
            self.delays.append(delay)
            self.get_delay_info()
            res_msg = msg + f' 回报 {res}'
            self.logger.debug(res_msg)
            if code not in (200, 201, 202, 203, 204, 205, 206):
                print(f'URL:{url} PARAMS:{params} body:{body} ERROR:{res}')
                return None, res
            return res, None
        except Exception as e:
            print(f'{self.name} 请求出错', e)
            self.logger.error('请求错误' + str(e))
            self.logger.error(traceback.format_exc())
            return None, e

    def _emit_order_removed(self, cid):
        """Notify ``onOrder`` that order *cid* is gone with nothing filled."""
        order_event = dict()
        order_event['status'] = "REMOVE"
        order_event['client_id'] = cid
        order_event['filled_price'] = 0.0
        order_event['filled'] = 0.0
        order_event['fee'] = 0.0
        self.callback["onOrder"](order_event)

    async def take_order(self, symbol, amount, origin_side, price, cid="", order_type='limit'):
        """Place an order and report the outcome through ``onOrder``.

        Args:
            symbol: Contract name; must be a key of ``self.exchange_info``.
            amount: Order size in coin units; converted to a whole number of
                contracts by dividing by the contract multiplier.
            origin_side: ``kd`` / ``pd`` / ``kk`` / ``pk`` (see ``_SIDE_MAP``).
            price: Order price.
            cid: Client order id, sent as ``text``.
            order_type: Sent as ``type``.

        Returns:
            The exchange response on success, the error on failure, or
            ``None`` for an unknown side or invalid amount/price.
        """
        if origin_side not in self._SIDE_MAP:
            return None
        side, reduce_only = self._SIDE_MAP[origin_side]
        info = self.exchange_info[symbol]
        # Coin -> contracts.  Decimal avoids float noise truncating e.g.
        # 0.3 / 0.1 (= 2.9999999999999996) down to 2 contracts.
        amount = int(Decimal(str(amount)) / Decimal(str(info.multiplier)))
        if amount <= 0.0 or price <= 0.0:
            self.logger.error(f"下单参数错误 amount:{amount} price:{price}")
            self._emit_order_removed(cid)
            # Do not send an order that was just reported as rejected.
            return None
        if side == 'sell':
            amount = -amount  # sells are sent as a negative size
        params = {
            'text': cid,
            'contract': symbol,
            'size': amount,
            'reduce_only': reduce_only,
            'price': utils.num_to_str(price, info.tickSize),
            'type': order_type,
        }
        if self.params.debug == 'True':
            # Debug mode: simulate latency without sending anything.
            return await asyncio.sleep(0.1)
        response, error = await self._request(
            'POST', '/api/v4/futures/usdt/orders', body=params, auth=1)
        if response:
            order_event = dict()
            order_event['status'] = "NEW"
            order_event['client_id'] = response["text"]
            order_event['order_id'] = response["id"]
            self.callback["onOrder"](order_event)
        if error:
            self._emit_order_removed(params["text"])
            return error
        return response

    async def check_order(self, order_id=None, client_id=None):
        """Query one order and forward its state to ``onOrder``.

        ``order_id`` takes precedence over ``client_id``.  Returns the raw
        response, or ``None`` when the request failed or no id was given.
        """
        lookup = order_id or client_id
        if not lookup:
            return None
        response, error = await self._request(
            'GET', f'/api/v4/futures/usdt/orders/{lookup}', params={}, auth=1)
        if response:
            status = response['status']
            if status in ['cancelled', 'closed', 'finished']:
                # Cancelled or fully filled.
                order_event = dict()
                order_event['client_id'] = response["text"]
                order_event['order_id'] = response['id']
                # Filled contracts (size - left) converted to coin units.
                order_event['filled'] = (
                    abs(float(response["size"])) - abs(float(response["left"]))
                ) * self.multiplier
                order_event['filled_price'] = float(response["price"])
                order_event['fee'] = 0.0
                order_event['status'] = "REMOVE"
                self.callback['onOrder'](order_event)
            elif status in ['open']:
                # Still resting on the book.
                order_event = dict()
                order_event['client_id'] = response["text"]
                order_event['order_id'] = response['id']
                order_event['status'] = "NEW"
                self.callback['onOrder'](order_event)
        return response

    async def cancel_order(self, order_id=None, client_id=None):
        """Cancel one order; ``order_id`` takes precedence over ``client_id``.

        No ``onOrder`` event is emitted here.  Returns the error when the
        request failed, otherwise the response; ``None`` when no id is given.
        """
        lookup = order_id or client_id
        if not lookup:
            return None
        # NOTE(review): 'currency_pair' looks like a spot-API key; confirm
        # whether this futures endpoint needs any query parameter at all.
        params = {"currency_pair": self.symbol}
        response, error = await self._request(
            'DELETE', f'/api/v4/futures/usdt/orders/{lookup}', params=params, auth=1)
        if error:
            return error
        return response

    async def get_order_list(self):
        """Fetch open orders and emit a ``NEW`` ``onOrder`` event for each.

        Returns the raw response (``None`` on failure).

        Raises:
            Exception: If an order's ``side`` is neither ``buy`` nor ``sell``.
        """
        # NOTE(review): check_position filters with 'contract'; confirm that
        # 'currency_pair' is honoured by this futures endpoint.
        params = {
            'currency_pair': self.symbol,
            'status': "open",
        }
        response, error = await self._request(
            'GET', '/api/v4/futures/usdt/orders', params=params, auth=1)
        if response is not None:
            for i in response:
                # NOTE(review): assumes each order carries a 'side' field -
                # verify against the exchange's order schema.
                if i['side'] not in ('buy', 'sell'):
                    raise Exception(f"{self.name} wrong side")
                order_event = dict()
                order_event['price'] = float(i["price"])
                order_event['amount'] = float(i["size"]) * self.multiplier
                order_event['client_id'] = i["text"]
                order_event['order_id'] = i['id']
                order_event['status'] = "NEW"
                self.callback['onOrder'](order_event)
        return response

    async def get_server_time(self):
        """Request the server timestamp; returns the ``(response, error)`` pair."""
        # NOTE(review): this is a v1 path while everything else uses v4 - confirm.
        return await self._request('GET', '/api/v1/timestamp', params={})

    async def get_account(self):
        """Fetch the futures account; returns ``(response, error)``."""
        return await self._request('GET', '/api/v4/futures/usdt/accounts', params={}, auth=1)

    async def get_position(self):
        """Fetch the dual-mode positions of ``self.symbol``; returns ``(response, error)``."""
        return await self._request(
            'GET', f'/api/v4/futures/usdt/dual_comp/positions/{self.symbol}', params={}, auth=1)

    async def get_market_details(self):
        """Fetch all USDT futures contract specs; returns ``(response, error)``."""
        return await self._request('GET', '/api/v4/futures/usdt/contracts', params={}, auth=1)

    async def get_ticker(self):
        """Fetch the ticker of ``self.symbol`` and publish it via ``onTicker``.

        Returns the published dict, or ``None`` when the request failed.
        """
        res, err = await self._request(
            'GET', '/api/v4/futures/usdt/tickers', params={"contract": self.symbol}, auth=1)
        if res:
            # NOTE(review): ask and bid are both taken from 'last', so the
            # mid price always equals the last trade price.
            ap = float(res[0]["last"])
            bp = float(res[0]["last"])
            mp = (ap + bp) * 0.5
            d = {"name": self.name, 'mp': mp, 'bp': bp, 'ap': ap}
            self.callback['onTicker'](d)
            return d
        if err:
            self.logger.error(err)
        return None

    async def before_trade(self):
        """Load trading rules for ``self.symbol``, then set dual mode and leverage."""
        # Contract specification.
        res, err = await self.get_market_details()
        if res:
            for i in res:
                if self.symbol != i['name']:
                    continue
                multiplier = float(i['quanto_multiplier'])
                tick = float(i['order_price_round'])
                # Minimum order size in contracts, converted to coin units.
                step = float(i['order_size_min']) * multiplier
                self.tickSize = tick
                self.multiplier = multiplier
                self.stepSize = step
                # Keep the rules per contract as well.
                exchange_info = model.ExchangeInfo()
                exchange_info.symbol = i['name']
                exchange_info.multiplier = multiplier
                exchange_info.tickSize = tick
                exchange_info.stepSize = step
                self.exchange_info[exchange_info.symbol] = exchange_info
        # Enable dual (hedge) position mode.
        res, err = await self._request(
            'POST', '/api/v4/futures/usdt/dual_mode', params={'dual_mode': "true"}, auth=1)
        if err:
            print(err)
        if res:
            print(res)
        # Leverage.
        res, err = await self._request(
            'POST', f'/api/v4/futures/usdt/dual_comp/positions/{self.symbol}/leverage',
            params={'leverage': "20"}, auth=1)
        if err:
            print(err)
        if res:
            print(res)

    async def _update_equity(self):
        """Refresh ``cash_value`` and publish ``onEquity``.

        Returns the new cash value, or ``None`` when nothing was updated.
        """
        res, err = await self.get_account()
        if err:
            print(err)
        if res and res['currency'] == self.quote:
            cash = float(res['total'])
            self.callback['onEquity']({self.quote: cash})
            self.cash_value = cash
            return cash
        return None

    async def get_equity(self):
        """Refresh the account equity and publish it via ``onEquity``."""
        await self._update_equity()

    async def buy_token(self):
        """Buy the platform token (not implemented)."""
        pass

    async def check_position(self, hold_coin=0.0):
        """Cancel the open orders of ``self.symbol`` and close open positions.

        Each non-zero position is closed with a limit order priced 0.1%
        through the last price.  A position whose contract rules or ticker
        cannot be obtained is logged and skipped so the remaining positions
        are still processed.  ``hold_coin`` is currently unused.
        """
        try:
            self.logger.info("清空挂单")
            response, error = await self._request(
                'DELETE', '/api/v4/futures/usdt/orders',
                params={'contract': self.symbol}, auth=1)
            if response:
                self.logger.info(response)
            if error:
                self.logger.info(error)

            self.logger.info("检查遗漏仓位")
            positions, err = await self._request(
                'GET', '/api/v4/futures/usdt/positions', params={}, auth=1)
            if err:
                self.logger.info(err)
            for i in positions or []:
                symbol = i['contract']
                size = abs(float(i['size']))  # in contracts
                side = i['mode']
                if size == 0:
                    continue
                if symbol not in self.exchange_info:
                    await self.before_trade()
                info = self.exchange_info.get(symbol)
                if info is None:
                    # before_trade only loads the rules of self.symbol.
                    self.logger.error(f'no exchange info for {symbol}, position skipped')
                    continue
                ticker, ticker_err = await self._request(
                    'GET', '/api/v4/futures/usdt/tickers', params={"contract": symbol}, auth=1)
                if not ticker:
                    self.logger.error(f'no ticker for {symbol}, position skipped: {ticker_err}')
                    continue
                ap = float(ticker[0]["last"])
                bp = float(ticker[0]["last"])
                mp = (ap + bp) * 0.5
                amount = size * info.multiplier  # contracts -> coin units
                tick = info.tickSize
                if side == 'dual_short':
                    # Buy back slightly above the market, floored to the tick.
                    price = float(Decimal(str(mp * 1.001 // tick)) * Decimal(str(tick)))
                    result = await self.take_order(symbol, amount, "pk", price, "t-123", "limit")
                    self.logger.info(result)
                if side == 'dual_long':
                    # Sell slightly below the market, floored to the tick.
                    price = float(Decimal(str(mp * 0.999 // tick)) * Decimal(str(tick)))
                    result = await self.take_order(symbol, amount, "pd", price, "t-123", "limit")
                    self.logger.info(result)
        except Exception:
            self.logger.error("清仓程序执行出错")
            self.logger.error(traceback.format_exc())
        return

    async def go(self):
        """Poll equity and positions every 60 s until ``stop_flag`` is set.

        Errors inside an iteration are printed and retried after 10 s.
        Only ``Exception`` is caught, so task cancellation still propagates.
        """
        interval = 60
        await self.before_trade()
        await asyncio.sleep(1)
        while 1:
            try:
                # Stop signal.
                if self.stop_flag:
                    return
                # Account equity.
                cash = await self._update_equity()
                if cash is not None:
                    self.logger.debug(f"rest cash {cash}")
                # Positions.
                res, err = await self.get_position()
                if err:
                    self.logger.info(err)
                if res:
                    p = model.Position()
                    for i in res:
                        symbol = i['contract']
                        size = abs(float(i['size'])) * self.multiplier
                        price = float(i['entry_price'])
                        side = i['mode']
                        if self.symbol != symbol or size == 0:
                            continue
                        if side == 'dual_short':
                            p.shortAvg = price
                            p.shortPos = size
                        if side == 'dual_long':
                            p.longAvg = price
                            p.longPos = size
                    self.callback['onPosition'](p)
                await asyncio.sleep(interval)
                # Latency statistics.
                self.logger.debug(
                    f'{self.name} rest报单延迟 平均{self.avg_delay}ms 最大{self.max_delay}ms')
            except Exception:
                traceback.print_exc()
                await asyncio.sleep(10)

    async def transfor(self):
        """Transfer 400 USDT from the spot wallet to the futures wallet."""
        params = {
            'currency': 'USDT',
            'from': 'spot',
            'to': 'futures',
            'amount': '400',
            'settle': 'USDT',
        }
        response, error = await self._request(
            'POST', '/api/v4/wallet/transfers', body=params, auth=1)
        if response:
            print(response)
        if error:
            print(error)

    def get_delay_info(self):
        """Update ``max_delay`` / ``avg_delay`` from the last 100 latency samples."""
        if not self.delays:
            return
        if len(self.delays) > 100:
            self.delays = self.delays[-100:]
        worst = max(self.delays)
        if worst > self.max_delay:
            self.max_delay = worst
        self.avg_delay = round(sum(self.delays) / len(self.delays), 1)

    async def handle_signals(self, orders):
        """Dispatch strategy instructions as background tasks.

        Keys of *orders* containing ``Cancel`` hold ``[cid, oid]``, keys
        containing ``Limits`` hold a list of
        ``[amount, side, price, cid]`` entries, and keys containing
        ``Check`` hold ``[cid, ...]``.  Cancels are scheduled first, then
        new limit orders, then checks.
        """
        try:
            loop = asyncio.get_event_loop()
            for order_name in orders:
                if 'Cancel' in order_name:
                    cid = orders[order_name][0]
                    oid = orders[order_name][1]
                    if cid:
                        loop.create_task(self.cancel_order(client_id=cid))
                    elif oid:
                        loop.create_task(self.cancel_order(order_id=oid))
            for order_name in orders:
                if 'Limits' in order_name:
                    for i in orders[order_name]:
                        loop.create_task(self.take_order(
                            self.symbol,
                            i[0],
                            i[1],
                            i[2],
                            i[3],
                        ))
            for order_name in orders:
                if 'Check' in order_name:
                    cid = orders[order_name][0]
                    loop.create_task(self.check_order(client_id=cid))
        except Exception:
            # Best effort: record the failure instead of dropping it silently.
            self.logger.error(traceback.format_exc())
            await asyncio.sleep(0.1)