coinex_usdt_swap_ws.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. import aiohttp
  2. import time
  3. import asyncio
  4. import zlib
  5. import json, ujson
  6. import zlib
  7. import hashlib
  8. import hmac
  9. import base64
  10. import traceback
  11. import random, csv, sys, utils
  12. import logging, logging.handlers
  13. import model
  14. def empty_call(msg):
  15. pass
  16. class CoinExUsdtSwapWs:
  17. def __init__(self, params:model.ClientParams, colo=0, is_print=0):
  18. if colo:
  19. print('不支持colo高速线路')
  20. self.URL = 'wss://perpetual.coinex.com/'
  21. else:
  22. self.URL = 'wss://perpetual.coinex.com/'
  23. self.params = params
  24. self.name = self.params.name
  25. self.base = self.params.pair.split('_')[0].upper()
  26. self.quote = self.params.pair.split('_')[1].upper()
  27. self.symbol = self.base + self.quote
  28. self.data = dict()
  29. self.data['trade'] = []
  30. self.on_trade_ratio = 5.0
  31. self.on_trade_value = 1000000.0
  32. self.trade_mean = 0.0
  33. self.data['force'] = []
  34. self.callback = {
  35. "onMarket":self.save_market,
  36. "onPosition":empty_call,
  37. "onEquity":empty_call,
  38. "onOrder":empty_call,
  39. "onTicker":empty_call,
  40. "onDepth":empty_call,
  41. "onExit":empty_call,
  42. }
  43. self.is_print = is_print
  44. self.proxy = None
  45. if 'win' in sys.platform:
  46. self.proxy = self.params.proxy
  47. self.logger = self.get_logger()
  48. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  49. self.stop_flag = 0
  50. self.max_buy = 0.0
  51. self.min_sell = 0.0
  52. self.buy_v = 0.0
  53. self.buy_q = 0.0
  54. self.sell_v = 0.0
  55. self.sell_q = 0.0
  56. self.depth = []
  57. #### 指定发包ip
  58. iplist = utils.get_local_ip_list()
  59. self.ip = iplist[int(self.params.ip)]
  60. def get_logger(self):
  61. logger = logging.getLogger(__name__)
  62. logger.setLevel(logging.DEBUG)
  63. # log to txt
  64. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  65. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  66. handler.setLevel(logging.DEBUG)
  67. handler.setFormatter(formatter)
  68. logger.addHandler(handler)
  69. return logger
  70. def save_market(self, msg):
  71. date = time.strftime('%Y-%m-%d',time.localtime())
  72. interval = self.params.interval
  73. if msg:
  74. exchange = msg['name']
  75. if len(msg['data']) > 1:
  76. with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  77. 'a',
  78. newline='',
  79. encoding='utf-8') as f:
  80. writer = csv.writer(f, delimiter=',')
  81. writer.writerow(msg['data'])
  82. if self.is_print:print(f'写入行情 {self.symbol}')
  83. def _update_depth(self, msg):
  84. self.public_update_time = time.time()
  85. msg = ujson.loads(msg)
  86. self.ticker_info["bp"] = float(msg['params'][1]['bids'][0][0])
  87. self.ticker_info["ap"] = float(msg['params'][1]['asks'][0][0])
  88. self.callback['onTicker'](self.ticker_info)
  89. ##### 标准化深度
  90. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  91. step = mp * utils.EFF_RANGE / utils.LEVEL
  92. bp = []
  93. ap = []
  94. bv = [0 for _ in range(utils.LEVEL)]
  95. av = [0 for _ in range(utils.LEVEL)]
  96. for i in range(utils.LEVEL):
  97. bp.append(self.ticker_info["bp"]-step*i)
  98. for i in range(utils.LEVEL):
  99. ap.append(self.ticker_info["ap"]+step*i)
  100. #
  101. price_thre = self.ticker_info["bp"] - step
  102. index = 0
  103. for bid in msg['params'][1]['bids']:
  104. price = float(bid[0])
  105. amount = float(bid[1])
  106. if price > price_thre:
  107. bv[index] += amount
  108. else:
  109. price_thre -= step
  110. index += 1
  111. if index == utils.LEVEL:
  112. break
  113. bv[index] += amount
  114. price_thre = self.ticker_info["ap"] + step
  115. index = 0
  116. for ask in msg['params'][1]['asks']:
  117. price = float(ask[0])
  118. amount = float(ask[1])
  119. if price < price_thre:
  120. av[index] += amount
  121. else:
  122. price_thre += step
  123. index += 1
  124. if index == utils.LEVEL:
  125. break
  126. av[index] += amount
  127. self.depth = bp + bv + ap + av
  128. self.callback['onDepth']({'name':self.name,'data':self.depth})
  129. def _update_trade(self, msg):
  130. self.public_update_time = time.time()
  131. msg = json.loads(msg)
  132. for i in msg['params'][1]:
  133. side = i["type"]
  134. price = float(i["price"])
  135. amount = float(i['amount'])
  136. if price > self.max_buy or self.max_buy == 0.0:
  137. self.max_buy = price
  138. if price < self.min_sell or self.min_sell == 0.0:
  139. self.min_sell = price
  140. if side == 'buy':
  141. self.buy_q += amount
  142. self.buy_v += amount*price
  143. elif side == 'sell':
  144. self.sell_q += amount
  145. self.sell_v += amount*price
  146. def _update_account(self, msg):
  147. msg = ujson.loads(msg)
  148. for i in msg['params'][0]:
  149. if self.quote == i:
  150. cash = float(msg['params'][0][self.quote]['available'])+float(msg['params'][0][self.quote]['frozen'])+\
  151. float(msg['params'][0][self.quote]['margin'])+float(msg['params'][0][self.quote]['profit_unreal'])
  152. self.callback['onEquity'] = {
  153. self.quote:cash
  154. }
  155. def _update_order(self, msg):
  156. self.logger.debug("ws推送订单"+msg)
  157. msg = ujson.loads(msg)
  158. event_type = msg['params'][0]
  159. event = msg['params'][1]
  160. if event_type == 1: # 新增订单
  161. order_event = dict()
  162. order_event['filled'] = 0.0
  163. order_event['filled_price'] = 0.0
  164. order_event['client_id'] = event["client_id"]
  165. order_event['order_id'] = event['order_id']
  166. order_event['status'] = "NEW"
  167. self.callback["onOrder"](order_event)
  168. elif event_type == 3: # 删除订单
  169. order_event = dict()
  170. order_event['filled'] = float(event["amount"]) - float(event["left"])
  171. order_event['filled_price'] = float(event["price"])
  172. order_event['fee'] = float(event["deal_fee"])
  173. order_event['client_id'] = event["client_id"]
  174. order_event['order_id'] = event['order_id']
  175. order_event['status'] = "REMOVE"
  176. self.callback["onOrder"](order_event)
  177. def _update_position(self, msg):
  178. long_pos, short_pos = 0, 0
  179. long_avg, short_avg = 0, 0
  180. msg = ujson.loads(msg)
  181. msg = msg['params'][1]
  182. if msg['market'] == self.symbol:
  183. side = msg['side']
  184. pos = float(msg['amount'])
  185. price = float(msg['open_price'])
  186. if side == 1:
  187. short_pos = pos
  188. short_avg = price
  189. elif side == 2:
  190. long_pos = pos
  191. long_avg = price
  192. pos = model.Position()
  193. pos.longPos = long_pos
  194. pos.longAvg = long_avg
  195. pos.shortPos = short_pos
  196. pos.shortAvg = short_avg
  197. self.callback['onPosition'](pos)
  198. def _get_data(self):
  199. market_data = self.depth + [self.max_buy, self.min_sell]
  200. self.max_buy = 0.0
  201. self.min_sell = 0.0
  202. self.buy_v = 0.0
  203. self.buy_q = 0.0
  204. self.sell_v = 0.0
  205. self.sell_q = 0.0
  206. return {'name': self.name,'data':market_data}
  207. async def go(self):
  208. interval = float(self.params.interval)
  209. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  210. ### onTrade
  211. while 1:
  212. try:
  213. # 更新市场信息
  214. market_data = self._get_data()
  215. self.callback['onMarket'](market_data)
  216. except:
  217. traceback.print_exc()
  218. await asyncio.sleep(interval)
  219. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  220. ping_time = time.time()
  221. while True:
  222. try:
  223. # 尝试连接
  224. print(f'{self.name} 尝试连接ws')
  225. # 登陆
  226. ws_url = self.URL
  227. async with aiohttp.ClientSession(
  228. connector = aiohttp.TCPConnector(
  229. limit=50,
  230. keepalive_timeout=120,
  231. verify_ssl=False,
  232. local_addr=(self.ip,0)
  233. )
  234. ).ws_connect(
  235. ws_url,
  236. proxy=self.proxy,
  237. timeout=30,
  238. receive_timeout=30,
  239. ) as _ws:
  240. print(f'{self.name} ws连接成功')
  241. self.logger.info(f'{self.name} ws连接成功')
  242. # 订阅 coinex 合约行情
  243. symbol = self.symbol.upper()
  244. # 鉴权
  245. if is_auth:
  246. current_time = int(time.time()*1000)
  247. sign_str = f"access_id={self.params.access_key}&timestamp={current_time}&secret_key={self.params.secret_key}"
  248. md5 = hashlib.sha256(sign_str.encode())
  249. param = {
  250. "id": 1,
  251. "method": "server.sign",
  252. "params": [self.params.access_key, md5.hexdigest().lower(), current_time]
  253. }
  254. await _ws.send_str(ujson.dumps(param))
  255. res = await _ws.receive(timeout=30)
  256. self.logger.info(res)
  257. # 订阅资产
  258. sub_str = ujson.dumps({"id": 1, "method": "asset.subscribe","params": [self.quote]})
  259. await _ws.send_str(sub_str)
  260. # 订阅订单
  261. sub_str = ujson.dumps({"id": 1, "method": "order.subscribe","params": [symbol]})
  262. await _ws.send_str(sub_str)
  263. # 订阅仓位
  264. sub_str = ujson.dumps({"id": 1, "method": "position.subscribe","params": [symbol]})
  265. await _ws.send_str(sub_str)
  266. if sub_trade:
  267. # 订阅公开成交
  268. sub_str = ujson.dumps({"id": 1, "method": "deals.subscribe","params": [symbol]})
  269. await _ws.send_str(sub_str)
  270. # 订阅深度
  271. sub_str = ujson.dumps({"id": 1, "method": "depth.subscribe","params": [symbol, 50, "0.000000001", False]})
  272. await _ws.send_str(sub_str)
  273. while True:
  274. # 停机信号
  275. if self.stop_flag:
  276. await _ws.close()
  277. return
  278. # 接受消息
  279. try:
  280. msg = await _ws.receive(timeout=30)
  281. except asyncio.CancelledError:
  282. print('ws取消')
  283. return
  284. except asyncio.TimeoutError:
  285. print(f'{self.name} ws长时间没有收到消息 准备重连...')
  286. self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...')
  287. break
  288. except:
  289. print(f'{self.name} ws出现错误 准备重连...')
  290. self.logger.error(f'{self.name} ws出现错误 准备重连...')
  291. self.logger.error(traceback.format_exc())
  292. break
  293. msg = msg.data
  294. # print(msg)
  295. # 处理消息
  296. if 'depth.update' in msg:self._update_depth(msg)
  297. elif 'deals.update' in msg:self._update_trade(msg)
  298. elif 'asset.update' in msg:self._update_account(msg)
  299. elif 'order.update' in msg:self._update_order(msg)
  300. elif 'position.update' in msg:self._update_position(msg)
  301. else:
  302. print(msg)
  303. pass
  304. if ping_time - time.time() > 60:
  305. ping_time = time.time()
  306. sub_str = ujson.dumps({"id": 1, "method": "server.ping","params": []})
  307. await _ws.send_str(sub_str)
  308. except:
  309. _ws = None
  310. traceback.print_exc()
  311. print(f'{self.name} ws连接失败 开始重连...')
  312. self.logger.error(f'{self.name} ws连接失败 开始重连...')
  313. # await asyncio.sleep(1)