kucoin_usdt_swap_rest.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
  1. import aiohttp
  2. import time
  3. import asyncio
  4. import json
  5. import hmac
  6. import base64
  7. import hashlib
  8. import traceback
  9. from urllib.parse import urljoin
  10. import sys
  11. from urllib.parse import urlparse
  12. import logging.handlers
  13. import utils
  14. import logging, logging.handlers
  15. import model
  16. from loguru import logger
  17. logger.add("kucoin_limits.log", format="{time} {level} {message}", level="INFO")
  18. def empty_call(msg):
  19. print(f'空的回调函数 {msg}')
  20. def decimal_amount(amount, d):
  21. if int(d) == 0:
  22. return str(int(amount))
  23. elif int(d) > 0:
  24. return str(round(int(amount*10**int(d))*0.1**int(d), int(d)))
  25. def decimal_price(price, d):
  26. if int(d) == 0:
  27. return str(int(price))
  28. elif int(d) > 0:
  29. return str(round(float(price), int(d)))
  30. class KucoinUsdtSwapRest:
  31. def __init__(self, params:model.ClientParams, colo=0):
  32. if colo:
  33. print('不支持colo高速线路')
  34. self.HOST = "https://api-futures.kucoin.com"
  35. else:
  36. self.HOST = "https://api-futures.kucoin.com"
  37. self.params = params
  38. self.name = self.params.name
  39. self.base = self.params.pair.split('_')[0].upper()
  40. # 处理特殊情况
  41. if self.base == "BTC":self.base = "XBT"
  42. self.quote = self.params.pair.split('_')[1].upper()
  43. self.symbol = self.base + self.quote + "M"
  44. self._SESSIONS = dict()
  45. self.logger = self.get_logger()
  46. self.callback = {
  47. "onMarket":empty_call,
  48. "onPosition":empty_call,
  49. "onOrder":empty_call,
  50. "onEquity":empty_call,
  51. "onTicker":empty_call,
  52. "onDepth":empty_call,
  53. "onExit":empty_call,
  54. }
  55. self.exchange_info = dict()
  56. self.decimal_amount = 10
  57. self.decimal_price = 10
  58. self.delays = []
  59. self.max_delay = 0
  60. self.avg_delay = 0
  61. self.multiplier = None
  62. self.proxy = None
  63. if 'win' in sys.platform:
  64. self.proxy = self.params.proxy
  65. self.logger = self.get_logger()
  66. self.coin_value = 0.0
  67. self.cash_value = 0.0
  68. self.last_cash = 0.0
  69. #### 指定发包ip
  70. iplist = utils.get_local_ip_list()
  71. self.ip = iplist[int(self.params.ip)]
  72. def get_logger(self):
  73. logger = logging.getLogger(__name__)
  74. logger.setLevel(logging.DEBUG)
  75. # log to txt
  76. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  77. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  78. handler.setLevel(logging.DEBUG)
  79. handler.setFormatter(formatter)
  80. logger.addHandler(handler)
  81. return logger
  82. def get_logger(self):
  83. logger = logging.getLogger(__name__)
  84. logger.setLevel(logging.DEBUG)
  85. # log to txt
  86. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  87. handler = logging.handlers.RotatingFileHandler("log.log",maxBytes=1024*1024,encoding='utf-8')
  88. handler.setLevel(logging.DEBUG)
  89. handler.setFormatter(formatter)
  90. # log to console
  91. console = logging.StreamHandler()
  92. console.setLevel(logging.WARNING)
  93. logger.addHandler(handler)
  94. logger.addHandler(console)
  95. return logger
  96. def _get_session(self, url):
  97. parsed_url = urlparse(url)
  98. key = parsed_url.netloc or parsed_url.hostname
  99. if key not in self._SESSIONS:
  100. tcp = aiohttp.TCPConnector(limit=50,keepalive_timeout=120,verify_ssl=False,local_addr=(self.ip,0))
  101. session = aiohttp.ClientSession(connector=tcp)
  102. self._SESSIONS[key] = session
  103. return self._SESSIONS[key]
  104. async def _request(self, method, uri, body=None, params=None, auth=False):
  105. url = urljoin(self.HOST, uri)
  106. headers = {}
  107. if auth:
  108. now_time = int(time.time()) * 1000
  109. str_to_sign = str(now_time) + method + uri
  110. if method in ['GET', 'DELETE']:
  111. data_json = ''
  112. if params:
  113. strl = []
  114. for key in params:
  115. strl.append("{}={}".format(key, params[key]))
  116. data_json += '&'.join(strl)
  117. str_to_sign += '?' + data_json
  118. else:
  119. if body:str_to_sign += body
  120. sign = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), str_to_sign.encode('utf-8'), hashlib.sha256).digest())
  121. passphrase = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), self.params.pass_key.encode('utf-8'), hashlib.sha256).digest())
  122. headers = {
  123. "KC-API-SIGN": sign.decode(),
  124. "KC-API-TIMESTAMP": str(now_time),
  125. "KC-API-KEY": self.params.access_key,
  126. "KC-API-PASSPHRASE": passphrase.decode(),
  127. "Content-Type": "application/json",
  128. "KC-API-KEY-VERSION": "2"
  129. }
  130. headers["User-Agent"] = "kucoin-python-sdk/v1.0"
  131. # 发起请求
  132. session = self._get_session(url)
  133. timeout = aiohttp.ClientTimeout(10)
  134. msg = "rest请求记录" + str(method) + str(url) + str(params) + str(body)
  135. self.logger.debug(msg)
  136. try:
  137. start_time = time.time()
  138. if method == "GET":
  139. response = await session.get(url, params=params, headers=headers, timeout=timeout, proxy=self.proxy)
  140. elif method == "POST":
  141. response = await session.post(url, params=None, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  142. elif method == "DELETE":
  143. response = await session.delete(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  144. code = response.status
  145. res = await response.json()
  146. delay = int(1000*(time.time() - start_time))
  147. res_msg = msg + f' 回报 {res}'
  148. self.logger.debug(res_msg)
  149. self.delays.append(delay)
  150. if code not in (200, 201, 202, 203, 204, 205, 206) or \
  151. int(res['code']) not in (200, 201, 202, 203, 204, 205, 206, 200000):
  152. print(f'URL:{url} METHOD:{method} PARAMS:{params} body:{body} ERROR:{res}')
  153. self.logger.error('请求错误'+str(res))
  154. self.logger.error(res)
  155. return None, res
  156. return res, None
  157. except Exception as e:
  158. print(f'{self.name} rest 请求出错', str(e))
  159. self.logger.error('请求错误'+str(e))
  160. self.logger.error(traceback.format_exc())
  161. return None, e
  162. async def buy_token(self):
  163. '''买入平台币'''
  164. pass
  165. async def check_position(self, hold_coin=0.0):
  166. '''
  167. 两次执行check_position之间必须停留足够时间 避免position没及时更新 导致重复下单 已支持全品种
  168. '''
  169. try:
  170. ##############################
  171. self.logger.info("清空挂单")
  172. params = {
  173. 'status':"active",
  174. 'tradeType':'TRADE',
  175. }
  176. response, error = await self._request('GET', '/api/v1/orders', params=params, auth=1)
  177. if response is not None and response['data']['items'] is not None:
  178. for i in response['data']['items']:
  179. res = await self.cancel_order(order_id=i["id"])
  180. self.logger.info(res)
  181. self.logger.info(f"{self.name} 全平仓位")
  182. ##############################
  183. # 更新仓位
  184. print("获取仓位")
  185. res, err = await self._request('GET','/api/v1/positions', params={}, auth=1)
  186. if err:self.logger.info(err)
  187. if res:
  188. for i in res['data']:
  189. symbol = i['symbol']
  190. amt = float(i['currentQty'])
  191. # 转换为单位:张
  192. if self.exchange_info == dict():
  193. await self.before_trade()
  194. if amt > 0:
  195. self.logger.info( await self.take_order(
  196. symbol,
  197. amt*self.exchange_info[symbol].multiplier,
  198. "pd",
  199. 1,
  200. utils.get_cid(),
  201. "market"
  202. ))
  203. elif amt < 0:
  204. self.logger.info( await self.take_order(
  205. symbol,
  206. -amt*self.exchange_info[symbol].multiplier,
  207. "pk",
  208. 1,
  209. utils.get_cid(),
  210. "market"
  211. ))
  212. ##############################
  213. except:
  214. self.logger.error("清仓程序执行出错")
  215. self.logger.error(traceback.format_exc())
  216. return
  217. async def take_order(self, symbol, amount, origin_side, price, cid="", order_type='limit'):
  218. '''
  219. kumex会合并多空方向的仓位 不支持双向交易 所以reduce_only要小心使用
  220. '''
  221. reduceOnly = False
  222. if origin_side =='kd':
  223. side = 'buy'
  224. elif origin_side =='pd':
  225. side = 'sell'
  226. elif origin_side =='kk':
  227. side = 'sell'
  228. elif origin_side =='pk':
  229. side = 'buy'
  230. else:
  231. return None
  232. # 转换为单位:张
  233. if symbol not in self.exchange_info:
  234. await self.before_trade()
  235. amount_paper = int(amount/self.exchange_info[symbol].multiplier)
  236. if amount_paper == 0:
  237. order_event = dict()
  238. order_event['status'] = "REMOVE"
  239. order_event['client_id'] = cid
  240. order_event['filled'] = 0
  241. order_event['filled_price'] = 0
  242. self.callback["onOrder"](order_event)
  243. return
  244. params = {
  245. 'clientOid':cid,
  246. 'symbol': symbol,
  247. 'size': amount_paper,
  248. 'side': side,
  249. 'leverage': 10, #10, 没kyc老报错,##############################################
  250. 'reduceOnly':reduceOnly,
  251. 'price':utils.num_to_str(price, self.exchange_info[symbol].tickSize),
  252. 'type':order_type,
  253. }
  254. # logger.info(f'下单指令 {params}')
  255. if self.params.debug == 'True':
  256. return await asyncio.sleep(0.1)
  257. else:
  258. # 发单
  259. response, error = await self._request('POST', '/api/v1/orders', body=json.dumps(params), auth=1)
  260. # 更新
  261. if response:
  262. # 增加新的
  263. if 'data' in response:
  264. order_event = dict()
  265. order_event['status'] = "NEW"
  266. order_event['client_id'] = cid
  267. order_event['order_id'] = response['data']["orderId"]
  268. self.callback["onOrder"](order_event)
  269. if error:
  270. order_event = dict()
  271. order_event['status'] = "REMOVE"
  272. order_event['client_id'] = cid
  273. order_event['filled_price'] = 0
  274. order_event['filled'] = 0
  275. self.callback["onOrder"](order_event)
  276. return response
  277. async def cancel_order(self, order_id=None, client_id=None):
  278. if order_id:
  279. response, error = await self._request('DELETE', f'/api/v1/orders/{order_id}', auth=1)
  280. elif client_id:
  281. response, error = await self._request('DELETE', f'/api/v1/orders/byClientOid?clientOid={client_id}', auth=1)
  282. else:
  283. raise Exception("撤单出错 没指定订单号")
  284. if response:
  285. self.logger.debug(f'撤单回报 {response}')
  286. # 撤单成功不会返回成交信息 所以不触发回调
  287. if error:
  288. print("撤单失败",error)
  289. return error
  290. return response
  291. async def check_order(self, order_id=None, client_id=None):
  292. if order_id:
  293. response, error = await self._request('GET', f'/api/v1/orders/{order_id}', auth=1)
  294. elif client_id:
  295. response, error = await self._request('GET', f'/api/v1/orders/byClientOid?clientOid={client_id}', auth=1)
  296. else:
  297. return
  298. if response:
  299. self.logger.debug(f'查单回报 {response}')
  300. order_event = dict()
  301. if response["data"]['isActive'] == True:
  302. order_event['status'] = "NEW"
  303. elif response["data"]['isActive'] == False:
  304. order_event['status'] = "REMOVE"
  305. else:
  306. self.logger.error("错误的订单状态")
  307. if self.multiplier == None:
  308. await self.before_trade()
  309. order_event['filled'] = float(response["data"]["filledSize"])*self.multiplier
  310. order_event['filled_price'] = float(response["data"]["filledValue"])/float(response["data"]["filledSize"])/self.multiplier if float(response["data"]["filledSize"]) > 0 else 0
  311. order_event['client_id'] = response["data"]["clientOid"]
  312. order_event['order_id'] = response["data"]['id']
  313. self.callback["onOrder"](order_event)
  314. if error:
  315. print("查单失败",error)
  316. self.logger.error(error)
  317. return response
  318. async def get_order_list(self):
  319. params = {
  320. 'symbol':self.symbol,
  321. 'status':"active",
  322. 'tradeType':'TRADE',
  323. }
  324. response, error = await self._request('GET', '/api/v1/orders', params=params, auth=1)
  325. orders = [] # 重置本地订单列表
  326. if response is not None:
  327. for i in response['data']['items']:
  328. order_event = dict()
  329. if i['isActive'] == True:
  330. order_event['status'] = "NEW"
  331. elif i['isActive'] == False:
  332. order_event['status'] = "REMOVE"
  333. else:
  334. self.logger.error("错误的订单状态")
  335. order_event['filled'] = float(i["dealSize"])
  336. order_event['filled_price'] = float(i["dealFunds"])/float(i["dealSize"]) if float(i["dealSize"]) > 0 else 0
  337. order_event['client_id'] = i["clientOid"]
  338. order_event['order_id'] = i['id']
  339. self.callback["onOrder"](order_event)
  340. if error:
  341. print(error)
  342. return response
  343. async def get_server_time(self):
  344. params = {}
  345. response = await self._request('GET', '/api/v1/timestamp', params=params)
  346. return response
  347. async def get_account(self):
  348. return await self._request('GET','/api/v1/account-overview', params={"currency":self.quote}, auth=1)
  349. async def get_position(self):
  350. return await self._request('GET','/api/v1/position', params={"symbol":self.symbol}, auth=1)
  351. async def get_market_details(self):
  352. return await self._request('GET',f'api/v1/contracts/active', params={}, auth=1)
  353. async def get_ticker(self):
  354. res, err = await self._request('GET',f'api/v1/ticker', params={"symbol":self.symbol}, auth=0)
  355. if res:
  356. ap = float(res['data']['bestAskPrice'])
  357. bp = float(res['data']['bestBidPrice'])
  358. mp = (ap+bp)/2
  359. ticker = {"name":self.name,'mp': mp, 'bp':bp, 'ap':ap}
  360. return ticker
  361. if err:
  362. self.logger.error(err)
  363. return
  364. async def before_trade(self):
  365. # 获取市场
  366. res, error = await self.get_market_details()
  367. if error:
  368. pass
  369. if res:
  370. for i in res['data']:
  371. if i['symbol'] == self.symbol:
  372. # 1张多少个币
  373. self.multiplier = float(i["multiplier"])
  374. # 1张 币的数量精度 张 转换成 币 需要乘以乘数
  375. self.stepSize = i["lotSize"]*self.multiplier
  376. self.tickSize = i['tickSize']
  377. #### 保存交易规则信息
  378. exchange_info = model.ExchangeInfo()
  379. exchange_info.symbol = i['symbol']
  380. exchange_info.multiplier = float(i["multiplier"])
  381. exchange_info.tickSize = i['tickSize']
  382. exchange_info.stepSize = i["lotSize"]*float(i["multiplier"])
  383. self.exchange_info[exchange_info.symbol] = exchange_info
  384. # 设置杠杆
  385. await self._request('GET',f'api/v1/contracts/active', params={}, auth=1)
  386. # 更新账户
  387. res, err = await self.get_position()
  388. if err:print(err)
  389. if res:
  390. amt = float(res['data']['currentQty']) * self.multiplier
  391. ep = float(res['data']['avgEntryPrice'])
  392. pos = model.Position()
  393. if amt == 0.0:
  394. pos.longPos = 0
  395. pos.longAvg = 0
  396. pos.shortPos = 0
  397. pos.shortAvg = 0
  398. elif amt > 0.0:
  399. pos.longPos = amt
  400. pos.longAvg = ep
  401. pos.shortPos = 0
  402. pos.shortAvg = 0
  403. elif amt < 0.0:
  404. pos.longPos = 0
  405. pos.longAvg = 0
  406. pos.shortPos = -amt
  407. pos.shortAvg = ep
  408. self.callback['onPosition'](pos)
  409. async def get_equity(self):
  410. # 更新账户
  411. res, err = await self.get_account()
  412. if err:print(err)
  413. if res:
  414. cash = float(res['data']['accountEquity'])
  415. if self.last_cash == 0:
  416. self.last_cash = cash
  417. self.callback['onEquity']({self.quote:cash})
  418. self.cash_value = cash
  419. async def go(self):
  420. interval = 60
  421. await self.before_trade()
  422. await asyncio.sleep(1)
  423. while 1:
  424. try:
  425. # 更新账户
  426. res, err = await self.get_account()
  427. if err:print(err)
  428. if res:
  429. if res['data']['currency'] == "USDT":
  430. cash = float(res['data']['accountEquity'])
  431. # rest有可能获取到中间态 为了避免得到错误的账户信息 需进行判断
  432. if self.last_cash == 0:
  433. self.last_cash = cash
  434. self.callback['onEquity']({self.quote:cash})
  435. else:
  436. # 判断净值是否出现大幅度偏离 两次更新之间的差值不应超过5%
  437. if abs(self.last_cash - cash)/cash < 0.05:
  438. self.last_cash = cash
  439. self.callback['onEquity']({self.quote:cash})
  440. else:
  441. # 否则舍弃本次更新
  442. pass
  443. # 更新账户
  444. res, err = await self.get_position()
  445. if err:print(err)
  446. if res:
  447. amt = float(res['data']['currentQty']) * self.multiplier
  448. ep = float(res['data']['avgEntryPrice'])
  449. pos = model.Position()
  450. if amt == 0.0:
  451. pos.longPos = 0
  452. pos.longAvg = 0
  453. pos.shortPos = 0
  454. pos.shortAvg = 0
  455. elif amt > 0.0:
  456. pos.longPos = amt
  457. pos.longAvg = ep
  458. pos.shortPos = 0
  459. pos.shortAvg = 0
  460. elif amt < 0.0:
  461. pos.longPos = 0
  462. pos.longAvg = 0
  463. pos.shortPos = -amt
  464. pos.shortAvg = ep
  465. self.callback['onPosition'](pos)
  466. await asyncio.sleep(interval)
  467. # 打印延迟
  468. self.get_delay_info()
  469. self.logger.debug(f'报单延迟 平均{self.avg_delay}ms 最大{self.max_delay}ms')
  470. except:
  471. traceback.print_exc()
  472. await asyncio.sleep(10)
  473. def get_delay_info(self):
  474. if len(self.delays) > 100:
  475. self.delays = self.delays[-100:]
  476. if max(self.delays) > self.max_delay:self.max_delay = max(self.delays)
  477. self.avg_delay = round(sum(self.delays)/len(self.delays),1)
  478. async def handle_signals(self, orders):
  479. '''执行策略指令'''
  480. try:
  481. for order_name in orders:
  482. if 'Cancel' in order_name:
  483. # cid = orders[order_name][0]
  484. oid = orders[order_name][1]
  485. # kumex 只能按oid撤单
  486. if oid:
  487. asyncio.get_event_loop().create_task(self.cancel_order(order_id=oid))
  488. for order_name in orders:
  489. if 'Limits' in order_name:
  490. for i in orders[order_name]:
  491. logger.info(i)
  492. asyncio.get_event_loop().create_task(self.take_order(
  493. self.symbol,
  494. i[0],
  495. i[1],
  496. i[2],
  497. i[3]
  498. ))
  499. if len(orders[order_name]) > 0:
  500. logger.info("")
  501. for order_name in orders:
  502. if 'Check' in order_name:
  503. # cid = orders[order_name][0]
  504. oid = orders[order_name][1]
  505. if oid:
  506. asyncio.get_event_loop().create_task(self.check_order(order_id=oid))
  507. except:
  508. # traceback.print_exc()
  509. await asyncio.sleep(0.1)