binance_spot_rest.py 23 KB


  1. import aiohttp
  2. import time
  3. import asyncio
  4. import zlib
  5. import json
  6. import hmac
  7. import base64
  8. import hashlib
  9. import traceback
  10. import random, sys
  11. from urllib.parse import urlparse
  12. import logging, logging.handlers
  13. import utils
  14. import model
  15. from decimal import Decimal
  16. from decimal import ROUND_HALF_UP, ROUND_FLOOR
  17. def empty_call(msg):
  18. print(f'空的回调函数 {msg}')
  19. class BinanceSpotRest:
  20. def __init__(self, params:model.ClientParams, colo=0):
  21. if colo:
  22. print('不支持colo高速线路')
  23. self.HOST = 'https://api.binance.com'
  24. else:
  25. self.HOST = 'https://api.binance.com'
  26. self.params = params
  27. self.base = self.params.pair.split('_')[0].upper()
  28. self.quote = self.params.pair.split('_')[1].upper()
  29. self.symbol = self.base + self.quote
  30. if len(self.params.pair.split('_')) > 2:
  31. self.delivery = self.params.pair.split('_')[2] # 210924
  32. self.symbol += f"_{self.delivery}"
  33. self.name = self.params.name
  34. self._SESSIONS = dict()
  35. self.callback = {
  36. "onMarket":empty_call,
  37. "onPosition":empty_call,
  38. "onOrder":empty_call,
  39. "onEquity":empty_call,
  40. "onTicker":empty_call,
  41. "onExit":empty_call,
  42. }
  43. self.exchange_info = dict()
  44. self.stepSize = None
  45. self.tickSize = None
  46. self.delays = []
  47. self.avg_delay = 0
  48. self.max_delay = 0
  49. self.proxy = None
  50. self.broker_id = self.params.broker_id
  51. if 'win' in sys.platform:
  52. self.proxy = self.params.proxy
  53. self.logger = self.get_logger()
  54. self.stop_flag = 0
  55. self.coin_value = 0.0
  56. self.cash_value = 0.0
  57. #### 指定发包ip
  58. iplist = utils.get_local_ip_list()
  59. self.ip = iplist[int(self.params.ip)]
  60. def get_logger(self):
  61. logger = logging.getLogger(__name__)
  62. logger.setLevel(logging.DEBUG)
  63. # log to txt
  64. formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s')
  65. handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024)
  66. handler.setLevel(logging.DEBUG)
  67. handler.setFormatter(formatter)
  68. logger.addHandler(handler)
  69. return logger
  70. def _get_session(self, url):
  71. parsed_url = urlparse(url)
  72. key = parsed_url.netloc or parsed_url.hostname
  73. if key not in self._SESSIONS:
  74. tcp = aiohttp.TCPConnector(limit=50,keepalive_timeout=120,verify_ssl=False,local_addr=(self.ip,0))
  75. session = aiohttp.ClientSession(connector=tcp)
  76. self._SESSIONS[key] = session
  77. return self._SESSIONS[key]
  78. async def _request(self, method, uri, body=None, params=None, HOST=None):
  79. headers = {}
  80. headers["Content-Type"] = "application/json"
  81. headers['X-MBX-APIKEY'] = self.params.access_key
  82. if params != None:
  83. params['timestamp']=int(time.time())*1000
  84. query_string = "&".join(["{}={}".format(k, params[k]) for k in params.keys()])
  85. signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest()
  86. params['signature']=signature
  87. if HOST == None:
  88. url = self.HOST + uri
  89. else:
  90. url = HOST + uri
  91. # 发起请求
  92. timeout = aiohttp.ClientTimeout(10)
  93. session = self._get_session(url)
  94. msg = "rest请求记录" + str(method) + str(url) + str(params) + str(body)
  95. self.logger.debug(msg)
  96. try:
  97. start_time = time.time()
  98. if method == "GET":
  99. response = await session.get(url, params=params, headers=headers, timeout=timeout, proxy=self.proxy)
  100. elif method == "POST":
  101. response = await session.post(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  102. elif method == "DELETE":
  103. response = await session.delete(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy)
  104. code = response.status
  105. res = await response.json()
  106. res_msg = msg + f' 回报 {res}'
  107. self.logger.debug(res_msg)
  108. if code not in (200, 201, 202, 203, 204, 205, 206):
  109. print(f'URL:{url} PARAMS:{params} body:{body} ERROR:{res}')
  110. if code == 429 or code == 418:
  111. self.callback['onExit'](f"{self.name} 即将触发限频封禁 紧急退出")
  112. return None, str(res)
  113. delay = int(1000*(time.time() - start_time))
  114. self.delays.append(delay)
  115. return res, None
  116. except Exception as e:
  117. print('网络请求错误')
  118. print(f'URL:{url} PARAMS:{params} ERROR:{e}')
  119. self.logger.error(e)
  120. self.logger.error(traceback.format_exc())
  121. return None, str(e)
  122. def get_delay_info(self):
  123. if len(self.delays) > 100:
  124. self.delays = self.delays[-100:]
  125. if max(self.delays) > self.max_delay:self.max_delay = max(self.delays)
  126. self.avg_delay = round(sum(self.delays)/len(self.delays),1)
  127. async def take_order(self, symbol, amount, origin_side, price, cid="", order_type='LIMIT'):
  128. '''
  129. 下单接口
  130. '''
  131. if symbol not in self.exchange_info:
  132. await self.before_trade()
  133. # amount = float(Decimal(str(amount//self.exchange_info[symbol].stepSize))*Decimal(str(self.exchange_info[symbol].stepSize)))
  134. # price = float(Decimal(str(price//self.exchange_info[symbol].tickSize))*Decimal(str(self.exchange_info[symbol].tickSize)))
  135. amount = utils.fix_amount(amount, self.exchange_info[symbol].stepSize)
  136. price = utils.fix_price(price, self.exchange_info[symbol].tickSize)
  137. if origin_side =='kd':
  138. side = 'BUY'
  139. elif origin_side =='pd':
  140. side = 'SELL'
  141. elif origin_side =='kk':
  142. side = 'SELL'
  143. elif origin_side =='pk':
  144. side = 'BUY'
  145. else:
  146. raise Exception(f'下单参数错误 side:{origin_side}')
  147. if float(amount) <= 0.0:
  148. self.logger.error(f'下单参数错误 amount:{amount}')
  149. order_event = dict()
  150. order_event['status'] = "REMOVE"
  151. order_event['filled_price'] = 0.0
  152. order_event['filled'] = 0.0
  153. order_event['client_id'] = cid
  154. self.callback["onOrder"](order_event)
  155. return None, 'amount error'
  156. if float(price) <= 0.0:
  157. self.logger.error(f'下单参数错误 price:{price}')
  158. order_event = dict()
  159. order_event['status'] = "REMOVE"
  160. order_event['filled_price'] = 0.0
  161. order_event['filled'] = 0.0
  162. order_event['client_id'] = cid
  163. self.callback["onOrder"](order_event)
  164. return None, 'price error'
  165. params = {
  166. 'symbol': symbol,
  167. 'quantity': utils.num_to_str(amount, self.exchange_info[symbol].stepSize),
  168. 'side': side,
  169. 'type':order_type,
  170. 'newClientOrderId':cid,
  171. }
  172. if order_type in ['LIMIT','STOP','TAKE_PROFIT']:
  173. params['price'] = utils.num_to_str(price, self.exchange_info[symbol].tickSize)
  174. params['timeInForce'] = 'GTC'
  175. if self.params.debug == 'True':
  176. await asyncio.sleep(0.1)
  177. return None, None
  178. else:
  179. # 再报单
  180. response, error = await self._request('POST', '/api/v3/order', params=params)
  181. # 再更新
  182. if response is not None:
  183. if 'orderId' in response:
  184. order_event = dict()
  185. order_event['status'] = "NEW"
  186. order_event['client_id'] = params["newClientOrderId"]
  187. order_event['order_id'] = response['orderId']
  188. self.callback["onOrder"](order_event)
  189. if error:
  190. order_event = dict()
  191. order_event['status'] = "REMOVE"
  192. order_event['filled_price'] = 0
  193. order_event['filled'] = 0
  194. order_event['client_id'] = params["newClientOrderId"]
  195. self.callback["onOrder"](order_event)
  196. return response, error
  197. async def cancel_order(self, order_id=None, client_id=None, symbol=None):
  198. params = {
  199. "symbol": self.symbol if symbol==None else symbol,
  200. }
  201. if order_id:
  202. params["orderId"] = order_id
  203. if client_id:
  204. params["origClientOrderId"] = client_id
  205. if self.params.debug == 'True':
  206. await asyncio.sleep(0.1)
  207. return None
  208. else:
  209. response, error = await self._request('DELETE', f'/api/v3/order', params=params)
  210. if error:
  211. print("撤单失败",error)
  212. # if 'Unknown order sent.' in error:
  213. # 撤单失败 可能已经撤单 是否发生成交需要rest查
  214. # if client_id:await self.check_order(client_id=client_id)
  215. # if order_id:await self.check_order(order_id=order_id)
  216. return error
  217. if response:
  218. pass
  219. # if 'status' in response:
  220. # if response['status'] in ['CANCELED','EXPIRED']: # 已撤销 删除本地订单表
  221. # order_event = dict()
  222. # order_event['status'] = "REMOVE"
  223. # order_event['client_id'] = response["origClientOrderId"]
  224. # order_event['order_id'] = response["orderId"]
  225. # order_event['filled'] = float(response["executedQty"])
  226. # order_event['filled_price'] = float(response["cummulativeQuoteQty"])/float(response["executedQty"]) if float(response["executedQty"]) > 0 else 0
  227. # self.callback['onOrder'](order_event)
  228. return response
  229. async def check_order(self, order_id=None, client_id=None, symbol=None):
  230. params = {
  231. "symbol": self.symbol if symbol==None else symbol,
  232. }
  233. if order_id:
  234. params["orderId"] = order_id
  235. if client_id:
  236. params["origClientOrderId"] = client_id
  237. if self.params.debug == 'True':
  238. await asyncio.sleep(0.1)
  239. return None
  240. else:
  241. response, error = await self._request('GET', f'/api/v3/order', params=params)
  242. if error:
  243. print("查单失败", error)
  244. if 'Order does not exist' in error:
  245. # 这种情况也可能还会有成交
  246. # 在订单从引擎到数据库的间隙查单会提示不存在 但实际有成交
  247. pass
  248. return error
  249. if response:
  250. if 'status' in response:
  251. # 需要删除本地订单表的情况
  252. if response['status'] in ['CANCELED','EXPIRED','FILLED']:
  253. order_event = dict()
  254. order_event['status'] = "REMOVE"
  255. order_event['client_id'] = response["clientOrderId"]
  256. order_event['order_id'] = response["orderId"]
  257. order_event['fee'] = 0.0 # 查询订单信息中没有手续费信息
  258. order_event['filled'] = float(response["executedQty"])
  259. order_event['filled_price'] = float(response["cummulativeQuoteQty"])/float(response["executedQty"]) if float(response["executedQty"]) > 0 else 0
  260. self.callback['onOrder'](order_event)
  261. elif response['status'] in ["NEW"]: # 需要更新本地表的情况
  262. order_event = dict()
  263. order_event['status'] = "NEW"
  264. order_event['client_id'] = response["clientOrderId"]
  265. order_event['order_id'] = response['orderId']
  266. self.callback['onOrder'](order_event)
  267. return response
  268. async def get_order_list(self):
  269. '''
  270. 获取挂单表
  271. '''
  272. response, error = await self._request('GET', '/api/v3/openOrders', params={'symbol':self.symbol})
  273. orders = [] # 查询当前挂单 只可能出现 new 和 partfill 默认成交为0 只有 done状态的订单才考虑是否有成交
  274. if response:
  275. for i in response:
  276. order_event = dict()
  277. order_event['status'] = "NEW"
  278. order_event['filled'] = 0
  279. order_event['filled_price'] = 0
  280. order_event['client_id'] = i["clientOrderId"]
  281. order_event['order_id'] = i['orderId']
  282. self.callback["onOrder"](order_event)
  283. orders.append(order_event)
  284. if error:
  285. print('查询列表出错',error)
  286. return orders
  287. async def get_history_order(self):
  288. params = {
  289. "symbol":self.symbol,
  290. "limit":1000,
  291. "startTime":1635815135000,
  292. "endTime":1635901535000,
  293. }
  294. response, error = await self._request('GET', '/api/v3/allOrders', params=params)
  295. import json
  296. fp = open("123.csv", "w")
  297. json.dump(response,fp)
  298. return response
  299. async def get_server_time(self):
  300. params = {}
  301. response = await self._request('GET', '/api/v3/time', params=params)
  302. return response
  303. async def before_trade(self):
  304. response, error = await self._request('GET', '/api/v3/exchangeInfo', params=None)
  305. if response:
  306. for i in response['symbols']:
  307. if self.symbol in i['symbol'].upper():
  308. self.tickSize = float(i['filters'][0]['tickSize'])
  309. self.stepSize = float(i['filters'][1]['stepSize'])
  310. #### 保存交易规则信息
  311. exchange_info = model.ExchangeInfo()
  312. exchange_info.symbol = i['symbol'].upper()
  313. exchange_info.multiplier = 1
  314. exchange_info.tickSize = float(i['filters'][0]['tickSize'])
  315. exchange_info.stepSize = float(i['filters'][1]['stepSize'])
  316. self.exchange_info[exchange_info.symbol] = exchange_info
  317. if error:
  318. print('获取市场信息错误',error)
  319. async def get_equity(self):
  320. res, err = await self.get_account()
  321. if res:
  322. for i in res['balances']:
  323. if self.quote == i['asset'].upper():
  324. cash = float(i['free']) + float(i['locked'])
  325. self.callback['onEquity']({
  326. self.quote:cash
  327. })
  328. self.cash_value = cash
  329. if i['asset'].upper() == self.base:
  330. coin = float(i['free']) + float(i['locked'])
  331. self.callback['onEquity']({
  332. self.base:coin
  333. })
  334. self.coin_value = coin
  335. if err:
  336. print('获取账户信息错误',err)
  337. await asyncio.sleep(1)
  338. async def universalTransfer(self, _type='UMFUTURE_MAIN', asset='USDT', amount=0):
  339. params = {}
  340. params['type'] = _type
  341. params['asset'] = asset
  342. params['amount'] = amount
  343. print('发起提现')
  344. response = await self._request('POST', '/sapi/v3/asset/transfer', params=params, HOST='https://api.binance.com')
  345. print(f'提现结果 {response}')
  346. return response
  347. async def futuresTransfer(self, _type='2', asset='USDT', amount=0):
  348. '''
  349. 1: 现货账户向USDT合约账户划转
  350. 2: USDT合约账户向现货账户划转
  351. 3: 现货账户向币本位合约账户划转
  352. 4: 币本位合约账户向现货账户划转
  353. '''
  354. params = {}
  355. params['type'] = _type
  356. params['asset'] = asset
  357. params['amount'] = amount
  358. print('发起转账')
  359. response = await self._request('POST', '/sapi/v3/futures/transfer', params=params, HOST='https://api.binance.com')
  360. print(f'转账结果 {response}')
  361. return response
  362. async def get_account(self):
  363. return await self._request('GET','/api/v3/account', params={})
  364. async def get_ticker(self):
  365. res ,err = await self._request('GET', '/api/v3/ticker/bookTicker', params=None)
  366. if res:
  367. for i in res:
  368. if i['symbol'] == self.symbol:
  369. ap = float(i['bidPrice'])
  370. bp = float(i['askPrice'])
  371. mp = (ap+bp)*0.5
  372. d = {"name":self.name,'mp': mp, 'bp':bp, 'ap':ap}
  373. self.callback['onTicker'](d)
  374. return d
  375. if err:
  376. self.logger.error(err)
  377. return None
  378. async def buy_token(self):
  379. '''买入平台币'''
  380. pass
  381. async def check_position(self, hold_coin=0.0):
  382. '''
  383. 重置账户挂单和仓位 已支持全品种
  384. '''
  385. try:
  386. self.logger.info('检查遗漏订单')
  387. response, error = await self._request('GET', '/api/v3/openOrders', params={})
  388. self.logger.info(response)
  389. self.logger.info(error)
  390. if error:self.logger.error(error)
  391. if response:
  392. for i in response:
  393. res = await self.cancel_order(order_id=i['orderId'], symbol=i['symbol'])
  394. await asyncio.sleep(0.1)
  395. self.logger.info(res)
  396. self.logger.info('检查遗漏仓位')
  397. ###########
  398. res ,err = await self._request('GET', '/api/v3/ticker/bookTicker', params=None)
  399. tickers_mp = dict()
  400. if res:
  401. for i in res:
  402. ap = float(i['bidPrice'])
  403. bp = float(i['askPrice'])
  404. mp = (ap+bp)*0.5
  405. tickers_mp[i['symbol']] = mp
  406. if err:
  407. self.logger.error(err)
  408. ###########
  409. if self.exchange_info == dict():
  410. await self.before_trade()
  411. ###########
  412. response, error = await self._request('GET','/api/v3/account', params={})
  413. if error is not None:self.logger.error(error)
  414. if response:
  415. for i in response['balances']:
  416. asset = i['asset']
  417. if asset in ['BNB','USDT', 'TUSD']:
  418. continue
  419. symbol = asset + 'USDT'
  420. if symbol not in tickers_mp:
  421. continue
  422. coin = abs(float(i['free']))+abs(float(i['locked']))
  423. if coin == 0.0:
  424. continue
  425. mp = tickers_mp[symbol]
  426. coin_value = coin * mp
  427. if symbol == self.symbol:
  428. _hold_coin = hold_coin
  429. else:
  430. _hold_coin = 0
  431. diff = _hold_coin - coin_value
  432. diff *= 0.99 # 避免无法下单
  433. self.logger.info(f'需要调整现货仓位{diff}usd')
  434. if diff > 20.0:
  435. self.logger.info('买入现货')
  436. res, err = await self.take_order(
  437. symbol,
  438. diff/mp,
  439. 'kd',
  440. 1,
  441. utils.get_cid(),
  442. 'MARKET'
  443. )
  444. self.logger.info(res)
  445. self.logger.info(err)
  446. elif diff < -20.0:
  447. self.logger.info('卖出现货')
  448. res, err = await self.take_order(
  449. symbol,
  450. -diff/mp,
  451. 'kk',
  452. 1,
  453. utils.get_cid(),
  454. 'MARKET'
  455. )
  456. self.logger.info(res)
  457. self.logger.info(err)
  458. self.logger.info('遗留仓位检测完毕')
  459. except:
  460. self.logger.error("清仓程序执行出错")
  461. self.logger.error(traceback.format_exc())
  462. return
  463. async def go(self):
  464. '''
  465. 盘前
  466. 获取市场信息
  467. 获取账户信息
  468. 更改仓位模式(期货)
  469. 清空仓位和挂单
  470. 盘中
  471. 更新账户信息
  472. 更新挂单列表
  473. 更新仓位信息
  474. 更新延迟信息
  475. '''
  476. print('Rest循环器启动')
  477. interval = 60 # 不能太快防止占用限频
  478. ### beforeTrade
  479. await self.before_trade()
  480. await asyncio.sleep(1)
  481. ### onTrade
  482. loop = 0
  483. while 1:
  484. loop += 1
  485. try:
  486. # 停机信号
  487. if self.stop_flag:
  488. return
  489. # 更新账户
  490. res, err = await self.get_account()
  491. if res:
  492. for i in res['balances']:
  493. if self.quote == i['asset'].upper():
  494. self.callback['onEquity']({
  495. self.quote: float(i['free']) + float(i['locked'])
  496. })
  497. if i['asset'].upper() == self.base:
  498. self.callback['onEquity']({
  499. self.base: float(i['free']) + float(i['locked'])
  500. })
  501. await asyncio.sleep(interval)
  502. # 打印延迟
  503. self.get_delay_info()
  504. self.logger.debug(f'报单延迟 平均{self.avg_delay}ms 最大{self.max_delay}ms')
  505. except asyncio.CancelledError:
  506. return
  507. except:
  508. traceback.print_exc()
  509. await asyncio.sleep(30)
  510. async def handle_signals(self, orders):
  511. '''
  512. 执行策略指令
  513. 撤销订单
  514. 检查订单
  515. 下达订单
  516. '''
  517. try:
  518. for order_name in orders:
  519. if 'Cancel' in order_name:
  520. cid = orders[order_name][0]
  521. oid = orders[order_name][1]
  522. if cid:
  523. asyncio.get_event_loop().create_task(self.cancel_order(client_id=cid))
  524. elif oid:
  525. asyncio.get_event_loop().create_task(self.cancel_order(order_id=oid))
  526. for order_name in orders:
  527. if 'Limits' in order_name:
  528. for i in orders[order_name]:
  529. asyncio.get_event_loop().create_task(self.take_order(
  530. self.symbol,
  531. i[0],
  532. i[1],
  533. i[2],
  534. i[3]
  535. ))
  536. for order_name in orders:
  537. if 'Check' in order_name:
  538. cid = orders[order_name][0]
  539. # oid = orders[order_name][1]
  540. asyncio.get_event_loop().create_task(self.check_order(client_id=cid))
  541. except Exception as e:
  542. traceback.print_exc()
  543. self.logger.error("执行信号出错"+str(e))
  544. await asyncio.sleep(0.1)