gate_spot_rest.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528
  1. import random
  2. import aiohttp
  3. import time
  4. import asyncio
  5. import zlib
  6. import json
  7. import hmac
  8. import base64
  9. import hashlib
  10. import traceback
  11. import urllib
  12. from urllib import parse
  13. from urllib.parse import urljoin
  14. import datetime, sys, utils
  15. from urllib.parse import urlparse
  16. import logging, logging.handlers
  17. import model
  18. from decimal import Decimal
  19. def empty_call(msg):
  20. print(f'空的回调函数 {msg}')
  21. class GateSpotRest:
  22. def __init__(self, params:model.ClientParams, colo=0):
  23. if colo:
  24. print('使用colo高速线路')
  25. self.HOST = 'https://apiv4-private.gateapi.io'
  26. else:
  27. self.HOST = 'https://api.gateio.ws'
  28. self.params = params
  29. self.name = self.params.name
  30. self.base = self.params.pair.split('_')[0].upper()
  31. self.quote = self.params.pair.split('_')[1].upper()
  32. self.symbol = self.base + '_' + self.quote
  33. self._SESSIONS = dict()
  34. self.callback = {
  35. "onMarket":empty_call,
  36. "onPosition":empty_call,
  37. "onOrder":empty_call,
  38. "onEquity":empty_call,
  39. "onTicker":empty_call,
  40. "onDepth":empty_call,
  41. "onExit":empty_call,
  42. }
  43. self.exchange_info = dict()
  44. self.tickSize = None
  45. self.stepSize = None
  46. self.delays = []
  47. self.max_delay = 0.0
  48. self.avg_delay = 0.0
  49. self.proxy = None
  50. if 'win' in sys.platform:
  51. self.proxy = self.params.proxy
  52. self.logger = self.get_logger()
  53. self.stop_flag = 0
  54. self.coin_value = 0.0
  55. self.cash_value = 0.0
  56. #### 指定发包ip
  57. iplist = utils.get_local_ip_list()
  58. self.ip = iplist[int(self.params.ip)]
  59. def get_logger(self):
  60. logger = logging.getLogger(__name__)
  61. logger.setLevel(logging.DEBUG)
  62. # log to txt
  63. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  64. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  65. handler.setLevel(logging.DEBUG)
  66. handler.setFormatter(formatter)
  67. logger.addHandler(handler)
  68. return logger
  69. def _get_session(self, url):
  70. parsed_url = urlparse(url)
  71. key = parsed_url.netloc or parsed_url.hostname
  72. if key not in self._SESSIONS:
  73. tcp = aiohttp.TCPConnector(limit=50,keepalive_timeout=120,verify_ssl=False,local_addr=(self.ip,0))
  74. session = aiohttp.ClientSession(connector=tcp)
  75. self._SESSIONS[key] = session
  76. return self._SESSIONS[key]
  77. def generate_signature(self, method, uri, query_param=None, body=None):
  78. t = time.time()
  79. m = hashlib.sha512()
  80. m.update((body or "").encode('utf-8'))
  81. hashed_payload = m.hexdigest()
  82. s = '%s\n%s\n%s\n%s\n%s' % (method, uri, query_param or "", hashed_payload, t)
  83. sign = hmac.new(self.params.secret_key.encode('utf-8'), s.encode('utf-8'), hashlib.sha512).hexdigest()
  84. return {'KEY': self.params.access_key, 'Timestamp': str(t), 'SIGN': sign}
  85. async def _request(self, method, uri, body=None, params=None, auth=False):
  86. url = urljoin(self.HOST, uri)
  87. if method == "GET":
  88. headers = {
  89. "Content-type": "application/x-www-form-urlencoded",
  90. "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
  91. "Chrome/39.0.2171.71 Safari/537.36"
  92. }
  93. else:
  94. headers = {
  95. "Accept": "application/json",
  96. "Content-type": "application/json"
  97. }
  98. if auth:
  99. if method == "POST":
  100. body = json.dumps(params)
  101. query_param = None
  102. sign_headers = self.generate_signature(method, uri, query_param, body)
  103. headers.update(sign_headers)
  104. if method == "GET" or method == "DELETE":
  105. query_param = ''
  106. for i in params:
  107. query_param += f'{i}={params[i]}&'
  108. query_param = query_param[:-1]
  109. sign_headers = self.generate_signature(method, uri, query_param)
  110. headers.update(sign_headers)
  111. # 发起请求
  112. session = self._get_session(url)
  113. timeout = aiohttp.ClientTimeout(10)
  114. msg = "rest请求记录" + str(method) + str(url) + str(params) + str(body)
  115. self.logger.debug(msg)
  116. try:
  117. start_time = time.time()
  118. if method == "GET":
  119. response = await session.get(url, params=params, headers=headers, timeout=timeout, proxy=self.proxy)
  120. elif method == "POST":
  121. response = await session.post(url, params=None, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  122. elif method == "DELETE":
  123. response = await session.delete(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  124. code = response.status
  125. res = await response.json()
  126. delay = int(1000*(time.time() - start_time))
  127. self.delays.append(delay)
  128. res_msg = msg + f' 回报 {res}'
  129. self.logger.debug(res_msg)
  130. if code not in (200, 201, 202, 203, 204, 205, 206):
  131. print(f'URL:{url} PARAMS:{params} body:{body} ERROR:{res}')
  132. return None, res
  133. return res, None
  134. except Exception as e:
  135. print(f'{self.name} 请求出错', e)
  136. self.logger.error('请求错误'+str(e))
  137. self.logger.error(traceback.format_exc())
  138. return None, e
  139. async def take_order(self, symbol, amount, origin_side, price, cid="", order_type='limit'):
  140. if origin_side =='kd':
  141. side = 'buy'
  142. elif origin_side =='pd':
  143. side = 'sell'
  144. elif origin_side =='kk':
  145. side = 'sell'
  146. elif origin_side =='pk':
  147. side = 'buy'
  148. else:
  149. return None
  150. if symbol not in self.exchange_info:
  151. await self.before_trade()
  152. # amount = float(Decimal(str(amount//self.exchange_info[symbol].stepSize))*Decimal(str(self.exchange_info[symbol].stepSize)))
  153. # price = float(Decimal(str(price//self.exchange_info[symbol].tickSize))*Decimal(str(self.exchange_info[symbol].tickSize)))
  154. amount = utils.fix_amount(amount, self.exchange_info[symbol].stepSize)
  155. price = utils.fix_price(price, self.exchange_info[symbol].tickSize)
  156. if amount <= 0.0 or price <= 0.0:
  157. self.logger.error(f"下单参数错误 amount:{amount} price:{price}")
  158. order_event = dict()
  159. order_event['status'] = "REMOVE"
  160. order_event['client_id'] = cid
  161. order_event['filled_price'] = 0.0
  162. order_event['filled'] = 0.0
  163. order_event['fee'] = 0.0
  164. self.callback["onOrder"](order_event)
  165. params = {
  166. 'text':cid,
  167. 'currency_pair': symbol,
  168. 'amount': utils.num_to_str(amount, self.exchange_info[symbol].stepSize),
  169. 'side': side,
  170. 'account':"spot",
  171. 'price': utils.num_to_str(price, self.exchange_info[symbol].tickSize),
  172. 'type':order_type,
  173. }
  174. # logger.info(f'下单指令 {params}')
  175. if self.params.debug == 'True':
  176. return await asyncio.sleep(0.1)
  177. else:
  178. # 发单
  179. response, error = await self._request('POST', '/api/v4/spot/orders', params=params, auth=1)
  180. if response:
  181. # 增加新的
  182. order_event = dict()
  183. order_event['status'] = "NEW"
  184. order_event['client_id'] = response["text"]
  185. order_event['order_id'] = response["id"]
  186. self.callback["onOrder"](order_event)
  187. if error:
  188. order_event = dict()
  189. order_event['status'] = "REMOVE"
  190. order_event['client_id'] = params["text"]
  191. order_event['filled_price'] = 0.0
  192. order_event['filled'] = 0.0
  193. order_event['fee'] = 0.0
  194. self.callback["onOrder"](order_event)
  195. return error
  196. return response
  197. async def check_order(self, order_id=None, client_id=None):
  198. params = {
  199. "currency_pair": self.symbol
  200. }
  201. if order_id:
  202. response, error = await self._request('GET', f'/api/v4/spot/orders/{order_id}', params=params, auth=1)
  203. elif client_id:
  204. response, error = await self._request('GET', f'/api/v4/spot/orders/{client_id}', params=params, auth=1)
  205. if response:
  206. if response['status'] in ['cancelled','closed']: # 已撤销 或 全部成交
  207. order_event = dict()
  208. order_event['client_id'] = response["text"]
  209. order_event['order_id'] = response['id']
  210. order_event['filled'] = float(response["amount"]) - float(response["left"])
  211. order_event['filled_price'] = float(response["price"])
  212. order_event['fee'] = float(response["fee"])
  213. order_event['status'] = "REMOVE"
  214. self.callback['onOrder'](order_event)
  215. else: # 还在挂单中
  216. order_event = dict()
  217. order_event['client_id'] = response["text"]
  218. order_event['order_id'] = response['id']
  219. order_event['status'] = "NEW"
  220. self.callback['onOrder'](order_event)
  221. if error:
  222. pass
  223. return response
  224. async def cancel_order(self, order_id=None, client_id=None):
  225. params = {
  226. "currency_pair": self.symbol
  227. }
  228. if order_id:
  229. response, error = await self._request('DELETE', f'/api/v4/spot/orders/{order_id}', params=params, auth=1)
  230. elif client_id:
  231. response, error = await self._request('DELETE', f'/api/v4/spot/orders/{client_id}', params=params, auth=1)
  232. if response:
  233. # rest cancel 如果有回报 可能会和ws回报 产生重复处理
  234. self.logger.debug(f'撤单回报 {response}')
  235. # if response['status'] == 'cancelled': # 已撤销
  236. # order_event = dict()
  237. # order_event['price'] = float(response["price"])
  238. # order_event['amount'] = float(response["amount"])
  239. # order_event['client_id'] = response["text"]
  240. # order_event['order_id'] = response['id']
  241. # order_event['filled'] = float(response["amount"]) - float(response["left"])
  242. # order_event['filled_price'] = float(response["price"])
  243. # order_event['fee'] = float(response["fee"])
  244. # order_event['status'] = "REMOVE"
  245. # self.callback['onOrder'](order_event)
  246. if error:
  247. return error
  248. return response
  249. async def get_order_list(self):
  250. params = {
  251. 'currency_pair':self.symbol,
  252. 'status':"open",
  253. }
  254. response, error = await self._request('GET', '/api/v4/spot/orders', params=params, auth=1)
  255. orders = [] # 重置本地订单列表
  256. if response is not None:
  257. for i in response:
  258. if i['side'] == 'buy':
  259. side = 'kd'
  260. elif i['side'] == 'sell':
  261. side = 'pd'
  262. else:
  263. raise Exception(f"{self.name} wrong side")
  264. order_event = dict()
  265. order_event['price'] = float(i["price"])
  266. order_event['amount'] = float(i["amount"])
  267. order_event['client_id'] = i["text"]
  268. order_event['order_id'] = i['id']
  269. order_event['status'] = "NEW"
  270. self.callback['onOrder'](order_event)
  271. return response
  272. async def get_server_time(self):
  273. params = {}
  274. response = await self._request('GET', '/api/v1/timestamp', params=params)
  275. return response
  276. async def get_account(self):
  277. return await self._request('GET','/api/v4/spot/accounts', params={}, auth=1)
  278. async def get_position(self):
  279. '''获取持仓 symbol: BTC-USDT'''
  280. return await self._request('POST','/linear-swap-api/v1/swap_position_info', params={'contract_code':self.symbol}, auth=1)
  281. async def get_market_details(self):
  282. return await self._request('GET',f'/api/v4/spot/currency_pairs', params={}, auth=1)
  283. async def get_ticker(self):
  284. res, err = await self._request('GET',f'/api/v4/spot/tickers', params={"currency_pair":self.symbol}, auth=1)
  285. if res:
  286. ap = float(res[0]["lowest_ask"])
  287. bp = float(res[0]["highest_bid"])
  288. mp = (ap+bp)*0.5
  289. d = {"name":self.name,'mp': mp, 'bp':bp, 'ap':ap}
  290. self.callback['onTicker'](d)
  291. return d
  292. if err:
  293. self.logger.error(err)
  294. return None
  295. async def before_trade(self):
  296. res, error = await self.get_market_details()
  297. if error:
  298. pass
  299. else:
  300. for i in res:
  301. if i['id'] == self.symbol:
  302. self.tickSize = float(Decimal("0.1")**Decimal(i['precision']))
  303. self.stepSize = float(Decimal("0.1")**Decimal(i['amount_precision']))
  304. #### 保存交易规则信息
  305. exchange_info = model.ExchangeInfo()
  306. exchange_info.symbol = i['id']
  307. exchange_info.multiplier = 1
  308. exchange_info.tickSize = float(Decimal("0.1")**Decimal(i['precision']))
  309. exchange_info.stepSize = float(Decimal("0.1")**Decimal(i['amount_precision']))
  310. self.exchange_info[exchange_info.symbol] = exchange_info
  311. async def get_equity(self):
  312. # 更新账户
  313. res, err = await self.get_account()
  314. if err:print(err)
  315. if res:
  316. for i in res:
  317. if self.quote == i['currency'].upper():
  318. cash = float(i['available']) + float(i['locked'])
  319. self.callback['onEquity']({
  320. self.quote:cash
  321. })
  322. self.cash_value = cash
  323. if i['currency'].upper() == self.base:
  324. coin = float(i['available']) + float(i['locked'])
  325. self.callback['onEquity']({
  326. self.base:coin
  327. })
  328. self.coin_value = coin
  329. async def buy_token(self):
  330. '''买入平台币'''
  331. # 获取u数量 平台币数量
  332. # 更新账户
  333. cash, token = 0.0, 0.0
  334. res, err = await self.get_account()
  335. if err:self.logger.error(err)
  336. if res:
  337. for i in res:
  338. if 'USDT' == i['currency'].upper():
  339. cash = float(i['available']) + float(i['locked'])
  340. if 'GT' == i['currency'].upper():
  341. token = float(i['available']) + float(i['locked'])
  342. self.logger.info(f"持u{cash} 持GT{token}")
  343. # 获取平台币价格
  344. res, err = await self._request('GET',f'/api/v4/spot/tickers', params={"currency_pair":'GT_USDT'}, auth=1)
  345. if err:print(err)
  346. if res:
  347. ap = float(res[0]["lowest_ask"])
  348. bp = float(res[0]["highest_bid"])
  349. mp = (ap+bp)*0.5
  350. # 判断是否需要买入
  351. token_value = token * mp
  352. if token_value < 30:
  353. self.logger.info(f"GT数量过少")
  354. if cash > 200:
  355. self.logger.info(f"准备买入GT")
  356. # 下单买入50uGT
  357. res = await self.take_order("GT_USDT", 50/mp, "kd", mp*1.001, "t-888", "limit")
  358. self.logger.info(res)
  359. else:
  360. self.logger.warning(f"现金不足 无法买入GT")
  361. else:
  362. self.logger.info(f"GT数量充足")
  363. async def check_position(self, hold_coin=0.0):
  364. '''
  365. 清空挂单清空仓位
  366. '''
  367. try:
  368. #############################
  369. self.logger.info("获取挂单")
  370. params = {
  371. }
  372. response, error = await self._request('GET', '/api/v4/spot/open_orders', params=params, auth=1)
  373. if response is not None:
  374. for i in response:
  375. params = {
  376. "currency_pair": i['currency_pair']
  377. }
  378. for j in i['orders']:
  379. oid = j['id']
  380. r, e = await self._request('DELETE', f'/api/v4/spot/orders/{oid}', params=params, auth=1)
  381. print(r,e)
  382. if error:
  383. self.logger.info(error)
  384. #############################
  385. res, err = await self.get_account()
  386. if err:self.logger.info(err)
  387. if res:
  388. coin = 0.0
  389. for i in res:
  390. coin_name = i['currency'].upper()
  391. if coin_name in ['GT','USDT', 'POINT']:
  392. continue
  393. symbol = coin_name + '_USDT'
  394. if coin_name == self.base:
  395. _hold_coin = hold_coin
  396. else:
  397. _hold_coin = 0
  398. coin = float(i['available']) + float(i['locked'])
  399. #################
  400. ticker, _ = await self._request('GET',f'/api/v4/spot/tickers', params={"currency_pair":symbol}, auth=1)
  401. if ticker:
  402. ap = float(ticker[0]["lowest_ask"])
  403. bp = float(ticker[0]["highest_bid"])
  404. mp = (ap+bp)*0.5
  405. #################
  406. coin_value = coin * mp
  407. diff = _hold_coin - coin_value
  408. diff *= 0.99 # 避免无法下单
  409. self.logger.info(f'{symbol}需要调整现货仓位{diff}usd')
  410. if diff > 20.0:
  411. res = await self.take_order(symbol, diff/mp, "kd", mp*1.001, "t-123", "limit")
  412. elif diff < -20.0:
  413. res = await self.take_order(symbol, -diff/mp, "kk", mp*0.999, "t-123", "limit")
  414. #############################
  415. params = {
  416. 'currency_pair':self.symbol,
  417. 'status':"open",
  418. }
  419. response, error = await self._request('GET', '/api/v4/spot/orders', params=params, auth=1)
  420. if response is not None:
  421. for i in response:
  422. await self.cancel_order(order_id=i['id'])
  423. await self.cancel_order(client_id=i['text'])
  424. if error:
  425. self.logger.info(error)
  426. except:
  427. self.logger.error("清仓程序执行出错")
  428. self.logger.error(traceback.format_exc())
  429. return
  430. async def go(self):
  431. await self.before_trade()
  432. await asyncio.sleep(1)
  433. while 1:
  434. try:
  435. # 停机信号
  436. if self.stop_flag:return
  437. # 更新账户
  438. res, err = await self.get_account()
  439. if err:print(err)
  440. if res:
  441. for i in res:
  442. if self.quote == i['currency'].upper():
  443. cash = float(i['available']) + float(i['locked'])
  444. self.callback['onEquity']({
  445. self.quote:cash
  446. })
  447. self.cash_value = cash
  448. if i['currency'].upper() == self.base:
  449. coin = float(i['available']) + float(i['locked'])
  450. self.callback['onEquity']({
  451. self.base:coin
  452. })
  453. self.coin_value = coin
  454. # 更新订单
  455. # res = await self.get_order_list()
  456. await asyncio.sleep(60)
  457. # 打印延迟
  458. self.get_delay_info()
  459. self.logger.debug(f'{self.name} rest报单延迟 平均{self.avg_delay}ms 最大{self.max_delay}ms')
  460. except:
  461. traceback.print_exc()
  462. await asyncio.sleep(10)
  463. def get_delay_info(self):
  464. if len(self.delays) > 100:
  465. self.delays = self.delays[-100:]
  466. if max(self.delays) > self.max_delay:self.max_delay = max(self.delays)
  467. self.avg_delay = round(sum(self.delays)/len(self.delays),1)
  468. async def handle_signals(self, orders):
  469. '''执行策略指令'''
  470. try:
  471. for order_name in orders:
  472. if 'Cancel' in order_name:
  473. cid = orders[order_name][0]
  474. oid = orders[order_name][1]
  475. # gate优先用oid撤单
  476. if oid:
  477. asyncio.get_event_loop().create_task(self.cancel_order(order_id=oid))
  478. elif cid:
  479. asyncio.get_event_loop().create_task(self.cancel_order(client_id=cid))
  480. for order_name in orders:
  481. if 'Limits' in order_name:
  482. for i in orders[order_name]:
  483. asyncio.get_event_loop().create_task(self.take_order(
  484. self.symbol,
  485. i[0],
  486. i[1],
  487. i[2],
  488. i[3],
  489. ))
  490. for order_name in orders:
  491. if 'Check' in order_name:
  492. # gate优先用oid查单
  493. cid = orders[order_name][0]
  494. oid = orders[order_name][1]
  495. if oid:
  496. asyncio.get_event_loop().create_task(self.check_order(order_id=oid))
  497. elif cid:
  498. asyncio.get_event_loop().create_task(self.check_order(client_id=cid))
  499. except:
  500. # traceback.print_exc()
  501. await asyncio.sleep(0.1)