kucoin_usdt_swap_ws.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. import aiohttp
  2. import time
  3. import asyncio
  4. import ujson
  5. import hashlib
  6. import hmac
  7. import base64
  8. import traceback
  9. import csv, sys
  10. from uuid import uuid4
  11. import logging, logging.handlers
  12. import utils
  13. import model
  14. from decimal import Decimal
  15. def empty_call(msg):
  16. # print(msg)
  17. pass
  18. class KucoinUsdtSwapWs:
  19. def __init__(self, params: model.ClientParams, colo=0, is_print=0):
  20. if colo:
  21. print('不支持colo高速线路')
  22. self.BaseURL = "https://api-futures.kucoin.com"
  23. else:
  24. self.BaseURL = "https://api-futures.kucoin.com"
  25. self.params = params
  26. self.name = self.params.name
  27. self.base = self.params.pair.split('_')[0].upper()
  28. self.quote = self.params.pair.split('_')[1].upper()
  29. self.symbol = self.base + self.quote + "M"
  30. # 处理特殊情况
  31. if self.symbol == 'BTCUSDTM':
  32. self.symbol = 'XBTUSDTM'
  33. self.callback = {
  34. "onMarket":self.save_market,
  35. "onPosition":empty_call,
  36. "onEquity":empty_call,
  37. "onOrder":empty_call,
  38. "onTicker":empty_call,
  39. "onDepth":empty_call,
  40. "onTrade":empty_call,
  41. "onExit":empty_call,
  42. }
  43. self.is_print = is_print
  44. self.proxy = None
  45. if 'win' in sys.platform:
  46. self.proxy = self.params.proxy
  47. self.logger = self.get_logger()
  48. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  49. self.multiplier = None
  50. self.max_buy = 0.0
  51. self.min_sell = 0.0
  52. self.buy_v = 0.0
  53. self.buy_q = 0.0
  54. self.sell_v = 0.0
  55. self.sell_q = 0.0
  56. self.update_t = 0.0
  57. self.depth = []
  58. #### 指定发包ip
  59. iplist = utils.get_local_ip_list()
  60. self.ip = iplist[int(self.params.ip)]
  61. def get_logger(self):
  62. logger = logging.getLogger(__name__)
  63. logger.setLevel(logging.DEBUG)
  64. # log to txt
  65. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  66. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  67. handler.setLevel(logging.DEBUG)
  68. handler.setFormatter(formatter)
  69. logger.addHandler(handler)
  70. return logger
  71. def save_market(self, msg):
  72. date = time.strftime('%Y-%m-%d',time.localtime())
  73. interval = float(self.params.interval)
  74. if msg:
  75. exchange = msg['name']
  76. if len(msg['data']) > 1:
  77. with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  78. 'a',
  79. newline='',
  80. encoding='utf-8') as f:
  81. writer = csv.writer(f, delimiter=',')
  82. writer.writerow(msg['data'])
  83. if self.is_print:print(f'写入行情 {self.symbol}')
  84. async def get_sign(self):
  85. headers = {}
  86. headers['Content-Type'] = 'application/json'
  87. headers['X-MBX-APIKEY'] = self.params.access_key
  88. params = {
  89. 'timestamp':int(time.time())*1000,
  90. 'recvWindow':5000,
  91. }
  92. query_string = "&".join(["{}={}".format(k, params[k]) for k in sorted(params.keys())])
  93. signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest()
  94. params['signature']=signature
  95. url = 'https://fapi.binance.com/fapi/v1/listenKey'
  96. session = aiohttp.ClientSession()
  97. response = await session.post(
  98. url,
  99. params=params,
  100. headers=headers,
  101. timeout=5,
  102. proxy=self.proxy
  103. )
  104. login_str = await response.text()
  105. await session.close()
  106. return ujson.loads(login_str)['listenKey']
  107. def _update_depth(self, msg):
  108. if msg['data']['sequence'] > self.update_t:
  109. self.update_t = msg['data']['sequence']
  110. self.ticker_info['bp'] = float(msg['data']['bids'][0][0])
  111. self.ticker_info['ap'] = float(msg['data']['asks'][0][0])
  112. self.ticker_info['time'] = msg['data']['timestamp']
  113. self.callback['onTicker'](self.ticker_info)
  114. ##### 标准化深度
  115. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  116. step = mp * utils.EFF_RANGE / utils.LEVEL
  117. bp = []
  118. ap = []
  119. bv = [0 for _ in range(utils.LEVEL)]
  120. av = [0 for _ in range(utils.LEVEL)]
  121. for i in range(utils.LEVEL):
  122. bp.append(self.ticker_info["bp"]-step*i)
  123. for i in range(utils.LEVEL):
  124. ap.append(self.ticker_info["ap"]+step*i)
  125. #
  126. price_thre = self.ticker_info["bp"] - step
  127. index = 0
  128. for bid in msg['data']['bids']:
  129. price = float(bid[0])
  130. amount = float(bid[1])
  131. if price > price_thre:
  132. bv[index] += amount
  133. else:
  134. price_thre -= step
  135. index += 1
  136. if index == utils.LEVEL:
  137. break
  138. bv[index] += amount
  139. price_thre = self.ticker_info["ap"] + step
  140. index = 0
  141. for ask in msg['data']['asks']:
  142. price = float(ask[0])
  143. amount = float(ask[1])
  144. if price < price_thre:
  145. av[index] += amount
  146. else:
  147. price_thre += step
  148. index += 1
  149. if index == utils.LEVEL:
  150. break
  151. av[index] += amount
  152. self.depth = bp + bv + ap + av
  153. self.callback['onDepth']({'name':self.name,'data':self.depth})
  154. def _update_ticker(self, msg):
  155. if msg['data']['sequence'] > self.update_t:
  156. self.update_t = msg['data']['sequence']
  157. self.ticker_info['bp'] = float(msg['data']['bestBidPrice'])
  158. self.ticker_info['ap'] = float(msg['data']['bestAskPrice'])
  159. self.ticker_info['time'] = msg['data']['ts']
  160. self.callback['onTicker'](self.ticker_info)
  161. bp = float(msg['data']['bestBidPrice'])
  162. bv = float(msg['data']['bestBidSize'])
  163. ap = float(msg['data']['bestAskPrice'])
  164. av = float(msg['data']['bestAskSize'])
  165. self.depth = [bp, bv, ap, av]
  166. self.callback['onDepth']({'name':self.name,'data':self.depth})
  167. def _update_trade(self, msg):
  168. price = float(msg["data"]['price'])
  169. side = msg["data"]['side']
  170. amount = float(msg["data"]['size'])*self.multiplier
  171. if price > self.max_buy or self.max_buy == 0.0:
  172. self.max_buy = price
  173. if price < self.min_sell or self.min_sell == 0.0:
  174. self.min_sell = price
  175. if side == 'buy':
  176. self.buy_q += amount
  177. self.buy_v += amount*price
  178. elif side == 'sell':
  179. self.sell_q += amount
  180. self.sell_v += amount*price
  181. self.callback['onTrade']({'timestamp': msg["data"]['ts'], 'price': price, 'amount': amount, 'side': side})
  182. def _update_position(self, msg):
  183. pos = model.Position()
  184. if "currentQty" in msg['data']:
  185. amt = float(msg["data"]["currentQty"]) * self.multiplier
  186. ep = float(msg["data"]["avgEntryPrice"])
  187. if amt == 0:
  188. self.callback["onPosition"](pos)
  189. elif amt > 0:
  190. pos.longPos = amt
  191. pos.longAvg = ep
  192. self.callback["onPosition"](pos)
  193. elif amt < 0:
  194. pos.shortPos = -amt
  195. pos.shortAvg = ep
  196. self.callback["onPosition"](pos)
  197. def _update_account(self, msg):
  198. pass
  199. # if msg['data']['currency'] == 'USDT' and msg['subject'] == "availableBalance.change":
  200. # cash = float(msg['data']['availableBalance']) + float(msg['data']['holdBalance'])
  201. # self.callback['onEquity'] = {self.quote:cash}
  202. def _update_order(self, msg):
  203. self.logger.debug(f"ws订单推送 {msg}")
  204. if '/contractMarket/tradeOrders' in msg['topic']:
  205. if msg["data"]["symbol"] == self.symbol:
  206. if msg["data"]["status"] == 'open': # 新增订单
  207. order_event = dict()
  208. order_event['status'] = "NEW"
  209. order_event['filled'] = 0
  210. order_event['filled_price'] = 0
  211. order_event['client_id'] = msg["data"]["clientOid"] if "clientOid" in msg["data"] else ""
  212. order_event['order_id'] = msg["data"]['orderId']
  213. self.callback["onOrder"](order_event)
  214. elif msg["data"]["type"] in ['filled','canceled']: # 删除订单
  215. order_event = dict()
  216. order_event['status'] = "REMOVE"
  217. order_event['client_id'] = msg["data"]["clientOid"] if "clientOid" in msg["data"] else ""
  218. order_event['order_id'] = msg["data"]['orderId']
  219. order_event['filled'] = float(Decimal(msg["data"]["filledSize"])*Decimal(str(self.multiplier)))
  220. if 'price' in msg["data"]:
  221. if msg['data']['price'] != '':
  222. order_event['filled_price'] = float(msg["data"]["price"])
  223. else:
  224. order_event['filled_price'] = 0
  225. else:
  226. order_event['filled_price'] = 0
  227. self.callback["onOrder"](order_event)
  228. def _get_data(self):
  229. market_data = self.depth + [self.max_buy, self.min_sell]
  230. self.max_buy = 0.0
  231. self.min_sell = 0.0
  232. self.buy_v = 0.0
  233. self.buy_q = 0.0
  234. self.sell_v = 0.0
  235. self.sell_q = 0.0
  236. return {'name': self.name,'data':market_data}
  237. async def go(self):
  238. interval = float(self.params.interval)
  239. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  240. ### onTrade
  241. while 1:
  242. try:
  243. # 更新市场信息
  244. market_data = self._get_data()
  245. self.callback['onMarket']({'name': self.name,'data':market_data})
  246. except:
  247. traceback.print_exc()
  248. await asyncio.sleep(interval)
  249. async def get_token(self, is_auth):
  250. # 获取 合约系数
  251. session = aiohttp.ClientSession()
  252. response = await session.get(
  253. "https://api-futures.kucoin.com/api/v1/contracts/active",
  254. proxy=self.proxy
  255. )
  256. res = await response.json()
  257. for i in res['data']:
  258. if i['symbol'] == self.symbol:
  259. self.multiplier = float(i["multiplier"])
  260. print(f"合约乘数为 {self.multiplier}")
  261. self.logger.debug(f"合约乘数为 {self.multiplier}")
  262. await session.close()
  263. # 获取 token
  264. if is_auth:
  265. uri = "/api/v1/bullet-private"
  266. else:
  267. uri = "/api/v1/bullet-public"
  268. headers = {}
  269. if is_auth:
  270. now_time = int(time.time()) * 1000
  271. str_to_sign = str(now_time) + "POST" + uri
  272. sign = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), str_to_sign.encode('utf-8'), hashlib.sha256).digest())
  273. passphrase = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), self.params.pass_key.encode('utf-8'), hashlib.sha256).digest())
  274. headers = {
  275. "KC-API-SIGN": sign.decode(),
  276. "KC-API-TIMESTAMP": str(now_time),
  277. "KC-API-KEY": self.params.access_key,
  278. "KC-API-PASSPHRASE": passphrase.decode(),
  279. "Content-Type": "application/json",
  280. "KC-API-KEY-VERSION": "2"
  281. }
  282. headers["User-Agent"] = "kucoin-python-sdk/v1.0"
  283. session = aiohttp.ClientSession()
  284. response = await session.post(
  285. self.BaseURL+uri,
  286. timeout=5,
  287. headers=headers,
  288. proxy=self.proxy
  289. )
  290. res = await response.text()
  291. res = ujson.loads(res)
  292. await session.close()
  293. if res["code"] == "200000":
  294. token = res["data"]["token"]
  295. ws_connect_id = str(uuid4()).replace('-', '')
  296. endpoint = res["data"]['instanceServers'][0]['endpoint']
  297. ws_endpoint = f"{endpoint}?token={token}&connectId={ws_connect_id}"
  298. encrypt = res["data"]['instanceServers'][0]['encrypt']
  299. if is_auth:
  300. ws_endpoint += '&acceptUserMessage=true'
  301. return ws_endpoint, encrypt
  302. else:
  303. raise Exception("kucoin usdt swap 获取token错误")
  304. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  305. while True:
  306. try:
  307. ping_time = time.time()
  308. # 尝试连接
  309. print(f'{self.name} 尝试连接ws')
  310. # 获取token
  311. ws_endpoint, encrypt = await self.get_token(is_auth)
  312. # 登陆
  313. ws_url = ws_endpoint
  314. async with aiohttp.ClientSession(
  315. connector = aiohttp.TCPConnector(
  316. limit=50,
  317. keepalive_timeout=120,
  318. verify_ssl=False,
  319. local_addr=(self.ip,0)
  320. )
  321. ).ws_connect(
  322. ws_url,
  323. proxy=self.proxy,
  324. timeout=30,
  325. receive_timeout=30,
  326. ) as _ws:
  327. print(f'{self.name} ws连接成功')
  328. self.logger.info(f'{self.name} ws连接成功')
  329. # 订阅
  330. channels=[
  331. # f"/contractMarket/tickerV2:{self.symbol}",
  332. f"/contractMarket/level2Depth50:{self.symbol}",
  333. f"/contractMarket/execution:{self.symbol}"
  334. ]
  335. if sub_trade:
  336. channels += [f"/contractMarket/execution:{self.symbol}"]
  337. if is_auth:
  338. channels += [
  339. f"/contractAccount/wallet",
  340. f"/contract/position:{self.symbol}",
  341. f"/contractMarket/tradeOrders:{self.symbol}",
  342. ]
  343. for i in channels:
  344. sub_str = ujson.dumps({"topic": i, "type": "subscribe"})
  345. if "/contractMarket/level2Depth50" not in i \
  346. and "/contractMarket/execution" not in i \
  347. and "/contractMarket/tickerV2" not in i:
  348. # print(i)
  349. sub_str = ujson.dumps({"topic": i, "type": "subscribe", "privateChannel": True, "response": True})
  350. await _ws.send_str(sub_str)
  351. while True:
  352. # 接受消息
  353. try:
  354. msg = await _ws.receive(timeout=30)
  355. except:
  356. print(f'{self.name} ws长时间没有收到消息 准备重连...')
  357. self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...')
  358. break
  359. msg = ujson.loads(msg.data)
  360. # print(msg)
  361. # 处理消息
  362. if 'data' in msg:
  363. # if 'level2' not in msg['subject']:print(msg)
  364. if 'level2' in msg['subject']:self._update_depth(msg)
  365. elif "tickerV2" in msg["subject"]:self._update_ticker(msg)
  366. elif 'match' in msg['subject']:self._update_trade(msg)
  367. elif 'orderMargin.change' in msg['subject']:self._update_account(msg)
  368. elif 'symbolOrderChange' in msg['subject']:self._update_order(msg)
  369. elif 'position.change' in msg['subject']:self._update_position(msg)
  370. # heartbeat
  371. if time.time() - ping_time > 30:
  372. msg = {
  373. 'id': str(int(time.time() * 1000)),
  374. 'type': 'ping'
  375. }
  376. await _ws.send_str(ujson.dumps(msg))
  377. ping_time = time.time()
  378. except:
  379. traceback.print_exc()
  380. print(f'{self.name} ws连接失败 开始重连...')
  381. self.logger.error(f'{self.name} ws连接失败 开始重连...')
  382. # await asyncio.sleep(1)