bitget_usdt_swap_ws.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. from os import access
  2. import aiohttp
  3. import time
  4. import asyncio
  5. import zlib
  6. import json, ujson
  7. import zlib
  8. import hashlib
  9. import hmac
  10. import base64
  11. import traceback
  12. import random
  13. import gzip, csv, sys
  14. from uuid import uuid4
  15. import logging, logging.handlers
  16. from yarl import URL
  17. import utils
  18. import model
  19. from decimal import Decimal
  20. def empty_call(msg):
  21. # print(msg)
  22. pass
  23. def sign(message, secret_key):
  24. mac = hmac.new(bytes(secret_key, encoding='utf8'), bytes(message, encoding='utf-8'), digestmod='sha256')
  25. d = mac.digest()
  26. return str(base64.b64encode(d), 'utf8')
  27. def pre_hash(timestamp, method, request_path):
  28. return str(timestamp) + str.upper(method) + str(request_path)
  29. class BitgetUsdtSwapWs:
  30. def __init__(self, params: model.ClientParams, colo=0, is_print=0):
  31. if colo:
  32. print('不支持colo高速线路')
  33. self.BaseURL = "wss://ws.bitget.com/mix/v1/stream"
  34. else:
  35. self.BaseURL = "wss://ws.bitget.com/mix/v1/stream"
  36. self.params = params
  37. self.name = self.params.name
  38. self.base = self.params.pair.split('_')[0].upper()
  39. self.quote = self.params.pair.split('_')[1].upper()
  40. self.symbol = self.base + self.quote
  41. self.instId = self.symbol + '_UMCBL'
  42. self.callback = {
  43. "onMarket":self.save_market,
  44. "onPosition":empty_call,
  45. "onEquity":empty_call,
  46. "onOrder":empty_call,
  47. "onTicker":empty_call,
  48. "onDepth":empty_call,
  49. "onExit":empty_call,
  50. }
  51. self.is_print = is_print
  52. self.proxy = None
  53. if 'win' in sys.platform:
  54. self.proxy = self.params.proxy
  55. self.logger = self.get_logger()
  56. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  57. self.multiplier = None
  58. self.max_buy = 0.0
  59. self.min_sell = 0.0
  60. self.buy_v = 0.0
  61. self.buy_q = 0.0
  62. self.sell_v = 0.0
  63. self.sell_q = 0.0
  64. self.update_t = 0.0
  65. self.depth = []
  66. #### 指定发包ip
  67. iplist = utils.get_local_ip_list()
  68. self.ip = iplist[int(self.params.ip)]
  69. def get_logger(self):
  70. logger = logging.getLogger(__name__)
  71. logger.setLevel(logging.DEBUG)
  72. # log to txt
  73. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  74. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  75. handler.setLevel(logging.DEBUG)
  76. handler.setFormatter(formatter)
  77. logger.addHandler(handler)
  78. return logger
  79. def save_market(self, msg):
  80. date = time.strftime('%Y-%m-%d',time.localtime())
  81. interval = float(self.params.interval)
  82. if msg:
  83. exchange = msg['name']
  84. if len(msg['data']) > 1:
  85. with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  86. 'a',
  87. newline='',
  88. encoding='utf-8') as f:
  89. writer = csv.writer(f, delimiter=',')
  90. writer.writerow(msg['data'])
  91. if self.is_print:print(f'写入行情 {self.symbol}')
  92. async def get_sign(self):
  93. headers = {}
  94. headers['Content-Type'] = 'application/json'
  95. headers['X-MBX-APIKEY'] = self.params.access_key
  96. params = {
  97. 'timestamp':int(time.time())*1000,
  98. 'recvWindow':5000,
  99. }
  100. query_string = "&".join(["{}={}".format(k, params[k]) for k in sorted(params.keys())])
  101. signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest()
  102. params['signature']=signature
  103. url = 'https://fapi.binance.com/fapi/v1/listenKey'
  104. session = aiohttp.ClientSession()
  105. response = await session.post(
  106. url,
  107. params=params,
  108. headers=headers,
  109. timeout=5,
  110. proxy=self.proxy
  111. )
  112. login_str = await response.text()
  113. await session.close()
  114. return ujson.loads(login_str)['listenKey']
  115. def _update_depth(self, msg):
  116. t = int(msg['data'][0]['ts'])
  117. if t > self.update_t:
  118. self.update_t = t
  119. self.ticker_info['bp'] = float(msg['data'][0]['bids'][0][0])
  120. self.ticker_info['ap'] = float(msg['data'][0]['asks'][0][0])
  121. self.callback['onTicker'](self.ticker_info)
  122. ##### 标准化深度
  123. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  124. step = mp * utils.EFF_RANGE / utils.LEVEL
  125. bp = []
  126. ap = []
  127. bv = [0 for _ in range(utils.LEVEL)]
  128. av = [0 for _ in range(utils.LEVEL)]
  129. for i in range(utils.LEVEL):
  130. bp.append(self.ticker_info["bp"]-step*i)
  131. for i in range(utils.LEVEL):
  132. ap.append(self.ticker_info["ap"]+step*i)
  133. #
  134. price_thre = self.ticker_info["bp"] - step
  135. index = 0
  136. for bid in msg['data'][0]['bids']:
  137. price = float(bid[0])
  138. amount = float(bid[1])
  139. if price > price_thre:
  140. bv[index] += amount
  141. else:
  142. price_thre -= step
  143. index += 1
  144. if index == utils.LEVEL:
  145. break
  146. bv[index] += amount
  147. price_thre = self.ticker_info["ap"] + step
  148. index = 0
  149. for ask in msg['data'][0]['asks']:
  150. price = float(ask[0])
  151. amount = float(ask[1])
  152. if price < price_thre:
  153. av[index] += amount
  154. else:
  155. price_thre += step
  156. index += 1
  157. if index == utils.LEVEL:
  158. break
  159. av[index] += amount
  160. self.depth = bp + bv + ap + av
  161. self.callback['onDepth']({'name':self.name,'data':self.depth})
  162. # def _update_ticker(self, msg):
  163. # if msg['data']['sequence'] > self.update_t:
  164. # self.update_t = msg['data']['sequence']
  165. # self.ticker_info['bp'] = float(msg['data']['bestBidPrice'])
  166. # self.ticker_info['ap'] = float(msg['data']['bestAskPrice'])
  167. # self.callback['onTicker'](self.ticker_info)
  168. def _update_trade(self, msg):
  169. for i in msg['data']:
  170. price = float(i[1])
  171. side = i[3]
  172. amount = float(i[2])
  173. if price > self.max_buy or self.max_buy == 0.0:
  174. self.max_buy = price
  175. if price < self.min_sell or self.min_sell == 0.0:
  176. self.min_sell = price
  177. if side == 'buy':
  178. self.buy_q += amount
  179. self.buy_v += amount*price
  180. elif side == 'sell':
  181. self.sell_q += amount
  182. self.sell_v += amount*price
  183. def _update_position(self, msg):
  184. pos = model.Position()
  185. for i in msg['data']:
  186. symbol = i['instName']
  187. if symbol == self.symbol:
  188. amt = float(i["total"])
  189. side = i['holdSide']
  190. ep = float(i["averageOpenPrice"])
  191. if side == 'long':
  192. pos.longPos = amt
  193. pos.longAvg = ep
  194. elif side == 'short':
  195. pos.shortPos = amt
  196. pos.shortAvg = ep
  197. else:
  198. pass
  199. self.callback["onPosition"](pos)
  200. def _update_account(self, msg):
  201. for i in msg['data']:
  202. if i['marginCoin'] == 'USDT':
  203. self.callback['onEquity'] = {self.quote:float(i['equity'])}
  204. def _update_order(self, msg):
  205. self.logger.debug(f"ws订单推送 {msg}")
  206. # print(msg)
  207. for i in msg['data']:
  208. if self.instId == i['instId']:
  209. if i["status"] == 'new': # 新增订单
  210. order_event = dict()
  211. order_event['status'] = "NEW"
  212. order_event['filled'] = 0
  213. order_event['filled_price'] = 0
  214. order_event['client_id'] = i["clOrdId"] if "clOrdId" in i else ""
  215. order_event['order_id'] = i['ordId']
  216. order_event['fee'] = 0.0
  217. self.callback["onOrder"](order_event)
  218. # print('新建',order_event['client_id'])
  219. elif i["status"] in ['full-fill','cancelled']: # 删除订单
  220. # fee 负数是扣手续费 bitget没有返佣
  221. order_event = dict()
  222. order_event['status'] = "REMOVE"
  223. order_event['client_id'] = i["clOrdId"] if "clOrdId" in i else ""
  224. order_event['order_id'] = i['ordId']
  225. order_event['filled'] = float(i["accFillSz"])
  226. order_event['filled_price'] = float(i["fillPx"]) if 'fillPx' in i else float(i['px'])
  227. for j in i['orderFee']:
  228. if j['feeCcy'] == 'USDT':
  229. order_event['fee'] = -float(j['fee'])
  230. self.callback["onOrder"](order_event)
  231. # print('移除',order_event['client_id'])
  232. def _get_data(self):
  233. market_data = self.depth + [self.max_buy, self.min_sell]
  234. self.max_buy = 0.0
  235. self.min_sell = 0.0
  236. self.buy_v = 0.0
  237. self.buy_q = 0.0
  238. self.sell_v = 0.0
  239. self.sell_q = 0.0
  240. return {'name': self.name,'data':market_data}
  241. async def go(self):
  242. interval = float(self.params.interval)
  243. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  244. ### onTrade
  245. while 1:
  246. try:
  247. # 更新市场信息
  248. market_data = self._get_data()
  249. self.callback['onMarket']({'name': self.name,'data':market_data})
  250. except:
  251. traceback.print_exc()
  252. await asyncio.sleep(interval)
  253. async def get_token(self, is_auth):
  254. # 获取 token
  255. if is_auth:
  256. uri = "/api/v1/bullet-private"
  257. else:
  258. uri = "/api/v1/bullet-public"
  259. headers = {}
  260. if is_auth:
  261. now_time = int(time.time()) * 1000
  262. str_to_sign = str(now_time) + "POST" + uri
  263. sign = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), str_to_sign.encode('utf-8'), hashlib.sha256).digest())
  264. passphrase = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), self.params.pass_key.encode('utf-8'), hashlib.sha256).digest())
  265. headers = {
  266. "KC-API-SIGN": sign.decode(),
  267. "KC-API-TIMESTAMP": str(now_time),
  268. "KC-API-KEY": self.params.access_key,
  269. "KC-API-PASSPHRASE": passphrase.decode(),
  270. "Content-Type": "application/json",
  271. "KC-API-KEY-VERSION": "2"
  272. }
  273. headers["User-Agent"] = "kucoin-python-sdk/v1.0"
  274. session = aiohttp.ClientSession()
  275. response = await session.post(
  276. self.BaseURL+uri,
  277. timeout=5,
  278. headers=headers,
  279. proxy=self.proxy
  280. )
  281. res = await response.text()
  282. res = ujson.loads(res)
  283. await session.close()
  284. if res["code"] == "200000":
  285. token = res["data"]["token"]
  286. ws_connect_id = str(uuid4()).replace('-', '')
  287. endpoint = res["data"]['instanceServers'][0]['endpoint']
  288. ws_endpoint = f"{endpoint}?token={token}&connectId={ws_connect_id}"
  289. encrypt = res["data"]['instanceServers'][0]['encrypt']
  290. if is_auth:
  291. ws_endpoint += '&acceptUserMessage=true'
  292. return ws_endpoint, encrypt
  293. else:
  294. raise Exception("kucoin usdt swap 获取token错误")
  295. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  296. while True:
  297. try:
  298. ping_time = time.time()
  299. # 尝试连接
  300. print(f'{self.name} 尝试连接ws')
  301. # 登陆
  302. ws_url = self.BaseURL
  303. async with aiohttp.ClientSession(
  304. connector = aiohttp.TCPConnector(
  305. limit=50,
  306. keepalive_timeout=120,
  307. verify_ssl=False,
  308. local_addr=(self.ip,0)
  309. )
  310. ).ws_connect(
  311. ws_url,
  312. proxy=self.proxy,
  313. timeout=30,
  314. receive_timeout=30,
  315. ) as _ws:
  316. print(f'{self.name} ws连接成功')
  317. self.logger.info(f'{self.name} ws连接成功')
  318. # 订阅
  319. channels=[
  320. {
  321. "instType":"mc",
  322. "channel":"books5",
  323. "instId":self.symbol
  324. }
  325. ]
  326. if sub_trade:
  327. channels += [
  328. {
  329. "instType":"MC",
  330. "channel":"trade",
  331. "instId":self.symbol
  332. }
  333. ]
  334. if is_auth:
  335. # 先登录
  336. timestamp = int(time.time())
  337. sign_str = sign(pre_hash(timestamp, "GET", '/user/verify'), self.params.secret_key)
  338. await _ws.send_str(ujson.dumps({
  339. "op":"login",
  340. "args":[
  341. {
  342. "apiKey":self.params.access_key,
  343. "passphrase":self.params.pass_key,
  344. "timestamp":timestamp,
  345. "sign":sign_str
  346. }
  347. ]
  348. }))
  349. # 先登录
  350. channels += [
  351. {
  352. "instType": "UMCBL",
  353. "channel": "account",
  354. "instId": "default"
  355. },
  356. {
  357. "instType": "UMCBL",
  358. "channel": "positions",
  359. "instId": "default"
  360. },
  361. {
  362. "channel": "orders",
  363. "instType": "UMCBL",
  364. "instId": "default"
  365. }
  366. ]
  367. for i in channels:
  368. sub_str = ujson.dumps({"args": [i], "op":"subscribe"})
  369. await _ws.send_str(sub_str)
  370. while True:
  371. # 接受消息
  372. try:
  373. msg = await _ws.receive(timeout=30)
  374. except:
  375. print(f'{self.name} ws长时间没有收到消息 准备重连...')
  376. self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...')
  377. break
  378. # self.logger.debug(msg)
  379. try:
  380. msg = ujson.loads(msg.data)
  381. except:
  382. # self.logger.warning(f'非json格式string:{msg}')
  383. pass
  384. # 处理消息
  385. if 'data' in msg:
  386. if 'books5' in msg['arg']['channel']:self._update_depth(msg)
  387. # elif "tickerV2" in msg["subject"]:self._update_ticker(msg)
  388. elif 'trade' in msg['arg']['channel']:self._update_trade(msg)
  389. elif 'account' in msg['arg']['channel']:self._update_account(msg)
  390. elif 'orders' in msg['arg']['channel']:self._update_order(msg)
  391. elif 'positions' in msg['arg']['channel']:self._update_position(msg)
  392. # heartbeat
  393. if time.time() - ping_time > 15:
  394. await _ws.send_str("ping")
  395. ping_time = time.time()
  396. except:
  397. traceback.print_exc()
  398. print(f'{self.name} ws连接失败 开始重连...')
  399. self.logger.error(f'{self.name} ws连接失败 开始重连...')
  400. self.logger.error(traceback.format_exc())
  401. # await asyncio.sleep(1)