kucoin_spot_rest.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531
  1. import random
  2. import aiohttp
  3. import time
  4. import asyncio
  5. import zlib
  6. import json
  7. import hmac
  8. import base64
  9. import hashlib
  10. import traceback
  11. import urllib
  12. from urllib import parse
  13. from urllib.parse import urljoin
  14. import datetime, sys
  15. from urllib.parse import urlparse
  16. import logging, logging.handlers
  17. import utils
  18. import logging, logging.handlers
  19. import model
  20. from decimal import Decimal
  21. def empty_call(msg):
  22. print(f'空的回调函数 {msg}')
  23. class KucoinSpotRest:
  24. def __init__(self, params:model.ClientParams, colo=0):
  25. if colo:
  26. print('不支持colo高速线路')
  27. self.HOST = 'https://api.kucoin.com'
  28. else:
  29. self.HOST = 'https://api.kucoin.com'
  30. self.params = params
  31. self.name = self.params.name
  32. self.base = self.params.pair.split('_')[0].upper()
  33. self.quote = self.params.pair.split('_')[1].upper()
  34. self.symbol = self.base + '-' + self.quote
  35. self.data = {}
  36. self._SESSIONS = dict()
  37. self.logger = self.get_logger()
  38. self.data['account'] = {}
  39. self.callback = {
  40. "onMarket":empty_call,
  41. "onPosition":empty_call,
  42. "onOrder":empty_call,
  43. "onEquity":empty_call,
  44. "onTicker":empty_call,
  45. "onDepth":empty_call,
  46. "onExit":empty_call,
  47. }
  48. self.exchange_info = dict()
  49. self.tickSize = None
  50. self.stepSize = None
  51. self.delays = []
  52. self.max_delay = 0
  53. self.avg_delay = 0
  54. self.proxy = None
  55. if 'win' in sys.platform:
  56. self.proxy = self.params.proxy
  57. self.logger = self.get_logger()
  58. self.mp_from_rest = None
  59. self.stop_flag = 0
  60. self.coin_value = 0.0
  61. self.cash_value = 0.0
  62. #### 指定发包ip
  63. iplist = utils.get_local_ip_list()
  64. self.ip = iplist[int(self.params.ip)]
  65. def get_logger(self):
  66. logger = logging.getLogger(__name__)
  67. logger.setLevel(logging.DEBUG)
  68. # log to txt
  69. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  70. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  71. handler.setLevel(logging.DEBUG)
  72. handler.setFormatter(formatter)
  73. logger.addHandler(handler)
  74. return logger
  75. def get_logger(self):
  76. logger = logging.getLogger(__name__)
  77. logger.setLevel(logging.DEBUG)
  78. # log to txt
  79. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  80. handler = logging.handlers.RotatingFileHandler("log.log",maxBytes=1024*1024,encoding='utf-8')
  81. handler.setLevel(logging.DEBUG)
  82. handler.setFormatter(formatter)
  83. # log to console
  84. console = logging.StreamHandler()
  85. console.setLevel(logging.WARNING)
  86. logger.addHandler(handler)
  87. logger.addHandler(console)
  88. return logger
  89. def _get_session(self, url):
  90. parsed_url = urlparse(url)
  91. key = parsed_url.netloc or parsed_url.hostname
  92. if key not in self._SESSIONS:
  93. tcp = aiohttp.TCPConnector(limit=50,keepalive_timeout=120,verify_ssl=False,local_addr=(self.ip,0))
  94. session = aiohttp.ClientSession(connector=tcp)
  95. self._SESSIONS[key] = session
  96. return self._SESSIONS[key]
  97. async def _request(self, method, uri, body=None, params=None, auth=False):
  98. url = urljoin(self.HOST, uri)
  99. headers = {}
  100. if auth:
  101. now_time = int(time.time()) * 1000
  102. str_to_sign = str(now_time) + method + uri
  103. if method in ['GET', 'DELETE']:
  104. data_json = ''
  105. if params:
  106. strl = []
  107. for key in params:
  108. strl.append("{}={}".format(key, params[key]))
  109. data_json += '&'.join(strl)
  110. str_to_sign += '?' + data_json
  111. else:
  112. if body:str_to_sign += body
  113. sign = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), str_to_sign.encode('utf-8'), hashlib.sha256).digest())
  114. passphrase = base64.b64encode(hmac.new(self.params.secret_key.encode('utf-8'), self.params.pass_key.encode('utf-8'), hashlib.sha256).digest())
  115. headers = {
  116. "KC-API-SIGN": sign.decode(),
  117. "KC-API-TIMESTAMP": str(now_time),
  118. "KC-API-KEY": self.params.access_key,
  119. "KC-API-PASSPHRASE": passphrase.decode(),
  120. "Content-Type": "application/json",
  121. "KC-API-KEY-VERSION": "2"
  122. }
  123. headers["User-Agent"] = "kucoin-python-sdk/v1.0"
  124. # 发起请求
  125. session = self._get_session(url)
  126. timeout = aiohttp.ClientTimeout(10)
  127. msg = "rest请求记录" + str(method) + str(url) + str(params) + str(body)
  128. self.logger.debug(msg)
  129. try:
  130. start_time = time.time()
  131. if method == "GET":
  132. response = await session.get(url, params=params, headers=headers, timeout=timeout, proxy=self.proxy)
  133. elif method == "POST":
  134. response = await session.post(url, params=None, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  135. elif method == "DELETE":
  136. response = await session.delete(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  137. code = response.status
  138. res = await response.json()
  139. delay = int(1000*(time.time() - start_time))
  140. self.delays.append(delay)
  141. self.get_delay_info()
  142. res_msg = msg + f' 回报 {res}'
  143. self.logger.debug(res_msg)
  144. if code not in (200, 201, 202, 203, 204, 205, 206) or \
  145. int(res['code']) not in (200, 201, 202, 203, 204, 205, 206, 200000):
  146. print(f'URL:{url} PARAMS:{params} body:{body} ERROR:{res}')
  147. return None, res
  148. return res, None
  149. except Exception as e:
  150. print(f'{self.name} rest 请求出错', str(e))
  151. self.logger.error('请求错误'+str(e))
  152. self.logger.error(traceback.format_exc())
  153. return None, e
  154. async def buy_token(self):
  155. '''买入平台币'''
  156. pass
  157. async def check_position(self, hold_coin=0.0):
  158. '''
  159. 现货交易 已支持全品种
  160. '''
  161. try:
  162. #######################
  163. self.logger.info("清空挂单")
  164. params = {
  165. 'status':"active",
  166. 'tradeType':'TRADE',
  167. 'type':'limit'
  168. }
  169. response, error = await self._request('GET', '/api/v1/orders', params=params, auth=1)
  170. if response is not None:
  171. for i in response['data']['items']:
  172. res = await self.cancel_order(order_id=i["id"])
  173. self.logger.info(res)
  174. #######################
  175. self.logger.info("现货全平仓位")
  176. # 更新账户
  177. res, err = await self.get_account()
  178. if err:self.logger.info(err)
  179. if res:
  180. for i in res["data"]:
  181. if i['type'] != 'trade':
  182. continue
  183. coin_name = i['currency']
  184. symbol = coin_name + '-USDT'
  185. if coin_name in ['USDT','KCS']:
  186. continue
  187. if coin_name == self.base:
  188. _hold_coin = hold_coin
  189. else:
  190. _hold_coin = 0
  191. coin = float(i['balance'])
  192. #######################
  193. ticker ,_ = await self._request('GET',f'/api/v1/market/orderbook/level1', params={"symbol":symbol}, auth=1)
  194. if ticker:
  195. ap = float(ticker["data"]["bestAsk"])
  196. bp = float(ticker["data"]["bestBid"])
  197. mp = (ap+bp)*0.5
  198. else:
  199. continue
  200. coin_value = coin * mp
  201. diff = _hold_coin - coin_value
  202. diff *= 0.99 # 避免无法下单
  203. self.logger.info(f'需要调整现货仓位{diff}usd')
  204. if diff > 20.0:
  205. self.logger.info( await self.take_order(
  206. symbol,
  207. diff/mp,
  208. "kd",
  209. 1,
  210. utils.get_cid(),
  211. "market"
  212. ))
  213. elif diff < -20.0:
  214. self.logger.info( await self.take_order(
  215. symbol,
  216. -diff/mp,
  217. "kk",
  218. 1,
  219. utils.get_cid(),
  220. "market"
  221. ))
  222. # #######################
  223. except:
  224. self.logger.error("清仓程序执行出错")
  225. self.logger.error(traceback.format_exc())
  226. return
  227. async def take_order(self, symbol, amount, origin_side, price, cid="", order_type='limit'):
  228. if origin_side =='kd':
  229. side = 'buy'
  230. elif origin_side =='pd':
  231. side = 'sell'
  232. elif origin_side =='kk':
  233. side = 'sell'
  234. elif origin_side =='pk':
  235. side = 'buy'
  236. else:
  237. print("现货不允许此交易方向")
  238. return None
  239. if symbol not in self.exchange_info:
  240. await self.before_trade()
  241. # amount = float(Decimal(str(amount//self.exchange_info[symbol].stepSize))*Decimal(str(self.exchange_info[symbol].stepSize)))
  242. # price = float(Decimal(str(price//self.exchange_info[symbol].tickSize))*Decimal(str(self.exchange_info[symbol].tickSize)))
  243. amount = utils.fix_amount(amount, self.exchange_info[symbol].stepSize)
  244. price = utils.fix_price(price, self.exchange_info[symbol].tickSize)
  245. if amount <= 0:
  246. self.logger.error(f'下单参数错误 amount:{amount}')
  247. order_event = dict()
  248. order_event['status'] = "REMOVE"
  249. order_event['filled_price'] = 0.0
  250. order_event['fee'] = 0.0
  251. order_event['filled'] = 0.0
  252. order_event['client_id'] = cid
  253. self.callback["onOrder"](order_event)
  254. return None
  255. if price <= 0:
  256. self.logger.error(f'下单参数错误 price:{price}')
  257. order_event = dict()
  258. order_event['status'] = "REMOVE"
  259. order_event['filled_price'] = 0.0
  260. order_event['fee'] = 0.0
  261. order_event['filled'] = 0.0
  262. order_event['client_id'] = cid
  263. self.callback["onOrder"](order_event)
  264. return None
  265. params = {
  266. 'clientOid':cid,
  267. 'symbol': symbol,
  268. 'size':utils.num_to_str(amount, self.exchange_info[symbol].stepSize),
  269. 'side': side,
  270. 'price':utils.num_to_str(price, self.exchange_info[symbol].tickSize),
  271. 'type':order_type,
  272. }
  273. # logger.info(f'下单指令 {params}')
  274. if self.params.debug == 'True':
  275. return await asyncio.sleep(0.1)
  276. else:
  277. # 发单
  278. response, error = await self._request('POST', '/api/v1/orders', body=json.dumps(params), auth=1)
  279. # 再更新
  280. if response:
  281. # logger.info(f'下单回报 {response}')
  282. # 增加新的
  283. if 'data' in response:
  284. order_event = dict()
  285. order_event['status'] = "NEW"
  286. order_event['client_id'] = params["clientOid"]
  287. order_event['order_id'] = response['data']["orderId"]
  288. self.callback["onOrder"](order_event)
  289. if error:
  290. order_event = dict()
  291. order_event['status'] = "REMOVE"
  292. order_event['filled_price'] = 0.0
  293. order_event['fee'] = 0.0
  294. order_event['filled'] = 0.0
  295. order_event['client_id'] = params["clientOid"]
  296. self.callback["onOrder"](order_event)
  297. return response
  298. async def cancel_order(self, order_id=None, client_id=None):
  299. if order_id:
  300. response, error = await self._request('DELETE', f'/api/v1/orders/{order_id}', auth=1)
  301. elif client_id:
  302. response, error = await self._request('DELETE', f'/api/v1/order/client-order/{client_id}', auth=1)
  303. else:
  304. raise Exception("撤单出错 没指定订单号")
  305. if response:
  306. self.logger.debug(f'撤单回报 {response}')
  307. # 撤单成功不会返回成交信息 所以不触发回调
  308. if error:
  309. return error
  310. # print("撤单失败",error)
  311. # self.logger.error(error)
  312. # if client_id:await self.check_order(client_id=client_id)
  313. # if order_id:await self.check_order(order_id=order_id)
  314. return response
  315. async def check_order(self, order_id=None, client_id=None):
  316. if order_id:
  317. response, error = await self._request('GET', f'/api/v1/orders/{order_id}', auth=1)
  318. elif client_id:
  319. response, error = await self._request('GET', f'/api/v1/order/client-order/{client_id}', auth=1)
  320. else:
  321. return
  322. if response:
  323. self.logger.debug(f'查单回报 {response}')
  324. order_event = dict()
  325. if response["data"]['isActive'] == True:
  326. order_event['status'] = "NEW"
  327. elif response["data"]['isActive'] == False:
  328. order_event['status'] = "REMOVE"
  329. else:
  330. self.logger.error("错误的订单状态")
  331. order_event['price'] = float(response["data"]["price"])
  332. order_event['amount'] = float(response["data"]["size"])
  333. order_event['filled'] = float(response["data"]["dealSize"])
  334. order_event['filled_price'] = float(response["data"]["dealFunds"])/float(response["data"]["dealSize"]) if float(response["data"]["dealSize"]) > 0 else 0
  335. order_event['client_id'] = response["data"]["clientOid"]
  336. order_event['order_id'] = response["data"]['id']
  337. order_event['fee'] = float(response["fee"]) if "fee" in response else 0.0
  338. self.callback["onOrder"](order_event)
  339. if error:
  340. print("查单失败",error)
  341. self.logger.error(error)
  342. return response
  343. async def get_order_list(self):
  344. params = {
  345. 'symbol':self.symbol,
  346. 'status':"active",
  347. 'tradeType':'TRADE',
  348. 'type':'limit'
  349. }
  350. response, error = await self._request('GET', '/api/v1/orders', params=params, auth=1)
  351. orders = [] # 重置本地订单列表
  352. if response is not None:
  353. for i in response['data']['items']:
  354. order_event = dict()
  355. order_event['symbol'] = self.symbol
  356. order_event['price'] = float(i["price"])
  357. order_event['amount'] = float(i["size"])
  358. order_event['filled'] = float(i["dealSize"])
  359. order_event['filled_price'] = float(i["dealFunds"])/float(i["dealSize"]) if float(i["dealSize"]) > 0 else 0
  360. order_event['client_id'] = i["clientOid"]
  361. order_event['order_id'] = i['id']
  362. order_event['fee'] = float(i["fee"]) if "fee" in i else 0.0
  363. if i['isActive'] == True:
  364. order_event['status'] = "NEW"
  365. elif i['isActive'] == False:
  366. order_event['status'] = "REMOVE"
  367. else:
  368. self.logger.error("错误的订单状态")
  369. self.callback["onOrder"](order_event)
  370. if error:
  371. print(error)
  372. return response
  373. async def get_server_time(self):
  374. params = {}
  375. response = await self._request('GET', '/api/v1/timestamp', params=params)
  376. return response
  377. async def get_account(self):
  378. return await self._request('GET','/api/v1/accounts', body={"type":"trade","currency":self.base}, auth=1)
  379. async def get_market_details(self):
  380. return await self._request('GET',f'/api/v1/symbols', params={}, auth=1)
  381. async def get_ticker(self):
  382. res ,err = await self._request('GET',f'/api/v1/market/orderbook/level1', params={"symbol":self.symbol}, auth=1)
  383. if res:
  384. ap = float(res["data"]["bestAsk"])
  385. bp = float(res["data"]["bestBid"])
  386. mp = (ap+bp)*0.5
  387. d = {"name":self.name,'mp': mp, 'bp':bp, 'ap':ap}
  388. self.callback['onTicker'](d)
  389. return d
  390. if err:
  391. self.logger.error(err)
  392. return None
  393. async def before_trade(self):
  394. # 获取市场最新价格
  395. res = await self.get_ticker()
  396. ticker_price = res["mp"]
  397. if isinstance(ticker_price, float):
  398. self.mp_from_rest = ticker_price
  399. # 获取市场基本情况
  400. res, error = await self.get_market_details()
  401. if error:
  402. pass
  403. else:
  404. for i in res['data']:
  405. if i['symbol'] == self.symbol:
  406. self.stepSize = float(i["baseIncrement"])
  407. self.tickSize = float(i["priceIncrement"])
  408. #### 保存交易规则信息
  409. exchange_info = model.ExchangeInfo()
  410. exchange_info.symbol = i['symbol']
  411. exchange_info.multiplier = 1
  412. exchange_info.tickSize = float(i["priceIncrement"])
  413. exchange_info.stepSize = float(i["baseIncrement"])
  414. self.exchange_info[exchange_info.symbol] = exchange_info
  415. async def get_equity(self):
  416. # 更新账户
  417. res, err = await self.get_account()
  418. if err:print(err)
  419. if res:
  420. for i in res["data"]:
  421. if 'USDT' == i['currency'] and i['type'] == 'trade':
  422. self.data['equity'] = float(i['balance'])
  423. self.callback['onEquity']({
  424. self.quote:self.data['equity']
  425. })
  426. self.cash_value = self.data['equity']
  427. if i['currency'] == self.base and i['type'] == 'trade':
  428. coin = float(i['balance'])
  429. self.callback['onEquity']({
  430. self.base:coin
  431. })
  432. self.coin_value = coin
  433. async def go(self):
  434. await self.before_trade()
  435. await asyncio.sleep(1)
  436. while 1:
  437. try:
  438. # 停机信号
  439. if self.stop_flag:return
  440. # 更新账户
  441. res, err = await self.get_account()
  442. if err:print(err)
  443. if res:
  444. for i in res["data"]:
  445. if self.quote == i['currency'] and i['type'] == 'trade':
  446. self.data['equity'] = float(i['balance'])
  447. self.callback['onEquity']({
  448. self.quote:self.data['equity']
  449. })
  450. if i['currency'] == self.base and i['type'] == 'trade':
  451. coin = float(i['balance'])
  452. self.callback['onEquity']({
  453. self.base:coin
  454. })
  455. # 更新订单
  456. # res = await self.get_order_list()
  457. await asyncio.sleep(60)
  458. # 打印延迟
  459. self.logger.debug(f'报单延迟 平均{self.avg_delay}ms 最大{self.max_delay}ms')
  460. # 更新rest最新价格 用于风控 开启可能会干扰ws推送的价格
  461. # res = await self.get_ticker()
  462. # ticker_price = res["mp"]
  463. # if isinstance(ticker_price, float):
  464. # self.mp_from_rest = ticker_price
  465. except:
  466. traceback.print_exc()
  467. await asyncio.sleep(10)
  468. def get_data(self):
  469. return self.data
  470. def get_delay_info(self):
  471. if len(self.delays) > 100:
  472. self.delays = self.delays[-100:]
  473. if max(self.delays) > self.max_delay:self.max_delay = max(self.delays)
  474. self.avg_delay = round(sum(self.delays)/len(self.delays),1)
  475. async def handle_signals(self, orders):
  476. '''执行策略指令'''
  477. try:
  478. for order_name in orders:
  479. if 'Cancel' in order_name:
  480. cid = orders[order_name][0]
  481. oid = orders[order_name][1]
  482. if cid:
  483. asyncio.get_event_loop().create_task(self.cancel_order(client_id=cid))
  484. elif oid:
  485. asyncio.get_event_loop().create_task(self.cancel_order(order_id=oid))
  486. for order_name in orders:
  487. if 'Limits' in order_name:
  488. for i in orders[order_name]:
  489. asyncio.get_event_loop().create_task(self.take_order(
  490. self.symbol,
  491. i[0],
  492. i[1],
  493. i[2],
  494. i[3]
  495. ))
  496. for order_name in orders:
  497. if 'Check' in order_name:
  498. cid = orders[order_name][0]
  499. oid = orders[order_name][1]
  500. asyncio.get_event_loop().create_task(self.check_order(client_id=cid))
  501. except Exception as e:
  502. traceback.print_exc()
  503. self.logger.error("执行信号出错"+str(e))
  504. await asyncio.sleep(0.1)