import time import traceback import copy import os from web3_py_client import EthClient from mexc_client import MexcClient from decimal import Decimal, ROUND_DOWN from as_utils import add_state_flow_entry from checker.logger_config import get_logger from pprint import pformat web3 = EthClient() web3_backup = EthClient(os.getenv("RPC_URL_2")) mexc = MexcClient() # 配置日志 logger = get_logger('as') class ArbitrageProcess: def __init__(self, gas_limit_multiplier, gas_price_multiplier, process_item, core_data, core_lock, pending_data, pending_lock, mexc_data, mexc_lock, ): """ 初始化套利流程 Args: gas_limit_multiplier: gas limit倍数, 一般都不加倍 gas_price_multiplier: gas price倍数, 可以提高交易成功率 process_item: 信號發送端傳入的原始參數 """ self.NETWORK = 'ETH' tx = process_item['tx'] tx.pop('gasPrice', None) tx.pop('value', None) tx.pop('minReceiveAmount', None) tx.pop('slippage', None) tx.pop('maxSpendAmount', None) tx.pop('signatureData', None) self.core_data = core_data self.core_lock = core_lock self.pending_data = pending_data self.pending_lock = pending_lock self.mexc_data = mexc_data self.mexc_lock = mexc_lock # symbol轉大寫 self.symbol = process_item['symbol'].upper() self.coin = self.symbol.split('_')[0] self.base_coin = self.symbol.split('_')[1] # 获取eth价格 with self.core_lock: self.eth_price = self.core_data['eth_price'] # 获取提现信息 with self.mexc_lock: self.withdraw_info = copy.deepcopy(self.mexc_data['coin_info_map'][self.base_coin][self.NETWORK]) self.WITHDRAW_FEE = Decimal(self.withdraw_info['withdrawFee']) # 提現手續費 withdraw_info_formated = pformat(self.withdraw_info, indent=2) logger.info(f'提現信息識別, 手續費:{self.WITHDRAW_FEE}\n{withdraw_info_formated}') self.tx = tx self.profit = Decimal(process_item['profit']) # 這個利潤是實際到手利潤 self.profit_limit = Decimal(process_item['profitLimit']) # 這個利潤是實際到手利潤的limit self.gas_limit_multiplier = gas_limit_multiplier self.gas_price_multiplier = gas_price_multiplier self.from_token_addr = process_item['fromToken'] self.from_token_decimal = Decimal(process_item['fromTokenDecimal']) self.to_token_addr = process_item['toToken'] self.to_token_decimal = Decimal(process_item['toTokenDecimal']) self.user_exchange_wallet = process_item['userExchangeWallet'] self.user_wallet = process_item['userWallet'] self.process_item = process_item # 存储当前套利交易的细节信息,例如买入数量、价格等 self.sell_price = Decimal(0) self.sell_value = Decimal(0) # 实际卖出价值 self.buy_price = Decimal(0) self.chain_tx_hash = None # 链上买入的tx hash self.exchange_sell_amount = Decimal(process_item['exchangeOutAmount']) # 交易所卖出量 self.actual_profit = Decimal(0) # 實際利潤 # 定义可能的状态 self.STATES = [ "CHECK", # 检查余额、估算gas等 "SELLING_ON_EXCHANGE", # 正在中心化交易所卖出现货 "WAITING_SELL_CONFIRM", # 等待现货卖出订单确认 "BUYING_ON_CHAIN", # 正在链上买入 "WAITING_CHAIN_CONFIRM", # 等待链上交易确认 "WAITING_EXCHANGE_ROLLBACK", # 等待交易所回滚 "COMPLETED", # 套利流程完成 "REJECT", # 套利被程序拒绝 "FAILED" # 套利流程失败 ] self.STATE_IDLE = "IDLE" self.STATE_CHECK = "CHECK" self.STATE_SELLING_ON_EXCHANGE = "SELLING_ON_EXCHANGE" self.STATE_WAITING_SELL_CONFIRM = "WAITING_SELL_CONFIRM" self.STATE_BUYING_ON_CHAIN = "BUYING_ON_CHAIN" self.STATE_WAITING_CHAIN_CONFIRM = "WAITING_CHAIN_CONFIRM" self.STATE_WAITING_EXCHANGE_ROLLBACK = "WAITING_EXCHANGE_ROLLBACK" self.STATE_COMPLETED = "COMPLETED" self.STATE_REJECT = "REJECT" self.STATE_FAILED = "FAILED" # 所有前置信息获取都没有问题的话就等待开机信号 self.current_state = self.STATE_IDLE # --------------------------------------- 获取交易规则 --------------------------------------- exchange_info_params = { "symbols": self.symbol.replace('_', '') } exchange_info_rst = mexc.market.get_exchangeInfo(exchange_info_params) # 返回值检查 if 'symbols' not in exchange_info_rst or len(exchange_info_rst['symbols']) != 1: params_formated = pformat(exchange_info_params, indent=2) info_formated = pformat(exchange_info_rst, indent=2) msg = f'获取交易规则时出现错误\n{exchange_info_params}\n{info_formated}' logger.error(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self.current_state = self.STATE_FAILED return # 返回的交易对信息核对] exchange_info = exchange_info_rst['symbols'][0] if exchange_info['symbol'].upper() != self.symbol.replace('_', ''): info_formated = pformat(exchange_info, indent=2) msg = f'获取到的交易规则与交易币对无关\n{info_formated}' logger.error(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self.current_state = self.STATE_FAILED return # 精度取得, 假如是RATOUSDT这个交易对的话: self.coin_asset_precision = Decimal(f'1e-{exchange_info['baseAssetPrecision']}') # 这是RATO的精度 self.base_coin_asset_precision = Decimal(f'1e-{exchange_info['quoteAssetPrecision']}') # 这是USDT的精度 self.price_precision = Decimal(f'1e-{exchange_info['quotePrecision']}') # 这是价格的精度 def _set_state(self, state): """ 设置系统状态,并打印日志 """ if state in self.STATES: logger.info(f"状态变更:{self.current_state} -> {state}") logger.info('') self.current_state = state else: logger.error(f"尝试设置无效状态:{state}") def run_arbitrage_step(self): """ 根据当前状态执行套利流程的下一步 这是一个周期性调用的函数,例如在主循环中调用 """ if self.current_state == self.STATE_CHECK: self._execute_check() elif self.current_state == self.STATE_SELLING_ON_EXCHANGE: self._execute_sell_on_exchange() elif self.current_state == self.STATE_WAITING_SELL_CONFIRM: self._wait_sell_confirm() elif self.current_state == self.STATE_BUYING_ON_CHAIN: self._execute_buy_on_chain() elif self.current_state == self.STATE_WAITING_CHAIN_CONFIRM: self._wait_chain_confirm() elif self.current_state == self.STATE_WAITING_EXCHANGE_ROLLBACK: self._wait_exchange_rollback() elif self.current_state == self.STATE_COMPLETED: msg = "套利流程成功完成!" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") elif self.current_state == self.STATE_REJECT: msg = "套利流程被程序拒绝" logger.error(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") elif self.current_state == self.STATE_FAILED: msg = "套利流程失败!" logger.error(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") def _execute_check(self): """ 前置检查,防止低能错误 """ try: # step1,檢查交易所的餘額是否夠用 # 处理精度 pseudo_amount_to_sell = self.exchange_sell_amount.quantize(self.coin_asset_precision, rounding=ROUND_DOWN) # 交易所套保余额判断 with self.mexc_lock: balances = self.mexc_data['account_info']['balances'] for balance in balances: if balance['asset'] == self.coin: if Decimal(balance['free']) < pseudo_amount_to_sell: msg = f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出:{pseudo_amount_to_sell}, 不能触发套保交易。" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self._set_state(self.STATE_REJECT) return else: msg = f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出:{pseudo_amount_to_sell}, 余额校验通过(可以套保)。" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") break # step2,估算gas logger.info("獲取區塊信息") with self.core_lock: latest_block = copy.deepcopy(self.core_data['block']) self.tx['maxPriorityFeePerGas'] = int(int(self.tx['maxPriorityFeePerGas']) * self.gas_price_multiplier) self.tx['maxFeePerGas'] = int(int(latest_block['baseFeePerGas']) * 2 + self.tx['maxPriorityFeePerGas']) gas_price = Decimal(self.tx['maxPriorityFeePerGas'] + self.tx['maxFeePerGas']) gas_price_gwei = gas_price / Decimal('1e9') gas_price_gwei = gas_price_gwei.quantize(Decimal('1e-9'), rounding=ROUND_DOWN) tx_formated = pformat(self.tx, indent=2) logger.info(f"鏈上各種校驗\n{tx_formated}") estimated_gas_origin = web3.w3.eth.estimate_gas(self.tx) estimated_gas = int(estimated_gas_origin * self.gas_limit_multiplier) estimated_wei = Decimal(estimated_gas) * gas_price estimated_eth = Decimal(estimated_wei / Decimal('1e18')) / Decimal(2) # 除以2才是比較接近正常消耗的gas費,否則會過於高估 estimated_eth = estimated_eth.quantize(Decimal('1e-8'), rounding=ROUND_DOWN) msg = f"估算的燃气量: {estimated_gas}, eth消耗: {estimated_eth}, gas price: {gas_price_gwei} gwei, gas估算通過" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") # step3, 費用與利潤比較 estimated_eth_value = estimated_eth * self.eth_price estimated_eth_value = estimated_eth_value.quantize(Decimal('1e-2'), rounding=ROUND_DOWN) cost = estimated_eth_value + self.WITHDRAW_FEE # 成本 if self.profit < cost: msg = f"費用判斷不通過! profit: {self.profit}, eth_value:{estimated_eth_value}, eth: {estimated_eth}, eth_price: {self.eth_price}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self._set_state(self.STATE_REJECT) return msg = f"費用判斷通過! profit: {self.profit}, eth_value:{estimated_eth_value}, eth: {estimated_eth}, eth_price: {self.eth_price}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") # step4, 與賬戶eth餘額比對(至少留0.002,不然沒gas了) MARGIN = Decimal(0.002) # 暫時鎖住core_data with self.core_lock: eth_balance = self.core_data['eth_balance'] if eth_balance - estimated_eth < MARGIN: msg = f"gas餘額判斷不通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self._set_state(self.STATE_REJECT) return # 餘額判斷通過后預扣除balance,防止綫程更新不及時導致其他綫程誤發送tx self.core_data['eth_balance'] = self.core_data['eth_balance'] - estimated_eth msg = f"gas餘額判斷通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") # final, 設定交易狀態,開始交易 self._set_state(self.STATE_SELLING_ON_EXCHANGE) except Exception as e: exc_traceback = traceback.format_exc() msg = f"前置檢查未通過\n{exc_traceback}" logger.error(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self._set_state(self.STATE_REJECT) # traceback.print_exc() # 以下是每个状态对应的具体执行函数 def _execute_sell_on_exchange(self): """ 在中心化交易所卖出现货 """ msg = "执行:中心化交易所卖出现货..." logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "pending") try: order_times = 0 self.already_sold_amount = Decimal(0) while self.already_sold_amount < self.exchange_sell_amount and order_times < 5: order_times = order_times + 1 # 第一步直接卖出,这个数量用固定数量 pseudo_amount_to_sell = self.exchange_sell_amount - self.already_sold_amount # 处理精度 pseudo_amount_to_sell = pseudo_amount_to_sell.quantize(self.coin_asset_precision, rounding=ROUND_DOWN) # 初始化 quantity 变量 quantity_for_api = None # 用求余法判断是否是整数 if pseudo_amount_to_sell % 1 == 0: # 如果是整数,转换为 int 类型。某些API可能只接受整数交易对的整数数量 quantity_for_api = int(pseudo_amount_to_sell) else: # 如果是非整数,转换为 float 类型。这是最常见的API数量类型 quantity_for_api = float(pseudo_amount_to_sell) order_params = { "symbol": self.symbol.replace('_', ''), "side": "SELL", "type": "MARKET", "quantity": quantity_for_api, } order_params_formated = pformat(order_params, indent=2) exchange_sell_order = mexc.trade.post_order(order_params) exchange_sell_order_formated = pformat(exchange_sell_order, indent=2) msg = f"交易所现货卖出订单已发送 \n params:{order_params_formated} \n rst: {exchange_sell_order_formated}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") if 'orderId' not in exchange_sell_order: continue # 查询交易所订单状态 exchange_sell_order_id = exchange_sell_order['orderId'] waiting_times = 5 while waiting_times > 0: params = { "symbol": self.symbol.replace('_', ''), "orderId": exchange_sell_order_id } order = mexc.trade.get_order(params) order_formated = pformat(order, indent=2) if order['status'] in ["FILLED", "PARTIALLY_CANCELED"]: # 以实际成交数量为准 money = Decimal(order['cummulativeQuoteQty']) self.already_sold_amount = self.already_sold_amount + Decimal(order['executedQty']) self.sell_value = self.sell_value + money self.sell_price = self.sell_value / self.already_sold_amount self.sell_price = self.sell_price.quantize(self.price_precision, rounding=ROUND_DOWN) msg = f"交易所现货卖出订单已完成, 价格:{self.sell_price}, money: {money}\n order: {order_formated}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") break else: msg = f"交易所现货卖出失败\n order: {order_formated}" logger.error(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") time.sleep(1) waiting_times = waiting_times - 1 if order_times < 5: msg = 'mexc现货卖出流程完成' logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") self._set_state(self.STATE_BUYING_ON_CHAIN) else: msg = 'mexc现货卖出流程失败, 重试次数大于5' logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK) except Exception as e: exc_traceback = traceback.format_exc() msg = f"交易所现货卖出下单失败\n{exc_traceback}" logger.error(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self._set_state(self.STATE_FAILED) # traceback.print_exc() def _execute_buy_on_chain(self): """ 在链上执行买入操作 """ msg = "执行:链上买入操作..." logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "pending") try: # 交易前nonce with self.core_lock: self.tx['nonce'] = self.core_data['nonce'] # 调用链上客户端执行买入交易 signed_tx = web3._sign(self.tx, self.gas_limit_multiplier) self.chain_tx_hash = web3.w3.to_hex(signed_tx.hash) try: # 主要节点先发交易 web3.w3.eth.send_raw_transaction(signed_tx.raw_transaction) # 使用备用节点再发一次交易 web3_backup.w3.eth.send_raw_transaction(signed_tx.raw_transaction) except Exception as e: msg = f"据反饋說链上买入失败:{e}, 交易哈希:{self.chain_tx_hash}" logger.error(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") # 交易成功后刷新全局nonce with self.core_lock: self.core_data['nonce'] = self.core_data['nonce'] + 1 block_number = self.core_data['block_number'] # 將hash放入pending裏,等待確認 with self.pending_lock: self.pending_data[self.chain_tx_hash] = { "block_number": block_number, "tx_details": None, "reponse": None, } # 交易成功 msg = f"再次確認交易是否上鏈:{self.chain_tx_hash}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") self._set_state(self.STATE_WAITING_CHAIN_CONFIRM) except Exception as e: exc_traceback = traceback.format_exc() msg = f"鏈上買入未處理的錯誤, 交易哈希:{self.chain_tx_hash}\n{exc_traceback}" logger.error(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self._set_state(self.STATE_WAITING_CHAIN_CONFIRM) # traceback.print_exc() def _wait_chain_confirm(self): """ 等待链上交易确认 """ chain_tx_hash = self.chain_tx_hash msg = f"等待链上交易确认:{chain_tx_hash}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "pending") try: # 給120秒時間進行確認 waiting_times = 120 while waiting_times > 0: with self.pending_lock: tx_details = copy.deepcopy(self.pending_data[chain_tx_hash]['tx_details']) if tx_details is None: waiting_times = waiting_times - 1 time.sleep(1) continue # # 交易確認后,移除出pending列表 # with self.pending_lock: # del self.pending_data[chain_tx_hash] # 交易失敗的邏輯處理,直接進行回滾 if 'fromTokenDetails' not in tx_details \ or 'toTokenDetails' not in tx_details: msg = f"链上交易失敗。{tx_details}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK) break tx_details_formated = pformat(tx_details, indent=2) msg = f"链上交易已确认。\n details: {tx_details_formated}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") # 獲取交易信息 from_token_details = tx_details['fromTokenDetails'] to_token_details = tx_details['toTokenDetails'] from_token_amount = Decimal(from_token_details['amount']) from_token_amount_human = from_token_amount / (Decimal(10) ** self.from_token_decimal) from_token_amount_human = from_token_amount_human.quantize(self.base_coin_asset_precision, rounding=ROUND_DOWN) to_token_amount = Decimal(to_token_details['amount']) to_token_amount_human = to_token_amount / (Decimal(10) ** self.to_token_decimal) self.buy_price = from_token_amount_human / to_token_amount_human self.buy_price = self.buy_price.quantize(self.price_precision, rounding=ROUND_DOWN) # 交易預估利潤百分比計算 rate = self.sell_price / self.buy_price rate = rate.quantize(Decimal('1e-4'), rounding=ROUND_DOWN) msg = f"【比率{rate}】。用{from_token_amount_human}买入{to_token_amount_human},价格{self.buy_price}。" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") # 判斷快速二賣條件 diff = to_token_amount_human - self.exchange_sell_amount diff = diff.quantize(self.coin_asset_precision, rounding=ROUND_DOWN) value = diff * self.sell_price value = value.quantize(Decimal('1e-4'), rounding=ROUND_DOWN) if value > 2: msg = f"滿足二賣條件,{diff}*{self.sell_price} = {value}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") # 初始化 quantity 变量 quantity_for_api = None # 用求余法判断是否是整数 if diff % 1 == 0: # 如果是整数,转换为 int 类型。某些API可能只接受整数交易对的整数数量 quantity_for_api = int(diff) else: # 如果是非整数,转换为 float 类型。这是最常见的API数量类型 quantity_for_api = float(diff) order_params = { "symbol": self.symbol.replace('_', ''), "side": "SELL", "type": "MARKET", "quantity": quantity_for_api, } order_params_formated = pformat(order_params, indent=2) exchange_sell_order = mexc.trade.post_order(order_params) exchange_sell_order_formated = pformat(exchange_sell_order, indent=2) if 'orderId' not in exchange_sell_order: msg = f"交易所现货二卖下单失败 \n params:{order_params_formated} \n rst: {exchange_sell_order_formated}" logger.error(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") else: oid = exchange_sell_order['orderId'] # 查询交易所订单状态 waiting_times_inner = 30 last_order = None while waiting_times_inner > 0: params = { "symbol": self.symbol.replace('_', ''), "orderId": oid } order = mexc.trade.get_order(params) order_formated = pformat(order, indent=2) last_order = order if order['status'] in ["FILLED", "PARTIALLY_CANCELED"]: money = Decimal(order['cummulativeQuoteQty']) msg = f"交易所现货二卖订单已完成, money: {money}。\n order: {order_formated}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") break else: time.sleep(1) waiting_times_inner = waiting_times_inner - 1 if waiting_times_inner <= 0: last_order_formated = pformat(last_order, indent=2) msg = f"交易所现货二卖订单失敗, 最後狀態:{last_order_formated}。" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") else: msg = f"不滿足二賣條件,{diff}*{self.sell_price} = {value}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") # 計算實際利潤 actual_profit = value actual_gas_price = Decimal(tx_details['gasPrice']) actual_gas_price_gwei = actual_gas_price / Decimal('1e9') actual_gas_price_gwei = actual_gas_price_gwei.quantize(Decimal('1e-9'), rounding=ROUND_DOWN) actual_gas_used = Decimal(tx_details['gasUsed']) actual_wei = actual_gas_price * actual_gas_used actual_eth = actual_wei / Decimal('1e18') actual_eth = actual_eth.quantize(Decimal('1e-8'), rounding=ROUND_DOWN) actual_fee_used = actual_eth * self.eth_price actual_fee_used = actual_fee_used.quantize(Decimal('1e-4'), rounding=ROUND_DOWN) actual_profit = value - actual_fee_used - self.WITHDRAW_FEE msg = f"【最終利潤】{actual_profit}{self.base_coin}(已扣除所有手續費、滑點)\ \n鏈上ETH使用: {actual_eth}({actual_fee_used} USD), gas_price: {actual_gas_price_gwei} GWEI, gas_used: {actual_gas_used}\ \n交易所出售代幣利潤: {value}, 提現手續費: {self.WITHDRAW_FEE}\ " logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") self._set_state(self.STATE_COMPLETED) break # 如果300秒都沒確認成功,該交易大概率沒有上鏈 if waiting_times <= 0: with self.pending_lock: response = copy.deepcopy(self.pending_data[chain_tx_hash]) response_formated = pformat(response, indent=2) msg = f"链上交易确认失败:{chain_tx_hash}\n{response_formated}" logger.error(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK) except Exception as e: exc_traceback = traceback.format_exc() msg = f"查询链上确认状态时发生错误\n{exc_traceback}" logger.error(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK) # traceback.print_exc() def _wait_exchange_rollback(self): """ 市价进行交易所交易回滚 """ msg = "执行:中心化交易所买入现货回滚..." logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "pending") try: # 使用预提现数量进行回滚 pseudo_amount_to_buy = Decimal(self.sell_value) # 处理精度 pseudo_amount_to_buy = pseudo_amount_to_buy.quantize(self.base_coin_asset_precision, rounding=ROUND_DOWN) # 剩余计价币余额 free_balance = Decimal(0) # 交易所U余额判断 with self.mexc_lock: balances = self.mexc_data['account_info']['balances'] for balance in balances: if balance['asset'] == self.base_coin: free_balance = Decimal(balance['free']) pseudo_amount_to_buy = min(free_balance, pseudo_amount_to_buy) if pseudo_amount_to_buy < Decimal('10'): msg = f"交易所剩余{self.base_coin}: {free_balance}, 小于10, 不能触发回滚交易。" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self._set_state(self.STATE_FAILED) return else: msg = f"交易所剩余{self.base_coin}: {free_balance}, 交易所准备使用:{pseudo_amount_to_buy}, 余额校验通过(可以回滚)。" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") break # 有可能会遇到手续费占用问题 ready_to_buy = self.exchange_sell_amount if self.exchange_sell_amount * self.sell_price * Decimal(1.002) >= free_balance: ready_to_buy = (free_balance * Decimal(0.998)) / self.sell_price ready_to_buy = ready_to_buy.quantize(self.coin_asset_precision, rounding=ROUND_DOWN) # 初始化 quantity 变量 quantity_for_api = None # 用求余法判断是否是整数 if ready_to_buy % 1 == 0: # 如果是整数,转换为 int 类型。某些API可能只接受整数交易对的整数数量 quantity_for_api = int(ready_to_buy) else: # 如果是非整数,转换为 float 类型。这是最常见的API数量类型 quantity_for_api = float(ready_to_buy) order_params = { "symbol": self.symbol.replace('_', ''), "side": "BUY", "price": self.sell_price, "type": "LIMIT", "quantity": quantity_for_api, } order_params_formated = pformat(order_params, indent=2) exchange_buy_order = mexc.trade.post_order(order_params) exchange_buy_order_formated = pformat(exchange_buy_order, indent=2) if 'orderId' not in exchange_buy_order: msg = f"【回滚】交易所现货买入下单失败\n params:{order_params_formated}\norder: {exchange_buy_order_formated}" logger.error(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self._set_state("FAILED") return exchange_buy_order_id = exchange_buy_order['orderId'] msg = f"【回滚】交易所现货买入订单已发送, 订单ID: {exchange_buy_order_id}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") # 查询交易所订单状态 last_query_rst = None while True: params = { "symbol": self.symbol.replace('_', ''), "orderId": exchange_buy_order_id } order = mexc.trade.get_order(params) order_formated = pformat(order, indent=2) last_query_rst = order if order['status'] == "FILLED": money = Decimal(order['cummulativeQuoteQty']) amount = self.exchange_sell_amount price = money / amount price = price.quantize(self.price_precision, rounding=ROUND_DOWN) msg = f"【回滚】交易所现货买入订单已完全成交, 价格:{price}。\norder: {order_formated}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "success") self._set_state(self.STATE_FAILED) return else: # 继续等待成交 pass time.sleep(1) last_query_rst_formated = pformat(last_query_rst, indent=2) msg = f"【回滚】回滚交易订单查询超时, 订单ID: {exchange_buy_order_id}\n最终状态:{last_query_rst_formated}" logger.info(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self._set_state(self.STATE_FAILED) except Exception as e: exc_traceback = traceback.format_exc() msg = f"【回滚】交易所回滚交易失败\n{exc_traceback}" logger.error(msg) add_state_flow_entry(self.process_item, self.current_state, msg, "fail") self._set_state(self.STATE_FAILED) # traceback.print_exc()