# gate_usdt_swap_rest.py (21 KB)
  1. import random
  2. import aiohttp
  3. import time
  4. import asyncio
  5. import zlib
  6. import json, ujson
  7. import hmac
  8. import base64
  9. import hashlib
  10. import traceback
  11. import urllib
  12. from urllib import parse
  13. from urllib.parse import urljoin
  14. import datetime, sys, utils
  15. from urllib.parse import urlparse
  16. import logging, logging.handlers
  17. import model
  18. from decimal import Decimal
  19. def empty_call(msg):
  20. print(f'空的回调函数 {msg}')
  21. class GateUsdtSwapRest:
  22. def __init__(self, params:model.ClientParams, colo=0):
  23. if colo:
  24. print('使用colo高速线路')
  25. self.HOST = 'https://apiv4-private.gateapi.io'
  26. else:
  27. self.HOST = 'https://api.gateio.ws'
  28. self.params = params
  29. self.name = self.params.name
  30. self.base = self.params.pair.split('_')[0].upper()
  31. self.quote = self.params.pair.split('_')[1].upper()
  32. self.symbol = self.base + '_' + self.quote
  33. self._SESSIONS = dict()
  34. self.callback = {
  35. "onMarket":empty_call,
  36. "onPosition":empty_call,
  37. "onOrder":empty_call,
  38. "onEquity":empty_call,
  39. "onTicker":empty_call,
  40. "onDepth":empty_call,
  41. "onExit":empty_call,
  42. }
  43. self.exchange_info = dict()
  44. self.tickSize = None
  45. self.stepSize = None
  46. self.delays = []
  47. self.max_delay = 0.0
  48. self.avg_delay = 0.0
  49. self.proxy = None
  50. if 'win' in sys.platform:
  51. self.proxy = self.params.proxy
  52. self.logger = self.get_logger()
  53. self.stop_flag = 0
  54. self.coin_value = 0.0
  55. self.cash_value = 0.0
  56. self.multiplier = None
  57. #### 指定发包ip
  58. iplist = utils.get_local_ip_list()
  59. self.ip = iplist[int(self.params.ip)]
  60. def get_logger(self):
  61. logger = logging.getLogger(__name__)
  62. logger.setLevel(logging.DEBUG)
  63. # log to txt
  64. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  65. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  66. handler.setLevel(logging.DEBUG)
  67. handler.setFormatter(formatter)
  68. logger.addHandler(handler)
  69. return logger
  70. def _get_session(self, url):
  71. parsed_url = urlparse(url)
  72. key = parsed_url.netloc or parsed_url.hostname
  73. if key not in self._SESSIONS:
  74. tcp = aiohttp.TCPConnector(limit=50,keepalive_timeout=120,verify_ssl=False,local_addr=(self.ip,0))
  75. session = aiohttp.ClientSession(connector=tcp)
  76. self._SESSIONS[key] = session
  77. return self._SESSIONS[key]
  78. def generate_signature(self, method, uri, query_param=None, body=None):
  79. t = time.time()
  80. m = hashlib.sha512()
  81. m.update((body or "").encode('utf-8'))
  82. hashed_payload = m.hexdigest()
  83. s = '%s\n%s\n%s\n%s\n%s' % (method, uri, query_param or "", hashed_payload, t)
  84. sign = hmac.new(self.params.secret_key.encode('utf-8'), s.encode('utf-8'), hashlib.sha512).hexdigest()
  85. return {'KEY': self.params.access_key, 'Timestamp': str(t), 'SIGN': sign}
  86. async def _request(self, method, uri, body=None, params=None, auth=False):
  87. url = urljoin(self.HOST, uri)
  88. if method == "GET":
  89. headers = {
  90. "Content-type": "application/x-www-form-urlencoded",
  91. "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
  92. "Chrome/39.0.2171.71 Safari/537.36"
  93. }
  94. else:
  95. headers = {
  96. "Accept": "application/json",
  97. "Content-type": "application/json"
  98. }
  99. if auth:
  100. if method == "POST":
  101. query_param = ''
  102. if params:
  103. for i in params:
  104. query_param += f'{i}={params[i]}&'
  105. query_param = query_param[:-1]
  106. url += "?"+ query_param
  107. if body:
  108. body = ujson.dumps(body)
  109. sign_headers = self.generate_signature(method, uri, query_param, body)
  110. headers.update(sign_headers)
  111. if method == "GET" or method == "DELETE":
  112. query_param = ''
  113. for i in params:
  114. query_param += f'{i}={params[i]}&'
  115. query_param = query_param[:-1]
  116. sign_headers = self.generate_signature(method, uri, query_param)
  117. headers.update(sign_headers)
  118. # 发起请求
  119. session = self._get_session(url)
  120. timeout = aiohttp.ClientTimeout(10)
  121. msg = "rest请求记录" + str(method) + str(url) + str(params) + str(body)
  122. self.logger.debug(msg)
  123. try:
  124. start_time = time.time()
  125. if method == "GET":
  126. response = await session.get(url, params=params, headers=headers, timeout=timeout, proxy=self.proxy)
  127. elif method == "POST":
  128. response = await session.post(url, params=None, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  129. elif method == "DELETE":
  130. response = await session.delete(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  131. code = response.status
  132. res = await response.json()
  133. delay = int(1000*(time.time() - start_time))
  134. self.delays.append(delay)
  135. self.get_delay_info()
  136. res_msg = msg + f' 回报 {res}'
  137. self.logger.debug(res_msg)
  138. if code not in (200, 201, 202, 203, 204, 205, 206):
  139. print(f'URL:{url} PARAMS:{params} body:{body} ERROR:{res}')
  140. return None, res
  141. return res, None
  142. except Exception as e:
  143. print(f'{self.name} 请求出错', e)
  144. self.logger.error('请求错误'+str(e))
  145. self.logger.error(traceback.format_exc())
  146. return None, e
  147. async def take_order(self, symbol, amount, origin_side, price, cid="", order_type='limit'):
  148. '''
  149. 传入单位是张 内部转换为币
  150. '''
  151. if origin_side =='kd':
  152. side = 'buy'
  153. reduce_only = False
  154. elif origin_side =='pd':
  155. side = 'sell'
  156. reduce_only = True
  157. elif origin_side =='kk':
  158. side = 'sell'
  159. reduce_only = False
  160. elif origin_side =='pk':
  161. side = 'buy'
  162. reduce_only = True
  163. else:
  164. return None
  165. amount = int(amount/self.exchange_info[symbol].multiplier) # 币转换为张
  166. if amount <= 0.0 or price <= 0.0:
  167. self.logger.error(f"下单参数错误 amount:{amount} price:{price}")
  168. order_event = dict()
  169. order_event['status'] = "REMOVE"
  170. order_event['client_id'] = cid
  171. order_event['filled_price'] = 0.0
  172. order_event['filled'] = 0.0
  173. order_event['fee'] = 0.0
  174. self.callback["onOrder"](order_event)
  175. if side == 'sell':
  176. amount = -amount
  177. params = {
  178. 'text':cid,
  179. 'contract': symbol,
  180. 'size': amount,
  181. 'reduce_only':reduce_only,
  182. 'price': utils.num_to_str(price, self.exchange_info[symbol].tickSize),
  183. 'type':order_type,
  184. }
  185. # logger.info(f'下单指令 {params}')
  186. if self.params.debug == 'True':
  187. return await asyncio.sleep(0.1)
  188. else:
  189. # 发单
  190. response, error = await self._request('POST', '/api/v4/futures/usdt/orders', body=params, auth=1)
  191. if response:
  192. # 增加新的
  193. order_event = dict()
  194. order_event['status'] = "NEW"
  195. order_event['client_id'] = response["text"]
  196. order_event['order_id'] = response["id"]
  197. self.callback["onOrder"](order_event)
  198. if error:
  199. order_event = dict()
  200. order_event['status'] = "REMOVE"
  201. order_event['client_id'] = params["text"]
  202. order_event['filled_price'] = 0.0
  203. order_event['filled'] = 0.0
  204. order_event['fee'] = 0.0
  205. self.callback["onOrder"](order_event)
  206. return error
  207. return response
  208. async def check_order(self, order_id=None, client_id=None):
  209. params = {}
  210. if order_id:
  211. response, error = await self._request('GET', f'/api/v4/futures/usdt/orders/{order_id}', params=params, auth=1)
  212. elif client_id:
  213. response, error = await self._request('GET', f'/api/v4/futures/usdt/orders/{client_id}', params=params, auth=1)
  214. if response:
  215. if response['status'] in ['cancelled','closed','finished']: # 已撤销 或 全部成交
  216. order_event = dict()
  217. order_event['client_id'] = response["text"]
  218. order_event['order_id'] = response['id']
  219. order_event['filled'] = (abs(float(response["size"])) - abs(float(response["left"])))*self.multiplier
  220. order_event['filled_price'] = float(response["price"])
  221. order_event['fee'] = 0.0
  222. order_event['status'] = "REMOVE"
  223. self.callback['onOrder'](order_event)
  224. elif response['status'] in ['open']: # 还在挂单中
  225. order_event = dict()
  226. order_event['client_id'] = response["text"]
  227. order_event['order_id'] = response['id']
  228. order_event['status'] = "NEW"
  229. self.callback['onOrder'](order_event)
  230. if error:
  231. pass
  232. return response
  233. async def cancel_order(self, order_id=None, client_id=None):
  234. params = {
  235. "currency_pair": self.symbol
  236. }
  237. if order_id:
  238. response, error = await self._request('DELETE', f'/api/v4/futures/usdt/orders/{order_id}', params=params, auth=1)
  239. elif client_id:
  240. response, error = await self._request('DELETE', f'/api/v4/futures/usdt/orders/{client_id}', params=params, auth=1)
  241. if response:
  242. pass
  243. # self.logger.info(f'撤单回报 {response}')
  244. # if response['status'] == 'cancelled': # 已撤销
  245. # order_event = dict()
  246. # order_event['price'] = float(response["price"])
  247. # order_event['amount'] = float(response["amount"])
  248. # order_event['client_id'] = response["text"]
  249. # order_event['order_id'] = response['id']
  250. # order_event['filled'] = float(response["amount"]) - float(response["left"])
  251. # order_event['filled_price'] = float(response["price"])
  252. # order_event['fee'] = float(response["fee"])
  253. # order_event['status'] = "REMOVE"
  254. # self.callback['onOrder'](order_event)
  255. if error:
  256. return error
  257. return response
  258. async def get_order_list(self):
  259. params = {
  260. 'currency_pair':self.symbol,
  261. 'status':"open",
  262. }
  263. response, error = await self._request('GET', '/api/v4/futures/usdt/orders', params=params, auth=1)
  264. orders = [] # 重置本地订单列表
  265. if response is not None:
  266. for i in response:
  267. if i['side'] == 'buy':
  268. side = 'kd'
  269. elif i['side'] == 'sell':
  270. side = 'pd'
  271. else:
  272. raise Exception(f"{self.name} wrong side")
  273. order_event = dict()
  274. order_event['price'] = float(i["price"])
  275. order_event['amount'] = float(i["size"])*self.multiplier
  276. order_event['client_id'] = i["text"]
  277. order_event['order_id'] = i['id']
  278. order_event['status'] = "NEW"
  279. self.callback['onOrder'](order_event)
  280. return response
  281. async def get_server_time(self):
  282. params = {}
  283. response = await self._request('GET', '/api/v1/timestamp', params=params)
  284. return response
  285. async def get_account(self):
  286. return await self._request('GET','/api/v4/futures/usdt/accounts', params={}, auth=1)
  287. async def get_position(self):
  288. '''获取持仓 symbol: BTC-USDT'''
  289. return await self._request('GET', f'/api/v4/futures/usdt/dual_comp/positions/{self.symbol}', params={}, auth=1)
  290. async def get_market_details(self):
  291. return await self._request('GET',f'/api/v4/futures/usdt/contracts', params={}, auth=1)
  292. async def get_ticker(self):
  293. res, err = await self._request('GET',f'/api/v4/futures/usdt/tickers', params={"contract":self.symbol}, auth=1)
  294. if res:
  295. ap = float(res[0]["last"])
  296. bp = float(res[0]["last"])
  297. mp = (ap+bp)*0.5
  298. d = {"name":self.name,'mp': mp, 'bp':bp, 'ap':ap}
  299. self.callback['onTicker'](d)
  300. return d
  301. if err:
  302. self.logger.error(err)
  303. return None
  304. async def before_trade(self):
  305. ### 获取市场信息
  306. res, err = await self.get_market_details()
  307. if err:
  308. pass
  309. if res:
  310. for i in res:
  311. if self.symbol == i['name']:
  312. self.tickSize = float(i['order_price_round'])
  313. self.multiplier = float(i['quanto_multiplier'])
  314. self.stepSize = float(i['order_size_min'])*float(i['quanto_multiplier']) # 张 转换为 币
  315. #### 保存交易规则信息
  316. exchange_info = model.ExchangeInfo()
  317. exchange_info.symbol = i['name']
  318. exchange_info.multiplier = float(i['quanto_multiplier'])
  319. exchange_info.tickSize = float(i['order_price_round'])
  320. exchange_info.stepSize = float(i['order_size_min'])*float(i['quanto_multiplier'])
  321. self.exchange_info[exchange_info.symbol] = exchange_info
  322. ### 获取持仓模式
  323. res, err = await self._request('POST', f'/api/v4/futures/usdt/dual_mode', params={'dual_mode':"true"}, auth=1)
  324. if err:
  325. print(err)
  326. if res:
  327. print(res)
  328. ### 杠杆
  329. res, err = await self._request('POST', f'/api/v4/futures/usdt/dual_comp/positions/{self.symbol}/leverage', params={'leverage':"20"}, auth=1)
  330. if err:
  331. print(err)
  332. if res:
  333. print(res)
  334. async def get_equity(self):
  335. # 更新账户
  336. res, err = await self.get_account()
  337. if err:print(err)
  338. if res:
  339. if res['currency'] == self.quote:
  340. cash = float(res['total'])
  341. self.callback['onEquity']({
  342. self.quote:cash
  343. })
  344. self.cash_value = cash
  345. async def buy_token(self):
  346. '''买入平台币'''
  347. pass
  348. async def check_position(self, hold_coin=0.0):
  349. '''
  350. 清空挂单清空仓位 已支持全品种
  351. '''
  352. try:
  353. #############################
  354. self.logger.info("清空挂单")
  355. params = {
  356. 'contract':self.symbol
  357. }
  358. response, error = await self._request('DELETE', '/api/v4/futures/usdt/orders', params=params, auth=1)
  359. if response:
  360. self.logger.info(response)
  361. if error:
  362. self.logger.info(error)
  363. #############################
  364. #############################
  365. self.logger.info("检查遗漏仓位")
  366. res, err = await self._request('GET', f'/api/v4/futures/usdt/positions', params={}, auth=1)
  367. if err:
  368. self.logger.info(err)
  369. if res:
  370. for i in res:
  371. symbol = i['contract']
  372. if symbol not in self.exchange_info:
  373. await self.before_trade()
  374. size = abs(float(i['size'])) # 单位张
  375. side = i['mode']
  376. if size == 0:
  377. pass
  378. else:
  379. #######
  380. res, err = await self._request('GET',f'/api/v4/futures/usdt/tickers', params={"contract":symbol}, auth=1)
  381. if res:
  382. ap = float(res[0]["last"])
  383. bp = float(res[0]["last"])
  384. mp = (ap+bp)*0.5
  385. if err:
  386. pass
  387. #######
  388. amount = abs(size)*self.exchange_info[symbol].multiplier
  389. #######
  390. if side == 'dual_short':
  391. # pk
  392. price = float(Decimal(str(mp*1.001//self.exchange_info[symbol].tickSize))*Decimal(str(self.exchange_info[symbol].tickSize)))
  393. res = await self.take_order(symbol, amount, "pk", price, "t-123", "limit")
  394. self.logger.info(res)
  395. if side == 'dual_long':
  396. # pd
  397. price = float(Decimal(str(mp*0.999//self.exchange_info[symbol].tickSize))*Decimal(str(self.exchange_info[symbol].tickSize)))
  398. res = await self.take_order(symbol, amount, "pd", price, "t-123", "limit")
  399. self.logger.info(res)
  400. except:
  401. self.logger.error("清仓程序执行出错")
  402. self.logger.error(traceback.format_exc())
  403. return
  404. async def go(self):
  405. interval = 60
  406. await self.before_trade()
  407. await asyncio.sleep(1)
  408. while 1:
  409. try:
  410. # 停机信号
  411. if self.stop_flag:return
  412. # 更新账户
  413. res, err = await self.get_account()
  414. if err:print(err)
  415. if res:
  416. if res['currency'] == self.quote:
  417. cash = float(res['total'])
  418. self.callback['onEquity']({
  419. self.quote:cash
  420. })
  421. self.cash_value = cash
  422. self.logger.debug(f"rest cash {cash}")
  423. # 更新仓位
  424. res, err = await self.get_position()
  425. if err:
  426. self.logger.info(err)
  427. if res:
  428. p = model.Position()
  429. for i in res:
  430. symbol = i['contract']
  431. size = abs(float(i['size']))*self.multiplier
  432. price = float(i['entry_price'])
  433. side = i['mode']
  434. if self.symbol == symbol:
  435. if size == 0:
  436. pass
  437. else:
  438. #######
  439. if side == 'dual_short':
  440. p.shortAvg = price
  441. p.shortPos = size
  442. if side == 'dual_long':
  443. p.longAvg = price
  444. p.longPos = size
  445. self.callback['onPosition'](p)
  446. await asyncio.sleep(interval)
  447. # 打印延迟
  448. self.logger.debug(f'{self.name} rest报单延迟 平均{self.avg_delay}ms 最大{self.max_delay}ms')
  449. except:
  450. traceback.print_exc()
  451. await asyncio.sleep(10)
  452. async def transfor(self):
  453. params = {
  454. 'currency':'USDT',
  455. 'from':'spot',
  456. 'to':'futures',
  457. 'amount':'400',
  458. 'settle':'USDT',
  459. }
  460. response, error = await self._request('POST', '/api/v4/wallet/transfers', body=params, auth=1)
  461. if response:
  462. print(response)
  463. if error:
  464. print(error)
  465. def get_delay_info(self):
  466. if len(self.delays) > 100:
  467. self.delays = self.delays[-100:]
  468. if max(self.delays) > self.max_delay:self.max_delay = max(self.delays)
  469. self.avg_delay = round(sum(self.delays)/len(self.delays),1)
  470. async def handle_signals(self, orders):
  471. '''执行策略指令'''
  472. try:
  473. for order_name in orders:
  474. if 'Cancel' in order_name:
  475. cid = orders[order_name][0]
  476. oid = orders[order_name][1]
  477. if cid:
  478. asyncio.get_event_loop().create_task(self.cancel_order(client_id=cid))
  479. elif oid:
  480. asyncio.get_event_loop().create_task(self.cancel_order(order_id=oid))
  481. for order_name in orders:
  482. if 'Limits' in order_name:
  483. for i in orders[order_name]:
  484. asyncio.get_event_loop().create_task(self.take_order(
  485. self.symbol,
  486. i[0],
  487. i[1],
  488. i[2],
  489. i[3],
  490. ))
  491. for order_name in orders:
  492. if 'Check' in order_name:
  493. cid = orders[order_name][0]
  494. # oid = orders[order_name][1]
  495. asyncio.get_event_loop().create_task(self.check_order(client_id=cid))
  496. except:
  497. # traceback.print_exc()
  498. await asyncio.sleep(0.1)