huobi_spot_ws.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. import aiohttp
  2. import time
  3. import asyncio
  4. import zlib
  5. import json
  6. import zlib
  7. import hashlib
  8. import hmac
  9. import base64
  10. import traceback
  11. import random
  12. import gzip, csv, sys
  13. import logging, logging.handlers
  14. import utils
  15. import model
  16. def empty_call(msg):
  17. pass
  18. class HuobiSpotWs:
  19. def __init__(self, params:model.ClientParams, colo=0, is_print=0):
  20. if colo:
  21. print('不支持colo高速线路')
  22. self.URL = 'wss://api.huobi.pro/ws'
  23. else:
  24. self.URL = 'wss://api.huobi.pro/ws'
  25. self.params = params
  26. self.name = self.params.name
  27. self.base = self.params.pair.split('_')[0].upper()
  28. self.quote = self.params.pair.split('_')[1].upper()
  29. self.symbol = self.base + self.quote
  30. self.data = dict()
  31. self.data['trade'] = []
  32. self.data['force'] = []
  33. self.callback = {
  34. "onMarket":self.save_market,
  35. "onPosition":empty_call,
  36. "onEquity":empty_call,
  37. "onOrder":empty_call,
  38. "onTicker":empty_call,
  39. "onDepth":empty_call,
  40. "onDepth":empty_call,
  41. "onExit":empty_call,
  42. }
  43. self.is_print = is_print
  44. self.proxy = None
  45. if 'win' in sys.platform:
  46. self.proxy = self.params.proxy
  47. self.logger = self.get_logger()
  48. self.stop_flag = 0
  49. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  50. self.public_update_time = time.time()
  51. self.private_update_time = time.time()
  52. self.expired_time = 300
  53. self.update_t = 0.0
  54. self.max_buy = 0.0
  55. self.min_sell = 0.0
  56. self.buy_v = 0.0
  57. self.buy_q = 0.0
  58. self.sell_v = 0.0
  59. self.sell_q = 0.0
  60. self.depth = []
  61. #### 指定发包ip
  62. iplist = utils.get_local_ip_list()
  63. self.ip = iplist[int(self.params.ip)]
  64. def get_logger(self):
  65. logger = logging.getLogger(__name__)
  66. logger.setLevel(logging.DEBUG)
  67. # log to txt
  68. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  69. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  70. handler.setLevel(logging.DEBUG)
  71. handler.setFormatter(formatter)
  72. logger.addHandler(handler)
  73. return logger
  74. def save_market(self, msg):
  75. date = time.strftime('%Y-%m-%d',time.localtime())
  76. interval = self.params.interval
  77. if msg:
  78. exchange = msg['name']
  79. if len(msg['data']) > 1:
  80. with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  81. 'a',
  82. newline='',
  83. encoding='utf-8') as f:
  84. writer = csv.writer(f, delimiter=',')
  85. writer.writerow(msg['data'])
  86. if self.is_print:print(f'写入行情 {self.symbol}')
  87. async def get_sign(self):
  88. headers = {}
  89. headers['Content-Type'] = 'application/json'
  90. headers['X-MBX-APIKEY'] = self.params.access_key
  91. params = {
  92. 'timestamp':int(time.time())*1000,
  93. 'recvWindow':5000,
  94. }
  95. query_string = "&".join(["{}={}".format(k, params[k]) for k in sorted(params.keys())])
  96. signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest()
  97. params['signature']=signature
  98. url = 'https://fapi.binance.com/fapi/v1/listenKey'
  99. session = aiohttp.ClientSession()
  100. response = await session.post(
  101. url,
  102. params=params,
  103. headers=headers,
  104. timeout=5,
  105. proxy=self.proxy
  106. )
  107. login_str = await response.text()
  108. await session.close()
  109. return eval(login_str)['listenKey']
  110. def _update_depth(self, msg):
  111. if msg['ts'] > self.update_t:
  112. self.update_t = msg['ts']
  113. ####
  114. self.ticker_info["bp"] = float(msg['tick']['bids'][0][0])
  115. self.ticker_info["ap"] = float(msg['tick']['asks'][0][0])
  116. self.callback['onTicker'](self.ticker_info)
  117. ##### 标准化深度
  118. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  119. step = mp * utils.EFF_RANGE / utils.LEVEL
  120. bp = []
  121. ap = []
  122. bv = [0 for _ in range(utils.LEVEL)]
  123. av = [0 for _ in range(utils.LEVEL)]
  124. for i in range(utils.LEVEL):
  125. bp.append(self.ticker_info["bp"]-step*i)
  126. for i in range(utils.LEVEL):
  127. ap.append(self.ticker_info["ap"]+step*i)
  128. #
  129. price_thre = self.ticker_info["bp"] - step
  130. index = 0
  131. for bid in msg['tick']['bids']:
  132. price = float(bid[0])
  133. amount = float(bid[1])
  134. if price > price_thre:
  135. bv[index] += amount
  136. else:
  137. price_thre -= step
  138. index += 1
  139. if index == utils.LEVEL:
  140. break
  141. bv[index] += amount
  142. price_thre = self.ticker_info["ap"] + step
  143. index = 0
  144. for ask in msg['tick']['asks']:
  145. price = float(ask[0])
  146. amount = float(ask[1])
  147. if price < price_thre:
  148. av[index] += amount
  149. else:
  150. price_thre += step
  151. index += 1
  152. if index == utils.LEVEL:
  153. break
  154. av[index] += amount
  155. self.depth = bp + bv + ap + av
  156. self.callback['onDepth']({'name':self.name,'data':self.depth})
  157. else:
  158. self.logger.debug(f"depth时间戳错误 {self.update_t}")
  159. def _update_trade(self, msg):
  160. for i in msg['tick']['data']:
  161. side = i['direction']
  162. price = float(i['price'])
  163. amount = float(i['amount'])
  164. if price > self.max_buy or self.max_buy == 0.0:
  165. self.max_buy = price
  166. if price < self.min_sell or self.min_sell == 0.0:
  167. self.min_sell = price
  168. if side == 'buy':
  169. self.buy_q += amount
  170. self.buy_v += amount*price
  171. elif side == 'sell':
  172. self.sell_q += amount
  173. self.sell_v += amount*price
  174. #### 修正ticker ####
  175. # if side == 'buy' and price > self.ticker_info['ap']:
  176. # self.ticker_info['ap'] = price
  177. # self.callback['onTicker'](self.ticker_info)
  178. # if side == 'sell' and price < self.ticker_info['bp']:
  179. # self.ticker_info['bp'] = price
  180. # self.callback['onTicker'](self.ticker_info)
  181. def _update_account(self, msg):
  182. msg = eval(msg)
  183. for i in msg['a']['B']:
  184. if i['a'] == 'USDT':
  185. self.data['equity'] = float(i['wb'])
  186. self.callback['onEquity'](self.data['equity'])
  187. def _update_order(self, msg):
  188. msg = json.loads(msg)
  189. i = msg['o']
  190. if i['s'] == self.symbol:
  191. if i['X'] == 'NEW': # 新增订单
  192. pass
  193. # self.callback['onOrder']({"newOrder":newOrder})
  194. if i['X'] == 'FILLED': # 删除订单
  195. self.callback['onOrder']({"deleteOrder":i['i']})
  196. if i['X'] == 'CANCELED': # 删除订单
  197. self.callback['onOrder']({"deleteOrder":i['i']})
  198. def _update_position(self, msg):
  199. long_pos, short_pos = 0, 0
  200. long_avg, short_avg = 0, 0
  201. msg = eval(msg)
  202. for i in msg['a']['P']:
  203. if i['s'] == self.symbol:
  204. if i['ps'] == 'LONG':
  205. long_pos += float(i['pa'])
  206. long_avg = float(i['ep'])
  207. if i['ps'] == 'SHORT':
  208. short_pos += float(i['pa'])
  209. short_avg = float(i['ep'])
  210. pos = model.Position()
  211. self.callback['onPosition'](pos)
  212. def _get_data(self):
  213. market_data = self.depth + [self.max_buy, self.min_sell]
  214. self.max_buy = 0.0
  215. self.min_sell = 0.0
  216. self.buy_v = 0.0
  217. self.buy_q = 0.0
  218. self.sell_v = 0.0
  219. self.sell_q = 0.0
  220. return {'name': self.name,'data':market_data}
  221. async def go(self):
  222. interval = float(self.params.interval)
  223. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  224. ### onTrade
  225. while 1:
  226. try:
  227. # 更新市场信息
  228. market_data = self._get_data()
  229. self.callback['onMarket'](market_data)
  230. except:
  231. traceback.print_exc()
  232. await asyncio.sleep(interval)
  233. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  234. while True:
  235. try:
  236. # 尝试连接
  237. print(f'{self.name} 尝试连接ws')
  238. # 登陆
  239. ws_url = self.URL
  240. async with aiohttp.ClientSession(
  241. connector = aiohttp.TCPConnector(
  242. limit=50,
  243. keepalive_timeout=120,
  244. verify_ssl=False,
  245. local_addr=(self.ip,0)
  246. )
  247. ).ws_connect(
  248. ws_url,
  249. proxy=self.proxy,
  250. timeout=30,
  251. receive_timeout=30,
  252. ) as _ws:
  253. print(f'{self.name} ws连接成功')
  254. # 订阅
  255. symbol = self.symbol.lower()
  256. channels=[
  257. f"market.{symbol}.depth.step0",
  258. ]
  259. if sub_trade:
  260. channels.append(f"market.{symbol}.trade.detail")
  261. for i in channels:
  262. sub_str = json.dumps({"sub": i})
  263. await _ws.send_str(sub_str)
  264. while True:
  265. # 停机信号
  266. if self.stop_flag:return
  267. # 接受消息
  268. try:
  269. msg = await _ws.receive(timeout=10)
  270. except:
  271. print(f'{self.name} ws长时间没有收到消息 准备重连...')
  272. self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...')
  273. break
  274. msg = json.loads(gzip.decompress(msg.data).decode())
  275. # print(msg)
  276. # 处理消息
  277. if 'ch' in msg:
  278. if 'depth' in msg['ch']:self._update_depth(msg)
  279. if 'trade' in msg['ch']:self._update_trade(msg)
  280. # if 'ACCOUNT_UPDATE' in msg:self._update_position(msg)
  281. # if 'ACCOUNT_UPDATE' in msg:self._update_account(msg)
  282. # if 'ORDER_TRADE_UPDATE' in msg:self._update_order(msg)
  283. if 'ping' in msg:
  284. await _ws.send_str(json.dumps({"pong":int(time.time())*1000}))
  285. except:
  286. traceback.print_exc()
  287. print(f'{self.name} ws连接失败 开始重连...')
  288. self.logger.error(f'{self.name} ws连接失败 开始重连...')
  289. # await asyncio.sleep(1)