ftx_spot_ws.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. import aiohttp
  2. import time
  3. import asyncio
  4. import json, ujson
  5. import zlib
  6. import hashlib
  7. import hmac
  8. import base64
  9. import traceback
  10. import random, csv, sys
  11. import logging, logging.handlers
  12. from itertools import zip_longest
  13. from datetime import datetime
  14. import urllib
  15. import utils
  16. import model
  17. from collections import defaultdict, deque
  18. def empty_call(msg):
  19. pass
  20. ZERO = 1e-8
  21. class FtxSpotWs:
  22. """"""
  23. def __init__(self, params:model.ClientParams, colo=0, is_print=0):
  24. if colo:
  25. print('不支持colo高速线路')
  26. self.URL = 'wss://ftx.com/ws/'
  27. else:
  28. self.URL = 'wss://ftx.com/ws/'
  29. self.params = params
  30. self.name = self.params.name
  31. #
  32. self.base = params.pair.split('_')[0].upper()
  33. self.quote = params.pair.split('_')[1].upper()
  34. if self.quote == "USDT":
  35. self.quote = "USD"
  36. self.symbol = f"{self.base}/{self.quote}"
  37. #print(self.symbol)
  38. #
  39. self.data = dict()
  40. self.data['trade'] = []
  41. self.callback = {
  42. "onMarket":self.save_market,
  43. "onDepth":empty_call,
  44. "onPosition":empty_call,
  45. "onEquity":empty_call,
  46. "onOrder":empty_call,
  47. "onTicker":empty_call,
  48. "onDepth":empty_call,
  49. "onExit":empty_call,
  50. }
  51. self.depth_update = []
  52. self.need_flash = 1
  53. self.updata_u = None
  54. self.last_update_id = None
  55. self.depth = dict()
  56. self.depth['bids'] = dict()
  57. self.depth['asks'] = dict()
  58. self.is_print = is_print
  59. self.proxy = None
  60. if 'win' in sys.platform:
  61. self.proxy = self.params.proxy
  62. self.logger = self.get_logger()
  63. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  64. self.stop_flag = 0
  65. self.public_update_time = time.time()
  66. self.private_update_time = time.time()
  67. self.expired_time = 300
  68. self.max_buy = 0.0
  69. self.min_sell = 0.0
  70. self.buy_v = 0.0
  71. self.buy_q = 0.0
  72. self.sell_v = 0.0
  73. self.sell_q = 0.0
  74. self.stepSize = None
  75. self.tickSize = None
  76. self.ctVal = None # 合约乘数
  77. self.ctMult = None # 合约面值
  78. self.depth = []
  79. self._reset_orderbook()
  80. #### 指定发包ip
  81. iplist = utils.get_local_ip_list()
  82. self.ip = iplist[int(self.params.ip)]
  83. def _reset_orderbook(self) -> None:
  84. self._orderbook_timestamp = 0
  85. self._orderbook = {side: defaultdict(float) for side in ['bids','asks']}
  86. def save_market(self, msg):
  87. print(msg)
  88. #pass
  89. #date = time.strftime('%Y-%m-%d',time.localtime())
  90. #interval = self.params.interval
  91. #if msg:
  92. # exchange = msg['name']
  93. # if len(msg['data']) > 1:
  94. # with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  95. # 'a',
  96. # newline='',
  97. # encoding='utf-8') as f:
  98. # writer = csv.writer(f, delimiter=',')
  99. # writer.writerow(msg['data'])
  100. # if self.is_print:print(f'写入行情 {self.symbol}')
  101. def _get_data(self):
  102. market_data = self.depth + [self.max_buy, self.min_sell]
  103. self.max_buy = 0.0
  104. self.min_sell = 0.0
  105. self.buy_v = 0.0
  106. self.buy_q = 0.0
  107. self.sell_v = 0.0
  108. self.sell_q = 0.0
  109. return {'name': self.name,'data':market_data}
  110. async def go(self):
  111. interval = float(self.params.interval)
  112. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  113. ### onTrade
  114. while 1:
  115. try:
  116. # 更新市场信息
  117. market_data = self._get_data()
  118. self.callback['onMarket'](market_data)
  119. except:
  120. traceback.print_exc()
  121. await asyncio.sleep(interval)
  122. def subscribe_public(self, sub_trade=0):
  123. channels = [
  124. "orderbook",
  125. # "ticker"
  126. ]
  127. if sub_trade:
  128. channels.append("trades")
  129. subs = [ujson.dumps({'op':'subscribe','market':self.symbol, 'channel':channel}) for channel in channels]
  130. return subs
  131. async def run_public(self, sub_trade=0):
  132. """"""
  133. while 1:
  134. try:
  135. self.public_update_time = time.time()
  136. print(f"{self.name} public 尝试连接ws")
  137. ws_url = self.URL
  138. async with aiohttp.ClientSession(
  139. connector = aiohttp.TCPConnector(
  140. limit=50,
  141. keepalive_timeout=120,
  142. verify_ssl=False,
  143. local_addr=(self.ip,0)
  144. )
  145. ).ws_connect(
  146. ws_url,
  147. proxy=self.proxy,
  148. timeout=30,
  149. receive_timeout=30,
  150. ) as _ws:
  151. print(f"{self.name} public ws连接成功")
  152. self.logger.debug(f"{self.name} public ws连接成功")
  153. for sub in self.subscribe_public(sub_trade):
  154. await _ws.send_str(sub)
  155. while True:
  156. # 停机信号
  157. if self.stop_flag:
  158. await _ws.close()
  159. return
  160. # 接受消息
  161. try:
  162. msg = await _ws.receive()
  163. except:
  164. print(f'{self.name} public ws长时间没有收到消息 准备重连...')
  165. self.logger.error(f'{self.name} public ws长时间没有收到消息 准备重连...')
  166. break
  167. msg = msg.data
  168. await self.on_message_public(_ws, msg)
  169. except:
  170. traceback.print_exc()
  171. print(f'{self.name} ws public 连接失败 开始重连...')
  172. self.logger.error(f'{self.name} ws public 连接失败 开始重连...')
  173. self.logger.error(traceback.format_exc())
  174. await asyncio.sleep(1)
  175. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  176. asyncio.create_task(self.run_public(sub_trade))
  177. while True:
  178. await asyncio.sleep(5)
  179. async def on_message_public(self, _ws, msg):
  180. """"""
  181. #print(msg)
  182. if "data" in msg:
  183. # 推送数据时,有data字段,优先级也最高
  184. if "ticker" in msg:
  185. self._update_ticker(msg)
  186. elif "trades" in msg:
  187. self._update_trade(msg)
  188. elif "orderbook" in msg:
  189. await self._update_depth(_ws, msg)
  190. elif "type" in msg:
  191. # event常见于事件回报,一般都可以忽略,只需要看看是否有error
  192. if "error" in msg:
  193. info = f'{self.name} on_message error! --> {msg}'
  194. print(info)
  195. self.logger.error(info)
  196. elif 'ping' in msg:
  197. await _ws.send_str('pong')
  198. else:
  199. print(msg)
  200. def _update_ticker(self, msg):
  201. """"""
  202. self.public_update_time = time.time()
  203. msg = ujson.loads(msg)
  204. ticker = msg['data']
  205. bp = float(ticker['bid']) if ticker['bid'] != 'null' else 0
  206. ap = float(ticker['ask']) if ticker['ask'] != 'null' else 0
  207. lp = float(ticker['last']) if ticker['last'] != 'null' else 0
  208. self.ticker_info["bp"] = bp
  209. self.ticker_info["ap"] = ap
  210. self.callback['onTicker'](self.ticker_info)
  211. def _update_trade(self, msg):
  212. """"""
  213. msg = ujson.loads(msg)
  214. for trade in msg['data']:
  215. price = float(trade['price'])
  216. amount = float(trade['size'])
  217. side = trade['side']
  218. if side == 'buy':
  219. self.buy_q += amount
  220. self.buy_v += amount*price
  221. elif side == 'sell':
  222. self.sell_q += amount
  223. self.sell_v += amount*price
  224. self.data['trade'].append([
  225. side,
  226. amount,
  227. price,
  228. ])
  229. self.public_update_time = time.time()
  230. #print(msg)
  231. async def _update_depth(self, _ws, msg):
  232. """"""
  233. msg = ujson.loads(msg)
  234. if msg['market']!=self.symbol:
  235. return
  236. depth = msg['data']
  237. action = msg['type']
  238. if action == 'partial':
  239. self._reset_orderbook()
  240. for side in {'bids', 'asks'}:
  241. book = self._orderbook[side]
  242. for price, size in depth[side]:
  243. if size:
  244. book[price] = size
  245. else:
  246. del book[price]
  247. self._orderbook_timestamp = depth['time']
  248. ob = self.get_orderbook()
  249. if self.compare_checksum(ob, depth):
  250. self.public_update_time = time.time()
  251. bp = self.depth[0]
  252. ap = self.depth[40]
  253. self.ticker_info["bp"] = bp
  254. self.ticker_info["ap"] = ap
  255. ##### 标准化深度
  256. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  257. step = mp * utils.EFF_RANGE / utils.LEVEL
  258. bp = []
  259. ap = []
  260. bv = [0 for _ in range(utils.LEVEL)]
  261. av = [0 for _ in range(utils.LEVEL)]
  262. for i in range(utils.LEVEL):
  263. bp.append(self.ticker_info["bp"]-step*i)
  264. for i in range(utils.LEVEL):
  265. ap.append(self.ticker_info["ap"]+step*i)
  266. #
  267. price_thre = self.ticker_info["bp"] - step
  268. index = 0
  269. for bid in ob['bids']:
  270. price = float(bid[0])
  271. amount = float(bid[1])
  272. if price > price_thre:
  273. bv[index] += amount
  274. else:
  275. price_thre -= step
  276. index += 1
  277. if index == utils.LEVEL:
  278. break
  279. bv[index] += amount
  280. price_thre = self.ticker_info["ap"] + step
  281. index = 0
  282. for ask in ob['asks']:
  283. price = float(ask[0])
  284. amount = float(ask[1])
  285. if price < price_thre:
  286. av[index] += amount
  287. else:
  288. price_thre += step
  289. index += 1
  290. if index == utils.LEVEL:
  291. break
  292. av[index] += amount
  293. self.depth = bp + bv + ap + av
  294. self.callback['onDepth']({'name':self.name,'data':self.depth})
  295. else:
  296. self._reset_orderbook()
  297. await self.resubscribe_depth(_ws)
  298. async def resubscribe_depth(self, _ws):
  299. info = f"{self.name} checksum not correct!"
  300. print(info)
  301. self.logger.info(info)
  302. sub_str = {'op':"unsubscribe",'market':self.symbol, 'channel':'orderbook'}
  303. await _ws.send_str(ujson.dumps(sub_str))
  304. await asyncio.sleep(1)
  305. sub_str['op'] = 'subscribe'
  306. await _ws.send_str(ujson.dumps(sub_str))
  307. def get_logger(self):
  308. logger = logging.getLogger(__name__)
  309. logger.setLevel(logging.DEBUG)
  310. # log to txt
  311. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  312. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  313. handler.setLevel(logging.DEBUG)
  314. handler.setFormatter(formatter)
  315. logger.addHandler(handler)
  316. return logger
  317. def get_orderbook(self):
  318. return {
  319. side: sorted(
  320. [(price, quantity) for price, quantity in list(self._orderbook[side].items())
  321. if quantity],
  322. key=lambda order: order[0] * (-1 if side == 'bids' else 1)
  323. )
  324. for side in {'bids', 'asks'}
  325. }
  326. @staticmethod
  327. def compare_checksum(ob, depth):
  328. """计算深度的校验和"""
  329. #t1 = time.time()
  330. checksum_data = [
  331. ':'.join([f'{float(order[0])}:{float(order[1])}' for order in (bid, offer) if order])
  332. for (bid, offer) in zip_longest(ob['bids'][:100], ob['asks'][:100])
  333. ]
  334. cm = int(zlib.crc32(':'.join(checksum_data).encode()))
  335. #t2 = time.time()
  336. #print(cm, depth['checksum'], (t2-t1)*1000)
  337. return cm==depth['checksum']