gate_usdt_swap_ws.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. from re import sub
  2. import aiohttp
  3. import time
  4. import asyncio
  5. import zlib
  6. import json, ujson
  7. import zlib
  8. import hmac, sys
  9. import base64, csv, random
  10. import traceback, hashlib
  11. import logging, logging.handlers
  12. import utils
  13. import model
  14. from decimal import Decimal
  15. def inflate(data):
  16. '''
  17. 解压缩数据
  18. '''
  19. decompress = zlib.decompressobj(-zlib.MAX_WBITS)
  20. inflated = decompress.decompress(data)
  21. inflated += decompress.flush()
  22. return inflated
  23. def empty_call(msg):
  24. pass
  25. class GateUsdtSwapWs:
  26. def __init__(self, params:model.ClientParams, colo=0, is_print=0):
  27. if colo:
  28. print('使用colo高速线路')
  29. self.URL = 'wss://fxws-private.gateapi.io/v4/ws/usdt'
  30. else:
  31. self.URL = 'wss://fx-ws.gateio.ws/v4/ws/usdt'
  32. self.params = params
  33. self.name = self.params.name
  34. self.base = self.params.pair.split('_')[0].upper()
  35. self.quote = self.params.pair.split('_')[1].upper()
  36. self.symbol = self.base + '_' + self.quote
  37. self.callback = {
  38. "onMarket":self.save_market,
  39. "onPosition":empty_call,
  40. "onTicker":empty_call,
  41. "onDepth":empty_call,
  42. "onEquity":empty_call,
  43. "onOrder":empty_call,
  44. "onTrade":empty_call,
  45. "onExit":empty_call,
  46. }
  47. self.is_print = is_print
  48. self.proxy = None
  49. if 'win' in sys.platform:
  50. self.proxy = self.params.proxy
  51. self.logger = self.get_logger()
  52. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  53. self.stop_flag = 0
  54. self.update_t = 0
  55. self.max_buy = 0.0
  56. self.min_sell = 0.0
  57. self.buy_v = 0.0
  58. self.buy_q = 0.0
  59. self.sell_v = 0.0
  60. self.sell_q = 0.0
  61. # 过期检查
  62. self.public_update_time = time.time()
  63. self.private_update_time = time.time()
  64. self.expired_time = 300
  65. self.multiplier = None
  66. self.depth = []
  67. #### 指定发包ip
  68. iplist = utils.get_local_ip_list()
  69. self.ip = iplist[int(self.params.ip)]
  70. def get_logger(self):
  71. logger = logging.getLogger(__name__)
  72. logger.setLevel(logging.DEBUG)
  73. # log to txt
  74. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  75. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  76. handler.setLevel(logging.DEBUG)
  77. handler.setFormatter(formatter)
  78. logger.addHandler(handler)
  79. return logger
  80. def gen_signed(self, channel, event, timestamp):
  81. # 为消息签名
  82. api_key = self.params.access_key
  83. api_secret = self.params.secret_key
  84. s = 'channel=%s&event=%s&time=%d' % (channel, event, timestamp)
  85. sign = hmac.new(api_secret.encode('utf-8'), s.encode('utf-8'), hashlib.sha512).hexdigest()
  86. return {'method': 'api_key', 'KEY': api_key, 'SIGN': sign}
  87. def save_market(self, msg):
  88. date = time.strftime('%Y-%m-%d',time.localtime())
  89. interval = self.params.interval
  90. if msg:
  91. exchange = msg['name']
  92. if len(msg['data']) > 1:
  93. with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  94. 'a',
  95. newline='',
  96. encoding='utf-8') as f:
  97. writer = csv.writer(f, delimiter=',')
  98. writer.writerow(msg['data'])
  99. if self.is_print:print(f'写入行情 {self.symbol}')
  100. def _update_depth(self, msg):
  101. self.public_update_time = time.time()
  102. if msg['t'] > self.update_t:
  103. self.update_t = msg['t']
  104. self.ticker_info["bp"] = float(msg['bids'][0]['p'])
  105. self.ticker_info["ap"] = float(msg['asks'][0]['p'])
  106. self.callback['onTicker'](self.ticker_info)
  107. ##### 标准化深度
  108. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  109. step = mp * utils.EFF_RANGE / utils.LEVEL
  110. bp = []
  111. ap = []
  112. bv = [0 for _ in range(utils.LEVEL)]
  113. av = [0 for _ in range(utils.LEVEL)]
  114. for i in range(utils.LEVEL):
  115. bp.append(self.ticker_info["bp"]-step*i)
  116. for i in range(utils.LEVEL):
  117. ap.append(self.ticker_info["ap"]+step*i)
  118. #
  119. price_thre = self.ticker_info["bp"] - step
  120. index = 0
  121. for bid in msg['bids']:
  122. price = float(bid['p'])
  123. amount = float(bid['s'])
  124. if price > price_thre:
  125. bv[index] += amount
  126. else:
  127. price_thre -= step
  128. index += 1
  129. if index == utils.LEVEL:
  130. break
  131. bv[index] += amount
  132. price_thre = self.ticker_info["ap"] + step
  133. index = 0
  134. for ask in msg['asks']:
  135. price = float(ask['p'])
  136. amount = float(ask['s'])
  137. if price < price_thre:
  138. av[index] += amount
  139. else:
  140. price_thre += step
  141. index += 1
  142. if index == utils.LEVEL:
  143. break
  144. av[index] += amount
  145. self.depth = bp + bv + ap + av
  146. self.callback['onDepth']({'name':self.name,'data':self.depth})
  147. else:
  148. self.logger.error(f"收到过时的depth推送 {self.update_t}")
  149. def _update_trade(self, msg):
  150. self.public_update_time = time.time()
  151. for i in msg:
  152. amount = float(i['size'])*self.multiplier
  153. price = float(i['price'])
  154. side = "buy" if amount > 0.0 else "sell"
  155. if price > self.max_buy or self.max_buy == 0.0:
  156. self.max_buy = price
  157. if price < self.min_sell or self.min_sell == 0.0:
  158. self.min_sell = price
  159. if side == 'buy':
  160. self.buy_q += amount
  161. self.buy_v += amount*price
  162. elif side == 'sell':
  163. self.sell_q += amount
  164. self.sell_v += amount*price
  165. def _update_account(self, msg):
  166. self.private_update_time = time.time()
  167. for i in msg:
  168. if self.symbol in i['text']:
  169. cash = float(i['balance'])
  170. self.callback['onEquity'] = {
  171. self.quote:cash
  172. }
  173. self.logger.debug(f"ws cash {cash}")
  174. def _update_order(self, msg):
  175. self.private_update_time = time.time()
  176. self.logger.debug(f"ws订单推送 {msg}")
  177. for i in msg:
  178. if i['status'] in ['open']:
  179. order_event = dict()
  180. order_event['filled'] = 0
  181. order_event['filled_price'] = 0
  182. order_event['client_id'] = i["text"]
  183. order_event['order_id'] = i['id']
  184. order_event['status'] = "NEW"
  185. self.callback['onOrder'](order_event)
  186. elif i['status'] in ['finished']:
  187. order_event = dict()
  188. filled_paper = Decimal(abs(float(i["size"]))) - Decimal(abs(float(i["left"])))
  189. filled_amount = filled_paper*Decimal(str(self.multiplier))
  190. order_event['filled'] = float(filled_amount)
  191. order_event['filled_price'] = float(i["fill_price"])
  192. order_event['client_id'] = i["text"]
  193. order_event['order_id'] = i['id']
  194. order_event['fee'] = 0.0
  195. order_event['status'] = "REMOVE"
  196. self.callback['onOrder'](order_event)
  197. # 根据成交信息更新仓位信息 因为账户信息推送有延迟
  198. # 但订单信息和账户信息到达先后时间可能有前有后 可能平仓 账户先置零仓位 然后sell成交达到 导致仓位变成负数
  199. def _update_usertrade(self, msg):
  200. '''暂时不用'''
  201. return
  202. def _update_position(self, msg):
  203. self.private_update_time = time.time()
  204. long_pos, short_pos = 0, 0
  205. long_avg, short_avg = 0, 0
  206. for i in msg:
  207. if i['contract'] == self.symbol:
  208. size = float(i['size'])*self.multiplier
  209. if size > 0:
  210. long_pos = abs(size)
  211. long_avg = float(i['entry_price'])
  212. if size < 0:
  213. short_pos = abs(size)
  214. short_avg = float(i['entry_price'])
  215. pos = model.Position()
  216. pos.longPos = long_pos
  217. pos.longAvg = long_avg
  218. pos.shortPos = short_pos
  219. pos.shortAvg = short_avg
  220. self.callback['onPosition'](pos)
  221. def _get_data(self):
  222. market_data = self.depth + [self.max_buy, self.min_sell]
  223. self.max_buy = 0.0
  224. self.min_sell = 0.0
  225. self.buy_v = 0.0
  226. self.buy_q = 0.0
  227. self.sell_v = 0.0
  228. self.sell_q = 0.0
  229. return {'name': self.name,'data':market_data}
  230. async def go(self):
  231. interval = float(self.params.interval)
  232. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  233. ### onTrade
  234. while 1:
  235. try:
  236. # 更新市场信息
  237. market_data = self._get_data()
  238. self.callback['onMarket'](market_data)
  239. except:
  240. traceback.print_exc()
  241. await asyncio.sleep(interval)
  242. def get_sign(self, message):
  243. h = hmac.new(self.params.secret_key.encode("utf8"), message.encode("utf8"), hashlib.sha512)
  244. return h.hexdigest()
  245. def _get_uid(self):
  246. pass
  247. def generate_signature(self, method, uri, query_param=None, body=None):
  248. t = time.time()
  249. m = hashlib.sha512()
  250. m.update((body or "").encode('utf-8'))
  251. hashed_payload = m.hexdigest()
  252. s = '%s\n%s\n%s\n%s\n%s' % (method, uri, query_param or "", hashed_payload, t)
  253. sign = hmac.new(self.params.secret_key.encode('utf-8'), s.encode('utf-8'), hashlib.sha512).hexdigest()
  254. return {'KEY': self.params.access_key, 'Timestamp': str(t), 'SIGN': sign}
  255. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  256. while True:
  257. try:
  258. # 重置更新时间
  259. self.public_update_time = time.time()
  260. self.private_update_time = time.time()
  261. ping_time = time.time()
  262. # 获取uid
  263. headers = {
  264. "Accept": "application/json",
  265. "Content-type": "application/json"
  266. }
  267. if is_auth:
  268. user_id = ""
  269. uri = "/api/v4/wallet/fee"
  270. query_param = ''
  271. sign_headers = self.generate_signature('GET', uri, query_param)
  272. headers.update(sign_headers)
  273. async with aiohttp.ClientSession(connector = aiohttp.TCPConnector(
  274. limit=50,
  275. keepalive_timeout=120,
  276. verify_ssl=False,
  277. local_addr=(self.ip,0)
  278. )) as session:
  279. response = await session.get(
  280. "https://api.gateio.ws" + uri,
  281. headers=headers,
  282. proxy=self.proxy
  283. )
  284. res = await response.json()
  285. user_id = str(res['user_id'])
  286. print(f"uid {user_id}")
  287. # 获取合约乘数
  288. async with aiohttp.ClientSession(connector = aiohttp.TCPConnector(
  289. limit=50,
  290. keepalive_timeout=120,
  291. verify_ssl=False,
  292. local_addr=(self.ip,0)
  293. )) as session:
  294. uri = "/api/v4/futures/usdt/contracts"
  295. response = await session.get(
  296. "https://api.gateio.ws" + uri,
  297. headers=headers,
  298. proxy=self.proxy
  299. )
  300. res = await response.json()
  301. if res:
  302. for i in res:
  303. if self.symbol == i['name']:
  304. self.multiplier = float(i['quanto_multiplier'])
  305. print(f"contract multiplier {self.multiplier}")
  306. # 尝试连接
  307. print(f'{self.name} 尝试连接ws')
  308. ws_url = self.URL
  309. async with aiohttp.ClientSession(
  310. connector = aiohttp.TCPConnector(
  311. limit=50,
  312. keepalive_timeout=120,
  313. verify_ssl=False,
  314. local_addr=(self.ip,0)
  315. )
  316. ).ws_connect(
  317. ws_url,
  318. proxy=self.proxy,
  319. timeout=30,
  320. receive_timeout=30,
  321. ) as _ws:
  322. print(f'{self.name} ws连接成功')
  323. # 登陆
  324. if is_auth:
  325. # userorders
  326. current_time = int(time.time())
  327. channel = "futures.orders"
  328. sub_str = {
  329. "time": current_time,
  330. "channel": channel,
  331. "event": "subscribe",
  332. "payload": [user_id,self.symbol]
  333. }
  334. sub_str["auth"] = self.gen_signed(sub_str['channel'], sub_str['event'], sub_str['time'])
  335. await _ws.send_str(ujson.dumps(sub_str))
  336. # positions
  337. current_time = int(time.time())
  338. channel = "futures.positions"
  339. sub_str = {
  340. "time": current_time,
  341. "channel": channel,
  342. "event": "subscribe",
  343. "payload": [user_id,self.symbol]
  344. }
  345. sub_str["auth"] = self.gen_signed(sub_str['channel'], sub_str['event'], sub_str['time'])
  346. await _ws.send_str(ujson.dumps(sub_str))
  347. # usertrades
  348. # current_time = int(time.time())
  349. # channel = "futures.usertrades"
  350. # sub_str = {
  351. # "time": current_time,
  352. # "channel": channel,
  353. # "event": "subscribe",
  354. # "payload": [self.symbol]
  355. # }
  356. # message = 'channel=%s&event=%s&time=%d' % (channel, "subscribe", current_time)
  357. # sub_str["auth"] = {
  358. # "method": "api_key",
  359. # "KEY": self.params.access_key,
  360. # "SIGN": self.get_sign(message)}
  361. # await _ws.send_str(ujson.dumps(sub_str))
  362. # balance
  363. current_time = int(time.time())
  364. channel = "futures.balances"
  365. sub_str = {
  366. "time": current_time,
  367. "channel": channel,
  368. "event": "subscribe",
  369. "payload": [user_id]
  370. }
  371. sub_str["auth"] = self.gen_signed(sub_str['channel'], sub_str['event'], sub_str['time'])
  372. await _ws.send_str(ujson.dumps(sub_str))
  373. if sub_trade:
  374. # public trade
  375. current_time = int(time.time())
  376. channel = "futures.trades"
  377. sub_str = {
  378. "time": current_time,
  379. "channel": channel,
  380. "event": "subscribe",
  381. "payload": [self.symbol]
  382. }
  383. await _ws.send_str(ujson.dumps(sub_str))
  384. # 订阅
  385. # tickers 速度慢
  386. # current_time = int(time.time())
  387. # channel = "futures.tickers"
  388. # sub_str = {
  389. # "time": current_time,
  390. # "channel": channel,
  391. # "event": "subscribe",
  392. # "payload": [self.symbol]
  393. # }
  394. # await _ws.send_str(ujson.dumps(sub_str))
  395. # depth
  396. current_time = int(time.time())
  397. channel = "futures.order_book"
  398. sub_str = {
  399. "time": current_time,
  400. "channel": channel,
  401. "event": "subscribe",
  402. "payload": [self.symbol,"20","0"]
  403. }
  404. await _ws.send_str(ujson.dumps(sub_str))
  405. while True:
  406. # 停机信号
  407. if self.stop_flag:
  408. await _ws.close()
  409. return
  410. # 接受消息
  411. try:
  412. msg = await _ws.receive(timeout=10)
  413. except:
  414. print(f'{self.name} ws长时间没有收到消息 准备重连...')
  415. self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...')
  416. break
  417. msg = ujson.loads(msg.data)
  418. # 处理消息
  419. if msg['event'] in ['update', 'all']:
  420. if msg['channel'] == 'futures.order_book':self._update_depth(msg['result'])
  421. elif msg['channel'] == 'futures.balances':self._update_account(msg['result'])
  422. elif msg['channel'] == 'futures.orders':self._update_order(msg['result'])
  423. # elif msg['channel'] == 'futures.usertrades':self._update_usertrade(msg['result'])
  424. elif msg['channel'] == 'futures.positions':self._update_position(msg['result'])
  425. elif msg['channel'] == 'futures.trades':self._update_trade(msg['result'])
  426. else:
  427. pass
  428. # pong
  429. if time.time() - ping_time > 5:
  430. await _ws.send_str('{"time": %d, "channel" : "futures.ping"}' % int(time.time()))
  431. ping_time = time.time()
  432. if is_auth:
  433. if time.time() - self.private_update_time > self.expired_time*5:
  434. raise Exception('长期未更新私有信息重连')
  435. if time.time() - self.public_update_time > self.expired_time:
  436. raise Exception('长期未更新公有信息重连')
  437. except:
  438. traceback.print_exc()
  439. print(f'{self.name} ws连接失败 开始重连...')
  440. self.logger.error(f'{self.name} ws连接失败 开始重连...')
  441. self.logger.error(traceback.format_exc())
  442. await asyncio.sleep(1)