# coinex_spot_rest.py (25 KB)


import asyncio
import base64
import datetime
import hashlib
import hmac
import json
import logging
import logging.handlers
import random
import sys
import time
import traceback
import urllib
import zlib
from decimal import ROUND_FLOOR, Decimal
from urllib import parse
from urllib.parse import urljoin, urlparse

import aiohttp

import model
import utils


  24. def empty_call(msg):
  25. print(f'空的回调函数 {msg}')
  26. class CoinExSpotRest:
  27. def __init__(self, params: model.ClientParams, colo=0):
  28. if colo:
  29. print('不支持colo高速线路')
  30. self.HOST = 'https://api.coinex.com'
  31. else:
  32. self.HOST = 'https://api.coinex.com'
  33. self.params = params
  34. self.name = self.params.name
  35. self.base = self.params.pair.split('_')[0].upper()
  36. self.quote = self.params.pair.split('_')[1].upper()
  37. self.symbol = self.base + self.quote
  38. self.data = {}
  39. self._SESSIONS = dict()
  40. self.logger = self.get_logger()
  41. self.data['account'] = {}
  42. self.callback = {
  43. "onMarket": empty_call,
  44. "onPosition": empty_call,
  45. "onOrder": empty_call,
  46. "onEquity": empty_call,
  47. "onTicker": empty_call,
  48. "onExit": empty_call,
  49. }
  50. self.exchange_info = dict()
  51. self.tickSize = None
  52. self.stepSize = None
  53. self.delays = []
  54. self.max_delay = 0
  55. self.avg_delay = 0
  56. self.proxy = None
  57. if 'win' in sys.platform:
  58. self.proxy = self.params.proxy
  59. self.logger = self.get_logger()
  60. self.stop_flag = 0
  61. self.coin_value = 0.0
  62. self.cash_value = 0.0
  63. #### 指定发包ip
  64. iplist = utils.get_local_ip_list()
  65. self.ip = iplist[int(self.params.ip)]
  66. def get_logger(self):
  67. logger = logging.getLogger(__name__)
  68. logger.setLevel(logging.DEBUG)
  69. # log to txt
  70. formatter = logging.Formatter(
  71. '[%(asctime)s] - %(levelname)s - %(message)s')
  72. handler = logging.handlers.RotatingFileHandler(
  73. f"log.log", maxBytes=1024*1024)
  74. handler.setLevel(logging.DEBUG)
  75. handler.setFormatter(formatter)
  76. logger.addHandler(handler)
  77. return logger
  78. def get_logger(self):
  79. logger = logging.getLogger(__name__)
  80. logger.setLevel(logging.DEBUG)
  81. # log to txt
  82. formatter = logging.Formatter(
  83. '[%(asctime)s] - %(levelname)s - %(message)s')
  84. handler = logging.handlers.RotatingFileHandler(
  85. "log.log", maxBytes=1024*1024, encoding='utf-8')
  86. handler.setLevel(logging.DEBUG)
  87. handler.setFormatter(formatter)
  88. # log to console
  89. console = logging.StreamHandler()
  90. console.setLevel(logging.WARNING)
  91. logger.addHandler(handler)
  92. logger.addHandler(console)
  93. return logger
  94. def _get_session(self, url):
  95. parsed_url = urlparse(url)
  96. key = parsed_url.netloc or parsed_url.hostname
  97. if key not in self._SESSIONS:
  98. tcp = aiohttp.TCPConnector(limit=50,keepalive_timeout=120,verify_ssl=False,local_addr=(self.ip,0))
  99. session = aiohttp.ClientSession(connector=tcp)
  100. self._SESSIONS[key] = session
  101. return self._SESSIONS[key]
  102. def get_sign(self, params, secret_key):
  103. sort_params = sorted(params)
  104. data = []
  105. for item in sort_params:
  106. data.append(item + '=' + str(params[item]))
  107. str_params = "{0}&secret_key={1}".format('&'.join(data), secret_key)
  108. token = hashlib.md5(str_params.encode("utf8")).hexdigest().upper()
  109. return token
  110. async def _request(self, method, uri, body=None, params=None, auth=False):
  111. url = urljoin(self.HOST, uri)
  112. headers = {}
  113. if auth:
  114. if method in ['GET', 'DELETE']:
  115. params['access_id'] = self.params.access_key
  116. params['tonce'] = int(time.time()*1000)
  117. headers = {
  118. 'AUTHORIZATION': self.get_sign(params, self.params.secret_key),
  119. 'Content-Type': 'application/json; charset=utf-8',
  120. 'Accept': 'application/json',
  121. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36'
  122. }
  123. elif method == 'POST':
  124. body['access_id'] = self.params.access_key
  125. body['tonce'] = int(time.time()*1000)
  126. headers = {
  127. 'AUTHORIZATION': self.get_sign(body, self.params.secret_key),
  128. 'Content-Type': 'application/json; charset=utf-8',
  129. 'Accept': 'application/json',
  130. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36'
  131. }
  132. # 发起请求
  133. session = self._get_session(url)
  134. timeout = aiohttp.ClientTimeout(10)
  135. msg = "rest请求记录" + str(method) + str(url) + str(params) + str(body)
  136. self.logger.debug(msg)
  137. try:
  138. start_time = time.time()
  139. if method == "GET":
  140. response = await session.get(url, params=params, headers=headers, timeout=timeout, proxy=self.proxy)
  141. elif method == "POST":
  142. response = await session.post(url, params=None, data=json.dumps(body), headers=headers, timeout=timeout, proxy=self.proxy)
  143. elif method == "DELETE":
  144. response = await session.delete(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  145. code = response.status
  146. res = await response.json()
  147. delay = int(1000*(time.time() - start_time))
  148. self.delays.append(delay)
  149. res_msg = msg + f' 回报 {res}'
  150. self.logger.debug(res_msg)
  151. if code not in (200, 201, 202, 203, 204, 205, 206) or res['code'] not in (0, 200):
  152. print(f'URL:{url} PARAMS:{params} body:{body} ERROR:{res}')
  153. return None, res
  154. return res, None
  155. except Exception as e:
  156. print(f'{self.name} rest 请求出错', str(e))
  157. self.logger.error('请求错误'+str(e))
  158. self.logger.error(traceback.format_exc())
  159. return None, e
  160. async def buy_token(self):
  161. '''买入平台币'''
  162. pass
  163. async def check_position(self, hold_coin=0.0):
  164. '''
  165. 现货交易 已支持全品种
  166. '''
  167. try:
  168. #######################
  169. self.logger.info("清空挂单")
  170. params = {
  171. 'access_id': self.params.access_key,
  172. 'market': self.symbol,
  173. }
  174. response, error = await self._request('DELETE', '/v1/order/pending', params=params, auth=1)
  175. if error:
  176. self.logger.info(error)
  177. if response:
  178. self.logger.info(error)
  179. #############################
  180. res, err = await self.get_account()
  181. if err:
  182. self.logger.info(err)
  183. if res:
  184. for i in res['data']:
  185. if i in ['CET','USDT']:
  186. continue
  187. symbol = i + 'USDT'
  188. #######################
  189. ticker, error = await self._request('GET', f'/v1/market/depth', params={"market": symbol, 'merge': '0.00000001'}, auth=0)
  190. if ticker:
  191. ap = float(ticker["data"]['asks'][0][0])
  192. bp = float(ticker["data"]['bids'][0][0])
  193. mp = (ap+bp)*0.5
  194. if error:
  195. self.logger.error('ger ticker failed!')
  196. continue
  197. coin = float(res['data'][i]['available']) + \
  198. float(res['data'][i]['frozen'])
  199. coin_value = coin * mp
  200. if i == self.base:
  201. _hold_coin = hold_coin
  202. else:
  203. _hold_coin = 0
  204. diff = _hold_coin - coin_value
  205. diff *= 0.99 # 避免无法下单
  206. self.logger.info(f'{symbol}需要调整现货仓位{diff}usd')
  207. if diff > 20.0:
  208. #######################
  209. self.logger.info("清空挂单")
  210. params = {
  211. 'access_id': self.params.access_key,
  212. 'market': symbol,
  213. }
  214. response, error = await self._request('DELETE', '/v1/order/pending', params=params, auth=1)
  215. if error:
  216. self.logger.info(error)
  217. if response:
  218. self.logger.info(error)
  219. #############################
  220. info = await self.take_order(symbol, diff/mp, "kd", mp*1.001, "123", "limit")
  221. self.logger.info(info)
  222. elif diff < -20.0:
  223. #######################
  224. self.logger.info("清空挂单")
  225. params = {
  226. 'access_id': self.params.access_key,
  227. 'market': symbol,
  228. }
  229. response, error = await self._request('DELETE', '/v1/order/pending', params=params, auth=1)
  230. if error:
  231. self.logger.info(error)
  232. if response:
  233. self.logger.info(error)
  234. #############################
  235. info = await self.take_order(symbol, -diff/mp, "kk", mp*0.999, "123", "limit")
  236. self.logger.info(info)
  237. #######################
  238. except:
  239. self.logger.error("清仓程序执行出错")
  240. self.logger.error(traceback.format_exc())
  241. return
  242. async def take_order(self, symbol, amount, origin_side, price, cid="", order_type='limit'):
  243. if origin_side == 'kd':
  244. side = 'buy'
  245. elif origin_side == 'pd':
  246. side = 'sell'
  247. elif origin_side == 'kk':
  248. side = 'sell'
  249. elif origin_side == 'pk':
  250. side = 'buy'
  251. else:
  252. print("现货不允许此交易方向")
  253. return None
  254. if symbol not in self.exchange_info:
  255. await self.before_trade()
  256. amount = float(Decimal(str(amount//self.exchange_info[symbol].stepSize))
  257. * Decimal(str(self.exchange_info[symbol].stepSize)))
  258. price = float(Decimal(str(price//self.exchange_info[symbol].tickSize))
  259. * Decimal(str(self.exchange_info[symbol].tickSize)))
  260. if amount <= 0:
  261. self.logger.error(f'下单参数错误 amount:{amount}')
  262. order_event = dict()
  263. order_event['status'] = "REMOVE"
  264. order_event['filled_price'] = 0.0
  265. order_event['fee'] = 0.0
  266. order_event['filled'] = 0.0
  267. order_event['client_id'] = cid
  268. self.callback["onOrder"](order_event)
  269. return None
  270. if price <= 0:
  271. self.logger.error(f'下单参数错误 price:{price}')
  272. order_event = dict()
  273. order_event['status'] = "REMOVE"
  274. order_event['filled_price'] = 0.0
  275. order_event['fee'] = 0.0
  276. order_event['filled'] = 0.0
  277. order_event['client_id'] = cid
  278. self.callback["onOrder"](order_event)
  279. return None
  280. params = {
  281. 'access_id': self.params.access_key,
  282. 'client_id': cid,
  283. 'market': symbol,
  284. 'amount': utils.num_to_str(amount, self.exchange_info[symbol].stepSize),
  285. 'type': side,
  286. 'price': utils.num_to_str(price, self.exchange_info[symbol].tickSize),
  287. }
  288. # logger.info(f'下单指令 {params}')
  289. if self.params.debug == 'True':
  290. return await asyncio.sleep(0.1)
  291. else:
  292. # 发单
  293. response, error = await self._request('POST', '/v1/order/limit', body=params, auth=1)
  294. # 再更新
  295. if response:
  296. # logger.info(f'下单回报 {response}')
  297. # 增加新的
  298. if 'data' in response:
  299. order_event = dict()
  300. order_event['status'] = "NEW"
  301. order_event['client_id'] = params["client_id"]
  302. order_event['order_id'] = response['data']["id"]
  303. self.callback["onOrder"](order_event)
  304. if error:
  305. order_event = dict()
  306. order_event['status'] = "REMOVE"
  307. order_event['filled_price'] = 0.0
  308. order_event['fee'] = 0.0
  309. order_event['filled'] = 0.0
  310. order_event['client_id'] = params["client_id"]
  311. self.callback["onOrder"](order_event)
  312. return error
  313. return response
  314. async def cancel_order(self, order_id=None, client_id=None):
  315. if order_id:
  316. response, error = await self._request('DELETE', f'/v1/order/pending', params={'market': self.symbol, 'id': order_id}, auth=1)
  317. elif client_id:
  318. response, error = await self._request('DELETE', f'/v1/order/pending', params={'market': self.symbol, 'id': client_id}, auth=1)
  319. else:
  320. raise Exception("撤单出错 没指定订单号")
  321. if response:
  322. self.logger.debug(f'撤单回报 {response}')
  323. # if response["data"]['status'] in ['cancel','done']:
  324. # order_event = dict()
  325. # order_event['status'] = "REMOVE"
  326. # order_event['filled_price'] = float(response['data']['avg_price'])
  327. # asset_fee = float(response['data']["asset_fee"])
  328. # money_fee = float(response['data']["money_fee"])
  329. # stock_fee = float(response['data']["stock_fee"])
  330. # # 非amm品种 优先扣cet 其次u 再次b
  331. # # amm品种 买入收b 卖出收u
  332. # if response['data']['type'] == "sell":
  333. # # 卖出
  334. # order_event['fee'] = money_fee
  335. # elif response['data']['type'] == "buy":
  336. # # 买入
  337. # order_event['fee'] = stock_fee
  338. # order_event['filled'] = float(response['data']['amount']) - float(response['data']['left'])
  339. # order_event['client_id'] = response['data']["client_id"]
  340. # self.callback["onOrder"](order_event)
  341. if error:
  342. print("撤单失败", error)
  343. self.logger.error(error)
  344. # if client_id:await self.check_order(client_id=client_id)
  345. # if order_id:await self.check_order(order_id=order_id)
  346. return response
  347. async def check_order(self, order_id=None, client_id=None):
  348. if order_id:
  349. response, error = await self._request('GET', f'/v1/order/status', params={'market': self.symbol, 'id': order_id}, auth=1)
  350. elif client_id:
  351. response, error = await self._request('GET', f'/v1/order/status', params={'market': self.symbol, 'id': client_id}, auth=1)
  352. else:
  353. return
  354. if response:
  355. self.logger.debug(f'查单回报 {response}')
  356. order_event = dict()
  357. if response["data"]['status'] in ['not_deal', 'part_deal']:
  358. order_event['status'] = "NEW"
  359. elif response["data"]['status'] in ['cancel', 'done']:
  360. order_event['status'] = "REMOVE"
  361. else:
  362. self.logger.error("错误的订单状态")
  363. order_event['price'] = float(response["data"]["price"])
  364. order_event['amount'] = float(response["data"]["amount"])
  365. order_event['filled'] = float(
  366. response["data"]["amount"])-float(response["data"]["left"])
  367. order_event['filled_price'] = float(response["data"]["avg_price"])
  368. order_event['client_id'] = response["data"]["client_id"]
  369. order_event['order_id'] = response["data"]['id']
  370. asset_fee = float(response['data']["asset_fee"])
  371. money_fee = float(response['data']["money_fee"])
  372. stock_fee = float(response['data']["stock_fee"])
  373. # 非amm品种 优先扣cet 其次u 再次b
  374. # amm品种 买入收b 卖出收u
  375. if response['data']['type'] == "sell":
  376. # 卖出
  377. order_event['fee'] = money_fee
  378. elif response['data']['type'] == "buy":
  379. # 买入
  380. order_event['fee'] = stock_fee
  381. self.callback["onOrder"](order_event)
  382. if error:
  383. print("查单失败", error)
  384. self.logger.error(error)
  385. return response
  386. async def get_order_list(self):
  387. params = {
  388. 'market': self.symbol,
  389. 'page': 1,
  390. 'limit': 100,
  391. }
  392. response, error = await self._request('GET', '/v1/order/pending', params=params, auth=1)
  393. orders = [] # 重置本地订单列表
  394. if response is not None:
  395. for i in response['data']['data']:
  396. order_event = dict()
  397. order_event['symbol'] = self.symbol
  398. order_event['price'] = float(i["price"])
  399. order_event['amount'] = float(i["amount"])
  400. order_event['filled'] = float(i["amount"])-float(i["left"])
  401. order_event['filled_price'] = float(i["avg_price"])
  402. order_event['client_id'] = i["client_id"] if 'client_id' in i else ""
  403. order_event['order_id'] = i['id']
  404. asset_fee = float(i["asset_fee"])
  405. money_fee = float(i["money_fee"])
  406. stock_fee = float(i["stock_fee"])
  407. # 非amm品种 优先扣cet 其次u 再次b
  408. # amm品种 买入收b 卖出收u
  409. if i['type'] == "sell":
  410. # 卖出
  411. order_event['fee'] = money_fee
  412. elif i['type'] == "buy":
  413. # 买入
  414. order_event['fee'] = stock_fee
  415. if i['status'] in ['not_deal', 'part_deal']:
  416. order_event['status'] = "NEW"
  417. elif i['status'] in ['cancel', 'done']:
  418. order_event['status'] = "REMOVE"
  419. else:
  420. self.logger.error("错误的订单状态")
  421. self.callback["onOrder"](order_event)
  422. if error:
  423. print(error)
  424. return response
  425. async def get_server_time(self):
  426. params = {}
  427. response = await self._request('GET', '/api/v1/timestamp', params=params)
  428. return response
  429. async def get_account(self):
  430. return await self._request('GET', '/v1/balance/info', params={"access_id": self.params.access_key}, auth=1)
  431. async def get_market_details(self):
  432. return await self._request('GET', f'/v1/market/info', params={}, auth=0)
  433. async def get_ticker(self):
  434. res, err = await self._request('GET', f'/v1/market/depth', params={"market": self.symbol, 'merge': '0.00000001'}, auth=0)
  435. if res:
  436. ap = float(res["data"]['asks'][0][0])
  437. bp = float(res["data"]['bids'][0][0])
  438. mp = (ap+bp)*0.5
  439. d = {"name": self.name, 'mp': mp, 'bp': bp, 'ap': ap}
  440. self.callback['onTicker'](d)
  441. return d
  442. if err:
  443. self.logger.error(err)
  444. return None
  445. async def before_trade(self):
  446. # 获取市场基本情况
  447. res, error = await self.get_market_details()
  448. if error:
  449. pass
  450. else:
  451. for i in res['data']:
  452. if res['data'][i]['name'] == self.symbol:
  453. self.stepSize = float(Decimal("0.1")**Decimal(res['data'][i]["trading_decimal"]))
  454. self.tickSize = float(Decimal("0.1")**Decimal(res['data'][i]["pricing_decimal"]))
  455. #### 保存交易规则信息
  456. exchange_info = model.ExchangeInfo()
  457. exchange_info.symbol = i
  458. exchange_info.multiplier = 1
  459. exchange_info.stepSize = float(Decimal("0.1")**Decimal(res['data'][i]["trading_decimal"]))
  460. exchange_info.tickSize = float(Decimal("0.1")**Decimal(res['data'][i]["pricing_decimal"]))
  461. self.exchange_info[exchange_info.symbol] = exchange_info
  462. async def get_equity(self):
  463. # 更新账户
  464. res, err = await self.get_account()
  465. if err:
  466. print(err)
  467. if res:
  468. for i in res["data"]:
  469. if self.quote == i:
  470. self.data['equity'] = float(
  471. res['data'][i]['available'])+float(res['data'][i]['frozen'])
  472. self.callback['onEquity']({
  473. self.quote: self.data['equity']
  474. })
  475. self.cash_value = self.data['equity']
  476. elif self.base == i:
  477. coin = float(res['data'][i]['available']) + \
  478. float(res['data'][i]['frozen'])
  479. self.callback['onEquity']({
  480. self.base: coin
  481. })
  482. self.coin_value = coin
  483. async def go(self):
  484. await self.before_trade()
  485. await asyncio.sleep(1)
  486. ### 检查是否为AMMM品种
  487. try:
  488. async with aiohttp.ClientSession(connector = aiohttp.TCPConnector(
  489. limit=50,
  490. keepalive_timeout=120,
  491. verify_ssl=False,
  492. local_addr=(self.ip,0)
  493. )) as session:
  494. response = await session.get(
  495. "https://api.coinex.com/v1/amm/market",
  496. proxy=self.proxy
  497. )
  498. res = await response.json()
  499. amm_list = res['data']
  500. print(f'AMM列表{amm_list}')
  501. if self.symbol in amm_list:
  502. self.callback['onExit'](f"{self.name} coinex spot 禁止跑AMM品种")
  503. else:
  504. print(f'不是AMM品种 正常运行')
  505. except:
  506. self.logger.error(traceback.format_exc())
  507. self.callback['onExit'](f"{self.name} coinex spot AMM列表获取失败")
  508. while 1:
  509. try:
  510. # 停机信号
  511. if self.stop_flag:
  512. return
  513. # 更新账户
  514. res, err = await self.get_account()
  515. if err:
  516. print(err)
  517. if res:
  518. for i in res["data"]:
  519. if self.quote == i:
  520. self.data['equity'] = float(
  521. res['data'][i]['available']) + float(res['data'][i]['frozen'])
  522. self.callback['onEquity']({
  523. self.quote: self.data['equity']
  524. })
  525. elif self.base == i:
  526. coin = float(res['data'][i]['available']) + \
  527. float(res['data'][i]['frozen'])
  528. self.callback['onEquity']({
  529. self.base: coin
  530. })
  531. # 更新订单
  532. # res = await self.get_order_list()
  533. await asyncio.sleep(60)
  534. # 打印延迟
  535. self.get_delay_info()
  536. self.logger.debug(f'报单延迟 平均{self.avg_delay}ms 最大{self.max_delay}ms')
  537. except:
  538. traceback.print_exc()
  539. await asyncio.sleep(10)
  540. def get_data(self):
  541. return self.data
  542. def get_delay_info(self):
  543. if len(self.delays) > 100:
  544. self.delays = self.delays[-100:]
  545. if max(self.delays) > self.max_delay:self.max_delay = max(self.delays)
  546. self.avg_delay = round(sum(self.delays)/len(self.delays),1)
  547. async def handle_signals(self, orders):
  548. '''执行策略指令'''
  549. try:
  550. for order_name in orders:
  551. if 'Cancel' in order_name:
  552. cid = orders[order_name][0]
  553. oid = orders[order_name][1]
  554. # 只能用oid撤单
  555. if oid:
  556. asyncio.get_event_loop().create_task(self.cancel_order(order_id=oid))
  557. for order_name in orders:
  558. if 'Limits' in order_name:
  559. for i in orders[order_name]:
  560. asyncio.get_event_loop().create_task(self.take_order(
  561. self.symbol,
  562. i[0],
  563. i[1],
  564. i[2],
  565. i[3]
  566. ))
  567. for order_name in orders:
  568. if 'Check' in order_name:
  569. # cid = orders[order_name][0]
  570. oid = orders[order_name][1]
  571. asyncio.get_event_loop().create_task(self.check_order(order_id=oid))
  572. except Exception as e:
  573. traceback.print_exc()
  574. self.logger.error("执行信号出错"+str(e))
  575. await asyncio.sleep(0.1)