import asyncio
import base64
import csv
import functools
import hashlib
import hmac
import json
import logging
import logging.handlers
import random
import sys
import time
import traceback
import zlib

import aiohttp
import ujson
from loguru import logger

import model
import utils


def empty_call(msg):
    """Default no-op callback: accept one message and ignore it."""
    pass


def timeit(func):
    """Decorator that prints the wall-clock duration of each call in ms."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        started = time.time()
        res = func(*args, **kwargs)
        spend_time = round((time.time() - started) * 1000, 5)
        print(f'{func.__name__} 耗时 {spend_time} ms')
        return res
    return wrapper


def inflate(data):
    """Decompress a raw deflate byte string (no zlib header) and return it."""
    decompress = zlib.decompressobj(-zlib.MAX_WBITS)
    inflated = decompress.decompress(data)
    inflated += decompress.flush()
    return inflated


def _bucket_volumes(levels, best_price, step, sign, n_levels):
    """Sum level amounts into ``n_levels`` buckets walking away from the best price.

    Args:
        levels: iterable of ``(price, amount)`` float pairs, ordered from the
            best price outwards.
        best_price: best bid (``sign=-1``) or best ask (``sign=+1``).
        step: bucket width in price units.
        sign: ``-1`` for bids (prices decrease), ``+1`` for asks (prices increase).
        n_levels: number of buckets to return.

    Returns:
        A list of ``n_levels`` accumulated amounts.

    NOTE(review): a level that falls outside the current bucket advances the
    bucket index by exactly one, even if the price jumped more than one
    ``step``. That is the original behaviour and is kept as-is because
    downstream consumers may depend on these exact numbers - confirm whether
    a ``while`` advance is wanted.
    """
    volumes = [0] * n_levels
    threshold = best_price + sign * step
    index = 0
    for price, amount in levels:
        inside = price > threshold if sign < 0 else price < threshold
        if not inside:
            threshold += sign * step
            index += 1
            if index == n_levels:
                break
        volumes[index] += amount
    return volumes


class BinanceUsdtSwapWs:
    """Websocket client for the Binance USDT-margined futures stream.

    Incoming messages are parsed and forwarded to the callables stored in
    ``self.callback`` (keys: onMarket, onPosition, onEquity, onOrder,
    onTicker, onDepth, onExit). By default only ``onMarket`` does anything:
    it appends a CSV row via :meth:`save_market`.
    """

    def __init__(self, params: model.ClientParams, colo=0, is_print=0):
        """Initialise state from ``params``; no network I/O happens here.

        Args:
            params: client configuration. The attributes read in this class
                are ``name``, ``pair`` (``base_quote`` or
                ``base_quote_delivery``), ``proxy``, ``ip``, ``interval``,
                ``access_key`` and ``secret_key``.
            colo: truthy only prints a notice; the same URL is used either way.
            is_print: truthy enables a few progress prints.
        """
        if colo:
            print('不支持colo高速线路')
        self.URL = 'wss://fstream.binance.com/ws/'
        self.params = params
        self.name = self.params.name
        pair_parts = params.pair.split('_')
        self.base = pair_parts[0].upper()
        self.quote = pair_parts[1].upper()
        self.symbol = self.base + self.quote
        if len(pair_parts) > 2:
            self.delivery = pair_parts[2]  # e.g. 210924
            self.symbol += f"_{self.delivery}"
        self.callback = {
            "onMarket": self.save_market,
            "onPosition": empty_call,
            "onEquity": empty_call,
            "onOrder": empty_call,
            "onTicker": empty_call,
            "onDepth": empty_call,
            "onExit": empty_call,
        }
        self.is_print = is_print
        self.proxy = None
        # NOTE(review): 'win' is also a substring of 'darwin', so the proxy is
        # applied on macOS as well - confirm whether that is intended.
        if 'win' in sys.platform:
            self.proxy = self.params.proxy
        self.logger = self.get_logger()
        self.ticker_info = {"name": self.name, 'bp': 0.0, 'ap': 0.0}
        self.stop_flag = 0
        self.public_update_time = time.time()
        self.private_update_time = time.time()
        self.expired_time = 300
        # Highest update id seen so far on the ticker / depth20 streams.
        self.update_flag_u = 0
        # Trade statistics accumulated between two _get_data() calls.
        self.max_buy = 0.0
        self.min_sell = 0.0
        self.buy_v = 0.0
        self.buy_q = 0.0
        self.sell_v = 0.0
        self.sell_q = 0.0
        self.depth = []
        # State of the incremental (diff) order book.
        self.depth_update = []
        self.need_flash = 1
        self.lastUpdateId = None  # the lowercase "u" field
        self.depth_full = {'bids': {}, 'asks': {}}
        # 99 is the "not yet determined" sentinel for the price precision.
        self.decimal = 99
        # Local source IP used for outgoing connections.
        iplist = utils.get_local_ip_list()
        self.ip = iplist[int(self.params.ip)]

    def get_logger(self):
        """Return the module logger, attaching the rotating file handler once.

        The logger is shared per module name, so the handler is only added
        when none is attached yet; otherwise every new instance would
        duplicate each log line. ``backupCount`` is set because a
        RotatingFileHandler never rolls over when it is zero.
        """
        log = logging.getLogger(__name__)
        log.setLevel(logging.DEBUG)
        if not log.handlers:
            formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
            handler = logging.handlers.RotatingFileHandler(
                "log.log", maxBytes=1024 * 1024, backupCount=1
            )
            handler.setLevel(logging.DEBUG)
            handler.setFormatter(formatter)
            log.addHandler(handler)
        return log

    def save_market(self, msg):
        """Append ``msg['data']`` as one CSV row under ``./history/``.

        Nothing is written when ``msg`` is falsy or its data has at most one
        element. The file name is built from ``msg['name']``, the symbol, the
        configured interval and the local date.
        """
        date = time.strftime('%Y-%m-%d', time.localtime())
        interval = self.params.interval
        if not msg:
            return
        exchange = msg['name']
        if len(msg['data']) > 1:
            with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
                      'a', newline='', encoding='utf-8') as f:
                writer = csv.writer(f, delimiter=',')
                writer.writerow(msg['data'])
            if self.is_print:
                print(f'写入行情 {self.symbol}')

    async def _request_listen_key(self, method):
        """Send a signed ``method`` request to the listenKey endpoint.

        Returns the raw response body as text. The session and response are
        closed even when the request raises.
        """
        headers = {
            'Content-Type': 'application/json',
            'X-MBX-APIKEY': self.params.access_key,
        }
        # Keys are inserted in sorted order so the query string that is sent
        # matches the string that is signed; the timestamp is in milliseconds.
        params = {
            'recvWindow': 5000,
            'timestamp': int(time.time() * 1000),
        }
        query_string = "&".join(f"{k}={v}" for k, v in params.items())
        params['signature'] = hmac.new(
            self.params.secret_key.encode(),
            msg=query_string.encode(),
            digestmod=hashlib.sha256,
        ).hexdigest()
        url = 'https://fapi.binance.com/fapi/v1/listenKey'
        async with aiohttp.ClientSession() as session:
            async with session.request(
                method, url, params=params, headers=headers,
                timeout=5, proxy=self.proxy,
            ) as response:
                return await response.text()

    async def get_sign(self):
        """Request a listenKey via POST.

        Returns:
            The ``listenKey`` string, or the placeholder ``'qqlh'`` when the
            response cannot be parsed or lacks that field.
        """
        login_str = await self._request_listen_key('POST')
        self.logger.debug("申请key")
        print(login_str)
        self.logger.debug(login_str)
        try:
            return ujson.loads(login_str)['listenKey']
        except Exception:
            self.logger.error('登录失败')
            return 'qqlh'

    async def long_key(self):
        """Renew the listenKey via PUT and return the decoded JSON response."""
        login_str = await self._request_listen_key('PUT')
        self.logger.debug("续期key")
        self.logger.debug(login_str)
        return ujson.loads(login_str)

    def _check_update_u(self, id):
        """Return 0 and remember ``id`` if it is newer than any seen; else 1."""
        if id > self.update_flag_u:
            self.update_flag_u = id
            return 0
        return 1

    def _update_ticker(self, msg):
        """Handle a bookTicker message: fire onTicker and onDepth.

        Messages whose ``u`` is not newer than the last one seen are dropped.
        The depth published here is ``[bp, bq, ap, aq]``.
        """
        self.public_update_time = time.time()
        msg = ujson.loads(msg)
        if self._check_update_u(msg['u']):
            return
        bp = float(msg['b'])
        bq = float(msg['B'])
        ap = float(msg['a'])
        aq = float(msg['A'])
        self.ticker_info["bp"] = bp
        self.ticker_info["ap"] = ap
        self.callback['onTicker'](self.ticker_info)
        # Normalised depth
        self.depth = [bp, bq, ap, aq]
        self.callback['onDepth']({'name': self.name, 'data': self.depth})

    # @timeit
    def _update_depth20(self, msg):
        """Handle a partial-depth message and publish bucketed depth.

        The published list is ``bp + bv + ap + av``: ``utils.LEVEL`` bucket
        prices and accumulated amounts per side, with a bucket width of
        ``mid * utils.EFF_RANGE / utils.LEVEL`` rounded to ``self.decimal``.
        Stale messages (by ``u``) and messages with an empty side are ignored.
        """
        self.public_update_time = time.time()
        msg = ujson.loads(msg)
        if self._check_update_u(msg['u']):
            return
        bids, asks = msg['b'], msg['a']
        if not bids or not asks:
            # No best price on one side: nothing meaningful to publish.
            return
        self.ticker_info["bp"] = float(bids[0][0])
        self.ticker_info["ap"] = float(asks[0][0])
        self.callback['onTicker'](self.ticker_info)
        if self.decimal == 99:
            self.decimal = utils.num_to_decimal(bids[0][0])
        # Normalised depth
        best_bid = self.ticker_info["bp"]
        best_ask = self.ticker_info["ap"]
        mp = (best_bid + best_ask) * 0.5
        step = round(mp * utils.EFF_RANGE / utils.LEVEL, self.decimal)
        bp = [round(best_bid - step * i, self.decimal) for i in range(utils.LEVEL)]
        ap = [round(best_ask + step * i, self.decimal) for i in range(utils.LEVEL)]
        bv = _bucket_volumes(
            ((float(lvl[0]), float(lvl[1])) for lvl in bids),
            best_bid, step, -1, utils.LEVEL,
        )
        av = _bucket_volumes(
            ((float(lvl[0]), float(lvl[1])) for lvl in asks),
            best_ask, step, 1, utils.LEVEL,
        )
        self.depth = bp + bv + ap + av
        self.callback['onDepth']({'name': self.name, 'data': self.depth})

    @staticmethod
    def _apply_levels(book, levels):
        """Apply ``[price, amount]`` levels to ``book``; amount <= 0 deletes."""
        for level in levels:
            price = float(level[0])
            amount = float(level[1])
            if amount > 0:
                book[price] = amount
            else:
                book.pop(price, None)

    # @timeit
    def _update_depth(self, msg):
        """Handle a diff-depth message and maintain the full local book.

        Events are buffered in ``self.depth_update``. When an event's ``pu``
        differs from the previous buffered event's ``u`` a gap is assumed:
        ``need_flash`` is set, the events before the gap are discarded and
        nothing is published. While ``need_flash`` is set, events are only
        buffered. Otherwise buffered events with ``u >= lastUpdateId`` are
        applied and bucketed depth is published like in
        :meth:`_update_depth20` (without rounding).

        NOTE(review): nothing in this file currently clears ``need_flash``
        (the snapshot resync in ``run`` is commented out) and ``run`` does not
        dispatch to this method, so it is effectively dormant.
        """
        self.public_update_time = time.time()
        msg = ujson.loads(msg)
        self.depth_update.append(msg)
        # Check for missed events: each pu must equal the previous event's u.
        for idx in range(1, len(self.depth_update)):
            if self.depth_update[idx]['pu'] != self.depth_update[idx - 1]['u']:
                self.need_flash = 1
                self.logger.error('发现遗漏增量深度推送 重置绝对深度')
                # Events before the gap can no longer be applied consistently.
                del self.depth_update[:idx]
                return
        if self.need_flash:
            # Keep buffering until a fresh snapshot resets the book.
            return
        pending, self.depth_update = self.depth_update, []
        for event in pending:
            last_id = event['u']
            if last_id < self.lastUpdateId:
                # Older than the local book: discard.
                continue
            self._apply_levels(self.depth_full['bids'], event['b'])
            self._apply_levels(self.depth_full['asks'], event['a'])
            self.lastUpdateId = last_id
        bid_book = self.depth_full['bids']
        ask_book = self.depth_full['asks']
        buyP = sorted(bid_book, reverse=True)  # high to low
        sellP = sorted(ask_book)               # low to high
        if not buyP or not sellP:
            # An empty side means the local book is unusable; ask for a resync.
            self.need_flash = 1
            return
        # update ticker
        self.ticker_info["bp"] = float(buyP[0])
        self.ticker_info["ap"] = float(sellP[0])
        self.callback['onTicker'](self.ticker_info)
        if self.ticker_info["bp"] > self.ticker_info["ap"]:
            # Crossed book: request a resync (this update is still published).
            self.need_flash = 1
        # Normalised depth
        best_bid = self.ticker_info["bp"]
        best_ask = self.ticker_info["ap"]
        mp = (best_bid + best_ask) * 0.5
        step = mp * utils.EFF_RANGE / utils.LEVEL
        bp = [best_bid - step * i for i in range(utils.LEVEL)]
        ap = [best_ask + step * i for i in range(utils.LEVEL)]
        bv = _bucket_volumes(
            ((price, bid_book[price]) for price in buyP),
            best_bid, step, -1, utils.LEVEL,
        )
        av = _bucket_volumes(
            ((price, ask_book[price]) for price in sellP),
            best_ask, step, 1, utils.LEVEL,
        )
        self.depth = bp + bv + ap + av
        self.callback['onDepth']({'name': self.name, 'data': self.depth})

    def _update_trade(self, msg):
        """Accumulate trade statistics from an aggTrade message.

        Tracks the max/min traded price and the buy/sell quantity and
        notional since the last :meth:`_get_data` call. Correcting the depth
        from trades was judged too expensive and is not done here.
        """
        self.public_update_time = time.time()
        msg = ujson.loads(msg)
        price = float(msg['p'])
        if self.decimal == 99:
            self.decimal = utils.num_to_decimal(price)
        amount = float(msg['q'])
        # 0.0 is the "unset" sentinel for both extremes.
        if price > self.max_buy or self.max_buy == 0.0:
            self.max_buy = price
        if price < self.min_sell or self.min_sell == 0.0:
            self.min_sell = price
        if msg['m']:
            # A truthy 'm' is counted as a sell.
            self.sell_q += amount
            self.sell_v += amount * price
        else:
            self.buy_q += amount
            self.buy_v += amount * price

    def _update_account(self, msg):
        """Fire onEquity with ``{quote: wb}`` for the quote-asset balance entry."""
        self.private_update_time = time.time()
        msg = ujson.loads(msg)
        for balance in msg['a']['B']:
            if balance['a'] == self.quote:
                self.callback['onEquity']({self.quote: float(balance['wb'])})

    def _update_order(self, msg):
        """Translate an order push for this symbol into an onOrder event.

        Status ``NEW`` maps to ``"NEW"``; ``CANCELED``/``FILLED``/``EXPIRED``
        map to ``"REMOVE"``; ``PARTIALLY_FILLED`` is ignored; anything else is
        printed and ignored. Orders for other symbols are skipped (exact
        match, consistent with :meth:`_update_position`).
        """
        msg = ujson.loads(msg)
        self.logger.debug(f"ws订单推送 {msg}")
        data = msg['o']
        if data['s'] != self.symbol:
            return
        status = data['X']
        if status == "NEW":
            local_status = "NEW"
        elif status in ("CANCELED", "FILLED", "EXPIRED"):
            local_status = "REMOVE"
        elif status == "PARTIALLY_FILLED":
            return
        else:
            print("未知订单状态", data)
            return
        order_event = {
            'status': local_status,
            'filled_price': float(data['ap']),
            'filled': float(data['z']),
            'client_id': data['c'],
            'order_id': data['i'],
        }
        self.callback['onOrder'](order_event)
        self.private_update_time = time.time()

    def _update_position(self, msg):
        """Fire onPosition with the LONG/SHORT position of this symbol.

        Only fires when the message contains at least one entry for
        ``self.symbol``. Amounts and entry prices are taken as absolute values.
        """
        long_pos, short_pos = 0, 0
        long_avg, short_avg = 0, 0
        msg = ujson.loads(msg)
        is_update = 0
        for entry in msg['a']['P']:
            if entry['s'] != self.symbol:
                continue
            is_update = 1
            if entry['ps'] == 'LONG':
                long_pos += abs(float(entry['pa']))
                long_avg = abs(float(entry['ep']))
            if entry['ps'] == 'SHORT':
                short_pos += abs(float(entry['pa']))
                short_avg = abs(float(entry['ep']))
        if is_update:
            pos = model.Position()
            pos.longPos = long_pos
            pos.longAvg = long_avg
            pos.shortPos = short_pos
            pos.shortAvg = short_avg
            self.callback['onPosition'](pos)
            self.private_update_time = time.time()

    def _get_data(self):
        """Return the current depth plus max/min trade price and reset the stats."""
        market_data = self.depth + [self.max_buy, self.min_sell]
        self.max_buy = 0.0
        self.min_sell = 0.0
        self.buy_v = 0.0
        self.buy_q = 0.0
        self.sell_v = 0.0
        self.sell_q = 0.0
        return {'name': self.name, 'data': market_data}

    async def get_depth_flash(self):
        """Fetch the REST depth snapshot (limit 1000) and return the decoded JSON."""
        headers = {'Content-Type': 'application/json'}
        url = f'https://fapi.binance.com/fapi/v1/depth?symbol={self.symbol}&limit=1000'
        async with aiohttp.ClientSession() as session:
            async with session.get(
                url, headers=headers, timeout=5, proxy=self.proxy
            ) as response:
                depth_flash = await response.text()
        return ujson.loads(depth_flash)

    async def go(self):
        """Fire onMarket with :meth:`_get_data` every ``params.interval`` seconds.

        Runs forever. Errors raised by the callback are printed and the loop
        continues; task cancellation is not swallowed.
        """
        interval = float(self.params.interval)
        if self.is_print:
            print(f'Ws循环器启动 interval {interval}')
        while 1:
            try:
                # Publish the latest market snapshot.
                market_data = self._get_data()
                self.callback['onMarket'](market_data)
            except Exception:
                traceback.print_exc()
            await asyncio.sleep(interval)

    async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
        """Connect, subscribe and dispatch messages, reconnecting on failure.

        Args:
            is_auth: truthy requests a listenKey, connects to the private
                stream URL, and renews the key every 15 minutes.
            sub_trade: truthy additionally subscribes to ``aggTrade``.
            sub_fast: truthy subscribes to ``bookTicker`` instead of
                ``depth20@100ms``.

        Returns only when ``self.stop_flag`` becomes truthy. Any other error,
        a receive timeout, a closed socket or stale data triggers a reconnect
        after a one second pause.
        """
        closing_types = (
            aiohttp.WSMsgType.CLOSE,
            aiohttp.WSMsgType.CLOSING,
            aiohttp.WSMsgType.CLOSED,
            aiohttp.WSMsgType.ERROR,
        )
        while True:
            try:
                # Reset the staleness timers for the new connection.
                self.public_update_time = time.time()
                self.private_update_time = time.time()
                print(f'{self.name} 尝试连接ws')
                # Login
                if is_auth:
                    listenKey = await self.get_sign()
                    listenKeyTime = time.time()
                else:
                    listenKey = 'qqlh'
                ws_url = self.URL + listenKey
                connector = aiohttp.TCPConnector(
                    limit=50,
                    keepalive_timeout=120,
                    verify_ssl=False,
                    local_addr=(self.ip, 0),
                )
                # Both the session and the websocket are context-managed so
                # neither leaks across reconnects.
                async with aiohttp.ClientSession(connector=connector) as session:
                    async with session.ws_connect(
                        ws_url,
                        proxy=self.proxy,
                        timeout=30,
                        receive_timeout=30,
                    ) as _ws:
                        print(f'{self.name} ws连接成功')
                        self.logger.debug(f'{self.name} ws连接成功')
                        # Subscribe
                        symbol = self.symbol.lower()
                        if sub_fast:
                            channels = [f"{symbol}@bookTicker"]
                        else:
                            # Alternative: f"{symbol}@depth@100ms" together
                            # with _update_depth and a snapshot resync.
                            channels = [f"{symbol}@depth20@100ms"]
                        if sub_trade:
                            channels.append(f"{symbol}@aggTrade")
                        sub_str = ujson.dumps({
                            "method": "SUBSCRIBE",
                            "params": channels,
                            "id": random.randint(1, 1000),
                        })
                        await _ws.send_str(sub_str)
                        self.need_flash = 1
                        while True:
                            # Stop signal
                            if self.stop_flag:
                                await _ws.close()
                                return
                            # Receive
                            try:
                                msg = await _ws.receive(timeout=30)
                            except Exception:
                                print(f'{self.name} ws长时间没有收到消息 准备重连...')
                                self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...')
                                break
                            if msg.type in closing_types:
                                self.logger.error(
                                    f'{self.name} ws closed or errored ({msg.type}), reconnecting...'
                                )
                                break
                            # Dispatch text frames only; other frame types
                            # carry no JSON payload.
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                msg = msg.data
                                if 'depthUpdate' in msg:
                                    self._update_depth20(msg)
                                elif 'bookTicker' in msg:
                                    self._update_ticker(msg)
                                elif 'aggTrade' in msg:
                                    self._update_trade(msg)
                                elif 'ACCOUNT_UPDATE' in msg:
                                    self._update_position(msg)
                                    self._update_account(msg)
                                elif 'ORDER_TRADE_UPDATE' in msg:
                                    self._update_order(msg)
                                elif 'ping' in msg:
                                    await _ws.send_str('pong')
                                elif 'listenKeyExpired' in msg:
                                    raise Exception('key过期重连')
                            if is_auth:
                                # Renew the listenKey every 15 minutes.
                                if time.time() - listenKeyTime > 60 * 15:
                                    print('续期listenKey')
                                    listenKeyTime = time.time()
                                    await self.long_key()
                                if time.time() - self.private_update_time > self.expired_time * 5:
                                    raise Exception('长期未更新私有信息重连')
                            if time.time() - self.public_update_time > self.expired_time:
                                raise Exception('长期未更新公有信息重连')
                            # Disabled snapshot resync for the diff-depth path:
                            # if self.need_flash:
                            #     depth_flash = await self.get_depth_flash()
                            #     self.lastUpdateId = depth_flash['lastUpdateId']
                            #     self.depth_full['bids'] = dict()
                            #     self.depth_full['asks'] = dict()
                            #     for i in depth_flash['bids']:self.depth_full['bids'][float(i[0])] = float(i[1])
                            #     for i in depth_flash['asks']:self.depth_full['asks'][float(i[0])] = float(i[1])
                            #     self.need_flash = 0
            except Exception:
                # Task cancellation and KeyboardInterrupt are deliberately
                # not caught here so the task can actually be stopped.
                traceback.print_exc()
                print(f'{self.name} ws连接失败 开始重连...')
                self.logger.error(f'{self.name} ws连接失败 开始重连...')
                self.logger.error(traceback.format_exc())
            # Short back-off before every reconnect attempt.
            await asyncio.sleep(1)