from os import access import aiohttp import time import asyncio import zlib import json, ujson import zlib import hashlib import hmac import base64 import traceback import random import gzip, csv, sys from uuid import uuid4 import logging, logging.handlers from yarl import URL import utils import model from decimal import Decimal def empty_call(msg): # print(msg) pass class BybitUsdtSwapWs: def __init__(self, params: model.ClientParams, colo=0, is_print=0): if colo: print('不支持colo高速线路') self.BaseURL_public = "wss://stream.bybit.com/realtime_public" self.BaseURL_private = "wss://stream.bybit.com/realtime_private" else: self.BaseURL_public = "wss://stream.bybit.com/realtime_public" self.BaseURL_private = "wss://stream.bybit.com/realtime_private" self.params = params self.name = self.params.name self.base = self.params.pair.split('_')[0].upper() self.quote = self.params.pair.split('_')[1].upper() self.symbol = self.base + self.quote self.callback = { "onMarket":self.save_market, "onPosition":empty_call, "onEquity":empty_call, "onOrder":empty_call, "onTicker":empty_call, "onDepth":empty_call, "onExit":empty_call, } self.is_print = is_print self.proxy = None if 'win' in sys.platform: self.proxy = self.params.proxy self.logger = self.get_logger() self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0} self.multiplier = None self.max_buy = 0.0 self.min_sell = 0.0 self.buy_v = 0.0 self.buy_q = 0.0 self.sell_v = 0.0 self.sell_q = 0.0 self.update_t = 0.0 self.depth = [] self.orderbook = dict() self.orderbook['bid'] = dict() self.orderbook['ask'] = dict() self.last_on_depth_time = time.time() self.sub_fast = 0 #### 指定发包ip iplist = utils.get_local_ip_list() self.ip = iplist[int(self.params.ip)] def get_logger(self): logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # log to txt formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s') handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024) handler.setLevel(logging.DEBUG) handler.setFormatter(formatter) logger.addHandler(handler) return logger def save_market(self, msg): date = time.strftime('%Y-%m-%d',time.localtime()) interval = float(self.params.interval) if msg: exchange = msg['name'] if len(msg['data']) > 1: with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv', 'a', newline='', encoding='utf-8') as f: writer = csv.writer(f, delimiter=',') writer.writerow(msg['data']) if self.is_print:print(f'写入行情 {self.symbol}') async def get_sign(self): headers = {} headers['Content-Type'] = 'application/json' headers['X-MBX-APIKEY'] = self.params.access_key params = { 'timestamp':int(time.time())*1000, 'recvWindow':5000, } query_string = "&".join(["{}={}".format(k, params[k]) for k in sorted(params.keys())]) signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest() params['signature']=signature url = 'https://fapi.binance.com/fapi/v1/listenKey' session = aiohttp.ClientSession() response = await session.post( url, params=params, headers=headers, timeout=5, proxy=self.proxy ) login_str = await response.text() await session.close() return ujson.loads(login_str)['listenKey'] def _update_depth(self, msg): t = int(msg['timestamp_e6']) if t > self.update_t: self.update_t = t ###### 维护orderbook if msg['type'] == 'snapshot': self.orderbook = dict() self.orderbook['bid'] = dict() self.orderbook['ask'] = dict() for i in msg['data']['order_book']: if i['side'] == 'Buy': self.orderbook['bid'][float(i['price'])] = float(i['size']) elif i['side'] == 'Sell': self.orderbook['ask'][float(i['price'])] = float(i['size']) else: print('错误类型') elif msg['type'] == 'delta': for _type in msg['data']: for i in msg['data'][_type]: if _type == 'delete': if i['side'] == 'Buy': if float(i['price']) in self.orderbook['bid']: del(self.orderbook['bid'][float(i['price'])]) elif i['side'] == 'Sell': if float(i['price']) in self.orderbook['ask']: del(self.orderbook['ask'][float(i['price'])]) else: print('错误类型') elif _type == 'update': if i['side'] == 'Buy': if float(i['price']) in self.orderbook['bid']: self.orderbook['bid'][float(i['price'])] = float(i['size']) elif i['side'] == 'Sell': if float(i['price']) in self.orderbook['ask']: self.orderbook['ask'][float(i['price'])] = float(i['size']) else: print('错误类型') elif _type == 'insert': if i['side'] == 'Buy': self.orderbook['bid'][float(i['price'])] = float(i['size']) elif i['side'] == 'Sell': self.orderbook['ask'][float(i['price'])] = float(i['size']) else: print('错误类型') else: print('错误类型') else: print('未知depth类型') ###### 限制回调频率 now_time = time.time() if now_time - self.last_on_depth_time >= 0.2 or self.sub_fast: self.last_on_depth_time = time.time() ###### self.ticker_info['bp'] = max(self.orderbook['bid'].keys()) self.ticker_info['ap'] = min(self.orderbook['ask'].keys()) ###### if self.ticker_info['bp'] > self.ticker_info['ap']: raise Exception("增量深度出现错误") ###### self.callback['onTicker'](self.ticker_info) ##### 标准化深度 mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5 step = mp * utils.EFF_RANGE / utils.LEVEL bp = [] ap = [] bv = [0 for _ in range(utils.LEVEL)] av = [0 for _ in range(utils.LEVEL)] for i in range(utils.LEVEL): bp.append(self.ticker_info["bp"]-step*i) for i in range(utils.LEVEL): ap.append(self.ticker_info["ap"]+step*i) # price_thre = self.ticker_info["bp"] - step index = 0 for bid_price in self.orderbook['bid'].keys(): price = bid_price amount = self.orderbook['bid'][bid_price] if price > price_thre: bv[index] += amount else: price_thre -= step index += 1 if index == utils.LEVEL: break bv[index] += amount price_thre = self.ticker_info["ap"] + step index = 0 for ask_price in self.orderbook['ask'].keys(): price = ask_price amount = self.orderbook['ask'][ask_price] if price < price_thre: av[index] += amount else: price_thre += step index += 1 if index == utils.LEVEL: break av[index] += amount self.depth = bp + bv + ap + av self.callback['onDepth']({'name':self.name,'data':self.depth}) # print('更新深度', time.time(),self.depth) # def _update_ticker(self, msg): # if msg['data']['sequence'] > self.update_t: # self.update_t = msg['data']['sequence'] # self.ticker_info['bp'] = float(msg['data']['bestBidPrice']) # self.ticker_info['ap'] = float(msg['data']['bestAskPrice']) # self.callback['onTicker'](self.ticker_info) def _update_trade(self, msg): for i in msg['data']: price = float(i['price']) side = i['side'].lower() amount = float(i['size']) if price > self.max_buy or self.max_buy == 0.0: self.max_buy = price if price < self.min_sell or self.min_sell == 0.0: self.min_sell = price if side == 'buy': self.buy_q += amount self.buy_v += amount*price elif side == 'sell': self.sell_q += amount self.sell_v += amount*price def _update_position(self, msg): pos = model.Position() for i in msg['data']: symbol = i['symbol'] if symbol == self.symbol: amt = float(i["size"]) side = i['side'].lower() ep = float(i["entry_price"]) if side == 'buy': pos.longPos = amt pos.longAvg = ep elif side == 'sell': pos.shortPos = amt pos.shortAvg = ep else: pass self.callback["onPosition"](pos) def _update_account(self, msg): for i in msg['data']: self.callback['onEquity'] = {self.quote:float(i['wallet_balance'])} def _update_order(self, msg): self.logger.debug(f"ws订单推送 {msg}") # print(msg) for i in msg['data']: if self.symbol == i['symbol']: if i["order_status"] == 'New': # 新增订单 order_event = dict() order_event['status'] = "NEW" order_event['filled'] = 0 order_event['filled_price'] = 0 order_event['client_id'] = i["order_link_id"] if "order_link_id" in i else "" order_event['order_id'] = i['order_id'] order_event['fee'] = 0.0 self.callback["onOrder"](order_event) # print('新建',order_event['client_id']) elif i["order_status"] in ['Filled','Cancelled']: # 删除订单 # fee 负数是扣手续费 bitget没有返佣 order_event = dict() order_event['status'] = "REMOVE" order_event['client_id'] = i["order_link_id"] if "order_link_id" in i else "" order_event['order_id'] = i['order_id'] order_event['filled'] = float(i["cum_exec_qty"]) order_event['filled_price'] = float(i["last_exec_price"]) if 'last_exec_price' in i else float(i['price']) order_event['fee'] = float(i['cum_exec_fee']) self.callback["onOrder"](order_event) # print('移除',order_event['client_id']) def _get_data(self): market_data = self.depth + [self.max_buy, self.min_sell] self.max_buy = 0.0 self.min_sell = 0.0 self.buy_v = 0.0 self.buy_q = 0.0 self.sell_v = 0.0 self.sell_q = 0.0 return {'name': self.name,'data':market_data} async def go(self): interval = float(self.params.interval) if self.is_print:print(f'Ws循环器启动 interval {interval}') ### onTrade while 1: try: # 更新市场信息 market_data = self._get_data() self.callback['onMarket']({'name': self.name,'data':market_data}) except: traceback.print_exc() await asyncio.sleep(interval) async def get_token(self, is_auth): # 获取 token if is_auth: uri = "/api/v1/bullet-private" else: uri = "/api/v1/bullet-public" headers = {} if is_auth: now_time = int(time.time()) * 1000 str_to_sign = str(now_time) + "POST" + uri sign = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), str_to_sign.encode('utf-8'), hashlib.sha256).digest()) passphrase = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), self.params.pass_key.encode('utf-8'), hashlib.sha256).digest()) headers = { "KC-API-SIGN": sign.decode(), "KC-API-TIMESTAMP": str(now_time), "KC-API-KEY": self.params.access_key, "KC-API-PASSPHRASE": passphrase.decode(), "Content-Type": "application/json", "KC-API-KEY-VERSION": "2" } headers["User-Agent"] = "kucoin-python-sdk/v1.0" session = aiohttp.ClientSession() response = await session.post( self.BaseURL+uri, timeout=5, headers=headers, proxy=self.proxy ) res = await response.text() res = ujson.loads(res) await session.close() if res["code"] == "200000": token = res["data"]["token"] ws_connect_id = str(uuid4()).replace('-', '') endpoint = res["data"]['instanceServers'][0]['endpoint'] ws_endpoint = f"{endpoint}?token={token}&connectId={ws_connect_id}" encrypt = res["data"]['instanceServers'][0]['encrypt'] if is_auth: ws_endpoint += '&acceptUserMessage=true' return ws_endpoint, encrypt else: raise Exception("kucoin usdt swap 获取token错误") async def run(self, is_auth=0, sub_trade=0, sub_fast=0): '''''' asyncio.create_task(self.run_public(sub_trade, sub_fast)) if is_auth: asyncio.create_task(self.run_private()) while True: await asyncio.sleep(5) async def run_private(self): ''' 订阅private频道 ''' while True: try: ping_time = time.time() # 尝试连接 print(f'{self.name} 尝试连接ws') # 登陆 ws_url = self.BaseURL_private async with aiohttp.ClientSession( connector = aiohttp.TCPConnector( limit=50, keepalive_timeout=120, verify_ssl=False, local_addr=(self.ip,0) ) ).ws_connect( ws_url, proxy=self.proxy, timeout=30, receive_timeout=30, ) as _ws: print(f'{self.name} ws private 连接成功') self.logger.info(f'{self.name} ws private 连接成功') # 先鉴权 # Generate expires. expires = int((time.time() + 10000) * 1000) # Generate signature. signature = str(hmac.new( bytes(self.params.secret_key, "utf-8"), bytes(f"GET/realtime{expires}", "utf-8"), digestmod="sha256" ).hexdigest()) await _ws.send_str(ujson.dumps({ "op":"auth", "args":[ self.params.access_key, expires, signature ] })) # 订阅 channels = [ "position", "wallet", "order", ] for i in channels: sub_str = ujson.dumps({"args": [i], "op":"subscribe"}) await _ws.send_str(sub_str) while True: # 接受消息 try: msg = await _ws.receive(timeout=300) except: print(f'{self.name} ws长时间没有收到消息 private 准备重连...') self.logger.error(f'{self.name} ws长时间没有收到消息 private 准备重连...') break # self.logger.debug(msg) try: msg = ujson.loads(msg.data) except: # self.logger.warning(f'非json格式string:{msg}') pass # print(msg) # 处理消息 if 'topic' in msg: if 'wallet' in msg['topic']:self._update_account(msg) elif 'order' in msg['topic']:self._update_order(msg) elif 'position' in msg['topic']:self._update_position(msg) # heartbeat if time.time() - ping_time > 15: await _ws.send_str('{"op": "ping"}') ping_time = time.time() except: traceback.print_exc() print(f'{self.name} ws连接失败 开始重连...') self.logger.error(f'{self.name} ws连接失败 开始重连...') self.logger.error(traceback.format_exc()) # await asyncio.sleep(1) async def run_public(self, sub_trade=0, sub_fast=0): ''' 订阅public频道 ''' self.sub_fast = sub_fast while True: try: ping_time = time.time() # 尝试连接 print(f'{self.name} 尝试连接ws') # 登陆 url = self.BaseURL_public async with aiohttp.ClientSession( connector = aiohttp.TCPConnector( limit=50, keepalive_timeout=120, verify_ssl=False, local_addr=(self.ip,0) ) ).ws_connect( url, proxy=self.proxy, timeout=30, receive_timeout=30, ) as _ws: print(f'{self.name} ws public 连接成功') self.logger.info(f'{self.name} ws public 连接成功') # 订阅 channels=[ f"orderBookL2_25.{self.symbol}" # 推送频率20ms ] if sub_trade: channels += [ f"trade.{self.symbol}" ] for i in channels: sub_str = ujson.dumps({"args": [i], "op":"subscribe"}) await _ws.send_str(sub_str) while True: # 接受消息 try: msg = await _ws.receive(timeout=30) except: print(f'{self.name} ws长时间没有收到消息 public 准备重连...') self.logger.error(f'{self.name} ws长时间没有收到消息 public 准备重连...') break # self.logger.debug(msg) try: msg = ujson.loads(msg.data) except: # self.logger.warning(f'非json格式string:{msg}') pass # print(msg) # 处理消息 if 'data' in msg: if f'orderBookL2_25.{self.symbol}' == msg['topic']:self._update_depth(msg) elif 'trade' in msg['topic']:self._update_trade(msg) # heartbeat if time.time() - ping_time > 15: await _ws.send_str('{"op": "ping"}') ping_time = time.time() except: traceback.print_exc() print(f'{self.name} ws连接失败 开始重连...') self.logger.error(f'{self.name} ws连接失败 开始重连...') self.logger.error(traceback.format_exc()) # await asyncio.sleep(1) if __name__ == "__main__": p = model.ClientParams() p.name = "" p.pair = "matic_usdt" p.proxy = "http://127.0.0.1:7890" p.access_key = "nVrNVv0HQ9a1IgaDeC" p.secret_key = "7zJpfh8rImdrtNO2GnKnGMscKdAJkVMnt6Jl" p.pass_key = "qwer1234" p.interval = "0.1" p.broker_id = "x-nXtHr5jj" p.debug = "False" ws = BybitUsdtSwapWs(p, is_print=1) loop = asyncio.get_event_loop() tasks = [ asyncio.ensure_future(ws.run(is_auth=1, sub_trade=1)), # asyncio.ensure_future(ws.go()), ] loop.run_until_complete(asyncio.wait(tasks))