kucoin_spot_ws.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. import aiohttp
  2. import time
  3. import asyncio
  4. import zlib
  5. import json, ujson
  6. import zlib
  7. import hashlib
  8. import hmac
  9. import base64
  10. import traceback
  11. import random
  12. import gzip, csv, sys
  13. from uuid import uuid4
  14. import logging, logging.handlers
  15. import utils
  16. import model
  17. def empty_call(msg):
  18. pass
  19. class KucoinSpotWs:
  20. def __init__(self, params: model.ClientParams, colo=0, is_print=0):
  21. if colo:
  22. print('不支持colo高速线路')
  23. self.BaseURL = "https://api.kucoin.com"
  24. else:
  25. self.BaseURL = "https://api.kucoin.com"
  26. self.params = params
  27. self.name = self.params.name
  28. self.base = self.params.pair.split('_')[0].upper()
  29. self.quote = self.params.pair.split('_')[1].upper()
  30. self.symbol = self.base + '-' + self.quote
  31. self.callback = {
  32. "onMarket":self.save_market,
  33. "onPosition":empty_call,
  34. "onEquity":empty_call,
  35. "onOrder":empty_call,
  36. "onTicker":empty_call,
  37. "onDepth":empty_call,
  38. "onExit":empty_call,
  39. }
  40. self.is_print = is_print
  41. self.proxy = None
  42. if 'win' in sys.platform:
  43. self.proxy = self.params.proxy
  44. self.logger = self.get_logger()
  45. self.local_orders = dict()
  46. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  47. self.stop_flag = 0
  48. self.max_buy = 0.0
  49. self.min_sell = 0.0
  50. self.buy_v = 0.0
  51. self.buy_q = 0.0
  52. self.sell_v = 0.0
  53. self.sell_q = 0.0
  54. self.update_t = 0.0
  55. self.depth = []
  56. #### 指定发包ip
  57. iplist = utils.get_local_ip_list()
  58. self.ip = iplist[int(self.params.ip)]
  59. def get_logger(self):
  60. logger = logging.getLogger(__name__)
  61. logger.setLevel(logging.DEBUG)
  62. # log to txt
  63. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  64. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  65. handler.setLevel(logging.DEBUG)
  66. handler.setFormatter(formatter)
  67. logger.addHandler(handler)
  68. return logger
  69. def save_market(self, msg):
  70. date = time.strftime('%Y-%m-%d',time.localtime())
  71. interval = float(self.params.interval)
  72. if msg:
  73. exchange = msg['name']
  74. if len(msg['data']) > 1:
  75. with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  76. 'a',
  77. newline='',
  78. encoding='utf-8') as f:
  79. writer = csv.writer(f, delimiter=',')
  80. writer.writerow(msg['data'])
  81. if self.is_print:print(f'写入行情 {self.symbol}')
  82. async def get_sign(self):
  83. headers = {}
  84. headers['Content-Type'] = 'application/json'
  85. headers['X-MBX-APIKEY'] = self.params.access_key
  86. params = {
  87. 'timestamp':int(time.time())*1000,
  88. 'recvWindow':5000,
  89. }
  90. query_string = "&".join(["{}={}".format(k, params[k]) for k in sorted(params.keys())])
  91. signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest()
  92. params['signature']=signature
  93. url = 'https://fapi.binance.com/fapi/v1/listenKey'
  94. session = aiohttp.ClientSession()
  95. response = await session.post(
  96. url,
  97. params=params,
  98. headers=headers,
  99. timeout=5,
  100. proxy=self.proxy
  101. )
  102. login_str = await response.text()
  103. await session.close()
  104. return ujson.loads(login_str)['listenKey']
  105. def _update_depth(self, msg):
  106. if msg['data']['timestamp'] > self.update_t:
  107. self.update_t = msg['data']['timestamp']
  108. self.ticker_info["bp"] = float(msg['data']['bids'][0][0])
  109. self.ticker_info["ap"] = float(msg['data']['asks'][0][0])
  110. self.callback['onTicker'](self.ticker_info)
  111. ##### 标准化深度
  112. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  113. step = mp * utils.EFF_RANGE / utils.LEVEL
  114. bp = []
  115. ap = []
  116. bv = [0 for _ in range(utils.LEVEL)]
  117. av = [0 for _ in range(utils.LEVEL)]
  118. for i in range(utils.LEVEL):
  119. bp.append(self.ticker_info["bp"]-step*i)
  120. for i in range(utils.LEVEL):
  121. ap.append(self.ticker_info["ap"]+step*i)
  122. #
  123. price_thre = self.ticker_info["bp"] - step
  124. index = 0
  125. for bid in msg['data']['bids']:
  126. price = float(bid[0])
  127. amount = float(bid[1])
  128. if price > price_thre:
  129. bv[index] += amount
  130. else:
  131. price_thre -= step
  132. index += 1
  133. if index == utils.LEVEL:
  134. break
  135. bv[index] += amount
  136. price_thre = self.ticker_info["ap"] + step
  137. index = 0
  138. for ask in msg['data']['asks']:
  139. price = float(ask[0])
  140. amount = float(ask[1])
  141. if price < price_thre:
  142. av[index] += amount
  143. else:
  144. price_thre += step
  145. index += 1
  146. if index == utils.LEVEL:
  147. break
  148. av[index] += amount
  149. self.depth = bp + bv + ap + av
  150. self.callback['onDepth']({'name':self.name,'data':self.depth})
  151. def _update_trade(self, msg):
  152. price = float(msg["data"]['price'])
  153. side = msg["data"]['side']
  154. amount = float(msg["data"]['size'])
  155. if price > self.max_buy or self.max_buy == 0.0:
  156. self.max_buy = price
  157. if price < self.min_sell or self.min_sell == 0.0:
  158. self.min_sell = price
  159. if side == 'buy':
  160. self.buy_q += amount
  161. self.buy_v += amount*price
  162. elif side == 'sell':
  163. self.sell_q += amount
  164. self.sell_v += amount*price
  165. def _update_account(self, msg):
  166. if 'trade' in msg['data']["relationEvent"]:
  167. if msg['data']["currency"].upper() == self.base:
  168. coin = abs(float(msg['data']["total"]))
  169. self.callback['onEquity']({
  170. self.base:coin
  171. })
  172. if msg['data']["currency"].upper() == self.quote:
  173. self.callback['onEquity']({
  174. self.quote:float(msg['data']["total"])
  175. })
  176. def _update_order(self, msg):
  177. self.logger.debug(f"ws订单推送 {msg}")
  178. if msg['topic'] == '/spotMarket/tradeOrders-batch':
  179. for i in msg['data']:
  180. if i["symbol"] == self.symbol:
  181. if i["status"] == 'open': # 新增订单
  182. order_event = dict()
  183. order_event['filled'] = 0
  184. order_event['filled_price'] = 0
  185. order_event['client_id'] = i["clientOid"]
  186. order_event['order_id'] = i['orderId']
  187. order_event['status'] = "NEW"
  188. self.callback["onOrder"](order_event)
  189. self.local_orders[i["clientOid"]] = order_event
  190. elif i["status"] == 'done': # 删除订单
  191. if "price" in i:
  192. price = float(i["price"])
  193. else:
  194. if i["clientOid"] in self.local_orders:
  195. price = self.local_orders[i["clientOid"]]["price"]
  196. else:
  197. # 应该是非本策略订单 忽略price
  198. price = 0
  199. order_event = dict()
  200. order_event['amount'] = float(i["size"])
  201. order_event['filled'] = float(i["filledSize"])
  202. order_event['filled_price'] = price
  203. order_event['client_id'] = i["clientOid"]
  204. order_event['order_id'] = i['orderId']
  205. order_event['status'] = "REMOVE"
  206. order_event['fee'] = float(i["fee"]) if "fee" in i["data"] else 0.0
  207. self.callback["onOrder"](order_event)
  208. if i["clientOid"] in self.local_orders:
  209. del(self.local_orders[i["clientOid"]])
  210. else:
  211. if msg["data"]["symbol"] == self.symbol:
  212. if msg["data"]["status"] == 'open': # 新增订单
  213. order_event = dict()
  214. order_event['filled'] = 0
  215. order_event['filled_price'] = 0
  216. order_event['client_id'] = msg["data"]["clientOid"]
  217. order_event['order_id'] = msg["data"]['orderId']
  218. order_event['status'] = "NEW"
  219. self.callback["onOrder"](order_event)
  220. self.local_orders[msg["data"]["clientOid"]] = order_event
  221. elif msg["data"]["status"] == 'done': # 删除订单
  222. if "price" in msg['data']:
  223. price = float(msg["data"]["price"])
  224. else:
  225. if msg["data"]["clientOid"] in self.local_orders:
  226. price = self.local_orders[msg["data"]["clientOid"]]["price"]
  227. else:
  228. # 应该是非本策略订单 忽略price
  229. price = 0
  230. order_event = dict()
  231. order_event['filled'] = float(msg["data"]["filledSize"])
  232. order_event['filled_price'] = price
  233. order_event['client_id'] = msg["data"]["clientOid"]
  234. order_event['order_id'] = msg["data"]['orderId']
  235. order_event['fee'] = float(msg["data"]["fee"]) if "fee" in msg["data"] else 0.0
  236. order_event['status'] = "REMOVE"
  237. self.callback["onOrder"](order_event)
  238. if msg["data"]["clientOid"] in self.local_orders:
  239. del(self.local_orders[msg["data"]["clientOid"]])
  240. def _get_data(self):
  241. market_data = self.depth + [self.max_buy, self.min_sell]
  242. self.max_buy = 0.0
  243. self.min_sell = 0.0
  244. self.buy_v = 0.0
  245. self.buy_q = 0.0
  246. self.sell_v = 0.0
  247. self.sell_q = 0.0
  248. return {'name': self.name,'data':market_data}
  249. async def go(self):
  250. interval = float(self.params.interval)
  251. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  252. ### onTrade
  253. while 1:
  254. try:
  255. # 更新市场信息
  256. market_data = self._get_data()
  257. self.callback['onMarket'](market_data)
  258. except:
  259. traceback.print_exc()
  260. await asyncio.sleep(interval)
  261. async def get_token(self, is_auth):
  262. if is_auth:
  263. uri = "/api/v1/bullet-private"
  264. else:
  265. uri = "/api/v1/bullet-public"
  266. headers = {}
  267. if is_auth:
  268. now_time = int(time.time()) * 1000
  269. str_to_sign = str(now_time) + "POST" + uri
  270. sign = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), str_to_sign.encode('utf-8'), hashlib.sha256).digest())
  271. passphrase = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), self.params.pass_key.encode('utf-8'), hashlib.sha256).digest())
  272. headers = {
  273. "KC-API-SIGN": sign.decode(),
  274. "KC-API-TIMESTAMP": str(now_time),
  275. "KC-API-KEY": self.params.access_key,
  276. "KC-API-PASSPHRASE": passphrase.decode(),
  277. "Content-Type": "application/json",
  278. "KC-API-KEY-VERSION": "2"
  279. }
  280. headers["User-Agent"] = "kucoin-python-sdk/v1.0"
  281. session = aiohttp.ClientSession()
  282. response = await session.post(
  283. self.BaseURL+uri,
  284. timeout=5,
  285. headers=headers,
  286. proxy=self.proxy
  287. )
  288. res = await response.text()
  289. res = ujson.loads(res)
  290. await session.close()
  291. if res["code"] == "200000":
  292. token = res["data"]["token"]
  293. ws_connect_id = str(uuid4()).replace('-', '')
  294. endpoint = res["data"]['instanceServers'][0]['endpoint']
  295. ws_endpoint = f"{endpoint}?token={token}&connectId={ws_connect_id}"
  296. encrypt = res["data"]['instanceServers'][0]['encrypt']
  297. if is_auth:
  298. ws_endpoint += '&acceptUserMessage=true'
  299. return ws_endpoint, encrypt
  300. else:
  301. raise Exception("kucoin spot 获取token错误")
  302. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  303. while True:
  304. try:
  305. ping_time = time.time()
  306. # 尝试连接
  307. print(f'{self.name} 尝试连接ws')
  308. # 获取token
  309. ws_endpoint, encrypt = await self.get_token(is_auth)
  310. # 登陆
  311. ws_url = ws_endpoint
  312. async with aiohttp.ClientSession(
  313. connector = aiohttp.TCPConnector(
  314. limit=50,
  315. keepalive_timeout=120,
  316. verify_ssl=False,
  317. local_addr=(self.ip,0)
  318. )
  319. ).ws_connect(
  320. ws_url,
  321. proxy=self.proxy,
  322. timeout=30,
  323. receive_timeout=30,
  324. ) as _ws:
  325. print(f'{self.name} ws连接成功')
  326. self.logger.info(f'{self.name} ws连接成功')
  327. # 订阅 ticker来的很慢
  328. channels=[
  329. # f"/market/ticker:{self.symbol}",
  330. f"/spotMarket/level2Depth50:{self.symbol}",
  331. ]
  332. if sub_trade:
  333. channels.append(f"/market/match:{self.symbol}")
  334. if is_auth:
  335. channels.append(f"/spotMarket/tradeOrders")
  336. channels.append(f"/account/balance")
  337. for i in channels:
  338. sub_str = ujson.dumps({"topic": i, "type":"subscribe"})
  339. await _ws.send_str(sub_str)
  340. while True:
  341. # 停机信号
  342. if self.stop_flag:
  343. await _ws.close()
  344. return
  345. # 接受消息
  346. try:
  347. msg = await _ws.receive(timeout=30)
  348. except:
  349. print(f'{self.name} ws长时间没有收到消息 准备重连...')
  350. self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...')
  351. break
  352. msg = ujson.loads(msg.data)
  353. # self.logger.debug(msg)
  354. # print(msg)
  355. # 处理消息
  356. if 'data' in msg:
  357. if 'level2' in msg['subject']:self._update_depth(msg)
  358. elif 'trade.l3match' in msg['subject']:self._update_trade(msg)
  359. elif 'account.balance' in msg['subject']:self._update_account(msg)
  360. elif 'orderChange' in msg['subject']:self._update_order(msg)
  361. # heartbeat
  362. if time.time() - ping_time > 30:
  363. msg = {
  364. 'id': str(int(time.time() * 1000)),
  365. 'type': 'ping'
  366. }
  367. await _ws.send_str(ujson.dumps(msg))
  368. ping_time = time.time()
  369. except Exception as e:
  370. traceback.print_exc()
  371. print(f'{self.name} ws连接失败 开始重连...')
  372. self.logger.error(f'{self.name} ws连接失败 开始重连...')
  373. self.logger.error(traceback.format_exc())
  374. # await asyncio.sleep(1)