binance_spot_ws.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. import aiohttp
  2. import time
  3. import asyncio
  4. import zlib
  5. import json
  6. import ujson
  7. import zlib
  8. import hashlib
  9. import hmac
  10. import base64
  11. import traceback
  12. import random, csv, sys, utils
  13. import logging, logging.handlers
  14. import model
  15. def empty_call(msg):
  16. pass
  17. def inflate(data):
  18. '''
  19. 解压缩数据
  20. '''
  21. decompress = zlib.decompressobj(-zlib.MAX_WBITS)
  22. inflated = decompress.decompress(data)
  23. inflated += decompress.flush()
  24. return inflated
  25. class BinanceSpotWs:
  26. def __init__(self, params:model.ClientParams, colo=0, is_print=0):
  27. if colo:
  28. print('不支持colo高速线路')
  29. self.URL = 'wss://stream.binance.com:9443/ws'
  30. else:
  31. self.URL = 'wss://stream.binance.com:9443/ws'
  32. self.params = params
  33. self.name = self.params.name
  34. self.base = self.params.pair.split('_')[0].upper()
  35. self.quote = self.params.pair.split('_')[1].upper()
  36. self.symbol = self.base + self.quote
  37. self.callback = {
  38. "onMarket":self.save_market,
  39. "onPosition":empty_call,
  40. "onEquity":empty_call,
  41. "onOrder":empty_call,
  42. "onTicker":empty_call,
  43. "onDepth":empty_call,
  44. "onExit":empty_call,
  45. }
  46. self.is_print = is_print
  47. self.proxy = None
  48. if 'win' in sys.platform:
  49. self.proxy = self.params.proxy
  50. self.logger = self.get_logger()
  51. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  52. self.stop_flag = 0
  53. self.public_update_time = time.time()
  54. self.private_update_time = time.time()
  55. self.expired_time = 300
  56. ### 更新id
  57. self.update_flag_e = 0
  58. self.update_flag_u = 0
  59. ###
  60. self.max_buy = 0.0
  61. self.min_sell = 0.0
  62. self.buy_v = 0.0
  63. self.buy_q = 0.0
  64. self.sell_v = 0.0
  65. self.sell_q = 0.0
  66. self.depth = []
  67. ####
  68. self.depth_update = []
  69. self.need_flash = 1
  70. self.lastUpdateId = None # 就是小写u
  71. self.depth_full = dict()
  72. self.depth_full['bids'] = dict()
  73. self.depth_full['asks'] = dict()
  74. #### 指定发包ip
  75. iplist = utils.get_local_ip_list()
  76. self.ip = iplist[int(self.params.ip)]
  77. def get_logger(self):
  78. logger = logging.getLogger(__name__)
  79. logger.setLevel(logging.DEBUG)
  80. # log to txt
  81. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  82. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  83. handler.setLevel(logging.DEBUG)
  84. handler.setFormatter(formatter)
  85. logger.addHandler(handler)
  86. return logger
  87. def save_market(self, msg):
  88. date = time.strftime('%Y-%m-%d',time.localtime())
  89. interval = self.params.interval
  90. if msg:
  91. exchange = msg['name']
  92. if len(msg['data']) > 1:
  93. with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  94. 'a',
  95. newline='',
  96. encoding='utf-8') as f:
  97. writer = csv.writer(f, delimiter=',')
  98. writer.writerow(msg['data'])
  99. if self.is_print:print(f'写入行情 {self.symbol}')
  100. async def get_sign(self):
  101. headers = {}
  102. headers['Content-Type'] = 'application/json'
  103. headers['X-MBX-APIKEY'] = self.params.access_key
  104. url = 'https://api.binance.com/api/v3/userDataStream'
  105. session = aiohttp.ClientSession()
  106. response = await session.post(
  107. url,
  108. params=None,
  109. headers=headers,
  110. timeout=10,
  111. proxy=self.proxy
  112. )
  113. self.logger.debug("申请key")
  114. login_str = await response.text()
  115. self.logger.debug(login_str)
  116. await session.close()
  117. return ujson.loads(login_str)['listenKey']
  118. async def long_key(self,listenKey):
  119. headers = {}
  120. headers['Content-Type'] = 'application/json'
  121. headers['X-MBX-APIKEY'] = self.params.access_key
  122. params = {
  123. 'listenKey':listenKey,
  124. }
  125. url = 'https://api.binance.com/api/v3/userDataStream'
  126. session = aiohttp.ClientSession()
  127. response = await session.put(
  128. url,
  129. params=params,
  130. headers=headers,
  131. timeout=5,
  132. proxy=self.proxy
  133. )
  134. self.logger.debug("续期key")
  135. login_str = await response.text()
  136. self.logger.debug(login_str)
  137. await session.close()
  138. return ujson.loads(login_str)
  139. def _check_update_e(self, id):
  140. if id > self.update_flag_e:
  141. self.update_flag_e = id
  142. return 0
  143. else:
  144. return 1
  145. def _check_update_u(self, id):
  146. if id > self.update_flag_u:
  147. self.update_flag_u = id
  148. return 0
  149. else:
  150. return 1
  151. # @timeit
  152. def _update_depth20(self, msg):
  153. self.public_update_time = time.time()
  154. msg = ujson.loads(msg)
  155. if self._check_update_u(msg['lastUpdateId']):
  156. return
  157. else:
  158. # 更新ticker信息但不触发
  159. self.ticker_info["bp"] = float(msg['bids'][0][0])
  160. self.ticker_info["ap"] = float(msg['asks'][0][0])
  161. self.callback['onTicker'](self.ticker_info)
  162. ##### 标准化深度
  163. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  164. step = mp * utils.EFF_RANGE / utils.LEVEL
  165. bp = []
  166. ap = []
  167. bv = [0 for _ in range(utils.LEVEL)]
  168. av = [0 for _ in range(utils.LEVEL)]
  169. for i in range(utils.LEVEL):
  170. bp.append(self.ticker_info["bp"]-step*i)
  171. for i in range(utils.LEVEL):
  172. ap.append(self.ticker_info["ap"]+step*i)
  173. #
  174. price_thre = self.ticker_info["bp"] - step
  175. index = 0
  176. for i in msg['bids']:
  177. price = float(i[0])
  178. amount = float(i[1])
  179. if price > price_thre:
  180. bv[index] += amount
  181. else:
  182. price_thre -= step
  183. index += 1
  184. if index == utils.LEVEL:
  185. break
  186. bv[index] += amount
  187. price_thre = self.ticker_info["ap"] + step
  188. index = 0
  189. for i in msg['asks']:
  190. price = float(i[0])
  191. amount = float(i[1])
  192. if price < price_thre:
  193. av[index] += amount
  194. else:
  195. price_thre += step
  196. index += 1
  197. if index == utils.LEVEL:
  198. break
  199. av[index] += amount
  200. self.depth = bp + bv + ap + av
  201. self.callback['onDepth']({'name':self.name,'data':self.depth})
  202. def _update_depth(self, msg):
  203. self.public_update_time = time.time()
  204. msg = ujson.loads(msg)
  205. self.depth_update.append(msg)
  206. if self.need_flash == 0: # 可以更新深度
  207. for i in self.depth_update[:]:
  208. u = i['u']
  209. U = i['U']
  210. # print(f'处理 {u}')
  211. if u < self.lastUpdateId: # 丢弃过旧的信息
  212. self.depth_update.remove(i)
  213. else:
  214. if u >= self.lastUpdateId+1 and U <= self.lastUpdateId+1: # 后续更新本地副本
  215. if U != self.lastUpdateId + 1:
  216. self.need_flash = 1
  217. self.logger.error('发现遗漏增量深度推送 重置绝对深度')
  218. return
  219. # print(f'符合要求 {u}')
  220. # 开始更新深度
  221. for j in i['b']:
  222. price = float(j[0])
  223. amount = float(j[1])
  224. if amount > 0:
  225. self.depth_full['bids'][price] = amount
  226. else:
  227. if price in self.depth_full['bids']:del(self.depth_full['bids'][price])
  228. for j in i['a']:
  229. price = float(j[0])
  230. amount = float(j[1])
  231. if amount > 0:
  232. self.depth_full['asks'][price] = amount
  233. else:
  234. if price in self.depth_full['asks']:del(self.depth_full['asks'][price])
  235. self.depth_update.remove(i)
  236. self.lastUpdateId = u
  237. else:
  238. self.logger.error('增量深度不满足文档要求的条件')
  239. buyP = list(self.depth_full['bids'].keys())
  240. buyP.sort(reverse=True) # 从大到小
  241. sellP = list(self.depth_full['asks'].keys())
  242. sellP.sort(reverse=False) # 从小到大
  243. # update ticker
  244. self.ticker_info["bp"] = float(buyP[0])
  245. self.ticker_info["ap"] = float(sellP[0])
  246. self.callback['onTicker'](self.ticker_info)
  247. if self.ticker_info["bp"] > self.ticker_info["ap"]:
  248. self.need_flash = 1
  249. ##### normalized depth
  250. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  251. step = mp * utils.EFF_RANGE / utils.LEVEL
  252. bp = []
  253. ap = []
  254. bv = [0 for _ in range(utils.LEVEL)]
  255. av = [0 for _ in range(utils.LEVEL)]
  256. for i in range(utils.LEVEL):
  257. bp.append(self.ticker_info["bp"]-step*i)
  258. for i in range(utils.LEVEL):
  259. ap.append(self.ticker_info["ap"]+step*i)
  260. #
  261. price_thre = self.ticker_info["bp"] - step
  262. index = 0
  263. for price in buyP:
  264. if price > price_thre:
  265. bv[index] += self.depth_full['bids'][price]
  266. else:
  267. price_thre -= step
  268. index += 1
  269. if index == utils.LEVEL:
  270. break
  271. bv[index] += self.depth_full['bids'][price]
  272. price_thre = self.ticker_info["ap"] + step
  273. index = 0
  274. for price in sellP:
  275. if price < price_thre:
  276. av[index] += self.depth_full['asks'][price]
  277. else:
  278. price_thre += step
  279. index += 1
  280. if index == utils.LEVEL:
  281. break
  282. av[index] += self.depth_full['asks'][price]
  283. self.depth = bp + bv + ap + av
  284. self.callback['onDepth']({'name':self.name,'data':self.depth})
  285. def _update_ticker(self, msg):
  286. self.public_update_time = time.time()
  287. msg = ujson.loads(msg)
  288. if self._check_update_u(msg['u']):
  289. return
  290. else:
  291. bp = float(msg['b'])
  292. bq = float(msg['B'])
  293. ap = float(msg['a'])
  294. aq = float(msg['A'])
  295. self.ticker_info['bp'] = bp
  296. self.ticker_info['ap'] = ap
  297. self.callback['onTicker'](self.ticker_info)
  298. #### 标准化深度
  299. self.depth = [bp,bq,ap,aq]
  300. self.callback['onDepth']({'name':self.name,'data':self.depth})
  301. def _update_trade(self, msg):
  302. '''
  303. binance spot 无法和depth比对时间戳 放弃修正depth
  304. '''
  305. self.public_update_time = time.time()
  306. msg = ujson.loads(msg)
  307. price = float(msg['p'])
  308. amount = float(msg['q'])
  309. side = 'sell' if msg['m'] else 'buy'
  310. if price > self.max_buy or self.max_buy == 0.0:
  311. self.max_buy = price
  312. if price < self.min_sell or self.min_sell == 0.0:
  313. self.min_sell = price
  314. if side == 'buy':
  315. self.buy_q += amount
  316. self.buy_v += amount*price
  317. elif side == 'sell':
  318. self.sell_q += amount
  319. self.sell_v += amount*price
  320. #### 修正ticker ####
  321. # side = 'sell' if msg['m'] else 'buy'
  322. # if side == 'buy' and price > self.ticker_info['ap']:
  323. # self.ticker_info['ap'] = price
  324. # self.callback['onTicker'](self.ticker_info)
  325. # if side == 'sell' and price < self.ticker_info['bp']:
  326. # self.ticker_info['bp'] = price
  327. # self.callback['onTicker'](self.ticker_info)
  328. def _update_account(self, msg):
  329. msg = ujson.loads(msg)
  330. for i in msg['B']:
  331. if i['a'] == self.base:
  332. coin = float(i['f'])+float(i['l'])
  333. self.callback['onEquity'] = {
  334. self.base:coin
  335. }
  336. if i['a'] == self.quote:
  337. cash = float(i['f'])+float(i['l'])
  338. self.callback['onEquity'] = {
  339. self.quote:cash
  340. }
  341. self.private_update_time = time.time()
  342. def _update_order(self, msg):
  343. '''将ws收到的订单信息触发quant'''
  344. msg = ujson.loads(msg)
  345. self.logger.debug(f"ws订单推送 {msg}")
  346. data = msg
  347. if self.symbol in data['s']:
  348. order_event = dict()
  349. status = data['X']
  350. if status == "NEW": # 新增
  351. local_status = "NEW"
  352. elif status in ["CANCELED", "FILLED", "EXPIRED"]: # 删除
  353. local_status = "REMOVE"
  354. elif status in ["PARTIALLY_FILLED"]: # 忽略
  355. return
  356. else:
  357. print("未知订单状态",data)
  358. return
  359. order_event['status'] = local_status
  360. order_event['filled_price'] = float(data['Z'])/float(data['z']) if float(data['z']) > 0.0 else 0.0
  361. order_event['filled'] = float(data['z'])
  362. if data['C'] == '':
  363. cid = data['c']
  364. else:
  365. cid = data['C']
  366. order_event['client_id'] = cid
  367. order_event['order_id'] = data['i']
  368. order_event['fee'] = float(data['n'])
  369. self.callback['onOrder'](order_event)
  370. self.private_update_time = time.time()
  371. def _get_data(self):
  372. market_data = self.depth + [self.max_buy, self.min_sell]
  373. self.max_buy = 0.0
  374. self.min_sell = 0.0
  375. self.buy_v = 0.0
  376. self.buy_q = 0.0
  377. self.sell_v = 0.0
  378. self.sell_q = 0.0
  379. return {'name': self.name,'data':market_data}
  380. async def get_depth_flash(self):
  381. headers = {}
  382. headers['Content-Type'] = 'application/json'
  383. url = f'https://api.binance.com/api/v3/depth?symbol={self.symbol}&limit=1000'
  384. session = aiohttp.ClientSession()
  385. response = await session.get(
  386. url,
  387. headers=headers,
  388. timeout=5,
  389. proxy=self.proxy
  390. )
  391. depth_flash = await response.text()
  392. await session.close()
  393. return ujson.loads(depth_flash)
  394. async def go(self):
  395. interval = float(self.params.interval)
  396. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  397. ### onTrade
  398. while 1:
  399. try:
  400. if self.stop_flag == 1:
  401. return
  402. # 更新市场信息
  403. market_data = self._get_data()
  404. self.callback['onMarket'](market_data)
  405. except:
  406. traceback.print_exc()
  407. await asyncio.sleep(interval)
  408. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  409. while True:
  410. try:
  411. # 重置更新时间
  412. self.public_update_time = time.time()
  413. self.private_update_time = time.time()
  414. # 尝试连接
  415. print(f'{self.name} 尝试连接ws')
  416. # 登陆
  417. ws_url = self.URL
  418. if is_auth:
  419. listenKey = await self.get_sign()
  420. listenKeyTime = time.time()
  421. ws_url += '/'+listenKey
  422. async with aiohttp.ClientSession(
  423. connector = aiohttp.TCPConnector(
  424. limit=50,
  425. keepalive_timeout=120,
  426. verify_ssl=False,
  427. local_addr=(self.ip,0)
  428. )
  429. ).ws_connect(
  430. ws_url,
  431. proxy=self.proxy,
  432. timeout=30,
  433. receive_timeout=30,
  434. ) as _ws:
  435. print(f'{self.name} ws连接成功')
  436. self.logger.info(f'{self.name} ws连接成功')
  437. # 订阅 币安 现货 bbo没有事件标记 无法区分
  438. symbol = self.symbol.lower()
  439. if sub_fast:
  440. channels=[f"{symbol}@bookTicker",]
  441. else:
  442. channels=[
  443. # f"{symbol}@depth@100ms",
  444. f"{symbol}@depth20@100ms",
  445. ]
  446. if sub_trade:
  447. channels.append(f"{symbol}@aggTrade")
  448. sub_str = ujson.dumps({"method": "SUBSCRIBE", "params": channels, "id":random.randint(1,1000)})
  449. await _ws.send_str(sub_str)
  450. self.need_flash = 1
  451. while True:
  452. # 停机信号
  453. if self.stop_flag:
  454. await _ws.close()
  455. return
  456. # 接受消息
  457. try:
  458. msg = await _ws.receive(timeout=10)
  459. except:
  460. print(f'{self.name} ws长时间没有收到消息 准备重连...')
  461. self.logger.error(f'{self.name} ws长时间没有收到消息 准备重连...')
  462. break
  463. msg = msg.data
  464. # 处理消息
  465. # if 'depthUpdate' in msg:self._update_depth(msg)
  466. if 'lastUpdateId' in msg:self._update_depth20(msg)
  467. elif 'aggTrade' in msg:self._update_trade(msg)
  468. elif 'A' in msg and 'B' in msg and 'e' not in msg:self._update_ticker(msg)
  469. elif 'outboundAccountPosition' in msg:self._update_account(msg)
  470. elif 'executionReport' in msg:self._update_order(msg)
  471. elif 'ping' in msg:await _ws.send_str('pong')
  472. elif 'listenKeyExpired' in msg or 'expired' in str(msg).lower():
  473. raise Exception('key过期重连')
  474. if is_auth:
  475. if time.time() - listenKeyTime > 60*15: # 每15分钟续一次
  476. print('续期listenKey')
  477. await self.long_key(listenKey)
  478. listenKeyTime = time.time()
  479. if time.time() - self.private_update_time > self.expired_time*5:
  480. raise Exception('长期未更新私有信息重连')
  481. if time.time() - self.public_update_time > self.expired_time:
  482. raise Exception('长期未更新公有信息重连')
  483. # if self.need_flash:
  484. # print('rest获取绝对深度')
  485. # depth_flash = await self.get_depth_flash()
  486. # self.lastUpdateId = depth_flash['lastUpdateId']
  487. # # 检查已有更新中是否包含
  488. # self.depth_full['bids'] = dict()
  489. # self.depth_full['asks'] = dict()
  490. # for i in depth_flash['bids']:self.depth_full['bids'][float(i[0])] = float(i[1])
  491. # for i in depth_flash['asks']:self.depth_full['asks'][float(i[0])] = float(i[1])
  492. # self.need_flash = 0
  493. except:
  494. _ws = None
  495. traceback.print_exc()
  496. print(f'{self.name} ws连接失败 开始重连...')
  497. self.logger.error(f'{self.name} ws连接失败 开始重连...')
  498. # await asyncio.sleep(1)