okex_usdt_swap_ws.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811
  1. import aiohttp
  2. import time
  3. import asyncio
  4. import json, ujson
  5. from numpy import subtract
  6. import zlib
  7. import hashlib
  8. import hmac
  9. import base64
  10. import traceback
  11. import random, csv, sys
  12. import logging, logging.handlers
  13. from datetime import datetime
  14. import urllib
  15. import utils
  16. import model
  17. def timeit(func):
  18. def wrapper(*args, **kwargs):
  19. nowTime = time.time()
  20. res = func(*args, **kwargs)
  21. spend_time = time.time() - nowTime
  22. spend_time = round(spend_time * 1e6, 3)
  23. print(f'{func.__name__} 耗时 {spend_time} us')
  24. return res
  25. return wrapper
  26. # okex 必须要订阅限价频道 防止无法下单
  27. def empty_call(msg):
  28. pass
  29. # -- 增量维护本地深度工具 ----------------------
  30. def update_bids(depth, bids_p):
  31. bids_u = depth['bids']
  32. for i in bids_u:
  33. bid_price = i[0]
  34. for j in bids_p:
  35. if bid_price == j[0]:
  36. if i[1] == '0':
  37. bids_p.remove(j)
  38. break
  39. else:
  40. del j[1]
  41. j.insert(1, i[1])
  42. break
  43. else:
  44. if i[1] != "0":
  45. bids_p.append(i)
  46. else:
  47. bids_p.sort(key=lambda price: sort_num(price[0]), reverse=True)
  48. return bids_p
  49. def update_asks(depth, asks_p):
  50. asks_u = depth['asks']
  51. for i in asks_u:
  52. ask_price = i[0]
  53. for j in asks_p:
  54. if ask_price == j[0]:
  55. if i[1] == '0':
  56. asks_p.remove(j)
  57. break
  58. else:
  59. del j[1]
  60. j.insert(1, i[1])
  61. break
  62. else:
  63. if i[1] != "0":
  64. asks_p.append(i)
  65. else:
  66. asks_p.sort(key=lambda price: sort_num(price[0]))
  67. return asks_p
  68. def sort_num(n):
  69. if n.isdigit():
  70. return int(n)
  71. else:
  72. return float(n)
  73. # @timeit
  74. def check(bids, asks):
  75. # 获取bid档str
  76. bids_l = []
  77. bid_l = []
  78. count_bid = 1
  79. while count_bid <= 25:
  80. if count_bid > len(bids):
  81. break
  82. bids_l.append(bids[count_bid-1])
  83. count_bid += 1
  84. for j in bids_l:
  85. str_bid = ':'.join(j[0 : 2])
  86. bid_l.append(str_bid)
  87. # 获取ask档str
  88. asks_l = []
  89. ask_l = []
  90. count_ask = 1
  91. while count_ask <= 25:
  92. if count_ask > len(asks):
  93. break
  94. asks_l.append(asks[count_ask-1])
  95. count_ask += 1
  96. for k in asks_l:
  97. str_ask = ':'.join(k[0 : 2])
  98. ask_l.append(str_ask)
  99. # 拼接str
  100. num = ''
  101. if len(bid_l) == len(ask_l):
  102. for m in range(len(bid_l)):
  103. num += bid_l[m] + ':' + ask_l[m] + ':'
  104. elif len(bid_l) > len(ask_l):
  105. # bid档比ask档多
  106. for n in range(len(ask_l)):
  107. num += bid_l[n] + ':' + ask_l[n] + ':'
  108. for l in range(len(ask_l), len(bid_l)):
  109. num += bid_l[l] + ':'
  110. elif len(bid_l) < len(ask_l):
  111. # ask档比bid档多
  112. for n in range(len(bid_l)):
  113. num += bid_l[n] + ':' + ask_l[n] + ':'
  114. for l in range(len(bid_l), len(ask_l)):
  115. num += ask_l[l] + ':'
  116. new_num = num[:-1]
  117. int_checksum = zlib.crc32(new_num.encode())
  118. fina = change(int_checksum)
  119. return fina
  120. def change(num_old):
  121. num = pow(2, 31) - 1
  122. if num_old > num:
  123. out = num_old - num * 2 - 2
  124. else:
  125. out = num_old
  126. return out
  127. #--------------------------------------------------
  128. class OkexUsdtSwapWs:
  129. def __init__(self, params:model.ClientParams, colo=0, is_print=0):
  130. if colo:
  131. print('不支持colo高速线路 请修改hosts')
  132. #### hk
  133. self.URL_PUBLIC = 'wss://ws.okx.com:8443/ws/v5/public'
  134. self.URL_PRIVATE = 'wss://ws.okx.com:8443/ws/v5/private'
  135. self.REST = 'https://www.okx.com'
  136. #### aws
  137. # self.URL_PUBLIC = 'wss://wsaws.okx.com:8443/ws/v5/public'
  138. # self.URL_PRIVATE = 'wss://wsaws.okx.com:8443/ws/v5/private'
  139. # self.REST = 'https://aws.okx.com'
  140. else:
  141. #### hk
  142. self.URL_PUBLIC = 'wss://ws.okx.com:8443/ws/v5/public'
  143. self.URL_PRIVATE = 'wss://ws.okx.com:8443/ws/v5/private'
  144. self.REST = 'https://www.okx.com'
  145. #### aws
  146. # self.URL_PUBLIC = 'wss://wsaws.okx.com:8443/ws/v5/public'
  147. # self.URL_PRIVATE = 'wss://wsaws.okx.com:8443/ws/v5/private'
  148. # self.REST = 'https://aws.okx.com'
  149. self.params = params
  150. self.name = self.params.name
  151. self.base = params.pair.split('_')[0].upper()
  152. self.quote = params.pair.split('_')[1].upper()
  153. self.symbol = f"{self.base}-{self.quote}-SWAP"
  154. if len(self.params.pair.split('_')) > 2:
  155. self.delivery = self.params.pair.split('_')[2] # 210924
  156. self.symbol += f"-{self.delivery}"
  157. self.data = dict()
  158. self.data['trade'] = []
  159. self.data['force'] = []
  160. self.callback = {
  161. "onMarket":self.save_market,
  162. "onPosition":empty_call,
  163. "onEquity":empty_call,
  164. "onOrder":empty_call,
  165. "onTicker":empty_call,
  166. "onDepth":empty_call,
  167. "onExit":empty_call,
  168. }
  169. self.depth_update = []
  170. self.need_flash = 1
  171. self.updata_u = None
  172. self.last_update_id = None
  173. self.depth = []
  174. self.is_print = is_print
  175. self.proxy = None
  176. if 'win' in sys.platform:
  177. self.proxy = self.params.proxy
  178. self.logger = self.get_logger()
  179. self.ticker_info = {"name":self.name,'bp':0.0,'ap':0.0}
  180. self.stop_flag = 0
  181. self.public_update_time = time.time()
  182. self.private_update_time = time.time()
  183. self.expired_time = 300
  184. self.update_t = 0
  185. self.max_buy = 0.0
  186. self.min_sell = 0.0
  187. self.buy_v = 0.0
  188. self.buy_q = 0.0
  189. self.sell_v = 0.0
  190. self.sell_q = 0.0
  191. self.getheader = self.make_header()
  192. self.postheader = self.make_header()
  193. self._detail_ob = {} # 用于增量更新depth
  194. self.stepSize = None
  195. self.tickSize = None
  196. self.ctVal = None # 合约乘数
  197. self.ctMult = None # 合约面值
  198. self.depth = []
  199. self.sub_trade = 0
  200. self.sub_fast = 0
  201. #### 指定发包ip
  202. iplist = utils.get_local_ip_list()
  203. self.ip = iplist[int(self.params.ip)]
  204. def save_market(self, msg):
  205. date = time.strftime('%Y-%m-%d',time.localtime())
  206. interval = self.params.interval
  207. if msg:
  208. exchange = msg['name']
  209. if len(msg['data']) > 1:
  210. with open(f'./history/{exchange}_{self.symbol}_{interval}_{date}.csv',
  211. 'a',
  212. newline='',
  213. encoding='utf-8') as f:
  214. writer = csv.writer(f, delimiter=',')
  215. writer.writerow(msg['data'])
  216. if self.is_print:print(f'写入行情 {self.symbol}')
  217. # ------------------------------------------------------
  218. # -- core ----------------------------------------------
  219. async def get_instruments(self):
  220. """从rest获取合约信息"""
  221. params = {'instId': self.symbol, 'instType': 'SWAP'}
  222. session = aiohttp.ClientSession()
  223. response = await self.http_get_request_pub(session, '/api/v5/public/instruments', params)
  224. data = await response.text()
  225. await session.close()
  226. return ujson.loads(data)
  227. def update_instruments(self, data):
  228. """根据信息调整合约信息"""
  229. for info in data['data']:
  230. if info['instId'] == self.symbol:
  231. self.stepSize = sort_num(info['minSz'])
  232. self.tickSize = sort_num(info['tickSz'])
  233. self.ctVal = sort_num(info['ctVal'])
  234. self.ctMult = sort_num(info['ctMult'])
  235. return
  236. async def get_depth_flash(self):
  237. """rest获取深度信息"""
  238. params = {'instId': self.symbol, 'sz':20}
  239. session = aiohttp.ClientSession()
  240. response = await self.http_get_request(session, '/api/v5/market/books', params)
  241. depth_flash = await response.text()
  242. await session.close()
  243. return ujson.loads(depth_flash)
  244. def _get_data(self):
  245. market_data = self.depth + [self.max_buy, self.min_sell]
  246. self.max_buy = 0.0
  247. self.min_sell = 0.0
  248. self.buy_v = 0.0
  249. self.buy_q = 0.0
  250. self.sell_v = 0.0
  251. self.sell_q = 0.0
  252. return {'name': self.name,'data':market_data}
  253. async def go(self):
  254. interval = float(self.params.interval)
  255. if self.is_print:print(f'Ws循环器启动 interval {interval}')
  256. ### onTrade
  257. while 1:
  258. try:
  259. # 更新市场信息
  260. market_data = self._get_data()
  261. self.callback['onMarket'](market_data)
  262. except:
  263. traceback.print_exc()
  264. await asyncio.sleep(interval)
  265. def subscribe_private(self):
  266. subs = [
  267. {'channel':'balance_and_position'},
  268. {'channel':'account'},
  269. {'channel':'orders', 'instType':"SWAP", 'instId':self.symbol}
  270. ]
  271. return ujson.dumps({'op':'subscribe', 'args':subs})
  272. def subscribe_public(self):
  273. channels = []
  274. if self.sub_fast:
  275. channels.append("books50-l2-tbt")
  276. else:
  277. channels.append("books5")
  278. # "tickers", # 100ms 比book50慢
  279. # "books",
  280. # "books5",
  281. # "books-l2-tbt",
  282. # "books50-l2-tbt",
  283. # "price-limit"
  284. if self.sub_trade:
  285. channels.append("trades")
  286. subs = [{'instId':self.symbol, 'channel':channel} for channel in channels]
  287. return ujson.dumps({'op':'subscribe', 'args':subs})
  288. async def run_private(self):
  289. """"""
  290. while 1:
  291. try:
  292. self.private_update_time = time.time()
  293. print(f"{self.name} private 尝试连接ws")
  294. ws_url = self.URL_PRIVATE
  295. async with aiohttp.ClientSession(
  296. connector = aiohttp.TCPConnector(
  297. limit=50,
  298. keepalive_timeout=120,
  299. verify_ssl=False,
  300. local_addr=(self.ip,0)
  301. )
  302. ).ws_connect(
  303. ws_url,
  304. proxy=self.proxy,
  305. timeout=30,
  306. receive_timeout=30,
  307. ) as _ws:
  308. print(f"{self.name} private ws连接成功")
  309. self.logger.debug(f"{self.name} private ws连接成功")
  310. await _ws.send_str(self.login_params())
  311. msg = await _ws.receive(timeout=30)
  312. loggined = False
  313. if msg:
  314. msg = ujson.loads(msg.data)
  315. if msg['event'] == 'login' and msg['code'] == "0":
  316. loggined = True
  317. print(f"{self.name} private login success.")
  318. self.logger.debug(f"{self.name} private login success.")
  319. await _ws.send_str(self.subscribe_private()) # login成功就需要去订阅
  320. if not loggined:
  321. print(f"{self.name} private login failed. --> {msg}")
  322. self.logger.debug(f"{self.name} private login failed. --> {msg}")
  323. await asyncio.sleep(3)
  324. while loggined:
  325. # 停机信号
  326. if self.stop_flag:
  327. await _ws.close()
  328. return
  329. # 接受消息
  330. try:
  331. msg = await _ws.receive(timeout=30)
  332. except:
  333. print(f'{self.name} private ws长时间没有收到消息 准备重连...')
  334. self.logger.error(f'{self.name} private ws长时间没有收到消息 准备重连...')
  335. break
  336. msg = msg.data
  337. await self.on_message_private(_ws, msg)
  338. except:
  339. traceback.print_exc()
  340. print(f'{self.name} ws public 连接失败 开始重连...')
  341. self.logger.error(f'{self.name} ws public 连接失败 开始重连...')
  342. self.logger.error(traceback.format_exc())
  343. await asyncio.sleep(1)
  344. async def run_public(self):
  345. """"""
  346. while 1:
  347. try:
  348. self.public_update_time = time.time()
  349. print(f"{self.name} public 尝试连接ws")
  350. ws_url = self.URL_PUBLIC
  351. async with aiohttp.ClientSession(
  352. connector = aiohttp.TCPConnector(
  353. limit=50,
  354. keepalive_timeout=120,
  355. verify_ssl=False,
  356. local_addr=(self.ip,0)
  357. )
  358. ).ws_connect(
  359. ws_url,
  360. proxy=self.proxy,
  361. timeout=30,
  362. receive_timeout=30,
  363. ) as _ws:
  364. print(f"{self.name} public ws连接成功")
  365. self.logger.debug(f"{self.name} public ws连接成功")
  366. await _ws.send_str(self.subscribe_public())
  367. while True:
  368. # 停机信号
  369. if self.stop_flag:
  370. await _ws.close()
  371. return
  372. # 接受消息
  373. try:
  374. msg = await _ws.receive(timeout=30)
  375. except:
  376. print(f'{self.name} public ws长时间没有收到消息 准备重连...')
  377. self.logger.error(f'{self.name} public ws长时间没有收到消息 准备重连...')
  378. break
  379. msg = msg.data
  380. await self.on_message_public(_ws, msg)
  381. except:
  382. traceback.print_exc()
  383. print(f'{self.name} ws public 连接失败 开始重连...')
  384. self.logger.error(f'{self.name} ws public 连接失败 开始重连...')
  385. self.logger.error(traceback.format_exc())
  386. await asyncio.sleep(1)
  387. def _update_ticker(self, msg):
  388. """"""
  389. self.public_update_time = time.time()
  390. msg = ujson.loads(msg)
  391. ticker = msg['data'][0]
  392. t = int(ticker['ts'])
  393. if t > self.update_t:
  394. self.update_t = t
  395. bp = float(ticker['bidPx'])
  396. ap = float(ticker['askPx'])
  397. bq = float(ticker['bidSz'])
  398. aq = float(ticker['askSz'])
  399. self.ticker_info["bp"] = bp
  400. self.ticker_info["ap"] = ap
  401. self.callback['onTicker'](self.ticker_info)
  402. ####
  403. self.depth = [bp, bq, ap, aq]
  404. self.callback['onDepth']({'name':self.name,'data':self.depth})
  405. def _update_trade(self, msg):
  406. """"""
  407. self.public_update_time = time.time()
  408. msg = ujson.loads(msg)
  409. for i in msg['data']:
  410. side = i['side']
  411. amount = float(i['sz'])*self.ctVal # paper trans to coin
  412. price = float(i['px'])
  413. if price > self.max_buy or self.max_buy == 0.0:
  414. self.max_buy = price
  415. if price < self.min_sell or self.min_sell == 0.0:
  416. self.min_sell = price
  417. if side == 'buy':
  418. self.buy_q += amount
  419. self.buy_v += amount*price
  420. elif side == 'sell':
  421. self.sell_q += amount
  422. self.sell_v += amount*price
  423. #### 修正ticker ####
  424. # if side == 'buy' and price > self.ticker_info['ap']:
  425. # self.ticker_info['ap'] = price
  426. # self.callback['onTicker'](self.ticker_info)
  427. # if side == 'sell' and price < self.ticker_info['bp']:
  428. # self.ticker_info['bp'] = price
  429. # self.callback['onTicker'](self.ticker_info)
  430. async def _update_depth(self, _ws, msg):
  431. """"""
  432. self.public_update_time = time.time()
  433. msg = ujson.loads(msg)
  434. if "action" not in msg:
  435. # books5 就没有action,但5档实在不够用,而且间隔200ms
  436. depth = msg['data'][0]
  437. bp = float(depth['bids'][0][0])
  438. bv = float(depth['bids'][0][1])
  439. ap = float(depth['asks'][0][0])
  440. av = float(depth['asks'][0][1])
  441. self.ticker_info["bp"] = bp
  442. self.ticker_info["ap"] = ap
  443. self.callback['onTicker'](self.ticker_info)
  444. self.depth = [bp,bv,ap,av]
  445. self.callback['onDepth']({'name':self.name,'data':self.depth})
  446. return
  447. depth = msg['data'][0]
  448. action = msg['action']
  449. if action == 'update':
  450. self._update_depth_update(depth)
  451. elif action == 'snapshot':
  452. self._update_depth_snapshot(depth)
  453. ob = self._detail_ob
  454. if self.compare_checksum(ob, depth):
  455. t = int(depth['ts'])
  456. if t > self.update_t:
  457. self.update_t = t
  458. self.ticker_info["bp"] = float(ob['bids'][0][0])
  459. self.ticker_info["ap"] = float(ob['asks'][0][0])
  460. self.callback['onTicker'](self.ticker_info)
  461. ##### 标准化深度
  462. mp = (self.ticker_info["bp"] + self.ticker_info["ap"])*0.5
  463. step = mp * utils.EFF_RANGE / utils.LEVEL
  464. bp = []
  465. ap = []
  466. bv = [0 for _ in range(utils.LEVEL)]
  467. av = [0 for _ in range(utils.LEVEL)]
  468. for i in range(utils.LEVEL):
  469. bp.append(self.ticker_info["bp"]-step*i)
  470. for i in range(utils.LEVEL):
  471. ap.append(self.ticker_info["ap"]+step*i)
  472. #
  473. price_thre = self.ticker_info["bp"] - step
  474. index = 0
  475. for bid in ob['bids']:
  476. price = float(bid[0])
  477. amount = float(bid[1])
  478. if price > price_thre:
  479. bv[index] += amount
  480. else:
  481. price_thre -= step
  482. index += 1
  483. if index == utils.LEVEL:
  484. break
  485. bv[index] += amount
  486. price_thre = self.ticker_info["ap"] + step
  487. index = 0
  488. for ask in ob['asks']:
  489. price = float(ask[0])
  490. amount = float(ask[1])
  491. if price < price_thre:
  492. av[index] += amount
  493. else:
  494. price_thre += step
  495. index += 1
  496. if index == utils.LEVEL:
  497. break
  498. av[index] += amount
  499. self.depth = bp + bv + ap + av
  500. self.callback['onDepth']({'name':self.name,'data':self.depth})
  501. else:
  502. await self.resubscribe_depth(_ws)
  503. async def resubscribe_depth(self, _ws):
  504. info = f"{self.name} checksum not correct!"
  505. print(info)
  506. self.logger.info(info)
  507. args = []
  508. if self.sub_fast:
  509. args.append({'channel':'books50-l2-tbt','instId':self.symbol})
  510. else:
  511. args.append({'channel':'books5','instId':self.symbol},)
  512. # {'channel':'books','instId':self.symbol},
  513. # {'channel':'books5','instId':self.symbol},
  514. # {'channel':'books50-l2-tbt','instId':self.symbol},
  515. # {'channel':'books-l2-tbt','instId':self.symbol},
  516. sub_str = {'op':"unsubscribe",'args':args}
  517. await _ws.send_str(ujson.dumps(sub_str))
  518. await asyncio.sleep(1)
  519. sub_str['op'] = 'subscribe'
  520. await _ws.send_str(ujson.dumps(sub_str))
  521. def _update_depth_update(self, depth):
  522. ob = self._detail_ob
  523. ob['timestamp'] = depth['ts']
  524. bids_p = ob['bids']
  525. asks_p = ob['asks']
  526. bids_p = update_bids(depth, bids_p)
  527. asks_p = update_asks(depth, asks_p)
  528. def _update_depth_snapshot(self, depth):
  529. self._detail_ob = depth
  530. def _update_order(self, msg):
  531. '''将ws收到的订单信息触发quant'''
  532. msg = ujson.loads(msg)
  533. self.logger.debug(f"ws订单推送 {msg}")
  534. order = msg['data'][0]
  535. if order['instId'] == self.symbol:
  536. order_event = dict()
  537. status = order['state']
  538. if status in ["live", 'partially_filled']:
  539. local_status = 'NEW'
  540. elif status in ['canceled', 'filled']:
  541. local_status = 'REMOVE'
  542. else:
  543. print(f'未知订单状态 {order}')
  544. return
  545. order_event['status'] = local_status
  546. order_event['filled_price'] = float(order['avgPx'])
  547. order_event['filled'] = float(order['accFillSz'])*self.ctVal # usdt永续需要考虑每张的单位
  548. order_event['client_id'] = order['clOrdId']
  549. order_event['order_id'] = order['ordId']
  550. if order['feeCcy'] == 'USDT':
  551. order_event['fee'] = -float(order['fee'])
  552. self.callback['onOrder'](order_event)
  553. # print(order_event)
  554. self.private_update_time = time.time()
  555. def _update_balance_position(self, msg):
  556. """"""
  557. msg = ujson.loads(msg)
  558. msg = msg['data'][0]
  559. # accounts = msg['balData']
  560. # self._update_account(accounts)
  561. positions = msg['posData']
  562. self._update_position(positions)
  563. def _update_position(self, positions):
  564. long_pos, short_pos = 0, 0
  565. long_avg, short_avg = 0, 0
  566. is_update = 0
  567. for i in positions:
  568. if i['instId'] == self.symbol:
  569. is_update = 1
  570. if i['posSide'] == 'long':
  571. long_pos += abs(float(i['pos'])*self.ctVal)
  572. long_avg = abs(float(i['avgPx']))
  573. elif i['posSide'] == 'short':
  574. short_pos += abs(float(i['pos'])*self.ctVal)
  575. short_avg = abs(float(i['avgPx']))
  576. if is_update:
  577. pos = model.Position()
  578. pos.longPos = long_pos
  579. pos.longAvg = long_avg
  580. pos.shortPos = short_pos
  581. pos.shortAvg = short_avg
  582. self.callback['onPosition'](pos)
  583. #print(f'{self.symbol} {long_pos} {long_avg} {short_pos} {short_avg}')
  584. self.private_update_time = time.time()
  585. def _update_account(self, accounts):
  586. accounts = ujson.loads(accounts)
  587. for data in accounts['data']:
  588. for i in data['details']:
  589. if i['ccy'] == self.quote:
  590. self.data['equity'] = float(i['eq'])
  591. self.callback['onEquity']({self.quote:self.data['equity']})
  592. self.private_update_time = time.time()
  593. def _update_price_limit(self, accounts):
  594. accounts = ujson.loads(accounts)
  595. buy_limit = float(accounts['data'][0]['buyLmt'])
  596. sell_limit = float(accounts['data'][0]['sellLmt'])
  597. if self.ticker_info['bp'] > 0 and self.ticker_info['ap'] > 0:
  598. mp = (self.ticker_info['bp']+self.ticker_info['ap'])*0.5
  599. upper = buy_limit * 0.99
  600. lower = sell_limit * 1.01
  601. if mp > upper or mp < lower:
  602. self.callback['onExit'](f"{self.name} 触发限价警告 准备停机")
  603. self.private_update_time = time.time()
  604. async def on_message_private(self, _ws, msg):
  605. """"""
  606. if "data" in msg:
  607. # 推送数据时,有data字段,优先级也最高
  608. if "orders" in msg:
  609. self._update_order(msg)
  610. elif "balance_and_position" in msg:
  611. self._update_balance_position(msg)
  612. elif "account" in msg:
  613. self._update_account(msg)
  614. elif "event" in msg:
  615. # event常见于事件回报,一般都可以忽略,只需要看看是否有error
  616. if "error" in msg:
  617. info = f'{self.name} on_message error! --> {msg}'
  618. print(info)
  619. self.logger.error(info)
  620. elif 'ping' in msg:
  621. await _ws.send_str('pong')
  622. else:
  623. print(msg)
  624. async def on_message_public(self, _ws, msg):
  625. """"""
  626. #print(msg)
  627. if "data" in msg:
  628. # 推送数据时,有data字段,优先级也最高
  629. if "tickers" in msg:
  630. self._update_ticker(msg)
  631. elif "trades" in msg:
  632. self._update_trade(msg)
  633. elif "books" in msg:
  634. await self._update_depth(_ws, msg)
  635. elif "price-limit" in msg:
  636. self._update_price_limit(msg)
  637. elif "event" in msg:
  638. # event常见于事件回报,一般都可以忽略,只需要看看是否有error
  639. if "error" in msg:
  640. info = f'{self.name} on_message error! --> {msg}'
  641. print(info)
  642. self.logger.error(info)
  643. elif 'ping' in msg:
  644. await _ws.send_str('pong')
  645. else:
  646. print(msg)
  647. async def run(self, is_auth=0, sub_trade=0, sub_fast=0):
  648. # update sub info
  649. self.sub_fast = sub_fast
  650. self.sub_trade = sub_trade
  651. # exchange info
  652. info = await self.get_instruments()
  653. self.update_instruments(info)
  654. print(f"{self.name} public 更新产品信息 ws {self.symbol} {self.ctVal} {self.ctMult}")
  655. # run
  656. asyncio.create_task(self.run_public())
  657. if is_auth:
  658. asyncio.create_task(self.run_private())
  659. while True:
  660. await asyncio.sleep(5)
  661. # ------------------------------------------------------
  662. # -- utils ---------------------------------------------
  663. @staticmethod
  664. def compare_checksum(ob, depth):
  665. """计算深度的校验和"""
  666. #t1 = time.time()
  667. # 降低校验频率
  668. if random.randint(0,10) == 0:
  669. cm = check(ob["bids"], ob['asks'])
  670. #t2 = time.time()
  671. #print(cm, depth['checksum'], (t2-t1)*1000)
  672. return cm==depth['checksum']
  673. else:
  674. return 1
  675. def get_logger(self):
  676. logger = logging.getLogger(__name__)
  677. logger.setLevel(logging.DEBUG)
  678. # log to txt
  679. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  680. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  681. handler.setLevel(logging.DEBUG)
  682. handler.setFormatter(formatter)
  683. logger.addHandler(handler)
  684. return logger
  685. async def http_get_request(self, session, method, query=None, **args):
  686. """"""
  687. params = dict()
  688. params.update(**args)
  689. if query is not None: params.update(query)
  690. timestamp = self.timestamp()
  691. if params:
  692. path = '{method}?{params}'.format(method=method, params=urllib.parse.urlencode(params))
  693. else:
  694. path = method
  695. sign = self.__sign(timestamp, 'GET', path, None)
  696. self.getheader['OK-ACCESS-SIGN'] = sign
  697. self.getheader['OK-ACCESS-TIMESTAMP'] = timestamp
  698. url = f"{self.REST}{path}"
  699. rst = await session.get(
  700. url,
  701. headers=self.getheader,
  702. timeout=5,
  703. proxy=self.proxy
  704. )
  705. return rst
  706. async def http_get_request_pub(self, session, method, query=None, **args):
  707. """"""
  708. params = dict()
  709. params.update(**args)
  710. if query is not None: params.update(query)
  711. timestamp = self.timestamp()
  712. if params:
  713. path = '{method}?{params}'.format(method=method, params=urllib.parse.urlencode(params))
  714. else:
  715. path = method
  716. headers = {}
  717. headers['Content-Type'] = 'application/json'
  718. url = f"{self.REST}{path}"
  719. rst = await session.get(
  720. url,
  721. headers=headers,
  722. timeout=5,
  723. proxy=self.proxy
  724. )
  725. return rst
  726. def timestamp(self):
  727. return datetime.utcnow().isoformat("T")[:-3] + 'Z'
  728. def __sign(self, timestamp, method, path, data):
  729. if data:
  730. message = timestamp + method + path + data
  731. else:
  732. message = timestamp + method + path
  733. digest = hmac.new(bytes(self.params.secret_key.encode('utf8')), bytes(message.encode('utf8')), digestmod=hashlib.sha256).digest()
  734. return base64.b64encode(digest).decode('utf-8')
  735. def login_params(self):
  736. """生成login字符"""
  737. timestamp = str(time.time())
  738. message = timestamp + 'GET' + '/users/self/verify'
  739. mac = hmac.new(bytes(self.params.secret_key.encode('utf8')), bytes(message.encode('utf8')), digestmod=hashlib.sha256).digest()
  740. sign = base64.b64encode(mac)
  741. login_dict = {}
  742. login_dict['apiKey'] = self.params.access_key
  743. login_dict['passphrase'] = self.params.pass_key
  744. login_dict['timestamp'] = timestamp
  745. login_dict['sign'] = sign.decode('utf-8')
  746. login_param = {'op': 'login', 'args': [login_dict]}
  747. login_str = ujson.dumps(login_param)
  748. return login_str
  749. def make_header(self):
  750. """"""
  751. headers = {}
  752. headers['Content-Type'] = 'application/json'
  753. headers['OK-ACCESS-KEY'] = self.params.access_key
  754. headers['OK-ACCESS-SIGN'] = None
  755. headers['OK-ACCESS-TIMESTAMP'] = None
  756. headers['OK-ACCESS-PASSPHRASE'] = self.params.pass_key
  757. return headers