binance_usdt_swap_rest.py 24 KB


  1. import aiohttp
  2. import time
  3. import asyncio
  4. import zlib
  5. import json
  6. import hmac
  7. import base64
  8. import hashlib
  9. import traceback
  10. import random, sys
  11. from urllib.parse import urlparse
  12. import logging, logging.handlers
  13. import utils
  14. import model
  15. from decimal import Decimal
  16. def empty_call(msg):
  17. print(f'空的回调函数 {msg}')
  18. class BinanceUsdtSwapRest:
  19. def __init__(self, params:model.ClientParams, colo=0):
  20. if colo:
  21. print('不支持colo高速线路')
  22. self.HOST = 'https://fapi.binance.com'
  23. else:
  24. self.HOST = 'https://fapi.binance.com'
  25. self.params = params
  26. self.base = self.params.pair.split('_')[0].upper()
  27. self.quote = self.params.pair.split('_')[1].upper()
  28. self.symbol = self.base + self.quote
  29. if self.base in ['shib','xec']:
  30. print("请输入1000shib/1000xec")
  31. if len(self.params.pair.split('_')) > 2:
  32. self.delivery = self.params.pair.split('_')[2] # 210924
  33. self.symbol += f"_{self.delivery}"
  34. self.name = self.params.name
  35. self._SESSIONS = dict()
  36. self.callback = {
  37. "onMarket":empty_call,
  38. "onPosition":empty_call,
  39. "onOrder":empty_call,
  40. "onEquity":empty_call,
  41. "onExit":empty_call,
  42. "onTicker":empty_call,
  43. }
  44. self.exchange_info = dict()
  45. self.stepSize = None
  46. self.tickSize = None
  47. self.delays = []
  48. self.avg_delay = 0
  49. self.max_delay = 0
  50. self.proxy = None
  51. self.broker_id = self.params.broker_id
  52. if 'win' in sys.platform:
  53. self.proxy = self.params.proxy
  54. self.logger = self.get_logger()
  55. self.stop_flag = 0
  56. self.coin_value = 0.0
  57. self.cash_value = 0.0
  58. #### 指定发包ip
  59. iplist = utils.get_local_ip_list()
  60. self.ip = iplist[int(self.params.ip)]
  61. def get_logger(self):
  62. logger = logging.getLogger(__name__)
  63. logger.setLevel(logging.DEBUG)
  64. # log to txt
  65. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  66. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  67. handler.setLevel(logging.DEBUG)
  68. handler.setFormatter(formatter)
  69. logger.addHandler(handler)
  70. return logger
  71. def _get_session(self, url):
  72. parsed_url = urlparse(url)
  73. key = parsed_url.netloc or parsed_url.hostname
  74. if key not in self._SESSIONS:
  75. tcp = aiohttp.TCPConnector(limit=50,keepalive_timeout=120,verify_ssl=False,local_addr=(self.ip,0))
  76. session = aiohttp.ClientSession(connector=tcp)
  77. self._SESSIONS[key] = session
  78. return self._SESSIONS[key]
  79. async def _request(self, method, uri, body=None, params=None, HOST=None):
  80. headers = {}
  81. headers["Content-Type"] = "application/json"
  82. headers['X-MBX-APIKEY'] = self.params.access_key
  83. params['timestamp']=int(time.time())*1000
  84. query_string = "&".join(["{}={}".format(k, params[k]) for k in params.keys()])
  85. signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest()
  86. params['signature']=signature
  87. if HOST == None:
  88. url = self.HOST + uri
  89. else:
  90. url = HOST + uri
  91. # 发起请求
  92. timeout = aiohttp.ClientTimeout(10)
  93. session = self._get_session(url)
  94. msg = "rest请求记录" + str(method) + str(url) + str(params) + str(body)
  95. self.logger.debug(msg)
  96. try:
  97. start_time = time.time()
  98. if method == "GET":
  99. response = await session.get(url, params=params, headers=headers, timeout=timeout, proxy=self.proxy)
  100. elif method == "POST":
  101. response = await session.post(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  102. elif method == "DELETE":
  103. response = await session.delete(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  104. code = response.status
  105. res = await response.json()
  106. res_msg = msg + f' 回报 {res}'
  107. self.logger.debug(res_msg)
  108. if code not in (200, 201, 202, 203, 204, 205, 206):
  109. print(f'URL:{url} PARAMS:{params} body:{body} ERROR:{res}')
  110. if code == 429:
  111. self.callback['onExit'](f"{self.name} 即将触发限频封禁 紧急退出")
  112. return None, str(res)
  113. delay = int(1000*(time.time() - start_time))
  114. self.delays.append(delay)
  115. self.get_delay_info()
  116. return res, None
  117. except Exception as e:
  118. print('网络请求错误')
  119. print(f'URL:{url} PARAMS:{params} ERROR:{e}')
  120. self.logger.error(e)
  121. self.logger.error(traceback.format_exc())
  122. return None, str(e)
  123. def get_delay_info(self):
  124. if len(self.delays) > 100:
  125. self.delays = self.delays[-100:]
  126. if max(self.delays) > self.max_delay:self.max_delay = max(self.delays)
  127. self.avg_delay = round(sum(self.delays)/len(self.delays),1)
  128. async def take_order(self, symbol, amount, origin_side, price, cid="", order_type='LIMIT'):
  129. '''
  130. 下单接口
  131. '''
  132. if symbol not in self.exchange_info:
  133. await self.before_trade()
  134. # amount = float(Decimal(str(amount//self.exchange_info[symbol].stepSize))*Decimal(str(self.exchange_info[symbol].stepSize)))
  135. # price = float(Decimal(str(price//self.exchange_info[symbol].tickSize))*Decimal(str(self.exchange_info[symbol].tickSize)))
  136. amount = utils.fix_amount(amount, self.exchange_info[symbol].stepSize)
  137. price = utils.fix_price(price, self.exchange_info[symbol].tickSize)
  138. if origin_side =='kd':
  139. side = 'BUY'
  140. positionSide = 'LONG'
  141. elif origin_side =='pd':
  142. side = 'SELL'
  143. positionSide = 'LONG'
  144. elif origin_side =='kk':
  145. side = 'SELL'
  146. positionSide = 'SHORT'
  147. elif origin_side =='pk':
  148. side = 'BUY'
  149. positionSide = 'SHORT'
  150. else:
  151. raise Exception(f'下单参数错误 side:{origin_side}')
  152. if amount <= 0.0:
  153. self.logger.error(f'下单参数错误 amount:{amount}')
  154. order_event = dict()
  155. order_event['status'] = "REMOVE"
  156. order_event['filled_price'] = 0.0
  157. order_event['filled'] = 0.0
  158. order_event['client_id'] = cid
  159. self.callback["onOrder"](order_event)
  160. return None
  161. if price <= 0.0:
  162. self.logger.error(f'下单参数错误 price:{price}')
  163. order_event = dict()
  164. order_event['status'] = "REMOVE"
  165. order_event['filled_price'] = 0.0
  166. order_event['filled'] = 0.0
  167. order_event['client_id'] = cid
  168. self.callback["onOrder"](order_event)
  169. return None
  170. params = {
  171. 'symbol': symbol,
  172. 'quantity': utils.num_to_str(amount, self.exchange_info[symbol].stepSize),
  173. 'side': side,
  174. 'positionSide': positionSide,
  175. 'type':order_type,
  176. 'newClientOrderId':cid,
  177. }
  178. if order_type in ['LIMIT','STOP','TAKE_PROFIT']:
  179. params['price'] = utils.num_to_str(price, self.exchange_info[symbol].tickSize)
  180. # if origin_side in ['kd', 'kk']:
  181. params['timeInForce'] = 'GTC'
  182. # else:
  183. # params['timeInForce'] = 'GTC'
  184. if self.params.debug == 'True':
  185. await asyncio.sleep(0.1)
  186. return None, None
  187. else:
  188. # 再报单
  189. response, error = await self._request('POST', '/fapi/v1/order', params=params)
  190. # 再更新
  191. if response is not None:
  192. if 'orderId' in response:
  193. order_event = dict()
  194. order_event['status'] = "NEW"
  195. order_event['client_id'] = params["newClientOrderId"]
  196. order_event['order_id'] = response['orderId']
  197. self.callback["onOrder"](order_event)
  198. if error:
  199. order_event = dict()
  200. order_event['status'] = "REMOVE"
  201. order_event['filled_price'] = 0
  202. order_event['filled'] = 0
  203. order_event['client_id'] = params["newClientOrderId"]
  204. self.callback["onOrder"](order_event)
  205. return response, error
  206. async def cancel_order(self, order_id=None, client_id=None, symbol=None):
  207. params = {
  208. "symbol": self.symbol if symbol==None else symbol,
  209. }
  210. if order_id:
  211. params["orderId"] = order_id
  212. if client_id:
  213. params["origClientOrderId"] = client_id
  214. if self.params.debug == 'True':
  215. await asyncio.sleep(0.1)
  216. return None
  217. else:
  218. response, error = await self._request('DELETE', f'/fapi/v1/order', params=params)
  219. if error:
  220. print("撤单失败",error)
  221. # if 'Unknown order sent.' in error:
  222. # 撤单失败 可能已经撤单 是否发生成交需要rest查
  223. # if client_id:await self.check_order(client_id=client_id)
  224. # if order_id:await self.check_order(order_id=order_id)
  225. return error
  226. if response:
  227. # if 'status' in response:
  228. # if response['status'] in ['CANCELED','EXPIRED']: # 已撤销 删除本地订单表
  229. # order_event = dict()
  230. # order_event['status'] = "REMOVE"
  231. # order_event['client_id'] = response["clientOrderId"]
  232. # order_event['order_id'] = response["orderId"]
  233. # order_event['filled'] = float(response["executedQty"])
  234. # order_event['filled_price'] = float(response["cumQuote"])/float(response["executedQty"]) if float(response["executedQty"]) > 0 else 0
  235. # self.callback['onOrder'](order_event)
  236. return response
  237. async def check_order(self, order_id=None, client_id=None, symbol=None):
  238. params = {
  239. "symbol": self.symbol if symbol==None else symbol,
  240. }
  241. if order_id:
  242. params["orderId"] = order_id
  243. if client_id:
  244. params["origClientOrderId"] = client_id
  245. if self.params.debug == 'True':
  246. await asyncio.sleep(0.1)
  247. return None
  248. else:
  249. response, error = await self._request('GET', f'/fapi/v1/order', params=params)
  250. if error:
  251. print("查单失败", error)
  252. if 'Order does not exist' in error:
  253. # 这种情况也可能还会有成交
  254. # 在订单从引擎到数据库的间隙查单会提示不存在 但实际有成交
  255. pass
  256. return error
  257. if response:
  258. if 'status' in response:
  259. # 需要删除本地订单表的情况
  260. if response['status'] in ['CANCELED','EXPIRED','FILLED']:
  261. order_event = dict()
  262. order_event['status'] = "REMOVE"
  263. order_event['client_id'] = response["clientOrderId"]
  264. order_event['order_id'] = response["orderId"]
  265. order_event['filled'] = float(response["executedQty"])
  266. order_event['filled_price'] = float(response["cumQuote"])/float(response["executedQty"]) if float(response["executedQty"]) > 0 else 0
  267. self.callback['onOrder'](order_event)
  268. elif response['status'] in ["NEW"]: # 需要更新本地表的情况
  269. order_event = dict()
  270. order_event['status'] = "NEW"
  271. order_event['client_id'] = response["clientOrderId"]
  272. order_event['order_id'] = response['orderId']
  273. self.callback['onOrder'](order_event)
  274. return response
  275. async def get_order_list(self):
  276. '''
  277. 获取挂单表
  278. '''
  279. response, error = await self._request('GET', '/fapi/v1/openOrders', params={'symbol':self.symbol})
  280. orders = [] # 查询当前挂单 只可能出现 new 和 partfill 默认成交为0 只有 done状态的订单才考虑是否有成交
  281. if response:
  282. for i in response:
  283. order_event = dict()
  284. order_event['status'] = "NEW"
  285. order_event['filled'] = 0
  286. order_event['filled_price'] = 0
  287. order_event['client_id'] = i["clientOrderId"]
  288. order_event['order_id'] = i['orderId']
  289. self.callback["onOrder"](order_event)
  290. orders.append(order_event)
  291. if error:
  292. print('查询列表出错',error)
  293. return orders
  294. async def get_history_order(self):
  295. params = {
  296. "limit":1000,
  297. "startTime":1631260140000,
  298. "endTime":1631260200000,
  299. }
  300. response, error = await self._request('GET', '/fapi/v1/allOrders', params=params)
  301. import json
  302. fp = open("123.csv", "w")
  303. json.dump(response,fp)
  304. return response
  305. async def get_server_time(self):
  306. params = {}
  307. response = await self._request('GET', '/fapi/v1/time', params=params)
  308. return response
  309. async def change_pos_side(self, dual='true'):
  310. '''
  311. 获取仓位模式
  312. '''
  313. params = {'dualSidePosition':dual}
  314. response = await self._request('POST', '/fapi/v1/positionSide/dual', params=params)
  315. return response
  316. async def get_exchange_info(self):
  317. response, error = await self._request('GET', '/fapi/v1/exchangeInfo', params={})
  318. return response, error
  319. async def before_trade(self):
  320. ##########
  321. response, error = await self.get_exchange_info()
  322. if response:
  323. for i in response['symbols']:
  324. if self.symbol in i['symbol'].upper():
  325. self.stepSize = float(i['filters'][1]['stepSize'])
  326. self.tickSize = float(i['filters'][0]['tickSize'])
  327. #### 保存交易规则信息
  328. exchange_info = model.ExchangeInfo()
  329. exchange_info.symbol = i['symbol'].upper()
  330. exchange_info.multiplier = 1
  331. exchange_info.stepSize = float(i['filters'][1]['stepSize'])
  332. exchange_info.tickSize = float(i['filters'][0]['tickSize'])
  333. self.exchange_info[exchange_info.symbol] = exchange_info
  334. if error:
  335. print('获取市场信息错误',error)
  336. await self.change_pos_side()
  337. await asyncio.sleep(1)
  338. await self.get_position()
  339. async def get_equity(self):
  340. ##########
  341. res, err = await self.get_account()
  342. if res:
  343. for i in res:
  344. if self.quote == i['asset'].upper():
  345. self.callback['onEquity']({self.quote:float(i['balance'])})
  346. self.cash_value = float(i['balance'])
  347. if err:
  348. print('获取账户信息错误', err)
  349. ##########
  350. async def universalTransfer(self, _type='UMFUTURE_MAIN', asset='USDT', amount=0):
  351. params = {}
  352. params['type'] = _type
  353. params['asset'] = asset
  354. params['amount'] = amount
  355. print('发起提现')
  356. response = await self._request('POST', '/sapi/v1/asset/transfer', params=params, HOST='https://api.binance.com')
  357. print(f'提现结果 {response}')
  358. return response
  359. async def futuresTransfer(self, _type='2', asset='USDT', amount=0):
  360. '''
  361. 1: 现货账户向USDT合约账户划转
  362. 2: USDT合约账户向现货账户划转
  363. 3: 现货账户向币本位合约账户划转
  364. 4: 币本位合约账户向现货账户划转
  365. '''
  366. params = {}
  367. params['type'] = _type
  368. params['asset'] = asset
  369. params['amount'] = amount
  370. print('发起转账')
  371. response = await self._request('POST', '/sapi/v1/futures/transfer', params=params, HOST='https://api.binance.com')
  372. print(f'转账结果 {response}')
  373. return response
  374. async def get_account(self):
  375. response, error = await self._request('GET','/fapi/v2/balance', params={})
  376. return response, error
  377. async def get_ticker(self):
  378. response, error = await self._request('GET','/fapi/v1/ticker/bookTicker', params={"symbol":self.symbol})
  379. if response:
  380. if response['symbol'] == self.symbol:
  381. ap = float(response['bidPrice'])
  382. bp = float(response['askPrice'])
  383. mp = (ap+bp)*0.5
  384. d = {"name":self.name,'mp': mp, 'bp':bp, 'ap':ap}
  385. self.callback['onTicker'](d)
  386. return d
  387. if error:
  388. print(error)
  389. return None
  390. async def buy_token(self):
  391. '''买入平台币'''
  392. pass
  393. async def check_position(self, hold_coin=0.0):
  394. '''
  395. 检查是否存在非运行币种的仓位并take平仓 已支持全品种
  396. '''
  397. try:
  398. self.logger.info('检查遗漏订单')
  399. response, error = await self._request('GET', '/fapi/v1/openOrders', params={})
  400. self.logger.info(response)
  401. self.logger.info(error)
  402. if error:self.logger.error(error)
  403. if response:
  404. for i in response:
  405. res = await self.cancel_order(order_id=i['orderId'], symbol=i['symbol'])
  406. await asyncio.sleep(0.1)
  407. self.logger.info(res)
  408. # 清空仓位
  409. self.logger.info('检查遗漏仓位')
  410. response, error = await self._request('GET','/fapi/v2/positionRisk', params={})
  411. if error is not None:self.logger.error(error)
  412. if response:
  413. for i in response:
  414. symbol = i['symbol']
  415. if i['positionSide'] == 'LONG':
  416. longPos = abs(float(i['positionAmt']))
  417. longAvg = abs(float(i['entryPrice']))
  418. if longPos > 0:
  419. self.logger.info('发现多头遗留仓位 进行平仓')
  420. res, err = await self.take_order(
  421. symbol,
  422. longPos,
  423. 'pd',
  424. longAvg*0.95,
  425. utils.get_cid(),
  426. order_type='MARKET',
  427. )
  428. self.logger.info(res)
  429. self.logger.info(err)
  430. if i['positionSide'] == 'SHORT':
  431. shortPos = abs(float(i['positionAmt']))
  432. shortAvg = abs(float(i['entryPrice']))
  433. if shortPos > 0:
  434. self.logger.info('发现空头遗留仓位 进行平仓')
  435. res, err = await self.take_order(
  436. symbol,
  437. shortPos,
  438. 'pk',
  439. shortAvg*1.05,
  440. utils.get_cid(),
  441. order_type='MARKET',
  442. )
  443. self.logger.info(res)
  444. self.logger.info(err)
  445. self.logger.info('遗留仓位检测完毕')
  446. except:
  447. self.logger.error("清仓程序执行出错")
  448. self.logger.error(traceback.format_exc())
  449. return
  450. async def get_position(self):
  451. '''
  452. 获取仓位信息
  453. '''
  454. response, error = await self._request('GET','/fapi/v2/positionRisk', params={'symbol':self.symbol})
  455. longPos, shortPos = 0, 0
  456. longAvg, shortAvg = 0, 0
  457. position = {
  458. "longPos":abs(longPos),
  459. "shortPos":abs(shortPos),
  460. "longAvg":abs(longAvg),
  461. "shortAvg":abs(shortAvg)}
  462. if response:
  463. for i in response:
  464. if i['symbol'] == self.symbol:
  465. if i['positionSide'] == 'LONG':
  466. longPos = float(i['positionAmt'])
  467. longAvg = float(i['entryPrice'])
  468. if i['positionSide'] == 'SHORT':
  469. shortPos = float(i['positionAmt'])
  470. shortAvg = float(i['entryPrice'])
  471. position = model.Position()
  472. position.longPos = abs(longPos)
  473. position.longAvg = abs(longAvg)
  474. position.shortPos = abs(shortPos)
  475. position.shortAvg = abs(shortAvg)
  476. self.callback['onPosition'](position)
  477. return position
  478. async def go(self):
  479. '''
  480. 盘前
  481. 获取市场信息
  482. 获取账户信息
  483. 更改仓位模式(期货)
  484. 清空仓位和挂单
  485. 盘中
  486. 更新账户信息
  487. 更新挂单列表
  488. 更新仓位信息
  489. 更新延迟信息
  490. '''
  491. print('Rest循环器启动')
  492. interval = 60 # 不能太快防止占用限频
  493. ### beforeTrade
  494. await self.before_trade()
  495. await asyncio.sleep(1)
  496. ### onTrade
  497. loop = 0
  498. while 1:
  499. loop += 1
  500. try:
  501. # 停机信号
  502. if self.stop_flag:
  503. return
  504. # 更新账户
  505. res, err = await self.get_account()
  506. if err:
  507. print(err)
  508. if res:
  509. for i in res:
  510. if self.quote == i['asset'].upper():
  511. self.callback['onEquity']({self.quote:float(i['balance'])})
  512. # 更新仓位
  513. position = await self.get_position()
  514. await asyncio.sleep(interval)
  515. # 打印延迟
  516. self.logger.debug(f'报单延迟 平均{self.avg_delay}ms 最大{self.max_delay}ms')
  517. except asyncio.CancelledError:
  518. return
  519. except:
  520. traceback.print_exc()
  521. await asyncio.sleep(30)
  522. async def handle_signals(self, orders):
  523. '''
  524. 执行策略指令
  525. 撤销订单
  526. 检查订单
  527. 下达订单
  528. '''
  529. try:
  530. for order_name in orders:
  531. if 'Cancel' in order_name:
  532. cid = orders[order_name][0]
  533. oid = orders[order_name][1]
  534. if cid:
  535. asyncio.get_event_loop().create_task(self.cancel_order(client_id=cid))
  536. elif oid:
  537. asyncio.get_event_loop().create_task(self.cancel_order(order_id=oid))
  538. for order_name in orders:
  539. if 'Limits' in order_name:
  540. for i in orders[order_name]:
  541. asyncio.get_event_loop().create_task(self.take_order(
  542. self.symbol,
  543. i[0],
  544. i[1],
  545. i[2],
  546. i[3]
  547. ))
  548. for order_name in orders:
  549. if 'Check' in order_name:
  550. cid = orders[order_name][0]
  551. # oid = orders[order_name][1]
  552. asyncio.get_event_loop().create_task(self.check_order(client_id=cid))
  553. except Exception as e:
  554. traceback.print_exc()
  555. self.logger.error("执行信号出错"+str(e))
  556. await asyncio.sleep(0.1)