coinex_usdt_swap_rest.py 23 KB


  1. import random
  2. import aiohttp
  3. import time
  4. import asyncio
  5. import zlib
  6. import json, ujson
  7. import hmac
  8. import base64
  9. import hashlib
  10. import traceback
  11. import urllib
  12. from urllib import parse
  13. from urllib.parse import urljoin
  14. import datetime, sys
  15. from urllib.parse import urlparse
  16. import logging, logging.handlers
  17. import utils
  18. import logging, logging.handlers
  19. import model
  20. from decimal import Decimal
  21. def empty_call(msg):
  22. print(f'空的回调函数 {msg}')
  23. class CoinExUsdtSwapRest:
  24. def __init__(self, params:model.ClientParams, colo=0):
  25. if colo:
  26. print('不支持colo高速线路')
  27. self.HOST = 'https://api.coinex.com'
  28. else:
  29. self.HOST = 'https://api.coinex.com'
  30. self.params = params
  31. self.name = self.params.name
  32. self.base = self.params.pair.split('_')[0].upper()
  33. self.quote = self.params.pair.split('_')[1].upper()
  34. self.symbol = self.base + self.quote
  35. self._SESSIONS = dict()
  36. self.logger = self.get_logger()
  37. self.callback = {
  38. "onMarket":empty_call,
  39. "onPosition":empty_call,
  40. "onOrder":empty_call,
  41. "onEquity":empty_call,
  42. "onTicker":empty_call,
  43. "onDepth":empty_call,
  44. "onExit":empty_call,
  45. }
  46. self.exchange_info = dict()
  47. self.tickSize = None
  48. self.stepSize = None
  49. self.delays = []
  50. self.max_delay = 0
  51. self.avg_delay = 0
  52. self.proxy = None
  53. if 'win' in sys.platform:
  54. self.proxy = self.params.proxy
  55. self.logger = self.get_logger()
  56. self.mp_from_rest = None
  57. self.stop_flag = 0
  58. self.coin_value = 0.0
  59. self.cash_value = 0.0
  60. self.min_trade_amount = 0.0
  61. #### 指定发包ip
  62. iplist = utils.get_local_ip_list()
  63. self.ip = iplist[int(self.params.ip)]
  64. def get_logger(self):
  65. logger = logging.getLogger(__name__)
  66. logger.setLevel(logging.DEBUG)
  67. # log to txt
  68. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %( message)s')
  69. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  70. handler.setLevel(logging.DEBUG)
  71. handler.setFormatter(formatter)
  72. logger.addHandler(handler)
  73. return logger
  74. def get_logger(self):
  75. logger = logging.getLogger(__name__)
  76. logger.setLevel(logging.DEBUG)
  77. # log to txt
  78. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  79. handler = logging.handlers.RotatingFileHandler("log.log",maxBytes=1024*1024,encoding='utf-8')
  80. handler.setLevel(logging.DEBUG)
  81. handler.setFormatter(formatter)
  82. # log to console
  83. console = logging.StreamHandler()
  84. console.setLevel(logging.WARNING)
  85. logger.addHandler(handler)
  86. logger.addHandler(console)
  87. return logger
  88. def _get_session(self, url):
  89. parsed_url = urlparse(url)
  90. key = parsed_url.netloc or parsed_url.hostname
  91. if key not in self._SESSIONS:
  92. tcp = aiohttp.TCPConnector(limit=50,keepalive_timeout=120,verify_ssl=False,local_addr=(self.ip,0))
  93. session = aiohttp.ClientSession(connector=tcp)
  94. self._SESSIONS[key] = session
  95. return self._SESSIONS[key]
  96. def get_sign(self, params, secret_key):
  97. sort_params = params
  98. data = []
  99. for item in sort_params:
  100. data.append(item + '=' + str(params[item]))
  101. str_params = "{0}&secret_key={1}".format('&'.join(data), secret_key)
  102. token = hashlib.sha256(str_params.encode("utf8")).hexdigest()
  103. return token
  104. async def _request(self, method, uri, body=None, params=None, auth=False):
  105. url = urljoin(self.HOST, uri)
  106. headers = {}
  107. if auth:
  108. if method in ['GET','DELETE']:
  109. params['timestamp'] = int(time.time()*1000)
  110. headers = {
  111. 'Authorization':self.get_sign(params, self.params.secret_key),
  112. 'Content-Type': 'application/json; charset=utf-8',
  113. 'Accept': 'application/json',
  114. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36',
  115. "AccessId":self.params.access_key
  116. }
  117. elif method == 'POST':
  118. body['timestamp'] = int(time.time()*1000)
  119. headers = {
  120. 'Authorization':self.get_sign(body, self.params.secret_key),
  121. 'Content-Type': 'application/json; charset=utf-8',
  122. 'Accept': 'application/json',
  123. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36',
  124. "AccessId":self.params.access_key
  125. }
  126. # 发起请求
  127. session = self._get_session(url)
  128. timeout = aiohttp.ClientTimeout(10)
  129. msg = "rest请求记录" + str(method) + str(url) + str(params) + str(body)
  130. self.logger.debug(msg)
  131. try:
  132. start_time = time.time()
  133. if method == "GET":
  134. response = await session.get(url, params=params, headers=headers, timeout=timeout, proxy=self.proxy)
  135. elif method == "POST":
  136. response = await session.post(url, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  137. elif method == "DELETE":
  138. response = await session.delete(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  139. code = response.status
  140. res = await response.json(content_type=None)
  141. delay = int(1000*(time.time() - start_time))
  142. self.delays.append(delay)
  143. self.get_delay_info()
  144. res_msg = msg + f' 回报 {res}'
  145. self.logger.debug(res_msg)
  146. if code not in (200, 201, 202, 203, 204, 205, 206) or res['code'] not in (0,200):
  147. print(f'URL:{url} PARAMS:{params} body:{body} ERROR:{res}')
  148. return None, res
  149. return res, None
  150. except Exception as e:
  151. print(f'{self.name} rest 请求出错', str(e))
  152. self.logger.error(f'请求错误 {msg}'+str(e))
  153. self.logger.error(traceback.format_exc())
  154. return None, e
  155. async def get_position(self):
  156. response, error = await self._request('GET', '/perpetual/v1/position/pending', params={"market":self.symbol}, auth=1)
  157. if error:
  158. print(error)
  159. if response:
  160. position = model.Position()
  161. for i in response['data']:
  162. side = i['side']
  163. pos = float(i['amount'])
  164. price = float(i['open_price'])
  165. if side == 1:
  166. position.shortPos = pos
  167. position.shortAvg = price
  168. elif side == 2:
  169. position.longPos = pos
  170. position.longAvg = price
  171. return position
  172. async def buy_token(self):
  173. '''买入平台币'''
  174. pass
  175. async def check_position(self, hold_coin=0.0):
  176. '''
  177. 现货交易 已支持全品种
  178. '''
  179. try:
  180. #######################
  181. self.logger.info("清空挂单")
  182. params = {
  183. 'market':self.symbol,
  184. 'side':0,
  185. }
  186. response, error = await self._request('POST', '/perpetual/v1/order/cancel_all', body=params, auth=1)
  187. if error:
  188. self.logger.info(error)
  189. #############################
  190. self.logger.info("清空仓位")
  191. res, err = await self._request('GET', '/perpetual/v1/position/pending', params={}, auth=1)
  192. if err:
  193. self.logger.info(err)
  194. if res:
  195. for i in res['data']:
  196. if abs(float(i['close_left'])) > 0.0:
  197. params = {
  198. "market":i['market'],
  199. "position_id":i['position_id'],
  200. # "amount":i["amount"]
  201. }
  202. response, error = await self._request('POST', '/perpetual/v1/order/close_market', body=params, auth=1)
  203. if response:
  204. self.logger.info(response)
  205. if error:
  206. self.logger.info(error)
  207. await asyncio.sleep(1)
  208. params = {
  209. "market":i['market'],
  210. "position_id":i['position_id'],
  211. }
  212. response, error = await self._request('POST', '/perpetual/v1/position/market_close', body=params, auth=1)
  213. self.logger.info(response)
  214. self.logger.info(error)
  215. #######################
  216. except:
  217. self.logger.error("清仓程序执行出错")
  218. self.logger.error(traceback.format_exc())
  219. return
  220. async def take_order(self, symbol, amount, origin_side, price, cid="", order_type='limit'):
  221. '''
  222. coinex swap 平仓需考虑最小下单量 只能通过close_position和position_id来平仓
  223. '''
  224. if origin_side =='kd':
  225. side = "2"
  226. elif origin_side =='pd':
  227. side = "1"
  228. elif origin_side =='kk':
  229. side = "1"
  230. elif origin_side =='pk':
  231. side = "2"
  232. else:
  233. print("合约不允许此交易方向")
  234. return None
  235. if symbol not in self.exchange_info:
  236. await self.before_trade()
  237. # amount = float(Decimal(str(amount//self.exchange_info[symbol].stepSize))*Decimal(str(self.exchange_info[symbol].stepSize)))
  238. # price = float(Decimal(str(price//self.exchange_info[symbol].tickSize))*Decimal(str(self.exchange_info[symbol].tickSize)))
  239. amount = utils.fix_amount(amount, self.exchange_info[symbol].stepSize)
  240. price = utils.fix_price(price, self.exchange_info[symbol].tickSize)
  241. if float(amount) <= self.min_trade_amount:
  242. # self.logger.error(f'下单参数错误 amount:{amount}')
  243. order_event = dict()
  244. order_event['status'] = "REMOVE"
  245. order_event['filled_price'] = 0.0
  246. order_event['fee'] = 0.0
  247. order_event['filled'] = 0.0
  248. order_event['client_id'] = cid
  249. self.callback["onOrder"](order_event)
  250. return None
  251. if float(price) <= 0.0:
  252. self.logger.error(f'下单参数错误 price:{price}')
  253. order_event = dict()
  254. order_event['status'] = "REMOVE"
  255. order_event['filled_price'] = 0.0
  256. order_event['fee'] = 0.0
  257. order_event['filled'] = 0.0
  258. order_event['client_id'] = cid
  259. self.callback["onOrder"](order_event)
  260. return None
  261. params = {
  262. 'client_id':cid,
  263. 'market': symbol,
  264. 'amount':utils.num_to_str(amount, self.exchange_info[symbol].stepSize),
  265. 'side': side,
  266. 'price':utils.num_to_str(price, self.exchange_info[symbol].tickSize),
  267. }
  268. if self.params.debug == 'True':
  269. return await asyncio.sleep(0.1)
  270. else:
  271. # 发单
  272. if order_type == 'limit':
  273. response, error = await self._request('POST', '/perpetual/v1/order/put_limit', body=params, auth=1)
  274. elif order_type == 'market':
  275. response, error = await self._request('POST', '/perpetual/v1/order/put_market', body=params, auth=1)
  276. # 再更新
  277. if response:
  278. # 增加新的
  279. if 'data' in response:
  280. order_event = dict()
  281. order_event['status'] = "NEW"
  282. order_event['client_id'] = params["client_id"]
  283. order_event['order_id'] = response['data']["order_id"]
  284. self.callback["onOrder"](order_event)
  285. if error:
  286. # coinex swap 有时候返回错误也下单成功 很危险
  287. order_event = dict()
  288. order_event['status'] = "REMOVE"
  289. order_event['filled_price'] = 0.0
  290. order_event['fee'] = 0.0
  291. order_event['filled'] = 0.0
  292. order_event['client_id'] = params["client_id"]
  293. self.callback["onOrder"](order_event)
  294. return response
  295. async def cancel_order(self, order_id=None, client_id=None):
  296. if order_id:
  297. response, error = await self._request('POST', f'/perpetual/v1/order/cancel', body={'market':self.symbol,'order_id':order_id}, auth=1)
  298. elif client_id:
  299. response, error = await self._request('POST', f'/perpetual/v1/order/cancel', body={'market':self.symbol,'order_id':client_id}, auth=1)
  300. else:
  301. raise Exception("撤单出错 没指定订单号")
  302. if response:
  303. self.logger.debug(f'撤单回报 {response}')
  304. # order_event = dict()
  305. # order_event['status'] = "REMOVE"
  306. # order_event['filled_price'] = float(response['data']['price'])
  307. # order_event['fee'] = float(response['data']["deal_fee"])
  308. # order_event['filled'] = float(response['data']['amount']) - float(response['data']['left'])
  309. # order_event['client_id'] = response['data']["client_id"]
  310. # self.callback["onOrder"](order_event)
  311. if error:
  312. print("撤单失败",error)
  313. self.logger.error(error)
  314. return response
  315. async def check_order(self, order_id=None, client_id=None):
  316. if order_id:
  317. response, error = await self._request('GET', f'/perpetual/v1/order/status', params={'market':self.symbol, 'order_id':order_id}, auth=1)
  318. elif client_id:
  319. response, error = await self._request('GET', f'/perpetual/v1/order/status', params={'market':self.symbol, 'order_id':client_id}, auth=1)
  320. else:
  321. return
  322. if response:
  323. self.logger.debug(f'查单回报 {response}')
  324. if response['data']:
  325. order_event = dict()
  326. if response["data"]['status'] in ['cancel','done']:
  327. order_event['status'] = "REMOVE"
  328. else:
  329. order_event['status'] = "NEW"
  330. order_event['price'] = float(response["data"]["price"])
  331. order_event['amount'] = float(response["data"]["amount"])
  332. order_event['filled'] = float(response["data"]["amount"])-float(response["data"]["left"])
  333. order_event['filled_price'] = float(response["data"]["price"])
  334. order_event['client_id'] = response["data"]["client_id"]
  335. order_event['order_id'] = response["data"]['order_id']
  336. order_event['fee'] = float(response["data"]['deal_fee'])
  337. self.callback["onOrder"](order_event)
  338. if error:
  339. print("查单失败",error)
  340. self.logger.error(error)
  341. return response
  342. async def get_order_list(self):
  343. params = {
  344. 'market':self.symbol,
  345. 'offset':100,
  346. "side":0,
  347. 'limit':100,
  348. }
  349. response, error = await self._request('GET', '/perpetual/v1/order/pending', params=params, auth=1)
  350. if response is not None:
  351. for i in response['data']['records']:
  352. order_event = dict()
  353. order_event['symbol'] = self.symbol
  354. order_event['price'] = float(i["price"])
  355. order_event['amount'] = float(i["amount"])
  356. order_event['filled'] = float(i["amount"])-float(i["left"])
  357. order_event['filled_price'] = float(i["avg_price"])
  358. order_event['client_id'] = i["clientOid"]
  359. order_event['order_id'] = i['id']
  360. asset_fee = float(response['data']["asset_fee"])
  361. money_fee = float(response['data']["money_fee"])
  362. stock_fee = float(response['data']["stock_fee"])
  363. if asset_fee > 0.0: # 非amm品种
  364. order_event['fee'] = asset_fee
  365. else: # amm品种
  366. order_event['fee'] = money_fee if money_fee > 0.0 else stock_fee
  367. if response["data"]['status'] == 'not_deal':
  368. order_event['status'] = "NEW"
  369. elif response["data"]['status'] in ['cancel','done']:
  370. order_event['status'] = "REMOVE"
  371. else:
  372. s = response["data"]['status']
  373. self.logger.error(f"错误的订单状态 {s}")
  374. self.callback["onOrder"](order_event)
  375. if error:
  376. print(error)
  377. return response
  378. async def get_server_time(self):
  379. params = {}
  380. response = await self._request('GET', '/perpetual/api/v1/timestamp', params=params)
  381. return response
  382. async def get_account(self):
  383. return await self._request('GET','/perpetual/v1/asset/query', params={}, auth=1)
  384. async def get_market_details(self):
  385. return await self._request('GET',f'/perpetual/v1/market/list', params={}, auth=0)
  386. async def get_ticker(self):
  387. res ,err = await self._request('GET',f'/perpetual/v1/market/depth', params={"market":self.symbol,'merge':'0.00000001'}, auth=0)
  388. if res:
  389. ap = float(res["data"]['asks'][0][0])
  390. bp = float(res["data"]['bids'][0][0])
  391. mp = (ap+bp)*0.5
  392. d = {"name":self.name,'mp': mp, 'bp':bp, 'ap':ap}
  393. self.callback['onTicker'](d)
  394. return d
  395. if err:
  396. self.logger.error(err)
  397. return None
  398. async def before_trade(self):
  399. # 切换杠杆
  400. await self.change_position_side()
  401. # 获取市场最新价格
  402. res = await self.get_ticker()
  403. ticker_price = res["mp"]
  404. if isinstance(ticker_price, float):
  405. self.mp_from_rest = ticker_price
  406. # 获取市场基本情况
  407. res, error = await self.get_market_details()
  408. if error:
  409. pass
  410. else:
  411. for i in res['data']:
  412. if i['name'] == self.symbol:
  413. self.stepSize = float(Decimal("0.1")**Decimal(i["amount_prec"]))
  414. self.tickSize = float(Decimal("0.1")**Decimal(i["money_prec"]))
  415. self.min_trade_amount = float(i['amount_min'])
  416. #### 保存交易规则信息
  417. exchange_info = model.ExchangeInfo()
  418. exchange_info.symbol = i['name']
  419. exchange_info.multiplier = 1
  420. exchange_info.tickSize = float(Decimal("0.1")**Decimal(i["money_prec"]))
  421. exchange_info.stepSize = float(Decimal("0.1")**Decimal(i["amount_prec"]))
  422. self.exchange_info[exchange_info.symbol] = exchange_info
  423. async def get_equity(self):
  424. # 更新账户
  425. res, err = await self.get_account()
  426. if err:print(err)
  427. if res:
  428. for i in res["data"]:
  429. if self.quote == i:
  430. cash = float(res['data'][i]['available'])+ float(res['data'][i]['frozen'])+ float(res['data'][i]['margin'])+float(res['data'][i]['profit_unreal'])
  431. self.callback['onEquity']({
  432. self.quote:cash
  433. })
  434. self.cash_value = cash
  435. async def change_position_side(self):
  436. res ,err = await self._request('POST',f'/perpetual/v1/market/adjust_leverage', body={"market":self.symbol,'leverage':10,"position_type":2}, auth=1)
  437. if err:print(err)
  438. if res:print(res)
  439. async def go(self):
  440. interval = 60 # 不能太快防止占用限频
  441. await self.before_trade()
  442. await asyncio.sleep(1)
  443. while 1:
  444. try:
  445. # 停机信号
  446. if self.stop_flag:return
  447. # 更新账户
  448. res, err = await self.get_account()
  449. if err:print(err)
  450. if res:
  451. for i in res["data"]:
  452. if self.quote == i:
  453. cash = float(res['data'][i]['available'])+float(res['data'][i]['frozen'])+float(res['data'][i]['margin'])+ float(res['data'][i]['profit_unreal'])
  454. self.callback['onEquity']({
  455. self.quote:cash
  456. })
  457. self.cash_value = cash
  458. # 更新仓位
  459. res, err = await self._request('GET', '/perpetual/v1/position/pending', params={"market":self.symbol}, auth=1)
  460. if err:
  461. self.logger.info(err)
  462. if res:
  463. p = model.Position()
  464. for i in res['data']:
  465. if i['side'] == 1:# sell
  466. p.shortPos = float(i['amount'])
  467. p.shortAvg = float(i['open_price'])
  468. if i['side'] == 2:# buy
  469. p.longPos = float(i['amount'])
  470. p.longAvg = float(i['open_price'])
  471. self.callback['onPosition'](p)
  472. await asyncio.sleep(interval)
  473. # 打印延迟
  474. self.logger.debug(f'报单延迟 平均{self.avg_delay}ms 最大{self.max_delay}ms')
  475. except:
  476. traceback.print_exc()
  477. await asyncio.sleep(10)
  478. def get_delay_info(self):
  479. if len(self.delays) > 100:
  480. self.delays = self.delays[-100:]
  481. if max(self.delays) > self.max_delay:self.max_delay = max(self.delays)
  482. self.avg_delay = round(sum(self.delays)/len(self.delays),1)
  483. async def handle_signals(self, orders):
  484. '''执行策略指令'''
  485. try:
  486. for order_name in orders:
  487. if 'Cancel' in order_name:
  488. # cid = orders[order_name][0]
  489. oid = orders[order_name][1]
  490. # 只能用oid撤单
  491. if oid:
  492. asyncio.get_event_loop().create_task(self.cancel_order(order_id=oid))
  493. for order_name in orders:
  494. if 'Limits' in order_name:
  495. for i in orders[order_name]:
  496. asyncio.get_event_loop().create_task(self.take_order(
  497. self.symbol,
  498. i[0],
  499. i[1],
  500. i[2],
  501. i[3]
  502. ))
  503. for order_name in orders:
  504. if 'Check' in order_name:
  505. # cid = orders[order_name][0]
  506. oid = orders[order_name][1]
  507. # 只能用oid查单
  508. if oid:
  509. asyncio.get_event_loop().create_task(self.check_order(order_id=oid))
  510. except Exception as e:
  511. traceback.print_exc()
  512. self.logger.error("执行信号出错"+str(e))
  513. await asyncio.sleep(0.1)