huobi_usdt_swap_rest.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. import aiohttp
  2. import time
  3. import asyncio
  4. import zlib
  5. import json
  6. import hmac
  7. import base64
  8. import hashlib
  9. import traceback
  10. import logging, logging.handlers
  11. import urllib, sys
  12. from urllib import parse
  13. from urllib.parse import urljoin
  14. import datetime
  15. import logging, logging.handlers
  16. import model
  17. import utils
  18. def empty_call(msg):
  19. print('空的回调函数')
  20. def decimal_amount(amount, d):
  21. if int(d) == 0:
  22. return str(int(amount))
  23. elif int(d) > 0:
  24. return str(round(float(amount), int(d)))
  25. def decimal_price(price, d):
  26. if int(d) == 0:
  27. return str(int(price))
  28. elif int(d) > 0:
  29. return str(round(float(price), int(d)))
  30. class HuobiUsdtSwapRest:
  31. def __init__(self, params:model.ClientParams, colo=0):
  32. if colo:
  33. print('不支持colo高速线路')
  34. self.HOST = 'https://api.hbdm.com'
  35. else:
  36. self.HOST = 'https://api.hbdm.com'
  37. self.params = params
  38. self.base = self.params.pair.split('_')[0].upper()
  39. self.quote = self.params.pair.split('_')[1].upper()
  40. self.symbol = self.base + '-' + self.quote
  41. self.data = {}
  42. self._SESSIONS = dict()
  43. self.data['account'] = {}
  44. self.callback = {
  45. "onMarket":empty_call,
  46. "onPosition":empty_call,
  47. "onOrder":empty_call,
  48. }
  49. self.exchange_info = dict()
  50. self.delays = []
  51. self.max_delay = 0.0
  52. self.avg_delay = 0.0
  53. self.proxy = None
  54. if 'win' in sys.platform:
  55. self.proxy = self.params.proxy
  56. self.logger = self.get_logger()
  57. self.stop_flag = 0
  58. self.coin_value = 0.0
  59. self.cash_value = 0.0
  60. self.multiplier = None
  61. #### 指定发包ip
  62. iplist = utils.get_local_ip_list()
  63. self.ip = iplist[int(self.params.ip)]
  64. def get_delay_info(self):
  65. if len(self.delays) > 100:
  66. self.delays = self.delays[-100:]
  67. if max(self.delays) > self.max_delay:self.max_delay = max(self.delays)
  68. self.avg_delay = round(sum(self.delays)/len(self.delays),1)
  69. def get_logger(self):
  70. logger = logging.getLogger(__name__)
  71. logger.setLevel(logging.DEBUG)
  72. # log to txt
  73. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  74. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  75. handler.setLevel(logging.DEBUG)
  76. handler.setFormatter(formatter)
  77. logger.addHandler(handler)
  78. return logger
  79. def _get_session(self, url):
  80. key = url
  81. if key not in self._SESSIONS:
  82. tcp = aiohttp.TCPConnector(limit=50,keepalive_timeout=120,verify_ssl=False,local_addr=(self.ip,0))
  83. session = aiohttp.ClientSession(connector=tcp)
  84. self._SESSIONS[key] = session
  85. return self._SESSIONS[key]
  86. def generate_signature(self, method, params, host_url, request_path):
  87. if request_path.startswith("http://") or request_path.startswith("https://"):
  88. host_url = urllib.parse.urlparse(request_path).hostname.lower()
  89. request_path = '/' + '/'.join(request_path.split('/')[3:])
  90. else:
  91. host_url = urllib.parse.urlparse(self.HOST).hostname.lower()
  92. sorted_params = sorted(params.items(), key=lambda d: d[0], reverse=False)
  93. encode_params = urllib.parse.urlencode(sorted_params)
  94. payload = [method, host_url, request_path, encode_params]
  95. payload = "\n".join(payload)
  96. payload = payload.encode(encoding="UTF8")
  97. secret_key = self.params.secret_key.encode(encoding="utf8")
  98. digest = hmac.new(secret_key, payload, digestmod=hashlib.sha256).digest()
  99. signature = base64.b64encode(digest)
  100. signature = signature.decode()
  101. return signature
  102. async def _request(self, method, uri, body=None, params=None, auth=False):
  103. url = urljoin(self.HOST, uri)
  104. if auth:
  105. timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
  106. params = params if params else {}
  107. params.update({"AccessKeyId": self.params.access_key,
  108. "SignatureMethod": "HmacSHA256",
  109. "SignatureVersion": "2",
  110. "Timestamp": timestamp})
  111. host_name = urllib.parse.urlparse(self.HOST).hostname.lower()
  112. params["Signature"] = self.generate_signature(method, params, host_name, uri)
  113. if method == "GET":
  114. headers = {
  115. "Content-type": "application/x-www-form-urlencoded",
  116. "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
  117. "Chrome/39.0.2171.71 Safari/537.36"
  118. }
  119. else:
  120. headers = {
  121. "Accept": "application/json",
  122. "Content-type": "application/json"
  123. }
  124. # 发起请求
  125. session = self._get_session(url)
  126. timeout = aiohttp.ClientTimeout(10)
  127. msg = "rest请求记录" + str(method) + str(url) + str(params) + str(body)
  128. self.logger.debug(msg)
  129. try:
  130. start_time = time.time()
  131. if method == "GET":
  132. response = await session.get(url, params=params, headers=headers, timeout=timeout, proxy=self.proxy)
  133. elif method == "POST":
  134. response = await session.post(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  135. elif method == "DELETE":
  136. response = await session.delete(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  137. code = response.status
  138. res = await response.json()
  139. delay = int(1000*(time.time() - start_time))
  140. self.delays.append(delay)
  141. res_msg = msg + f' 回报 {res}'
  142. self.logger.debug(res_msg)
  143. if code not in (200, 201, 202, 203, 204, 205, 206):
  144. self.logger.error(f'请求错误 {res}')
  145. self.logger.error(code)
  146. return None ,res
  147. return json.loads(res), None
  148. except Exception as e:
  149. print('网络请求出错', e)
  150. self.logger.error('请求错误')
  151. self.logger.error(e)
  152. return None, e
  153. async def take_order(self, symbol, amount, origin_side, price, cid, order_type='LIMIT'):
  154. if origin_side =='kd':
  155. side = 'buy'
  156. positionSide = 'open'
  157. elif origin_side =='pd':
  158. side = 'sell'
  159. positionSide = 'close'
  160. elif origin_side =='kk':
  161. side = 'sell'
  162. positionSide = 'open'
  163. elif origin_side =='pk':
  164. side = 'buy'
  165. positionSide = 'close'
  166. else:
  167. raise Exception('下单参数错误')
  168. params = {
  169. 'symbol': symbol,
  170. 'quantity': decimal_amount(amount, self.params['decimal_amount']),
  171. 'side': side,
  172. 'positionSide': positionSide,
  173. 'price': decimal_price(price, self.params['decimal_price'] ),
  174. 'type':order_type,
  175. 'timeInForce':'GTC',
  176. }
  177. # logger.info(f'下单指令 {params}')
  178. if self.params['debug'] == 'True':
  179. return await asyncio.sleep(0.1)
  180. else:
  181. response, error = await self._request('POST', '/linear-swap-api/v1/swap_cross_order', params=params, auth=1)
  182. # logger.info(f'下单回报 {response}')
  183. if response:
  184. order_event = dict()
  185. order_event['status'] = "NEW"
  186. order_event['client_id'] = cid
  187. order_event['order_id'] = response["order_id"]
  188. self.callback["onOrder"](order_event)
  189. if error:
  190. order_event = dict()
  191. order_event['status'] = "REMOVE"
  192. order_event['client_id'] = cid
  193. order_event['filled_price'] = 0.0
  194. order_event['filled'] = 0.0
  195. order_event['fee'] = 0.0
  196. self.callback["onOrder"](order_event)
  197. return response
  198. # async def take_orders(self, orders):
  199. # def change(side):
  200. # if side =='kd':
  201. # side = 'BUY'
  202. # positionSide = 'LONG'
  203. # elif side =='pd':
  204. # side = 'SELL'
  205. # positionSide = 'LONG'
  206. # elif side =='kk':
  207. # side = 'SELL'
  208. # positionSide = 'SHORT'
  209. # elif side =='pk':
  210. # side = 'BUY'
  211. # positionSide = 'SHORT'
  212. # else:
  213. # raise Exception('下单参数错误')
  214. # return side, positionSide
  215. # params = {}
  216. # data = []
  217. # for i in orders:
  218. # data.append({
  219. # 'symbol': i[0],
  220. # 'quantity': i[1],
  221. # 'side': change(i[2])[0],
  222. # 'positionSide': change(i[2])[1],
  223. # 'price': i[3],
  224. # 'type':'LIMIT',
  225. # 'timeInForce':'GTC',
  226. # })
  227. # params['batchOrders'] = json.dumps(data)
  228. # # logger.info(f'下单指令 {params}')
  229. # if self.params['debug'] == 'True':
  230. # return await asyncio.sleep(0.1)
  231. # else:
  232. # response = await self._request('POST', '/fapi/v1/batchOrders', params=params)
  233. # # logger.info(f'下单回报 {response}')
  234. # return response
  235. async def cancel_order(self, order_id=None, client_id=None):
  236. if order_id:
  237. params = {
  238. "contract_code": self.symbol,
  239. "order_id": order_id,
  240. }
  241. response, error = await self._request('POST', f'/linear-swap-api/v1/swap_cross_cancel', params=params, auth=1)
  242. if response:
  243. pass
  244. if error:
  245. pass
  246. return None
  247. async def cancel_all_orders(self):
  248. params = {
  249. "contract_code": self.symbol
  250. }
  251. return await self._request('POST', f'/linear-swap-api/v1/swap_cross_cancelall', params=params, auth=1)
  252. async def get_order_list(self):
  253. params = {'contract_code':self.symbol}
  254. response, error = await self._request('POST', '/linear-swap-api/v1/swap_cross_openorders', params=params, auth=1)
  255. if response:
  256. for i in response:
  257. pass
  258. # if i['direction'] == 'buy' and i['offset'] == 'open':
  259. # side = 'kd'
  260. # elif i['direction'] == 'sell' and i['offset'] == 'close':
  261. # side = 'pd'
  262. # elif i['direction'] == 'sell' and i['offset'] == 'open':
  263. # side = 'kk'
  264. # elif i['direction'] == 'buy' and i['offset'] == 'close':
  265. # side = 'pk'
  266. # orders.append({
  267. # 'order_id':i['order_id'],
  268. # 'symbol':i['contract_code'],
  269. # 'amount':float(i['volume']),
  270. # 'side':side,
  271. # 'price':float(i['price']),
  272. # })
  273. # self.callback['onOrder']({"refresh":orders})
  274. if error:
  275. pass
  276. return None
  277. async def get_server_time(self):
  278. params = {}
  279. response, error = await self._request('GET', '/api/v1/timestamp', params=params)
  280. return response
  281. async def get_account(self):
  282. return await self._request('POST','/linear-swap-api/v1/swap_cross_account_info', params={}, auth=1)
  283. async def get_position(self):
  284. '''获取持仓 symbol: BTC-USDT'''
  285. return await self._request('POST','/linear-swap-api/v1/swap_position_info', params={'contract_code':self.symbol}, auth=1)
  286. async def before_trade(self):
  287. '''获取市场信息'''
  288. res, err = await self._request('GET',f'/linear-swap-api/v1/swap_contract_info', params={}, auth=1)
  289. if err:
  290. print(err)
  291. if res:
  292. for i in res['data']:
  293. if self.symbol == i['name']:
  294. self.multiplier = float(i['contract_size'])
  295. self.tickSize = float(i['price_tick'])
  296. self.stepSize = 1*float(i['contract_size']) # 张 转换为 币
  297. #### 保存交易规则信息
  298. exchange_info = model.ExchangeInfo()
  299. exchange_info.symbol = i['name']
  300. exchange_info.multiplier = float(i['contract_size'])
  301. exchange_info.tickSize = float(i['price_tick'])
  302. exchange_info.stepSize = 1*float(i['contract_size'])
  303. self.exchange_info[exchange_info.symbol] = exchange_info
  304. pass
  305. async def go(self):
  306. while 1:
  307. try:
  308. # 更新账户
  309. res, err = await self.get_account()
  310. for i in res:
  311. self.data['account'][i['asset']] = i['balance']
  312. if self.quote == i['asset'].upper():
  313. cash = float(i['balance'])
  314. self.callback['onEquity']({
  315. self.quote:cash
  316. })
  317. # 更新仓位
  318. res, err = await self.get_position()
  319. if res:
  320. p = model.Position()
  321. for i in res:
  322. if i['symbol'] == self.symbol:
  323. if i['positionSide'] == 'LONG':
  324. p.longPos = float(i['positionAmt'])
  325. p.longAvg = float(i['entryPrice'])
  326. if i['positionSide'] == 'SHORT':
  327. p.shortPos = float(i['positionAmt'])
  328. p.shortAvg = float(i['entryPrice'])
  329. self.callback['onPosition'](p)
  330. await asyncio.sleep(1)
  331. # 打印延迟
  332. self.get_delay_info()
  333. except:
  334. # traceback.print_exc()
  335. await asyncio.sleep(10)
  336. def get_data(self):
  337. return self.data
  338. async def run(self, orders):
  339. '''执行策略指令'''
  340. try:
  341. for order_name in orders:
  342. if 'Cancel' in order_name:
  343. cid = orders[order_name][0]
  344. oid = orders[order_name][1]
  345. if cid:
  346. asyncio.get_event_loop().create_task(self.cancel_order(client_id=cid))
  347. if oid:
  348. asyncio.get_event_loop().create_task(self.cancel_order(order_id=oid))
  349. for order_name in orders:
  350. if 'Limits' in order_name:
  351. for i in orders[order_name]:
  352. asyncio.get_event_loop().create_task(self.take_order(
  353. self.symbol,
  354. i[0],
  355. i[1],
  356. i[2],
  357. i[3],
  358. ))
  359. for order_name in orders:
  360. if 'Check' in order_name:
  361. cid = orders[order_name][0]
  362. # oid = orders[order_name][1]
  363. asyncio.get_event_loop().create_task(self.check_order(client_id=cid))
  364. except:
  365. # traceback.print_exc()
  366. await asyncio.sleep(0.1)