import aiohttp import time import asyncio import zlib import json import hmac import base64 import hashlib import traceback import random, sys from urllib.parse import urlparse import logging, logging.handlers import model, utils from decimal import Decimal def empty_call(msg): print(f'空的回调函数 {msg}') class BinanceCoinSwapRest: def __init__(self, params:model.ClientParams, colo=0): if colo: print('不支持colo高速线路') self.HOST = 'https://dapi.binance.com' else: self.HOST = 'https://dapi.binance.com' self.params = params self.name = self.params.name self.base = self.params.pair.split('_')[0].upper() self.quote = self.params.pair.split('_')[1].upper() self.symbol = self.base + self.quote if len(self.params.pair.split('_')) > 2: self.delivery = self.params.pair.split('_')[2] # 210924 self.symbol += f"_{self.delivery}" else: self.symbol += '_PERP' self.data = {} self._SESSIONS = dict() self.callback = { "onMarket":empty_call, "onPosition":empty_call, "onOrder":empty_call, "onEquity":empty_call, "onExit":empty_call, } self.exchange_info = dict() self.stepSize = None self.tickSize = None self.delays = [] self.avg_delay = 0 self.max_delay = 0 self.proxy = None self.broker_id = self.params.broker_id if 'win' in sys.platform: self.proxy = self.params.proxy self.logger = self.get_logger() self.multiplier = None self.mp = 0.0 # 初始mp 在check_position中更新 #### 指定发包ip iplist = utils.get_local_ip_list() self.ip = iplist[int(self.params.ip)] def get_logger(self): logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # log to txt formatter = logging.Formatter('[%(asctime)s] - %(levelname)s - %(message)s') handler = logging.handlers.RotatingFileHandler(f"log.log",maxBytes=1024*1024) handler.setLevel(logging.DEBUG) handler.setFormatter(formatter) logger.addHandler(handler) return logger def _get_session(self, url): parsed_url = urlparse(url) key = parsed_url.netloc or parsed_url.hostname if key not in self._SESSIONS: tcp = aiohttp.TCPConnector(limit=50,keepalive_timeout=120,verify_ssl=False,local_addr=(self.ip,0)) session = aiohttp.ClientSession(connector=tcp) self._SESSIONS[key] = session return self._SESSIONS[key] async def _request(self, method, uri, body=None, params=None, HOST=None): headers = {} headers["Content-Type"] = "application/json" headers['X-MBX-APIKEY'] = self.params.access_key params['timestamp']=int(time.time())*1000 query_string = "&".join(["{}={}".format(k, params[k]) for k in params.keys()]) signature = hmac.new(self.params.secret_key.encode(), msg=query_string.encode(), digestmod=hashlib.sha256).hexdigest() params['signature']=signature if HOST == None: url = self.HOST + uri else: url = HOST + uri # 发起请求 timeout = aiohttp.ClientTimeout(10) session = self._get_session(url) msg = "rest请求记录" + str(method) + str(url) + str(params) + str(body) self.logger.debug(msg) try: start_time = time.time() if method == "GET": response = await session.get(url, params=params, headers=headers, timeout=timeout, proxy=self.proxy) elif method == "POST": response = await session.post(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy) elif method == "DELETE": response = await session.delete(url, params=params, data=body, headers=headers, timeout=timeout, proxy=self.proxy) code = response.status res = await response.json() if code not in (200, 201, 202, 203, 204, 205, 206): print(f'URL:{url} PARAMS:{params} body:{body} ERROR:{res}') self.logger.error(res) return None, str(res) delay = int(1000*(time.time() - start_time)) self.delays.append(delay) return res, None except Exception as e: print('网络请求错误') print(f'URL:{url} PARAMS:{params} ERROR:{e}') self.logger.error(e) self.logger.error(traceback.format_exc()) return None, str(e) def get_delay_info(self): if len(self.delays) > 100: self.delays = self.delays[-100:] if max(self.delays) > self.max_delay:self.max_delay = max(self.delays) self.avg_delay = round(sum(self.delays)/len(self.delays),1) async def take_order(self, symbol, amount, origin_side, price, cid="", order_type='LIMIT'): ''' 下单接口 input amount 单位为 币 需要转换为 张 ''' if symbol not in self.exchange_info: await self.before_trade() # price = float(Decimal(str(price//self.tickSize))*Decimal(str(self.tickSize))) # amount = amount * price / self.multiplier # amount = float(Decimal(str(amount//self.stepSize))*Decimal(str(self.stepSize))) # 转换为张 if origin_side =='kd': side = 'BUY' positionSide = 'LONG' elif origin_side =='pd': side = 'SELL' positionSide = 'LONG' elif origin_side =='kk': side = 'SELL' positionSide = 'SHORT' elif origin_side =='pk': side = 'BUY' positionSide = 'SHORT' else: raise Exception(f'下单参数错误 side:{origin_side}') if amount <= 0: self.logger.error(f'下单参数错误 amount:{amount}') return None if price <= 0: self.logger.error(f'下单参数错误 price:{price}') return None params = { 'symbol': symbol, 'quantity': amount, 'side': side, 'positionSide': positionSide, 'type':order_type, 'newClientOrderId':cid, } if order_type in ['LIMIT','STOP','TAKE_PROFIT']: params['price'] = price params['timeInForce'] = 'GTC' if self.params.debug == 'True': await asyncio.sleep(0.1) return None, None else: # 再报单 response, error = await self._request('POST', '/dapi/v1/order', params=params) # 再更新 if response is not None: if 'orderId' in response: order_event = dict() order_event['status'] = "NEW" order_event['client_id'] = params["newClientOrderId"] order_event['order_id'] = response['orderId'] self.callback["onOrder"](order_event) if error: order_event = dict() order_event['status'] = "REMOVE" order_event['filled_price'] = 0 order_event['filled'] = 0 order_event['client_id'] = params["newClientOrderId"] self.callback["onOrder"](order_event) return response, error async def cancel_order(self, order_id=None, client_id=None, symbol=None): params = { "symbol": self.symbol if symbol==None else symbol, } if order_id: params["orderId"] = order_id if client_id: params["origClientOrderId"] = client_id if self.params.debug == 'True': await asyncio.sleep(0.1) return None else: response, error = await self._request('DELETE', f'/dapi/v1/order', params=params) if error: print("撤单失败",error) # if 'Unknown order sent.' in error: # 撤单失败 可能已经撤单 是否发生成交需要rest查 if client_id:await self.check_order(client_id=client_id) if order_id:await self.check_order(order_id=order_id) return error if response: if 'status' in response: if response['status'] in ['CANCELED','EXPIRED']: # 已撤销 删除本地订单表 order_event = dict() order_event['status'] = "REMOVE" order_event['client_id'] = response["clientOrderId"] order_event['order_id'] = response["orderId"] order_event['filled'] = float(response["executedQty"]) * self.multiplier / float(response["price"]) order_event['filled_price'] = float(response["price"]) self.callback['onOrder'](order_event) return response async def check_order(self, order_id=None, client_id=None, symbol=None): params = { "symbol": self.symbol if symbol==None else symbol, } if order_id: params["orderId"] = order_id if client_id: params["origClientOrderId"] = client_id if self.params.debug == 'True': await asyncio.sleep(0.1) return None else: response, error = await self._request('GET', f'/dapi/v1/order', params=params) if error: print("查单失败", error) if 'Order does not exist' in error: # 这种情况也可能还会有成交 # 在订单从引擎到数据库的间隙查单会提示不存在 但实际有成交 pass return error if response: if 'status' in response: # 需要删除本地订单表的情况 if response['status'] in ['CANCELED','EXPIRED','FILLED']: order_event = dict() order_event['status'] = "REMOVE" order_event['client_id'] = response["clientOrderId"] order_event['order_id'] = response["orderId"] order_event['filled'] = float(response["executedQty"]) * self.multiplier / float(response["price"]) order_event['filled_price'] = float(response["price"]) self.callback['onOrder'](order_event) elif response['status'] in ["NEW"]: # 需要更新本地表的情况 order_event = dict() order_event['status'] = "NEW" order_event['client_id'] = response["clientOrderId"] order_event['order_id'] = response['orderId'] self.callback['onOrder'](order_event) return response async def get_order_list(self): ''' 获取挂单表 ''' response, error = await self._request('GET', '/dapi/v1/openOrders', params={'symbol':self.symbol}) orders = [] # 查询当前挂单 只可能出现 new 和 partfill 默认成交为0 只有 done状态的订单才考虑是否有成交 if response: for i in response: order_event = dict() order_event['status'] = "NEW" order_event['filled'] = 0 order_event['filled_price'] = 0 order_event['client_id'] = i["clientOrderId"] order_event['order_id'] = i['orderId'] self.callback["onOrder"](order_event) orders.append(order_event) if error: print('查询列表出错',error) return orders async def get_server_time(self): params = {} response = await self._request('GET', '/dapi/v1/time', params=params) return response async def change_pos_side(self, dual='true'): ''' 获取仓位模式 ''' params = {'dualSidePosition':dual} response = await self._request('POST', '/dapi/v1/positionSide/dual', params=params) return response async def get_ticker(self): params = {'symbol': self.symbol} response = await self._request('GET', '/dapi/v1/ticker/bookTicker', params=params) ap = float(response[0][0]["askPrice"]) bp = float(response[0][0]["bidPrice"]) mp = (ap+bp)*0.5 d = {"name":self.name,'mp': mp, 'bp':bp, 'ap':ap} return d async def before_trade(self): params = {} response, error = await self._request('GET', '/dapi/v1/exchangeInfo', params=params) if response: for i in response['symbols']: if self.symbol in i['symbol'].upper(): self.tickSize = float(i['filters'][0]['tickSize']) self.stepSize = float(i['filters'][1]['stepSize']) self.multiplier = float(i['contractSize']) #### 保存交易规则信息 exchange_info = model.ExchangeInfo() exchange_info.symbol = i['symbol'].upper() exchange_info.multiplier = float(i['contractSize']) exchange_info.tickSize = float(i['filters'][0]['tickSize']) exchange_info.stepSize = float(i['filters'][1]['stepSize']) self.exchange_info[exchange_info.symbol] = exchange_info if error: print('获取市场信息错误',error) ### ticker = await self.get_ticker() ### res = await self.get_account() if res: for i in res: if self.base == i['asset'].upper(): self.data['equity'] = float(i['balance']) * ticker["mp"] self.callback['onEquity'](self.data['equity']) if error: print('获取账户信息错误',error) async def universalTransfer(self, _type='UMFUTURE_MAIN', asset='USDT', amount=0): params = {} params['type'] = _type params['asset'] = asset params['amount'] = amount print('发起提现') response = await self._request('POST', '/sapi/v1/asset/transfer', params=params, HOST='https://api.binance.com') print(f'提现结果 {response}') return response async def futuresTransfer(self, _type='2', asset='USDT', amount=0): ''' 1: 现货账户向USDT合约账户划转 2: USDT合约账户向现货账户划转 3: 现货账户向币本位合约账户划转 4: 币本位合约账户向现货账户划转 ''' params = {} params['type'] = _type params['asset'] = asset params['amount'] = amount print('发起转账') response = await self._request('POST', '/sapi/v1/futures/transfer', params=params, HOST='https://api.binance.com') print(f'转账结果 {response}') return response async def get_account(self): response, error = await self._request('GET','/dapi/v1/balance', params={}) return response async def buy_token(self): '''买入平台币''' pass async def check_position(self, hold_coin=0): '''检查是否存在非运行币种的仓位并take平仓''' self.logger.info('检查遗漏订单') response, error = await self._request('GET', '/dapi/v1/openOrders', params={'symbol':self.symbol}) self.logger.info(response) self.logger.info(error) if error:self.logger.error(error) if response: for i in response: symbol = i['symbol'] order_id = i['orderId'] if symbol == self.symbol: res = await self.cancel_order(order_id=i['orderId'], symbol=i['symbol']) await asyncio.sleep(0.1) self.logger.info(res) self.logger.info('检查遗漏仓位') response, error = await self._request('GET','/dapi/v2/positionRisk', params={}) if error is not None:self.logger.error(error) if response: for i in response: symbol = i['symbol'] if symbol == self.symbol: if i['positionSide'] == 'LONG': longPos = abs(float(i['positionAmt'])) longAvg = abs(float(i['entryPrice'])) if longPos > 0: self.logger.info('发现多头遗留仓位 进行平仓') res, error = await self.take_order( symbol, longPos, 'pd', 1, utils.get_cid(), order_type='MARKET', ) self.logger.info(res) self.logger.info(error) if i['positionSide'] == 'SHORT': shortPos = abs(float(i['positionAmt'])) shortAvg = abs(float(i['entryPrice'])) if shortPos > 0: self.logger.info('发现空头遗留仓位 进行平仓') res, error = await self.take_order( symbol, shortPos, 'pk', 1, utils.get_cid(), order_type='MARKET', ) self.logger.info(res) self.logger.info(error) self.logger.info('遗留仓位检测完毕') return async def get_position(self): ''' 获取仓位信息 ''' response, error = await self._request('GET','/dapi/v2/positionRisk', params={'symbol':self.symbol}) longPos, shortPos = 0, 0 longAvg, shortAvg = 0, 0 position = { "longPos":abs(longPos), "shortPos":abs(shortPos), "longAvg":abs(longAvg), "shortAvg":abs(shortAvg)} if response: for i in response: if i['symbol'] == self.symbol: if i['positionSide'] == 'LONG': longPos = float(i['positionAmt']) longAvg = float(i['entryPrice']) if i['positionSide'] == 'SHORT': shortPos = float(i['positionAmt']) shortAvg = float(i['entryPrice']) position = model.Position() position.longPos = abs(longPos) position.longAvg = abs(longAvg) position.shortPos = abs(shortPos) position.shortAvg = abs(shortAvg) self.callback['onPosition'](position) return position async def go(self): ''' 盘前 获取市场信息 获取账户信息 更改仓位模式(期货) 清空仓位和挂单 盘中 更新账户信息 更新挂单列表 更新仓位信息 更新延迟信息 ''' print('Rest循环器启动') interval = 60 # 不能太快防止占用限频 ### beforeTrade await self.before_trade() await asyncio.sleep(1) await self.change_pos_side() await asyncio.sleep(1) ### onTrade loop = 0 while 1: loop += 1 try: # 更新账户 res = await self.get_account() if res: for i in res: if self.quote == i['asset'].upper(): self.data['equity'] = float(i['balance']) self.callback['onEquity'](self.data['equity']) # 更新仓位 position = await self.get_position() await asyncio.sleep(interval) # 打印延迟 self.get_delay_info() self.logger.debug(f'报单延迟 平均{self.avg_delay}ms 最大{self.max_delay}ms') except: traceback.print_exc() await asyncio.sleep(30) def get_data(self): return self.data async def handle_signals(self, orders): ''' 执行策略指令 撤销订单 检查订单 下达订单 ''' try: for order_name in orders: if 'Cancel' in order_name: cid = orders[order_name][0] oid = orders[order_name][1] if cid: asyncio.get_event_loop().create_task(self.cancel_order(client_id=cid)) elif oid: asyncio.get_event_loop().create_task(self.cancel_order(order_id=oid)) for order_name in orders: if 'Limits' in order_name: for i in orders[order_name]: asyncio.get_event_loop().create_task(self.take_order( self.symbol, i[0], i[1], i[2], i[3] )) for order_name in orders: if 'Check' in order_name: cid = orders[order_name][0] # oid = orders[order_name][1] asyncio.get_event_loop().create_task(self.check_order(client_id=cid)) except Exception as e: traceback.print_exc() self.logger.error("执行信号出错"+str(e)) await asyncio.sleep(0.1)