kucoin_usdt_swap_ws.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. import aiohttp
  2. import time
  3. import asyncio
  4. import zlib
  5. import json, ujson
  6. import zlib
  7. import hashlib
  8. import hmac
  9. import base64
  10. import traceback
  11. import random
  12. import gzip, csv, sys
  13. from uuid import uuid4
  14. import logging, logging.handlers
  15. import utils
  16. import model
  17. from decimal import Decimal
  18. from loguru import logger
  19. def empty_call(msg):
  20. # print(msg)
  21. pass
  22. class KucoinUsdtSwapWs:
  23. def __init__(self, params: model.ClientParams, colo=0, is_print=0):
  24. if colo:
  25. print('不支持colo高速线路')
  26. self.BaseURL = "https://api-futures.kucoin.com"
  27. else:
  28. self.BaseURL = "https://api-futures.kucoin.com"
  29. self.params = params
  30. self.name = self.params.name
  31. self.base = self.params.pair.split('_')[0].upper()
  32. self.quote = self.params.pair.split('_')[1].upper()
  33. self.symbol = self.base + self.quote + "M"
  34. # 处理特殊情况
  35. if self.symbol == 'BTCUSDTM':
  36. self.symbol = 'XBTUSDTM'
  37. self.callback = {
  38. "onMarket":self.save_market,
  39. "onPosition":empty_call,
  40. "onEquity":empty_call,
  41. "onOrder":empty_call,
  42. "onTicker":empty_call,
  43. "onDepth":empty_call,
  44. "onExit":empty_call,
  45. }
  46. self.is_print = is_print
  47. self.proxy = None
  48. if 'win' in sys.platform:
  49. self.proxy = self.params.proxy
  50. self.logger = self.get_logger()
  51. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  52. self.multiplier = None
  53. self.max_buy = 0.0
  54. self.min_sell = 0.0
  55. self.buy_v = 0.0
  56. self.buy_q = 0.0
  57. self.sell_v = 0.0
  58. self.sell_q = 0.0
  59. self.update_t = 0.0
  60. self.depth = []
  61. #### 指定发包ip
  62. iplist = utils.get_local_ip_list()
  63. self.ip = iplist[int(self.params.ip)]
  64. def get_logger(self):
  65. logger = logging.getLogger(__name__)
  66. logger.setLevel(logging.DEBUG)
  67. # log to txt
  68. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  69. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  70. handler.setLevel(logging.DEBUG)
  71. handler.setFormatter(formatter)
  72. logger.addHandler(handler)
  73. return logger
  74. def save_market(self, msg):
  75. date = time.strftime('%Y-%m-%d',time.localtime())
  76. interval = float(self.params.interval)
  77. if msg:
  78. exchange = msg['name']
  79. if len(msg['data']) > 1:
  80. with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  81. 'a',
  82. newline='',
  83. encoding='utf-8') as f:
  84. writer = csv.writer(f, delimiter=',')
  85. writer.writerow(msg['data'])
  86. if self.is_print:print(f'写入行情 {self.symbol}')
  87. async def get_sign(self):
  88. headers = {}
  89. headers['Content-Type'] = 'application/json'
  90. headers['X-MBX-APIKEY'] = self.params.access_key
  91. params = {
  92. 'timestamp':int(time.time())*1000,
  93. 'recvWindow':5000,
  94. }
  95. query_string = "&".join(["{}={}".format(k, params[k]) for k in sorted(params.keys())])
  96. signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest()
  97. params['signature']=signature
  98. url = 'https://fapi.binance.com/fapi/v1/listenKey'
  99. session = aiohttp.ClientSession()
  100. response = await session.post(
  101. url,
  102. params=params,
  103. headers=headers,
  104. timeout=5,
  105. proxy=self.proxy
  106. )
  107. login_str = await response.text()
  108. await session.close()
  109. return ujson.loads(login_str)['listenKey']
  110. def _update_depth(self, msg):
  111. if msg['data']['sequence'] > self.update_t:
  112. self.update_t = msg['data']['sequence']
  113. self.ticker_info['bp'] = float(msg['data']['bids'][0][0])
  114. self.ticker_info['ap'] = float(msg['data']['asks'][0][0])
  115. self.callback['onTicker'](self.ticker_info)
  116. ##### 标准化深度
  117. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  118. step = mp * utils.EFF_RANGE / utils.LEVEL
  119. bp = []
  120. ap = []
  121. bv = [0 for _ in range(utils.LEVEL)]
  122. av = [0 for _ in range(utils.LEVEL)]
  123. for i in range(utils.LEVEL):
  124. bp.append(self.ticker_info["bp"]-step*i)
  125. for i in range(utils.LEVEL):
  126. ap.append(self.ticker_info["ap"]+step*i)
  127. #
  128. price_thre = self.ticker_info["bp"] - step
  129. index = 0
  130. for bid in msg['data']['bids']:
  131. price = float(bid[0])
  132. amount = float(bid[1])
  133. if price > price_thre:
  134. bv[index] += amount
  135. else:
  136. price_thre -= step
  137. index += 1
  138. if index == utils.LEVEL:
  139. break
  140. bv[index] += amount
  141. price_thre = self.ticker_info["ap"] + step
  142. index = 0
  143. for ask in msg['data']['asks']:
  144. price = float(ask[0])
  145. amount = float(ask[1])
  146. if price < price_thre:
  147. av[index] += amount
  148. else:
  149. price_thre += step
  150. index += 1
  151. if index == utils.LEVEL:
  152. break
  153. av[index] += amount
  154. self.depth = bp + bv + ap + av
  155. self.callback['onDepth']({'name':self.name,'data':self.depth})
  156. def _update_ticker(self, msg):
  157. if msg['data']['sequence'] > self.update_t:
  158. self.update_t = msg['data']['sequence']
  159. self.ticker_info['bp'] = float(msg['data']['bestBidPrice'])
  160. self.ticker_info['ap'] = float(msg['data']['bestAskPrice'])
  161. self.callback['onTicker'](self.ticker_info)
  162. def _update_trade(self, msg):
  163. price = float(msg["data"]['price'])
  164. side = msg["data"]['side']
  165. amount = float(msg["data"]['size'])*self.multiplier
  166. if price > self.max_buy or self.max_buy == 0.0:
  167. self.max_buy = price
  168. if price < self.min_sell or self.min_sell == 0.0:
  169. self.min_sell = price
  170. if side == 'buy':
  171. self.buy_q += amount
  172. self.buy_v += amount*price
  173. elif side == 'sell':
  174. self.sell_q += amount
  175. self.sell_v += amount*price
  176. def _update_position(self, msg):
  177. pos = model.Position()
  178. if "currentQty" in msg['data']:
  179. amt = float(msg["data"]["currentQty"]) * self.multiplier
  180. ep = float(msg["data"]["avgEntryPrice"])
  181. if amt == 0:
  182. self.callback["onPosition"](pos)
  183. elif amt > 0:
  184. pos.longPos = amt
  185. pos.longAvg = ep
  186. self.callback["onPosition"](pos)
  187. elif amt < 0:
  188. pos.shortPos = -amt
  189. pos.shortAvg = ep
  190. self.callback["onPosition"](pos)
  191. def _update_account(self, msg):
  192. pass
  193. # if msg['data']['currency'] == 'USDT' and msg['subject'] == "availableBalance.change":
  194. # cash = float(msg['data']['availableBalance']) + float(msg['data']['holdBalance'])
  195. # self.callback['onEquity'] = {self.quote:cash}
  196. def _update_order(self, msg):
  197. self.logger.debug(f"ws订单推送 {msg}")
  198. if '/contractMarket/tradeOrders' in msg['topic']:
  199. if msg["data"]["symbol"] == self.symbol:
  200. if msg["data"]["status"] == 'open': # 新增订单
  201. order_event = dict()
  202. order_event['status'] = "NEW"
  203. order_event['filled'] = 0
  204. order_event['filled_price'] = 0
  205. order_event['client_id'] = msg["data"]["clientOid"] if "clientOid" in msg["data"] else ""
  206. order_event['order_id'] = msg["data"]['orderId']
  207. self.callback["onOrder"](order_event)
  208. elif msg["data"]["type"] in ['filled','canceled']: # 删除订单
  209. order_event = dict()
  210. order_event['status'] = "REMOVE"
  211. order_event['client_id'] = msg["data"]["clientOid"] if "clientOid" in msg["data"] else ""
  212. order_event['order_id'] = msg["data"]['orderId']
  213. order_event['filled'] = float(Decimal(msg["data"]["filledSize"])*Decimal(str(self.multiplier)))
  214. if 'price' in msg["data"]:
  215. if msg['data']['price'] != '':
  216. order_event['filled_price'] = float(msg["data"]["price"])
  217. else:
  218. order_event['filled_price'] = 0
  219. else:
  220. order_event['filled_price'] = 0
  221. self.callback["onOrder"](order_event)
  222. def _get_data(self):
  223. market_data = self.depth + [self.max_buy, self.min_sell]
  224. self.max_buy = 0.0
  225. self.min_sell = 0.0
  226. self.buy_v = 0.0
  227. self.buy_q = 0.0
  228. self.sell_v = 0.0
  229. self.sell_q = 0.0
  230. return {'name': self.name,'data':market_data}
  231. async def go(self):
  232. interval = float(self.params.interval)
  233. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  234. ### onTrade
  235. while 1:
  236. try:
  237. # 更新市场信息
  238. market_data = self._get_data()
  239. self.callback['onMarket']({'name': self.name,'data':market_data})
  240. except:
  241. traceback.print_exc()
  242. await asyncio.sleep(interval)
  243. async def get_token(self, is_auth):
  244. # 获取 合约系数
  245. session = aiohttp.ClientSession()
  246. response = await session.get(
  247. "https://api-futures.kucoin.com/api/v1/contracts/active",
  248. proxy=self.proxy
  249. )
  250. res = await response.json()
  251. for i in res['data']:
  252. if i['symbol'] == self.symbol:
  253. self.multiplier = float(i["multiplier"])
  254. print(f"合约乘数为 {self.multiplier}")
  255. self.logger.debug(f"合约乘数为 {self.multiplier}")
  256. await session.close()
  257. # 获取 token
  258. if is_auth:
  259. uri = "/api/v1/bullet-private"
  260. else:
  261. uri = "/api/v1/bullet-public"
  262. headers = {}
  263. if is_auth:
  264. now_time = int(time.time()) * 1000
  265. str_to_sign = str(now_time) + "POST" + uri
  266. sign = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), str_to_sign.encode('utf-8'), hashlib.sha256).digest())
  267. passphrase = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), self.params.pass_key.encode('utf-8'), hashlib.sha256).digest())
  268. headers = {
  269. "KC-API-SIGN": sign.decode(),
  270. "KC-API-TIMESTAMP": str(now_time),
  271. "KC-API-KEY": self.params.access_key,
  272. "KC-API-PASSPHRASE": passphrase.decode(),
  273. "Content-Type": "application/json",
  274. "KC-API-KEY-VERSION": "2"
  275. }
  276. headers["User-Agent"] = "kucoin-python-sdk/v1.0"
  277. session = aiohttp.ClientSession()
  278. response = await session.post(
  279. self.BaseURL+uri,
  280. timeout=5,
  281. headers=headers,
  282. proxy=self.proxy
  283. )
  284. res = await response.text()
  285. res = ujson.loads(res)
  286. await session.close()
  287. if res["code"] == "200000":
  288. token = res["data"]["token"]
  289. ws_connect_id = str(uuid4()).replace('-', '')
  290. endpoint = res["data"]['instanceServers'][0]['endpoint']
  291. ws_endpoint = f"{endpoint}?token={token}&connectId={ws_connect_id}"
  292. encrypt = res["data"]['instanceServers'][0]['encrypt']
  293. if is_auth:
  294. ws_endpoint += '&acceptUserMessage=true'
  295. return ws_endpoint, encrypt
  296. else:
  297. raise Exception("kucoin usdt swap 获取token错误")
  298. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  299. while True:
  300. try:
  301. ping_time = time.time()
  302. # 尝试连接
  303. print(f'{self.name} 尝试连接ws')
  304. # 获取token
  305. ws_endpoint, encrypt = await self.get_token(is_auth)
  306. # 登陆
  307. ws_url = ws_endpoint
  308. async with aiohttp.ClientSession(
  309. connector = aiohttp.TCPConnector(
  310. limit=50,
  311. keepalive_timeout=120,
  312. verify_ssl=False,
  313. local_addr=(self.ip,0)
  314. )
  315. ).ws_connect(
  316. ws_url,
  317. proxy=self.proxy,
  318. timeout=30,
  319. receive_timeout=30,
  320. ) as _ws:
  321. print(f'{self.name} ws连接成功')
  322. self.logger.info(f'{self.name} ws连接成功')
  323. # 订阅
  324. channels=[
  325. # f"/contractMarket/tickerV2:{self.symbol}",
  326. f"/contractMarket/level2Depth50:{self.symbol}",
  327. ]
  328. if sub_trade:
  329. channels += [f"/contractMarket/execution:{self.symbol}"]
  330. if is_auth:
  331. channels += [
  332. f"/contractAccount/wallet",
  333. f"/contract/position:{self.symbol}",
  334. f"/contractMarket/tradeOrders:{self.symbol}",
  335. ]
  336. for i in channels:
  337. sub_str = ujson.dumps({"topic": i, "type": "subscribe"})
  338. if "/contractMarket/level2Depth50" not in i and "/contractMarket/execution" not in i:
  339. # print(i)
  340. sub_str = ujson.dumps({"topic": i, "type": "subscribe", "privateChannel": True, "response": True})
  341. await _ws.send_str(sub_str)
  342. while True:
  343. # 接受消息
  344. try:
  345. msg = await _ws.receive(timeout=30)
  346. except:
  347. print(f'{self.name} ws长时间没有收到消息 准备重连...')
  348. self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...')
  349. break
  350. msg = ujson.loads(msg.data)
  351. # print(msg)
  352. # 处理消息
  353. if 'data' in msg:
  354. # if 'level2' not in msg['subject']:print(msg)
  355. if 'level2' in msg['subject']:self._update_depth(msg)
  356. elif "tickerV2" in msg["subject"]:self._update_ticker(msg)
  357. elif 'match' in msg['subject']:self._update_trade(msg)
  358. elif 'orderMargin.change' in msg['subject']:self._update_account(msg)
  359. elif 'symbolOrderChange' in msg['subject']:self._update_order(msg)
  360. elif 'position.change' in msg['subject']:self._update_position(msg)
  361. # heartbeat
  362. if time.time() - ping_time > 30:
  363. msg = {
  364. 'id': str(int(time.time() * 1000)),
  365. 'type': 'ping'
  366. }
  367. await _ws.send_str(ujson.dumps(msg))
  368. ping_time = time.time()
  369. except:
  370. traceback.print_exc()
  371. print(f'{self.name} ws连接失败 开始重连...')
  372. self.logger.error(f'{self.name} ws连接失败 开始重连...')
  373. # await asyncio.sleep(1)