# okex_usdt_swap_rest.py
import asyncio
import base64
import csv
import hashlib
import hmac
import json
import logging
import logging.handlers
import random
import sys
import time
import traceback
import urllib
import zlib
from datetime import datetime, timezone
from decimal import Decimal
from urllib.parse import urlparse

import aiohttp
import ujson

import model
import utils
  18. def empty_call(msg):
  19. pass
  20. def sort_num(n):
  21. if n.isdigit():
  22. return int(n)
  23. else:
  24. return float(n)
  25. class OkexUsdtSwapRest:
  26. """"""
  27. def __init__(self, params:model.ClientParams, colo=0):
  28. if colo:
  29. print('不支持colo高速线路')
  30. self.REST = 'https://www.okx.com' # hk
  31. # REST = 'https://aws.okx.com' # aws
  32. else:
  33. self.REST = 'https://www.okx.com' # hk
  34. # REST = 'https://aws.okx.com' # aws
  35. self.params = params
  36. self.name = self.params.name
  37. self.base = params.pair.split('_')[0].upper()
  38. self.quote = params.pair.split('_')[1].upper()
  39. self.symbol = f"{self.base}-{self.quote}-SWAP"
  40. if len(self.params.pair.split('_')) > 2:
  41. self.delivery = self.params.pair.split('_')[2] # 210924
  42. self.symbol += f"-{self.delivery}"
  43. self.data = {}
  44. self._SESSIONS = dict()
  45. self.callback = {
  46. "onMarket":empty_call,
  47. "onPosition":empty_call,
  48. "onOrder":empty_call,
  49. "onEquity":empty_call,
  50. "onExit":empty_call,
  51. }
  52. self.exchange_info = dict()
  53. self.stepSize = None
  54. self.tickSize = None
  55. self.ctVal = None
  56. self.ctMult = None
  57. self.delays = []
  58. self.avg_delay = 0
  59. self.max_delay = 0
  60. self.proxy = None
  61. self.broker_id = self.params.broker_id
  62. if 'win' in sys.platform:
  63. self.proxy = self.params.proxy
  64. self.logger = self.get_logger()
  65. self.stop_flag = 0
  66. self.coin_value = 0.0
  67. self.cash_value = 0.0
  68. self.getheader = self.make_header()
  69. self.postheader = self.make_header()
  70. #### 指定发包ip
  71. iplist = utils.get_local_ip_list()
  72. self.ip = iplist[int(self.params.ip)]
  73. async def take_order(self, symbol, amount, origin_side, price, cid="", order_type='LIMIT'):
  74. '''
  75. 下单接口
  76. '''
  77. if symbol not in self.exchange_info:
  78. await self.before_trade()
  79. # amount = float(Decimal(str(amount//self.exchange_info[symbol].stepSize))*Decimal(str(self.exchange_info[symbol].stepSize)))
  80. # price = float(Decimal(str(price//self.exchange_info[symbol].tickSize))*Decimal(str(self.exchange_info[symbol].tickSize)))
  81. amount = utils.fix_amount(amount, self.exchange_info[symbol].stepSize)
  82. price = utils.fix_price(price, self.exchange_info[symbol].tickSize)
  83. amount = int(amount/self.ctVal) # 这里把币转成张 后续用张来下单
  84. # 似乎有了num_to_str就不再需要下面两行
  85. if origin_side =='kd':
  86. side = 'buy'
  87. positionSide = 'long'
  88. elif origin_side =='pd':
  89. side = 'sell'
  90. positionSide = 'long'
  91. elif origin_side =='kk':
  92. side = 'sell'
  93. positionSide = 'short'
  94. elif origin_side =='pk':
  95. side = 'buy'
  96. positionSide = 'short'
  97. else:
  98. raise Exception(f'下单参数错误 side:{origin_side}')
  99. if amount <= 0.0:
  100. # okex 因为 数量单位为张 很容易出现这个问题 避免频繁写入日志
  101. # self.logger.error(f'下单参数错误 amount:{amount} side:{origin_side}')
  102. order_event = dict()
  103. order_event['status'] = "REMOVE"
  104. order_event['filled_price'] = 0.0
  105. order_event['filled'] = 0.0
  106. order_event['client_id'] = cid
  107. self.callback["onOrder"](order_event)
  108. return None, None
  109. if price <= 0.0:
  110. self.logger.error(f'下单参数错误 price:{price} side:{origin_side}')
  111. order_event = dict()
  112. order_event['status'] = "REMOVE"
  113. order_event['filled_price'] = 0.0
  114. order_event['filled'] = 0.0
  115. order_event['client_id'] = cid
  116. self.callback["onOrder"](order_event)
  117. return None, None
  118. params = {
  119. 'instId': symbol,
  120. 'tdMode': "cross",
  121. 'sz': amount,
  122. 'side': side,
  123. 'posSide': positionSide,
  124. 'ordType': "limit" if order_type!="MARKET" else 'market',
  125. 'clOrdId': cid,
  126. }
  127. if params['ordType'] == 'limit':
  128. params['px'] = utils.num_to_str(price, self.exchange_info[symbol].tickSize)
  129. if self.params.debug == 'True':
  130. await asyncio.sleep(0.1)
  131. return None, None
  132. else:
  133. # 再报单
  134. response, error = await self.http_post_request('/api/v5/trade/order', params)
  135. # 再更新
  136. if response is not None:
  137. if response:
  138. data = response['data'][0]
  139. order_event = dict()
  140. order_event['status'] = "NEW"
  141. order_event['client_id'] = params['clOrdId']
  142. order_event['order_id'] = data['ordId']
  143. self.callback["onOrder"](order_event)
  144. if error:
  145. order_event = dict()
  146. order_event['status'] = "REMOVE"
  147. order_event['filled_price'] = 0
  148. order_event['filled'] = 0
  149. order_event['client_id'] = params['clOrdId']
  150. self.callback["onOrder"](order_event)
  151. return response, error
  152. async def cancel_order(self, order_id=None, client_id=None, symbol=None):
  153. """注意,ok撤单不能更新订单状态,撤单成功也仅仅代表交易所收到了撤单请求"""
  154. params = {
  155. "instId": self.symbol if symbol==None else symbol,
  156. }
  157. if order_id:
  158. params["ordId"] = order_id
  159. if client_id:
  160. params["clOrdId"] = client_id
  161. if self.params.debug == 'True':
  162. await asyncio.sleep(0.1)
  163. return None
  164. else:
  165. response, error = await self.http_post_request('/api/v5/trade/cancel-order', params)
  166. if error:
  167. # print("撤单失败",error)
  168. # 撤单失败 可能已经撤单 是否发生成交需要rest查
  169. # if client_id:await self.check_order(client_id=client_id)
  170. # if order_id:await self.check_order(order_id=order_id)
  171. return error
  172. return response
  173. async def check_order(self, order_id=None, client_id=None, symbol=None):
  174. params = {
  175. "instId": self.symbol if symbol==None else symbol,
  176. }
  177. if order_id:
  178. params["ordId"] = order_id
  179. if client_id:
  180. params["clOrdId"] = client_id
  181. if self.params.debug == 'True':
  182. await asyncio.sleep(0.1)
  183. return None
  184. else:
  185. response, error = await self.http_get_request('/api/v5/trade/order', params)
  186. if error:
  187. print(f"{self.name} 查单失败 {error}")
  188. return error
  189. if response['code']:
  190. for order in response['data']:
  191. if order['state'] in ['canceled', 'filled']:
  192. order_event = dict()
  193. order_event['status'] = 'REMOVE'
  194. order_event['client_id'] = order['clOrdId']
  195. order_event['order_id'] = order['ordId']
  196. order_event['filled'] = float(order['accFillSz'])*self.ctVal if order['accFillSz'] != '' else 0.0 # usdt永续需要考虑每张的单位
  197. order_event['filled_price'] = float(order['avgPx']) if order['avgPx'] != '' else 0.0
  198. self.callback['onOrder'](order_event)
  199. else:
  200. order_event = dict()
  201. order_event['status'] = "NEW"
  202. order_event['client_id'] = order['clOrdId']
  203. order_event['order_id'] = order['ordId']
  204. self.callback['onOrder'](order_event)
  205. return response
  206. async def get_order_list(self):
  207. '''
  208. 获取挂单表
  209. '''
  210. response, error = await self.http_get_request('/api/v5/trade/orders-pending', {'instId':self.symbol})
  211. # print(response)
  212. orders = [] # 查询当前挂单 只可能出现 new 和 partfill 默认成交为0 只有 done状态的订单才考虑是否有成交
  213. if response and response['code']:
  214. for i in response['data']:
  215. order_event = dict()
  216. order_event['status'] = "NEW"
  217. order_event['filled'] = 0
  218. order_event['filled_price'] = 0
  219. order_event['client_id'] = i["clOrdId"]
  220. order_event['order_id'] = i['ordId']
  221. self.callback["onOrder"](order_event)
  222. orders.append(order_event)
  223. if error:
  224. print('查询列表出错',error)
  225. return orders
  226. async def get_server_time(self):
  227. response = await self.http_get_request('/api/v5/public/time')
  228. return response
  229. async def get_equity(self):
  230. ##########
  231. res, err = await self.get_account()
  232. if res:
  233. for data in res['data']:
  234. for i in data['details']:
  235. if self.quote == i['ccy']:
  236. self.data['equity'] = float(i['eq'])
  237. self.callback['onEquity']({self.quote:self.data['equity']})
  238. if err:
  239. print('获取账户信息错误', err)
  240. ##########
  241. async def universalTransfer(self, _type='UMFUTURE_MAIN', asset='USDT', amount=0):
  242. """okex现在统一账户,没有钱包这个概念了,不实现"""
  243. pass
  244. async def futuresTransfer(self, _type='2', asset='USDT', amount=0):
  245. """okex现在统一账户,没有钱包这个概念了,不实现"""
  246. pass
  247. async def buy_token(self):
  248. '''买入平台币'''
  249. pass
  250. async def check_position(self, hold_coin=0.0):
  251. '''
  252. 检查是否存在非运行币种的仓位并take平仓
  253. 已支持全品种
  254. '''
  255. try:
  256. ###
  257. self.logger.info(f'{self.name} 检查遗漏订单')
  258. response, error = await self.http_get_request('/api/v5/trade/orders-pending', {})
  259. if response:
  260. for order in response['data']:
  261. params = {
  262. "instId": order['instId'],
  263. "ordId": order['ordId']
  264. }
  265. res, err = await self.http_post_request('/api/v5/trade/cancel-order', params)
  266. await asyncio.sleep(0.1)
  267. self.logger.info(f"{self.name} 清理遗漏订单 {res} {err}")
  268. ###
  269. if self.exchange_info == dict():
  270. await self.before_trade()
  271. ###
  272. self.logger.info(f'{self.name} 检查遗漏仓位')
  273. # 清空全部仓位
  274. response, error = await self.http_get_request('/api/v5/account/positions', {"instType":"SWAP"})
  275. if response:
  276. for i in response['data']:
  277. symbol = i['instId']
  278. pos = float(i['pos'])
  279. posSide = i['posSide']
  280. ###
  281. ticker, err = await self.http_get_request('/api/v5/market/ticker', {'instId':symbol})
  282. if err:
  283. print(err)
  284. if ticker:
  285. ap = float(ticker['data'][0]['askPx'])
  286. bp = float(ticker['data'][0]['bidPx'])
  287. ### 每个品种都要获取各自的精度
  288. trade_side = 'sell' if posSide == 'long' else "buy"
  289. trade_pos = abs(pos)
  290. trade_pos_side = posSide
  291. params = {
  292. 'instId': symbol,
  293. 'tdMode': "cross",
  294. 'sz': int(trade_pos),
  295. 'px': ap*1.001 if trade_side == 'buy' else bp*0.999,
  296. 'side' : trade_side,
  297. 'posSide': trade_pos_side,
  298. 'ordType': "limit",
  299. }
  300. response, error = await self.http_post_request('/api/v5/trade/order', params)
  301. print("下单结果",response,error)
  302. self.logger.info('遗留仓位检测完毕')
  303. except:
  304. self.logger.error("清仓程序执行出错")
  305. self.logger.error(traceback.format_exc())
  306. return
  307. async def before_trade(self):
  308. """"""
  309. response, error = await self.get_instruments()
  310. if error:
  311. print('获取市场信息错误',error)
  312. else:
  313. self.update_instruments(response)
  314. print(f'before_trade get_instruments successed. {self.symbol} {self.stepSize} {self.tickSize} {self.ctVal} {self.ctMult}')
  315. # 更新账户
  316. res, err = await self.get_account()
  317. if err:
  318. print(err)
  319. else:
  320. for data in res['data']:
  321. for i in data['details']:
  322. if self.quote == i['ccy']:
  323. self.data['equity'] = float(i['eq'])
  324. self.callback['onEquity']({self.quote:self.data['equity']})
  325. self.cash_value = float(i['eq'])
  326. print(f"{self.name} on_go {self.symbol} equity {self.quote} {self.data['equity']}")
  327. await self.change_pos_side()
  328. await asyncio.sleep(1)
  329. await self.set_leverage(10)
  330. await asyncio.sleep(1)
  331. await self.get_position()
  332. async def get_all_position(self):
  333. '''
  334. 获取仓位信息
  335. '''
  336. response, error = await self.http_get_request('/api/v5/account/positions', {})
  337. print(f'查看此账号全部仓位 {response} {error}')
  338. async def get_position(self):
  339. '''
  340. 获取仓位信息
  341. '''
  342. response, error = await self.http_get_request('/api/v5/account/positions', {'instId':self.symbol})
  343. if error:
  344. print(f"{self.name} get_position error {error}")
  345. return None
  346. longPos, shortPos = 0, 0
  347. longAvg, shortAvg = 0, 0
  348. for i in response['data']:
  349. if i['instId'] == self.symbol and i['pos'] and i['avgPx']:
  350. if i['posSide'] == 'long':
  351. longPos = float(i['pos'])*self.ctVal
  352. longAvg = float(i['avgPx'])
  353. elif i['posSide'] == 'short':
  354. shortPos = float(i['pos'])*self.ctVal
  355. shortAvg = float(i['avgPx'])
  356. position = model.Position()
  357. position.longPos = abs(longPos)
  358. position.longAvg = abs(longAvg)
  359. position.shortPos = abs(shortPos)
  360. position.shortAvg = abs(shortAvg)
  361. self.callback['onPosition'](position)
  362. return position
  363. async def get_ticker(self):
  364. res, err = await self.http_get_request('/api/v5/market/ticker', {'instId':self.symbol})
  365. if res:
  366. ap = float(res['data'][0]['askPx'])
  367. bp = float(res['data'][0]['bidPx'])
  368. mp = (ap+bp)/2
  369. ticker = {"name":self.name,'mp': mp, 'bp':bp, 'ap':ap}
  370. return ticker
  371. if err:
  372. self.logger.debug(err)
  373. return None
  374. async def get_account(self):
  375. response, error = await self.http_get_request('/api/v5/account/balance', {'ccy':self.quote})
  376. return response, error
  377. async def go(self):
  378. '''
  379. 盘前
  380. 获取市场信息
  381. 获取账户信息
  382. 更改仓位模式(期货)
  383. 清空仓位和挂单
  384. 盘中
  385. 更新账户信息
  386. 更新挂单列表
  387. 更新仓位信息
  388. 更新延迟信息
  389. '''
  390. print('Rest循环器启动')
  391. interval = 60 # 不能太快防止占用限频
  392. ### beforeTrade
  393. await self.before_trade()
  394. await asyncio.sleep(1)
  395. ### onTrade
  396. loop = 0
  397. while 1:
  398. loop += 1
  399. try:
  400. # 停机信号
  401. if self.stop_flag:
  402. return
  403. # 更新账户
  404. res, err = await self.get_account()
  405. if err:
  406. print(err)
  407. else:
  408. for data in res['data']:
  409. for i in data['details']:
  410. if self.quote == i['ccy']:
  411. self.data['equity'] = float(i['eq'])
  412. self.callback['onEquity']({self.quote:self.data['equity']})
  413. # print(f"{self.name} on_go {self.symbol} equity {self.quote} {self.data['equity']}")
  414. # 更新仓位
  415. await self.get_position()
  416. # print(f"{self.name} on_go {self.symbol} position {position}")
  417. await asyncio.sleep(interval)
  418. # 打印延迟
  419. self.get_delay_info()
  420. self.logger.debug(f'报单延迟 平均{self.avg_delay}ms 最大{self.max_delay}ms')
  421. except asyncio.CancelledError:
  422. return
  423. except:
  424. traceback.print_exc()
  425. await asyncio.sleep(30)
  426. async def handle_signals(self, orders):
  427. '''
  428. 执行策略指令
  429. 撤销订单
  430. 检查订单
  431. 下达订单
  432. '''
  433. try:
  434. for order_name in orders:
  435. if 'Cancel' in order_name:
  436. cid = orders[order_name][0]
  437. oid = orders[order_name][1]
  438. if cid:
  439. asyncio.get_event_loop().create_task(self.cancel_order(client_id=cid))
  440. elif oid:
  441. asyncio.get_event_loop().create_task(self.cancel_order(order_id=oid))
  442. for order_name in orders:
  443. if 'Check' in order_name:
  444. cid = orders[order_name][0]
  445. # oid = orders[order_name][1]
  446. asyncio.get_event_loop().create_task(self.check_order(client_id=cid))
  447. for order_name in orders:
  448. if 'Limits' in order_name:
  449. for i in orders[order_name]:
  450. asyncio.get_event_loop().create_task(self.take_order(
  451. self.symbol,
  452. i[0],
  453. i[1],
  454. i[2],
  455. i[3]
  456. ))
  457. except Exception as e:
  458. traceback.print_exc()
  459. self.logger.error("执行信号出错"+str(e))
  460. await asyncio.sleep(0.1)
  461. async def set_leverage(self, lever=10):
  462. params = {'instId':self.symbol, 'lever':utils.num_to_str(lever, 1), 'mgnMode':'cross'}
  463. res, error = await self.http_post_request('/api/v5/account/set-leverage', params)
  464. if error:
  465. print(f"{self.name} 设置杠杆倍数 {params} failed. -->{error}")
  466. return None, error
  467. if res['code'] == '0':
  468. print(f"{self.name} 设置杠杆倍数 {params} success. -->{res}")
  469. else:
  470. print(f"{self.name} 设置杠杆倍数 {params} 暂时不能设置. -->{res}")
  471. return res, error
  472. async def change_pos_side(self, dual='true'):
  473. ''''''
  474. params = {'posMode': "long_short_mode"}
  475. res, error = await self.http_post_request('/api/v5/account/set-position-mode', params)
  476. if error:
  477. print(f"{self.name} 设置双向持仓 failed. -->{error}")
  478. return res, error
  479. if res['code'] == '0':
  480. print(f"{self.name} 设置双向持仓 success. -->{res}")
  481. else:
  482. print(f"{self.name} 设置双向持仓 暂时不能设置. -->{res}")
  483. return res, error
  484. async def get_instruments(self):
  485. """从rest获取合约信息"""
  486. params = {'instType': 'SWAP'}
  487. res, error = await self.http_get_request('/api/v5/public/instruments', params)
  488. return res, error
  489. def update_instruments(self, data):
  490. """根据信息调整合约信息"""
  491. for info in data['data']:
  492. if info['instId'] == self.symbol:
  493. self.ctVal = sort_num(info['ctVal'])
  494. self.ctMult = sort_num(info['ctMult'])
  495. self.tickSize = sort_num(info['tickSz'])
  496. self.stepSize = sort_num(info['minSz'])*self.ctVal
  497. #### 保存交易规则信息
  498. exchange_info = model.ExchangeInfo()
  499. exchange_info.symbol = info['instId']
  500. exchange_info.multiplier = sort_num(info['ctMult'])
  501. exchange_info.tickSize = sort_num(info['tickSz'])
  502. exchange_info.stepSize = sort_num(info['minSz'])*sort_num(info['ctMult'])
  503. self.exchange_info[exchange_info.symbol] = exchange_info
  504. def __sign(self, timestamp, method, path, data):
  505. if data:
  506. message = timestamp + method + path + data
  507. else:
  508. message = timestamp + method + path
  509. digest = hmac.new(bytes(self.params.secret_key.encode('utf8')), bytes(message.encode('utf8')), digestmod=hashlib.sha256).digest()
  510. return base64.b64encode(digest).decode('utf-8')
  511. def get_logger(self):
  512. logger = logging.getLogger(__name__)
  513. logger.setLevel(logging.DEBUG)
  514. # log to txt
  515. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  516. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  517. handler.setLevel(logging.DEBUG)
  518. handler.setFormatter(formatter)
  519. logger.addHandler(handler)
  520. return logger
  521. async def http_post_request(self, method, query=None, **args):
  522. """"""
  523. params = dict()
  524. params.update(**args)
  525. if query is not None: params.update(query)
  526. data = json.dumps(params)
  527. timestamp = self.timestamp()
  528. sign = self.__sign(timestamp, 'POST', method, data)
  529. self.postheader['OK-ACCESS-SIGN'] = sign
  530. self.postheader['OK-ACCESS-TIMESTAMP'] = timestamp
  531. url = f"{self.REST}{method}"
  532. res, error = await self._request('POST', url, data)
  533. return res, error
  534. async def http_get_request(self, method, query=None, **args):
  535. """"""
  536. params = dict()
  537. params.update(**args)
  538. if query is not None: params.update(query)
  539. timestamp = self.timestamp()
  540. if params:
  541. path = '{method}?{params}'.format(method=method, params=urllib.parse.urlencode(params))
  542. else:
  543. path = method
  544. sign = self.__sign(timestamp, 'GET', path, None)
  545. self.getheader['OK-ACCESS-SIGN'] = sign
  546. self.getheader['OK-ACCESS-TIMESTAMP'] = timestamp
  547. url = f"{self.REST}{path}"
  548. res, error = await self._request('GET', url)
  549. return res, error
  550. async def _request(self, method, url, params=None):
  551. """"""
  552. try:
  553. ######
  554. msg = f"rest请求记录 {method} {url} {params}"
  555. self.logger.debug(msg)
  556. ######
  557. session = self._get_session(url)
  558. start_time = time.time()
  559. if method == 'GET':
  560. headers = self.getheader
  561. response = await session.get(
  562. url,
  563. headers=headers,
  564. timeout=10,
  565. proxy=self.proxy
  566. )
  567. elif method == 'POST':
  568. headers = self.postheader
  569. response = await session.post(
  570. url,
  571. data=params,
  572. headers=headers,
  573. timeout=10,
  574. proxy=self.proxy
  575. )
  576. code = response.status
  577. res = await response.json()
  578. msg = f"rest请求记录 {method} {url} {headers} {params}"
  579. res_msg = msg + f' {res}'
  580. self.logger.debug(res_msg)
  581. if code not in (200, 201, 202, 203, 204, 205, 206):
  582. self.logger.error(f'METHOD:{method} URL:{url} PARAMS:{params} ERROR:{res}')
  583. return None, str(res)
  584. if 'code' in res:
  585. if int(res['code']) not in (0,):
  586. if '51401' in str(res):
  587. pass
  588. else:
  589. self.logger.error(f'METHOD:{method} URL:{url} PARAMS:{params} ERROR:{res}')
  590. return None, str(res)
  591. delay = int(1000*(time.time() - start_time))
  592. self.delays.append(delay)
  593. return res, None
  594. except Exception as e:
  595. print('网络请求错误')
  596. print(f'URL:{url} PARAMS:{params} ERROR:{e}')
  597. self.logger.error(e)
  598. self.logger.error(traceback.format_exc())
  599. return None, str(e)
  600. async def get_history(self):
  601. params = {
  602. 'instType': "SWAP",
  603. 'limit':"100"
  604. }
  605. res, error = await self.http_get_request('/api/v5/trade/fills', params)
  606. b_id=res['data'][0]['billId']
  607. ###
  608. data = []
  609. while 1:
  610. params = {
  611. 'instType': "SWAP",
  612. 'limit':"100",
  613. 'after':b_id
  614. }
  615. await asyncio.sleep(0.3)
  616. res, _ = await self.http_get_request('/api/v5/trade/fills', params)
  617. if res:
  618. if len(res['data']) == 0:
  619. break
  620. for i in res['data']:
  621. data.append(i)
  622. b_id = res['data'][-1]['billId']
  623. # b_id_s = []
  624. # for i in data:
  625. # if i['billId'] in b_id_s:
  626. # print(i['billId'])
  627. # b_id_s.append(i['billId'])
  628. with open("data5.json", 'w+') as f:
  629. f.write(json.dumps(data))
  630. def get_delay_info(self):
  631. if len(self.delays) > 100:
  632. self.delays = self.delays[-100:]
  633. if max(self.delays) > self.max_delay:self.max_delay = max(self.delays)
  634. self.avg_delay = round(sum(self.delays)/len(self.delays),1)
  635. def timestamp(self):
  636. return datetime.utcnow().isoformat("T")[:-3] + 'Z'
  637. def login_params(self):
  638. """生成login字符"""
  639. timestamp = str(time.time())
  640. message = timestamp + 'GET' + '/users/self/verify'
  641. mac = hmac.new(bytes(self.params.secret_key.encode('utf8')), bytes(message.encode('utf8')), digestmod=hashlib.sha256).digest()
  642. sign = base64.b64encode(mac)
  643. login_dict = {}
  644. login_dict['apiKey'] = self.params.access_key
  645. login_dict['passphrase'] = self.params.pass_key
  646. login_dict['timestamp'] = timestamp
  647. login_dict['sign'] = sign.decode('utf-8')
  648. login_param = {'op': 'login', 'args': [login_dict]}
  649. login_str = ujson.dumps(login_param)
  650. return login_str
  651. def make_header(self):
  652. """"""
  653. headers = {}
  654. headers['Content-Type'] = 'application/json'
  655. headers['OK-ACCESS-KEY'] = self.params.access_key
  656. headers['OK-ACCESS-SIGN'] = None
  657. headers['OK-ACCESS-TIMESTAMP'] = None
  658. headers['OK-ACCESS-PASSPHRASE'] = self.params.pass_key
  659. return headers
  660. def _get_session(self, url):
  661. parsed_url = urlparse(url)
  662. key = parsed_url.netloc or parsed_url.hostname
  663. if key not in self._SESSIONS:
  664. tcp = aiohttp.TCPConnector(limit=50,keepalive_timeout=120,verify_ssl=False,local_addr=(self.ip,0))
  665. session = aiohttp.ClientSession(connector=tcp)
  666. self._SESSIONS[key] = session
  667. return self._SESSIONS[key]