# binance_coin_swap_rest.py - asyncio REST client for Binance coin-margined futures (dapi endpoints).

import asyncio
import base64
import hashlib
import hmac
import json
import logging
import logging.handlers
import random
import sys
import time
import traceback
import zlib
from decimal import Decimal
from urllib.parse import urlparse

import aiohttp

import model
import utils
  15. def empty_call(msg):
  16. print(f'空的回调函数 {msg}')
  17. class BinanceCoinSwapRest:
  18. def __init__(self, params:model.ClientParams, colo=0):
  19. if colo:
  20. print('不支持colo高速线路')
  21. self.HOST = 'https://dapi.binance.com'
  22. else:
  23. self.HOST = 'https://dapi.binance.com'
  24. self.params = params
  25. self.name = self.params.name
  26. self.base = self.params.pair.split('_')[0].upper()
  27. self.quote = self.params.pair.split('_')[1].upper()
  28. self.symbol = self.base + self.quote
  29. if len(self.params.pair.split('_')) > 2:
  30. self.delivery = self.params.pair.split('_')[2] # 210924
  31. self.symbol += f"_{self.delivery}"
  32. else:
  33. self.symbol += '_PERP'
  34. self.data = {}
  35. self._SESSIONS = dict()
  36. self.callback = {
  37. "onMarket":empty_call,
  38. "onPosition":empty_call,
  39. "onOrder":empty_call,
  40. "onEquity":empty_call,
  41. "onExit":empty_call,
  42. }
  43. self.exchange_info = dict()
  44. self.stepSize = None
  45. self.tickSize = None
  46. self.delays = []
  47. self.avg_delay = 0
  48. self.max_delay = 0
  49. self.proxy = None
  50. self.broker_id = self.params.broker_id
  51. if 'win' in sys.platform:
  52. self.proxy = self.params.proxy
  53. self.logger = self.get_logger()
  54. self.multiplier = None
  55. self.mp = 0.0 # 初始mp 在check_position中更新
  56. #### 指定发包ip
  57. iplist = utils.get_local_ip_list()
  58. self.ip = iplist[int(self.params.ip)]
  59. def get_logger(self):
  60. logger = logging.getLogger(__name__)
  61. logger.setLevel(logging.DEBUG)
  62. # log to txt
  63. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  64. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  65. handler.setLevel(logging.DEBUG)
  66. handler.setFormatter(formatter)
  67. logger.addHandler(handler)
  68. return logger
  69. def _get_session(self, url):
  70. parsed_url = urlparse(url)
  71. key = parsed_url.netloc or parsed_url.hostname
  72. if key not in self._SESSIONS:
  73. tcp = aiohttp.TCPConnector(limit=50,keepalive_timeout=120,verify_ssl=False,local_addr=(self.ip,0))
  74. session = aiohttp.ClientSession(connector=tcp)
  75. self._SESSIONS[key] = session
  76. return self._SESSIONS[key]
  77. async def _request(self, method, uri, body=None, params=None, HOST=None):
  78. headers = {}
  79. headers["Content-Type"] = "application/json"
  80. headers['X-MBX-APIKEY'] = self.params.access_key
  81. params['timestamp']=int(time.time())*1000
  82. query_string = "&".join(["{}={}".format(k, params[k]) for k in params.keys()])
  83. signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest()
  84. params['signature']=signature
  85. if HOST == None:
  86. url = self.HOST + uri
  87. else:
  88. url = HOST + uri
  89. # 发起请求
  90. timeout = aiohttp.ClientTimeout(10)
  91. session = self._get_session(url)
  92. msg = "rest请求记录" + str(method) + str(url) + str(params) + str(body)
  93. self.logger.debug(msg)
  94. try:
  95. start_time = time.time()
  96. if method == "GET":
  97. response = await session.get(url, params=params, headers=headers, timeout=timeout, proxy=self.proxy)
  98. elif method == "POST":
  99. response = await session.post(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  100. elif method == "DELETE":
  101. response = await session.delete(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  102. code = response.status
  103. res = await response.json()
  104. if code not in (200, 201, 202, 203, 204, 205, 206):
  105. print(f'URL:{url} PARAMS:{params} body:{body} ERROR:{res}')
  106. self.logger.error(res)
  107. return None, str(res)
  108. delay = int(1000*(time.time() - start_time))
  109. self.delays.append(delay)
  110. return res, None
  111. except Exception as e:
  112. print('网络请求错误')
  113. print(f'URL:{url} PARAMS:{params} ERROR:{e}')
  114. self.logger.error(e)
  115. self.logger.error(traceback.format_exc())
  116. return None, str(e)
  117. def get_delay_info(self):
  118. if len(self.delays) > 100:
  119. self.delays = self.delays[-100:]
  120. if max(self.delays) > self.max_delay:self.max_delay = max(self.delays)
  121. self.avg_delay = round(sum(self.delays)/len(self.delays),1)
  122. async def take_order(self, symbol, amount, origin_side, price, cid="", order_type='LIMIT'):
  123. '''
  124. 下单接口
  125. input amount 单位为 币 需要转换为 张
  126. '''
  127. if symbol not in self.exchange_info:
  128. await self.before_trade()
  129. # price = float(Decimal(str(price//self.tickSize))*Decimal(str(self.tickSize)))
  130. # amount = amount * price / self.multiplier
  131. # amount = float(Decimal(str(amount//self.stepSize))*Decimal(str(self.stepSize)))
  132. # 转换为张
  133. if origin_side =='kd':
  134. side = 'BUY'
  135. positionSide = 'LONG'
  136. elif origin_side =='pd':
  137. side = 'SELL'
  138. positionSide = 'LONG'
  139. elif origin_side =='kk':
  140. side = 'SELL'
  141. positionSide = 'SHORT'
  142. elif origin_side =='pk':
  143. side = 'BUY'
  144. positionSide = 'SHORT'
  145. else:
  146. raise Exception(f'下单参数错误 side:{origin_side}')
  147. if amount <= 0:
  148. self.logger.error(f'下单参数错误 amount:{amount}')
  149. return None
  150. if price <= 0:
  151. self.logger.error(f'下单参数错误 price:{price}')
  152. return None
  153. params = {
  154. 'symbol': symbol,
  155. 'quantity': amount,
  156. 'side': side,
  157. 'positionSide': positionSide,
  158. 'type':order_type,
  159. 'newClientOrderId':cid,
  160. }
  161. if order_type in ['LIMIT','STOP','TAKE_PROFIT']:
  162. params['price'] = price
  163. params['timeInForce'] = 'GTC'
  164. if self.params.debug == 'True':
  165. await asyncio.sleep(0.1)
  166. return None, None
  167. else:
  168. # 再报单
  169. response, error = await self._request('POST', '/dapi/v1/order', params=params)
  170. # 再更新
  171. if response is not None:
  172. if 'orderId' in response:
  173. order_event = dict()
  174. order_event['status'] = "NEW"
  175. order_event['client_id'] = params["newClientOrderId"]
  176. order_event['order_id'] = response['orderId']
  177. self.callback["onOrder"](order_event)
  178. if error:
  179. order_event = dict()
  180. order_event['status'] = "REMOVE"
  181. order_event['filled_price'] = 0
  182. order_event['filled'] = 0
  183. order_event['client_id'] = params["newClientOrderId"]
  184. self.callback["onOrder"](order_event)
  185. return response, error
  186. async def cancel_order(self, order_id=None, client_id=None, symbol=None):
  187. params = {
  188. "symbol": self.symbol if symbol==None else symbol,
  189. }
  190. if order_id:
  191. params["orderId"] = order_id
  192. if client_id:
  193. params["origClientOrderId"] = client_id
  194. if self.params.debug == 'True':
  195. await asyncio.sleep(0.1)
  196. return None
  197. else:
  198. response, error = await self._request('DELETE', f'/dapi/v1/order', params=params)
  199. if error:
  200. print("撤单失败",error)
  201. # if 'Unknown order sent.' in error:
  202. # 撤单失败 可能已经撤单 是否发生成交需要rest查
  203. if client_id:await self.check_order(client_id=client_id)
  204. if order_id:await self.check_order(order_id=order_id)
  205. return error
  206. if response:
  207. if 'status' in response:
  208. if response['status'] in ['CANCELED','EXPIRED']: # 已撤销 删除本地订单表
  209. order_event = dict()
  210. order_event['status'] = "REMOVE"
  211. order_event['client_id'] = response["clientOrderId"]
  212. order_event['order_id'] = response["orderId"]
  213. order_event['filled'] = float(response["executedQty"]) * self.multiplier / float(response["price"])
  214. order_event['filled_price'] = float(response["price"])
  215. self.callback['onOrder'](order_event)
  216. return response
  217. async def check_order(self, order_id=None, client_id=None, symbol=None):
  218. params = {
  219. "symbol": self.symbol if symbol==None else symbol,
  220. }
  221. if order_id:
  222. params["orderId"] = order_id
  223. if client_id:
  224. params["origClientOrderId"] = client_id
  225. if self.params.debug == 'True':
  226. await asyncio.sleep(0.1)
  227. return None
  228. else:
  229. response, error = await self._request('GET', f'/dapi/v1/order', params=params)
  230. if error:
  231. print("查单失败", error)
  232. if 'Order does not exist' in error:
  233. # 这种情况也可能还会有成交
  234. # 在订单从引擎到数据库的间隙查单会提示不存在 但实际有成交
  235. pass
  236. return error
  237. if response:
  238. if 'status' in response:
  239. # 需要删除本地订单表的情况
  240. if response['status'] in ['CANCELED','EXPIRED','FILLED']:
  241. order_event = dict()
  242. order_event['status'] = "REMOVE"
  243. order_event['client_id'] = response["clientOrderId"]
  244. order_event['order_id'] = response["orderId"]
  245. order_event['filled'] = float(response["executedQty"]) * self.multiplier / float(response["price"])
  246. order_event['filled_price'] = float(response["price"])
  247. self.callback['onOrder'](order_event)
  248. elif response['status'] in ["NEW"]: # 需要更新本地表的情况
  249. order_event = dict()
  250. order_event['status'] = "NEW"
  251. order_event['client_id'] = response["clientOrderId"]
  252. order_event['order_id'] = response['orderId']
  253. self.callback['onOrder'](order_event)
  254. return response
  255. async def get_order_list(self):
  256. '''
  257. 获取挂单表
  258. '''
  259. response, error = await self._request('GET', '/dapi/v1/openOrders', params={'symbol':self.symbol})
  260. orders = [] # 查询当前挂单 只可能出现 new 和 partfill 默认成交为0 只有 done状态的订单才考虑是否有成交
  261. if response:
  262. for i in response:
  263. order_event = dict()
  264. order_event['status'] = "NEW"
  265. order_event['filled'] = 0
  266. order_event['filled_price'] = 0
  267. order_event['client_id'] = i["clientOrderId"]
  268. order_event['order_id'] = i['orderId']
  269. self.callback["onOrder"](order_event)
  270. orders.append(order_event)
  271. if error:
  272. print('查询列表出错',error)
  273. return orders
  274. async def get_server_time(self):
  275. params = {}
  276. response = await self._request('GET', '/dapi/v1/time', params=params)
  277. return response
  278. async def change_pos_side(self, dual='true'):
  279. '''
  280. 获取仓位模式
  281. '''
  282. params = {'dualSidePosition':dual}
  283. response = await self._request('POST', '/dapi/v1/positionSide/dual', params=params)
  284. return response
  285. async def get_ticker(self):
  286. params = {'symbol': self.symbol}
  287. response = await self._request('GET', '/dapi/v1/ticker/bookTicker', params=params)
  288. ap = float(response[0][0]["askPrice"])
  289. bp = float(response[0][0]["bidPrice"])
  290. mp = (ap+bp)*0.5
  291. d = {"name":self.name,'mp': mp, 'bp':bp, 'ap':ap}
  292. return d
  293. async def before_trade(self):
  294. params = {}
  295. response, error = await self._request('GET', '/dapi/v1/exchangeInfo', params=params)
  296. if response:
  297. for i in response['symbols']:
  298. if self.symbol in i['symbol'].upper():
  299. self.tickSize = float(i['filters'][0]['tickSize'])
  300. self.stepSize = float(i['filters'][1]['stepSize'])
  301. self.multiplier = float(i['contractSize'])
  302. #### 保存交易规则信息
  303. exchange_info = model.ExchangeInfo()
  304. exchange_info.symbol = i['symbol'].upper()
  305. exchange_info.multiplier = float(i['contractSize'])
  306. exchange_info.tickSize = float(i['filters'][0]['tickSize'])
  307. exchange_info.stepSize = float(i['filters'][1]['stepSize'])
  308. self.exchange_info[exchange_info.symbol] = exchange_info
  309. if error:
  310. print('获取市场信息错误',error)
  311. ###
  312. ticker = await self.get_ticker()
  313. ###
  314. res = await self.get_account()
  315. if res:
  316. for i in res:
  317. if self.base == i['asset'].upper():
  318. self.data['equity'] = float(i['balance']) * ticker["mp"]
  319. self.callback['onEquity'](self.data['equity'])
  320. if error:
  321. print('获取账户信息错误',error)
  322. async def universalTransfer(self, _type='UMFUTURE_MAIN', asset='USDT', amount=0):
  323. params = {}
  324. params['type'] = _type
  325. params['asset'] = asset
  326. params['amount'] = amount
  327. print('发起提现')
  328. response = await self._request('POST', '/sapi/v1/asset/transfer', params=params, HOST='https://api.binance.com')
  329. print(f'提现结果 {response}')
  330. return response
  331. async def futuresTransfer(self, _type='2', asset='USDT', amount=0):
  332. '''
  333. 1: 现货账户向USDT合约账户划转
  334. 2: USDT合约账户向现货账户划转
  335. 3: 现货账户向币本位合约账户划转
  336. 4: 币本位合约账户向现货账户划转
  337. '''
  338. params = {}
  339. params['type'] = _type
  340. params['asset'] = asset
  341. params['amount'] = amount
  342. print('发起转账')
  343. response = await self._request('POST', '/sapi/v1/futures/transfer', params=params, HOST='https://api.binance.com')
  344. print(f'转账结果 {response}')
  345. return response
  346. async def get_account(self):
  347. response, error = await self._request('GET','/dapi/v1/balance', params={})
  348. return response
  349. async def buy_token(self):
  350. '''买入平台币'''
  351. pass
  352. async def check_position(self, hold_coin=0):
  353. '''检查是否存在非运行币种的仓位并take平仓'''
  354. self.logger.info('检查遗漏订单')
  355. response, error = await self._request('GET', '/dapi/v1/openOrders', params={'symbol':self.symbol})
  356. self.logger.info(response)
  357. self.logger.info(error)
  358. if error:self.logger.error(error)
  359. if response:
  360. for i in response:
  361. symbol = i['symbol']
  362. order_id = i['orderId']
  363. if symbol == self.symbol:
  364. res = await self.cancel_order(order_id=i['orderId'], symbol=i['symbol'])
  365. await asyncio.sleep(0.1)
  366. self.logger.info(res)
  367. self.logger.info('检查遗漏仓位')
  368. response, error = await self._request('GET','/dapi/v2/positionRisk', params={})
  369. if error is not None:self.logger.error(error)
  370. if response:
  371. for i in response:
  372. symbol = i['symbol']
  373. if symbol == self.symbol:
  374. if i['positionSide'] == 'LONG':
  375. longPos = abs(float(i['positionAmt']))
  376. longAvg = abs(float(i['entryPrice']))
  377. if longPos > 0:
  378. self.logger.info('发现多头遗留仓位 进行平仓')
  379. res, error = await self.take_order(
  380. symbol,
  381. longPos,
  382. 'pd',
  383. 1,
  384. utils.get_cid(),
  385. order_type='MARKET',
  386. )
  387. self.logger.info(res)
  388. self.logger.info(error)
  389. if i['positionSide'] == 'SHORT':
  390. shortPos = abs(float(i['positionAmt']))
  391. shortAvg = abs(float(i['entryPrice']))
  392. if shortPos > 0:
  393. self.logger.info('发现空头遗留仓位 进行平仓')
  394. res, error = await self.take_order(
  395. symbol,
  396. shortPos,
  397. 'pk',
  398. 1,
  399. utils.get_cid(),
  400. order_type='MARKET',
  401. )
  402. self.logger.info(res)
  403. self.logger.info(error)
  404. self.logger.info('遗留仓位检测完毕')
  405. return
  406. async def get_position(self):
  407. '''
  408. 获取仓位信息
  409. '''
  410. response, error = await self._request('GET','/dapi/v2/positionRisk', params={'symbol':self.symbol})
  411. longPos, shortPos = 0, 0
  412. longAvg, shortAvg = 0, 0
  413. position = {
  414. "longPos":abs(longPos),
  415. "shortPos":abs(shortPos),
  416. "longAvg":abs(longAvg),
  417. "shortAvg":abs(shortAvg)}
  418. if response:
  419. for i in response:
  420. if i['symbol'] == self.symbol:
  421. if i['positionSide'] == 'LONG':
  422. longPos = float(i['positionAmt'])
  423. longAvg = float(i['entryPrice'])
  424. if i['positionSide'] == 'SHORT':
  425. shortPos = float(i['positionAmt'])
  426. shortAvg = float(i['entryPrice'])
  427. position = model.Position()
  428. position.longPos = abs(longPos)
  429. position.longAvg = abs(longAvg)
  430. position.shortPos = abs(shortPos)
  431. position.shortAvg = abs(shortAvg)
  432. self.callback['onPosition'](position)
  433. return position
  434. async def go(self):
  435. '''
  436. 盘前
  437. 获取市场信息
  438. 获取账户信息
  439. 更改仓位模式(期货)
  440. 清空仓位和挂单
  441. 盘中
  442. 更新账户信息
  443. 更新挂单列表
  444. 更新仓位信息
  445. 更新延迟信息
  446. '''
  447. print('Rest循环器启动')
  448. interval = 60 # 不能太快防止占用限频
  449. ### beforeTrade
  450. await self.before_trade()
  451. await asyncio.sleep(1)
  452. await self.change_pos_side()
  453. await asyncio.sleep(1)
  454. ### onTrade
  455. loop = 0
  456. while 1:
  457. loop += 1
  458. try:
  459. # 更新账户
  460. res = await self.get_account()
  461. if res:
  462. for i in res:
  463. if self.quote == i['asset'].upper():
  464. self.data['equity'] = float(i['balance'])
  465. self.callback['onEquity'](self.data['equity'])
  466. # 更新仓位
  467. position = await self.get_position()
  468. await asyncio.sleep(interval)
  469. # 打印延迟
  470. self.get_delay_info()
  471. self.logger.debug(f'报单延迟 平均{self.avg_delay}ms 最大{self.max_delay}ms')
  472. except:
  473. traceback.print_exc()
  474. await asyncio.sleep(30)
  475. def get_data(self):
  476. return self.data
  477. async def handle_signals(self, orders):
  478. '''
  479. 执行策略指令
  480. 撤销订单
  481. 检查订单
  482. 下达订单
  483. '''
  484. try:
  485. for order_name in orders:
  486. if 'Cancel' in order_name:
  487. cid = orders[order_name][0]
  488. oid = orders[order_name][1]
  489. if cid:
  490. asyncio.get_event_loop().create_task(self.cancel_order(client_id=cid))
  491. elif oid:
  492. asyncio.get_event_loop().create_task(self.cancel_order(order_id=oid))
  493. for order_name in orders:
  494. if 'Limits' in order_name:
  495. for i in orders[order_name]:
  496. asyncio.get_event_loop().create_task(self.take_order(
  497. self.symbol,
  498. i[0],
  499. i[1],
  500. i[2],
  501. i[3]
  502. ))
  503. for order_name in orders:
  504. if 'Check' in order_name:
  505. cid = orders[order_name][0]
  506. # oid = orders[order_name][1]
  507. asyncio.get_event_loop().create_task(self.check_order(client_id=cid))
  508. except Exception as e:
  509. traceback.print_exc()
  510. self.logger.error("执行信号出错"+str(e))
  511. await asyncio.sleep(0.1)