Browse Source

备用节点

skyfffire 4 months ago
parent
commit
11e224eed7
4 changed files with 802 additions and 6 deletions
  1. 5 0
      s_erc20_to_mexc.py
  2. 789 0
      s_mexc_to_erc20.py
  3. 5 5
      submit_process_demo.py
  4. 3 1
      toto.readme

+ 5 - 0
s_erc20_to_mexc.py

@@ -1,6 +1,7 @@
 import time
 import traceback
 import copy
+import os
 from web3_py_client import EthClient
 from mexc_client import MexcClient
 from decimal import Decimal, ROUND_DOWN
@@ -9,6 +10,7 @@ from checker.logger_config import get_logger
 from pprint import pformat
 
 web3 = EthClient()
+web3_backup = EthClient(os.getenv("RPC_URL_2"))
 mexc = MexcClient()
 
 # 配置日志
@@ -407,7 +409,10 @@ class ArbitrageProcess:
             signed_tx = web3._sign(self.tx, self.gas_limit_multiplier)
             self.chain_tx_hash = web3.w3.to_hex(signed_tx.hash)
             try:
+                # 主要节点先发交易
                 web3.w3.eth.send_raw_transaction(signed_tx.raw_transaction)
+                # 使用备用节点再发一次交易
+                web3_backup.w3.eth.send_raw_transaction(signed_tx.raw_transaction)
             except Exception as e:
                 msg = f"据反饋說链上买入失败:{e}, 交易哈希:{self.chain_tx_hash}"
                 logger.error(msg)

+ 789 - 0
s_mexc_to_erc20.py

@@ -0,0 +1,789 @@
+import time
+import traceback
+import copy
+import os
+from web3_py_client import EthClient
+from mexc_client import MexcClient
+from decimal import Decimal, ROUND_DOWN
+from as_utils import add_state_flow_entry
+from checker.logger_config import get_logger
+from pprint import pformat
+
+web3 = EthClient()
+mexc = MexcClient()
+
+# 配置日志
+logger = get_logger('as')
+
+class ArbitrageProcess:
+    def __init__(self, gas_limit_multiplier, gas_price_multiplier, process_item, 
+        core_data, core_lock,
+        pending_data, pending_lock,
+        mexc_data, mexc_lock,
+    ):
+        """
+        初始化套利流程
+
+        Args:
+            gas_limit_multiplier: gas limit倍数, 一般都不加倍
+            gas_price_multiplier: gas price倍数, 可以提高交易成功率
+            process_item: 信號發送端傳入的原始參數
+        """
+        self.NETWORK = 'ETH'
+
+        tx = process_item['tx']
+        tx.pop('gasPrice', None)
+        tx.pop('value', None)
+        tx.pop('minReceiveAmount', None)
+        tx.pop('slippage', None)
+        tx.pop('maxSpendAmount', None)
+        tx.pop('signatureData', None)
+
+        self.core_data = core_data
+        self.core_lock = core_lock
+        self.pending_data = pending_data
+        self.pending_lock = pending_lock
+        self.mexc_data = mexc_data
+        self.mexc_lock = mexc_lock
+
+        # symbol轉大寫
+        self.symbol = process_item['symbol'].upper()
+
+        self.coin = self.symbol.split('_')[0]
+        self.base_coin = self.symbol.split('_')[1]
+
+        with self.core_lock:
+            self.eth_price = self.core_data['eth_price']
+        
+        with self.mexc_lock:
+            self.withdraw_info = copy.deepcopy(self.mexc_data['coin_info_map'][self.base_coin][self.NETWORK])
+            self.WITHDRAW_FEE = Decimal(self.withdraw_info['withdrawFee']) # 提現手續費
+
+            withdraw_info_formated = pformat(self.withdraw_info, indent=2)
+            logger.info(f'提現信息識別, 手續費:{self.WITHDRAW_FEE}\n{withdraw_info_formated}')
+
+        self.tx = tx
+        
+        self.profit = Decimal(process_item['profit'])               # 這個利潤是實際到手利潤
+        self.profit_limit = Decimal(process_item['profitLimit'])    # 這個利潤是實際到手利潤的limit
+
+        self.gas_limit_multiplier = gas_limit_multiplier
+        self.gas_price_multiplier = gas_price_multiplier
+
+        self.from_token_addr = process_item['fromToken']
+        self.from_token_decimal = Decimal(process_item['fromTokenDecimal'])
+        self.to_token_addr = process_item['toToken']
+        self.to_token_decimal = Decimal(process_item['toTokenDecimal'])
+
+        self.user_exchange_wallet = process_item['userExchangeWallet']
+        self.user_wallet = process_item['userWallet']
+        self.process_item = process_item
+
+
+        # 存储当前套利交易的细节信息,例如买入数量、价格等
+        self.sell_price = Decimal(0)
+        self.buy_price = Decimal(0)
+        self.chain_tx_hash = None                                               # 链上买入的tx hash
+        self.chain_usdt_use = Decimal(process_item['fromTokenAmountHuman'])     # 链上usdt减少量(使用量)
+        self.chain_buy_amount = Decimal(0)                                      # 链上币增加量(购入量), todo, 暂用即时余额代替
+        self.exchange_sell_amount = Decimal(process_item['exchangeOutAmount'])  # 交易所卖出量
+        self.exchange_sell_order_id = None                                      # 交易所卖出id
+        self.exchange_withdrawal_id = None                                      # 交易所提现id
+        self.exchange_withdrawal_amount = None                                  # 交易所提现数量
+        self.actual_profit = Decimal(0)                                         # 實際利潤
+
+        # 定义可能的状态
+        self.STATES = [
+            "CHECK",                        # 检查余额、估算gas等
+            "SELLING_ON_EXCHANGE",          # 正在中心化交易所卖出现货
+            "WAITING_SELL_CONFIRM",         # 等待现货卖出订单确认
+            "BUYING_ON_CHAIN",              # 正在链上买入
+            "WAITING_CHAIN_CONFIRM",        # 等待链上交易确认
+            "WAITING_EXCHANGE_ROLLBACK",    # 等待交易所回滚
+            # "HEDGING_ON_EXCHANGE",          # 正在中心化交易所套保
+            # "WAITING_HEDGE_CONFIRM",        # 等待套保订单确认
+            # "TRANSFERRING_TO_EXCHANGE",     # 正在向交易所转账
+            # "CLOSING_HEDGE",                # 正在平掉套保单
+            # "WAITING_CLOSE_HEDGE_CONFIRM",  # 等待平掉套保单确认
+            "WAITING_TRANSFER_ARRIVE",      # 等待交易所充值到账
+            "TRANSFERRING_TO_CHAIN",        # 正在向链上转账
+            "WAITING_WITHDRAWAL_CONFIRM",   # 等待链上提现确认
+            "COMPLETED",                    # 套利流程完成
+            "REJECT",                       # 套利被程序拒绝
+            "FAILED"                        # 套利流程失败
+        ]
+
+        self.STATE_IDLE = "IDLE"
+        self.STATE_CHECK = "CHECK"
+        self.STATE_SELLING_ON_EXCHANGE = "SELLING_ON_EXCHANGE"
+        self.STATE_WAITING_SELL_CONFIRM = "WAITING_SELL_CONFIRM"
+        self.STATE_BUYING_ON_CHAIN = "BUYING_ON_CHAIN"
+        self.STATE_WAITING_CHAIN_CONFIRM = "WAITING_CHAIN_CONFIRM"
+        self.STATE_WAITING_EXCHANGE_ROLLBACK = "WAITING_EXCHANGE_ROLLBACK"
+        # self.STATE_TRANSFERRING_TO_EXCHANGE = "TRANSFERRING_TO_EXCHANGE"
+        self.STATE_WAITING_TRANSFER_ARRIVE = "WAITING_TRANSFER_ARRIVE"
+        self.STATE_TRANSFERRING_TO_CHAIN = "TRANSFERRING_TO_CHAIN"
+        self.STATE_WAITING_WITHDRAWAL_CONFIRM = "WAITING_WITHDRAWAL_CONFIRM"
+        self.STATE_COMPLETED = "COMPLETED"
+        self.STATE_REJECT = "REJECT"
+        self.STATE_FAILED = "FAILED"
+
+        self.current_state = "IDLE"
+
+    def _set_state(self, state):
+        """
+        设置系统状态,并打印日志
+        """
+        if state in self.STATES:
+            logger.info(f"状态变更:{self.current_state} -> {state}")
+            logger.info('')
+            self.current_state = state
+        else:
+            logger.error(f"尝试设置无效状态:{state}")
+
+    def run_arbitrage_step(self):
+        """
+        根据当前状态执行套利流程的下一步
+        这是一个周期性调用的函数,例如在主循环中调用
+        """
+
+        if self.current_state == self.STATE_CHECK:
+            self._execute_check()
+
+        elif self.current_state == self.STATE_SELLING_ON_EXCHANGE:
+            self._execute_sell_on_exchange()
+
+        elif self.current_state == self.STATE_WAITING_SELL_CONFIRM:
+            self._wait_sell_confirm()
+
+        elif self.current_state == self.STATE_BUYING_ON_CHAIN:
+            self._execute_buy_on_chain()
+
+        elif self.current_state == self.STATE_WAITING_CHAIN_CONFIRM:
+            self._wait_chain_confirm()
+
+        elif self.current_state == self.STATE_WAITING_EXCHANGE_ROLLBACK:
+            self._wait_exchange_rollback()
+
+        # elif self.current_state == "TRANSFERRING_TO_EXCHANGE":
+        #     self._execute_transfer_to_exchange()
+
+        elif self.current_state == self.STATE_WAITING_TRANSFER_ARRIVE:
+            self._wait_transfer_arrive()
+
+        elif self.current_state == self.STATE_TRANSFERRING_TO_CHAIN:
+             self._execute_transfer_to_chain()
+
+        elif self.current_state == self.STATE_WAITING_WITHDRAWAL_CONFIRM:
+             self._wait_withdrawal_confirm()
+
+        elif self.current_state == self.STATE_COMPLETED:
+            msg = "套利流程成功完成!"
+            logger.info(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+        elif self.current_state == self.STATE_REJECT:
+            msg = "套利流程被程序拒绝"
+            logger.error(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+
+        elif self.current_state == self.STATE_FAILED:
+            msg = "套利流程失败!"
+            logger.error(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+
+    def _execute_check(self):
+        """
+        前置检查,防止低能错误
+        """
+        try:
+            # step1,檢查交易所的餘額是否夠用
+            # 处理精度
+            pseudo_amount_to_sell = self.exchange_sell_amount.quantize(Decimal('1'), rounding=ROUND_DOWN)
+
+            # 交易所套保余额判断
+            with self.mexc_lock:
+                balances = self.mexc_data['account_info']['balances']
+
+                for balance in balances:
+                    if balance['asset'] == self.coin:
+                        if Decimal(balance['free']) < pseudo_amount_to_sell:
+                            msg = f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出:{pseudo_amount_to_sell}, 不能触发套保交易。"
+                            logger.info(msg)
+                            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+                            self._set_state(self.STATE_REJECT)
+                            return
+                        else:
+                            msg = f"交易所剩余{self.coin}: {balance['free']}, 交易所准备卖出:{pseudo_amount_to_sell}, 余额校验通过(可以套保)。"
+                            logger.info(msg)
+                            add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+                            break
+
+            # step2,估算gas
+            logger.info("獲取區塊信息")
+            with self.core_lock:
+                latest_block = copy.deepcopy(self.core_data['block'])
+            self.tx['maxPriorityFeePerGas'] = int(int(self.tx['maxPriorityFeePerGas']) * self.gas_price_multiplier)
+            self.tx['maxFeePerGas'] = int(int(latest_block['baseFeePerGas']) * 2 + self.tx['maxPriorityFeePerGas'])
+
+            gas_price = Decimal(self.tx['maxPriorityFeePerGas'] + self.tx['maxFeePerGas'])
+            gas_price_gwei = gas_price / Decimal('1e9')
+            gas_price_gwei = gas_price_gwei.quantize(Decimal('1e-9'), rounding=ROUND_DOWN)
+
+            tx_formated = pformat(self.tx, indent=2)
+            logger.info(f"鏈上各種校驗\n{tx_formated}")
+            estimated_gas_origin = web3.w3.eth.estimate_gas(self.tx)
+            estimated_gas = int(estimated_gas_origin * self.gas_limit_multiplier)
+            estimated_wei = Decimal(estimated_gas) * gas_price
+            estimated_eth = Decimal(estimated_wei / Decimal('1e18')) / Decimal(2)        # 除以2才是比較接近正常消耗的gas費,否則會過於高估
+            estimated_eth = estimated_eth.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
+
+            msg = f"估算的燃气量: {estimated_gas}, eth消耗: {estimated_eth}, gas price: {gas_price_gwei} gwei, gas估算通過"
+            logger.info(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+            # step3, 費用與利潤比較
+            estimated_eth_value = estimated_eth * self.eth_price
+            estimated_eth_value = estimated_eth_value.quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
+            cost = estimated_eth_value + self.WITHDRAW_FEE          # 成本
+            if self.profit < cost:
+                msg = f"費用判斷不通過! profit: {self.profit}, eth_value:{estimated_eth_value}, eth: {estimated_eth}, eth_price: {self.eth_price}"
+                logger.info(msg)
+                add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+                self._set_state(self.STATE_REJECT)
+                return
+            msg = f"費用判斷通過! profit: {self.profit}, eth_value:{estimated_eth_value}, eth: {estimated_eth}, eth_price: {self.eth_price}"
+            logger.info(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+            # step4, 與賬戶eth餘額比對(至少留0.001,不然沒gas了)
+            MARGIN = Decimal(0.001)
+            # 暫時鎖住core_data
+            with self.core_lock:
+                eth_balance = self.core_data['eth_balance']
+                if eth_balance - estimated_eth < MARGIN:
+                    msg = f"gas餘額判斷不通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}"
+                    logger.info(msg)
+                    add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+                    self._set_state(self.STATE_REJECT)
+                    return
+                
+                # 餘額判斷通過后預扣除balance,防止綫程更新不及時導致其他綫程誤發送tx
+                self.core_data['eth_balance'] = self.core_data['eth_balance'] - estimated_eth
+            msg = f"gas餘額判斷通過! MARGIN:{MARGIN}, estimated_eth: {estimated_eth}, eth_balance: {eth_balance}"
+            logger.info(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+            # final, 設定交易狀態,開始交易
+            self._set_state(self.STATE_SELLING_ON_EXCHANGE)
+        except Exception as e:
+            exc_traceback = traceback.format_exc()
+            msg = f"前置檢查未通過\n{exc_traceback}"
+            logger.error(msg)
+
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+            self._set_state(self.STATE_REJECT)
+
+            # traceback.print_exc()
+
+    # 以下是每个状态对应的具体执行函数
+    def _execute_sell_on_exchange(self):
+        """
+        在中心化交易所卖出现货
+        """
+        msg = "执行:中心化交易所卖出现货..."
+        logger.info(msg)
+        add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
+        try:
+            # 第一步直接卖出,这个数量用固定数量
+            pseudo_amount_to_sell = self.exchange_sell_amount
+            # 处理精度
+            pseudo_amount_to_sell = pseudo_amount_to_sell.quantize(Decimal('1'), rounding=ROUND_DOWN)
+            
+            order_params = {
+                "symbol": self.symbol.replace('_', ''),
+                "side": "SELL",
+                "type": "MARKET",
+                "quantity": int(pseudo_amount_to_sell),
+            }
+            order_params_formated = pformat(order_params, indent=2)
+            
+            exchange_sell_order = mexc.trade.post_order(order_params)
+            exchange_sell_order_formated = pformat(exchange_sell_order, indent=2)
+
+            msg = f"交易所现货卖出订单已发送 \n params:{order_params_formated} \n rst: {exchange_sell_order_formated}"
+            if 'orderId' not in exchange_sell_order:
+                logger.error(msg)
+                add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+                self._set_state(self.STATE_FAILED)
+
+                return
+
+            self.exchange_sell_order_id = exchange_sell_order['orderId']
+            
+            logger.info(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+            self._set_state(self.STATE_WAITING_SELL_CONFIRM)
+        except Exception as e:
+            exc_traceback = traceback.format_exc()
+            msg = f"交易所现货卖出下单失败\n{exc_traceback}"
+            logger.error(msg)
+
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+            self._set_state(self.STATE_FAILED)
+
+            # traceback.print_exc()
+
+    def _wait_sell_confirm(self):
+        """
+        等待交易所现货卖出订单确认(完全成交)
+        """
+        exchange_sell_order_id = self.exchange_sell_order_id
+        msg = f"等待交易所现货卖出订单确认:{exchange_sell_order_id}"
+        logger.info(msg)
+        add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
+
+        try:
+            # 查询交易所订单状态
+            waiting_times = 30
+            last_order = None
+            while waiting_times > 0:
+                params = {
+                    "symbol": self.symbol.replace('_', ''),
+                    "orderId": exchange_sell_order_id
+                }
+                order = mexc.trade.get_order(params)
+                last_order = order
+
+                if order['status'] in ["FILLED", "PARTIALLY_CANCELED"]:
+                    money = Decimal(order['cummulativeQuoteQty'])
+                    amount = self.exchange_sell_amount
+
+                    self.sell_price = money / amount
+                    self.sell_price = self.sell_price.quantize(Decimal('1e-16'), rounding=ROUND_DOWN)
+
+                    order_formated = pformat(order, indent=2)
+                    msg = f"交易所现货卖出订单已完成, 价格:{self.sell_price}, money: {money}\n order: {order_formated}"
+                    logger.info(msg)
+                    add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+                    self.exchange_withdrawal_amount = money
+
+                    self._set_state(self.STATE_BUYING_ON_CHAIN)
+                    return
+                else:
+                    time.sleep(1)
+                    waiting_times = waiting_times - 1
+
+            last_order_formated = pformat(last_order, indent=2)
+            msg = f"交易所现货卖出订单失敗, 最後狀態:\n{last_order_formated}。"
+            logger.info(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+            self._set_state(self.STATE_FAILED)
+        except Exception as e:
+            exc_traceback = traceback.format_exc()
+            msg = f"查询交易所现货卖出订单状态时发生错误\n{exc_traceback}"
+            logger.error(msg)
+
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+            self._set_state(self.STATE_FAILED)
+
+            # traceback.print_exc()
+
+    def _execute_buy_on_chain(self):
+        """
+        在链上执行买入操作
+        """
+        msg = "执行:链上买入操作..."
+        logger.info(msg)
+        add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
+        try:
+            # 交易前nonce
+            with self.core_lock:
+                self.tx['nonce'] = self.core_data['nonce']
+            
+            # 调用链上客户端执行买入交易
+            signed_tx = web3._sign(self.tx, self.gas_limit_multiplier)
+            self.chain_tx_hash = web3.w3.to_hex(signed_tx.hash)
+            try:
+                web3.w3.eth.send_raw_transaction(signed_tx.raw_transaction)
+            except Exception as e:
+                msg = f"据反饋說链上买入失败:{e}, 交易哈希:{self.chain_tx_hash}"
+                logger.error(msg)
+                add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+
+            # 交易成功后刷新全局nonce
+            with self.core_lock:
+                self.core_data['nonce'] = self.core_data['nonce'] + 1
+                block_number = self.core_data['block_number']
+            
+            # 將hash放入pending裏,等待確認
+            with self.pending_lock:
+                self.pending_data[self.chain_tx_hash] = {
+                    "block_number": block_number,
+                    "tx_details": None,
+                    "reponse": None,
+                }
+
+            # 交易成功
+            msg = f"再次確認交易是否上鏈:{self.chain_tx_hash}"
+            logger.info(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+            self._set_state(self.STATE_WAITING_CHAIN_CONFIRM)
+        except Exception as e:
+            exc_traceback = traceback.format_exc()
+            msg = f"鏈上買入未處理的錯誤, 交易哈希:{self.chain_tx_hash}\n{exc_traceback}"
+            logger.error(msg)
+
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+            self._set_state(self.STATE_WAITING_CHAIN_CONFIRM)
+
+            # traceback.print_exc()
+
+    def _wait_chain_confirm(self):
+        """
+        等待链上交易确认
+        """
+
+        chain_tx_hash = self.chain_tx_hash
+        msg = f"等待链上交易确认:{chain_tx_hash}"
+        logger.info(msg)
+        add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
+        try:
+            # 給300秒時間進行確認
+            waiting_times = 300
+            while waiting_times > 0:
+                with self.pending_lock:
+                    tx_details = copy.deepcopy(self.pending_data[chain_tx_hash]['tx_details'])
+
+                if tx_details is None:
+                    waiting_times = waiting_times - 1
+                    time.sleep(1)
+
+                    continue
+
+                # # 交易確認后,移除出pending列表
+                # with self.pending_lock:
+                #     del self.pending_data[chain_tx_hash]
+
+                # 交易失敗的邏輯處理,直接進行回滾
+                if 'fromTokenDetails' not in tx_details \
+                or 'toTokenDetails' not in tx_details:
+                    msg = f"链上交易失敗。{tx_details}"
+                    logger.info(msg)
+                    add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+
+                    self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)                    
+
+                    break
+
+                tx_details_formated = pformat(tx_details, indent=2)
+                msg = f"链上交易已确认。\n details: {tx_details_formated}"
+                logger.info(msg)
+                add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+                # 獲取交易信息
+                from_token_details = tx_details['fromTokenDetails']
+                to_token_details = tx_details['toTokenDetails']
+
+                from_token_amount = Decimal(from_token_details['amount'])
+                from_token_amount_human = from_token_amount / (Decimal(10) ** self.from_token_decimal)
+                from_token_amount_human = from_token_amount_human.quantize(Decimal('1e-2'), rounding=ROUND_DOWN)
+                self.chain_buy_amount = from_token_amount_human # 存储实际买入数量
+
+                to_token_amount = Decimal(to_token_details['amount'])
+                to_token_amount_human = to_token_amount / (Decimal(10) ** self.to_token_decimal)
+
+                self.buy_price = from_token_amount_human / to_token_amount_human
+                self.buy_price = self.buy_price.quantize(Decimal('1e-16'), rounding=ROUND_DOWN)               
+
+                # 交易預估利潤百分比計算
+                rate = self.sell_price / self.buy_price
+                rate = rate.quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
+
+                msg = f"【比率{rate}】。用{from_token_amount_human}买入{to_token_amount_human},价格{self.buy_price}。"
+                logger.info(msg)
+                add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+                # 判斷快速二賣條件
+                diff = int(to_token_amount_human - self.exchange_sell_amount)
+                value = diff * self.sell_price
+                value = value.quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
+                if value > 2:
+                    msg = f"滿足二賣條件,{diff}*{self.sell_price} = {value}"
+                    logger.info(msg)
+                    add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+                    order_params = {
+                        "symbol": self.symbol.replace('_', ''),
+                        "side": "SELL",
+                        "type": "MARKET",
+                        "quantity": int(diff),
+                    }
+                    order_params_formated = pformat(order_params, indent=2)
+                    exchange_sell_order = mexc.trade.post_order(order_params)
+                    exchange_sell_order_formated = pformat(exchange_sell_order, indent=2)
+
+                    if 'orderId' not in exchange_sell_order:
+                        msg = f"交易所现货二卖下单失败 \n params:{order_params_formated} \n rst: {exchange_sell_order_formated}"
+                        logger.error(msg)
+                        add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+                    else:
+                        oid = exchange_sell_order['orderId']
+
+                        # 查询交易所订单状态
+                        waiting_times_inner = 30
+                        last_order = None
+                        while waiting_times_inner > 0:
+                            params = {
+                                "symbol": self.symbol.replace('_', ''),
+                                "orderId": oid
+                            }
+                            order = mexc.trade.get_order(params)
+                            order_formated = pformat(order, indent=2)
+                            last_order = order
+
+                            if order['status'] in ["FILLED", "PARTIALLY_CANCELED"]:
+                                money = Decimal(order['cummulativeQuoteQty'])
+
+                                msg = f"交易所现货二卖订单已完成, money: {money}。\n order: {order_formated}"
+                                logger.info(msg)
+                                add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+                                self.exchange_withdrawal_amount = self.exchange_withdrawal_amount + money
+
+                                break
+                            else:
+                                time.sleep(1)
+                                waiting_times_inner = waiting_times_inner - 1
+
+                        if waiting_times_inner <= 0:
+                            last_order_formated = pformat(last_order, indent=2)
+
+                            msg = f"交易所现货二卖订单失敗, 最後狀態:{last_order_formated}。"
+                            logger.info(msg)
+                            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")     
+                else:
+                    msg = f"不滿足二賣條件,{diff}*{self.sell_price} = {value}"
+                    logger.info(msg)
+                    add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+
+                # 計算實際利潤
+                actual_profit = value
+                actual_gas_price = Decimal(tx_details['gasPrice'])
+                actual_gas_price_gwei = actual_gas_price / Decimal('1e9')
+                actual_gas_price_gwei = actual_gas_price_gwei.quantize(Decimal('1e-9'), rounding=ROUND_DOWN)
+                actual_gas_used = Decimal(tx_details['gasUsed'])
+                actual_wei = actual_gas_price * actual_gas_used
+                actual_eth = actual_wei / Decimal('1e18')
+                actual_eth = actual_eth.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
+                actual_fee_used = actual_eth * self.eth_price
+                actual_fee_used = actual_fee_used.quantize(Decimal('1e-4'), rounding=ROUND_DOWN)
+
+                actual_profit = value - actual_fee_used - self.WITHDRAW_FEE
+
+                msg = f"【最終利潤】{actual_profit}{self.base_coin}(已扣除所有手續費、滑點)\
+                \n鏈上ETH使用: {actual_eth}({actual_fee_used} USD), gas_price: {actual_gas_price_gwei} GWEI, gas_used: {actual_gas_used}\
+                \n交易所出售代幣利潤: {value}, 提現手續費: {self.WITHDRAW_FEE}\
+                "
+                logger.info(msg)
+                add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+                self._set_state(self.STATE_COMPLETED)
+
+                break
+            
+            # 如果300秒都沒確認成功,該交易大概率沒有上鏈
+            if waiting_times <= 0:
+                with self.pending_lock:
+                    response = copy.deepcopy(self.pending_data[chain_tx_hash]['response'])
+                response_formated = pformat(response, indent=2)
+                msg = f"链上交易确认失败:{chain_tx_hash}\n{response_formated}"
+                logger.error(msg)
+                add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+                self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
+        except Exception as e:
+            exc_traceback = traceback.format_exc()
+            msg = f"查询链上确认状态时发生错误\n{exc_traceback}"
+            logger.error(msg)
+
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+            self._set_state(self.STATE_WAITING_EXCHANGE_ROLLBACK)
+
+            # traceback.print_exc()
+
+    def _wait_exchange_rollback(self):
+        """
+        市价进行交易所交易回滚
+        """
+        msg = "执行:中心化交易所买入现货回滚..."
+        logger.info(msg)
+        add_state_flow_entry(self.process_item, self.current_state, msg, "pending")
+        try:
+            # 使用预提现数量进行回滚
+            pseudo_amount_to_buy = Decimal(self.exchange_withdrawal_amount)
+            # 处理精度
+            pseudo_amount_to_buy = pseudo_amount_to_buy.quantize(Decimal('1'), rounding=ROUND_DOWN)
+
+            # 交易所U余额判断
+            with self.mexc_lock:
+                balances = self.mexc_data['account_info']['balances']
+                for balance in balances:
+                    if balance['asset'] == self.base_coin:
+                        pseudo_amount_to_buy = min(Decimal(balance['free']), pseudo_amount_to_buy)
+
+                        if pseudo_amount_to_buy < Decimal('10'):
+                            msg = f"交易所剩余{self.base_coin}: {balance['free']}, 小于10, 不能触发回滚交易。"
+                            logger.info(msg)
+                            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+                            self._set_state(self.STATE_FAILED)
+                            return
+                        else:
+                            msg = f"交易所剩余{self.base_coin}: {balance['free']}, 交易所准备使用:{pseudo_amount_to_buy}, 余额校验通过(可以回滚)。"
+                            logger.info(msg)
+                            add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+                            break
+            
+            order_params = {
+                "symbol": self.symbol.replace('_', ''),
+                "side": "BUY",
+                "type": "MARKET",
+                "quoteOrderQty": int(pseudo_amount_to_buy),
+            }
+            order_params_formated = pformat(order_params, indent=2)
+            exchange_buy_order = mexc.trade.post_order(order_params)
+            exchange_buy_order_formated = pformat(exchange_buy_order, indent=2)
+            if 'orderId' not in exchange_buy_order:
+                msg = f"【回滚】交易所现货买入下单失败\n params:{order_params_formated}\norder: {exchange_buy_order_formated}"
+                logger.error(msg)
+                add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+                self._set_state("FAILED")
+
+                return
+
+            exchange_buy_order_id = exchange_buy_order['orderId']
+            
+            msg = f"【回滚】交易所现货买入订单已发送, 订单ID: {exchange_buy_order_id}"
+            logger.info(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+            # 查询交易所订单状态
+            waiting_times = 30
+            last_query_rst = None
+            while True:
+                params = {
+                    "symbol": self.symbol.replace('_', ''),
+                    "orderId": exchange_buy_order_id
+                }
+                order = mexc.trade.get_order(params)
+                order_formated = pformat(order, indent=2)
+                last_query_rst = order
+
+                if order['status'] == "FILLED":
+                    money = Decimal(order['cummulativeQuoteQty'])
+                    amount = self.exchange_sell_amount
+                    price = money / amount
+                    price = price.quantize(Decimal('1e-8'), rounding=ROUND_DOWN)
+
+                    msg = f"【回滚】交易所现货买入订单已完全成交, 价格:{price}。\norder: {order_formated}"
+
+                    logger.info(msg)
+                    add_state_flow_entry(self.process_item, self.current_state, msg, "success")
+
+                    self._set_state(self.STATE_FAILED)
+                    return
+                else:
+                    # 继续等待成交
+                    pass
+
+                time.sleep(1)
+                waiting_times = waiting_times - 1
+            
+            last_query_rst_formated = pformat(last_query_rst, indent=2)
+            msg = f"【回滚】回滚交易订单查询超时, 订单ID: {exchange_buy_order_id}\n最终状态:{last_query_rst_formated}"
+            logger.info(msg)
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+
+            self._set_state(self.STATE_FAILED)
+        except Exception as e:
+            exc_traceback = traceback.format_exc()
+            msg = f"【回滚】交易所回滚交易失败\n{exc_traceback}"
+            logger.error(msg)
+            
+            add_state_flow_entry(self.process_item, self.current_state, msg, "fail")
+            self._set_state(self.STATE_FAILED)
+
+            # traceback.print_exc()
+
+# 伪代码示例:如何使用这个类
+if __name__ == "__main__":
+    import ok_chain_client
+    import decimal
+    import pprint
+
+    CHAIN_ID = 1
+    FROM_TOKEN = '0xdAC17F958D2ee523a2206206994597C13D831ec7'
+    FROM_TOKEN_AMOUNT_HUMAM = Decimal('20')
+    FROM_TOKEN_DECIMAL = 6
+    TO_TOKEN = '0xf816507E690f5Aa4E29d164885EB5fa7a5627860'
+    USER_WALLET = ''
+    USER_EXCHANGE_WALLET = '0xc71835a042F4d870B0F4296cc89cAeb921a9f3DA'
+    SYMBOL = "RATO_USDT"
+
+    # 询价,注意!!!这里直接把交易所地址当收款方,省去transfer的流程
+    data = ok_chain_client.swap(CHAIN_ID, 
+                                FROM_TOKEN_AMOUNT_HUMAM * (10 ** FROM_TOKEN_DECIMAL), 
+                                FROM_TOKEN, 
+                                TO_TOKEN, 
+                                1, 
+                                USER_WALLET,
+                                USER_EXCHANGE_WALLET,  # 这里直接把交易所地址当收款方,省去transfer的流程!!!
+                                )
+    if data.get('code') != '0' or not data.get('data'):
+            pprint.pprint(data)
+            pprint.pprint({
+                "error": f"OK API错误({1}) - Code:{data.get('code', 'N/A')}, Msg:{data.get('msg', data.get('message', 'N/A')) if isinstance(data, dict) else '格式错误'}"})
+
+            raise Exception("") 
+    d = data['data'][0]
+    tx = d['tx']
+    router_result = d['routerResult']
+    in_dec, out_dec = int(router_result['fromToken']['decimal']), int(router_result['toToken']['decimal'])
+    atomic_in_base, atomic_out_target = Decimal(router_result['fromTokenAmount']), Decimal(router_result['toTokenAmount'])
+    human_in_base = atomic_in_base / (10 ** in_dec)
+    human_out_target = atomic_out_target / (10 ** out_dec)
+
+    FROM_TOKEN_AMOUNT_HUMAM = human_in_base
+    TO_TOKEN_AMOUNT_HUMAM = human_out_target
+
+    pprint.pprint(tx)
+
+    # 套利流程执行
+    process_item = {
+        "stateFlow": [], # 状态流转记录
+    }
+    ap = ArbitrageProcess(tx, 2, 1.2, 
+                                        FROM_TOKEN, TO_TOKEN, 
+                                        FROM_TOKEN_AMOUNT_HUMAM, TO_TOKEN_AMOUNT_HUMAM,
+                                        USER_EXCHANGE_WALLET, USER_WALLET,
+                                        SYMBOL, process_item)
+
+    # 一般都是从这个流程开始,测试时可以稍作修改、测试后续流程
+    ap._set_state(ap.SELLING_ON_EXCHANGE)
+
+    # 在主循环中周期性调用 run_arbitrage_step
+    while ap.current_state != "COMPLETED" and ap.current_state != "FAILED":
+        ap.run_arbitrage_step()
+
+        if ap.current_state == ap.STATE_WAITING_TRANSFER_ARRIVE or ap.current_state == ap.STATE_WAITING_WITHDRAWAL_CONFIRM:
+            time.sleep(10)
+        # else:
+        #     time.sleep(1)
+
+    logger.info(process_item)
+    if ap.current_state == "COMPLETED":
+        logger.info("套利流程执行成功!")
+    else:
+        logger.info("套利流程执行失败!")

+ 5 - 5
submit_process_demo.py

@@ -20,13 +20,13 @@ def create_mock_arbitrage_data():
     CHAIN_ID = 1
     IN_TOKEN_ADDRESS = '0xdAC17F958D2ee523a2206206994597C13D831ec7' # USDT on Ethereum
     IN_TOKEN_DECIMALS = 6
-    EXCHANGE_OUT_AMOUNT = Decimal(2100000)
-    IN_AMOUNT_TO_QUERY = Decimal(12)
-    OUT_TOKEN_ADDRESS = '0xf816507E690f5Aa4E29d164885EB5fa7a5627860' # RATO on Ethereum
+    EXCHANGE_OUT_AMOUNT = Decimal(2500)
+    IN_AMOUNT_TO_QUERY = Decimal(25)
+    OUT_TOKEN_ADDRESS = '0x9eAeBd7E73D97E78c77fAB743e6FFA1b550e224c' # RXS on Ethereum
     USER_WALLET = wallet["user_wallet"]
     USER_EXCHANGE_WALLET = wallet["user_exchange_wallet"]
     SLIPPAGE = 1
-    MEXC_TARGET_PAIR_USDT = 'RATO_USDT' # MEXC 现货交易对
+    MEXC_TARGET_PAIR_USDT = 'RXS_USDT' # MEXC 现货交易对
 
     # 询价,注意!!!这里直接把交易所地址当收款方,省去transfer的流程
     data = ok_chain_client.swap(CHAIN_ID, 
@@ -51,7 +51,7 @@ def create_mock_arbitrage_data():
     # 构造提交给 arb_executor 的数据体
     data = {
         "tx": tx,
-        "profit": str(0),
+        "profit": str(5),
         "profitLimit": str(0),
         "symbol": MEXC_TARGET_PAIR_USDT,
         "fromToken": IN_TOKEN_ADDRESS,

+ 3 - 1
toto.readme

@@ -46,8 +46,10 @@
 [-] 测试延迟开单
 
 2025-06-25
+[ ] 备用节点广播
 [ ] 另一個方向
 
 有時間再做
-[ ] zeus昨晚上有一个状态识别bug,导致单边
+[ ] 日志分离,否则出现日志文件锁问题
+[-] zeus昨晚上有一个状态识别bug,导致单边(因为精度问题导致)
 [ ] approve自動