from os import times import aiohttp import time import asyncio import zlib import json, ujson import zlib import hashlib import hmac import base64 import traceback import random import gzip, sys import csv import logging, logging.handlers import utils import model import datetime import urllib def empty_call(msg): pass class HuobiUsdtSwapWs: def __init__(self, params:model.ClientParams, colo=0, is_print=0): if colo: print('不支持colo高速线路') self.URL_public = 'wss://api.hbdm.com/linear-swap-ws' self.URL_private = 'wss://api.hbdm.com/linear-swap-notification' else: self.URL_public = 'wss://api.hbdm.com/linear-swap-ws' self.URL_private = 'wss://api.hbdm.com/linear-swap-notification' self.params = params self.name = self.params.name self.base = self.params.pair.split('_')[0].upper() self.quote = self.params.pair.split('_')[1].upper() self.symbol = self.base + '-'+ self.quote self.callback = { "onMarket":self.save_market, "onDepth":empty_call, "onPosition":empty_call, "onEquity":empty_call, "onOrder":empty_call, "onTicker":empty_call, "onDepth":empty_call, "onExit":empty_call, } self.is_print = is_print self.proxy = None if 'win' in sys.platform: self.proxy = self.params.proxy self.logger = self.get_logger() self.stop_flag = 0 self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0} self.public_update_time = time.time() self.private_update_time = time.time() self.expired_time = 300 self.max_buy = 0.0 self.min_sell = 0.0 self.buy_v = 0.0 self.buy_q = 0.0 self.sell_v = 0.0 self.sell_q = 0.0 self.depth = [] #### 指定发包ip iplist = utils.get_local_ip_list() self.ip = iplist[int(self.params.ip)] def get_logger(self): logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # log to txt formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s') handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024) handler.setLevel(logging.DEBUG) handler.setFormatter(formatter) logger.addHandler(handler) return logger def save_market(self, msg): date = time.strftime('%Y-%m-%d',time.localtime()) interval = self.params.interval if msg: name = msg['name'] if len(msg['data']) > 1: with open(f'./history/{name}_{self.symbol}_{interval}_{date}.csv', 'a', newline='', encoding='utf-8') as f: writer = csv.writer(f, delimiter=',') writer.writerow(msg['data']) if self.is_print:print(f'写入行情 {self.symbol}') async def get_sign(self): headers = {} headers['Content-Type'] = 'application/json' headers['X-MBX-APIKEY'] = self.params.access_key params = { 'timestamp':int(time.time())*1000, 'recvWindow':5000, } query_string = "&".join(["{}={}".format(k, params[k]) for k in sorted(params.keys())]) signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest() params['signature']=signature url = 'https://fapi.binance.com/fapi/v1/listenKey' session = aiohttp.ClientSession() response = await session.post( url, params=params, headers=headers, timeout=5, proxy=self.proxy ) login_str = await response.text() await session.close() return eval(login_str)['listenKey'] def _update_depth(self, msg): self.public_update_time = time.time() self.ticker_info["bp"] = float(msg['tick']['bids'][0][0]) self.ticker_info["ap"] = float(msg['tick']['asks'][0][0]) self.callback['onTicker'](self.ticker_info) ##### 标准化深度 mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5 step = mp * utils.EFF_RANGE / utils.LEVEL bp = [] ap = [] bv = [0 for _ in range(utils.LEVEL)] av = [0 for _ in range(utils.LEVEL)] for i in range(utils.LEVEL): bp.append(self.ticker_info["bp"]-step*i) for i in range(utils.LEVEL): ap.append(self.ticker_info["ap"]+step*i) # price_thre = self.ticker_info["bp"] - step index = 0 for bid in msg['tick']['bids']: price = float(bid[0]) amount = float(bid[1]) if price > price_thre: bv[index] += amount else: price_thre -= step index += 1 if index == utils.LEVEL: break bv[index] += amount price_thre = self.ticker_info["ap"] + step index = 0 for ask in msg['tick']['asks']: price = float(ask[0]) amount = float(ask[1]) if price < price_thre: av[index] += amount else: price_thre += step index += 1 if index == utils.LEVEL: break av[index] += amount self.depth = bp + bv + ap + av self.callback['onDepth']({'name':self.name,'data':self.depth}) def _update_trade(self, msg): self.public_update_time = time.time() for i in msg['tick']['data']: price = float(i['price']) side = i['direction'] amount = float(i['amount']) if price > self.max_buy or self.max_buy == 0.0: self.max_buy = price if price < self.min_sell or self.min_sell == 0.0: self.min_sell = price if side == 'buy': self.buy_q += amount self.buy_v += amount*price elif side == 'sell': self.sell_q += amount self.sell_v += amount*price #### 修正ticker #### # if side == 'buy' and price > self.ticker_info['ap']: # self.ticker_info['ap'] = price # self.callback['onTicker'](self.ticker_info) # if side == 'sell' and price < self.ticker_info['bp']: # self.ticker_info['bp'] = price # self.callback['onTicker'](self.ticker_info) def _update_account(self, msg): for i in msg['data']: if i['margin_asset'] == self.quote: cash = i['margin_balance'] self.callback['onEquity']({self.quote:cash}) def _update_order(self, msg): if msg['contract_code'] == self.symbol: if msg['status'] in [3] : # 新增订单 order_event = dict() order_event['status'] = "NEW" order_event['filled'] = 0 order_event['filled_price'] = 0 order_event['client_id'] = msg["client_order_id"] if "client_order_id" in msg else "" order_event['order_id'] = msg['order_id'] order_event['fee'] = 0.0 self.callback["onOrder"](order_event) elif msg['status'] in [5,6,7]: # 删除订单 order_event = dict() order_event['status'] = "REMOVE" order_event['filled'] = float(msg['trade_volume']) order_event['filled_price'] = float(msg['trade_price']) order_event['client_id'] = msg["client_order_id"] if "client_order_id" in msg else "" order_event['order_id'] = msg['order_id'] if msg['fee_asset'] == self.quote: order_event['fee'] = float(msg['trade_fee']) self.callback["onOrder"](order_event) def _update_position(self, msg): p = model.Position() for i in msg['data']: if i['pair'] == self.symbol: if i['direction'] == 'buy': p.longPos = float(i['volume']) p.longAvg = float(i['cost_hold']) if i['direction'] == 'sell': p.shortPos = float(i['volume']) p.shortAvg = float(i['cost_hold']) self.callback['onPosition'](p) def _get_data(self): market_data = self.depth + [self.max_buy, self.min_sell] self.max_buy = 0.0 self.min_sell = 0.0 self.buy_v = 0.0 self.buy_q = 0.0 self.sell_v = 0.0 self.sell_q = 0.0 return {'name': self.name,'data':market_data} async def go(self): interval = float(self.params.interval) if self.is_print:print(f'Ws循环器启动 interval {interval}') ### onTrade while 1: try: # 更新市场信息 market_data = self._get_data() self.callback['onMarket'](market_data) except: traceback.print_exc() await asyncio.sleep(interval) async def run(self, is_auth=0, sub_trade=0, sub_fast=0): # run asyncio.create_task(self.run_public(sub_trade=0, sub_fast=0)) if is_auth: asyncio.create_task(self.run_private()) while True: await asyncio.sleep(5) async def run_public(self, sub_trade=0, sub_fast=0): while True: try: # 尝试连接 print(f'{self.name} 尝试连接ws public') # 登陆 ws_url = self.URL_public async with aiohttp.ClientSession( connector = aiohttp.TCPConnector( limit=50, keepalive_timeout=120, verify_ssl=False, local_addr=(self.ip,0) ) ).ws_connect( ws_url, proxy=self.proxy, timeout=30, receive_timeout=30, ) as _ws: print(f'{self.name} ws public 连接成功') # 订阅 symbol = self.symbol channels=[ f"market.{symbol}.depth.step6", ] if sub_trade: channels.append(f"market.{symbol}.trade.detail") for i in channels: sub_str = json.dumps({"sub": i}) await _ws.send_str(sub_str) while True: # 停机信号 if self.stop_flag:return # 接受消息 try: msg = await _ws.receive(timeout=30) except: print(f'{self.name} ws public 长时间没有收到消息 准备重连...') self.logger.error(f'{self.name} ws public 长时间没有收到消息 准备重连...') break msg = ujson.loads(gzip.decompress(msg.data).decode()) # print(msg) # 处理消息 if 'ch' in msg: if 'depth' in msg['ch']:self._update_depth(msg) if 'trade' in msg['ch']:self._update_trade(msg) if 'ping' in msg: await _ws.send_str(json.dumps({"pong":int(time.time())*1000})) except: traceback.print_exc() print(f'{self.name} ws public 连接失败 开始重连...') self.logger.error(f'{self.name} ws public 连接失败 开始重连...') # await asyncio.sleep(1) async def run_private(self): while True: try: # 尝试连接 print(f'{self.name} 尝试连接ws private') # 登陆 ws_url = self.URL_private async with aiohttp.ClientSession( connector = aiohttp.TCPConnector( limit=50, keepalive_timeout=120, verify_ssl=False, local_addr=(self.ip,0) ) ).ws_connect( ws_url, proxy=self.proxy, timeout=30, receive_timeout=30, ) as _ws: print(f'{self.name} ws private 连接成功') # 订阅 def generate_signature(method, params, host, request_path, secret_key): host_url = urllib.parse.urlparse(host).hostname.lower() sorted_params = sorted(params.items(), key=lambda d: d[0], reverse=False) encode_params = urllib.parse.urlencode(sorted_params) payload = [method, host_url, request_path, encode_params] payload = '{}\n{}\n{}\n{}'.format(payload[0],payload[1],payload[2],payload[3]) payload = payload.encode(encoding="utf8") secret_key = secret_key.encode(encoding="utf8") digest = hmac.new(secret_key, payload, digestmod=hashlib.sha256).digest() signature = base64.b64encode(digest) signature = signature.decode() # print(payload) # digest = hmac.new(secret_key.encode('utf8'), payload.encode( # 'utf8'), digestmod=hashlib.sha256).digest() # signature = base64.b64encode(digest).decode() # get Signature return signature timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") suffix = 'AccessKeyId={}&SignatureMethod=HmacSHA256&SignatureVersion=2&Timestamp={}'.format( self.params.access_key, timestamp) payload = '{}\n{}\n{}\n{}'.format("GET", self.URL_private, "/linear-swap-notification", suffix) digest = hmac.new(self.params.secret_key.encode('utf8'), payload.encode( 'utf8'), digestmod=hashlib.sha256).digest() signature = base64.b64encode(digest).decode() data = { "AccessKeyId": self.params.access_key, "SignatureMethod": "HmacSHA256", "SignatureVersion": "2", "Timestamp": timestamp } # signature = generate_signature("GET", data, self.URL_private, "/swap-notification", self.params.secret_key) data["op"] = "auth" data["type"] = "api" data["Signature"] = signature await _ws.send_str(ujson.dumps(data)) # position positions_cross.$contract_code await _ws.send_str(ujson.dumps({"op":"sub","topic": f"positions_cross.{self.symbol.lower()}"})) # account accounts_cross.$contract_code await _ws.send_str(ujson.dumps({"op":"sub","topic": f"accounts_cross.{self.symbol.lower()}"})) # trade orders_cross.$contract_code await _ws.send_json(ujson.dumps({"op":"sub","topic": f"orders_cross.{self.symbol.lower()}"})) while True: # 停机信号 if self.stop_flag:return # 接受消息 try: msg = await _ws.receive(timeout=30) except: print(f'{self.name} ws private 长时间没有收到消息 准备重连...') self.logger.error(f'{self.name} ws private 长时间没有收到消息 准备重连...') break msg = ujson.loads(gzip.decompress(msg.data).decode()) print(msg) # 处理消息 if 'ch' in msg: if 'positions_cross' in msg['topic']:self._update_position(msg) if 'accounts_cross' in msg['topic']:self._update_account(msg) if 'orders_cross' in msg['topic']:self._update_order(msg) if 'ping' in msg: await _ws.send_str(json.dumps({"pong":int(time.time())*1000})) except: traceback.print_exc() print(f'{self.name} ws private 连接失败 开始重连...') self.logger.error(f'{self.name} ws private 连接失败 开始重连...') # await asyncio.sleep(1)