# ftx_usdt_swap_ws.py
import asyncio
import base64
import csv
import hashlib
import hmac
import json
import logging
import logging.handlers
import random
import sys
import time
import traceback
import urllib
import zlib
from collections import defaultdict, deque
from datetime import datetime
from itertools import zip_longest

import aiohttp
import ujson

import model
import utils


  18. def empty_call(msg):
  19. pass
  20. ZERO = 1e-8
  21. class FtxUsdtSwapWs:
  22. """"""
  23. def __init__(self, params:model.ClientParams, colo=0, is_print=0):
  24. if colo:
  25. print('不支持colo高速线路')
  26. self.URL = 'wss://ftx.com/ws/'
  27. else:
  28. self.URL = 'wss://ftx.com/ws/'
  29. self.params = params
  30. self.name = self.params.name
  31. #
  32. self.base = params.pair.split('_')[0].upper()
  33. self.quote = params.pair.split('_')[1].upper()
  34. self.symbol = f"{self.base}-PERP"
  35. #
  36. self.data = dict()
  37. self.data['trade'] = []
  38. self.data['force'] = []
  39. self.callback = {
  40. "onMarket":self.save_market,
  41. "onDepth":empty_call,
  42. "onPosition":empty_call,
  43. "onEquity":empty_call,
  44. "onOrder":empty_call,
  45. "onTicker":empty_call,
  46. "onDepth":empty_call,
  47. "onExit":empty_call,
  48. }
  49. self.depth_update = []
  50. self.need_flash = 1
  51. self.updata_u = None
  52. self.last_update_id = None
  53. self.depth = dict()
  54. self.depth['bids'] = dict()
  55. self.depth['asks'] = dict()
  56. self.is_print = is_print
  57. self.proxy = None
  58. if 'win' in sys.platform:
  59. self.proxy = self.params.proxy
  60. self.logger = self.get_logger()
  61. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  62. self.stop_flag = 0
  63. self.public_update_time = time.time()
  64. self.private_update_time = time.time()
  65. self.expired_time = 300
  66. self.max_buy = 0.0
  67. self.min_sell = 0.0
  68. self.buy_v = 0.0
  69. self.buy_q = 0.0
  70. self.sell_v = 0.0
  71. self.sell_q = 0.0
  72. self.stepSize = None
  73. self.tickSize = None
  74. self.ctVal = None # 合约乘数
  75. self.ctMult = None # 合约面值
  76. self.depth = []
  77. self._reset_orderbook()
  78. #### 指定发包ip
  79. iplist = utils.get_local_ip_list()
  80. self.ip = iplist[int(self.params.ip)]
  81. def _reset_orderbook(self) -> None:
  82. self._orderbook_timestamp = 0
  83. self._orderbook = {side: defaultdict(float) for side in ['bids','asks']}
  84. def save_market(self, msg):
  85. print(msg)
  86. #pass
  87. #date = time.strftime('%Y-%m-%d',time.localtime())
  88. #interval = self.params.interval
  89. #if msg:
  90. # exchange = msg['name']
  91. # if len(msg['data']) > 1:
  92. # with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  93. # 'a',
  94. # newline='',
  95. # encoding='utf-8') as f:
  96. # writer = csv.writer(f, delimiter=',')
  97. # writer.writerow(msg['data'])
  98. # if self.is_print:print(f'写入行情 {self.symbol}')
  99. def _get_data(self):
  100. market_data = self.depth + [self.max_buy, self.min_sell]
  101. self.max_buy = 0.0
  102. self.min_sell = 0.0
  103. self.buy_v = 0.0
  104. self.buy_q = 0.0
  105. self.sell_v = 0.0
  106. self.sell_q = 0.0
  107. return {'name': self.name,'data':market_data}
  108. async def go(self):
  109. interval = float(self.params.interval)
  110. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  111. ### onTrade
  112. while 1:
  113. try:
  114. # 更新市场信息
  115. market_data = self._get_data()
  116. self.callback['onMarket'](market_data)
  117. except:
  118. traceback.print_exc()
  119. await asyncio.sleep(interval)
  120. def subscribe_public(self, sub_trade = 0):
  121. channels = [
  122. "orderbook",
  123. # "ticker"
  124. ]
  125. if sub_trade:
  126. channels.append("trades")
  127. subs = [ujson.dumps({'op':'subscribe','market':self.symbol, 'channel':channel}) for channel in channels]
  128. return subs
  129. async def run_public(self, sub_trade=0):
  130. """"""
  131. while 1:
  132. try:
  133. self.public_update_time = time.time()
  134. print(f"{self.name} public 尝试连接ws")
  135. ws_url = self.URL
  136. async with aiohttp.ClientSession(
  137. connector = aiohttp.TCPConnector(
  138. limit=50,
  139. keepalive_timeout=120,
  140. verify_ssl=False,
  141. local_addr=(self.ip,0)
  142. )
  143. ).ws_connect(
  144. ws_url,
  145. proxy=self.proxy,
  146. timeout=30,
  147. receive_timeout=30,
  148. ) as _ws:
  149. print(f"{self.name} public ws连接成功")
  150. self.logger.debug(f"{self.name} public ws连接成功")
  151. for sub in self.subscribe_public(sub_trade):
  152. await _ws.send_str(sub)
  153. while True:
  154. # 停机信号
  155. if self.stop_flag:
  156. await _ws.close()
  157. return
  158. # 接受消息
  159. try:
  160. msg = await _ws.receive()
  161. except:
  162. print(f'{self.name} public ws长时间没有收到消息 准备重连...')
  163. self.logger.error(f'{self.name} public ws长时间没有收到消息 准备重连...')
  164. break
  165. msg = msg.data
  166. await self.on_message_public(_ws, msg)
  167. except:
  168. traceback.print_exc()
  169. print(f'{self.name} ws public 连接失败 开始重连...')
  170. self.logger.error(f'{self.name} ws public 连接失败 开始重连...')
  171. self.logger.error(traceback.format_exc())
  172. await asyncio.sleep(1)
  173. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  174. asyncio.create_task(self.run_public(sub_trade))
  175. while True:
  176. await asyncio.sleep(5)
  177. async def on_message_public(self, _ws, msg):
  178. """"""
  179. #print(msg)
  180. if "data" in msg:
  181. # 推送数据时,有data字段,优先级也最高
  182. if "ticker" in msg:
  183. self._update_ticker(msg)
  184. elif "trades" in msg:
  185. self._update_trade(msg)
  186. elif "orderbook" in msg:
  187. await self._update_depth(_ws, msg)
  188. elif "type" in msg:
  189. # event常见于事件回报,一般都可以忽略,只需要看看是否有error
  190. if "error" in msg:
  191. info = f'{self.name} on_message error! --> {msg}'
  192. print(info)
  193. self.logger.error(info)
  194. elif 'ping' in msg:
  195. await _ws.send_str('pong')
  196. else:
  197. print(msg)
  198. def _update_ticker(self, msg):
  199. """"""
  200. self.public_update_time = time.time()
  201. msg = ujson.loads(msg)
  202. ticker = msg['data']
  203. bp = float(ticker['bid']) if ticker['bid'] != 'null' else 0
  204. ap = float(ticker['ask']) if ticker['ask'] != 'null' else 0
  205. self.ticker_info["bp"] = bp
  206. self.ticker_info["ap"] = ap
  207. self.callback['onTicker'](self.ticker_info)
  208. def _update_trade(self, msg):
  209. """"""
  210. self.public_update_time = time.time()
  211. msg = ujson.loads(msg)
  212. for trade in msg['data']:
  213. price = float(trade['price'])
  214. amount = float(trade['size'])
  215. side = trade['side']
  216. if price > self.max_buy or self.max_buy == 0.0:
  217. self.max_buy = price
  218. if price < self.min_sell or self.min_sell == 0.0:
  219. self.min_sell = price
  220. if side == 'buy':
  221. self.buy_q += amount
  222. self.buy_v += amount*price
  223. elif side == 'sell':
  224. self.sell_q += amount
  225. self.sell_v += amount*price
  226. async def _update_depth(self, _ws, msg):
  227. """"""
  228. msg = ujson.loads(msg)
  229. if msg['market']!=self.symbol:
  230. return
  231. depth = msg['data']
  232. action = msg['type']
  233. if action == 'partial':
  234. self._reset_orderbook()
  235. for side in {'bids', 'asks'}:
  236. book = self._orderbook[side]
  237. for price, size in depth[side]:
  238. if size:
  239. book[price] = size
  240. else:
  241. del book[price]
  242. self._orderbook_timestamp = depth['time']
  243. ob = self.get_orderbook()
  244. if self.compare_checksum(ob, depth):
  245. self.public_update_time = time.time()
  246. bp = float(ob['bids'][0][0])
  247. ap = float(ob['asks'][0][0])
  248. self.ticker_info["bp"] = bp
  249. self.ticker_info["ap"] = ap
  250. self.callback['onTicker'](self.ticker_info)
  251. ##### 标准化深度
  252. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  253. step = mp * utils.EFF_RANGE / utils.LEVEL
  254. bp = []
  255. ap = []
  256. bv = [0 for _ in range(utils.LEVEL)]
  257. av = [0 for _ in range(utils.LEVEL)]
  258. for i in range(utils.LEVEL):
  259. bp.append(self.ticker_info["bp"]-step*i)
  260. for i in range(utils.LEVEL):
  261. ap.append(self.ticker_info["ap"]+step*i)
  262. #
  263. price_thre = self.ticker_info["bp"] - step
  264. index = 0
  265. for bid in ob['bids']:
  266. price = float(bid[0])
  267. amount = float(bid[1])
  268. if price > price_thre:
  269. bv[index] += amount
  270. else:
  271. price_thre -= step
  272. index += 1
  273. if index == utils.LEVEL:
  274. break
  275. bv[index] += amount
  276. price_thre = self.ticker_info["ap"] + step
  277. index = 0
  278. for ask in ob['asks']:
  279. price = float(ask[0])
  280. amount = float(ask[1])
  281. if price < price_thre:
  282. av[index] += amount
  283. else:
  284. price_thre += step
  285. index += 1
  286. if index == utils.LEVEL:
  287. break
  288. av[index] += amount
  289. self.depth = bp + bv + ap + av
  290. self.callback['onDepth']({'name':self.name,'data':self.depth})
  291. else:
  292. self._reset_orderbook()
  293. await self.resubscribe_depth(_ws)
  294. async def resubscribe_depth(self, _ws):
  295. info = f"{self.name} checksum not correct!"
  296. print(info)
  297. self.logger.info(info)
  298. sub_str = {'op':"unsubscribe",'market':self.symbol, 'channel':'orderbook'}
  299. await _ws.send_str(ujson.dumps(sub_str))
  300. await asyncio.sleep(1)
  301. sub_str['op'] = 'subscribe'
  302. await _ws.send_str(ujson.dumps(sub_str))
  303. def get_logger(self):
  304. logger = logging.getLogger(__name__)
  305. logger.setLevel(logging.DEBUG)
  306. # log to txt
  307. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  308. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  309. handler.setLevel(logging.DEBUG)
  310. handler.setFormatter(formatter)
  311. logger.addHandler(handler)
  312. return logger
  313. def get_orderbook(self):
  314. return {
  315. side: sorted(
  316. [(price, quantity) for price, quantity in list(self._orderbook[side].items())
  317. if quantity],
  318. key=lambda order: order[0] * (-1 if side == 'bids' else 1)
  319. )
  320. for side in {'bids', 'asks'}
  321. }
  322. @staticmethod
  323. def compare_checksum(ob, depth):
  324. """计算深度的校验和"""
  325. #t1 = time.time()
  326. checksum_data = [
  327. ':'.join([f'{float(order[0])}:{float(order[1])}' for order in (bid, offer) if order])
  328. for (bid, offer) in zip_longest(ob['bids'][:100], ob['asks'][:100])
  329. ]
  330. cm = int(zlib.crc32(':'.join(checksum_data).encode()))
  331. #t2 = time.time()
  332. #print(cm, depth['checksum'], (t2-t1)*1000)
  333. return cm==depth['checksum']