import random
import csv
import sys
import logging
import logging.handlers
import time
import json
import asyncio
import traceback
import hashlib

import aiohttp
import ujson

import model
import utils


def empty_call(msg):
    """Default placeholder callback: print the payload it was handed."""
    print(f'空的回调函数 {msg}')


class MexcSpotWs:
    """WebSocket client that connects to ``wss://wbs.mexc.com/ws``.

    Incoming frames are routed by method name (``depth.update``,
    ``deals.update``, ``asset.update``, ``order.update``) to the ``_update_*``
    handlers, which in turn invoke the user-replaceable functions stored in
    ``self.callback``.

    Attributes:
        callback: Maps event names (``onMarket``, ``onDepth``, ``onPosition``,
            ``onEquity``, ``onOrder``, ``onTicker``, ``onExit``) to callables
            taking one argument. Replace entries to receive events.
        stop_flag: Set to a truthy value to make :meth:`run` close the socket
            and return on its next loop iteration.
        depth: Latest normalized book, laid out as
            ``bid prices + bid volumes + ask prices + ask volumes``.
    """

    def __init__(self, params: model.ClientParams, colo=0, is_print=0):
        """Set up symbol names, callbacks, logger and the local source IP.

        Args:
            params: Client configuration. This class reads ``name``, ``pair``
                (``"base_quote"``), ``proxy``, ``ip``, ``interval``,
                ``access_key`` and ``secret_key`` from it.
            colo: If truthy, a notice is printed that no co-located endpoint
                is supported; the URL is the same either way.
            is_print: If truthy, progress messages are printed to stdout.
        """
        if colo:
            print('不支持colo高速线路')
        self.URL = 'wss://wbs.mexc.com/ws'
        self.params = params
        self.name = self.params.name
        base, quote = self.params.pair.split('_')[:2]
        self.base = base.upper()
        self.quote = quote.upper()
        self.symbol = self.base + self.quote
        self.callback = {
            "onMarket": self.save_market,
            "onDepth": empty_call,
            "onPosition": empty_call,
            "onEquity": empty_call,
            "onOrder": empty_call,
            "onTicker": empty_call,
            "onExit": empty_call,
        }
        self.is_print = is_print
        self.proxy = None
        # NOTE(review): this substring test also matches 'darwin' (macOS), not
        # only Windows. Left unchanged in case macOS dev machines rely on the
        # proxy - confirm the intent.
        if 'win' in sys.platform:
            self.proxy = self.params.proxy
        self.logger = self.get_logger()
        self.ticker_info = {"name": self.name, 'bp': 0.0, 'ap': 0.0}
        self.stop_flag = 0
        # Trade statistics accumulated between two _get_data() calls.
        self.max_buy = 0.0
        self.min_sell = 0.0
        self.buy_v = 0.0
        self.buy_q = 0.0
        self.sell_v = 0.0
        self.sell_q = 0.0
        # Timestamp of the newest depth update applied so far.
        self.update_t = 0
        self.depth = []
        # Pick the local address used as the source IP for outgoing packets.
        iplist = utils.get_local_ip_list()
        self.ip = iplist[int(self.params.ip)]

    def get_logger(self):
        """Return the module logger, writing DEBUG and above to ``log.log``.

        The rotating file handler is attached only once, so creating several
        clients does not duplicate every log line. ``backupCount=1`` is set
        because ``RotatingFileHandler`` never rolls over when ``backupCount``
        is zero, which would make ``maxBytes`` ineffective.
        """
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        already_attached = any(
            isinstance(existing, logging.handlers.RotatingFileHandler)
            for existing in logger.handlers
        )
        if not already_attached:
            formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
            handler = logging.handlers.RotatingFileHandler(
                "log.log", maxBytes=1024 * 1024, backupCount=1
            )
            handler.setLevel(logging.DEBUG)
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def save_market(self, msg):
        """Append one market-data row to today's CSV under ``./history/``.

        Args:
            msg: Dict with ``name`` (used in the file name) and ``data`` (the
                row to write). Falsy messages and rows with fewer than two
                elements are ignored.
        """
        date = time.strftime('%Y-%m-%d', time.localtime())
        interval = self.params.interval
        if not msg:
            return
        exchange = msg['name']
        if len(msg['data']) > 1:
            path = f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv'
            with open(path, 'a', newline='', encoding='utf-8') as f:
                writer = csv.writer(f, delimiter=',')
                writer.writerow(msg['data'])
            if self.is_print:
                print(f'写入行情 {self.symbol}')

    async def get_depth_flash(self):
        """Fetch a REST order-book snapshot (limit 1000) and return it parsed.

        The session and response are managed with ``async with`` so they are
        released even when the request fails or times out.

        Returns:
            The decoded JSON body of the ``/api/v3/depth`` response.
        """
        headers = {
            'Content-Type': 'application/json',
            'X-MBX-APIKEY': self.params.access_key,
        }
        url = f'https://api.mexc.com/api/v3/depth?symbol={self.symbol}&limit=1000'
        async with aiohttp.ClientSession() as session:
            async with session.get(
                url,
                headers=headers,
                timeout=5,
                proxy=self.proxy,
            ) as response:
                depth_flash = await response.text()
        return ujson.loads(depth_flash)

    def _update_depth(self, msg):
        """Apply a ``depth.update`` frame and publish ticker and depth.

        Frames whose ``time`` is not newer than the last applied one are
        logged as stale and dropped. Otherwise the best bid/ask are pushed to
        ``onTicker`` and the book is bucketed into ``utils.LEVEL`` price bands
        of width ``mid * utils.EFF_RANGE / utils.LEVEL`` per side, then pushed
        to ``onDepth``.

        Args:
            msg: Raw JSON text of the frame.
        """
        msg = ujson.loads(msg)
        book = msg['params'][1]
        t = float(book['time'])
        if t <= self.update_t:
            self.logger.error("mexc ws推送过期信息")
            return

        self.update_t = t
        self.ticker_info["bp"] = float(book['bids'][0][0])
        self.ticker_info["ap"] = float(book['asks'][0][0])
        self.callback['onTicker'](self.ticker_info)

        # Normalize the depth into fixed-width price bands around the mid.
        best_bid = self.ticker_info["bp"]
        best_ask = self.ticker_info["ap"]
        mp = (best_bid + best_ask) * 0.5
        step = mp * utils.EFF_RANGE / utils.LEVEL
        bp = [best_bid - step * i for i in range(utils.LEVEL)]
        ap = [best_ask + step * i for i in range(utils.LEVEL)]
        bv = [0 for _ in range(utils.LEVEL)]
        av = [0 for _ in range(utils.LEVEL)]

        # Bids: walk down the book; when a price falls through the current
        # threshold, advance exactly one band and stop once all are used.
        price_thre = best_bid - step
        index = 0
        for bid in book['bids']:
            price = float(bid[0])
            amount = float(bid[1])
            if price > price_thre:
                bv[index] += amount
            else:
                price_thre -= step
                index += 1
                if index == utils.LEVEL:
                    break
                bv[index] += amount

        # Asks: mirror image of the bid side, walking up the book.
        price_thre = best_ask + step
        index = 0
        for ask in book['asks']:
            price = float(ask[0])
            amount = float(ask[1])
            if price < price_thre:
                av[index] += amount
            else:
                price_thre += step
                index += 1
                if index == utils.LEVEL:
                    break
                av[index] += amount

        self.depth = bp + bv + ap + av
        self.callback['onDepth']({'name': self.name, 'data': self.depth})

    def _update_trade(self, msg):
        """Fold a ``deals.update` frame into the running trade statistics.

        Tracks the highest and lowest trade price seen (``0.0`` means "not set
        yet") and accumulates quantity and notional per side.

        Args:
            msg: Raw JSON text of the frame.
        """
        msg = json.loads(msg)
        for deal in msg['params'][1]:
            side = deal["type"]
            price = float(deal["price"])
            amount = float(deal['amount'])
            if price > self.max_buy or self.max_buy == 0.0:
                self.max_buy = price
            if price < self.min_sell or self.min_sell == 0.0:
                self.min_sell = price
            if side == 'buy':
                self.buy_q += amount
                self.buy_v += amount * price
            elif side == 'sell':
                self.sell_q += amount
                self.sell_v += amount * price

    def _update_account(self, msg):
        """Report quote/base balances from an ``asset.update`` frame.

        For each of the quote and base assets present in the frame, the
        ``onEquity`` callback is called with ``{asset: available + frozen}``.
        The callback is invoked, not reassigned, so the registered handler
        stays in place.

        Args:
            msg: Raw JSON text of the frame.
        """
        msg = json.loads(msg)
        assets = msg['params'][0]
        for asset in assets:
            if asset == self.quote:
                balance = assets[self.quote]
                cash = float(balance['available']) + float(balance['frozen'])
                self.callback['onEquity']({self.quote: cash})
            elif asset == self.base:
                balance = assets[self.base]
                coin = float(balance['available']) + float(balance['frozen'])
                self.callback['onEquity']({self.base: coin})

    def _update_order(self, msg):
        """Translate an ``order.update`` frame into an ``onOrder`` event.

        Event type ``1`` yields a ``"NEW"`` event with zero fill; event type
        ``3`` yields a ``"REMOVE"`` event carrying the filled amount, price
        and fee. Other event types are ignored.

        Args:
            msg: Raw JSON text of the frame.
        """
        self.logger.debug("ws推送订单" + msg)
        msg = json.loads(msg)
        event_type = msg['params'][0]
        event = msg['params'][1]
        if event_type == 1:
            # Order added.
            order_event = dict()
            order_event['filled'] = 0
            order_event['filled_price'] = 0
            order_event['client_id'] = event["client_id"]
            order_event['order_id'] = event['id']
            order_event['status'] = "NEW"
            self.callback["onOrder"](order_event)
        elif event_type == 3:
            # Order removed.
            order_event = dict()
            order_event['filled'] = float(event["amount"]) - float(event["left"])
            order_event['filled_price'] = float(event["price"])
            money_fee = float(event["money_fee"])
            stock_fee = float(event["stock_fee"])
            # Original author's note: non-AMM markets charge the fee in CET
            # first, then the quote asset, then the base asset; AMM markets
            # charge buys in the base asset and sells in the quote asset.
            if event['side'] == 1:
                # Sell.
                order_event['fee'] = money_fee
            elif event['side'] == 2:
                # Buy.
                order_event['fee'] = stock_fee
            order_event['client_id'] = event["client_id"]
            order_event['order_id'] = event['id']
            order_event['status'] = "REMOVE"
            self.callback["onOrder"](order_event)

    def _update_position(self, msg):
        """Aggregate LONG/SHORT positions for this symbol and publish them.

        Reads the entries under ``msg['a']['P']`` whose ``s`` equals this
        client's symbol, sums absolute position amounts per side, keeps the
        last entry price per side, and passes a ``model.Position`` to
        ``onPosition``.

        Args:
            msg: Raw JSON text of the frame.
        """
        long_pos, short_pos = 0, 0
        long_avg, short_avg = 0, 0
        msg = ujson.loads(msg)
        for entry in msg['a']['P']:
            if entry['s'] != self.symbol:
                continue
            if entry['ps'] == 'LONG':
                long_pos += abs(float(entry['pa']))
                long_avg = abs(float(entry['ep']))
            if entry['ps'] == 'SHORT':
                short_pos += abs(float(entry['pa']))
                short_avg = abs(float(entry['ep']))
        pos = model.Position()
        pos.longPos = long_pos
        pos.longAvg = long_avg
        pos.shortPos = short_pos
        pos.shortAvg = short_avg
        self.callback['onPosition'](pos)

    def _get_data(self):
        """Return the current market row and reset the trade accumulators.

        Returns:
            ``{'name': ..., 'data': depth + [max_buy, min_sell]}``.
        """
        market_data = self.depth + [self.max_buy, self.min_sell]
        self.max_buy = 0.0
        self.min_sell = 0.0
        self.buy_v = 0.0
        self.buy_q = 0.0
        self.sell_v = 0.0
        self.sell_q = 0.0
        return {'name': self.name, 'data': market_data}

    async def go(self):
        """Publish a market row to ``onMarket`` every ``params.interval`` seconds.

        Errors raised while building or delivering a row are printed and the
        loop continues.
        """
        interval = float(self.params.interval)
        if self.is_print:
            print(f'Ws循环器启动 interval {interval}')
        while 1:
            try:
                # Refresh and publish the market snapshot.
                market_data = self._get_data()
                self.callback['onMarket'](market_data)
            except Exception:
                traceback.print_exc()
            await asyncio.sleep(interval)

    async def _subscribe(self, ws, is_auth, sub_trade):
        """Send the sign-in (optional) and subscription requests on *ws*."""
        symbol = self.symbol.upper()
        if is_auth:
            # Sign in, then subscribe to the private asset and order streams.
            current_time = int(time.time() * 1000)
            sign_str = f"access_id={self.params.access_key}&tonce={current_time}&secret_key={self.params.secret_key}"
            md5 = hashlib.md5(sign_str.encode())
            param = {
                "id": 1,
                "method": "server.sign",
                "params": [self.params.access_key, md5.hexdigest().upper(), current_time],
            }
            await ws.send_str(ujson.dumps(param))
            # Consume the sign-in reply so it is not dispatched as market data.
            await ws.receive(timeout=30)
            sub_str = ujson.dumps({"id": 1, "method": "asset.subscribe", "params": [self.base, self.quote]})
            await ws.send_str(sub_str)
            sub_str = ujson.dumps({"id": 1, "method": "order.subscribe", "params": [symbol]})
            await ws.send_str(sub_str)
        if sub_trade:
            # Public trades.
            sub_str = ujson.dumps({"id": 1, "method": "deals.subscribe", "params": [symbol]})
            await ws.send_str(sub_str)
        # Order-book depth.
        sub_str = ujson.dumps({"id": 1, "method": "depth.subscribe", "params": [symbol, 50, "0.000000001", False]})
        await ws.send_str(sub_str)

    async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
        """Connect, subscribe and dispatch frames, reconnecting on failure.

        The loop ends when ``stop_flag`` is set or when the receive call is
        cancelled. Any other failure (timeout, socket error, non-text frame,
        handler exception) triggers a reconnect after a one-second pause.

        Args:
            is_auth: If truthy, sign in and subscribe to asset/order streams.
            sub_trade: If truthy, subscribe to the public trade stream.
            sub_fast: Accepted for interface compatibility; not used here.
        """
        ping_time = time.time()
        while True:
            try:
                print(f'{self.name} 尝试连接ws')
                # NOTE(review): TLS certificate verification is disabled here
                # (verify_ssl=False); confirm this is intended.
                connector = aiohttp.TCPConnector(
                    limit=50,
                    keepalive_timeout=120,
                    verify_ssl=False,
                    local_addr=(self.ip, 0),
                )
                # Entering the session as a context manager guarantees that it
                # and its connector are closed on every reconnect or return.
                async with aiohttp.ClientSession(connector=connector) as session:
                    async with session.ws_connect(
                        self.URL,
                        proxy=self.proxy,
                        timeout=30,
                        receive_timeout=30,
                    ) as _ws:
                        print(f'{self.name} ws连接成功')
                        self.logger.info(f'{self.name} ws连接成功')
                        await self._subscribe(_ws, is_auth, sub_trade)
                        while True:
                            # Stop signal.
                            if self.stop_flag:
                                await _ws.close()
                                return
                            try:
                                msg = await _ws.receive(timeout=30)
                            except asyncio.CancelledError:
                                print('ws取消')
                                return
                            except asyncio.TimeoutError:
                                print(f'{self.name} ws长时间没有收到消息 准备重连...')
                                self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...')
                                break
                            except Exception:
                                print(f'{self.name} ws出现错误 准备重连...')
                                self.logger.error(f'{self.name} ws出现错误 准备重连...')
                                self.logger.error(traceback.format_exc())
                                break

                            payload = msg.data
                            if not isinstance(payload, str):
                                # Close/error/binary frames carry no text to
                                # dispatch; treat them as a dropped connection.
                                print(f'{self.name} ws收到非文本消息 准备重连...')
                                self.logger.error(f'{self.name} ws收到非文本消息 准备重连...')
                                break

                            if 'depth.update' in payload:
                                self._update_depth(payload)
                            elif 'deals.update' in payload:
                                self._update_trade(payload)
                            elif 'asset.update' in payload:
                                self._update_account(payload)
                            elif 'order.update' in payload:
                                self._update_order(payload)
                            else:
                                print(payload)

                            # Keep-alive: ping once more than 60 s have passed
                            # since the previous ping.
                            if time.time() - ping_time > 60:
                                ping_time = time.time()
                                sub_str = ujson.dumps({"id": 1, "method": "server.ping", "params": []})
                                await _ws.send_str(sub_str)
            except Exception:
                # Cancellation is not an Exception subclass on Python 3.8+, so
                # it propagates instead of being retried here.
                traceback.print_exc()
                print(f'{self.name} ws连接失败 开始重连...')
                self.logger.error(f'{self.name} ws连接失败 开始重连...')
            # Brief pause before every reconnect attempt to avoid a hot loop.
            await asyncio.sleep(1)