mexc_spot_ws.py 14 KB


  1. import random, csv, sys, utils
  2. import logging, logging.handlers
  3. import model
  4. import time
  5. import json, ujson
  6. import asyncio
  7. import aiohttp
  8. import traceback
  9. import hashlib
  10. def empty_call(msg):
  11. print(f'空的回调函数 {msg}')
  12. class MexcSpotWs:
  13. def __init__(self, params:model.ClientParams, colo=0, is_print=0):
  14. if colo:
  15. print('不支持colo高速线路')
  16. self.URL = 'wss://wbs.mexc.com/ws'
  17. else:
  18. self.URL = 'wss://wbs.mexc.com/ws'
  19. self.params = params
  20. self.name = self.params.name
  21. self.base = self.params.pair.split('_')[0].upper()
  22. self.quote = self.params.pair.split('_')[1].upper()
  23. self.symbol = self.base + self.quote
  24. self.callback = {
  25. "onMarket":self.save_market,
  26. "onDepth":empty_call,
  27. "onPosition":empty_call,
  28. "onEquity":empty_call,
  29. "onOrder":empty_call,
  30. "onTicker":empty_call,
  31. "onDepth":empty_call,
  32. "onExit":empty_call,
  33. }
  34. self.is_print = is_print
  35. self.proxy = None
  36. if 'win' in sys.platform:
  37. self.proxy = self.params.proxy
  38. self.logger = self.get_logger()
  39. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  40. self.stop_flag = 0
  41. self.max_buy = 0.0
  42. self.min_sell = 0.0
  43. self.buy_v = 0.0
  44. self.buy_q = 0.0
  45. self.sell_v = 0.0
  46. self.sell_q = 0.0
  47. self.update_t = 0
  48. self.depth = []
  49. #### 指定发包ip
  50. iplist = utils.get_local_ip_list()
  51. self.ip = iplist[int(self.params.ip)]
  52. def get_logger(self):
  53. logger = logging.getLogger(__name__)
  54. logger.setLevel(logging.DEBUG)
  55. # log to txt
  56. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  57. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  58. handler.setLevel(logging.DEBUG)
  59. handler.setFormatter(formatter)
  60. logger.addHandler(handler)
  61. return logger
  62. def save_market(self, msg):
  63. date = time.strftime('%Y-%m-%d',time.localtime())
  64. interval = self.params.interval
  65. if msg:
  66. exchange = msg['name']
  67. if len(msg['data']) > 1:
  68. with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  69. 'a',
  70. newline='',
  71. encoding='utf-8') as f:
  72. writer = csv.writer(f, delimiter=',')
  73. writer.writerow(msg['data'])
  74. if self.is_print:print(f'写入行情 {self.symbol}')
  75. async def get_depth_flash(self):
  76. headers = {}
  77. headers['Content-Type'] = 'application/json'
  78. headers['X-MBX-APIKEY'] = self.params.access_key
  79. url = f'https://api.mexc.com/api/v3/depth?symbol={self.symbol}&limit=1000'
  80. session = aiohttp.ClientSession()
  81. response = await session.get(
  82. url,
  83. headers=headers,
  84. timeout=5,
  85. proxy=self.proxy
  86. )
  87. depth_flash = await response.text()
  88. await session.close()
  89. return ujson.loads(depth_flash)
  90. def _update_depth(self, msg):
  91. msg = ujson.loads(msg)
  92. t = float(msg['params'][1]['time'])
  93. if t > self.update_t:
  94. self.update_t = t
  95. self.ticker_info["bp"] = float(msg['params'][1]['bids'][0][0])
  96. self.ticker_info["ap"] = float(msg['params'][1]['asks'][0][0])
  97. self.callback['onTicker'](self.ticker_info)
  98. ##### normalize depth
  99. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  100. step = mp * utils.EFF_RANGE / utils.LEVEL
  101. bp = []
  102. ap = []
  103. bv = [0 for _ in range(utils.LEVEL)]
  104. av = [0 for _ in range(utils.LEVEL)]
  105. for i in range(utils.LEVEL):
  106. bp.append(self.ticker_info["bp"]-step*i)
  107. for i in range(utils.LEVEL):
  108. ap.append(self.ticker_info["ap"]+step*i)
  109. #
  110. price_thre = self.ticker_info["bp"] - step
  111. index = 0
  112. for bid in msg['params'][1]['bids']:
  113. price = float(bid[0])
  114. amount = float(bid[1])
  115. if price > price_thre:
  116. bv[index] += amount
  117. else:
  118. price_thre -= step
  119. index += 1
  120. if index == utils.LEVEL:
  121. break
  122. bv[index] += amount
  123. price_thre = self.ticker_info["ap"] + step
  124. index = 0
  125. for ask in msg['params'][1]['asks']:
  126. price = float(ask[0])
  127. amount = float(ask[1])
  128. if price < price_thre:
  129. av[index] += amount
  130. else:
  131. price_thre += step
  132. index += 1
  133. if index == utils.LEVEL:
  134. break
  135. av[index] += amount
  136. self.depth = bp + bv + ap + av
  137. self.callback['onDepth']({'name':self.name,'data':self.depth})
  138. else:
  139. self.logger.error("mexc ws推送过期信息")
  140. def _update_trade(self, msg):
  141. msg = json.loads(msg)
  142. for i in msg['params'][1]:
  143. side = i["type"]
  144. price = float(i["price"])
  145. amount = float(i['amount'])
  146. if price > self.max_buy or self.max_buy == 0.0:
  147. self.max_buy = price
  148. if price < self.min_sell or self.min_sell == 0.0:
  149. self.min_sell = price
  150. if side == 'buy':
  151. self.buy_q += amount
  152. self.buy_v += amount*price
  153. elif side == 'sell':
  154. self.sell_q += amount
  155. self.sell_v += amount*price
  156. def _update_account(self, msg):
  157. msg = json.loads(msg)
  158. for i in msg['params'][0]:
  159. if self.quote == i:
  160. cash = float(msg['params'][0][self.quote]['available'])+float(msg['params'][0][self.quote]['frozen'])
  161. self.callback['onEquity'] = {
  162. self.quote:cash
  163. }
  164. elif self.base == i:
  165. coin = float(msg['params'][0][self.base]['available'])+float(msg['params'][0][self.base]['frozen'])
  166. self.callback['onEquity'] = {
  167. self.base:coin
  168. }
  169. def _update_order(self, msg):
  170. self.logger.debug("ws推送订单"+msg)
  171. msg = json.loads(msg)
  172. event_type = msg['params'][0]
  173. event = msg['params'][1]
  174. if event_type == 1: # 新增订单
  175. order_event = dict()
  176. order_event['filled'] = 0
  177. order_event['filled_price'] = 0
  178. order_event['client_id'] = event["client_id"]
  179. order_event['order_id'] = event['id']
  180. order_event['status'] = "NEW"
  181. self.callback["onOrder"](order_event)
  182. elif event_type == 3: # 删除订单
  183. order_event = dict()
  184. order_event['filled'] = float(event["amount"]) - float(event["left"])
  185. order_event['filled_price'] = float(event["price"])
  186. # asset_fee = float(event["asset_fee"])
  187. money_fee = float(event["money_fee"])
  188. stock_fee = float(event["stock_fee"])
  189. # 非amm品种 优先扣cet 其次u 再次b
  190. # amm品种 买入收b 卖出收u
  191. if event['side'] == 1:
  192. # 卖出
  193. order_event['fee'] = money_fee
  194. elif event['side'] == 2:
  195. # 买入
  196. order_event['fee'] = stock_fee
  197. order_event['client_id'] = event["client_id"]
  198. order_event['order_id'] = event['id']
  199. order_event['status'] = "REMOVE"
  200. self.callback["onOrder"](order_event)
  201. def _update_position(self, msg):
  202. long_pos, short_pos = 0, 0
  203. long_avg, short_avg = 0, 0
  204. msg = ujson.loads(msg)
  205. for i in msg['a']['P']:
  206. if i['s'] == self.symbol:
  207. if i['ps'] == 'LONG':
  208. long_pos += abs(float(i['pa']))
  209. long_avg = abs(float(i['ep']))
  210. if i['ps'] == 'SHORT':
  211. short_pos += abs(float(i['pa']))
  212. short_avg = abs(float(i['ep']))
  213. pos = model.Position()
  214. pos.longPos = long_pos
  215. pos.longAvg = long_avg
  216. pos.shortPos = short_pos
  217. pos.shortAvg = short_avg
  218. self.callback['onPosition'](pos)
  219. def _get_data(self):
  220. market_data = self.depth + [self.max_buy, self.min_sell]
  221. self.max_buy = 0.0
  222. self.min_sell = 0.0
  223. self.buy_v = 0.0
  224. self.buy_q = 0.0
  225. self.sell_v = 0.0
  226. self.sell_q = 0.0
  227. return {'name': self.name,'data':market_data}
  228. async def go(self):
  229. interval = float(self.params.interval)
  230. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  231. ### onTrade
  232. while 1:
  233. try:
  234. # 更新市场信息
  235. market_data = self._get_data()
  236. self.callback['onMarket'](market_data)
  237. except:
  238. traceback.print_exc()
  239. await asyncio.sleep(interval)
  240. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  241. ping_time = time.time()
  242. while True:
  243. try:
  244. # 尝试连接
  245. print(f'{self.name} 尝试连接ws')
  246. # 登陆
  247. ws_url = self.URL
  248. async with aiohttp.ClientSession(
  249. connector = aiohttp.TCPConnector(
  250. limit=50,
  251. keepalive_timeout=120,
  252. verify_ssl=False,
  253. local_addr=(self.ip,0)
  254. )
  255. ).ws_connect(
  256. ws_url,
  257. proxy=self.proxy,
  258. timeout=30,
  259. receive_timeout=30,
  260. ) as _ws:
  261. print(f'{self.name} ws连接成功')
  262. self.logger.info(f'{self.name} ws连接成功')
  263. # 订阅 mexc 现货
  264. symbol = self.symbol.upper()
  265. # 鉴权
  266. if is_auth:
  267. current_time = int(time.time()*1000)
  268. sign_str = f"access_id={self.params.access_key}&tonce={current_time}&secret_key={self.params.secret_key}"
  269. md5 = hashlib.md5(sign_str.encode())
  270. param = {
  271. "id": 1,
  272. "method": "server.sign",
  273. "params": [self.params.access_key, md5.hexdigest().upper(), current_time]
  274. }
  275. await _ws.send_str(ujson.dumps(param))
  276. res = await _ws.receive(timeout=30)
  277. # 订阅资产
  278. sub_str = ujson.dumps({"id": 1, "method": "asset.subscribe","params": [self.base,self.quote]})
  279. await _ws.send_str(sub_str)
  280. # 订阅私有订单
  281. sub_str = ujson.dumps({"id": 1, "method": "order.subscribe","params": [symbol]})
  282. await _ws.send_str(sub_str)
  283. if sub_trade:
  284. # 订阅公开成交
  285. sub_str = ujson.dumps({"id": 1, "method": "deals.subscribe","params": [symbol]})
  286. await _ws.send_str(sub_str)
  287. # 订阅深度
  288. sub_str = ujson.dumps({"id": 1, "method": "depth.subscribe","params": [symbol, 50, "0.000000001", False]})
  289. await _ws.send_str(sub_str)
  290. while True:
  291. # 停机信号
  292. if self.stop_flag:
  293. await _ws.close()
  294. return
  295. # 接受消息
  296. try:
  297. msg = await _ws.receive(timeout=30)
  298. except asyncio.CancelledError:
  299. print('ws取消')
  300. return
  301. except asyncio.TimeoutError:
  302. print(f'{self.name} ws长时间没有收到消息 准备重连...')
  303. self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...')
  304. break
  305. except:
  306. print(f'{self.name} ws出现错误 准备重连...')
  307. self.logger.error(f'{self.name} ws出现错误 准备重连...')
  308. self.logger.error(traceback.format_exc())
  309. break
  310. msg = msg.data
  311. # 处理消息
  312. if 'depth.update' in msg:self._update_depth(msg)
  313. elif 'deals.update' in msg:self._update_trade(msg)
  314. elif 'asset.update' in msg:self._update_account(msg)
  315. elif 'order.update' in msg:self._update_order(msg)
  316. else:
  317. print(msg)
  318. pass
  319. if ping_time - time.time() > 60:
  320. ping_time = time.time()
  321. sub_str = ujson.dumps({"id": 1, "method": "server.ping","params": []})
  322. await _ws.send_str(sub_str)
  323. except:
  324. _ws = None
  325. traceback.print_exc()
  326. print(f'{self.name} ws连接失败 开始重连...')
  327. self.logger.error(f'{self.name} ws连接失败 开始重连...')
  328. await asyncio.sleep(1)