import aiohttp import time import asyncio import zlib import json, ujson import zlib import hashlib import hmac import base64 import traceback import random, csv, sys, utils import logging, logging.handlers import model def empty_call(msg): pass class CoinExSpotWs: def __init__(self, params:model.ClientParams, colo=0, is_print=0): if colo: print('不支持colo高速线路') self.URL = 'wss://socket.coinex.com/' else: self.URL = 'wss://socket.coinex.com/' self.params = params self.name = self.params.name self.base = self.params.pair.split('_')[0].upper() self.quote = self.params.pair.split('_')[1].upper() self.symbol = self.base + self.quote self.callback = { "onMarket":self.save_market, "onDepth":empty_call, "onPosition":empty_call, "onEquity":empty_call, "onOrder":empty_call, "onTicker":empty_call, "onDepth":empty_call, "onExit":empty_call, } self.is_print = is_print self.proxy = None if 'win' in sys.platform: self.proxy = self.params.proxy self.logger = self.get_logger() self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0} self.stop_flag = 0 self.max_buy = 0.0 self.min_sell = 0.0 self.buy_v = 0.0 self.buy_q = 0.0 self.sell_v = 0.0 self.sell_q = 0.0 self.update_t = 0 self.depth = [] #### 指定发包ip iplist = utils.get_local_ip_list() self.ip = iplist[int(self.params.ip)] def get_logger(self): logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # log to txt formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s') handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024) handler.setLevel(logging.DEBUG) handler.setFormatter(formatter) logger.addHandler(handler) return logger def save_market(self, msg): date = time.strftime('%Y-%m-%d',time.localtime()) interval = self.params.interval if msg: exchange = msg['name'] if len(msg['data']) > 1: with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv', 'a', newline='', encoding='utf-8') as f: writer = csv.writer(f, delimiter=',') writer.writerow(msg['data']) if self.is_print:print(f'写入行情 {self.symbol}') async def get_depth_flash(self): headers = {} headers['Content-Type'] = 'application/json' headers['X-MBX-APIKEY'] = self.params.access_key url = f'https://api.binance.com/api/v1/depth?symbol={self.symbol}&limit=1000' session = aiohttp.ClientSession() response = await session.get( url, headers=headers, timeout=5, proxy=self.proxy ) depth_flash = await response.text() await session.close() return ujson.loads(depth_flash) def _update_depth(self, msg): msg = ujson.loads(msg) t = float(msg['params'][1]['time']) if t > self.update_t: self.update_t = t self.ticker_info["bp"] = float(msg['params'][1]['bids'][0][0]) self.ticker_info["ap"] = float(msg['params'][1]['asks'][0][0]) self.callback['onTicker'](self.ticker_info) ##### normalize depth mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5 step = mp * utils.EFF_RANGE / utils.LEVEL bp = [] ap = [] bv = [0 for _ in range(utils.LEVEL)] av = [0 for _ in range(utils.LEVEL)] for i in range(utils.LEVEL): bp.append(self.ticker_info["bp"]-step*i) for i in range(utils.LEVEL): ap.append(self.ticker_info["ap"]+step*i) # price_thre = self.ticker_info["bp"] - step index = 0 for bid in msg['params'][1]['bids']: price = float(bid[0]) amount = float(bid[1]) if price > price_thre: bv[index] += amount else: price_thre -= step index += 1 if index == utils.LEVEL: break bv[index] += amount price_thre = self.ticker_info["ap"] + step index = 0 for ask in msg['params'][1]['asks']: price = float(ask[0]) amount = float(ask[1]) if price < price_thre: av[index] += amount else: price_thre += step index += 1 if index == utils.LEVEL: break av[index] += amount self.depth = bp + bv + ap + av self.callback['onDepth']({'name':self.name,'data':self.depth}) else: self.logger.error("coienx ws推送过期信息") def _update_trade(self, msg): msg = json.loads(msg) for i in msg['params'][1]: side = i["type"] price = float(i["price"]) amount = float(i['amount']) if price > self.max_buy or self.max_buy == 0.0: self.max_buy = price if price < self.min_sell or self.min_sell == 0.0: self.min_sell = price if side == 'buy': self.buy_q += amount self.buy_v += amount*price elif side == 'sell': self.sell_q += amount self.sell_v += amount*price def _update_account(self, msg): msg = json.loads(msg) for i in msg['params'][0]: if self.quote == i: cash = float(msg['params'][0][self.quote]['available'])+float(msg['params'][0][self.quote]['frozen']) self.callback['onEquity'] = { self.quote:cash } elif self.base == i: coin = float(msg['params'][0][self.base]['available'])+float(msg['params'][0][self.base]['frozen']) self.callback['onEquity'] = { self.base:coin } def _update_order(self, msg): self.logger.debug("ws推送订单"+msg) msg = json.loads(msg) event_type = msg['params'][0] event = msg['params'][1] if event_type == 1: # 新增订单 order_event = dict() order_event['filled'] = 0 order_event['filled_price'] = 0 order_event['client_id'] = event["client_id"] order_event['order_id'] = event['id'] order_event['status'] = "NEW" self.callback["onOrder"](order_event) elif event_type == 3: # 删除订单 order_event = dict() order_event['filled'] = float(event["amount"]) - float(event["left"]) order_event['filled_price'] = float(event["price"]) # asset_fee = float(event["asset_fee"]) money_fee = float(event["money_fee"]) stock_fee = float(event["stock_fee"]) # 非amm品种 优先扣cet 其次u 再次b # amm品种 买入收b 卖出收u if event['side'] == 1: # 卖出 order_event['fee'] = money_fee elif event['side'] == 2: # 买入 order_event['fee'] = stock_fee order_event['client_id'] = event["client_id"] order_event['order_id'] = event['id'] order_event['status'] = "REMOVE" self.callback["onOrder"](order_event) def _update_position(self, msg): long_pos, short_pos = 0, 0 long_avg, short_avg = 0, 0 msg = ujson.loads(msg) for i in msg['a']['P']: if i['s'] == self.symbol: if i['ps'] == 'LONG': long_pos += abs(float(i['pa'])) long_avg = abs(float(i['ep'])) if i['ps'] == 'SHORT': short_pos += abs(float(i['pa'])) short_avg = abs(float(i['ep'])) pos = model.Position() pos.longPos = long_pos pos.longAvg = long_avg pos.shortPos = short_pos pos.shortAvg = short_avg self.callback['onPosition'](pos) def _get_data(self): market_data = self.depth + [self.max_buy, self.min_sell] self.max_buy = 0.0 self.min_sell = 0.0 self.buy_v = 0.0 self.buy_q = 0.0 self.sell_v = 0.0 self.sell_q = 0.0 return {'name': self.name,'data':market_data} async def go(self): interval = float(self.params.interval) if self.is_print:print(f'Ws循环器启动 interval {interval}') ### onTrade while 1: try: # 更新市场信息 market_data = self._get_data() self.callback['onMarket'](market_data) except: traceback.print_exc() await asyncio.sleep(interval) async def run(self, is_auth=0, sub_trade=0, sub_fast=0): ping_time = time.time() while True: try: # 尝试连接 print(f'{self.name} 尝试连接ws') # 登陆 ws_url = self.URL async with aiohttp.ClientSession( connector = aiohttp.TCPConnector( limit=50, keepalive_timeout=120, verify_ssl=False, local_addr=(self.ip,0) ) ).ws_connect( ws_url, proxy=self.proxy, timeout=30, receive_timeout=30, ) as _ws: print(f'{self.name} ws连接成功') self.logger.info(f'{self.name} ws连接成功') # 订阅 coinex 现货 symbol = self.symbol.upper() # 鉴权 if is_auth: current_time = int(time.time()*1000) sign_str = f"access_id={self.params.access_key}&tonce={current_time}&secret_key={self.params.secret_key}" md5 = hashlib.md5(sign_str.encode()) param = { "id": 1, "method": "server.sign", "params": [self.params.access_key, md5.hexdigest().upper(), current_time] } await _ws.send_str(ujson.dumps(param)) res = await _ws.receive(timeout=30) # 订阅资产 sub_str = ujson.dumps({"id": 1, "method": "asset.subscribe","params": [self.base,self.quote]}) await _ws.send_str(sub_str) # 订阅私有订单 sub_str = ujson.dumps({"id": 1, "method": "order.subscribe","params": [symbol]}) await _ws.send_str(sub_str) if sub_trade: # 订阅公开成交 sub_str = ujson.dumps({"id": 1, "method": "deals.subscribe","params": [symbol]}) await _ws.send_str(sub_str) # 订阅深度 sub_str = ujson.dumps({"id": 1, "method": "depth.subscribe","params": [symbol, 50, "0.000000001", False]}) await _ws.send_str(sub_str) while True: # 停机信号 if self.stop_flag: await _ws.close() return # 接受消息 try: msg = await _ws.receive(timeout=30) except asyncio.CancelledError: print('ws取消') return except asyncio.TimeoutError: print(f'{self.name} ws长时间没有收到消息 准备重连...') self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...') break except: print(f'{self.name} ws出现错误 准备重连...') self.logger.error(f'{self.name} ws出现错误 准备重连...') self.logger.error(traceback.format_exc()) break msg = msg.data # 处理消息 if 'depth.update' in msg:self._update_depth(msg) elif 'deals.update' in msg:self._update_trade(msg) elif 'asset.update' in msg:self._update_account(msg) elif 'order.update' in msg:self._update_order(msg) else: print(msg) pass if ping_time - time.time() > 60: ping_time = time.time() sub_str = ujson.dumps({"id": 1, "method": "server.ping","params": []}) await _ws.send_str(sub_str) except: _ws = None traceback.print_exc() print(f'{self.name} ws连接失败 开始重连...') self.logger.error(f'{self.name} ws连接失败 开始重连...') # await asyncio.sleep(1)