import asyncio
import base64
import csv
import hashlib
import hmac
import json
import logging
import logging.handlers
import random
import sys
import time
import traceback
import urllib
import zlib
from collections import defaultdict, deque
from datetime import datetime
from itertools import zip_longest

import aiohttp
import ujson

import model
import utils


def empty_call(msg):
    """Default no-op callback: accept one message and ignore it."""
    pass


ZERO = 1e-8


class FtxUsdtSwapWs:
    """Public websocket client for one FTX ``<BASE>-PERP`` market.

    The client subscribes to the ``orderbook`` channel (and optionally
    ``trades``), maintains a local order book that is validated against the
    checksum sent with each update, and pushes derived data to the callables
    stored in ``self.callback``:

    * ``onTicker`` receives ``self.ticker_info`` (best bid / best ask).
    * ``onDepth`` receives ``{'name': ..., 'data': bp + bv + ap + av}``.
    * ``onMarket`` is invoked periodically by :meth:`go`.
    """

    def __init__(self, params: model.ClientParams, colo=0, is_print=0):
        """Set up state for one market; no network I/O happens here.

        Args:
            params: Client configuration. The attributes read in this class
                are ``name``, ``pair`` (``"base_quote"``), ``proxy``, ``ip``
                (index into ``utils.get_local_ip_list()``) and ``interval``.
            colo: Truthy requests a colocation endpoint. It is not supported:
                a notice is printed and the regular URL is used.
            is_print: Truthy enables extra console output.
        """
        if colo:
            print('不支持colo高速线路')
            self.URL = 'wss://ftx.com/ws/'
        else:
            self.URL = 'wss://ftx.com/ws/'
        self.params = params
        self.name = self.params.name
        #
        self.base = params.pair.split('_')[0].upper()
        self.quote = params.pair.split('_')[1].upper()
        self.symbol = f"{self.base}-PERP"
        #
        self.data = dict()
        self.data['trade'] = []
        self.data['force'] = []
        self.callback = {
            "onMarket": self.save_market,
            "onDepth": empty_call,
            "onPosition": empty_call,
            "onEquity": empty_call,
            "onOrder": empty_call,
            "onTicker": empty_call,
            "onExit": empty_call,
        }
        self.depth_update = []
        self.need_flash = 1
        self.updata_u = None
        self.last_update_id = None
        self.is_print = is_print
        self.proxy = None
        # NOTE(review): 'win' is also a substring of 'darwin', so the proxy is
        # applied on macOS as well. Left unchanged in case that is relied on.
        if 'win' in sys.platform:
            self.proxy = self.params.proxy
        self.logger = self.get_logger()
        self.ticker_info = {"name": self.name, 'bp': 0.0, 'ap': 0.0}
        self.stop_flag = 0
        self.public_update_time = time.time()
        self.private_update_time = time.time()
        self.expired_time = 300
        # Trade statistics accumulated between two calls of _get_data().
        self.max_buy = 0.0
        self.min_sell = 0.0
        self.buy_v = 0.0
        self.buy_q = 0.0
        self.sell_v = 0.0
        self.sell_q = 0.0
        self.stepSize = None
        self.tickSize = None
        self.ctVal = None  # contract multiplier
        self.ctMult = None  # contract face value
        # Latest normalised depth vector (bp + bv + ap + av); empty until the
        # first validated order book update.
        self.depth = []
        self._reset_orderbook()
        # Local IP address the outgoing connection is bound to.
        iplist = utils.get_local_ip_list()
        self.ip = iplist[int(self.params.ip)]

    def _reset_orderbook(self) -> None:
        """Drop the local order book and reset its timestamp."""
        self._orderbook_timestamp = 0
        self._orderbook = {side: defaultdict(float) for side in ['bids', 'asks']}

    def save_market(self, msg):
        """Default ``onMarket`` callback: print the market snapshot."""
        print(msg)
        # Disabled CSV persistence, kept for reference:
        # date = time.strftime('%Y-%m-%d', time.localtime())
        # interval = self.params.interval
        # if msg:
        #     exchange = msg['name']
        #     if len(msg['data']) > 1:
        #         with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
        #                   'a',
        #                   newline='',
        #                   encoding='utf-8') as f:
        #             writer = csv.writer(f, delimiter=',')
        #             writer.writerow(msg['data'])
        #         if self.is_print: print(f'写入行情 {self.symbol}')

    def _get_data(self):
        """Return the current market snapshot and reset trade statistics.

        Returns:
            ``{'name': self.name, 'data': self.depth + [max_buy, min_sell]}``.
        """
        market_data = self.depth + [self.max_buy, self.min_sell]
        self.max_buy = 0.0
        self.min_sell = 0.0
        self.buy_v = 0.0
        self.buy_q = 0.0
        self.sell_v = 0.0
        self.sell_q = 0.0
        return {'name': self.name, 'data': market_data}

    async def go(self):
        """Call ``onMarket`` with a fresh snapshot every ``params.interval`` seconds."""
        interval = float(self.params.interval)
        if self.is_print:
            print(f'Ws循环器启动 interval {interval}')
        while 1:
            try:
                # Refresh and publish the market information.
                market_data = self._get_data()
                self.callback['onMarket'](market_data)
            except Exception:
                # A failing callback must not stop the loop; Exception (not a
                # bare except) keeps task cancellation working.
                traceback.print_exc()
            await asyncio.sleep(interval)

    def subscribe_public(self, sub_trade=0):
        """Build the JSON subscribe requests for the public channels.

        Args:
            sub_trade: Truthy additionally subscribes to ``trades``.

        Returns:
            A list of JSON strings, one per channel.
        """
        channels = [
            "orderbook",
            # "ticker"
        ]
        if sub_trade:
            channels.append("trades")
        subs = [
            ujson.dumps({'op': 'subscribe', 'market': self.symbol, 'channel': channel})
            for channel in channels
        ]
        return subs

    async def run_public(self, sub_trade=0):
        """Connect, subscribe and dispatch messages; reconnect on any failure.

        Returns only when ``self.stop_flag`` is set while connected.
        """
        while 1:
            try:
                self.public_update_time = time.time()
                print(f"{self.name} public 尝试连接ws")
                ws_url = self.URL
                connector = aiohttp.TCPConnector(
                    limit=50,
                    keepalive_timeout=120,
                    verify_ssl=False,
                    local_addr=(self.ip, 0),
                )
                # The session is a context manager so it (and its connector)
                # is closed on every exit path instead of leaking per reconnect.
                async with aiohttp.ClientSession(connector=connector) as session:
                    async with session.ws_connect(
                        ws_url,
                        proxy=self.proxy,
                        timeout=30,
                        receive_timeout=30,
                    ) as _ws:
                        print(f"{self.name} public ws连接成功")
                        self.logger.debug(f"{self.name} public ws连接成功")
                        for sub in self.subscribe_public(sub_trade):
                            await _ws.send_str(sub)
                        while True:
                            # Stop signal.
                            if self.stop_flag:
                                await _ws.close()
                                return
                            # Receive the next message.
                            try:
                                msg = await _ws.receive()
                            except Exception:
                                print(f'{self.name} public ws长时间没有收到消息 准备重连...')
                                self.logger.error(f'{self.name} public ws长时间没有收到消息 准备重连...')
                                break
                            if msg.type != aiohttp.WSMsgType.TEXT:
                                # Close / error / binary frames carry no JSON
                                # text for the dispatcher: reconnect.
                                info = f'{self.name} public ws received {msg.type}, reconnecting...'
                                print(info)
                                self.logger.error(info)
                                break
                            await self.on_message_public(_ws, msg.data)
            except Exception:
                traceback.print_exc()
                print(f'{self.name} ws public 连接失败 开始重连...')
                self.logger.error(f'{self.name} ws public 连接失败 开始重连...')
                self.logger.error(traceback.format_exc())
            # Back off briefly before every reconnect attempt.
            await asyncio.sleep(1)

    async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
        """Start the public stream in the background and keep this coroutine alive.

        ``is_auth`` and ``sub_fast`` are accepted but not used here.
        """
        # Keep a reference: the event loop only holds weak references to tasks.
        self._public_task = asyncio.create_task(self.run_public(sub_trade))
        while True:
            await asyncio.sleep(5)

    async def on_message_public(self, _ws, msg):
        """Route one raw text message by substring checks on the payload.

        Args:
            _ws: The websocket, used to answer pings and to resubscribe.
            msg: The raw (still JSON-encoded) message text.
        """
        if "data" in msg:
            # Pushed market data carries a "data" field; highest priority.
            if "ticker" in msg:
                self._update_ticker(msg)
            elif "trades" in msg:
                self._update_trade(msg)
            elif "orderbook" in msg:
                await self._update_depth(_ws, msg)
        elif "type" in msg:
            # Event reports; ignored unless they mention an error.
            if "error" in msg:
                info = f'{self.name} on_message error! --> {msg}'
                print(info)
                self.logger.error(info)
        elif 'ping' in msg:
            await _ws.send_str('pong')
        else:
            print(msg)

    def _update_ticker(self, msg):
        """Update best bid / ask from a ticker message and fire ``onTicker``."""
        self.public_update_time = time.time()
        msg = ujson.loads(msg)
        ticker = msg['data']
        # JSON null decodes to None; the literal string 'null' is also
        # accepted as "missing", as before.
        missing = (None, 'null')
        bp = float(ticker['bid']) if ticker['bid'] not in missing else 0
        ap = float(ticker['ask']) if ticker['ask'] not in missing else 0
        self.ticker_info["bp"] = bp
        self.ticker_info["ap"] = ap
        self.callback['onTicker'](self.ticker_info)

    def _update_trade(self, msg):
        """Fold a trades message into the running trade statistics.

        ``max_buy`` / ``min_sell`` track the highest / lowest trade price seen
        since the last reset (0.0 means "none yet"); ``*_q`` sums sizes and
        ``*_v`` sums size * price per taker side.
        """
        self.public_update_time = time.time()
        msg = ujson.loads(msg)
        for trade in msg['data']:
            price = float(trade['price'])
            amount = float(trade['size'])
            side = trade['side']
            if price > self.max_buy or self.max_buy == 0.0:
                self.max_buy = price
            if price < self.min_sell or self.min_sell == 0.0:
                self.min_sell = price
            if side == 'buy':
                self.buy_q += amount
                self.buy_v += amount * price
            elif side == 'sell':
                self.sell_q += amount
                self.sell_v += amount * price

    @staticmethod
    def _bucket_volumes(orders, first_threshold, step, levels, is_bid):
        """Sum order sizes into ``levels`` consecutive price buckets.

        ``orders`` must be sorted from the best price outwards. An order
        beyond the current threshold moves the threshold by one ``step`` and
        advances to the next bucket; iteration stops once ``levels`` buckets
        are exhausted.

        NOTE(review): the bucket index advances by one per order even when the
        price gap spans several steps; this mirrors the previous behaviour.
        """
        volumes = [0 for _ in range(levels)]
        threshold = first_threshold
        index = 0
        for order in orders:
            price = float(order[0])
            amount = float(order[1])
            inside = price > threshold if is_bid else price < threshold
            if not inside:
                threshold = threshold - step if is_bid else threshold + step
                index += 1
                if index == levels:
                    break
            volumes[index] += amount
        return volumes

    async def _update_depth(self, _ws, msg):
        """Apply an order book message and publish ticker and depth.

        A ``partial`` message replaces the local book; any other type is
        applied as a delta in which size 0 removes a price level. When the
        resulting book matches the message checksum, ``onTicker`` and
        ``onDepth`` are fired; otherwise the book is reset and the channel is
        resubscribed.
        """
        msg = ujson.loads(msg)
        if msg['market'] != self.symbol:
            return
        depth = msg['data']
        action = msg['type']
        if action == 'partial':
            self._reset_orderbook()
        for side in ('bids', 'asks'):
            book = self._orderbook[side]
            for price, size in depth[side]:
                if size:
                    book[price] = size
                else:
                    # Tolerate a delete for a level we never stored; a real
                    # desync is still caught by the checksum below.
                    book.pop(price, None)
        self._orderbook_timestamp = depth['time']
        ob = self.get_orderbook()
        if not self.compare_checksum(ob, depth):
            self._reset_orderbook()
            await self.resubscribe_depth(_ws)
            return

        self.public_update_time = time.time()
        if not ob['bids'] or not ob['asks']:
            # One side is empty: no best bid / ask to derive anything from.
            return
        best_bid = float(ob['bids'][0][0])
        best_ask = float(ob['asks'][0][0])
        self.ticker_info["bp"] = best_bid
        self.ticker_info["ap"] = best_ask
        self.callback['onTicker'](self.ticker_info)

        # Normalised depth: utils.LEVEL price levels per side spaced by `step`.
        mp = (best_bid + best_ask) * 0.5
        step = mp * utils.EFF_RANGE / utils.LEVEL
        bp = [best_bid - step * i for i in range(utils.LEVEL)]
        ap = [best_ask + step * i for i in range(utils.LEVEL)]
        bv = self._bucket_volumes(ob['bids'], best_bid - step, step, utils.LEVEL, True)
        av = self._bucket_volumes(ob['asks'], best_ask + step, step, utils.LEVEL, False)
        self.depth = bp + bv + ap + av
        self.callback['onDepth']({'name': self.name, 'data': self.depth})

    async def resubscribe_depth(self, _ws):
        """Log a checksum mismatch, then unsubscribe and resubscribe ``orderbook``."""
        info = f"{self.name} checksum not correct!"
        print(info)
        self.logger.info(info)
        sub_str = {'op': "unsubscribe", 'market': self.symbol, 'channel': 'orderbook'}
        await _ws.send_str(ujson.dumps(sub_str))
        await asyncio.sleep(1)
        sub_str['op'] = 'subscribe'
        await _ws.send_str(ujson.dumps(sub_str))

    def get_logger(self):
        """Return the module logger, writing DEBUG+ to a rotating ``log.log``.

        The file handler is attached only once, so creating several clients
        does not duplicate every log line.
        """
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        has_file_handler = any(
            isinstance(existing, logging.handlers.RotatingFileHandler)
            for existing in logger.handlers
        )
        if not has_file_handler:
            formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
            handler = logging.handlers.RotatingFileHandler("log.log", maxBytes=1024 * 1024)
            handler.setLevel(logging.DEBUG)
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def get_orderbook(self):
        """Return the local book as sorted ``(price, quantity)`` lists.

        Returns:
            ``{'bids': [...], 'asks': [...]}`` with bids sorted by descending
            price and asks by ascending price; zero-quantity levels are dropped.
        """
        return {
            side: sorted(
                [(price, quantity) for price, quantity in self._orderbook[side].items() if quantity],
                key=lambda order: order[0] * (-1 if side == 'bids' else 1)
            )
            for side in ('bids', 'asks')
        }

    @staticmethod
    def compare_checksum(ob, depth):
        """Check the local order book against the checksum in ``depth``.

        The top 100 bids and asks are interleaved best-first as
        ``price:size`` pairs joined by ``:``, and the CRC32 of that string is
        compared with ``depth['checksum']``.

        Returns:
            True when the computed checksum equals ``depth['checksum']``.
        """
        checksum_data = [
            ':'.join([f'{float(order[0])}:{float(order[1])}' for order in (bid, offer) if order])
            for (bid, offer) in zip_longest(ob['bids'][:100], ob['asks'][:100])
        ]
        cm = int(zlib.crc32(':'.join(checksum_data).encode()))
        return cm == depth['checksum']