# binance_coin_swap_ws.py
  1. import aiohttp
  2. import time
  3. import asyncio
  4. import zlib
  5. import json, ujson
  6. import zlib
  7. import hashlib
  8. import hmac
  9. import base64
  10. import traceback
  11. import random, csv, sys
  12. import logging, logging.handlers
  13. import utils
  14. import model
  15. def empty_call(msg):
  16. pass
  17. def inflate(data):
  18. '''
  19. 解压缩数据
  20. '''
  21. decompress = zlib.decompressobj(-zlib.MAX_WBITS)
  22. inflated = decompress.decompress(data)
  23. inflated += decompress.flush()
  24. return inflated
  25. class BinanceCoinSwapWs:
  26. def __init__(self, params: model.ClientParams, colo=0, is_print=0):
  27. if colo:
  28. print('不支持colo高速线路')
  29. self.URL = 'wss://dstream.binance.com/ws/'
  30. else:
  31. self.URL = 'wss://dstream.binance.com/ws/'
  32. self.params = params
  33. self.name = self.params.name
  34. self.base = self.params.pair.split('_')[0].upper()
  35. self.quote = self.params.pair.split('_')[1].upper()
  36. self.symbol = self.base + self.quote
  37. if len(self.params.pair.split('_')) > 2:
  38. self.delivery = self.params.pair.split('_')[2] # 210924
  39. self.symbol += f"_{self.delivery}"
  40. else:
  41. self.symbol += '_PERP'
  42. self.callback = {
  43. "onMarket":self.save_market,
  44. "onPosition":empty_call,
  45. "onEquity":empty_call,
  46. "onOrder":empty_call,
  47. "onTicker":empty_call,
  48. "onDepth":empty_call,
  49. "onExit":empty_call,
  50. }
  51. self.is_print = is_print
  52. self.proxy = None
  53. if 'win' in sys.platform:
  54. self.proxy = self.params.proxy
  55. self.logger = self.get_logger()
  56. self.multiplier = None
  57. self.stop_flag = 0
  58. self.max_buy = 0.0
  59. self.min_sell = 0.0
  60. self.buy_v = 0.0
  61. self.buy_q = 0.0
  62. self.sell_v = 0.0
  63. self.sell_q = 0.0
  64. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  65. self.depth = []
  66. #### 指定发包ip
  67. iplist = utils.get_local_ip_list()
  68. self.ip = iplist[int(self.params.ip)]
  69. def get_logger(self):
  70. logger = logging.getLogger(__name__)
  71. logger.setLevel(logging.DEBUG)
  72. # log to txt
  73. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  74. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  75. handler.setLevel(logging.DEBUG)
  76. handler.setFormatter(formatter)
  77. logger.addHandler(handler)
  78. return logger
  79. def save_market(self, msg):
  80. date = time.strftime('%Y-%m-%d',time.localtime())
  81. interval = self.params.interval
  82. if msg:
  83. exchange = msg['name']
  84. if len(msg['data']) > 1:
  85. with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  86. 'a',
  87. newline='',
  88. encoding='utf-8') as f:
  89. writer = csv.writer(f, delimiter=',')
  90. writer.writerow(msg['data'])
  91. if self.is_print:print(f'写入行情 {self.symbol}')
  92. async def get_sign(self):
  93. headers = {}
  94. headers['Content-Type'] = 'application/json'
  95. headers['X-MBX-APIKEY'] = self.params.access_key
  96. params = {
  97. 'timestamp':int(time.time())*1000,
  98. 'recvWindow':5000,
  99. }
  100. query_string = "&".join(["{}={}".format(k, params[k]) for k in sorted(params.keys())])
  101. signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest()
  102. params['signature']=signature
  103. url = 'https://dapi.binance.com/dapi/v1/listenKey'
  104. session = aiohttp.ClientSession()
  105. response = await session.post(
  106. url,
  107. params=params,
  108. headers=headers,
  109. timeout=5,
  110. proxy=self.proxy
  111. )
  112. login_str = await response.text()
  113. await session.close()
  114. return ujson.loads(login_str)['listenKey']
  115. async def get_depth_flash(self):
  116. headers = {}
  117. headers['Content-Type'] = 'application/json'
  118. headers['X-MBX-APIKEY'] = self.params.access_key
  119. url = f'https://dapi.binance.com/dapi/v1/depth?symbol={self.symbol}&limit=1000'
  120. session = aiohttp.ClientSession()
  121. response = await session.get(
  122. url,
  123. headers=headers,
  124. timeout=5,
  125. proxy=self.proxy
  126. )
  127. depth_flash = await response.text()
  128. await session.close()
  129. return ujson.loads(depth_flash)
  130. def _update_ticker(self, msg):
  131. msg = ujson.loads(msg)
  132. bp = float(msg['b'])
  133. bq = float(msg['B'])
  134. ap = float(msg['a'])
  135. aq = float(msg['A'])
  136. self.ticker_info['bp'] = bp
  137. self.ticker_info['ap'] = ap
  138. self.callback['onTicker'](self.ticker_info)
  139. #### 标准化深度
  140. self.depth = [bp,bq,ap,aq]
  141. self.callback['onDepth']({'name':self.name,'data':self.depth})
  142. def _update_depth(self, msg):
  143. msg = ujson.loads(msg)
  144. ##### on ticker event
  145. self.ticker_info['bp'] = float(msg['b'][0][0])
  146. self.ticker_info['ap'] = float(msg['a'][0][0])
  147. self.callback['onTicker'](self.ticker_info)
  148. ##### 标准化深度
  149. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  150. step = mp * utils.EFF_RANGE / utils.LEVEL
  151. bp = []
  152. ap = []
  153. bv = [0 for _ in range(utils.LEVEL)]
  154. av = [0 for _ in range(utils.LEVEL)]
  155. for i in range(utils.LEVEL):
  156. bp.append(self.ticker_info["bp"]-step*i)
  157. for i in range(utils.LEVEL):
  158. ap.append(self.ticker_info["ap"]+step*i)
  159. #
  160. price_thre = self.ticker_info["bp"] - step
  161. index = 0
  162. for bid in msg['b']:
  163. price = float(bid[0])
  164. amount = float(bid[1])
  165. if price > price_thre:
  166. bv[index] += amount
  167. else:
  168. price_thre -= step
  169. index += 1
  170. if index == utils.LEVEL:
  171. break
  172. bv[index] += amount
  173. price_thre = self.ticker_info["ap"] + step
  174. index = 0
  175. for ask in msg['a']:
  176. price = float(ask[0])
  177. amount = float(ask[1])
  178. if price < price_thre:
  179. av[index] += amount
  180. else:
  181. price_thre += step
  182. index += 1
  183. if index == utils.LEVEL:
  184. break
  185. av[index] += amount
  186. self.depth = bp + bv + ap + av
  187. self.callback['onDepth']({'name':self.name,'data':self.depth})
  188. def _update_trade(self, msg):
  189. msg = ujson.loads(msg)
  190. side = 'sell' if msg['m'] else 'buy'
  191. price = float(msg['p'])
  192. amount = float(msg['q'])
  193. if price > self.max_buy or self.max_buy == 0.0:
  194. self.max_buy = price
  195. if price < self.min_sell or self.min_sell == 0.0:
  196. self.min_sell = price
  197. if side == 'buy':
  198. self.buy_q += amount
  199. self.buy_v += amount*price
  200. elif side == 'sell':
  201. self.sell_q += amount
  202. self.sell_v += amount*price
  203. def _update_account(self, msg):
  204. msg = ujson.loads(msg)
  205. for i in msg['a']['B']:
  206. if i['a'].lower() == self.base.lower():
  207. self.callback['onEquity']({self.quote:float(i['wb'])})
  208. def _update_order(self, msg):
  209. msg = ujson.loads(msg)
  210. i = msg['o']
  211. if i['s'] == self.symbol:
  212. if i['X'] == 'NEW': # 新增订单
  213. pass
  214. if i['X'] == 'FILLED': # 删除订单
  215. self.callback['onOrder']({"deleteOrder":i['i']})
  216. if i['X'] == 'CANCELED': # 删除订单
  217. self.callback['onOrder']({"deleteOrder":i['i']})
  218. def _update_position(self, msg):
  219. long_pos, short_pos = 0, 0
  220. long_avg, short_avg = 0, 0
  221. msg = ujson.loads(msg)
  222. for i in msg['a']['P']:
  223. if i['s'] == self.symbol:
  224. if i['ps'] == 'LONG':
  225. long_pos += abs(float(i['pa']))
  226. long_avg = abs(float(i['ep']))
  227. if i['ps'] == 'SHORT':
  228. short_pos += abs(float(i['pa']))
  229. short_avg = abs(float(i['ep']))
  230. pos = model.Position()
  231. pos.longPos = long_pos
  232. pos.shortPos = short_pos
  233. pos.longAvg = long_avg
  234. pos.shortAvg = short_avg
  235. self.callback['onPosition'](pos)
  236. def _get_data(self):
  237. market_data = self.depth + [self.max_buy, self.min_sell]
  238. self.max_buy = 0
  239. self.min_sell = 0
  240. self.buy_v = 0.0
  241. self.buy_q = 0.0
  242. self.sell_v = 0.0
  243. self.sell_q = 0.0
  244. return {'name': self.name,'data':market_data}
  245. async def go(self):
  246. interval = float(self.params.interval)
  247. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  248. ### onTrade
  249. while 1:
  250. try:
  251. # 更新市场信息
  252. market_data = self._get_data()
  253. self.callback['onMarket'](market_data)
  254. except:
  255. traceback.print_exc()
  256. await asyncio.sleep(interval)
  257. async def before_trade(self):
  258. session = aiohttp.ClientSession()
  259. response = await session.get(
  260. "https://dapi.binance.com/dapi/v1/exchangeInfo",
  261. proxy=self.proxy
  262. )
  263. response = await response.json()
  264. for i in response['symbols']:
  265. if self.symbol in i['symbol'].upper():
  266. self.multiplier = float(i['contractSize'])
  267. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  268. session = aiohttp.ClientSession()
  269. while True:
  270. try:
  271. # 尝试连接
  272. print(f'{self.name} 尝试连接ws')
  273. # 登陆
  274. if is_auth:
  275. listenKey = await self.get_sign()
  276. else:
  277. listenKey = 'qqlh'
  278. # 更新
  279. await self.before_trade()
  280. async with session.ws_connect(
  281. self.URL+listenKey,
  282. proxy=self.proxy,
  283. timeout=30,
  284. receive_timeout=30,
  285. ) as _ws:
  286. print(f'{self.name} ws连接成功')
  287. self.logger.info(f'{self.name} ws连接成功')
  288. # 订阅
  289. symbol = self.symbol.lower()
  290. if sub_fast:
  291. channels=[f"{symbol}@bookTicker",]
  292. else:
  293. channels=[f"{symbol}@depth20@100ms",]
  294. if sub_trade:
  295. channels.append(f"{symbol}@aggTrade")
  296. sub_str = ujson.dumps({"method": "SUBSCRIBE", "params": channels, "id":random.randint(1,1000)})
  297. await _ws.send_str(sub_str)
  298. while True:
  299. # 停机信号
  300. if self.stop_flag:return
  301. # 接受消息
  302. try:
  303. msg = await _ws.receive(timeout=30)
  304. except:
  305. print(f'{self.name} ws长时间没有收到消息 准备重连...')
  306. self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...')
  307. break
  308. msg = msg.data
  309. # 处理消息
  310. if 'depthUpdate' in msg:self._update_depth(msg)
  311. elif 'aggTrade' in msg:self._update_trade(msg)
  312. elif 'bookTicker' in msg:self._update_ticker(msg)
  313. elif 'ACCOUNT_UPDATE' in msg:self._update_position(msg)
  314. elif 'ACCOUNT_UPDATE' in msg:self._update_account(msg)
  315. elif 'ORDER_TRADE_UPDATE' in msg:self._update_order(msg)
  316. elif 'ping' in msg:await _ws.send_str('pong')
  317. elif 'listenKeyExpired' in msg:raise Exception('key过期重连')
  318. except:
  319. _ws = None
  320. traceback.print_exc()
  321. print(f'{self.name} ws连接失败 开始重连...')
  322. self.logger.error(f'{self.name} ws连接失败 开始重连...')