# huobi_usdt_swap_ws.py (17 KB)
  1. from os import times
  2. import aiohttp
  3. import time
  4. import asyncio
  5. import zlib
  6. import json, ujson
  7. import zlib
  8. import hashlib
  9. import hmac
  10. import base64
  11. import traceback
  12. import random
  13. import gzip, sys
  14. import csv
  15. import logging, logging.handlers
  16. import utils
  17. import model
  18. import datetime
  19. import urllib
  20. def empty_call(msg):
  21. pass
  22. class HuobiUsdtSwapWs:
  23. def __init__(self, params:model.ClientParams, colo=0, is_print=0):
  24. if colo:
  25. print('不支持colo高速线路')
  26. self.URL_public = 'wss://api.hbdm.com/linear-swap-ws'
  27. self.URL_private = 'wss://api.hbdm.com/linear-swap-notification'
  28. else:
  29. self.URL_public = 'wss://api.hbdm.com/linear-swap-ws'
  30. self.URL_private = 'wss://api.hbdm.com/linear-swap-notification'
  31. self.params = params
  32. self.name = self.params.name
  33. self.base = self.params.pair.split('_')[0].upper()
  34. self.quote = self.params.pair.split('_')[1].upper()
  35. self.symbol = self.base + '-'+ self.quote
  36. self.callback = {
  37. "onMarket":self.save_market,
  38. "onDepth":empty_call,
  39. "onPosition":empty_call,
  40. "onEquity":empty_call,
  41. "onOrder":empty_call,
  42. "onTicker":empty_call,
  43. "onDepth":empty_call,
  44. "onExit":empty_call,
  45. }
  46. self.is_print = is_print
  47. self.proxy = None
  48. if 'win' in sys.platform:
  49. self.proxy = self.params.proxy
  50. self.logger = self.get_logger()
  51. self.stop_flag = 0
  52. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  53. self.public_update_time = time.time()
  54. self.private_update_time = time.time()
  55. self.expired_time = 300
  56. self.max_buy = 0.0
  57. self.min_sell = 0.0
  58. self.buy_v = 0.0
  59. self.buy_q = 0.0
  60. self.sell_v = 0.0
  61. self.sell_q = 0.0
  62. self.depth = []
  63. #### 指定发包ip
  64. iplist = utils.get_local_ip_list()
  65. self.ip = iplist[int(self.params.ip)]
  66. def get_logger(self):
  67. logger = logging.getLogger(__name__)
  68. logger.setLevel(logging.DEBUG)
  69. # log to txt
  70. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  71. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  72. handler.setLevel(logging.DEBUG)
  73. handler.setFormatter(formatter)
  74. logger.addHandler(handler)
  75. return logger
  76. def save_market(self, msg):
  77. date = time.strftime('%Y-%m-%d',time.localtime())
  78. interval = self.params.interval
  79. if msg:
  80. name = msg['name']
  81. if len(msg['data']) > 1:
  82. with open(f'./history/{name}_{self.symbol}_{interval}_{date}.csv',
  83. 'a',
  84. newline='',
  85. encoding='utf-8') as f:
  86. writer = csv.writer(f, delimiter=',')
  87. writer.writerow(msg['data'])
  88. if self.is_print:print(f'写入行情 {self.symbol}')
  89. async def get_sign(self):
  90. headers = {}
  91. headers['Content-Type'] = 'application/json'
  92. headers['X-MBX-APIKEY'] = self.params.access_key
  93. params = {
  94. 'timestamp':int(time.time())*1000,
  95. 'recvWindow':5000,
  96. }
  97. query_string = "&".join(["{}={}".format(k, params[k]) for k in sorted(params.keys())])
  98. signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest()
  99. params['signature']=signature
  100. url = 'https://fapi.binance.com/fapi/v1/listenKey'
  101. session = aiohttp.ClientSession()
  102. response = await session.post(
  103. url,
  104. params=params,
  105. headers=headers,
  106. timeout=5,
  107. proxy=self.proxy
  108. )
  109. login_str = await response.text()
  110. await session.close()
  111. return eval(login_str)['listenKey']
  112. def _update_depth(self, msg):
  113. self.public_update_time = time.time()
  114. self.ticker_info["bp"] = float(msg['tick']['bids'][0][0])
  115. self.ticker_info["ap"] = float(msg['tick']['asks'][0][0])
  116. self.callback['onTicker'](self.ticker_info)
  117. ##### 标准化深度
  118. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  119. step = mp * utils.EFF_RANGE / utils.LEVEL
  120. bp = []
  121. ap = []
  122. bv = [0 for _ in range(utils.LEVEL)]
  123. av = [0 for _ in range(utils.LEVEL)]
  124. for i in range(utils.LEVEL):
  125. bp.append(self.ticker_info["bp"]-step*i)
  126. for i in range(utils.LEVEL):
  127. ap.append(self.ticker_info["ap"]+step*i)
  128. #
  129. price_thre = self.ticker_info["bp"] - step
  130. index = 0
  131. for bid in msg['tick']['bids']:
  132. price = float(bid[0])
  133. amount = float(bid[1])
  134. if price > price_thre:
  135. bv[index] += amount
  136. else:
  137. price_thre -= step
  138. index += 1
  139. if index == utils.LEVEL:
  140. break
  141. bv[index] += amount
  142. price_thre = self.ticker_info["ap"] + step
  143. index = 0
  144. for ask in msg['tick']['asks']:
  145. price = float(ask[0])
  146. amount = float(ask[1])
  147. if price < price_thre:
  148. av[index] += amount
  149. else:
  150. price_thre += step
  151. index += 1
  152. if index == utils.LEVEL:
  153. break
  154. av[index] += amount
  155. self.depth = bp + bv + ap + av
  156. self.callback['onDepth']({'name':self.name,'data':self.depth})
  157. def _update_trade(self, msg):
  158. self.public_update_time = time.time()
  159. for i in msg['tick']['data']:
  160. price = float(i['price'])
  161. side = i['direction']
  162. amount = float(i['amount'])
  163. if price > self.max_buy or self.max_buy == 0.0:
  164. self.max_buy = price
  165. if price < self.min_sell or self.min_sell == 0.0:
  166. self.min_sell = price
  167. if side == 'buy':
  168. self.buy_q += amount
  169. self.buy_v += amount*price
  170. elif side == 'sell':
  171. self.sell_q += amount
  172. self.sell_v += amount*price
  173. #### 修正ticker ####
  174. # if side == 'buy' and price > self.ticker_info['ap']:
  175. # self.ticker_info['ap'] = price
  176. # self.callback['onTicker'](self.ticker_info)
  177. # if side == 'sell' and price < self.ticker_info['bp']:
  178. # self.ticker_info['bp'] = price
  179. # self.callback['onTicker'](self.ticker_info)
  180. def _update_account(self, msg):
  181. for i in msg['data']:
  182. if i['margin_asset'] == self.quote:
  183. cash = i['margin_balance']
  184. self.callback['onEquity']({self.quote:cash})
  185. def _update_order(self, msg):
  186. if msg['contract_code'] == self.symbol:
  187. if msg['status'] in [3] : # 新增订单
  188. order_event = dict()
  189. order_event['status'] = "NEW"
  190. order_event['filled'] = 0
  191. order_event['filled_price'] = 0
  192. order_event['client_id'] = msg["client_order_id"] if "client_order_id" in msg else ""
  193. order_event['order_id'] = msg['order_id']
  194. order_event['fee'] = 0.0
  195. self.callback["onOrder"](order_event)
  196. elif msg['status'] in [5,6,7]: # 删除订单
  197. order_event = dict()
  198. order_event['status'] = "REMOVE"
  199. order_event['filled'] = float(msg['trade_volume'])
  200. order_event['filled_price'] = float(msg['trade_price'])
  201. order_event['client_id'] = msg["client_order_id"] if "client_order_id" in msg else ""
  202. order_event['order_id'] = msg['order_id']
  203. if msg['fee_asset'] == self.quote:
  204. order_event['fee'] = float(msg['trade_fee'])
  205. self.callback["onOrder"](order_event)
  206. def _update_position(self, msg):
  207. p = model.Position()
  208. for i in msg['data']:
  209. if i['pair'] == self.symbol:
  210. if i['direction'] == 'buy':
  211. p.longPos = float(i['volume'])
  212. p.longAvg = float(i['cost_hold'])
  213. if i['direction'] == 'sell':
  214. p.shortPos = float(i['volume'])
  215. p.shortAvg = float(i['cost_hold'])
  216. self.callback['onPosition'](p)
  217. def _get_data(self):
  218. market_data = self.depth + [self.max_buy, self.min_sell]
  219. self.max_buy = 0.0
  220. self.min_sell = 0.0
  221. self.buy_v = 0.0
  222. self.buy_q = 0.0
  223. self.sell_v = 0.0
  224. self.sell_q = 0.0
  225. return {'name': self.name,'data':market_data}
  226. async def go(self):
  227. interval = float(self.params.interval)
  228. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  229. ### onTrade
  230. while 1:
  231. try:
  232. # 更新市场信息
  233. market_data = self._get_data()
  234. self.callback['onMarket'](market_data)
  235. except:
  236. traceback.print_exc()
  237. await asyncio.sleep(interval)
  238. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  239. # run
  240. asyncio.create_task(self.run_public(sub_trade=0, sub_fast=0))
  241. if is_auth:
  242. asyncio.create_task(self.run_private())
  243. while True:
  244. await asyncio.sleep(5)
  245. async def run_public(self, sub_trade=0, sub_fast=0):
  246. while True:
  247. try:
  248. # 尝试连接
  249. print(f'{self.name} 尝试连接ws public')
  250. # 登陆
  251. ws_url = self.URL_public
  252. async with aiohttp.ClientSession(
  253. connector = aiohttp.TCPConnector(
  254. limit=50,
  255. keepalive_timeout=120,
  256. verify_ssl=False,
  257. local_addr=(self.ip,0)
  258. )
  259. ).ws_connect(
  260. ws_url,
  261. proxy=self.proxy,
  262. timeout=30,
  263. receive_timeout=30,
  264. ) as _ws:
  265. print(f'{self.name} ws public 连接成功')
  266. # 订阅
  267. symbol = self.symbol
  268. channels=[
  269. f"market.{symbol}.depth.step6",
  270. ]
  271. if sub_trade:
  272. channels.append(f"market.{symbol}.trade.detail")
  273. for i in channels:
  274. sub_str = json.dumps({"sub": i})
  275. await _ws.send_str(sub_str)
  276. while True:
  277. # 停机信号
  278. if self.stop_flag:return
  279. # 接受消息
  280. try:
  281. msg = await _ws.receive(timeout=30)
  282. except:
  283. print(f'{self.name} ws public 长时间没有收到消息 准备重连...')
  284. self.logger.error(f'{self.name} ws public 长时间没有收到消息 准备重连...')
  285. break
  286. msg = ujson.loads(gzip.decompress(msg.data).decode())
  287. # print(msg)
  288. # 处理消息
  289. if 'ch' in msg:
  290. if 'depth' in msg['ch']:self._update_depth(msg)
  291. if 'trade' in msg['ch']:self._update_trade(msg)
  292. if 'ping' in msg:
  293. await _ws.send_str(json.dumps({"pong":int(time.time())*1000}))
  294. except:
  295. traceback.print_exc()
  296. print(f'{self.name} ws public 连接失败 开始重连...')
  297. self.logger.error(f'{self.name} ws public 连接失败 开始重连...')
  298. # await asyncio.sleep(1)
  299. async def run_private(self):
  300. while True:
  301. try:
  302. # 尝试连接
  303. print(f'{self.name} 尝试连接ws private')
  304. # 登陆
  305. ws_url = self.URL_private
  306. async with aiohttp.ClientSession(
  307. connector = aiohttp.TCPConnector(
  308. limit=50,
  309. keepalive_timeout=120,
  310. verify_ssl=False,
  311. local_addr=(self.ip,0)
  312. )
  313. ).ws_connect(
  314. ws_url,
  315. proxy=self.proxy,
  316. timeout=30,
  317. receive_timeout=30,
  318. ) as _ws:
  319. print(f'{self.name} ws private 连接成功')
  320. # 订阅
  321. def generate_signature(method, params, host, request_path, secret_key):
  322. host_url = urllib.parse.urlparse(host).hostname.lower()
  323. sorted_params = sorted(params.items(), key=lambda d: d[0], reverse=False)
  324. encode_params = urllib.parse.urlencode(sorted_params)
  325. payload = [method, host_url, request_path, encode_params]
  326. payload = '{}\n{}\n{}\n{}'.format(payload[0],payload[1],payload[2],payload[3])
  327. payload = payload.encode(encoding="utf8")
  328. secret_key = secret_key.encode(encoding="utf8")
  329. digest = hmac.new(secret_key, payload, digestmod=hashlib.sha256).digest()
  330. signature = base64.b64encode(digest)
  331. signature = signature.decode()
  332. # print(payload)
  333. # digest = hmac.new(secret_key.encode('utf8'), payload.encode(
  334. # 'utf8'), digestmod=hashlib.sha256).digest()
  335. # signature = base64.b64encode(digest).decode()
  336. # get Signature
  337. return signature
  338. timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
  339. suffix = 'AccessKeyId={}&SignatureMethod=HmacSHA256&SignatureVersion=2&Timestamp={}'.format(
  340. self.params.access_key, timestamp)
  341. payload = '{}\n{}\n{}\n{}'.format("GET", self.URL_private, "/linear-swap-notification", suffix)
  342. digest = hmac.new(self.params.secret_key.encode('utf8'), payload.encode(
  343. 'utf8'), digestmod=hashlib.sha256).digest()
  344. signature = base64.b64encode(digest).decode()
  345. data = {
  346. "AccessKeyId": self.params.access_key,
  347. "SignatureMethod": "HmacSHA256",
  348. "SignatureVersion": "2",
  349. "Timestamp": timestamp
  350. }
  351. # signature = generate_signature("GET", data, self.URL_private, "/swap-notification", self.params.secret_key)
  352. data["op"] = "auth"
  353. data["type"] = "api"
  354. data["Signature"] = signature
  355. await _ws.send_str(ujson.dumps(data))
  356. # position positions_cross.$contract_code
  357. await _ws.send_str(ujson.dumps({"op":"sub","topic": f"positions_cross.{self.symbol.lower()}"}))
  358. # account accounts_cross.$contract_code
  359. await _ws.send_str(ujson.dumps({"op":"sub","topic": f"accounts_cross.{self.symbol.lower()}"}))
  360. # trade orders_cross.$contract_code
  361. await _ws.send_json(ujson.dumps({"op":"sub","topic": f"orders_cross.{self.symbol.lower()}"}))
  362. while True:
  363. # 停机信号
  364. if self.stop_flag:return
  365. # 接受消息
  366. try:
  367. msg = await _ws.receive(timeout=30)
  368. except:
  369. print(f'{self.name} ws private 长时间没有收到消息 准备重连...')
  370. self.logger.error(f'{self.name} ws private 长时间没有收到消息 准备重连...')
  371. break
  372. msg = ujson.loads(gzip.decompress(msg.data).decode())
  373. print(msg)
  374. # 处理消息
  375. if 'ch' in msg:
  376. if 'positions_cross' in msg['topic']:self._update_position(msg)
  377. if 'accounts_cross' in msg['topic']:self._update_account(msg)
  378. if 'orders_cross' in msg['topic']:self._update_order(msg)
  379. if 'ping' in msg:
  380. await _ws.send_str(json.dumps({"pong":int(time.time())*1000}))
  381. except:
  382. traceback.print_exc()
  383. print(f'{self.name} ws private 连接失败 开始重连...')
  384. self.logger.error(f'{self.name} ws private 连接失败 开始重连...')
  385. # await asyncio.sleep(1)