import asyncio
import base64
import csv
import hashlib
import hmac
import json
import logging
import logging.handlers
import random
import sys
import time
import traceback
import zlib

import aiohttp
import ujson

import model
import utils


def inflate(data):
    """Decompress a raw-deflate payload (no zlib header) and return the bytes."""
    decompress = zlib.decompressobj(-zlib.MAX_WBITS)
    inflated = decompress.decompress(data)
    inflated += decompress.flush()
    return inflated


def empty_call(msg):
    """Default no-op callback; accepts one message and ignores it."""
    pass


def get_sign(secret_key, message):
    """Return the base64-encoded HMAC-SHA512 of ``message`` keyed by ``secret_key``."""
    digest = hmac.new(
        secret_key.encode('utf-8'),
        message.encode('utf-8'),
        hashlib.sha512,
    ).digest()
    return base64.b64encode(digest).decode()


class GateSpotWs:
    """Gate.io spot WebSocket client.

    Incoming channel updates are dispatched to the handlers stored in
    ``self.callback``; replace an entry in that dict to receive the
    corresponding event.
    """

    def __init__(self, params: model.ClientParams, colo=0, is_print=0):
        """Set up endpoints, callbacks and market-data accumulators.

        Args:
            params: client parameters; this class reads ``name``, ``pair``,
                ``proxy``, ``ip``, ``interval``, ``access_key`` and
                ``secret_key`` from it.
            colo: truthy to use the colocation endpoint.
            is_print: truthy to print progress messages.
        """
        if colo:
            print('启用colo高速线路')
            self.URL = 'wss://spotws-private.gateapi.io/ws/v4/'
        else:
            self.URL = 'wss://api.gateio.ws/ws/v4/'
        self.params = params
        self.name = self.params.name
        base, quote = self.params.pair.split('_')[:2]
        self.base = base.upper()
        self.quote = quote.upper()
        self.symbol = self.base + '_' + self.quote
        # The original literal listed "onDepth" twice; one entry is enough.
        self.callback = {
            "onMarket": self.save_market,
            "onDepth": empty_call,
            "onPosition": empty_call,
            "onTicker": empty_call,
            "onEquity": empty_call,
            "onOrder": empty_call,
            "onTrade": empty_call,
            "onExit": empty_call,
        }
        self.is_print = is_print
        self.proxy = None
        # NOTE(review): this substring test is also true for 'darwin' (macOS);
        # confirm whether that is intended.
        if 'win' in sys.platform:
            self.proxy = self.params.proxy
        self.logger = self.get_logger()
        self.ticker_info = {"name": self.name, 'bp': 0.0, 'ap': 0.0}
        self.stop_flag = 0
        self.max_buy = 0.0
        self.min_sell = 0.0
        self.buy_v = 0.0
        self.buy_q = 0.0
        self.sell_v = 0.0
        self.sell_q = 0.0
        self.update_t = 0.0
        self.depth = []
        # Select the local source IP used for outgoing packets.
        iplist = utils.get_local_ip_list()
        self.ip = iplist[int(self.params.ip)]

    def get_logger(self):
        """Return the module logger, writing DEBUG and above to ``log.log``.

        The logger is shared by every instance (``logging.getLogger(__name__)``),
        so the rotating file handler is attached only once; attaching it per
        instance would duplicate every log line.
        """
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        if not logger.handlers:
            formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
            handler = logging.handlers.RotatingFileHandler("log.log", maxBytes=1024 * 1024)
            handler.setLevel(logging.DEBUG)
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def save_market(self, msg):
        """Append ``msg['data']`` as one CSV row to a per-day history file.

        The file lives under ``./history/`` and is named from ``msg['name']``,
        the symbol, the configured interval and the local date.
        """
        date = time.strftime('%Y-%m-%d', time.localtime())
        interval = self.params.interval
        if msg:
            exchange = msg['name']
            if len(msg['data']) > 1:
                with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
                          'a', newline='', encoding='utf-8') as f:
                    writer = csv.writer(f, delimiter=',')
                    writer.writerow(msg['data'])
                if self.is_print:
                    print(f'写入行情 {self.symbol}')

    def _update_ticker(self, msg):
        """Update best bid/ask from a ticker message and fire ``onTicker``."""
        self.ticker_info["bp"] = float(msg['highest_bid'])
        self.ticker_info["ap"] = float(msg['lowest_ask'])
        self.callback['onTicker'](self.ticker_info)

    @staticmethod
    def _bucket_volumes(orders, best_price, step, is_bid):
        """Sum order amounts into ``utils.LEVEL`` price buckets of width ``step``.

        Bucket ``i`` covers the ``i``-th step away from ``best_price``
        (downwards for bids, upwards for asks). ``orders`` must be sorted from
        best price outwards. Orders beyond the last bucket are dropped.
        """
        levels = utils.LEVEL
        volumes = [0 for _ in range(levels)]
        signed_step = -step if is_bid else step
        threshold = best_price + signed_step
        index = 0
        for order in orders:
            price = float(order[0])
            amount = float(order[1])
            # Skip as many buckets as needed: advancing only one bucket per
            # order misplaces volume whenever the book has a gap wider than
            # one step.
            while index < levels and (
                price <= threshold if is_bid else price >= threshold
            ):
                threshold += signed_step
                index += 1
            if index >= levels:
                break
            volumes[index] += amount
        return volumes

    def _update_depth(self, msg):
        """Process an order-book snapshot.

        Updates best bid/ask, fires ``onTicker``, then builds the normalized
        depth vector ``bid prices + bid volumes + ask prices + ask volumes``
        and fires ``onDepth``. Snapshots whose ``t`` is not newer than the
        last one processed are reported and ignored.
        """
        if msg['t'] <= self.update_t:
            print("收到过期depth")
            return
        bids = msg['bids']
        asks = msg['asks']
        if not bids or not asks:
            # A one-sided or empty book has no mid price; skip it instead of
            # raising IndexError and forcing a reconnect.
            return
        self.update_t = msg['t']
        best_bid = float(bids[0][0])
        best_ask = float(asks[0][0])
        self.ticker_info["bp"] = best_bid
        self.ticker_info["ap"] = best_ask
        self.callback['onTicker'](self.ticker_info)
        # Normalize the depth onto a fixed price grid around the mid price.
        mp = (best_bid + best_ask) * 0.5
        step = mp * utils.EFF_RANGE / utils.LEVEL
        bp = [best_bid - step * i for i in range(utils.LEVEL)]
        ap = [best_ask + step * i for i in range(utils.LEVEL)]
        bv = self._bucket_volumes(bids, best_bid, step, True)
        av = self._bucket_volumes(asks, best_ask, step, False)
        self.depth = bp + bv + ap + av
        self.callback['onDepth']({'name': self.name, 'data': self.depth})

    def _update_trade(self, msg):
        """Fold one public trade into the running price extremes and volumes.

        A stored extreme of ``0.0`` means "unset" and is replaced by the
        trade price.
        """
        price = float(msg['price'])
        amount = float(msg['amount'])
        side = msg['side']
        if price > self.max_buy or self.max_buy == 0.0:
            self.max_buy = price
        if price < self.min_sell or self.min_sell == 0.0:
            self.min_sell = price
        if side == 'buy':
            self.buy_q += amount
            self.buy_v += amount * price
        elif side == 'sell':
            self.sell_q += amount
            self.sell_v += amount * price

    def _update_account(self, msg):
        """Fire ``onEquity`` with ``{currency: total}`` for base/quote balances.

        The handler is called, not assigned: assigning the dict to
        ``self.callback['onEquity']`` would replace the registered handler and
        deliver nothing.
        """
        for entry in msg:
            currency = entry['currency'].upper()
            if currency == self.quote:
                self.callback['onEquity']({self.quote: float(entry['total'])})
            elif currency == self.base:
                self.callback['onEquity']({self.base: float(entry['total'])})

    def _update_order(self, msg):
        """Translate order pushes into ``onOrder`` events.

        ``put`` events are reported with status ``NEW``; ``finish`` events
        with status ``REMOVE`` plus filled amount, average fill price and fee.
        Other events are ignored.
        """
        self.logger.debug(f"ws订单推送 {msg}")
        for i in msg:
            if i['event'] == 'put':
                order_event = dict()
                order_event['filled'] = 0
                order_event['filled_price'] = 0
                order_event['client_id'] = i["text"]
                order_event['order_id'] = i['id']
                order_event['status'] = "NEW"
                self.callback['onOrder'](order_event)
            elif i['event'] == 'finish':
                order_event = dict()
                order_event['filled'] = float(i["amount"]) - float(i["left"])
                if order_event['filled'] > 0:
                    order_event['filled_price'] = float(i["filled_total"]) / order_event['filled']
                else:
                    order_event['filled_price'] = 0
                order_event['client_id'] = i["text"]
                order_event['order_id'] = i['id']
                order_event['fee'] = float(i["fee"])
                order_event['status'] = "REMOVE"
                self.callback['onOrder'](order_event)

    # Positions could be updated from fills because account pushes are delayed.
    # However, order and account messages may arrive in either order: on a
    # close, the account push may zero the position first and the sell fill
    # arrive afterwards, driving the position negative.
    def _update_usertrade(self, msg):
        """Not used for now."""
        pass

    def _update_position(self, msg):
        """Build a ``model.Position`` from ``msg[0]['holding']`` and fire ``onPosition``."""
        long_pos, short_pos = 0, 0
        long_avg, short_avg = 0, 0
        for i in msg[0]['holding']:
            if i['side'] == 'long':
                long_pos += float(i['position'])
                long_avg = float(i['avg_cost'])
            if i['side'] == 'short':
                short_pos += float(i['position'])
                short_avg = float(i['avg_cost'])
        pos = model.Position()
        pos.longPos = long_pos
        pos.longAvg = long_avg
        pos.shortPos = short_pos
        pos.shortAvg = short_avg
        self.callback['onPosition'](pos)

    def _get_data(self):
        """Return the current market row and reset the trade accumulators.

        The row is the depth vector followed by ``max_buy`` and ``min_sell``.
        """
        market_data = self.depth + [self.max_buy, self.min_sell]
        self.max_buy = 0.0
        self.min_sell = 0.0
        self.buy_v = 0.0
        self.buy_q = 0.0
        self.sell_v = 0.0
        self.sell_q = 0.0
        return {'name': self.name, 'data': market_data}

    async def go(self):
        """Every ``params.interval`` seconds, emit a market row via ``onMarket``.

        Handler errors are printed and the loop continues; task cancellation
        is allowed to propagate.
        """
        interval = float(self.params.interval)
        if self.is_print:
            print(f'Ws循环器启动 interval {interval}')
        while 1:
            try:
                # Publish the latest market snapshot.
                market_data = self._get_data()
                self.callback['onMarket'](market_data)
            except asyncio.CancelledError:
                raise
            except Exception:
                traceback.print_exc()
            await asyncio.sleep(interval)

    def get_sign(self, message):
        """Return the hex HMAC-SHA512 of ``message`` keyed by ``params.secret_key``."""
        h = hmac.new(self.params.secret_key.encode("utf8"), message.encode("utf8"), hashlib.sha512)
        return h.hexdigest()

    def _subscribe_request(self, channel, payload, auth=False):
        """Build a subscribe request dict for ``channel``, optionally signed."""
        current_time = int(time.time())
        request = {
            "time": current_time,
            "channel": channel,
            "event": "subscribe",
            "payload": payload,
        }
        if auth:
            message = 'channel=%s&event=%s&time=%d' % (channel, "subscribe", current_time)
            request["auth"] = {
                "method": "api_key",
                "KEY": self.params.access_key,
                "SIGN": self.get_sign(message),
            }
        return request

    async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
        """Connect, subscribe and dispatch messages, reconnecting on failure.

        Args:
            is_auth: truthy to subscribe to the private order, user-trade and
                balance channels.
            sub_trade: truthy to subscribe to public trades.
            sub_fast: accepted for interface compatibility; not used here.

        Returns when ``self.stop_flag`` becomes truthy. Cancellation
        propagates to the caller instead of triggering a reconnect.
        """
        closed_types = (
            aiohttp.WSMsgType.CLOSE,
            aiohttp.WSMsgType.CLOSING,
            aiohttp.WSMsgType.CLOSED,
            aiohttp.WSMsgType.ERROR,
        )
        while True:
            try:
                ping_time = time.time()
                # Attempt to connect.
                print(f'{self.name} 尝试连接ws')
                ws_url = self.URL
                # NOTE(review): TLS certificate verification is disabled here.
                connector = aiohttp.TCPConnector(
                    limit=50,
                    keepalive_timeout=120,
                    verify_ssl=False,
                    local_addr=(self.ip, 0),
                )
                # The session is a context manager so it is closed on every
                # reconnect instead of leaking.
                async with aiohttp.ClientSession(connector=connector) as session:
                    async with session.ws_connect(
                        ws_url,
                        proxy=self.proxy,
                        timeout=30,
                        receive_timeout=30,
                    ) as _ws:
                        if self.is_print:
                            print(f'{self.name} ws连接成功')
                        # Private (authenticated) channels.
                        if is_auth:
                            for channel in ("spot.orders", "spot.usertrades", "spot.balances"):
                                request = self._subscribe_request(channel, [self.symbol], auth=True)
                                await _ws.send_str(ujson.dumps(request))
                        # Public trades.
                        if sub_trade:
                            request = self._subscribe_request("spot.trades", [self.symbol])
                            await _ws.send_str(ujson.dumps(request))
                        # Public order book. spot.tickers is deliberately not
                        # subscribed: the author found it too slow.
                        request = self._subscribe_request(
                            "spot.order_book", [self.symbol, "20", "100ms"])
                        await _ws.send_str(ujson.dumps(request))
                        while True:
                            # Stop signal.
                            if self.stop_flag:
                                await _ws.close()
                                return
                            # Receive the next message.
                            try:
                                msg = await _ws.receive(timeout=10)
                            except asyncio.CancelledError:
                                raise
                            except Exception:
                                print(f'{self.name} ws长时间没有收到消息 准备重连...')
                                self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...')
                                break
                            if msg.type in closed_types:
                                # Fail explicitly so the outer handler reconnects.
                                raise ConnectionError(
                                    f'{self.name} ws closed by peer: {msg.type}')
                            data = msg.data
                            # Dispatch channel updates.
                            if msg.type == aiohttp.WSMsgType.TEXT and 'update' in data:
                                update = ujson.loads(data)
                                channel = update['channel']
                                if channel == 'spot.order_book':
                                    self._update_depth(update['result'])
                                elif channel == 'spot.balances':
                                    self._update_account(update['result'])
                                elif channel == 'spot.orders':
                                    self._update_order(update['result'])
                                elif channel == 'spot.trades':
                                    self._update_trade(update['result'])
                            # Application-level ping at most every 5 seconds.
                            if time.time() - ping_time > 5:
                                await _ws.send_str(
                                    '{"time": %d, "channel" : "spot.ping"}' % int(time.time()))
                                ping_time = time.time()
            except asyncio.CancelledError:
                raise
            except Exception:
                traceback.print_exc()
                print(f'{self.name} ws连接失败 开始重连...')
                self.logger.error(f'{self.name} ws连接失败 开始重连...')
                # No back-off between attempts: the author left a one-second
                # sleep disabled at this point.