import aiohttp import time import asyncio import zlib import json import hmac import base64 import hashlib import traceback import logging, logging.handlers import urllib, sys from urllib import parse from urllib.parse import urljoin import datetime import logging, logging.handlers import model import utils def empty_call(msg): print('空的回调函数') def decimal_amount(amount, d): if int(d) == 0: return str(int(amount)) elif int(d) > 0: return str(round(float(amount), int(d))) def decimal_price(price, d): if int(d) == 0: return str(int(price)) elif int(d) > 0: return str(round(float(price), int(d))) class HuobiUsdtSwapRest: def __init__(self, params:model.ClientParams, colo=0): if colo: print('不支持colo高速线路') self.HOST = 'https://api.hbdm.com' else: self.HOST = 'https://api.hbdm.com' self.params = params self.base = self.params.pair.split('_')[0].upper() self.quote = self.params.pair.split('_')[1].upper() self.symbol = self.base + '-' + self.quote self.data = {} self._SESSIONS = dict() self.data['account'] = {} self.callback = { "onMarket":empty_call, "onPosition":empty_call, "onOrder":empty_call, } self.exchange_info = dict() self.delays = [] self.max_delay = 0.0 self.avg_delay = 0.0 self.proxy = None if 'win' in sys.platform: self.proxy = self.params.proxy self.logger = self.get_logger() self.stop_flag = 0 self.coin_value = 0.0 self.cash_value = 0.0 self.multiplier = None #### 指定发包ip iplist = utils.get_local_ip_list() self.ip = iplist[int(self.params.ip)] def get_delay_info(self): if len(self.delays) > 100: self.delays = self.delays[-100:] if max(self.delays) > self.max_delay:self.max_delay = max(self.delays) self.avg_delay = round(sum(self.delays)/len(self.delays),1) def get_logger(self): logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # log to txt formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s') handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024) handler.setLevel(logging.DEBUG) handler.setFormatter(formatter) logger.addHandler(handler) return logger def _get_session(self, url): key = url if key not in self._SESSIONS: tcp = aiohttp.TCPConnector(limit=50,keepalive_timeout=120,verify_ssl=False,local_addr=(self.ip,0)) session = aiohttp.ClientSession(connector=tcp) self._SESSIONS[key] = session return self._SESSIONS[key] def generate_signature(self, method, params, host_url, request_path): if request_path.startswith("http://") or request_path.startswith("https://"): host_url = urllib.parse.urlparse(request_path).hostname.lower() request_path = '/' + '/'.join(request_path.split('/')[3:]) else: host_url = urllib.parse.urlparse(self.HOST).hostname.lower() sorted_params = sorted(params.items(), key=lambda d: d[0], reverse=False) encode_params = urllib.parse.urlencode(sorted_params) payload = [method, host_url, request_path, encode_params] payload = "\n".join(payload) payload = payload.encode(encoding="UTF8") secret_key = self.params.secret_key.encode(encoding="utf8") digest = hmac.new(secret_key, payload, digestmod=hashlib.sha256).digest() signature = base64.b64encode(digest) signature = signature.decode() return signature async def _request(self, method, uri, body=None, params=None, auth=False): url = urljoin(self.HOST, uri) if auth: timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") params = params if params else {} params.update({"AccessKeyId": self.params.access_key, "SignatureMethod": "HmacSHA256", "SignatureVersion": "2", "Timestamp": timestamp}) host_name = urllib.parse.urlparse(self.HOST).hostname.lower() params["Signature"] = self.generate_signature(method, params, host_name, uri) if method == "GET": headers = { "Content-type": "application/x-www-form-urlencoded", "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/39.0.2171.71 Safari/537.36" } else: headers = { "Accept": "application/json", "Content-type": "application/json" } # 发起请求 session = self._get_session(url) timeout = aiohttp.ClientTimeout(10) msg = "rest请求记录" + str(method) + str(url) + str(params) + str(body) self.logger.debug(msg) try: start_time = time.time() if method == "GET": response = await session.get(url, params=params, headers=headers, timeout=timeout, proxy=self.proxy) elif method == "POST": response = await session.post(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy) elif method == "DELETE": response = await session.delete(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy) code = response.status res = await response.json() delay = int(1000*(time.time() - start_time)) self.delays.append(delay) res_msg = msg + f' 回报 {res}' self.logger.debug(res_msg) if code not in (200, 201, 202, 203, 204, 205, 206): self.logger.error(f'请求错误 {res}') self.logger.error(code) return None ,res return json.loads(res), None except Exception as e: print('网络请求出错', e) self.logger.error('请求错误') self.logger.error(e) return None, e async def take_order(self, symbol, amount, origin_side, price, cid, order_type='LIMIT'): if origin_side =='kd': side = 'buy' positionSide = 'open' elif origin_side =='pd': side = 'sell' positionSide = 'close' elif origin_side =='kk': side = 'sell' positionSide = 'open' elif origin_side =='pk': side = 'buy' positionSide = 'close' else: raise Exception('下单参数错误') params = { 'symbol': symbol, 'quantity': decimal_amount(amount, self.params['decimal_amount']), 'side': side, 'positionSide': positionSide, 'price': decimal_price(price, self.params['decimal_price'] ), 'type':order_type, 'timeInForce':'GTC', } # logger.info(f'下单指令 {params}') if self.params['debug'] == 'True': return await asyncio.sleep(0.1) else: response, error = await self._request('POST', '/linear-swap-api/v1/swap_cross_order', params=params, auth=1) # logger.info(f'下单回报 {response}') if response: order_event = dict() order_event['status'] = "NEW" order_event['client_id'] = cid order_event['order_id'] = response["order_id"] self.callback["onOrder"](order_event) if error: order_event = dict() order_event['status'] = "REMOVE" order_event['client_id'] = cid order_event['filled_price'] = 0.0 order_event['filled'] = 0.0 order_event['fee'] = 0.0 self.callback["onOrder"](order_event) return response # async def take_orders(self, orders): # def change(side): # if side =='kd': # side = 'BUY' # positionSide = 'LONG' # elif side =='pd': # side = 'SELL' # positionSide = 'LONG' # elif side =='kk': # side = 'SELL' # positionSide = 'SHORT' # elif side =='pk': # side = 'BUY' # positionSide = 'SHORT' # else: # raise Exception('下单参数错误') # return side, positionSide # params = {} # data = [] # for i in orders: # data.append({ # 'symbol': i[0], # 'quantity': i[1], # 'side': change(i[2])[0], # 'positionSide': change(i[2])[1], # 'price': i[3], # 'type':'LIMIT', # 'timeInForce':'GTC', # }) # params['batchOrders'] = json.dumps(data) # # logger.info(f'下单指令 {params}') # if self.params['debug'] == 'True': # return await asyncio.sleep(0.1) # else: # response = await self._request('POST', '/fapi/v1/batchOrders', params=params) # # logger.info(f'下单回报 {response}') # return response async def cancel_order(self, order_id=None, client_id=None): if order_id: params = { "contract_code": self.symbol, "order_id": order_id, } response, error = await self._request('POST', f'/linear-swap-api/v1/swap_cross_cancel', params=params, auth=1) if response: pass if error: pass return None async def cancel_all_orders(self): params = { "contract_code": self.symbol } return await self._request('POST', f'/linear-swap-api/v1/swap_cross_cancelall', params=params, auth=1) async def get_order_list(self): params = {'contract_code':self.symbol} response, error = await self._request('POST', '/linear-swap-api/v1/swap_cross_openorders', params=params, auth=1) if response: for i in response: pass # if i['direction'] == 'buy' and i['offset'] == 'open': # side = 'kd' # elif i['direction'] == 'sell' and i['offset'] == 'close': # side = 'pd' # elif i['direction'] == 'sell' and i['offset'] == 'open': # side = 'kk' # elif i['direction'] == 'buy' and i['offset'] == 'close': # side = 'pk' # orders.append({ # 'order_id':i['order_id'], # 'symbol':i['contract_code'], # 'amount':float(i['volume']), # 'side':side, # 'price':float(i['price']), # }) # self.callback['onOrder']({"refresh":orders}) if error: pass return None async def get_server_time(self): params = {} response, error = await self._request('GET', '/api/v1/timestamp', params=params) return response async def get_account(self): return await self._request('POST','/linear-swap-api/v1/swap_cross_account_info', params={}, auth=1) async def get_position(self): '''获取持仓 symbol: BTC-USDT''' return await self._request('POST','/linear-swap-api/v1/swap_position_info', params={'contract_code':self.symbol}, auth=1) async def before_trade(self): '''获取市场信息''' res, err = await self._request('GET',f'/linear-swap-api/v1/swap_contract_info', params={}, auth=1) if err: print(err) if res: for i in res['data']: if self.symbol == i['name']: self.multiplier = float(i['contract_size']) self.tickSize = float(i['price_tick']) self.stepSize = 1*float(i['contract_size']) # 张 转换为 币 #### 保存交易规则信息 exchange_info = model.ExchangeInfo() exchange_info.symbol = i['name'] exchange_info.multiplier = float(i['contract_size']) exchange_info.tickSize = float(i['price_tick']) exchange_info.stepSize = 1*float(i['contract_size']) self.exchange_info[exchange_info.symbol] = exchange_info pass async def go(self): while 1: try: # 更新账户 res, err = await self.get_account() for i in res: self.data['account'][i['asset']] = i['balance'] if self.quote == i['asset'].upper(): cash = float(i['balance']) self.callback['onEquity']({ self.quote:cash }) # 更新仓位 res, err = await self.get_position() if res: p = model.Position() for i in res: if i['symbol'] == self.symbol: if i['positionSide'] == 'LONG': p.longPos = float(i['positionAmt']) p.longAvg = float(i['entryPrice']) if i['positionSide'] == 'SHORT': p.shortPos = float(i['positionAmt']) p.shortAvg = float(i['entryPrice']) self.callback['onPosition'](p) await asyncio.sleep(1) # 打印延迟 self.get_delay_info() except: # traceback.print_exc() await asyncio.sleep(10) def get_data(self): return self.data async def run(self, orders): '''执行策略指令''' try: for order_name in orders: if 'Cancel' in order_name: cid = orders[order_name][0] oid = orders[order_name][1] if cid: asyncio.get_event_loop().create_task(self.cancel_order(client_id=cid)) if oid: asyncio.get_event_loop().create_task(self.cancel_order(order_id=oid)) for order_name in orders: if 'Limits' in order_name: for i in orders[order_name]: asyncio.get_event_loop().create_task(self.take_order( self.symbol, i[0], i[1], i[2], i[3], )) for order_name in orders: if 'Check' in order_name: cid = orders[order_name][0] # oid = orders[order_name][1] asyncio.get_event_loop().create_task(self.check_order(client_id=cid)) except: # traceback.print_exc() await asyncio.sleep(0.1)