binance_usdt_swap_ws.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566
  1. import aiohttp
  2. import time
  3. import asyncio
  4. import zlib
  5. import json, ujson
  6. import zlib
  7. import hashlib
  8. import hmac
  9. import base64
  10. import traceback
  11. import random, csv, sys, utils
  12. import logging, logging.handlers
  13. import model
  14. from loguru import logger
  15. def empty_call(msg):
  16. pass
  17. def timeit(func):
  18. def wrapper(*args, **kwargs):
  19. nowTime = time.time()
  20. res = func(*args, **kwargs)
  21. spend_time = time.time() - nowTime
  22. spend_time = round(spend_time * 1000, 5)
  23. print(f'{func.__name__} 耗时 {spend_time} ms')
  24. return res
  25. return wrapper
  26. def inflate(data):
  27. '''
  28. 解压缩数据
  29. '''
  30. decompress = zlib.decompressobj(-zlib.MAX_WBITS)
  31. inflated = decompress.decompress(data)
  32. inflated += decompress.flush()
  33. return inflated
  34. class BinanceUsdtSwapWs:
  35. def __init__(self, params:model.ClientParams, colo=0, is_print=0):
  36. if colo:
  37. print('不支持colo高速线路')
  38. self.URL = 'wss://fstream.binance.com/ws/'
  39. else:
  40. self.URL = 'wss://fstream.binance.com/ws/'
  41. self.params = params
  42. self.name = self.params.name
  43. self.base = params.pair.split('_')[0].upper()
  44. self.quote = params.pair.split('_')[1].upper()
  45. self.symbol = self.base + self.quote
  46. if len(self.params.pair.split('_')) > 2:
  47. self.delivery = self.params.pair.split('_')[2] # 210924
  48. self.symbol += f"_{self.delivery}"
  49. self.callback = {
  50. "onMarket":self.save_market,
  51. "onPosition":empty_call,
  52. "onEquity":empty_call,
  53. "onOrder":empty_call,
  54. "onTicker":empty_call,
  55. "onDepth":empty_call,
  56. "onExit":empty_call,
  57. }
  58. self.is_print = is_print
  59. self.proxy = None
  60. if 'win' in sys.platform:
  61. self.proxy = self.params.proxy
  62. self.logger = self.get_logger()
  63. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  64. self.stop_flag = 0
  65. self.public_update_time = time.time()
  66. self.private_update_time = time.time()
  67. self.expired_time = 300
  68. ### 更新id
  69. self.update_flag_u = 0
  70. self.max_buy = 0.0
  71. self.min_sell = 0.0
  72. self.buy_v = 0.0
  73. self.buy_q = 0.0
  74. self.sell_v = 0.0
  75. self.sell_q = 0.0
  76. self.depth = []
  77. ####
  78. self.depth_update = []
  79. self.need_flash = 1
  80. self.lastUpdateId = None # 就是小写u
  81. self.depth_full = dict()
  82. self.depth_full['bids'] = dict()
  83. self.depth_full['asks'] = dict()
  84. self.decimal = 99
  85. #### 指定发包ip
  86. iplist = utils.get_local_ip_list()
  87. self.ip = iplist[int(self.params.ip)]
  88. def get_logger(self):
  89. logger = logging.getLogger(__name__)
  90. logger.setLevel(logging.DEBUG)
  91. # log to txt
  92. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  93. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  94. handler.setLevel(logging.DEBUG)
  95. handler.setFormatter(formatter)
  96. logger.addHandler(handler)
  97. return logger
  98. def save_market(self, msg):
  99. date = time.strftime('%Y-%m-%d',time.localtime())
  100. interval = self.params.interval
  101. if msg:
  102. exchange = msg['name']
  103. if len(msg['data']) > 1:
  104. with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  105. 'a',
  106. newline='',
  107. encoding='utf-8') as f:
  108. writer = csv.writer(f, delimiter=',')
  109. writer.writerow(msg['data'])
  110. if self.is_print:print(f'写入行情 {self.symbol}')
  111. async def get_sign(self):
  112. headers = {}
  113. headers['Content-Type'] = 'application/json'
  114. headers['X-MBX-APIKEY'] = self.params.access_key
  115. params = {
  116. 'timestamp':int(time.time())*1000,
  117. 'recvWindow':5000,
  118. }
  119. query_string = "&".join(["{}={}".format(k, params[k]) for k in sorted(params.keys())])
  120. signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest()
  121. params['signature']=signature
  122. url = 'https://fapi.binance.com/fapi/v1/listenKey'
  123. session = aiohttp.ClientSession()
  124. response = await session.post(
  125. url,
  126. params=params,
  127. headers=headers,
  128. timeout=5,
  129. proxy=self.proxy
  130. )
  131. self.logger.debug("申请key")
  132. login_str = await response.text()
  133. print(login_str)
  134. self.logger.debug(login_str)
  135. await session.close()
  136. try:
  137. return ujson.loads(login_str)['listenKey']
  138. except:
  139. self.logger.error('登录失败')
  140. return 'qqlh'
  141. async def long_key(self):
  142. headers = {}
  143. headers['Content-Type'] = 'application/json'
  144. headers['X-MBX-APIKEY'] = self.params.access_key
  145. params = {
  146. 'timestamp':int(time.time())*1000,
  147. 'recvWindow':5000,
  148. }
  149. query_string = "&".join(["{}={}".format(k, params[k]) for k in sorted(params.keys())])
  150. signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest()
  151. params['signature']=signature
  152. url = 'https://fapi.binance.com/fapi/v1/listenKey'
  153. session = aiohttp.ClientSession()
  154. response = await session.put(
  155. url,
  156. params=params,
  157. headers=headers,
  158. timeout=5,
  159. proxy=self.proxy
  160. )
  161. self.logger.debug("续期key")
  162. login_str = await response.text()
  163. self.logger.debug(login_str)
  164. await session.close()
  165. return ujson.loads(login_str)
  166. def _check_update_u(self, id):
  167. if id > self.update_flag_u:
  168. self.update_flag_u = id
  169. return 0
  170. else:
  171. return 1
  172. def _update_ticker(self, msg):
  173. self.public_update_time = time.time()
  174. msg = ujson.loads(msg)
  175. if self._check_update_u(msg['u']):
  176. return
  177. else:
  178. bp = float(msg['b'])
  179. bq = float(msg['B'])
  180. ap = float(msg['a'])
  181. aq = float(msg['A'])
  182. self.ticker_info["bp"] = bp
  183. self.ticker_info["ap"] = ap
  184. self.callback['onTicker'](self.ticker_info)
  185. ### 标准化深度
  186. self.depth = [bp,bq,ap,aq]
  187. self.callback['onDepth']({'name':self.name,'data':self.depth})
  188. # @timeit
  189. def _update_depth20(self, msg):
  190. # ge = json.loads(msg)['T']
  191. # logger.info(int(time.time() * 1000) - ge)
  192. # logger.info(msg)
  193. self.public_update_time = time.time()
  194. msg = ujson.loads(msg)
  195. if self._check_update_u(msg['u']):
  196. return
  197. else:
  198. # 更新ticker信息但不触发
  199. self.ticker_info["bp"] = float(msg['b'][0][0])
  200. self.ticker_info["ap"] = float(msg['a'][0][0])
  201. self.callback['onTicker'](self.ticker_info)
  202. if self.decimal == 99:self.decimal = utils.num_to_decimal(msg['b'][0][0])
  203. ##### 标准化深度
  204. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  205. step = round(mp * utils.EFF_RANGE / utils.LEVEL, self.decimal)
  206. bp = []
  207. ap = []
  208. bv = [0 for _ in range(utils.LEVEL)]
  209. av = [0 for _ in range(utils.LEVEL)]
  210. for i in range(utils.LEVEL):
  211. bp.append(round(self.ticker_info["bp"]-step*i, self.decimal))
  212. for i in range(utils.LEVEL):
  213. ap.append(round(self.ticker_info["ap"]+step*i, self.decimal))
  214. ###############################################
  215. price_thre = self.ticker_info["bp"] - step
  216. index = 0
  217. for i in msg['b']:
  218. price = float(i[0])
  219. amount = float(i[1])
  220. if price > price_thre:
  221. bv[index] += amount
  222. else:
  223. price_thre -= step
  224. index += 1
  225. if index == utils.LEVEL:
  226. break
  227. bv[index] += amount
  228. price_thre = self.ticker_info["ap"] + step
  229. index = 0
  230. for i in msg['a']:
  231. price = float(i[0])
  232. amount = float(i[1])
  233. if price < price_thre:
  234. av[index] += amount
  235. else:
  236. price_thre += step
  237. index += 1
  238. if index == utils.LEVEL:
  239. break
  240. av[index] += amount
  241. self.depth = bp + bv + ap + av
  242. self.callback['onDepth']({'name':self.name,'data':self.depth})
  243. # @timeit
  244. def _update_depth(self, msg):
  245. self.public_update_time = time.time()
  246. msg = ujson.loads(msg)
  247. self.depth_update.append(msg)
  248. ### 检查是否有遗漏
  249. for i in range(1,len(self.depth_update)):
  250. if self.depth_update[i]['pu'] != self.depth_update[i]['u']:
  251. self.need_flash = 1
  252. self.logger.error('发现遗漏增量深度推送 重置绝对深度')
  253. return
  254. # print(len(self.depth_update))
  255. if self.need_flash == 0: # 可以更新深度
  256. for i in self.depth_update[:]:
  257. u = i['u']
  258. U = i['U']
  259. pu = i['pu']
  260. # print(f'处理 {u}')
  261. if u < self.lastUpdateId: # 丢弃过旧的信息
  262. self.depth_update.remove(i)
  263. else:
  264. if u >= self.lastUpdateId: # 后续更新本地副本
  265. # print(f'符合要求 {u}')
  266. # 开始更新深度
  267. for j in i['b']:
  268. price = float(j[0])
  269. amount = float(j[1])
  270. if amount > 0:
  271. self.depth_full['bids'][price] = amount
  272. else:
  273. if price in self.depth_full['bids']:del(self.depth_full['bids'][price])
  274. for j in i['a']:
  275. price = float(j[0])
  276. amount = float(j[1])
  277. if amount > 0:
  278. self.depth_full['asks'][price] = amount
  279. else:
  280. if price in self.depth_full['asks']:del(self.depth_full['asks'][price])
  281. self.depth_update.remove(i)
  282. self.lastUpdateId = u
  283. else:
  284. self.logger.error('增量深度不满足文档要求的条件')
  285. buyP = list(self.depth_full['bids'].keys())
  286. buyP.sort(reverse=True) # 从大到小
  287. sellP = list(self.depth_full['asks'].keys())
  288. sellP.sort(reverse=False) # 从小到大
  289. # update ticker
  290. self.ticker_info["bp"] = float(buyP[0])
  291. self.ticker_info["ap"] = float(sellP[0])
  292. self.callback['onTicker'](self.ticker_info)
  293. if self.ticker_info["bp"] > self.ticker_info["ap"]:
  294. self.need_flash = 1
  295. ##### 标准化深度
  296. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  297. step = mp * utils.EFF_RANGE / utils.LEVEL
  298. bp = []
  299. ap = []
  300. bv = [0 for _ in range(utils.LEVEL)]
  301. av = [0 for _ in range(utils.LEVEL)]
  302. for i in range(utils.LEVEL):
  303. bp.append(self.ticker_info["bp"]-step*i)
  304. for i in range(utils.LEVEL):
  305. ap.append(self.ticker_info["ap"]+step*i)
  306. #
  307. price_thre = self.ticker_info["bp"] - step
  308. index = 0
  309. for price in buyP:
  310. if price > price_thre:
  311. bv[index] += self.depth_full['bids'][price]
  312. else:
  313. price_thre -= step
  314. index += 1
  315. if index == utils.LEVEL:
  316. break
  317. bv[index] += self.depth_full['bids'][price]
  318. price_thre = self.ticker_info["ap"] + step
  319. index = 0
  320. for price in sellP:
  321. if price < price_thre:
  322. av[index] += self.depth_full['asks'][price]
  323. else:
  324. price_thre += step
  325. index += 1
  326. if index == utils.LEVEL:
  327. break
  328. av[index] += self.depth_full['asks'][price]
  329. self.depth = bp + bv + ap + av
  330. self.callback['onDepth']({'name':self.name,'data':self.depth})
  331. def _update_trade(self, msg):
  332. '''
  333. 根据trade修正depth对性能消耗很大
  334. '''
  335. self.public_update_time = time.time()
  336. msg = ujson.loads(msg)
  337. price = float(msg['p'])
  338. if self.decimal == 99:self.decimal=utils.num_to_decimal(price)
  339. amount = float(msg['q'])
  340. side = 'sell' if msg['m'] else 'buy'
  341. if price > self.max_buy or self.max_buy == 0.0:
  342. self.max_buy = price
  343. if price < self.min_sell or self.min_sell == 0.0:
  344. self.min_sell = price
  345. if side == 'buy':
  346. self.buy_q += amount
  347. self.buy_v += amount*price
  348. elif side == 'sell':
  349. self.sell_q += amount
  350. self.sell_v += amount*price
  351. #### 修正ticker ####
  352. # side = 'sell' if msg['m'] else 'buy'
  353. # if side == 'buy' and price > self.ticker_info['ap']:
  354. # self.ticker_info['ap'] = price
  355. # self.callback['onTicker'](self.ticker_info)
  356. # if side == 'sell' and price < self.ticker_info['bp']:
  357. # self.ticker_info['bp'] = price
  358. # self.callback['onTicker'](self.ticker_info)
  359. def _update_account(self, msg):
  360. self.private_update_time = time.time()
  361. msg = ujson.loads(msg)
  362. for i in msg['a']['B']:
  363. if i['a'] == self.quote:
  364. self.callback['onEquity']({self.quote:float(i['wb'])})
  365. def _update_order(self, msg):
  366. '''将ws收到的订单信息触发quant'''
  367. msg = ujson.loads(msg)
  368. self.logger.debug(f"ws订单推送 {msg}")
  369. data = msg['o']
  370. if self.symbol in data['s']:
  371. order_event = dict()
  372. status = data['X']
  373. if status == "NEW": # 新增
  374. local_status = "NEW"
  375. elif status in ["CANCELED", "FILLED", "EXPIRED"]: # 删除
  376. local_status = "REMOVE"
  377. elif status in ["PARTIALLY_FILLED"]: # 忽略
  378. return
  379. else:
  380. print("未知订单状态",data)
  381. return
  382. order_event['status'] = local_status
  383. order_event['filled_price'] = float(data['ap'])
  384. order_event['filled'] = float(data['z'])
  385. order_event['client_id'] = data['c']
  386. order_event['order_id'] = data['i']
  387. self.callback['onOrder'](order_event)
  388. self.private_update_time = time.time()
  389. def _update_position(self, msg):
  390. long_pos, short_pos = 0, 0
  391. long_avg, short_avg = 0, 0
  392. msg = ujson.loads(msg)
  393. is_update = 0
  394. for i in msg['a']['P']:
  395. if i['s'] == self.symbol:
  396. is_update = 1
  397. if i['ps'] == 'LONG':
  398. long_pos += abs(float(i['pa']))
  399. long_avg = abs(float(i['ep']))
  400. if i['ps'] == 'SHORT':
  401. short_pos += abs(float(i['pa']))
  402. short_avg = abs(float(i['ep']))
  403. if is_update:
  404. pos = model.Position()
  405. pos.longPos = long_pos
  406. pos.longAvg = long_avg
  407. pos.shortPos = short_pos
  408. pos.shortAvg = short_avg
  409. self.callback['onPosition'](pos)
  410. self.private_update_time = time.time()
  411. def _get_data(self):
  412. market_data = self.depth + [self.max_buy, self.min_sell]
  413. self.max_buy = 0.0
  414. self.min_sell = 0.0
  415. self.buy_v = 0.0
  416. self.buy_q = 0.0
  417. self.sell_v = 0.0
  418. self.sell_q = 0.0
  419. return {'name': self.name,'data':market_data}
  420. async def get_depth_flash(self):
  421. headers = {}
  422. headers['Content-Type'] = 'application/json'
  423. url = f'https://fapi.binance.com/fapi/v1/depth?symbol={self.symbol}&limit=1000'
  424. session = aiohttp.ClientSession()
  425. response = await session.get(
  426. url,
  427. headers=headers,
  428. timeout=5,
  429. proxy=self.proxy
  430. )
  431. depth_flash = await response.text()
  432. await session.close()
  433. return ujson.loads(depth_flash)
  434. async def go(self):
  435. interval = float(self.params.interval)
  436. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  437. ### onTrade
  438. while 1:
  439. try:
  440. # 更新市场信息
  441. market_data = self._get_data()
  442. self.callback['onMarket'](market_data)
  443. except:
  444. traceback.print_exc()
  445. await asyncio.sleep(interval)
  446. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  447. while True:
  448. try:
  449. # 重置更新时间
  450. self.public_update_time = time.time()
  451. self.private_update_time = time.time()
  452. # 尝试连接
  453. print(f'{self.name} 尝试连接ws')
  454. # 登陆
  455. if is_auth:
  456. listenKey = await self.get_sign()
  457. listenKeyTime = time.time()
  458. else:
  459. listenKey = 'qqlh'
  460. ws_url = self.URL+listenKey
  461. async with aiohttp.ClientSession(
  462. connector = aiohttp.TCPConnector(
  463. limit=50,
  464. keepalive_timeout=120,
  465. verify_ssl=False,
  466. local_addr=(self.ip,0)
  467. )
  468. ).ws_connect(
  469. ws_url,
  470. proxy=self.proxy,
  471. timeout=30,
  472. receive_timeout=30,
  473. ) as _ws:
  474. print(f'{self.name} ws连接成功')
  475. self.logger.debug(f'{self.name} ws连接成功')
  476. # 订阅
  477. symbol = self.symbol.lower()
  478. if sub_fast:
  479. channels=[
  480. f"{symbol}@bookTicker",
  481. ]
  482. else:
  483. channels=[
  484. # f"{symbol}@depth@100ms",
  485. f"{symbol}@depth20@100ms",
  486. ]
  487. if sub_trade:
  488. channels.append(f"{symbol}@aggTrade")
  489. sub_str = ujson.dumps({"method": "SUBSCRIBE", "params": channels, "id":random.randint(1,1000)})
  490. await _ws.send_str(sub_str)
  491. self.need_flash = 1
  492. while True:
  493. # 停机信号
  494. if self.stop_flag:
  495. await _ws.close()
  496. return
  497. # 接受消息
  498. try:
  499. msg = await _ws.receive(timeout=30)
  500. except:
  501. print(f'{self.name} ws长时间没有收到消息 准备重连...')
  502. self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...')
  503. break
  504. msg = msg.data
  505. # 处理消息
  506. # if 'depthUpdate' in msg:self._update_depth(msg)
  507. if 'depthUpdate' in msg:self._update_depth20(msg)
  508. elif 'bookTicker' in msg:self._update_ticker(msg)
  509. elif 'aggTrade' in msg:self._update_trade(msg)
  510. elif 'ACCOUNT_UPDATE' in msg:
  511. self._update_position(msg)
  512. self._update_account(msg)
  513. elif 'ORDER_TRADE_UPDATE' in msg:self._update_order(msg)
  514. elif 'ping' in msg:await _ws.send_str('pong')
  515. elif 'listenKeyExpired' in msg:raise Exception('key过期重连')
  516. # 续期listenkey
  517. if is_auth:
  518. if time.time() - listenKeyTime > 60*15: # 每15分钟续一次
  519. print('续期listenKey')
  520. listenKeyTime = time.time()
  521. await self.long_key()
  522. if time.time() - self.private_update_time > self.expired_time*5:
  523. raise Exception('长期未更新私有信息重连')
  524. if time.time() - self.public_update_time > self.expired_time:
  525. raise Exception('长期未更新公有信息重连')
  526. # if self.need_flash:
  527. # depth_flash = await self.get_depth_flash()
  528. # self.lastUpdateId = depth_flash['lastUpdateId']
  529. # print(f'更新绝对深度 {self.lastUpdateId}')
  530. # # 检查已有更新中是否包含
  531. # self.depth_full['bids'] = dict()
  532. # self.depth_full['asks'] = dict()
  533. # for i in depth_flash['bids']:self.depth_full['bids'][float(i[0])] = float(i[1])
  534. # for i in depth_flash['asks']:self.depth_full['asks'][float(i[0])] = float(i[1])
  535. # self.need_flash = 0
  536. except:
  537. traceback.print_exc()
  538. print(f'{self.name} ws连接失败 开始重连...')
  539. self.logger.error(f'{self.name} ws连接失败 开始重连...')
  540. self.logger.error(traceback.format_exc())
  541. await asyncio.sleep(1)