bybit_usdt_swap_ws.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544
  1. from os import access
  2. import aiohttp
  3. import time
  4. import asyncio
  5. import zlib
  6. import json, ujson
  7. import zlib
  8. import hashlib
  9. import hmac
  10. import base64
  11. import traceback
  12. import random
  13. import gzip, csv, sys
  14. from uuid import uuid4
  15. import logging, logging.handlers
  16. from yarl import URL
  17. import utils
  18. import model
  19. from decimal import Decimal
  20. def empty_call(msg):
  21. # print(msg)
  22. pass
  23. class BybitUsdtSwapWs:
  24. def __init__(self, params: model.ClientParams, colo=0, is_print=0):
  25. if colo:
  26. print('不支持colo高速线路')
  27. self.BaseURL_public = "wss://stream.bybit.com/realtime_public"
  28. self.BaseURL_private = "wss://stream.bybit.com/realtime_private"
  29. else:
  30. self.BaseURL_public = "wss://stream.bybit.com/realtime_public"
  31. self.BaseURL_private = "wss://stream.bybit.com/realtime_private"
  32. self.params = params
  33. self.name = self.params.name
  34. self.base = self.params.pair.split('_')[0].upper()
  35. self.quote = self.params.pair.split('_')[1].upper()
  36. self.symbol = self.base + self.quote
  37. self.callback = {
  38. "onMarket":self.save_market,
  39. "onPosition":empty_call,
  40. "onEquity":empty_call,
  41. "onOrder":empty_call,
  42. "onTicker":empty_call,
  43. "onDepth":empty_call,
  44. "onExit":empty_call,
  45. }
  46. self.is_print = is_print
  47. self.proxy = None
  48. if 'win' in sys.platform:
  49. self.proxy = self.params.proxy
  50. self.logger = self.get_logger()
  51. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  52. self.multiplier = None
  53. self.max_buy = 0.0
  54. self.min_sell = 0.0
  55. self.buy_v = 0.0
  56. self.buy_q = 0.0
  57. self.sell_v = 0.0
  58. self.sell_q = 0.0
  59. self.update_t = 0.0
  60. self.depth = []
  61. self.orderbook = dict()
  62. self.orderbook['bid'] = dict()
  63. self.orderbook['ask'] = dict()
  64. self.last_on_depth_time = time.time()
  65. self.sub_fast = 0
  66. #### 指定发包ip
  67. iplist = utils.get_local_ip_list()
  68. self.ip = iplist[int(self.params.ip)]
  69. def get_logger(self):
  70. logger = logging.getLogger(__name__)
  71. logger.setLevel(logging.DEBUG)
  72. # log to txt
  73. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  74. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  75. handler.setLevel(logging.DEBUG)
  76. handler.setFormatter(formatter)
  77. logger.addHandler(handler)
  78. return logger
  79. def save_market(self, msg):
  80. date = time.strftime('%Y-%m-%d',time.localtime())
  81. interval = float(self.params.interval)
  82. if msg:
  83. exchange = msg['name']
  84. if len(msg['data']) > 1:
  85. with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  86. 'a',
  87. newline='',
  88. encoding='utf-8') as f:
  89. writer = csv.writer(f, delimiter=',')
  90. writer.writerow(msg['data'])
  91. if self.is_print:print(f'写入行情 {self.symbol}')
  92. async def get_sign(self):
  93. headers = {}
  94. headers['Content-Type'] = 'application/json'
  95. headers['X-MBX-APIKEY'] = self.params.access_key
  96. params = {
  97. 'timestamp':int(time.time())*1000,
  98. 'recvWindow':5000,
  99. }
  100. query_string = "&".join(["{}={}".format(k, params[k]) for k in sorted(params.keys())])
  101. signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest()
  102. params['signature']=signature
  103. url = 'https://fapi.binance.com/fapi/v1/listenKey'
  104. session = aiohttp.ClientSession()
  105. response = await session.post(
  106. url,
  107. params=params,
  108. headers=headers,
  109. timeout=5,
  110. proxy=self.proxy
  111. )
  112. login_str = await response.text()
  113. await session.close()
  114. return ujson.loads(login_str)['listenKey']
  115. def _update_depth(self, msg):
  116. t = int(msg['timestamp_e6'])
  117. if t > self.update_t:
  118. self.update_t = t
  119. ###### 维护orderbook
  120. if msg['type'] == 'snapshot':
  121. self.orderbook = dict()
  122. self.orderbook['bid'] = dict()
  123. self.orderbook['ask'] = dict()
  124. for i in msg['data']['order_book']:
  125. if i['side'] == 'Buy':
  126. self.orderbook['bid'][float(i['price'])] = float(i['size'])
  127. elif i['side'] == 'Sell':
  128. self.orderbook['ask'][float(i['price'])] = float(i['size'])
  129. else:
  130. print('错误类型')
  131. elif msg['type'] == 'delta':
  132. for _type in msg['data']:
  133. for i in msg['data'][_type]:
  134. if _type == 'delete':
  135. if i['side'] == 'Buy':
  136. if float(i['price']) in self.orderbook['bid']:
  137. del(self.orderbook['bid'][float(i['price'])])
  138. elif i['side'] == 'Sell':
  139. if float(i['price']) in self.orderbook['ask']:
  140. del(self.orderbook['ask'][float(i['price'])])
  141. else:
  142. print('错误类型')
  143. elif _type == 'update':
  144. if i['side'] == 'Buy':
  145. if float(i['price']) in self.orderbook['bid']:
  146. self.orderbook['bid'][float(i['price'])] = float(i['size'])
  147. elif i['side'] == 'Sell':
  148. if float(i['price']) in self.orderbook['ask']:
  149. self.orderbook['ask'][float(i['price'])] = float(i['size'])
  150. else:
  151. print('错误类型')
  152. elif _type == 'insert':
  153. if i['side'] == 'Buy':
  154. self.orderbook['bid'][float(i['price'])] = float(i['size'])
  155. elif i['side'] == 'Sell':
  156. self.orderbook['ask'][float(i['price'])] = float(i['size'])
  157. else:
  158. print('错误类型')
  159. else:
  160. print('错误类型')
  161. else:
  162. print('未知depth类型')
  163. ###### 限制回调频率
  164. now_time = time.time()
  165. if now_time - self.last_on_depth_time >= 0.2 or self.sub_fast:
  166. self.last_on_depth_time = time.time()
  167. ######
  168. self.ticker_info['bp'] = max(self.orderbook['bid'].keys())
  169. self.ticker_info['ap'] = min(self.orderbook['ask'].keys())
  170. ######
  171. if self.ticker_info['bp'] > self.ticker_info['ap']:
  172. raise Exception("增量深度出现错误")
  173. ######
  174. self.callback['onTicker'](self.ticker_info)
  175. ##### 标准化深度
  176. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  177. step = mp * utils.EFF_RANGE / utils.LEVEL
  178. bp = []
  179. ap = []
  180. bv = [0 for _ in range(utils.LEVEL)]
  181. av = [0 for _ in range(utils.LEVEL)]
  182. for i in range(utils.LEVEL):
  183. bp.append(self.ticker_info["bp"]-step*i)
  184. for i in range(utils.LEVEL):
  185. ap.append(self.ticker_info["ap"]+step*i)
  186. #
  187. price_thre = self.ticker_info["bp"] - step
  188. index = 0
  189. for bid_price in self.orderbook['bid'].keys():
  190. price = bid_price
  191. amount = self.orderbook['bid'][bid_price]
  192. if price > price_thre:
  193. bv[index] += amount
  194. else:
  195. price_thre -= step
  196. index += 1
  197. if index == utils.LEVEL:
  198. break
  199. bv[index] += amount
  200. price_thre = self.ticker_info["ap"] + step
  201. index = 0
  202. for ask_price in self.orderbook['ask'].keys():
  203. price = ask_price
  204. amount = self.orderbook['ask'][ask_price]
  205. if price < price_thre:
  206. av[index] += amount
  207. else:
  208. price_thre += step
  209. index += 1
  210. if index == utils.LEVEL:
  211. break
  212. av[index] += amount
  213. self.depth = bp + bv + ap + av
  214. self.callback['onDepth']({'name':self.name,'data':self.depth})
  215. # print('更新深度', time.time(),self.depth)
  216. # def _update_ticker(self, msg):
  217. # if msg['data']['sequence'] > self.update_t:
  218. # self.update_t = msg['data']['sequence']
  219. # self.ticker_info['bp'] = float(msg['data']['bestBidPrice'])
  220. # self.ticker_info['ap'] = float(msg['data']['bestAskPrice'])
  221. # self.callback['onTicker'](self.ticker_info)
  222. def _update_trade(self, msg):
  223. for i in msg['data']:
  224. price = float(i['price'])
  225. side = i['side'].lower()
  226. amount = float(i['size'])
  227. if price > self.max_buy or self.max_buy == 0.0:
  228. self.max_buy = price
  229. if price < self.min_sell or self.min_sell == 0.0:
  230. self.min_sell = price
  231. if side == 'buy':
  232. self.buy_q += amount
  233. self.buy_v += amount*price
  234. elif side == 'sell':
  235. self.sell_q += amount
  236. self.sell_v += amount*price
  237. def _update_position(self, msg):
  238. pos = model.Position()
  239. for i in msg['data']:
  240. symbol = i['symbol']
  241. if symbol == self.symbol:
  242. amt = float(i["size"])
  243. side = i['side'].lower()
  244. ep = float(i["entry_price"])
  245. if side == 'buy':
  246. pos.longPos = amt
  247. pos.longAvg = ep
  248. elif side == 'sell':
  249. pos.shortPos = amt
  250. pos.shortAvg = ep
  251. else:
  252. pass
  253. self.callback["onPosition"](pos)
  254. def _update_account(self, msg):
  255. for i in msg['data']:
  256. self.callback['onEquity'] = {self.quote:float(i['wallet_balance'])}
  257. def _update_order(self, msg):
  258. self.logger.debug(f"ws订单推送 {msg}")
  259. # print(msg)
  260. for i in msg['data']:
  261. if self.symbol == i['symbol']:
  262. if i["order_status"] == 'New': # 新增订单
  263. order_event = dict()
  264. order_event['status'] = "NEW"
  265. order_event['filled'] = 0
  266. order_event['filled_price'] = 0
  267. order_event['client_id'] = i["order_link_id"] if "order_link_id" in i else ""
  268. order_event['order_id'] = i['order_id']
  269. order_event['fee'] = 0.0
  270. self.callback["onOrder"](order_event)
  271. # print('新建',order_event['client_id'])
  272. elif i["order_status"] in ['Filled','Cancelled']: # 删除订单
  273. # fee 负数是扣手续费 bitget没有返佣
  274. order_event = dict()
  275. order_event['status'] = "REMOVE"
  276. order_event['client_id'] = i["order_link_id"] if "order_link_id" in i else ""
  277. order_event['order_id'] = i['order_id']
  278. order_event['filled'] = float(i["cum_exec_qty"])
  279. order_event['filled_price'] = float(i["last_exec_price"]) if 'last_exec_price' in i else float(i['price'])
  280. order_event['fee'] = float(i['cum_exec_fee'])
  281. self.callback["onOrder"](order_event)
  282. # print('移除',order_event['client_id'])
  283. def _get_data(self):
  284. market_data = self.depth + [self.max_buy, self.min_sell]
  285. self.max_buy = 0.0
  286. self.min_sell = 0.0
  287. self.buy_v = 0.0
  288. self.buy_q = 0.0
  289. self.sell_v = 0.0
  290. self.sell_q = 0.0
  291. return {'name': self.name,'data':market_data}
  292. async def go(self):
  293. interval = float(self.params.interval)
  294. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  295. ### onTrade
  296. while 1:
  297. try:
  298. # 更新市场信息
  299. market_data = self._get_data()
  300. self.callback['onMarket']({'name': self.name,'data':market_data})
  301. except:
  302. traceback.print_exc()
  303. await asyncio.sleep(interval)
  304. async def get_token(self, is_auth):
  305. # 获取 token
  306. if is_auth:
  307. uri = "/api/v1/bullet-private"
  308. else:
  309. uri = "/api/v1/bullet-public"
  310. headers = {}
  311. if is_auth:
  312. now_time = int(time.time()) * 1000
  313. str_to_sign = str(now_time) + "POST" + uri
  314. sign = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), str_to_sign.encode('utf-8'), hashlib.sha256).digest())
  315. passphrase = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), self.params.pass_key.encode('utf-8'), hashlib.sha256).digest())
  316. headers = {
  317. "KC-API-SIGN": sign.decode(),
  318. "KC-API-TIMESTAMP": str(now_time),
  319. "KC-API-KEY": self.params.access_key,
  320. "KC-API-PASSPHRASE": passphrase.decode(),
  321. "Content-Type": "application/json",
  322. "KC-API-KEY-VERSION": "2"
  323. }
  324. headers["User-Agent"] = "kucoin-python-sdk/v1.0"
  325. session = aiohttp.ClientSession()
  326. response = await session.post(
  327. self.BaseURL+uri,
  328. timeout=5,
  329. headers=headers,
  330. proxy=self.proxy
  331. )
  332. res = await response.text()
  333. res = ujson.loads(res)
  334. await session.close()
  335. if res["code"] == "200000":
  336. token = res["data"]["token"]
  337. ws_connect_id = str(uuid4()).replace('-', '')
  338. endpoint = res["data"]['instanceServers'][0]['endpoint']
  339. ws_endpoint = f"{endpoint}?token={token}&connectId={ws_connect_id}"
  340. encrypt = res["data"]['instanceServers'][0]['encrypt']
  341. if is_auth:
  342. ws_endpoint += '&acceptUserMessage=true'
  343. return ws_endpoint, encrypt
  344. else:
  345. raise Exception("kucoin usdt swap 获取token错误")
  346. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  347. ''''''
  348. asyncio.create_task(self.run_public(sub_trade, sub_fast))
  349. if is_auth:
  350. asyncio.create_task(self.run_private())
  351. while True:
  352. await asyncio.sleep(5)
  353. async def run_private(self):
  354. '''
  355. 订阅private频道
  356. '''
  357. while True:
  358. try:
  359. ping_time = time.time()
  360. # 尝试连接
  361. print(f'{self.name} 尝试连接ws')
  362. # 登陆
  363. ws_url = self.BaseURL_private
  364. async with aiohttp.ClientSession(
  365. connector = aiohttp.TCPConnector(
  366. limit=50,
  367. keepalive_timeout=120,
  368. verify_ssl=False,
  369. local_addr=(self.ip,0)
  370. )
  371. ).ws_connect(
  372. ws_url,
  373. proxy=self.proxy,
  374. timeout=30,
  375. receive_timeout=30,
  376. ) as _ws:
  377. print(f'{self.name} ws private 连接成功')
  378. self.logger.info(f'{self.name} ws private 连接成功')
  379. # 先鉴权
  380. # Generate expires.
  381. expires = int((time.time() + 10000) * 1000)
  382. # Generate signature.
  383. signature = str(hmac.new(
  384. bytes(self.params.secret_key, "utf-8"),
  385. bytes(f"GET/realtime{expires}", "utf-8"), digestmod="sha256"
  386. ).hexdigest())
  387. await _ws.send_str(ujson.dumps({
  388. "op":"auth",
  389. "args":[
  390. self.params.access_key, expires, signature
  391. ]
  392. }))
  393. # 订阅
  394. channels = [
  395. "position",
  396. "wallet",
  397. "order",
  398. ]
  399. for i in channels:
  400. sub_str = ujson.dumps({"args": [i], "op":"subscribe"})
  401. await _ws.send_str(sub_str)
  402. while True:
  403. # 接受消息
  404. try:
  405. msg = await _ws.receive(timeout=300)
  406. except:
  407. print(f'{self.name} ws长时间没有收到消息 private 准备重连...')
  408. self.logger.error(f'{self.name} ws长时间没有收到消息 private 准备重连...')
  409. break
  410. # self.logger.debug(msg)
  411. try:
  412. msg = ujson.loads(msg.data)
  413. except:
  414. # self.logger.warning(f'非json格式string:{msg}')
  415. pass
  416. # print(msg)
  417. # 处理消息
  418. if 'topic' in msg:
  419. if 'wallet' in msg['topic']:self._update_account(msg)
  420. elif 'order' in msg['topic']:self._update_order(msg)
  421. elif 'position' in msg['topic']:self._update_position(msg)
  422. # heartbeat
  423. if time.time() - ping_time > 15:
  424. await _ws.send_str('{"op": "ping"}')
  425. ping_time = time.time()
  426. except:
  427. traceback.print_exc()
  428. print(f'{self.name} ws连接失败 开始重连...')
  429. self.logger.error(f'{self.name} ws连接失败 开始重连...')
  430. self.logger.error(traceback.format_exc())
  431. # await asyncio.sleep(1)
  432. async def run_public(self, sub_trade=0, sub_fast=0):
  433. '''
  434. 订阅public频道
  435. '''
  436. self.sub_fast = sub_fast
  437. while True:
  438. try:
  439. ping_time = time.time()
  440. # 尝试连接
  441. print(f'{self.name} 尝试连接ws')
  442. # 登陆
  443. url = self.BaseURL_public
  444. async with aiohttp.ClientSession(
  445. connector = aiohttp.TCPConnector(
  446. limit=50,
  447. keepalive_timeout=120,
  448. verify_ssl=False,
  449. local_addr=(self.ip,0)
  450. )
  451. ).ws_connect(
  452. url,
  453. proxy=self.proxy,
  454. timeout=30,
  455. receive_timeout=30,
  456. ) as _ws:
  457. print(f'{self.name} ws public 连接成功')
  458. self.logger.info(f'{self.name} ws public 连接成功')
  459. # 订阅
  460. channels=[
  461. f"orderBookL2_25.{self.symbol}" # 推送频率20ms
  462. ]
  463. if sub_trade:
  464. channels += [
  465. f"trade.{self.symbol}"
  466. ]
  467. for i in channels:
  468. sub_str = ujson.dumps({"args": [i], "op":"subscribe"})
  469. await _ws.send_str(sub_str)
  470. while True:
  471. # 接受消息
  472. try:
  473. msg = await _ws.receive(timeout=30)
  474. except:
  475. print(f'{self.name} ws长时间没有收到消息 public 准备重连...')
  476. self.logger.error(f'{self.name} ws长时间没有收到消息 public 准备重连...')
  477. break
  478. # self.logger.debug(msg)
  479. try:
  480. msg = ujson.loads(msg.data)
  481. except:
  482. # self.logger.warning(f'非json格式string:{msg}')
  483. pass
  484. # print(msg)
  485. # 处理消息
  486. if 'data' in msg:
  487. if f'orderBookL2_25.{self.symbol}' == msg['topic']:self._update_depth(msg)
  488. elif 'trade' in msg['topic']:self._update_trade(msg)
  489. # heartbeat
  490. if time.time() - ping_time > 15:
  491. await _ws.send_str('{"op": "ping"}')
  492. ping_time = time.time()
  493. except:
  494. traceback.print_exc()
  495. print(f'{self.name} ws连接失败 开始重连...')
  496. self.logger.error(f'{self.name} ws连接失败 开始重连...')
  497. self.logger.error(traceback.format_exc())
  498. # await asyncio.sleep(1)
  499. if __name__ == "__main__":
  500. p = model.ClientParams()
  501. p.name = ""
  502. p.pair = "matic_usdt"
  503. p.proxy = "http://127.0.0.1:7890"
  504. p.access_key = "nVrNVv0HQ9a1IgaDeC"
  505. p.secret_key = "7zJpfh8rImdrtNO2GnKnGMscKdAJkVMnt6Jl"
  506. p.pass_key = "qwer1234"
  507. p.interval = "0.1"
  508. p.broker_id = "x-nXtHr5jj"
  509. p.debug = "False"
  510. ws = BybitUsdtSwapWs(p, is_print=1)
  511. loop = asyncio.get_event_loop()
  512. tasks = [
  513. asyncio.ensure_future(ws.run(is_auth=1, sub_trade=1)),
  514. # asyncio.ensure_future(ws.go()),
  515. ]
  516. loop.run_until_complete(asyncio.wait(tasks))