coinex_spot_ws.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. import aiohttp
  2. import time
  3. import asyncio
  4. import zlib
  5. import json, ujson
  6. import zlib
  7. import hashlib
  8. import hmac
  9. import base64
  10. import traceback
  11. import random, csv, sys, utils
  12. import logging, logging.handlers
  13. import model
  def empty_call(msg):
      """Default no-op callback: accept one argument and discard it."""
      return None
  16. class CoinExSpotWs:
  def __init__(self, params: model.ClientParams, colo=0, is_print=0):
      """Set up endpoint, callback table and market-data state.

      Args:
          params: Client configuration. This constructor reads ``name``,
              ``pair`` (split on ``_`` into base and quote), ``proxy`` and
              ``ip`` from it.
          colo: When truthy a notice is printed; the same websocket URL is
              used either way.
          is_print: Truthy to enable progress output on stdout.
      """
      # The endpoint is identical with or without `colo`; only the notice
      # differs.
      if colo:
          print('不支持colo高速线路')
      self.URL = 'wss://socket.coinex.com/'

      self.params = params
      self.name = self.params.name
      pair_parts = self.params.pair.split('_')
      self.base = pair_parts[0].upper()
      self.quote = pair_parts[1].upper()
      self.symbol = self.base + self.quote

      # Only market snapshots have a real default handler; every other
      # event starts out as a no-op until a caller replaces it.
      self.callback = {"onMarket": self.save_market}
      self.callback.update(dict.fromkeys(
          ("onDepth", "onPosition", "onEquity", "onOrder", "onTicker", "onExit"),
          empty_call,
      ))

      self.is_print = is_print
      # NOTE(review): this is a substring test, so it is also true for
      # "darwin" and "cygwin", not just Windows - confirm that is intended.
      self.proxy = self.params.proxy if 'win' in sys.platform else None
      self.logger = self.get_logger()

      self.ticker_info = {"name": self.name, 'bp': 0.0, 'ap': 0.0}
      self.stop_flag = 0
      # Trade statistics accumulated between two snapshots.
      self.max_buy = self.min_sell = 0.0
      self.buy_v = self.buy_q = 0.0
      self.sell_v = self.sell_q = 0.0
      self.update_t = 0
      self.depth = []

      # Pick the local address used for outgoing connections.
      self.ip = utils.get_local_ip_list()[int(self.params.ip)]
  56. def get_logger(self):
  57. logger = logging.getLogger(__name__)
  58. logger.setLevel(logging.DEBUG)
  59. # log to txt
  60. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  61. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  62. handler.setLevel(logging.DEBUG)
  63. handler.setFormatter(formatter)
  64. logger.addHandler(handler)
  65. return logger
  66. def save_market(self, msg):
  67. date = time.strftime('%Y-%m-%d',time.localtime())
  68. interval = self.params.interval
  69. if msg:
  70. exchange = msg['name']
  71. if len(msg['data']) > 1:
  72. with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  73. 'a',
  74. newline='',
  75. encoding='utf-8') as f:
  76. writer = csv.writer(f, delimiter=',')
  77. writer.writerow(msg['data'])
  78. if self.is_print:print(f'写入行情 {self.symbol}')
  async def get_depth_flash(self):
      """Fetch an order-book snapshot over REST and return the decoded JSON.

      The session and the response are managed with ``async with`` so both
      are released even when the request times out or raises.

      NOTE(review): the URL targets ``api.binance.com`` and the request
      carries this client's ``access_key`` in an ``X-MBX-APIKEY`` header.
      That looks like a carry-over from another exchange module and sends
      the CoinEx key to a different host - confirm before relying on it.

      Returns:
          The response body parsed with ``ujson.loads``.
      """
      headers = {
          'Content-Type': 'application/json',
          'X-MBX-APIKEY': self.params.access_key,
      }
      url = f'https://api.binance.com/api/v1/depth?symbol={self.symbol}&limit=1000'
      async with aiohttp.ClientSession() as session:
          async with session.get(
              url,
              headers=headers,
              timeout=5,
              proxy=self.proxy,
          ) as response:
              depth_flash = await response.text()
      return ujson.loads(depth_flash)
  def _update_depth(self, msg):
      """Handle a ``depth.update`` push: refresh the ticker and bucketed depth.

      A message whose ``time`` is not newer than the last processed one is
      logged as stale and ignored. Otherwise the best bid/ask are stored in
      ``ticker_info`` and passed to ``onTicker``, and the book is folded
      into ``utils.LEVEL`` equally spaced price buckets per side. The
      result, ``bid prices + bid sizes + ask prices + ask sizes``, is stored
      in ``self.depth`` and passed to ``onDepth``.

      Args:
          msg: Raw JSON text of the push message.
      """
      book = ujson.loads(msg)['params'][1]
      stamp = float(book['time'])
      if not stamp > self.update_t:
          self.logger.error("coienx ws推送过期信息")
          return

      self.update_t = stamp
      self.ticker_info["bp"] = float(book['bids'][0][0])
      self.ticker_info["ap"] = float(book['asks'][0][0])
      self.callback['onTicker'](self.ticker_info)

      # Normalise the depth onto a fixed grid around the mid price.
      best_bid = self.ticker_info["bp"]
      best_ask = self.ticker_info["ap"]
      mid = (best_bid + best_ask) * 0.5
      step = mid * utils.EFF_RANGE / utils.LEVEL
      levels = utils.LEVEL

      bid_prices = [best_bid - step * k for k in range(levels)]
      ask_prices = [best_ask + step * k for k in range(levels)]
      bid_sizes = [0] * levels
      ask_sizes = [0] * levels

      # Bids: walk the levels in the order received. The bucket advances by
      # at most one step per level, even if the price gap spans several.
      threshold = best_bid - step
      slot = 0
      for level in book['bids']:
          px = float(level[0])
          qty = float(level[1])
          if px > threshold:
              bid_sizes[slot] += qty
              continue
          threshold -= step
          slot += 1
          if slot == levels:
              break
          bid_sizes[slot] += qty

      # Asks: same walk, mirrored.
      threshold = best_ask + step
      slot = 0
      for level in book['asks']:
          px = float(level[0])
          qty = float(level[1])
          if px < threshold:
              ask_sizes[slot] += qty
              continue
          threshold += step
          slot += 1
          if slot == levels:
              break
          ask_sizes[slot] += qty

      self.depth = bid_prices + bid_sizes + ask_prices + ask_sizes
      self.callback['onDepth']({'name': self.name, 'data': self.depth})
  144. def _update_trade(self, msg):
  145. msg = json.loads(msg)
  146. for i in msg['params'][1]:
  147. side = i["type"]
  148. price = float(i["price"])
  149. amount = float(i['amount'])
  150. if price > self.max_buy or self.max_buy == 0.0:
  151. self.max_buy = price
  152. if price < self.min_sell or self.min_sell == 0.0:
  153. self.min_sell = price
  154. if side == 'buy':
  155. self.buy_q += amount
  156. self.buy_v += amount*price
  157. elif side == 'sell':
  158. self.sell_q += amount
  159. self.sell_v += amount*price
  160. def _update_account(self, msg):
  161. msg = json.loads(msg)
  162. for i in msg['params'][0]:
  163. if self.quote == i:
  164. cash = float(msg['params'][0][self.quote]['available'])+float(msg['params'][0][self.quote]['frozen'])
  165. self.callback['onEquity'] = {
  166. self.quote:cash
  167. }
  168. elif self.base == i:
  169. coin = float(msg['params'][0][self.base]['available'])+float(msg['params'][0][self.base]['frozen'])
  170. self.callback['onEquity'] = {
  171. self.base:coin
  172. }
  173. def _update_order(self, msg):
  174. self.logger.debug("ws推送订单"+msg)
  175. msg = json.loads(msg)
  176. event_type = msg['params'][0]
  177. event = msg['params'][1]
  178. if event_type == 1: # 新增订单
  179. order_event = dict()
  180. order_event['filled'] = 0
  181. order_event['filled_price'] = 0
  182. order_event['client_id'] = event["client_id"]
  183. order_event['order_id'] = event['id']
  184. order_event['status'] = "NEW"
  185. self.callback["onOrder"](order_event)
  186. elif event_type == 3: # 删除订单
  187. order_event = dict()
  188. order_event['filled'] = float(event["amount"]) - float(event["left"])
  189. order_event['filled_price'] = float(event["price"])
  190. # asset_fee = float(event["asset_fee"])
  191. money_fee = float(event["money_fee"])
  192. stock_fee = float(event["stock_fee"])
  193. # 非amm品种 优先扣cet 其次u 再次b
  194. # amm品种 买入收b 卖出收u
  195. if event['side'] == 1:
  196. # 卖出
  197. order_event['fee'] = money_fee
  198. elif event['side'] == 2:
  199. # 买入
  200. order_event['fee'] = stock_fee
  201. order_event['client_id'] = event["client_id"]
  202. order_event['order_id'] = event['id']
  203. order_event['status'] = "REMOVE"
  204. self.callback["onOrder"](order_event)
  def _update_position(self, msg):
      """Aggregate LONG/SHORT entries for this symbol and emit ``onPosition``.

      Reads the entries under ``msg['a']['P']``. For entries whose ``s``
      equals this client's symbol, the absolute ``pa`` is summed per side
      and the absolute ``ep`` of the last matching entry is kept as the
      average price. The result is delivered as a ``model.Position``.

      NOTE(review): the payload keys differ from the ``params``-based
      messages handled elsewhere in this class and the visible receive loop
      never routes to this method - possibly carried over from another
      exchange module; confirm.

      Args:
          msg: Raw JSON text of the push message.
      """
      long_qty = short_qty = 0
      long_entry = short_entry = 0
      for leg in ujson.loads(msg)['a']['P']:
          if leg['s'] != self.symbol:
              continue
          direction = leg['ps']
          if direction == 'LONG':
              long_qty += abs(float(leg['pa']))
              long_entry = abs(float(leg['ep']))
          elif direction == 'SHORT':
              short_qty += abs(float(leg['pa']))
              short_entry = abs(float(leg['ep']))

      pos = model.Position()
      pos.longPos, pos.longAvg = long_qty, long_entry
      pos.shortPos, pos.shortAvg = short_qty, short_entry
      self.callback['onPosition'](pos)
  223. def _get_data(self):
  224. market_data = self.depth + [self.max_buy, self.min_sell]
  225. self.max_buy = 0.0
  226. self.min_sell = 0.0
  227. self.buy_v = 0.0
  228. self.buy_q = 0.0
  229. self.sell_v = 0.0
  230. self.sell_q = 0.0
  231. return {'name': self.name,'data':market_data}
  232. async def go(self):
  233. interval = float(self.params.interval)
  234. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  235. ### onTrade
  236. while 1:
  237. try:
  238. # 更新市场信息
  239. market_data = self._get_data()
  240. self.callback['onMarket'](market_data)
  241. except:
  242. traceback.print_exc()
  243. await asyncio.sleep(interval)
  async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
      """Connect to the websocket, subscribe, and dispatch pushes until stopped.

      The method reconnects in a loop. It returns when ``stop_flag`` is set
      (after closing the socket) or when the task is cancelled.

      Behaviour worth knowing:
        * The client session is opened with ``async with`` so it is closed
          on every reconnect and on return.
        * A ``server.ping`` is sent once more than 60 seconds have elapsed
          since the previous one; this is checked after each processed
          message.
        * Close and error frames trigger a reconnect; other non-text frames
          are skipped.
        * Ordinary errors trigger a reconnect; ``KeyboardInterrupt`` and
          ``SystemExit`` propagate.

      Args:
          is_auth: Truthy to sign in and subscribe to the asset and order
              channels.
          sub_trade: Truthy to subscribe to public deals.
          sub_fast: Not used by this method; kept for interface
              compatibility.
      """
      ping_time = time.time()
      dead_types = (
          aiohttp.WSMsgType.CLOSE,
          aiohttp.WSMsgType.CLOSING,
          aiohttp.WSMsgType.CLOSED,
          aiohttp.WSMsgType.ERROR,
      )
      while True:
          try:
              # Try to connect.
              print(f'{self.name} 尝试连接ws')
              connector = aiohttp.TCPConnector(
                  limit=50,
                  keepalive_timeout=120,
                  # NOTE(review): certificate verification is disabled.
                  verify_ssl=False,
                  local_addr=(self.ip, 0),
              )
              async with aiohttp.ClientSession(connector=connector) as session:
                  async with session.ws_connect(
                      self.URL,
                      proxy=self.proxy,
                      timeout=30,
                      receive_timeout=30,
                  ) as _ws:
                      print(f'{self.name} ws连接成功')
                      self.logger.info(f'{self.name} ws连接成功')
                      await self._send_subscriptions(_ws, is_auth, sub_trade)

                      while True:
                          # Shutdown signal.
                          if self.stop_flag:
                              await _ws.close()
                              return
                          # Receive the next frame.
                          try:
                              msg = await _ws.receive(timeout=30)
                          except asyncio.CancelledError:
                              print('ws取消')
                              return
                          except asyncio.TimeoutError:
                              print(f'{self.name} ws长时间没有收到消息 准备重连...')
                              self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...')
                              break
                          except Exception:
                              print(f'{self.name} ws出现错误 准备重连...')
                              self.logger.error(f'{self.name} ws出现错误 准备重连...')
                              self.logger.error(traceback.format_exc())
                              break

                          if msg.type in dead_types:
                              # The socket is gone; the payload is not text,
                              # so reconnect without dispatching it.
                              print(f'{self.name} ws出现错误 准备重连...')
                              self.logger.error(f'{self.name} ws出现错误 准备重连...')
                              break
                          if msg.type != aiohttp.WSMsgType.TEXT:
                              continue

                          self._dispatch_message(msg.data)

                          # Elapsed time is "now minus last ping".
                          if time.time() - ping_time > 60:
                              ping_time = time.time()
                              await _ws.send_str(ujson.dumps(
                                  {"id": 1, "method": "server.ping", "params": []}
                              ))
          except asyncio.CancelledError:
              # Same policy as cancellation during receive.
              print('ws取消')
              return
          except Exception:
              traceback.print_exc()
              print(f'{self.name} ws连接失败 开始重连...')
              self.logger.error(f'{self.name} ws连接失败 开始重连...')
              # await asyncio.sleep(1)

  async def _send_subscriptions(self, ws, is_auth, sub_trade):
      """Sign in when requested, then send the channel subscriptions."""
      symbol = self.symbol.upper()
      if is_auth:
          # Authentication.
          current_time = int(time.time() * 1000)
          sign_str = f"access_id={self.params.access_key}&tonce={current_time}&secret_key={self.params.secret_key}"
          md5 = hashlib.md5(sign_str.encode())
          param = {
              "id": 1,
              "method": "server.sign",
              "params": [self.params.access_key, md5.hexdigest().upper(), current_time]
          }
          await ws.send_str(ujson.dumps(param))
          # Read one frame (presumably the sign reply) before subscribing;
          # its content is not inspected.
          await ws.receive(timeout=30)
          # Subscribe to balances.
          await ws.send_str(ujson.dumps(
              {"id": 1, "method": "asset.subscribe", "params": [self.base, self.quote]}
          ))
          # Subscribe to private orders.
          await ws.send_str(ujson.dumps(
              {"id": 1, "method": "order.subscribe", "params": [symbol]}
          ))
      if sub_trade:
          # Subscribe to public deals.
          await ws.send_str(ujson.dumps(
              {"id": 1, "method": "deals.subscribe", "params": [symbol]}
          ))
      # Subscribe to depth.
      await ws.send_str(ujson.dumps(
          {"id": 1, "method": "depth.subscribe", "params": [symbol, 50, "0.000000001", False]}
      ))

  def _dispatch_message(self, text):
      """Route one text frame to the handler for the channel it mentions."""
      if 'depth.update' in text:
          self._update_depth(text)
      elif 'deals.update' in text:
          self._update_trade(text)
      elif 'asset.update' in text:
          self._update_account(text)
      elif 'order.update' in text:
          self._update_order(text)
      else:
          print(text)